суббота, 1 ноября 2014 г.

06. ZeroMQ: Проблема динамического обнаружения. Создание брокеров/прокси.

(Начало здесь)

Проблема динамического обнаружения


Вопрос сродни вот этому: Как получить список доступных MS SQL серверов?


Одной из проблем, возникающих при проектировании крупных распределенных систем является обнаружение сервисов. Частный случай проблемы - "как клиент узнает, к какому серверу нужно коннектиться?" А в общем случае - "как элементам сети найти друг друга?"

Особенно трудно в случае, когда разные элементы подключаются и отключаются, из разных узлов сети. Поэтому это называется "проблемой динамического обнаружения".

Эта проблема решается везде по-разному: используется служба DNS, или широковещательные сообщения (UDP) и т.п.

Есть несколько простых решений проблемы динамического обнаружения. Можно жестко закодировать адрес(ip + порт) (т,е., "конечную точку"). Вообще никаких проблем - и никакой гибкости. Впрочем, некоторой гибкости для tcp транспорта можно добавить с помощью службы DNS.
Можно конечную току задавать с помощью конфигурационных файлов (и т.д.). Главное - не забывать их вовремя обновлять .
Можно использовать специальный брокер адресации, который передаст вам нужные данные. Только вот адрес этого брокера должен быть известен...
Можно построить средство, исследующее окружение, сканирующее известные диапазоны адресов и порты. Или рассылающее (получающее) udp - пакеты об адресной информации.

...
~~~~~~~~~~~~~~~~~~~~~~~~~~~
Можно при запуске нового элемента сети вручную задавать конечную точку ("адрес сервера"). То есть, при подключении вы просто знаете конфигурацию сети и сообщаете нужные данные новому элементу сети. Так часто делают, но в реальности это приводит к громоздким и хрупким топологиям.

Вспомним пример "Издатель-Подписчик".

Пусть есть один издатель и тысяча подписчиков. Каждый подписчик коннектится к одному издателю. Можно руками конфигурировать подключение каждого подписчика, это просто. Сделали, работает.
Дальше. Подписчик - динамичный (может подключаться из любого места, и включаться когда угодно), а издатель - статичный.
Сделали, работает.
Неожиданно набежали янычары. А вот теперь представим, что появился еще одни издатель. Нужно еще раз настроить каждого подписчика. Всех, 1000 шт. И каждый раз появление нового издателя повлечет за собой увеличение стоимости настройки.

Нужно динамическое обнаружение.

Добавим брокер сообщений. ZMQ не поставляется с брокерами, но позволяет его легко построить.

Теперь Издатель-Подписчик схема будет выглядеть вот так:

Таким образом, со статичным борокером сообщений задача решена.

Можно даже запустить общий брокер для всех типов сообщений, и строить сетевую архитектуру вокруг него.

Т.е., топология "Звезда" работает. Какое-то время.
...пока не возникнут вопросы вроде пропускной способности и расширения/усложнения логики брокера.

АДЪ неминуем.



Решение.

Куда лучше реализовать брокер не в качестве транспорта сообщений, а в качестве поставщика адресной информации.

Для этого следует использовать сокеты ZMQ типа XPUB и XSUB, так как с ними ZMQ не пересылает сообщения от издателя к подписчику напрямую.
Сокеты XSUB и XPUB - точно такие же, как и сокеты типа SUB и PUB, за исключением того, что они обрабатывают подписки в форме специальных сообщений. А при подключении SUB и PUB сокетов к XPUB и XSUB сокетам первые связываются друг с другом уже по известным адресам.

То есть, основной поток данных идет, минуя брокер:



PS: с сокетами XPUB/XSUB я пока не работал. Возможно, все не так, как здесь описано, так как я просто плохо понял документацию на английском. Но я обезательно разберусь и отпишусь здесь.


....
....
....
Шаблон "Разделяемая очередь".

Потихоньку перейдем к сокетам DEALER и ROUTER.
...

В реальности может потребоваться, чтобы множество клиентов могли подключаться к разным сервисам (например, для распределения нагрузки по сервисам).

Для реализации коннектов "много:много" есть два пути:
- каждый клиентский сокет может коннектиться ко множеству сервисных конечных точек. То есть, один клиентский сокет типа (REQ) коннектится к сервисным сокетам с известными адресами. После этого запросы будут распределяться между сервисами.

