Skip to content

Getting Started

Prerequisites

  • Go 1.22 or later

Installation

Install the core library:

sh
go get github.com/foomo/goflux

For a specific transport, add its submodule:

sh
go get github.com/foomo/goflux/transport/channel   # in-process channels
go get github.com/foomo/goflux/transport/nats       # NATS core
go get github.com/foomo/goflux/transport/jetstream  # NATS JetStream
go get github.com/foomo/goflux/transport/http       # HTTP POST

Fire-and-Forget with Channels

The channel transport has zero external dependencies -- no broker, no network. It is a good starting point and useful for tests.

go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/foomo/goflux"
	"github.com/foomo/goflux/transport/channel"
)

type OrderEvent struct {
	OrderID string
	Status  string
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// Create the in-process bus, publisher, and subscriber.
	bus := channel.NewBus[OrderEvent]()
	pub := channel.NewPublisher(bus)
	sub, err := channel.NewSubscriber(bus, 8) // bufSize controls backpressure
	if err != nil {
		panic(err)
	}

	// Define a handler -- same signature regardless of transport.
	handler := func(ctx context.Context, msg goflux.Message[OrderEvent]) error {
		fmt.Printf("received: subject=%s order=%s status=%s\n",
			msg.Subject, msg.Payload.OrderID, msg.Payload.Status)
		return nil
	}

	// Subscribe blocks, so run it in a goroutine.
	done := make(chan struct{})
	go func() {
		defer close(done)
		_ = sub.Subscribe(ctx, "orders", handler)
	}()

	// Give the subscriber time to register.
	time.Sleep(10 * time.Millisecond)

	// Publish a message.
	if err := pub.Publish(ctx, "orders", OrderEvent{
		OrderID: "ORD-42",
		Status:  "confirmed",
	}); err != nil {
		panic(err)
	}

	cancel()
	<-done
	// Output: received: subject=orders order=ORD-42 status=confirmed
}

The handler is a goflux.Handler[OrderEvent] -- a function with the signature func(ctx context.Context, msg goflux.Message[T]) error. This signature is the same for every transport.

Swapping to NATS

The handler does not change. Only the import and constructor differ:

go
package main

import (
	"context"
	"fmt"

	"github.com/foomo/goflux"
	gofluxnats "github.com/foomo/goflux/transport/nats"
	jsoncodec "github.com/foomo/goencode/json/v1"
	"github.com/nats-io/nats.go"
)

type OrderEvent struct {
	OrderID string `json:"order_id"`
	Status  string `json:"status"`
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Connect to NATS -- caller owns the connection.
	conn, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		panic(err)
	}
	defer conn.Drain()

	// Create publisher and subscriber with a JSON codec.
	codec := jsoncodec.New[OrderEvent]()
	pub := gofluxnats.NewPublisher(conn, codec)
	sub := gofluxnats.NewSubscriber(conn, codec)

	// Same handler as before -- no transport-specific code.
	handler := func(ctx context.Context, msg goflux.Message[OrderEvent]) error {
		fmt.Printf("received: %s %s\n", msg.Payload.OrderID, msg.Payload.Status)
		return nil
	}

	go func() {
		_ = sub.Subscribe(ctx, "orders", handler)
	}()

	_ = pub.Publish(ctx, "orders", OrderEvent{OrderID: "ORD-42", Status: "confirmed"})
}

Network transports require a goencode.Codec[T] for serialization. Codecs are stateless and safe for concurrent use. The goencode library provides codecs for JSON, Protocol Buffers, and other formats.

Other Transports

  • JetStream -- NATS JetStream with ack/nak/term, pull consumers, and durable subscriptions. Use when you need at-least-once delivery.
  • HTTP -- HTTP POST publisher and http.ServeMux-based subscriber. Also supports request-reply.

What's Next