4.6 实现聊天室:广播器
上一节介绍了聊天室的核心流程,其中多次提到了 Broadcaster,但没有过多涉及到其中的细节。本节我们详细介绍它的实现:广播器,这是聊天室的一个核心模块。
4.6.1 单例模式
Go 不是完全面向对象的语言,只支持部分面向对象的特性。面向对象中的单例模式是一个常见、简单的模式。前文提到,广播器中我们应用了单例模式,这里进行必要的讲解。
4.6.1.1 简介
英文名称:Singleton Pattern,该模式规定一个类只允许有一个实例,而且自行实例化并向整个系统提供这个实例。因此单例模式的要点有:1)只有一个实例;2)必须自行创建;3)必须自行向整个系统提供这个实例。
单例模式主要避免一个全局使用的类频繁地创建与销毁。当你想控制实例的数量,或有时候不允许存在多实例时,单例模式就派上用场了。
为了更好的讲解单例模式,我们先使用 Java 来描述它,之后回到 Go 中来。
通过该类图我们可以看出,实现一个单例模式有如下要求:
- 私有、静态的类实例变量;
- 构造函数私有化;
- 静态工厂方法,返回此类的唯一实例;
根据实例化的时机,单例模式一般分成饿汉式和懒汉式。
- 饿汉式:在定义 instance 时直接实例化,private static Singleton instance = new Singleton();
- 懒汉式:在 getInstance 方法中进行实例化;
那两者有什么区别或优缺点?饿汉式单例类在自己被加载时就将自己实例化。即便加载器是静态的,饿汉式单例类被加载时仍会将自己实例化。单从资源利用率角度讲,这个比懒汉式单例类稍差些。从速度和反应时间角度讲,则比懒汉式单例类稍好些。然而,懒汉式单例类在实例化时,必须处理好在多个线程同时首次引用此类时的访问限制问题,特别是当单例类作为资源控制器在实例化时必须涉及资源初始化,而资源初始化很有可能耗费时间。这意味着出现多线程同时首次引用此类的几率变得较大。
4.6.1.2 单例模式的 Java 实现
结合上面的讲解,以一个计数器为例,我们看看 Java 中饿汉式的实现:
public class Singleton {
private static final Singleton instance = new Singleton();
private int count = 0;
private Singleton() {}
public static Singleton getInstance() {
return instance;
}
public int Add() int {
this.count++;
return this.count;
}
}
代码很简单,不过多解释。直接看懒汉式的实现:
public class Singleton {
private static Singleton instance = null;
private int count = 0;
private Singleton() {}
public static synchronized Singleton getInstance() {
if (instance == null) {
instance = new Singleton();
}
return instance;
}
public int Add() int {
this.count++;
return this.count;
}
}
主要区别在于 getInstance 的实现,要注意 synchronized ,避免多线程时出现问题。
4.6.1.3 单例模式的 Go 实现
回到 Go 语言,看看 Go 语言如何实现单例。
// 饿汉式单例模式
package singleton
type singleton struct {
count int
}
var Instance = new(singleton)
func (s *singleton) Add() int {
s.count++
return s.count
}
前面说了,Go 只支持部分面向对象的特性,因此看起来有点不太一样:
- 类(结构体 singleton)本身非公开(小写字母开头,非导出);
- 没有提供导出的 GetInstance 工厂方法(Go 没有静态方法),而是直接提供包级导出变量 Instance;
这样使用:
c := singleton.Instance.Add()
看看懒汉式单例模式在 Go 中如何实现:
// 懒汉式单例模式
package singleton
import (
"sync"
)
type singleton struct {
count int
}
var (
instance *singleton
mutex sync.Mutex
)
func New() *singleton {
mutex.Lock()
if instance == nil {
instance = new(singleton)
}
mutex.Unlock()
return instance
}
func (s *singleton) Add() int {
s.count++
return s.count
}
代码多了不少:
- 包级变量变成非导出(instance),注意这里类型应该用指针,因为结构体的默认值不是 nil;
- 提供了工厂方法,按照 Go 的惯例,我们命名为 New();
- 多 goroutine 保护,对应 Java 的 synchronized,Go 使用 sync.Mutex;
关于懒汉式有一个“双重检查”,这是 C 语言的一种代码模式。
在上面 New() 函数中,同步化(锁保护)实际上只在 instance 变量第一次被赋值之前才有用。在 instance 变量有了值之后,同步化实际上变成了一个不必要的瓶颈。如果能够有一个方法去掉这个小小的额外开销,不是更加完美吗?因此出现了“双重检查”。看看 Go 如何实现“双重检查”,只看 New() 代码:
func New() *singleton {
if instance == nil { // 第一次检查(①)
// 这里可能有多于一个 goroutine 同时达到(②)
mutex.Lock()
// 这里每个时刻只会有一个 goroutine(③)
if instance == nil { // 第二次检查(④)
instance = new(singleton)
}
mutex.Unlock()
}
return instance
}
有读者可能看不懂上面代码的意思,这里详细解释下。假设 goroutine X 和 Y 作为第一批调用者同时或几乎同时调用 New 函数。
- 因为 goroutine X 和 Y 是第一批调用者,因此,当它们进入此函数时,instance 变量是 nil。因此 goroutine X 和 Y 会同时或几乎同时到达位置 ①;
- 假设 goroutine X 会先达到位置 ②,并进入 mutex.Lock() 达到位置 ③。这时,由于 mutex.Lock 的同步限制,goroutine Y 无法到达位置 ③,而只能在位置 ② 等候;
- goroutine X 执行 instance = new(singleton) 语句,使得 instance 变量得到一个值,即对 singleton 实例的引用。此时,goroutine Y 只能继续在位置 ② 等候;
- goroutine X 释放锁,返回 instance,退出 New 函数;
- goroutine Y 进入 mutex.Lock(),到达位置 ③,进而到达位置 ④。由于 instance 变量已经不是 nil,因此 goroutine Y 释放锁,返回 instance 所引用的 singleton 实例(也就是 goroutine X 锁创建的 singleton 实例),退出 New 函数;
到这里,goroutine X 和 Y 得到了同一个 singleton 实例。可见上面的 New 函数中,锁仅用来避免多个 goroutine 同时实例化 singleton。
相比前面的版本,双重检查版本,只要 instance 实例化后,锁永远不会执行了,而前面版本每次调用 New 获取实例都需要执行锁。性能很显然,我们可以基准测试来验证:(双重检查版本 New 重命名为 New2)
package singleton_test
import (
"testing"
"github.com/go-programming-tour-book/go-demo/singleton"
)
func BenchmarkNew(b *testing.B) {
for i := 0; i < b.N; i++ {
singleton.New()
}
}
func BenchmarkNew2(b *testing.B) {
for i := 0; i < b.N; i++ {
singleton.New2()
}
}
因为是单例,所以两个基准测试需要分别执行。
New1 的结果:
$ go test -benchmem -bench ^BenchmarkNew$ github.com/go-programming-tour-book/go-demo/singleton
goos: darwin
goarch: amd64
pkg: github.com/go-programming-tour-book/go-demo/singleton
BenchmarkNew-8 80470467 14.0 ns/op 0 B/op 0 allocs/op
PASS
ok github.com/go-programming-tour-book/go-demo/singleton 1.151s
New2 的结果:
$ go test -benchmem -bench ^BenchmarkNew2$ github.com/go-programming-tour-book/go-demo/singleton
goos: darwin
goarch: amd64
pkg: github.com/go-programming-tour-book/go-demo/singleton
BenchmarkNew2-8 658810392 1.80 ns/op 0 B/op 0 allocs/op
PASS
ok github.com/go-programming-tour-book/go-demo/singleton 1.380s
New2 快十几倍。
Go 语言单例模式,推荐一般优先考虑使用饿汉式。
4.6.2 广播器的实现
本章第 6 节我们看过广播器结构的定义:
// broadcaster 广播器
type broadcaster struct {
// 所有聊天室用户
users map[string]*User
// 所有 channel 统一管理,可以避免外部乱用
enteringChannel chan *User
leavingChannel chan *User
messageChannel chan *Message
// 判断该昵称用户是否可进入聊天室(重复与否):true 能,false 不能
checkUserChannel chan string
checkUserCanInChannel chan bool
}
很显然,广播器全局应该只有一个,所以是典型的单例。我们使用饿汉式实现。
var Broadcaster = &broadcaster{
users: make(map[string]*User),
enteringChannel: make(chan *User),
leavingChannel: make(chan *User),
messageChannel: make(chan *Message, MessageQueueLen),
checkUserChannel: make(chan string),
checkUserCanInChannel: make(chan bool),
}
导出的 Broadcaster 代表广播器的唯一实例,通过 logic.Broadcaster 来使用这个单例。
在本章第 4 节时提到了通过如下语句启动广播器:
go logic.Broadcaster.Start()
现在看看 Start 的具体实现:
// logic/broadcast.go
// Start 启动广播器
// 需要在一个新 goroutine 中运行,因为它不会返回
func (b *broadcaster) Start() {
for {
select {
case user := <-b.enteringChannel:
// 新用户进入
b.users[user.NickName] = user
b.sendUserList()
case user := <-b.leavingChannel:
// 用户离开
delete(b.users, user.NickName)
// 避免 goroutine 泄露
user.CloseMessageChannel()
b.sendUserList()
case msg := <-b.messageChannel:
// 给所有在线用户发送消息
for _, user := range b.users {
if user.UID == msg.User.UID {
continue
}
user.MessageChannel <- msg
}
case nickname := <-b.checkUserChannel:
if _, ok := b.users[nickname]; ok {
b.checkUserCanInChannel <- false
} else {
b.checkUserCanInChannel <- true
}
}
}
}
核心关注的知识点:
- 需要在一个新 goroutine 中进行,因为它不会返回。注意这里并非说,只要不会返回的函数/方法就应该在新的 goroutine 中运行,虽然大部分情况是这样;
- Go 有一个最佳实践:应该让调用者决定并发(启动新 goroutine),这样它清楚自己在干什么。Start 的设计遵循了这一实践,没有自己内部开启新的 goroutine;
- for + select 形式,是 Go 中一种较常用的编程模式,可以不断监听各种 channel 的状态,有点类似 Unix 系统的 select 系统调用;
- 每新开一个 goroutine,你必须知道它什么时候会停止。这一句
user.CloseMessageChannel()
就涉及到 goroutine 的停止,避免泄露;
4.6.2.1 select-case 结构
Go 中有一个专门为 channel 设计的 select-case 分支流程控制语法。 此语法和 switch-case 分支流程控制语法很相似。 比如,select-case 流程控制代码块中也可以有若干 case 分支和最多一个 default 分支。 但是,这两种流程控制也有很多不同点。在一个 select-case 流程控制中:
- select 关键字和 { 之间不允许存在任何表达式和语句;
- fallthrough 语句不能使用;
- 每个 case 关键字后必须跟随一个 channel 接收数据操作或者一个 channel 发送数据操作,所以叫做专门为 channel 设计的;
- 所有的非阻塞 case 操作中将有一个被随机选择执行(而不是按照从上到下的顺序),然后执行此操作对应的 case 分支代码块;
- 在所有的 case 操作均阻塞的情况下,如果 default 分支存在,则 default 分支代码块将得到执行; 否则,当前 goroutine 进入阻塞状态;
所以,广播器的 Start 方法中,当所有 case 操作都阻塞时,Start 方法所在的 goroutine 进入阻塞状态。
另外,根据以上规则,一个不含任何分支的 select-case 代码块 select{} 将使当前 goroutine 处于永久阻塞状态,这可以用于一些服务开发中,如果你见到了 select{} 这样的写法不要惊讶了。比如:
func main() {
go func() {
// 该函数不会退出
for {
// 省略代码
}
} ()
select {}
}
这样保证 main goroutine 永远阻塞,让其他 goroutine 运行。但如果除了当前因为 select{} 阻塞的 goroutine 外,没有其他可运行的 goroutine,会导致死锁。因此下面的代码会死锁:
func main() {
select {}
}
运行报错:
fatal error: all goroutines are asleep - deadlock!
4.6.2.2 goroutine 泄露
在 Go 中,goroutine 的创建成本低廉且调度效率高。Go 运行时能很好的支持具有成千上万个 goroutine 的程序运行,数十万个也并不意外。但是,goroutine 在内存占用方面却需要谨慎,内存资源是有限的,因此你不能创建无限的 goroutine。
每当你在程序中使用 go 关键字启动 goroutine 时,你必须知道该 goroutine 将在何时何地退出。如果你不知道答案,那可能会内存泄漏。
我们回过头梳理下聊天室项目有哪些新启动的 goroutine。
1)启动广播器
// 广播消息处理
go logic.Broadcaster.Start()
我们很清楚,该广播器的生命周期是和程序生命周期一致的,因此它不应该结束。
2)负责给用户发送消息的 goroutine
在 WebSocketHandleFunc 函数中:
// 2. 开启给用户发送消息的 goroutine
go user.SendMessage(req.Context())
user.SendMessage 的具体实现是:
func (u *User) SendMessage(ctx context.Context) {
for msg := range u.MessageChannel {
wsjson.Write(ctx, u.conn, msg)
}
}
根据 for-range 用于 channel 的语法,默认情况下,for-range 不会退出。很显然,如果我们不做特殊处理,这里的 goroutine 会一直存在。而实际上,当用户离开聊天室时,它对应连接的写 goroutine 应该终止。这也就是上面 Start 方法中,在用户离开聊天室的 channel 收到消息时,要将用户的 MessageChannel 关闭的原因。MessageChannel 关闭了,for msg := range u.MessageChannel
就会退出循环,goroutine 结束,避免了内存泄露。
3)库开启的 goroutine
在本章第 1 节,我们用 TCP 实现简单聊天室时,每一个用户到来,都会新开启一个 goroutine 服务该用户。在我们的 WebSocket 聊天室中,这个新开启 goroutine 的动作,由库给我们做了(具体是 net/http 库)。也许你不明白为什么是 http 库开启的,这里教大家一个思考思路。
一个程序能够长时间运行而不停止,肯定是程序里有死循环。在本章第 1 节中,我们自己写了一个死循环:
func main() {
listener, err := net.Listen("tcp", ":2020")
if err != nil {
panic(err)
}
go broadcaster()
for {
conn, err := listener.Accept()
if err != nil {
log.Println(err)
continue
}
go handleConn(conn)
}
}
那 WebSocket 版本的聊天室的死循环在哪里呢?回到 cmd/chatroom/main.go :
func main() {
fmt.Printf(banner, addr)
server.RegisterHandle()
log.Fatal(http.ListenAndServe(addr, nil))
}
很显然在不出错时,http.ListenAndServe(addr, nil) 函数调用不会返回。因为 HTTP 协议基于 TCP 协议,因此 http 库中肯定存在类似我们上面实现 tcp 聊天室时的死循环代码。
通过跟踪 http.ListenAndServe -> Server.ListenAndServe,我们找到了如下代码:
func (srv *Server) ListenAndServe() error {
if srv.shuttingDown() {
return ErrServerClosed
}
addr := srv.Addr
if addr == "" {
addr = ":http"
}
ln, err := net.Listen("tcp", addr)
if err != nil {
return err
}
return srv.Serve(ln)
}
这一句 ln, err := net.Listen(“tcp”, addr) 和我们自己实现时一样。接着看 Server.Serve 方法,看到 for 死循环了(只保留关键代码):
func (srv *Server) Serve(l net.Listener) error {
...
origListener := l
l = &onceCloseListener{Listener: l}
defer l.Close()
...
for {
rw, e := l.Accept()
if e != nil {
select {
case <-srv.getDoneChan():
return ErrServerClosed
default:
}
...
tempDelay = 0
c := srv.newConn(rw)
c.setState(c.rwc, StateNew) // before Serve can return
go c.serve(ctx)
}
}
在这个死循环的最后一句:go c.serve(ctx) ,即当有新客户端连接时,开启一个新 goroutine 为其服务。最终,根据我们定义的路由,进入相应的函数进行处理。
那由 net/http 开启的 goroutine 什么时候结束呢?根据上面的分析,该 goroutine 最终会执行到我们定义的路由处理器中。所以,当我们的处理函数返回后,该 goroutine 也就结束了。因此,我们要确保 WebSocketHandleFunc 函数是有可能返回的。通过上一节的分析知道,当用户退出聊天室或其他原因导致连接断开时,User.ReceiveMessage 中的循环都会结束,函数退出。
总结一下容易导致 goroutine 或内存泄露的场景
1)time.After
这是很多人实际遇到过的内存泄露场景。如下代码:
func ProcessMessage(ctx context.Context, in <-chan string) {
for {
select {
case s, ok := <-in:
if !ok {
return
}
// handle `s`
case <-time.After(5 * time.Minute):
// do something
case <-ctx.Done():
return
}
}
}
在标准库 time.After 的文档中有一段说明:
等待持续时间过去,然后在返回的 channel 上发送当前时间。它等效于 NewTimer().C。在计时器触发之前,计时器不会被垃圾收集器回收。
所以,如果还没有到 5 分钟,该函数返回了,计时器就不会被 GC 回收,因此出现了内存泄露。因此大家使用 time.After 时一定要仔细,一般建议不用它,而是使用 time.NewTimer:
func ProcessMessage(ctx context.Context, in <-chan string) {
idleDuration := 5 * time.Minute
idleDelay := time.NewTimer(idleDuration)
// 这句必须的
defer idleDelay.Stop()
for {
idleDelay.Reset(idleDuration)
select {
case s, ok := <-in:
if !ok {
return
}
// handle `s`
case <-idleDelay.C:
// do something
case <-ctx.Done():
return
}
}
}
2)发送到 channel 阻塞导致 goroutine 泄露
假如存在如下的程序:
func process(term string) error {
// 创建一个在 100 ms 内取消的 context
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()
// 为 goroutine 创建一个传递结果的 channel
ch := make(chan string)
// 启动一个 goroutine 来寻找记录,然后得到结果
// 并将返回值从 channel 中传回
go func() {
ch <- search(term)
}()
select {
case <-ctx.Done():
return errors.New("search canceled")
case result := <-ch:
fmt.Println("Received:", result)
return nil
}
}
// search 模拟成一个查找记录的函数
// 在查找记录时。执行此工作需要 200 ms。
func search(term string) string {
time.Sleep(200 * time.Millisecond)
return "some value"
}
这是一个挺常见的场景:要进行一些耗时操作,因此开启一个 goroutine 进行处理,它的处理结果,通过 channel 回传给原来的 goroutine;同时,这个耗时操作不能太长,因此有了 WithTimeout Context。最后通过 select-case 来监控 ctx.Done 和传递数据的 channel 是否就绪。
如果超时没处理完,ctx.Done 会执行,函数返回,新开启的 goroutine 会因为 channel 中的另一端没有就绪的接收 goroutine 而一直阻塞,导致 goroutine 泄露。
解决这种因为发送到 channel 阻塞导致 goroutine 泄露的简单办法是将 channel 改为有缓冲的 channel,并保证容量充足。比如上面例子,将 ch 改为:ch := make(chan string, 1) 即可。
3)从 channel 接收阻塞导致 goroutine 泄露
我们聊天室可能导致 goroutine 泄露就属于这种情况。
func (u *User) SendMessage(ctx context.Context) {
for msg := range u.MessageChannel {
wsjson.Write(ctx, u.conn, msg)
}
}
for-range 循环直到 MessageChannel 这个 channel 关闭才会结束,因此需要有地方调用 close(u.MessageChannel)。
这种情况的另一种情形是:虽然没有 for-range,但给 channel 发送数据的一方已经不再发送数据了,接收的一方还在等待,这个等待会无限持续下去。唯一能取消它等待的就是 close 这个 channel。
4.6.2.3 广播器和外界的通信
从广播器的结构定义知道,它和其他 goroutine 的通信通过 channel 进行。判断用户是否存在的方式前面讲解了,这里看用户进入、离开和消息的通信。
func(b *broadcaster) UserEntering(u *User) {
b.enteringChannel <- u
}
func(b *broadcaster) UserLeaving(u *User) {
b.leavingChannel <- u
}
func(b *broadcaster) Broadcast(msg *Message) {
b.messageChannel <- msg
}
通过 channel 和其他 goroutine 通信,可以有几种方式,以用户进入聊天室为例。
方式一:
在 broadcast.go 中定义导出的 channel:var EnteringChannel = make(chan *User) 或者还是作为 broadcaster 的字段,但是导出的,各个 goroutine 都可以直接对 EnteringChannel 进行读写。这种方式显然不好,用面向对象说法,封装性不好,容易被乱用。
方式二:
broadcaster 结构和现在不变,通过方法将 enteringChannel 暴露出去:
func (b *broadcaster) EnteringChannel() chan<- *User {
return b.enteringChannel
}
前面讲过单向 channel,该方法的返回值类型:chan<- *User
就是一个单向 channel,它是只写的(only send channel),这限制了外部 goroutine 使用它的方式:只能往 channel 写数据,读取由我自己负责。
使用方式:logic.Broadcaster.EnteringChannel() <- user 。
整体上这种方式没有大问题,只是使用方式有点别扭。
方式三:
这种方式就是我们目前采用的方式,对外完全隐藏 channel,调用方不需要知道有 channel 的存在,只是感觉在做普通的方法调用。channel 的处理由内部自己处理,保证了安全性。这种方式比较优雅。
User 中的 MessageChannel 我们没有采用这种方式,而是使用了方式一,让大家感受一下两种方式的不同。读者可以试着改为方式三的形式。
回到 Start 的循环中,这是在 broadcaster goroutine 中运行的,负责循环接收各个 channel 发送的数据,根据不同的 channel 处理不同的业务逻辑。
4.6.3 小结
本节讲解了单例模式,以及在 Go 中如何实现单例模式。
在讲解广播器的具体实现时引出了一个很重要的知识点:goroutine 泄漏,详细讲解了各种可能泄露的场景,读者在实际项目中一定要注意。
至此,一个 WebSocket 聊天室就实现了,但功能相对比较简单。下节我们会实现聊天室的一些非核心功能。