[Libevent-users] Creating a repeater/forwarder



I'm trying to create a fairly low-overhead repeater process that accepts N connections from other processes (N is going to be relatively small, in the area of 5-12) and forwards incoming packets from one connection to all of the others.

 conn1 --'hello'--> repeater --'hello'--> conn{2,3,..N}

Copy elimination FTW, but presumably there will be at least one copy from the source buffers to the output buffers for each subsequent connection.

Right now, I have the following, but I'm wondering if there's any huge "d'oh" in it that would let me reduce the amount of copying. I'm not sure if there's some way to point multiple output buffers at a single input buffer (well, without chaos as they all randomly drain it).

static void _onEventBufferRead(struct bufferevent* bev, void* arg)
{
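    // go through intptr_t (<cstdint>): a direct void* -> int cast loses bits on 64-bit targets.
    const int srcSock = (int)(intptr_t)arg;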
    struct evbuffer* const input = bufferevent_get_input(bev);
    ABORT_IF(input == nullptr);

    struct evbuffer_iovec ebiov;
    for ( ;; )
    {
        // consume received data in contiguous blocks so we don't
        // expend cycles shaping them; incoming data is kinda
        // likely to be in ready-to-go fashion.
        const size_t contigBytes = evbuffer_get_contiguous_space(input);
        if ( contigBytes == 0 )
            break;

        // get the internal details of the buffer.
        const int numBufs = evbuffer_peek(input, -1, nullptr, &ebiov, 1);
        if ( numBufs <= 0 || ebiov.iov_base == nullptr || ebiov.iov_len == 0 )
            break;    // although, this seems like some kind of fail condition.

        // now transfer that buffer to each additional connected client.
        for ( auto it = clients.begin(); it != clients.end(); ++it )
        {
            auto destSock = it->first;
            if ( destSock == srcSock )
                continue;    // skip self.
            // find their output buf and add this buffer to it.
            auto output = bufferevent_get_output(it->second);
            ABORT_IF(output == nullptr);
            evbuffer_add(output, ebiov.iov_base, ebiov.iov_len);
        }

        // consume the received data.
        evbuffer_drain(input, ebiov.iov_len);
    }
}

static void _onEventBufferError(struct bufferevent* bev, short error, void* arg)
{
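    // go through intptr_t (<cstdint>): a direct void* -> int cast loses bits on 64-bit targets.
    const int cliSock = (int)(intptr_t)arg;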
    if (error & BEV_EVENT_EOF)
        printf("sock:%d disconnected.\n", cliSock);
    else if (error & BEV_EVENT_ERROR)
        printf("sock:%d error:%d. disconnecting.\n", cliSock, dbugGetErrno());
    else if (error & BEV_EVENT_TIMEOUT)
        printf("sock:%d timeout. disconnecting.\n", cliSock);
    else
        printf("sock:%d error %d.\n", cliSock, error);

    bufferevent_free(bev);
    clients.erase(cliSock);
}

static void _onAccept(evutil_socket_t listener, short event, void* arg)
{
    auto base = static_cast<event_base*>(arg);
    struct sockaddr_in cliAddr;
    socklen_t caLen = sizeof(cliAddr);
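    // accept() returns -1 on failure and reports the reason through errno
    // (<cerrno>), so retry on EINTR by checking errno, not the return value.
    int cliSock;
    do
    {
        cliSock = accept(listener, (struct sockaddr*)&cliAddr, &caLen);
    } while ( cliSock < 0 && errno == EINTR );

    if ( cliSock < 0 )
    {
        // EAGAIN/EWOULDBLOCK on a non-blocking listener just means nothing is pending.
        if ( errno != EAGAIN && errno != EWOULDBLOCK )
            printf("accept error: %d\n", errno);
        return;
    }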

    evutil_make_socket_nonblocking(cliSock);
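    struct bufferevent* bev = bufferevent_socket_new(base, cliSock, BEV_OPT_CLOSE_ON_FREE);
    ABORT_IF(bev == nullptr);
    bufferevent_setcb(bev, _onEventBufferRead, nullptr, _onEventBufferError, (void*)(intptr_t)cliSock);
    // EV_PERSIST is a flag for event_new(); bufferevent_enable() only takes EV_READ and/or EV_WRITE.
    bufferevent_enable(bev, EV_READ|EV_WRITE);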

    const bool socketAlreadyRegistered = (clients.find(cliSock) != clients.end());
    ABORT_IF( socketAlreadyRegistered );

    clients[cliSock] = bev;

    printf("sock:%d connected from %s:%u\n", cliSock, teulGetAddress(ntohl(cliAddr.sin_addr.s_addr)), ntohs(cliAddr.sin_port));
}