MqConsumer.java(rocketmq消息消费者)

package com.kenai.mq;
import com.alibaba.fastjson.JSON;
import com.kenai.dao.ItemStockDOMapper;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.List;
import java.util.Map;@Component
public class MqConsumer {private DefaultMQPushConsumer consumer;@Resourceprivate ItemStockDOMapper itemStockDOMapper;@Value("${mq.nameserver.addr}")private String nameAddr;@Value("${mq.topicname}")private String topicName;@PostConstructpublic void init() throws MQClientException {consumer = new DefaultMQPushConsumer("stock_consumer_group");// consumer连接nameserverconsumer.setNamesrvAddr(nameAddr);// consumer订阅所有stock topic消息consumer.subscribe(topicName, "*");// 当消息推送过来之后的处理方式consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {// 实现库存真正到数据库内扣减的逻辑Message msg = msgs.get(0);String jsonString = new String(msg.getBody());Map<String, Object> map = JSON.parseObject(jsonString, Map.class);Integer itemId = (Integer) map.get("itemId");Integer amount = (Integer) map.get("amount");itemStockDOMapper.decreaseStock(itemId, amount);// 说明该消息已经被消费return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();}
}