应用中间件

2.9 应用中间件

完成了接口的访问控制后,心中的一块大石终于落地了,你在开发服务器上将这个项目运行了起来,等着另外一位同事和你对接你所编写的后端接口后便愉快的先下班了。

但结果第二天你一来,该同事非常苦恼的和你说,你的接口,怎么调一下就出问题了,你大为震惊,详细的咨询了是几时调用的接口,调用的接口是哪个,入参又是什么?这时候更无奈的问题出现了,该同事只记得好像大概是晚上 9 点多,入参忘记记录了,它的调试工具上也是密密麻麻的访问记录,根本就分不清楚是哪一条入参记录,它只隐隐约约的记得是某一个接口。

这时候的你,想着去开发服务器上看看访问情况,结果发现,你默认使用的是 gin 的 Logging 和 Recovery,也就是在控制台上输出一些访问和异常记录,但很尴尬的是,它并没有成功记录到你所需要的一些数据,这样子你就无法及时的进行复现和响应,更别说现在还没进行多服务间内部调用和压力测试了。

以上问题,在一个项目的雏形初期很常见,实际上针对不同的环境,我们应该进行一些特殊的调整,而往往这些都是有规律可依的,一些常用的应用中间件就可以妥善的解决这些问题,接下来在这篇文章中我们去编写一些在项目中比较常见的应用中间件。

2.9.1 访问日志记录

在出问题时,我们常常会需要去查日志,那么除了查错误日志、业务日志以外,还有一个很重要的日志类别,就是访问日志,从功能上来讲,它最基本的会记录每一次请求的请求方法、方法调用开始时间、方法调用结束时间、方法响应结果、方法响应结果状态码,更进一步的话,会记录 RequestId、TraceId、SpanId 等等附加属性,以此来达到日志链路追踪的效果,如下图:

image

但是在正式开始前,你又会遇到一个问题,你没办法非常直接的获取到方法所返回的响应主体,这时候我们需要巧妙利用 Go interface 的特性,实际上在写入流时,调用的是 http.ResponseWriter,如下:

type ResponseWriter interface {
	Header() Header
	Write([]byte) (int, error)
	WriteHeader(statusCode int)
}

那么我们只需要写一个针对访问日志的 Writer 结构体,实现我们特定的 Write 方法就可以解决无法直接取到方法响应主体的问题了。我们打开项目目录 internal/middleware 并创建 access_log.go 文件,写入如下代码:

type AccessLogWriter struct {
	gin.ResponseWriter
	body *bytes.Buffer
}

func (w AccessLogWriter) Write(p []byte) (int, error) {
	if n, err := w.body.Write(p); err != nil {
		return n, err
	}
	return w.ResponseWriter.Write(p)
}

我们在 AccessLogWriter 的 Write 方法中,实现了双写,因此我们可以直接通过 AccessLogWriter 的 body 取到值,接下来我们继续编写访问日志的中间件,写入如下代码:

func AccessLog() gin.HandlerFunc {
	return func(c *gin.Context) {
		bodyWriter := &AccessLogWriter{body: bytes.NewBufferString(""), ResponseWriter: c.Writer}
		c.Writer = bodyWriter

		beginTime := time.Now().Unix()
		c.Next()
		endTime := time.Now().Unix()

		fields := logger.Fields{
			"request":  c.Request.PostForm.Encode(),
			"response": bodyWriter.body.String(),
		}
		global.Logger.WithFields(fields).Infof("access log: method: %s, status_code: %d, begin_time: %d, end_time: %d",
			c.Request.Method,
			bodyWriter.Status(),
			beginTime,
			endTime,
		)
	}
}

在 AccessLog 方法中,我们初始化了 AccessLogWriter,将其赋予给当前的 Writer 写入流(可理解为替换原有),并且通过指定方法得到我们所需的日志属性,最终写入到我们的日志中去,其中涉及到了如下信息:

  • method:当前的调用方法。
  • request:当前的请求参数。
  • response:当前的请求结果响应主体。
  • status_code:当前的响应结果状态码。
  • begin_time/end_time:调用方法的开始时间,调用方法结束的结束时间。

2.9.2 异常捕获处理

在异常造成的恐慌发生时,你一定不在现场,因为你不能随时随地的盯着控制台,在常规手段下你也不知道它几时有可能发生,因此对于异常的捕获和及时的告警通知是非常重要的,而发现这些可能性的手段有非常多,我们本次采取的是最简单的捕获和告警通知,如下图:

image

2.9.2.1 自定义 Recovery

