扫二维码与项目经理沟通
我们在微信上24小时期待你的声音
解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流
在实现http服务器之前,我们先需要一个可以接收数据和发送数据的tcp服务器。网络连接上消息处理可以分为两个阶段:1. 等待消息准备好 2. 消息处理。
创新互联专注于企业营销型网站建设、网站重做改版、泉州网站定制设计、自适应品牌网站建设、H5技术、商城开发、集团公司官网建设、成都外贸网站制作、高端网站制作、响应式网页设计等建站业务,价格优惠性价比高,为泉州等各大城市提供网站开发制作服务。对于高并发tcp服务器来说,将上述两个阶段分开是最好的选择。等待消息如何做到?就引出了I/O多路复用,使用一个I/O复用来监督多个网络连接,当网络连接上出现可用事件时,则I/O多路复用可返回相应的连接。
对于一个I/O复用来说,需要关注的是三种事件:1. I/O事件 2. 定时器事件 3. 信号。
二. reactor模型对于Linux来说,底层提供了三种I/O复用,分别是:1. select 2. poll 3. epoll。其中epoll是一个高效的选择,但是epoll太底层,它只是一个Linux的系统调用,需要通过某种事件处理机制进行进一步的封装。
reactor作为一种高效的事件处理模型,对epoll系统调用进行了进一步的封装。在普通的事件处理机制中,首先程序调用某个函数,然后函数执行,程序等待,当函数执行完毕后函数将结果和控制权返回给程序,最后程序继续处理。与普通函数不同的是,reactor模式中,并不是主动取调用某个API完成处理,而是相反。对于事件处理流程进行了逆置,应用程序需要提供相应的接口并且注册到reactor上,如果相应的事件发生,reactor将主动调用应用程序注册的接口,这些接口又被称为“回调函数”。reactor模型分为以下几个部分:
事件源(handle):由操作系统提供,用于识别每一个事件,如Socket描述符、文件描述符等。在服务端系统中用一个整数表示。该事件可能来自外部,如来自客户端的连接请求、数据等。也可能来自内部,如定时器事件。
事件反应器(reactor):定义和应用程序控制事件调度,以及应用程序注册、删除事件处理器和相关描述符相关的接口。它是事件处理器的调度核心,使用事件分离器来等待事件的发生。一旦事件发生,反应器先是分离每个事件,然后调度具体事件的事件处理器中的回调函数处理事件。
事件分离器(demultiplexer):是一个有操作系统提供的I/O复用函数,在此我们选用epoll。用来等待一个或多个事件的发生。调用者将会被阻塞,直到分离器分离的描述符集上有事件发生。
事件处理器(even handler):事件处理程序提供了一组接口,每个接口对应了一种类型的事件,供reactor在相应的事件发生时调用,执行相应的事件处理。一般每个具体的事件处理器总是绑定一个有效的描述符句柄,用来识别事件和服务。
三. 基于reactor模型tcp服务器的实现(单线程)#include#include#include#include#include#include#include
#include#define BUFFER_LENGTH 2048
#define MAX_EPOLLSIZE 1024
typedef int (*HTTPCALLBACK)(int, int, void *);
struct _http_base;
struct _http_event;
struct _http_eventblock;
typedef struct _http_base
{
int _epoll_fd; // epoll句柄
struct _http_eventblock *_block; // 存储事件块链
int _block_cnt; // 块链块数
} http_base;
typedef struct _http_event
{
int _fd; // 事件句柄
int _event; // 事件
HTTPCALLBACK _callback; // 对应回调函数
void *_arg; // 回调函数参数
struct _http_base *_base; // 所属base
char _send_buffer[BUFFER_LENGTH]; // 发送缓冲区
int _send_length; // 待发送字节数
char _receive_buffer[BUFFER_LENGTH]; // 接收缓冲区
int _receive_length; // 接收数据长度
int _status; // 当前事件存在状态(0-epoll不管理状态 1-epoll管理状态)
int _is_listenfd; // 是否为监听套接字
} http_event;
typedef struct _http_eventblock
{
struct _http_event *events;
struct _http_eventblock *next;
} http_eventblock;
// 分配一个http_eventblock
http_eventblock *http_eventblock_new();
// 初始化http_base
http_base *http_base_new();
// 销毁http_base
void http_base_free(http_base *base);
// 事件循环
void dispatch(http_base *base);
// 增加http_eventblock块
int http_eventblock_alloc(http_base *base);
// 返回fd对应的http_event地址
struct _http_event *http_eventblock_index(http_base *base, int fd);
// 设置http_event
int http_event_set(http_base *base, http_event *h_event, int fd, int event, int is_listenfd, HTTPCALLBACK callback, void *arg);
// 重新设置http_event
int http_event_reset(http_base *base, http_event *h_event, int fd, int event, int is_listenfd, HTTPCALLBACK callback, void *arg);
// http_event加入epoll管理
int http_event_add(http_event *hpev);
// http_event修改epoll管理
int http_event_mod(http_event *hpev);
// http_event删除epoll管理
int http_event_del(http_event *hpev);
// 分配一个http_eventblock块(内部函数)
http_eventblock *http_eventblock_new()
{
// 分配events
http_event *new_events = malloc(sizeof(http_event) * MAX_EPOLLSIZE);
if (!new_events)
{
printf("create events in %s err %s\n", __func__, strerror(errno));
return NULL;
}
memset(new_events, 0, sizeof(http_event) * MAX_EPOLLSIZE);
// 分配block块
http_eventblock *new_block = malloc(sizeof(http_eventblock));
if (!new_block)
{
printf("create block in %s err %s\n", __func__, strerror(errno));
free(new_events);
return NULL;
}
memset(new_block, 0, sizeof(http_eventblock));
// 整合
new_block->events = new_events;
new_block->next = NULL;
return new_block;
}
http_base *http_base_new()
{
// 初始化base
http_base *base = malloc(sizeof(http_base));
if (!base)
{
printf("create base in %s err %s\n", __func__, strerror(errno));
return NULL;
}
memset(base, 0, sizeof(http_base));
// 初始化epoll_fd
base->_epoll_fd = epoll_create(1);
if (base->_epoll_fd<= 0)
{
printf("create epoll in %s err %s\n", __func__, strerror(errno));
free(base);
return NULL;
}
// 初始化块
base->_block = http_eventblock_new();
if (!base->_block)
{
printf("create block in %s err %s\n", __func__, strerror(errno));
close(base->_epoll_fd);
free(base);
return NULL;
}
// 初始化块数
base->_block_cnt = 1;
return base;
}
void http_base_free(http_base *base)
{
// 销毁epoll_fd;
close(base->_epoll_fd);
// 销毁httpevent_block
http_eventblock *block = base->_block;
while (block)
{
http_eventblock *next_block = block->next;
free(block->events);
free(block);
base->_block_cnt--;
block = next_block;
}
// 销毁base
free(base);
}
void dispatch(http_base *base)
{
if (!base)
{
return;
}
if (!base->_block)
{
return;
}
if (base->_epoll_fd<= 0)
{
return;
}
struct epoll_event events[MAX_EPOLLSIZE];
while (1)
{
int n_ready = epoll_wait(base->_epoll_fd, events, MAX_EPOLLSIZE, -1);
for (int i = 0; i< n_ready; ++i)
{
http_event *hpev = events[i].data.ptr;
if (hpev->_is_listenfd)
{
hpev->_callback(hpev->_fd, events[i].events, hpev->_arg);
}
else if (hpev->_event & EPOLLIN)
{
if (!hpev->_callback(hpev->_fd, events[i].events, hpev->_arg))
{
http_event_del(hpev);
close(hpev->_fd);
}
}
else if (hpev->_event & EPOLLOUT)
{
if (!hpev->_callback(hpev->_fd, events[i].events, hpev->_arg))
{
http_event_del(hpev);
close(hpev->_fd);
}
}
else if (hpev->_event & (EPOLLERR | EPOLLRDHUP | EPOLLHUP))
{
http_event_del(hpev);
close(hpev->_fd);
}
else
{
}
}
}
}
int http_eventblock_alloc(http_base *base)
{
if (!base)
{
printf("!base in %s err %s\n", __func__, strerror(errno));
return 0;
}
if (!base->_block)
{
printf("!block in %s err %s\n", __func__, strerror(errno));
return 0;
}
http_eventblock *block = base->_block;
while (block->next)
{
block = block->next;
}
http_eventblock *new_block = http_eventblock_new();
if (!new_block)
{
return 0;
}
block->next = new_block;
base->_block_cnt++;
return 1;
}
struct _http_event *http_eventblock_index(http_base *base, int fd)
{
int block_index = fd / MAX_EPOLLSIZE;
// 块不够分配
while (base->_block_cnt<= block_index)
{
http_eventblock_alloc(base);
}
// 寻找块
http_eventblock *block = base->_block;
while (block_index)
{
block = block->next;
block_index--;
}
return &(block->events[fd % MAX_EPOLLSIZE]);
}
int http_event_set(http_base *base, http_event *hpev, int fd, int event, int is_listenfd, HTTPCALLBACK callback, void *arg)
{
if (!base)
{
return 0;
}
if (!hpev)
{
return 0;
}
hpev->_base = base;
hpev->_fd = fd;
hpev->_event = event;
hpev->_is_listenfd = is_listenfd;
hpev->_callback = callback;
hpev->_arg = arg;
hpev->_status = 0;
memset(hpev->_receive_buffer, 0, BUFFER_LENGTH);
memset(hpev->_send_buffer, 0, BUFFER_LENGTH);
hpev->_receive_length = 0;
hpev->_send_length = 0;
return 1;
}
int http_event_reset(http_base *base, http_event *hpev, int fd, int event, int is_listenfd, HTTPCALLBACK callback, void *arg)
{
if (!base)
{
return 0;
}
if (!hpev)
{
return 0;
}
hpev->_base = base;
hpev->_fd = fd;
hpev->_event = event;
hpev->_is_listenfd = is_listenfd;
hpev->_callback = callback;
hpev->_arg = arg;
return 1;
}
int http_event_add(http_event *hpev)
{
if (!hpev)
{
printf("event in %s err %s\n", __func__, strerror(errno));
return 0;
}
if (hpev->_status == 1)
{
printf("event in %s exist %s\n", __func__, strerror(errno));
return 0;
}
hpev->_status = 1;
struct epoll_event ev = {0, {0}};
ev.data.ptr = hpev;
ev.events = hpev->_event;
http_base *base = hpev->_base;
if (epoll_ctl(base->_epoll_fd, EPOLL_CTL_ADD, hpev->_fd, &ev)< 0)
{
printf("event add failed [fd=%d], events[%d]\n", hpev->_fd, hpev->_event);
return 0;
}
return 1;
}
int http_event_mod(http_event *hpev)
{
if (!hpev)
{
printf("event in %s err %s\n", __func__, strerror(errno));
return 0;
}
if (hpev->_status == 0)
{
printf("event in %s is not exist %s\n", __func__, strerror(errno));
return 0;
}
struct epoll_event ev = {0, {0}};
ev.data.ptr = hpev;
ev.events = hpev->_event;
http_base *base = hpev->_base;
if (epoll_ctl(base->_epoll_fd, EPOLL_CTL_MOD, hpev->_fd, &ev)< 0)
{
printf("event mod failed [fd=%d], events[%d]\n", hpev->_fd, hpev->_event);
return 0;
}
return 1;
}
int http_event_del(http_event *hpev)
{
if (!hpev)
{
printf("event in %s err %s\n", __func__, strerror(errno));
return 0;
}
if (hpev->_status == 0)
{
printf("event in %s is not exist %s\n", __func__, strerror(errno));
return 0;
}
hpev->_status = 0;
http_base *base = hpev->_base;
struct epoll_event ev = {0, {0}};
ev.data.ptr = hpev;
ev.events = hpev->_event;
epoll_ctl(base->_epoll_fd, EPOLL_CTL_DEL, hpev->_fd, &ev);
printf("disconnect, pos[%d]\n", hpev->_fd);
return 1;
}
int accept_callback(int fd, int event, void *arg);
int read_callback(int fd, int event, void *arg);
int send_callback(int fd, int event, void *arg);
int accept_callback(int fd, int event, void *arg)
{
http_base *base = (http_base *)arg;
if (!base)
{
return 0;
}
struct sockaddr_in client_address;
socklen_t client_address_len = sizeof(client_address);
int client_fd;
if ((client_fd = accept(fd, (struct sockaddr *)&client_address, &client_address_len)) == -1)
{
printf("accept: %s\n", strerror(errno));
return 0;
}
int flag = 0;
if ((flag = fcntl(client_fd, F_SETFL, O_NONBLOCK))< 0)
{
printf("%s: fcntl nonblocking failed, %d\n", __func__, MAX_EPOLLSIZE);
}
http_event *hpev = http_eventblock_index(base, client_fd);
http_event_set(base, hpev, client_fd, EPOLLIN | EPOLLET | EPOLLRDHUP, 0, read_callback, base);
http_event_add(hpev);
printf("new connect [%s:%d], pos[%d]\n",
inet_ntoa(client_address.sin_addr), ntohs(client_address.sin_port), client_fd);
return 1;
}
int read_callback(int fd, int event, void *arg)
{
http_base *base = (http_base *)arg;
http_event *hpev = http_eventblock_index(base, fd);
if (hpev->_receive_length >= BUFFER_LENGTH)
{
return 0;
}
int byte_read = 0;
while (1)
{
byte_read = recv(fd,
hpev->_receive_buffer + hpev->_receive_length,
BUFFER_LENGTH - hpev->_receive_length, 0);
if (byte_read == -1)
{
if (errno == EINTR) {
continue;
}
else if (errno == EAGAIN || errno == EWOULDBLOCK)
{
http_event_reset(base, hpev, fd, EPOLLOUT | EPOLLET | EPOLLRDHUP, 0, send_callback, base);
http_event_mod(hpev);
break;
}
return 0;
}
else if (byte_read == 0)
{
return 0;
}
hpev->_receive_length += byte_read;
}
printf("receive %s\n", hpev->_receive_buffer);
memcpy(hpev->_send_buffer, hpev->_receive_buffer, hpev->_receive_length);
hpev->_send_length = hpev->_receive_length;
memset(hpev->_receive_buffer, 0, BUFFER_LENGTH);
hpev->_receive_length = 0;
return 1;
}
int send_callback(int fd, int event, void *arg)
{
http_base *base = (http_base *)arg;
http_event *hpev = http_eventblock_index(base, fd);
int temp = 0;
int byte_have_send = 0;
int byte_to_send = hpev->_send_length;
while (1)
{
if (byte_to_send == 0)
{
http_event_reset(base, hpev, fd, EPOLLIN | EPOLLET | EPOLLRDHUP, 0, read_callback, base);
http_event_mod(hpev);
break;
}
temp = send(hpev->_fd, hpev->_send_buffer, byte_to_send, 0);
if (temp == -1)
{
// 中断
if (errno == EINTR)
{
continue;
}
// 写缓冲区无数据
else if (errno == EAGAIN || errno == EWOULDBLOCK)
{
http_event_reset(base, hpev, fd, EPOLLIN | EPOLLET | EPOLLRDHUP, 0, read_callback, base);
http_event_mod(hpev);
break;
}
// 错误
else {
return 0;
}
}
byte_to_send -= temp;
byte_have_send += temp;
}
memset(hpev->_send_buffer, 0, BUFFER_LENGTH);
hpev->_send_length = 0;
return 1;
}
int init_sock(short port)
{
int fd = socket(AF_INET, SOCK_STREAM, 0);
fcntl(fd, F_SETFL, O_NONBLOCK);
int reuse = 1;
setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));
struct sockaddr_in server_addr;
memset(&server_addr, 0, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
server_addr.sin_port = htons(port);
bind(fd, (struct sockaddr *)&server_addr, sizeof(server_addr));
if (listen(fd, 20)< 0)
{
printf("listen failed : %s\n", strerror(errno));
}
return fd;
}
int main()
{
// 创建base
http_base *base = http_base_new();
// 创建监听事件
int listen_fd = init_sock(8080);
// 加入epoll管理
http_event *listen_event = http_eventblock_index(base, listen_fd);
http_event_set(base, listen_event, listen_fd, EPOLLIN, 1, accept_callback, base);
http_event_add(listen_event);
// 事件循环
dispatch(base);
// 消除事件管理
http_event_del(listen_event);
// 销毁base
http_base_free(base);
return 0;
}
你是否还在寻找稳定的海外服务器提供商?创新互联www.cdcxhl.cn海外机房具备T级流量清洗系统配攻击溯源,准确流量调度确保服务器高可用性,企业级服务器适合批量采购,新人活动首月15元起,快前往官网查看详情吧
我们在微信上24小时期待你的声音
解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流