scheduler API

scheduler

package

API reference for the scheduler package.

S
struct

Job

Job defines a scheduled task with a cron expression and handler.

pkg/scheduler/scheduler.go:16-20
type Job struct

Fields

Name Type Description
Name string
Cron string
Handler func(ctx context.Context) error
S
struct

Scheduler

Scheduler runs jobs according to cron expressions.

pkg/scheduler/scheduler.go:23-31
type Scheduler struct

Methods

log
Method

Parameters

msg string
func (*Scheduler) log(msg string)
{
	if s.logger != nil {
		s.logger(msg)
	}
}
metric
Method

Parameters

name string
dur time.Duration
err error
func (*Scheduler) metric(name string, dur time.Duration, err error)
{
	if s.metrics != nil {
		s.metrics(name, dur, err)
	}
}
Register
Method

Parameters

job Job

Returns

func (*Scheduler) Register(job Job) *Scheduler
{
	s.mu.Lock()
	defer s.mu.Unlock()

	var last time.Time
	if s.store != nil {
		if rec, err := s.store.Load(job.Name); err == nil && !rec.LastRun.IsZero() {
			last = rec.LastRun
		}
	}

	s.jobs = append(s.jobs, scheduledJob{job: job, last: last})
	return s
}
Start
Method

Parameters

Returns

error
func (*Scheduler) Start(ctx context.Context) error
{
	s.mu.Lock()
	if s.running {
		s.mu.Unlock()
		return fmt.Errorf("scheduler: already running")
	}
	s.running = true
	ctx, s.cancel = context.WithCancel(ctx)
	s.mu.Unlock()

	s.log("scheduler: started")

	go func() {
		ticker := time.NewTicker(time.Second)
		defer ticker.Stop()

		for {
			select {
			case <-ctx.Done():
				s.mu.Lock()
				s.running = false
				s.mu.Unlock()
				s.log("scheduler: stopped")
				return
			case now := <-ticker.C:
				s.runDue(ctx, now)
			}
		}
	}()

	return nil
}
runDue
Method

Parameters

func (*Scheduler) runDue(ctx context.Context, now time.Time)
{
	s.mu.RLock()
	jobs := make([]scheduledJob, len(s.jobs))
	copy(jobs, s.jobs)
	s.mu.RUnlock()

	for i, sj := range jobs {
		if isDue(sj.job.Cron, now, sj.last) {
			go func(job Job, idx int, t time.Time) {
				start := time.Now()
				err := job.Handler(ctx)
				dur := time.Since(start)
				s.metric(job.Name, dur, err)
				if err != nil {
					s.log(fmt.Sprintf("scheduler: job %s error: %v", job.Name, err))
				}

				if s.store != nil {
					_ = s.store.Save(&JobRecord{
						Name:        job.Name,
						Cron:        job.Cron,
						LastRun:     start,
						LastStatus:  "ok",
						LastLatency: dur.String(),
					})
				}
			}(sj.job, i, now)

			s.mu.Lock()
			s.jobs[i].last = now
			s.mu.Unlock()
		}
	}
}
Stop
Method

Parameters

Returns

error
func (*Scheduler) Stop(ctx context.Context) error
{
	s.mu.Lock()
	defer s.mu.Unlock()
	if !s.running {
		return fmt.Errorf("scheduler: not running")
	}
	s.cancel()
	s.running = false

	if s.store != nil {
		for _, sj := range s.jobs {
			_ = s.store.Save(&JobRecord{
				Name:    sj.job.Name,
				Cron:    sj.job.Cron,
				LastRun: sj.last,
			})
		}
	}
	return nil
}
Enqueue
Method

Parameters

fn func(ctx context.Context) error
func (*Scheduler) Enqueue(fn func(ctx context.Context) error)
{
	s.log("scheduler: enqueued fire-and-forget job")
	go func() {
		start := time.Now()
		err := fn(context.Background())
		dur := time.Since(start)
		s.metric("enqueue", dur, err)
		if err != nil {
			s.log(fmt.Sprintf("scheduler: enqueue error: %v", err))
		}
	}()
}
ScheduleAfter
Method

Parameters

d time.Duration
fn func(ctx context.Context) error
func (*Scheduler) ScheduleAfter(d time.Duration, fn func(ctx context.Context) error)
{
	s.log(fmt.Sprintf("scheduler: scheduled job after %v", d))
	go func() {
		time.Sleep(d)
		start := time.Now()
		err := fn(context.Background())
		dur := time.Since(start)
		s.metric("schedule-after", dur, err)
		if err != nil {
			s.log(fmt.Sprintf("scheduler: schedule-after error: %v", err))
		}
	}()
}

Fields

