Каждая строка кода тщательно продумана...
FromSlice
FromChan
Of
Range
Subject
Timeout
Interval
Merge
Concat
Race
CombineLatest
Empty
Never
Throw
Do
Take
TakeWhile
TakeUntil
Skip
SkipWhile
SkipUntil
IgnoreElements
Share
StartWith
Zip
Filter
Distinct
DistinctUntilChanged
Debounce
DebounceTime
Throttle
ThrottleTime
First
Last
Count
Max
Min
Reduce
Map
MapTo
MergeMap
MergeMapTo
SwitchMap
SwitchMapTo
import (
. "github.com/langhuihui/RxGo/rx"
)
func main(){
err := Of(1, 2, 3, 4).Take(2).Subscribe(ObserverFunc(func(event *Event) {
}))
}
import (
. "github.com/langhuihui/RxGo/rx"
. "github.com/langhuihui/RxGo/pipe"
)
func main(){
err := Of(1, 2, 3, 4).Pipe(Skip(1),Take(2)).Subscribe(ObserverFunc(func(event *Event) {
}))
}
Конвейерный режим обеспечивает большую гибкость в использовании операторов, позволяя пользователям создавать собственные операторы.
type Operator func(Observable) Observable
Для создания оператора достаточно вернуть тип Operator. Например, можно создать оператор, который завершает работу, если happy равен false.
func MyOperator(happy bool) Operator {
return func(source Observable) Observable {
return func (sink *Observer) error {
if happy{
return source(sink)
}
return nil
}
}
}
В любое время вы можете создать свой собственный Observable для отправки любых событий.
import (
. "github.com/langhuihui/RxGo/rx"
)
func MyObservable (sink *Control) error {
sink.Next("hello")
return nil
}
func main(){
ob := Observable(MyObservable)
ob.Subscribe(ObserverFunc(func(event *Event) {
}))
}
Observable — это источник событий, на который можно подписаться и который будет отправлять события.
time -->
(*)-------------(o)--------------(o)---------------(x)----------------|>
| | | | |
Start value value error Done
Эта диаграмма представляет процесс подписки на событие после запуска (Start) и непрерывной отправки событий до тех пор, пока не будет отправлено сообщение об ошибке или Done (завершение).
Некоторые Observable не отправляют событие завершения, например Never.
Ссылка: rxmarbles.
Ключевыми элементами реализации Rx являются следующие вопросы:
Observable---------Operator----------Operator-----------Observer
<| <| <|
订阅/取消 订阅/取消 订阅/取消
Observable---------Operator----------Operator-----------Observer
|> |> |>
完成/错误 完成/错误 完成/错误
На самом деле ситуация гораздо сложнее, и мы рассмотрим её позже. // Кэш текущей ошибки }
Данный контроллер представляет собой структуру, в которой next хранит текущий NextHandler.
В любой момент, если вы закроете dispose этого канала, это будет означать **отмену подписки**.
```go
//Dispose Отмена подписки
func (c *Observer) Dispose() {
select {
case <-c.dispose:
default:
close(c.dispose)
}
}
//Aborted Проверка, была ли уже отменена подписка или завершена
func (c *Observer) Aborted() bool {
select {
case <-c.dispose:
return true
case <-c.complete:
return true
default:
return false
}
}
Поскольку закрытие канала Channel может вызвать пробуждение всех блокирующих действий, считывающих этот канал, можно повторно использовать этот канал на разных уровнях.
Кроме того, поскольку уже закрытый канал можно многократно считывать для получения информации о состоянии закрытия, нет необходимости вести дополнительный учёт.
Объект Observer
совместно используется Observable
и логикой обработки событий и является мостом между ними.
type Event struct {
Data interface{}
Target *Observer
}
NextHandler interface {
OnNext(*Event)
}
NextHandler
— это интерфейс, реализующий функцию OnNext
, которая вызывается, когда данные передаются от Observable
к Observer
.
Атрибут Target
используется для хранения текущего объекта Observer
, отправляющего событие. У него есть две важные функции:
Преимущество этого подхода заключается в том, что он позволяет реализовать различные наблюдатели, такие как функции или каналы.
type(
NextFunc func(*Event)
NextChan chan *Event
)
func (next NextFunc) OnNext(event *Event) {
next(event)
}
func (next NextChan) OnNext(event *Event) {
next <- event
}
//TakeUntil Получать события до тех пор, пока не придёт указанное событие
func (ob Observable) TakeUntil(until Observable) Observable {
return func(sink *Observer) error {
go until(sink.New3(NextFunc(func(event *Event) {
//Получение любых данных приводит к завершению нижестоящего потока
sink.Complete() //Поскольку используется общий сигнал complete, это приведёт к завершению всех потоков, использующих его
})))
return ob(sink)
}
}
TakeUnitl используется для передачи события until, которое, когда оно получает событие, приводит к «завершению» текущего потока. Это похоже на своего рода сигнал прерывания.
Несмотря на то, что код кажется коротким, он учитывает множество различных ситуаций.
Основные детали реализации:
Вы можете оставить комментарий после Вход в систему
Неприемлемый контент может быть отображен здесь и не будет показан на странице. Вы можете проверить и изменить его с помощью соответствующей функции редактирования.
Если вы подтверждаете, что содержание не содержит непристойной лексики/перенаправления на рекламу/насилия/вульгарной порнографии/нарушений/пиратства/ложного/незначительного или незаконного контента, связанного с национальными законами и предписаниями, вы можете нажать «Отправить» для подачи апелляции, и мы обработаем ее как можно скорее.
Комментарии ( 0 )