服务器处理非活动连接
LDK Lv4

Linux在内核中提供了对连接是否处于活动状态的检查机制,可以通过socket选项中的KEEPALIVE来激活它。不过这种方式将使得应用程序对连接的管理变得复杂。

因此可以考虑在应用层实现类似于KEEPALIVE的机制,来管理所有处于非活动状态的连接。

下面一段代码利用alarm函数周期性触发SIGALRM信号,该信号的信号处理函数利用管道通知主循环执行定时器链表上的定时任务——关闭非活动连接。有关定时器链表,参见:基于升序链表的定时器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <signal.h>
#include <unistd.h>
#include <error.h>
#include <string.h>
#include <stdlib.h>
#include <pthread.h>
#include <cerrno>
#include "sort_timer_list.h"

#define FD_LIMIT 65535
#define MAX_EVENT_NUMBER 1024
#define TIMESLOT 5

static int pipefd[2]; // 用于创建相互连接的无名套接字,可以全双工通信。也可以当作管道用。
static sort_time_list timer_lst; // 基于升序链表的定时器
static int epollfd = 0;

/**
* @brief 将文件描述符设置为非阻塞状态
* @return -1: 出现错误, >=0: 返回原来的状态标志​​, 在需要临时修改标志并随后恢复原状态时非常有用
*/
int setnonblocking(int fd) {
int old_option = fcntl(fd, F_GETFL);
if (old_option == -1) {
perror("fcntl(F_GETFL) failed");
return -1;
}
int new_option = old_option | O_NONBLOCK;
fcntl(fd, F_SETFL, new_option);
if (fcntl(fd, F_SETFL, new_option) == -1) {
perror("fcntl(F_SETFL) failed");
return -1;
}
return old_option;
}

/* 将文件描述符添加到内核事件表中,监听读事件 */
void addfd(int epollfd, int fd) {
epoll_event event;
// 设置要监听的文件描述符
event.data.fd = fd;
// 监听读事件,并且采用ET模式
event.events = EPOLLIN | EPOLLET;
// 添加到内核事件表
epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event);
// 设置非阻塞
setnonblocking(fd);
}

/* 信号处理函数 */
void sig_handler(int sig) {
/**
* 在信号处理函数中,​​必须首先保存​​ errno的当前值。因为接下来的操作(如 send)可能会失败并修改 errno。
* 在函数返回前恢复 errno,可以避免干扰主程序中其他依赖 errno的逻辑
*/
int save_errno = errno;
int msg = sig;
/**send函数被认为是​​异步信号安全​​的,这意味着它可以在信号处理函数中安全地调用。*/
send(pipefd[1], (char *)&msg, 1, 0);
// 在函数返回前,恢复之前保存的 errno值,确保不会影响主程序的错误判断逻辑。
errno = save_errno;

/**为什么要通过管道发送信息?
* 信号处理函数 (sig_handler) 是在一个​​异步的、不可预测的​​上下文中执行的(可以看作是一个“中断”上下文)。
* 在这个上下文中,你不能调用绝大多数可能阻塞的或非可重入的函数(例如 printf, malloc等),否则可能导致死锁或未定义行为。
*
* 为了解决这个限制,常见的做法是创建一个​​管道(pipe)​​ 或 ​​Unix 域套接字​​,并将其注册到 epoll, kqueue, select等 ​​I/O 多路复用​​循环中。
* 信号处理函数只做一件事:将信号信息写入管道。这样一来,信号的“通知”就转换为了一个​​管道可读​​的 I/O 事件。主循环 (epoll_wait/select) 会检测到这个事件,然后在​​正常、安全的上下文​​中从容地进行后续处理
*/

/**用管道需要注意的问题
* 如果信号非常频繁地到达,管道可能会被写满(因为信号处理函数调用 send的速度可能快于主线程从另一端读取的速度)。
* 不过,单个信号只发送 1 字节数据,而管道通常有足够的缓冲区(通常至少 4KB)
*/
}

/* 设置信号的处理函数 */
void addsig(int sig) {
struct sigaction sa;
memset(&sa, '\0', sizeof(sa));
// 设置信号处理函数
sa.sa_handler = sig_handler;
/**SA_RESTART的作用是:如果某个系统调用(如 read, write, accept)被当前设置的信号中断,
* 内核会自动重启该系统调用​​,而不是让它返回错误 */
sa.sa_flags |= SA_RESTART;
/**在执行 sa_handler 所指向的信号处理函数 sig_handler 期间,进程会临时阻塞(屏蔽)所有其他信号
* sigfillset()函数会将sa_mask的所有bit位置为1 */
sigfillset(&sa.sa_mask);
// 将上面配置好的 struct sigaction 结构体 sa 应用于信号 sig .
assert(sigaction(sig, &sa, nullptr) != -1);
}

/* 处理定时任务的函数 */
void timer_handler() {
// 处理定时任务,实际上就是调用tick()
timer_lst.tick();
// 每次处理完任务后都调用alarm函数重新定时, 以不断触发SIGALARM信号
alarm(TIMESLOT);
}

