<?php
/**
* This file provides a mechanism to allow inter-process communication (IPC) by way of an RPC.
*
* It uses the System V architecture which was meant to have been superceeded by POSIX. However,
* I found some issues with POSIX around non-blocking behaviour and timeouts. This may have been
* the PHP inplementation and I have intentions to revisit this later. For the time being, SysV
* is works exactly as you would expect.
*
* Given the issues around the message length, I will not even try to use the serialisation. There
* is no way to know what size the resultant serialised message will be. There are errors occurring
* when the message length is not even halve of that specified by the value at /proc/sys/kernel/msgmnb
*
*/
$g_procName = 'BU-commander';
//posix_mkfifo( $g_procName, 0644 );
$g_k_caller = null;
$g_k_reply = null;
//max msg length - this fails on Mint-21. Trial and error gives a max size of 8182.
//$g_max_chan = file_get_contents('/proc/sys/kernel/msgmnb');
$g_max_chan = 8182;
// If the receive bytes is not larger than what was sent, the rcv will fail with code 7. Making it 10 bytes
// larger was not enough but 100 was emough. I mean, go figure ???
$g_max_rcv = $g_max_chan+100;
$g_max_data = $g_max_chan - 2;
function rpc_setGlobals( $name, $value ){
switch( $name ){
case 'procName':
global $g_procName;
global $g_k_caller;
global $g_k_reply;
$g_procName = $value;
$g_k_caller = ftok( $g_procName, 'A'); //caller
$g_k_reply = ftok( $g_procName, 'B'); //reply
break;
case 'max_chan':
global $g_max_chan;
global $g_max_rcv;
global $g_max_data;
$g_max_chan = $value;
$g_max_rcv = $g_max_chan+100;
$g_max_data = $g_max_chan - 2; //we use the first 2 bytes for msg chunks
break;
}
}
//************************************************************************************************************************
// The following functions all relate to making an RPC.
// They use the php serialize() to send objects across the process boundary
function rpc( $funcName, ...$params ){
array_unshift( $params, $funcName);
return unserialize( IPC( serialize( $params ) ) );
}
function rpc_listen( &$caller_id, &$msgArr, $bBlock = true ){
$caller_id = IPC_listen( $msgArr, $bBlock );
if($caller_id==0 && !$bBlock) return null;
return unserialize( $msgArr[$caller_id]['msg'] );
}
function rpc_reply( $caller_id, $obj ){
IPC_reply( $caller_id, serialize($obj) );
}
//************************************************************************************************************************
//
// All functions fast this point relate to sending and recieving of a single message as a string without the concept
// of a function or parameters.
//
function clear_IPC( $g_procName ){
$q_caller = msg_get_queue( ftok( $g_procName, 'A') );
msg_remove_queue($q_caller);
$q_reply = msg_get_queue( ftok( $g_procName, 'B') );
msg_remove_queue($q_reply);
}
/**
* Turns out that the largest premissible queue size is found at /proc/sys/kernel/msgmnb
* On my system this is 16384 bytes.
*
* Rather than increase this, it will be easier to split the messages into chunks.
*
*/
function IPC( $str_data ){
global $procName;
global $g_k_caller;
global $g_k_reply;
global $g_max_data;
global $g_max_rcv;
$q_caller = msg_get_queue($g_k_caller);
$q_reply = msg_get_queue($g_k_reply);
$pid = getmypid();
IPC_send( $q_caller, $pid, $str_data );
$msgArr = null;
IPC_rcv( $q_reply, $pid, $msgArr );
return $msgArr[$pid]['msg'];
}
/**
* Listens to the caller queue and collects messages from all calling processes and populates the
* given array indexed by the ID of the message rcvd.
*
* As soon as a message is complete, the procid will be returned. The array may also contain partially filled
* message from other IDs so the caller should check the contents of msgArr before deleting. This method is
* designed to be called by an RPC serving process.
*
* Note that there may still be other procids still sending but returning as soon as possible ensures that
* no one caller can hog the channel.
*/
function IPC_listen( &$msgArr, $bBlock = true ){
global $g_k_caller;
$q_caller = msg_get_queue($g_k_caller);
return IPC_rcv( $q_caller, 0, $msgArr, $bBlock );
}
/**
* Server replies to an RPC request.
*/
function IPC_reply( $caller_id, $str_data ){
global $g_k_reply;
$q_reply = msg_get_queue($g_k_reply);
IPC_send( $q_reply, $caller_id, $str_data );
}
/**
* Receives a complete message from the queue. $rqd_msg_id may be zero to get
* any message or >0 for a specific message ID.
*/
function IPC_rcv( $queue, $rqd_msg_id, &$msgArr, $bBlock = true ){
$message = '';
$received_message_type = 0;
while(true){
global $g_max_rcv;
$flags = 0;
if(!$bBlock){
$flags = MSG_IPC_NOWAIT;
}
if( FALSE === msg_receive( $queue, $rqd_msg_id, $received_message_type, $g_max_rcv, $message, true, $flags, $error_code ) ){
if(!$bBlock && $error_code==MSG_ENOMSG){
return 0;
}
//code 7 when g_max_rcv is too small
throw new Exception("Failed to rcv message err = $error_code");
}
//echo "Got msg part " . substr( $message, 0, 20 ) . "\n\n";
$d_meta = substr( $message, 0, 2 );
$message = substr( $message, 2 );
if( $msgArr!=null && array_key_exists( $received_message_type, $msgArr) ){
$msgArr[ $received_message_type ]['msg'] .= $message;
}else{
$msgArr=array();
$msgArr[ $received_message_type ] = array( 'final' => false, 'msg' => $message );
}
if($d_meta=='f:'){
// final message part
$msgArr[ $received_message_type ]['final'] = true;
break;
}
}
return $received_message_type;
}
/**
* Send a data string of any length.
*
* */
function IPC_send( $queue, $caller_id, $str_data ){
//print_r( msg_stat_queue($queue) );
global $procName;
global $g_k_caller;
global $g_k_reply;
global $g_max_data;
$d_meta = null;
$chunk_cnt = 0;
if( strlen($str_data) > $g_max_data ){
//split the data into chunks
$chunk_cnt = intdiv( strlen($str_data), $g_max_data );
$d_meta = "p:"; //partial data - many calls
}
$str_chunk = null;
while( $chunk_cnt>=0 ){
if($chunk_cnt>0){
$str_chunk = substr( $str_data, 0, $g_max_data );
$str_data = substr( $str_data, $g_max_data );
}else{
$d_meta = "f:"; //'final' part of data
$str_chunk = $str_data;
}
//echo "send($caller_id) len = ".strlen($d_meta.$str_chunk)."\n";
$error_code = 0;
if( FALSE === msg_send( $queue, $caller_id, $d_meta.$str_chunk, true, true, $error_code ) ){
//code 22 when the message is too large
throw new Exception("Failed to send message err = $error_code");
}
$chunk_cnt--;
if($chunk_cnt==0){
$d_meta = "f:"; //final data chunk
}
}
}
?>