缓存淘汰算法

5.2 缓存淘汰算法

本书讨论的缓存,是指存放在内存的。那么容量一般是有限的,因为内存是有限的。因此,当缓存容量超过一定限制时,应该移除一条或多条数据。那应该移除谁呢?答案是尽可能移除“无用”数据。那怎么判断数据是否“无用”?这就涉及到缓存淘汰算法。

常用的缓存淘汰算法有:

  • FIFO(先进先出)
  • LFU(最少使用)
  • LRU(最近最少使用)

本节讲解具体算法实现时,我们不考虑并发(即单 goroutine 读写)和 GC 问题,所以缓存数据通过 Go 中的 map 来存储。

5.2.1 初始化项目

开始之前,我们需要为进程内缓存项目做初始化,执行下述命令(若为 Windows 系统,可根据实际情况自行调整项目的路径):

$ mkdir -p $HOME/go-programming-tour-book/cache
$ cd $HOME/go-programming-tour-book/cache
$ go mod init github.com/go-programming-tour-book/cache

5.2.2 缓存接口

我们为缓存系统提供一个接口。在我们的 cache 项目根目录创建一个 cache.go 文件,定义一个 Cache 接口,代码如下:

package cache

// Cache 缓存接口
type Cache interface {
	Set(key string, value interface{})
	Get(key string) interface{}
	Del(key string)
	DelOldest()
	Len() int
}
  • 设置/添加一个缓存,如果 key 存在,用新值覆盖旧值;
  • 通过 key 获取一个缓存值;
  • 通过 key 删除一个缓存值;
  • 删除最“无用”的一个缓存值;
  • 获取缓存已存在的记录数;

接下来三种算法都会实现该接口。其中 FIFO 和 LRU 的数据结构设计参考了 Go 语言的 groupcache 库。

5.2.3 FIFO(First In First Out)

FIFO,先进先出,也就是淘汰缓存中最早添加的记录。在 FIFO Cache 设计中,核心原则就是:如果一个数据最先进入缓存,那么也应该最早淘汰掉。这么认为的根据是,最早添加的记录,其不再被使用的可能性比刚添加的可能性大。

这种算法的实现非常简单,创建一个队列(一般通过双向链表实现),新增记录添加到队尾,缓存满了,淘汰队首。

5.2.2.1 FIFO 算法实现

在 cache 项目根目录创建一个 fifo 子目录,并创建一个 fifo.go 文件,存放 FIFO 算法的实现。

mdkir -p fifo
cd filo
touch filo.go

1、核心数据结构

// fifo 是一个 FIFO cache。它不是并发安全的。
type fifo struct {
	// 缓存最大的容量,单位字节;
	// groupcache 使用的是最大存放 entry 个数
	maxBytes int
	// 当一个 entry 从缓存中移除是调用该回调函数,默认为 nil
	// groupcache 中的 key 是任意的可比较类型;value 是 interface{}
	onEvicted func(key string, value interface{})

	// 已使用的字节数,只包括值,key 不算
	usedBytes int

	ll    *list.List
	cache map[string]*list.Element
}

该结构体中,核心的两个数据结构是 *list.Listmap[string]*list.Element,list.List 是标准库 container/list 提供的,map 的键是字符串,值是双向链表中对应节点的指针。

该结构如图所示:

  • map 用来存储键值对。这是实现缓存最简单直接的数据结构。因为它的查找和增加时间复杂度都是 O(1)。
  • list.List 是 Go 标准库提供的双向链表。通过这个数据结构存放具体的值,可以做到移动记录到队尾的时间复杂度是 O(1),在在队尾增加记录时间复杂度也是 O(1),同时删除一条记录时间复杂度同样是 O(1)。

map 中存的值是 list.Element 的指针,而 Element 中有一个 Value 字段,是 interface{},也就是可以存放任意类型。当淘汰节点时,需要通过 key 从 map 中删除,因此,存入 Element 中的值是如下类型的指针:

