// NOTE: stray web-UI text captured with this file, kept only as a comment: "Code merge finished, the page will refresh automatically"
// compile: gcc -o srv ft_event.c -lpthread -I ./
#include <stdio.h>
#include <stdlib.h>
#include <ctype.h>
#include <fcntl.h>
#include <errno.h>
#include <unistd.h>
#include <netdb.h>
#include <stdint.h>
#include <netinet/in.h>
#include <strings.h>
#include <string.h>
#include <assert.h>
#include <stddef.h>
#include <signal.h>
#include <stdarg.h>
#include <netinet/tcp.h>
#include <sys/time.h>
#include <sys/ioctl.h>
#include <sys/syscall.h>
#include <sys/types.h>
#include <sys/stat.h>
#include "ft_event.h"
/* File-local flag; nothing in the visible part of this file reads or writes it.
 * NOTE(review): presumably a main-loop shutdown request - confirm against the rest of the file. */
static int exit_main_loop;
//////////////////////////////////////////////////////////////////////////////////////////////////
////////////////////// util //////////////////////////////////////////////////////////////////////
/*
 * Current wall-clock time in microseconds since the Epoch.
 *
 * Returns -1 when gettimeofday() fails; the failure is reported with perror().
 */
int64_t ev_usec_now()
{
    struct timeval tv;

    if (gettimeofday(&tv, NULL) < 0) {
        perror("gettimeofday failed");
        return -1;
    }

    /* Widen before multiplying so the seconds part cannot overflow a 32-bit time_t. */
    int64_t whole_seconds = (int64_t)tv.tv_sec * 1000000LL;
    return whole_seconds + (int64_t)tv.tv_usec;
}
/*
 * Current wall-clock time in milliseconds: ev_usec_now() divided by 1000.
 */
int64_t ev_msec_now()
{
    const int64_t usec = ev_usec_now();
    return usec / 1000LL;
}
/*
 * Put fd into non-blocking mode by OR-ing O_NONBLOCK into its status flags.
 *
 * Returns the result of the failing or final fcntl() call: negative on error.
 */
int ev_socket_nonblocing(int fd)
{
    const int current = fcntl(fd, F_GETFL, 0);

    if (current >= 0) {
        return fcntl(fd, F_SETFL, current | O_NONBLOCK);
    }
    return current;
}
/*
 * Enable SO_REUSEADDR on fd. Returns the setsockopt() result.
 */
int ev_socket_reuseaddr(int fd)
{
    int enabled = 1;

    return setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &enabled, (socklen_t)sizeof(enabled));
}
/*
 * Enable TCP_NODELAY on fd. Returns the setsockopt() result.
 */
int ev_socket_nodelay(int fd)
{
    int enabled = 1;

    return setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &enabled, (socklen_t)sizeof(enabled));
}
/*
 * Enable SO_KEEPALIVE on fd. Returns the setsockopt() result.
 */
int ev_socket_keepalive(int fd)
{
    int enabled;
    socklen_t optlen;

    enabled = 1;
    optlen = sizeof(enabled);
    return setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &enabled, optlen);
}
/*
 * Create a non-blocking IPv4 TCP listening socket bound to ip:port.
 *
 * ip   - host name or dotted address to bind to; NULL binds to INADDR_ANY.
 * port - TCP port in host byte order.
 *
 * Returns the listening descriptor. Every failure is fatal: a diagnostic is
 * printed and the process exits with status 1.
 */
int ev_socket_listen(const char* ip, uint16_t port)
{
    int fd;
    struct sockaddr_in sa;
    struct hostent* ent;

    fd = socket(AF_INET, SOCK_STREAM, 0);
    if (fd < 0) {
        perror("srv_fd < 0");
        exit(1);
    }
    if (ev_socket_nonblocing(fd) < 0) {
        perror("fcntl(srv_fd, F_SETFL, flags | O_NONBLOCK) error");
        exit(1);
    }
    if (ev_socket_reuseaddr(fd) < 0) {
        perror("setsockopt(srv_fd, SOL_SOCKET, SO_REUSEADDR, &reuseaddr, sizeof(reuseaddr)) error");
        exit(1);
    }

    memset(&sa, 0, sizeof(sa));
    sa.sin_family = AF_INET;
    sa.sin_port = htons(port);
    if (ip == NULL) {
        sa.sin_addr.s_addr = htonl(INADDR_ANY);
    } else {
        ent = gethostbyname(ip);
        if (ent == NULL) {
            /* gethostbyname() reports through h_errno, not errno, so perror() would print an unrelated message. */
            fprintf(stderr, "gethostbyname error: h_errno=%d\n", h_errno);
            exit(1);
        }
        /* Only copy an address that really is an IPv4 address of the expected size. */
        if (ent->h_addrtype != AF_INET || ent->h_length != (int)sizeof(sa.sin_addr)) {
            fprintf(stderr, "gethostbyname error: no IPv4 address for %s\n", ip);
            exit(1);
        }
        memcpy(&sa.sin_addr, ent->h_addr_list[0], sizeof(sa.sin_addr));
    }

    if (bind(fd, (struct sockaddr*)&sa, sizeof(sa)) < 0) {
        perror("bind error");
        exit(1);
    }
    if (listen(fd, SOMAXCONN) < 0) {
        perror("listen error");
        exit(1);
    }
    return fd;
}
/*
 * Fork once and continue only in the child: the parent exits with
 * EXIT_SUCCESS, a failed fork exits with EXIT_FAILURE.
 */
static void ev__daemon_fork(void)
{
    pid_t child = fork();

    if (child < 0) {
        exit(EXIT_FAILURE);
    }
    if (child > 0) {
        exit(EXIT_SUCCESS);
    }
}

/*
 * Detach the calling process: fork, start a new session, ignore SIGCHLD and
 * SIGHUP, fork again, reset the umask, change to "/" and close descriptors.
 *
 * The close loop runs from sysconf(_SC_OPEN_MAX) down to 1, so descriptor 0
 * is left open.
 */
void ev_daemon()
{
    int fd;

    /* First fork: let the parent terminate. */
    ev__daemon_fork();

    /* The child becomes session leader. */
    if (setsid() < 0) {
        exit(EXIT_FAILURE);
    }

    /* TODO: Implement a working signal handler */
    signal(SIGCHLD, SIG_IGN);
    signal(SIGHUP, SIG_IGN);

    /* Second fork. */
    ev__daemon_fork();

    /* Set new file permissions. */
    umask(0);

    /* Change the working directory to the root directory. */
    chdir("/");

    /* Close open file descriptors, highest first. */
    fd = sysconf(_SC_OPEN_MAX);
    while (fd > 0) {
        close(fd);
        fd--;
    }
}
//////////////////////////////////////////////////////////////////////////////////////////////////
////////////////////// log //////////////////////////////////////////////////////////////////////
/*
 * Open (create/append) filename and return a logger writing to it.
 *
 * level    - highest level that will be written (see ev_log_write).
 * filename - path of the log file, created with mode 0644 when missing.
 *
 * Returns NULL when the file cannot be opened or memory is exhausted; the
 * caller owns the result and releases it with ev_log_finit().
 */
ev_log_t* ev_log_init(int level, char *filename)
{
    ev_log_t* log;
    int fd = open(filename, O_WRONLY | O_APPEND | O_CREAT, 0644);

    if (fd == -1) {
        return NULL;
    }

    log = (ev_log_t*)calloc(1, sizeof(*log));
    if (log == NULL) {
        /* Report failure like the open() path instead of exiting with a success status. */
        perror("malloc log error");
        close(fd);
        return NULL;
    }
    log->fd = fd;
    log->level = level;
    return log;
}
/*
 * Close the log file and free the logger. A NULL logger (as returned by a
 * failed ev_log_init) is accepted and ignored.
 */
void ev_log_finit(ev_log_t* self)
{
    if (self == NULL) {
        return;
    }
    /* Descriptor 0 is treated as "no file", matching the check in ev_log_write. */
    if (self->fd > 0) {
        close(self->fd);
    }
    free(self);
}
/*
 * Format one timestamped log line and emit it.
 *
 * self  - logger; when NULL (or its fd is 0) the line goes to stderr.
 * level - index into NONE/ERROR/INFO/DEBUG; lines above self->level are
 *         dropped, unknown levels are labelled "UNKNOWN".
 * fmt   - printf-style format for the message text.
 *
 * With a logger the line is echoed to stdout and appended to the log file.
 * Lines longer than EV_LOG_MAX_LEN are truncated but still end in a newline.
 * NOTE(review): localtime() uses shared static storage; consider localtime_r
 * if several threads log concurrently.
 */
void ev_log_write(ev_log_t* self, int level, const char* fmt, ...)
{
    static const char *levelstr[] = {"NONE", "ERROR" ,"INFO" ,"DEBUG"};
    static const struct tm zero_tm;
    const int nlevels = (int)(sizeof(levelstr) / sizeof(levelstr[0]));
    const char* label;
    const struct tm* t;
    char msg[EV_LOG_MAX_LEN];
    char buf[EV_LOG_MAX_LEN];
    struct timeval tv;
    time_t secs;
    va_list ap;
    int n;

    if (self && level > self->level) {
        return;
    }
    /* Never index the label table with an out-of-range level. */
    label = (level >= 0 && level < nlevels) ? levelstr[level] : "UNKNOWN";

    va_start(ap, fmt);
    vsnprintf(msg, sizeof(msg), fmt, ap);
    va_end(ap);

    gettimeofday(&tv, NULL);
    secs = tv.tv_sec;
    t = localtime(&secs);
    if (t == NULL) {
        t = &zero_tm;
    }

    n = snprintf(buf, sizeof(buf), "%d-%02d-%02d %02d:%02d:%02d.%06ld [%s][%d] %s\n",
                 t->tm_year + 1900,
                 t->tm_mon + 1,
                 t->tm_mday,
                 t->tm_hour,
                 t->tm_min,
                 t->tm_sec,
                 (long)tv.tv_usec,
                 label,
                 (int)getpid(),
                 msg);
    if (n < 0) {
        return;
    }
    if ((size_t)n >= sizeof(buf)) {
        /* snprintf returns the untruncated length; clamp it to what buf really holds. */
        n = (int)sizeof(buf) - 1;
        buf[n - 1] = '\n';
    }

    if (self && self->fd) {
        size_t done = 0;

        printf("%s", buf);
        while (done < (size_t)n) {
            ssize_t w = write(self->fd, buf + done, (size_t)n - done);
            if (w < 0 && errno == EINTR) {
                continue;
            }
            if (w <= 0) {
                break;
            }
            done += (size_t)w;
        }
    } else {
        fprintf(stderr, "%s", buf);
    }
}
/*
 * Return the process-wide logger, creating it on first use from the default
 * configuration's log level and log file. A failed creation leaves the cached
 * pointer NULL, so the next call tries again.
 */
