3.8 拦截器介绍和实际使用
我想在每一个 RPC 方法的前面或后面做某些操作,我想针对某个业务模块的 RPC 方法进行统一的特殊处理,我想对 RPC 方法进行鉴权校验,我想对 RPC 方法进行上下文的超时控制,我想对每个 RPC 方法的请求都做日志记录,怎么做呢?
这诸如类似的一切需求的答案,都在本章节将要介绍的拦截器(Interceptor)上,你能够借助它实现许许多多的定制功能且不直接侵入业务代码。
3.8.1 拦截器的类型
在 gRPC 中,根据拦截器拦截的 RPC 调用的类型,拦截器在分类上可以分为如下两种:
- 一元拦截器(Unary Interceptor):拦截和处理一元 RPC 调用。
- 流拦截器(Stream Interceptor):拦截和处理流式 RPC 调用。
虽然总的来说是只有两种拦截器分类,但是再细分下去,客户端和服务端每一个都有其自己的一元和流拦截器的具体类型。因此,gRPC 中也可以说总共有四种不同类型的拦截器。
3.8.2 客户端和服务端拦截器
3.8.2.1 客户端
3.8.2.1.1 一元拦截器
客户端的一元拦截器类型为 UnaryClientInterceptor
,方法原型如下:
type UnaryClientInterceptor func(
ctx context.Context,
method string,
req,
reply interface{},
cc *ClientConn,
invoker UnaryInvoker,
opts ...CallOption,
) error
一元拦截器的实现通常可以分为三个部分:预处理,调用 RPC 方法和后处理。其一共分为七个参数,分别是:RPC 上下文、所调用的方法、RPC 方法的请求参数和响应结果,客户端连接句柄、所调用的 RPC 方法以及调用的配置。
3.8.2.1.2 流拦截器
客户端的流拦截器类型为 StreamClientInterceptor
,方法原型如下:
type StreamClientInterceptor func(
ctx context.Context,
desc *StreamDesc,
cc *ClientConn,
method string,
streamer Streamer,
opts ...CallOption,
) (ClientStream, error)
流拦截器的实现包括预处理和流操作拦截,并不能在事后进行 RPC 方法调用和后处理,而是拦截用户对流的操作。
3.8.2.2 服务端
3.8.2.2.1 一元拦截器
服务端的一元拦截器类型为 UnaryServerInterceptor
,方法原型如下:
type UnaryServerInterceptor func(
ctx context.Context,
req interface{},
info *UnaryServerInfo,
handler UnaryHandler,
) (resp interface{}, err error)
其一共包含四个参数,分别是 RPC 上下文、RPC 方法的请求参数、RPC 方法的所有信息、RPC 方法本身。
3.8.2.2.2 流拦截器
服务端的流拦截器类型为 StreamServerInterceptor
,方法原型如下:
type StreamServerInterceptor func(
srv interface{},
ss ServerStream,
info *StreamServerInfo,
handler StreamHandler,
) error
3.8.3 实现一个拦截器
在了解了 gRPC 拦截器的基本概念后,我们打开前面所实现的 RPC 服务,做一个简单的尝试,打开 main.go 文件,新增拦截器相关的代码:
...
func runGrpcServer() *grpc.Server {
opts := []grpc.ServerOption{
grpc.UnaryInterceptor(HelloInterceptor),
}
s := grpc.NewServer(opts...)
pb.RegisterTagServiceServer(s, server.NewTagServer())
reflection.Register(s)
return s
}
func HelloInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
log.Println("你好")
resp, err := handler(ctx, req)
log.Println("再见")
return resp, err
}
在上述代码中,我们除了实现一个简单的一元拦截器以外,还初次使用到了 grpc.ServerOption
,gRPC Server 的相关属性都可以在此设置,例如:credentials、keepalive 等等参数。服务端拦截器也在此注册,但是需要以指定的类型进行封装,例如一元拦截器是使用 grpc.UnaryInterceptor
。
在验证上,我们修改完毕后需要重新启动该服务,调用对应的 RPC 接口,查看控制台是否输出“你好”和“再见”两个字符串,若有则实现正确。
3.8.4 能使用多少个拦截器
既然我们实现了一个拦截器,那么在实际的应用程序中,肯定是不止一个了,按常规来讲,既然支持了一个,支持多个拦截器的注册和使用应该不过分吧,我们再来试试,代码如下:
func runGrpcServer() *grpc.Server {
opts := []grpc.ServerOption{
grpc.UnaryInterceptor(HelloInterceptor),
grpc.UnaryInterceptor(WorldInterceptor),
}
s := grpc.NewServer(opts...)
pb.RegisterTagServiceServer(s, server.NewTagServer())
reflection.Register(s)
return s
}
func HelloInterceptor(...) (interface{}, error) {
log.Println("你好,红烧煎鱼")
resp, err := handler(ctx, req)
log.Println("再见,红烧煎鱼")
return resp, err
}
func WorldInterceptor(...) (interface{}, error) {
log.Println("你好,清蒸煎鱼")
resp, err := handler(ctx, req)
log.Println("再见,清蒸煎鱼")
return resp, err
}
重新运行服务,查看输出结果,如下:
panic: The unary server interceptor was already set and may not be reset.
你会发现启动服务就报错了,会提示你“一元服务器拦截器已经设置,不能重置”,也就是一种类型的拦截器只允许设置一个。
3.8.5 真的需要多个拦截器
虽然 grpc-go 官方只允许设置一个拦截器,但不代表我们只能"用"一个拦截器。
在实际使用上,我们常常会希望将不同功能设计为不同的拦截器,这个时候,除了自己实现一套多拦截器的逻辑(拦截器中调拦截器即可)以外,我们还可以直接使用 gRPC 应用生态(grpc-ecosystem)中的 go-grpc-middleware 所提供的 grpc.UnaryInterceptor
和 grpc.StreamInterceptor
链式方法来达到这个目的。
3.8.5.1 安装
在项目根目录下执行如下安装命令:
$ go get -u github.com/grpc-ecosystem/go-grpc-middleware@v1.1.0
3.8.5.2 使用
修改 gRPC Sever 的相关代码,进行多拦截器的注册,如下:
func runGrpcServer() *grpc.Server {
opts := []grpc.ServerOption{
grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(
HelloInterceptor,
WorldInterceptor,
)),
}
s := grpc.NewServer(opts...)
pb.RegisterTagServiceServer(s, server.NewTagServer())
reflection.Register(s)
return s
}
在 grpc.UnaryInterceptor
中嵌套 grpc_middleware.ChainUnaryServer
后重新启动服务,查看输出结果:
你好,红烧煎鱼
你好,清蒸煎鱼
再见,清蒸煎鱼
再见,红烧煎鱼
两个拦截器的调用成功,完成常规多拦截器的需求。
3.8.5.3 是怎么实现的
单单会用还是不行的,go-grpc-middleware 它到底是怎么实现这个需求的呢,我们一起看看:
func ChainUnaryClient(interceptors ...grpc.UnaryClientInterceptor) grpc.UnaryClientInterceptor {
n := len(interceptors)
if n > 1 {
lastI := n - 1
return func(...) error {
var (
chainHandler grpc.UnaryInvoker
curI int
)
chainHandler = func(...) error {
if curI == lastI {
return invoker(...)
}
curI++
err := interceptors[curI](...)
curI--
return err
}
// 上述所省略的入参与该方法调用的入参一致
return interceptors[0](ctx, method, req, reply, cc, chainHandler, opts...)
}
}
...
}
当拦截器数量大于 1 时,从 interceptors[1]
开始递归,每一个递归的拦截器 interceptors[i]
会不断地执行,最后才会去去真正执行代表 RPC 方法的 handler
。
3.8.6 服务端-常用拦截器
在项目的运行中,常常会有那么一些应用拦截器,是必须要有的,因此我们可以总结出来一套简单的而行之有效的“公共”拦截器,在本节我们将模拟实际的使用场景进来实现。
我们在项目的 internal/middleware
目录下新建存储服务端拦截器的 server_interceptor.go 文件,另外后续的服务端拦截器的相关注册行为也均在 runGrpcServer 方法中进行处理,例如:
func runGrpcServer() *grpc.Server {
opts := []grpc.ServerOption{
grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(
middleware.XXXXX,
)),
}
...
}
3.8.6.1 日志
在应用程序的运行中,我们常常需要一些信息来协助我们做问题的排查和追踪,因此日志信息的及时记录和处理是非常必要的,接下来我们将针对常见的访问日志和错误日志进行日志输出。在实际使用的过程中,可以将案例中的默认日志实例替换为应用中实际在使用的文件日志的模式(例如参考第二章的日志器)。
3.8.6.1.1 访问日志
打开 server_interceptor.go 文件,新增针对访问记录的日志拦截器,代码如下:
func AccessLog(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
requestLog := "access request log: method: %s, begin_time: %d, request: %v"
beginTime := time.Now().Local().Unix()
log.Printf(requestLog, info.FullMethod, beginTime, req)
resp, err := handler(ctx, req)
responseLog := "access response log: method: %s, begin_time: %d, end_time: %d, response: %v"
endTime := time.Now().Local().Unix()
log.Printf(responseLog, info.FullMethod, beginTime, endTime, resp)
return resp, err
}
完成 AccessLog 拦截器的编写后,将其注册到 gRPC Server 中去,然后重新启动服务进行验证,在进行验证时,也就是调用 RPC 方法时,会输出两条日志记录。
这时候可能有的读者会疑惑,为什么要在该 RPC 方法执行的前后各输出一条类似但不完全一致的日志呢,这有什么用意,为什么不是直接在 RPC 方法执行完毕后输出一条就好了,会不会有些重复用工?
其实不然,如果仅仅只在 RPC 方法执行完毕后才输出、落地日志,那么我们可以来假设两个例子:
- 这个 RPC 方法在执行遇到了一些意外情况,执行了很久,也不知道什么时候返回(无其它措施的情况下)。
- 在执行过程中因极端情况出现了 OOM,RPC 方法未执行完毕,就被系统杀掉了。
这两个例子的情况可能会造成什么问题呢,一般来讲,会去看日志,基本是因为目前应用系统已经出现了问题,那么第一种情况,就非常常见,如果只打 RPC 方法执行完毕后的日志,单看日志,可能会压根就没有你所需要的访问日志,因为它还在执行中;而第二种情况,就根本上也没有达到完成。
那么从结果上来讲,日志的部分缺失有可能会导致你误判当前事故的原因,影响你的全链路˙追踪,需要花费更多的精力去排查。
3.8.6.1.2 错误日志
打开 server_interceptor.go 文件,新增普通错误记录的日志拦截器,代码如下:
func ErrorLog(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
resp, err := handler(ctx, req)
if err != nil {
errLog := "error log: method: %s, code: %v, message: %v, details: %v"
s := errcode.FromError(err)
log.Printf(errLog, info.FullMethod, s.Code(), s.Err().Error(), s.Details())
}
return resp, err
}
同之前的拦截器一样,编写后进行注册,该拦截器可以针对所有 RPC 方法的错误返回进行记录,便于对 error 级别的错误进行统一规管和观察。
3.8.6.2 异常捕获
接下来我们针对异常的捕获进行处理,在开始编写拦截器之前,我们尝试一下:
func (t *TagServer) GetTagList(ctx context.Context, r *pb.GetTagListRequest) (*pb.GetTagListReply, error) {
panic("测试抛出异常!")
...
}
在 gRPC Server 的 GetTagList 方法中,我们加入 panic
语句的调用,模拟抛出异常的情况,再重新运行服务,看看在没有加任何“防护”的情况下,会出现什么情况,如下:
$ go run main.go
access request log: method: /proto.TagService/GetTagList, begin_time: 159999999, request:
panic: 测试抛出异常!
goroutine 40 [running]:
github.com/go-programming-tour-book/tag-service/server.(*TagServer).GetTagList(0x1a4f728, 0x169af00, 0xc00016e720, 0xc00019a400, 0x1a4f728, 0xc0001c1790, 0x102eac1)
/Users/eddycjy/go-programming-tour-book/tag-service/server/tag.go:21 +0x39
github.com/go-programming-tour-book/tag-service/proto._TagService_GetTagList_Handler.func1(0x169af00, 0xc00016e720, 0x1594260, 0xc00019a400, 0xc0001c17a0, 0xc0001c1870, 0x10ec347, 0xc0000b2000)
/Users/eddycjy/go-programming-tour-book/tag-service/proto/tag.pb.go:265 +0x86
...
你没有看错,服务直接因为异常抛出中断了,这是一个非常糟糕的情况,意味着该服务无法提供响应了。为了解决这个问题,我们需要新增一个自定义的异常捕抓拦截器,打开 middleware.go 文件,新增如下代码:
func Recovery(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
defer func() {
if e := recover(); e != nil {
recoveryLog := "recovery log: method: %s, message: %v, stack: %s"
log.Printf(recoveryLog, info.FullMethod, e, string(debug.Stack()[:]))
}
}()
return handler(ctx, req)
}
同之前的拦截器一样,编写后进行注册,该拦截器可以针对所有 RPC 方法所抛出的异常进行捕抓和记录,确保不会因为未知的 panic
语句的执行导致整个服务中断,在实际项目的应用中,你可以根据公司内的可观察性的技术栈情况,进行一些定制化的处理,那么它就会更加的完善。
3.8.7 客户端-常用拦截器
我们在项目的 internal/middleware
目录下新建存储客户端拦截器的 client_interceptor.go 文件,针对一些常用场景编写一些客户端拦截器。另外后续的客户端拦截器相关注册行为是在调用 grpc.Dial
或 grpc.DialContext
前通过 DialOption
配置选项进行注册的,例如:
var opts []grpc.DialOption
opts = append(opts, grpc.WithUnaryInterceptor(
grpc_middleware.ChainUnaryClient(
middleware.XXXXX(),
),
))
opts = append(opts, grpc.WithStreamInterceptor(
grpc_middleware.ChainStreamClient(
middleware.XXXXX(),
),
))
clientConn, err := grpc.DialContext(ctx, target, opts...)
...
3.8.7.1 超时控制(上下文)
超时时间的设置和适当控制,是在微服务架构中非常重要的一个保全项。
我们假设一个应用场景,你有多个服务,他们分别是 A、B、C、D,他们之间是最简单的关联依赖,也就是 A=》B=》C=》D。在某一天,你有一个需求上线了,修改的代码内容正正好就是与服务 D 相关的,恰好这个需求就对应着一轮业务高峰的使用,但你发现不知道为什么,你的服务 A、B、C、D 全部都出现了响应缓慢,整体来看,开始出现应用系统雪崩….这到底是怎么了?
从根本上来讲,是服务 D 出现了问题,所导致的这一系列上下游服务出现连锁反应,因为在服务调用中默认没有设置超时时间,或者所设置的超时时间过长,都会导致多服务下的整个调用链雪崩,导致非常严重的事故,因此任何调用的默认超时时间的设置是非常有必要的,在 gRPC 中更是强调 TL;DR(Too long, Don’t read)并建议始终设定截止日期。
因此在本节我们将针对 RPC 的内部调用设置默认的超时控制,在 client_interceptor.go 文件下,新增如下代码:
func defaultContextTimeout(ctx context.Context) (context.Context, context.CancelFunc) {
var cancel context.CancelFunc
if _, ok := ctx.Deadline(); !ok {
defaultTimeout := 60 * time.Second
ctx, cancel = context.WithTimeout(ctx, defaultTimeout)
}
return ctx, cancel
}
在上述代码中,我们通过对传入的 context 调用 ctx.Deadline
方法进行检查,若未设置截止时间的话,其将会返回 false
,那么我们就会对其调用 context.WithTimeout
方法设置默认超时时间为 60 秒(该超时时间设置是针对整条调用链路的,若需要另外调整,可在应用代码中再自行调整)。
接下来我们分别对 gRPC 的一元调用和流式调用编写对应的客户端拦截器,如下:
func UnaryContextTimeout() grpc.UnaryClientInterceptor {
return func(ctx context.Context, method string, req, resp interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
ctx, cancel := defaultContextTimeout(ctx)
if cancel != nil {
defer cancel()
}
return invoker(ctx, method, req, resp, cc, opts...)
}
}
func StreamContextTimeout() grpc.StreamClientInterceptor {
return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
ctx, cancel := defaultContextTimeout(ctx)
if cancel != nil {
defer cancel()
}
return streamer(ctx, desc, cc, method, opts...)
}
}
在编写完拦截器后,在进行 RPC 内调前进行注册就可以生效了。
3.8.7.2 重试操作
在整体的服务运行中,偶尔会出现一些“奇奇怪怪”的网络波动、流量限制、服务器资源突现异常(但很快下滑),需要稍后访问的情况,这时候我们常常需要采用一些退避策略,稍作等待后进行二次重试,确保应用程序的最终成功,因此对于我们 gRPC 客户端来讲,一个基本的重试是必要的。如果没有定制化需求的话,我们可以直接采用 gRPC 生态圈中的 grpc_retry 拦截器实现基本的重试功能,如下:
var opts []grpc.DialOption
opts = append(opts, grpc.WithUnaryInterceptor(
grpc_middleware.ChainUnaryClient(
grpc_retry.UnaryClientInterceptor(
grpc_retry.WithMax(2),
grpc_retry.WithCodes(
codes.Unknown,
codes.Internal,
codes.DeadlineExceeded,
),
),
),
))
...
在上述 grpc_retry 拦截器中,我们设置了最大重试次数为 2 次,仅针对 gRPC 错误码为 Unknown、Internal、DeadlineExceeded 的情况。
这里需要注意的第一点是,它确定是否需要重试的维度是以错误码为标准,因此做服务重试的第一点,就是你需要在设计微服务的应用时,明确其状态码的规则,确保多服务的状态码的标准是一致的(可通过基础框架、公共代码库等方式落地),另外第二点是要尽可能的保证接口设计是幂等的,保证允许重试,也不会造成灾难性的问题,例如:重复扣库存。
3.8.8 实战演练
在刚刚的超时控制的拦截器中,我们完善了默认的超时控制,那我们的系统中,有没有类似的风险,那当然是有的,而且还是我们在 Go 语言编程中非常经典的问题。我们在实现 gRPC Server 的 GetTagList 方法时,数据源是来自第二章的博客后端应用(blog-service),如下:
func (t *TagServer) GetTagList(ctx context.Context, r *pb.GetTagListRequest) (*pb.GetTagListReply, error) {
api := bapi.NewAPI("http://127.0.0.1:8000")
body, err := api.GetTagList(ctx, r.GetName())
...
}
那么我们来模拟一下,假设这个博客后端的应用,出现了问题,假死,持续不返回,如下(我们通过休眠来模拟):
func (t Tag) List(c *gin.Context) {
time.Sleep(time.Hour)
...
}
我们打开第二章的博客后端应用,对“获取标签列表接口”新增长时间的休眠,接着调用 gRPC Server 的接口,例如:当你的 gRPC Server 起的是 8000 端口,则调用 http://127.0.0.1:8010/api/v1/tags
。
我们会看到 gRPC Server 中输出的访问日志如下:
access request log: method: /proto.TagService/GetTagList, begin_time: xxxxx, request:
// 没有下文,等了很久,response log 迟迟没有出现。
你会发现并没有响应结果,只输出了访问时间中的 request log,这时候你可以把这段请求一直挂着,你会发现,无论多久它都不会返回,直至休眠时间结束。
我们考虑一下实际场景,一般我们会用到 HTTP API,基本上都是因为依赖第三方接口。那假设这个第三方接口,出现了问题,也就是接口响应极度缓慢,甚至假死,没有任何响应。但是,你的应用是正常的,那么流量就会不断地打进你的应用中,这就会形成一个恶性循环,阻塞等待的协程会越来越多,开销越来越大,最终就会导致上游服务出现问题,那么你这个下游服务也会逐渐崩溃,最终形成连锁反应。
3.8.8.1 为什么
为什么一个休眠会带来那么大的问题呢,我们再看看 HTTP API SDK 的代码,如下:
func (a *API) httpGet(ctx context.Context, path string) ([]byte, error) {
resp, err := http.Get(fmt.Sprintf("%s/%s", a.URL, path))
...
}
我们默认使用的 http.Get
方法,其内部源码:
func Get(url string) (resp *Response, err error) {
return DefaultClient.Get(url)
}
实际上它使用的是标准库中预定的包全局变量 DefaultClient,而 DefaultClient 的 Timeout 的默认值是零值,相当于是 0,那么当 Timeout 值为 0 时,默认认为是没有任何超时时间限制的,也就是会无限等待,直至响应为止,这就是其出现问题的根本原因之一。
3.8.8.2 解决方案
那么针对现在这个问题,我们有至少两种解决方法,分别是自定义 HTTPClient,又或是通过我们的超时控制来解决这个问题,如下:
func (a *API) httpGet(ctx context.Context, path string) ([]byte, error) {
resp, err := ctxhttp.Get(ctx, http.DefaultClient, fmt.Sprintf("%s/%s", a.URL, path))
...
}
我们将 http.Get
方法修改为 ctxhttp.Get
方法,将上下文(ctx)传入到该方法中,那么它就会受到上下文的超时控制了。但是这种方法有一个前提,那就是客户端在调用时需要将超时控制的拦截器注册进去,如下:
func main() {
ctx := context.Background()
clientConn, err := GetClientConn(ctx, "tag-service", []grpc.DialOption{grpc.WithUnaryInterceptor(
grpc_middleware.ChainUnaryClient(middleware.UnaryContextTimeout()),
)})
...
}
再次进行验证,如下:
tagServiceClient.GetTagList err: rpc error: code = DeadlineExceeded desc = context deadline exceeded
exit status 1
在到达截止时间后,客户端将自动断开,提示 DeadlineExceeded
,那么从结果上来讲,当上游服务出现问题时,你当前服务再去调用它,也不会受到过多的影响,因为你通过超时时间进行了及时的止损,因此默认超时时间的设置和设置多少是非常有意义和考究的。
但是此时此刻,服务端本身可能还在无限的阻塞中,客户端断开的仅仅只是自己,因此服务端本身也建议设置默认的最大执行时间,以确保最大可用性和避免存在忘记设置超时控制的客户端所带来的最坏情况。
3.8.8.3 思考,如何发现
刚刚是由我告诉你结果反推出来的问题和解决方案,那假设是你自己,你又如何发现和解决呢?
最简单的方式有两种,分别是通过日志和链路追踪发现,假设是上述提到的问题,在我们所打的访问日志中,它只会返回 request log,而不会返回 response log。那如果是用分布式链路追踪系统,会非常明显的出现某个 Span 的调用链会耗时特别久,这就是一个危险的味道。更甚至可以通过对这些指标数据进行分析,当出现该类情况时,直接通过分析确定是否要报警、自愈,那将更妥当。
3.8.9 小结
在本章节中,我们先是介绍了 gRPC 中的拦截器类型和通过简单的示例对具体的使用方式进行说明和剖析。在介绍了拦截器的基础知识后,我们结合实际项目中的常见问题编写了一系列常用拦截器,让其为我们的应用保驾护航。
而关于链式拦截器上,也就是多拦截器的使用,我们推荐的是 go-grpc-middleware 的方案,不过从 grpc v1.28.0 起有热心的社区朋友贡献并合并了链式拦截器的相关方法(可参见 issues #935),大家可根据实际项目情况选用。