magento rabbit

Message Queuing with RabbitMQ: How It Works?

Magento 2 supports message queuing with MySql и RabbitMQ. In the example below, we will use Magento 2 and RabbitMQ.

Let’s examine how queueing with RabbitMQ can be integrated in Magento 2.

What is message queuing?

Before we dive into the explanation of RabbitMQ, we need to understand what message queuing is and how it works.

Message queueing allows different apps, which do not have the built-in integration, to connect and share the information.

Message queuing consists of producer (1), broker (2) (message queuing software), and consumer (3)

Producers — the client applications which compose messages and deliver them to the broker. The broker saves messages and waits until the consumer will connect to him and receive the messages.

Illustration of the general scheme:

What is RabbitMQ?

RabbitMQ is popular software with open-source code for the creation of message queues. According to the official website, RabbitMQ is lightweight and easy to deploy locally and in the cloud. It supports multiple API protocols like AMQP, STOMP, MQTT, and HTTP. Besides, RabbitMQ supports a variety of common programming languages and can operate in different cloud environments and operating systems. Isn’t that awesome?!

The advantages of using RabbitMQ

RabbitMQ is popular because it has the following advantages: 

  • Confirmation of delivery and other confirmations increase the security of the message queuing by reducing the possibility of messages loss.
  • Flexible routing that enables the delivery of particular messages in the particular queues and to the specific consumers.
  • Multiple types of exchanges that allow directing the messages to the consumers with different methods.
  • Easy to deploy on corporate and public cloud networks due to its being lightweight.

However, RabbitMQ users mentioned such drawbacks:

  • Some aspects are too technical and might be very difficult for the new users.
  • Inability to see queues and messages in queues.
  • Vague error messages that obstruct the diagnostics of the malfunction.

Setting up Magento 2 and RabbitMQ

Now that we know more about queue and RabbitMQ, let’s examine their integration in Magento 2.

You should already have RabbitMQ installed, properly set up, and working. The guide on the service installation you can find here or here.

Ex. (Ubuntu) : sudo apt install -y rabbitmq-server

We need to additionally install Management Plugin which would assist in monitoring the queues in the web interface.  

Ex.(enable plugin):  sudo rabbitmq-plugins enable rabbitmq_management

After plugin activation, enter the web address http://127.0.0.1:15672/ and fill in the username and password fields guest/guest (by default).

Upon entering the system, you will see the following interface:

If everything is successful at this point, let’s move to the next steps.

Connecting RabbitMQ to Magento 2 would be possible either before or after the installation of Magento 2.

Option 1 (adding settings during the installation of Magento 2)

Add the following command line configurations during the installation:

--amqp-host="<hostname>" --amqp-port="5672" --amqp-user="<user_name>" --amqp-password="<password>" --amqp-virtualhost="/"

Where:

  • –amqp-host порт (localhost)
  • –amqp-port по дефолту 5672
  • –amqp-user по дефолту quest
  • –amqp-password по дефолту quest

Option 2. If you already have installed Magento 2, add queue section in the
<install_directory>/app/etc/env.php

'queue' =>
  array (
    'amqp' =>
    array (
      'host' => 'rabbitmq.example.com',
      'port' => '11213',
      'user' => 'magento',
      'password' => 'magento',
      'virtualhost' => '/'
     ),
  ),

With the SSL support:

ueue' =>
  array (
    'amqp' =>
    array (
      'host' => 'rabbitmq.example.com',
      'port' => '11213',
      'user' => 'magento',
      'password' => 'magento',
      'virtualhost' => '/',
// start ssl configuration
      'ssl' => 'true',
      'ssl_options' => [
            'cafile' =>  '/etc/pki/tls/certs/DigiCertCA.crt',
            'certfile' => '/path/to/magento/app/etc/ssl/test-rabbit.crt',
            'keyfile' => '/path/to/magento/app/etc/ssl/test-rabbit.key'
       ],
     ),
  ),

Implementation

Let’s create a simple module Overdose_RabbitMQ

The primary goal of this module is to send a message to the queue when adding an item to the cart. Then, after listening to the current queue, we will receive the message and will write it to logs.

Let’s go:

Add configurations and determine entities for the work with RMQ.  

Message Queuing requires four xml files in <vendor>/<module>/etc:

In these files, we will determine stock, subject, queue, publisher, and consumer. 

  • communication.xml — stores general configuration for the connection of module and queue.
  • queue_consumer.xml — defines the relationship between the existing queue and its consumer; also, class and method which will be used to process the messages from the queue.
  • queue_topology.xml — defines rules for message routing and declares queues.
  • queue_publisher.xml — defines an exchange that will be used to publish a topic.

communication.xml

<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:noNamespaceSchemaLocation="urn:magento:framework:Communication/etc/communication.xsd">
   <topic
       name="od.add.to.cart"
       request="Magento\Quote\Api\Data\CartInterface"/>
   <!--
       name = od.add.to.cart unique value divided by period
       request = defines the type of data topic (in this case CartInterface )
   -->
</config>

queue_consumer.xml

Adding the consumers and assigning the queue which they will track.