ev_log_t* ev_default_log()
{
    static ev_log_t* instance = NULL;

    if (instance != NULL) {
        return instance;
    }
    instance = ev_log_init(ev_default_config()->log_level, ev_default_config()->logfile);
    return instance;
}
//////////////////////////////////////////////////////////////////////////////////////////////////
////////////////////// array //////////////////////////////////////////////////////////////////////
/*
 * Allocate an array header on the heap and initialise it.
 *
 * alloc_size - initial capacity in elements (must be non-zero).
 * elem_size  - size of one element in bytes (must be non-zero).
 *
 * Returns NULL on allocation failure; release with ev_array_del().
 */
ev_array_t* ev_array_new(uint32_t alloc_size, size_t elem_size)
{
    ev_array_t* a;

    assert(alloc_size != 0 && elem_size != 0);
    /* Size of the structure, not of the pointer to it. */
    a = (ev_array_t*)malloc(sizeof(*a));
    if (a == NULL) {
        return NULL;
    }
    if (ev_array_init(a, alloc_size, elem_size) != 0) {
        free(a);
        return NULL;
    }
    return a;
}
/*
 * Free the element storage (if any) and then the array header itself.
 */
void ev_array_del(ev_array_t *a)
{
    void *storage = a->elem;

    if (storage) {
        free(storage);
    }
    free(a);
}
/*
 * Initialise an array header in place with room for alloc_size elements of
 * elem_size bytes each. Returns 0 on success, -1 when the storage cannot be
 * allocated (a->elem is then NULL and the other fields are left untouched).
 */
int ev_array_init(ev_array_t *a, uint32_t alloc_size, size_t elem_size)
{
    assert(alloc_size != 0 && elem_size != 0);

    void *storage = malloc(alloc_size * elem_size);
    a->elem = storage;
    if (storage == NULL) {
        return -1;
    }

    a->nalloc = alloc_size;
    a->size = elem_size;
    a->nelem = 0;
    return 0;
}
/*
 * Free the element storage of an array initialised with ev_array_init().
 * The header itself is not freed and its fields are not reset.
 */
void ev_array_deinit(ev_array_t *a)
{
    if (a->elem == NULL) {
        return;
    }
    free(a->elem);
}
/*
 * Return a pointer to element idx. The array must be non-empty and idx must
 * be below the element count (both checked with assert).
 */
void* ev_array_get(ev_array_t *a, uint32_t idx)
{
    assert(a->nelem != 0);
    assert(idx < a->nelem);

    uint8_t *base = (uint8_t *)a->elem;
    return base + (a->size * idx);
}
/*
 * Reserve the next element slot and return a pointer to it (contents are not
 * initialised). When the array is full its capacity is doubled first.
 * Returns NULL, leaving the array unchanged, if growing fails.
 */
void* ev_array_push(ev_array_t* a)
{
    if (a->nelem == a->nalloc) {
        const size_t current_bytes = a->size * a->nalloc;
        void *grown = realloc(a->elem, 2 * current_bytes);

        if (grown == NULL) {
            return NULL;
        }
        a->elem = grown;
        a->nalloc *= 2;
    }

    uint8_t *slot = (uint8_t *)a->elem + a->size * a->nelem;
    a->nelem++;
    return slot;
}
/*
 * Remove the last element and return a pointer to its slot. The slot stays
 * inside the array storage, so it is only valid until the next push.
 * The array must be non-empty (checked with assert).
 */
void* ev_array_pop(ev_array_t* a)
{
    assert(a->nelem != 0);

    --a->nelem;
    return (uint8_t *)a->elem + a->size * a->nelem;
}
/*
 * Exchange the complete contents (header fields, including the storage
 * pointer) of two arrays.
 */
void ev_array_swap(ev_array_t *a, ev_array_t *b)
{
    ev_array_t saved = *a;

    *a = *b;
    *b = saved;
}
/*
 * Return a pointer to element idx; same contract as ev_array_get (non-empty
 * array, idx below the element count, both asserted).
 */
void* array_get(ev_array_t *a, uint32_t idx)
{
    assert(a->nelem != 0);
    assert(idx < a->nelem);

    const size_t byte_offset = a->size * idx;
    return (uint8_t *)a->elem + byte_offset;
}
/*
 * Call func(elem, data) for every element in index order.
 *
 * Stops at the first non-zero result and returns it; returns 0 after a full
 * pass. The array must be non-empty and func non-NULL (both asserted). The
 * element count is sampled once before the loop starts.
 */
int ev_array_each(ev_array_t* a, array_each_t func, void* data)
{
    assert(a->nelem != 0);
    assert(func != NULL);

    const uint32_t count = a->nelem;
    uint32_t idx = 0;

    while (idx < count) {
        int rc = func(array_get(a, idx), data);
        if (rc != 0) {
            return rc;
        }
        ++idx;
    }
    return 0;
}
//////////////////////////////////////////////////////////////////////////////////////////////////
////////////////////// buffer //////////////////////////////////////////////////////////////////////
ev_buf_t* ev_buf_new(const char* data, size_t size)
{
ev_buf_t* buf = (ev_buf_t*)malloc(sizeof(ev_buf_t));
if(buf == NULL) {
LOG_ERROR("(ev_buf_t*)malloc(sizeof(ev_buf_t)) error");
return NULL;
}
bzero(buf, sizeof(*buf));
if(size) {
ev_buf_resize(buf, size);
ev_buf_append(buf, data, size);
}
return buf;
}
ev_buf_t* ev_buf_new2(size_t size)
{
ev_buf_t* buf = (ev_buf_t*)malloc(sizeof(ev_buf_t));
if(buf == NULL) {
LOG_ERROR("(ev_buf_t*)malloc(sizeof(ev_buf_t)) error");
return NULL;
}
bzero(buf, sizeof(*buf));
if(size > 0) {
ev_buf_resize(buf, size);
}
return buf;
}
/*
 * Free the buffer's storage (if any) and the buffer object itself.
 */
void ev_buf_del(ev_buf_t* self)
{
    if (self->buf != NULL) {
        free(self->buf);
    }
    free(self);
}
/*
 * Grow the buffer's capacity to at least n bytes; never shrinks.
 *
 * On allocation failure the existing storage and capacity are kept
 * unchanged, so callers can detect failure by checking self->total (or
 * self->buf for a buffer that had no storage yet).
 */
void ev_buf_resize(ev_buf_t* self, size_t n)
{
    void* grown;

    if (self->total >= n) {
        return;
    }
    /* Keep the old pointer until realloc succeeds so failure neither leaks nor lies about capacity. */
    grown = realloc(self->buf, n);
    if (grown == NULL) {
        return;
    }
    self->buf = grown;
    self->total = n;
}
/*
 * Append len bytes from data to the readable region and keep the region
 * NUL-terminated.
 *
 * Space is found by, in order: using the free tail, sliding the readable
 * bytes to the front of the storage, or growing the storage.
 *
 * Returns the number of readable bytes after the call. When nothing could be
 * appended (len is 0, data is NULL, or memory is exhausted) the count is
 * unchanged.
 */
size_t ev_buf_append(ev_buf_t* self, const char* data, size_t len)
{
    size_t used;
    size_t tail_free;
    size_t need;

    if (len == 0 || data == NULL) {
        return self->len;
    }

    need = len + 1;                      /* payload plus the trailing NUL */
    used = self->offset + self->len;
    tail_free = self->total - used;
    if (tail_free < need) {
        if (self->offset > 0 && tail_free + self->offset >= need) {
            /* Source and destination may overlap, so memmove rather than memcpy. */
            memmove(self->buf, self->buf + self->offset, self->len);
            self->offset = 0;
        } else {
            ev_buf_resize(self, self->total + need);
            if (self->buf == NULL || self->total - used < need) {
                return self->len;
            }
        }
    }

    memcpy(self->buf + self->offset + self->len, data, len);
    self->len += len;
    self->buf[self->offset + self->len] = '\0';
    return self->len;
}
/*
 * Consume n bytes from the front of the readable region.
 *
 * Consuming everything (n >= readable length) resets the buffer to empty at
 * offset 0. Returns the number of readable bytes left.
 */
size_t ev_buf_seek(ev_buf_t* self, size_t n)
{
    if (n < self->len) {
        self->offset += n;
        self->len -= n;
        return self->len;
    }

    self->len = 0;
    self->offset = 0;
    return self->len;
}
/*
 * Return 1 when the buffer has no readable bytes, 0 otherwise.
 */
int ev_buf_empty(ev_buf_t* self)
{
    return (self->len == 0) ? 1 : 0;
}
/*
 * Discard all readable bytes; the storage and its capacity are kept.
 */
void ev_buf_clear(ev_buf_t* self)
{
    self->offset = 0;
    self->len = 0;
}
/*
 * Return the number of readable bytes.
 */
size_t ev_buf_size(ev_buf_t* self)
{
    const size_t readable = self->len;
    return readable;
}
/*
 * Return a pointer to the first readable byte (storage base plus offset).
 * No locking is performed despite the name.
 */
const char* ev_buf_lock(ev_buf_t* self)
{
    const char* readable_start = self->buf + self->offset;
    return readable_start;
}
//////////////////////////////////////////////////////////////////////////////////////////////////
////////////////////// timer manager //////////////////////////////////////////////////////////////////////
/*
 * Create a timer wheel.
 *
 * slots       - number of wheel slots (must be > 0).
 * granularity - tick length, in the unit of ev_msec_now() (must be > 0).
 *
 * Every slot gets an empty timer list; the wheel starts at slot 0 with its
 * reference time set to ev_msec_now(). Returns NULL on allocation failure.
 */
ev_timer_mgr_t* ev_timer_mgr_new(size_t slots, int granularity)
{
    ev_timer_mgr_t* wheel;
    size_t filled;

    assert(slots > 0 && granularity > 0);

    wheel = (ev_timer_mgr_t*)malloc(sizeof(*wheel));
    if (!wheel) {
        LOG_ERROR("ev_timer_mgr_new: mgr = (ev_timer_mgr_t*)malloc(sizeof(*mgr)) error\n");
        return NULL;
    }
    if (ev_array_init(&wheel->timer_slots, slots, sizeof(ev_timer_head_t)) != 0) {
        LOG_ERROR("ev_timer_mgr_new: 0 != ev_array_init(mgr->timer_slots, slots, sizeof(ev_timer_t)) error\n");
        free(wheel);
        return NULL;
    }

    /* The array was sized for exactly `slots` heads, so these pushes never reallocate. */
    for (filled = 0; filled < slots; filled++) {
        ev_timer_head_t* head = ev_array_push(&wheel->timer_slots);
        TAILQ_INIT(head);
    }

    wheel->slots = slots;
    wheel->granularity = granularity;
    wheel->index = 0;
    wheel->update_time = ev_msec_now();
    return wheel;
}
/*
 * Destroy a timer wheel created by ev_timer_mgr_new(): releases the slot
 * array and the manager. Timers still linked into the wheel are not freed
 * here; ev_timer_mgr_start_timer only links timers supplied by the caller.
 * A NULL manager is ignored.
 */
