热门搜索 :
考研考公
您的当前位置:首页正文

设计一个消息中间件的前置(二)| Netty 4 + prot

来源:东饰资讯网

我也是在做完前置设计后才开始学习Netty框架的,不得不说,相比直接使用Socket,Netty真是封装得太到位了。另外强烈推荐《Netty实战》这本书,花了两天时间读完,基本就可以对Netty的架构有个直观的了解。后续看时间再去看看Netty的源码吧。

选择Protobuf主要是因为客户端与服务端之间的传输需要进行编解码,而Protobuf可以实现极高效的序列化和反序列化,且Netty还直接支持Protobuf。让我觉得不用都不好意思了。

前置最优先的功能其实就是用户校验和返回路由信息及校验结果。由于配套使用了Redis,等后续的文章我再更新我的数据结构设计。这篇先专注在Netty和Protobuf吧。

我想这系列的文章也只会描述基本功能,介绍个框架吧。

Protobuf数据格式

如果在一个用户粒度分的极细的场景下,我们可以直接通过用户+密码的方式来得到用户的权限信息,比如该用户应该连接到哪个MQ集群,是否用户名密码匹配等等。所以对客户端来说,需要告知前置的就是用户名、密码这两个信息。而服务端需要返回的是校验码,校验信息和MQ集群地址,也是三个String就能解决的事。数据很好设计:

syntax = "proto3";
message usrinfo {
    string usrname = 1;
    string pwd = 2;
}
message authresponse {
    string retcode = 1;
    string address = 2;
}

然后需要做的就是

  1. 编译proto文件,生成java文件
  2. 把java文件放到你的工程里
  3. 在工程中引入protobuf-java-3.5.1.jar依赖包
    用maven会比较方便,我这里eclipse一直有问题,就下了protobuf的包自己编译出protobuf-java-3.5.1.jar。

Netty服务端

在Netty框架下,主要需要设计的就是ChannalInboundHandler。服务端用于在获取客户端的数据后,访问Redis得到结果并返回给客户端。直接上代码吧:

package front.server;

import 

import front.client.Msg;
import 
import 
import 
import 
import 
import 
import 
import 
import 
import 
import 

public class FrontServer {
    private final int port;
    public FrontServer(int port) {
        this.port=port;
    }
    public static void main(String[] args) throws InterruptedException{
        int port = 9621;
        new FrontServer(port).start();
    }

    public void start() throws InterruptedException{
        EventLoopGroup g = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(g).channel(NioServerSocketChannel.class)
            .localAddress(new InetSocketAddress(port))
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    // TODO Auto-generated method stub
                    ch.pipeline().addLast("frameDecoder", new ProtobufVarint32FrameDecoder());// 用于decode前解决半包和粘包问题(利用包头中的包含数组长度来识别半包粘包)  
                    //配置Protobuf解码处理器,消息接收到了就会自动解码,ProtobufDecoder是netty自带的,Message是自己定义的Protobuf类  
                    ch.pipeline().addLast("protobufDecoder",new ProtobufDecoder(Msg.usrinfo.getDefaultInstance()));  
                    // 用于在序列化的字节数组前加上一个简单的包头,只包含序列化的字节长度。  
                    ch.pipeline().addLast("frameEncoder",  
                            new ProtobufVarint32LengthFieldPrepender());  
                    //配置Protobuf编码器,发送的消息会先经过编码  
                    ch.pipeline().addLast("protobufEncoder", new ProtobufEncoder());  
                    // ----Protobuf处理器END----  
                    ch.pipeline().addLast(new FrontServerhandler());
                }
            });
            ChannelFuture f = b.bind().sync();
            f.channel().closeFuture().sync();
        } finally {
            g.shutdownGracefully().sync();
        }
    }
}
package front.server;

import front.client.Msg;
import front.client.Msg.usrinfo;
import 
import 
import 
import 
import 
import 
import 
import 

public class FrontServerhandler extends SimpleChannelInboundHandler<Msg.usrinfo>{
    
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, usrinfo msg) throws Exception {
    
        Msg.authresponse.Builder res = Msg.authresponse.newBuilder();
        //访问Redis进行权限校验并填充res
        ……………………
        ctx.writeAndFlush(res.build())
    System.out.println("receive: "+msg.toString());

    }
}

Netty客户端

客户端的功能是访问前置,在ChannelActive的时候发送自己的用户信息,获取权限信息后尝试连接MQ。这里只给出与前置连接的部分代码。
当然,这里不是最终代码,仅给出一个框架而已。

package front.client;

import 

import 
import 
import 
import 
import 
import 
import 
import 
import 
import 
import 
import 
import 

public class FrontClient {
    private final String host;
    private final int port;
    public FrontClient(){
        //作为开发,配的是本机地址
        this.host="127.0.0.1";
        this.port=9621;
    }
    public void start() throws InterruptedException {
        EventLoopGroup group = new NioEventLoopGroup();
        try{
            Bootstrap b = new Bootstrap();
            b.group(group)
            .channel(NioSocketChannel.class)
            .remoteAddress(new InetSocketAddress(host,port))
            .handler(new ChannelInitializer<SocketChannel>(){

                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    // TODO Auto-generated method stub
                    ch.pipeline().addLast("frameDecoder", new ProtobufVarint32FrameDecoder());
                    ch.pipeline().addLast("protobufDecoder",new ProtobufDecoder(Msg.usrinfo.getDefaultInstance())); 
                    ch.pipeline().addLast("frameEncoder",  
                            new ProtobufVarint32LengthFieldPrepender());  
                    ch.pipeline().addLast("protobufEncoder", new ProtobufEncoder());  
                    ch.pipeline().addLast(new FrontClientHandler());
                }   
            });
            ChannelFuture f = b.connect().sync();
            f.channel().closeFuture().sync();
        }finally {
            group.shutdownGracefully().sync();
        }
    }
    public static void main(String[] args) throws InterruptedException{
        new FrontClient().start();
    }
}
package front.client;

import 
import 
import 
import 
import 
import 
public class FrontClientHandler extends SimpleChannelInboundHandler<Msg.authresponse> {
     //channelActive的时候发送用户信息
    public void channelActive(ChannelHandlerContext ctx) {
        Msg.usrinfo.Builder usrinfo = Msg.usrinfo.newBuilder();
        usrinfo.setPwd("1234");
        usrinfo.setUsrname("client");
        usrinfo.build();
        ctx.writeAndFlush(usrinfo);
    }
    @Override
    protected void channelRead0(ChannelHandlerContext arg0, Msg.authresponse in) throws Exception {
        System.out.println(in.toString());
        //找个单例对象存储校验结果后续再处理。
        ……………………  
    }
    
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}
Top