[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[tor-commits] [tor/master] Add a timeout to tor_cond_wait; add tor_cond impl from libevent



commit e865248156a8512d756be003118de446d29611d1
Author: Nick Mathewson <nickm@xxxxxxxxxxxxxx>
Date:   Sun Sep 22 22:08:41 2013 -0400

    Add a timeout to tor_cond_wait; add tor_cond impl from libevent
    
    The Windows code may need some tweaks for it to compile; I've not
    tested it yet.
---
 src/common/compat_pthreads.c   |   24 +++++--
 src/common/compat_threads.h    |   19 ++++-
 src/common/compat_winthreads.c |  152 ++++++++++++++++++++++++----------------
 3 files changed, 126 insertions(+), 69 deletions(-)

diff --git a/src/common/compat_pthreads.c b/src/common/compat_pthreads.c
index 276b244..0e5d33a 100644
--- a/src/common/compat_pthreads.c
+++ b/src/common/compat_pthreads.c
@@ -148,10 +148,6 @@ tor_get_thread_id(void)
 
 /* Conditions. */
 
-/** Cross-platform condition implementation. */
-struct tor_cond_t {
-  pthread_cond_t cond;
-};
 /** Return a newly allocated condition, with nobody waiting on it. */
 tor_cond_t *
 tor_cond_new(void)
@@ -177,11 +173,37 @@ tor_cond_free(tor_cond_t *cond)
 }
 /** Wait until one of the tor_cond_signal functions is called on <b>cond</b>.
  * All waiters on the condition must wait holding the same <b>mutex</b>.
- * Returns 0 on success, negative on failure. */
+ * If <b>tv</b> is NULL, wait indefinitely; otherwise treat it as a relative
+ * timeout and give up once that much time has passed.
+ * Returns 0 on success, -1 on failure, 1 on timeout. */
 int
-tor_cond_wait(tor_cond_t *cond, tor_mutex_t *mutex)
+tor_cond_wait(tor_cond_t *cond, tor_mutex_t *mutex, const struct timeval *tv)
 {
-  return pthread_cond_wait(&cond->cond, &mutex->mutex) ? -1 : 0;
+  if (tv == NULL) {
+    return pthread_cond_wait(&cond->cond, &mutex->mutex) ? -1 : 0;
+  } else {
+    struct timespec ts;
+    int r;
+    /* pthread_cond_timedwait() takes an absolute deadline, not a duration,
+     * so add the relative timeout to the current time.  This assumes the
+     * condition uses the default clock (CLOCK_REALTIME). */
+    if (clock_gettime(CLOCK_REALTIME, &ts) != 0)
+      return -1;
+    ts.tv_sec += tv->tv_sec;
+    ts.tv_nsec += tv->tv_usec * 1000;
+    if (ts.tv_nsec >= 1000000000) {
+      /* Carry nanosecond overflow into the seconds field. */
+      ts.tv_sec += 1;
+      ts.tv_nsec -= 1000000000;
+    }
+    r = pthread_cond_timedwait(&cond->cond, &mutex->mutex, &ts);
+    if (r == 0)
+      return 0;
+    else if (r == ETIMEDOUT)
+      return 1;
+    else
+      return -1;
+  }
 }
 /** Wake up one of the waiters on <b>cond</b>. */
 void
diff --git a/src/common/compat_threads.h b/src/common/compat_threads.h
index f43e74a..bbd782f 100644
--- a/src/common/compat_threads.h
+++ b/src/common/compat_threads.h
@@ -57,10 +57,25 @@ void tor_threads_init(void);
 void set_main_thread(void);
 int in_main_thread(void);
 
-typedef struct tor_cond_t tor_cond_t;
+typedef struct tor_cond_t { /* Cross-platform condition implementation. */
+#ifdef USE_PTHREADS
+  pthread_cond_t cond; /**< The underlying pthreads condition. */
+#elif defined(USE_WIN32_THREADS)
+  HANDLE event; /**< Manual-reset event set by signalers, reset by waiters. */
+
+  CRITICAL_SECTION lock; /**< Held while touching the counters below. */
+  int n_waiting; /**< Number of threads currently inside tor_cond_wait(). */
+  int n_to_wake; /**< Number of pending wakeups not yet consumed. */
+  int generation; /**< Incremented on every signal. */
+#else
+#error no known condition implementation.
+#endif
+} tor_cond_t;
+
 tor_cond_t *tor_cond_new(void);
 void tor_cond_free(tor_cond_t *cond);
