<?php
/**
 * BU-commander: the backup scheduler service and its command-line control client.
 *
 * With command-line arguments the script acts as a client: it includes main-svc.php,
 * forwards the command through run_main_command() and exits.
 * Without arguments it runs the service loop, which polls for RPC requests, due
 * schedule entries and queued backups.
 */

// Current verbosity; messages with a level above this are dropped.
$debug_level = 0;
const DEBUG_LOW = 0;
const DEBUG_MED = DEBUG_LOW+1;
const DEBUG_HIGH = DEBUG_MED+1;
const DEBUG_VHIGH = DEBUG_HIGH+1;

// Log destination. An empty/null value sends log output to stdout instead.
$log_print_file = __DIR__."/../data/rpc_op.log";

/**
 * Writes $msg verbatim to the log file, or to stdout when no log file is configured.
 *
 * @param string $msg text to write (no newline is added)
 */
function log_print( $msg ){
    global $log_print_file;

    // Loose comparison on purpose: the CLI client sets the file to null.
    if($log_print_file==''){
        print($msg);
        return;
    }
    file_put_contents( $log_print_file, $msg, FILE_APPEND);
}

/**
 * Logs $msg when its level $dl does not exceed the current $debug_level.
 *
 * @param string $msg text to log
 * @param int    $dl  one of the DEBUG_* levels
 */
function debug_print( $msg, $dl=0 ){
    global $debug_level;

    if($dl>$debug_level) return;
    log_print( $msg );
}

/**
 * Like debug_print(), but prefixes a UTC timestamp and appends a newline.
 *
 * @param string $msg text to log
 * @param int    $dl  one of the DEBUG_* levels
 */
function debug_println( $msg, $dl=0 ){
    global $debug_level;

    if($dl>$debug_level) return;
    $stamp = date_format( date_create("now", new DateTimeZone("UTC")), 'Y-m-d H:i:s');
    debug_print( "$stamp $msg\n", $dl );
}

// ---------------------------------------------------------------------------
// CLI client mode
// ---------------------------------------------------------------------------

//$argv[0] is always the name that was used to run the script.
if(count($argv)>1){
    //do not output to service log file
    $log_print_file=null;
    include __DIR__."/main-svc.php";

    if($argv[1]==='stop'){
        debug_println( "stopping" );
        $rtn = run_main_command( 'f:cli-stop' );
        print_r( $rtn );
        exit(0);
    }
    if($argv[1]==='reload'){
        debug_println( "reloading" );
        $rtn = run_main_command( 'f:cli-reload' );
        print_r( $rtn );
        exit(0);
    }
    if($argv[1]==='debug_inc'){
        debug_println( "debug_inc" );
        run_main_command( 'f:debug_inc' );
        exit(0);
    }
    if($argv[1]==='debug_dec'){
        debug_println( "debug_dec" );
        run_main_command( 'f:debug_dec' );
        exit(0);
    }

    // Anything else: forward all arguments (without the script name) as the command.
    $arg_copy = $argv;
    array_shift($arg_copy);
    $rtn = run_main_command( $arg_copy );
    print_r( $rtn );
    exit(0);
}

// ---------------------------------------------------------------------------
// Service mode
// ---------------------------------------------------------------------------

// Set by the 'f:cli-stop' RPC command; the main loop exits when it is true.
$g_shutdown = false;

/*
 * This is the backup process CLI application.
 *
 * Rather than use 2 threads, I've decided to poll. This is because the threading in PHP is no
 * longer supported. Instead we should use parallel:
 *   https://www.php.net/manual/en/parallel.run.php
 * which I don't like. Basically, it doesn't look like you can share code (by way of a reference)?!!
 *
 * So we are polling the RPC layer every 0.1secs. In practice, this will be plenty for the backup app.
 */
include __DIR__."/../lib/sysVcom.php";

debug_println("started");

//create the comms file
touch('BU-commander');
//remove all queues at startup
clear_IPC( '../svc/BU-commander' );

rpc_setGlobals( 'procName', 'BU-commander' );
rpc_setGlobals( 'max_chan', 8182 );
rpc_setGlobals( 'main_pid', getmypid() );
print("rpc using ". __DIR__ . "/$g_procName\n" );

