服务器之家:专注于VPS、云服务器配置技术及软件下载分享
分类导航

PHP教程|ASP.NET教程|Java教程|ASP教程|编程技术|正则表达式|C/C++|IOS|C#|Swift|Android|VB|R语言|JavaScript|易语言|vb.net|

服务器之家 - 编程语言 - Java教程 - Netty分布式抽象编码器MessageToByteEncoder逻辑分析

Netty分布式抽象编码器MessageToByteEncoder逻辑分析

2022-10-27 12:15向南是个万人迷 Java教程

这篇文章主要介绍了Netty分布式抽象编码器MessageToByteEncoder的抽象逻辑分析,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

前文回顾:Netty分布式编码器及写数据事件处理

MessageToByteEncoder

同解码器一样, 编码器中也有一个抽象类叫MessageToByteEncoder, 其中定义了编码器的骨架方法, 具体编码逻辑交给子类实现

解码器同样也是个handler, 将写出的数据进行截取处理, 我们在学习pipeline中我们知道, 写数据的时候会传递write事件, 传递过程中会调用handler的write方法, 所以编码器码器可以重写write方法, 将数据编码成二进制字节流然后再继续传递write事件

首先看MessageToByteEncoder的类声明

?
1
2
3
public abstract class MessageToByteEncoder<I> extends ChannelOutboundHandlerAdapter{
    //省略类体
}

这里继承ChannelOutboundHandlerAdapter, 说明是个outBoundhandler, 我们知道write事件是个outBound事件, 而outBound事件只能通过outBoundHandler进行传输

write事件传播过程中要调用handler的write方法

我们跟到MessageToByteEncoder的write方法中:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    ByteBuf buf = null;
    try {
        if (acceptOutboundMessage(msg)) {
            @SuppressWarnings("unchecked")
            I cast = (I) msg;
            buf = allocateBuffer(ctx, cast, preferDirect);
            try {
                encode(ctx, cast, buf);
            } finally {
                ReferenceCountUtil.release(cast);
            }
 
            if (buf.isReadable()) {
                ctx.write(buf, promise);
            } else {
                buf.release();
                ctx.write(Unpooled.EMPTY_BUFFER, promise);
            }
            buf = null;
        } else {
            ctx.write(msg, promise);
        }
    } catch (EncoderException e) {
        throw e;
    } catch (Throwable e) {
        throw new EncoderException(e);
    } finally {
        if (buf != null) {
            buf.release();
        }
    }
}

首先通过 if (acceptOutboundMessage(msg)) 判断当前对象是否可处理

如果可处理, 则进入if块中的逻辑, 如果不能处理, 则进入else块, 通过ctx.write(msg, promise)继续传递write事件

我们看if块中

 I cast = (I) msg 这里是强制类型转换, 转换成I类型, I类型是个泛型, 具体类型由用户定义

 buf = allocateBuffer(ctx, cast, preferDirect) 这里进行缓冲区分配

跟到allocateBuffer方法中

?
1
2
3
4
5
6
7
8
protected ByteBuf allocateBuffer(ChannelHandlerContext ctx, @SuppressWarnings("unused") I msg,
                           boolean preferDirect) throws Exception {
    if (preferDirect) {
        return ctx.alloc().ioBuffer();
    } else {
        return ctx.alloc().heapBuffer();
    }
}

这里会直接通过ctx的内存分配器进行内存分配, 通过判断preferDirect来分配堆内存或者堆外内存, 默认情况下是分配堆外内存

有关内存分配, 我们之前已经做过相关的剖析

回到write方法中:

内存分配结束之后会调用encode(ctx, cast, buf)方法进行编码, 该类由子类实现

子类可以通过继承该类, 重写encode方法, 将参数对象cast编码成字节写入到传入的ByteBuf中, 就完成了编码工作

编码完成后后, 会通过ReferenceCountUtil.release(cast)将cast对象释放

 if (buf.isReadable()) 这里判断buf是否有可读字节, 如果有可读字节, 则继续传递write事件

如果没有可读字节, 则将buf进行释放, 继续传播write事件, 传递一个空的ByteBuf

最后将buf设置为空

以上就是有关抽象编码器的抽象逻辑, 具体的编码逻辑还需要其子类去做,更多关于Netty分布式抽象编码器MessageToByteEncoder的资料请关注服务器之家其它相关文章!

原文链接:https://www.cnblogs.com/xiangnan6122/p/10208131.html

延伸 · 阅读

精彩推荐