<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/consumer.xsd">
   <consumer name="OverdoseAddToCart"
             queue="od.add.to.cart.queue"
             connection="amqp"
             handler="Overdose\RabbitMQ\Model\Queue\Handler\TestRMQHandler::execute"/>
   <!--
          consumer :
              name = unique value
              queue = determines the name of the queue for sending a message
              connection = for AMQP, the name of the connection has to correspond to connection attribute
                          in queue_topology.xml. Otherwise, the name of the connection should be db.
              handler = assigns class and method which will be used for processing the messages.
              Additional configurations
              maxMessages = determines maximum allowed messages in the use
              maxIdleTime = waiting time (in seconds) for the new message from the queue
              sleep = assigns waiting time (in seconds) for entering the sleep mode before checking 
                      the new available messages in the queue. The default value is null (1 second).
      -->
</config>

queue_topology.xml

After declaring the exchange with the name od.add.to.cart.exchange, we need to connect the topic od.add.to.cart and queue od.add.to.cart.queue.

<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/topology.xsd">
   <exchange name="od.add.to.cart.exchange" type="topic" connection="amqp">
       <binding
           id="TestRMQBinding"
           topic="od.add.to.cart"
           destinationType="queue"
           destination="od.add.to.cart.queue"/>
   </exchange>
</config>

queue_publisher.xml

Declaring the publisher and connecting it to the topic and the exchange

<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/publisher.xsd">
   <publisher topic="od.add.to.cart">
       <connection name="amqp" exchange="od.add.to.cart.exchange" />
   </publisher>
</config>

More detailed description of xml you can check here

It is ready. Now, we can move to creating classes. 

Let’s describe the consumer:

<?php
 
namespace Overdose\RabbitMQ\Model\Queue\Handler;
 
use Magento\Quote\Api\Data\CartInterface;
use Psr\Log\LoggerInterface;
 
/**
* class TestRMQHandler
*/
class TestRMQHandler
{
   /**
    * @var LoggerInterface
    */
   private $logger;
 
   /**
    * @param LoggerInterface $logger
    */
   public function __construct(
       LoggerInterface $logger
   ) {
       $this->logger = $logger;
   }
 
   /**
    * @param CartInterface $cart
    */
   public function execute(CartInterface $cart)
   {
       $this->logger->info('Added to cart, Customer Tax Id:' . $cart->getCustomerTaxClassId());
 
   }
}

The publisher description:

<?php
 
namespace Overdose\RabbitMQ\Model\Queue;
 
use Magento\Framework\MessageQueue\PublisherInterface;
use Magento\Quote\Api\Data\CartInterface;
 
/**
* class AddToCartPublisher
*/
class AddToCartPublisher
{
   /**
    * @TOPIC_NAME
    */
   const TOPIC_NAME = 'od.add.to.cart';
 
   /**
    * @var PublisherInterface
    */
   private $publisher;
 
   /**
    * @param PublisherInterface $publisher
    */
   public function __construct(PublisherInterface $publisher)
   {
       $this->publisher = $publisher;
   }
 
 
   /**
    * @param CartInterface $cart
    */
   public function execute(CartInterface $cart)
   {
       $this->publisher->publish(self::TOPIC_NAME, $cart);
   }
}

Now, it is time to add to the queue. For simplicity, let’s add Observer to the event checkout_cart_product_add_after

Overdose/RabbitMQ/etc/events.xml

<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework:Event/etc/events.xsd">
   <event name="checkout_cart_product_add_after">
       <observer
           name="overdose_rabbit_mq_observer_test"
           instance="Overdose\RabbitMQ\Observer\AddToCartObserver" />
   </event>
</config>

Overdose/RabbitMQ/Observer/AddToCartObserver.php

<?php
namespace Overdose\RabbitMQ\Observer;
 
use Magento\Framework\Event\Observer;
use Magento\Framework\Event\ObserverInterface;
use Magento\Quote\Api\Data\CartInterface;
use Overdose\RabbitMQ\Model\Queue\AddToCartPublisher;
 
/**
* @AddToCartObserver
*/
class AddToCartObserver implements ObserverInterface
{
   /**
    * @var AddToCartPublisher
    */
   private $publisher;
 
   /**
    * @var CartInterface
    */
   private $cart;
 
   /**
    * @param AddToCartPublisher $publisher
    * @param CartInterface $cart
    */
   public function __construct(
       AddToCartPublisher $publisher,
       CartInterface $cart
   ) {
       $this->publisher = $publisher;
       $this->cart = $cart;
   }
 
   /**
    * @param Observer $observer
    */
   public function execute(Observer $observer)
   {
       // упустим все формальности проверок и т.д
       $this->publisher->execute($this->cart);
   }
}

Launch bin/magento setup:upgrade

Using Management Plugin we can see our queue od.add.to.cart.queue

Adding item to the cart and entering the web address http://127.0.0.1:15672/

We successfully added our object to the queue

Now, we need to possess it:

The consumer has to receive the message from the queue and process it (write it to log)

Launching our handler

bin/magento queue:consumers:start OverdoseAddToCart

After opening the logs, we see the result of the processing:

main.INFO: Added to cart, Customer Tax Id:3 [] []

That’s it. We have learned how to implement the queues with RabbitMQ.

Related Articles

Subscribe
Notify of
0 Comments
Inline Feedbacks
View all comments