Queue

class
Superclass
Object

An object class that encapsulates queueing logic for Symphony jobs.

Constants

CONFIG_DEFAULTS

Configurability defaults

CONSUMER_ARGS

Arguments to use when creating the consumer

DEFAULT_PREFETCH

The default number of messages to prefetch

Attributes

acknowledge[R]

Acknowledge mode

amqp_queue[R]

The underlying Bunny::Queue this object manages

amqp_queue_type[R]

The -x-queue-type of the queue, if any

consumer[RW]

The Bunny::Consumer that is dispatching messages for the queue.

consumer_tag[R]

The tag to use when setting up consumer

name[R]

The name of the queue

persistent[R]

Whether or not to create a persistent queue

prefetch[R]

The maximum number of un-acked messages to prefetch

routing_keys[R]

The Array of routing keys to use when binding the queue to the exchange

Public Class Methods

anchor
amqp()

Fetch a Hash that stores per-thread AMQP objects.

# File lib/symphony/queue.rb, line 106
def self::amqp
        @symphony ||= {}
        return @symphony
end
anchor
amqp_channel()

Fetch the AMQP channel, creating one if necessary.

# File lib/symphony/queue.rb, line 113
def self::amqp_channel
        unless self.amqp[:channel]
                self.log.debug "Creating a new AMQP channel"
                self.amqp_session.start
                channel = self.amqp_session.create_channel
                self.amqp[:channel] = channel
        end
        return self.amqp[:channel]
end
anchor
amqp_exchange()

Fetch the configured AMQP exchange interface object.

# File lib/symphony/queue.rb, line 137
def self::amqp_exchange
        unless self.amqp[:exchange]
                self.log.info "Getting a reference to the %s topic exchange" % [ self.exchange ]
                self.amqp[:exchange] = self.amqp_channel.topic( self.exchange, passive: true )
        end
        return self.amqp[:exchange]
end
anchor
amqp_session()

Fetch the current session for AMQP connections.

# File lib/symphony/queue.rb, line 90
def self::amqp_session
        unless @session
                options = self.amqp_session_options
                if self.broker_uri
                        self.log.info "Using the broker URI-style config"
                        @session = Bunny.new( self.broker_uri, options )
                else
                        self.log.info "Using the options hash-style config"
                        @session = Bunny.new( options )
                end
        end
        return @session
end
anchor
amqp_session_options()

Fetch a Hash of AMQP options.

# File lib/symphony/queue.rb, line 71
def self::amqp_session_options
        self.session_opts ||= self.defaults
        opts = self.session_opts.merge({
                logger: Loggability[ Bunny ],
        })
        opts[:heartbeat] = opts[:heartbeat].to_sym if opts[:heartbeat].is_a?( String )

        return opts
end
anchor
configure( config=nil )

Configurability API – install the 'symphony' section of the config when it's loaded.

# File lib/symphony/queue.rb, line 61
def self::configure( config=nil )
        config = self.defaults.merge( config || {} )

        self.broker_uri   = config.delete( :broker_uri )
        self.exchange     = config.delete( :exchange )
        self.session_opts = config
end
anchor
new( task_class )

Create a new Queue for the specified task_class.

# File lib/symphony/queue.rb, line 152
def initialize( task_class )
        @name            = task_class.queue_name
        @acknowledge     = task_class.acknowledge
        @consumer_tag    = task_class.consumer_tag
        @routing_keys    = task_class.routing_keys
        @prefetch        = task_class.prefetch
        @persistent      = task_class.persistent
        @always_rebind   = task_class.always_rebind
        @amqp_queue_type = task_class.queue_type

        @amqp_queue    = nil
        @shutting_down = false
end
anchor
reset()

Clear any created Bunny objects

# File lib/symphony/queue.rb, line 83
def self::reset
        @session = nil
        self.amqp.clear
end
anchor
reset_amqp_channel()

Close and remove the current AMQP channel, e.g., after an error.

# File lib/symphony/queue.rb, line 125
def self::reset_amqp_channel
        if self.amqp[:channel]
                self.log.info "Resetting AMQP channel."
                self.amqp[:channel].close if self.amqp[:channel].open?
                self.amqp.delete( :channel )
        end

        return self.amqp_channel
end

Public Instance Methods

anchor
ack_message( tag, success, try_again=true )

Signal a acknowledgement or rejection for a message.

# File lib/symphony/queue.rb, line 318
def ack_message( tag, success, try_again=true )
        return unless self.acknowledge

        channel = self.consumer.channel

        if success
                self.log.debug "ACKing message %s" % [ tag ]
                channel.acknowledge( tag )
        else
                self.log.debug "NACKing message %s %s retry" % [ tag, try_again ? 'with' : 'without' ]
                channel.reject( tag, try_again )
        end

        return success
end
anchor
always_rebind()

Whether or not to re-bind the queue to the exchange during setup

# File lib/symphony/queue.rb, line 197
attr_predicate :always_rebind
anchor
broker_uri()

The URL of the AMQP broker to connect to

# File lib/symphony/queue.rb, line 48
singleton_attr_accessor :broker_uri
anchor
create_amqp_queue( prefetch_count=DEFAULT_PREFETCH )

Create the AMQP queue from the task class and bind it to the configured exchange.

