go-nsq分布式实时消息平台用法

NSQ是一个基于Go语言的分布式实时消息平台,可用于大规模系统中的实时消息服务,并且每天能够处理数亿级别的消息,其设计目标是为在分布式环境下运行的去中心化服务提供一个强大的基础架构。

NSQ非常容易配置和部署,且具有最大的灵活性,支持众多消息协议。支持主流库(Go、Python、nodejs、Java、C see: Client Libraries )

NSQ是由四个重要组件构成:

  • nsqd:一个负责接收、排队、转发消息到客户端的守护进程,它可以独立运行,不过通常它是由 nsqlookupd 实例所在集群配置的
  • nsqlookupd:管理拓扑信息并提供最终一致性的发现服务的守护进程
  • nsqadmin:一套Web用户界面,可实时查看集群的统计数据和执行各种各样的管理任务
  • utilities:常见基础功能、数据流处理工具,如nsq_stat、nsq_tail、nsq_to_file、nsq_to_http、nsq_to_nsq、to_nsq

快速启动NSQ

brew install nsq

启动拓扑发现 nsqlookupd

启动主服务、并注册 nsqd --lookupd-tcp-address=127.0.0.1:4160

启动WEB UI管理程序 nsqadmin --lookupd-http-address=127.0.0.1:4161

简单使用演示

可以用浏览器访问 http://127.0.0.1:4171/ 观察数据

也可尝试下 watch -n 0.5 "curl -s http://127.0.0.1:4151/stats" 监控统计数据

发布一个消息 curl -d 'hello world 1' 'http://127.0.0.1:4151/put?topic=test'

创建一个消费者 nsq_to_file --topic=test --output-dir=/tmp --lookupd-http-address=127.0.0.1:4161

再发布几个消息

1
2
curl -d 'hello world 2' 'http://127.0.0.1:4151/put?topic=test'
curl -d 'hello world 3' 'http://127.0.0.1:4151/put?topic=test'

Golang使用NSQ

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package main

import (
"log"
"time"

"github.com/nsqio/go-nsq"
)

func main() {
go startConsumer()
startProducer()
}

// 生产者
func startProducer() {
cfg := nsq.NewConfig()

producer, err := nsq.NewProducer("127.0.0.1:4150", cfg)
if err != nil {
log.Fatal(err)
}

// 发布消息
for {
if err := producer.Publish("test", []byte("test message")); err != nil {
log.Fatal("publish error: " + err.Error())
}

time.Sleep(1 * time.Second)
}
}

// 消费者
func startConsumer() {
cfg := nsq.NewConfig()

consumer, err := nsq.NewConsumer("test", "sensor01", cfg)
if err != nil {
log.Fatal(err)
}

// 设置消息处理函数
consumer.AddHandler(nsq.HandlerFunc(func(message *nsq.Message) error {
log.Println(string(message.Body))
return nil
}))

// 连接到单例nsqd
if err := consumer.ConnectToNSQD("127.0.0.1:4150"); err != nil {
log.Fatal(err)
}

<-consumer.StopChan
}

参考: