[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

Re: [Libevent-users] Send evbuffer to multiple clients



Hi,

On 10.06.2011 23:21, Nick Mathewson wrote:
[...]
I think the locking mechanism has a few potential deadlocks and race
conditions we need to worry about.
[...]

I finally got some time to continue working on this. Attached are further
changes on top of my first patch that should resolve the deadlocks/race
conditions.
The second patch basically prevents nesting of buffer references, so you
can't add a buffer that already contains references to other buffers.
Also, each reference now stores (and holds a reference on) its source
buffer, so the source buffer's lock doesn't get freed too early.

Best regards,
  Joachim
From e613b30a52ef54068e1d95504a0887e8870654d0 Mon Sep 17 00:00:00 2001
From: Joachim Bauch <mail@xxxxxxxxxxxxxxxx>
Date: Thu, 9 Jun 2011 23:33:58 +0200
Subject: [PATCH 1/2] support adding buffers to other buffers non-destructively

---
 buffer.c                |  131 ++++++++++++++++++++++++++++++++++++++++++++++-
 evbuffer-internal.h     |   15 +++++
 include/event2/buffer.h |   12 ++++
 test/regress_buffer.c   |   68 ++++++++++++++++++++++++
 4 files changed, 225 insertions(+), 1 deletions(-)

diff --git a/buffer.c b/buffer.c
index 84d3aea..c27cf00 100644
--- a/buffer.c
+++ b/buffer.c
@@ -142,6 +142,8 @@ static struct evbuffer_chain *evbuffer_expand_singlechain(struct evbuffer *buf,
     size_t datlen);
 static int evbuffer_ptr_subtract(struct evbuffer *buf, struct evbuffer_ptr *pos,
     size_t howfar);
+static inline void evbuffer_chain_incref(struct evbuffer_chain *chain);
+static inline void evbuffer_chain_decref(struct evbuffer_chain *chain);
 
 static struct evbuffer_chain *
 evbuffer_chain_new(size_t size)
@@ -179,7 +181,11 @@ evbuffer_chain_free(struct evbuffer_chain *chain)
 		chain->flags |= EVBUFFER_DANGLING;
 		return;
 	}
-
+	if (chain->refcnt > 0) {
+		// chain is still referenced by other chains
+		return;
+	}
+    
 	if (chain->flags & EVBUFFER_REFERENCE) {
 		struct evbuffer_chain_reference *info =
 		    EVBUFFER_CHAIN_EXTRA(
@@ -203,6 +209,16 @@ evbuffer_chain_free(struct evbuffer_chain *chain)
 			evbuffer_file_segment_free(info->segment);
 		}
 	}
+	if (chain->flags & EVBUFFER_MULTICAST) {
+		struct evbuffer_multicast_parent *info =
+		    EVBUFFER_CHAIN_EXTRA(
+			    struct evbuffer_multicast_parent,
+			    chain);
+		EVUTIL_ASSERT(info->parent != NULL);
+		EVBUFFER_LOCK(info);
+		evbuffer_chain_decref(info->parent);
+		EVBUFFER_UNLOCK(info);
+	}
 
 	mm_free(chain);
 }
@@ -292,6 +308,23 @@ _evbuffer_chain_unpin(struct evbuffer_chain *chain, unsigned flag)
 		evbuffer_chain_free(chain);
 }
 
+static inline void
+evbuffer_chain_incref(struct evbuffer_chain *chain)
+{
+    ++chain->refcnt;
+}
+
+static inline void
+evbuffer_chain_decref(struct evbuffer_chain *chain)
+{
+    EVUTIL_ASSERT(chain->refcnt > 0);
+    if (--chain->refcnt > 0) {
+        return;
+    }
+    
+    evbuffer_chain_free(chain);
+}
+
 struct evbuffer *
 evbuffer_new(void)
 {
@@ -770,6 +803,59 @@ APPEND_CHAIN(struct evbuffer *dst, struct evbuffer *src)
 	dst->total_len += src->total_len;
 }
 