在前文中我们看到 gin 本身已经自带了一个 Recovery 中间件,但是在项目中,我们需要针对我们的公司内部情况或生态圈定制 Recovery 中间件,确保异常在被正常捕抓之余,要及时的被识别和处理,因此自定义一个 Recovery 中间件是非常有必要的,如下:

func Recovery() gin.HandlerFunc {
	return func(c *gin.Context) {
		defer func() {
			if err := recover(); err != nil {
				global.Logger.WithCallersFrames().Errorf("panic recover err: %v", err)
				app.NewResponse(c).ToErrorResponse(errcode.ServerError)
				c.Abort()
			}
		}()
		c.Next()
	}
}

2.9.2.2 邮件报警处理

另外我们在实现 Recovery 的同时,需要实现一个简单的邮件报警功能,确保出现 Panic 后,在捕抓之余能够通过邮件报警来及时的通知到对应的负责人。

2.9.2.2.1 安装

首先在项目目录下执行安装命令,如下:

go get -u gopkg.in/gomail.v2

Gomail 是一个用于发送电子邮件的简单又高效的第三方开源库,目前只支持使用 SMTP 服务器发送电子邮件,但是其 API 较为灵活,如果有其它的定制需求也可以轻易地借助其实现,这恰恰好符合我们的需求,因为目前我们只需要一个小而美的发送电子邮件的库就可以了。

2.9.2.2.2 邮件工具库

在项目目录 pkg 下新建 email 目录并创建 email.go 文件,我们需要针对发送电子邮件的行为进行一些封装,写入如下代码:

type Email struct {
	*SMTPInfo
}

type SMTPInfo struct {
	Host     string
	Port     int
	IsSSL    bool
	UserName string
	Password string
	From     string
}

func NewEmail(info *SMTPInfo) *Email {
	return &Email{SMTPInfo: info}
}

func (e *Email) SendMail(to []string, subject, body string) error {
	m := gomail.NewMessage()
	m.SetHeader("From", e.From)
	m.SetHeader("To", to...)
	m.SetHeader("Subject", subject)
	m.SetBody("text/html", body)

	dialer := gomail.NewDialer(e.Host, e.Port, e.UserName, e.Password)
	dialer.TLSConfig = &tls.Config{InsecureSkipVerify: e.IsSSL}
	return dialer.DialAndSend(m)
}

在上述代码中,我们定义了 SMTPInfo 结构体用于传递发送邮箱所必需的信息,而在 SendMail 方法中,我们首先调用 NewMessage 方法创建一个消息实例,可以用于设置邮件的一些必要信息,分别是:

  • 发件人(From)

  • 收件人(To)

  • 邮件主题(Subject)

  • 邮件正文(Body)

在完成消息实例的基本信息设置后,调用 NewDialer 方法创建一个新的 SMTP 拨号实例,设置对应的拨号信息用于连接 SMTP 服务器,最后再调用 DialAndSend 方法打开与 SMTP 服务器的连接并发送电子邮件。

2.9.2.2.3 初始化配置信息

本次要做的发送电子邮件的行为,实际上你可以理解是与一个 SMTP 服务进行交互,那么除了自建 SMTP 服务器以外,我们可以使用目前市面上常见的邮件提供商,它们也是有提供 SMTP 服务的,首先我们打开项目的配置文件 config.yaml,新增如下 Email 的配置项:

Email:
  Host: smtp.qq.com
  Port: 465
  UserName: xxxx@qq.com
  Password: xxxxxxxx
  IsSSL: true
  From: xxxx@qq.com
  To:
    - xxxx@qq.com

通过 HOST 我们可以知道我用的是 QQ 邮件的 SMTP,这个只需要在”QQ 邮箱-设置-账户-POP3/IMAP/SMTP/Exchange/CardDAV/CalDAV 服务“选项中将”POP3/SMTP 服务“和”IMAP/SMTP 服务“开启,然后根据所获取的 SMTP 账户密码进行设置即可,另外 SSL 是默认开启的。

另外需要特别提醒的一点是,我们所填写的 SMTP Server 的 HOST 端口号是 465,而常用的另外一类还有 25 端口号 ,但我强烈不建议使用 25,你应当切换为 465,因为 25 端口号在云服务厂商上是一个经常被默认封禁的端口号,并且不可解封,使用 25 端口,你很有可能会遇到部署进云服务环境后告警邮件无法正常发送出去的问题。

接下来我们在项目目录 pkg/setting 的 section.go 文件中,新增对应的 Email 配置项,如下:

type EmailSettingS struct {
	Host     string
	Port     int
	UserName string
	Password string
	IsSSL    bool
	From     string
	To       []string
}

