An object that manages batching of connections and gathering results.
The maximum number of connections to have running at any time.
An index of the current batch's connection hashes by connection.
The batch of connection hashes that are currently being selected, ordered from oldest to newest.
The Enumerator that yields connection hashes.
The results hash, keyed by connection identifier.
The Time the batch runner started.
The connection timeout from the monitor, in seconds.
Create a new BatchRunner for the specified enum (an Enumerator). If batch_size or timeout is nil, DEFAULT_BATCH_SIZE or DEFAULT_TIMEOUT is used instead.
# File lib/arborist/monitor/connection_batching.rb, line 31
def initialize( enum, batch_size, timeout )
  @enum = enum
  @results = {}
  @current_batch = []
  @connection_hashes = {}
  @start = nil
  @batch_size = batch_size || DEFAULT_BATCH_SIZE
  @timeout = timeout || DEFAULT_TIMEOUT
end
Add a new conn_hash to the current batch. If the conn_hash's connection is an exception, don't add it to the batch; record an error status for it, built from the exception, instead.
# File lib/arborist/monitor/connection_batching.rb, line 107
def add_connection( conn_hash )
  if conn_hash[:conn].is_a?( ::Exception )
    self.log.debug "Adding an error result for %{identifier}." % conn_hash
    self.results[ conn_hash[:identifier] ] = { error: conn_hash[:conn].message }
  else
    self.log.debug "Added connection for %{identifier} to the batch." % conn_hash
    self.current_batch.push( conn_hash )
    self.connection_hashes[ conn_hash[:conn] ] = conn_hash
  end
end
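The branch above can be tried in isolation. The following is a minimal sketch, not the Arborist class itself: `BatchSketch` is a hypothetical stand-in that keeps only the three pieces of state that add_connection touches, and logging is omitted.

```ruby
# Hypothetical stand-in for the runner's state (assumption: not part of Arborist).
class BatchSketch
  attr_reader :results, :current_batch, :connection_hashes

  def initialize
    @results = {}
    @current_batch = []
    @connection_hashes = {}
  end

  # Same branch as add_connection: an exception becomes an error result,
  # anything else joins the batch and is indexed by its connection object.
  def add_connection( conn_hash )
    if conn_hash[:conn].is_a?( ::Exception )
      results[ conn_hash[:identifier] ] = { error: conn_hash[:conn].message }
    else
      current_batch.push( conn_hash )
      connection_hashes[ conn_hash[:conn] ] = conn_hash
    end
  end
end

sketch = BatchSketch.new
sketch.add_connection({ identifier: 'web01', conn: Object.new })
sketch.add_connection({ identifier: 'web02', conn: RuntimeError.new('refused') })
p sketch.results
p sketch.current_batch.length
```

Indexing by the connection object is what later lets a ready socket be mapped back to its connection hash.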
Returns true if the current batch is at capacity.
# File lib/arborist/monitor/connection_batching.rb, line 84
def batch_full?
  return self.current_batch.length >= self.batch_size
end
Fill the current_batch if it's not yet at capacity and there are more connections to be made.
# File lib/arborist/monitor/connection_batching.rb, line 138
def fill_batch
  # If the enum is not nil and the array isn't full, fetch a new connection
  while self.enum && !self.batch_full?
    self.log.debug "Adding connections to the queue."
    conn_hash = self.next_connection or break
    self.add_connection( conn_hash )
  end
end
Returns true if the runner has been run and all connections have been handled.
# File lib/arborist/monitor/connection_batching.rb, line 78
def finished?
  return self.start && self.enum.nil? && self.current_batch.empty?
end
Fetch the next connection from the Enumerator, stamping it with its start time and timeout deadline. When the Enumerator reaches the end, unset it and return nil.
# File lib/arborist/monitor/connection_batching.rb, line 91
def next_connection
  conn_hash = self.enum.next
  conn_hash[:start] = Time.now
  conn_hash[:timeout_at] = conn_hash[:start] + self.timeout
  return conn_hash
rescue StopIteration
  self.log.debug "Reached the end of the connections enum."
  self.enum = nil
  return nil
end
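The end-of-stream handling relies on Enumerator#next raising StopIteration once the Enumerator is exhausted. A small sketch of the same pattern follows; `next_stamped` is a hypothetical helper, and unlike next_connection it does not unset any instance state.

```ruby
# Hypothetical helper (not part of Arborist): fetch the next connection hash
# and stamp it with a start time and deadline, or return nil at the end.
def next_stamped( enum, timeout )
  conn_hash = enum.next
  conn_hash[:start] = Time.now
  conn_hash[:timeout_at] = conn_hash[:start] + timeout
  conn_hash
rescue StopIteration
  nil
end

enum = [ { identifier: 'db01' }, { identifier: 'db02' } ].each
while (conn_hash = next_stamped( enum, 5 ))
  puts "%s times out at %s" % conn_hash.values_at( :identifier, :timeout_at )
end
```

Because the Enumerator is consumed one item at a time, connections are only created as the batch has room for them.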
Remove the specified conn_hash from the current batch.
# File lib/arborist/monitor/connection_batching.rb, line 120
def remove_connection( conn_hash )
  self.current_batch.delete( conn_hash )
  self.connection_hashes.delete( conn_hash[:conn] )
end
Remove the connection hash for the specified socket from the current batch and return it (if it was in the batch).
# File lib/arborist/monitor/connection_batching.rb, line 128
def remove_socket( socket )
  conn_hash = self.connection_hashes.delete( socket )
  self.current_batch.delete( conn_hash )
  return conn_hash
end
Shift any connections which have timed out off of the current batch and return the timeout of the oldest non-timed-out connection.
# File lib/arborist/monitor/connection_batching.rb, line 150
def remove_timedout_connections
  # Leading entries whose deadline has passed; the batch is ordered oldest to newest
  expired = self.current_batch.take_while do |conn_hash|
    conn_hash[ :timeout_at ].past?
  end
  # Computed before the expired entries are removed below; #abs on return
  # keeps the value non-negative
  wait_seconds = if self.current_batch.empty?
      1
    else
      self.current_batch.first[:timeout_at] - Time.now
    end
  expired.each do |conn_hash|
    self.remove_connection( conn_hash )
    self.log.debug "Discarding timed-out socket for %{identifier}." % conn_hash
    elapsed = conn_hash[:timeout_at] - conn_hash[:start]
    self.results[ conn_hash[:identifier] ] = {
      error: "Timeout after %0.3fs" % [ elapsed ]
    }
  end
  return wait_seconds.abs
end
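The expiry check depends on the batch being ordered oldest to newest, since take_while stops at the first entry that has not expired. The sketch below illustrates that behavior under stated assumptions: `expired_prefix` is a hypothetical helper, and a plain Time comparison stands in for `past?`, which is not a method of core Ruby's Time.

```ruby
# Hypothetical helper (not part of Arborist): return the leading entries of
# +batch+ whose :timeout_at is at or before +now+.
def expired_prefix( batch, now = Time.now )
  batch.take_while { |conn_hash| conn_hash[:timeout_at] <= now }
end

now = Time.now
batch = [
  { identifier: 'a', timeout_at: now - 2 },  # expired
  { identifier: 'b', timeout_at: now - 1 },  # expired
  { identifier: 'c', timeout_at: now + 5 },  # still pending
]
p expired_prefix( batch, now ).map { |conn_hash| conn_hash[:identifier] }
```

An expired entry that sits behind a pending one would not be picked up until the pending entry leaves the batch, which is why the ordering matters.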
Run the batch runner, yielding to the specified block as each connection becomes ready. The block is called with the connection hash and the elapsed duration in seconds, and its return value is stored as the result for that connection's identifier. Returns the total run time in seconds.
# File lib/arborist/monitor/connection_batching.rb, line 192
def run( &block )
  self.start = Time.now
  until self.finished?
    self.log.debug "Getting the status of %d connections." %
      [ self.current_batch.length ]
    self.fill_batch
    wait_seconds = self.remove_timedout_connections
    ready = self.wait_for_ready_connections( wait_seconds )
    # If the select returns ready sockets
    # Build successful status for each ready socket
    now = Time.now
    ready.each do |sock|
      conn_hash = self.remove_socket( sock ) or
        raise "Ready socket %p was not in the current batch!" % [ sock ]
      identifier, start = conn_hash.values_at( :identifier, :start )
      duration = now - start
      results[ identifier ] = block.call( conn_hash, duration )
    end if ready
  end
  return Time.now - self.start
end
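The contract between run and its block can be sketched on its own: the block gets the connection hash and the elapsed time, and whatever it returns becomes that identifier's result. `collect_results` below is a hypothetical helper that models only this step; it takes already-ready connection hashes rather than sockets, and does no selecting or timeout handling.

```ruby
# Hypothetical helper (not part of Arborist): build a results hash by
# yielding each ready connection hash with its elapsed duration.
def collect_results( ready_conn_hashes, now = Time.now )
  ready_conn_hashes.each_with_object( {} ) do |conn_hash, results|
    duration = now - conn_hash[:start]
    results[ conn_hash[:identifier] ] = yield( conn_hash, duration )
  end
end

ready = [ { identifier: 'web01', start: Time.now } ]
results = collect_results( ready ) do |conn_hash, duration|
  { connected: true, duration: duration }
end
p results
```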
Wait at most wait_seconds for one of the sockets in the current batch to become ready. If any are ready before the wait_seconds have elapsed, returns them as an Array. If wait_seconds goes by without any sockets becoming ready, or if there were no sockets to wait on, returns nil.
# File lib/arborist/monitor/connection_batching.rb, line 179
def wait_for_ready_connections( wait_seconds )
  sockets = self.connection_hashes.keys
  ready = nil
  self.log.debug "Selecting on %d sockets." % [ sockets.length ]
  _, ready, _ = IO.select( nil, sockets, nil, wait_seconds ) unless sockets.empty?
  return ready
end
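The sockets are passed as the second argument to IO.select, so they are waited on for writability; a non-blocking connect typically signals completion by making the socket writable. The sketch below shows the same call shape using the write end of a pipe instead of a socket. `writable_within` is a hypothetical helper, not part of Arborist.

```ruby
# Hypothetical helper (not part of Arborist): wait up to +wait_seconds+ for
# any of +ios+ to become writable. Returns an Array of ready IOs, or nil if
# the wait timed out or there was nothing to wait on.
def writable_within( ios, wait_seconds )
  return nil if ios.empty?

  # IO.select returns nil on timeout, which leaves +ready+ as nil.
  _, ready, _ = IO.select( nil, ios, nil, wait_seconds )
  ready
end

reader, writer = IO.pipe
p writable_within( [writer], 0.1 )  # the empty pipe's write end is ready
p writable_within( [], 0.1 )        # nothing to wait on
reader.close
writer.close
```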