Обмен сообщениями по протоколу AMQP

Темы, не касающиеся фреймворка, но относящиеся к программированию в целом.
Ответить
pioneer
Сообщения: 136
Зарегистрирован: 2013.03.10, 23:27

Обмен сообщениями по протоколу AMQP

Сообщение pioneer »

Всем привет!

Собственно, задача такова: "подружить" 2 сайта посредством протокола AMQP и клиента RabbitMQ в частности, конкретнее - для передачи новых данных (будем передавать сериализованные атрибуты модели) с одного сайта на другой, на котором будет производиться анализ и модерация этих данных с последующим принятием или отклонением (с оповещением первого сайта). То есть проще говоря "общение" между сайтами должно быть в двустороннем порядке.

Итак, ближе к делу. Все настройки по установке и инициализации RabbitMQ я выполнил (спасибо этой статье), убедившись в работоспособности сервера (на тестовом примере с передачей сообщения 'Hello. buddy!'). Однако на практике столкнулся со следующей проблемой: в afterSave() модели (атрибуты которой нужно отправлять) прописал функцию, которая будет отвечать за отправку данных на "модерирующий" сайт, на сайте-получателе создал команду наследующую CConsoleCommand, по исполнении которой и должен произойти прием нужных данных с сохранением, однако консоль "плюет" вот это: exception 'AMQPQueueException' with message 'Server channel error: 404, message: NOT_FOUND - no exchange 'amq.product' in vhost '/'' in...

Теперь по коду. В модели откуда отправляются данные:

Код: Выделить всё

public function afterSave()
{
    $this->sendRabbit();
}
public function sendRabbit()
{
    $data['model_name'] = get_class($this);
    $data['model_id'] = $this->id;
    $data['data'] = $this->attributes;

    $rabbit = new RabbitSender();
    if ($rabbit->send('amq.product', 'send_product_data', serialize($data))) {
        return true;
    }

    return false;

}
И тут же вот такой незамысловатый компонент для отправки сообщений:

Код: Выделить всё

class RabbitSender
{
    public $connection;

    function __construct()
    {
        $this->connection = new AMQPConnection(Yii::app()->params['amqp_connection']);
        $this->connection->connect();
    }

    public function send($exchange_name, $routing_key, $message)
    {
        $channel = new AMQPChannel($this->connection);
        $exchange = new AMQPExchange($channel);

        $exchange->setName($exchange_name);
        if ($exchange->publish($message, $routing_key)) {
            $result = true;
        } else {
            $result = false;
        }

        $this->connection->disconnect();

        return $result;
    }
}
Кстати, попутно вопрос (из документации если честно так и не понял этот момент): на сайтах нужно обязательно указывать одинаковые настройки подключения к AMQP или для каждого свои? Упомяну также, что пару guest/guest я не использовал (хотя и с ней пробовал - ситуация с исключением та же), поскольку в курсе, что она работает только в пределах localhost.

А вот код команды (которая по задумке в будущем будет исполняться по cron'у):

Код: Выделить всё

class RabbitCommand extends CConsoleCommand
{
    public function run($args)
    {
        $this->checkProductData();
    }

    public function checkProductData()
    {
        $rabbit = new RabbitReceiver();
        if ($message = $rabbit->receive('receive_product_data', 'amq.product', 'send_product_data')) {
            $data = unserialize($message);

            $product = new Product();
            $product->attributes = $data['data'];

            if ($product->save()) {
                return 0;
            }
        }
    }
}
И вот такой компонент для принятия сообщения:

Код: Выделить всё

class RabbitReceiver
{
    public $connection;

    function __construct()
    {
        $this->connection = new AMQPConnection(Yii::app()->params['amqp_connection']);
        $this->connection->connect();
    }

    public function receive($queue_name, $exchange_name, $routing_key)
    {
        $result = false;

        $channel = new AMQPChannel($this->connection);

        $queue = new AMQPQueue($channel);
        $queue->setName($queue_name);
        $queue->declare();
        $queue->bind($exchange_name, $routing_key);

        $envelope = $queue->get();
        if ($envelope && $queue->ack($envelope->getDeliveryTag())) {
            $result = $envelope;
        }

        $this->connection->disconnect();

        return $result;
    }
}
По факту exchange_name и routing_key в обоих сайтах указал одинаковые, однако вот как описал выше - "плюется" вот то самое исключение. В чем может быть причина/проблема?

Буду всем очень признателен за любую помощь и советы. Спасибо.
Ответить