
Re: [Libevent-users] Abnormal delay on read



On Tue, Sep 02, 2014 at 10:33:12AM +0200, Viallard Anthony wrote:
> Hello there,
> 
> I have a problem with libevent (version 2.0.21).
> 
> I have 2 programs. One (let's call it program A) sends requests to the
> other (program B), which processes each request and sends the response.
> 
> There are 2 kinds of request: one sent every 10 seconds (like a ping) and
> another one which is sent asynchronously when A needs to.
> 
> Everything works well at the beginning. But after a while, there is a delay
> between the request sent by A and the trigger of the read callback on B. All
> the requests are shifted. During this delay, B processes old ping requests...
> And this delay gradually increases.
> 
> I did a tcpdump capture on each side and I see there isn't a delay on the
> ethernet layer.
> 
> It's like there is a mysterious queue somewhere...
> 
> However, I think I do the correct things in my read callback and I don't use
> watermarks.
> 
> Maybe, someone has already seen this kind of bad behavior ?
> 
> Here is the code of my read callback (this function is generic for my 2
> programs):
> 
> ----------------------------------------------
> 
> static void _lev_read_cb(struct bufferevent *bev, void *arg)
> {
> 	struct talk_socket *sock = arg;
> 	struct evbuffer *evbuf_in;
> 	struct evbuffer_ptr it;
> 	size_t in_avail;
> 
> 	if(sock->mode == talk_socket_mode_listener
> 	   && sock->status == talk_socket_status_idle)
> 	{
> 		_change_stage(sock, talk_socket_status_reading);
> 	}
> 
> 	if(sock->status != talk_socket_status_reading)
> 	{
> 		log_err("sock %p: Invalid sock status: '%s' (mode=%s)",
> 			     sock, talk_socket_get_status_str(sock),
> 			     sock->mode == talk_socket_mode_listener ? "in" : "out");
> 		return;
> 	}
> 
> 	/* Here, we have 2 cases:
> 	 *  1) we received an incomplete message, so we stay waiting
>          *     for the remaining data ... ;
> 	 *  2) there is a complete message, so we need to process it ;
> 	 */
> 	evbuf_in = bufferevent_get_input(bev);
> 	it = evbuffer_search(evbuf_in, "\0", 1, NULL);
> 	if(it.pos < 0)
> 	{
> 		log_warn("sock %p: message is incomplete."
> 			      " Waiting the remaining data...",
> 			      sock);
> 		return;
> 	}
> 
> 	if(it.pos == 0)
> 	{
> 		log_err("sock %p: empty message !",
> 			     sock);
> 		evbuffer_drain(evbuf_in, 1); /*remove the \0 character*/
> 		goto cleanup;
> 	}
> 
> 	if((size_t)it.pos + 1 > TALK_MSG_MAX_LEN)
> 	{
> 		log_err("sock %p: msg received is too big (%zu bytes)!",
> 			     sock, (size_t)it.pos + 1);
> 		evbuffer_drain(evbuf_in, (size_t)it.pos + 1);
> 		goto cleanup;
> 	}
> 
> 	/* According to the mode, instantiate a new request
>          * if we're receiving a message from nowhere.
> 	 */
> 	if(sock->mode == talk_socket_mode_listener)
> 	{
> 		sock->request = talk_request_new(NULL, NULL);
> 		if(sock->request == NULL)
> 		{
> 			log_err("talk_request_new() failed");
> 			return;
> 		}
> 
> 		log_dbg5("sock %p: [req:#%u] receiving new request...",
> 			      sock, sock->request->id);
> 	}
> 	else
> 	{
> 		log_dbg5("sock %p: [req:#%u] receiving reply...",
> 			      sock, sock->request->id);
> 	}
> 
> 	/* copy message (with the \0 character) to msg struct */
> 	evbuffer_remove(evbuf_in,
>                         sock->request->in.data, (size_t)it.pos + 1);
> 
> 	log_dbg4("sock %p: [req:#%u] receiving msg of %zu byte(s): %s",
> 		      sock, sock->request->id, (size_t)it.pos,
> 		      sock->request->in.data);
> 
> 	/* call user callback */
> 	sock->reqev_cb(sock,
> 		       talk_socket_reqev_recv,
> 		       sock->request, sock->cb_userdata);
> 
> 	if(sock->mode == talk_socket_mode_listener)
> 	{
> 		/* send the reply */
> 		_change_stage(sock, talk_socket_status_idle);
> 
> 		/* some debug */
> 		log_dbg4("sock %p: [req:#%u] sending reply...",
> 			      sock, sock->request->id);
> 
> 		/* write to the socket */
> 		if(talk_socket_write(sock,
> 					sock->request) != 0)
> 		{
> 			log_err("sock %p: [req:#%u]"
> 				     " talk_socket_write() failed",
> 				     sock, sock->request->id);
> 			talk_request_free(sock->request);
> 			sock->request = NULL;
> 		}
> 	}
> 	else
> 	{
> 		/* the user has freed the request in the callback.
> 		 * So here, we reset request pointer and change
>                  * the status of the socket
> 		 */
> 		sock->request = NULL;
> 		_change_stage(sock, talk_socket_status_idle);
> 	}
> 
> cleanup:
> 	/* there is remaining data in evbuffer ?
> 	 */
> 	in_avail = evbuffer_get_length(evbuf_in);
> 	if(in_avail > 0)
> 	{
> 		if(sock->mode == talk_socket_mode_listener)
> 		{
> 			log_dbg4("sock %p: remaining %zu byte(s)"
>                                  " of data",
> 				      sock, in_avail);
> 		}
> 		else
> 		{
> 			log_err("sock %p: remaining data after"
>                                  " processing a msg ! Discarding"
>                                  " %zu byte(s) !",
> 				     sock, in_avail);
> 			evbuffer_drain(evbuf_in, in_avail);
> 		}
> 	}
> }
> ----------------------------------------------

Unless you have very big buffers, and provided your external functions are
fast, this callback should run quickly (at first glance).
Could you attach a debugger to the process when you see the slowdowns and
post a backtrace here? (What I'm wondering is whether it enters the read
callback or not.)

> 
> Moreover, after read callback is called, I don't have any remaining data in
> the input evbuffer (I don't see the log messages).

You mean that at the first line of your read callback:
evbuffer_get_length(bufferevent_get_input(bev)) == 0 ?

> Writing this email, I'm wondering if the evbuffer_get_length() function
> works in all contexts ? I mean, I disable the read callback when the
> "socket" is writing for example.

AFAIK it should work in all contexts: it just reads buffer->total_len under
the lock. But if you disable the read event, you will not receive any new
data; maybe you are expecting some?
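
One more thing that may be worth ruling out. This is only an assumption from
reading the callback above, not something confirmed: the callback handles one
message per invocation and then returns, and in listener mode any bytes left
in the buffer are only reported at debug level, so that message may not show
up in your logs. If two requests arrive in the same read, the second one would
sit in the input buffer until the next data arrives, which could look like a
queue that keeps growing. A minimal sketch of consuming every complete message
in one pass, written on a plain byte buffer instead of an evbuffer so it stands
alone (process_all_messages, count_cb and MSG_MAX_LEN are hypothetical names):

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define MSG_MAX_LEN 64 /* hypothetical limit, stands in for TALK_MSG_MAX_LEN */

/* Hypothetical per-message handler; gets the message without its '\0'. */
typedef void (*msg_cb)(const char *msg, size_t len, void *arg);

/* Example handler: counts the messages it is given (arg points to an int). */
static void count_cb(const char *msg, size_t len, void *arg)
{
	(void)msg;
	(void)len;
	++*(int *)arg;
}

/*
 * Consume every complete '\0'-terminated message in buf[0..len).
 * Returns the number of bytes consumed; whatever follows that offset is an
 * incomplete message that has to wait for more data.
 */
static size_t process_all_messages(const char *buf, size_t len,
                                   msg_cb cb, void *arg)
{
	size_t off = 0;

	assert(buf != NULL || len == 0);

	while (off < len) {
		const char *end = memchr(buf + off, '\0', len - off);
		size_t msg_len;

		if (end == NULL)
			break; /* incomplete message: stop and wait */

		msg_len = (size_t)(end - (buf + off));

		/* Empty and oversized messages are dropped, as in the
		 * original callback; everything else goes to the handler. */
		if (msg_len > 0 && msg_len + 1 <= MSG_MAX_LEN)
			cb(buf + off, msg_len, arg);

		off += msg_len + 1; /* step past the '\0' too */
	}
	return off;
}
```

In the real callback the equivalent would be to repeat the
evbuffer_search() / evbuffer_remove() steps in a loop until evbuffer_search()
returns a position below 0, instead of returning after the first message.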

Cheers,
Azat.