Newer
Older
Php-rpc / src / sysVcom.php
<?php

/**
 * This file provides a mechanism to allow inter-process communication (IPC) by way of an RPC.
 * 
 * It uses the System V architecture which was meant to have been superseded by POSIX. However,
 * I found some issues with POSIX around non-blocking behaviour and timeouts. This may have been
 * the PHP implementation and I have intentions to revisit this later. For the time being, SysV
 * works exactly as you would expect.
 * 
 * Given the issues around the message length, I will not even try to use the serialisation. There
 * is no way to know what size the resultant serialised message will be. There are errors occurring
 * when the message length is not even half of that specified by the value at /proc/sys/kernel/msgmnb.
 * 
 */
 
// Path handed to ftok() when the queue keys are derived; change it with rpc_setGlobals('procName', ...).
$g_procName = 'BU-commander';
//posix_mkfifo( $g_procName, 0644 );

// Keys of the caller queue and the reply queue. Both stay null until
// rpc_setGlobals('procName', ...) has been called.
$g_k_caller = $g_k_reply = null;

// Maximum message length. Reading it from the kernel fails on Mint-21; trial and error
// gives a maximum size of 8182.
//$g_max_chan = file_get_contents('/proc/sys/kernel/msgmnb');
$g_max_chan = 8182;

// The receive size must be larger than what was sent, otherwise the receive fails with code 7.
// 10 extra bytes were not enough but 100 were enough. Go figure.
$g_max_rcv = 100 + $g_max_chan;

// Payload bytes per message: the first 2 bytes of every message carry the chunk marker.
$g_max_data = $g_max_chan - 2;

// PID of the main process; IPC() refuses to run inside the process with this PID.
$main_pid = 0;


/**
 * Changes one of the module-level settings together with the values derived from it.
 *
 * Recognised values of $name:
 *  - 'procName' : $value is the path given to ftok(); the caller ('A') and reply ('B') queue keys are recomputed.
 *  - 'max_chan' : $value is the maximum message length; the receive size and the payload size are recomputed.
 *  - 'main_pid' : $value is the PID of the main process (cast to int).
 * Any other name is silently ignored.
 *
 * @param string $name  Name of the setting to change.
 * @param mixed  $value New value for the setting.
 * @throws Exception if ftok() cannot derive a key from the given path. The globals are left untouched in that case.
 */
function rpc_setGlobals( $name, $value ){

	switch( $name ){

	case 'procName':
		global $g_procName;
		global $g_k_caller;
		global $g_k_reply;
		$k_caller = ftok( $value, 'A');		//caller
		$k_reply = ftok( $value, 'B');		//reply
		// ftok() returns -1 on failure. Storing -1 for both keys would make the caller and the
		// reply queue one and the same queue, so fail loudly instead.
		if( $k_caller === -1 || $k_reply === -1 ){
			throw new Exception("Failed to derive IPC keys from path '$value'");
		}
		$g_procName = $value;
		$g_k_caller = $k_caller;
		$g_k_reply = $k_reply;
		debug_print( "g_k_caller key = $g_k_caller\n" );
		debug_print( "g_k_reply key = $g_k_reply\n" );
		break;

	case 'max_chan':
		global $g_max_chan;
		global $g_max_rcv;
		global $g_max_data;
		$g_max_chan = $value;
		$g_max_rcv = $g_max_chan+100;	//the receive size must exceed what was sent
		$g_max_data = $g_max_chan - 2;	//we use the first 2 bytes for msg chunks
		break;

	case 'main_pid':
		global $main_pid;
		$main_pid = (int)$value;
		break;
	}

}

//************************************************************************************************************************
// The following functions all relate to making an RPC.
// They use the php serialize() to send objects across the process boundary

/**
 * Calls a function in the remote process and returns its result. The call blocks until the reply has
 * been received; there is no non-blocking version of this method.
 *
 * The underlying layer identifies the sender by the process ID of this process, so only one call can be
 * in flight at a time; otherwise the replies could not be correlated. This is not really a problem
 * because of the lack of multi-threading support in PHP.
 *
 * NOTE(review): the reply is fed to unserialize(). Presumably only trusted processes can write to the
 * reply queue - confirm the queue permissions.
 *
 * @param string $funcName  Name of the remote function.
 * @param mixed  ...$params Arguments for the remote function.
 * @return mixed The unserialized reply.
 */
function rpc( $funcName, ...$params ){
	// The request is a single array: the function name first, followed by its arguments.
	$request = array_merge( array( $funcName ), $params );
	$reply = IPC( serialize( $request ) );
	return unserialize( $reply );
}

