1 В избранное 0 Ответвления 0

OSCHINA-MIRROR/teamlint-nrpc

Присоединиться к Gitlife
Откройте для себя и примите участие в публичных проектах с открытым исходным кодом с участием более 10 миллионов разработчиков. Приватные репозитории также полностью бесплатны :)
Присоединиться бесплатно
Клонировать/Скачать
nrpc.go 21 КБ
Копировать Редактировать Web IDE Исходные данные Просмотреть построчно История
venjiang Отправлено 21.09.2020 10:07 5260746
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893
package nrpc
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"log"
"runtime/debug"
"strings"
"sync"
"time"
"github.com/nats-io/nats.go"
jsonpb "google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/proto"
)
// ContextKey is the key type used to store values in a context.Context.
// A dedicated type keeps this package's keys distinct from keys of any other
// type.
type ContextKey int
// ErrStreamInvalidMsgCount is returned when the message count announced by a
// stream's end-of-stream reply differs from the number of messages received.
var ErrStreamInvalidMsgCount = errors.New("Stream reply received an incorrect number of messages")
//go:generate protoc --go_out=. --go_opt=paths=source_relative nrpc.proto
// NatsConn is the set of NATS connection methods this package relies on:
// publishing, request/reply, and the callback, channel-based and synchronous
// subscription variants (plain and queue).
// NOTE(review): the method set looks like a subset of *nats.Conn — confirm.
type NatsConn interface {
Publish(subj string, data []byte) error
PublishRequest(subj, reply string, data []byte) error
Request(subj string, data []byte, timeout time.Duration) (*nats.Msg, error)
Subscribe(subj string, handler nats.MsgHandler) (*nats.Subscription, error)
ChanSubscribe(subj string, ch chan *nats.Msg) (*nats.Subscription, error)
SubscribeSync(subj string) (*nats.Subscription, error)
QueueSubscribe(subj, queue string, handler nats.MsgHandler) (*nats.Subscription, error)
ChanQueueSubscribe(subj, queue string, ch chan *nats.Msg) (*nats.Subscription, error)
QueueSubscribeSync(subj, queue string) (*nats.Subscription, error)
}
// ReplyInboxMaker is a function that returns a new inbox subject for a given
// nats connection.
type ReplyInboxMaker func(NatsConn) string
// GetReplyInbox is used by StreamCall to get the inbox subject on which the
// streamed replies are received. The default ignores its argument and returns
// nats.NewInbox().
// It can be changed by a client lib that needs custom inbox subjects.
var GetReplyInbox ReplyInboxMaker = func(NatsConn) string {
return nats.NewInbox()
}
// Error implements the error interface. The text is the name registered for
// e.Type in Error_Type_name, followed by " error: " and e.Message.
func (e *Error) Error() string {
	typeName := Error_Type_name[int32(e.Type)]
	return fmt.Sprintf("%s error: %s", typeName, e.Message)
}
// Unmarshal decodes data into msg using the named encoding: "protobuf" goes
// through proto.Unmarshal and "json" through protojson. Any other encoding
// yields an "Invalid encoding" error.
func Unmarshal(encoding string, data []byte, msg proto.Message) error {
	if encoding == "protobuf" {
		return proto.Unmarshal(data, msg)
	}
	if encoding == "json" {
		return jsonpb.Unmarshal(data, msg)
	}
	return errors.New("Invalid encoding: " + encoding)
}
// UnmarshalResponse decodes a reply payload into msg, or returns the *Error it
// carries when the payload is an error reply.
//
// Error replies are recognized by the framing MarshalErrorResponse produces:
//   - "protobuf": a leading 0 byte followed by the serialized Error;
//   - "json": an object starting with the "__error__" key, holding the Error.
//
// The returned error is a *Error for a well-formed error reply, a decoding
// error when the payload cannot be parsed, or an "Invalid encoding" error for
// an unknown encoding. It is nil when msg was filled successfully.
func UnmarshalResponse(encoding string, data []byte, msg proto.Message) error {
	switch encoding {
	case "protobuf":
		if len(data) > 0 && data[0] == 0 {
			var repErr Error
			if err := proto.Unmarshal(data[1:], &repErr); err != nil {
				return err
			}
			return &repErr
		}
		return proto.Unmarshal(data, msg)
	case "json":
		if bytes.HasPrefix(data, []byte(`{"__error__":`)) {
			var rep map[string]json.RawMessage
			if err := json.Unmarshal(data, &rep); err != nil {
				return err
			}
			errbuf, ok := rep["__error__"]
			if !ok {
				// The payload comes from the network: report it instead of
				// panicking inside library code.
				return errors.New("invalid error message")
			}
			var nrpcErr Error
			if err := jsonpb.Unmarshal(errbuf, &nrpcErr); err != nil {
				return err
			}
			return &nrpcErr
		}
		return jsonpb.Unmarshal(data, msg)
	default:
		return errors.New("Invalid encoding: " + encoding)
	}
}
// Marshal encodes msg with the named encoding: "protobuf" goes through
// proto.Marshal and "json" through protojson. Any other encoding yields a nil
// slice and an "Invalid encoding" error.
func Marshal(encoding string, msg proto.Message) ([]byte, error) {
	if encoding == "protobuf" {
		return proto.Marshal(msg)
	}
	if encoding == "json" {
		return jsonpb.Marshal(msg)
	}
	return nil, errors.New("Invalid encoding: " + encoding)
}
// MarshalErrorResponse encodes repErr as an error reply in the framing that
// UnmarshalResponse recognizes:
//   - "protobuf": a 0 byte followed by the serialized Error;
//   - "json": a JSON object with a single "__error__" key holding the Error.
//
// Any other encoding yields an "Invalid encoding" error.
func MarshalErrorResponse(encoding string, repErr *Error) ([]byte, error) {
	if encoding == "protobuf" {
		payload, err := proto.Marshal(repErr)
		if err != nil {
			return nil, err
		}
		framed := append([]byte{0}, payload...)
		return framed, nil
	}
	if encoding == "json" {
		payload, err := jsonpb.Marshal(repErr)
		if err != nil {
			return nil, err
		}
		wrapper := map[string]json.RawMessage{
			"__error__": json.RawMessage(payload),
		}
		return json.Marshal(wrapper)
	}
	return nil, errors.New("Invalid encoding: " + encoding)
}
// ParseSubject splits a dot-separated subject laid out as
//
//	[packageSubject.]<package params>.serviceSubject.<service params>.name[.tail...]
//
// and returns the package parameters, the service parameters, the token that
// follows them (name) and every remaining token (tail).
//
// An error is returned when the subject has too few tokens, or when the
// package or service tokens do not match the expected subjects. On a service
// mismatch packageParams is already populated; the other results are zero.
func ParseSubject(
	packageSubject string, packageParamsCount int,
	serviceSubject string, serviceParamsCount int,
	subject string,
) (packageParams []string, serviceParams []string,
	name string, tail []string, err error,
) {
	// An empty package subject contributes no tokens at all.
	var pkgParts []string
	if packageSubject != "" {
		pkgParts = strings.Split(packageSubject, ".")
	}
	svcParts := strings.Split(serviceSubject, ".")

	// The trailing +1 accounts for the name token.
	minParts := len(pkgParts) + packageParamsCount + len(svcParts) + serviceParamsCount + 1
	rest := strings.Split(subject, ".")
	if len(rest) < minParts {
		return nil, nil, "", nil, fmt.Errorf(
			"Invalid subject len. Expects number of parts >= %d, got %d",
			minParts, len(rest))
	}

	for idx := range pkgParts {
		if want, got := pkgParts[idx], rest[idx]; got != want {
			return nil, nil, "", nil, fmt.Errorf(
				"Invalid subject prefix. Expected '%s', got '%s'",
				want, got)
		}
	}
	rest = rest[len(pkgParts):]

	packageParams, rest = rest[:packageParamsCount], rest[packageParamsCount:]

	for idx := range svcParts {
		if want, got := svcParts[idx], rest[idx]; got != want {
			return packageParams, nil, "", nil, fmt.Errorf(
				"Invalid subject. Service should be '%s', got '%s'",
				want, got)
		}
	}
	rest = rest[len(svcParts):]

	serviceParams, rest = rest[:serviceParamsCount], rest[serviceParamsCount:]
	name, tail = rest[0], rest[1:]
	return packageParams, serviceParams, name, tail, nil
}
// ParseSubjectTail splits the tail returned by ParseSubject into the method
// parameters and the encoding.
//
// tail must hold exactly methodParamsCount tokens, optionally followed by one
// encoding token; otherwise an error is returned. When the encoding token is
// absent the encoding is "protobuf".
func ParseSubjectTail(
	methodParamsCount int,
	tail []string,
) (
	methodParams []string, encoding string, err error,
) {
	n := len(tail)
	if n < methodParamsCount || n > methodParamsCount+1 {
		return nil, "", fmt.Errorf(
			"Invalid subject tail length. Expects %d or %d parts, got %d",
			methodParamsCount, methodParamsCount+1, n,
		)
	}

	methodParams = tail[:methodParamsCount]
	remaining := tail[methodParamsCount:]
	if len(remaining) == 0 {
		return methodParams, "protobuf", nil
	}
	if len(remaining) == 1 {
		return methodParams, remaining[0], nil
	}
	// The length check above leaves at most one extra token.
	panic("Got extra tokens, which should be impossible at this point")
}
// Call sends req on subject over nc and decodes the reply into rep.
//
// req is encoded with the given encoding; for any encoding other than
// "protobuf" the encoding name is appended to the subject as an extra token.
// When rep is a *NoReply the request is only published and no reply is awaited.
// Otherwise the call waits up to timeout for the reply.
//
// The returned error is the marshaling, publish or request error, a decoding
// error, or the *Error carried by an error reply.
func Call(req proto.Message, rep proto.Message, nc NatsConn, subject string, encoding string, timeout time.Duration) error {
	payload, err := Marshal(encoding, req)
	if err != nil {
		log.Printf("nrpc: inner request marshal failed: %v", err)
		return err
	}

	if encoding != "protobuf" {
		subject = subject + "." + encoding
	}

	// Fire-and-forget: publish without waiting for a reply.
	if _, isNoReply := rep.(*NoReply); isNoReply {
		pubErr := nc.Publish(subject, payload)
		if pubErr != nil {
			log.Printf("nrpc: nats publish failed: %v", pubErr)
		}
		return pubErr
	}

	reply, err := nc.Request(subject, payload, timeout)
	if err != nil {
		log.Printf("nrpc: nats request failed: %v", err)
		return err
	}

	err = UnmarshalResponse(encoding, reply.Data, rep)
	if err == nil {
		return nil
	}
	// An *Error is a regular error reply, not a decoding problem: do not log it.
	if _, isNrpcErr := err.(*Error); !isNrpcErr {
		log.Printf("nrpc: response unmarshal failed: %v", err)
	}
	return err
}
const (
// RequestContextKey is the key for storing the request into the context
RequestContextKey ContextKey = iota
)
// NewRequest creates a Request bound to ctx and conn for the given subject and
// reply subject, with CreatedAt set to the current time. Every other field is
// left at its zero value.
func NewRequest(ctx context.Context, conn NatsConn, subject string, replySubject string) *Request {
	req := new(Request)
	req.Context = ctx
	req.Conn = conn
	req.Subject = subject
	req.ReplySubject = replySubject
	req.CreatedAt = time.Now()
	return req
}
// GetRequest returns the Request stored in ctx under RequestContextKey, or nil
// when there is none.
func GetRequest(ctx context.Context) *Request {
	if req, ok := ctx.Value(RequestContextKey).(*Request); ok {
		return req
	}
	return nil
}
// Request is a server-side incoming request
type Request struct {
// Context is the base context of the request. Run passes it to the handler
// unless the reply is streamed, and setupStreamedReply derives StreamContext
// from it.
Context context.Context
// Conn is the connection on which replies are published.
Conn NatsConn
// isStreamedReply is set by EnableStreamedReply.
isStreamedReply bool
// KeepStreamAlive is created by setupStreamedReply and stopped by SendReply.
KeepStreamAlive *KeepStreamAlive
// StreamContext and StreamCancel are created by setupStreamedReply; Run passes
// StreamContext to the handler when the reply is streamed.
StreamContext context.Context
StreamCancel func()
// StreamMsgCount is incremented by SendStreamReply for each message sent and
// reported in the end-of-stream reply.
StreamMsgCount uint32
// streamLock serializes setupStreamedReply.
streamLock sync.Mutex
// Subject is the subject given to NewRequest.
Subject string
// MethodName is used in the log messages of RunAndReply.
MethodName string
// NOTE(review): SubjectTail is not read in this file — presumably the tail
// returned by ParseSubject; confirm against callers.
SubjectTail []string
// CreatedAt is set by NewRequest; StartedAt is set by Run.
CreatedAt time.Time
StartedAt time.Time
// Encoding is the encoding used to publish replies.
Encoding string
// NoReply makes RunAndReply skip sending a reply.
NoReply bool
// ReplySubject is the subject on which replies are published.
ReplySubject string
// PackageParams and ServiceParams are read and written through
// PackageParam/SetPackageParam and ServiceParam/SetServiceParam.
PackageParams map[string]string
ServiceParams map[string]string
// AfterReply, when set, is called by RunAndReply once the reply was handled.
// success is false if the handler failed, replySuccess is false if sending
// the reply failed.
AfterReply func(r *Request, success bool, replySuccess bool)
// Handler is the function executed by Run.
Handler func(context.Context) (proto.Message, error)
}
// Elapsed returns the duration since the request was created (CreatedAt), not
// since its handler was started (StartedAt).
func (r *Request) Elapsed() time.Duration {
return time.Since(r.CreatedAt)
}
// Run records StartedAt, executes the handler and captures any error or panic
// through CaptureErrors. It returns the response, or the error that should be
// sent back to the caller.
//
// The handler receives the stream context when the reply is streamed and the
// request context otherwise; in both cases the request itself is attached
// under RequestContextKey.
func (r *Request) Run() (msg proto.Message, replyError *Error) {
	r.StartedAt = time.Now()

	base := r.Context
	if r.StreamedReply() {
		base = r.StreamContext
	}
	handlerCtx := context.WithValue(base, RequestContextKey, r)

	return CaptureErrors(func() (proto.Message, error) {
		return r.Handler(handlerCtx)
	})
}
// RunAndReply calls Run and sends the reply back to the caller unless NoReply
// is set. Handler and publish failures are logged. AfterReply, when set, is
// then called with the outcome of both steps.
func (r *Request) RunAndReply() {
	// RunAndReply may have been called directly, in which case the streamed
	// reply still has to be initialized.
	r.setupStreamedReply()

	resp, replyError := r.Run()
	handlerOK := replyError == nil
	if !handlerOK {
		log.Printf("%s handler failed: %s", r.MethodName, replyError)
	}

	replyOK := true
	if !r.NoReply {
		if err := r.SendReply(resp, replyError); err != nil {
			replyOK = false
			log.Printf("%s failed to publish the response: %s", r.MethodName, err)
		}
	}

	if r.AfterReply != nil {
		r.AfterReply(r, handlerOK, replyOK)
	}
}
// PackageParam returns the value of the package parameter key, or "" when the
// request is nil or the parameter is absent.
func (r *Request) PackageParam(key string) string {
	if r != nil && r.PackageParams != nil {
		return r.PackageParams[key]
	}
	return ""
}
// ServiceParam returns the value of the service parameter key, or "" when the
// request is nil or the parameter is absent.
func (r *Request) ServiceParam(key string) string {
	if r != nil && r.ServiceParams != nil {
		return r.ServiceParams[key]
	}
	return ""
}
// SetPackageParam sets the package parameter key to value, allocating the
// parameter map on first use.
func (r *Request) SetPackageParam(key, value string) {
	params := r.PackageParams
	if params == nil {
		params = make(map[string]string)
		r.PackageParams = params
	}
	params[key] = value
}
// SetServiceParam sets the service parameter key to value, allocating the
// parameter map on first use.
func (r *Request) SetServiceParam(key, value string) {
	params := r.ServiceParams
	if params == nil {
		params = make(map[string]string)
		r.ServiceParams = params
	}
	params[key] = value
}
// EnableStreamedReply enables the streamed reply mode. It only sets the flag;
// the stream context and keep-alive are created later by setupStreamedReply.
func (r *Request) EnableStreamedReply() {
r.isStreamedReply = true
}
// setupStreamedReply initializes the reply stream if needed: for a streamed
// reply that has no keep-alive yet, it derives the cancelable stream context
// from the request context and starts the keep-alive, which cancels that
// context on error. It holds streamLock for the duration of the call, and
// calling it again is a no-op.
func (r *Request) setupStreamedReply() {
	r.streamLock.Lock()
	defer r.streamLock.Unlock()

	needsSetup := r.StreamedReply() && r.KeepStreamAlive == nil
	if needsSetup {
		streamCtx, cancel := context.WithCancel(r.Context)
		r.StreamContext, r.StreamCancel = streamCtx, cancel
		r.KeepStreamAlive = NewKeepStreamAlive(
			r.Conn, r.ReplySubject, r.Encoding, r.StreamCancel)
	}
}
// StreamedReply returns true if the request reply is streamed, i.e. if
// EnableStreamedReply was called.
func (r *Request) StreamedReply() bool {
return r.isStreamedReply
}
// SendStreamReply sends msg as one part of a streamed reply and counts it in
// StreamMsgCount. If publishing fails the error is logged and the stream is
// canceled through StreamCancel.
func (r *Request) SendStreamReply(msg proto.Message) {
	err := r.sendReply(msg, nil)
	if err == nil {
		r.StreamMsgCount++
		return
	}
	log.Printf("nrpc: error publishing response: %s", err)
	r.StreamCancel()
}
// SendReply sends the final reply to the caller.
//
// For a streamed reply the keep-alive is stopped first; then, if there is no
// error to report, an end-of-stream Error carrying the number of messages sent
// is published instead of resp. In every other case resp or withError is
// published as is.
func (r *Request) SendReply(resp proto.Message, withError *Error) error {
	if !r.StreamedReply() {
		return r.sendReply(resp, withError)
	}

	r.KeepStreamAlive.Stop()
	if withError != nil {
		return r.sendReply(resp, withError)
	}
	eos := &Error{Type: Error_EOS, MsgCount: r.StreamMsgCount}
	return r.sendReply(nil, eos)
}
// sendReply publishes resp, or withError when it is not nil, on the request's
// reply subject using the request's connection and encoding.
func (r *Request) sendReply(resp proto.Message, withError *Error) error {
return Publish(resp, withError, r.Conn, r.ReplySubject, r.Encoding)
}
// SendErrorTooBusy replies to the request with a 'SERVERTOOBUSY' error
// carrying msg, and returns the error of sending that reply, if any.
func (r *Request) SendErrorTooBusy(msg string) error {
	busy := &Error{
		Type:    Error_SERVERTOOBUSY,
		Message: msg,
	}
	return r.SendReply(nil, busy)
}
// ErrEOS is returned by StreamCallSubscription.Next when the stream ended with
// the announced number of messages.
var ErrEOS = errors.New("End of stream")
// ErrCanceled is reported by a StreamCallSubscription once its context is done.
var ErrCanceled = errors.New("Call canceled")
// NewStreamCallSubscription subscribes to subject on nc and starts the
// goroutine that receives the streamed replies and sends the heartbeats.
//
// timeout is the maximum delay allowed between two received messages. The
// subscription error is returned if the subscription cannot be created.
func NewStreamCallSubscription(
	ctx context.Context, nc NatsConn, encoding string, subject string,
	timeout time.Duration,
) (*StreamCallSubscription, error) {
	sub := &StreamCallSubscription{
		ctx:      ctx,
		nc:       nc,
		encoding: encoding,
		subject:  subject,
		timeout:  timeout,
		timeoutT: time.NewTimer(timeout),
		subCh:    make(chan *nats.Msg, 256),
		nextCh:   make(chan *nats.Msg),
		quit:     make(chan struct{}),
		errCh:    make(chan error, 1),
	}

	natsSub, err := nc.ChanSubscribe(subject, sub.subCh)
	if err != nil {
		return nil, err
	}

	go sub.loop(natsSub)
	return sub, nil
}
// StreamCallSubscription is the client side of a streamed call. A background
// loop receives the replies and sends heartbeats; Next hands the replies to
// the caller one at a time.
type StreamCallSubscription struct {
// ctx, once done, makes the loop send a last heartbeat and report ErrCanceled.
ctx context.Context
// nc is the connection used to publish the heartbeats.
nc NatsConn
// encoding is used for both the replies and the heartbeats.
encoding string
// subject is the reply subject; heartbeats go to subject + ".heartbeat".
subject string
// timeout is the delay timeoutT is reset to on each received message; when
// timeoutT fires the loop reports nats.ErrTimeout.
timeout time.Duration
timeoutT *time.Timer
// closed is set by Next once it returned an error; later calls to Next
// return nats.ErrBadSubscription.
closed bool
// subCh receives the raw messages of the NATS subscription.
subCh chan *nats.Msg
// nextCh hands messages from the loop to Next.
nextCh chan *nats.Msg
// quit is closed by stop to terminate the loop.
quit chan struct{}
// errCh carries the terminal error of the loop to Next.
errCh chan error
// msgCount is the number of replies successfully decoded by Next.
msgCount uint32
}
// stop terminates the background loop by closing the quit channel.
// It must be called at most once: closing a closed channel panics.
func (sub *StreamCallSubscription) stop() {
close(sub.quit)
}
// loop is the background goroutine of a stream subscription. Until it stops
// it:
//   - forwards received messages to Next through nextCh, skipping the
//     single 0 byte keep-alive messages, and resets the timeout on each
//     received message;
//   - publishes a heartbeat on "<subject>.heartbeat" every second;
//   - reports nats.ErrTimeout on errCh when no message arrived in time;
//   - publishes a last heartbeat and reports ErrCanceled when ctx is done;
//   - returns silently when stop is called.
//
// On exit the ticker and the timeout timer are stopped and ssub is
// unsubscribed. errCh has room for one error and the loop returns right after
// sending one, so reporting never blocks.
func (sub *StreamCallSubscription) loop(ssub *nats.Subscription) {
	hbSubject := sub.subject + ".heartbeat"
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()
	defer sub.timeoutT.Stop()
	defer ssub.Unsubscribe()

	// sendHeartbeat publishes a heartbeat; last marks the final one.
	sendHeartbeat := func(last bool) error {
		b, err := Marshal(sub.encoding, &Heartbeat{Lastbeat: last})
		if err != nil {
			return fmt.Errorf("Error marshaling heartbeat: %w", err)
		}
		if err := sub.nc.Publish(hbSubject, b); err != nil {
			return fmt.Errorf("Error sending heartbeat: %w", err)
		}
		return nil
	}

	for {
		select {
		case msg := <-sub.subCh:
			sub.timeoutT.Reset(sub.timeout)
			if len(msg.Data) == 1 && msg.Data[0] == 0 {
				// Keep-alive from the server: nothing to deliver.
				continue
			}
			// Hand the message to Next, but give up if the subscription is
			// stopped meanwhile: nobody reads nextCh anymore, and a bare
			// send would block this goroutine forever.
			select {
			case sub.nextCh <- msg:
			case <-sub.quit:
				return
			}
		case <-sub.timeoutT.C:
			sub.errCh <- nats.ErrTimeout
			return
		case <-sub.ctx.Done():
			// send a 'lastbeat' and quit
			if err := sendHeartbeat(true); err != nil {
				sub.errCh <- err
				return
			}
			sub.errCh <- ErrCanceled
			return
		case <-ticker.C:
			if err := sendHeartbeat(false); err != nil {
				sub.errCh <- err
				return
			}
		case <-sub.quit:
			return
		}
	}
}
// Next waits for the next message of the stream and decodes it into rep.
//
// It returns nil when a message was decoded. Any other return value ends the
// subscription, and further calls return nats.ErrBadSubscription:
//   - ErrEOS when the stream ended with the announced number of messages;
//   - ErrStreamInvalidMsgCount when the end-of-stream reply announces a
//     different number of messages than were received;
//   - the *Error sent by the server for any other error reply;
//   - the decoding error when a message cannot be decoded;
//   - the error reported by the background loop (timeout, cancellation,
//     heartbeat failure).
func (sub *StreamCallSubscription) Next(rep proto.Message) error {
	if sub.closed {
		return nats.ErrBadSubscription
	}
	select {
	case err := <-sub.errCh:
		sub.closed = true
		return err
	case msg := <-sub.nextCh:
		err := UnmarshalResponse(sub.encoding, msg.Data, rep)
		if err == nil {
			sub.msgCount++
			return nil
		}

		sub.stop()
		sub.closed = true

		nrpcErr, ok := err.(*Error)
		if !ok {
			log.Printf("nrpc: response unmarshal failed: %v", err)
			return err
		}
		if nrpcErr.GetType() != Error_EOS {
			// Only an end-of-stream reply carries a message count, so a
			// regular error reply is returned without checking it.
			return err
		}
		if announced := nrpcErr.GetMsgCount(); announced != sub.msgCount {
			log.Printf(
				"nrpc: received invalid number of messages. Expected %d, got %d",
				announced, sub.msgCount)
			return ErrStreamInvalidMsgCount
		}
		return ErrEOS
	}
}
// StreamCall sends req on subject and returns the subscription from which the
// streamed replies are read with Next.
//
// For any encoding other than "protobuf" the encoding name is appended to the
// subject as an extra token. The reply inbox comes from GetReplyInbox and is
// subscribed to before the request is published. If publishing fails the
// subscription is stopped and the error returned.
func StreamCall(ctx context.Context, nc NatsConn, subject string, req proto.Message, encoding string, timeout time.Duration) (*StreamCallSubscription, error) {
	payload, err := Marshal(encoding, req)
	if err != nil {
		log.Printf("nrpc: inner request marshal failed: %v", err)
		return nil, err
	}

	if encoding != "protobuf" {
		subject = subject + "." + encoding
	}

	inbox := GetReplyInbox(nc)
	stream, err := NewStreamCallSubscription(ctx, nc, encoding, inbox, timeout)
	if err != nil {
		return nil, err
	}

	if pubErr := nc.PublishRequest(subject, inbox, payload); pubErr != nil {
		stream.stop()
		return nil, pubErr
	}
	return stream, nil
}
// Publish encodes a reply and publishes it on subject over nc. When withError
// is not nil it is sent as an error reply and resp is ignored; otherwise resp
// is sent. Marshaling and publishing failures are logged and returned.
func Publish(resp proto.Message, withError *Error, nc NatsConn, subject string, encoding string) error {
	encode := func() ([]byte, error) {
		if withError != nil {
			return MarshalErrorResponse(encoding, withError)
		}
		return Marshal(encoding, resp)
	}

	payload, err := encode()
	if err != nil {
		log.Printf("nrpc: rpc response marshal failed: %v", err)
		return err
	}

	if pubErr := nc.Publish(subject, payload); pubErr != nil {
		log.Printf("nrpc: response publish failed: %v", pubErr)
		return pubErr
	}
	return nil
}
// CaptureErrors runs fn and converts its error, or a panic, into a proper
// *Error:
//   - a panic is logged with its stack trace and becomes an Error of type
//     SERVER whose message is the panic value;
//   - an error that already is a *Error is returned unchanged;
//   - any other error becomes an Error of type CLIENT with the same message.
//
// msg is whatever fn returned; it is nil when fn panicked.
func CaptureErrors(fn func() (proto.Message, error)) (msg proto.Message, replyError *Error) {
	defer func() {
		if r := recover(); r != nil {
			// %v rather than %s: the recovered value can be of any type, and
			// %s would render e.g. panic(42) as "%!s(int=42)".
			log.Printf("Caught panic: %v\n%s", r, debug.Stack())
			replyError = &Error{
				Type:    Error_SERVER,
				Message: fmt.Sprint(r),
			}
		}
	}()
	var err error
	msg, err = fn()
	if err != nil {
		var ok bool
		if replyError, ok = err.(*Error); !ok {
			replyError = &Error{
				Type:    Error_CLIENT,
				Message: err.Error(),
			}
		}
	}
	return msg, replyError
}
// NewKeepStreamAlive creates a KeepStreamAlive for the reply subject and
// starts its background loop. onError is called by the loop when the stream
// has to be canceled.
func NewKeepStreamAlive(nc NatsConn, subject string, encoding string, onError func()) *KeepStreamAlive {
	keeper := &KeepStreamAlive{
		nc:       nc,
		subject:  subject,
		encoding: encoding,
		c:        make(chan struct{}),
		onError:  onError,
	}
	go keeper.loop()
	return keeper
}
// KeepStreamAlive is the server side keep-alive of a streamed reply. Its loop
// publishes a keep-alive message on the reply subject every second and
// watches the heartbeats received on the "<subject>.heartbeat" subject.
type KeepStreamAlive struct {
// nc is the connection used to subscribe to heartbeats and publish keep-alives.
nc NatsConn
// subject is the reply subject of the stream.
subject string
// encoding is used to decode the heartbeats.
encoding string
// c is closed by Stop to terminate the loop.
c chan struct{}
// onError is called when the loop gives up: subscription or publish failure,
// undecodable heartbeat, last heartbeat received, or no heartbeat for too long.
onError func()
}
// Stop terminates the keep-alive loop by closing its stop channel. It is a
// no-op on a nil receiver.
func (k *KeepStreamAlive) Stop() {
	if k == nil {
		return
	}
	close(k.c)
}
// loop is the background goroutine of the keep-alive. It subscribes to the
// "<subject>.heartbeat" subject and then, every second, publishes a single 0
// byte on the reply subject so the client knows the stream is alive.
//
// It calls onError and returns when:
//   - the heartbeat subscription cannot be created;
//   - a heartbeat cannot be decoded;
//   - the client sends its last heartbeat (the call was canceled);
//   - no heartbeat was received for 5 consecutive ticks;
//   - publishing the keep-alive fails.
//
// It returns without calling onError when Stop is called.
func (k *KeepStreamAlive) loop() {
	hbChan := make(chan *nats.Msg, 256)
	hbSub, err := k.nc.ChanSubscribe(k.subject+".heartbeat", hbChan)
	if err != nil {
		log.Printf("nrpc: could not subscribe to heartbeat: %s", err)
		k.onError()
		// Without a subscription no heartbeat can ever arrive: stop here
		// instead of running the loop until it reports a second failure.
		return
	}
	defer func() {
		if err := hbSub.Unsubscribe(); err != nil {
			log.Printf("nrpc: error unsubscribing from heartbeat: %s", err)
		}
	}()

	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()

	// Number of ticks since the last heartbeat was received.
	hbDelay := 0
	for {
		select {
		case msg := <-hbChan:
			var hb Heartbeat
			if err := Unmarshal(k.encoding, msg.Data, &hb); err != nil {
				log.Printf("nrpc: error unmarshaling heartbeat: %s", err)
				k.onError()
				return
			}
			if hb.Lastbeat {
				log.Printf("nrpc: client canceled the streamed reply. (%s)", k.subject)
				k.onError()
				return
			}
			hbDelay = 0
		case <-ticker.C:
			hbDelay++
			if hbDelay >= 5 {
				log.Printf("nrpc: No heartbeat received in 5 seconds. Canceling.")
				k.onError()
				return
			}
			if err := k.nc.Publish(k.subject, []byte{0}); err != nil {
				log.Printf("nrpc: error publishing response: %s", err)
				k.onError()
				return
			}
		case <-k.c:
			return
		}
	}
}
// WorkerPool is a pool of workers. Requests are queued with QueueRequest, and
// a scheduler goroutine hands them to the workers.
type WorkerPool struct {
// Context is derived from the context given to NewWorkerPool; Close cancels
// it when the workers do not stop in time.
Context context.Context
contextCancel context.CancelFunc
// queue holds the pending requests. It is replaced by SetMaxPending and set
// to nil by Close, under m.
queue chan *Request
// schedule carries requests from the scheduler to the workers. A nil request
// tells the worker that receives it to exit.
schedule chan *Request
// waitGroup tracks the scheduler and the workers.
waitGroup sync.WaitGroup
// m guards queue, size, maxPending and maxPendingDuration.
m sync.Mutex
// size is the current number of workers.
size uint
// maxPending is the capacity of queue.
maxPending uint
// maxPendingDuration is added to a request's CreatedAt to get the deadline
// after which it is rejected instead of scheduled.
maxPendingDuration time.Duration
}
// NewWorkerPool creates a pool of size workers whose context derives from ctx,
// and starts its scheduler. maxPending is the capacity of the pending queue
// and maxPendingDuration the time a request may wait before being rejected.
func NewWorkerPool(
	ctx context.Context,
	size uint,
	maxPending uint,
	maxPendingDuration time.Duration,
) *WorkerPool {
	poolCtx, cancel := context.WithCancel(ctx)
	pool := &WorkerPool{
		Context:            poolCtx,
		contextCancel:      cancel,
		queue:              make(chan *Request, maxPending),
		schedule:           make(chan *Request),
		maxPending:         maxPending,
		maxPendingDuration: maxPendingDuration,
	}

	// The scheduler is tracked by the wait group like the workers.
	pool.waitGroup.Add(1)
	go pool.scheduler()

	pool.SetSize(size)
	return pool
}
// getQueue returns the current pending queue, read under the pool mutex.
// The result is nil once the pool was closed.
func (pool *WorkerPool) getQueue() (queue chan *Request) {
	pool.m.Lock()
	defer pool.m.Unlock()
	return pool.queue
}
// scheduler moves the requests from the pending queue to the workers. It runs
// until getQueue returns nil and marks the wait group done on exit.
func (pool *WorkerPool) scheduler() {
defer pool.waitGroup.Done()
for {
// The queue is re-read on each pass: when the current one is closed the
// range below ends and the replacement (or nil) is picked up here.
queue := pool.getQueue()
if queue == nil {
return
}
queueLoop:
for request := range queue {
now := time.Now()
// maxPendingDuration is read under the lock because
// SetMaxPendingDuration writes it under the same lock.
pool.m.Lock()
deadline := request.CreatedAt.Add(pool.maxPendingDuration)
pool.m.Unlock()
if deadline.After(now) {
// Safety call to setupStreamedReply in case QueueRequest had
// no time to do it yet
request.setupStreamedReply()
select {
case pool.schedule <- request:
continue queueLoop
case <-time.After(deadline.Sub(now)):
// Too late
}
}
// Either the deadline had already passed, or no worker took the request
// before it expired.
request.SendErrorTooBusy("No worker available")
}
}
}
// worker runs the requests received on the schedule channel until it receives
// a nil request or the channel is closed, then marks the wait group done.
func (pool *WorkerPool) worker() {
	defer pool.waitGroup.Done()
	for {
		request, open := <-pool.schedule
		if !open || request == nil {
			return
		}
		request.RunAndReply()
	}
}
// SetMaxPending changes the capacity of the pending queue to value.
//
// The queue is replaced by a new one of the requested capacity. The requests
// still pending in the old queue are moved to the new one; those that do not
// fit are rejected with a SERVERTOOBUSY error. It is a no-op when the
// capacity is unchanged.
func (pool *WorkerPool) SetMaxPending(value uint) {
	pool.m.Lock()
	defer pool.m.Unlock()

	// maxPending is compared under the lock: it is written below under the
	// same lock, so an unlocked read would race with concurrent callers.
	if pool.maxPending == value {
		return
	}

	oldQueue := pool.queue
	pool.queue = make(chan *Request, value)
	pool.maxPending = value
	close(oldQueue)

	// drain the old queue and cancel requests if there are too many
	for request := range oldQueue {
		select {
		case pool.queue <- request:
		default:
			// A failure to send the rejection is already logged by Publish.
			request.SendErrorTooBusy("too many pending requests")
		}
	}
}
// SetMaxPendingDuration changes the maximum time a request may stay pending.
// The value is written under the pool mutex.
func (pool *WorkerPool) SetMaxPendingDuration(value time.Duration) {
	pool.m.Lock()
	defer pool.m.Unlock()
	pool.maxPendingDuration = value
}
// SetSize changes the number of workers to size. Extra workers are stopped by
// sending them a nil request on the schedule channel, which blocks until a
// worker receives it; missing workers are started as new goroutines.
// The pool mutex is held for the whole call.
func (pool *WorkerPool) SetSize(size uint) {
	pool.m.Lock()
	defer pool.m.Unlock()

	switch {
	case size < pool.size:
		for pool.size > size {
			pool.schedule <- nil
			pool.size--
		}
	case size > pool.size:
		for pool.size < size {
			pool.waitGroup.Add(1)
			go pool.worker()
			pool.size++
		}
	}
}
// QueueRequest adds a request to the queue without blocking.
// If the queue is full, a SERVERTOOBUSY error is sent to the client and the
// error of sending that reply, if any, is returned. Otherwise the streamed
// reply is initialized if needed and nil is returned.
// NOTE(review): the queue is fetched under the lock but the send happens after
// the lock is released, while SetMaxPending and Close close the old queue —
// confirm a send can never hit a closed (or nil) queue.
func (pool *WorkerPool) QueueRequest(request *Request) error {
select {
case pool.getQueue() <- request:
request.setupStreamedReply()
return nil
default:
return request.SendErrorTooBusy("too many pending requests")
}
}
// Close stops all the workers and waits for their completion.
// Requests still pending in the queue are answered with a SERVERTOOBUSY error.
// If the workers do not stop before the timeout, their context is canceled.
// Will never return if a request ignores the context.
// NOTE(review): a second call would close a nil queue and panic — presumably
// Close is meant to be called once; confirm.
func (pool *WorkerPool) Close(timeout time.Duration) {
// Stops all the workers so nothing more gets scheduled
pool.SetSize(0)
// Detach the queue under the lock; a nil queue makes the scheduler return.
pool.m.Lock()
oldQueue := pool.queue
pool.queue = nil
pool.m.Unlock()
// Closing the queue ends the scheduler's range loop and lets the drain
// below terminate.
close(oldQueue)
for request := range oldQueue {
request.SendErrorTooBusy("Worker pool shutting down")
}
// Now wait for the workers to stop and cancel the context if they don't
timer := time.AfterFunc(timeout, pool.contextCancel)
pool.waitGroup.Wait()
timer.Stop()
close(pool.schedule)
}

Опубликовать ( 0 )

Вы можете оставить комментарий после Вход в систему

1
https://api.gitlife.ru/oschina-mirror/teamlint-nrpc.git
git@api.gitlife.ru:oschina-mirror/teamlint-nrpc.git
oschina-mirror
teamlint-nrpc
teamlint-nrpc
master