脚本之家,脚本语言编程技术及教程分享平台!
分类导航

Python|VBS|Ruby|Lua|perl|VBA|Golang|PowerShell|Erlang|autoit|Dos|bat|shell|

服务器之家 - 脚本之家 - Python - python grpc实现异步调用(不用grpc异步接口)

python grpc实现异步调用(不用grpc异步接口)

2024-04-18 14:35Hi20240217 Python

grpc同步调用更简单,但是在处理复杂任务时,会导致请求阻塞,影响吞吐,本文主要介绍了python grpc实现异步调用,不用grpc异步接口,具有一定的参考价值,感兴趣的可以了解一下

grpc同步调用更简单,但是在处理复杂任务时,会导致请求阻塞,影响吞吐。当然,可以采用grpc异步接口解决,本方采用另一种方式:服务端收到请求后放入请求队列,由独立的线程处理各个请求,之后调用客户端的服务,回复处理结果。即客户端也是服务端。

以下DEMO实现的功能:

  • 客户端与服务端之间通过mmap tmpfs文件,实现图像的传输
  • 推理服务有Request和Response二个接口
  • 服务端实现Request接口,客户端实现Response接口,这二个接口只用于发送消息
  • 服务端的消息处理线程处理完客户端的请求之后,调用客户端的Response接口

1.infer_session.proto

syntax = "proto3";
service Inference {
  rpc Request  (InferMessage) returns (Status) {} //服务端实现
  rpc Response (InferMessage) returns (Status) {} //客户端实现
}
message InferMessage {
  int32 frame_id = 1;    //帧号
  int32 client_port=2;   //客户端端口
  int32 shm_id=3;        //共享内存块id
  int32 width=4;         //图像宽度
  int32 height=5;        //图像高度
  int32 channels=6;      //图像通道数
  string session_id=7;   //会话uuid
}
message Status {
  int32 status = 1;         //状态码
  string error_message=2;   //错误信息
}

2.生成Python库函数

python3 -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. ./infer_session.proto

3.infer_session_server.py

from concurrent import futures
import logging
import threading
import grpc
import infer_session_pb2
import infer_session_pb2_grpc
import queue
import traceback
import time
from common import SharedMemory,ThreadSafeDict
import numpy as np

class InferenceServer(infer_session_pb2_grpc.InferenceServicer):
    def __init__(self) -> None:
        super().__init__()
        self.server=None
        self.black_list=set()
        
    def Request(self, request, context):
        self.request_queue.put(request)
        return infer_session_pb2.Status(status=0,error_message="")

    def Open(self,port=50051):
        self.process_running=True
        self.bind_addr="localhost:{}".format(port)
        self.client_session = ThreadSafeDict()
        self.request_queue= queue.Queue()

        self.process_thread = threading.Thread(target=self.Process)
        self.process_thread.start()

        self.service_ready_semaphore = threading.Semaphore(0)
        self.server_thread = threading.Thread(target=self.Run)
        self.server_thread.start()
        self.service_ready_semaphore.acquire()    
        return True

    def Run(self):
        self.server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
        infer_session_pb2_grpc.add_InferenceServicer_to_server(self, self.server)
        self.server.add_insecure_port(self.bind_addr)
        self.server.start()
        print("Server started, listening on " + self.bind_addr)
        self.service_ready_semaphore.release()
        self.server.wait_for_termination()

    def Process(self):
        while self.process_running:
            if not self.request_queue.empty():
                request=self.request_queue.get(False,2)   
                if request.session_id in self.black_list:
                    if request.session_id in self.client_session:
                        del self.client_session[request.session_id]                    
                    continue
                try:
                    if request.session_id not in self.client_session:
                        record={}
                        print("connect:",request.client_port)
                        record['channel']=grpc.insecure_channel("localhost:{}".format(request.client_port))
                        record['stub']=infer_session_pb2_grpc.InferenceStub(record['channel'])
                        grpc.channel_ready_future(record['channel']).result(timeout=5)
                        self.client_session[request.session_id]=record

                    shm=SharedMemory(request.width,request.height,request.channels,
                                     request.client_port,request.shm_id)
                    data = np.ndarray((request.height,request.width,request.channels), 
                                                dtype=np.uint8, buffer=shm.get())               
                    data+=1 #修改数据
                    shm.close()

                    ret=self.client_session[request.session_id]['stub'].Response(request,timeout=5) 
                    if ret.status!=0:
                        print("Response Error:{} {}".format(ret.status,ret.error_message))
                except:
                    traceback.print_exc()
                    self.black_list.add(request.session_id)
                    if request.session_id in self.client_session:
                        del self.client_session[request.session_id]
            else:
                time.sleep(0.001)

    def Stop(self):
        print("Stop")
        self.server.stop(3)
        self.process_running=False
        self.process_thread.join()
        self.server_thread.join()

