Seven Story Rabbit Hole

Sometimes awesome things happen in deep rabbit holes. Or not.


An Example of Using NSQ From Go

NSQ is a message queue, similar to RabbitMQ. I decided I’d give it a whirl.

Install NSQ

$ wget https://s3.amazonaws.com/bitly-downloads/nsq/nsq-0.2.31.darwin-amd64.go1.3.1.tar.gz
$ tar xvfz nsq-0.2.31.darwin-amd64.go1.3.1.tar.gz
$ sudo mv nsq-0.2.31.darwin-amd64.go1.3.1/bin/* /usr/local/bin

Launch NSQ

$ nsqlookupd &
$ nsqd --lookupd-tcp-address=127.0.0.1:4160 &
$ nsqadmin --lookupd-http-address=127.0.0.1:4161 &
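
With the default settings, nsqlookupd listens on 4160 (TCP) and 4161 (HTTP), nsqd on 4150 (TCP) and 4151 (HTTP), and nsqadmin on 4171 (HTTP). If you want to confirm everything came up before moving on, here is a minimal sketch that just tries to open a TCP connection to each port. The isListening helper and the one-second timeout are my own choices for illustration; they are not part of NSQ.

```go
package main

import (
	"fmt"
	"net"
	"time"
)

// isListening reports whether something accepts TCP connections on addr.
// Hypothetical helper for illustration; it is not part of NSQ or go-nsq.
func isListening(addr string) bool {
	conn, err := net.DialTimeout("tcp", addr, time.Second)
	if err != nil {
		return false
	}
	conn.Close()
	return true
}

func main() {
	// Default ports, assuming the daemons were started as shown above.
	daemons := []struct{ name, addr string }{
		{"nsqlookupd (TCP)", "127.0.0.1:4160"},
		{"nsqlookupd (HTTP)", "127.0.0.1:4161"},
		{"nsqd (TCP)", "127.0.0.1:4150"},
		{"nsqd (HTTP)", "127.0.0.1:4151"},
		{"nsqadmin (HTTP)", "127.0.0.1:4171"},
	}
	for _, d := range daemons {
		fmt.Printf("%-18s %s listening=%v\n", d.name, d.addr, isListening(d.addr))
	}
}
```

An open port only tells you that something is listening there, not that it is healthy, but it catches the common case of a daemon that failed to start.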

Get Go client library

$ go get -u -v github.com/bitly/go-nsq

Create a producer

Add the following code to main.go:

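package main

import (
	"log"

	"github.com/bitly/go-nsq"
)

func main() {
	config := nsq.NewConfig()

	// Create a producer that talks to nsqd's TCP port.
	w, err := nsq.NewProducer("127.0.0.1:4150", config)
	if err != nil {
		log.Panicf("Could not create producer: %v", err)
	}

	// Publish a single message to the write_test topic.
	err = w.Publish("write_test", []byte("test"))
	if err != nil {
		log.Panicf("Could not publish: %v", err)
	}

	// Shut the producer down cleanly.
	w.Stop()
}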

and then run it with:

$ go run main.go

If you go to your NSQAdmin at http://localhost:4171, you should see a single message in the write_test topic.

[Screenshot: NSQAdmin]
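
If you would rather check this from code than from the browser, nsqd exposes the same counters over its HTTP port (4151) at /stats?format=json. Below is a rough sketch that pulls out the numbers for one topic. The field names (topic_name, depth, message_count) are what I'd expect from NSQ's stats output, and I'm assuming older releases wrap the payload in a "data" envelope, so the parser accepts both shapes. The findTopic helper and its types are hypothetical names of mine.

```go
package main

import (
	"encoding/json"
	"fmt"
	"io"
	"net/http"
)

// topicStats holds the counters we care about for one topic.
// Field names are assumed from nsqd's JSON stats output.
type topicStats struct {
	Name         string `json:"topic_name"`
	Depth        int64  `json:"depth"`         // messages currently queued
	MessageCount int64  `json:"message_count"` // messages received in total
}

// findTopic parses a /stats?format=json body and returns the named topic.
func findTopic(body []byte, name string) (topicStats, bool) {
	var resp struct {
		Topics []topicStats `json:"topics"`
		// Assumption: older releases nest the payload under "data".
		Data *struct {
			Topics []topicStats `json:"topics"`
		} `json:"data"`
	}
	if err := json.Unmarshal(body, &resp); err != nil {
		return topicStats{}, false
	}
	topics := resp.Topics
	if resp.Data != nil {
		topics = resp.Data.Topics
	}
	for _, t := range topics {
		if t.Name == name {
			return t, true
		}
	}
	return topicStats{}, false
}

func main() {
	resp, err := http.Get("http://127.0.0.1:4151/stats?format=json")
	if err != nil {
		fmt.Println("stats request failed:", err)
		return
	}
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		fmt.Println("read failed:", err)
		return
	}
	if t, ok := findTopic(body, "write_test"); ok {
		fmt.Printf("%s: depth=%d message_count=%d\n", t.Name, t.Depth, t.MessageCount)
	} else {
		fmt.Println("topic write_test not found")
	}
}
```

Right after running the producer once, with no consumer attached yet, I'd expect both counters to be 1.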

Create a consumer

Replace the contents of main.go with the following (or put it in a separate directory if you want to keep the producer around):

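package main

import (
	"log"
	"sync"

	"github.com/bitly/go-nsq"
)

func main() {
	// Block main until the handler has seen one message.
	wg := &sync.WaitGroup{}
	wg.Add(1)

	config := nsq.NewConfig()
	q, newErr := nsq.NewConsumer("write_test", "ch", config)
	if newErr != nil {
		log.Panicf("Could not create consumer: %v", newErr)
	}

	// The handler runs for each message delivered on the "ch" channel.
	// Returning nil tells nsqd the message was handled successfully.
	q.AddHandler(nsq.HandlerFunc(func(message *nsq.Message) error {
		log.Printf("Got a message: %v", message)
		wg.Done()
		return nil
	}))

	// Connect directly to nsqd's TCP port.
	err := q.ConnectToNSQD("127.0.0.1:4150")
	if err != nil {
		log.Panic("Could not connect")
	}
	wg.Wait()
}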

and then run it with:

$ go run main.go

You should see output:

2014/11/12 08:37:29 INF    1 [write_test/ch] (127.0.0.1:4150) connecting to nsqd
2014/11/12 08:37:29 Got a message: &{[48 55 54 52 48 57 51 56 50 100 50 56 101 48 48 55] [116 101 115 116] 1415810020571836511 2 0xc208042118 0 0}
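
That second line is the raw message struct, so the interesting parts show up as byte values. As far as I can tell, the first array is the 16-byte message ID, the second is the body, and the large number is the publish timestamp in nanoseconds. Here is a small sketch that turns those three values back into something readable, using nothing but the standard library. The describe helper is a hypothetical name of mine.

```go
package main

import (
	"fmt"
	"time"
)

// describe renders the raw fields of a message in human-readable form.
// Hypothetical helper for illustration; it is not part of go-nsq.
func describe(id, body []byte, timestampNanos int64) (string, string, string) {
	published := time.Unix(0, timestampNanos).UTC().Format(time.RFC3339)
	return string(id), string(body), published
}

func main() {
	// Values copied from the log line above.
	id := []byte{48, 55, 54, 52, 48, 57, 51, 56, 50, 100, 50, 56, 101, 48, 48, 55}
	body := []byte{116, 101, 115, 116}
	var timestamp int64 = 1415810020571836511 // nanoseconds since the Unix epoch

	idStr, bodyStr, published := describe(id, body, timestamp)
	fmt.Println("id:       ", idStr)
	fmt.Println("body:     ", bodyStr)
	fmt.Println("published:", published)
}
```

This prints the ID 076409382d28e007, the body test, and a publish time of 2014-11-12T16:33:40Z. In the handler itself, logging string(message.Body) instead of the whole struct gets you the readable body directly.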

Congratulations! You just pushed a message through NSQ.

Enhanced consumer: use NSQLookupd

The example above hardcodes the IP address of nsqd into the consumer code, which is not a best practice. A better approach is to point the consumer at nsqlookupd. The consumer then asks nsqlookupd which nsqd instances have that topic and connects to them for you.

In our example, we only have a single nsqd, so it’s an extraneous lookup. But it’s good to get into the right habits early, especially if you are a habitual copy/paster.

The consumer example only needs a one-line change to get this enhancement:

err := q.ConnectToNSQLookupd("127.0.0.1:4161")

This connects to the HTTP port of nsqlookupd (4161 by default), not its TCP port.
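
To see roughly what the consumer learns from that lookup, you can query the same HTTP endpoint yourself at /lookup?topic=write_test. The sketch below fetches the response and extracts the TCP addresses of the nsqd instances it lists. I'm assuming the producers are reported with broadcast_address and tcp_port fields, and that older releases wrap the payload in a "data" envelope, so the parser accepts both. The nsqdAddresses helper is a hypothetical name of mine, not something go-nsq exports.

```go
package main

import (
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"net/url"
)

// producer is one nsqd instance that nsqlookupd lists for a topic.
// Field names are assumed from nsqlookupd's JSON lookup output.
type producer struct {
	BroadcastAddress string `json:"broadcast_address"`
	TCPPort          int    `json:"tcp_port"`
}

// nsqdAddresses extracts "host:port" TCP addresses from a /lookup response body.
func nsqdAddresses(body []byte) ([]string, error) {
	var resp struct {
		Producers []producer `json:"producers"`
		// Assumption: older releases nest the payload under "data".
		Data *struct {
			Producers []producer `json:"producers"`
		} `json:"data"`
	}
	if err := json.Unmarshal(body, &resp); err != nil {
		return nil, err
	}
	producers := resp.Producers
	if resp.Data != nil {
		producers = resp.Data.Producers
	}
	addrs := make([]string, 0, len(producers))
	for _, p := range producers {
		addrs = append(addrs, fmt.Sprintf("%s:%d", p.BroadcastAddress, p.TCPPort))
	}
	return addrs, nil
}

func main() {
	// Default nsqlookupd HTTP address used in this post.
	endpoint := "http://127.0.0.1:4161/lookup?topic=" + url.QueryEscape("write_test")
	resp, err := http.Get(endpoint)
	if err != nil {
		fmt.Println("lookup failed:", err)
		return
	}
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		fmt.Println("read failed:", err)
		return
	}
	addrs, err := nsqdAddresses(body)
	if err != nil {
		fmt.Println("unexpected response:", err)
		return
	}
	fmt.Println("nsqd instances with write_test:", addrs)
}
```

With the single nsqd from this post, the list should contain one address on port 4150. The host part is whatever broadcast address that nsqd registered with, which may be your hostname rather than 127.0.0.1.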
