Configure message queues

The message queue topology is a Magento Open Source feature. It can be included as part of Magento Open Source installation, or you can add it to existing modules.

Overview

Configuring the message queue topology involves creating and modifying the following configuration files in the <module>/etc directory:

  • queue.xml - Defines brokers that processes topics. Use for db (MySQL) connections only. Do not create this file for amqp (RabbitMQ) connections.
  • communication.xml - Defines aspects of the message queue system that all communication types have in common.
  • queue_consumer.xml - Defines the relationship between an existing queue and its consumer.
  • queue_topology.xml - Defines the message routing rules and declares queues and exchanges.
  • queue_publisher.xml - Defines the exchange where a topic is published.

Use Cases

Depending on your needs, you may only need to create and configure communication.xml and one or two of these files.

  • If you only want to publish to an existing queue created by a 3rd party system, you will only need the queue_publisher.xml file.
  • If you only want to consume from an existing queue, you will only need the queue_consumer.xml config file.
  • In cases where you want to configure the local queue and publish to it for 3rd party systems to consume, you will need the queue_publisher.xml and queue_topology.xml files.
  • When you want to configure the local queue and consume messages published by 3rd party system, you will need the queue_topology.xml and queue_consumer.xml files.

queue.xml

The queue.xml file defines the broker that processes topics. It also specifies the queue each topic will be sent to. Do not create this file for RabbitMQ connections.

Sample queue.xml file

1
2
3
4
5
6
7
8
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/queue.xsd">
    <broker topic="product_action_attribute.update" exchange="magento-db" type="db">
        <queue name="product_action_attribute.update"
               consumer="product_action_attribute.update"
               consumerInstance="Magento\Framework\MessageQueue\Consumer"
               handler="Magento\Catalog\Model\Attribute\Backend\Consumer::process"/>
    </broker>
</config>

broker element

The broker element also contains queue elements.

Parameter Description
topic A topic defined in the communication.xml file.
type The type of message broker. The value must be db.
exchange The name of the exchange to publish to. The default system exchange name is magento.

queue element

The queue element defines the module’s queues.

Parameter Description
name (required) Defines the queue name to send the message to.
consumer (required) The name of the consumer.
consumerInstance The path to a Magento class that consumes the message.
handler Specifies the class and method that processes the message. The value must be specified in the format <Vendor>\Module\<ServiceName>::<methodName>.
maxMessages Specifies the maximum number of messages to consume.

communication.xml

The <module>/etc/communication.xml file defines aspects of the message queue system that all communication types have in common. This release supports AMQP and database connections.

Sample communication.xml file

The following sample defines two synchronous topics. The first topic is for RPC calls. The second uses a custom service interface.

1
2
3
4
5
6
7
8
9
<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework:Communication/etc/communication.xsd">
  <topic name="synchronous.rpc.test" request="string" response="string">
    <handler name="processRpcRequest" type="Magento\TestModuleSynchronousAmqp\Model\RpcRequestHandler" method="process"/>
  </topic>
  <topic name="magento.testModuleSynchronousAmqp.api.serviceInterface.execute" schema="Magento\TestModuleSynchronousAmqp\Api\ServiceInterface::execute">
    <handler name="processRemoteRequest" type="Magento\TestModuleSynchronousAmqp\Model\RpcRequestHandler" method="process"/>
  </topic>
</config>

topic element

Topic configuration is flexible in that you can switch the transport layer for topics at deployment time. These values can be overwritten in the env.php file.

The name parameter is required. The topic definition must include either a request or a schema. Use schema if you want to implement a custom service interface. Otherwise, specify request. If request is specified, then also specify response if the topic is synchronous.

Parameter Description
name A string that uniquely identifies the topic. A topic name should be a series of strings that are separated by periods. The leftmost string should be the most general, and each string afterward should narrow the scope. For example, to describe actions for tending to pets, you might create names such as cat.white.feed and dog.retriever.walk. Wildcards are not supported in the communication.xml file.
request Specifies the data type of the topic.
response Specifies the format of the response. This parameter is required if you are defining a synchronous topic. Omit this parameter if you are defining an asynchronous topic.
schema The interface that describes the structure of the message. The format must be <module>\Api\<ServiceName>::<methodName>.

handler element

The handler element specifies the class where the logic for handling messages exists and the method it executes.

Parameter Description
name A string that uniquely defines the handler. The name can be derived from the topic name if the handler is specific to the topic. If the handler provides more generic capabilities, name the handler so that it describes those capabilities.
type The class or interface that defines the handler.
method The method this handler executes.
disabled Determines whether this handler is disabled. The default value is false.

queue_consumer.xml

The queue_consumer.xml file contains one or more consumer elements:

Example queue_consumer file

1
2
3
4
5
6
<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/consumer.xsd">
    <consumer name="basic.consumer" queue="basic.consumer.queue" handler="LoggerClass::log"/>
    <consumer name="synchronous.rpc.test" queue="synchronous.rpc.test.queue" handler="LoggerClass::log"/>
    <consumer name="rpc.test" queue="queue.for.rpc.test.unused.queue" consumerInstance="Magento\Framework\MessageQueue\BatchConsumer" connection="amqp"/>
</config>

consumer element

Attribute Description
name (required) The name of the consumer.
queue (required) Defines the queue name to send the message to.
handler Specifies the class and method that processes the message. The value must be specified in the format <Vendor>\Module\<ServiceName>::<methodName>.
consumerInstance The Magento class name that consumes the message
connection For AMQP connections, the connection name must match the connection attribute in the queue_topology.xml file. Otherwise, the connection name must be db.
maxMessages Specifies the maximum number of messages to consume.