Например, коннектим клинтский сокет к трем конечным точкам ("сервисам") :A, B, и C.
Вспомним этот пример, поясняющий схему "Запрос - Ответ" (REQ - REP). Чтобы этот клиент подключился к трем сервисам (например, по tpc на localhost, на трех разных портах), сокет следует приконнектить ко всем трем конечным точкам:

  zmq_connect(requester, 'tcp://localhost:5555');
  zmq_connect(requester, 'tcp://localhost:5556');
  zmq_connect(requester, 'tcp://localhost:5557'); 



Клиент последовательно выполняет запросы R1, R2, R3, R4. В результате запросы R1 и R4 отправляются к сервису A, R2 - к B, R3 - к C. Циклически распределяя запросы (наш любимый roud-robin). Такая конструкция позволяет добавлять без проблем добавлять сколько угодно клиентов.
Как показано выше, с помощью zmq_connect() можно добавлять сколько угодно сервисов. Беда в том, что клиент должен знать, где находится новый сервис. Если клиентов - 100, и в течении скажем, суток, добавляется всего три новых сервиса, то нужно в итоге триста раз переконфигурировать всех клиентов.
Что грустно.

В идеале, мы должны быть в состоянии добавлять и удалять сервисы или клиентов в любое время, не касаясь любой другой части топологии.
...

Для реализации коннектов "много:много" есть два пути:
  1. Каждый клиентский сокет может коннектиться ко множеству сервисных конечных точек. То есть, один клиентский сокет типа (REQ) коннектится к сервисным сокетам с известными адресами. После этого запросы будут распределяться между сервисами.
  2. Второй путь - использование брокера запросов как промежуточного слоя.
...
Напишем крошечный брокер запросов, реализующий желаемую гибкость топологии.
Брокер соединит две конечные точки - фронтенд (сокет стороны клиентов) и бэкэнд (сокет стороны сервисов).
Затем брокер, используя zmq_poll(), будет отслеживать активность этих сокетов, и перебрасывать сообщения от одного сокета к другому. При чем, в ручном управлении очередности использования сервисов нет необходимости, т.к. ZeroMQ делает это автоматически для каждого сокета.
Когда мы построили приложение по схеме "Запрос - Ответ (Req_Rep)", система получилась с синхронным диалогом обмена. Клиент шлет запрос. Сервис читает запрос и шлет ответ. Клиент читает ответ. Если клиент или сервис будут выполнять что-то другое (например, клиент пошлет два запроса подряд без ожидания ответа), система просто перестанет работать.
...
Конечно, раз теперь мы умеем пользоваться zmq_poll(), мы можем сделать брокер неблокирующим.
Но мы пойдем другим путем, и вообще не станем использовать сокеты типа REP и REQ.

Так вот, есть схема использования пар сокетов, которые реализуют схему "Посредник - Маршрутизатор"; режимы сокетов соответственно называются DEALER и ROUTER. Они позволяют получить неблокирующий режим для схемы "Запрос - Ответ".
В нашей схеме "Запрос - Ответ" сокет REQ будет "говорить" с сокетом ROUTER, а сокет DEALER - с сокетом REP.
Сокеты DEALER и ROUTER будет как раз размещаться на нашем брокере, а передачу сообщений между ними мы обеспечим с помощью кода. Будем извлекать сообщение из одного сокета и передавать его другому сокету.

Наш брокер схемы "Запрос - Ответ" привязывается к двум конечным точкам: одна для коннектов к ней клиентов(фронтэнд сокет), вторая - для коннектов сервисов(бэкэнд сокет).


Задача та же, что и в первом примере: возведение в квадрат целых чисел. Клиент отсылает запросы, сервис (сервер) - выполняет. Запрос - беззнаковое x32 целое число (UInt32) , ответ - квадрат беззнаковое x64 целого (UInt64).

Вот наш немудрёный REQ - клиент :

program BrRR_Client;

{$APPTYPE CONSOLE}
// Клиент
// Коннектится сокетом REQ к tcp://localhost:5559
// Шлет целое число сервису (серверу), обратно получает квадрат числа

uses
  SysUtils, ZMQ_h;
const
  c_ReqCnt = 100;
var
  fContext: Pointer;
  fSocketRequester: Pointer;
  fSrcVal: Cardinal;
  fResultVal: UInt64;
  i: integer;
