main.go
package main
import (
"ants-demo/mypool"
"fmt"
"log"
"runtime"
"sync/atomic"
"time"
)
func init() {
Ticke(func() {
log.Print("current go routine num: ", runtime.NumGoroutine())
}, 2*time.Second)
}
func main() {
var counter int64
p := mypool.NewPool(5, 5, 2)
for i := 0; i < 100; i++ {
x := i
p.Submit(func() {
log.Println("正在执行:", x)
time.Sleep(50 * time.Millisecond)
atomic.AddInt64(&counter, 1)
})
}
for p.Running() > 0 {
time.Sleep(500 * time.Millisecond)
}
fmt.Println(counter)
}
func Ticke(f func(), d time.Duration) {
go func() {
ticker := time.NewTicker(d)
for {
select {
case <-ticker.C:
go f()
}
}
}()
}
mypool/mypool.go
package mypool
import (
"fmt"
"log"
"sync/atomic"
"time"
)
type PoolInterface interface {
run(task func())
submit(task func(), timeout <-chan time.Time) error
Submit(task func())
SubmitWithTimeout(task func(), duration time.Duration)
addRunning(delta int)
Running() int
}
type Pool struct {
// 缓冲池
concurrency chan struct{}
// 任务队列
work chan func()
// 正在执行的任务数
running int32
}
func NewPool(concurrencyNum, queue, workers int) *Pool {
p := &Pool{
concurrency: make(chan struct{}, concurrencyNum),
work: make(chan func(), queue),
running: 0,
}
for i := 0; i < workers; i++ {
p.concurrency <- struct{}{}
go p.run(func() {})
}
return p
}
func (p *Pool) run(task func()) {
defer func() {
p.addRunning(-1)
if err := recover(); err != nil {
log.Println("task panic", err, string(debug.Stack()))
}
log.Println("run task defer 被执行")
log.Println("run task err")
<-p.concurrency
}()
task()
log.Println("run task middle 被执行")
for task := range p.work {
task()
p.addRunning(-1)
}
log.Println("run task end 被执行")
}
func (p *Pool) submit(task func(), timeout <-chan time.Time) error {
select {
case <-timeout:
return fmt.Errorf("submit task err")
case p.work <- task:
p.addRunning(1)
return nil
case p.concurrency <- struct{}{}:
go p.run(task)
return nil
}
}
func (p *Pool) Submit(task func()) {
p.submit(task, nil)
}
func (p *Pool) SubmitWithTimeout(task func(), duration time.Duration) {
p.submit(task, time.After(duration))
}
func (p *Pool) addRunning(delta int) {
atomic.AddInt32(&p.running, int32(delta))
}
func (p *Pool) Running() int {
return int(atomic.LoadInt32(&p.running))
}
在这个例子里,我们初始化线程池调用NewPool()方法和Submit方法都会调用run方法:
func (p *Pool) run(task func()) {
defer func() {
p.addRunning(-1)
if err := recover(); err != nil {
log.Println("task panic", err, string(debug.Stack()))
}
log.Println("run task defer 被执行")
log.Println("run task err")
<-p.concurrency
}()
task()
log.Println("run task middle 被执行")
for task := range p.work {
task()
p.addRunning(-1)
}
log.Println("run task end 被执行")
}
我用大量携程调用这个示例的时候控制台显示:
2024/04/15 11:15:29 正在执行: 6
2024/04/15 11:15:29 正在执行: 0
2024/04/15 11:15:29 正在执行: 1
2024/04/15 11:15:29 run task middle 被执行
2024/04/15 11:15:29 正在执行: 2
2024/04/15 11:15:29 run task middle 被执行
2024/04/15 11:15:29 正在执行: 3
2024/04/15 11:15:29 正在执行: 4
2024/04/15 11:15:29 run task middle 被执行
2024/04/15 11:15:29 正在执行: 5
2024/04/15 11:15:29 正在执行: 7
2024/04/15 11:15:29 run task middle 被执行
2024/04/15 11:15:29 正在执行: 8
2024/04/15 11:15:29 run task middle 被执行
2024/04/15 11:15:29 正在执行: 9
2024/04/15 11:15:29 正在执行: 10
2024/04/15 11:15:29 正在执行: 11
2024/04/15 11:15:29 正在执行: 12
2024/04/15 11:15:29 正在执行: 13
2024/04/15 11:15:29 正在执行: 14
2024/04/15 11:15:29 正在执行: 15
2024/04/15 11:15:29 正在执行: 16
2024/04/15 11:15:29 正在执行: 18
2024/04/15 11:15:29 正在执行: 19
2024/04/15 11:15:29 正在执行: 17
2024/04/15 11:15:29 正在执行: 20
2024/04/15 11:15:29 正在执行: 21
在这里就想看一下run()方法的执行过程,run()方法里面的打印语句除了log.Println("run task middle 被执行")
被执行另外两个都没执行。
除了手动在run方法里面加panic 才会执行defer里面的代码
我们前面初始化:
p:=mypool.NewPool(5,5,2)
我可以理解成 最多只能调用5次run方法,且五个run方法里面的
for task := range p.work {
task()
p.addRunning(-1)
}
for range chan 会阻塞协程,会一直接收submit方法传过来的task,所以导致不会执行后面的代码也不会执行defer func(),不知是否可以这样理解
只要不出panic的情况下,甚至可以吧run方法的defer部分删掉,是不是可以这样理解
登录后可查看更多问答,登录/注册