Symphony uses the RabbitMQ server for speed, redundancy, and high availability. Installing RabbitMQ is outside the scope of this document; please see their installation instructions. Once it's installed and running, let's move on!
Here's a quick review of RabbitMQ vocabulary. It's helpful to have some fundamental understanding of AMQP concepts when using Symphony, as a lot of advanced messaging options are available only via server-side configuration.
Virtual host: A logical container for exchanges and queues. You can have as many of these as you like for a RabbitMQ cluster; they are used to partition objects for security purposes.
Exchange: Receives incoming messages and routes them to queues via rules defined in bindings.
Queue: Consumers receive messages from a queue, if their binding rules match.
Binding: A rule that links/routes messages from an exchange to a queue.
Topic (routing key): A period-separated hierarchical string that specifies a category for a message. This is matched against existing bindings for incoming messages.
Dead letter exchange: An exchange that receives messages that fail for any reason. Configuring a dead letter exchange is critical for robust error reporting.
Symphony uses the Configurability gem to determine connection criteria for the RabbitMQ server, which are stored in a YAML file.

An example configuration file for Symphony looks like this, if you were connecting to a virtual host called '/test', wanted to log all messages to STDERR at a level of 'debug', and had an exchange called 'symphony' (the default):
amqp:
  broker_uri: amqp://USERNAME:PASSWORD@example.com:5672/%2Ftest
  exchange: symphony

logging:
  symphony: debug STDERR (color)
Symphony won't create the exchange for you; it is expected to already exist under the specified virtual host. (There are a lot of server-side exchange configuration options, and we don't make any assumptions on your behalf.) The only requirement is that it is a 'topic' exchange. If it is of a different kind ('fanout', 'direct', etc.), it won't work! It's also recommended to mark it 'durable', so you won't need to re-create it after any RabbitMQ service restarts.
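If you'd rather script that setup, here's a minimal sketch of declaring the exchange with the Bunny gem directly; the credentials and host are placeholders taken from the example configuration above:

require 'bunny'

conn = Bunny.new( 'amqp://USERNAME:PASSWORD@example.com:5672/%2Ftest' )
conn.start
channel = conn.create_channel

# Declare the 'symphony' topic exchange as durable so it survives broker restarts.
channel.topic( 'symphony', durable: true )

conn.close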
Tasks inherit from the Symphony::Task class. You can start an individual task directly via the 'symphony-task' binary, as a pool of managed processes via the 'symphony' daemon, or by simply calling run on the class itself.
#!/usr/bin/env ruby

require 'symphony'

class Test < Symphony::Task

    # Process all events
    subscribe_to '#'

    def work( payload, metadata )
        puts "I received an event: %p with a payload of %p" % [
            metadata[ :delivery_info ][ :routing_key ], payload
        ]
        return true
    end
end

Symphony.load_config( 'etc/config.yml' )
Test.run
The only requirement is the 'subscribe_to' clause, which indicates what topics the queue will receive from the exchange. Subscribing to '#' means this task will receive anything and everything that is published. In practice, you'll likely want to be more discerning. You can specify as many comma-separated topics as you like for a given task, as well as use AMQP wildcard characters.
# Subscribe to hypothetical creation events for all types,
# and deletion events for users.
#
subscribe_to '#.create', 'users.delete'
In this fashion, you can decide to have many separate (and optionally distributed) tasks that only respond to a small subset of possible events, or more monolithic tasks that can respond to a variety of event topics.
Because AMQP manages the messages that the bound consumers receive, starting up multiple copies of this Test task (across any number of machines) will automatically cause published events to be received in a round-robin fashion without any additional effort. All running task workers will receive roughly equal numbers of matching events.
By default, a task will automatically create an auto-delete queue for itself with a sane (but configurable) name, along with subscription bindings, ONLY if the queue didn't previously exist. The auto-delete flag ensures that when the last worker disconnects, the queue is automatically removed from the AMQP broker, setting everything back to the state it was in before a Symphony worker ever connected.
Following the same logic as the initial exchange creation, if a matching queue already exists for a task name (whether automatically generated or manually specified with the 'queue_name' option), then a binding is NOT created by default. Symphony defaults to being “hands off” for anything server-side that already exists, so you can enforce specific behaviors and know that Symphony won't fiddle with them.
This can be confusing if you've created a queue manually for a task, but didn't also manually create a binding for the topic key. There are some advanced routing cases where you'll want to set up queues yourself, rather than let Symphony automatically create and remove them, or rebind existing queues on startup; we'll talk more on that later.
By default, a Task is configured to tell AMQP when it has completed its job. It does this by returning true from the work method. When AMQP sees the job was completed, the message is considered “delivered”, and its lifetime is at an end. If the task instead returns an explicit false, the message is retried, potentially on a different task worker.
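As a minimal sketch of that contract (the task name and topic here are hypothetical):

class Thumbnailer < Symphony::Task
    subscribe_to 'images.uploaded'

    def work( payload, metadata )
        # Nothing we can act on yet?  Return false so the message is requeued and retried.
        return false unless payload[ 'path' ]

        # ... generate the thumbnail ...

        # Returning true acknowledges the message; AMQP considers it delivered.
        return true
    end
end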
If something within the task raises an exception, the default behavior is to permanently abort the task. If you need different behavior, you'll need to catch the exception. As the task author, you're in the best position to know whether a failure state requires an immediate retry or a permanent rejection. If you'll be allowing message retries, you might also want to consider publishing messages with a maximum TTL, to avoid any situations that could cause jobs to loop infinitely. (A default max TTL is an example of something that can be set when creating queues on the RabbitMQ server manually.)
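Here's a sketch of how that decision might look inside a work method; TransientNetworkError and process() are hypothetical stand-ins for your own code:

def work( payload, metadata )
    process( payload )
    return true
rescue TransientNetworkError
    # Worth another attempt, possibly on a different worker: requeue it.
    return false
rescue => err
    warn "Giving up on this message: %p" % [ err ]
    # Re-raising (or simply not rescuing) permanently rejects the message.
    raise
end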
If you don't require retry or reject behaviors on task error states, you can set 'acknowledge' to false. With this setting, the AMQP server considers a message as “delivered” the moment a consumer receives it. This can be useful for additional speed when processing unimportant events.
acknowledge false # (default: true)
By default, a task signals that it is ready for more messages as soon as it finishes processing one. This isn't always the optimal environment for long-running processes. The work you want to accomplish might require heavy computing resources, and you might want to do things like relinquish memory in between messages, or disconnect from network resources (databases, etc.)
Setting your work_model to 'oneshot' causes the task to exit immediately after performing its work. Clearly, this only makes sense if you're running managed processes under the Symphony daemon, so a fresh process can take its place. You can do all of your expensive work within the work method, leaving the main “waiting for messages” loop as low-resource as possible.
work_model :oneshot # (default: :longlived)
Prefetch is another tuning option for speed on extremely busy exchanges. If a task worker sees there are additional pending messages, it can concurrently retrieve and store them locally in memory while performing its current work. This can reduce consumer/server network traffic, but the trade-off is that all prefetched message payloads are stored in memory until they are processed. It's a good idea to keep this low if your payloads are large.
prefetch 100 # (default: 10)
Message prefetch is ignored (automatically set to 1) if the task's work_model is set to oneshot.
The timeout option specifies the maximum length of time (in seconds) a task can be within its work method, and what action to take if it exceeds that timeframe. Please note, this is different from the Max-TTL setting for AMQP, which dictates the maximum timeframe a message can exist in a queue.
AMQP can't know if a task is stuck in a Ruby loop or some other unfortunate state, so a timeout ensures workers won't ever get permanently stuck. There is no default timeout. If one is set, the default action is to treat the timeout as an exception, which results in a permanent rejection. You can choose to retry instead:
# perform work for maximum 20 seconds, then stop and try
# again on another worker
#
timeout 20.0, :action => :retry
By default, Symphony will try to create a queue based on the class name of the task. You can override this per task.
class Test < Symphony::Task

    # Process all events
    subscribe_to '#'

    # I don't want to connect to a 'test' queue, let's make it interesting:
    queue_name 'shazzzaaaam'

    def work( payload, metadata )
        ...
If you'd rather keep the Symphony queues around even when the workers aren't running (to, for example, queue up events for processing when they resume), you can tell a task to create a persistent queue instead of an auto-delete queue:
persistent true
If you want Symphony to re-bind a worker's queue when it starts up, you can tell it to with always_rebind:
always_rebind true
Note that re-binding doesn't unbind from the existing pattern(s), so you'll need to account for this in the Task's work method.
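For example, here's a sketch of one way to guard against messages that still arrive via an older binding; the 'users.' prefix is a hypothetical topic namespace:

def work( payload, metadata )
    routing_key = metadata[ :delivery_info ][ :routing_key ]

    # Quietly acknowledge and discard anything from a binding we no longer care about.
    return true unless routing_key.start_with?( 'users.' )

    # ... handle the messages we do want ...
    return true
end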
If you're running a version of RabbitMQ that supports the 'x-queue-type' argument (e.g., for quorum queues), you can specify a value for it using the 'queue_type' declaration:
queue_type 'quorum'
Note that several other queue settings might not work with some kinds of queues; the validity of the combination of the task's settings is left to the implementor.
Plugins change or enhance the behavior of a Symphony::Task.
The metrics plugin provides periodic information about the performance of a task worker to the log. It shows processed message averages and resource consumption summaries at the “info” log level, and also changes the process name to display a total count and jobs per second rate.
require 'symphony/metrics'

class Test < Symphony::Task
    prepend Symphony::Metrics

    # ...
end
The routing plugin removes the requirement to perform all processing in the work method, and instead links individual units of work to separate 'on' declarations. This makes tasks that are designed to receive multiple message topics much easier to maintain and test.
require 'symphony/routing'

class Test < Symphony::Task
    include Symphony::Routing

    on 'users.create', 'workstation.create' do |payload, metadata|
        puts "A user or workstation wants to be created!"
        # ...
        return true
    end

    on 'users.delete' do |payload, metadata|
        puts "A user wants to be deleted!"
        return true
    end
end
The on method accepts the same syntax as the 'subscribe_to' option. In fact, if you're using the routing plugin, it removes the requirement for 'subscribe_to' altogether, making it entirely redundant. on blocks that match a message multiple times are executed in top-down order, and the task only signals success if all of the matching blocks return true.
This plugin enables always_rebind on your task when it's included, so that adding a routing pattern will be reflected in the bindings of persistent queues.
Because AMQP is language agnostic and a message can have many unique flags attached to it, this is another area where Symphony remains neutral and imposes nothing.
If you want to publish or republish from within Symphony, you can get a reference to the exchange object Symphony is bound to via the Symphony::Queue class:
exchange = Symphony::Queue.amqp_exchange
This is a Bunny object that you can interact with directly. See the Bunny documentation for options.
When publishing, include a message expiration. (Again, this has a different goal than the 'timeout' option for the Symphony::Task class, which can dislodge a potentially stuck worker.)
With an expiration, a message that is retried will eventually meet its maximum lifetime, and AMQP will stop delivering it to consumers. If there is a dead letter queue configured, AMQP will place the message there for later inspection.
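As a sketch of what that might look like via the Bunny exchange object (the topic and payload are hypothetical, and the expiration is expressed in milliseconds):

require 'json'

exchange = Symphony::Queue.amqp_exchange
exchange.publish(
    { user_id: 42 }.to_json,
    routing_key:  'users.delete',
    content_type: 'application/json',
    expiration:   30_000  # discard (or dead-letter) the message if not consumed within 30 seconds
)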
Because messaging is designed to be asynchronous, you won't receive instantaneous results when pushing an event into the void. RabbitMQ has the concept of a 'dead letter queue', which receives events that have failed due to expiration or rejection. They contain the original route information and the original payload.
RabbitMQ also permits setting 'policies', which apply default settings to any created queue under a virtual host. We've been creating an automatic policy that links any queue that doesn't start with '_' to a dead letter exchange called '_failures', so all errors filter there without any further configuration. New queues have the same policy applied automatically.
% rabbitmqctl list_policies
Listing policies ...
/       DLX     queues  ^[^_].*  {"dead-letter-exchange":"_failures"}   0
There is an example task (symphony/lib/tasks/failure_logger.rb) that you're welcome to use as a foundation for your error reporting. What you do with the errors is up to you – the failure_logger example assumes it is binding to a dead letter queue, and it simply logs to screen.
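As a starting point of your own, a minimal version might look something like the sketch below. It is not the bundled failure_logger; FailureWatcher is a hypothetical name, and it assumes you've either pointed its configuration at the '_failures' exchange or manually bound its queue to it:

require 'symphony'

class FailureWatcher < Symphony::Task

    # Dead-lettered messages keep their original routing keys, so match everything.
    subscribe_to '#'

    # Keep the queue name outside the DLX policy's pattern (it matches ^[^_].*).
    queue_name '_failures'
    persistent true

    def work( payload, metadata )
        warn "Dead-lettered %p: %p" % [
            metadata[ :delivery_info ][ :routing_key ], payload
        ]
        return true
    end
end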
Like good error reporting, this requires server-side configuration to get rolling.
If messages are published when no queues are bound and available to receive them, AMQP drops the message immediately. (If the publisher sets the 'mandatory' flag, they'll receive an error and can act accordingly.)
Instead of setting 'mandatory', you may want to have AMQP accept the message and save it for a task worker to consume at a later time. To do this, you just need to manually create the queue, and the bindings to it from the exchange (you'll probably want these marked as 'durable' as well, to survive RabbitMQ service restarts).
With a binding in place, messages will be retained on the RabbitMQ server until a consumer drains them. If the messages are published with the 'persistent' flag, they'll also survive RabbitMQ server restarts.
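Here's a minimal sketch of that setup using Bunny directly; the queue name and routing key are hypothetical, and the connection details come from the example configuration above:

require 'bunny'

conn = Bunny.new( 'amqp://USERNAME:PASSWORD@example.com:5672/%2Ftest' )
conn.start
channel = conn.create_channel

# Durable exchange, queue, and binding, so everything survives broker restarts.
exchange = channel.topic( 'symphony', durable: true )
queue    = channel.queue( 'audit_log', durable: true, auto_delete: false )
queue.bind( exchange, routing_key: '#.delete' )

conn.close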
With this setup, Symphony won't create or modify any queues. It will just attach, start receiving events, and get to work.