M_Worker
Woher die Inspiration:
Eigentlich reizt mich PHP nicht mehr, vor allem nicht in meiner Freizeit, ich habe einfach schon viel zu viel davon gelesen/geschrieben und das nicht nur in meiner Freizeit. …Viel zu viel! …Alles …aber aberwitzig wurde es neulich, als es Probleme mit Server-Cronjobs gab, die PHP-Skripte starten, die wiederum per HTTP-Request neue PHP-Skripte starten. Also kurz gesagt ein Alptraum und ein schwerer Planungsfehler.
Was ich wollte:
- PHP-Skripte, die sich wie Services/Daemons verhalten
- Jobs einem Worker übergeben, der diese zyklisch ausführt (z. B. alle 10 Minuten)
- der Worker soll nach Bedarf neue Worker öffnen (per HTTP-Request)
- wenn man den ersten Worker beendet, sollen sich alle Child-Worker auch beenden
…und das in PHP!
Also hier meine Antwort (download):
Der erste gestartete Worker lockt eine Datei und startet nacheinander seine Jobs. Konnte er die Jobs nicht in einem konfigurierbaren Intervall abarbeiten, startet er einen neuen Worker per HTTP-Request, anhand der gleichen URL, mit der er selbst gestartet wurde. Jeder Child-Worker kann selbst anhand der Konfiguration neue Child-Worker erzeugen oder sich ggf. selbst beenden.
M/Worker/Test.php (download)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 | class M_Worker_Test { public function run() { $config = array('maxLoopSleepMs' => 4000, 'minLoopSleepMs' => 1000, 'loopSleepAdjustmentRate' => 2, 'forkAfterNoSleepCycles' => 2, 'destroyAfterSleepCycles' => 2, 'maxWorker' => 10); $job = new M_Worker_Job('testJob'); //job needs 1-6secs $worker = new M_Worker('testWorker', array($job), $config); $worker->run(); } } |
/**
 * Small demo harness: wires one test job into a worker and starts it.
 */
class M_Worker_Test
{
    /**
     * Creates a single job named "testJob", builds the worker settings and
     * runs a worker named "testWorker" with them.
     */
    public function run()
    {
        $testJob = new M_Worker_Job('testJob'); // job needs 1-6secs

        $settings = array(
            'maxLoopSleepMs'          => 4000,
            'minLoopSleepMs'          => 1000,
            'loopSleepAdjustmentRate' => 2,
            'forkAfterNoSleepCycles'  => 2,
            'destroyAfterSleepCycles' => 2,
            'maxWorker'               => 10,
        );

        $testWorker = new M_Worker('testWorker', array($testJob), $settings);
        $testWorker->run();
    }
}
Und hier das Herzstück (gekürzt): M/Worker.php (download)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 | ignore_user_abort(true); ini_set('max_execution_time', 0); ob_implicit_flush(true); class M_Worker { [...] public function __construct($workerName, array $jobs, array $config = null) { $this->uid = uniqid('M_Id'); $this->workerName = $workerName; if($config) $this->config = array_merge($this->config, $config); foreach($jobs as $job) $this->addJob($job); } public function run() { $this->triggerInit(); while(!$this->stopped) { $lastRunStartTime = microtime(true); foreach($this->jobQueue as $job) $job->runJob($this); $lastRunDuration = (int)((microtime(true) - $lastRunStartTime) * 1000); $this->calibrateAndDoLoopSleep($lastRunDuration); $this->forkOrDestroyIfNeeded(); } $this->triggerShoutDown(); } protected function triggerInit() { $this->actualLoopSleep = $this->getConfigValue('maxLoopSleepMs') / 2; $this->stopped = false; if($this->getConfigValue('outputLogFile')) $this->workerLogFileHandle = fopen($this->getConfigValue('outputLogFile'), 'a'); $this->isMainWorker = $this->getMainLock(); if($this->isMainWorker) { ignore_user_abort(false); $this->initWorkerCounter(); } else { header("Connection: close"); $this->runningWorker = $this->countRunningWorker(); $this->incrementWorkerCounter(); } foreach($this->jobQueue as $job) $job->onJobInit($this); } protected function triggerShoutdown() { foreach($this->jobQueue as $job) $job->onJobDestroy($this); if(!$this->isMainWorker) $this->decrementWorkerCounter(); if(is_resource($this->workerLogFileHandle)) fclose($this->workerLogFileHandle); $this->releaseMainLock(); } protected 
function calibrateAndDoLoopSleep($lastRunDuration) { if($lastRunDuration > $this->actualLoopSleep) { $this->sleepCycles = 0; ++$this->noSleepCycles; $this->actualLoopSleep = max( $this->actualLoopSleep / $this->getConfigValue('loopSleepAdjustmentRate'), $this->getConfigValue('minLoopSleepMs')); } else { $this->noSleepCycles = 0; ++$this->sleepCycles; $this->actualLoopSleep = min( $this->actualLoopSleep * $this->getConfigValue('loopSleepAdjustmentRate'), $this->getConfigValue('maxLoopSleepMs')); $timeToSleep = (int)($this->actualLoopSleep - $lastRunDuration); usleep($timeToSleep); } } protected function forkOrDestroyIfNeeded() { if($this->noSleepCycles > $this->getConfigValue('forkAfterNoSleepCycles') && $this->countRunningWorker() < $this->getConfigValue('maxWorker')) { $this->fork(); } elseif(!$this->isMainWorker &&( $this->sleepCycles > $this->getConfigValue('destroyAfterSleepCycles') || $this->getMainLock() )) { $this->releaseMainLock(); $this->stopped = true; } } protected function getMainLock() { if(!is_resource($this->mainWorkerLockFileHandle)) { $this->mainWorkerLockFileHandle = fopen('./M_Worker_'.ucfirst($this->workerName).'.lock', 'r'); } return is_resource($this->mainWorkerLockFileHandle) && @flock($this->mainWorkerLockFileHandle, LOCK_EX | LOCK_NB); } protected function fork() { $url = 'http://'.$_SERVER['HTTP_HOST'].':'.$_SERVER['SERVER_PORT'].$_SERVER['REQUEST_URI']; $fp = fopen($url, 'r'); if(is_resource($fp)) fclose($fp); } [...] } |
// Keep the script running even if the client that issued the request disconnects.
ignore_user_abort(true);
// Remove the execution time limit (0 means "no limit") so the worker loop can run indefinitely.
ini_set('max_execution_time', 0);
// Turn on implicit flushing so output is sent after every output call.
ob_implicit_flush(true);
class M_Worker
{
[...]
/**
 * Sets up a worker with a fresh unique id, a name, its jobs and optional
 * configuration overrides.
 *
 * @param string     $workerName Name of the worker.
 * @param array      $jobs       Jobs to register; each one is passed to addJob().
 * @param array|null $config     Overrides merged over the existing configuration;
 *                               null or an empty array leaves it untouched.
 */
public function __construct($workerName, array $jobs, array $config = null)
{
    $this->uid        = uniqid('M_Id');
    $this->workerName = $workerName;

    if (!empty($config)) {
        $this->config = array_merge($this->config, $config);
    }

    foreach ($jobs as $singleJob) {
        $this->addJob($singleJob);
    }
}
/**
 * Main loop of the worker.
 *
 * Runs the init hook, then repeats until the stop flag is set: execute every
 * queued job, measure the duration of that pass in milliseconds, let
 * calibrateAndDoLoopSleep() adapt and perform the pause, and let
 * forkOrDestroyIfNeeded() decide whether to spawn another worker or stop.
 * Afterwards the shutdown hook runs.
 *
 * If a job (or one of the loop helpers) throws an Exception, the shutdown
 * hook is still executed before the exception is re-thrown, so the cleanup
 * done there is not skipped.
 *
 * @throws Exception Re-throws whatever the loop raised, after cleanup.
 */
public function run()
{
    $this->triggerInit();

    try {
        while (!$this->stopped) {
            $lastRunStartTime = microtime(true);

            foreach ($this->jobQueue as $job) {
                $job->runJob($this);
            }

            // microtime(true) yields seconds as float; convert to whole milliseconds.
            $lastRunDuration = (int)((microtime(true) - $lastRunStartTime) * 1000);

            $this->calibrateAndDoLoopSleep($lastRunDuration);
            $this->forkOrDestroyIfNeeded();
        }
    } catch (Exception $e) {
        // catch + rethrow instead of "finally" to stay compatible with old PHP versions.
        $this->triggerShoutdown();
        throw $e;
    }

    $this->triggerShoutdown();
}
/**
 * Prepares the worker before its loop starts.
 *
 * Sets the initial loop pause to half of "maxLoopSleepMs", clears the stop
 * flag, opens the log file in append mode when "outputLogFile" is configured,
 * and tries to take the main lock. The worker that gets the lock becomes the
 * main worker and initialises the worker counter; any other worker sends a
 * "Connection: close" header, records the number of running workers and
 * increments the counter. Finally every queued job receives onJobInit().
 */
protected function triggerInit()
{
    $this->actualLoopSleep = $this->getConfigValue('maxLoopSleepMs') / 2;
    $this->stopped         = false;

    if ($this->getConfigValue('outputLogFile')) {
        $this->workerLogFileHandle = fopen($this->getConfigValue('outputLogFile'), 'a');
    }

    $this->isMainWorker = $this->getMainLock();

    if (!$this->isMainWorker) {
        // Child worker path.
        header("Connection: close");
        $this->runningWorker = $this->countRunningWorker();
        $this->incrementWorkerCounter();
    } else {
        // Main worker path: from here on a client abort is no longer ignored.
        ignore_user_abort(false);
        $this->initWorkerCounter();
    }

    foreach ($this->jobQueue as $queuedJob) {
        $queuedJob->onJobInit($this);
    }
}
/**
 * Cleans up after the worker loop has ended.
 *
 * Order: notify every queued job via onJobDestroy(), decrement the worker
 * counter unless this is the main worker, close the log file handle if one
 * is open, and release the main lock.
 */
protected function triggerShoutdown()
{
    foreach ($this->jobQueue as $queuedJob) {
        $queuedJob->onJobDestroy($this);
    }

    if (!$this->isMainWorker) {
        $this->decrementWorkerCounter();
    }

    if (is_resource($this->workerLogFileHandle)) {
        fclose($this->workerLogFileHandle);
    }

    $this->releaseMainLock();
}
/**
 * Adapts the loop pause to the duration of the last job pass and sleeps.
 *
 * If the pass took longer than the current pause, no sleep happens: the
 * pause is divided by "loopSleepAdjustmentRate" (but not below
 * "minLoopSleepMs") and the no-sleep cycle counter is increased.
 * Otherwise the pause is multiplied by the rate (capped at
 * "maxLoopSleepMs"), the sleep cycle counter is increased and the worker
 * sleeps for the remainder of the pause.
 *
 * @param int $lastRunDuration Duration of the last pass over all jobs, in milliseconds.
 */
protected function calibrateAndDoLoopSleep($lastRunDuration)
{
    if ($lastRunDuration > $this->actualLoopSleep) {
        $this->sleepCycles = 0;
        ++$this->noSleepCycles;
        $this->actualLoopSleep = max(
            $this->actualLoopSleep / $this->getConfigValue('loopSleepAdjustmentRate'),
            $this->getConfigValue('minLoopSleepMs')
        );
        return;
    }

    $this->noSleepCycles = 0;
    ++$this->sleepCycles;
    $this->actualLoopSleep = min(
        $this->actualLoopSleep * $this->getConfigValue('loopSleepAdjustmentRate'),
        $this->getConfigValue('maxLoopSleepMs')
    );

    $timeToSleepMs = (int)($this->actualLoopSleep - $lastRunDuration);

    // Guard against a negative remainder (possible when min/max are configured
    // inconsistently); usleep() rejects negative values on PHP 8.
    if ($timeToSleepMs > 0) {
        // All values here are milliseconds, usleep() expects microseconds.
        usleep($timeToSleepMs * 1000);
    }
}
/**
 * Decides after each loop pass whether to spawn another worker or to stop.
 *
 * A new worker is forked when the no-sleep cycle counter exceeds
 * "forkAfterNoSleepCycles" and fewer than "maxWorker" workers are running.
 * Otherwise a non-main worker stops itself when its sleep cycle counter
 * exceeds "destroyAfterSleepCycles" or when it manages to take the main lock;
 * in both cases it releases the main lock and sets the stop flag.
 */
protected function forkOrDestroyIfNeeded()
{
    $overloaded = $this->noSleepCycles > $this->getConfigValue('forkAfterNoSleepCycles');

    if ($overloaded && $this->countRunningWorker() < $this->getConfigValue('maxWorker')) {
        $this->fork();
        return;
    }

    if ($this->isMainWorker) {
        return;
    }

    // Evaluation order matters: the lock attempt only happens when the idle
    // threshold has not been exceeded.
    $idleTooLong = $this->sleepCycles > $this->getConfigValue('destroyAfterSleepCycles');

    if ($idleTooLong || $this->getMainLock()) {
        $this->releaseMainLock();
        $this->stopped = true;
    }
}
/**
 * Tries to take the exclusive, non-blocking lock that marks the main worker.
 *
 * The lock file "./M_Worker_<WorkerName>.lock" is opened once and the handle
 * is kept on the instance. The file is opened with mode 'c' so it is created
 * when missing (and never truncated); with mode 'r' the open fails for a
 * non-existent file and no worker could ever obtain the lock.
 *
 * @return bool True if this process holds the lock, false otherwise
 *              (including when the lock file could not be opened).
 */
protected function getMainLock()
{
    if (!is_resource($this->mainWorkerLockFileHandle)) {
        $this->mainWorkerLockFileHandle =
            fopen('./M_Worker_' . ucfirst($this->workerName) . '.lock', 'c');
    }

    // LOCK_NB: return immediately instead of waiting for the current holder.
    return is_resource($this->mainWorkerLockFileHandle)
        && @flock($this->mainWorkerLockFileHandle, LOCK_EX | LOCK_NB);
}
/**
 * Starts another worker by requesting the URL of the current request again.
 *
 * The URL is rebuilt from the host, the server port and the request URI of
 * the current request. The response body is not read; the stream is closed
 * as soon as it has been opened.
 *
 * NOTE(review): HTTP_HOST comes from the client's Host header, so the target
 * of this request is client-controlled — confirm this endpoint is not
 * reachable by untrusted clients.
 */
protected function fork()
{
    // HTTP_HOST may already carry ":port"; strip it so the port is not
    // appended twice (host:port:port would be an invalid URL).
    $host = preg_replace('/:\d+$/', '', $_SERVER['HTTP_HOST']);

    $url = 'http://' . $host . ':' . $_SERVER['SERVER_PORT'] . $_SERVER['REQUEST_URI'];

    $fp = fopen($url, 'r');
    if (is_resource($fp)) {
        fclose($fp);
    }
}
[...]
}