// problem is that the paths are not correct wrt this location, so we need to adjust them
include __DIR__."/../inc/paths.php";
$g_database_path = '../' . $g_database_path;
$g_data_dir = realpath( __DIR__."/../data" );

include "db-reads.php";
include "db-writes.php";
include "file-line-reader.php";
include "table-cache.php";

// Cached result of getbu_list_state(); filled lazily by 'f:getbu_list_state'.
$g_bu_states = null;

// maximum number of forks allowed
$g_fork_max = 4;

// currently running fork count (maintained by the parent process only)
$g_fork_cnt=0;

//a queue of backup IDs waiting to commence
$g_bu_queue = array();

// Message buffer handed to rpc_listen(). It is initialized here, before the main
// loop, because statements placed after the never-ending loop are never executed.
$msgArr = null;

/**
 * Returns true when backup ID $buid is already waiting in the queue.
 *
 * @param mixed $buid backup ID to look for (compared loosely)
 */
function isQueued( $buid ){
    global $g_bu_queue;

    foreach( $g_bu_queue as $q ){
        if($q==$buid) return true;
    }
    return false;
}

// Main polling loop: sleep 0.1s only when none of the checks did any work.
while(true){
    if($g_shutdown){
        print("CLI request to shutdown: googbye!\n");
        exit(0);
    }

    $bDidWork = false;
    $bDidWork |= check_rpc_clients();
    $bDidWork |= check_bu_sched();
    $bDidWork |= check_bu_queue();
    if(!$bDidWork){
        time_nanosleep( 0, 100000000);
    }
}

/**
 * Looks for the first schedule entry that is due and queues it.
 *
 * A row is eligible when it has a date, is not running, has no error and is not
 * already queued. Eligible rows whose scheduled time (taken as UTC) is still in the
 * future are skipped, so they cannot hold back a later row that is already due.
 *
 * @return bool true if a backup was kicked off, false otherwise
 */
function check_bu_sched(){
    global $g_bu_table;
    global $g_bu_colIdxs;

    if( !is_array($g_bu_table) ) return false; //no cached table to scan

    $utc = new DateTimeZone("UTC");
    $now = new DateTime( 'now', $utc );

    $idfound = 0;
    foreach( $g_bu_table as $row ){
        if( $row[ $g_bu_colIdxs['BU_DATE'] ] == '' ) continue;      //skip anything without a date
        if( $row[ $g_bu_colIdxs['BuRunning'] ] == 1 ) continue;     //skip any already running
        if( $row[ $g_bu_colIdxs['BuError'] ] != null ) continue;    //skip entries with problems
        if( isQueued( $row[ $g_bu_colIdxs['BUID'] ] ) ) continue;   //skip entries already queued

        // is it time...?
        $str_bu_dt = $row[ $g_bu_colIdxs['BU_DATE']] . ' ' . $row[ $g_bu_colIdxs['BU_TIME']] . ':00';
        try{
            $schedTime = new DateTime( $str_bu_dt, $utc );
        }catch( Exception $e ){
            // An unparsable schedule must not take the whole service down.
            debug_println( "(check_bu_sched) bad schedule '$str_bu_dt': " . $e->getMessage(), DEBUG_HIGH );
            continue;
        }
        if( $schedTime > $now ) continue;   //...no it's not

        $idfound = $row[ $g_bu_colIdxs['BUID'] ];
        break;
    }
    if($idfound==0) return false;

    [ , $state ] = queue_backup( $idfound );
    return $state!=='queued';   //return true if some work was started
}

/**
 * Tries to start the next queued backup.
 *
 * @return bool true if dequeue_backup() produced a result other than 'queued'
 */
function check_bu_queue(){
    $res = dequeue_backup();
    if($res==null) return false;
    if($res[1]=='queued') return false;
    return true;
}

/**
 * Adds a backup ID to the queue. It is kicked off immediately if a slot is available.
 *
 * Note that the item started is the one at the FRONT of the queue, which is not
 * necessarily the one just added.
 *
 * @param mixed $buid      backup ID
 * @param bool  $scheduled true when started by the scheduler, false when manual
 * @param bool  $bu_test   passed through to the backup run as its test flag
 * @return array result of the kick-off, or array("ok:", "queued") if nothing was started
 */
