[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[Libevent-users] Request for comments on correct use of libevent



Hello,

I am writing a simple DHT application and using libevent in my
application. The libevent version is 2.0 which is installed by
default on my Arch Linux setup.

After some searching I found an example to use libevent in a
multi-threaded setup.
http://roncemer.com/software-development/multi-threaded-libevent-server-example/#comment-3249

The above code runs one event base in its own separate thread and
uses a threaded workqueue model to service requests on incoming
connections.

I have taken that sample code and modified it for my own purposes.
While my application is functional when I deploy eight of them in
my setup and then check for the basic functionality, while trying
to run multiple operations from multiple application nodes I get
some messages which I do not fully comprehend at the moment.

They are as follows:
1. Epoll %s(%d) on fd %d failed.  Old events were %d; read change was %d (%s); write change was %d (%s); close change was %d (%s)

Which seems to be printed from epoll.c by event warn mechanism

2. Too many open files

3. One more which I failed to note and was related to EPOLL MOD

I know perhaps this is too much to ask, but can someone have a
look at the code and tell me things I am doing wrong. I have gone
through the libevent documentation online but I am not sure if I
am doing things the right way at the moment.

The peer part of dht I try to make synchronous by trying to break
the event base loop with event loopbreak call in the read cb however
sometimes if the callback never gets called which happens, the peer
part of the DHT is just stuck, so to circumvent this I use a timeout.

The code is attached. I have tried to keep it clean. I am no networking
or socket expert, appreciate any comments or feedback.

Thanks & Regards,
Sanchayan Maity.



/*
 * Copyright (C) 2015 Sanchayan Maity <maitysanchayan@xxxxxxxxx>
 *
 * Author: Sanchayan Maity <maitysanchayan@xxxxxxxxx>
 *						   <smaity1@xxxxxxxxxxxx>
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 and
 * only version 2 as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 */
#include "common.h"

/*
 * Struct to carry around connection (client)-specific data.
 */
typedef struct client {
    /* The client's socket. */
    int fd;

    /* The event_base for this client. */
    struct event_base *evbase;

    /* The bufferedevent for this client. */
    struct bufferevent *buf_ev;

    /* The output buffer for this client. */
    struct evbuffer *output_buffer;

    /* Here you can add your own application-specific attributes which
     * are connection-specific. */
} client_t;

/* Table entry */
struct node_t {
	/* Key */
	char *key;
	/* Value of the key */
	char *value;
	/* Next entry in chain */
    struct node_t *next;
};

static struct node_t *hashtable[SERVER_HASH_TABLE_SIZE];

/*
 * We use a read write lock to protect against concurrent
 * write to the hash table. It is ok to have concurrent
 * readers. We do not use a mutex as that will reduce
 * reader concurrency to a single thread at a time.
 */
pthread_rwlock_t ht_lock;

static struct event_base *evbase_accept;
/* Workqueue for the server */
static workqueue_t workqueue;

/*
 * Id of server. We use this to pick up appropriate
 * IP/port parameters from the file.
 */
static int server_id;

struct server_p {
	/* IP to which server will bind to */
	char *serverip;
	/* Port on which server will listen */
	char *serverport;
};

/*
 * We use this to store server parameters of the eight
 * servers information read from file
 */
static struct server_p servers[MAX_NO_OF_SERVERS];

pthread_t server_thread;
static volatile bool perf_test_on = false;

/* Signal handler function (defined below). */
static void sighandler(int signal);

/*
 * Struct to carry around server connection specific data
 */
struct server_conn {
	/* Event base for the peer server connection */
	struct event_base *evbase;
	/* Buffer event for the peer server connection */
	struct bufferevent *bev;
	/* Output buffer for the peer server connection */
	struct evbuffer *output_buffer;
};

static struct server_conn sconn;

struct timeval peer_timeout;

static void closeClient(client_t *client) {
    if (client != NULL) {
        if (client->fd >= 0) {
            close(client->fd);
            client->fd = -1;
        }
    }
}

static void closeAndFreeClient(client_t *client) {
    if (client != NULL) {
        closeClient(client);
        if (client->buf_ev != NULL) {
            bufferevent_free(client->buf_ev);
            client->buf_ev = NULL;
        }
        if (client->evbase != NULL) {
            event_base_free(client->evbase);
            client->evbase = NULL;
        }
        if (client->output_buffer != NULL) {
            evbuffer_free(client->output_buffer);
            client->output_buffer = NULL;
        }
        free(client);
    }
}

/*
 * https://en.wikipedia.org/wiki/Jenkins_hash_function
 */
unsigned int jenkins_one_at_a_time_hash(const char *key, size_t len) {
	unsigned int hash, i;

	for(hash = i = 0; i < len; ++i)
	{
		hash += key[i];
		hash += (hash << 10);
		hash ^= (hash >> 6);
	}

	hash += (hash << 3);
	hash ^= (hash >> 11);
	hash += (hash << 15);

	return hash;
}

/*
 * Hash function. Use the above and restrict result as per the table
 * size. We use a non power of 2 to get good hashing. Refer CLRS.
 */
unsigned int hash_server(const char *key) {
	return jenkins_one_at_a_time_hash(key, KEY_SIZE) % SERVER_HASH_TABLE_SIZE;
}

/*
 * Hash function. Use the above and restrict result as per the table
 * size. We use a non power of 2 to get good hashing. Refer CLRS.
 */
unsigned int hash_peer(const char *key) {
	return jenkins_one_at_a_time_hash(key, KEY_SIZE) % PEER_HASH_TABLE_SIZE;
}

/*
 * Get the node data pointer as per the key
 */
struct node_t *hash_table_get(const char *key) {
	struct node_t *np;

	pthread_rwlock_rdlock(&ht_lock);
	for(np = hashtable[hash_server(key)]; np != NULL; np = np->next) {
		if (strncmp(key, np->key, KEY_SIZE) == 0) {
			pthread_rwlock_unlock(&ht_lock);
			/* We found the key */
			return np;
		}
	}

	pthread_rwlock_unlock(&ht_lock);
	return NULL;
}

/*
 * We determine if the key being added exists. If it does, the
 * new value supersedes the old one, else we create a new entry
 * and add the key/value pair. Return NULL on any error.
 */
struct node_t *hash_table_put(const char *key, const char *value) {
	unsigned int hashval;
	struct node_t *np;

	pthread_rwlock_wrlock(&ht_lock);
	if ((np = hash_table_get(key)) == NULL) { /* Not found */
		np = (struct node_t *)malloc(sizeof(*np));
		if (np == NULL || (np->key = strndup(key, KEY_SIZE)) == NULL)
			goto error;

		/* Find the bucket position and add at 'head' location */
		hashval = hash_server(key);
		np->next = hashtable[hashval];
		hashtable[hashval] = np;
	} else /* Already there */
		free((void *) np->value);	/* Free previous value */
	if ((np->value = strndup(value, VALUE_SIZE)) == NULL)
		goto error;

	return np;

error:
	pthread_rwlock_unlock(&ht_lock);
	return NULL;
}

/*
 * Return 0 on success and 1 on failure
 */
unsigned int hash_table_delete(const char *key) {
	struct node_t *np1, *np2;
	unsigned int hashval;

	hashval = hash_server(key);

	pthread_rwlock_wrlock(&ht_lock);
	for (np1 = hashtable[hashval], np2 = NULL; np1 != NULL; np2 = np1, np1 = np1->next)
		if (strncmp(key, np1->key, KEY_SIZE) == 0) {
			/* Found a match */
			free(np1->key);
			free(np1->value);
			if (np2 == NULL)
				/* At the beginning? */
				hashtable[hashval] = np1->next;
			else
				/* In the middle or at the end? */
				np2->next = np1->next;
		free(np1);

		pthread_rwlock_unlock(&ht_lock);
		return 0;
	}

	pthread_rwlock_unlock(&ht_lock);
	return 1;
}

/*
 * Called by libevent when there is data to read.
 */
void server_buffered_on_read(struct bufferevent *bev, void *arg) {
    client_t *client = (client_t *)arg;
    char data[MESSAGE_SIZE] = {0};
	struct node_t *np;

    struct evbuffer *input;
    input = bufferevent_get_input(bev);
	/*
	 * Remove a chunk of data from the input buffer, copying it into our
	 * local array (data).
	 */
	evbuffer_remove(input, data, MESSAGE_SIZE); 

	/*
	 * Check if the message is meant for us and the command. While sending
	 * data back, we use the same buffer keeping header and key information
	 * but changing/appending the value of the key and setting the OK or ERROR command
	 */
	if (data[0] == 'C' && data[1] == 'S') {
		switch (data[2]) {
		case CMD_PUT:
			if ((hash_table_put(&data[4], &data[24])) == NULL)
				data[3] = CMD_ERR;
			else
				data[3] = CMD_OK;
			evbuffer_add(client->output_buffer, data, MESSAGE_SIZE);
			break;
		case CMD_GET:
			np = hash_table_get(&data[4]);
			if (np == NULL) {
				data[3] = CMD_ERR;
				evbuffer_add(client->output_buffer, data, MESSAGE_SIZE);
			} else {
				data[3] = CMD_OK;
				strncpy(&data[24], np->value, VALUE_SIZE);
				evbuffer_add(client->output_buffer, data, MESSAGE_SIZE);
			}
			break;
		case CMD_DEL:
			if (hash_table_delete(&data[4]))
				data[3] = CMD_ERR;
			else
				data[3] = CMD_OK;
			evbuffer_add(client->output_buffer, data, MESSAGE_SIZE);
			break;
		default:
			break;
		}
	}

    /* Send the results to the peer.  This actually only queues the results
     * for sending. Sending will occur asynchronously, handled by libevent. */
    if (bufferevent_write_buffer(bev, client->output_buffer)) {
        errorOut("Error sending data to client on fd %d\n", client->fd);
        closeClient(client);
    }
}

/**
 * Called by libevent when the write buffer reaches 0.  We only
 * provide this because libevent expects it, but we don't use it.
 */
void server_buffered_on_write(struct bufferevent *bev, void *arg) {
}

/**
 * Called by libevent when there is an error on underlying the socket
 * descriptor.
 */
void server_buffered_on_error(struct bufferevent *bev, short events, void *arg) {
	if (events & BEV_EVENT_ERROR) {
		printf("Server: Got an error: %s\n",
			   evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR()));
	}
	closeClient((client_t *)arg);
}

