1 В избранное 0 Ответвления 0

OSCHINA-MIRROR/mirrors-rueidis

Присоединиться к Gitlife
Откройте для себя и примите участие в публичных проектах с открытым исходным кодом с участием более 10 миллионов разработчиков. Приватные репозитории также полностью бесплатны :)
Присоединиться бесплатно
Клонировать/Скачать
mux_test.go 34 КБ
Копировать Редактировать Web IDE Исходные данные Просмотреть построчно История
fscnick Отправлено 24.03.2025 07:50 f426952
1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246
package rueidis
import (
"bufio"
"context"
"errors"
"fmt"
"net"
"runtime"
"strconv"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/redis/rueidis/internal/cmds"
)
func setupMux(wires []*mockWire) (conn *mux, checkClean func(t *testing.T)) {
return setupMuxWithOption(wires, &ClientOption{})
}
func setupMuxWithOption(wires []*mockWire, option *ClientOption) (conn *mux, checkClean func(t *testing.T)) {
var mu sync.Mutex
var count = -1
wfn := func(_ context.Context) wire {
mu.Lock()
defer mu.Unlock()
count++
return wires[count]
}
if option.BlockingPipeline == 0 {
option.BlockingPipeline = DefaultBlockingPipeline
}
return newMux("", option, (*mockWire)(nil), (*mockWire)(nil), wfn, wfn), func(t *testing.T) {
if count != len(wires)-1 {
t.Fatalf("there is %d remaining unused wires", len(wires)-count-1)
}
}
}
func TestNewMuxDailErr(t *testing.T) {
defer ShouldNotLeaked(SetupLeakDetection())
c := 0
e := errors.New("any")
m := makeMux("", &ClientOption{}, func(ctx context.Context, dst string, opt *ClientOption) (net.Conn, error) {
timer := time.NewTimer(time.Millisecond * 10) // delay time
defer timer.Stop()
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-timer.C:
// noop
}
c++
return nil, e
})
if err := m.Dial(); err != e {
t.Fatalf("unexpected return %v", err)
}
ctx1, cancel1 := context.WithTimeout(context.Background(), time.Millisecond)
defer cancel1()
if _, err := m._pipe(ctx1, 0); err != context.DeadlineExceeded {
t.Fatalf("unexpected return %v", err)
}
if c != 1 {
t.Fatalf("dialFn not called")
}
if w := m.pipe(context.Background(), 0); w != m.dead { // c = 2
t.Fatalf("unexpected wire %v", w)
}
ctx2, cancel2 := context.WithTimeout(context.Background(), time.Millisecond)
defer cancel2()
if w := m.pipe(ctx2, 0); w != m.dead {
t.Fatalf("unexpected wire %v", w)
}
if err := m.Dial(); err != e { // c = 3
t.Fatalf("unexpected return %v", err)
}
if w := m.Acquire(context.Background()); w != m.dead {
t.Fatalf("unexpected wire %v", w)
}
ctx3, cancel3 := context.WithTimeout(context.Background(), time.Millisecond)
defer cancel3()
if w := m.Acquire(ctx3); w != m.dead {
t.Fatalf("unexpected wire %v", w)
}
ctx4, cancel4 := context.WithTimeout(context.Background(), 20*time.Millisecond)
defer cancel4()
if w := m.Acquire(ctx4); w != m.dead {
t.Fatalf("unexpected wire %v", w)
}
if c != 5 {
t.Fatalf("dialFn not called %v", c)
}
}
func TestNewMux(t *testing.T) {
defer ShouldNotLeaked(SetupLeakDetection())
n1, n2 := net.Pipe()
mock := &redisMock{t: t, buf: bufio.NewReader(n2), conn: n2}
go func() {
mock.Expect("HELLO", "3").
Reply(slicemsg(
'%',
[]RedisMessage{
strmsg('+', "proto"),
{typ: ':', intlen: 3},
},
))
mock.Expect("CLIENT", "TRACKING", "ON", "OPTIN").
ReplyString("OK")
mock.Expect("CLIENT", "SETINFO", "LIB-NAME", LibName).
ReplyError("UNKNOWN COMMAND")
mock.Expect("CLIENT", "SETINFO", "LIB-VER", LibVer).
ReplyError("UNKNOWN COMMAND")
mock.Expect("PING").ReplyString("OK")
mock.Close()
}()
m := makeMux("", &ClientOption{}, func(_ context.Context, dst string, opt *ClientOption) (net.Conn, error) {
return n1, nil
})
if err := m.Dial(); err != nil {
t.Fatalf("unexpected error %v", err)
}
t.Run("Override with previous mux", func(t *testing.T) {
m2 := makeMux("", &ClientOption{}, func(_ context.Context, dst string, opt *ClientOption) (net.Conn, error) {
return n1, nil
})
m2.Override(m)
if err := m2.Dial(); err != nil {
t.Fatalf("unexpected error %v", err)
}
m2.Close()
})
}
func TestNewMuxPipelineMultiplex(t *testing.T) {
defer ShouldNotLeaked(SetupLeakDetection())
for _, v := range []int{-1, 0, 1, 2} {
m := makeMux("", &ClientOption{PipelineMultiplex: v}, func(_ context.Context, dst string, opt *ClientOption) (net.Conn, error) { return nil, nil })
if (v < 0 && len(m.wire) != 1) || (v >= 0 && len(m.wire) != 1<<v) {
t.Fatalf("unexpected len(m.wire): %v", len(m.wire))
}
}
}
func TestMuxAddr(t *testing.T) {
defer ShouldNotLeaked(SetupLeakDetection())
m := makeMux("dst1", &ClientOption{}, nil)
if m.Addr() != "dst1" {
t.Fatalf("unexpected m.Addr != dst1")
}
}
func TestMuxOptInCmd(t *testing.T) {
defer ShouldNotLeaked(SetupLeakDetection())
if m := makeMux("dst1", &ClientOption{
ClientTrackingOptions: []string{"OPTOUT"},
}, nil); m.OptInCmd() != cmds.OptInNopCmd {
t.Fatalf("unexpected OptInCmd")
}
if m := makeMux("dst1", &ClientOption{
ClientTrackingOptions: []string{"PREFIX", "a", "BCAST"},
}, nil); m.OptInCmd() != cmds.OptInNopCmd {
t.Fatalf("unexpected OptInCmd")
}
if m := makeMux("dst1", &ClientOption{
ClientTrackingOptions: nil,
}, nil); m.OptInCmd() != cmds.OptInCmd {
t.Fatalf("unexpected OptInCmd")
}
}
func TestMuxDialSuppress(t *testing.T) {
defer ShouldNotLeaked(SetupLeakDetection())
var wires, waits, done int64
blocking := make(chan struct{})
m := newMux("", &ClientOption{}, (*mockWire)(nil), (*mockWire)(nil), func(_ context.Context) wire {
atomic.AddInt64(&wires, 1)
<-blocking
return &mockWire{}
}, func(_ context.Context) wire {
return &mockWire{}
})
for i := 0; i < 1000; i++ {
go func() {
atomic.AddInt64(&waits, 1)
m.Info()
atomic.AddInt64(&done, 1)
}()
}
for atomic.LoadInt64(&waits) != 1000 {
runtime.Gosched()
}
close(blocking)
for atomic.LoadInt64(&done) != 1000 {
runtime.Gosched()
}
if atomic.LoadInt64(&wires) != 1 {
t.Fatalf("wireFn is not suppressed")
}
}
//gocyclo:ignore
func TestMuxReuseWire(t *testing.T) {
defer ShouldNotLeaked(SetupLeakDetection())
t.Run("reuse wire if no error", func(t *testing.T) {
m, checkClean := setupMux([]*mockWire{
{
DoFn: func(cmd Completed) RedisResult {
return newResult(strmsg('+', "PONG"), nil)
},
},
})
defer checkClean(t)
defer m.Close()
for i := 0; i < 2; i++ {
if err := m.Do(context.Background(), cmds.NewCompleted([]string{"PING"})).Error(); err != nil {
t.Fatalf("unexpected error %v", err)
}
}
m.Close()
})
t.Run("reuse blocking (dpool) pool", func(t *testing.T) {
blocking := make(chan struct{})
response := make(chan RedisResult)
m, checkClean := setupMux([]*mockWire{
{
// leave first wire for pipeline calls
},
{
DoFn: func(cmd Completed) RedisResult {
return newResult(strmsg('+', "ACQUIRED"), nil)
},
},
{
DoFn: func(cmd Completed) RedisResult {
blocking <- struct{}{}
return <-response
},
},
})
defer checkClean(t)
defer m.Close()
if err := m.Dial(); err != nil {
t.Fatalf("unexpected dial error %v", err)
}
wire1 := m.dpool.Acquire(context.Background())
go func() {
// this should use the second wire
if val, err := m.Do(context.Background(), cmds.NewBlockingCompleted([]string{"PING"})).ToString(); err != nil {
t.Errorf("unexpected error %v", err)
} else if val != "BLOCK_RESPONSE" {
t.Errorf("unexpected response %v", val)
}
close(blocking)
}()
<-blocking
m.dpool.Store(wire1)
// this should use the first wire
if val, err := m.Do(context.Background(), cmds.NewBlockingCompleted([]string{"PING"})).ToString(); err != nil {
t.Fatalf("unexpected error %v", err)
} else if val != "ACQUIRED" {
t.Fatalf("unexpected response %v", val)
}
response <- newResult(strmsg('+', "BLOCK_RESPONSE"), nil)
<-blocking
})
t.Run("reuse blocking (spool) pool", func(t *testing.T) {
blocking := make(chan struct{})
response := make(chan RedisResult)
m, checkClean := setupMux([]*mockWire{
{
// leave first wire for pipeline calls
DoFn: func(cmd Completed) RedisResult {
return newResult(strmsg('+', "PIPELINED"), nil)
},
},
{
DoFn: func(cmd Completed) RedisResult {
return newResult(strmsg('+', "ACQUIRED"), nil)
},
},
{
DoFn: func(cmd Completed) RedisResult {
blocking <- struct{}{}
return <-response
},
},
})
m.usePool = true // switch to spool
defer checkClean(t)
defer m.Close()
if err := m.Dial(); err != nil {
t.Fatalf("unexpected dial error %v", err)
}
wire1 := m.spool.Acquire(context.Background())
go func() {
// this should use the second wire
if val, err := m.Do(context.Background(), cmds.NewBlockingCompleted([]string{"PING"})).ToString(); err != nil {
t.Errorf("unexpected error %v", err)
} else if val != "BLOCK_RESPONSE" {
t.Errorf("unexpected response %v", val)
}
close(blocking)
}()
<-blocking
m.spool.Store(wire1)
// this should use the first wire
if val, err := m.Do(context.Background(), cmds.NewBlockingCompleted([]string{"PING"})).ToString(); err != nil {
t.Fatalf("unexpected error %v", err)
} else if val != "ACQUIRED" {
t.Fatalf("unexpected response %v", val)
}
// this should use auto pipeline
if val, err := m.Do(context.Background(), cmds.NewCompleted([]string{"PING"}).ToPipe()).ToString(); err != nil {
t.Fatalf("unexpected error %v", err)
} else if val != "PIPELINED" {
t.Fatalf("unexpected response %v", val)
}
response <- newResult(strmsg('+', "BLOCK_RESPONSE"), nil)
<-blocking
})
t.Run("reuse blocking (dpool) pool DoMulti", func(t *testing.T) {
blocking := make(chan struct{})
response := make(chan RedisResult)
m, checkClean := setupMux([]*mockWire{
{
// leave first wire for pipeline calls
DoMultiFn: func(cmd ...Completed) *redisresults {
return &redisresults{s: []RedisResult{newResult(strmsg('+', "PIPELINED"), nil)}}
},
},
{
DoMultiFn: func(cmd ...Completed) *redisresults {
return &redisresults{s: []RedisResult{newResult(strmsg('+', "ACQUIRED"), nil)}}
},
},
{
DoMultiFn: func(cmd ...Completed) *redisresults {
blocking <- struct{}{}
return &redisresults{s: []RedisResult{<-response}}
},
},
})
m.usePool = true // switch to spool
defer checkClean(t)
defer m.Close()
if err := m.Dial(); err != nil {
t.Fatalf("unexpected dial error %v", err)
}
wire1 := m.spool.Acquire(context.Background())
go func() {
// this should use the second wire
if val, err := m.DoMulti(context.Background(), cmds.NewBlockingCompleted([]string{"PING"})).s[0].ToString(); err != nil {
t.Errorf("unexpected error %v", err)
} else if val != "BLOCK_RESPONSE" {
t.Errorf("unexpected response %v", val)
}
close(blocking)
}()
<-blocking
m.spool.Store(wire1)
// this should use the first wire
if val, err := m.DoMulti(context.Background(), cmds.NewBlockingCompleted([]string{"PING"})).s[0].ToString(); err != nil {
t.Fatalf("unexpected error %v", err)
} else if val != "ACQUIRED" {
t.Fatalf("unexpected response %v", val)
}
// this should use auto pipeline
if val, err := m.DoMulti(context.Background(), cmds.NewCompleted([]string{"PING"}).ToPipe()).s[0].ToString(); err != nil {
t.Fatalf("unexpected error %v", err)
} else if val != "PIPELINED" {
t.Fatalf("unexpected response %v", val)
}
response <- newResult(strmsg('+', "BLOCK_RESPONSE"), nil)
<-blocking
})
t.Run("reuse blocking (spool) pool DoMulti", func(t *testing.T) {
blocking := make(chan struct{})
response := make(chan RedisResult)
m, checkClean := setupMux([]*mockWire{
{
// leave first wire for pipeline calls
},
{
DoMultiFn: func(cmd ...Completed) *redisresults {
return &redisresults{s: []RedisResult{newResult(strmsg('+', "ACQUIRED"), nil)}}
},
},
{
DoMultiFn: func(cmd ...Completed) *redisresults {
blocking <- struct{}{}
return &redisresults{s: []RedisResult{<-response}}
},
},
})
defer checkClean(t)
defer m.Close()
if err := m.Dial(); err != nil {
t.Fatalf("unexpected dial error %v", err)
}
wire1 := m.dpool.Acquire(context.Background())
go func() {
// this should use the second wire
if val, err := m.DoMulti(context.Background(), cmds.NewBlockingCompleted([]string{"PING"})).s[0].ToString(); err != nil {
t.Errorf("unexpected error %v", err)
} else if val != "BLOCK_RESPONSE" {
t.Errorf("unexpected response %v", val)
}
close(blocking)
}()
<-blocking
m.dpool.Store(wire1)
// this should use the first wire
if val, err := m.DoMulti(context.Background(), cmds.NewBlockingCompleted([]string{"PING"})).s[0].ToString(); err != nil {
t.Fatalf("unexpected error %v", err)
} else if val != "ACQUIRED" {
t.Fatalf("unexpected response %v", val)
}
response <- newResult(strmsg('+', "BLOCK_RESPONSE"), nil)
<-blocking
})
t.Run("unsubscribe blocking pool", func(t *testing.T) {
cleaned := false
m, checkClean := setupMux([]*mockWire{
{
// leave first wire for pipeline calls
},
{
CleanSubscriptionsFn: func() {
cleaned = true
},
},
})
defer checkClean(t)
defer m.Close()
if err := m.Dial(); err != nil {
t.Fatalf("unexpected dial error %v", err)
}
wire1 := m.Acquire(context.Background())
m.Store(wire1)
if !cleaned {
t.Fatalf("CleanSubscriptions not called")
}
})
}
//gocyclo:ignore
func TestMuxDelegation(t *testing.T) {
defer ShouldNotLeaked(SetupLeakDetection())
t.Run("wire info", func(t *testing.T) {
m, checkClean := setupMux([]*mockWire{
{
InfoFn: func() map[string]RedisMessage {
return map[string]RedisMessage{"key": strmsg('+', "value")}
},
},
})
defer checkClean(t)
defer m.Close()
if info := m.Info(); info == nil {
t.Fatalf("unexpected info %v", info)
} else if infoKey := info["key"]; infoKey.string() != "value" {
t.Fatalf("unexpected info %v", info)
}
})
t.Run("wire version", func(t *testing.T) {
m, checkClean := setupMux([]*mockWire{
{
VersionFn: func() int {
return 7
},
},
})
defer checkClean(t)
defer m.Close()
if version := m.Version(); version != 7 {
t.Fatalf("unexpected version %v", version)
}
})
t.Run("wire az", func(t *testing.T) {
m, checkClean := setupMux([]*mockWire{
{
AZFn: func() string {
return "az"
},
},
})
defer checkClean(t)
defer m.Close()
if az := m.AZ(); az != "az" {
t.Fatalf("unexpected az %v", az)
}
})
t.Run("wire err", func(t *testing.T) {
e := errors.New("err")
m, checkClean := setupMux([]*mockWire{
{
ErrorFn: func() error {
return e
},
},
})
defer checkClean(t)
defer m.Close()
if err := m.Error(); err != e {
t.Fatalf("unexpected err %v", err)
}
})
t.Run("wire do", func(t *testing.T) {
m, checkClean := setupMux([]*mockWire{
{
DoFn: func(cmd Completed) RedisResult {
return newErrResult(context.DeadlineExceeded)
},
ErrorFn: func() error {
return context.DeadlineExceeded
},
},
{
DoFn: func(cmd Completed) RedisResult {
if cmd.Commands()[0] != "READONLY_COMMAND" {
t.Fatalf("command should be READONLY_COMMAND")
}
return newResult(strmsg('+', "READONLY_COMMAND_RESPONSE"), nil)
},
},
})
defer checkClean(t)
defer m.Close()
if err := m.Do(context.Background(), cmds.NewReadOnlyCompleted([]string{"READONLY_COMMAND"})).Error(); !errors.Is(err, context.DeadlineExceeded) {
t.Fatalf("unexpected error %v", err)
}
if val, err := m.Do(context.Background(), cmds.NewReadOnlyCompleted([]string{"READONLY_COMMAND"})).ToString(); err != nil {
t.Fatalf("unexpected error %v", err)
} else if val != "READONLY_COMMAND_RESPONSE" {
t.Fatalf("unexpected response %v", val)
}
})
t.Run("wire do stream", func(t *testing.T) {
m, checkClean := setupMux([]*mockWire{
{
DoStreamFn: func(pool *pool, cmd Completed) RedisResultStream {
return RedisResultStream{e: errors.New(cmd.Commands()[0])}
},
},
})
defer checkClean(t)
defer m.Close()
if s := m.DoStream(context.Background(), cmds.NewReadOnlyCompleted([]string{"READONLY_COMMAND"})); s.Error().Error() != "READONLY_COMMAND" {
t.Fatalf("unexpected error %v", s.Error())
}
})
t.Run("wire do multi", func(t *testing.T) {
m, checkClean := setupMux([]*mockWire{
{
DoMultiFn: func(multi ...Completed) *redisresults {
return &redisresults{s: []RedisResult{newErrResult(context.DeadlineExceeded)}}
},
ErrorFn: func() error {
return context.DeadlineExceeded
},
},
{
DoMultiFn: func(multi ...Completed) *redisresults {
return &redisresults{s: []RedisResult{newResult(strmsg('+', "MULTI_COMMANDS_RESPONSE"), nil)}}
},
},
})
defer checkClean(t)
defer m.Close()
if err := m.DoMulti(context.Background(), cmds.NewReadOnlyCompleted([]string{"READONLY_COMMAND"})).s[0].Error(); !errors.Is(err, context.DeadlineExceeded) {
t.Fatalf("unexpected error %v", err)
}
if val, err := m.DoMulti(context.Background(), cmds.NewReadOnlyCompleted([]string{"READONLY_COMMAND"})).s[0].ToString(); err != nil {
t.Fatalf("unexpected error %v", err)
} else if val != "MULTI_COMMANDS_RESPONSE" {
t.Fatalf("unexpected response %v", val)
}
})
t.Run("wire do multi stream", func(t *testing.T) {
m, checkClean := setupMux([]*mockWire{
{
DoMultiStreamFn: func(pool *pool, cmd ...Completed) MultiRedisResultStream {
return MultiRedisResultStream{e: errors.New(cmd[0].Commands()[0])}
},
},
})
defer checkClean(t)
defer m.Close()
if s := m.DoMultiStream(context.Background(), cmds.NewReadOnlyCompleted([]string{"READONLY_COMMAND"})); s.Error().Error() != "READONLY_COMMAND" {
t.Fatalf("unexpected error %v", s.Error())
}
})
t.Run("wire do cache", func(t *testing.T) {
m, checkClean := setupMux([]*mockWire{
{
DoCacheFn: func(cmd Cacheable, ttl time.Duration) RedisResult {
return newErrResult(context.DeadlineExceeded)
},
ErrorFn: func() error {
return context.DeadlineExceeded
},
},
{
DoCacheFn: func(cmd Cacheable, ttl time.Duration) RedisResult {
return newResult(strmsg('+', "READONLY_COMMAND_RESPONSE"), nil)
},
},
})
defer checkClean(t)
defer m.Close()
if err := m.DoCache(context.Background(), Cacheable(cmds.NewReadOnlyCompleted([]string{"READONLY_COMMAND"})), time.Second).Error(); !errors.Is(err, context.DeadlineExceeded) {
t.Fatalf("unexpected error %v", err)
}
if val, err := m.DoCache(context.Background(), Cacheable(cmds.NewReadOnlyCompleted([]string{"READONLY_COMMAND"})), time.Second).ToString(); err != nil {
t.Fatalf("unexpected error %v", err)
} else if val != "READONLY_COMMAND_RESPONSE" {
t.Fatalf("unexpected response %v", val)
}
})
t.Run("wire do multi cache", func(t *testing.T) {
m, checkClean := setupMux([]*mockWire{
{
DoMultiCacheFn: func(multi ...CacheableTTL) *redisresults {
return &redisresults{s: []RedisResult{newErrResult(context.DeadlineExceeded)}}
},
ErrorFn: func() error {
return context.DeadlineExceeded
},
},
{
DoMultiCacheFn: func(multi ...CacheableTTL) *redisresults {
return &redisresults{s: []RedisResult{newResult(strmsg('+', "MULTI_COMMANDS_RESPONSE"), nil)}}
},
},
})
defer checkClean(t)
defer m.Close()
if err := m.DoMultiCache(context.Background(), CT(Cacheable(cmds.NewReadOnlyCompleted([]string{"READONLY_COMMAND"})), time.Second)).s[0].Error(); !errors.Is(err, context.DeadlineExceeded) {
t.Fatalf("unexpected error %v", err)
}
if val, err := m.DoMultiCache(context.Background(), CT(Cacheable(cmds.NewReadOnlyCompleted([]string{"READONLY_COMMAND"})), time.Second)).s[0].ToString(); err != nil {
t.Fatalf("unexpected error %v", err)
} else if val != "MULTI_COMMANDS_RESPONSE" {
t.Fatalf("unexpected response %v", val)
}
})
t.Run("wire do multi cache multiple slots", func(t *testing.T) {
multiplex := 1
wires := make([]*mockWire, 1<<multiplex)
for i := range wires {
idx := uint16(i)
wires[i] = &mockWire{
DoMultiCacheFn: func(multi ...CacheableTTL) *redisresults {
result := make([]RedisResult, len(multi))
for j, cmd := range multi {
if s := cmd.Cmd.Slot() & uint16(len(wires)-1); s != idx {
result[j] = newErrResult(fmt.Errorf("wrong slot %v %v", s, idx))
} else {
result[j] = newResult(strmsg('+', cmd.Cmd.Commands()[1]), nil)
}
}
return &redisresults{s: result}
},
}
}
m, checkClean := setupMuxWithOption(wires, &ClientOption{PipelineMultiplex: multiplex})
defer checkClean(t)
defer m.Close()
for i := range wires {
m._pipe(context.Background(), uint16(i))
}
builder := cmds.NewBuilder(cmds.NoSlot)
for count := 1; count <= 3; count++ {
commands := make([]CacheableTTL, count)
for c := 0; c < count; c++ {
commands[c] = CT(builder.Get().Key(strconv.Itoa(c)).Cache(), time.Second)
}
for i, resp := range m.DoMultiCache(context.Background(), commands...).s {
if v, err := resp.ToString(); err != nil || v != strconv.Itoa(i) {
t.Fatalf("unexpected resp %v %v", v, err)
}
}
}
})
t.Run("wire do multi cache multiple slots fail", func(t *testing.T) {
multiplex := 1
wires := make([]*mockWire, 1<<multiplex)
for i := range wires {
idx := uint16(i)
wires[i] = &mockWire{
DoMultiCacheFn: func(multi ...CacheableTTL) *redisresults {
for _, cmd := range multi {
if s := cmd.Cmd.Slot() & uint16(len(wires)-1); s != idx {
return &redisresults{s: []RedisResult{newErrResult(fmt.Errorf("wrong slot %v %v", s, idx))}}
}
}
return &redisresults{s: []RedisResult{newErrResult(context.DeadlineExceeded)}}
},
ErrorFn: func() error {
return context.DeadlineExceeded
},
}
}
m, checkClean := setupMuxWithOption(wires, &ClientOption{PipelineMultiplex: multiplex})
defer checkClean(t)
defer m.Close()
for i := range wires {
m._pipe(context.Background(), uint16(i))
}
builder := cmds.NewBuilder(cmds.NoSlot)
commands := make([]CacheableTTL, 4)
for c := 0; c < len(commands); c++ {
commands[c] = CT(builder.Get().Key(strconv.Itoa(c)).Cache(), time.Second)
}
if err := m.DoMultiCache(context.Background(), commands...).s[0].Error(); !errors.Is(err, context.DeadlineExceeded) {
t.Fatalf("unexpected error %v", err)
}
})
t.Run("wire receive", func(t *testing.T) {
m, checkClean := setupMux([]*mockWire{
{
ReceiveFn: func(ctx context.Context, subscribe Completed, fn func(message PubSubMessage)) error {
return context.DeadlineExceeded
},
ErrorFn: func() error {
return context.DeadlineExceeded
},
},
{
ReceiveFn: func(ctx context.Context, subscribe Completed, fn func(message PubSubMessage)) error {
if subscribe.Commands()[0] != "SUBSCRIBE" {
t.Fatalf("command should be SUBSCRIBE")
}
return nil
},
},
})
defer checkClean(t)
defer m.Close()
if err := m.Receive(context.Background(), cmds.NewCompleted([]string{"SUBSCRIBE"}), func(message PubSubMessage) {}); !errors.Is(err, context.DeadlineExceeded) {
t.Fatalf("unexpected error %v", err)
}
if err := m.Receive(context.Background(), cmds.NewCompleted([]string{"SUBSCRIBE"}), func(message PubSubMessage) {}); err != nil {
t.Fatalf("unexpected error %v", err)
}
})
t.Run("single blocking", func(t *testing.T) {
blocked := make(chan struct{})
responses := make(chan RedisResult)
m, checkClean := setupMux([]*mockWire{
{
// leave first wire for pipeline calls
},
{
DoFn: func(cmd Completed) RedisResult {
blocked <- struct{}{}
return <-responses
},
},
{
DoFn: func(cmd Completed) RedisResult {
blocked <- struct{}{}
return <-responses
},
},
})
defer checkClean(t)
defer m.Close()
if err := m.Dial(); err != nil {
t.Fatalf("unexpected dial error %v", err)
}
wg := sync.WaitGroup{}
wg.Add(2)
for i := 0; i < 2; i++ {
go func() {
if val, err := m.Do(context.Background(), cmds.NewBlockingCompleted([]string{"BLOCK"})).ToString(); err != nil {
t.Errorf("unexpected error %v", err)
} else if val != "BLOCK_COMMANDS_RESPONSE" {
t.Errorf("unexpected response %v", val)
} else {
wg.Done()
}
}()
}
for i := 0; i < 2; i++ {
<-blocked
}
for i := 0; i < 2; i++ {
responses <- newResult(strmsg('+', "BLOCK_COMMANDS_RESPONSE"), nil)
}
wg.Wait()
})
t.Run("single blocking no recycle the wire if err", func(t *testing.T) {
closed := false
m, checkClean := setupMux([]*mockWire{
{
// leave first wire for pipeline calls
},
{
DoFn: func(cmd Completed) RedisResult {
return newErrResult(context.DeadlineExceeded)
},
ErrorFn: func() error {
return context.DeadlineExceeded
},
CloseFn: func() {
closed = true
},
},
{
DoFn: func(cmd Completed) RedisResult {
return newResult(strmsg('+', "OK"), nil)
},
},
})
defer checkClean(t)
defer m.Close()
if err := m.Dial(); err != nil {
t.Fatalf("unexpected dial error %v", err)
}
if err := m.Do(context.Background(), cmds.NewBlockingCompleted([]string{"BLOCK"})).Error(); err != context.DeadlineExceeded {
t.Errorf("unexpected error %v", err)
}
if val, err := m.Do(context.Background(), cmds.NewBlockingCompleted([]string{"BLOCK"})).ToString(); err != nil || val != "OK" {
t.Errorf("unexpected response %v %v", err, val)
}
if !closed {
t.Errorf("wire not closed")
}
})
t.Run("multiple blocking", func(t *testing.T) {
blocked := make(chan struct{})
responses := make(chan RedisResult)
m, checkClean := setupMux([]*mockWire{
{
// leave first wire for pipeline calls
},
{
DoMultiFn: func(cmd ...Completed) *redisresults {
blocked <- struct{}{}
return &redisresults{s: []RedisResult{<-responses}}
},
},
{
DoMultiFn: func(cmd ...Completed) *redisresults {
blocked <- struct{}{}
return &redisresults{s: []RedisResult{<-responses}}
},
},
})
defer checkClean(t)
defer m.Close()
if err := m.Dial(); err != nil {
t.Fatalf("unexpected dial error %v", err)
}
wg := sync.WaitGroup{}
wg.Add(2)
for i := 0; i < 2; i++ {
go func() {
if val, err := m.DoMulti(
context.Background(),
cmds.NewReadOnlyCompleted([]string{"READONLY"}),
cmds.NewBlockingCompleted([]string{"BLOCK"}),
).s[0].ToString(); err != nil {
t.Errorf("unexpected error %v", err)
} else if val != "BLOCK_COMMANDS_RESPONSE" {
t.Errorf("unexpected response %v", val)
} else {
wg.Done()
}
}()
}
for i := 0; i < 2; i++ {
<-blocked
}
for i := 0; i < 2; i++ {
responses <- newResult(strmsg('+', "BLOCK_COMMANDS_RESPONSE"), nil)
}
wg.Wait()
})
t.Run("multiple long pipeline", func(t *testing.T) {
blocked := make(chan struct{})
responses := make(chan RedisResult)
m, checkClean := setupMux([]*mockWire{
{
// leave first wire for pipeline calls
},
{
DoMultiFn: func(cmd ...Completed) *redisresults {
blocked <- struct{}{}
return &redisresults{s: []RedisResult{<-responses}}
},
},
{
DoMultiFn: func(cmd ...Completed) *redisresults {
blocked <- struct{}{}
return &redisresults{s: []RedisResult{<-responses}}
},
},
})
defer checkClean(t)
defer m.Close()
if err := m.Dial(); err != nil {
t.Fatalf("unexpected dial error %v", err)
}
wg := sync.WaitGroup{}
wg.Add(2)
for i := 0; i < 2; i++ {
go func() {
pipeline := make(Commands, DefaultBlockingPipeline)
for i := 0; i < len(pipeline); i++ {
pipeline[i] = cmds.NewCompleted([]string{"SET"})
}
if val, err := m.DoMulti(context.Background(), pipeline...).s[0].ToString(); err != nil {
t.Errorf("unexpected error %v", err)
} else if val != "BLOCK_COMMANDS_RESPONSE" {
t.Errorf("unexpected response %v", val)
} else {
wg.Done()
}
}()
}
for i := 0; i < 2; i++ {
<-blocked
}
for i := 0; i < 2; i++ {
responses <- newResult(strmsg('+', "BLOCK_COMMANDS_RESPONSE"), nil)
}
wg.Wait()
})
t.Run("multi blocking no recycle the wire if err", func(t *testing.T) {
closed := false
m, checkClean := setupMux([]*mockWire{
{
// leave first wire for pipeline calls
},
{
DoMultiFn: func(cmd ...Completed) *redisresults {
return &redisresults{s: []RedisResult{newErrResult(context.DeadlineExceeded)}}
},
ErrorFn: func() error {
return context.DeadlineExceeded
},
CloseFn: func() {
closed = true
},
},
{
DoFn: func(cmd Completed) RedisResult {
return newResult(strmsg('+', "OK"), nil)
},
},
})
defer checkClean(t)
defer m.Close()
if err := m.Dial(); err != nil {
t.Fatalf("unexpected dial error %v", err)
}
if err := m.DoMulti(
context.Background(),
cmds.NewReadOnlyCompleted([]string{"READONLY"}),
cmds.NewBlockingCompleted([]string{"BLOCK"}),
).s[0].Error(); err != context.DeadlineExceeded {
t.Errorf("unexpected error %v", err)
}
if val, err := m.Do(context.Background(), cmds.NewBlockingCompleted([]string{"BLOCK"})).ToString(); err != nil || val != "OK" {
t.Errorf("unexpected response %v %v", err, val)
}
if !closed {
t.Errorf("wire not closed")
}
})
}
//gocyclo:ignore
func TestMuxRegisterCloseHook(t *testing.T) {
defer ShouldNotLeaked(SetupLeakDetection())
t.Run("trigger hook with unexpected error", func(t *testing.T) {
var hook atomic.Value
m, checkClean := setupMux([]*mockWire{
{
DoFn: func(cmd Completed) RedisResult {
return newResult(strmsg('+', "PONG1"), nil)
},
SetOnCloseHookFn: func(fn func(error)) {
hook.Store(fn)
},
},
{
DoFn: func(cmd Completed) RedisResult {
return newResult(strmsg('+', "PONG2"), nil)
},
},
})
defer checkClean(t)
defer m.Close()
if resp, _ := m.Do(context.Background(), cmds.NewCompleted([]string{"PING"})).ToString(); resp != "PONG1" {
t.Fatalf("unexpected response %v", resp)
}
hook.Load().(func(error))(errors.New("any")) // invoke the hook, this should cause the first wire be discarded
if resp, _ := m.Do(context.Background(), cmds.NewCompleted([]string{"PING"})).ToString(); resp != "PONG2" {
t.Fatalf("unexpected response %v", resp)
}
})
t.Run("not trigger hook with ErrClosing", func(t *testing.T) {
var hook atomic.Value
m, checkClean := setupMux([]*mockWire{
{
DoFn: func(cmd Completed) RedisResult {
return newResult(strmsg('+', "PONG1"), nil)
},
SetOnCloseHookFn: func(fn func(error)) {
hook.Store(fn)
},
},
})
defer checkClean(t)
defer m.Close()
if resp, _ := m.Do(context.Background(), cmds.NewCompleted([]string{"PING"})).ToString(); resp != "PONG1" {
t.Fatalf("unexpected response %v", resp)
}
hook.Load().(func(error))(ErrClosing) // invoke the hook, this should cause the first wire be discarded
if resp, _ := m.Do(context.Background(), cmds.NewCompleted([]string{"PING"})).ToString(); resp != "PONG1" {
t.Fatalf("unexpected response %v", resp)
}
})
}
func BenchmarkClientSideCaching(b *testing.B) {
setup := func(b *testing.B) *mux {
c := makeMux("127.0.0.1:6379", &ClientOption{CacheSizeEachConn: DefaultCacheBytes}, func(_ context.Context, dst string, opt *ClientOption) (conn net.Conn, err error) {
return net.Dial("tcp", dst)
})
if err := c.Dial(); err != nil {
panic(err)
}
b.SetParallelism(100)
b.ResetTimer()
return c
}
b.Run("Do", func(b *testing.B) {
m := setup(b)
cmd := cmds.NewCompleted([]string{"GET", "a"})
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
m.Do(context.Background(), cmd)
}
})
})
b.Run("DoCache", func(b *testing.B) {
m := setup(b)
cmd := Cacheable(cmds.NewCompleted([]string{"GET", "a"}))
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
m.DoCache(context.Background(), cmd, time.Second*5)
}
})
})
}
type mockWire struct {
DoFn func(cmd Completed) RedisResult
DoCacheFn func(cmd Cacheable, ttl time.Duration) RedisResult
DoMultiFn func(multi ...Completed) *redisresults
DoMultiCacheFn func(multi ...CacheableTTL) *redisresults
ReceiveFn func(ctx context.Context, subscribe Completed, fn func(message PubSubMessage)) error
DoStreamFn func(pool *pool, cmd Completed) RedisResultStream
DoMultiStreamFn func(pool *pool, cmd ...Completed) MultiRedisResultStream
InfoFn func() map[string]RedisMessage
AZFn func() string
VersionFn func() int
ErrorFn func() error
CloseFn func()
CleanSubscriptionsFn func()
SetPubSubHooksFn func(hooks PubSubHooks) <-chan error
SetOnCloseHookFn func(fn func(error))
}
func (m *mockWire) Do(ctx context.Context, cmd Completed) RedisResult {
if m.DoFn != nil {
return m.DoFn(cmd)
}
return RedisResult{}
}
func (m *mockWire) DoCache(ctx context.Context, cmd Cacheable, ttl time.Duration) RedisResult {
if m.DoCacheFn != nil {
return m.DoCacheFn(cmd, ttl)
}
return RedisResult{}
}
func (m *mockWire) DoMultiCache(ctx context.Context, multi ...CacheableTTL) *redisresults {
if m.DoMultiCacheFn != nil {
return m.DoMultiCacheFn(multi...)
}
return nil
}
func (m *mockWire) DoMulti(ctx context.Context, multi ...Completed) *redisresults {
if m.DoMultiFn != nil {
return m.DoMultiFn(multi...)
}
return nil
}
func (m *mockWire) Receive(ctx context.Context, subscribe Completed, fn func(message PubSubMessage)) error {
if m.ReceiveFn != nil {
return m.ReceiveFn(ctx, subscribe, fn)
}
return nil
}
func (m *mockWire) DoStream(ctx context.Context, pool *pool, cmd Completed) RedisResultStream {
if m.DoStreamFn != nil {
return m.DoStreamFn(pool, cmd)
}
return RedisResultStream{}
}
func (m *mockWire) DoMultiStream(ctx context.Context, pool *pool, cmd ...Completed) MultiRedisResultStream {
if m.DoMultiStreamFn != nil {
return m.DoMultiStreamFn(pool, cmd...)
}
return MultiRedisResultStream{}
}
func (m *mockWire) CleanSubscriptions() {
if m.CleanSubscriptionsFn != nil {
m.CleanSubscriptionsFn()
}
}
func (m *mockWire) SetPubSubHooks(hooks PubSubHooks) <-chan error {
if m.SetPubSubHooksFn != nil {
return m.SetPubSubHooksFn(hooks)
}
return nil
}
func (m *mockWire) SetOnCloseHook(fn func(error)) {
if m.SetOnCloseHookFn != nil {
m.SetOnCloseHookFn(fn)
}
}
func (m *mockWire) Info() map[string]RedisMessage {
if m.InfoFn != nil {
return m.InfoFn()
}
return nil
}
func (m *mockWire) Version() int {
if m.VersionFn != nil {
return m.VersionFn()
}
return 0
}
func (m *mockWire) AZ() string {
if m.AZFn != nil {
return m.AZFn()
}
return ""
}
func (m *mockWire) Error() error {
if m == nil {
return ErrClosing
}
if m.ErrorFn != nil {
return m.ErrorFn()
}
return nil
}
func (m *mockWire) Close() {
if m == nil {
return
}
if m.CloseFn != nil {
m.CloseFn()
}
}

Опубликовать ( 0 )

Вы можете оставить комментарий после Вход в систему

1
https://api.gitlife.ru/oschina-mirror/mirrors-rueidis.git
git@api.gitlife.ru:oschina-mirror/mirrors-rueidis.git
oschina-mirror
mirrors-rueidis
mirrors-rueidis
main