服务器之家:专注于VPS、云服务器配置技术及软件下载分享
分类导航

PHP教程|ASP.NET教程|Java教程|ASP教程|编程技术|正则表达式|C/C++|IOS|C#|Swift|Android|VB|R语言|JavaScript|易语言|vb.net|

服务器之家 - 编程语言 - Java教程 - Netty分布式pipeline管道传播outBound事件源码解析

Netty分布式pipeline管道传播outBound事件源码解析

2022-10-25 15:30向南是个万人迷 Java教程

这篇文章主要介绍了Netty分布式pipeline管道传播outBound事件源码解析,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

了解了inbound事件的传播过程, 对于学习outbound事件传输的流程, 也不会太困难

outbound事件传输流程

在我们业务代码中, 有可能使用wirte方法往写数据:

?
1
2
3
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    ctx.channel().write("test data");
}

当然, 直接调用write方法是不能往对方channel中写入数据的, 因为这种方式只能写入到缓冲区, 还要调用flush方法才能将缓冲区数据刷到channel中, 或者直接调用writeAndFlush方法, 有关逻辑, 我们会在后面章节中详细讲解, 这里只是以wirte方法为例为了演示outbound事件的传播的流程

这里我们同样给出两种写法

?
1
2
3
4
5
6
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    //写法1
    ctx.channel().write("test data");
    //写法2
    ctx.write("test data");
}

这两种写法有什么区别, 我们首先跟到第一种写法中去:

?
1
ctx.channel().write("test data");

这里获取ctx所绑定的channel

我们跟到AbstractChannel的write方法中:

?
1
2
3
public ChannelFuture write(Object msg) {
    return pipeline.write(msg);
}

这里pipeline是DefaultChannelPipeline

跟到其write方法中:

?
1
2
3
4
public final ChannelFuture write(Object msg) {
    //从tail节点开始(从最后的节点往前写)
    return tail.write(msg);
}

这里调用tail节点write方法, 这里我们应该能分析到, outbound事件, 是通过tail节点开始往上传播的, 带着这点猜想, 我们继往下看

其实tail节点并没有重写write方法, 最终会调用其父类AbstractChannelHandlerContext的write方法

AbstractChannelHandlerContext的write方法:

?
1
2
3
public ChannelFuture write(Object msg) {
    return write(msg, newPromise());
}

我们看到这里有个newPromise()这个方法, 这里是创建一个Promise对象, 有关Promise的相关知识我们会在以后的章节剖析

我们继续跟write:

?
1
2
3
4
5
public ChannelFuture write(final Object msg, final ChannelPromise promise) {
    //代码省略
    write(msg, false, promise);
    return promise;
}

继续跟write:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
private void write(Object msg, boolean flush, ChannelPromise promise) {
    AbstractChannelHandlerContext next = findContextOutbound();
    final Object m = pipeline.touch(msg, next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        if (flush) {
            next.invokeWriteAndFlush(m, promise);
        } else {
            //没有调flush
            next.invokeWrite(m, promise);
        }
    } else {
        AbstractWriteTask task;
        if (flush) {
            task = WriteAndFlushTask.newInstance(next, m, promise);
        else {
            task = WriteTask.newInstance(next, m, promise);
        }
        safeExecute(executor, task, promise, m);
    }
}

这里跟我们上一小节剖析过channelRead方法有点类似, 但是事件传输的方向有所不同, 这里findContextOutbound()是获取上一个标注outbound事件的HandlerContext

跟到findContextOutbound中

?
1
2
3
4
5
6
7
private AbstractChannelHandlerContext findContextOutbound() {
    AbstractChannelHandlerContext ctx = this;
    do {
        ctx = ctx.prev;
    } while (!ctx.outbound);
    return ctx;
}

这里的逻辑我们似曾相识, 跟我们上一小节的findContextInbound()方法有点像, 只是过程是反过来的

在这里, 会找到当前context的上一个节点, 如果标注的事件不是outbound事件, 则继续往上找, 意思就是找到上一个标注outbound事件的节点

回到write方法:

?
1
AbstractChannelHandlerContext next = findContextOutbound();

这里将找到节点赋值到next属性中

因为我们之前分析的write事件是从tail节点传播的, 所以上一个节点就有可能是用户自定的handler所属的context

然后判断是否为当前eventLoop线程, 如果是不是, 则封装成task异步执行, 如果不是, 则继续判断是否调用了flush方法, 因为我们这里没有调用, 所以会执行到next.invokeWrite(m, promise),

我们继续跟invokeWrite

?
1
2
3
4
5
6
7
private void invokeWrite(Object msg, ChannelPromise promise) {
    if (invokeHandler()) {
        invokeWrite0(msg, promise);
    } else {
        write(msg, promise);
    }
}

这里会判断当前handler的状态是否是添加状态, 这里返回的是true, 将会走到invokeWrite0(msg, promise)这一步

继续跟invokeWrite0

?
1
2
3
4
5
6
7
8
private void invokeWrite0(Object msg, ChannelPromise promise) {
    try {
        //调用当前handler的wirte()方法
        ((ChannelOutboundHandler) handler()).write(this, msg, promise);
    } catch (Throwable t) {
        notifyOutboundHandlerException(t, promise);
    }
}

这里的逻辑也似曾相识, 调用了当前节点包装的handler的write方法, 如果用户没有重写write方法, 则会交给其父类处理

我们跟到ChannelOutboundHandlerAdapter的write方法中看:

?
1
2
3
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    ctx.write(msg, promise);
}