-int tor_cond_wait(tor_cond_t *cond, tor_mutex_t *mutex);
+int tor_cond_wait(tor_cond_t *cond, tor_mutex_t *mutex,
+                  const struct timeval *tv);
 void tor_cond_signal_one(tor_cond_t *cond);
 void tor_cond_signal_all(tor_cond_t *cond);
 
diff --git a/src/common/compat_winthreads.c b/src/common/compat_winthreads.c
index 01332fd..634dfbe 100644
--- a/src/common/compat_winthreads.c
+++ b/src/common/compat_winthreads.c
@@ -70,17 +70,20 @@ tor_get_thread_id(void)
   return (unsigned long)GetCurrentThreadId();
 }
 
-static DWORD cond_event_tls_index;
-struct tor_cond_t {
-  CRITICAL_SECTION mutex;
-  smartlist_t *events;
-};
 tor_cond_t *
 tor_cond_new(void)
 {
-  tor_cond_t *cond = tor_malloc_zero(sizeof(tor_cond_t));
-  InitializeCriticalSection(&cond->mutex);
-  cond->events = smartlist_new();
+  tor_cond_t *cond = tor_malloc(sizeof(tor_cond_t));
+  if (InitializeCriticalSectionAndSpinCount(&cond->lock, SPIN_COUNT)==0) {
+    tor_free(cond);
+    return NULL; /* could not initialize the lock */
+  }
+  if ((cond->event = CreateEvent(NULL,TRUE,FALSE,NULL)) == NULL) {
+    DeleteCriticalSection(&cond->lock); /* undo the initialization above */
+    tor_free(cond);
+    return NULL; /* could not create the event */
+  }
+  cond->n_waiting = cond->n_to_wake = cond->generation = 0; /* no waiters */
   return cond;
 }
 void
@@ -88,74 +91,103 @@ tor_cond_free(tor_cond_t *cond)
 {
   if (!cond)
     return;
-  DeleteCriticalSection(&cond->mutex);
-  /* XXXX notify? */
-  smartlist_free(cond->events);
-  tor_free(cond);
+  DeleteCriticalSection(&cond->lock);
+  CloseHandle(cond->event);
+  tor_free(cond); /* pairs with tor_malloc() in tor_cond_new() */
 }
-int
-tor_cond_wait(tor_cond_t *cond, tor_mutex_t *mutex)
+
+/** Wake one waiter on <b>cond</b>, or all of them if <b>broadcast</b>. */
+static void
+tor_cond_signal_impl(tor_cond_t *cond, int broadcast)
 {
-  HANDLE event;
-  int r;
-  tor_assert(cond);
-  tor_assert(mutex);
-  event = TlsGetValue(cond_event_tls_index);
-  if (!event) {
-    event = CreateEvent(0, FALSE, FALSE, NULL);
-    TlsSetValue(cond_event_tls_index, event);
-  }
-  EnterCriticalSection(&cond->mutex);
-
-  tor_assert(WaitForSingleObject(event, 0) == WAIT_TIMEOUT);
-  tor_assert(!smartlist_contains(cond->events, event));
-  smartlist_add(cond->events, event);
-
-  LeaveCriticalSection(&cond->mutex);
-
-  tor_mutex_release(mutex);
-  r = WaitForSingleObject(event, INFINITE);
-  tor_mutex_acquire(mutex);
-
-  switch (r) {
-    case WAIT_OBJECT_0: /* we got the mutex normally. */
-      break;
-    case WAIT_ABANDONED: /* holding thread exited. */
-    case WAIT_TIMEOUT: /* Should never happen. */
-      tor_assert(0);
-      break;
-    case WAIT_FAILED:
-      log_warn(LD_GENERAL, "Failed to acquire mutex: %d",(int) GetLastError());
-  }
+  EnterCriticalSection(&cond->lock);
+  if (broadcast)
+    cond->n_to_wake = cond->n_waiting; /* release every current waiter */
+  else
+    ++cond->n_to_wake;
+  cond->generation++; /* lets waiters tell this signal from older ones */
+  SetEvent(cond->event);
+  LeaveCriticalSection(&cond->lock);
-  return 0;
 }
 void
 tor_cond_signal_one(tor_cond_t *cond)
 {
-  HANDLE event;
-  tor_assert(cond);
-
-  EnterCriticalSection(&cond->mutex);
-
-  if ((event = smartlist_pop_last(cond->events)))
-    SetEvent(event);
-
-  LeaveCriticalSection(&cond->mutex);
+  tor_cond_signal_impl(cond, 0); /* broadcast=0: one more waiter may wake */
 }
 void
 tor_cond_signal_all(tor_cond_t *cond)
 {
-  tor_assert(cond);
+  tor_cond_signal_impl(cond, 1); /* broadcast=1: every current waiter wakes */
+}
 