type entry struct {
	key   string
	value interface{}
}

func (e *entry) Len() int {
	return cache.CalcLen(e.value)
}

groupcache 容量控制的是记录数,为了更精确或学习目的,我们采用了内存控制,但记录中只算 value 占用的内存,不考虑 key 的内存占用。CalcLen 函数就是计算内存用的:

func CalcLen(value interface{}) int {
	var n int
	switch v := value.(type) {
	case cache.Value:
		n = v.Len()
	case string:
		if runtime.GOARCH == "amd64" {
			n = 16 + len(v)
		} else {
			n = 8 + len(v)
		}
	case bool, uint8, int8:
		n = 1
	case int16, uint16:
		n = 2
	case int32, uint32, float32:
		n = 4
	case int64, uint64, float64:
		n = 8
	case int, uint:
		if runtime.GOARCH == "amd64" {
			n = 8
		} else {
			n = 4
		}
	case complex64:
		n = 8
	case complex128:
		n = 16
	default:
		panic(fmt.Sprintf("%T is not implement cache.Value", value))
	}

	return n
}

该函数计算各种类型的内存占用,有几点需要说明:

  • int/uint 类型,根据 GOARCH 不同,占用内存不同;
  • string 类型的底层由长度和字节数组构成,内存占用是 8 + len(s) 或 16 + len(s);
  • 对于其他类型,要求实现 cache.Value 接口,该接口有一个 Len 方法,返回占用的内存字节数;如果没有实现该接口,则 panic;

cache.Value 接口定义如下:

package cache

type Value interface {
	Len() int
}

实例化一个 FIFO 的 Cache,通过 fifo.New() 函数:

// New 创建一个新的 Cache,如果 maxBytes 是 0,表示没有容量限制
func New(maxBytes int, onEvicted func(key string, value interface{})) cache.Cache {
	return &fifo{
		maxBytes:  maxBytes,
		onEvicted: onEvicted,
		ll:        list.New(),
		cache:     make(map[string]*list.Element),
	}
}

一般地,对于接口和具体类型,Go 语言推荐函数参数使用接口,返回值使用具体类型,大量的标准库遵循了这一习俗。

然而,我们这里似乎违背了这一习俗。其实这里的设计更体现了封装,内部状态更可控,对比 groupcache 的代码你会发现这一点;而且它有一个重要的特点,那就是这里的类型(fifo)只实现了 cache.Cache 接口。这种设计风格,在标准库中也存在,比如 Hash 相关的库(hash.Hash 接口),Image 相关库(image.Image 接口)。

2、新增/修改

// Set 往 Cache 尾部增加一个元素(如果已经存在,则移到尾部,并修改值)
func (f *fifo) Set(key string, value interface{}) {
	if e, ok := f.cache[key]; ok {
		f.ll.MoveToBack(e)
		en := e.Value.(*entry)
		f.usedBytes = f.usedBytes - cache.CalcLen(en.value) + cache.CalcLen(value)
		en.value = value
		return
	}

	en := &entry{key, value}
	e := f.ll.PushBack(en)
	f.cache[key] = e

	f.usedBytes += en.Len()
	if f.maxBytes > 0 && f.usedBytes > f.maxBytes {
    f.DelOldest()
	}
}
  • 如果 key 存在,则更新对应节点的值,并将该节点移到队尾。
  • 不存在则是新增场景,首先队尾添加新节点 &entry{key, value}, 并在 map 中添加 key 和节点的映射关系。
  • 更新 f.usedBytes,如果超过了设定的最大值 f.maxBytes,则移除“无用”的节点,对于 FIFO 则是移除队首节点。
  • 如果 maxBytes = 0,表示不限内存,则不会进行移除操作。

3、查找

查找功能很简单,直接从 map 中找到对应的双向链表的节点。

// Get 从 cache 中获取 key 对应的值,nil 表示 key 不存在
func (f *fifo) Get(key string) interface{} {
	if e, ok := f.cache[key]; ok {
		return e.Value.(*entry).value
	}

	return nil
}