static void server_job_function(struct job *job) {
    client_t *client = (client_t *)job->user_data;

	event_base_dispatch(client->evbase);
	closeAndFreeClient(client);
	free(job);
}

/*
 * This function will be called by libevent when there is a connection
 * ready to be accepted.
 */
void server_on_accept(evutil_socket_t fd, short ev, void *arg) {
    int client_fd;
    struct sockaddr_in client_addr;
    socklen_t client_len = sizeof(client_addr);
    workqueue_t *workqueue = (workqueue_t *)arg;
    client_t *client;
    job_t *job;

    client_fd = accept(fd, (struct sockaddr *)&client_addr, &client_len);
    if (client_fd < 0) {
        warn("accept failed");
        return;
    }

    /* Set the client socket to non-blocking mode. */
    if (evutil_make_socket_nonblocking(client_fd) < 0) {
        warn("failed to set client socket to non-blocking");
        close(client_fd);
        return;
    }

    /* Create a client object. */
    if ((client = (client_t *)malloc(sizeof(*client))) == NULL) {
        warn("failed to allocate memory for client state");
        close(client_fd);
        return;
    }
    memset(client, 0, sizeof(*client));
    client->fd = client_fd;

    if ((client->output_buffer = evbuffer_new()) == NULL) {
        warn("client output buffer allocation failed");
        closeAndFreeClient(client);
        return;
    }

    if ((client->evbase = event_base_new()) == NULL) {
        warn("client event_base creation failed");
        closeAndFreeClient(client);
        return;
    }

    client->buf_ev = bufferevent_socket_new(client->evbase, client_fd,
						BEV_OPT_CLOSE_ON_FREE | BEV_OPT_DEFER_CALLBACKS);
    if ((client->buf_ev) == NULL) {
        warn("client bufferevent creation failed");
        closeAndFreeClient(client);
        return;
    }

	/*
	 * We trigger the read callback only when there atleast MESSAGE_SIZE
	 * bytes to be read.
	 */
	bufferevent_setwatermark(client->buf_ev, EV_READ, MESSAGE_SIZE, 0);
    bufferevent_setcb(client->buf_ev, server_buffered_on_read, server_buffered_on_write,
                      server_buffered_on_error, client);

    /* We have to enable it before our callbacks will be
     * called. */
    bufferevent_enable(client->buf_ev, EV_READ);

    /* Create a job object and add it to the work queue. */
    if ((job = (job_t *)malloc(sizeof(*job))) == NULL) {
        warn("failed to allocate memory for job state");
        closeAndFreeClient(client);
        return;
    }
    job->job_function = server_job_function;
    job->user_data = client;

    workqueue_add_job(workqueue, job);
}