Name Type Description
jobs []scheduledJob
mu sync.RWMutex
running bool
cancel context.CancelFunc
logger func(msg string)
metrics func(name string, dur time.Duration, err error)
store *JobStore
S
struct

scheduledJob

pkg/scheduler/scheduler.go:33-36
type scheduledJob struct

Fields

Name Type Description
job Job
last time.Time
T
type

Option

Option configures a Scheduler.

pkg/scheduler/scheduler.go:39-39
type Option options.Option[Scheduler]
F
function

New

New creates a new Scheduler with the given options.

Parameters

opts
...Option

Returns

pkg/scheduler/scheduler.go:42-48
func New(opts ...Option) *Scheduler

{
	s := &Scheduler{}
	for _, opt := range opts {
		opt(s)
	}
	return s
}
F
function

WithLogger

WithLogger sets a logging callback for the scheduler.

Parameters

log
func(msg string)

Returns

pkg/scheduler/scheduler.go:51-53
func WithLogger(log func(msg string)) Option

{
	return func(s *Scheduler) { s.logger = log }
}
F
function

WithMetrics

WithMetrics sets a metrics callback for job executions.

Parameters

m
func(name string, dur time.Duration, err error)

Returns

pkg/scheduler/scheduler.go:56-58
func WithMetrics(m func(name string, dur time.Duration, err error)) Option

{
	return func(s *Scheduler) { s.metrics = m }
}
F
function

WithStore

WithStore enables persistent storage of job state in the given directory. If the directory cannot be created, the scheduler is left without a store.

Parameters

dir
string

Returns

pkg/scheduler/scheduler.go:61-69
func WithStore(dir string) Option

{
	return func(s *Scheduler) {
		store, err := NewJobStore(dir)
		if err != nil {
			return
		}
		s.store = store
	}
}
S
struct

JobRecord

JobRecord holds persisted job execution state.

pkg/scheduler/scheduler.go:215-221
type JobRecord struct

Fields

Name Type Description
Name string json:"name"
Cron string json:"cron"
LastRun time.Time json:"last_run"
LastStatus string json:"last_status,omitempty"
LastLatency string json:"last_latency,omitempty"
S
struct

JobStore

JobStore persists job state to disk as JSON.

pkg/scheduler/scheduler.go:224-227
type JobStore struct

Methods

Save
Method

Parameters

rec *JobRecord

Returns

error
func (*JobStore) Save(rec *JobRecord) error
{
	js.mu.Lock()
	defer js.mu.Unlock()

	data, err := json.MarshalIndent(rec, "", "  ")
	if err != nil {
		return fmt.Errorf("scheduler store: marshal: %w", err)
	}

	path := filepath.Join(js.dir, rec.Name+".json")
	tmp := path + ".tmp"
	if err := os.WriteFile(tmp, data, 0644); err != nil {
		return fmt.Errorf("scheduler store: write: %w", err)
	}
	if err := os.Rename(tmp, path); err != nil {
		return fmt.Errorf("scheduler store: rename: %w", err)
	}
	return nil
}
Load
Method

Parameters

name string

Returns

error
func (*JobStore) Load(name string) (*JobRecord, error)
{
	js.mu.RLock()
	defer js.mu.RUnlock()

	path := filepath.Join(js.dir, name+".json")
	data, err := os.ReadFile(path)
	if err != nil {
		return nil, fmt.Errorf("scheduler store: read %s: %w", name, err)
	}
	var rec JobRecord
	if err := json.Unmarshal(data, &rec); err != nil {
		return nil, fmt.Errorf("scheduler store: unmarshal %s: %w", name, err)
	}
	return &rec, nil
}
List
Method

Returns

error
func (*JobStore) List() ([]*JobRecord, error)
{
	js.mu.RLock()
	defer js.mu.RUnlock()

	entries, err := os.ReadDir(js.dir)
	if err != nil {
		return nil, fmt.Errorf("scheduler store: readdir: %w", err)
	}

	var result []*JobRecord
	for _, entry := range entries {
		if filepath.Ext(entry.Name()) != ".json" {
			continue
		}
		data, err := os.ReadFile(filepath.Join(js.dir, entry.Name()))
		if err != nil {
			continue
		}
		var rec JobRecord
		if err := json.Unmarshal(data, &rec); err != nil {
			continue
		}
		result = append(result, &rec)
	}
	return result, nil
}

Fields

Name Type Description
dir string
mu sync.RWMutex
F
function

NewJobStore

NewJobStore creates a JobStore that writes to the given directory.

Parameters

dir
string

Returns

error
pkg/scheduler/scheduler.go:230-235
func NewJobStore(dir string) (*JobStore, error)

