magento rabbit

Как работают очереди с помощью RabbitMQ в Magento 2

Magento 2 поддерживает очереди сообщений на основе MySql и RabbitMQ. В примере, описанном ниже , мы будем использовать Magento 2 и RabbitMQ.

Итак давайте рассмотрим как в Magento 2 интегрируются очереди с помощью RabbitMQ.

Что такое очередь сообщений?

Прежде чем мы сможем углубиться в то, что такое RabbitMQ, нам сначала нужно понять, что такое очередь сообщений и как она работает. 

Очередь сообщений позволяет различным приложениям, не имеющим встроенной интеграции, подключаться и обмениваться информацией.

Очередь сообщений состоит из производителя (1), брокера (2) (программное обеспечение очереди сообщений) и потребителя (3)

Производители – это клиентские приложения, которые создают сообщения и доставляются брокеру. Брокер сохраняет сообщения и ждет, пока потребитель подключится к нему и получит сообщения.

Общая схема выглядит так:

Что такое RabbitMQ?

RabbitMQ – популярное программное обеспечение для создания очередей сообщений с открытым исходным кодом. Согласно его веб-сайту: « RabbitMQ легок и прост в развертывании как локально, так и в облаке ». Он поддерживает несколько протоколов API, таких как AMQP, STOMP, MQTT и HTTP. Кроме того, RabbitMQ поддерживает множество распространенных языков программирования и может работать в различных облачных средах и операционных системах. Круто.

Каковы преимущества использования RabbitMQ?

RabbitMQ популярен, потому что он имеет несколько преимуществ, в том числе:

  • Подтверждения доставки и подтверждения, которые повышают надежность очереди сообщений за счет уменьшения потери сообщений.
  • Гибкая маршрутизация, позволяющая доставлять определенные сообщения в определенные очереди и конкретным потребителям.
  • Множественные типы обмена, которые позволяют разными способами направлять сообщения потребителям.
  • Легко развертывается в корпоративных и общедоступных облаках, потому что он легковесный.

Однако пользователи RabbitMQ отметили следующие недостатки:

  • Некоторые аспекты слишком технические, особенно для новичков.
  • Невозможность просматривать очереди и сообщения в очереди.
  • Расплывчатые сообщения об ошибках, затрудняющие диагностику сбоев.

Настройка Magento 2 и RabbitMQ

Теперь мы знаем что такое очередь и RabbitMQ, давайте перейдем к интеграции в M2.

Условимся, что RabbitMQ у нас установлен, правильно настроен  и конечно же работает. Установку самого сервиса вы можете найти тут или тут

Ex. (Ubuntu) : sudo apt install -y rabbitmq-server

Дополнительно установим плагин Management Plugin он поможет нам мониторить очереди в Web-интерфейсе.

Ex.(enable plugin):  sudo rabbitmq-plugins enable rabbitmq_managementПосле активации плагина перейдите по адресу http://127.0.0.1:15672/ и введите пользователя и пароль, guest/guest (по умолчанию)

После входа в систему вы должны увидеть следующий интерфейс:

У Вас все получилось?) Продолжаем.

Связать RabbitMQ c M2, мы можем как до инсталляции M2 так и после:

Вариант 1 (добавление настроек при инсталляции M2):

Добавьте следующие параметры командной строки при установке

--amqp-host="<hostname>" --amqp-port="5672" --amqp-user="<user_name>" --amqp-password="<password>" --amqp-virtualhost="/"

Где:

  • –amqp-host порт (localhost)
  • –amqp-port по дефолту 5672
  • –amqp-user по дефолту quest
  • –amqp-password по дефолту quest

Вариант 2 .Если у вас уже установлена M2, добавьте queue раздел в
<install_directory>/app/etc/env.php

'queue' =>
  array (
    'amqp' =>
    array (
      'host' => 'rabbitmq.example.com',
      'port' => '11213',
      'user' => 'magento',
      'password' => 'magento',
      'virtualhost' => '/'
     ),
  ),

С поддержкой ssl:

ueue' =>
  array (
    'amqp' =>
    array (
      'host' => 'rabbitmq.example.com',
      'port' => '11213',
      'user' => 'magento',
      'password' => 'magento',
      'virtualhost' => '/',
// start ssl configuration
      'ssl' => 'true',
      'ssl_options' => [
            'cafile' =>  '/etc/pki/tls/certs/DigiCertCA.crt',
            'certfile' => '/path/to/magento/app/etc/ssl/test-rabbit.crt',
            'keyfile' => '/path/to/magento/app/etc/ssl/test-rabbit.key'
       ],
     ),
  ),

Реализация

Создадим простой модуль Overdose_RabbitMQ

Суть которого будет заключаться в том чтобы отправить сообщение в очередь при добавлении продукта в корзину, далее прослушаем данную очередь и получим сообщение и запишем его в логи.

Поехали:

Добавляем конфигурацию и определим сущности для работы с RMQ.

Очередь сообщений требует 4-е xml файла в <vendor>/<module>/etc:

 В этих файлах мы будем определять биржу, тему, очередь, издателя и потребителя

  • communication.xml – в нем храниться общая конфигурация для связи модуля и очереди.
  • queue_consumer.xml – определяет отношения между существующей очередью и ее потребителем,  а также класс и метод, которые будут обрабатывать сообщения из очереди.
  • queue_topology.xml – определяет правила маршрутизации сообщений и объявляет очереди.

queue_publisher.xml – определяет обмен, на котором публикуется topic

communication.xml

<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:noNamespaceSchemaLocation="urn:magento:framework:Communication/etc/communication.xsd">
   <topic
       name="od.add.to.cart"
       request="Magento\Quote\Api\Data\CartInterface"/>
   <!--
       name = od.add.to.cart уникальное значение разделенное точками
       request = Задает тип данных topic (в данном случае CartInterface )
   -->