/*
 * Run the server.  This function blocks, only returning when the server has 
 * terminated.
 */
void *runServer(void *arg) {
	struct server_p *server_i = (struct server_p *)arg;
    evutil_socket_t listenfd;
	struct sockaddr_in listen_addr;
	struct event *ev_accept;
    int reuseaddr_on;
	struct sigaction siginfo;

    /* Set signal handlers */
    sigset_t sigset;
    sigemptyset(&sigset);
    siginfo.sa_handler = sighandler;
    siginfo.sa_mask = sigset;
    siginfo.sa_flags = SA_RESTART;

    sigaction(SIGINT, &siginfo, NULL);
    sigaction(SIGTERM, &siginfo, NULL);

	/* Create our listening socket */
	listenfd = socket(AF_INET, SOCK_STREAM, 0);
	if (listenfd < 0) {
		err(1, "listen failed");
		goto exit;
	}

	memset(&listen_addr, 0, sizeof(listen_addr));
	listen_addr.sin_family = AF_INET;
	listen_addr.sin_port = htons(atoi(server_i->serverport));
	inet_pton(AF_INET, server_i->serverip, &listen_addr.sin_addr);

    if (bind(listenfd, (struct sockaddr *)&listen_addr, sizeof(listen_addr)) < 0) {
        err(1, "bind failed");
		goto exit;
    }

    if (listen(listenfd, CONNECTION_BACKLOG) < 0) {
        err(1, "listen failed");
		goto exit;
    }
	reuseaddr_on = 1;
	setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &reuseaddr_on,
		sizeof(reuseaddr_on));

    /*
	 * Set the socket to non-blocking, this is essential in event
     * based programming with libevent.
	 */
    if (evutil_make_socket_nonblocking(listenfd) < 0) {
        err(1, "failed to set server socket to non-blocking");
		goto exit;
    }

    if ((evbase_accept = event_base_new()) == NULL) {
        perror("Unable to create socket accept event base");
        close(listenfd);
        goto exit;
    }

    /* Initialize work queue. */
    if (workqueue_init(&workqueue, NUM_THREADS)) {
        perror("Failed to create work queue");
        close(listenfd);
        workqueue_shutdown(&workqueue);
        goto exit;
    }

    /* We now have a listening socket, we create a read event to
     * be notified when a client connects. */
    ev_accept = event_new(evbase_accept, listenfd, EV_READ | EV_PERSIST,
					server_on_accept, (void *)&workqueue);
    event_add(ev_accept, NULL);

    printf("Server running.\n");

    /* Start the event loop. */
    event_base_dispatch(evbase_accept);

    event_base_free(evbase_accept);
    evbase_accept = NULL;

    close(listenfd);

    printf("Server shutdown.\n");

