[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

Re: [Libevent-users] Send evbuffer to multiple clients



Am Montag, den 06.06.2011, 20:29 -0400 schrieb Nick Mathewson:
> Hm.  If there's nothing hideously complex in the source buffer (that
> is, nothing added with evbuffer_add_file), you could try an
> appropriate combination of evbuffer_peek and evbuffer_add_reference.
> Other than that, I'm not thinking of much.
> 
> I'd love to get a better API for this in 2.1, if anybody wants to write one.

Attached is a patch against the current git head that adds a new method
"evbuffer_add_buffer_reference" providing a non-destructive add from one
evbuffer to another while avoiding unnecessary copies. For now it only
supports buffers containing only memory chains (i.e. no file segments
and sendfile chains). Also included is a first testcase, the default
regression tests still pass.

As this is my first attempt to change something in the buffer internals,
could you please check for any obvious errors or wrong coding style?
I hope sending this to the list is okay, or should I post it somewhere
else?

Best regards,
  Joachim

diff --git a/buffer.c b/buffer.c
index 98de504..bb31883 100644
--- a/buffer.c
+++ b/buffer.c
@@ -125,8 +125,11 @@
 #define EVBUFFER_CB_OBSOLETE	       0x00040000
 
 /* evbuffer_chain support */
+#define CHAIN_MULTICAST_ACTIVE(ch)  ((ch)->refcnt > 0)
+#define CHAIN_MAY_MODIFY(ch)  (!CHAIN_MULTICAST_ACTIVE(ch) && \
+        ((ch)->flags & (EVBUFFER_IMMUTABLE|EVBUFFER_MULTICAST)) == 0)
 #define CHAIN_SPACE_PTR(ch) ((ch)->buffer + (ch)->misalign + (ch)->off)
-#define CHAIN_SPACE_LEN(ch) ((ch)->flags & EVBUFFER_IMMUTABLE ? \
+#define CHAIN_SPACE_LEN(ch) (!CHAIN_MAY_MODIFY(ch) ? \
 	    0 : (ch)->buffer_len - ((ch)->misalign + (ch)->off))
 
 #define CHAIN_PINNED(ch)  (((ch)->flags & EVBUFFER_MEM_PINNED_ANY) != 0)
@@ -142,6 +145,8 @@ static struct evbuffer_chain *evbuffer_expand_singlechain(struct evbuffer *buf,
     size_t datlen);
 static int evbuffer_ptr_subtract(struct evbuffer *buf, struct evbuffer_ptr *pos,
     size_t howfar);
+static void evbuffer_chain_incref(struct evbuffer_chain *chain);
+static void evbuffer_chain_decref(struct evbuffer_chain *chain);
 
 static struct evbuffer_chain *
 evbuffer_chain_new(size_t size)
@@ -175,11 +180,15 @@ evbuffer_chain_new(size_t size)
 static inline void
 evbuffer_chain_free(struct evbuffer_chain *chain)
 {
-	if (CHAIN_PINNED(chain)) {
+	if (CHAIN_PINNED(chain) && !(chain->flags & EVBUFFER_MULTICAST)) {
 		chain->flags |= EVBUFFER_DANGLING;
 		return;
 	}
-
+	if (CHAIN_MULTICAST_ACTIVE(chain)) {
+		// chain is still referenced by other chains
+		return;
+	}
+    
 	if (chain->flags & EVBUFFER_REFERENCE) {
 		struct evbuffer_chain_reference *info =
 		    EVBUFFER_CHAIN_EXTRA(
@@ -203,6 +212,10 @@ evbuffer_chain_free(struct evbuffer_chain *chain)
 			evbuffer_file_segment_free(info->segment);
 		}
 	}
+	if (chain->flags & EVBUFFER_MULTICAST) {
+		EVUTIL_ASSERT(chain->parent != NULL);
+		evbuffer_chain_decref(chain->parent);
+	}
 
 	mm_free(chain);
 }
@@ -292,6 +305,25 @@ _evbuffer_chain_unpin(struct evbuffer_chain *chain, unsigned flag)
 		evbuffer_chain_free(chain);
 }
 
