saga API

saga

package

API reference for the saga package.

F
function

TestWorkflow_Run_AllStepsSucceed

Parameters

pkg/saga/workflow_test.go:11-33
func TestWorkflow_Run_AllStepsSucceed(t *testing.T)

{
	var executed []string
	wf := New()
	wf.Add("step1",
		func(ctx context.Context) error { executed = append(executed, "step1"); return nil },
		func(ctx context.Context) error { executed = append(executed, "undo1"); return nil },
	)
	wf.Add("step2",
		func(ctx context.Context) error { executed = append(executed, "step2"); return nil },
		func(ctx context.Context) error { executed = append(executed, "undo2"); return nil },
	)

	err := wf.Run(context.Background())
	if err != nil {
		t.Fatalf("Run failed: %v", err)
	}
	if len(executed) != 2 {
		t.Fatalf("expected 2 steps executed, got %v", executed)
	}
	if executed[0] != "step1" || executed[1] != "step2" {
		t.Errorf("unexpected order: %v", executed)
	}
}
F
function

TestWorkflow_Run_StepFails_Compensates

Parameters

pkg/saga/workflow_test.go:35-64
func TestWorkflow_Run_StepFails_Compensates(t *testing.T)

{
	var executed []string
	wf := New()
	wf.Add("step1",
		func(ctx context.Context) error { executed = append(executed, "step1"); return nil },
		func(ctx context.Context) error { executed = append(executed, "undo1"); return nil },
	)
	wf.Add("step2",
		func(ctx context.Context) error { executed = append(executed, "step2"); return errors.New("fail") },
		func(ctx context.Context) error { executed = append(executed, "undo2"); return nil },
	)
	wf.Add("step3",
		func(ctx context.Context) error { executed = append(executed, "step3"); return nil },
		func(ctx context.Context) error { executed = append(executed, "undo3"); return nil },
	)

	err := wf.Run(context.Background())
	if err == nil {
		t.Fatal("expected error")
	}
	expect := []string{"step1", "step2", "undo1"}
	if len(executed) != len(expect) {
		t.Fatalf("expected %v, got %v", expect, executed)
	}
	for i, v := range expect {
		if executed[i] != v {
			t.Errorf("executed[%d] = %q, want %q", i, executed[i], v)
		}
	}
}
F
function

TestWorkflow_Run_ContextCancelled

Parameters

pkg/saga/workflow_test.go:66-90
func TestWorkflow_Run_ContextCancelled(t *testing.T)

{
	var executed []string
	ctx, cancel := context.WithCancel(context.Background())

	wf := New()
	wf.Add("step1",
		func(ctx context.Context) error { executed = append(executed, "step1"); return nil },
		func(ctx context.Context) error { executed = append(executed, "undo1"); return nil },
	)
	wf.Add("step2",
		func(ctx context.Context) error {
			cancel()
			return ctx.Err()
		},
		func(ctx context.Context) error { return nil },
	)

	err := wf.Run(ctx)
	if err == nil {
		t.Fatal("expected error from cancelled context")
	}
	if len(executed) != 2 {
		t.Fatalf("expected step1 and step2 executed, got %v", executed)
	}
}
F
function

TestWorkflow_Run_PanicInStep

Parameters

pkg/saga/workflow_test.go:92-119
func TestWorkflow_Run_PanicInStep(t *testing.T)

{
	var executed []string
	wf := New()
	wf.Add("step1",
		func(ctx context.Context) error { executed = append(executed, "step1"); return nil },
		func(ctx context.Context) error { executed = append(executed, "undo1"); return nil },
	)
	wf.Add("panic-step",
		func(ctx context.Context) error {
			panic("something went wrong")
		},
		func(ctx context.Context) error { return nil },
	)

	err := wf.Run(context.Background())
	if err == nil {
		t.Fatal("expected error from panic")
	}
	expect := []string{"step1", "undo1"}
	if len(executed) != len(expect) {
		t.Fatalf("expected %v, got %v", expect, executed)
	}
	for i, v := range expect {
		if executed[i] != v {
			t.Errorf("executed[%d] = %q, want %q", i, executed[i], v)
		}
	}
}
F
function

TestWorkflow_Run_GroupParallel

Parameters

pkg/saga/workflow_test.go:121-141
func TestWorkflow_Run_GroupParallel(t *testing.T)

{
	var mu sync.Mutex
	var executed []string
	wf := New()

	group := Group{
		{Name: "g1", Do: func(ctx context.Context) error { mu.Lock(); executed = append(executed, "g1"); mu.Unlock(); return nil },
			Compensate: func(ctx context.Context) error { return nil }},
		{Name: "g2", Do: func(ctx context.Context) error { mu.Lock(); executed = append(executed, "g2"); mu.Unlock(); return nil },
			Compensate: func(ctx context.Context) error { return nil }},
	}
	wf.AddGroup(group)

	err := wf.Run(context.Background())
	if err != nil {
		t.Fatalf("Run failed: %v", err)
	}
	if len(executed) != 2 {
		t.Errorf("expected 2 group steps, got %v", executed)
	}
}
F
function