4、删除

根据缓存接口的定义,我们需要提供两个删除方法:根据指定的 key 删除记录和删除最“无用”的记录。

// Del 从 cache 中删除 key 对应的记录
func (f *fifo) Del(key string) {
	if e, ok := f.cache[key]; ok {
		f.removeElement(e)
	}
}

// DelOldest 从 cache 中删除最旧的记录
func (f *fifo) DelOldest() {
	f.removeElement(f.ll.Front())
}

func (f *fifo) removeElement(e *list.Element) {
	if e == nil {
		return
	}

	f.ll.Remove(e)
	en := e.Value.(*entry)
	f.usedBytes -= en.Len()
	delete(f.cache, en.key)

	if f.onEvicted != nil {
		f.onEvicted(en.key, en.value)
	}
}
  • Del 一般用于主动删除某个缓存记录。根据 key 从 map 中获取节点,从链表中删除,并从 map 中删除;
  • DelOldest 一般不主动调用,而是在内存满时自动触发(在新增方法中看到了),这就是缓存淘汰;
  • 两者都会在设置了 onEvicted 回调函数时,调用它;

5、获取缓存记录数

这个方法更多是为了方便测试或为数据统计提供。

// Len 返回当前 cache 中的记录数
func (f *fifo) Len() int {
	return f.ll.Len()
}

5.2.3.2 测试

在 fifo 目录下创建一个测试文件:fifo_test.go,增加单元测试代码,在一个测试函数中同时测试多个功能:

func TestSetGet(t *testing.T) {
	is := is.New(t)

	cache := fifo.New(24, nil)
	cache.DelOldest()
	cache.Set("k1", 1)
	v := cache.Get("k1")
	is.Equal(v, 1)

	cache.Del("k1")
	is.Equal(0, cache.Len()) // expect to be the same

	// cache.Set("k2", time.Now())
}
  • 我们使用了 github.com/matryer/is 这个库。注意这行注释:// expect to be the same,如果测试失败,这行注释会输出。如:fifo_test.go:20: 1 != 0 // expect to be the same
  • 最后注释的代码用来测试非基本类型,必须实现 Value 接口,方便知道内存占用,否则 panic;

接着测试自动淘汰和回调函数。

func TestOnEvicted(t *testing.T) {
	is := is.New(t)

	keys := make([]string, 0, 8)
	onEvicted := func(key string, value interface{}) {
		keys = append(keys, key)
	}
	cache := fifo.New(16, onEvicted)

	cache.Set("k1", 1)
	cache.Set("k2", 2)
	cache.Get("k1")
	cache.Set("k3", 3)
	cache.Get("k1")
	cache.Set("k4", 4)

	expected := []string{"k1", "k2"}

	is.Equal(expected, keys)
	is.Equal(2, cache.Len())
}

5.2.2.3 小结

FIFO 实现还是比较简单,主要使用了标准库的 container/list。该算法相关方法的时间复杂度都是 O(1)。

然而该算法的问题比较明显,很多场景下,部分记录虽然是最早添加但也常被访问,但该算法会导致它们被淘汰。这样这类数据会被频繁地添加进缓存,又被淘汰出去,导致缓存命中率降低。

5.2.4 LFU(Least Frequently Used)

LFU,即最少使用,也就是淘汰缓存中访问频率最低的记录。LFU 认为,如果数据过去被访问多次,那么将来被访问的频率也更高。LFU 的实现需要维护一个按照访问次数排序的队列,每次访问,访问次数加 1,队列重新排序,淘汰时选择访问次数最少的即可。

该算法的实现稍微复杂些。在 Go 中,我们结合标准库 container/heap 来实现。

5.2.3.1 算法实现

在 cache 项目根目录创建一个 lfu 子目录,并创建一个 lfu.go 文件,存放 LFU 算法的实现。

1、核心数据结构

