
[Libevent-users] Abnormal delay on read



Hello there,

I have a problem with libevent (version 2.0.21).

I have two programs. One (call it program A) sends requests to the other (program B), which processes each request and sends back a response.

There are two kinds of request: one sent every 10 seconds (like a ping), and another sent asynchronously whenever A needs to.

Everything works well at the beginning. But after a while, a delay appears between A sending a request and the read callback firing on B. All the requests are shifted: during this delay, B is still processing old ping requests. The delay gradually increases.

I ran a tcpdump capture on each side and I see no delay at the Ethernet layer.

It's as if there were a mysterious queue somewhere...

However, I think I am doing the right things in my read callback, and I don't use watermarks.

Has anyone already seen this kind of behavior?

Here is the code of my read callback (this function is shared by both programs):

----------------------------------------------

static void _lev_read_cb(struct bufferevent *bev, void *arg)
{
	struct talk_socket *sock = arg;
	struct evbuffer *evbuf_in;
	struct evbuffer_ptr it;
	size_t in_avail;

	if(sock->mode == talk_socket_mode_listener
	   && sock->status == talk_socket_status_idle)
	{
		_change_stage(sock, talk_socket_status_reading);
	}

	if(sock->status != talk_socket_status_reading)
	{
		log_err("sock %p: Invalid sock status: '%s' (mode=%s)",
			     sock, talk_socket_get_status_str(sock),
			     sock->mode == talk_socket_mode_listener ? "in" : "out");
		return;
	}

	/* Here, we have 2 cases:
	 *  1) we received an incomplete message, so we keep waiting
	 *     for the remaining data;
	 *  2) there is a complete message, so we need to process it.
	 */
	evbuf_in = bufferevent_get_input(bev);
	it = evbuffer_search(evbuf_in, "\0", 1, NULL);
	if(it.pos < 0)
	{
		log_warn("sock %p: message is incomplete."
			      " Waiting the remaining data...",
			      sock);
		return;
	}

	if(it.pos == 0)
	{
		log_err("sock %p: empty message !",
			     sock);
		evbuffer_drain(evbuf_in, 1); /*remove the \0 character*/
		goto cleanup;
	}

	if((size_t)it.pos + 1 > TALK_MSG_MAX_LEN)
	{
		log_err("sock %p: msg received is too big (%zu bytes)!",
			     sock, (size_t)it.pos + 1);
		evbuffer_drain(evbuf_in, (size_t)it.pos + 1);
		goto cleanup;
	}

	/* According to the mode, instantiate a new request
         * if we're receiving a message from nowhere.
	 */
	if(sock->mode == talk_socket_mode_listener)
	{
		sock->request = talk_request_new(NULL, NULL);
		if(sock->request == NULL)
		{
			log_err("talk_request_new() failed");
			return;
		}

		log_dbg5("sock %p: [req:#%u] receiving new request...",
			      sock, sock->request->id);
	}
	else
	{
		log_dbg5("sock %p: [req:#%u] receiving reply...",
			      sock, sock->request->id);
	}

	/* copy message (with the \0 character) to msg struct */
	evbuffer_remove(evbuf_in,
                        sock->request->in.data, (size_t)it.pos + 1);

	log_dbg4("sock %p: [req:#%u] receiving msg of %zu byte(s): %s",
		      sock, sock->request->id, (size_t)it.pos,
		      sock->request->in.data);

	/* call user callback */
	sock->reqev_cb(sock,
		       talk_socket_reqev_recv,
		       sock->request, sock->cb_userdata);

	if(sock->mode == talk_socket_mode_listener)
	{
		/* send the reply */
		_change_stage(sock, talk_socket_status_idle);

		/* some debug */
		log_dbg4("sock %p: [req:#%u] sending reply...",
			      sock, sock->request->id);

		/* write to the socket */
		if(talk_socket_write(sock,
					sock->request) != 0)
		{
			log_err("sock %p: [req:#%u]"
				     " talk_socket_write() failed",
				     sock, sock->request->id);
			talk_request_free(sock->request);
			sock->request = NULL;
		}
	}
	else
	{
		/* the user has freed the request in the callback.
		 * So here, we reset the request pointer and change
		 * the status of the socket
		 */
		sock->request = NULL;
		_change_stage(sock, talk_socket_status_idle);
	}

cleanup:
	/* there is remaining data in evbuffer ?
	 */
	in_avail = evbuffer_get_length(evbuf_in);
	if(in_avail > 0)
	{
		if(sock->mode == talk_socket_mode_listener)
		{
			log_dbg4("sock %p: remaining %zu byte(s)"
                                 " of data",
				      sock, in_avail);
		}
		else
		{
			log_err("sock %p: remaining data after"
                                 " proceeding a msg ! Discarding"
                                 " %zu byte(s) !",
				     sock, in_avail);
			evbuffer_drain(evbuf_in, in_avail);
		}
	}
}
----------------------------------------------

Moreover, after the read callback returns, there is no remaining data in the input evbuffer (I don't see the corresponding log messages).
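
To make the "queue" idea concrete, here is a minimal sketch, not the real code. A plain byte queue (the hypothetical byte_queue, with hypothetical helpers queue_append and queue_pop_message) stands in for the input evbuffer, and no libevent call is used. It only illustrates how a callback that handles one '\0'-terminated message per invocation can fall behind when several messages arrive together, compared with one that loops until no complete message is left. Whether this applies to the callback above is an open question, since the logs there show no leftover data.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical stand-in for the input evbuffer: a flat byte queue. */
struct byte_queue {
	char data[256];
	size_t len;
};

/* Append n bytes, as if the socket had just delivered them. */
static void queue_append(struct byte_queue *q, const char *src, size_t n)
{
	assert(q->len + n <= sizeof(q->data));
	memcpy(q->data + q->len, src, n);
	q->len += n;
}

/* Remove one '\0'-terminated message from the front of the queue.
 * Returns 1 if a complete message was removed, 0 if none is complete. */
static int queue_pop_message(struct byte_queue *q, char *out, size_t out_size)
{
	char *end = memchr(q->data, '\0', q->len);
	size_t msg_len;

	if (end == NULL)
		return 0; /* incomplete message: wait for more data */

	msg_len = (size_t)(end - q->data) + 1; /* include the '\0' */
	assert(msg_len <= out_size);
	memcpy(out, q->data, msg_len);

	/* Shift the remaining bytes to the front of the queue. */
	memmove(q->data, q->data + msg_len, q->len - msg_len);
	q->len -= msg_len;
	return 1;
}

/* Style A: handle at most one message per invocation.
 * Returns the number of messages handled (0 or 1). */
static size_t read_cb_single(struct byte_queue *q)
{
	char msg[64];

	return queue_pop_message(q, msg, sizeof(msg)) ? 1 : 0;
}

/* Style B: handle every complete message before returning.
 * Returns the number of messages handled. */
static size_t read_cb_drain(struct byte_queue *q)
{
	char msg[64];
	size_t handled = 0;

	while (queue_pop_message(q, msg, sizeof(msg)))
		handled++;
	return handled;
}
```

With style A, if a ping and a request arrive in the same read, the request stays queued until the next read event, so each later message is handled one event late and the backlog can grow. Style B leaves only a trailing incomplete message in the queue.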

While writing this email, I started wondering whether the evbuffer_get_length() function works in all contexts. For example, I disable the read callback while the "socket" is writing.

Did I miss something?

Anthony V.