/**
 * Returns the next message waiting on the caller queue, unserialized. The value of $caller_id must be
 * checked to ascertain the validity of the return value.
 *
 * A $caller_id of zero means that no complete message is available: in non-blocking mode either nothing
 * was waiting, or a message has only partially been retrieved and is not yet ready. A positive value
 * is the ID of the caller whose message is complete and has been unserialized.
 *
 * $msgArr is of little use to the caller because the completed entry is removed again before returning.
 * It must be supplied though, as the underlying layer uses it to store partially retrieved messages
 * between calls.
 *
 * @param int        $caller_id Receives the ID of the caller, or zero (see above).
 * @param array|null $msgArr    Storage for partially retrieved messages.
 * @param bool       $bBlock    Wait for a message when true, return immediately when false.
 * @return mixed The unserialized message, or null when non-blocking and nothing is ready.
 */
function rpc_listen( &$caller_id, &$msgArr, $bBlock = true ){
	$caller_id = IPC_listen( $msgArr, $bBlock );

	$nothing_ready = ( !$bBlock && $caller_id==0 );
	if( $nothing_ready ){
		return null;
	}

	$decoded = unserialize( $msgArr[$caller_id]['msg'] );
	// The message has been consumed; drop it from the shared storage.
	unset( $msgArr[$caller_id] );
	return $decoded;
}

/**
 * Sends the reply of a server process back to the caller. A server that listens for messages must
 * use this method to answer, because the sender is blocked waiting for it.
 *
 * @param int   $caller_id ID of the caller, as handed out by rpc_listen().
 * @param mixed $obj       Value to return to the caller; it is serialized before sending.
 */
function rpc_reply( $caller_id, $obj ){
	$payload = serialize( $obj );
	IPC_reply( $caller_id, $payload );
}


//************************************************************************************************************************
//
// All functions below relate to sending and receiving of a single message string without the concept
// of a function or parameters.
//

/**
 * Empties and removes both message queues that belong to the given path.
 *
 * Each queue is read without blocking until a receive fails; every message read is discarded and its
 * type is printed. The queue is then removed.
 *
 * @param string $g_procName Path given to ftok() to derive the queue keys.
 */
function clear_IPC( $g_procName ){

	global $g_max_rcv;

	$msg_type = null;
	$discarded = null;

	// 'A' is the caller queue, 'B' is the reply queue.
	foreach( array( 'A', 'B' ) as $project_id ){
		$queue = msg_get_queue( ftok( $g_procName, $project_id ) );

		// Stops at the first failed receive, which normally means that the queue is empty.
		while( msg_receive( $queue, 0, $msg_type, $g_max_rcv, $discarded, false, MSG_IPC_NOWAIT ) !== FALSE ){
			print("Dumping msg: $msg_type\n");
		}

		msg_remove_queue( $queue );
	}

	print("Queues cleared\n");
}

/**
 * Sends a message string to the server process and blocks until the reply has been received.
 *
 * The largest permissible queue size is found at /proc/sys/kernel/msgmnb (16384 bytes on the
 * original author's system). Rather than increase this, messages are split into chunks by the
 * send/receive layer.
 *
 * The PID of this process is used as the message ID, both for the request and for the reply.
 *
 * @param string $str_data The message to send.
 * @return string The reply message.
 * @throws Exception if called from the main process, if a queue cannot be opened, or if
 *                   sending or receiving fails.
 */
function IPC( $str_data ){

	global $g_k_caller;
	global $g_k_reply;
	global $main_pid;

	$pid = getmypid();

	if($main_pid==$pid){
		/* This is a safeguard so that you do not spend years trying to find out why a receive is blocked.
		 * The main process sets its pid (main_pid) at startup. Although it is conceivable that main.php might want to make an rpc call, it
		 * will not be a call to itself, rather another process. But this would need to happen on another queue but this API is only set up
		 * to use the 2 hard-coded queues currently. It is likely that you are trying to call a DB-function through rpc when main.php should
		 * be calling the db-function directly.
		 *
		 * The check is made before the queues are opened so that a rejected call has no side effects.
		*/
		throw new Exception("IPC calls are not allowed within the same process ($pid)");
	}

	$q_caller = msg_get_queue($g_k_caller);
	$q_reply = msg_get_queue($g_k_reply);
	if( FALSE === $q_caller || FALSE === $q_reply ){
		throw new Exception("Failed to open the IPC queues");
	}

	IPC_send( $q_caller, $pid, $str_data );

	// Only messages addressed to this PID are collected from the reply queue.
	$msgArr = null;
	IPC_rcv( $q_reply, $pid, $msgArr );

	return $msgArr[$pid]['msg'];

}

/**
 * Listens on the caller queue, collects messages from all calling processes and stores them in the
 * given array, indexed by the ID of the message received.
 *
 * The ID is returned as soon as one message is complete. The array may still hold partially filled
 * messages from other IDs, so the caller should check the contents of $msgArr before deleting anything.
 * This method is designed to be called by an RPC serving process.
 *
 * Other processes may still be sending at that point; returning as soon as possible ensures that
 * no single caller can hog the channel.
 *
 * @param array|null $msgArr Storage for partially and fully received messages.
 * @param bool       $bBlock Wait for a message when true, return immediately when false.
 * @return int ID of the completed message, or whatever IPC_rcv() returns when nothing is ready.
 */
