NSQ的可用性和可靠性

在前面,NSQ实现消息延迟执行中我简单的介绍了NSQ的搭建和使用,在此,我们继续探究一下NSQ的高可用性。毕竟开发的服务都希望7*24小时都能正常使用,虽然不能保证**100%的可用性,但是我们也希望无限趋近于100%**。

NSQ数据持久化

NSQ是一个内存消息队列,默认情况下,消息是存储在内存中不会立刻持久化到磁盘,只有当消息数量达到MemQueueSize的默认阀值10000时会进行持久化。如果想确保所有传入消息都持久化到磁盘,可以设置**–mem-queue-size0**,但是这样必定会影响NSQ的性能。

NSQ的可用性

单节点部署都会存在可用性风险,万一节点挂掉,会导致业务不可用。所以,我们要避免单节点。接下来,我就进行单机部署多节点的演示。

1.启动N(大于等于2)个nsqlookupd节点

首先我们需要保证我们的注册中心不是单节点。这里我在cmd启动两个nsqlookupd节点:

1
2
3
4
5
6
7
8
9
E:\tool\nsq-1.2.0\bin>nsqlookupd -http-address=":4161" -tcp-address=":4160"
[nsqlookupd] 2020/01/16 09:38:09.605212 INFO: nsqlookupd v1.2.0 (built w/go1.12.9)
[nsqlookupd] 2020/01/16 09:38:09.660066 INFO: HTTP: listening on [::]:4161
[nsqlookupd] 2020/01/16 09:38:09.660066 INFO: TCP: listening on [::]:4160

E:\tool\nsq-1.2.0\bin>nsqlookupd -http-address=":4171" -tcp-address=":4170"
[nsqlookupd] 2020/01/16 09:38:51.065321 INFO: nsqlookupd v1.2.0 (built w/go1.12.9)
[nsqlookupd] 2020/01/16 09:38:51.102226 INFO: HTTP: listening on [::]:4171
[nsqlookupd] 2020/01/16 09:38:51.102226 INFO: TCP: listening on [::]:4170

2.启动N(大于等于2)个nsqd节点

这里我在cmd启动三个nsqd节点:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
E:\tool\nsq-1.2.0\bin>nsqd --lookupd-tcp-address="0.0.0.0:4160" --lookupd-tcp-address="0.0.0.0:4170" -data-path="D:/nsqdata1" --mem-queue-size=0 -tcp-address=":4150" -http-address=":4151"
[nsqd] 2020/01/16 09:55:25.104651 INFO: nsqd v1.2.0 (built w/go1.12.9)
[nsqd] 2020/01/16 09:55:25.138560 INFO: ID: 632
[nsqd] 2020/01/16 09:55:25.183440 INFO: TOPIC(nsq_producer): created
[nsqd] 2020/01/16 09:55:25.183440 INFO: TOPIC(nsq_demo): created
[nsqd] 2020/01/16 09:55:25.185434 INFO: TOPIC(nsq_demo): new channel(ch)
[nsqd] 2020/01/16 09:55:25.185434 INFO: TOPIC(delay): created
[nsqd] 2020/01/16 09:55:25.186431 INFO: TOPIC(delay): new channel(ch)
[nsqd] 2020/01/16 09:55:25.186431 INFO: NSQ: persisting topic/channel metadata to D:/nsqdata1/nsqd.dat
[nsqd] 2020/01/16 09:55:25.260526 INFO: TCP: listening on [::]:4150
[nsqd] 2020/01/16 09:55:25.260526 INFO: HTTP: listening on [::]:4151
[nsqd] 2020/01/16 09:55:25.260526 INFO: LOOKUP(0.0.0.0:4160): adding peer
[nsqd] 2020/01/16 09:55:25.262521 INFO: LOOKUP connecting to 0.0.0.0:4160
[nsqd] 2020/01/16 09:55:25.269502 INFO: LOOKUPD(0.0.0.0:4160): peer info {TCPPort:4160 HTTPPort:4161 Version:1.2.0 BroadcastAddress:2CNU7X5OLAUE004}
[nsqd] 2020/01/16 09:55:25.269502 INFO: LOOKUPD(0.0.0.0:4160): REGISTER nsq_producer
[nsqd] 2020/01/16 09:55:25.270500 INFO: LOOKUPD(0.0.0.0:4160): REGISTER nsq_demo ch
[nsqd] 2020/01/16 09:55:25.273546 INFO: LOOKUPD(0.0.0.0:4160): REGISTER delay ch
[nsqd] 2020/01/16 09:55:25.278485 INFO: LOOKUP(0.0.0.0:4170): adding peer
[nsqd] 2020/01/16 09:55:25.278485 INFO: LOOKUP connecting to 0.0.0.0:4170
[nsqd] 2020/01/16 09:55:25.284462 INFO: LOOKUPD(0.0.0.0:4170): peer info {TCPPort:4170 HTTPPort:4171 Version:1.2.0 BroadcastAddress:2CNU7X5OLAUE004}
[nsqd] 2020/01/16 09:55:25.284462 INFO: LOOKUPD(0.0.0.0:4170): REGISTER nsq_demo ch
[nsqd] 2020/01/16 09:55:25.287454 INFO: LOOKUPD(0.0.0.0:4170): REGISTER delay ch
[nsqd] 2020/01/16 09:55:25.291443 INFO: LOOKUPD(0.0.0.0:4170): REGISTER nsq_producer
[nsqd] 2020/01/16 09:55:25.292444 INFO: LOOKUPD(0.0.0.0:4160): topic REGISTER nsq_producer
[nsqd] 2020/01/16 09:55:25.294437 INFO: LOOKUPD(0.0.0.0:4170): topic REGISTER nsq_producer
[nsqd] 2020/01/16 09:55:25.296430 INFO: LOOKUPD(0.0.0.0:4160): topic REGISTER nsq_demo
[nsqd] 2020/01/16 09:55:25.296430 INFO: LOOKUPD(0.0.0.0:4170): topic REGISTER nsq_demo
[nsqd] 2020/01/16 09:55:25.297427 INFO: LOOKUPD(0.0.0.0:4160): channel REGISTER nsq_demo ch
[nsqd] 2020/01/16 09:55:25.297427 INFO: LOOKUPD(0.0.0.0:4170): channel REGISTER nsq_demo ch
[nsqd] 2020/01/16 09:55:25.297427 INFO: LOOKUPD(0.0.0.0:4160): topic REGISTER delay
[nsqd] 2020/01/16 09:55:25.298425 INFO: LOOKUPD(0.0.0.0:4170): topic REGISTER delay
[nsqd] 2020/01/16 09:55:25.298425 INFO: LOOKUPD(0.0.0.0:4160): channel REGISTER delay ch
[nsqd] 2020/01/16 09:55:25.299423 INFO: LOOKUPD(0.0.0.0:4170): channel REGISTER delay ch

