Skip to content

Concurrency Patterns

Common concurrency patterns solved with gofuncy.

Fan-Out / Fan-In

Distribute work across multiple goroutines and collect all results.

go
package main

import (
	"context"
	"fmt"

	"github.com/foomo/gofuncy"
)

type Order struct {
	ID    int
	Total float64
}

func main() {
	ctx := context.Background()

	orderIDs := []int{101, 102, 103, 104, 105}

	// Fan out: fetch all orders concurrently (max 3 at a time)
	// Fan in: collect results in order
	orders, err := gofuncy.Map(ctx, orderIDs, func(ctx context.Context, id int) (Order, error) {
		// Simulate fetching from a database or API
		return Order{ID: id, Total: float64(id) * 9.99}, nil
	},
		gofuncy.WithLimit(3),
	)
	if err != nil {
		fmt.Println("errors:", err)
		return
	}

	for _, o := range orders {
		fmt.Printf("Order %d: $%.2f\n", o.ID, o.Total)
	}
}

Pipeline

Chain multiple transformation stages, each running concurrently.

go
package main

import (
	"context"
	"fmt"
	"strings"

	"github.com/foomo/gofuncy"
)

func main() {
	ctx := context.Background()

	raw := []string{"  hello  ", "  WORLD  ", "  GoFuncy  "}

	// Stage 1: trim whitespace
	trimmed, err := gofuncy.Map(ctx, raw, func(ctx context.Context, s string) (string, error) {
		return strings.TrimSpace(s), nil
	})
	if err != nil {
		fmt.Println("trim error:", err)
		return
	}

	// Stage 2: lowercase
	lowered, err := gofuncy.Map(ctx, trimmed, func(ctx context.Context, s string) (string, error) {
		return strings.ToLower(s), nil
	})
	if err != nil {
		fmt.Println("lowercase error:", err)
		return
	}

	fmt.Println(lowered) // [hello world gofuncy]
}

Timeout with Fallback

Attempt a primary operation with a timeout, then fall back to a cached value.

go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/foomo/gofuncy"
)

func main() {
	ctx := context.Background()

	var result string

	g := gofuncy.NewGroup(ctx,
		gofuncy.WithFailFast(),
	)

	g.Add(func(ctx context.Context) error {
		// Simulate a slow API call
		time.Sleep(2 * time.Second)
		if ctx.Err() != nil {
			return ctx.Err()
		}
		result = "fresh data from API"
		return nil
	},
		gofuncy.WithTimeout(500*time.Millisecond),
	)

	if err := g.Wait(); err != nil {
		// Timeout hit -- use fallback
		if errors.Is(err, context.DeadlineExceeded) {
			result = "cached data (fallback)"
		} else {
			fmt.Println("unexpected error:", err)
			return
		}
	}

	fmt.Println(result) // cached data (fallback)
}

Graceful Shutdown

Use context cancellation to signal all goroutines to stop.

go
package main

import (
	"context"
	"fmt"
	"os/signal"
	"syscall"
	"time"

	"github.com/foomo/gofuncy"
)

func main() {
	// Cancel on SIGINT or SIGTERM
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	g := gofuncy.NewGroup(ctx)

	for i := range 3 {
		g.Add(func(ctx context.Context) error {
			for {
				select {
				case <-ctx.Done():
					fmt.Printf("worker %d shutting down\n", i)
					return nil
				case <-time.After(500 * time.Millisecond):
					fmt.Printf("worker %d: tick\n", i)
				}
			}
		})
	}

	if err := g.Wait(); err != nil {
		fmt.Println("errors:", err)
	}

	fmt.Println("all workers stopped")
}

Batch Processing

Process a large dataset in batches with controlled concurrency.

go
package main

import (
	"context"
	"fmt"

	"github.com/foomo/gofuncy"
)

func main() {
	ctx := context.Background()

	// Simulate 100 items to process
	items := make([]int, 100)
	for i := range items {
		items[i] = i
	}

	// Process all items, 10 at a time
	err := gofuncy.All(ctx, items, func(ctx context.Context, item int) error {
		// Simulate processing
		if item%25 == 0 {
			fmt.Printf("processing item %d\n", item)
		}
		return nil
	},
		gofuncy.WithLimit(10),
	)
	if err != nil {
		fmt.Println("errors:", err)
	}

	fmt.Println("all items processed")
}

Parallel Aggregation

Run multiple data sources in parallel and aggregate results.

go
package main

import (
	"context"
	"fmt"
	"sync/atomic"

	"github.com/foomo/gofuncy"
)

func main() {
	ctx := context.Background()

	var totalUsers atomic.Int64
	var totalOrders atomic.Int64
	var totalRevenue atomic.Int64

	g := gofuncy.NewGroup(ctx)

	g.Add(func(ctx context.Context) error {
		// Fetch from users service
		totalUsers.Store(1500)
		return nil
	})

	g.Add(func(ctx context.Context) error {
		// Fetch from orders service
		totalOrders.Store(3200)
		return nil
	})

	g.Add(func(ctx context.Context) error {
		// Fetch from billing service
		totalRevenue.Store(450000)
		return nil
	})

	if err := g.Wait(); err != nil {
		fmt.Println("errors:", err)
		return
	}

	fmt.Printf("Users: %d, Orders: %d, Revenue: $%d\n",
		totalUsers.Load(), totalOrders.Load(), totalRevenue.Load())
	// Output: Users: 1500, Orders: 3200, Revenue: $450000
}