ActiveMQ の producer , consumer を実装してみる
結局 ActiveMQ を使うには JMS の API に沿って実装した producer , consumer を実装する必要がある。
単純な例
の3つを作成しました。
producer.groovy は ActiveMQ にメッセージを送る producer , consumer.groovy は ActiveMQ のキューにあるメッセージを受け取り標準出力する consumer です。
consumer.groovy は一つのキューを受信して終了してしまうので、 スレッドとしてずっと待機して キューにメッセージが入ったら受信するバージョンが consumerAsDaemon.groovy です。
http://www.javablogging.com/simple-guide-to-java-message-service-jms-using-activemq/ を参考にしました。
1) message producer の実装 → producer.groovy
import javax.jms.*
import org.apache.activemq.*
// 0) 事前準備
def subject = 'TESTQUEUE'
def msg = 'hello jms , ' + new Date()
// 1) ActiveMQに接続して connection を得る
def url = ActiveMQConnection.DEFAULT_BROKER_URL // → url == failover://tcp://localhost:61616
def cf = new ActiveMQConnectionFactory(url);
def conn= cf.createConnection()
conn.start()
// 2) セッションを取得して TESTQUEUE にメッセージを送信
def session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE)
def destination = session.createQueue(subject);
def producer = session.createProducer(destination)
// 3) メッセージを生成して送信
def message = session.createTextMessage(msg)
producer.send( message )
// 4) connection を閉じておしまい
conn.close()
実行するには、activemq/lib/*jar を全部 ~/.groovy/lib/ 以下に配置しておくべし。
groovy producer
2) message consumer の実装 → consumer.groovy
import javax.jms.*
import org.apache.activemq.*
// 0)
def subject = 'TESTQUEUE'
// 1)
def url = ActiveMQConnection.DEFAULT_BROKER_URL // failover://tcp://localhost:61615
def cf = new ActiveMQConnectionFactory(url);
def conn= cf.createConnection()
conn.start()
// 2)
def session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE)
def destination = session.createQueue(subject);
// 3) メッセージの受信
def consumer = session.createConsumer(destination);
def message = consumer.receive();
if (message instanceof TextMessage) {
println message.text
}
//println message.toString()
// 4)
conn.close()
3) message consumer の実装その2 → consumerAsDaemon.groovy
producerがいつメッセージを送ってくるかわからないので ずっと待ち受けして、キューにメッセージが入り次第 consume する consumer 。
import javax.jms.*
import org.apache.activemq.*
class MsgConsumer implements Runnable{
def subject = 'TESTQUEUE'
def conn
def consumer
def MsgConsumer(){
def url = ActiveMQConnection.DEFAULT_BROKER_URL // failover://tcp://localhost:61615
def cf = new ActiveMQConnectionFactory(url);
this.conn= cf.createConnection()
conn.start()
def session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE)
def destination = session.createQueue(subject);
this.consumer = session.createConsumer(destination);
this.consumer.setMessageListener( {
if (it instanceof TextMessage) { println it.text }
} as MessageListener )
}
void run(){
while(true){
Thread.sleep(1000)
if( stop ){ break }
}
conn.close()
}
boolean stop
void pleaseStop(){ this.stop=true }
}
new Thread( new MsgConsumer() ).start()