1 В избранное 0 Ответвления 0

OSCHINA-MIRROR/scalalibs-scala.rx

Присоединиться к Gitlife
Откройте для себя и примите участие в публичных проектах с открытым исходным кодом с участием более 10 миллионов разработчиков. Приватные репозитории также полностью бесплатны :)
Присоединиться бесплатно
В этом репозитории не указан файл с открытой лицензией (LICENSE). При использовании обратитесь к конкретному описанию проекта и его зависимостям в коде.
Клонировать/Скачать
readme.md 77 КБ
Копировать Редактировать Web IDE Исходные данные Просмотреть построчно История
Отправлено 12.03.2025 23:04 cf1723c

Scala.Rx 0.3.2 Статус сборки Присоединиться к чату

Scala.Rx — это экспериментальная библиотека для распространения изменений в Scala. Scala.Rx предоставляет вам реактивные переменные (Rx), которые являются "умными" переменными, автоматически обновляющимися при изменении значений, от которых они зависят. Подобная реализация основана на идеях из статьи Deprecating the Observer Pattern.

Пример простого использования, демонстрирующий поведение:

import rx._
val a = Var(1); val b = Var(2)
val c = Rx{ a() + b() }
println(c.now) // 3
a() = 4
println(c.now) // 6

Основная идея заключается в том, что 99 процентов времени вы пересчитываете переменную таким же образом, как и в первый раз, когда она была рассчитана. Кроме того, вы пересчитываете её только тогда, когда одно из значений, от которых она зависит, изменится. Scala.Rx делает это за вас автоматически и берёт на себя всю рутинную логику обновлений, чтобы вы могли сосредоточиться на других, более интересных вещах!Кроме базового распространения изменений, Scala.Rx предлагает множество других возможностей, таких как набор комбинаторов для удобного конструирования графа данных, проверку на этапе компиляции для высокой степени корректности и бесшовную совместимость с существующими проектами на Scala. Это позволяет легко внедрять его в уже существующие приложения на Scala.Содержание

Начало работы

Scala.Rx доступен на Maven Central. Чтобы начать работу, просто добавьте следующее в ваш файл build.sbt:

libraryDependencies += "com.lihaoyi" %% "scalarx" % "0.3.2"

После этого открытие sbt консоли и вставка приведённого выше примера в консоль должны работать! Вы можете продолжить просмотр примеров на странице Основное использование, чтобы получить представление о том, что может делать Scala.Rx.

ScalaJS

Кроме выполнения на JVM, Scala.Rx также компилируется в Scala-JS! Этот артефакт находится на Maven Central и может использоваться с помощью следующего фрагмента SBT:

libraryDependencies += "com.lihaoyi" %%% "scalarx" % "0.3.2"
```Существуют некоторые небольшие различия между выполнением Scala.Rx на JVM и в JavaScript, особенно вокруг [асинхронных операций](#таймер), модели [параллелизма](#параллелизм-и-scalajs) и модели [памяти](#память-и-scalajs). В общем, однако, все примеры, представленные ниже в документации, будут работать идеально при кросс-скомпиляции в JavaScript и запуске в браузере!

Scala.rx версии 0.3.2 совместима только со ScalaJS версий 0.6.5+.

Использование Scala.Rx
======================

Основные операции требуют `import rx._` перед использованием, с дополнительными операциями также требуя `import rx.ops._`. Некоторые из приведённых ниже примеров также используют различные импорты из `scala.concurrent` или `scalatest`.

Основное использование
----------------------

```scala
import rx._

val a = Var(1); val b = Var(2)
val c = Rx{ a() + b() }
println(c.now) // 3
a() = 4
println(c.now) // 6

Приведённый выше пример является исполняемой программой. В общем случае, import rx._ достаточно для начала работы с Scala.Rx, и это будет предполагаться во всех последующих примерах. Эти примеры взяты из единичных тестов.Основные сущности, которыми вам следует заняться, — это Var, Rx и Obs.

  • Var: умная переменная, которую можно получить с помощью a() и установить с помощью a() = .... Каждый раз, когда её значение меняется, она отправляет сигнал любому нижележащему объекту, который требуется пересчитать.
  • Rx: реактивное определение, которое автоматически захватывает любую Var или другие Rx, вызываемые в его теле, отмечает их как зависимости и пересчитывает каждый раз, когда одно из них изменяется. Как и Var, вы можете использовать синтаксис a() для получения значения, а также отправляет сигнал нижележащим объектам при изменении значения.
  • Obs: наблюдатель за одной или несколькими Var или Rx, выполняющий побочное действие, когда наблюдающийся узел изменяет своё значение и отправляет ему сигнал.

Используя эти компоненты, вы можете легко создать граф потока данных, и иметь различные значения в графе потока данных автоматически обновляться при изменении входных данных графа:```scala val a = Var(1) // 1

val b = Var(2) // 2

val c = Rx{ a() + b() } // 3 val d = Rx{ c() * 5 } // 15 val e = Rx{ c() + 4 } // 7 val f = Rx{ d() + e() + 4 } // 26

println(f.now) // 26 a() = 3 println(f.now) // 38


Граф потока данных для этой программы выглядит следующим образом:

![Граф потока данных](media/Intro.png?raw=true)

Где `Var`s представлены квадратами, `Rx`s — кругами, а зависимости — стрелками. Каждый `Rx` помечен своим именем, его телом и значением.

Модификация значения `a` вызывает изменения, которые распространяются через граф потока данных:

![Граф потока данных](media/IntroProp.png?raw=true)

Как можно видеть выше, изменение значения `a` приводит к тому, что изменения распространяются от `a` до `c`, `d`, `e` и затем до `f`. Вы можете использовать `Var` и `Rx` там, где обычно используются обычные переменные.

Изменения распространяются через граф потока данных волнами. Каждое изменение `Var` запускает волну распространения, которая передает изменения от этого `Var` ко всем `Rx`, которые (непосредственно или косвенно) зависят от его значения. В процессе возможно несколько пересчетов одного и того же `Rx`.

### Наблюдатели

Как уже упоминалось, `Obs` могут быть созданы из `Rx` или `Var` и использоваться для выполнения побочных эффектов при изменении этих значений:

```scala
val a = Var(1)
var count = 0
val o = a.trigger {
  count = a.now + 1
}
println(count) // 2
a() = 4
println(count) // 5
```Это создаёт граф потока данных, который выглядит так:

![Граф потока данных](media/Observer.png?raw=true)

Когда `a` изменяется, наблюдатель `o` выполняет побочный эффект:

![Граф потока данных](media/Observer2.png?raw=true)

Тело Rx должно быть свободным от побочных эффектов, поскольку они могут выполняться более одного раза за одну волну распространения. Для выполнения побочных эффектов следует использовать Obs, поскольку они гарантированно выполняются только один раз после того, как все значения Rx стабилизируются.

Scala.Rx предоставляет удобный метод .foreach(), который позволяет создать Obs из Rx следующим образом:

```scala
val a = Var(1)
var count = 0
val o = a.foreach { x =>
  count = x + 1
}
println(count) // 2
a() = 4
println(count) // 5

