Fan-Out & Fan-In
Fan-out, fan-in, and round-robin patterns are provided by goflow. Use the bridge.ToStream function to convert a goflux Subscriber[T] into a goflow.Stream and apply goflow operators.
Broadcast (Tee)
Tee sends every element to all output streams -- equivalent to broadcasting a message to multiple destinations.
go
stream := bridge.ToStream[Event](ctx, sub, "events.>", 16)
// Broadcast to 3 consumers.
streams := stream.Tee(3)
// Each stream receives every message.
gofuncy.Go(ctx, func(ctx context.Context) error {
return streams[0].ForEach(func(ctx context.Context, msg goflux.Message[Event]) error {
return archiveEvent(ctx, msg.Payload)
})
})
gofuncy.Go(ctx, func(ctx context.Context) error {
return streams[1].ForEach(func(ctx context.Context, msg goflux.Message[Event]) error {
return indexEvent(ctx, msg.Payload)
})
})
streams[2].ForEach(func(ctx context.Context, msg goflux.Message[Event]) error {
return notifyEvent(ctx, msg.Payload)
})Round-Robin (FanOut)
FanOut distributes elements across output streams in round-robin order -- useful for load balancing.
go
stream := bridge.ToStream[Job](ctx, sub, "jobs.>", 16)
// Distribute across 3 worker streams.
workers := stream.FanOut(3)
for _, w := range workers {
gofuncy.Go(ctx, func(ctx context.Context) error {
return w.ForEach(func(ctx context.Context, msg goflux.Message[Job]) error {
return processJob(ctx, msg.Payload)
})
})
}Fan-In (Merge)
FanIn merges multiple streams into one. Combine with ToStream on each subscriber to merge messages from different sources.
go
streamA := bridge.ToStream[Metric](ctx, subA, "metrics.>", 16)
streamB := bridge.ToStream[Metric](ctx, subB, "metrics.>", 16)
merged := goflow.FanIn([]goflow.Stream[goflux.Message[Metric]]{streamA, streamB})
merged.ForEach(func(ctx context.Context, msg goflux.Message[Metric]) error {
fmt.Printf("%s = %.2f\n", msg.Payload.Name, msg.Payload.Value)
return nil
})Publishing Results
Use FromStream to publish processed stream elements to a publisher:
go
stream := bridge.ToStream[RawEvent](ctx, sub, "events.raw", 16)
// Filter and forward to a different transport.
filtered := stream.Filter(func(_ context.Context, msg goflux.Message[RawEvent]) bool {
return msg.Payload.Priority > 5
})
err := bridge.FromStream(filtered, dstPub)