Software Technology Study Notes

A personal blog recording notes on software technology and the day-to-day life of a programmer.

Common Components of Golang Services: Kafka and Redis

In a distributed microservice system, Kafka is typically used as the event store for Event Sourcing and as an asynchronous message queue, while Redis is typically used as a data cache to increase throughput and take load off the persistence layer. When implementing the CQRS pattern (Command-Query Responsibility Segregation) on the backend, the two of them together make the job considerably easier.

1. Kafka

To install Kafka you first need to install ZooKeeper, which coordinates the Kafka cluster. Kafka is normally used in a publish-subscribe fashion; in microservices the most common choice is to consume through a Consumer Group, so that the same message is not processed by more than one instance of the same service.

Kafka's storage hierarchy is Topic > Partition > Segment, and its node concept is the Broker. For consumption, each Consumer or Consumer Group tracks one Offset per Partition. A Broker can host any number of Partitions, but in production the Partitions of a single Topic should not all live on the same node, otherwise the load becomes unbalanced. Replication is defined per Partition: the Leader serves all reads and writes, while the Followers exist only to keep the cluster highly available.
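
To make the Broker, Partition, and Leader relationship concrete, the kafka-go client used later in this post can list a topic's layout. The following is a minimal standalone sketch, not part of the original services; the broker address and topic name are reused from the examples below.

// topology.go (standalone sketch)
package main

import (
    "fmt"
    "log"

    kafka "github.com/segmentio/kafka-go"
)

func main() {
    conn, err := kafka.Dial("tcp", "kafka:9092")
    if err != nil {
        log.Fatal("dial kafka: ", err)
    }
    defer conn.Close()

    // ReadPartitions returns one entry per Partition of the Topic,
    // including which Broker currently acts as the Leader.
    partitions, err := conn.ReadPartitions("topic-user-creating")
    if err != nil {
        log.Fatal("read partitions: ", err)
    }

    for _, p := range partitions {
        fmt.Printf("topic=%s partition=%d leader=%s:%d replicas=%d\n",
            p.Topic, p.ID, p.Leader.Host, p.Leader.Port, len(p.Replicas))
    }
}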

In Go, the kafka-go library can be used to produce and consume messages. When producing there is no need to pick a Partition; the client balances messages across Partitions automatically. When consuming, follow the FetchMessage -> Process -> CommitMessages workflow so that a message whose processing fails is not silently dropped; this in turn requires message handling to be idempotent, because a message may be redelivered after an error (a small deduplication sketch follows the consumer code below).

// producer.go

package main

import (
    "context"
    "log"
    "os"
    "strconv"

    kafka "github.com/segmentio/kafka-go"
)

var kafkaBrokerURL = "kafka:9092"
var w *kafka.Writer

func initMQ() {
    url := os.Getenv("KAFKA_BROKER_URL")

    if url != "" {
        kafkaBrokerURL = url
    }

    // make a writer that produces to 'topic-user-creating', using the least-bytes distribution
    w = kafka.NewWriter(kafka.WriterConfig{
        Brokers:  []string{kafkaBrokerURL},
        Topic:    "topic-user-creating",
        Balancer: &kafka.LeastBytes{},
    })

    // NewWriter does not dial the broker at construction time; connection
    // errors only surface on the first WriteMessages call.
    log.Println("INFO    writer for 'topic-user-creating' created")
}

func closeMQ() {
    if w != nil {
        w.Close()
    }
}

// publishUserCreatingMsg publishes a user-created event. The package-level
// json instance (a jsoniter instance, as in user_cache.go below) and the
// UserInfo model are defined elsewhere in the package.
func publishUserCreatingMsg(userInfo *UserInfo) {
    value, err := json.Marshal(userInfo)

    if err != nil {
        log.Println("ERROR    marshalling user info: ", err)
        return
    }

    err = w.WriteMessages(context.Background(),
        kafka.Message{
            Key:   []byte(strconv.Itoa(userInfo.ID)),
            Value: value,
        },
    )

    if err != nil {
        log.Println("ERROR    writing message to 'topic-user-creating': ", err)
    } else {
        log.Println("INFO    writing message to 'topic-user-creating': OK")
    }
}

// consumer_group.go
package main

import (
    "context"
    "fmt"
    "log"
    "os"
    "time"

    kafka "github.com/segmentio/kafka-go"
)

// UserCreatingHandler is invoked when a user-created event is consumed.
// UserInfo and the package-level json instance are defined elsewhere in
// this service's package (not shown here).
type UserCreatingHandler interface {
    OnCreating(userInfo *UserInfo) error
}

var kafkaBrokerURL = "kafka:9092"
var userCreatingHandler UserCreatingHandler

func setUserCreatingHandler(handler UserCreatingHandler) {
    userCreatingHandler = handler
}

func initMQ() {
    url := os.Getenv("KAFKA_BROKER_URL")

    if url != "" {
        kafkaBrokerURL = url
    }

    go readMQLoop()
}

func readMQLoop() {
    // make a new reader that consumes from 'user-creating-topic'
    r := kafka.NewReader(kafka.ReaderConfig{
        Brokers:        []string{kafkaBrokerURL},
        GroupID:        "consumer-group-product",
        Topic:          "topic-user-creating",
        MinBytes:       10e3, // 10KB
        MaxBytes:       10e6, // 10MB
        MaxWait:        time.Second,
        CommitInterval: time.Second, // flushes commits to Kafka every second
    })

    // NewReader does not connect immediately; errors surface on FetchMessage.
    log.Println("INFO    reader for 'topic-user-creating' in group 'consumer-group-product' created")

    ctx := context.Background()

    for {
        m, err := r.FetchMessage(ctx)
        if err != nil {
            fmt.Println("INFO   MQ reading loop exit")
            break
        }

        fmt.Printf("INFO   message at topic/partition/offset %v/%v/%v: %s = %s\n",
            m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))

        if userCreatingHandler != nil {
            var userInfo UserInfo
            if err := json.Unmarshal(m.Value, &userInfo); err != nil {
                log.Println("ERROR    unmarshalling message: ", err)
            } else if err := userCreatingHandler.OnCreating(&userInfo); err != nil {
                log.Println("ERROR    handling user-created event: ", err)
            }
        }

        // commit the offset only after the message has been processed
        if err := r.CommitMessages(ctx, m); err != nil {
            log.Println("ERROR    committing offset: ", err)
        }
    }

    r.Close()
}
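
The loop above commits an offset only after the message has been processed, so a crash between processing and committing means the same message is delivered again after a restart or rebalance; this is why the handler has to be idempotent. Below is a minimal sketch of one common deduplication approach using Redis (introduced in the next section). The key prefix, the TTL, and the helper name are assumptions made for this example, not part of the original code.

// dedup.go (illustrative sketch)
package main

import (
    "time"

    redis "github.com/go-redis/redis"
)

// alreadyProcessed records the message key in Redis with SETNX and reports
// whether the key was seen before, so the caller can skip re-applying side
// effects after a redelivery. The "processed_" prefix and the 24h TTL are
// arbitrary choices for this sketch.
func alreadyProcessed(client *redis.Client, msgKey string) bool {
    ok, err := client.SetNX("processed_"+msgKey, 1, 24*time.Hour).Result()
    if err != nil {
        // on a cache error, fall back to processing; the handler must
        // tolerate the occasional duplicate anyway
        return false
    }
    return !ok // SetNX reports false when the key already existed
}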

2. Redis

Redis is a lightweight in-memory database that stores data as key-value pairs. It can be driven directly with commands or through a client library in your language of choice. In systems with high concurrency and heavy traffic, Redis makes a good distributed cache.

Redis Cluster partitions data with virtual slots: the slot of a key is CRC16(key) mod 16384, and the slot determines which node owns the key. More generally, Redis supports three clustering schemes: client-side partitioning, proxy-based partitioning (e.g. Twemproxy), and query routing (used by Redis Cluster, where any node can redirect a request, effectively mixing routing with client-side partitioning).
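
To make the slot computation concrete, the sketch below implements the CRC16 (XMODEM) checksum that Redis Cluster uses and maps a key to its slot. It is a standalone illustration: Redis additionally honors hash tags in {...}, which this sketch ignores, and the function names are made up for the example.

// slot.go (standalone sketch)
package main

import "fmt"

// crc16 implements CRC16-CCITT (XMODEM), the checksum Redis Cluster uses
// for key hashing: polynomial 0x1021, initial value 0. The reference value
// from the Redis Cluster spec is crc16("123456789") == 0x31C3.
func crc16(data []byte) uint16 {
    var crc uint16
    for _, b := range data {
        crc ^= uint16(b) << 8
        for i := 0; i < 8; i++ {
            if crc&0x8000 != 0 {
                crc = crc<<1 ^ 0x1021
            } else {
                crc <<= 1
            }
        }
    }
    return crc
}

// hashSlot maps a key to one of the 16384 virtual slots; which node owns
// the slot depends on the cluster's slot assignment.
func hashSlot(key string) uint16 {
    return crc16([]byte(key)) % 16384
}

func main() {
    fmt.Println(hashSlot("user_42"))
}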

In Go we use the go-redis client. Depending on the deployment, create it with redis.NewClient() (single server), redis.NewFailoverClient() (Sentinel), or redis.NewClusterClient() (Cluster); a sketch of the latter two follows the example. In the code below, we connect to a single Redis server:

// user_cache.go
package main

import (
    "fmt"
    "log"
    "os"

    redis "github.com/go-redis/redis"
    jsoniter "github.com/json-iterator/go"
)

var json = jsoniter.ConfigCompatibleWithStandardLibrary

var redisServerURL = "redis:6379"
var client *redis.Client

func initCache() {
    url := os.Getenv("REDIS_SERVER_URL")

    if url != "" {
        redisServerURL = url
    }

    client = redis.NewClient(&redis.Options{
        Addr:     redisServerURL,
        Password: "", // no password set
        DB:       0,  // use default DB
    })

    log.Println("INFO    Redis client init")
}

func closeCache() {
    client.Close()
}

func getUserCacheKey(userID int) string {
    return fmt.Sprintf("user_%d", userID)
}

func getUserInfoFromCache(userID int) *UserInfo {
    key := getUserCacheKey(userID)
    val, err := client.Get(key).Result()

    if err == redis.Nil {
        // cache miss: the key simply does not exist yet
        return nil
    }

    if err != nil {
        log.Println("WARN    redis get failed: ", key, ", ", err)
        return nil
    }

    log.Printf("INFO    redis got: %s, %s\n", key, val)

    var userInfo UserInfo
    err = json.Unmarshal([]byte(val), &userInfo)

    if err != nil {
        log.Println("WARN    corrupted cache entry: ", key, ", ", err)
        return nil
    }

    return &userInfo
}

func setUserInfoToCache(userInfo *UserInfo) {
    key := getUserCacheKey(userInfo.ID)
    value, err := json.Marshal(userInfo)

    if err != nil {
        log.Println("WARN    redis set failed: ", key, ", ", err)
        return
    }

    // an expiration of 0 means the key never expires
    err = client.Set(key, string(value), 0).Err()

    if err != nil {
        log.Printf("ERROR    failed to store user %d info to redis\n", userInfo.ID)
        return
    }

    log.Printf("INFO    saved user %d info to redis\n", userInfo.ID)
}
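
For the Sentinel and Cluster deployments mentioned above, the client is created differently. A minimal sketch follows; the master name and all addresses are placeholder assumptions.

// other_clients.go (illustrative sketch)
package main

import (
    redis "github.com/go-redis/redis"
)

// newSentinelClient connects through Redis Sentinel, which promotes a new
// master automatically when the current one fails.
func newSentinelClient() *redis.Client {
    return redis.NewFailoverClient(&redis.FailoverOptions{
        MasterName:    "mymaster",
        SentinelAddrs: []string{"sentinel-1:26379", "sentinel-2:26379", "sentinel-3:26379"},
    })
}

// newClusterClient connects to a Redis Cluster; the client follows MOVED/ASK
// redirects so that each key ends up on the node that owns its slot.
func newClusterClient() *redis.ClusterClient {
    return redis.NewClusterClient(&redis.ClusterOptions{
        Addrs: []string{"redis-1:6379", "redis-2:6379", "redis-3:6379"},
    })
}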