还在用定时任务取消订单?这些支持延迟消息的开源组件,让你延迟处理任务不再犯愁

还在用定时任务取消订单?这些支持延迟消息的开源组件,让你延迟处理任务不再犯愁

延迟消息是指在消息发送之后,经过一定时间间隔后再进行消费的消息。具体实现方式常见的有两种:1、将消息存储在消息队列中,等待一段时间后再出队进行消费;2、使用定时器,在设定的时间到达后再触发消息的发送。延迟消息是一种非常实用的技术手段,可以帮助我们实现很多复杂的业务逻辑和需求。延迟消息可以应用于很多场景,其中比较常见的场景有:

  1. 订单超时未支付提醒:用户下单后,如果超过一定时间未完成支付,系统会自动发送提醒消息,以便提醒用户及时付款。
  2. 活动开始提醒:对于需要提前宣传和预热的活动,可以提前设置定时器,在活动开始前发送推送消息,以便吸引用户参加。
  3. 重试机制:在网络不稳定的情况下,消息可能发送失败,此时可以将消息存储在消息队列中,并根据设定的时间间隔自动重试发送,提高消息的成功率。
  4. 积分到期提醒:对于积分到期时间比较长的用户,可以设置定时提醒,让用户尽快使用积分,提高用户粘性。

那么支持延迟消息的开源组件有哪些呢?

RabbitMQ

RabbitMQ是一个流行的消息中间件,支持延迟消息功能。可以通过设置消息的 TTL(Time To Live)来实现延迟功能,3.5.8以上的还可以使用插件 rabbitmq_delayed_message_exchange 实现更精确的延迟控制为了实现延迟消息。值得注意的是,RabbitMQ不支持精确到毫秒的延迟,最小粒度是秒级别的。如果需要更高精度的延迟,可以使用额外的定时器来完成。

代码地址:https://github.com/rabbitmq

RocketMQ

RocketMQ是一个分布式消息队列系统,由阿里巴巴集团开源的项目。它是一款高性能、高可靠、高并发、可扩展的消息队列,支持多种消息模式,如点对点、发布/订阅和广播等。RocketMQ还提供了很多高级特性,如事务消息、顺序消息和延迟消息等。
为了实现延迟消息,需要先配置RocketMQ Broker的相关参数。在Broker的配置文件(broker.conf)中添加以下配置messageDelayLevel,messageDelayLevel表示消息的延迟级别,默认为0。RocketMQ支持18个延迟级别,每个级别代表不同的延迟时间。例如,messageDelayLevel=5表示使用第5个延迟级别,该级别对应的延迟时间为10秒。

在使用RocketMQ实现延迟消息时,需要注意以下几点:

  1. 延迟级别不宜过多,建议控制在18个以内。
  2. 消息的实际延迟时间可能会有一定的误差,因此不适合需要精确控制延迟时间的场景。
  3. 在高并发场景下,可能会出现消息堆积的情况。

    代码地址:https://github.com/apache/rocketmq.git

Kafka

Kafka是一种高性能、分布式、可扩展的消息队列系统,最初由LinkedIn公司开发并开源。目前已经成为Apache基金会的顶级项目之一。Kafka通过将数据分散存储在多个broker节点上来提高可靠性和可伸缩性
实现延迟消息可以通过以下两种方法:

  1. 使用Kafka自带的延迟消息功能:Kafka提供了一种基于时间的延迟消息机制,使用时只需将消息写入对应的topic并设置一个延迟时间,Kafka会在指定时间后将消息发送到消费者端。这种方式需要配置Kafka broker侧的参数,具体可以参考Kafka官方文档。
  2. 在消息中添加延迟时间戳:在消息中添加一个表示延迟时间的时间戳字段,在消费者端消费消息时,根据时间戳判断消息是否已经延迟完成。这种方式需要在生产者端和消费者端都进行代码实现。

延迟消息都需要注意以下几点:

  1. 要确保Kafka broker和消费者端的时间同步;
  2. 要考虑网络延迟和时钟漂移等因素对延迟消息的影响;
  3. 要注意延迟时间的精度和范围,以及对Kafka性能的影响。

代码地址:https://github.com/apache/kafka.git

ActiveMQ