/* 定时器的回调函数,它删除非活动连接socket上的注册事件,并关闭之 */
void cb_func(client_data *user_data) {
// 将sockfd从内核事件表中删除
epoll_ctl(epollfd, EPOLL_CTL_DEL, user_data->sockfd, 0);
assert(user_data);
// 关闭非活动连接
close(user_data->sockfd);
printf("closed fd %d\n", user_data->sockfd);
}

int main(int argc, char *argv[]) {
if (argc < 2) {
// 参数不够
printf("usage: %s ip_address port_number\n", basename(argv[0]));
return -1;
}
const char *ip = argv[1];
int port = atoi(argv[2]);

int ret = 0;
struct sockaddr_in address;
memset(&address, '\0', sizeof(address));
address.sin_family = AF_INET; // 协议族
inet_pton(AF_INET, ip, &address.sin_addr); // ip地址(presentation to network)
address.sin_port = htons(port); // 端口

int socketfd = socket(PF_INET, SOCK_STREAM, 0);
assert(socketfd >= 0);

ret = bind(socketfd, (struct sockaddr *)&address, sizeof(address));
assert(ret != -1);

ret = listen(socketfd, 5);
assert(ret != -1);

epoll_event events[MAX_EVENT_NUMBER];
int epollfd = epoll_create(5);
assert(epollfd != -1);
addfd(epollfd, socketfd);

ret = socketpair(PF_UNIX, SOCK_STREAM, 0, pipefd); // 创建管道
assert(ret != -1);
setnonblocking(pipefd[1]);
addfd(epollfd, pipefd[0]);

// 设置信号处理函数
addsig(SIGALRM);
addsig(SIGTERM);
bool stop_server = false;

client_data *users = new client_data[FD_LIMIT];
bool timeout = false;
alarm(TIMESLOT);

while (!stop_server) {
int number = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1);
if ((number < 0) && (errno != EINTR)) {
printf("epoll failure\n");
break;
}

for (int i = 0; i < number; ++i) {
int sockfd = events[i].data.fd;
/* 处理新到的客户端连接 */
if (sockfd == socketfd) { // 未受理(accetp)的连接,进行受理,并且开始定时
sockaddr_in client_address;
socklen_t client_addrlength = sizeof(client_address);
int connfd = accept(socketfd, (sockaddr *)&client_address, &client_addrlength);
// 监听刚受理的连接
addfd(epollfd, connfd);
/* 保存被受理连接的用户数据 */
users[connfd].address = client_address;
users[connfd].sockfd = connfd;

util_timer *timer = new util_timer; // 创建定时器
timer->user_data = &users[connfd]; // 绑定用户数据
timer->cb_func = cb_func; // 设置回调函数
time_t cur = time(nullptr);
timer->expire = cur + 3 * TIMESLOT; // 超时事件设置为当前系统时间之后的15秒
users[connfd].timer = timer; // 保存定时器
timer_lst.add_timer(timer); // 定时器插入链表
}
// 管道上有读事件
else if ((sockfd == pipefd[0]) && events[i].events & EPOLLIN) {
int sig;
char signals[1024];
ret = recv(pipefd[0], signals, sizeof(signals), 0);
if (ret == -1) {
// 错误处理
continue;
} else if (ret == 0) {
// 因为文件描述符是非阻塞的(前面设置),所以当没有文件可读时就会直接返回0。
continue;
} else {
for (int i = 0; i < ret; ++i) {
switch (signals[i]) {
case SIGALRM: {
/**用timeout变量标记有定时任务需要处理,但是不立即处理。
* 因为定时任务的优先级不是很高。优先处理其他更重要的任务
*/
timeout = true;
break;
}
case SIGTERM: {
stop_server = true;
break;
}
}
}
}
}
/* 处理客户端连接上收到的数据 */
else if (events[i].events & EPOLLIN) {
memset(users[sockfd].buf, '\0', BUFFER_SIZE); // 使用之前清空缓存区
ret = recv(sockfd, users[sockfd].buf, BUFFER_SIZE - 1, 0);

printf("get %d bytes of client data %s from %d \n", ret, users[sockfd].buf, sockfd);
util_timer *timer = users[sockfd].timer;

if (ret < 0) {
/* 如果发生读错误,则关闭连接,并且移除其对应的定时器 */
if (errno != EAGAIN) {
cb_func(&users[sockfd]);
if (timer) {
timer_lst.del_timer(timer);
}
}
} else if (ret == 0) {
/* 如果对方已经关闭连接,则我们也关闭连接,并移除对应的定时器 */
cb_func(&users[sockfd]);
if (timer) {
timer_lst.del_timer(timer);
}
} else {
/* 如果某个客户连接上有数据可读,则我们要调整该连接对应的定时器,以延迟该连接被关闭的事件 */
if (timer) {
time_t cur = time(nullptr);
timer->expire = cur + 3 * TIMESLOT;
printf("adjust timer once \n");
timer_lst.adjust_timer(timer);
}
}
} else {
// others
}
}

/**最后处理定时事件,因为IO事件有更高的优先级。
* 当然,这样做将导致定时任务不能精确地按照预期的时间执行 */
if (timeout) {
timer_handler();
timeout = false;
}
}

close(socketfd);
close(pipefd[1]);
close(pipefd[0]);
delete[] users;
return 0;
}
由 Hexo 驱动 & 主题 Keep
本站由 提供部署服务
总字数 34.6k 访客数 访问量