events API

events

package

API reference for the events package.

F
function

TestBus_SubscribeAndEmit

Parameters

pkg/events/bus_test.go:9-25
func TestBus_SubscribeAndEmit(t *testing.T)

{
	b := New()

	var received MyEvent
	Subscribe(b, func(ctx context.Context, e MyEvent) error {
		received = e
		return nil
	})

	err := Emit(context.Background(), b, MyEvent{ID: 1})
	if err != nil {
		t.Fatalf("Emit: %v", err)
	}
	if received.ID != 1 {
		t.Errorf("received %d, want 1", received.ID)
	}
}
S
struct

MyEvent

pkg/events/bus_test.go:27-27
type MyEvent struct

Fields

Name Type Description
ID int
F
function

TestBus_DefaultBus

Parameters

pkg/events/bus_test.go:29-42
func TestBus_DefaultBus(t *testing.T)

{
	defaultBus = New()

	var received MyEvent
	Subscribe(Default(), func(ctx context.Context, e MyEvent) error {
		received = e
		return nil
	})

	Emit(context.Background(), Default(), MyEvent{ID: 2})
	if received.ID != 2 {
		t.Errorf("received %d, want 2", received.ID)
	}
}
F
function

TestBus_NilBusDefaults

Parameters

pkg/events/bus_test.go:44-58
func TestBus_NilBusDefaults(t *testing.T)

{
	b := New()
	defer b.Close()

	var received MyEvent
	Subscribe(nil, func(ctx context.Context, e MyEvent) error {
		received = e
		return nil
	})

	Emit(context.Background(), nil, MyEvent{ID: 8})
	if received.ID != 8 {
		t.Errorf("received %d, want 8", received.ID)
	}
}
F
function

TestBus_EmitAsync

Parameters

pkg/events/bus_test.go:60-93
func TestBus_EmitAsync(t *testing.T)

{
	b := New()
	defer b.Close()

	var mu sync.Mutex
	var received []MyEvent
	done := make(chan struct{})

	Subscribe(b, func(ctx context.Context, e MyEvent) error {
		mu.Lock()
		defer mu.Unlock()
		received = append(received, e)
		if len(received) == 5 {
			close(done)
		}
		return nil
	})

	for i := 0; i < 5; i++ {
		EmitAsync(context.Background(), b, MyEvent{ID: i})
	}

	select {
	case <-done:
	case <-b.asyncClose:
		t.Fatal("bus closed before events processed")
	}

	mu.Lock()
	defer mu.Unlock()
	if len(received) != 5 {
		t.Fatalf("got %d events, want 5", len(received))
	}
}
F
function

TestBus_Middleware

Parameters

pkg/events/bus_test.go:95-117
func TestBus_Middleware(t *testing.T)

{
	b := New()
	defer b.Close()

	var chain []string
	b.Use(func(ctx context.Context, event any, next func(ctx context.Context, event any) error) error {
		chain = append(chain, "before")
		err := next(ctx, event)
		chain = append(chain, "after")
		return err
	})

	Subscribe(b, func(ctx context.Context, e MyEvent) error {
		chain = append(chain, "handler")
		return nil
	})

	Emit(context.Background(), b, MyEvent{ID: 3})

	if len(chain) != 3 || chain[0] != "before" || chain[1] != "handler" || chain[2] != "after" {
		t.Errorf("middleware chain wrong: %v", chain)
	}
}
F
function

TestBus_StrategyStopOnFirstError

Parameters

pkg/events/bus_test.go:119-139
func TestBus_StrategyStopOnFirstError(t *testing.T)

{
	b := New()
	defer b.Close()

	Subscribe(b, func(ctx context.Context, e MyEvent) error {
		return context.Canceled
	})
	called := false
	Subscribe(b, func(ctx context.Context, e MyEvent) error {
		called = true
		return nil
	})

	err := Emit(context.Background(), b, MyEvent{ID: 5})
	if err != context.Canceled {
		t.Errorf("expected context.Canceled, got %v", err)
	}
	if called {
		t.Error("second handler should not be called under StopOnFirstError")
	}
}
F
function

TestBus_OnAsyncError

Parameters

pkg/events/bus_test.go:141-157
func TestBus_OnAsyncError(t *testing.T)

{
	b := New(
		WithStrategy(StopOnFirstError),
		WithOnAsyncError(func(err error) {
			if err == nil {
				t.Error("expected error in async handler")
			}
		}),
	)
	defer b.Close()

	Subscribe(b, func(ctx context.Context, e MyEvent) error {
		return context.Canceled
	})

	EmitAsync(context.Background(), b, MyEvent{ID: 5})
}
F
function

TestBus_Close

Parameters

pkg/events/bus_test.go:159-162
func TestBus_Close(t *testing.T)

{
	b := New()
	b.Close()
}
F
function

TestBus_EmitNoSubscribers

Parameters