ActiveMQ 是一款开源的消息中间件,它实现了多种消息协议和传输方式。可以用来实现异步通信、解耦系统、增强系统可靠性等。其中延迟消息是 ActiveMQ 的一个重要特性之一,可以实现定时任务调度、消息重试等功能。
实现延迟消息的方法有两种:

  1. 使用 ActiveMQ 提供的 Scheduler 功能:在消息发送时,设置一个延迟时间(单位为毫秒),然后将消息发送到指定的队列或主题。当时间到达时,Scheduler 会从队列中取出该消息并投递给消费者。使用这种方法需要在 ActiveMQ 配置文件中启用 Scheduler 插件,配置示例如下:
    1
    2
    3
    <plugins>
    <schedulerSupport />
    </plugins>

2.自定义消息过期时间:在消息发送时,设置一个 TTL(Time To Live)属性,表示消息的过期时间。如果消息在到达 Broker 后未被消费者接收,则 Broker 将其删除。在 ActiveMQ 的 Java 客户端中,可以使用 setJMSExpiration() 方法设置消息的过期时间。

代码地址:https://github.com/apache/activemq.git

Pulsar

Pulsar是一种开源的分布式消息系统,由Apache Software Foundation管理。它提供了高可用性、可扩展性和灵活性,支持多租户、多数据中心、多云环境等特性。Pulsar的主要优势在于其架构上的灵活性,可以不受限制地进行水平扩展,并且具有较低的延迟和高吞吐量。

Pulsar支持两种类型的消息:持久化消息和非持久化消息。持久化消息会被保存在磁盘上,即使broker节点出现故障仍然能够恢复,而非持久化消息只存在于内存中,当节点故障时会丢失。同时,Pulsar还支持延迟消息功能,可以设置消息延迟时间,到达指定时间后再发送消息。

实现延迟消息需要使用Pulsar的生产者API,通过设置消息的delayTime属性来实现。例如,以下代码演示如何将延迟时间设置为5秒的消息发送到名为“my-topic”的主题:

1
2
3
4
5
6
7
8
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic("my-topic")
.create();

MessageId messageId = producer.newMessage()
.value("Hello, Pulsar!")
.deliverAfter(5, TimeUnit.SECONDS)
.send();

在这个例子中,我们使用Pulsar客户端创建了一个生产者,并指定了要发送消息的主题。接下来,我们通过调用newMessage()方法来创建一个新的消息,并使用deliverAfter()方法设置延迟时间,然后将消息发送到主题中。

值得注意的是,Pulsar的延迟消息依赖于broker节点配置的定时任务调度器,如果broker节点出现故障或重启,则延迟消息可能会被重新计算,因此建议在实际应用中对延迟消息进行充分测试和评估。

代码地址:https://github.com/apache/pulsar.git

Beanstalkd

Beanstalkd是一个简单、快速、轻量级的分布式消息队列系统,它专注于处理任务和消息。它通过提供一个高性能的管道,使得生产者可以将任务或消息发送到队列中,而消费者可以从队列中获取任务或消息,并执行相应的操作。
当生产者发送一个带有延迟时间的任务或消息到队列中时,Beanstalkd会将该任务或消息存储在“延迟队列”中,并设置一个延迟时间。当延迟时间到达后,Beanstalkd将该任务或消息移动到主队列中,等待消费者来获取并执行。
需要注意的是,Beanstalkd 的延迟时间精度不是非常高,因此不能用于对时间精度要求非常高的场景。此外,延迟时间最长为 2^32 秒,即约49天左右。

代码地址:https://github.com/beanstalkd/beanstalkd.git

Disque

Disque是一款轻量级的分布式消息队列系统,它支持多种语言.使用Disque可以方便地实现消息队列功能,例如异步任务处理、事件驱动架构等。可以实现高性能和可靠性,并支持延迟消息。

要实现延迟消息,可以利用Disque提供的特性——DELAYED队列。当一个消息被添加到DELAYED队列时,它会在指定的延迟时间后自动转移到目标队列,从而实现延迟消息的功能。下面是一个简单的示例:

1
2
3
4
5
6
7
8
# 将消息添加到延迟队列中并设置延迟时间为30秒
> DISQUE.NADD myqueue delay 30000 "Hello, world!"
"OK"

# 在30秒后,将消息从延迟队列中移到普通队列中,并准备进行消费
> BRPOP myqueue 0
1) "myqueue"
2) "Hello, world!"

