我也是在做完前置设计后才开始学习Netty框架的,不得不说,相比直接使用Socket,Netty真是封装得太到位了。另外强烈推荐《Netty实战》这本书,花了两天时间读完,基本就可以对Netty的架构有个直观的了解。后续看时间再去看看Netty的源码吧。
选择Protobuf主要是因为客户端与服务端之间的传输需要进行编解码,而Protobuf可以实现极高效的序列化和反序列化,且Netty还直接支持Protobuf。让我觉得不用都不好意思了。
前置最优先的功能其实就是用户校验和返回路由信息及校验结果。由于配套使用了Redis,等后续的文章我再更新我的数据结构设计。这篇先专注在Netty和Protobuf吧。
我想这系列的文章也只会描述基本功能,介绍个框架吧。
Protobuf数据格式
如果在一个用户粒度分的极细的场景下,我们可以直接通过用户+密码的方式来得到用户的权限信息,比如该用户应该连接到哪个MQ集群,是否用户名密码匹配等等。所以对客户端来说,需要告知前置的就是用户名、密码这两个信息。而服务端需要返回的是校验码,校验信息和MQ集群地址,也是三个String就能解决的事。数据很好设计:
syntax = "proto3";
message usrinfo {
string usrname = 1;
string pwd = 2;
}
message authresponse {
string retcode = 1;
string address = 2;
}
然后需要做的就是
- 编译proto文件,生成java文件
- 把java文件放到你的工程里
- 在工程中引入protobuf-java-3.5.1.jar依赖包
用maven会比较方便,我这里eclipse一直有问题,就下了protobuf的包自己编译出protobuf-java-3.5.1.jar。
Netty服务端
在Netty框架下,主要需要设计的就是ChannalInboundHandler。服务端用于在获取客户端的数据后,访问Redis得到结果并返回给客户端。直接上代码吧:
package front.server;
import
import front.client.Msg;
import
import
import
import
import
import
import
import
import
import
import
public class FrontServer {
private final int port;
public FrontServer(int port) {
this.port=port;
}
public static void main(String[] args) throws InterruptedException{
int port = 9621;
new FrontServer(port).start();
}
public void start() throws InterruptedException{
EventLoopGroup g = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(g).channel(NioServerSocketChannel.class)
.localAddress(new InetSocketAddress(port))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// TODO Auto-generated method stub
ch.pipeline().addLast("frameDecoder", new ProtobufVarint32FrameDecoder());// 用于decode前解决半包和粘包问题(利用包头中的包含数组长度来识别半包粘包)
//配置Protobuf解码处理器,消息接收到了就会自动解码,ProtobufDecoder是netty自带的,Message是自己定义的Protobuf类
ch.pipeline().addLast("protobufDecoder",new ProtobufDecoder(Msg.usrinfo.getDefaultInstance()));
// 用于在序列化的字节数组前加上一个简单的包头,只包含序列化的字节长度。
ch.pipeline().addLast("frameEncoder",
new ProtobufVarint32LengthFieldPrepender());
//配置Protobuf编码器,发送的消息会先经过编码
ch.pipeline().addLast("protobufEncoder", new ProtobufEncoder());
// ----Protobuf处理器END----
ch.pipeline().addLast(new FrontServerhandler());
}
});
ChannelFuture f = b.bind().sync();
f.channel().closeFuture().sync();
} finally {
g.shutdownGracefully().sync();
}
}
}
package front.server;
import front.client.Msg;
import front.client.Msg.usrinfo;
import
import
import
import
import
import
import
import
public class FrontServerhandler extends SimpleChannelInboundHandler<Msg.usrinfo>{
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, usrinfo msg) throws Exception {
Msg.authresponse.Builder res = Msg.authresponse.newBuilder();
//访问Redis进行权限校验并填充res
……………………
ctx.writeAndFlush(res.build())
System.out.println("receive: "+msg.toString());
}
}
Netty客户端
客户端的功能是访问前置,在ChannelActive的时候发送自己的用户信息,获取权限信息后尝试连接MQ。这里只给出与前置连接的部分代码。
当然,这里不是最终代码,仅给出一个框架而已。
package front.client;
import
import
import
import
import
import
import
import
import
import
import
import
import
import
public class FrontClient {
private final String host;
private final int port;
public FrontClient(){
//作为开发,配的是本机地址
this.host="127.0.0.1";
this.port=9621;
}
public void start() throws InterruptedException {
EventLoopGroup group = new NioEventLoopGroup();
try{
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.remoteAddress(new InetSocketAddress(host,port))
.handler(new ChannelInitializer<SocketChannel>(){
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// TODO Auto-generated method stub
ch.pipeline().addLast("frameDecoder", new ProtobufVarint32FrameDecoder());
ch.pipeline().addLast("protobufDecoder",new ProtobufDecoder(Msg.usrinfo.getDefaultInstance()));
ch.pipeline().addLast("frameEncoder",
new ProtobufVarint32LengthFieldPrepender());
ch.pipeline().addLast("protobufEncoder", new ProtobufEncoder());
ch.pipeline().addLast(new FrontClientHandler());
}
});
ChannelFuture f = b.connect().sync();
f.channel().closeFuture().sync();
}finally {
group.shutdownGracefully().sync();
}
}
public static void main(String[] args) throws InterruptedException{
new FrontClient().start();
}
}
package front.client;
import
import
import
import
import
import
public class FrontClientHandler extends SimpleChannelInboundHandler<Msg.authresponse> {
//channelActive的时候发送用户信息
public void channelActive(ChannelHandlerContext ctx) {
Msg.usrinfo.Builder usrinfo = Msg.usrinfo.newBuilder();
usrinfo.setPwd("1234");
usrinfo.setUsrname("client");
usrinfo.build();
ctx.writeAndFlush(usrinfo);
}
@Override
protected void channelRead0(ChannelHandlerContext arg0, Msg.authresponse in) throws Exception {
System.out.println(in.toString());
//找个单例对象存储校验结果后续再处理。
……………………
}
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}