gopipe

Composable data pipelines for Go — from simple channel operations to CloudEvents message routing, with zero dependencies.

Why gopipe?

  • Progressive complexity — Start with simple channel functions, scale to full message engines
  • Type-safe generics — Full Go 1.18+ generics support throughout
  • Zero dependencies — Core packages have no external dependencies
  • CloudEvents aligned — Message package follows CloudEvents specification

Quick Start

Transform and Filter Channels

import "github.com/fxsml/gopipe/channel"

// Transform values
doubled := channel.Transform(numbers, func(n int) int {
    return n * 2
})

// Filter values
evens := channel.Filter(numbers, func(n int) bool {
    return n%2 == 0
})

// Fan-out to multiple consumers (returns []<-chan T)
outputs := channel.Broadcast(source, 2)
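
The snippets above show each call in isolation. A Go channel delivers each value to exactly one receiver, so stages are normally chained rather than pointed at the same source. The following stdlib-only sketch illustrates that chaining pattern. The names `Transform`, `Filter`, `FromSlice`, and `Collect` here are hypothetical stand-ins written for this example; they are not gopipe's implementation and their signatures may differ from the library's.

```go
package main

import "fmt"

// Transform is a hypothetical 1:1 mapping stage (illustration only,
// not gopipe's code). It closes its output when the input is closed.
func Transform[In, Out any](in <-chan In, fn func(In) Out) <-chan Out {
	out := make(chan Out)
	go func() {
		defer close(out)
		for v := range in {
			out <- fn(v)
		}
	}()
	return out
}

// Filter is a hypothetical stage that forwards only values accepted by keep.
func Filter[T any](in <-chan T, keep func(T) bool) <-chan T {
	out := make(chan T)
	go func() {
		defer close(out)
		for v := range in {
			if keep(v) {
				out <- v
			}
		}
	}()
	return out
}

// FromSlice emits every element of vals, then closes the channel.
func FromSlice[T any](vals []T) <-chan T {
	out := make(chan T)
	go func() {
		defer close(out)
		for _, v := range vals {
			out <- v
		}
	}()
	return out
}

// Collect drains a channel into a slice.
func Collect[T any](in <-chan T) []T {
	var res []T
	for v := range in {
		res = append(res, v)
	}
	return res
}

func main() {
	numbers := FromSlice([]int{1, 2, 3, 4})
	// Chain the stages: keep even numbers, then double them.
	evens := Filter(numbers, func(n int) bool { return n%2 == 0 })
	doubled := Transform(evens, func(n int) int { return n * 2 })
	fmt.Println(Collect(doubled)) // prints [4 8]
}
```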

Merge Dynamic Inputs

import "github.com/fxsml/gopipe/pipe"

merger := pipe.NewMerger[Order](pipe.MergerConfig{Buffer: 100})
merger.AddInput(webOrders)
merger.AddInput(apiOrders) // Can add more at runtime

merged, _ := merger.Merge(ctx) // second return value discarded for brevity; check it in real code
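
For readers new to the pattern, merging several inputs into one output is usually called fan-in. The sketch below shows one common way to build it with the standard library alone. The `fanIn` type, its methods, and the `source` helper are hypothetical names for this example, and unlike the library's merger this simplified version assumes every input is added before `Close` is called.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// fanIn is a hypothetical, stdlib-only fan-in sketch (not pipe.Merger).
type fanIn[T any] struct {
	out chan T
	wg  sync.WaitGroup
}

func newFanIn[T any](buffer int) *fanIn[T] {
	return &fanIn[T]{out: make(chan T, buffer)}
}

// Add starts a goroutine that forwards values from in to the shared output.
// In this sketch, all Add calls must happen before Close.
func (f *fanIn[T]) Add(in <-chan T) {
	f.wg.Add(1)
	go func() {
		defer f.wg.Done()
		for v := range in {
			f.out <- v
		}
	}()
}

// Close waits until every input is drained, then closes the output.
func (f *fanIn[T]) Close() {
	f.wg.Wait()
	close(f.out)
}

// Out returns the merged, receive-only output channel.
func (f *fanIn[T]) Out() <-chan T { return f.out }

// source returns a closed, pre-filled channel holding vals.
func source(vals ...int) <-chan int {
	ch := make(chan int, len(vals))
	for _, v := range vals {
		ch <- v
	}
	close(ch)
	return ch
}

func main() {
	m := newFanIn[int](4)
	m.Add(source(1, 2))
	m.Add(source(3, 4))
	go m.Close() // closes the output once both inputs are drained

	var got []int
	for v := range m.Out() {
		got = append(got, v)
	}
	sort.Ints(got) // arrival order across inputs is not deterministic
	fmt.Println(got) // prints [1 2 3 4]
}
```

Values from different inputs may interleave in any order, which is why the example sorts before printing.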

Route Messages by Type

import (
    "context"

    "github.com/fxsml/gopipe/message"
)

engine := message.NewEngine(message.EngineConfig{
    Marshaler: message.NewJSONMarshaler(),
})

handler := message.NewCommandHandler(
    func(ctx context.Context, cmd CreateOrder) ([]OrderCreated, error) {
        return []OrderCreated{{ID: cmd.ID}}, nil
    },
    message.CommandHandlerConfig{Source: "/orders"},
)
engine.AddHandler("orders", nil, handler)
engine.AddRawInput("input", nil, inputCh)
output, _ := engine.AddRawOutput("output", nil) // second return value discarded for brevity; check it in real code

engine.Start(ctx)
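
The idea behind a type-safe command handler can be sketched without the library: a generic wrapper decodes raw bytes into the command type, calls the typed function, and encodes the resulting events. Everything below (`rawHandler`, `typed`, the JSON field names, and the map keyed by a type name) is a hypothetical illustration under those assumptions and does not describe how gopipe's engine works internally.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Example command and event types (assumed shapes for this sketch).
type CreateOrder struct {
	ID string `json:"id"`
}

type OrderCreated struct {
	ID string `json:"id"`
}

// rawHandler works on encoded payloads, so handlers for different
// types can live in the same registry.
type rawHandler func(data []byte) ([]byte, error)

// typed wraps a typed function as a rawHandler: it unmarshals the
// command, calls fn, and marshals the returned events.
func typed[C, E any](fn func(C) ([]E, error)) rawHandler {
	return func(data []byte) ([]byte, error) {
		var cmd C
		if err := json.Unmarshal(data, &cmd); err != nil {
			return nil, err
		}
		events, err := fn(cmd)
		if err != nil {
			return nil, err
		}
		return json.Marshal(events)
	}
}

func main() {
	// Registry keyed by a type name chosen for this example.
	handlers := map[string]rawHandler{
		"CreateOrder": typed(func(cmd CreateOrder) ([]OrderCreated, error) {
			return []OrderCreated{{ID: cmd.ID}}, nil
		}),
	}

	out, err := handlers["CreateOrder"]([]byte(`{"id":"42"}`))
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(string(out)) // prints [{"id":"42"}]
}
```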

Packages

Package   Purpose                Use When
channel   Stateless operations   Simple transforms, filters, fan-in/out
pipe      Stateful components    Dynamic inputs/outputs, lifecycle control
message   Message routing        CloudEvents, type-based handlers, middleware

When to Use What

channel.Transform  →  Simple 1:1 mapping
channel.Filter     →  Drop unwanted values
channel.Merge      →  Combine fixed channels
channel.Broadcast  →  Send to multiple consumers

pipe.Merger        →  Add inputs at runtime
pipe.Distributor   →  Route by matcher, first-match wins
pipe.ProcessPipe   →  Stateful processing with lifecycle

message.Engine     →  Full message bus with routing
message.Router     →  Just the handler dispatch part
message.Handler    →  Type-safe command/event handlers
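
"First-match wins" routing, as listed for the distributor above, means matchers are tried in registration order and a value goes to the first one that accepts it, even if later ones would also match. A minimal stdlib-only sketch of that rule follows; the `route` type and `firstMatch` function are hypothetical names for this example, not part of gopipe.

```go
package main

import "fmt"

// route pairs a destination name with a predicate (hypothetical type).
type route[T any] struct {
	name  string
	match func(T) bool
}

// firstMatch returns the name of the first route whose matcher accepts v.
// ok is false when no route matches.
func firstMatch[T any](routes []route[T], v T) (name string, ok bool) {
	for _, r := range routes {
		if r.match(v) {
			return r.name, true
		}
	}
	return "", false
}

func main() {
	routes := []route[int]{
		{name: "small", match: func(n int) bool { return n < 10 }},
		{name: "even", match: func(n int) bool { return n%2 == 0 }},
	}

	// 4 satisfies both matchers, but "small" is registered first and wins.
	// 12 only satisfies "even". 15 satisfies neither.
	for _, n := range []int{4, 12, 15} {
		name, ok := firstMatch(routes, n)
		fmt.Printf("%d -> %q (matched: %v)\n", n, name, ok)
	}
}
```

Because order decides the outcome, more specific matchers would typically be registered before more general ones.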

Installation

gopipe uses Go modules. Install the packages you need:

# Channel operations (stateless transforms, filters, fan-in/out)
go get github.com/fxsml/gopipe/channel

# Pipe components (stateful, dynamic inputs/outputs)
go get github.com/fxsml/gopipe/pipe

# Message routing (CloudEvents, type-based handlers)
go get github.com/fxsml/gopipe/message

Learning Path

  1. Channel — Filter, Transform, Merge basics
  2. Pipe — Stateful pipe with lifecycle
  3. Merger — Dynamic input merging
  4. Message — CloudEvents message handling
  5. Generator — Producing values from functions

Documentation

  • Package Reference:
    • channel — Stateless channel operations
    • pipe — Stateful pipeline components
    • message — CloudEvents message routing
  • Examples — Working code examples
  • AGENTS.md — Architecture decisions and design notes

License

MIT