exit:
	pthread_exit(NULL);
}

/*
 * Kill the server.  This function can be called from another thread to kill
 * the server, causing runServer() to return.
 */
void killServer(void) {
    fprintf(stdout, "Stopping socket listener event loop.\n");
    if (event_base_loopexit(evbase_accept, NULL)) {
        perror("Error shutting down server");
    }
    fprintf(stdout, "Stopping workers.\n");
    workqueue_shutdown(&workqueue);
}

static void sighandler(int signal) {
    fprintf(stdout, "Received signal %d: %s.  Shutting down.\n", signal,
            strsignal(signal));
    killServer();
}

/*
 * Taken from http://stackoverflow.com/questions/9210528/split-string-with-delimiters-in-c
 * We modify it to use strtok_r the MT safe variant. strtok is not MT safe.
 */
char** str_split(char* a_str, const char a_delim) {
	char** result    = 0;
	size_t count     = 0;
	char* tmp        = a_str;
	char* last_comma = 0;
	char* save		 = 0;
	char delim[2];
	delim[0] = a_delim;
	delim[1] = 0;

	/* Count how many elements will be extracted. */
	while (*tmp) {
		if (a_delim == *tmp) {
			count++;
			last_comma = tmp;
		}
		tmp++;
	}

	/* Add space for trailing token. */
	count += last_comma < (a_str + strlen(a_str) - 1);

	/* Add space for terminating null string so caller
	knows where the list of returned strings ends. */
	count++;

	result = (char **)malloc(sizeof(char*) * count);

	if (result) {
		size_t idx  = 0;
		//char* token = strtok(a_str, delim);
		char* token = strtok_r(a_str, delim, &save);

		while (token) {
			*(result + idx++) = strdup(token);
			//token = strtok(0, delim);
			token = strtok_r(0, delim, &save);
		}
		assert(idx == count - 1);
		*(result + idx) = 0;
	}

	return result;
}

