Pub/Sub Pattern

Please Note: A working example of the Pub/Sub pattern shown here is available in the Frisbee Examples Repository

Setting Up Frisbee

The first step when designing a protocol with Frisbee is to list out the various message types that you'll need. This is often something that needs to be consistent between the server and client so we recommend having a central configuration for this.

Next, we need to define our routing logic. In this example we will be using a Broker for handling the routing logic for the published messages.

In this simplified example, we will only have two message types: PUB and SUB. In a more complex system we would recommend having authorization messages and reconnection logic to handle efficient message routing. Frisbee requires the use of a uint16 as the message type, and we will use the iota package to keep things simple.

golang

package main

const (
  PUB = uint16(iota)
  SUB
)

Broker

Our broker will keep track of the publishers and subscribers for a given topic, and we can route messages based on the Routing field of a Frisbee Message object. The simplest way of implementing this is by using a Golang map, where the key is of type uint32 and the value is an array of *frisbee.Conn objects. With this, we can easily send published messages to multiple subscribers by simply iterating over the array.

golang

var subscribers = make(map[uint32][]*frisbee.Conn)

One thing we need to note is that we will somehow need to handle converting arbitrary topics to uint32. Thankfully, Golang provides an extremely performant method for doing this via the hash/crc32 package, which takes an arbitrary slice of bytes and converts it to a uint32.

Handling subscriptions

We are now ready to set up our subscription handling function. Later, we will tell Frisbee to call this function whenever a message of type SUB arrives, but for now let's define how the function will work.

golang

func handleSub(c *frisbee.Conn, incomingMessage frisbee.Message, incomingContent []byte) (*frisbee.Message, []byte, frisbee.Action) {
    if incomingMessage.ContentLength > 0 {
        log.Printf("Server Received SUB on topic %s from %s", string(incomingContent), c.RemoteAddr())
        checksum := crc32.ChecksumIEEE(incomingContent)
        subscribers[checksum] = append(subscribers[checksum], c)
    }
    return
}

As you can see above, all Frisbee functions have the same function signature, and within our handleSub function we're first checking to make sure that the message has content (otherwise we discard it), then we compute the uint32 checksum of the subscription topic, and add the connection that sent the PUB message to the proper array in the subscribers map.

Next we can take a look at how to handle published messages.

Handling published messages

Routing PUB messages to the correct location is just as easy as setting up the logic for the SUB message types:

golang

func handlePub(_ *frisbee.Conn, incomingMessage frisbee.Message, incomingContent []byte) (*frisbee.Message, []byte, frisbee.Action) {
    if incomingMessage.ContentLength > 0 {
        log.Printf("Server Received PUB on hashed topic %d with content %s", incomingMessage.Routing, string(incomingContent))
        if connections := subscribers[incomingMessage.Routing]; connections != nil {
            for _, c := range connections {
                _ = c.Write(&frisbee.Message{
                    Id:            0,
                    Operation:     PUB,
                    Routing:       incomingMessage.Routing,
                    ContentLength: incomingMessage.ContentLength,
                }, &incomingContent)
            }
        }
    }
    return
}

Again, our function follows the standard Frisbee function signature, and it begins by making sure our message has a length greater than 0. Next, we simply loop through the correct array in our subscribers map, and send a message with the PUB message contents to it.

Initializing the Broker

All that's left now is to start the Frisbee server and register our functions and message types:

golang

func main() {
    router := make(frisbee.ServerRouter)
    router[SUB] = handleSub
    router[PUB] = handlePub
    exit := make(chan os.Signal)
    signal.Notify(exit, os.Interrupt)

    s := frisbee.NewServer(":8192", router)
    _ = s.Start()

    <-exit
    err := s.Shutdown()
    if err != nil {
        panic(err)
    }
}

Notice that we're using the frisbee.ServerRouter type, which functions like a map. We simply register our handleSub and handlePub functions with their corresponding message types, and let frisbee do everything else.

Final Broker

Putting it all together, our final broker code looks like this:

golang

package main

import (
    "github.com/loophole-labs/frisbee"
    "github.com/rs/zerolog/log"
    "hash/crc32"
    "os"
    "os/signal"
)

const (
  PUB = uint16(iota)
  SUB
)

func handleSub(c *frisbee.Conn, incomingMessage frisbee.Message, incomingContent []byte) (*frisbee.Message, []byte, frisbee.Action) {
    if incomingMessage.ContentLength > 0 {
        log.Printf("Server Received SUB on topic %s from %s", string(incomingContent), c.RemoteAddr())
        checksum := crc32.ChecksumIEEE(incomingContent)
        subscribers[checksum] = append(subscribers[checksum], c)
    }
    return
}

func handlePub(_ *frisbee.Conn, incomingMessage frisbee.Message, incomingContent []byte) (*frisbee.Message, []byte, frisbee.Action) {
    if incomingMessage.ContentLength > 0 {
        log.Printf("Server Received PUB on hashed topic %d with content %s", incomingMessage.Routing, string(incomingContent))
        if connections := subscribers[incomingMessage.Routing]; connections != nil {
            for _, c := range connections {
                _ = c.Write(&frisbee.Message{
                    Id:            0,
                    Operation:     PUB,
                    Routing:       incomingMessage.Routing,
                    ContentLength: incomingMessage.ContentLength,
                }, &incomingContent)
            }
        }
    }
    return
}

