Исходный код проекта: GearmanDaemon
Gearman это универсальный фреймворк разработки приложений для раздачи работ множеству машин или процессов. Это позволяет приложениям выполнять задачи параллельно, распределяя нагрузку и использовать функции различных языков. Фреймворк может использоваться во множестве приложений, от высоконадежных веб-сайтов до транспорта событий репликации базы данных.
Т.е. у нас есть задача не на 15 секунд, допустим сделать несколько ресайзнутых копии загруженных пользователем фотографии в наилучшем качестве. Если это делать синхронно, то это может занять до нескольких минут. Для устранения проблемы ожидания создано это расширение.
Принцип работы:
- Клиент: Получаем данные от клиента (набор фотографии или другое.)
- Клиент: Разбиваем данные на группы по какому то признаку
- Клиент: Ставим задачи серверам: каждому серверу или разных потокам сервера своя задача
- Сервер: Демон сервера, на любом языке, получает новое задание, стартует поток и выполняет задание
Более подробное описание реализации под катом.
Описание структуры проекта:
1. server - папка с файлами, которые должны располагаться на сервере
1.1 server/daemon - многопоточный демон
server/daemon/start.php - запуск
server/daemon/stop.php - остановка
демон должен иметь доступ на запись в папку server/daemon/log/ - для записи лога и server/daemon для записи pid файла демона
1.2 server/jobs - список обработчиков заданий на стороне сервера
файл должен иметь название job_название.php , функция внутри fnc_название
1.3. server/config.php - настройки демона и обработчиков (см. комментарии в файле)
2. client - папка с функцией создания задания (обработчик, данные, сервера, приоритет)
3. index.html, ajax.php - extjs форма с созданием заданий на обработку на серверах в нескольких потоках
Класс демона. Демон реализован на PHP, но серверную часть можно реализовать на любом другом языке.
<?
/*Класс с логирование в файл*/
abstract class DaemonWithFileLog extends DaemonClass {
private $fapl_log;
public $format = "%s [%u]: %s\n";
//имя файла для логирования
public $file_log;
public function __construct($log) {
$this->file_log = $log;
$this->fapl_log = fopen($log, "wb");
parent::__construct();
}
protected function log($msg) {
flock($this->fapl_log, LOCK_EX);
$msg = sprintf($this->format, Date("Y-m-d H:i:s"), getmypid(), $msg);
fwrite($this->fapl_log, $msg);
flock($this->fapl_log, LOCK_UN);
}
public function __destruct() {
fclose($this->fapl_log);
parent::__destruct();
}
};
/*работа с gearman*/
class GearDaemon extends DaemonWithFileLog {
//воркер для выполнения задания
protected $workers = array();
//список серверов
protected $servers = array();
//список заданий для выполнения
protected $jobs = array();
public function __construct(array $srv, array $jbs, $log) {
$this->servers = $srv;
$this->jobs = $jbs;
parent::__construct($log);
}
protected function addWorkers() {
//задания
$p = getmypid();
$this->workers[$p] = new GearmanWorker();
foreach($this->servers as $srv) {
if($srv[0] !== NULL && $srv[1] !== NULL) {
$this->workers[$p]->addServer($srv[0], $srv[1]);
} else {
$this->workers[$p]->addServer();
}
}
$this->log("Add watch worker: ".$this->jobs[$this->cur_job][0]);
$this->workers[$p]->addFunction($this->jobs[$this->cur_job][0], $this->jobs[$this->cur_job][1]);
while(!$this->stop_server) {
$this->workers[$p]->work();
$this->log("Worker ".($this->jobs[$this->cur_job][0])." return: ".$this->workers[$p]->returnCode());
usleep($this->sleep_mcsec);
}
}
public function __destruct() {
foreach($this->workers as $k=>$w) {
unset($this->workers[$k]);
}
parent::__destruct();
}
};
abstract class DaemonClass {
// Максимальное количество дочерних процессов
public $maxProcesses = 5;
// Когда установится в TRUE, демон завершит работу
protected $stop_server = FALSE;
// Здесь будем хранить запущенные дочерние процессы
protected $currentJobs = array();
//файл в котором хранится PID демона
public $pid_file = "daemon.pid";
//задержка между циклами
public $sleep_mcsec = 5000000;
protected $cur_job = 0;
public function __construct() {
//проверим наличие демона
if ($this->isDaemonActive($this->pid_file)) {
$this->log('Daemon already active.');
exit;
}
//запишем pid демона
file_put_contents($this->pid_file, getmypid());
$this->log("Сonstructed daemon controller...");
// Ждем сигналы SIGTERM и SIGCHLD
pcntl_signal(SIGTERM, array($this, "childSignalHandler"));
pcntl_signal(SIGCHLD, array($this, "childSignalHandler"));
}
/*старт демона*/
public function run() {
$this->log("Running daemon controller.");
// Пока $stop_server не установится в TRUE, гоняем бесконечный цикл
while (!$this->stop_server) {
pcntl_signal_dispatch();
// Если уже запущено максимальное количество дочерних процессов, ждем их завершения
while(count($this->currentJobs) >= $this->maxProcesses) {
pcntl_signal_dispatch();
//$this->log("Maximum children allowed, waiting...");
usleep($this->sleep_mcsec);
}
//проверим список заданий, для каждого задания создадим отдельный процесс
$this->launchJob();
usleep($this->sleep_mcsec);
}
}
abstract protected function log($msg);
abstract protected function addWorkers();
/*обработка заданий*/
protected function launchJob() {
// Создаем дочерний процесс
// весь код после pcntl_fork() будет выполняться
// двумя процессами: родительским и дочерним
$pid = pcntl_fork();
if ($pid == -1) {
// Не удалось создать дочерний процесс
error_log('Could not launch new job, exiting');
return FALSE;
}
elseif ($pid) {
// Этот код выполнится родительским процессом
$this->currentJobs[$pid] = TRUE;
$this->cur_job++;
}
else {
$this->addWorkers();
exit();
}
return TRUE;
}
/*обработка сигналов*/
public function childSignalHandler($signo, $pid = null, $status = null) {
//$this->log($signo."!");
switch($signo) {
case SIGTERM:
// При получении сигнала завершения работы устанавливаем флаг
$this->stop_server = true;
$this->log('Daemon stop.');
break;
case SIGCHLD:
// При получении сигнала от дочернего процесса
if (!$pid) {
$pid = pcntl_waitpid(-1, $status, WNOHANG);
}
// Пока есть завершенные дочерние процессы
while ($pid > 0) {
if ($pid && isset($this->currentJobs[$pid])) {
// Удаляем дочерние процессы из списка
unset($this->currentJobs[$pid]);
$this->log("Child ".$pid." stopped.");
}
$pid = pcntl_waitpid(-1, $status, WNOHANG);
}
break;
default:
// все остальные сигналы
}
}
//проверка запущенности демона
protected function isDaemonActive($pid_file) {
if( is_file($pid_file) ) {
$pid = file_get_contents($pid_file);
//проверяем на наличие процесса
if(posix_kill($pid,0)) {
//демон уже запущен
return true;
} else {
//pid-файл есть, но процесса нет
if(!unlink($pid_file)) {
//не могу уничтожить pid-файл. ошибка
exit(-1);
}
}
}
return false;
}
}
?>
Стартуем демона, подхватываем обработчики заданий, для каждого задания создаем отдельный поток
<?
require_once('../config.php');
//сформируем массив заданий и загрузим обработчики
$jobs = array();
$fjobs = glob($jobs_dir."/job_*.php");
foreach($fjobs as $job) {
$func = "fnc_".substr(basename($job),4,-4);
$jobs[] = array($func, $func);
require_once($job);
//добавим трансляцию заданий
if(in_array($func, $add_jobs)) {
foreach($add_jobs as $kj=>$aj) {
if($aj == $func) {
$jobs[] = array($kj, $aj);
}
}
}
}
require_once('DaemonClass.php');
// Создаем дочерний процесс
$child_pid = pcntl_fork();
if ($child_pid) {
// Выходим из родительского, привязанного к консоли, процесса
exit();
}
// Делаем основным процессом дочерний.
posix_setsid();
ini_set('error_log', 'log/error.log');
$daemon = new GearDaemon($servers, $jobs, $file_log);
$daemon->maxProcesses = count($jobs);//число процессов = числу заданий
$daemon->pid_file = $pid_file;
$daemon->sleep_mcsec = $sleep_mcsec;
$daemon->format = $log_format;
$daemon->run();
?>
Пример обработчика задания (Название файла и обработчика должны быть согласованы)
<?
function fnc_test($job) {
sleep(1);
file_put_contents("log/l".getmypid().".log", print_r($job->workload(),1));
return "Завершено!!!!";
}
?>
Клиентская часть для постановки задачи:
<?
class GPrior {
const High = 3;
const Normal = 2;
const Low = 1;
}
/*добавить задачу для выполнения в фоне*/
function addTaskBg($func_name, array &$data, array $servers = array(), $prior = GPrior::Normal) {
$gmc = new GearmanClient();
//сервера для обработки задания
if(empty($servers)) {
$gmc->addServer();
} else {
foreach($servers as $srv) {
if(count($srv) != 2) throw Exception("Wrong Server format");
$gmc->addServer($srv[0], $srv[1]);
}
}
//данные
$sdata = serialize($data);
//уникальное значение для текущей задачи
$uuid = ip2long($_SERVER['SERVER_ADDR']).time().rand(1,100000);
//постановка задачи
$task = NULL;
if($prior == GPrior::High) {
$task = $gmc->addTaskHighBackground($func_name, $sdata, null, $uuid);
} elseif($prior == GPrior::Low) {
$task = $gmc->addTaskLowBackground($func_name, $sdata, null, $uuid);
} else {
$task = $gmc->addTaskBackground($func_name, $sdata, null, $uuid);
}
if (!$gmc->runTasks()) throw Exception($gmc->error());
unset($gmc);
return $task;
}
?>
Привет! Спасибо за полезный класс, пытаюсь разобраться.
ОтветитьУдалитьУ меня после остановки демона остаются висеть процессы-воркеры - ps aux | grep 'php' выводит start.php в количестве 5 штук. Это нормально? :)
Не уверен, но думаю, что это не хорошо, видимо остаются висеть процессы-зомби без родителя демона. Возможно, нужно добавить в файл server/daemon/stop.php убийство всех дочерних процессов перед стопом демона.
ОтветитьУдалитьНо stop.php не знает идентификаторы дочерних процессов, скорее это должно быть в деструкторе GearDaemon. Попробую сделать, спасибо.
ОтветитьУдалитьХоть бы код отформатировали, стыдно должно быть такое выкладывать.
ОтветитьУдалить