I'm trying to create a fairly low-overhead repeater process which
will accept N connections (where N is going to be relatively small,
in the area of 5-12) from other processes and which will
forward incoming packets from one connection to all of the others. conn1 --'hello'--> repeater --'hello'--> conn{2,3,..N} Copy elimination FTW, but presumably there will be at least one copy from the source buffers to the output buffers for each subsequent connection. Right now, I have the following, but I'm wondering if there's any huge "d'oh" in it that I could reduce the amount of copying; not sure if there's some way to point multiple out buffers at a single in buffer (well, without chaos as they all randomly drain it). static void _onEventBufferRead(struct bufferevent* bev, void* arg) { const int srcSock = (int)arg; struct evbuffer* const input = bufferevent_get_input(bev); ABORT_IF(input == nullptr); struct evbuffer_iovec ebiov; for ( ;; ) { // consume received data in contiguous blocks so we don't // expend cycles shaping them; incoming data is kinda // likely to be in ready-to-go fashion. const size_t contigBytes = evbuffer_get_contiguous_space(input); if ( contigBytes == 0 ) break; // get the internal details of the buffer. const int numBufs = evbuffer_peek(input, -1, nullptr, &ebiov, 1); if ( numBufs <= 0 || ebiov.iov_base == nullptr || ebiov.iov_len == 0 ) break; // although, this seems like some kind of fail condition. // now transfer that buffer to each additional connected client. for ( auto it = clients.begin(); it != clients.end(); ++it ) { auto destSock = it->first; if ( destSock == srcSock ) continue; // skip self. // find their output buf and add this buffer to it. auto output = bufferevent_get_output(it->second); ABORT_IF(output == nullptr); evbuffer_add(output, ebiov.iov_base, ebiov.iov_len); } // consume the received data. evbuffer_drain(input, ebiov.iov_len); } } static void _onEventBufferError(struct bufferevent* bev, short error, void* arg) { const int cliSock = (int)(arg); if (error & BEV_EVENT_EOF) printf("sock:%d disconnected.\n", cliSock); else if (error & BEV_EVENT_ERROR) printf("sock:%d error:%d. 
disconnecting.\n", cliSock, dbugGetErrno()); else if (error & BEV_EVENT_TIMEOUT) printf("sock:%d timeout. disconnecting.\n", cliSock); else printf("sock:%d error %d.\n", cliSock, error); bufferevent_free(bev); clients.erase(cliSock); } static void _onAccept(evutil_socket_t listener, short event, void* arg) { auto base = static_cast<event_base*>(arg); struct sockaddr_in cliAddr; socklen_t caLen = sizeof(cliAddr); int cliSock = EAGAIN; while ( (cliSock == EAGAIN) | (cliSock == EINTR) ) { cliSock = accept(listener, (struct sockaddr*)&cliAddr, &caLen); }; if ( cliSock < 0 ) { printf("accept error: %d\n", cliSock); return; } evutil_make_socket_nonblocking(cliSock); struct bufferevent* bev = bufferevent_socket_new(base, cliSock, BEV_OPT_CLOSE_ON_FREE); bufferevent_setcb(bev, _onEventBufferRead, nullptr, _onEventBufferError, (void*)cliSock); bufferevent_enable(bev, EV_READ|EV_WRITE|EV_PERSIST); const bool socketAlreadyRegistered = (clients.find(cliSock) != clients.end()); ABORT_IF( socketAlreadyRegistered ); clients[cliSock] = bev; printf("sock:%d connected from %s:%u\n", cliSock, teulGetAddress(ntohl(cliAddr.sin_addr.s_addr)), ntohs(cliAddr.sin_port)); } |