{
	if err := os.MkdirAll(dir, 0755); err != nil {
		return nil, fmt.Errorf("scheduler store: cannot create dir %s: %w", dir, err)
	}
	return &JobStore{dir: dir}, nil
}
F
function

isDue

Parameters

cronExpr
string
now
time.Time
last
time.Time

Returns

bool
pkg/scheduler/scheduler.go:300-310
func isDue(cronExpr string, now, last time.Time) bool

{
	fields, err := parseCron(cronExpr)
	if err != nil {
		return false
	}
	if last.IsZero() {
		return true
	}
	next := nextRun(fields, last)
	return !next.After(now)
}
S
struct

cronFields

pkg/scheduler/scheduler.go:312-315
type cronFields struct

Fields

Name Type Description
minute int
hour int
dom int
month int
dow int
wildMin bool
wildHour bool
wildDom bool
wildMonth bool
wildDow bool
F
function

parseCron

Parameters

expr
string

Returns

error
pkg/scheduler/scheduler.go:317-329
func parseCron(expr string) (cronFields, error)

{
	parts := splitFields(expr)
	if len(parts) != 5 {
		return cronFields{}, fmt.Errorf("scheduler: invalid cron expression: %s", expr)
	}
	f := cronFields{}
	f.minute, f.wildMin = parseField(parts[0], 0, 59)
	f.hour, f.wildHour = parseField(parts[1], 0, 23)
	f.dom, f.wildDom = parseField(parts[2], 1, 31)
	f.month, f.wildMonth = parseField(parts[3], 1, 12)
	f.dow, f.wildDow = parseField(parts[4], 0, 6)
	return f, nil
}
F
function

splitFields

Parameters

expr
string

Returns

[]string
pkg/scheduler/scheduler.go:331-348
// splitFields splits expr on runs of spaces and tabs and returns the
// non-empty pieces in order. It returns nil when expr contains no field.
//
// Fields are sliced directly out of expr rather than being built up one
// character at a time, so the work is linear in len(expr). Scanning by byte
// is safe because space and tab are single-byte in UTF-8.
func splitFields(expr string) []string {
	var fields []string
	start := -1 // byte offset where the current field began; -1 between fields
	for i := 0; i < len(expr); i++ {
		if c := expr[i]; c == ' ' || c == '\t' {
			if start >= 0 {
				fields = append(fields, expr[start:i])
				start = -1
			}
			continue
		}
		if start < 0 {
			start = i
		}
	}
	if start >= 0 {
		fields = append(fields, expr[start:])
	}
	return fields
}
F
function

parseField

Parameters

s
string
min
int
max
int

Returns

int
bool
pkg/scheduler/scheduler.go:350-357
// parseField interprets a single cron field. It returns (0, true) for the
// wildcard "*"; otherwise it returns the leading decimal integer of s and
// false.
//
// A field that does not start with an integer yields (0, false). That
// fallback is now explicit instead of depending on an ignored Sscanf error.
//
// NOTE(review): min and max are accepted but not used, so out-of-range
// values and unsupported syntax (lists, ranges, steps) are not rejected
// here. Confirm whether validation is expected from this function.
func parseField(s string, min, max int) (int, bool) {
	if s == "*" {
		return 0, true
	}
	var v int
	if _, err := fmt.Sscanf(s, "%d", &v); err != nil {
		return 0, false
	}
	return v, false
}
F
function

nextRun

Parameters

Returns

pkg/scheduler/scheduler.go:359-372
func nextRun(f cronFields, last time.Time) time.Time

{
	t := last.Add(time.Minute)
	for i := 0; i < 525600; i++ {
		if (f.wildMin || t.Minute() == f.minute) &&
			(f.wildHour || t.Hour() == f.hour) &&
			(f.wildDom || t.Day() == f.dom) &&
			(f.wildMonth || int(t.Month()) == f.month) &&
			(f.wildDow || int(t.Weekday()) == f.dow) {
			return t
		}
		t = t.Add(time.Minute)
	}
	return t
}
F
function

TestScheduler_RegisterAndFire

Parameters

pkg/scheduler/scheduler_test.go:10-43
func TestScheduler_RegisterAndFire(t *testing.T)

{
	s := New()
	var mu sync.Mutex
	fired := false

	s.Register(Job{
		Name:    "test",
		Cron:    "* * * * *",
		Handler: func(ctx context.Context) error { mu.Lock(); fired = true; mu.Unlock(); return nil },
	})

	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	if err := s.Start(ctx); err != nil {
		t.Fatalf("Start failed: %v", err)
	}

	time.Sleep(1500 * time.Millisecond)

	s.mu.RLock()
	running := s.running
	s.mu.RUnlock()
	if !running {
		t.Error("scheduler should be running")
	}

	mu.Lock()
	wasFired := fired
	mu.Unlock()
	if !wasFired {
		t.Error("job should have fired within timeout")
	}
}
F
function

