An object class that encapsulates queueing logic for Symphony
jobs.
Configurability defaults
Arguments to use when creating the consumer
The default number of messages to prefetch
Acknowledge mode
The underlying Bunny::Queue this object manages
The x-queue-type argument of the queue, if any
The Bunny::Consumer that is dispatching messages for the queue.
The tag to use when setting up consumer
The name of the queue
Whether or not to create a persistent queue
The maximum number of un-acked messages to prefetch
The Array of routing keys to use when binding the queue to the exchange
Fetch the Hash that caches the AMQP objects (channel, exchange) for this class.
# File lib/symphony/queue.rb, line 106
# Return the Hash used to cache AMQP objects (such as the channel and the
# exchange), creating it on first use. Every call hands back the same Hash.
def self::amqp
  @symphony = {} unless @symphony
  @symphony
end
Fetch the AMQP channel, creating one if necessary.
# File lib/symphony/queue.rb, line 113
# Return the channel cached under +:channel+. When nothing is cached yet, the
# session is started, a channel is created from it, and that channel is stored
# in the cache before being returned.
def self::amqp_channel
  cache = self.amqp
  return cache[:channel] if cache[:channel]

  self.log.debug "Creating a new AMQP channel"
  session = self.amqp_session
  session.start
  cache[:channel] = session.create_channel
end
Fetch the configured AMQP exchange interface object.
# File lib/symphony/queue.rb, line 137
# Return the exchange cached under +:exchange+. On first use the configured
# exchange is looked up as a passive topic exchange on the current channel and
# the result is cached.
def self::amqp_exchange
  cache = self.amqp
  return cache[:exchange] if cache[:exchange]

  self.log.info format( "Getting a reference to the %s topic exchange", self.exchange )
  cache[:exchange] = self.amqp_channel.topic( self.exchange, passive: true )
  return cache[:exchange]
end
Fetch the current session for AMQP connections.
# File lib/symphony/queue.rb, line 90
# Return the memoized Bunny session, building it on first use. When a broker
# URI is configured it is passed to Bunny along with the session options;
# otherwise only the options Hash is passed.
def self::amqp_session
  return @session if @session

  options = self.amqp_session_options
  uri = self.broker_uri

  @session =
    if uri
      self.log.info "Using the broker URI-style config"
      Bunny.new( uri, options )
    else
      self.log.info "Using the options hash-style config"
      Bunny.new( options )
    end

  return @session
end
Fetch a Hash of AMQP options.
# File lib/symphony/queue.rb, line 71
# Build the options Hash handed to Bunny when the session is created.
#
# Falls back to the defaults when no session options have been configured yet,
# adds the Bunny logger, and normalizes a String +:heartbeat+ value: an
# all-digit String (e.g. "30") becomes an Integer, any other String (e.g.
# "server") becomes a Symbol.
#
# Returns a new Hash; the stored session options are not modified.
def self::amqp_session_options
  self.session_opts ||= self.defaults
  opts = self.session_opts.merge( logger: Loggability[ Bunny ] )

  heartbeat = opts[:heartbeat]
  if heartbeat.is_a?( String )
    # A numeric string turned into a Symbol (:"30") is not a usable heartbeat
    # setting, so keep numbers numeric and symbolize only named modes.
    opts[:heartbeat] = heartbeat =~ /\A\d+\z/ ? heartbeat.to_i : heartbeat.to_sym
  end

  return opts
end
Configurability API – install the 'symphony' section of the config when it's loaded.
# File lib/symphony/queue.rb, line 61
# Apply a configuration section on top of the defaults. The +:broker_uri+ and
# +:exchange+ entries are pulled out into their own class-level settings, and
# whatever is left over becomes the session options.
#
# config:: the settings to apply, or +nil+ to use the defaults alone
def self::configure( config=nil )
  settings = self.defaults.merge( config || {} )

  self.broker_uri = settings.delete( :broker_uri )
  self.exchange = settings.delete( :exchange )

  # Everything not consumed above is kept for the session.
  self.session_opts = settings
end
Create a new Queue for the specified task_class.
# File lib/symphony/queue.rb, line 152
# Set up a queue from the settings exposed by +task_class+. No AMQP queue is
# created here: the queue handle starts out as +nil+ and the shutdown flag as
# +false+.
def initialize( task_class )
  # Settings copied from the task class
  @name            = task_class.queue_name
  @acknowledge     = task_class.acknowledge
  @consumer_tag    = task_class.consumer_tag
  @routing_keys    = task_class.routing_keys
  @prefetch        = task_class.prefetch
  @persistent      = task_class.persistent
  @always_rebind   = task_class.always_rebind
  @amqp_queue_type = task_class.queue_type

  # Runtime state
  @amqp_queue      = nil
  @shutting_down   = false
end
Clear any created Bunny objects
# File lib/symphony/queue.rb, line 83
# Forget the memoized session and empty the AMQP object cache. The objects
# themselves are only dereferenced here, not closed.
def self::reset
  cache = self.amqp
  @session = nil
  cache.clear
