1 В избранное 0 Ответвления 0

OSCHINA-MIRROR/mirrors-Tao

Присоединиться к Gitlife
Откройте для себя и примите участие в публичных проектах с открытым исходным кодом с участием более 10 миллионов разработчиков. Приватные репозитории также полностью бесплатны :)
Присоединиться бесплатно
Клонировать/Скачать
conn.go 19 КБ
Копировать Редактировать Web IDE Исходные данные Просмотреть построчно История
leesper Отправлено 25.08.2017 12:03 dd55dc4
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777
package tao
import (
"context"
"crypto/tls"
"net"
"sync"
"time"
"github.com/leesper/holmes"
)
// Sizes of the message header fields and the payload limit.
const (
	// MessageTypeBytes is the size, in bytes, of the type header.
	MessageTypeBytes = 4
	// MessageLenBytes is the size, in bytes, of the length header.
	MessageLenBytes = 4
	// MessageMaxBytes is the largest application payload allowed (8 MiB).
	MessageMaxBytes = 8 * 1024 * 1024
)
// MessageHandler pairs a decoded message with the handler function chosen for
// it. readLoop builds these values and handleLoop receives them through a
// connection's handlerCh.
type MessageHandler struct {
message Message // the decoded message to process
handler HandlerFunc // the function that will be invoked for message
}
// WriteCloser is the interface that groups the Write and Close methods.
// *ServerConn and *ClientConn are both passed around as WriteCloser values by
// the read, write and handle loops.
type WriteCloser interface {
// Write submits a message for sending and reports any error.
Write(Message) error
// Close shuts the connection down.
Close()
}
// ServerConn represents a server-side connection accepted by a TCP server. It
// provides the Write and Close methods required by WriteCloser.
type ServerConn struct {
netid int64 // connection ID passed to NewServerConn
belong *Server // the server that owns this connection
rawConn net.Conn // underlying network connection
once *sync.Once // makes the body of Close run only once
wg *sync.WaitGroup // tracks the read, write and handle loop go-routines
sendCh chan []byte // encoded packets queued for writeLoop
handlerCh chan MessageHandler // messages handed from readLoop to handleLoop
timerCh chan *OnTimeOut // timeout notifications consumed by handleLoop
mu sync.Mutex // guards following
name string // connection name; initialised to the remote address
heart int64 // last heart-beat timestamp (UnixNano)
pending []int64 // IDs of timers registered on this connection; set to nil by Close
ctx context.Context // connection context, derived from the server's context
cancel context.CancelFunc // cancels ctx
}
// NewServerConn builds a server connection with ID id for the network
// connection c, owned by server s. The result is idle: no go-routine runs
// until Start is called.
func NewServerConn(id int64, s *Server, c net.Conn) *ServerConn {
	bufSize := s.opts.bufferSize
	sc := &ServerConn{
		netid:     id,
		belong:    s,
		rawConn:   c,
		once:      new(sync.Once),
		wg:        new(sync.WaitGroup),
		sendCh:    make(chan []byte, bufSize),
		handlerCh: make(chan MessageHandler, bufSize),
		timerCh:   make(chan *OnTimeOut, bufSize),
		heart:     time.Now().UnixNano(),
		name:      c.RemoteAddr().String(),
		// Must be non-nil: AddPendingTimer treats a nil slice as "closed".
		pending: make([]int64, 0),
	}
	// The connection context carries the owning server so that
	// ServerFromContext can retrieve it.
	parent := context.WithValue(s.ctx, serverCtx, s)
	sc.ctx, sc.cancel = context.WithCancel(parent)
	return sc
}
// ServerFromContext extracts the *Server stored in ctx under serverCtx. The
// boolean result is false when ctx holds no *Server under that key.
func ServerFromContext(ctx context.Context) (*Server, bool) {
	if srv, found := ctx.Value(serverCtx).(*Server); found {
		return srv, true
	}
	return nil, false
}
// NetID returns the net ID of the server connection, as given to
// NewServerConn.
func (sc *ServerConn) NetID() int64 {
return sc.netid
}
// SetName replaces the name of the server connection. The update is made
// while holding the connection mutex.
func (sc *ServerConn) SetName(name string) {
	sc.mu.Lock()
	sc.name = name
	sc.mu.Unlock()
}
// Name reports the current name of the server connection, read while holding
// the connection mutex.
func (sc *ServerConn) Name() string {
	sc.mu.Lock()
	defer sc.mu.Unlock()
	return sc.name
}
// SetHeartBeat records heart as the latest heart-beat value of the server
// connection. readLoop calls it with time.Now().UnixNano().
func (sc *ServerConn) SetHeartBeat(heart int64) {
	sc.mu.Lock()
	sc.heart = heart
	sc.mu.Unlock()
}
// HeartBeat reports the most recently recorded heart-beat value of the server
// connection, read while holding the connection mutex.
func (sc *ServerConn) HeartBeat() int64 {
	sc.mu.Lock()
	defer sc.mu.Unlock()
	return sc.heart
}
// SetContextValue attaches extra data to the server connection by replacing
// its context with a child context that carries v under key k.
func (sc *ServerConn) SetContextValue(k, v interface{}) {
sc.mu.Lock()
// Deferred so the mutex is released even if context.WithValue panics
// (it rejects nil or non-comparable keys).
defer sc.mu.Unlock()
sc.ctx = context.WithValue(sc.ctx, k, v)
}
// ContextValue looks up the value stored under key k in the server
// connection's context. It returns whatever ctx.Value returns, which is nil
// when the key is absent.
func (sc *ServerConn) ContextValue(k interface{}) interface{} {
sc.mu.Lock()
defer sc.mu.Unlock()
return sc.ctx.Value(k)
}
// Start brings the server connection to life: it invokes the server's
// onConnect callback (if one is configured) and then launches the reading,
// writing and handling go-routines, each registered with the connection's
// WaitGroup.
func (sc *ServerConn) Start() {
	holmes.Infof("conn start, <%v -> %v>\n", sc.rawConn.LocalAddr(), sc.rawConn.RemoteAddr())
	if hook := sc.belong.opts.onConnect; hook != nil {
		hook(sc)
	}
	// The go statement evaluates the function value and its arguments
	// immediately, so the range variable can be used directly.
	for _, loop := range []func(WriteCloser, *sync.WaitGroup){readLoop, writeLoop, handleLoop} {
		sc.wg.Add(1)
		go loop(sc, sc.wg)
	}
}
// Close gracefully closes the server connection. It blocks until the read,
// write and handle go-routines have all returned. The shutdown sequence is
// wrapped in sc.once, so repeated or concurrent calls never run it twice.
func (sc *ServerConn) Close() {
sc.once.Do(func() {
holmes.Infof("conn close gracefully, <%v -> %v>\n", sc.rawConn.LocalAddr(), sc.rawConn.RemoteAddr())
// callback on close
onClose := sc.belong.opts.onClose
if onClose != nil {
onClose(sc)
}
// remove connection from server
sc.belong.conns.Delete(sc.netid)
addTotalConn(-1)
// close net.Conn, any blocked read or write operation will be unblocked and
// return errors.
if tc, ok := sc.rawConn.(*net.TCPConn); ok {
// avoid time-wait state (only applies when the raw connection is a plain
// *net.TCPConn; the returned error is ignored)
tc.SetLinger(0)
}
sc.rawConn.Close()
// cancel readLoop, writeLoop and handleLoop go-routines, and take ownership
// of the pending timer IDs. Setting pending to nil makes later
// AddPendingTimer calls a no-op.
sc.mu.Lock()
sc.cancel()
pending := sc.pending
sc.pending = nil
sc.mu.Unlock()
// clean up pending timers
for _, id := range pending {
sc.CancelTimer(id)
}
// wait until all go-routines exited.
sc.wg.Wait()
// close all channels now that the loop go-routines have returned. A Write
// racing with this hits a closed sendCh and panics inside asyncWrite, which
// recovers and reports ErrServerClosed.
close(sc.sendCh)
close(sc.handlerCh)
close(sc.timerCh)
// tell server I'm done :( .
sc.belong.wg.Done()
})
}
// AddPendingTimer remembers timerID as a timer belonging to this server
// connection so that Close can cancel it. IDs are dropped when the pending
// list is nil, which is the state Close leaves it in.
func (sc *ServerConn) AddPendingTimer(timerID int64) {
	sc.mu.Lock()
	defer sc.mu.Unlock()
	if sc.pending == nil {
		return
	}
	sc.pending = append(sc.pending, timerID)
}
// Write encodes message and queues it for delivery to the client. It
// delegates to asyncWrite, which never blocks: it returns ErrWouldBlock when
// the send queue is full, ErrServerClosed when a panic is recovered during
// the attempt, or the codec's error when encoding fails.
func (sc *ServerConn) Write(message Message) error {
return asyncWrite(sc, message)
}
// RunAt schedules callback to run at timestamp on the owning server's timing
// wheel and returns the timer ID. A non-negative ID is also recorded as a
// pending timer of this connection so that Close can cancel it.
func (sc *ServerConn) RunAt(timestamp time.Time, callback func(time.Time, WriteCloser)) int64 {
	// ctx is guarded by mu (SetContextValue replaces it under the lock), so
	// take a snapshot under the lock rather than reading the field racily.
	sc.mu.Lock()
	ctx := sc.ctx
	sc.mu.Unlock()

	id := runAt(ctx, sc.netid, sc.belong.timing, timestamp, callback)
	if id >= 0 {
		sc.AddPendingTimer(id)
	}
	return id
}
// RunAfter schedules callback to run once duration has elapsed, using the
// owning server's timing wheel, and returns the timer ID. A non-negative ID
// is also recorded as a pending timer of this connection.
func (sc *ServerConn) RunAfter(duration time.Duration, callback func(time.Time, WriteCloser)) int64 {
	// ctx is guarded by mu (SetContextValue replaces it under the lock), so
	// take a snapshot under the lock rather than reading the field racily.
	sc.mu.Lock()
	ctx := sc.ctx
	sc.mu.Unlock()

	id := runAfter(ctx, sc.netid, sc.belong.timing, duration, callback)
	if id >= 0 {
		sc.AddPendingTimer(id)
	}
	return id
}
// RunEvery schedules callback to run repeatedly, once per interval, using the
// owning server's timing wheel, and returns the timer ID. A non-negative ID
// is also recorded as a pending timer of this connection.
func (sc *ServerConn) RunEvery(interval time.Duration, callback func(time.Time, WriteCloser)) int64 {
	// ctx is guarded by mu (SetContextValue replaces it under the lock), so
	// take a snapshot under the lock rather than reading the field racily.
	sc.mu.Lock()
	ctx := sc.ctx
	sc.mu.Unlock()

	id := runEvery(ctx, sc.netid, sc.belong.timing, interval, callback)
	if id >= 0 {
		sc.AddPendingTimer(id)
	}
	return id
}
// CancelTimer cancels the timer with the specified ID on the owning server's
// timing wheel. It does nothing when the server has no timing wheel.
func (sc *ServerConn) CancelTimer(timerID int64) {
cancelTimer(sc.belong.timing, timerID)
}
// cancelTimer cancels timerID on timing. A nil timing wheel is tolerated and
// makes the call a no-op.
func cancelTimer(timing *TimingWheel, timerID int64) {
	if timing == nil {
		return
	}
	timing.CancelTimer(timerID)
}
// RemoteAddr returns the remote address of the server connection, as reported
// by the underlying net.Conn.
func (sc *ServerConn) RemoteAddr() net.Addr {
return sc.rawConn.RemoteAddr()
}
// LocalAddr returns the local address of the server connection, as reported
// by the underlying net.Conn.
func (sc *ServerConn) LocalAddr() net.Addr {
return sc.rawConn.LocalAddr()
}
// ClientConn represents a client connection to a TCP server. It provides the
// Write and Close methods required by WriteCloser.
type ClientConn struct {
addr string // remote address string, reused by reconnect for dialing
opts options // options the connection was built with
netid int64 // connection ID passed to the constructor
rawConn net.Conn // underlying network connection
once *sync.Once // makes the body of Close run only once; a pointer on purpose, see Close
wg *sync.WaitGroup // tracks the read, write and handle loop go-routines
sendCh chan []byte // encoded packets queued for writeLoop
handlerCh chan MessageHandler // messages handed from readLoop to handleLoop
timing *TimingWheel // timing wheel private to this connection
mu sync.Mutex // guards following
name string // connection name; initialised to the remote address
heart int64 // last heart-beat timestamp (UnixNano)
pending []int64 // IDs of timers registered on this connection; set to nil by Close
ctx context.Context // connection context
cancel context.CancelFunc // cancels ctx
}
// NewClientConn builds a client connection with ID netid on top of c, after
// applying the given options. A missing codec defaults to
// TypeLengthValueCodec and a non-positive buffer size to BufferSize256. The
// result is idle until Start is called.
func NewClientConn(netid int64, c net.Conn, opt ...ServerOption) *ClientConn {
	var settings options
	for i := range opt {
		opt[i](&settings)
	}
	if settings.codec == nil {
		settings.codec = TypeLengthValueCodec{}
	}
	if settings.bufferSize <= 0 {
		settings.bufferSize = BufferSize256
	}
	return newClientConnWithOptions(netid, c, settings)
}
// newClientConnWithOptions builds a ClientConn from already-resolved options.
// It creates a fresh cancellable context and a timing wheel bound to that
// context. It is shared by NewClientConn and reconnect.
func newClientConnWithOptions(netid int64, c net.Conn, opts options) *ClientConn {
	bufSize := opts.bufferSize
	cc := &ClientConn{
		addr:      c.RemoteAddr().String(),
		opts:      opts,
		netid:     netid,
		rawConn:   c,
		once:      new(sync.Once),
		wg:        new(sync.WaitGroup),
		sendCh:    make(chan []byte, bufSize),
		handlerCh: make(chan MessageHandler, bufSize),
		heart:     time.Now().UnixNano(),
		name:      c.RemoteAddr().String(),
		// Must be non-nil: AddPendingTimer treats a nil slice as "closed".
		pending: make([]int64, 0),
	}
	cc.ctx, cc.cancel = context.WithCancel(context.Background())
	// The timing wheel needs the connection context, so it is set up last.
	cc.timing = NewTimingWheel(cc.ctx)
	return cc
}
// NetID returns the net ID of the client connection, as given to its
// constructor.
func (cc *ClientConn) NetID() int64 {
return cc.netid
}
// SetName replaces the name of the client connection. The update is made
// while holding the connection mutex.
func (cc *ClientConn) SetName(name string) {
	cc.mu.Lock()
	defer cc.mu.Unlock()
	cc.name = name
}
// Name reports the current name of the client connection, read while holding
// the connection mutex.
func (cc *ClientConn) Name() string {
	cc.mu.Lock()
	defer cc.mu.Unlock()
	return cc.name
}
// SetHeartBeat records heart as the latest heart-beat value of the client
// connection. readLoop calls it with time.Now().UnixNano().
func (cc *ClientConn) SetHeartBeat(heart int64) {
	cc.mu.Lock()
	defer cc.mu.Unlock()
	cc.heart = heart
}
// HeartBeat reports the most recently recorded heart-beat value of the client
// connection, read while holding the connection mutex.
func (cc *ClientConn) HeartBeat() int64 {
	cc.mu.Lock()
	defer cc.mu.Unlock()
	return cc.heart
}
// SetContextValue attaches extra data to the client connection by replacing
// its context with a child context that carries v under key k.
func (cc *ClientConn) SetContextValue(k, v interface{}) {
cc.mu.Lock()
// Deferred so the mutex is released even if context.WithValue panics
// (it rejects nil or non-comparable keys).
defer cc.mu.Unlock()
cc.ctx = context.WithValue(cc.ctx, k, v)
}
// ContextValue looks up the value stored under key k in the client
// connection's context. It returns whatever ctx.Value returns, which is nil
// when the key is absent.
func (cc *ClientConn) ContextValue(k interface{}) interface{} {
cc.mu.Lock()
defer cc.mu.Unlock()
return cc.ctx.Value(k)
}
// Start brings the client connection to life: it invokes the configured
// onConnect callback (if any) and then launches the reading, writing and
// handling go-routines, each registered with the connection's WaitGroup.
func (cc *ClientConn) Start() {
	holmes.Infof("conn start, <%v -> %v>\n", cc.rawConn.LocalAddr(), cc.rawConn.RemoteAddr())
	if hook := cc.opts.onConnect; hook != nil {
		hook(cc)
	}
	// The go statement evaluates the function value and its arguments
	// immediately, so the range variable can be used directly.
	for _, loop := range []func(WriteCloser, *sync.WaitGroup){readLoop, writeLoop, handleLoop} {
		cc.wg.Add(1)
		go loop(cc, cc.wg)
	}
}
// Close gracefully closes the client connection. It blocks until the read,
// write and handle go-routines have all returned. The shutdown sequence is
// wrapped in cc.once, so repeated or concurrent calls never run it twice.
// When the reconnect option is set, Close finishes by dialing again and
// restarting the connection in place.
//
// NOTE(review): handleLoop runs client handlers and timer callbacks inline on
// its own go-routine; a callback that calls Close directly would reach
// cc.wg.Wait() before handleLoop has called wg.Done() — confirm callers
// avoid this.
func (cc *ClientConn) Close() {
cc.once.Do(func() {
holmes.Infof("conn close gracefully, <%v -> %v>\n", cc.rawConn.LocalAddr(), cc.rawConn.RemoteAddr())
// callback on close
onClose := cc.opts.onClose
if onClose != nil {
onClose(cc)
}
// close net.Conn, any blocked read or write operation will be unblocked and
// return errors.
cc.rawConn.Close()
// cancel readLoop, writeLoop and handleLoop go-routines. Setting pending to
// nil makes later AddPendingTimer calls a no-op.
cc.mu.Lock()
cc.cancel()
cc.pending = nil
cc.mu.Unlock()
// stop the connection's own timing wheel; individual pending timers are not
// cancelled one by one here.
cc.timing.Stop()
// wait until all go-routines exited.
cc.wg.Wait()
// close all channels. A Write racing with this hits a closed sendCh and
// panics inside asyncWrite, which recovers and reports ErrServerClosed.
close(cc.sendCh)
close(cc.handlerCh)
// cc.once is a *sync.Once. After reconnect() returns, cc.once points to a
// newly-allocated one, while other go-routines such as readLoop, writeLoop
// and handleLoop that are blocked on the old *sync.Once continue to execute
// Close() (and of course do nothing because of sync.Once).
// NOTE that it would cause an "unlock of unlocked mutex" error if cc.once
// were a sync.Once struct, because "defer o.m.Unlock()" in sync.Once.Do()
// would be performed on an unlocked mutex (the newly-allocated one noted
// above).
if cc.opts.reconnect {
cc.reconnect()
}
})
}
// reconnect dials cc.addr again (over TLS when a TLS config is set), rebuilds
// the connection state in place and starts it. It returns nothing: callers
// keep using the same *ClientConn, whose contents are replaced. A dial
// failure is reported through holmes.Fatalln.
func (cc *ClientConn) reconnect() {
	var (
		conn net.Conn
		err  error
	)
	if cfg := cc.opts.tlsCfg; cfg == nil {
		if conn, err = net.Dial("tcp", cc.addr); err != nil {
			holmes.Fatalln("net dial error", err)
		}
	} else {
		if conn, err = tls.Dial("tcp", cc.addr, cfg); err != nil {
			holmes.Fatalln("tls dial error", err)
		}
	}
	// Overwrite the receiver with a freshly built connection so that every
	// holder of this pointer observes the new state once reconnect returns.
	fresh := newClientConnWithOptions(cc.netid, conn, cc.opts)
	*cc = *fresh
	cc.Start()
}
// Write encodes message and queues it for delivery to the remote peer. It
// delegates to asyncWrite, which never blocks: it returns ErrWouldBlock when
// the send queue is full, ErrServerClosed when a panic is recovered during
// the attempt, or the codec's error when encoding fails.
func (cc *ClientConn) Write(message Message) error {
return asyncWrite(cc, message)
}
// RunAt schedules callback to run at timestamp on the connection's timing
// wheel and returns the timer ID. A non-negative ID is also recorded as a
// pending timer of this connection.
func (cc *ClientConn) RunAt(timestamp time.Time, callback func(time.Time, WriteCloser)) int64 {
	// ctx is guarded by mu (SetContextValue replaces it under the lock), so
	// take a snapshot under the lock rather than reading the field racily.
	cc.mu.Lock()
	ctx := cc.ctx
	cc.mu.Unlock()

	id := runAt(ctx, cc.netid, cc.timing, timestamp, callback)
	if id >= 0 {
		cc.AddPendingTimer(id)
	}
	return id
}
// RunAfter schedules callback to run once duration has elapsed, using the
// connection's timing wheel, and returns the timer ID. A non-negative ID is
// also recorded as a pending timer of this connection.
func (cc *ClientConn) RunAfter(duration time.Duration, callback func(time.Time, WriteCloser)) int64 {
	// ctx is guarded by mu (SetContextValue replaces it under the lock), so
	// take a snapshot under the lock rather than reading the field racily.
	cc.mu.Lock()
	ctx := cc.ctx
	cc.mu.Unlock()

	id := runAfter(ctx, cc.netid, cc.timing, duration, callback)
	if id >= 0 {
		cc.AddPendingTimer(id)
	}
	return id
}
// RunEvery schedules callback to run repeatedly, once per interval, using the
// connection's timing wheel, and returns the timer ID. A non-negative ID is
// also recorded as a pending timer of this connection.
func (cc *ClientConn) RunEvery(interval time.Duration, callback func(time.Time, WriteCloser)) int64 {
	// ctx is guarded by mu (SetContextValue replaces it under the lock), so
	// take a snapshot under the lock rather than reading the field racily.
	cc.mu.Lock()
	ctx := cc.ctx
	cc.mu.Unlock()

	id := runEvery(ctx, cc.netid, cc.timing, interval, callback)
	if id >= 0 {
		cc.AddPendingTimer(id)
	}
	return id
}
// AddPendingTimer remembers timerID as a timer belonging to this client
// connection. IDs are dropped when the pending list is nil, which is the
// state Close leaves it in.
func (cc *ClientConn) AddPendingTimer(timerID int64) {
	cc.mu.Lock()
	defer cc.mu.Unlock()
	if cc.pending == nil {
		return
	}
	cc.pending = append(cc.pending, timerID)
}
// CancelTimer cancels the timer with the specified ID on the connection's
// timing wheel. It does nothing when the connection has no timing wheel.
func (cc *ClientConn) CancelTimer(timerID int64) {
cancelTimer(cc.timing, timerID)
}
// RemoteAddr returns the remote address of the client connection, as reported
// by the underlying net.Conn.
func (cc *ClientConn) RemoteAddr() net.Addr {
return cc.rawConn.RemoteAddr()
}
// LocalAddr returns the local address of the client connection, as reported
// by the underlying net.Conn.
func (cc *ClientConn) LocalAddr() net.Addr {
return cc.rawConn.LocalAddr()
}
// runAt registers a one-shot timer on timing that fires at ts and invokes cb.
// The timeout carries a context derived from ctx and tagged with netID. It
// returns the timer ID produced by the timing wheel, or -1 when timing is
// nil; callers treat any negative ID as "no timer registered".
func runAt(ctx context.Context, netID int64, timing *TimingWheel, ts time.Time, cb func(time.Time, WriteCloser)) int64 {
	// cancelTimer tolerates a nil timing wheel, so scheduling must too
	// rather than dereferencing a nil pointer.
	if timing == nil {
		return -1
	}
	timeout := NewOnTimeOut(NewContextWithNetID(ctx, netID), cb)
	return timing.AddTimer(ts, 0, timeout)
}
// runAfter registers a one-shot timer that fires d after the current time. It
// is a thin wrapper around runAt and returns runAt's timer ID.
func runAfter(ctx context.Context, netID int64, timing *TimingWheel, d time.Duration, cb func(time.Time, WriteCloser)) int64 {
	return runAt(ctx, netID, timing, time.Now().Add(d), cb)
}
// runEvery registers a repeating timer on timing: the first expiry is d from
// now and d is also passed as the repeat interval. The timeout carries a
// context derived from ctx and tagged with netID. It returns the timer ID
// produced by the timing wheel, or -1 when timing is nil; callers treat any
// negative ID as "no timer registered".
func runEvery(ctx context.Context, netID int64, timing *TimingWheel, d time.Duration, cb func(time.Time, WriteCloser)) int64 {
	// cancelTimer tolerates a nil timing wheel, so scheduling must too
	// rather than dereferencing a nil pointer.
	if timing == nil {
		return -1
	}
	delay := time.Now().Add(d)
	timeout := NewOnTimeOut(NewContextWithNetID(ctx, netID), cb)
	return timing.AddTimer(delay, d, timeout)
}
// asyncWrite encodes m with the codec of connection c and offers the packet
// to the connection's send channel without blocking.
//
// It returns the codec error when encoding fails, ErrWouldBlock when the
// packet cannot be queued immediately, and ErrServerClosed when a panic is
// recovered (for example a send on a channel that Close has already closed).
// c is expected to be a *ServerConn or a *ClientConn.
func asyncWrite(c interface{}, m Message) (err error) {
	defer func() {
		if recover() != nil {
			err = ErrServerClosed
		}
	}()

	var (
		packet []byte
		queue  chan []byte
	)
	switch conn := c.(type) {
	case *ServerConn:
		packet, err = conn.belong.opts.codec.Encode(m)
		queue = conn.sendCh
	case *ClientConn:
		packet, err = conn.opts.codec.Encode(m)
		queue = conn.sendCh
	}
	if err != nil {
		holmes.Errorf("asyncWrite error %v\n", err)
		return err
	}

	select {
	case queue <- packet:
		return nil
	default:
		return ErrWouldBlock
	}
}
// readLoop blocks reading from the connection, decodes the bytes into
// messages and, for every message that has a registered handler, passes the
// message/handler pair to handleLoop through handlerCh. Messages without a
// registered handler go to the onMessage callback when one is configured.
//
// The loop ends when the connection context or the server context is
// cancelled, or when decoding fails with anything other than ErrUndefined.
// On exit it marks wg done and closes the connection.
func readLoop(c WriteCloser, wg *sync.WaitGroup) {
	var (
		rawConn          net.Conn
		codec            Codec
		cDone            <-chan struct{}
		sDone            <-chan struct{}
		setHeartBeatFunc func(int64)
		onMessage        onMessageFunc
		handlerCh        chan MessageHandler
		msg              Message
		err              error
	)

	switch c := c.(type) {
	case *ServerConn:
		rawConn = c.rawConn
		codec = c.belong.opts.codec
		cDone = c.ctx.Done()
		sDone = c.belong.ctx.Done()
		setHeartBeatFunc = c.SetHeartBeat
		onMessage = c.belong.opts.onMessage
		handlerCh = c.handlerCh
	case *ClientConn:
		rawConn = c.rawConn
		codec = c.opts.codec
		cDone = c.ctx.Done()
		sDone = nil // a nil channel never becomes ready in select
		setHeartBeatFunc = c.SetHeartBeat
		onMessage = c.opts.onMessage
		handlerCh = c.handlerCh
	}

	defer func() {
		if p := recover(); p != nil {
			holmes.Errorf("panics: %v\n", p)
		}
		// Done must precede Close: Close waits on this WaitGroup.
		wg.Done()
		holmes.Debugln("readLoop go-routine exited")
		c.Close()
	}()

	for {
		select {
		case <-cDone: // connection closed
			holmes.Debugln("receiving cancel signal from conn")
			return
		case <-sDone: // server closed
			holmes.Debugln("receiving cancel signal from server")
			return
		default:
			msg, err = codec.Decode(rawConn)
			if err != nil {
				holmes.Errorf("error decoding message %v\n", err)
				if _, ok := err.(ErrUndefined); ok {
					// An undefined message still proves the peer is alive:
					// update heart beats and keep reading.
					setHeartBeatFunc(time.Now().UnixNano())
					continue
				}
				return
			}
			setHeartBeatFunc(time.Now().UnixNano())

			handler := GetHandlerFunc(msg.MessageNumber())
			if handler == nil {
				if onMessage != nil {
					holmes.Infof("message %d call onMessage()\n", msg.MessageNumber())
					onMessage(msg, c)
				} else {
					holmes.Warnf("no handler or onMessage() found for message %d\n", msg.MessageNumber())
				}
				continue
			}

			// Hand the message to handleLoop, but stay cancellable: if
			// handleLoop has already exited and handlerCh is full, a plain
			// send would block forever and Close would hang in wg.Wait().
			select {
			case handlerCh <- MessageHandler{message: msg, handler: handler}:
			case <-cDone: // connection closed
				holmes.Debugln("receiving cancel signal from conn")
				return
			case <-sDone: // server closed
				holmes.Debugln("receiving cancel signal from server")
				return
			}
		}
	}
}
// writeLoop takes encoded packets from the connection's send channel and
// writes them to the network connection, blocking on each write.
//
// The loop ends when the connection context or the server context is
// cancelled, or when a write fails. On exit it makes a non-blocking pass over
// the send channel to flush packets that are already queued, marks wg done
// and closes the connection.
func writeLoop(c WriteCloser, wg *sync.WaitGroup) {
	var (
		conn     net.Conn
		outbox   chan []byte
		connDone <-chan struct{}
		srvDone  <-chan struct{}
	)

	switch v := c.(type) {
	case *ServerConn:
		conn = v.rawConn
		outbox = v.sendCh
		connDone = v.ctx.Done()
		srvDone = v.belong.ctx.Done()
	case *ClientConn:
		conn = v.rawConn
		outbox = v.sendCh
		connDone = v.ctx.Done()
		srvDone = nil // a nil channel never becomes ready in select
	}

	defer func() {
		if r := recover(); r != nil {
			holmes.Errorf("panics: %v\n", r)
		}
		// Drain all pending packets before exit; stop as soon as the
		// channel has nothing immediately available.
		for drained := false; !drained; {
			select {
			case data := <-outbox:
				if data != nil {
					if _, werr := conn.Write(data); werr != nil {
						holmes.Errorf("error writing data %v\n", werr)
					}
				}
			default:
				drained = true
			}
		}
		// Done must precede Close: Close waits on this WaitGroup.
		wg.Done()
		holmes.Debugln("writeLoop go-routine exited")
		c.Close()
	}()

	for {
		select {
		case <-connDone: // connection closed
			holmes.Debugln("receiving cancel signal from conn")
			return
		case <-srvDone: // server closed
			holmes.Debugln("receiving cancel signal from server")
			return
		case data := <-outbox:
			if data == nil {
				continue
			}
			if _, werr := conn.Write(data); werr != nil {
				holmes.Errorf("error writing data %v\n", werr)
				return
			}
		}
	}
}
// handleLoop dispatches message handlers and timeout callbacks for a
// connection. For a *ServerConn each piece of work is submitted to the worker
// pool; for a *ClientConn it runs inline on this go-routine.
//
// The loop ends when the connection context or the server context is
// cancelled. On exit it marks wg done and closes the connection.
func handleLoop(c WriteCloser, wg *sync.WaitGroup) {
	var (
		connDone  <-chan struct{}
		srvDone   <-chan struct{}
		timeouts  chan *OnTimeOut
		handlers  chan MessageHandler
		id        int64
		connCtx   context.Context
		useWorker bool
	)

	switch v := c.(type) {
	case *ServerConn:
		connDone = v.ctx.Done()
		srvDone = v.belong.ctx.Done()
		timeouts = v.timerCh
		handlers = v.handlerCh
		id = v.netid
		connCtx = v.ctx
		useWorker = true
	case *ClientConn:
		connDone = v.ctx.Done()
		srvDone = nil // a nil channel never becomes ready in select
		timeouts = v.timing.timeOutChan
		handlers = v.handlerCh
		id = v.netid
		connCtx = v.ctx
	}

	defer func() {
		if r := recover(); r != nil {
			holmes.Errorf("panics: %v\n", r)
		}
		// Done must precede Close: Close waits on this WaitGroup.
		wg.Done()
		holmes.Debugln("handleLoop go-routine exited")
		c.Close()
	}()

	for {
		select {
		case <-connDone: // connection closed
			holmes.Debugln("receiving cancel signal from conn")
			return
		case <-srvDone: // server closed
			holmes.Debugln("receiving cancel signal from server")
			return
		case mh := <-handlers:
			if mh.handler == nil {
				continue
			}
			msg, handler := mh.message, mh.handler
			if !useWorker {
				handler(NewContextWithNetID(NewContextWithMessage(connCtx, msg), id), c)
				continue
			}
			if err := WorkerPoolInstance().Put(id, func() {
				handler(NewContextWithNetID(NewContextWithMessage(connCtx, msg), id), c)
			}); err != nil {
				holmes.Errorln(err)
			}
			addTotalHandle()
		case timeout := <-timeouts:
			if timeout == nil {
				continue
			}
			// A mismatch is only logged; the callback still runs.
			if got := NetIDFromContext(timeout.Ctx); got != id {
				holmes.Errorf("timeout net %d, conn net %d, mismatched!\n", got, id)
			}
			if !useWorker {
				timeout.Callback(time.Now(), c)
				continue
			}
			if err := WorkerPoolInstance().Put(id, func() {
				timeout.Callback(time.Now(), c)
			}); err != nil {
				holmes.Errorln(err)
			}
		}
	}
}

Опубликовать ( 0 )

Вы можете оставить комментарий после Вход в систему

1
https://api.gitlife.ru/oschina-mirror/mirrors-Tao.git
git@api.gitlife.ru:oschina-mirror/mirrors-Tao.git
oschina-mirror
mirrors-Tao
mirrors-Tao
master