diff --git a/history/history.go b/history/history.go index d509f05..1a42ef3 100644 --- a/history/history.go +++ b/history/history.go @@ -84,6 +84,9 @@ func Start(r Receiver) { } func Stop() { + if len(started) == 0 { + return + } quit <- struct{}{} log.Debug().Str("mod", "history").Msg("stop received") wg.Wait() diff --git a/task/profile.go b/task/profile.go index cf2d5a2..44f93ec 100644 --- a/task/profile.go +++ b/task/profile.go @@ -1,12 +1,26 @@ package task import ( + "sync" "time" cron "github.com/robfig/cron/v3" ) var Reporter chan *Report +var started chan struct{} +var wg sync.WaitGroup +var quit chan struct{} + +func init() { + Reporter = make(chan *Report) + started = make(chan struct{}, 1) + quit = make(chan struct{}, 1) +} + +type Receiver func(*Report) + +var ReportReceiver Receiver = func(r *Report) {} type Report struct { // Entry diff --git a/task/service.go b/task/service.go index 4e9aafa..4f43cb8 100644 --- a/task/service.go +++ b/task/service.go @@ -18,6 +18,6 @@ func (srv Service) Del() { if c == nil { return } - <-Stop().Done() + Stop() } func (srv Service) Health() error { return nil } diff --git a/task/task.go b/task/task.go index 72fbb2e..5c9e6de 100644 --- a/task/task.go +++ b/task/task.go @@ -1,7 +1,6 @@ package task import ( - "context" "fmt" "time" @@ -16,7 +15,6 @@ var c *cron.Cron func init() { // viper.Set("task.runner", 5) - Reporter = make(chan *Report) } func Init() { @@ -62,13 +60,13 @@ func (t *_task) Run() { } defer func() { if err := recover(); err != nil { - log.Error().Str("error", fmt.Sprint(err)).Msg("") + l.Error().Str("error", fmt.Sprint(err)).Msg("") } else if t.once { c.Remove(t.id) } }() if err := t.Task.Func(t.carry, t.args...); err != nil { - log.Error().Err(err).Str("taskname", t.Task.Name).Msg("") + l.Error().Err(err).Str("taskname", t.Task.Name).Msg("") Reporter <- &Report{ Entry: t.id, Name: t.Task.Name, @@ -85,9 +83,31 @@ func (t *_task) Run() { } func Start() { + if len(started) == 1 { + return + } + started <- struct{}{} c.Start() + go func() { + for { + select { + case r := <-Reporter: + wg.Add(1) + ReportReceiver(r) + wg.Done() + case <-quit: + <-started + return + } + } + }() } -func Stop() context.Context { - return c.Stop() +func Stop() { + if len(started) == 0 { + return + } + quit <- struct{}{} + <-c.Stop().Done() + wg.Wait() } diff --git a/task/task_test.go b/task/task_test.go index 2ff4fd2..394cd27 100644 --- a/task/task_test.go +++ b/task/task_test.go @@ -82,7 +82,7 @@ func TestOverall(t *testing.T) { lg.Info().Msgf("%v", GetTasks()) Start() <-time.After(time.Second * 20) - <-Stop().Done() + Stop() } @@ -120,5 +120,5 @@ func TestReporter(t *testing.T) { }() <-time.After(time.Second * 20) - <-Stop().Done() + Stop() }