实现聊天室:广播器

4.6 实现聊天室:广播器

上一节介绍了聊天室的核心流程,其中多次提到了 Broadcaster,但没有过多涉及到其中的细节。本节我们详细介绍它的实现:广播器,这是聊天室的一个核心模块。

4.6.1 单例模式

Go 不是完全面向对象的语言,只支持部分面向对象的特性。面向对象中的单例模式是一个常见、简单的模式。前文提到,广播器中我们应用了单例模式,这里进行必要的讲解。

4.6.1.1 简介

英文名称:Singleton Pattern,该模式规定一个类只允许有一个实例,而且自行实例化并向整个系统提供这个实例。因此单例模式的要点有:1)只有一个实例;2)必须自行创建;3)必须自行向整个系统提供这个实例。

单例模式主要避免一个全局使用的类频繁地创建与销毁。当你想控制实例的数量,或有时候不允许存在多实例时,单例模式就派上用场了。

为了更好的讲解单例模式,我们先使用 Java 来描述它,之后回到 Go 中来。

image

通过该类图我们可以看出,实现一个单例模式有如下要求:

  • 私有、静态的类实例变量;
  • 构造函数私有化;
  • 静态工厂方法,返回此类的唯一实例;

根据实例化的时机,单例模式一般分成饿汉式和懒汉式。

  • 饿汉式:在定义 instance 时直接实例化,private static Singleton instance = new Singleton();
  • 懒汉式:在 getInstance 方法中进行实例化;

那两者有什么区别或优缺点?饿汉式单例类在自己被加载时就将自己实例化。即便加载器是静态的,饿汉式单例类被加载时仍会将自己实例化。单从资源利用率角度讲,这个比懒汉式单例类稍差些。从速度和反应时间角度讲,则比懒汉式单例类稍好些。然而,懒汉式单例类在实例化时,必须处理好在多个线程同时首次引用此类时的访问限制问题,特别是当单例类作为资源控制器在实例化时必须涉及资源初始化,而资源初始化很有可能耗费时间。这意味着出现多线程同时首次引用此类的几率变得较大。

4.6.1.2 单例模式的 Java 实现

结合上面的讲解,以一个计数器为例,我们看看 Java 中饿汉式的实现:

public class Singleton {
  private static final Singleton instance = new Singleton();
  private int count = 0;
  private Singleton() {}
  public static Singleton getInstance() {
    return instance;
  }
  public int Add() int {
    this.count++;
    return this.count;
  }
}

代码很简单,不过多解释。直接看懒汉式的实现:

public class Singleton {
  private static Singleton instance = null;
  private int count = 0;
  private Singleton() {}
  public static synchronized Singleton getInstance() {
    if (instance == null) {
      instance = new Singleton();
    }
    return instance;
  }
  public int Add() int {
    this.count++;
    return this.count;
  }
}

主要区别在于 getInstance 的实现,要注意 synchronized ,避免多线程时出现问题。

4.6.1.3 单例模式的 Go 实现

回到 Go 语言,看看 Go 语言如何实现单例。

// 饿汉式单例模式
package singleton

type singleton struct {
  count int
}

var Instance = new(singleton)

func (s *singleton) Add() int {
  s.count++
  return s.count
}

前面说了,Go 只支持部分面向对象的特性,因此看起来有点不太一样:

  • 类(结构体 singleton)本身非公开(小写字母开头,非导出);
  • 没有提供导出的 GetInstance 工厂方法(Go 没有静态方法),而是直接提供包级导出变量 Instance;

这样使用:

c := singleton.Instance.Add()

看看懒汉式单例模式在 Go 中如何实现:

// 懒汉式单例模式
package singleton

import (
	"sync"
)

type singleton struct {
  count int
}

var (
  instance *singleton
  mutex sync.Mutex
)

func New() *singleton {
  mutex.Lock()
  if instance == nil {
    instance = new(singleton)
  }
  mutex.Unlock()
  
  return instance
}