function queue_backup( $buid, $scheduled = true, $bu_test = false ){
    global $g_bu_queue;

    debug_println("(queue_backup) queue backup($buid)", DEBUG_MED );
    array_push( $g_bu_queue, array($buid, $scheduled, $bu_test) );

    $res = dequeue_backup( );
    if($res!=null){
        return $res;    //either running or error.
    }

    debug_println("(queue_backup) backup($buid) in queue", DEBUG_MED );
    return array("ok:", "queued" );     //will be run later
}

/**
 * Removes the next backup item from the front of the queue and kicks it off if there are
 * enough free forks.
 *
 * @return array|null the result of the kick-off, or null if there was nothing to do
 */
function dequeue_backup( ){
    global $g_bu_queue;
    global $g_fork_cnt;
    global $g_fork_max;

    if( count($g_bu_queue)==0){
        return null;    //nothing to do
    }
    if($g_fork_cnt>=$g_fork_max){
        //There are no available forks
        return null;
    }

    $bu_item = array_shift( $g_bu_queue );
    // $bu_item is array($buid, $scheduled, $bu_test) as pushed by queue_backup().
    debug_println("(dequeue_backup) dequeued backup({$bu_item[0]})", DEBUG_MED );
    return kickoff_backup( $bu_item );
}

/**
 * Kicks off a backup. Returns immediately. Delegates the work to a forked child process.
 *
 * The parent marks the backup as running (DB and cached row) and counts the fork.
 * The child runs run_backup_block() from backup.php, notifies the parent with an
 * 'm:fork-complete' RPC message and exits.
 *
 * @param array $bu_item array($buid, $scheduled, $bu_test), where
 *                       $scheduled is true if run according to the schedule info in the db,
 *                       false if it has been manually initiated
 * @return array array("ok:", "running") on success, or array("e:Error", <reason>)
 */
function kickoff_backup( $bu_item ){
    global $g_bu_table;
    global $g_bu_colIdxs;
    global $g_fork_cnt;
    global $g_data_dir;

    [$buid, $scheduled, $bu_test] = $bu_item;
    debug_println("(kickoff_backup) backup($buid) to run now", DEBUG_MED );

    // Taken by reference so that the BuRunning update below reaches the cached row
    // (presumably find_rec_by_id() returns by reference - confirm in table-cache.php).
    $row = &find_rec_by_id( $buid );
    if( !is_array($row) ){
        // Unknown ID: do not fork a child with nothing to back up.
        debug_println("(kickoff_backup) backup($buid) not found");
        return array("e:Error", "Backup not found" );
    }

    $isRunning = $row[ $g_bu_colIdxs['BuRunning'] ];
    if( !$bu_test && $isRunning==1)
        return array("e:Error", "Backup already running" );

    // Positional parameters handed to run_backup_block() in the child.
    $bu_row = array(
        $row[ $g_bu_colIdxs['BUName'] ],
        $row[ $g_bu_colIdxs['Dir_Src'] ],
        $row[ $g_bu_colIdxs['Dir_Dest'] ],
        $row[ $g_bu_colIdxs['Files_Ex'] ],
        $row[ $g_bu_colIdxs['BuType'] ],
        $isRunning,
    );

    // we should be able to use pcntl_fork()
    $pid = pcntl_fork();
    if ( $pid == -1 ) {
        debug_println("fork failed");
        return array("e:Error", "fork failed" );
    }

    if ( $pid ) {
        // parent process
        debug_println( "(kickoff_backup) bu($buid) running, parent(". getmypid().")", DEBUG_MED );
        set_bu_running_db( $buid, true );
        $row[ $g_bu_colIdxs['BuRunning'] ] = 1;
        // Count the fork HERE: the child only has a copy of this variable, so an
        // increment made there would never be seen by the parent, which is the
        // process that checks the limit and decrements on 'm:fork-complete'.
        $g_fork_cnt++;
        return array("ok:", "running" );
    }

    // child process
    $g_bu_table = array();  //for safety
    $pid = getmypid();
    debug_println( "child($pid) running backup($buid)" );

    include_once __DIR__."/backup.php";
    $g_data_dir = __DIR__."/../data";
    run_backup_block( $buid, $bu_row, $bu_test );
    debug_println( "child($pid) finished bu($buid)" );

    //we use our rpc mechanism to notify parent
    rpc( 'm:fork-complete', $pid, "backup-complete", $buid, $scheduled );
    exit(0);
}

