Arborist::Monitor::ConnectionBatching::

BatchRunner

class
Superclass
Object
Extended With
Loggability

An object that manages batching of connections and gathering results.

Attributes

batch_size[R]

The maximum number of connections to have running at any time.

connection_hashes[R]

An index of the current batch's connection hashes by connection.

current_batch[R]

The batch of connection hashes that are currently being selected, ordered from oldest to newest.

enum[RW]

The Enumerator that yields connection hashes

results[R]

The results hash, keyed by connection identifier.

start[RW]

The Time the batch runner started.

timeout[R]

The connection timeout from the monitor, in seconds

Public Class Methods

anchor
new( enum, batch_size, timeout )

Create a new BatchRunner for the specified enum (an Enumerator)

# File lib/arborist/monitor/connection_batching.rb, line 31
# Set up a runner that draws connection hashes from +enum+. A nil (or false)
# +batch_size+ or +timeout+ falls back to DEFAULT_BATCH_SIZE or
# DEFAULT_TIMEOUT, respectively.
def initialize( enum, batch_size, timeout )
        # Caller-supplied configuration
        @enum = enum
        @batch_size = batch_size || DEFAULT_BATCH_SIZE
        @timeout = timeout || DEFAULT_TIMEOUT

        # Bookkeeping that gets filled in while the runner works; @start stays
        # nil until something assigns a start time.
        @start = nil
        @current_batch = []
        @connection_hashes = {}
        @results = {}
end

Public Instance Methods

anchor
add_connection( conn_hash )

Add a new conn_hash to the current batch. If the conn_hash's connection is an exception, don't add it to the batch; instead, record an error status for it built from the exception's message.

# File lib/arborist/monitor/connection_batching.rb, line 107
# Register +conn_hash+ with the runner. When its :conn entry is an Exception,
# an error result (the exception's message) is stored under the hash's
# :identifier and the batch is left alone; otherwise the hash is appended to
# the current batch and indexed by its :conn.
def add_connection( conn_hash )
        conn = conn_hash[:conn]

        case conn
        when ::Exception
                self.log.debug "Adding an error result for %{identifier}." % conn_hash
                self.results[ conn_hash[:identifier] ] = { error: conn.message }
        else
                self.log.debug "Added connection for %{identifier} to the batch." % conn_hash
                self.current_batch.push( conn_hash )
                self.connection_hashes[ conn ] = conn_hash
        end
end
anchor
batch_full?()

Returns true if the current batch is at capacity.

# File lib/arborist/monitor/connection_batching.rb, line 84
# True once the number of connection hashes in the current batch has reached
# (or passed) the configured batch size.
def batch_full?
        in_flight = self.current_batch.length
        capacity = self.batch_size

        return in_flight >= capacity
end
anchor
fill_batch()

Fill the current_batch if it's not yet at capacity and there are more connections to be made.

# File lib/arborist/monitor/connection_batching.rb, line 138
# Keep pulling connections into the current batch until either the batch is
# full or there is nothing left to pull (the enum has been unset, or
# #next_connection came back empty-handed).
def fill_batch
        until !self.enum || self.batch_full?
                self.log.debug "Adding connections to the queue."

                conn_hash = self.next_connection
                break unless conn_hash

                self.add_connection( conn_hash )
        end
end
anchor
finished?()

Returns true if the runner has been run and all connections have been handled.

# File lib/arborist/monitor/connection_batching.rb, line 78
# Truthy only when the runner has a start time, its enum has been unset, and
# no connection hashes remain in the current batch. Returns the (falsy) start
# value itself when the runner has not been started.
def finished?
        started_at = self.start
        return started_at unless started_at
        return false unless self.enum.nil?

        return self.current_batch.empty?
end
anchor
next_connection()

Fetch the next connection from the Enumerator, unsetting the enumerator and returning nil when it reaches the end.

# File lib/arborist/monitor/connection_batching.rb, line 91
# Pull the next connection hash off the enum, stamping it with a :start time
# (now) and a :timeout_at deadline (:start plus the runner's timeout). When
# the enum is exhausted, unset it and return nil.
def next_connection
        conn_hash = self.enum.next

        started_at = Time.now
        conn_hash[:start] = started_at
        conn_hash[:timeout_at] = started_at + self.timeout

        return conn_hash
