resiliency API

resiliency

package

API reference for the resiliency package.

T
type

State

State represents the current state of a CircuitBreaker.

pkg/resiliency/breaker.go:14-14
type State int
S
struct

CircuitBreaker

CircuitBreaker protects a caller from repeated failures.

It implements a state machine with Closed, Open, and Half-Open states.

pkg/resiliency/breaker.go:33-42
type CircuitBreaker struct

Example

cb := resiliency.NewCircuitBreaker(3, time.Minute)
err := cb.Execute(func() error {
	return doRiskyOperation()
})

Methods

OnStateChange
Method

OnStateChange registers a callback for state transitions.

Parameters

fn func(from, to State)
func (*CircuitBreaker) OnStateChange(fn func(from, to State))
{
	cb.mu.Lock()
	defer cb.mu.Unlock()
	cb.onStateChange = fn
}
Execute
Method

Execute calls fn if the circuit allows it, recording the outcome to drive state transitions. It returns ErrCircuitOpen without calling fn when the circuit is Open and the timeout has not yet elapsed, or when it is Half-Open and a probe is already in flight; otherwise it returns the error (if any) produced by fn.

Parameters

fn func() error

Returns

error
func (*CircuitBreaker) Execute(fn func() error) error
{
	if !cb.allow() {
		return ErrCircuitOpen
	}

	err := fn()
	if err != nil {
		cb.onFailure()
		return err
	}

	cb.onSuccess()
	return nil
}
allow
Method

allow returns true if the current state permits a call through.

Returns

bool
func (*CircuitBreaker) allow() bool
{
	cb.mu.Lock()
	defer cb.mu.Unlock()

	switch cb.state {
	case StateClosed:
		return true
	case StateOpen:
		if time.Since(cb.lastFailure) > cb.timeout {
			atomic.StoreInt32(&cb.probing, 0)
			cb.changeState(StateHalfOpen)
			return atomic.CompareAndSwapInt32(&cb.probing, 0, 1)
		}
		return false
	case StateHalfOpen:
		// Only allow one concurrent probe at a time.
		return atomic.CompareAndSwapInt32(&cb.probing, 0, 1)
	}
	return false
}
onSuccess
Method

onSuccess records a successful call and transitions out of HalfOpen if needed.

func (*CircuitBreaker) onSuccess()
{
	cb.mu.Lock()
	defer cb.mu.Unlock()

	if cb.state == StateHalfOpen {
		atomic.StoreInt32(&cb.probing, 0)
		cb.changeState(StateClosed)
	}
	cb.failures = 0
}
onFailure
Method

onFailure records a failed call and trips the circuit when the threshold is reached.

func (*CircuitBreaker) onFailure()
{
	cb.mu.Lock()
	defer cb.mu.Unlock()

	cb.failures++
	cb.lastFailure = time.Now()

	if cb.state == StateClosed && cb.failures >= cb.threshold {
		cb.changeState(StateOpen)
	} else if cb.state == StateHalfOpen {
		atomic.StoreInt32(&cb.probing, 0)
		cb.failures = 0
		cb.changeState(StateOpen)
	}
}
changeState
Method

changeState transitions to the target state and fires the optional callback.

Parameters

to State
func (*CircuitBreaker) changeState(to State)
{
	from := cb.state
	cb.state = to
	if cb.onStateChange != nil {
		cb.onStateChange(from, to)
	}
}
State
Method

State returns the current state of the circuit breaker.

Returns

func (*CircuitBreaker) State() State
{
	cb.mu.RLock()
	defer cb.mu.RUnlock()
	return cb.state
}

Fields

Name Type Description
mu sync.RWMutex
state State
failures int
threshold int
timeout time.Duration
lastFailure time.Time
onStateChange func(from, to State)
probing int32
F
function

NewCircuitBreaker

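NewCircuitBreaker creates a circuit breaker that starts in the Closed state, opens once threshold failures have occurred without an intervening success, and admits a single probe call after timeout has elapsed since the last failure.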

Parameters

threshold
int
timeout
time.Duration

Returns

pkg/resiliency/breaker.go:45-51
func NewCircuitBreaker(threshold int, timeout time.Duration) *CircuitBreaker

{
	return &CircuitBreaker{
		threshold: threshold,
		timeout:   timeout,
		state:     StateClosed,
	}
}
F
function

TestCircuitBreaker

Parameters

pkg/resiliency/breaker_test.go:11-46
// TestCircuitBreaker walks a breaker with threshold 2 and a 50ms timeout
// through one full cycle: Closed -> Open (after two failures) -> rejected
// while Open -> Closed again after a successful call once the timeout has
// elapsed. The test relies on real time (time.Sleep), so the order of the
// steps matters.
func TestCircuitBreaker(t *testing.T)

{
	cb := NewCircuitBreaker(2, 50*time.Millisecond)

	// State: Closed
	err := cb.Execute(func() error { return nil })
	if err != nil || cb.State() != StateClosed {
		t.Error("Circuit should be closed and return nil")
	}

	// First failure
	_ = cb.Execute(func() error { return errors.New("fail1") })
	if cb.State() != StateClosed {
		t.Error("Circuit should still be closed after 1 failure")
	}

	// Second failure -> State: Open
	_ = cb.Execute(func() error { return errors.New("fail2") })
	if cb.State() != StateOpen {
		t.Error("Circuit should be open after reaching threshold")
	}

	// While Open
	err = cb.Execute(func() error { return nil })
	if err != ErrCircuitOpen {
		t.Errorf("Execute should return ErrCircuitOpen when open, got %v", err)
	}

	// Wait for timeout -> State: Half-Open (on next Execute)
	// The sleep (60ms) is deliberately longer than the breaker timeout (50ms).
	time.Sleep(60 * time.Millisecond)

	// First success in Half-Open -> State: Closed
	err = cb.Execute(func() error { return nil })
	if err != nil || cb.State() != StateClosed {
		t.Errorf("Circuit should be closed after success in half-open, got state %v", cb.State())
	}
}
F
function

TestHalfOpenAllowsOnlyOneProbe

TestHalfOpenAllowsOnlyOneProbe verifies that in HalfOpen state only one
concurrent request is allowed through; all others receive ErrCircuitOpen.

Parameters

pkg/resiliency/breaker_test.go:50-102
// TestHalfOpenAllowsOnlyOneProbe trips a threshold-1 breaker, waits out the
// open timeout, then releases five goroutines at once. It expects exactly one
// Execute call to run its (slow, successful) probe, the other four to get
// ErrCircuitOpen, and the breaker to end up Closed.
func TestHalfOpenAllowsOnlyOneProbe(t *testing.T)

{
	const timeout = 30 * time.Millisecond
	cb := NewCircuitBreaker(1, timeout)

	// Trip the breaker.
	_ = cb.Execute(func() error { return errors.New("fail") })
	if cb.State() != StateOpen {
		t.Fatal("expected StateOpen after threshold reached")
	}

	// Wait for the open timeout to elapse so the next allow() transitions to HalfOpen.
	time.Sleep(timeout + 10*time.Millisecond)

	// Fire 5 concurrent requests; use a gate so they all call allow() at the same time.
	const n = 5
	var (
		wg      sync.WaitGroup
		gate    = make(chan struct{})
		allowed int32
		blocked int32
	)

	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Block until the gate channel is closed so all goroutines start together.
			<-gate
			err := cb.Execute(func() error {
				// Slow probe so concurrent goroutines can observe HalfOpen.
				time.Sleep(20 * time.Millisecond)
				return nil
			})
			// Counters are updated atomically because the goroutines run concurrently.
			if err == ErrCircuitOpen {
				atomic.AddInt32(&blocked, 1)
			} else if err == nil {
				atomic.AddInt32(&allowed, 1)
			}
		}()
	}

	close(gate)
	// After Wait returns all goroutines are done, so the plain reads below are safe.
	wg.Wait()

	if allowed != 1 {
		t.Errorf("expected exactly 1 probe allowed, got %d", allowed)
	}
	if blocked != n-1 {
		t.Errorf("expected %d blocked, got %d", n-1, blocked)
	}
	if cb.State() != StateClosed {
		t.Errorf("expected StateClosed after successful probe, got %v", cb.State())
	}
}
F
function

TestHalfOpenFailureResetsCounter

TestHalfOpenFailureResetsCounter verifies that after a HalfOpen→Open cycle
the failure counter is reset, so a fresh probe failure in the new HalfOpen
correctly re-opens without a stale count.

Parameters

pkg/resiliency/breaker_test.go:107-133
// TestHalfOpenFailureResetsCounter opens a threshold-2 breaker, then lets two
// consecutive Half-Open probes fail (each after waiting out the 30ms timeout)
// and checks that the breaker is Open after each failed probe.
func TestHalfOpenFailureResetsCounter(t *testing.T)

{
	const timeout = 30 * time.Millisecond
	cb := NewCircuitBreaker(2, timeout)

	// Reach threshold to open breaker (2 failures).
	_ = cb.Execute(func() error { return errors.New("f1") })
	_ = cb.Execute(func() error { return errors.New("f2") })
	if cb.State() != StateOpen {
		t.Fatal("expected StateOpen")
	}

	// First HalfOpen → probe fails → back to Open.
	time.Sleep(timeout + 10*time.Millisecond)
	_ = cb.Execute(func() error { return errors.New("probe fail") })
	if cb.State() != StateOpen {
		t.Fatalf("expected StateOpen after half-open probe failure, got %v", cb.State())
	}

	// Second HalfOpen: failures counter must have been reset to 0 on the previous
	// HalfOpen→Open transition.  A single failure in this new HalfOpen should
	// send the breaker back to Open immediately (not leave it closed/half-open).
	time.Sleep(timeout + 10*time.Millisecond)
	_ = cb.Execute(func() error { return errors.New("probe fail 2") })
	if cb.State() != StateOpen {
		t.Errorf("expected StateOpen after second half-open failure, got %v", cb.State())
	}
}
S
struct

RetryOptions

RetryOptions configures retry behavior.

pkg/resiliency/retry.go:10-15
type RetryOptions struct

Fields

Name Type Description
Attempts int
InitialDelay time.Duration
MaxDelay time.Duration
Factor float64
F
function

Retry

Retry executes fn up to Attempts times with exponential backoff.

Parameters

ctx
context.Context
fn
func() error
opts
...func(*RetryOptions)

Returns

error
pkg/resiliency/retry.go:32-67
func Retry(ctx context.Context, fn func() error, opts ...func(*RetryOptions)) error

{
	o := DefaultRetryOptions
	for _, opt := range opts {
		opt(&o)
	}

	var lastErr error
	for i := 0; i < o.Attempts; i++ {
		if err := ctx.Err(); err != nil {
			return err
		}

		if err := fn(); err == nil {
			return nil
		} else {
			lastErr = err
		}

		if i < o.Attempts-1 {
			delay := time.Duration(float64(o.InitialDelay) * math.Pow(o.Factor, float64(i)))
			if delay > o.MaxDelay {
				delay = o.MaxDelay
			}

			timer := time.NewTimer(delay)
			select {
			case <-ctx.Done():
				timer.Stop()
				return ctx.Err()
			case <-timer.C:
			}
		}
	}

	return lastErr
}

Example

err := resiliency.Retry(ctx, func() error {
	return doNetworkCall()
}, resiliency.WithAttempts(5))
F
function

WithAttempts

WithAttempts sets the maximum number of times fn is called, counting the first call.

Parameters

n
int

Returns

func(*RetryOptions)
pkg/resiliency/retry.go:70-72
func WithAttempts(n int) func(*RetryOptions)

{
	return func(o *RetryOptions) { o.Attempts = n }
}
F
function

WithDelay

WithDelay sets the initial and max delay for backoff.

Parameters

Returns

func(*RetryOptions)
pkg/resiliency/retry.go:75-80
func WithDelay(initial, max time.Duration) func(*RetryOptions)

{
	return func(o *RetryOptions) {
		o.InitialDelay = initial
		o.MaxDelay = max
	}
}
F
function

WithFactor

WithFactor sets the backoff factor.

Parameters

f
float64

Returns

func(*RetryOptions)
pkg/resiliency/retry.go:83-85
func WithFactor(f float64) func(*RetryOptions)

{
	return func(o *RetryOptions) { o.Factor = f }
}
F
function

TestRetry

Parameters

pkg/resiliency/retry_test.go:10-64
func TestRetry(t *testing.T)

{
	t.Run("SuccessFirstTry", func(t *testing.T) {
		calls := 0
		err := Retry(context.Background(), func() error {
			calls++
			return nil
		})
		if err != nil || calls != 1 {
			t.Errorf("Retry failed: %v, calls=%d", err, calls)
		}
	})

	t.Run("SuccessAfterRetries", func(t *testing.T) {
		calls := 0
		err := Retry(context.Background(), func() error {
			calls++
			if calls < 3 {
				return errors.New("fail")
			}
			return nil
		}, WithAttempts(5), WithDelay(1*time.Millisecond, 10*time.Millisecond))

		if err != nil || calls != 3 {
			t.Errorf("Retry failed: %v, calls=%d", err, calls)
		}
	})

	t.Run("FailureAllAttempts", func(t *testing.T) {
		calls := 0
		targetErr := errors.New("permanent fail")
		err := Retry(context.Background(), func() error {
			calls++
			return targetErr
		}, WithAttempts(3), WithDelay(1*time.Millisecond, 1*time.Millisecond))

		if err != targetErr || calls != 3 {
			t.Errorf("Retry should return last error: %v, calls=%d", err, calls)
		}
	})

	t.Run("ContextCancellation", func(t *testing.T) {
		ctx, cancel := context.WithCancel(context.Background())
		cancel()

		calls := 0
		err := Retry(ctx, func() error {
			calls++
			return errors.New("fail")
		})

		if err != context.Canceled || calls != 0 {
			t.Errorf("Retry should stop on context cancel: %v", err)
		}
	})
}