TestWorkflow_Run_CompensatePanicSafe

Parameters

pkg/saga/workflow_test.go:143-158
func TestWorkflow_Run_CompensatePanicSafe(t *testing.T)

{
	wf := New()
	wf.Add("step1",
		func(ctx context.Context) error { return nil },
		func(ctx context.Context) error { panic("compensate panic") },
	)
	wf.Add("step2",
		func(ctx context.Context) error { return errors.New("fail") },
		nil,
	)

	err := wf.Run(context.Background())
	if err == nil {
		t.Fatal("expected error")
	}
}
F
function

TestWorkflow_Run_NoCompensateOnSuccess

Parameters

pkg/saga/workflow_test.go:160-175
func TestWorkflow_Run_NoCompensateOnSuccess(t *testing.T)

{
	var compensated bool
	wf := New()
	wf.Add("step1",
		func(ctx context.Context) error { return nil },
		func(ctx context.Context) error { compensated = true; return nil },
	)

	err := wf.Run(context.Background())
	if err != nil {
		t.Fatalf("Run failed: %v", err)
	}
	if compensated {
		t.Error("expected no compensation on success")
	}
}
F
function

TestWorkflow_Compensate_WithRetry

Parameters

pkg/saga/workflow_test.go:177-193
func TestWorkflow_Compensate_WithRetry(t *testing.T)

{
	attempts := 0
	do := WithRetry(RetryPolicy{MaxAttempts: 3, Delay: 10 * time.Millisecond, Multiplier: 1.0},
		func(ctx context.Context) error {
			attempts++
			return nil
		},
	)

	err := do(context.Background())
	if err != nil {
		t.Fatalf("WithRetry failed: %v", err)
	}
	if attempts != 1 {
		t.Errorf("expected 1 attempt on success, got %d", attempts)
	}
}
F
function

TestWorkflow_Compensate_WithRetryExhausted

Parameters

pkg/saga/workflow_test.go:195-211
func TestWorkflow_Compensate_WithRetryExhausted(t *testing.T)

{
	attempts := 0
	do := WithRetry(RetryPolicy{MaxAttempts: 3, Delay: 10 * time.Millisecond, Multiplier: 1.0},
		func(ctx context.Context) error {
			attempts++
			return errors.New("always fail")
		},
	)

	err := do(context.Background())
	if err == nil {
		t.Fatal("expected error after retry exhausted")
	}
	if attempts != 3 {
		t.Errorf("expected 3 attempts, got %d", attempts)
	}
}
S
struct

RetryPolicy

RetryPolicy defines retry behavior for saga steps.

pkg/saga/retry.go:11-15
type RetryPolicy struct

Fields

Name Type Description
MaxAttempts int
Delay time.Duration
Multiplier float64
F
function

WithRetry

WithRetry wraps a function with retry logic according to the given policy.

Parameters

policy
do
func(context.Context) error

Returns

func(context.Context)
error
pkg/saga/retry.go:18-28
// WithRetry returns a function that runs do through resiliency.Retry,
// configured from policy: MaxAttempts is passed to WithAttempts, Delay to
// WithDelay and Multiplier to WithFactor. The context given to the returned
// function is forwarded both to resiliency.Retry and to do.
func WithRetry(policy RetryPolicy, do func(context.Context) error) func(context.Context) error

{
	return func(ctx context.Context) error {
		return resiliency.Retry(ctx, func() error {
			return do(ctx)
		},
			resiliency.WithAttempts(policy.MaxAttempts),
			// NOTE(review): the second argument (24h) is presumably an upper
			// bound on the delay between attempts — confirm against the
			// resiliency package.
			resiliency.WithDelay(policy.Delay, 24*time.Hour),
			resiliency.WithFactor(policy.Multiplier),
		)
	}
}
T
type

StepStatus

StepStatus represents the status of a saga step.

pkg/saga/store.go:15-15
type StepStatus string
S
struct

SagaState

SagaState holds the persisted state of a saga.

pkg/saga/store.go:31-41
type SagaState struct

Fields

Name Type Description
ID string json:"id"
Steps []StepState json:"steps"
Status StepStatus json:"status"
Error string json:"error,omitempty"
IdempotencyKey string json:"idempotency_key,omitempty"
RetryCount int json:"retry_count,omitempty"
MaxRetries int json:"max_retries,omitempty"
CreatedAt time.Time json:"created_at"
UpdatedAt time.Time json:"updated_at"
S
struct

StepState

StepState holds the persisted state of a single saga step.

pkg/saga/store.go:44-48
type StepState struct

Fields

Name Type Description
Name string json:"name"
Status StepStatus json:"status"
StepIndex int json:"step_index"
I
interface

SagaStore

SagaStore is the interface for saga state persistence.

pkg/saga/store.go:51-56
type SagaStore interface

Methods

Save
Method

Parameters

state *SagaState

Returns

error
func Save(...)
Load
Method

Parameters

id string

Returns

error
func Load(...)
Delete
Method

Parameters

id string

Returns

error
func Delete(...)

