[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[Libevent-users] RPC: patch for cancelling requests



Hello,

The following patch allows to delete a RPC pool (with evrpc_pool_free()), including all requests which have already started. The user-defined callback is called with a new status (EVRPC_STATUS_ERR_ABORTED) which allows to properly release memory
used for message requests and replies.

It also fixes some possible memory leaks in evrpc_schedule_request(), when an error is encountered. The problem I described in my previous message should also be fixed.

I checked different scenarios (with or without hooks, timeouts, connection
failures...) using Valgrind, everything seems ok. The "regress" test code seems
also fine (all tests are OK).

Please let me know your opinion about it, I'm not yet familiar with the libevent
code :)

Best regards,

Christophe

diff -ur libevent-2.0.10-stable/evrpc.c libevent-2.0.10-patch/evrpc.c
--- libevent-2.0.10-stable/evrpc.c	2010-11-09 16:19:05.000000000 +0100
+++ libevent-2.0.10-patch/evrpc.c	2011-02-25 15:07:51.194781685 +0100
@@ -494,6 +494,7 @@
 
 static int evrpc_schedule_request(struct evhttp_connection *connection,
     struct evrpc_request_wrapper *ctx);
+static void evrpc_abort_request(struct evrpc_request_wrapper *ctx);
 
 struct evrpc_pool *
 evrpc_pool_new(struct event_base *base)
@@ -504,6 +505,8 @@
 
 	TAILQ_INIT(&pool->connections);
 	TAILQ_INIT(&pool->requests);
+	
+	TAILQ_INIT(&pool->started_requests);
 
 	TAILQ_INIT(&pool->paused_requests);
 
@@ -521,6 +524,11 @@
 {
 	if (request->hook_meta != NULL)
 		evrpc_hook_context_free(request->hook_meta);
+	if (request->req != NULL)
+		evhttp_cancel_request(request->req);
+
+	event_del(&request->ev_timeout);
+
 	mm_free(request->name);
 	mm_free(request);
 }
@@ -539,6 +547,12 @@
 		evrpc_request_wrapper_free(request);
 	}
 
+	while ((request = TAILQ_FIRST(&pool->started_requests)) != NULL) {
+		TAILQ_REMOVE(&pool->started_requests, request, next);
+		evrpc_abort_request(request);
+		evrpc_request_wrapper_free(request);
+	}
+
 	while ((pause = TAILQ_FIRST(&pool->paused_requests)) != NULL) {
 		TAILQ_REMOVE(&pool->paused_requests, pause, next);
 		mm_free(pause);
@@ -654,6 +668,8 @@
 	struct evrpc_pool *pool = ctx->pool;
 	struct evrpc_status status;
 
+	TAILQ_INSERT_TAIL(&pool->started_requests, ctx, next);
+
 	if ((req = evhttp_request_new(evrpc_reply_done, ctx)) == NULL)
 		goto error;
 
@@ -701,6 +717,7 @@
 	memset(&status, 0, sizeof(status));
 	status.error = EVRPC_STATUS_ERR_UNSTARTED;
 	(*ctx->cb)(&status, ctx->request, ctx->reply, ctx->cb_arg);
+	TAILQ_REMOVE(&pool->started_requests, ctx, next);
 	evrpc_request_wrapper_free(ctx);
 	return (-1);
 }
@@ -746,6 +763,7 @@
 	memset(&status, 0, sizeof(status));
 	status.error = EVRPC_STATUS_ERR_UNSTARTED;
 	(*ctx->cb)(&status, ctx->request, ctx->reply, ctx->cb_arg);
+	TAILQ_REMOVE(&pool->started_requests, ctx, next);
 	evrpc_request_wrapper_free(ctx);
 }
 
@@ -931,18 +949,33 @@
 
 	(*ctx->cb)(&status, ctx->request, ctx->reply, ctx->cb_arg);
 
-	evrpc_request_wrapper_free(ctx);
+	/* 
+	 * If we don't own the request, it will be freed by the underlying
+	 * http layer. So, avoid freeing it twice.
+	 */
+	if (req != NULL && !evhttp_request_is_owned(req))
+		ctx->req = NULL;
 
-	/* the http layer owned the original request structure, but if we
-	 * got paused, we asked for ownership and need to free it here. */
-	if (req != NULL && evhttp_request_is_owned(req))
-		evhttp_request_free(req);
+	TAILQ_REMOVE(&pool->started_requests, ctx, next);
+	evrpc_request_wrapper_free(ctx);
 
 	/* see if we can schedule another request */
 	evrpc_pool_schedule(pool);
 }
 
 static void
+evrpc_abort_request(struct evrpc_request_wrapper *ctx)
+{
+	struct evrpc_status status;
+
+	memset(&status, 0, sizeof(status));
+	status.http_req = NULL;
+	status.error	= EVRPC_STATUS_ERR_ABORTED;
+
+	(*ctx->cb)(&status, ctx->request, ctx->reply, ctx->cb_arg);
+}
+
+static void
 evrpc_pool_schedule(struct evrpc_pool *pool)
 {
 	struct evrpc_request_wrapper *ctx = TAILQ_FIRST(&pool->requests);
diff -ur libevent-2.0.10-stable/evrpc-internal.h libevent-2.0.10-patch/evrpc-internal.h
--- libevent-2.0.10-stable/evrpc-internal.h	2010-08-17 19:22:32.000000000 +0200
+++ libevent-2.0.10-patch/evrpc-internal.h	2011-02-25 15:04:39.042782710 +0100
@@ -92,6 +92,13 @@
 	int timeout;
 
 	TAILQ_HEAD(evrpc_requestq, evrpc_request_wrapper) (requests);
+
+	/* 
+	 * We maintain a separate list of requests that have been started
+	 * but not finished, in order to clean up everything properly if
+	 * the pool is deleted.
+	 */
+	TAILQ_HEAD(evrpc_strequestq, evrpc_request_wrapper) (started_requests);
 };
 
 struct evrpc_hook_ctx {
diff -ur libevent-2.0.10-stable/http.c libevent-2.0.10-patch/http.c
--- libevent-2.0.10-stable/http.c	2010-12-15 20:02:25.000000000 +0100
+++ libevent-2.0.10-patch/http.c	2011-02-25 16:19:07.302782506 +0100
@@ -1219,7 +1219,10 @@
 
 		/* we might want to set an error here */
 		request->cb(request, request->cb_arg);
-		evhttp_request_free(request);
+
+		if ((request->flags & EVHTTP_USER_OWNED) == 0) {
+			evhttp_request_free(request);
+		}
 	}
 }
 
diff -ur libevent-2.0.10-stable/include/event2/rpc_struct.h libevent-2.0.10-patch/include/event2/rpc_struct.h
--- libevent-2.0.10-stable/include/event2/rpc_struct.h	2010-08-16 18:56:09.000000000 +0200
+++ libevent-2.0.10-patch/include/event2/rpc_struct.h	2011-02-25 11:55:47.998781671 +0100
@@ -47,6 +47,7 @@
 #define EVRPC_STATUS_ERR_BADPAYLOAD	2
 #define EVRPC_STATUS_ERR_UNSTARTED	3
 #define EVRPC_STATUS_ERR_HOOKABORTED	4
+#define EVRPC_STATUS_ERR_ABORTED	5
 	int error;
 
 	/* for looking at headers or other information */