在现实世界中,经常有一些工作是属于流水线类型的,它们每一个步骤都是紧密关联的,第一步先做什么,再做做么,最后做什么。特别是制造业这个行业,基本全是流水线生产车间。在我们开发中也经常遇到这类的业务场景。
假如我们有个流水线共分三个步骤,分别是 job1、job2和job3。代码:https://play.golang.org/p/e7ZlP9ofXB3
package main import ( "fmt" "time" ) func job1(count int) <-chan int { outCh := make(chan int, 2) go func() { defer close(outCh) for i := 0; i < count; i++ { time.Sleep(time.Second) fmt.Println("job1 finish:", 1) outCh <- 1 } }() return outCh } func job2(inCh <-chan int) <-chan int { outCh := make(chan int, 2) go func() { defer close(outCh) for val := range inCh { // 耗时2秒 time.Sleep(time.Second * 2) val++ fmt.Println("job2 finish:", val) outCh <- val } }() return outCh } func job3(inCh <-chan int) <-chan int { outCh := make(chan int, 2) go func() { defer close(outCh) for val := range inCh { val++ fmt.Println("job3 finish:", val) outCh <- val } }() return outCh } func main() { t := time.Now() firstResult := job1(10) secondResult := job2(firstResult) thirdResult := job3(secondResult) for v := range thirdResult { fmt.Println(v) } fmt.Println("all finish") fmt.Println("duration:", time.Since(t).String()) }
输出结果为
job1 finish: 1 job1 finish: 1 job1 finish: 1 job2 finish: 2 job3 finish: 3 3 job1 finish: 1 job1 finish: 1 job2 finish: 2 job3 finish: 3 3 job1 finish: 1 job2 finish: 2 job3 finish: 3 3 job1 finish: 1 job2 finish: 2 job3 finish: 3 3 job1 finish: 1 job2 finish: 2 job3 finish: 3 3 job1 finish: 1 job2 finish: 2 job3 finish: 3 3 job1 finish: 1 job2 finish: 2 job3 finish: 3 3 job2 finish: 2 job3 finish: 3 3 job2 finish: 2 job3 finish: 3 3 job2 finish: 2 job3 finish: 3 3 all finish duration: 21s
共计计算21秒。主要是因为job2中的耗时太久导致,现在我们的主要任务就是解决掉这个问题了。
这里只用了一个job2来处理job1的结果,如果我们能多开启几个goroutine job2并行处理会不会提升性能呢?
现在我们改进下代码,解决job2耗时的问题,需要注意一下,这里对ch的关闭也要作一下调整,由于启用了多个job2的goroutine,所以在job2内部进行关闭了。代码https://play.golang.org/p/qebQ00v973C
package main import ( "fmt" "sync" "time" ) func job1(count int) <-chan int { outCh := make(chan int, 2) go func() { defer close(outCh) for i := 0; i < count; i++ { time.Sleep(time.Second) fmt.Println("job1 finish:", 1) outCh <- 1 } }() return outCh } func job2(inCh <-chan int) <-chan int { outCh := make(chan int, 2) go func() { defer close(outCh) for val := range inCh { // 耗时2秒 time.Sleep(time.Second * 2) val++ fmt.Println("job2 finish:", val) outCh <- val } }() return outCh } func job3(inCh <-chan int) <-chan int { outCh := make(chan int, 2) go func() { defer close(outCh) for val := range inCh { val++ fmt.Println("job3 finish:", val) outCh <- val } }() return outCh } func merge(inCh ...<-chan int) <-chan int { outCh := make(chan int, 2) var wg sync.WaitGroup for _, ch := range inCh { wg.Add(1) go func(wg *sync.WaitGroup, in <-chan int) { defer wg.Done() for val := range in { outCh <- val } }(&wg, ch) } // 重要注意,wg.Wait() 一定要在goroutine里运行,否则会引起deadlock go func() { wg.Wait() close(outCh) }() return outCh } func main() { t := time.Now() firstResult := job1(10) // 拆分成三个job2,即3个goroutine (扇出) secondResult1 := job2(firstResult) secondResult2 := job2(firstResult) secondResult3 := job2(firstResult) // 合并结果(扇入) secondResult := merge(secondResult1, secondResult2, secondResult3) thirdResult := job3(secondResult) for v := range thirdResult { fmt.Println(v) } fmt.Println("all finish") fmt.Println("duration:", time.Since(t).String()) }
输出结果
job1 finish: 1 job1 finish: 1 job1 finish: 1 job2 finish: 2 job3 finish: 3 3 job1 finish: 1 job2 finish: 2 job3 finish: 3 3 job1 finish: 1 job2 finish: 2 job3 finish: 3 3 job1 finish: 1 job2 finish: 2 job3 finish: 3 3 job1 finish: 1 job2 finish: 2 job3 finish: 3 3 job1 finish: 1 job2 finish: 2 job3 finish: 3 3 job1 finish: 1 job2 finish: 2 job3 finish: 3 3 job1 finish: 1 job2 finish: 2 job3 finish: 3 3 job2 finish: 2 job3 finish: 3 3 job2 finish: 2 job3 finish: 3 3 all finish duration: 12s
可以看到,性能提升了90%,由原来的22s减少到12s。上面代码中为了演示效果,使用的缓冲chan很小,如果调大的话,性能更明显。
FAN-OUT模式:多个goroutine从同一个通道读取数据,直到该通道关闭。OUT是一种张开的模式(1对多),所以又被称为扇出,可以用来分发任务。
FAN-IN模式:1个goroutine从多个通道读取数据,直到这些通道关闭。IN是一种收敛的模式(多对1),所以又被称为扇入,用来收集处理的结果。
是不是很像扇子的状态, 先展开(扇出)再全并(扇入)。
总结:在类似流水线这类的逻辑中,我们可以使用FAN-IN和FAN-OUT模式来提升程序性能。