Zyre::

Node class

A node in a Zyre cluster.

Refs: - github.com/zeromq/zyre#readme

Public Instance Methods

advertised_endpoint = endpoint

Set an alternative endpoint value when using GOSSIP ONLY. This is useful if you’re advertising an endpoint behind a NAT.

Note: this is part of the ZRE 43 Draft API, and may be changed or removed at any time.

static VALUE
rzyre_node_advertised_endpoint_eq( VALUE self, VALUE endpoint )
{
#ifdef ZYRE_BUILD_DRAFT_API
        zyre_t *ptr = rzyre_get_node( self );
        const char *endpoint_str = StringValueCStr( endpoint );

        zyre_set_advertised_endpoint( ptr, endpoint_str );

        return endpoint;
#else
        RAISE_NO_DRAFT_APIS();
#endif // ZYRE_BUILD_DRAFT_API
}
beacon_peer_port = 16668

Set the TCP port bound by the ROUTER peer-to-peer socket (beacon mode). Defaults to * (the port is randomly assigned by the system). This call overrides this, to bypass some firewall issues when ports are random. Has no effect after start.

Note: this is part of the ZRE 43 Draft API, and may be changed or removed at any time.

static VALUE
rzyre_node_beacon_peer_port_eq( VALUE self, VALUE port_number )
{
#ifdef ZYRE_BUILD_DRAFT_API
        zyre_t *ptr = rzyre_get_node( self );

        zyre_set_beacon_peer_port( ptr, FIX2INT(port_number) );

        return port_number;
#else
        RAISE_NO_DRAFT_APIS();
#endif // ZYRE_BUILD_DRAFT_API
}
each( &block )
Alias for: each_event
each_event( &block )

Yield each incoming event to the block. If no block is given, returns an enumerator instead.

# File lib/zyre/node.rb, line 20
def each_event( &block )
        iter = self.make_event_enum
        return iter.each( &block ) if block
        return iter
end
Also aliased as: each
endpoint → str

Return the endpoint being used by the Node if there is one.

static VALUE
rzyre_node_endpoint( VALUE self )
{
        zyre_t *ptr = rzyre_get_node( self );
        zsock_t *sock = zyre_socket( ptr );
        const char *endpoint_str = zsock_endpoint( sock );

        return rb_str_new2( endpoint_str );
}
endpoint = string

By default, Zyre binds to an ephemeral TCP port and broadcasts the local host name using UDP beaconing. When you call this method, Zyre will use gossip discovery instead of UDP beaconing. You MUST set up the gossip service separately using gossip_bind and gossip_connect. Note that the endpoint MUST be valid for both bind and connect operations. You can use inproc://, ipc://, or tcp:// transports (for tcp://, use an IP address that is meaningful to remote as well as local nodes). Returns true if the bind was successful.

static VALUE
rzyre_node_endpoint_eq( VALUE self, VALUE endpoint )
{
        zyre_t *ptr = rzyre_get_node( self );
        const char *endpoint_str = StringValueCStr( endpoint );
        int res;

        res = zyre_set_endpoint( ptr, "%s", endpoint_str );

        if ( res == 0 ) return Qtrue;
        return Qfalse;
}
evasive_timeout = milliseconds

Set the peer evasiveness timeout in milliseconds. Default is 5000. This can be tuned in order to deal with expected network conditions and the response time expected by the application. This is tied to the beacon interval and rate of messages received.

static VALUE
rzyre_node_evasive_timeout_eq( VALUE self, VALUE timeout )
{
        zyre_t *ptr = rzyre_get_node( self );
        int timeout_ms = FIX2INT( timeout );

        zyre_set_evasive_timeout( ptr, timeout_ms );

        return Qtrue;
}
expired_timeout = milliseconds

Set the peer expiration timeout, in milliseconds. Default is 30000. This can be tuned in order to deal with expected network conditions and the response time expected by the application. This is tied to the beacon interval and rate of messages received.

