Слияние кода завершено, страница обновится автоматически
//
// Created by Administrator on 2015/5/24.
//
#include <chrono>
#include "es.h"
#include "es_block.h"
#include "es_util.h"
#define USE_IO_EVENT_TYPE ""
#undef HAVE_POLL
#if defined(HAVE_EPOLL)
#include "es_epoll.cpp"
#undef USE_IO_EVENT_TYPE
#define USE_IO_EVENT_TYPE "epoll"
#elif defined(HAVE_POLL)
#include "es_poll.cpp"
#undef USE_IO_EVENT_TYPE
#define USE_IO_EVENT_TYPE "poll"
#elif defined(HAVE_SELECT)
#include "es_select.cpp"
#undef USE_IO_EVENT_TYPE
#define USE_IO_EVENT_TYPE "select"
#endif
#ifdef MULTI_THREAD
#include "es_thread_pool.h"
static esvr::ThreadPool &get_worker_pool() {
static esvr::ThreadPool pool(esvr::get_cpu_core_count() - 1);
return pool;
}
#endif
namespace esvr {
std::string get_event_str(ES_EVENT event) {
static std::map<ES_EVENT, std::string> event_2_str = {
{ES_DEFAULT, "deft"}, {ES_READ, "read"}, {ES_WRITE, "writ"},
{ES_CONNECTED, "conn"}, {ES_CLOSED, "clos"}, {ES_MODIFIED, "modi"},
{ES_IDLE, "idle"}};
return event_2_str[event];
}
LockFreeQueue<socket_t> *IOManager::get_to_close_queue() {
static LockFreeQueue<socket_t> *queue = new LockFreeQueue<socket_t>;
return queue;
}
void IOManager::before_start() {
LOG_INFO("use io type:%s", USE_IO_EVENT_TYPE);
}
void IOManager::before_stop() {}
void IOManager::after_stop() {}
void IOManager::force_close(socket_t fd) { get_to_close_queue()->push(fd); }
void IOManager::before_loop_once() {}
void IOManager::after_loop_once() {}
void IOManager::on_idle() {
static time_t time_counter = 0;
time_t now = time(NULL);
IdleEvent event;
EventBus::get_instance().fire_event(&event);
if (now % 10 == 0 && time_counter != now) {
time_counter = now;
log_debug("%s-idle active client size[%d]", m_io_type.c_str(), m_active_clients.size());
}
}
int IOManager::send(socket_t cli_fd, const char *buffer, const size_t size) {
int ret = es_send(cli_fd, buffer, size);
WriteEvent event{cli_fd, buffer, size};
EventBus::get_instance().fire_event(&event);
return ret;
}
void IOManager::stop() {
before_stop();
m_start = false;
#ifdef MULTI_THREAD
get_worker_pool().destroy();
#endif
after_stop();
}
void IOManager::readable_buffer(socket_t fd, char *&buffer,
size_t &readable_len) {
buffer = NULL;
readable_len = 0;
auto iterator = m_active_clients.find(fd);
if (iterator != m_active_clients.end()) {
auto readable_buffer = iterator->second->readable_buffer();
if (readable_buffer != NULL) {
buffer = readable_buffer->readable_buffer();
readable_len = readable_buffer->readable_size();
}
}
}
void IOManager::increase_readable(socket_t fd, size_t len) {
auto iterator = m_active_clients.find(fd);
if (iterator != m_active_clients.end()) {
auto readable_buffer = iterator->second->writable_bufer();
if (readable_buffer != NULL) {
readable_buffer->increase_readable(len);
}
}
}
void IOManager::writable_buffer(socket_t fd, char *&buffer,
size_t &writable_len) {
buffer = NULL;
writable_len = 0;
auto iterator = m_active_clients.find(fd);
if (iterator != m_active_clients.end()) {
auto writable_buffer = iterator->second->writable_bufer();
if (writable_buffer != NULL) {
buffer = writable_buffer->writable_bufer();
writable_len = writable_buffer->writable_size();
}
}
}
void IOManager::decrease_readable(socket_t fd, size_t len) {
auto iterator = m_active_clients.find(fd);
if (iterator != m_active_clients.end()) {
auto writable_buffer = iterator->second->readable_buffer();
if (writable_buffer != NULL) {
writable_buffer->decrease_readable(len);
}
}
}
void IOManager::new_fd(socket_t fd) { m_active_clients[fd] = new BlockList; }
void IOManager::delete_fd(socket_t fd) {
auto iterator = m_active_clients.find(fd);
if (iterator != m_active_clients.end()) {
log_info("delete block list for fd=%d", fd);
delete iterator->second;
m_active_clients.erase(fd);
}
}
template <typename T> void EventBus::fire_event(const T *event) {
if (event->type() != ES_IDLE) {
log_debug("fire_event:%s", event->type_str().c_str());
}
#ifdef MULTI_THREAD
ThreadPool &pool = get_worker_pool();
pool.enqueue(
[](const T *ent) { EventBus::get_instance().dispatch_event(ent, true); },
new T(*event));
#else
dispatch_event(event);
#endif
}
template <typename T> void EventBus::dispatch_event(const T *event, bool free) {
auto listeners = m_listeners.find(T::type());
if (listeners != m_listeners.end()) {
for (auto &it : listeners->second) {
(static_cast<EventListener<T> *>(it))->on_event(event);
}
}
if (free)
delete event;
}
}
Вы можете оставить комментарий после Вход в систему
Неприемлемый контент может быть отображен здесь и не будет показан на странице. Вы можете проверить и изменить его с помощью соответствующей функции редактирования.
Если вы подтверждаете, что содержание не содержит непристойной лексики/перенаправления на рекламу/насилия/вульгарной порнографии/нарушений/пиратства/ложного/незначительного или незаконного контента, связанного с национальными законами и предписаниями, вы можете нажать «Отправить» для подачи апелляции, и мы обработаем ее как можно скорее.
Опубликовать ( 0 )