manager API

manager

package

API reference for the manager package.

S
struct

Job

Job represents a relay job with topic and payload.
Job represents a unit of work with metadata.

pkg/relay/manager/manager.go:16-23
type Job struct

Fields

Name Type Description
ID string
Queue string
Topic string
Payload []byte
CreatedAt time.Time
TryCount int
T
type

Handler

Handler processes a typed relay payload.
Handler processes a typed payload from the relay.

pkg/relay/manager/manager.go:27-27
type Handler func(ctx context.Context, payload T) error
I
interface

Broker

Broker publishes and subscribes to topics.

pkg/relay/manager/manager.go:30-33
type Broker interface

Methods

Publish
Method

Parameters

topic string
payload []byte

Returns

error
func Publish(...)
Subscribe
Method

Parameters

topic string
handler func(ctx context.Context, payload []byte) error

Returns

error
func Subscribe(...)
S
struct

Relay

Relay manages broker subscriptions and handlers.
Relay coordinates typed message handlers with a pluggable broker.

pkg/relay/manager/manager.go:37-41
type Relay struct

Methods

Start
Method

Start subscribes all registered handlers and returns a ready channel.

Parameters

Returns

<-chan struct{}
error
func (*Relay) Start(ctx context.Context) (<-chan struct{}, error)
{
	r.handlerMu.RLock()
	defer r.handlerMu.RUnlock()

	ready := make(chan struct{})

	for topic, wrapperFn := range r.handlers {
		userHandler := wrapperFn.(func(ctx context.Context, data []byte) error)

		err := r.broker.Subscribe(topic, func(ctx context.Context, data []byte) error {
			defer func() {
				if rec := recover(); rec != nil {
					fmt.Printf("panic in job %s: %v\n", topic, rec)
				}
			}()

			return userHandler(ctx, data)
		})
		if err != nil {
			close(ready)
			return ready, err
		}
	}

	close(ready)
	return ready, nil
}

Fields

Name Type Description
broker Broker
handlers map[string]any
handlerMu sync.RWMutex
T
type

Option

Option configures a Relay.

pkg/relay/manager/manager.go:44-44
type Option options.Option[Relay]
F
function

New

New creates a Relay with the given options.

Parameters

opts
...Option

Returns

pkg/relay/manager/manager.go:47-54
func New(opts ...Option) *Relay

{
	r := &Relay{
		broker:   broker.NewMemoryBroker(),
		handlers: make(map[string]any),
	}
	options.Apply(r, opts...)
	return r
}
F
function

WithBroker

WithBroker sets the broker implementation.

Parameters

b

Returns

pkg/relay/manager/manager.go:57-61
func WithBroker(b Broker) Option

{
	return func(r *Relay) {
		r.broker = b
	}
}
F
function

Register

Register adds a typed handler for a topic.

Parameters

r
topic
string
fn
Handler[T]
pkg/relay/manager/manager.go:64-76
func Register[T any](r *Relay, topic string, fn Handler[T])

{
	wrapper := func(ctx context.Context, raw []byte) error {
		var payload T
		if err := json.Unmarshal(raw, &payload); err != nil {
			return fmt.Errorf("payload unmarshal failed: %w", err)
		}
		return fn(ctx, payload)
	}

	r.handlerMu.Lock()
	r.handlers[topic] = wrapper
	r.handlerMu.Unlock()
}
F
function

Enqueue

Enqueue publishes a typed payload to a topic.

Parameters

r
topic
string
payload
T

Returns

error
pkg/relay/manager/manager.go:79-86
func Enqueue[T any](ctx context.Context, r *Relay, topic string, payload T) error

{
	data, err := json.Marshal(payload)
	if err != nil {
		return fmt.Errorf("payload marshal failed: %w", err)
	}

	return r.broker.Publish(ctx, topic, data)
}
S
struct

testPayload

pkg/relay/manager/manager_test.go:12-15
type testPayload struct

Fields

Name Type Description
Message string json:"message"
Value int json:"value"
F
function

TestNew_DefaultBroker

Parameters

pkg/relay/manager/manager_test.go:17-22
func TestNew_DefaultBroker(t *testing.T)

{
	r := New()
	if r == nil {
		t.Fatal("New returned nil")
	}
}
F
function

TestRegisterAndEnqueue

Parameters

pkg/relay/manager/manager_test.go:24-58
func TestRegisterAndEnqueue(t *testing.T)