/**
 * Polls for one RPC request, handles it and sends the reply.
 *
 * For an 'm:fork-complete' message the child process is also reaped, the fork count
 * is reduced and, for "backup-complete" notifications, backupComplete() is called.
 *
 * @return bool true if a request was found and handled
 */
function check_rpc_clients(){
    global $msgArr;
    global $g_fork_cnt;

    $caller_id = 0;
    $obj_in = rpc_listen( $caller_id, $msgArr, false );
    $obj_out = null;
    if($caller_id==0) return false;     //nothing to do

    try{
        if( !is_array($obj_in[0])) {
            debug_println("req {$obj_in[0]}", DEBUG_MED );
        }
        $obj_out = handle_rpc_req( $obj_in );
        if($obj_out==null) $obj_out = array('ok:');     //generic return value
    }catch( Exception $e ){
        debug_println( "Exception: " . $e->getMessage() );
        $obj_out = array( 'e:Exception', $e );
    }

    if( !is_array($obj_out[0])) {
        debug_println("reply {$obj_out[0]}", DEBUG_MED );
    }
    //send msg back
    rpc_reply( $caller_id, $obj_out );

    if( $obj_in[0] === 'm:fork-complete' ){
        //clean up child process
        debug_println("wait for child({$obj_in[1]})", DEBUG_MED );
        $status = 0;
        pcntl_waitpid( $obj_in[1], $status );
        $g_fork_cnt = max( 0, $g_fork_cnt-1 );
        debug_println("child({$obj_in[1]}) complete.", DEBUG_MED );

        //finish any requirements for specific child processes (db updates etc)
        // Message layout: [0]=type, [1]=pid, [2]=reason, [3]=buid, [4]=scheduled.
        if( count($obj_in)>4 && $obj_in[2]==="backup-complete"){
            backupComplete( $obj_in[3], $obj_in[4] );
        }
    }
    return true;
}

/**
 * Called by the parent after a child notified that a backup completed.
 *
 * For scheduled backups without a recorded error, the schedule date is advanced
 * according to BU_REPEAT ('M', 'W' or 'D'); any other repeat value stores a null
 * date. The running flag is then cleared in the DB and the cached record reloaded.
 *
 * NOTE(review): for manually started backups the running flag is not cleared here,
 * although kickoff_backup() sets it for every run - presumably the child clears it;
 * confirm in backup.php.
 *
 * @param mixed $buid      backup ID
 * @param bool  $scheduled true when the run was started by the scheduler
 */
function backupComplete( $buid, $scheduled ){
    global $g_bu_colIdxs;

    //reload the table data (in case child modified error field)
    $row = reload_cached_rec( $buid );

    if($scheduled){
        //if DB error, do nothing
        $err = $row[ $g_bu_colIdxs['BuError'] ];
        if($err==''){
            //update date and time according to the repeat field.
            $interval = null;
            switch( $row[ $g_bu_colIdxs['BU_REPEAT'] ] ){
                case 'M': $interval = 'P1M'; break;     //monthly
                case 'W': $interval = 'P1W'; break;     //weekly
                case 'D': $interval = 'P1D'; break;     //daily
            }

            $dt_new = null;
            if($interval!==null){
                $date = date_create( $row[ $g_bu_colIdxs['BU_DATE'] ] );
                date_add( $date, new DateInterval($interval) );
                $dt_new = date_format( $date, "Y-m-d");
            }
            $row[ $g_bu_colIdxs['BU_DATE'] ] = $dt_new;
            set_sched_date_db( $buid, $dt_new );
        }
        set_bu_running_db( $buid, false );
    }//if manually kicked off - nothing to do

    $row = reload_cached_rec( $buid );
    debug_println("(backupComplete) bu($buid) done", DEBUG_MED );
}

