
API Reference

Quick reference for all goflux types, interfaces, and operators.

Transport Support Matrix

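| Interface | Channel | NATS | JetStream | HTTP |
|---|---|---|---|---|
| `Publisher[T]` | yes | yes | yes | yes |
| `Subscriber[T]` | yes | yes | yes | yes |
| `Requester[Req, Resp]` | - | yes | - | yes |
| `Responder[Req, Resp]` | - | yes | - | yes |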

Messaging Patterns

| Pattern | Core Interface(s) | Transport(s) |
|---|---|---|
| Fire and Forget | Publisher + Subscriber, acker=nil | channel, nats |
| At-Least-Once (push) | Publisher + Subscriber, auto-ack | jetstream |
| At-Least-Once (pull) | Publisher + Subscriber + WithManualAck, middleware ack | jetstream |
| Stream Processing | bridge.ToStream + goflow operators | any |
| Manual Ack/Nak/Term | Subscriber + WithManualAck() | jetstream |
| Exactly-Once | Publisher with Nats-Msg-Id header | jetstream |
| Request-Reply | Requester + Responder | nats, http |
| Queue Groups | Subscriber + WithQueueGroup | nats |
| Fan-out | bridge.ToStream + goflow Tee / FanOut | any |
| Fan-in | bridge.ToStream + goflow FanIn | any |

Core Types

Interfaces

```go
// Publisher sends encoded messages to a subject/topic.
type Publisher[T any] interface {
    Publish(ctx context.Context, subject string, v T) error
    Close() error
}

// Subscriber listens on a subject and dispatches decoded messages to a Handler.
// Subscribe blocks until ctx is cancelled or a fatal error occurs.
type Subscriber[T any] interface {
    Subscribe(ctx context.Context, subject string, handler Handler[T]) error
    Close() error
}

// Requester sends a typed request and waits for a typed response.
type Requester[Req, Resp any] interface {
    Request(ctx context.Context, subject string, req Req) (Resp, error)
    Close() error
}

// Responder handles incoming requests and produces typed responses.
type Responder[Req, Resp any] interface {
    Serve(ctx context.Context, subject string, handler RequestHandler[Req, Resp]) error
    Close() error
}
```

Handler and RequestHandler

```go
// Handler is the callback for Subscriber.Subscribe.
type Handler[T any] func(ctx context.Context, msg Message[T]) error

// RequestHandler processes a request and returns a response.
type RequestHandler[Req, Resp any] func(ctx context.Context, req Req) (Resp, error)
```

Message[T]

```go
type Message[T any] struct {
    Subject string
    Payload T
    Header  Header
}
```

| Method | Description |
|---|---|
| `NewMessage(subject, payload)` | Create a message |
| `NewMessageWithHeader(subject, payload, header)` | Create a message with header |
| `msg.WithAcker(a)` | Attach an acker (transport use only) |
| `msg.HasAcker()` | Check if acker is attached |
| `msg.Ack()` | Acknowledge successful processing |
| `msg.Nak()` | Signal processing failure (redelivery) |
| `msg.NakWithDelay(d)` | Nak with redelivery delay hint |
| `msg.Term()` | Terminal rejection (no redelivery) |

```go
type Header map[string][]string
```

| Method | Description |
|---|---|
| `h.Get(key)` | First value for key, or `""` |
| `h.Set(key, value)` | Replace all values for key |
| `h.Add(key, value)` | Append a value for key |
| `h.Del(key)` | Remove key |
| `h.Clone()` | Deep copy |
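
The header semantics in the table can be sketched with a local stand-in type. This is an illustration of the described behavior, not the goflux implementation. In particular, it is an assumption that keys are used exactly as given, with no case normalization.

```go
package main

import "fmt"

// Header is a local stand-in with the same underlying type as the documented one.
type Header map[string][]string

// Get returns the first value for key, or "" when the key is absent.
func (h Header) Get(key string) string {
	if vs := h[key]; len(vs) > 0 {
		return vs[0]
	}
	return ""
}

// Set replaces all values for key with a single value.
func (h Header) Set(key, value string) { h[key] = []string{value} }

// Add appends a value to any existing values for key.
func (h Header) Add(key, value string) { h[key] = append(h[key], value) }

// Del removes key and all of its values.
func (h Header) Del(key string) { delete(h, key) }

// Clone returns a deep copy: value slices are copied, not shared.
func (h Header) Clone() Header {
	out := make(Header, len(h))
	for k, vs := range h {
		out[k] = append([]string(nil), vs...)
	}
	return out
}

func main() {
	h := Header{}
	h.Add("X-Trace", "a")
	h.Add("X-Trace", "b")

	c := h.Clone()
	c.Set("X-Trace", "z") // replaces both values, in the copy only

	fmt.Println(h.Get("X-Trace"), len(h["X-Trace"])) // a 2
	fmt.Println(c.Get("X-Trace"), len(c["X-Trace"])) // z 1

	h.Del("X-Trace")
	fmt.Printf("%q\n", h.Get("X-Trace")) // ""
}
```

Because `Clone` copies each value slice, mutating the copy leaves the original header untouched, which matters when one message is forwarded to several publishers.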

Acknowledgment Interfaces

```go
// Acker -- minimal acknowledgment (Ack + Nak).
type Acker interface {
    Ack() error
    Nak() error
}

// DelayedNaker -- extends Acker with delayed negative acknowledgment.
type DelayedNaker interface {
    Acker
    NakWithDelay(d time.Duration) error
}

// Terminator -- extends Acker with terminal rejection.
type Terminator interface {
    Acker
    Term() error
}
```

Convenience Types

| Type | Description |
|---|---|
| `Topic[T]` | Bundles `Publisher[T]` + `Subscriber[T]` |
| `BoundPublisher[T]` | Wraps `Publisher[T]` with a fixed subject |
| `Middleware[T]` | `func(Handler[T]) Handler[T]` |
| `PublisherMiddleware[T]` | `func(Publisher[T]) Publisher[T]` |
| `pipe.Filter[T]` | `func(ctx, msg Message[T]) bool` |
| `pipe.MapFunc[T, U]` | `func(ctx, msg Message[T]) (U, error)` |
| `pipe.FlatMapFunc[T, U]` | `func(ctx, msg Message[T]) ([]U, error)` |
| `pipe.DeadLetterFunc[T]` | `func(ctx, msg Message[T], err error)` |

Context Helpers

| Function | Description |
|---|---|
| `WithMessageID(ctx, id)` | Attach business-level message ID |
| `MessageID(ctx)` | Read message ID from context |
| `WithHeader(ctx, h)` | Attach header to context (read by transports on Publish) |
| `HeaderFromContext(ctx)` | Read header from context |

Middleware

All middleware shares the signature `Middleware[T] = func(Handler[T]) Handler[T]`.

Built-in implementations live in the github.com/foomo/goflux/middleware package. The Middleware[T] type and Chain[T] remain in the root package.

| Middleware | Package | Signature | Description |
|---|---|---|---|
| `AutoAck[T]()` | middleware | `-> Middleware[T]` | Ack on nil error, Nak on non-nil |
| `RetryAck[T](policy)` | middleware | `RetryPolicy -> Middleware[T]` | Classify errors into ack/nak/term via policy |
| `InjectMessageID[T]()` | middleware | `-> Middleware[T]` | Header message ID → context |
| `InjectHeader[T]()` | middleware | `-> Middleware[T]` | Message header → context |
| `ForwardMessageID[T]()` | middleware | `-> Middleware[T]` | Forward message ID from context through pipe stages |
| `Chain[T](mws...)` | goflux | `...Middleware[T] -> Middleware[T]` | Compose left-to-right |

Stream Processing Operators

For concurrency limiting, deduplication, throttling, skip/take, and peek, use goflow operators via bridge.ToStream.

See Middleware for details and examples.

Pipeline Operators

| Operator | Description |
|---|---|
| `pipe.New[T](pub, opts...)` | Forward messages to a publisher |
| `pipe.NewMap[T, U](pub, mapFn, opts...)` | Transform and forward |
| `pipe.NewFlatMap[T, U](pub, fn, opts...)` | Expand and forward (1→N) |
| `BindPublisher[T](pub, subject)` | Fix subject on a publisher |
| `ToChan[T](ctx, sub, subject, bufSize)` | Bridge subscriber to `<-chan Message[T]` |
| `bridge.ToStream[T](ctx, sub, subject, bufSize)` | Bridge subscriber to `goflow.Stream[Message[T]]` |
| `bridge.FromStream[T](stream, pub)` | Consume goflow stream and publish each message |
| `RetryPublisher[T](pub, maxAttempts, backoff)` | Wrap publisher with retry logic |

Pipe Options

| Option | Description |
|---|---|
| `pipe.WithFilter[T](f)` | Skip messages where filter returns false |
| `pipe.WithDeadLetter[T](fn)` | Observe failed messages (does not swallow error) |
| `pipe.WithMiddleware[T](mw...)` | Wrap pipe handler with middleware |
| `pipe.WithMapFilter[T, U](f)` | Filter for map/flatmap pipes |
| `pipe.WithMapDeadLetter[T, U](fn)` | Dead-letter observer for map/flatmap pipes |
| `pipe.WithMapMiddleware[T, U](mw...)` | Middleware for map/flatmap pipes |

See Pipeline Operators for details and examples.

Group Coordination

```go
// Group coordinates the lifecycle of multiple message handlers.
// Fail-fast: first error cancels all sibling tasks.
type Group struct{ /* ... */ }
```

| Function / Method | Description |
|---|---|
| `NewGroup(opts...)` | Create a lifecycle coordinator |
| `WithGroupOptions(opts...)` | Pass `gofuncy.GroupOption` to the underlying group |
| `g.Go(name, fn)` | Register a named blocking task |
| `g.GoWithReady(name, fn)` | Register a task that signals readiness via `ready()` |
| `g.Run(ctx)` | Start all tasks, block until done (fail-fast) |
| `g.RunWithReady(ctx, onReady)` | Start all tasks, call `onReady` when all are ready |
| `GroupSubscribe[T](g, name, sub, subject, handler, mws...)` | Register a Subscriber handler on the group |

```go
g := goflux.NewGroup()

goflux.GroupSubscribe(g, "orders", orderSub, "orders.new", orderHandler)

err := g.Run(ctx) // blocks, fail-fast on first error
```

RetryPublisher

```go
func RetryPublisher[T any](pub Publisher[T], maxAttempts int, backoff BackoffFunc) Publisher[T]

type BackoffFunc func(attempt int) time.Duration
```

Wraps a Publisher[T] with retry logic. On publish failure, retries up to maxAttempts times with delays from backoff. Context cancellation aborts immediately.

```go
pub := goflux.RetryPublisher[Event](innerPub, 3, func(attempt int) time.Duration {
    return time.Duration(attempt+1) * time.Second // linear backoff
})
```

Sentinel Errors

| Error | Description |
|---|---|
| `ErrPublish` | Failure in the publish path |
| `ErrSubscribe` | Failure in the subscribe path |
| `ErrEncode` | Serialization failure |
| `ErrDecode` | Deserialization failure |
| `ErrTransport` | Transport-level failure (network, protocol) |

Transports join these with causal errors via errors.Join, enabling:

```go
if errors.Is(err, goflux.ErrEncode) { /* handle codec problem */ }
```

Retry Policy (middleware package)

```go
type RetryPolicy func(err error) RetryDecision

type RetryDecision struct {
    Action RetryAction
    Delay  time.Duration
}
```

| Value | Package | Description |
|---|---|---|
| `RetryNak` | middleware | Immediate redelivery via `Nak()` |
| `RetryNakWithDelay` | middleware | Delayed redelivery via `NakWithDelay(d)` |
| `RetryTerm` | middleware | Terminal rejection via `Term()` |

| Function | Package | Description |
|---|---|---|
| `NewRetryPolicy(delay)` | middleware | Term non-retryable, nak-with-delay otherwise |
| `RetryAck[T](policy)` | middleware | Middleware that applies retry policy to ack decisions |
| `ErrNonRetryable` | goflux | Sentinel for permanent failures |
| `NonRetryable(err)` | goflux | Wrap error as non-retryable |
| `IsNonRetryable(err)` | goflux | Check if error is non-retryable |

See Stream Processing for details and examples.

Telemetry

| Function / Method | Description |
|---|---|
| `NewTelemetry(opts...)` | Create instrumentation instance |
| `DefaultTelemetry(tel)` | Return `tel` if non-nil, else create from OTel globals |
| `NewNoopTelemetry()` | Create noop instance (safe calls, no output) |
| `WithTracerProvider(tp)` | Set tracer provider |
| `WithMeterProvider(mp)` | Set meter provider |
| `WithPropagator(p)` | Set text-map propagator |
| `RecordPublish(ctx, subject, system, fn)` | Producer span |
| `RecordProcess(ctx, subject, system, fn, opts...)` | Consumer span |
| `RecordRequest(ctx, subject, system, fn)` | Request-reply client span |
| `RecordAckOutcome(ctx, action, subject, err)` | Record ack/nak/term outcome |
| `RegisterLag(subject, lagFn)` | Observable gauge |
| `InjectContext(ctx, carrier)` | Inject span context (publish side) |
| `ExtractContext(ctx, carrier)` | Extract as parent (sync transports) |
| `ExtractSpanContext(ctx, carrier)` | Extract as link (async transports) |
| `WithRemoteSpanContext(sc)` | Attach producer span as link |

See Telemetry for details.