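In the earlier post, "Implementing delayed message execution with NSQ", I briefly covered how to set up and use NSQ. Here we go on to explore NSQ's high availability. After all, we want the services we build to be available 24/7: we can't guarantee 100% availability, but we want to get as close to it as possible.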
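NSQ data persistence
NSQ is an in-memory message queue. By default, messages are kept in memory and are not written to disk right away; only when the in-memory queue already holds --mem-queue-size messages (default 10000, counted per topic and per channel) do further messages overflow to disk. Messages held only in memory are lost if the nsqd process crashes. To make sure every incoming message is persisted to disk, set --mem-queue-size to 0, but this will inevitably cost NSQ some performance.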
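The spill-over behavior can be pictured with a small model. This is a simplified sketch, not nsqd's source: it assumes each topic/channel keeps a buffered Go channel of capacity memQueueSize and writes to a disk queue whenever a non-blocking send into that channel fails. The `memQueue` type and its `diskBacklog` slice (standing in for the on-disk queue) are hypothetical.

```go
package main

import "fmt"

// memQueue is a hypothetical, simplified model of one nsqd topic/channel:
// messages go into an in-memory buffer of size memQueueSize and overflow to
// "disk" (here just a slice) once that buffer is full.
type memQueue struct {
	memory      chan []byte // in-memory buffer, capacity = memQueueSize
	diskBacklog [][]byte    // stand-in for the on-disk queue
}

func newMemQueue(memQueueSize int) *memQueue {
	return &memQueue{memory: make(chan []byte, memQueueSize)}
}

// put tries a non-blocking send into the memory buffer first; if the buffer
// is full (or has capacity 0 and nobody is receiving), the message is
// "persisted" to the disk backlog instead.
func (q *memQueue) put(msg []byte) {
	select {
	case q.memory <- msg:
	default:
		q.diskBacklog = append(q.diskBacklog, msg)
	}
}

func main() {
	for _, size := range []int{2, 0} {
		q := newMemQueue(size)
		for i := 0; i < 5; i++ {
			q.put([]byte(fmt.Sprintf("msg-%d", i)))
		}
		fmt.Printf("memQueueSize=%d: in memory=%d, on disk=%d\n",
			size, len(q.memory), len(q.diskBacklog))
	}
}
```

In this model, memQueueSize=2 keeps two of the five messages in memory and spills three; with 0 and no reader waiting, every message lands on the disk side, which is the trade-off --mem-queue-size=0 makes.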
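NSQ availability
Any single-node deployment carries an availability risk: if that node goes down, the business goes down with it. So we should avoid single nodes. Next I'll demonstrate running multiple nodes on a single machine (for the demo only; in production each node would run on its own machine).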
1. Start N (N ≥ 2) nsqlookupd nodes
First we need to make sure the registry itself is not a single node. Here I start two nsqlookupd nodes from cmd:
```
E:\tool\nsq-1.2.0\bin>nsqlookupd -http-address=":4161" -tcp-address=":4160"
[nsqlookupd] 2020/01/16 09:38:09.605212 INFO: nsqlookupd v1.2.0 (built w/go1.12.9)
[nsqlookupd] 2020/01/16 09:38:09.660066 INFO: HTTP: listening on [::]:4161
[nsqlookupd] 2020/01/16 09:38:09.660066 INFO: TCP: listening on [::]:4160

E:\tool\nsq-1.2.0\bin>nsqlookupd -http-address=":4171" -tcp-address=":4170"
[nsqlookupd] 2020/01/16 09:38:51.065321 INFO: nsqlookupd v1.2.0 (built w/go1.12.9)
[nsqlookupd] 2020/01/16 09:38:51.102226 INFO: HTTP: listening on [::]:4171
[nsqlookupd] 2020/01/16 09:38:51.102226 INFO: TCP: listening on [::]:4170
```
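To confirm that both registry nodes are actually up, each nsqlookupd can be probed over HTTP: its /ping monitoring endpoint answers 200 with the body "OK". The sketch below assumes the two HTTP addresses started above; the function names (`checkPingReply`, `pingLookupd`, `healthyLookupds`) are hypothetical helpers, not part of NSQ.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"strings"
	"time"
)

// checkPingReply validates a reply from nsqlookupd's /ping endpoint,
// which answers HTTP 200 with the body "OK" while the node is up.
func checkPingReply(status int, body string) error {
	if status != http.StatusOK || strings.TrimSpace(body) != "OK" {
		return fmt.Errorf("unexpected /ping reply: %d %q", status, body)
	}
	return nil
}

// pingLookupd sends GET <baseURL>/ping to one nsqlookupd HTTP address.
func pingLookupd(client *http.Client, baseURL string) error {
	resp, err := client.Get(baseURL + "/ping")
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return err
	}
	return checkPingReply(resp.StatusCode, string(body))
}

// healthyLookupds returns the base URLs whose /ping succeeded.
func healthyLookupds(baseURLs []string) []string {
	client := &http.Client{Timeout: 2 * time.Second}
	var healthy []string
	for _, u := range baseURLs {
		if err := pingLookupd(client, u); err != nil {
			fmt.Println("unhealthy:", u, err)
			continue
		}
		healthy = append(healthy, u)
	}
	return healthy
}

func main() {
	// HTTP addresses of the two nsqlookupd nodes started above.
	addrs := []string{"http://127.0.0.1:4161", "http://127.0.0.1:4171"}
	fmt.Println("healthy nsqlookupd nodes:", healthyLookupds(addrs))
}
```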
2. Start N (N ≥ 2) nsqd nodes
Here I start three nsqd nodes from cmd. The first one and its output:
```
E:\tool\nsq-1.2.0\bin>nsqd --lookupd-tcp-address="0.0.0.0:4160" --lookupd-tcp-address="0.0.0.0:4170" -data-path="D:/nsqdata1" --mem-queue-size=0 -tcp-address=":4150" -http-address=":4151"
[nsqd] 2020/01/16 09:55:25.104651 INFO: nsqd v1.2.0 (built w/go1.12.9)
[nsqd] 2020/01/16 09:55:25.138560 INFO: ID: 632
[nsqd] 2020/01/16 09:55:25.183440 INFO: TOPIC(nsq_producer): created
[nsqd] 2020/01/16 09:55:25.183440 INFO: TOPIC(nsq_demo): created
[nsqd] 2020/01/16 09:55:25.185434 INFO: TOPIC(nsq_demo): new channel(ch)
[nsqd] 2020/01/16 09:55:25.185434 INFO: TOPIC(delay): created
[nsqd] 2020/01/16 09:55:25.186431 INFO: TOPIC(delay): new channel(ch)
[nsqd] 2020/01/16 09:55:25.186431 INFO: NSQ: persisting topic/channel metadata to D:/nsqdata1/nsqd.dat
[nsqd] 2020/01/16 09:55:25.260526 INFO: TCP: listening on [::]:4150
[nsqd] 2020/01/16 09:55:25.260526 INFO: HTTP: listening on [::]:4151
[nsqd] 2020/01/16 09:55:25.260526 INFO: LOOKUP(0.0.0.0:4160): adding peer
[nsqd] 2020/01/16 09:55:25.262521 INFO: LOOKUP connecting to 0.0.0.0:4160
[nsqd] 2020/01/16 09:55:25.269502 INFO: LOOKUPD(0.0.0.0:4160): peer info {TCPPort:4160 HTTPPort:4161 Version:1.2.0 BroadcastAddress:2CNU7X5OLAUE004}
[nsqd] 2020/01/16 09:55:25.269502 INFO: LOOKUPD(0.0.0.0:4160): REGISTER nsq_producer
[nsqd] 2020/01/16 09:55:25.270500 INFO: LOOKUPD(0.0.0.0:4160): REGISTER nsq_demo ch
[nsqd] 2020/01/16 09:55:25.273546 INFO: LOOKUPD(0.0.0.0:4160): REGISTER delay ch
[nsqd] 2020/01/16 09:55:25.278485 INFO: LOOKUP(0.0.0.0:4170): adding peer
[nsqd] 2020/01/16 09:55:25.278485 INFO: LOOKUP connecting to 0.0.0.0:4170
[nsqd] 2020/01/16 09:55:25.284462 INFO: LOOKUPD(0.0.0.0:4170): peer info {TCPPort:4170 HTTPPort:4171 Version:1.2.0 BroadcastAddress:2CNU7X5OLAUE004}
[nsqd] 2020/01/16 09:55:25.284462 INFO: LOOKUPD(0.0.0.0:4170): REGISTER nsq_demo ch
[nsqd] 2020/01/16 09:55:25.287454 INFO: LOOKUPD(0.0.0.0:4170): REGISTER delay ch
[nsqd] 2020/01/16 09:55:25.291443 INFO: LOOKUPD(0.0.0.0:4170): REGISTER nsq_producer
[nsqd] 2020/01/16 09:55:25.292444 INFO: LOOKUPD(0.0.0.0:4160): topic REGISTER nsq_producer
[nsqd] 2020/01/16 09:55:25.294437 INFO: LOOKUPD(0.0.0.0:4170): topic REGISTER nsq_producer
[nsqd] 2020/01/16 09:55:25.296430 INFO: LOOKUPD(0.0.0.0:4160): topic REGISTER nsq_demo
[nsqd] 2020/01/16 09:55:25.296430 INFO: LOOKUPD(0.0.0.0:4170): topic REGISTER nsq_demo
[nsqd] 2020/01/16 09:55:25.297427 INFO: LOOKUPD(0.0.0.0:4160): channel REGISTER nsq_demo ch
[nsqd] 2020/01/16 09:55:25.297427 INFO: LOOKUPD(0.0.0.0:4170): channel REGISTER nsq_demo ch
[nsqd] 2020/01/16 09:55:25.297427 INFO: LOOKUPD(0.0.0.0:4160): topic REGISTER delay
[nsqd] 2020/01/16 09:55:25.298425 INFO: LOOKUPD(0.0.0.0:4170): topic REGISTER delay
[nsqd] 2020/01/16 09:55:25.298425 INFO: LOOKUPD(0.0.0.0:4160): channel REGISTER delay ch
[nsqd] 2020/01/16 09:55:25.299423 INFO: LOOKUPD(0.0.0.0:4170): channel REGISTER delay ch
```
The other nsqd nodes:
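```
nsqd --lookupd-tcp-address="0.0.0.0:4160" --lookupd-tcp-address="0.0.0.0:4170" -data-path="D:/nsqdata2" --mem-queue-size=0 -tcp-address=":4140" -http-address=":4141"
nsqd --lookupd-tcp-address="0.0.0.0:4160" --lookupd-tcp-address="0.0.0.0:4170" -data-path="D:/nsqdata3" --mem-queue-size=0 -tcp-address=":4130" -http-address=":4131"
```

Each nsqd instance needs its own -data-path: if several instances share D:/nsqdata1, they overwrite each other's metadata file (nsqd.dat) and on-disk queue files. An nsqd started on a fresh data directory only registers nsq_demo with nsqlookupd once the topic exists on that node, for example after a message has been published to it.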
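To check that every nsqd has registered with both registry nodes, nsqlookupd's /nodes endpoint lists the nsqd instances it knows about. The sketch below decodes a few fields of that JSON reply; the field names (producers, broadcast_address, tcp_port, http_port, topics) are my understanding of the v1.x HTTP API and should be checked against your version, and the helper names are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"time"
)

// nsqdNode keeps a few of the fields nsqlookupd's /nodes endpoint reports
// for each registered nsqd (field names assumed from the v1.x HTTP API).
type nsqdNode struct {
	BroadcastAddress string   `json:"broadcast_address"`
	TCPPort          int      `json:"tcp_port"`
	HTTPPort         int      `json:"http_port"`
	Topics           []string `json:"topics"`
}

// parseNodes decodes the JSON body returned by GET /nodes.
func parseNodes(body []byte) ([]nsqdNode, error) {
	var resp struct {
		Producers []nsqdNode `json:"producers"`
	}
	if err := json.Unmarshal(body, &resp); err != nil {
		return nil, err
	}
	return resp.Producers, nil
}

// fetchNodes asks one nsqlookupd (HTTP base URL) which nsqd nodes it knows.
func fetchNodes(baseURL string) ([]nsqdNode, error) {
	client := &http.Client{Timeout: 2 * time.Second}
	resp, err := client.Get(baseURL + "/nodes")
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("GET /nodes: status %d", resp.StatusCode)
	}
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, err
	}
	return parseNodes(body)
}

func main() {
	// Both nsqlookupd nodes should report all three nsqd (TCP 4150, 4140, 4130).
	for _, base := range []string{"http://127.0.0.1:4161", "http://127.0.0.1:4171"} {
		nodes, err := fetchNodes(base)
		if err != nil {
			fmt.Println(base, "error:", err)
			continue
		}
		for _, n := range nodes {
			fmt.Printf("%s sees nsqd %s:%d topics=%v\n", base, n.BroadcastAddress, n.TCPPort, n.Topics)
		}
	}
}
```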
3. Start the nsqadmin node
nsqadmin is only a monitoring UI, so for now I deploy just one instance:
```
E:\tool\nsq-1.2.0\bin>nsqadmin --lookupd-http-address=0.0.0.0:4161 --lookupd-http-address=0.0.0.0:4171 -http-address=":4121"
[nsqadmin] 2020/01/16 10:00:57.383466 INFO: nsqadmin v1.2.0 (built w/go1.12.9)
[nsqadmin] 2020/01/16 10:00:57.420368 INFO: HTTP: listening on [::]:4121
```
[Figure: final result of the deployment]
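Code example
Previously we connected directly to nsqd, which leaves a lot of work to the client: it has to know every nsqd address itself and cannot discover nodes added later. The official recommendation is therefore to connect through nsqlookupd, the registry that nsqd nodes register with, so our consumer code needs a small change: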
```go
package main

import (
	"errors"
	"fmt"
	"log"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/nsqio/go-nsq"
)

// ConsumerHandler processes messages from the nsq_demo topic.
type ConsumerHandler struct{}

// HandleMessage is called by go-nsq for every message. Returning a non-nil
// error makes go-nsq requeue the message instead of finishing it.
func (h *ConsumerHandler) HandleMessage(message *nsq.Message) error {
	if string(message.Body) == "TOBEFAILED" {
		return errors.New("fail this message")
	}
	// Go time layouts are written with the reference time 2006-01-02 15:04:05.
	fmt.Println("time", time.Now().Format("2006-01-02 15:04:05"),
		"receive", message.NSQDAddress, "message:", string(message.Body))
	return nil
}

func main() {
	topicName := "nsq_demo"
	// HTTP addresses of the two nsqlookupd nodes started above.
	lookupdAddrs := []string{"127.0.0.1:4161", "127.0.0.1:4171"}

	config := nsq.NewConfig()
	config.MaxInFlight = 1000
	config.MaxBackoffDuration = 5 * time.Second
	config.DialTimeout = 10 * time.Second

	consumer, err := nsq.NewConsumer(topicName, "ch", config)
	if err != nil {
		log.Fatal("create consumer: ", err)
	}
	consumer.AddHandler(&ConsumerHandler{})

	// Discover every nsqd hosting the topic through nsqlookupd instead of
	// connecting to one nsqd directly.
	if err := consumer.ConnectToNSQLookupds(lookupdAddrs); err != nil {
		log.Fatal("connect to nsqlookupd: ", err)
	}

	// Block until Ctrl+C or SIGTERM, then stop the consumer gracefully.
	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM)
	<-sigCh
	consumer.Stop()
	<-consumer.StopChan
}
```
After startup, the log shows the consumer querying nsqlookupd and then connecting to all three nsqd nodes:

```
2020/01/16 10:39:34 INF 1 [nsq_demo/ch] querying nsqlookupd http://127.0.0.1:4161/lookup?topic=nsq_demo
2020/01/16 10:39:34 INF 1 [nsq_demo/ch] (2CNU7X5OLAUE004:4130) connecting to nsqd
2020/01/16 10:39:34 INF 1 [nsq_demo/ch] (2CNU7X5OLAUE004:4150) connecting to nsqd
2020/01/16 10:39:35 INF 1 [nsq_demo/ch] (2CNU7X5OLAUE004:4140) connecting to nsqd
```
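Summary
Because NSQ producers connect directly to nsqd, a producer that talks to a single nsqd node has a single point of failure. As far as I know, NSQ has no replication, so a topic's messages live only on the nsqd node that received them, which makes the topic a single point of failure too. The usual cure for a single point of failure is redundancy. A simple way to get it is to have the producer maintain its own list of nsqd servers and check their health itself, and, to avoid the topic single point of failure, to publish each message to more than one nsqd. Consumers already avoid the single point of failure by connecting through nsqlookupd, but if messages are published to multiple nsqd nodes, the consumer must make sure consumption is idempotent, because it receives one copy from each node.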
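The "keep your own nsqd list, check health, publish to several nodes" approach can be sketched as follows. This is a minimal illustration, not an NSQ client feature: the `publisher` interface, `publishRedundant` and `fakePublisher` are hypothetical names. The interface mirrors two methods that go-nsq's `*nsq.Producer` does have, `Ping() error` and `Publish(topic string, body []byte) error`, so real producers created with `nsq.NewProducer(addr, config)` could be passed in; the fake lets the sketch run without nsqd.

```go
package main

import (
	"errors"
	"fmt"
)

// publisher is the subset of go-nsq's *nsq.Producer used by this sketch.
type publisher interface {
	Ping() error
	Publish(topic string, body []byte) error
}

// publishRedundant sends body to up to `copies` healthy nsqd nodes, in list
// order, skipping nodes whose Ping or Publish fails. It returns how many
// copies were written; writing zero copies is reported as an error.
func publishRedundant(pubs []publisher, topic string, body []byte, copies int) (int, error) {
	written := 0
	var lastErr error
	for _, p := range pubs {
		if written >= copies {
			break
		}
		if err := p.Ping(); err != nil {
			lastErr = err
			continue
		}
		if err := p.Publish(topic, body); err != nil {
			lastErr = err
			continue
		}
		written++
	}
	if written == 0 {
		return 0, fmt.Errorf("no nsqd accepted the message: %v", lastErr)
	}
	return written, nil
}

// fakePublisher stands in for *nsq.Producer so the sketch runs without nsqd.
type fakePublisher struct {
	addr string
	down bool
	got  [][]byte
}

func (f *fakePublisher) Ping() error {
	if f.down {
		return errors.New(f.addr + " unreachable")
	}
	return nil
}

func (f *fakePublisher) Publish(topic string, body []byte) error {
	f.got = append(f.got, body)
	return nil
}

func main() {
	a := &fakePublisher{addr: "127.0.0.1:4150", down: true}
	b := &fakePublisher{addr: "127.0.0.1:4140"}
	c := &fakePublisher{addr: "127.0.0.1:4130"}
	n, err := publishRedundant([]publisher{a, b, c}, "nsq_demo", []byte("order-1:hello"), 2)
	fmt.Println("copies written:", n, "err:", err)
}
```

In practice you would create one `*nsq.Producer` per nsqd TCP address (4150, 4140 and 4130 above). Pinging before every publish costs an extra round trip; caching health results and refreshing them in the background is a reasonable alternative. Writing more than one copy is exactly what makes idempotent consumption necessary.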
Recommended reading
We've now seen some of NSQ's drawbacks. Some companies have already built solutions, and we can borrow from their approaches:
How we redesign the NSQ - Reshaping NSQ: the client
How we redesigned the NSQ - Reshaping NSQ: detailed design
NSQ documentation, Topology Patterns: https://nsq.io/deployment/topology_patterns.html
If I've misunderstood anything in this post, corrections are welcome.