broker
API
broker
packageAPI reference for the broker
package.
Imports
(4)
S
struct
MemoryBroker
MemoryBroker is an in-memory message broker.
MemoryBroker is an in-memory pub/sub implementation for testing and single-process use.
pkg/relay/broker/memory.go:10-13
type MemoryBroker struct
Methods
Publish
Method
Publish sends a payload to all topic subscribers.
Parameters
Returns
error
func (*MemoryBroker) Publish(ctx context.Context, topic string, payload []byte) error
{
m.subMu.RLock()
handlers := m.subs[topic]
m.subMu.RUnlock()
for _, h := range handlers {
go func(fn func(ctx context.Context, payload []byte) error) {
_ = fn(ctx, payload)
}(h)
}
return nil
}
Subscribe
Method
Subscribe registers a handler for a topic.
Parameters
topic
string
handler
func(ctx context.Context, payload []byte) error
Returns
error
func (*MemoryBroker) Subscribe(topic string, handler func(ctx context.Context, payload []byte) error) error
{
m.subMu.Lock()
defer m.subMu.Unlock()
m.subs[topic] = append(m.subs[topic], handler)
return nil
}
Fields
| Name | Type | Description |
|---|---|---|
| subs | map[string][]func(ctx context.Context, payload []byte) error | |
| subMu | sync.RWMutex |
F
function
NewMemoryBroker
NewMemoryBroker creates a MemoryBroker.
Returns
pkg/relay/broker/memory.go:16-20
func NewMemoryBroker() *MemoryBroker
{
return &MemoryBroker{
subs: make(map[string][]func(ctx context.Context, payload []byte) error),
}
}
F
function
TestMemoryBroker_PublishSubscribe
Parameters
t
pkg/relay/broker/memory_test.go:10-32
func TestMemoryBroker_PublishSubscribe(t *testing.T)
{
b := NewMemoryBroker()
ch := make(chan []byte, 1)
b.Subscribe("test", func(_ context.Context, data []byte) error {
ch <- data
return nil
})
err := b.Publish(context.Background(), "test", []byte("hello"))
if err != nil {
t.Fatalf("Publish failed: %v", err)
}
select {
case result := <-ch:
if string(result) != "hello" {
t.Errorf("got %q, want %q", string(result), "hello")
}
case <-time.After(time.Second):
t.Fatal("timeout waiting for handler")
}
}
F
function
TestMemoryBroker_MultipleSubscribers
Parameters
t
pkg/relay/broker/memory_test.go:34-59
func TestMemoryBroker_MultipleSubscribers(t *testing.T)
{
b := NewMemoryBroker()
ch := make(chan int, 3)
count := 0
for i := 0; i < 3; i++ {
b.Subscribe("topic", func(_ context.Context, data []byte) error {
ch <- 1
return nil
})
}
b.Publish(context.Background(), "topic", []byte("data"))
for i := 0; i < 3; i++ {
select {
case <-ch:
count++
case <-time.After(time.Second):
t.Fatalf("timeout waiting for handler %d", i)
}
}
if count != 3 {
t.Errorf("expected 3 handler calls, got %d", count)
}
}
F
function
TestMemoryBroker_DifferentTopics
Parameters
t
pkg/relay/broker/memory_test.go:61-84
func TestMemoryBroker_DifferentTopics(t *testing.T)
{
b := NewMemoryBroker()
ch := make(chan string, 1)
b.Subscribe("topic1", func(_ context.Context, data []byte) error {
ch <- "topic1:" + string(data)
return nil
})
b.Subscribe("topic2", func(_ context.Context, data []byte) error {
ch <- "topic2:" + string(data)
return nil
})
b.Publish(context.Background(), "topic1", []byte("msg"))
select {
case result := <-ch:
if result != "topic1:msg" {
t.Errorf("got %q, want %q", result, "topic1:msg")
}
case <-time.After(time.Second):
t.Fatal("timeout waiting for handler")
}
}
F
function
TestMemoryBroker_PublishNoSubscribers
Parameters
t
pkg/relay/broker/memory_test.go:86-92
func TestMemoryBroker_PublishNoSubscribers(t *testing.T)
{
b := NewMemoryBroker()
err := b.Publish(context.Background(), "nonexistent", []byte("msg"))
if err != nil {
t.Fatalf("Publish to nonexistent topic: %v", err)
}
}
F
function
TestMemoryBroker_ConcurrentPublish
Parameters
t
pkg/relay/broker/memory_test.go:94-139
func TestMemoryBroker_ConcurrentPublish(t *testing.T)
{
b := NewMemoryBroker()
ch := make(chan int, 10)
received := 0
b.Subscribe("test", func(_ context.Context, data []byte) error {
ch <- 1
return nil
})
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
b.Publish(context.Background(), "test", []byte("data"))
}()
}
wg.Wait()
done := make(chan struct{})
go func() {
for {
select {
case <-ch:
received++
if received >= 10 {
close(done)
return
}
case <-time.After(100 * time.Millisecond):
close(done)
return
}
}
}()
select {
case <-done:
if received != 10 {
t.Errorf("expected 10 handler calls, got %d", received)
}
case <-time.After(2 * time.Second):
t.Fatal("timeout")
}
}