Connecting to ActiveMQ from PHP
via: "Writing a consumer script that calls the PHP Stomp library" | Akimoto@Cybozu Labs Programmer Blog
This is a topic that Akimoto-san of Cybozu Labs presented at a PHP study group quite a while ago (October 2007). Since the stomp extension has now been released, and since I may end up introducing a queue at work some day, I started looking into it little by little. The notes have piled up, so here is a summary.
What follows is a bit long.
What is message queuing?
The explanation on BPnet is probably the easiest to follow:
Message queuing links systems by exchanging messages that are placed in a queue. Like file transfer it is an asynchronous mechanism, but it is more immediate than file transfer. Because the application generates messages internally and puts them in the queue, it can coordinate with the server at the moment the user presses the Return key. That kind of coordination is difficult with file transfer. The sender moves on to its next task as soon as it has put a message in the queue, and the receiver takes messages out of the queue and processes them. Because sender and receiver work asynchronously, neither is directly affected if the other system goes down.
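The decoupling described in the quotation can be sketched in a few lines. This is only an in-process illustration built on PHP's SplQueue; the publish and consume helpers are hypothetical names, and a real broker such as ActiveMQ would sit in a separate process and hold the messages between the two sides.

```php
<?php
// In-process stand-in for a message queue (an assumption for illustration;
// a real broker runs separately and outlives both sender and receiver).
$queue = new SplQueue();

// Sender: put the message in the queue and return immediately.
function publish(SplQueue $queue, string $message): void
{
    $queue->enqueue($message);
}

// Receiver: take the oldest message, or null when nothing is waiting.
function consume(SplQueue $queue): ?string
{
    return $queue->isEmpty() ? null : $queue->dequeue();
}

publish($queue, 'order-1');
publish($queue, 'order-2');

echo consume($queue), PHP_EOL;
echo consume($queue), PHP_EOL;
var_dump(consume($queue));
```

The sender never waits for the receiver, which is the property the quotation emphasises.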
What is Stomp?
Stomp stands for Streaming Text Orientated Messaging Protocol, a protocol specialised for putting messages into and taking them out of queues. The Stomp specification is available at http://stomp.codehaus.org/Protocol (STOMP Protocol Specification, Version 1.2). It is a very compact protocol, but it also supports transactions.
A message sent or received consists of
- a command line
- headers (key and value)
- a blank line
- a body
- a NULL character
and is called a frame. Apart from the trailing NULL character, it looks a lot like HTTP.
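The frame layout listed above can be written out as a small helper. This is a minimal sketch rather than code from either library: buildFrame is a hypothetical function name, and it assumes headers are written as key:value lines separated by newlines.

```php
<?php
// Build one Stomp frame: command line, header lines, blank line, body, NULL.
// buildFrame() is a hypothetical helper, not part of any Stomp library.
function buildFrame(string $command, array $headers = [], string $body = ''): string
{
    $frame = $command . "\n";
    foreach ($headers as $key => $value) {
        $frame .= $key . ':' . $value . "\n";
    }
    // The blank line ends the headers; the NULL character ends the frame.
    return $frame . "\n" . $body . "\x00";
}

$frame = buildFrame('SEND', ['destination' => '/queue/stomp_sample'], 'hello');

// Print the frame with control characters escaped so the structure is visible.
echo addcslashes($frame, "\0..\37"), PHP_EOL;
```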
What is ActiveMQ?
As the "Apache ActiveMQ - Wikipedia" article also explains, ActiveMQ is open-source messaging middleware released by the ASF (Apache Software Foundation), and it supports Stomp. Besides Java, client libraries are available for Perl, Python, Ruby, PHP, .NET, C/C++, and others.
Stomp libraries for PHP
Broadly speaking, there are two:
- the PHP Stomp library mentioned in Akimoto-san's entry (it appears to have since moved from Codehaus to FUSEForge)
- PECL::stomp
This time I connected to ActiveMQ using both of them. Note that the PHP Stomp library requires the Socket extension, and connecting over SSL additionally requires the openssl extension.
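Whether those extensions are available can be checked from PHP itself. The sketch below assumes the extensions are registered under the names "sockets" and "openssl"; missingExtensions is a hypothetical helper.

```php
<?php
// Return the names from $required that are not loaded in this PHP runtime.
// missingExtensions() is a hypothetical helper for illustration.
function missingExtensions(array $required): array
{
    return array_values(array_filter(
        $required,
        static fn(string $name): bool => !extension_loaded($name)
    ));
}

// Assumed extension names: "sockets" for the PHP library, "openssl" for SSL.
$missing = missingExtensions(['sockets', 'openssl']);
echo $missing ? 'missing: ' . implode(', ', $missing) : 'all present', PHP_EOL;
```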
Test environment
Installing the PHP Stomp library
Just download it from FUSEForge and extract it.
$ wget http://stomp.fusesource.org/release/php/1.0/stomp-php-1.0.0.tar.gz
$ tar zxf stomp-php-1.0.0.tar.gz
$
Installing and configuring the Stomp extension
This time I installed version 0.3.1. SSL connections have been supported since Stomp-0.2.0.
$ sudo pecl install stomp-beta
$ php -i -dextension=stomp.so | grep -i stomp
Stomp
Stomp => enabled
stomp.default_broker => tcp://localhost:61613 => tcp://localhost:61613
stomp.default_connection_timeout_sec => 2 => 2
stomp.default_connection_timeout_usec => 0 => 0
stomp.default_read_timeout_sec => 2 => 2
stomp.default_read_timeout_usec => 0 => 0
$
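The same settings can be read from a script. This is a sketch under the assumption that the extension registers itself as "stomp" and exposes the ini names shown in the php -i output above; stompDefaults is a hypothetical helper that returns null when the extension is not loaded.

```php
<?php
// Read the stomp.* ini settings listed by `php -i`.
// Returns null when the extension is not loaded (assumed name: "stomp").
function stompDefaults(): ?array
{
    if (!extension_loaded('stomp')) {
        return null;
    }
    $names = [
        'stomp.default_broker',
        'stomp.default_connection_timeout_sec',
        'stomp.default_connection_timeout_usec',
        'stomp.default_read_timeout_sec',
        'stomp.default_read_timeout_usec',
    ];
    $values = [];
    foreach ($names as $name) {
        $values[$name] = ini_get($name);
    }
    return $values;
}

var_export(stompDefaults());
echo PHP_EOL;
```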
Installing, configuring, and starting/stopping ActiveMQ
Just download it from the ActiveMQ download page and extract it.
$ wget http://ftp.riken.jp/net/apache/activemq/apache-activemq/5.3.0/apache-activemq-5.3.0-bin.tar.gz
$ tar zxf apache-activemq-5.3.0-bin.tar.gz
$ cd apache-activemq-5.3.0/bin/
$
Configuration files go in the apache-activemq-5.3.0/conf directory. The default configuration file is activemq.xml, but it is not set up to accept Stomp. For this test I prepared the following simple configuration file as simple.xml.
<?xml version="1.0" encoding="UTF-8"?>
<beans>
  <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
  <broker xmlns="http://activemq.apache.org/schema/core" useJmx="false">
    <transportConnectors>
      <transportConnector name="stomp" uri="stomp://localhost:61613"/>
    </transportConnectors>
  </broker>
</beans>
Once the configuration file is in place, start the broker. If the JAVA_HOME environment variable is not set, set it first. A log is printed on startup, so check that the stomp protocol is listening on port 61613.
$ export JAVA_HOME=/usr/local/jdk
$ ./activemq xbean:simple.xml
Java Runtime: Sun Microsystems Inc. 1.6.0_16 /usr/local/jdk1.6.0_16/jre
:
INFO | Listening for connections at: stomp://localhost.localdomain:61613
INFO | Connector stomp Started
INFO | ActiveMQ JMS Message Broker (localhost, ID:centos-33413-1257844882047-0:0) started
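Besides reading the log, the port can be probed from PHP. The sketch below only tests whether a TCP connection can be opened; it does not speak Stomp. isListening is a hypothetical helper, and the host and port are the ones used in simple.xml.

```php
<?php
// True when a TCP connection to $host:$port succeeds within $timeout seconds.
// isListening() is a hypothetical helper; it does not exchange Stomp frames.
function isListening(string $host, int $port, float $timeout = 1.0): bool
{
    $socket = @fsockopen($host, $port, $errno, $errstr, $timeout);
    if ($socket === false) {
        return false;
    }
    fclose($socket);
    return true;
}

echo isListening('localhost', 61613)
    ? 'port 61613 is accepting connections'
    : 'port 61613 is not reachable', PHP_EOL;
```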
忢ã¯ãCTRL+Cã§ã
ç°¡åãªãµã³ãã«
PHPçã©ã¤ãã©ãªã使ã£ãä¾
ã¡ãã»ã¼ã¸ãéä¿¡ããå´(simple_publisher.php)ã¯æ¬¡ã®ããã«ãªãã¾ãã
<?php
require_once("Stomp.php");

$broker = 'tcp://localhost:61613';
$queue = '/queue/stomp_sample';
$msg = 'bar' . date('YmdHis');

try {
    $stomp = new Stomp($broker);
    $stomp->connect();
    $stomp->send($queue, $msg);
} catch (StompException $e) {
    echo $e->getMessage();
}
Connect with the connect method, then call the send method with the target queue (destination) and the message. That is all.
Next, the receiving side (simple_consumer.php).
<?php
require_once("Stomp.php");

$broker = 'tcp://localhost:61613';
$queue = '/queue/stomp_sample';

try {
    $stomp = new Stomp($broker);
    $stomp->connect();
    $stomp->subscribe($queue);
    if ($stomp->hasFrameToRead()) {
        $frame = $stomp->readFrame();
        echo $frame->body . PHP_EOL;
        $stomp->ack($frame);
    } else {
        echo 'no frame' . PHP_EOL;
    }
} catch(StompException $e) {
    echo $e->getMessage() . PHP_EOL;
}
After connecting, subscribe to the queue and use the hasFrameToRead method to check whether a frame is waiting. If there is one, read it with the readFrame method. Finally, call the ack method to report that the message has been read, which removes it from the queue.
Running the two scripts gives the following.
$ php simple_publisher.php
$ php simple_consumer.php
bar20091110222402
$
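The consumer above reads at most one frame per run. A loop that keeps reading until nothing is waiting might look like the sketch below. To stay runnable without a broker it uses FakeStompClient, a hypothetical in-memory stand-in that exposes only the three methods used in simple_consumer.php; drain is likewise a hypothetical helper. With a real broker, hasFrameToRead may report no frame simply because the next message has not arrived yet.

```php
<?php
// Hypothetical in-memory stand-in exposing the methods used by the consumer,
// so the loop can be run without a broker.
class FakeStompClient
{
    public array $acked = [];
    private array $pending;

    public function __construct(array $bodies)
    {
        $this->pending = $bodies;
    }

    public function hasFrameToRead(): bool
    {
        return count($this->pending) > 0;
    }

    public function readFrame(): object
    {
        return (object) ['body' => array_shift($this->pending)];
    }

    public function ack(object $frame): void
    {
        $this->acked[] = $frame->body;
    }
}

// Read and acknowledge frames until none are waiting; return their bodies.
function drain($stomp): array
{
    $bodies = [];
    while ($stomp->hasFrameToRead()) {
        $frame = $stomp->readFrame();
        $bodies[] = $frame->body;
        $stomp->ack($frame);
    }
    return $bodies;
}

print_r(drain(new FakeStompClient(['bar1', 'bar2'])));
```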
Example using PECL::stomp
This is basically the same as the PHP library example. The difference is that the connection (the equivalent of the connect method) is made when the Stomp class is instantiated.
<?php
$broker = 'tcp://localhost:61613';
$queue = '/queue/stomp_sample';
$msg = 'bar' . date('YmdHis');

try {
    $stomp = new Stomp($broker);
    $stomp->send($queue, $msg);
} catch (StompException $e) {
    echo $e->getMessage();
}
The receiving side (simple_consumer.php) is almost the same as well. The only change is that the hasFrameToRead method is named hasFrame.
<?php
$broker = 'tcp://localhost:61613';
$queue = '/queue/stomp_sample';

try {
    $stomp = new Stomp($broker);
    $stomp->subscribe($queue);
    if ($stomp->hasFrame()) {
        $frame = $stomp->readFrame();
        echo $frame->body . PHP_EOL;
        $stomp->ack($frame);
    } else {
        echo 'no frame' . PHP_EOL;
    }
} catch(StompException $e) {
    echo $e->getMessage() . PHP_EOL;
}
Transactions
Stomp transactions resemble SQL transactions. The procedure is
- begin
- operations such as send and ack
- commit or abort
Each transaction is identified by its own ID (the transaction ID), and nested transactions are also possible.
以ä¸ãStompæ¡å¼µã¢ã¸ã¥ã¼ã«ã使ãä¾(publisher_tx.php)ã§ãããPHPçStompã©ã¤ãã©ãªã§ãã»ã¼åæ§ã§ããã©ã³ã¶ã¯ã·ã§ã³IDãé
åã§ã¯ãªãæååã¨ãã¦ã¡ã½ããã«æ¸¡ãã¾ãããã®è¾ºã¯Stomp.phpã®ã½ã¼ã¹ã³ã¼ããè¦ã¦ãã ãã;-)
<?php
/**
 * Issue a transaction ID
 */
function getTxId() {
    return sha1(md5(time() . rand() . uniqid(rand(), true)));
}

$broker = 'tcp://localhost:61613';
$queue = '/queue/stomp_sample';
$msg = 'bar' . date('YmdHis');

try {
    $stomp = new Stomp($broker);

    $tx1 = getTxId();
    $stomp->begin($tx1);
    $stomp->send($queue, '外側1', array('transaction' => $tx1)); // "outer 1"

        /**
         * A separate transaction inside the transaction (aborted at the end).
         * The indented part is its scope.
         */
        $tx2 = getTxId();
        $stomp->begin($tx2);
        $stomp->send($queue, '内側', array('transaction' => $tx2)); // "inner"
        $stomp->abort($tx2);

    $stomp->send($queue, '外側2', array('transaction' => $tx1)); // "outer 2"
    $stomp->commit($tx1);
} catch (StompException $e) {
    echo $e->getMessage();
}
Running it gives the following. Note that the message "内側" ("inner") has been aborted.
$ php -dextension=stomp.so publisher_tx.php
$ php -dextension=stomp.so simple_consumer.php
外側1
$ php -dextension=stomp.so simple_consumer.php
外側2
$ php -dextension=stomp.so simple_consumer.php
no frame
$
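The result can be reproduced with a small in-memory model. This is a hypothetical sketch, not how ActiveMQ is implemented: it assumes each transaction ID is an independent buffer, that sends made under an ID are held back until commit, and that abort discards them. The class name TxQueueModel and the ASCII message bodies are illustrative stand-ins for the three messages in publisher_tx.php.

```php
<?php
// Hypothetical in-memory model of transactional sends. Messages sent under a
// transaction ID are buffered until commit and thrown away on abort.
class TxQueueModel
{
    public array $delivered = [];
    private array $open = [];

    public function begin(string $tx): void
    {
        $this->open[$tx] = [];
    }

    public function send(string $body, ?string $tx = null): void
    {
        if ($tx === null) {
            $this->delivered[] = $body;   // no transaction: deliver at once
        } else {
            $this->open[$tx][] = $body;   // hold until commit or abort
        }
    }

    public function commit(string $tx): void
    {
        $this->delivered = array_merge($this->delivered, $this->open[$tx] ?? []);
        unset($this->open[$tx]);
    }

    public function abort(string $tx): void
    {
        unset($this->open[$tx]);
    }
}

$q = new TxQueueModel();
$q->begin('tx1');
$q->send('outer 1', 'tx1');
$q->begin('tx2');
$q->send('inner', 'tx2');
$q->abort('tx2');              // discards "inner" only
$q->send('outer 2', 'tx1');
$q->commit('tx1');

print_r($q->delivered);
```

Only the two messages sent under the committed ID reach the queue, which matches the consumer output above.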
Connecting over SSL
To use SSL between the client and ActiveMQ, first change the ActiveMQ configuration file as follows (ssl.xml). In this example the ports are assigned as follows:
- port 61613: Stomp
- port 61614: Stomp+SSL
<?xml version="1.0" encoding="UTF-8"?>
<beans>
  <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
  <broker xmlns="http://activemq.apache.org/schema/core" useJmx="false">
    <sslContext>
      <sslContext keyStore="file:${activemq.base}/conf/broker.ks"
                  keyStorePassword="password"
                  trustStore="file:${activemq.base}/conf/broker.ts"
                  trustStorePassword="password"/>
    </sslContext>
    <transportConnectors>
      <transportConnector name="stomp" uri="stomp://localhost:61613"/>
      <transportConnector name="stomp+ssl" uri="stomp+ssl://localhost:61614"/>
    </transportConnectors>
  </broker>
</beans>
After creating ssl.xml, restart ActiveMQ.
$ ./activemq xbean:ssl.xml
Java Runtime: Sun Microsystems Inc. 1.6.0_16 /usr/local/jdk1.6.0_16/jre
:
INFO | Listening for connections at: stomp://localhost.localdomain:61613
INFO | Connector stomp Started
INFO | Listening for connections at: stomp+ssl://localhost.localdomain:61614
INFO | Connector stomp+ssl Started
INFO | ActiveMQ JMS Message Broker (localhost, ID:centos-53915-1257859186651-0:0) started
On the client side, specify "ssl" instead of the tcp scheme.
<?php
//$broker = 'tcp://localhost:61613';
$broker = 'ssl://localhost:61614';
:
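Since the only client-side difference is the URI, it can help to inspect the URI before connecting. The sketch below uses PHP's parse_url; parseBroker is a hypothetical helper, and treating "ssl" as the only secure scheme is an assumption based on the example above.

```php
<?php
// Split a broker URI such as ssl://localhost:61614 into its parts.
// parseBroker() is a hypothetical helper, not part of either Stomp library.
function parseBroker(string $uri): array
{
    $parts = parse_url($uri);
    if ($parts === false || !isset($parts['scheme'], $parts['host'], $parts['port'])) {
        throw new InvalidArgumentException("Unexpected broker URI: $uri");
    }
    return [
        'scheme' => $parts['scheme'],
        'host'   => $parts['host'],
        'port'   => $parts['port'],
        // Assumption: "ssl" is the scheme that selects an encrypted connection.
        'secure' => $parts['scheme'] === 'ssl',
    ];
}

print_r(parseBroker('ssl://localhost:61614'));
```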
By the way
Watching the traffic with ngrep while the samples run shows something like the following. First, simple_publisher.php.
$ sudo ngrep -W byline -d lo port 61613
interface: lo (127.0.0.0/255.0.0.0)
filter: (ip) and ( port 61613 )
####
T 127.0.0.1:34405 -> 127.0.0.1:61613 [AP]
CONNECT
login:
passcode:
##
T 127.0.0.1:34405 -> 127.0.0.1:61613 [AP]
.
##
T 127.0.0.1:61613 -> 127.0.0.1:34405 [AP]
CONNECTED
session:ID:centos-53915-1257859186651-2:5
.
##
T 127.0.0.1:34405 -> 127.0.0.1:61613 [AP]
SEND
destination:/queue/stomp_sample
bar20091110223756
#
T 127.0.0.1:34405 -> 127.0.0.1:61613 [AFP]
.
DISCONNECT
.
###exit
42 received, 0 dropped
$
Next, simple_consumer.php.
$ sudo ngrep -W byline -d lo port 61613
interface: lo (127.0.0.0/255.0.0.0)
filter: (ip) and ( port 61613 )
####
T 127.0.0.1:34406 -> 127.0.0.1:61613 [AP]
CONNECT
login:
passcode:
##
T 127.0.0.1:34406 -> 127.0.0.1:61613 [AP]
.
##
T 127.0.0.1:61613 -> 127.0.0.1:34406 [AP]
CONNECTED
session:ID:centos-53915-1257859186651-2:6
.
##
T 127.0.0.1:34406 -> 127.0.0.1:61613 [AP]
SUBSCRIBE
ack:client
destination:/queue/stomp_sample
activemq.prefetchSize:1
##
T 127.0.0.1:34406 -> 127.0.0.1:61613 [AP]
.
##
T 127.0.0.1:61613 -> 127.0.0.1:34406 [AP]
MESSAGE
message-id:ID:centos-53915-1257859186651-2:2:-1:1:1
destination:/queue/stomp_sample
timestamp:1257860176371
expires:0
priority:0
bar20091110223616.
#
T 127.0.0.1:34406 -> 127.0.0.1:61613 [AP]
ACK
message-id:ID:centos-53915-1257859186651-2:2:-1:1:1
destination:/queue/stomp_sample
timestamp:1257860176371
expires:0
priority:0
#
T 127.0.0.1:34406 -> 127.0.0.1:61613 [AFP]
.
DISCONNECT
.
##
T 127.0.0.1:61613 -> 127.0.0.1:34406 [AP]
MESSAGE
message-id:ID:centos-53915-1257859186651-2:3:-1:1:1
destination:/queue/stomp_sample
timestamp:1257860203444
expires:0
priority:0
bar20091110223643.
#exit
57 received, 0 dropped
$
It might be interesting to compare this with the protocol specification.
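For that comparison, a captured frame can be split back into its parts. The sketch below is a hypothetical parser, not code from either library. It assumes the headers end at the first blank line and the frame ends at the first NULL character, so it does not handle bodies that themselves contain NULL bytes. The sample frame reuses header values from the capture above.

```php
<?php
// Parse one raw Stomp frame into command, headers and body.
// parseFrame() is a hypothetical helper; it stops at the first NULL character.
function parseFrame(string $raw): array
{
    $end = strpos($raw, "\x00");
    if ($end !== false) {
        $raw = substr($raw, 0, $end);
    }

    // The first blank line separates the command and headers from the body.
    [$head, $body] = array_pad(explode("\n\n", $raw, 2), 2, '');

    $lines = explode("\n", $head);
    $command = array_shift($lines);

    $headers = [];
    foreach ($lines as $line) {
        if ($line === '') {
            continue;
        }
        // Split on the first colon only: values such as message-id contain colons.
        [$key, $value] = array_pad(explode(':', $line, 2), 2, '');
        $headers[$key] = $value;
    }

    return ['command' => $command, 'headers' => $headers, 'body' => $body];
}

$raw = "MESSAGE\n"
     . "message-id:ID:centos-53915-1257859186651-2:2:-1:1:1\n"
     . "destination:/queue/stomp_sample\n"
     . "\n"
     . "bar20091110223616\x00";

print_r(parseFrame($raw));
```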