Golang并发模式之扇入FAN-IN和扇出FAN-OUT


在现实世界中,经常有一些工作是属于流水线类型的,它们每一个步骤都是紧密关联的,第一步先做什么,再做做么,最后做什么。特别是制造业这个行业,基本全是流水线生产车间。在我们开发中也经常遇到这类的业务场景。

假如我们有个流水线共分三个步骤,分别是 job1、job2和job3。代码:https://play.golang.org/p/e7ZlP9ofXB3

package main

import (
	"fmt"
	"time"
)

func job1(count int) <-chan int {
	outCh := make(chan int, 2)

	go func() {
		defer close(outCh)
		for i := 0; i < count; i++ {
			time.Sleep(time.Second)
			fmt.Println("job1 finish:", 1)
			outCh <- 1
		}
	}()

	return outCh
}

func job2(inCh <-chan int) <-chan int {
	outCh := make(chan int, 2)

	go func() {
		defer close(outCh)
		for val := range inCh {
			// 耗时2秒
			time.Sleep(time.Second * 2)
			val++
			fmt.Println("job2 finish:", val)
			outCh <- val
		}
	}()

	return outCh
}

func job3(inCh <-chan int) <-chan int {
	outCh := make(chan int, 2)

	go func() {
		defer close(outCh)
		for val := range inCh {
			val++
			fmt.Println("job3 finish:", val)
			outCh <- val
		}
	}()

	return outCh
}

func main() {
	t := time.Now()

	firstResult := job1(10)
	secondResult := job2(firstResult)
	thirdResult := job3(secondResult)

	for v := range thirdResult {
		fmt.Println(v)
	}

	fmt.Println("all finish")
	fmt.Println("duration:", time.Since(t).String())
}

输出结果为

job1 finish: 1
job1 finish: 1
job1 finish: 1
job2 finish: 2
job3 finish: 3
3
job1 finish: 1
job1 finish: 1
job2 finish: 2
job3 finish: 3
3
job1 finish: 1
job2 finish: 2
job3 finish: 3
3
job1 finish: 1
job2 finish: 2
job3 finish: 3
3
job1 finish: 1
job2 finish: 2
job3 finish: 3
3
job1 finish: 1
job2 finish: 2
job3 finish: 3
3
job1 finish: 1
job2 finish: 2
job3 finish: 3
3
job2 finish: 2
job3 finish: 3
3
job2 finish: 2
job3 finish: 3
3
job2 finish: 2
job3 finish: 3
3
all finish
duration: 21s

共计计算21秒。主要是因为job2中的耗时太久导致,现在我们的主要任务就是解决掉这个问题了。
这里只用了一个job2来处理job1的结果,如果我们能多开启几个goroutine job2并行处理会不会提升性能呢?

现在我们改进下代码,解决job2耗时的问题,需要注意一下,这里对ch的关闭也要作一下调整,由于启用了多个job2的goroutine,所以在job2内部进行关闭了。代码https://play.golang.org/p/qebQ00v973C

package main

import (
	"fmt"
	"sync"
	"time"
)

func job1(count int) <-chan int {
	outCh := make(chan int, 2)

	go func() {
		defer close(outCh)
		for i := 0; i < count; i++ {
			time.Sleep(time.Second)
			fmt.Println("job1 finish:", 1)
			outCh <- 1
		}
	}()

	return outCh
}

func job2(inCh <-chan int) <-chan int {
	outCh := make(chan int, 2)

	go func() {
		defer close(outCh)
		for val := range inCh {
			// 耗时2秒
			time.Sleep(time.Second * 2)
			val++
			fmt.Println("job2 finish:", val)
			outCh <- val
		}
	}()

	return outCh
}

func job3(inCh <-chan int) <-chan int {
	outCh := make(chan int, 2)

	go func() {
		defer close(outCh)
		for val := range inCh {
			val++
			fmt.Println("job3 finish:", val)
			outCh <- val
		}
	}()

	return outCh
}

func merge(inCh ...<-chan int) <-chan int {
	outCh := make(chan int, 2)

	var wg sync.WaitGroup
	for _, ch := range inCh {
		wg.Add(1)
		go func(wg *sync.WaitGroup, in <-chan int) {
			defer wg.Done()
			for val := range in {
				outCh <- val
			}
		}(&amp;wg, ch)
	}

	// 重要注意,wg.Wait() 一定要在goroutine里运行,否则会引起deadlock
	go func() {
		wg.Wait()
		close(outCh)
	}()

	return outCh
}

func main() {
	t := time.Now()

	firstResult := job1(10)

	// 拆分成三个job2,即3个goroutine (扇出)
	secondResult1 := job2(firstResult)
	secondResult2 := job2(firstResult)
	secondResult3 := job2(firstResult)

	// 合并结果(扇入)
	secondResult := merge(secondResult1, secondResult2, secondResult3)

	thirdResult := job3(secondResult)

	for v := range thirdResult {
		fmt.Println(v)
	}

	fmt.Println("all finish")
	fmt.Println("duration:", time.Since(t).String())
}

输出结果

job1 finish: 1
job1 finish: 1
job1 finish: 1
job2 finish: 2
job3 finish: 3
3
job1 finish: 1
job2 finish: 2
job3 finish: 3
3
job1 finish: 1
job2 finish: 2
job3 finish: 3
3
job1 finish: 1
job2 finish: 2
job3 finish: 3
3
job1 finish: 1
job2 finish: 2
job3 finish: 3
3
job1 finish: 1
job2 finish: 2
job3 finish: 3
3
job1 finish: 1
job2 finish: 2
job3 finish: 3
3
job1 finish: 1
job2 finish: 2
job3 finish: 3
3
job2 finish: 2
job3 finish: 3
3
job2 finish: 2
job3 finish: 3
3
all finish
duration: 12s

可以看到,性能提升了90%,由原来的22s减少到12s。上面代码中为了演示效果,使用的缓冲chan很小,如果调大的话,性能更明显。

FAN-OUT模式:多个goroutine从同一个通道读取数据,直到该通道关闭。OUT是一种张开的模式(1对多),所以又被称为扇出,可以用来分发任务。

FAN-IN模式:1个goroutine从多个通道读取数据,直到这些通道关闭。IN是一种收敛的模式(多对1),所以又被称为扇入,用来收集处理的结果。

是不是很像扇子的状态, 先展开(扇出)再全并(扇入)。

总结:在类似流水线这类的逻辑中,我们可以使用FAN-IN和FAN-OUT模式来提升程序性能。

参考