func (s *singleton) Add() int {
  s.count++
  return s.count
}

代码多了不少:

  • 包级变量变成非导出(instance),注意这里类型应该用指针,因为结构体的默认值不是 nil;
  • 提供了工厂方法,按照 Go 的惯例,我们命名为 New();
  • 多 goroutine 保护,对应 Java 的 synchronized,Go 使用 sync.Mutex;

关于懒汉式有一个“双重检查”,这是 C 语言的一种代码模式。

在上面 New() 函数中,同步化(锁保护)实际上只在 instance 变量第一次被赋值之前才有用。在 instance 变量有了值之后,同步化实际上变成了一个不必要的瓶颈。如果能够有一个方法去掉这个小小的额外开销,不是更加完美吗?因此出现了“双重检查”。看看 Go 如何实现“双重检查”,只看 New() 代码:

func New() *singleton {
  if instance == nil {	// 第一次检查(①)
    // 这里可能有多于一个 goroutine 同时达到(②)
    mutex.Lock()
    // 这里每个时刻只会有一个 goroutine(③)
    if instance == nil {	// 第二次检查(④)
      instance = new(singleton)
    }
    mutex.Unlock()
  }
  
  return instance
}

有读者可能看不懂上面代码的意思,这里详细解释下。假设 goroutine X 和 Y 作为第一批调用者同时或几乎同时调用 New 函数。

  1. 因为 goroutine X 和 Y 是第一批调用者,因此,当它们进入此函数时,instance 变量是 nil。因此 goroutine X 和 Y 会同时或几乎同时到达位置 ①;
  2. 假设 goroutine X 会先达到位置 ②,并进入 mutex.Lock() 达到位置 ③。这时,由于 mutex.Lock 的同步限制,goroutine Y 无法到达位置 ③,而只能在位置 ② 等候;
  3. goroutine X 执行 instance = new(singleton) 语句,使得 instance 变量得到一个值,即对 singleton 实例的引用。此时,goroutine Y 只能继续在位置 ② 等候;
  4. goroutine X 释放锁,返回 instance,退出 New 函数;
  5. goroutine Y 进入 mutex.Lock(),到达位置 ③,进而到达位置 ④。由于 instance 变量已经不是 nil,因此 goroutine Y 释放锁,返回 instance 所引用的 singleton 实例(也就是 goroutine X 锁创建的 singleton 实例),退出 New 函数;

到这里,goroutine X 和 Y 得到了同一个 singleton 实例。可见上面的 New 函数中,锁仅用来避免多个 goroutine 同时实例化 singleton。

相比前面的版本,双重检查版本,只要 instance 实例化后,锁永远不会执行了,而前面版本每次调用 New 获取实例都需要执行锁。性能很显然,我们可以基准测试来验证:(双重检查版本 New 重命名为 New2)

package singleton_test

import (
	"testing"

	"github.com/go-programming-tour-book/go-demo/singleton"
)

func BenchmarkNew(b *testing.B) {
	for i := 0; i < b.N; i++ {
		singleton.New()
	}
}

func BenchmarkNew2(b *testing.B) {
	for i := 0; i < b.N; i++ {
		singleton.New2()
	}
}

因为是单例,所以两个基准测试需要分别执行。

New1 的结果:

$ go test -benchmem -bench ^BenchmarkNew$ github.com/go-programming-tour-book/go-demo/singleton
goos: darwin
goarch: amd64
pkg: github.com/go-programming-tour-book/go-demo/singleton
BenchmarkNew-8   	80470467	        14.0 ns/op	       0 B/op	       0 allocs/op
PASS
ok  	github.com/go-programming-tour-book/go-demo/singleton	1.151s

New2 的结果:

$ go test -benchmem -bench ^BenchmarkNew2$ github.com/go-programming-tour-book/go-demo/singleton
goos: darwin
goarch: amd64
pkg: github.com/go-programming-tour-book/go-demo/singleton
BenchmarkNew2-8   	658810392	         1.80 ns/op	       0 B/op	       0 allocs/op
PASS
ok  	github.com/go-programming-tour-book/go-demo/singleton	1.380s

