Redis介绍 Redis是一个开源的内存数据库,Redis提供了多种不同类型的数据结构,很多业务场景下的问题都可以很自然地映射到这些数据结构上。除此之外,通过复制、持久化和客户端分片等特性,我们可以很方便地将Redis扩展成一个能够包含数百GB数据、每秒处理上百万次请求的系统。
Redis支持的数据结构 Redis支持诸如字符串(string)、哈希(hashe)、列表(list)、集合(set)、带范围查询的排序集合(sorted set)、bitmap、hyperloglog、带半径查询的地理空间索引(geospatial index)和流(stream)等数据结构。
利用 LIST 可以实现队列的功能。
利用 HyperLogLog 统计UV、PV等数据。
使用 geospatial index 进行地理位置相关查询。
go-redis库 安装 Go 社区中目前有很多成熟的 redis client 库,比如 和 等。本文使用 go-redis 这个库来操作 Redis 数据库。
使用以下命令下安装 go-redis 库。
1 go get -redis/v8
1 go get -redis/v9
连接 在项目中导入 go-redis
1 import ""
普通连接模式 go-redis 库中使用 redis.NewClient 函数连接 Redis 服务器。
1 2 3 4 5 6 rdb := redis.NewClient(&redis.Options{ Addr: "localhost:6379" , Password: "" , DB: 0 , PoolSize: 20 , })
除此之外,还可以使用 redis.ParseURL 函数从表示数据源的字符串中解析得到 Redis 服务器的配置信息。
1 2 3 4 5 opt, err := redis.ParseURL("redis://<user>:<pass>@localhost:6379/<db>" )if err != nil { panic (err) } rdb := redis.NewClient(opt)
TLS连接模式 如果使用的是 TLS 连接方式,则需要使用 tls.Config 配置。
1 2 3 4 5 6 7 rdb := redis.NewClient(&redis.Options{ TLSConfig: &tls.Config{ MinVersion: tls.VersionTLS12, }, })
Redis Sentinel模式 使用下面的命令连接到由 Redis Sentinel 管理的 Redis 服务器。
1 2 3 4 rdb := redis.NewFailoverClient(&redis.FailoverOptions{ MasterName: "master-name" , SentinelAddrs: []string {":9126" , ":9127" , ":9128" }, })
Redis Cluster模式 使用下面的命令连接到 Redis Cluster,go-redis 支持按延迟或随机路由命令。
1 2 3 4 5 6 7 rdb := redis.NewClusterClient(&redis.ClusterOptions{ Addrs: []string {":7000" , ":7001" , ":7002" , ":7003" , ":7004" , ":7005" }, })
基本使用 执行命令 下面的示例代码演示了 go-redis 库的基本使用。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 func doCommand () { ctx, cancel := context.WithTimeout(context.Background(), 500 *time.Millisecond) defer cancel() val, err := rdb.Get(ctx, "key" ).Result() fmt.Println(val, err) cmder := rdb.Get(ctx, "key" ) fmt.Println(cmder.Val()) fmt.Println(cmder.Err()) err = rdb.Set(ctx, "key" , 10 , time.Hour).Err() value := rdb.Get(ctx, "key" ).Val() fmt.Println(value) }
:这是超时时间,表示 500 毫秒。在这个例子中,如果操作没有在 500 毫秒内完成,上下文将会被取消。
defer cancel()
:这是在函数结束时调用 cancel()
函数,以确保在函数执行完毕后及时取消上下文。这个 cancel()
这段代码的作用是创建一个具有 500 毫秒超时的上下文,这样在执行操作时,如果操作在 500 毫秒内没有完成,就会自动取消。
执行任意命令 go-redis 还提供了一个执行任意命令或自定义命令的 Do 方法,特别是一些 go-redis 库暂时不支持的命令都可以使用该方法执行。具体使用方法如下。
1 2 3 4 5 6 7 8 9 10 11 12 func doDemo () { ctx, cancel := context.WithTimeout(context.Background(), 500 *time.Millisecond) defer cancel() err := rdb.Do(ctx, "set" , "key" , 10 , "EX" , 3600 ).Err() fmt.Println(err) val, err := rdb.Do(ctx, "get" , "key" ).Result() fmt.Println(val, err) }
redis.Nil go-redis 库提供了一个 redis.Nil 错误来表示 Key 不存在的错误。因此在使用 go-redis 时需要注意对返回错误的判断。在某些场景下我们应该区别处理 redis.Nil 和其他不为 nil 的错误。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 func getValueFromRedis (key, defaultValue string ) (string , error ) { ctx, cancel := context.WithTimeout(context.Background(), 500 *time.Millisecond) defer cancel() val, err := rdb.Get(ctx, key).Result() if err != nil { if errors.Is(err, redis.Nil) { return defaultValue, nil } return "" , err } return val, nil }
其他示例 zset示例 下面的示例代码演示了如何使用 go-redis 库操作 zset。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 func zsetDemo () { zsetKey := "language_rank" languages := []redis.Z{ {Score: 90.0 , Member: "Golang" }, {Score: 98.0 , Member: "Java" }, {Score: 95.0 , Member: "Python" }, {Score: 97.0 , Member: "JavaScript" }, {Score: 99.0 , Member: "C/C++" }, } ctx, cancel := context.WithTimeout(context.Background(), 500 *time.Millisecond) defer cancel() err := rdb.ZAdd(ctx, zsetKey, languages...).Err() if err != nil { fmt.Printf("zadd failed, err:%v\n" , err) return } fmt.Println("zadd success" ) newScore, err := rdb.ZIncrBy(ctx, zsetKey, 10.0 , "Golang" ).Result() if err != nil { fmt.Printf("zincrby failed, err:%v\n" , err) return } fmt.Printf("Golang's score is %f now.\n" , newScore) ret := rdb.ZRevRangeWithScores(ctx, zsetKey, 0 , 2 ).Val() for _, z := range ret { fmt.Println(z.Member, z.Score) } op := &redis.ZRangeBy{ Min: "95" , Max: "100" , } ret, err = rdb.ZRangeByScoreWithScores(ctx, zsetKey, op).Result() if err != nil { fmt.Printf("zrangebyscore failed, err:%v\n" , err) return } for _, z := range ret { fmt.Println(z.Member, z.Score) } }
1 2 3 4 5 6 7 8 9 10 zadd success Golang's score is 100.000000 now. Golang 100 C/C++ 99 Java 98 Python 95 JavaScript 97 Java 98 C/C++ 99 Golang 100
扫描或遍历所有key 在Redis中可以使用KEYS prefix*
命令按前缀查询所有符合条件的 key,go-redis
vals, err **:=** rdb.Keys(ctx, "user:*").Result()
但是如果需要扫描数百万的 key ,那速度就会比较慢。这种场景下你可以使用Scan
命令来遍历所有符合要求的 key。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 func scanKeysDemo1 () { ctx, cancel := context.WithTimeout(context.Background(), 500 *time.Millisecond) defer cancel() var cursor uint64 for { var keys []string var err error keys, cursor, err = rdb.Scan(ctx, cursor, "prefix:*" , 0 ).Result() if err != nil { panic (err) } for _, key := range keys { fmt.Println("key" , key) } if cursor == 0 { break } } }
:这是 Redis 客户端库中用于执行 SCAN 命令的方法。它接受一个上下文(context)对象作为第一个参数,然后是游标(cursor)、匹配模式(pattern)和 COUNT 参数。
:这是 SCAN 命令中的游标参数,用于标识当前迭代的位置。在第一次调用时,通常设置为 0,后续调用会返回新的游标值,以便进行下一次迭代。
:这是 SCAN 命令中的 COUNT 参数,用于指定每次迭代返回的最大元素数量。0 表示返回所有匹配的键。
外部的 for
循环是用来处理整个 SCAN 过程的迭代。在每次迭代中,它执行一次 SCAN 命令,获取一批匹配的键,并处理这批键。然后,它检查游标值是否为 0。如果游标为 0,表示已经扫描完所有的键,就退出循环,结束整个 SCAN 过程。如果游标不为 0,表示还有更多的键需要扫描,就继续下一次迭代,执行下一次 SCAN 命令。
游标(cursor)在 Redis 中用于处理 SCAN 命令,它是一种用于分页扫描大量键的机制。SCAN 命令可以用于迭代遍历 Redis 数据库中的键,而不会阻塞服务器,因此在处理大量键时非常有用。使用游标可以将扫描结果分批返回,避免一次性返回大量数据给客户端,减轻客户端和服务器的压力。
1 2 3 4 5 6 7 8 9 10 11 12 13 func scanKeysDemo2 () { ctx, cancel := context.WithTimeout(context.Background(), 500 *time.Millisecond) defer cancel() iter := rdb.Scan(ctx, 0 , "prefix:*" , 0 ).Iterator() for iter.Next(ctx) { fmt.Println("keys" , iter.Val()) } if err := iter.Err(); err != nil { panic (err) } }
例如,我们可以写出一个将所有匹配指定模式的 key 删除的示例。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 func delKeysByMatch (match string , timeout time.Duration) { ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() iter := rdb.Scan(ctx, 0 , match, 0 ).Iterator() for iter.Next(ctx) { err := rdb.Del(ctx, iter.Val()).Err() if err != nil { panic (err) } } if err := iter.Err(); err != nil { panic (err) } }
此外,对于 Redis 中的 set、hash、zset 数据类型,go-redis
1 2 3 iter := rdb.SScan(ctx, "set-key" , 0 , "prefix:*" , 0 ).Iterator() iter := rdb.HScan(ctx, "hash-key" , 0 , "prefix:*" , 0 ).Iterator() iter := rdb.ZScan(ctx, "sorted-hash-key" , 0 , "prefix:*" , 0 ).Iterator()
Pipeline Redis Pipeline 允许通过使用单个 client-server-client 往返执行多个命令来提高性能。区别于一个接一个地执行100个命令,你可以将这些命令放入 pipeline 中,然后使用1次读写操作像执行单个命令一样执行它们。这样做的好处是节省了执行命令的网络往返时间(RTT)。
下面的示例代码中演示了使用 pipeline 通过一个 write + read 操作来执行多个命令。
1 2 3 4 5 6 7 8 9 10 11 12 pipe := rdb.Pipeline() incr := pipe.Incr(ctx, "pipeline_counter" ) pipe.Expire(ctx, "pipeline_counter" , time.Hour) cmds, err := pipe.Exec(ctx)if err != nil { panic (err) } fmt.Println(incr.Val())
上面的代码相当于将以下两个命令一次发给 Redis Server 端执行,与不使用 Pipeline 相比能减少一次RTT。
1 2 INCR pipeline_counter EXPIRE pipeline_counts 3600
方法,它会在函数退出时调用 Exec。
1 2 3 4 5 6 7 8 9 10 11 12 13 var incr *redis.IntCmd cmds, err := rdb.Pipelined(ctx, func (pipe redis.Pipeliner) error { incr = pipe.Incr(ctx, "pipelined_counter" ) pipe.Expire(ctx, "pipelined_counter" , time.Hour) return nil })if err != nil { panic (err) } fmt.Println(incr.Val())
我们可以遍历 pipeline 命令的返回值依次获取每个命令的结果。下方的示例代码中使用pipiline一次执行了100个 Get 命令,在pipeline 执行后遍历取出100个命令的执行结果。
1 2 3 4 5 6 7 8 9 10 11 12 13 cmds, err := rdb.Pipelined(ctx, func (pipe redis.Pipeliner) error { for i := 0 ; i < 100 ; i++ { pipe.Get(ctx, fmt.Sprintf("key%d" , i)) } return nil })if err != nil { panic (err) }for _, cmd := range cmds { fmt.Println(cmd.(*redis.StringCmd).Val()) }
在那些我们需要一次性执行多个命令的场景下,就可以考虑使用 pipeline 来优化。
事务 在 Redis 中,MULTI 是一个事务(transaction)命令,它用于标记一个事务的开始。在 MULTI 命令之后,所有后续的命令都会被添加到一个事务队列中,而不会立即执行。只有在 EXEC 命令被调用时,才会执行所有在 MULTI 和 EXEC 之间添加到事务队列中的命令。
MULTI 命令不接受任何参数,它只是一个简单的标记,表示后续的命令应该被视为一个事务的一部分。在调用 MULTI 命令后,Redis 服务器会进入事务状态,并在接收到 EXEC 命令时执行事务队列中的所有命令。
使用事务可以保证一系列的 Redis 命令在执行过程中不会被其他客户端的命令中断。如果在 MULTI 和 EXEC 之间的某个时间点发生了错误,Redis 会取消事务,并且事务队列中的所有命令都不会执行。
举个例子,以下是一个在 Redis 使用 MULTI 命令创建事务的示例:
1 2 3 4 5 MULTISET key1 value1SET key2 value2GET key1EXEC
Redis 是单线程执行命令的,因此单个命令始终是原子的,但是来自不同客户端的两个给定命令可以依次执行,例如在它们之间交替执行。但是,Multi/exec
在这种场景我们需要使用 TxPipeline 或 TxPipelined 方法将 pipeline 命令使用 MULTI
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 pipe := rdb.TxPipeline() incr := pipe.Incr(ctx, "tx_pipeline_counter" ) pipe.Expire(ctx, "tx_pipeline_counter" , time.Hour) _, err := pipe.Exec(ctx) fmt.Println(incr.Val(), err)var incr2 *redis.IntCmd _, err = rdb.TxPipelined(ctx, func (pipe redis.Pipeliner) error { incr2 = pipe.Incr(ctx, "tx_pipeline_counter" ) pipe.Expire(ctx, "tx_pipeline_counter" , time.Hour) return nil }) fmt.Println(incr2.Val(), err)
1 2 3 4 MULTI INCR pipeline_counter EXPIRE pipeline_counts 3600 EXEC
TxPipeline :
是在事务中使用管道(pipeline)的方式。使用 TxPipeline
创建的事务对象可以执行多个命令,并将这些命令一次性发送给 Redis 服务器,然后一次性获取所有命令的响应。这样做可以减少网络往返次数,提高性能。但是,TxPipeline
TxPipelined :
是在事务中使用异步管道(pipeline)的方式。使用 TxPipelined
创建的事务对象允许在事务中的每个命令之间执行其他 Go 代码。这些命令被添加到事务队列中,并在调用 TxPipelined
对象的 Exec
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 func exampleTxPipeline () { pipe := rdb.TxPipeline() pipe.Set("key1" , "value1" , 0 ) pipe.Set("key2" , "value2" , 0 ) pipe.Get("key1" ) _, err := pipe.Exec() if err != nil { panic (err) } }func exampleTxPipelined () { pipe := rdb.TxPipelined() pipe.Set("key1" , "value1" , 0 ) pipe.Set("key2" , "value2" , 0 ) pipe.Get("key1" ) go func () { fmt.Println("Do something else while waiting for transaction to finish" ) }() _, err := pipe.Exec() if err != nil { panic (err) } }
Watch 我们通常搭配 WATCH
命令监视某个 key 开始,直到执行EXEC
命令的这段时间里,如果有其他用户抢先对被监视的 key 进行了替换、更新、删除等操作,那么当用户尝试执行EXEC
1 Watch(fn func (*Tx) error , keys ...string ) error
下面的代码片段演示了 Watch 方法搭配 TxPipelined 的使用示例。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 func watchDemo (ctx context.Context, key string ) error { return rdb.Watch(ctx, func (tx *redis.Tx) error { n, err := tx.Get(ctx, key).Int() if err != nil && err != redis.Nil { return err } time.Sleep(5 * time.Second) _, err = tx.TxPipelined(ctx, func (pipe redis.Pipeliner) error { pipe.Set(ctx, key, n+1 , time.Hour) return nil }) return err }, key) }
将上面的函数执行并打印其返回值,如果我们在程序运行后的5秒内修改了被 watch 的 key 的值,那么该事务操作失败,返回redis: transaction failed
最后我们来看一个 go-redis 官方文档中使用 GET
命令实现一个 INCR 命令的完整示例。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 package mainimport ( "context" "errors" "fmt" "" "sync" "time" )func main () { opt, err := redis.ParseURL("redis://@localhost:6379/0" ) if err != nil { panic (err) } rdb := redis.NewClient(opt) const routineCount = 100 ctx, cancel := context.WithTimeout(context.Background(), 5 *time.Second) defer cancel() increment := func (key string ) error { txf := func (tx *redis.Tx) error { n, err := tx.Get(ctx, key).Int() if err != nil && !errors.Is(err, redis.Nil) { return err } n++ _, err = tx.TxPipelined(ctx, func (pipe redis.Pipeliner) error { pipe.Set(ctx, key, n, 0 ) return nil }) return err } for retries := routineCount; retries > 0 ; retries-- { err := rdb.Watch(ctx, txf, key) if !errors.Is(err, redis.TxFailedErr) { return err } } return errors.New("increment reached maximum number of retries" ) } var wg sync.WaitGroup wg.Add(routineCount) for i := 0 ; i < routineCount; i++ { go func () { defer wg.Done() if err := increment("counter3" ); err != nil { fmt.Println("increment error:" , err) } }() } wg.Wait() n, err := rdb.Get(ctx, "counter3" ).Int() fmt.Println("最终结果:" , n, err) }
在这个示例中使用了 redis.TxFailedErr
更多详情请查阅官方文档 。