Golang并发模式之扇入FAN-IN和扇出FAN-OUT


在现实世界中,经常有一些工作是属于流水线类型的,它们每一个步骤都是紧密关联的,第一步先做什么,再做做么,最后做什么。特别是制造业这个行业,基本全是流水线生产车间。在我们开发中也经常遇到这类的业务场景。

假如我们有个流水线共分三个步骤,分别是 job1、job2和job3。代码:https://play.golang.org/p/e7ZlP9ofXB3

package main

import (
	"fmt"
	"time"
)

func job1(count int) <-chan int {
	outCh := make(chan int, 2)

	go func() {
		defer close(outCh)
		for i := 0; i < count; i++ {
			time.Sleep(time.Second)
			fmt.Println("job1 finish:", 1)
			outCh <- 1
		}
	}()

	return outCh
}

func job2(inCh <-chan int) <-chan int {
	outCh := make(chan int, 2)

	go func() {
		defer close(outCh)
		for val := range inCh {
			// 耗时2秒
			time.Sleep(time.Second * 2)
			val++
			fmt.Println("job2 finish:", val)
			outCh <- val
		}
	}()

	return outCh
}

func job3(inCh <-chan int) <-chan int {
	outCh := make(chan int, 2)

	go func() {
		defer close(outCh)
		for val := range inCh {
			val++
			fmt.Println("job3 finish:", val)
			outCh <- val
		}
	}()

	return outCh
}

func main() {
	t := time.Now()

	firstResult := job1(10)
	secondResult := job2(firstResult)
	thirdResult := job3(secondResult)

	for v := range thirdResult {
		fmt.Println(v)
	}

	fmt.Println("all finish")
	fmt.Println("duration:", time.Since(t).String())
}

输出结果为

job1 finish: 1
job1 finish: 1
job1 finish: 1
job2 finish: 2
job3 finish: 3
3
job1 finish: 1
job1 finish: 1
job2 finish: 2
job3 finish: 3
3
job1 finish: 1
job2 finish: 2
job3 finish: 3
3
job1 finish: 1
job2 finish: 2
job3 finish: 3
3
job1 finish: 1
job2 finish: 2
job3 finish: 3
3
job1 finish: 1
job2 finish: 2
job3 finish: 3
3
job1 finish: 1
job2 finish: 2
job3 finish: 3
3
job2 finish: 2
job3 finish: 3
3
job2 finish: 2
job3 finish: 3
3
job2 finish: 2
job3 finish: 3
3
all finish
duration: 21s

共计计算21秒。主要是因为job2中的耗时太久导致,现在我们的主要任务就是解决掉这个问题了。
这里只用了一个job2来处理job1的结果,如果我们能多开启几个goroutine job2并行处理会不会提升性能呢?

现在我们改进下代码,解决job2耗时的问题,需要注意一下,这里对ch的关闭也要作一下调整,由于启用了多个job2的goroutine,所以在job2内部进行关闭了。代码https://play.golang.org/p/qebQ00v973C

package main

import (
	"fmt"
	"sync"
	"time"
)

func job1(count int) <-chan int {
	outCh := make(chan int, 2)

	go func() {
		defer close(outCh)
		for i := 0; i < count; i++ {
			time.Sleep(time.Second)
			fmt.Println("job1 finish:", 1)
			outCh <- 1
		}
	}()

	return outCh
}

func job2(inCh <-chan int) <-chan int {
	outCh := make(chan int, 2)

	go func() {
		defer close(outCh)
		for val := range inCh {
			// 耗时2秒
			time.Sleep(time.Second * 2)
			val++
			fmt.Println("job2 finish:", val)
			outCh <- val
		}
	}()

	return outCh
}

func job3(inCh <-chan int) <-chan int {
	outCh := make(chan int, 2)

	go func() {
		defer close(outCh)
		for val := range inCh {
			val++
			fmt.Println("job3 finish:", val)
			outCh <- val
		}
	}()

	return outCh
}

