1 介绍
1.1 定义
01.技术概述
a.基本定义
go-redis是Go语言中最流行的Redis客户端库,提供类型安全的API来操作Redis数据库,支持Redis 6.0及以上版本的所有特性。
b.官方信息
a.项目地址
GitHub仓库位于github.com/go-redis/redis,目前已有超过15k星标,是Go生态中最成熟的Redis客户端实现。
b.版本支持
当前主要版本为v8和v9,v8支持Go 1.13+,v9支持Go 1.18+并引入泛型支持,提供更好的类型安全。
02.设计理念
a.类型安全
a.强类型API
所有Redis命令都有对应的Go方法,参数和返回值都是强类型,编译期即可发现类型错误,避免运行时错误。
b.代码示例
---
// 类型安全的API示例
package main
import (
"context"
"fmt"
"github.com/go-redis/redis/v8"
)
func main() {
ctx := context.Background()
// 创建客户端
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
Password: "", // 无密码
DB: 0, // 使用默认数据库
})
// 类型安全的Set操作
err := rdb.Set(ctx, "key", "value", 0).Err()
if err != nil {
panic(err)
}
// 类型安全的Get操作,返回string
val, err := rdb.Get(ctx, "key").Result()
if err != nil {
panic(err)
}
fmt.Println("key:", val)
// 类型安全的数值操作
err = rdb.Set(ctx, "counter", 0, 0).Err()
if err != nil {
panic(err)
}
// Incr返回int64类型
newVal, err := rdb.Incr(ctx, "counter").Result()
if err != nil {
panic(err)
}
fmt.Println("counter:", newVal) // 输出: counter: 1
}
---
b.Context支持
a.超时控制
所有操作都接受context.Context参数,可以方便地实现超时控制、取消操作和传递请求级别的元数据。
b.代码示例
---
// Context超时控制示例
package main
import (
"context"
"fmt"
"time"
"github.com/go-redis/redis/v8"
)
func main() {
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
})
// 创建带超时的context
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
// 操作会在2秒后超时
err := rdb.Set(ctx, "key", "value", 0).Err()
if err != nil {
if err == context.DeadlineExceeded {
fmt.Println("操作超时")
} else {
fmt.Println("错误:", err)
}
return
}
// 可以随时取消操作
ctx2, cancel2 := context.WithCancel(context.Background())
go func() {
time.Sleep(100 * time.Millisecond)
cancel2() // 取消操作
}()
err = rdb.Get(ctx2, "key").Err()
if err == context.Canceled {
fmt.Println("操作被取消")
}
}
---
c.连接池管理
内置高效的连接池实现,自动管理连接的创建、复用和释放,支持连接健康检查和自动重连机制。
03.核心特点
a.完整的Redis命令支持
支持Redis所有数据类型的操作,包括String、Hash、List、Set、Sorted Set、HyperLogLog、Geo、Stream等。
b.高级特性支持
a.Pipeline管道
支持批量命令执行,减少网络往返次数,显著提升性能。
b.事务支持
支持MULTI/EXEC事务和WATCH乐观锁机制。
c.发布订阅
完整支持Redis的Pub/Sub功能,可用于实现消息队列和实时通信。
d.Lua脚本
支持执行Lua脚本,实现原子性的复杂操作。
c.集群支持
a.单机模式
支持连接单个Redis实例。
b.哨兵模式
支持Redis Sentinel高可用方案,自动故障转移。
c.集群模式
支持Redis Cluster分片集群,自动路由和重定向。
1.2 核心概念
01.Client客户端
a.定义说明
Client是go-redis的核心对象,代表与Redis服务器的连接,所有Redis操作都通过Client实例执行。
b.客户端类型
a.普通客户端
redis.Client用于连接单个Redis实例,适用于单机部署场景。
b.集群客户端
redis.ClusterClient用于连接Redis Cluster集群,自动处理分片和重定向。
c.哨兵客户端
redis.FailoverClient用于连接Redis Sentinel,实现高可用自动故障转移。
d.环形客户端
redis.Ring用于实现一致性哈希的分片方案。
c.创建示例
---
// 不同类型客户端创建示例
package main
import (
"github.com/go-redis/redis/v8"
)
func main() {
// 1. 普通客户端
client := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
Password: "",
DB: 0,
})
// 2. 集群客户端
clusterClient := redis.NewClusterClient(&redis.ClusterOptions{
Addrs: []string{
"localhost:7000",
"localhost:7001",
"localhost:7002",
},
})
// 3. 哨兵客户端
failoverClient := redis.NewFailoverClient(&redis.FailoverOptions{
MasterName: "mymaster",
SentinelAddrs: []string{"localhost:26379", "localhost:26380"},
})
// 4. 环形客户端
ringClient := redis.NewRing(&redis.RingOptions{
Addrs: map[string]string{
"shard1": "localhost:6379",
"shard2": "localhost:6380",
},
})
defer client.Close()
defer clusterClient.Close()
defer failoverClient.Close()
defer ringClient.Close()
}
---
02.Options配置
a.基本配置
a.连接配置
包括地址Addr、密码Password、数据库DB等基��连接参数。
b.网络配置
包括网络类型Network、拨号超时DialTimeout、读写超时等网络相关参数。
b.连接池配置
a.池大小配置
PoolSize设置连接池最大连接数,MinIdleConns设置最小空闲连接数。
b.超时配置
PoolTimeout设置从连接池获取连接的超时时间,IdleTimeout设置空闲连接的超时时间。
c.配置示例
---
// 完整的Options配置示例
package main
import (
"time"
"github.com/go-redis/redis/v8"
)
func main() {
client := redis.NewClient(&redis.Options{
// 基本配置
Addr: "localhost:6379",
Password: "your_password",
DB: 0,
// 网络配置
Network: "tcp",
DialTimeout: 5 * time.Second, // 连接超时
ReadTimeout: 3 * time.Second, // 读超时
WriteTimeout: 3 * time.Second, // 写超时
// 连接池配置
PoolSize: 10, // 最大连接数
MinIdleConns: 5, // 最小空闲连接数
PoolTimeout: 4 * time.Second, // 获取连接超时
IdleTimeout: 5 * time.Minute, // 空闲连接超时
// 重试配置
MaxRetries: 3, // 最大重试次数
MinRetryBackoff: 8 * time.Millisecond,
MaxRetryBackoff: 512 * time.Millisecond,
// 其他配置
MaxConnAge: 0, // 连接最大存活时间,0表示不限制
})
defer client.Close()
}
---
03.Context上下文
a.作用说明
Context用于控制操作的生命周期,包括超时控制、取消操作、传递请求元数据等。
b.常用Context
a.Background
context.Background()用于主函数、初始化和测试,是最顶层的Context。
b.WithTimeout
context.WithTimeout()创建带超时的Context,操作超时后自动取消。
c.WithCancel
context.WithCancel()创建可取消的Context,可以主动取消操作。
d.WithDeadline
context.WithDeadline()创建带截止时间的Context,到达截止时间后自动取消。
c.使用示例
---
// Context使用示例
package main
import (
"context"
"fmt"
"time"
"github.com/go-redis/redis/v8"
)
func main() {
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
})
defer rdb.Close()
// 1. 使用Background
ctx := context.Background()
err := rdb.Set(ctx, "key1", "value1", 0).Err()
if err != nil {
fmt.Println("Set错误:", err)
}
// 2. 使用WithTimeout
ctx2, cancel2 := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel2()
val, err := rdb.Get(ctx2, "key1").Result()
if err != nil {
fmt.Println("Get错误:", err)
} else {
fmt.Println("值:", val)
}
// 3. 使用WithCancel
ctx3, cancel3 := context.WithCancel(context.Background())
go func() {
time.Sleep(500 * time.Millisecond)
cancel3() // 主动取消
}()
err = rdb.Set(ctx3, "key2", "value2", 0).Err()
if err == context.Canceled {
fmt.Println("操作被取消")
}
// 4. 使用WithDeadline
deadline := time.Now().Add(2 * time.Second)
ctx4, cancel4 := context.WithDeadline(context.Background(), deadline)
defer cancel4()
err = rdb.Set(ctx4, "key3", "value3", 0).Err()
if err != nil {
fmt.Println("Set错误:", err)
}
}
---
04.Result结果处理
a.Result方法
所有命令都返回Cmd对象,调用Result()方法获取结果和错误,这是最常用的结果处理方式。
b.Err方法
只获取错误,不关心返回值时使用Err()方法,常用于Set、Del等不需要返回值的操作。
c.Val方法
只获取值,忽略错误,适用于确定不会出错的场景,但不推荐在生产环境使用。
d.处理示例
---
// 结果处理示例
package main
import (
"context"
"fmt"
"github.com/go-redis/redis/v8"
)
func main() {
ctx := context.Background()
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
})
defer rdb.Close()
// 1. 使用Result()获取值和错误
val, err := rdb.Get(ctx, "key").Result()
if err == redis.Nil {
fmt.Println("key不存在")
} else if err != nil {
fmt.Println("错误:", err)
} else {
fmt.Println("值:", val)
}
// 2. 使用Err()只获取错误
err = rdb.Set(ctx, "key", "value", 0).Err()
if err != nil {
fmt.Println("Set失败:", err)
}
// 3. 使用Val()只获取值(不推荐)
val = rdb.Get(ctx, "key").Val()
fmt.Println("值:", val) // 如果出错,返回零值
// 4. 链式调用
exists, err := rdb.Exists(ctx, "key").Result()
if err != nil {
fmt.Println("Exists错误:", err)
} else {
fmt.Printf("key存在: %v\n", exists == 1)
}
// 5. 处理特殊错误
val, err = rdb.Get(ctx, "nonexistent").Result()
switch err {
case redis.Nil:
fmt.Println("键不存在")
case nil:
fmt.Println("值:", val)
default:
fmt.Println("其他错误:", err)
}
}
---
1.3 优缺点
01.优势
a.类型安全
a.编译期检查
强类型API在编译期就能发现类型错误,避免运行时的类型转换错误,提高代码可靠性。
b.代码示例
---
// 类型安全示例
package main
import (
"context"
"fmt"
"github.com/go-redis/redis/v8"
)
func main() {
ctx := context.Background()
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
})
defer rdb.Close()
// 编译期类型检查
// 正确: Incr返回int64
count, err := rdb.Incr(ctx, "counter").Result()
if err != nil {
panic(err)
}
fmt.Printf("计数器: %d (类型: %T)\n", count, count)
// 正确: Get返回string
str, err := rdb.Get(ctx, "name").Result()
if err != nil && err != redis.Nil {
panic(err)
}
fmt.Printf("名称: %s (类型: %T)\n", str, str)
// 正确: HGetAll返回map[string]string
hash, err := rdb.HGetAll(ctx, "user:1").Result()
if err != nil {
panic(err)
}
fmt.Printf("哈希: %v (类型: %T)\n", hash, hash)
}
---
b.性能优越
a.连接池优化
内置高效连接池,支持连接复用和自动扩缩容,减少连接建立开销,提升并发性能。
b.Pipeline支持
支持批量命令执行,一次网络往返可执行多个命令,显著降低网络延迟。
c.性能测试
---
// 性能对比示例
package main
import (
"context"
"fmt"
"time"
"github.com/go-redis/redis/v8"
)
func main() {
ctx := context.Background()
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
PoolSize: 100,
})
defer rdb.Close()
// 1. 普通方式执行1000次Set
start := time.Now()
for i := 0; i < 1000; i++ {
err := rdb.Set(ctx, fmt.Sprintf("key:%d", i), i, 0).Err()
if err != nil {
panic(err)
}
}
normalDuration := time.Since(start)
fmt.Printf("普通方式: %v\n", normalDuration)
// 2. Pipeline方式执行1000次Set
start = time.Now()
pipe := rdb.Pipeline()
for i := 0; i < 1000; i++ {
pipe.Set(ctx, fmt.Sprintf("pkey:%d", i), i, 0)
}
_, err := pipe.Exec(ctx)
if err != nil {
panic(err)
}
pipelineDuration := time.Since(start)
fmt.Printf("Pipeline方式: %v\n", pipelineDuration)
fmt.Printf("性能提升: %.2fx\n", float64(normalDuration)/float64(pipelineDuration))
}
---
c.功能完整
a.命令覆盖
支持Redis所有命令,包括String、Hash、List、Set、Sorted Set、HyperLogLog、Geo、Stream等所有数据结构。
b.高级特性
支持Pipeline、Transaction、Pub/Sub、Lua脚本、Cluster、Sentinel等所有Redis高级特性。
d.生态成熟
a.社区活跃
GitHub上超过15k星标,issue响应及时,版本更新频繁,文档完善。
b.广泛应用
被众多知名项目采用,经过大规模生产环境验证,稳定可靠。
02.劣势
a.学习成本
a.API复杂
命令众多,需要熟悉Redis协议和go-redis的API设计,初学者需要一定时间适应。
b.Context理解
需要理解Go的Context机制,正确处理超时和取消,否则可能导致资源泄漏。
c.学习建议
---
// 新手常见错误示例
package main
import (
"context"
"fmt"
"time"
"github.com/go-redis/redis/v8"
)
func main() {
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
})
defer rdb.Close()
// 错误1: 忘记传Context
// rdb.Get("key") // 编译错误: 缺少context参数
// 错误2: 忘记处理错误
ctx := context.Background()
val := rdb.Get(ctx, "key").Val() // 不推荐: 忽略错误
fmt.Println(val)
// 正确: 处理错误
val, err := rdb.Get(ctx, "key").Result()
if err == redis.Nil {
fmt.Println("key不存在")
} else if err != nil {
fmt.Println("错误:", err)
} else {
fmt.Println("值:", val)
}
// 错误3: Context泄漏
for i := 0; i < 100; i++ {
ctx, _ := context.WithTimeout(context.Background(), 1*time.Second)
// 忘记调用cancel,导致资源泄漏
rdb.Get(ctx, "key")
}
// 正确: 使用defer调用cancel
for i := 0; i < 100; i++ {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
rdb.Get(ctx, "key")
cancel() // 或使用defer cancel()
}
}
---
b.版本兼容
a.v8与v9差异
v8和v9版本API有差异,升级时需要注意兼容性,v9引入泛型后部分API签名发生变化。
b.Redis版本依赖
某些特性依赖特定Redis版本,如Stream需要Redis 5.0+,ACL需要Redis 6.0+。
c.错误处理
a.错误类型多样
需要区分redis.Nil、网络错误、超时错误等不同类型的错误,处理逻辑较复杂。
b.错误处理示例
---
// 完整的错误处理示例
package main
import (
"context"
"errors"
"fmt"
"net"
"github.com/go-redis/redis/v8"
)
func main() {
ctx := context.Background()
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
})
defer rdb.Close()
val, err := rdb.Get(ctx, "key").Result()
// 详细的错误分类处理
if err != nil {
switch {
case err == redis.Nil:
fmt.Println("键不存在")
case errors.Is(err, context.DeadlineExceeded):
fmt.Println("操作超时")
case errors.Is(err, context.Canceled):
fmt.Println("操作被取消")
default:
// 判断是否为网络错误
var netErr net.Error
if errors.As(err, &netErr) {
if netErr.Timeout() {
fmt.Println("网络超时")
} else {
fmt.Println("网络错误:", netErr)
}
} else {
fmt.Println("其他错误:", err)
}
}
return
}
fmt.Println("值:", val)
}
// 封装错误处理函数
func handleRedisError(err error) {
if err == nil {
return
}
switch {
case err == redis.Nil:
fmt.Println("[INFO] 键不存在")
case errors.Is(err, context.DeadlineExceeded):
fmt.Println("[ERROR] 操作超时,请检查网络或增加超时时间")
case errors.Is(err, context.Canceled):
fmt.Println("[WARN] 操作被取消")
default:
fmt.Printf("[ERROR] Redis错误: %v\n", err)
}
}
---
d.内存占用
连接池会占用一定内存,在连接数较多时需要注意内存使用,合理配置PoolSize和MinIdleConns参数。
1.4 使用场景
01.缓存系统
a.应用场景
使用Redis作为应用缓存层,减轻数据库压力,提升系统响应速度,是最常见的使用场景。
b.缓存策略
a.Cache-Aside模式
应用程序先查询缓存,缓存未命中时查询数据库,然后将结果写入缓存。
b.Read-Through模式
缓存层自动从数据库加载数据,应用程序只与缓存交互。
c.Write-Through模式
写操作同时更新缓存和数据库,保证数据一致性。
c.实现示例
---
// 缓存实现示例
package main
import (
"context"
"encoding/json"
"fmt"
"time"
"github.com/go-redis/redis/v8"
)
type User struct {
ID int `json:"id"`
Name string `json:"name"`
Age int `json:"age"`
}
type UserCache struct {
rdb *redis.Client
}
func NewUserCache(rdb *redis.Client) *UserCache {
return &UserCache{rdb: rdb}
}
// Cache-Aside模式: 获取用户信息
func (c *UserCache) GetUser(ctx context.Context, userID int) (*User, error) {
key := fmt.Sprintf("user:%d", userID)
// 1. 先查缓存
val, err := c.rdb.Get(ctx, key).Result()
if err == nil {
// 缓存命中,反序列化返回
var user User
if err := json.Unmarshal([]byte(val), &user); err != nil {
return nil, err
}
fmt.Println("缓存命中")
return &user, nil
}
if err != redis.Nil {
return nil, err
}
// 2. 缓存未命中,查询数据库
fmt.Println("缓存未命中,查询数据库")
user := c.getUserFromDB(userID)
if user == nil {
return nil, fmt.Errorf("用户不存在")
}
// 3. 将结果写入缓存
data, _ := json.Marshal(user)
c.rdb.Set(ctx, key, data, 10*time.Minute)
return user, nil
}
// 模拟数据库查询
func (c *UserCache) getUserFromDB(userID int) *User {
time.Sleep(100 * time.Millisecond) // 模拟数据库延迟
return &User{ID: userID, Name: "张三", Age: 25}
}
// Write-Through模式: 更新用户信息
func (c *UserCache) UpdateUser(ctx context.Context, user *User) error {
// 1. 更新数据库
if err := c.updateUserInDB(user); err != nil {
return err
}
// 2. 更新缓存
key := fmt.Sprintf("user:%d", user.ID)
data, _ := json.Marshal(user)
return c.rdb.Set(ctx, key, data, 10*time.Minute).Err()
}
func (c *UserCache) updateUserInDB(user *User) error {
// 模拟数据库更新
return nil
}
func main() {
ctx := context.Background()
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
})
defer rdb.Close()
cache := NewUserCache(rdb)
// 第一次查询,缓存未命中
user, _ := cache.GetUser(ctx, 1)
fmt.Printf("用户: %+v\n", user)
// 第二次查询,缓存命中
user, _ = cache.GetUser(ctx, 1)
fmt.Printf("用户: %+v\n", user)
}
---
02.会话管理
a.应用场景
在分布式系统中,使用Redis存储用户会话信息,实现跨服务器的会话共享。
b.会话特点
a.分布式共享
多个应用服务器共享同一个Redis实例,实现会话数据的统一管理。
b.自动过期
利用Redis的TTL机制,自动清理过期会话,无需手动维护。
c.实现示例
---
// 会话管理示例
package main
import (
"context"
"encoding/json"
"fmt"
"time"
"github.com/go-redis/redis/v8"
"github.com/google/uuid"
)
type Session struct {
UserID int `json:"user_id"`
Username string `json:"username"`
LoginAt time.Time `json:"login_at"`
}
type SessionManager struct {
rdb *redis.Client
ttl time.Duration
}
func NewSessionManager(rdb *redis.Client, ttl time.Duration) *SessionManager {
return &SessionManager{
rdb: rdb,
ttl: ttl,
}
}
// 创建会话
func (sm *SessionManager) CreateSession(ctx context.Context, userID int, username string) (string, error) {
// 生成会话ID
sessionID := uuid.New().String()
// 创建会话数据
session := Session{
UserID: userID,
Username: username,
LoginAt: time.Now(),
}
// 序列化并存储
data, err := json.Marshal(session)
if err != nil {
return "", err
}
key := fmt.Sprintf("session:%s", sessionID)
err = sm.rdb.Set(ctx, key, data, sm.ttl).Err()
if err != nil {
return "", err
}
return sessionID, nil
}
// 获取会话
func (sm *SessionManager) GetSession(ctx context.Context, sessionID string) (*Session, error) {
key := fmt.Sprintf("session:%s", sessionID)
val, err := sm.rdb.Get(ctx, key).Result()
if err == redis.Nil {
return nil, fmt.Errorf("会话不存在或已过期")
}
if err != nil {
return nil, err
}
var session Session
if err := json.Unmarshal([]byte(val), &session); err != nil {
return nil, err
}
return &session, nil
}
// 刷新会话过期时间
func (sm *SessionManager) RefreshSession(ctx context.Context, sessionID string) error {
key := fmt.Sprintf("session:%s", sessionID)
return sm.rdb.Expire(ctx, key, sm.ttl).Err()
}
// 删除会话(登出)
func (sm *SessionManager) DeleteSession(ctx context.Context, sessionID string) error {
key := fmt.Sprintf("session:%s", sessionID)
return sm.rdb.Del(ctx, key).Err()
}
func main() {
ctx := context.Background()
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
})
defer rdb.Close()
sm := NewSessionManager(rdb, 30*time.Minute)
// 用户登录,创建会话
sessionID, _ := sm.CreateSession(ctx, 1001, "zhangsan")
fmt.Println("会话ID:", sessionID)
// 获取会话信息
session, _ := sm.GetSession(ctx, sessionID)
fmt.Printf("会话信息: %+v\n", session)
// 刷新会话
sm.RefreshSession(ctx, sessionID)
// 用户登出,删除会话
sm.DeleteSession(ctx, sessionID)
}
---
03.分布式锁
a.应用场景
在分布式系统中,使用Redis实现分布式锁,保证多个进程或服务器之间的互斥访问。
b.锁的特性
a.互斥性
同一时刻只有一个客户端能持有锁。
b.防死锁
设置锁的过期时间,即使持锁进程崩溃也能自动释放。
c.可重入
同一个客户端可以多次获取同一把锁。
c.实现示例
---
// 分布式锁示例
package main
import (
"context"
"fmt"
"time"
"github.com/go-redis/redis/v8"
"github.com/google/uuid"
)
type DistributedLock struct {
rdb *redis.Client
key string
value string
expiration time.Duration
}
func NewDistributedLock(rdb *redis.Client, key string, expiration time.Duration) *DistributedLock {
return &DistributedLock{
rdb: rdb,
key: key,
value: uuid.New().String(), // 唯一标识,防止误删
expiration: expiration,
}
}
// 获取锁
func (dl *DistributedLock) Lock(ctx context.Context) (bool, error) {
// 使用SET NX EX命令,原子性地设置键值和过期时间
result, err := dl.rdb.SetNX(ctx, dl.key, dl.value, dl.expiration).Result()
return result, err
}
// 释放锁(使用Lua脚本保证原子性)
func (dl *DistributedLock) Unlock(ctx context.Context) error {
script := `
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
`
_, err := dl.rdb.Eval(ctx, script, []string{dl.key}, dl.value).Result()
return err
}
// 尝试获取锁,带重试
func (dl *DistributedLock) TryLock(ctx context.Context, retryTimes int, retryDelay time.Duration) (bool, error) {
for i := 0; i < retryTimes; i++ {
locked, err := dl.Lock(ctx)
if err != nil {
return false, err
}
if locked {
return true, nil
}
time.Sleep(retryDelay)
}
return false, nil
}
func main() {
ctx := context.Background()
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
})
defer rdb.Close()
// 创建分布式锁
lock := NewDistributedLock(rdb, "order:lock:12345", 10*time.Second)
// 尝试获取锁
locked, err := lock.TryLock(ctx, 3, 100*time.Millisecond)
if err != nil {
panic(err)
}
if !locked {
fmt.Println("获取锁失败")
return
}
fmt.Println("获取锁成功,执行业务逻辑")
// 执行业务逻辑
time.Sleep(2 * time.Second)
// 释放锁
if err := lock.Unlock(ctx); err != nil {
fmt.Println("释放锁失败:", err)
} else {
fmt.Println("释放锁成功")
}
}
---
04.消息队列
a.应用场景
使用Redis的List或Stream数据结构实现轻量级消息队列,适用于异步任务处理、事件通知等场景。
b.实现方式
a.List实现
使用LPUSH和BRPOP命令实现简单的生产者消费者模式。
b.Stream实现
使用Redis 5.0+的Stream数据结构,支持消费组、消息确认等高级特性。
c.实现示例
---
// 消息队列示例(基于List)
package main
import (
"context"
"encoding/json"
"fmt"
"time"
"github.com/go-redis/redis/v8"
)
type Message struct {
ID string `json:"id"`
Content string `json:"content"`
Time time.Time `json:"time"`
}
type MessageQueue struct {
rdb *redis.Client
queue string
}
func NewMessageQueue(rdb *redis.Client, queue string) *MessageQueue {
return &MessageQueue{
rdb: rdb,
queue: queue,
}
}
// 生产者: 发送消息
func (mq *MessageQueue) Produce(ctx context.Context, content string) error {
msg := Message{
ID: fmt.Sprintf("%d", time.Now().UnixNano()),
Content: content,
Time: time.Now(),
}
data, err := json.Marshal(msg)
if err != nil {
return err
}
return mq.rdb.LPush(ctx, mq.queue, data).Err()
}
// 消费者: 接收消息(阻塞)
func (mq *MessageQueue) Consume(ctx context.Context, timeout time.Duration) (*Message, error) {
result, err := mq.rdb.BRPop(ctx, timeout, mq.queue).Result()
if err == redis.Nil {
return nil, nil // 超时,无消息
}
if err != nil {
return nil, err
}
var msg Message
if err := json.Unmarshal([]byte(result[1]), &msg); err != nil {
return nil, err
}
return &msg, nil
}
// 获取队列长度
func (mq *MessageQueue) Length(ctx context.Context) (int64, error) {
return mq.rdb.LLen(ctx, mq.queue).Result()
}
func main() {
ctx := context.Background()
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
})
defer rdb.Close()
mq := NewMessageQueue(rdb, "task:queue")
// 生产者: 发送3条消息
for i := 1; i <= 3; i++ {
content := fmt.Sprintf("任务 %d", i)
if err := mq.Produce(ctx, content); err != nil {
panic(err)
}
fmt.Println("发送消息:", content)
}
// 查看队列长度
length, _ := mq.Length(ctx)
fmt.Printf("队列长度: %d\n", length)
// 消费者: 接收消息
for i := 0; i < 3; i++ {
msg, err := mq.Consume(ctx, 5*time.Second)
if err != nil {
panic(err)
}
if msg != nil {
fmt.Printf("接收消息: %+v\n", msg)
}
}
}
---
05.限流器
a.应用场景
使用Redis实现分布式限流器,控制API访问频率,防止系统过载和恶意攻击。
b.限流算法
a.固定窗口
在固定时间窗口内限制请求数量。
b.滑动窗口
使用有序集合实现更精确的滑动窗口限流。
c.令牌桶
使用Lua脚本实现令牌桶算法。
c.实现示例
---
// 限流器示例(滑动窗口)
package main
import (
"context"
"fmt"
"time"
"github.com/go-redis/redis/v8"
)
type RateLimiter struct {
rdb *redis.Client
key string
limit int // 限制次数
window time.Duration // 时间窗口
}
func NewRateLimiter(rdb *redis.Client, key string, limit int, window time.Duration) *RateLimiter {
return &RateLimiter{
rdb: rdb,
key: key,
limit: limit,
window: window,
}
}
// 检查是否允许访问(滑动窗口算法)
func (rl *RateLimiter) Allow(ctx context.Context) (bool, error) {
now := time.Now().UnixNano()
windowStart := now - rl.window.Nanoseconds()
pipe := rl.rdb.Pipeline()
// 1. 删除窗口外的记录
pipe.ZRemRangeByScore(ctx, rl.key, "0", fmt.Sprintf("%d", windowStart))
// 2. 统计当前窗口内的请求数
pipe.ZCard(ctx, rl.key)
// 3. 添加当前请求
pipe.ZAdd(ctx, rl.key, &redis.Z{
Score: float64(now),
Member: now,
})
// 4. 设置过期时间
pipe.Expire(ctx, rl.key, rl.window)
cmds, err := pipe.Exec(ctx)
if err != nil {
return false, err
}
// 获取当前窗口内的请求数
count := cmds[1].(*redis.IntCmd).Val()
return count < int64(rl.limit), nil
}
func main() {
ctx := context.Background()
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
})
defer rdb.Close()
// 创建限流器: 10秒内最多5次请求
limiter := NewRateLimiter(rdb, "api:limit:user:1001", 5, 10*time.Second)
// 模拟10次请求
for i := 1; i <= 10; i++ {
allowed, err := limiter.Allow(ctx)
if err != nil {
panic(err)
}
if allowed {
fmt.Printf("请求 %d: 允许\n", i)
} else {
fmt.Printf("请求 %d: 拒绝(超过限流)\n", i)
}
time.Sleep(500 * time.Millisecond)
}
}
---
1.5 架构原理
01.客户端架构
a.分层设计
go-redis采用分层架构,从底层到上层依次为网络层、协议层、命令层和API层,每层职责清晰。
b.核心组件
a.��接池
管理TCP连接的创建、复用和销毁,提供高效的连接管理机制。
b.命令队列
缓冲待发送的命令,支持批量发送和Pipeline模式。
c.结果解析器
解析Redis服务器返回的RESP协议数据,转换为Go类型。
c.架构图示
---
// 架构层次示意
+------------------+
| API Layer | <- 用户调用的方法(Get/Set/HGet等)
+------------------+
| Command Layer | <- 命令封装和参数处理
+------------------+
| Protocol Layer | <- RESP协议编解码
+------------------+
| Network Layer | <- TCP连接和I/O操作
+------------------+
| Connection Pool | <- 连接池管理
+------------------+
---
02.连接池原理
a.池化机制
a.连接创建
当池中无可用连接且未达到最大连接数时,创建新连接;达到最大连接数时,等待或返回错误。
b.连接复用
命令执行完毕后,连接返回池中供其他请求复用,避免频繁创建销毁连接的开销。
c.连接淘汰
空闲连接超过IdleTimeout后被关闭,连接存活时间超过MaxConnAge后被淘汰。
b.连接状态
a.空闲状态
连接在池中等待被使用。
b.使用状态
连接被某个请求占用,正在执行命令。
c.关闭状态
连接出现错误或超时,被标记为关闭,等待清理。
c.实现原理
---
// 连接池工作原理示意代码
package main
import (
"context"
"fmt"
"sync"
"time"
"github.com/go-redis/redis/v8"
)
func main() {
ctx := context.Background()
// 配置连接池参数
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
PoolSize: 10, // 最大连接数
MinIdleConns: 5, // 最小空闲连接数
PoolTimeout: 4 * time.Second, // 获取连接超时
IdleTimeout: 5 * time.Minute, // 空闲连接超时
MaxConnAge: 0, // 连接最大存活时间,0表示不限制
})
defer rdb.Close()
// 模拟并发请求,观察连接池行为
var wg sync.WaitGroup
for i := 0; i < 20; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
// 执行命令时自动从池中获取连接
err := rdb.Set(ctx, fmt.Sprintf("key:%d", id), id, 0).Err()
if err != nil {
fmt.Printf("协程 %d 错误: %v\n", id, err)
return
}
// 命令执行完毕,连接自动返回池中
fmt.Printf("协程 %d 完成\n", id)
}(i)
}
wg.Wait()
// 查看连接池统计信息
stats := rdb.PoolStats()
fmt.Printf("\n连接池统计:\n")
fmt.Printf(" 总连接数: %d\n", stats.TotalConns)
fmt.Printf(" 空闲连接数: %d\n", stats.IdleConns)
fmt.Printf(" 过期连接数: %d\n", stats.StaleConns)
fmt.Printf(" 命中次数: %d\n", stats.Hits)
fmt.Printf(" 未命中次数: %d\n", stats.Misses)
fmt.Printf(" 超时次数: %d\n", stats.Timeouts)
}
---
03.RESP协议
a.协议说明
Redis使用RESP(REdis Serialization Protocol)协议进行客户端和服务器之间的通信,这是一个简单高效的文本协议。
b.数据类型
a.简单字符串
以加号开头,如+OK表示成功响应。
b.错误信息
以减号开头,如-ERR表示错误。
c.整数
以冒号开头,如:1000表示整数1000。
d.批量字符串
以美元符号开头,后跟字符串长度,如$5\r\nhello表示字符串hello。
e.数组
以星号开头,后跟元素个数,如*3表示包含3个元素的数组。
c.协议示例
---
// RESP协议示例
// 客户端发送: SET key value
*3\r\n
$3\r\n
SET\r\n
$3\r\n
key\r\n
$5\r\n
value\r\n
// 服务器响应: +OK
+OK\r\n
// 客户端发送: GET key
*2\r\n
$3\r\n
GET\r\n
$3\r\n
key\r\n
// 服务器响应: $5\r\nvalue\r\n
$5\r\n
value\r\n
// 客户端发送: LRANGE list 0 2
*4\r\n
$6\r\n
LRANGE\r\n
$4\r\n
list\r\n
$1\r\n
0\r\n
$1\r\n
2\r\n
// 服务器响应: 数组
*3\r\n
$5\r\n
item1\r\n
$5\r\n
item2\r\n
$5\r\n
item3\r\n
---
d.协议处理
---
// go-redis如何处理RESP协议
package main
import (
"bufio"
"fmt"
"net"
"strings"
)
// 简化的RESP协议编码示例
func encodeCommand(args ...string) string {
var sb strings.Builder
// 数组标记
sb.WriteString(fmt.Sprintf("*%d\r\n", len(args)))
// 每个参数
for _, arg := range args {
sb.WriteString(fmt.Sprintf("$%d\r\n%s\r\n", len(arg), arg))
}
return sb.String()
}
// 简化的RESP协议解码示例
func decodeSimpleString(reader *bufio.Reader) (string, error) {
line, err := reader.ReadString('\n')
if err != nil {
return "", err
}
// 去掉\r\n
line = strings.TrimSuffix(line, "\r\n")
switch line[0] {
case '+': // 简单字符串
return line[1:], nil
case '-': // 错误
return "", fmt.Errorf("redis error: %s", line[1:])
case ':': // 整数
return line[1:], nil
case '$': // 批量字符串
// 读取长度后的实际内容
content, err := reader.ReadString('\n')
if err != nil {
return "", err
}
return strings.TrimSuffix(content, "\r\n"), nil
default:
return "", fmt.Errorf("unknown type: %c", line[0])
}
}
func main() {
// 编码示例
cmd := encodeCommand("SET", "key", "value")
fmt.Println("编码后的命令:")
fmt.Printf("%q\n", cmd)
// 模拟发送和接收
conn, err := net.Dial("tcp", "localhost:6379")
if err != nil {
fmt.Println("连接失败:", err)
return
}
defer conn.Close()
// 发送命令
_, err = conn.Write([]byte(cmd))
if err != nil {
fmt.Println("发送失败:", err)
return
}
// 接收响应
reader := bufio.NewReader(conn)
response, err := decodeSimpleString(reader)
if err != nil {
fmt.Println("解码失败:", err)
return
}
fmt.Println("服务器响应:", response)
}
---
04.命令执行流程
a.执行步骤
a.命令构建
用户调用API方法,go-redis将其转换为Redis命令和参数。
b.获取连接
从连接池中获取一个可用连接,如果池中无连接则等待或创建新连接。
c.发送命令
将命令按RESP协议编码后通过TCP连接发送到Redis服务器。
d.接收响应
从TCP连接读取服务器响应,按RESP协议解码。
e.返回结果
将解码后的结果转换为Go类型,返回给用户。
f.归还连接
命令执行完毕,将连接归还到连接池供其他请求使用。
b.流程图示
---
// 命令执行流程
用户调用 rdb.Get(ctx, "key")
|
v
[1] 构建命令: GET key
|
v
[2] 从连接池获取连接
|
v
[3] 编码为RESP: *2\r\n$3\r\nGET\r\n$3\r\nkey\r\n
|
v
[4] 通过TCP发送到Redis服务器
|
v
[5] 等待服务器响应
|
v
[6] 接收RESP响应: $5\r\nvalue\r\n
|
v
[7] 解码为Go字符串: "value"
|
v
[8] 归还连接到连接池
|
v
返回结果给用户: ("value", nil)
---
c.执行示例
---
// 命令执行流程跟踪示例
package main
import (
"context"
"fmt"
"time"
"github.com/go-redis/redis/v8"
)
// 自定义Hook,跟踪命令执行
type TracingHook struct{}
func (h TracingHook) BeforeProcess(ctx context.Context, cmd redis.Cmder) (context.Context, error) {
fmt.Printf("[开始] 执行命令: %s\n", cmd.String())
return ctx, nil
}
func (h TracingHook) AfterProcess(ctx context.Context, cmd redis.Cmder) error {
fmt.Printf("[完成] 命令结果: %v\n", cmd.Err())
return nil
}
func (h TracingHook) BeforeProcessPipeline(ctx context.Context, cmds []redis.Cmder) (context.Context, error) {
fmt.Printf("[开始] Pipeline包含 %d 个命令\n", len(cmds))
return ctx, nil
}
func (h TracingHook) AfterProcessPipeline(ctx context.Context, cmds []redis.Cmder) error {
fmt.Printf("[完成] Pipeline执行完毕\n")
return nil
}
func main() {
ctx := context.Background()
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
})
defer rdb.Close()
// 添加Hook跟踪命令执行
rdb.AddHook(TracingHook{})
// 执行命令,观察执行流程
fmt.Println("=== 执行SET命令 ===")
err := rdb.Set(ctx, "trace:key", "trace:value", 10*time.Second).Err()
if err != nil {
panic(err)
}
fmt.Println("\n=== 执行GET命令 ===")
val, err := rdb.Get(ctx, "trace:key").Result()
if err != nil {
panic(err)
}
fmt.Printf("获取到的值: %s\n", val)
fmt.Println("\n=== 执行Pipeline ===")
pipe := rdb.Pipeline()
pipe.Set(ctx, "pipe:key1", "value1", 0)
pipe.Set(ctx, "pipe:key2", "value2", 0)
pipe.Get(ctx, "pipe:key1")
_, err = pipe.Exec(ctx)
if err != nil {
panic(err)
}
}
---
05.错误处理机制
a.错误类型
a.连接错误
网络断开、连接超时、连接被拒绝等网络层面的错误。
b.协议错误
RESP协议解析失败、数据格式错误等协议层面的错误。
c.命令错误
Redis服务器返回的错误,如键不存在、类型错误、权限错误等。
d.超时错误
Context超时、读写超时、连接池获取超时等各种超时错误。
b.错误处理策略
a.自动重试
对于网络临时故障,go-redis会自动重试,重试次数和间隔可配置。
b.连接重建
连接出现错误时,自动关闭并从池中移除,下次请求时创建新连接。
c.错误传播
将错误信息完整地传递给用户,便于问题定位和处理。
c.错误处理示例
---
// 错误处理机制示例
package main
import (
"context"
"errors"
"fmt"
"net"
"time"
"github.com/go-redis/redis/v8"
)
func main() {
ctx := context.Background()
// 配置重试策略
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
MaxRetries: 3, // 最大重试次数
MinRetryBackoff: 8 * time.Millisecond, // 最小重试间隔
MaxRetryBackoff: 512 * time.Millisecond, // 最大重试间隔
DialTimeout: 5 * time.Second, // 连接超时
ReadTimeout: 3 * time.Second, // 读超时
WriteTimeout: 3 * time.Second, // 写超时
})
defer rdb.Close()
// 测试各种错误场景
testErrors(ctx, rdb)
}
func testErrors(ctx context.Context, rdb *redis.Client) {
// 1. 测试键不存在错误
fmt.Println("=== 测试键不存在 ===")
val, err := rdb.Get(ctx, "nonexistent:key").Result()
if err == redis.Nil {
fmt.Println("键不存在(这是正常情况)")
} else if err != nil {
fmt.Println("其他错误:", err)
} else {
fmt.Println("值:", val)
}
// 2. 测试类型错误
fmt.Println("\n=== 测试类型错误 ===")
rdb.Set(ctx, "string:key", "value", 0)
_, err = rdb.LPush(ctx, "string:key", "item").Result()
if err != nil {
fmt.Println("类型错误:", err)
}
// 3. 测试超时错误
fmt.Println("\n=== 测试超时错误 ===")
timeoutCtx, cancel := context.WithTimeout(ctx, 1*time.Nanosecond)
defer cancel()
time.Sleep(10 * time.Millisecond) // 确保超时
err = rdb.Set(timeoutCtx, "key", "value", 0).Err()
if errors.Is(err, context.DeadlineExceeded) {
fmt.Println("Context超时")
}
// 4. 测试网络错误
fmt.Println("\n=== 测试网络错误 ===")
badClient := redis.NewClient(&redis.Options{
Addr: "localhost:9999", // 错误的端口
})
defer badClient.Close()
err = badClient.Ping(ctx).Err()
if err != nil {
var netErr net.Error
if errors.As(err, &netErr) {
fmt.Println("网络错误:", netErr)
} else {
fmt.Println("其他错误:", err)
}
}
// 5. 错误分类处理函数
fmt.Println("\n=== 错误分类处理 ===")
handleRedisError(redis.Nil)
handleRedisError(context.DeadlineExceeded)
handleRedisError(fmt.Errorf("unknown error"))
}
func handleRedisError(err error) {
if err == nil {
return
}
switch {
case err == redis.Nil:
fmt.Println("[INFO] 键不存在")
case errors.Is(err, context.DeadlineExceeded):
fmt.Println("[ERROR] 操作超时")
case errors.Is(err, context.Canceled):
fmt.Println("[WARN] 操作被取消")
default:
var netErr net.Error
if errors.As(err, &netErr) {
fmt.Printf("[ERROR] 网络错误: %v\n", netErr)
} else {
fmt.Printf("[ERROR] Redis错误: %v\n", err)
}
}
}
---
1.6 连接管理
01.连接创建
a.创建时机
a.首次请求
当第一次执行Redis命令时,如果连接池为空,会创建初始连接。
b.池不足时
当所有连接都在使用中且未达到PoolSize限制时,创建新连接。
c.预热连接
可以通过MinIdleConns配置预先创建一定数量的空闲连接,避免首次请求延迟。
b.创建过程
a.建立TCP连接
使用net.Dial建立到Redis服务器的TCP连接。
b.认证
如果配置了Password,发送AUTH命令进行身份验证。
c.选择数据库
如果配置了DB,发送SELECT命令切换到指定数据库。
d.健康检查
发送PING命令确认连接可用。
c.创建示例
---
// 连接创建过程示例
package main
import (
"context"
"fmt"
"time"
"github.com/go-redis/redis/v8"
)
func main() {
ctx := context.Background()
// 创建客户端时不会立即建立连接
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
Password: "",
DB: 0,
PoolSize: 10,
MinIdleConns: 3, // 预创建3个空闲连接
DialTimeout: 5 * time.Second,
})
defer rdb.Close()
fmt.Println("客户端已创建,但连接尚未建立")
// 查看初始连接池状态
stats := rdb.PoolStats()
fmt.Printf("初始连接数: %d\n", stats.TotalConns)
// 第一次执行命令时才会创建连接
err := rdb.Ping(ctx).Err()
if err != nil {
panic(err)
}
fmt.Println("首次命令执行完毕")
// 再次查看连接池状态
stats = rdb.PoolStats()
fmt.Printf("当前总连接数: %d\n", stats.TotalConns)
fmt.Printf("当前空闲连接数: %d\n", stats.IdleConns)
// 手动触发连接预热
warmupConnections(ctx, rdb, 5)
stats = rdb.PoolStats()
fmt.Printf("预热后总连接数: %d\n", stats.TotalConns)
fmt.Printf("预热后空闲连接数: %d\n", stats.IdleConns)
}
// 连接预热函数
func warmupConnections(ctx context.Context, rdb *redis.Client, count int) {
fmt.Printf("开始预热 %d 个连接...\n", count)
pipe := rdb.Pipeline()
for i := 0; i < count; i++ {
pipe.Ping(ctx)
}
_, err := pipe.Exec(ctx)
if err != nil {
fmt.Println("预热失败:", err)
} else {
fmt.Println("预热完成")
}
}
---
02.连接复用
a.复用机制
a.命令完成后归还
每个命令执行完毕后,连接自动归还到连接池,供后续请求使用。
b.状态重置
归还前会重置连接状态,确保下次使用时是干净的状态。
c.健康检查
归还时检查连接是否仍然有效,无效连接会被丢弃。
b.复用优势
a.减少开销
避免频繁创建和销毁TCP连接,减少系统调用和网络握手开销。
b.提升性能
连接复用可以显著提升高并发场景下的性能,降低延��。
c.资源节约
减少服务器端的连接数,节约系统资源。
c.复用示例
---
// 连接复用示例
package main
import (
"context"
"fmt"
"sync"
"time"
"github.com/go-redis/redis/v8"
)
func main() {
ctx := context.Background()
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
PoolSize: 5, // 最多5个连接
})
defer rdb.Close()
// 模拟100个并发请求,观察连接复用
var wg sync.WaitGroup
startTime := time.Now()
for i := 0; i < 100; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
// 每个请求执行一个SET命令
key := fmt.Sprintf("reuse:key:%d", id)
err := rdb.Set(ctx, key, id, 0).Err()
if err != nil {
fmt.Printf("请求 %d 失败: %v\n", id, err)
return
}
// 命令执行完毕,连接自动归还到池中
}(i)
// 控制并发速度
if i%10 == 0 {
time.Sleep(10 * time.Millisecond)
}
}
wg.Wait()
duration := time.Since(startTime)
// 查看连接池统计
stats := rdb.PoolStats()
fmt.Printf("\n=== 连接复用统计 ===\n")
fmt.Printf("总请求数: 100\n")
fmt.Printf("总耗时: %v\n", duration)
fmt.Printf("总连接数: %d (最大5个)\n", stats.TotalConns)
fmt.Printf("连接命中次数: %d\n", stats.Hits)
fmt.Printf("连接未命中次数: %d\n", stats.Misses)
fmt.Printf("连接复用率: %.2f\n",
stats.TotalConns,
stats.IdleConns,
float64(stats.Hits)/float64(stats.Hits+stats.Misses)*100)
case <-stopChan:
fmt.Println("\n收到停止信号")
return
}
}
}
---
05.连接监控
a.监控指标
a.连接数指标
总连接数TotalConns、空闲连接数IdleConns、过期连接数StaleConns。
b.性能指标
命中次数Hits、未命中次数Misses、超时次数Timeouts。
c.延迟指标
连接建立延迟、命令执行延迟、网络往返延迟。
b.监控方法
a.PoolStats
调用rdb.PoolStats()获取连接池统计信息。
b.Hook机制
使用Hook拦截命令执行,记录性能指标。
c.日志记录
记录连接创建、释放、错误等事件。
c.监控示例
---
// 连接监控示例
package main
import (
"context"
"fmt"
"sync"
"time"
"github.com/go-redis/redis/v8"
)
// 监控Hook
type MonitorHook struct {
mu sync.Mutex
commandCount int
totalDuration time.Duration
errors int
}
func (h *MonitorHook) BeforeProcess(ctx context.Context, cmd redis.Cmder) (context.Context, error) {
// 记录开始时间
ctx = context.WithValue(ctx, "start_time", time.Now())
return ctx, nil
}
func (h *MonitorHook) AfterProcess(ctx context.Context, cmd redis.Cmder) error {
h.mu.Lock()
defer h.mu.Unlock()
// 计算执行时间
startTime := ctx.Value("start_time").(time.Time)
duration := time.Since(startTime)
h.commandCount++
h.totalDuration += duration
if cmd.Err() != nil && cmd.Err() != redis.Nil {
h.errors++
}
return nil
}
func (h *MonitorHook) BeforeProcessPipeline(ctx context.Context, cmds []redis.Cmder) (context.Context, error) {
ctx = context.WithValue(ctx, "start_time", time.Now())
return ctx, nil
}
func (h *MonitorHook) AfterProcessPipeline(ctx context.Context, cmds []redis.Cmder) error {
h.mu.Lock()
defer h.mu.Unlock()
startTime := ctx.Value("start_time").(time.Time)
duration := time.Since(startTime)
h.commandCount += len(cmds)
h.totalDuration += duration
for _, cmd := range cmds {
if cmd.Err() != nil && cmd.Err() != redis.Nil {
h.errors++
}
}
return nil
}
func (h *MonitorHook) GetStats() (int, time.Duration, int, time.Duration) {
h.mu.Lock()
defer h.mu.Unlock()
avgDuration := time.Duration(0)
if h.commandCount > 0 {
avgDuration = h.totalDuration / time.Duration(h.commandCount)
}
return h.commandCount, h.totalDuration, h.errors, avgDuration
}
func main() {
ctx := context.Background()
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
PoolSize: 10,
})
defer rdb.Close()
// 添加监控Hook
monitor := &MonitorHook{}
rdb.AddHook(monitor)
// 启动监控报告
stopChan := make(chan bool)
go monitorReport(rdb, monitor, 3*time.Second, stopChan)
// 模拟负载
fmt.Println("开始模拟负载...")
simulateLoad(ctx, rdb, 100)
// 等待一段时间观察监控数据
time.Sleep(10 * time.Second)
stopChan <- true
time.Sleep(1 * time.Second)
// 最终报告
fmt.Println("\n=== 最终监控报告 ===")
printFinalReport(rdb, monitor)
}
func simulateLoad(ctx context.Context, rdb *redis.Client, count int) {
var wg sync.WaitGroup
for i := 0; i < count; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
key := fmt.Sprintf("monitor:key:%d", id)
rdb.Set(ctx, key, id, 0)
rdb.Get(ctx, key)
time.Sleep(100 * time.Millisecond)
}(i)
}
wg.Wait()
}
func monitorReport(rdb *redis.Client, monitor *MonitorHook, interval time.Duration, stopChan chan bool) {
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
fmt.Println("\n=== 监控报告 ===")
// 连接池统计
stats := rdb.PoolStats()
fmt.Printf("连接池:\n")
fmt.Printf(" 总连接数: %d\n", stats.TotalConns)
fmt.Printf(" 空闲连接数: %d\n", stats.IdleConns)
fmt.Printf(" 过期连接数: %d\n", stats.StaleConns)
fmt.Printf(" 命中次数: %d\n", stats.Hits)
fmt.Printf(" 未命中次数: %d\n", stats.Misses)
fmt.Printf(" 超时次数: %d\n", stats.Timeouts)
if stats.Hits+stats.Misses > 0 {
hitRate := float64(stats.Hits) / float64(stats.Hits+stats.Misses) * 100
fmt.Printf(" 命中率: %.2f\n", errorRate)
}
case <-stopChan:
return
}
}
}
func printFinalReport(rdb *redis.Client, monitor *MonitorHook) {
stats := rdb.PoolStats()
cmdCount, totalDur, errors, avgDur := monitor.GetStats()
fmt.Printf("连接池最终状态:\n")
fmt.Printf(" 总连接数: %d\n", stats.TotalConns)
fmt.Printf(" 总命中次数: %d\n", stats.Hits)
fmt.Printf(" 总未命中次数: %d\n", stats.Misses)
fmt.Printf("\n命令执行统计:\n")
fmt.Printf(" 总命令数: %d\n", cmdCount)
fmt.Printf(" 总耗时: %v\n", totalDur)
fmt.Printf(" 平均延迟: %v\n", avgDur)
fmt.Printf(" 总错误数: %d\n", errors)
if cmdCount > 0 {
qps := float64(cmdCount) / totalDur.Seconds()
fmt.Printf(" QPS: %.2f\n", qps)
}
}
---
1.7 客户端特性
01.并发安全
a.线程安全设计
go-redis的Client是并发安全的,可以在多个goroutine中同时使用同一个Client实例,无需额外加锁。
b.安全机制
a.连接池隔离
每个请求从连接池获取独立的连接,不同请求之间不会相互干扰。
b.状态隔离
每个命令都有独立的状态和上下文,不会出现状态混乱。
c.原子操作
内部使用原子操作和互斥锁保护共享资源,确保并发安全。
c.并发示例
---
// 并发安全示例
package main
import (
"context"
"fmt"
"sync"
"time"
"github.com/go-redis/redis/v8"
)
func main() {
ctx := context.Background()
// 创建一个Client实例
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
PoolSize: 20,
})
defer rdb.Close()
// 多个goroutine并发使用同一个Client
var wg sync.WaitGroup
concurrency := 100
startTime := time.Now()
for i := 0; i < concurrency; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
// 每个goroutine执行多个操作
for j := 0; j < 10; j++ {
key := fmt.Sprintf("concurrent:key:%d:%d", id, j)
// Set操作
err := rdb.Set(ctx, key, fmt.Sprintf("value-%d-%d", id, j), 0).Err()
if err != nil {
fmt.Printf("Set错误: %v\n", err)
return
}
// Get操作
val, err := rdb.Get(ctx, key).Result()
if err != nil {
fmt.Printf("Get错误: %v\n", err)
return
}
// Incr操作
counter := fmt.Sprintf("concurrent:counter:%d", id)
_, err = rdb.Incr(ctx, counter).Result()
if err != nil {
fmt.Printf("Incr错误: %v\n", err)
return
}
_ = val // 使用val避免编译警告
}
}(i)
}
wg.Wait()
duration := time.Since(startTime)
fmt.Printf("并发测试完成:\n")
fmt.Printf(" 并发数: %d\n", concurrency)
fmt.Printf(" 每个goroutine操作数: 10\n")
fmt.Printf(" 总操作数: %d\n", concurrency*10*3)
fmt.Printf(" 总耗时: %v\n", duration)
fmt.Printf(" QPS: %.2f\n", float64(concurrency*10*3)/duration.Seconds())
// 验证计数器
for i := 0; i < 5; i++ {
counter := fmt.Sprintf("concurrent:counter:%d", i)
count, _ := rdb.Get(ctx, counter).Int()
fmt.Printf(" 计数器 %d: %d\n", i, count)
}
}
---
02.自动重连
a.重连机制
a.连接断开检测
当检测到连接断开时,自动标记连接为无效,从连接池中移除。
b.自动重试
命令执行失败时,根据MaxRetries配置自动重试,重试时会使用新连接。
c.指数退避
重试间隔采用指数退避策略,从MinRetryBackoff逐渐增加到MaxRetryBackoff。
b.重连配置
a.MaxRetries
最大重试次数,默认为3次。
b.MinRetryBackoff
最小重试间隔,默认为8毫秒。
c.MaxRetryBackoff
最大重试间隔,默认为512毫秒。
c.重连示例
---
// 自动重连示例
package main
import (
"context"
"fmt"
"time"
"github.com/go-redis/redis/v8"
)
func main() {
ctx := context.Background()
// 配置重连参数
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
MaxRetries: 5, // 最多重试5次
MinRetryBackoff: 100 * time.Millisecond, // 最小重试间隔100ms
MaxRetryBackoff: 3 * time.Second, // 最大重试间隔3s
DialTimeout: 5 * time.Second, // 连接超时5s
})
defer rdb.Close()
// 测试正常连接
fmt.Println("=== 测试正常连接 ===")
err := rdb.Ping(ctx).Err()
if err != nil {
fmt.Println("连接失败:", err)
} else {
fmt.Println("连接成功")
}
// 模拟网络抖动场景
fmt.Println("\n=== 模拟网络抖动 ===")
for i := 0; i < 10; i++ {
start := time.Now()
err := rdb.Set(ctx, fmt.Sprintf("retry:key:%d", i), i, 0).Err()
duration := time.Since(start)
if err != nil {
fmt.Printf("操作 %d 失败 (耗时: %v): %v\n", i, duration, err)
} else {
fmt.Printf("操作 %d 成功 (耗时: %v)\n", i, duration)
}
time.Sleep(500 * time.Millisecond)
}
// 自定义重试逻辑
fmt.Println("\n=== 自定义重试逻辑 ===")
err = retryOperation(ctx, rdb, "custom:key", "custom:value", 3)
if err != nil {
fmt.Println("最终失败:", err)
} else {
fmt.Println("最终成功")
}
}
// 自定义重试函数
func retryOperation(ctx context.Context, rdb *redis.Client, key, value string, maxRetries int) error {
var err error
backoff := 100 * time.Millisecond
for i := 0; i <= maxRetries; i++ {
if i > 0 {
fmt.Printf("重试 %d/%d (等待 %v)...\n", i, maxRetries, backoff)
time.Sleep(backoff)
backoff *= 2 // 指数退避
if backoff > 3*time.Second {
backoff = 3 * time.Second
}
}
err = rdb.Set(ctx, key, value, 0).Err()
if err == nil {
if i > 0 {
fmt.Printf("重试成功 (第 %d 次尝试)\n", i+1)
}
return nil
}
fmt.Printf("尝试 %d 失败: %v\n", i+1, err)
}
return fmt.Errorf("达到最大重试次数: %w", err)
}
---
03.命令超时
a.超时类型
a.连接超时
DialTimeout控制建立连接的超时时间。
b.读超时
ReadTimeout控制读取响应的超时时间。
c.写超时
WriteTimeout控制发送命令的超时时间。
d.Context超时
通过context.WithTimeout控制整个操作的超时时间。
b.超时优先级
Context超时优先级最高,会覆盖其他超时设置,建议使用Context统一管理超时。
c.超时示例
---
// 命令超时示例
package main
import (
"context"
"fmt"
"time"
"github.com/go-redis/redis/v8"
)
func main() {
// 配置各种超时参数
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
DialTimeout: 5 * time.Second, // 连接超时
ReadTimeout: 3 * time.Second, // 读超时
WriteTimeout: 3 * time.Second, // 写超时
})
defer rdb.Close()
// 1. 使用默认超时
fmt.Println("=== 使用默认超时 ===")
ctx := context.Background()
err := rdb.Set(ctx, "timeout:key1", "value1", 0).Err()
if err != nil {
fmt.Println("Set失败:", err)
} else {
fmt.Println("Set成功")
}
// 2. 使用Context超时
fmt.Println("\n=== 使用Context超时(2秒) ===")
ctx2, cancel2 := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel2()
start := time.Now()
err = rdb.Set(ctx2, "timeout:key2", "value2", 0).Err()
duration := time.Since(start)
if err != nil {
fmt.Printf("Set失败 (耗时: %v): %v\n", duration, err)
} else {
fmt.Printf("Set成功 (耗时: %v)\n", duration)
}
// 3. 测试极短超时
fmt.Println("\n=== 测试极短超时(1纳秒) ===")
ctx3, cancel3 := context.WithTimeout(context.Background(), 1*time.Nanosecond)
defer cancel3()
time.Sleep(10 * time.Millisecond) // 确保超时
start = time.Now()
err = rdb.Set(ctx3, "timeout:key3", "value3", 0).Err()
duration = time.Since(start)
if err != nil {
fmt.Printf("Set失败 (耗时: %v): %v\n", duration, err)
}
// 4. 批量操作超时
fmt.Println("\n=== 批量操作超时 ===")
ctx4, cancel4 := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel4()
pipe := rdb.Pipeline()
for i := 0; i < 1000; i++ {
pipe.Set(ctx4, fmt.Sprintf("timeout:batch:%d", i), i, 0)
}
start = time.Now()
_, err = pipe.Exec(ctx4)
duration = time.Since(start)
if err != nil {
fmt.Printf("Pipeline失败 (耗时: %v): %v\n", duration, err)
} else {
fmt.Printf("Pipeline成功 (耗时: %v, 1000个命令)\n", duration)
}
// 5. 超时监控
fmt.Println("\n=== 超时监控 ===")
monitorTimeouts(rdb, 10)
}
func monitorTimeouts(rdb *redis.Client, count int) {
timeouts := []time.Duration{
10 * time.Millisecond,
50 * time.Millisecond,
100 * time.Millisecond,
500 * time.Millisecond,
1 * time.Second,
}
for _, timeout := range timeouts {
successCount := 0
failCount := 0
for i := 0; i < count; i++ {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
err := rdb.Ping(ctx).Err()
cancel()
if err == nil {
successCount++
} else {
failCount++
}
}
fmt.Printf("超时 %v: 成功 %d, 失败 %d, 成功率 %.1f\n",
stock.Symbol, stock.Price, stock.Change)
continue
}
fmt.Printf(" 原始内容: %s\n", msg.Payload)
}
}()
time.Sleep(100 * time.Millisecond)
// 发布新闻消息
news := NewsMessage{
ID: "news-001",
Title: "Go语言新版本发布",
Content: "Go 1.22带来了许多新特性和性能改进",
Category: "technology",
Author: "Tech Reporter",
Timestamp: time.Now(),
}
newsJSON, _ := json.Marshal(news)
result1, _ := rdb.Publish(ctx, "news", newsJSON).Result()
fmt.Printf("发布新闻消息 (订阅者数: %d)\n", result1)
// 发布股票更新
stock := StockUpdate{
Symbol: "AAPL",
Price: 182.52,
Change: 2.3,
Volume: 45000000,
Time: time.Now().Unix(),
}
stockJSON, _ := json.Marshal(stock)
result2, _ := rdb.Publish(ctx, "stocks", stockJSON).Result()
fmt.Printf("发布股票消息 (订阅者数: %d)\n", result2)
time.Sleep(500 * time.Millisecond)
}
func highFrequencyPublishExample(ctx context.Context, rdb *redis.Client) {
// 创建订阅者
sub := rdb.Subscribe(ctx, "realtime:data")
defer sub.Close()
messageCount := 0
go func() {
for {
msg, err := sub.ReceiveMessage(ctx)
if err != nil {
return
}
messageCount++
if messageCount <= 10 || messageCount%100 == 0 {
fmt.Printf("实时数据 #%d: %s\n", messageCount, msg.Payload)
}
}
}()
time.Sleep(100 * time.Millisecond)
// 高频发布数据
fmt.Println("开始高频数据发布...")
startTime := time.Now()
totalMessages := 500
for i := 1; i <= totalMessages; i++ {
data := fmt.Sprintf("传感器数据 #%d - 温度: %.1f°C, 湿度: %.1f)\n", stats.IdleConns,
float64(stats.IdleConns)/float64(stats.TotalConns)*100)
fmt.Printf(" 过期连接: %d\n", stats.StaleConns)
fmt.Printf(" 命中次数: %d\n", stats.Hits)
fmt.Printf(" 未命中次数: %d\n", stats.Misses)
if stats.Hits+stats.Misses > 0 {
hitRate := float64(stats.Hits) / float64(stats.Hits+stats.Misses) * 100
fmt.Printf(" 连接池命中率: %.1f\n", stats.ErrorRate)
fmt.Printf("\n系统资源:\n")
fmt.Printf(" CPU使用率: %.1f\n", successRate)
}
// 系统资源
var m runtime.MemStats
runtime.ReadMemStats(&m)
fmt.Printf("系统资源:\n")
fmt.Printf(" 内存使用: %d MB\n", m.Alloc/1024/1024)
fmt.Printf(" 协程数: %d\n", runtime.NumGoroutine())
fmt.Printf(" GC次数: %d\n", m.NumGC)
}
func min(a, b int) int {
if a < b {
return a
}
return b
}
func max(a, b int) int {
if a > b {
return a
}
return b
}
---
4 Context与并发
4.1 Context使用
01.Context基础概念
a.Context定义
Go语言中的context.Context是一个接口,用于跨API边界传递请求范围的值、取消信号和超时信息。
b.核心功能
a.传递值
使用context.WithValue()在请求链中传递键值对数据。
b.取消信号
使用context.WithCancel()创建可取消的Context。
c.超时控制
使用context.WithTimeout()和context.WithDeadline()设置超时。
d.传递链
支持Context的传递链,形成父子关系。
c.go-redis中的Context
a.必需参数
go-redis v8要求所有命令都传入Context参数。
b.超时传播
Redis操作会继承Context的超时设置。
c.取消传播
Context取消会中止正在执行的Redis命令。
d.基础示例
---
// Context基础使用示例
package main
import (
"context"
"fmt"
"time"
"github.com/go-redis/redis/v8"
)
func main() {
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
})
defer rdb.Close()
fmt.Println("=== Context基础使用示例 ===")
// 1. 使用Background Context
fmt.Println("\n--- Background Context ---")
backgroundContextExample(rdb)
// 2. 使用TODO Context
fmt.Println("\n--- TODO Context ---")
todoContextExample(rdb)
// 3. Context值传递
fmt.Println("\n--- Context值传递 ---")
contextValueExample(rdb)
// 4. Context传递链
fmt.Println("\n--- Context传递链 ---")
contextChainExample(rdb)
}
func backgroundContextExample(rdb *redis.Client) {
// context.Background()是所有Context树的根节点
// 通常用于main函数、初始化、测试等场景
ctx := context.Background()
// 使用Background Context执行Redis操作
err := rdb.Set(ctx, "background:test", "value", 0).Err()
if err != nil {
fmt.Printf("设置失败: %v\n", err)
return
}
value, err := rdb.Get(ctx, "background:test").Result()
if err != nil {
fmt.Printf("获取失败: %v\n", err)
return
}
fmt.Printf("Background Context操作成功: %s\n", value)
}
func todoContextExample(rdb *redis.Client) {
// context.TODO()表示尚未确定使用何种Context
// 通常在开发阶段使用,后续需要替换为具体的Context
ctx := context.TODO()
// 使用TODO Context执行Redis操作
err := rdb.Set(ctx, "todo:test", "value", 0).Err()
if err != nil {
fmt.Printf("设置失败: %v\n", err)
return
}
fmt.Println("TODO Context操作成功")
}
func contextValueExample(rdb *redis.Client) {
// 定义Context键的类型,避免键冲突
type contextKey string
const userIDKey contextKey = "userID"
const requestIDKey contextKey = "requestID"
// 创建带有值的Context
ctx := context.WithValue(context.Background(), userIDKey, "user123")
ctx = context.WithValue(ctx, requestIDKey, "req-456")
// 从Context中获取值
userID := ctx.Value(userIDKey).(string)
requestID := ctx.Value(requestIDKey).(string)
fmt.Printf("从Context获取值: userID=%s, requestID=%s\n", userID, requestID)
// 使用这些值构建Redis键
key := fmt.Sprintf("user:%s:data", userID)
err := rdb.Set(ctx, key, fmt.Sprintf("data_for_%s", requestID), 0).Err()
if err != nil {
fmt.Printf("设置失败: %v\n", err)
return
}
fmt.Printf("使用Context值构建的键: %s\n", key)
}
func contextChainExample(rdb *redis.Client) {
// 创建Context传递链
parentCtx := context.Background()
// 第一层: 添加请求ID
ctx1 := context.WithValue(parentCtx, "requestID", "req-789")
// 第二层: 添加用户信息
ctx2 := context.WithValue(ctx1, "userID", "user456")
// 第三层: 添加操作类型
ctx3 := context.WithValue(ctx2, "operation", "write")
// 在最深层Context中执行操作
requestID := ctx3.Value("requestID").(string)
userID := ctx3.Value("userID").(string)
operation := ctx3.Value("operation").(string)
key := fmt.Sprintf("%s:%s:temp", userID, operation)
err := rdb.Set(ctx3, key, requestID, 0).Err()
if err != nil {
fmt.Printf("设置失败: %v\n", err)
return
}
fmt.Printf("Context链操作成功: requestID=%s, userID=%s, operation=%s\n",
requestID, userID, operation)
// 验证所有值都存在
fmt.Printf("ctx1中的requestID: %s\n", ctx1.Value("requestID"))
fmt.Printf("ctx2中的userID: %s\n", ctx2.Value("userID"))
fmt.Printf("ctx3中的operation: %s\n", ctx3.Value("operation"))
}
---
02.Context创建方式
a.Background和TODO
a.Background
空的Context,不能被取消,没有值,没有超时。
b.TODO
用于尚未确定使用何种Context的情况。
c.使用场景
Background用于程序入口点,TODO用于开发阶段。
b.WithValue
a.功能说明
创建携带键值对的Context。
b.使用规范
键应该是不可比较的类型,避免键冲突。
c.最佳实践
定义自定义类型作为Context键。
c.WithCancel
a.功能说明
创建可取消的Context,返回Context和取消函数。
b.取消机制
调用取消函数会向Context发送取消信号。
c.资源清理
用于管理goroutine的生命周期。
d.WithTimeout
a.功能说明
创建带超时的Context,超时后自动取消。
b.超时传播
超时信号会传播到所有子Context。
c.使用场景
控制操作的最大执行时间。
e.WithDeadline
a.功能说明
创建带截止时间的Context。
b.与Timeout的区别
Timeout是相对时间,Deadline是绝对时间。
c.应用场景
基于具体时间点的操作控制。
f.创建方式示例
---
// Context创建方式示例
package main
import (
"context"
"fmt"
"time"
"github.com/go-redis/redis/v8"
)
// 定义Context键的类型,避免键冲突
type contextKey string
const (
traceIDKey contextKey = "traceID"
userIDKey contextKey = "userID"
operationID contextKey = "operationID"
)
func main() {
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
})
defer rdb.Close()
fmt.Println("=== Context创建方式示例 ===")
// 1. WithValue示例
fmt.Println("\n--- WithValue创建 ---")
withValueExample(rdb)
// 2. WithCancel示例
fmt.Println("\n--- WithCancel创建 ---")
withCancelExample(rdb)
// 3. WithTimeout示例
fmt.Println("\n--- WithTimeout创建 ---")
withTimeoutExample(rdb)
// 4. WithDeadline示例
fmt.Println("\n--- WithDeadline创建 ---")
withDeadlineExample(rdb)
// 5. 组合使用示例
fmt.Println("\n--- 组合使用 ---")
combinedContextExample(rdb)
}
func withValueExample(rdb *redis.Client) {
// 创建带有请求追踪信息的Context
ctx := context.WithValue(context.Background(), traceIDKey, "trace-123")
ctx = context.WithValue(ctx, userIDKey, "user-456")
ctx = context.WithValue(ctx, operationID, "op-789")
// 构建操作日志键
traceID := ctx.Value(traceIDKey).(string)
userID := ctx.Value(userIDKey).(string)
operationID := ctx.Value(operationID).(string)
logKey := fmt.Sprintf("log:%s:%s:%s", traceID, userID, operationID)
// 记录操作日志
err := rdb.LPush(ctx, logKey, "操作开始").Err()
if err != nil {
fmt.Printf("记录日志失败: %v\n", err)
return
}
fmt.Printf("WithValue Context操作成功: logKey=%s\n", logKey)
// 模拟操作过程
time.Sleep(100 * time.Millisecond)
// 记录操作完成
err = rdb.LPush(ctx, logKey, "操作完成").Err()
if err != nil {
fmt.Printf("记录完成日志失败: %v\n", err)
return
}
// 获取日志列表
logs, err := rdb.LRange(ctx, logKey, 0, -1).Result()
if err != nil {
fmt.Printf("获取日志失败: %v\n", err)
return
}
fmt.Printf("操作日志: %v\n", logs)
}
func withCancelExample(rdb *redis.Client) {
// 创建可取消的Context
ctx, cancel := context.WithCancel(context.Background())
// 启动一个goroutine执行长时间操作
done := make(chan bool, 1)
go func() {
defer func() { done <- true }()
key := "cancel:test"
// 模拟长时间写入操作
for i := 0; i < 10; i++ {
select {
case <-ctx.Done():
fmt.Printf("操作被取消: %v\n", ctx.Err())
return
default:
// 执行Redis操作
err := rdb.Set(ctx, key, i, 0).Err()
if err != nil {
fmt.Printf("设置 %d 失败: %v\n", i, err)
return
}
fmt.Printf("写入数据: %d\n", i)
time.Sleep(200 * time.Millisecond)
}
}
}()
// 等待1秒后取消操作
time.Sleep(1 * time.Second)
fmt.Println("触发取消操作...")
cancel()
// 等待goroutine完成
<-done
// 检查最终结果
value, err := rdb.Get(context.Background(), "cancel:test").Int()
if err != nil {
fmt.Printf("获取结果失败: %v\n", err)
return
}
fmt.Printf("最终结果: %d\n", value)
}
func withTimeoutExample(rdb *redis.Client) {
// 创建带3秒超时的Context
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
// 启动可能超时的操作
resultChan := make(chan string, 1)
errorChan := make(chan error, 1)
go func() {
// 模拟复杂的数据处理
key := "timeout:test"
// 阶段1: 数据准备
err := rdb.Set(ctx, key, "preparing", 0).Err()
if err != nil {
errorChan <- fmt.Errorf("数据准备失败: %v", err)
return
}
select {
case <-ctx.Done():
errorChan <- fmt.Errorf("在数据准备阶段超时: %v", ctx.Err())
return
default:
}
time.Sleep(1 * time.Second) // 模拟处理时间
// 阶段2: 数据处理
err = rdb.Set(ctx, key, "processing", 0).Err()
if err != nil {
errorChan <- fmt.Errorf("数据处理失败: %v", err)
return
}
select {
case <-ctx.Done():
errorChan <- fmt.Errorf("在数据处理阶段超时: %v", ctx.Err())
return
default:
}
time.Sleep(2 * time.Second) // 模拟处理时间
// 阶段3: 数据完成
err = rdb.Set(ctx, key, "completed", 0).Err()
if err != nil {
errorChan <- fmt.Errorf("数据完成失败: %v", err)
return
}
resultChan <- "操作成功完成"
}()
// 等待结果或超时
select {
case result := <-resultChan:
fmt.Printf("操作结果: %s\n", result)
value, err := rdb.Get(context.Background(), "timeout:test").Result()
if err == nil {
fmt.Printf("最终值: %s\n", value)
}
case err := <-errorChan:
fmt.Printf("操作失败: %v\n", err)
case <-ctx.Done():
fmt.Printf("操作超时: %v\n", ctx.Err())
}
}
func withDeadlineExample(rdb *redis.Client) {
// 设置具体的截止时间: 当前时间 + 2秒
deadline := time.Now().Add(2 * time.Second)
ctx, cancel := context.WithDeadline(context.Background(), deadline)
defer cancel()
fmt.Printf("设置截止时间: %v\n", deadline)
// 监控截止时间
go func() {
ticker := time.NewTicker(500 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
fmt.Printf("截止时间到达: %v\n", ctx.Err())
return
case <-ticker.C:
remaining := time.Until(deadline)
if remaining > 0 {
fmt.Printf("剩余时间: %v\n", remaining)
}
}
}
}()
// 执行需要截止时间的操作
key := "deadline:test"
// 模拟分步操作
steps := []string{"初始化", "数据加载", "处理", "保存结果"}
for i, step := range steps {
select {
case <-ctx.Done():
fmt.Printf("在步骤 '%s' 时截止时间到达\n", step)
return
default:
err := rdb.HSet(ctx, key, step, fmt.Sprintf("step_%d", i)).Err()
if err != nil {
fmt.Printf("步骤 %s 失败: %v\n", step, err)
return
}
fmt.Printf("完成步骤: %s\n", step)
time.Sleep(700 * time.Millisecond) // 每步需要0.7秒
}
}
fmt.Println("所有步骤完成!")
// 检查结果
data, err := rdb.HGetAll(ctx, key).Result()
if err != nil {
fmt.Printf("获取结果失败: %v\n", err)
return
}
fmt.Printf("操作结果: %v\n", data)
}
func combinedContextExample(rdb *redis.Client) {
// 组合使用多种Context创建方式
// 1. 首先创建带有值的Context
baseCtx := context.WithValue(context.Background(), traceIDKey, "combined-trace-123")
baseCtx = context.WithValue(baseCtx, userIDKey, "combined-user-456")
// 2. 基于值Context创建超时Context
timeoutCtx, cancel1 := context.WithTimeout(baseCtx, 3*time.Second)
defer cancel1()
// 3. 基于超时Context创建可取消Context
cancelCtx, cancel2 := context.WithCancel(timeoutCtx)
defer cancel2()
// 4. 基于可取消Context创建带截止时间的Context
deadline := time.Now().Add(4 * time.Second)
finalCtx, cancel3 := context.WithDeadline(cancelCtx, deadline)
defer cancel3()
fmt.Printf("组合Context创建完成\n")
fmt.Printf("traceID: %s\n", finalCtx.Value(traceIDKey))
fmt.Printf("userID: %s\n", finalCtx.Value(userIDKey))
fmt.Printf("超时: %v\n", finalCtx.Value(context.DeadlineExceededKey{}))
// 使用组合Context执行操作
key := fmt.Sprintf("combined:%s:%s",
finalCtx.Value(userIDKey),
finalCtx.Value(traceIDKey))
err := rdb.Set(finalCtx, key, "combined_operation_success", 0).Err()
if err != nil {
fmt.Printf("组合操作失败: %v\n", err)
return
}
fmt.Printf("组合Context操作成功: key=%s\n", key)
// 监控Context状态
go func() {
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
for {
select {
case <-finalCtx.Done():
fmt.Printf("Context结束: %v\n", finalCtx.Err())
return
case <-ticker.C:
fmt.Printf("Context仍然活跃\n")
}
}
}()
// 模拟操作
time.Sleep(2 * time.Second)
fmt.Println("组合Context示例完成")
}
---
03.Context最佳实践
a.Context传递原则
a.不存储Context
Context不应该作为结构体字段存储。
b.作为第一个参数
Context应该作为函数的第一个参数。
c.传递链不中断
即使函数不使用Context,也应该传递给调用的函数。
d.可空性处理
函数应该能处理nil Context的情况。
b.键值对使用规范
a.键的类型安全
使用自定义类型定义Context键。
b.避免键冲突
不同包应该使用不同的键类型。
c.值的数据类型
Context值应该是不可变和线程安全的。
c.取消机制使用
a.及时调用cancel
使用WithCancel后应该及时调用cancel函数。
b.重复调用安全
cancel函数可以多次调用,是安全的。
c.defer保证
使用defer确保cancel函数被调用。
d.超时设置建议
a.合理设置超时
根据操作复杂度和网络环境设置合适的超时时间。
b.分层超时
不同层级可以设置不同的超时时间。
c.超时传递
超时Context会自动传播到子Context。
e.最佳实践示例
---
// Context最佳实践示例
package main
import (
"context"
"errors"
"fmt"
"sync"
"time"
"github.com/go-redis/redis/v8"
)
// 服务层Context使用最佳实践
type UserService struct {
rdb *redis.Client
}
// 定义Context键类型
type contextKey string
const (
requestIDKey contextKey = "requestID"
userIDKey contextKey = "userID"
traceIDKey contextKey = "traceID"
)
func NewUserService(rdb *redis.Client) *UserService {
return &UserService{rdb: rdb}
}
// 好的实践: Context作为第一个参数
func (s *UserService) GetUserProfile(ctx context.Context, userID string) (*UserProfile, error) {
// 1. 检查Context是否已取消
if err := ctx.Err(); err != nil {
return nil, fmt.Errorf("context已取消: %w", err)
}
// 2. 从Context中提取请求追踪信息
requestID := s.getRequestID(ctx)
traceID := s.getTraceID(ctx)
fmt.Printf("处理用户配置文件 - requestID: %s, traceID: %s, userID: %s\n",
requestID, traceID, userID)
// 3. 创建子Context用于子操作
userCtx, cancel := context.WithTimeout(ctx, 2*time.Second)
defer cancel()
// 4. 执行Redis操作
profile, err := s.getUserProfileFromRedis(userCtx, userID)
if err != nil {
return nil, fmt.Errorf("获取用户配置文件失败: %w", err)
}
return profile, nil
}
// 请求处理函数 - Context使用的完整示例
func (s *UserService) ProcessRequest(ctx context.Context, req *UserRequest) (*UserResponse, error) {
// 1. 从请求中提取信息并设置到Context
ctx = context.WithValue(ctx, requestIDKey, req.RequestID)
ctx = context.WithValue(ctx, userIDKey, req.UserID)
ctx = context.WithValue(ctx, traceIDKey, req.TraceID)
// 2. 创建带超时的请求处理Context
requestCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
// 3. 并发处理多个子任务
var wg sync.WaitGroup
var profile *UserProfile
var preferences *UserPreferences
var profileErr, prefErr error
// 获取用户配置文件
wg.Add(1)
go func() {
defer wg.Done()
profile, profileErr = s.GetUserProfile(requestCtx, req.UserID)
}()
// 获取用户偏好设置
wg.Add(1)
go func() {
defer wg.Done()
preferences, prefErr = s.getUserPreferences(requestCtx, req.UserID)
}()
// 等待所有子任务完成或Context取消
done := make(chan struct{})
go func() {
wg.Wait()
close(done)
}()
select {
case <-done:
// 所有任务完成
if profileErr != nil {
return nil, fmt.Errorf("获取配置文件失败: %w", profileErr)
}
if prefErr != nil {
return nil, fmt.Errorf("获取偏好设置失败: %w", prefErr)
}
return &UserResponse{
Profile: profile,
Preferences: preferences,
}, nil
case <-requestCtx.Done():
// 请求超时或取消
return nil, fmt.Errorf("请求处理超时或取消: %w", requestCtx.Err())
}
}
// Context安全的辅助函数
func (s *UserService) getRequestID(ctx context.Context) string {
if requestID, ok := ctx.Value(requestIDKey).(string); ok {
return requestID
}
return "unknown"
}
func (s *UserService) getTraceID(ctx context.Context) string {
if traceID, ok := ctx.Value(traceIDKey).(string); ok {
return traceID
}
return "unknown"
}
// 复杂操作的Context管理
func (s *UserService) BatchProcessUsers(ctx context.Context, userIDs []string) error {
// 1. 创建批量处理专用的Context
batchCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
// 2. 添加批量处理标识
batchCtx = context.WithValue(batchCtx, "batchID", fmt.Sprintf("batch_%d", time.Now().Unix()))
fmt.Printf("开始批量处理 %d 个用户\n", len(userIDs))
// 3. 分批处理以避免内存压力
batchSize := 50
for i := 0; i < len(userIDs); i += batchSize {
end := i + batchSize
if end > len(userIDs) {
end = len(userIDs)
}
// 为每个批次创建子Context
batchItemCtx, itemCancel := context.WithTimeout(batchCtx, 5*time.Second)
batch := userIDs[i:end]
err := s.processBatch(batchItemCtx, batch)
itemCancel()
if err != nil {
return fmt.Errorf("处理批次 %d-%d 失败: %w", i, end-1, err)
}
// 检查主Context是否被取消
if err := batchCtx.Err(); err != nil {
return fmt.Errorf("批量处理被取消: %w", err)
}
fmt.Printf("完成批次 %d-%d\n", i, end-1)
}
return nil
}
// Context错误处理示例
func (s *UserService) HandleContextErrors(ctx context.Context) error {
// 检查不同类型的Context错误
err := ctx.Err()
if err == nil {
return nil
}
switch {
case errors.Is(err, context.Canceled):
fmt.Println("操作被明确取消")
return fmt.Errorf("操作被取消: %w", err)
case errors.Is(err, context.DeadlineExceeded):
fmt.Println("操作超过截止时间")
return fmt.Errorf("操作超时: %w", err)
default:
fmt.Printf("未知Context错误: %v\n", err)
return fmt.Errorf("Context错误: %w", err)
}
}
// 实际业务方法实现
func (s *UserService) getUserProfileFromRedis(ctx context.Context, userID string) (*UserProfile, error) {
key := fmt.Sprintf("user:profile:%s", userID)
data, err := s.rdb.HGetAll(ctx, key).Result()
if err != nil {
if err == redis.Nil {
return nil, fmt.Errorf("用户不存在: %s", userID)
}
return nil, err
}
profile := &UserProfile{
UserID: userID,
Name: data["name"],
Email: data["email"],
Age: data["age"],
}
return profile, nil
}
func (s *UserService) getUserPreferences(ctx context.Context, userID string) (*UserPreferences, error) {
key := fmt.Sprintf("user:preferences:%s", userID)
data, err := s.rdb.HGetAll(ctx, key).Result()
if err != nil {
if err == redis.Nil {
return &UserPreferences{}, nil
}
return nil, err
}
preferences := &UserPreferences{
Language: data["language"],
Theme: data["theme"],
TimeZone: data["timezone"],
}
return preferences, nil
}
func (s *UserService) processBatch(ctx context.Context, userIDs []string) error {
for _, userID := range userIDs {
// 检查Context状态
if err := ctx.Err(); err != nil {
return err
}
// 处理单个用户
key := fmt.Sprintf("user:processed:%s", userID)
err := s.rdb.Set(ctx, key, time.Now().Format(time.RFC3339), 0).Err()
if err != nil {
return err
}
}
return nil
}
// 数据结构定义
type UserRequest struct {
RequestID string
UserID string
TraceID string
}
type UserResponse struct {
Profile *UserProfile
Preferences *UserPreferences
}
type UserProfile struct {
UserID string
Name string
Email string
Age string
}
type UserPreferences struct {
Language string
Theme string
TimeZone string
}
func main() {
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
})
defer rdb.Close()
fmt.Println("=== Context最佳实践示例 ===")
// 创建服务
userService := NewUserService(rdb)
// 1. 基本请求处理
fmt.Println("\n--- 基本请求处理 ---")
basicRequestExample(userService)
// 2. 超时处理
fmt.Println("\n--- 超时处理 ---")
timeoutExample(userService)
// 3. 取消处理
fmt.Println("\n--- 取消处理 ---")
cancelExample(userService)
// 4. 批量处理
fmt.Println("\n--- 批量处理 ---")
batchProcessExample(userService)
// 5. 错误处理
fmt.Println("\n--- 错误处理 ---")
errorHandlingExample(userService)
}
func basicRequestExample(userService *UserService) {
// 创建请求
req := &UserRequest{
RequestID: "req-001",
UserID: "user-123",
TraceID: "trace-001",
}
// 使用Background Context处理请求
ctx := context.Background()
response, err := userService.ProcessRequest(ctx, req)
if err != nil {
fmt.Printf("请求处理失败: %v\n", err)
return
}
fmt.Printf("请求处理成功: %+v\n", response)
}
func timeoutExample(userService *UserService) {
// 创建带超时的Context
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
// 创建请求
req := &UserRequest{
RequestID: "timeout-req",
UserID: "user-456",
TraceID: "timeout-trace",
}
// 处理请求(可能超时)
response, err := userService.ProcessRequest(ctx, req)
if err != nil {
fmt.Printf("请求超时: %v\n", err)
return
}
fmt.Printf("超时请求处理成功: %+v\n", response)
}
func cancelExample(userService *UserService) {
// 创建可取消的Context
ctx, cancel := context.WithCancel(context.Background())
// 创建请求
req := &UserRequest{
RequestID: "cancel-req",
UserID: "user-789",
TraceID: "cancel-trace",
}
// 在后台处理请求
done := make(chan struct{})
go func() {
defer close(done)
_, err := userService.ProcessRequest(ctx, req)
if err != nil {
fmt.Printf("请求被取消: %v\n", err)
return
}
fmt.Println("请求处理完成")
}()
// 500ms后取消请求
time.Sleep(500 * time.Millisecond)
fmt.Println("触发取消...")
cancel()
// 等待处理完成
<-done
}
func batchProcessExample(userService *UserService) {
// 创建批量处理的用户ID列表
userIDs := make([]string, 100)
for i := 0; i < 100; i++ {
userIDs[i] = fmt.Sprintf("batch-user-%d", i)
}
// 使用Context进行批量处理
ctx := context.Background()
err := userService.BatchProcessUsers(ctx, userIDs)
if err != nil {
fmt.Printf("批量处理失败: %v\n", err)
return
}
fmt.Println("批量处理成功")
}
func errorHandlingExample(userService *UserService) {
// 创建已取消的Context
ctx, cancel := context.WithCancel(context.Background())
cancel() // 立即取消
// 处理Context错误
err := userService.HandleContextErrors(ctx)
if err != nil {
fmt.Printf("Context错误处理: %v\n", err)
}
// 创建超时的Context
timeoutCtx, timeoutCancel := context.WithTimeout(context.Background(), 1*time.Nanosecond)
time.Sleep(1 * time.Millisecond) // 确保超时
defer timeoutCancel()
err = userService.HandleContextErrors(timeoutCtx)
if err != nil {
fmt.Printf("超时错误处理: %v\n", err)
}
}
---
4.2 超时控制
01.超时控制原理
a.超时机制
Redis操作中的超时控制通过Context实现,当超时发生时会自动中止正在执行的操作。
b.超时类型
a.连接超时(DialTimeout)
建立Redis连接的超时时间。
b.读取超时(ReadTimeout)
等待Redis响应的超时时间。
c.写入超时(WriteTimeout)
发送Redis命令的超时时间。
d.上下文超时
通过Context设置的整体操作超时。
c.超时传播
a.父到子传播
父Context的超时会自动传播到所有子Context。
b.操作级控制
可以为不同操作设置不同的超时时间。
c.层次化超时
支持多层级的超时控制机制。
d.超时示例
---
// 超时控制原理演示
package main
import (
"context"
"fmt"
"time"
"github.com/go-redis/redis/v8"
)
func main() {
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
DialTimeout: 5 * time.Second, // 连接超时
ReadTimeout: 3 * time.Second, // 读取超时
WriteTimeout: 3 * time.Second, // 写入超时
PoolTimeout: 4 * time.Second, // 获取连接超时
})
defer rdb.Close()
fmt.Println("=== 超时控制原理演示 ===")
// 1. Context超时 vs Redis超时
fmt.Println("\n--- Context超时 vs Redis超时 ---")
contextVsRedisTimeout(rdb)
// 2. 不同超时设置的效果
fmt.Println("\n--- 不同超时设置的效果 ---")
differentTimeoutEffects(rdb)
// 3. 超时传播演示
fmt.Println("\n--- 超时传播演示 ---")
timeoutPropagation(rdb)
// 4. 超时错误处理
fmt.Println("\n--- 超时错误处理 ---")
timeoutErrorHandling(rdb)
}
func contextVsRedisTimeout(rdb *redis.Client) {
fmt.Println("1. Context超时 (1秒)")
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
start := time.Now()
err := rdb.Set(ctx, "timeout:test1", "value", 0).Err()
duration := time.Since(start)
if err != nil {
fmt.Printf(" Context超时操作失败: %v (耗时: %v)\n", err, duration)
} else {
fmt.Printf(" Context超时操作成功 (耗时: %v)\n", duration)
}
fmt.Println("\n2. Redis客户端超时配置")
// 显示Redis客户端的超时配置
opts := rdb.Options()
fmt.Printf(" DialTimeout: %v\n", opts.DialTimeout)
fmt.Printf(" ReadTimeout: %v\n", opts.ReadTimeout)
fmt.Printf(" WriteTimeout: %v\n", opts.WriteTimeout)
fmt.Printf(" PoolTimeout: %v\n", opts.PoolTimeout)
// 测试正常操作
start = time.Now()
err = rdb.Set(context.Background(), "timeout:test2", "value", 0).Err()
duration = time.Since(start)
if err != nil {
fmt.Printf(" 正常操作失败: %v (耗时: %v)\n", err, duration)
} else {
fmt.Printf(" 正常操作成功 (耗时: %v)\n", duration)
}
}
func differentTimeoutEffects(rdb *redis.Client) {
testCases := []struct {
name string
timeout time.Duration
delay time.Duration
}{
{"短超时 (100ms)", 100 * time.Millisecond, 200 * time.Millisecond},
{"中等超时 (1秒)", 1 * time.Second, 500 * time.Millisecond},
{"长超时 (5秒)", 5 * time.Second, 2 * time.Second},
}
for _, tc := range testCases {
fmt.Printf("\n%s:\n", tc.name)
ctx, cancel := context.WithTimeout(context.Background(), tc.timeout)
defer cancel()
// 模拟带延迟的操作
go func() {
time.Sleep(tc.delay)
rdb.Set(ctx, fmt.Sprintf("timeout:delay:%d", time.Now().Nanosecond()), "value", 0)
}()
start := time.Now()
_, err := rdb.Get(ctx, "nonexistent:key").Result()
duration := time.Since(start)
if err != nil {
if err == context.DeadlineExceeded {
fmt.Printf(" 操作超时 (耗时: %v)\n", duration)
} else {
fmt.Printf(" 操作失败: %v (耗时: %v)\n", err, duration)
}
} else {
fmt.Printf(" 操作成功 (耗时: %v)\n", duration)
}
}
}
func timeoutPropagation(rdb *redis.Client) {
// 创建父Context,设置5秒超时
parentCtx, parentCancel := context.WithTimeout(context.Background(), 5*time.Second)
defer parentCancel()
fmt.Printf("父Context超时: 5秒\n")
// 创建子Context,设置10秒超时(实际以父Context为准)
childCtx, childCancel := context.WithTimeout(parentCtx, 10*time.Second)
defer childCancel()
fmt.Printf("子Context超时: 10秒 (但实际受父Context限制)\n")
// 监控两个Context的状态
go func() {
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
for {
select {
case <-parentCtx.Done():
fmt.Printf("父Context结束: %v\n", parentCtx.Err())
return
case <-childCtx.Done():
fmt.Printf("子Context结束: %v\n", childCtx.Err())
return
case <-ticker.C:
fmt.Printf("父Context剩余: %v, 子Context剩余: %v\n",
time.Until(time.Now().Add(5*time.Second)),
time.Until(time.Now().Add(10*time.Second)))
}
}
}()
// 在子Context中执行操作
start := time.Now()
err := rdb.Set(childCtx, "timeout:propagation", "test", 0).Err()
duration := time.Since(start)
if err != nil {
fmt.Printf("操作失败: %v (耗时: %v)\n", err, duration)
} else {
fmt.Printf("操作成功 (耗时: %v)\n", duration)
}
time.Sleep(6 * time.Second) // 观察超时传播
}
func timeoutErrorHandling(rdb *redis.Client) {
// 测试不同类型的超时错误
testCases := []struct {
name string
setupCtx func() context.Context
}{
{
"Context超时",
func() context.Context {
ctx, _ := context.WithTimeout(context.Background(), 1*time.Nanosecond)
time.Sleep(1 * time.Millisecond) // 确保超时
return ctx
},
},
{
"已取消Context",
func() context.Context {
ctx, cancel := context.WithCancel(context.Background())
cancel()
return ctx
},
},
}
for _, tc := range testCases {
fmt.Printf("\n%s:\n", tc.name)
ctx := tc.setupCtx()
// 测试不同Redis操作的超时行为
operations := []struct {
name string
op func() error
}{
{
"GET操作",
func() error {
_, err := rdb.Get(ctx, "test:key").Result()
return err
},
},
{
"SET操作",
func() error {
return rdb.Set(ctx, "test:key", "value", 0).Err()
},
},
{
"Pipeline操作",
func() error {
pipe := rdb.Pipeline()
pipe.Get(ctx, "test:key1")
pipe.Set(ctx, "test:key2", "value", 0)
_, err := pipe.Exec(ctx)
return err
},
},
}
for _, op := range operations {
start := time.Now()
err := op.op()
duration := time.Since(start)
if err != nil {
fmt.Printf(" %s: 失败 - %v (耗时: %v)\n", op.name, err, duration)
} else {
fmt.Printf(" %s: 成功 (耗时: %v)\n", op.name, duration)
}
}
}
}
---
02.超时设置策略
a.全局超时设置
a.客户端级别
在创建Redis客户端时设置默认超时参数。
b.应用级别
在应用层设置全局超时Context。
c.继承机制
子操作继承父操作的超时设置。
b.操作级超时
a.快速操作
简单的GET/SET操作设置较短超时。
b.复杂操作
复杂查询或批量操作设置较长超时。
c.网络敏感操作
根据网络状况动态调整超时。
c.自适应超时
a.响应时间监控
监控历史响应时间数据。
b.动态调整
根据历史数据自动调整超时时间。
c.熔断机制
超时率过高时自动熔断。
d.分层超时设计
a.接口层超时
HTTP接口级别的超时控制。
b.服务层超时
业务服务层的超时控制。
c.数据层超时
Redis操作层的超时控制。
e.超时设置示例
---
// 超时设置策略示例
package main
import (
"context"
"fmt"
"math"
"sync"
"time"
"github.com/go-redis/redis/v8"
)
// Redis超时配置管理器
type TimeoutManager struct {
rdb *redis.Client
globalTimeout time.Duration
fastOpTimeout time.Duration
slowOpTimeout time.Duration
batchOpTimeout time.Duration
responseHistory []time.Duration
historyMutex sync.RWMutex
maxHistorySize int
adaptiveEnabled bool
baseTimeout time.Duration
maxTimeout time.Duration
}
// 操作类型枚举
type OperationType int
const (
FastOp OperationType = iota
SlowOp
BatchOp
CustomOp
)
func NewTimeoutManager(addr string) *TimeoutManager {
return &TimeoutManager{
rdb: redis.NewClient(&redis.Options{
Addr: addr,
DialTimeout: 5 * time.Second,
ReadTimeout: 3 * time.Second,
WriteTimeout: 3 * time.Second,
PoolTimeout: 4 * time.Second,
}),
globalTimeout: 5 * time.Second,
fastOpTimeout: 1 * time.Second,
slowOpTimeout: 3 * time.Second,
batchOpTimeout: 10 * time.Second,
responseHistory: make([]time.Duration, 0),
maxHistorySize: 100,
adaptiveEnabled: true,
baseTimeout: 1 * time.Second,
maxTimeout: 30 * time.Second,
}
}
func (tm *TimeoutManager) Close() {
tm.rdb.Close()
}
// 根据操作类型获取超时时间
func (tm *TimeoutManager) GetTimeout(opType OperationType) time.Duration {
if tm.adaptiveEnabled {
return tm.getAdaptiveTimeout(opType)
}
switch opType {
case FastOp:
return tm.fastOpTimeout
case SlowOp:
return tm.slowOpTimeout
case BatchOp:
return tm.batchOpTimeout
default:
return tm.globalTimeout
}
}
// 自适应超时计算
func (tm *TimeoutManager) getAdaptiveTimeout(opType OperationType) time.Duration {
tm.historyMutex.RLock()
defer tm.historyMutex.RUnlock()
if len(tm.responseHistory) == 0 {
return tm.baseTimeout
}
// 计算平均响应时间
var sum time.Duration
for _, duration := range tm.responseHistory {
sum += duration
}
avg := sum / time.Duration(len(tm.responseHistory))
// 计算标准差
var variance float64
for _, duration := range tm.responseHistory {
diff := float64(duration - avg)
variance += diff * diff
}
variance /= float64(len(tm.responseHistory))
stdDev := time.Duration(math.Sqrt(variance))
// 自适应超时 = 平均值 + 2倍标准差
adaptiveTimeout := avg + 2*stdDev
// 限制在合理范围内
if adaptiveTimeout < tm.baseTimeout {
adaptiveTimeout = tm.baseTimeout
}
if adaptiveTimeout > tm.maxTimeout {
adaptiveTimeout = tm.maxTimeout
}
// 根据操作类型调整
switch opType {
case FastOp:
adaptiveTimeout = adaptiveTimeout / 2
case SlowOp:
adaptiveTimeout = adaptiveTimeout * 2
case BatchOp:
adaptiveTimeout = adaptiveTimeout * 3
}
return adaptiveTimeout
}
// 记录响应时间
func (tm *TimeoutManager) RecordResponseTime(duration time.Duration) {
tm.historyMutex.Lock()
defer tm.historyMutex.Unlock()
tm.responseHistory = append(tm.responseHistory, duration)
// 限制历史记录大小
if len(tm.responseHistory) > tm.maxHistorySize {
tm.responseHistory = tm.responseHistory[1:]
}
}
// 执行带超时的操作
func (tm *TimeoutManager) ExecuteWithTimeout(ctx context.Context, opType OperationType, operation func(context.Context) error) error {
timeout := tm.GetTimeout(opType)
opCtx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
start := time.Now()
err := operation(opCtx)
duration := time.Since(start)
// 记录响应时间
tm.RecordResponseTime(duration)
return err
}
// 分层超时示例
type LayeredTimeoutService struct {
tm *TimeoutManager
}
func NewLayeredTimeoutService(tm *TimeoutManager) *LayeredTimeoutService {
return &LayeredTimeoutService{tm: tm}
}
// 接口层超时
func (s *LayeredTimeoutService) HandleHTTPRequest(requestID string) error {
// 接口层: 10秒超时
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
fmt.Printf("HTTP请求 %s: 接口层超时 10秒\n", requestID)
return s.processBusinessLogic(ctx, requestID)
}
// 服务层超时
func (s *LayeredTimeoutService) processBusinessLogic(ctx context.Context, requestID string) error {
// 服务层: 5秒超时 (但受接口层限制)
serviceCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
fmt.Printf("请求 %s: 服务层超时 5秒\n", requestID)
return s.executeDataOperation(serviceCtx, requestID)
}
// 数据层超时
func (s *LayeredTimeoutService) executeDataOperation(ctx context.Context, requestID string) error {
// 数据层: 使用自适应超时
fmt.Printf("请求 %s: 数据层自适应超时\n", requestID)
return s.tm.ExecuteWithTimeout(ctx, SlowOp, func(opCtx context.Context) error {
key := fmt.Sprintf("layered:test:%s", requestID)
return s.tm.rdb.Set(opCtx, key, fmt.Sprintf("value_%s", requestID), 0).Err()
})
}
func main() {
fmt.Println("=== 超时设置策略示例 ===")
// 创建超时管理器
tm := NewTimeoutManager("localhost:6379")
defer tm.Close()
// 1. 基本超时设置
fmt.Println("\n--- 基本超时设置 ---")
basicTimeoutSetup(tm)
// 2. 操作级超时
fmt.Println("\n--- 操作级超时 ---")
operationLevelTimeout(tm)
// 3. 自适应超时
fmt.Println("\n--- 自适应超时 ---")
adaptiveTimeout(tm)
// 4. 分层超时
fmt.Println("\n--- 分层超时 ---")
layeredTimeout(tm)
// 5. 超时策略比较
fmt.Println("\n--- 超时策略比较 ---")
timeoutStrategyComparison(tm)
}
func basicTimeoutSetup(tm *TimeoutManager) {
fmt.Println("全局超时配置:")
fmt.Printf(" 全局超时: %v\n", tm.globalTimeout)
fmt.Printf(" 快速操作超时: %v\n", tm.fastOpTimeout)
fmt.Printf(" 慢速操作超时: %v\n", tm.slowOpTimeout)
fmt.Printf(" 批量操作超时: %v\n", tm.batchOpTimeout)
// 测试全局超时
ctx := context.Background()
err := tm.ExecuteWithTimeout(ctx, FastOp, func(opCtx context.Context) error {
return tm.rdb.Set(opCtx, "timeout:basic", "test", 0).Err()
})
if err != nil {
fmt.Printf("基本超时操作失败: %v\n", err)
} else {
fmt.Printf("基本超时操作成功\n")
}
}
func operationLevelTimeout(tm *TimeoutManager) {
operationTests := []struct {
name string
opType OperationType
testFunc func() error
}{
{
"快速GET操作",
FastOp,
func() error {
ctx := context.Background()
return tm.ExecuteWithTimeout(ctx, FastOp, func(opCtx context.Context) error {
_, err := tm.rdb.Get(opCtx, "operation:fast").Result()
return err
})
},
},
{
"慢速复杂查询",
SlowOp,
func() error {
ctx := context.Background()
return tm.ExecuteWithTimeout(ctx, SlowOp, func(opCtx context.Context) error {
pipe := tm.rdb.Pipeline()
for i := 0; i < 10; i++ {
pipe.Get(opCtx, fmt.Sprintf("slow:query:%d", i))
pipe.Set(opCtx, fmt.Sprintf("slow:result:%d", i), i, 0)
}
_, err := pipe.Exec(opCtx)
return err
})
},
},
{
"批量写入操作",
BatchOp,
func() error {
ctx := context.Background()
return tm.ExecuteWithTimeout(ctx, BatchOp, func(opCtx context.Context) error {
pipe := tm.rdb.Pipeline()
for i := 0; i < 100; i++ {
pipe.Set(opCtx, fmt.Sprintf("batch:item:%d", i), i, 0)
}
_, err := pipe.Exec(opCtx)
return err
})
},
},
}
for _, test := range operationTests {
fmt.Printf("\n%s:\n", test.name)
start := time.Now()
err := test.testFunc()
duration := time.Since(start)
timeout := tm.GetTimeout(test.opType)
fmt.Printf(" 设置超时: %v\n", timeout)
fmt.Printf(" 实际耗时: %v\n", duration)
if err != nil {
if err == context.DeadlineExceeded {
fmt.Printf(" 结果: 超时\n")
} else {
fmt.Printf(" 结果: 失败 - %v\n", err)
}
} else {
fmt.Printf(" 结果: 成功\n")
}
}
}
func adaptiveTimeout(tm *TimeoutManager) {
fmt.Println("自适应超时学习阶段:")
// 模拟不同响应时间的操作
responseTimes := []time.Duration{
100 * time.Millisecond,
200 * time.Millisecond,
150 * time.Millisecond,
300 * time.Millisecond,
120 * time.Millisecond,
180 * time.Millisecond,
250 * time.Millisecond,
160 * time.Millisecond,
}
for i, rt := range responseTimes {
fmt.Printf("学习阶段 %d: 模拟响应时间 %v\n", i+1, rt)
// 模拟操作
ctx := context.Background()
err := tm.ExecuteWithTimeout(ctx, FastOp, func(opCtx context.Context) error {
// 模拟实际处理时间
time.Sleep(rt)
return tm.rdb.Set(opCtx, fmt.Sprintf("adaptive:learn:%d", i), i, 0).Err()
})
if err != nil {
fmt.Printf(" 操作失败: %v\n", err)
} else {
fmt.Printf(" 操作成功\n")
}
// 显示当前学习的超时设置
currentTimeout := tm.GetTimeout(FastOp)
fmt.Printf(" 当前自适应超时: %v\n", currentTimeout)
}
fmt.Println("\n自适应超时测试:")
ctx := context.Background()
err := tm.ExecuteWithTimeout(ctx, FastOp, func(opCtx context.Context) error {
return tm.rdb.Get(opCtx, "adaptive:test").Result()
})
finalTimeout := tm.GetTimeout(FastOp)
fmt.Printf("最终自适应超时: %v\n", finalTimeout)
if err != nil {
fmt.Printf("测试结果: %v\n", err)
} else {
fmt.Printf("测试结果: 成功\n")
}
}
func layeredTimeout(tm *TimeoutManager) {
service := NewLayeredTimeoutService(tm)
// 测试分层超时
requestIDs := []string{"req-001", "req-002", "req-003"}
for _, requestID := range requestIDs {
fmt.Printf("\n处理请求 %s:\n", requestID)
start := time.Now()
err := service.HandleHTTPRequest(requestID)
duration := time.Since(start)
fmt.Printf("总耗时: %v\n", duration)
if err != nil {
if err == context.DeadlineExceeded {
fmt.Printf("结果: 超时\n")
} else {
fmt.Printf("结果: 失败 - %v\n", err)
}
} else {
fmt.Printf("结果: 成功\n")
}
}
}
func timeoutStrategyComparison(tm *TimeoutManager) {
fmt.Println("比较不同超时策略:")
strategies := []struct {
name string
timeout time.Duration
operation func() error
}{
{
"固定短超时 (500ms)",
500 * time.Millisecond,
func() error {
ctx, _ := context.WithTimeout(context.Background(), 500*time.Millisecond)
time.Sleep(1 * time.Second) // 模拟慢操作
return tm.rdb.Set(ctx, "strategy:short", "value", 0).Err()
},
},
{
"固定长超时 (5秒)",
5 * time.Second,
func() error {
ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
time.Sleep(1 * time.Second) // 模拟正常操作
return tm.rdb.Set(ctx, "strategy:long", "value", 0).Err()
},
},
{
"自适应超时",
tm.GetTimeout(SlowOp),
func() error {
ctx := context.Background()
return tm.ExecuteWithTimeout(ctx, SlowOp, func(opCtx context.Context) error {
time.Sleep(800 * time.Millisecond) // 模拟操作
return tm.rdb.Set(opCtx, "strategy:adaptive", "value", 0).Err()
})
},
},
}
for _, strategy := range strategies {
fmt.Printf("\n%s:\n", strategy.name)
fmt.Printf(" 设置超时: %v\n", strategy.timeout)
start := time.Now()
err := strategy.operation()
duration := time.Since(start)
fmt.Printf(" 实际耗时: %v\n", duration)
if err != nil {
if err == context.DeadlineExceeded {
fmt.Printf(" 结果: 超时\n")
} else {
fmt.Printf(" 结果: 失败 - %v\n", err)
}
} else {
fmt.Printf(" 结果: 成功\n")
}
}
}
---
03.超时监控与调优
a.超时监控指标
a.超时率统计
监控操作超时的比例。
b.响应时间分布
统计P50、P95、P99响应时间。
c.超时趋势分析
分析超时率的变化趋势。
d.资源利用率
监控CPU、内存、网络使用情况。
b.实时监控
a.超时告警
当超时率超过阈值时触发告警。
b.性能仪表板
实时显示超时相关指标。
c.自动调优
根据监控数据自动调整超时参数。
c.调优策略
a.超时时间优化
根据历史数据优化超时设置。
b.连接池调整
调整连接池大小改善响应时间。
c.网络优化
优化网络配置减少延迟。
d.系统调优
优化系统级参数。
d.监控实现
---
// 超时监控与调优示例
package main
import (
"context"
"fmt"
"math"
"sort"
"sync"
"sync/atomic"
"time"
"github.com/go-redis/redis/v8"
)
// 超时监控器
type TimeoutMonitor struct {
metrics *TimeoutMetrics
mutex sync.RWMutex
}
// 超时指标
type TimeoutMetrics struct {
TotalRequests int64
TimeoutRequests int64
SuccessRequests int64
ResponseTimes []time.Duration
SlowOperations []SlowOperation
StartTime time.Time
LastUpdateTime time.Time
}
type SlowOperation struct {
Operation string
Duration time.Duration
Timeout time.Duration
Timestamp time.Time
}
func NewTimeoutMonitor() *TimeoutMonitor {
return &TimeoutMonitor{
metrics: &TimeoutMetrics{
ResponseTimes: make([]time.Duration, 0),
SlowOperations: make([]SlowOperation, 0),
StartTime: time.Now(),
},
}
}
// 记录操作结果
func (tm *TimeoutMonitor) RecordOperation(operation string, duration time.Duration, timeout time.Duration, err error) {
tm.mutex.Lock()
defer tm.mutex.Unlock()
atomic.AddInt64(&tm.metrics.TotalRequests, 1)
tm.metrics.LastUpdateTime = time.Now()
if err == context.DeadlineExceeded {
atomic.AddInt64(&tm.metrics.TimeoutRequests, 1)
tm.metrics.SlowOperations = append(tm.metrics.SlowOperations, SlowOperation{
Operation: operation,
Duration: duration,
Timeout: timeout,
Timestamp: time.Now(),
})
} else if err == nil {
atomic.AddInt64(&tm.metrics.SuccessRequests, 1)
}
// 记录响应时间(限制数组大小)
tm.metrics.ResponseTimes = append(tm.metrics.ResponseTimes, duration)
if len(tm.metrics.ResponseTimes) > 1000 {
tm.metrics.ResponseTimes = tm.metrics.ResponseTimes[1:]
}
}
// 获取超时率
func (tm *TimeoutMonitor) GetTimeoutRate() float64 {
tm.mutex.RLock()
defer tm.mutex.RUnlock()
total := atomic.LoadInt64(&tm.metrics.TotalRequests)
timeouts := atomic.LoadInt64(&tm.metrics.TimeoutRequests)
if total == 0 {
return 0
}
return float64(timeouts) / float64(total) * 100
}
// 获取成功率
func (tm *TimeoutMonitor) GetSuccessRate() float64 {
tm.mutex.RLock()
defer tm.mutex.RUnlock()
total := atomic.LoadInt64(&tm.metrics.TotalRequests)
successes := atomic.LoadInt64(&tm.metrics.SuccessRequests)
if total == 0 {
return 0
}
return float64(successes) / float64(total) * 100
}
// 获取响应时间统计
func (tm *TimeoutMonitor) GetResponseTimeStats() ResponseTimeStats {
tm.mutex.RLock()
defer tm.mutex.RUnlock()
if len(tm.metrics.ResponseTimes) == 0 {
return ResponseTimeStats{}
}
times := make([]time.Duration, len(tm.metrics.ResponseTimes))
copy(times, tm.metrics.ResponseTimes)
sort.Slice(times, func(i, j int) bool {
return times[i] < times[j]
})
var sum time.Duration
for _, t := range times {
sum += t
}
return ResponseTimeStats{
Count: len(times),
Average: sum / time.Duration(len(times)),
Min: times[0],
Max: times[len(times)-1],
P50: times[len(times)/2],
P95: times[int(float64(len(times))*0.95)],
P99: times[int(float64(len(times))*0.99)],
}
}
type ResponseTimeStats struct {
Count int
Average time.Duration
Min time.Duration
Max time.Duration
P50 time.Duration
P95 time.Duration
P99 time.Duration
}
// 获取最近慢操作
func (tm *TimeoutMonitor) GetRecentSlowOperations(limit int) []SlowOperation {
tm.mutex.RLock()
defer tm.mutex.RUnlock()
if limit <= 0 || limit > len(tm.metrics.SlowOperations) {
limit = len(tm.metrics.SlowOperations)
}
start := len(tm.metrics.SlowOperations) - limit
return tm.metrics.SlowOperations[start:]
}
// 自动调优器
type TimeoutOptimizer struct {
monitor *TimeoutMonitor
rdb *redis.Client
currentTimeout time.Duration
minTimeout time.Duration
maxTimeout time.Duration
adjustmentStep time.Duration
targetTimeoutRate float64
optimizerActive bool
mutex sync.Mutex
}
func NewTimeoutOptimizer(monitor *TimeoutMonitor, rdb *redis.Client) *TimeoutOptimizer {
return &TimeoutOptimizer{
monitor: monitor,
rdb: rdb,
currentTimeout: 3 * time.Second,
minTimeout: 500 * time.Millisecond,
maxTimeout: 30 * time.Second,
adjustmentStep: 200 * time.Millisecond,
targetTimeoutRate: 5.0, // 目标超时率5%
optimizerActive: true,
}
}
// 启动自动调优
func (to *TimeoutOptimizer) StartOptimization() {
go func() {
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
to.optimizeTimeouts()
}
}
}()
}
func (to *TimeoutOptimizer) optimizeTimeouts() {
to.mutex.Lock()
defer to.mutex.Unlock()
if !to.optimizerActive {
return
}
timeoutRate := to.monitor.GetTimeoutRate()
stats := to.monitor.GetResponseTimeStats()
fmt.Printf("自动调优分析 - 当前超时率: %.2f\n", monitor.GetSuccessRate())
fmt.Printf("超时率: %.2f\n", client.monitor.GetTimeoutRate())
}
func timeoutAnalysis(monitor *TimeoutMonitor) {
fmt.Println("超时详细分析:")
slowOps := monitor.GetRecentSlowOperations(5)
if len(slowOps) > 0 {
fmt.Printf("\n最近 %d 个慢操作:\n", len(slowOps))
for i, op := range slowOps {
fmt.Printf(" %d. %s - 耗时: %v, 超时: %v, 时间: %v\n",
i+1, op.Operation, op.Duration, op.Timeout,
op.Timestamp.Format("15:04:05"))
}
}
// 分析超时趋势
timeoutRate := monitor.GetTimeoutRate()
fmt.Printf("\n超时分析:\n")
fmt.Printf(" 当前超时率: %.2f\n", hitRate)
}
// 系统资源
var m runtime.MemStats
runtime.ReadMemStats(&m)
fmt.Printf("系统资源:\n")
fmt.Printf(" 内存使用: %d MB\n", m.Alloc/1024/1024)
fmt.Printf(" 协程数量: %d\n", runtime.NumGoroutine())
}
func main() {
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
PoolSize: 50,
})
defer rdb.Close()
fmt.Println("=== 并发控制策略示例 ===")
// 1. 乐观锁演示
fmt.Println("\n--- 乐观锁演示 ---")
optimisticLockDemo(rdb)
// 2. 分布式锁演示
fmt.Println("\n--- 分布式锁演示 ---")
distributedLockDemo(rdb)
// 3. 无锁编程演示
fmt.Println("\n--- 无锁编程演示 ---")
lockFreeProgrammingDemo(rdb)
// 4. 连接池管理
fmt.Println("\n--- 连接池管理 ---")
connectionPoolManagement(rdb)
}
func optimisticLockDemo(rdb *redis.Client) {
olm := NewOptimisticLockManager(rdb)
// 初始化账户
rdb.Set(context.Background(), "account:A", 1000, 0)
rdb.Set(context.Background(), "account:B", 500, 0)
fmt.Println("初始余额:")
balanceA, _ := rdb.Get(context.Background(), "account:A").Int64()
balanceB, _ := rdb.Get(context.Background(), "account:B").Int64()
fmt.Printf(" 账户A: %d\n", balanceA)
fmt.Printf(" 账户B: %d\n", balanceB)
// 模拟并发转账
var wg sync.WaitGroup
transferCount := 10
transferAmount := int64(10)
for i := 0; i < transferCount; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
err := olm.TransferWithWatch(context.Background(), "account:A", "account:B", transferAmount)
if err != nil {
fmt.Printf("转账 %d 失败: %v\n", id, err)
} else {
fmt.Printf("转账 %d 成功\n", id)
}
}(i)
}
wg.Wait()
// 检查最终余额
finalBalanceA, _ := rdb.Get(context.Background(), "account:A").Int64()
finalBalanceB, _ := rdb.Get(context.Background(), "account:B").Int64()
fmt.Printf("\n最终余额:\n")
fmt.Printf(" 账户A: %d (期望: %d)\n", finalBalanceA, balanceA-transferAmount*int64(transferCount))
fmt.Printf(" 账户B: %d (期望: %d)\n", finalBalanceB, balanceB+transferAmount*int64(transferCount))
if finalBalanceA+finalBalanceB == balanceA+balanceB {
fmt.Printf("✅ 乐观锁转账成功,总额保持一致\n")
} else {
fmt.Printf("❌ 乐观锁转账失败,总额不一致\n")
}
}
func distributedLockDemo(rdb *redis.Client) {
dlm := NewDistributedLockManager(rdb)
lockKey := "distributed:lock:resource"
fmt.Println("测试分布式锁:")
// 启动多个并发任务
var wg sync.WaitGroup
taskCount := 5
for i := 0; i < taskCount; i++ {
wg.Add(1)
go func(taskID int) {
defer wg.Done()
for attempt := 1; attempt <= 3; attempt++ {
// 尝试获取锁
lock, err := dlm.AcquireLock(context.Background(), lockKey, 5*time.Second)
if err != nil {
fmt.Printf("任务 %d 第 %d 次获取锁失败: %v\n", taskID, attempt, err)
time.Sleep(time.Duration(100*attempt) * time.Millisecond)
continue
}
fmt.Printf("任务 %d 第 %d 次获取锁成功\n", taskID, attempt)
// 模拟临界区操作
key := fmt.Sprintf("lock:task:%d", taskID)
value := fmt.Sprintf("completed_by_task_%d_at_%d", taskID, time.Now().Unix())
err = rdb.Set(context.Background(), key, value, 0).Err()
if err != nil {
fmt.Printf("任务 %d 临界区操作失败: %v\n", taskID, err)
}
// 模拟处理时间
time.Sleep(500 * time.Millisecond)
// 释放锁
err = lock.Release(context.Background())
if err != nil {
fmt.Printf("任务 %d 释放锁失败: %v\n", taskID, err)
} else {
fmt.Printf("任务 %d 释放锁成功\n", taskID)
}
break // 成功完成任务,退出循环
}
}(i)
}
wg.Wait()
// 检查结果
keys, _ := rdb.Keys(context.Background(), "lock:task:*").Result()
fmt.Printf("完成 %d 个任务,创建了 %d 个键\n", taskCount, len(keys))
}
func lockFreeProgrammingDemo(rdb *redis.Client) {
// 初始化计数器
rdb.Set(context.Background(), "lockfree:counter", 0, 0)
lfc := NewLockFreeCounter(rdb, "lockfree:counter")
fmt.Println("使用无锁编程进行并发计数:")
var wg sync.WaitGroup
goroutineCount := 20
incrementsPerGoroutine := 50
for i := 0; i < goroutineCount; i++ {
wg.Add(1)
go func(goroutineID int) {
defer wg.Done()
for j := 0; j < incrementsPerGoroutine; j++ {
// 原子增加操作
newCount, err := lfc.Increment(context.Background(), 1)
if err != nil {
fmt.Printf("goroutine %d increment %d 失败: %v\n", goroutineID, j, err)
continue
}
// 偶尔减少操作
if j%10 == 5 {
_, err := lfc.Decrement(context.Background(), 1)
if err != nil {
fmt.Printf("goroutine %d decrement %d 失败: %v\n", goroutineID, j, err)
}
}
}
}(i)
}
wg.Wait()
finalCount, err := lfc.Get(context.Background())
if err != nil {
fmt.Printf("获取最终计数失败: %v\n", err)
return
}
expectedCount := goroutineCount * incrementsPerGoroutine
decrements := goroutineCount * (incrementsPerGoroutine / 10)
expectedCount -= decrements
fmt.Printf("最终计数: %d (期望: %d)\n", finalCount, expectedCount)
if finalCount == expectedCount {
fmt.Printf("✅ 无锁编程计数正确\n")
} else {
fmt.Printf("❌ 无锁编程计数错误\n")
}
}
func connectionPoolManagement(rdb *redis.Client) {
monitor := NewConnectionPoolMonitor(rdb)
fmt.Println("连接池配置和监控:")
// 显示当前配置
opts := rdb.Options()
fmt.Printf("连接池配置:\n")
fmt.Printf(" PoolSize: %d\n", opts.PoolSize)
fmt.Printf(" MinIdleConns: %d\n", opts.MinIdleConns)
fmt.Printf(" MaxRetries: %d\n", opts.MaxRetries)
fmt.Printf(" DialTimeout: %v\n", opts.DialTimeout)
fmt.Printf(" ReadTimeout: %v\n", opts.ReadTimeout)
fmt.Printf(" WriteTimeout: %v\n", opts.WriteTimeout)
// 模拟不同并发级别的负载
concurrencyLevels := []int{5, 10, 20, 50}
for _, level := range concurrencyLevels {
fmt.Printf("\n并发级别: %d\n", level)
var wg sync.WaitGroup
start := time.Now()
for i := 0; i < level; i++ {
wg.Add(1)
go func(workerID int) {
defer wg.Done()
for j := 0; j < 10; j++ {
key := fmt.Sprintf("pool:test:%d:%d", workerID, j)
value := fmt.Sprintf("value_%d_%d", workerID, j)
err := rdb.Set(context.Background(), key, value, 0).Err()
if err != nil {
// 记录错误但继续
}
}
}(i)
}
wg.Wait()
duration := time.Since(start)
// 显示连接池状态
monitor.GetStats()
fmt.Printf("并发 %d 完成, 耗时: %v\n", level, duration)
}
// 最终状态
fmt.Printf("\n最终连接池状态:\n")
monitor.GetStats()
}
---
03.并发安全最佳实践
a.共享Redis客户端
a.单一实例
整个应用程序使用一个Redis客户端实例。
b.初始化时机
在程序启动时初始化Redis客户端。
c.优雅关闭
程序退出时正确关闭Redis客户端。
b.连接池配置
a.PoolSize设置
根据并发量和服务器性能设置合适的连接池大小。
b.MinIdleConns设置
设置最小空闲连接数提高响应速度。
c.超时配置
合理设置各种超时参数。
c.原子操作优先
a.选择原子命令
优先使用Redis提供的原子命令。
b.Lua脚本使用
复杂操作使用Lua脚本保证原子性。
c.避免手动锁
避免在应用层实现锁机制。
d.错误处理
a.网络错误处理
妥善处理网络中断和超时错误。
b.重试机制
实现合理的重试策略。
c.降级策略
在Redis不可用时实现降级机制。
e.监控和调优
a.性能监控
监控QPS、响应时间、错误率等指标。
b.资源监控
监控连接池使用情况。
c.告警机制
设置合理的告警阈值。
f.最佳实践示例
---
// 并发安全最佳实践示例
package main
import (
"context"
"errors"
"fmt"
"log"
"net"
"os"
"os/signal"
"runtime"
"sync"
"sync/atomic"
"syscall"
"time"
"github.com/go-redis/redis/v8"
)
// 全局Redis客户端
var (
globalRedisClient *redis.Client
redisOnce sync.Once
)
// Redis配置结构
type RedisConfig struct {
Addr string
Password string
DB int
PoolSize int
MinIdleConns int
MaxRetries int
DialTimeout time.Duration
ReadTimeout time.Duration
WriteTimeout time.Duration
}
// 单例模式获取Redis客户端
func GetRedisClient(config *RedisConfig) *redis.Client {
redisOnce.Do(func() {
globalRedisClient = createRedisClient(config)
})
return globalRedisClient
}
func createRedisClient(config *RedisConfig) *redis.Client {
options := &redis.Options{
Addr: config.Addr,
Password: config.Password,
DB: config.DB,
PoolSize: config.PoolSize,
MinIdleConns: config.MinIdleConns,
MaxRetries: config.MaxRetries,
DialTimeout: config.DialTimeout,
ReadTimeout: config.ReadTimeout,
WriteTimeout: config.WriteTimeout,
// 网络配置优化
KeepAlive: 30 * time.Second,
IdleTimeout: 5 * time.Minute,
PoolTimeout: 4 * time.Second,
}
return redis.NewClient(options)
}
// 健康检查器
type HealthChecker struct {
rdb *redis.Client
ticker *time.Ticker
stopCh chan struct{}
healthy int32
}
func NewHealthChecker(rdb *redis.Client) *HealthChecker {
return &HealthChecker{
rdb: rdb,
stopCh: make(chan struct{}),
healthy: 1, // 初始状态为健康
}
}
func (hc *HealthChecker) Start() {
hc.ticker = time.NewTicker(30 * time.Second)
go func() {
defer hc.ticker.Stop()
for {
select {
case <-hc.ticker.C:
hc.checkHealth()
case <-hc.stopCh:
return
}
}
}()
}
func (hc *HealthChecker) checkHealth() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_, err := hc.rdb.Ping(ctx).Result()
if err != nil {
atomic.StoreInt32(&hc.healthy, 0)
log.Printf("Redis健康检查失败: %v", err)
} else {
atomic.StoreInt32(&hc.healthy, 1)
}
}
func (hc *HealthChecker) IsHealthy() bool {
return atomic.LoadInt32(&hc.healthy) == 1
}
func (hc *HealthChecker) Stop() {
close(hc.stopCh)
}
// 重试机制
type RetryConfig struct {
MaxRetries int
BaseDelay time.Duration
MaxDelay time.Duration
MaxMultiplier float64
}
func DefaultRetryConfig() *RetryConfig {
return &RetryConfig{
MaxRetries: 3,
BaseDelay: 100 * time.Millisecond,
MaxDelay: 5 * time.Second,
MaxMultiplier: 2.0,
}
}
func ExecuteWithRetry(ctx context.Context, config *RetryConfig, operation func() error) error {
var lastErr error
for attempt := 0; attempt <= config.MaxRetries; attempt++ {
if attempt > 0 {
// 计算退避延迟
delay := config.BaseDelay * time.Duration(1<<uint(attempt-1))
if delay > config.MaxDelay {
delay = config.MaxDelay
}
// 检查Context是否被取消
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(delay):
// 继续重试
}
}
err := operation()
if err == nil {
return nil
}
// 检查是否是可重试的错误
if !isRetryableError(err) {
return err
}
lastErr = err
log.Printf("操作失败(尝试 %d/%d): %v", attempt+1, config.MaxRetries+1, err)
}
return lastErr
}
func isRetryableError(err error) bool {
if err == nil {
return false
}
// 网络错误通常可重试
var netErr net.Error
if errors.As(err, &netErr) {
return true
}
// Redis超时错误可重试
if errors.Is(err, context.DeadlineExceeded) {
return true
}
// Redis连接错误可重试
var redisErr *redis.Error
if errors.As(err, &redisErr) {
// 检查具体的错误类型
if redisErr.Error() == "connection refused" ||
redisErr.Error() == "connection reset by peer" ||
redisErr.Error() == "i/o timeout" {
return true
}
}
return false
}
// 限流器
type RateLimiter struct {
rdb *redis.Client
key string
limit int
window time.Duration
luaScript *redis.Script
}
func NewRateLimiter(rdb *redis.Client, key string, limit int, window time.Duration) *RateLimiter {
script := redis.NewScript(`
local key = KEYS[1]
local limit = tonumber(ARGV[1])
local window = tonumber(ARGV[2])
local current_time = tonumber(ARGV[3])
-- 移除过期的请求记录
redis.call('ZREMRANGEBYSCORE', key, 0, current_time - window)
-- 获取当前窗口内的请求数
local current_requests = redis.call('ZCARD', key)
if current_requests < limit then
-- 允许请求,记录当前时间
redis.call('ZADD', key, current_time, current_time)
redis.call('EXPIRE', key, math.ceil(window))
return 1
else
-- 拒绝请求
return 0
end
`)
return &RateLimiter{
rdb: rdb,
key: key,
limit: limit,
window: window,
luaScript: script,
}
}
func (rl *RateLimiter) Allow(ctx context.Context) (bool, error) {
result, err := rl.luaScript.Run(ctx, rl.rdb, []string{rl.key},
rl.limit, rl.window.Milliseconds(), time.Now().UnixMilli()).Result()
if err != nil {
return false, err
}
return result.(int64) == 1, nil
}
// 性能监控器
type PerformanceMonitor struct {
rdb *redis.Client
metrics *PerformanceMetrics
mutex sync.RWMutex
}
type PerformanceMetrics struct {
TotalRequests int64
SuccessRequests int64
FailedRequests int64
TotalLatency int64
MaxLatency int64
MinLatency int64
StartTime time.Time
}
func NewPerformanceMonitor(rdb *redis.Client) *PerformanceMonitor {
return &PerformanceMonitor{
rdb: rdb,
metrics: &PerformanceMetrics{
MinLatency: int64(time.Hour), // 初始化为大值
StartTime: time.Now(),
},
}
}
func (pm *PerformanceMonitor) RecordOperation(latency time.Duration, success bool) {
atomic.AddInt64(&pm.metrics.TotalRequests, 1)
latencyNanos := latency.Nanoseconds()
if success {
atomic.AddInt64(&pm.metrics.SuccessRequests, 1)
atomic.AddInt64(&pm.metrics.TotalLatency, latencyNanos)
// 更新最大延迟
for {
current := atomic.LoadInt64(&pm.metrics.MaxLatency)
if latencyNanos <= current ||
atomic.CompareAndSwapInt64(&pm.metrics.MaxLatency, current, latencyNanos) {
break
}
}
// 更新最小延迟
for {
current := atomic.LoadInt64(&pm.metrics.MinLatency)
if latencyNanos >= current ||
atomic.CompareAndSwapInt64(&pm.metrics.MinLatency, current, latencyNanos) {
break
}
}
} else {
atomic.AddInt64(&pm.metrics.FailedRequests, 1)
}
}
func (pm *PerformanceMonitor) GetStats() (total, success, failed int64, avgLatency, maxLatency, minLatency time.Duration) {
pm.mutex.RLock()
defer pm.mutex.RUnlock()
total = atomic.LoadInt64(&pm.metrics.TotalRequests)
success = atomic.LoadInt64(&pm.metrics.SuccessRequests)
failed = atomic.LoadInt64(&pm.metrics.FailedRequests)
totalLatency := atomic.LoadInt64(&pm.metrics.TotalLatency)
if success > 0 {
avgLatency = time.Duration(totalLatency / success)
}
maxLatency = time.Duration(atomic.LoadInt64(&pm.metrics.MaxLatency))
minLatency = time.Duration(atomic.LoadInt64(&pm.metrics.MinLatency))
return
}
func (pm *PerformanceMonitor) PrintStats() {
total, success, failed, avgLatency, maxLatency, minLatency := pm.GetStats()
runtime := time.Since(pm.metrics.StartTime)
fmt.Printf("\n=== 性能统计 ===\n")
fmt.Printf("运行时间: %v\n", runtime)
fmt.Printf("总请求数: %d\n", total)
fmt.Printf("成功请求数: %d\n", success)
fmt.Printf("失败请求数: %d\n", failed)
if total > 0 {
fmt.Printf("成功率: %.2f\n\n", successRate))
// 执行时间统计
report.WriteString("执行时间统计:\n")
report.WriteString(fmt.Sprintf(" 平均时间: %v\n", metrics.AvgExecutionTime))
report.WriteString(fmt.Sprintf(" 最小时间: %v\n", metrics.MinExecutionTime))
report.WriteString(fmt.Sprintf(" 最大时间: %v\n\n", metrics.MaxExecutionTime))
// 命令类型统计
report.WriteString("命令类型统计:\n")
for command, stats := range metrics.CommandStats {
avgTime := time.Duration(0)
if stats.Count > 0 {
avgTime = stats.TotalTime / time.Duration(stats.Count)
}
errorRate := float64(0)
if stats.Count > 0 {
errorRate = float64(stats.ErrorCount) / float64(stats.Count) * 100
}
report.WriteString(fmt.Sprintf(" %s: 执行%d次, 平均%v, 错误率%.2f", name, stats.PassRate))
}
}
}
// 生成建议
func (pa *PerformanceAnalyzer) generateRecommendations(snapshots []PerformanceSnapshot, result *Reult) {
latest := snapshots[len(snapshots)-1]
passRate := float64(latest.AlowedRequests) / float64(latest.TotalRequests) * 100
if passRate < 95 {
result.Recommendations = append(result.Recommendations,
"建议适当放宽限流参数,提高通过率")
}
if latest.AvgLatency > 50*time.Millisecond {
result.Recommendations = append(result.Recommendations,
"建议优化限流算法实现,降低延迟")
}
// 根据算法表现提供建议
bestAlgorithm := ""
bestPerformance := 0.0
for name, stats := range latest.AlgorithmStats {
performance := stats.PassRate / (float64(stats.AvgLatency.Milliseconds()) + 1)
if performance > bestPerformance {
bestPerformance = performance
bestAlgorithm = name
}
}
if bestAlgorithm != "" {
result.Recommendations = append(result.Recommendations,
fmt.Sprintf("建议使用 %s 算法,性能表现最佳", bestAlgorithm))
}
}
// 分析结果
type AnalysisResult struct {
Summary string `json:"summary"`
Trends []Trend `json:"trends"`
Insights []string `json:"insights"`
Recommendations []string `json:"recommendations"`
}
// 趋势信息
type Trend struct {
Metric string `json:"metric"`
Change float64 `json:"change"`
Direction string `json:"direction"`
}
---
7.5 排行榜
01.排行榜概述
a.基本概念
a.排行榜定义
排行榜是根据特定分数对项目进行排序的数据结构,实时显示排名信息,广泛应用于游戏、电商、社交等场景。
b.排名机制
通过分数的高低来决定项目在排行榜中的位置,分数越高排名越靠前。
c.实时更新
支持实时更新项目分数和排名变化,提供即时反馈的用户体验。
d.分页展示
提供分页查询功能,支持大量数据的分页显示和按需加载。
b.技术优势
a.高性能排序
利用Redis的ZSET数据结构实现O(logN)时间复杂度的排序操作。
b.实时性
支持实时更新分数和排名,无需重新计算整个排行榜。
c.可扩展性
支持海量数据的排行榜管理,通过分片和集群实现水平扩展。
d.多维度支持
支持同时维护多个排行榜,如总榜、日榜、周榜、月榜等。
c.应用场景
a.游戏排名
实现游戏积分排名、等级排名、成就排名等。
b.销售排行
商品销量排行榜、热销产品排行榜等电商场景。
c.内容推荐
热门内容排行榜、阅读量排行榜、点赞排行榜等。
d.用户活跃度
用户活跃度排行榜、签到排行榜、贡献度排行榜等。
02.排行榜数据结构
a.ZSET数据结构
a.功能说明
使用Redis的Sorted Set(ZSET)数据结构存储排行榜数据,支持高效的分数排序和范围查询。
b.数据组织
每个排行榜使用独立的ZSET,以分数为排序依据,成员为项目标识符。
c.分数管理
支持动态更新项目分数,自动维护排名顺序。
d.过期策略
设置合理的数据过期时间,自动清理过期数据。
b.排行榜配置
a.功能说明
配置排行榜的基本参数,包括排行榜名称、分数类型、更新频率等。
b.配置实现
---
// 排行榜配置
type LeaderboardConfig struct {
Name string `json:"name" yaml:"name"`
Description string `json:"description" yaml:"description"`
ScoreType string `json:"score_type" yaml:"score_type"`
DefaultScore float64 `json:"default_score" yaml:"default_score"`
MaxSize int64 `json:"max_size" yaml:"max_size"`
UpdateFrequency time.Duration `json:"update_frequency" yaml:"update_frequency"`
ExpireTime time.Duration `json:"expire_time" yaml:"expire_time"`
EnableHistory bool `json:"enable_history" yaml:"enable_history"`
EnableAnalytics bool `json:"enable_analytics" yaml:"enable_analytics"`
Decimals int `json:"decimals" yaml:"decimals"`
EnableNegative bool `json:"enable_negative" yaml:"enable_negative"`
BatchSize int `json:"batch_size" yaml:"batch_size"`
}
// 创建排行榜配置
func NewLeaderboardConfig(name, description, scoreType string) *LeaderboardConfig {
return &LeaderboardConfig{
Name: name,
Description: description,
ScoreType: scoreType,
DefaultScore: 0,
MaxSize: 10000,
UpdateFrequency: time.Second * 10,
ExpireTime: time.Hour * 24 * 7,
EnableHistory: true,
EnableAnalytics:true,
Decimals: 2,
EnableNegative: false,
BatchSize: 100,
}
}
// 验证配置
func (config *LeaderboardConfig) Validate() error {
if config.Name == "" {
return fmt.Errorf("排行榜名称不能为空")
}
if config.ScoreType == "" {
return fmt.Errorf("分数类型不能为空")
}
if config.MaxSize <= 0 {
config.MaxSize = 10000
}
if config.Decimals < 0 || config.Decimals > 6 {
return fmt.Errorf("小数位数必须在0-6之间")
}
return nil
}
---
c.排行榜管理器
a.功能说明
提供排行榜的统一管理接口,支持创建、更新、查询、统计等操作。
b.管理器实现
---
// 排行榜管理器
type LeaderboardManager struct {
client redis.Cmdable
configs map[string]*LeaderboardConfig
analytics *LeaderboardAnalytics
cache *LeaderboardCache
logger *log.Logger
mu sync.RWMutex
}
// 创建排行榜管理器
func NewLeaderboardManager(client redis.Cmdable) *LeaderboardManager {
return &LeaderboardManager{
client: client,
configs: make(map[string]*LeaderboardConfig),
analytics: NewLeaderboardAnalytics(client),
cache: NewLeaderboardCache(client),
logger: log.Default(),
}
}
// 注册排行榜
func (lm *LeaderboardManager) RegisterLeaderboard(config *LeaderboardConfig) error {
lm.mu.Lock()
defer lm.mu.Unlock()
// 验证配置
if err := config.Validate(); err != nil {
return fmt.Errorf("配置验证失败: %w", err)
}
// 检查是否已存在
if _, exists := lm.configs[config.Name]; exists {
return fmt.Errorf("排行榜 %s 已存在", config.Name)
}
// 创建Redis键
leaderboardKey := fmt.Sprintf("leaderboard:%s", config.Name)
// 设置排行榜元数据
configJSON, err := json.Marshal(config)
if err != nil {
return fmt.Errorf("序列化配置失败: %w", err)
}
metadataKey := fmt.Sprintf("leaderboard:%s:metadata", config.Name)
pipeline := lm.client.Pipeline()
pipeline.Set(context.Background(), metadataKey, configJSON, 0)
pipeline.Expire(context.Background(), metadataKey, config.ExpireTime)
_, err = pipeline.Exec(context.Background())
if err != nil {
return fmt.Errorf("保存配置元数据失败: %w", err)
}
lm.configs[config.Name] = config
lm.logger.Printf("注册排行榜成功: %s", config.Name)
return nil
}
// 获取排行榜配置
func (lm *LeaderboardManager) GetLeaderboardConfig(name string) (*LeaderboardConfig, bool) {
lm.mu.RLock()
defer lm.RUnlock()
config, exists := lm.configs[name]
return config, exists
}
// 获取所有排行榜配置
func (lm *LeaderboardManager) GetAllLeaderboards() map[string]*LeaderboardConfig {
lm.mu.RLock()
defer lm.RUnlock()
configs := make(map[string]*LeaderboardConfig)
for name, config := range lm.configs {
configs[name] = config
}
return configs
}
---
d.排行榜键管理
a.功能说明
统一管理Redis键的生成和命名规则,确保键的唯一性和可读性。
b.键管理实现
---
// 键管理器
type LeaderboardKeyManager struct {
keyPrefix string
}
// 创建键管理器
func NewLeaderboardKeyManager() *LeaderboardKeyManager {
return &LeaderboardKeyManager{
keyPrefix: "leaderboard",
}
}
// 生成排行榜键
func (lkm *LeaderboardKeyManager) GetLeaderboardKey(leaderboardName string) string {
return fmt.Sprintf("%s:%s", lkm.keyPrefix, leaderboardName)
}
// 生成成员键
func (lkm *LeaderboardKeyManager) GetMemberKey(leaderboardName, memberID string) string {
return fmt.Sprintf("%s:%s:member:%s", lkm.keyPrefix, leaderboardName, memberID)
}
// 生成分数键
func (lkm *LeaderboardKeyManager) GetScoreKey(leaderboardName string) string {
return fmt.Sprintf("%s:%s:score", lkm.keyPrefix, leaderboardName)
}
// 生成排名键
func (lkm *LeaderboardKeyManager) GetRankKey(leaderboardName string) string {
return fmt.Sprintf("%s:%s:rank", lkm.keyPrefix, leaderboardName)
}
// 生成历史记录键
func (lkm *LeaderboardKeyManager) GetHistoryKey(leaderboardName string, timestamp time.Time) string {
return fmt.Sprintf("%s:%s:history:%s", lkm.keyPrefix, leaderboardName, timestamp.Format("2006-01-02"))
}
// 生成统计键
func (lkm *LeaderboardKeyManager) GetStatsKey(leaderboardName string) string {
return fmt.Sprintf("%s:%s:stats", lkm.keyPrefix, leaderboardName)
}
// 生成分析键
func (lkm *LeaderboardKeyManager) GetAnalyticsKey(leaderboardName string) string {
return fmt.Sprintf("%s:%s:analytics", lkm.keyPrefix, leaderboardName)
}
---
03.基础排行榜操作
a.添加和更新分数
a.功能说明
向排行榜添加新成员或更新现有成员的分数,自动维护排名顺序。
b.添加实现
---
// 添加成员到排行榜
func (lm *LeaderboardManager) AddMember(ctx context.Context, leaderboardName, memberID string, score float64, metadata map[string]interface{}) error {
config, exists := lm.GetLeaderboardConfig(leaderboardName)
if !exists {
return fmt.Errorf("排行榜 %s 不存在", leaderboardName)
}
// 验证分数
if !config.EnableNegative && score < 0 {
return fmt.Errorf("不允许负分数")
}
// 格式化分数
formattedScore := lm.formatScore(score, config.Decimals)
keyManager := NewLeaderboardKeyManager()
leaderboardKey := keyManager.GetLeaderboardKey(leaderboardName)
memberKey := keyManager.GetMemberKey(leaderboardName, memberID)
// 使用Lua脚本保证原子操作
script := `
local leaderboard_key = KEYS[1]
local member_key = KEYS[2]
local score = tonumber(ARGV[1])
local member_id = ARGV[2]
local metadata = ARGV[3]
local max_size = tonumber(ARGV[4])
-- 检查成员是否已存在
local current_score = redis.call('ZSCORE', leaderboard_key, member_id)
if current_score == false then
-- 新成员,添加到排行榜
redis.call('ZADD', leaderboard_key, score, member_id)
-- 检查排行榜大小限制
local size = redis.call('ZCARD', leaderboard_key)
if size > max_size then
-- 移除最低分的成员
local removed = redis.call('ZPOPMIN', leaderboard_key, 0, -1)
if #removed > 0 then
-- 删除最低分成员的元数据
redis.call('DEL', member_key .. ':' .. removed[1])
end
end
else
-- 现有成员,更新分数
redis.call('ZADD', leaderboard_key, score, member_id)
-- 检查排名变化
local rank = redis.call('ZRANK', leaderboard_key, member_id, 'REV')
if rank then
-- 记录排名历史
redis.call('ZADD', leaderboard_key .. ':history', time.now(), member_id .. ':' .. score .. ':' .. rank)
end
end
-- 更新元数据
if metadata then
local metadata_json = cjson.decode(metadata)
redis.call('HSET', member_key, 'data', metadata)
redis.call('EXPIRE', member_key, 86400) -- 24小时过期
end
-- 返回当前排名
local rank = redis.call('ZRANK', leaderboard_key, member_id, 'REV')
if rank then
return rank
else
return -1
end
`
// 序列化元数据
metadataJSON := ""
if metadata != nil {
metadataJSONBytes, err := json.Marshal(metadata)
if err != nil {
return fmt.Errorf("序列化元数据失败: %w", err)
}
metadataJSON = string(metadataJSONBytes)
}
keyManager := NewLeaderboardKeyManager()
leaderboardKey := keyManager.GetLeaderboardName(leaderboardName)
memberKey := keyManager.GetMemberKey(leaderboardName, memberID)
result, err := lm.client.Eval(ctx, script, []string{leaderboardKey, memberKey},
formattedScore, memberID, metadataJSON, config.MaxSize).Result()
if err != nil {
return fmt.Errorf("执行排行榜操作失败: %w", err)
}
if rank, ok := result.(int64); ok {
// 记录分析数据
lm.analytics.RecordOperation("add", leaderboardName, memberID, float64(score), time.Since(time.Now()))
return nil
}
return nil
}
// 批量添加成员
func (lm *LeaderboardManager) BatchAddMembers(ctx context.Context, leaderboardName string, members []LeaderboardMember) error {
config, exists := lm.GetLeaderboardConfig(leaderboardName)
if !exists {
return fmt.Errorf("排行榜 %s 不存在", leaderboardName)
}
if len(members) == 0 {
return nil
}
// 分批处理
batchSize := config.BatchSize
for i := 0; i < len(members); i += batchSize {
end := i + batchSize
if end > len(members) {
end = len(members)
}
batch := members[i:end]
if err := lm.processBatchAdd(ctx, leaderboardName, batch); err != nil {
return fmt.Errorf("批量添加成员失败: %w", err)
}
}
return nil
}
// 批量处理添加操作
func (lm *LeaderboardManager) processBatchAdd(ctx context.Context, leaderboardName string, members []LeaderboardMember) error {
keyManager := NewLeaderkeyManager()
leaderboardKey := keyManager.GetLeaderboardKey(leaderboardName)
pipeline := lm.client.Pipeline()
for _, member := range members {
// 格式化分数
formattedScore := lm.formatScore(member.Score, 8) // 使用8位精度
memberKey := keyManager.GetMemberKey(leaderboardName, member.MemberID)
// 检查并更新
pipeline.ZScore(ctx, leaderboardKey, member.MemberID)
pipeline.ZAdd(ctx, leaderboardKey, formattedScore, member.MemberID)
// 保存元数据
if member.Metadata != nil {
metadataJSON, _ := json.Marshal(member.Metadata)
pipeline.HSet(ctx, memberKey, "data", string(metadataJSON))
pipeline.Expire(ctx, memberKey, 86400)
}
}
_, err := pipeline.Exec(ctx)
if err != nil {
return fmt.Errorf("执行批量操作失败: %w", err)
}
// 清理超大小限制的成员
return lm.trimLeaderboard(ctx, leaderboardName)
}
// 清理超大小限制的成员
func (lm *LeaderboardManager) trimLeaderboard(ctx context.Context, leaderboardName string) error {
config, exists := lm.GetLeaderboardConfig(leaderName)
if !exists {
return nil
}
keyManager := NewLeaderboardKeyManager()
leaderboardKey := keyManager.GetLeaderboardName(leaderboardName)
// 检查当前大小
size, err := lm.client.ZCard(ctx, leaderboardKey).Result()
if err != nil {
return fmt.Errorf("获取排行榜大小失败: %w", err)
}
if int64(size) <= config.MaxSize {
return nil
}
// 移除超出的成员
removeCount := int64(size) - config.MaxSize
end := -1
start := int64(0)
members, err := lm.client.ZRange(ctx, leaderboardKey, start, end).Result()
if err != nil {
return fmt.Errorf("获取要删除的成员失败: %w", err)
}
// 批量删除
for _, member := range members {
memberStr, _ := member.(string)
memberKey := keyManager.GetMemberKey(leaderboardName, memberStr)
lm.client.Del(ctx, memberKey)
}
lm.client.ZRemRangeByScore(ctx, leaderboardKey, 0, "+inf")
lm.logger.Printf("排行榜 %s 清理了 %d 个成员", leaderboardName, removeCount)
return nil
}
---
c.排行榜查询
a.功能说明
提供多种排行榜查询功能,支持按排名查询、按范围查询、按成员查询等。
b.查询实现
---
// 获取排行榜前N名
func (lm *LeaderboardManager) GetTopMembers(ctx context.Context, leaderboardName string, limit int64) ([]*LeaderboardEntry, error) {
config, exists := lm.GetLeaderboardConfig(leaderboardName)
if !exists {
return nil, fmt.Errorf("排行榜 %s 不存在", leaderboardName)
}
if limit <= 0 {
return []*LeaderboardEntry{}, nil
}
keyManager := NewLeaderboardKeyManager()
leaderboardKey := keyManager.GetLeaderboardName(leaderboardName)
// 使用反向范围查询获取前N名
members, err := lm.client.ZRevRangeWithScores(ctx, leaderboardKey, 0, limit-1).Result()
if err != nil {
return nil, fmt.Errorf("获取排行榜数据失败: w", err)
}
entries := make([]*LeaderboardEntry, 0, len(members))
for _, member := range members {
entry := &LeaderboardEntry{
MemberID: member.Member.(string),
Score: member.Score,
Rank: int64(len(entries)) + 1,
}
// 获取元数据
memberKey := keyManager.GetMemberKey(leaderboardName, member.Member.(string))
data, err := lm.client.HGet(ctx, memberKey, "data").Result()
if err == nil && data != "" {
var metadata map[string]interface{}
if err := json.Unmarshal([]byte(data), &metadata); err == nil {
entry.Metadata = metadata
}
}
entries = append(entries, entry)
}
// 记录查询分析
lm.analytics.RecordQuery("top", leaderboardName, limit, time.Since(time.Now()))
return entries, nil
}
// 获取成员排名
func (lm *LeaderboardManager) GetRank(ctx context.Context, leaderboardName, memberID string) (int64, error) {
config, exists := lm.GetLeaderboardConfig(leaderboardName)
if !exists {
return 0, fmt.Errorf("排行榜 %s 不存在", leaderboardName)
}
keyManager := NewLeaderboardKeyManager()
leaderboardKey := keyManager.GetLeaderboardName(leaderboardName)
rank, err := lm.client.ZRevRank(ctx, leaderboardKey, memberID).Result()
if err != nil {
if err == redis.Nil {
return 0, ErrMemberNotFound
}
return 0, fmt.Errorf("获取排名失败: %w", err)
}
return rank + 1, nil // 转换为1-based排名
}
// 获取成员分数
func (lm *LeaderboardManager) GetScore(ctx context.Context, leaderboardName, memberID string) (float64, error) {
config, exists := lm.GetLeaderboardConfig(leaderboardName)
if !exists {
return 0, fmt.Errorf("排行榜 %s 不存在", leaderboardName)
}
keyManager := NewLeaderkeyManager()
leaderboardKey := keyManager.GetLeaderboardName(leaderboardName)
score, err := lm.client.ZScore(ctx, leaderboardKey, memberID).Result()
if err != nil {
if err == redis.Nil {
return config.DefaultScore, ErrMemberNotFound
}
return 0, fmt.Errorf("获取分数失败: %w", err)
}
return lm.parseScore(score, config.Decimals), nil
}
// 获取排名范围内的成员
func (lm *LeaderboardManager) GetMembersByRankRange(ctx context.Context, leaderboardName string, startRank, endRank int64) ([]*LeaderboardEntry, error) {
config, exists := lm.GetLeaderboardConfig(leaderboardName)
if !exists {
return nil, fmt.Errorf("排行榜 %s 不存在", leaderboardName)
}
if startRank < 1 {
startRank = 1
}
if endRank < startRank {
endRank = startRank
}
keyManager := NewLeaderkeyManager()
leaderboardKey := keyManager.GetLeaderboardName(leaderboardName)
// Redis的ZRevRange使用0-based索引
startIdx := startRank - 1
endIdx := endRank - 1
members, err := lm.client.ZRevRangeWithScores(ctx, leaderboardKey, startIdx, endIdx).Result()
if err != nil {
return nil, fmt.Errorf("获取排名范围数据失败: %w", err)
}
entries := make([]*LeaderboardEntry, 0, len(members))
for i, member := range members {
entry := &LeaderboardEntry{
MemberID: member.Member.(string),
Score: member.Score,
Rank: int64(startIdx) + i + 1,
}
// 获取元数据
memberKey := keyManager.GetMemberKey(leaderboardName, member.Member.(string))
data, err := lm.client.HGet(ctx, memberKey, "data").Result()
if err == nil && data != "" {
var metadata map[string]interface{}
if err := json.Unmarshal([]byte(data), &metadata); err == nil {
entry.Metadata = metadata
}
}
entries = append(entries, entry)
}
return entries, nil
}
// 排行榜条目
type LeaderboardEntry struct {
MemberID string `json:"member_id"`
Score float64 `json:"score"`
Rank int64 `json:"rank"`
Metadata map[string]interface{} `json:"metadata"`
}
---
04.高级功能实现
a.历史记录
a.功能说明
记录排行榜的历史变化,支持历史排名查询和趋势分析。
b.历史记录实现
---
// 历史记录管理器
type HistoryManager struct {
client redis.Cmdable
keyManager *LeaderboardKeyManager
retention time.Duration
enabled bool
}
// 创建历史记录管理器
func NewHistoryManager(client redis.Cmdable, retention time.Duration, enabled bool) *HistoryManager {
return &HistoryManager{
client: client,
keyManager: NewLeaderboardKeyManager(),
retention: retention,
enabled: enabled,
}
}
// 记录历史变更
func (hm *HistoryManager) RecordChange(ctx context.Context, leaderboardName, memberID string, oldScore, newScore float64, oldRank, newRank int64) error {
if !hm.enabled {
return nil
}
keyManager := hm.keyManager
historyKey := keyManager.GetHistoryKey(leaderboardName, time.Now())
// 记录变更事件
event := HistoryEvent{
Timestamp: time.Now(),
MemberID: memberID,
OldScore: oldScore,
NewScore: newScore,
OldRank: oldRank,
NewRank: newRank,
ScoreChange: newScore - oldScore,
RankChange: newRank - oldRank,
}
eventJSON, err := json.Marshal(event)
if err != nil {
return fmt.Errorf("序列化历史事件失败: %w", err)
}
eventKey := fmt.Sprintf("%s:%s", historyKey, memberID)
err = hm.client.ZAdd(ctx, historyKey, event.Timestamp.UnixNano(), eventKey).Err()
if err != nil {
return fmt.Errorf("记录历史事件失败: %w", err)
}
// 设置过期时间
err = hm.client.Expire(ctx, historyKey, hm.retention).Err()
if err != nil {
return fmt.Errorf("设置历史记录过期时间失败: %w", err)
}
return nil
}
// 获取成员历史
func (hm *HistoryManager) GetMemberHistory(ctx context.Context, leaderboardName, memberID string, limit int) ([]HistoryEvent, error) {
if !hm.enabled {
return nil, fmt.Errorf("历史记录功能未启用")
}
keyManager := hm.keyManager
historyKey := keyManager.GetHistoryKey(leaderName, time.Now().AddDate(0, -7*24*time.Hour)) // 最近7天
// 获取该成员的历史记录
memberHistoryKey := fmt.Sprintf("%s:%s", historyKey, memberID)
events, err := hm.client.ZRevRangeWithScores(ctx, memberHistoryKey, 0, limit).Result()
if err != nil {
return nil, fmt.Errorf("获取成员历史失败: %w", err)
}
history := make([]HistoryEvent, 0, len(events))
for _, event := range events {
eventJSON, err := event.Member.(string)
if err != nil {
continue
}
var historyEvent HistoryEvent
err = json.Unmarshal([]byte(eventJSON), &historyEvent)
if err != nil {
continue
}
history = append(history, historyEvent)
}
return history, nil
}
// 历史事件
type HistoryEvent struct {
Timestamp time.Time `json:"timestamp"`
MemberID string `json:"member_id"`
OldScore float64 `json:"old_score"`
NewScore float64 `json:"new_score"`
OldRank int64 `json:"old_rank"`
NewRank int64 `json:"new_rank"`
ScoreChange float64 `json:"score_change"`
RankChange int64 `json:"rank_change"`
}
---
b.统计分析
a.功能说明
提供排行榜的统计分析功能,包括趋势分析、热力分析、用户行为分析等。
b.统计分析实现
---
// 排行榜分析器
type LeaderboardAnalytics struct {
client redis.Cmdable
keyManager *LeaderboardKeyManager
cache *LeaderboardCache
}
// 创建排行榜分析器
func NewLeaderboardAnalytics(client redis.Cmdable) *LeaderboardAnalytics {
return &LeaderboardAnalytics{
client: client,
keyManager: NewLeaderboardKeyManager(),
cache: NewLeaderboardCache(client),
}
}
// 获取排行榜统计
func (la *LeaderboardAnalytics) GetStats(ctx context.Context, leaderboardName string) (*LeaderboardStats, error) {
statsKey := la.keyManager.GetStatsKey(leaderboardName)
// 尝试从缓存获取
stats, err := la.cache.GetStats(leaderboardName)
if err == nil {
return stats, nil
}
// 从Redis获取统计信息
statsKey := la.keyManager.GetStatsKey(leadername)
data, err := la.client.HGetAll(ctx, statsKey).Result()
if err != nil {
return nil, fmt.Errorf("获取统计数据失败: %w", err)
}
// 解析统计信息
stats = &LeaderboardStats{
MemberCount: 0,
TotalScore: 0,
AvgScore: 0,
HighScore: 0,
LowScore: 0,
UpdateCount: 0,
LastUpdate: time.Time{},
}
if totalCount, exists := data["member_count"]; exists {
stats.MemberCount, _ = strconv.ParseInt(totalCount.(string), 10, 64)
}
if totalScore, exists := data["total_score"]; exists {
stats.TotalScore, _ = strconv.ParseFloat64(totalScore.(string), 64)
}
if avgScore, exists := data["avg_score"]; exists {
stats.AvgScore, _ = strconv.ParseFloat64(avgScore.(string), 64)
}
if highScore, exists := data["high_score"]; exists {
stats.HighScore, _ = strconv.ParseFloat64(highScore.(string), 64)
}
if lowScore, exists := data["low_score"]; exists {
stats.LowScore, _ = strconv.ParseFloat64(lowScore.(string), 64)
}
if updateCount, exists := data["update_count"]; exists {
stats.UpdateCount, _ = strconv.ParseInt(updateCount.(string), 10, 64)
}
if lastUpdate, exists := data["last_update"]; exists {
lastUpdate, _ := time.Parse(time.RFC3339, lastUpdate.(string))
stats.LastUpdate = lastUpdate
}
// 缓存统计信息
la.cache.SetStats(leaderboardName, stats, 5*time.Minute)
return stats, nil
}
// 更新统计信息
func (la *LeaderboardAnalytics) UpdateStats(ctx context.Context, leaderboardName string) error {
keyManager := la.keyManager
leaderboardKey := keyManager.GetLeaderboardName(leaderboardName)
// 获取排行榜基本信息
size, err := la.client.ZCard(ctx, leaderboardKey).Result()
if err != nil {
return fmt.Errorf("获取排行榜大小失败: %w", err)
}
// 获取所有成员的分数
members, err := la.client.ZRangeWithScores(ctx, leaderboardKey, 0, -1).Result()
if err != nil {
return fmt.Errorf("获取排行榜成员失败: w", err)
}
var totalScore float64
if size > 0 {
for _, member := range members {
totalScore += member.Score
}
}
var highScore float64
var lowScore float64
if len(members) > 0 {
highScore = members[0].Score
lowScore = members[len(members)-1].Score
}
avgScore := totalScore / float64(size)
updateCount := la.getUpdateCount(ctx, leaderboardName)
// 保存统计信息
statsKey := keyManager.GetStatsKey(leaderboardName)
pipeline := la.client.Pipeline()
pipeline.HSet(ctx, statsKey, "member_count", strconv.FormatInt(int(size), 10))
pipeline.HSet(ctx, statsKey, "total_score", fmt.Sprintf("%.2f", totalScore))
pipeline.HSet(ctx, statsKey, "avg_score", fmt.Sprintf("%.2f", avgScore))
pipeline.HSet(ctx, statsKey, "high_score", fmt.Sprintf("%.2f", highScore))
pipeline.HSet(ctx, statsKey, "low_score", fmt.Sprintf("%.2f", lowScore))
pipeline.HSet(ctx, statsKey, "update_count", strconv.FormatInt(int(updateCount), 10))
pipeline.HSet(ctx, statsKey, "last_update", time.Now().Format(time.RFC3339))
pipeline.Expire(ctx, statsKey, 24*time.Hour)
_, err = pipeline.Exec(ctx)
if err != nil {
return fmt.Errorf("更新统计信息失败: %w", err)
}
// 清理缓存
la.cache.DeleteStats(leaderboardName)
return nil
}
// 获取更新次数
func (la *LeaderboardAnalytics) getUpdateCount(ctx context.Context, leaderboardName string) int64 {
statsKey := la.keyManager.GetStatsKey(leaderboardName)
count, err := la.client.HGet(ctx, statsKey, "update_count").Result()
if err != nil {
return 0
}
countInt, _ := strconv.ParseInt(count.(string), 10, 64)
return countInt
}
// 排行榜统计
type LeaderboardStats struct {
MemberCount int64 `json:"member_count"`
TotalScore float64 `json:"total_score"`
AvgScore float64 `json:"avg_score"`
HighScore float64 `json:"high_score"`
LowScore float64 `json:"low_score"`
UpdateCount int64 `json:"update_count"`
LastUpdate time.Time `json:"last_update"`
}
---
c.缓存优化
a.功能说明
实现排行榜数据缓存,提高查询性能,减少Redis访问次数。
b.缓存实现
---
// 排行榜缓存
type LeaderboardCache struct {
client redis.Cmdable
cacheTTL time.Duration
caches map[string]*CacheData
mu sync.RWMutex
}
// 缓存数据
type CacheData struct {
Data interface{}
ExpiresAt time.Time
}
// 创建排行榜缓存
func NewLeaderboardCache(client redis.Cmdable) *LeaderboardCache {
return &LeaderboardCache{
client: client,
cacheTTL: 5 * time.Minute,
caches: make(map[string]*CacheData),
}
}
// 获取排行榜缓存
func (lc *LeaderboardCache) GetTopMembers(leaderboardName string) ([]*LeaderboardEntry, bool) {
lc.mu.RLock()
defer lc.mu.RUnlock()
cacheKey := "top:" + leaderboardName
if cacheData, exists := lc.caches[cacheKey]; exists {
if time.Now().Before(cacheData.ExpiresAt) {
if entries, ok := cacheData.Data.([]*LeaderboardEntry); ok {
return entries, true
}
}
}
return nil, false
}
// 设置排行榜缓存
func (lc *LeaderCache) SetTopMembers(leaderboardName string, entries []*LeaderboardEntry) {
lc.mu.Lock()
defer lc.mu.Unlock()
cacheKey := "top:" + leaderboardName
lc.caches[cacheKey] = &CacheData{
Data: entries,
ExpiresAt: time.Now().Add(lc.cacheTTL),
}
// 设置过期时间
lc.client.Set(context.Background(), cacheKey, "cached", lc.cacheTTL)
}
// 删除排行榜缓存
func (lc *LeaderCache) DeleteTopMembers(leaderboardName string) {
lc.mu.Lock()
defer lc.mu.Unlock()
cacheKey := "top:" + leaderboardName
delete(lc.caches, cacheKey)
lc.client.Del(context.Background(), cacheKey)
}
// 获取统计缓存
func (lc *LeaderboardCache) GetStats(leaderboardName string) (*LeaderboardStats, bool) {
lc.mu.RLock()
defer lc.mu.RUnlock()
cacheKey := "stats:" + leaderboardName
if cacheData, exists := lc.caches[cacheKey]; exists {
if time.Now().Before(cacheData.ExpiresAt) {
if stats, ok := cacheData.Data.(*LeaderboardStats); ok {
return stats, true
}
}
}
return nil, false
}
// 设置统计缓存
func (lc *LeaderboardCache) SetStats(leaderboardName string, stats *LeaderboardStats) {
lc.mu.Lock()
defer lc.mu.Unlock()
cacheKey := "stats:" + leaderboardName
lc.caches[cacheKey] = &CacheData{
Data: stats,
ExpiresAt: time.Now().Add(lc.cacheTTL),
}
lc.client.Set(context.Background(), cacheKey, "cached", lc.cacheTTL)
}
// 删除统计缓存
func (lc *LeaderboardCache) DeleteStats(leaderboardName string) {
lc.mu.Lock()
defer lc.mu.Unlock()
cacheKey := "stats:" + leaderboardName
delete(lc.caches, cacheKey)
lc.client.Del(context.Background(), cacheKey)
}
// 清理过期缓存
func (lc *LeaderboardCache) CleanupExpired() {
lc.mu.Lock()
defer lc.mu.Unlock()
now := time.Now()
for key, cacheData := range lc.caches {
if now.After(cacheData.ExpiresAt) {
delete(lc.caches, key)
}
}
}
---
05.实际应用示例
a.游戏排行榜
a.功能说明
实现游戏中的积分排行榜、等级排行榜、成就排行榜等功能。
b.游戏排行榜实现
---
// 游戏排行榜管理器
type GameLeaderboardManager struct {
*LeaderboardManager
gameID string
}
// 创建游戏排行榜管理器
func NewGameLeaderboardManager(client redis.Cmdable, gameID string) *GameLeaderboardManager {
manager := NewLeaderboardManager(client)
return &GameLeaderboardManager{
LeaderboardManager: manager,
gameID: gameID,
}
}
// 更新玩家积分
func (glm *GameLeaderboardManager) UpdatePlayerScore(ctx context.Context, playerID, playerName string, score float64, level int, achievements []string) error {
leaderboardName := fmt.Sprintf("game:%s:score", glm.gameID)
// 构建元数据
metadata := map[string]interface{}{
"player_name": playerName,
"level": level,
"achievements": achievements,
"last_updated": time.Now().Unix(),
}
return glm.AddMember(ctx, leaderboardName, playerID, score, metadata)
}
// 获取玩家排名
func (glm *GameLeaderboardManager) GetPlayerRank(ctx context.Context, playerID string) (int64, error) {
leaderboardName := fmt.Sprintf("game:%s:score", glm.gameID)
return glm.GetRank(ctx, leaderboardName, playerID)
}
// 获取排行榜前N名
func (glm *GameLeaderboard) GetTopPlayers(ctx context.Context, limit int64) ([]*LeaderboardEntry, error) {
leaderboardName := fmt.Sprintf("game:%s:score", glm.gameID)
return glm.GetTopMembers(ctx, leaderboardName, limit)
}
// 获取指定等级的玩家
func (glm *GameLeaderboardManager) GetPlayersByLevel(ctx context.Context, minLevel, maxLevel int) ([]*LeaderboardEntry, error) {
return glm.GetMembersByScoreRange(ctx,
fmt.Sprintf("game:%s:level", glm.gameID), float64(minLevel), float64(maxLevel))
}
// 等级排行榜
func (glm *GameLeaderboardManager) UpdateLevelLeaderboard(ctx context.Context) error {
leaderboardName := fmt.Sprintf("game:%s:level", glm.gameID)
// 获取所有在线玩家
onlinePlayers := glm.getOnlinePlayers(ctx)
for _, player := range onlinePlayers {
// 更新到等级排行榜
err := glm.UpdatePlayerScore(ctx, player.PlayerID, player.Name, float64(player.Level), player.Level, player.Achievements)
if err != nil {
glm.logger.Printf("更新玩家 %s 等级排行榜失败: %v", player.PlayerID, err)
}
}
return nil
}
// 在线玩家
type OnlinePlayer struct {
PlayerID string `json:"player_id"`
Name string `json:"name"`
Level int `json:"level"`
Score float64 `json:"score"`
Achievements []string `json:"achievements"`
}
// 获取在线玩家
func (glm *GameLeaderboardManager) getOnlinePlayers(ctx context.Context) []*OnlinePlayer {
// 这里应该从游戏服务器获取在线玩家列表
// 暂时返回模拟数据
return []*OnlinePlayer{
{PlayerID: "player1", Name: "Alice", Level: 25, Score: 1250.5},
{PlayerID: "player2", Name: "Bob", Level: 30, Score: 3200.0},
{PlayerID: "player3", Name: "Charlie", Level: 22, Score: 980.3},
}
}
---
b.电商排行榜
a.功能说明
实现商品销量排行榜、热销产品排行榜等功能,支持实时更新和多维度排序。
b.电商排行榜实现
---
// 电商排行榜管理器
class EcommerceLeaderboardManager struct {
*LeaderboardManager
categoryID string
}
// 创建电商排行榜管理器
func NewEcommerceLeaderboardManager(client redis.Cmdable, categoryID string) *EcommerceLeaderboardManager {
manager := NewLeaderboardManager(client)
return &EcommerceLeaderboardManager{
LeaderboardManager: manager,
categoryID: categoryID,
}
}
// 更新商品销量
func (elm *EcommerceLeaderboardManager) UpdateProductSales(ctx context.Context, productID, productName string, salesCount int64, price float64, category string, tags []string) error {
leaderboardName := fmt.Sprintf("ecommerce:%s:%s:score", elm.categoryID, category)
// 销售量作为分数
score := float64(salesCount)
// 价格权重因子(可根据业务需求调整)
priceFactor := 0.1
weightedScore := score + price*priceFactor
// 构建元数据
metadata := map[string]interface{}{
"product_name": productName,
"price": price,
"category": category,
"tags": tags,
"sales_count": salesCount,
"last_updated": time.Now().Unix(),
"product_id": productID,
}
return elm.AddMember(ctx, leaderboardName, productID, weightedScore, metadata)
}
// 获取商品排名
func (elm *EcommerceLeaderboardManager) GetProductRank(ctx context.Context, productID string) (int64, error) {
leaderboardName := fmt.Sprintf("ecommerce:%s:%s:score", elm.categoryID, category)
return elm.GetRank(ctx, leaderboardName, productID)
}
// 获取热销商品
func (elm *EcommerceLeaderboardManager) GetTopProducts(ctx context.Context, limit int64) ([]*LeaderboardEntry, error) {
leaderboardName := fmt.Sprintf("ecommerce:%s:score", elm.categoryID)
return elm.GetTopMembers(ctx, leaderboardName, limit)
}
// 分类排行榜
func (elm *EcommerceLeaderboard) UpdateCategoryLeaderboard(ctx contextContext, category string, products []Product) error {
leaderboardName := fmt.Sprintf("ecommerce:%s:%s:score", elm.categoryID, category)
// 批量更新商品销量
for _, product := range products {
err := elm.UpdateProductSales(ctx, product.ID, product.Name, product.SalesCount, product.Price, product.Category, product.Tags)
if err != nil {
return fmt.Errorf("更新商品 %s 销量失败: %v", product.ID, err)
}
}
return nil
}
// 商品信息
type Product struct {
ID string `json:"id"`
Name string `json:"name"`
Description string `json:"description"`
Price float64 `json:"price"`
Category string `json:"category"`
Tags []string `json:"tags"`
SalesCount int64 `json:"sales_count"`
ImageURL string `json:"image_url"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}
---
c.社交平台排行榜
a.功能说明
实现用户活跃度排行榜、内容热度排行榜、用户积分排行榜等社交功能。
b.社交排行榜实现
---
// 社交排行榜管理器
class SocialLeaderboardManager struct {
*LeaderboardManager
platformType string
}
// 创建社交排行榜管理器
func NewSocialLeaderboardManager(client redis.Cmdable, platformType string) *SocialLeaderboardManager {
manager := NewLeaderboardManager(client)
return &SocialLeaderboardManager{
LeaderboardManager: manager,
platformType: platformType,
}
}
// 更新用户活跃度
func (slm *SocialLeaderboardManager) UpdateUserActivity(ctx context.Context, userID, username string, activityScore float64, metadata map[string]interface{}) error {
leaderboardName := fmt.Sprintf("social:%s:activity", slm.platformType)
// 基础分数
baseScore := activityScore
// 在线时长权重
onlineTimeWeight := 0.5
if onlineTime, exists := metadata["online_time"]; exists {
if onlineTimeFloat, ok := onlineTime.(float64); ok {
baseScore += onlineTimeFloat * onlineTimeWeight
}
}
// 互动权重
interactionWeight := 2.0
if interactions, exists := metadata["interactions"]; exists {
if interactionsFloat, ok := interactions.(float64); ok {
baseScore += interactionsFloat * interactionWeight
}
}
// 内容创建权重
contentWeight := 1.0
if contentCount, exists := metadata["content_count"]; exists {
if contentCountFloat, ok := contentCount.(float64); ok {
baseScore += contentCountFloat * contentWeight
}
}
// 更新活跃度排行榜
return slm.AddMember(ctx, leaderboardName, userID, baseScore, metadata)
}
// 获取用户活跃度排名
func (slm *SocialLeaderboardManager) GetUserActivityRank(ctx context.Context, userID string) (int64, error) {
leaderboardName := fmt.Sprintf("social:%s:activity", slm.platformType)
return slm.GetRank(ctx, leaderboardName, userID)
}
// 更新内容热度
func (slm *SocialLeaderboardManager) UpdateContentPopularity(ctx context.Context, contentID string, contentType string, popularityScore float64, metadata map[string]interface{}) error {
leaderboardName := fmt.Sprintf("social:%s:content:%s", slm.platformType, contentType)
// 基础分数
baseScore := popularityScore
// 点赞权重
likeWeight := 5.0
if likes, exists := metadata["likes"]; exists {
if likesFloat, ok := likes.(float64); ok {
baseScore += likesFloat * likeWeight
}
}
// 评论权重
commentWeight := 3.0
if comments, exists := metadata["comments"]; exists {
if commentsFloat, ok := comments.(float64); ok {
baseScore += commentsFloat * commentWeight
}
}
// 更新内容热度排行榜
return slm.AddMember(ctx, leaderboardName, contentID, baseScore, metadata)
}
// 获取内容热度排名
func (slm *SocialLeaderboardManager) GetContentPopularityRank(ctx context.Context, contentID string, contentType string) (int64, error) {
leaderboardName := fmt.Sprintf("social:%s:content:%s", slm.platformType, contentType)
return slm.GetRank(ctx, leaderboardName, contentID)
}
// 获取热门用户
func (slm *SocialLeaderboardManager) GetTopActiveUsers(ctx context.Context, limit int64) ([]*LeaderboardEntry, error) {
leaderboardName := fmt.Sprintf("social:%s:activity", slm.platformType)
return slm.GetTopMembers(ctx, leaderboardName, limit)
}
// 获取热门内容
func (slm *SocialLeaderManager) GetTrendingContent(ctx context.Context, contentType string, limit int64) ([]*LeaderboardEntry, error) {
leaderboardName := fmt.Sprintf("social:%s:content:%s", slm.platformType, contentType)
return slm.GetTopMembers(ctx, leaderboardName, limit)
}
---
7.6 常见问题
01.连接问题
a.连接超时
a.问题现象
客户端连接Redis服务器时出现超时错误,无法建立连接或操作执行超时。
b.可能原因
网络延迟过高或网络不稳定
Redis服务器负载过高,响应缓慢
连接池配置不当,连接数不足
客户端超时设置过短
防火墙或网络设备限制
c.解决方案
---
// 连接超时问题诊断和解决
type ConnectionDiagnostics struct {
client redis.Cmdable
config *RedisConfig
logger *log.Logger
}
// 连接诊断工具
func (cd *ConnectionDiagnostics) DiagnoseConnectionIssues() *ConnectionDiagnosticResult {
result := &ConnectionDiagnosticResult{
Timestamp: time.Now(),
Checks: make([]DiagnosticCheck, 0),
}
// 1. 网络连通性检查
cd.checkNetworkConnectivity(result)
// 2. Redis服务状态检查
cd.checkRedisServiceStatus(result)
// 3. 连接池配置检查
cd.checkConnectionPoolConfig(result)
// 4. 性能基准测试
cd.performPerformanceBenchmark(result)
// 5. 资源使用情况检查
cd.checkResourceUsage(result)
return result
}
// 网络连通性检查
func (cd *ConnectionDiagnostics) checkNetworkConnectivity(result *ConnectionDiagnosticResult) {
check := DiagnosticCheck{
Name: "网络连通性",
Status: "PASS",
Message: "",
Suggestions: []string{},
}
// TCP连接测试
conn, err := net.DialTimeout("tcp",
fmt.Sprintf("%s:%d", cd.config.Host, cd.config.Port),
5*time.Second)
if err != nil {
check.Status = "FAIL"
check.Message = fmt.Sprintf("TCP连接失败: %v", err)
check.Suggestions = append(check.Suggestions,
"检查Redis服务器是否正常运行",
"验证主机名和端口配置",
"检查防火墙设置",
"确认网络连通性")
} else {
conn.Close()
check.Message = "TCP连接成功"
}
// 延迟测试
if check.Status == "PASS" {
start := time.Now()
conn, err := net.DialTimeout("tcp",
fmt.Sprintf("%s:%d", cd.config.Host, cd.config.Port),
cd.config.DialTimeout)
if err != nil {
check.Status = "WARN"
check.Message += fmt.Sprintf(", 但延迟测试失败: %v", err)
} else {
latency := time.Since(start)
conn.Close()
if latency > 100*time.Millisecond {
check.Status = "WARN"
check.Message += fmt.Sprintf(", 延迟较高: %v", latency)
check.Suggestions = append(check.Suggestions,
"考虑优化网络配置",
"检查网络负载情况",
"考虑部署Redis到更近的位置")
} else {
check.Message += fmt.Sprintf(", 延迟正常: %v", latency)
}
}
}
result.Checks = append(result.Checks, check)
}
// Redis服务状态检查
func (cd *ConnectionDiagnostics) checkRedisServiceStatus(result *ConnectionDiagnosticResult) {
check := DiagnosticCheck{
Name: "Redis服务状态",
Status: "PASS",
Message: "",
Suggestions: []string{},
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
// Ping测试
pong, err := cd.client.Ping(ctx).Result()
if err != nil {
check.Status = "FAIL"
check.Message = fmt.Sprintf("Redis PING失败: %v", err)
check.Suggestions = append(check.Suggestions,
"检查Redis服务是否启动",
"验证Redis配置文件",
"检查Redis日志文件",
"确认Redis监听端口")
} else {
check.Message = fmt.Sprintf("Redis PING成功: %s", pong)
// 获取Redis信息
info, err := cd.client.Info(ctx).Result()
if err != nil {
check.Status = "WARN"
check.Message += ", 但获取INFO失败"
} else {
check.Message += ", INFO命令执行成功"
// 解析Redis版本和负载信息
if strings.Contains(info, "redis_version:") {
lines := strings.Split(info, "\r\n")
for _, line := range lines {
if strings.HasPrefix(line, "redis_version:") {
version := strings.TrimPrefix(line, "redis_version:")
check.Message += fmt.Sprintf(", 版本: %s", version)
break
}
}
}
}
}
result.Checks = append(result.Checks, check)
}
// 连接池配置检查
func (cd *ConnectionDiagnostics) checkConnectionPoolConfig(result *ConnectionDiagnosticResult) {
check := DiagnosticCheck{
Name: "连接池配置",
Status: "PASS",
Message: "",
Suggestions: []string{},
}
// 检查连接池大小配置
if cd.config.PoolSize <= 0 {
check.Status = "FAIL"
check.Message = "连接池大小配置无效"
check.Suggestions = append(check.Suggestions,
"设置合理的连接池大小",
"建议根据并发量和服务器性能调整")
} else if cd.config.PoolSize < 10 {
check.Status = "WARN"
check.Message = fmt.Sprintf("连接池大小较小: %d", cd.config.PoolSize)
check.Suggestions = append(check.Suggestions,
"考虑增加连接池大小以提高并发性能",
"监控连接池使用情况")
} else {
check.Message = fmt.Sprintf("连接池大小配置合理: %d", cd.config.PoolSize)
}
// 检查超时配置
if cd.config.DialTimeout <= 0 {
if check.Status == "PASS" {
check.Status = "WARN"
}
check.Message += ", 拨号超时未设置"
check.Suggestions = append(check.Suggestions,
"设置合理的拨号超时时间",
"建议设置为5-10秒")
} else if cd.config.DialTimeout > 30*time.Second {
if check.Status == "PASS" {
check.Status = "WARN"
}
check.Message += fmt.Sprintf(", 拨号超时较长: %v", cd.config.DialTimeout)
} else {
check.Message += fmt.Sprintf(", 拨号超时合理: %v", cd.config.DialTimeout)
}
// 检查读写超时
if cd.config.ReadTimeout <= 0 || cd.config.WriteTimeout <= 0 {
if check.Status == "PASS" {
check.Status = "WARN"
}
check.Message += ", 读写超时未设置"
check.Suggestions = append(check.Suggestions,
"设置合理的读写超时时间",
"建议根据业务需求设置,通常3-10秒")
}
result.Checks = append(result.Checks, check)
}
// 性能基准测试
func (cd *ConnectionDiagnostics) performPerformanceBenchmark(result *ConnectionDiagnosticResult) {
check := DiagnosticCheck{
Name: "性能基准测试",
Status: "PASS",
Message: "",
Suggestions: []string{},
}
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
// 执行基准测试
testCount := 100
var totalLatency time.Duration
successCount := 0
for i := 0; i < testCount; i++ {
start := time.Now()
_, err := cd.client.Ping(ctx).Result()
latency := time.Since(start)
if err != nil {
continue
}
totalLatency += latency
successCount++
}
if successCount == 0 {
check.Status = "FAIL"
check.Message = "所有基准测试都失败了"
check.Suggestions = append(check.Suggestions,
"检查网络连接稳定性",
"确认Redis服务状态",
"检查客户端配置")
} else {
avgLatency := totalLatency / time.Duration(successCount)
successRate := float64(successCount) / float64(testCount) * 100
check.Message = fmt.Sprintf("成功率: %.1f", usagePercent),
Recommendations: []string{
"立即执行内存清理",
"检查大键并优化",
"考虑删除过期数据",
"准备扩容方案",
},
}
mm.sendAlert(alert)
} else if usagePercent >= mm.alertThreshold {
alert := MemoryAlert{
Level: "WARNING",
Usage: usagePercent,
UsedMemory: memoryInfo.UsedMemory,
MaxMemory: memoryInfo.MaxMemory,
Timestamp: time.Now(),
Message: fmt.Sprintf("内存使用率较高: %.2f%%", usagePercent),
Recommendations: []string{
"监控内存使用趋势",
"检查数据增长情况",
"优化数据结构",
"清理无用数据",
},
}
mm.sendAlert(alert)
}
}
// 内存信息
type MemoryInfo struct {
UsedMemory int64
MaxMemory int64
UsedMemoryRss int64
UsedMemoryPeak int64
Fragmentation float64
}
// 解析内存信息
func (mm *MemoryMonitor) parseMemoryInfo(info string) MemoryInfo {
memoryInfo := MemoryInfo{}
lines := strings.Split(info, "\r\n")
for _, line := range lines {
if strings.HasPrefix(line, "used_memory:") {
memoryInfo.UsedMemory = parseMemoryValue(line)
} else if strings.HasPrefix(line, "maxmemory:") {
memoryInfo.MaxMemory = parseMemoryValue(line)
} else if strings.HasPrefix(line, "used_memory_rss:") {
memoryInfo.UsedMemoryRss = parseMemoryValue(line)
} else if strings.HasPrefix(line, "used_memory_peak:") {
memoryInfo.UsedMemoryPeak = parseMemoryValue(line)
}
}
if memoryInfo.UsedMemory > 0 {
memoryInfo.Fragmentation = float64(memoryInfo.UsedMemoryRss) / float64(memoryInfo.UsedMemory)
}
return memoryInfo
}
// 发送告警
func (mm *MemoryMonitor) sendAlert(alert MemoryAlert) {
select {
case mm.alertChannel <- alert:
default:
// 通道满了,丢弃告警
}
}
// 获取告警通道
func (mm *MemoryMonitor) AlertChannel() <-chan MemoryAlert {
return mm.alertChannel
}
// 停止监控
func (mm *MemoryMonitor) Stop() {
close(mm.stopChan)
}
---
d.内存优化策略
i.设置maxmemory和maxmemory-policy
ii.定期清理过期键
iii.优化数据结构设计
iv.使用Redis压缩
v.分片和集群部署
vi.监控内存使用趋势
04.数据一致性问题
a.缓存穿透
a.问题现象
大量请求查询不存在的数据,绕过缓存直接访问数据库。
b.缓存穿透防护
---
// 缓存穿透防护器
type CachePenetrationProtector struct {
client redis.Cmdable
nullCache *NullCache
bloomFilter *BloomFilter
logger *log.Logger
}
// 空值缓存
type NullCache struct {
client redis.Cmdable
keyPrefix string
ttl time.Duration
}
// 布隆过滤器
type BloomFilter struct {
bitSet []bool
size uint
hashCount uint
client redis.Cmdable
key string
}
// 创建缓存穿透防护器
func NewCachePenetrationProtector(client redis.Cmdable) *CachePenetrationProtector {
return &CachePenetrationProtector{
client: client,
nullCache: NewNullCache(client, "null_cache:", 5*time.Minute),
bloomFilter: NewBloomFilter(client, "cache_filter", 1000000, 4),
logger: log.Default(),
}
}
// 安全查询(带缓存穿透防护)
func (cpp *CachePenetrationProtector) SafeQuery(ctx context.Context, key string) (interface{}, bool, error) {
// 1. 检查布隆过滤器
if !cpp.bloomFilter.MightContain(key) {
// 键一定不存在
return nil, false, nil
}
// 2. 检查空值缓存
if nullValue, exists := cpp.nullCache.Get(ctx, key); exists {
// 返回缓存的空值
return nil, false, nil
}
// 3. 正常查询缓存
value, err := cpp.client.Get(ctx, key).Result()
if err != nil {
if err == redis.Nil {
// 缓存中没有,设置空值缓存
cpp.nullCache.Set(ctx, key, "")
return nil, false, nil
}
return nil, false, err
}
return value, true, nil
}
// 添加键到布隆过滤器
func (cpp *CachePenetrationProtector) AddKey(ctx context.Context, key string) error {
return cpp.bloomFilter.Add(ctx, key)
}
// 空值缓存实现
func NewNullCache(client redis.Cmdable, keyPrefix string, ttl time.Duration) *NullCache {
return &NullCache{
client: client,
keyPrefix: keyPrefix,
ttl: ttl,
}
}
func (nc *NullCache) Set(ctx context.Context, key string, value string) error {
nullKey := nc.keyPrefix + key
return nc.client.Set(ctx, nullKey, value, nc.ttl).Err()
}
func (nc *NullCache) Get(ctx context.Context, key string) (string, bool) {
nullKey := nc.keyPrefix + key
value, err := nc.client.Get(ctx, nullKey).Result()
if err == redis.Nil {
return "", false
}
return value, true
}
// 布隆过滤器实现
func NewBloomFilter(client redis.Cmdable, key string, size uint, hashCount uint) *BloomFilter {
return &BloomFilter{
bitSet: make([]bool, size),
size: size,
hashCount: hashCount,
client: client,
key: key,
}
}
func (bf *BloomFilter) Add(ctx context.Context, item string) error {
hashes := bf.hash(item)
for _, hash := range hashes {
bitIndex := hash % uint32(bf.size)
bf.client.SetBit(ctx, bf.key, int64(bitIndex), 1)
}
return nil
}
func (bf *BloomFilter) MightContain(item string) bool {
ctx := context.Background()
hashes := bf.hash(item)
for _, hash := range hashes {
bitIndex := hash % uint32(bf.size)
bit, err := bf.client.GetBit(ctx, bf.key, int64(bitIndex)).Result()
if err != nil || bit == 0 {
return false
}
}
return true
}
func (bf *BloomFilter) hash(item string) []uint32 {
// 简化的哈希实现,实际应用中应使用更强的哈希算法
var hashes []uint32
hash1 := uint32(0)
hash2 := uint32(0)
for _, c := range item {
hash1 = hash1*31 + uint32(c)
hash2 = hash2*37 + uint32(c)
}
for i := uint(0); i < bf.hashCount; i++ {
combinedHash := hash1 + uint32(i)*hash2
hashes = append(hashes, combinedHash)
}
return hashes
}
---
b.缓存雪崩
a.问题现象
大量缓存同时失效,导致大量请求同时访问数据库。
b.缓存雪崩防护
---
// 缓存雪崩防护器
type CacheAvalancheProtector struct {
client redis.Cmdable
randomJitter time.Duration
staggeredTTL time.Duration
preWarm bool
preWarmKeys []string
logger *log.Logger
}
// 创建缓存雪崩防护器
func NewCacheAvalancheProtector(client redis.Cmdable) *CacheAvalancheProtector {
return &CacheAvalancheProtector{
client: client,
randomJitter: 5 * time.Minute,
staggeredTTL: 10 * time.Minute,
preWarm: true,
logger: log.Default(),
}
}
// 安全设置缓存(添加随机过期时间)
func (cap *CacheAvalancheProtector) SafeSet(ctx context.Context, key string, value interface{}, baseTTL time.Duration) error {
// 添加随机抖动
jitter := time.Duration(rand.Int63n(int64(cap.randomJitter)))
finalTTL := baseTTL + jitter
return cap.client.Set(ctx, key, value, finalTTL).Err()
}
// 批量设置缓存(错开过期时间)
func (cap *CacheAvalancheProtector) BatchSafeSet(ctx context.Context, items map[string]interface{}, baseTTL time.Duration) error {
pipe := cap.client.Pipeline()
i := 0
for key, value := range items {
// 为每个键添加不同的延迟过期
staggerDelay := time.Duration(i) * cap.staggeredTTL / time.Duration(len(items))
jitter := time.Duration(rand.Int63n(int64(cap.randomJitter)))
finalTTL := baseTTL + staggerDelay + jitter
pipe.Set(ctx, key, value, finalTTL)
i++
}
_, err := pipe.Exec(ctx)
return err
}
// 缓存预热
func (cap *CacheAvalancheProtector) PreWarmCache(ctx context.Context) error {
if !cap.preWarm || len(cap.preWarmKeys) == 0 {
return nil
}
cap.logger.Printf("开始缓存预热,键数量: %d", len(cap.preWarmKeys))
for _, key := range cap.preWarmKeys {
// 这里应该从数据源加载并设置到缓存
// 具体实现取决于业务逻辑
if err := cap.loadAndSetCache(ctx, key); err != nil {
cap.logger.Printf("预热键 %s 失败: %v", key, err)
}
}
cap.logger.Printf("缓存预热完成")
return nil
}
// 加载并设置缓存(示例实现)
func (cap *CacheAvalancheProtector) loadAndSetCache(ctx context.Context, key string) error {
// 模拟从数据库加载数据
value := fmt.Sprintf("data_for_%s", key)
ttl := 1 * time.Hour
return cap.SafeSet(ctx, key, value, ttl)
}
// 设置预热键列表
func (cap *CacheAvalancheProtector) SetPreWarmKeys(keys []string) {
cap.preWarmKeys = keys
}
// 检查缓存健康状态
func (cap *CacheAvalancheProtector) CheckCacheHealth(ctx context.Context) (*CacheHealthStatus, error) {
status := &CacheHealthStatus{
Timestamp: time.Now(),
KeyCount: 0,
HitRate: 0,
MemoryUsage: 0,
Status: "HEALTHY",
}
// 获取基本信息
info, err := cap.client.Info(ctx, "keyspace", "memory").Result()
if err != nil {
status.Status = "ERROR"
status.Message = fmt.Sprintf("获取Redis信息失败: %v", err)
return status, err
}
// 解析信息
cap.parseRedisInfo(info, status)
// 健康检查
if status.HitRate < 50 {
status.Status = "WARNING"
status.Message = "缓存命中率较低"
}
if status.MemoryUsage > 80 {
status.Status = "WARNING"
if status.Message != "" {
status.Message += "; "
}
status.Message += "内存使用率较高"
}
return status, nil
}
// 缓存健康状态
type CacheHealthStatus struct {
Timestamp time.Time `json:"timestamp"`
KeyCount int64 `json:"key_count"`
HitRate float64 `json:"hit_rate"`
MemoryUsage float64 `json:"memory_usage"`
Status string `json:"status"`
Message string `json:"message"`
}
// 解析Redis信息
func (cap *CacheAvalancheProtector) parseRedisInfo(info string, status *CacheHealthStatus) {
lines := strings.Split(info, "\r\n")
for _, line := range lines {
if strings.Contains(line, "keys=") {
if keys, err := parseNumericValue(line); err == nil {
status.KeyCount = keys
}
}
}
}
---
c.缓存击穿
a.问题现象
热点key过期瞬间,大量请求同时访问数据库。
b.缓存击穿防护
---
// 缓存击穿防护器
type CacheBreakdownProtector struct {
client redis.Cmdable
lockTimeout time.Duration
retryTimes int
logger *log.Logger
}
// 创建缓存击穿防护器
func NewCacheBreakdownProtector(client redis.Cmdable) *CacheBreakdownProtector {
return &CacheBreakdownProtector{
client: client,
lockTimeout: 10 * time.Second,
retryTimes: 3,
logger: log.Default(),
}
}
// 安全获取缓存(防止击穿)
func (cbp *CacheBreakdownProtector) SafeGet(ctx context.Context, key string, loader func() (interface{}, error)) (interface{}, error) {
// 1. 先尝试从缓存获取
value, err := cbp.client.Get(ctx, key).Result()
if err == nil {
return value, nil
}
if err != redis.Nil {
return nil, fmt.Errorf("缓存查询失败: %w", err)
}
// 2. 缓存中没有,尝试获取分布式锁
lockKey := fmt.Sprintf("lock:%s", key)
lockValue := fmt.Sprintf("%d", time.Now().UnixNano())
// 尝试获取锁
locked, err := cbp.client.SetNX(ctx, lockKey, lockValue, cbp.lockTimeout).Result()
if err != nil {
return nil, fmt.Errorf("获取锁失败: %w", err)
}
if locked {
// 3. 获得锁,负责加载数据
defer cbp.releaseLock(ctx, lockKey, lockValue)
return cbp.loadAndCache(ctx, key, loader)
} else {
// 4. 没有获得锁,等待并重试
return cbp.waitAndRetry(ctx, key, loader)
}
}
// 加载数据并缓存
func (cbp *CacheBreakdownProtector) loadAndCache(ctx context.Context, key string, loader func() (interface{}, error)) (interface{}, error) {
// 双重检查:可能在等待期间其他线程已经加载了数据
value, err := cbp.client.Get(ctx, key).Result()
if err == nil {
return value, nil
}
// 从数据源加载
start := time.Now()
data, err := loader()
loadTime := time.Since(start)
if err != nil {
return nil, fmt.Errorf("数据加载失败: %w", err)
}
// 设置缓存
cacheTTL := cbp.calculateCacheTTL(loadTime)
if err := cbp.client.Set(ctx, key, data, cacheTTL).Err(); err != nil {
cbp.logger.Printf("设置缓存失败: %v", err)
}
cbp.logger.Printf("加载并缓存数据 %s,耗时: %v, TTL: %v", key, loadTime, cacheTTL)
return data, nil
}
// 等待并重试
func (cbp *CacheBreakdownProtector) waitAndRetry(ctx context.Context, key string, loader func() (interface{}, error)) (interface{}, error) {
for i := 0; i < cbp.retryTimes; i++ {
// 等待一段时间
waitTime := time.Duration(50*(i+1)) * time.Millisecond
time.Sleep(waitTime)
// 再次尝试从缓存获取
value, err := cbp.client.Get(ctx, key).Result()
if err == nil {
return value, nil
}
if err != redis.Nil {
cbp.logger.Printf("缓存查询失败: %v", err)
}
}
// 重试次数用完,尝试直接加载(作为降级策略)
cbp.logger.Printf("重试次数用完,尝试直接加载数据: %s", key)
return cbp.loadAndCache(ctx, key, loader)
}
// 释放锁
func (cbp *CacheBreakdownProtector) releaseLock(ctx context.Context, lockKey, lockValue string) {
// 使用Lua脚本确保原子性释放锁
script := `
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
`
cbp.client.Eval(ctx, script, []string{lockKey}, lockValue)
}
// 计算缓存TTL
func (cbp *CacheBreakdownProtector) calculateCacheTTL(loadTime time.Duration) time.Duration {
// 基于加载时间计算合理的TTL
baseTTL := 5 * time.Minute
if loadTime > 1*time.Second {
baseTTL = 30 * time.Minute
} else if loadTime > 100*time.Millisecond {
baseTTL = 15 * time.Minute
}
// 添加随机抖动
jitter := time.Duration(rand.Int63n(int64(baseTTL) / 10))
return baseTTL + jitter
}
---
05.集群和分布式问题
a.数据分片问题
a.问题现象
数据分布不均,某些节点负载过高,出现热点问题。
b.分片优化策略
---
// 数据分片管理器
type ShardManager struct {
client redis.Cmdable
shardCount int
hashRing *HashRing
loadBalancer *LoadBalancer
metrics *ShardMetrics
}
// 哈希环
type HashRing struct {
ring map[uint32]string
sortedKeys []uint32
replicas int
}
// 负载均衡器
type LoadBalancer struct {
nodeWeights map[string]int
currentLoad map[string]int64
}
// 分片指标
type ShardMetrics struct {
ShardLoads map[string]int64
HotKeys map[string]int64
Distribution map[string]float64
RebalanceCount int64
}
// 创建分片管理器
func NewShardManager(client redis.Cmdable, shardCount int) *ShardManager {
return &ShardManager{
client: client,
shardCount: shardCount,
hashRing: NewHashRing(shardCount, 150),
loadBalancer: &LoadBalancer{
nodeWeights: make(map[string]int),
currentLoad: make(map[string]int64),
},
metrics: &ShardMetrics{
ShardLoads: make(map[string]int64),
HotKeys: make(map[string]int64),
Distribution: make(map[string]float64),
},
}
}
// 添加节点到哈希环
func (sm *ShardManager) AddNode(nodeID string, weight int) {
sm.hashRing.AddNode(nodeID)
sm.loadBalancer.nodeWeights[nodeID] = weight
sm.loadBalancer.currentLoad[nodeID] = 0
}
// 获取数据所在分片
func (sm *ShardManager) GetShard(key string) string {
shard := sm.hashRing.GetNode(key)
// 更新负载统计
sm.loadBalancer.currentLoad[shard]++
return shard
}
// 智能分片策略(基于热点检测)
func (sm *ShardManager) IntelligentSharding(key string) string {
// 检查是否为热点键
if sm.isHotKey(key) {
// 热点键使用特殊分片策略
return sm.getHotKeyShard(key)
}
// 普通键使用一致性哈希
return sm.hashRing.GetNode(key)
}
// 检查是否为热点键
func (sm *ShardManager) isHotKey(key string) bool {
load, exists := sm.metrics.HotKeys[key]
if !exists {
return false
}
// 如果访问频率超过阈值,认为是热点
return load > 1000
}
// 获取热点键的分片
func (sm *ShardManager) getHotKeyShard(key string) string {
// 为热点键选择负载最低的分片
return sm.loadBalancer.GetLeastLoadedNode()
}
// 重新平衡分片
func (sm *ShardManager) Rebalance() error {
// 收集当前分片状态
shardStats := sm.collectShardStats()
// 计算理想分布
idealDistribution := sm.calculateIdealDistribution()
// 执行数据迁移
return sm.performRebalance(shardStats, idealDistribution)
}
// 收集分片统计信息
func (sm *ShardManager) collectShardStats() map[string]*ShardStats {
stats := make(map[string]*ShardStats)
for nodeID := range sm.loadBalancer.nodeWeights {
stat := &ShardStats{
NodeID: nodeID,
KeyCount: 0,
Memory: 0,
Load: sm.loadBalancer.currentLoad[nodeID],
}
// 实际实现中应该从Redis节点获取统计信息
// 这里简化处理
stats[nodeID] = stat
}
return stats
}
// 理想分布
type ShardStats struct {
NodeID string
KeyCount int64
Memory int64
Load int64
}
// 计算理想分布
func (sm *ShardManager) calculateIdealDistribution() map[string]int64 {
totalWeight := 0
for _, weight := range sm.loadBalancer.nodeWeights {
totalWeight += weight
}
ideal := make(map[string]int64)
for nodeID, weight := range sm.loadBalancer.nodeWeights {
ideal[nodeID] = int64(float64(weight) / float64(totalWeight) * 1000)
}
return ideal
}
// 执行重新平衡
func (sm *ShardManager) performRebalance(current map[string]*ShardStats, ideal map[string]int64) error {
// 简化的重新平衡逻辑
// 实际实现需要考虑数据迁移的复杂性和一致性
for nodeID, idealLoad := range ideal {
currentStat := current[nodeID]
if currentStat.Load > idealLoad*2 {
// 节点负载过高,需要迁移数据
sm.logger.Printf("节点 %s 负载过高,需要迁移数据", nodeID)
}
}
sm.metrics.RebalanceCount++
return nil
}
// 哈希环实现
func NewHashRing(nodeCount, replicas int) *HashRing {
return &HashRing{
ring: make(map[uint32]string),
sortedKeys: make([]uint32, 0),
replicas: replicas,
}
}
func (hr *HashRing) AddNode(nodeID string) {
for i := 0; i < hr.replicas; i++ {
hash := hr.hash(fmt.Sprintf("%s:%d", nodeID, i))
hr.ring[hash] = nodeID
hr.sortedKeys = append(hr.sortedKeys, hash)
}
sort.Slice(hr.sortedKeys, func(i, j int) bool {
return hr.sortedKeys[i] < hr.sortedKeys[j]
})
}
func (hr *HashRing) GetNode(key string) string {
if len(hr.ring) == 0 {
return ""
}
hash := hr.hash(key)
// 找到第一个大于等于hash值的节点
idx := sort.Search(len(hr.sortedKeys), func(i int) bool {
return hr.sortedKeys[i] >= hash
})
if idx == len(hr.sortedKeys) {
idx = 0
}
return hr.ring[hr.sortedKeys[idx]]
}
func (hr *HashRing) hash(key string) uint32 {
hash := fnv.New32a()
hash.Write([]byte(key))
return hash.Sum32()
}
// 负载均衡器实现
func (lb *LoadBalancer) GetLeastLoadedNode() string {
var leastLoaded string
var minLoad int64 = math.MaxInt64
for nodeID, load := range lb.currentLoad {
if load < minLoad {
minLoad = load
leastLoaded = nodeID
}
}
return leastLoaded
}
---
b.集群脑裂问题
a.问题现象
集群节点之间网络分区,导致数据不一致。
b.脑裂预防和处理
---
// 脑裂检测和防护
type SplitBrainDetector struct {
client redis.Cmdable
nodeID string
quorumSize int
heartbeatInterval time.Duration
lastHeartbeat time.Time
isActive bool
logger *log.Logger
}
// 创建脑裂检测器
func NewSplitBrainDetector(client redis.Cmdable, nodeID string, quorumSize int) *SplitBrainDetector {
return &SplitBrainDetector{
client: client,
nodeID: nodeID,
quorumSize: quorumSize,
heartbeatInterval: 5 * time.Second,
lastHeartbeat: time.Now(),
isActive: false,
logger: log.Default(),
}
}
// 启动脑裂检测
func (sbd *SplitBrainDetector) Start() {
ticker := time.NewTicker(sbd.heartbeatInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
sbd.sendHeartbeat()
sbd.checkClusterHealth()
}
}
}
// 发送心跳
func (sbd *SplitBrainDetector) sendHeartbeat() {
ctx := context.Background()
heartbeatKey := fmt.Sprintf("cluster:heartbeat:%s", sbd.nodeID)
heartbeat := Heartbeat{
NodeID: sbd.nodeID,
Timestamp: time.Now(),
Status: "active",
}
data, _ := json.Marshal(heartbeat)
// 设置心跳信息,过期时间为心跳间隔的3倍
ttl := sbd.heartbeatInterval * 3
sbd.client.Set(ctx, heartbeatKey, data, ttl)
sbd.lastHeartbeat = time.Now()
sbd.isActive = true
}
// 检查集群健康状态
func (sbd *SplitBrainDetector) checkClusterHealth() {
ctx := context.Background()
// 获取所有活跃节点的心跳
pattern := "cluster:heartbeat:*"
keys, err := sbd.client.Keys(ctx, pattern).Result()
if err != nil {
sbd.logger.Printf("获取心跳键失败: %v", err)
return
}
activeNodes := make([]string, 0)
now := time.Now()
for _, key := range keys {
data, err := sbd.client.Get(ctx, key).Result()
if err != nil {
continue
}
var heartbeat Heartbeat
if err := json.Unmarshal([]byte(data), &heartbeat); err != nil {
continue
}
// 检查心跳是否超时
if now.Sub(heartbeat.Timestamp) < sbd.heartbeatInterval*2 {
activeNodes = append(activeNodes, heartbeat.NodeID)
}
}
// 检查是否满足法定人数
if len(activeNodes) < sbd.quorumSize {
sbd.handleSplitBrain(activeNodes)
} else {
sbd.logger.Printf("集群健康,活跃节点数: %d", len(activeNodes))
}
}
// 处理脑裂情况
func (sbd *SplitBrainDetector) handleSplitBrain(activeNodes []string) {
sbd.logger.Printf("检测到可能的脑裂,活跃节点数: %d,法定人数: %d", len(activeNodes), sbd.quorumSize)
// 检查当前节点是否在多数派中
inMajority := false
for _, node := range activeNodes {
if node == sbd.nodeID {
inMajority = true
break
}
}
if !inMajority {
// 当前节点不在多数派中,应该进入安全模式
sbd.enterSafeMode()
} else {
sbd.logger.Printf("当前节点在多数派中,继续正常运行")
}
}
// 进入安全模式
func (sbd *SplitBrainDetector) enterSafeMode() {
sbd.logger.Printf("进入安全模式,停止写操作")
sbd.isActive = false
// 实际实现中应该:
// 1. 停止接受写请求
// 2. 只允许读操作
// 3. 定期检查集群状态
// 4. 准备数据恢复
}
// 心跳信息
type Heartbeat struct {
NodeID string `json:"node_id"`
Timestamp time.Time `json:"timestamp"`
Status string `json:"status"`
}
// 恢复操作
func (sbd *SplitBrainDetector) AttemptRecovery() error {
ctx := context.Background()
// 尝试重新连接集群
if err := sbd.reconnectToCluster(ctx); err != nil {
return fmt.Errorf("重新连接集群失败: %w", err)
}
// 同步数据
if err := sbd.syncData(ctx); err != nil {
return fmt.Errorf("数据同步失败: %w", err)
}
// 退出安全模式
sbd.isActive = true
sbd.logger.Printf("集群恢复完成,退出安全模式")
return nil
}
// 重新连接集群
func (sbd *SplitBrainDetector) reconnectToCluster(ctx context.Context) error {
// 尝试连接到其他节点
// 实际实现需要根据集群配置进行连接
sbd.logger.Printf("尝试重新连接集群")
return nil
}
// 同步数据
func (sbd *SplitBrainDetector) syncData(ctx context.Context) error {
// 实现数据同步逻辑
// 可能需要比较数据版本,解决冲突等
sbd.logger.Printf("开始数据同步")
return nil
}
// 获取当前状态
func (sbd *SplitBrainDetector) GetStatus() *ClusterStatus {
return &ClusterStatus{
NodeID: sbd.nodeID,
IsActive: sbd.isActive,
LastUpdate: sbd.lastHeartbeat,
Healthy: sbd.isActive,
}
}
// 集群状态
type ClusterStatus struct {
NodeID string `json:"node_id"`
IsActive bool `json:"is_active"`
LastUpdate time.Time `json:"last_update"`
Healthy bool `json:"healthy"`
}
---
06.安全问题和最佳实践
a.安全配置建议
i.启用密码认证和ACL控制
ii.限制Redis监听地址
iii.定期更新Redis版本
iv.配置防火墙规则
v.监控异常访问行为
b.运维最佳实践
i.定期备份Redis数据
ii.监控服务器性能指标
iii.配置日志记录和审计
iv.建立应急响应机制
v.定期进行安全审计
c.性能优化总结
i.选择合适的数据结构
ii.合理设置缓存过期策略
iii.使用Pipeline和批量操作
iv.优化连接池配置
v.监控和调优系统参数
d.故障处理预案
i.制定详细的故障处理流程
ii.准备数据恢复方案
iii.建立备用系统架构
iv.定期进行故障演练
v.保持技术文档的更新