if __name__ == "__main__":

    logging.basicConfig()
    server=InferenceServer()
    server.Open()
    input()
    server.Stop()

4.infer_session_client.py

from __future__ import print_function
from concurrent import futures
import logging
import grpc
import infer_session_pb2
import infer_session_pb2_grpc
import threading
import numpy as np
import os
import queue
from common import SharedMemory
import time
import argparse
import uuid

class InferenceClient(infer_session_pb2_grpc.InferenceServicer):

    def __init__(self) -> None:
        super().__init__()
        self.send_index=0
        self.recv_index=None
        self.uuid=uuid.uuid4()
        print(self.uuid)

    def Response(self, response, context):
        request=self.busy_q.get()
        pred_data = np.ndarray((request.height,request.width,request.channels), 
                                    dtype=np.uint8, buffer=request.get())

        golden=np.ones(pred_data.shape,dtype=np.uint8)
        golden.fill(response.frame_id+1)

        result=(golden==pred_data).all()
        if not result:
           print("ID:{} ShmId:{} Pass:{}".format(response.frame_id,response.shm_id,result))
        self.free_q.put(request)
        self.recv_index=response.frame_id
        return infer_session_pb2.Status(status=0,error_message="")
    
    def WaitFinish(self):
        while True:
            if self.send_index==self.recv_index:
                return
            time.sleep(0.001)

    def Open(self,client_port,width,height,channel,qsize,remote_addr="localhost:50051"):
        try:
            self.client_port=client_port
            self.bind_addr="localhost:{}".format(client_port)
            self.free_q= queue.Queue(qsize*2)
            self.busy_q= queue.Queue(qsize*2)
            for shm_id in range(qsize):
                self.free_q.put(SharedMemory(width,height,channel,self.client_port,shm_id))            
            self.channel=grpc.insecure_channel(remote_addr)
            grpc.channel_ready_future(self.channel).result(timeout=5)
            self.stub = infer_session_pb2_grpc.InferenceStub(self.channel)
            self.server_ready=False
            self.service_ready_semaphore = threading.Semaphore(0)
            self.server_thread = threading.Thread(target=self.Run)
            self.server_thread.start()
            self.service_ready_semaphore.acquire()
            return self.server_ready
        except:
            return False
        
    def Stop(self):
        print("Stop")
        self.server.stop(3)
        self.server_thread.join()

    def Request(self,frame_index):
        request=self.free_q.get()   
        data = np.ndarray((request.height,request.width,request.channels), 
                                    dtype=np.uint8, buffer=request.get())

        data.fill(frame_index)

        response = self.stub.Request(infer_session_pb2.InferMessage(frame_id=frame_index,
                                                                    client_port=self.client_port,
                                                                    shm_id=request.shm_id,
                                                                    width=request.width,
                                                                    height=request.height,
                                                                    channels=request.channels,
                                                                    session_id="{}".format(self.uuid)
                                                                    ))
        self.busy_q.put(request)
        self.send_index=frame_index
        return response.status==0

    def Run(self):
        try:
            self.server = grpc.server(futures.ThreadPoolExecutor(max_workers=2))
            infer_session_pb2_grpc.add_InferenceServicer_to_server(self, self.server)
            self.server.add_insecure_port(self.bind_addr)
            self.server.start()
            self.server_ready=True
            print("Server started, listening on " + self.bind_addr)
            self.service_ready_semaphore.release()
            self.server.wait_for_termination()
        except:
            self.service_ready_semaphore.release()

if __name__ == "__main__":

    parser = argparse.ArgumentParser(description="Demo of argparse")
    parser.add_argument('--port', type=int,  default=50000)
    parser.add_argument('--remote_addr', type=str, default="localhost:50051")
    args = parser.parse_args()
    logging.basicConfig()
    
    client=InferenceClient()
    client.Open(client_port=args.port,width=320,height=240,channel=1,qsize=10,remote_addr=args.remote_addr)

    while True:
        t0=time.time()
        count=128
        for i in range(count):
            client.Request(i)
        client.WaitFinish()
        t1=time.time()
        print("{} FPS:{:.3f}".format(args.port,count/(t1-t0)))   
    
    client.Stop()

5.common.py

import mmap
import numpy as np
import os
import threading