/**
 * Carries out the task requested from any RPC client.
 * That client may be from the webserver or from a forked child or any other process.
 *
 * @param array $obj request; $obj[0] is the command name, the remaining elements are
 *                   its arguments
 * @return array the reply for the caller; for an unrecognized command, the request
 *               with "Unknown Command" prepended
 */
function handle_rpc_req( $obj ){
    // The idea will be that the indicator before the colon will be handled generically in cases
    // where it is a simple function call, i.e. 'f:' (provided it is a trusted source and/or has
    // been previously checked)
    global $g_data_dir;
    global $g_shutdown;
    global $debug_level;
    global $g_bu_states;

    switch( $obj[0] ){
        case 'f:bu-test-file-lines':
            $buid = $obj[1];
            $lineNum = $obj[2];
            $lineData = '';
            $objFile = getObject_outputFile( $g_data_dir, $buid );
            $cnt = getLinesFrom( $objFile, $lineNum, $lineData );
            if($cnt==0){
                closeFile( $buid );
                return array( 'r:bu-test-file-lines', 0 );
            }
            return array( 'r:bu-test-file-lines', $cnt, $lineData );

        case 'm:fork-complete':
            //send the same message back to the caller
            return $obj;

        case 'f:getbu_list_all':
            //($table, $colIdxs) = rpc( 'f:getbu_list_all', "order by BUID" );
            $table = array();
            $colIdxs = array();
            getbu_list_all( $table, $colIdxs, $obj[1] );
            return array( $table, $colIdxs );

        case 'f:getbu_list_state':
            if($g_bu_states==null){
                $g_bu_states = getbu_list_state();
            }
            return $g_bu_states;

        case 'f:save_bu_schedule':
            $buid = $obj[1]['id'];
            $err = save_bu_schedule( $obj[1] );
            reload_cached_rec( $buid );
            return array($err);

        case 'f:save_bu_item':
            //[$err] = rpc( 'f:save_bu_item', $postVars, $bu_id );
            $buid = $obj[2];
            $err = save_bu_item( $obj[1], $buid );
            reload_cached_rec( $buid );
            return array($err);

        case 'f:set_bu_error':
            $buid = $obj[1];
            $err = set_bu_error_db( $obj[1], $obj[2] );
            reload_cached_rec( $buid );
            return array($err);

        case 'f:set_bu_running':
            $buid = $obj[1];
            $err = set_bu_running_db( $obj[1], $obj[2] );
            reload_cached_rec( $buid );
            return array($err);

        case 'f:kickoff-backup':
            $buid = $obj[1];
            $isTest = $obj[2];
            $res = queue_backup( $buid, false, $isTest );
            if($res[1]==='queued')
                return array('ok:','running');  //ToDo: return queued and client to handle
            return $res;

        case 'f:set_last_run_date':
            //[$err] = rpc( 'f:set_last_run_date', $buid, $str_dt );
            $buid = $obj[1];
            $err = set_last_run_date_db( $buid, $obj[2] );
            reload_cached_rec( $buid );
            return array($err);

        case 'f:cli-reload':
            clear_cached_data();
            ensure_cached_table();
            return array('ok:reload');

        case 'f:cache-show':
            array_shift($obj);
            foreach( format_cache($obj) as $line){
                log_print( "$line\n" );
            }
            return array('ok:cache-show');

        case 'f:cli-stop':
            // Picked up by the main loop on its next iteration.
            $g_shutdown = true;
            return array('ok:shutdown');

        case 'f:delete_bu_item':
            $buid = $obj[1];
            $err = delete_bu_item_db( $buid );
            delete_cached_rec( $buid );
            return array($err);

        case 'f:debug_inc':
            //increment the debug level
            $debug_level++;
            debug_println( "debug level $debug_level" );
            return array('ok:');

        case 'f:debug_dec':
            //decrement the debug level (never below 0)
            $debug_level--;
            if($debug_level<0) $debug_level=0;
            debug_println( "debug level $debug_level" );
            return array('ok:');
    }

    array_unshift( $obj, "Unknown Command" );
    print_r( $obj );
    return $obj;
}
?>