Golang服务常用组件—Kafka, Redis
在分布式微服务系统中,Kafka一般用于事件溯源(Event Sourcing)的存储与异步消息队列。Redis一般用于数据缓存,提升系统的吞吐量,减少持久层的压力。后端落地CQRS模式(Command-Query Responsibility Segregation )时,带上它们俩可以事半功倍。
1. Kafka
要安装Kafka,需要先安装Zookeeper,用于协调Kafka集群。在使用Kafka时,一般使用发布-订阅模式。在微服务中,采用最多的是Consumer Group做消费者,避免同一条消息被多个微服务实例处理。
Kafka的数据存储概念:Topic > Partition -> Segment。节点概念:Broker。消费概念:一个Consumer或Consumer Group在一个Partition中均包含一个Offset。一个Broker可以存储0~N个Partition,但在生产环境中不要把一个Topic的多个Partition存储到同一节点(负载不均衡)。副本集是针对Partition的,Leader负责所有的读写操作,Follower只确保集群高可用。
在GoLang中,可以使用kafka-go来读写消息。在生产消息时,不需要指定Partition,客户端会自动计算负载均衡的Partition。在消费消息时,为了避免消费失败,使用FetchMessage -> Process -> CommitMessages的工作流程。要求消费消息时幂等性,支持异常时重复消费。
// producer.go
package main
import (
"context"
"log"
"os"
"strconv"
kafka "github.com/segmentio/kafka-go"
)
var kafkaBrokerURL = "kafka:9092"
var w *kafka.Writer
func initMQ() {
url := os.Getenv("KAFKA_BROKER_URL")
if url != "" {
kafkaBrokerURL = url
}
// make a writer that produces to 'topic-user-creating', using the least-bytes distribution
w = kafka.NewWriter(kafka.WriterConfig{
Brokers: []string{kafkaBrokerURL},
Topic: "topic-user-creating",
Balancer: &kafka.LeastBytes{},
})
if w == nil {
log.Println("ERROR connecting to MQS 'topic-user-creating': FAILED")
} else {
log.Println("INFO connecting to MQS 'topic-user-creating': OK")
}
}
func closeMQ() {
if w != nil {
w.Close()
}
}
func publishUserCreatingMsg(userInfo *UserInfo) {
value, err := json.Marshal(userInfo)
if err != nil {
return
}
err = w.WriteMessages(context.Background(),
kafka.Message{
Key: []byte(strconv.Itoa(userInfo.ID)),
Value: value,
},
)
if err != nil {
log.Println("ERROR writting message to 'topic-user-creating': ", err)
} else {
log.Println("INFO writting message to 'topic-user-creating': OK")
}
}
// consumer_group.go
package main
import (
"context"
"fmt"
"log"
"os"
"time"
kafka "github.com/segmentio/kafka-go"
)
// UserCreatingHandler calling on user creating
type UserCreatingHandler interface {
OnCreating(userInfo *UserInfo) error
}
var kafkaBrokerURL = "kafka:9092"
var userCreatingHandler UserCreatingHandler
func setUserCreatingHandler(handler UserCreatingHandler) {
userCreatingHandler = handler
}
func initMQ() {
url := os.Getenv("KAFKA_BROKER_URL")
if url != "" {
kafkaBrokerURL = url
}
go readMQLoop()
}
func readMQLoop() {
// make a new reader that consumes from 'user-creating-topic'
r := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{kafkaBrokerURL},
GroupID: "consumer-group-product",
Topic: "topic-user-creating",
MinBytes: 10e3, // 10KB
MaxBytes: 10e6, // 10MB
MaxWait: time.Second,
CommitInterval: time.Second, // flushes commits to Kafka every second
})
if r == nil {
log.Println("ERROR connecting to MQS 'topic-user-creating': FAILED")
} else {
log.Println("INFO connecting to MQS 'topic-user-creating': OK")
}
ctx := context.Background()
for {
m, err := r.FetchMessage(ctx)
if err != nil {
fmt.Println("INFO MQ reading loop exit")
break
}
fmt.Printf("INFO message at topic/partition/offset %v/%v/%v: %s = %s\n",
m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))
if userCreatingHandler != nil {
var userInfo UserInfo
json.Unmarshal(m.Value, &userInfo)
userCreatingHandler.OnCreating(&userInfo)
}
r.CommitMessages(ctx, m)
}
r.Close()
}
2. Redis
Redis是轻巧的内存数据库,以Key-Value的形式存储。可以命令式使用,或使用其他语言包装的Redis客户端。在高并发、大流量的系统中,可以使用Redis做分布式缓存。
Redis支持虚拟槽分区方法,可以使用CRC16 % 16383
计算槽位号,再求出分区节点。Redis支持三种集群方案:客户端分区、Proxy分区(如Twemproxy)、查询路由(每个节点都混合了路由与客户端分区)。
在GoLang中,我们使用官方提供的客户端go-redis。根据不同的场景,分别使用redis.NewClient()、redis.NewFailoverClient()、redis.NewClusterClient()。如下代码中,我们使用客户端连接单个Redis服务器:
// user_cache.go
package main
import (
"fmt"
"log"
"os"
redis "github.com/go-redis/redis"
jsoniter "github.com/json-iterator/go"
)
var json = jsoniter.ConfigCompatibleWithStandardLibrary
var redisServerURL = "redis:6379"
var client *redis.Client
func initCache() {
url := os.Getenv("REDIS_SERVER_URL")
if url != "" {
redisServerURL = url
}
client = redis.NewClient(&redis.Options{
Addr: redisServerURL,
Password: "", // no password set
DB: 0, // use default DB
})
log.Println("INFO Redis client init")
}
func closeCache() {
client.Close()
}
func getUserCacheKey(userID int) string {
return fmt.Sprintf("user_%d", userID)
}
func getUserInfoFromCache(userID int) *UserInfo {
key := getUserCacheKey(userID)
val, err := client.Get(key).Result()
if err != nil {
log.Println("WARN redis can't get: ", key, ", ", err)
return nil
}
log.Printf("INFO redis got: %s, %s\n", key, val)
var userInfo UserInfo
err = json.Unmarshal([]byte(val), &userInfo)
if err != nil {
return nil
}
return &userInfo
}
func setUserInfoToCache(userInfo *UserInfo) {
key := getUserCacheKey(userInfo.ID)
value, err := json.Marshal(userInfo)
if err != nil {
log.Println("WARN redis set failed: ", key, ", ", err)
return
}
err = client.Set(key, string(value), 0).Err()
if err != nil {
log.Printf("ERROR Fail to store user %d info to redis\n", userInfo.ID)
return
}
log.Printf("INFO Saving user %d info to redis OK\n", userInfo.ID)
}