Message Queuing with RabbitMQ: How It Works?
Magento 2 supports message queuing with MySql и RabbitMQ. In the example below, we will use Magento 2 and RabbitMQ.
Let’s examine how queueing with RabbitMQ can be integrated in Magento 2.
What is message queuing?
Before we dive into the explanation of RabbitMQ, we need to understand what message queuing is and how it works.
Message queueing allows different apps, which do not have the built-in integration, to connect and share the information.
Message queuing consists of producer (1), broker (2) (message queuing software), and consumer (3).
Producers — the client applications which compose messages and deliver them to the broker. The broker saves messages and waits until the consumer will connect to him and receive the messages.
Illustration of the general scheme:
What is RabbitMQ?
RabbitMQ is popular software with open-source code for the creation of message queues. According to the official website, RabbitMQ is lightweight and easy to deploy locally and in the cloud. It supports multiple API protocols like AMQP, STOMP, MQTT, and HTTP. Besides, RabbitMQ supports a variety of common programming languages and can operate in different cloud environments and operating systems. Isn’t that awesome?!
The advantages of using RabbitMQ
RabbitMQ is popular because it has the following advantages:
- Confirmation of delivery and other confirmations increase the security of the message queuing by reducing the possibility of messages loss.
- Flexible routing that enables the delivery of particular messages in the particular queues and to the specific consumers.
- Multiple types of exchanges that allow directing the messages to the consumers with different methods.
- Easy to deploy on corporate and public cloud networks due to its being lightweight.
However, RabbitMQ users mentioned such drawbacks:
- Some aspects are too technical and might be very difficult for the new users.
- Inability to see queues and messages in queues.
- Vague error messages that obstruct the diagnostics of the malfunction.
Setting up Magento 2 and RabbitMQ
Now that we know more about queue and RabbitMQ, let’s examine their integration in Magento 2.
You should already have RabbitMQ installed, properly set up, and working. The guide on the service installation you can find here or here.
Ex. (Ubuntu) : sudo apt install -y rabbitmq-server
We need to additionally install Management Plugin which would assist in monitoring the queues in the web interface.
Ex.(enable plugin): sudo rabbitmq-plugins enable rabbitmq_management
After plugin activation, enter the web address http://127.0.0.1:15672/ and fill in the username and password fields guest/guest (by default).
Upon entering the system, you will see the following interface:
If everything is successful at this point, let’s move to the next steps.
Connecting RabbitMQ to Magento 2 would be possible either before or after the installation of Magento 2.
Option 1 (adding settings during the installation of Magento 2)
Add the following command line configurations during the installation:
--amqp-host="<hostname>" --amqp-port="5672" --amqp-user="<user_name>" --amqp-password="<password>" --amqp-virtualhost="/"
Where:
- –amqp-host порт (localhost)
- –amqp-port по дефолту 5672
- –amqp-user по дефолту quest
- –amqp-password по дефолту quest
Option 2. If you already have installed Magento 2, add queue section in the
<install_directory>/app/etc/env.php
'queue' =>
array (
'amqp' =>
array (
'host' => 'rabbitmq.example.com',
'port' => '11213',
'user' => 'magento',
'password' => 'magento',
'virtualhost' => '/'
),
),
With the SSL support:
ueue' =>
array (
'amqp' =>
array (
'host' => 'rabbitmq.example.com',
'port' => '11213',
'user' => 'magento',
'password' => 'magento',
'virtualhost' => '/',
// start ssl configuration
'ssl' => 'true',
'ssl_options' => [
'cafile' => '/etc/pki/tls/certs/DigiCertCA.crt',
'certfile' => '/path/to/magento/app/etc/ssl/test-rabbit.crt',
'keyfile' => '/path/to/magento/app/etc/ssl/test-rabbit.key'
],
),
),
Implementation
Let’s create a simple module Overdose_RabbitMQ
The primary goal of this module is to send a message to the queue when adding an item to the cart. Then, after listening to the current queue, we will receive the message and will write it to logs.
Let’s go:
Add configurations and determine entities for the work with RMQ.
Message Queuing requires four xml files in <vendor>/<module>/etc:
In these files, we will determine stock, subject, queue, publisher, and consumer.
- communication.xml — stores general configuration for the connection of module and queue.
- queue_consumer.xml — defines the relationship between the existing queue and its consumer; also, class and method which will be used to process the messages from the queue.
- queue_topology.xml — defines rules for message routing and declares queues.
- queue_publisher.xml — defines an exchange that will be used to publish a topic.
communication.xml
<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:noNamespaceSchemaLocation="urn:magento:framework:Communication/etc/communication.xsd">
<topic
name="od.add.to.cart"
request="Magento\Quote\Api\Data\CartInterface"/>
<!--
name = od.add.to.cart unique value divided by period
request = defines the type of data topic (in this case CartInterface )
-->
</config>
queue_consumer.xml
Adding the consumers and assigning the queue which they will track.
<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/consumer.xsd">
<consumer name="OverdoseAddToCart"
queue="od.add.to.cart.queue"
connection="amqp"
handler="Overdose\RabbitMQ\Model\Queue\Handler\TestRMQHandler::execute"/>
<!--
consumer :
name = unique value
queue = determines the name of the queue for sending a message
connection = for AMQP, the name of the connection has to correspond to connection attribute
in queue_topology.xml. Otherwise, the name of the connection should be db.
handler = assigns class and method which will be used for processing the messages.
Additional configurations
maxMessages = determines maximum allowed messages in the use
maxIdleTime = waiting time (in seconds) for the new message from the queue
sleep = assigns waiting time (in seconds) for entering the sleep mode before checking
the new available messages in the queue. The default value is null (1 second).
-->
</config>
queue_topology.xml
After declaring the exchange with the name od.add.to.cart.exchange, we need to connect the topic od.add.to.cart and queue od.add.to.cart.queue.
<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/topology.xsd">
<exchange name="od.add.to.cart.exchange" type="topic" connection="amqp">
<binding
id="TestRMQBinding"
topic="od.add.to.cart"
destinationType="queue"
destination="od.add.to.cart.queue"/>
</exchange>
</config>
queue_publisher.xml
Declaring the publisher and connecting it to the topic and the exchange
<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/publisher.xsd">
<publisher topic="od.add.to.cart">
<connection name="amqp" exchange="od.add.to.cart.exchange" />
</publisher>
</config>
More detailed description of xml you can check here
It is ready. Now, we can move to creating classes.
Let’s describe the consumer:
<?php
namespace Overdose\RabbitMQ\Model\Queue\Handler;
use Magento\Quote\Api\Data\CartInterface;
use Psr\Log\LoggerInterface;
/**
* class TestRMQHandler
*/
class TestRMQHandler
{
/**
* @var LoggerInterface
*/
private $logger;
/**
* @param LoggerInterface $logger
*/
public function __construct(
LoggerInterface $logger
) {
$this->logger = $logger;
}
/**
* @param CartInterface $cart
*/
public function execute(CartInterface $cart)
{
$this->logger->info('Added to cart, Customer Tax Id:' . $cart->getCustomerTaxClassId());
}
}
The publisher description:
<?php
namespace Overdose\RabbitMQ\Model\Queue;
use Magento\Framework\MessageQueue\PublisherInterface;
use Magento\Quote\Api\Data\CartInterface;
/**
* class AddToCartPublisher
*/
class AddToCartPublisher
{
/**
* @TOPIC_NAME
*/
const TOPIC_NAME = 'od.add.to.cart';
/**
* @var PublisherInterface
*/
private $publisher;
/**
* @param PublisherInterface $publisher
*/
public function __construct(PublisherInterface $publisher)
{
$this->publisher = $publisher;
}
/**
* @param CartInterface $cart
*/
public function execute(CartInterface $cart)
{
$this->publisher->publish(self::TOPIC_NAME, $cart);
}
}
Now, it is time to add to the queue. For simplicity, let’s add Observer to the event checkout_cart_product_add_after
Overdose/RabbitMQ/etc/events.xml
<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework:Event/etc/events.xsd">
<event name="checkout_cart_product_add_after">
<observer
name="overdose_rabbit_mq_observer_test"
instance="Overdose\RabbitMQ\Observer\AddToCartObserver" />
</event>
</config>
Overdose/RabbitMQ/Observer/AddToCartObserver.php
<?php
namespace Overdose\RabbitMQ\Observer;
use Magento\Framework\Event\Observer;
use Magento\Framework\Event\ObserverInterface;
use Magento\Quote\Api\Data\CartInterface;
use Overdose\RabbitMQ\Model\Queue\AddToCartPublisher;
/**
* @AddToCartObserver
*/
class AddToCartObserver implements ObserverInterface
{
/**
* @var AddToCartPublisher
*/
private $publisher;
/**
* @var CartInterface
*/
private $cart;
/**
* @param AddToCartPublisher $publisher
* @param CartInterface $cart
*/
public function __construct(
AddToCartPublisher $publisher,
CartInterface $cart
) {
$this->publisher = $publisher;
$this->cart = $cart;
}
/**
* @param Observer $observer
*/
public function execute(Observer $observer)
{
// упустим все формальности проверок и т.д
$this->publisher->execute($this->cart);
}
}
Launch bin/magento setup:upgrade
Using Management Plugin we can see our queue od.add.to.cart.queue
Adding item to the cart and entering the web address http://127.0.0.1:15672/
We successfully added our object to the queue
Now, we need to possess it:
The consumer has to receive the message from the queue and process it (write it to log)
Launching our handler
bin/magento queue:consumers:start OverdoseAddToCart
After opening the logs, we see the result of the processing:
main.INFO: Added to cart, Customer Tax Id:3 [] []
That’s it. We have learned how to implement the queues with RabbitMQ.