Returns

error
func ListIncomplete(...)
S
struct
Implements: SagaStore

MemoryStore

MemoryStore is an in-memory saga store for testing and single-process use.

pkg/saga/store.go:59-62
type MemoryStore struct

Methods

Save
Method

Parameters

state *SagaState

Returns

error
func (*MemoryStore) Save(state *SagaState) error
{
	m.mu.Lock()
	defer m.mu.Unlock()
	cp := *state
	cp.UpdatedAt = time.Now()
	if cp.CreatedAt.IsZero() {
		cp.CreatedAt = time.Now()
	}
	m.sagas[state.ID] = &cp
	return nil
}
Load
Method

Parameters

id string

Returns

error
func (*MemoryStore) Load(id string) (*SagaState, error)
{
	m.mu.RLock()
	defer m.mu.RUnlock()
	s, ok := m.sagas[id]
	if !ok {
		return nil, fmt.Errorf("saga %q not found", id)
	}
	cp := *s
	return &cp, nil
}
Delete
Method

Parameters

id string

Returns

error
func (*MemoryStore) Delete(id string) error
{
	m.mu.Lock()
	defer m.mu.Unlock()
	delete(m.sagas, id)
	return nil
}

Returns

error
func (*MemoryStore) ListIncomplete() ([]*SagaState, error)
{
	m.mu.RLock()
	defer m.mu.RUnlock()
	var result []*SagaState
	for _, s := range m.sagas {
		if s.Status == StatusPending || s.Status == StatusFailed {
			cp := *s
			result = append(result, &cp)
		}
	}
	return result, nil
}

Fields

Name Type Description
mu sync.RWMutex
sagas map[string]*SagaState
F
function

NewMemoryStore

NewMemoryStore creates a new in-memory saga store.

Returns

pkg/saga/store.go:65-67
func NewMemoryStore() *MemoryStore

{
	return &MemoryStore{sagas: make(map[string]*SagaState)}
}
S
struct

IdempotencyRecorder

IdempotencyRecorder tracks processed idempotency keys to prevent double execution.

pkg/saga/store.go:113-116
type IdempotencyRecorder struct

Methods

Record
Method

Parameters

key string
state *SagaState

Returns

error
func (*IdempotencyRecorder) Record(key string, state *SagaState) error
{
	r.mu.Lock()
	defer r.mu.Unlock()
	if existing, ok := r.seen[key]; ok {
		return &IdempotencyError{Key: key, ExistingState: existing}
	}
	r.seen[key] = state
	return nil
}
Get
Method

Parameters

key string

Returns

bool
func (*IdempotencyRecorder) Get(key string) (*SagaState, bool)
{
	r.mu.RLock()
	defer r.mu.RUnlock()
	s, ok := r.seen[key]
	if !ok {
		return nil, false
	}
	cp := *s
	return &cp, true
}

Fields

Name Type Description
mu sync.RWMutex
seen map[string]*SagaState
F
function

NewIdempotencyRecorder

NewIdempotencyRecorder creates a new IdempotencyRecorder.

pkg/saga/store.go:119-121
func NewIdempotencyRecorder() *IdempotencyRecorder

{
	return &IdempotencyRecorder{seen: make(map[string]*SagaState)}
}
S
struct

IdempotencyError

IdempotencyError is returned when an idempotency key has already been processed.

pkg/saga/store.go:145-148
type IdempotencyError struct

Methods

Error
Method

Returns

string
func (*IdempotencyError) Error() string
{
	return fmt.Sprintf("idempotency key %q already processed", e.Key)
}
Is
Method

Parameters

target error

Returns

bool
func (*IdempotencyError) Is(target error) bool
{
	_, ok := target.(*IdempotencyError)
	return ok
}

Fields

Name Type Description
Key string
ExistingState *SagaState
S
struct

DeadLetterQueue

DeadLetterQueue holds failed saga states that exceeded max retries.

pkg/saga/store.go:160-163
type DeadLetterQueue struct

Methods

Enqueue
Method

Parameters

state *SagaState
reason string
func (*DeadLetterQueue) Enqueue(state *SagaState, reason string)
{
	dlq.mu.Lock()
	defer dlq.mu.Unlock()
	dlq.items = append(dlq.items, &DLQEntry{
		State:     state,
		Reason:    reason,
		DeadSince: time.Now(),
	})
}
List
Method

Returns

func (*DeadLetterQueue) List() []*DLQEntry
{
	dlq.mu.RLock()
	defer dlq.mu.RUnlock()
	result := make([]*DLQEntry, len(dlq.items))
	copy(result, dlq.items)
	return result
}
Len
Method

Returns

int
func (*DeadLetterQueue) Len() int
{
	dlq.mu.RLock()
	defer dlq.mu.RUnlock()
	return len(dlq.items)
}
Remove
Method

Parameters

id string
func (*DeadLetterQueue) Remove(id string)
{
	dlq.mu.Lock()
	defer dlq.mu.Unlock()
	for i, entry := range dlq.items {
		if entry.State.ID == id {
			dlq.items = append(dlq.items[:i], dlq.items[i+1:]...)
			return
		}
	}
}

