跳到主要内容

nsqlookup 是什么

nsdlookupd 是什么

nsqlookupd 是 NSQ(一种实时分布式消息传递平台)中的一个组件,用于服务发现和元数据管理。

nsqlookupd 包含两个端口,分别是 HTTP 端口和 TCP 端口,它们的区别如下:

  1. HTTP 端口:默认端口为 4161。该端口是 nsqlookupd 的管理接口,用于提供 HTTP API。通过这个接口,可以查询集群中的主题(topics)、通道(channels)、生产者(producers)和消费者(consumers)的信息。可以通过 HTTP 请求获取和修改这些元数据,例如创建或删除主题,查看当前的节点列表等。

  2. TCP 端口:默认端口为 4160。该端口是 nsqlookupd 的客户端接口,用于提供消息路由服务。生产者和消费者通过连接到 nsqlookupd 的 TCP 端口,将自己注册到集群中,以便进行消息的发布和订阅。nsqlookupd 会根据当前集群的状态,将消息的路由信息返回给客户端,从而实现消息的可靠传递和负载均衡。

总结来说,HTTP 端口主要用于管理和查询集群的元数据,而 TCP 端口用于处理消息的路由和分发。两个端口提供了不同的功能,共同协作使得 NSQ 集群能够高效地处理消息。

连接 nsqd 的方式

这里有 2 种消费者的写法,第一种是直连 nsqd(tcp长连接),第二种是通过 nsqlookupd 的 http 接口查询后长连接到 nsqd, 显然第二种更易于分布式容错和高可用。以下分别介绍两种的使用方式

nsqd 直连

package main

import (
"flag"
"log"
"time"

"github.com/nsqio/go-nsq"
)

func main() {
go startConsumer()
startProducer()
}

var url string

func init() {
//具体ip,端口根据实际情况传入或者修改默认配置
flag.StringVar(&url, "url", "127.0.0.1:4150", "nsqd")
flag.Parse()
}

// 生产者
func startProducer() {
cfg := nsq.NewConfig()
producer, err := nsq.NewProducer(url, cfg)
if err != nil {
log.Fatal(err)
}
// 发布消息
for {
if err := producer.Publish("test", []byte("test message")); err != nil {
log.Fatal("publish error: " + err.Error())
}
time.Sleep(1 * time.Second)
}
}

// 消费者
func startConsumer() {
cfg := nsq.NewConfig()
consumer, err := nsq.NewConsumer("test", "sensor01", cfg)
if err != nil {
log.Fatal(err)
}
// 设置消息处理函数
consumer.AddHandler(nsq.HandlerFunc(func(message *nsq.Message) error {
log.Println(string(message.Body))
return nil
}))
// 连接到单例nsqd
if err := consumer.ConnectToNSQD(url); err != nil {
log.Fatal(err)
}
<-consumer.StopChan
}

nsqlookupd 查询后连接

package main

import (
"flag"
"log"
"time"

"github.com/nsqio/go-nsq"
)

func main() {
go startConsumer()
startProducer()
}

var url string
var url1 string

func init() {
//具体ip,端口根据实际情况传入或者修改默认配置
flag.StringVar(&url, "url", "127.0.0.1:4150", "nsqd") //tcp
flag.StringVar(&url1, "url1", "127.0.0.1:4161", "nsqlookupd") //http

flag.Parse()
}

// 生产者
func startProducer() {
cfg := nsq.NewConfig()
producer, err := nsq.NewProducer(url, cfg)
if err != nil {
log.Fatal(err)
}
// 发布消息
for {
if err := producer.Publish("test", []byte("test message")); err != nil {
log.Fatal("publish error: " + err.Error())
}
time.Sleep(1 * time.Second)
}
}

// 消费者
func startConsumer() {
cfg := nsq.NewConfig()
consumer, err := nsq.NewConsumer("test", "sensor01", cfg)
if err != nil {
log.Fatal(err)
}
// 设置消息处理函数
consumer.AddHandler(nsq.HandlerFunc(func(message *nsq.Message) error {
log.Println(string(message.Body))
return nil
}))
// nsqlookupd
//[]string
if err := consumer.ConnectToNSQLookupds([]string{url1}); err != nil {
log.Fatal(err)
}
<-consumer.StopChan
}

两种方式最大的区别除了函数不同

consumer.ConnectToNSQLookupds([]string{url1})

nsqlookupd 方式的参数传入的一个字符串的切片,这里要特别注意。

另外如果 nsqlookupd 的地址确定只有一个也可以使用

consumer.ConnectToNSQLookupd(url)

容器外访问

version: '3.9'

# 网络配置
networks:
backendNetwork:
driver: ${NETWORKS_DRIVER}

# 服务容器配置
services:
nsqlookupd:
image: nsqio/nsq
command: /nsqlookupd
ports:
- "${NSQ_LOOKUPD_PORT}:4160"
- "${NSQ_LOOKUPD_UI_PORT}:4161"
networks:
- backendNetwork
# nsqd 是接收、队列和传送消息到客户端的守护进程。
nsqd:
image: nsqio/nsq
command: >
/nsqd --lookupd-tcp-address=${NSQ_EXTERNAL_IP}:${NSQ_LOOKUPD_PORT}
--broadcast-address=${NSQ_EXTERNAL_IP}
--broadcast-tcp-port=${NSQD_TCP_PORT}
--broadcast-http-port=${NSQD_HTTP_PORT}
depends_on:
- nsqlookupd
volumes:
- ${NSQ_DATA01_PATH}:/data
ports:
- "${NSQD_TCP_PORT}:4150"
- "${NSQD_HTTP_PORT}:4151"
networks:
- backendNetwork
# nsqadmin 是一个 Web UI 来实时监控集群(和执行各种管理任务)。
nsqadmin:
image: nsqio/nsq
command: /nsqadmin --lookupd-http-address=${NSQ_EXTERNAL_IP}:${NSQ_LOOKUPD_UI_PORT}
depends_on:
- nsqlookupd
ports:
- "${NSQ_ADMIN_PORT}:4171"
networks:
- backendNetwork

如果需要在容器外面访问,需要给 nsqd 传入参数,参考官方文档 https://nsq.io/components/nsqd.html