I'm trying to create a fairly low-overhead repeater process which
will accept N connections from other processes (where N is going to
be relatively small, in the area of 5-12) and which will forward
incoming packets from one connection to all of the others.
conn1 --'hello'--> repeater --'hello'--> conn{2,3,..N}
Copy elimination FTW, but presumably there will be at least one copy
from the source buffers to the output buffers for each subsequent
connection.
Right now, I have the following, but I'm wondering if there's any
huge "d'oh" in it that would let me reduce the amount of copying; I'm
not sure if there's some way to point multiple out buffers at a
single in buffer (well, without chaos as they all randomly drain
it).
static void _onEventBufferRead(struct bufferevent* bev, void* arg)
{
const int srcSock = (int)arg;
struct evbuffer* const input = bufferevent_get_input(bev);
ABORT_IF(input == nullptr);
struct evbuffer_iovec ebiov;
for ( ;; )
{
// consume received data in contiguous blocks so we don't
// expend cycles shaping them; incoming data is kinda
// likely to be in ready-to-go fashion.
const size_t contigBytes = evbuffer_get_contiguous_space(input);
if ( contigBytes == 0 )
break;
// get the internal details of the buffer.
const int numBufs = evbuffer_peek(input, -1, nullptr, &ebiov, 1);
if ( numBufs <= 0 || ebiov.iov_base == nullptr || ebiov.iov_len == 0 )
break; // although, this seems like some kind of fail condition.
// now transfer that buffer to each additional connected client.
for ( auto it = clients.begin(); it != clients.end(); ++it )
{
auto destSock = it->first;
if ( destSock == srcSock )
continue; // skip self.
// find their output buf and add this buffer to it.
auto output = bufferevent_get_output(it->second);
ABORT_IF(output == nullptr);
evbuffer_add(output, ebiov.iov_base, ebiov.iov_len);
}
// consume the received data.
evbuffer_drain(input, ebiov.iov_len);
}
}
static void _onEventBufferError(struct bufferevent* bev, short error, void* arg)
{
const int cliSock = (int)(arg);
if (error & BEV_EVENT_EOF)
printf("sock:%d disconnected.\n", cliSock);
else if (error & BEV_EVENT_ERROR)
printf("sock:%d error:%d. disconnecting.\n", cliSock, dbugGetErrno());
else if (error & BEV_EVENT_TIMEOUT)
printf("sock:%d timeout. disconnecting.\n", cliSock);
else
printf("sock:%d error %d.\n", cliSock, error);
bufferevent_free(bev);
clients.erase(cliSock);
}
/**
 * Listener callback: accepts one pending connection, wraps it in a
 * bufferevent and registers it in the `clients` map.
 *
 * @param listener  listening socket that became readable.
 * @param event     event flags that triggered the callback (unused).
 * @param arg       the event_base the new bufferevent is attached to.
 */
static void _onAccept(evutil_socket_t listener, short event, void* arg)
{
    auto base = static_cast<event_base*>(arg);

    struct sockaddr_in cliAddr;
    socklen_t caLen;
    int cliSock;

    // accept() signals failure with -1 and errno; it never returns an errno
    // value. Comparing the descriptor itself against EAGAIN/EINTR would
    // discard (and leak) a perfectly valid fd that happens to have the same
    // number. Retry only when the call was interrupted by a signal.
    do
    {
        caLen = sizeof(cliAddr); // value-result argument: reset on every try.
        cliSock = accept(listener, (struct sockaddr*)&cliAddr, &caLen);
    } while ( cliSock < 0 && errno == EINTR );

    if ( cliSock < 0 )
    {
        const int err = errno;
        // "Nothing pending" is not worth reporting; anything else is.
        if ( err != EAGAIN && err != EWOULDBLOCK )
            printf("accept error: %d\n", err);
        return;
    }

    if ( evutil_make_socket_nonblocking(cliSock) != 0 )
    {
        printf("sock:%d could not be made non-blocking. disconnecting.\n", cliSock);
        evutil_closesocket(cliSock);
        return;
    }

    const bool socketAlreadyRegistered = (clients.find(cliSock) != clients.end());
    ABORT_IF( socketAlreadyRegistered );

    struct bufferevent* bev = bufferevent_socket_new(base, cliSock, BEV_OPT_CLOSE_ON_FREE);
    if ( bev == nullptr )
    {
        // No bufferevent owns the descriptor yet, so close it ourselves.
        printf("sock:%d could not allocate a bufferevent. disconnecting.\n", cliSock);
        evutil_closesocket(cliSock);
        return;
    }

    // The descriptor rides along as the callback argument; widen it through
    // a pointer-sized integer so the int<->pointer conversion is well defined.
    bufferevent_setcb(bev, _onEventBufferRead, nullptr, _onEventBufferError,
                      (void*)(ev_intptr_t)cliSock);

    // bufferevent_enable() only understands EV_READ and EV_WRITE; EV_PERSIST
    // is a flag for raw events and has no meaning here.
    if ( bufferevent_enable(bev, EV_READ|EV_WRITE) != 0 )
    {
        printf("sock:%d could not enable the bufferevent. disconnecting.\n", cliSock);
        bufferevent_free(bev); // BEV_OPT_CLOSE_ON_FREE closes the socket too.
        return;
    }

    clients[cliSock] = bev;

    printf("sock:%d connected from %s:%u\n", cliSock, teulGetAddress(ntohl(cliAddr.sin_addr.s_addr)), ntohs(cliAddr.sin_port));
}
|