Fields

Name Type Description
mu sync.RWMutex
items []*DLQEntry
S
struct

DLQEntry

DLQEntry holds a dead-lettered saga state and the reason it was moved.

pkg/saga/store.go:166-170
type DLQEntry struct

Fields

Name Type Description
State *SagaState
Reason string
DeadSince time.Time
F
function

NewDeadLetterQueue

NewDeadLetterQueue creates a new DeadLetterQueue.

Returns

pkg/saga/store.go:173-175
func NewDeadLetterQueue() *DeadLetterQueue

{
	return &DeadLetterQueue{items: make([]*DLQEntry, 0)}
}
F
function

ProcessWithDLQ

ProcessWithDLQ retries a failed saga until its RetryCount reaches maxRetries; after that the saga is marked dead and placed on the dead-letter queue.

Parameters

store
maxRetries
int

Returns

func(id
string, runFunc func(ctx context.Context) error) func(ctx context.Context) error
pkg/saga/store.go:213-239
func ProcessWithDLQ(store SagaStore, dlq *DeadLetterQueue, maxRetries int) func(id string, runFunc func(ctx context.Context) error) func(ctx context.Context) error

{
	return func(id string, runFunc func(ctx context.Context) error) func(ctx context.Context) error {
		return func(ctx context.Context) error {
			state, err := store.Load(id)
			if err != nil {
				return err
			}
			if state.RetryCount >= maxRetries {
				state.Status = StatusDead
				state.Error = errors.New("exceeded max retries").Error()
				_ = store.Save(state)
				dlq.Enqueue(state, "max retries exceeded")
				return fmt.Errorf("saga %q dead: max retries exceeded", id)
			}
			state.RetryCount++
			if err := runFunc(ctx); err != nil {
				state.Status = StatusFailed
				state.Error = err.Error()
				_ = store.Save(state)
				return err
			}
			state.Status = StatusCompleted
			_ = store.Save(state)
			return nil
		}
	}
}
S
struct
Implements: SagaStore

FileStore

FileStore persists saga state to disk.

pkg/saga/store.go:242-245
type FileStore struct

Methods

Save
Method

Parameters

state *SagaState

Returns

error
func (*FileStore) Save(state *SagaState) error
{
	s.mu.Lock()
	defer s.mu.Unlock()

	state.UpdatedAt = time.Now()
	if state.CreatedAt.IsZero() {
		state.CreatedAt = time.Now()
	}

	data, err := json.MarshalIndent(state, "", "  ")
	if err != nil {
		return fmt.Errorf("saga store: marshal: %w", err)
	}

	path := filepath.Join(s.dir, state.ID+".json")
	tmp := path + ".tmp"
	if err := os.WriteFile(tmp, data, 0644); err != nil {
		return fmt.Errorf("saga store: write: %w", err)
	}
	if err := os.Rename(tmp, path); err != nil {
		return fmt.Errorf("saga store: rename: %w", err)
	}
	return nil
}
Load
Method

Parameters

id string

Returns

error
func (*FileStore) Load(id string) (*SagaState, error)
{
	s.mu.RLock()
	defer s.mu.RUnlock()

	path := filepath.Join(s.dir, id+".json")
	data, err := os.ReadFile(path)
	if err != nil {
		return nil, fmt.Errorf("saga store: read %s: %w", id, err)
	}

	var state SagaState
	if err := json.Unmarshal(data, &state); err != nil {
		return nil, fmt.Errorf("saga store: unmarshal %s: %w", id, err)
	}
	return &state, nil
}
Delete
Method

Parameters

id string

Returns

error
func (*FileStore) Delete(id string) error
{
	s.mu.Lock()
	defer s.mu.Unlock()

	path := filepath.Join(s.dir, id+".json")
	return os.Remove(path)
}

Returns

error
func (*FileStore) ListIncomplete() ([]*SagaState, error)
{
	s.mu.RLock()
	defer s.mu.RUnlock()

	entries, err := os.ReadDir(s.dir)
	if err != nil {
		return nil, fmt.Errorf("saga store: readdir: %w", err)
	}

	var result []*SagaState
	for _, entry := range entries {
		if filepath.Ext(entry.Name()) != ".json" {
			continue
		}
		data, err := os.ReadFile(filepath.Join(s.dir, entry.Name()))
		if err != nil {
			continue
		}
		var state SagaState
		if err := json.Unmarshal(data, &state); err != nil {
			continue
		}
		if state.Status == StatusPending || state.Status == StatusFailed {
			result = append(result, &state)
		}
	}
	return result, nil
}

Fields

Name Type Description
dir string
mu sync.RWMutex
F
function

NewStore

NewStore creates a FileStore that writes state JSON files to dir.

Parameters

dir
string

Returns

error
pkg/saga/store.go:248-253
func NewStore(dir string) (*FileStore, error)

{
	if err := os.MkdirAll(dir, 0755); err != nil {
		return nil, fmt.Errorf("saga store: cannot create dir %s: %w", dir, err)
	}
	return &FileStore{dir: dir}, nil
}
S
struct