func main() {
    router := make(frisbee.ServerRouter)
    router[SUB] = handleSub
    router[PUB] = handlePub
    exit := make(chan os.Signal)
    signal.Notify(exit, os.Interrupt)

    s := frisbee.NewServer(":8192", router)
    _ = s.Start()

    <-exit
    err := s.Shutdown()
    if err != nil {
        panic(err)
    }
}

Subscriber

Our simplified Subscriber will need to first send a SUB message type to the broker, which will tell the broker to forward messages matching its topic, and then it will need a function for handling those messages. In our case we will just print out the contents of the message.

Handling incoming messages

Our handlePub function, then, will simply need to check that the Routing field of the incoming PUB message matches the topic it's subscribed to (which isn't actually necessary since we know our broker will never forward invalid packets to our subscriber, but this is good practice anyways), and then it can just print out the contents of the message.

golang

func handlePub(incomingMessage frisbee.Message, incomingContent []byte) (*frisbee.Message, []byte, frisbee.Action) {
    if incomingMessage.Routing == topicHash {
        log.Printf("Client Received Message on Topic %s: %s", string(topic), string(incomingContent))
    }
    return
}

Initializing the Subscriber

Initializing the subscriber is done by first registering our PUB handling logic and connecting to the broker (in this case a Frisbee server running at 127.0.0.1:8192), and then sending a SUB message to the server to register ourselves in the subscribers map.

golang

package main

import (
    "github.com/loophole-labs/frisbee"
    "github.com/rs/zerolog/log"
    "hash/crc32"
    "os"
    "os/signal"
)

const PUB = uint16(1)
const SUB = uint16(2)

var topic = []byte("TOPIC 1")
var topicHash = crc32.ChecksumIEEE(topic)

// Handle the PUB message type
func handlePub(incomingMessage frisbee.Message, incomingContent []byte) (*frisbee.Message, []byte, frisbee.Action) {
    if incomingMessage.Routing == topicHash {
        log.Printf("Client Received Message on Topic %s: %s", string(topic), string(incomingContent))
    }
    return
}

func main() {
    router := make(frisbee.ClientRouter)
    router[PUB] = handlePub
    exit := make(chan os.Signal)
    signal.Notify(exit, os.Interrupt)

    c := frisbee.NewClient("127.0.0.1:8192", router)
    err := c.Connect()
    if err != nil {
        panic(err)
    }

    i := 0

    // First subscribe to the topic
    err = c.Write(&frisbee.Message{
        Id:            uint32(i),
        Operation:     SUB,
        Routing:       0,
        ContentLength: uint32(len(topic)),
    }, &topic)
    if err != nil {
        panic(err)
    }

    // Now the handle pub function will be called
    // automatically whenever a message that matches the topic arrives

    <-exit
    err = c.Close()
    if err != nil {
        panic(err)
    }
}

Publisher

Now let's look at the logic for the Publisher. This is extremely simple, as the only thing our Publisher will do is send messages in an infinite loop to the Broker.

golang

i := 0
for {
  message := []byte(fmt.Sprintf("PUBLISHED MESSAGE: %d", i))
  err := c.Write(&frisbee.Message{
    Id:            uint32(i),
    Operation:     PUB,
    Routing:       topicHash,
    ContentLength: uint32(len(message)),
  }, &message)
  if err != nil {
    panic(err)
  }
  i++
  time.Sleep(time.Second)
}

As we can see, this will loop forever, incrementing i and sending the message to the broker.

Initializing the Publisher

The publisher can now be initialized by starting the frisbee client, and running our messaging loop in its own goroutine.

golang

package main

import (
    "fmt"
    "github.com/loophole-labs/frisbee"
    "hash/crc32"
    "os"
    "os/signal"
    "time"
)

const PUB = uint16(iota)

var topic = []byte("TOPIC 1")
var topicHash = crc32.ChecksumIEEE(topic)

func main() {
    router := make(frisbee.ClientRouter)
    exit := make(chan os.Signal)
    signal.Notify(exit, os.Interrupt)

    c := frisbee.NewClient("127.0.0.1:8192", router)
    err := c.Connect()
    if err != nil {
        panic(err)
    }

    go func() {
        i := 0
        for {
            message := []byte(fmt.Sprintf("PUBLISHED MESSAGE: %d", i))
            err := c.Write(&frisbee.Message{
                Id:            uint32(i),
                Operation:     PUB,
                Routing:       topicHash,
                ContentLength: uint32(len(message)),
            }, &message)
            if err != nil {
                panic(err)
            }
            i++
            time.Sleep(time.Second)
        }
    }()

    <-exit
    err = c.Close()
    if err != nil {
        panic(err)
    }
}

As we can see, we're computing the crc32 hash of the topic for this Publisher ("Topic 1"), and sending a message on that topic every second.

Last Updated: 2021-06-08