static void print_key(char *data, int start_pos) {
	int i;

	for (i = start_pos; i < start_pos + KEY_SIZE; i++)
		printf("%c", data[i]);
}

static void closeAndFreeServerConn(struct server_conn *client) {
    if (client != NULL) {
        if (client->bev != NULL) {
            bufferevent_free(client->bev);
            client->bev = NULL;
        }
        if (client->evbase != NULL) {
            event_base_free(client->evbase);
            client->evbase = NULL;
        }
        if (client->output_buffer != NULL) {
            evbuffer_free(client->output_buffer);
            client->output_buffer = NULL;
        }
    }
}

/*
 * Called by libevent when there is data to read.
 */
void peer_buffered_on_read(struct bufferevent *bev, void *arg) {
	struct server_conn *sconn = (struct server_conn *)arg;
    char data[MESSAGE_SIZE] = {0};
 
    struct evbuffer *input;
    input = bufferevent_get_input(bev);

	/*
	 * Remove a chunk of data from the input buffer, copying it into our
	 * local array (data).
	 */
 	evbuffer_remove(input, data, MESSAGE_SIZE); 

	/*
	 * Reply from server
	 */
	if (data[0] == 'C' && data[1] == 'S') {
		switch (data[2]) {
		case CMD_PUT:
			if (data[3] == CMD_OK) {
				if (!perf_test_on)
					printf("\nPut operation successful\n");
			} else {
				if (!perf_test_on)
					printf("\nPut operation failed\n");
			}

			if (!perf_test_on) {
				printf("Key was: ");
				print_key(data, 4);
				printf("\n");
				printf("Value was: %s\n\n", &data[24]);
			}
			break;
		case CMD_GET:
			if (data[3] == CMD_OK) {
				if (!perf_test_on) {
					printf("\nGet operation successful\n");
					printf("Key was: ");
					print_key(data, 4);
					printf("\n");
					printf("Value is: %s\n\n", &data[24]);
				}
			} else {
				if (!perf_test_on) {
					printf("\nGet operation failed\n");			
					printf("Key was: ");
					print_key(data, 4);
					printf("\n\n");
				}
			}
			break;
		case CMD_DEL:
			if (data[3] == CMD_OK) {
				if (!perf_test_on)
					printf("\nDelete operation successful\n");
			}
			else {
				if (!perf_test_on)
					printf("\nDelete operation failed\n");
			}

			if (!perf_test_on) {
				printf("Key was: ");
				print_key(data, 4);
				printf("\n\n");
			}
			break;
		default:
			break;
		}
	}

	if (sconn->evbase != NULL)
		event_base_loopbreak(sconn->evbase);
}

/*
 * Called by libevent when there is an error on underlying the socket
 * descriptor.
 */
void peer_buffered_on_error(struct bufferevent *bev, short events, void *arg) {
	if (events & BEV_EVENT_ERROR) {
		printf("Peer: Got an error: %s\n",
			evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR()));
	}
}

/**
 * Called by libevent when the write buffer reaches 0.  We only
 * provide this because libevent expects it, but we don't use it.
 */
void peer_buffered_on_write(struct bufferevent *bev, void *arg) {
}

static int make_server_connection(int lserver_id) {
	struct sockaddr_in sin;

	memset(&sin, 0, sizeof(sin));
	sin.sin_family = AF_INET;
	sin.sin_port = htons(atoi(servers[lserver_id].serverport));
	inet_pton(AF_INET, servers[lserver_id].serverip, &sin.sin_addr);

	if ((sconn.output_buffer = evbuffer_new()) == NULL) {
		closeAndFreeServerConn(&sconn);
		return -1;
	}

	if ((sconn.evbase = event_base_new()) == NULL) {
		closeAndFreeServerConn(&sconn);
		return -1;
	}

	if ((sconn.bev = bufferevent_socket_new(sconn.evbase,
			-1, BEV_OPT_CLOSE_ON_FREE | BEV_OPT_DEFER_CALLBACKS)) == NULL) {
		closeAndFreeServerConn(&sconn);
		return -1;
	}

	/*
	 * We trigger the read callback only when there atleast MESSAGE_SIZE
	 * bytes to be read.
	 */
	bufferevent_setwatermark(sconn.bev, EV_READ | EV_WRITE, MESSAGE_SIZE, 0);
	bufferevent_setcb(sconn.bev, peer_buffered_on_read,
					  peer_buffered_on_write, peer_buffered_on_error, &sconn);
	bufferevent_enable(sconn.bev, EV_READ);

	if (bufferevent_socket_connect(sconn.bev,
			(struct sockaddr *)&sin, sizeof(sin)) < 0) {
		closeAndFreeServerConn(&sconn);
		return -1;
	}

	return 0;
}