# File lib/symphony/queue.rb, line 260
def create_amqp_queue( prefetch_count=DEFAULT_PREFETCH )
        exchange = self.class.amqp_exchange
        channel = self.class.amqp_channel
        created_queue = false

        queue = begin
                existing_queue = channel.queue( self.name, passive: true )
                channel.prefetch( prefetch_count )
                self.log.info "Using pre-existing queue: %s" % [ self.name ]
                existing_queue
        rescue Bunny::NotFound => err
                self.log.info "%s; creating a new queue instead." % [ err.message ]
                created_queue = true
                channel = self.class.reset_amqp_channel
                channel.prefetch( prefetch_count )

                arguments = { 'x-queue-type' => self.amqp_queue_type }.compact
                channel.queue( self.name, auto_delete: !self.persistent, arguments: arguments )
        end

        if self.always_rebind? || created_queue
                self.routing_keys.each do |key|
                        self.log.info "  binding queue %s to the %s exchange with topic key: %s" %
                                [ self.name, exchange.name, key ]
                        queue.bind( exchange, routing_key: key )
                end
        end

        return queue
end
anchor
create_consumer( amqp_queue, &work_callback )

Create the Bunny::Consumer that will dispatch messages from the broker.

# File lib/symphony/queue.rb, line 234
def create_consumer( amqp_queue, &work_callback )
        ackmode = self.acknowledge
        tag     = self.consumer_tag

        # Last argument is *no_ack*, so need to invert the logic
        self.log.debug "Creating consumer for the '%s' queue with tag: %s" %
                [ amqp_queue.name, tag ]
        cons = Bunny::Consumer.new( amqp_queue.channel, amqp_queue, tag, !ackmode, false, CONSUMER_ARGS )

        cons.on_delivery do |delivery_info, properties, payload|
                self.handle_message( delivery_info, properties, payload, &work_callback )
                self.log.debug "Done with message %s. Session is %s" %
                                [ delivery_info.delivery_tag, self.class.amqp_session.closed? ? "closed" : "open" ]
                cons.cancel if self.shutting_down?
        end

        cons.on_cancellation do
                self.log.warn "Consumer cancelled."
                self.shutdown
        end

        return cons
end
anchor
exchange()

The name of the exchang to bind queues to.

# File lib/symphony/queue.rb, line 52
singleton_attr_accessor :exchange
anchor
for_task()

Syntactic sugar alias

# File lib/symphony/queue.rb, line 148
singleton_method_alias :for_task, :new
anchor
halt()

Forcefully halt the queue.

# File lib/symphony/queue.rb, line 343
def halt
        self.shutting_down = true
        self.consumer.channel.close
end
anchor
handle_message( delivery_info, properties, payload, &work_callback )

Handle each subscribed message.

# File lib/symphony/queue.rb, line 293
def handle_message( delivery_info, properties, payload, &work_callback )
        metadata = {
                delivery_info: delivery_info,
                properties: properties,
                content_type: properties[:content_type],
        }
        rval = work_callback.call( payload, metadata )
        return self.ack_message( delivery_info.delivery_tag, rval )

# Re-raise errors from AMQP
rescue Bunny::Exception => err
        self.log.error "%p while handling a message: %s %s" %
                [ err.class, err.message, err.backtrace.first ]
        self.log.debug "  " + err.backtrace.join( "\n  " )
        raise

rescue => err
        self.log.error "%p while handling a message: %s %s" %
                [ err.class, err.message, err.backtrace.first ]
        self.log.debug "  " + err.backtrace.join( "\n  " )
        return self.ack_message( delivery_info.delivery_tag, false, false )
end
anchor
inspect()

Return a human-readable representation of the Queue in a form suitable for debugging.

# File lib/symphony/queue.rb, line 350
def inspect
        return "#<%p:%#0x %s (%s) ack: %s, routing: %p, prefetch: %d>" % [
                self.class,
                self.object_id * 2,
                self.name,
                self.consumer_tag,
                self.acknowledge ? "yes" : "no",
                self.routing_keys,
                self.prefetch,
        ]
end
anchor
session_opts()

The options to pass to Bunny when setting up the session

# File lib/symphony/queue.rb, line 56
singleton_attr_accessor :session_opts
anchor
shutdown()

Close the AMQP session associated with this queue.

# File lib/symphony/queue.rb, line 336
def shutdown
        self.shutting_down = true
        self.consumer.cancel if self.consumer
end
anchor
shutting_down()

The flag for shutting the queue down.

# File lib/symphony/queue.rb, line 213
attr_predicate_accessor :shutting_down
anchor
wait_for_message( only_one=false, &work_callback )

The main work loop – subscribe to the message queue and yield the payload and associated metadata when one is received.

# File lib/symphony/queue.rb, line 218
def wait_for_message( only_one=false, &work_callback )
        raise LocalJumpError, "no work_callback given" unless work_callback
        session = self.class.amqp_session

        self.shutting_down = only_one
        amqp_queue = self.create_amqp_queue( only_one ? 1 : self.prefetch )
        self.consumer = self.create_consumer( amqp_queue, &work_callback )

        self.log.debug "Subscribing to queue with consumer: %p" % [ self.consumer ]
        amqp_queue.subscribe_with( self.consumer, block: true )
        amqp_queue.channel.close
        session.close
end