I have some new information regarding a possible issue in event_active(). I have ruled out data corruption, because the function always uses TryEnterCriticalSection/EnterCriticalSection.
However, after I replaced event_active() with an alternative wake-up mechanism (sending one byte over a TCP socket; see the code below), the application has been running for more than 10 hours without any abnormalities. With event_active(), the same implementation never ran for more than an hour before event_base_dispatch() entered an infinite loop.
// Private implementation of ThreadPool: the worker threads, the task queues
// they share with the main I/O thread, and per-type bookkeeping used by
// ThreadPool::EventProc().
struct ThreadPool::impl
{
    // State shared between worker threads and the I/O thread; reached only
    // through `Data` (SharedDataContainer) below.
    struct ExclusiveData {
        // Heap order for ScheduledTasks. std::priority_queue keeps the
        // "largest" element on top, so comparing with `>` puts the task with
        // the earliest DueTime at top().
        struct CompareTasks {
            bool operator()(
                const shared_ptr<BackgroundTask> &left,
                const shared_ptr<BackgroundTask> &right) const
            {
                return left->DueTime > right->DueTime;
            }
        };

        // Address SendByte() connects to. It is not set in this struct;
        // presumably it is filled in by whoever creates the wake-up
        // listener. TODO confirm.
        union {
            struct sockaddr saddr;
            struct sockaddr_storage storage;
        } u{};
        // Length of the address stored in `u`. It starts at 0 so that a
        // SendByte() issued before the address is configured fails in
        // connect() instead of passing an indeterminate length.
        int saddr_len = 0;
        // Lazily connected wake-up socket; INVALID_SOCKET when not connected.
        SOCKET fd = INVALID_SOCKET;
        // Tasks waiting for their DueTime, earliest first.
        priority_queue<
            shared_ptr<BackgroundTask>,
            deque<shared_ptr<BackgroundTask>>,
            CompareTasks
        > ScheduledTasks;
        // Tasks ready to be picked up by a worker thread.
        list<shared_ptr<BackgroundTask>> DueTasks;
        // Tasks finished by a worker, waiting for ThreadPool::EventProc().
        list<shared_ptr<BackgroundTask>> CompletedTasks;
        // When set, worker threads leave their loop.
        bool MustExit = false;

        ExclusiveData() = default;
        // This object owns `fd`; a copy would close the same socket twice.
        ExclusiveData(const ExclusiveData &) = delete;
        ExclusiveData &operator=(const ExclusiveData &) = delete;

        ~ExclusiveData()
        {
            if(fd != INVALID_SOCKET) closesocket(fd);
        }

        // Writes a single byte to the wake-up socket, connecting it first if
        // needed. Every failure is logged with its Winsock error code and
        // leaves fd == INVALID_SOCKET, so the next call retries the
        // connection. The socket stays in its default (blocking) mode, so
        // this call may block.
        void SendByte()
        {
            if(fd == INVALID_SOCKET && !Connect()) return;
            char byte = 1;
            if(send(fd, &byte, 1, 0) == SOCKET_ERROR) {
                Scope;
                Error << "send failed " << WSAGetLastError();
                CloseSocket();
            }
        }

    private:
        // Creates `fd` and connects it to `u`. On failure it logs the error,
        // leaves fd == INVALID_SOCKET and returns false.
        bool Connect()
        {
            // Use the family of the configured address (`u` is sized for any
            // address), so IPv6 listeners work too; AF_INET behaves as before.
            fd = socket(u.saddr.sa_family, SOCK_STREAM, IPPROTO_TCP);
            if(fd == INVALID_SOCKET) {
                Scope;
                Error << "socket cannot be created " << WSAGetLastError();
                return false;
            }
            if(connect(fd, &u.saddr, saddr_len) != 0) {
                Scope;
                // Read the error code before closesocket() can overwrite it.
                Error << "connection failed " << WSAGetLastError();
                CloseSocket();
                return false;
            }
            return true;
        }

