软件技术学习笔记

个人博客,记录软件技术与程序员的点点滴滴。

EFK结构化日志

在分布式系统中,我们一般把日志集中收集到EFK或ELK中。默认情况下,Elasticsearch(ES)收集到的是一行一行日志文本,但是,这样的日志不方便业务检索,也没有充分发威ES的检索能力。

为了方便业务检索,我们做结构化日志,输出的内容均为JSON格式,经过Filebeat的简单处理,扔给ES存储与建立索引。查看日志时,可以对特定的业务字段做过滤,比如: user.id: 123。 日志输出格式如下,一条日志一行JSON:

{"@timestamp":"2020-11-29T14:37:17+08:00","fields.tag":"hello","log.level":"info","log.origin.file.name":"D:/Dev/Go/my-test/golang-logger/main.go:65","log.origin.function":"main.main.func3","message":"Hello abc","tracing.trace.id":"aabc:222:33a","tracing.transaction.id":"abc-2ww-abcd","user.id":123,"user.name":"abc"}

1. Filebeat 配置

为了方便配置,我们选用 Filebeat 7.10.0 (已尝试配置 Filebeat 7.1.1,但与K8s结合很不方便)。以下是K8s中的配置文件,开启json结构化日志自动探测(需Pod annotations配置):

# Allows you to add any config files in /usr/share/filebeat
# such as filebeat.yml
filebeatConfig:
  filebeat.yml: |
    filebeat.autodiscover:
      providers:
      - type: kubernetes
        node: ${NODE_NAME}
        hints.enabled: true
        hints.default_config:
          type: container
          paths:
          - /var/log/containers/*${data.kubernetes.container.id}.log
    processors:
      - add_host_metadata:

    output.elasticsearch:
      hosts:  ['${ELASTICSEARCH_HOSTS:elasticsearch-master:9200}']    

:使用开源的ES时,Filebeat DaemonSet镜像需选择 "docker.elastic.co/beats/filebeat-oss"。否则,出现Filebeat提交日志失败。

2. 微服务Deployment配置

对结构化打印日志的微服务,需在Pod的annotations中添加co.elastic.logs/json.*。否则,日志不会按照JSON格式自动解析。Deployment中的配置如下:

spec:
  template:
    metadata:
      annotations:
        co.elastic.logs/json.keys_under_root: "true"
        co.elastic.logs/json.overwrite_keys: "true"
        co.elastic.logs/json.add_error_key: "true"
        co.elastic.logs/json.message_key: "message"
      labels:
        logging: "json"

这样配置,Filebeat收集日志时遇到非JSON的文本行也能正常收集。

3. 微服务中日志打印

在 GoLang 中,我们可以使用Logrus来打印日志,示例代码如下:

package main

import (
	"fmt"
	"net/http"
	"os"

	"github.com/gin-gonic/gin"
	log "github.com/sirupsen/logrus"
)

func init() {
	// Log as JSON instead of the default ASCII formatter.
	log.SetFormatter(&log.JSONFormatter{
		FieldMap: log.FieldMap{
			log.FieldKeyTime:  "@timestamp",
			log.FieldKeyMsg:   "message",
			log.FieldKeyLevel: "log.level",
			log.FieldKeyFile:  "log.origin.file.name",
			log.FieldKeyFunc:  "log.origin.function",
		}})

	// Output to stdout instead of the default stderr
	// Can be any io.Writer, see below for File example
	log.SetOutput(os.Stdout)

	// Only log the warning severity or above.
	log.SetLevel(log.InfoLevel)

	log.SetReportCaller(true)
}

func main() {
	engine := gin.New()

	engine.GET("/ping", func(c *gin.Context) {
		log.Info("calling ping")
		c.JSON(http.StatusOK, gin.H{
			"message": "pong",
		})
	})

	engine.GET("/hello/:name", func(c *gin.Context) {
		name := c.Param("name")

		if name == "" {
			name = "world"
		}

		message := fmt.Sprintf("Hello %s", name)

		log.WithFields(log.Fields{
			"fields.tag": "hello",
			"user.id":    123,
			"user.name":  name,
			"tracing.trace.id": "aabc:222:33a",
			"tracing.transaction.id": "abc-2ww-abcd",
		}).Info(message)

		c.JSON(http.StatusOK, gin.H{
			"message": message,
		})
	})

	engine.Run(":8080")
}

其中的WithFields可以如下写,区别在于ES文档的原始JSON不一样,但是索引没有区别:

log.WithFields(log.Fields{
    "fields": log.Fields{
        "tag": "hello",
    },
    "user": log.Fields{
        "id":   123,
        "name": name,
    },
    "tracing": log.Fields{
        "trace": log.Fields{
            "id": "aabc:222:33a",
        },
        "transaction": log.Fields{
            "id": "abc-2ww-abcd",
        },
    },
}).Info(message)

:默认Filebeat会向ES建立一串很长的Map,如果提交的日志字段值与Map中的定义类型不符,Filebeat提交日志会失败。这时需要查看Filebeat本身的日志,确认哪个字段与预定义的不符。

懒人建议:使用EFK时,先看看Filebeat向ES Index Map哪些字段,直接使用预定义的字段。

4. Kibana中查询日志

在Kibana的Log页面,我们可以根据自定义字段条件查询相关业务的日志:

Kibana结构化日志