1 В избранное 0 Ответвления 0

OSCHINA-MIRROR/mirrors-Gubernator

Присоединиться к Gitlife
Откройте для себя и примите участие в публичных проектах с открытым исходным кодом с участием более 10 миллионов разработчиков. Приватные репозитории также полностью бесплатны :)
Присоединиться бесплатно
Клонировать/Скачать
gubernator.go 24 КБ
Копировать Редактировать Web IDE Исходные данные Просмотреть построчно История
Shawn Poulson Отправлено 19.03.2024 16:18 3982e50
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816
/*
Copyright 2018-2022 Mailgun Technologies Inc
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package gubernator
import (
"context"
"fmt"
"strings"
"sync"
"time"
"github.com/mailgun/errors"
"github.com/mailgun/holster/v4/clock"
"github.com/mailgun/holster/v4/syncutil"
"github.com/mailgun/holster/v4/tracing"
"github.com/prometheus/client_golang/prometheus"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
)
const (
	// maxBatchSize caps the number of rate limit requests accepted in a
	// single GetRateLimits or GetPeerRateLimits batch.
	maxBatchSize = 1000
	// Healthy and UnHealthy are the status strings reported by HealthCheck.
	Healthy = "healthy"
	UnHealthy = "unhealthy"
)
// V1Instance is a single gubernator peer. It serves both the public V1
// rate limit API and the peer-to-peer PeersV1 API on the configured gRPC
// servers.
type V1Instance struct {
	UnimplementedV1Server
	UnimplementedPeersV1Server
	// global coordinates queueing/broadcasting of GLOBAL-behavior limits.
	global *globalManager
	// peerMutex guards the LocalPicker/RegionPicker fields inside conf.
	peerMutex sync.RWMutex
	log FieldLogger
	conf Config
	// isClosed is set by Close() and prevents double shutdown.
	isClosed bool
	// workerPool owns the cache and executes all rate limit checks.
	workerPool *WorkerPool
}
// RateLimitReqState carries per-request ownership context used while
// evaluating a rate limit.
type RateLimitReqState struct {
	// IsOwner is true when this instance owns the rate limit key.
	IsOwner bool
}
// Prometheus metrics exported by this package. Registered via the
// V1Instance.Describe/Collect methods below.
var (
	// Counts getLocalRateLimit() calls, labeled by call type.
	metricGetRateLimitCounter = prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "gubernator_getratelimit_counter",
		Help: "The count of getLocalRateLimit() calls. Label \"calltype\" may be \"local\" for calls handled by the same peer, or \"global\" for global rate limits.",
	}, []string{"calltype"})
	// Latency summary of key internal functions, labeled by function name.
	metricFuncTimeDuration = prometheus.NewSummaryVec(prometheus.SummaryOpts{
		Name: "gubernator_func_duration",
		Help: "The timings of key functions in Gubernator in seconds.",
		Objectives: map[float64]float64{
			1: 0.001,
			0.99: 0.001,
			0.5: 0.01,
		},
	}, []string{"name"})
	// Counts checks that exceeded their limit.
	metricOverLimitCounter = prometheus.NewCounter(prometheus.CounterOpts{
		Name: "gubernator_over_limit_counter",
		Help: "The number of rate limit checks that are over the limit.",
	})
	// Gauge of in-flight GetRateLimits calls.
	metricConcurrentChecks = prometheus.NewGauge(prometheus.GaugeOpts{
		Name: "gubernator_concurrent_checks_counter",
		Help: "The number of concurrent GetRateLimits API calls.",
	})
	// Counts check errors, labeled by error category string.
	metricCheckErrorCounter = prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "gubernator_check_error_counter",
		Help: "The number of errors while checking rate limits.",
	}, []string{"error"})
	// Per-worker command throughput in the WorkerPool.
	metricCommandCounter = prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "gubernator_command_counter",
		Help: "The count of commands processed by each worker in WorkerPool.",
	}, []string{"worker", "method"})
	// Per-worker queue depth in the WorkerPool.
	metricWorkerQueue = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Name: "gubernator_worker_queue_length",
		Help: "The count of requests queued up in WorkerPool.",
	}, []string{"method", "worker"})
	// Batch behavior.
	// Counts retries performed by asyncRequest() when forwarding to a peer.
	metricBatchSendRetries = prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "gubernator_batch_send_retries",
		Help: "The count of retries occurred in asyncRequest() forwarding a request to another peer.",
	}, []string{"name"})
	// Depth of the per-peer batching queue.
	metricBatchQueueLength = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Name: "gubernator_batch_queue_length",
		Help: "The getRateLimitsBatch() queue length in PeerClient. This represents rate checks queued by for batching to a remote peer.",
	}, []string{"peerAddr"})
	// Latency of batch sends to a remote peer.
	metricBatchSendDuration = prometheus.NewSummaryVec(prometheus.SummaryOpts{
		Name: "gubernator_batch_send_duration",
		Help: "The timings of batch send operations to a remote peer.",
		Objectives: map[float64]float64{
			0.99: 0.001,
		},
	}, []string{"peerAddr"})
)
// NewV1Instance instantiate a single instance of a gubernator peer and register this
// instance with the provided GRPCServer.
//
// Config defaults are applied before construction. When a Loader is
// configured the cache is populated from it before the instance is
// returned.
func NewV1Instance(conf Config) (s *V1Instance, err error) {
	if conf.GRPCServers == nil {
		return nil, errors.New("at least one GRPCServer instance is required")
	}
	if err = conf.SetDefaults(); err != nil {
		return nil, err
	}

	s = &V1Instance{
		conf: conf,
		log:  conf.Logger,
	}
	s.workerPool = NewWorkerPool(&conf)
	s.global = newGlobalManager(conf.Behaviors, s)

	// Register our instance with all GRPC servers
	for _, srv := range conf.GRPCServers {
		RegisterV1Server(srv, s)
		RegisterPeersV1Server(srv, s)
	}

	// Without a loader there is nothing to warm the cache from.
	if s.conf.Loader == nil {
		return s, nil
	}
	// Load the cache.
	if err = s.workerPool.Load(context.Background()); err != nil {
		return nil, errors.Wrap(err, "Error in workerPool.Load")
	}
	return s, nil
}
// Close gracefully shuts down the instance: the global manager is
// stopped, the cache is persisted through the configured Loader (if any),
// and the worker pool is closed. Closing an already-closed instance is a
// no-op.
func (s *V1Instance) Close() (err error) {
	if s.isClosed {
		return nil
	}
	ctx := context.Background()

	s.global.Close()

	if s.conf.Loader != nil {
		if err = s.workerPool.Store(ctx); err != nil {
			s.log.WithError(err).Error("Error in workerPool.Store")
			return errors.Wrap(err, "Error in workerPool.Store")
		}
	}
	if err = s.workerPool.Close(); err != nil {
		s.log.WithError(err).Error("Error in workerPool.Close")
		return errors.Wrap(err, "Error in workerPool.Close")
	}

	s.isClosed = true
	return nil
}
// GetRateLimits is the public interface used by clients to request rate limits from the system. If the
// rate limit `Name` and `UniqueKey` is not owned by this instance, then we forward the request to the
// peer that does.
//
// Each item in the batch gets its own response slot; an invalid or failed
// item produces an error response in that slot rather than failing the
// whole batch. Non-owned, non-GLOBAL items are forwarded to the owning
// peer concurrently via asyncRequest.
func (s *V1Instance) GetRateLimits(ctx context.Context, r *GetRateLimitsReq) (*GetRateLimitsResp, error) {
	funcTimer := prometheus.NewTimer(metricFuncTimeDuration.WithLabelValues("V1Instance.GetRateLimits"))
	defer funcTimer.ObserveDuration()
	metricConcurrentChecks.Inc()
	defer metricConcurrentChecks.Dec()
	// Reject oversized batches outright instead of processing a partial batch.
	if len(r.Requests) > maxBatchSize {
		metricCheckErrorCounter.WithLabelValues("Request too large").Inc()
		return nil, status.Errorf(codes.OutOfRange,
			"Requests.RateLimits list too large; max size is '%d'", maxBatchSize)
	}
	// One timestamp shared by every request that did not supply CreatedAt.
	createdAt := epochMillis(clock.Now())
	resp := GetRateLimitsResp{
		Responses: make([]*RateLimitResp, len(r.Requests)),
	}
	var wg sync.WaitGroup
	// Buffered to batch size so asyncRequest goroutines never block on send.
	asyncCh := make(chan AsyncResp, len(r.Requests))
	// For each item in the request body
	for i, req := range r.Requests {
		key := req.Name + "_" + req.UniqueKey
		var peer *PeerClient
		var err error
		// Validate required fields before doing any work for this item.
		if req.UniqueKey == "" {
			metricCheckErrorCounter.WithLabelValues("Invalid request").Inc()
			resp.Responses[i] = &RateLimitResp{Error: "field 'unique_key' cannot be empty"}
			continue
		}
		if req.Name == "" {
			metricCheckErrorCounter.WithLabelValues("Invalid request").Inc()
			resp.Responses[i] = &RateLimitResp{Error: "field 'namespace' cannot be empty"}
			continue
		}
		if req.CreatedAt == nil || *req.CreatedAt == 0 {
			req.CreatedAt = &createdAt
		}
		// Stop per-item work once the caller's context is done; remaining
		// slots get an error response.
		if ctx.Err() != nil {
			err = errors.Wrap(ctx.Err(), "Error while iterating request items")
			span := trace.SpanFromContext(ctx)
			span.RecordError(err)
			resp.Responses[i] = &RateLimitResp{
				Error: err.Error(),
			}
			continue
		}
		if s.conf.Behaviors.ForceGlobal {
			SetBehavior(&req.Behavior, Behavior_GLOBAL, true)
		}
		peer, err = s.GetPeer(ctx, key)
		if err != nil {
			countError(err, "Error in GetPeer")
			err = errors.Wrapf(err, "Error in GetPeer, looking up peer that owns rate limit '%s'", key)
			resp.Responses[i] = &RateLimitResp{
				Error: err.Error(),
			}
			continue
		}
		// If our server instance is the owner of this rate limit
		reqState := RateLimitReqState{IsOwner: peer.Info().IsOwner}
		if reqState.IsOwner {
			// Apply our rate limit algorithm to the request
			resp.Responses[i], err = s.getLocalRateLimit(ctx, req, reqState)
			if err != nil {
				err = errors.Wrapf(err, "Error while apply rate limit for '%s'", key)
				span := trace.SpanFromContext(ctx)
				span.RecordError(err)
				resp.Responses[i] = &RateLimitResp{Error: err.Error()}
			}
		} else {
			if HasBehavior(req.Behavior, Behavior_GLOBAL) {
				// GLOBAL limits are answered from the local cache; hits are
				// queued to the owning peer asynchronously.
				resp.Responses[i], err = s.getGlobalRateLimit(ctx, req)
				if err != nil {
					err = errors.Wrap(err, "Error in getGlobalRateLimit")
					span := trace.SpanFromContext(ctx)
					span.RecordError(err)
					resp.Responses[i] = &RateLimitResp{Error: err.Error()}
				}
				// Inform the client of the owner key of the key
				resp.Responses[i].Metadata = map[string]string{"owner": peer.Info().GRPCAddress}
				continue
			}
			// Request must be forwarded to peer that owns the key.
			// Launch remote peer request in goroutine.
			wg.Add(1)
			go s.asyncRequest(ctx, &AsyncReq{
				AsyncCh: asyncCh,
				Peer: peer,
				Req: req,
				WG: &wg,
				Key: key,
				Idx: i,
			})
		}
	}
	// Wait for any async responses if any
	wg.Wait()
	close(asyncCh)
	// Place each async result back into its original slot.
	for a := range asyncCh {
		resp.Responses[a.Idx] = a.Resp
	}
	return &resp, nil
}
// AsyncResp is the result of one forwarded rate limit check, tagged with
// the index of its slot in the original batch.
type AsyncResp struct {
	// Idx is the position in the request batch this response belongs to.
	Idx int
	Resp *RateLimitResp
}

// AsyncReq describes a single rate limit check to forward to a remote
// peer via asyncRequest.
type AsyncReq struct {
	// WG is released exactly once when the request completes.
	WG *sync.WaitGroup
	// AsyncCh receives the response (or error response).
	AsyncCh chan AsyncResp
	Req *RateLimitReq
	// Peer is the client believed to own this rate limit key.
	Peer *PeerClient
	// Key is "<name>_<unique_key>", used for peer lookup and messages.
	Key string
	// Idx is the slot in the batch response this result fills.
	Idx int
}
// asyncRequest forwards a rate limit request to the peer that owns it,
// retrying (up to 5 times) when the call is cancelled/timed out and the
// owning peer may have changed — possibly to this instance itself. The
// result is always delivered on req.AsyncCh and req.WG is released
// exactly once.
func (s *V1Instance) asyncRequest(ctx context.Context, req *AsyncReq) {
	var attempts int
	var err error
	ctx = tracing.StartNamedScope(ctx, "V1Instance.asyncRequest")
	defer tracing.EndScope(ctx, nil)
	funcTimer := prometheus.NewTimer(metricFuncTimeDuration.WithLabelValues("V1Instance.asyncRequest"))
	defer funcTimer.ObserveDuration()
	reqState := RateLimitReqState{IsOwner: req.Peer.Info().IsOwner}
	resp := AsyncResp{
		Idx: req.Idx,
	}
	for {
		if attempts > 5 {
			// err holds the last RPC failure. Previously the loop used
			// `r, err :=`, shadowing this variable, so err was always nil
			// here and errors.Wrapf(nil, ...).Error() would panic.
			s.log.WithContext(ctx).
				WithError(err).
				WithField("key", req.Key).
				Error("GetPeer() returned peer that is not connected")
			countError(err, "Peer not connected")
			err = errors.Wrapf(err, "GetPeer() keeps returning peers that are not connected for '%s'", req.Key)
			resp.Resp = &RateLimitResp{Error: err.Error()}
			break
		}
		// If we are attempting again, the owner of this rate limit might have changed to us!
		if attempts != 0 {
			if reqState.IsOwner {
				resp.Resp, err = s.getLocalRateLimit(ctx, req.Req, reqState)
				if err != nil {
					s.log.WithContext(ctx).
						WithError(err).
						WithField("key", req.Key).
						Error("Error applying rate limit")
					err = errors.Wrapf(err, "Error in getLocalRateLimit for '%s'", req.Key)
					resp.Resp = &RateLimitResp{Error: err.Error()}
				}
				break
			}
		}
		// Make an RPC call to the peer that owns this rate limit.
		// Assign to the outer err (no `:=`) so the attempts>5 branch above
		// sees the last failure instead of a shadowed nil.
		var r *RateLimitResp
		r, err = req.Peer.GetPeerRateLimit(ctx, req.Req)
		if err != nil {
			if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
				attempts++
				metricBatchSendRetries.WithLabelValues(req.Req.Name).Inc()
				// Re-resolve the owner; it may have changed while retrying.
				var peerErr error
				req.Peer, peerErr = s.GetPeer(ctx, req.Key)
				if peerErr != nil {
					errPart := fmt.Sprintf("Error finding peer that owns rate limit '%s'", req.Key)
					s.log.WithContext(ctx).WithError(peerErr).WithField("key", req.Key).Error(errPart)
					countError(peerErr, "Error in GetPeer")
					peerErr = errors.Wrap(peerErr, errPart)
					resp.Resp = &RateLimitResp{Error: peerErr.Error()}
					break
				}
				continue
			}
			// Not calling `countError()` because we expect the remote end to
			// report this error.
			err = errors.Wrap(err, fmt.Sprintf("Error while fetching rate limit '%s' from peer", req.Key))
			resp.Resp = &RateLimitResp{Error: err.Error()}
			break
		}
		// Inform the client of the owner key of the key
		resp.Resp = r
		resp.Resp.Metadata = map[string]string{"owner": req.Peer.Info().GRPCAddress}
		break
	}
	req.AsyncCh <- resp
	req.WG.Done()
	if isDeadlineExceeded(ctx.Err()) {
		metricCheckErrorCounter.WithLabelValues("Timeout forwarding to peer").Inc()
	}
}
// getGlobalRateLimit handles rate limits that are marked as `Behavior = GLOBAL`. Rate limit responses
// are returned from the local cache and the hits are queued to be sent to the owning peer.
//
// The request is evaluated on a clone with NO_BATCHING set and GLOBAL
// cleared, so the caller's request keeps its original behavior flags and
// the local evaluation does not recurse into global handling.
func (s *V1Instance) getGlobalRateLimit(ctx context.Context, req *RateLimitReq) (resp *RateLimitResp, err error) {
	ctx = tracing.StartNamedScope(ctx, "V1Instance.getGlobalRateLimit", trace.WithAttributes(
		attribute.String("ratelimit.key", req.UniqueKey),
		attribute.String("ratelimit.name", req.Name),
	))
	defer prometheus.NewTimer(metricFuncTimeDuration.WithLabelValues("V1Instance.getGlobalRateLimit")).ObserveDuration()
	defer func() {
		// Only queue the hit to the owning peer when the local evaluation
		// succeeded.
		if err == nil {
			s.global.QueueHit(req)
		}
		tracing.EndScope(ctx, err)
	}()
	req2 := proto.Clone(req).(*RateLimitReq)
	SetBehavior(&req2.Behavior, Behavior_NO_BATCHING, true)
	SetBehavior(&req2.Behavior, Behavior_GLOBAL, false)
	reqState := RateLimitReqState{IsOwner: false}
	// Process the rate limit like we own it
	resp, err = s.getLocalRateLimit(ctx, req2, reqState)
	if err != nil {
		// Was the garbled "during in getLocalRateLimit"; matches the
		// "Error in ..." wrapping convention used throughout this file.
		return nil, errors.Wrap(err, "Error in getLocalRateLimit")
	}
	metricGetRateLimitCounter.WithLabelValues("global").Inc()
	return resp, nil
}
// UpdatePeerGlobals updates the local cache with a list of global rate limits. This method should only
// be called by a peer who is the owner of a global rate limit.
//
// Each incoming status snapshot is converted into the algorithm-specific
// bucket state and stored in the worker pool cache.
func (s *V1Instance) UpdatePeerGlobals(ctx context.Context, r *UpdatePeerGlobalsReq) (*UpdatePeerGlobalsResp, error) {
	defer prometheus.NewTimer(metricFuncTimeDuration.WithLabelValues("V1Instance.UpdatePeerGlobals")).ObserveDuration()
	now := MillisecondNow()
	for _, g := range r.Globals {
		item := &CacheItem{
			ExpireAt: g.Status.ResetTime,
			Algorithm: g.Algorithm,
			Key: g.Key,
		}
		// Rebuild local bucket state from the owner's status snapshot.
		switch g.Algorithm {
		case Algorithm_LEAKY_BUCKET:
			item.Value = &LeakyBucketItem{
				Remaining: float64(g.Status.Remaining),
				Limit: g.Status.Limit,
				Duration: g.Duration,
				Burst: g.Status.Limit,
				UpdatedAt: now,
			}
		case Algorithm_TOKEN_BUCKET:
			item.Value = &TokenBucketItem{
				Status: g.Status.Status,
				Limit: g.Status.Limit,
				Duration: g.Duration,
				Remaining: g.Status.Remaining,
				CreatedAt: now,
			}
		}
		// NOTE(review): an unrecognized algorithm leaves item.Value nil but
		// the item is still cached — presumably unreachable; confirm.
		err := s.workerPool.AddCacheItem(ctx, g.Key, item)
		if err != nil {
			return nil, errors.Wrap(err, "Error in workerPool.AddCacheItem")
		}
	}
	return &UpdatePeerGlobalsResp{}, nil
}
// GetPeerRateLimits is called by other peers to get the rate limits owned by this peer.
//
// Requests are fanned out across the configured worker count; responses
// are collected on a channel and placed back into their original batch
// positions by a single collector goroutine.
func (s *V1Instance) GetPeerRateLimits(ctx context.Context, r *GetPeerRateLimitsReq) (resp *GetPeerRateLimitsResp, err error) {
	defer prometheus.NewTimer(metricFuncTimeDuration.WithLabelValues("V1Instance.GetPeerRateLimits")).ObserveDuration()
	if len(r.Requests) > maxBatchSize {
		err := fmt.Errorf("'PeerRequest.rate_limits' list too large; max size is '%d'", maxBatchSize)
		metricCheckErrorCounter.WithLabelValues("Request too large").Inc()
		return nil, status.Error(codes.OutOfRange, err.Error())
	}
	// Invoke each rate limit request.
	type reqIn struct {
		idx int
		req *RateLimitReq
	}
	type respOut struct {
		idx int
		rl *RateLimitResp
	}
	resp = &GetPeerRateLimitsResp{
		RateLimits: make([]*RateLimitResp, len(r.Requests)),
	}
	respChan := make(chan respOut)
	var respWg sync.WaitGroup
	respWg.Add(1)
	// All requests processed here are for limits this peer owns.
	reqState := RateLimitReqState{IsOwner: true}
	go func() {
		// Capture each response and return in the same order
		for out := range respChan {
			resp.RateLimits[out.idx] = out.rl
		}
		respWg.Done()
	}()
	// Fan out requests.
	fan := syncutil.NewFanOut(s.conf.Workers)
	for idx, req := range r.Requests {
		fan.Run(func(in interface{}) error {
			rin := in.(reqIn)
			// Extract the propagated context from the metadata in the request
			prop := propagation.TraceContext{}
			ctx := prop.Extract(ctx, &MetadataCarrier{Map: rin.req.Metadata})
			// Forwarded global requests must have DRAIN_OVER_LIMIT set so token and leaky algorithms
			// drain the remaining in the event a peer asks for more than is remaining.
			// This is needed because with GLOBAL behavior peers will accumulate hits, which could
			// result in requesting more hits than is remaining.
			if HasBehavior(rin.req.Behavior, Behavior_GLOBAL) {
				SetBehavior(&rin.req.Behavior, Behavior_DRAIN_OVER_LIMIT, true)
			}
			// Assign default to CreatedAt for backwards compatibility.
			if rin.req.CreatedAt == nil || *rin.req.CreatedAt == 0 {
				createdAt := epochMillis(clock.Now())
				rin.req.CreatedAt = &createdAt
			}
			rl, err := s.getLocalRateLimit(ctx, rin.req, reqState)
			if err != nil {
				// Return the error for this request
				err = errors.Wrap(err, "Error in getLocalRateLimit")
				rl = &RateLimitResp{Error: err.Error()}
				// metricCheckErrorCounter is updated within getLocalRateLimit(), not in GetPeerRateLimits.
			}
			respChan <- respOut{rin.idx, rl}
			return nil
		}, reqIn{idx, req})
	}
	// Wait for all requests to be handled, then clean up.
	_ = fan.Wait()
	close(respChan)
	respWg.Wait()
	return resp, nil
}
// HealthCheck Returns the health of our instance.
//
// The status is Healthy unless any local or region peer has reported a
// recent error, in which case the status is UnHealthy and Message joins
// all peer errors with "|".
func (s *V1Instance) HealthCheck(ctx context.Context, r *HealthCheckReq) (health *HealthCheckResp, err error) {
	span := trace.SpanFromContext(ctx)
	var errs []string

	// collect records the last errors reported by each peer, tagging the
	// message with which picker group ("local" or "region") it came from.
	// This replaces two previously duplicated loops.
	collect := func(scope string, peers []*PeerClient) {
		for _, peer := range peers {
			for _, errMsg := range peer.GetLastErr() {
				err := fmt.Errorf("error returned from %s peer.GetLastErr: %s", scope, errMsg)
				span.RecordError(err)
				errs = append(errs, err.Error())
			}
		}
	}

	s.peerMutex.RLock()
	defer s.peerMutex.RUnlock()

	localPeers := s.conf.LocalPicker.Peers()
	collect("local", localPeers)
	regionPeers := s.conf.RegionPicker.Peers()
	collect("region", regionPeers)

	health = &HealthCheckResp{
		PeerCount: int32(len(localPeers) + len(regionPeers)),
		Status: Healthy,
	}
	if len(errs) != 0 {
		health.Status = UnHealthy
		health.Message = strings.Join(errs, "|")
	}
	span.SetAttributes(
		attribute.Int64("health.peerCount", int64(health.PeerCount)),
		attribute.String("health.status", health.Status),
	)
	return health, nil
}
// getLocalRateLimit evaluates a rate limit request against the local
// cache via the worker pool. For GLOBAL-behavior limits the update is
// queued for broadcast to all peers; the "local" counter is only
// incremented when this instance owns the key.
func (s *V1Instance) getLocalRateLimit(ctx context.Context, r *RateLimitReq, reqState RateLimitReqState) (_ *RateLimitResp, err error) {
	ctx = tracing.StartNamedScope(ctx, "V1Instance.getLocalRateLimit", trace.WithAttributes(
		attribute.String("ratelimit.key", r.UniqueKey),
		attribute.String("ratelimit.name", r.Name),
		attribute.Int64("ratelimit.limit", r.Limit),
		attribute.Int64("ratelimit.hits", r.Hits),
	))
	defer func() { tracing.EndScope(ctx, err) }()
	defer prometheus.NewTimer(metricFuncTimeDuration.WithLabelValues("V1Instance.getLocalRateLimit")).ObserveDuration()
	resp, err := s.workerPool.GetRateLimit(ctx, r, reqState)
	if err != nil {
		return nil, errors.Wrap(err, "during workerPool.GetRateLimit")
	}
	// If global behavior, then broadcast update to all peers.
	if HasBehavior(r.Behavior, Behavior_GLOBAL) {
		s.global.QueueUpdate(r)
	}
	if reqState.IsOwner {
		metricGetRateLimitCounter.WithLabelValues("local").Inc()
	}
	return resp, nil
}
// SetPeers replaces the peers and shuts down all the previous peers.
// TODO this should return an error if we failed to connect to any of the new peers
//
// Peers in a different data center go into the region picker; peers in
// our data center go into the local picker. Existing PeerClients are
// reused. After the swap, peers no longer referenced by either picker are
// shut down concurrently with a timeout of Behaviors.BatchTimeout.
func (s *V1Instance) SetPeers(peerInfo []PeerInfo) {
	localPicker := s.conf.LocalPicker.New()
	regionPicker := s.conf.RegionPicker.New()

	// connect builds a new PeerClient for info using this instance's peer
	// settings. Extracted to remove the previously duplicated construction
	// for the local and region cases.
	connect := func(info PeerInfo) (*PeerClient, error) {
		return NewPeerClient(PeerConfig{
			TraceGRPC: s.conf.PeerTraceGRPC,
			Behavior: s.conf.Behaviors,
			TLS: s.conf.PeerTLS,
			Log: s.log,
			Info: info,
		})
	}

	for _, info := range peerInfo {
		// Add peers that are not in our local DC to the RegionPicker
		if info.DataCenter != s.conf.DataCenter {
			peer := s.conf.RegionPicker.GetByPeerInfo(info)
			// If we don't have an existing PeerClient create a new one
			if peer == nil {
				var err error
				peer, err = connect(info)
				if err != nil {
					s.log.Errorf("error connecting to peer %s: %s", info.GRPCAddress, err)
					return
				}
			}
			regionPicker.Add(peer)
			continue
		}
		// If we don't have an existing PeerClient create a new one
		peer := s.conf.LocalPicker.GetByPeerInfo(info)
		if peer == nil {
			var err error
			peer, err = connect(info)
			if err != nil {
				s.log.Errorf("error connecting to peer %s: %s", info.GRPCAddress, err)
				return
			}
		}
		localPicker.Add(peer)
	}

	s.peerMutex.Lock()
	// Replace our current pickers
	oldLocalPicker := s.conf.LocalPicker
	oldRegionPicker := s.conf.RegionPicker
	s.conf.LocalPicker = localPicker
	s.conf.RegionPicker = regionPicker
	s.peerMutex.Unlock()
	s.log.WithField("peers", peerInfo).Debug("peers updated")

	// Shutdown any old peers we no longer need
	ctx, cancel := context.WithTimeout(context.Background(), s.conf.Behaviors.BatchTimeout)
	defer cancel()

	var shutdownPeers []*PeerClient
	for _, peer := range oldLocalPicker.Peers() {
		if peerInfo := s.conf.LocalPicker.GetByPeerInfo(peer.Info()); peerInfo == nil {
			shutdownPeers = append(shutdownPeers, peer)
		}
	}
	for _, regionPicker := range oldRegionPicker.Pickers() {
		for _, peer := range regionPicker.Peers() {
			if peerInfo := s.conf.RegionPicker.GetByPeerInfo(peer.Info()); peerInfo == nil {
				shutdownPeers = append(shutdownPeers, peer)
			}
		}
	}

	var wg syncutil.WaitGroup
	for _, p := range shutdownPeers {
		wg.Run(func(obj interface{}) error {
			pc := obj.(*PeerClient)
			err := pc.Shutdown(ctx)
			if err != nil {
				s.log.WithError(err).WithField("peer", pc).Error("while shutting down peer")
			}
			return nil
		}, p)
	}
	wg.Wait()

	if len(shutdownPeers) > 0 {
		var peers []string
		for _, p := range shutdownPeers {
			peers = append(peers, p.Info().GRPCAddress)
		}
		s.log.WithField("peers", peers).Debug("peers shutdown")
	}
}
// GetPeer returns a peer client for the hash key provided.
// The lookup goes through the local picker under a read lock; the
// returned peer may or may not be this instance itself (see Info().IsOwner).
func (s *V1Instance) GetPeer(ctx context.Context, key string) (p *PeerClient, err error) {
	defer prometheus.NewTimer(metricFuncTimeDuration.WithLabelValues("V1Instance.GetPeer")).ObserveDuration()
	s.peerMutex.RLock()
	defer s.peerMutex.RUnlock()
	p, err = s.conf.LocalPicker.Get(key)
	if err != nil {
		return nil, errors.Wrap(err, "Error in conf.LocalPicker.Get")
	}
	return p, nil
}
// GetPeerList returns the current set of peers known to the local picker.
func (s *V1Instance) GetPeerList() []*PeerClient {
	s.peerMutex.RLock()
	peers := s.conf.LocalPicker.Peers()
	s.peerMutex.RUnlock()
	return peers
}
// GetRegionPickers returns the per-region peer pickers, keyed by region.
func (s *V1Instance) GetRegionPickers() map[string]PeerPicker {
	s.peerMutex.RLock()
	pickers := s.conf.RegionPicker.Pickers()
	s.peerMutex.RUnlock()
	return pickers
}
// Describe fetches prometheus metrics to be registered.
// Forwards to every package-level metric plus the global manager's
// metrics so the instance can be registered as a single Collector.
func (s *V1Instance) Describe(ch chan<- *prometheus.Desc) {
	metricBatchQueueLength.Describe(ch)
	metricBatchSendDuration.Describe(ch)
	metricBatchSendRetries.Describe(ch)
	metricCheckErrorCounter.Describe(ch)
	metricCommandCounter.Describe(ch)
	metricConcurrentChecks.Describe(ch)
	metricFuncTimeDuration.Describe(ch)
	metricGetRateLimitCounter.Describe(ch)
	metricOverLimitCounter.Describe(ch)
	metricWorkerQueue.Describe(ch)
	s.global.metricBroadcastDuration.Describe(ch)
	s.global.metricGlobalQueueLength.Describe(ch)
	s.global.metricGlobalSendDuration.Describe(ch)
	s.global.metricGlobalSendQueueLength.Describe(ch)
}
// Collect fetches metrics from the server for use by prometheus.
// Mirrors Describe: forwards to the same set of package-level and
// global-manager metrics.
func (s *V1Instance) Collect(ch chan<- prometheus.Metric) {
	metricBatchQueueLength.Collect(ch)
	metricBatchSendDuration.Collect(ch)
	metricBatchSendRetries.Collect(ch)
	metricCheckErrorCounter.Collect(ch)
	metricCommandCounter.Collect(ch)
	metricConcurrentChecks.Collect(ch)
	metricFuncTimeDuration.Collect(ch)
	metricGetRateLimitCounter.Collect(ch)
	metricOverLimitCounter.Collect(ch)
	metricWorkerQueue.Collect(ch)
	s.global.metricBroadcastDuration.Collect(ch)
	s.global.metricGlobalQueueLength.Collect(ch)
	s.global.metricGlobalSendDuration.Collect(ch)
	s.global.metricGlobalSendQueueLength.Collect(ch)
}
// HasBehavior returns true if the provided behavior is set.
// Note: this is an "any bit set" test, not an exact match of all bits in flag.
func HasBehavior(b Behavior, flag Behavior) bool {
	return b&flag != 0
}
// SetBehavior sets or clears the behavior depending on the boolean `set`.
// Setting ORs the flag bits in; clearing uses Go's AND NOT operator,
// which is equivalent to (and simpler than) the previous XOR+AND mask.
func SetBehavior(b *Behavior, flag Behavior, set bool) {
	if set {
		*b |= flag
	} else {
		*b &^= flag
	}
}
// Count an error type in the metricCheckErrorCounter metric.
// Recurse into wrapped errors if necessary.
func countError(err error, defaultType string) {
for {
if err == nil {
metricCheckErrorCounter.WithLabelValues(defaultType).Inc()
return
}
if errors.Is(err, context.DeadlineExceeded) {
metricCheckErrorCounter.WithLabelValues("Timeout").Inc()
return
}
err = errors.Unwrap(err)
}
}
func isDeadlineExceeded(err error) bool {
if err == nil {
return false
}
return errors.Is(err, context.DeadlineExceeded)
}
func epochMillis(t time.Time) int64 {
return t.UnixNano() / 1_000_000
}

Опубликовать ( 0 )

Вы можете оставить комментарий после Вход в систему

1
https://api.gitlife.ru/oschina-mirror/mirrors-Gubernator.git
git@api.gitlife.ru:oschina-mirror/mirrors-Gubernator.git
oschina-mirror
mirrors-Gubernator
mirrors-Gubernator
master