服务器之家:专注于VPS、云服务器配置技术及软件下载分享
分类导航

PHP教程|ASP.NET教程|Java教程|ASP教程|编程技术|正则表达式|C/C++|IOS|C#|Swift|Android|VB|R语言|JavaScript|易语言|vb.net|

服务器之家 - 编程语言 - C/C++ - C++中的Reactor原理与实现

C++中的Reactor原理与实现

2023-02-16 15:22恒者走天下 C/C++

reactor设计模式是event-driven architecture的一种实现方式,处理多个客户端并发的向服务端请求服务的场景,每种服务在服务端可能由多个方法组成,这篇文章主要介绍了Reactor原理与实现,需要的朋友可以参考下

一、Reactor介绍

reactor设计模式是event-driven architecture的一种实现方式,处理多个客户端并发的向服务端请求服务的场景。每种服务在服务端可能由多个方法组成。reactor会解耦并发请求的服务并分发给对应的事件处理器来处理。

中心思想是将所有要处理的I/o事件注册到一个中心I/o多路复用器上,同时主线程/进程阻塞在多路复用器上;一旦有I/o事件到来或是准备就绪(文件描述符或socket可读、写),多路复用器返回并将事先注册的相应l/o事件分发到对应的处理器中。

处理机制为:主程序将事件以及对应事件处理的方法在Reactor上进行注册, 如果相应的事件发生,Reactor将会主动调用事件注册的接口,即 回调函数.

二、代码实现

前提准备:1单例模式:单例模式(Singleton Pattern,也称为单件模式),使用最广泛的设计模式之一。其意图是保证一个类(结构体)仅有一个实例,并提供一个访问它的全局访问点,该实例被所有程序模块共享。
2.回调函数:把一段可执行的代码像参数传递那样传给其他代码,而这段代码会在某个时刻被调用执行,这就叫做回调。

对epoll反应堆中结构体定义

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
/*fd包含的属性*/
struct nitem { // fd
 
    int fd;     //要监听的文件描述符
 
    int status; //是否在监听:1->在红黑树上(监听),0->不在(不监听)
    int events; //对应的监听事件,  EPOLLIN和EPOLLOUT(不同的事件,走不同的回调函数)
    void *arg;  //指向自己结构体指针
#if 0
    NCALLBACK callback;
#else
    NCALLBACK *readcb;   // epollin
    NCALLBACK *writecb;  // epollout
    NCALLBACK *acceptcb; // epollin
#endif
    unsigned char sbuffer[BUFFER_LENGTH]; //
    int slength;
 
    unsigned char rbuffer[BUFFER_LENGTH];
    int rlength;
    
};
 
/*分块存储*/
struct itemblock {
 
    struct itemblock *next;
    struct nitem *items;
 
};
/*epoll反应堆中包括通信的fd以及epoll的(epfd)*/
struct reactor {
 
    int epfd;
    struct itemblock *head;
 
};

单例模式,创建reactor的一个实例

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
/*单例模式*/
struct reactor *instance = NULL;
int init_reactor(struct reactor *r) {
 
    if (r == NULL) return -1;
 
    int epfd = epoll_create(1); //int size
    r->epfd = epfd;
 
    // fd --> item
    r->head = (struct itemblock*)malloc(sizeof(struct itemblock));
    if (r->head == NULL) {
        close(epfd);
        return -2;
    }
    memset(r->head, 0, sizeof(struct itemblock));
 
    r->head->items = (struct nitem *)malloc(MAX_EPOLL_EVENT * sizeof(struct nitem));
    if (r->head->items == NULL) {
        free(r->head);
        close(epfd);
        return -2;
    }
    memset(r->head->items, 0, (MAX_EPOLL_EVENT * sizeof(struct nitem)));
    
    r->head->next = NULL;
    
    return 0;
}
struct reactor *getInstance(void) { //singleton
 
    if (instance == NULL) {
 
        instance = (struct reactor *)malloc(sizeof(struct reactor));
        if (instance == NULL) return NULL;
        memset(instance, 0, sizeof(struct reactor));
 
        if (0 > init_reactor(instance)) {
            free(instance);
            return NULL;
        }
 
    }
 
    return instance;
}

