Skip to content

txlaijava/alimq

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

6 Commits
 
 
 
 
 
 
 
 

Repository files navigation

spring boot starter for aliYunMQ Build Status

Maven CentralGitHub release

项目介绍

消息队列 RocketMQ 是阿里巴巴自研消息产品,服务于整个集团已超过 13 年,经过阿里巴巴交易核心链路反复打磨与历年双十一购物狂欢节的严苛考验,是一个真正具备低延迟、高并发、高可用、高可靠,可支撑万亿级数据洪峰的分布式消息中间件。

本模块实现了阿里云MQ的以下几个功能。

  • 同步发送消息
  • 异步发送消息
  • 广播发送消息
  • 有序发送和消费消息
  • 发送延时消息
  • 消息tag支持
  • 自动序列化和反序列化消息体
  • 发送事务消息

如何集成

1、添加依赖
<dependency>
  <groupId>com.shopping</groupId>
  <artifactId>spring-boot-starter-aliyunmq</artifactId>
  <version>1.0.0.RELEASE</version>
</dependency>

因为我们有自己的maven私库,所以没有po到maven公有库里。配置代码库:

<repositories>
    <repository>
        <id>shopping</id>
        <url>http://maven.sucok.com/content/groups/public</url>
    </repository>
    <repository>
        <id>sonatype-nexus-staging</id>
        <name>Sonatype Nexus Staging</name>
        <url>http://maven.aliyun.com/nexus/content/groups/public</url>
        <releases>
            <enabled>true</enabled>
        </releases>
        <snapshots>
            <enabled>true</enabled>
        </snapshots>
    </repository>
</repositories>

2、添加配置

aliyun.mq.onsAddr=<地址>
aliyun.mq.accessKey=
aliyun.mq.secretKey=
aliyun.mq.TopicIdRblc=
#为false表示不引入producer为true则producerId必须提供
aliyun.mq.producer.enabled=true
aliyun.mq.producer.producerId=
#为false表示不引入consumer为true则consumerId必须提供
aliyun.mq.consumer.enabled=true

3、添加生产者

顺序消息

按照消息的发布顺序进行顺序消费(FIFO),支持全局顺序与分区顺序

@Autowired
OrderMessageTemplate orderMessageTemplate;

//发送
orderMessageTemplate(new MessageEvent("{topic}", "{tag}", msgBody));

普通消息,定时消息,延迟消息生产者

消息可在指定的时间点(如2019/01/01 15:00:00)或延迟时间(如30分钟后)进行投递

@Autowired
RocketMQTemplate rocketMQTemplate;

// 发送普通消息
rocketMQTemplate.send(new MessageEvent("{topic}", "{tag}", msgBody));
// 延时消息,单位毫秒(ms),在指定延迟时间(当前时间之后)进行投递,例如消息在 3 秒后投递
rocketMQTemplate.sendAsync(new MessageEvent("{topic}", "{tag}", msgBody),3000);
// 延时消息,指定时间进行投递。例如消息在1天之后投递
Calendar cal = Calendar.getInstance();
cal.setTime(date);
cal.add(Calendar.DATE, 1);
rocketMQTemplate.sendAsync(new MessageEvent("{topic}", "{tag}", msgBody), cal.getTime(););

事务消息

类似 X/Open XA 的分布事务功能,既可做到系统间的解耦,又能保证数据的最终一致性

@Autowired
TransactionMessageTemplate messageTemplate;

// 发送事务消息
/**封装消息*/
MessageEvent event = new MessageEvent();
event.setTopic("base_sms");
event.setTag("Tag_user");
   
User user = new User();
user.setName("Paul");
user.setAdds("北京市 昌平区 龙锦苑东二区");
 /**封装任意类型领域对象*/
event.setDomain(user);
    
transactionMessageTemplate.send(event,new TransactionExecuter() {
@Override
public TransactionStatus executer(MessageEvent messageEvent, Long hashValue, Object arg) {
	String transactionId = TransactionDemo.createTransaction();
	TransactionStatus status = TransactionDemo.checker();
	return status;
}
),"参数对象,以本字符串示例,会传递给TransactionExecuter.executer");

4、添加订阅者

普通消息订阅者,实现AbstractMessageListener接口

  • topic:支持SpringEl风格,取配置文件中的值。(必填)
  • tag:标签。* 标识所有。 (必填)
  • consumerId:消费者Id。 (必填)
  • consumeMode:消费模式 有序(单线程)或者无序(多线程) (默认无序)
@RocketMQMessageListener(topic = "${aliyun.mq.TopicIdRblc}", tag = "giveCoupon", consumerId = "CID_shopping_giveCoupon", consumeMode = MessageExtConst.CONSUME_MODE_ORDERLY)
public class GiveCouponMessageListener extends AbstractMessageListener<MessageEvent> {
    /**
     * 消息处理
     */
    @Override
    public void handle(MessageEvent messageEvent) throws Exception {
    	//TODO 业务处理
    }
}

顺序消息订阅者,实现AbstractMessageOrderListener

@RocketMQMessageListener(topic = "${aliyun.mq.TopicIdRblc}", tag = "giveCoupon", consumerId = "CID_shopping_giveCoupon", consumeMode = MessageExtConst.CONSUME_MODE_ORDERLY)
public class GiveCouponMessageListener extends AbstractMessageOrderListener <MessageEvent> {
    /**
     * 消息处理
     */
    @Override
    public void handle(MessageEvent messageEvent) throws Exception {
    	//TODO 业务处理
    }
}

5、相关参考


About

基于阿里云MQ服务封装的spring-boot模块

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

 
 
 

Contributors

Languages