NSQ实现消息延迟执行

NSQ实现消息延迟执行

最近我给自己做了一个记事微信小程序,主要是怕自己把事情给忘了,虽然现在市面上有很多成熟的应用,但是作为程序猿,而且最近工作也不是那么忙,就想着自己折腾一个。既然是备忘,当然得有一个消息提醒,这就需要涉及到延迟执行。现在针对延迟执行也有许多方案,比如:定时扫描、消息队列等。定时扫描在时间上有延迟而且扫描频率影响着数据库的性能,另外还有定时轮(TimingWheel)算法,不过这里我还是选择了消息队列的方式。

支持延迟消息投递的消息队列也比较多,如:rabbitmq、rocketmq、kafka等都可以实现延迟消息。但是这里我比较中意NSQ,因为它开源、简单易用、部署快捷、高性能。

NSQ部署

在部署NSQ之前,我们首先得了解它的三个组件nsqdnsqlookupdnsqadmin:

  • nsqd主要负责接收、存储和转发消息发给客户端
  • nsqlookupd主要负责nsqd的注册以及心跳、状态监控。
  • nsqadmin是消息的web管理界面

Windows部署NSQ

我们这里先实践一下Windows的安装方式,首先需要去NSQ下载Windows版本并解压.

  1. 首先启动nsqlookupd:

    1
    2
    3
    4
    E:\tool\nsq-1.2.0\bin>nsqlookupd
    [nsqlookupd] 2020/01/14 09:26:37.816513 INFO: nsqlookupd v1.2.0 (built w/go1.12.9)
    [nsqlookupd] 2020/01/14 09:26:37.882337 INFO: HTTP: listening on [::]:4161
    [nsqlookupd] 2020/01/14 09:26:37.882337 INFO: TCP: listening on [::]:4160

    默认端口为http:4161tcp:4160,可以通过分别指定-http-address-tcp-address两个参数来修改。

  2. 启动nsqd实例:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    E:\tool\nsq-1.2.0\bin>nsqd --lookupd-tcp-address=0.0.0.0:4160 -data-path="D:/nsqdata" --broadcast-address=0.0.0.0
    [nsqd] 2020/01/14 09:38:00.365419 INFO: nsqd v1.2.0 (built w/go1.12.9)
    [nsqd] 2020/01/14 09:38:00.403363 INFO: ID: 632
    [nsqd] 2020/01/14 09:38:00.408304 INFO: NSQ: persisting topic/channel metadata to D:/nsqdata/nsqd.dat
    [nsqd] 2020/01/14 09:38:00.453186 INFO: TCP: listening on [::]:4150
    [nsqd] 2020/01/14 09:38:00.453186 INFO: HTTP: listening on [::]:4151
    [nsqd] 2020/01/14 09:38:00.453186 INFO: LOOKUP(0.0.0.0:4160): adding peer
    [nsqd] 2020/01/14 09:38:00.500058 INFO: LOOKUP connecting to 0.0.0.0:4160
    [nsqd] 2020/01/14 09:38:00.523995 INFO: LOOKUPD(0.0.0.0:4160): peer info {TCPPort:4160 HTTPPort:4161 Version:1.2.0 BroadcastAddress:2CNU7X5OLAUE004}
  • -lookupd-tcp-address:nsqlookupd的IP和tcp端口
  • -broadcast-address:会注册到nsqlookupd,填写自己主机IP
  • -data-path:消息持久化的存储路径

    NSQ支持延时消息的默认最长时间为3600000(60分钟),可以通过-max-req-timeout参数修改

  1. 最后启动我们的nsqadmin实例:
    1
    2
    3
    E:\tool\nsq-1.2.0\bin>nsqadmin --lookupd-http-address=0.0.0.0:4161
    [nsqadmin] 2020/01/14 09:38:34.709978 INFO: nsqadmin v1.2.0 (built w/go1.12.9)
    [nsqadmin] 2020/01/14 09:38:34.745941 INFO: HTTP: listening on [::]:4171
  • -lookupd-http-address:nsqlookupd的IP和http端口

上面我们演示的均是单个实例,仅适合于开发环境,以后有机会我们再来探究如何部署多实例搭建高可用NSQ

Docker部署NSQ

NSQ支持跨平台,如果是部署在Linux的话,建议还是使用Docker部署,在NSQ官网有详细的Docker安装说明,这里我们参照官网实践一下。编辑我们的docker-compose文件,这里我命名为nsq.yml:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
version: '3'
services:
nsqlookupd:
image: nsqio/nsq
command: /nsqlookupd
ports:
- "4160"
- "4161"
nsqd:
image: nsqio/nsq
command: /nsqd --lookupd-tcp-address=nsqlookupd:4160 –data-path=/data/nsqdata
depends_on:
- nsqlookupd
ports:
- "4150"
- "4151"
volumes:
- /root/nsqdata:/data/nsqdata
nsqadmin:
image: nsqio/nsq
command: /nsqadmin --lookupd-http-address=nsqlookupd:4161
depends_on:
- nsqlookupd
ports:
- "4171"

我们稍作修改,将NSQD数据目录挂载到物理机。创建好yml配置文件后,就运行命令创建我们的容器:

1
2
3
4
5
6
7
8
9
10
11
12
[root@instance-p0a4erj8 ~]# docker-compose -f nsq.yml up -d
Creating network "root_default" with the default driver
Pulling nsqlookupd (nsqio/nsq:)...
latest: Pulling from nsqio/nsq
5d20c808ce19: Pull complete
e43ee7addbdb: Pull complete
cbc99497dda7: Pull complete
Digest: sha256:78b986254986c4ae1237b32219a83c5a23354a6c30c18817597f776a4edcac41
Status: Downloaded newer image for nsqio/nsq:latest
Creating root_nsqlookupd_1 ... done
Creating root_nsqd_1 ... done
Creating root_nsqadmin_1 ... done

执行docker-compose ps查看我们的容器:

1
2
3
4
5
6
[root@instance-p0a4erj8 ~]# docker-compose -f nsq.yml ps
Name Command State Ports
-----------------------------------------------------------------------------------------------------------------------------------------------------
root_nsqadmin_1 /nsqadmin --lookupd-http-a ... Up 4150/tcp, 4151/tcp, 4160/tcp, 4161/tcp, 4170/tcp, 0.0.0.0:32772->4171/tcp
root_nsqd_1 /nsqd --lookupd-tcp-addres ... Up 0.0.0.0:32771->4150/tcp, 0.0.0.0:32770->4151/tcp, 4160/tcp, 4161/tcp, 4170/tcp, 4171/tcp
root_nsqlookupd_1 /nsqlookupd Up 4150/tcp, 4151/tcp, 0.0.0.0:32769->4160/tcp, 0.0.0.0:32768->4161/tcp, 4170/tcp, 4171/tcp

如果需要删除,需要先stop,在rm:

1
2
3
4
5
6
7
8
9
10
[root@instance-p0a4erj8 ~]# docker-compose -f nsq.yml stop
Stopping root_nsqadmin_1 ... done
Stopping root_nsqd_1 ... done
Stopping root_nsqlookupd_1 ... done
[root@instance-p0a4erj8 ~]# docker-compose -f nsq.yml rm
Going to remove root_nsqadmin_1, root_nsqd_1, root_nsqlookupd_1
Are you sure? [yN] y
Removing root_nsqadmin_1 ... done
Removing root_nsqd_1 ... done
Removing root_nsqlookupd_1 ... done

刚才看到,如果不指定端口,容器启动时随机分配绑定的主机端口号,这样就会比较乱,所以要指定端口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
version: '3'
services:
nsqlookupd:
image: nsqio/nsq
command: /nsqlookupd
ports:
- "14160:4160"
- "14161:4161"
nsqd:
image: nsqio/nsq
command: /nsqd --lookupd-tcp-address=nsqlookupd:4160 –data-path=/data/nsqdata
depends_on:
- nsqlookupd
ports:
- "14150:4150"
- "14151:4151"
volumes:
- /root/nsqdata:/data/nsqdata
nsqadmin:
image: nsqio/nsq
command: /nsqadmin --lookupd-http-address=nsqlookupd:4161
depends_on:
- nsqlookupd
ports:
- "14171:4171"

我们在重新创建NSQ容器:

1
2
3
4
5
6
7
8
9
10
[root@instance-p0a4erj8 ~]# docker-compose -f nsq.yml up -d
Creating root_nsqlookupd_1 ... done
Creating root_nsqadmin_1 ... done
Creating root_nsqd_1 ... done
[root@instance-p0a4erj8 ~]# docker-compose -f nsq.yml ps
Name Command State Ports
-----------------------------------------------------------------------------------------------------------------------------------------------------
root_nsqadmin_1 /nsqadmin --lookupd-http-a ... Up 4150/tcp, 4151/tcp, 4160/tcp, 4161/tcp, 4170/tcp, 0.0.0.0:14171->4171/tcp
root_nsqd_1 /nsqd --lookupd-tcp-addres ... Up 0.0.0.0:14150->4150/tcp, 0.0.0.0:14151->4151/tcp, 4160/tcp, 4161/tcp, 4170/tcp, 4171/tcp
root_nsqlookupd_1 /nsqlookupd Up 4150/tcp, 4151/tcp, 0.0.0.0:14160->4160/tcp, 0.0.0.0:14161->4161/tcp, 4170/tcp, 4171/tcp

现在我们就可以查看NSQ的状态:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
[root@instance-p0a4erj8 ~]# curl http://127.0.0.1:14151/stats
nsqd v1.2.0 (built w/go1.12.9)
start_time 2020-01-14T08:50:35Z
uptime 1m9.272087092s

Health: OK

Memory:
heap_objects 1863
heap_idle_bytes 65486848
heap_in_use_bytes 1130496
heap_released_bytes 0
gc_pause_usec_100 0
gc_pause_usec_99 0
gc_pause_usec_95 0
next_gc_bytes 4473924
gc_total_runs 0

Topics: None

Producers: None

关于NSQ AUTH方面的知识点,大家可以看看https://nsq.io/components/nsqd.html#auth

代码示例