+static inline void
+APPEND_CHAIN_MULTICAST(struct evbuffer *dst, struct evbuffer *src)
+{
+	struct evbuffer_chain *tmp;
+	struct evbuffer_chain *chain = src->first;
+	struct evbuffer_multicast_parent *extra;
+
+	ASSERT_EVBUFFER_LOCKED(dst);
+	ASSERT_EVBUFFER_LOCKED(src);
+
+	for (; chain; chain = chain->next) {
+		if (!chain->off || chain->flags & EVBUFFER_DANGLING) {
+			// skip empty chains
+			continue;
+		}
+
+		tmp = evbuffer_chain_new(sizeof(struct evbuffer_multicast_parent));
+		if (!tmp) {
+			event_warn("%s: out of memory", __func__);
+			return;
+		}
+		extra = EVBUFFER_CHAIN_EXTRA(struct evbuffer_multicast_parent, tmp);
+		if (chain->flags & EVBUFFER_MULTICAST) {
+			// source chain is a reference itself
+			struct evbuffer_multicast_parent *info =
+				EVBUFFER_CHAIN_EXTRA(
+					struct evbuffer_multicast_parent,
+					chain);
+#ifndef _EVENT_DISABLE_THREAD_SUPPORT
+			extra->lock = info->lock;
+#endif
+			EVBUFFER_LOCK(info);
+			evbuffer_chain_incref(info->parent);
+			EVBUFFER_UNLOCK(info);
+			extra->parent = info->parent;
+		} else {
+			// reference source chain which now becomes immutable
+			evbuffer_chain_incref(chain);
+#ifndef _EVENT_DISABLE_THREAD_SUPPORT
+			extra->lock = src->lock;
+#endif
+			extra->parent = chain;
+			chain->flags |= EVBUFFER_IMMUTABLE;
+		}
+		tmp->buffer_len = chain->buffer_len;
+		tmp->misalign = chain->misalign;
+		tmp->off = chain->off;
+		tmp->flags |= EVBUFFER_MULTICAST|EVBUFFER_IMMUTABLE;
+		tmp->buffer = chain->buffer;
+		evbuffer_chain_insert(dst, tmp);
+	}
+}
+
 static void
 PREPEND_CHAIN(struct evbuffer *dst, struct evbuffer *src)
 {
@@ -835,6 +921,49 @@ done:
 }
 
 int