RecoverableWorkflow

RecoverableWorkflow is a saga that persists state for crash recovery.

pkg/saga/store.go:335-340
type RecoverableWorkflow struct

Methods

Add
Method

Parameters

name string
do func(ctx context.Context) error
compensate func(ctx context.Context) error
func (*RecoverableWorkflow) Add(name string, do, compensate func(ctx context.Context) error)
{
	idx := len(rw.state.Steps)
	rw.state.Steps = append(rw.state.Steps, StepState{
		Name:      name,
		Status:    StatusPending,
		StepIndex: idx,
	})
	rw.Workflow.Add(name, do, compensate)
}
Run
Method

Parameters

Returns

error
func (*RecoverableWorkflow) Run(ctx context.Context) error
{
	rw.state.Status = StatusPending
	_ = rw.store.Save(rw.state)

	for i, item := range rw.steps {
		if ctx.Err() != nil {
			rw.state.Status = StatusFailed
			rw.state.Error = ctx.Err().Error()
			_ = rw.store.Save(rw.state)
			return rw.rollback(ctx, ctx.Err())
		}

		var err error
		switch v := item.(type) {
		case Step:
			err = rw.runStepTracking(ctx, v, i)
		case Group:
			err = rw.runGroupTracking(ctx, v, i)
		}

		if err != nil {
			rw.state.Status = StatusFailed
			rw.state.Error = err.Error()
			_ = rw.store.Save(rw.state)
			return rw.rollback(ctx, err)
		}
	}

	rw.state.Status = StatusCompleted
	_ = rw.store.Save(rw.state)
	return nil
}

Parameters

step Step
stepIndex int

Returns

error
func (*RecoverableWorkflow) runStepTracking(ctx context.Context, step Step, stepIndex int) error
{
	if err := step.Do(ctx); err != nil {
		return fmt.Errorf("step '%s' failed: %w", step.Name, err)
	}

	rw.mu.Lock()
	if step.Compensate != nil {
		rw.stack = append(rw.stack, step)
	}
	rw.mu.Unlock()

	if stepIndex < len(rw.state.Steps) {
		rw.state.Steps[stepIndex].Status = StatusCompleted
		_ = rw.store.Save(rw.state)
	}
	return nil
}

Parameters

group Group
stepIndex int

Returns

error
func (*RecoverableWorkflow) runGroupTracking(ctx context.Context, group Group, stepIndex int) error
{
	var wg sync.WaitGroup
	errChan := make(chan error, len(group))

	for _, step := range group {
		wg.Add(1)
		go func(s Step) {
			defer wg.Done()
			if err := rw.runStepTracking(ctx, s, stepIndex); err != nil {
				errChan <- err
			}
		}(step)
	}

	wg.Wait()
	close(errChan)

	if len(errChan) > 0 {
		var errs []error
		for e := range errChan {
			errs = append(errs, e)
		}
		return fmt.Errorf("group failed: %w", joinErrors(errs))
	}
	return nil
}
rollback
Method

Parameters

triggerErr error

Returns

error
func (*RecoverableWorkflow) rollback(ctx context.Context, triggerErr error) error
{
	rollbackCtx := context.WithoutCancel(ctx)
	var errs []error
	errs = append(errs, triggerErr)

	rw.mu.Lock()
	defer rw.mu.Unlock()

	for i := len(rw.stack) - 1; i >= 0; i-- {
		step := rw.stack[i]
		if err := step.Compensate(rollbackCtx); err != nil {
			errs = append(errs, fmt.Errorf("rollback failed for '%s': %w", step.Name, err))
		} else {
			for j := range rw.state.Steps {
				if rw.state.Steps[j].Name == step.Name {
					rw.state.Steps[j].Status = StatusCompensated
					break
				}
			}
		}
		_ = rw.store.Save(rw.state)
	}

	return joinErrors(errs)
}

Fields

Name Type Description
id string
store SagaStore
state *SagaState
F
function

NewRecoverable

NewRecoverable creates a saga that persists state using the given store.

Parameters

id
string
store
pkg/saga/store.go:343-353
func NewRecoverable(id string, store SagaStore) *RecoverableWorkflow

{
	return &RecoverableWorkflow{
		Workflow: New(),
		id:       id,
		store:    store,
		state: &SagaState{
			ID:     id,
			Status: StatusPending,
		},
	}
}
F
function

joinErrors

Parameters

errs
[]error

Returns

error
pkg/saga/store.go:469-474
// joinErrors combines errs into a single error using errors.Join, so callers
// can still match the individual errors with errors.Is and errors.As. It
// returns nil when errs is empty or contains only nil values.
func joinErrors(errs []error) error {
	return errors.Join(errs...)
}
F
function

TestStore_SaveAndLoad

Parameters

pkg/saga/store_test.go:11-43
func TestStore_SaveAndLoad(t *testing.T)

