服务器之家:专注于VPS、云服务器配置技术及软件下载分享
分类导航

PHP教程|ASP.NET教程|Java教程|ASP教程|编程技术|正则表达式|C/C++|IOS|C#|Swift|Android|VB|R语言|JavaScript|易语言|vb.net|

服务器之家 - 编程语言 - C/C++ - C++基于reactor的服务器百万并发实现与讲解

C++基于reactor的服务器百万并发实现与讲解

2023-02-16 15:24恒者走天下 C/C++

这篇文章主要介绍了C++基于reactor的服务器百万并发实现与讲解,本文通过实例图文相结合给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下

reactor实现的原理请参考:
http://www.tuohang.net/article/263341.html
本次百万并发的代码实现也是基于上面代码进行更改而来

并发量和承载的概念
并发量:一个服务器能同时承载客户端的数量
承载:客户端发送给服务器的请求(http或tcp等)在200ms内可以返回正确的结果

一、服务器的代码实现与讲解

结构体代码主要构建的结构如图所示
链表结构,每个eventblock结点,包括一个ntyevent数组,数组中存储fd

C++基于reactor的服务器百万并发实现与讲解

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/*结构体定义链表数组*/
struct ntyevent {
    int fd;//要监听的文件描述符
    int events;//对应的监听事件,   EPOLLIN和EPOLLOUT(不同的事件,走不同的回调函数)
    void *arg;//指向自己结构体指针
    int (*callback)(int fd, int events, void *arg);
    
    int status;//是否在监听:1->在红黑树上(监听),0->不在(不监听)
    char buffer[BUFFER_LENGTH];
    int length;
    long last_active;
};
 
struct eventblock {
 
    struct eventblock *next;
    struct ntyevent *events;//数组
    
};
 
struct ntyreactor {
    //句柄
    int epfd;
    //结点个数
    int blkcnt;
    struct eventblock *evblk; //fd --> 100w
};

初始化fd 上树、下树代码

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
//nty_event_set(event, sockfd, acceptor, reactor);
//初始化sockfd
void nty_event_set(struct ntyevent *ev, int fd, NCALLBACK callback, void *arg) {
 
    ev->fd = fd;
    ev->callback = callback;
    ev->events = 0;
    ev->arg = arg;
    ev->last_active = time(NULL);
 
    return ;
    
}
//nty_event_add(reactor->epfd, EPOLLIN, event);
//对监听的epoll红黑树上的结点的修改
int nty_event_add(int epfd, int events, struct ntyevent *ev) {
 
    struct epoll_event ep_ev = {0, {0}};
    ep_ev.data.ptr = ev;
    ep_ev.events = ev->events = events;
 
    int op;
    if (ev->status == 1) {
        op = EPOLL_CTL_MOD;
    } else {
        op = EPOLL_CTL_ADD;
        ev->status = 1;
    }
 
    if (epoll_ctl(epfd, op, ev->fd, &ep_ev) < 0) {
        printf("event add failed [fd=%d], events[%d]\n", ev->fd, events);
        return -1;
    }
 
    return 0;
}
 
int nty_event_del(int epfd, struct ntyevent *ev) {
 
    struct epoll_event ep_ev = {0, {0}};
 
    if (ev->status != 1) {
        return -1;
    }
    ep_ev.data.ptr = ev;
    ev->status = 0;
    epoll_ctl(epfd, EPOLL_CTL_DEL, ev->fd, &ep_ev);
 
    return 0;
}