void put_at_server(const char *key, const char *value) {
	char data[MESSAGE_SIZE];
	int lserver_id;

	memset(data, 0x30, MESSAGE_SIZE);
	data[0] = 'C';
	data[1] = 'S';
	data[2] = CMD_PUT;

	strncpy(&data[4], key, KEY_SIZE);
	strncpy(&data[24], value, VALUE_SIZE);

	lserver_id = hash_peer(&data[4]);
	if (!perf_test_on)
		printf("PUT Server Id: %d\n", lserver_id);

	if (make_server_connection(lserver_id) != 0) {
		printf("Connecting to Server %d failed.\n", lserver_id);
		return;
	}

	evbuffer_add(sconn.output_buffer, data, MESSAGE_SIZE);
	bufferevent_write_buffer(sconn.bev, sconn.output_buffer);
	event_base_loopexit(sconn.evbase, &peer_timeout);
	event_base_loop(sconn.evbase, 0);

	/*
	 * Though our functional specifications are to maintain the connections
	 * in an open state once we conenct, but for some reason if the connections
	 * are not closed, the buffers do not seem to flush and we never exit the
	 * event loop above OR the server does not exit the event loop connection
	 * even though there was only one event from the event base to handle and
	 * the other end does not get the data or the callbacks get called. So we
	 * explicitly close the connection here. Did try a few different mechanisms
	 * for handling this scenario both at server and peer end but none seems to
	 * work. So we explicitly deviate from our design recommendation of keeping
	 * connections open unfortunately. Needs some further indepth investigation.
	 */
	if (sconn.bev != NULL) {
		bufferevent_free(sconn.bev);
		sconn.bev = NULL;
	}
	if (sconn.evbase != NULL) {
		event_base_free(sconn.evbase);
		sconn.evbase = NULL;
	}
	if (sconn.output_buffer != NULL) {
		evbuffer_free(sconn.output_buffer);
		sconn.output_buffer = NULL;
	}
}

void get_from_server(const char *key) {
	char data[MESSAGE_SIZE];
	int lserver_id;

	memset(data, 0x30, MESSAGE_SIZE);
	data[0] = 'C';
	data[1] = 'S';
	data[2] = CMD_GET;

	strncpy(&data[4], key, KEY_SIZE);

	lserver_id = hash_peer(&data[4]);
	if (!perf_test_on)
		printf("GET Server Id: %d\n", lserver_id);

	if (make_server_connection(lserver_id) != 0) {
		printf("Connecting to Server %d failed.\n", lserver_id);
		return;
	}

	evbuffer_add(sconn.output_buffer, data, MESSAGE_SIZE);
	bufferevent_write_buffer(sconn.bev, sconn.output_buffer);
	event_base_loopexit(sconn.evbase, &peer_timeout);
	event_base_loop(sconn.evbase, 0);

	/*
	 * Though our functional specifications are to maintain the connections
	 * in an open state once we conenct, but for some reason if the connections
	 * are not closed, the buffers do not seem to flush and we never exit the
	 * event loop above OR the server does not exit the event loop connection
	 * even though there was only one event from the event base to handle and
	 * the other end does not get the data or the callbacks get called. So we
	 * explicitly close the connection here. Did try a few different mechanisms
	 * for handling this scenario both at server and peer end but none seems to
	 * work. So we explicitly deviate from our design recommendation of keeping
	 * connections open unfortunately. Needs some further indepth investigation.
	 */
	if (sconn.bev != NULL) {
		bufferevent_free(sconn.bev);
		sconn.bev = NULL;
	}
	if (sconn.evbase != NULL) {
		event_base_free(sconn.evbase);
		sconn.evbase = NULL;
	}
	if (sconn.output_buffer != NULL) {
		evbuffer_free(sconn.output_buffer);
		sconn.output_buffer = NULL;
	}
}

