1 В избранное 0 Ответвления 0

OSCHINA-MIRROR/langhuihui-RxGo

Присоединиться к Gitlife
Откройте для себя и примите участие в публичных проектах с открытым исходным кодом с участием более 10 миллионов разработчиков. Приватные репозитории также полностью бесплатны :)
Присоединиться бесплатно
Клонировать/Скачать
Внести вклад в разработку кода
Синхронизировать код
Отмена
Подсказка: Поскольку Git не поддерживает пустые директории, создание директории приведёт к созданию пустого файла .keep.
Loading...
README.md

RxGo неофициальная реализация

Особенности реализации:

  • код упрощён и легко читаем (реализован с использованием минимального количества кода);
  • дизайн продуман, реализация элегантна (максимально использованы особенности и преимущества языка Go);
  • расширяемость сильная (можно настраивать Observable и Operator);
  • использование системных ресурсов низкое (все возможные меры предприняты для уменьшения создания goroutine и других объектов);
  • производительность высокая (все возможные меры предприняты для снижения вычислительных затрат).

Каждая строка кода тщательно продумана...

Реализованные функции

Observable

FromSlice FromChan Of Range Subject Timeout Interval Merge Concat Race CombineLatest Empty Never Throw

Operator

Do Take TakeWhile TakeUntil Skip SkipWhile SkipUntil IgnoreElements Share StartWith Zip Filter Distinct DistinctUntilChanged Debounce DebounceTime Throttle ThrottleTime First Last Count Max Min Reduce Map MapTo MergeMap MergeMapTo SwitchMap SwitchMapTo

Использование

Цепное использование

import (
    . "github.com/langhuihui/RxGo/rx"
)
func main(){
    err := Of(1, 2, 3, 4).Take(2).Subscribe(ObserverFunc(func(event *Event) {
        
    }))
}

Использование в виде конвейера

import (
    . "github.com/langhuihui/RxGo/rx"
    . "github.com/langhuihui/RxGo/pipe"
)
func main(){
    err := Of(1, 2, 3, 4).Pipe(Skip(1),Take(2)).Subscribe(ObserverFunc(func(event *Event) {
        
    }))
}

Конвейерный режим обеспечивает большую гибкость в использовании операторов, позволяя пользователям создавать собственные операторы.

type Operator func(Observable) Observable

Для создания оператора достаточно вернуть тип Operator. Например, можно создать оператор, который завершает работу, если happy равен false.

func MyOperator(happy bool) Operator {
    return func(source Observable) Observable {
        return func (sink *Observer) error {
            if happy{
                return source(sink)
            }
            return nil
        }
    }
}

Создание пользовательских Observable

В любое время вы можете создать свой собственный Observable для отправки любых событий.

import (
    . "github.com/langhuihui/RxGo/rx"
)
func MyObservable (sink *Control) error {
    sink.Next("hello")
    return nil
}
func main(){
    ob := Observable(MyObservable)
    ob.Subscribe(ObserverFunc(func(event *Event) {
        
    }))
}

Дизайн

Основные знания

Observable — это источник событий, на который можно подписаться и который будет отправлять события.

                                time -->

(*)-------------(o)--------------(o)---------------(x)----------------|>
 |               |                |                 |                 |
Start          value            value             error              Done

Эта диаграмма представляет процесс подписки на событие после запуска (Start) и непрерывной отправки событий до тех пор, пока не будет отправлено сообщение об ошибке или Done (завершение).

Некоторые Observable не отправляют событие завершения, например Never.

Ссылка: rxmarbles.

Общая схема

Ключевыми элементами реализации Rx являются следующие вопросы:

  1. Как определить Observable? (структура, функция, интерфейс, канал?)
  2. Как реализовать логику подписки? (вызов функции, отправка данных?)
  3. Как обрабатывать полученные данные? (как реализовать Observer?)
  4. Как реализовать передачу завершения/ошибки?
  5. Как отменить подписку? (сложная задача: отмена во время реакции на событие и отмена из любого другого goroutine)
  6. Как реализовать операторы?
  7. Как операторы обрабатывают цепные реакции, такие как описанные ниже?
  8. Как реализовать два режима программирования: цепочку и конвейер?
  9. Как позволить пользователям расширять (настраивать) Observable и операторы?
  10. Как объяснить сложные концепции обычным пользователям?
  • Когда пользователю необходимо подписаться или остановить поток событий, происходит передача по цепочке, подписка или остановка всех промежуточных источников событий.