# 定义一个SharedMemory类,用于在共享内存中读取和写入数据
class SharedMemory(object):
    def __init__(self,width,height,channels,port,shm_id) -> None:
        self.width=width
        self.height=height
        self.channels=channels
        self.shm_id=shm_id
        self.filepath="/sys/fs/cgroup/{}_{}".format(port,shm_id)
        self.size=width*height*channels
        if not os.path.exists(self.filepath):
            os.mknod(self.filepath)        
            self.fd=os.open(self.filepath,os.O_RDWR|os.O_CREAT)
            os.ftruncate(self.fd,self.size)
        else:
            self.fd=os.open(self.filepath,os.O_RDWR)                
        self.mm=mmap.mmap(self.fd,self.size,access=mmap.ACCESS_WRITE)
        self.mm.seek(0)
    
    # 获取共享内存中的数据
    def get(self):
        return self.mm

    # 关闭共享内存
    def close(self):
        self.mm.close()
        os.close(self.fd)

# 定义一个ThreadSafeDict类,用于在多线程中安全地操作字典
class ThreadSafeDict:
    def __init__(self, initial_dict=None):
        self._dict = {} if initial_dict is None else initial_dict.copy()
        self.lock = threading.Lock()

    # 获取字典中的值
    def __getitem__(self, key):
        with self.lock:
            return self._dict[key]

    # 设置字典中的值
    def __setitem__(self, key, value):
        with self.lock:
            self._dict[key] = value

    # 删除字典中的值
    def __delitem__(self, key):
        with self.lock:
            del self._dict[key]

    # 检查字典中是否存在某个键
    def __contains__(self, item):
        with self.lock:
            return item in self._dict

    # 获取字典中的值,如果不存在则返回默认值
    def get(self, key, default=None):
        with self.lock:
            return self._dict.get(key, default)

    # 设置字典中的值,如果键已经存在则不改变值
    def setdefault(self, key, default):
        with self.lock:
            return self._dict.setdefault(key, default)

    # 更新字典
    def update(self, other_dict):
        with self.lock:
            self._dict.update(other_dict)

6.运行

python3 infer_session_server.py &
python3 infer_session_client.py --port 50001

7.输出

50001 FPS:2296.293
50001 FPS:2222.019
50001 FPS:2347.274
50001 FPS:2124.001

到此这篇关于python grpc实现异步调用(不用grpc异步接口)的文章就介绍到这了,更多相关python grpc异步调用内容请搜索服务器之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持服务器之家!

原文链接:https://hi20240217.blog.csdn.net/article/details/136822993

延伸 · 阅读

精彩推荐
  • PythonPython文件的压缩与解压

    Python文件的压缩与解压

    这篇文章主要介绍了Python文件的压缩与解压,Python进行文件、文件夹压缩与解压,用到的是zipfile的第三方依赖库。根据不同应用场景封装了几个方法,下文...

    浅若清风cyf7712022-11-30
  • Pythonpython使用matplotlib绘制折线图

    python使用matplotlib绘制折线图

    这篇文章主要为大家详细介绍了python使用matplotlib绘制折线图,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下 ...

    只晓得闲逛5262022-12-06
  • PythonPython实现的当前时间多加一天、一小时、一分钟操作示例

    Python实现的当前时间多加一天、一小时、一分钟操作示例

    这篇文章主要介绍了Python实现的当前时间多加一天、一小时、一分钟操作,结合实例形式分析了Python基于datetime模块进行日期时间操作相关使用技巧,需要的朋...

    lanyang1234567862021-02-22
  • PythonPython实现多线程抓取网页功能实例详解

    Python实现多线程抓取网页功能实例详解

    这篇文章主要介绍了Python实现多线程抓取网页功能,结合具体实例形式详细分析了Python多线程编程的相关操作技巧与注意事项,并附带demo实例给出了多线程抓...

    糖拌咸鱼4522020-11-15
  • PythonPython爬虫之Selenium多窗口切换的实现

    Python爬虫之Selenium多窗口切换的实现

    这篇文章主要介绍了Python爬虫之Selenium多窗口切换的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋...

    程序猿杂记10482021-08-08
  • PythonPython二分查找详解

    Python二分查找详解

    这篇文章主要给大家汇总介绍了Python二分查找的几种实现的方法,有需要的小伙伴可以参考下。...

    脚本之家6102020-07-31
  • Python阿里最强 Python 自动化工具开源了!

    阿里最强 Python 自动化工具开源了!

    阿里内部开源了一个 iOS 端由 Python 编写的自动化工具,即:tidevice。它是一款跨平台的自动化开源工具。...

    人工智能与大数据技术11002021-05-21
  • Pythonpython实现颜色rgb和hex相互转换的函数

    python实现颜色rgb和hex相互转换的函数

    这篇文章主要介绍了python实现颜色rgb和hex相互转换的函数,可实现将rgb表示的颜色转换成hex值的功能,非常具有实用价值,需要的朋友可以参考下 ...

    liuli8732020-05-22