// lfu 是一个 LFU cache。它不是并发安全的。
type lfu struct {
	// 缓存最大的容量,单位字节;
	// groupcache 使用的是最大存放 entry 个数
	maxBytes int
	// 当一个 entry 从缓存中移除是调用该回调函数,默认为 nil
	// groupcache 中的 key 是任意的可比较类型;value 是 interface{}
	onEvicted func(key string, value interface{})

	// 已使用的字节数,只包括值,key 不算
	usedBytes int

	queue *queue
	cache map[string]*entry
}

和 FIFO 算法中有两个不同点:

  • 使用了 queue 而不是 container/list;
  • cache 这个 map 的 value 是 *entry 类型,而不是 *list.Element

这里的 queue 是什么?它定义在 lfu 目录下的 queue.go 文件中。代码如下:

type entry struct {
	key    string
	value  interface{}
	weight int
	index  int
}

func (e *entry) Len() int {
	return cache.CalcLen(e.value) + 4 + 4
}

type queue []*entry
  • queue 是一个 entry 指针切片;
  • entry 和 FIFO 中的区别是多了两个字段:weight 和 index;
  • weight 表示该 entry 在 queue 中权重(优先级),访问次数越多,权重越高;
  • index 代表该 entry 在堆(heap)中的索引;

LFU 算法用最小堆实现。在 Go 中,通过标准库 container/heap 实现最小堆,要求 queue 实现 heap.Interface 接口:

type Interface interface {
    sort.Interface
    Push(x interface{}) // add x as element Len()
    Pop() interface{}   // remove and return element Len() - 1.
}

看看 queue 的实现。

func (q queue) Len() int {
	return len(q)
}

func (q queue) Less(i, j int) bool {
	return q[i].weight < q[j].weight
}

func (q queue) Swap(i, j int) {
	q[i], q[j] = q[j], q[i]
	q[i].index = i
	q[j].index = j
}

func (q *queue) Push(x interface{}) {
	n := len(*q)
	en := x.(*entry)
	en.index = n
	*q = append(*q, en)
}

func (q *queue) Pop() interface{} {
	old := *q
	n := len(old)
	en := old[n-1]
	old[n-1] = nil // avoid memory leak
	en.index = -1  // for safety
	*q = old[0 : n-1]
	return en
}

前三个方法:Len、Less、Swap 是标准库 sort.Interface 接口的方法;后两个方法:Push、Pop 是 heap.Interface 要求的新方法。

至于是最大堆还是最小堆,取决于 Swap 方法的实现:< 则是最小堆,> 则是最大堆。我们这里的需求自然使用最小堆。

该数据结构如下图所示:

实例化一个 LFU 的 Cache,通过 lfu.New() 函数:

// New 创建一个新的 Cache,如果 maxBytes 是 0,表示没有容量限制
func New(maxBytes int, onEvicted func(key string, value interface{})) cache.Cache {
	q := make(queue, 0, 1024)
	return &lfu{
		maxBytes:  maxBytes,
		onEvicted: onEvicted,
		queue:     &q,
		cache:     make(map[string]*entry),
	}
}

因为 queue 实际上是一个 slice,避免 append 导致内存拷贝,可以提前分配一个稍大的容量。实际中,如果使用 LFU 算法,为了性能考虑,可以将最大内存限制改为最大记录数限制,这样可以更好地提前分配 queue 的容量。

2、新增/修改

// Set 往 Cache 增加一个元素(如果已经存在,更新值,并增加权重,重新构建堆)
func (l *lfu) Set(key string, value interface{}) {
	if e, ok := l.cache[key]; ok {
		l.usedBytes = l.usedBytes - cache.CalcLen(e.value) + cache.CalcLen(value)
		l.queue.update(e, value, e.weight+1)
		return
	}

	en := &entry{key: key, value: value}
	heap.Push(l.queue, en)
	l.cache[key] = en

	l.usedBytes += en.Len()
	if l.maxBytes > 0 && l.usedBytes > l.maxBytes {
		l.removeElement(heap.Pop(l.queue))
	}
}

