go-nsq分布式实时消息平台用法

NSQ是一个基于Go语言的分布式实时消息平台,可用于大规模系统中的实时消息服务,并且每天能够处理数亿级别的消息,其设计目标是为在分布式环境下运行的去中心化服务提供一个强大的基础架构。

NSQ非常容易配置和部署,且具有最大的灵活性,支持众多消息协议。支持主流库(Go、Python、nodejs、Java、C see: Client Libraries )

NSQ是由四个重要组件构成:

快速启动NSQ

brew install nsq

启动拓扑发现 nsqlookupd

启动主服务、并注册 nsqd --lookupd-tcp-address=127.0.0.1:4160

启动WEB UI管理程序 nsqadmin --lookupd-http-address=127.0.0.1:4161

简单使用演示

可以用浏览器访问 http://127.0.0.1:4171/ 观察数据

也可尝试下 watch -n 0.5 "curl -s http://127.0.0.1:4151/stats" 监控统计数据

发布一个消息 curl -d 'hello world 1' 'http://127.0.0.1:4151/put?topic=test'

创建一个消费者 nsq_to_file --topic=test --output-dir=/tmp --lookupd-http-address=127.0.0.1:4161

再发布几个消息

curl -d 'hello world 2' 'http://127.0.0.1:4151/put?topic=test'
curl -d 'hello world 3' 'http://127.0.0.1:4151/put?topic=test'

Golang使用NSQ

package main

import (
	"log"
	"time"

	"github.com/nsqio/go-nsq"
)

func main() {
	go startConsumer()
	startProducer()
}

// 生产者
func startProducer() {
	cfg := nsq.NewConfig()

	producer, err := nsq.NewProducer("127.0.0.1:4150", cfg)
	if err != nil {
		log.Fatal(err)
	}

	// 发布消息
	for {
		if err := producer.Publish("test", []byte("test message")); err != nil {
			log.Fatal("publish error: " + err.Error())
		}

		time.Sleep(1 * time.Second)
	}
}

// 消费者
func startConsumer() {
	cfg := nsq.NewConfig()

	consumer, err := nsq.NewConsumer("test", "sensor01", cfg)
	if err != nil {
		log.Fatal(err)
	}

	// 设置消息处理函数
	consumer.AddHandler(nsq.HandlerFunc(func(message *nsq.Message) error {
		log.Println(string(message.Body))
		return nil
	}))

	// 连接到单例nsqd
	if err := consumer.ConnectToNSQD("127.0.0.1:4150"); err != nil {
		log.Fatal(err)
	}

	<-consumer.StopChan
}

参考: