manager
API
manager
package
API reference for the manager package.
Imports
(8)
S
struct
Job
Job represents a unit of relay work: a topic and payload, together with metadata (ID, queue, creation time, and try count).
pkg/relay/manager/manager.go:16-23
type Job struct
Fields
| Name | Type | Description |
|---|---|---|
| ID | string | |
| Queue | string | |
| Topic | string | |
| Payload | []byte | |
| CreatedAt | time.Time | |
| TryCount | int | |
T
type
Handler
Handler processes a typed payload delivered by the relay.
pkg/relay/manager/manager.go:27-27
// Handler processes a typed payload delivered by the relay.
//
// T is the payload type the handler receives. Register wraps a Handler[T] so
// that raw message bytes are JSON-decoded into a T before the handler runs.
type Handler[T any] func(ctx context.Context, payload T) error
I
interface
Broker
Broker publishes and subscribes to topics.
pkg/relay/manager/manager.go:30-33
type Broker interface
S
struct
Relay
Relay coordinates typed message handlers with a pluggable broker and manages the broker subscriptions for those handlers.
pkg/relay/manager/manager.go:37-41
type Relay struct
Methods
Start
Method
Start subscribes all registered handlers and returns a ready channel.
Parameters
ctx
context.Context
Returns
<-chan
struct{}
error
func (*Relay) Start(ctx context.Context) (<-chan struct{}, error)
{
r.handlerMu.RLock()
defer r.handlerMu.RUnlock()
ready := make(chan struct{})
for topic, wrapperFn := range r.handlers {
userHandler := wrapperFn.(func(ctx context.Context, data []byte) error)
err := r.broker.Subscribe(topic, func(ctx context.Context, data []byte) error {
defer func() {
if rec := recover(); rec != nil {
fmt.Printf("panic in job %s: %v\n", topic, rec)
}
}()
return userHandler(ctx, data)
})
if err != nil {
close(ready)
return ready, err
}
}
close(ready)
return ready, nil
}
Fields
| Name | Type | Description |
|---|---|---|
| broker | Broker | |
| handlers | map[string]any | |
| handlerMu | sync.RWMutex | |
Uses
T
type
Option
Option configures a Relay.
pkg/relay/manager/manager.go:44-44
// Option configures a Relay. It is declared in terms of the generic
// options.Option type, instantiated for Relay.
type Option options.Option[Relay]
F
function
New
New creates a Relay with the given options.
Parameters
opts
...Option
Returns
pkg/relay/manager/manager.go:47-54
func New(opts ...Option) *Relay
{
r := &Relay{
broker: broker.NewMemoryBroker(),
handlers: make(map[string]any),
}
options.Apply(r, opts...)
return r
}
F
function
WithBroker
WithBroker sets the broker implementation.
pkg/relay/manager/manager.go:57-61
func WithBroker(b Broker) Option
{
return func(r *Relay) {
r.broker = b
}
}
F
function
Register
Register adds a typed handler for a topic.
Parameters
pkg/relay/manager/manager.go:64-76
func Register[T any](r *Relay, topic string, fn Handler[T])
{
wrapper := func(ctx context.Context, raw []byte) error {
var payload T
if err := json.Unmarshal(raw, &payload); err != nil {
return fmt.Errorf("payload unmarshal failed: %w", err)
}
return fn(ctx, payload)
}
r.handlerMu.Lock()
r.handlers[topic] = wrapper
r.handlerMu.Unlock()
}
F
function
Enqueue
Enqueue publishes a typed payload to a topic.
Parameters
Returns
error
pkg/relay/manager/manager.go:79-86
func Enqueue[T any](ctx context.Context, r *Relay, topic string, payload T) error
{
data, err := json.Marshal(payload)
if err != nil {
return fmt.Errorf("payload marshal failed: %w", err)
}
return r.broker.Publish(ctx, topic, data)
}
S
struct
testPayload
pkg/relay/manager/manager_test.go:12-15
type testPayload struct
Fields
| Name | Type | Description |
|---|---|---|
| Message | string | json:"message" |
| Value | int | json:"value" |
F
function
TestNew_DefaultBroker
Parameters
t
pkg/relay/manager/manager_test.go:17-22
func TestNew_DefaultBroker(t *testing.T)
{
r := New()
if r == nil {
t.Fatal("New returned nil")
}
}
F
function
TestRegisterAndEnqueue
Parameters
t
pkg/relay/manager/manager_test.go:24-58
func TestRegisterAndEnqueue(t *testing.T)
{
r := New()
var mu sync.Mutex
var received testPayload
handlerCalled := make(chan struct{})
Register[testPayload](r, "test.topic", func(ctx context.Context, p testPayload) error {
mu.Lock()
received = p
mu.Unlock()
close(handlerCalled)
return nil
})
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ready, err := r.Start(ctx)
if err != nil {
t.Fatalf("Start failed: %v", err)
}
<-ready
Enqueue[testPayload](context.Background(), r, "test.topic", testPayload{Message: "hello", Value: 42})
select {
case <-handlerCalled:
mu.Lock()
if received.Message != "hello" || received.Value != 42 {
t.Errorf("got %+v, want {Message:hello Value:42}", received)
}
mu.Unlock()
case <-time.After(time.Second):
t.Fatal("timeout waiting for handler")
}
}
F
function
TestRegister_TypeSafety
Parameters
t
pkg/relay/manager/manager_test.go:60-73
func TestRegister_TypeSafety(t *testing.T)
{
r := New()
Register[testPayload](r, "topic1", func(ctx context.Context, p testPayload) error {
return nil
})
Register[int](r, "topic2", func(ctx context.Context, p int) error {
return nil
})
if len(r.handlers) != 2 {
t.Errorf("expected 2 handlers, got %d", len(r.handlers))
}
}
F
function
TestEnqueue_MarshalError
Parameters
t
pkg/relay/manager/manager_test.go:75-82
func TestEnqueue_MarshalError(t *testing.T)
{
r := New()
err := Enqueue[testPayload](context.Background(), r, "test", testPayload{})
if err != nil {
t.Fatalf("Enqueue failed: %v", err)
}
}
F
function
TestBroker_Interface
Parameters
t
pkg/relay/manager/manager_test.go:84-90
func TestBroker_Interface(t *testing.T)
{
b := broker.NewMemoryBroker()
r := New(WithBroker(b))
if r.broker != b {
t.Error("broker not set correctly")
}
}
F
function
TestStart_ContextCancel
Parameters
t
pkg/relay/manager/manager_test.go:92-102
func TestStart_ContextCancel(t *testing.T)
{
r := New()
ctx, cancel := context.WithCancel(context.Background())
cancel()
ready, err := r.Start(ctx)
if err != nil {
t.Fatalf("Start failed: %v", err)
}
<-ready
}
F
function
TestMultipleTopics
Parameters
t
pkg/relay/manager/manager_test.go:104-144
func TestMultipleTopics(t *testing.T)
{
r := New()
ch1 := make(chan int, 1)
ch2 := make(chan int, 1)
Register[int](r, "topic.a", func(ctx context.Context, p int) error {
ch1 <- p
return nil
})
Register[int](r, "topic.b", func(ctx context.Context, p int) error {
ch2 <- p
return nil
})
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ready, err := r.Start(ctx)
if err != nil {
t.Fatalf("Start failed: %v", err)
}
<-ready
Enqueue[int](context.Background(), r, "topic.a", 1)
Enqueue[int](context.Background(), r, "topic.b", 2)
var results [2]int
select {
case results[0] = <-ch1:
case <-time.After(time.Second):
t.Fatal("timeout for topic.a")
}
select {
case results[1] = <-ch2:
case <-time.After(time.Second):
t.Fatal("timeout for topic.b")
}
if results[0] != 1 || results[1] != 2 {
t.Errorf("got %v, want [1 2]", results[:])
}
}
F
function
TestEnqueueAndRegisterAreThreadSafe
Parameters
t
pkg/relay/manager/manager_test.go:146-167
func TestEnqueueAndRegisterAreThreadSafe(t *testing.T)
{
r := New()
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
handler := func(ctx context.Context, p int) error { return nil }
Register[int](r, "", handler)
}(i)
}
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
Enqueue[int](context.Background(), r, "", 0)
}()
}
wg.Wait()
}
F
function
BenchmarkEnqueue
Parameters
b
pkg/relay/manager/manager_test.go:169-180
func BenchmarkEnqueue(b *testing.B)
{
r := New()
Register[int](r, "bench", func(ctx context.Context, p int) error { return nil })
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go r.Start(ctx)
b.ResetTimer()
for i := 0; i < b.N; i++ {
Enqueue[int](context.Background(), r, "bench", i)
}
}