Этот пример делает то же самое, что и код выше.

Обратите внимание, что тело Obs выполняется один раз при первоначальной декларации. Это соответствует тому, как каждый Rx вычисляется один раз при его первоначальной декларации. Однако возможно, что вам потребуется Obs, который срабатывает в первый раз только тогда, когда Rx, на который он слушает, меняется. Вы можете это сделать, используя альтернативный синтаксис triggerLater:

val a = Var(1)
var count = 0
val o = a.triggerLater {
  count = count + 1
}
println(count) // 0
a() = 2
println(count) // 1
```[Obs][2] служит для упаковки обратного вызова, который он запускает. Они могут передаваться, храниться в переменных и т.д.. Когда [Obs][2] собирается мусором, обратный вызов прекращает срабатывать. Таким образом, [Obs][2] следует хранить в объекте, который он влияет: если обратный вызов влияет только на этот объект, ему всё равно, когда будет собран [Obs][2], так как это произойдет после того, как объект, содержащий его, станет недостижимым, в этом случае его эффекты также будут недостижимыми. [Obs][2] также может быть активно отключён, если требуется более сильная гарантия:```scala
val a = Var(1)
val b = Rx{ 2 * a() }
var target = 0
val o = b.trigger {
  target = b.now
}
println(target) // 2
a() = 2
println(target) // 4
o.kill()
a() = 3
println(target) // 4

После ручного вызова метода .kill(), Obs больше не активируется. Кроме отключения Obs с помощью метода .kill(), можно также отключить Rx, что предотвращает дальнейшие обновления.

В целом, Scala.Rx строится вокруг создания графиков потока данных, которые автоматически поддерживают синхронность, которую легко взаимодействовать с внешним императивным кодом. Это включает использование:

  • Var как входных данных для графика потока данных из императивного мира
  • Rx как промежуточных узлов в графике потока данных
  • Obs как выходных данных из графика потока данных обратно в императивный мир

Сложные реактивные системы

Rx не ограничивается значением типа Int. Можно использовать значения типа String, Seq[Int], Seq[String] и любое другое значение внутри Rx:

val a = Var(Seq(1, 2, 3))
val b = Var(3)
val c = Rx{ b() +: a() }
val d = Rx{ c().map("omg" * _) }
val e = Var("wtf")
val f = Rx{ (d() :+ e()).mkString }

println(f.now) // "omgomgomgomgomgomgomgomgomgwtf"
a() = Nil
println(f.now) // "omgomgomgwtf"
e() = "wtfbbq"
println(f.now) // "omgomgomgwtfbbq"

Как показано, вы можете использовать реактивные переменные Scala.Rx для моделирования задач любой сложности, а не только простых, связанных с примитивными числами.

Обработка ошибокПоскольку тело Rx может быть любым произвольным кодом Scala, он может выбрасывать исключения. Пропагация исключения вверх по стеку вызовов не имеет большого смысла, так как код, который оценивает Rx, вероятно, не контролирует причину его сбоев. Вместо этого все исключения ловятся самим Rx и хранятся внутренне в виде Try.Это можно видеть на следующем юнит-тесте:

val a = Var(1)
val b = Rx { 1 / a() }
println(b.now) // 1
println(b.toTry) // Success(1)
a() = 0
intercept[ArithmeticException] {
  b()
}
assert(b.toTry.isInstanceOf[Failure])

Изначально значение a равно 1, поэтому значение b также равно 1. Вы также можете извлечь внутренний Try с помощью b.toTry, которое вначале равно Success(1).

Однако когда значение a становится равным 0, тело b выбрасывает исключение ArithmeticException. Это поймано b и повторно выбрасывается при попытке извлечения значения из b с помощью b(). Вы можете извлечь весь Try с помощью toTry и применять шаблонное соответствие для обработки как случая Success, так и случая Failure.

Когда у вас есть много Rx, связанных цепочками, исключения распространяются вперед, следуя графу зависимостей, как это можно было бы ожидать. Код ниже:

val a = Var(1)
val b = Var(2)

val c = Rx { a() / b() }
val d = Rx { a() * 5 }
val e = Rx { 5 / b() }
val f = Rx { a() + b() + 2 }
val g = Rx { f() + c() }

inside(c.toTry) { case Success(0) => () }
inside(d.toTry) { case Success(5) => () }
inside(e.toTry) { case Success(2) => () }
inside(f.toTry) { case Success(5) => () }
inside(g.toTry) { case Success(5) => () }

b() = 0

inside(c.toTry) { case Failure(_) => () }
inside(d.toTry) { case Success(5) => () }
inside(e.toTry) { case Failure(_) => () }
inside(f.toTry) { case Success(3) => () }
inside(g.toTry) { case Failure(_) => () }

Создает граф зависимости, который выглядит следующим образом:

Граф потока данных

В этом примере изначально все значения для a, b, c, d, e, f и g корректны. Однако, когда b устанавливается в 0:Граф потока данных

