1 В избранное 0 Ответвления 0

OSCHINA-MIRROR/mirrors-Tao

Присоединиться к Gitlife
Откройте для себя и примите участие в публичных проектах с открытым исходным кодом с участием более 10 миллионов разработчиков. Приватные репозитории также полностью бесплатны :)
Присоединиться бесплатно
Клонировать/Скачать
DOC.md 56 КБ
Копировать Редактировать Web IDE Исходные данные Просмотреть построчно История
gitlife-traslator Отправлено 27.11.2024 21:14 82086e1

Что такое Tao

Tao на английском языке означает «The ultimate principle of universe», то есть «Дао» — это высший принцип вселенной.

«Дао порождает одно, одно порождает два, два порождают три, а три порождают бесконечность». — «Даодэцзин».

Tao также является асинхронным TCP-сервером, разработанным на языке Go с использованием философии Go «Less is more». Он способен проникать сквозь все проявления и позволяет заглянуть в мир сетевого программирования, помогая навсегда избавиться от ограничений, связанных с написанием только «socket-bind-listen-accept». В этой статье мы обсудим дизайн и философию этого фреймворка.

1. Что решает Tao

  • 1.1 Сценарий: у вашего продукта есть собственная бизнес-логика, которая требует поддержки со стороны сервера для обслуживания клиентов.
  • 1.2 Проблема: как быстро и надёжно реализовать функции продукта без необходимости тратить много времени на обработку деталей сетевых коммуникаций?
  • 1.3 Решение: Tao предоставляет механизм, основанный на фреймворке, для поддержки бизнес-логики. Вам нужно только определить формат сообщений с клиентом и написать соответствующую бизнес-логику в виде функций, которые затем регистрируются во фреймворке.

2. Запуск чата на 50 строках кода Давайте рассмотрим пример использования фреймворка Tao для создания простого группового чата. Код сервера может выглядеть следующим образом:

package main

import (
    "fmt"
    "net"

    "github.com/leesper/holmes"
    "github.com/leesper/tao"
    "github.com/leesper/tao/examples/chat"
)

// ChatServer — сервер чата.
type ChatServer struct {
    *tao.Server
}

// NewChatServer возвращает ChatServer.
func NewChatServer() *ChatServer {
    onConnectOption := tao.OnConnectOption(func(conn tao.WriteCloser) bool {
        holmes.Infoln("on connect")
        return true
    })
    onErrorOption := tao.OnErrorOption(func(conn tao.WriteCloser) {
        holmes.Infoln("on error")
    })
    onCloseOption := tao.OnCloseOption(func(conn tao.WriteCloser) {
        holmes.Infoln("close chat client")
    })
    return &ChatServer{
        tao.NewServer(onConnectOption, onErrorOption, onCloseOption),
    }
}

func main() {
    defer holmes.Start().Stop()

    tao.Register(chat.ChatMessage, chat.DeserializeMessage, chat.ProcessMessage)

    l, err := net.Listen("tcp", fmt.Sprintf("%s:%d", "0.0.0.0", 12345))
    if err != nil {
        holmes.Fatalln("listen error", err)
    }
    chatServer := NewChatServer()
    err = chatServer.Start(l)
    if err != nil {
        holmes.Fatalln("start error", err)
    }
}

Для запуска сервера достаточно выполнить три шага. Сначала зарегистрируйте сообщения и обратные вызовы бизнес-логики, затем укажите IP-адрес и порт, и наконец, запустите Start. После этого клиенты смогут подключаться и начинать общение. Реализация бизнес-логики проста: она перебирает все соединения и отправляет данные:

// ProcessMessage обрабатывает логику сообщений.
func ProcessMessage(ctx context.Context, conn tao.WriteCloser) {
    holmes.Infof("ProcessMessage")
    s, ok := tao.ServerFromContext(ctx)
    if ok {
        msg := tao.MessageFromContext(ctx)
        s.Broadcast(msg)
    }
}

3. Философия программирования на Go Go — язык программирования для разработки базовых сервисов, таких как серверы. Он похож на C по синтаксису и имеет богатую стандартную библиотеку, что делает его быстрым для изучения. Компиляция происходит быстро, а производительность близка к C.

  • 3.1 Объектно-ориентированное программирование: подход Go к объектно-ориентированному программированию основан на «многократном использовании композиции и редком использовании наследования». Это достигается путём встраивания наследования через анонимные структуры. Например, ChatServer автоматически наследует все свойства и методы Server.

  • 3.2 Интерфейсно-ориентированное программирование: в Go используется подход «утиного типа», где «если я хожу как утка, говорю как утка и плаваю как утка, то я утка». Другие языки требуют явного указания наследование интерфейса, в то время как Go использует неявное объявление.

  • 3.3 Один центр, две основы: понимание Go требует понимания «одного центра» и «двух основ». «Один центр» относится к модели параллелизма Go, которая гласит: «Не используйте общую память для общения, используйте общение для общей памяти». «Две основы» представляют собой два столпа модели параллелизма Go: каналы и горутины. Понимание этих концепций поможет разобраться в большинстве кода. Текст запроса:

errors, the listener will be closed when returned.
func (s *Server) Start(l net.Listener) error {
    s.mu.Lock()
    if s.lis == nil {
        s.mu.Unlock()
        l.Close()
        return ErrServerClosed
    }
    s.lis[l] = true
    s.mu.Unlock()

    defer func() {
        s.mu.Lock()
        if s.lis != nil && s.lis[l] {
            l.Close()
            delete(s.lis, l)
        }
        s.mu.Unlock()
    }()

    holmes.Infof("server start, net %s addr %s\n", l.Addr().Network(), l.Addr().String())

    s.wg.Add(1)
    go s.timeOutLoop()

    var tempDelay time.Duration
    for {
        rawConn, err := l.Accept()
        if err != nil {
            if ne, ok := err.(net.Error); ok && ne.Temporary() {
                if tempDelay == 0 {
                    tempDelay = 5 * time.Millisecond
                } else {
                    tempDelay *= 2
                }
                if max := 1 * time.Second; tempDelay >= max {
                    tempDelay = max
                }
                holmes.Errorf("accept error %v, retrying in %d\n", err, tempDelay)
                select {
                case <-time.After(tempDelay):
                case <-s.ctx.Done():
                }
                continue
            }
            return err
        }
        tempDelay = 0

        // how many connections do we have ?
        sz := s.conns.Size()
        if sz >= MaxConnections {
            holmes.Warnf("max connections size %d, refuse\n", sz)
            rawConn.Close()
            continue
        }

        if s.opts.tlsCfg != nil {
            rawConn = tls.Server(rawConn, s.opts.tlsCfg)
        }

        netid := netIdentifier.GetAndIncrement()
        sc := NewServerConn(netid, s, rawConn)
        sc.SetName(sc.rawConn.RemoteAddr().String())

        s.mu.Lock()
        if s.sched != nil {
            sc.RunEvery(s.interv, s.sched)
        }
        s.mu.Unlock()

        s.conns.Put(netid, sc)
        addTotalConn(1)

        s.wg.Add(1)
        go func() {
            sc.Start()
        }()

        holmes.Infof("accepted client %s, id %d, total %d\n", sc.GetName(), netid, s.conns.Size())
        s.conns.RLock()
        for _, c := range s.conns.m {
            holmes.Infof("client %s\n", c.GetName())
        }
        s.conns.RUnlock()
    } // for loop
}

Перевод текста на русский язык:

Если при приёме клиентских запросов на соединение возникает временная ошибка, сервер будет ожидать максимум одну секунду, прежде чем повторить попытку приёма. Если текущее количество подключений превышает значение MaxConnections (по умолчанию 1000), соединение отклоняется и закрывается. В противном случае создаётся новое подключение и начинается работа.

2. Изящное завершение работы сервера

В версии Go 1.7 в стандартную библиотеку был добавлен пакет context. Context предоставляет структуру Context, которая позволяет устанавливать связанные «контексты» между сервером, сетевыми подключениями и соответствующими потоками. Информация, содержащаяся в контексте, связана с определённым сетевым запросом и доступна для безопасного доступа всем потокам Go, связанным с этим запросом. Например, поток handleLoop помещает идентификатор сети (net ID) и сообщение в контекст, а затем передаёт их вместе с функцией обработчика рабочему потоку:

// handleLoop() - put handler or timeout callback into worker go-routines
func handleLoop(c WriteCloser, wg *sync.WaitGroup) {
    //... omitted ...

    
    for {
        select {
        //... omitted ...
        case msgHandler := <-handlerCh:
            msg, handler := msgHandler.message, msgHandler.handler
            if handler != nil {
                if askForWorker {
                    WorkerPoolInstance().Put(netID, func() {
                        handler(NewContextWithNetID(NewContextWithMessage(ctx, msg), netID), c)
                    })
                } 
            }
        //... omitted ...
    }
}

Позже, когда рабочий поток действительно выполняет код, бизнес-логика может получить доступ к сообщению или идентификатору сети в функции обработчика, поскольку они связаны с текущим запросом. Типичный эхо-сервер обрабатывает данные следующим образом:

// ProcessMessage process the logic of echo message.
func ProcessMessage(ctx context.Context, conn tao.WriteCloser) {
    msg := tao.MessageFromContext(ctx).(Message)
    holmes.Infof("receving message %s\n", msg.Content)
    conn.Write(msg)
}

Другое использование контекста — это обеспечение «изящного завершения работы» сервера и сетевых подключений. Сервер передаёт свой контекст при управлении сетевыми подключениями, и сетевые подключения также передают свой контекст новым потокам. Эти контексты можно отменить (cancelable). Когда серверу необходимо завершить работу или сетевое подключение должно быть закрыто, вызов функции cancel уведомляет все потоки, которые затем завершают свою работу. После того как все потоки завершат работу, сервер или сетевое подключение могут безопасно закрыться. Ключевой код для изящного завершения работы сервера выглядит следующим образом:

// Stop gracefully closes the server, it blocked until all connections
// are closed and all go-routines are exited.
func (s *Server) Stop() {
    // immediately stop accepting new clients
``` **Перевод текста на русский язык:**

s.mu.Lock()  
    listeners := s.lis  
    s.lis = nil  
    s.mu.Unlock()

    for l := range listeners {  
        l.Close()  
        holmes.Infof("stop accepting at address %s\n", l.Addr().String())  
    }

    // close all connections  
    conns := map[int64]*ServerConn{}  
    s.conns.RLock()  
    for k, v := range s.conns.m {  
        conns[k] = v  
    }  
    s.conns.Clear()  
    s.conns.RUnlock()

    for _, c := range conns {  
        c.rawConn.Close()  
        holmes.Infof("close client %s\n", c.GetName())  
    }

    s.mu.Lock()  
    s.cancel()  
    s.mu.Unlock()

    s.wg.Wait()

    holmes.Infoln("server stopped gracefully, bye.")  
    os.Exit(0)  
}

## 3. Сетевая модель соединения
В других языках программирования серверы, написанные с использованием реакторного режима, часто требуют асинхронного мультиплексирования через epoll в одном потоке ввода-вывода. Из-за низкой стоимости создания потоков в Go, язык Go может создавать три go-routine для каждого сетевого соединения. readLoop() отвечает за чтение данных и десериализацию их в сообщения; writeLoop() отвечает за сериализацию сообщений и отправку двоичных байтовых потоков; наконец, handleLoop() отвечает за вызов функции обработки сообщений. Эти три сопрограммы запускаются независимо при создании и запуске соединения:
```go
// Start запускает соединение сервера, создавая go-routines для чтения,
// записи и обработки.
func (sc *ServerConn) Start() {
    holmes.Infof("conn start, <%v -> %v>\n", sc.rawConn.LocalAddr(), sc.rawConn.RemoteAddr())
    onConnect := sc.belong.opts.onConnect
    if onConnect != nil {
        onConnect(sc)
    }

    loopers := []func(WriteCloser, *sync.WaitGroup){readLoop, writeLoop, handleLoop}
    for _, l := range loopers {
        looper := l
        sc.wg.Add(1)
        go looper(sc, sc.wg)
    }
}
### 3.1 Анализ основного кода readLoop
readLoop выполняет три ключевые задачи. Во-первых, он вызывает кодировщик/декодировщик сообщений для десериализации полученных байтовых потоков в сообщения. Затем он обновляет метку времени для проверки сердцебиения. Наконец, в зависимости от номера протокола сообщения, он находит соответствующую функцию обработки сообщений. Если зарегистрирована функция обратного вызова сообщения, то она вызывается для обработки сообщения. В противном случае сообщение и функция обработки упаковываются и отправляются в handlerCh. Обратите внимание, что cDone и sDone  это каналы в структуре сетевого подключения и структуры сервера соответственно, которые используются для отслеживания событий «закрытия» сетевых подключений и серверов (далее аналогично).
```go
/* readLoop() блокирует чтение из соединения, десериализует байты в сообщение,
затем находит соответствующую функцию обработчика и помещает её в канал */
func readLoop(c WriteCloser, wg *sync.WaitGroup) {
    var (
        rawConn          net.Conn
        codec            Codec
        cDone            <-chan struct{}
        sDone            <-chan struct{}
        setHeartBeatFunc func(int64)
        onMessage        onMessageFunc
        handlerCh        chan MessageHandler
        msg              Message
        err              error
    )

    switch c := c.(type) {
    case *ServerConn:
        rawConn = c.rawConn
        codec = c.belong.opts.codec
        cDone = c.ctx.Done()
        sDone = c.belong.ctx.Done()
        setHeartBeatFunc = c.SetHeartBeat
        onMessage = c.belong.opts.onMessage
        handlerCh = c.handlerCh
    case *ClientConn:
        rawConn = c.rawConn
        codec = c.opts.codec
        cDone = c.ctx.Done()
        sDone = nil
        setHeartBeatFunc = c.SetHeartBeat
        onMessage = c.opts.onMessage
        handlerCh = c.handlerCh
    }

    defer func() {
        if p := recover(); p != nil {
            holmes.Errorf("panics: %v\n", p)
        }
        wg.Done()
        holmes.Debugln("readLoop go-routine exited")
        c.Close()
    }()

    for {
        select {
        case <-cDone: // connection closed
            holmes.Debugln("receiving cancel signal from conn")
            return
        case <-sDone: // server closed
            holmes.Debugln("receiving cancel signal from server")
            return
        default:
            msg, err = codec.Decode(rawConn)
            if err != nil {
                holmes.Errorf("error decoding message %v\n", err)
                if _, ok := err.(ErrUndefined); ok {
                    // update heart beats
                    setHeartBeatFunc(time.Now().UnixNano())
                    continue
                }
                return
            }
            setHeartBeatFunc(time.Now().UnixNano())
            handler := GetHandlerFunc(msg.MessageNumber())
            if handler == nil {
                if onMessage != nil {
                    holmes.Infof("message %d call onMessage()\n", msg.MessageNumber())
                    onMessage(msg, c.(WriteCloser))
                } else {
                    holmes.Warnf("no handler or onMessage() found for message %d\n", msg.MessageNumber())
                }
                continue
            } ### 3.2 Ядро анализа кода: writeLoop

writeLoop выполняет одну задачу: считывает сериализованный поток байтов из sendCh и отправляет его в сеть. Важно отметить, что этот корутин перед завершением и выходом будет неблокирующим образом отправлять все сообщения из sendCh до конца, **чтобы избежать потери сообщений**, в этом и заключается ключевой момент.
```go
/* writeLoop() получает сообщение из канала, сериализует его в байты,
затем блокирует запись в соединение */
func writeLoop(c WriteCloser, wg *sync.WaitGroup) {
    var (
        rawConn net.Conn
        sendCh  chan []byte
        cDone   <-chan struct{}
        sDone   <-chan struct{}
        pkt     []byte
        err     error
    )

    switch c := c.(type) {
    case *ServerConn:
        rawConn = c.rawConn
        sendCh = c.sendCh
        cDone = c.ctx.Done()
        sDone = c.belong.ctx.Done()
    case *ClientConn:
        rawConn = c.rawConn
        sendCh = c.sendCh
        cDone = c.ctx.Done()
        sDone = nil
    }

    defer func() {
        if p := recover(); p != nil {
            holmes.Errorf("panics: %v\n", p)
        }
        // drain all pending messages before exit
    OuterFor:
        for {
            select {
            case pkt = <-sendCh:
                if pkt != nil {
                    if _, err = rawConn.Write(pkt); err != nil {
                        holmes.Errorf("error writing data %v\n", err)
                    }
                }
            default:
                break OuterFor
            }
        }
        wg.Done()
        holmes.Debugln("writeLoop go-routine exited")
        c.Close()
    }()

    for {
        select {
        case <-cDone: // connection closed
            holmes.Debugln("receiving cancel signal from conn")
            return
        case <-sDone: // server closed
            holmes.Debugln("receiving cancel signal from server")
            return
        case pkt = <-sendCh:
            if pkt != nil {
                if _, err = rawConn.Write(pkt); err != nil {
                    holmes.Errorf("error writing data %v\n", err)
                    return
                }
            }
        }
    }
}

3.3 Ядро анализа кода: handleLoop

readLoop упаковывает сообщения и функции обработки в handlerCh, после чего handleLoop извлекает сообщения и функции из handlerCh и передаёт их рабочим потокам пула, которые отвечают за планирование выполнения и обработку сообщений. Это хороший пример того, как язык Go использует каналы для межпоточной коммуникации.

// handleLoop() - помещает обработчик или обратный вызов тайм-аута в рабочие потоки
func handleLoop(c WriteCloser, wg *sync.WaitGroup) {
    var (
        cDone        <-chan struct{}
        sDone        <-chan struct{}
        timerCh      chan *OnTimeOut
        handlerCh    chan MessageHandler
        netID        int64
        ctx          context.Context
        askForWorker bool
    )

    switch c := c.(type) {
    case *ServerConn:
        cDone = c.ctx.Done()
        sDone = c.belong.ctx.Done()
        timerCh = c.timerCh
        handlerCh = c.handlerCh
        netID = c.netid
        ctx = c.ctx
        askForWorker = true
    case *ClientConn:
        cDone = c.ctx.Done()
        sDone = nil
        timerCh = c.timing.timeOutChan
        handlerCh = c.handlerCh
        netID = c.netid
        ctx = c.ctx
    }

    defer func() {
        if p := recover(); p != nil {
            holmes.Errorf("panics: %v\n", p)
        }
        wg.Done()
        holmes.Debugln("handleLoop go-routine exited")
        c.Close()
    }()

    for {
        select {
        case <-cDone: // connectin closed
            holmes.Debugln("receiving cancel signal from conn")
            return
        case <-sDone: // server closed
            holmes.Debugln("receiving cancel signal from server")
            return
        case msgHandler := <-handlerCh:
            msg, handler := msgHandler.message, msgHandler.handler
            if handler != nil {
                if askForWorker {
                    WorkerPoolInstance().Put(netID, func() {
                        handler(NewContextWithNetID(NewContextWithMessage(ctx, msg), netID), c)
                    })
                    addTotalHandle()
                } else {
                    handler(NewContextWithNetID(NewContextWithMessage(ctx, msg), netID), c)
                }
            }
        case timeout := <-timerCh:
            if timeout != nil {

``` ```
timeoutNetID := NetIDFromContext(timeout.Ctx)
if timeoutNetID != netID {
    holmes.Errorf("timeout net %d, conn net %d, mismatched!\n", timeoutNetID, netID)
}
if askForWorker {
    WorkerPoolInstance().Put(netID, func() {
        timeout.Callback(time.Now(), c.(WriteCloser))
    })
} else {
    timeout.Callback(time.Now(), c.(WriteCloser))
}
## 4. 消息处理机制
### 4.1 消息上下文

Любой тип, который реализует интерфейс Message, является сообщением и должен предоставлять метод для доступа к своему протоколу и сериализовать себя в массив байтов. Кроме того, каждое сообщение должно зарегистрировать свою функцию десериализации и обработки:

```go
// Handler берёт на себя ответственность за обработку входящих сообщений.
type Handler interface {
    Handle(context.Context, interface{})
}

// HandlerFunc служит адаптером, позволяющим использовать обычные функции в качестве обработчиков.
type HandlerFunc func(context.Context, WriteCloser)

// Handle вызывает f(ctx, c)
func (f HandlerFunc) Handle(ctx context.Context, c WriteCloser) {
    f(ctx, c)
}

// UnmarshalFunc десериализует байты в Message.
type UnmarshalFunc func([]byte) (Message, error)

// handlerUnmarshaler — это комбинация функций десериализации и обработки для сообщения.
type handlerUnmarshaler struct {
    handler     HandlerFunc
    unmarshaler UnmarshalFunc
}

func init() {
  messageRegistry = map[int32]messageFunc{}
  buf = new(bytes.Buffer)
}

// Register регистрирует функции десериализации и обработки для msgType.
// Если функция десериализации не предоставлена, сообщение не будет проанализировано.
// Если функция обработки не предоставлена, сообщение не будет обработано, если только вы
// не установите её по умолчанию, вызвав SetOnMessageCallback.
// При двойном вызове Register для одного msgType произойдёт паника.
func Register(msgType int32, unmarshaler func([]byte) (Message, error), handler func(context.Context, WriteCloser)) {
    if _, ok := messageRegistry[msgType]; ok {
        panic(fmt.Sprintf("trying to register message %d twice", msgType))
    }

    messageRegistry[msgType] = handlerUnmarshaler{
        unmarshaler: unmarshaler,
        handler:     HandlerFunc(handler),
    }
}

// GetUnmarshalFunc возвращает соответствующую функцию десериализации для msgType.
func GetUnmarshalFunc(msgType int32) UnmarshalFunc {
    entry, ok := messageRegistry[msgType]
    if !ok {
        return nil
    }
    return entry.unmarshaler
}

// GetHandlerFunc возвращает соответствующую функцию обработки для msgType.
func GetHandlerFunc(msgType int32) HandlerFunc {
    entry, ok := messageRegistry[msgType]
    если !ok {
        вернуть nil
    }
    вернуть entry.handler
}

// Message представляет структурированные данные, которые можно обработать.
type Message interface {
    MessageNumber() int32
    Serialize() ([]byte, error)
}

Для каждой функции обработки сообщений информация о сообщении и клиенте, отправившем это сообщение, различна и называется «контекстом сообщения». Она представлена структурой Context, где каждый отдельный клиент идентифицируется 64-битным целым числом netid:

// Context — это контекстная информация для каждой функции обработчика.
// Функция обработчика обрабатывает бизнес-логику сообщения.
// Мы можем найти клиентское соединение, которое отправило это сообщение, по netid и отправить ответные сообщения.
type Context struct{
  message Message
  netid int64
}

func NewContext(msg Message, id int64) Context {
  return Context{
    message: msg,
    netid: id,
  }
}

func (ctx Context)Message() Message {
  return ctx.message
}

func (ctx Context)Id() int64 {
  return ctx.netid
}

4.2 Кодек

При получении данных кодек отвечает за десериализацию сетевых данных в соответствии с определённым форматом и передачу сообщений на верхний уровень (декодирование). При отправке данных кодек сериализует сообщения, переданные верхним уровнем, в байтовые данные и передаёт их нижнему уровню (кодирование):

// Codec — это интерфейс для кодера и декодера сообщений.
// Программист приложения может определить собственный кодек.
type Codec interface {
  Decode(Connection) (Message, error)
  Encode(Message) ([]byte, error)
}

Tao Framework использует формат «Тип-Длина-Данные» для упаковки данных. Тип занимает 4 байта и обозначает тип протокола; Длина также занимает 4 байта и указывает длину сообщения; Данные представляют собой переменную последовательность байтов, длина которой определяется длиной. Во время десериализации тип поля определяет тип протокола, затем извлекаются данные длины байтов и вызывается зарегистрированная функция десериализации. Основной код выглядит следующим образом:

// Codec — это интерфейс для кодера и декодера сообщений.
// Программист приложения может определить собственный кодек.
type Codec interface {
    Decode(net.Conn) (Message, error)
    Encode(Message) ([]byte, error)
}
``` **TypeLengthValueCodec определяет специальный кодек.**

// Формат: тип-длина-значение | 4 байта | 4 байта | n байтов <= 8M |
type TypeLengthValueCodec struct{}

// Decode декодирует байты данных в сообщение
func (codec TypeLengthValueCodec) Decode(raw net.Conn) (Message, error) {
    byteChan := make(chan []byte)
    errorChan := make(chan error)

    go func(bc chan []byte, ec chan error) {
        typeData := make([]byte, MessageTypeBytes)
        _, err := io.ReadFull(raw, typeData)
        if err != nil {
            ec <- err
            close(bc)
            close(ec)
            holmes.Debugln("go-routine read message type exited")
            return
        }
        bc <- typeData
    }(byteChan, errorChan)

    var typeBytes []byte

    select {
    case err := <-errorChan:
        return nil, err

    case typeBytes = <-byteChan:
        if typeBytes == nil {
            holmes.Warnln("read type bytes nil")
            return nil, ErrBadData
        }
        typeBuf := bytes.NewReader(typeBytes)
        var msgType int32
        if err := binary.Read(typeBuf, binary.LittleEndian, &msgType); err != nil {
            return nil, err
        }

        lengthBytes := make([]byte, MessageLenBytes)
        _, err := io.ReadFull(raw, lengthBytes)
        if err != nil {
            return nil, err
        }
        lengthBuf := bytes.NewReader(lengthBytes)
        var msgLen uint32
        if err = binary.Read(lengthBuf, binary.LittleEndian, &msgLen); err != nil {
            return nil, err
        }
        if msgLen > MessageMaxBytes {
            holmes.Errorf("message(type %d) has bytes(%d) beyond max %d\n", msgType, msgLen, MessageMaxBytes)
            return nil, ErrBadData
        }

        // read application data
        msgBytes := make([]byte, msgLen)
        _, err = io.ReadFull(raw, msgBytes)
        if err != nil {
            return nil, err
        }

        // deserialize message from bytes
        unmarshaler := GetUnmarshalFunc(msgType)
        if unmarshaler == nil {
            return nil, ErrUndefined(msgType)
        }
        return unmarshaler(msgBytes)
    }
}

**Здесь код содержит некоторые тонкие моменты, которые требуют подробного объяснения.**

Функция TypeLengthValueCodec.Decode() будет использоваться readLoop-потоком. Поскольку io.ReadFull() является синхронным вызовом, поток readLoop блокируется, если данные не могут быть прочитаны. В этом случае, если сетевое соединение закрыто, поток readLoop не сможет выйти. Поэтому здесь используется небольшой трюк: создаётся новый поток для ожидания чтения первых 4-х байтовых данных типа, а затем сам поток блокируется на нескольких каналах, чтобы не пропустить другие сообщения, передаваемые по каналам. Как только данные типа успешно прочитаны, продолжается дальнейший процесс: чтение данных длины, и на основе длины считываются данные приложения, которые передаются ранее зарегистрированной функции десериализации. Обратите внимание, что если получены данные, превышающие максимальную длину, соединение будет закрыто, это сделано для предотвращения злоупотребления внешними программами системными ресурсами.

## 5. Рабочий пул потоков

Чтобы повысить надёжность фреймворка и избежать задержек в ответе из-за обработки бизнес-логики, функции обработки сообщений обычно планируются для выполнения в рабочем пуле потоков. Ключевым моментом при проектировании рабочего пула потоков является то, как распределить задачи между потоками в пуле. С одной стороны, необходимо избегать проблем с параллелизмом, гарантируя, что все сообщения, отправленные одним и тем же сетевым соединением, выполняются последовательно одним и тем же потоком; с другой стороны, распределение должно быть равномерным, не позволяя потокам «быть слишком занятыми или слишком свободными». Ключом к этому является дизайн функции распределения.

### 5.1 Анализ основного кода

Пул потоков спроектирован как одноэлементный. При создании вызывается newWorker() для создания ряда рабочих потоков.

```go
// WorkerPool is a pool of go-routines running functions.
type WorkerPool struct {
workers   []*worker
closeChan chan struct{}
}

var (
globalWorkerPool *WorkerPool
)

func init() {
globalWorkerPool = newWorkerPool(WorkersNum)
}

// WorkerPoolInstance returns the global pool.
func WorkerPoolInstance() *WorkerPool {
return globalWorkerPool
}

func newWorkerPool(vol int) *WorkerPool {
if vol <= 0 {
vol = WorkersNum
}

pool := &WorkerPool{
workers:   make([]*worker, vol),
closeChan: make(chan struct{}),
}

for i := range pool.workers {
pool.workers[i] = newWorker(i, 1024, pool.closeChan)
if pool.workers[i] == nil {
panic("worker nil")
}
}

return pool
}

5.2 Распределение задач между рабочими потоками

Способ распределения задач между рабочими потоками очень прост: с помощью функции hashCode() определяется соответствующий рабочий поток, а затем функция обратного вызова отправляется в канал соответствующего потока. Соответствующий поток во время выполнения извлекает функцию обратного вызова из канала и выполняет её. Это происходит в функции start().

// Put appends a function to some worker's channel.
func (wp *WorkerPool) Put(k interface{}, cb func()) error {
code := hashCode(k)
return wp.workers[code&uint32(len(wp.workers)-1)].put(workerFunc(cb))
}

func (w *worker) start() {
for {
select {
case <-w.closeChan:
return
case cb := <-w.callbackChan:
before := time.Now()
cb()
addTotalTime(time.Since(before).Seconds())
}
}
} **Перевод текста на русский язык:**

(w *worker) put(cb workerFunc) error {
    select {
        case w.callbackChan <- cb:
            return nil
        default:
            return ErrWouldBlock
    }
}

6. Поточное безопасное устройство таймера

Tao-фреймворк разработал устройство таймера TimingWheel для управления задачами по времени. Connection провёл дальнейшую упаковку на этой основе. Предоставляются функции выполнения по расписанию (RunAt), отложенного выполнения (RunAfter) и периодического выполнения (RunEvery). Здесь, через дизайн устройства таймера, выводится некоторый опыт программирования с использованием многопоточности.

6.1 Структура данных задачи по времени

6.1.1 Задача по времени

Каждая задача по времени представлена структурой timerType, которая содержит собственный id и структуру OnTimeOut, содержащую функцию обратного вызова, выполняемую при истечении времени ожидания. Значение expiration указывает время, когда задача должна быть выполнена, значение interval указывает временной интервал, а значение interval > 0 означает, что задача будет периодически повторяться.

/* 'expiration' is the time when timer time out, if 'interval' > 0
the timer will time out periodically, 'timeout' contains the callback
to be called when times out */
type timerType struct {
    id         int64
    expiration time.Time
    interval   time.Duration
    timeout    *OnTimeOut
    index      int // for container/heap
}

// OnTimeOut представляет задачу по времени.
type OnTimeOut struct {
    Callback func(time.Time, WriteCloser)
    Ctx      context.Context
}

// NewOnTimeOut возвращает OnTimeOut.
func NewOnTimeOut(ctx context.Context, cb func(time.Time, WriteCloser)) *OnTimeOut {
    return &OnTimeOut{
        Callback: cb,
        Ctx:      ctx,
    }
}

6.1.2 Организация задач по времени

Устройство таймера должно упорядочивать задачи по времени истечения в порядке от ближайшего к самому дальнему, что является естественной маленькой верхней кучей. Поэтому здесь используется стандартная библиотека container/heap для создания структуры данных кучи для организации задач по времени, достигая эффективности доступа и обновления O(nlogn).

// timerHeap — это куча на основе приоритетной очереди
type timerHeapType []*timerType

func (heap timerHeapType) getIndexByID(id int64) int {
    for _, t := range heap {
        if t.id == id {
            return t.index
        }
    }
    return -1
}

func (heap timerHeapType) Len() int {
    return len(heap)
}

func (heap timerHeapType) Less(i, j int) bool {
    return heap[i].expiration.UnixNano() < heap[j].expiration.UnixNano()
}

func (heap timerHeapType) Swap(i, j int) {
    heap[i], heap[j] = heap[j], heap[i]
    heap[i].index = i
    heap[j].index = j
}

func (heap *timerHeapType) Push(x interface{}) {
    n := len(*heap)
    timer := x.(*timerType)
    timer.index = n
    *heap = append(*heap, timer)
}

func (heap *timerHeapType) Pop() interface{} {
    old := *heap
    n := len(old)
    timer := old[n-1]
    timer.index = -1
    *heap = old[0 : n-1]
    return timer
}

6.2 Анализ основного кода устройства таймера

TimingWheel запускает отдельный поток для запуска основного кода start() при создании. Он выполняет операции мультиплексирования по нескольким каналам: если он получает timerId из cancelChan, он выполняет операцию отмены, удаляя соответствующую задачу по времени из кучи; отправляет количество задач по времени в sizeChan, чтобы другие потоки могли получить текущее количество задач по времени; если он получает сообщение из quitChan, устройство таймера закрывается и завершается; если оно получает таймер из addChan, оно добавляет эту задачу по времени в кучу; если оно получает сигнал таймера из tw.ticker.C, оно вызывает функцию getExpired() для получения задач по времени с истекшим сроком действия, затем отправляет эти задачи по времени обратно в TimeOutChannel, где другие связанные потоки получают и выполняют обратные вызовы задач по времени. Наконец, tw.update() обновляет периодические задачи по времени и перепланирует выполнение.

func (tw *TimingWheel) update(timers []*timerType) {
    if timers != nil {
        for _, t := range timers {
            if t.isRepeat() {
                t.expiration = t.expiration.Add(t.interval)
                heap.Push(&tw.timers, t)
            }
        }
    }
}

func (tw *TimingWheel) start() {
    for {
        select {
        case timerID := <-tw.cancelChan:
            index := tw.timers.getIndexByID(timerID)
            if index >= 0 {
                heap.Remove(&tw.timers, index)
            }

        case tw.sizeChan <- tw.timers.Len():

        case <-tw.ctx.Done():
            tw.ticker.Stop()
            return

        case timer := <-tw.addChan:
            heap.Push(&tw.timers, timer)

        case <-tw.ticker.C:
            timers := tw.getExpired()
            for _, t := range timers {
                tw.GetTimeOutChannel() <- t.timeout
            }
            tw.update(timers)
        }
    }
}

6.3 Как устройство таймера обеспечивает безопасность потоков

При разработке сервера с использованием Tao-фреймворка сервер часто неожиданно падал. Иногда сервер работал несколько часов, прежде чем внезапно завершал работу. Просмотр стека вызовов показал, что каждый раз программа падала из-за ошибки доступа за пределы массива в устройстве таймера. Это проблема, вызванная одновременным доступом нескольких потоков к данным без блокировки. Почему? Потому что основная функция устройства таймера работает с кучей данных в одном потоке, в то время как его предоставляемые интерфейсы добавления, удаления и т. д. могут вызываться в других потоках. Одновременный доступ нескольких потоков к устройству таймера без блокировок обязательно вызовет проблемы. Решение простое: преобразовать одновременный доступ нескольких потоков в последовательный доступ одного потока, отправив операции добавления, удаления и т.д. в разные каналы, а затем обработав их в потоке start().

// AddTimer добавляет новую задачу по времени.
func (tw *TimingWheel) AddTimer(when time.Time, interv time.Duration, to *OnTimeOut) int64 {
    if to == nil {
        return int64(-1)
    }
    timer := newTimer(when, interv, to)
    tw.addChan <- timer
    return timer.id
}

// Size возвращает количество задач по времени.
func (tw *TimingWheel) Size() int {
    return
} ### 6.4 Применение слоя сердцебиения

В своей книге «Linux многопоточное программирование для серверов» Чэнь Шо говорит о том, что все серверы, поддерживающие длительные соединения, должны реализовывать механизм сердцебиения на прикладном уровне:

> «В серьёзных сетевых программах протокол сердцебиения прикладного уровня является обязательным. Он должен использоваться для определения того, может ли противоположный процесс нормально работать».

Для использования одного соединения для одновременной отправки сообщений о сердцебиении и других служебных сообщений, если по какой-либо причине приложение не сможет отправлять сообщения, противоположная сторона сразу же узнает об этом через сообщение о сердцебиении. Стоит отметить, что в рамках Tao Framework существует только один таймер, но клиентских подключений может быть много. В режиме длительного подключения каждый клиент должен обрабатывать сообщения о сердцебиении или другие типы периодических задач. Нецелесообразно проектировать фреймворк таким образом, чтобы каждое клиентское соединение имело свой собственный таймер  это привело бы к высокой загрузке процессора при наличии десятков тысяч соединений. Таймер должен быть только один, и он будет отвечать за обработку всех периодических задач, зарегистрированных клиентами. Однако, если все клиентские подключения будут ожидать сообщений от единственного таймера, возникнет проблема параллелизма. Например, задача по времени для клиента 1 истекла, но в данный момент он занят обработкой других сообщений, и эта задача может быть выполнена другим клиентом. Поэтому здесь используется механизм обработки «сначала концентрация, затем распределение»: каждая задача по времени представлена структурой TimeOut, которая содержит функцию обратного вызова и контекст. При запуске задачи по времени клиент заполняет net ID. TCPServer единообразно принимает задачи по времени и извлекает из них net ID, после чего передаёт задачу соответствующему ServerConn или ClientConn для выполнения:
```go
// Retrieve the extra data(i.e. net id), and then redispatch timeout callbacks
// to corresponding client connection, this prevents one client from running
// callbacks of other clients
func (s *Server) timeOutLoop() {
    defer s.wg.Done()

    for {
        select {
        case <-s.ctx.Done():
            return

        case timeout := <-s.timing.GetTimeOutChannel():
            netID := timeout.Ctx.Value(netIDCtx).(int64)
            if sc, ok := s.conns.Get(netID); ok {
                sc.timerCh <- timeout
            } else {
                holmes.Warnf("invalid client %d\n", netID)
            }
        }
    }
}

Три. Также о проблемах и основных идеях параллельного программирования

Когда мы говорим о параллельном программировании, о чём мы говорим? В двух словах: когда несколько потоков одновременно обращаются к незащищённым общим данным, возникают проблемы параллелизма. Таким образом, суть многопоточного программирования заключается в том, как избежать возникновения такой ситуации. Здесь мы подводим некоторые итоги, есть три основных метода.

1. Защита структуры общих данных

Это наиболее распространённый метод в учебниках. Используйте различные сигналы/мьютексы для защиты структуры данных, сначала заблокируйте её, а затем выполните операцию, наконец, разблокируйте. Например, ConnMap, используемый в Tao Framework для управления сетевыми подключениями, реализован таким образом:

// ConnMap is a safe map for server connection management.
type ConnMap struct {
    sync.RWMutex
    m map[int64]*ServerConn
}

// NewConnMap returns a new ConnMap.
func NewConnMap() *ConnMap {
    return &ConnMap{
        m: make(map[int64]*ServerConn),
    }
}

// Clear clears all elements in map.
func (cm *ConnMap) Clear() {
    cm.Lock()
    cm.m = make(map[int64]*ServerConn)
    cm.Unlock()
}

// Get gets a server connection with specified net ID.
func (cm *ConnMap) Get(id int64) (*ServerConn, bool) {
    cm.RLock()
    sc, ok := cm.m[id]
    cm.RUnlock()
    return sc, ok
}

// Put puts a server connection with specified net ID in map.
func (cm *ConnMap) Put(id int64, sc *ServerConn) {
    cm.Lock()
    cm.m[id] = sc
    cm.Unlock()
}

// Remove removes a server connection with specified net ID.
func (cm *ConnMap) Remove(id int64) {
    cm.Lock()
    delete(cm.m, id)
    cm.Unlock()
}

// Size returns map size.
func (cm *ConnMap) Size() int {
    cm.RLock()
    size := len(cm.m)
    cm.RUnlock()
    return size
}

// IsEmpty tells whether ConnMap is empty.
func (cm *ConnMap) IsEmpty() bool {
    return cm.Size() <= 0
}

2 Преобразование многопоточности в однопоточность

Этот метод уже был представлен ранее, он относится к способу программирования без блокировки. Все запросы на операции, отправленные несколькими потоками, помещаются в очередь задач, и, наконец, один поток считывает очередь и выполняет операции последовательно. Этот метод всё ещё имеет узкое место в производительности, когда количество параллельных операций велико. ##3 Использование тщательно разработанных структур данных для параллельной работы Лучший способ — начать с структуры данных. Существует множество методов, которые позволяют структуре данных адаптироваться к сценариям параллельного доступа нескольких потоков. Например, ConcurrentHashMap в стандартной библиотеке Java использует сегментированный замок, который защищает каждый сегмент (Segment), и параллельная запись данных распределяется по различным сегментам с помощью функции хеширования. Когда сегмент A заблокирован, это не влияет на доступ к сегменту B. При решении проблем параллелизма необходимо быть очень осторожным и тщательно обдумывать каждый шаг. Одна неосторожность может привести к ошибкам.

Опубликовать ( 0 )

Вы можете оставить комментарий после Вход в систему

1
https://api.gitlife.ru/oschina-mirror/mirrors-Tao.git
git@api.gitlife.ru:oschina-mirror/mirrors-Tao.git
oschina-mirror
mirrors-Tao
mirrors-Tao
master