服务器之家:专注于VPS、云服务器配置技术及软件下载分享
分类导航

PHP教程|ASP.NET教程|Java教程|ASP教程|编程技术|正则表达式|C/C++|IOS|C#|Swift|Android|VB|R语言|JavaScript|易语言|vb.net|

服务器之家 - 编程语言 - Java教程 - 最简最快了解RPC核心流程

最简最快了解RPC核心流程

2024-02-24 16:19不焦躁程序员 Java教程

本文主要以Dubbo为例介绍了RPC调用核心流程,同时,写了个简易的RPC调用代码。

本文主要以最简易最快速的方式介绍RPC调用核心流程,文中以Dubbo为例。同时,会写一个简易的RPC调用代码,方便理解和记忆核心组件和核心流程。

最简最快了解RPC核心流程

一、核心思想

RPC调用过程中,最粗矿的核心组件3个:Registry、Provider、Consumer。最粗矿的流程4个:注册、订阅、通知、调用。最简单的流程图就1个:

最简最快了解RPC核心流程

本文会继续细粒度地拆解以上流程,拆解之前,请牢记这段话:

RPC调用,不管中间流程多么复杂,不管代码多么复杂,所有的努力也只为做2件事情:

  • 在Consumer端,将ReferenceConfig配置的类转换成Proxy代理。
  • 在Provider端,将ServiceConfig配置的类转换成Proxy代理。

二、核心组件

为了能在Consumer端和Provider端生成各自的Proxy代理,并且发起调用和响应,需要如下核心组件:

(1) Registry:注册中心,主要是为了实现  Provider接口注册、Consumer订阅接口、接口变更通知、接口查找等功能。

(2) Proxy:服务代理,核心中的核心,一切的努力都是为了生成合适的Proxy服务代理。

  • Consumer的Proxy:Consumer端根据ReferenceConfig生成Proxy,此Proxy主要用于找到合适的Provider接口,然后发起网络调用。
  • Provider的Proxy:Provider端根据ServiceConfig生成Proxy,此Proxy主要作用是通过类似反射的方法调用本地代码,再将结果返回给Consumer。

(3) Protocol:服务协议,它相当于一个中间层,用于与注册中心打交道 和 封装 RPC 调用。它在初始化时会创建Client模块 与 服务端建立连接,也会生成用于远程调用的Invoker。

(4) Cluster:服务集群,主要用于路由、负载均衡、服务容错等。

(5) Invoker:服务调用者。

  • Consumer的服务调用者主要是利用Client模块发起远程调用,然后等待Provider返回结果。
  • Provider的服务调用者主要是根据接收到的消息利用反射生成本地代理,然后执行方法,再将结果返回到Consumer。

(6) Client:客户端模块,默认是Netty实现,主要用于客户端和服务端通讯(主要是服务调用),比如将请求的接口、参数、请求ID等封装起来发给Server端。

(7) Server:服务端模拟,默认是Netty实现。主要是用于客户端和服务端通讯。

三、核心流程

1.Consumer流程

(1) 流程

Consumer的流程实际上就是一个从ReferenceConfig 生成Proxy代理的过程。核心事情由Protocol完成。

  • 根据ReferenceConfig生成代理
  • 注册到注册中心、订阅注册中心事件
  • 建立NettyClient,并且与NettyServer建立连接
  • 生成客户端的ClientInvoker
  • 选择负载均衡和集群容错
  • ClientInvoker发起网络调用和等待结果

(2) 流程图:

最简最快了解RPC核心流程

2.Provider流程

(1) 流程

Provider的流程实际上就是个从ServiceConfig生成Proxy代理的过程。核心事情由PorxyProtocol完成。

  • 根据ServiceConfig生成本地代理
  • 注册到注册中心
  • 启动NettyServer等待客户端连接
  • 生成服务端Invoker
  • Invoker监听调用请求
  • 接收到请求后新建任务丢入到线程池去执行
  • 执行时会生成本地代理执行(比如通过反射去调用具体的方法),再将返回结果写出去

(2) 流程图:

最简最快了解RPC核心流程

3.整体流程图

最简最快了解RPC核心流程

四、简易代码实现

1.核心代码介绍

(1) 客户端Proxy:

/**
 * 获取代理Service
 */