func merge(inCh ...<-chan int) <-chan int {
	outCh := make(chan int, 2)

	var wg sync.WaitGroup
	for _, ch := range inCh {
		wg.Add(1)
		go func(wg *sync.WaitGroup, in <-chan int) {
			defer wg.Done()
			for val := range in {
				outCh <- val
			}
		}(&amp;wg, ch)
	}

	// 重要注意,wg.Wait() 一定要在goroutine里运行,否则会引起deadlock
	go func() {
		wg.Wait()
		close(outCh)
	}()

	return outCh
}

func main() {
	t := time.Now()

	firstResult := job1(10)

	// 拆分成三个job2,即3个goroutine (扇出)
	secondResult1 := job2(firstResult)
	secondResult2 := job2(firstResult)
	secondResult3 := job2(firstResult)

	// 合并结果(扇入)
	secondResult := merge(secondResult1, secondResult2, secondResult3)

	thirdResult := job3(secondResult)

	for v := range thirdResult {
		fmt.Println(v)
	}

	fmt.Println("all finish")
	fmt.Println("duration:", time.Since(t).String())
}

输出结果

job1 finish: 1
job1 finish: 1
job1 finish: 1
job2 finish: 2
job3 finish: 3
3
job1 finish: 1
job2 finish: 2
job3 finish: 3
3
job1 finish: 1
job2 finish: 2
job3 finish: 3
3
job1 finish: 1
job2 finish: 2
job3 finish: 3
3
job1 finish: 1
job2 finish: 2
job3 finish: 3
3
job1 finish: 1
job2 finish: 2
job3 finish: 3
3
job1 finish: 1
job2 finish: 2
job3 finish: 3
3
job1 finish: 1
job2 finish: 2
job3 finish: 3
3
job2 finish: 2
job3 finish: 3
3
job2 finish: 2
job3 finish: 3
3
all finish
duration: 12s

可以看到,性能提升了90%,由原来的22s减少到12s。上面代码中为了演示效果,使用的缓冲chan很小,如果调大的话,性能更明显。

FAN-OUT模式:多个goroutine从同一个通道读取数据,直到该通道关闭。OUT是一种张开的模式(1对多),所以又被称为扇出,可以用来分发任务。

FAN-IN模式:1个goroutine从多个通道读取数据,直到这些通道关闭。IN是一种收敛的模式(多对1),所以又被称为扇入,用来收集处理的结果。

是不是很像扇子的状态, 先展开(扇出)再全并(扇入)。

总结:在类似流水线这类的逻辑中,我们可以使用FAN-IN和FAN-OUT模式来提升程序性能。

参考

重新认识Golang中的空结构体

  1. 认识空结构体
  2. 低层实现原理
  3. 空结构体之内存对齐
  4. 应用场景

在golang中,如果我们想实现一个set集合的话,一般会使用map来实现,其中将set的值作为map的键,对于map的值一般使用一个空结构体来实现,当然对map值也可以使用一个bool类型或者数字类型等,只要符合一个键值对应关系即可。但我们一般推荐使用struct{}来实现,为什么呢?

package main

import "fmt"

func main() {
	m := make(map[int]struct{})
	m[1] = struct{}{}
	m[2] = struct{}{}
	
	if _, ok := m[1]; ok {
		fmt.Println("exists")
	}
	
}

上面这段代码是一个很简单的使用map实现的set功能,这里是采用空结构体struct{}来实现。

在分析为什么使用struct{}以前,我看先认识一个struct。

认识空结构体 struct{}

我们先看一个这段代码

package main

import (
	"fmt"
	"unsafe"
)

type emptyStruct struct{}

func main() {
	a := struct{}{}
	b := struct{}{}

	c := emptyStruct{}

	fmt.Println(a)
	fmt.Printf("%pn", &amp;a)
	fmt.Printf("%pn", &amp;b)
	fmt.Printf("%pn", &amp;c)

	fmt.Println(a == b)

	fmt.Println(unsafe.Sizeof(a))
}


{} // 值
0x586a00 // a 内存地址
0x586a00 // b 内存地址, 同a一样
0x586a00 // c 内存地址,别名类型变量,同a一样
true // 丙个结构体是否相等,很显示,上面打印的是同一个内存地址
0 // 占用内存大小 

