Scala使用Netty

java能写的换做scala也可以写,而且scala在语法上面右比java简洁很多,所以尝试用scala写一个简单的netty demo

服务端

ServerHandler 处理消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
class ServerHandler extends ChannelInboundHandlerAdapter{

/**
* 有客户端建立连接后调用
*/
override def channelActive(ctx: ChannelHandlerContext) :Unit ={
println("客户端建立连接后调用")
}

/**
* 接受客户端发送来的消息
*/
override def channelRead(ctx: ChannelHandlerContext, msg: scala.Any): Unit = {
println("channelRead invoked 接受客户端发送来的消息" + msg)

val back = "connection success"
println("返回消息" + back)
ctx.writeAndFlush(back)
}
}

NettyServer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
class NettyServer {

def bind(host: String, port: Int): Unit = {
//配置服务端线程池组
//用于服务器接收客户端连接
val bossGroup = new NioEventLoopGroup
//用户进行SocketChannel的网络读写
val workerGroup = new NioEventLoopGroup

try {
//是Netty用户启动NIO服务端的辅助启动类,降低服务端的开发复杂度
val bootStrap = new ServerBootstrap
//将两个NIO线程组作为参数传入到ServerBootstrap
bootStrap.group(bossGroup, workerGroup)
//创建NioServerSocketChannel
.channel(classOf[NioServerSocketChannel])
//绑定I/O事件处理类
.childHandler(new ChannelInitializer[SocketChannel] {
override def initChannel(ch: SocketChannel): Unit = {
//解码器
ch.pipeline().addLast(new StringDecoder)
ch.pipeline().addLast(new StringEncoder)
ch.pipeline().addLast(new ServerHandler)
}
})
//绑定端口,调用sync方法等待绑定操作完成
val channelFuture = bootStrap.bind(host, port).sync()
println("服务已开启")
channelFuture.channel().closeFuture().sync()

} finally {
bossGroup.shutdownGracefully()
workerGroup.shutdownGracefully()
}
}
}

object NettyServer {
def main(args: Array[String]): Unit = {
val host = "localhost"
val port = 8083
val server = new StringNettyServer
println(s"IP$host,端口号$port")
server.bind(host, port)
}
}

客户端

ClientHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class ClientHandler extends ChannelInboundHandlerAdapter {
/**
* 有客户端建立连接后调用
*/
override def channelActive(ctx: ChannelHandlerContext): Unit = {
println("有客户端建立连接后调用")
val content = "你好,我是客户端"
ctx.writeAndFlush(content)
}


override def channelRead(ctx: ChannelHandlerContext, msg: scala.Any): Unit = {
println("读取服务端的消息" + msg)
}
}

NettyClient

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
class StringNettyClient {

def connect(host: String, port: Int): Unit = {
//创建客户端NIO线程组
val eventGroup = new NioEventLoopGroup
//创建客户端辅助启动类
val bootStrap = new Bootstrap

try {
bootStrap.group(eventGroup)
//创建NioSocketChannel
.channel(classOf[NioSocketChannel])
//绑定I/O事件处理类
.handler(new ChannelInitializer[SocketChannel] {
override def initChannel(ch: SocketChannel): Unit = {
ch.pipeline().addLast(new StringEncoder)
ch.pipeline().addLast(new StringDecoder)
ch.pipeline().addLast(new ClientHandler)
}
})
//发起异步连接操作
val channelFuture = bootStrap.connect(host, port).sync()
//等待服务关闭
channelFuture.channel().closeFuture().sync()

} finally {
//优雅的退出,释放线程池资源
eventGroup.shutdownGracefully()
}
}
}

object StringNettyClient {
def main(args: Array[String]): Unit = {
val host = "localhost"
val port = 8083
val client = new StringNettyClient
client.connect(host, port)
}
}

运行

先开启服务端

1
2
IPlocalhost,端口号8083
服务已开启

再开启客户端

1
有客户端建立连接后调用

服务端接收并反馈

1
2
3
有客户端建立连接后调用
接受客户端发送来的消息你好,我是客户端
返回消息connection success

客户端接收服务端结果

1
读取服务端的消息connection success

总结

上述通讯使用了StringEncoder编码器和StringDecoder解码器,所以调用ctx.writeAndFlush()方法的时候可以传入字符串,自动处理。