回调函数代码的书写
注意看recv_cb的回调函数中,recv之后,立马下树,然后又重新初始化fd,上树。这样做的目的是因为代码逻辑是recv收到数据后,立即原样send,所以需要对fd的属性进行更改,需要重新初始化赋值,然后上树

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
int recv_cb(int fd, int events, void *arg) {
 
    struct ntyreactor *reactor = (struct ntyreactor*)arg;
    struct ntyevent *ev = ntyreactor_idx(reactor, fd);
 
    int len = recv(fd, ev->buffer, BUFFER_LENGTH , 0); //
    nty_event_del(reactor->epfd, ev);
 
    if (len > 0) {
        
        ev->length = len;
        ev->buffer[len] = '\0';
 
        printf("C[%d]:%s\n", fd, ev->buffer);
 
        nty_event_set(ev, fd, send_cb, reactor);
        nty_event_add(reactor->epfd, EPOLLOUT, ev);
        
        
    } else if (len == 0) {
 
        close(ev->fd);
        //printf("[fd=%d] pos[%ld], closed\n", fd, ev-reactor->events);
         
    } else {
 
        close(ev->fd);
        printf("recv[fd=%d] error[%d]:%s\n", fd, errno, strerror(errno));
        
    }
 
    return len;
}
 
int send_cb(int fd, int events, void *arg) {
 
    struct ntyreactor *reactor = (struct ntyreactor*)arg;
    struct ntyevent *ev = ntyreactor_idx(reactor, fd);
 
    int len = send(fd, ev->buffer, ev->length, 0);
    if (len > 0) {
        printf("send[fd=%d], [%d]%s\n", fd, len, ev->buffer);
 
        nty_event_del(reactor->epfd, ev);
        nty_event_set(ev, fd, recv_cb, reactor);
        nty_event_add(reactor->epfd, EPOLLIN, ev);
        
    } else {
 
        close(ev->fd);
 
        nty_event_del(reactor->epfd, ev);
        printf("send[fd=%d] error %s\n", fd, strerror(errno));
 
    }
 
    return len;
}
 
int accept_cb(int fd, int events, void *arg) {
 
    struct ntyreactor *reactor = (struct ntyreactor*)arg;
    if (reactor == NULL) return -1;
 
    struct sockaddr_in client_addr;
    socklen_t len = sizeof(client_addr);
 
    int clientfd;
 
    if ((clientfd = accept(fd, (struct sockaddr*)&client_addr, &len)) == -1) {
        if (errno != EAGAIN && errno != EINTR) {
            
        }
        printf("accept: %s\n", strerror(errno));
        return -1;
    }
    int flag = 0;
    if ((flag = fcntl(clientfd, F_SETFL, O_NONBLOCK)) < 0) {
        printf("%s: fcntl nonblocking failed, %d\n", __func__, MAX_EPOLL_EVENTS);
        return -1;
    }
    /*存储*/
    struct ntyevent *event = ntyreactor_idx(reactor, clientfd);
    
    nty_event_set(event, clientfd, recv_cb, reactor);
    nty_event_add(reactor->epfd, EPOLLIN, event);
 
    
    printf("new connect [%s:%d], pos[%d]\n",
        inet_ntoa(client_addr.sin_addr), ntohs(client_addr.sin_port), clientfd);
 
    return 0;
 
}

链表的初始化与销毁

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
//初始化链表
int ntyreactor_init(struct ntyreactor *reactor) {
 
    if (reactor == NULL) return -1;
    memset(reactor, 0, sizeof(struct ntyreactor));
 
    reactor->epfd = epoll_create(1);
    if (reactor->epfd <= 0) {
        printf("create epfd in %s err %s\n", __func__, strerror(errno));
        return -2;
    }
 
    struct ntyevent *evs = (struct ntyevent*)malloc((MAX_EPOLL_EVENTS) * sizeof(struct ntyevent));
    if (evs == NULL) {
        printf("ntyreactor_alloc ntyevents failed\n");
        return -2;
    }
    memset(evs, 0, (MAX_EPOLL_EVENTS) * sizeof(struct ntyevent));
 
    struct eventblock *block = (struct eventblock *)malloc(sizeof(struct eventblock));
    if (block == NULL) {
        printf("ntyreactor_alloc eventblock failed\n");
        return -2;
    }
    memset(block, 0, sizeof(struct eventblock));
 
    block->events = evs;
    block->next = NULL;
 
    reactor->evblk = block;
    reactor->blkcnt = 1;
 
    return 0;
}