begin

  fContext := zmq_ctx_new(); // Инициализация
  fSocketRequester := zmq_socket(fContext, ZMQ_REQ);
  zmq_connect(fSocketRequester, 'tcp://localhost:5559'); // Коннект к сервису
  Randomize();
  for i := 0 to Pred(c_ReqCnt) do begin
    fSrcVal := Cardinal(Random(-1)); // Генерация случайного целого
    zmq_send(fSocketRequester, @fSrcVal, SizeOf(fSrcVal), 0); // Запрос
    Write('Iter: ', i, ' src=', fSrcVal);
    zmq_recv(fSocketRequester, @fResultVal, SizeOf(fResultVal), 0); // Ответ
    Writeln(' result=', fResultVal);
  end;
  zmq_close(fSocketRequester);
  zmq_ctx_destroy(fContext);
  Readln;

end.


А вот - REP-сервер (сервис):

program BrRR_Service;

{$APPTYPE CONSOLE}
// Сервис (сервер)
// Находится в известной клиентам конечной точке, связывает сокет REP с tcp:*:5560,
// Получает целое число, возводит в квадрат и отправляет обратно результат

uses
  SysUtils, ZMQ_h;
var
  fContext: Pointer;
  fSocketResponder: Pointer;
  fSrcVal: Cardinal;
  fResultVal: UInt64;
  i: integer;
begin

  fContext := zmq_ctx_new(); // Инициализация
  fSocketResponder := zmq_socket(fContext, ZMQ_REP);
  zmq_bind(fSocketResponder, 'tcp://*:5559'); // Привязка к конечной точке

  Writeln('Starting service...');
  i := 0;
  while True do begin
    zmq_recv(fSocketResponder, @fSrcVal, SizeOf(fSrcVal), 0); // Запрос
    Write('Iter: ', i, ' src=', fSrcVal);
    fResultVal := UInt64(fSrcVal) * UInt64(fSrcVal); // "Полезная работа"
    zmq_send(fSocketResponder, @fResultVal, SizeOf(fResultVal), 0); // Ответ
    Writeln(' result=', fResultVal);
    Inc(i);
  end;
  zmq_close(fSocketResponder);
  zmq_ctx_destroy(fContext);
  Readln;

end.


Пока ничего нового не видим: продублирована функциональность схемы "Вопрос - Ответ". Клиент - коннектится к конечной точке - к сервису, сервис находится в этой конечной точке и ждет запросов клиентов. "Конечная точка" - это известный адрес (Например, "tcp://localhost:5560"). То есть, реализована старая схема "Запрос-Ответ"/REQ-REP.


 Будем улучшать мир. Воткнем между ними брокер.

 Брокер.

Что-то новенькое: сокеты DIALER и ROUTER.

  
program BrRR_Broker;

{$APPTYPE CONSOLE}
// Брокер
// Находится в известной клиентам и сервисам конечной точке,
// Находится в известной клиентам конечной точке,
// связывает сокет ZMQ_ROUTER с tcp:*:5559,
// связывает сокет ZMQ_DEALER с tcp:*:5560,
// Перекидывает сообщения между сокетами от FrontEnd к BackEnd
// и обратно

uses
  SysUtils, ZMQ_h, Math;
var
  fContext: Pointer;
  fSocketFrontEnd: Pointer;
  fSocketBackEnd: Pointer;
  fZMQPoll: array[0..1] of zmq_pollitem_t;
  fMsg: zmq_msg_t;
  fDoMore: Boolean;