Observable---------Operator----------Operator-----------Observer
             <|                <|                <|          
           订阅/取消          订阅/取消          订阅/取消         
  • Когда поток событий завершается или возникает ошибка, необходимо уведомить нижестоящий поток событий о завершении или ошибке.

Observable---------Operator----------Operator-----------Observer
             |>                |>                |>          
           完成/错误          完成/错误          完成/错误         

На самом деле ситуация гораздо сложнее, и мы рассмотрим её позже. // Кэш текущей ошибки }

Данный контроллер представляет собой структуру, в которой next хранит текущий NextHandler.

В любой момент, если вы закроете dispose этого канала, это будет означать **отмену подписки**.
```go
//Dispose Отмена подписки
func (c *Observer) Dispose() {
    select {
    case <-c.dispose:
    default:
        close(c.dispose)
    }
}

//Aborted Проверка, была ли уже отменена подписка или завершена
func (c *Observer) Aborted() bool {
    select {
    case <-c.dispose:
        return true
    case <-c.complete:
        return true
    default:
        return false
    }
}

Поскольку закрытие канала Channel может вызвать пробуждение всех блокирующих действий, считывающих этот канал, можно повторно использовать этот канал на разных уровнях.

Кроме того, поскольку уже закрытый канал можно многократно считывать для получения информации о состоянии закрытия, нет необходимости вести дополнительный учёт.

Объект Observer совместно используется Observable и логикой обработки событий и является мостом между ними.

NextHandler

type Event struct {
    Data    interface{}
    Target  *Observer
}
NextHandler interface {
    OnNext(*Event)
}

NextHandler — это интерфейс, реализующий функцию OnNext, которая вызывается, когда данные передаются от Observable к Observer.

Атрибут Target используется для хранения текущего объекта Observer, отправляющего событие. У него есть две важные функции:

  1. Замена NextHandler для уменьшения процесса передачи данных.
  2. Завершение потока событий в процессе NextHandler.

Преимущество этого подхода заключается в том, что он позволяет реализовать различные наблюдатели, такие как функции или каналы.

type(
    NextFunc func(*Event)
    NextChan chan *Event
)
func (next NextFunc) OnNext(event *Event) {
    next(event)
}
func (next NextChan) OnNext(event *Event) {
    next <- event
}

Пример реализации TakeUntil

//TakeUntil Получать события до тех пор, пока не придёт указанное событие
func (ob Observable) TakeUntil(until Observable) Observable {
    return func(sink *Observer) error {
        go until(sink.New3(NextFunc(func(event *Event) {
            //Получение любых данных приводит к завершению нижестоящего потока
            sink.Complete() //Поскольку используется общий сигнал complete, это приведёт к завершению всех потоков, использующих его
        })))
        return ob(sink)
    }
}

TakeUnitl используется для передачи события until, которое, когда оно получает событие, приводит к «завершению» текущего потока. Это похоже на своего рода сигнал прерывания.

Несмотря на то, что код кажется коротким, он учитывает множество различных ситуаций.

Основные детали реализации:

  1. Подписка на источник событий until с использованием ключевого слова go для создания goroutine, чтобы предотвратить блокировку текущего goroutine.
  2. Использование функционального NextHandler, который позволяет пользователю получать события из источника until и вызывать sink.Complete(), когда происходит любое событие, чтобы завершить текущий поток событий.
  3. Подписка Observer на источник событий until (создаётся с помощью sink.New3) использует сигналы sink.dispose и sink.complete. Когда пользователь отменяет подписку, это вызывает отмену подписки на источник until.
  4. Последний шаг — подписка на вышестоящий источник событий. Мы не создаём новый Observer, а передаём существующий, избегая ненужной логики пересылки.
  5. Любая отмена подписки или завершение вышестоящего источника событий могут привести к возврату функции и, следовательно, к завершению TakeUntil.
  6. Завершение или ошибка источника событий until игнорируются, поэтому мы не получаем возвращаемое значение функции until.

Комментарии ( 0 )

Вы можете оставить комментарий после Вход в систему

Введение

Реактивный X в версии, реализованной на Golang, неофициальный вариант реализации. Развернуть Свернуть
MIT
Отмена

Обновления

Пока нет обновлений

Участники

все

Недавние действия

Загрузить больше
Больше нет результатов для загрузки
1
https://api.gitlife.ru/oschina-mirror/langhuihui-RxGo.git
git@api.gitlife.ru:oschina-mirror/langhuihui-RxGo.git
oschina-mirror
langhuihui-RxGo
langhuihui-RxGo
master