package task import ( "fmt" "time" cron "github.com/robfig/cron/v3" "github.com/rs/zerolog" "github.com/rs/zerolog/log" "kumoly.io/kumoly/app/util" ) var l zerolog.Logger var clog = mylogger{} var c *cron.Cron func init() { // viper.Set("task.runner", 5) } func Init() { l = log.With().Str("mod", "task").Logger() c = cron.New( cron.WithLogger(&clog), cron.WithChain(func(j cron.Job) cron.Job { t, ok := j.(*_task) if ok { l.Debug().Str("taskid", t.Task.ID).Msg("") } return j }), cron.WithLocation(time.Local)) } type Task struct { ID string Description string Group string // Func is the stored function that will be called with the args, // and a interface to carry value will be provided as the first argument // args will be passed after carry, in order to normalize, // it is suggested to use strings or other simple types Func func(c *interface{}, args ...interface{}) error `json:"-"` } type _task struct { Task *Task name string id cron.EntryID args []interface{} spec string once bool // carry value to the next run. Should be modified from the Task.Func carry *interface{} } func (t *_task) Run() { if t.carry == nil { t.carry = new(interface{}) } defer func() { if err := recover(); err != nil { l.Error().Str("task_id", t.Task.ID). Str("task_name", t.name).Str("trace", util.Stack()). Str("error", fmt.Sprint(err)).Msg("task paniced") } else if t.once { c.Remove(t.id) } }() l.Info().Str("task_id", t.Task.ID). Str("task_name", t.name). Msg("task started.") err := t.Task.Func(t.carry, t.args...) if err != nil { l.Error().Err(err).Str("task_id", t.Task.ID). Str("task_name", t.name).Msg("task error") } Reporter <- &Report{ Entry: t.id, ID: t.Task.ID, Description: t.Task.Description, Group: t.Task.Group, Name: t.name, Spec: t.spec, Once: t.once, Args: t.args, Carry: t.carry, Error: err, } } func Start() { if len(started) == 1 { return } started <- struct{}{} c.Start() go func() { for { select { case r := <-Reporter: wg.Add(1) ReportReceiver(r) wg.Done() case <-quit: <-started return } } }() } func Stop() { if len(started) == 0 { return } quit <- struct{}{} <-c.Stop().Done() wg.Wait() }