app/task/task.go

126 lines
2.3 KiB
Go

package task
import (
"fmt"
"time"
cron "github.com/robfig/cron/v3"
"github.com/rs/zerolog"
"kumoly.io/kumoly/app/util"
)
// Package-level state shared by the task runner.
var (
	// l is the package logger; Init assigns it.
	l zerolog.Logger

	// clog is the logger value whose address Init hands to cron.WithLogger.
	clog = mylogger{}

	// c is the cron scheduler; Init creates it.
	c *cron.Cron
)
// init currently performs no work at package load time. The only statement
// in its body is commented out.
func init() {
// Disabled: would set the "task.runner" configuration key to 5 through viper.
// viper.Set("task.runner", 5)
}
// Init prepares the package for use. It derives the package logger from
// util.Klog, tagging entries with mod=task, and builds the cron scheduler
// with the package cron logger, a job wrapper that logs task IDs, and the
// local time zone. It does not start the scheduler.
func Init() {
	l = util.Klog.With().Str("mod", "task").Logger()

	// logTaskID writes a debug entry carrying the task ID when the wrapped
	// job is a *_task, and hands every job back unchanged.
	logTaskID := func(job cron.Job) cron.Job {
		if tk, isTask := job.(*_task); isTask {
			l.Debug().Str("taskid", tk.Task.ID).Msg("")
		}
		return job
	}

	c = cron.New(
		cron.WithLogger(&clog),
		cron.WithChain(logTaskID),
		cron.WithLocation(time.Local),
	)
}
// Task describes a runnable unit of work: identifying metadata plus the
// function that is invoked when the task runs.
type Task struct {
// ID identifies the task; it is attached to log entries and run reports.
ID string
// Description is copied into the report produced for each run.
Description string
// Group is copied into the report produced for each run.
Group string
// Func is the stored function that will be called with the args.
// A pointer to an interface value used to carry state is provided as the
// first argument, and the args are passed after it, in order.
// To keep the arguments easy to normalize, it is suggested to use strings
// or other simple types.
// The `json:"-"` tag leaves Func out of JSON encoding.
Func func(c *interface{}, args ...interface{}) error `json:"-"`
}
// _task is a Task together with the details of one scheduled instance of it.
// Its Run method executes the task's Func and reports the outcome.
type _task struct {
// Task is the definition whose Func is executed by Run.
Task *Task
// name is logged as "task_name" and reported as Name.
name string
// id is the cron entry ID; Run passes it to c.Remove for one-shot tasks
// and includes it in the report.
id cron.EntryID
// args are passed to Task.Func after the carry pointer.
args []interface{}
// spec is included in the report as Spec.
// NOTE(review): presumably the cron schedule expression; confirm at the call site.
spec string
// once marks a task that Run removes from the scheduler after a run that
// did not panic.
once bool
// carry holds a value that is kept for the next run. It should be modified
// from within Task.Func. Run allocates it on first use when it is nil.
carry *interface{}
}
// Run executes the task once.
//
// It allocates the carry value if needed, calls Task.Func with the carry
// pointer and the stored args, logs a returned error, and finally sends a
// Report describing the run on Reporter.
//
// A panic raised during the run is recovered and logged (with a stack trace
// when util.PROD is false). In that case no report is sent and a one-shot
// task is not removed. When the run finishes without a panic, a one-shot task
// is removed from the scheduler.
func (t *_task) Run() {
	// Make sure Func always receives a non-nil carry pointer.
	if t.carry == nil {
		t.carry = new(interface{})
	}

	defer func() {
		recovered := recover()
		if recovered == nil {
			// Normal completion: a one-shot task is dropped from the scheduler.
			if t.once {
				c.Remove(t.id)
			}
			return
		}

		ev := l.Error().
			Str("task_id", t.Task.ID).
			Str("task_name", t.name).
			Str("error", fmt.Sprint(recovered))
		if !util.PROD {
			ev.Str("trace", util.Stack())
		}
		ev.Msg("task paniced")
	}()

	l.Info().
		Str("task_id", t.Task.ID).
		Str("task_name", t.name).
		Msg("task started.")

	runErr := t.Task.Func(t.carry, t.args...)
	if runErr != nil {
		l.Error().
			Err(runErr).
			Str("task_id", t.Task.ID).
			Str("task_name", t.name).
			Msg("task error")
	}

	// The report is sent whether or not Func returned an error.
	report := &Report{
		Entry:       t.id,
		ID:          t.Task.ID,
		Description: t.Task.Description,
		Group:       t.Task.Group,
		Name:        t.name,
		Spec:        t.spec,
		Once:        t.once,
		Args:        t.args,
		Carry:       t.carry,
		Error:       runErr,
	}
	Reporter <- report
}
// Start launches the cron scheduler and a background goroutine that passes
// every report received on Reporter to ReportReceiver. It returns without
// doing anything when the started channel already holds its token.
func Start() {
	if running := len(started) == 1; running {
		return
	}
	started <- struct{}{}
	c.Start()

	// consume handles reports one at a time until a quit signal arrives.
	consume := func() {
		for {
			select {
			case <-quit:
				// Take the token back out of started before exiting.
				<-started
				return
			case report := <-Reporter:
				// wg tracks the report currently being handled.
				wg.Add(1)
				ReportReceiver(report)
				wg.Done()
			}
		}
	}
	go consume()
}
// Stop shuts the task system down. It returns immediately when the started
// channel holds no token. Otherwise it stops the cron scheduler, waits for
// running jobs to finish, signals the report-consuming goroutine to exit, and
// waits for any report still being handled.
func Stop() {
	if len(started) == 0 {
		return
	}

	// Stop the scheduler and wait for in-flight jobs BEFORE ending the report
	// consumer. Run finishes with a send on Reporter; if the consumer had
	// already exited, a job still running here could block on that send
	// (when Reporter is unbuffered or full) and this wait would never return.
	<-c.Stop().Done()

	// No job can send a report any more, so the consumer can now be told to exit.
	quit <- struct{}{}
	wg.Wait()
}