func (q *queue) update(en *entry, value interface{}, weight int) {
	en.value = value
	en.weight = weight
	heap.Fix(q, en.index)
}
  • 如果 key 存在,则更新对应节点的值。这里调用了 queue 的 update 方法:增加权重,然后调用 heap.Fix 重建堆,重建的过程,时间复杂度是 O(log n),其中 n = quque.Len();
  • key 不存在,则是新增场景,首先在堆中添加新元素 &entry{key: key, value: value}, 并在 map 中添加 key 和 entry 的映射关系。heap.Push 操作的时间复杂度是 O(log n),其中 n = quque.Len();
  • 更新 l.usedBytes,如果超过了设定的最大值 l.maxBytes,则移除“无用”的节点,对于 LFU 则是删除堆的 root 节点。
  • 如果 maxBytes = 0,表示不限内存,那么不会进行移除操作。

3、查找

// Get 从 cache 中获取 key 对应的值,nil 表示 key 不存在
func (l *lfu) Get(key string) interface{} {
	if e, ok := l.cache[key]; ok {
		l.queue.update(e, e.value, e.weight+1)
		return e.value
	}

	return nil
}

查找过程:先从 map 中查找是否存在指定的 key,存在则将权重加 1。这个过程一样会进行堆的重建,因此时间复杂度也是 O(log n)。

4、删除

// Del 从 cache 中删除 key 对应的元素
func (l *lfu) Del(key string) {
	if e, ok := l.cache[key]; ok {
		heap.Remove(l.queue, e.index)
		l.removeElement(e)
	}
}

// DelOldest 从 cache 中删除最旧的记录
func (l *lfu) DelOldest() {
	if l.queue.Len() == 0 {
		return
	}
	l.removeElement(heap.Pop(l.queue))
}

func (l *lfu) removeElement(x interface{}) {
	if x == nil {
		return
	}

	en := x.(*entry)

	delete(l.cache, en.key)

	l.usedBytes -= en.Len()

	if l.onEvicted != nil {
		l.onEvicted(en.key, en.value)
	}
}
  • Del 实际通过 heap.Remove 进行删除。这个过程一样需要重建堆,因此时间复杂度是 O(log n);
  • DelOldest 通过 heap.Pop 得到堆顶(root)元素,这里也是权重最小的(之一),然后将其删除;

删除操作的其他过程和 FIFO 类似。

5、获取缓存记录数

可以通过 map 或 queue 获取,因为 queue 实际上是一个切片。

// Len 返回当前 cache 中的记录数
func (l *lfu) Len() int {
	return l.queue.Len()
}

5.2.4.2 测试

在 lfu 目录下创建一个测试文件:lfu_test.go,增加单元测试代码。Set/Get 等的测试和 FIFO 类似:

func TestSet(t *testing.T) {
	is := is.New(t)

	cache := lfu.New(24, nil)
	cache.DelOldest()
	cache.Set("k1", 1)
	v := cache.Get("k1")
	is.Equal(v, 1)

	cache.Del("k1")
	is.Equal(0, cache.Len())

	// cache.Set("k2", time.Now())
}

但在淘汰测试时,需要注意下。

func TestOnEvicted(t *testing.T) {
	is := is.New(t)

	keys := make([]string, 0, 8)
	onEvicted := func(key string, value interface{}) {
		keys = append(keys, key)
	}
	cache := lfu.New(32, onEvicted)

	cache.Set("k1", 1)
	cache.Set("k2", 2)
	// cache.Get("k1")
	// cache.Get("k1")
	// cache.Get("k2")
	cache.Set("k3", 3)
	cache.Set("k4", 4)

	expected := []string{"k1", "k3"}

	is.Equal(expected, keys)
	is.Equal(2, cache.Len())
}

我们限制内存最多只能容纳两条记录。注意注释的代码去掉和不去掉的区别。