其他NSQD节点:

1
2
nsqd --lookupd-tcp-address="0.0.0.0:4160" --lookupd-tcp-address="0.0.0.0:4170" -data-path="D:/nsqdata1" --mem-queue-size=0 -tcp-address=":4140" -http-address=":4141"
nsqd --lookupd-tcp-address="0.0.0.0:4160" --lookupd-tcp-address="0.0.0.0:4170" -data-path="D:/nsqdata1" --mem-queue-size=0 -tcp-address=":4130" -http-address=":4131"

3.启动nsqadmin节点

nsqadmin只是监控,这里我暂时只部署一个节点:

1
2
3
E:\tool\nsq-1.2.0\bin>nsqadmin --lookupd-http-address=0.0.0.0:4161 --lookupd-http-address=0.0.0.0:4171 -http-address=":4121"
[nsqadmin] 2020/01/16 10:00:57.383466 INFO: nsqadmin v1.2.0 (built w/go1.12.9)
[nsqadmin] 2020/01/16 10:00:57.420368 INFO: HTTP: listening on [::]:4121

最终效果:

代码示例

在之前,我们是直接连接的NSQD

这样做的话很多工作就需要客户端来处理。所以官方推荐我们连接NSQD的注册中心nsqlookupd。所以我们Comsumer的代码需要调整一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
package main

import (
"errors"
"fmt"
"github.com/nsqio/go-nsq"
"sync"
"time"
)

type ComsumerHandler struct {

}

func (h *ComsumerHandler) HandleMessage(message *nsq.Message) error {
if string(message.Body) == "TOBEFAILED" {
return errors.New("fail this message")
}
fmt.Println("time",time.Now().Format("2020-01-14 15:04:05"),"receive", message.NSQDAddress, "message:", string(message.Body))
return nil
}

func main() {

waiter := sync.WaitGroup{}
waiter.Add(1)

go func() {
defer waiter.Done()

config := nsq.NewConfig()
topicName := "nsq_demo"
adds := []string{"127.0.0.1:4161", "127.0.0.1:4171"}
config.MaxInFlight = 1000
config.MaxBackoffDuration = 5 * time.Second
config.DialTimeout = 10 * time.Second
consumer, _ := nsq.NewConsumer(topicName, "ch", config)


handler := &ComsumerHandler{
}
consumer.AddHandler(handler)

err := consumer.ConnectToNSQLookupds(adds)
if nil != err {
fmt.Println("err", err)
return
}
select{}

}()
waiter.Wait()
}

启动后会看到连接日志:

1
2
3
4
2020/01/16 10:39:34 INF    1 [nsq_demo/ch] querying nsqlookupd http://127.0.0.1:4161/lookup?topic=nsq_demo
2020/01/16 10:39:34 INF 1 [nsq_demo/ch] (2CNU7X5OLAUE004:4130) connecting to nsqd
2020/01/16 10:39:34 INF 1 [nsq_demo/ch] (2CNU7X5OLAUE004:4150) connecting to nsqd
2020/01/16 10:39:35 INF 1 [nsq_demo/ch] (2CNU7X5OLAUE004:4140) connecting to nsqd

小结

由于NSQproducer是直接连接NSQD,如果直接连接单节点NSQD,那么对于producer来说会存在单点问题,据我所知NSQ不支持副本机制,所以对于topic来说也存在单点问题,一般解决单点问题的方式就是冗余,那么要解决这些问题,简单点的方式可能就是自己在producer来维护一个NSQD的服务器列表信息,自己检测其健康状态,为了避免topic的单点问题的话,我们可以往多个NSQD发送消息。由于NSQ支持Comsumer连接nsqlookupd来避免单点问题,如果在使用中存在消息发往多个NSQD,那么Comsumer要确保幂等消费。

推荐阅读

我们也知道了NSQ的一些弊端,有些公司也有了解决方案,我们可以去借鉴他们的处理方式。
How we redesign the NSQ-NSQ重塑之客户端
How we redesigned the NSQ - NSQ重塑之详细设计
https://nsq.io/deployment/topology_patterns.html

文中如有理解错误之处还请指正。

作者

eyiadmin

发布于

2020-01-16

更新于

2024-05-31

许可协议

评论