`
wen866595
  • 浏览: 264594 次
  • 性别: Icon_minigender_1
  • 来自: 广州
社区版块
存档分类
最新评论

MINA 协议解码过滤器

阅读更多

 

为什么使用协议解码过滤器 ProtocolCodecFilter

1)  TCP保证所有的包以正确是顺序传递,但不保证发送方的一次写操作在接收方产生一次读操作。在 MINA 的术语中:没有 ProtocolCodecFilter ,发送方的一次 IoSession.write(Object message) 导致接收方多次 messageReceived(IoSession session, Object message) 事件,多次调用 IoSession.write(Object message) 可以导致单一的 messageReceived 事件。当服务器和客户端在同一台主机(或同一局域网)时,你可能不会遇到这种行为。但你的应用程序可能会遇到。

2)  大多数应用程序需要一种方法来找出当前消息的结束位置和下一个消息的开始位置。

你可以在IoHandler 里实现这些逻辑,但添加一个 ProtocolCodecFilter 可以使你的代码更加清晰和易于维护。它允许你分离协议逻辑和业务逻辑( IoHandler )。

 

 

应用程序基本上是接收一串字节,需要转换这些字节到消息(高层对象)。

有三种常用的技术来分割字节流到消息:
1、使用定长消息
2、使用固定长度的头来指示内容体长度
3、使用分隔符。

 

HTTP 协议里就使用了 2 3 两种方式。 HTTP 头和内容体之间是以单独一行的 CRLF 来分割的,在请求头里使用 Content-length 来指示内容体的长度。

 

 

 

这里使用分割符(换行)来编写时间服务器的解码器。

时间服务器的请求对象为java.lang.String,对应的协议解码器为:

org.apache.mina.filter.codec.textline.TextLineEncoder和org.apache.mina.filter.codec.textline.TextLineDecoder。

 

响应对象定义如下:

public class TimeResponse {
    private String client;      // 客户端发送到字符串
    private Date date;

    public TimeResponse(String client, Date date) {
        this.client = client;
        this.date = date;
    }

    public String getClient() {
        return client;
    }

    public Date getDate() {
        return date;
    }
}

 

 

下面是响应的编码器:

public class TimeResponseEncoder extends ProtocolEncoderAdapter {

    @Override
    public void encode(IoSession session, Object message,
            ProtocolEncoderOutput out) throws Exception {
       
        TimeResponse res = (TimeResponse) message;
       
        IoBuffer buff = IoBuffer.allocate(32);
        buff.setAutoExpand(true);
        buff.setAutoShrink(true);
       
       
        CharsetEncoder encoder = Charset.forName("UTF-8").newEncoder();
        buff.putString(res.getClient() + " 您好!服务器时间是:", encoder);
       
        Date date = res.getDate();
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy年MM月dd,HH时mm分ss秒");
       
        buff.putString(sdf.format(date), encoder);
        buff.put((byte)'\n');
       
        buff.flip();
       
        out.write(buff);
        out.flush();
       
        System.err.println("res encode end .... ");
    }
}

 

 

编写编码器的注意事项:

1 MINA IoSession 写队列里的每个对象调用 ProtocolEncode.encode 方法。因为业务处理器里写出的都是与编码器对应高层对象,所以可以直接进行类型转换。

2、从 JVM 堆分配 IoBuffer 。最好避免使用直接缓存,因为堆缓存一般有更好的性能。

3、开发人员不需要释放缓存, MINA 会释放。

4、在 dispose 方法里可以释放编码所需的资源。

 

 

然后是响应的解码器:

public class TimeResponseDecoder extends CumulativeProtocolDecoder {

    @Override
    protected boolean doDecode(IoSession session, IoBuffer in,
            ProtocolDecoderOutput out) throws Exception {
       
        CharsetDecoder decoder = Charset.forName("UTF-8").newDecoder();

        int start = in.position();

        int end = in.remaining() - 1;
       
        if (end >= 0 && in.get(end) == (byte) '\n') {
           
            String res = in.getString(decoder);
            out.write(res);
           
            return true;
           
        } else {
            in.position(start);
        }

        return false;
    }
}

 

注意事项:

1、 每当一个完整的消息被解码时,应当写到ProtocolDecoderOutput ,这些消息将传输通过过滤器链,最终到达你指定的 IoHandler.messageReceived 方法。

2、 你不负责释放缓存。

3、 当没有足够的数据来解码消息时,只需要返回false

4、 我们可以把解码过程中的状态信息存储在会话的属性里,也可以存储到Decoder 对象,但有很多不利:

a)  每个会话将需要自己的Decorder 对象。

b)  MINA保证同一时间不会有多于一个线程在同一个会话上执行 decode 方法,但不保证由同一线程来执行。假设一块数据由线程 1 处理,它认为还不能解码,当另一块数据到达时,它可能由另一个线程处理。为了避免可视性问题,你必须恰当地同步访问 decoder 的状态(会话的属性是存储在 ConcurrentHashMap ,所以它们可以在其他线程里自动可视)。

c)  在邮件列表上的讨论得出这个结论:选择把状态存储到会话还是Decoder 实例更多是爱好的问题。为了确保没有两个线程在同一个会话上运行 decode 方法, MINA 需要使用一些同步,这些同步也确保你不会有上面描述的可视性问题。

5、 当你的协议使用前缀长度时,IoBuffer.prefixedDataAvailable 方法是非常便利的方法,它支持 1 2 4 个字节。

6、 当解码了一个响应后,不要忘了重置解码器的状态(移除会话的属性是另一个方法)。

 

 

有了请求和响应的编解码器后,定义一个解码器工厂:

public class TimeResponseCodecFactory implements ProtocolCodecFactory {
    private ProtocolDecoder decoder;
    private ProtocolEncoder encoder;
   
    public TimeResponseCodecFactory(boolean server) {
        if(server) {
            decoder = new TextLineDecoder(Charset.forName("UTF-8"));
            encoder = new TimeResponseEncoder();
        } else {
            decoder = new TimeResponseDecoder();
            encoder = new TextLineEncoder(Charset.forName("UTF-8"));
        }
    }

    @Override
    public ProtocolDecoder getDecoder(IoSession session) throws Exception {
        return decoder;
    }

    @Override
    public ProtocolEncoder getEncoder(IoSession session) throws Exception {
        return encoder;
    }

}

 

 

注意事项:

1、 对于每一个新的会话,MINA 调用 TimeResponseCodecFactory 的方法获取编码器和解码器。

2、 因为上面的编码器和解码器不存储会话的状态,所以可以让所有的会话共用同一个实例。

3、服务器端的解码器是请求解码器,编码器是响应编码器;客户端的解码器是响应解码器,编码器是请求编码器。

 

 

修改服务器和客户端的过滤器配置

connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new TimeResponseCodecFactory(true/false)));

 

 

服务器端的处理器现在可以直接写响应对象了:

public class TimeServerHandler extends IoHandlerAdapter {
    private Logger log = LoggerFactory.getLogger(TimeServerHandler.class);

    @Override
    public void messageReceived(IoSession session, Object message)
            throws Exception {
       
        String str = message.toString();
       
        log.info("server receive :" + str);
       
        if("quit".equalsIgnoreCase(str.trim())) {
            session.close(true);
            return;
        }
       
        Date date = new Date();
       
        TimeResponse res = new TimeResponse(str, date);
       
        session.write(res);
       
        log.info("message writter ...");
    }
}

 

 

 

而客户端的处理器也可以直接处理响应对象:

public class ClientHandler extends IoHandlerAdapter {

    @Override
    public void messageReceived(IoSession session, Object message)
            throws Exception {
       
        String msg = (String) message;
       
        System.out.println("response :" + msg);
    }
}

 

 

有了协议解码过滤器,处理器就不需要考虑底层的编解码的问题,只需要处理高层的业务对象,达到了分层的目的。

 

 

 

欢迎关注我的微信公众号: coderbee笔记

 

1
3
分享到:
评论
3 楼 zuoguodang 2011-11-01  
public void decode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out)
      throws Exception {
    // 读取长度
    int limit = in.limit();
    Integer left = (Integer) session.getAttribute("left");
    if (left == null || left.intValue() == 0) {
      byte[] lenbyte = new byte[4];
      in.get(lenbyte);
      int size = (int) ByteConvert.bytesToLong(lenbyte, 0, 0);
      if(size>in.limit()){
        byte[] bleft = new byte[limit-4];
        in.get(bleft);
        byte[] allleft = new byte[4+bleft.length];
        System.arraycopy(lenbyte, 0, allleft, 0, lenbyte.length);
        System.arraycopy(bleft, 0, allleft, lenbyte.length, bleft.length);
        session.setAttribute("left", allleft.length);
        session.setAttribute("data", allleft);
        return;
      }
      byte[] content = new byte[size - 4];
      in.get(content);
      dealData(content, size);
      int ileft = limit - size;

我一直都是直接操作buffer
2 楼 scholers 2011-08-21  
5、 当你的协议使用前缀长度时,IoBuffer.prefixedDataAvailable 方法是非常便利的方法,它支持 1 、 2 或 4 个字节。


--------------------------------------
如果是自定义长度的话,不是1,2,4定长,那么需要自己实现这个判断的接口.这点不是很自由.
1 楼 xuedong 2011-08-20  
学习中,转了

相关推荐

Global site tag (gtag.js) - Google Analytics