@SuppressWarnings("unchecked")
public <T> T getService(Class clazz) throws Exception {

    return (T) Proxy.newProxyInstance(getClass().getClassLoader(), new Class[]{clazz}, new InvocationHandler() {
        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            String methodName = method.getName();

            if ("equals".equals(methodName) || "hashCode".equals(methodName)) {
                throw new IllegalAccessException("不能访问" + methodName + "方法");
            }
            if ("toString".equals(methodName)) {
                return clazz.getName() + "#" + methodName;
            }

            List<RegistryInfo> registryInfoList = interfaceMethodsRegistryInfoMap.get(clazz);
            if (registryInfoList == null) {
                throw new RuntimeException("无法找到对应的服务提供者");
            }

            LoadBalancer loadBalancer = new RandomLoadBalancer();
            RegistryInfo registryInfo = loadBalancer.choose(registryInfoList);

            ChannelHandlerContext ctx = registryChannelMap.get(registryInfo);

            String identity = InvokeUtils.buildInterfaceMethodIdentify(clazz, method);
            String requestId;

            synchronized (ProxyProtocol.this) {
                requestIdWorker.increment();
                requestId = String.valueOf(requestIdWorker.longValue());
            }

            ClientInvoker clientInvoker = new DefaultClientInvoker(method.getReturnType(), ctx, requestId, identity);

            inProcessInvokerMap.put(identity + "#" + requestId, clientInvoker);

            return clientInvoker.invoke(args);
        }
    });
}

(2) 服务端Proxy

private class RpcInvokerTask implements Runnable {
    private RpcRequest rpcRequest;

    public RpcInvokerTask(RpcRequest rpcRequest) {
        this.rpcRequest = rpcRequest;
    }

