Pipeline Operators
Pipeline operators wire subscribers to publishers, composing Handler[T] and Publisher[T] into message-processing topologies. They handle filtering, transformation, fan-out, fan-in, and round-robin distribution.
Pipe
func Pipe[T any](pub Publisher[T], opts ...PipeOption[T]) Handler[T]Returns a Handler[T] that forwards every accepted message to pub, preserving the original subject. Filters run first; a dropped message never reaches the publisher. Publish errors are returned to the subscriber as-is.
// Forward all order events from sub to pub
err := sub.Subscribe(ctx, "orders.>", goflux.Pipe[OrderEvent](pub))PipeMap
func PipeMap[T, U any](pub Publisher[U], mapFn MapFunc[T, U], opts ...PipeOption[T]) Handler[T]Type-transforming pipe. Maps each Message[T] to a Message[U] before publishing. Filters run on T before the map. A map error routes the original Message[T] to the dead-letter handler (if set) and drops the message -- map errors are non-fatal to the subscriber.
mapFn := func(ctx context.Context, msg goflux.Message[OrderEvent]) (goflux.Message[Invoice], error) {
inv := Invoice{OrderID: msg.Payload.ID, Amount: msg.Payload.Total}
return goflux.NewMessage(msg.Subject, inv), nil
}
err := sub.Subscribe(ctx, "orders.created", goflux.PipeMap[OrderEvent, Invoice](
invoicePub,
mapFn,
))Options
WithFilter
func WithFilter[T any](f Filter[T]) PipeOption[T]Registers a filter that runs before publish (or before map in PipeMap). Messages for which the filter returns false or an error are silently dropped and logged.
type Filter[T any] func(ctx context.Context, msg Message[T]) (bool, error)Multiple filters can be stacked -- they evaluate in order and short-circuit on the first false or error.
WithDeadLetter
func WithDeadLetter[T any](fn DeadLetterFunc[T]) PipeOption[T]Registers a dead-letter handler called when MapFunc returns an error or when the publisher fails. The original Message[T] and the terminal error are passed to the handler.
type DeadLetterFunc[T any] func(ctx context.Context, msg Message[T], err error)Combined Example
handler := goflux.PipeMap[RawEvent, CleanEvent](
cleanPub,
transformFn,
goflux.WithFilter[RawEvent](func(ctx context.Context, msg goflux.Message[RawEvent]) (bool, error) {
return msg.Payload.Valid, nil
}),
goflux.WithDeadLetter[RawEvent](func(ctx context.Context, msg goflux.Message[RawEvent], err error) {
slog.ErrorContext(ctx, "dead letter",
slog.String("subject", msg.Subject),
slog.Any("error", err),
)
}),
)
err := sub.Subscribe(ctx, "events.raw", handler)FanOut
func FanOut[T any](publishers []Publisher[T], opts ...FanOutOption[T]) Publisher[T]Returns a Publisher[T] that broadcasts every Publish call to all inner publishers. By default, errors from individual publishers are joined via errors.Join (best-effort). With WithFanOutAllOrNothing, the first error short-circuits.
broadcast := goflux.FanOut[Event]([]goflux.Publisher[Event]{pubA, pubB, pubC})
err := broadcast.Publish(ctx, "events.order", event)With all-or-nothing semantics:
broadcast := goflux.FanOut[Event](
[]goflux.Publisher[Event]{pubA, pubB},
goflux.WithFanOutAllOrNothing[Event](),
)FanIn
func FanIn[T any](subscribers ...Subscriber[T]) Subscriber[T]Returns a Subscriber[T] that subscribes to the same subject on all provided subscribers and dispatches every message to a single handler. Subscribe blocks until all inner subscriptions complete.
merged := goflux.FanIn[Event](subNATS, subHTTP)
err := merged.Subscribe(ctx, "events.>", handler)RoundRobin
func RoundRobin[T any](publishers ...Publisher[T]) Publisher[T]Returns a Publisher[T] that distributes each Publish call to a single inner publisher, cycling through them via an atomic counter.
loadBalanced := goflux.RoundRobin[Event](pub1, pub2, pub3)
// First call goes to pub1, second to pub2, third to pub3, fourth to pub1, ...
err := loadBalanced.Publish(ctx, "events.order", event)BoundPublisher
func Bind[T any](pub Publisher[T], subject string) *BoundPublisher[T]Wraps a Publisher[T] with a fixed subject. Useful when a component always publishes to the same destination and the subject should not leak into business logic.
orderPub := goflux.Bind(pub, "orders.created")
// No subject argument needed
err := orderPub.Publish(ctx, event)BoundPublisher exposes Publish(ctx, v) (no subject parameter) and delegates Close() to the underlying publisher.
ToChan
func ToChan[T any](ctx context.Context, sub Subscriber[T], subject string, bufSize int) <-chan TBridges a Subscriber[T] into a plain Go channel. Launches Subscribe in a goroutine and forwards each message payload into a buffered channel. The returned channel closes when ctx is cancelled.
ch := goflux.ToChan(ctx, sub, "orders.>", 16)
for event := range ch {
fmt.Println("order:", event.ID)
}Ownership
Close() on FanOut, FanIn, and RoundRobin is a no-op. The caller owns the inner publishers and subscribers and is responsible for closing them.