并在在项目目录 global 的 setting.go 文件中,新增 Email 对应的配置全局对象,如下:

var (
	...
	EmailSetting    *setting.EmailSettingS
	...
)

最后就是在项目根目录的 main.go 文件的 setupSetting 方法中,新增 Email 配置项的读取和映射,如下:

func setupSetting() error {
	...
	err = s.ReadSection("Email", &global.EmailSetting)
	if err != nil {
		return err
	}
	...
}

2.9.2.3 编写中间件

我们打开项目目录 internal/middleware 并创建 recovery.go 文件,写入如下代码:

func Recovery() gin.HandlerFunc {
	defailtMailer := email.NewEmail(&email.SMTPInfo{
		Host:     global.EmailSetting.Host,
		Port:     global.EmailSetting.Port,
		IsSSL:    global.EmailSetting.IsSSL,
		UserName: global.EmailSetting.UserName,
		Password: global.EmailSetting.Password,
		From:     global.EmailSetting.From,
	})
	return func(c *gin.Context) {
		defer func() {
			if err := recover(); err != nil {
				global.Logger.WithCallersFrames().Errorf("panic recover err: %v", err)

				err := defailtMailer.SendMail(
					global.EmailSetting.To,
					fmt.Sprintf("异常抛出,发生时间: %d", time.Now().Unix()),
					fmt.Sprintf("错误信息: %v", err),
				)
				if err != nil {
					global.Logger.Panicf("mail.SendMail err: %v", err)
				}

				app.NewResponse(c).ToErrorResponse(errcode.ServerError)
				c.Abort()
			}
		}()
		c.Next()
	}
}

在本项目中,我们的 Mailer 是固定的,因此我们直接将其定义为了 defailtMailer,接着在捕获到异常后调用 SendMail 方法进行预警邮件发送,效果如下:

image

这里具体的邮件模板你可以根据实际情况进行定制。

2.9.3 服务信息存储

平时我们经常会需要在进程内上下文设置一些内部信息,例如是应用名称和应用版本号这类基本信息,也可以是业务属性的信息存储,例如是根据不同的租户号获取不同的数据库实例对象,这时候就需要有一个统一的地方处理,如下图:

image

我们打开项目下的 internal/middleware 目录并新建 app_info.go 文件,写入如下代码:

func AppInfo() gin.HandlerFunc {
	return func(c *gin.Context) {
		c.Set("app_name", "blog-service")
		c.Set("app_version", "1.0.0")
		c.Next()
	}
}

在上述代码中我们就需要用到 gin.Context 所提供的 setter 和 getter,在 gin 中称为元数据管理(Metadata Management),大致如下:

func (c *Context) Set(key string, value interface{}) {
	if c.Keys == nil {
		c.Keys = make(map[string]interface{})
	}
	c.Keys[key] = value
}

func (c *Context) Get(key string) (value interface{}, exists bool) {
	value, exists = c.Keys[key]
	return
}

func (c *Context) MustGet(key string) interface{} {...}
func (c *Context) GetString(key string) (s string) {...}
func (c *Context) GetBool(key string) (b bool) {...}
func (c *Context) GetInt(key string) (i int) {...}
func (c *Context) GetInt64(key string) (i64 int64) {...}
func (c *Context) GetFloat64(key string) (f64 float64) {...}
func (c *Context) GetTime(key string) (t time.Time) {...}
func (c *Context) GetDuration(key string) (d time.Duration) {...}
func (c *Context) GetStringSlice(key string) (ss []string) {...}
func (c *Context) GetStringMap(key string) (sm map[string]interface{}) {...}
func (c *Context) GetStringMapString(key string) (sms map[string]string) {...}
func (c *Context) GetStringMapStringSlice(key string) (smss map[string][]string) {...}

实际上我们可以看到在 gin 中的 metadata,其实就是利用内部实现的 gin.Context 中的 Keys 进行存储的,并配套了多种类型的获取和设置方法,相当的方便。另外我们可以注意到在默认的 Get 和 Set 方法中,传入和返回的都是 interface 类型,实际在业务属性的初始化逻辑处理中,我们可以通过对返回的 interface 进行类型断言,就可以获取到我们所需要的类型了。

2.9.4 接口限流控制

在应用程序的运行过程中,会不断地有新的客户端进行访问,而有时候会突然出现流量高峰(例如:营销活动),如果不及时进行削峰,资源整体又跟不上,那就很有可能会造成事故,因此我们常常会才有多种手段进行限流削峰,而针对应用接口进行限流控制就是其中一种方法,如下图:

image

2.9.4.1 安装