在上面的示例中,我们首先使用DISQUE.NADD命令将消息添加到延迟队列中,并设置延迟时间为30秒。当30秒后到达时,Disque将自动将消息从延迟队列中移到普通队列中,并准备进行消费。

需要注意的是,在使用DISQUE.NADD命令时,必须指定一个唯一的ID,以确保同一条消息不会被添加多次。此外,也可以在DELAY参数中指定一个具体的时间戳,以实现更精确的延迟控制。

代码地址:https://github.com/antirez/disque.git

NSQ

NSQ是一款基于Go语言编写的开源消息队列系统,它具有高可用性、高吞吐量和低延迟等优点。NSQ使用分布式架构,采用去中心化设计,每个节点都可以独立运行,不需要集中式的管理节点。同时,NSQ还提供了多种消息传输协议和客户端库,可以方便地与其他应用程序进行集成。
NSQ提供了一种叫做“Delayed Requeue”的延迟队列机制,可以将消息放入到指定的延迟队列中,并设置延迟的时间。NSQ会在指定的时间后将消息放入到普通队列中,供消费者消费。这种方式可以精确控制消息的延迟时间,但需要额外的配置和代码实现。NSQ支持延时消息的默认最长时间为3600000(60分钟),可以通过-max-req-timeout参数修改。
Delayed Requeue只能应用于具有持久性的消息,因为如果消息没有持久性,将无法将其重新推入队列中。此外,Delayed Requeue可能会影响NSQ的性能,因此建议仅在必要时使用它。

代码地址:https://github.com/nsqio/nsq.git

Redis

Redis可以通过使用sorted set和Lua脚本实现延迟消息,步骤如下:

  1. 将消息作为value,以延迟时间作为score写入一个sorted set中。
  2. 使用Lua脚本定时扫描sorted set,找到score小于当前时间的消息并将其取出来执行。
  3. 如果消息需要重复执行,则在执行完毕后再次将其放回sorted set中,并更新其score值。

Redisson

Redisson是基于Java的Redis客户端,提供了丰富的功能和API。它不仅支持Redis的常规操作,如字符串、列表、集合等数据类型,还支持分布式锁、限流器、分布式Map、队列、发布/订阅等常用的分布式工具。Redisson底层使用Netty作为网络框架,通过异步非阻塞的方式与Redis进行通信,保证了高效性能和可靠性。
Redisson提供了基于Redis实现的延迟队列,使用ZSet数据结构保存消息和对应的执行时间戳。当消息的时间戳达到指定时间时,Redisson会自动将消息从延迟队列中取出,并将其转移到目标队列中,以供消费者处理。这种实现方式具有高效性能和可扩展性,可以满足各种分布式场景下的需求。
Redisson提供了一个方便的API来创建延迟队列。例如,以下代码创建了一个名为”myQueue”的延迟队列,其中的元素都是String类型的:

1
2
RDelayedQueue<String> delayedQueue = redisson.getDelayedQueue(redisson.getQueue("myQueue"));
delayedQueue.offer("Hello", 10, TimeUnit.SECONDS);

在上面的示例中,我们使用getQueue()方法获取一个普通的队列,然后将其传递给getDelayedQueue()方法来创建一个延迟队列。接着,我们向延迟队列中添加一个值为”Hello”的元素,并设置了10秒钟的延迟时间。

最后,我们需要创建一个消费者来处理延迟队列中过期的元素。以下代码会阻塞当前线程,等待延迟时间到达后处理相应的元素:

1
2
String element = delayedQueue.take();
System.out.println("Processing element: " + element);

在上面的示例中,我们调用了take()方法来获取下一个过期的元素,并打印出来进行处理。

需要注意的是,如果延迟队列中没有元素,take()方法会一直阻塞当前线程。

延迟消息需要额外的存储空间来存储消息的延迟信息。如果存在大量的延迟消息,可能会对消息队列的存储带宽和存储容量造成压力。延迟时间越长,滞留的消息就越多,可能会多消息队列的性能带来影响,所以,有时候,我们需要合理设置延迟时间,如果时间过长,可以通过定时任务+延迟队列来处理。各位有其他更好的方式吗?

还在用定时任务取消订单?这些支持延迟消息的开源组件,让你延迟处理任务不再犯愁

https://blogs.52fx.biz/posts/793559045.html

作者

eyiadmin

发布于

2023-05-17

更新于

2024-05-31

许可协议

评论