找到fd应在链表数组中存储的位置并返回

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
//新增块数(eventblock结点个数)
//ntyreactor_alloc(reactor);
int ntyreactor_alloc(struct ntyreactor *reactor) {
 
    if (reactor == NULL) return -1;
    if (reactor->evblk == NULL) return -1;
 
    struct eventblock *blk = reactor->evblk;
    while (blk->next != NULL) {
        blk = blk->next;
    }
 
    struct ntyevent *evs = (struct ntyevent*)malloc((MAX_EPOLL_EVENTS) * sizeof(struct ntyevent));
    if (evs == NULL) {
        printf("ntyreactor_alloc ntyevents failed\n");
        return -2;
    }
    memset(evs, 0, (MAX_EPOLL_EVENTS) * sizeof(struct ntyevent));
 
    struct eventblock *block = (struct eventblock *)malloc(sizeof(struct eventblock));
    if (block == NULL) {
        printf("ntyreactor_alloc eventblock failed\n");
        return -2;
    }
    memset(block, 0, sizeof(struct eventblock));
 
    block->events = evs;
    block->next = NULL;
 
    blk->next = block;
    reactor->blkcnt ++; //
 
    return 0;
}
 
//struct ntyevent *event = ntyreactor_idx(reactor, sockfd);
struct ntyevent *ntyreactor_idx(struct ntyreactor *reactor, int sockfd) {
 
    int blkidx = sockfd / MAX_EPOLL_EVENTS;
    //如果块数(eventblock结点个数)不能满足新的sockfd的存放
    while (blkidx >= reactor->blkcnt) {
        //新增块数(eventblock结点个数)
        ntyreactor_alloc(reactor);
    }
 
    //找到存放sockfd的块(eventblock对应的结点)
    int i = 0;
    struct eventblock *blk = reactor->evblk;
    while(i ++ < blkidx && blk != NULL) {
        blk = blk->next;
    }
 
    //返回对应块(eventblock对应的结点)的存放sockfd数组的那个具体位置
    return &blk->events[sockfd % MAX_EPOLL_EVENTS];
}

上树,并初始化链表数组上对应的fd

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
//ntyreactor_addlistener(reactor, sockfds[i], accept_cb);
//上树,并初始化链表数组上对应的fd
int ntyreactor_addlistener(struct ntyreactor *reactor, int sockfd, NCALLBACK *acceptor) {
 
    if (reactor == NULL) return -1;
    if (reactor->evblk == NULL) return -1;
 
    //reactor->evblk->events[sockfd];
    //找到sock所在的具体位置
    struct ntyevent *event = ntyreactor_idx(reactor, sockfd);
 
    初始化sockfd
    nty_event_set(event, sockfd, acceptor, reactor);
    //对监听的epoll红黑树上的结点的修改
    nty_event_add(reactor->epfd, EPOLLIN, event);
 
    return 0;
}

epollwait

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
//ntyreactor_run(reactor);
int ntyreactor_run(struct ntyreactor *reactor) {
    if (reactor == NULL) return -1;
    if (reactor->epfd < 0) return -1;
    if (reactor->evblk == NULL) return -1;
    
    struct epoll_event events[MAX_EPOLL_EVENTS+1];
    
    int checkpos = 0, i;
 
    while (1) {
/*
        long now = time(NULL);
        for (i = 0;i < 100;i ++, checkpos ++) {
            if (checkpos == MAX_EPOLL_EVENTS) {
                checkpos = 0;
            }
 
            if (reactor->events[checkpos].status != 1) {
                continue;
            }
 
            long duration = now - reactor->events[checkpos].last_active;
 
            if (duration >= 60) {
                close(reactor->events[checkpos].fd);
                printf("[fd=%d] timeout\n", reactor->events[checkpos].fd);
                nty_event_del(reactor->epfd, &reactor->events[checkpos]);
            }
        }
*/
 
        int nready = epoll_wait(reactor->epfd, events, MAX_EPOLL_EVENTS, 1000);
        if (nready < 0) {
            printf("epoll_wait error, exit\n");
            continue;
        }
 
        for (i = 0;i < nready;i ++) {
 
            struct ntyevent *ev = (struct ntyevent*)events[i].data.ptr;
 
            //看fd连接是否发生变化
            if ((events[i].events & EPOLLIN) && (ev->events & EPOLLIN)) {
                ev->callback(ev->fd, events[i].events, ev->arg);
            }
            if ((events[i].events & EPOLLOUT) && (ev->events & EPOLLOUT)) {
                ev->callback(ev->fd, events[i].events, ev->arg);
            }
            
        }
 
    }
}

