среда, 3 декабря 2014 г.

21.3 ZeroMQ: рабочий пример. Межброкерная маршрутизация. Прототипирование локального и облачного потоков данных.


(Начало - здесь).

Прототипирование локального и облачного потоков.

Теперь займемся прототипированием потока данных локальных и облачных сокетов. Код будет тянуть запросы от клиентов и распределять их между локальными рабочими и облачными партнерами случайным образом.

Потоки данных задач:


Перед написанием кода (который все больше усложняется) обрисуем основы логики маршрутизации и разобьем её на простые, но надежные элементы.


Нам потребуется две очереди, одна - для запросов от локальных клиентов и вторая - для запросов от облачных клиентов. Как вариант, можно вытягивать сообщения ("заявки") из локальных и облачных фронтэнд сокетов и помещать их в соответствующие очереди. Однако, это отчасти бессмысленно, так как в сокетах ZMQ собщения и так буферизуются в очереди.
Такая техника использовалась в брокере с балансировкой нагрузки, и там она работала достаточно хорошо. Мы всего лишь читаем из двух фронтэндов, когда есть что-то где-то, куда можно отправить запросы. И мы всегда можем читать из бэкэндов, так как они возвращают ответы на запросы. Пока бэкэнда не общаются с нами, на фронтэнды нет даже смысла смотреть.

Таким образом, наш главный цикл станет:
  • Опрашивать бэкэнды на предмет активности. Когда получаем сообщение, это может быть сигнал "готов" от рабочего или это может быть ответом на запрос. Если это ответ - направляем его локальному или облачному фронтэнду.
  • Если отвечает рабочий, он становится доступным, помещаем его в очередь и добавляем его к счетчику доступных рабочих.
  • Если есть доступные рабочие, принимаем запрос - если таковой имеется - от  любого из фронтэндов и перенаправляем этот запрос или локальному рабочему или (случайным образом) партнерам в облаке.

Отправка работы за пределы кластера имитируется рассылкой их случайным образом брокерам - партнерам (а не рабочим). Немного туповато, но для данной стадии  сойдет.
Мы используем идентификаторы брокеров для распределения сообщений между брокерами.  Каждый брокер идентифицируется номером tcp порта, который мы передаем ему как параметр командной строки. Так как эти идентификаторы не совпадают с UUID-дами, генерируемы ZeroMQ для клиентских узлов, мы может использовать их для распределения ответных сообщений клиентам или брокерам.
Так будет работать наш код (самая интересная часть начинается возле комментария "Интересная часть").

program Peering2;

{$APPTYPE CONSOLE}

uses
//  FastMM4,
  SysUtils
  , Classes
  , Windows
  , zmq_h
  , czmq_h
  , zmq_utils
  , math
  ;

// Симуляция брокера партнеров (часть 2)
// Прототипирование потока данных "Запрос - Ответ"
// Запуск:peering свой_порт список_портов_партнеров
//
// Для облачной связи используются порты:
// cloudfe (входные запросы) = 2свой_порт
// cloudbe (запросы к партнерам) = 2чужие_порты

// Для локальной связи используются порты:
// localfe (входные запросы от клиентов) = 1свой_порт
// localbe (запросы к рабочим) = свой_порт


const
  c_NBR_CLIENTS = 10;
  c_NBR_WORKERS = 3;
  c_WORKER_READY: byte = 1; // Сигнал "рабочий готов"

  // Свое собственное имя (порт ввода-вывода).
  // В реальности должно быть задано при конфиргурировнии узла

var
  self: PChar = nil;
  //  d : TThread

function zFormat(const aFormat: string;
  const aArgs: array of const): string;
var
  fFS: TFormatSettings;
begin
  GetLocaleFormatSettings(GetUserDefaultLCID(), fFS);
  Result := Format(aFormat, aArgs, fFS)
end;



function client_task(args: Pointer): Pointer; cdecl;
// Задача клиента ведет диалог "Запрос-Ответ" с помощью стандартнного
// синхронного сокета REQ

var
  client: Pointer;
  ctx: p_zctx_t;
  reply: PChar;
  cli_nmbr: Integer;
begin
  cli_nmbr := Integer(args);
  ctx := zctx_new();
  client := zsocket_new(ctx, ZMQ_REQ);
  zsocket_connect(client, 'tcp://127.0.0.1:%s', self);

  while (true) do begin
    zstr_send(client, PChar(Format('Hello from %s (client %d)', [self, cli_nmbr]))); // Запрос
    reply := zstr_recv(client); // Ответ
    if reply = nil then
      break; // Прерван
    z_Log(Format('Here %s (client %d)', [self, cli_nmbr]));
    zstr_free(reply);
    sleep(1);
  end;
  zctx_destroy(ctx);
  result := nil;
end;