static VALUE
rzyre_node_expired_timeout_eq( VALUE self, VALUE timeout )
{
        zyre_t *ptr = rzyre_get_node( self );
        int timeout_ms = FIX2INT( timeout );

        zyre_set_expired_timeout( ptr, timeout_ms );

        return Qtrue;
}
gossip_bind( endpoint )

Set-up gossip discovery of other nodes. At least one node in the cluster must bind to a well-known gossip endpoint, so other nodes can connect to it. Note that gossip endpoints are completely distinct from Zyre node endpoints, and should not overlap (they can use the same transport).

static VALUE
rzyre_node_gossip_bind( VALUE self, VALUE endpoint )
{
        zyre_t *ptr = rzyre_get_node( self );
        const char *endpoint_str = StringValueCStr( endpoint );

        assert( endpoint_str );
        rzyre_log_obj( self, "debug", "Binding to gossip endpoint %s.", endpoint_str );
        zyre_gossip_bind( ptr, "%s", endpoint_str );

        return Qtrue;
}
gossip_connect( endpoint )

Set-up gossip discovery of other nodes. A node may connect to multiple other nodes, for redundancy paths. For details of the gossip network design, see the CZMQ zgossip class.

static VALUE
rzyre_node_gossip_connect( VALUE self, VALUE endpoint )
{
        zyre_t *ptr = rzyre_get_node( self );
        const char *endpoint_str = StringValueCStr( endpoint );

        assert( endpoint_str );
        rzyre_log_obj( self, "debug", "Connecting to gossip endpoint %s.", endpoint_str );
        zyre_gossip_connect( ptr, "%s", endpoint_str );

        return Qtrue;
}
gossip_connect_curve( public_key, endpoint )

Connect to the gossip discovery endpoint with CURVE enabled. The public_key is the Z85-armored public key of the connecting node’s cert/

Note: this is part of the ZRE 43 Draft API, and may be changed or removed at any time.

static VALUE
rzyre_node_gossip_connect_curve( VALUE self, VALUE public_key, VALUE endpoint )
{
#ifdef ZYRE_BUILD_DRAFT_API
        zyre_t *ptr = rzyre_get_node( self );
        const char *endpoint_str = StringValueCStr( endpoint );
        const char *key = StringValueCStr( public_key );

        assert( endpoint_str );
        assert( key );

        rzyre_log_obj( self, "debug", "Connecting to gossip endpoint %s (CURVE enabled).", endpoint_str );
        zyre_gossip_connect_curve( ptr, key, "%s", endpoint_str );

        return Qtrue;
#else
        RAISE_NO_DRAFT_APIS();
#endif // ZYRE_BUILD_DRAFT_API
}
gossip_unpublish( node_uuid )

Unpublish a GOSSIP node from local list, useful in removing nodes from list when they EXIT.

Note: this is part of the ZRE 43 Draft API, and may be changed or removed at any time.

static VALUE
rzyre_node_gossip_unpublish( VALUE self, VALUE node_uuid )
{
#ifdef ZYRE_BUILD_DRAFT_API
        zyre_t *ptr = rzyre_get_node( self );
        const char *node = StringValueCStr( node_uuid );

        zyre_gossip_unpublish( ptr, node );

        return Qtrue;
#else
        RAISE_NO_DRAFT_APIS();
#endif // ZYRE_BUILD_DRAFT_API
}
headers=( hash )

Set headers from the given hash. Convenience wrapper for set_header. Symbol keys will have _ characters converted to - and will be capitalized when converted into Strings. E.g.,

headers = { content_type: ‘application/json’ }

will call:

.set_header( ‘Content-type’, ‘application/json’ )

# File lib/zyre/node.rb, line 56
def headers=( hash )
        hash.each do |key, val|
                key = Zyre.transform_header_key( key )
                self.set_header( key.to_s, val.to_s )
        end
end
inspect()

Return a string describing the node suitable for debugging.

# File lib/zyre/node.rb, line 65
def inspect
        return "#<%p:%#016x %s[%s]>" % [
                self.class,
                self.object_id,
                self.name,
                self.uuid,
        ]
end
interface = string

