saga
API reference for the saga package.
Imports (10)
TestWorkflow_Run_AllStepsSucceed
Parameters
func TestWorkflow_Run_AllStepsSucceed(t *testing.T)
{
var executed []string
wf := New()
wf.Add("step1",
func(ctx context.Context) error { executed = append(executed, "step1"); return nil },
func(ctx context.Context) error { executed = append(executed, "undo1"); return nil },
)
wf.Add("step2",
func(ctx context.Context) error { executed = append(executed, "step2"); return nil },
func(ctx context.Context) error { executed = append(executed, "undo2"); return nil },
)
err := wf.Run(context.Background())
if err != nil {
t.Fatalf("Run failed: %v", err)
}
if len(executed) != 2 {
t.Fatalf("expected 2 steps executed, got %v", executed)
}
if executed[0] != "step1" || executed[1] != "step2" {
t.Errorf("unexpected order: %v", executed)
}
}
TestWorkflow_Run_StepFails_Compensates
Parameters
func TestWorkflow_Run_StepFails_Compensates(t *testing.T)
{
var executed []string
wf := New()
wf.Add("step1",
func(ctx context.Context) error { executed = append(executed, "step1"); return nil },
func(ctx context.Context) error { executed = append(executed, "undo1"); return nil },
)
wf.Add("step2",
func(ctx context.Context) error { executed = append(executed, "step2"); return errors.New("fail") },
func(ctx context.Context) error { executed = append(executed, "undo2"); return nil },
)
wf.Add("step3",
func(ctx context.Context) error { executed = append(executed, "step3"); return nil },
func(ctx context.Context) error { executed = append(executed, "undo3"); return nil },
)
err := wf.Run(context.Background())
if err == nil {
t.Fatal("expected error")
}
expect := []string{"step1", "step2", "undo1"}
if len(executed) != len(expect) {
t.Fatalf("expected %v, got %v", expect, executed)
}
for i, v := range expect {
if executed[i] != v {
t.Errorf("executed[%d] = %q, want %q", i, executed[i], v)
}
}
}
TestWorkflow_Run_ContextCancelled
Parameters
func TestWorkflow_Run_ContextCancelled(t *testing.T)
{
var executed []string
ctx, cancel := context.WithCancel(context.Background())
wf := New()
wf.Add("step1",
func(ctx context.Context) error { executed = append(executed, "step1"); return nil },
func(ctx context.Context) error { executed = append(executed, "undo1"); return nil },
)
wf.Add("step2",
func(ctx context.Context) error {
cancel()
return ctx.Err()
},
func(ctx context.Context) error { return nil },
)
err := wf.Run(ctx)
if err == nil {
t.Fatal("expected error from cancelled context")
}
if len(executed) != 2 {
t.Fatalf("expected step1 and step2 executed, got %v", executed)
}
}
TestWorkflow_Run_PanicInStep
Parameters
func TestWorkflow_Run_PanicInStep(t *testing.T)
{
var executed []string
wf := New()
wf.Add("step1",
func(ctx context.Context) error { executed = append(executed, "step1"); return nil },
func(ctx context.Context) error { executed = append(executed, "undo1"); return nil },
)
wf.Add("panic-step",
func(ctx context.Context) error {
panic("something went wrong")
},
func(ctx context.Context) error { return nil },
)
err := wf.Run(context.Background())
if err == nil {
t.Fatal("expected error from panic")
}
expect := []string{"step1", "undo1"}
if len(executed) != len(expect) {
t.Fatalf("expected %v, got %v", expect, executed)
}
for i, v := range expect {
if executed[i] != v {
t.Errorf("executed[%d] = %q, want %q", i, executed[i], v)
}
}
}
TestWorkflow_Run_GroupParallel
Parameters
func TestWorkflow_Run_GroupParallel(t *testing.T)
{
var mu sync.Mutex
var executed []string
wf := New()
group := Group{
{Name: "g1", Do: func(ctx context.Context) error { mu.Lock(); executed = append(executed, "g1"); mu.Unlock(); return nil },
Compensate: func(ctx context.Context) error { return nil }},
{Name: "g2", Do: func(ctx context.Context) error { mu.Lock(); executed = append(executed, "g2"); mu.Unlock(); return nil },
Compensate: func(ctx context.Context) error { return nil }},
}
wf.AddGroup(group)
err := wf.Run(context.Background())
if err != nil {
t.Fatalf("Run failed: %v", err)
}
if len(executed) != 2 {
t.Errorf("expected 2 group steps, got %v", executed)
}
}
TestWorkflow_Run_CompensatePanicSafe
Parameters
func TestWorkflow_Run_CompensatePanicSafe(t *testing.T)
{
wf := New()
wf.Add("step1",
func(ctx context.Context) error { return nil },
func(ctx context.Context) error { panic("compensate panic") },
)
wf.Add("step2",
func(ctx context.Context) error { return errors.New("fail") },
nil,
)
err := wf.Run(context.Background())
if err == nil {
t.Fatal("expected error")
}
}
TestWorkflow_Run_NoCompensateOnSuccess
Parameters
func TestWorkflow_Run_NoCompensateOnSuccess(t *testing.T)
{
var compensated bool
wf := New()
wf.Add("step1",
func(ctx context.Context) error { return nil },
func(ctx context.Context) error { compensated = true; return nil },
)
err := wf.Run(context.Background())
if err != nil {
t.Fatalf("Run failed: %v", err)
}
if compensated {
t.Error("expected no compensation on success")
}
}
TestWorkflow_Compensate_WithRetry
Parameters
func TestWorkflow_Compensate_WithRetry(t *testing.T)
{
attempts := 0
do := WithRetry(RetryPolicy{MaxAttempts: 3, Delay: 10 * time.Millisecond, Multiplier: 1.0},
func(ctx context.Context) error {
attempts++
return nil
},
)
err := do(context.Background())
if err != nil {
t.Fatalf("WithRetry failed: %v", err)
}
if attempts != 1 {
t.Errorf("expected 1 attempt on success, got %d", attempts)
}
}
TestWorkflow_Compensate_WithRetryExhausted
Parameters
func TestWorkflow_Compensate_WithRetryExhausted(t *testing.T)
{
attempts := 0
do := WithRetry(RetryPolicy{MaxAttempts: 3, Delay: 10 * time.Millisecond, Multiplier: 1.0},
func(ctx context.Context) error {
attempts++
return errors.New("always fail")
},
)
err := do(context.Background())
if err == nil {
t.Fatal("expected error after retry exhausted")
}
if attempts != 3 {
t.Errorf("expected 3 attempts, got %d", attempts)
}
}
RetryPolicy
RetryPolicy defines retry behavior for saga steps.
type RetryPolicy struct
Fields
| Name | Type | Description |
|---|---|---|
| MaxAttempts | int | |
| Delay | time.Duration | |
| Multiplier | float64 |
WithRetry
WithRetry wraps a function with retry logic according to the given policy.
Parameters
Returns
func WithRetry(policy RetryPolicy, do func(context.Context) error) func(context.Context) error
{
return func(ctx context.Context) error {
return resiliency.Retry(ctx, func() error {
return do(ctx)
},
resiliency.WithAttempts(policy.MaxAttempts),
resiliency.WithDelay(policy.Delay, 24*time.Hour),
resiliency.WithFactor(policy.Multiplier),
)
}
}
Uses
StepStatus
StepStatus represents the status of a saga step.
type StepStatus string
SagaState
SagaState holds the persisted state of a saga.
type SagaState struct
Fields
| Name | Type | Description |
|---|---|---|
| ID | string | json:"id" |
| Steps | []StepState | json:"steps" |
| Status | StepStatus | json:"status" |
| Error | string | json:"error,omitempty" |
| IdempotencyKey | string | json:"idempotency_key,omitempty" |
| RetryCount | int | json:"retry_count,omitempty" |
| MaxRetries | int | json:"max_retries,omitempty" |
| CreatedAt | time.Time | json:"created_at" |
| UpdatedAt | time.Time | json:"updated_at" |
Uses
StepState
StepState holds the persisted state of a single saga step.
type StepState struct
Fields
| Name | Type | Description |
|---|---|---|
| Name | string | json:"name" |
| Status | StepStatus | json:"status" |
| StepIndex | int | json:"step_index" |
Uses
SagaStore
SagaStore is the interface for saga state persistence.
type SagaStore interface
MemoryStore
MemoryStore is an in-memory saga store for testing and single-process use.
type MemoryStore struct
Methods
Parameters
Returns
func (*MemoryStore) Save(state *SagaState) error
{
m.mu.Lock()
defer m.mu.Unlock()
cp := *state
cp.UpdatedAt = time.Now()
if cp.CreatedAt.IsZero() {
cp.CreatedAt = time.Now()
}
m.sagas[state.ID] = &cp
return nil
}
Parameters
Returns
func (*MemoryStore) Load(id string) (*SagaState, error)
{
m.mu.RLock()
defer m.mu.RUnlock()
s, ok := m.sagas[id]
if !ok {
return nil, fmt.Errorf("saga %q not found", id)
}
cp := *s
return &cp, nil
}
Parameters
Returns
func (*MemoryStore) Delete(id string) error
{
m.mu.Lock()
defer m.mu.Unlock()
delete(m.sagas, id)
return nil
}
Returns
func (*MemoryStore) ListIncomplete() ([]*SagaState, error)
{
m.mu.RLock()
defer m.mu.RUnlock()
var result []*SagaState
for _, s := range m.sagas {
if s.Status == StatusPending || s.Status == StatusFailed {
cp := *s
result = append(result, &cp)
}
}
return result, nil
}
Fields
| Name | Type | Description |
|---|---|---|
| mu | sync.RWMutex | |
| sagas | map[string]*SagaState |
NewMemoryStore
NewMemoryStore creates a new in-memory saga store.
Returns
func NewMemoryStore() *MemoryStore
{
return &MemoryStore{sagas: make(map[string]*SagaState)}
}
IdempotencyRecorder
IdempotencyRecorder tracks processed idempotency keys to prevent double execution.
type IdempotencyRecorder struct
Methods
Parameters
Returns
func (*IdempotencyRecorder) Record(key string, state *SagaState) error
{
r.mu.Lock()
defer r.mu.Unlock()
if existing, ok := r.seen[key]; ok {
return &IdempotencyError{Key: key, ExistingState: existing}
}
r.seen[key] = state
return nil
}
Fields
| Name | Type | Description |
|---|---|---|
| mu | sync.RWMutex | |
| seen | map[string]*SagaState |
NewIdempotencyRecorder
NewIdempotencyRecorder creates a new IdempotencyRecorder.
Returns
func NewIdempotencyRecorder() *IdempotencyRecorder
{
return &IdempotencyRecorder{seen: make(map[string]*SagaState)}
}
IdempotencyError
IdempotencyError is returned when an idempotency key has already been processed.
type IdempotencyError struct
Methods
Fields
| Name | Type | Description |
|---|---|---|
| Key | string | |
| ExistingState | *SagaState |
DeadLetterQueue
DeadLetterQueue holds failed saga states that exceeded max retries.
type DeadLetterQueue struct
Methods
Parameters
func (*DeadLetterQueue) Enqueue(state *SagaState, reason string)
{
dlq.mu.Lock()
defer dlq.mu.Unlock()
dlq.items = append(dlq.items, &DLQEntry{
State: state,
Reason: reason,
DeadSince: time.Now(),
})
}
Returns
func (*DeadLetterQueue) List() []*DLQEntry
{
dlq.mu.RLock()
defer dlq.mu.RUnlock()
result := make([]*DLQEntry, len(dlq.items))
copy(result, dlq.items)
return result
}
Returns
func (*DeadLetterQueue) Len() int
{
dlq.mu.RLock()
defer dlq.mu.RUnlock()
return len(dlq.items)
}
Parameters
func (*DeadLetterQueue) Remove(id string)
{
dlq.mu.Lock()
defer dlq.mu.Unlock()
for i, entry := range dlq.items {
if entry.State.ID == id {
dlq.items = append(dlq.items[:i], dlq.items[i+1:]...)
return
}
}
}
Fields
| Name | Type | Description |
|---|---|---|
| mu | sync.RWMutex | |
| items | []*DLQEntry |
NewDeadLetterQueue
NewDeadLetterQueue creates a new DeadLetterQueue.
Returns
func NewDeadLetterQueue() *DeadLetterQueue
{
return &DeadLetterQueue{items: make([]*DLQEntry, 0)}
}
ProcessWithDLQ
ProcessWithDLQ retries dead sagas up to maxRetries, then moves to dead state.
Parameters
Returns
func ProcessWithDLQ(store SagaStore, dlq *DeadLetterQueue, maxRetries int) func(id string, runFunc func(ctx context.Context) error) func(ctx context.Context) error
{
return func(id string, runFunc func(ctx context.Context) error) func(ctx context.Context) error {
return func(ctx context.Context) error {
state, err := store.Load(id)
if err != nil {
return err
}
if state.RetryCount >= maxRetries {
state.Status = StatusDead
state.Error = errors.New("exceeded max retries").Error()
_ = store.Save(state)
dlq.Enqueue(state, "max retries exceeded")
return fmt.Errorf("saga %q dead: max retries exceeded", id)
}
state.RetryCount++
if err := runFunc(ctx); err != nil {
state.Status = StatusFailed
state.Error = err.Error()
_ = store.Save(state)
return err
}
state.Status = StatusCompleted
_ = store.Save(state)
return nil
}
}
}
Uses
FileStore
FileStore persists saga state to disk.
type FileStore struct
Methods
Parameters
Returns
func (*FileStore) Save(state *SagaState) error
{
s.mu.Lock()
defer s.mu.Unlock()
state.UpdatedAt = time.Now()
if state.CreatedAt.IsZero() {
state.CreatedAt = time.Now()
}
data, err := json.MarshalIndent(state, "", " ")
if err != nil {
return fmt.Errorf("saga store: marshal: %w", err)
}
path := filepath.Join(s.dir, state.ID+".json")
tmp := path + ".tmp"
if err := os.WriteFile(tmp, data, 0644); err != nil {
return fmt.Errorf("saga store: write: %w", err)
}
if err := os.Rename(tmp, path); err != nil {
return fmt.Errorf("saga store: rename: %w", err)
}
return nil
}
Parameters
Returns
func (*FileStore) Load(id string) (*SagaState, error)
{
s.mu.RLock()
defer s.mu.RUnlock()
path := filepath.Join(s.dir, id+".json")
data, err := os.ReadFile(path)
if err != nil {
return nil, fmt.Errorf("saga store: read %s: %w", id, err)
}
var state SagaState
if err := json.Unmarshal(data, &state); err != nil {
return nil, fmt.Errorf("saga store: unmarshal %s: %w", id, err)
}
return &state, nil
}
Parameters
Returns
func (*FileStore) Delete(id string) error
{
s.mu.Lock()
defer s.mu.Unlock()
path := filepath.Join(s.dir, id+".json")
return os.Remove(path)
}
Returns
func (*FileStore) ListIncomplete() ([]*SagaState, error)
{
s.mu.RLock()
defer s.mu.RUnlock()
entries, err := os.ReadDir(s.dir)
if err != nil {
return nil, fmt.Errorf("saga store: readdir: %w", err)
}
var result []*SagaState
for _, entry := range entries {
if filepath.Ext(entry.Name()) != ".json" {
continue
}
data, err := os.ReadFile(filepath.Join(s.dir, entry.Name()))
if err != nil {
continue
}
var state SagaState
if err := json.Unmarshal(data, &state); err != nil {
continue
}
if state.Status == StatusPending || state.Status == StatusFailed {
result = append(result, &state)
}
}
return result, nil
}
Fields
| Name | Type | Description |
|---|---|---|
| dir | string | |
| mu | sync.RWMutex |
NewStore
NewStore creates a FileStore that writes state JSON files to dir.
Parameters
Returns
func NewStore(dir string) (*FileStore, error)
{
if err := os.MkdirAll(dir, 0755); err != nil {
return nil, fmt.Errorf("saga store: cannot create dir %s: %w", dir, err)
}
return &FileStore{dir: dir}, nil
}
RecoverableWorkflow
RecoverableWorkflow is a saga that persists state for crash recovery.
type RecoverableWorkflow struct
Methods
Parameters
func (*RecoverableWorkflow) Add(name string, do, compensate func(ctx context.Context) error)
{
idx := len(rw.state.Steps)
rw.state.Steps = append(rw.state.Steps, StepState{
Name: name,
Status: StatusPending,
StepIndex: idx,
})
rw.Workflow.Add(name, do, compensate)
}
Parameters
Returns
func (*RecoverableWorkflow) Run(ctx context.Context) error
{
rw.state.Status = StatusPending
_ = rw.store.Save(rw.state)
for i, item := range rw.steps {
if ctx.Err() != nil {
rw.state.Status = StatusFailed
rw.state.Error = ctx.Err().Error()
_ = rw.store.Save(rw.state)
return rw.rollback(ctx, ctx.Err())
}
var err error
switch v := item.(type) {
case Step:
err = rw.runStepTracking(ctx, v, i)
case Group:
err = rw.runGroupTracking(ctx, v, i)
}
if err != nil {
rw.state.Status = StatusFailed
rw.state.Error = err.Error()
_ = rw.store.Save(rw.state)
return rw.rollback(ctx, err)
}
}
rw.state.Status = StatusCompleted
_ = rw.store.Save(rw.state)
return nil
}
Parameters
Returns
func (*RecoverableWorkflow) runStepTracking(ctx context.Context, step Step, stepIndex int) error
{
if err := step.Do(ctx); err != nil {
return fmt.Errorf("step '%s' failed: %w", step.Name, err)
}
rw.mu.Lock()
if step.Compensate != nil {
rw.stack = append(rw.stack, step)
}
rw.mu.Unlock()
if stepIndex < len(rw.state.Steps) {
rw.state.Steps[stepIndex].Status = StatusCompleted
_ = rw.store.Save(rw.state)
}
return nil
}
Parameters
Returns
func (*RecoverableWorkflow) runGroupTracking(ctx context.Context, group Group, stepIndex int) error
{
var wg sync.WaitGroup
errChan := make(chan error, len(group))
for _, step := range group {
wg.Add(1)
go func(s Step) {
defer wg.Done()
if err := rw.runStepTracking(ctx, s, stepIndex); err != nil {
errChan <- err
}
}(step)
}
wg.Wait()
close(errChan)
if len(errChan) > 0 {
var errs []error
for e := range errChan {
errs = append(errs, e)
}
return fmt.Errorf("group failed: %w", joinErrors(errs))
}
return nil
}
Parameters
Returns
func (*RecoverableWorkflow) rollback(ctx context.Context, triggerErr error) error
{
rollbackCtx := context.WithoutCancel(ctx)
var errs []error
errs = append(errs, triggerErr)
rw.mu.Lock()
defer rw.mu.Unlock()
for i := len(rw.stack) - 1; i >= 0; i-- {
step := rw.stack[i]
if err := step.Compensate(rollbackCtx); err != nil {
errs = append(errs, fmt.Errorf("rollback failed for '%s': %w", step.Name, err))
} else {
for j := range rw.state.Steps {
if rw.state.Steps[j].Name == step.Name {
rw.state.Steps[j].Status = StatusCompensated
break
}
}
}
_ = rw.store.Save(rw.state)
}
return joinErrors(errs)
}
Uses
NewRecoverable
NewRecoverable creates a saga that persists state using the given store.
Parameters
Returns
func NewRecoverable(id string, store SagaStore) *RecoverableWorkflow
{
return &RecoverableWorkflow{
Workflow: New(),
id: id,
store: store,
state: &SagaState{
ID: id,
Status: StatusPending,
},
}
}
Uses
joinErrors
Parameters
Returns
func joinErrors(errs []error) error
{
if len(errs) == 0 {
return nil
}
return fmt.Errorf("%v", errs)
}
TestStore_SaveAndLoad
Parameters
func TestStore_SaveAndLoad(t *testing.T)
{
dir := t.TempDir()
store, err := NewStore(dir)
if err != nil {
t.Fatalf("NewStore: %v", err)
}
state := &SagaState{
ID: "test-1",
Status: StatusPending,
Steps: []StepState{
{Name: "reserve", Status: StatusCompleted, StepIndex: 0},
{Name: "charge", Status: StatusPending, StepIndex: 1},
},
}
if err := store.Save(state); err != nil {
t.Fatalf("Save: %v", err)
}
loaded, err := store.Load("test-1")
if err != nil {
t.Fatalf("Load: %v", err)
}
if loaded.ID != "test-1" {
t.Errorf("ID = %q, want %q", loaded.ID, "test-1")
}
if len(loaded.Steps) != 2 {
t.Errorf("len(Steps) = %d, want 2", len(loaded.Steps))
}
if loaded.Steps[0].Status != StatusCompleted {
t.Errorf("step[0] status = %q, want %q", loaded.Steps[0].Status, StatusCompleted)
}
}
TestStore_Delete
Parameters
func TestStore_Delete(t *testing.T)
{
dir := t.TempDir()
store, _ := NewStore(dir)
state := &SagaState{ID: "del-me", Status: StatusPending}
store.Save(state)
if err := store.Delete("del-me"); err != nil {
t.Fatalf("Delete: %v", err)
}
_, err := store.Load("del-me")
if err == nil {
t.Error("expected error after delete")
}
}
TestStore_ListIncomplete
Parameters
func TestStore_ListIncomplete(t *testing.T)
{
dir := t.TempDir()
store, _ := NewStore(dir)
store.Save(&SagaState{ID: "s1", Status: StatusPending})
store.Save(&SagaState{ID: "s2", Status: StatusCompleted})
store.Save(&SagaState{ID: "s3", Status: StatusFailed})
list, err := store.ListIncomplete()
if err != nil {
t.Fatalf("ListIncomplete: %v", err)
}
if len(list) != 2 {
t.Errorf("got %d incomplete, want 2", len(list))
}
}
TestRecoverableWorkflow_AllStepsSucceed
Parameters
func TestRecoverableWorkflow_AllStepsSucceed(t *testing.T)
{
dir := t.TempDir()
store, _ := NewStore(dir)
var executed []string
rw := NewRecoverable("order-1", store)
rw.Add("reserve",
func(ctx context.Context) error { executed = append(executed, "reserve"); return nil },
func(ctx context.Context) error { executed = append(executed, "undo-reserve"); return nil },
)
rw.Add("charge",
func(ctx context.Context) error { executed = append(executed, "charge"); return nil },
func(ctx context.Context) error { executed = append(executed, "undo-charge"); return nil },
)
err := rw.Run(context.Background())
if err != nil {
t.Fatalf("Run: %v", err)
}
if len(executed) != 2 {
t.Errorf("executed %v, want 2 steps", executed)
}
state, _ := store.Load("order-1")
if state.Status != StatusCompleted {
t.Errorf("status = %q, want %q", state.Status, StatusCompleted)
}
}
TestRecoverableWorkflow_StepFails_Persists
Parameters
func TestRecoverableWorkflow_StepFails_Persists(t *testing.T)
{
dir := t.TempDir()
store, _ := NewStore(dir)
rw := NewRecoverable("order-2", store)
rw.Add("reserve",
func(ctx context.Context) error { return nil },
func(ctx context.Context) error { return nil },
)
rw.Add("charge",
func(ctx context.Context) error { return errors.New("card declined") },
func(ctx context.Context) error { return nil },
)
err := rw.Run(context.Background())
if err == nil {
t.Fatal("expected error")
}
state, _ := store.Load("order-2")
if state.Status != StatusFailed {
t.Errorf("status = %q, want %q", state.Status, StatusFailed)
}
if state.Steps[0].Status != StatusCompensated {
t.Errorf("step[0] = %q, want %q", state.Steps[0].Status, StatusCompensated)
}
}
TestRecoverableWorkflow_CrashRecovery
Parameters
func TestRecoverableWorkflow_CrashRecovery(t *testing.T)
{
dir := t.TempDir()
store, _ := NewStore(dir)
rw := NewRecoverable("order-3", store)
rw.Add("reserve",
func(ctx context.Context) error { return nil },
func(ctx context.Context) error { return nil },
)
rw.Add("charge",
func(ctx context.Context) error { return errors.New("crash") },
func(ctx context.Context) error { return nil },
)
rw.Run(context.Background())
incomplete, err := store.ListIncomplete()
if err != nil {
t.Fatalf("ListIncomplete: %v", err)
}
if len(incomplete) != 1 {
t.Fatalf("got %d incomplete sagas, want 1", len(incomplete))
}
if incomplete[0].ID != "order-3" {
t.Errorf("ID = %q, want %q", incomplete[0].ID, "order-3")
}
_ = filepath.Join(dir, "order-3.json")
data, err := os.ReadFile(filepath.Join(dir, "order-3.json"))
if err != nil {
t.Fatalf("read file: %v", err)
}
t.Logf("persisted state: %s", string(data))
}
TestMemoryStore_CRUD
Parameters
func TestMemoryStore_CRUD(t *testing.T)
{
store := NewMemoryStore()
state := &SagaState{ID: "m1", Status: StatusPending}
if err := store.Save(state); err != nil {
t.Fatalf("Save: %v", err)
}
loaded, err := store.Load("m1")
if err != nil {
t.Fatalf("Load: %v", err)
}
if loaded.ID != "m1" {
t.Errorf("ID = %q, want m1", loaded.ID)
}
store.Save(&SagaState{ID: "m2", Status: StatusCompleted})
incomplete, _ := store.ListIncomplete()
if len(incomplete) != 1 {
t.Errorf("incomplete = %d, want 1", len(incomplete))
}
store.Delete("m1")
_, err = store.Load("m1")
if err == nil {
t.Error("expected error after delete")
}
}
TestIdempotencyRecorder
Parameters
func TestIdempotencyRecorder(t *testing.T)
{
rec := NewIdempotencyRecorder()
state1 := &SagaState{ID: "s1", Status: StatusCompleted}
if err := rec.Record("key-1", state1); err != nil {
t.Fatalf("Record: %v", err)
}
err := rec.Record("key-1", &SagaState{ID: "s2"})
if err == nil {
t.Fatal("expected idempotency error")
}
var idErr *IdempotencyError
if !errors.As(err, &idErr) {
t.Errorf("error type = %T, want *IdempotencyError", err)
}
got, ok := rec.Get("key-1")
if !ok {
t.Fatal("expected key found")
}
if got.ID != "s1" {
t.Errorf("got ID = %q, want s1", got.ID)
}
_, ok = rec.Get("missing")
if ok {
t.Error("expected missing key not found")
}
}
TestDeadLetterQueue
Parameters
func TestDeadLetterQueue(t *testing.T)
{
dlq := NewDeadLetterQueue()
state := &SagaState{ID: "dead-1", Status: StatusFailed, RetryCount: 3}
dlq.Enqueue(state, "max retries exceeded")
if dlq.Len() != 1 {
t.Errorf("Len = %d, want 1", dlq.Len())
}
entries := dlq.List()
if len(entries) != 1 {
t.Fatalf("List len = %d, want 1", len(entries))
}
if entries[0].State.ID != "dead-1" {
t.Errorf("entry ID = %q, want dead-1", entries[0].State.ID)
}
if entries[0].Reason != "max retries exceeded" {
t.Errorf("reason = %q, want max retries exceeded", entries[0].Reason)
}
dlq.Remove("dead-1")
if dlq.Len() != 0 {
t.Errorf("Len after remove = %d, want 0", dlq.Len())
}
}
TestProcessWithDLQ_ExceedsRetries
Parameters
func TestProcessWithDLQ_ExceedsRetries(t *testing.T)
{
store := NewMemoryStore()
dlq := NewDeadLetterQueue()
state := &SagaState{ID: "dlq-1", Status: StatusFailed, RetryCount: 3, MaxRetries: 3}
store.Save(state)
processor := ProcessWithDLQ(store, dlq, 3)
run := processor("dlq-1", func(ctx context.Context) error {
return errors.New("still failing")
})
err := run(context.Background())
if err == nil {
t.Fatal("expected error")
}
updated, _ := store.Load("dlq-1")
if updated.Status != StatusDead {
t.Errorf("status = %q, want %q", updated.Status, StatusDead)
}
if dlq.Len() != 1 {
t.Errorf("dlq len = %d, want 1", dlq.Len())
}
}
TestProcessWithDLQ_RetrySucceeds
Parameters
func TestProcessWithDLQ_RetrySucceeds(t *testing.T)
{
store := NewMemoryStore()
dlq := NewDeadLetterQueue()
state := &SagaState{ID: "retry-1", Status: StatusFailed, RetryCount: 1, MaxRetries: 3}
store.Save(state)
processor := ProcessWithDLQ(store, dlq, 3)
run := processor("retry-1", func(ctx context.Context) error {
return nil
})
if err := run(context.Background()); err != nil {
t.Fatalf("unexpected error: %v", err)
}
updated, _ := store.Load("retry-1")
if updated.Status != StatusCompleted {
t.Errorf("status = %q, want %q", updated.Status, StatusCompleted)
}
if updated.RetryCount != 2 {
t.Errorf("retry count = %d, want 2", updated.RetryCount)
}
if dlq.Len() != 0 {
t.Errorf("dlq len = %d, want 0", dlq.Len())
}
}
TestSagaStore_Interface
Parameters
func TestSagaStore_Interface(t *testing.T)
{
var _ SagaStore = NewMemoryStore()
var _ SagaStore = &FileStore{}
dir := t.TempDir()
fs, _ := NewStore(dir)
var _ SagaStore = fs
}
Step
Step defines a single saga step with a Do action and a Compensate action that undoes it.
type Step struct
Fields
| Name | Type | Description |
|---|---|---|
| Name | string | |
| Do | func(ctx context.Context) error | |
| Compensate | func(ctx context.Context) error |
Group
Group is a slice of Steps that Run executes in parallel.
type Group []Step
Workflow
Workflow orchestrates a saga with rollback support.
type Workflow struct
Methods
Add appends a step to the workflow.
Parameters
func (*Workflow) Add(name string, do, compensate func(ctx context.Context) error)
{
w.steps = append(w.steps, Step{
Name: name,
Do: do,
Compensate: compensate,
})
}
AddGroup appends a parallel step group to the workflow.
Parameters
func (*Workflow) AddGroup(g Group)
{
w.steps = append(w.steps, g)
}
Run executes all steps in order, rolling back on failure.
Parameters
Returns
func (*Workflow) Run(ctx context.Context) error
{
for _, item := range w.steps {
// Check Context before starting step
if ctx.Err() != nil {
return w.rollback(ctx, ctx.Err())
}
var err error
switch v := item.(type) {
case Step:
err = w.runStep(ctx, v)
case Group:
err = w.runGroup(ctx, v)
}
if err != nil {
return w.rollback(ctx, err)
}
}
return nil
}
Parameters
Returns
func (*Workflow) runStep(ctx context.Context, step Step) (err error)
{
defer func() {
if r := recover(); r != nil {
err = fmt.Errorf("panic in step '%s': %v", step.Name, r)
}
}()
if err := step.Do(ctx); err != nil {
return fmt.Errorf("step '%s' failed: %w", step.Name, err)
}
w.mu.Lock()
if step.Compensate != nil {
w.stack = append(w.stack, step)
}
w.mu.Unlock()
return nil
}
Parameters
Returns
func (*Workflow) runGroup(ctx context.Context, group Group) error
{
var wg sync.WaitGroup
errChan := make(chan error, len(group))
// Temporarily store successful steps in this group to add to stack later
// If the group fails, we only compensate what succeeded inside the group?
// Actually, if we use w.runStep() inside goroutine, it appends to w.stack safely.
// But we must handle partial failure rollback within the group logic or rely on main rollback?
// For simplicity, we let them append to stack. If one fails, Run() returns error and triggers rollback of everything in stack.
for _, step := range group {
wg.Add(1)
go func(s Step) {
defer wg.Done()
if err := w.runStep(ctx, s); err != nil {
errChan <- err
}
}(step)
}
wg.Wait()
close(errChan)
if len(errChan) > 0 {
var errs []error
for e := range errChan {
errs = append(errs, e)
}
return errors.Join(errs...)
}
return nil
}
Parameters
Returns
func (*Workflow) rollback(ctx context.Context, triggerErr error) error
{
rollbackCtx := context.WithoutCancel(ctx)
var errs []error
errs = append(errs, triggerErr)
// LIFO
w.mu.Lock()
defer w.mu.Unlock()
for i := len(w.stack) - 1; i >= 0; i-- {
step := w.stack[i]
if err := w.safeCompensate(rollbackCtx, step); err != nil {
errs = append(errs, fmt.Errorf("rollback failed for '%s': %w", step.Name, err))
}
}
return errors.Join(errs...)
}
Parameters
Returns
func (*Workflow) safeCompensate(ctx context.Context, step Step) (err error)
{
defer func() {
if r := recover(); r != nil {
err = fmt.Errorf("panic during compensation: %v", r)
}
}()
return step.Compensate(ctx)
}
Fields
| Name | Type | Description |
|---|---|---|
| steps | []any | |
| stack | []Step | |
| mu | sync.Mutex |
New
New creates an empty Workflow.
Returns
func New() *Workflow
{
return &Workflow{
stack: make([]Step, 0),
}
}