New2 快十几倍。

Go 语言单例模式,推荐一般优先考虑使用饿汉式。

4.6.2 广播器的实现

本章第 6 节我们看过广播器结构的定义:

// broadcaster 广播器
type broadcaster struct {
	// 所有聊天室用户
	users map[string]*User

	// 所有 channel 统一管理,可以避免外部乱用

	enteringChannel chan *User
	leavingChannel  chan *User
	messageChannel  chan *Message

	// 判断该昵称用户是否可进入聊天室(重复与否):true 能,false 不能
	checkUserChannel      chan string
	checkUserCanInChannel chan bool
}

很显然,广播器全局应该只有一个,所以是典型的单例。我们使用饿汉式实现。

var Broadcaster = &broadcaster{
	users: make(map[string]*User),

	enteringChannel: make(chan *User),
	leavingChannel:  make(chan *User),
	messageChannel:  make(chan *Message, MessageQueueLen),

	checkUserChannel:      make(chan string),
	checkUserCanInChannel: make(chan bool),
}

导出的 Broadcaster 代表广播器的唯一实例,通过 logic.Broadcaster 来使用这个单例。

在本章第 4 节时提到了通过如下语句启动广播器:

go logic.Broadcaster.Start()

现在看看 Start 的具体实现:

// logic/broadcast.go

// Start 启动广播器
// 需要在一个新 goroutine 中运行,因为它不会返回
func (b *broadcaster) Start() {
	for {
		select {
		case user := <-b.enteringChannel:
			// 新用户进入
			b.users[user.NickName] = user

			b.sendUserList()
		case user := <-b.leavingChannel:
			// 用户离开
			delete(b.users, user.NickName)
			// 避免 goroutine 泄露
			user.CloseMessageChannel()

			b.sendUserList()
		case msg := <-b.messageChannel:
			// 给所有在线用户发送消息
			for _, user := range b.users {
				if user.UID == msg.User.UID {
					continue
				}
				user.MessageChannel <- msg
			}
		case nickname := <-b.checkUserChannel:
			if _, ok := b.users[nickname]; ok {
				b.checkUserCanInChannel <- false
			} else {
				b.checkUserCanInChannel <- true
			}
		}
	}
}

核心关注的知识点:

  • 需要在一个新 goroutine 中进行,因为它不会返回。注意这里并非说,只要不会返回的函数/方法就应该在新的 goroutine 中运行,虽然大部分情况是这样;
  • Go 有一个最佳实践:应该让调用者决定并发(启动新 goroutine),这样它清楚自己在干什么。Start 的设计遵循了这一实践,没有自己内部开启新的 goroutine;
  • for + select 形式,是 Go 中一种较常用的编程模式,可以不断监听各种 channel 的状态,有点类似 Unix 系统的 select 系统调用;
  • 每新开一个 goroutine,你必须知道它什么时候会停止。这一句 user.CloseMessageChannel() 就涉及到 goroutine 的停止,避免泄露;

4.6.2.1 select-case 结构