begin

  fContext := zmq_ctx_new(); // Инициализация
  fSocketFrontEnd := zmq_socket(fContext, ZMQ_ROUTER);
  fSocketBackEnd := zmq_socket(fContext, ZMQ_DEALER);
  zmq_bind(fSocketFrontEnd, 'tcp://*:5559');
    // Конечная точка для клиентов
  zmq_bind(fSocketBackEnd, 'tcp://*:5560');
    // Кончная точка для сервисов
  fZMQPoll[0].socket := fSocketFrontEnd;
    // Инициализация пула сокетов
  fZMQPoll[0].fd := 0;
  fZMQPoll[0].events := ZMQ_POLLIN;
  fZMQPoll[0].revents := 0;
  fZMQPoll[1].socket := fSocketBackEnd;
  fZMQPoll[1].fd := 0;
  fZMQPoll[1].events := ZMQ_POLLIN;
  fZMQPoll[1].revents := 0;
  while true do begin
    zmq_poll(@fZMQPoll[0], Length(fZMQPoll), -1);
      // Проверка состояния сокетов из пула
    if (fZMQPoll[0].revents and ZMQ_POLLIN) <> 0 then
      while True do
      begin // Трансляция сообщний от клиента к сервису
      // Обработка всх частей сообщения
        zmq_msg_init(@fMsg);
        zmq_msg_recv(@fMsg, fSocketFrontEnd, 0);
        fDoMore := zmq_msg_more(@fMSG) <> 0;
        zmq_msg_send(@fMsg, fSocketBackEnd, IfThen(fDoMore, ZMQ_SNDMORE, 0));
        zmq_msg_close(@fMsg);
        if not fDoMore then
          Break; // Это была последняя часть сообщения
      end;
    if (fZMQPoll[1].revents and ZMQ_POLLIN) <> 0 then
      while True do
        // Трансляция сообщний от сервиса к клиенту
      begin // Обработка всх частей сообщения
        zmq_msg_init(@fMsg);
        zmq_msg_recv(@fMsg, fSocketBackEnd, 0);
        fDoMore := zmq_msg_more(@fMSG) <> 0;
        zmq_msg_send(@fMsg, fSocketFrontEnd, IfThen(fDoMore, ZMQ_SNDMORE, 0));
        zmq_msg_close(@fMsg);
        if not fDoMore then
          Break; // Это была последняя часть сообщения
      end;
  end;
  zmq_close(fSocketFrontEnd);
  zmq_close(fSocketBackEnd);
  zmq_ctx_destroy(fContext);
  Readln;

end.


Не работает система. :(
Еще бы.

Чтобы все заработало, нужно у сервиса заменить биндинг на коннект:
  


//  zmq_bind(fSocketResponder, 'tcp://*:5559'); // Привязка к конечной точке
  zmq_connect (fSocketResponder, 'tcp://localhost:5560'); // Коннект к конкретной точке

Теперь все в порядке.
Брокер в асинхронном режиме слушает пул из сокетов с помощью zmq_poll(), затем читает (возможно, по частям) сообщение и транслирует его в выходной сокет. В обе стороны.
Чтение по частям реализовано "для общности". Чтобы работало с любыми сообщениями.
Интересно, что наши крошечные сообщения
  

     zmq_send(fSocketRequester, fSrcVal, SizeOf(fSrcVal), 0); // Запрос
...
     zmq_recv(fSocketRequester, fResultVal, SizeOf(fResultVal), 0); // Ответ

и
  

    zmq_recv(fSocketResponder, fSrcVal, SizeOf(fSrcVal), 0); // Запрос
...
    zmq_send(fSocketResponder, fResultVal, SizeOf(fResultVal), 0); // Ответ

порождают на стороне брокера каскад из нескольких составных сообщений из структур zmq_msg_t, что хорошо видно в отладчике.
Теперь можно запускать сколько хочешь сервисов и сколько хочешь клиентов - все они будут общаться через брокер.
Такой брокер для схемы "Запрос- Ответ" существенно облегчает обслуживание сети , так как клиентам нет нужды знать, где размещены сервисы, а сервисам нет нужны знать, где размещены клиенты. Единственной статической точкой является брокер.

Жизнь налаживается. :)

Теперь  топология сети выглядит так:




 Приведенный выше код трансляции сообщений представляется очень полезным для многих случаев для схемы "Запрос - Ответ".

Так вот, ZeroMQ есть даже специальный метод, реализующий все, что мы нако'дли в брокере.
Важно!:

См. ниже: "Та-да-мммм!"
  


function zmq_proxy( frontend, backend, capture: Pointer ): Integer; 


frontend - сокет фронтэнд
backend - сокет бэкэнд
capture - сокет для перехвата сообщений (nil, если не используется)
Технически нет никакой разницы между фронтэнд и бэкэнд сокетами.


Та-да-мммм!:
  

program BrRR_BrokerIns;

{$APPTYPE CONSOLE}
// Брокер
// Находится в известной клиентам и сервисам конечной точке,
// Находится в известной клиентам конечной точке,
// связывает сокет ZMQ_ROUTER с tcp:*:5559,
// связывает сокет ZMQ_DEALER с tcp:*:5560,
// Перекидывает сообщения между сокетами от FrontEnd к BackEnd
// и обратно

