воскресенье, 15 апреля 2012 г.

Демон и клиент Gearman

Сегодня пойдет речь о расширении к PHP Gearman.
Исходный код проекта: GearmanDaemon


Gearman это универсальный фреймворк разработки приложений для раздачи работ множеству машин или процессов. Это позволяет приложениям выполнять задачи параллельно, распределяя нагрузку и использовать функции различных языков. Фреймворк может использоваться во множестве приложений, от высоконадежных веб-сайтов до транспорта событий репликации базы данных.

Т.е. у нас есть задача не на 15 секунд, допустим сделать несколько ресайзнутых копии загруженных пользователем фотографии в наилучшем качестве. Если это делать синхронно, то это может занять до нескольких минут. Для устранения проблемы ожидания создано это расширение.

Принцип работы:
  1. Клиент: Получаем данные от клиента (набор фотографии или другое.)
  2. Клиент: Разбиваем данные на группы по какому то признаку
  3. Клиент: Ставим задачи серверам: каждому серверу или разных потокам сервера своя задача
  4. Сервер: Демон сервера, на любом языке, получает новое задание, стартует поток и выполняет задание
В случае асинхронного выполнения, ответ обратно не отсылается. Но если задача выполняется синхронно, просто с разбивкой по потокам или серверам, то можно отправить ответ.

Более подробное описание реализации под катом.

Описание структуры проекта:
1. server - папка с файлами, которые должны располагаться на сервере
1.1 server/daemon - многопоточный демон
server/daemon/start.php - запуск
server/daemon/stop.php - остановка
демон должен иметь доступ на запись в папку server/daemon/log/ - для записи лога и server/daemon для записи pid файла демона
1.2 server/jobs - список обработчиков заданий на стороне сервера
файл должен иметь название job_название.php , функция внутри fnc_название
1.3. server/config.php - настройки демона и обработчиков (см. комментарии в файле)
2. client - папка с функцией создания задания (обработчик, данные, сервера, приоритет)
3. index.html, ajax.php - extjs форма с созданием заданий на обработку на серверах в нескольких потоках

Класс демона. Демон реализован на PHP, но серверную часть можно реализовать на любом другом языке.
<?

/*Класс с логирование в файл*/
abstract class DaemonWithFileLog extends DaemonClass {
  private $fapl_log;
  public $format = "%s [%u]: %s\n";
  //имя файла для логирования
  public $file_log;

  public function __construct($log) {
    $this->file_log = $log;
    $this->fapl_log = fopen($log, "wb");
    parent::__construct();
  }

  protected function log($msg) {
    flock($this->fapl_log, LOCK_EX);
    $msg = sprintf($this->format, Date("Y-m-d H:i:s"), getmypid(), $msg);
    fwrite($this->fapl_log, $msg);
    flock($this->fapl_log, LOCK_UN);
  }

  public function __destruct() {
    fclose($this->fapl_log);
    parent::__destruct();
  }
};

/*работа с gearman*/
class GearDaemon extends DaemonWithFileLog {
  //воркер для выполнения задания
  protected $workers = array();
  //список серверов
  protected $servers = array();
  //список заданий для выполнения
  protected $jobs = array();

  public function __construct(array $srv, array $jbs, $log) {
    $this->servers = $srv;
    $this->jobs = $jbs;
    parent::__construct($log);
  }

  protected function addWorkers() {
      //задания
      $p = getmypid();
      $this->workers[$p] = new GearmanWorker();
      foreach($this->servers as $srv) {
 if($srv[0] !== NULL && $srv[1] !== NULL) {
   $this->workers[$p]->addServer($srv[0], $srv[1]);
 } else {
   $this->workers[$p]->addServer();
 }
      }

      $this->log("Add watch worker: ".$this->jobs[$this->cur_job][0]);

      $this->workers[$p]->addFunction($this->jobs[$this->cur_job][0], $this->jobs[$this->cur_job][1]);

      while(!$this->stop_server) {
 $this->workers[$p]->work();
 $this->log("Worker ".($this->jobs[$this->cur_job][0])." return: ".$this->workers[$p]->returnCode());

 usleep($this->sleep_mcsec);
      }
  }