5.2.3.3 小结

LFU 算法的命中率是比较高的,但缺点也非常明显,维护每个记录的访问次数,对内存是一种浪费;另外,如果数据的访问模式发生变化,LFU 需要较长的时间去适应,也就是说 LFU 算法受历史数据的影响比较大。例如某个数据历史上访问次数奇高,但在某个时间点之后几乎不再被访问,但因为历史访问次数过高,而迟迟不能被淘汰。

另外,因为要维护堆,算法的时间复杂度相对比较高。

5.2.5 LRU(Least Recently Used)

LRU,即最近最少使用,相对于仅考虑时间因素的 FIFO 和仅考虑访问频率的 LFU,LRU 算法可以认为是相对平衡的一种淘汰算法。LRU 认为,如果数据最近被访问过,那么将来被访问的概率也会更高。

LRU 算法的实现非常简单,维护一个队列,如果某条记录被访问了,则移动到队尾,那么队首则是最近最少访问的数据,淘汰该条记录即可。因此该算法的核心数据结构和 FIFO 是一样的,只是记录的移动方式不同而已。

5.2.4.1 算法实现

在 cache 项目根目录创建一个 lru 子目录,并创建一个 lru.go 文件,存放 LRU 算法的实现。

因为数据结构和 FIFO 算法一样,我们直接看核心的操作,主要关注和 FIFO 算法不同的地方。

1、新增/修改

// Set 往 Cache 尾部增加一个元素(如果已经存在,则放入尾部,并更新值)
func (l *lru) Set(key string, value interface{}) {
	if e, ok := l.cache[key]; ok {
		l.ll.MoveToBack(e)
		en := e.Value.(*entry)
		l.usedBytes = l.usedBytes - cache.CalcLen(en.value) + cache.CalcLen(value)
		en.value = value
		return
	}

	en := &entry{key, value}
	e := l.ll.PushBack(en)
	l.cache[key] = e

	l.usedBytes += en.Len()
	if l.maxBytes > 0 && l.usedBytes > l.maxBytes {
		l.DelOldest()
	}
}

这里和 FIFO 的实现没有任何区别。

2、查找

// Get 从 cache 中获取 key 对应的值,nil 表示 key 不存在
func (l *lru) Get(key string) interface{} {
	if e, ok := l.cache[key]; ok {
		l.ll.MoveToBack(e)
		return e.Value.(*entry).value
	}

	return nil
}

和 FIFO 唯一不同的是,if 语句中多了一行代码:

l.ll.MoveToBack(e)

表示有访问就将该记录放在队列尾部。

3、删除

删除和 FIFO 完全一样,具体代码不贴了。

另外获取缓存记录数和 FIFO 也是一样的。

5.2.5.2 测试

在 lru 目录下创建一个测试文件:lru_test.go,增加单元测试代码。LRU 算法的测试和 FIFO 一样,主要看看淘汰时,是不是符合预期:

func TestOnEvicted(t *testing.T) {
	is := is.New(t)

	keys := make([]string, 0, 8)
	onEvicted := func(key string, value interface{}) {
		keys = append(keys, key)
	}
	cache := lru.New(16, onEvicted)

	cache.Set("k1", 1)
	cache.Set("k2", 2)
	cache.Get("k1")
	cache.Set("k3", 3)
	cache.Get("k1")
	cache.Set("k4", 4)

	expected := []string{"k2", "k3"}

	is.Equal(expected, keys)
	is.Equal(2, cache.Len())
}

因为 k1 最近一直被访问,因此它不会被淘汰。

5.2.5.3 小结

LRU 是缓存淘汰算法中最常使用的算法,有一些大厂面试可能会让现场实现一个 LRU 算法,因此大家务必掌握该算法。groupcache 库使用的就是 LRU 算法。



本图书由 煎鱼 ©2020 版权所有,所有文章采用知识署名-非商业性使用-禁止演绎 4.0 国际进行许可。