Set network interface for UDP beacons. If you do not set this, CZMQ will choose an interface for you. On boxes with several interfaces you should specify which one you want to use, or strange things can happen.

static VALUE
rzyre_node_interface_eq( VALUE self, VALUE interface )
{
        zyre_t *ptr = rzyre_get_node( self );
        const char *interface_str = StringValueCStr( interface );

        zyre_set_interface( ptr, interface_str );

        return Qtrue;
}
interval = milliseconds

Set UDP beacon discovery interval in milliseconds. Default is instant beacon exploration followed by pinging every 1000 msecs.

static VALUE
rzyre_node_interval_eq( VALUE self, VALUE interval )
{
        zyre_t *ptr = rzyre_get_node( self );
        size_t interval_ms = FIX2INT( interval );

        zyre_set_interval( ptr, interval_ms );

        return Qtrue;
}
join( group_name ) → int

Join a named group; after joining a group you can send messages to the group and all Zyre nodes in that group will receive them.

static VALUE
rzyre_node_join( VALUE self, VALUE group )
{
        zyre_t *ptr = rzyre_get_node( self );
        const char *group_str = StringValueCStr( group );
        int res;

        rzyre_log_obj( self, "debug", "Joining group %s.", group_str );
        res = zyre_join( ptr, group_str );

        return INT2FIX( res );
}
leave( group_name ) → int

Leave a group.

static VALUE
rzyre_node_leave( VALUE self, VALUE group )
{
        zyre_t *ptr = rzyre_get_node( self );
        const char *group_str = StringValueCStr( group );
        int res;

        rzyre_log_obj( self, "debug", "Leaving group %s.", group_str );
        res = zyre_leave( ptr, group_str );

        return INT2FIX( res );
}
name → str

Return the node name.

static VALUE
rzyre_node_name( VALUE self )
{
        zyre_t *ptr = rzyre_get_node( self );
        const char *name_str = zyre_name( ptr );
        VALUE name = rb_str_new2( name_str );

        return rb_str_freeze( name );
}
name = str

Set the public name of this node overriding the default. The name is provided during discovery and come in each ENTER message.

static VALUE
rzyre_node_name_eq( VALUE self, VALUE new_name )
{
        zyre_t *ptr = rzyre_get_node( self );
        const char *name_str = StringValueCStr( new_name );

        zyre_set_name( ptr, name_str );

        return Qtrue;
}
own_groups → array

Return an Array of the names of the receiving node’s current groups.

static VALUE
rzyre_node_own_groups( VALUE self )
{
        zyre_t *ptr = rzyre_get_node( self );
        zlist_t *groups = zyre_own_groups( ptr );
        VALUE rary = rb_ary_new();
        char *item = NULL;

        assert( groups );

        item = zlist_first( groups );
        while ( item ) {
                rb_ary_push( rary, rb_str_new2(item) );
                item = zlist_next( groups );
        }

        zlist_destroy( &groups );
        return rary;
}
peer_address( peer_uuid ) → str

Return the endpoint of a connected peer. Returns nil if peer does not exist.

static VALUE
rzyre_node_peer_address( VALUE self, VALUE peer_uuid )
{
        zyre_t *ptr = rzyre_get_node( self );
        const char *peer = StringValueCStr( peer_uuid );
        char *address = zyre_peer_address( ptr, peer );
        VALUE rval = Qnil;

        if ( strnlen(address, BUFSIZ) ) {
                rval = rb_str_new2( address );
        }

        free( address );
        return rval;
}
peer_groups → array

Return an Array of the names of groups known through connected peers.

static VALUE
rzyre_node_peer_groups( VALUE self )
{
        zyre_t *ptr = rzyre_get_node( self );
        zlist_t *groups = zyre_peer_groups( ptr );
        VALUE rary = rb_ary_new();
        char *item = NULL;

        assert( groups );

        item = zlist_first( groups );
        while ( item ) {
                rb_ary_push( rary, rb_str_new2(item) );
                item = zlist_next( groups );
        }

        zlist_destroy( &groups );
        return rary;
}
peer_header_value( peer_id, header_name ) → str