  public function __destruct() {
    foreach($this->workers as $k=>$w) {
      unset($this->workers[$k]);
    }
    parent::__destruct();
  }
};

abstract class DaemonClass {
    // Максимальное количество дочерних процессов
    public $maxProcesses = 5;
    // Когда установится в TRUE, демон завершит работу
    protected $stop_server = FALSE;
    // Здесь будем хранить запущенные дочерние процессы
    protected $currentJobs = array();
    //файл в котором хранится PID демона
    public $pid_file = "daemon.pid";
    //задержка между циклами
    public $sleep_mcsec = 5000000;

    protected $cur_job = 0;

    public function __construct() {
 //проверим наличие демона
 if ($this->isDaemonActive($this->pid_file)) {
     $this->log('Daemon already active.');
     exit;
 }
 //запишем pid демона
 file_put_contents($this->pid_file, getmypid());

        $this->log("Сonstructed daemon controller...");

        // Ждем сигналы SIGTERM и SIGCHLD
        pcntl_signal(SIGTERM, array($this, "childSignalHandler"));
        pcntl_signal(SIGCHLD, array($this, "childSignalHandler"));
    }

    /*старт демона*/
    public function run() {
        $this->log("Running daemon controller.");

        // Пока $stop_server не установится в TRUE, гоняем бесконечный цикл
        while (!$this->stop_server) {
     pcntl_signal_dispatch();
            // Если уже запущено максимальное количество дочерних процессов, ждем их завершения
            while(count($this->currentJobs) >= $this->maxProcesses) {
   pcntl_signal_dispatch();
                 //$this->log("Maximum children allowed, waiting...");
                 usleep($this->sleep_mcsec);
            }
     //проверим список заданий, для каждого задания создадим отдельный процесс
     $this->launchJob();
     usleep($this->sleep_mcsec);
        }
    }

    abstract protected function log($msg);
    abstract protected function addWorkers();

    /*обработка заданий*/
    protected function launchJob() {
        // Создаем дочерний процесс
        // весь код после pcntl_fork() будет выполняться
        // двумя процессами: родительским и дочерним
        $pid = pcntl_fork();
        if ($pid == -1) {
            // Не удалось создать дочерний процесс
            error_log('Could not launch new job, exiting');
            return FALSE;
        }
        elseif ($pid) {
            // Этот код выполнится родительским процессом
            $this->currentJobs[$pid] = TRUE;
     $this->cur_job++;
        }
        else {
     $this->addWorkers();
            exit();
        }
        return TRUE;
    }

    /*обработка сигналов*/
    public function childSignalHandler($signo, $pid = null, $status = null) {
 //$this->log($signo."!");
        switch($signo) {
            case SIGTERM:
                // При получении сигнала завершения работы устанавливаем флаг
                $this->stop_server = true;
  $this->log('Daemon stop.');
                break;
            case SIGCHLD:
                // При получении сигнала от дочернего процесса
                if (!$pid) {
                    $pid = pcntl_waitpid(-1, $status, WNOHANG);
                }
                // Пока есть завершенные дочерние процессы
                while ($pid > 0) {
                    if ($pid && isset($this->currentJobs[$pid])) {
                        // Удаляем дочерние процессы из списка
                        unset($this->currentJobs[$pid]);
   $this->log("Child ".$pid." stopped.");
                    }
                    $pid = pcntl_waitpid(-1, $status, WNOHANG);
                }
                break;
            default:
                // все остальные сигналы
        }
    }

    //проверка запущенности демона
    protected function isDaemonActive($pid_file) {
 if( is_file($pid_file) ) {
     $pid = file_get_contents($pid_file);
     //проверяем на наличие процесса
     if(posix_kill($pid,0)) {
  //демон уже запущен
  return true;
     } else {
  //pid-файл есть, но процесса нет
  if(!unlink($pid_file)) {
      //не могу уничтожить pid-файл. ошибка
      exit(-1);
  }
     }
 }
 return false;
    }
}
?>


