
[Libevent-users] Crash in my simple multi-threaded evhttp app [code incl.]



Hi libevent,

Sorry for the recent surge of emails.  I am trying to determine the cause of a problem in my application while using evhttp.  I wrote a toy/dummy server (whose code I included) that exhibits the same problem that my real application is having.

PROBLEM OVERVIEW:

My application uses evhttp. I set its gencb (the callback that handles any incoming request) to a function that only validates that the request has a path or a query.  If either exists, the evhttp_request*, path, and query are handed off to a single worker thread, which processes requests one at a time and calls evhttp_send_reply or evhttp_send_error when it is done with each request.
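
For readers who do not want to dig through the attached code, the hand-off described above can be sketched roughly as follows. This is only an illustration built on standard C++ primitives in place of the ThrdMutex/ThrdWaitCond classes in the attachment, and it deliberately contains no libevent calls. PendingRequest, RequestQueue, and their members are hypothetical names that do not appear in the real application.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>

// Hypothetical stand-in for the real request record, which carries an
// evhttp_request* in the attached code.
struct PendingRequest {
    int id;
};

// Minimal blocking queue: the event-loop thread pushes, a worker thread pops.
class RequestQueue {
public:
    void push(const PendingRequest& r) {
        {
            std::lock_guard<std::mutex> guard(m_mutex);
            assert(!m_closed); // pushing after close() is a caller bug
            m_items.push_back(r);
        }
        m_ready.notify_one();
    }

    // Blocks until an item is available or close() has been called.
    // Returns false once the queue is closed and fully drained.
    bool pop(PendingRequest& out) {
        std::unique_lock<std::mutex> guard(m_mutex);
        // The predicate form re-checks the condition after spurious wakeups.
        m_ready.wait(guard, [this] { return m_closed || !m_items.empty(); });
        if (m_items.empty())
            return false;
        out = m_items.front();
        m_items.pop_front();
        return true;
    }

    // Wakes every waiting worker so it can drain the queue and exit.
    void close() {
        {
            std::lock_guard<std::mutex> guard(m_mutex);
            m_closed = true;
        }
        m_ready.notify_all();
    }

private:
    std::mutex m_mutex;
    std::condition_variable m_ready;
    std::deque<PendingRequest> m_items;
    bool m_closed = false;
};
```

In this sketch the gencb would call push() and return immediately, while the worker loops on pop() until it returns false. Whether the worker may then safely call into evhttp is exactly the open question of this post.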

The problem is that when I hit my server from the browser and keep refreshing to send it a steady stream of requests, it crashes.  In the debugger, the crash is on the evhttp_send_reply line, and the error reads: "exception at 0x____ in ____.exe, 0xC0000005: Access violation reading location 0xdddddee5".  None of the parameters passed to this function are NULL.
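
One possible reading of the faulting address, offered as an assumption rather than a diagnosis: the MSVC debug CRT fills freed heap blocks with the byte 0xDD, so an address just above 0xDDDDDDDD often means a pointer was loaded from memory that had already been freed and then dereferenced at a small offset. This only applies if the code that freed the memory was built against the debug CRT, which the post does not state. The helper below (offsetFromFreedFill is a made-up name) just does the subtraction.

```cpp
#include <cassert>
#include <cstdint>

// Assumption: the binary uses the MSVC debug CRT, which overwrites freed
// heap blocks with the byte 0xDD, so a pointer read from freed memory
// comes back as 0xDDDDDDDD in a 32-bit process.
static const std::uint32_t kFreedFill = 0xDDDDDDDDu;

// Distance of a faulting address from the freed-memory fill pattern.
// A small result hints at "field at that offset, reached through a
// pointer that was read out of freed memory".
std::uint32_t offsetFromFreedFill(std::uint32_t fault_address)
{
    assert(fault_address >= kFreedFill); // only meaningful at or above the pattern
    return fault_address - kFreedFill;
}
```

For the address in the error message, offsetFromFreedFill(0xDDDDDEE5u) yields 0x108, which would be consistent with reading a member 264 bytes into a structure through a stale pointer. It does not say which structure that is.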

If I do the processing of requests in the gencb, there is no crash.

THINGS I'VE TRIED:
-in the gencb, I've tried calling evhttp_request_own(req); to ensure that the connection/resources aren't freed after the gencb method returns (because really I want to keep using the evhttp_request* later when my thread actually processes the request).

-if in the gencb, I process the request there, instead of passing it to another thread, then everything works, but all the processing is done in the event loop thread (right?) so this would not be ideal if the request processing takes a while because it'd hold up the event loop.

OTHER NOTES:
-I made a post to this distribution list regarding my problem with my custom threading library and recursive locks and wait conds.  I have verified that no wait conditions are being used while the app is running, so it's not a problem of my custom wait conds.

QUESTIONS:
-Was evhttp meant to be used this way?  Should I not be calling evhttp_send_reply/evhttp_send_error from a different thread?  I have set up evthread_lock_callbacks, so I would think that a multi-threaded app should work.

-Any idea what might be causing this crash?  All of the parameters I pass to evhttp_send_reply are non-NULL.

Any help would be greatly appreciated and I am thankful for the other responses I've gotten on this DL.

Thanks,
-Julian

Code is attached below.  Took me a while to reduce my code to something very simple that I could post and I apologize for the dependencies which would make it difficult for you guys to run.  Methods of interest are: HttpServer::onGeneralRequest(), WmtsRequestHandler::process(), and EvThread::id_fn()

========================================================================
HttpServer.cpp
========================================================================

#include "HttpServer.h"

#include "EvLock.h"
#include "EvThread.h"
#include "EvWaitCond.h"

#include "HttpRequestResponse.h"
#include "WmtsRequestHandler.h"

#include <io.h>
#include <stdlib.h> // free
#include <string.h> // strlen, strcpy
#include <sys/stat.h>

#include <event2/thread.h>

event_base* s_http_event_base = NULL;

HttpServer::HttpServer(const char* hostname, int port) :
   m_port(port)
{
   m_hostname = new char[strlen(hostname) + 1]; // +1 for the terminating NUL that strcpy writes
   strcpy(m_hostname, hostname);
}

HttpServer::~HttpServer()
{
   if(m_hostname)
      delete[](m_hostname);
}

// for every valid request, just pass the evhttp_request* to the WmtsRequestHandler
void HttpServer::onGeneralRequest(struct evhttp_request* req, void* arg)
{
   if(!req || !arg) 
      return;

   evhttp_request_own(req);

   if(evhttp_request_get_command(req) == EVHTTP_REQ_GET)
   {
      //cout << "GET request received" << endl;
   }
   else
   {
      cout << "Unknown request type received" << endl;
      evhttp_send_error(req, HTTP_BADREQUEST, "Bad request: server doesn't handle this request type.");
      return;
   }

   struct evhttp_uri* decoded_uri = evhttp_uri_parse(evhttp_request_get_uri(req));
   if(!decoded_uri)
   {
      cout << "Could not decode URI" << endl;
      evhttp_send_error(req, HTTP_BADREQUEST, "Bad request: cannot decode URI");
   }
   else
   {
      const char* path = evhttp_uri_get_path(decoded_uri);
      const char* query = evhttp_uri_get_query(decoded_uri);

      // evhttp_uridecode() returns a newly allocated string that the caller must free
      char* decoded_path = NULL;
      char* decoded_query = NULL;
      if(path)
      {
         //cout << "path: " << path << endl;
         decoded_path = evhttp_uridecode(path, 0, NULL);

         //if(decoded_path)
            //cout << "decoded_path: " << decoded_path << endl;
      }

      if(query)
      {
         //cout << "query: " << query << endl;
         decoded_query = evhttp_uridecode(query, 0, NULL);

         //if(decoded_query)
         //   cout << "decoded_query: " << decoded_query << endl;
      }

      if(decoded_query || decoded_path)
      {
         HttpRequest request;
         request.conn = req;

         //WmtsRequestHandler* req_handler = (WmtsRequestHandler*) arg;
         //req_handler->addToQueue(request);

         WmtsRequestHandler::process(request);
      }
      else
      {
         evhttp_send_error(req, HTTP_BADREQUEST, "Bad request: Need a path or a query");
      }

      // the request only carries req, so the decoded strings and the parsed URI can be released here
      free(decoded_path);
      free(decoded_query);
      evhttp_uri_free(decoded_uri);
   }
}

// set everything up, bind, and dispatch
std_error_type HttpServer::init()
{
   WSADATA wsaData;
   WORD version = 0x202;

   cout << "WSAStartup...";
   int error = WSAStartup(version, &wsaData);
   cout << " complete" << endl;

   std_error_type ret_val = STD_FAILED;

   evthread_set_id_callback(&EvThread::id_fn);

   evthread_condition_callbacks cond_cbs; 
   cond_cbs.condition_api_version = EVTHREAD_CONDITION_API_VERSION; // integer constant from event2/thread.h
   cond_cbs.alloc_condition = &EvWaitCond::alloc_condition;
   cond_cbs.free_condition = &EvWaitCond::free_condition;
   cond_cbs.signal_condition = &EvWaitCond::signal_condition;
   cond_cbs.wait_condition = &EvWaitCond::wait_condition;

   evthread_set_condition_callbacks(&cond_cbs);

   evthread_lock_callbacks lock_cbs;
   lock_cbs.lock_api_version = EVTHREAD_LOCK_API_VERSION; // integer constant from event2/thread.h
   lock_cbs.supported_locktypes = EVTHREAD_LOCKTYPE_RECURSIVE;
   lock_cbs.alloc = &EvLock::alloc;
   lock_cbs.free = &EvLock::free;
   lock_cbs.lock = &EvLock::lock;
   lock_cbs.unlock = &EvLock::unlock;
     
   evthread_set_lock_callbacks(&lock_cbs);

   //evthread_use_windows_threads();  //--------------------

   s_http_event_base = event_base_new();

   if(s_http_event_base)
   {
      cout << "Attempting to bind to " << m_hostname << ":" << m_port << " ... ";
      evhttp* http_event = evhttp_new(s_http_event_base);

      if(http_event)
      {
         evhttp_bound_socket* server_socket = NULL;

         server_socket = evhttp_bind_socket_with_handle(http_event, m_hostname, m_port);

         if(server_socket)
         {
            // only touch the socket once we know the bind succeeded
            evutil_make_listen_socket_reuseable(evhttp_bound_socket_get_fd(server_socket)); // don't care what it returns

            cout << "complete" << endl;

            // successfully bound
            WmtsRequestHandler req_handler(1); // thread pool size
            req_handler.init();

            evhttp_set_gencb(http_event, onGeneralRequest, &req_handler);

            cout << "Successfully bound and starting the dispatch loop" << endl;

            int dispatch_err = event_base_dispatch(s_http_event_base);

            evutil_closesocket(evhttp_bound_socket_get_fd(server_socket));

            cout << "Dispatch loop returned with: " << dispatch_err << endl;
         }
         else
         {
            cout << "failed.  Could not bind." << endl;
         }
      }
      else
      {
         cout << "failed.  http_event was null" << endl;
      }
   }
   else
   {
      cout << "Couldn't create event base" << endl;
   }

   return ret_val;
}

========================================================================
WmtsRequestHandler
========================================================================

#include "WmtsRequestHandler.h"

#include "WmtsRequestHandlerThread.h"

#include <assert.h>
#include <map>
#include <iostream>
#include <io.h>
#include <time.h>
#include <sys/stat.h>

#include <Std/StdErrorTypes.h>


WmtsRequestHandler::WmtsRequestHandler(
   int thread_pool_size) :
   m_is_aborted(false),
   m_lock(false)
{
   m_thread_pool_size = thread_pool_size;
}

WmtsRequestHandler::~WmtsRequestHandler()
{

}

// just set up threads
std_error_type 
WmtsRequestHandler::init()
{
   for(int i = 0; i < m_thread_pool_size; ++i)
   {
      WmtsRequestHandlerThread* thread = new WmtsRequestHandlerThread(*this);
      if(thread)
         m_thread_pool.push_back(thread);
   }

   for(size_t i = 0; i < m_thread_pool.size(); ++i)
   {
      m_thread_pool.at(i)->threadCreateRun();
   }

   if(m_thread_pool.size() > 0) 
      return STD_SUCCESS;
   else
      return STD_FAILED;
}

std_error_type 
WmtsRequestHandler::addToQueue(HttpRequest& req)
{
   m_lock.lock();
   m_requests.push_back(req);
   m_lock.unlock();

   m_data_available.wakeOne();

   return STD_SUCCESS;
}

bool 
WmtsRequestHandler::getIsAborted()
{
   // need lock for this
   return m_is_aborted;
}

ThrdMutex& 
WmtsRequestHandler::getLock()
{
   return m_lock;
}

ThrdWaitCond& 
WmtsRequestHandler::getWc()
{
   return m_data_available;
}

list<HttpRequest>& 
WmtsRequestHandler::getRequests()
{
   return m_requests;
}

// handles a request, just returns the time of the request
void 
WmtsRequestHandler::process(HttpRequest& req) // threads call this method
{
   assert(req.conn != NULL);

   std_error_type status = STD_FAILED;

   HttpResponse response;
   response.response = evbuffer_new(); //*******************
   evbuffer_add_printf(response.response, "This is a response at time: %ld", (long) time(NULL)); // time_t is not guaranteed to match %d
   response.conn = req.conn;
   response.content_type = "text/plain";
   response.code = 200;

   const char* reason = "";

   if(response.code >= 200 && response.code < 300)
   {
      struct evkeyvalq* headers = evhttp_request_get_output_headers(response.conn);

      assert(headers != NULL);

      evhttp_add_header(headers, "Content-Type", response.content_type.c_str());
      evhttp_send_reply(response.conn, response.code, reason, response.response);
   }
   else
   {
      if(response.conn && evhttp_request_get_connection(response.conn)) 
      {
         evhttp_send_error(response.conn, response.code, reason); 
         cout << "code: " << response.code << " reason: " << reason << endl;
      }
      else
      {
         cout << "send error failed" << endl;
      }
   }
}

========================================================================
WmtsRequestHandlerThread
========================================================================
#include "WmtsRequestHandlerThread.h"

#include <iostream>

#include "WmtsRequestHandler.h"
#include "HttpRequestResponse.h"

#include <Thrd/ThrdMutex.h>
#include <Thrd/ThrdWaitCond.h>

WmtsRequestHandlerThread::WmtsRequestHandlerThread(WmtsRequestHandler& req_handler) :
   m_req_handler(req_handler)
{

}

// gets a request from the queue and calls WmtsRequestHandler::process
void WmtsRequestHandlerThread::vRun()
{
   ThrdMutex& lock = m_req_handler.getLock();
   ThrdWaitCond& wc = m_req_handler.getWc();
   
   while(true)
   {
      lock.lock();

      list<HttpRequest>& requests = m_req_handler.getRequests();

      if(m_req_handler.getIsAborted())
      {
         lock.unlock();
         return;
      }
      
      if(requests.size() <= 0)
      {
         wc.wait(&lock);
      }

      bool got_request = false;
      HttpRequest req;
      if(requests.size() > 0) // check again in case we woke up due to abort
      {
         req = requests.front();
         requests.pop_front();
         got_request = true;
      }
      lock.unlock();

      if(got_request)
         m_req_handler.process(req); // this method will validate inputs
   }
}

========================================================================
HttpRequestResponse
========================================================================

#ifndef HTTP_REQUEST_RESPONSE
#define HTTP_REQUEST_RESPONSE

#include <string> 

#include <event2/http.h>
#include <event2/buffer.h>

using namespace std;

class HttpRequest
{
public:
   HttpRequest() : 
      conn(NULL),
      query(NULL) 
   {
   }

   HttpRequest&
   operator=(
   const HttpRequest& rhs)
   {
      if (&rhs != this)
      {
         conn = rhs.conn;
         query = rhs.query;
      }
      return *this;
   }

   evhttp_request* conn;

   char* query;
};

class HttpResponse
{
public:

   HttpResponse() :
      conn(NULL),
      response(NULL),
      code(200),
      reason(""),
      content_type("") 
   {
   }

   HttpResponse&
   operator=(
   const HttpResponse& rhs)
   {
      if (&rhs != this)
      {
         conn = rhs.conn;
         response = rhs.response;
         code = rhs.code;
         reason = rhs.reason;
         content_type = rhs.content_type;
      }
      return *this;
   }

   evhttp_request* conn;
   evbuffer* response;

   int code;
   std::string reason;
   std::string content_type;
};

#endif

========================================================================
EvLock.cpp, uses ThrdMutex which is basically a child of QMutex with no extended members
========================================================================

#include "EvLock.h"

#include <iostream>
#include <new>

#include <Thrd/ThrdMutex.h>

using namespace std;

void* EvLock::alloc(unsigned locktype)
{
   if(locktype == EVTHREAD_LOCKTYPE_READWRITE)
   {
      cout << "FAILED: read write lock requested" << endl;
      return NULL;
   }
   else if(locktype == EVTHREAD_LOCKTYPE_RECURSIVE)
   {
      cout << "Recursive" << endl;
      return new (nothrow) ThrdMutex(true); // true means recursive //*********************
   }
   else 
   {
      cout << "Not recursive" << endl;
      return new (nothrow) ThrdMutex(false); // false means non-recursive //*********************
   }
}

void EvLock::free(void* lock, unsigned locktype)
{
   if(lock)
      delete (ThrdMutex*)lock; // cast back so the ThrdMutex destructor actually runs
   else
      cout << "Error: free: lock was null" << endl;
}

int EvLock::lock(unsigned mode, void* lock)
{
   if(!lock || ((mode & EVTHREAD_WRITE) != 0) || ((mode & EVTHREAD_READ) != 0))
   {
      cout << "Error: problem locking" << endl;
      return -1; // api says READWRITE locks are not currently used.
   }

   if((mode & EVTHREAD_TRY) != 0)
   {
      cout << "tryLock ********" << endl;
      if(((ThrdMutex*)lock)->tryLock() == true)
         return 0;
      else
         return -1;
   }

   ThrdMutex* mutex = (ThrdMutex*)lock;
   mutex->lock();
   return 0;
}

int EvLock::unlock(unsigned mode, void* lock)
{
   if(!lock || ((mode & EVTHREAD_WRITE) != 0) || ((mode & EVTHREAD_READ) != 0))
   {
      cout << "Error: problem unlocking" << endl;
      return -1; // api says READWRITE locks are not currently used.
   }

   ThrdMutex* mutex = (ThrdMutex*)lock;

   mutex->unlock();
   return 0;
}

========================================================================
EvThread.h, uses ThrdThread which is basically a child of QThread with no extended members
========================================================================

#ifndef EV_THREAD
#define EV_THREAD

#include <iostream>

#include <event2/thread.h>

#include <Thrd/ThrdThread.h>

class EvThread
{
public:

   static unsigned long id_fn() 
   {
      unsigned long tid = (unsigned long) ThrdThread::getCurrentThreadHandle(); // ThrdThread::getCurrentThreadPtr();
      return tid;
   }

};

#endif