Tuesday, January 10, 2012

A process manager in PHP

namespace shell;

/**
 * pcntl is a process manager for spawning new child processes,
 * either from php closures or from shell commands. Every forked
 * child is tracked by pid and reaped (via the SIGCHLD handler and
 * via wait()) so that no zombies are left behind.
 *
 * Requires the pcntl and posix extensions.
 *
 * @author shean massey
 * @since Jan 7th 2012
 */
class pcntl {
    /**
     * how long wait() sleeps between two polls of the children,
     * in microseconds
     */
    const WAIT_POLL_USEC = 10000;

    # pids of the children forked by this instance that were not reaped yet
    # (pid => pid)
    protected $_child_pids = array();
    # not read anywhere in this class
    protected $_max_wait = 0;
    # when true, closures handed to debug() are executed
    protected $_debug_mode = false;

    /**
     * register the signal handlers for SIGCHLD and SIGTERM and
     * register wait() as a shutdown procedure
     *
     * @throws \exception when a signal handler can not be installed
     */
    public function __construct() {
        foreach ( array( SIGCHLD, SIGTERM ) as $signal ) {
            if ( false === pcntl_signal( $signal, array( $this, 'signal_handler' ) ) ) {
                throw new \exception('failed tp attach signal: ' . $signal . ' handler');
            }
        }
        # on shutdown, wait() for all/any forked child processes
        register_shutdown_function( array( $this, 'wait' ) );
    }

    /**
     * on destruction wait() for all/any forked child processes
     */
    public function __destruct() {
        $this->wait();
    }

    /**
     * enable/disable debug messages
     *
     * @param mixed $bool cast to bool; true enables the debug output
     * @return pcntl $this, for chaining
     */
    public function use_debugger( $bool = true ) {
        $this->_debug_mode = (bool)$bool;
        return $this;
    }

    /**
     * the signal handler registered in the constructor. On SIGCHLD it
     * reaps every child that already terminated and removes it from the
     * pid collection. SIGTERM is caught but nothing is done with it.
     *
     * @param int $signum the signal being dispatched
     * @throws \appcore\exception when a reaped pid is not in the collection
     */
    public function signal_handler( $signum ) {
        switch ( $signum ) {
            # sent to a process when one of its child processes terminated
            case SIGCHLD:
                # pcntl_wait() with WNOHANG never blocks: it returns the pid
                # of a reaped child, 0 when no child has exited yet, or -1
                # on failure. one SIGCHLD may stand for several dead
                # children, hence the loop.
                while ( 0 !== ( $pid = pcntl_wait( $status, WNOHANG ) ) ) {
                    $this->debug( function() use ($pid, $status){
                        echo 'caught pid by signal handler: ', $pid, ' ';
                        echo 'return status: ', $status, ' ';
                        echo '[my pid = ', posix_getpid(), ']', PHP_EOL;
                    });
                    # there are no more children to handle:
                    if ( empty( $this->_child_pids ) ) return;
                    # pcntl_wait failed:
                    if ( $pid === -1 ) {
                        pcntl_signal_dispatch();
                        break;
                    }
                    # a child that was not forked through this instance
                    # NOTE(review): \appcore\exception is not defined in this
                    # file - confirm it is loadable wherever this class is used
                    if ( ! array_key_exists( $pid, $this->_child_pids ) ) {
                        throw new \appcore\exception('caught someone elses dead baby');
                    }
                    # remove the child pid from the pid collection
                    unset( $this->_child_pids[ $pid ] );
                }
            break;
            # SIGTERM is what a plain `kill <pid>` sends (`kill -1` would be
            # SIGHUP). Because a handler is installed and it does nothing,
            # the signal is effectively swallowed.
            # NOTE(review): presumably intentional so wait() can finish - confirm
            case SIGTERM:

            break;
        }
    }

