[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

Re: [Libevent-users] Send evbuffer to multiple clients



Am Mittwoch, den 08.06.2011, 13:15 -0400 schrieb Nick Mathewson:
> So if I understand here, the semantics are that once a chain has been
> added by reference to another evbuffer, the chain is now supposed to
> be immutable.

Right.

> Could we simplify stuff by also declaring that if you
> have an evbuffer which is added by reference to another evbuffer, the
> entire original evbuffer is immutable?  It seems to me that would make
> the rules for what you can use these for much easier to understand,
> and it would prevent people from doing crazy stuff.

There is no technical reason the original buffer should not be mutable
after it was added to another buffer. Adding it takes the current
contents as a snapshot. We must only take care that the referenced
chains don't change. Also this keeps the code a lot simpler.

> Is there a reason that EVBUFFER_MEM_PINNED_M needs to be distinct from
> EVBUFFER_IMMUTABLE?

Right, this is unnecessary.

> I think there's a race condition around evbuffer_chain_decref.  The
> reference count needs to be protected by the lock of the evbuffer that
> contains the referenced-chain, but right now it looks like it's only
> protected by the lock of the evbuffer that contains the referencing
> chain.

Yep, there were also locking calls missing if an already-referencing
chain was added to another buffer.

> I think we can move the "parent" field into a type-specific
> EVBUFFER_CHAIN_EXTRA() annex, in the way that evbuffer_chain_reference
> and evbuffer_chain_file_segment and evbuffer_file_segment currently
> do.

Okay, I didn't know about that but it makes sense.

> It would be fine to do this one in another patch, but
> evbuffer_file_segment should actually be copyable with the appropriate
> bookkeeping.

As it was pretty straightforward, I already implemented it that way.

> Also, I had considered another possible implementation.  Instead of
> adding a reference to each chain in the referenced buffer, we could
> instead add a reference to the referenced buffer itself, and use an
> evbuffer_ptr to track how far into it we were.  But on consideration,
> I think your approach is better: my idea would make too many functions
> harder to implement, without much real gain.

Yeah, my current implementation tries to keep as much of the original
code unchanged and only add the new functionality.

> The list is a great place to discuss patches, but IMO it has the
> problem that stuff can get forgotten.  Sourceforge and github are
> better at preventing stuff from getting forgotten, but fewer people
> read them, alas.  Let's keep talking here for now, but if one of us
> drops the ball on replying, let's remember to add a ticket/pull
> request/whatever for this.
> 
> Also fwiw, the best format for sending patches is the one generated by
> "git format-patch"; it makes them much easier to merge.  Pointing at a
> branch on a public git repository is also pretty good.

Attached is an updated patch that addresses all your comments and should
be easily mergeable after some more testing.

Best regards,
  Joachim

From e613b30a52ef54068e1d95504a0887e8870654d0 Mon Sep 17 00:00:00 2001
From: Joachim Bauch <mail@xxxxxxxxxxxxxxxx>
Date: Thu, 9 Jun 2011 23:33:58 +0200
Subject: [PATCH] support adding buffers to other buffers non-destructively

---
 buffer.c                |  131 ++++++++++++++++++++++++++++++++++++++++++++++-
 evbuffer-internal.h     |   15 +++++
 include/event2/buffer.h |   12 ++++
 test/regress_buffer.c   |   68 ++++++++++++++++++++++++
 4 files changed, 225 insertions(+), 1 deletions(-)

diff --git a/buffer.c b/buffer.c
index 84d3aea..c27cf00 100644
--- a/buffer.c
+++ b/buffer.c
@@ -142,6 +142,8 @@ static struct evbuffer_chain *evbuffer_expand_singlechain(struct evbuffer *buf,
     size_t datlen);
 static int evbuffer_ptr_subtract(struct evbuffer *buf, struct evbuffer_ptr *pos,
     size_t howfar);
+static inline void evbuffer_chain_incref(struct evbuffer_chain *chain);
+static inline void evbuffer_chain_decref(struct evbuffer_chain *chain);
 
 static struct evbuffer_chain *
 evbuffer_chain_new(size_t size)
@@ -179,7 +181,11 @@ evbuffer_chain_free(struct evbuffer_chain *chain)
 		chain->flags |= EVBUFFER_DANGLING;
 		return;
 	}