$ go get -u github.com/juju/ratelimit@v1.0.1

Ratelimit 提供了一个简单又高效的令牌桶实现,能够提供大量的方法帮助我们实现限流器的逻辑。

2.9.4.2 限流控制

2.9.4.2.1 LimiterIface

我们打开项目的 pkg/limiter 目录并新建 limiter.go 文件,写入如下代码:

type LimiterIface interface {
	Key(c *gin.Context) string
	GetBucket(key string) (*ratelimit.Bucket, bool)
	AddBuckets(rules ...LimiterBucketRule) LimiterIface
}

type Limiter struct {
	limiterBuckets map[string]*ratelimit.Bucket
}

type LimiterBucketRule struct {
	Key          string
	FillInterval time.Duration
	Capacity     int64
	Quantum      int64
}

在上述代码中,我们声明了 LimiterIface 接口,用于定义当前限流器所必须要的方法。

为什么要这么做呢,实际上需要知道一点,限流器是存在多种实现的,可能某一类接口需要限流器 A,另外一类接口需要限流器 B,所采用的策略不是完全一致的,因此我们需要声明 LimiterIfac 这类通用接口,保证其接口的设计,我们初步的在 Iface 接口中,一共声明了三个方法,如下:

  • Key:获取对应的限流器的键值对名称。
  • GetBucket:获取令牌桶。
  • AddBuckets:新增多个令牌桶。

同时我们定义 Limiter 结构体用于存储令牌桶与键值对名称的映射关系,并定义 LimiterBucketRule 结构体用于存储令牌桶的一些相应规则属性,如下:

  • Key:自定义键值对名称。
  • FillInterval:间隔多久时间放 N 个令牌。
  • Capacity:令牌桶的容量。
  • Quantum:每次到达间隔时间后所放的具体令牌数量。

至此我们就完成了一个 Limter 最基本的属性定义了,接下来我们将针对不同的情况实现我们这个项目中的限流器。

2.9.4.2.2 MethodLimiter

我们第一个编写的简单限流器的主要功能是针对路由进行限流,因为在项目中,我们可能只需要对某一部分的接口进行流量调控,我们打开项目下的 pkg/limiter 目录并新建 method_limiter.go 文件,写入如下代码:

type MethodLimiter struct {
	*Limiter
}

func NewMethodLimiter() LimiterIface {
	return MethodLimiter{
		Limiter: &Limiter{limiterBuckets: make(map[string]*ratelimit.Bucket)},
	}
}

func (l MethodLimiter) Key(c *gin.Context) string {
	uri := c.Request.RequestURI
	index := strings.Index(uri, "?")
	if index == -1 {
		return uri
	}

	return uri[:index]
}

func (l MethodLimiter) GetBucket(key string) (*ratelimit.Bucket, bool) {
	bucket, ok := l.limiterBuckets[key]
	return bucket, ok
}

func (l MethodLimiter) AddBuckets(rules ...LimiterBucketRule) LimiterIface {
	for _, rule := range rules {
		if _, ok := l.limiterBuckets[rule.Key]; !ok {
			l.limiterBuckets[rule.Key] = ratelimit.NewBucketWithQuantum(rule.FillInterval, rule.Capacity, rule.Quantum)
		}
	}

	return l
}

在上述代码中,我们针对 LimiterIface 接口实现了我们的 MethodLimiter 限流器,主要逻辑是在 Key 方法中根据 RequestURI 切割出核心路由作为键值对名称,并在 GetBucket 和 AddBuckets 进行获取和设置 Bucket 的对应逻辑。

2.9.4.3 编写中间件

在完成了限流器的逻辑编写后,打开项目下的 internal/middleware 目录并新建 limiter.go 文件,将整体的限流器与对应的中间件逻辑串联起来,写入如下代码:

func RateLimiter(l limiter.LimiterIface) gin.HandlerFunc {
	return func(c *gin.Context) {
		key := l.Key(c)
		if bucket, ok := l.GetBucket(key); ok {
			count := bucket.TakeAvailable(1)
			if count == 0 {
				response := app.NewResponse(c)
				response.ToErrorResponse(errcode.TooManyRequests)
				c.Abort()
				return
			}
		}

		c.Next()
	}
}

在 RateLimiter 中间件中,需要注意的是入参应该为 LimiterIface 接口类型,这样子的话只要符合该接口类型的具体限流器实现都可以传入并使用,另外比较重要的就是 TakeAvailable 方法,它会占用存储桶中立即可用的令牌的数量,返回值为删除的令牌数,如果没有可用的令牌,将会返回 0,也就是已经超出配额了,因此这时候我们将返回 errcode.TooManyRequest 状态告诉客户端需要减缓并控制请求速度。