function IPC_listen( &$msgArr, $bBlock = true ){

	global $g_k_caller;

	// A required message ID of 0 accepts a message from any sender.
	return IPC_rcv( msg_get_queue( $g_k_caller ), 0, $msgArr, $bBlock );

}

/**
 * Sends the server's answer to an RPC request onto the reply queue.
 *
 * @param int    $caller_id ID of the caller the reply is addressed to.
 * @param string $str_data  The reply message.
 */
function IPC_reply( $caller_id, $str_data ){
	global $g_k_reply;
	IPC_send( msg_get_queue( $g_k_reply ), $caller_id, $str_data );
}

/**
 * Receives message parts from the queue until one message is complete.
 *
 * Every part starts with a 2-byte marker: 'f:' for the final part of a message, anything else for a
 * partial one. Parts are accumulated in $msgArr, indexed by message ID, as
 * array( 'final' => bool, 'msg' => string ). Partial messages of other IDs are kept in $msgArr so
 * that they can be completed by a later call.
 *
 * @param resource|SysvMessageQueue $queue      Queue to read from.
 * @param int                       $rqd_msg_id Zero to get any message, or >0 for a specific message ID.
 * @param array|null                $msgArr     Storage for received parts; created when not yet an array.
 * @param bool                      $bBlock     Wait for messages when true. When false, return 0 as soon
 *                                              as no further part is waiting.
 * @return int ID of the completed message, or 0 in non-blocking mode when no message could be completed.
 * @throws Exception if receiving fails for any other reason than an empty queue in non-blocking mode.
 */
function IPC_rcv( $queue, $rqd_msg_id, &$msgArr, $bBlock = true ){

	global $g_max_rcv;

	$message = '';
	$received_message_type = 0;
	$error_code = 0;
	$flags = $bBlock ? 0 : MSG_IPC_NOWAIT;

	while(true){

		if( FALSE === msg_receive( $queue, $rqd_msg_id, $received_message_type, $g_max_rcv, $message, false, $flags, $error_code ) ){

			if(!$bBlock && $error_code==MSG_ENOMSG){
				// Nothing (more) is waiting; partial messages stay in $msgArr for the next call.
				return 0;
			}
			//I'm guessing at these error codes...
			//code 7 when g_max_rcv is too small
			//code 43 when pipe was broken by server
			throw new Exception("Failed to rcv message err = $error_code");
		}

		// Split the 2-byte marker from the payload.
		$d_meta = substr( $message, 0, 2 );
		$message = (string)substr( $message, 2 );

		// Create the storage only once. Resetting it for every new message ID would throw away
		// the partial messages of all other senders.
		if( !is_array( $msgArr ) ){
			$msgArr = array();
		}

		// Append only to a message that is still incomplete. An entry that is already final is a
		// previous message that was never removed, so a new message replaces it.
		$is_continuation = array_key_exists( $received_message_type, $msgArr )
			&& !$msgArr[ $received_message_type ]['final'];

		if( $is_continuation ){
			$msgArr[ $received_message_type ]['msg'] .= $message;
		}else{
			$msgArr[ $received_message_type ] = array( 'final' => false, 'msg' => $message );
		}

		if($d_meta=='f:'){
			// final message part
			$msgArr[ $received_message_type ]['final'] = true;
			break;
		}

	}

	return $received_message_type;

}


/**
 * Sends a data string of any length.
 *
 * The data is split into chunks of at most $g_max_data bytes. Every chunk is sent as one message,
 * prefixed with a 2-byte marker: 'p:' for a partial chunk with more to follow, 'f:' for the final
 * chunk. An empty string is sent as a single 'f:' message.
 *
 * @param resource|SysvMessageQueue $queue     Queue to send on.
 * @param int                       $caller_id Message ID under which all chunks are sent.
 * @param string                    $str_data  The data to send.
 * @throws Exception if the chunk size is not positive or if a chunk cannot be sent.
 */
function IPC_send( $queue, $caller_id, $str_data ){

	//print_r( msg_stat_queue($queue) );

	global $g_max_data;

	// A chunk size below 1 can never make progress.
	$chunk_size = (int)$g_max_data;
	if( $chunk_size < 1 ){
		throw new Exception("Invalid IPC chunk size ($chunk_size)");
	}

	$str_data = (string)$str_data;
	$total = strlen( $str_data );
	$offset = 0;

	do{
		// The chunk is final when everything that is left fits into it. Data whose length is an
		// exact multiple of the chunk size therefore needs no extra empty final message.
		$is_final = ( $total - $offset <= $chunk_size );
		$d_meta = $is_final ? "f:" : "p:";
		$str_chunk = substr( $str_data, $offset, $chunk_size );
		$offset += $chunk_size;

		$error_code = 0;
		if( FALSE === msg_send( $queue, $caller_id, $d_meta.$str_chunk, false, true, $error_code ) ){
			//code 22 when the message is too large
			throw new Exception("Failed to send message err = $error_code");
		}
	}while( !$is_final );

}

?>