这里调用了当前ctx的write方法, 这种写法和我们小节开始的写法是相同的, 我们回顾一下:

?
1
2
3
4
5
6
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    //写法1
    ctx.channel().write("test data");
    //写法2
    ctx.write("test data");
}

我们跟到其write方法中, 这里走到的是AbstractChannelHandlerContext类的write方法:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
private void write(Object msg, boolean flush, ChannelPromise promise) {
    AbstractChannelHandlerContext next = findContextOutbound();
    final Object m = pipeline.touch(msg, next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        if (flush) {
            next.invokeWriteAndFlush(m, promise);
        } else {
            //没有调flush
            next.invokeWrite(m, promise);
        }
    } else {
        AbstractWriteTask task;
        if (flush) {
            task = WriteAndFlushTask.newInstance(next, m, promise);
        else {
            task = WriteTask.newInstance(next, m, promise);
        }
        safeExecute(executor, task, promise, m);
    }
}

又是我们所熟悉逻辑, 找到当前节点的上一个标注事件为outbound事件的节点, 继续执行invokeWrite方法, 根据之前的剖析, 我们知道最终会执行到上一个handler的write方法中

走到这里已经不难理解, ctx.channel().write("test data")其实是从tail节点开始传播写事件, 而ctx.write("test data")是从自身开始传播写事件

所以, 在handler中如果重写了write方法要传递write事件, 一定采用ctx.write("test data")这种方式或者交给其父类处理处理, 而不能采用ctx.channel().write("test data")这种方式, 因为会造成每次事件传输到这里都会从tail节点重新传输, 导致不可预知的错误

如果用代码中没有重写handler的write方法, 则事件会一直往上传输, 当传输完所有的outbound节点之后, 最后会走到head节点的wirte方法中

我们跟到HeadContext的write方法中

?
1
2
3
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    unsafe.write(msg, promise);
}

我们看到write事件最终会流向这里, 通过unsafe对象进行最终的写操作

有关inbound事件和outbound事件的传输, 可通过下图进行说明:

Netty分布式pipeline管道传播outBound事件源码解析

以上就是Netty分布式pipeline管道传播outBound事件源码解析的详细内容,更多关于Netty分布式pipeline管道传播outBound的资料请关注服务器之家其它相关文章!

原文链接:https://www.cnblogs.com/xiangnan6122/p/10204459.html

延伸 · 阅读

精彩推荐
  • Java教程解决springboot整合cxf-jaxrs中json转换的问题

    解决springboot整合cxf-jaxrs中json转换的问题

    这篇文章主要介绍了解决springboot整合cxf-jaxrs中json转换的问题,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教...

    未完成交响曲-KyleWang12272021-09-30
  • Java教程使用Spring boot 的profile功能实现多环境配置自动切换

    使用Spring boot 的profile功能实现多环境配置自动切换

    这篇文章主要介绍了使用Spring boot 的profile功能实现多环境配置自动切换的相关知识,非常不错,具有一定的参考借鉴价值 ,需要的朋友可以参考下...

    黑少5632021-06-17
  • Java教程SpringBoot解决ajax跨域问题的方法

    SpringBoot解决ajax跨域问题的方法

    这篇文章主要为大家详细介绍了SpringBoot解决ajax跨域问题的方法,具有一定的参考价值,感兴趣的小伙伴们可以参考一下...

    幕三少4512021-04-09
  • Java教程java遍历HashMap简单的方法

    java遍历HashMap简单的方法

    这篇文章主要介绍了java遍历HashMap简单的方法,以实例形式简单分析了采用java遍历HashMap的技巧,具有一定参考借鉴价值,需要的朋友可以参考下 ...

    Bo年再无木小白5212019-12-09
  • Java教程Java实现短信发送验证码功能

    Java实现短信发送验证码功能

    这篇文章主要介绍了Java实现短信发送验证码功能,本文通过实例代码给大家介绍的非常详细,需要的朋友可以参考下...

    程序员s10162021-06-06
  • Java教程Quartz集群原理以及配置应用的方法详解

    Quartz集群原理以及配置应用的方法详解

    Quartz是Java领域最著名的开源任务调度工具。Quartz提供了极为广泛的特性如持久化任务,集群和分布式任务等,下面这篇文章主要给大家介绍了关于Quartz集群...

    振宇要低调4362021-04-26
  • Java教程Java杂谈之重复代码是什么

    Java杂谈之重复代码是什么

    刚开始工作时,总有人开玩笑说,编程实际上就是 CV,调侃很多程序员写程序依靠的是复制粘贴。至今,很多初级甚至高级程序员写代码依旧是CV,就是把...

    JavaEdge.5892022-01-25
  • Java教程在Spring Boot中加载XML配置的完整步骤

    在Spring Boot中加载XML配置的完整步骤

    这篇文章主要给大家介绍了关于在Spring Boot中加载XML配置的完整步骤,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值...

    涛GuoGuo的跟屁虫丶博Ke6132020-09-03