{
	dir := t.TempDir()
	store, err := NewStore(dir)
	if err != nil {
		t.Fatalf("NewStore: %v", err)
	}

	state := &SagaState{
		ID:     "test-1",
		Status: StatusPending,
		Steps: []StepState{
			{Name: "reserve", Status: StatusCompleted, StepIndex: 0},
			{Name: "charge", Status: StatusPending, StepIndex: 1},
		},
	}
	if err := store.Save(state); err != nil {
		t.Fatalf("Save: %v", err)
	}

	loaded, err := store.Load("test-1")
	if err != nil {
		t.Fatalf("Load: %v", err)
	}
	if loaded.ID != "test-1" {
		t.Errorf("ID = %q, want %q", loaded.ID, "test-1")
	}
	if len(loaded.Steps) != 2 {
		t.Errorf("len(Steps) = %d, want 2", len(loaded.Steps))
	}
	if loaded.Steps[0].Status != StatusCompleted {
		t.Errorf("step[0] status = %q, want %q", loaded.Steps[0].Status, StatusCompleted)
	}
}
F
function

TestStore_Delete

Parameters

pkg/saga/store_test.go:45-60
func TestStore_Delete(t *testing.T)

{
	dir := t.TempDir()
	store, _ := NewStore(dir)

	state := &SagaState{ID: "del-me", Status: StatusPending}
	store.Save(state)

	if err := store.Delete("del-me"); err != nil {
		t.Fatalf("Delete: %v", err)
	}

	_, err := store.Load("del-me")
	if err == nil {
		t.Error("expected error after delete")
	}
}
F
function

TestStore_ListIncomplete

Parameters

pkg/saga/store_test.go:62-77
func TestStore_ListIncomplete(t *testing.T)

{
	dir := t.TempDir()
	store, _ := NewStore(dir)

	store.Save(&SagaState{ID: "s1", Status: StatusPending})
	store.Save(&SagaState{ID: "s2", Status: StatusCompleted})
	store.Save(&SagaState{ID: "s3", Status: StatusFailed})

	list, err := store.ListIncomplete()
	if err != nil {
		t.Fatalf("ListIncomplete: %v", err)
	}
	if len(list) != 2 {
		t.Errorf("got %d incomplete, want 2", len(list))
	}
}
F
function

TestRecoverableWorkflow_AllStepsSucceed

Parameters

pkg/saga/store_test.go:79-106
func TestRecoverableWorkflow_AllStepsSucceed(t *testing.T)

{
	dir := t.TempDir()
	store, _ := NewStore(dir)

	var executed []string
	rw := NewRecoverable("order-1", store)
	rw.Add("reserve",
		func(ctx context.Context) error { executed = append(executed, "reserve"); return nil },
		func(ctx context.Context) error { executed = append(executed, "undo-reserve"); return nil },
	)
	rw.Add("charge",
		func(ctx context.Context) error { executed = append(executed, "charge"); return nil },
		func(ctx context.Context) error { executed = append(executed, "undo-charge"); return nil },
	)

	err := rw.Run(context.Background())
	if err != nil {
		t.Fatalf("Run: %v", err)
	}
	if len(executed) != 2 {
		t.Errorf("executed %v, want 2 steps", executed)
	}

	state, _ := store.Load("order-1")
	if state.Status != StatusCompleted {
		t.Errorf("status = %q, want %q", state.Status, StatusCompleted)
	}
}
F
function

TestRecoverableWorkflow_StepFails_Persists

Parameters

pkg/saga/store_test.go:108-134
func TestRecoverableWorkflow_StepFails_Persists(t *testing.T)

{
	dir := t.TempDir()
	store, _ := NewStore(dir)

	rw := NewRecoverable("order-2", store)
	rw.Add("reserve",
		func(ctx context.Context) error { return nil },
		func(ctx context.Context) error { return nil },
	)
	rw.Add("charge",
		func(ctx context.Context) error { return errors.New("card declined") },
		func(ctx context.Context) error { return nil },
	)

	err := rw.Run(context.Background())
	if err == nil {
		t.Fatal("expected error")
	}

	state, _ := store.Load("order-2")
	if state.Status != StatusFailed {
		t.Errorf("status = %q, want %q", state.Status, StatusFailed)
	}
	if state.Steps[0].Status != StatusCompensated {
		t.Errorf("step[0] = %q, want %q", state.Steps[0].Status, StatusCompensated)
	}
}
F
function

TestRecoverableWorkflow_CrashRecovery

Parameters

pkg/saga/store_test.go:136-169
func TestRecoverableWorkflow_CrashRecovery(t *testing.T)

{
	dir := t.TempDir()
	store, _ := NewStore(dir)

	rw := NewRecoverable("order-3", store)
	rw.Add("reserve",
		func(ctx context.Context) error { return nil },
		func(ctx context.Context) error { return nil },
	)
	rw.Add("charge",
		func(ctx context.Context) error { return errors.New("crash") },
		func(ctx context.Context) error { return nil },
	)

	rw.Run(context.Background())

	incomplete, err := store.ListIncomplete()
	if err != nil {
		t.Fatalf("ListIncomplete: %v", err)
	}
	if len(incomplete) != 1 {
		t.Fatalf("got %d incomplete sagas, want 1", len(incomplete))
	}
	if incomplete[0].ID != "order-3" {
		t.Errorf("ID = %q, want %q", incomplete[0].ID, "order-3")
	}

	_ = filepath.Join(dir, "order-3.json")
	data, err := os.ReadFile(filepath.Join(dir, "order-3.json"))
	if err != nil {
		t.Fatalf("read file: %v", err)
	}
	t.Logf("persisted state: %s", string(data))
}
F
function