事件注册

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
/*nreactor_set_event(listenfd, accept_callback, ACCEPT_CB, NULL);*/
/*nreactor_set_event(fd, read_callback, READ_CB, NULL);*/
/*fd找到对应事件*/
/*驱动注册*/
int nreactor_set_event(int fd, NCALLBACK cb, int event, void *arg) {
 
    struct reactor *r = getInstance();
    
    struct epoll_event ev = {0};
    //1
    if (event == READ_CB) {
        r->head->items[fd].fd = fd;
        r->head->items[fd].readcb = cb;
        r->head->items[fd].arg = arg;
 
        ev.events = EPOLLIN;
        
    }
    //2
    else if (event == WRITE_CB) {
        r->head->items[fd].fd = fd;
        r->head->items[fd].writecb = cb;
        r->head->items[fd].arg = arg;
 
        ev.events = EPOLLOUT;
    }
    //3
    else if (event == ACCEPT_CB) {
        r->head->items[fd].fd = fd;
        r->head->items[fd].acceptcb = cb; //回调函数
        r->head->items[fd].arg = arg;
 
        ev.events = EPOLLIN;
    }
 
    ev.data.ptr = &r->head->items[fd];
 
    /*NOSET_CB 0*/
    if (r->head->items[fd].events == NOSET_CB) {
        if (epoll_ctl(r->epfd, EPOLL_CTL_ADD, fd, &ev) < 0) {
            printf("epoll_ctl EPOLL_CTL_ADD failed, %d\n", errno);
            return -1;
        }
        r->head->items[fd].events = event;
    } else if (r->head->items[fd].events != event) {
 
        if (epoll_ctl(r->epfd, EPOLL_CTL_MOD, fd, &ev) < 0) {
            printf("epoll_ctl EPOLL_CTL_MOD failed\n");
            return -1;
        }
        r->head->items[fd].events = event;
    }
    
    return 0;
}

回调函数书写

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
int write_callback(int fd, int event, void *arg) {
    struct reactor *R = getInstance();
    
    unsigned char *sbuffer = R->head->items[fd].sbuffer;
    int length = R->head->items[fd].slength;
    int ret = send(fd, sbuffer, length, 0);
    if (ret < length) {
        nreactor_set_event(fd, write_callback, WRITE_CB, NULL);
    } else {
        nreactor_set_event(fd, read_callback, READ_CB, NULL);
    }
    return 0;
}
// 5k qps
int read_callback(int fd, int event, void *arg) {
    struct reactor *R = getInstance();
    unsigned char *buffer = R->head->items[fd].rbuffer;
    
#if 0 //ET
    int idx = 0, ret = 0;
    while (idx < BUFFER_LENGTH) {
        ret = recv(fd, buffer+idx, BUFFER_LENGTH-idx, 0);
        if (ret == -1) {
            break;
        } else if (ret > 0) {
            idx += ret;
        } else {// == 0
            break;
        }
    }
    if (idx == BUFFER_LENGTH && ret != -1) {
        nreactor_set_event(fd, read_callback, READ_CB, NULL);
    } else if (ret == 0) {
        nreactor_set_event
        //close(fd);
    } else {
        nreactor_set_event(fd, write_callback, WRITE_CB, NULL);
    }
    
#else //LT
    int ret = recv(fd, buffer, BUFFER_LENGTH, 0);
    if (ret == 0) { // fin
        
        nreactor_del_event(fd, NULL, 0, NULL);
        close(fd);
        
    } else if (ret > 0) {
        unsigned char *sbuffer = R->head->items[fd].sbuffer;
        memcpy(sbuffer, buffer, ret);
        R->head->items[fd].slength = ret;
        printf("readcb: %s\n", sbuffer);
        nreactor_set_event(fd, write_callback, WRITE_CB, NULL);
    }
        
#endif
    
}
// web server
// ET / LT
int accept_callback(int fd, int event, void *arg) {
    int connfd;
    struct sockaddr_in client;
    socklen_t len = sizeof(client);
    if ((connfd = accept(fd, (struct sockaddr *)&client, &len)) == -1) {
        printf("accept socket error: %s(errno: %d)\n", strerror(errno), errno);
        return 0;
    }
    nreactor_set_event(connfd, read_callback, READ_CB, NULL);
}