void ev_timer_mgr_del(ev_timer_mgr_t* mgr)
{
    if (mgr == NULL) {
        return;
    }
    ev_array_deinit(&mgr->timer_slots);
    free(mgr);
}
/*
 * Link timer t into the wheel so that it fires after roughly `expire`.
 *
 * expire  - delay in the same unit as mgr->granularity; anything shorter
 *           than one tick is rounded up to a single tick.
 * repeate - stored in t->repeat; ev_timer_mgr_tick re-arms the timer with
 *           the same delay when it is non-zero.
 *
 * The delay is split into a slot offset from the current index and a number
 * of full wheel rotations to wait. Always returns 0.
 */
int ev_timer_mgr_start_timer(ev_timer_mgr_t* mgr, ev_timer_t* t, int64_t expire, int repeate)
{
    uint32_t ticks;
    uint32_t offset;
    uint32_t slot;

    assert(mgr && t);

    if (expire < mgr->granularity) {
        /* Minimum delay is one tick (not `granularity` ticks). */
        ticks = 1;
    } else {
        ticks = (uint32_t)(expire / mgr->granularity);
    }
    offset = (ticks % mgr->slots);
    slot = ((mgr->index + offset) % mgr->slots);

    t->rotation = (ticks / mgr->slots);
    t->repeat = repeate;
    t->expire = expire;
    t->slot = slot;
    TAILQ_INSERT_TAIL((ev_timer_head_t*)ev_array_get(&mgr->timer_slots, slot), t, node);
    return 0;
}
/*
 * Unlink timer t from the wheel slot recorded in t->slot. Always returns 0.
 * The timer must currently be linked into that slot.
 */
int ev_tiemr_mgr_stop_timer(ev_timer_mgr_t* mgr, ev_timer_t* t)
{
    ev_timer_head_t* owner = (ev_timer_head_t*)ev_array_get(&mgr->timer_slots, t->slot);

    TAILQ_REMOVE(owner, t, node);
    return 0;
}
/*
 * Advance the wheel to time `now` (same unit as mgr->update_time).
 *
 * For each whole tick elapsed since mgr->update_time the current slot is
 * processed: timers with rotations left are decremented and put back, the
 * others have their callback invoked. A callback returning non-zero, or a
 * timer with repeat == 0, is not re-armed; otherwise the timer is started
 * again with its stored delay. Always returns 0.
 */
int ev_timer_mgr_tick(ev_timer_mgr_t* mgr, int64_t now)
{
    int32_t span;
    int32_t i;

    span = (int32_t)((now - mgr->update_time) / mgr->granularity);
    if (span == 0) {
        return 0;
    }

    for (i = 0; i < span; ++i) {
        ev_timer_t *item;
        ev_timer_t *temp;
        ev_timer_head_t swap;
        ev_timer_head_t* head = (ev_timer_head_t*)ev_array_get(&mgr->timer_slots, mgr->index);

        /* Detach the slot's timers so re-inserted ones are not revisited in this pass. */
        TAILQ_INIT(&swap);
        TAILQ_SWAP(head, &swap, ev_timer_s, node);
        TAILQ_FOREACH_SAFE(item, &swap, node, temp) {
            TAILQ_REMOVE(&swap, item, node);
            if (item->rotation > 0) {
                item->rotation--;
                TAILQ_INSERT_TAIL(head, item, node);
            } else {
                if (item->job.cb) {
                    if (item->job.cb(item->job.ptr) != 0) {
                        continue;
                    }
                }
                if (item->repeat) {
                    ev_timer_mgr_start_timer(mgr, item, item->expire, item->repeat);
                }
            }
        }
        /* Modify the index exactly once per statement (the old `x = ++x % n` form was unsequenced). */
        mgr->index = (mgr->index + 1) % mgr->slots;
    }

    if (span > 0) {
        /* Advance by the ticks actually processed so the partial tick is not lost. */
        mgr->update_time += (int64_t)span * mgr->granularity;
    } else {
        /* Time moved backwards: resynchronise. */
        mgr->update_time = now;
    }
    return 0;
}
//////////////////////////////////////////////////////////////////////////////////////////////////
////////////////////// async //////////////////////////////////////////////////////////////////////
/*
 * Create an async wake-up handle for loop ev.
 *
 * A non-blocking, close-on-exec pipe is created; its read end is registered
 * with the loop's poller and its write end is used by ev_async_post() to
 * wake the loop. Jobs are queued in async->watch_jobs under watch_mtx.
 *
 * Returns NULL on any failure, with every partially acquired resource
 * released.
 */
ev_async_t* ev_async_new(ev_loop_t* ev)
{
    int fds[2];
    ev_async_t* async;

    async = (ev_async_t*)calloc(1, sizeof(*async));
    if (async == NULL) {
        return NULL;
    }
    if (pthread_mutex_init(&async->watch_mtx, NULL) != 0) {
        perror("ev_async_new: pthread_mutex_init error\n");
        free(async);
        return NULL;
    }

#ifdef USE_KQUEUE
    if (pipe(fds) < 0) {
        perror("ev_async_new: pipe error\n");
        goto fail_mutex;
    }
    /* O_NONBLOCK is a status flag (F_SETFL); close-on-exec is the FD_CLOEXEC descriptor flag (F_SETFD). */
    if (ev_socket_nonblocing(fds[0]) < 0 || ev_socket_nonblocing(fds[1]) < 0 ||
        fcntl(fds[0], F_SETFD, FD_CLOEXEC) < 0 || fcntl(fds[1], F_SETFD, FD_CLOEXEC) < 0) {
        perror("ev_async_new: fcntl error\n");
        goto fail_pipe;
    }
#else
    if (pipe2(fds, O_NONBLOCK | O_CLOEXEC) < 0) {
        perror("ev_async_new: pipe2 error\n");
        goto fail_mutex;
    }
#endif

    if (ev_array_init(&async->watch_jobs, 1, sizeof(ev_job_t)) != 0) {
        LOG_ERROR("ev_async_new: ev_array_init(async->watch_jobs, 64, sizeof(ev_job_t)) error\n");
        goto fail_pipe;
    }

    async->fd = fds[0];
    async->wfd = fds[1];
    async->type = EV_ASYNC;
    async->ev = ev;
    async->refc = 1;
    if (ev_event_add_in(ev->evp, async->fd, (ev_stream_t*)async) < 0) {
        /* Without read interest the handle could never wake the loop. */
        ev_array_deinit(&async->watch_jobs);
        goto fail_pipe;
    }
    return async;

fail_pipe:
    close(fds[0]);
    close(fds[1]);
fail_mutex:
    pthread_mutex_destroy(&async->watch_mtx);
    free(async);
    return NULL;
}
/*
 * Handle readability of the wake-up pipe: drain it, then take the queued
 * jobs out from under the mutex and run their callbacks unlocked.
 *
 * Always returns 0. If a replacement queue cannot be allocated the jobs stay
 * queued and are picked up on a later wake-up.
 */
static int ev__async_read(ev_async_t* async)
{
    ev_array_t batch;
    char buf[1024];
    int size;
    int n;
    int i;

    LOG_ENTER_FN;

    /* Drain the pipe: keep reading while each read fills the whole buffer. */
    for (;;) {
        do {
            n = read(async->fd, buf, sizeof(buf));
        } while (n < 0 && errno == EINTR);
        if (n != (int)sizeof(buf)) {
            break;
        }
    }

    pthread_mutex_lock(&async->watch_mtx);
    if (ev_array_n(&async->watch_jobs) == 0) {
        pthread_mutex_unlock(&async->watch_mtx);
        return 0;
    }
    /* Prepare the replacement queue before swapping so the shared queue is never left uninitialised. */
    if (ev_array_init(&batch, 1, sizeof(ev_job_t)) != 0) {
        pthread_mutex_unlock(&async->watch_mtx);
        LOG_ERROR("ev__async_read: ev_array_init error\n");
        return 0;
    }
    ev_array_swap(&async->watch_jobs, &batch);
    pthread_mutex_unlock(&async->watch_mtx);

    size = ev_array_n(&batch);
    for (i = 0; i < size; ++i) {
        ev_job_t* job = ev_array_get(&batch, i);
        if (job->cb) {
            job->cb(job->ptr);
        }
    }
    ev_array_deinit(&batch);
    return 0;
}
/*
 * Stop an async handle: remove read interest for the pipe's read end from
 * the loop's poller and close both pipe ends. Always returns 0.
 */
static int ev__async_stop(ev_async_t* async)
{
    LOG_ENTER_FN;

    ev_stream_t* as_stream = (ev_stream_t*)async;

    ev_event_del_in(async->ev->evp, async->fd, as_stream);
    close(async->fd);
    close(async->wfd);
    return 0;
}
/*
 * Release the resources owned by an async handle: the job queue storage and
 * the mutex created in ev_async_new(). The handle itself is freed by the
 * caller (ev__stream_force_del). Always returns 0.
 */
static int ev__async_free(ev_async_t* async)
{
    LOG_ENTER_FN;
    ev_array_deinit(&async->watch_jobs);
    pthread_mutex_destroy(&async->watch_mtx);
    return 0;
}
/*
 * Queue a copy of *op on the async handle and wake the loop by writing one
 * byte to the pipe.
 *
 * Returns 0 when the job was queued and the wake-up byte was written (or the
 * pipe was already full: EAGAIN/EWOULDBLOCK), -1 when the job could not be
 * queued. Any other write outcome terminates the process with exit(1).
 */
int ev_async_post(ev_async_t* async, ev_job_t* op)
{
    const char* wake_byte = "";
    const int wake_len = 1;
    ev_job_t* slot;
    int written;

    pthread_mutex_lock(&async->watch_mtx);
    slot = ev_array_push(&async->watch_jobs);
    if (slot == NULL) {
        LOG_ERROR("ev__async_post_job error\n");
        pthread_mutex_unlock(&async->watch_mtx);
        return -1;
    }
    *slot = *op;
    pthread_mutex_unlock(&async->watch_mtx);

    do {
        written = write(async->wfd, wake_byte, wake_len);
    } while (written == -1 && errno == EINTR);

    if (written == wake_len) {
        return 0;
    }
    /* A full pipe already guarantees a pending wake-up. */
    if (written == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
        return 0;
    }
    exit(1);
    return 0;
}
// timeout op
/*
 * Give stream c a deadline of now + max_timeout (seconds, from the default
 * configuration) and append it to its loop's timeout queue. Does nothing
 * when max_timeout is not positive.
 */
