events
packageAPI reference for the events
package.
Imports
(9)TestBus_SubscribeAndEmit
Parameters
func TestBus_SubscribeAndEmit(t *testing.T)
{
b := New()
var received MyEvent
Subscribe(b, func(ctx context.Context, e MyEvent) error {
received = e
return nil
})
err := Emit(context.Background(), b, MyEvent{ID: 1})
if err != nil {
t.Fatalf("Emit: %v", err)
}
if received.ID != 1 {
t.Errorf("received %d, want 1", received.ID)
}
}
MyEvent
type MyEvent struct
Fields
| Name | Type | Description |
|---|---|---|
| ID | int |
TestBus_DefaultBus
Parameters
func TestBus_DefaultBus(t *testing.T)
{
defaultBus = New()
var received MyEvent
Subscribe(Default(), func(ctx context.Context, e MyEvent) error {
received = e
return nil
})
Emit(context.Background(), Default(), MyEvent{ID: 2})
if received.ID != 2 {
t.Errorf("received %d, want 2", received.ID)
}
}
TestBus_NilBusDefaults
Parameters
func TestBus_NilBusDefaults(t *testing.T)
{
b := New()
defer b.Close()
var received MyEvent
Subscribe(nil, func(ctx context.Context, e MyEvent) error {
received = e
return nil
})
Emit(context.Background(), nil, MyEvent{ID: 8})
if received.ID != 8 {
t.Errorf("received %d, want 8", received.ID)
}
}
TestBus_EmitAsync
Parameters
func TestBus_EmitAsync(t *testing.T)
{
b := New()
defer b.Close()
var mu sync.Mutex
var received []MyEvent
done := make(chan struct{})
Subscribe(b, func(ctx context.Context, e MyEvent) error {
mu.Lock()
defer mu.Unlock()
received = append(received, e)
if len(received) == 5 {
close(done)
}
return nil
})
for i := 0; i < 5; i++ {
EmitAsync(context.Background(), b, MyEvent{ID: i})
}
select {
case <-done:
case <-b.asyncClose:
t.Fatal("bus closed before events processed")
}
mu.Lock()
defer mu.Unlock()
if len(received) != 5 {
t.Fatalf("got %d events, want 5", len(received))
}
}
TestBus_Middleware
Parameters
func TestBus_Middleware(t *testing.T)
{
b := New()
defer b.Close()
var chain []string
b.Use(func(ctx context.Context, event any, next func(ctx context.Context, event any) error) error {
chain = append(chain, "before")
err := next(ctx, event)
chain = append(chain, "after")
return err
})
Subscribe(b, func(ctx context.Context, e MyEvent) error {
chain = append(chain, "handler")
return nil
})
Emit(context.Background(), b, MyEvent{ID: 3})
if len(chain) != 3 || chain[0] != "before" || chain[1] != "handler" || chain[2] != "after" {
t.Errorf("middleware chain wrong: %v", chain)
}
}
TestBus_StrategyStopOnFirstError
Parameters
func TestBus_StrategyStopOnFirstError(t *testing.T)
{
b := New()
defer b.Close()
Subscribe(b, func(ctx context.Context, e MyEvent) error {
return context.Canceled
})
called := false
Subscribe(b, func(ctx context.Context, e MyEvent) error {
called = true
return nil
})
err := Emit(context.Background(), b, MyEvent{ID: 5})
if err != context.Canceled {
t.Errorf("expected context.Canceled, got %v", err)
}
if called {
t.Error("second handler should not be called under StopOnFirstError")
}
}
TestBus_OnAsyncError
Parameters
func TestBus_OnAsyncError(t *testing.T)
{
b := New(
WithStrategy(StopOnFirstError),
WithOnAsyncError(func(err error) {
if err == nil {
t.Error("expected error in async handler")
}
}),
)
defer b.Close()
Subscribe(b, func(ctx context.Context, e MyEvent) error {
return context.Canceled
})
EmitAsync(context.Background(), b, MyEvent{ID: 5})
}
TestBus_Close
Parameters
func TestBus_Close(t *testing.T)
{
b := New()
b.Close()
}
TestBus_EmitNoSubscribers
Parameters
func TestBus_EmitNoSubscribers(t *testing.T)
{
b := New()
defer b.Close()
err := Emit(context.Background(), b, MyEvent{ID: 7})
if err != nil {
t.Errorf("Emit without subscribers: %v", err)
}
}
TestBus_NilBusDefault
Parameters
func TestBus_NilBusDefault(t *testing.T)
{
var b *Bus = nil
Subscribe(b, func(ctx context.Context, e MyEvent) error {
return nil
})
err := Emit(context.Background(), b, MyEvent{ID: 8})
if err != nil {
t.Errorf("Emit with nil bus: %v", err)
}
}
Handler
Handler processes an event of type T.
type Handler func(ctx context.Context, event T) error
Priority
Priority defines the ordering of event handlers.
type Priority int
DispatchStrategy
DispatchStrategy controls error handling during event dispatch.
type DispatchStrategy int
Middleware
Middleware wraps event dispatch with cross-cutting behavior.
type Middleware func(ctx context.Context, event any, next func(ctx context.Context, event any) error) error
asyncEvent
type asyncEvent struct
Fields
| Name | Type | Description |
|---|---|---|
| event | any | |
| emit | func(ctx context.Context, event any) error |
OverflowStrategy
OverflowStrategy controls behavior when the async channel is full.
type OverflowStrategy int
Bus
Bus is the event bus that dispatches events to registered handlers.
type Bus struct
Methods
func (*Bus) asyncProcessor()
{
for {
select {
case <-b.asyncClose:
return
case evt, ok := <-b.asyncCh:
if !ok {
return
}
if err := evt.emit(context.Background(), evt.event); err != nil {
b.mu.RLock()
fn := b.onAsyncError
b.mu.RUnlock()
if fn != nil {
fn(err)
}
}
}
}
}
Close shuts down the async event processor.
func (*Bus) Close()
{
close(b.asyncClose)
}
Use appends middleware to the bus.
Parameters
func (*Bus) Use(mw Middleware)
{
b.mu.Lock()
defer b.mu.Unlock()
b.middlewares = append(b.middlewares, mw)
}
Fields
| Name | Type | Description |
|---|---|---|
| subscribers | *safemap.Map[reflect.Type, []subscriber] | |
| strategy | DispatchStrategy | |
| middlewares | []Middleware | |
| onAsyncError | func(error) | |
| wildcard | []subscriber | |
| mu | sync.RWMutex | |
| asyncCh | chan asyncEvent | |
| asyncClose | chan struct{} | |
| overflowStrat | OverflowStrategy | |
| bufferSize | int |
subscriber
type subscriber struct
Fields
| Name | Type | Description |
|---|---|---|
| handler | any | |
| priority | Priority |
Uses
Default
Default returns the package-level default Bus.
Returns
func Default() *Bus
{
return defaultBus
}
Option
Option configures a Bus.
type Option options.Option[Bus]
New
New creates a new Bus with the given options.
Parameters
Returns
func New(opts ...Option) *Bus
{
b := &Bus{
subscribers: safemap.New[reflect.Type, []subscriber](),
strategy: StopOnFirstError,
asyncCh: make(chan asyncEvent, 1024),
asyncClose: make(chan struct{}),
bufferSize: 1024,
overflowStrat: OverflowBlock,
}
go b.asyncProcessor()
options.Apply(b, opts...)
return b
}
WithBufferSize
WithBufferSize sets the async channel buffer size.
Parameters
Returns
func WithBufferSize(n int) Option
{
return func(b *Bus) {
b.bufferSize = n
b.asyncCh = make(chan asyncEvent, n)
}
}
Uses
WithOverflowStrategy
WithOverflowStrategy sets the overflow behavior for async events.
Parameters
Returns
func WithOverflowStrategy(s OverflowStrategy) Option
{
return func(b *Bus) { b.overflowStrat = s }
}
WithStrategy
WithStrategy sets the dispatch strategy for the Bus.
Parameters
Returns
func WithStrategy(s DispatchStrategy) Option
{
return func(b *Bus) { b.strategy = s }
}
WithOnAsyncError
WithOnAsyncError sets the error handler for async event emissions.
Parameters
Returns
func WithOnAsyncError(fn func(error)) Option
{
return func(b *Bus) { b.onAsyncError = fn }
}
Uses
Subscribe
Subscribe registers a typed handler for events of type T.
Parameters
func Subscribe[T any](b *Bus, fn Handler[T], priority ...Priority)
{
if b == nil {
b = defaultBus
}
p := PriorityNormal
if len(priority) > 0 {
p = priority[0]
}
key := reflect.TypeFor[T]()
b.subscribers.Compute(key, func(subs []subscriber, exists bool) []subscriber {
newSubs := append(subs, subscriber{handler: fn, priority: p})
sort.SliceStable(newSubs, func(i, j int) bool {
return newSubs[i].priority > newSubs[j].priority
})
return newSubs
})
}
SubscribeWildcard
SubscribeWildcard registers a handler for all event types.
Parameters
func SubscribeWildcard(b *Bus, fn func(ctx context.Context, event any) error)
{
if b == nil {
b = defaultBus
}
b.mu.Lock()
defer b.mu.Unlock()
b.wildcard = append(b.wildcard, subscriber{handler: fn})
}
Emit
Emit dispatches an event to all matching handlers synchronously.
Parameters
Returns
func Emit[T any](ctx context.Context, b *Bus, event T) error
{
if b == nil {
b = defaultBus
}
key := reflect.TypeFor[T]()
subs, ok := b.subscribers.Get(key)
b.mu.RLock()
mws := b.middlewares
b.mu.RUnlock()
emit := func(ctx context.Context, evt any) error {
if !ok {
return nil
}
var errs []error
for _, sub := range subs {
if fn, ok := sub.handler.(Handler[T]); ok {
if err := fn(ctx, evt.(T)); err != nil {
if b.strategy == StopOnFirstError {
return err
}
errs = append(errs, err)
}
}
}
if len(errs) > 0 {
return errors.Join(errs...)
}
return nil
}
if len(mws) > 0 {
chain := applyMiddleware(emit, mws)
return chain(ctx, event)
}
if err := emit(ctx, event); err != nil {
return err
}
b.mu.RLock()
wildcards := b.wildcard
b.mu.RUnlock()
for _, w := range wildcards {
if fn, ok := w.handler.(func(ctx context.Context, event any) error); ok {
if err := fn(ctx, event); err != nil {
return err
}
}
}
return nil
}
EmitAsync
EmitAsync dispatches an event asynchronously to the bus channel.
Parameters
func EmitAsync[T any](ctx context.Context, b *Bus, event T)
{
if b == nil {
b = defaultBus
}
evt := asyncEvent{
event: event,
emit: func(ctx context.Context, evt any) error {
key := reflect.TypeFor[T]()
subs, ok := b.subscribers.Get(key)
if !ok {
return nil
}
for _, sub := range subs {
if fn, ok := sub.handler.(Handler[T]); ok {
if err := fn(ctx, evt.(T)); err != nil {
if b.strategy == StopOnFirstError {
return err
}
}
}
}
return nil
},
}
switch b.overflowStrat {
case OverflowBlock:
b.asyncCh <- evt
case OverflowDropOldest:
select {
case b.asyncCh <- evt:
default:
select {
case <-b.asyncCh:
default:
}
b.asyncCh <- evt
}
case OverflowFail:
select {
case b.asyncCh <- evt:
default:
if b.onAsyncError != nil {
b.onAsyncError(fmt.Errorf("events: async channel full, event dropped"))
}
}
}
}
applyMiddleware
Parameters
Returns
func applyMiddleware(handler func(ctx context.Context, evt any) error, middlewares []Middleware) func(ctx context.Context, evt any) error
{
for i := len(middlewares) - 1; i >= 0; i-- {
mw := middlewares[i]
next := handler
handler = func(ctx context.Context, evt any) error {
return mw(ctx, evt, next)
}
}
return handler
}