void delete_from_server(const char *key) {
	char data[MESSAGE_SIZE];
	int lserver_id;

	memset(data, 0x30, MESSAGE_SIZE);
	data[0] = 'C';
	data[1] = 'S';
	data[2] = CMD_DEL;

	strncpy(&data[4], key, KEY_SIZE);

	lserver_id = hash_peer(&data[4]);
	if (!perf_test_on)
		printf("DEL Server Id: %d\n", lserver_id);

	if (make_server_connection(lserver_id) != 0) {
		printf("Connecting to Server %d failed.\n", lserver_id);
		return;
	}

	evbuffer_add(sconn.output_buffer, data, MESSAGE_SIZE);
	bufferevent_write_buffer(sconn.bev, sconn.output_buffer);
	event_base_loopexit(sconn.evbase, &peer_timeout);
	event_base_loop(sconn.evbase, 0);

	/*
	 * Though our functional specifications are to maintain the connections
	 * in an open state once we conenct, but for some reason if the connections
	 * are not closed, the buffers do not seem to flush and we never exit the
	 * event loop above OR the server does not exit the event loop connection
	 * even though there was only one event from the event base to handle and
	 * the other end does not get the data or the callbacks get called. So we
	 * explicitly close the connection here. Did try a few different mechanisms
	 * for handling this scenario both at server and peer end but none seems to
	 * work. So we explicitly deviate from our design recommendation of keeping
	 * connections open unfortunately. Needs some further indepth investigation.
	 */
	if (sconn.bev != NULL) {
		bufferevent_free(sconn.bev);
		sconn.bev = NULL;
	}
	if (sconn.evbase != NULL) {
		event_base_free(sconn.evbase);
		sconn.evbase = NULL;
	}
	if (sconn.output_buffer != NULL) {
		evbuffer_free(sconn.output_buffer);
		sconn.output_buffer = NULL;
	}
}

static void discard_logs(int severity, const char *msg) {
	
}

void set_libevent_logging(void) {
	event_enable_debug_logging(EVENT_DBG_ALL);
	//event_set_log_callback(discard_logs);
}

void rand_str(char *dest, size_t length) {
	char charset[] = "0123456789"
						"abcdefghijklmnopqrstuvwxyz"
						"ABCDEFGHIJKLMNOPQRSTUVWXYZ";

	while (length-- > 0) {
		size_t index = (double) rand() / RAND_MAX * (sizeof charset - 1);
		*dest++ = charset[index];
	}

	*dest = '\0';
}

void run_perf_tests(void) {
	int i;
	char key[KEY_SIZE];
	char value[VALUE_SIZE];
	struct timeval t1, t2;
	double elapsedtime, totalelapsedtime;

	perf_test_on = true;

	/* Run PUT tests */
	for (i = 0; i < NO_OF_TEST_ITERATIONS; i++) {
		memset(key, 0x30, KEY_SIZE);
		memset(value, 0x30, VALUE_SIZE);

		key[0] = server_id + 0x31;
		/* Generate a key depending on loop iteration */
		snprintf(&key[1], KEY_SIZE - 1, "%d", i);
		/* Generate a random string value */
		rand_str(value, VALUE_SIZE);
		gettimeofday(&t1, NULL);
		put_at_server(key, value);
		gettimeofday(&t2, NULL);
		// compute and print the elapsed time in millisec
		elapsedtime = (t2.tv_sec - t1.tv_sec) * 1000.0;      // sec to ms
		elapsedtime += (t2.tv_usec - t1.tv_usec) / 1000.0;   // us to ms
		printf("Elapsed time: %f\n", elapsedtime);
		totalelapsedtime += elapsedtime;
	}
	printf("Average Response time for PUT requests: %f ms\n", totalelapsedtime / NO_OF_TEST_ITERATIONS);

	printf("Press any key to continue\n");
	getchar();

	/* Run GET tests */
	for (i = 0; i < NO_OF_TEST_ITERATIONS; i++) {
		memset(key, 0x30, KEY_SIZE);

		key[0] = server_id + 0x31;
		/* Generate a key depending on loop iteration */
		snprintf(&key[1], KEY_SIZE - 1, "%d", i);
		gettimeofday(&t1, NULL);
		get_from_server(key);
		gettimeofday(&t2, NULL);
		// compute and print the elapsed time in millisec
		elapsedtime = (t2.tv_sec - t1.tv_sec) * 1000.0;      // sec to ms
		elapsedtime += (t2.tv_usec - t1.tv_usec) / 1000.0;   // us to ms
		printf("Elapsed time: %f\n", elapsedtime);
		totalelapsedtime += elapsedtime;
	}
	printf("Average Response time for GET requests: %f ms\n", totalelapsedtime / NO_OF_TEST_ITERATIONS);

	printf("Press any key to continue\n");
	getchar();

	/* Run DEL tests */
	for (i = 0; i < NO_OF_TEST_ITERATIONS; i++) {
		memset(key, 0x30, KEY_SIZE);

		key[0] = server_id + 0x31;
		/* Generate a key depending on loop iteration */
		snprintf(&key[1], KEY_SIZE - 1, "%d", i);
		gettimeofday(&t1, NULL);
		delete_from_server(key);
		gettimeofday(&t2, NULL);
		// compute and print the elapsed time in millisec
		elapsedtime = (t2.tv_sec - t1.tv_sec) * 1000.0;      // sec to ms
		elapsedtime += (t2.tv_usec - t1.tv_usec) / 1000.0;   // us to ms
		printf("Elapsed time: %f\n", elapsedtime);
		totalelapsedtime += elapsedtime;
	}
	printf("Average Response time for DEL requests: %f ms\n", totalelapsedtime / NO_OF_TEST_ITERATIONS);

	perf_test_on = false;
}