    /**
     * fork a closure as a new process. The child runs the closure and
     * then exits with status 0; the parent records the child pid.
     *
     * @param \closure $function the code to run inside the child
     * @return pcntl $this (in the parent), for chaining
     * @throws \appcore\exception when pcntl_fork() fails
     */
    public function fork( \closure $function ) {
        switch ( $pid = pcntl_fork() ) {
            # error:
            case -1:
                throw new \appcore\exception('failed to fork()');
            # child proc:
            case 0:
                # the pids copied over from the parent are not children of
                # this process, so empty the collection:
                $this->_child_pids = array();
                $function();
                exit(0);
            # parent proc:
            default:
                $this->_child_pids[ $pid ] = $pid;
                $this->debug( function() use ($pid) {
                    echo 'new child: ', $pid, PHP_EOL;
                });
                # reap any children that died in the meantime
                pcntl_signal_dispatch();
                return $this;
        }
    }

    /**
     * fork a shell command: a child process is forked which hands the
     * command line to shell_exec()
     *
     * @param string $cmd_line the shell command to run
     * @return pcntl $this, for chaining
     */
    public function shell_fork( $cmd_line = '' ) {
        $this->fork( function() use ( $cmd_line ) {
            # posix_getpid() runs inside the child, so the file name carries
            # the pid of the forked child.
            # NOTE(review): with `2>&1 > file` only stdout ends up in the
            # file; stderr is pointed at the previous stdout. If both were
            # meant to be captured the order should be `> file 2>&1` - confirm.
            $cmd_line .= ' 2>&1 > /tmp/proc_manager.lck.'.posix_getpid().' &';
            shell_exec( $cmd_line );
        });
        pcntl_signal_dispatch();
        return $this;
    }

    /**
     * if the debug_mode is set, then execute the function being
     * passed as a closure. Note that this function should never
     * attempt to change the current state.
     *
     * @param \closure $function prints the debug message
     * @return bool true when the closure was executed, false otherwise
     */
    public function debug( \closure $function ) {
        if ( ! $this->_debug_mode ) return false;
        $function();
        return true;
    }

    /**
     * block until every tracked child has been reaped.
     *
     * Each round first dispatches pending signals, so the SIGCHLD handler
     * can do its work, and then polls every remaining pid directly with
     * pcntl_waitpid(). The direct poll guarantees termination even when
     * the handler never gets to see a child (e.g. another instance
     * replaced the handler, or the child was already reaped elsewhere).
     *
     * @return pcntl $this, for chaining
     */
    public function wait() {
        $this->debug( function(){
            echo 'dispatch loop', PHP_EOL;
        });
        while ( $this->_child_pids ) {
            pcntl_signal_dispatch();
            # iterate over a copy of the keys, the collection shrinks below
            foreach ( array_keys( $this->_child_pids ) as $pid ) {
                $result = pcntl_waitpid( $pid, $status, WNOHANG );
                # $pid: the child just got reaped
                # -1:   the pid can not be waited on (already reaped or not
                #       a child of this process); nothing left to wait for
                # 0:    the child is still running, keep tracking it
                if ( $result === $pid || $result === -1 ) {
                    unset( $this->_child_pids[ $pid ] );
                }
            }
            # sleep between polls instead of spinning on the cpu
            if ( $this->_child_pids ) usleep( self::WAIT_POLL_USEC );
        }
        return $this;
    }
}
Using it is VERY simple: you fork new processes either as closures or as shell commands:
#!/usr/bin/env php
<?php
# Example: run two closures and two shell commands as child processes.
# NOTE(review): the class definition has to be loaded (require/autoload)
# before this point - how that is done is not shown here.
$proc_manager = new \shell\pcntl();
$proc_manager->use_debugger( true );

# child 1: appends "i = 1" .. "i = 5" to /tmp/test, one line per second
$proc_manager->fork( function( ) {
  $i = 0;
  while ( $i++ < 5 ) {
    sleep( 1 );
    file_put_contents('/tmp/test', 'i = '.$i.PHP_EOL, FILE_APPEND);
  }
});

# child 2: appends "m = 1" .. "m = 6" to the same file, concurrently
$proc_manager->fork( function( ) {
  $m = 0; while ( $m++ < 6 ) {
    sleep( 1 ); 
    file_put_contents('/tmp/test', 'm = '.$m.PHP_EOL, FILE_APPEND);
  }
});

$proc_manager->shell_fork('sleep 1 && ls -la');

# '# ...' is a placeholder for the real rsync arguments. Keep in mind that
# shell_fork() appends its redirection to the command line, so a literal
# '#' would turn that suffix into a shell comment.
$proc_manager->shell_fork('rsync # ...');