请稍等 ...
×

采纳答案成功!

向帮助你的同学说点啥吧!感谢那些助人为乐的人

关于队列实现调度器问题

老师好,调度器的处理这里用到了goroutine和channel的数据传输。所以一直有点蒙,在这里阐述下自己的理解。

先说下简单的调度器
1,要先开很多的worker,这时候就初次用到了goroutine
2,要往createWorker这个goroutine里传数据,就利用到了channel
由于实际工作者是worker,这时候定义了 chan request 和 chan parserResult
3,简单调度器的工作就是,
①把大量的request请求,发送到 chan request里
②之后goroutine里会接收chan request里的数据传达给worker工作,生成大量的parserResult 并把这些parserResult发送到 chan parserResult里。
③最后,外围有个地方会不停的接收来自chan parserResult的数据,来进行最后编译。

再说下队列实现调度器
1,要先开很多的worker,这时候就初次用到了goroutine
2,要往createWorker这个goroutine里传数据,就利用到了channel
由于实际工作者是worker,这时候定义了 chan request 和 chan parserResult(到这里为止是一样的)
3,队列调度器的工作是,
①把大量的request请求,发送到 chan request里
②chan request里的请求先不去分配工作,而是由队列 []Request进行接收
func (s *QueuedScheduler) WorkerChan() chan engine.Request { return make(chan engine.Request) }的意义不太理解
workerChan chan chan engine.Request存在意义不太理解
var workerQ []chan engine.Request  这个队列又是要做什么的
总体来说,有了chan request已经够了,为什么又多了workerChan 这里没太懂。是想把createrWorker也用队列来管理是吧?
⑥之后goroutine里会接收workerChan里的数据传达给worker工作,生成大量的parserResult 并把这些parserResult发送到 chan parserResult里。
⑦最后,外围有个地方会不停的接收来自chan parserResult的数据,来进行最后编译。

上述内容有点偏长,可能问题阐述的也不是很明确。希望老师能给讲解一下其中的原理。

正在回答

1回答

抱歉这个问题隔了很久。

首先这里的调度器总体上说做了一件事,它有一个chan request,又维护了很多worker,每次从chan requet里收到一个request,就发给一个worker。简单调度器和队列调度器都做这件事。

简单调度器直接用goroutine和channel的特性就做到了这一点,但是我们对整个系统各个环节控制较低,并发请求过多之后,系统到底应该是怎样的行为,是不确定的。所以又尝试了一个队列调度器。

为了阅读方便,具体代码在 https://git.imooc.com/coding-180/coding-180/src/master/crawler/scheduler/queued.go  

队列调度器同样维护一个

requestChan chan engine.Request

用于接收请求。为了维护很多worker,我们有

workerChan  chan chan engine.Request

这里有一个难点就是workerChan里的chan Request就是代表了worker。我们甚至可以写成

type worker chan engine.Request

workerChan chan worker

到这里,调度器做的就是从手里的两个channel,各取一个元素,那么就有了一个request和一个worker,把这个request发给这个worker,就解决了,然后不断重复。

但是各取一个元素这件事并不那么容易。channel的读取是会阻塞的。如果我先取request,取不到,那么在等待的时候我就没有办法取worker,这是不好的。所以要用select。用了select我们要保证很重要的一点:

select的每个case里面都要快速执行,不能有阻塞

这样我们所有的等待都发生在select上面,这是非常高效的,因为select可以等待多个信道,任何一个信道有了数据就会触发,不会发生数据送过来但来不及接收,或者有了数据来不及发送的情况。

所以我们在select里收到一个request,不能马上发给一个worker,而是把它存起来,存在哪里呢,队列里。所以我们有对应的

var requestQ []engine.Request

var workerQ []chan engine.Request

分别用来存储从reuqestChan和workerChan里接收到的东西(workerQ里的chan request就代表了worker)。

到这里,我们的select做同时做三件事,注意这三件事在“并”之后都非常快:

  1. 从requestChan收一个request,并存在requestQ

  2. 从workerChan收一个worker,并存在workerQ

  3. 把第一个requestQ里的request发给第一个workerQ里的worker,并把它们从队列里分别取出


最后还剩一个小问,

func (s *QueuedScheduler) WorkerChan() chan engine.Request {

    return make(chan engine.Request)

}

这是干什么的?我们看一下它的调用者,在https://git.imooc.com/coding-180/coding-180/src/master/crawler/engine/concurrent.go#L28 就是告诉worker,你会从这里接收发给你的request。这个返回的新的channel又回从WorkerReady https://git.imooc.com/coding-180/coding-180/src/master/crawler/scheduler/queued.go#L18  这个函数被送回来。这样不是很直观,但是是为了和SimpleScheduler兼容。使得SimpleScheduler和QueuedScheduler可以互换。这里我在16-5重构和总结中进行了详细的讲述。


3 回复 有任何疑惑可以回复我~
  • 提问者 nitros #1
    感谢老师百忙中的耐心回复。内容比较多我先消化一下
    回复 有任何疑惑可以回复我~ 2020-03-08 15:53:58
  • 提问者 nitros #2
    非常感谢!
    回复 有任何疑惑可以回复我~ 2020-03-09 08:13:28
  • hen_nam #3
    看完这个回答终于理解了。
    
    我觉得之所以难以理解的原因之一,可能是结构体字段的名称和类型相似,又不清楚它们之间的联系。例如:
    为什么 SimpleScheduler.workerChan 和 QueuedScheduler.workerChan 名称相同却类型不同?
    为什么 SimpleScheduler.workerChan 和 QueuedScheduler.requestChan 名称不同却类型相同?
    为什么 QueuedScheduler.requestChan 和 QueuedScheduler.workerChan 类型相似,它们存在什么联系?
    
    我觉得更容易理解的方式可能是,首先说明 Scheduler 的输入类型和输出类型都是 Request,然后将输入信道和输出信道分别命名为 in 和 out。
    
    type SimpleScheduler struct {
    	out chan engine.Request
    }
    type QueuedScheduler struct {
    	in chan engine.Request
    	outChan  chan chan engine.Request
    }
    回复 有任何疑惑可以回复我~ 2022-01-25 01:23:48
问题已解决,确定采纳
还有疑问,暂不采纳
意见反馈 帮助中心 APP下载
官方微信