+static void
+evbuffer_chain_incref(struct evbuffer_chain *chain)
+{
+    ++chain->refcnt;
+    chain->flags |= EVBUFFER_MEM_PINNED_M;
+}
+
+static void
+evbuffer_chain_decref(struct evbuffer_chain *chain)
+{
+    EVUTIL_ASSERT(chain->refcnt > 0);
+    if (--chain->refcnt > 0) {
+        return;
+    }
+    
+    chain->flags &= ~EVBUFFER_MEM_PINNED_M;
+    evbuffer_chain_free(chain);
+}
+
 struct evbuffer *
 evbuffer_new(void)
 {
@@ -770,6 +802,46 @@ APPEND_CHAIN(struct evbuffer *dst, struct evbuffer *src)
 	dst->total_len += src->total_len;
 }
 
+static inline void
+APPEND_CHAIN_MULTICAST(struct evbuffer *dst, struct evbuffer *src)
+{
+	struct evbuffer_chain *tmp;
+	struct evbuffer_chain *chain = src->first;
+
+	ASSERT_EVBUFFER_LOCKED(dst);
+	ASSERT_EVBUFFER_LOCKED(src);
+
+	for (; chain; chain = chain->next) {
+		if (!chain->off || chain->flags & EVBUFFER_DANGLING) {
+			// skip empty chains
+			continue;
+		}
+
+		tmp = evbuffer_chain_insert_new(dst, 0);
+		if (!tmp) {
+			event_warn("%s: out of memory", __func__);
+			return;
+		}
+
+		if (chain->flags & EVBUFFER_MULTICAST) {
+			// source chain is a reference itself
+			evbuffer_chain_incref(chain->parent);
+			tmp->parent = chain->parent;
+		} else {
+			// reference source chain
+			evbuffer_chain_incref(chain);
+			tmp->parent = chain;
+		}
+		tmp->buffer_len = chain->buffer_len;
+		tmp->misalign = chain->misalign;
+		tmp->off = chain->off;
+		tmp->flags |= EVBUFFER_MULTICAST|EVBUFFER_MEM_PINNED_M;
+		tmp->buffer = chain->buffer;
+	}
+
+	dst->total_len += src->total_len;
+}
+
 static void
 PREPEND_CHAIN(struct evbuffer *dst, struct evbuffer *src)
 {
@@ -835,6 +907,49 @@ done:
 }
 
 int
+evbuffer_add_buffer_reference(struct evbuffer *outbuf, struct evbuffer *inbuf)
+{
+	size_t in_total_len, out_total_len;
+	struct evbuffer_chain *chain;
+	int result = 0;
+
+	EVBUFFER_LOCK2(inbuf, outbuf);
+	in_total_len = inbuf->total_len;
+	out_total_len = outbuf->total_len;
+	chain = inbuf->first;
+
+	if (in_total_len == 0 || outbuf == inbuf)
+		goto done;
+
+	if (outbuf->freeze_end) {
+		result = -1;
+		goto done;
+	}
+
+	for (; chain; chain = chain->next) {
+		if ((chain->flags & (EVBUFFER_FILESEGMENT|EVBUFFER_SENDFILE)) != 0) {
+			// chain type can not be referenced
+			result = -1;
+			goto done;
+		}
+	}
+
+	if (out_total_len == 0) {
+		/* There might be an empty chain at the start of outbuf; free
+		 * it. */
+		evbuffer_free_all_chains(outbuf->first);
+	}
+	APPEND_CHAIN_MULTICAST(outbuf, inbuf);
+
+	outbuf->n_add_for_cb += in_total_len;
+	evbuffer_invoke_callbacks(outbuf);
+
+done:
+	EVBUFFER_UNLOCK2(inbuf, outbuf);
+	return result;
+}
+
+int
 evbuffer_prepend_buffer(struct evbuffer *outbuf, struct evbuffer *inbuf)
 {
 	struct evbuffer_chain *pinned, *last;
@@ -1485,7 +1600,7 @@ evbuffer_add(struct evbuffer *buf, const void *data_in, size_t datlen)
 		evbuffer_chain_insert(buf, chain);
 	}
 
