scheduler
API
scheduler
package
API reference for the scheduler package.
Imports
(10)
S
struct
Job
Job defines a scheduled task with a cron expression and handler.
pkg/scheduler/scheduler.go:16-20
type Job struct
Fields
| Name | Type | Description |
|---|---|---|
| Name | string | |
| Cron | string | |
| Handler | func(ctx context.Context) error | |
S
struct
Scheduler
Scheduler runs jobs according to cron expressions.
pkg/scheduler/scheduler.go:23-31
type Scheduler struct
Methods
log
Method
Parameters
msg
string
func (*Scheduler) log(msg string)
{
if s.logger != nil {
s.logger(msg)
}
}
metric
Method
Parameters
func (*Scheduler) metric(name string, dur time.Duration, err error)
{
if s.metrics != nil {
s.metrics(name, dur, err)
}
}
Register
Method
func (*Scheduler) Register(job Job) *Scheduler
{
s.mu.Lock()
defer s.mu.Unlock()
var last time.Time
if s.store != nil {
if rec, err := s.store.Load(job.Name); err == nil && !rec.LastRun.IsZero() {
last = rec.LastRun
}
}
s.jobs = append(s.jobs, scheduledJob{job: job, last: last})
return s
}
Start
Method
Parameters
ctx
context.Context
Returns
error
func (*Scheduler) Start(ctx context.Context) error
{
s.mu.Lock()
if s.running {
s.mu.Unlock()
return fmt.Errorf("scheduler: already running")
}
s.running = true
ctx, s.cancel = context.WithCancel(ctx)
s.mu.Unlock()
s.log("scheduler: started")
go func() {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
s.mu.Lock()
s.running = false
s.mu.Unlock()
s.log("scheduler: stopped")
return
case now := <-ticker.C:
s.runDue(ctx, now)
}
}
}()
return nil
}
runDue
Method
Parameters
ctx
context.Context
now
time.Time
func (*Scheduler) runDue(ctx context.Context, now time.Time)
{
s.mu.RLock()
jobs := make([]scheduledJob, len(s.jobs))
copy(jobs, s.jobs)
s.mu.RUnlock()
for i, sj := range jobs {
if isDue(sj.job.Cron, now, sj.last) {
go func(job Job, idx int, t time.Time) {
start := time.Now()
err := job.Handler(ctx)
dur := time.Since(start)
s.metric(job.Name, dur, err)
if err != nil {
s.log(fmt.Sprintf("scheduler: job %s error: %v", job.Name, err))
}
if s.store != nil {
_ = s.store.Save(&JobRecord{
Name: job.Name,
Cron: job.Cron,
LastRun: start,
LastStatus: "ok",
LastLatency: dur.String(),
})
}
}(sj.job, i, now)
s.mu.Lock()
s.jobs[i].last = now
s.mu.Unlock()
}
}
}
Stop
Method
Parameters
ctx
context.Context
Returns
error
func (*Scheduler) Stop(ctx context.Context) error
{
s.mu.Lock()
defer s.mu.Unlock()
if !s.running {
return fmt.Errorf("scheduler: not running")
}
s.cancel()
s.running = false
if s.store != nil {
for _, sj := range s.jobs {
_ = s.store.Save(&JobRecord{
Name: sj.job.Name,
Cron: sj.job.Cron,
LastRun: sj.last,
})
}
}
return nil
}
Enqueue
Method
Parameters
fn
func(ctx context.Context) error
func (*Scheduler) Enqueue(fn func(ctx context.Context) error)
{
s.log("scheduler: enqueued fire-and-forget job")
go func() {
start := time.Now()
err := fn(context.Background())
dur := time.Since(start)
s.metric("enqueue", dur, err)
if err != nil {
s.log(fmt.Sprintf("scheduler: enqueue error: %v", err))
}
}()
}
ScheduleAfter
Method
Parameters
fn
func(ctx context.Context) error
func (*Scheduler) ScheduleAfter(d time.Duration, fn func(ctx context.Context) error)
{
s.log(fmt.Sprintf("scheduler: scheduled job after %v", d))
go func() {
time.Sleep(d)
start := time.Now()
err := fn(context.Background())
dur := time.Since(start)
s.metric("schedule-after", dur, err)
if err != nil {
s.log(fmt.Sprintf("scheduler: schedule-after error: %v", err))
}
}()
}
Fields
| Name | Type | Description |
|---|---|---|
| jobs | []scheduledJob | |
| mu | sync.RWMutex | |
| running | bool | |
| cancel | context.CancelFunc | |
| logger | func(msg string) | |
| metrics | func(name string, dur time.Duration, err error) | |
| store | *JobStore | |
S
struct
scheduledJob
pkg/scheduler/scheduler.go:33-36
type scheduledJob struct
Uses
T
type
Option
Option configures a Scheduler.
pkg/scheduler/scheduler.go:39-39
type Option options.Option[Scheduler]
F
function
New
New creates a new Scheduler with the given options.
Parameters
opts
...Option
Returns
pkg/scheduler/scheduler.go:42-48
func New(opts ...Option) *Scheduler
{
s := &Scheduler{}
for _, opt := range opts {
opt(s)
}
return s
}
F
function
WithLogger
WithLogger sets a logging callback for the scheduler.
Parameters
log
func(msg string)
Returns
pkg/scheduler/scheduler.go:51-53
func WithLogger(log func(msg string)) Option
{
return func(s *Scheduler) { s.logger = log }
}
Uses
F
function
WithMetrics
WithMetrics sets a metrics callback for job executions.
Parameters
m
func(name string, dur time.Duration, err error)
Returns
pkg/scheduler/scheduler.go:56-58
func WithMetrics(m func(name string, dur time.Duration, err error)) Option
{
return func(s *Scheduler) { s.metrics = m }
}
Uses
F
function
WithStore
WithStore enables persistent job state storage to the given directory.
Parameters
dir
string
Returns
pkg/scheduler/scheduler.go:61-69
func WithStore(dir string) Option
{
return func(s *Scheduler) {
store, err := NewJobStore(dir)
if err != nil {
return
}
s.store = store
}
}
Uses
S
struct
JobRecord
JobRecord holds persisted job execution state.
pkg/scheduler/scheduler.go:215-221
type JobRecord struct
Fields
| Name | Type | Description |
|---|---|---|
| Name | string | json:"name" |
| Cron | string | json:"cron" |
| LastRun | time.Time | json:"last_run" |
| LastStatus | string | json:"last_status,omitempty" |
| LastLatency | string | json:"last_latency,omitempty" |
S
struct
JobStore
JobStore persists job state to disk as JSON.
pkg/scheduler/scheduler.go:224-227
type JobStore struct
Methods
Save
Method
Parameters
rec
*JobRecord
Returns
error
func (*JobStore) Save(rec *JobRecord) error
{
js.mu.Lock()
defer js.mu.Unlock()
data, err := json.MarshalIndent(rec, "", " ")
if err != nil {
return fmt.Errorf("scheduler store: marshal: %w", err)
}
path := filepath.Join(js.dir, rec.Name+".json")
tmp := path + ".tmp"
if err := os.WriteFile(tmp, data, 0644); err != nil {
return fmt.Errorf("scheduler store: write: %w", err)
}
if err := os.Rename(tmp, path); err != nil {
return fmt.Errorf("scheduler store: rename: %w", err)
}
return nil
}
Load
Method
Parameters
name
string
Returns
error
func (*JobStore) Load(name string) (*JobRecord, error)
{
js.mu.RLock()
defer js.mu.RUnlock()
path := filepath.Join(js.dir, name+".json")
data, err := os.ReadFile(path)
if err != nil {
return nil, fmt.Errorf("scheduler store: read %s: %w", name, err)
}
var rec JobRecord
if err := json.Unmarshal(data, &rec); err != nil {
return nil, fmt.Errorf("scheduler store: unmarshal %s: %w", name, err)
}
return &rec, nil
}
List
Method
Returns
[]*JobRecord
error
func (*JobStore) List() ([]*JobRecord, error)
{
js.mu.RLock()
defer js.mu.RUnlock()
entries, err := os.ReadDir(js.dir)
if err != nil {
return nil, fmt.Errorf("scheduler store: readdir: %w", err)
}
var result []*JobRecord
for _, entry := range entries {
if filepath.Ext(entry.Name()) != ".json" {
continue
}
data, err := os.ReadFile(filepath.Join(js.dir, entry.Name()))
if err != nil {
continue
}
var rec JobRecord
if err := json.Unmarshal(data, &rec); err != nil {
continue
}
result = append(result, &rec)
}
return result, nil
}
Fields
| Name | Type | Description |
|---|---|---|
| dir | string | |
| mu | sync.RWMutex | |
F
function
NewJobStore
NewJobStore creates a JobStore that writes to the given directory.
Parameters
dir
string
Returns
error
pkg/scheduler/scheduler.go:230-235
func NewJobStore(dir string) (*JobStore, error)
{
if err := os.MkdirAll(dir, 0755); err != nil {
return nil, fmt.Errorf("scheduler store: cannot create dir %s: %w", dir, err)
}
return &JobStore{dir: dir}, nil
}
F
function
isDue
pkg/scheduler/scheduler.go:300-310
func isDue(cronExpr string, now, last time.Time) bool
{
fields, err := parseCron(cronExpr)
if err != nil {
return false
}
if last.IsZero() {
return true
}
next := nextRun(fields, last)
return !next.After(now)
}
S
struct
cronFields
pkg/scheduler/scheduler.go:312-315
type cronFields struct
Fields
| Name | Type | Description |
|---|---|---|
| minute | int | |
| hour | int | |
| dom | int | |
| month | int | |
| dow | int | |
| wildMin | bool | |
| wildHour | bool | |
| wildDom | bool | |
| wildMonth | bool | |
| wildDow | bool | |
F
function
parseCron
Parameters
expr
string
Returns
error
pkg/scheduler/scheduler.go:317-329
func parseCron(expr string) (cronFields, error)
{
parts := splitFields(expr)
if len(parts) != 5 {
return cronFields{}, fmt.Errorf("scheduler: invalid cron expression: %s", expr)
}
f := cronFields{}
f.minute, f.wildMin = parseField(parts[0], 0, 59)
f.hour, f.wildHour = parseField(parts[1], 0, 23)
f.dom, f.wildDom = parseField(parts[2], 1, 31)
f.month, f.wildMonth = parseField(parts[3], 1, 12)
f.dow, f.wildDow = parseField(parts[4], 0, 6)
return f, nil
}
F
function
splitFields
Parameters
expr
string
Returns
[]string
pkg/scheduler/scheduler.go:331-348
// splitFields splits expr into the substrings separated by runs of spaces
// and tabs. Leading and trailing separators are ignored, and an input with no
// fields yields a nil slice.
//
// Fields are returned as slices of expr rather than being rebuilt one
// character at a time, so no intermediate strings are allocated.
func splitFields(expr string) []string {
	var fields []string
	start := -1 // byte offset of the field being scanned, or -1 between fields
	for i := 0; i < len(expr); i++ {
		// Space and tab are single-byte and never occur inside a multi-byte
		// UTF-8 sequence, so scanning bytes is safe.
		if c := expr[i]; c == ' ' || c == '\t' {
			if start >= 0 {
				fields = append(fields, expr[start:i])
				start = -1
			}
			continue
		}
		if start < 0 {
			start = i
		}
	}
	if start >= 0 {
		fields = append(fields, expr[start:])
	}
	return fields
}
F
function
parseField
Parameters
s
string
min
int
max
int
Returns
int
bool
pkg/scheduler/scheduler.go:350-357
// parseField parses one cron field.
//
// "*" yields (0, true): the field is a wildcard. A plain decimal integer in
// [min, max] yields (value, false). Anything else (empty or non-numeric
// text, a sign, trailing garbage, or a number outside the range) yields
// (min-1, false): a value below the field's range that therefore can never
// match, rather than silently defaulting to 0.
func parseField(s string, min, max int) (int, bool) {
	if s == "*" {
		return 0, true
	}

	invalid := min - 1
	if s == "" {
		return invalid, false
	}
	v := 0
	for i := 0; i < len(s); i++ {
		c := s[i]
		if c < '0' || c > '9' {
			return invalid, false
		}
		v = v*10 + int(c-'0')
		// v only grows, so bailing out here also rules out overflow.
		if v > max {
			return invalid, false
		}
	}
	if v < min {
		return invalid, false
	}
	return v, false
}
F
function
nextRun
Parameters
last
Returns
pkg/scheduler/scheduler.go:359-372
func nextRun(f cronFields, last time.Time) time.Time
{
t := last.Add(time.Minute)
for i := 0; i < 525600; i++ {
if (f.wildMin || t.Minute() == f.minute) &&
(f.wildHour || t.Hour() == f.hour) &&
(f.wildDom || t.Day() == f.dom) &&
(f.wildMonth || int(t.Month()) == f.month) &&
(f.wildDow || int(t.Weekday()) == f.dow) {
return t
}
t = t.Add(time.Minute)
}
return t
}
F
function
TestScheduler_RegisterAndFire
Parameters
t
pkg/scheduler/scheduler_test.go:10-43
func TestScheduler_RegisterAndFire(t *testing.T)
{
s := New()
var mu sync.Mutex
fired := false
s.Register(Job{
Name: "test",
Cron: "* * * * *",
Handler: func(ctx context.Context) error { mu.Lock(); fired = true; mu.Unlock(); return nil },
})
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
if err := s.Start(ctx); err != nil {
t.Fatalf("Start failed: %v", err)
}
time.Sleep(1500 * time.Millisecond)
s.mu.RLock()
running := s.running
s.mu.RUnlock()
if !running {
t.Error("scheduler should be running")
}
mu.Lock()
wasFired := fired
mu.Unlock()
if !wasFired {
t.Error("job should have fired within timeout")
}
}
F
function
TestScheduler_DoubleStart
Parameters
t
pkg/scheduler/scheduler_test.go:45-55
func TestScheduler_DoubleStart(t *testing.T)
{
s := New()
ctx := context.Background()
if err := s.Start(ctx); err != nil {
t.Fatalf("First Start failed: %v", err)
}
if err := s.Start(ctx); err == nil {
t.Error("double start should return an error")
}
}
F
function
TestScheduler_StopNotRunning
Parameters
t
pkg/scheduler/scheduler_test.go:57-63
func TestScheduler_StopNotRunning(t *testing.T)
{
s := New()
ctx := context.Background()
if err := s.Stop(ctx); err == nil {
t.Error("stopping non-running scheduler should return an error")
}
}
F
function
TestScheduler_Enqueue
Parameters
t
pkg/scheduler/scheduler_test.go:65-85
func TestScheduler_Enqueue(t *testing.T)
{
s := New()
var mu sync.Mutex
called := false
s.Enqueue(func(ctx context.Context) error {
mu.Lock()
called = true
mu.Unlock()
return nil
})
time.Sleep(100 * time.Millisecond)
mu.Lock()
wasCalled := called
mu.Unlock()
if !wasCalled {
t.Error("enqueued function should have been called")
}
}
F
function
TestScheduler_ScheduleAfter
Parameters
t
pkg/scheduler/scheduler_test.go:87-107
func TestScheduler_ScheduleAfter(t *testing.T)
{
s := New()
var mu sync.Mutex
called := false
s.ScheduleAfter(50*time.Millisecond, func(ctx context.Context) error {
mu.Lock()
called = true
mu.Unlock()
return nil
})
time.Sleep(200 * time.Millisecond)
mu.Lock()
wasCalled := called
mu.Unlock()
if !wasCalled {
t.Error("scheduled function should have been called after delay")
}
}
F
function
TestJobStore_SaveAndLoad
Parameters
t
pkg/scheduler/store_test.go:10-41
func TestJobStore_SaveAndLoad(t *testing.T)
{
dir := t.TempDir()
store, err := NewJobStore(dir)
if err != nil {
t.Fatalf("NewJobStore: %v", err)
}
rec := &JobRecord{
Name: "cleanup",
Cron: "0 3 * * *",
LastRun: time.Date(2026, 1, 15, 3, 0, 0, 0, time.UTC),
LastStatus: "ok",
LastLatency: "12ms",
}
if err := store.Save(rec); err != nil {
t.Fatalf("Save: %v", err)
}
loaded, err := store.Load("cleanup")
if err != nil {
t.Fatalf("Load: %v", err)
}
if loaded.Name != "cleanup" {
t.Errorf("Name = %q, want %q", loaded.Name, "cleanup")
}
if loaded.Cron != "0 3 * * *" {
t.Errorf("Cron = %q, want %q", loaded.Cron, "0 3 * * *")
}
if !loaded.LastRun.Equal(rec.LastRun) {
t.Errorf("LastRun = %v, want %v", loaded.LastRun, rec.LastRun)
}
}
F
function
TestJobStore_List
Parameters
t
pkg/scheduler/store_test.go:43-57
func TestJobStore_List(t *testing.T)
{
dir := t.TempDir()
store, _ := NewJobStore(dir)
store.Save(&JobRecord{Name: "job1", Cron: "* * * * *"})
store.Save(&JobRecord{Name: "job2", Cron: "0 * * * *"})
list, err := store.List()
if err != nil {
t.Fatalf("List: %v", err)
}
if len(list) != 2 {
t.Errorf("got %d jobs, want 2", len(list))
}
}
F
function
TestScheduler_WithStore
Parameters
t
pkg/scheduler/store_test.go:59-77
func TestScheduler_WithStore(t *testing.T)
{
dir := t.TempDir()
s := New(WithStore(dir))
var count int64
s.Register(Job{
Name: "tick",
Cron: "* * * * *",
Handler: func(ctx context.Context) error {
atomic.AddInt64(&count, 1)
return nil
},
})
s.runDue(context.Background(), time.Now())
time.Sleep(100 * time.Millisecond)
if atomic.LoadInt64(&count) == 0 {
t.Error("expected job to run at least once")
}
}