Skip to content

API Reference

Quick reference for all goflux types, interfaces, and operators.

Transport Support Matrix

InterfaceChannelNATSJetStreamHTTP
Publisher[T]yesyesyesyes
Subscriber[T]yesyesyesyes
Consumer[T]--yes-
Requester[Req, Resp]-yes-yes
Responder[Req, Resp]-yes-yes

Messaging Patterns

PatternCore Interface(s)Transport(s)
Fire and ForgetPublisher + Subscriber, acker=nilchannel, nats
At-Least-Once (push)Publisher + Subscriber, auto-ackjetstream
At-Least-Once (pull)Publisher + Consumer, explicit ackjetstream
Manual Ack/Nak/TermSubscriber + WithManualAck()jetstream
Exactly-OncePublisher with Nats-Msg-Id headerjetstream
Request-ReplyRequester + Respondernats, http
Queue GroupsSubscriber + WithQueueGroupnats
Fan-outFanOut operatorany
Fan-inFanIn operatorany

Core Types

Interfaces

go
// Publisher sends encoded messages to a subject/topic.
type Publisher[T any] interface {
    Publish(ctx context.Context, subject string, v T) error
    Close() error
}

// Subscriber listens on a subject and dispatches decoded messages to a Handler.
// Subscribe blocks until ctx is cancelled or a fatal error occurs.
type Subscriber[T any] interface {
    Subscribe(ctx context.Context, subject string, handler Handler[T]) error
    Close() error
}

// Consumer provides pull-based message consumption.
// Each fetched message MUST be explicitly acknowledged.
type Consumer[T any] interface {
    Fetch(ctx context.Context, n int) ([]Message[T], error)
    Close() error
}

// Requester sends a typed request and waits for a typed response.
type Requester[Req, Resp any] interface {
    Request(ctx context.Context, subject string, req Req) (Resp, error)
    Close() error
}

// Responder handles incoming requests and produces typed responses.
type Responder[Req, Resp any] interface {
    Serve(ctx context.Context, subject string, handler RequestHandler[Req, Resp]) error
    Close() error
}

Handler and RequestHandler

go
// Handler is the callback for Subscriber.Subscribe.
type Handler[T any] func(ctx context.Context, msg Message[T]) error

// RequestHandler processes a request and returns a response.
type RequestHandler[Req, Resp any] func(ctx context.Context, req Req) (Resp, error)

Message[T]

go
type Message[T any] struct {
    Subject string
    Payload T
    Header  Header
}
MethodDescription
NewMessage(subject, payload)Create a message
NewMessageWithHeader(subject, payload, header)Create a message with header
msg.WithAcker(a)Attach an acker (transport use only)
msg.HasAcker()Check if acker is attached
msg.Ack()Acknowledge successful processing
msg.Nak()Signal processing failure (redelivery)
msg.NakWithDelay(d)Nak with redelivery delay hint
msg.Term()Terminal rejection (no redelivery)
go
type Header map[string][]string
MethodDescription
h.Get(key)First value for key, or ""
h.Set(key, value)Replace all values for key
h.Add(key, value)Append a value for key
h.Del(key)Remove key
h.Clone()Deep copy

Acknowledgment Interfaces

go
// Acker -- minimal acknowledgment (Ack + Nak).
type Acker interface {
    Ack() error
    Nak() error
}

// DelayedNaker -- extends Acker with delayed negative acknowledgment.
type DelayedNaker interface {
    Acker
    NakWithDelay(d time.Duration) error
}

// Terminator -- extends Acker with terminal rejection.
type Terminator interface {
    Acker
    Term() error
}

Convenience Types

TypeDescription
Topic[T]Bundles Publisher[T] + Subscriber[T]
BoundPublisher[T]Wraps Publisher[T] with a fixed subject
Middleware[T]func(Handler[T]) Handler[T]
Filter[T]func(ctx, msg Message[T]) (bool, error)
MapFunc[T, U]func(ctx, msg Message[T]) (Message[U], error)
DeadLetterFunc[T]func(ctx, msg Message[T], err error)

Context Helpers

FunctionDescription
WithMessageID(ctx, id)Attach business-level message ID
MessageID(ctx)Read message ID from context
WithHeader(ctx, h)Attach header to context (read by transports on Publish)
HeaderFromContext(ctx)Read header from context

Middleware

All middleware have the signature Middleware[T] = func(Handler[T]) Handler[T].

MiddlewareSignatureDescription
Process[T](n)int -> Middleware[T]Concurrency limiter (semaphore, blocks when full)
Peek[T](fn)func(ctx, msg) -> Middleware[T]Side-effect tap, errors ignored
Distinct[T](key)func(msg) string -> Middleware[T]Dedup by key (unbounded set, first wins)
Skip[T](n)int -> Middleware[T]Drop first n, pass rest
Take[T](n)int -> Middleware[T]Pass first n, drop rest
Throttle[T](d)time.Duration -> Middleware[T]Rate-limit to one per duration
AutoAck[T]()-> Middleware[T]Ack on nil error, Nak on non-nil
Chain[T](mws...)...Middleware[T] -> Middleware[T]Compose left-to-right

See Middleware for details and examples.

Pipeline Operators

OperatorDescription
Pipe[T](pub, opts...)Forward messages to a publisher
PipeMap[T, U](pub, mapFn, opts...)Transform and forward
FanOut[T](pubs, opts...)Broadcast to N publishers
FanIn[T](subs...)Merge N subscribers into one handler
RoundRobin[T](pubs...)Cycle through publishers
Bind[T](pub, subject)Fix subject on a publisher
ToChan[T](ctx, sub, subject, bufSize)Bridge subscriber to channel

Pipe Options

OptionDescription
WithFilter[T](f)Drop messages where filter returns false
WithDeadLetter[T](fn)Handle failed messages

FanOut Options

OptionDescription
WithFanOutAllOrNothing[T]()First error short-circuits

See Pipeline Operators for details and examples.

Telemetry

Function / MethodDescription
NewTelemetry(opts...)Create instrumentation instance
WithTracerProvider(tp)Set tracer provider
WithMeterProvider(mp)Set meter provider
WithPropagator(p)Set text-map propagator
RecordPublish(ctx, subject, system, fn)Producer span
RecordProcess(ctx, subject, system, fn, opts...)Consumer span
RecordFetch(ctx, subject, system, count, fn)Pull consumer span
RecordRequest(ctx, subject, system, fn)Request-reply client span
RegisterLag(subject, lagFn)Observable gauge
InjectContext(ctx, carrier)Inject span context (publish side)
ExtractContext(ctx, carrier)Extract as parent (sync transports)
ExtractSpanContext(ctx, carrier)Extract as link (async transports)
WithRemoteSpanContext(sc)Attach producer span as link

See Telemetry for details.