Core Concepts
This page covers every core type in goflux and the rules that govern how they interact.
Message[T]
Message[T] is the unit passed to every handler. It carries the routing key, a fully decoded payload, optional headers, and acknowledgment controls:
type Message[T any] struct {
Subject string `json:"subject"`
Payload T `json:"payload"`
Header Header `json:"header,omitempty"`
// unexported: acker Acker
}
Use the constructors to create messages:
msg := goflux.NewMessage("orders.created", OrderEvent{ID: "42", Total: 99.95})
// With headers:
h := goflux.Header{"X-Tenant": {"acme"}}
withHeader := goflux.NewMessageWithHeader("orders.created", event, h)
Messages expose acknowledgment methods -- Ack(), Nak(), NakWithDelay(d), and Term(). These are no-ops on transports that do not support acknowledgments (channels, NATS core). See Acker below.
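The struct tags on Message[T] hint at its JSON shape. The following is a minimal sketch, assuming a message is marshaled with encoding/json; the types are local stand-ins that copy the exported fields shown above, and OrderEvent with its tags is hypothetical. Whether goflux itself serializes whole messages this way is not stated here.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Header and Message copy the exported fields shown above.
// They are local stand-ins, not the goflux types themselves.
type Header map[string][]string

type Message[T any] struct {
	Subject string `json:"subject"`
	Payload T      `json:"payload"`
	Header  Header `json:"header,omitempty"`
}

// OrderEvent is a hypothetical payload type used only for illustration.
type OrderEvent struct {
	ID    string  `json:"id"`
	Total float64 `json:"total"`
}

// encode marshals a message into a JSON string.
func encode[T any](m Message[T]) (string, error) {
	b, err := json.Marshal(m)
	return string(b), err
}

func main() {
	plain := Message[OrderEvent]{
		Subject: "orders.created",
		Payload: OrderEvent{ID: "42", Total: 99.95},
	}
	tagged := plain
	tagged.Header = Header{"X-Tenant": {"acme"}}

	for _, m := range []Message[OrderEvent]{plain, tagged} {
		s, err := encode(m)
		if err != nil {
			panic(err)
		}
		fmt.Println(s)
	}
}
```

Because of `omitempty`, the first line printed has no `header` key at all, while the second includes `"header":{"X-Tenant":["acme"]}`.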
Handler[T]
A handler is the callback that processes incoming messages:
type Handler[T any] func(ctx context.Context, msg Message[T]) error
- Returning nil signals success.
- Returning a non-nil error signals failure. The exact consequence is transport-specific: fire-and-forget transports log and move on; JetStream transports can nak and redeliver.
handler := func(ctx context.Context, msg goflux.Message[OrderEvent]) error {
if msg.Payload.Total <= 0 {
return fmt.Errorf("invalid order total: %f", msg.Payload.Total)
}
return processOrder(ctx, msg.Payload)
}
Publisher[T]
A publisher sends messages to a subject:
type Publisher[T any] interface {
Publish(ctx context.Context, subject string, v T) error
Close() error
}
The subject is specified at the call site, not at construction time. Publish serializes v via the transport's codec (if any) and delivers it:
pub := gofluxnats.NewPublisher(conn, codec)
err := pub.Publish(ctx, "orders.created", OrderEvent{ID: "42"})
Subscriber[T]
A subscriber listens on a subject and dispatches decoded messages to a handler:
type Subscriber[T any] interface {
Subscribe(ctx context.Context, subject string, handler Handler[T]) error
Close() error
}
WARNING
Subscribe blocks until the context is cancelled or the transport encounters a fatal error. Always run it in a goroutine.
sub := gofluxnats.NewSubscriber(conn, codec)
go func() {
if err := sub.Subscribe(ctx, "orders.created", handler); err != nil {
slog.Error("subscriber exited", "error", err)
}
}()
Consumer[T]
Consumer[T] is the pull-based counterpart to Subscriber[T]. Instead of pushing messages to a handler, the caller fetches batches at its own pace:
type Consumer[T any] interface {
Fetch(ctx context.Context, n int) ([]Message[T], error)
Close() error
}
Fetch blocks until at least one message is available or the context is cancelled. Each fetched message must be explicitly acknowledged:
consumer := jetstream.NewConsumer[OrderEvent](js, codec, consumerConfig)
msgs, err := consumer.Fetch(ctx, 10)
if err != nil {
return err
}
for _, msg := range msgs {
if err := processOrder(ctx, msg.Payload); err != nil {
_ = msg.Nak()
continue
}
_ = msg.Ack()
}
Requester[Req, Resp] and Responder[Req, Resp]
Request-reply uses two paired interfaces:
type Requester[Req, Resp any] interface {
Request(ctx context.Context, subject string, req Req) (Resp, error)
Close() error
}
type RequestHandler[Req, Resp any] func(ctx context.Context, req Req) (Resp, error)
type Responder[Req, Resp any] interface {
Serve(ctx context.Context, subject string, handler RequestHandler[Req, Resp]) error
Close() error
}
The requester sends a typed request and blocks until a typed response arrives. The responder processes incoming requests and returns responses. Serve blocks like Subscribe:
// Responder side
responder := gofluxnats.NewResponder[GetOrderReq, GetOrderResp](conn, reqCodec, respCodec)
go func() {
_ = responder.Serve(ctx, "orders.get", func(ctx context.Context, req GetOrderReq) (GetOrderResp, error) {
order, err := db.FindOrder(ctx, req.OrderID)
if err != nil {
return GetOrderResp{}, err
}
return GetOrderResp{Order: order}, nil
})
}()
// Requester side
requester := gofluxnats.NewRequester[GetOrderReq, GetOrderResp](conn, reqCodec, respCodec)
resp, err := requester.Request(ctx, "orders.get", GetOrderReq{OrderID: "42"})
Header
Header carries message metadata -- trace context, message IDs, custom key-value pairs. Like http.Header, it maps each key to a slice of string values; keys are case-sensitive.
type Header map[string][]string
h := make(goflux.Header)
h.Set("X-Tenant", "acme")
h.Add("X-Tag", "priority")
h.Add("X-Tag", "express")
tenant := h.Get("X-Tenant") // "acme"
h.Del("X-Tag")
cloned := h.Clone() // deep copy
Transports propagate headers automatically. NATS maps them to NATS headers; HTTP maps them to HTTP headers.
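To make the Set, Add, Get, Del, and Clone calls above concrete, here is a minimal sketch of how such a header type could behave. Only the method names and the underlying map type come from this page; the method bodies are assumptions (for instance, that Get returns the first value and that keys are stored exactly as given), not goflux's implementation.

```go
package main

import "fmt"

// Header is a local stand-in with the same underlying type as goflux.Header.
type Header map[string][]string

// Set replaces any existing values for key with a single value.
func (h Header) Set(key, value string) { h[key] = []string{value} }

// Add appends value to the values already stored under key.
func (h Header) Add(key, value string) { h[key] = append(h[key], value) }

// Get returns the first value for key, or "" when the key is absent.
func (h Header) Get(key string) string {
	if vs := h[key]; len(vs) > 0 {
		return vs[0]
	}
	return ""
}

// Del removes key together with all of its values.
func (h Header) Del(key string) { delete(h, key) }

// Clone copies the map and every value slice, so the result shares
// no memory with the original.
func (h Header) Clone() Header {
	out := make(Header, len(h))
	for k, vs := range h {
		out[k] = append([]string(nil), vs...)
	}
	return out
}

func main() {
	h := make(Header)
	h.Set("X-Tenant", "acme")
	h.Add("X-Tag", "priority")
	h.Add("X-Tag", "express")

	cloned := h.Clone()
	cloned.Set("X-Tenant", "globex") // does not touch h
	cloned.Add("X-Tag", "fragile")   // does not touch h

	fmt.Println(h.Get("X-Tenant"), len(h["X-Tag"]))           // acme 2
	fmt.Println(cloned.Get("X-Tenant"), len(cloned["X-Tag"])) // globex 3
	fmt.Println(h.Get("x-tenant") == "")                      // true: keys are matched exactly in this sketch
}
```

Copying each value slice is what makes the clone "deep": copying only the map would leave both headers pointing at the same backing arrays.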
Acker
Acker is the minimal acknowledgment interface for transports that support at-least-once delivery:
type Acker interface {
Ack() error
Nak() error
}
Two extended interfaces add more control:
// Redeliver after a delay.
type DelayedNaker interface {
Acker
NakWithDelay(d time.Duration) error
}
// Permanently reject -- the message will not be redelivered.
type Terminator interface {
Acker
Term() error
}
Message[T] exposes these through convenience methods that gracefully degrade:
| Method | Has Acker | Has DelayedNaker | Has Terminator |
|---|---|---|---|
| msg.Ack() | calls Ack() | calls Ack() | calls Ack() |
| msg.Nak() | calls Nak() | calls Nak() | calls Nak() |
| msg.NakWithDelay(d) | falls back to Nak() | calls NakWithDelay(d) | falls back to Nak() |
| msg.Term() | falls back to Ack() | falls back to Ack() | calls Term() |
If the message has no acker at all (fire-and-forget transports), every method is a no-op.
Use msg.HasAcker() to check at runtime whether acknowledgment is available.
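The fallback rules in the table can be sketched with Go type assertions. This is an illustrative reconstruction under stated assumptions, not goflux's source: the Message type is a trimmed local stand-in, and basicAcker is a hypothetical acker that implements only the minimal interface.

```go
package main

import (
	"fmt"
	"time"
)

type Acker interface {
	Ack() error
	Nak() error
}

type DelayedNaker interface {
	Acker
	NakWithDelay(d time.Duration) error
}

type Terminator interface {
	Acker
	Term() error
}

// Message is a trimmed stand-in; the real type also carries a subject and headers.
type Message[T any] struct {
	Payload T
	acker   Acker
}

// HasAcker reports whether acknowledgment is available at all.
func (m Message[T]) HasAcker() bool { return m.acker != nil }

// Ack and Nak are no-ops when the message has no acker.
func (m Message[T]) Ack() error {
	if m.acker == nil {
		return nil
	}
	return m.acker.Ack()
}

func (m Message[T]) Nak() error {
	if m.acker == nil {
		return nil
	}
	return m.acker.Nak()
}

// NakWithDelay uses the extended interface when present, otherwise plain Nak.
func (m Message[T]) NakWithDelay(d time.Duration) error {
	if dn, ok := m.acker.(DelayedNaker); ok {
		return dn.NakWithDelay(d)
	}
	return m.Nak()
}

// Term uses the extended interface when present, otherwise falls back to Ack.
func (m Message[T]) Term() error {
	if t, ok := m.acker.(Terminator); ok {
		return t.Term()
	}
	return m.Ack()
}

// basicAcker implements only Acker and records the calls it receives.
type basicAcker struct{ calls []string }

func (a *basicAcker) Ack() error { a.calls = append(a.calls, "Ack"); return nil }
func (a *basicAcker) Nak() error { a.calls = append(a.calls, "Nak"); return nil }

func main() {
	a := &basicAcker{}
	msg := Message[string]{Payload: "hello", acker: a}
	_ = msg.NakWithDelay(time.Second) // degrades to Nak
	_ = msg.Term()                    // degrades to Ack
	fmt.Println(a.calls)              // [Nak Ack]

	bare := Message[string]{Payload: "hello"} // fire-and-forget: no acker
	fmt.Println(bare.HasAcker(), bare.Term()) // false <nil>
}
```

The comma-ok form of the type assertion is safe on a nil interface value, which is why the no-acker case needs no special handling in NakWithDelay and Term.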
AutoAck Middleware
For handlers that should always ack on success and nak on error, use the AutoAck middleware instead of manual calls:
wrapped := goflux.AutoAck[OrderEvent]()(handler)
Topic[T]
Topic[T] bundles a Publisher[T] and Subscriber[T] for services that need both:
type Topic[T any] struct {
Publisher[T]
Subscriber[T]
}
topic := goflux.Topic[OrderEvent]{
Publisher: pub,
Subscriber: sub,
}
// Publish through the topic.
_ = topic.Publish(ctx, "orders", event)
// Subscribe through the topic.
go func() {
_ = topic.Subscribe(ctx, "orders", handler)
}()
BoundPublisher[T]
BoundPublisher[T] wraps a Publisher[T] with a fixed subject, removing the subject parameter from Publish:
pub := gofluxnats.NewPublisher(conn, codec)
bound := goflux.Bind(pub, "orders.created")
// No subject argument needed.
err := bound.Publish(ctx, OrderEvent{ID: "42"})
Note that BoundPublisher does not implement Publisher[T] -- its Publish method has a different signature (no subject parameter).
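As a rough illustration of the idea, a bound publisher can be sketched as a thin wrapper that stores the subject once and forwards each call. The field names, the pointer return type, the delegating Close, and the in-memory recorder are all assumptions made for this example; only the Publisher[T] interface and the shape of Bind and Publish come from the text above.

```go
package main

import (
	"context"
	"fmt"
)

type Publisher[T any] interface {
	Publish(ctx context.Context, subject string, v T) error
	Close() error
}

// BoundPublisher pairs a Publisher with one fixed subject (local sketch).
type BoundPublisher[T any] struct {
	pub     Publisher[T]
	subject string
}

// Bind fixes the subject once so call sites pass only the payload.
func Bind[T any](pub Publisher[T], subject string) *BoundPublisher[T] {
	return &BoundPublisher[T]{pub: pub, subject: subject}
}

// Publish forwards to the wrapped publisher using the stored subject.
// Its signature differs from Publisher[T].Publish, so *BoundPublisher[T]
// does not satisfy Publisher[T].
func (b *BoundPublisher[T]) Publish(ctx context.Context, v T) error {
	return b.pub.Publish(ctx, b.subject, v)
}

// Close delegates to the wrapped publisher (an assumption of this sketch).
func (b *BoundPublisher[T]) Close() error { return b.pub.Close() }

// recorder is a hypothetical in-memory Publisher that remembers what it was sent.
type recorder[T any] struct{ sent []string }

func (r *recorder[T]) Publish(_ context.Context, subject string, v T) error {
	r.sent = append(r.sent, fmt.Sprintf("%s <- %v", subject, v))
	return nil
}

func (r *recorder[T]) Close() error { return nil }

// demo publishes one value through a bound publisher and returns what was recorded.
func demo() []string {
	rec := &recorder[string]{}
	bound := Bind[string](rec, "orders.created")
	_ = bound.Publish(context.Background(), "order 42")
	return rec.sent
}

func main() {
	fmt.Println(demo()) // [orders.created <- order 42]
}
```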
Middleware[T]
Middleware wraps a handler to add cross-cutting behavior:
type Middleware[T any] func(Handler[T]) Handler[T]
Compose multiple middleware with Chain. The first middleware in the list is the outermost wrapper:
wrapped := goflux.Chain[OrderEvent](
goflux.Process[OrderEvent](10), // concurrency limit
goflux.Throttle[OrderEvent](time.Second), // rate limit
goflux.AutoAck[OrderEvent](), // auto-ack on success
)(handler)
See Middleware for the full list: Process, Peek, Distinct, Skip, Take, Throttle, AutoAck.
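The "first is outermost" rule is easiest to see with a trace. The sketch below assumes Chain wraps from the last middleware to the first; the types are local stand-ins and trace is a hypothetical middleware written only to record call order.

```go
package main

import (
	"context"
	"fmt"
)

type Message[T any] struct {
	Subject string
	Payload T
}

type Handler[T any] func(ctx context.Context, msg Message[T]) error

type Middleware[T any] func(Handler[T]) Handler[T]

// Chain composes middleware so the first argument ends up outermost.
// It wraps the handler starting from the last middleware.
func Chain[T any](mws ...Middleware[T]) Middleware[T] {
	return func(next Handler[T]) Handler[T] {
		for i := len(mws) - 1; i >= 0; i-- {
			next = mws[i](next)
		}
		return next
	}
}

// trace records when a middleware is entered and when it is left.
func trace[T any](name string, log *[]string) Middleware[T] {
	return func(next Handler[T]) Handler[T] {
		return func(ctx context.Context, msg Message[T]) error {
			*log = append(*log, name+" in")
			err := next(ctx, msg)
			*log = append(*log, name+" out")
			return err
		}
	}
}

// run sends one message through Chain(first, second)(handler)
// and returns the order in which each layer was called.
func run() []string {
	var log []string
	handler := func(ctx context.Context, msg Message[string]) error {
		log = append(log, "handler")
		return nil
	}
	wrapped := Chain(
		trace[string]("first", &log),
		trace[string]("second", &log),
	)(handler)
	_ = wrapped(context.Background(), Message[string]{Subject: "orders", Payload: "42"})
	return log
}

func main() {
	fmt.Println(run()) // [first in second in handler second out first out]
}
```

Applied to the example above, this ordering means the concurrency limit sees a message before the rate limit does, and the auto-ack layer sits closest to the handler.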
Key Design Rules
Rules to Remember
- Subscribe and Serve block. Always run them in a goroutine.
- Caller owns connections. Transport constructors for NATS, JetStream, and HTTP accept an existing connection. The caller connects and closes.
- Close semantics vary. Close() on FanOut, FanIn, RoundRobin, and channel types is a no-op. NATS and JetStream transports call conn.Drain(). Check the transport documentation.
- No raw bytes in handlers. Message[T] always carries the fully decoded payload. Decoding happens at the transport boundary.
- Codecs are stateless. Share them freely across publishers and subscribers.
- Ack methods degrade gracefully. Calling Ack() on a fire-and-forget message is a no-op, not a panic.
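The first rule, that Subscribe blocks and belongs in a goroutine, pairs naturally with a way to wait for the subscriber to exit after cancellation. The following is a minimal sketch using a hypothetical in-memory subscriber (chanSubscriber) that honours the blocking contract; it stands in for a real transport and is not part of goflux.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

type Message[T any] struct {
	Subject string
	Payload T
}

type Handler[T any] func(ctx context.Context, msg Message[T]) error

// chanSubscriber is a hypothetical in-memory subscriber fed by a channel.
type chanSubscriber[T any] struct{ in chan T }

// Subscribe blocks until ctx is cancelled, dispatching each value to handler.
func (s *chanSubscriber[T]) Subscribe(ctx context.Context, subject string, handler Handler[T]) error {
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case v := <-s.in:
			// Fire-and-forget in this sketch: handler errors are dropped.
			_ = handler(ctx, Message[T]{Subject: subject, Payload: v})
		}
	}
}

// run starts Subscribe in a goroutine, delivers n messages, cancels the
// context, and waits for Subscribe to return.
func run(n int) (handled int, exitErr error) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	sub := &chanSubscriber[int]{in: make(chan int)}
	got := make(chan int)
	done := make(chan error, 1) // buffered so the goroutine never leaks on exit

	go func() {
		done <- sub.Subscribe(ctx, "orders.created", func(ctx context.Context, msg Message[int]) error {
			got <- msg.Payload
			return nil
		})
	}()

	for i := 0; i < n; i++ {
		sub.in <- i
		<-got
		handled++
	}

	cancel()               // ask the subscriber to stop
	return handled, <-done // wait until Subscribe has actually returned
}

func main() {
	handled, err := run(3)
	fmt.Println(handled, errors.Is(err, context.Canceled)) // 3 true
}
```

Collecting the return value through a channel lets the caller distinguish a normal shutdown (context.Canceled in this sketch) from a fatal transport error before closing the underlying connection.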
What's Next
- Fire & Forget -- the simplest messaging pattern
- At-Least-Once -- acknowledgment-based delivery with JetStream
- Transports -- choose and configure a transport
- Middleware -- wrap handlers with cross-cutting behavior
- Pipeline -- wire subscribers to publishers with filtering and transformation
