请稍等 ...
×

采纳答案成功!

向帮助你的同学说点啥吧!感谢那些助人为乐的人

关于示例代码的理解

main.go

package main

import (
	"ants-demo/mypool"
	"fmt"
	"log"
	"runtime"
	"sync/atomic"
	"time"
)

func init() {
	Ticke(func() {
		log.Print("current go routine num: ", runtime.NumGoroutine())
	}, 2*time.Second)

}
func main() {
	var counter int64
	p := mypool.NewPool(5, 5, 2)

	for i := 0; i < 100; i++ {
		x := i
		p.Submit(func() {
			log.Println("正在执行:", x)
			time.Sleep(50 * time.Millisecond)
			atomic.AddInt64(&counter, 1)
		})
	}

	for p.Running() > 0 {
		time.Sleep(500 * time.Millisecond)
	}
	fmt.Println(counter)
}

func Ticke(f func(), d time.Duration) {
	go func() {
		ticker := time.NewTicker(d)
		for {
			select {
			case <-ticker.C:
				go f()
			}
		}
	}()
}

mypool/mypool.go

package mypool

import (
	"fmt"
	"log"
	"sync/atomic"
	"time"
)

type PoolInterface interface {
	run(task func())
	submit(task func(), timeout <-chan time.Time) error
	Submit(task func())
	SubmitWithTimeout(task func(), duration time.Duration)
	addRunning(delta int)
	Running() int
}
type Pool struct {
	//	缓冲池
	concurrency chan struct{}
	//	任务队列
	work chan func()
	//	正在执行的任务数
	running int32
}

func NewPool(concurrencyNum, queue, workers int) *Pool {
	p := &Pool{
		concurrency: make(chan struct{}, concurrencyNum),
		work:        make(chan func(), queue),
		running:     0,
	}
	for i := 0; i < workers; i++ {
		p.concurrency <- struct{}{}
		go p.run(func() {})
	}

	return p
}
func (p *Pool) run(task func()) {

	defer func() {
		p.addRunning(-1)
		if err := recover(); err != nil {
			log.Println("task panic", err, string(debug.Stack()))
		}
		log.Println("run task defer 被执行")
		log.Println("run task err")
		<-p.concurrency

	}()
	task()
	log.Println("run task middle 被执行")

	for task := range p.work {
		task()
		p.addRunning(-1)
	}
	log.Println("run task end 被执行")

}
func (p *Pool) submit(task func(), timeout <-chan time.Time) error {
	select {
	case <-timeout:
		return fmt.Errorf("submit task err")
	case p.work <- task:
		p.addRunning(1)
		return nil
	case p.concurrency <- struct{}{}:
		go p.run(task)
		return nil
	}
}
func (p *Pool) Submit(task func()) {
	p.submit(task, nil)

}
func (p *Pool) SubmitWithTimeout(task func(), duration time.Duration) {
	p.submit(task, time.After(duration))
}
func (p *Pool) addRunning(delta int) {
	atomic.AddInt32(&p.running, int32(delta))
}
func (p *Pool) Running() int {
	return int(atomic.LoadInt32(&p.running))
}

在这个例子里,我们初始化线程池调用NewPool()方法和Submit方法都会调用run方法:

func (p *Pool) run(task func()) {

	defer func() {
		p.addRunning(-1)
		if err := recover(); err != nil {
			log.Println("task panic", err, string(debug.Stack()))
		}
		log.Println("run task defer 被执行")
		log.Println("run task err")
		<-p.concurrency

	}()
	task()
	log.Println("run task middle 被执行")

	for task := range p.work {
		task()
		p.addRunning(-1)
	}
	log.Println("run task end 被执行")

}

我用大量携程调用这个示例的时候控制台显示:

2024/04/15 11:15:29 正在执行: 6
2024/04/15 11:15:29 正在执行: 0
2024/04/15 11:15:29 正在执行: 1
2024/04/15 11:15:29 run task middle 被执行
2024/04/15 11:15:29 正在执行: 2
2024/04/15 11:15:29 run task middle 被执行
2024/04/15 11:15:29 正在执行: 3
2024/04/15 11:15:29 正在执行: 4
2024/04/15 11:15:29 run task middle 被执行
2024/04/15 11:15:29 正在执行: 5
2024/04/15 11:15:29 正在执行: 7
2024/04/15 11:15:29 run task middle 被执行
2024/04/15 11:15:29 正在执行: 8
2024/04/15 11:15:29 run task middle 被执行
2024/04/15 11:15:29 正在执行: 9
2024/04/15 11:15:29 正在执行: 10
2024/04/15 11:15:29 正在执行: 11
2024/04/15 11:15:29 正在执行: 12
2024/04/15 11:15:29 正在执行: 13
2024/04/15 11:15:29 正在执行: 14
2024/04/15 11:15:29 正在执行: 15
2024/04/15 11:15:29 正在执行: 16
2024/04/15 11:15:29 正在执行: 18
2024/04/15 11:15:29 正在执行: 19
2024/04/15 11:15:29 正在执行: 17
2024/04/15 11:15:29 正在执行: 20
2024/04/15 11:15:29 正在执行: 21

在这里就想看一下run()方法的执行过程,run()方法里面的打印语句除了log.Println("run task middle 被执行") 被执行另外两个都没执行。
除了手动在run方法里面加panic 才会执行defer里面的代码

我们前面初始化:
p:=mypool.NewPool(5,5,2)

我可以理解成 最多只能调用5次run方法,且五个run方法里面的

for task := range p.work {
	task()
	p.addRunning(-1)
}

for range chan 会阻塞协程,会一直接收submit方法传过来的task,所以导致不会执行后面的代码也不会执行defer func(),不知是否可以这样理解

只要不出panic的情况下,甚至可以吧run方法的defer部分删掉,是不是可以这样理解

正在回答 回答被采纳积分+3

1回答

少林码僧 2024-04-15 16:29:49

可以这样理解,run方法本质上是一个常驻协程,一直读取写入到p.work这个channel的task,直到p.work关闭,或者执行task过程中发生panic才会退出

0 回复 有任何疑惑可以回复我~
  • 提问者 南姿悍_ #1
    结构体内嵌的通道 需要手动关闭吗
    回复 有任何疑惑可以回复我~ 2024-04-15 20:54:19
  • 少林码僧 回复 提问者 南姿悍_ #2
    channel是否需要关闭以及在什么时机关闭完全是基于业务需要的,关闭的channel会有一些特性,比如不能再发送消息,会等待for range读取完再退出for range,以及从一个已经关闭的channel接收数据,如果缓冲区中为空,则返回一个零值。通常是基于这些特性来实现一些特定的逻辑。如果是为了垃圾回收去考虑是否关闭channel就没必要了,因为channel变量属于间接值部,当没有引用指向时会自动回收
    回复 有任何疑惑可以回复我~ 2024-04-19 00:18:42
问题已解决,确定采纳
还有疑问,暂不采纳
意见反馈 帮助中心 APP下载
官方微信