Zyre::

Poller class

A poller for evented Zyre IO. Built on zpoller from czmq.

Refs: - api.zeromq.org/czmq3-0:zpoller

Public Instance Methods

add( *nodes )

Add the specified nodes to the list which will be polled by a call to wait.

static VALUE
rzyre_poller_add( VALUE self, VALUE nodes )
{
        zpoller_t *ptr = rzyre_get_poller( self );
        VALUE nodemap = rb_ivar_get( self, rb_intern("@nodes") );
        long i;

        // For each node given, record its endpoint in @nodes so we can map it back to
        // the node later and add it to the poller.
        nodes = rb_funcall( nodes, rb_intern("flatten"), 0 );
        for ( i=0; i < RARRAY_LEN(nodes); i++ ) {
                VALUE node = RARRAY_AREF( nodes, i );
                zyre_t *zyre_node = rzyre_get_node( node );
                zsock_t *sock = zyre_socket( zyre_node );
                const char *endpoint_str = zsock_endpoint( sock );

                assert( endpoint_str );

                rb_hash_aset( nodemap, rb_str_new2(endpoint_str), node );
                zpoller_add( ptr, sock );
        }

        return Qtrue;
}
nodes → hash

Return a (frozen copy) of the Hash of the nodes the Poller will wait on.

static VALUE
rzyre_poller_nodes( VALUE self )
{
        VALUE rval = rb_obj_dup( rb_ivar_get(self, rb_intern( "@nodes" )) );

        return rb_hash_freeze( rval );
}
remove( *nodes )

Add the specified nodes to the list which will be polled by a call to wait.

static VALUE
rzyre_poller_remove( VALUE self, VALUE nodes )
{
        zpoller_t *ptr = rzyre_get_poller( self );
        VALUE nodemap = rb_ivar_get( self, rb_intern("@nodes") );
        long i;

        // For each node given, record its endpoint in @nodes so we can map it back to
        // the node later and remove it to the poller.
        nodes = rb_funcall( nodes, rb_intern("flatten"), 0 );
        for ( i=0; i < RARRAY_LEN(nodes); i++ ) {
                VALUE node = RARRAY_AREF( nodes, i );
                zyre_t *zyre_node = rzyre_get_node( node );
                zsock_t *sock = zyre_socket( zyre_node );
                const char *endpoint_str = zsock_endpoint( sock );

                assert( endpoint_str );

                rb_hash_aset( nodemap, rb_str_new2(endpoint_str), node );
                zpoller_remove( ptr, sock );
        }

        return Qtrue;
}
wait( timeout=-1 ) → node or nil

Poll the registered nodes for I/O, return first one that has input. The timeout should be zero or greater, or -1 to wait indefinitely. Socket priority is defined by their order in the poll list. If the timeout expired, returns nil. If poll call is interrupted (SIGINT) or the ZMQ context was destroyed, an Interrupt is raised.

static VALUE
rzyre_poller_wait( int argc, VALUE *argv, VALUE self )
{
        zpoller_t *ptr = rzyre_get_poller( self );
        VALUE rval = Qnil;
        zsock_t *sock;
        VALUE timeout_arg;
        VALUE nodemap = rb_ivar_get( self, rb_intern("@nodes") );
        int timeout = -1;
        wait_call_t call;

        if ( rb_scan_args(argc, argv, "01", &timeout_arg) ) {
                timeout = floor( NUM2DBL(timeout_arg) * 1000 );
        }

        rzyre_log_obj( self, "debug", "waiting on %d socket/s (timeout: %d)",
                 RHASH_SIZE(nodemap), timeout );

        call.poller = ptr;
        call.timeout = timeout;
        sock = (zsock_t *)rb_thread_call_without_gvl2( rzyre_poller_wait_without_gvl, (void *)&call,
                RUBY_UBF_IO, 0 );

        if ( sock ) {
                const char *endpoint = zsock_endpoint( sock );
                rval = rb_hash_aref( nodemap, rb_str_new2(endpoint) );
        }

        return rval;
}