-
+	if (chain->refcnt > 0) {
+		// chain is still referenced by other chains
+		return;
+	}
+    
 	if (chain->flags & EVBUFFER_REFERENCE) {
 		struct evbuffer_chain_reference *info =
 		    EVBUFFER_CHAIN_EXTRA(
@@ -203,6 +209,16 @@ evbuffer_chain_free(struct evbuffer_chain *chain)
 			evbuffer_file_segment_free(info->segment);
 		}
 	}
+	if (chain->flags & EVBUFFER_MULTICAST) {
+		struct evbuffer_multicast_parent *info =
+		    EVBUFFER_CHAIN_EXTRA(
+			    struct evbuffer_multicast_parent,
+			    chain);
+		EVUTIL_ASSERT(info->parent != NULL);
+		EVBUFFER_LOCK(info);
+		evbuffer_chain_decref(info->parent);
+		EVBUFFER_UNLOCK(info);
+	}
 
 	mm_free(chain);
 }
@@ -292,6 +308,23 @@ _evbuffer_chain_unpin(struct evbuffer_chain *chain, unsigned flag)
 		evbuffer_chain_free(chain);
 }
 
+static inline void
+evbuffer_chain_incref(struct evbuffer_chain *chain)
+{
+    ++chain->refcnt;
+}
+
+static inline void
+evbuffer_chain_decref(struct evbuffer_chain *chain)
+{
+    EVUTIL_ASSERT(chain->refcnt > 0);
+    if (--chain->refcnt > 0) {
+        return;
+    }
+    
+    evbuffer_chain_free(chain);
+}
+
 struct evbuffer *
 evbuffer_new(void)
 {
@@ -770,6 +803,59 @@ APPEND_CHAIN(struct evbuffer *dst, struct evbuffer *src)
 	dst->total_len += src->total_len;
 }
 
+static inline void
+APPEND_CHAIN_MULTICAST(struct evbuffer *dst, struct evbuffer *src)
+{
+	struct evbuffer_chain *tmp;
+	struct evbuffer_chain *chain = src->first;
+	struct evbuffer_multicast_parent *extra;
+
+	ASSERT_EVBUFFER_LOCKED(dst);
+	ASSERT_EVBUFFER_LOCKED(src);
+
+	for (; chain; chain = chain->next) {
+		if (!chain->off || chain->flags & EVBUFFER_DANGLING) {
+			// skip empty chains
+			continue;
+		}
+
+		tmp = evbuffer_chain_new(sizeof(struct evbuffer_multicast_parent));
+		if (!tmp) {
+			event_warn("%s: out of memory", __func__);
+			return;
+		}
+		extra = EVBUFFER_CHAIN_EXTRA(struct evbuffer_multicast_parent, tmp);
+		if (chain->flags & EVBUFFER_MULTICAST) {
+			// source chain is a reference itself
+			struct evbuffer_multicast_parent *info =
+				EVBUFFER_CHAIN_EXTRA(
+					struct evbuffer_multicast_parent,
+					chain);
+#ifndef _EVENT_DISABLE_THREAD_SUPPORT
+			extra->lock = info->lock;
+#endif
+			EVBUFFER_LOCK(info);
+			evbuffer_chain_incref(info->parent);
+			EVBUFFER_UNLOCK(info);
+			extra->parent = info->parent;
+		} else {
+			// reference source chain which now becomes immutable
+			evbuffer_chain_incref(chain);
+#ifndef _EVENT_DISABLE_THREAD_SUPPORT
+			extra->lock = src->lock;
+#endif
+			extra->parent = chain;
+			chain->flags |= EVBUFFER_IMMUTABLE;
+		}
+		tmp->buffer_len = chain->buffer_len;
+		tmp->misalign = chain->misalign;
+		tmp->off = chain->off;
+		tmp->flags |= EVBUFFER_MULTICAST|EVBUFFER_IMMUTABLE;
+		tmp->buffer = chain->buffer;
+		evbuffer_chain_insert(dst, tmp);
+	}
+}
+
 static void
 PREPEND_CHAIN(struct evbuffer *dst, struct evbuffer *src)
 {
@@ -835,6 +921,49 @@ done:
 }
 
 int