c и e оба приводят к исключениям, и исключение от c передается до g. Попытка извлечения значения из g с помощью g.now, например, снова выбросит исключение ArithmeticException. Также использование toTry работает аналогично.### Вложенные Rx

Rx могут содержать другие Rx, глубоко вложенными. Этот пример показывает Rx, вложенные на два уровня:

val a = Var(1)
val b = Rx{
    (Rx{ a() }, Rx{ math.random })
}
val r = b.now._2.now
a() = 2
println(b.now._2.now) // r

В этом примере видно, что хотя мы изменили значение a, это влияет только на внутренний левый Rx. Ни правый внутренний Rx (который принимает случайное значение каждый раз при пересчете), ни внешний Rx (который привел бы к пересчету всего выражения) не затронуты этим изменением. Более реалистичный пример может выглядеть так:

var fakeTime = 123
trait WebPage{
    def fTime = fakeTime
    val time = Var(fTime)
    def update(): Unit  = time() = fTime
    val html: Rx[String]
}
class HomePage(implicit ctx: Ctx.Owner) extends WebPage {
    val html = Rx{"Главная страница! время: " + time()}
}
class AboutPage(implicit ctx: Ctx.Owner) extends WebPage {
    val html = Rx{"Обо мне, время: " + time()}
}

val url = Var("www.mysite.com/home")
val page = Rx{
    url() match{
        case "www.mysite.com/home" => new HomePage()
        case "www.mysite.com/about" => new AboutPage()
    }
}

println(page.now.html.now) // "Главная страница! время: 123"

fakeTime = 234
page.now.update()
println(page.now.html.now) // "Главная страница! время: 234"

fakeTime = 345
url() = "www.mysite.com/about"
println(page.now.html.now) // "Обо мне, время: 345"
``````markdown
fakeTime = 456
page.now.update()
println(page.now.html.now) # "О себе, время: 456"

В данном случае мы определяем веб-страницу с значением html (тип Rx[String]). Однако, в зависимости от значения url, это может быть либо HomePage, либо AboutPage. Таким образом, объект page имеет тип Rx[WebPage].

Имея Rx[WebPage], где WebPage содержит внутри себя Rx[String], кажется естественным и очевидным подходом. Scala.Rx позволяет легко и естественно реализовать такие вложения. Такие ситуации с объектами внутри объектов возникают очень естественно при моделировании задачи в объектно-ориентированной манере. Возможность Scala.Rx грациозно обрабатывать Rx внутри Rx позволяет ему грациозно интегрироваться в этот парадигматический подход, чего я не нашёл в большинстве исследованных аналогичных работ.

Большинство примеров здесь взяты из единичных тестов, которые предоставляют больше примеров и руководства по использованию этой библиотеки. Контекст владения

В последнем примере выше нам пришлось ввести концепцию владения, где используется Ctx.Owner. В самом деле, если бы мы пропустили (implicit ctx: Ctx.Owner), мы получили бы следующую ошибку во время компиляции:

ошибка: Этот Rx может утечь! Либо явно отметьте его как небезопасный (Rx.unsafe), либо обеспечьте наличие неявного контекста RxCtx.
           val html = Rx{"Главная страница! время: " + time()}
```Чтобы понять `владение`, важно понять проблему, которую он решает: `утечка`. Например, рассмотрим это небольшое изменение первого примера:

```scala
var count = 0
val a = Var(1); val b = Var(2)
def mkRx(i: Int) = Rx.unsafe { count += 1; i + b() }
val c = Rx{
  val newRx = mkRx(a())
  newRx()
}
println(c.now, count) // (3, 1)

В этом варианте была добавлена функция mkRx, но вычисленное значение c остаётся неизменным. А изменение a кажется правильным:

a() = 4
println(c.now, count) // (6, 2)

Но если мы изменим b, мы можем начать замечать что-то не совсем верное:

b() = 3
println(c.now, count) // (7, 5) -- 5??
  
(0 to 100).foreach { i => a() = i }
println(c.now, count) // (103, 106)

b() = 4
println(c.now, count) // (104, 211) -- 211!!!

В этом примере, даже несмотря на то, что b обновляется всего несколько раз, значение счётчика начинает расти, когда меняется a. Это утечка! То есть каждый раз, когда c пересчитывается, создаётся новый Rx, который продолжает существовать и оцениваться, даже после того, как больше не является доступной зависимостью данных и забытым. Так что после выполнения этого (0 до 100).foreach у нас остаются более 100 Rx, которые все срабатывают каждый раз, когда меняется b. Это явно недопустимо.

Однако, добавив явного владельца (и убрав unsafe), можно исправить утечку:```scala var count = 0 val a = Var(1); val b = Var(2) def mkRx(i: Int)(implicit ctx: Ctx.Owner) = Rx { count += 1; i + b() } val c = Rx{ val newRx = mkRx(a()) newRx() } println(c.now, count) // (3,1) a() = 4 println(c.now, count) // (6,2) b() = 3 println(c.now, count) // (7,4) (0 to 100).foreach { i => a() = i } println(c.now, count) //(103,105) b() = 4 println(c.now, count) //(104,107)


Функция владения предотвращает утечки, позволяя родителю `Rx` отслеживать свои «владеющие» вложенные `Rx`. То есть каждый раз, когда `Rx` пересчитывается, он сначала уничтожает все свои владеющие зависимости, обеспечивая тем самым, что они не утекут. В этом примере `c` является владельцем всех `Rx`, созданных в `mkRx`, и автоматически уничтожает их каждый раз, когда `c` пересчитывается.

Контекст данных
---------------

При использовании либо [Rx](https://github.com/alexandrosstergiou/RxScala), либо [Var](https://github.com/alexandrosstergiou/RxScala/blob/master/src/main/scala/com/github/alexandrosstergiou/rxscala/Var.scala) с помощью `()` (также известного как `apply`) текущее значение распаковывается, а сам объект добавляется в качестве зависимости к тому `Rx`, который сейчас выполняется. Вместо этого можно использовать `.now`, чтобы просто распаковать значение и пропустить создание зависимости данных:

[1]: https://github.com/alexandrosstergiou/RxScala
[3]: https://github.com/alexandrosstergiou/RxScala/blob/master/src/main/scala/com/github/alexandrosstergiou/rxscala/Var.scala```scala
val a = Var(1); val b = Var(2)
val c = Rx{ a.now + b.now } //Не очень полезный `Rx`
println(c.now) // 3
a() = 4
println(c.now) // 3 
b() = 5
println(c.now) // 3 

