resiliency package
API reference for the resiliency package.
Imports (7)
State
State represents the current state of a CircuitBreaker.
type State int
CircuitBreaker
CircuitBreaker protects a caller from repeated failures.
It implements a state machine with Closed, Open, and Half-Open states.
type CircuitBreaker struct
Example
cb := resiliency.NewCircuitBreaker(3, time.Minute)
err := cb.Execute(func() error {
return doRiskyOperation()
})
Methods
OnStateChange registers a callback for state transitions.
Parameters
func (*CircuitBreaker) OnStateChange(fn func(from, to State))
{
cb.mu.Lock()
defer cb.mu.Unlock()
cb.onStateChange = fn
}
Execute calls fn if the circuit allows it, recording the outcome to drive state transitions. It returns ErrCircuitOpen when the circuit is in the Open state, otherwise it returns the error (if any) produced by fn.
Parameters
Returns
func (*CircuitBreaker) Execute(fn func() error) error
{
if !cb.allow() {
return ErrCircuitOpen
}
err := fn()
if err != nil {
cb.onFailure()
return err
}
cb.onSuccess()
return nil
}
allow returns true if the current state permits a call through.
Returns
func (*CircuitBreaker) allow() bool
{
cb.mu.Lock()
defer cb.mu.Unlock()
switch cb.state {
case StateClosed:
return true
case StateOpen:
if time.Since(cb.lastFailure) > cb.timeout {
atomic.StoreInt32(&cb.probing, 0)
cb.changeState(StateHalfOpen)
return atomic.CompareAndSwapInt32(&cb.probing, 0, 1)
}
return false
case StateHalfOpen:
// Only allow one concurrent probe at a time.
return atomic.CompareAndSwapInt32(&cb.probing, 0, 1)
}
return false
}
onSuccess records a successful call and transitions out of HalfOpen if needed.
func (*CircuitBreaker) onSuccess()
{
cb.mu.Lock()
defer cb.mu.Unlock()
if cb.state == StateHalfOpen {
atomic.StoreInt32(&cb.probing, 0)
cb.changeState(StateClosed)
}
cb.failures = 0
}
onFailure records a failed call and trips the circuit when the threshold is reached.
func (*CircuitBreaker) onFailure()
{
cb.mu.Lock()
defer cb.mu.Unlock()
cb.failures++
cb.lastFailure = time.Now()
if cb.state == StateClosed && cb.failures >= cb.threshold {
cb.changeState(StateOpen)
} else if cb.state == StateHalfOpen {
atomic.StoreInt32(&cb.probing, 0)
cb.failures = 0
cb.changeState(StateOpen)
}
}
changeState transitions to the target state and fires the optional callback.
Parameters
func (*CircuitBreaker) changeState(to State)
{
from := cb.state
cb.state = to
if cb.onStateChange != nil {
cb.onStateChange(from, to)
}
}
Fields
| Name | Type | Description |
|---|---|---|
| mu | sync.RWMutex | |
| state | State | |
| failures | int | |
| threshold | int | |
| timeout | time.Duration | |
| lastFailure | time.Time | |
| onStateChange | func(from, to State) | |
| probing | int32 | |
Uses
NewCircuitBreaker
NewCircuitBreaker creates a new circuit breaker.
Parameters
Returns
func NewCircuitBreaker(threshold int, timeout time.Duration) *CircuitBreaker
{
return &CircuitBreaker{
threshold: threshold,
timeout: timeout,
state: StateClosed,
}
}
TestCircuitBreaker
Parameters
func TestCircuitBreaker(t *testing.T)
{
cb := NewCircuitBreaker(2, 50*time.Millisecond)
// State: Closed
err := cb.Execute(func() error { return nil })
if err != nil || cb.State() != StateClosed {
t.Error("Circuit should be closed and return nil")
}
// First failure
_ = cb.Execute(func() error { return errors.New("fail1") })
if cb.State() != StateClosed {
t.Error("Circuit should still be closed after 1 failure")
}
// Second failure -> State: Open
_ = cb.Execute(func() error { return errors.New("fail2") })
if cb.State() != StateOpen {
t.Error("Circuit should be open after reaching threshold")
}
// While Open
err = cb.Execute(func() error { return nil })
if err != ErrCircuitOpen {
t.Errorf("Execute should return ErrCircuitOpen when open, got %v", err)
}
// Wait for timeout -> State: Half-Open (on next Execute)
time.Sleep(60 * time.Millisecond)
// First success in Half-Open -> State: Closed
err = cb.Execute(func() error { return nil })
if err != nil || cb.State() != StateClosed {
t.Errorf("Circuit should be closed after success in half-open, got state %v", cb.State())
}
}
TestHalfOpenAllowsOnlyOneProbe
TestHalfOpenAllowsOnlyOneProbe verifies that in HalfOpen state only one
concurrent request is allowed through; all others receive ErrCircuitOpen.
Parameters
func TestHalfOpenAllowsOnlyOneProbe(t *testing.T)
{
const timeout = 30 * time.Millisecond
cb := NewCircuitBreaker(1, timeout)
// Trip the breaker.
_ = cb.Execute(func() error { return errors.New("fail") })
if cb.State() != StateOpen {
t.Fatal("expected StateOpen after threshold reached")
}
// Wait for the open timeout to elapse so the next allow() transitions to HalfOpen.
time.Sleep(timeout + 10*time.Millisecond)
// Fire 5 concurrent requests; use a gate so they all call allow() at the same time.
const n = 5
var (
wg sync.WaitGroup
gate = make(chan struct{})
allowed int32
blocked int32
)
for i := 0; i < n; i++ {
wg.Add(1)
go func() {
defer wg.Done()
<-gate
err := cb.Execute(func() error {
// Slow probe so concurrent goroutines can observe HalfOpen.
time.Sleep(20 * time.Millisecond)
return nil
})
if err == ErrCircuitOpen {
atomic.AddInt32(&blocked, 1)
} else if err == nil {
atomic.AddInt32(&allowed, 1)
}
}()
}
close(gate)
wg.Wait()
if allowed != 1 {
t.Errorf("expected exactly 1 probe allowed, got %d", allowed)
}
if blocked != n-1 {
t.Errorf("expected %d blocked, got %d", n-1, blocked)
}
if cb.State() != StateClosed {
t.Errorf("expected StateClosed after successful probe, got %v", cb.State())
}
}
TestHalfOpenFailureResetsCounter
TestHalfOpenFailureResetsCounter verifies that after a HalfOpen→Open cycle
the failure counter is reset, so a fresh probe failure in the new HalfOpen
correctly re-opens without a stale count.
Parameters
func TestHalfOpenFailureResetsCounter(t *testing.T)
{
const timeout = 30 * time.Millisecond
cb := NewCircuitBreaker(2, timeout)
// Reach threshold to open breaker (2 failures).
_ = cb.Execute(func() error { return errors.New("f1") })
_ = cb.Execute(func() error { return errors.New("f2") })
if cb.State() != StateOpen {
t.Fatal("expected StateOpen")
}
// First HalfOpen → probe fails → back to Open.
time.Sleep(timeout + 10*time.Millisecond)
_ = cb.Execute(func() error { return errors.New("probe fail") })
if cb.State() != StateOpen {
t.Fatalf("expected StateOpen after half-open probe failure, got %v", cb.State())
}
// Second HalfOpen: failures counter must have been reset to 0 on the previous
// HalfOpen→Open transition. A single failure in this new HalfOpen should
// send the breaker back to Open immediately (not leave it closed/half-open).
time.Sleep(timeout + 10*time.Millisecond)
_ = cb.Execute(func() error { return errors.New("probe fail 2") })
if cb.State() != StateOpen {
t.Errorf("expected StateOpen after second half-open failure, got %v", cb.State())
}
}
RetryOptions
RetryOptions configures retry behavior.
type RetryOptions struct
Fields
| Name | Type | Description |
|---|---|---|
| Attempts | int | |
| InitialDelay | time.Duration | |
| MaxDelay | time.Duration | |
| Factor | float64 | |
Retry
Retry executes fn up to Attempts times with exponential backoff.
Parameters
Returns
func Retry(ctx context.Context, fn func() error, opts ...func(*RetryOptions)) error
{
o := DefaultRetryOptions
for _, opt := range opts {
opt(&o)
}
var lastErr error
for i := 0; i < o.Attempts; i++ {
if err := ctx.Err(); err != nil {
return err
}
if err := fn(); err == nil {
return nil
} else {
lastErr = err
}
if i < o.Attempts-1 {
delay := time.Duration(float64(o.InitialDelay) * math.Pow(o.Factor, float64(i)))
if delay > o.MaxDelay {
delay = o.MaxDelay
}
timer := time.NewTimer(delay)
select {
case <-ctx.Done():
timer.Stop()
return ctx.Err()
case <-timer.C:
}
}
}
return lastErr
}
Example
err := resiliency.Retry(ctx, func() error {
return doNetworkCall()
}, resiliency.WithAttempts(5))
WithAttempts
WithAttempts sets the maximum number of retry attempts.
Parameters
Returns
func WithAttempts(n int) func(*RetryOptions)
{
return func(o *RetryOptions) { o.Attempts = n }
}
WithDelay
WithDelay sets the initial and max delay for backoff.
Parameters
Returns
func WithDelay(initial, max time.Duration) func(*RetryOptions)
{
return func(o *RetryOptions) {
o.InitialDelay = initial
o.MaxDelay = max
}
}
WithFactor
WithFactor sets the backoff factor.
Parameters
Returns
func WithFactor(f float64) func(*RetryOptions)
{
return func(o *RetryOptions) { o.Factor = f }
}
TestRetry
Parameters
func TestRetry(t *testing.T)
{
t.Run("SuccessFirstTry", func(t *testing.T) {
calls := 0
err := Retry(context.Background(), func() error {
calls++
return nil
})
if err != nil || calls != 1 {
t.Errorf("Retry failed: %v, calls=%d", err, calls)
}
})
t.Run("SuccessAfterRetries", func(t *testing.T) {
calls := 0
err := Retry(context.Background(), func() error {
calls++
if calls < 3 {
return errors.New("fail")
}
return nil
}, WithAttempts(5), WithDelay(1*time.Millisecond, 10*time.Millisecond))
if err != nil || calls != 3 {
t.Errorf("Retry failed: %v, calls=%d", err, calls)
}
})
t.Run("FailureAllAttempts", func(t *testing.T) {
calls := 0
targetErr := errors.New("permanent fail")
err := Retry(context.Background(), func() error {
calls++
return targetErr
}, WithAttempts(3), WithDelay(1*time.Millisecond, 1*time.Millisecond))
if err != targetErr || calls != 3 {
t.Errorf("Retry should return last error: %v, calls=%d", err, calls)
}
})
t.Run("ContextCancellation", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
cancel()
calls := 0
err := Retry(ctx, func() error {
calls++
return errors.New("fail")
})
if err != context.Canceled || calls != 0 {
t.Errorf("Retry should stop on context cancel: %v", err)
}
})
}