-  EnterCriticalSection(&cond->mutex);
-  SMARTLIST_FOREACH(cond->events, HANDLE, event, SetEvent(event));
-  smartlist_clear(cond->events);
-  LeaveCriticalSection(&cond->mutex);
+/** Wait until one of the tor_cond_signal functions is called on <b>cond</b>,
+ * releasing <b>mutex</b> while blocked and re-acquiring it before returning.
+ * If <b>tv</b> is non-NULL it is a relative timeout, rounded up to whole
+ * milliseconds.  Returns 0 on success, -1 on failure, 1 on timeout. */
+int
+tor_cond_wait(tor_cond_t *cond, tor_mutex_t *mutex, const struct timeval *tv)
+{
+  CRITICAL_SECTION *lock = &mutex->mutex;
+  int generation_at_start;
+  int waiting = 1;
+  int result = -1;
+  DWORD ms = INFINITE, ms_orig = INFINITE, startTime, elapsed;
+  if (tv)
+    ms_orig = ms = (DWORD)(tv->tv_sec * 1000 + (tv->tv_usec + 999) / 1000);
+
+  EnterCriticalSection(&cond->lock);
+  ++cond->n_waiting;
+  generation_at_start = cond->generation; /* older signals are not ours */
+  LeaveCriticalSection(&cond->lock);
+
+  LeaveCriticalSection(lock);
+
+  startTime = GetTickCount();
+  do {
+    DWORD res = WaitForSingleObject(cond->event, ms);
+    EnterCriticalSection(&cond->lock);
+    if (cond->n_to_wake && cond->generation != generation_at_start) {
+      /* A signal sent after we started waiting is ours to consume. */
+      --cond->n_to_wake;
+      --cond->n_waiting;
+      result = 0;
+      waiting = 0;
+      goto out;
+    } else if (res != WAIT_OBJECT_0) {
+      result = (res==WAIT_TIMEOUT) ? 1 : -1;
+      --cond->n_waiting;
+      waiting = 0;
+      goto out;
+    } else if (ms != INFINITE) {
+      /* Unsigned subtraction stays correct if GetTickCount() wraps. */
+      elapsed = GetTickCount() - startTime;
+      if (elapsed >= ms_orig) {
+        result = 1; /* Timeout */
+        --cond->n_waiting;
+        waiting = 0;
+        goto out;
+      }
+      ms = ms_orig - elapsed;
+    }
+    /* If we make it here, we are still waiting. */
+    if (cond->n_to_wake == 0) /* Nobody else should wake up. */
+      ResetEvent(cond->event);
+  out:
+    LeaveCriticalSection(&cond->lock);
+  } while (waiting);
+
+  EnterCriticalSection(lock);
+
+  EnterCriticalSection(&cond->lock);
+  if (!cond->n_waiting)
+    ResetEvent(cond->event);
+  LeaveCriticalSection(&cond->lock);
+
+  return result;
 }
 
 void
 tor_threads_init(void)
 {
-  cond_event_tls_index = TlsAlloc();
   set_main_thread();
 }



_______________________________________________
tor-commits mailing list
tor-commits@xxxxxxxxxxxxxxxxxxxx
https://lists.torproject.org/cgi-bin/mailman/listinfo/tor-commits