Return the value of a header of a conected peer. Returns nil if peer or key doesn’t exist.

static VALUE
rzyre_node_peer_header_value( VALUE self, VALUE peer_id, VALUE header_name )
{
        zyre_t *ptr = rzyre_get_node( self );
        const char *peer_id_str = StringValueCStr( peer_id );
        const char *header_name_str = StringValueCStr( header_name );
        char *res;
        VALUE rval = Qnil;

        res = zyre_peer_header_value( ptr, peer_id_str, header_name_str );

        // TODO: Encoding + frozen
        if ( res ) {
                rval = rb_str_new2( res );
                xfree( res );
        }

        return rval;
}
peers → array

Return an Array of current peer UUIDs.

static VALUE
rzyre_node_peers( VALUE self )
{
        zyre_t *ptr;
        zlist_t *peers;
        VALUE rary = rb_ary_new();
        char *item = NULL;

        ptr = rzyre_get_node( self );
        assert( ptr );

        peers = zyre_peers( ptr );
        assert( peers );

        item = zlist_first( peers );
        while ( item ) {
                rb_ary_push( rary, rb_str_new2(item) );
                item = zlist_next( peers );
        }

        zlist_destroy( &peers );
        return rary;
}
peers_by_group( group ) → array

Return an Array of the current peers in the specified group.

static VALUE
rzyre_node_peers_by_group( VALUE self, VALUE group )
{
        zyre_t *ptr;
        const char *group_str = StringValueCStr( group );
        zlist_t *peers;
        VALUE rary = rb_ary_new();
        char *item = NULL;

        ptr = rzyre_get_node( self );
        assert( ptr );

        peers = zyre_peers_by_group( ptr, group_str );
        assert( peers );

        item = zlist_first( peers );
        while ( item ) {
                rb_ary_push( rary, rb_str_new2(item) );
                item = zlist_next( peers );
        }

        zlist_destroy( &peers );
        return rary;
}
port = integer

Set UDP beacon discovery port. Defaults to 5670. This call overrides the default so that you can create independent clusters on the same network, for e.g. development vs. production. Has no effect after start.

static VALUE
rzyre_node_port_eq( VALUE self, VALUE new_port )
{
        zyre_t *ptr = rzyre_get_node( self );
        int port_nbr = FIX2INT( new_port );

        zyre_set_port( ptr, port_nbr );

        return Qtrue;
}
print

Print zyre node information to stdout.

recv → zyre_event

Receive the next event from the network as a Zyre::Event.

static VALUE
rzyre_node_recv( VALUE self )
{
        return rb_funcall( rzyre_cZyreEvent, rb_intern("from_node"), 1, self );
}
require_peer( node_uuid, router_endpoint, public_key )

Explicitly connect to a peer with the given node_uuid via the specified router_endpoint, using the given public_key to authenticate.

Note: this is part of the ZRE 43 Draft API, and may be changed or removed at any time.

static VALUE
rzyre_node_require_peer( VALUE self, VALUE uuid, VALUE endpoint, VALUE public_key )
{
#ifdef ZYRE_BUILD_DRAFT_API
        zyre_t *ptr = rzyre_get_node( self );
        const char *uuid_str = StringValueCStr( uuid ),
                *endpoint_str = StringValueCStr( endpoint ),
                *key_str = StringValueCStr( public_key );
        int rval;

        rval = zyre_require_peer( ptr, uuid_str, endpoint_str, key_str );

        return rval == 0 ? Qtrue : Qfalse;
#else
        RAISE_NO_DRAFT_APIS();
#endif // ZYRE_BUILD_DRAFT_API
}
set_contest_in_group( group )

This options enables a peer to actively contest for leadership in the given group. If this option is not set the peer will still participate in elections but never gets elected. This ensures that a consent for a leader is reached within a group even though not every peer is contesting for leadership.

Note: this is part of the ZRE 43 Draft API, and may be changed or removed at any time.

