app/task/task.go

124 lines
2.3 KiB
Go
Raw Normal View History

2021-12-16 04:11:33 +00:00
package task
import (
"fmt"
"time"
cron "github.com/robfig/cron/v3"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
2021-12-19 06:40:01 +00:00
"kumoly.io/kumoly/app/util"
2021-12-16 04:11:33 +00:00
)
// Package-level state shared by the functions in this file.
var (
	// l is the module logger; it is assigned in Init.
	l zerolog.Logger

	// clog is the logger value handed to cron through cron.WithLogger.
	clog = mylogger{}

	// c is the cron scheduler; it is created in Init.
	c *cron.Cron
)
// init currently does nothing: its only statement, a viper default for
// "task.runner", is commented out.
func init() {
// viper.Set("task.runner", 5)
}
func Init() {
l = log.With().Str("mod", "task").Logger()
c = cron.New(
cron.WithLogger(&clog),
cron.WithChain(func(j cron.Job) cron.Job {
t, ok := j.(*_task)
if ok {
2021-12-19 06:02:39 +00:00
l.Debug().Str("taskid", t.Task.ID).Msg("")
2021-12-16 04:11:33 +00:00
}
return j
}),
cron.WithLocation(time.Local))
}
// Task describes a unit of work that can be scheduled.
type Task struct {
	// ID identifies the task; it is attached to log records and reports.
	ID string
	// Description is free-form text that is copied into reports.
	Description string
	// Group is a label that is copied into reports.
	Group string

	// Func is the stored function that is called on every run.
	//
	// The first argument is a pointer to an interface value that carries
	// state from one run to the next; Func may read and overwrite it. The
	// remaining arguments are the ones the task was scheduled with. To keep
	// them easy to normalize, prefer strings or other simple types.
	//
	// Func is excluded from JSON encoding.
	Func func(c *interface{}, args ...interface{}) error `json:"-"`
}
type _task struct {
Task *Task
2021-12-19 06:02:39 +00:00
name string
2021-12-16 04:11:33 +00:00
id cron.EntryID
args []interface{}
spec string
once bool
// carry value to the next run. Should be modified from the Task.Func
carry *interface{}
}
func (t *_task) Run() {
if t.carry == nil {
t.carry = new(interface{})
}
defer func() {
if err := recover(); err != nil {
2021-12-19 06:40:01 +00:00
l.Error().Str("task_id", t.Task.ID).
Str("task_name", t.name).Str("trace", util.Stack()).
Str("error", fmt.Sprint(err)).Msg("task paniced")
2021-12-16 04:11:33 +00:00
} else if t.once {
c.Remove(t.id)
}
}()
2021-12-19 06:02:39 +00:00
l.Info().Str("task_id", t.Task.ID).
2021-12-19 06:26:39 +00:00
Str("task_name", t.name).
2021-12-19 06:02:39 +00:00
Msg("task started.")
err := t.Task.Func(t.carry, t.args...)
if err != nil {
l.Error().Err(err).Str("task_id", t.Task.ID).
2021-12-19 06:26:39 +00:00
Str("task_name", t.name).Msg("task error")
2021-12-19 06:02:39 +00:00
}
Reporter <- &Report{
Entry: t.id,
ID: t.Task.ID,
Description: t.Task.Description,
Group: t.Task.Group,
Name: t.name,
Spec: t.spec,
Once: t.once,
Args: t.args,
Carry: t.carry,
Error: err,
2021-12-16 04:11:33 +00:00
}
2021-12-19 06:02:39 +00:00
2021-12-16 04:11:33 +00:00
}
func Start() {
2021-12-18 13:32:46 +00:00
if len(started) == 1 {
return
}
started <- struct{}{}
2021-12-16 04:11:33 +00:00
c.Start()
2021-12-18 13:32:46 +00:00
go func() {
for {
select {
case r := <-Reporter:
wg.Add(1)
ReportReceiver(r)
wg.Done()
case <-quit:
<-started
return
}
}
}()
2021-12-16 04:11:33 +00:00
}
2021-12-18 13:32:46 +00:00
func Stop() {
if len(started) == 0 {
return
}
quit <- struct{}{}
<-c.Stop().Done()
wg.Wait()
2021-12-16 04:11:33 +00:00
}