end
Close and remove the current AMQP channel, e.g., after an error.
# File lib/symphony/queue.rb, line 125
# Discard the cached AMQP channel (closing it first if it is still open) and
# return a channel from ::amqp_channel, which creates a new one when none is
# cached.
#
# The cached exchange is discarded along with the channel so that the next
# ::amqp_exchange call looks it up again through the replacement channel.
def self::reset_amqp_channel
  if ( channel = self.amqp[:channel] )
    self.log.info "Resetting AMQP channel."
    channel.close if channel.open?
    self.amqp.delete( :channel )

    # The exchange was obtained through the channel that was just dropped
    # (see ::amqp_exchange), so a cached copy would keep pointing at it.
    self.amqp.delete( :exchange )
  end

  return self.amqp_channel
end
Signal an acknowledgement or rejection for a message.
# File lib/symphony/queue.rb, line 318
# Tell the broker what happened to the message identified by +tag+.
#
# tag::       the delivery tag of the message
# success::   truthy to acknowledge the message, falsy to reject it
# try_again:: passed to the channel's reject call as its second argument
#
# Does nothing (and returns +nil+) when acknowledgement is turned off for this
# queue; otherwise returns +success+.
def ack_message( tag, success, try_again=true )
  return unless self.acknowledge

  channel = self.consumer.channel

  unless success
    retry_word = try_again ? 'with' : 'without'
    self.log.debug "NACKing message %s %s retry" % [ tag, retry_word ]
    channel.reject( tag, try_again )
    return success
  end

  self.log.debug "ACKing message %s" % [ tag ]
  channel.acknowledge( tag )
  return success
end
Whether or not to re-bind the queue to the exchange during setup
# File lib/symphony/queue.rb, line 197
# Predicate form of the always_rebind setting copied from the task class in
# #initialize; #create_amqp_queue consults it as +always_rebind?+.
attr_predicate :always_rebind
The URL of the AMQP broker to connect to
# File lib/symphony/queue.rb, line 48
# Class-level setting: assigned by ::configure and read by ::amqp_session,
# which passes it to Bunny when it is set.
singleton_attr_accessor :broker_uri
Create the AMQP queue from the task class and bind it to the configured exchange.
# File lib/symphony/queue.rb, line 260
# Look up the AMQP queue named after this object, creating it when the lookup
# fails, and bind it to the class's exchange when required.
#
# prefetch_count:: the prefetch value applied to the channel; defaults to
#                  DEFAULT_PREFETCH
#
# Returns the queue object obtained from the channel.
def create_amqp_queue( prefetch_count=DEFAULT_PREFETCH )
exchange = self.class.amqp_exchange
channel = self.class.amqp_channel
# Set to true only when the rescue branch below has to declare the queue.
created_queue = false
queue = begin
# Passive lookup first; the rescue clause below handles Bunny::NotFound.
existing_queue = channel.queue( self.name, passive: true )
channel.prefetch( prefetch_count )
self.log.info "Using pre-existing queue: %s" % [ self.name ]
existing_queue
rescue Bunny::NotFound => err
self.log.info "%s; creating a new queue instead." % [ err.message ]
created_queue = true
# Replace the channel before declaring the queue.
# NOTE(review): presumably the failed passive lookup leaves the old channel
# unusable -- confirm against the Bunny documentation.
channel = self.class.reset_amqp_channel
channel.prefetch( prefetch_count )
# #compact removes the 'x-queue-type' entry when no queue type is set.
arguments = { 'x-queue-type' => self.amqp_queue_type }.compact
channel.queue( self.name, auto_delete: !self.persistent, arguments: arguments )
end
# Bind only a newly created queue, or any queue when always_rebind is set.
if self.always_rebind? || created_queue
self.routing_keys.each do |key|
self.log.info " binding queue %s to the %s exchange with topic key: %s" %
[ self.name, exchange.name, key ]
queue.bind( exchange, routing_key: key )
end
end
return queue
end
Create the Bunny::Consumer that will dispatch messages from the broker.
# File lib/symphony/queue.rb, line 234
# Build the Bunny::Consumer for +amqp_queue+ and wire up its callbacks.
#
# Each delivery is passed to #handle_message together with +work_callback+;
# afterwards the consumer cancels itself if the queue is shutting down. A
# cancellation of the consumer triggers #shutdown.
#
# Returns the consumer; it is not subscribed to the queue here.
def create_consumer( amqp_queue, &work_callback )
  ack_enabled = self.acknowledge
  tag = self.consumer_tag

  self.log.debug format( "Creating consumer for the '%s' queue with tag: %s", amqp_queue.name, tag )

  # The fourth argument is *no_ack*, hence the inverted acknowledge setting.
  consumer = Bunny::Consumer.new(
    amqp_queue.channel, amqp_queue, tag, !ack_enabled, false, CONSUMER_ARGS
  )

  consumer.on_delivery do |delivery_info, properties, payload|
    self.handle_message( delivery_info, properties, payload, &work_callback )

    delivery_tag = delivery_info.delivery_tag
    session_state = self.class.amqp_session.closed? ? "closed" : "open"
    self.log.debug format( "Done with message %s. Session is %s", delivery_tag, session_state )

    consumer.cancel if self.shutting_down?
  end

  consumer.on_cancellation do
    self.log.warn "Consumer cancelled."
    self.shutdown
  end

  return consumer