void ev_add_timeout_watch(ev_tcp_t* c)
{
    if (ev_default_config()->max_timeout > 0) {
        c->timeout = (uint32_t)time(NULL) + ev_default_config()->max_timeout;
        TAILQ_INSERT_TAIL(&c->ev->streams_timeout, c, timeout_tqe);
    }
}
/*
 * Remove stream c from its loop's timeout queue. Does nothing when timeouts
 * are disabled (max_timeout <= 0) or when c is not queued.
 *
 * "Not queued" is recognised by a NULL tqe_prev link: ev_tcp_new() zeroes
 * the stream, and the links are cleared again here after removal. Without
 * this check, removing a never-queued stream would dereference NULL.
 */
void ev_del_timeout_watch(ev_tcp_t* c)
{
    if (ev_default_config()->max_timeout <= 0) {
        return;
    }
    if (c->timeout_tqe.tqe_prev == NULL) {
        return;
    }
    TAILQ_REMOVE(&c->ev->streams_timeout, c, timeout_tqe);
    c->timeout_tqe.tqe_next = NULL;
    c->timeout_tqe.tqe_prev = NULL;
}
/*
 * Restart the idle deadline of stream c: remove it from the timeout queue
 * and append it again with a fresh deadline. Does nothing when max_timeout
 * is not positive.
 */
void ev_reset_timeout_watch(ev_tcp_t* c)
{
    if (ev_default_config()->max_timeout > 0) {
        ev_del_timeout_watch(c);
        ev_add_timeout_watch(c);
    }
}
//////////////////////////////////////////////////////////////////////////////////////////////////
////////////////////// tcp //////////////////////////////////////////////////////////////////////
/*
 * Create a TCP stream object for descriptor fd on loop ev.
 *
 * The stream starts zeroed, with input and output buffers of the configured
 * trunk size, a reference count of 1, and is appended to the loop's stream
 * list. The descriptor is not registered with the poller here.
 *
 * Returns NULL when memory is exhausted; fd is not closed in that case.
 */
ev_tcp_t* ev_tcp_new(ev_loop_t* ev, int fd)
{
    ev_tcp_t* c;

    c = (ev_tcp_t*)calloc(1, sizeof(*c));
    if (c == NULL) {
        LOG_ERROR("client_t* c = malloc(sizeof(struct client_t)) == NULL\n");
        return NULL;
    }

    c->in_buf = ev_buf_new2(ev_default_config()->buf_trunk_size);
    c->out_buf = ev_buf_new2(ev_default_config()->buf_trunk_size);
    if (c->in_buf == NULL || c->out_buf == NULL) {
        /* Later code dereferences both buffers unconditionally, so fail here instead. */
        if (c->in_buf != NULL) {
            ev_buf_del(c->in_buf);
        }
        if (c->out_buf != NULL) {
            ev_buf_del(c->out_buf);
        }
        free(c);
        return NULL;
    }

    c->type = EV_TCP;
    c->fd = fd;
    c->ev = ev;
    c->refc = 1;
    ev_loop_put_stream(ev, (ev_stream_t*)c, 1);
    return c;
}
/*
 * Stop a TCP stream: drop its timeout watch, free both buffers, remove its
 * read and write interest from the poller, close the descriptor and finally
 * invoke stop_cb when set. Always returns 0.
 *
 * Note that the buffers are already freed by the time stop_cb runs.
 */
static int ev__tcp_stop(ev_tcp_t* c)
{
    LOG_ENTER_FN;

    ev_stream_t* as_stream = (ev_stream_t*)c;

    ev_del_timeout_watch(c);
    ev_buf_del(c->in_buf);
    ev_buf_del(c->out_buf);
    ev_event_del_in(c->ev->evp, c->fd, as_stream);
    ev_event_del_out(c->ev->evp, c->fd, as_stream);
    close(c->fd);
    if (c->stop_cb != NULL) {
        c->stop_cb(c);
    }
    return 0;
}
/*
 * Final release hook for a TCP stream: invokes free_cb when set. The stream
 * memory itself is freed by the caller. Always returns 0.
 */
static int ev__tcp_free(ev_tcp_t* c)
{
    LOG_ENTER_FN;
    if (c->free_cb != NULL) {
        c->free_cb(c);
    }
    return 0;
}
/*
 * Handle writability of a TCP stream.
 *
 * With pending output: restarts the idle deadline, calls write_cb (before
 * sending), sends as much as the socket accepts and consumes it from the
 * output buffer; when the buffer drains, write interest is removed.
 * Without pending output: only removes write interest.
 *
 * Returns 0 on success or EAGAIN. On a send error, or when removing write
 * interest fails, the stream is released with ev_stream_del() and -1 is
 * returned.
 */
static int ev__tcp_write(ev_tcp_t* c)
{
    ev_stream_t* as_stream = (ev_stream_t*)c;
    const char* pending;
    size_t pending_len;
    int sent;

    if (ev_buf_empty(c->out_buf)) {
        if (ev_event_del_out(c->ev->evp, c->fd, as_stream) < 0) {
            ev_stream_del(as_stream);
            return -1;
        }
        return 0;
    }

    /* The data pointer and length are captured before write_cb runs. */
    pending = ev_buf_lock(c->out_buf);
    pending_len = ev_buf_size(c->out_buf);
    ev_reset_timeout_watch(c);
    if (c->write_cb) {
        c->write_cb(c);
    }

    do {
        sent = send(c->fd, pending, pending_len, 0);
    } while (sent < 0 && errno == EINTR);

    if (sent < 0) {
        if (errno == EAGAIN) {
            return 0;
        }
        LOG_ERROR("send data error");
        ev_stream_del(as_stream);
        return -1;
    }

    if (sent > 0) {
        int remain = ev_buf_seek(c->out_buf, sent);
        if (remain <= 0 && ev_event_del_out(c->ev->evp, c->fd, as_stream) < 0) {
            ev_stream_del(as_stream);
            return -1;
        }
    }
    return 0;
}
/*
 * Handle readability of a TCP stream: restart the idle deadline, receive up
 * to 4096 bytes, append them to the input buffer and call read_cb when set.
 *
 * Returns 0 after data was received or when the socket would block. When the
 * peer closed the connection or recv fails, the stream is released with
 * ev_stream_del() and -1 is returned.
 */
static int ev__tcp_read(ev_tcp_t* c)
{
    LOG("begin client_t read\n");
    char chunk[4096] = {0};
    int got;

    ev_reset_timeout_watch(c);
    do {
        got = recv(c->fd, chunk, sizeof(chunk), 0);
    } while (got < 0 && errno == EINTR);

    if (got > 0) {
        ev_buf_append(c->in_buf, chunk, got);
        if (c->read_cb) {
            c->read_cb(c);
        }
        return 0;
    }

    if (got == 0) {
        LOG_INFO("peer close\n");
        ev_stream_del((ev_stream_t*)c);
        return -1;
    }

    if (errno == EWOULDBLOCK || errno == EAGAIN) {
        return 0;
    }
    ev_stream_del((ev_stream_t*)c);
    return -1;
}
//////////////////////////////////////////////////////////////////////////////////////////////////
////////////////////// loop //////////////////////////////////////////////////////////////////////
/*
 * Run the jobs queued with ev_loop_put_pending_queue().
 *
 * The queued jobs are exchanged for a fresh empty queue while holding
 * pending_mtx and executed after the mutex is released. Jobs without a
 * callback are skipped. If the fresh queue cannot be allocated the jobs stay
 * queued for a later pass.
 */
static void ev__loop_pending_queue_cb(ev_loop_t* ev)
{
    ev_array_t batch;
    int count;
    int i;

    pthread_mutex_lock(&ev->pending_mtx);
    if (ev_array_n(&ev->pending_jobs) == 0) {
        pthread_mutex_unlock(&ev->pending_mtx);
        return;
    }
    /* Build the replacement first so the shared queue is always valid. */
    if (ev_array_init(&batch, 64, sizeof(ev_job_t)) != 0) {
        pthread_mutex_unlock(&ev->pending_mtx);
        LOG_ERROR("ev__loop_pending_queue_cb: ev_array_init error\n");
        return;
    }
    ev_array_swap(&ev->pending_jobs, &batch);
    pthread_mutex_unlock(&ev->pending_mtx);

    count = ev_array_n(&batch);
    for (i = 0; i < count; ++i) {
        ev_job_t* job = ev_array_get(&batch, i);
        if (job->cb) {
            job->cb(job->ptr);
        }
    }
    ev_array_deinit(&batch);
}
/*
 * Release every stream at the front of the timeout queue whose deadline has
 * passed. The scan stops at the first stream that has not expired yet
 * (streams are appended with increasing deadlines).
 */
static void ev__loop_watch_timeout(ev_loop_t* ev)
{
    const uint32_t now = time(NULL);
    ev_tcp_t* cur;
    ev_tcp_t* next;

    TAILQ_FOREACH_SAFE(cur, &ev->streams_timeout, timeout_tqe, next) {
        if (now < cur->timeout) {
            break;
        }
        TAILQ_REMOVE(&ev->streams_timeout, cur, timeout_tqe);
        LOG_INFO("%d time out", cur->fd);
        ev_stream_del((ev_stream_t*)cur);
    }
}
/*
 * Thread entry point of an event loop (ptr is the ev_loop_t).
 *
 * Until ev->exit is set, each iteration expires idle streams, runs pending
 * jobs, advances the timer wheel and polls for I/O with a 100 ms timeout.
 */
static void* ev__loop_run_loop(void* ptr)
{
    ev_loop_t* ev = (ev_loop_t*)ptr;

    /* pthread_t is not an int: cast explicitly and use a matching conversion. */
    LOG("run thread: %02lx succ", (unsigned long)pthread_self());
    while (!ev->exit) {
        ev__loop_watch_timeout(ev);
        ev__loop_pending_queue_cb(ev);
        ev_timer_mgr_tick(ev->timer_mgr, ev_msec_now());
        ev_event_poll(ev->evp, 100);
    }
    LOG("thread %02lx exit\n", (unsigned long)pthread_self());
    pthread_exit(NULL);
}
/*
 * Poller callback: dispatch the events in mask to the stream passed as data.
 *
 * An error event releases the stream. Read and write events are forwarded to
 * the type-specific handlers. Returns 0 normally and -1 once the stream has
 * been released.
 *
 * The TCP handlers call ev_stream_del() themselves before returning a
 * negative value, so it must not be called a second time here: that would
 * drop the reference count twice and touch freed memory.
 */
