扫二维码与项目经理沟通
我们在微信上24小时期待你的声音
解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流
这篇文章主要介绍了怎么用MINA、Netty、Twisted来实现消息分割,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。
创新互联长期为成百上千客户提供的网站建设服务,团队从业经验10年,关注不同地域、不同群体,并针对不同对象提供差异化的产品和服务;打造开放共赢平台,与合作伙伴共同营造健康的互联网生态环境。为白云企业提供专业的成都网站建设、网站设计,白云网站改版等技术服务。拥有10余年丰富建站经验和众多成功案例,为您定制开发。
本文介绍一种消息分割方式,use a fixed length header that indicates the length of the body,用一个固定字节数的Header前缀来指定Body的字节数,以此来分割消息。
固定字节数的Header前缀来指定Body的字节数
上面图中 Header 固定为 4 字节,Header 中保存的是一个 4 字节(32位)的整数,例如 12 即为 0x0000000C,这个整数用来指定 Body 的长度(字节数)。当读完这么多字节的 Body 之后,又是下一条消息的 Header。
下面分别用MINA、Netty、Twisted来实现对这种消息的切合和解码。
MINA
MINA 提供了 PrefixedStringCodecFactory 来对这种类型的消息进行编码解码,PrefixedStringCodecFactory 默认 Header 的大小是4字节,当然也可以指定成1或2。
public class TcpServer {
public static void main(String[] args) throws IOException {
IoAcceptor acceptor = new NioSocketAcceptor();
// 4字节的Header指定Body的字节数,对这种消息的处理
acceptor.getFilterChain().addLast("codec",
new ProtocolCodecFilter(new PrefixedStringCodecFactory(Charset.forName("UTF-8"))));
acceptor.setHandler(new TcpServerHandle());
acceptor.bind(new InetSocketAddress(8080));
}
}
class TcpServerHandle extends IoHandlerAdapter {
@Override
public void exceptionCaught(IoSession session, Throwable cause)
throws Exception {
cause.printStackTrace();
}
// 接收到新的数据
@Override
public void messageReceived(IoSession session, Object message)
throws Exception {
String msg = (String) message;
System.out.println("messageReceived:" + msg);
}
@Override
public void sessionCreated(IoSession session) throws Exception {
System.out.println("sessionCreated");
}
@Override
public void sessionClosed(IoSession session) throws Exception {
System.out.println("sessionClosed");
}
}
Netty
Netty 使用 LengthFieldBasedFrameDecoder 来处理这种消息。下面代码中的new LengthFieldBasedFrameDecoder(80, 0, 4, 0, 4)中包含5个参数,分别是int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip。maxFrameLength为消息的最大长度,lengthFieldOffset为Header的位置,lengthFieldLength为Header的长度,lengthAdjustment为长度调整(默认Header中的值表示Body的长度,并不包含Header自己),initialBytesToStrip为去掉字节数(默认解码后返回Header+Body的全部内容,这里设为4表示去掉4字节的Header,只留下Body)。
public class TcpServer {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer() {
@Override
public void initChannel(SocketChannel ch)
throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// LengthFieldBasedFrameDecoder按行分割消息,取出body
pipeline.addLast(new LengthFieldBasedFrameDecoder(80, 0, 4, 0, 4));
// 再按UTF-8编码转成字符串
pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
pipeline.addLast(new TcpServerHandler());
}
});
ChannelFuture f = b.bind(8080).sync();
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
}
class TcpServerHandler extends ChannelInboundHandlerAdapter {
// 接收到新的数据
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
String message = (String) msg;
System.out.println("channelRead:" + message);
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
System.out.println("channelActive");
}
@Override
public void channelInactive(ChannelHandlerContext ctx) {
System.out.println("channelInactive");
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
Twisted
在Twisted中需要继承Int32StringReceiver,不再继承Protocol。Int32StringReceiver表示固定32位(4字节)的Header,另外还有Int16StringReceiver、Int8StringReceiver等。而需要实现的接受数据事件的方法不再是dataReceived,也不是lineReceived,而是stringReceived。
# -*- coding:utf-8 –*-
from twisted.protocols.basic import Int32StringReceiver
from twisted.internet.protocol import Factory
from twisted.internet import reactor
class TcpServerHandle(Int32StringReceiver):
# 新的连接建立
def connectionMade(self):
print 'connectionMade'
# 连接断开
def connectionLost(self, reason):
print 'connectionLost'
# 接收到新的数据
def stringReceived(self, data):
print 'stringReceived:' + data
factory = Factory()
factory.protocol = TcpServerHandle
reactor.listenTCP(8080, factory)
reactor.run()
下面是Java编写的一个客户端测试程序:
public class TcpClient {
public static void main(String[] args) throws IOException {
Socket socket = null;
DataOutputStream out = null;
try {
socket = new Socket("localhost", 8080);
out = new DataOutputStream(socket.getOutputStream());
// 请求服务器
String data1 = "牛顿";
byte[] outputBytes1 = data1.getBytes("UTF-8");
out.writeInt(outputBytes1.length); // write header
out.write(outputBytes1); // write body
String data2 = "爱因斯坦";
byte[] outputBytes2 = data2.getBytes("UTF-8");
out.writeInt(outputBytes2.length); // write header
out.write(outputBytes2); // write body
out.flush();
} finally {
// 关闭连接
out.close();
socket.close();
}
}
}
MINA服务器输出结果:
sessionCreated
messageReceived:牛顿
messageReceived:爱因斯坦
sessionClosed
Netty服务器输出结果:
channelActive
channelRead:牛顿
channelRead:爱因斯坦
channelInactive
Twisted服务器输出结果:
connectionMade
stringReceived:牛顿
stringReceived:爱因斯坦
connectionLost
感谢你能够认真阅读完这篇文章,希望小编分享的“怎么用MINA、Netty、Twisted来实现消息分割”这篇文章对大家有帮助,同时也希望大家多多支持创新互联,关注创新互联行业资讯频道,更多相关知识等着你来学习!
我们在微信上24小时期待你的声音
解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流