end
The name of the exchange to bind queues to.
# File lib/symphony/queue.rb, line 52
# Class-level setting: assigned by ::configure and read by ::amqp_exchange
# when it looks up the topic exchange.
singleton_attr_accessor :exchange
Syntactic sugar alias
# File lib/symphony/queue.rb, line 148
# NOTE(review): presumably makes Queue.for_task( task_class ) equivalent to
# Queue.new( task_class ) -- confirm against the helper's definition.
singleton_method_alias :for_task, :new
Forcefully halt the queue.
# File lib/symphony/queue.rb, line 343
# Forcefully stop the queue: flag it as shutting down and close the channel
# its consumer is using.
#
# When no consumer has been set up yet there is no channel to close, so only
# the flag is set (mirroring the guard in #shutdown) instead of raising
# NoMethodError on +nil+.
def halt
  self.shutting_down = true

  active_consumer = self.consumer
  active_consumer.channel.close if active_consumer
end
Handle each subscribed message.
# File lib/symphony/queue.rb, line 293
# Run +work_callback+ for one delivered message and report the outcome.
#
# The callback receives the payload and a metadata Hash holding the delivery
# info, the properties, and the content type taken from the properties. Its
# return value decides between acknowledging and rejecting the message.
#
# A Bunny::Exception is logged and re-raised. Any other StandardError is
# logged and the message is rejected with +try_again+ set to false.
def handle_message( delivery_info, properties, payload, &work_callback )
  metadata = {
    delivery_info: delivery_info,
    properties: properties,
    content_type: properties[:content_type],
  }

  outcome = work_callback.call( payload, metadata )
  self.ack_message( delivery_info.delivery_tag, outcome )

# Errors from the AMQP layer propagate to the caller.
rescue Bunny::Exception => amqp_error
  summary = format( "%p while handling a message: %s %s",
                    amqp_error.class, amqp_error.message, amqp_error.backtrace.first )
  self.log.error summary
  self.log.debug " " + amqp_error.backtrace.join( "\n " )
  raise

# Anything else counts as a failed message.
rescue => work_error
  summary = format( "%p while handling a message: %s %s",
                    work_error.class, work_error.message, work_error.backtrace.first )
  self.log.error summary
  self.log.debug " " + work_error.backtrace.join( "\n " )
  self.ack_message( delivery_info.delivery_tag, false, false )
end
Return a human-readable representation of the Queue
in a form suitable for debugging.
# File lib/symphony/queue.rb, line 350
def inspect
return "#<%p:%#0x %s (%s) ack: %s, routing: %p, prefetch: %d>" % [
self.class,
self.object_id * 2,
self.name,
self.consumer_tag,
self.acknowledge ? "yes" : "no",
self.routing_keys,
self.prefetch,
]
end
The options to pass to Bunny when setting up the session
# File lib/symphony/queue.rb, line 56
# Class-level setting: assigned by ::configure (what remains of the config
# once the broker URI and exchange are removed) and read by
# ::amqp_session_options.
singleton_attr_accessor :session_opts
Flag the queue as shutting down and cancel its consumer, if there is one.
# File lib/symphony/queue.rb, line 336
# Begin shutting the queue down: raise the shutting-down flag, then cancel
# the consumer when one exists.
def shutdown
  self.shutting_down = true
  return unless self.consumer

  self.consumer.cancel
end
The flag for shutting the queue down.
# File lib/symphony/queue.rb, line 213
# Shutdown flag, read elsewhere in this class as +shutting_down?+ and
# assigned via +shutting_down=+ (see #shutdown, #halt and #wait_for_message).
attr_predicate_accessor :shutting_down
The main work loop – subscribe to the message queue and yield the payload and associated metadata when one is received.
# File lib/symphony/queue.rb, line 218
# Main work loop: set up the queue and its consumer, subscribe, and hand each
# message to +work_callback+ by way of the consumer. Once the subscription
# call returns, the queue's channel and the session are closed.
#
# only_one:: when true, the prefetch count is 1 and the shutting-down flag
#            is raised up front
#
# Raises LocalJumpError when no block is given.
def wait_for_message( only_one=false, &work_callback )
  raise LocalJumpError, "no work_callback given" if work_callback.nil?

  connection = self.class.amqp_session
  self.shutting_down = only_one

  prefetch_count = only_one ? 1 : self.prefetch
  queue = self.create_amqp_queue( prefetch_count )

  self.consumer = self.create_consumer( queue, &work_callback )
  self.log.debug format( "Subscribing to queue with consumer: %p", self.consumer )

  queue.subscribe_with( self.consumer, block: true )

  # Tear down in the same order as acquired in reverse: channel, then session.
  queue.channel.close
  connection.close
end