TestScheduler_DoubleStart

Parameters

pkg/scheduler/scheduler_test.go:45-55
func TestScheduler_DoubleStart(t *testing.T)

{
	s := New()
	ctx := context.Background()

	if err := s.Start(ctx); err != nil {
		t.Fatalf("First Start failed: %v", err)
	}
	if err := s.Start(ctx); err == nil {
		t.Error("double start should return an error")
	}
}
F
function

TestScheduler_StopNotRunning

Parameters

pkg/scheduler/scheduler_test.go:57-63
func TestScheduler_StopNotRunning(t *testing.T)

{
	s := New()
	ctx := context.Background()
	if err := s.Stop(ctx); err == nil {
		t.Error("stopping non-running scheduler should return an error")
	}
}
F
function

TestScheduler_Enqueue

Parameters

pkg/scheduler/scheduler_test.go:65-85
func TestScheduler_Enqueue(t *testing.T)

{
	s := New()
	var mu sync.Mutex
	called := false

	s.Enqueue(func(ctx context.Context) error {
		mu.Lock()
		called = true
		mu.Unlock()
		return nil
	})

	time.Sleep(100 * time.Millisecond)

	mu.Lock()
	wasCalled := called
	mu.Unlock()
	if !wasCalled {
		t.Error("enqueued function should have been called")
	}
}
F
function

TestScheduler_ScheduleAfter

Parameters

pkg/scheduler/scheduler_test.go:87-107
func TestScheduler_ScheduleAfter(t *testing.T)

{
	s := New()
	var mu sync.Mutex
	called := false

	s.ScheduleAfter(50*time.Millisecond, func(ctx context.Context) error {
		mu.Lock()
		called = true
		mu.Unlock()
		return nil
	})

	time.Sleep(200 * time.Millisecond)

	mu.Lock()
	wasCalled := called
	mu.Unlock()
	if !wasCalled {
		t.Error("scheduled function should have been called after delay")
	}
}
F
function

TestJobStore_SaveAndLoad

Parameters

pkg/scheduler/store_test.go:10-41
func TestJobStore_SaveAndLoad(t *testing.T)

{
	dir := t.TempDir()
	store, err := NewJobStore(dir)
	if err != nil {
		t.Fatalf("NewJobStore: %v", err)
	}

	rec := &JobRecord{
		Name:        "cleanup",
		Cron:        "0 3 * * *",
		LastRun:     time.Date(2026, 1, 15, 3, 0, 0, 0, time.UTC),
		LastStatus:  "ok",
		LastLatency: "12ms",
	}
	if err := store.Save(rec); err != nil {
		t.Fatalf("Save: %v", err)
	}

	loaded, err := store.Load("cleanup")
	if err != nil {
		t.Fatalf("Load: %v", err)
	}
	if loaded.Name != "cleanup" {
		t.Errorf("Name = %q, want %q", loaded.Name, "cleanup")
	}
	if loaded.Cron != "0 3 * * *" {
		t.Errorf("Cron = %q, want %q", loaded.Cron, "0 3 * * *")
	}
	if !loaded.LastRun.Equal(rec.LastRun) {
		t.Errorf("LastRun = %v, want %v", loaded.LastRun, rec.LastRun)
	}
}
F
function

TestJobStore_List

Parameters

pkg/scheduler/store_test.go:43-57
func TestJobStore_List(t *testing.T)

{
	dir := t.TempDir()
	store, _ := NewJobStore(dir)

	store.Save(&JobRecord{Name: "job1", Cron: "* * * * *"})
	store.Save(&JobRecord{Name: "job2", Cron: "0 * * * *"})

	list, err := store.List()
	if err != nil {
		t.Fatalf("List: %v", err)
	}
	if len(list) != 2 {
		t.Errorf("got %d jobs, want 2", len(list))
	}
}
F
function

TestScheduler_WithStore

Parameters

pkg/scheduler/store_test.go:59-77
func TestScheduler_WithStore(t *testing.T)

{
	dir := t.TempDir()
	s := New(WithStore(dir))
	var count int64
	s.Register(Job{
		Name: "tick",
		Cron: "* * * * *",
		Handler: func(ctx context.Context) error {
			atomic.AddInt64(&count, 1)
			return nil
		},
	})
	s.runDue(context.Background(), time.Now())
	time.Sleep(100 * time.Millisecond)

	if atomic.LoadInt64(&count) == 0 {
		t.Error("expected job to run at least once")
	}
}