static int on_loop_cb(void* data, unsigned int mask)
{
    ev_stream_t* c = (ev_stream_t*)data;

    if (c == NULL) {
        return 0;
    }
    if (mask & kEventError) {
        ev_stream_del(c);
        return -1;
    }
    if (mask & kEventRead) {
        if (c->type == EV_TCP) {
            if (ev__tcp_read((ev_tcp_t*)c) < 0) {
                return -1;
            }
        } else if (c->type == EV_ASYNC) {
            /* The async handler never releases the stream itself. */
            if (ev__async_read((ev_async_t*)c) < 0) {
                ev_stream_del(c);
                return -1;
            }
        }
    }
    if (mask & kEventWrite) {
        if (c->type == EV_TCP) {
            if (ev__tcp_write((ev_tcp_t*)c) < 0) {
                return -1;
            }
        }
    }
    return 0;
}
/*
 * Create an event loop and start its thread.
 *
 * Sets up the stream lists, the poller (32 events), the pending-job queue
 * (64 entries), a 256-slot timer wheel with granularity 100 and both
 * mutexes, then starts ev__loop_run_loop in a new thread.
 *
 * Returns NULL on any failure, after releasing whatever had been set up.
 */
ev_loop_t* ev_loop_new()
{
    int pending_mtx_ready = 0;
    int stream_mtx_ready = 0;
    ev_loop_t* ev = (ev_loop_t*)calloc(1, sizeof(*ev));

    if (ev == NULL) {
        return NULL;
    }
    TAILQ_INIT(&ev->streams_list);
    TAILQ_INIT(&ev->streams_timeout);

    ev->evp = ev_event_new(32, on_loop_cb);
    if (ev->evp == NULL) {
        /* The loop thread polls ev->evp unconditionally, so this is fatal for the loop. */
        perror("create evbase error\n");
        goto err;
    }
    if (ev_array_init(&ev->pending_jobs, 64, sizeof(ev_job_t)) != 0) {
        goto err;
    }
    ev->timer_mgr = ev_timer_mgr_new(256, 100);
    if (ev->timer_mgr == NULL) {
        goto err;
    }
    if (pthread_mutex_init(&ev->pending_mtx, NULL) != 0) {
        goto err;
    }
    pending_mtx_ready = 1;
    if (pthread_mutex_init(&ev->stream_mtx, NULL) != 0) {
        goto err;
    }
    stream_mtx_ready = 1;
    if (pthread_create(&ev->cur_thread, NULL, ev__loop_run_loop, (void*)ev) != 0) {
        perror("pthread_create error\n");
        goto err;
    }
    return ev;

err:
    perror("ev_loop_new error\n");
    if (stream_mtx_ready) {
        pthread_mutex_destroy(&ev->stream_mtx);
    }
    if (pending_mtx_ready) {
        pthread_mutex_destroy(&ev->pending_mtx);
    }
    if (ev->timer_mgr) {
        ev_timer_mgr_del(ev->timer_mgr);
    }
    /* Safe on a never-initialised queue: the zeroed storage pointer is NULL. */
    ev_array_deinit(&ev->pending_jobs);
    if (ev->evp) {
        ev_event_destroy(ev->evp);
    }
    free(ev);
    return NULL;
}
/*
 * Destroy an event loop: release every stream on its stream list, destroy
 * the poller and both mutexes, then free the loop.
 *
 * NOTE(review): the loop thread is neither signalled nor joined here, and
 * the pending-job queue and timer manager are not released - confirm the
 * caller stops the thread first.
 */
void ev_loop_del(ev_loop_t* ev)
{
    ev_stream_t* stream;
    ev_stream_t* following;

    TAILQ_FOREACH_SAFE(stream, &ev->streams_list, node, following) {
        ev_stream_del(stream);
    }

    ev_event_destroy(ev->evp);

    pthread_mutex_destroy(&ev->pending_mtx);
    pthread_mutex_destroy(&ev->stream_mtx);

    free(ev);
}
/*
 * Queue a copy of *op for execution by the loop thread (see
 * ev__loop_pending_queue_cb). The job is dropped, with an error logged, when
 * the queue cannot grow.
 */
void ev_loop_put_pending_queue(ev_loop_t* ev, ev_job_t* op)
{
    ev_job_t* slot;

    pthread_mutex_lock(&ev->pending_mtx);
    slot = ev_array_push(&ev->pending_jobs);
    if (slot != NULL) {
        *slot = *op;
        pthread_mutex_unlock(&ev->pending_mtx);
        return;
    }
    LOG_ERROR("ev_loop_put_peeding_queue error\n");
    pthread_mutex_unlock(&ev->pending_mtx);
}
/*
 * Link stream c into the loop's stream list under stream_mtx and bump the
 * stream counter. put_tail selects the tail (non-zero) or the head (zero).
 */
void ev_loop_put_stream(ev_loop_t* ev, ev_stream_t* c, int put_tail)
{
    pthread_mutex_lock(&ev->stream_mtx);
    if (!put_tail) {
        TAILQ_INSERT_HEAD(&ev->streams_list, c, node);
    } else {
        TAILQ_INSERT_TAIL(&ev->streams_list, c, node);
    }
    ev->nstreams += 1;
    pthread_mutex_unlock(&ev->stream_mtx);
}
/*
 * Unlink stream c from the loop's stream list and decrement the stream
 * counter. Only EV_TCP streams are touched; other types are ignored.
 * c must belong to ev (asserted).
 */
void ev_loop_remove_stream(ev_loop_t* ev, ev_stream_t* c)
{
    assert(c->ev == ev);

    if (c->type != EV_TCP) {
        return;
    }
    pthread_mutex_lock(&ev->stream_mtx);
    TAILQ_REMOVE(&ev->streams_list, c, node);
    ev->nstreams -= 1;
    pthread_mutex_unlock(&ev->stream_mtx);
}
/*
 * Accept one connection from listening socket sfd and attach it to loop ev.
 *
 * The new socket is made non-blocking with TCP_NODELAY, wrapped in a TCP
 * stream, given an idle deadline and registered for read events.
 *
 * Returns 0 when a connection was attached or none was pending
 * (EAGAIN/EWOULDBLOCK), -1 on any error. On error the accepted socket is
 * closed.
 */
int ev_loop_accept(ev_loop_t* ev, int sfd)
{
    int fd;
    ev_tcp_t* c;

    do {
        fd = accept(sfd, NULL, NULL);
    } while (fd < 0 && errno == EINTR);

    if (fd < 0) {
        if (errno == EAGAIN || errno == EWOULDBLOCK) {
            return 0;
        }
        LOG_ERROR("accept fd < 0 error\n");
        return -1;
    }

    LOG("incomming connect: %d\n", fd);
    if (ev_socket_nonblocing(fd) < 0) {
        LOG_ERROR("connect fd set nonblocking error\n");
        close(fd);
        return -1;
    }
    if (ev_socket_nodelay(fd) < 0) {
        LOG_ERROR("connect fd set nodelay error\n");
        close(fd);
        return -1;
    }

    c = ev_tcp_new(ev, fd);
    if (c == NULL) {
        LOG_ERROR("create client_t error\n");
        close(fd);
        return -1;
    }
    ev_add_timeout_watch(c);
    if (ev_event_add_in(ev->evp, fd, (ev_stream_t*)c) < 0) {
        /* An unregistered stream would never be read; releasing it also closes fd. */
        LOG_ERROR("connect fd add read event error\n");
        ev_stream_del((ev_stream_t*)c);
        return -1;
    }
    return 0;
}
/*
 * Final destruction of a stream whose reference count reached zero
 * (asserted): run the type-specific free hook, unlink the stream from its
 * loop and free it. Always returns 1.
 */
static int ev__stream_force_del(void* ptr)
{
    ev_stream_t* stream = (ev_stream_t*)ptr;

    assert(stream->refc == 0);

    if (stream->type == EV_ASYNC) {
        ev__async_free((ev_async_t*)stream);
    } else if (stream->type == EV_TCP) {
        ev__tcp_free((ev_tcp_t*)stream);
    }

    ev_loop_remove_stream(stream->ev, stream);
    free(stream);
    return 1;
}
/*
 * Drop one reference to stream c.
 *
 * The first caller to flip c->stop from 0 to 1 runs the type-specific stop
 * routine. The caller that drops the last reference destroys the stream:
 * directly when running on the loop thread, otherwise by queueing
 * ev__stream_force_del on the loop's pending-job queue.
 */
void ev_stream_del(ev_stream_t* c)
{
    const int first_stop = __sync_bool_compare_and_swap(&c->stop, 0, 1);

    if (first_stop) {
        if (c->type == EV_TCP) {
            ev__tcp_stop((ev_tcp_t*)c);
        } else if (c->type == EV_ASYNC) {
            ev__async_stop((ev_async_t*)c);
        }
    }

    /* Only the caller that takes the count from 1 to 0 goes on to destroy. */
    if (__sync_fetch_and_sub(&c->refc, 1) != 1) {
        return;
    }

    if (pthread_self() == c->ev->cur_thread) {
        ev__stream_force_del(c);
        return;
    }

    LOG("ev_loop_put_pending_queue");
    ev_job_t op = { c, ev__stream_force_del };
    ev_loop_put_pending_queue(c->ev, &op);
}
/*
 * Start a non-blocking IPv4 TCP connection to ip:port on loop ev.
 *
 * ip   - host name or dotted address, resolved with gethostbyname().
 * port - TCP port in host byte order.
 *
 * Returns the new stream with connected = 1 when connect() completed at
 * once, or connecting = 1 when it is still in progress (EINPROGRESS).
 * Returns NULL on failure; the socket is closed in that case.
 *
 * The stream is put on the idle-timeout queue, as accepted connections are,
 * because the read, write and stop paths all remove it from that queue.
 */
ev_tcp_t* ev_tcp_connect(ev_loop_t* ev, const char* ip, uint16_t port)
{
    ev_tcp_t* tcp;
    struct sockaddr_in sa;
    struct hostent* ent;
    int fd;

    fd = socket(AF_INET, SOCK_STREAM, 0);
    if (fd == -1) {
        LOG_ERROR("create tcp socket error %d", errno);
        return NULL;
    }
    if (ev_socket_nonblocing(fd) < 0) {
        /* A blocking socket would stall the event loop in connect(). */
        LOG_ERROR("set tcp socket nonblocking error %d", errno);
        close(fd);
        return NULL;
    }
    ev_socket_reuseaddr(fd);

    ent = gethostbyname(ip);
    if (ent == NULL) {
        /* The resolver reports through h_errno, not errno. */
        LOG_ERROR("gethostbyname(%s) error: %d", ip, h_errno);
        close(fd);
        return NULL;
    }
    if (ent->h_addrtype != AF_INET || ent->h_length != (int)sizeof(sa.sin_addr)) {
        LOG_ERROR("gethostbyname(%s) error: no IPv4 address", ip);
        close(fd);
        return NULL;
    }

    memset(&sa, 0, sizeof(sa));
    sa.sin_family = AF_INET;
    sa.sin_port = htons(port);
    memcpy(&sa.sin_addr, ent->h_addr_list[0], sizeof(sa.sin_addr));

    tcp = ev_tcp_new(ev, fd);
    if (tcp == NULL) {
        close(fd);
        return NULL;
    }
    ev_add_timeout_watch(tcp);

    if (connect(fd, (const struct sockaddr*)&sa, sizeof(sa)) != 0) {
        if (errno != EINPROGRESS) {
            LOG_ERROR("connect (%s:%d) error: %d", ip, port, errno);
            ev_stream_del((ev_stream_t*)tcp);
            return NULL;
        }
        tcp->connecting = 1;
        return tcp;
    }
    tcp->connected = 1;
    return tcp;
}
/*
 * Queue len bytes of data for sending on stream self.
 *
 * If the stream is still marked as connecting it is first switched to
 * connected and connect_cb is invoked. Write interest is requested when the
 * output buffer was empty. Returns the output buffer's size after the
 * append, or 0 when data is NULL or len is 0.
 */