function worker_task(args: Pointer): Pointer; cdecl;
// Задача рабочего подключается в распределитель нагрузки с помощью сокета REQ

var
  ctx: p_zctx_t;
  frame: p_zframe_t;
  msg: p_zmsg_t;
  worker: Pointer;
  wrk_nmbr: Integer;
begin
  wrk_nmbr := Integer(args);
  ctx := zctx_new();
  worker := zsocket_new(ctx, ZMQ_REQ);
  zsocket_connect(worker, 'tcp://127.0.0.1:1%s', self);

  // Сообщить брокеру о готовности к работе
  frame := zframe_new(@c_WORKER_READY, 1);
  zframe_send(frame, worker, 0);

  // Обработка сообщений по мере их получения
  while true do begin
    msg := zmsg_recv(worker);
    if msg = nil then
      break; // Прерван
    zframe_print(zmsg_last(msg), PChar('Worker: ' + IntToStr(wrk_nmbr)));
    zframe_reset(zmsg_last(msg), PChar('OK'), 2);
    Sleep(Random(5) + 1);
    zmsg_send(msg, worker);
  end;
  zctx_destroy(ctx);
  Result := nil;
end;

procedure doMain;
// Основная задача начинается с настройки frontend и backend сокетов
// и запуска задач клиентов и рабочих
var
  backends: array[0..1] of zmq_pollitem_t;
  capacity: Integer;
  cloudbe: Pointer;
  cloudfe: Pointer;
  ctx: p_zctx_t;
  data: PChar;
  frame: p_zframe_t;
  frontends: array[0..1] of zmq_pollitem_t;
  i: Integer;
  identity: p_zframe_t;
  localbe: Pointer;
  localfe: Pointer;
  msg: p_zmsg_t;
  peer: PChar;
  random_peer: Integer;
  rc: Integer;
  reroutable: Boolean;
  size: size_t;
  workers: p_zlist_t;
