前言

接上文go源码阅读-sync.Once继续来看sync包下的Pool。简而言之,sync.Pool是一组可以单独保存和检索的临时对象。

  • 存储在池中的任何项目都可能在任何时候被自动移除,且不会通知。如果发生这种情况时池持有唯一的引用,则该项目可能会被释放。
  • Pool是并发安全的,可以被多个协程同时使用
  • Pool的目的是缓存已分配但未使用的项目以供后续重用,减轻垃圾回收器的压力
  • 池的一个良好使用示例是在fmt包中,它维护了一个动态大小的临时输出缓冲区存储。该存储在负载下(当许多goroutine积极打印时)会扩展,并在空闲时缩小
  • sync.Once类似,sync.Pool不允许拷贝

使用

1. 声明对象池

如下代码调用sync.Pool时传入func()作为New, 在对象池中没有对象时将会调用New创建对象。

1
2
3
4
5
6
7
8
9
10
type Student struct {
Name string
Age int
}

var studentPool = sync.Pool{
New: func() interface{} {
return new(Student)
},
}
2. 获取对象

sync.Pool提供了Get()方法从池中获取对象,并将该对象从池中删除。
注意从池中获取到的数据类型是interface{},转换为对象时需要进行类型强制转换。
如果池中没有对象,则返回会调用p.New()且它不为nil,创建新对象。

1
2
3
4
func getStudent() *Student {
stu := studentPool.Get().(*Student)
return stu
}
3. 放置对象

sync.Pool提供了Put()方法传入对象,将其存入池中。

1
2
3
func putStudent(stu *Student) {
studentPool.Put(stu)
}

使用案例

最常见的sync.Pool的使用场景是fmt.Println, 其中就维护了这样的sync.Pool

1
2
3
func Println(a ...any) (n int, err error) {
return Fprintln(os.Stdout, a...)
}

Fprintln调用newPrinter()获取pp对象。

1
2
3
4
5
6
7
func Fprintln(w io.Writer, a ...any) (n int, err error) {
p := newPrinter()
p.doPrintln(a)
n, err = w.Write(p.buf)
p.free()
return
}
1
2
3
4
5
6
7
8
func newPrinter() *pp {
p := ppFree.Get().(*pp) // 从ppFree中获取pp对象
p.panicking = false
p.erroring = false
p.wrapErrs = false
p.fmt.init(&p.buf)
return p
}

print.go函数中可以看到它定义了这样的对象池,以保障在并发或者频繁调用fmt.print时的高效,无需多次创建pp实例,节省了内存。

1
2
3
var ppFree = sync.Pool{ 
New: func() any { return new(pp) }, // 创建这样的对象池
}

sync.Pool的实现原理

Pool结构

1
2
3
4
5
6
7
8
type Pool struct {
noCopy noCopy
local unsafe.Pointer // local fixed-size per-P pool, actual type is [P]poolLocal
localSize uintptr // size of the local array
victim unsafe.Pointer // local from previous cycle
victimSize uintptr // size of victims array
New func() any
}

Get方法详解

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
func (p *Pool) Get() any {
if race.Enabled {
race.Disable()
}
l, pid := p.pin() // 通过pin方法将当前线程固定到P上,禁止抢占,并返回P对应的poolLocal池以及P的ID。
x := l.private
l.private = nil
if x == nil {
// Try to pop the head of the local shard. We prefer
// the head over the tail for temporal locality of
// reuse.
x, _ = l.shared.popHead()
if x == nil {
x = p.getSlow(pid)
}
}
runtime_procUnpin()
if race.Enabled {
race.Enable()
if x != nil {
race.Acquire(poolRaceAddr(x))
}
}
if x == nil && p.New != nil {
x = p.New()
}
return x
}

2-3行,如果开启了竞态检测,则先关闭竞态检测
上述第5行代码通过调用p.pin()将当前协程固定在P上,同时禁止抢占。并返回P对应的poolLocal池,以及P对应的ID。此处不展开分析pin()
6-7行代码中, 先尝试从私有列表中获取对象。如果该私有列表为空则继续向下执行。
8-16行,如果私有列表为空,则尝试从poolLocal的共享队列中弹出队首对象。如果共享列表也为空,则调用getSlow()方法尝试获取。
17行代码,使用完poolLocal后,调用runtime_procUnpin()取消固定和禁用抢占,恢复goroutine的正常调度
18-23行代码,重新启用竞态检测。如果获取到的对象x不为空,则调用race.Acquire来标记这个对象已经被获取。
如果最终没有从池中获取到对象,并且Pool的New函数不为空,则调用New函数生成一个新的对象。

1
2
3
4
type poolLocalInternal struct {
private any // Can be used only by the respective P.
shared poolChain // Local P can pushHead/popHead; any P can popTail.
}

getSlow()方法的源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
func (p *Pool) getSlow(pid int) any {
size := runtime_LoadAcquintptr(&p.localSize) // load-acquire
locals := p.local // load-consume
// Try to steal one element from other procs.
for i := 0; i < int(size); i++ {
l := indexLocal(locals, (pid+i+1)%int(size))
if x, _ := l.shared.popTail(); x != nil {
return x
}
}
size = atomic.LoadUintptr(&p.victimSize)
if uintptr(pid) >= size {
return nil
}
locals = p.victim
l := indexLocal(locals, pid)
if x := l.private; x != nil {
l.private = nil
return x
}
for i := 0; i < int(size); i++ {
l := indexLocal(locals, (pid+i)%int(size))
if x, _ := l.shared.popTail(); x != nil {
return x
}
}

// Mark the victim cache as empty for future gets don't bother
// with it.
atomic.StoreUintptr(&p.victimSize, 0)

return nil
}

第2行中,通过调用runtime包中的LoadAcquintptr方法原子获取 p.localSize, 确保后续的读取不会发生在此加载之前。
第3行获取本地缓存数组。
第4-9行尝试从其他处理器的本地缓存中偷取对象:遍历本地缓存数组,尝试从每个缓存中弹出尾部对象。如果成功获取到对象,则返回该对象
第10-25行, 首先检查p.victimSize,如果当前处理器的ID大于victim cache的大小,则直接返回nil。
然后尝试从victim cache的私有列表中获取对象,如果成功,则返回该对象。 如果私有列表中没有对象,再尝试从victim cache的共享列表中偷取对象
最后,如果从victim cache中获取对象失败,则将p.victimSize设置为0,表示victim cache为空,这样未来的Get操作就不会再尝试从victim cache中获取对象

Put方法衔接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func (p *Pool) Put(x any) {
if x == nil {
return
}
if race.Enabled {
if fastrandn(4) == 0 {
// Randomly drop x on floor.
return
}
race.ReleaseMerge(poolRaceAddr(x))
race.Disable()
}
l, _ := p.pin()
if l.private == nil {
l.private = x
} else {
l.shared.pushHead(x)
}
runtime_procUnpin()
if race.Enabled {
race.Enable()
}
}

首先判断当前put的对象为空,则直接return
4-11行,判断当前是否处于竞态

参考

1. 深度分析 Golang sync.Pool 底层原理
2. Go sync.Pool