TestMemoryStore_CRUD

Parameters

pkg/saga/store_test.go:173-200
func TestMemoryStore_CRUD(t *testing.T)

{
	store := NewMemoryStore()

	state := &SagaState{ID: "m1", Status: StatusPending}
	if err := store.Save(state); err != nil {
		t.Fatalf("Save: %v", err)
	}

	loaded, err := store.Load("m1")
	if err != nil {
		t.Fatalf("Load: %v", err)
	}
	if loaded.ID != "m1" {
		t.Errorf("ID = %q, want m1", loaded.ID)
	}

	store.Save(&SagaState{ID: "m2", Status: StatusCompleted})
	incomplete, _ := store.ListIncomplete()
	if len(incomplete) != 1 {
		t.Errorf("incomplete = %d, want 1", len(incomplete))
	}

	store.Delete("m1")
	_, err = store.Load("m1")
	if err == nil {
		t.Error("expected error after delete")
	}
}
F
function

TestIdempotencyRecorder

Parameters

pkg/saga/store_test.go:202-231
func TestIdempotencyRecorder(t *testing.T)

{
	rec := NewIdempotencyRecorder()

	state1 := &SagaState{ID: "s1", Status: StatusCompleted}
	if err := rec.Record("key-1", state1); err != nil {
		t.Fatalf("Record: %v", err)
	}

	err := rec.Record("key-1", &SagaState{ID: "s2"})
	if err == nil {
		t.Fatal("expected idempotency error")
	}
	var idErr *IdempotencyError
	if !errors.As(err, &idErr) {
		t.Errorf("error type = %T, want *IdempotencyError", err)
	}

	got, ok := rec.Get("key-1")
	if !ok {
		t.Fatal("expected key found")
	}
	if got.ID != "s1" {
		t.Errorf("got ID = %q, want s1", got.ID)
	}

	_, ok = rec.Get("missing")
	if ok {
		t.Error("expected missing key not found")
	}
}
F
function

TestDeadLetterQueue

Parameters

pkg/saga/store_test.go:233-258
func TestDeadLetterQueue(t *testing.T)

{
	dlq := NewDeadLetterQueue()

	state := &SagaState{ID: "dead-1", Status: StatusFailed, RetryCount: 3}
	dlq.Enqueue(state, "max retries exceeded")

	if dlq.Len() != 1 {
		t.Errorf("Len = %d, want 1", dlq.Len())
	}

	entries := dlq.List()
	if len(entries) != 1 {
		t.Fatalf("List len = %d, want 1", len(entries))
	}
	if entries[0].State.ID != "dead-1" {
		t.Errorf("entry ID = %q, want dead-1", entries[0].State.ID)
	}
	if entries[0].Reason != "max retries exceeded" {
		t.Errorf("reason = %q, want max retries exceeded", entries[0].Reason)
	}

	dlq.Remove("dead-1")
	if dlq.Len() != 0 {
		t.Errorf("Len after remove = %d, want 0", dlq.Len())
	}
}
F
function

TestProcessWithDLQ_ExceedsRetries

Parameters

pkg/saga/store_test.go:260-284
func TestProcessWithDLQ_ExceedsRetries(t *testing.T)

{
	store := NewMemoryStore()
	dlq := NewDeadLetterQueue()

	state := &SagaState{ID: "dlq-1", Status: StatusFailed, RetryCount: 3, MaxRetries: 3}
	store.Save(state)

	processor := ProcessWithDLQ(store, dlq, 3)
	run := processor("dlq-1", func(ctx context.Context) error {
		return errors.New("still failing")
	})

	err := run(context.Background())
	if err == nil {
		t.Fatal("expected error")
	}

	updated, _ := store.Load("dlq-1")
	if updated.Status != StatusDead {
		t.Errorf("status = %q, want %q", updated.Status, StatusDead)
	}
	if dlq.Len() != 1 {
		t.Errorf("dlq len = %d, want 1", dlq.Len())
	}
}
F
function

TestProcessWithDLQ_RetrySucceeds

Parameters

pkg/saga/store_test.go:286-312
func TestProcessWithDLQ_RetrySucceeds(t *testing.T)

