broker API

broker

package

API reference for the broker package.

S
struct

MemoryBroker

MemoryBroker is an in-memory pub/sub message broker intended for testing and single-process use.

pkg/relay/broker/memory.go:10-13
type MemoryBroker struct

Methods

Publish
Method

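Publish delivers payload to every handler subscribed to topic. Each handler is invoked in its own goroutine; Publish does not wait for handlers to finish, discards any errors they return, and always returns nil.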

Parameters

ctx context.Context
topic string
payload []byte

Returns

error
func (*MemoryBroker) Publish(ctx context.Context, topic string, payload []byte) error
{
	m.subMu.RLock()
	handlers := m.subs[topic]
	m.subMu.RUnlock()

	for _, h := range handlers {
		go func(fn func(ctx context.Context, payload []byte) error) {
			_ = fn(ctx, payload)
		}(h)
	}
	return nil
}
Subscribe
Method

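Subscribe appends handler to the list of handlers invoked for payloads published to topic. Several handlers may be registered for the same topic. The current implementation always returns nil.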

Parameters

topic string
handler func(ctx context.Context, payload []byte) error

Returns

error
func (*MemoryBroker) Subscribe(topic string, handler func(ctx context.Context, payload []byte) error) error
{
	m.subMu.Lock()
	defer m.subMu.Unlock()
	m.subs[topic] = append(m.subs[topic], handler)
	return nil
}

Fields

Name Type Description
subs map[string][]func(ctx context.Context, payload []byte) error Handlers registered per topic, keyed by topic name.
subMu sync.RWMutex Guards access to subs.
F
function

NewMemoryBroker

NewMemoryBroker creates a MemoryBroker with an empty, initialized subscription map, ready for Subscribe and Publish calls.

Returns

pkg/relay/broker/memory.go:16-20
func NewMemoryBroker() *MemoryBroker

{
	return &MemoryBroker{
		subs: make(map[string][]func(ctx context.Context, payload []byte) error),
	}
}
F
function

TestMemoryBroker_PublishSubscribe

Parameters

pkg/relay/broker/memory_test.go:10-32
func TestMemoryBroker_PublishSubscribe(t *testing.T)

{
	b := NewMemoryBroker()
	ch := make(chan []byte, 1)

	b.Subscribe("test", func(_ context.Context, data []byte) error {
		ch <- data
		return nil
	})

	err := b.Publish(context.Background(), "test", []byte("hello"))
	if err != nil {
		t.Fatalf("Publish failed: %v", err)
	}

	select {
	case result := <-ch:
		if string(result) != "hello" {
			t.Errorf("got %q, want %q", string(result), "hello")
		}
	case <-time.After(time.Second):
		t.Fatal("timeout waiting for handler")
	}
}
F
function

TestMemoryBroker_MultipleSubscribers

Parameters

pkg/relay/broker/memory_test.go:34-59
func TestMemoryBroker_MultipleSubscribers(t *testing.T)

{
	b := NewMemoryBroker()
	ch := make(chan int, 3)
	count := 0

	for i := 0; i < 3; i++ {
		b.Subscribe("topic", func(_ context.Context, data []byte) error {
			ch <- 1
			return nil
		})
	}

	b.Publish(context.Background(), "topic", []byte("data"))

	for i := 0; i < 3; i++ {
		select {
		case <-ch:
			count++
		case <-time.After(time.Second):
			t.Fatalf("timeout waiting for handler %d", i)
		}
	}
	if count != 3 {
		t.Errorf("expected 3 handler calls, got %d", count)
	}
}
F
function

TestMemoryBroker_DifferentTopics

Parameters

pkg/relay/broker/memory_test.go:61-84
func TestMemoryBroker_DifferentTopics(t *testing.T)

{
	b := NewMemoryBroker()
	ch := make(chan string, 1)

	b.Subscribe("topic1", func(_ context.Context, data []byte) error {
		ch <- "topic1:" + string(data)
		return nil
	})
	b.Subscribe("topic2", func(_ context.Context, data []byte) error {
		ch <- "topic2:" + string(data)
		return nil
	})

	b.Publish(context.Background(), "topic1", []byte("msg"))

	select {
	case result := <-ch:
		if result != "topic1:msg" {
			t.Errorf("got %q, want %q", result, "topic1:msg")
		}
	case <-time.After(time.Second):
		t.Fatal("timeout waiting for handler")
	}
}
F
function

TestMemoryBroker_PublishNoSubscribers

Parameters

pkg/relay/broker/memory_test.go:86-92
func TestMemoryBroker_PublishNoSubscribers(t *testing.T)

{
	b := NewMemoryBroker()
	err := b.Publish(context.Background(), "nonexistent", []byte("msg"))
	if err != nil {
		t.Fatalf("Publish to nonexistent topic: %v", err)
	}
}
F
function

TestMemoryBroker_ConcurrentPublish

Parameters

pkg/relay/broker/memory_test.go:94-139
func TestMemoryBroker_ConcurrentPublish(t *testing.T)

{
	b := NewMemoryBroker()
	ch := make(chan int, 10)
	received := 0

	b.Subscribe("test", func(_ context.Context, data []byte) error {
		ch <- 1
		return nil
	})

	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			b.Publish(context.Background(), "test", []byte("data"))
		}()
	}
	wg.Wait()

	done := make(chan struct{})
	go func() {
		for {
			select {
			case <-ch:
				received++
				if received >= 10 {
					close(done)
					return
				}
			case <-time.After(100 * time.Millisecond):
				close(done)
				return
			}
		}
	}()

	select {
	case <-done:
		if received != 10 {
			t.Errorf("expected 10 handler calls, got %d", received)
		}
	case <-time.After(2 * time.Second):
		t.Fatal("timeout")
	}
}