-	if ((chain->flags & EVBUFFER_IMMUTABLE) == 0) {
+	if ((chain->flags & (EVBUFFER_IMMUTABLE|EVBUFFER_MEM_PINNED_M)) == 0) {
 		remain = (size_t)(chain->buffer_len - chain->misalign - chain->off);
 		if (remain >= datlen) {
 			/* there's enough space to hold all the data in the
@@ -1566,8 +1681,8 @@ evbuffer_prepend(struct evbuffer *buf, const void *data, size_t datlen)
 		evbuffer_chain_insert(buf, chain);
 	}
 
-	/* we cannot touch immutable buffers */
-	if ((chain->flags & EVBUFFER_IMMUTABLE) == 0) {
+	/* we cannot touch immutable or referenced buffers */
+	if ((chain->flags & (EVBUFFER_IMMUTABLE|EVBUFFER_MEM_PINNED_M)) == 0) {
 		/* If this chain is empty, we can treat it as
 		 * 'empty at the beginning' rather than 'empty at the end' */
 		if (chain->off == 0)
@@ -1766,7 +1881,7 @@ _evbuffer_expand_fast(struct evbuffer *buf, size_t datlen, int n)
 	ASSERT_EVBUFFER_LOCKED(buf);
 	EVUTIL_ASSERT(n >= 2);
 
-	if (chain == NULL || (chain->flags & EVBUFFER_IMMUTABLE)) {
+	if (chain == NULL || (chain->flags & (EVBUFFER_IMMUTABLE|EVBUFFER_MEM_PINNED_M))) {
 		/* There is no last chunk, or we can't touch the last chunk.
 		 * Just add a new chunk. */
 		chain = evbuffer_chain_new(datlen);
diff --git a/evbuffer-internal.h b/evbuffer-internal.h
index 6bb3e2c..dcc2f96 100644
--- a/evbuffer-internal.h
+++ b/evbuffer-internal.h
@@ -179,10 +179,19 @@ struct evbuffer_chain {
 	 * memmoved, until the chain is un-pinned. */
 #define EVBUFFER_MEM_PINNED_R	0x0010
 #define EVBUFFER_MEM_PINNED_W	0x0020
-#define EVBUFFER_MEM_PINNED_ANY (EVBUFFER_MEM_PINNED_R|EVBUFFER_MEM_PINNED_W)
+#define EVBUFFER_MEM_PINNED_M	0x0080	/**< pinned for multicasting */
+#define EVBUFFER_MEM_PINNED_ANY (EVBUFFER_MEM_PINNED_R|EVBUFFER_MEM_PINNED_W|EVBUFFER_MEM_PINNED_M)
 	/** a chain that should be freed, but can't be freed until it is
 	 * un-pinned. */
 #define EVBUFFER_DANGLING	0x0040
+	/** a chain that is a referenced copy of another chain */
+#define EVBUFFER_MULTICAST	0x0100
+
+	/** number of multicast references to this chain */
+	int refcnt;
+
+	/** multicast parent for this chain */
+	struct evbuffer_chain *parent;
 
 	/** Usually points to the read-write memory belonging to this
 	 * buffer allocated as part of the evbuffer_chain allocation.
diff --git a/include/event2/buffer.h b/include/event2/buffer.h
index 7e2fc31..e0fb685 100644
--- a/include/event2/buffer.h
+++ b/include/event2/buffer.h
@@ -333,6 +333,18 @@ char *evbuffer_readln(struct evbuffer *buffer, size_t *n_read_out,
  */
 int evbuffer_add_buffer(struct evbuffer *outbuf, struct evbuffer *inbuf);
 
+/**
+  Copy data from one evbuffer into another evbuffer.
+  
+  This is a non-destructive add.  The data from one buffer is copied
+  into the other buffer.  However, no unnecessary memory copies occur.
+  
+  @param outbuf the output buffer
+  @param inbuf the input buffer
+  @return 0 if successful, or -1 if an error occurred
+ */
+int evbuffer_add_buffer_reference(struct evbuffer *outbuf, struct evbuffer *inbuf);
+
 
 typedef void (*evbuffer_ref_cleanup_cb)(const void *data,
     size_t datalen, void *extra);
diff --git a/test/regress_buffer.c b/test/regress_buffer.c
index 9c4b47c..4b20215 100644
--- a/test/regress_buffer.c
+++ b/test/regress_buffer.c
@@ -1478,6 +1478,72 @@ end:
 		evbuffer_free(buf2);
 }
 
+static void
+test_evbuffer_multicast(void *ptr)
+{
+	const char chunk1[] = "If you have found the answer to such a problem";
+	const char chunk2[] = "you ought to write it up for publication";
+			  /* -- Knuth's "Notes on the Exercises" from TAOCP */
+	char tmp[16];
+	size_t len1 = strlen(chunk1), len2=strlen(chunk2);
+
+	struct evbuffer *buf1 = NULL, *buf2 = NULL;
+
+	buf1 = evbuffer_new();
+	tt_assert(buf1);
+
+	evbuffer_add(buf1, chunk1, len1);
+	evbuffer_add(buf1, ", ", 2);
+	evbuffer_add(buf1, chunk2, len2);
+	tt_int_op(evbuffer_get_length(buf1), ==, len1+len2+2);
+
+	buf2 = evbuffer_new();
+	tt_assert(buf2);
+
+    tt_int_op(evbuffer_add_buffer_reference(buf2, buf1), ==, 0);
+
+    // both buffers contain the same amount of data
+    tt_int_op(evbuffer_get_length(buf1), ==, evbuffer_get_length(buf1));
+
+	/* Make sure we can drain a little from the first buffer. */
+	tt_int_op(evbuffer_remove(buf1, tmp, 6), ==, 6);
+	tt_int_op(memcmp(tmp, "If you", 6), ==, 0);
+	tt_int_op(evbuffer_remove(buf1, tmp, 5), ==, 5);
+	tt_int_op(memcmp(tmp, " have", 5), ==, 0);
+
+	/* Make sure that prepending does not meddle with immutable data */
+	tt_int_op(evbuffer_prepend(buf1, "I have ", 7), ==, 0);
+	tt_int_op(memcmp(chunk1, "If you", 6), ==, 0);
+	evbuffer_validate(buf1);
+
+	/* Make sure we can drain a little from the second buffer. */
+	tt_int_op(evbuffer_remove(buf2, tmp, 6), ==, 6);
+	tt_int_op(memcmp(tmp, "If you", 6), ==, 0);
+	tt_int_op(evbuffer_remove(buf2, tmp, 5), ==, 5);
+	tt_int_op(memcmp(tmp, " have", 5), ==, 0);
+
+	/* Make sure that prepending does not meddle with immutable data */
+	tt_int_op(evbuffer_prepend(buf2, "I have ", 7), ==, 0);
+	tt_int_op(memcmp(chunk1, "If you", 6), ==, 0);
+	evbuffer_validate(buf2);
+
+    /* Make sure the data can be read from the second buffer when the first is freed */
+	evbuffer_free(buf1);
+	buf1 = NULL;
+    
+	tt_int_op(evbuffer_remove(buf2, tmp, 6), ==, 6);
+	tt_int_op(memcmp(tmp, "I have", 6), ==, 0);
+
+	tt_int_op(evbuffer_remove(buf2, tmp, 6), ==, 6);
+	tt_int_op(memcmp(tmp, "  foun", 6), ==, 0);
+
+end:
+	if (buf1)
+		evbuffer_free(buf1);
+	if (buf2)
+		evbuffer_free(buf2);
+}
+
 /* Some cases that we didn't get in test_evbuffer() above, for more coverage. */
 static void
 test_evbuffer_prepend(void *ptr)
@@ -1761,6 +1827,7 @@ struct testcase_t evbuffer_testcases[] = {
 	{ "search", test_evbuffer_search, 0, NULL, NULL },
 	{ "callbacks", test_evbuffer_callbacks, 0, NULL, NULL },
 	{ "add_reference", test_evbuffer_add_reference, 0, NULL, NULL },
+	{ "multicast", test_evbuffer_multicast, 0, NULL, NULL },
 	{ "prepend", test_evbuffer_prepend, TT_FORK, NULL, NULL },
 	{ "peek", test_evbuffer_peek, 0, NULL, NULL },
 	{ "freeze_start", test_evbuffer_freeze, 0, &nil_setup, (void*)"start" },