在之前的系统,用户主要是经销商,用户数不多,数据量也不大,便采用了最简单的方式-定时扫描表,这种方式确实简单粗暴,如果要实时性较高,那么就只有频繁扫描表,这样就增加了数据库的压力,如果要减轻数据库压力,那么就只有降低扫描频率,这样就出现延迟很高。这次的新系统的用户和数据量较之前的系统就要多一些了,如果再使用之前的方式,不止数据库压力会很大,用户体验也不好。
现在,有很多支持延迟消息的开源消息队列,如:beanstalkd
、rabbitmq
、rocketmq
、nsq
等,最开始,我们有想到过用消息队列来实现这种延迟消费,但是,这又增加了我们的运维成本,便暂时放弃这种方案。因为系统有用到过Redis,当时也有想到过基于Redis
的key过期事件机制来实现,但是这货有个大问题,如果消费程序重启或者挂掉期间有key过期,这部分的事件就丢失了,所以这种方式也不想。直到某天,我在查看Redisson文档时,突然眼前一亮,看到一个关键字RDelayedQueue
,这不就是延迟队列吗?正好现在系统有用到Redis,要不就试试这个家伙吧。
引入相关包
这里我主要用到了hutool
、lombok
和redisson
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>cn.hutool</groupId> <artifactId>hutool-core</artifactId> <version>5.7.16</version> </dependency> <dependency> <groupId>org.redisson</groupId> <artifactId>redisson-spring-boot-starter</artifactId> <version>3.16.5</version> </dependency>
|
简单配置
这里只需要简单配置一些Redisson
的redis连接信息即可
1 2 3 4
| spring: redis: host: 127.0.0.1 port: 6379
|
Show Code
这里,我新建了一个OrderDto
类,简单加入两个属性
1 2 3 4 5 6
| @Getter @Setter public class OrderDto implements Serializable { private String orderNo; private String createDate; }
|
再新建一个Runner
类,
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| @Component public class RedissonRunner implements ApplicationRunner {
@Autowired RedissonClient redissonClient;
private static final int DELAYTIME = 1 * 60; //延迟时间,1分钟
@Override public void run(ApplicationArguments args) throws Exception { RBlockingQueue<OrderDto> blockingQueue = redissonClient.getBlockingQueue("delay_queue"); RDelayedQueue<OrderDto> delayedQueue = redissonClient.getDelayedQueue(blockingQueue);
for (int orderIndex = 0; orderIndex <= 99; orderIndex++) { OrderDto orderDto = new OrderDto(); orderDto.setOrderNo("order0" + String.valueOf(orderIndex)); orderDto.setCreateDate(DateUtil.format(DateUtil.date(), "yyyy-MM-dd HH:mm:ss:SSS")); delayedQueue.offer(orderDto, DELAYTIME, TimeUnit.SECONDS); //放入队列 }
ThreadUtil.execAsync(() -> { while (true) { OrderDto orderDto = blockingQueue.poll(); if (ObjectUtils.isEmpty(orderDto)) { try { Thread.sleep(1000); continue; } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println(StrUtil.format("当前时间:[{}],订单号:[{}],订单时间:[{}]", DateUtil.format(DateUtil.date(), "yyyy-MM-dd HH:mm:ss:SSS"), orderDto.getOrderNo(), orderDto.getCreateDate())); } }); } }
|
这里,我们为了模拟延迟,就把延迟时间设置为60
秒。好,我们运行起来看看效果,当我们运行起来时,redis里面会多出两个key
等待一分钟后,我们边能通过poll
来获取需要消费的消息
当消息消费完后且无新消息写入该队列中,这两个key就会被删除。
我们再来模拟一下,消费程序退出再重启的情况下,是否可以继续消费队列。
在redis中我们也可以看到超过延迟时间且还未消费的消息会一直存在,直到消费端将其消费。
这次没有时间去看底层的实现逻辑,先用着试试看,后面有时间再深入研究。如果家人们有更好的方式,请提出你们宝贵的意见,谢谢。