main函数;此服务器代码开设了100个监听的端口,目的是因为客户端测试程序也是运行在虚拟机的Ubuntu上,通过开三台来充当客户端来进行测试。有因为一台Ubuntu最多有6w个端口,3台有18w端口。如果服务器只开设一个监听端口,则最多有18w端口。因此要达到100w并发则应多开设端口

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// 3, 6w, 1, 100 ==
// <remoteip, remoteport, localip, localport>
int main(int argc, char *argv[]) {
 
    unsigned short port = SERVER_PORT; // listen 8888
    if (argc == 2) {
        port = atoi(argv[1]);//把参数 str 所指向的字符串转换为一个整数(类型为 int 型)
    }
    struct ntyreactor *reactor = (struct ntyreactor*)malloc(sizeof(struct ntyreactor));
    /*初始化三个结构体,建立链表*/
    ntyreactor_init(reactor);
 
    int i = 0;
    int sockfds[PORT_COUNT] = {0};
    for (i = 0;i < PORT_COUNT;i ++) {
        //端口号的监听
        sockfds[i] = init_sock(port+i);
        //上树
        ntyreactor_addlistener(reactor, sockfds[i], accept_cb);
    }
 
    // epoll_wait
    ntyreactor_run(reactor);
    //
    ntyreactor_destory(reactor);
 
    for (i = 0;i < PORT_COUNT;i ++) {
        close(sockfds[i]);
    }
 
    free(reactor);
 
    return 0;
}