监听描述符变化

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// accept --> EPOLL
/*epoll_wait监听0*/
int reactor_loop(int listenfd) {
 
    struct reactor *R = getInstance(); 
    
    struct epoll_event events[POLL_SIZE] = {0};
    while (1) {
        int nready = epoll_wait(R->epfd, events, POLL_SIZE, -1);
        if (nready == -1) {
            continue;
        }
 
        int i = 0;
        for (i = 0;i < nready;i ++) {
            
            struct nitem *item = (struct nitem *)events[i].data.ptr;
            int connfd = item->fd;
 
            if (connfd == listenfd) { //
                item->acceptcb(listenfd, 0, NULL);
            } else {
            
                if (events[i].events & EPOLLIN) { //
                    item->readcb(connfd, 0, NULL);
                
                }
                if (events[i].events & EPOLLOUT) {
                    item->writecb(connfd, 0, NULL);
        
                }
            }
        }
 
    }
    return 0;
}

完整代码实现

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
#define MAXLNE  4096
#define POLL_SIZE   1024
#define BUFFER_LENGTH       1024
#define MAX_EPOLL_EVENT     1024
#define NOSET_CB    0
#define READ_CB     1
#define WRITE_CB    2
#define ACCEPT_CB   3
/*单例模式*/
typedef int NCALLBACK(int fd, int event, void *arg);
/*fd包含的属性*/
struct nitem { // fd
    int fd;     //要监听的文件描述符
    int status; //是否在监听:1->在红黑树上(监听),0->不在(不监听)
    int events; //对应的监听事件,  EPOLLIN和EPOLLOUT(不同的事件,走不同的回调函数)
    void *arg;  //指向自己结构体指针
#if 0
    NCALLBACK callback;
#else
    NCALLBACK *readcb;   // epollin
    NCALLBACK *writecb;  // epollout
    NCALLBACK *acceptcb; // epollin
#endif
    unsigned char sbuffer[BUFFER_LENGTH]; //
    int slength;
    unsigned char rbuffer[BUFFER_LENGTH];
    int rlength;
    
};
/*分块存储*/
struct itemblock {
    struct itemblock *next;
    struct nitem *items;
};
/*epoll反应堆中包括通信的fd以及epoll的(epfd)*/
struct reactor {
    int epfd;
    struct itemblock *head;
};
/*初始化结构体*/
int init_reactor(struct reactor *r);
int read_callback(int fd, int event, void *arg);
int write_callback(int fd, int event, void *arg);
int accept_callback(int fd, int event, void *arg);
/*单例模式*/
struct reactor *instance = NULL;
struct reactor *getInstance(void) { //singleton
    if (instance == NULL) {
        instance = (struct reactor *)malloc(sizeof(struct reactor));
        if (instance == NULL) return NULL;
        memset(instance, 0, sizeof(struct reactor));
        if (0 > init_reactor(instance)) {
            free(instance);
            return NULL;
        }
    }
    return instance;
}
/*nreactor_set_event(listenfd, accept_callback, ACCEPT_CB, NULL);*/
/*nreactor_set_event(fd, read_callback, READ_CB, NULL);*/
/*fd找到对应事件*/
/*驱动注册*/
int nreactor_set_event(int fd, NCALLBACK cb, int event, void *arg) {
    struct reactor *r = getInstance();
    
    struct epoll_event ev = {0};
    //1
    if (event == READ_CB) {
        r->head->items[fd].fd = fd;
        r->head->items[fd].readcb = cb;
        r->head->items[fd].arg = arg;
        ev.events = EPOLLIN;
        
    }
    //2
    else if (event == WRITE_CB) {
        r->head->items[fd].fd = fd;
        r->head->items[fd].writecb = cb;
        r->head->items[fd].arg = arg;
        ev.events = EPOLLOUT;
    }
    //3
    else if (event == ACCEPT_CB) {
        r->head->items[fd].fd = fd;
        r->head->items[fd].acceptcb = cb; //回调函数
        r->head->items[fd].arg = arg;
        ev.events = EPOLLIN;
    }
    ev.data.ptr = &r->head->items[fd];
    /*NOSET_CB 0*/
    if (r->head->items[fd].events == NOSET_CB) {
        if (epoll_ctl(r->epfd, EPOLL_CTL_ADD, fd, &ev) < 0) {
            printf("epoll_ctl EPOLL_CTL_ADD failed, %d\n", errno);
            return -1;
        }
        r->head->items[fd].events = event;
    } else if (r->head->items[fd].events != event) {
        if (epoll_ctl(r->epfd, EPOLL_CTL_MOD, fd, &ev) < 0) {
            printf("epoll_ctl EPOLL_CTL_MOD failed\n");
            return -1;
        }
        r->head->items[fd].events = event;
    }
    
    return 0;
}
/*nreactor_del_event(fd, NULL, 0, NULL);*/
/*下树*/
/*nreactor_del_event(fd, NULL, 0, NULL);*/
int nreactor_del_event(int fd, NCALLBACK cb, int event, void *arg) {
    struct reactor *r = getInstance();
    
    struct epoll_event ev = {0};
    ev.data.ptr = arg;
    epoll_ctl(r->epfd, EPOLL_CTL_DEL, fd, &ev);
    r->head->items[fd].events = 0;
    return 0;
}
int write_callback(int fd, int event, void *arg) {
    struct reactor *R = getInstance();
    
    unsigned char *sbuffer = R->head->items[fd].sbuffer;
    int length = R->head->items[fd].slength;
    int ret = send(fd, sbuffer, length, 0);
    if (ret < length) {
        nreactor_set_event(fd, write_callback, WRITE_CB, NULL);
    } else {
        nreactor_set_event(fd, read_callback, READ_CB, NULL);
    }
    return 0;
}
// 5k qps
int read_callback(int fd, int event, void *arg) {
    struct reactor *R = getInstance();
    unsigned char *buffer = R->head->items[fd].rbuffer;
    
#if 0 //ET
    int idx = 0, ret = 0;
    while (idx < BUFFER_LENGTH) {
        ret = recv(fd, buffer+idx, BUFFER_LENGTH-idx, 0);
        if (ret == -1) {
            break;
        } else if (ret > 0) {
            idx += ret;
        } else {// == 0
            break;
        }
    }
    if (idx == BUFFER_LENGTH && ret != -1) {
        nreactor_set_event(fd, read_callback, READ_CB, NULL);
    } else if (ret == 0) {
        nreactor_set_event
        //close(fd);
    } else {
        nreactor_set_event(fd, write_callback, WRITE_CB, NULL);
    }
    
#else //LT
    int ret = recv(fd, buffer, BUFFER_LENGTH, 0);
    if (ret == 0) { // fin
        
        nreactor_del_event(fd, NULL, 0, NULL);
        close(fd);
        
    } else if (ret > 0) {
        unsigned char *sbuffer = R->head->items[fd].sbuffer;
        memcpy(sbuffer, buffer, ret);
        R->head->items[fd].slength = ret;
        printf("readcb: %s\n", sbuffer);
        nreactor_set_event(fd, write_callback, WRITE_CB, NULL);
    }
        
#endif
    
}
// web server
// ET / LT
int accept_callback(int fd, int event, void *arg) {
    int connfd;
    struct sockaddr_in client;
    socklen_t len = sizeof(client);
    if ((connfd = accept(fd, (struct sockaddr *)&client, &len)) == -1) {
        printf("accept socket error: %s(errno: %d)\n", strerror(errno), errno);
        return 0;
    }
    nreactor_set_event(connfd, read_callback, READ_CB, NULL);
}
int init_server(int port) {
    int listenfd;
    struct sockaddr_in servaddr;
    char buff[MAXLNE];
 
    if ((listenfd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
        printf("create socket error: %s(errno: %d)\n", strerror(errno), errno);
        return 0;
    }
 
    memset(&servaddr, 0, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    servaddr.sin_port = htons(port);
 
    if (bind(listenfd, (struct sockaddr *)&servaddr, sizeof(servaddr)) == -1) {
        printf("bind socket error: %s(errno: %d)\n", strerror(errno), errno);
        return 0;
    }
 
    if (listen(listenfd, 10) == -1) {
        printf("listen socket error: %s(errno: %d)\n", strerror(errno), errno);
        return 0;
    }
    return listenfd;
}
int init_reactor(struct reactor *r) {
    if (r == NULL) return -1;
    int epfd = epoll_create(1); //int size
    r->epfd = epfd;
    // fd --> item
    r->head = (struct itemblock*)malloc(sizeof(struct itemblock));
    if (r->head == NULL) {
        close(epfd);
        return -2;
    }
    memset(r->head, 0, sizeof(struct itemblock));
    r->head->items = (struct nitem *)malloc(MAX_EPOLL_EVENT * sizeof(struct nitem));
    if (r->head->items == NULL) {
        free(r->head);
        close(epfd);
        return -2;
    }
    memset(r->head->items, 0, (MAX_EPOLL_EVENT * sizeof(struct nitem)));
    
    r->head->next = NULL;
    
    return 0;
}
// accept --> EPOLL
/*epoll_wait监听0*/
int reactor_loop(int listenfd) {
    struct reactor *R = getInstance(); 
    
    struct epoll_event events[POLL_SIZE] = {0};
    while (1) {
        int nready = epoll_wait(R->epfd, events, POLL_SIZE, -1);
        if (nready == -1) {
            continue;
        }
        int i = 0;
        for (i = 0;i < nready;i ++) {
            
            struct nitem *item = (struct nitem *)events[i].data.ptr;
            int connfd = item->fd;
            if (connfd == listenfd) { //
                item->acceptcb(listenfd, 0, NULL);
            } else {
            
                if (events[i].events & EPOLLIN) { //
                    item->readcb(connfd, 0, NULL);
                
                }
                if (events[i].events & EPOLLOUT) {
                    item->writecb(connfd, 0, NULL);
        
                }
            }
        }
    }
    return 0;
}
int main(int argc, char **argv)
{
    
    int  connfd, n;
    int listenfd = init_server(9999);
    nreactor_set_event(listenfd, accept_callback, ACCEPT_CB, NULL);
    //nreactor_set_event(listenfd, accept_callback, read_callback, write_callback);
    
    reactor_loop(listenfd);
     
    return 0;
}

到此这篇关于Reactor原理与实现的文章就介绍到这了,更多相关Reactor原理内容请搜索服务器之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持服务器之家!

原文链接:https://blog.csdn.net/weixin_52259848/article/details/125354898

延伸 · 阅读

精彩推荐
  • C/C++C++数据结构红黑树全面分析

    C++数据结构红黑树全面分析

    今天的这一篇博客,我要跟大家介绍二叉搜索树中的另一颗树——红黑树,它主要是通过控制颜色来控制自身的平衡,但它的平衡没有AVL树的平衡那么严格...

    呆呆兽学编程8312022-10-10
  • C/C++C语言实现通讯录功能

    C语言实现通讯录功能

    这篇文章主要为大家详细介绍了C语言实现通讯录功能,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下...

    Lolita09173792021-06-20
  • C/C++C语言实现牛顿迭代法解方程详解

    C语言实现牛顿迭代法解方程详解

    这篇文章主要介绍了C语言实现牛顿迭代法解方程详解的相关资料,需要的朋友可以参考下...

    Engineer-Mr-Yang11722021-05-05
  • C/C++c语言中static修饰函数的方法及代码

    c语言中static修饰函数的方法及代码

    在本篇内容里小编给大家分享的是一篇关于c语言中static如何修饰函数的知识点内容,有需要朋友们可以跟着学习下。...

    小妮浅浅11332022-01-19
  • C/C++C++中的重载、覆盖、隐藏介绍

    C++中的重载、覆盖、隐藏介绍

    这篇文章主要介绍了C++中的重载、覆盖、隐藏介绍,需要的朋友可以参考下...

    C++教程网12032021-02-24
  • C/C++实例详解C++中指针与引用的区别

    实例详解C++中指针与引用的区别

    引用是C++引入的重要机制(C语言没有引用),它使原来在C中必须用指针来实现的功能有了另一种实现的选择,在书写形式上更为简洁,那么引用的本质是什么...

    ZhiboZhao8772021-11-21
  • C/C++C++关于Makefile的详解含通用模板

    C++关于Makefile的详解含通用模板

    今天小编就为大家分享一篇关于C++关于Makefile的详解含通用模板,小编觉得内容挺不错的,现在分享给大家,具有很好的参考价值,需要的朋友一起跟随小...

    蜗牛2016122021-07-15
  • C/C++C++ opencv实现几何图形绘制

    C++ opencv实现几何图形绘制

    这篇文章主要为大家介绍了C++ opencv实现几何图形的绘制示例实现代码,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪...

    浅念念524022022-12-01