pkg/events/bus_test.go:164-171
func TestBus_EmitNoSubscribers(t *testing.T)

{
	b := New()
	defer b.Close()
	err := Emit(context.Background(), b, MyEvent{ID: 7})
	if err != nil {
		t.Errorf("Emit without subscribers: %v", err)
	}
}
F
function

TestBus_NilBusDefault

Parameters

pkg/events/bus_test.go:173-182
func TestBus_NilBusDefault(t *testing.T)

{
	var b *Bus = nil
	Subscribe(b, func(ctx context.Context, e MyEvent) error {
		return nil
	})
	err := Emit(context.Background(), b, MyEvent{ID: 8})
	if err != nil {
		t.Errorf("Emit with nil bus: %v", err)
	}
}
T
type

Handler

Handler processes an event of type T.

pkg/events/bus.go:16-16
type Handler[T any] func(ctx context.Context, event T) error
T
type

Priority

Priority defines the ordering of event handlers.

pkg/events/bus.go:19-19
type Priority int
T
type

DispatchStrategy

DispatchStrategy controls error handling during event dispatch.

pkg/events/bus.go:31-31
type DispatchStrategy int
T
type

Middleware

Middleware wraps event dispatch with cross-cutting behavior.

pkg/events/bus.go:41-41
type Middleware func(ctx context.Context, event any, next func(ctx context.Context, event any) error) error
S
struct

asyncEvent

pkg/events/bus.go:43-46
type asyncEvent struct

Fields

Name Type Description
event any
emit func(ctx context.Context, event any) error
T
type

OverflowStrategy

OverflowStrategy controls behavior when the async channel is full.

pkg/events/bus.go:49-49
type OverflowStrategy int
S
struct

Bus

Bus is the event bus that dispatches events to registered handlers.

pkg/events/bus.go:61-73
type Bus struct

Methods

func (*Bus) asyncProcessor()
{
	for {
		select {
		case <-b.asyncClose:
			return
		case evt, ok := <-b.asyncCh:
			if !ok {
				return
			}
			if err := evt.emit(context.Background(), evt.event); err != nil {
				b.mu.RLock()
				fn := b.onAsyncError
				b.mu.RUnlock()
				if fn != nil {
					fn(err)
				}
			}
		}
	}
}
Close
Method

Close shuts down the async event processor.

func (*Bus) Close()
{
	close(b.asyncClose)
}
Use
Method

Use appends middleware to the bus.

Parameters

func (*Bus) Use(mw Middleware)
{
	b.mu.Lock()
	defer b.mu.Unlock()
	b.middlewares = append(b.middlewares, mw)
}

Fields

Name Type Description
subscribers *safemap.Map[reflect.Type, []subscriber]
strategy DispatchStrategy
middlewares []Middleware
onAsyncError func(error)
wildcard []subscriber
mu sync.RWMutex
asyncCh chan asyncEvent
asyncClose chan struct{}
overflowStrat OverflowStrategy
bufferSize int
S
struct

subscriber

pkg/events/bus.go:75-78
type subscriber struct

Fields

Name Type Description
handler any
priority Priority
F
function

Default

Default returns the package-level default Bus.

Returns

pkg/events/bus.go:83-85
// Default returns the package-level default Bus held in defaultBus. The same
// bus is used by Subscribe, SubscribeWildcard, Emit and EmitAsync when they
// are given a nil *Bus.
func Default() *Bus

{
	return defaultBus
}
T
type

Option

Option configures a Bus.

pkg/events/bus.go:88-88
type Option options.Option[Bus]
F
function

New

New creates a new Bus with the given options.

Parameters

opts
...Option

Returns

pkg/events/bus.go:91-103
func New(opts ...Option) *Bus

{
	b := &Bus{
		subscribers:   safemap.New[reflect.Type, []subscriber](),
		strategy:      StopOnFirstError,
		asyncCh:       make(chan asyncEvent, 1024),
		asyncClose:    make(chan struct{}),
		bufferSize:    1024,
		overflowStrat: OverflowBlock,
	}
	go b.asyncProcessor()
	options.Apply(b, opts...)
	return b
}
F
function

WithBufferSize

WithBufferSize sets the async channel buffer size.

Parameters

n
int

Returns

pkg/events/bus.go:106-111
func WithBufferSize(n int) Option

{
	return func(b *Bus) {
		b.bufferSize = n
		b.asyncCh = make(chan asyncEvent, n)
	}
}
F
function

WithOverflowStrategy

WithOverflowStrategy sets the overflow behavior for async events.

Parameters

Returns

pkg/events/bus.go:114-116
func WithOverflowStrategy(s OverflowStrategy) Option

{
	return func(b *Bus) { b.overflowStrat = s }
}
F
function

WithStrategy

WithStrategy sets the dispatch strategy for the Bus.

Parameters

Returns