完整服务器代码展示

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
/*链表存储数组,把epoll变成对事件的管理,用链表数组的目的就是为了回调函数*/
/*recv写法:代码逻辑是收到数据后,立即原样返回所以才那样写*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <unistd.h>
#include <errno.h>
#include <time.h>
#define BUFFER_LENGTH       4096
#define MAX_EPOLL_EVENTS    1024
#define SERVER_PORT         8888
#define PORT_COUNT          100
typedef int NCALLBACK(int ,int, void*);
//struct ntyevent *evs = (struct ntyevent*)malloc((MAX_EPOLL_EVENTS) * sizeof(struct ntyevent));
struct ntyevent {
    int fd;//要监听的文件描述符
    int events;//对应的监听事件,   EPOLLIN和EPOLLOUT(不同的事件,走不同的回调函数)
    void *arg;//指向自己结构体指针
    int (*callback)(int fd, int events, void *arg);
    int status;//是否在监听:1->在红黑树上(监听),0->不在(不监听)
    char buffer[BUFFER_LENGTH];
    int length;
    long last_active;
};
struct eventblock {
    struct eventblock *next;
    struct ntyevent *events;//数组
};
struct ntyreactor {
    //句柄
    int epfd;
    //结点个数
    int blkcnt;
    struct eventblock *evblk; //fd --> 100w
};
int recv_cb(int fd, int events, void *arg);
int send_cb(int fd, int events, void *arg);
struct ntyevent *ntyreactor_idx(struct ntyreactor *reactor, int sockfd);
//nty_event_set(event, sockfd, acceptor, reactor);
//初始化sockfd
void nty_event_set(struct ntyevent *ev, int fd, NCALLBACK callback, void *arg) {
    ev->fd = fd;
    ev->callback = callback;
    ev->events = 0;
    ev->arg = arg;
    ev->last_active = time(NULL);
    return ;
}
//nty_event_add(reactor->epfd, EPOLLIN, event);
//对监听的epoll红黑树上的结点的修改
int nty_event_add(int epfd, int events, struct ntyevent *ev) {
    struct epoll_event ep_ev = {0, {0}};
    ep_ev.data.ptr = ev;
    ep_ev.events = ev->events = events;
    int op;
    if (ev->status == 1) {
        op = EPOLL_CTL_MOD;
    } else {
        op = EPOLL_CTL_ADD;
        ev->status = 1;
    }
    if (epoll_ctl(epfd, op, ev->fd, &ep_ev) < 0) {
        printf("event add failed [fd=%d], events[%d]\n", ev->fd, events);
        return -1;
    }
    return 0;
}
int nty_event_del(int epfd, struct ntyevent *ev) {
    struct epoll_event ep_ev = {0, {0}};
    if (ev->status != 1) {
        return -1;
    }
    ep_ev.data.ptr = ev;
    ev->status = 0;
    epoll_ctl(epfd, EPOLL_CTL_DEL, ev->fd, &ep_ev);
    return 0;
}
int recv_cb(int fd, int events, void *arg) {
    struct ntyreactor *reactor = (struct ntyreactor*)arg;
    struct ntyevent *ev = ntyreactor_idx(reactor, fd);
    int len = recv(fd, ev->buffer, BUFFER_LENGTH , 0); //
    nty_event_del(reactor->epfd, ev);
    if (len > 0) {
        ev->length = len;
        ev->buffer[len] = '\0';
        printf("C[%d]:%s\n", fd, ev->buffer);
        nty_event_set(ev, fd, send_cb, reactor);
        nty_event_add(reactor->epfd, EPOLLOUT, ev);
    } else if (len == 0) {
        close(ev->fd);
        //printf("[fd=%d] pos[%ld], closed\n", fd, ev-reactor->events);
    } else {
        close(ev->fd);
        printf("recv[fd=%d] error[%d]:%s\n", fd, errno, strerror(errno));
    }
    return len;
}
int send_cb(int fd, int events, void *arg) {
    struct ntyreactor *reactor = (struct ntyreactor*)arg;
    struct ntyevent *ev = ntyreactor_idx(reactor, fd);
    int len = send(fd, ev->buffer, ev->length, 0);
    if (len > 0) {
        printf("send[fd=%d], [%d]%s\n", fd, len, ev->buffer);
        nty_event_del(reactor->epfd, ev);
        nty_event_set(ev, fd, recv_cb, reactor);
        nty_event_add(reactor->epfd, EPOLLIN, ev);
    } else {
        close(ev->fd);
        nty_event_del(reactor->epfd, ev);
        printf("send[fd=%d] error %s\n", fd, strerror(errno));
    }
    return len;
}
int accept_cb(int fd, int events, void *arg) {
    struct ntyreactor *reactor = (struct ntyreactor*)arg;
    if (reactor == NULL) return -1;
    struct sockaddr_in client_addr;
    socklen_t len = sizeof(client_addr);
    int clientfd;
    if ((clientfd = accept(fd, (struct sockaddr*)&client_addr, &len)) == -1) {
        if (errno != EAGAIN && errno != EINTR) {
        }
        printf("accept: %s\n", strerror(errno));
        return -1;
    }
    int flag = 0;
    if ((flag = fcntl(clientfd, F_SETFL, O_NONBLOCK)) < 0) {
        printf("%s: fcntl nonblocking failed, %d\n", __func__, MAX_EPOLL_EVENTS);
        return -1;
    }
    /*存储*/
    struct ntyevent *event = ntyreactor_idx(reactor, clientfd);
    nty_event_set(event, clientfd, recv_cb, reactor);
    nty_event_add(reactor->epfd, EPOLLIN, event);
    printf("new connect [%s:%d], pos[%d]\n",
        inet_ntoa(client_addr.sin_addr), ntohs(client_addr.sin_port), clientfd);
    return 0;
}
int init_sock(short port) {
    int fd = socket(AF_INET, SOCK_STREAM, 0);
    fcntl(fd, F_SETFL, O_NONBLOCK);
    struct sockaddr_in server_addr;
    memset(&server_addr, 0, sizeof(server_addr));
    server_addr.sin_family = AF_INET;
    server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
    server_addr.sin_port = htons(port);
    bind(fd, (struct sockaddr*)&server_addr, sizeof(server_addr));
    if (listen(fd, 20) < 0) {
        printf("listen failed : %s\n", strerror(errno));
    }
    return fd;
}
//新增块数(eventblock结点个数)
//ntyreactor_alloc(reactor);
int ntyreactor_alloc(struct ntyreactor *reactor) {
    if (reactor == NULL) return -1;
    if (reactor->evblk == NULL) return -1;
    struct eventblock *blk = reactor->evblk;
    while (blk->next != NULL) {
        blk = blk->next;
    }
    struct ntyevent *evs = (struct ntyevent*)malloc((MAX_EPOLL_EVENTS) * sizeof(struct ntyevent));
    if (evs == NULL) {
        printf("ntyreactor_alloc ntyevents failed\n");
        return -2;
    }
    memset(evs, 0, (MAX_EPOLL_EVENTS) * sizeof(struct ntyevent));
    struct eventblock *block = (struct eventblock *)malloc(sizeof(struct eventblock));
    if (block == NULL) {
        printf("ntyreactor_alloc eventblock failed\n");
        return -2;
    }
    memset(block, 0, sizeof(struct eventblock));
    block->events = evs;
    block->next = NULL;
    blk->next = block;
    reactor->blkcnt ++; //
    return 0;
}
//struct ntyevent *event = ntyreactor_idx(reactor, sockfd);
struct ntyevent *ntyreactor_idx(struct ntyreactor *reactor, int sockfd) {
    int blkidx = sockfd / MAX_EPOLL_EVENTS;
    //如果块数(eventblock结点个数)不能满足新的sockfd的存放
    while (blkidx >= reactor->blkcnt) {
        //新增块数(eventblock结点个数)
        ntyreactor_alloc(reactor);
    }
    //找到存放sockfd的块(eventblock对应的结点)
    int i = 0;
    struct eventblock *blk = reactor->evblk;
    while(i ++ < blkidx && blk != NULL) {
        blk = blk->next;
    }
    //返回对应块(eventblock对应的结点)的存放sockfd数组的那个具体位置
    return &blk->events[sockfd % MAX_EPOLL_EVENTS];
}
//初始化链表
int ntyreactor_init(struct ntyreactor *reactor) {
    if (reactor == NULL) return -1;
    memset(reactor, 0, sizeof(struct ntyreactor));
    reactor->epfd = epoll_create(1);
    if (reactor->epfd <= 0) {
        printf("create epfd in %s err %s\n", __func__, strerror(errno));
        return -2;
    }
    struct ntyevent *evs = (struct ntyevent*)malloc((MAX_EPOLL_EVENTS) * sizeof(struct ntyevent));
    if (evs == NULL) {
        printf("ntyreactor_alloc ntyevents failed\n");
        return -2;
    }
    memset(evs, 0, (MAX_EPOLL_EVENTS) * sizeof(struct ntyevent));
    struct eventblock *block = (struct eventblock *)malloc(sizeof(struct eventblock));
    if (block == NULL) {
        printf("ntyreactor_alloc eventblock failed\n");
        return -2;
    }
    memset(block, 0, sizeof(struct eventblock));
    block->events = evs;
    block->next = NULL;
    reactor->evblk = block;
    reactor->blkcnt = 1;
    return 0;
}
int ntyreactor_destory(struct ntyreactor *reactor) {
    close(reactor->epfd);
    //free(reactor->events);
    struct eventblock *blk = reactor->evblk;
    struct eventblock *blk_next = NULL;
    while (blk != NULL) {
        blk_next = blk->next;
        free(blk->events);
        free(blk);
        blk = blk_next;
    }
    return 0;
}
//ntyreactor_addlistener(reactor, sockfds[i], accept_cb);
//上树,并初始化链表数组上对应的fd
int ntyreactor_addlistener(struct ntyreactor *reactor, int sockfd, NCALLBACK *acceptor) {
    if (reactor == NULL) return -1;
    if (reactor->evblk == NULL) return -1;
    //reactor->evblk->events[sockfd];
    //找到sock所在的具体位置
    struct ntyevent *event = ntyreactor_idx(reactor, sockfd);
    初始化sockfd
    nty_event_set(event, sockfd, acceptor, reactor);
    //对监听的epoll红黑树上的结点的修改
    nty_event_add(reactor->epfd, EPOLLIN, event);
    return 0;
}
//ntyreactor_run(reactor);
int ntyreactor_run(struct ntyreactor *reactor) {
    if (reactor == NULL) return -1;
    if (reactor->epfd < 0) return -1;
    if (reactor->evblk == NULL) return -1;
    struct epoll_event events[MAX_EPOLL_EVENTS+1];
    int checkpos = 0, i;
    while (1) {
/*
        long now = time(NULL);
        for (i = 0;i < 100;i ++, checkpos ++) {
            if (checkpos == MAX_EPOLL_EVENTS) {
                checkpos = 0;
            }
            if (reactor->events[checkpos].status != 1) {
                continue;
            }
            long duration = now - reactor->events[checkpos].last_active;
            if (duration >= 60) {
                close(reactor->events[checkpos].fd);
                printf("[fd=%d] timeout\n", reactor->events[checkpos].fd);
                nty_event_del(reactor->epfd, &reactor->events[checkpos]);
            }
        }
*/
        int nready = epoll_wait(reactor->epfd, events, MAX_EPOLL_EVENTS, 1000);
        if (nready < 0) {
            printf("epoll_wait error, exit\n");
            continue;
        }
        for (i = 0;i < nready;i ++) {
            struct ntyevent *ev = (struct ntyevent*)events[i].data.ptr;
            //看fd连接是否发生变化
            if ((events[i].events & EPOLLIN) && (ev->events & EPOLLIN)) {
                ev->callback(ev->fd, events[i].events, ev->arg);
            }
            if ((events[i].events & EPOLLOUT) && (ev->events & EPOLLOUT)) {
                ev->callback(ev->fd, events[i].events, ev->arg);
            }
        }
    }
}
// 3, 6w, 1, 100 ==
// <remoteip, remoteport, localip, localport>
int main(int argc, char *argv[]) {
    unsigned short port = SERVER_PORT; // listen 8888
    if (argc == 2) {
        port = atoi(argv[1]);//把参数 str 所指向的字符串转换为一个整数(类型为 int 型)
    }
    struct ntyreactor *reactor = (struct ntyreactor*)malloc(sizeof(struct ntyreactor));
    /*初始化三个结构体,建立链表*/
    ntyreactor_init(reactor);
    int i = 0;
    int sockfds[PORT_COUNT] = {0};
    for (i = 0;i < PORT_COUNT;i ++) {
        //端口号的监听
        sockfds[i] = init_sock(port+i);
        //上树
        ntyreactor_addlistener(reactor, sockfds[i], accept_cb);
    }
    // epoll_wait
    ntyreactor_run(reactor);
    //
    ntyreactor_destory(reactor);
    for (i = 0;i < PORT_COUNT;i ++) {
        close(sockfds[i]);
    }
    free(reactor);
    return 0;
}