begin
  // Первый аргумент - это порт брокера
  // Другие аргумент - орты брокеров - партнеров
  //
  if ParamCount < 2 then begin
    z_Log('syntax: peering2 me {you} ...');
    exit;
  end;
  self := PChar(ParamStr(1));
  z_Log('I: preparing broker at ' + self + '...');
  Randomize;

  ctx := zctx_new();

  // Привязывем облачный frontend к конечной точке
  cloudfe := zsocket_new(ctx, ZMQ_ROUTER); // Входящие облачные запросы
  zsocket_set_identity(cloudfe, self);
  zsocket_bind(cloudfe, 'tcp://*:2%s', self);

  // Подключаем облачный backend ко всем партнерам
  cloudbe := zsocket_new(ctx, ZMQ_ROUTER); // Исходящие облачные запросы
  zsocket_set_identity(cloudbe, self);
  for i := 2 to ParamCount() do begin
    peer := PChar(ParamStr(i));
    z_Log('I: connecting to cloud frontend at ' + peer);
    rc := zsocket_connect(cloudbe, 'tcp://127.0.0.1:2%s', peer);
  end;
  // Подготовка локальных frontend и backend
  localfe := zsocket_new(ctx, ZMQ_ROUTER); // Запросы от клиентов
  zsocket_bind(localfe, 'tcp://*:%s', self);
  localbe := zsocket_new(ctx, ZMQ_ROUTER); // Запросы к рабочим
  zsocket_bind(localbe, 'tcp://*:1%s', self);

  // Просим пользователя сообщить, когда можно начинать
  z_Log('Press Enter when all brokers are started: ');
  Readln;

  // Запуск локальных рабочих

  for i := 0 to Pred(c_NBR_WORKERS) do
    zthread_new(@worker_task, Pointer(i));

  // Запуск локальных клиентов
  for i := 0 to Pred(c_NBR_CLIENTS) do
    zthread_new(@client_task, Pointer(i));

  // Обработка потока "запрос-ответ". Используем балансировщик нагрузки.
  // Опрос рабочих постоянно, а клиентов - только при наличии свободных
  // рабочих

  // Очередь для доступа к наиболее редко используемым рабочим
  capacity := 0;
  workers := zlist_new();

  while (true) do begin
    // Сначала опрашиваем и ждем ответов от рабочих (локальных или облачных)
    zPollItemInit(backends[0], localbe, 0, ZMQ_POLLIN, 0);
    zPollItemInit(backends[1], cloudbe, 0, ZMQ_POLLIN, 0);

    // Если свободные рабочие есть, ждем 1 сек, иначе ждем (бесконечно),
    // пока они появятся
    rc := zmq_poll(@backends[0], 2, IfThen(capacity > 0, 1000 * ZMQ_POLL_MSEC, -1));
    if rc = -1 then
      break; // Прерван

    msg := nil;
    if (backends[0].revents and ZMQ_POLLIN) <> 0 then begin
      // Обработка ответа от локального рабочего
      msg := zmsg_recv(localbe);
      if msg = nil then
        break; // Прерван
      identity := zmsg_unwrap(msg);
      zlist_append(workers, identity);
      Inc(capacity);
      Assert(msg <> nil);
      frame := zmsg_first(msg);
      // Если это - сигнал готовности, не распределять сообщение дальше
      if CompareMem(zframe_data(frame), @c_WORKER_READY, 1) then
        zmsg_destroy(msg);
    end
    else if (backends[1].revents and ZMQ_POLLIN) <> 0 then begin
      // Или обрабатываем ответ от партнера - брокера
      msg := zmsg_recv(cloudbe);
      if msg = nil then
        break; // Прерван
      // Идентификатор брокера-партнера не используется никак
      identity := zmsg_unwrap(msg);
      zframe_destroy(identity);
    end;
    // Направить ответ в облако, если он адресован брокеру
    for i := 2 to ParamCount do begin
      if msg = nil then
        Break;

      //          Assert(msg <> nil);

      data := PChar(zframe_data(zmsg_first(msg)));

      Assert(msg <> nil);

      size := zframe_size(zmsg_first(msg));
      if (size = Length(ParamStr(i)))
        and CompareMem(data, Pointer(ParamStr(i)), size) then
        zmsg_send(msg, cloudfe);
    end;
    // Отправка ответа клиенту, если это все еще необходимо
    if msg <> nil then
      zmsg_send(msg, localfe);

    // Мы обработаем столько клиентских запросов, сколько есть
    // свободных рабочих. Мы можем перенаправить запросы от локального
    // frontend, но не от облачного frontend. Сейчас мы будем перенаправлять
    // случайным образом, просто чтобы протестировать. В следующей версии
    // мы будем уже вычислять емкость облака:

    while capacity > 0 do begin
      zPollItemInit(frontends[0], localfe, 0, ZMQ_POLLIN, 0);
      zPollItemInit(frontends[1], cloudfe, 0, ZMQ_POLLIN, 0);
      rc := zmq_poll(@frontends[0], 2, 0);
      assert(rc >= 0);
      reroutable := False;
      // Сначала обрабатываем брокеров-партнеров, чтобы избежать зависания
      if (frontends[1].revents and ZMQ_POLLIN) <> 0 then begin
        msg := zmsg_recv(cloudfe);
        reroutable := False;
      end
      else
        if (frontends[0].revents and ZMQ_POLLIN) <> 0 then begin
          msg := zmsg_recv(localfe);
          reroutable := True;
        end
        else
          break; // Работы нет, возвращаемся к backends

      // Если reroutable (т.е. запрос локальный), шлём в облако 20% всех запросов
      // В реальности используют информацию о стоянии облака.
      //
      if reroutable and (ParamCount > 1) and (Random(5) = 0) then begin
        // Распределяем случайному брокеру - партнеру
        random_peer := Random(ParamCount-1) + 2;
        // Внедряем фрейм с адресом партнера
        zmsg_pushmem(msg, PChar(ParamStr(random_peer)), Length(ParamStr(random_peer)));
        zmsg_send(msg, cloudbe);
      end
      else begin
        frame := zlist_pop(workers);
        zmsg_wrap(msg, frame);
        zmsg_send(msg, localbe);
        Dec(capacity);
      end;
    end;
  end;
  // Конец работы, очистка при выходе
  while zlist_size(workers) > 0 do begin
    frame := p_zframe_t(zlist_pop(workers));
    zframe_destroy(frame);
  end;
  zlist_destroy(workers);
  zctx_destroy(ctx);
end;

begin
  try
    IsMultiThread := True;
    zsys_handler_set(nil);
    doMain;
  except
    on E: Exception do begin
      z_Log(E.Message);
      raise;

      Readln;
    end;

  end;

end.



Запускаем, например три экземпляра peering2.exe:



Жмем поочередно в каждом окне консоли Enter и наблюдаем, как все здорово работает. Например:





Замечания по коду.
  • Использование CZMQ существенно упрощает жизнь.
  • Так как мы не получаем никакой информации о состоянии партнеров, мы наивно полагаем, что они запущены и работают. В коде есть запрос "Нажмите Enter, когда все брокеры будут запущены". В реальном приложении мы не станем ничего отсылать брокерам, которые не сообщили нам о своем существовании.
Все вроде бы работает здорово. Но если хоть одно сообщение потеряется, клиенты зависнут в ожидании ответа. Это легко проверить, "убив" одного из запущенных брокеров. Другие брокеры будут слать запросы в облако, и потихоньку все клиенты заблокируются в режиме ожидания ответа.

Дальше мы соберем оба прототипа в один. (Продолжение).


Комментариев нет :

Отправить комментарий