static VALUE
rzyre_node_set_contest_in_group( VALUE self, VALUE group_name )
{
#ifdef ZYRE_BUILD_DRAFT_API
        zyre_t *ptr = rzyre_get_node( self );
        const char *group_str = StringValueCStr( group_name );

        zyre_set_contest_in_group( ptr, group_str );

        return group_name;
#else
        RAISE_NO_DRAFT_APIS();
#endif // ZYRE_BUILD_DRAFT_API
}
set_header( name, value )

Set node header; these are provided to other nodes during discovery and come in each ENTER message.

static VALUE
rzyre_node_set_header( VALUE self, VALUE name, VALUE value )
{
        zyre_t *ptr = rzyre_get_node( self );
        const char *name_str = StringValueCStr( name );
        const char *value_str = StringValueCStr( value );

        rzyre_log_obj( self, "debug", "Setting header `%s` to `%s`", name_str, value_str );
        zyre_set_header( ptr, name_str, "%s", value_str );

        return Qtrue;
}
shout( group, *messages ) → int

Send message to a named group.

static VALUE
rzyre_node_shout( int argc, VALUE *argv, VALUE self )
{
        zyre_t *ptr = rzyre_get_node( self );
        VALUE group, msg_parts;
        char *group_str;
        zmsg_t *msg;
        int rval;

        rb_scan_args( argc, argv, "1*", &group, &msg_parts );

        group_str = StringValueCStr( group );
        msg = rzyre_make_zmsg_from( msg_parts );

        rval = zyre_shout( ptr, group_str, &msg );

        return rval ? Qtrue : Qfalse;
}
silent_timeout=(p1)
HAVE_ZYRE_SET_SILENT_TIMEOUT
static VALUE
rzyre_node_silent_timeout_eq( VALUE self, VALUE timeout )
{
        zyre_t *ptr = rzyre_get_node( self );
        int timeout_ms = FIX2INT( timeout );

        zyre_set_silent_timeout( ptr, timeout_ms );

        return Qtrue;
}
start → bool

Start node, after setting header values. When you start a node it begins discovery and connection. Returns true if the node was started successfully.

static VALUE
rzyre_node_start( VALUE self )
{
        zyre_t *ptr = rzyre_get_node( self );
        int res;

        rzyre_log_obj( self, "debug", "Starting." );
        res = zyre_start( ptr );

        if ( res == 0 ) return Qtrue;
        return Qfalse;
}
stop

Stop node; this signals to other peers that this node will go away. This is polite; however you can also just destroy the node without stopping it.

static VALUE
rzyre_node_stop( VALUE self )
{
        zyre_t *ptr = rzyre_get_node( self );

        assert( ptr );
        rzyre_log_obj( self, "debug", "Stopping." );
        zyre_stop( ptr );

        return Qtrue;
}
uuid → str

Return the node’s UUID as a String.

static VALUE
rzyre_node_uuid( VALUE self )
{
        zyre_t *ptr = rzyre_get_node( self );
        const char *uuid_str = zyre_uuid( ptr );
        VALUE uuid = rb_str_new2( uuid_str );

        return rb_str_freeze( uuid );
}
verbose!

Set verbose mode; this tells the node to log all traffic as well as all major events.

static VALUE
rzyre_node_verbose_bang( VALUE self )
{
        zyre_t *ptr = rzyre_get_node( self );

        zyre_set_verbose( ptr );

        return Qtrue;
}
wait_for( event_type, timeout: nil, **criteria, &block )

Wait for an event of a given event_type (e.g., :JOIN) and matching any optional criteria, returning the event if a matching one was seen. If a timeout is given and the event hasn’t been seen after the timeout seconds have elapsed, return nil. If a block is given, call the block for each (non-matching) event that arrives in the interim. Note that the execution time of the block is counted in the timeout.

# File lib/zyre/node.rb, line 34
def wait_for( event_type, timeout: nil, **criteria, &block )
        expected_type = Zyre::Event.type_by_name( event_type ) or
                raise ArgumentError, "no such event type %p" % [ event_type ]

        if timeout
                return self.wait_for_with_timeout( expected_type, timeout, **criteria, &block )
        else
                return self.wait_for_indefinitely( expected_type, **criteria, &block )
        end