</config>

queue_consumer.xml

добавляем потребителя, назначаем ему очередь, за которой он будет следить

<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/consumer.xsd">
   <consumer name="OverdoseAddToCart"
             queue="od.add.to.cart.queue"
             connection="amqp"
             handler="Overdose\RabbitMQ\Model\Queue\Handler\TestRMQHandler::execute"/>
   <!--
          consumer :
              name = уникальное значение
              queue = Определяет имя очереди для отправки сообщения
              connection =для AMQP имя подключения должно соответствовать connection атрибуту
                          в queue_topology.xml. В противном случае имя подключения должно быть db.
              handler = Задает класс и метод, обрабатывающий сообщение
              Доп. параметры
              maxMessages = определяет макс. количество сообщений для использования
              maxIdleTime = ожидание в секундах для нового сообщения из очереди
              sleep = Задает время в секундах для перехода в спящий режим
                      перед проверкой наличия нового сообщения в очереди. Значение по умолчанию null(1 сек.)
      -->
</config>

queue_topology.xml

объявляем обменник с именем od.add.to.cart.exchange и привязываем к нему топик od.add.to.cart и очередь od.add.to.cart.queue.

<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/topology.xsd">
   <exchange name="od.add.to.cart.exchange" type="topic" connection="amqp">
       <binding
           id="TestRMQBinding"
           topic="od.add.to.cart"
           destinationType="queue"
           destination="od.add.to.cart.queue"/>
   </exchange>
</config>

queue_publisher.xml

объявляем издателя и привязываем его к топику и обменнику

<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/publisher.xsd">
   <publisher topic="od.add.to.cart">
       <connection name="amqp" exchange="od.add.to.cart.exchange" />
   </publisher>
</config>

Более подробное описание xml Вы можете посмотреть тут

Готово. Теперь переходим к созданию классов.

Опишем потребителя

<?php
 
namespace Overdose\RabbitMQ\Model\Queue\Handler;
 
use Magento\Quote\Api\Data\CartInterface;
use Psr\Log\LoggerInterface;
 
/**
* class TestRMQHandler
*/
class TestRMQHandler
{
   /**
    * @var LoggerInterface
    */
   private $logger;
 
   /**
    * @param LoggerInterface $logger
    */
   public function __construct(
       LoggerInterface $logger
   ) {
       $this->logger = $logger;
   }
 
   /**
    * @param CartInterface $cart
    */
   public function execute(CartInterface $cart)
   {
       $this->logger->info('Added to cart, Customer Tax Id:' . $cart->getCustomerTaxClassId());
 
   }
}

Опишем издателя:

<?php
 
namespace Overdose\RabbitMQ\Model\Queue;
 
use Magento\Framework\MessageQueue\PublisherInterface;
use Magento\Quote\Api\Data\CartInterface;
 
/**
* class AddToCartPublisher
*/
class AddToCartPublisher
{
   /**
    * @TOPIC_NAME
    */
   const TOPIC_NAME = 'od.add.to.cart';
 
   /**
    * @var PublisherInterface
    */
   private $publisher;
 
   /**
    * @param PublisherInterface $publisher
    */
   public function __construct(PublisherInterface $publisher)
   {
       $this->publisher = $publisher;
   }
 
 
   /**
    * @param CartInterface $cart
    */
   public function execute(CartInterface $cart)
   {
       $this->publisher->publish(self::TOPIC_NAME, $cart);
   }
}

Теперь пришло время добавлять в очередьДля простоты, добавим Observer и подвесим его на event checkout_cart_product_add_after

Overdose/RabbitMQ/etc/events.xml

<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework:Event/etc/events.xsd">
   <event name="checkout_cart_product_add_after">
       <observer
           name="overdose_rabbit_mq_observer_test"
           instance="Overdose\RabbitMQ\Observer\AddToCartObserver" />
   </event>
</config>

Overdose/RabbitMQ/Observer/AddToCartObserver.php

<?php
namespace Overdose\RabbitMQ\Observer;
 
use Magento\Framework\Event\Observer;
use Magento\Framework\Event\ObserverInterface;
use Magento\Quote\Api\Data\CartInterface;
use Overdose\RabbitMQ\Model\Queue\AddToCartPublisher;
 
/**
* @AddToCartObserver
*/
class AddToCartObserver implements ObserverInterface
{
   /**
    * @var AddToCartPublisher
    */
   private $publisher;
 
   /**
    * @var CartInterface
    */
   private $cart;
 
   /**
    * @param AddToCartPublisher $publisher
    * @param CartInterface $cart
    */
   public function __construct(
       AddToCartPublisher $publisher,
       CartInterface $cart
   ) {
       $this->publisher = $publisher;
       $this->cart = $cart;
   }
 
   /**
    * @param Observer $observer
    */
   public function execute(Observer $observer)
   {
       // упустим все формальности проверок и т.д
       $this->publisher->execute($this->cart);
   }
}

Запускаем bin/magento setup:upgrade
Используя Management Plugin  можно увидеть нашу очередь od.add.to.cart.queue

Добавляем продукт в корзину и переходим  по адресу http://127.0.0.1:15672/

Мы успешно добавили наш объект в очередь

Теперь обработаем его:

потребитель должен получить сообщение из очереди и обработать его (в нашем случае записать в лог)

Запускаем наш хендлер

bin/magento queue:consumers:start OverdoseAddToCart

Переходим в логи и видим результат обработки) :

main.INFO: Added to cart, Customer Tax Id:3 [] []

На этом Все.

Related Articles