Стартуем демона, подхватываем обработчики заданий, для каждого задания создаем отдельный поток
<?
require_once('../config.php');

//сформируем массив заданий и загрузим обработчики
$jobs = array();
$fjobs = glob($jobs_dir."/job_*.php");
foreach($fjobs as $job) {
  $func = "fnc_".substr(basename($job),4,-4);
  $jobs[] = array($func, $func);
  require_once($job);

  //добавим трансляцию заданий
  if(in_array($func, $add_jobs)) {
    foreach($add_jobs as $kj=>$aj) {
      if($aj == $func) {
 $jobs[] = array($kj, $aj);
      }
    }
  }
}
require_once('DaemonClass.php');

// Создаем дочерний процесс
$child_pid = pcntl_fork();
if ($child_pid) {
    // Выходим из родительского, привязанного к консоли, процесса
    exit();
}
// Делаем основным процессом дочерний.
posix_setsid();

ini_set('error_log', 'log/error.log');

$daemon = new GearDaemon($servers, $jobs, $file_log);
$daemon->maxProcesses  = count($jobs);//число процессов = числу заданий
$daemon->pid_file = $pid_file;
$daemon->sleep_mcsec  = $sleep_mcsec;
$daemon->format  = $log_format;
$daemon->run();
?>


Пример обработчика задания (Название файла и обработчика должны быть согласованы)
<?
function fnc_test($job) {
  sleep(1);
  file_put_contents("log/l".getmypid().".log", print_r($job->workload(),1));
  return "Завершено!!!!";
}
?>


Клиентская часть для постановки задачи:
<?
class GPrior {
  const High = 3;
  const Normal = 2;
  const Low = 1;
}

/*добавить задачу для выполнения в фоне*/
function addTaskBg($func_name, array &$data, array $servers = array(), $prior = GPrior::Normal) {
  $gmc = new GearmanClient();

  //сервера для обработки задания
  if(empty($servers)) {
    $gmc->addServer();
  } else {
    foreach($servers as $srv) {
      if(count($srv) != 2) throw Exception("Wrong Server format");
      $gmc->addServer($srv[0], $srv[1]);
    }
  }
  //данные
  $sdata = serialize($data);

  //уникальное значение для текущей задачи
  $uuid = ip2long($_SERVER['SERVER_ADDR']).time().rand(1,100000);

  //постановка задачи
  $task = NULL;
  if($prior == GPrior::High) {
    $task = $gmc->addTaskHighBackground($func_name, $sdata, null, $uuid);
  } elseif($prior == GPrior::Low) {
    $task = $gmc->addTaskLowBackground($func_name, $sdata, null, $uuid);
  } else {
    $task = $gmc->addTaskBackground($func_name, $sdata, null, $uuid);
  }

  if (!$gmc->runTasks()) throw Exception($gmc->error());

  unset($gmc);

  return $task;

}
?>

4 комментария:

  1. Привет! Спасибо за полезный класс, пытаюсь разобраться.
    У меня после остановки демона остаются висеть процессы-воркеры - ps aux | grep 'php' выводит start.php в количестве 5 штук. Это нормально? :)

    ОтветитьУдалить
  2. Не уверен, но думаю, что это не хорошо, видимо остаются висеть процессы-зомби без родителя демона. Возможно, нужно добавить в файл server/daemon/stop.php убийство всех дочерних процессов перед стопом демона.

    ОтветитьУдалить
  3. Но stop.php не знает идентификаторы дочерних процессов, скорее это должно быть в деструкторе GearDaemon. Попробую сделать, спасибо.

    ОтветитьУдалить
  4. Хоть бы код отформатировали, стыдно должно быть такое выкладывать.

    ОтветитьУдалить