end
whisper( peer_uuid, *messages ) → int

Send a message to a single peer specified as a UUID string.

static VALUE
rzyre_node_whisper( int argc, VALUE *argv, VALUE self )
{
        zyre_t *ptr = rzyre_get_node( self );
        VALUE peer_uuid, msg_parts;
        char *peer_uuid_str;
        zmsg_t *msg;
        int rval;

        rb_scan_args( argc, argv, "1*", &peer_uuid, &msg_parts );

        peer_uuid_str = StringValueCStr( peer_uuid );
        msg = rzyre_make_zmsg_from( msg_parts );

        rval = zyre_whisper( ptr, peer_uuid_str, &msg );

        return rval ? Qtrue : Qfalse;
}
zap_domain = domain

Specify the ZAP domain (for use with CURVE authentication). See Authentication.

Note: this is part of the ZRE 43 Draft API, and may be changed or removed at any time.

static VALUE
rzyre_node_zap_domain_eq( VALUE self, VALUE domain )
{
#ifdef ZYRE_BUILD_DRAFT_API
        zyre_t *ptr = rzyre_get_node( self );
        const char *domain_str = StringValueCStr( domain );

        zyre_set_zap_domain( ptr, domain_str );

        return Qtrue;
#else
        RAISE_NO_DRAFT_APIS();
#endif // ZYRE_BUILD_DRAFT_API
}
zcert = zcert

Apply a zcert to a Zyre node. See Zyre::Cert#apply and Authentication.

Note: this is part of the ZRE 43 Draft API, and may be changed or removed at any time.

static VALUE
rzyre_node_zcert_eq( VALUE self, VALUE cert )
{
#ifdef ZYRE_BUILD_DRAFT_API
        zyre_t *ptr = rzyre_get_node( self );
        zcert_t *zcert = rzyre_get_cert( cert );

        if ( streq(zcert_secret_txt(zcert), FORTY_ZEROES) ) {
                rb_raise( rb_eArgError,
                        "can't use this cert for authentication (no curve or missing secret key)" );
        }

        zyre_set_zcert( ptr, zcert );

        return Qtrue;
#else
        RAISE_NO_DRAFT_APIS();
#endif // ZYRE_BUILD_DRAFT_API
}

Protected Instance Methods

make_event_enum()

Create an Enumerator that yields each event as it comes in. If there is no event to read, block until there is one.

# File lib/zyre/node.rb, line 82
def make_event_enum
        return Enumerator.new do |yielder|
                while event = self.recv
                        yielder.yield( event )
                end
        end
end
wait_for_indefinitely( event_class, **criteria, &block )

Wait for an event of the given event_class and criteria, returning it when it arrives. Blocks indefinitely until it arrives or interrupted.

# File lib/zyre/node.rb, line 93
def wait_for_indefinitely( event_class, **criteria, &block )
        poller = Zyre::Poller.new( self )
        while poller.wait
                event = self.recv
                if event.kind_of?( event_class ) && event.match( criteria )
                        return event
                else
                        block.call( event ) if block
                end
        end
end
wait_for_with_timeout( event_class, timeout, **criteria, &block )

Wait for an event of the given event_class and criteria, returning it if it arrives before timeout seconds elapses. If the timeout elapses first, return nil.

# File lib/zyre/node.rb, line 109
def wait_for_with_timeout( event_class, timeout, **criteria, &block )
        start_time = get_monotime()
        timeout_at = start_time + timeout

        poller = Zyre::Poller.new( self )

        timeout = timeout_at - get_monotime()
        while timeout > 0
                if poller.wait( timeout )
                        event = self.recv
                        if event.kind_of?( event_class ) && event.match( criteria )
                                return event
                        else
                                block.call( event ) if block
                        end
                else
                        break
                end

                timeout = timeout_at - get_monotime()
        end

        return nil

end