最近我给自己做了一个记事微信小程序,主要是怕自己把事情给忘了,虽然现在市面上有很多成熟的应用,但是作为程序猿,而且最近工作也不是那么忙,就想着自己折腾一个。既然是备忘,当然得有一个消息提醒,这就需要涉及到延迟执行。现在针对延迟执行也有许多方案,比如:定时扫描、消息队列等。定时扫描在时间上有延迟而且扫描频率影响着数据库的性能,另外还有定时轮(TimingWheel)算法,不过这里我还是选择了消息队列的方式。
支持延迟消息投递的消息队列也比较多,如:rabbitmq、rocketmq、kafka等都可以实现延迟消息。不过 NSQ
具有分布式、无单点故障、故障容错、高可用性以及高可靠等特征,已有一些公司用于生产。毕竟我只有2台1核2G的服务器,所以我还是更加青睐于它。
NSQ部署 在部署NSQ
之前,我们首先得了解它的三个组件nsqd
、nsqlookupd
、nsqadmin
:
Windows部署NSQ 我们这里先实践一下Windows
的安装方式,首先需要去NSQ 下载Windows版本并解压.
首先启动nsqlookupd
:
1 2 3 4 E:\tool\nsq-1.2.0\bin>nsqlookupd [nsqlookupd] 2020/01/14 09:26:37.816513 INFO: nsqlookupd v1.2.0 (built w/go1.12.9) [nsqlookupd] 2020/01/14 09:26:37.882337 INFO: HTTP: listening on [::]:4161 [nsqlookupd] 2020/01/14 09:26:37.882337 INFO: TCP: listening on [::]:4160
默认端口为http:4161
、tcp:4160
,可以通过分别指定-http-address
和-tcp-address
两个参数来修改。
启动nsqd
实例:
1 2 3 4 5 6 7 8 9 E:\tool\nsq-1.2.0\bin>nsqd --lookupd-tcp-address=0.0.0.0:4160 -data-path="D:/nsqdata" --broadcast-address=0.0.0.0 [nsqd] 2020/01/14 09:38:00.365419 INFO: nsqd v1.2.0 (built w/go1.12.9) [nsqd] 2020/01/14 09:38:00.403363 INFO: ID: 632 [nsqd] 2020/01/14 09:38:00.408304 INFO: NSQ: persisting topic/channel metadata to D:/nsqdata/nsqd.dat [nsqd] 2020/01/14 09:38:00.453186 INFO: TCP: listening on [::]:4150 [nsqd] 2020/01/14 09:38:00.453186 INFO: HTTP: listening on [::]:4151 [nsqd] 2020/01/14 09:38:00.453186 INFO: LOOKUP(0.0.0.0:4160): adding peer [nsqd] 2020/01/14 09:38:00.500058 INFO: LOOKUP connecting to 0.0.0.0:4160 [nsqd] 2020/01/14 09:38:00.523995 INFO: LOOKUPD(0.0.0.0:4160): peer info {TCPPort:4160 HTTPPort:4161 Version:1.2.0 BroadcastAddress:2CNU7X5OLAUE004}
-lookupd-tcp-address:nsqlookupd的IP和tcp端口
-broadcast-address:会注册到nsqlookupd,填写自己主机IP
-data-path:消息持久化的存储路径
NSQ支持延时消息的默认最长时间为3600000(60分钟),可以通过-max-req-timeout
参数修改
最后启动我们的nsqadmin
实例:1 2 3 E:\tool\nsq-1.2.0\bin>nsqadmin --lookupd-http-address=0.0.0.0:4161 [nsqadmin] 2020/01/14 09:38:34.709978 INFO: nsqadmin v1.2.0 (built w/go1.12.9) [nsqadmin] 2020/01/14 09:38:34.745941 INFO: HTTP: listening on [::]:4171
-lookupd-http-address:nsqlookupd的IP和http端口
上面我们演示的均是单个实例,仅适合于开发环境,以后有机会我们再来探究如何部署多实例搭建高可用NSQ 。
Docker部署NSQ NSQ 支持跨平台,如果是部署在Linux的话,建议还是使用Docker部署,在NSQ官网有详细的Docker安装 说明,这里我们参照官网实践一下。编辑我们的docker-compose
文件,这里我命名为nsq.yml
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 version: '3' services: nsqlookupd: image: nsqio/nsq command: /nsqlookupd ports: - "4160" - "4161" nsqd: image: nsqio/nsq command: /nsqd --lookupd-tcp-address=nsqlookupd:4160 –data-path=/data/nsqdata depends_on: - nsqlookupd ports: - "4150" - "4151" volumes: - /root/nsqdata:/data/nsqdata nsqadmin: image: nsqio/nsq command: /nsqadmin --lookupd-http-address=nsqlookupd:4161 depends_on: - nsqlookupd ports: - "4171"
我们稍作修改,将NSQD
数据目录挂载到物理机。创建好yml配置文件后,就运行命令创建我们的容器:
1 2 3 4 5 6 7 8 9 10 11 12 13 [root@instance-p0a4erj8 ~]# docker-compose -f nsq.yml up -d Creating network "root_default" with the default driver Pulling nsqlookupd (nsqio/nsq:)... latest: Pulling from nsqio/nsq 5d20c808ce19: Pull complete e43ee7addbdb: Pull complete cbc99497dda7: Pull complete Digest: sha256:78b986254986c4ae1237b32219a83c5a23354a6c30c18817597f776a4edcac41 Status: Downloaded newer image for nsqio/nsq:latest Creating root_nsqlookupd_1 ... done Creating root_nsqd_1 ... done Creating root_nsqadmin_1 ... done
执行docker-compose ps
查看我们的容器:
1 2 3 4 5 6 7 [root@instance-p0a4erj8 ~]# docker-compose -f nsq.yml ps Name Command State Ports ----------------------------------------------------------------------------------------------------------------------------------------------------- root_nsqadmin_1 /nsqadmin --lookupd-http-a ... Up 4150/tcp, 4151/tcp, 4160/tcp, 4161/tcp, 4170/tcp, 0.0.0.0:32772->4171/tcp root_nsqd_1 /nsqd --lookupd-tcp-addres ... Up 0.0.0.0:32771->4150/tcp, 0.0.0.0:32770->4151/tcp, 4160/tcp, 4161/tcp, 4170/tcp, 4171/tcp root_nsqlookupd_1 /nsqlookupd Up 4150/tcp, 4151/tcp, 0.0.0.0:32769->4160/tcp, 0.0.0.0:32768->4161/tcp, 4170/tcp, 4171/tcp
如果需要删除,需要先stop
,在rm
:
1 2 3 4 5 6 7 8 9 10 11 12 [root@instance-p0a4erj8 ~]# docker-compose -f nsq.yml stop Stopping root_nsqadmin_1 ... done Stopping root_nsqd_1 ... done Stopping root_nsqlookupd_1 ... done [root@instance-p0a4erj8 ~]# docker-compose -f nsq.yml rm Going to remove root_nsqadmin_1, root_nsqd_1, root_nsqlookupd_1 Are you sure? [yN] y Removing root_nsqadmin_1 ... done Removing root_nsqd_1 ... done Removing root_nsqlookupd_1 ... done
刚才看到,如果不指定端口,容器启动时随机分配绑定的主机端口号,这样就会比较乱,所以要指定端口:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 version: '3' services: nsqlookupd: image: nsqio/nsq command: /nsqlookupd ports: - "14160:4160" - "14161:4161" nsqd: image: nsqio/nsq command: /nsqd --lookupd-tcp-address=nsqlookupd:4160 –data-path=/data/nsqdata depends_on: - nsqlookupd ports: - "14150:4150" - "14151:4151" volumes: - /root/nsqdata:/data/nsqdata nsqadmin: image: nsqio/nsq command: /nsqadmin --lookupd-http-address=nsqlookupd:4161 depends_on: - nsqlookupd ports: - "14171:4171"
我们在重新创建NSQ容器:
1 2 3 4 5 6 7 8 9 10 11 [root@instance-p0a4erj8 ~]# docker-compose -f nsq.yml up -d Creating root_nsqlookupd_1 ... done Creating root_nsqadmin_1 ... done Creating root_nsqd_1 ... done [root@instance-p0a4erj8 ~]# docker-compose -f nsq.yml ps Name Command State Ports ----------------------------------------------------------------------------------------------------------------------------------------------------- root_nsqadmin_1 /nsqadmin --lookupd-http-a ... Up 4150/tcp, 4151/tcp, 4160/tcp, 4161/tcp, 4170/tcp, 0.0.0.0:14171->4171/tcp root_nsqd_1 /nsqd --lookupd-tcp-addres ... Up 0.0.0.0:14150->4150/tcp, 0.0.0.0:14151->4151/tcp, 4160/tcp, 4161/tcp, 4170/tcp, 4171/tcp root_nsqlookupd_1 /nsqlookupd Up 4150/tcp, 4151/tcp, 0.0.0.0:14160->4160/tcp, 0.0.0.0:14161->4161/tcp, 4170/tcp, 4171/tcp
现在我们就可以查看NSQ的状态:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 [root@instance-p0a4erj8 ~]# curl http://127.0.0.1:14151/stats nsqd v1.2.0 (built w/go1.12.9) start_time 2020-01-14T08:50:35Z uptime 1m9.272087092s Health: OK Memory: heap_objects 1863 heap_idle_bytes 65486848 heap_in_use_bytes 1130496 heap_released_bytes 0 gc_pause_usec_100 0 gc_pause_usec_99 0 gc_pause_usec_95 0 next_gc_bytes 4473924 gc_total_runs 0 Topics: None Producers: None
关于NSQ AUTH方面的知识点,大家可以看看https://nsq.io/components/nsqd.html#auth 。
代码示例 在NSQ官网 我们可以看到支持多种语言,这里我还是使用Golang
,Producer:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 func main() { config := nsq.NewConfig() topicName := "nsq_demo" msgCount := 10 producer, _ := nsq.NewProducer("127.0.0.1:4150", config) err := producer.Ping() if err != nil { log.Fatal("should not be able to ping after Stop()") return } defer producer.Stop() for i := 1; i < msgCount; i++ { fmt.Println(fmt.Sprintf("test nsq message index:%d", i)) err1 := producer.Publish(topicName, []byte(fmt.Sprintf("test nsq message index:%d", i))) if err1 != nil { log.Fatal("error",err1) } } }
Comsumer:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 type ComsumerHandler struct { } func (h *ComsumerHandler) HandleMessage(message *nsq.Message) error { if string(message.Body) == "TOBEFAILED" { return errors.New("fail this message") } fmt.Println("receive", message.NSQDAddress, "message:", string(message.Body)) return nil } func main() { waiter := sync.WaitGroup{} waiter.Add(1) go func() { defer waiter.Done() config := nsq.NewConfig() laddr := "127.0.0.1" // so that the test can simulate binding consumer to specified address config.LocalAddr, _ = net.ResolveTCPAddr("tcp", laddr+":0") // so that the test can simulate reaching max requeues and a call to LogFailedMessage config.DefaultRequeueDelay = 0 // so that the test wont timeout from backing off config.MaxBackoffDuration = time.Millisecond * 50 topicName := "nsq_demo" if config.Deflate { topicName = topicName + "_deflate" } else if config.Snappy { topicName = topicName + "_snappy" } if config.TlsV1 { topicName = topicName + "_tls" } consumer, _ := nsq.NewConsumer(topicName, "ch", config) handler := &ComsumerHandler{ } consumer.AddHandler(handler) err := consumer.ConnectToNSQD("127.0.0.1:4150") if nil != err { fmt.Println("err", err) return } select{} }() waiter.Wait() }
, 我主要的目的是实现延迟执行,所以这里需要尝试NSQ延迟消息投递功能,我们稍作修改: Producer:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 unc main() { config := nsq.NewConfig() topicName := "nsq_demo" msgCount := 10 producer, _ := nsq.NewProducer("127.0.0.1:4150", config) err := producer.Ping() if err != nil { log.Fatal("should not be able to ping after Stop()") return } defer producer.Stop() for i := 1; i < msgCount; i++ { fmt.Println(fmt.Sprintf("test nsq message index:%d", i)) //err1 := producer.Publish(topicName, []byte(fmt.Sprintf("test nsq message index:%d", i))) delay := rand.Intn(10) //延时消息,delay为延迟多少秒投递。 err1 := producer.DeferredPublish(topicName, time.Second * time.Duration(delay), []byte(fmt.Sprintf("test nsq message index:%d,time:%s,delay:%d", i, time.Now().Format("2020-01-14 15:04:05"), delay))) if err1 != nil { log.Fatal("error", err1) } } }
Comsumer可以不用修改,但是我还是加上时间打印:
1 fmt.Println("time",time.Now().Format("2020-01-14 15:04:05"),"receive", message.NSQDAddress, "message:", string(message.Body))
我们再来看看效果:
1 2 3 4 5 6 7 8 9 time 14140-01-119 16:19:44 receive 127.0.0.1:4150 message: test nsq message index:8,time:14140-01-119 16:19:44,delay:0 time 14140-01-119 16:19:45 receive 127.0.0.1:4150 message: test nsq message index:1,time:14140-01-119 16:19:44,delay:1 time 14140-01-119 16:19:45 receive 127.0.0.1:4150 message: test nsq message index:5,time:14140-01-119 16:19:44,delay:1 time 14140-01-119 16:19:49 receive 127.0.0.1:4150 message: test nsq message index:7,time:14140-01-119 16:19:44,delay:5 time 14140-01-119 16:19:50 receive 127.0.0.1:4150 message: test nsq message index:9,time:14140-01-119 16:19:44,delay:6 time 14140-01-119 16:19:51 receive 127.0.0.1:4150 message: test nsq message index:2,time:14140-01-119 16:19:44,delay:7 time 14140-01-119 16:19:51 receive 127.0.0.1:4150 message: test nsq message index:3,time:14140-01-119 16:19:44,delay:7 time 14140-01-119 16:19:52 receive 127.0.0.1:4150 message: test nsq message index:6,time:14140-01-119 16:19:44,delay:8 time 14140-01-119 16:19:53 receive 127.0.0.1:4150 message: test nsq message index:4,time:14140-01-119 16:19:44,delay:9