Top > Blog Index > ActiveMQ の producer , consumer を実装してみる

ActiveMQ の producer , consumer を実装してみる

結局 ActiveMQ を使うには JMS の API に沿って実装した producer , consumer を実装する必要がある。

単純な例

の3つを作成しました。

producer.groovy は ActiveMQ にメッセージを送る producer , consumer.groovy は ActiveMQ のキューにあるメッセージを受け取り標準出力する consumer です。

consumer.groovy は一つのキューを受信して終了してしまうので、 スレッドとしてずっと待機して キューにメッセージが入ったら受信するバージョンが consumerAsDaemon.groovy です。

http://www.javablogging.com/simple-guide-to-java-message-service-jms-using-activemq/ を参考にしました。

1) message producer の実装 → producer.groovy

import javax.jms.*
import org.apache.activemq.*

// 0) 事前準備
def subject = 'TESTQUEUE'
def msg = 'hello jms , ' + new Date() 

// 1) ActiveMQに接続して connection を得る
def url = ActiveMQConnection.DEFAULT_BROKER_URL // → url == failover://tcp://localhost:61616
def cf  = new ActiveMQConnectionFactory(url);
def conn= cf.createConnection()
conn.start()

// 2) セッションを取得して TESTQUEUE にメッセージを送信
def session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE)
def destination = session.createQueue(subject);
def producer = session.createProducer(destination)

// 3) メッセージを生成して送信
def message = session.createTextMessage(msg)
producer.send( message )

// 4) connection を閉じておしまい
conn.close()

実行するには、activemq/lib/*jar を全部 ~/.groovy/lib/ 以下に配置しておくべし。

groovy producer

2) message consumer の実装 → consumer.groovy

import javax.jms.*
import org.apache.activemq.*

// 0)
def subject = 'TESTQUEUE'

// 1)
def url = ActiveMQConnection.DEFAULT_BROKER_URL // failover://tcp://localhost:61615
def cf  = new ActiveMQConnectionFactory(url);
def conn= cf.createConnection()
conn.start()

// 2)
def session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE)
def destination = session.createQueue(subject);

// 3) メッセージの受信
def consumer = session.createConsumer(destination);
def message = consumer.receive();

if (message instanceof TextMessage) {
    println message.text
}
//println message.toString()


// 4)
conn.close()

3) message consumer の実装その2 → consumerAsDaemon.groovy

producerがいつメッセージを送ってくるかわからないので ずっと待ち受けして、キューにメッセージが入り次第 consume する consumer 。

import javax.jms.*
import org.apache.activemq.*


class MsgConsumer implements Runnable{

    def subject = 'TESTQUEUE'

    def conn
    def consumer

    def MsgConsumer(){
        def url = ActiveMQConnection.DEFAULT_BROKER_URL // failover://tcp://localhost:61615
        def cf  = new ActiveMQConnectionFactory(url);
        this.conn= cf.createConnection()
        conn.start()

        def session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE)
        def destination = session.createQueue(subject);

        this.consumer = session.createConsumer(destination);
        this.consumer.setMessageListener( {
            if (it instanceof TextMessage) { println it.text }
        } as MessageListener )
    }

    void run(){
        while(true){
            Thread.sleep(1000)
            if( stop ){ break }
        }
        conn.close()
    }
    boolean stop
    void pleaseStop(){ this.stop=true }
}


new Thread( new MsgConsumer() ).start()