        // Closes `fd` and marks the socket as not connected.
        void CloseSocket()
        {
            closesocket(fd);
            fd = INVALID_SOCKET;
        }
    };
    // Thread handles; presumably the workers running ThreadProc.
    list<pthread_t> Threads;
    // Container for the shared state; presumably provides locked access.
    SharedDataContainer<ExclusiveData> Data;
    // Per-type task lists. EventProc() uses them to serialize
    // ConcurrencyForbidden tasks of the same Type.
    map<size_t, list<shared_ptr<BackgroundTask>>> TasksByType;
};
class ThreadPoolIPC : GenericContext {
ThreadPoolIPC(GenericServer *s, std::string &p) : GenericContext(s, p) {}
public:
static GenericContext *Create(GenericServer *server, std::string &peer)
{
return new ThreadPoolIPC(server, peer);
}
static void Reader(struct bufferevent *bev, void *user_data)
{
char buf[256];
auto input = bufferevent_get_input(bev);
while(evbuffer_remove(input, buf, sizeof(buf)) > 0) {}
auto ctx = static_cast<ThreadPoolIPC *>(user_data);
ctx->service->pool->EventProc();
}
DataCallback GetReader() const override
{
return Reader;
}
};
// Worker-thread entry point (pthread-style signature).
// Each iteration runs in the scope of `data` (exclusive access to
// ptr->Data). In that scope it:
//   - hands the task finished on the previous iteration (if any) over to
//     CompletedTasks and wakes the main I/O thread via SendByte();
//   - exits if MustExit is set;
//   - moves every ScheduledTasks entry whose DueTime has passed into
//     DueTasks;
//   - takes the first due task, or otherwise waits until the next DueTime
//     (or indefinitely when nothing is scheduled).
// The taken task is then run after the `data` scope has ended.
// @param user_data  the ThreadPool::impl instance.
// @return always 0.
void *ThreadPool::ThreadProc(void *user_data)
{
    TimedScope(0);
    auto ptr = static_cast<impl *>(user_data);
    // Task taken on the previous iteration; null when there is none.
    shared_ptr<BackgroundTask> task;
    while(true) {
        {
            // NOTE(review): the next line is garbled in this copy (it looks
            // like mail-archive HTML stripping). Presumably it obtained
            // exclusive access to ptr->Data: the result is used with `->`,
            // WaitForSignal() and SendSignal() below. Restore it from the
            // original file.
            auto data = "">
            if(task) {
                Trace << "Waking up main I/O thread";
                data->CompletedTasks.push_back(move(task));
                // NOTE(review): this runs while still inside the `data`
                // scope, and SendByte() may block on connect()/send().
                data->SendByte();
                task.reset();
            }
            if(data->MustExit) {
                Info << "Exiting the thread";
                break;
            }
            bool due_time_set = false;
            MillisecondsClock due_time;
            MillisecondsClock now = MillisecondsClock::Now();
            // Promote every task that is already due; stop at the first one
            // still in the future and remember its DueTime.
            while(!data->ScheduledTasks.empty()) {
                auto &next = data->ScheduledTasks.top();
                if(next->DueTime > now) {
                    due_time = next->DueTime;
                    due_time_set = true;
                    break;
                }
                data->DueTasks.push_back(next);
                data->ScheduledTasks.pop();
            }
            if(!data->DueTasks.empty()) {
                task = move(data->DueTasks.front());
                data->DueTasks.pop_front();
            } else if(due_time_set) {
                long delay = due_time - now;
                Trace << "Entering idle state for " << delay << " milliseconds";
                // Presumably a timed wait bounded by due_time. TODO confirm.
                data.WaitForSignal(&due_time.ts);
            } else {
                Trace << "Entering idle state indefinitely";
                data.WaitForSignal();
            }
        }
        // Woken without a task (signal or timeout): re-evaluate the queues.
        if(!task) continue;
        Trace << "Running task " << task->Name;
        task->BackgroundWorker();
        Trace << "Task " << task->Name << " has been completed";
    }
    return 0;
}
// Processes tasks that worker threads placed in CompletedTasks. It is called
// from ThreadPoolIPC::Reader, presumably on the main I/O thread.
//
// Each completed task is popped inside the `data` scope. Its
// OnTaskComplete(service) runs after that scope ends. On the next iteration,
// back inside the `data` scope, the task is requeued according to its
// RestartDelay:
//   - 1: back to DueTasks immediately;
//   - any other nonzero value: into ScheduledTasks with
//     DueTime = now + RestartDelay;
//   - 0 with ConcurrencyForbidden: the next queued task of the same Type in
//     TasksByType is released to DueTasks;
//   - 0 otherwise: the task is dropped.
// Every requeue calls data.SendSignal(). The loop ends when CompletedTasks
// is empty.
void ThreadPool::EventProc()
{
    Scope;
    // Task whose completion routine ran on the previous iteration.
    shared_ptr<BackgroundTask> task;
    while(true) {
        {
            // NOTE(review): the next line is garbled in this copy (it looks
            // like mail-archive HTML stripping). Presumably it obtained
            // exclusive access to the pool's shared data. Restore it from
            // the original file.
            auto data = "">
            if(task) {
                if(task->RestartDelay == 1) {
                    Trace << "Restarting task " << task->Name << " immediately";
                    data->DueTasks.push_back(move(task));
                    data.SendSignal();
                } else if(task->RestartDelay != 0) {
                    Trace << "Rescheduling task " << task->Name << " for "
                        << task->RestartDelay << " milliseconds from now";
                    task->DueTime = MillisecondsClock::Now() + task->RestartDelay;
                    data->ScheduledTasks.push(move(task));
                    data.SendSignal();
                } else if(task->ConcurrencyForbidden) {
                    // NOTE(review): `ptr` is not declared in this function;
                    // presumably it is a ThreadPool member pointing at the
                    // impl. Confirm.
                    auto &tasks = ptr->TasksByType[task->Type];
                    // Assumes the front entry stands for the task that just
                    // completed.
                    tasks.pop_front();
                    if(!tasks.empty()) {
                        Trace << task->Type << " left so far " << tasks.size();
                        // Moving out leaves an empty shared_ptr at the front;
                        // it serves as the placeholder for the now-running
                        // task and is removed by the pop_front() above when
                        // that task completes.
                        data->DueTasks.push_back(move(tasks.front()));
                        data.SendSignal();
                    }
                }
                task.reset();
            }
            if(data->CompletedTasks.empty()) break;
            task = move(data->CompletedTasks.front());
            data->CompletedTasks.pop_front();
        }
        // Runs after the `data` scope has ended.
        Trace << "Running completion routine for task " << task->Name;
        task->OnTaskComplete(service);
    }
}
Would you like me to try to write a compact test program (without the dependency on the C++ interface) that reproduces the problem with event_active()?