Go 中有一个专门为 channel 设计的 select-case 分支流程控制语法。 此语法和 switch-case 分支流程控制语法很相似。 比如,select-case 流程控制代码块中也可以有若干 case 分支和最多一个 default 分支。 但是,这两种流程控制也有很多不同点。在一个 select-case 流程控制中:

  • select 关键字和 { 之间不允许存在任何表达式和语句;
  • fallthrough 语句不能使用;
  • 每个 case 关键字后必须跟随一个 channel 接收数据操作或者一个 channel 发送数据操作,所以叫做专门为 channel 设计的;
  • 所有的非阻塞 case 操作中将有一个被随机选择执行(而不是按照从上到下的顺序),然后执行此操作对应的 case 分支代码块;
  • 在所有的 case 操作均阻塞的情况下,如果 default 分支存在,则 default 分支代码块将得到执行; 否则,当前 goroutine 进入阻塞状态;

所以,广播器的 Start 方法中,当所有 case 操作都阻塞时,Start 方法所在的 goroutine 进入阻塞状态。

另外,根据以上规则,一个不含任何分支的 select-case 代码块 select{} 将使当前 goroutine 处于永久阻塞状态,这可以用于一些服务开发中,如果你见到了 select{} 这样的写法不要惊讶了。比如:

func main() {
  go func() {
    // 该函数不会退出
    for {
      // 省略代码
    }
  } ()
  
  select {}
}

这样保证 main goroutine 永远阻塞,让其他 goroutine 运行。但如果除了当前因为 select{} 阻塞的 goroutine 外,没有其他可运行的 goroutine,会导致死锁。因此下面的代码会死锁:

func main() {
  select {}
}

运行报错:

fatal error: all goroutines are asleep - deadlock!

4.6.2.2 goroutine 泄露

在 Go 中,goroutine 的创建成本低廉且调度效率高。Go 运行时能很好的支持具有成千上万个 goroutine 的程序运行,数十万个也并不意外。但是,goroutine 在内存占用方面却需要谨慎,内存资源是有限的,因此你不能创建无限的 goroutine。

每当你在程序中使用 go 关键字启动 goroutine 时,你必须知道该 goroutine 将在何时何地退出。如果你不知道答案,那可能会内存泄漏。

我们回过头梳理下聊天室项目有哪些新启动的 goroutine。

1)启动广播器

// 广播消息处理
go logic.Broadcaster.Start()

我们很清楚,该广播器的生命周期是和程序生命周期一致的,因此它不应该结束。

2)负责给用户发送消息的 goroutine

在 WebSocketHandleFunc 函数中:

// 2. 开启给用户发送消息的 goroutine
go user.SendMessage(req.Context())

user.SendMessage 的具体实现是:

func (u *User) SendMessage(ctx context.Context) {
	for msg := range u.MessageChannel {
		wsjson.Write(ctx, u.conn, msg)
	}
}

根据 for-range 用于 channel 的语法,默认情况下,for-range 不会退出。很显然,如果我们不做特殊处理,这里的 goroutine 会一直存在。而实际上,当用户离开聊天室时,它对应连接的写 goroutine 应该终止。这也就是上面 Start 方法中,在用户离开聊天室的 channel 收到消息时,要将用户的 MessageChannel 关闭的原因。MessageChannel 关闭了,for msg := range u.MessageChannel 就会退出循环,goroutine 结束,避免了内存泄露。

3)库开启的 goroutine

在本章第 1 节,我们用 TCP 实现简单聊天室时,每一个用户到来,都会新开启一个 goroutine 服务该用户。在我们的 WebSocket 聊天室中,这个新开启 goroutine 的动作,由库给我们做了(具体是 net/http 库)。也许你不明白为什么是 http 库开启的,这里教大家一个思考思路。

一个程序能够长时间运行而不停止,肯定是程序里有死循环。在本章第 1 节中,我们自己写了一个死循环:

func main() {
	listener, err := net.Listen("tcp", ":2020")
	if err != nil {
		panic(err)
	}

	go broadcaster()

	for {
		conn, err := listener.Accept()
		if err != nil {
			log.Println(err)
			continue
		}

		go handleConn(conn)
	}
}

那 WebSocket 版本的聊天室的死循环在哪里呢?回到 cmd/chatroom/main.go :

func main() {
	fmt.Printf(banner, addr)

	server.RegisterHandle()

	log.Fatal(http.ListenAndServe(addr, nil))
}

很显然在不出错时,http.ListenAndServe(addr, nil) 函数调用不会返回。因为 HTTP 协议基于 TCP 协议,因此 http 库中肯定存在类似我们上面实现 tcp 聊天室时的死循环代码。

