            auto data = ...;  // obtains exclusive access to the shared ExclusiveData
            if(task) {
                if(task->RestartDelay == 1) {
                    // A RestartDelay of 1 means "restart immediately".
                    Trace << "Restarting task " << task->Name << " immediately";
                    data->DueTasks.push_back(move(task));
                    data.SendSignal();
                } else if(task->RestartDelay != 0) {
                    // Any other non-zero delay reschedules the task for later.
                    Trace << "Rescheduling task " << task->Name << " for "
                          << task->RestartDelay << " milliseconds from now";
                    task->DueTime = MillisecondsClock::Now() + task->RestartDelay;
                    data->ScheduledTasks.push(move(task));
                    data.SendSignal();
                } else if(task->ConcurrencyForbidden) {
                    // Drop the front element (the null placeholder left for the
                    // task that just finished) and start the next task of the
                    // same type, if any; moving it out leaves a new placeholder.
                    auto &tasks = ptr->TasksByType[task->Type];
                    tasks.pop_front();
                    if(!tasks.empty()) {
                        data->DueTasks.push_back(move(tasks.front()));
                        data.SendSignal();
                    }
                }
                task.reset();
            }
            if(data->CompletedTasks.empty()) break;
            task = move(data->CompletedTasks.front());
            data->CompletedTasks.pop_front();
        }
        Trace << "Running completion routine for task " << task->Name;
        task->OnTaskComplete(pool->service);
    }
}
void ThreadPool::RegisterTask(std::shared_ptr<BackgroundTask> task)
{
    auto &type = typeid(*task);
    task->Type = type.hash_code();
    auto data = ...;  // obtains exclusive access to the shared ExclusiveData
    if(task->ConcurrencyForbidden) {
        // Append the new task to the list of the tasks having the same dynamic
        // type.
        auto &tasks = ptr->TasksByType[task->Type];
        if(!tasks.empty()) {
            // If there are other tasks of the same type, we simply append the
            // new task to the list. The new task will be started as soon as the
            // OnTaskComplete() member function of the last task having the same
            // type returns.
            tasks.push_back(move(task));
            return;
        }
        // Otherwise the list will only have a single item containing a null
        // reference which will be removed by ThreadPool::EventProc as soon as
        // the OnTaskComplete() member function of the new task returns. It is
        // our responsibility to start the new task immediately.
        tasks.emplace_back();
    }
    data->DueTasks.push_back(move(task));
    data.SendSignal();
}
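To make the intended protocol easier to follow, here is a toy model of just the per-type queueing (the names Task, registerTask, onComplete and start are made up for the illustration; the locking, the signalling and the thread pool itself are left out):

// Toy model, not the real ThreadPool: only the per-type list handling
// mirrors the code above.
#include <cstddef>
#include <cstdio>
#include <list>
#include <map>
#include <memory>
#include <string>

struct Task { std::string name; };

std::map<std::size_t, std::list<std::shared_ptr<Task>>> tasksByType;

void start(const std::shared_ptr<Task> &t)
{
    std::printf("start %s\n", t->name.c_str());
}

void registerTask(std::size_t type, std::shared_ptr<Task> t)
{
    auto &tasks = tasksByType[type];
    if(!tasks.empty()) {
        // Another task of this type is running or queued: just append.
        tasks.push_back(std::move(t));
        return;
    }
    // Empty list: leave a null placeholder for the running task and start it.
    tasks.emplace_back();
    start(t);
}

void onComplete(std::size_t type)
{
    auto &tasks = tasksByType[type];
    tasks.pop_front();                         // drop the finished task's placeholder
    if(!tasks.empty()) {
        auto next = std::move(tasks.front());  // the vacated slot is the new placeholder
        start(next);
    }
}

int main()
{
    registerTask(1, std::make_shared<Task>(Task{"a"})); // prints "start a"
    registerTask(1, std::make_shared<Task>(Task{"b"})); // queued behind "a"
    onComplete(1);                                      // prints "start b"
    onComplete(1);                                      // nothing left to start
}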
Internal data structure accessed with mutual exclusion:
struct ThreadPool::impl
{
    struct ExclusiveData {
        struct CompareTasks {
            bool operator()(
                const shared_ptr<BackgroundTask> &left,
                const shared_ptr<BackgroundTask> &right)
            {
                return left->DueTime > right->DueTime;
            }
        };
        priority_queue<
            shared_ptr<BackgroundTask>,
            deque<shared_ptr<BackgroundTask>>,
            CompareTasks
        > ScheduledTasks;
        list<shared_ptr<BackgroundTask>> DueTasks;
        list<shared_ptr<BackgroundTask>> CompletedTasks;
        struct event *Event = 0;
        bool MustExit = false;
    };
    list<pthread_t> Threads;
    SharedDataContainer<ExclusiveData> Data;
    map<size_t, list<shared_ptr<BackgroundTask>>> TasksByType;
};
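Note that CompareTasks compares with operator> on DueTime, so ScheduledTasks is effectively a min-heap: top() is always the task with the earliest DueTime. A quick standalone check of that behaviour (Item and Compare are made-up names, not part of the pool):

#include <cstdint>
#include <cstdio>
#include <deque>
#include <memory>
#include <queue>

struct Item { std::uint64_t DueTime; };

struct Compare {
    bool operator()(const std::shared_ptr<Item> &left,
                    const std::shared_ptr<Item> &right) const
    {
        // "Greater" DueTime sorts lower, so the smallest DueTime ends up on top.
        return left->DueTime > right->DueTime;
    }
};

int main()
{
    std::priority_queue<std::shared_ptr<Item>,
                        std::deque<std::shared_ptr<Item>>,
                        Compare> q;
    q.push(std::make_shared<Item>(Item{300}));
    q.push(std::make_shared<Item>(Item{100}));
    q.push(std::make_shared<Item>(Item{200}));
    std::printf("%llu\n", (unsigned long long)q.top()->DueTime); // prints 100
}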
I have run a number of tests and concluded that the faulty behavior occurs only when real network I/O is used fairly intensively. For example, the following simple test does not reproduce the infinite loop in event_base_dispatch(), no matter how long it runs.
struct TestTask : BackgroundTask {
    int number;

    TestTask()
    {
        static int seq = 0;
        number = ++seq;
    }

    void BackgroundWorker() override
    {
        Scope;
        for(int i = 0; i < 1000; i++) {
            Trace << "blah " << number;
        }
    }

    void OnTaskComplete(GenericService *service) override
    {
        RestartDelay = 1; // restart the task immediately
    }
};

void TestThreadPool(GenericService *service)
{
    service->pool->RunTask(new TestTask);
    service->pool->RunTask(new TestTask);
    service->pool->RunTask(new TestTask);
    service->pool->RunTask(new TestTask);
    service->pool->RunTask(new TestTask);
    service->pool->RunTask(new TestTask);
}
Best regards,
Sten Kultakangas