size_t ev_tcp_write(ev_tcp_t* self, const char* data, size_t len)
{
    if (self->connecting == 1) {
        self->connecting = 0;
        self->connected = 1;
        if (self->connect_cb) {
            self->connect_cb(self);
        }
    }

    if (!data || !len) {
        return 0;
    }

    if (ev_buf_empty(self->out_buf)) {
        ev_event_add_out(self->ev->evp, self->fd, (ev_stream_t*)self);
    }
    return ev_buf_append(self->out_buf, data, len);
}
static void* work_run(void* arg)
{
ev_thread_pool_t* pool = (ev_thread_pool_t*)arg;
while(!pool->exit) {
ev_job_t* job;
pthread_mutex_lock(&pool->mtx);
while(ev_array_n(&pool->jobs) == 0) {
++pool->idle_cnt;
pthread_cond_wait(&pool->cond, &pool->mtx);
--pool->idle_cnt;
if(pool->exit) {
goto exit;
}
}
LOG("run thread pool job");
job = ev_array_pop(&pool->jobs);
pthread_mutex_unlock(&pool->mtx);
if(job->cb) {
job->cb(job->ptr);
}
}
exit:
return NULL;
}
/*
 * Create a thread pool with up to nthread worker threads (negative values
 * are treated as 0).
 *
 * Failure to allocate or initialise the pool is fatal: a diagnostic is
 * printed and the process exits with status 1. If some worker threads cannot
 * be started, the pool is returned with thread_cnt set to the number that
 * are actually running.
 */
ev_thread_pool_t* ev_thread_pool_new(int nthread)
{
    ev_thread_pool_t* pool;
    size_t bytes;
    int started;

    if (nthread < 0) {
        nthread = 0;
    }
    bytes = sizeof(ev_thread_pool_t) + sizeof(pthread_t) * (size_t)nthread;
    pool = (ev_thread_pool_t*)calloc(1, bytes);
    if (pool == NULL) {
        perror("create thread pool error");
        exit(1);
    }

    pool->init_queue_cnt = 16;
    if (pthread_mutex_init(&pool->mtx, NULL) != 0 ||
        pthread_cond_init(&pool->cond, NULL) != 0 ||
        ev_array_init(&pool->jobs, pool->init_queue_cnt, sizeof(ev_job_t)) != 0) {
        perror("init thread pool error");
        exit(1);
    }

    for (started = 0; started < nthread; ++started) {
        if (pthread_create(&pool->thread[started], NULL, work_run, (void*)pool) != 0) {
            perror("create thread pool worker error");
            break;
        }
    }
    /* Only threads that were really created may be joined later. */
    pool->thread_cnt = started;
    return pool;
}
/*
 * Queue a copy of *op on the pool and wake one idle worker, if any.
 * The job is dropped, with an error logged, when the queue cannot grow.
 */
void ev_thread_pool_post(ev_thread_pool_t* self, ev_job_t* op)
{
    ev_job_t* job;

    LOG("enter ev_thread_pool_post");
    pthread_mutex_lock(&self->mtx);
    job = ev_array_push(&self->jobs);
    if (job == NULL) {
        pthread_mutex_unlock(&self->mtx);
        LOG_ERROR("ev_thread_pool_post error\n");
        return;
    }
    *job = *op;
    if (self->idle_cnt > 0) {
        LOG("enter pthread_cond_signal");
        pthread_cond_signal(&self->cond);
    }
    pthread_mutex_unlock(&self->mtx);
}
/*
 * Return the process-wide thread pool, creating it on first use with the
 * default configuration's default_npool worker count.
 */
ev_thread_pool_t* ev_default_thread_pool()
{
    static ev_thread_pool_t* shared_pool = NULL;

    if (shared_pool != NULL) {
        return shared_pool;
    }
    shared_pool = ev_thread_pool_new(ev_default_config()->default_npool);
    return shared_pool;
}
/*
 * Shut down a thread pool: ask the workers to exit, wait for all of them,
 * then release the job queue, the synchronisation objects and the pool.
 */
void ev_thread_pool_destroy(ev_thread_pool_t* pool)
{
    int i;

    /* Set the flag and broadcast under the mutex so a worker about to wait cannot miss the wake-up. */
    pthread_mutex_lock(&pool->mtx);
    pool->exit = 1;
    pthread_cond_broadcast(&pool->cond);
    pthread_mutex_unlock(&pool->mtx);

    for (i = 0; i < pool->thread_cnt; ++i) {
        pthread_join(pool->thread[i], NULL);
    }

    ev_array_deinit(&pool->jobs);
    pthread_mutex_destroy(&pool->mtx);
    pthread_cond_destroy(&pool->cond);
    free(pool);
}
/*
 * Create a poller (kqueue when USE_KQUEUE is defined, epoll otherwise).
 *
 * nevensts - capacity of the event array handed to the kernel per poll;
 *            must be positive.
 * cb       - callback invoked by ev_event_poll() for every ready event.
 *
 * Returns NULL for a non-positive capacity or when memory is exhausted.
 * Failure to create the kernel queue is fatal (exit status 1).
 */
ev_event_t* ev_event_new(int nevensts, event_cb_pt cb)
{
    int evfd;
    ev_event_t* ev;

    if (nevensts <= 0) {
        return NULL;
    }
#ifdef USE_KQUEUE
    evfd = kqueue();
#else
    evfd = epoll_create(1024);
#endif
    if (evfd < 0) {
        perror("create evfd error\n");
        exit(1);
    }

    ev = (ev_event_t*)calloc(1, sizeof(*ev));
    if (ev == NULL) {
        close(evfd);
        return NULL;
    }
#ifdef USE_KQUEUE
    ev->events = (struct kevent*)malloc(sizeof(struct kevent) * (size_t)nevensts);
#else
    ev->events = (struct epoll_event*)malloc(sizeof(struct epoll_event) * (size_t)nevensts);
#endif
    if (ev->events == NULL) {
        close(evfd);
        free(ev);
        return NULL;
    }
    ev->evfd = evfd;
    ev->nevents = nevensts;
    ev->ev_cb = cb;
    return ev;
}
/*
 * Destroy a poller: free its event array, close the kernel queue descriptor
 * and free the object.
 */
void ev_event_destroy(ev_event_t* self) {
    if (self->events != NULL) {
        free(self->events);
    }
    close(self->evfd);
    free(self);
}
/*
 * Apply one add or delete operation for fd on the poller.
 *
 * ev   - kEventRead and/or kEventWrite.
 * op   - kEventOpAdd to register, anything else to remove.
 * data - user pointer delivered to the poll callback.
 *
 * kqueue keeps one filter per direction, so each direction is added or
 * deleted on its own. epoll keeps a single interest mask per descriptor, so
 * the requested directions are combined into one mask and applied with a
 * single call; adding to an already registered descriptor replaces its mask
 * (EPOLL_CTL_MOD) and deleting removes the descriptor entirely.
 *
 * Returns 0 on success, -1 on failure.
 */
static int ev__event_op(ev_event_t* self, int fd, int ev, int op, void* data)
{
#ifdef USE_KQUEUE
    struct kevent ke;
    int kop = (op == kEventOpAdd) ? EV_ADD | EV_CLEAR : EV_DELETE;

    if (ev & kEventRead) {
        EV_SET(&ke, fd, EVFILT_READ, kop, 0, 0, data);
        if (kevent(self->evfd, &ke, 1, NULL, 0, NULL) == -1) {
            return -1;
        }
    }
    if (ev & kEventWrite) {
        EV_SET(&ke, fd, EVFILT_WRITE, kop, 0, 0, data);
        if (kevent(self->evfd, &ke, 1, NULL, 0, NULL) == -1) {
            return -1;
        }
    }
#else
    struct epoll_event evp;
    int kop = (op == kEventOpAdd) ? EPOLL_CTL_ADD : EPOLL_CTL_DEL;

    memset(&evp, 0, sizeof(evp));
    evp.data.ptr = data;
    if (ev & kEventRead) {
        evp.events |= EPOLLIN;
    }
    if (ev & kEventWrite) {
        evp.events |= EPOLLOUT;
    }
    if (evp.events == 0) {
        return 0;
    }
    if (epoll_ctl(self->evfd, kop, fd, &evp) < 0) {
        if (kop != EPOLL_CTL_ADD || errno != EEXIST) {
            return -1;
        }
        if (epoll_ctl(self->evfd, EPOLL_CTL_MOD, fd, &evp) < 0) {
            return -1;
        }
    }
#endif
    return 0;
}

/*
 * Interest mask to register when enabling direction `want` on stream c.
 * On epoll the other direction must be carried along if it is active,
 * because the new mask replaces the old one.
 */
static int ev__event_enable_mask(ev_stream_t* c, int want, int other, int other_active)
{
#ifndef USE_KQUEUE
    if (c && other_active) {
        return want | other;
    }
#else
    (void)c;
    (void)other;
    (void)other_active;
#endif
    return want;
}

/*
 * Remove direction `drop` for fd. On epoll, when the other direction (`keep`)
 * is still active the registration is narrowed to it instead of deleted,
 * since a delete would unregister the descriptor completely.
 */
static int ev__event_disable(ev_event_t* self, int fd, int drop, int keep, int keep_active, ev_stream_t* c)
{
#ifndef USE_KQUEUE
    if (c && keep_active) {
        /* Add on a registered descriptor falls back to a modify, leaving only `keep`. */
        return ev__event_op(self, fd, keep, kEventOpAdd, c);
    }
#else
    (void)keep;
    (void)keep_active;
    (void)c;
#endif
    return ev__event_op(self, fd, drop, kEventOpDel, NULL);
}

/*
 * Request read events for fd. No-op when c already has read interest.
 * Returns 0 on success, -1 on failure.
 */
