Слияние кода завершено, страница обновится автоматически
// Package rueidis is a fast Golang Redis RESP3 client that does auto pipelining and supports client side caching.
package rueidis
//go:generate go run hack/cmds/gen.go internal/cmds hack/cmds/*.json
import (
"context"
"crypto/tls"
"errors"
"math"
"net"
"runtime"
"strings"
"time"
"github.com/redis/rueidis/internal/util"
)
const (
// DefaultCacheBytes is the default value of ClientOption.CacheSizeEachConn, which is 128 MiB
DefaultCacheBytes = 128 * (1 << 20)
// DefaultRingScale is the default value of ClientOption.RingScaleEachConn, which results into having a ring of size 2^10 for each connection
DefaultRingScale = 10
// DefaultPoolSize is the default value of ClientOption.BlockingPoolSize
DefaultPoolSize = 1024
// DefaultBlockingPipeline is the default value of ClientOption.BlockingPipeline
DefaultBlockingPipeline = 2000
// DefaultDialTimeout is the default value of ClientOption.Dialer.Timeout
DefaultDialTimeout = 5 * time.Second
// DefaultTCPKeepAlive is the default value of ClientOption.Dialer.KeepAlive
DefaultTCPKeepAlive = 1 * time.Second
// DefaultReadBuffer is the default value of bufio.NewReaderSize for each connection, which is 0.5MiB
DefaultReadBuffer = 1 << 19
// DefaultWriteBuffer is the default value of bufio.NewWriterSize for each connection, which is 0.5MiB
DefaultWriteBuffer = 1 << 19
// MaxPipelineMultiplex is the maximum meaningful value for ClientOption.PipelineMultiplex
MaxPipelineMultiplex = 8
// https://github.com/valkey-io/valkey/blob/1a34a4ff7f101bb6b17a0b5e9aa3bf7d6bd29f68/src/networking.c#L4118-L4124
ClientModeCluster ClientMode = "cluster"
ClientModeSentinel ClientMode = "sentinel"
ClientModeStandalone ClientMode = "standalone"
)
var (
// ErrClosing means the Client.Close had been called
ErrClosing = errors.New("rueidis client is closing or unable to connect redis")
// ErrNoAddr means the ClientOption.InitAddress is empty
ErrNoAddr = errors.New("no alive address in InitAddress")
// ErrNoCache means your redis does not support client-side caching and must set ClientOption.DisableCache to true
ErrNoCache = errors.New("ClientOption.DisableCache must be true for redis not supporting client-side caching or not supporting RESP3")
// ErrRESP2PubSubMixed means your redis does not support RESP3 and rueidis can't handle SUBSCRIBE/PSUBSCRIBE/SSUBSCRIBE in mixed case
ErrRESP2PubSubMixed = errors.New("rueidis does not support SUBSCRIBE/PSUBSCRIBE/SSUBSCRIBE mixed with other commands in RESP2")
// ErrBlockingPubSubMixed rueidis can't handle SUBSCRIBE/PSUBSCRIBE/SSUBSCRIBE mixed with other blocking commands
ErrBlockingPubSubMixed = errors.New("rueidis does not support SUBSCRIBE/PSUBSCRIBE/SSUBSCRIBE mixed with other blocking commands")
// ErrDoCacheAborted means redis abort EXEC request or connection closed
ErrDoCacheAborted = errors.New("failed to fetch the cache because EXEC was aborted by redis or connection closed")
// ErrReplicaOnlyNotSupported means ReplicaOnly flag is not supported by
// current client
ErrReplicaOnlyNotSupported = errors.New("ReplicaOnly is not supported for single client")
// ErrNoSendToReplicas means SendToReplicas function must be provided for standalone client with replicas.
ErrNoSendToReplicas = errors.New("no SendToReplicas provided for standalone client with replicas")
// ErrWrongPipelineMultiplex means wrong value for ClientOption.PipelineMultiplex
ErrWrongPipelineMultiplex = errors.New("ClientOption.PipelineMultiplex must not be bigger than MaxPipelineMultiplex")
// ErrDedicatedClientRecycled means the caller attempted to use the dedicated client which has been already recycled (after canceled/closed).
ErrDedicatedClientRecycled = errors.New("dedicated client should not be used after recycled")
// DisableClientSetInfo is the value that can be used for ClientOption.ClientSetInfo to disable making the CLIENT SETINFO command
DisableClientSetInfo = make([]string, 0)
)
// ClientOption should be passed to NewClient to construct a Client
type ClientOption struct {
TLSConfig *tls.Config
// DialFn allows for a custom function to be used to create net.Conn connections
// Deprecated: use DialCtxFn instead.
DialFn func(string, *net.Dialer, *tls.Config) (conn net.Conn, err error)
// DialCtxFn allows for a custom function to be used to create net.Conn connections
DialCtxFn func(context.Context, string, *net.Dialer, *tls.Config) (conn net.Conn, err error)
// NewCacheStoreFn allows a custom client side caching store for each connection
NewCacheStoreFn NewCacheStoreFn
// OnInvalidations is a callback function in case of client-side caching invalidation received.
// Note that this function must be fast, otherwise other redis messages will be blocked.
OnInvalidations func([]RedisMessage)
// SendToReplicas is a function that returns true if the command should be sent to replicas.
// currently only used for cluster client.
// NOTE: This function can't be used with ReplicaOnly option.
SendToReplicas func(cmd Completed) bool
// AuthCredentialsFn allows for setting the AUTH username and password dynamically on each connection attempt to
// support rotating credentials
AuthCredentialsFn func(AuthCredentialsContext) (AuthCredentials, error)
// RetryDelay is the function that returns the delay that should be used before retrying the attempt.
// The default is an exponential backoff with a maximum delay of 1 second.
// Only used when DisableRetry is false.
RetryDelay RetryDelayFn
// ReplicaSelector selects a replica node when `SendToReplicas` returns true.
// If the function is set, the client will send selected command to the replica node.
// Returned value is the index of the replica node in the replicas slice.
// If the returned value is out of range, the primary node will be selected.
// If primary node does not have any replica, the primary node will be selected
// and function will not be called.
// Currently only used for cluster client.
// Each ReplicaInfo must not be modified.
// NOTE: This function can't be used with ReplicaOnly option.
// NOTE: This function must be used with SendToReplicas function.
ReplicaSelector func(slot uint16, replicas []ReplicaInfo) int
// Sentinel options, including MasterSet and Auth options
Sentinel SentinelOption
// TCP & TLS
// Dialer can be used to customized how rueidis connect to a redis instance via TCP, including:
// - Timeout, the default is DefaultDialTimeout
// - KeepAlive, the default is DefaultTCPKeepAlive
// The Dialer.KeepAlive interval is used to detect an unresponsive idle tcp connection.
// OS takes at least (tcp_keepalive_probes+1)*Dialer.KeepAlive time to conclude an idle connection to be unresponsive.
// For example: DefaultTCPKeepAlive = 1s and the default of tcp_keepalive_probes on Linux is 9.
// Therefore, it takes at least 10s to kill an idle and unresponsive tcp connection on Linux by default.
Dialer net.Dialer
// Redis AUTH parameters
Username string
Password string
ClientName string
// ClientSetInfo will assign various info attributes to the current connection.
// Note that ClientSetInfo should have exactly 2 values, the lib name and the lib version respectively.
ClientSetInfo []string
// InitAddress point to redis nodes.
// Rueidis will connect to them one by one and issue CLUSTER SLOT command to initialize the cluster client until success.
// If len(InitAddress) == 1 and the address is not running in cluster mode, rueidis will fall back to the single client mode.
// If ClientOption.Sentinel.MasterSet is set, then InitAddress will be used to connect sentinels
// You can bypass this behaviour by using ClientOption.ForceSingleClient.
InitAddress []string
// ClientTrackingOptions will be appended to CLIENT TRACKING ON command when the connection is established.
// The default is []string{"OPTIN"}
ClientTrackingOptions []string
// Standalone is the option for the standalone client.
Standalone StandaloneOption
SelectDB int
// CacheSizeEachConn is redis client side cache size that bind to each TCP connection to a single redis instance.
// The default is DefaultCacheBytes.
CacheSizeEachConn int
// RingScaleEachConn sets the size of the ring buffer in each connection to (2 ^ RingScaleEachConn).
// The default is RingScaleEachConn, which results into having a ring of size 2^10 for each connection.
// Reduce this value can reduce the memory consumption of each connection at the cost of potential throughput degradation.
// Values smaller than 8 is typically not recommended.
RingScaleEachConn int
// ReadBufferEachConn is the size of the bufio.NewReaderSize for each connection, default to DefaultReadBuffer (0.5 MiB).
ReadBufferEachConn int
// WriteBufferEachConn is the size of the bufio.NewWriterSize for each connection, default to DefaultWriteBuffer (0.5 MiB).
WriteBufferEachConn int
// BlockingPoolCleanup is the duration for cleaning up idle connections.
// If BlockingPoolCleanup is 0, then idle connections will not be cleaned up.
BlockingPoolCleanup time.Duration
// BlockingPoolMinSize is the minimum size of the connection pool
// shared by blocking commands (ex BLPOP, XREAD with BLOCK).
// Only relevant if BlockingPoolCleanup is not 0. This parameter limits
// the number of idle connections that can be removed by BlockingPoolCleanup.
BlockingPoolMinSize int
// BlockingPoolSize is the size of the connection pool shared by blocking commands (ex BLPOP, XREAD with BLOCK).
// The default is DefaultPoolSize.
BlockingPoolSize int
// BlockingPipeline is the threshold of a pipeline that will be treated as blocking commands when exceeding it.
BlockingPipeline int
// PipelineMultiplex determines how many tcp connections used to pipeline commands to one redis instance.
// The default for single and sentinel clients is 2, which means 4 connections (2^2).
// The default for cluster clients is 0, which means 1 connection (2^0).
PipelineMultiplex int
// ConnWriteTimeout is read/write timeout for each connection. If specified,
// it is used to control the maximum duration waits for responses to pipeline commands.
// Also, ConnWriteTimeout is applied net.Conn.SetDeadline and periodic PING to redis
// Since the Dialer.KeepAlive will not be triggered if there is data in the outgoing buffer,
// ConnWriteTimeout should be set in order to detect local congestion or unresponsive redis server.
// This default is ClientOption.Dialer.KeepAlive * (9+1), where 9 is the default of tcp_keepalive_probes on Linux.
ConnWriteTimeout time.Duration
// MaxFlushDelay when greater than zero pauses pipeline write loop for some time (not larger than MaxFlushDelay)
// after each flushing of data to the connection. This gives pipeline a chance to collect more commands to send
// to Redis. Adding this delay increases latency, reduces throughput – but in most cases may significantly reduce
// application and Redis CPU utilization due to less executed system calls. By default, Rueidis flushes data to the
// connection without extra delays. Depending on network latency and application-specific conditions the value
// of MaxFlushDelay may vary, sth like 20 microseconds should not affect latency/throughput a lot but still
// produce notable CPU usage reduction under load. Ref: https://github.com/redis/rueidis/issues/156
MaxFlushDelay time.Duration
// ClusterOption is the options for the redis cluster client.
ClusterOption ClusterOption
// DisableTCPNoDelay turns on Nagle's algorithm in pipelining mode by using conn.SetNoDelay(false).
// Turning this on can result in lower p99 latencies and lower CPU usages if all your requests are small.
// But if you have large requests or fast network, this might degrade the performance. Ref: https://github.com/redis/rueidis/pull/650
DisableTCPNoDelay bool
// ShuffleInit is a handy flag that shuffles the InitAddress after passing to the NewClient() if it is true
ShuffleInit bool
// ClientNoTouch controls whether commands alter LRU/LFU stats
ClientNoTouch bool
// DisableRetry disables retrying read-only commands under network errors
DisableRetry bool
// DisableCache falls back Client.DoCache/Client.DoMultiCache to Client.Do/Client.DoMulti
DisableCache bool
// DisableAutoPipelining makes rueidis.Client always pick a connection from the BlockingPool to serve each request.
DisableAutoPipelining bool
// AlwaysPipelining makes rueidis.Client always pipeline redis commands even if they are not issued concurrently.
AlwaysPipelining bool
// AlwaysRESP2 makes rueidis.Client always uses RESP2, otherwise it will try using RESP3 first.
AlwaysRESP2 bool
// ForceSingleClient force the usage of a single client connection, without letting the lib guessing
// if redis instance is a cluster or a single redis instance.
ForceSingleClient bool
// ReplicaOnly indicates that this client will only try to connect to readonly replicas of redis setup.
ReplicaOnly bool
// ClientNoEvict sets the client eviction mode for the current connection.
// When turned on and client eviction is configured,
// the current connection will be excluded from the client eviction process
// even if we're above the configured client eviction threshold.
ClientNoEvict bool
// EnableReplicaAZInfo enables the client to load the replica node's availability zone.
// If true, the client will set the `AZ` field in `ReplicaInfo`.
EnableReplicaAZInfo bool
}
// SentinelOption contains MasterSet,
type SentinelOption struct {
// TCP & TLS, same as ClientOption but for connecting sentinel
Dialer net.Dialer
TLSConfig *tls.Config
// MasterSet is the redis master set name monitored by sentinel cluster.
// If this field is set, then ClientOption.InitAddress will be used to connect to sentinel cluster.
MasterSet string
// Redis AUTH parameters for sentinel
Username string
Password string
ClientName string
}
// ClusterOption is the options for the redis cluster client.
type ClusterOption struct {
// ShardsRefreshInterval is the interval to scan the cluster topology.
// If the value is zero, refreshment will be disabled.
// Cluster topology cache refresh happens always in the background after successful scan.
ShardsRefreshInterval time.Duration
}
// StandaloneOption is the options for the standalone client.
type StandaloneOption struct {
// ReplicaAddress is the list of replicas for the primary node.
// Note that these addresses must be online and can not be promoted.
// An example use case is the reader endpoint provided by cloud vendors.
ReplicaAddress []string
}
// ReplicaInfo is the information of a replica node in a redis cluster.
type ReplicaInfo struct {
Addr string
AZ string
}
type ClientMode string
// Client is the redis client interface for both single redis instance and redis cluster. It should be created from the NewClient()
type Client interface {
CoreClient
// DoCache is similar to Do, but it uses opt-in client side caching and requires a client side TTL.
// The explicit client side TTL specifies the maximum TTL on the client side.
// If the key's TTL on the server is smaller than the client side TTL, the client side TTL will be capped.
// client.Do(ctx, client.B().Get().Key("k").Cache(), time.Minute).ToString()
// The above example will send the following command to redis if cache miss:
// CLIENT CACHING YES
// PTTL k
// GET k
// The in-memory cache size is configured by ClientOption.CacheSizeEachConn.
// The cmd parameter is recycled after passing into DoCache() and should not be reused.
DoCache(ctx context.Context, cmd Cacheable, ttl time.Duration) (resp RedisResult)
// DoMultiCache is similar to DoCache, but works with multiple cacheable commands across different slots.
// It will first group commands by slots and will send only cache missed commands to redis.
DoMultiCache(ctx context.Context, multi ...CacheableTTL) (resp []RedisResult)
// DoStream send a command to redis through a dedicated connection acquired from a connection pool.
// It returns a RedisResultStream, but it does not read the command response until the RedisResultStream.WriteTo is called.
// After the RedisResultStream.WriteTo is called, the underlying connection is then recycled.
// DoStream should only be used when you want to stream redis response directly to an io.Writer without additional allocation,
// otherwise, the normal Do() should be used instead.
// Also note that DoStream can only work with commands returning string, integer, or float response.
DoStream(ctx context.Context, cmd Completed) RedisResultStream
// DoMultiStream is similar to DoStream, but pipelines multiple commands to redis.
// It returns a MultiRedisResultStream, and users should call MultiRedisResultStream.WriteTo as many times as the number of commands sequentially
// to read each command response from redis. After all responses are read, the underlying connection is then recycled.
// DoMultiStream should only be used when you want to stream redis responses directly to an io.Writer without additional allocation,
// otherwise, the normal DoMulti() should be used instead.
// DoMultiStream does not support multiple key slots when connecting to a redis cluster.
DoMultiStream(ctx context.Context, multi ...Completed) MultiRedisResultStream
// Dedicated acquire a connection from the blocking connection pool, no one else can use the connection
// during Dedicated. The main usage of Dedicated is CAS operation, which is WATCH + MULTI + EXEC.
// However, one should try to avoid CAS operation but use Lua script instead, because occupying a connection
// is not good for performance.
Dedicated(fn func(DedicatedClient) error) (err error)
// Dedicate does the same as Dedicated, but it exposes DedicatedClient directly
// and requires user to invoke cancel() manually to put connection back to the pool.
Dedicate() (client DedicatedClient, cancel func())
// Nodes returns each redis node this client known as rueidis.Client. This is useful if you want to
// send commands to some specific redis nodes in the cluster.
Nodes() map[string]Client
// Mode returns the current mode of the client, which indicates whether the client is operating
// in standalone, sentinel, or cluster mode.
// This can be useful for determining the type of Redis deployment the client is connected to
// and for making decisions based on the deployment type.
Mode() ClientMode
}
// DedicatedClient is obtained from Client.Dedicated() and it will be bound to single redis connection and
// no other commands can be pipelined in to this connection during Client.Dedicated().
// If the DedicatedClient is obtained from cluster client, the first command to it must have a Key() to identify the redis node.
type DedicatedClient interface {
CoreClient
// SetPubSubHooks is an alternative way to processing Pub/Sub messages instead of using Receive.
// SetPubSubHooks is non-blocking and allows users to subscribe/unsubscribe channels later.
// Note that the hooks will be called sequentially but in another goroutine.
// The return value will be either:
// 1. an error channel, if the hooks passed in is not zero, or
// 2. nil, if the hooks passed in is zero. (used for reset hooks)
// In the former case, the error channel is guaranteed to be close when the hooks will not be called anymore,
// and has at most one error describing the reason why the hooks will not be called anymore.
// Users can use the error channel to detect disconnection.
SetPubSubHooks(hooks PubSubHooks) <-chan error
}
// CoreClient is the minimum interface shared by the Client and the DedicatedClient.
type CoreClient interface {
// B is the getter function to the command builder for the client
// If the client is a cluster client, the command builder also prohibits cross key slots in one command.
B() Builder
// Do is the method sending user's redis command building from the B() to a redis node.
// client.Do(ctx, client.B().Get().Key("k").Build()).ToString()
// All concurrent non-blocking commands will be pipelined automatically and have better throughput.
// Blocking commands will use another separated connection pool.
// The cmd parameter is recycled after passing into Do() and should not be reused.
Do(ctx context.Context, cmd Completed) (resp RedisResult)
// DoMulti takes multiple redis commands and sends them together, reducing RTT from the user code.
// The multi parameters are recycled after passing into DoMulti() and should not be reused.
DoMulti(ctx context.Context, multi ...Completed) (resp []RedisResult)
// Receive accepts SUBSCRIBE, SSUBSCRIBE, PSUBSCRIBE command and a message handler.
// Receive will block and then return value only when the following cases:
// 1. return nil when received any unsubscribe/punsubscribe message related to the provided `subscribe` command.
// 2. return ErrClosing when the client is closed manually.
// 3. return ctx.Err() when the `ctx` is done.
// 4. return non-nil err when the provided `subscribe` command failed.
Receive(ctx context.Context, subscribe Completed, fn func(msg PubSubMessage)) error
// Close will make further calls to the client be rejected with ErrClosing,
// and Close will wait until all pending calls finished.
Close()
}
// CT is a shorthand constructor for CacheableTTL
func CT(cmd Cacheable, ttl time.Duration) CacheableTTL {
return CacheableTTL{Cmd: cmd, TTL: ttl}
}
// CacheableTTL is parameter container of DoMultiCache
type CacheableTTL struct {
Cmd Cacheable
TTL time.Duration
}
// AuthCredentialsContext is the parameter container of AuthCredentialsFn
type AuthCredentialsContext struct {
Address net.Addr
}
// AuthCredentials is the output of AuthCredentialsFn
type AuthCredentials struct {
Username string
Password string
}
// NewClient uses ClientOption to initialize the Client for both cluster client and single client.
// It will first try to connect as cluster client. If the len(ClientOption.InitAddress) == 1 and
// the address does not enable cluster mode, the NewClient() will use single client instead.
func NewClient(option ClientOption) (client Client, err error) {
if option.ReadBufferEachConn < 32 { // the buffer should be able to hold an int64 string at least
option.ReadBufferEachConn = DefaultReadBuffer
}
if option.WriteBufferEachConn < 32 {
option.WriteBufferEachConn = DefaultWriteBuffer
}
if option.CacheSizeEachConn <= 0 {
option.CacheSizeEachConn = DefaultCacheBytes
}
if option.Dialer.Timeout == 0 {
option.Dialer.Timeout = DefaultDialTimeout
}
if option.Dialer.KeepAlive == 0 {
option.Dialer.KeepAlive = DefaultTCPKeepAlive
}
if option.ConnWriteTimeout == 0 {
option.ConnWriteTimeout = max(DefaultTCPKeepAlive, option.Dialer.KeepAlive) * 10
}
if option.BlockingPipeline == 0 {
option.BlockingPipeline = DefaultBlockingPipeline
}
if option.DisableAutoPipelining {
option.AlwaysPipelining = false
}
if option.ShuffleInit {
util.Shuffle(len(option.InitAddress), func(i, j int) {
option.InitAddress[i], option.InitAddress[j] = option.InitAddress[j], option.InitAddress[i]
})
}
if option.PipelineMultiplex > MaxPipelineMultiplex {
return nil, ErrWrongPipelineMultiplex
}
if option.RetryDelay == nil {
option.RetryDelay = defaultRetryDelayFn
}
if option.Sentinel.MasterSet != "" {
option.PipelineMultiplex = singleClientMultiplex(option.PipelineMultiplex)
return newSentinelClient(&option, makeConn, newRetryer(option.RetryDelay))
}
if len(option.Standalone.ReplicaAddress) > 0 {
if option.SendToReplicas == nil {
return nil, ErrNoSendToReplicas
}
option.PipelineMultiplex = singleClientMultiplex(option.PipelineMultiplex)
return newStandaloneClient(&option, makeConn, newRetryer(option.RetryDelay))
}
if option.ForceSingleClient {
option.PipelineMultiplex = singleClientMultiplex(option.PipelineMultiplex)
return newSingleClient(&option, nil, makeConn, newRetryer(option.RetryDelay))
}
if client, err = newClusterClient(&option, makeConn, newRetryer(option.RetryDelay)); err != nil {
if client == (*clusterClient)(nil) {
return nil, err
}
if len(option.InitAddress) == 1 && (err.Error() == redisErrMsgCommandNotAllow || strings.Contains(strings.ToUpper(err.Error()), "CLUSTER")) {
option.PipelineMultiplex = singleClientMultiplex(option.PipelineMultiplex)
client, err = newSingleClient(&option, client.(*clusterClient).single(), makeConn, newRetryer(option.RetryDelay))
} else {
client.Close()
return nil, err
}
}
return client, err
}
func singleClientMultiplex(multiplex int) int {
if multiplex == 0 {
if multiplex = int(math.Log2(float64(runtime.GOMAXPROCS(0)))); multiplex >= 2 {
multiplex = 2
}
}
if multiplex < 0 {
multiplex = 0
}
return multiplex
}
func makeConn(dst string, opt *ClientOption) conn {
return makeMux(dst, opt, dial)
}
func dial(ctx context.Context, dst string, opt *ClientOption) (conn net.Conn, err error) {
if opt.DialCtxFn != nil {
return opt.DialCtxFn(ctx, dst, &opt.Dialer, opt.TLSConfig)
}
if opt.DialFn != nil {
return opt.DialFn(dst, &opt.Dialer, opt.TLSConfig)
}
if opt.TLSConfig != nil {
dialer := tls.Dialer{NetDialer: &opt.Dialer, Config: opt.TLSConfig}
conn, err = dialer.DialContext(ctx, "tcp", dst)
} else {
conn, err = opt.Dialer.DialContext(ctx, "tcp", dst)
}
return conn, err
}
const redisErrMsgCommandNotAllow = "command is not allowed"
Вы можете оставить комментарий после Вход в систему
Неприемлемый контент может быть отображен здесь и не будет показан на странице. Вы можете проверить и изменить его с помощью соответствующей функции редактирования.
Если вы подтверждаете, что содержание не содержит непристойной лексики/перенаправления на рекламу/насилия/вульгарной порнографии/нарушений/пиратства/ложного/незначительного или незаконного контента, связанного с национальными законами и предписаниями, вы можете нажать «Отправить» для подачи апелляции, и мы обработаем ее как можно скорее.
Опубликовать ( 0 )