2.9.5 统一超时控制

在应用程序的运行中,常常会遇到一个头疼的问题,调用链如果是应用 A =》应用 B =》应用 C,那如果应用 C 出现了问题,在没有任何约束的情况下持续调用,就会导致应用 A、B、C 均出现问题,也就是很常见的上下游应用的互相影响,导致连环反应,最终使得整个集群应用出现一定规模的不可用,如下图:

image

为了规避这种情况,最简单也是最基本的一个约束点,那就是统一的在应用程序中针对所有请求都进行一个最基本的超时时间控制,如下图:

image

为此我们就编写一个上下文超时时间控制的中间件来实现这个需求,打开项目下的 internal/middleware 目录并新建 context_timeout.go 文件,如下:

func ContextTimeout(t time.Duration) func(c *gin.Context) {
	return func(c *gin.Context) {
		ctx, cancel := context.WithTimeout(c.Request.Context(), t)
		defer cancel()

		c.Request = c.Request.WithContext(ctx)
		c.Next()
	}
}

在上述代码中,我们调用了 context.WithTimeout 方法设置当前 context 的超时时间,并重新赋予给了 gin.Context,这样子在当前请求运行到指定的时间后,在使用了该 context 的运行流程就会针对 context 所提供的超时时间进行处理,并在指定的时间进行取消行为。效果如下:

_, err := ctxhttp.Get(c.Request.Context(), http.DefaultClient, "https://www.google.com/")
if err != nil {
    log.Fatalf("ctxhttp.Get err: %v", err)
}

我们需要将我们设置了超时的 c.Request.Context() 给传递进去,在验证时你可以将默认超时时间调短来进行调试,其最终输出结果:

ctxhttp.Get err: context deadline exceeded
exit status 1

最后由于已经到达了截止时间,因此返回 context deadline exceeded 错误提示信息。另外这里还需要注意,如果你在进行多应用/服务的调用时,把父级的上下文信息(ctx)不断地传递下去,那么在统计超时控制的中间件中所设置的超时时间,其实是针对整条链路的,而不是针对单单每一条,如果你需要针对额外的链路进行超时时间的调整,那么只需要调用像 context.WithTimeout 等方法对父级 ctx 进行设置,然后取得子级 ctx,再进行新的上下文传递就可以了。

2.9.6 注册中间件

在完成一连串的通用中间件编写后,打开项目目录 internal/routers 下的 router.go 文件,修改注册应用中间件的逻辑,如下:

var methodLimiters = limiter.NewMethodLimiter().AddBuckets(limiter.LimiterBucketRule{
	Key:          "/auth",
	FillInterval: time.Second,
	Capacity:     10,
	Quantum:      10,
})

func NewRouter() *gin.Engine {
	r := gin.New()
	if global.ServerSetting.RunMode == "debug" {
		r.Use(gin.Logger())
		r.Use(gin.Recovery())
	} else {
		r.Use(middleware.AccessLog())
		r.Use(middleware.Recovery())
	}

	r.Use(middleware.RateLimiter(methodLimiters))
	r.Use(middleware.ContextTimeout(60 * time.Second))
	r.Use(middleware.Translations())
	...
	apiv1.Use(middleware.JWT()){...}

在上述代码中,我们根据不同的部署环境(RunMode)进行了应用中间件的设置,因为实际上在使用了自定义的 Logger 和 Recovery 后,就没有必要使用 gin 原有所提供的了,而在本地开发环境中,可能没有齐全应用生态圈,因此需要进行特殊处理。另外在常规项目中,自定义的中间件不仅包含了基本的功能,还包含了很多定制化的功能,同时在注册顺序上也注意,Recovery 这类应用中间件应当尽可能的早注册,这根据实际所要应用的中间件情况进行顺序定制就可以了。

这里我们可以看到 middleware.ContextTimeout 是写死的 60 秒,在此交给你一个小任务,你可以对其进行配置化(映射配置和秒数初始化),将超时的时间配置调整到配置文件中,而不是在代码中硬编码,最终结果应当如下:

r.Use(middleware.ContextTimeout(global.AppSetting.DefaultContextTimeout))

这样子的话,以后修改超时的时间就只需要通过修改配置文件就可以解决了,不需要人为的修改代码,甚至可以不需要开发人员的直接参与,让运维同事确认后直接修改即可。



本图书由 煎鱼 ©2020 版权所有,所有文章采用知识署名-非商业性使用-禁止演绎 4.0 国际进行许可。