从打印结果里我们可以得出以下结论
1. 空结构体是一个无内容值的值,即空值
2. 空结构体占用0大小的内存,即不分配内存
3. 凡是空结构体他们都是一样的,即底层指向的是同一个内存地址,如何实现的呢?

对于高性能并发应用来说,内存占用大小一般都是我们关注重点对象,使用空struct{}根本不占用内存大小,相比使用其它类型的值,如bool(占用两个字节)int64(8个字节)性能要好的多,毕竟不用分配和回收内存了。

当然有人可能会说开发的应用map值不多的话,这点内存可以忽略不计。是的,确实是这样的,但这会带来一个另一个语义理解问题。如:

package main

import "fmt"

func main() {
	m := make(map[int]bool)
	m[1] = true
	m[2] = true

	if _, ok := m[1]; ok {
		fmt.Println("exists")
	}

}

我们用bool代替了空结构体,至于值是 true 还是 false 是没有任何影响的,都是bool数据类型占用的内存大小也一样。那么如果另一位开发的同事查看review源码的时候,如果这个map出现在一个大型应用的时候,会大多处出现,就很有可能带来疑惑,对于值所表达的意图就有所担心怀疑,提高了理解代码的门槛。心里如果值为true 的话,会执行一个逻辑,为false的话会执行另一个逻辑。而相比使用一个空结构体strcut{}来理解起来容易提高心智,别人一看空结构体struct{}就知道要表达的意思是不需要关心值是什么,只需要关心键值即可。

要记住一点,我们写的代码虽然是让机器运行的,但却是让人看的,能让人一眼看明白就不要看两眼,这点不正是符合了golang 这门开发语言的特性吗,只需要记住25个关键字,简单易理解,性能还高效。

底层原理