Чтобы понять необходимость контекста данных и различия между контекстами данных и владельцами, рассмотрите следующий пример:

def foo()(implicit ctx: Ctx.Owner) = {
  val a = rx.Var(1)
  a()
  a
}

val x = rx.Rx{val y = foo(); y() = y() + 1; println("done!") }

С концепцией владения, если a() позволяет создать зависимость данных от своего владельца, это приведёт к бесконечной рекурсии и переполнению стека! Вместо этого вышеупомянутый код даёт эту ошибку компиляции:

<console>:17: ошибка: Нет явного значения Ctx.Data здесь!
        a()

Можно «починить» ошибку, явно разрешив создание зависимостей данных (и заметить, что стек переполняется):

def foo()(implicit ctx: Ctx.Owner, data: Ctx.Data) = {
  val a = rx.Var( Yöntemdeki hata: "Ctx.Data" вместо "data". 
  Предлагаю исправление: "val a = rx.Var(1)".
  
  Исправленный текст:
```scala
def foo()(implicit ctx: Ctx.Owner, data: Ctx.Data) = {
  val a = rx.Var(1)
  a()
  a
}
val x = rx.Rx{val y = foo(); y() = y() + 1; println("done!") }
...
в rx.Rx$Dynamic$Internal$$anonfun$calc$2.apply(Core.scala:180)
  в scala.util.Try$.apply(Try.scala:192)
  в rx.Rx$Dynamic$Internal$.calc(Core.scala:180)
  в rx.Rx$Dynamic$Internal$.update(Core.scala:184)
  в rx.Rx$.doRecalc(Core.scala:130)
  в rx.Var.update(Core.scala:280)
  в $anonfun$1.apply(<console>:15)
  в $anonfun$1.apply(<console>:15)
  в rx.Rx$Dynamic$Internal$$anonfun$calc$2.apply(Core.scala:180)
  в scala.util.Try$.apply(Try.scala:192)
...

Контекст данных — это механизм, используемый Rx для принятия решения о необходимости пересчета. Контекст владения решает проблему утечки. Объединение двух контекстов может привести к бесконечной рекурсии: когда объект является одновременно владельцем и зависимостью данных того же родительского Rx.


```scala
def f(...)(implicit ctx: Ctx.Owner) = Rx { ... }

Контекст Data требуется реже, но полезен в случае, когда желательно избежать повторений (DRY) некоторых часто используемых блоков кода с помощью Rx. Такая функция будет иметь следующий вид:

def f(...)(implicit data: Ctx.Data) = ...

Это позволит вынести общую зависимость данных из тела каждого Rx в общую функцию.

Разделение взаимоисключающих концепций «владения» и «зависимостей данных» значительно ограничивает проблему бесконечной рекурсии, как это было указано выше. Явные зависимости данных делают более очевидным использование Var или Rx как зависимости данных, а не просто простого чтения текущего значения (например, .now). Без этой разницы легче случайно создать непредвиденные зависимости данных.

Дополнительные операции ------------------------Помимо базовых строительных блоков Var/Rx/Obs, Scala.Rx также предоставляет набор композиционных операторов, позволяющих легко преобразовывать ваши Rx; это позволяет программисту избегать постоянного переписывания логики для распространённых способов построения графа потока данных. Пять основных композиционных операторов: map(), flatMap, filter(), reduce() и fold() моделируются по аналогии со сборочной библиотекой Scala и предоставляют удобный способ преобразования значений, выходящих из Rx.### Карта

val a = Var(10)
val b = Rx{ a() + 2 }
val c = a.map(_*2)
val d = b.map(_+3)
println(c.now) // 20
println(d.now) // 15
a() = 1
println(c.now) // 2
println(d.now) // 6

Функция map выполняет то, что от неё можно ожидать — создаёт новый Rx с преобразованным значением старого Rx. Например, a.map(_*2) эквивалентна Rx{ a() * 2 }, но немного удобнее для записи.

FlatMap

val a = Var(10)
val b = Var(1)
val c = a.flatMap(a => Rx { a*b() })
println(c.now) // 10
b() = 2
println(c.now) // 20

Функция flatMap аналогична flatMap из стандартной библиотеки Scala, позволяющей объединять вложенные Rx типа Rx[Rx[_]] в единственный Rx[_].

Этот метод вместе с оператором map позволяет использовать синтаксис for-comprehension Scala для работы с Rx и Var:

val a = Var(10)
val b = for {
  aa <- a
  bb <- Rx { a() + 5 }
  cc <- Var(1).map(_ * 2)
} yield {
  aa + bb + cc
}

Фильтрация

val a = Var(10)
val b = a.filter(_ > 5)
a() = 1
println(b.now) // 10
a() = 6
println(b.now) // 6
a() = 2
println(b.now) // 6
a() = 19
println(b.now) // 19

Оператор filter игнорирует изменения значения Rx, если они не удовлетворяют предикату.

Обратите внимание, что ни один из методов filter не может отфильтровать первое, начальное значение Rx, так как нет более раннего значения, на которое можно было бы опираться. Поэтому этот пример:

val a = Var(2)
val b = a.filter(_ > 5)
println(b.now)

выведет "2".

Суммирование

val a = Var(1)
val b = a.reduce(_ * _)
a() = 2
println(b.now) // 2
a() = 3
println(b.now) // 6
a() = 4
println(b.now) // 24
```Оператор `reduce` объединяет последовательные значения [Rx][1], начиная с начального значения. Каждое изменение исходного [Rx][1] объединяется с ранее сохранённым значением и становится новым значением суммирующегося [Rx][1].### Агрегация

```scala
val a = Var(1)
val b = a.fold(List.empty[Int])((acc, elem) => elem :: acc)
a() = 2
println(b.now) // List(2, 1)
a() = 3
println(b.now) // List(3, 2, 1)
a() = 4
println(b.now) // List(4, 3, 2, 1)

Агрегация позволяет накапливать данные аналогично оператору reduce, но может накапливать в типе отличном от типа исходного Rx.

Каждый из этих пяти комбинаторов имеет эквивалент в пространстве имён .all, который работает с Try[T] вместо T, когда требуется гибкость для обработки ошибок специальным образом.

Асинхронные комбинаторы

Это комбинаторы, которые выполняют больше, чем просто преобразование одного значения в другое. Эти комбинаторы имеют асинхронные эффекты и могут спонтанно модифицировать граф данных и запускать циклы распространения без внешних триггеров. Хотя это может звучать несколько тревожно, функциональность, предоставляемая этими комбинаторами, часто необходима, и вручную записывать логику вокруг чего-то вроде дебонсинга, например, гораздо более ошибочна, чем использование предоставленных комбинаторов.

Обратите внимание, что ни один из этих комбинаторов не делает ничего такого, что нельзя было бы сделать с помощью комбинации Obs и Var; они просто упаковывают общие шаблоны, экономят вам время на повторное написание одних и тех же вещей, и снижают вероятность возникновения багов.### Будущее```scala import scala.concurrent.Promise import scala.concurrent.ExecutionContext.Implicits.global import rx.async._

val p = PromiseInt val a = p.future.toRx(10) println(a.now) //10 p.success(5) println(a.now) //5


Комбинатор `toRx` применим только к объектам типа `Future[_]`. Он принимает начальное значение, которое будет значением Rx до тех пор, пока Future не завершится, после чего значение станет значением Future.

Этот `async` может создавать `Future` любое количество раз. В этом примере показано, как он создаёт два различных `Future`:

```scala
import scala.concurrent.Promise
import scala.concurrent.ExecutionContext.Implicits.global
import rx.async._

var p = Promise[Int]()
val a = Var(1)

val b: Rx[Int] = Rx {
  val f = p.future.toRx(10)
  f() + a()
}
println(b.now) //11
p.success(5)
println(b.now) //6

p = Promise[Int]()
a() = 2
println(b.now) //12

p.success(7)
println(b.now) //9

Значение b() обновляется так, как можно было бы ожидать, когда последовательность Future завершается (в данном случае, с использованием Promise).

Это полезно, если ваш граф зависимостей содержит асинхронные элементы. Например, вы можете иметь Rx, который зависит от другого Rx, но требует асинхронного сетевого запроса для вычисления своего окончательного значения. С помощью async результаты асинхронного сетевого запроса будут автоматически возвращены в граф данных при завершении Future, запуская новый цикл распространения и удобно обновляя остальную часть графа, которая зависит от нового результата.


```scala
import rx.async._
import rx.async.Platform._
import scala.concurrent.duration._

val t = Timer(100 millis)
var count = 0
val o = t.trigger {
    count = count + 1
}

println(count) // 3
println(count) // 8
println(count) // 13

Таймер — это Rx, который генерирует события на регулярной основе. В приведённом выше примере использование println в консоли показывает, что значение count увеличивается со временем. Запланированная задача автоматически отменяется, когда объект Timer становится недоступным, поэтому его можно удалить сборщиком мусора. Это означает, что вам не нужно беспокоиться о управлении жизненным циклом объекта Timer. С другой стороны, это означает, что программист должен гарантировать, что ссылка на объект Timer содержится тем же объектом, который также хранит любое Rx, слушающее его. Это обеспечит то, что момент, в который объект Timer будет удален сборщиком мусора, не будет иметь значения, так как к тому времени сам объект (и любое Rx, которое он мог бы повлиять), будут недоступны.

Задержка

import rx.async._
import rx.async.Platform._
import scala.concurrent.duration._

val a = Var(10)
val b = a.delay(250 millis)

a() = 5
println(b.now) // 10
eventually{
  println(b.now) // 5
}

a() = 4
println(b.now) // 5
eventually{
  println(b.now) // 4
}

Комбинатор delay(t) создаёт отложенную версию Rx, значение которой отстаёт от оригинального значения на заданный временной промежуток t. Когда Rx изменяет своё состояние, отложенная версия этого Rx не будет меняться до тех пор, пока не истечёт время задержки t.Этот пример демонстрирует применение задержки к объекту типа Var, но этот подход можно использовать и с другими типами Rx.

Отброс

import rx.async._
import rx.async.Platform._
import scala.concurrent.duration._

val a = Var(10)
val b = a.debounce(200 milliseconds)
a() = 5
println(b.now) // 5

a() = 2
println(b.now) // 5

eventually {
  println(b.now) // 2
}

a() = 1
println(b.now) // 2

eventually {
  println(b.now) // 1
}

Комбинатор debounce(t) создаёт версию Rx, которая не будет обновляться более одного раза за каждый временной период t.

Если несколько обновлений происходят в короткий промежуток времени (менее чем через t), первое обновление произойдет сразу, а второе — только после того, как пройдет время t. Например, это может использоваться для ограничения частоты вычисления дорогостоящего результата: возможно допустить, чтобы рассчитанный результат был немного устаревшим, если это позволяет избежать повторного выполнения дорогостоящих вычислений каждые несколько секунд.

Разработка ==========Простота использования

Для обеспечения простоты использования синтаксис для записи программ должен был быть максимально лёгким. Программы, написанные с использованием FRP, должны были "выглядеть" такими же, как их обычные, старомодные, императивные аналоги. Это требование потребовало использования DynamicVariable вместо implicits для автоматической передачи аргументов, что позволило жертвовать правильной лексической областью видимости ради удобства синтаксиса.Использование чисто монадического стиля (например, reactive-web) было исключено, поскольку хотя бы реализация библиотеки стала бы намного проще таким образом, её использование стало бы значительно сложнее. Также не хотелось бы иметь дело с необходимостью явного объявления зависимостей, так как это нарушило бы принцип DRY при объявлении зависимостей дважды: один раз в заголовке Rx, и ещё раз при использовании его в теле программы. Цель заключалась в возможности писать код, добавлять несколько Rx и иметь автоматическое отслеживание зависимостей и распространение изменений, которое просто работает. В целом, считаю, что это было довольно успешным!

Простота понимания

Это означает многое, но прежде всего — отсутствие глобальных переменных. Это значительно упрощает многие вещи для пользователя библиотеки, так как теперь нет необходимости рассматривать взаимодействие различных частей программы через библиотеку. Использование Scala.Rx в разных частях большого проекта совершенно допустимо; они полностью независимы друг от друга.

Ещё одно дизайнерское решение заключается в том, чтобы параллелизм и планирование распространения были оставлены главным образом на неявной ExecutionContext, с тем, чтобы по умолчанию распространение волн происходило на том же потоке, который сделал обновление графа данных.- Первое означает, что любой, кто привык писать параллельные программы на Scala/Akka, уже знаком с тем, как работать с параллелизацией в Scala.Rx.

  • Второе делает гораздо проще понять, когда происходит распространение, хотя бы в случае по умолчанию: это происходит немедленно, и к тому времени, как функция Var.update() завершится, распространение будет завершено.Общими словами, ограничение области побочных эффектов и удаление глобального состояния делает Scala.Rx простым для понимания, а разработчику можно сосредоточиться на использовании Scala.Rx для создания графов данных. Это библиотека: она обычная библиотека на Scala. В ней нет преобразования исходного кода в другой исходный код, нет необходимости использовать специальный средний уровень выполнения для использования Scala.Rx. Вы скачиваете исходный код в свой проект на Scala и начинаете его использовать. Она дает возможность использовать любую конструкцию программирования или функциональность библиотек внутри ваших Rx: Scala.Rx сам выявляет зависимости без того чтобы программисту пришлось беспокоиться об этом, что позволяет избежать ограничений неконкретной части языка. Она также дает возможность использовать Scala.Rx в более крупном проекте практически безболезненно. Вы можете легко включать графики потока данных в большее объектно-ориентированное пространство и взаимодействовать с ними через установку Var и прослушивание Obs.

3: https://github.com/ReactiveX/RxJava/wiki/Subject#var-subjectМногие из рассмотренных статей демонстрируют красивый новый мир FRP, где можно было бы программировать, если бы вы перенесли весь свой код на FRP-Haskell и ограничились небольшим набором конструкторов для создания графиков данных. С другой стороны, благодаря тому, что Scala Rx позволяет внедрять фрагменты FRP в любом месте внутри существующего кода, использовать идеи FRP в уже существующих проектах без полной приверженности, а также обеспечивает легкую совместимость между вашими FRP и неконтролируемыми кодами, он стремится принести преимущества FRP в грязный, запутанный мир, в котором мы программируем сегодня.Ограничения

Scala.Rx имеет ряд значительных ограничений, некоторые из которых возникают из-за компромиссов в дизайне, другие — из-за ограничений базовой платформы.

Отсутствие «пустых» реактивных объектов

API реактивных объектов (Rxs) в Scala.Rx пытается следовать API коллекций как можно ближе: вы можете применять методы map, filter и reduce к Rxs так же, как это делаете с коллекциями. Однако в настоящее время невозможно иметь Rx, который был бы пустым, как это возможно для коллекций: фильтрация всех значений в Rx всё ещё оставляет хотя бы начальное значение (даже если оно не проходит проверку) и асинхронные Rx должны получать начальное значение для старта.

Это ограничение связано с трудностями объединения возможных пустых Rx с хорошим пользовательским опытом. Например, если у меня есть график данных потока:

val a = Var()
val b = Var()
val c = Rx {
    .... a() ...
    ... некоторое вычисление ...
    ... b() ...
    результат
}

Где a и b изначально пусты, у меня фактически есть две основные возможности:- Блокировать текущий поток, который вычисляет c, ожидая, пока a и затем b станут доступными.

  • Вызывать исключение при вызове a() и b(), прекращая вычисление c, но регистрируя его для повторного запуска, когда a() или b() станут доступными.
  • Переписать этот код в монадическом стиле с использованием for-comprehensions.
  • Использовать плагин ограниченных продолжений для автоматического преобразования вышеуказанного кода в монадический код.Первая возможность является проблемой производительности: потоки обычно являются очень тяжёлыми на большинстве операционных систем. Вы не можете разумно создать более нескольких тысяч потоков, что является ничтожно малым количеством по сравнению с количеством объектов, которое вы можете создать. Поэтому, хотя блокировка была бы самым простым решением, она осуждается во многих системах (например, в Akka, на основе которого создан Scala.Rx) и не кажется подходящим решением. Второй вариант представляет собой проблему производительности по-другому: с n различными зависимостями, все они могут начинаться пустыми, вычисление c может потребовать запуска и прерывания вычислений до n раз даже до первого завершения. Хотя это не блокирует потоки, кажется чрезмерно затратным.

Третий вариант неприемлем с точки зрения пользовательского опыта: он требует глобальных изменений в базе кода и стиле программирования для получения выгод от распространения изменений, что мне не хочется требовать.

Последний вариант является проблематичным из-за багов плагина ограниченных продолжений. Хотя теоретически он должен решать все проблемы, множество мелких ошибок (вмешивающихся в вывод типов, влияющих на неявное решение) вместе с несколькими фундаментальными проблемами делали его болезненным для использования даже в малых проектах (менее 1000 строк реактивного кода).### Автоматическое параллелирование НЕ происходит по умолчанию

Как уже упоминалось ранее, Scala.Rx способна автоматически параллелировать обновления, происходящие в графе данных: достаточно предоставить подходящее ExecutionContext, и независимые Rxs будут иметь свои обновления распределены по нескольким ядрам процессора.

Однако это работает только для обновлений, а не при первоначальной установке графа данных: в этом случае каждый Rx вычисляет своё тело один раз для получения значения по умолчанию, и всё происходит последовательно на одном потоке. Это ограничение возникает из-за того, что мы не имеем хорошего способа работы с "пустыми" Rxs, и мы не знаем зависимостей Rxs до первого вычисления.

Поэтому мы не можем начать все наши Rxs параллельно, так как некоторые могут завершиться раньше других, на которых они зависят, что сделает их пустыми, пока их начальное значение ещё вычисляется. Мы также не можем выбрать параллелизацию тех, которые не зависят друг от друга, так как перед выполнением мы не знаем эти зависимости!

Таким образом, нам не остаётся выбора кроме как выполнять первоначальные определения Rxs последовательно. Если необходимо, программист может вручную создать независимые Rxs параллельно используя Futures.### Шумность и избыточные вычисления

В контексте FRP (функционального реактивного программирования), глитч — это временное несоответствие в графе данных. Поскольку обновления не происходят мгновенно, а требуют времени для вычислений, значения внутри системы FRP могут временно выходить из синхронизации во время процесса обновления. Кроме того, в зависимости от природы системы FRP, возможно, что узлы будут обновлены более одного раза в ходе распространения.

Это может быть проблемой или нет, в зависимости от того, насколько терпимо приложение к периодически устаревающим неконсистентным данным. В однопоточной системе эту проблему можно избежать несколькими способами:

  • Сделайте граф данных статичным и выполните топологическую сортировку для ранжирования узлов в порядке их обновления. Это значит, что узел всегда будет обновлен после своих зависимостей, следовательно, они никогда не получат устаревшие данные.
  • Приостановите обновление узла, когда он пытается обратиться к зависимости, которая ещё не была обновлена. Это можно сделать путём блокировки потока, например, и возобновления только после обновления зависимости.Однако оба эти подхода имеют проблемы. Первый подход крайне ограничивающий: статический граф данных означает запрет на полезное поведение, например, создание и удаление разделов графа динамически во время выполнения. Это противоречит цели Scala.Rx, которая позволяет программисту писать код «обычным» образом без ограничений и позволяет системе FRP самостоятельно разбираться.Второй случай является проблемой для языков, которые не легко позволяют приостановить вычисления. В Java и, следовательно, в Scala используются потоки операционной системы (ОС), которые очень затратны. Поэтому блокировка потока ОС нежелательна. Для этого также могли бы использоваться корутины и продолжения, но Scala не имеет ни тех, ни других средств.Последняя проблема заключается в том, что оба этих модели имеют смысл только в случае однопоточного последовательного кода. Как указано в разделе о конкурентном программировании и параллелизме, Scala.Rx позволяет использовать несколько потоков для параллелизации распространения и позволяет начинать распространение несколькими потоками одновременно. Это означает, что строгое запрещение глитчей невозможно. Scala.Rx использует somewhat более свободную модель: тело каждого Rx может быть вычислено более одного раза при распространении, и Scala.Rx гарантирует лишь "наилучшие усилия" для сокращения количества избыточных обновлений. При условии, что тело каждого Rx является чистым, это означает, что избыточные обновления должны влиять только на время и вычисления, необходимые для завершения распространения, но не должны влиять на значение каждого узла после завершения распространения. Кроме того, Scala.Rx предоставляет Obs, которые являются специальными конечными узлами, гарантированными обновляться только один раз при распространении, с целью создания побочных эффектов. Это означает, что хотя распространение может вызывать временные расхождения значений Rx в графе данных, окончательные побочные эффекты распространения произойдут только после завершения всего процесса распространения, когда все Obs активируют свои побочные эффекты.Если несколько распространений происходят параллельно, Scala.Rx гарантирует, что каждый Obs будет активирован не более одного раза при каждом распространении, и не менее одного раза в целом. Кроме того, каждый Obs будет активирован как минимум один раз после того, как весь граф данных стабилизируется и распространения завершены. Это означает, что если вы полагаетесь на Obs для отправки обновлений через сеть на удалённый клиент, вы можете быть уверены, что у вас нет лишнего шума, передаваемого через сеть, а когда система находится в спокойном состоянии, удалённый клиент получит обновления, представляющие собой самую актуальную версию графа данных.

Связанные работы

Scala.Rx был создан не в вакууме и заимствует идеи и вдохновение из различных существующих проектов.

Scala.React

Scala.React, как описано в Отказ от использования паттерна наблюдателя, содержит реактивную часть распространения изменений (там называемых Signal), которая похожа на то, что делает Scala.Rx. Однако это делает гораздо больше: оно содержит реализации для использования потоков событий и нескольких DSL с использованием ограниченных продолжений для удобства написания асинхронных рабочих процессов.Я использовал эту библиотеку, и мой опыт показывает, что она крайне сложна для установки и начала использования. Она требует значительной глобальной настройки, с глобальным движком, выполняющим планирование и распределение, даже запускающим свои собственные пулы потоков. Это сделало очень трудным понимание взаимодействий между частями программы: будут ли полностью отдельные графы данных влиять друг на друга через этот глобальный движок? Будет ли производительность многопоточных программ снижаться по мере увеличения количества потоков, поскольку движок становится затором? Я никогда не находил ответы на многие из этих вопросов и не смог связаться с автором. Глобальный механизм распределения также делает начало сложным. Ушло несколько дней, чтобы сделать базовый граф потока данных (похожий на пример в верхней части этого документа) работоспособным. Это после множества борьбы, чтения соответствующих статей десятки раз и модификаций исходного кода способами, которые мне были непонятны. Без лишних слов, эти основы не внушали мне уверенности для дальнейшего развития.реактивный-веб

Reactive Web также послужил источником вдохновения. Этот проект представляет собой относительно независимую реализацию по сравнению с Scala.Rx, акцентируя внимание на потоках событий и интеграцию с Lift-web-фреймворком, тогда как Scala.Rx сфокусирован на временно меняющихся значениях.

Однако Reactive Web предлагает свои временно меняющиеся значения (называемые Signal'ами), которые манипулируются с помощью комбинаторов, аналогичных тем, что используются в Scala.Rx (map, filter, flatMap, и т.д.). Тем не менее, Reactive Web не предоставляет простого способа композиции этих Signal'ов: программисту приходится полагаться исключительно на map и flatMap, возможно используя Scala's for-comprehensions.

Мне не понравилось то, что вам приходилось программировать в монадическом стиле (то есть жить внутри .map() и .flatMap() и for{} comprehensions постоянно), чтобы воспользоваться преимуществами механизма распространения изменений. Это особенно затрудняет работу в случае [вложенных [Rx]], где Scala.Rx позволяет:

// a, b и c являются Rx
x = Rx { a() + b().c() }

становиться

x = for {
  va <- a
  vb <- b
  vc <- vb.c
} yield (va + vc)

Как можно видеть, использование for-comprehensions, как в Reactive Web, приводит к тому, что код становится значительно длиннее и намного более запутанным.Knockout.js

Knockout.js делает что-то похожее для JavaScript, а также предлагает некоторые дополнительные возможности, такие как связывание с DOM. В действительности, дизайн, реализация и опыт разработчика автоматического отслеживания зависимостей практически одинаковы. Это:

this.firstName = ko.observable('Bob');
this.lastName = ko.observable('Smith');
fullName = ko.computed(function() {
    return this.firstName() + " " + this.lastName();
}, this);

семантически эквивалентно следующему коду Scala.Rx:

val firstName = Var("Bob")
val lastName = Var("Smith")
fullName = Rx{ firstName() + " " + lastName() }

ko.observable напрямую отображается на Var, а ko.computed — на Rx. Исключая более длинные названия переменных и увеличенную длину кода в JavaScript, семантика почти одинакова.

Кроме предоставления функциональности, эквивалентной Var и Rx, Knockout.js сосредоточен на других направлениях. Он не предоставляет большинство полезных комбинаторов, доступных в Scala.Rx, но предлагает множество других возможностей, таких как интеграция с браузерным DOM, чего нет в Scala.Rx.

Другое

Понятие распространения изменений, то есть изменения значений со временем, которые уведомляют все зависящие от них значения, относится к области функционального реактивного программирования (FRP). Это хорошо исследованная область, которая имеет много научных достижений. Scala.Rx основан на этой работе и использует идеи из следующих проектов:

Все эти проекты полны хороших идей. Однако обычно они являются очень чистыми исследованиями: чтобы получить преимущества от FRP, вам потребуется писать весь ваш код на сложном языке, и мало шансов взаимодействовать с существующими не-FRP кодами.В одном неизвестном фреймворке, таком как FRP, создание производственного программного обеспечения само по себе представляет серьезный риск. Кроме того, написание производственного программного обеспечения на неизвестном языке увеличивает еще одну переменную, а создание его в неизвестном фреймворке на неизвестном языке без возможности взаимодействия со старым кодом — это просто безрассудство. Поэтому не удивительно, что эти библиотеки не используются широко. Scala.Rx стремится решить эти проблемы, предоставляя преимущества FRP в знакомом языке и обеспечивая бесшовное взаимодействие FRP с другими традиционными командными или объектно-ориентированными кодами.

Версия истории


0.3.2


  • Обновлен до Scala 2.12.0.

0.3.1


  • Устранена утечка наблюдателя (они тоже требуют контекста).

  • Исправлен тип flatMap.

0.3.0


  • Введены контексты Owner и Data. Это совершенно новый подход к управлению зависимостями и жизненным циклом, который позволяет безопасно строить динамические графы выполнения.

  • Больше стандартных комбинаторов: fold и flatMap теперь реализованы по умолчанию.

Кредиты

======= Авторское право © 2013 года Ли Хаоюй (haoyi.sg@gmail.com)

Это соглашение позволяет любому лицу, которому была предоставлена копия этого программного обеспечения и связанных с ним документов ("Программное обеспечение"), свободно использовать Программное обеспечение без каких-либо ограничений, включая без ограничений права использовать, копировать, изменять, объединять, публиковать, распространять, передавать, лицензировать и продавать копии Программного обеспечения, а также позволять другим лицам, которым данное Программное обеспечение было передано, делать то же самое, при условии соблюдения следующих условий:

Вышеуказанное авторское право и это разрешение должны быть включены во все копии или значительные части Программного обеспечения.ПРЕДМЕТНОЕ ПРОГРАММНОЕ ОБЕСПЕЧЕНИЕ ПРЕДОСТАВЛЯЕТСЯ «КАК ЕСТЬ», БЕЗ КАКИХ-ЛИБО ГАРАНТИЙ, ВЫРАЖЕННЫХ ИЛИ ПОДРАЗУМЕВАННЫХ, ВКЛЮЧАЯ, НО НЕ ОГРАНИЧИВАЯСЬ, ГАРАНТИЯМИ ТОРГОВЛЕНИЯ, ПРИГОДНОСТИ ДЛЯ КОНКРЕТНЫХ ЦЕЛЕЙ И НЕНАРУШЕНИЯ ПРАВ. В НИКАКОМ СЛУЧАЕ АВТОРЫ И ДОЛЖНИКИ АВТОРСКОГО ПРАВА НЕ ДОЛЖНЫ БЫТЬ УЧАСТНИКАМИ ЛЮБОГО ТРЕБОВАНИЯ, УЩЕРБА ИЛИ ДРУГИХ ОТВЕТСТВЕННОСТЕЙ, ВОЗНИКШИХ ИЗ, В СВЯЗИ С ИЛИ В СВЯЗИ С ПРОГРАММНЫМ ОБЕСПЕЧЕНИЕМ ИЛИ ИСПОЛЬЗОВАНИЕМ ИЛИ ДРУГИМИ ДЕЙСТВИЯМИ С ПРОГРАММНЫМ ОБЕСПЕЧЕНИЕМ.

Опубликовать ( 0 )

Вы можете оставить комментарий после Вход в систему

1
https://api.gitlife.ru/oschina-mirror/scalalibs-scala.rx.git
git@api.gitlife.ru:oschina-mirror/scalalibs-scala.rx.git
oschina-mirror
scalalibs-scala.rx
scalalibs-scala.rx
master