{
	store := NewMemoryStore()
	dlq := NewDeadLetterQueue()

	state := &SagaState{ID: "retry-1", Status: StatusFailed, RetryCount: 1, MaxRetries: 3}
	store.Save(state)

	processor := ProcessWithDLQ(store, dlq, 3)
	run := processor("retry-1", func(ctx context.Context) error {
		return nil
	})

	if err := run(context.Background()); err != nil {
		t.Fatalf("unexpected error: %v", err)
	}

	updated, _ := store.Load("retry-1")
	if updated.Status != StatusCompleted {
		t.Errorf("status = %q, want %q", updated.Status, StatusCompleted)
	}
	if updated.RetryCount != 2 {
		t.Errorf("retry count = %d, want 2", updated.RetryCount)
	}
	if dlq.Len() != 0 {
		t.Errorf("dlq len = %d, want 0", dlq.Len())
	}
}
F
function

TestSagaStore_Interface

Parameters

pkg/saga/store_test.go:314-320
func TestSagaStore_Interface(t *testing.T)

{
	var _ SagaStore = NewMemoryStore()
	var _ SagaStore = &FileStore{}
	dir := t.TempDir()
	fs, _ := NewStore(dir)
	var _ SagaStore = fs
}
S
struct

Step

Step defines a single saga step with a Do action and a Compensate action that undoes it.

pkg/saga/workflow.go:12-16
type Step struct

Fields

Name Type Description
Name string
Do func(ctx context.Context) error
Compensate func(ctx context.Context) error
T
type

Group

Group is a slice of Steps that are executed in parallel.

pkg/saga/workflow.go:20-20
type Group []Step
S
struct

Workflow

Workflow orchestrates a saga with rollback support.

pkg/saga/workflow.go:23-27
type Workflow struct

Methods

Add
Method

Add appends a step to the workflow.

Parameters

name string
do func(ctx context.Context) error
compensate func(ctx context.Context) error
func (*Workflow) Add(name string, do, compensate func(ctx context.Context) error)
{
	w.steps = append(w.steps, Step{
		Name:       name,
		Do:         do,
		Compensate: compensate,
	})
}
AddGroup
Method

AddGroup appends a parallel step group to the workflow.

Parameters

g Group
func (*Workflow) AddGroup(g Group)
{
	w.steps = append(w.steps, g)
}
Run
Method

Run executes all steps in order, rolling back on failure.

Parameters

Returns

error
func (*Workflow) Run(ctx context.Context) error
{
	for _, item := range w.steps {
		// Check Context before starting step
		if ctx.Err() != nil {
			return w.rollback(ctx, ctx.Err())
		}

		var err error
		switch v := item.(type) {
		case Step:
			err = w.runStep(ctx, v)
		case Group:
			err = w.runGroup(ctx, v)
		}

		if err != nil {
			return w.rollback(ctx, err)
		}
	}
	return nil
}
runStep
Method

Parameters

Returns

err error
func (*Workflow) runStep(ctx context.Context, step Step) (err error)
{
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("panic in step '%s': %v", step.Name, r)
		}
	}()

	if err := step.Do(ctx); err != nil {
		return fmt.Errorf("step '%s' failed: %w", step.Name, err)
	}

	w.mu.Lock()
	if step.Compensate != nil {
		w.stack = append(w.stack, step)
	}
	w.mu.Unlock()

	return nil
}
runGroup
Method

Parameters

Returns

error
func (*Workflow) runGroup(ctx context.Context, group Group) error
{
	var wg sync.WaitGroup
	errChan := make(chan error, len(group))

	// Temporarily store successful steps in this group to add to stack later
	// If the group fails, we only compensate what succeeded inside the group?
	// Actually, if we use w.runStep() inside goroutine, it appends to w.stack safely.
	// But we must handle partial failure rollback within the group logic or rely on main rollback?
	// For simplicity, we let them append to stack. If one fails, Run() returns error and triggers rollback of everything in stack.

	for _, step := range group {
		wg.Add(1)
		go func(s Step) {
			defer wg.Done()
			if err := w.runStep(ctx, s); err != nil {
				errChan <- err
			}
		}(step)
	}

	wg.Wait()
	close(errChan)

	if len(errChan) > 0 {
		var errs []error
		for e := range errChan {
			errs = append(errs, e)
		}
		return errors.Join(errs...)
	}
	return nil
}
rollback
Method

Parameters

triggerErr error

Returns

error
func (*Workflow) rollback(ctx context.Context, triggerErr error) error
{
	rollbackCtx := context.WithoutCancel(ctx)
	var errs []error
	errs = append(errs, triggerErr)

	// LIFO
	w.mu.Lock()
	defer w.mu.Unlock()

	for i := len(w.stack) - 1; i >= 0; i-- {
		step := w.stack[i]
		if err := w.safeCompensate(rollbackCtx, step); err != nil {
			errs = append(errs, fmt.Errorf("rollback failed for '%s': %w", step.Name, err))
		}
	}

	return errors.Join(errs...)
}

Parameters

Returns

err error
func (*Workflow) safeCompensate(ctx context.Context, step Step) (err error)
{
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("panic during compensation: %v", r)
		}
	}()
	return step.Compensate(ctx)
}

Fields

Name Type Description
steps []any
stack []Step
mu sync.Mutex
F
function

New

New creates an empty Workflow.

Returns

pkg/saga/workflow.go:30-34
func New() *Workflow

{
	return &Workflow{
		stack: make([]Step, 0),
	}
}