NSQ官网我们可以看到支持多种语言,这里我还是使用Golang,Producer:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func main()  {
config := nsq.NewConfig()
topicName := "nsq_demo"
msgCount := 10
producer, _ := nsq.NewProducer("127.0.0.1:4150", config)

err := producer.Ping()
if err != nil {
log.Fatal("should not be able to ping after Stop()")
return
}
defer producer.Stop()
for i := 1; i < msgCount; i++ {
fmt.Println(fmt.Sprintf("test nsq message index:%d", i))
err1 := producer.Publish(topicName, []byte(fmt.Sprintf("test nsq message index:%d", i)))
if err1 != nil {
log.Fatal("error",err1)
}
}
}

Comsumer:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
type ComsumerHandler struct {

}

func (h *ComsumerHandler) HandleMessage(message *nsq.Message) error {
if string(message.Body) == "TOBEFAILED" {
return errors.New("fail this message")
}
fmt.Println("receive", message.NSQDAddress, "message:", string(message.Body))
return nil
}

func main() {

waiter := sync.WaitGroup{}
waiter.Add(1)

go func() {
defer waiter.Done()

config := nsq.NewConfig()
laddr := "127.0.0.1"
// so that the test can simulate binding consumer to specified address
config.LocalAddr, _ = net.ResolveTCPAddr("tcp", laddr+":0")
// so that the test can simulate reaching max requeues and a call to LogFailedMessage
config.DefaultRequeueDelay = 0
// so that the test wont timeout from backing off
config.MaxBackoffDuration = time.Millisecond * 50

topicName := "nsq_demo"
if config.Deflate {
topicName = topicName + "_deflate"
} else if config.Snappy {
topicName = topicName + "_snappy"
}
if config.TlsV1 {
topicName = topicName + "_tls"
}
consumer, _ := nsq.NewConsumer(topicName, "ch", config)


handler := &ComsumerHandler{
}
consumer.AddHandler(handler)
err := consumer.ConnectToNSQD("127.0.0.1:4150")
if nil != err {
fmt.Println("err", err)
return
}
select{}

}()
waiter.Wait()
}

,
我主要的目的是实现延迟执行,所以这里需要尝试NSQ延迟消息投递功能,我们稍作修改:
Producer:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
unc main() {
config := nsq.NewConfig()
topicName := "nsq_demo"
msgCount := 10
producer, _ := nsq.NewProducer("127.0.0.1:4150", config)

err := producer.Ping()
if err != nil {
log.Fatal("should not be able to ping after Stop()")
return
}
defer producer.Stop()
for i := 1; i < msgCount; i++ {
fmt.Println(fmt.Sprintf("test nsq message index:%d", i))
//err1 := producer.Publish(topicName, []byte(fmt.Sprintf("test nsq message index:%d", i)))
delay := rand.Intn(10)
//延时消息,delay为延迟多少秒投递。
err1 := producer.DeferredPublish(topicName, time.Second * time.Duration(delay), []byte(fmt.Sprintf("test nsq message index:%d,time:%s,delay:%d", i, time.Now().Format("2020-01-14 15:04:05"), delay)))
if err1 != nil {
log.Fatal("error", err1)
}
}
}

Comsumer可以不用修改,但是我还是加上时间打印:

1
fmt.Println("time",time.Now().Format("2020-01-14 15:04:05"),"receive", message.NSQDAddress, "message:", string(message.Body))

我们再来看看效果:

1
2
3
4
5
6
7
8
9
time 14140-01-119 16:19:44 receive 127.0.0.1:4150 message: test nsq message index:8,time:14140-01-119 16:19:44,delay:0
time 14140-01-119 16:19:45 receive 127.0.0.1:4150 message: test nsq message index:1,time:14140-01-119 16:19:44,delay:1
time 14140-01-119 16:19:45 receive 127.0.0.1:4150 message: test nsq message index:5,time:14140-01-119 16:19:44,delay:1
time 14140-01-119 16:19:49 receive 127.0.0.1:4150 message: test nsq message index:7,time:14140-01-119 16:19:44,delay:5
time 14140-01-119 16:19:50 receive 127.0.0.1:4150 message: test nsq message index:9,time:14140-01-119 16:19:44,delay:6
time 14140-01-119 16:19:51 receive 127.0.0.1:4150 message: test nsq message index:2,time:14140-01-119 16:19:44,delay:7
time 14140-01-119 16:19:51 receive 127.0.0.1:4150 message: test nsq message index:3,time:14140-01-119 16:19:44,delay:7
time 14140-01-119 16:19:52 receive 127.0.0.1:4150 message: test nsq message index:6,time:14140-01-119 16:19:44,delay:8
time 14140-01-119 16:19:53 receive 127.0.0.1:4150 message: test nsq message index:4,time:14140-01-119 16:19:44,delay:9

推荐阅读

How we redesigned the NSQ - NSQ重塑之详细设计

You forgot to set the qrcode for Alipay. Please set it in _config.yml.
You forgot to set the qrcode for Wechat. Please set it in _config.yml.
You forgot to set the business and currency_code for Paypal. Please set it in _config.yml.
You forgot to set the url Patreon. Please set it in _config.yml.
Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×