reactor的写法感觉和epoll的普通写法,感觉差别就是reactor多了个回调函数,具体没啥优点?
epoll是针对io的管理。 reactor对针对事件的管理
不同的事件,针对不同的回调函数
性能上没啥差异,但提高了代码的复用性。具体需要自己慢慢体会体会,呜呜呜呜还有体会到,编程思想不过关。

二、环境设置

限制是fd的限制,系统默认fd最多有1024个,按照一个连接一个fd的做法,那就需要百万个fd。这里有两种修改方法,一是使用ulimit -n命令,这个命令重启就失效;二是修改/etc/security/limits.conf文件,这是永久有效的,重启或sysctl -p生效。

?
1
2
* hard nofile 1048576
* soft nofile 1048576

hard是硬限制,不能超过该值,soft是软限制,可以超过,超过后就开始回收。
这个文件里还有一些其他的参数可以了解一下,fs.file_max是fd可取到的最大值,注意与fd最大个数区分。
突破这两个限制后,还会遇到一个问题,客户端会报错:connection timedout。连接超时,即是客户端未收到服务器对客户端connect()的回应包。这里有两种可能,客户端为收到服务器的包或是服务器未收到客户端的connect包。事实上,是因为系统有个防火墙iotables,这个防火墙是基于网卡和协议栈之间的过滤机制netfilter实现的。netfilter当连接数到达一定程度时,会不允许再向外发送connect包。修改也是通过/etc/security/limits.conf文件