通过跟踪 http.ListenAndServe -> Server.ListenAndServe,我们找到了如下代码:

func (srv *Server) ListenAndServe() error {
	if srv.shuttingDown() {
		return ErrServerClosed
	}
	addr := srv.Addr
	if addr == "" {
		addr = ":http"
	}
	ln, err := net.Listen("tcp", addr)
	if err != nil {
		return err
	}
	return srv.Serve(ln)
}

这一句 ln, err := net.Listen(“tcp”, addr) 和我们自己实现时一样。接着看 Server.Serve 方法,看到 for 死循环了(只保留关键代码):

func (srv *Server) Serve(l net.Listener) error {
	...
	origListener := l
	l = &onceCloseListener{Listener: l}
	defer l.Close()
  ...
	for {
		rw, e := l.Accept()
		if e != nil {
			select {
			case <-srv.getDoneChan():
				return ErrServerClosed
			default:
			}
    ...
		tempDelay = 0
		c := srv.newConn(rw)
		c.setState(c.rwc, StateNew) // before Serve can return
		go c.serve(ctx)
	}
}

在这个死循环的最后一句:go c.serve(ctx) ,即当有新客户端连接时,开启一个新 goroutine 为其服务。最终,根据我们定义的路由,进入相应的函数进行处理。

那由 net/http 开启的 goroutine 什么时候结束呢?根据上面的分析,该 goroutine 最终会执行到我们定义的路由处理器中。所以,当我们的处理函数返回后,该 goroutine 也就结束了。因此,我们要确保 WebSocketHandleFunc 函数是有可能返回的。通过上一节的分析知道,当用户退出聊天室或其他原因导致连接断开时,User.ReceiveMessage 中的循环都会结束,函数退出。

总结一下容易导致 goroutine 或内存泄露的场景

1)time.After

这是很多人实际遇到过的内存泄露场景。如下代码:

func ProcessMessage(ctx context.Context, in <-chan string) {
	for {
		select {
		case s, ok := <-in:
			if !ok {
				return
			}
			// handle `s`
		case <-time.After(5 * time.Minute):
			// do something
		case <-ctx.Done():
			return
		}
	}
}

在标准库 time.After 的文档中有一段说明:

等待持续时间过去,然后在返回的 channel 上发送当前时间。它等效于 NewTimer().C。在计时器触发之前,计时器不会被垃圾收集器回收。

所以,如果还没有到 5 分钟,该函数返回了,计时器就不会被 GC 回收,因此出现了内存泄露。因此大家使用 time.After 时一定要仔细,一般建议不用它,而是使用 time.NewTimer:

func ProcessMessage(ctx context.Context, in <-chan string) {
	idleDuration := 5 * time.Minute
	idleDelay := time.NewTimer(idleDuration)
  // 这句必须的
	defer idleDelay.Stop()
	for {
		idleDelay.Reset(idleDuration)
		select {
		case s, ok := <-in:
			if !ok {
				return
			}
			// handle `s`
		case <-idleDelay.C:
			// do something
		case <-ctx.Done():
			return
		}
	}
}

2)发送到 channel 阻塞导致 goroutine 泄露

假如存在如下的程序:

func process(term string) error {
     // 创建一个在 100 ms 内取消的 context
     ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
     defer cancel()

     // 为 goroutine 创建一个传递结果的 channel
     ch := make(chan string)

     // 启动一个 goroutine 来寻找记录,然后得到结果
     // 并将返回值从 channel 中传回
     go func() {
         ch <- search(term)
     }()

     select {
     case <-ctx.Done():
         return errors.New("search canceled")
     case result := <-ch:
         fmt.Println("Received:", result)
         return nil
    }
 }

// search 模拟成一个查找记录的函数
// 在查找记录时。执行此工作需要 200 ms。
func search(term string) string {
     time.Sleep(200 * time.Millisecond)
     return "some value"
}