+evbuffer_add_buffer_reference(struct evbuffer *outbuf, struct evbuffer *inbuf)
+{
+	size_t in_total_len, out_total_len;
+	struct evbuffer_chain *chain;
+	int result = 0;
+
+	EVBUFFER_LOCK2(inbuf, outbuf);
+	in_total_len = inbuf->total_len;
+	out_total_len = outbuf->total_len;
+	chain = inbuf->first;
+
+	if (in_total_len == 0)
+		goto done;
+
+	if (outbuf->freeze_end || outbuf == inbuf) {
+		result = -1;
+		goto done;
+	}
+
+	for (; chain; chain = chain->next) {
+		if ((chain->flags & (EVBUFFER_FILESEGMENT|EVBUFFER_SENDFILE)) != 0) {
+			// chain type can not be referenced
+			result = -1;
+			goto done;
+		}
+	}
+
+	if (out_total_len == 0) {
+		/* There might be an empty chain at the start of outbuf; free
+		 * it. */
+		evbuffer_free_all_chains(outbuf->first);
+	}
+	APPEND_CHAIN_MULTICAST(outbuf, inbuf);
+
+	outbuf->n_add_for_cb += in_total_len;
+	evbuffer_invoke_callbacks(outbuf);
+
+done:
+	EVBUFFER_UNLOCK2(inbuf, outbuf);
+	return result;
+}
+
+int
 evbuffer_prepend_buffer(struct evbuffer *outbuf, struct evbuffer *inbuf)
 {
 	struct evbuffer_chain *pinned, *last;
diff --git a/evbuffer-internal.h b/evbuffer-internal.h
index 6bb3e2c..75d026d 100644
--- a/evbuffer-internal.h
+++ b/evbuffer-internal.h
@@ -183,6 +183,11 @@ struct evbuffer_chain {
 	/** a chain that should be freed, but can't be freed until it is
 	 * un-pinned. */
 #define EVBUFFER_DANGLING	0x0040
+	/** a chain that is a referenced copy of another chain */
+#define EVBUFFER_MULTICAST	0x0080
+
+	/** number of multicast references to this chain */
+	int refcnt;
 
 	/** Usually points to the read-write memory belonging to this
 	 * buffer allocated as part of the evbuffer_chain allocation.
@@ -240,6 +245,16 @@ struct evbuffer_file_segment {
 	ev_off_t length;
 };
 
+/** Information about the multicast parent of a chain.  Lives at the
+ * end of an evbuffer_chain with the EVBUFFER_MULTICAST flag set.  */
+struct evbuffer_multicast_parent {
+#ifndef _EVENT_DISABLE_THREAD_SUPPORT
+	void *lock; /**< lock prevent concurrent access to the parent */
+#endif
+	/** multicast parent for this chain */
+	struct evbuffer_chain *parent;
+};
+
 #define EVBUFFER_CHAIN_SIZE sizeof(struct evbuffer_chain)
 /** Return a pointer to extra data allocated along with an evbuffer. */
 #define EVBUFFER_CHAIN_EXTRA(t, c) (t *)((struct evbuffer_chain *)(c) + 1)
diff --git a/include/event2/buffer.h b/include/event2/buffer.h
index 7e2fc31..e0fb685 100644
--- a/include/event2/buffer.h
+++ b/include/event2/buffer.h
@@ -333,6 +333,18 @@ char *evbuffer_readln(struct evbuffer *buffer, size_t *n_read_out,
  */
 int evbuffer_add_buffer(struct evbuffer *outbuf, struct evbuffer *inbuf);
 
+/**
+  Copy data from one evbuffer into another evbuffer.
+  
+  This is a non-destructive add.  The data from one buffer is copied
+  into the other buffer.  However, no unnecessary memory copies occur.
+  
+  @param outbuf the output buffer
+  @param inbuf the input buffer
+  @return 0 if successful, or -1 if an error occurred
+ */
+int evbuffer_add_buffer_reference(struct evbuffer *outbuf, struct evbuffer *inbuf);
+
 
 typedef void (*evbuffer_ref_cleanup_cb)(const void *data,
     size_t datalen, void *extra);
diff --git a/test/regress_buffer.c b/test/regress_buffer.c
index 9c4b47c..74bc1ec 100644
--- a/test/regress_buffer.c
+++ b/test/regress_buffer.c
@@ -1478,6 +1478,73 @@ end:
 		evbuffer_free(buf2);
 }
 
+static void
+test_evbuffer_multicast(void *ptr)
+{
+	const char chunk1[] = "If you have found the answer to such a problem";
+	const char chunk2[] = "you ought to write it up for publication";
+			  /* -- Knuth's "Notes on the Exercises" from TAOCP */
+	char tmp[16];
+	size_t len1 = strlen(chunk1), len2=strlen(chunk2);
+
+	struct evbuffer *buf1 = NULL, *buf2 = NULL;
+
+	buf1 = evbuffer_new();
+	tt_assert(buf1);
+
+	evbuffer_add(buf1, chunk1, len1);
+	evbuffer_add(buf1, ", ", 2);
+	evbuffer_add(buf1, chunk2, len2);
+	tt_int_op(evbuffer_get_length(buf1), ==, len1+len2+2);
+
+	buf2 = evbuffer_new();
+	tt_assert(buf2);
+
+    tt_int_op(evbuffer_add_buffer_reference(buf2, buf1), ==, 0);
+    tt_int_op(evbuffer_add_buffer_reference(buf2, buf2), ==, -1);
+
+    // both buffers contain the same amount of data
+    tt_int_op(evbuffer_get_length(buf1), ==, evbuffer_get_length(buf1));
+
+	/* Make sure we can drain a little from the first buffer. */
+	tt_int_op(evbuffer_remove(buf1, tmp, 6), ==, 6);
+	tt_int_op(memcmp(tmp, "If you", 6), ==, 0);
+	tt_int_op(evbuffer_remove(buf1, tmp, 5), ==, 5);
+	tt_int_op(memcmp(tmp, " have", 5), ==, 0);
+
+	/* Make sure that prepending does not meddle with immutable data */
+	tt_int_op(evbuffer_prepend(buf1, "I have ", 7), ==, 0);
+	tt_int_op(memcmp(chunk1, "If you", 6), ==, 0);
+	evbuffer_validate(buf1);
+
+	/* Make sure we can drain a little from the second buffer. */
+	tt_int_op(evbuffer_remove(buf2, tmp, 6), ==, 6);
+	tt_int_op(memcmp(tmp, "If you", 6), ==, 0);
+	tt_int_op(evbuffer_remove(buf2, tmp, 5), ==, 5);
+	tt_int_op(memcmp(tmp, " have", 5), ==, 0);
+
+	/* Make sure that prepending does not meddle with immutable data */
+	tt_int_op(evbuffer_prepend(buf2, "I have ", 7), ==, 0);
+	tt_int_op(memcmp(chunk1, "If you", 6), ==, 0);
+	evbuffer_validate(buf2);
+
+    /* Make sure the data can be read from the second buffer when the first is freed */
+	evbuffer_free(buf1);
+	buf1 = NULL;
+    
+	tt_int_op(evbuffer_remove(buf2, tmp, 6), ==, 6);
+	tt_int_op(memcmp(tmp, "I have", 6), ==, 0);
+
+	tt_int_op(evbuffer_remove(buf2, tmp, 6), ==, 6);
+	tt_int_op(memcmp(tmp, "  foun", 6), ==, 0);
+
+end:
+	if (buf1)
+		evbuffer_free(buf1);
+	if (buf2)
+		evbuffer_free(buf2);
+}
+
 /* Some cases that we didn't get in test_evbuffer() above, for more coverage. */
 static void
 test_evbuffer_prepend(void *ptr)
@@ -1761,6 +1828,7 @@ struct testcase_t evbuffer_testcases[] = {
 	{ "search", test_evbuffer_search, 0, NULL, NULL },
 	{ "callbacks", test_evbuffer_callbacks, 0, NULL, NULL },
 	{ "add_reference", test_evbuffer_add_reference, 0, NULL, NULL },
+	{ "multicast", test_evbuffer_multicast, 0, NULL, NULL },
 	{ "prepend", test_evbuffer_prepend, TT_FORK, NULL, NULL },
 	{ "peek", test_evbuffer_peek, 0, NULL, NULL },
 	{ "freeze_start", test_evbuffer_freeze, 0, &nil_setup, (void*)"start" },
-- 
1.7.4.1

From 9f169a7ce5c8744e81dc8e190fdaa24db0a37364 Mon Sep 17 00:00:00 2001
From: Joachim Bauch <mail@xxxxxxxxxxxxxxxx>
Date: Thu, 4 Aug 2011 23:39:15 +0200
Subject: [PATCH 2/2] prevent nested multicast references, reworked locking

---
 buffer.c                |   42 ++++++++++++------------------------------
 evbuffer-internal.h     |    5 ++---
 include/event2/buffer.h |    6 +++++-
 test/regress_buffer.c   |    2 ++
 4 files changed, 21 insertions(+), 34 deletions(-)

diff --git a/buffer.c b/buffer.c
index 7c9d62c..e2c7f2f 100644
--- a/buffer.c
+++ b/buffer.c
@@ -221,10 +221,11 @@ evbuffer_chain_free(struct evbuffer_chain *chain)
 		    EVBUFFER_CHAIN_EXTRA(
 			    struct evbuffer_multicast_parent,
 			    chain);
+		EVUTIL_ASSERT(info->source != NULL);
 		EVUTIL_ASSERT(info->parent != NULL);
-		EVBUFFER_LOCK(info);
+		EVBUFFER_LOCK(info->source);
 		evbuffer_chain_decref(info->parent);
-		EVBUFFER_UNLOCK(info);
+		_evbuffer_decref_and_unlock(info->source);
 	}
 
 	mm_free(chain);
@@ -325,11 +326,8 @@ static inline void
 evbuffer_chain_decref(struct evbuffer_chain *chain)
 {
     EVUTIL_ASSERT(chain->refcnt > 0);
-    if (--chain->refcnt > 0) {
-        return;
-    }
-    
-    evbuffer_chain_free(chain);
+    --chain->refcnt;
+    // chain will be freed when parent buffer is released
 }
 
 struct evbuffer *
@@ -837,28 +835,12 @@ APPEND_CHAIN_MULTICAST(struct evbuffer *dst, struct evbuffer *src)
 			return;
 		}
 		extra = EVBUFFER_CHAIN_EXTRA(struct evbuffer_multicast_parent, tmp);
-		if (chain->flags & EVBUFFER_MULTICAST) {
-			// source chain is a reference itself
-			struct evbuffer_multicast_parent *info =
-				EVBUFFER_CHAIN_EXTRA(
-					struct evbuffer_multicast_parent,
-					chain);
-#ifndef _EVENT_DISABLE_THREAD_SUPPORT
-			extra->lock = info->lock;
-#endif
-			EVBUFFER_LOCK(info);
-			evbuffer_chain_incref(info->parent);
-			EVBUFFER_UNLOCK(info);
-			extra->parent = info->parent;
-		} else {
-			// reference source chain which now becomes immutable
-			evbuffer_chain_incref(chain);
-#ifndef _EVENT_DISABLE_THREAD_SUPPORT
-			extra->lock = src->lock;
-#endif
-			extra->parent = chain;
-			chain->flags |= EVBUFFER_IMMUTABLE;
-		}
+		// reference source chain which now becomes immutable
+		_evbuffer_incref(src);
+		extra->source = src;
+		evbuffer_chain_incref(chain);
+		extra->parent = chain;
+		chain->flags |= EVBUFFER_IMMUTABLE;
 		tmp->buffer_len = chain->buffer_len;
 		tmp->misalign = chain->misalign;
 		tmp->off = chain->off;
@@ -953,7 +935,7 @@ evbuffer_add_buffer_reference(struct evbuffer *outbuf, struct evbuffer *inbuf)
 	}
 
 	for (; chain; chain = chain->next) {
-		if ((chain->flags & (EVBUFFER_FILESEGMENT|EVBUFFER_SENDFILE)) != 0) {
+		if ((chain->flags & (EVBUFFER_FILESEGMENT|EVBUFFER_SENDFILE|EVBUFFER_MULTICAST)) != 0) {
 			// chain type can not be referenced
 			result = -1;
 			goto done;
diff --git a/evbuffer-internal.h b/evbuffer-internal.h
index 75d026d..1d67973 100644
--- a/evbuffer-internal.h
+++ b/evbuffer-internal.h
@@ -248,9 +248,8 @@ struct evbuffer_file_segment {
 /** Information about the multicast parent of a chain.  Lives at the
  * end of an evbuffer_chain with the EVBUFFER_MULTICAST flag set.  */
 struct evbuffer_multicast_parent {
-#ifndef _EVENT_DISABLE_THREAD_SUPPORT
-	void *lock; /**< lock prevent concurrent access to the parent */
-#endif
+	/** source buffer the multicast parent belongs to */
+	struct evbuffer *source;
 	/** multicast parent for this chain */
 	struct evbuffer_chain *parent;
 };
diff --git a/include/event2/buffer.h b/include/event2/buffer.h
index b8135f0..951137f 100644
--- a/include/event2/buffer.h
+++ b/include/event2/buffer.h
@@ -393,11 +393,15 @@ int evbuffer_add_buffer(struct evbuffer *outbuf, struct evbuffer *inbuf);
   This is a non-destructive add.  The data from one buffer is copied
   into the other buffer.  However, no unnecessary memory copies occur.
   
+  Note that buffers already containing buffer references can't be added
+  to other buffers.
+  
   @param outbuf the output buffer
   @param inbuf the input buffer
   @return 0 if successful, or -1 if an error occurred
  */
-int evbuffer_add_buffer_reference(struct evbuffer *outbuf, struct evbuffer *inbuf);
+int evbuffer_add_buffer_reference(struct evbuffer *outbuf,
+    struct evbuffer *inbuf);
 
 /**
    A cleanup function for a piece of memory added to an evbuffer by
diff --git a/test/regress_buffer.c b/test/regress_buffer.c
index 0eda87c..00240d9 100644
--- a/test/regress_buffer.c
+++ b/test/regress_buffer.c
@@ -1544,7 +1544,9 @@ test_evbuffer_multicast(void *ptr)
 	tt_assert(buf2);
 
     tt_int_op(evbuffer_add_buffer_reference(buf2, buf1), ==, 0);
+    // nested references are not allowed
     tt_int_op(evbuffer_add_buffer_reference(buf2, buf2), ==, -1);
+    tt_int_op(evbuffer_add_buffer_reference(buf1, buf2), ==, -1);
 
     // both buffers contain the same amount of data
     tt_int_op(evbuffer_get_length(buf1), ==, evbuffer_get_length(buf1));
-- 
1.7.4.1