    @Override
    public void run() {
        try {
            ChannelHandlerContext ctx = rpcRequest.getCtx();
            String interfaceIdentity = rpcRequest.getInterfaceIdentity();
            String requestId = rpcRequest.getRequestId();
            Map<String, Object> parameterMap = rpcRequest.getParameterMap();

            //interfaceIdentity组成:接口类+方法+参数类型
            Map<String, String> interfaceIdentityMap = string2Map(interfaceIdentity);

            //拿出是哪个类
            String interfaceName = interfaceIdentityMap.get("interface");
            Class interfaceClass = Class.forName(interfaceName);
            Object o = interfaceInstanceMap.get(interfaceClass);

            //拿出是哪个方法
            Method method = interfaceMethodMap.get(interfaceIdentity);

            //反射执行
            Object result = null;
            String parameterStr = interfaceIdentityMap.get("parameter");
            if (parameterStr != null && parameterStr.length() > 0) {
                String[] parameterTypeClasses = parameterStr.split(",");//接口方法参数参数可能有多个,用,号隔开
                Object[] parameterInstance = new Object[parameterTypeClasses.length];
                for (int i = 0; i < parameterTypeClasses.length; i++) {
                    parameterInstance[i] = parameterMap.get(parameterTypeClasses[i]);
                }
                result = method.invoke(o, parameterInstance);
            } else {
                result = method.invoke(o);
            }

            //将结果封装成rcpResponse
            RpcResponse rpcResponse = RpcResponse.create(JSONObject.toJSONString(result), interfaceIdentity, requestId);

            //ctx返回执行结果
            String resultStr = JSONObject.toJSONString(rpcResponse) + DELIMITER_STR;

            ByteBuf byteBuf = Unpooled.copiedBuffer(resultStr.getBytes());
            ctx.writeAndFlush(byteBuf);

            System.out.println("响应给客户端:" + resultStr);

        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}

(3) Protocol

public ProxyProtocol(String registryUrl, List<ServiceConfig> serviceConfigList, List<ReferenceConfig> referenceConfigList, int port) throws Exception {
    this.serviceConfigList = serviceConfigList == null ? new ArrayList<>() : serviceConfigList;
    this.registryUrl = registryUrl;
    this.port = port;
    this.referenceConfigList = referenceConfigList == null ? new ArrayList<>() : referenceConfigList;

    //1、初始化注册中心
    initRegistry(this.registryUrl);

    //2、将服务注册到注册中心
    InetAddress addr = InetAddress.getLocalHost();
    String hostName = addr.getHostName();
    String hostAddr = addr.getHostAddress();
    registryInfo = new RegistryInfo(hostName, hostAddr, this.port);
    doRegistry(registryInfo);

    //3、初始化nettyServer,启动nettyServer
    if (!this.serviceConfigList.isEmpty()) {
        nettyServer = new NettyServer(this.serviceConfigList, this.interfaceMethodMap);
        nettyServer.init(this.port);
    }

    //如果是客户端引用启动,则初始化处理线程
    if (!this.referenceConfigList.isEmpty()) {
        initProcessor();
    }
}

(4) 客户端Invoker

@Override
public T invoke(Object[] args) {
    JSONObject jsonObject = new JSONObject();
    jsonObject.put("interfaces", identity);

    JSONObject param = new JSONObject();
    if (args != null) {
        for (Object obj : args) {
            param.put(obj.getClass().getName(), obj);
        }
    }
    jsonObject.put("parameter", param);
    jsonObject.put("requestId", requestId);
    String msg = jsonObject.toJSONString() + Constants.DELIMITER_STR;
    System.out.println("发送给服务端JSON为:" + msg);

    ByteBuf byteBuf = Unpooled.copiedBuffer(msg.getBytes());
    ctx.writeAndFlush(byteBuf);

    wait4Result();

    return result;
}

private void wait4Result() {
    synchronized (this) {
        try {
            wait();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

@Override
public void setResult(String result) {
    synchronized (this) {
        this.result = (T) JSONObject.parseObject(result, returnType);
        notifyAll();
    }
}

(5) 服务端Invoker

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    String message = (String) msg;
    System.out.println("提供者收到消息:" + message);
    //解析消费者发来的消息
    RpcRequest rpcRequest = RpcRequest.parse(message, ctx);
    //接受到消息,启动线程池处理消费者发过来的请求
    threadPoolExecutor.execute(new RpcInvokerTask(rpcRequest));
}

/**
 * 处理消费者发过来的请求
 */
private class RpcInvokerTask implements Runnable {
    private RpcRequest rpcRequest;

    public RpcInvokerTask(RpcRequest rpcRequest) {
        this.rpcRequest = rpcRequest;
    }

    @Override
    public void run() {
        try {
            ChannelHandlerContext ctx = rpcRequest.getCtx();
            String interfaceIdentity = rpcRequest.getInterfaceIdentity();
            String requestId = rpcRequest.getRequestId();
            Map<String, Object> parameterMap = rpcRequest.getParameterMap();

            //interfaceIdentity组成:接口类+方法+参数类型
            Map<String, String> interfaceIdentityMap = string2Map(interfaceIdentity);

            //拿出是哪个类
            String interfaceName = interfaceIdentityMap.get("interface");
            Class interfaceClass = Class.forName(interfaceName);
            Object o = interfaceInstanceMap.get(interfaceClass);

            //拿出是哪个方法
            Method method = interfaceMethodMap.get(interfaceIdentity);

            //反射执行
            Object result = null;
            String parameterStr = interfaceIdentityMap.get("parameter");
            if (parameterStr != null && parameterStr.length() > 0) {
                String[] parameterTypeClasses = parameterStr.split(",");//接口方法参数参数可能有多个,用,号隔开
                Object[] parameterInstance = new Object[parameterTypeClasses.length];
                for (int i = 0; i < parameterTypeClasses.length; i++) {
                    parameterInstance[i] = parameterMap.get(parameterTypeClasses[i]);
                }
                result = method.invoke(o, parameterInstance);
            } else {
                result = method.invoke(o);
            }

            //将结果封装成rcpResponse
            RpcResponse rpcResponse = RpcResponse.create(JSONObject.toJSONString(result), interfaceIdentity, requestId);

            //ctx返回执行结果
            String resultStr = JSONObject.toJSONString(rpcResponse) + DELIMITER_STR;

            ByteBuf byteBuf = Unpooled.copiedBuffer(resultStr.getBytes());
            ctx.writeAndFlush(byteBuf);

            System.out.println("响应给客户端:" + resultStr);

        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}

(6) Client

EventLoopGroup group = new NioEventLoopGroup();
try {
    Bootstrap bootstrap = new Bootstrap();
    bootstrap.group(group)
            .channel(NioSocketChannel.class)
            .option(ChannelOption.TCP_NODELAY, true)
            .handler(new ChannelInitializer() {
                @Override
                protected void initChannel(Channel ch) throws Exception {
                    ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024 * 1024, Constants.DELIMITER));
                    ch.pipeline().addLast(new StringDecoder());
                    ch.pipeline().addLast(new NettyClientHandler());

                    System.out.println("initChannel - " + Thread.currentThread().getName());
                }
            });
    ChannelFuture cf = bootstrap.connect(ip, port).sync();
//            cf.channel().closeFuture().sync();
    System.out.println("客户端启动成功");
} catch (Exception e) {
    e.printStackTrace();
    group.shutdownGracefully();
}

(7) Server

public NettyServer(List<ServiceConfig> serviceConfigList, Map<String, Method> interfaceMethodMap) {
    this.serviceConfigList = serviceConfigList;
    this.interfaceMethodMap = interfaceMethodMap;
}

public int init(int port) throws InterruptedException {
    EventLoopGroup bossGroup = new NioEventLoopGroup();
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    ServerBootstrap bootstrap = new ServerBootstrap();
    bootstrap.group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .option(ChannelOption.SO_BACKLOG, 1024)
            .childHandler(new ChannelInitializer() {
                @Override
                protected void initChannel(Channel channel) throws Exception {
                    channel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024 * 1024, DELIMITER));
                    channel.pipeline().addLast(new StringDecoder());
                    channel.pipeline().addLast(new RpcInvokeHandler(serviceConfigList, interfaceMethodMap));
                }
            });
    ChannelFuture cf = bootstrap.bind(port).sync();
    System.out.println("启动NettyServer,端口为:" + port);
    return port;
}

2.项目地址

https://github.com/yclxiao/rpc-demo.git

五、总结

本文主要以Dubbo为例介绍了RPC调用核心流程,同时,写了个简易的RPC调用代码。

记住以上的流程图即可搞明白整个调用流程。然后再记住最核心的2句话:

所有的努力都是为了能在Consumer端和Provider端生成功能丰富的Proxy。核心事情由Protocol完成。

核心的5个部件:Registry、Proxy、Protocol、Invoker、Client、Server。

原文地址:https://mp.weixin.qq.com/s?__biz=MzI3OTA2MDQyOQ==&mid=2247485402&idx=1&sn=1671379186775dc4e04f332f362969ca

延伸 · 阅读

精彩推荐
  • Java教程Struts2的配置 struts.xml Action详解

    Struts2的配置 struts.xml Action详解

    这篇文章主要介绍了Struts2的配置 struts.xml Action详解,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧...

    这昵称不错4362021-01-17
  • Java教程java图片压缩工具类

    java图片压缩工具类

    这篇文章主要为大家详细介绍了java图片压缩工具类,具有一定的参考价值,感兴趣的小伙伴们可以参考一下...

    Joker_Ye2622020-08-02
  • Java教程java实现图片裁切的工具类实例

    java实现图片裁切的工具类实例

    这篇文章主要介绍了java实现图片裁切的工具类实例,涉及Java针对图片的读取、修改等相关操作技巧,具有一定参考借鉴价值,需要的朋友可以参考下 ...

    5iasp4332020-01-15
  • Java教程SpringCloud Hystrix的使用

    SpringCloud Hystrix的使用

    这篇文章主要介绍了SpringCloud Hystrix的使用,帮助大家更好的理解和学习使用SpringCloud,感兴趣的朋友可以了解下...

    遇见_line10792021-09-08
  • Java教程spring boot 的常用注解使用小结

    spring boot 的常用注解使用小结

    这篇文章主要介绍了spring boot 的常用注解使用小结,需要的朋友可以参考下...

    Java之家4272020-09-25
  • Java教程Java中调用SQL Server存储过程详解

    Java中调用SQL Server存储过程详解

    这篇文章主要介绍了Java中调用SQL Server存储过程详解,本文讲解了使用不带参数的存储过程、使用带有输入参数的存储过程、使用带有输出参数的存储过程、...

    junjie1972019-12-08
  • Java教程spring boot集成shiro详细教程(小结)

    spring boot集成shiro详细教程(小结)

    这篇文章主要介绍了spring boot 集成shiro详细教程,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧...

    bweird6872021-03-15
  • Java教程Java springboot接口迅速上手,带你半小时极速入门

    Java springboot接口迅速上手,带你半小时极速入门

    这篇文章主要给大家介绍了关于SpringBoot实现API接口的相关资料,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要...

    fjswcjswzy5962021-12-23