<?php /** * This file provides a mechanism to allow inter-process communication (IPC) by way of an RPC. * * It uses the System V architecture which was meant to have been superceeded by POSIX. However, * I found some issues with POSIX around non-blocking behaviour and timeouts. This may have been * the PHP inplementation and I have intentions to revisit this later. For the time being, SysV * is works exactly as you would expect. * * Given the issues around the message length, I will not even try to use the serialisation. There * is no way to know what size the resultant serialised message will be. There are errors occurring * when the message length is not even halve of that specified by the value at /proc/sys/kernel/msgmnb * */ $g_procName = 'BU-commander'; //posix_mkfifo( $g_procName, 0644 ); $g_k_caller = null; $g_k_reply = null; //max msg length - this fails on Mint-21. Trial and error gives a max size of 8182. //$g_max_chan = file_get_contents('/proc/sys/kernel/msgmnb'); $g_max_chan = 8182; // If the receive bytes is not larger than what was sent, the rcv will fail with code 7. Making it 10 bytes // larger was not enough but 100 was emough. I mean, go figure ??? $g_max_rcv = $g_max_chan+100; $g_max_data = $g_max_chan - 2; $main_pid = 0; function rpc_setGlobals( $name, $value ){ switch( $name ){ case 'procName': global $g_procName; global $g_k_caller; global $g_k_reply; $g_procName = $value; $g_k_caller = ftok( $g_procName, 'A'); //caller $g_k_reply = ftok( $g_procName, 'B'); //reply debug_print( "g_k_caller key = $g_k_caller\n" ); debug_print( "g_k_reply key = $g_k_reply\n" ); break; case 'max_chan': global $g_max_chan; global $g_max_rcv; global $g_max_data; $g_max_chan = $value; $g_max_rcv = $g_max_chan+100; $g_max_data = $g_max_chan - 2; //we use the first 2 bytes for msg chunks break; case 'main_pid': global $main_pid; $main_pid = (int)$value; break; } } //************************************************************************************************************************ // The following functions all relate to making an RPC. // They use the php serialize() to send objects across the process boundary /** * Sends a call to the remote process. The call blocks until a reply is recieved. There is no unblocking version of this method. * * The underlying layer uses the process ID of this process and so it is not possible to send more than one message at a time * because it would not be possible to correlate the replies. This is not a really problem because of the lack of multi-threading * support in PHP. */ function rpc( $funcName, ...$params ){ array_unshift( $params, $funcName); return unserialize( IPC( serialize( $params ) ) ); } /** * Returns the next message waiting on the queue if there is one. The value of $caller_id must be checked * to ascertain the validity of the return value. * * A greater of zero means that either no message was received in * the case of non-blocking or that a message has only partially been retrieved and is not yet ready. A positive value * means that the message is ready and unserialize. * * $msgArr is of little use here because it contains the unserialized msg. It must be supplied though as it is * used by the underlying layer as a storage of partially retrieved messages. * */ function rpc_listen( &$caller_id, &$msgArr, $bBlock = true ){ //debug_print( "-- rpc_listen in\n" ); $caller_id = IPC_listen( $msgArr, $bBlock ); //debug_print( "-- rpc_listen out\n" ); if($caller_id==0 && !$bBlock) return null; $obj = unserialize( $msgArr[$caller_id]['msg'] ); //done with the global message unset( $msgArr[$caller_id] ); return $obj; } /** * A server process listening for messages must use this method to reply. The sender will be waiting! */ function rpc_reply( $caller_id, $obj ){ IPC_reply( $caller_id, serialize($obj) ); } //************************************************************************************************************************ // // All functions below relate to sending and recieving of a single message string without the concept // of a function or parameters. // function clear_IPC( $g_procName ){ global $g_max_rcv; $received_message_type=null; $message=null; $q_caller = msg_get_queue( ftok( $g_procName, 'A') ); while( FALSE !== msg_receive( $q_caller, 0, $received_message_type, $g_max_rcv, $message, false, MSG_IPC_NOWAIT ) ){ print("Dumping msg: $received_message_type\n"); } msg_remove_queue($q_caller); $q_reply = msg_get_queue( ftok( $g_procName, 'B') ); while( FALSE !== msg_receive( $q_reply, 0, $received_message_type, $g_max_rcv, $message, false, MSG_IPC_NOWAIT ) ){ print("Dumping msg: $received_message_type\n"); } msg_remove_queue($q_reply); print("Queues cleared\n"); } /** * Turns out that the largest premissible queue size is found at /proc/sys/kernel/msgmnb * On my system this is 16384 bytes. * * Rather than increase this, it will be easier to split the messages into chunks. * */ function IPC( $str_data ){ global $procName; global $g_k_caller; global $g_k_reply; global $g_max_data; global $g_max_rcv; global $main_pid; $q_caller = msg_get_queue($g_k_caller); $q_reply = msg_get_queue($g_k_reply); $pid = getmypid(); if($main_pid==$pid){ /* This is a safeguard so that you do not spend years trying to find out why a receive is blocked. * The main process sets its pid (main_pid) at startup. Although it is conceivable that main.php might want to make an rpc call, it * will not be a call to itself, rather another process. But this would need to happen on another queue but this API is only set up * to use the 2 hard-coded queues currently. It is likely that you are trying to call a DB-function through rpc when main.php should * be calling the db-function directly. */ throw new Exception("IPC calls are not allowed within the same process ($pid)"); } IPC_send( $q_caller, $pid, $str_data ); $msgArr = null; IPC_rcv( $q_reply, $pid, $msgArr ); return $msgArr[$pid]['msg']; } /** * Listens to the caller queue and collects messages from all calling processes and populates the * given array indexed by the ID of the message rcvd. * * As soon as a message is complete, the procid will be returned. The array may also contain partially filled * message from other IDs so the caller should check the contents of msgArr before deleting. This method is * designed to be called by an RPC serving process. * * Note that there may still be other procids still sending but returning as soon as possible ensures that * no one caller can hog the channel. */ function IPC_listen( &$msgArr, $bBlock = true ){ global $g_k_caller; $q_caller = msg_get_queue($g_k_caller); return IPC_rcv( $q_caller, 0, $msgArr, $bBlock ); } /** * Server replies to an RPC request. */ function IPC_reply( $caller_id, $str_data ){ global $g_k_reply; $q_reply = msg_get_queue($g_k_reply); IPC_send( $q_reply, $caller_id, $str_data ); } /** * Receives a complete message from the queue. $rqd_msg_id may be zero to get * any message or >0 for a specific message ID. */ function IPC_rcv( $queue, $rqd_msg_id, &$msgArr, $bBlock = true ){ $message = ''; $received_message_type = 0; while(true){ global $g_max_rcv; $flags = 0; if(!$bBlock){ $flags = MSG_IPC_NOWAIT; } //debug_print( "IPC_rcv in for id = $rqd_msg_id\n" ); if( FALSE === msg_receive( $queue, $rqd_msg_id, $received_message_type, $g_max_rcv, $message, false, $flags, $error_code ) ){ if(!$bBlock && $error_code==MSG_ENOMSG){ //debug_print( "IPC_rcv out 1\n" ); return 0; } //debug_print( "IPC_rcv out 2\n" ); //I'm guessing at these error codes... //code 7 when g_max_rcv is too small //code 43 when pipe was broken by server throw new Exception("Failed to rcv message err = $error_code"); } //debug_print( "IPC_rcv out 3\n" ); //echo "Got msg part " . substr( $message, 0, 20 ) . "\n\n"; $d_meta = substr( $message, 0, 2 ); $message = substr( $message, 2 ); if( $msgArr!=null && array_key_exists( $received_message_type, $msgArr) ){ $msgArr[ $received_message_type ]['msg'] .= $message; }else{ $msgArr=array(); $msgArr[ $received_message_type ] = array( 'final' => false, 'msg' => $message ); } if($d_meta=='f:'){ // final message part $msgArr[ $received_message_type ]['final'] = true; break; } } return $received_message_type; } /** * Send a data string of any length. * * */ function IPC_send( $queue, $caller_id, $str_data ){ //print_r( msg_stat_queue($queue) ); global $procName; global $g_k_caller; global $g_k_reply; global $g_max_data; $d_meta = null; $chunk_cnt = 0; if( strlen($str_data) > $g_max_data ){ //split the data into chunks $chunk_cnt = intdiv( strlen($str_data), $g_max_data ); $d_meta = "p:"; //partial data - many calls } $str_chunk = null; while( $chunk_cnt>=0 ){ if($chunk_cnt>0){ $str_chunk = substr( $str_data, 0, $g_max_data ); $str_data = substr( $str_data, $g_max_data ); }else{ $d_meta = "f:"; //'final' part of data $str_chunk = $str_data; } //echo "send($caller_id) len = ".strlen($d_meta.$str_chunk)."\n"; //debug_print( "IPC_send 2\n" ); $error_code = 0; if( FALSE === msg_send( $queue, $caller_id, $d_meta.$str_chunk, false, true, $error_code ) ){ //code 22 when the message is too large throw new Exception("Failed to send message err = $error_code"); } $chunk_cnt--; if($chunk_cnt==0){ $d_meta = "f:"; //final data chunk } } } ?>