void input_process(void) {
	char key[KEY_SIZE];
	char value[VALUE_SIZE];
	bool exitloop = false;
	int input;

	/*
	 * We run the peer functionality in this main thread
	 */
	while (!exitloop) {
		printf("\nSelect Operation\n");
		printf("(1) Put (2) Get (3) Delete (4) Run tests\n");
		printf("(5) Exit");
		printf("\nPlease enter your selection (1-5):\t");

		scanf("%d", &input);
		getchar();

		switch (input) {
		case 1:
			printf("Enter Key: \t");
			scanf("%s", key);
			printf("\n");
			printf("Enter Value: \t");
			scanf("%s", value);
			printf("\n");
			put_at_server(key, value);
			break;
		case 2:
			printf("Enter Key: \t");
			scanf("%s", key);
			printf("\n");
			get_from_server(key);
			break;
		case 3:
			printf("Enter Key: \t");
			scanf("%s", key);
			printf("\n");
			delete_from_server(key);
			break;
		case 4:
			run_perf_tests();
			break;
		case 5:
			killServer();
			exitloop = true;
			break;
		default:
			printf("\n\nWrong value: %d\n", input);
			break;
		}

		/* Reset buffers for next iteration */
		memset(key, 0x30, KEY_SIZE);
		memset(value, 0x30, VALUE_SIZE);
	}
}

int main(int argc, char *argv[]) {
	FILE *fp;
	int i;
	int error;
	int count_of_servers;
	ssize_t read;
	size_t len = 0;
	char *line = NULL;
	char **tokens = NULL;

	if (argc != 3) {
		/*
		 * We do not validate or error check any of the arguments
		 * Please enter correct arguments
		 */
		printf("Usage: ./server <serverid#> </path/to/server/conf/file>");
		exit(1);
	}

	server_id = atoi(argv[1]) - 1;
	if ((server_id < 0) || (server_id > MAX_NO_OF_SERVERS)) {
		printf("Incorrect server id provided\n");
		exit(1);
	}

	fp = fopen(argv[2], "r");
	if (fp == NULL) {
		perror("Could not open server configuration file");
		exit(1);
	}

	/*
	 * We now extract the IP and port information of 8 servers
	 * which will be involved in this setup.
	 */
	count_of_servers = 0;
	while ((read = getline(&line, &len, fp)) != -1) {

		if (count_of_servers == MAX_NO_OF_SERVERS)
			break;

		tokens = str_split(line, ' ');
		if (tokens) {
			servers[count_of_servers].serverip = *(tokens);
			servers[count_of_servers].serverport = *(tokens + 1);
		}
		free(line);
		line = NULL;

		count_of_servers++;
	}

	fclose(fp);

	if (pthread_rwlock_init(&ht_lock, NULL) != 0) {
		perror("Lock init failed");
		goto free_tokens;
	}

	/*
	 * Start the server. We start the server in another thread so
	 * the libevent event loop for dispatching events on connections
	 * can work separately which would otherwise block.
	 */
	error = pthread_create(&server_thread, NULL, &runServer, &servers[server_id]);
	if (error != 0) {
		perror("Error in server thread creation");
		goto free_tokens;
	}

	peer_timeout.tv_sec = 0;
	peer_timeout.tv_usec = PEER_CONN_TIMEOUT;

	input_process();

	for (i = count_of_servers - 1; i >= 0; --i) {
		free(servers[i].serverip);
		free(servers[i].serverport);
	}

	pthread_rwlock_destroy(&ht_lock);

	return 0;

free_tokens:
	for (i = count_of_servers - 1; i >= 0; --i) {
		free(servers[i].serverip);
		free(servers[i].serverport);
	}

	return -1;
}