那么在底层一个空结构体又是怎么一回事呢? 为什么多个空结构体会指向同一个内存地址呢? 这和一个很重要的 zerobase 变量有关(在runtime里多次使用到了这个变量),而zerobase 变量是一个 uintptr 的全局变量,占用8个字节 (https://github.com/golang/go/blob/master/src/runtime/malloc.go#L840-L841),只要你将struct{} 赋值给一个或者多个变量,它都返回这个 zerobase 的地址,这点我们上面已经证实过这一点了。

在golang中大量的地方使用到了这个 zerobase 变量,只要分配的内存为0,就返回这个变量地址。

内存分配函数 https://github.com/golang/go/blob/master/src/runtime/malloc.go#L902-L1171

// Allocate an object of size bytes.
// Small objects are allocated from the per-P cache's free lists.
// Large objects (> 32 kB) are allocated straight from the heap.
func mallocgc(size uintptr, typ *_type, needzero bool) unsafe.Pointer {
	if gcphase == _GCmarktermination {
		throw("mallocgc called with gcphase == _GCmarktermination")
	}

	if size == 0 {
		return unsafe.Pointer(&amp;zerobase)
	}
       
        .......
}

结论:只要分配的内存大小为 0 字节,就返回 &zerobase 的指针,不仅仅是空结构体。

内存对齐

如果您对内存对齐不太了解的话,可以参考这篇Memory Layouts,注意下32位和64位系统之间的差异。有一点需要保证在 32 位系统上想要原子操作 64 位字(如 uint64)的话,需要由调用方保证其数据地址是 64 位对齐的,否则原子访问会有异常。

还拿uint64来说,大小为 8bytes,32 位系统上按 4bytes 对齐,64 位系统上按 8bytes 对齐。

在 64 位系统上,8bytes 刚好和其字长相同,所以可以一次完成原子的访问,不被其他操作影响或打断。

而 32 位系统,4byte 对齐,字长也为 4bytes,可能出现uint64的数据分布在两个数据块中,需要两次操作才能完成访问。

如果两次操作中间有可能别其他操作修改,不能保证原子性。

这样的访问方式也是不安全的。

这一点issue-6404[5]中也有提到:

This is because the int64 is not aligned following the bool. It is 32-bit aligned but not 64-bit aligned, because we’re on a 32-bit system so it’s really just two 32-bit values side by side.

For the numeric types, the following sizes are guaranteed:

type                                 size in bytes

byte, uint8, int8                     1
uint16, int16                         2
uint32, int32, float32                4
uint64, int64, float64, complex64     8
complex128                           16

空结构体作为一个占用0字节的数据类型与其它基本类型对比的话,确实有些特殊。

代码 https://play.golang.org/p/HDOXko5iLNO

package main

import (
	"fmt"
	"unsafe"
)

// 放在第1个字段,共需要24字节
type T0 struct {
	s  struct{} // 0
	f2 int32    // 4
	f3 int32    // 4
	f1 bool     // 1
	f4 int64    // 8
}

// 放在第1个字段,共需要16字节
// f2为 int16 类型
type T1 struct {
	s  struct{} // 0
	f1 bool     // 1
	f2 int16    // 2
	f3 int32    // 4
	f4 int64    // 8
}

// 中间字段,共需要16字节
type T2 struct {
	f1 bool     // 1
	s  struct{} // 0
	f2 int16    // 2
	f3 int32    // 4
	f4 int64    // 8
}

// 最后一个字段, 共需要24字节
type T3 struct {
	f1 bool     // 1
	f2 int16    // 2
	f3 int32    // 4
	f4 int64    // 8
	s  struct{} // 0
}

// 最后一个字段, 共需要24字节
type T4 struct {
	f1 bool     // 1
	f2 int16    // 2
	f4 int64    // 8
	f3 int32    // 4
	s  struct{} // 0
}

// 最后一个字段, 共需要24字节
// 这里f1数据类型由bool变成了int16
type T5 struct {
	f4 int64    // 8
	f3 int32    // 4
	f1 int16    // 2
	f2 int16    // 2
	s  struct{} // 0
}

func main() {
	bit := 32 << (^uint(0) >> 63)
	fmt.Printf("当前系统为 %d 位n", bit)

	var t0 T0
	var t1 T1
	var t2 T2
	var t3 T3
	var t4 T4
	var t5 T5

	fmt.Println("t0:", unsafe.Sizeof(t0)) // (0+4)+(4)+ (8) 共占用24字节对齐
	fmt.Println("t1:", unsafe.Sizeof(t1)) // (0+1+2+4)+ (8) 共占用8字节对齐

	fmt.Println("t2:", unsafe.Sizeof(t2)) // (1+0+2+4)+ (8) 共占用8字节对齐

	fmt.Println("t3:", unsafe.Sizeof(t3)) // (1+2+4)+(8)+(0)共占用24字节
	fmt.Println("t4:", unsafe.Sizeof(t4)) // (1+2)+(8)+(4+0)共占用24字节
	fmt.Println("t5:", unsafe.Sizeof(t5)) // (8)+(4+2+2)+ (0) 共占用24字节
}


结果

当前系统为 64 位
t0: 24
t1: 16
t2: 16
t3: 24
t4: 24
t5: 24

这里运行环境为64位系统,所以按8字节对齐。

T0T1是将struct{}放在首字段的情况;
T2是放在中间位置的情况;
T3T4T5是作为最后一个字段的情况;

当作为最后一个字段的时候,如果它前面的字段放在一起的话等于8的话,则空结构体单独占用8字节(T5),否则可以与前面的共占8字节(T4)。

总结:上面结果我们可以看出,虽然空结构体占用0字节,但在进行内存对齐的时候是需要考虑这个字段,可以理解为将其视为需占用1个字节,不然这个结构体就没有任何意义了 。

对于Golang为什么要内存对齐, 有没有必要使用,可以看看这篇文章 Golang 是否有必要内存对齐?

空结构体struct{}使用场景

一般我们用在用户不关注值内容的情况下,只是作为一个信号或一个占位符来使用。

  1. 基于map实现集合功能
    就是我们上面提到的情况
  2. 与channel组合使用,实现一个信号
package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	conLimit := make(chan struct{}, 2)

	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		conLimit <- struct{}{}
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			// doing...

			fmt.Println(i)
			time.Sleep(time.Second)
			<-conLimit
		}(i)
	}

	wg.Wait()
	close(conLimit)

	fmt.Println("ok")

}