?
1
net.nf_conntrack_max=1048576

突破这些限制,就可以实现百万并发了。
这里再介绍/etc/security/limits.conf中几个参数
net.ipv4.tcp_mem=262144 524288 786432是所有TCP协议栈所占空间的大小,单位是页(4KB)。介绍一下后面写的三个值,当所占空间大小超过第二个值时,系统会进行优化,此时如果占用空间降到第一个值以下,不再优化,第三个值是上限,不允许分配超过比大小的空间。
net.ipv4.tcp_wmem=2048 2048 4096是每个socket对应的写缓冲区大小,三个值分别是最小值、默认值、最大值,单位是B。
net.ipv4.tcp_rmem=2048 2048 4096是每个socket对应的读缓冲区大小,三个值分别是最小值、默认值、最大值,单位是B。
做百万并发时,如果内存不大,可以相应调小。在实际应用中,如果传输大文件,调大;如果传输的都是字符,调小,就可以接收更多fd。

到此这篇关于C++基于reactor的服务器百万并发实现的文章就介绍到这了,更多相关reactor服务器百万并发内容请搜索服务器之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持服务器之家!

原文链接:https://blog.csdn.net/weixin_52259848/article/details/125513518

延伸 · 阅读

精彩推荐
  • C/C++C++回溯算法深度优先搜索举例分析

    C++回溯算法深度优先搜索举例分析

    回溯在迷宫搜索中使用很常见,就是这条路走不通,然后返回前一个路口,继续下一条路。回溯算法说白了就是穷举法,下面让我们一起来看看回溯算法深...

    ymz123_10452022-10-28
  • C/C++解析C++函数的默认参数和占位参数及较之C语言的拓展

    解析C++函数的默认参数和占位参数及较之C语言的拓展

    这篇文章主要介绍了C++中的默认参数和占位参数及较之C语言的拓展,需要的朋友可以参考下...

    YoferZhang4832021-03-27
  • C/C++opengl实现直线扫描算法和区域填充算法

    opengl实现直线扫描算法和区域填充算法

    这篇文章主要为大家详细介绍了opengl实现直线扫描算法和区域填充算法,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考...

    process-z.com4142021-08-31
  • C/C++C++如何动态的生成对象详解

    C++如何动态的生成对象详解

    C++是不支持根据类名动态地生成对象的,比如从一个文本文件中读取类名然后构造一个对象.主要原因是没有丰富的动态元信息,没有单根类库。那么下面这...

    朝十晚八11152021-04-29
  • C/C++详解C语言数组灵活多变的访问形式

    详解C语言数组灵活多变的访问形式

    这篇文章主要介绍了详解C语言数组灵活多变的访问形式,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友...

    zy_kxo6382021-10-20
  • C/C++详解C++ 拷贝构造函数和赋值运算符

    详解C++ 拷贝构造函数和赋值运算符

    本文主要介绍了拷贝构造函数和赋值运算符的区别,以及在什么时候调用拷贝构造函数、什么情况下调用赋值运算符。最后,简单的分析了下深拷贝和浅拷...

    Brook_icv9992021-04-21
  • C/C++C语言实现纸牌游戏之小猫钓鱼算法

    C语言实现纸牌游戏之小猫钓鱼算法

    这篇文章主要为大家详细介绍了C语言实现纸牌游戏之小猫钓鱼算法,具有一定的参考价值,感兴趣的小伙伴们可以参考一下...

    Gerald Kwok4772021-07-18
  • C/C++C语言编写汉诺塔游戏

    C语言编写汉诺塔游戏

    这篇文章主要介绍了C语言编写汉诺塔游戏,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧...

    皆自落9082022-03-08