Re: [Libevent-users] Multiplexing protocol using libevent



I want prioritization between reads from many files/pipes. For example, I have one pipe with the highest priority and two files with equal priority. I want to read the pipe first and then both files in small chunks. That's where I want to use libevent. Is it even possible to read files with libevent?

I tried it like this:
    int file = open(filename, O_RDWR);
    struct event *ev_file_read = event_new(ev_base, file, EV_READ, read_file, NULL);

    if(event_add(ev_file_read, NULL))
            error("adding file event");

but it still gives me an error:
[warn] Epoll ADD(1) on fd 7 failed.  Old events were 0; read change was 1 (add); write change was 0 (none): Operation not permitted
adding file event: Operation not permitted

The file is accessible for both read and write.

Any ideas how to do it?
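
For what it's worth, the "Operation not permitted" above most likely comes from epoll rather than from file permissions: on Linux, epoll_ctl() fails with EPERM when the descriptor is a regular file, since a regular file is always considered ready to read. Below is a minimal sketch of one way around that, not a definitive fix. The helpers is_regular_file() and read_chunk() are made-up names for illustration (they are not libevent functions). The idea is to register pipes and sockets with event_new()/event_add() as usual, and to read regular files directly in small chunks, for example right after the high-priority pipe has been serviced.

```cpp
#include <sys/stat.h>   // fstat, S_ISREG
#include <unistd.h>     // read
#include <cassert>
#include <cstddef>
#include <cstdio>       // perror

// Hypothetical helper, not part of libevent: true when fd is a regular file,
// i.e. the kind of descriptor that epoll_ctl() rejects with EPERM.
bool is_regular_file(int fd)
{
    assert(fd >= 0);
    struct stat st;
    if (fstat(fd, &st) != 0) {
        perror("fstat");
        return false;   // assumption: treat an fstat failure as "not regular"
    }
    return S_ISREG(st.st_mode);
}

// Hypothetical helper: read at most `chunk` bytes from a regular file.
// Returns the number of bytes read, 0 at end of file, -1 on error.
// A read() on a regular file does not wait for "readiness", so no event
// notification is needed before calling it.
ssize_t read_chunk(int fd, char* buf, size_t chunk)
{
    return read(fd, buf, chunk);
}
```

If is_regular_file(file) returns true, the event_add() call for that fd can be skipped and read_chunk() called from the code that schedules the reads. libevent 2.x also documents an EV_FEATURE_FDS flag for event_config_require_features(), which asks for a backend that accepts arbitrary file descriptors; whether that suits this case is not something the sketch covers.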

Michal



2011/3/21 Kelly Brock <Kerby@xxxxxxxxxxx>
Hi Michal,

       Libevent's priorities are not likely to do what you want given this
information.  From my understanding of libevent priorities, they only choose
between events that are already active and reorder them (basically just a
priority queue of pending callbacks), and they are tied to a specific
fd/event pair, so no prioritization is possible between multiple "read"
events on one fd.  I.e. there is no way to tell libevent that one read or
write is more important than another if they are associated with the same
fd.  In fact, I have a feeling that the priorities are intended more for
event groups and/or timers than for single fd work.

       As to the high priority traffic, remember that given a single NIC,
all network traffic is going to be linearized and there is a single
incoming/outgoing packet stream.  If you are sending a lot of data the
sendto is likely to start blocking randomly and calling sendto from multiple
threads would not respect priorities.  I think your best bet is still the
code presented and a very simple send function along the following lines:

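while( running )
{
 // Note: add a semaphore to the queue to allow pop
 // to block when there are no messages.  Also add some
 // form of "exit" message so the thread can be killed cleanly.
 // pop() is assumed to be a wrapper that returns the top element;
 // std::priority_queue::pop() itself returns void.
 priority_datagram     msg     =       mStream.pop();
 uint8_t buffer[ kMaxDatagramSize ];
 // Stream id first, then the payload directly after it.
 memcpy( buffer, &msg.mStreamId, sizeof( msg.mStreamId ) );
 memcpy( buffer + sizeof( msg.mStreamId ), msg.mpPayload, msg.mLength );
 sendto( target, buffer, sizeof( msg.mStreamId ) + msg.mLength, .. flags and destination .. );
}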
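
The note in the loop above (a pop() that blocks, plus some form of "exit" message) could be sketched roughly as follows. This is only an illustration under assumptions: blocking_priority_stream, make_packet(), the mExit flag and the value of kMaxDatagramSize are made up for the example, the payload is held in a std::vector instead of a raw pointer, a condition variable stands in for the semaphore, and the stream id is copied in host byte order.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <mutex>
#include <queue>
#include <utility>
#include <vector>

static const size_t kMaxDatagramSize = 1400;   // assumed value, not from the thread

struct priority_datagram
{
    int32_t              mPriority;   // larger value is sent first
    uint32_t             mStreamId;
    std::vector<uint8_t> mPayload;
    bool                 mExit;       // hypothetical "exit" marker for the sender

    bool operator<(const priority_datagram& rhs) const
    { return mPriority < rhs.mPriority; }
};

// Hypothetical wrapper: a priority queue whose pop() blocks while it is empty.
class blocking_priority_stream
{
public:
    // Called by the worker threads.
    void post(priority_datagram msg)
    {
        {
            std::lock_guard<std::mutex> lock(mLock);
            mQueue.push(std::move(msg));
        }
        mReady.notify_one();
    }

    // Called by the sender thread; waits until a message is available.
    priority_datagram pop()
    {
        std::unique_lock<std::mutex> lock(mLock);
        mReady.wait(lock, [this] { return !mQueue.empty(); });
        priority_datagram msg = mQueue.top();
        mQueue.pop();
        return msg;
    }

private:
    std::priority_queue<priority_datagram> mQueue;
    std::mutex                             mLock;
    std::condition_variable                mReady;
};

// Builds one datagram: the stream id followed directly by the payload.
std::vector<uint8_t> make_packet(const priority_datagram& msg)
{
    const size_t header = sizeof(msg.mStreamId);
    assert(header + msg.mPayload.size() <= kMaxDatagramSize);
    std::vector<uint8_t> packet(header + msg.mPayload.size());
    std::memcpy(packet.data(), &msg.mStreamId, header);
    if (!msg.mPayload.empty())
        std::memcpy(packet.data() + header, msg.mPayload.data(),
                    msg.mPayload.size());
    return packet;
}
```

The sender thread would then loop on pop(), stop when mExit is set, and otherwise pass the result of make_packet() to sendto(). Posting the exit message with the lowest priority means it is only seen after the data already queued has been popped.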

       Of course, this still means that there is a certain amount of data
already in the OS buffer, and your high priority message will be sent only
after it; you have no control over that.  You "could" try adjusting the
size of the OS-specific buffer to reduce latency, but from my experience that
can cause other problems and waste a lot of bandwidth.

       I personally would still use libevent even in a case like this since
it does not add any notable latency, is small and takes care of a bunch of
annoying OS specifics for you.  But don't use the priority system as I don't
believe that it will do anything like what you want.

KB

> Hi Kelly,
>
> thanks for your response. My plan is to implement this protocol just as
> you said. But I don't want to use libevent between Sender and Receiver,
> since there will be only one socket anyway. Actually I wanted to use it
> between buffers and sender/receiver as a kind of priority queue.
>
> Via this protocol I need to send a lot of data and from time to time small
> chunks of privileged traffic, which has to be delivered immediately.
> Since libevent supports event priorities, I thought it would be more
> efficient to have many threads writing to the FIFOs invoke the Sender's
> send() method than to use a traditional priority queue + mutexes. I just
> have no idea if it is the most efficient approach.
>
> Michal
>
>
> 2011/3/21 Kelly Brock <Kerby@xxxxxxxxxxx>
>
>
>       Hi Michal,
>
>              Hopefully this is not too verbose, but I'm actually going to
>       be rewriting a more complicated variation of this problem myself, so
>       I figured I'd walk through it in detail as an exercise.
>
>              This is really not something you need libevent to do for you;
>       in fact libevent wouldn't support it with udp anyway.  Starting with
>       the prioritization scheme among the streams: don't bother with
>       anything fancy, since order in this case is just a "suggestion" when
>       you are using udp.  I.e. remember udp can be reordered, some packets
>       can be dropped etc.  Anyway, for the sending side of things
>       (priorities and streams), I suggest the following:
>
>       struct priority_datagram
>       {
>        bool operator < ( const priority_datagram& rhs ) const
>        { return mPriority < rhs.mPriority; }
>
>        int32_t    mPriority;
>        uint32_t   mStreamId;
>        size_t     mLength;
>        void*      mpPayload;
>       };
>       typedef std::priority_queue< priority_datagram > priority_stream_t;
>
>       // In some class or whatever.
>       priority_stream_t   mStream;
>       mutex_t             mStreamLock;
>
>       // The workers call this to post data packets.
>       // length is the payload size in bytes; it fills mLength.
>       void post( const int32_t priority, const uint32_t streamid,
>                  const size_t length, void* b )
>       {
>        priority_datagram     data    =       {priority, streamid, length, b};
>        scoped_lock< mutex_t >  lock( &mStreamLock );
>        mStream.push( data );
>       }
>
>              That takes care of everything you need to post data to the
>       socket up to the point where libevent's write callback hits.
>       Basically you completely ignore any concept of multiple streams at
>       this level and just build an outgoing buffer of events.  By the
>       nature of priority_stream_t, when libevent calls your callback you
>       just pop from the buffer, and everything is going to work exactly as
>       you want, assuming I've understood your requirements correctly.
>
>              As to the muxing/demuxing, that's easy: when you get the write
>       callback for your event, just send the stream id with the payload
>       data in a single packet.  When reading, pull the stream id and then
>       look up the target in a hashmap (or a map, whatever) to send the data
>       along to the correct stream.  Unless your consumer can't process the
>       data immediately, there is no reason to worry about the priority on
>       this side, since udp gets to you whenever it wants and priority of
>       receipt doesn't make much sense anymore.
>
>              Hope this gives you an idea of how to proceed.  Mostly this is
>       not a problem that libevent solves; it is up to you to do things
>       correctly outside of libevent.
>
>       KB
>
>
>       > -----Original Message-----
>       > From: owner-libevent-users@xxxxxxxxxxxxx
>       > [mailto:owner-libevent-users@xxxxxxxxxxxxx] On Behalf Of Michal Król
>       > Sent: Monday, March 21, 2011 5:09 AM
>       > To: libevent-users@xxxxxxxx
>       > Subject: [Libevent-users] Multiplexing protocol using libevent
>       >
>       > Hi,
>       >
>       > I'm trying to write a multiplexing protocol. Data has to be read
>       > from many prioritized buffers (which will be fed by many different
>       > threads), sent as one UDP stream over the network, and then written
>       > to many different buffers (according to info from the header).
>       > To be precise, here's an image:
>       > http://img861.imageshack.us/i/protocol.png/
>       >
>       > The number of buffers can be relatively big (even tens of
>       > thousands), but most of them will not be used (at least not at the
>       > same time). So I need to invoke the sender when data arrives in any
>       > of these buffers. That's the point where I wanted to use libevent.
>       > As far as I know, to achieve this I need to create a pipe for every
>       > buffer, but I'm afraid it is not the most efficient way (especially
>       > with such a number of pipes). Is there any better way to invoke the
>       > sender?
>       >
>       > I also need to do the opposite on the other side of the network
>       > (one receiver dispatching on the header and writing to the
>       > appropriate buffer, which will invoke the appropriate function), but
>       > I think the solution is the same here.
>       >
>       > Does anyone have an idea how to solve this problem efficiently?
>       >
>       > Thanks in advance
>       > Michal
>
>
>
>       ***********************************************************************
>       To unsubscribe, send an e-mail to majordomo@xxxxxxxxxxxxx with
>       unsubscribe libevent-users    in the body.
>
>