pkg/events/bus.go:145-147
func WithStrategy(s DispatchStrategy) Option

{
	return func(b *Bus) { b.strategy = s }
}
F
function

WithOnAsyncError

WithOnAsyncError sets the error handler for async event emissions.

Parameters

fn
func(error)

Returns

pkg/events/bus.go:150-152
func WithOnAsyncError(fn func(error)) Option

{
	return func(b *Bus) { b.onAsyncError = fn }
}
F
function

Subscribe

Subscribe registers a typed handler for events of type T.

Parameters

b
fn
Handler[T]
priority
...Priority
pkg/events/bus.go:162-178
func Subscribe[T any](b *Bus, fn Handler[T], priority ...Priority)

{
	if b == nil {
		b = defaultBus
	}
	p := PriorityNormal
	if len(priority) > 0 {
		p = priority[0]
	}
	key := reflect.TypeFor[T]()
	b.subscribers.Compute(key, func(subs []subscriber, exists bool) []subscriber {
		newSubs := append(subs, subscriber{handler: fn, priority: p})
		sort.SliceStable(newSubs, func(i, j int) bool {
			return newSubs[i].priority > newSubs[j].priority
		})
		return newSubs
	})
}
F
function

SubscribeWildcard

SubscribeWildcard registers a handler for all event types.

Parameters

b
fn
func(ctx context.Context, event any) error
pkg/events/bus.go:181-188
func SubscribeWildcard(b *Bus, fn func(ctx context.Context, event any) error)

{
	if b == nil {
		b = defaultBus
	}
	b.mu.Lock()
	defer b.mu.Unlock()
	b.wildcard = append(b.wildcard, subscriber{handler: fn})
}
F
function

Emit

Emit dispatches an event to all matching handlers synchronously.

Parameters

b
event
T

Returns

error
pkg/events/bus.go:191-244
func Emit[T any](ctx context.Context, b *Bus, event T) error

{
	if b == nil {
		b = defaultBus
	}
	key := reflect.TypeFor[T]()
	subs, ok := b.subscribers.Get(key)

	b.mu.RLock()
	mws := b.middlewares
	b.mu.RUnlock()

	emit := func(ctx context.Context, evt any) error {
		if !ok {
			return nil
		}
		var errs []error
		for _, sub := range subs {
			if fn, ok := sub.handler.(Handler[T]); ok {
				if err := fn(ctx, evt.(T)); err != nil {
					if b.strategy == StopOnFirstError {
						return err
					}
					errs = append(errs, err)
				}
			}
		}
		if len(errs) > 0 {
			return errors.Join(errs...)
		}
		return nil
	}

	if len(mws) > 0 {
		chain := applyMiddleware(emit, mws)
		return chain(ctx, event)
	}

	if err := emit(ctx, event); err != nil {
		return err
	}

	b.mu.RLock()
	wildcards := b.wildcard
	b.mu.RUnlock()
	for _, w := range wildcards {
		if fn, ok := w.handler.(func(ctx context.Context, event any) error); ok {
			if err := fn(ctx, event); err != nil {
				return err
			}
		}
	}

	return nil
}
F
function

EmitAsync

EmitAsync dispatches an event asynchronously to the bus channel.

Parameters

b
event
T
pkg/events/bus.go:247-294
func EmitAsync[T any](ctx context.Context, b *Bus, event T)

{
	if b == nil {
		b = defaultBus
	}
	evt := asyncEvent{
		event: event,
		emit: func(ctx context.Context, evt any) error {
			key := reflect.TypeFor[T]()
			subs, ok := b.subscribers.Get(key)
			if !ok {
				return nil
			}
			for _, sub := range subs {
				if fn, ok := sub.handler.(Handler[T]); ok {
					if err := fn(ctx, evt.(T)); err != nil {
						if b.strategy == StopOnFirstError {
							return err
						}
					}
				}
			}
			return nil
		},
	}

	switch b.overflowStrat {
	case OverflowBlock:
		b.asyncCh <- evt
	case OverflowDropOldest:
		select {
		case b.asyncCh <- evt:
		default:
			select {
			case <-b.asyncCh:
			default:
			}
			b.asyncCh <- evt
		}
	case OverflowFail:
		select {
		case b.asyncCh <- evt:
		default:
			if b.onAsyncError != nil {
				b.onAsyncError(fmt.Errorf("events: async channel full, event dropped"))
			}
		}
	}
}
F
function

applyMiddleware

Parameters

handler
func(ctx context.Context, evt any) error
middlewares
pkg/events/bus.go:296-305
func applyMiddleware(handler func(ctx context.Context, evt any) error, middlewares []Middleware) func(ctx context.Context, evt any) error

{
	for i := len(middlewares) - 1; i >= 0; i-- {
		mw := middlewares[i]
		next := handler
		handler = func(ctx context.Context, evt any) error {
			return mw(ctx, evt, next)
		}
	}
	return handler
}