Исходный код проекта: GearmanDaemon
Gearman это универсальный фреймворк разработки приложений для раздачи работ множеству машин или процессов. Это позволяет приложениям выполнять задачи параллельно, распределяя нагрузку и использовать функции различных языков. Фреймворк может использоваться во множестве приложений, от высоконадежных веб-сайтов до транспорта событий репликации базы данных.
Т.е. у нас есть задача не на 15 секунд, допустим сделать несколько ресайзнутых копии загруженных пользователем фотографии в наилучшем качестве. Если это делать синхронно, то это может занять до нескольких минут. Для устранения проблемы ожидания создано это расширение.
Принцип работы:
- Клиент: Получаем данные от клиента (набор фотографии или другое.)
- Клиент: Разбиваем данные на группы по какому то признаку
- Клиент: Ставим задачи серверам: каждому серверу или разных потокам сервера своя задача
- Сервер: Демон сервера, на любом языке, получает новое задание, стартует поток и выполняет задание
Более подробное описание реализации под катом.
Описание структуры проекта:
1. server - папка с файлами, которые должны располагаться на сервере
1.1 server/daemon - многопоточный демон
server/daemon/start.php - запуск
server/daemon/stop.php - остановка
демон должен иметь доступ на запись в папку server/daemon/log/ - для записи лога и server/daemon для записи pid файла демона
1.2 server/jobs - список обработчиков заданий на стороне сервера
файл должен иметь название job_название.php , функция внутри fnc_название
1.3. server/config.php - настройки демона и обработчиков (см. комментарии в файле)
2. client - папка с функцией создания задания (обработчик, данные, сервера, приоритет)
3. index.html, ajax.php - extjs форма с созданием заданий на обработку на серверах в нескольких потоках
Класс демона. Демон реализован на PHP, но серверную часть можно реализовать на любом другом языке.
<? /*Класс с логирование в файл*/ abstract class DaemonWithFileLog extends DaemonClass { private $fapl_log; public $format = "%s [%u]: %s\n"; //имя файла для логирования public $file_log; public function __construct($log) { $this->file_log = $log; $this->fapl_log = fopen($log, "wb"); parent::__construct(); } protected function log($msg) { flock($this->fapl_log, LOCK_EX); $msg = sprintf($this->format, Date("Y-m-d H:i:s"), getmypid(), $msg); fwrite($this->fapl_log, $msg); flock($this->fapl_log, LOCK_UN); } public function __destruct() { fclose($this->fapl_log); parent::__destruct(); } }; /*работа с gearman*/ class GearDaemon extends DaemonWithFileLog { //воркер для выполнения задания protected $workers = array(); //список серверов protected $servers = array(); //список заданий для выполнения protected $jobs = array(); public function __construct(array $srv, array $jbs, $log) { $this->servers = $srv; $this->jobs = $jbs; parent::__construct($log); } protected function addWorkers() { //задания $p = getmypid(); $this->workers[$p] = new GearmanWorker(); foreach($this->servers as $srv) { if($srv[0] !== NULL && $srv[1] !== NULL) { $this->workers[$p]->addServer($srv[0], $srv[1]); } else { $this->workers[$p]->addServer(); } } $this->log("Add watch worker: ".$this->jobs[$this->cur_job][0]); $this->workers[$p]->addFunction($this->jobs[$this->cur_job][0], $this->jobs[$this->cur_job][1]); while(!$this->stop_server) { $this->workers[$p]->work(); $this->log("Worker ".($this->jobs[$this->cur_job][0])." return: ".$this->workers[$p]->returnCode()); usleep($this->sleep_mcsec); } } public function __destruct() { foreach($this->workers as $k=>$w) { unset($this->workers[$k]); } parent::__destruct(); } }; abstract class DaemonClass { // Максимальное количество дочерних процессов public $maxProcesses = 5; // Когда установится в TRUE, демон завершит работу protected $stop_server = FALSE; // Здесь будем хранить запущенные дочерние процессы protected $currentJobs = array(); //файл в котором хранится PID демона public $pid_file = "daemon.pid"; //задержка между циклами public $sleep_mcsec = 5000000; protected $cur_job = 0; public function __construct() { //проверим наличие демона if ($this->isDaemonActive($this->pid_file)) { $this->log('Daemon already active.'); exit; } //запишем pid демона file_put_contents($this->pid_file, getmypid()); $this->log("Сonstructed daemon controller..."); // Ждем сигналы SIGTERM и SIGCHLD pcntl_signal(SIGTERM, array($this, "childSignalHandler")); pcntl_signal(SIGCHLD, array($this, "childSignalHandler")); } /*старт демона*/ public function run() { $this->log("Running daemon controller."); // Пока $stop_server не установится в TRUE, гоняем бесконечный цикл while (!$this->stop_server) { pcntl_signal_dispatch(); // Если уже запущено максимальное количество дочерних процессов, ждем их завершения while(count($this->currentJobs) >= $this->maxProcesses) { pcntl_signal_dispatch(); //$this->log("Maximum children allowed, waiting..."); usleep($this->sleep_mcsec); } //проверим список заданий, для каждого задания создадим отдельный процесс $this->launchJob(); usleep($this->sleep_mcsec); } } abstract protected function log($msg); abstract protected function addWorkers(); /*обработка заданий*/ protected function launchJob() { // Создаем дочерний процесс // весь код после pcntl_fork() будет выполняться // двумя процессами: родительским и дочерним $pid = pcntl_fork(); if ($pid == -1) { // Не удалось создать дочерний процесс error_log('Could not launch new job, exiting'); return FALSE; } elseif ($pid) { // Этот код выполнится родительским процессом $this->currentJobs[$pid] = TRUE; $this->cur_job++; } else { $this->addWorkers(); exit(); } return TRUE; } /*обработка сигналов*/ public function childSignalHandler($signo, $pid = null, $status = null) { //$this->log($signo."!"); switch($signo) { case SIGTERM: // При получении сигнала завершения работы устанавливаем флаг $this->stop_server = true; $this->log('Daemon stop.'); break; case SIGCHLD: // При получении сигнала от дочернего процесса if (!$pid) { $pid = pcntl_waitpid(-1, $status, WNOHANG); } // Пока есть завершенные дочерние процессы while ($pid > 0) { if ($pid && isset($this->currentJobs[$pid])) { // Удаляем дочерние процессы из списка unset($this->currentJobs[$pid]); $this->log("Child ".$pid." stopped."); } $pid = pcntl_waitpid(-1, $status, WNOHANG); } break; default: // все остальные сигналы } } //проверка запущенности демона protected function isDaemonActive($pid_file) { if( is_file($pid_file) ) { $pid = file_get_contents($pid_file); //проверяем на наличие процесса if(posix_kill($pid,0)) { //демон уже запущен return true; } else { //pid-файл есть, но процесса нет if(!unlink($pid_file)) { //не могу уничтожить pid-файл. ошибка exit(-1); } } } return false; } } ?>
Стартуем демона, подхватываем обработчики заданий, для каждого задания создаем отдельный поток
<? require_once('../config.php'); //сформируем массив заданий и загрузим обработчики $jobs = array(); $fjobs = glob($jobs_dir."/job_*.php"); foreach($fjobs as $job) { $func = "fnc_".substr(basename($job),4,-4); $jobs[] = array($func, $func); require_once($job); //добавим трансляцию заданий if(in_array($func, $add_jobs)) { foreach($add_jobs as $kj=>$aj) { if($aj == $func) { $jobs[] = array($kj, $aj); } } } } require_once('DaemonClass.php'); // Создаем дочерний процесс $child_pid = pcntl_fork(); if ($child_pid) { // Выходим из родительского, привязанного к консоли, процесса exit(); } // Делаем основным процессом дочерний. posix_setsid(); ini_set('error_log', 'log/error.log'); $daemon = new GearDaemon($servers, $jobs, $file_log); $daemon->maxProcesses = count($jobs);//число процессов = числу заданий $daemon->pid_file = $pid_file; $daemon->sleep_mcsec = $sleep_mcsec; $daemon->format = $log_format; $daemon->run(); ?>
Пример обработчика задания (Название файла и обработчика должны быть согласованы)
<? function fnc_test($job) { sleep(1); file_put_contents("log/l".getmypid().".log", print_r($job->workload(),1)); return "Завершено!!!!"; } ?>
Клиентская часть для постановки задачи:
<? class GPrior { const High = 3; const Normal = 2; const Low = 1; } /*добавить задачу для выполнения в фоне*/ function addTaskBg($func_name, array &$data, array $servers = array(), $prior = GPrior::Normal) { $gmc = new GearmanClient(); //сервера для обработки задания if(empty($servers)) { $gmc->addServer(); } else { foreach($servers as $srv) { if(count($srv) != 2) throw Exception("Wrong Server format"); $gmc->addServer($srv[0], $srv[1]); } } //данные $sdata = serialize($data); //уникальное значение для текущей задачи $uuid = ip2long($_SERVER['SERVER_ADDR']).time().rand(1,100000); //постановка задачи $task = NULL; if($prior == GPrior::High) { $task = $gmc->addTaskHighBackground($func_name, $sdata, null, $uuid); } elseif($prior == GPrior::Low) { $task = $gmc->addTaskLowBackground($func_name, $sdata, null, $uuid); } else { $task = $gmc->addTaskBackground($func_name, $sdata, null, $uuid); } if (!$gmc->runTasks()) throw Exception($gmc->error()); unset($gmc); return $task; } ?>
Привет! Спасибо за полезный класс, пытаюсь разобраться.
ОтветитьУдалитьУ меня после остановки демона остаются висеть процессы-воркеры - ps aux | grep 'php' выводит start.php в количестве 5 штук. Это нормально? :)
Не уверен, но думаю, что это не хорошо, видимо остаются висеть процессы-зомби без родителя демона. Возможно, нужно добавить в файл server/daemon/stop.php убийство всех дочерних процессов перед стопом демона.
ОтветитьУдалитьНо stop.php не знает идентификаторы дочерних процессов, скорее это должно быть в деструкторе GearDaemon. Попробую сделать, спасибо.
ОтветитьУдалитьХоть бы код отформатировали, стыдно должно быть такое выкладывать.
ОтветитьУдалить