master
Evan Chen 2021-12-18 21:32:46 +08:00
parent 796a2f4d8c
commit e3b864e29d
5 changed files with 46 additions and 9 deletions

View File

@ -84,6 +84,9 @@ func Start(r Receiver) {
} }
func Stop() { func Stop() {
if len(started) == 0 {
return
}
quit <- struct{}{} quit <- struct{}{}
log.Debug().Str("mod", "history").Msg("stop received") log.Debug().Str("mod", "history").Msg("stop received")
wg.Wait() wg.Wait()

View File

@ -1,12 +1,26 @@
package task package task
import ( import (
"sync"
"time" "time"
cron "github.com/robfig/cron/v3" cron "github.com/robfig/cron/v3"
) )
var Reporter chan *Report var Reporter chan *Report
var started chan struct{}
var wg sync.WaitGroup
var quit chan struct{}
func init() {
Reporter = make(chan *Report)
started = make(chan struct{}, 1)
quit = make(chan struct{}, 1)
}
type Receiver func(*Report)
var ReportReceiver Receiver = func(r *Report) {}
type Report struct { type Report struct {
// Entry // Entry

View File

@ -18,6 +18,6 @@ func (srv Service) Del() {
if c == nil { if c == nil {
return return
} }
<-Stop().Done() Stop()
} }
func (srv Service) Health() error { return nil } func (srv Service) Health() error { return nil }

View File

@ -1,7 +1,6 @@
package task package task
import ( import (
"context"
"fmt" "fmt"
"time" "time"
@ -16,7 +15,6 @@ var c *cron.Cron
func init() { func init() {
// viper.Set("task.runner", 5) // viper.Set("task.runner", 5)
Reporter = make(chan *Report)
} }
func Init() { func Init() {
@ -62,13 +60,13 @@ func (t *_task) Run() {
} }
defer func() { defer func() {
if err := recover(); err != nil { if err := recover(); err != nil {
log.Error().Str("error", fmt.Sprint(err)).Msg("") l.Error().Str("error", fmt.Sprint(err)).Msg("")
} else if t.once { } else if t.once {
c.Remove(t.id) c.Remove(t.id)
} }
}() }()
if err := t.Task.Func(t.carry, t.args...); err != nil { if err := t.Task.Func(t.carry, t.args...); err != nil {
log.Error().Err(err).Str("taskname", t.Task.Name).Msg("") l.Error().Err(err).Str("taskname", t.Task.Name).Msg("")
Reporter <- &Report{ Reporter <- &Report{
Entry: t.id, Entry: t.id,
Name: t.Task.Name, Name: t.Task.Name,
@ -85,9 +83,31 @@ func (t *_task) Run() {
} }
func Start() { func Start() {
if len(started) == 1 {
return
}
started <- struct{}{}
c.Start() c.Start()
go func() {
for {
select {
case r := <-Reporter:
wg.Add(1)
ReportReceiver(r)
wg.Done()
case <-quit:
<-started
return
}
}
}()
} }
func Stop() context.Context { func Stop() {
return c.Stop() if len(started) == 0 {
return
}
quit <- struct{}{}
<-c.Stop().Done()
wg.Wait()
} }

View File

@ -82,7 +82,7 @@ func TestOverall(t *testing.T) {
lg.Info().Msgf("%v", GetTasks()) lg.Info().Msgf("%v", GetTasks())
Start() Start()
<-time.After(time.Second * 20) <-time.After(time.Second * 20)
<-Stop().Done() Stop()
} }
@ -120,5 +120,5 @@ func TestReporter(t *testing.T) {
}() }()
<-time.After(time.Second * 20) <-time.After(time.Second * 20)
<-Stop().Done() Stop()
} }