基于有缓冲的channel是实现并发限速。另外一种限速用法

下面的例子来自《Go 语言高级编程》, 为里使用空结构体进行了调整

var limit = make(chan struct{}, 3)

func main() {
    // …………
    for _, w := range work {
        go func() {
            limit <- struct{}{}
            w()
            <-limit
        }()
    }
    // …………
}

那么这两种写法是否有差异呢?

这里,limit <- 1 放在 func 内部而不是外部,原因是:

如果在外层,就是控制系统 goroutine 的数量,可能会阻塞 for 循环,影响业务逻辑。

limit 其实和逻辑无关,只是性能调优,放在内层和外层的语义不太一样。

还有一点要注意的是,如果 w() 发生 panic,那“许可证”可能就还不回去了,因此需要使用 defer 来保证。

参考

https://golang.org/ref/spec#Size_and_alignment_guarantees

Memory Layouts

Dig101:Go之聊聊struct的内存对齐

Golang 是否有必要内存对齐?

认识Golang内存对齐

https://github.com/qcrao/Go-Questions/blob/master/channel/channel%20%E6%9C%89%E5%93%AA%E4%BA%9B%E5%BA%94%E7%94%A8.md

Golang中的内存重排(Memory Reordering)

什么是内存重排

内存重排指的是内存的读/写指令重排。— Xargin

为什么要内存重排

为了提升程序执行效率,减少一些IO操作,一些硬件或者编译器会对程序进行一些指令优化,优化后的结果可能会导致程序编码时的顺序与代码编译后的先后顺序不一致。

就拿做饭场景来说吧,是先蒸米还是先炒菜,这两者是没有冲突的,编译器在编译时有可能与你要求的顺序不一样。

编译器重排

如下面这段代码

X = 0
for i in range(100):
    X = 1
    print X

要实现打印100次1,很显示在for里面每次都执行X=1语句有些浪费资源,如果将初始变量值修改为1,是不是要快的多。编译器也分析到了这一点,于是在编译时对代码做了以下优化

X = 1
for i in range(100):
    print X

最终输出结果是一样的,两段代码功能也一样。

但是如果此时有另一个线程里执行了一个 X=0 的赋值语句的话(两个线程同时运行),那么输出结果就可能与我们想要的不一样了。

优化前情况:第一个线程执行到了第3次print X 后,第二个线程执行了X=0,把X 的值进行了修改,结果就有可能是1110 1111(在线程执行第5次时,重新执行了X=1,而且之后一直都是 1)。这就有问题了。

优化后情况:按上面的逻辑来执行的话,结果就是11100000…, 后面全是0,再也没有机会将X贬值为1了

由此我们可以得出一个结果,在多线程下,是无法保证编译前后的代码功能是“待价”的。

所以要开发时,这一点我们一定要小心。搞不好在单线程运行没有问题的程序在多线程下会出现各种各样的问题。

每当我们提到内存重排的时候,其实真实在讨论的主题是 内存屏障Memory barrier。指令重排无法逾越内存屏障。

CPU 重排

参考:https://www.w3xue.com/exp/article/20196/40758.html

CPU 架构

Figure 1. CPU Architecture
Figure 2. Store Buffer
Figure 3. MESI Protocol

MESI 可参考 https://mp.weixin.qq.com/s/vnm9yztpfYA4w-IM6XqyIA

参考

从 Memory Reordering 说起
CPU指令重排/内存乱序
谈谈Go语言中的内存重排

相关话题

锁、内存屏障与缓存一致性 https://gocode.cc/project/9/article/128

memory barrier

[译] 什么是缓存 false sharing 以及如何解决(Golang 示例) https://juejin.cn/post/6844903866270482445

CPU缓存体系对Go程序的影响 https://mp.weixin.qq.com/s/vnm9yztpfYA4w-IM6XqyIA