Configuring Static Shovels
Overview
This guide focuses on statically configured shovels. It assumes familiarity with the key concepts behind the Shovel plugin.
Unlike dynamic shovels, static shovels are configured using the advanced configuration file. They are started on node boot and are primarily useful for permanently running workloads. Any change to static shovel configuration requires a node restart, which makes them inflexible.
Most users should prefer dynamic shovels to static ones for their flexibility and ease of automation. Generating a dynamic shovel definition (a JSON document) is usually easier than generating a static shovel definition (which uses Erlang terms).
Configuration
The configuration for the Shovel plugin must be defined in the advanced configuration file.
It consists of a single shovels clause that lists the shovels that should be started on node boot:
{rabbit, [
  %% ...
]},
{rabbitmq_shovel, [
  {shovels, [
    {shovel_one, [
      %% shovel_one properties ...
    ]},
    %% ...
  ]}
]}
A (deliberately verbose) example configuration can be found below.
Each element of the shovels clause is a named static shovel. The names in the list must be distinct.
A shovel definition looks like this at the top level:
{shovel_name, [
  {source, [ ...protocol specific config... ]},
  {destination, [ ...protocol specific config... ]},
  {ack_mode, a_mode},
  {reconnect_delay, reconn_delay}
]}
where shovel_name is the name of the shovel (an Erlang atom). The name must be enclosed in single quotes (') if it does not begin with a lower-case letter or if it contains characters other than alphanumeric characters, underscore (_), or @.
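The quoting rule above can be illustrated with a short fragment. Both shovel names below are hypothetical and exist only to show one atom that can be written bare and one that needs single quotes:

```erlang
%% Hypothetical shovel names, for illustration only.
{shovels, [
  %% Plain atom: starts with a lower-case letter and contains
  %% only alphanumeric characters and underscores.
  {orders_shovel, [
    %% ...
  ]},
  %% Quoted atom: starts with an upper-case letter and contains
  %% '-' and '.', so it must be enclosed in single quotes.
  {'Orders-Shovel.2', [
    %% ...
  ]}
]}
```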
The Source and The Destination
A shovel transfers messages from a source to a destination.
The source and destination keys are mandatory and contain nested protocol-specific keys. AMQP 0-9-1 and AMQP 1.0 are the two protocols currently supported.
Source and destination do not have to use the same protocol.
All the other properties are optional.
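As a rough sketch of how little is required, the following shows a shovel that sets only source and destination and mixes the two protocols. It is written as the whole content of an advanced configuration file (a list of sections terminated by a period). The shovel name, queue name, target address and host are hypothetical; the queue and target_address keys are described later in this guide.

```erlang
%% A minimal sketch, not a recommended production configuration.
%% All names and hosts below are placeholders.
[
  {rabbitmq_shovel, [
    {shovels, [
      {minimal_shovel, [
        %% AMQP 0-9-1 source on the local broker
        {source, [
          {protocol, amqp091},
          {uris, ["amqp://"]},
          {queue, <<"source-queue">>}
        ]},
        %% AMQP 1.0 destination on another host
        {destination, [
          {protocol, amqp10},
          {uris, ["amqp://host:5672"]},
          {target_address, <<"destination-queue">>}
        ]}
        %% ack_mode and reconnect_delay are omitted, so defaults apply
      ]}
    ]}
  ]}
].
```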
source is a mandatory key whose properties differ between protocols. Two properties are common across all protocols: protocol and uris.
protocol supports two values: amqp091 and amqp10, for AMQP 0-9-1 and AMQP 1.0, respectively:
%% for AMQP 0-9-1
{protocol, amqp091}
uris is a list of AMQP connection URIs:
{uris, [
  "amqp://fred:secret@host1.domain/my_vhost",
  "amqp://john:secret@host2.domain/my_vhost"
]}
The URI syntax is extended to include a query part that permits the configuration of additional connection parameters. See the query parameter reference for the parameters available to static shovels, such as the TLS certificate and private key.
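A hedged illustration of the query part: the fragment below sketches a TLS-enabled URI. The parameter names (cacertfile, certfile, keyfile, verify, server_name_indication) are taken from the RabbitMQ URI query parameter reference as best recalled and should be checked against that reference; the host, port and file paths are placeholders.

```erlang
%% Sketch only: paths and host are hypothetical, and the query
%% parameter names should be verified against the reference.
{uris, [
  "amqps://fred:secret@host1.domain:5671/my_vhost?cacertfile=/path/to/ca_certificate.pem&certfile=/path/to/client_certificate.pem&keyfile=/path/to/client_key.pem&verify=verify_peer&server_name_indication=host1.domain"
]}
```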
General Source Keys
Some keys are supported by both AMQP 0-9-1 and AMQP 1.0 sources. They are described in the table below.
Key | Description |
reconnect_delay | The duration (in seconds) to wait before reconnecting to the brokers after being disconnected at either end. Default is 1. For example, {reconnect_delay, 5} would delay for five seconds before reconnecting after failure. |
ack_mode | Determines how the shovel should acknowledge consumed messages. The examples in this guide use on_confirm, which acknowledges a message to the source only after the destination has confirmed it. |
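In the example configurations in this guide, these two keys sit at the top level of a shovel definition, next to source and destination. The fragment below is a sketch of that placement. The on_confirm value and the five-second delay come from those examples; the alternative modes named in the comment (on_publish, no_ack) are recalled from the plugin's reference rather than taken from this page, so treat them as an assumption to verify.

```erlang
{my_first_shovel, [
  {source, [
    %% ...
  ]},
  {destination, [
    %% ...
  ]},
  %% Acknowledge to the source only after the destination confirms.
  %% Assumed alternatives (verify before use): on_publish, no_ack.
  {ack_mode, on_confirm},
  %% Wait five seconds before reconnecting after a failure.
  {reconnect_delay, 5}
]}
```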
AMQP 0-9-1 Source Keys
AMQP 0-9-1-specific source keys are covered in a separate table:
Key | Description |
declarations | An optional list of AMQP 0-9-1 operations to be executed by the shovel before it starts transferring messages. They are typically used to set up the topology. The declarations follow the method and property names used by the RabbitMQ Erlang client. Each element of the declaration list is either an AMQP 0-9-1 method given as a single-quoted atom, such as 'queue.declare', or a tuple of a method atom and a property list. If just the method name is used, all the parameters take their defaults, so a bare 'queue.declare' declares an anonymous queue that a following 'queue.bind' can bind to an exchange. If a tuple and property list are supplied, the properties in the list specify some or all of the parameters explicitly: an 'exchange.declare' tuple with type <<"direct">> and the durable flag declares a durable, direct exchange. |
queue | The name of the source queue as an Erlang binary value. This property is mandatory. The queue must exist; use the declarations key described above to declare it if needed. |
prefetch_count | The maximum number of unacknowledged messages copied over a shovel at any one time. This key is optional. |
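The two declaration forms described in the table can be sketched as follows. The exchange name is hypothetical and is assumed to exist already on the source broker; the empty binary for the queue follows the usage in the example configurations in this guide, where it refers to the queue declared just before.

```erlang
%% Sketch of an AMQP 0-9-1 source; the exchange name is a placeholder.
{source, [
  {protocol, amqp091},
  {uris, ["amqp://"]},
  {declarations, [
    %% Bare method name: every parameter takes its default,
    %% which declares an anonymous (server-named) queue.
    'queue.declare',
    %% Tuple form: the property list sets parameters explicitly.
    {'queue.bind', [
      {exchange, <<"my-exchange">>},
      {queue, <<>>}
    ]}
  ]},
  %% Consume from the queue declared above.
  {queue, <<>>},
  %% At most ten unacknowledged messages in flight.
  {prefetch_count, 10}
]}
```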
AMQP 1.0 Source Keys
AMQP 1.0 source settings are different from those of AMQP 0-9-1 sources.
Key | Description |
source_address | This represents the source address of the AMQP 1.0 link, given as an Erlang binary such as <<"my-source">>. This key is mandatory. |
prefetch_count | This optional key sets the link credit amount that will be granted to the receiving link. The credit is automatically renewed when it falls below a tenth of this value. The default is 1000. It takes the form {prefetch_count, 10}. |
Destination
destination is a mandatory key whose properties differ between protocols. Two properties are common across all protocols: protocol and uris.
protocol supports two values: amqp091 and amqp10, for AMQP 0-9-1 and AMQP 1.0, respectively:
%% for AMQP 0-9-1
{protocol, amqp091}
uris is a list of AMQP connection URIs:
{uris, [
  "amqp://fred:secret@host1.domain/my_vhost",
  "amqp://john:secret@host2.domain/my_vhost"
]}
The URI syntax is extended to include a query part that permits the configuration of additional connection parameters. See the query parameter reference for the parameters available to static shovels, such as the TLS certificate and private key.
General Destination Keys
Key | Description |
reconnect_delay | The duration (in seconds) to wait before reconnecting to the brokers after being disconnected at either end. Default is 1. For example, {reconnect_delay, 5} would delay for five seconds before reconnecting after failure. |
AMQP 0-9-1 Destination Keys
Key | Description |
publish_properties | This optional key controls message properties set or overridden by the shovel. It takes a property list whose entries are set on the basic properties of each message before it is re-published. For example, {publish_properties, [{delivery_mode, 2}]} would mark all re-published messages as persistent. By default the original properties of the message are preserved, but this clause can be used to change or set any known property. |
publish_fields | This optional key is similar to publish_properties, but the properties in the list are used to set the fields on the basic.publish method. By default messages are re-published using the original exchange name and routing key. By specifying {publish_fields, [{exchange, <<"my_direct">>}, {routing_key, <<"from_shovel">>}]}, messages would be re-published to an explicit exchange name with an explicit, fixed routing key. |
add_timestamp_header | This boolean key controls whether a custom timestamp header is added to each message before it is re-published. The header value is the time (in seconds since epoch) at which the message was shovelled. By default the header is not added. |
add_forward_headers | When set to true the shovel will add a number of custom message headers to each re-published message. |
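Put together, the AMQP 0-9-1 destination keys from the table might be combined as in the sketch below. The exchange name and routing key are the ones used in the example configurations in this guide; enabling both header options at once is an illustrative choice, not a recommendation.

```erlang
%% Sketch of an AMQP 0-9-1 destination on the local broker.
{destination, [
  {protocol, amqp091},
  {uris, ["amqp://"]},
  %% Override a message property: delivery_mode 2 marks
  %% re-published messages as persistent.
  {publish_properties, [{delivery_mode, 2}]},
  %% Override publishing fields: fixed exchange and routing key.
  {publish_fields, [
    {exchange, <<"my_direct">>},
    {routing_key, <<"from_shovel">>}
  ]},
  %% Both header options are off unless enabled explicitly.
  {add_timestamp_header, true},
  {add_forward_headers, true}
]}
```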
AMQP 1.0 Destination Keys
Key | Description |
target_address | This represents the target address of the sending AMQP 1.0 link, given as an Erlang binary such as <<"destination-queue">>. |
properties | This optional key controls what additional properties will be added when re-publishing messages. It takes the form of a property list, such as {properties, [{user_id, <<"my-user">>}]}. |
application_properties | This optional key declares any additional application properties to be added when re-publishing a message. It takes the form of a list of key and value pairs, such as {application_properties, [{<<"my-prop">>, <<"my-prop-value">>}]}. Keys and values should be binary strings. |
add_timestamp_header | This boolean key controls whether a timestamp property is set on each message before it is re-published. The value is the time (in seconds since epoch) at which the message was shovelled. By default the property is not set. |
add_forward_headers | When set to true the shovel will add a number of application properties to each re-published message. |
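For comparison with the table, an AMQP 1.0 destination using these keys could look roughly like this. The host, user, property names and target address are the placeholder values used in the example configurations in this guide.

```erlang
%% Sketch of an AMQP 1.0 destination; all values are placeholders.
{destination, [
  {protocol, amqp10},
  {uris, ["amqp://host:5672"]},
  %% Address of the sending link's target.
  {target_address, <<"destination-queue">>},
  %% Extra message properties set on re-publish.
  {properties, [{user_id, <<"my-user">>}]},
  %% Extra application properties; keys and values are binaries.
  {application_properties, [{<<"my-prop">>, <<"my-prop-value">>}]},
  %% Both options are off unless enabled explicitly.
  {add_timestamp_header, true},
  {add_forward_headers, true}
]}
```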
Example Configuration
A reasonably complete static shovel configuration between AMQP 0.9.1 endpoints might look like this:
{rabbitmq_shovel, [
  {shovels, [
    {my_first_shovel, [
      {source, [
        {protocol, amqp091},
        {uris, [
          "amqp://fred:secret@host1.domain/my_vhost",
          "amqp://john:secret@host2.domain/my_vhost"
        ]},
        {declarations, [
          {'exchange.declare', [
            {exchange, <<"my_fanout">>},
            {type, <<"fanout">>},
            durable
          ]},
          {'queue.declare', [
            {arguments, [{<<"x-message-ttl">>, long, 60000}]}
          ]},
          {'queue.bind', [
            {exchange, <<"my_fanout">>},
            {queue, <<>>}
          ]}
        ]},
        {queue, <<>>},
        {prefetch_count, 10}
      ]},
      {destination, [
        {protocol, amqp091},
        {uris, ["amqp://"]},
        {declarations, [
          {'exchange.declare', [
            {exchange, <<"my_direct">>},
            {type, <<"direct">>},
            durable
          ]}
        ]},
        {publish_properties, [{delivery_mode, 2}]},
        {add_forward_headers, true},
        {publish_fields, [
          {exchange, <<"my_direct">>},
          {routing_key, <<"from_shovel">>}
        ]}
      ]},
      {ack_mode, on_confirm},
      {reconnect_delay, 5}
    ]}
  ]}
]}
The configuration above defines a single shovel called 'my_first_shovel'.
'my_first_shovel' will connect to a broker on either host1 or host2 (as source), and directly to the local broker as destination. It will reconnect to the other source broker on failure, after a delay of 5 seconds.
When connected to the source it will declare a durable, fanout exchange called "my_fanout", an anonymous queue with a per-queue message TTL, and bind the queue to the exchange.
When connected to the destination (the local broker) it will declare a durable, direct exchange called "my_direct".
This shovel will re-publish messages sent to the anonymous queue on the source to the local exchange with the fixed routing key "from_shovel". The messages will be persistent and only acknowledged after receiving a publish confirm from the local broker.
The shovel consumer will not receive further deliveries while ten or more messages are unacknowledged.
Example Configuration (1.0 Source - 0.9.1 Destination)
A reasonably complete shovel configuration between an AMQP 1.0 source and an AMQP 0.9.1 destination might look like this:
{rabbitmq_shovel, [
  {shovels, [
    {my_first_shovel, [
      {source, [
        {protocol, amqp10},
        {uris, ["amqp://fred:secret@host1.domain/my_vhost"]},
        {source_address, <<"my-source">>},
        {prefetch_count, 10}
      ]},
      {destination, [
        {protocol, amqp091},
        {uris, ["amqp://"]},
        {declarations, [
          {'exchange.declare', [
            {exchange, <<"my_direct">>},
            {type, <<"direct">>},
            durable
          ]}
        ]},
        {publish_properties, [{delivery_mode, 2}]},
        {add_forward_headers, true},
        {publish_fields, [
          {exchange, <<"my_direct">>},
          {routing_key, <<"from_shovel">>}
        ]}
      ]},
      {ack_mode, on_confirm},
      {reconnect_delay, 5}
    ]}
  ]}
]}
Example Configuration (0.9.1 Source - 1.0 Destination)
A more extensive shovel configuration between an AMQP 0.9.1 source and an AMQP 1.0 destination might look like this:
{rabbitmq_shovel, [
  {shovels, [
    {my_first_shovel, [
      {source, [
        {protocol, amqp091},
        {uris, [
          "amqp://fred:secret@host1.domain/my_vhost",
          "amqp://john:secret@host2.domain/my_vhost"
        ]},
        {declarations, [
          {'exchange.declare', [
            {exchange, <<"my_fanout">>},
            {type, <<"fanout">>},
            durable
          ]},
          {'queue.declare', [
            {arguments, [{<<"x-message-ttl">>, long, 60000}]}
          ]},
          {'queue.bind', [
            {exchange, <<"my_fanout">>},
            {queue, <<>>}
          ]}
        ]},
        {queue, <<>>},
        {prefetch_count, 10}
      ]},
      {destination, [
        {protocol, amqp10},
        %% Note: for plain text SASL authentication, use
        %% {uris, ["amqp://user:pass@host:5672?sasl=plain"]},
        %% Note: this relies on default user credentials,
        %% which have remote access restrictions; see
        %% ./access-control to learn more
        {uris, ["amqp://host:5672"]},
        {properties, [{user_id, <<"my-user">>}]},
        {application_properties, [{<<"my-prop">>, <<"my-prop-value">>}]},
        {add_forward_headers, true},
        {target_address, <<"destination-queue">>}
      ]},
      {ack_mode, on_confirm},
      {reconnect_delay, 5}
    ]}
  ]}
]}