{
	r := New()
	var mu sync.Mutex
	var received testPayload
	handlerCalled := make(chan struct{})

	Register[testPayload](r, "test.topic", func(ctx context.Context, p testPayload) error {
		mu.Lock()
		received = p
		mu.Unlock()
		close(handlerCalled)
		return nil
	})

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	ready, err := r.Start(ctx)
	if err != nil {
		t.Fatalf("Start failed: %v", err)
	}
	<-ready

	Enqueue[testPayload](context.Background(), r, "test.topic", testPayload{Message: "hello", Value: 42})

	select {
	case <-handlerCalled:
		mu.Lock()
		if received.Message != "hello" || received.Value != 42 {
			t.Errorf("got %+v, want {Message:hello Value:42}", received)
		}
		mu.Unlock()
	case <-time.After(time.Second):
		t.Fatal("timeout waiting for handler")
	}
}
F
function

TestRegister_TypeSafety

Parameters

pkg/relay/manager/manager_test.go:60-73
func TestRegister_TypeSafety(t *testing.T)

{
	r := New()

	Register[testPayload](r, "topic1", func(ctx context.Context, p testPayload) error {
		return nil
	})
	Register[int](r, "topic2", func(ctx context.Context, p int) error {
		return nil
	})

	if len(r.handlers) != 2 {
		t.Errorf("expected 2 handlers, got %d", len(r.handlers))
	}
}
F
function

TestEnqueue_MarshalError

Parameters

pkg/relay/manager/manager_test.go:75-82
func TestEnqueue_MarshalError(t *testing.T)

{
	r := New()

	err := Enqueue[testPayload](context.Background(), r, "test", testPayload{})
	if err != nil {
		t.Fatalf("Enqueue failed: %v", err)
	}
}
F
function

TestBroker_Interface

Parameters

pkg/relay/manager/manager_test.go:84-90
func TestBroker_Interface(t *testing.T)

{
	b := broker.NewMemoryBroker()
	r := New(WithBroker(b))
	if r.broker != b {
		t.Error("broker not set correctly")
	}
}
F
function

TestStart_ContextCancel

Parameters

pkg/relay/manager/manager_test.go:92-102
func TestStart_ContextCancel(t *testing.T)

{
	r := New()
	ctx, cancel := context.WithCancel(context.Background())
	cancel()

	ready, err := r.Start(ctx)
	if err != nil {
		t.Fatalf("Start failed: %v", err)
	}
	<-ready
}
F
function

TestMultipleTopics

Parameters

pkg/relay/manager/manager_test.go:104-144
func TestMultipleTopics(t *testing.T)

{
	r := New()
	ch1 := make(chan int, 1)
	ch2 := make(chan int, 1)

	Register[int](r, "topic.a", func(ctx context.Context, p int) error {
		ch1 <- p
		return nil
	})
	Register[int](r, "topic.b", func(ctx context.Context, p int) error {
		ch2 <- p
		return nil
	})

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	ready, err := r.Start(ctx)
	if err != nil {
		t.Fatalf("Start failed: %v", err)
	}
	<-ready

	Enqueue[int](context.Background(), r, "topic.a", 1)
	Enqueue[int](context.Background(), r, "topic.b", 2)

	var results [2]int
	select {
	case results[0] = <-ch1:
	case <-time.After(time.Second):
		t.Fatal("timeout for topic.a")
	}
	select {
	case results[1] = <-ch2:
	case <-time.After(time.Second):
		t.Fatal("timeout for topic.b")
	}

	if results[0] != 1 || results[1] != 2 {
		t.Errorf("got %v, want [1 2]", results[:])
	}
}
F
function

TestEnqueueAndRegisterAreThreadSafe

Parameters

pkg/relay/manager/manager_test.go:146-167
func TestEnqueueAndRegisterAreThreadSafe(t *testing.T)

{
	r := New()

	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			handler := func(ctx context.Context, p int) error { return nil }
			Register[int](r, "", handler)
		}(i)
	}

	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			Enqueue[int](context.Background(), r, "", 0)
		}()
	}
	wg.Wait()
}
F
function

BenchmarkEnqueue

Parameters

pkg/relay/manager/manager_test.go:169-180
func BenchmarkEnqueue(b *testing.B)

{
	r := New()
	Register[int](r, "bench", func(ctx context.Context, p int) error { return nil })
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	go r.Start(ctx)

	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		Enqueue[int](context.Background(), r, "bench", i)
	}
}