uses
  SysUtils, ZMQ_h, Math;
var
  fContext: Pointer;
  fSocketFrontEnd: Pointer;
  fSocketBackEnd: Pointer;
begin

  fContext := zmq_ctx_new(); // Инициализация
  fSocketFrontEnd := zmq_socket(fContext, ZMQ_ROUTER);
  fSocketBackEnd := zmq_socket(fContext, ZMQ_DEALER);
  zmq_bind(fSocketFrontEnd, 'tcp://*:5559');
    // Конечная точка для клиентов
  zmq_bind(fSocketBackEnd, 'tcp://*:5560');
    // Кончная точка для сервисов

  zmq_proxy(fSocketFrontEnd, fSocketBackEnd, nil); // Старт прокси

  zmq_close(fSocketFrontEnd);
  zmq_close(fSocketBackEnd);
  zmq_ctx_destroy(fContext);
  Readln;

end.


Правда, классно? :) Картинка с другого сайта.

Важно.
Видов сокетов, которые практически можно использовать в брокере:

ROUTER - DEALER
XSUB   - XPUB
PULL   - PUSH.

 Мосты.


Вопрос: "как передать сообщения из одной подсети в другую?"
Или даже - из сети с протоколом tcp в сеть pgm.

Вариант решения - с помощью моста.

В качестве моста используем только что рассмотренный прокси (брокер сообщений).
То есть, "мост" - это маленькое приложение, которое общается одним сокетом по одному протоколу, а другим - по другому.
Ну и преобразовывает сообщения в подходящий для протокола вид, если нужно.

В качестве примера напишем крошечный прокси, который, находясь между издателем и множеством подписчиков, соединяет две разные сети.

Вернемся к примеру с метеостанцией.
Предположим, что сервер-издатель (которые измеряет температуру, давление и проч) работает во внутренней сети, часть подписчиков - тоже во внутренней. А еще часть - во внешней.
Создаем прокси, в которой фронтэнд сокет (SUB) будет общаться с внутренней сетью, а бэкэнд сокет (PUB) - с внешней.
Прокси будет подписываться фронтэнд-сокетом на события сервиса погоды и пере-публиковывать их на бэкэнд-сокете.

Прокси для Метео: "Мост В Интернет" :

  


program BrRR_BrokerMeteoInterNet;

{$APPTYPE CONSOLE}
// Брокер для проекта "Метео"
// Реализует мост между интрасаетью метеосервера
// и внешней сетью. Размещается во внутренней сети,
// для брокера открыт "наружу" tcp порт 8100.
// "Внешние" клиенты коннектятся к известной конечной точке tcp://10.1.1.0:8100
// Коннектит сокет ZMQ_XSUB с известной конечной точкой метеосервера tcp://192.168.55.210:5556
// Связывает сокет ZMQ_XPUB с tcp://10.1.1.0:8100,
// Перекидывает сообщения подписки между сокетами от FrontEnd к BackEnd
// и обратно

uses
  SysUtils,
  ZMQ_h;

var
  fContext: Pointer;
  fSocketFrontEnd: Pointer;
  fSocketBackEnd: Pointer;
begin

  fContext := zmq_ctx_new(); // Инициализация
  fSocketFrontEnd := zmq_socket(fContext, ZMQ_XSUB);
  fSocketBackEnd := zmq_socket(fContext, ZMQ_XPUB);
  zmq_connect(fSocketFrontEnd, 'tcp://192.168.55.210:5556'); // Конечная точка метеосервиса
  zmq_bind(fSocketBackEnd, 'tcp://10.1.1.0:8100'); // Конечная точка для "внешних" подписчиков
//  zmq_bind(fSocketBackEnd, 'tcp://*:8100'); // Можно и так, главное - чтобы внешние клиенты знали адрес

  zmq_proxy (fSocketFrontEnd, fSocketBackEnd, nil); // Старт прокси

  zmq_close(fSocketFrontEnd);
  zmq_close(fSocketBackEnd);
  zmq_ctx_destroy(fContext);
  Readln;

end.


Очень похоже на код предыдущего прокси, но используется для трансляции сообщений из одной подсети в другую. Точно так же подобный прокси - мост можно использовать, например, для подключения подписчиков в мультикаст PGM сети с издателем в TCP.
...
Теперь поговорим об обработке ошибок. (Продолжение).

Комментариев нет :

Отправить комментарий