+evbuffer_add_buffer_reference(struct evbuffer *outbuf, struct evbuffer *inbuf)
+{
+	size_t in_total_len, out_total_len;
+	struct evbuffer_chain *chain;
+	int result = 0;
+
+	EVBUFFER_LOCK2(inbuf, outbuf);
+	in_total_len = inbuf->total_len;
+	out_total_len = outbuf->total_len;
+	chain = inbuf->first;
+
+	if (in_total_len == 0)
+		goto done;
+
+	if (outbuf->freeze_end || outbuf == inbuf) {
+		result = -1;
+		goto done;
+	}
+
+	for (; chain; chain = chain->next) {
+		if ((chain->flags & (EVBUFFER_FILESEGMENT|EVBUFFER_SENDFILE)) != 0) {
+			// chain type can not be referenced
+			result = -1;
+			goto done;
+		}
+	}
+
+	if (out_total_len == 0) {
+		/* There might be an empty chain at the start of outbuf; free
+		 * it. */
+		evbuffer_free_all_chains(outbuf->first);
+	}
+	APPEND_CHAIN_MULTICAST(outbuf, inbuf);
+
+	outbuf->n_add_for_cb += in_total_len;
+	evbuffer_invoke_callbacks(outbuf);
+
+done:
+	EVBUFFER_UNLOCK2(inbuf, outbuf);
+	return result;
+}
+
+int
 evbuffer_prepend_buffer(struct evbuffer *outbuf, struct evbuffer *inbuf)
 {
 	struct evbuffer_chain *pinned, *last;
diff --git a/evbuffer-internal.h b/evbuffer-internal.h
index 6bb3e2c..75d026d 100644
--- a/evbuffer-internal.h
+++ b/evbuffer-internal.h
@@ -183,6 +183,11 @@ struct evbuffer_chain {
 	/** a chain that should be freed, but can't be freed until it is
 	 * un-pinned. */
 #define EVBUFFER_DANGLING	0x0040
+	/** a chain that is a referenced copy of another chain */
+#define EVBUFFER_MULTICAST	0x0080
+
+	/** number of multicast references to this chain */
+	int refcnt;
 
 	/** Usually points to the read-write memory belonging to this
 	 * buffer allocated as part of the evbuffer_chain allocation.
@@ -240,6 +245,16 @@ struct evbuffer_file_segment {
 	ev_off_t length;
 };
 
+/** Information about the multicast parent of a chain.  Lives at the
+ * end of an evbuffer_chain with the EVBUFFER_MULTICAST flag set.  */
+struct evbuffer_multicast_parent {
+#ifndef _EVENT_DISABLE_THREAD_SUPPORT
+	void *lock; /**< lock prevent concurrent access to the parent */
+#endif
+	/** multicast parent for this chain */
+	struct evbuffer_chain *parent;
+};
+
 #define EVBUFFER_CHAIN_SIZE sizeof(struct evbuffer_chain)
 /** Return a pointer to extra data allocated along with an evbuffer. */
 #define EVBUFFER_CHAIN_EXTRA(t, c) (t *)((struct evbuffer_chain *)(c) + 1)
diff --git a/include/event2/buffer.h b/include/event2/buffer.h
index 7e2fc31..e0fb685 100644
--- a/include/event2/buffer.h
+++ b/include/event2/buffer.h
@@ -333,6 +333,18 @@ char *evbuffer_readln(struct evbuffer *buffer, size_t *n_read_out,
  */
 int evbuffer_add_buffer(struct evbuffer *outbuf, struct evbuffer *inbuf);
 
+/**
+  Copy data from one evbuffer into another evbuffer.
+  
+  This is a non-destructive add.  The data from one buffer is copied
+  into the other buffer.  However, no unnecessary memory copies occur.
+  
+  @param outbuf the output buffer
+  @param inbuf the input buffer
+  @return 0 if successful, or -1 if an error occurred
+ */
+int evbuffer_add_buffer_reference(struct evbuffer *outbuf, struct evbuffer *inbuf);
+
 
 typedef void (*evbuffer_ref_cleanup_cb)(const void *data,
     size_t datalen, void *extra);
diff --git a/test/regress_buffer.c b/test/regress_buffer.c
index 9c4b47c..74bc1ec 100644
--- a/test/regress_buffer.c
+++ b/test/regress_buffer.c
@@ -1478,6 +1478,73 @@ end:
 		evbuffer_free(buf2);
 }
 
+static void
+test_evbuffer_multicast(void *ptr)
+{
+	const char chunk1[] = "If you have found the answer to such a problem";
+	const char chunk2[] = "you ought to write it up for publication";
+			  /* -- Knuth's "Notes on the Exercises" from TAOCP */
+	char tmp[16];
+	size_t len1 = strlen(chunk1), len2=strlen(chunk2);
+
+	struct evbuffer *buf1 = NULL, *buf2 = NULL;
+
+	buf1 = evbuffer_new();
+	tt_assert(buf1);
+
+	evbuffer_add(buf1, chunk1, len1);
+	evbuffer_add(buf1, ", ", 2);
+	evbuffer_add(buf1, chunk2, len2);
+	tt_int_op(evbuffer_get_length(buf1), ==, len1+len2+2);
+
+	buf2 = evbuffer_new();
+	tt_assert(buf2);
+
+    tt_int_op(evbuffer_add_buffer_reference(buf2, buf1), ==, 0);
+    tt_int_op(evbuffer_add_buffer_reference(buf2, buf2), ==, -1);
+
+    // both buffers contain the same amount of data
+    tt_int_op(evbuffer_get_length(buf1), ==, evbuffer_get_length(buf1));
+
+	/* Make sure we can drain a little from the first buffer. */
+	tt_int_op(evbuffer_remove(buf1, tmp, 6), ==, 6);
+	tt_int_op(memcmp(tmp, "If you", 6), ==, 0);
+	tt_int_op(evbuffer_remove(buf1, tmp, 5), ==, 5);
+	tt_int_op(memcmp(tmp, " have", 5), ==, 0);
+
+	/* Make sure that prepending does not meddle with immutable data */
+	tt_int_op(evbuffer_prepend(buf1, "I have ", 7), ==, 0);
+	tt_int_op(memcmp(chunk1, "If you", 6), ==, 0);
+	evbuffer_validate(buf1);
+
+	/* Make sure we can drain a little from the second buffer. */
+	tt_int_op(evbuffer_remove(buf2, tmp, 6), ==, 6);
+	tt_int_op(memcmp(tmp, "If you", 6), ==, 0);
+	tt_int_op(evbuffer_remove(buf2, tmp, 5), ==, 5);
+	tt_int_op(memcmp(tmp, " have", 5), ==, 0);
+
+	/* Make sure that prepending does not meddle with immutable data */
+	tt_int_op(evbuffer_prepend(buf2, "I have ", 7), ==, 0);
+	tt_int_op(memcmp(chunk1, "If you", 6), ==, 0);
+	evbuffer_validate(buf2);
+
+    /* Make sure the data can be read from the second buffer when the first is freed */
+	evbuffer_free(buf1);
+	buf1 = NULL;
+    
+	tt_int_op(evbuffer_remove(buf2, tmp, 6), ==, 6);
+	tt_int_op(memcmp(tmp, "I have", 6), ==, 0);
+
+	tt_int_op(evbuffer_remove(buf2, tmp, 6), ==, 6);
+	tt_int_op(memcmp(tmp, "  foun", 6), ==, 0);
+
+end:
+	if (buf1)
+		evbuffer_free(buf1);
+	if (buf2)
+		evbuffer_free(buf2);
+}
+
 /* Some cases that we didn't get in test_evbuffer() above, for more coverage. */
 static void
 test_evbuffer_prepend(void *ptr)
@@ -1761,6 +1828,7 @@ struct testcase_t evbuffer_testcases[] = {
 	{ "search", test_evbuffer_search, 0, NULL, NULL },
 	{ "callbacks", test_evbuffer_callbacks, 0, NULL, NULL },
 	{ "add_reference", test_evbuffer_add_reference, 0, NULL, NULL },
+	{ "multicast", test_evbuffer_multicast, 0, NULL, NULL },
 	{ "prepend", test_evbuffer_prepend, TT_FORK, NULL, NULL },
 	{ "peek", test_evbuffer_peek, 0, NULL, NULL },
 	{ "freeze_start", test_evbuffer_freeze, 0, &nil_setup, (void*)"start" },
-- 
1.7.4.1