rescue StopIteration
        self.log.debug "Reached the end of the connections enum."
        self.enum = nil
        return nil
end
anchor
remove_connection( conn_hash )

Remove the specified conn_hash from the current batch.

# File lib/arborist/monitor/connection_batching.rb, line 120
# Drop +conn_hash+ from the current batch and from the by-connection index.
# Returns whatever the index had stored for the hash's :conn (nil if nothing).
def remove_connection( conn_hash )
        self.current_batch.delete( conn_hash )

        conn = conn_hash[:conn]
        return self.connection_hashes.delete( conn )
end
anchor
remove_socket( socket )

Remove the connection hash for the specified socket from the current batch and return it (if it was in the batch).

# File lib/arborist/monitor/connection_batching.rb, line 128
# Look up +socket+ in the by-connection index, remove its connection hash from
# both the index and the current batch, and hand the hash back. Returns nil
# when the socket was not indexed.
def remove_socket( socket )
        return self.connection_hashes.delete( socket ).tap do |removed|
                self.current_batch.delete( removed )
        end
end
anchor
remove_timedout_connections()

Shift any connections which have timed out off of the current batch and return the timeout of the oldest non-timed-out connection.

# File lib/arborist/monitor/connection_batching.rb, line 150
# Discard every connection at the head of the current batch whose :timeout_at
# has passed, recording a timeout error result for each, then return how many
# seconds remain until the oldest *remaining* connection times out (never
# negative). Returns 1 when nothing is left in the batch.
#
# Relies on the current batch being ordered oldest-first, so that timed-out
# connections form a prefix of it.
def remove_timedout_connections
        expired = self.current_batch.take_while do |conn_hash|
                conn_hash[ :timeout_at ].past?
        end

        expired.each do |conn_hash|
                self.remove_connection( conn_hash )
                self.log.debug "Discarding timed-out socket for %{identifier}." % conn_hash

                elapsed = conn_hash[:timeout_at] - conn_hash[:start]
                self.results[ conn_hash[:identifier] ] = {
                        error: "Timeout after %0.3fs" % [ elapsed ]
                }
        end

        # The wait has to be derived *after* the expired connections are gone;
        # otherwise it is measured against a connection that has already timed
        # out rather than the oldest one still pending.
        oldest = self.current_batch.first or return 1

        # The head of the batch may expire between the scan above and this
        # line, so clamp at zero instead of handing back a negative wait.
        remaining = oldest[:timeout_at] - Time.now
        return [ remaining, 0 ].max
end
anchor
run( &block )

Run the batch runner, yielding to the specified block as each connection becomes ready.

# File lib/arborist/monitor/connection_batching.rb, line 192
def run( &block )
        self.start = Time.now

        until self.finished?
                self.log.debug "Getting the status of %d connections." %
                        [ self.current_batch.length ]

                self.fill_batch
                wait_seconds = self.remove_timedout_connections
                ready = self.wait_for_ready_connections( wait_seconds )

                # If the select returns ready sockets
                #   Build successful status for each ready socket
                now = Time.now
                ready.each do |sock|
                        conn_hash = self.remove_socket( sock ) or
                                raise "Ready socket %p was not in the current batch!" % [ sock ]

                        identifier, start = conn_hash.values_at( :identifier, :start )
                        duration = now - start

                        results[ identifier ] = block.call( conn_hash, duration )
                end if ready
        end

        return Time.now - self.start
end
anchor
wait_for_ready_connections( wait_seconds )

Wait at most wait_seconds for one of the sockets in the current batch to become ready. If any are ready before the wait_seconds have elapsed, returns them as an Array. If wait_seconds goes by without any sockets becoming ready, or if there were no sockets to wait on, returns nil.

# File lib/arborist/monitor/connection_batching.rb, line 179
# Select (for writability) on every socket in the by-connection index, for at
# most +wait_seconds+. Returns the Array of ready sockets, or nil if there were
# no sockets to select on or the select timed out.
def wait_for_ready_connections( wait_seconds )
        sockets = self.connection_hashes.keys
        self.log.debug "Selecting on %d sockets." % [ sockets.length ]

        return nil if sockets.empty?

        # IO.select gives back nil on timeout, otherwise [readable, writable, errored]
        selected = IO.select( nil, sockets, nil, wait_seconds ) or return nil
        return selected[ 1 ]
end