repeated_task.go 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111
  1. // Copyright 2021 Tencent Inc. All rights reserved.
  2. package task
  3. import (
  4. "runtime"
  5. "sync"
  6. "time"
  7. )
  8. // State RepeatedTask 状态类型
  9. type State int
  10. // State 可能枚举
  11. const (
  12. Init State = iota
  13. Running
  14. Stopped
  15. )
  16. // implRepeatedTask 定时重复任务
  17. // 将会在后台(另一个goroutine中)定时重复执行给定的任务
  18. //
  19. // 注意:
  20. // 1. 请不要在多个协程中操作同一个 implRepeatedTask 实例,它并不支持多协程并发
  21. // 2. 基于上一条,请不要在 handler 中操作本 implRepeatedTask 实例
  22. type implRepeatedTask struct {
  23. interval time.Duration
  24. handler func(time.Time)
  25. state State
  26. ticker *time.Ticker
  27. closed chan struct{}
  28. wg sync.WaitGroup
  29. }
  30. // State 查看当前任务状态
  31. func (t *implRepeatedTask) State() State {
  32. return t.state
  33. }
  34. // Interval 查看当前任务间隔
  35. func (t *implRepeatedTask) Interval() time.Duration {
  36. return t.interval
  37. }
  38. // Start 启动重复任务
  39. // 当且仅当任务并未启动时有效,其他状态下不会发生任何事
  40. func (t *implRepeatedTask) Start() {
  41. if t.state != Init {
  42. return
  43. }
  44. t.ticker = time.NewTicker(t.interval)
  45. t.wg.Add(1)
  46. go func() {
  47. defer t.wg.Done()
  48. t.run()
  49. }()
  50. t.state = Running
  51. }
  52. // Stop 停止重复任务
  53. // 当且仅当任务处于 Running 状态时有效,其他状态下不会发生任何事
  54. func (t *implRepeatedTask) Stop() {
  55. if t.state != Running {
  56. return
  57. }
  58. close(t.closed)
  59. t.wg.Wait()
  60. t.ticker.Stop()
  61. t.state = Stopped
  62. }
  63. func (t *implRepeatedTask) run() {
  64. for {
  65. select {
  66. case <-t.closed:
  67. return
  68. case tickTime := <-t.ticker.C:
  69. t.handler(tickTime)
  70. }
  71. }
  72. }
  73. type wrapper struct {
  74. *implRepeatedTask
  75. }
  76. // RepeatedTask 自动重复任务
  77. // 设定间隔时间与任务Handler即可自动按间隔执行,在你不再持有该实例的引用后,任务自动停止。
  78. // 也可以调用 Stop 方法来停止任务
  79. type RepeatedTask wrapper
  80. // NewRepeatedTask 初始化一个自动重复任务
  81. // 创建成功后请调用 Start 方法启动任务,启动后需等待 interval 时间发生第一次调用
  82. func NewRepeatedTask(interval time.Duration, handler func(time.Time)) *RepeatedTask {
  83. task := implRepeatedTask{
  84. closed: make(chan struct{}),
  85. interval: interval,
  86. handler: handler,
  87. }
  88. result := &RepeatedTask{&task}
  89. runtime.SetFinalizer(result, func(t *RepeatedTask) { t.Stop() })
  90. return result
  91. }