类似于kylin的ExecMan, ExecutionQueue提供了异步串行执行的功能。ExecutionQueue的相关技术最早使用在RPC中实现多线程向同一个fd写数据. 在r31345之后加入到bthread。 ExecutionQueue 提供了如下基本功能:
和ExecMan的主要区别:
在多核并发编程领域, Message passing作为一种解决竞争的手段得到了比较广泛的应用,它按照业务依赖的资源将逻辑拆分成若干个独立actor,每个actor负责对应资源的维护工作,当一个流程需要修改某个资源的时候, 就转化为一个消息发送给对应actor,这个actor(通常在另外的上下文中)根据命令内容对这个资源进行相应的修改,之后可以选择唤醒调用者(同步)或者提交到下一个actor(异步)的方式进行后续处理。
ExecutionQueue和mutex都可以用来在多线程场景中消除竞争. 相比较使用mutex, 使用ExecutionQueue有着如下几个优点:
但是缺点也同样明显:
不考虑性能和复杂度,理论上任何系统都可以只使用mutex或者ExecutionQueue来消除竞争. 但是复杂系统的设计上,建议根据不同的场景灵活决定如何使用这两个工具:
总之,多线程编程没有万能的模型,需要根据具体的场景,结合丰富的profliling工具,最终在复杂度和性能之间找到合适的平衡。
特别指出一点,Linux中mutex无竞争的lock/unlock只有需要几条原子指令,在绝大多数场景下的开销都可以忽略不计.
// Iterate over the given tasks
//
// Example:
//
// #include <bthread/execution_queue.h>
//
// int demo_execute(void* meta, TaskIterator<T>& iter) {
// if (iter.is_stopped()) {
// // destroy meta and related resources
// return 0;
// }
// for (; iter; ++iter) {
// // do_something(meta, *iter)
// // or do_something(meta, iter->a_member_of_T)
// }
// return 0;
// }
template <typename T>
class TaskIterator;
// Start a ExecutionQueue. If |options| is NULL, the queue will be created with
// default options.
// Returns 0 on success, errno otherwise
// NOTE: type |T| can be non-POD but must be copy-constructible
template <typename T>
int execution_queue_start(
ExecutionQueueId<T>* id,
const ExecutionQueueOptions* options,
int (*execute)(void* meta, TaskIterator<T>& iter),
void* meta);
创建的返回值是一个64位的id, 相当于ExecutionQueue实例的一个弱引用, 可以wait-free的在O(1)时间内定位一个ExecutionQueue, 你可以到处拷贝这个id, 甚至可以放在RPC中,作为远端资源的定位工具。 你必须保证meta的生命周期,在对应的ExecutionQueue真正停止前不会释放.
// Stop the ExecutionQueue.
// After this function is called:
// - All the following calls to execution_queue_execute would fail immediately.
// - The executor will call |execute| with TaskIterator::is_queue_stopped() being
// true exactly once when all the pending tasks have been executed, and after
// this point it's ok to release the resource referenced by |meta|.
// Returns 0 on success, errno othrwise
template <typename T>
int execution_queue_stop(ExecutionQueueId<T> id);
// Wait until the the stop task (Iterator::is_queue_stopped() returns true) has
// been executed
template <typename T>
int execution_queue_join(ExecutionQueueId<T> id);
stop和join都可以多次调用, 都会又合理的行为。stop可以随时调用而不用当心线程安全性问题。
和fd的close类似,如果stop不被调用, 相应的资源会永久泄露。
安全释放meta的时机: 可以在execute函数中收到iter.is_queue_stopped()==true的任务的时候释放,也可以等到join返回之后释放. 注意不要double-free
struct TaskOptions {
TaskOptions();
TaskOptions(bool high_priority, bool in_place_if_possible);
// Executor would execute high-priority tasks in the FIFO order but before
// all pending normal-priority tasks.
// NOTE: We don't guarantee any kind of real-time as there might be tasks still
// in process which are uninterruptible.
//
// Default: false
bool high_priority;
// If |in_place_if_possible| is true, execution_queue_execute would call
// execute immediately instead of starting a bthread if possible
//
// Note: Running callbacks in place might cause the dead lock issue, you
// should be very careful turning this flag on.
//
// Default: false
bool in_place_if_possible;
};
const static TaskOptions TASK_OPTIONS_NORMAL = TaskOptions(/*high_priority=*/ false, /*in_place_if_possible=*/ false);
const static TaskOptions TASK_OPTIONS_URGENT = TaskOptions(/*high_priority=*/ true, /*in_place_if_possible=*/ false);
const static TaskOptions TASK_OPTIONS_INPLACE = TaskOptions(/*high_priority=*/ false, /*in_place_if_possible=*/ true);
// Thread-safe and Wait-free.
// Execute a task with defaut TaskOptions (normal task);
template <typename T>
int execution_queue_execute(ExecutionQueueId<T> id,
typename butil::add_const_reference<T>::type task);
// Thread-safe and Wait-free.
// Execute a task with options. e.g
// bthread::execution_queue_execute(queue, task, &bthread::TASK_OPTIONS_URGENT)
// If |options| is NULL, we will use default options (normal task)
// If |handle| is not NULL, we will assign it with the hanlder of this task.
template <typename T>
int execution_queue_execute(ExecutionQueueId<T> id,
typename butil::add_const_reference<T>::type task,
const TaskOptions* options);
template <typename T>
int execution_queue_execute(ExecutionQueueId<T> id,
typename butil::add_const_reference<T>::type task,
const TaskOptions* options,
TaskHandle* handle);
high_priority的task之间的执行顺序也会严格按照提交顺序, 这点和ExecMan不同, ExecMan的QueueExecEmergent的AsyncContex执行顺序是undefined. 但是这也意味着你没有办法将任何任务插队到一个high priority的任务之前执行.
开启inplace_if_possible, 在无竞争的场景中可以省去一次线程调度和cache同步的开销. 但是可能会造成死锁或者递归层数过多(比如不停的ping-pong)的问题,开启前请确定你的代码中不存在这些问题。
/// [Thread safe and ABA free] Cancel the corresponding task.
// Returns:
// -1: The task was executed or h is an invalid handle
// 0: Success
// 1: The task is executing
int execution_queue_cancel(const TaskHandle& h);
返回非0仅仅意味着ExecutionQueue已经将对应的task递给过execute, 真实的逻辑中可能将这个task缓存在另外的容器中,所以这并不意味着逻辑上的task已经结束,你需要在自己的业务上保证这一点.
Вы можете оставить комментарий после Вход в систему
Неприемлемый контент может быть отображен здесь и не будет показан на странице. Вы можете проверить и изменить его с помощью соответствующей функции редактирования.
Если вы подтверждаете, что содержание не содержит непристойной лексики/перенаправления на рекламу/насилия/вульгарной порнографии/нарушений/пиратства/ложного/незначительного или незаконного контента, связанного с национальными законами и предписаниями, вы можете нажать «Отправить» для подачи апелляции, и мы обработаем ее как можно скорее.
Опубликовать ( 0 )