M_Worker

Woher die Inspiration:

Eigentlich reizt mich PHP nicht mehr, vor allem nicht in meiner Freizeit; ich habe einfach schon viel zu viel davon gelesen/geschrieben, und das nicht nur in meiner Freizeit. …Viel zu viel! …Alles …aber aberwitzig wurde es neulich, als es Probleme mit Server-Cronjobs gab, die PHP-Skripte starten, die wiederum per HTTP-Request neue PHP-Skripte starten. Also kurz gesagt: ein Alptraum und ein schwerer Planungsfehler.


Was ich wollte:

          - PHP-Skripte, die sich wie Services/Daemons verhalten
          - Jobs einem Worker übergeben, der diese zyklisch ausführt (z. B. alle 10 Minuten)
          - der Worker soll nach Bedarf neue Worker öffnen (per HTTP-Request)
          - wenn man den ersten Worker beendet, sollen sich alle Child-Worker auch beenden
       …und das in PHP :(


Also hier meine Antwort (download):

Der erste gestartete Worker lockt eine Datei und startet nacheinander seine Jobs. Konnte er die Jobs nicht in einem konfigurierbaren Intervall abarbeiten, startet er einen neuen Worker per Http-Request, anhand der gleichen Url, mit der er selbst gestartet wurde. Jeder Child-Worker kann selbst anhand der Konfiguration neue Childs erzeugen oder sich ggf. selbst beenden.


M/Worker/Test.php (download)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
 * Small demo harness: builds one worker named "testWorker" with a single
 * job and starts its loop.
 */
class M_Worker_Test
{
    /**
     * Creates the test job and the worker configuration, then runs the worker.
     */
    public function run()
    {
        $testJob = new M_Worker_Job('testJob');  // the job takes 1-6 seconds

        // Overrides handed to the worker as its third constructor argument.
        $settings = array(
            'maxLoopSleepMs'          => 4000,
            'minLoopSleepMs'          => 1000,
            'loopSleepAdjustmentRate' => 2,
            'forkAfterNoSleepCycles'  => 2,
            'destroyAfterSleepCycles' => 2,
            'maxWorker'               => 10
        );

        $testWorker = new M_Worker('testWorker', array($testJob), $settings);
        $testWorker->run();
    }
}
/**
 * Small demo harness: builds one worker named "testWorker" with a single
 * job and starts its loop.
 */
class M_Worker_Test
{
	/**
	 * Creates the test job and the worker configuration, then runs the worker.
	 */
	public function run()
	{
		$testJob = new M_Worker_Job('testJob');  // the job takes 1-6 seconds

		// Overrides handed to the worker as its third constructor argument.
		$settings = array(
			'maxLoopSleepMs'          => 4000,
			'minLoopSleepMs'          => 1000,
			'loopSleepAdjustmentRate' => 2,
			'forkAfterNoSleepCycles'  => 2,
			'destroyAfterSleepCycles' => 2,
			'maxWorker'               => 10
		);

		$testWorker = new M_Worker('testWorker', array($testJob), $settings);
		$testWorker->run();
	}
}



Und hier das Herzstück (gekürzt): M/Worker.php (download)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
// Keep the script running even after the requesting client disconnects.
ignore_user_abort(true);
// Remove the execution time limit (0 means "no limit").
ini_set('max_execution_time', 0);
// Flush output to the client automatically after every output call.
ob_implicit_flush(true);
 
class M_Worker
{
        [...]
 
    public function __construct($workerName, array $jobs, array $config = null)
    {
        $this->uid = uniqid('M_Id');
        $this->workerName = $workerName;
 
        if($config)
            $this->config = array_merge($this->config, $config);
 
        foreach($jobs as $job)
            $this->addJob($job);
    }
 
    public function run()
    {
        $this->triggerInit();
        while(!$this->stopped)
        {
            $lastRunStartTime = microtime(true);        
 
            foreach($this->jobQueue as $job)
                $job->runJob($this);    
 
            $lastRunDuration = (int)((microtime(true) - $lastRunStartTime) * 1000);
 
            $this->calibrateAndDoLoopSleep($lastRunDuration);
 
            $this->forkOrDestroyIfNeeded();
        }
        $this->triggerShoutDown();
    }   
 
    protected function triggerInit()
    {
        $this->actualLoopSleep = $this->getConfigValue('maxLoopSleepMs') / 2;
 
        $this->stopped = false;
 
        if($this->getConfigValue('outputLogFile'))
            $this->workerLogFileHandle = fopen($this->getConfigValue('outputLogFile'), 'a');
 
        $this->isMainWorker = $this->getMainLock();
 
        if($this->isMainWorker)
        {
            ignore_user_abort(false);
            $this->initWorkerCounter();
        }
        else
        {
            header("Connection: close");
            $this->runningWorker = $this->countRunningWorker();
            $this->incrementWorkerCounter();
        }   
 
        foreach($this->jobQueue as $job)
            $job->onJobInit($this);
    }
 
    protected function triggerShoutdown()
    {
        foreach($this->jobQueue as $job)
            $job->onJobDestroy($this);
 
        if(!$this->isMainWorker)
            $this->decrementWorkerCounter();            
 
        if(is_resource($this->workerLogFileHandle))
            fclose($this->workerLogFileHandle);
 
        $this->releaseMainLock();
    }
 
    protected function calibrateAndDoLoopSleep($lastRunDuration)
    {
        if($lastRunDuration > $this->actualLoopSleep)
        {
            $this->sleepCycles = 0;
            ++$this->noSleepCycles;
            $this->actualLoopSleep = max( $this->actualLoopSleep
                                             / $this->getConfigValue('loopSleepAdjustmentRate'),
                                          $this->getConfigValue('minLoopSleepMs'));
        }
        else
        {
            $this->noSleepCycles = 0;
            ++$this->sleepCycles;
            $this->actualLoopSleep = min( $this->actualLoopSleep
                                            * $this->getConfigValue('loopSleepAdjustmentRate'),
                                          $this->getConfigValue('maxLoopSleepMs'));
 
            $timeToSleep = (int)($this->actualLoopSleep - $lastRunDuration);
            usleep($timeToSleep);
        }
    }   
 
    protected function forkOrDestroyIfNeeded()
    {
        if($this->noSleepCycles > $this->getConfigValue('forkAfterNoSleepCycles')
        && $this->countRunningWorker() < $this->getConfigValue('maxWorker'))
        {
            $this->fork();
        }
        elseif(!$this->isMainWorker
        &&(
               $this->sleepCycles > $this->getConfigValue('destroyAfterSleepCycles')
            || $this->getMainLock() ))
        {
            $this->releaseMainLock();
            $this->stopped = true;
        }
    }
 
    protected function getMainLock()
    {
        if(!is_resource($this->mainWorkerLockFileHandle))
        {
            $this->mainWorkerLockFileHandle =
                    fopen('./M_Worker_'.ucfirst($this->workerName).'.lock', 'r');
        }
        return is_resource($this->mainWorkerLockFileHandle)
               && @flock($this->mainWorkerLockFileHandle, LOCK_EX | LOCK_NB);
    }
 
    protected function fork()
    {
        $url = 'http://'.$_SERVER['HTTP_HOST'].':'.$_SERVER['SERVER_PORT'].$_SERVER['REQUEST_URI'];
        $fp = fopen($url, 'r');
        if(is_resource($fp))
            fclose($fp);
    }
 
        [...]
}
// Keep the script running even after the requesting client disconnects.
ignore_user_abort(true);
// Remove the execution time limit (0 means "no limit").
ini_set('max_execution_time', 0);
// Flush output to the client automatically after every output call.
ob_implicit_flush(true);

class M_Worker
{
		[...]

	/**
	 * Creates a worker, applies optional configuration overrides and
	 * registers the given jobs.
	 *
	 * @param string     $workerName Name stored on the worker.
	 * @param array      $jobs       Jobs passed to addJob() one by one, in order.
	 * @param array|null $config     Optional values merged over the existing configuration.
	 */
	public function __construct($workerName, array $jobs, array $config = null)
	{
		$this->workerName = $workerName;
		$this->uid = uniqid('M_Id');

		// null and an empty array both leave the current configuration untouched.
		if (!empty($config)) {
			$this->config = array_merge($this->config, $config);
		}

		foreach ($jobs as $singleJob) {
			$this->addJob($singleJob);
		}
	}

	/**
	 * Main loop: initialises the worker, runs every queued job once per
	 * cycle until the worker is stopped, then shuts the worker down.
	 *
	 * After each cycle the elapsed time (in milliseconds) is handed to
	 * calibrateAndDoLoopSleep(), and forkOrDestroyIfNeeded() decides whether
	 * to spawn another worker or to stop this one.
	 *
	 * @throws Exception Re-thrown after the shutdown ran, when a job or a
	 *                   cycle step throws.
	 */
	public function run()
	{
		$this->triggerInit();

		try
		{
			while(!$this->stopped)
			{
				$cycleStartedAt = microtime(true);

				foreach($this->jobQueue as $job)
					$job->runJob($this);

				// microtime() works in seconds; the sleep calibration expects milliseconds.
				$cycleDurationMs = (int)((microtime(true) - $cycleStartedAt) * 1000);

				$this->calibrateAndDoLoopSleep($cycleDurationMs);

				$this->forkOrDestroyIfNeeded();
			}
		}
		catch(Exception $e)
		{
			// Without this an exception would skip the shutdown entirely: the
			// worker counter would never be decremented and the lock and log
			// handles would stay open until the process ends.
			$this->triggerShoutdown();
			throw $e;
		}

		$this->triggerShoutdown();
	}

	/**
	 * Prepares the worker before the first cycle.
	 *
	 * Sets the initial loop sleep to half of 'maxLoopSleepMs', opens the
	 * log file when 'outputLogFile' is configured, and tries to take the main
	 * lock. The worker that gets the lock becomes the main worker, switches
	 * ignore_user_abort off again and initialises the worker counter. Every
	 * other worker sends "Connection: close", reads the running-worker count
	 * and increments the counter. Finally each job's onJobInit() is called.
	 */
	protected function triggerInit()
	{
		$this->stopped = false;
		$this->actualLoopSleep = $this->getConfigValue('maxLoopSleepMs') / 2;

		$logFile = $this->getConfigValue('outputLogFile');
		if ($logFile) {
			$this->workerLogFileHandle = fopen($logFile, 'a');
		}

		$this->isMainWorker = $this->getMainLock();

		if (!$this->isMainWorker) {
			// Child worker: tell the requesting side to close the connection.
			header("Connection: close");
			$this->runningWorker = $this->countRunningWorker();
			$this->incrementWorkerCounter();
		} else {
			// Main worker: a client abort ends the script again.
			ignore_user_abort(false);
			$this->initWorkerCounter();
		}

		foreach ($this->jobQueue as $queuedJob) {
			$queuedJob->onJobInit($this);
		}
	}

	/**
	 * Tears the worker down after the loop has ended.
	 *
	 * Calls onJobDestroy() on every job, decrements the worker counter for
	 * non-main workers, closes the log file if one is open and releases the
	 * main lock.
	 */
	protected function triggerShoutdown()
	{
		foreach ($this->jobQueue as $queuedJob) {
			$queuedJob->onJobDestroy($this);
		}

		$isChildWorker = !$this->isMainWorker;
		if ($isChildWorker) {
			$this->decrementWorkerCounter();
		}

		$logHandle = $this->workerLogFileHandle;
		if (is_resource($logHandle)) {
			fclose($logHandle);
		}

		$this->releaseMainLock();
	}

	/**
	 * Adapts the loop interval to the last cycle's duration and sleeps for
	 * the remainder of the interval when there is time left.
	 *
	 * If the cycle took longer than the current interval, no sleep happens,
	 * the no-sleep counter goes up and the interval is divided by
	 * 'loopSleepAdjustmentRate' (not below 'minLoopSleepMs'). Otherwise the
	 * sleep counter goes up, the interval is multiplied by the rate (not
	 * above 'maxLoopSleepMs') and the worker sleeps for the difference
	 * between the new interval and the cycle duration.
	 *
	 * @param int $lastRunDuration Duration of the last job cycle in milliseconds.
	 */
	protected function calibrateAndDoLoopSleep($lastRunDuration)
	{
		$adjustmentRate = $this->getConfigValue('loopSleepAdjustmentRate');

		if($lastRunDuration > $this->actualLoopSleep)
		{
			$this->sleepCycles = 0;
			++$this->noSleepCycles;
			$this->actualLoopSleep = max( $this->actualLoopSleep / $adjustmentRate,
										  $this->getConfigValue('minLoopSleepMs'));
			return;
		}

		$this->noSleepCycles = 0;
		++$this->sleepCycles;
		$this->actualLoopSleep = min( $this->actualLoopSleep * $adjustmentRate,
									  $this->getConfigValue('maxLoopSleepMs'));

		// Clamp at 0: with an inconsistent configuration (e.g. min > max) the
		// difference can become negative, which usleep() does not accept.
		$timeToSleepMs = max(0, (int)($this->actualLoopSleep - $lastRunDuration));

		// All intervals are kept in milliseconds, usleep() expects microseconds.
		usleep($timeToSleepMs * 1000);
	}

	/**
	 * Decides after each cycle whether to spawn another worker or to stop
	 * this one.
	 *
	 * A new worker is forked when the no-sleep counter exceeds
	 * 'forkAfterNoSleepCycles' and fewer than 'maxWorker' workers are running.
	 * Otherwise a non-main worker stops itself when its sleep counter exceeds
	 * 'destroyAfterSleepCycles' or when it is able to take the main lock.
	 */
	protected function forkOrDestroyIfNeeded()
	{
		$isOverloaded = $this->noSleepCycles > $this->getConfigValue('forkAfterNoSleepCycles');

		if ($isOverloaded && $this->countRunningWorker() < $this->getConfigValue('maxWorker')) {
			$this->fork();
			return;
		}

		// The main worker never stops itself here.
		if ($this->isMainWorker) {
			return;
		}

		$isIdle = $this->sleepCycles > $this->getConfigValue('destroyAfterSleepCycles');

		// The lock is only probed when the idle check did not already decide.
		if ($isIdle || $this->getMainLock()) {
			$this->releaseMainLock();
			$this->stopped = true;
		}
	}

	/**
	 * Tries to take the exclusive, non-blocking lock on this worker's lock
	 * file ("./M_Worker_<WorkerName>.lock").
	 *
	 * The file handle is opened once and reused on later calls. A missing
	 * lock file is created instead of failing, so the first worker can
	 * become main worker without any manual setup.
	 *
	 * @return bool true when the lock was obtained, false otherwise.
	 */
	protected function getMainLock()
	{
		if(!is_resource($this->mainWorkerLockFileHandle))
		{
			$lockFile = './M_Worker_'.ucfirst($this->workerName).'.lock';

			// An existing file is still opened read-only as before; 'c'
			// creates a missing file without truncating it, so a concurrent
			// creation by another worker is harmless.
			$mode = file_exists($lockFile) ? 'r' : 'c';

			$this->mainWorkerLockFileHandle = fopen($lockFile, $mode);
		}
		return is_resource($this->mainWorkerLockFileHandle)
			   && @flock($this->mainWorkerLockFileHandle, LOCK_EX | LOCK_NB);
	}

	/**
	 * Starts another worker by requesting the same URL this worker was
	 * started with, then closes the connection again.
	 *
	 * The URL is rebuilt from $_SERVER: the scheme follows $_SERVER['HTTPS'],
	 * and the server port is appended only when the Host header does not
	 * already carry one.
	 */
	protected function fork()
	{
		$isHttps = !empty($_SERVER['HTTPS']) && strtolower($_SERVER['HTTPS']) !== 'off';
		$scheme = $isHttps ? 'https' : 'http';

		// HTTP_HOST comes from the Host header and may already include the
		// port ("example.org:8080"); appending it again yields an invalid URL.
		$host = $_SERVER['HTTP_HOST'];
		if(!preg_match('/:\d+$/', $host))
			$host .= ':'.$_SERVER['SERVER_PORT'];

		$url = $scheme.'://'.$host.$_SERVER['REQUEST_URI'];
		$fp = fopen($url, 'r');
		if(is_resource($fp))
			fclose($fp);
	}

		[...]
}
Kategorienmono|PHP