Newer
Older
backup-commander / html / lib / sysVcom.php
  1. <?php
  2.  
  3. /**
  4. * This file provides a mechanism to allow inter-process communication (IPC) by way of an RPC.
  5. *
  6. * It uses the System V architecture which was meant to have been superceeded by POSIX. However,
  7. * I found some issues with POSIX around non-blocking behaviour and timeouts. This may have been
  8. * the PHP inplementation and I have intentions to revisit this later. For the time being, SysV
  9. * is works exactly as you would expect.
  10. *
  11. * Given the issues around the message length, I will not even try to use the serialisation. There
  12. * is no way to know what size the resultant serialised message will be. There are errors occurring
  13. * when the message length is not even halve of that specified by the value at /proc/sys/kernel/msgmnb
  14. *
  15. */
  16. $g_procName = 'BU-commander';
  17. //posix_mkfifo( $g_procName, 0644 );
  18. $g_k_caller = null;
  19. $g_k_reply = null;
  20.  
  21. //max msg length - this fails on Mint-21. Trial and error gives a max size of 8182.
  22. //$g_max_chan = file_get_contents('/proc/sys/kernel/msgmnb');
  23. $g_max_chan = 8182;
  24.  
  25. // If the receive bytes is not larger than what was sent, the rcv will fail with code 7. Making it 10 bytes
  26. // larger was not enough but 100 was emough. I mean, go figure ???
  27. $g_max_rcv = $g_max_chan+100;
  28.  
  29. $g_max_data = $g_max_chan - 2;
  30. $main_pid = 0;
  31.  
  32.  
  33. function rpc_setGlobals( $name, $value ){
  34.  
  35. switch( $name ){
  36. case 'procName':
  37. global $g_procName;
  38. global $g_k_caller;
  39. global $g_k_reply;
  40. $g_procName = $value;
  41. $g_k_caller = ftok( $g_procName, 'A'); //caller
  42. $g_k_reply = ftok( $g_procName, 'B'); //reply
  43. debug_print( "g_k_caller key = $g_k_caller\n" );
  44. debug_print( "g_k_reply key = $g_k_reply\n" );
  45. break;
  46. case 'max_chan':
  47. global $g_max_chan;
  48. global $g_max_rcv;
  49. global $g_max_data;
  50. $g_max_chan = $value;
  51. $g_max_rcv = $g_max_chan+100;
  52. $g_max_data = $g_max_chan - 2; //we use the first 2 bytes for msg chunks
  53. break;
  54. case 'main_pid':
  55. global $main_pid;
  56. $main_pid = (int)$value;
  57. break;
  58. }
  59.  
  60. }
  61.  
  62. //************************************************************************************************************************
  63. // The following functions all relate to making an RPC.
  64. // They use the php serialize() to send objects across the process boundary
  65.  
  66. /**
  67. * Sends a call to the remote process. The call blocks until a reply is recieved. There is no unblocking version of this method.
  68. *
  69. * The underlying layer uses the process ID of this process and so it is not possible to send more than one message at a time
  70. * because it would not be possible to correlate the replies. This is not a really problem because of the lack of multi-threading
  71. * support in PHP.
  72. */
  73. function rpc( $funcName, ...$params ){
  74. array_unshift( $params, $funcName);
  75. return unserialize( IPC( serialize( $params ) ) );
  76. }
  77.  
  78. function rpc_with_array( $params ){
  79. return unserialize( IPC( serialize( $params ) ) );
  80. }
  81.  
  82. /**
  83. * Returns the next message waiting on the queue if there is one. The value of $caller_id must be checked
  84. * to ascertain the validity of the return value.
  85. *
  86. * A greater of zero means that either no message was received in
  87. * the case of non-blocking or that a message has only partially been retrieved and is not yet ready. A positive value
  88. * means that the message is ready and unserialize.
  89. *
  90. * $msgArr is of little use here because it contains the unserialized msg. It must be supplied though as it is
  91. * used by the underlying layer as a storage of partially retrieved messages.
  92. *
  93. */
  94. function rpc_listen( &$caller_id, &$msgArr, $bBlock = true ){
  95. //debug_print( "-- rpc_listen in\n" );
  96. $caller_id = IPC_listen( $msgArr, $bBlock );
  97. //debug_print( "-- rpc_listen out\n" );
  98. if($caller_id==0 && !$bBlock) return null;
  99. $obj = unserialize( $msgArr[$caller_id]['msg'] );
  100. //done with the global message
  101. unset( $msgArr[$caller_id] );
  102. return $obj;
  103. }
  104.  
  105. /**
  106. * A server process listening for messages must use this method to reply. The sender will be waiting!
  107. */
  108. function rpc_reply( $caller_id, $obj ){
  109. IPC_reply( $caller_id, serialize($obj) );
  110. }
  111.  
  112.  
  113. //************************************************************************************************************************
  114. //
  115. // All functions below relate to sending and recieving of a single message string without the concept
  116. // of a function or parameters.
  117. //
  118.  
  119. function clear_IPC( $g_procName ){
  120. global $g_max_rcv;
  121. $received_message_type=null;
  122. $message=null;
  123. $q_caller = msg_get_queue( ftok( $g_procName, 'A') );
  124. while( FALSE !== msg_receive( $q_caller, 0, $received_message_type, $g_max_rcv, $message, false, MSG_IPC_NOWAIT ) ){
  125. print("Dumping msg: $received_message_type\n");
  126. }
  127. msg_remove_queue($q_caller);
  128. $q_reply = msg_get_queue( ftok( $g_procName, 'B') );
  129. while( FALSE !== msg_receive( $q_reply, 0, $received_message_type, $g_max_rcv, $message, false, MSG_IPC_NOWAIT ) ){
  130. print("Dumping msg: $received_message_type\n");
  131. }
  132. msg_remove_queue($q_reply);
  133. print("Queues cleared\n");
  134. }
  135.  
  136. /**
  137. * Turns out that the largest premissible queue size is found at /proc/sys/kernel/msgmnb
  138. * On my system this is 16384 bytes.
  139. *
  140. * Rather than increase this, it will be easier to split the messages into chunks.
  141. *
  142. */
  143. function IPC( $str_data ){
  144. global $procName;
  145. global $g_k_caller;
  146. global $g_k_reply;
  147. global $g_max_data;
  148. global $g_max_rcv;
  149. global $main_pid;
  150. $q_caller = msg_get_queue($g_k_caller);
  151. $q_reply = msg_get_queue($g_k_reply);
  152. $pid = getmypid();
  153.  
  154. if($main_pid==$pid){
  155. /* This is a safeguard so that you do not spend years trying to find out why a receive is blocked.
  156. * The main process sets its pid (main_pid) at startup. Although it is conceivable that main.php might want to make an rpc call, it
  157. * will not be a call to itself, rather another process. But this would need to happen on another queue but this API is only set up
  158. * to use the 2 hard-coded queues currently. It is likely that you are trying to call a DB-function through rpc when main.php should
  159. * be calling the db-function directly.
  160. */
  161. throw new Exception("IPC calls are not allowed within the same process ($pid)");
  162. }
  163. IPC_send( $q_caller, $pid, $str_data );
  164. $msgArr = null;
  165. IPC_rcv( $q_reply, $pid, $msgArr );
  166. return $msgArr[$pid]['msg'];
  167.  
  168. }
  169.  
  170. /**
  171. * Listens to the caller queue and collects messages from all calling processes and populates the
  172. * given array indexed by the ID of the message rcvd.
  173. *
  174. * As soon as a message is complete, the procid will be returned. The array may also contain partially filled
  175. * message from other IDs so the caller should check the contents of msgArr before deleting. This method is
  176. * designed to be called by an RPC serving process.
  177. *
  178. * Note that there may still be other procids still sending but returning as soon as possible ensures that
  179. * no one caller can hog the channel.
  180. */
  181. function IPC_listen( &$msgArr, $bBlock = true ){
  182. global $g_k_caller;
  183.  
  184. $q_caller = msg_get_queue($g_k_caller);
  185. return IPC_rcv( $q_caller, 0, $msgArr, $bBlock );
  186.  
  187. }
  188.  
  189. /**
  190. * Server replies to an RPC request.
  191. */
  192. function IPC_reply( $caller_id, $str_data ){
  193. global $g_k_reply;
  194. $q_reply = msg_get_queue($g_k_reply);
  195. IPC_send( $q_reply, $caller_id, $str_data );
  196. }
  197.  
  198. /**
  199. * Receives a complete message from the queue. $rqd_msg_id may be zero to get
  200. * any message or >0 for a specific message ID.
  201. */
  202. function IPC_rcv( $queue, $rqd_msg_id, &$msgArr, $bBlock = true ){
  203.  
  204. $message = '';
  205. $received_message_type = 0;
  206. while(true){
  207. global $g_max_rcv;
  208.  
  209. $flags = 0;
  210. if(!$bBlock){
  211. $flags = MSG_IPC_NOWAIT;
  212. }
  213. //debug_print( "IPC_rcv in for id = $rqd_msg_id\n" );
  214. if( FALSE === msg_receive( $queue, $rqd_msg_id, $received_message_type, $g_max_rcv, $message, false, $flags, $error_code ) ){
  215. if(!$bBlock && $error_code==MSG_ENOMSG){
  216. //debug_print( "IPC_rcv out 1\n" );
  217. return 0;
  218. }
  219. //debug_print( "IPC_rcv out 2\n" );
  220. //I'm guessing at these error codes...
  221. //code 7 when g_max_rcv is too small
  222. //code 43 when pipe was broken by server
  223. throw new Exception("Failed to rcv message err = $error_code");
  224. }
  225. //debug_print( "IPC_rcv out 3\n" );
  226. //echo "Got msg part " . substr( $message, 0, 20 ) . "\n\n";
  227. $d_meta = substr( $message, 0, 2 );
  228. $message = substr( $message, 2 );
  229. if( $msgArr!=null && array_key_exists( $received_message_type, $msgArr) ){
  230. $msgArr[ $received_message_type ]['msg'] .= $message;
  231. }else{
  232. $msgArr=array();
  233. $msgArr[ $received_message_type ] = array( 'final' => false, 'msg' => $message );
  234. }
  235.  
  236. if($d_meta=='f:'){
  237. // final message part
  238. $msgArr[ $received_message_type ]['final'] = true;
  239. break;
  240. }
  241. }
  242.  
  243. return $received_message_type;
  244.  
  245. }
  246.  
  247.  
  248. /**
  249. * Send a data string of any length.
  250. *
  251. * */
  252. function IPC_send( $queue, $caller_id, $str_data ){
  253. //print_r( msg_stat_queue($queue) );
  254. global $procName;
  255. global $g_k_caller;
  256. global $g_k_reply;
  257. global $g_max_data;
  258.  
  259. $d_meta = null;
  260. $chunk_cnt = 0;
  261.  
  262. if( strlen($str_data) > $g_max_data ){
  263. //split the data into chunks
  264. $chunk_cnt = intdiv( strlen($str_data), $g_max_data );
  265. $d_meta = "p:"; //partial data - many calls
  266. }
  267.  
  268. $str_chunk = null;
  269. while( $chunk_cnt>=0 ){
  270.  
  271. if($chunk_cnt>0){
  272. $str_chunk = substr( $str_data, 0, $g_max_data );
  273. $str_data = substr( $str_data, $g_max_data );
  274. }else{
  275. $d_meta = "f:"; //'final' part of data
  276. $str_chunk = $str_data;
  277. }
  278.  
  279. //echo "send($caller_id) len = ".strlen($d_meta.$str_chunk)."\n";
  280. //debug_print( "IPC_send 2\n" );
  281. $error_code = 0;
  282. if( FALSE === msg_send( $queue, $caller_id, $d_meta.$str_chunk, false, true, $error_code ) ){
  283. //code 22 when the message is too large
  284. throw new Exception("Failed to send message err = $error_code");
  285. }
  286. $chunk_cnt--;
  287. if($chunk_cnt==0){
  288. $d_meta = "f:"; //final data chunk
  289. }
  290. }
  291. }
  292.  
  293. ?>