Pull Consumer
JetStream pull consumers work through the same Subscriber[T] interface as push consumers. The nats.go library's jetstream.Consumer.Consume() method handles both push and pull delivery modes -- goflux's Subscriber[T] wraps this uniformly. The difference is in the JetStream consumer configuration, not the goflux API.
How It Works
A JetStream consumer configured with AckPolicy: jetstream.AckExplicitPolicy operates in pull mode. The goflux Subscriber[T] calls consumer.Consume() internally, which works for both push and pull consumers. Combine this with middleware for concurrency control, auto-ack, and retry policies.
JetStream Pull Consumer with Middleware
package main
import (
"context"
"fmt"
"log"
"time"
json "github.com/foomo/goencode/json/v1"
"github.com/foomo/goflux"
"github.com/foomo/goflux/middleware"
gofluxjs "github.com/foomo/goflux/transport/jetstream"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
)
type Task struct {
ID string
Payload string
}
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
nc, _ := nats.Connect(nats.DefaultURL)
defer nc.Drain()
js, _ := jetstream.New(nc)
stream, _ := js.CreateOrUpdateStream(ctx, jetstream.StreamConfig{
Name: "TASKS",
Subjects: []string{"tasks.>"},
})
// Pull consumer with explicit ack policy.
cons, _ := stream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
Durable: "batch-worker",
AckPolicy: jetstream.AckExplicitPolicy,
})
codec := json.NewCodec[Task]()
// Create a subscriber from the pull consumer -- same interface as push.
sub := gofluxjs.NewSubscriber[Task](cons, codec.Decode, gofluxjs.WithManualAck())
// Use ToStream to bridge into goflow for stream processing.
stream := goflux.ToStream[Task](ctx, sub, "tasks.>", 16)
// Process with bounded concurrency via goflow.
policy := middleware.NewRetryPolicy(5 * time.Second)
if err := stream.Process(5, func(ctx context.Context, msg goflux.Message[Task]) error {
fmt.Printf("processing task %s\n", msg.Payload.ID)
err := processTask(msg.Payload)
if err != nil {
d := policy(err)
switch d.Action {
case middleware.RetryNak:
return msg.Nak()
case middleware.RetryNakWithDelay:
return msg.NakWithDelay(d.Delay)
case middleware.RetryTerm:
return msg.Term()
}
}
return msg.Ack()
}); err != nil {
log.Fatal(err)
}
}
func processTask(_ Task) error { return nil }Processing Patterns
Stream-based (goflow)
Use ToStream + goflow Process for bounded concurrency with stream operators:
stream := goflux.ToStream[Task](ctx, sub, "tasks.>", 16)
stream.Process(10, func(ctx context.Context, msg goflux.Message[Task]) error {
err := myHandler(ctx, msg)
if err != nil {
return msg.Nak()
}
return msg.Ack()
})Handler-based (middleware)
Use Chain + middleware for simple auto-ack or retry-aware ack:
// Simple auto-ack.
handler := goflux.Chain[Task](
middleware.AutoAck[Task](),
)(myHandler)
// Retry-aware ack with custom policy.
policy := middleware.RetryPolicy(func(err error) middleware.RetryDecision {
if goflux.IsNonRetryable(err) {
return middleware.RetryDecision{Action: middleware.RetryTerm}
}
return middleware.RetryDecision{Action: middleware.RetryNakWithDelay, Delay: 5 * time.Second}
})
handler := goflux.Chain[Task](
middleware.RetryAck[Task](policy),
)(myHandler)Key Rules
- Use
WithManualAck(). When composing ack behavior via middleware (AutoAckorRetryAck), create the subscriber withWithManualAck()to prevent the transport from double-acking. - Decode failures are terminated. The subscriber automatically calls
Term()on messages that fail codec decoding, preventing infinite redelivery of malformed data. - Caller owns the connection.
Subscriber.Close()is a no-op. The caller is responsible for draining the NATS connection.
When to Use
- Backpressure control -- the consumer configuration controls delivery rate instead of being overwhelmed by unbounded push delivery.
- Worker-paced consumption -- useful when processing time varies widely and you want bounded concurrency.
- Policy-driven retry -- classify errors into retry/term without embedding ack logic in handlers.
For push-based delivery with automatic acknowledgment, see at-least-once.
