1 В избранное 0 Ответвления 0

OSCHINA-MIRROR/mirrors-rueidis

Присоединиться к Gitlife
Откройте для себя и примите участие в публичных проектах с открытым исходным кодом с участием более 10 миллионов разработчиков. Приватные репозитории также полностью бесплатны :)
Присоединиться бесплатно
Клонировать/Скачать
client_test.go 44 КБ
Копировать Редактировать Web IDE Исходные данные Просмотреть построчно История
fscnick Отправлено 24.03.2025 07:50 f426952
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467
package rueidis
import (
"context"
"errors"
"io"
"net"
"reflect"
"strconv"
"strings"
"testing"
"time"
"github.com/redis/rueidis/internal/cmds"
)
type mockConn struct {
DoFn func(cmd Completed) RedisResult
DoCacheFn func(cmd Cacheable, ttl time.Duration) RedisResult
DoMultiFn func(multi ...Completed) *redisresults
DoMultiCacheFn func(multi ...CacheableTTL) *redisresults
ReceiveFn func(ctx context.Context, subscribe Completed, fn func(message PubSubMessage)) error
DoStreamFn func(cmd Completed) RedisResultStream
DoMultiStreamFn func(cmd ...Completed) MultiRedisResultStream
InfoFn func() map[string]RedisMessage
VersionFn func() int
AZFn func() string
ErrorFn func() error
CloseFn func()
DialFn func() error
AcquireFn func() wire
StoreFn func(w wire)
OverrideFn func(c conn)
AddrFn func() string
DoOverride map[string]func(cmd Completed) RedisResult
DoCacheOverride map[string]func(cmd Cacheable, ttl time.Duration) RedisResult
ReceiveOverride map[string]func(ctx context.Context, subscribe Completed, fn func(message PubSubMessage)) error
}
func (m *mockConn) Override(c conn) {
if m.OverrideFn != nil {
m.OverrideFn(c)
}
}
func (m *mockConn) Dial() error {
if m.DialFn != nil {
return m.DialFn()
}
return nil
}
func (m *mockConn) Acquire(ctx context.Context) wire {
if m.AcquireFn != nil {
return m.AcquireFn()
}
return nil
}
func (m *mockConn) Store(w wire) {
if m.StoreFn != nil {
m.StoreFn(w)
}
}
func (m *mockConn) Do(ctx context.Context, cmd Completed) RedisResult {
if fn := m.DoOverride[strings.Join(cmd.Commands(), " ")]; fn != nil {
return fn(cmd)
}
if m.DoFn != nil {
return m.DoFn(cmd)
}
return RedisResult{}
}
func (m *mockConn) DoCache(ctx context.Context, cmd Cacheable, ttl time.Duration) RedisResult {
if fn := m.DoCacheOverride[strings.Join(cmd.Commands(), " ")]; fn != nil {
return fn(cmd, ttl)
}
if m.DoCacheFn != nil {
return m.DoCacheFn(cmd, ttl)
}
return RedisResult{}
}
func (m *mockConn) DoMultiCache(ctx context.Context, multi ...CacheableTTL) *redisresults {
overrides := make([]RedisResult, 0, len(multi))
for _, cmd := range multi {
if fn := m.DoCacheOverride[strings.Join(cmd.Cmd.Commands(), " ")]; fn != nil {
overrides = append(overrides, fn(cmd.Cmd, cmd.TTL))
}
}
if len(overrides) == len(multi) {
return &redisresults{s: overrides}
}
if m.DoMultiCacheFn != nil {
return m.DoMultiCacheFn(multi...)
}
return nil
}
func (m *mockConn) DoMulti(ctx context.Context, multi ...Completed) *redisresults {
overrides := make([]RedisResult, 0, len(multi))
for _, cmd := range multi {
if fn := m.DoOverride[strings.Join(cmd.Commands(), " ")]; fn != nil {
overrides = append(overrides, fn(cmd))
}
}
if len(overrides) == len(multi) {
return &redisresults{s: overrides}
}
if m.DoMultiFn != nil {
return m.DoMultiFn(multi...)
}
return nil
}
func (m *mockConn) Receive(ctx context.Context, subscribe Completed, hdl func(message PubSubMessage)) error {
if fn := m.ReceiveOverride[strings.Join(subscribe.Commands(), " ")]; fn != nil {
return fn(ctx, subscribe, hdl)
}
if m.ReceiveFn != nil {
return m.ReceiveFn(ctx, subscribe, hdl)
}
return nil
}
func (m *mockConn) DoStream(ctx context.Context, cmd Completed) RedisResultStream {
if m.DoStreamFn != nil {
return m.DoStreamFn(cmd)
}
return RedisResultStream{}
}
func (m *mockConn) DoMultiStream(ctx context.Context, cmd ...Completed) MultiRedisResultStream {
if m.DoMultiStreamFn != nil {
return m.DoMultiStreamFn(cmd...)
}
return MultiRedisResultStream{}
}
func (m *mockConn) CleanSubscriptions() {
panic("not implemented")
}
func (m *mockConn) SetPubSubHooks(_ PubSubHooks) <-chan error {
panic("not implemented")
}
func (m *mockConn) SetOnCloseHook(func(error)) {
}
func (m *mockConn) Info() map[string]RedisMessage {
if m.InfoFn != nil {
return m.InfoFn()
}
return nil
}
func (m *mockConn) Version() int {
if m.VersionFn != nil {
return m.VersionFn()
}
return 0
}
func (m *mockConn) AZ() string {
if m.AZFn != nil {
return m.AZFn()
}
return ""
}
func (m *mockConn) Error() error {
if m.ErrorFn != nil {
return m.ErrorFn()
}
return nil
}
func (m *mockConn) Close() {
if m.CloseFn != nil {
m.CloseFn()
}
}
func (m *mockConn) Addr() string {
if m.AddrFn != nil {
return m.AddrFn()
}
return ""
}
func (m *mockConn) OptInCmd() cmds.Completed {
return cmds.OptInCmd
}
func TestNewSingleClientNoNode(t *testing.T) {
defer ShouldNotLeaked(SetupLeakDetection())
if _, err := newSingleClient(
&ClientOption{}, nil, func(dst string, opt *ClientOption) conn {
return nil
}, newRetryer(defaultRetryDelayFn),
); err != ErrNoAddr {
t.Fatalf("unexpected err %v", err)
}
}
func TestNewSingleClientReplicaOnlyNotSupported(t *testing.T) {
defer ShouldNotLeaked(SetupLeakDetection())
if _, err := newSingleClient(
&ClientOption{ReplicaOnly: true, InitAddress: []string{"localhost"}}, nil, func(dst string, opt *ClientOption) conn { return nil }, newRetryer(defaultRetryDelayFn),
); err != ErrReplicaOnlyNotSupported {
t.Fatalf("unexpected err %v", err)
}
}
func TestNewSingleClientError(t *testing.T) {
defer ShouldNotLeaked(SetupLeakDetection())
v := errors.New("dail err")
if _, err := newSingleClient(
&ClientOption{InitAddress: []string{""}}, nil, func(dst string, opt *ClientOption) conn { return &mockConn{DialFn: func() error { return v }} }, newRetryer(defaultRetryDelayFn),
); err != v {
t.Fatalf("unexpected err %v", err)
}
}
func TestNewSingleClientOverride(t *testing.T) {
defer ShouldNotLeaked(SetupLeakDetection())
m1 := &mockConn{}
var m2 conn
if _, err := newSingleClient(
&ClientOption{InitAddress: []string{""}},
m1,
func(dst string, opt *ClientOption) conn {
return &mockConn{OverrideFn: func(c conn) { m2 = c }}
},
newRetryer(defaultRetryDelayFn),
); err != nil {
t.Fatalf("unexpected err %v", err)
}
if m2.(*mockConn) != m1 {
t.Fatalf("unexpected m2 %v", m2)
}
}
//gocyclo:ignore
func TestSingleClient(t *testing.T) {
defer ShouldNotLeaked(SetupLeakDetection())
m := &mockConn{
AddrFn: func() string { return "myaddr" },
}
client, err := newSingleClient(
&ClientOption{InitAddress: []string{""}},
m,
func(dst string, opt *ClientOption) conn { return m },
newRetryer(defaultRetryDelayFn),
)
if err != nil {
t.Fatalf("unexpected err %v", err)
}
t.Run("Nodes", func(t *testing.T) {
if nodes := client.Nodes(); len(nodes) != 1 || nodes["myaddr"] != client {
t.Fatalf("unexpected nodes")
}
})
t.Run("Mode", func(t *testing.T) {
if v := client.Mode(); v != ClientModeStandalone {
t.Fatalf("unexpected mode %v", v)
}
})
t.Run("Delegate Do", func(t *testing.T) {
c := client.B().Get().Key("Do").Build()
m.DoFn = func(cmd Completed) RedisResult {
if !reflect.DeepEqual(cmd.Commands(), c.Commands()) {
t.Fatalf("unexpected command %v", cmd)
}
return newResult(strmsg('+', "Do"), nil)
}
if v, err := client.Do(context.Background(), c).ToString(); err != nil || v != "Do" {
t.Fatalf("unexpected response %v %v", v, err)
}
})
t.Run("Delegate DoStream", func(t *testing.T) {
c := client.B().Get().Key("Do").Build()
m.DoStreamFn = func(cmd Completed) RedisResultStream {
return RedisResultStream{e: errors.New(cmd.Commands()[1])}
}
if s := client.DoStream(context.Background(), c); s.Error().Error() != "Do" {
t.Fatalf("unexpected response %v", s.Error())
}
})
t.Run("Delegate DoMulti", func(t *testing.T) {
c := client.B().Get().Key("Do").Build()
m.DoMultiFn = func(cmd ...Completed) *redisresults {
if !reflect.DeepEqual(cmd[0].Commands(), c.Commands()) {
t.Fatalf("unexpected command %v", cmd)
}
return &redisresults{s: []RedisResult{newResult(strmsg('+', "Do"), nil)}}
}
if len(client.DoMulti(context.Background())) != 0 {
t.Fatalf("unexpected response length")
}
if v, err := client.DoMulti(context.Background(), c)[0].ToString(); err != nil || v != "Do" {
t.Fatalf("unexpected response %v %v", v, err)
}
})
t.Run("Delegate DoMultiStream", func(t *testing.T) {
c := client.B().Get().Key("Do").Build()
m.DoMultiStreamFn = func(cmd ...Completed) MultiRedisResultStream {
return MultiRedisResultStream{e: errors.New(cmd[0].Commands()[1])}
}
if s := client.DoMultiStream(context.Background()); s.Error() != io.EOF {
t.Fatalf("unexpected response %v", err)
}
if s := client.DoMultiStream(context.Background(), c); s.Error().Error() != "Do" {
t.Fatalf("unexpected response %v", s.Error())
}
})
t.Run("Delegate DoCache", func(t *testing.T) {
c := client.B().Get().Key("DoCache").Cache()
m.DoCacheFn = func(cmd Cacheable, ttl time.Duration) RedisResult {
if !reflect.DeepEqual(cmd.Commands(), c.Commands()) || ttl != 100 {
t.Fatalf("unexpected command %v, %v", cmd, ttl)
}
return newResult(strmsg('+', "DoCache"), nil)
}
if v, err := client.DoCache(context.Background(), c, 100).ToString(); err != nil || v != "DoCache" {
t.Fatalf("unexpected response %v %v", v, err)
}
})
t.Run("Delegate DoMultiCache", func(t *testing.T) {
c := client.B().Get().Key("DoCache").Cache()
m.DoMultiCacheFn = func(multi ...CacheableTTL) *redisresults {
if !reflect.DeepEqual(multi[0].Cmd.Commands(), c.Commands()) || multi[0].TTL != 100 {
t.Fatalf("unexpected command %v, %v", multi[0].Cmd, multi[0].TTL)
}
return &redisresults{s: []RedisResult{newResult(strmsg('+', "DoCache"), nil)}}
}
if len(client.DoMultiCache(context.Background())) != 0 {
t.Fatalf("unexpected response length")
}
if v, err := client.DoMultiCache(context.Background(), CT(c, 100))[0].ToString(); err != nil || v != "DoCache" {
t.Fatalf("unexpected response %v %v", v, err)
}
})
t.Run("Delegate Receive", func(t *testing.T) {
c := client.B().Subscribe().Channel("ch").Build()
hdl := func(message PubSubMessage) {}
m.ReceiveFn = func(ctx context.Context, subscribe Completed, fn func(message PubSubMessage)) error {
if !reflect.DeepEqual(subscribe.Commands(), c.Commands()) {
t.Fatalf("unexpected command %v", subscribe)
}
return nil
}
if err := client.Receive(context.Background(), c, hdl); err != nil {
t.Fatalf("unexpected response %v", err)
}
})
t.Run("Delegate Receive Redis Err", func(t *testing.T) {
c := client.B().Subscribe().Channel("ch").Build()
e := &RedisError{}
m.ReceiveFn = func(ctx context.Context, subscribe Completed, fn func(message PubSubMessage)) error {
return e
}
if err := client.Receive(context.Background(), c, func(message PubSubMessage) {}); err != e {
t.Fatalf("unexpected response %v", err)
}
})
t.Run("Delegate Close", func(t *testing.T) {
called := false
m.CloseFn = func() { called = true }
client.Close()
if !called {
t.Fatalf("Close is not delegated")
}
})
t.Run("Dedicated Err", func(t *testing.T) {
v := errors.New("fn err")
if err := client.Dedicated(func(client DedicatedClient) error {
return v
}); err != v {
t.Fatalf("unexpected err %v", err)
}
})
t.Run("Dedicated Delegate Receive Redis Err", func(t *testing.T) {
e := &RedisError{}
w := &mockWire{
ReceiveFn: func(ctx context.Context, subscribe Completed, fn func(message PubSubMessage)) error {
return e
},
}
m.AcquireFn = func() wire {
return w
}
if err := client.Dedicated(func(c DedicatedClient) error {
return c.Receive(context.Background(), c.B().Subscribe().Channel("a").Build(), func(msg PubSubMessage) {})
}); err != e {
t.Fatalf("unexpected err %v", err)
}
})
t.Run("Dedicated Delegate", func(t *testing.T) {
closed := false
w := &mockWire{
DoFn: func(cmd Completed) RedisResult {
return newResult(strmsg('+', "Delegate"), nil)
},
DoMultiFn: func(cmd ...Completed) *redisresults {
return &redisresults{s: []RedisResult{newResult(strmsg('+', "Delegate"), nil)}}
},
ReceiveFn: func(ctx context.Context, subscribe Completed, fn func(message PubSubMessage)) error {
return ErrClosing
},
SetPubSubHooksFn: func(hooks PubSubHooks) <-chan error {
ch := make(chan error, 1)
ch <- ErrClosing
close(ch)
return ch
},
ErrorFn: func() error {
return ErrClosing
},
CloseFn: func() {
closed = true
},
}
m.AcquireFn = func() wire {
return w
}
stored := false
m.StoreFn = func(ww wire) {
if ww != w {
t.Fatalf("received unexpected wire %v", ww)
}
stored = true
}
if err := client.Dedicated(func(c DedicatedClient) error {
if v, err := c.Do(context.Background(), c.B().Get().Key("a").Build()).ToString(); err != nil || v != "Delegate" {
t.Fatalf("unexpected response %v %v", v, err)
}
if v := c.DoMulti(context.Background()); len(v) != 0 {
t.Fatalf("received unexpected response %v", v)
}
for _, resp := range c.DoMulti(context.Background(), c.B().Get().Key("a").Build()) {
if v, err := resp.ToString(); err != nil || v != "Delegate" {
t.Fatalf("unexpected response %v %v", v, err)
}
}
if err := c.Receive(context.Background(), c.B().Subscribe().Channel("a").Build(), func(msg PubSubMessage) {}); err != ErrClosing {
t.Fatalf("unexpected ret %v", err)
}
if err := <-c.SetPubSubHooks(PubSubHooks{}); err != ErrClosing {
t.Fatalf("unexpected ret %v", err)
}
c.Close()
return nil
}); err != nil {
t.Fatalf("unexpected err %v", err)
}
if !stored {
t.Fatalf("Dedicated desn't put back the wire")
}
if !closed {
t.Fatalf("Dedicated desn't delegate Close")
}
})
t.Run("Dedicate Delegate", func(t *testing.T) {
closed := false
w := &mockWire{
DoFn: func(cmd Completed) RedisResult {
return newResult(strmsg('+', "Delegate"), nil)
},
DoMultiFn: func(cmd ...Completed) *redisresults {
return &redisresults{s: []RedisResult{newResult(strmsg('+', "Delegate"), nil)}}
},
ReceiveFn: func(ctx context.Context, subscribe Completed, fn func(message PubSubMessage)) error {
return ErrClosing
},
SetPubSubHooksFn: func(hooks PubSubHooks) <-chan error {
ch := make(chan error, 1)
ch <- ErrClosing
close(ch)
return ch
},
ErrorFn: func() error {
return ErrClosing
},
CloseFn: func() {
closed = true
},
}
m.AcquireFn = func() wire {
return w
}
stored := false
m.StoreFn = func(ww wire) {
if ww != w {
t.Fatalf("received unexpected wire %v", ww)
}
stored = true
}
c, cancel := client.Dedicate()
if v, err := c.Do(context.Background(), c.B().Get().Key("a").Build()).ToString(); err != nil || v != "Delegate" {
t.Fatalf("unexpected response %v %v", v, err)
}
if v := c.DoMulti(context.Background()); len(v) != 0 {
t.Fatalf("received unexpected response %v", v)
}
for _, resp := range c.DoMulti(context.Background(), c.B().Get().Key("a").Build()) {
if v, err := resp.ToString(); err != nil || v != "Delegate" {
t.Fatalf("unexpected response %v %v", v, err)
}
}
if err := c.Receive(context.Background(), c.B().Subscribe().Channel("a").Build(), func(msg PubSubMessage) {}); err != ErrClosing {
t.Fatalf("unexpected ret %v", err)
}
if err := <-c.SetPubSubHooks(PubSubHooks{}); err != ErrClosing {
t.Fatalf("unexpected ret %v", err)
}
c.Close()
cancel()
if !stored {
t.Fatalf("Dedicated desn't put back the wire")
}
if !closed {
t.Fatalf("Dedicated desn't delegate Close")
}
})
t.Run("Dedicate Delegate Release On Close", func(t *testing.T) {
stored := 0
w := &mockWire{}
m.AcquireFn = func() wire { return w }
m.StoreFn = func(ww wire) { stored++ }
c, _ := client.Dedicate()
c.Close()
if stored != 1 {
t.Fatalf("unexpected stored count %v", stored)
}
})
t.Run("Dedicate Delegate No Duplicate Release", func(t *testing.T) {
stored := 0
w := &mockWire{}
m.AcquireFn = func() wire { return w }
m.StoreFn = func(ww wire) { stored++ }
c, cancel := client.Dedicate()
c.Close()
c.Close() // should have no effect
cancel() // should have no effect
cancel() // should have no effect
if stored != 1 {
t.Fatalf("unexpected stored count %v", stored)
}
})
t.Run("Dedicate ErrDedicatedClientRecycled after released", func(t *testing.T) {
m.AcquireFn = func() wire { return &mockWire{} }
check := func(err error) {
if !errors.Is(err, ErrDedicatedClientRecycled) {
t.Fatalf("unexpected err %v", err)
}
}
for _, closeFn := range []func(client DedicatedClient, cancel func()){
func(client DedicatedClient, cancel func()) {
client.Close()
},
func(client DedicatedClient, cancel func()) {
cancel()
},
} {
c, cancel := client.Dedicate()
closeFn(c, cancel)
for _, fn := range []func(){
func() {
resp := c.Do(context.Background(), c.B().Get().Key("k").Build())
check(resp.Error())
},
func() {
resp := c.DoMulti(context.Background(), c.B().Get().Key("k").Build())
for _, r := range resp {
check(r.Error())
}
},
func() {
err := c.Receive(context.Background(), c.B().Subscribe().Channel("k").Build(), func(msg PubSubMessage) {})
check(err)
},
func() {
ch := c.SetPubSubHooks(PubSubHooks{})
check(<-ch)
},
} {
fn()
}
}
})
}
func TestSingleClientRetry(t *testing.T) {
defer ShouldNotLeaked(SetupLeakDetection())
SetupClientRetry(t, func(m *mockConn) Client {
c, err := newSingleClient(
&ClientOption{InitAddress: []string{""}},
m,
func(dst string, opt *ClientOption) conn { return m },
newRetryer(defaultRetryDelayFn),
)
if err != nil {
t.Fatalf("unexpected err %v", err)
}
return c
})
}
//gocyclo:ignore
func SetupClientRetry(t *testing.T, fn func(mock *mockConn) Client) {
setup := func() (Client, *mockConn) {
m := &mockConn{}
return fn(m), m
}
makeDoFn := func(results ...RedisResult) func(cmd Completed) RedisResult {
count := -1
return func(cmd Completed) RedisResult {
count++
return results[count]
}
}
makeDoCacheFn := func(results ...RedisResult) func(cmd Cacheable, ttl time.Duration) RedisResult {
count := -1
return func(cmd Cacheable, ttl time.Duration) RedisResult {
count++
return results[count]
}
}
makeReceiveFn := func(results ...error) func(ctx context.Context, subscribe Completed, fn func(message PubSubMessage)) error {
count := -1
return func(ctx context.Context, subscribe Completed, fn func(message PubSubMessage)) error {
count++
return results[count]
}
}
makeDoMultiFn := func(results ...[]RedisResult) func(multi ...Completed) *redisresults {
count := -1
return func(multi ...Completed) *redisresults {
count++
return &redisresults{s: results[count]}
}
}
makeDoMultiCacheFn := func(results ...[]RedisResult) func(multi ...CacheableTTL) *redisresults {
count := -1
return func(multi ...CacheableTTL) *redisresults {
count++
return &redisresults{s: results[count]}
}
}
t.Run("Delegate Do ReadOnly Retry", func(t *testing.T) {
c, m := setup()
m.DoFn = makeDoFn(
newErrResult(ErrClosing),
newResult(strmsg('+', "Do"), nil),
)
if v, err := c.Do(context.Background(), c.B().Get().Key("Do").Build()).ToString(); err != nil || v != "Do" {
t.Fatalf("unexpected response %v %v", v, err)
}
})
t.Run("Delegate Do ReadOnly NoRetry - closed", func(t *testing.T) {
c, m := setup()
m.DoFn = makeDoFn(newErrResult(ErrClosing))
c.Close()
if v, err := c.Do(context.Background(), c.B().Get().Key("Do").Build()).ToString(); err != ErrClosing {
t.Fatalf("unexpected response %v %v", v, err)
}
})
t.Run("Delegate Do ReadOnly NoRetry - ctx done", func(t *testing.T) {
c, m := setup()
m.DoFn = makeDoFn(newErrResult(ErrClosing))
ctx, cancel := context.WithCancel(context.Background())
cancel()
if v, err := c.Do(ctx, c.B().Get().Key("Do").Build()).ToString(); err != ErrClosing {
t.Fatalf("unexpected response %v %v", v, err)
}
})
t.Run("Delegate Do ReadOnly NoRetry - not retryable", func(t *testing.T) {
c, m := setup()
if cli, ok := c.(*sentinelClient); ok {
cli.retryHandler = &mockRetryHandler{
WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, _ Completed, err error) bool {
return false
},
}
}
if cli, ok := c.(*clusterClient); ok {
cli.retryHandler = &mockRetryHandler{
WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, _ Completed, err error) bool {
return false
},
}
}
if cli, ok := c.(*singleClient); ok {
cli.retryHandler = &mockRetryHandler{
WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, _ Completed, err error) bool {
return false
},
}
}
m.DoFn = makeDoFn(
newErrResult(ErrClosing),
newResult(strmsg('+', "Do"), nil),
)
if v, err := c.Do(context.Background(), c.B().Get().Key("Do").Build()).ToString(); !errors.Is(err, ErrClosing) {
t.Fatalf("unexpected response %v %v", v, err)
}
})
t.Run("Delegate Do Write NoRetry", func(t *testing.T) {
c, m := setup()
m.DoFn = makeDoFn(newErrResult(ErrClosing))
if v, err := c.Do(context.Background(), c.B().Set().Key("Do").Value("V").Build()).ToString(); err != ErrClosing {
t.Fatalf("unexpected response %v %v", v, err)
}
})
t.Run("Delegate DoMulti ReadOnly Retry", func(t *testing.T) {
c, m := setup()
m.DoMultiFn = makeDoMultiFn(
[]RedisResult{newErrResult(ErrClosing)},
[]RedisResult{newResult(strmsg('+', "Do"), nil)},
)
if v, err := c.DoMulti(context.Background(), c.B().Get().Key("Do").Build())[0].ToString(); err != nil || v != "Do" {
t.Fatalf("unexpected response %v %v", v, err)
}
})
t.Run("Delegate DoMulti ReadOnly NoRetry - closed", func(t *testing.T) {
c, m := setup()
m.DoMultiFn = makeDoMultiFn([]RedisResult{newErrResult(ErrClosing)})
c.Close()
if v, err := c.DoMulti(context.Background(), c.B().Get().Key("Do").Build())[0].ToString(); err != ErrClosing {
t.Fatalf("unexpected response %v %v", v, err)
}
})
t.Run("Delegate DoMulti ReadOnly NoRetry - ctx done", func(t *testing.T) {
c, m := setup()
m.DoMultiFn = makeDoMultiFn([]RedisResult{newErrResult(ErrClosing)})
ctx, cancel := context.WithCancel(context.Background())
cancel()
if v, err := c.DoMulti(ctx, c.B().Get().Key("Do").Build())[0].ToString(); err != ErrClosing {
t.Fatalf("unexpected response %v %v", v, err)
}
})
t.Run("Delegate DoMulti ReadOnly NoRetry - not retryable", func(t *testing.T) {
c, m := setup()
if cli, ok := c.(*sentinelClient); ok {
cli.retryHandler = &mockRetryHandler{
WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, _ Completed, err error) bool {
return false
},
}
}
if cli, ok := c.(*clusterClient); ok {
cli.retryHandler = &mockRetryHandler{
WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, _ Completed, err error) bool {
return false
},
RetryDelayFn: func(attempts int, _ Completed, err error) time.Duration {
return -1
},
WaitForRetryFn: func(ctx context.Context, duration time.Duration) {
if duration != -1 {
t.Fatalf("unexpected duration %v", duration)
}
},
}
}
if cli, ok := c.(*singleClient); ok {
cli.retryHandler = &mockRetryHandler{
WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, _ Completed, err error) bool {
return false
},
}
}
m.DoMultiFn = makeDoMultiFn(
[]RedisResult{newErrResult(ErrClosing)},
[]RedisResult{newResult(strmsg('+', "Do"), nil)},
)
if v, err := c.DoMulti(context.Background(), c.B().Get().Key("Do").Build())[0].ToString(); !errors.Is(err, ErrClosing) {
t.Fatalf("unexpected response %v %v", v, err)
}
})
t.Run("Delegate DoMulti Write NoRetry", func(t *testing.T) {
c, m := setup()
m.DoMultiFn = makeDoMultiFn([]RedisResult{newErrResult(ErrClosing)})
if v, err := c.DoMulti(context.Background(), c.B().Set().Key("Do").Value("V").Build())[0].ToString(); err != ErrClosing {
t.Fatalf("unexpected response %v %v", v, err)
}
})
t.Run("Delegate DoCache Retry", func(t *testing.T) {
c, m := setup()
m.DoCacheFn = makeDoCacheFn(
newErrResult(ErrClosing),
newResult(strmsg('+', "Do"), nil),
)
if v, err := c.DoCache(context.Background(), c.B().Get().Key("Do").Cache(), 0).ToString(); err != nil || v != "Do" {
t.Fatalf("unexpected response %v %v", v, err)
}
})
t.Run("Delegate DoCache NoRetry - closed", func(t *testing.T) {
c, m := setup()
m.DoCacheFn = makeDoCacheFn(newErrResult(ErrClosing))
c.Close()
if v, err := c.DoCache(context.Background(), c.B().Get().Key("Do").Cache(), 0).ToString(); err != ErrClosing {
t.Fatalf("unexpected response %v %v", v, err)
}
})
t.Run("Delegate DoCache ReadOnly NoRetry - ctx done", func(t *testing.T) {
c, m := setup()
m.DoCacheFn = makeDoCacheFn(newErrResult(ErrClosing))
ctx, cancel := context.WithCancel(context.Background())
cancel()
if v, err := c.DoCache(ctx, c.B().Get().Key("Do").Cache(), 0).ToString(); err != ErrClosing {
t.Fatalf("unexpected response %v %v", v, err)
}
})
t.Run("Delegate DoCache ReadOnly NoRetry - not retryable", func(t *testing.T) {
c, m := setup()
if cli, ok := c.(*sentinelClient); ok {
cli.retryHandler = &mockRetryHandler{
WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, _ Completed, err error) bool {
return false
},
}
}
if cli, ok := c.(*clusterClient); ok {
cli.retryHandler = &mockRetryHandler{
WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, _ Completed, err error) bool {
return false
},
}
}
if cli, ok := c.(*singleClient); ok {
cli.retryHandler = &mockRetryHandler{
WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, _ Completed, err error) bool {
return false
},
}
}
m.DoCacheFn = makeDoCacheFn(
newErrResult(ErrClosing),
newResult(strmsg('+', "Do"), nil),
)
if v, err := c.DoCache(context.Background(), c.B().Get().Key("Do").Cache(), 0).ToString(); !errors.Is(err, ErrClosing) {
t.Fatalf("unexpected response %v %v", v, err)
}
})
t.Run("Delegate DoMultiCache Retry", func(t *testing.T) {
c, m := setup()
m.DoMultiCacheFn = makeDoMultiCacheFn(
[]RedisResult{newErrResult(ErrClosing)},
[]RedisResult{newResult(strmsg('+', "Do"), nil)},
)
if v, err := c.DoMultiCache(context.Background(), CT(c.B().Get().Key("Do").Cache(), 0))[0].ToString(); err != nil || v != "Do" {
t.Fatalf("unexpected response %v %v", v, err)
}
})
t.Run("Delegate DoMultiCache NoRetry - closed", func(t *testing.T) {
c, m := setup()
m.DoMultiCacheFn = makeDoMultiCacheFn([]RedisResult{newErrResult(ErrClosing)})
c.Close()
if v, err := c.DoMultiCache(context.Background(), CT(c.B().Get().Key("Do").Cache(), 0))[0].ToString(); err != ErrClosing {
t.Fatalf("unexpected response %v %v", v, err)
}
})
t.Run("Delegate DoMultiCache ReadOnly NoRetry - ctx done", func(t *testing.T) {
c, m := setup()
m.DoMultiCacheFn = makeDoMultiCacheFn([]RedisResult{newErrResult(ErrClosing)})
ctx, cancel := context.WithCancel(context.Background())
cancel()
if v, err := c.DoMultiCache(ctx, CT(c.B().Get().Key("Do").Cache(), 0))[0].ToString(); err != ErrClosing {
t.Fatalf("unexpected response %v %v", v, err)
}
})
t.Run("Delegate DoMultiCache ReadOnly NoRetry - not retryable", func(t *testing.T) {
c, m := setup()
if cli, ok := c.(*sentinelClient); ok {
cli.retryHandler = &mockRetryHandler{
WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, _ Completed, err error) bool {
return false
},
}
}
if cli, ok := c.(*clusterClient); ok {
cli.retryHandler = &mockRetryHandler{
WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, _ Completed, err error) bool {
return false
},
RetryDelayFn: func(attempts int, _ Completed, err error) time.Duration {
return -1
},
WaitForRetryFn: func(ctx context.Context, duration time.Duration) {
if duration != -1 {
t.Fatalf("unexpected duration %v", duration)
}
},
}
}
if cli, ok := c.(*singleClient); ok {
cli.retryHandler = &mockRetryHandler{
WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, _ Completed, err error) bool {
return false
},
}
}
m.DoMultiCacheFn = makeDoMultiCacheFn(
[]RedisResult{newErrResult(ErrClosing)},
[]RedisResult{newResult(strmsg('+', "Do"), nil)},
)
if v, err := c.DoMultiCache(context.Background(), CT(c.B().Get().Key("Do").Cache(), 0))[0].ToString(); !errors.Is(err, ErrClosing) {
t.Fatalf("unexpected response %v %v", v, err)
}
})
t.Run("Delegate Receive Retry", func(t *testing.T) {
c, m := setup()
m.ReceiveFn = makeReceiveFn(ErrClosing, nil)
if err := c.Receive(context.Background(), c.B().Subscribe().Channel("ch").Build(), nil); err != nil {
t.Fatalf("unexpected response %v", err)
}
})
t.Run("Delegate Receive NoRetry - closed", func(t *testing.T) {
c, m := setup()
m.ReceiveFn = makeReceiveFn(ErrClosing)
c.Close()
if err := c.Receive(context.Background(), c.B().Subscribe().Channel("ch").Build(), nil); err != ErrClosing {
t.Fatalf("unexpected response %v", err)
}
})
t.Run("Delegate Receive NoRetry - ctx done", func(t *testing.T) {
c, m := setup()
m.ReceiveFn = makeReceiveFn(ErrClosing)
ctx, cancel := context.WithCancel(context.Background())
cancel()
if err := c.Receive(ctx, c.B().Subscribe().Channel("ch").Build(), nil); err != ErrClosing {
t.Fatalf("unexpected response %v", err)
}
})
t.Run("Delegate Receive NoRetry - not retryable", func(t *testing.T) {
c, m := setup()
if cli, ok := c.(*sentinelClient); ok {
cli.retryHandler = &mockRetryHandler{
WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, _ Completed, err error) bool {
return false
},
}
}
if cli, ok := c.(*clusterClient); ok {
cli.retryHandler = &mockRetryHandler{
WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, _ Completed, err error) bool {
return false
},
}
}
if cli, ok := c.(*singleClient); ok {
cli.retryHandler = &mockRetryHandler{
WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, _ Completed, err error) bool {
return false
},
}
}
m.ReceiveFn = makeReceiveFn(ErrClosing, nil)
if err := c.Receive(context.Background(), c.B().Subscribe().Channel("ch").Build(), nil); !errors.Is(err, ErrClosing) {
t.Fatalf("unexpected response %v", err)
}
})
t.Run("Dedicate Delegate Do ReadOnly Retry", func(t *testing.T) {
c, m := setup()
m.DoFn = makeDoFn(
newErrResult(ErrClosing),
newResult(strmsg('+', "Do"), nil),
)
m.AcquireFn = func() wire { return &mockWire{DoFn: m.DoFn} }
if ret := c.Dedicated(func(cc DedicatedClient) error {
if v, err := cc.Do(context.Background(), c.B().Get().Key("Do").Build()).ToString(); err != nil || v != "Do" {
t.Fatalf("unexpected response %v %v", v, err)
}
return errors.New("done")
}); ret.Error() != "done" {
t.Fatalf("Dedicated not executed")
}
})
t.Run("Dedicate Delegate Do ReadOnly NoRetry - broken", func(t *testing.T) {
c, m := setup()
m.DoFn = makeDoFn(newErrResult(ErrClosing))
m.ErrorFn = func() error { return ErrClosing }
m.AcquireFn = func() wire { return &mockWire{DoFn: m.DoFn, ErrorFn: m.ErrorFn} }
if ret := c.Dedicated(func(cc DedicatedClient) error {
return cc.Do(context.Background(), c.B().Get().Key("Do").Build()).Error()
}); ret != ErrClosing {
t.Fatalf("unexpected response %v", ret)
}
})
t.Run("Dedicate Delegate Do ReadOnly NoRetry - ctx done", func(t *testing.T) {
c, m := setup()
m.DoFn = makeDoFn(newErrResult(ErrClosing))
m.AcquireFn = func() wire { return &mockWire{DoFn: m.DoFn} }
ctx, cancel := context.WithCancel(context.Background())
cancel()
if ret := c.Dedicated(func(cc DedicatedClient) error {
return cc.Do(ctx, c.B().Get().Key("Do").Build()).Error()
}); ret != ErrClosing {
t.Fatalf("unexpected response %v", ret)
}
})
t.Run("Dedicate Delegate Do ReadOnly NoRetry - not retryable", func(t *testing.T) {
c, m := setup()
m.DoFn = makeDoFn(
newErrResult(ErrClosing),
newResult(strmsg('+', "Do"), nil),
)
m.AcquireFn = func() wire { return &mockWire{DoFn: m.DoFn} }
if ret := c.Dedicated(func(cc DedicatedClient) error {
if cli, ok := cc.(*dedicatedClusterClient); ok {
cli.retryHandler = &mockRetryHandler{
WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, _ Completed, err error) bool {
return false
},
}
}
if cli, ok := cc.(*dedicatedSingleClient); ok {
cli.retryHandler = &mockRetryHandler{
WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, _ Completed, err error) bool {
return false
},
}
}
if v, err := cc.Do(context.Background(), c.B().Get().Key("Do").Build()).ToString(); !errors.Is(err, ErrClosing) {
t.Fatalf("unexpected response %v %v", v, err)
}
return errors.New("done")
}); ret.Error() != "done" {
t.Fatalf("Dedicated not executed")
}
})
t.Run("Dedicate Delegate Do Write NoRetry", func(t *testing.T) {
c, m := setup()
m.DoFn = makeDoFn(newErrResult(ErrClosing))
m.AcquireFn = func() wire { return &mockWire{DoFn: m.DoFn} }
if ret := c.Dedicated(func(cc DedicatedClient) error {
return cc.Do(context.Background(), c.B().Set().Key("Do").Value("Do").Build()).Error()
}); ret != ErrClosing {
t.Fatalf("unexpected response %v", ret)
}
})
t.Run("Dedicate Delegate DoMulti ReadOnly Retry", func(t *testing.T) {
c, m := setup()
m.DoMultiFn = makeDoMultiFn(
[]RedisResult{newErrResult(ErrClosing)},
[]RedisResult{newResult(strmsg('+', "Do"), nil)},
)
m.AcquireFn = func() wire { return &mockWire{DoMultiFn: m.DoMultiFn} }
if ret := c.Dedicated(func(cc DedicatedClient) error {
if v, err := cc.DoMulti(context.Background(), c.B().Get().Key("Do").Build())[0].ToString(); err != nil || v != "Do" {
t.Fatalf("unexpected response %v %v", v, err)
}
return errors.New("done")
}); ret.Error() != "done" {
t.Fatalf("Dedicated not executed")
}
})
t.Run("Dedicate Delegate DoMulti ReadOnly NoRetry - broken", func(t *testing.T) {
c, m := setup()
m.DoMultiFn = makeDoMultiFn([]RedisResult{newErrResult(ErrClosing)})
m.ErrorFn = func() error { return ErrClosing }
m.AcquireFn = func() wire { return &mockWire{DoMultiFn: m.DoMultiFn, ErrorFn: m.ErrorFn} }
if ret := c.Dedicated(func(cc DedicatedClient) error {
return cc.DoMulti(context.Background(), c.B().Get().Key("Do").Build())[0].Error()
}); ret != ErrClosing {
t.Fatalf("unexpected response %v", ret)
}
})
t.Run("Dedicate Delegate DoMulti ReadOnly NoRetry - ctx done", func(t *testing.T) {
c, m := setup()
m.DoMultiFn = makeDoMultiFn([]RedisResult{newErrResult(ErrClosing)})
m.AcquireFn = func() wire { return &mockWire{DoMultiFn: m.DoMultiFn} }
ctx, cancel := context.WithCancel(context.Background())
cancel()
if ret := c.Dedicated(func(cc DedicatedClient) error {
return cc.DoMulti(ctx, c.B().Get().Key("Do").Build())[0].Error()
}); ret != ErrClosing {
t.Fatalf("unexpected response %v", ret)
}
})
t.Run("Dedicate Delegate DoMulti ReadOnly NoRetry - not retryable", func(t *testing.T) {
c, m := setup()
m.DoMultiFn = makeDoMultiFn(
[]RedisResult{newErrResult(ErrClosing)},
[]RedisResult{newResult(strmsg('+', "Do"), nil)},
)
m.AcquireFn = func() wire { return &mockWire{DoMultiFn: m.DoMultiFn} }
if ret := c.Dedicated(func(cc DedicatedClient) error {
if cli, ok := cc.(*dedicatedClusterClient); ok {
cli.retryHandler = &mockRetryHandler{
WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, _ Completed, err error) bool {
return false
},
}
}
if cli, ok := cc.(*dedicatedSingleClient); ok {
cli.retryHandler = &mockRetryHandler{
WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, _ Completed, err error) bool {
return false
},
}
}
if v, err := cc.DoMulti(context.Background(), c.B().Get().Key("Do").Build())[0].ToString(); !errors.Is(err, ErrClosing) {
t.Fatalf("unexpected response %v %v", v, err)
}
return errors.New("done")
}); ret.Error() != "done" {
t.Fatalf("Dedicated not executed")
}
})
t.Run("Dedicate Delegate DoMulti Write NoRetry", func(t *testing.T) {
c, m := setup()
m.DoMultiFn = makeDoMultiFn([]RedisResult{newErrResult(ErrClosing)})
m.AcquireFn = func() wire { return &mockWire{DoMultiFn: m.DoMultiFn} }
if ret := c.Dedicated(func(cc DedicatedClient) error {
return cc.DoMulti(context.Background(), c.B().Set().Key("Do").Value("Do").Build())[0].Error()
}); ret != ErrClosing {
t.Fatalf("unexpected response %v", ret)
}
})
t.Run("Delegate Receive Retry", func(t *testing.T) {
c, m := setup()
m.ReceiveFn = makeReceiveFn(ErrClosing, nil)
m.AcquireFn = func() wire { return &mockWire{ReceiveFn: m.ReceiveFn} }
if ret := c.Dedicated(func(cc DedicatedClient) error {
if err := cc.Receive(context.Background(), c.B().Subscribe().Channel("Do").Build(), nil); err != nil {
t.Fatalf("unexpected response %v", err)
}
return errors.New("done")
}); ret.Error() != "done" {
t.Fatalf("Dedicated not executed")
}
})
t.Run("Delegate Receive NoRetry - broken", func(t *testing.T) {
c, m := setup()
m.ReceiveFn = makeReceiveFn(ErrClosing)
m.ErrorFn = func() error { return ErrClosing }
m.AcquireFn = func() wire { return &mockWire{ReceiveFn: m.ReceiveFn, ErrorFn: m.ErrorFn} }
if ret := c.Dedicated(func(cc DedicatedClient) error {
return cc.Receive(context.Background(), c.B().Subscribe().Channel("Do").Build(), nil)
}); ret != ErrClosing {
t.Fatalf("unexpected response %v", ret)
}
})
t.Run("Delegate Receive NoRetry - ctx done", func(t *testing.T) {
c, m := setup()
m.ReceiveFn = makeReceiveFn(ErrClosing)
m.AcquireFn = func() wire { return &mockWire{ReceiveFn: m.ReceiveFn} }
ctx, cancel := context.WithCancel(context.Background())
cancel()
if ret := c.Dedicated(func(cc DedicatedClient) error {
return cc.Receive(ctx, c.B().Subscribe().Channel("Do").Build(), nil)
}); ret != ErrClosing {
t.Fatalf("unexpected response %v", ret)
}
})
t.Run("Delegate Receive NoRetry - not retryable", func(t *testing.T) {
c, m := setup()
m.ReceiveFn = makeReceiveFn(ErrClosing, nil)
m.AcquireFn = func() wire { return &mockWire{ReceiveFn: m.ReceiveFn} }
if ret := c.Dedicated(func(cc DedicatedClient) error {
if cli, ok := cc.(*dedicatedClusterClient); ok {
cli.retryHandler = &mockRetryHandler{
WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, _ Completed, err error) bool {
return false
},
}
}
if cli, ok := cc.(*dedicatedSingleClient); ok {
cli.retryHandler = &mockRetryHandler{
WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, _ Completed, err error) bool {
return false
},
}
}
if err := cc.Receive(context.Background(), c.B().Subscribe().Channel("Do").Build(), nil); !errors.Is(err, ErrClosing) {
t.Fatalf("unexpected response %v", err)
}
return errors.New("done")
}); ret.Error() != "done" {
t.Fatalf("Dedicated not executed")
}
})
}
func TestSingleClientLoadingRetry(t *testing.T) {
defer ShouldNotLeaked(SetupLeakDetection())
setup := func() (*singleClient, *mockConn) {
m := &mockConn{}
client, err := newSingleClient(
&ClientOption{InitAddress: []string{""}},
m,
func(dst string, opt *ClientOption) conn { return m },
newRetryer(defaultRetryDelayFn),
)
if err != nil {
t.Fatalf("unexpected err %v", err)
}
return client, m
}
t.Run("Do Retry on Loading", func(t *testing.T) {
client, m := setup()
attempts := 0
m.DoFn = func(cmd Completed) RedisResult {
attempts++
if attempts == 1 {
return newResult(strmsg('-', "LOADING Redis is loading the dataset in memory"), nil)
}
return newResult(strmsg('+', "OK"), nil)
}
if v, err := client.Do(context.Background(), client.B().Get().Key("test").Build()).ToString(); err != nil || v != "OK" {
t.Fatalf("unexpected response %v %v", v, err)
}
if attempts != 2 {
t.Fatalf("expected 2 attempts, got %v", attempts)
}
})
t.Run("Do not retry on non-loading errors", func(t *testing.T) {
client, m := setup()
attempts := 0
m.DoFn = func(cmd Completed) RedisResult {
attempts++
if attempts == 1 {
return newResult(strmsg('-', "ERR some other error"), nil)
}
return newResult(strmsg('+', "OK"), nil)
}
if err := client.Do(context.Background(), client.B().Get().Key("test").Build()).Error(); err == nil {
t.Fatal("expected error but got nil")
}
if attempts != 1 {
t.Fatalf("unexpected attempts %v, expected no retry", attempts)
}
})
t.Run("DoMulti Retry on Loading", func(t *testing.T) {
client, m := setup()
attempts := 0
m.DoMultiFn = func(multi ...Completed) *redisresults {
attempts++
if attempts == 1 {
return &redisresults{s: []RedisResult{newResult(strmsg('-', "LOADING Redis is loading the dataset in memory"), nil)}}
}
return &redisresults{s: []RedisResult{newResult(strmsg('+', "OK"), nil)}}
}
cmd := client.B().Get().Key("test").Build()
resps := client.DoMulti(context.Background(), cmd)
if len(resps) != 1 {
t.Fatalf("unexpected response length %v", len(resps))
}
if v, err := resps[0].ToString(); err != nil || v != "OK" {
t.Fatalf("unexpected response %v %v", v, err)
}
})
t.Run("DoCache Retry on Loading", func(t *testing.T) {
client, m := setup()
attempts := 0
m.DoCacheFn = func(cmd Cacheable, ttl time.Duration) RedisResult {
attempts++
if attempts == 1 {
return newResult(strmsg('-', "LOADING Redis is loading the dataset in memory"), nil)
}
return newResult(strmsg('+', "OK"), nil)
}
cmd := client.B().Get().Key("test").Cache()
if v, err := client.DoCache(context.Background(), cmd, time.Minute).ToString(); err != nil || v != "OK" {
t.Fatalf("unexpected response %v %v", v, err)
}
})
t.Run("DoMultiCache Retry on Loading", func(t *testing.T) {
client, m := setup()
attempts := 0
m.DoMultiCacheFn = func(multi ...CacheableTTL) *redisresults {
attempts++
if attempts == 1 {
return &redisresults{s: []RedisResult{newResult(strmsg('-', "LOADING Redis is loading the dataset in memory"), nil)}}
}
return &redisresults{s: []RedisResult{newResult(strmsg('+', "OK"), nil)}}
}
cmd := client.B().Get().Key("test").Cache()
resps := client.DoMultiCache(context.Background(), CT(cmd, time.Minute))
if len(resps) != 1 {
t.Fatalf("unexpected response length %v", len(resps))
}
if v, err := resps[0].ToString(); err != nil || v != "OK" {
t.Fatalf("unexpected response %v %v", v, err)
}
})
t.Run("Dedicated Do Retry on Loading", func(t *testing.T) {
client, m := setup()
attempts := 0
m.DoFn = func(cmd Completed) RedisResult {
attempts++
if attempts == 1 {
return newResult(strmsg('-', "LOADING Redis is loading the dataset in memory"), nil)
}
return newResult(strmsg('+', "OK"), nil)
}
m.AcquireFn = func() wire { return &mockWire{DoFn: m.DoFn} }
err := client.Dedicated(func(c DedicatedClient) error {
if v, err := c.Do(context.Background(), c.B().Get().Key("test").Build()).ToString(); err != nil || v != "OK" {
t.Fatalf("unexpected response %v %v", v, err)
}
return nil
})
if err != nil {
t.Fatalf("unexpected err %v", err)
}
})
t.Run("Dedicated DoMulti Retry on Loading", func(t *testing.T) {
client, m := setup()
attempts := 0
m.DoMultiFn = func(multi ...Completed) *redisresults {
attempts++
if attempts == 1 {
return &redisresults{s: []RedisResult{newResult(strmsg('-', "LOADING Redis is loading the dataset in memory"), nil)}}
}
return &redisresults{s: []RedisResult{newResult(strmsg('+', "OK"), nil)}}
}
m.AcquireFn = func() wire { return &mockWire{DoMultiFn: m.DoMultiFn} }
err := client.Dedicated(func(c DedicatedClient) error {
resps := c.DoMulti(context.Background(), c.B().Get().Key("test").Build())
if len(resps) != 1 {
t.Fatalf("unexpected response length %v", len(resps))
}
if v, err := resps[0].ToString(); err != nil || v != "OK" {
t.Fatalf("unexpected response %v %v", v, err)
}
return nil
})
if err != nil {
t.Fatalf("unexpected err %v", err)
}
})
}
func BenchmarkSingleClient_DoCache(b *testing.B) {
ctx := context.Background()
client, err := NewClient(ClientOption{InitAddress: []string{"127.0.0.1:6379"}, Dialer: net.Dialer{KeepAlive: -1}})
if err != nil {
b.Fatal(err)
}
keys := make([]string, 10000)
for i := 0; i < 10000; i++ {
keys[i] = strconv.Itoa(i)
}
mset := client.B().Mset().KeyValue()
for _, v := range keys {
mset = mset.KeyValue(v, v)
}
if err := client.Do(ctx, mset.Build()).Error(); err != nil {
b.Fatal(err)
}
b.Run("NoCache", func(b *testing.B) {
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
if err := client.Do(ctx, client.B().Mget().Key(keys...).Build()).Error(); err != nil {
b.Errorf("unexpected %v", err)
}
}
})
b.StopTimer()
})
b.Run("DoCache", func(b *testing.B) {
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
if err := client.DoCache(ctx, client.B().Mget().Key(keys...).Cache(), time.Minute).Error(); err != nil {
b.Errorf("unexpected %v", err)
}
}
})
b.StopTimer()
})
client.Close()
}

Опубликовать ( 0 )

Вы можете оставить комментарий после Вход в систему

1
https://api.gitlife.ru/oschina-mirror/mirrors-rueidis.git
git@api.gitlife.ru:oschina-mirror/mirrors-rueidis.git
oschina-mirror
mirrors-rueidis
mirrors-rueidis
main