Consumer handlers

A handler is a class and method that processes a message. Magento has two ways to define a handler for messages.

  • In the <handler> element of the module’s communication.xml file
  • In the handler attribute of the module’s queue_consumer.xml file

The following conditions determine how these handlers are processed:

  • If the consumer in queue_consumer.xml does not have a consumerInstance defined, then the system uses the default consumer: Magento\Framework\MessageQueue\Consumer. In this case, if the <consumer> element contains the handler attribute, then it will be used, and the <handler> element in communication.xml will be ignored.
  • If the consumer in queue_consumer.xml has a consumerInstance defined, then the specific consumer implementation defines how the handler is used.

Magento provides these consumers out-of-the-box:

Class name Handler in communication.xml will be executed? Handler in queue_consumer.xml will be executed?
Magento\Framework\MessageQueue\Consumer Only if not defined in queue_consumer.xml Yes, if exists
Magento\Framework\MessageQueue\BatchConsumer Only if not defined in queue_consumer.xml Yes, if exists
Magento\AsynchronousOperations\Model\MassConsumer Yes, if exists Yes, if exists

queue_topology.xml

The queue_topology.xml file defines the message routing rules and declares queues and exchanges. It contains the following elements:

  • exchange
  • exchange/binding (optional)
  • exchange/arguments (optional)
  • exchange/binding/arguments (optional)

Example queue_topology.xml file

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/topology.xsd">
  <exchange name="magento-topic-based-exchange1" type="topic" connection="db">
    <binding id="topicBasedRouting2" topic="anotherTopic" destinationType="queue" destination="topic-queue1">
        <arguments>
            <!--Not part of our use case, but will be processed if someone specifies them-->
            <argument name="argument1" xsi:type="string">value</argument>
        </arguments>
    </binding>
    <arguments>
        <argument name="alternate-exchange" xsi:type="string">magento-log-exchange</argument>
    </arguments>
  </exchange>
  <exchange name="magento-topic-based-exchange2" type="topic" connection="db">
    <binding id="topicBasedRouting1" topic="#" destinationType="queue" destination="topic-queue2"/>
    <arguments>
      <argument name="alternate-exchange" xsi:type="string">magento-log-exchange</argument>
    </arguments>
  </exchange>
</config>

exchange element

Attribute Description
name (required) A unique ID for the exchange.
type (required) Specifies the type of exchange. Must be topic.
connection (required) For AMQP connections, a string that identifies the connection. For MySQL connections, the connection name must be db.
durable Boolean value indicating whether the exchange is persistent. Non-durable exchanges are purged when the server restarts. The default is true.
autoDelete Boolean value indicating whether the exchange is deleted when all queues have finished using it. The default is false.
internal Boolean value. If set to true, the exchange may not be used directly by publishers, but only when bound to other exchanges. The default is false.

binding element

The binding element is a subnode of the exchange element.

Attribute Description
id (required) A unique ID for this binding.
topic (required) The name of a topic. You can specify an asterisk (*) or pound sign (#) as wildcards. These are described below the table.
destinationType (required) Must be queue.
destination (required) Identifies the name of a queue.
disabled Determines whether this binding is disabled. The default value is false.

Example topic names that include wildcards:

Pattern Description Example matching topics Example non-matching topics
*.*.* Matches any topic that contains exactly two periods. mytopic.createOrder.success, mytopic.updatePrice.item1 mytopic.createOrder, mytopic.createOrder.success.true
# Matches any topic name. mytopic, mytopic.createOrder.success, this.is.a.long.topic.name Not applicable
mytopic.# Matches any topic name that begins with mytopic and has a period afterward. mytopic.success, mytopic.createOrder.error new.mytopic.success,
*.Order.# There must be one string before .Order. There can be any number of strings (including 0) after that. mytopic.Order, mytopic.Order.Create, newtopic.Order.delete.success  

arguments element

The arguments element is an optional element that contains one or more argument elements. These arguments define key/value pairs that are passed to the broker for processing.

Each argument definition must have the following parameters:

Attribute Description
name The parameter name
type The data type of the value

The following illustrates an arguments block:

1
2
3
4
<arguments>
    <argument name="warehouseId" xsi:type="int">1</argument>
    <argument name="carrierName" xsi:type="string">USPS</argument>
</arguments>

queue_publisher.xml

The queue_publisher.xml file defines which connection and exchange to use to publish messages for a specific topic. It contains the following elements:

Example queue_publisher.xml file

1
2
3
4
5
6
7
8
<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/publisher.xsd">
    <publisher topic="magento.testModuleSynchronousAmqp.api.serviceInterface.execute" disabled="true" />
    <publisher topic="asynchronous.test">
        <connection name="amqp" exchange="magento" disabled="false"/>
        <connection name="db" exchange="exch1" disabled="true"/>
    </publisher>
</config>

publisher element

Attribute Description
topic (required) The name of the topic.
disabled Determines whether this queue is disabled. The default value is false.

connection element

The connection element is a subnode of the publisher element. There must not be more than one enabled active connection to a publisher defined at a time. If you omit the connection element, the default connection of amqp and exchange magento will be used.

Attribute Description
name (required) For AMQP connections, the connection name must match the connection attribute in the queue_topology.xml file. Otherwise, the connection name must be db.
exchange The name of the exchange to publish to. The default system exchange name is magento.
disabled Determines whether this queue is disabled. The default value is false.

You cannot enable more than one publisher for each topic.

Updating queue.xml

See Migrate message queue configuration for information about upgrading from Magento 2.0 or 2.1.