8398c9048d
code was modified slightly, so the code differs from the original downloadable 1.9.5 version
658 lines
19 KiB
PHP
658 lines
19 KiB
PHP
<?php
|
|
/**
|
|
* Zend Framework
|
|
*
|
|
* LICENSE
|
|
*
|
|
* This source file is subject to the new BSD license that is bundled
|
|
* with this package in the file LICENSE.txt.
|
|
* It is also available through the world-wide-web at this URL:
|
|
* http://framework.zend.com/license/new-bsd
|
|
* If you did not receive a copy of the license and are unable to
|
|
* obtain it through the world-wide-web, please send an email
|
|
* to license@zend.com so we can send you a copy immediately.
|
|
*
|
|
* @category ZendX
|
|
* @package ZendX_Console
|
|
* @copyright Copyright (c) 2005-2008 Zend Technologies USA Inc. (http://www.zend.com)
|
|
* @license http://framework.zend.com/license/new-bsd New BSD License
|
|
* @version $Id: Unix.php 14367 2009-03-18 21:39:39Z dasprid $
|
|
*/
|
|
|
|
|
|
/**
|
|
* ZendX_Console_Process_Unix allows you to spawn a class as a separated process
|
|
*
|
|
* @category ZendX
|
|
* @package ZendX_Console
|
|
* @copyright Copyright (c) 2005-2008 Zend Technologies USA Inc. (http://www.zend.com)
|
|
* @license http://framework.zend.com/license/new-bsd New BSD License
|
|
*/
|
|
abstract class ZendX_Console_Process_Unix
|
|
{
|
|
/**
|
|
* Void method
|
|
*/
|
|
const VOID_METHOD = 'void_method';
|
|
|
|
/**
|
|
* Return method
|
|
*/
|
|
const RETURN_METHOD = 'void_method';
|
|
|
|
/**
|
|
* Unique thread name
|
|
*
|
|
* @var string
|
|
*/
|
|
private $_name;
|
|
|
|
/**
|
|
* PID of the child process
|
|
*
|
|
* @var integer
|
|
*/
|
|
private $_pid = null;
|
|
|
|
/**
|
|
* UID of the child process owner
|
|
*
|
|
* @var integer
|
|
*/
|
|
private $_puid = null;
|
|
|
|
/**
|
|
* GUID of the child process owner
|
|
*
|
|
* @var integer
|
|
*/
|
|
private $_guid = null;
|
|
|
|
/**
|
|
* Whether the process is yet forked or not
|
|
*
|
|
* @var boolean
|
|
*/
|
|
private $_isRunning = false;
|
|
|
|
/**
|
|
* Wether we are into child process or not
|
|
*
|
|
* @var boolean
|
|
*/
|
|
private $_isChild = false;
|
|
|
|
/**
|
|
* A data structure to hold data for Inter Process Communications
|
|
*
|
|
* @var array
|
|
*/
|
|
private $_internalIpcData = array();
|
|
|
|
/**
|
|
* Key to access to Shared Memory Area.
|
|
*
|
|
* @var integer
|
|
*/
|
|
private $_internalIpcKey;
|
|
|
|
/**
|
|
* Key to access to Sync Semaphore.
|
|
*
|
|
* @var integer
|
|
*/
|
|
private $_internalSemKey;
|
|
|
|
/**
|
|
* Is Shared Memory Area OK? If not, the start() method will block.
|
|
* Otherwise we'll have a running child without any communication channel.
|
|
*
|
|
* @var boolean
|
|
*/
|
|
private $_ipcIsOkay;
|
|
|
|
/**
|
|
* Filename of the IPC segment file
|
|
*
|
|
* @var string
|
|
*/
|
|
private $_ipcSegFile;
|
|
|
|
/**
|
|
* Filename of the semaphor file
|
|
*
|
|
* @var string
|
|
*/
|
|
private $_ipcSemFile;
|
|
|
|
/**
|
|
* Constructor method
|
|
*
|
|
* Allocates a new pseudo-thread object. Optionally, set a PUID, a GUID and
|
|
* a UMASK for the child process. This also initialize Shared Memory
|
|
* Segments for process communications.
|
|
*
|
|
* @param integer $puid
|
|
* @param integer $guid
|
|
* @param integer $umask
|
|
* @throws ZendX_Console_Process_Exception When running on windows
|
|
* @throws ZendX_Console_Process_Exception When running in web enviroment
|
|
* @throws ZendX_Console_Process_Exception When shmop_* functions don't exist
|
|
* @throws ZendX_Console_Process_Exception When pcntl_* functions don't exist
|
|
* @throws ZendX_Console_Process_Exception When posix_* functions don't exist
|
|
*/
|
|
public function __construct($puid = null, $guid = null, $umask = null)
|
|
{
|
|
if (substr(PHP_OS, 0, 3) === 'WIN') {
|
|
require_once 'ZendX/Console/Process/Exception.php';
|
|
throw new ZendX_Console_Process_Exception('Cannot run on windows');
|
|
} else if (!in_array(substr(PHP_SAPI, 0, 3), array('cli', 'cgi'))) {
|
|
require_once 'ZendX/Console/Process/Exception.php';
|
|
throw new ZendX_Console_Process_Exception('Can only run on CLI or CGI enviroment');
|
|
} else if (!function_exists('shmop_open')) {
|
|
require_once 'ZendX/Console/Process/Exception.php';
|
|
throw new ZendX_Console_Process_Exception('shmop_* functions are required');
|
|
} else if (!function_exists('pcntl_fork')) {
|
|
require_once 'ZendX/Console/Process/Exception.php';
|
|
throw new ZendX_Console_Process_Exception('pcntl_* functions are required');
|
|
} else if (!function_exists('posix_kill')) {
|
|
require_once 'ZendX/Console/Process/Exception.php';
|
|
throw new ZendX_Console_Process_Exception('posix_* functions are required');
|
|
}
|
|
|
|
$this->_isRunning = false;
|
|
|
|
$this->_name = md5(uniqid(rand()));
|
|
$this->_guid = $guid;
|
|
$this->_puid = $puid;
|
|
|
|
if ($umask !== null) {
|
|
umask($umask);
|
|
}
|
|
|
|
// Try to create the shared memory segment. The variable
|
|
// $this->_ipcIsOkay contains the return code of this operation and must
|
|
// be checked before forking
|
|
if ($this->_createIpcSegment() && $this->_createIpcSemaphore()) {
|
|
$this->_ipcIsOkay = true;
|
|
} else {
|
|
$this->_ipcIsOkay = false;
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Stop the child on destruction
|
|
*/
|
|
public function __destruct()
|
|
{
|
|
if ($this->isRunning()) {
|
|
$this->stop();
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Causes this pseudo-thread to begin parallel execution.
|
|
*
|
|
* This method first checks of all the Shared Memory Segment. If okay, it
|
|
* forks the child process, attaches signal handler and returns immediatly.
|
|
* The status is set to running, and a PID is assigned. The result is that
|
|
* two pseudo-threads are running concurrently: the current thread (which
|
|
* returns from the call to the start() method) and the other thread (which
|
|
* executes its run() method).
|
|
*
|
|
* @throws ZendX_Console_Process_Exception When SHM segments can't be created
|
|
* @throws ZendX_Console_Process_Exception When process forking fails
|
|
* @return void
|
|
*/
|
|
public function start()
|
|
{
|
|
if (!$this->_ipcIsOkay) {
|
|
require_once 'ZendX/Console/Process/Exception.php';
|
|
throw new ZendX_Console_Process_Exception('Unable to create SHM segments for process communications');
|
|
}
|
|
|
|
// @see http://www.php.net/manual/en/function.pcntl-fork.php#41150
|
|
@ob_end_flush();
|
|
|
|
pcntl_signal(SIGCHLD, SIG_IGN);
|
|
|
|
$pid = @pcntl_fork();
|
|
if ($pid === -1) {
|
|
require_once 'ZendX/Console/Process/Exception.php';
|
|
throw new ZendX_Console_Process_Exception('Forking process failed');
|
|
} else if ($pid === 0) {
|
|
// This is the child
|
|
$this->_isChild = true;
|
|
|
|
// Sleep a second to avoid problems
|
|
sleep(1);
|
|
|
|
// Install the signal handler
|
|
pcntl_signal(SIGUSR1, array($this, '_sigHandler'));
|
|
|
|
// If requested, change process identity
|
|
if ($this->_guid !== null) {
|
|
posix_setgid($this->_guid);
|
|
}
|
|
|
|
if ($this->_puid !== null) {
|
|
posix_setuid($this->_puid);
|
|
}
|
|
|
|
// Run the child
|
|
try {
|
|
$this->_run();
|
|
} catch (Exception $e) {
|
|
// We have to catch any exceptions and clean up the process,
|
|
// else we will have a memory leak.
|
|
}
|
|
|
|
// Destroy the child after _run() execution. Required to avoid
|
|
// unuseful child processes after execution
|
|
exit(0);
|
|
} else {
|
|
// Else this is the parent
|
|
$this->_isChild = false;
|
|
$this->_isRunning = true;
|
|
$this->_pid = $pid;
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Causes the current thread to die.
|
|
*
|
|
* The relative process is killed and disappears immediately from the
|
|
* processes list.
|
|
*
|
|
* @return boolean
|
|
*/
|
|
public function stop()
|
|
{
|
|
$success = false;
|
|
|
|
if ($this->_pid > 0) {
|
|
$status = 0;
|
|
|
|
posix_kill($this->_pid, 9);
|
|
pcntl_waitpid($this->_pid, $status, WNOHANG);
|
|
$success = pcntl_wifexited($status);
|
|
$this->_cleanProcessContext();
|
|
}
|
|
|
|
return $success;
|
|
}
|
|
|
|
/**
|
|
* Test if the pseudo-thread is already started.
|
|
*
|
|
* @return boolean
|
|
*/
|
|
public function isRunning()
|
|
{
|
|
return $this->_isRunning;
|
|
}
|
|
|
|
/**
|
|
* Set a variable into the shared memory segment, so that it can accessed
|
|
* both from the parent and from the child process. Variable names
|
|
* beginning with underlines are only permitted to interal functions.
|
|
*
|
|
* @param string $name
|
|
* @param mixed $value
|
|
* @throws ZendX_Console_Process_Exception When an invalid variable name is supplied
|
|
* @return void
|
|
*/
|
|
public function setVariable($name, $value)
|
|
{
|
|
if ($name[0] === '_') {
|
|
require_once 'ZendX/Console/Process/Exception.php';
|
|
throw new ZendX_Console_Process_Exception('Only internal functions may use underline (_) as variable prefix');
|
|
}
|
|
|
|
$this->_writeVariable($name, $value);
|
|
}
|
|
|
|
/**
|
|
* Get a variable from the shared memory segment. Returns NULL if the
|
|
* variable doesn't exist.
|
|
*
|
|
* @param string $name
|
|
* @return mixed
|
|
*/
|
|
public function getVariable($name)
|
|
{
|
|
$this->_readFromIpcSegment();
|
|
|
|
if (isset($this->_internalIpcData[$name])) {
|
|
return $this->_internalIpcData[$name];
|
|
} else {
|
|
return null;
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Read the time elapsed since the last child setAlive() call.
|
|
*
|
|
* This method is useful because often we have a pseudo-thread pool and we
|
|
* need to know each pseudo-thread status. If the child executes the
|
|
* setAlive() method, the parent with getLastAlive() can know that child is
|
|
* alive.
|
|
*
|
|
* @return integer
|
|
*/
|
|
public function getLastAlive()
|
|
{
|
|
$pingTime = $this->getVariable('_pingTime');
|
|
|
|
return ($pingTime === null ? 0 : (time() - $pingTime));
|
|
}
|
|
|
|
/**
|
|
* Returns the PID of the current pseudo-thread.
|
|
*
|
|
* @return integer
|
|
*/
|
|
public function getPid()
|
|
{
|
|
return $this->_pid;
|
|
}
|
|
|
|
/**
|
|
* Set a pseudo-thread property that can be read from parent process
|
|
* in order to know the child activity.
|
|
*
|
|
* Practical usage requires that child process calls this method at regular
|
|
* time intervals; parent will use the getLastAlive() method to know
|
|
* the elapsed time since the last pseudo-thread life signals...
|
|
*
|
|
* @return void
|
|
*/
|
|
protected function _setAlive()
|
|
{
|
|
$this->_writeVariable('_pingTime', time());
|
|
}
|
|
|
|
|
|
/**
|
|
* This is called from within the parent; all the communication stuff
|
|
* is done here.
|
|
*
|
|
* @param string $methodName
|
|
* @param array $argList
|
|
* @param string $type
|
|
* @return mixed
|
|
*/
|
|
protected function _callCallbackMethod($methodName, array $argList = array(), $type = self::VOID_METHOD)
|
|
{
|
|
// This is the parent, so we really cannot execute the method. Check
|
|
// arguments passed to the method.
|
|
if ($type === self::RETURN_METHOD) {
|
|
$this->_internalIpcData['_callType'] = self::RETURN_METHOD;
|
|
} else {
|
|
$this->_internalIpcData['_callType'] = self::VOID_METHOD;
|
|
}
|
|
|
|
// These setting are common to both the calling types
|
|
$this->_internalIpcData['_callMethod'] = $methodName;
|
|
$this->_internalIpcData['_callInput'] = $argList;
|
|
|
|
// Write the IPC data to the shared segment
|
|
$this->_writeToIpcSegment();
|
|
|
|
// Now we need to differentiate a bit.
|
|
switch ($this->_internalIpcData['_callType']) {
|
|
case VOID_METHOD:
|
|
// Notify the child so it can process the request
|
|
$this->_sendSigUsr1();
|
|
break;
|
|
|
|
case RETURN_METHOD:
|
|
// Set the semaphorew
|
|
shmop_write($this->_internalSemKey, 1, 0);
|
|
|
|
// Notify the child so it can process the request
|
|
$this->_sendSigUsr1();
|
|
|
|
// Block until the child process return
|
|
$this->_waitForIpcSemaphore();
|
|
|
|
// Read from the SHM segment. The result is stored into
|
|
// $this->_internalIpcData['_callOutput']
|
|
$this->_readFromIpcSegment();
|
|
|
|
// Data are returned. Now we can reset the semaphore
|
|
shmop_write($this->_internalSemKey, 0, 1);
|
|
|
|
// Return the result. Hence no break required here
|
|
return $this->_internalIpcData['_callOutput'];
|
|
}
|
|
}
|
|
|
|
/**
|
|
* This method actually implements the pseudo-thread logic.
|
|
*
|
|
* @return void
|
|
*/
|
|
abstract protected function _run();
|
|
|
|
/**
|
|
* Sends signal to the child process
|
|
*
|
|
* @return void
|
|
*/
|
|
private function _sendSigUsr1()
|
|
{
|
|
if ($this->_pid > 0) {
|
|
posix_kill($this->_pid, SIGUSR1);
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Acutally Write a variable to the shared memory segment
|
|
*
|
|
* @param string $name
|
|
* @param mixed $value
|
|
* @return void
|
|
*/
|
|
private function _writeVariable($name, $value)
|
|
{
|
|
$this->_internalIpcData[$name] = $value;
|
|
$this->_writeToIpcSegment();
|
|
}
|
|
|
|
/**
|
|
* Destroy thread context and free relative resources.
|
|
*
|
|
* @return void
|
|
*/
|
|
private function _cleanProcessContext()
|
|
{
|
|
shmop_delete($this->_internalIpcKey);
|
|
shmop_delete($this->_internalSemKey);
|
|
|
|
shmop_close($this->_internalIpcKey);
|
|
shmop_close($this->_internalSemKey);
|
|
|
|
@unlink($this->_ipcSegFile);
|
|
@unlink($this->_ipcSemFile);
|
|
|
|
$this->_isRunning = false;
|
|
$this->_pid = null;
|
|
}
|
|
|
|
/**
|
|
* This is the signal handler that makes the communications between client
|
|
* and server possible.
|
|
*
|
|
* @param integer $signo
|
|
* @return void
|
|
*/
|
|
private function _sigHandler($signo)
|
|
{
|
|
switch ($signo) {
|
|
case SIGTERM:
|
|
// Handle shutdown tasks. Hence no break is require
|
|
exit;
|
|
|
|
case SIGUSR1:
|
|
// This is the User-defined signal we'll use. Read the SHM segment
|
|
$this->_readFromIpcSegment();
|
|
|
|
if (isset($this->_internalIpcData['_callType'])) {
|
|
$method = $this->_internalIpcData['_callMethod'];
|
|
$params = $this->_internalIpcData['_callInput'];
|
|
|
|
switch ($this->_internalIpcData['_callType']) {
|
|
case self::VOID_METHOD:
|
|
// Simple call the (void) method and return immediatly
|
|
// no semaphore is placed into parent, so the processing
|
|
// is async
|
|
call_user_func(array($this, $method), $params);
|
|
break;
|
|
|
|
case self::RETURN_METHOD:
|
|
// Process the request
|
|
$this->_internalIpcData['_callOutput'] = call_user_func(array($this, $method), $params);
|
|
|
|
// Write the result into IPC segment
|
|
$this->_writeToIPCsegment();
|
|
|
|
// Unlock the semaphore but block _writeToIpcSegment()
|
|
shmop_write($this->_internalSemKey, 0, 0);
|
|
shmop_write($this->_internalSemKey, 1, 1);
|
|
break;
|
|
}
|
|
}
|
|
break;
|
|
|
|
default:
|
|
// Ignore all other singals
|
|
break;
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Wait for IPC Semaphore
|
|
*
|
|
* @return void
|
|
*/
|
|
private function _waitForIpcSemaphore()
|
|
{
|
|
while (true) {
|
|
$okay = shmop_read($this->_internalSemKey, 0, 1);
|
|
|
|
if ($okay === 0) {
|
|
break;
|
|
}
|
|
|
|
usleep(10);
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Read data from IPC segment
|
|
*
|
|
* @throws ZendX_Console_Process_Exception When writing of SHM segment fails
|
|
* @return void
|
|
*/
|
|
private function _readFromIpcSegment()
|
|
{
|
|
$serializedIpcData = shmop_read($this->_internalIpcKey,
|
|
0,
|
|
shmop_size($this->_internalIpcKey));
|
|
|
|
if ($serializedIpcData === false) {
|
|
require_once 'ZendX/Console/Process/Exception.php';
|
|
throw new ZendX_Console_Process_Exception('Fatal error while reading SHM segment');
|
|
}
|
|
|
|
$data = @unserialize($serializedIpcData);
|
|
|
|
if ($data !== false) {
|
|
$this->_internalIpcData = $data;
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Write data to IPC segment
|
|
*
|
|
* @throws ZendX_Console_Process_Exception When writing of SHM segment fails
|
|
* @return void
|
|
*/
|
|
private function _writeToIpcSegment()
|
|
{
|
|
// Read the transaction bit (2 bit of _internalSemKey segment). If it's
|
|
// value is 1, we're into the execution of a PHP_FORK_RETURN_METHOD, so
|
|
// we must not write to segment (data corruption)
|
|
if (shmop_read($this->_internalSemKey, 1, 1) === 1) {
|
|
return;
|
|
}
|
|
|
|
$serializedIpcData = serialize($this->_internalIpcData);
|
|
|
|
// Set the exchange array (IPC) into the shared segment
|
|
$shmBytesWritten = shmop_write($this->_internalIpcKey,
|
|
$serializedIpcData,
|
|
0);
|
|
|
|
// Check if lenght of SHM segment is enougth to contain data
|
|
if ($shmBytesWritten !== strlen($serializedIpcData)) {
|
|
require_once 'ZendX/Console/Process/Exception.php';
|
|
throw new ZendX_Console_Process_Exception('Fatal error while writing to SHM segment');
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Create an IPC segment
|
|
*
|
|
* @throws ZendX_Console_Process_Exception When SHM segment can't be created
|
|
* @return boolean
|
|
*/
|
|
private function _createIpcSegment()
|
|
{
|
|
$this->_ipcSegFile = realpath(sys_get_temp_dir()) . '/' . rand() . $this->_name . '.shm';
|
|
touch($this->_ipcSegFile);
|
|
|
|
$shmKey = ftok($this->_ipcSegFile, 't');
|
|
if ($shmKey === -1) {
|
|
require_once 'ZendX/Console/Process/Exception.php';
|
|
throw new ZendX_Console_Process_Exception('Could not create SHM segment');
|
|
}
|
|
|
|
$this->_internalIpcKey = @shmop_open($shmKey, 'c', 0644, 10240);
|
|
|
|
if (!$this->_internalIpcKey) {
|
|
@unlink($this->_ipcSegFile);
|
|
return false;
|
|
}
|
|
|
|
return true;
|
|
}
|
|
|
|
/**
|
|
* Create IPC semaphore
|
|
*
|
|
* @throws ZendX_Console_Process_Exception When semaphore can't be created
|
|
* @return boolean
|
|
*/
|
|
private function _createIpcSemaphore()
|
|
{
|
|
$this->_ipcSemFile = realpath(sys_get_temp_dir()) . '/' . rand() . $this->_name . '.sem';
|
|
touch($this->_ipcSemFile);
|
|
|
|
$semKey = ftok($this->_ipcSemFile, 't');
|
|
if ($semKey === -1) {
|
|
require_once 'ZendX/Console/Process/Exception.php';
|
|
throw new ZendX_Console_Process_Exception('Could not create semaphore');
|
|
}
|
|
|
|
$this->_internalSemKey = @shmop_open($semKey, 'c', 0644, 10);
|
|
|
|
if (!$this->_internalSemKey) {
|
|
@unlink($this->_ipcSemFile);
|
|
return false;
|
|
}
|
|
|
|
return true;
|
|
}
|
|
}
|