int ev_event_add_in(ev_event_t* self, int fd, ev_stream_t* c)
{
    int mask;

    if (c && c->recv_active) {
        return 0;
    }
    mask = ev__event_enable_mask(c, kEventRead, kEventWrite, c ? c->send_active : 0);
    if (ev__event_op(self, fd, mask, kEventOpAdd, c) < 0) {
        LOG_ERROR("ev_event_add_in error");
        return -1;
    }
    if (c) {
        c->recv_active = 1;
    }
    return 0;
}

/*
 * Stop read events for fd. No-op when c has no read interest.
 * Returns 0 on success, -1 on failure.
 */
int ev_event_del_in(ev_event_t* self, int fd, ev_stream_t* c)
{
    if (c && c->recv_active == 0) {
        return 0;
    }
    if (ev__event_disable(self, fd, kEventRead, kEventWrite, c ? c->send_active : 0, c) < 0) {
        LOG_ERROR("ev_event_del_in error");
        return -1;
    }
    if (c) {
        c->recv_active = 0;
    }
    return 0;
}

/*
 * Request write events for fd. No-op when c already has write interest.
 * A failure is logged; the function returns 0 in every case.
 */
int ev_event_add_out(ev_event_t* self, int fd, ev_stream_t* c)
{
    int mask;

    if (c && c->send_active) {
        return 0;
    }
    mask = ev__event_enable_mask(c, kEventWrite, kEventRead, c ? c->recv_active : 0);
    if (ev__event_op(self, fd, mask, kEventOpAdd, c) < 0) {
        LOG_ERROR("ev_event_add_out error");
    } else {
        if (c) {
            c->send_active = 1;
        }
    }
    return 0;
}

/*
 * Stop write events for fd. No-op when c has no write interest.
 * A failure is logged; the function returns 0 in every case.
 */
int ev_event_del_out(ev_event_t* self, int fd, ev_stream_t* c)
{
    if (c && c->send_active == 0) {
        return 0;
    }
    if (ev__event_disable(self, fd, kEventWrite, kEventRead, c ? c->recv_active : 0, c) < 0) {
        LOG_ERROR("ev_event_del_out error");
    } else {
        if (c) {
            c->send_active = 0;
        }
    }
    return 0;
}
/*
 * Make sure stream c has both read and write interest for fd.
 *
 * Registers whichever directions are missing. Returns 0 on success (or when
 * both were already active), -1 when c is NULL or the combined registration
 * fails; when only one direction is missing the result of ev_event_add_out /
 * ev_event_add_in is returned.
 */
int ev_event_add_all(ev_event_t* self, int fd, ev_stream_t* c)
{
    if (c == NULL) {
        return -1;
    }
    if (c->send_active == 1 && c->recv_active == 1) {
        return 0;
    }

    if (c->send_active == 0 && c->recv_active == 0) {
        if (ev__event_op(self, fd, kEventAll, kEventOpAdd, c) < 0) {
            LOG_ERROR("ev_event_add_all error");
            return -1;
        }
        c->send_active = 1;
        c->recv_active = 1;
        return 0;
    }

    if (c->send_active == 0) {
        return ev_event_add_out(self, fd, c);
    }
    return ev_event_add_in(self, fd, c);
}
/*
 * Grow the poller's event array to hold n events; never shrinks.
 *
 * Returns 0 on success or when no growth is needed. Returns -1 when the
 * reallocation fails; the existing array and capacity are then kept.
 */
int ev_event_resize(ev_event_t* self, int n)
{
    void* grown;

    if (self->nevents >= n) {
        return 0;
    }
#ifdef USE_KQUEUE
    grown = realloc(self->events, sizeof(struct kevent) * (size_t)n);
#else
    grown = realloc(self->events, sizeof(struct epoll_event) * (size_t)n);
#endif
    if (grown == NULL) {
        return -1;
    }
    self->events = grown;
    /* Record the new capacity so the next poll can actually use it. */
    self->nevents = n;
    return 0;
}
/*
 * Wait up to timeout milliseconds (negative: indefinitely) for events and
 * invoke the poller's callback for each ready descriptor with a mask built
 * from kEventError / kEventRead / kEventWrite.
 *
 * When the kernel filled the whole event array, the array is doubled for the
 * next call. Returns the number of events handled, 0 on timeout, and -1 on
 * error (including a zero result for an indefinite wait).
 */
int ev_event_poll(ev_event_t* self, int timeout)
{
    int n = 0;
    int i = 0;
#ifdef USE_KQUEUE
    do {
        if (timeout < 0) {
            n = kevent(self->evfd, NULL, 0, self->events, self->nevents, NULL);
        } else {
            struct timespec ts = { timeout / 1000L, (timeout % 1000L) * 1000000L };
            n = kevent(self->evfd, NULL, 0, self->events, self->nevents, &ts);
        }
    } while (n < 0 && errno == EINTR);

    if (n > 0) {
        for (i = 0; i < n; ++i) {
            int mask = 0;
            struct kevent* e = self->events + i;

            if (e->flags & EV_ERROR) {
                if (e->data == EBADF || e->data == EINVAL ||
                    e->data == ENOENT || e->data == EINTR) {
                    continue;
                }
                mask |= kEventError;
            }
            /* kqueue filters are distinct identifiers, not bit flags: compare, do not mask. */
            if (e->filter == EVFILT_READ) mask |= kEventRead;
            if (e->filter == EVFILT_WRITE) mask |= kEventWrite;
            if (self->ev_cb && mask != 0) {
                self->ev_cb(e->udata, mask);
            }
        }
        if (n == self->nevents) {
            ev_event_resize(self, self->nevents * 2);
        }
        return n;
    }
    if (n == 0) {
        if (timeout == -1) {
            LOG_ERROR("ev_event_poll kevent error");
            return -1;
        }
        return 0;
    }
    return -1;
#else
    do {
        n = epoll_wait(self->evfd, self->events, self->nevents, timeout);
    } while (n < 0 && errno == EINTR);

    if (n > 0) {
        for (i = 0; i < n; ++i) {
            int mask = 0;
            struct epoll_event* e = self->events + i;

            if (e->events & EPOLLERR) mask |= kEventError;
            /* Parenthesised: `&` binds tighter than `|`, so the unparenthesised test was always true. */
            if (e->events & (EPOLLIN | EPOLLHUP)) mask |= kEventRead;
            if (e->events & EPOLLOUT) mask |= kEventWrite;
            if (self->ev_cb && mask != 0) {
                self->ev_cb(e->data.ptr, mask);
            }
        }
        if (n == self->nevents) {
            ev_event_resize(self, self->nevents * 2);
        }
        return n;
    }
    if (n == 0) {
        if (timeout == -1) {
            LOG_ERROR("ev_event_poll epoll wait error");
            return -1;
        }
        return 0;
    }
    return -1;
#endif
}
//////////////////////////////////////////////////////////////////////////////////////////////////
////////////////////// config //////////////////////////////////////////////////////////////////////
/*
 * Return a pointer to the first non-whitespace character of line (possibly
 * its terminating NUL). Returns NULL when line is NULL.
 */
