[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

Re: [Libevent-users] Request for comments on correct use of libevent



also DHT, a real DHT, is never simple; but I figure you are talking about a simple consistent hash?

On Tue, Oct 6, 2015 at 2:42 PM Mark Ellzey <mthomas@xxxxxxxxxx> wrote:
A good resource for the "proper way" to use libevent is to read the libevent book.Âhttp://www.wangafu.net/~nickm/libevent-book/

On Tue, Oct 6, 2015 at 2:51 AM Azat Khuzhin <a3at.mail@xxxxxxxxx> wrote:
On Mon, Oct 05, 2015 at 10:43:19PM +0530, Sanchayan Maity wrote:
> Hello,
>
> I am writing a simple DHT application and using libevent in my
> application. The libevent version is 2.0 which is installed by
> default on my Arch Linux setup.
>
> After some searching I found an example to use libevent in a
> multi-threaded setup.
> http://roncemer.com/software-development/multi-threaded-libevent-server-example/#comment-3249
>
> The above code runs one event base in its own separate thread and
> uses a threaded workqueue model to service requests on incoming
> connections.

Hi,

Personally I prefer to use pipe/socketpair for scheduling jobs to
another threads, in a nutshell it works like this:
- acceptcb:
 write(notify_fd_write_side, user-struct, sizeof(user-struct))
- per thread event base readcb for notify_fd read side:
 read(notify_fd_read_size, buf, sizeof(user-struct))

It is much simpler then using pthread_cond_*, but both of them must
works at first glance.

Here is a few implementations using this mechanism:
- https://github.com/ellzey/libevhtp/blob/libevhtp2/src/evhtp2/evhtp_thr.c#L60
- https://github.com/azat/boostcache/blob/libevent-aio-v2/src/kernel/net/commandserver.cpp#L206

Other comments are inlined in.

> I have taken that sample code and modified it for my own purposes.
> While my application is functional when I deploy eight of them in
> my setup and then check for the basic functionality, while trying
> to run multiple operations from multiple application nodes I get
> some messages which I do not fully comprehend at the moment.
>
> They are as follows:
> 1. Epoll %s(%d) on fd %d failed. Old events were %d; read change was %d (%s); write change was %d (%s); close change was %d (%s)
>
> Which seems to be printed from epoll.c by event warn mechanism

Yep.

>
> 2. Too many open files

This can be because of too much files (and indeed you have overhead --
since you have own base for every socket, and it means that you will
have epollfd for every socket which is not how this must works -- see
commends to you sample).

And I guess that this is the main problem that you have, other (with
epoll failed message) will be fixed when you fix EMFILE problem.

But if you still sure that all fds that you have are needful you can
increase limits:

$ ulimit -n $((1<<20))
Or permanently in /etc/security/limits.conf.

But that limits are limited with fs.file-max sysctl, so if you need more
increase it.

> 3. One more which I failed to note and was related to EPOLL MOD
>
> I know perhaps this is too much to ask, but can someone have a
> look at the code and tell me things I am doing wrong. I have gone
> through the libevent documentation online but I am not sure if I
> am doing things the right way at the moment.
>
> The peer part of dht I try to make synchronous by trying to break
> the event base loop with event loopbreak call in the read cb however
> sometimes if the callback never gets called which happens, the peer
> part of the DHT is just stuck, so to circumvent this I use a timeout.
>
> The code is attached. I have tried to keep it clean. I am no networking
> or socket expert, appreciate any comments or feedback.
>
> Thanks & Regards,
> Sanchayan Maity.
>
>
>

> /*
>Â * Copyright (C) 2015 Sanchayan Maity <maitysanchayan@xxxxxxxxx>
>Â *
>Â * Author: Sanchayan Maity <maitysanchayan@xxxxxxxxx>
>Â *Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â<smaity1@xxxxxxxxxxxx>
>Â *
>Â * This program is free software; you can redistribute it and/or modify
>Â * it under the terms of the GNU General Public License version 2 and
>Â * only version 2 as published by the Free Software Foundation.
>Â *
>Â * This program is distributed in the hope that it will be useful,
>Â * but WITHOUT ANY WARRANTY; without even the implied warranty of
> * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
>Â * GNU General Public License for more details.
>Â */
> #include "common.h"
>
> /*
>Â * Struct to carry around connection (client)-specific data.
>Â */
> typedef struct client {
>Â Â Â/* The client's socket. */
>Â Â Âint fd;
>
>Â Â Â/* The event_base for this client. */
>Â Â Âstruct event_base *evbase;
>
>Â Â Â/* The bufferedevent for this client. */
>Â Â Âstruct bufferevent *buf_ev;
>
>Â Â Â/* The output buffer for this client. */
>Â Â Âstruct evbuffer *output_buffer;
>
>Â Â Â/* Here you can add your own application-specific attributes which
>Â Â Â * are connection-specific. */
> } client_t;
>
> /* Table entry */
> struct node_t {
>Â Â Â Â/* Key */
>Â Â Â Âchar *key;
>Â Â Â Â/* Value of the key */
>Â Â Â Âchar *value;
>Â Â Â Â/* Next entry in chain */
>Â Â Âstruct node_t *next;
> };
>
> static struct node_t *hashtable[SERVER_HASH_TABLE_SIZE];
>
> /*
>Â * We use a read write lock to protect against concurrent
>Â * write to the hash table. It is ok to have concurrent
>Â * readers. We do not use a mutex as that will reduce
>Â * reader concurrency to a single thread at a time.
>Â */
> pthread_rwlock_t ht_lock;
>
> static struct event_base *evbase_accept;
> /* Workqueue for the server */
> static workqueue_t workqueue;
>
> /*
>Â * Id of server. We use this to pick up appropriate
>Â * IP/port parameters from the file.
>Â */
> static int server_id;
>
> struct server_p {
>Â Â Â Â/* IP to which server will bind to */
>Â Â Â Âchar *serverip;
>Â Â Â Â/* Port on which server will listen */
>Â Â Â Âchar *serverport;
> };
>
> /*
>Â * We use this to store server parameters of the eight
>Â * servers information read from file
>Â */
> static struct server_p servers[MAX_NO_OF_SERVERS];
>
> pthread_t server_thread;
> static volatile bool perf_test_on = false;
>
> /* Signal handler function (defined below). */
> static void sighandler(int signal);
>
> /*
>Â * Struct to carry around server connection specific data
>Â */
> struct server_conn {
>Â Â Â Â/* Event base for the peer server connection */
>Â Â Â Âstruct event_base *evbase;
>Â Â Â Â/* Buffer event for the peer server connection */
>Â Â Â Âstruct bufferevent *bev;
>Â Â Â Â/* Output buffer for the peer server connection */
>Â Â Â Âstruct evbuffer *output_buffer;
> };
>
> static struct server_conn sconn;
>
> struct timeval peer_timeout;
>
> static void closeClient(client_t *client) {
>Â Â Âif (client != NULL) {
>Â Â Â Â Âif (client->fd >= 0) {
>Â Â Â Â Â Â Âclose(client->fd);
>Â Â Â Â Â Â Âclient->fd = -1;
>Â Â Â Â Â}
>Â Â Â}
> }
>
> static void closeAndFreeClient(client_t *client) {
>Â Â Âif (client != NULL) {
>Â Â Â Â ÂcloseClient(client);
>Â Â Â Â Âif (client->buf_ev != NULL) {
>Â Â Â Â Â Â Âbufferevent_free(client->buf_ev);
>Â Â Â Â Â Â Âclient->buf_ev = NULL;
>Â Â Â Â Â}
>Â Â Â Â Âif (client->evbase != NULL) {
>Â Â Â Â Â Â Âevent_base_free(client->evbase);
>Â Â Â Â Â Â Âclient->evbase = NULL;
>Â Â Â Â Â}
>Â Â Â Â Âif (client->output_buffer != NULL) {
>Â Â Â Â Â Â Âevbuffer_free(client->output_buffer);
>Â Â Â Â Â Â Âclient->output_buffer = NULL;
>Â Â Â Â Â}
>Â Â Â Â Âfree(client);
>Â Â Â}
> }
>
> /*
>Â * https://en.wikipedia.org/wiki/Jenkins_hash_function
>Â */
> unsigned int jenkins_one_at_a_time_hash(const char *key, size_t len) {
>Â Â Â Âunsigned int hash, i;
>
>Â Â Â Âfor(hash = i = 0; i < len; ++i)
>Â Â Â Â{
>Â Â Â Â Â Â Â Âhash += key[i];
>Â Â Â Â Â Â Â Âhash += (hash << 10);
>Â Â Â Â Â Â Â Âhash ^= (hash >> 6);
>Â Â Â Â}
>
>Â Â Â Âhash += (hash << 3);
>Â Â Â Âhash ^= (hash >> 11);
>Â Â Â Âhash += (hash << 15);
>
>Â Â Â Âreturn hash;
> }
>
> /*
>Â * Hash function. Use the above and restrict result as per the table
>Â * size. We use a non power of 2 to get good hashing. Refer CLRS.
>Â */
> unsigned int hash_server(const char *key) {
>Â Â Â Âreturn jenkins_one_at_a_time_hash(key, KEY_SIZE) % SERVER_HASH_TABLE_SIZE;
> }
>
> /*
>Â * Hash function. Use the above and restrict result as per the table
>Â * size. We use a non power of 2 to get good hashing. Refer CLRS.
>Â */
> unsigned int hash_peer(const char *key) {
>Â Â Â Âreturn jenkins_one_at_a_time_hash(key, KEY_SIZE) % PEER_HASH_TABLE_SIZE;
> }
>
> /*
>Â * Get the node data pointer as per the key
>Â */
> struct node_t *hash_table_get(const char *key) {
>Â Â Â Âstruct node_t *np;
>
>Â Â Â Âpthread_rwlock_rdlock(&ht_lock);
>Â Â Â Âfor(np = hashtable[hash_server(key)]; np != NULL; np = np->next) {
>Â Â Â Â Â Â Â Âif (strncmp(key, np->key, KEY_SIZE) == 0) {
>Â Â Â Â Â Â Â Â Â Â Â Âpthread_rwlock_unlock(&ht_lock);
>Â Â Â Â Â Â Â Â Â Â Â Â/* We found the key */
>Â Â Â Â Â Â Â Â Â Â Â Âreturn np;
>Â Â Â Â Â Â Â Â}
>Â Â Â Â}
>
>Â Â Â Âpthread_rwlock_unlock(&ht_lock);
>Â Â Â Âreturn NULL;
> }
>
> /*
>Â * We determine if the key being added exists. If it does, the
>Â * new value supersedes the old one, else we create a new entry
>Â * and add the key/value pair. Return NULL on any error.
>Â */
> struct node_t *hash_table_put(const char *key, const char *value) {
>Â Â Â Âunsigned int hashval;
>Â Â Â Âstruct node_t *np;
>
>Â Â Â Âpthread_rwlock_wrlock(&ht_lock);
>Â Â Â Âif ((np = hash_table_get(key)) == NULL) { /* Not found */
>Â Â Â Â Â Â Â Ânp = (struct node_t *)malloc(sizeof(*np));
>Â Â Â Â Â Â Â Âif (np == NULL || (np->key = strndup(key, KEY_SIZE)) == NULL)
>Â Â Â Â Â Â Â Â Â Â Â Âgoto error;
>
>Â Â Â Â Â Â Â Â/* Find the bucket position and add at 'head' location */
>Â Â Â Â Â Â Â Âhashval = hash_server(key);
>Â Â Â Â Â Â Â Ânp->next = hashtable[hashval];
>Â Â Â Â Â Â Â Âhashtable[hashval] = np;
>Â Â Â Â} else /* Already there */
>Â Â Â Â Â Â Â Âfree((void *) np->value);Â Â Â Â/* Free previous value */
>Â Â Â Âif ((np->value = strndup(value, VALUE_SIZE)) == NULL)
>Â Â Â Â Â Â Â Âgoto error;
>
>Â Â Â Âreturn np;
>
> error:
>Â Â Â Âpthread_rwlock_unlock(&ht_lock);
>Â Â Â Âreturn NULL;
> }
>
> /*
>Â * Return 0 on success and 1 on failure
>Â */
> unsigned int hash_table_delete(const char *key) {
>Â Â Â Âstruct node_t *np1, *np2;
>Â Â Â Âunsigned int hashval;
>
>Â Â Â Âhashval = hash_server(key);
>
>Â Â Â Âpthread_rwlock_wrlock(&ht_lock);
>Â Â Â Âfor (np1 = hashtable[hashval], np2 = NULL; np1 != NULL; np2 = np1, np1 = np1->next)
>Â Â Â Â Â Â Â Âif (strncmp(key, np1->key, KEY_SIZE) == 0) {
>Â Â Â Â Â Â Â Â Â Â Â Â/* Found a match */
>Â Â Â Â Â Â Â Â Â Â Â Âfree(np1->key);
>Â Â Â Â Â Â Â Â Â Â Â Âfree(np1->value);
>Â Â Â Â Â Â Â Â Â Â Â Âif (np2 == NULL)
>Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â/* At the beginning? */
>Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Âhashtable[hashval] = np1->next;
>Â Â Â Â Â Â Â Â Â Â Â Âelse
>Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â/* In the middle or at the end? */
>Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Ânp2->next = np1->next;
>Â Â Â Â Â Â Â Âfree(np1);
>
>Â Â Â Â Â Â Â Âpthread_rwlock_unlock(&ht_lock);
>Â Â Â Â Â Â Â Âreturn 0;
>Â Â Â Â}
>
>Â Â Â Âpthread_rwlock_unlock(&ht_lock);
>Â Â Â Âreturn 1;
> }
>
> /*
>Â * Called by libevent when there is data to read.
>Â */
> void server_buffered_on_read(struct bufferevent *bev, void *arg) {
>Â Â Âclient_t *client = (client_t *)arg;
>Â Â Âchar data[MESSAGE_SIZE] = {0};
>Â Â Â Âstruct node_t *np;
>
>Â Â Âstruct evbuffer *input;
>Â Â Âinput = bufferevent_get_input(bev);
>Â Â Â Â/*
>Â Â Â Â * Remove a chunk of data from the input buffer, copying it into our
>Â Â Â Â * local array (data).
>Â Â Â Â */
>Â Â Â Âevbuffer_remove(input, data, MESSAGE_SIZE);
>
>Â Â Â Â/*
>Â Â Â Â * Check if the message is meant for us and the command. While sending
>Â Â Â Â * data back, we use the same buffer keeping header and key information
>Â Â Â Â * but changing/appending the value of the key and setting the OK or ERROR command
>Â Â Â Â */
>Â Â Â Âif (data[0] == 'C' && data[1] == 'S') {
>Â Â Â Â Â Â Â Âswitch (data[2]) {
>Â Â Â Â Â Â Â Âcase CMD_PUT:
>Â Â Â Â Â Â Â Â Â Â Â Âif ((hash_table_put(&data[4], &data[24])) == NULL)
>Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Âdata[3] = CMD_ERR;
>Â Â Â Â Â Â Â Â Â Â Â Âelse
>Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Âdata[3] = CMD_OK;
>Â Â Â Â Â Â Â Â Â Â Â Âevbuffer_add(client->output_buffer, data, MESSAGE_SIZE);
>Â Â Â Â Â Â Â Â Â Â Â Âbreak;
>Â Â Â Â Â Â Â Âcase CMD_GET:
>Â Â Â Â Â Â Â Â Â Â Â Ânp = hash_table_get(&data[4]);
>Â Â Â Â Â Â Â Â Â Â Â Âif (np == NULL) {
>Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Âdata[3] = CMD_ERR;
>Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Âevbuffer_add(client->output_buffer, data, MESSAGE_SIZE);
>Â Â Â Â Â Â Â Â Â Â Â Â} else {
>Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Âdata[3] = CMD_OK;
>Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Âstrncpy(&data[24], np->value, VALUE_SIZE);
>Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Âevbuffer_add(client->output_buffer, data, MESSAGE_SIZE);
>Â Â Â Â Â Â Â Â Â Â Â Â}
>Â Â Â Â Â Â Â Â Â Â Â Âbreak;
>Â Â Â Â Â Â Â Âcase CMD_DEL:
>Â Â Â Â Â Â Â Â Â Â Â Âif (hash_table_delete(&data[4]))
>Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Âdata[3] = CMD_ERR;
>Â Â Â Â Â Â Â Â Â Â Â Âelse
>Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Âdata[3] = CMD_OK;
>Â Â Â Â Â Â Â Â Â Â Â Âevbuffer_add(client->output_buffer, data, MESSAGE_SIZE);
>Â Â Â Â Â Â Â Â Â Â Â Âbreak;
>Â Â Â Â Â Â Â Âdefault:
>Â Â Â Â Â Â Â Â Â Â Â Âbreak;
>Â Â Â Â Â Â Â Â}
>Â Â Â Â}
>
>  Â/* Send the results to the peer. This actually only queues the results
>Â Â Â * for sending. Sending will occur asynchronously, handled by libevent. */
>Â Â Âif (bufferevent_write_buffer(bev, client->output_buffer)) {
>Â Â Â Â ÂerrorOut("Error sending data to client on fd %d\n", client->fd);
>Â Â Â Â ÂcloseClient(client);
>Â Â Â}
> }
>
> /**
> * Called by libevent when the write buffer reaches 0. We only
>Â * provide this because libevent expects it, but we don't use it.
>Â */
> void server_buffered_on_write(struct bufferevent *bev, void *arg) {
> }
>
> /**
>Â * Called by libevent when there is an error on underlying the socket
>Â * descriptor.
>Â */
> void server_buffered_on_error(struct bufferevent *bev, short events, void *arg) {
>Â Â Â Âif (events & BEV_EVENT_ERROR) {
>Â Â Â Â Â Â Â Âprintf("Server: Got an error: %s\n",
>Â Â Â Â Â Â Â Â Â Â Â Â Â evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR()));
>Â Â Â Â}
>Â Â Â ÂcloseClient((client_t *)arg);
> }
>
> static void server_job_function(struct job *job) {
>Â Â Âclient_t *client = (client_t *)job->user_data;
>
>Â Â Â Âevent_base_dispatch(client->evbase);
>Â Â Â ÂcloseAndFreeClient(client);
>Â Â Â Âfree(job);
> }
>
> /*
>Â * This function will be called by libevent when there is a connection
>Â * ready to be accepted.
>Â */
> void server_on_accept(evutil_socket_t fd, short ev, void *arg) {
>Â Â Âint client_fd;
>Â Â Âstruct sockaddr_in client_addr;
>Â Â Âsocklen_t client_len = sizeof(client_addr);
>Â Â Âworkqueue_t *workqueue = (workqueue_t *)arg;
>Â Â Âclient_t *client;
>Â Â Âjob_t *job;
>
>Â Â Âclient_fd = accept(fd, (struct sockaddr *)&client_addr, &client_len);
>Â Â Âif (client_fd < 0) {
>Â Â Â Â Âwarn("accept failed");
>Â Â Â Â Âreturn;
>Â Â Â}
>
>Â Â Â/* Set the client socket to non-blocking mode. */
>Â Â Âif (evutil_make_socket_nonblocking(client_fd) < 0) {
>Â Â Â Â Âwarn("failed to set client socket to non-blocking");
>Â Â Â Â Âclose(client_fd);
>Â Â Â Â Âreturn;
>Â Â Â}
>
>Â Â Â/* Create a client object. */
>Â Â Âif ((client = (client_t *)malloc(sizeof(*client))) == NULL) {
>Â Â Â Â Âwarn("failed to allocate memory for client state");
>Â Â Â Â Âclose(client_fd);
>Â Â Â Â Âreturn;
>Â Â Â}
>Â Â Âmemset(client, 0, sizeof(*client));
>Â Â Âclient->fd = client_fd;
>
>Â Â Âif ((client->output_buffer = evbuffer_new()) == NULL) {
>Â Â Â Â Âwarn("client output buffer allocation failed");
>Â Â Â Â ÂcloseAndFreeClient(client);
>Â Â Â Â Âreturn;
>Â Â Â}
>
>Â Â Âif ((client->evbase = event_base_new()) == NULL) {
>Â Â Â Â Âwarn("client event_base creation failed");
>Â Â Â Â ÂcloseAndFreeClient(client);
>Â Â Â Â Âreturn;
>Â Â Â}

Are you sure you need this overhead (i.e. base per socket)?
AFAICS you don't need this.

>
>Â Â Âclient->buf_ev = bufferevent_socket_new(client->evbase, client_fd,
>Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â ÂBEV_OPT_CLOSE_ON_FREE | BEV_OPT_DEFER_CALLBACKS);
>Â Â Âif ((client->buf_ev) == NULL) {
>Â Â Â Â Âwarn("client bufferevent creation failed");
>Â Â Â Â ÂcloseAndFreeClient(client);
>Â Â Â Â Âreturn;
>Â Â Â}
>
>Â Â Â Â/*
>Â Â Â Â * We trigger the read callback only when there atleast MESSAGE_SIZE
>Â Â Â Â * bytes to be read.
>Â Â Â Â */
>Â Â Â Âbufferevent_setwatermark(client->buf_ev, EV_READ, MESSAGE_SIZE, 0);
>Â Â Âbufferevent_setcb(client->buf_ev, server_buffered_on_read, server_buffered_on_write,
>Â Â Â Â Â Â Â Â Â Â Â Âserver_buffered_on_error, client);
>
>Â Â Â/* We have to enable it before our callbacks will be
>Â Â Â * called. */
>Â Â Âbufferevent_enable(client->buf_ev, EV_READ);
>
>Â Â Â/* Create a job object and add it to the work queue. */
>Â Â Âif ((job = (job_t *)malloc(sizeof(*job))) == NULL) {
>Â Â Â Â Âwarn("failed to allocate memory for job state");
>Â Â Â Â ÂcloseAndFreeClient(client);
>Â Â Â Â Âreturn;
>Â Â Â}
>Â Â Âjob->job_function = server_job_function;
>Â Â Âjob->user_data = client;
>
>Â Â Âworkqueue_add_job(workqueue, job);
> }
>
> /*
> * Run the server. This function blocks, only returning when the server has
>Â * terminated.
>Â */
> void *runServer(void *arg) {
>Â Â Â Âstruct server_p *server_i = (struct server_p *)arg;
>Â Â Âevutil_socket_t listenfd;
>Â Â Â Âstruct sockaddr_in listen_addr;
>Â Â Â Âstruct event *ev_accept;
>Â Â Âint reuseaddr_on;
>Â Â Â Âstruct sigaction siginfo;
>
>Â Â Â/* Set signal handlers */
>Â Â Âsigset_t sigset;
>Â Â Âsigemptyset(&sigset);
>Â Â Âsiginfo.sa_handler = sighandler;
>Â Â Âsiginfo.sa_mask = sigset;
>Â Â Âsiginfo.sa_flags = SA_RESTART;
>
>Â Â Âsigaction(SIGINT, &siginfo, NULL);
>Â Â Âsigaction(SIGTERM, &siginfo, NULL);
>
>Â Â Â Â/* Create our listening socket */
>Â Â Â Âlistenfd = socket(AF_INET, SOCK_STREAM, 0);
>Â Â Â Âif (listenfd < 0) {
>Â Â Â Â Â Â Â Âerr(1, "listen failed");
>Â Â Â Â Â Â Â Âgoto exit;
>Â Â Â Â}
>
>Â Â Â Âmemset(&listen_addr, 0, sizeof(listen_addr));
>Â Â Â Âlisten_addr.sin_family = AF_INET;
>Â Â Â Âlisten_addr.sin_port = htons(atoi(server_i->serverport));
>Â Â Â Âinet_pton(AF_INET, server_i->serverip, &listen_addr.sin_addr);
>
>Â Â Âif (bind(listenfd, (struct sockaddr *)&listen_addr, sizeof(listen_addr)) < 0) {
>Â Â Â Â Âerr(1, "bind failed");
>Â Â Â Â Â Â Â Âgoto exit;
>Â Â Â}
>
>Â Â Âif (listen(listenfd, CONNECTION_BACKLOG) < 0) {
>Â Â Â Â Âerr(1, "listen failed");
>Â Â Â Â Â Â Â Âgoto exit;
>Â Â Â}
>Â Â Â Âreuseaddr_on = 1;
>Â Â Â Âsetsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &reuseaddr_on,
>Â Â Â Â Â Â Â Âsizeof(reuseaddr_on));
>
>Â Â Â/*
>Â Â Â Â * Set the socket to non-blocking, this is essential in event
>Â Â Â * based programming with libevent.
>Â Â Â Â */
>Â Â Âif (evutil_make_socket_nonblocking(listenfd) < 0) {
>Â Â Â Â Âerr(1, "failed to set server socket to non-blocking");
>Â Â Â Â Â Â Â Âgoto exit;
>Â Â Â}
>
>Â Â Âif ((evbase_accept = event_base_new()) == NULL) {
>Â Â Â Â Âperror("Unable to create socket accept event base");
>Â Â Â Â Âclose(listenfd);
>Â Â Â Â Âgoto exit;
>Â Â Â}
>
>Â Â Â/* Initialize work queue. */
>Â Â Âif (workqueue_init(&workqueue, NUM_THREADS)) {
>Â Â Â Â Âperror("Failed to create work queue");
>Â Â Â Â Âclose(listenfd);
>Â Â Â Â Âworkqueue_shutdown(&workqueue);
>Â Â Â Â Âgoto exit;
>Â Â Â}
>
>Â Â Â/* We now have a listening socket, we create a read event to
>Â Â Â * be notified when a client connects. */
>Â Â Âev_accept = event_new(evbase_accept, listenfd, EV_READ | EV_PERSIST,
>Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Âserver_on_accept, (void *)&workqueue);
>Â Â Âevent_add(ev_accept, NULL);
>
>Â Â Âprintf("Server running.\n");
>
>Â Â Â/* Start the event loop. */
>Â Â Âevent_base_dispatch(evbase_accept);
>
>Â Â Âevent_base_free(evbase_accept);
>Â Â Âevbase_accept = NULL;
>
>Â Â Âclose(listenfd);
>
>Â Â Âprintf("Server shutdown.\n");
>
> exit:
>Â Â Â Âpthread_exit(NULL);
> }
>
> /*
> * Kill the server. This function can be called from another thread to kill
>Â * the server, causing runServer() to return.
>Â */
> void killServer(void) {
>Â Â Âfprintf(stdout, "Stopping socket listener event loop.\n");
>Â Â Âif (event_base_loopexit(evbase_accept, NULL)) {
>Â Â Â Â Âperror("Error shutting down server");
>Â Â Â}
>Â Â Âfprintf(stdout, "Stopping workers.\n");
>Â Â Âworkqueue_shutdown(&workqueue);
> }
>
> static void sighandler(int signal) {
>  Âfprintf(stdout, "Received signal %d: %s. Shutting down.\n", signal,
>Â Â Â Â Â Â Âstrsignal(signal));
>Â Â ÂkillServer();
> }
>
> /*
>Â * Taken from http://stackoverflow.com/questions/9210528/split-string-with-delimiters-in-c
>Â * We modify it to use strtok_r the MT safe variant. strtok is not MT safe.
>Â */
> char** str_split(char* a_str, const char a_delim) {
>   Âchar** result  = 0;
>   Âsize_t count  Â= 0;
>   Âchar* tmp    = a_str;
>Â Â Â Âchar* last_comma = 0;
>   Âchar* save       Â= 0;
>Â Â Â Âchar delim[2];
>Â Â Â Âdelim[0] = a_delim;
>Â Â Â Âdelim[1] = 0;
>
>Â Â Â Â/* Count how many elements will be extracted. */
>Â Â Â Âwhile (*tmp) {
>Â Â Â Â Â Â Â Âif (a_delim == *tmp) {
>Â Â Â Â Â Â Â Â Â Â Â Âcount++;
>Â Â Â Â Â Â Â Â Â Â Â Âlast_comma = tmp;
>Â Â Â Â Â Â Â Â}
>Â Â Â Â Â Â Â Âtmp++;
>Â Â Â Â}
>
>Â Â Â Â/* Add space for trailing token. */
>Â Â Â Âcount += last_comma < (a_str + strlen(a_str) - 1);
>
>Â Â Â Â/* Add space for terminating null string so caller
>Â Â Â Âknows where the list of returned strings ends. */
>Â Â Â Âcount++;
>
>Â Â Â Âresult = (char **)malloc(sizeof(char*) * count);
>
>Â Â Â Âif (result) {
>       Âsize_t idx = 0;
>Â Â Â Â Â Â Â Â//char* token = strtok(a_str, delim);
>Â Â Â Â Â Â Â Âchar* token = strtok_r(a_str, delim, &save);
>
>Â Â Â Â Â Â Â Âwhile (token) {
>Â Â Â Â Â Â Â Â Â Â Â Â*(result + idx++) = strdup(token);
>Â Â Â Â Â Â Â Â Â Â Â Â//token = strtok(0, delim);
>Â Â Â Â Â Â Â Â Â Â Â Âtoken = strtok_r(0, delim, &save);
>Â Â Â Â Â Â Â Â}
>Â Â Â Â Â Â Â Âassert(idx == count - 1);
>Â Â Â Â Â Â Â Â*(result + idx) = 0;
>Â Â Â Â}
>
>Â Â Â Âreturn result;
> }
>
> static void print_key(char *data, int start_pos) {
>Â Â Â Âint i;
>
>Â Â Â Âfor (i = start_pos; i < start_pos + KEY_SIZE; i++)
>Â Â Â Â Â Â Â Âprintf("%c", data[i]);
> }
>
> static void closeAndFreeServerConn(struct server_conn *client) {
>Â Â Âif (client != NULL) {
>Â Â Â Â Âif (client->bev != NULL) {
>Â Â Â Â Â Â Âbufferevent_free(client->bev);
>Â Â Â Â Â Â Âclient->bev = NULL;
>Â Â Â Â Â}
>Â Â Â Â Âif (client->evbase != NULL) {
>Â Â Â Â Â Â Âevent_base_free(client->evbase);
>Â Â Â Â Â Â Âclient->evbase = NULL;
>Â Â Â Â Â}
>Â Â Â Â Âif (client->output_buffer != NULL) {
>Â Â Â Â Â Â Âevbuffer_free(client->output_buffer);
>Â Â Â Â Â Â Âclient->output_buffer = NULL;
>Â Â Â Â Â}
>Â Â Â}
> }
>
> /*
>Â * Called by libevent when there is data to read.
>Â */
> void peer_buffered_on_read(struct bufferevent *bev, void *arg) {
>Â Â Â Âstruct server_conn *sconn = (struct server_conn *)arg;
>Â Â Âchar data[MESSAGE_SIZE] = {0};
>
>Â Â Âstruct evbuffer *input;
>Â Â Âinput = bufferevent_get_input(bev);
>
>Â Â Â Â/*
>Â Â Â Â * Remove a chunk of data from the input buffer, copying it into our
>Â Â Â Â * local array (data).
>Â Â Â Â */
>Â Â Â Âevbuffer_remove(input, data, MESSAGE_SIZE);
>
>Â Â Â Â/*
>Â Â Â Â * Reply from server
>Â Â Â Â */
>Â Â Â Âif (data[0] == 'C' && data[1] == 'S') {
>Â Â Â Â Â Â Â Âswitch (data[2]) {
>Â Â Â Â Â Â Â Âcase CMD_PUT:
>Â Â Â Â Â Â Â Â Â Â Â Âif (data[3] == CMD_OK) {
>Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Âif (!perf_test_on)
>Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Âprintf("\nPut operation successful\n");
>Â Â Â Â Â Â Â Â Â Â Â Â} else {
>Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Âif (!perf_test_on)
>Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Âprintf("\nPut operation failed\n");
>Â Â Â Â Â Â Â Â Â Â Â Â}
>
>Â Â Â Â Â Â Â Â Â Â Â Âif (!perf_test_on) {
>Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Âprintf("Key was: ");
>Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Âprint_key(data, 4);
>Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Âprintf("\n");
>Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Âprintf("Value was: %s\n\n", &data[24]);
>Â Â Â Â Â Â Â Â Â Â Â Â}
>Â Â Â Â Â Â Â Â Â Â Â Âbreak;
>Â Â Â Â Â Â Â Âcase CMD_GET:
>Â Â Â Â Â Â Â Â Â Â Â Âif (data[3] == CMD_OK) {
>Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Âif (!perf_test_on) {
>Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Âprintf("\nGet operation successful\n");
>Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Âprintf("Key was: ");
>Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Âprint_key(data, 4);
>Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Âprintf("\n");
>Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Âprintf("Value is: %s\n\n", &data[24]);
>Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â}
>Â Â Â Â Â Â Â Â Â Â Â Â} else {
>Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Âif (!perf_test_on) {
>Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Âprintf("\nGet operation failed\n");
>Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Âprintf("Key was: ");
>Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Âprint_key(data, 4);
>Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Âprintf("\n\n");
>Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â}
>Â Â Â Â Â Â Â Â Â Â Â Â}
>Â Â Â Â Â Â Â Â Â Â Â Âbreak;
>Â Â Â Â Â Â Â Âcase CMD_DEL:
>Â Â Â Â Â Â Â Â Â Â Â Âif (data[3] == CMD_OK) {
>Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Âif (!perf_test_on)
>Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Âprintf("\nDelete operation successful\n");
>Â Â Â Â Â Â Â Â Â Â Â Â}
>Â Â Â Â Â Â Â Â Â Â Â Âelse {
>Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Âif (!perf_test_on)
>Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Âprintf("\nDelete operation failed\n");
>Â Â Â Â Â Â Â Â Â Â Â Â}
>
>Â Â Â Â Â Â Â Â Â Â Â Âif (!perf_test_on) {
>Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Âprintf("Key was: ");
>Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Âprint_key(data, 4);
>Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Âprintf("\n\n");
>Â Â Â Â Â Â Â Â Â Â Â Â}
>Â Â Â Â Â Â Â Â Â Â Â Âbreak;
>Â Â Â Â Â Â Â Âdefault:
>Â Â Â Â Â Â Â Â Â Â Â Âbreak;
>Â Â Â Â Â Â Â Â}
>Â Â Â Â}
>
>Â Â Â Âif (sconn->evbase != NULL)
>Â Â Â Â Â Â Â Âevent_base_loopbreak(sconn->evbase);
> }
>
> /*
>Â * Called by libevent when there is an error on underlying the socket
>Â * descriptor.
>Â */
> void peer_buffered_on_error(struct bufferevent *bev, short events, void *arg) {
>Â Â Â Âif (events & BEV_EVENT_ERROR) {
>Â Â Â Â Â Â Â Âprintf("Peer: Got an error: %s\n",
>Â Â Â Â Â Â Â Â Â Â Â Âevutil_socket_error_to_string(EVUTIL_SOCKET_ERROR()));
>Â Â Â Â}
> }
>
> /**
> * Called by libevent when the write buffer reaches 0. We only
>Â * provide this because libevent expects it, but we don't use it.
>Â */
> void peer_buffered_on_write(struct bufferevent *bev, void *arg) {
> }
>
> static int make_server_connection(int lserver_id) {
>Â Â Â Âstruct sockaddr_in sin;
>
>Â Â Â Âmemset(&sin, 0, sizeof(sin));
>Â Â Â Âsin.sin_family = AF_INET;
>Â Â Â Âsin.sin_port = htons(atoi(servers[lserver_id].serverport));
>Â Â Â Âinet_pton(AF_INET, servers[lserver_id].serverip, &sin.sin_addr);
>
>Â Â Â Âif ((sconn.output_buffer = evbuffer_new()) == NULL) {
>Â Â Â Â Â Â Â ÂcloseAndFreeServerConn(&sconn);
>Â Â Â Â Â Â Â Âreturn -1;
>Â Â Â Â}
>
>Â Â Â Âif ((sconn.evbase = event_base_new()) == NULL) {
>Â Â Â Â Â Â Â ÂcloseAndFreeServerConn(&sconn);
>Â Â Â Â Â Â Â Âreturn -1;
>Â Â Â Â}
>
>Â Â Â Âif ((sconn.bev = bufferevent_socket_new(sconn.evbase,
>Â Â Â Â Â Â Â Â Â Â Â Â-1, BEV_OPT_CLOSE_ON_FREE | BEV_OPT_DEFER_CALLBACKS)) == NULL) {
>Â Â Â Â Â Â Â ÂcloseAndFreeServerConn(&sconn);
>Â Â Â Â Â Â Â Âreturn -1;
>Â Â Â Â}
>
>Â Â Â Â/*
>Â Â Â Â * We trigger the read callback only when there atleast MESSAGE_SIZE
>Â Â Â Â * bytes to be read.
>Â Â Â Â */
>Â Â Â Âbufferevent_setwatermark(sconn.bev, EV_READ | EV_WRITE, MESSAGE_SIZE, 0);
>Â Â Â Âbufferevent_setcb(sconn.bev, peer_buffered_on_read,
>Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Âpeer_buffered_on_write, peer_buffered_on_error, &sconn);
>Â Â Â Âbufferevent_enable(sconn.bev, EV_READ);
>
>Â Â Â Âif (bufferevent_socket_connect(sconn.bev,
>Â Â Â Â Â Â Â Â Â Â Â Â(struct sockaddr *)&sin, sizeof(sin)) < 0) {
>Â Â Â Â Â Â Â ÂcloseAndFreeServerConn(&sconn);
>Â Â Â Â Â Â Â Âreturn -1;
>Â Â Â Â}
>
>Â Â Â Âreturn 0;
> }
>
> void put_at_server(const char *key, const char *value) {
>Â Â Â Âchar data[MESSAGE_SIZE];
>Â Â Â Âint lserver_id;
>
>Â Â Â Âmemset(data, 0x30, MESSAGE_SIZE);
>Â Â Â Âdata[0] = 'C';
>Â Â Â Âdata[1] = 'S';
>Â Â Â Âdata[2] = CMD_PUT;
>
>Â Â Â Âstrncpy(&data[4], key, KEY_SIZE);
>Â Â Â Âstrncpy(&data[24], value, VALUE_SIZE);
>
>Â Â Â Âlserver_id = hash_peer(&data[4]);
>Â Â Â Âif (!perf_test_on)
>Â Â Â Â Â Â Â Âprintf("PUT Server Id: %d\n", lserver_id);
>
>Â Â Â Âif (make_server_connection(lserver_id) != 0) {
>Â Â Â Â Â Â Â Âprintf("Connecting to Server %d failed.\n", lserver_id);
>Â Â Â Â Â Â Â Âreturn;
>Â Â Â Â}
>
>Â Â Â Âevbuffer_add(sconn.output_buffer, data, MESSAGE_SIZE);
>Â Â Â Âbufferevent_write_buffer(sconn.bev, sconn.output_buffer);
>Â Â Â Âevent_base_loopexit(sconn.evbase, &peer_timeout);
>Â Â Â Âevent_base_loop(sconn.evbase, 0);
>
>Â Â Â Â/*
>Â Â Â Â * Though our functional specifications are to maintain the connections
>Â Â Â Â * in an open state once we conenct, but for some reason if the connections
>Â Â Â Â * are not closed, the buffers do not seem to flush and we never exit the
>Â Â Â Â * event loop above OR the server does not exit the event loop connection
>Â Â Â Â * even though there was only one event from the event base to handle and
>Â Â Â Â * the other end does not get the data or the callbacks get called. So we
>Â Â Â Â * explicitly close the connection here. Did try a few different mechanisms
>Â Â Â Â * for handling this scenario both at server and peer end but none seems to
>Â Â Â Â * work. So we explicitly deviate from our design recommendation of keeping
>Â Â Â Â * connections open unfortunately. Needs some further indepth investigation.
>Â Â Â Â */
>Â Â Â Âif (sconn.bev != NULL) {
>Â Â Â Â Â Â Â Âbufferevent_free(sconn.bev);
>Â Â Â Â Â Â Â Âsconn.bev = NULL;
>Â Â Â Â}
>Â Â Â Âif (sconn.evbase != NULL) {
>Â Â Â Â Â Â Â Âevent_base_free(sconn.evbase);
>Â Â Â Â Â Â Â Âsconn.evbase = NULL;
>Â Â Â Â}
>Â Â Â Âif (sconn.output_buffer != NULL) {
>Â Â Â Â Â Â Â Âevbuffer_free(sconn.output_buffer);
>Â Â Â Â Â Â Â Âsconn.output_buffer = NULL;
>Â Â Â Â}
> }
>
> void get_from_server(const char *key) {
>Â Â Â Âchar data[MESSAGE_SIZE];
>Â Â Â Âint lserver_id;
>
>Â Â Â Âmemset(data, 0x30, MESSAGE_SIZE);
>Â Â Â Âdata[0] = 'C';
>Â Â Â Âdata[1] = 'S';
>Â Â Â Âdata[2] = CMD_GET;
>
>Â Â Â Âstrncpy(&data[4], key, KEY_SIZE);
>
>Â Â Â Âlserver_id = hash_peer(&data[4]);
>Â Â Â Âif (!perf_test_on)
>Â Â Â Â Â Â Â Âprintf("GET Server Id: %d\n", lserver_id);
>
>Â Â Â Âif (make_server_connection(lserver_id) != 0) {
>Â Â Â Â Â Â Â Âprintf("Connecting to Server %d failed.\n", lserver_id);
>Â Â Â Â Â Â Â Âreturn;
>Â Â Â Â}
>
>Â Â Â Âevbuffer_add(sconn.output_buffer, data, MESSAGE_SIZE);
>Â Â Â Âbufferevent_write_buffer(sconn.bev, sconn.output_buffer);
>Â Â Â Âevent_base_loopexit(sconn.evbase, &peer_timeout);
>Â Â Â Âevent_base_loop(sconn.evbase, 0);
>
>Â Â Â Â/*
>Â Â Â Â * Though our functional specifications are to maintain the connections
>Â Â Â Â * in an open state once we conenct, but for some reason if the connections
>Â Â Â Â * are not closed, the buffers do not seem to flush and we never exit the
>Â Â Â Â * event loop above OR the server does not exit the event loop connection
>Â Â Â Â * even though there was only one event from the event base to handle and
>Â Â Â Â * the other end does not get the data or the callbacks get called. So we
>Â Â Â Â * explicitly close the connection here. Did try a few different mechanisms
>Â Â Â Â * for handling this scenario both at server and peer end but none seems to
>Â Â Â Â * work. So we explicitly deviate from our design recommendation of keeping
>Â Â Â Â * connections open unfortunately. Needs some further indepth investigation.
>Â Â Â Â */
>Â Â Â Âif (sconn.bev != NULL) {
>Â Â Â Â Â Â Â Âbufferevent_free(sconn.bev);
>Â Â Â Â Â Â Â Âsconn.bev = NULL;
>Â Â Â Â}
>Â Â Â Âif (sconn.evbase != NULL) {
>Â Â Â Â Â Â Â Âevent_base_free(sconn.evbase);
>Â Â Â Â Â Â Â Âsconn.evbase = NULL;
>Â Â Â Â}
>Â Â Â Âif (sconn.output_buffer != NULL) {
>Â Â Â Â Â Â Â Âevbuffer_free(sconn.output_buffer);
>Â Â Â Â Â Â Â Âsconn.output_buffer = NULL;
>Â Â Â Â}
> }
>
> void delete_from_server(const char *key) {
>Â Â Â Âchar data[MESSAGE_SIZE];
>Â Â Â Âint lserver_id;
>
>Â Â Â Âmemset(data, 0x30, MESSAGE_SIZE);
>Â Â Â Âdata[0] = 'C';
>Â Â Â Âdata[1] = 'S';
>Â Â Â Âdata[2] = CMD_DEL;
>
>Â Â Â Âstrncpy(&data[4], key, KEY_SIZE);
>
>Â Â Â Âlserver_id = hash_peer(&data[4]);
>Â Â Â Âif (!perf_test_on)
>Â Â Â Â Â Â Â Âprintf("DEL Server Id: %d\n", lserver_id);
>
>Â Â Â Âif (make_server_connection(lserver_id) != 0) {
>Â Â Â Â Â Â Â Âprintf("Connecting to Server %d failed.\n", lserver_id);
>Â Â Â Â Â Â Â Âreturn;
>Â Â Â Â}
>
>Â Â Â Âevbuffer_add(sconn.output_buffer, data, MESSAGE_SIZE);
>Â Â Â Âbufferevent_write_buffer(sconn.bev, sconn.output_buffer);
>Â Â Â Âevent_base_loopexit(sconn.evbase, &peer_timeout);
>Â Â Â Âevent_base_loop(sconn.evbase, 0);
>
>Â Â Â Â/*
>Â Â Â Â * Though our functional specifications are to maintain the connections
>Â Â Â Â * in an open state once we conenct, but for some reason if the connections
>Â Â Â Â * are not closed, the buffers do not seem to flush and we never exit the
>Â Â Â Â * event loop above OR the server does not exit the event loop connection
>Â Â Â Â * even though there was only one event from the event base to handle and
>Â Â Â Â * the other end does not get the data or the callbacks get called. So we
>Â Â Â Â * explicitly close the connection here. Did try a few different mechanisms
>Â Â Â Â * for handling this scenario both at server and peer end but none seems to
>Â Â Â Â * work. So we explicitly deviate from our design recommendation of keeping
>Â Â Â Â * connections open unfortunately. Needs some further indepth investigation.
>Â Â Â Â */
>Â Â Â Âif (sconn.bev != NULL) {
>Â Â Â Â Â Â Â Âbufferevent_free(sconn.bev);
>Â Â Â Â Â Â Â Âsconn.bev = NULL;
>Â Â Â Â}
>Â Â Â Âif (sconn.evbase != NULL) {
>Â Â Â Â Â Â Â Âevent_base_free(sconn.evbase);
>Â Â Â Â Â Â Â Âsconn.evbase = NULL;
>Â Â Â Â}
>Â Â Â Âif (sconn.output_buffer != NULL) {
>Â Â Â Â Â Â Â Âevbuffer_free(sconn.output_buffer);
>Â Â Â Â Â Â Â Âsconn.output_buffer = NULL;
>Â Â Â Â}
> }
>
> static void discard_logs(int severity, const char *msg) {
>
> }
>
> void set_libevent_logging(void) {
>Â Â Â Âevent_enable_debug_logging(EVENT_DBG_ALL);
>Â Â Â Â//event_set_log_callback(discard_logs);
> }
>
> void rand_str(char *dest, size_t length) {
>Â Â Â Âchar charset[] = "0123456789"
>Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â"abcdefghijklmnopqrstuvwxyz"
>Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â"ABCDEFGHIJKLMNOPQRSTUVWXYZ";
>
>Â Â Â Âwhile (length-- > 0) {
>Â Â Â Â Â Â Â Âsize_t index = (double) rand() / RAND_MAX * (sizeof charset - 1);
>Â Â Â Â Â Â Â Â*dest++ = charset[index];
>Â Â Â Â}
>
>Â Â Â Â*dest = '\0';
> }
>
> void run_perf_tests(void) {
>Â Â Â Âint i;
>Â Â Â Âchar key[KEY_SIZE];
>Â Â Â Âchar value[VALUE_SIZE];
>Â Â Â Âstruct timeval t1, t2;
>Â Â Â Âdouble elapsedtime, totalelapsedtime;
>
>Â Â Â Âperf_test_on = true;
>
>Â Â Â Â/* Run PUT tests */
>Â Â Â Âfor (i = 0; i < NO_OF_TEST_ITERATIONS; i++) {
>Â Â Â Â Â Â Â Âmemset(key, 0x30, KEY_SIZE);
>Â Â Â Â Â Â Â Âmemset(value, 0x30, VALUE_SIZE);
>
>Â Â Â Â Â Â Â Âkey[0] = server_id + 0x31;
>Â Â Â Â Â Â Â Â/* Generate a key depending on loop iteration */
>Â Â Â Â Â Â Â Âsnprintf(&key[1], KEY_SIZE - 1, "%d", i);
>Â Â Â Â Â Â Â Â/* Generate a random string value */
>Â Â Â Â Â Â Â Ârand_str(value, VALUE_SIZE);
>Â Â Â Â Â Â Â Âgettimeofday(&t1, NULL);
>Â Â Â Â Â Â Â Âput_at_server(key, value);
>Â Â Â Â Â Â Â Âgettimeofday(&t2, NULL);
>Â Â Â Â Â Â Â Â// compute and print the elapsed time in millisec
>Â Â Â Â Â Â Â Âelapsedtime = (t2.tv_sec - t1.tv_sec) * 1000.0;Â Â Â // sec to ms
>Â Â Â Â Â Â Â Âelapsedtime += (t2.tv_usec - t1.tv_usec) / 1000.0;Â Â// us to ms
>Â Â Â Â Â Â Â Âprintf("Elapsed time: %f\n", elapsedtime);
>Â Â Â Â Â Â Â Âtotalelapsedtime += elapsedtime;
>Â Â Â Â}
>Â Â Â Âprintf("Average Response time for PUT requests: %f ms\n", totalelapsedtime / NO_OF_TEST_ITERATIONS);
>
>Â Â Â Âprintf("Press any key to continue\n");
>Â Â Â Âgetchar();
>
>Â Â Â Â/* Run GET tests */
>Â Â Â Âfor (i = 0; i < NO_OF_TEST_ITERATIONS; i++) {
>Â Â Â Â Â Â Â Âmemset(key, 0x30, KEY_SIZE);
>
>Â Â Â Â Â Â Â Âkey[0] = server_id + 0x31;
>Â Â Â Â Â Â Â Â/* Generate a key depending on loop iteration */
>Â Â Â Â Â Â Â Âsnprintf(&key[1], KEY_SIZE - 1, "%d", i);
>Â Â Â Â Â Â Â Âgettimeofday(&t1, NULL);
>Â Â Â Â Â Â Â Âget_from_server(key);
>Â Â Â Â Â Â Â Âgettimeofday(&t2, NULL);
>Â Â Â Â Â Â Â Â// compute and print the elapsed time in millisec
>Â Â Â Â Â Â Â Âelapsedtime = (t2.tv_sec - t1.tv_sec) * 1000.0;Â Â Â // sec to ms
>Â Â Â Â Â Â Â Âelapsedtime += (t2.tv_usec - t1.tv_usec) / 1000.0;Â Â// us to ms
>Â Â Â Â Â Â Â Âprintf("Elapsed time: %f\n", elapsedtime);
>Â Â Â Â Â Â Â Âtotalelapsedtime += elapsedtime;
>Â Â Â Â}
>Â Â Â Âprintf("Average Response time for GET requests: %f ms\n", totalelapsedtime / NO_OF_TEST_ITERATIONS);
>
>Â Â Â Âprintf("Press any key to continue\n");
>Â Â Â Âgetchar();
>
>Â Â Â Â/* Run DEL tests */
>Â Â Â Âfor (i = 0; i < NO_OF_TEST_ITERATIONS; i++) {
>Â Â Â Â Â Â Â Âmemset(key, 0x30, KEY_SIZE);
>
>Â Â Â Â Â Â Â Âkey[0] = server_id + 0x31;
>Â Â Â Â Â Â Â Â/* Generate a key depending on loop iteration */
>Â Â Â Â Â Â Â Âsnprintf(&key[1], KEY_SIZE - 1, "%d", i);
>Â Â Â Â Â Â Â Âgettimeofday(&t1, NULL);
>Â Â Â Â Â Â Â Âdelete_from_server(key);
>Â Â Â Â Â Â Â Âgettimeofday(&t2, NULL);
>Â Â Â Â Â Â Â Â// compute and print the elapsed time in millisec
>Â Â Â Â Â Â Â Âelapsedtime = (t2.tv_sec - t1.tv_sec) * 1000.0;Â Â Â // sec to ms
>Â Â Â Â Â Â Â Âelapsedtime += (t2.tv_usec - t1.tv_usec) / 1000.0;Â Â// us to ms
>Â Â Â Â Â Â Â Âprintf("Elapsed time: %f\n", elapsedtime);
>Â Â Â Â Â Â Â Âtotalelapsedtime += elapsedtime;
>Â Â Â Â}
>Â Â Â Âprintf("Average Response time for DEL requests: %f ms\n", totalelapsedtime / NO_OF_TEST_ITERATIONS);
>
>Â Â Â Âperf_test_on = false;
> }
>
> void input_process(void) {
>Â Â Â Âchar key[KEY_SIZE];
>Â Â Â Âchar value[VALUE_SIZE];
>Â Â Â Âbool exitloop = false;
>Â Â Â Âint input;
>
>Â Â Â Â/*
>Â Â Â Â * We run the peer functionality in this main thread
>Â Â Â Â */
>Â Â Â Âwhile (!exitloop) {
>Â Â Â Â Â Â Â Âprintf("\nSelect Operation\n");
>Â Â Â Â Â Â Â Âprintf("(1) Put (2) Get (3) Delete (4) Run tests\n");
>Â Â Â Â Â Â Â Âprintf("(5) Exit");
>Â Â Â Â Â Â Â Âprintf("\nPlease enter your selection (1-5):\t");
>
>Â Â Â Â Â Â Â Âscanf("%d", &input);
>Â Â Â Â Â Â Â Âgetchar();
>
>Â Â Â Â Â Â Â Âswitch (input) {
>Â Â Â Â Â Â Â Âcase 1:
>Â Â Â Â Â Â Â Â Â Â Â Âprintf("Enter Key: \t");
>Â Â Â Â Â Â Â Â Â Â Â Âscanf("%s", key);
>Â Â Â Â Â Â Â Â Â Â Â Âprintf("\n");
>Â Â Â Â Â Â Â Â Â Â Â Âprintf("Enter Value: \t");
>Â Â Â Â Â Â Â Â Â Â Â Âscanf("%s", value);
>Â Â Â Â Â Â Â Â Â Â Â Âprintf("\n");
>Â Â Â Â Â Â Â Â Â Â Â Âput_at_server(key, value);
>Â Â Â Â Â Â Â Â Â Â Â Âbreak;
>Â Â Â Â Â Â Â Âcase 2:
>Â Â Â Â Â Â Â Â Â Â Â Âprintf("Enter Key: \t");
>Â Â Â Â Â Â Â Â Â Â Â Âscanf("%s", key);
>Â Â Â Â Â Â Â Â Â Â Â Âprintf("\n");
>Â Â Â Â Â Â Â Â Â Â Â Âget_from_server(key);
>Â Â Â Â Â Â Â Â Â Â Â Âbreak;
>Â Â Â Â Â Â Â Âcase 3:
>Â Â Â Â Â Â Â Â Â Â Â Âprintf("Enter Key: \t");
>Â Â Â Â Â Â Â Â Â Â Â Âscanf("%s", key);
>Â Â Â Â Â Â Â Â Â Â Â Âprintf("\n");
>Â Â Â Â Â Â Â Â Â Â Â Âdelete_from_server(key);
>Â Â Â Â Â Â Â Â Â Â Â Âbreak;
>Â Â Â Â Â Â Â Âcase 4:
>Â Â Â Â Â Â Â Â Â Â Â Ârun_perf_tests();
>Â Â Â Â Â Â Â Â Â Â Â Âbreak;
>Â Â Â Â Â Â Â Âcase 5:
>Â Â Â Â Â Â Â Â Â Â Â ÂkillServer();
>Â Â Â Â Â Â Â Â Â Â Â Âexitloop = true;
>Â Â Â Â Â Â Â Â Â Â Â Âbreak;
>Â Â Â Â Â Â Â Âdefault:
>Â Â Â Â Â Â Â Â Â Â Â Âprintf("\n\nWrong value: %d\n", input);
>Â Â Â Â Â Â Â Â Â Â Â Âbreak;
>Â Â Â Â Â Â Â Â}
>
>Â Â Â Â Â Â Â Â/* Reset buffers for next iteration */
>Â Â Â Â Â Â Â Âmemset(key, 0x30, KEY_SIZE);
>Â Â Â Â Â Â Â Âmemset(value, 0x30, VALUE_SIZE);
>Â Â Â Â}
> }
>
> int main(int argc, char *argv[]) {
>Â Â Â ÂFILE *fp;
>Â Â Â Âint i;
>Â Â Â Âint error;
>Â Â Â Âint count_of_servers;
>Â Â Â Âssize_t read;
>Â Â Â Âsize_t len = 0;
>Â Â Â Âchar *line = NULL;
>Â Â Â Âchar **tokens = NULL;
>
>Â Â Â Âif (argc != 3) {
>Â Â Â Â Â Â Â Â/*
>Â Â Â Â Â Â Â Â * We do not validate or error check any of the arguments
>Â Â Â Â Â Â Â Â * Please enter correct arguments
>Â Â Â Â Â Â Â Â */
>Â Â Â Â Â Â Â Âprintf("Usage: ./server <serverid#> </path/to/server/conf/file>");
>Â Â Â Â Â Â Â Âexit(1);
>Â Â Â Â}
>
>Â Â Â Âserver_id = atoi(argv[1]) - 1;
>Â Â Â Âif ((server_id < 0) || (server_id > MAX_NO_OF_SERVERS)) {
>Â Â Â Â Â Â Â Âprintf("Incorrect server id provided\n");
>Â Â Â Â Â Â Â Âexit(1);
>Â Â Â Â}
>
>Â Â Â Âfp = fopen(argv[2], "r");
>Â Â Â Âif (fp == NULL) {
>Â Â Â Â Â Â Â Âperror("Could not open server configuration file");
>Â Â Â Â Â Â Â Âexit(1);
>Â Â Â Â}
>
>Â Â Â Â/*
>Â Â Â Â * We now extract the IP and port information of 8 servers
>Â Â Â Â * which will be involved in this setup.
>Â Â Â Â */
>Â Â Â Âcount_of_servers = 0;
>Â Â Â Âwhile ((read = getline(&line, &len, fp)) != -1) {
>
>Â Â Â Â Â Â Â Âif (count_of_servers == MAX_NO_OF_SERVERS)
>Â Â Â Â Â Â Â Â Â Â Â Âbreak;
>
>Â Â Â Â Â Â Â Âtokens = str_split(line, ' ');
>Â Â Â Â Â Â Â Âif (tokens) {
>Â Â Â Â Â Â Â Â Â Â Â Âservers[count_of_servers].serverip = *(tokens);
>Â Â Â Â Â Â Â Â Â Â Â Âservers[count_of_servers].serverport = *(tokens + 1);
>Â Â Â Â Â Â Â Â}
>Â Â Â Â Â Â Â Âfree(line);
>Â Â Â Â Â Â Â Âline = NULL;
>
>Â Â Â Â Â Â Â Âcount_of_servers++;
>Â Â Â Â}
>
>Â Â Â Âfclose(fp);
>
>Â Â Â Âif (pthread_rwlock_init(&ht_lock, NULL) != 0) {
>Â Â Â Â Â Â Â Âperror("Lock init failed");
>Â Â Â Â Â Â Â Âgoto free_tokens;
>Â Â Â Â}
>
>Â Â Â Â/*
>Â Â Â Â * Start the server. We start the server in another thread so
>Â Â Â Â * the libevent event loop for dispatching events on connections
>Â Â Â Â * can work separately which would otherwise block.
>Â Â Â Â */
>Â Â Â Âerror = pthread_create(&server_thread, NULL, &runServer, &servers[server_id]);
>Â Â Â Âif (error != 0) {
>Â Â Â Â Â Â Â Âperror("Error in server thread creation");
>Â Â Â Â Â Â Â Âgoto free_tokens;
>Â Â Â Â}
>
>Â Â Â Âpeer_timeout.tv_sec = 0;
>Â Â Â Âpeer_timeout.tv_usec = PEER_CONN_TIMEOUT;
>
>Â Â Â Âinput_process();
>
>Â Â Â Âfor (i = count_of_servers - 1; i >= 0; --i) {
>Â Â Â Â Â Â Â Âfree(servers[i].serverip);
>Â Â Â Â Â Â Â Âfree(servers[i].serverport);
>Â Â Â Â}
>
>Â Â Â Âpthread_rwlock_destroy(&ht_lock);
>
>Â Â Â Âreturn 0;
>
> free_tokens:
>Â Â Â Âfor (i = count_of_servers - 1; i >= 0; --i) {
>Â Â Â Â Â Â Â Âfree(servers[i].serverip);
>Â Â Â Â Â Â Â Âfree(servers[i].serverport);
>Â Â Â Â}
>
>Â Â Â Âreturn -1;
> }
>


--
Respectfully
Azat Khuzhin
***********************************************************************
To unsubscribe, send an e-mail to majordomo@xxxxxxxxxxxxx with
unsubscribe libevent-users  in the body.