Reorganize source tree and build assets
This commit is contained in:
@@ -0,0 +1,180 @@
|
||||
package core
|
||||
|
||||
import (
|
||||
"context"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/robfig/cron/v3"
|
||||
)
|
||||
|
||||
var cronParser = cron.NewParser(cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor)
|
||||
|
||||
type Scheduler struct {
|
||||
store *Store
|
||||
jobs *[]Job
|
||||
onChange func(RunRecord)
|
||||
|
||||
mu sync.Mutex
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
paused bool
|
||||
}
|
||||
|
||||
func NewScheduler(store *Store, jobs *[]Job, onChange func(RunRecord)) *Scheduler {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
s := &Scheduler{
|
||||
store: store,
|
||||
jobs: jobs,
|
||||
onChange: onChange,
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
}
|
||||
s.resetNextRuns(time.Now())
|
||||
return s
|
||||
}
|
||||
|
||||
func (s *Scheduler) Start() {
|
||||
ticker := time.NewTicker(time.Second)
|
||||
go func() {
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-s.ctx.Done():
|
||||
return
|
||||
case now := <-ticker.C:
|
||||
s.tick(now)
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (s *Scheduler) Stop() {
|
||||
s.cancel()
|
||||
}
|
||||
|
||||
func (s *Scheduler) SetPaused(paused bool) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
s.paused = paused
|
||||
now := time.Now()
|
||||
for index := range *s.jobs {
|
||||
job := &(*s.jobs)[index]
|
||||
if !job.Enabled {
|
||||
job.NextRun = "Paused"
|
||||
continue
|
||||
}
|
||||
if paused {
|
||||
job.NextRun = "Scheduler paused"
|
||||
continue
|
||||
}
|
||||
s.prepareNextRun(job, now)
|
||||
}
|
||||
_ = s.store.SaveJobs(*s.jobs)
|
||||
}
|
||||
|
||||
func (s *Scheduler) RunNow(index int) RunRecord {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
if index < 0 || index >= len(*s.jobs) {
|
||||
return RunRecord{}
|
||||
}
|
||||
job := &(*s.jobs)[index]
|
||||
record := RunJob(s.ctx, job, "Manual", s.store.Paths.LogsDir)
|
||||
s.prepareNextRun(job, time.Now())
|
||||
_ = CleanupLogs(s.store.Paths.LogsDir, s.store.Config.MaxLogFiles, s.store.Config.MaxLogAgeDays)
|
||||
_ = s.store.SaveJobs(*s.jobs)
|
||||
return record
|
||||
}
|
||||
|
||||
func (s *Scheduler) RefreshSchedule(index int) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
if index < 0 || index >= len(*s.jobs) {
|
||||
return
|
||||
}
|
||||
job := &(*s.jobs)[index]
|
||||
if !job.Enabled {
|
||||
job.NextRun = "Paused"
|
||||
return
|
||||
}
|
||||
if s.paused {
|
||||
job.NextRun = "Scheduler paused"
|
||||
return
|
||||
}
|
||||
s.prepareNextRun(job, time.Now())
|
||||
}
|
||||
|
||||
func (s *Scheduler) tick(now time.Time) {
|
||||
var record RunRecord
|
||||
var changed bool
|
||||
|
||||
s.mu.Lock()
|
||||
if !s.paused {
|
||||
for index := range *s.jobs {
|
||||
job := &(*s.jobs)[index]
|
||||
if !job.Enabled || job.nextDue.IsZero() || now.Before(job.nextDue) {
|
||||
continue
|
||||
}
|
||||
record = RunJob(s.ctx, job, "Schedule", s.store.Paths.LogsDir)
|
||||
s.prepareNextRun(job, time.Now())
|
||||
_ = CleanupLogs(s.store.Paths.LogsDir, s.store.Config.MaxLogFiles, s.store.Config.MaxLogAgeDays)
|
||||
changed = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if changed {
|
||||
_ = s.store.SaveJobs(*s.jobs)
|
||||
}
|
||||
s.mu.Unlock()
|
||||
|
||||
if changed && s.onChange != nil {
|
||||
s.onChange(record)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Scheduler) resetNextRuns(now time.Time) {
|
||||
for index := range *s.jobs {
|
||||
job := &(*s.jobs)[index]
|
||||
if !job.Enabled {
|
||||
job.NextRun = "Paused"
|
||||
continue
|
||||
}
|
||||
s.prepareNextRun(job, now)
|
||||
}
|
||||
_ = s.store.SaveJobs(*s.jobs)
|
||||
}
|
||||
|
||||
func (s *Scheduler) prepareNextRun(job *Job, from time.Time) {
|
||||
next, ok := nextRunTime(job.Schedule, from)
|
||||
if !ok {
|
||||
job.NextRun = "Invalid schedule"
|
||||
job.nextDue = time.Time{}
|
||||
return
|
||||
}
|
||||
job.nextDue = next
|
||||
job.NextRun = job.nextDue.Format("2006-01-02 15:04:05")
|
||||
}
|
||||
|
||||
func nextRunTime(schedule string, from time.Time) (time.Time, bool) {
|
||||
schedule = strings.TrimSpace(schedule)
|
||||
if schedule == "" {
|
||||
return time.Time{}, false
|
||||
}
|
||||
if strings.HasPrefix(schedule, "@every ") {
|
||||
interval, err := time.ParseDuration(strings.TrimSpace(strings.TrimPrefix(schedule, "@every ")))
|
||||
if err != nil || interval <= 0 {
|
||||
return time.Time{}, false
|
||||
}
|
||||
return from.Add(interval), true
|
||||
}
|
||||
parsed, err := cronParser.Parse(schedule)
|
||||
if err != nil {
|
||||
return time.Time{}, false
|
||||
}
|
||||
return parsed.Next(from), true
|
||||
}
|
||||
Reference in New Issue
Block a user