static inline const char* ev_config_skip_start_space(const char* line) {
    const char* pl = line;

    if (pl == NULL) {
        return NULL;
    }
    /* <ctype.h> functions need an unsigned char value; plain char may be negative. */
    while (isspace((unsigned char)*pl)) {
        pl++;
    }
    return pl;
}
// Return a pointer to the LAST non-whitespace character of line (not one
// past it). For an empty or all-whitespace string the start of line is
// returned, so the result never points before the buffer.
// Returns NULL when line is NULL.
static inline const char* ev_config_trim_end_space(const char* line) {
    if (line == NULL) {
        return NULL;
    }
    size_t len = strlen(line);
    if (len == 0) {
        return line;
    }
    const char* pl = line + len - 1;
    // Stop at the first character: stepping below 'line' would read out of
    // bounds. The cast keeps isspace() defined for bytes >= 0x80.
    while (pl > line && isspace((unsigned char)*pl)) {
        pl--;
    }
    return pl;
}
// Classify a config line after stripping its leading whitespace.
// Returns NULL for a NULL line, a blank line, or a '#' comment line;
// otherwise returns a pointer to the first significant character.
static inline const char* ev_config_skip_comment_empty(const char* line)
{
    const char* cursor = ev_config_skip_start_space(line);
    if (cursor == NULL) {
        return NULL;
    }
    switch (*cursor) {
    case '\0':  // nothing but whitespace
    case '#':   // comment
        return NULL;
    default:
        return cursor;
    }
}
// Parse a "key = <integer>" config line.
//
// line:  NUL-terminated config line; leading whitespace is ignored.
// key:   option name, matched case-insensitively at the start of the line.
// value: receives the parsed number; left untouched on failure.
//
// Returns 0 when the line is "key = <number>" and the number fits in
// int32_t, -1 otherwise (different key, missing '=', no digits, or out of
// range). Text after the number is ignored.
static inline int ev_config_read_int32(const char* line, const char* key, int32_t* value)
{
    if (line == NULL || key == NULL || value == NULL) {
        return -1;
    }
    const char* pl = line;
    while (isspace((unsigned char)*pl)) {
        pl++;
    }
    size_t klen = strlen(key);
    if (strncasecmp(pl, key, klen) != 0) {
        return -1;
    }
    pl += klen;
    while (isspace((unsigned char)*pl)) {
        pl++;
    }
    // Without '=' this is not an assignment to this key: it may be a longer
    // key sharing the same prefix, so report "no match" and let the caller
    // try its other keys instead of silently consuming the line.
    if (*pl != '=') {
        return -1;
    }
    pl++;
    // strtol (unlike atoi) reports "no digits" and overflow.
    char* end = NULL;
    errno = 0;
    long parsed = strtol(pl, &end, 10);
    if (end == pl || errno == ERANGE || parsed < INT32_MIN || parsed > INT32_MAX) {
        return -1;
    }
    *value = (int32_t)parsed;
    return 0;
}
// Parse a "key = <text>" config line.
//
// line:  NUL-terminated config line; leading whitespace is ignored.
// key:   option name, matched case-insensitively at the start of the line.
// value: destination buffer; receives the text with surrounding whitespace
//        removed and is always NUL-terminated on success.
// len:   size of value in bytes.
//
// Returns 0 when the line assigns to key, -1 when it does not (different
// key or missing '='). Terminates the process when the text does not fit
// in value, as a truncated path would be worse than refusing to start.
static inline int ev_config_read_string(const char* line, const char* key, char* value, int len)
{
    if (line == NULL || key == NULL || value == NULL || len <= 0) {
        return -1;
    }
    const char* pl = line;
    while (isspace((unsigned char)*pl)) {
        pl++;
    }
    size_t klen = strlen(key);
    if (strncasecmp(pl, key, klen) != 0) {
        return -1;
    }
    pl += klen;
    while (isspace((unsigned char)*pl)) {
        pl++;
    }
    // Without '=' this is not an assignment to this key (possibly a longer
    // key sharing the prefix), so let the caller try its other keys.
    if (*pl != '=') {
        return -1;
    }
    pl++;
    // Trim blanks on both sides of the value; 'end' is one past the last
    // kept character, so an empty value yields a length of zero.
    while (isspace((unsigned char)*pl)) {
        pl++;
    }
    const char* end = pl + strlen(pl);
    while (end > pl && isspace((unsigned char)end[-1])) {
        end--;
    }
    size_t n = (size_t)(end - pl);
    // One byte must remain for the terminating NUL.
    if (n >= (size_t)len) {
        fprintf(stderr, "read_string buf size too small\n");
        exit(1);
    }
    memcpy(value, pl, n);
    // Terminate explicitly: the buffer may still hold a longer default.
    value[n] = '\0';
    return 0;
}
// Log a snapshot of the configuration through LOG_SAFE.
//
// var: label printed in front of " set:" (callers in this file pass
//      "default" and "custom").
// cfg: configuration whose fields are printed.
//
// Note: max_nwork_thread is not part of the printed snapshot.
// NOTE(review): "%s" assumes logfile/pidfile are NUL-terminated strings and
// "%d" assumes the remaining fields promote to int — confirm against the
// ev_config_t declaration.
static void ev_config_print(const char* var, ev_config_t* cfg)
{
LOG_SAFE("%s set:\n"
"====================================\n"
"daemon = %d\n"
"logfile = %s\n"
"pidfile = %s\n"
"loglevel = %d\n"
"max_nconnect = %d\n"
"max_thread = %d\n"
"npool = %d\n"
"buf trunk size = %d\n"
"max timeout = %d\n"
"server port = %d\n"
"====================================",
var,
cfg->daemon,
cfg->logfile,
cfg->pidfile,
cfg->log_level,
cfg->max_nconnect,
cfg->max_nthread,
cfg->default_npool,
cfg->buf_trunk_size,
cfg->max_timeout,
cfg->port
);
}
// Reset cfg to the built-in defaults and log the result.
//
// Every field is zeroed first, so options not listed below (for example
// max_timeout) default to 0. The log and pid files default to "a.log" and
// "a.pid" inside the current working directory.
//
// Returns 0 on success, -1 when cfg is NULL.
static int ev_config_set_default(ev_config_t* cfg)
{
    char cwd[256] = {0};
    if (cfg == NULL) {
        return -1;
    }
    bzero(cfg, sizeof(ev_config_t));
    cfg->daemon = 0;
    // On failure the buffer contents are unspecified; fall back to a
    // relative path rather than using whatever was left behind.
    if (getcwd(cwd, sizeof(cwd)) == NULL) {
        snprintf(cwd, sizeof(cwd), ".");
    }
    // snprintf bounds the write to the destination field and always
    // NUL-terminates, unlike the unbounded strcat it replaces.
    snprintf(cfg->logfile, sizeof(cfg->logfile), "%s/a.log", cwd);
    snprintf(cfg->pidfile, sizeof(cfg->pidfile), "%s/a.pid", cwd);
    cfg->log_level = EV_LOG_DEBUG;
    cfg->max_nconnect = 1024;
    cfg->max_nthread = 2;
    cfg->max_nwork_thread = 1;
    cfg->default_npool = 3;
    cfg->port = 7070;
    cfg->buf_trunk_size = 4096;
    ev_config_print("default", cfg);
    return 0;
}
// Return the process-wide configuration, creating it with default values
// on first use. The object is never freed.
//
// Never returns NULL: if the configuration cannot be allocated the process
// exits, since callers dereference the result unconditionally.
ev_config_t* ev_default_config()
{
    static ev_config_t* config = NULL;
    if (config == NULL) {
        // The static pointer is assigned before the defaults are filled in,
        // matching the original ordering.
        config = malloc(sizeof(*config));
        if (config == NULL) {
            LOG_SAFE("malloc config error");
            // Previously execution continued and zeroed a NULL pointer.
            exit(1);
        }
        ev_config_set_default(config);
    }
    return config;
}
static int ev_config_load(const char* path)
{
FILE* f = fopen(path, "r");
char line[256] = {0};
if(f == NULL) {
LOG_SAFE("load %s config error", path);
return -1;
}
ev_config_t* config = ev_default_config();
do {
bzero(line, sizeof(line));
if(!fgets(line, sizeof(line), f)) {
break;
}
const char* pl = ev_config_skip_comment_empty(line);
if(!pl) {
continue;
}
if(ev_config_read_int32(pl, MAX_THREAD, &config->max_nthread) == 0) {
if(config->max_nthread < 1) {
//
}
continue;
}
if(ev_config_read_int32(pl, SERV_PORT, (int*)&config->port) == 0) {
continue;
}
if(ev_config_read_int32(pl, MAX_TIMEOUT, &config->max_timeout) == 0) {
continue;
}
if(ev_config_read_int32(pl, MAX_CONNECTS, &config->max_nconnect) == 0) {
continue;
}
if(ev_config_read_int32(pl, DAEMON, &config->daemon) == 0) {
continue;
}
if(ev_config_read_string(pl, LOG_FILE_PATH, config->logfile, sizeof(config->logfile)) == 0) {
continue;
}
if(ev_config_read_string(pl, PID_FILE_PATH, config->pidfile, sizeof(config->pidfile)) == 0) {
continue;
}
if(ev_config_read_string(pl, DEFAULT_POOL_SIZE, config->pidfile, sizeof(config->pidfile)) == 0) {
continue;
}
LOG_SAFE("unknow config %s", line);
}while(1);
fclose(f);
ev_config_print("custom", config);
return 0;
}
// Event callback for the listening socket: hands each readable notification
// to the next event loop in round-robin order.
//
// data: the ev_server_t registered together with the listening socket.
// mask: event bits; only kEventRead triggers an accept.
//
// Always returns 0.
static int on_new_connect(void* data, unsigned int mask)
{
    static int curloop = -1;
    if (mask & kEventRead) {
        ev_server_t* server = (ev_server_t*)data;
        int nloops = ev_default_config()->max_nthread;
        if (nloops < 1) {
            // No loop to dispatch to; also avoids a modulo by zero.
            return 0;
        }
        // Keep the counter inside [0, nloops): an ever-growing counter would
        // eventually overflow and could produce a negative array index.
        curloop = (curloop + 1) % nloops;
        ev_loop_accept(server->evloop[curloop], server->sfd);
    }
    return 0;
}
// Signal handler shared by the termination signals: asks the main loop in
// ev_server_run() to stop by raising exit_main_loop.
// NOTE(review): LOG_ENTER_FN runs in signal context — confirm that it only
// uses async-signal-safe calls.
static void ev_sigint_handler(int signum)
{
    (void)signum;  // the same action is taken for every handled signal
    LOG_ENTER_FN;
    exit_main_loop = 1;
}
static void ev_init_signal()
{
struct sigaction saint;
signal(SIGPIPE, SIG_IGN);
sigemptyset(&saint.sa_mask);
saint.sa_flags = 0;
saint.sa_handler = ev_sigint_handler;
sigaction(SIGQUIT, &saint, NULL);
sigaction(SIGTERM, &saint, NULL);
sigaction(SIGINT, &saint, NULL);
sigaction(SIGUSR1, &saint, NULL);
}
// Start the server and block until a termination signal is received.
//
// config_filepath: optional config file; NULL keeps the default settings.
// pre_run:         optional hook called with the server object once the
//                  listening socket is registered, before the accept loop.
//
// Steps: load the config, optionally daemonize, install signal handlers,
// create one event loop per configured thread, open the listening socket,
// then poll it until exit_main_loop is set. On shutdown every loop is asked
// to exit, joined and destroyed.
//
// Returns 0 after a clean shutdown and -1 when the config file cannot be
// loaded. Any other start-up failure terminates the process with status 1.
int ev_server_run(const char* config_filepath, pre_server_run_pt pre_run)
{
    int i;
    if (config_filepath != NULL) {
        if (0 != ev_config_load(config_filepath)) {
            perror("load config file error");
            return -1;
        }
    }
    if (ev_default_config()->daemon == 1) {
        ev_daemon();
    }
    ev_init_signal();
    ev_config_t* config = ev_default_config();
    ev_server_t* server = malloc(sizeof(ev_server_t));
    if (server == NULL) {
        perror("malloc serer error");
        exit(1);
    }
    bzero((void*)server, sizeof(ev_server_t));
    server->evloop = (ev_loop_t**)malloc(config->max_nthread * sizeof(ev_loop_t*));
    if (server->evloop == NULL) {
        perror("malloc ev_loop_t error");
        exit(1);
    }
    for (i = 0; i < config->max_nthread; ++i) {
        server->evloop[i] = ev_loop_new();
        if (server->evloop[i] == NULL) {
            perror("gev[i] = evloop_t_new() error");
            exit(1);
        }
    }
    ev_default_thread_pool();
    server->sfd = ev_socket_listen(NULL, config->port);
    if (server->sfd == -1) {
        perror("create server sfd error");
        exit(1);
    }
    server->evbase = ev_event_new(1, on_new_connect);
    // The poller is dereferenced on every iteration below, so a failed
    // creation must stop start-up here.
    if (server->evbase == NULL) {
        perror("create server evbase error");
        exit(1);
    }
    ev__event_op(server->evbase, server->sfd, kEventRead, kEventOpAdd, (void*)server);
    if (pre_run) {
        pre_run(server);
    }
    // The 200 ms timeout bounds how long a shutdown request goes unnoticed.
    while (!exit_main_loop) {
        ev_event_poll(server->evbase, 200);
    }
    for (i = 0; i < config->max_nthread; ++i) {
        server->evloop[i]->exit = 1;
        pthread_join(server->evloop[i]->cur_thread, NULL);
        ev_loop_del(server->evloop[i]);
    }
    close(server->sfd);
    // NOTE(review): server->evbase is not released here; no destructor for
    // it is visible in this part of the file — confirm whether one exists.
    free(server->evloop);  // the loop pointer array was previously leaked
    free(server);
    return 0;
}
Вы можете оставить комментарий после входа в систему
Неприемлемый контент может быть отображен здесь и не будет показан на странице. Вы можете проверить и изменить его с помощью соответствующей функции редактирования.
Если вы подтверждаете, что содержание не содержит непристойной лексики/перенаправления на рекламу/насилия/вульгарной порнографии/нарушений/пиратства/ложного/незначительного или незаконного контента, связанного с национальными законами и предписаниями, вы можете нажать «Отправить» для подачи апелляции, и мы обработаем ее как можно скорее.
Опубликовать ( 0 )