RocketMQ实战与原理解析
上QQ阅读APP看书,第一时间看更新

2.3 发送/接收消息示例

可以用自己熟悉的开发工具创建一个Java项目,加入RocketMQ Client包的依赖,用代码清单2-1的内容发送消息,这个示例代码是以Sync方式发送消息的。

代码清单2-1 Producer示例程序

        public class SyncProducer {
            public static void main(String[] args) throws Exception {
                //Instantiate with a Producer group name.
                DefaultMQProducer Producer = new
                    DefaultMQProducer("please_rename_unique_group_name");
                producer.setNamesrvAddr("192.168.100.131:9876");
                //Launch the instance.
                Producer.start();
                for (int i = 0; i < 100; i++) {
                    //Create a Message instance, specifying Topic, tag and Message
                        body.
                    Message msg = new Message("TopicTest" /* Topic */,
                        "TagA" /* Tag */,
                        ("Hello RocketMQ " +
                            i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message
                                body */
                    );
                    //Call send Message to deliver Message to one of brokers.
                    SendResult sendResult = Producer.send(msg);
                    System.out.printf("%s%n", sendResult);
                }
                //Shut down once the Producer instance is not longer in use.
                Producer.shutdown();
            }
        }

主要流程是:创建一个DefaultMQProducer对象,设置好GroupName和NameServer地址后启动,然后把待发送的消息拼装成Message对象,使用Producer来发送。接下来看看如何接收消息,也就是使用DefaultMQPushConsumer类实现的消费者程序,如代码清单2-2所示。

代码清单2-2 Consumer示例程序

              /*
                * Instantiate with specified Consumer group name.
                */
                DefaultMQPushConsumer Consumer = new DefaultMQPushConsumer("please
                    rename to unique group name");
                /*
                * Specify name server addresses.
                Consumer.setNamesrvAddr("192.168.249.47:9876");
                /*
                * Specify where to start in case the specified Consumer group is a
                    brand new one.
                */
      Consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
                //Consumer.setMessageModel(MessageModel.BROADCASTING);
                /*
                * Subscribe one more more Topics to consume.
                */
                Consumer.subscribe("TopicTest”, "*");
                /*
                  *  Register callback to execute on arrival of Messages fetched from
                      brokers.
                  */
                Consumer.registerMessageListener(new MessageListenerConcurrently() {
                    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt>
                        msgs,   ConsumeConcurrentlyContext context) {
                        System.out.printf(Thread.currentThread().getName()  +  "
                            Receive New Messages: " + msgs + "%n");
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                    }
                });
                /*
                  *  Launch the Consumer instance.
                  */
                Consumer.start();

Consumer或Producer都必须设置GroupName、NameServer地址以及端口号。然后指明要操作的Topic名称,最后进入发送和接收逻辑。