
[Libevent-users] Re: infinite loop caused by incorrect behaviour of win32_dispatch() after certain amount of calls to event_active()



Hello

I have some new information regarding a possible issue in event_active(). I verified that data corruption is not the cause, because the function always acquires the lock via TryEnterCriticalSection/EnterCriticalSection.
However, when I implemented an alternative to event_active(), the application ran for more than 10 hours without any abnormalities. With event_active(), the same implementation never ran for more than 1 hour before entering an infinite loop in event_base_dispatch().

Below are the code snippets related to the change.

struct ThreadPool::impl
{
    struct ExclusiveData {
        struct CompareTasks {
            bool operator()(
                const shared_ptr<BackgroundTask> &left,
                const shared_ptr<BackgroundTask> &right)
            {
                return left->DueTime > right->DueTime;
            }
        };

        // Address of the loopback listener that the main I/O thread accepts
        // wake-up connections on.
        union {
            struct sockaddr saddr;
            struct sockaddr_storage storage;
        } u;
        int saddr_len;
        SOCKET fd = INVALID_SOCKET;

        priority_queue<
            shared_ptr<BackgroundTask>,
            deque<shared_ptr<BackgroundTask>>,
            CompareTasks
        > ScheduledTasks;

        list<shared_ptr<BackgroundTask>> DueTasks;
        list<shared_ptr<BackgroundTask>> CompletedTasks;
        bool MustExit = false;

        // Wakes up the main I/O thread by sending a single byte over a
        // loopback connection, connecting lazily on first use.
        void SendByte()
        {
            if(fd == INVALID_SOCKET) {
                fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
                if(fd == INVALID_SOCKET) {
                    Scope;
                    Error << "socket cannot be created";
                    return;
                }
                if(connect(fd, &u.saddr, saddr_len) != 0) {
                    Scope;
                    Error << "connection failed " << WSAGetLastError();
                    closesocket(fd);
                    fd = INVALID_SOCKET;
                    return;
                }
            }

            char byte = 1;
            if(send(fd, &byte, 1, 0) == SOCKET_ERROR) {
                Scope;
                Error << "send failed";
                closesocket(fd);
                fd = INVALID_SOCKET;
                return;
            }
        }

        ~ExclusiveData()
        {
            if(fd != INVALID_SOCKET) closesocket(fd);
        }
    };

    list<pthread_t> Threads;
    SharedDataContainer<ExclusiveData> Data;
    map<size_t, list<shared_ptr<BackgroundTask>>> TasksByType;
};
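For reference, SharedDataContainer itself is not shown above. A minimal sketch of the locked-accessor idea it implements, assuming a pthread mutex and condition variable (every name beyond those used above is an assumption, and C++17 copy elision is assumed for the returned accessor):

template<typename T>
class SharedDataContainer {
    pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
    pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
    T data;

public:
    // RAII accessor: holds the mutex for its entire lifetime.
    class Access {
        SharedDataContainer *owner;
    public:
        explicit Access(SharedDataContainer *c) : owner(c)
        {
            pthread_mutex_lock(&owner->mutex);
        }
        ~Access() { pthread_mutex_unlock(&owner->mutex); }

        T *operator->() { return &owner->data; }

        void SendSignal() { pthread_cond_signal(&owner->cond); }

        // Wait until signaled, or until the absolute deadline if one is given.
        void WaitForSignal(const struct timespec *deadline = 0)
        {
            if(deadline)
                pthread_cond_timedwait(&owner->cond, &owner->mutex, deadline);
            else
                pthread_cond_wait(&owner->cond, &owner->mutex);
        }
    };

    Access Lock() { return Access(this); }
};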



class ThreadPoolIPC : GenericContext {
    ThreadPoolIPC(GenericServer *s, std::string &p) : GenericContext(s, p) {}

public:
    static GenericContext *Create(GenericServer *server, std::string &peer)
    {
        return new ThreadPoolIPC(server, peer);
    }

    static void Reader(struct bufferevent *bev, void *user_data)
    {
        // Drain the wake-up bytes; only their arrival matters.
        char buf[256];
        auto input = bufferevent_get_input(bev);
        while(evbuffer_remove(input, buf, sizeof(buf)) > 0) {}

        auto ctx = static_cast<ThreadPoolIPC *>(user_data);
        ctx->service->pool->EventProc();
    }

    DataCallback GetReader() const override
    {
        return Reader;
    }
};
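The listener side is not included above. Presumably the main I/O thread binds a loopback listener, stores the bound address in ExclusiveData::u so that SendByte() can connect to it, and attaches Reader to every accepted connection. A sketch of that wiring (AcceptConn, SetupWakeupListener and the Lock() accessor name are assumptions; error handling omitted):

static void AcceptConn(struct evconnlistener *listener, evutil_socket_t fd,
    struct sockaddr *, int, void *user_data)
{
    // Wrap the accepted wake-up connection in a bufferevent that drains
    // incoming bytes through ThreadPoolIPC::Reader.
    auto base = evconnlistener_get_base(listener);
    auto bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE);
    bufferevent_setcb(bev, ThreadPoolIPC::Reader, 0, 0, user_data);
    bufferevent_enable(bev, EV_READ);
}

void SetupWakeupListener(struct event_base *base, ThreadPool::impl *ptr,
    void *ctx)
{
    struct sockaddr_in sin = {};
    sin.sin_family = AF_INET;
    sin.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
    sin.sin_port = 0; // let the OS pick a free port

    auto listener = evconnlistener_new_bind(base, AcceptConn, ctx,
        LEV_OPT_CLOSE_ON_FREE | LEV_OPT_REUSEABLE, -1,
        (struct sockaddr *)&sin, sizeof(sin));

    // Record the address actually bound so that SendByte() can reach it.
    auto data = ptr->Data.Lock();
    data->saddr_len = sizeof(data->u.storage);
    getsockname(evconnlistener_get_fd(listener), &data->u.saddr,
        &data->saddr_len);
}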


void *ThreadPool::ThreadProc(void *user_data)
{
    TimedScope(0);
    auto ptr = static_cast<impl *>(user_data);
    shared_ptr<BackgroundTask> task;
    while(true) {
        {
            // Lock() returns a RAII accessor holding the mutex (name assumed;
            // this line was garbled in transit).
            auto data = ptr->Data.Lock();
            if(task) {
                Trace << "Waking up main I/O thread";
                data->CompletedTasks.push_back(move(task));
                data->SendByte();
                task.reset();
            }

            if(data->MustExit) {
                Info << "Exiting the thread";
                break;
            }

            bool due_time_set = false;
            MillisecondsClock due_time;
            MillisecondsClock now = MillisecondsClock::Now();

            // Move every task whose due time has arrived to the due list.
            while(!data->ScheduledTasks.empty()) {
                auto &next = data->ScheduledTasks.top();
                if(next->DueTime > now) {
                    due_time = next->DueTime;
                    due_time_set = true;
                    break;
                }
                data->DueTasks.push_back(next);
                data->ScheduledTasks.pop();
            }

            if(!data->DueTasks.empty()) {
                task = move(data->DueTasks.front());
                data->DueTasks.pop_front();

            } else if(due_time_set) {
                long delay = due_time - now;
                Trace << "Entering idle state for " << delay << " milliseconds";
                data.WaitForSignal(&due_time.ts);

            } else {
                Trace << "Entering idle state indefinitely";
                data.WaitForSignal();
            }
        }
        if(!task) continue;

        Trace << "Running task " << task->Name;
        task->BackgroundWorker();
        Trace << "Task " << task->Name << " has been completed";
    }
    return 0;
}


void ThreadPool::EventProc()
{
    Scope;
    shared_ptr<BackgroundTask> task;
    while(true) {
        {
            auto data = ptr->Data.Lock(); // accessor name assumed, as above
            if(task) {
                if(task->RestartDelay == 1) {
                    Trace << "Restarting task " << task->Name << " immediately";
                    data->DueTasks.push_back(move(task));
                    data.SendSignal();

                } else if(task->RestartDelay != 0) {
                    Trace << "Rescheduling task " << task->Name << " for "
                        << task->RestartDelay << " milliseconds from now";

                    task->DueTime = MillisecondsClock::Now() + task->RestartDelay;
                    data->ScheduledTasks.push(move(task));
                    data.SendSignal();

                } else if(task->ConcurrencyForbidden) {
                    auto &tasks = ptr->TasksByType[task->Type];
                    tasks.pop_front();
                    if(!tasks.empty()) {
                        Trace << task->Type << " left so far " << tasks.size();
                        data->DueTasks.push_back(move(tasks.front()));
                        data.SendSignal();
                    }
                }
                task.reset();
            }

            if(data->CompletedTasks.empty()) break;
            task = move(data->CompletedTasks.front());
            data->CompletedTasks.pop_front();
        }

        Trace << "Running completion routine for task " << task->Name;
        task->OnTaskComplete(service);
    }
}


Would you like me to try to implement a compact test program (without the dependency on the C++ interface) that is able to reproduce the problem with event_active()?
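In case it helps, the reproduction I have in mind would be roughly the following (untested sketch; as noted in my previous mail below, real network I/O may also be needed to trigger the bug):

#include <event2/event.h>
#include <event2/thread.h>
#include <pthread.h>

static void OnActive(evutil_socket_t, short, void *) {}

static void *Poker(void *arg)
{
    // Hammer the event from a second thread as fast as possible.
    auto ev = static_cast<struct event *>(arg);
    for(;;)
        event_active(ev, EV_READ, 0);
    return 0;
}

int main()
{
    evthread_use_windows_threads(); // required for cross-thread event_active()
    auto base = event_base_new();

    // An event with no fd and no timeout: it can only fire via event_active().
    auto ev = event_new(base, -1, EV_PERSIST, OnActive, 0);
    event_add(ev, 0);

    pthread_t thread;
    pthread_create(&thread, 0, Poker, ev);

    return event_base_dispatch(base); // hangs here if the bug reproduces
}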


Best regards,
Sten Kultakangas


On Wed, Sep 13, 2017 at 11:15 PM, Sten Kultakangas <ratkaisut@xxxxxxxxx> wrote:
Hello

I have implemented a thread pool for a network service application and have run into the following issue. After a certain, quite large number of calls to event_active() from a thread other than the main I/O thread, event_base_dispatch() hangs, causing high CPU usage. No events are dispatched after that.

The infinite loop is caused by the event_base_loop() function, which calls win32_dispatch() via the following pointer:

res = evsel->dispatch(base, tv_p);

if (res == -1) {
    event_debug(("%s: dispatch returned unsuccessfully.",
        __func__));
    retval = -1;
    goto done;
}

I traced win32_dispatch() and discovered that at least select() does not fail. However, one of the underlying functions called by evmap_io_active_() calls SetLastError() with the value 5 (ERROR_ACCESS_DENIED). How can I troubleshoot this issue? Is there any other reliable way to wake up the thread running event_base_dispatch()?
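One alternative I could try is a self-pipe built with evutil_socketpair(), which on win32 is emulated over a loopback TCP connection. A rough sketch (error handling omitted; 'base' is the event base of the main I/O thread and OnWakeup is a placeholder):

static void OnWakeup(evutil_socket_t fd, short, void *)
{
    char buf[256];
    while(recv(fd, buf, sizeof(buf), 0) > 0) {} // drain the wake-up bytes
}

evutil_socket_t pair[2];
evutil_socketpair(AF_INET, SOCK_STREAM, 0, pair);
evutil_make_socket_nonblocking(pair[0]);

struct event *wake = event_new(base, pair[0], EV_READ | EV_PERSIST,
    OnWakeup, 0);
event_add(wake, 0);

// From the worker thread, instead of event_active():
char byte = 1;
send(pair[1], &byte, 1, 0);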

How can I make sure that libevent's internal data is not corrupted during a call to event_active() from another thread? All I know for sure is that the calls to event_active() are serialized, as is the access to the data structures shared by the thread pool and the main I/O thread:

void *ThreadPool::ThreadProc(void *user_data)
{
    TimedScope(0);
    auto ptr = static_cast<impl *>(user_data);
    shared_ptr<BackgroundTask> task;
    while(true) {
        {
            // internally uses mutex to ensure mutually
            // exclusive access to the data contained by 'Data'
            auto data = ptr->Data.Lock(); // accessor name assumed

            if(task) {
                Trace << "Waking up main I/O thread";
                data->CompletedTasks.push_back(move(task));
                event_active(data->Event, EV_READ, 0);
                task.reset();
            }

            if(data->MustExit) {
                Info << "Exiting the thread";
                break;
            }

            bool due_time_set = false;
            MillisecondsClock due_time;
            MillisecondsClock now = MillisecondsClock::Now();

            while(!data->ScheduledTasks.empty()) {
                auto &next = data->ScheduledTasks.top();
                if(next->DueTime > now) {
                    due_time = next->DueTime;
                    due_time_set = true;
                    break;
                }
                data->DueTasks.push_back(next);
                data->ScheduledTasks.pop();
            }

            if(!data->DueTasks.empty()) {
                task = move(data->DueTasks.front());
                data->DueTasks.pop_front();

            } else if(due_time_set) {
                long delay = due_time - now;
                Trace << "Entering idle state for " << delay << " milliseconds";
                data.WaitForSignal(&due_time.ts);

            } else {
                Trace << "Entering idle state indefinitely";
                data.WaitForSignal();
            }
        }
        if(!task) continue;

        Trace << "Running task " << task->Name;
        task->BackgroundWorker();
        Trace << "Task " << task->Name << " has been completed";
    }
    return 0;
}



The following functions are called in the context of the main I/O thread:

void ThreadPool::EventProc(evutil_socket_t, short, void *user_data)
{
    Scope;
    auto pool = static_cast<ThreadPool *>(user_data);
    auto ptr = pool->ptr.get();
    shared_ptr<BackgroundTask> task;
    while(true) {
        {
            auto data = ptr->Data.Lock(); // accessor name assumed
            if(task) {
                if(task->RestartDelay == 1) {
                    Trace << "Restarting task " << task->Name << " immediately";
                    data->DueTasks.push_back(move(task));
                    data.SendSignal();

                } else if(task->RestartDelay != 0) {
                    Trace << "Rescheduling task " << task->Name << " for "
                        << task->RestartDelay << " milliseconds from now";

                    task->DueTime = MillisecondsClock::Now() + task->RestartDelay;
                    data->ScheduledTasks.push(move(task));
                    data.SendSignal();

                } else if(task->ConcurrencyForbidden) {
                    auto &tasks = ptr->TasksByType[task->Type];
                    tasks.pop_front();
                    if(!tasks.empty()) {
                        data->DueTasks.push_back(move(tasks.front()));
                        data.SendSignal();
                    }
                }
                task.reset();
            }

            if(data->CompletedTasks.empty()) break;
            task = move(data->CompletedTasks.front());
            data->CompletedTasks.pop_front();
        }

        Trace << "Running completion routine for task " << task->Name;
        task->OnTaskComplete(pool->service);
    }
}


void ThreadPool::RegisterTask(std::shared_ptr<BackgroundTask> task)
{
    auto &type = typeid(*task);
    task->Type = type.hash_code();
    task->Name = Demangle(type.name());

    auto data = ptr->Data.Lock(); // accessor name assumed
    if(task->ConcurrencyForbidden) {
        // Append the new task to the list of the tasks having the same
        // dynamic type.
        auto &tasks = ptr->TasksByType[task->Type];
        if(!tasks.empty()) {
            // If there are other tasks of the same type, we simply append the
            // new task to the list. The new task will be started as soon as
            // the OnTaskComplete() member function of the last task having
            // the same type returns.
            tasks.push_back(move(task));
            return;
        }

        // Otherwise the list will only have a single item containing a null
        // reference which will be removed by ThreadPool::EventProc as soon as
        // the OnTaskComplete() member function of the new task returns. It is
        // our responsibility to start the new task immediately.
        tasks.emplace_back();
    }
    data->DueTasks.push_back(move(task));
    data.SendSignal();
}


Internal data structure accessed with mutual exclusion:

struct ThreadPool::impl
{
    struct ExclusiveData {
        struct CompareTasks {
            bool operator()(
                const shared_ptr<BackgroundTask> &left,
                const shared_ptr<BackgroundTask> &right)
            {
                return left->DueTime > right->DueTime;
            }
        };

        priority_queue<
            shared_ptr<BackgroundTask>,
            deque<shared_ptr<BackgroundTask>>,
            CompareTasks
        > ScheduledTasks;

        list<shared_ptr<BackgroundTask>> DueTasks;
        list<shared_ptr<BackgroundTask>> CompletedTasks;
        struct event *Event = 0;
        bool MustExit = false;
    };

    list<pthread_t> Threads;
    SharedDataContainer<ExclusiveData> Data;
    map<size_t, list<shared_ptr<BackgroundTask>>> TasksByType;
};
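The Event member is a manually-activated event bound to ThreadPool::EventProc. Its setup code is not included here; presumably it looks roughly like this (sketch only, assuming EventProc is declared static and the Lock() accessor name as above):

auto data = ptr->Data.Lock();
data->Event = event_new(base, -1, EV_PERSIST, ThreadPool::EventProc, pool);
event_add(data->Event, 0);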


I have performed various tests and come to the conclusion that the faulty behavior only occurs when real network I/O is used fairly intensively. For example, the following simple test does not reproduce the infinite loop in event_base_dispatch(), no matter how long it runs.


struct TestTask : BackgroundTask {
    int number;

    TestTask()
    {
        static int seq = 0;
        number = ++seq;
    }

    void BackgroundWorker() override
    {
        Scope;
        for(int i = 0; i < 1000; i++) {
            Trace << "blah " << number;
        }
    }

    void OnTaskComplete(GenericService *service) override
    {
        RestartDelay = 1; // restart immediately
    }
};

void TestThreadPool(GenericService *service)
{
    service->pool->RunTask(new TestTask);
    service->pool->RunTask(new TestTask);
    service->pool->RunTask(new TestTask);
    service->pool->RunTask(new TestTask);
    service->pool->RunTask(new TestTask);
    service->pool->RunTask(new TestTask);
}




Best regards,
Sten Kultakangas