这是一个挺常见的场景:要进行一些耗时操作,因此开启一个 goroutine 进行处理,它的处理结果,通过 channel 回传给原来的 goroutine;同时,这个耗时操作不能太长,因此有了 WithTimeout Context。最后通过 select-case 来监控 ctx.Done 和传递数据的 channel 是否就绪。

如果超时没处理完,ctx.Done 会执行,函数返回,新开启的 goroutine 会因为 channel 中的另一端没有就绪的接收 goroutine 而一直阻塞,导致 goroutine 泄露。

解决这种因为发送到 channel 阻塞导致 goroutine 泄露的简单办法是将 channel 改为有缓冲的 channel,并保证容量充足。比如上面例子,将 ch 改为:ch := make(chan string, 1) 即可。

3)从 channel 接收阻塞导致 goroutine 泄露

我们聊天室可能导致 goroutine 泄露就属于这种情况。

func (u *User) SendMessage(ctx context.Context) {
	for msg := range u.MessageChannel {
		wsjson.Write(ctx, u.conn, msg)
	}
}

for-range 循环直到 MessageChannel 这个 channel 关闭才会结束,因此需要有地方调用 close(u.MessageChannel)。

这种情况的另一种情形是:虽然没有 for-range,但给 channel 发送数据的一方已经不再发送数据了,接收的一方还在等待,这个等待会无限持续下去。唯一能取消它等待的就是 close 这个 channel。

4.6.2.3 广播器和外界的通信

从广播器的结构定义知道,它和其他 goroutine 的通信通过 channel 进行。判断用户是否存在的方式前面讲解了,这里看用户进入、离开和消息的通信。

func(b *broadcaster) UserEntering(u *User) {
	b.enteringChannel <- u
}

func(b *broadcaster) UserLeaving(u *User) {
	b.leavingChannel <- u
}

func(b *broadcaster) Broadcast(msg *Message) {
	b.messageChannel <- msg
}

通过 channel 和其他 goroutine 通信,可以有几种方式,以用户进入聊天室为例。

方式一:

在 broadcast.go 中定义导出的 channel:var EnteringChannel = make(chan *User) 或者还是作为 broadcaster 的字段,但是导出的,各个 goroutine 都可以直接对 EnteringChannel 进行读写。这种方式显然不好,用面向对象说法,封装性不好,容易被乱用。

方式二:

broadcaster 结构和现在不变,通过方法将 enteringChannel 暴露出去:

func (b *broadcaster) EnteringChannel() chan<- *User {
	return b.enteringChannel
}

前面讲过单向 channel,该方法的返回值类型:chan<- *User 就是一个单向 channel,它是只写的(only send channel),这限制了外部 goroutine 使用它的方式:只能往 channel 写数据,读取由我自己负责。

使用方式:logic.Broadcaster.EnteringChannel() <- user 。

整体上这种方式没有大问题,只是使用方式有点别扭。

方式三:

这种方式就是我们目前采用的方式,对外完全隐藏 channel,调用方不需要知道有 channel 的存在,只是感觉在做普通的方法调用。channel 的处理由内部自己处理,保证了安全性。这种方式比较优雅。

User 中的 MessageChannel 我们没有采用这种方式,而是使用了方式一,让大家感受一下两种方式的不同。读者可以试着改为方式三的形式。

回到 Start 的循环中,这是在 broadcaster goroutine 中运行的,负责循环接收各个 channel 发送的数据,根据不同的 channel 处理不同的业务逻辑。

4.6.3 小结

本节讲解了单例模式,以及在 Go 中如何实现单例模式。

在讲解广播器的具体实现时引出了一个很重要的知识点:goroutine 泄漏,详细讲解了各种可能泄露的场景,读者在实际项目中一定要注意。

至此,一个 WebSocket 聊天室就实现了,但功能相对比较简单。下节我们会实现聊天室的一些非核心功能。



本图书由 煎鱼 ©2020 版权所有,所有文章采用知识署名-非商业性使用-禁止演绎 4.0 国际进行许可。