Как десятая часть серии статей, эта статья предоставляет всестороннее понимание одного из ключевых дизайнов в Flutter — потока данных (Stream). Мы углубимся в принципы работы, чтобы помочь вам лучше понять все аспекты Stream.
Полный практический гид по Flutter
Специальные статьи о мире Flutter
Stream
является очень важной концепцией в Flutter. В Flutter управление состоянием часто использует Stream
, будь то rxdart
, Bloc
модель, flutter_redux
или fish_redux
. Однако важно отметить, что Stream
не уникален для Flutter; это встроенная возможность языка Dart.
Проще говоря, Stream
представляет собой поток событий или канал. Поток событий знаком многим: это способность управлять кодом с помощью событий, слушать события и реагировать на изменения этих событий.
В Flutter основные объекты, представленные в модели Stream
, включают StreamController
, Sink
, Stream
и StreamSubscription
.
Ниже приведен пример простого использования Stream
. Обычно нам требуется:
создать StreamController
,
получить StreamSink
для добавления новых значений,
получить Stream
для прослушивания событий,
управлять подписками через StreamSubscription
, и закрыть его при необходимости.```dart
class DataBloc {
final StreamController<List> _dataController = StreamController<List>();
final StreamSink<List> _dataSink = _dataController.sink; final Stream<List> _dataStream = _dataController.stream; StreamSubscription<List> _dataSubscription;
void init() { _dataSubscription = _dataStream.listen((List value) { // Обработка события });
_dataSink.add(['первый', 'второй', 'третий', 'ещё']); }
void close() { _dataSubscription?.cancel(); _dataController?.close(); } }
После того как слушатель установлен, метод внутри `listen` будет вызываться каждый раз при изменении события. Также можно использовать операторы для преобразования потока `Stream`.
Как показано ниже, это действительно напоминает стиль Rx:
```dart
_dataStream.where(test).map(convert).transform(streamTransformer).listen(onData);
В Flutter этот подход дополняется компонентом StreamBuilder
, который позволяет создать асинхронный контроллер состояния на основе событийного потока.
StreamBuilder<List<String>>(
stream: dataStream,
initialData: ["none"],
// здесь snapshot — это снимок данных
builder: (BuildContext context, AsyncSnapshot<List<String>> snapshot) {
// получаем данные и можем делать что угодно для обновления UI
var data = snapshot.data;
return Container();
});
Теперь возникает вопрос: как они реализованы внутренне? Какова их работающий принцип? Какие задачи выполняют каждые из них? Какие у них особенности? В следующих разделах мы будем подробно анализировать эту логику.
Из вышеописанного следует, что в Flutter используются четыре основных объекта для работы с Stream
. Но как они взаимодействуют друг с другом? Какую роль играет каждый из них?
Сначала рассмотрим диаграмму более продвинутого уровня, которая демонстрирует внутренний рабочий процесс Stream
.
В Flutter объекты Stream
, StreamController
, StreamSink
и StreamSubscription
являются абстрактными. Они предоставляют внешние интерфейсы, а большинство внутренних реализаций начинаются с _
, таких как _SyncStreamController
, _ControllerStream
и так далее. Общая идея заключается в том, что:
**Есть источник событий, называемый Stream
. Для удобства управления Stream
официальное API предоставляет StreamController
. Он также предлагает StreamSink
для входящих событий, доступ к которому осуществляется через свойство sink
; свойство stream
используется для прослушивания и изменения Stream
, а затем StreamSubscription
управляет подписками на события.**Поэтому мы можем сделать вывод, что:
StreamController
: как следует из названия класса, используется для управления всем процессом Stream
, предоставляя различные интерфейсы для создания различных потоков событий.StreamSink
: обычно используется в качестве входной точки для событий, предоставляя методы, такие как add
, addStream
и т.д.Stream
: сам источник событий, который обычно используется для прослушивания событий или преобразования событий, таких как listen
, where
.StreamSubscription
: объект после подписки на события, используемый для управления различными операциями, такими как cancel
, pause
. Внутри он также является ключевым компонентом для передачи событий.Возвращаемся к рабочему процессу Stream
. Из приведенной выше схемы мы знаем, что при вызове метода StreamSink.add
, чтобы добавить событие, это событие в конечном итоге вызывает метод onData
внутри метода listen
. Этот процесс выполняется через метод zone.runUnaryGuarded
. Мы рассмотрим более подробно, как работает этот метод позже, но сначала нам следует понять, откуда берется этот метод onData
.
Как видно из схемы:
listen
объекта Stream
, он передает обратный вызов onData
внутрь объекта StreamSubscription
. Затем этот обратный вызов регистрируется через метод zone.registerUnaryCallback
, который создает объект _onData
(не путайте его с первоначальным обратным вызовом onData
).StreamSink
добавляет событие, он вызывает метод _sendData
внутри объекта StreamSubscription
. Затем этот метод вызывает _zone.runUnaryGuarded(_onData, data)
для выполнения ранее созданного объекта _onData
, что активирует обратный вызов, переданный в метод listen
.Можно заметить, что весь этот процесс связан с объектом StreamSubscription
. Теперь мы уже знаем, как происходит работа от входа события до выхода события. Но вопрос остаётся: как этот процесс выполняется асинхронно? А что такое часто встречающееся слово "zone"?
Сначала нам нужно понять, как реализуется асинхронность в Stream
.
Для этого стоит обратиться к логике реализации асинхронности в Dart. Поскольку Dart является однопоточной программой, она использует механизм сообщений для своего выполнения, который состоит из двух очередей задач — одной внутренней (microtask
) и внешней (event
). При этом очередь microtask
имеет более высокий приоритет по сравнению с event
.По умолчанию все события в Dart, такие как click, swipe, input/output data, rendering, находятся в очереди event
. Очередь microtask
обычно заполняется внутренними операциями Dart. В то же время асинхронное выполнение в Stream
осуществляется через метод scheduleMicrotask
.> Поскольку очередь microtask
имеет более высокий приоритет, чем event
, большое количество задач в этой очереди может заблокировать внешние события, такие как касание экрана или отрисовку.
Ниже представлена схема работы асинхронной операции внутри Stream
:
А теперь вопрос: что такое Zone
? Откуда он берется? В предыдущей главе было сказано, что асинхронные операции вроде Future
в Dart нельзя отлавливать с помощью try/catch
в текущем коде. Однако в Dart можно указать объекту зону (Zone
), которая работает как изолированная среда выполнения, и внутри этой среды вы можете поймать, перехватить или изменить поведение некоторых частей кода, например, все незахваченные исключения. Какой по умолчанию Zone
используется в проекте? В Flutter'e запуск Zone
происходит в методе _runMainZoned
, как показано ниже:
/// Dart 中
@pragma('vm:entry-point')
// ignore: unused_element
void _runMainZoned(Function startMainIsolateFunction, Function userMainFunction) {
startMainIsolateFunction(() {
runZoned<Future<void>>(...);
}, null);
}
Что делает zone.runUnaryGuarded
? Отличие от асинхронной операции с использованием scheduleMicrotask
: выполняется данное действие с передачей одного параметра внутри данной зоны и пойманы синхронные ошибки. Аналогичные методы также существуют, такие как runUnary
, runBinaryGuarded
. Таким образом, мы знаем, что zone.runUnaryGuarded
выполняет уже зарегистрированную функцию _onData
в данной зоне и ловит исключения.#### 5. Асинхронность и синхронность
Ранее мы говорили о внутреннем выполнении потока Stream
. Какова разница между синхронными и асинхронными операциями? Как это реализуется?
Допустим, что у нас есть пример использования по умолчанию Stream
. Создание объекта StreamController
через его конструктор можно указать с помощью sync
для выбора синхронного или асинхронного режима; по умолчанию используется асинхронный режим. Независимо от того, является ли это синхронным или асинхронным, они все наследуются от _StreamController
объекта, различия заключаются в том, какой из _EventDispatch
миксинов используется:
_AsyncStreamControllerDispatch
_SyncStreamControllerDispatch
Основное отличие этих двух _EventDispatch
заключается в том, вызывается ли метод _add
прямым вызовом StreamSubscription
или же вызывается метод _addPending(new _DelayedData<T>(data))
.
На следующем рисунке показана логика асинхронного выполнения, которая была упомянута выше в контексте scheduleMicrotask
. После выполнения scheduleMicrotask
в _StreamImplEvents
, вызывается метод perform
у _DelayedData
, который затем через _sendData
вызывает обратный вызов данных у StreamSubscription
.
Пожалуйста, заметьте, что представленный текст представляет собой адаптацию исходного текста к структуре и терминологии, используемым в документации Flutter на русском языке.
#### 6. БROADCAST И НЕБROADCAST.
В Stream
есть два режима — broadcast и non-broadcast. В случае использования broadcast-режима реализация StreamController
выглядит следующим образом:
_SyncBroadcastStreamController
_AsyncBroadcastStreamController
Разница между broadcast и non-broadcast заключается в том, как они вызывают метод _createSubscription
. Различия заключаются в следующем:
В _StreamController
, если Stream
находится в начальном состоянии (_isInitialState
) и уже был подписан, то будет выброшено исключение "Stream has already been listened to.". Только если подписка ещё не была создана, создаётся новый объект StreamSubscription
.
В _BroadcastStreamController
проверка _isInitialState
отсутствует, вместо этого используется проверка isClosed
. В случае с broadcast _sendData
выполняется через forEach
:
_forEachListener((_BufferingStreamSubscription<T> subscription) {
subscription._add(data);
});
Stream
поддерживают преобразование данных. Для получения нужного результата можно применять несколько преобразований. Как это реализовано?
Как правило, классы, реализующие преобразования Stream
, наследуют _ForwardingStream
. Внутри _ForwardingStreamSubscription
происходит добавление _handleData
обратного вызова при помощи метода listen
предыдущего Stream
. Затем этот обратный вызов снова вызывает метод _handleData
нового Stream
.
По сути, все преобразования представляют собой вложенные вызовы метода listen
для Stream
.
Кроме того, Stream
могут быть преобразованы в Future
, такие как firstWhere
, elementAt
, reduce
и другие операторы. Эти методы обычно создают внутренний _Future
и затем используют его в обратном вызове listen
для возврата значения Future
.
Как показано ниже, в Flutter можно использовать StreamBuilder
для создания виджета, предоставляя ему экземпляр Stream
. Объект AsyncSnapshot
представляет собой снимок данных, где текущие данные и состояние хранятся в поле data
. Но как StreamBuilder
связывается с Stream
?
StreamBuilder<List<String>>(stream: dataStream, initialData: ["none"],
builder: (BuildContext context, AsyncSnapshot<List<String>> snapshot) {
var data = snapshot.data;
return Container();
});
Как показано на приведённой выше диаграмме, логика вызова StreamBuilder
сосредоточена в классе _StreamBuilderBaseState
. В методах initState
и didUpdateWidget
вызывается метод _subscribe
, который запускает операцию listen
для Stream
и обновляет UI через setState
, всё это происходит очень просто!
Мы часто используем
setState
, которое внутри себя вызывает функциюmarkNeedsBuild
. Эта функция ставит меткуdirty
наelement
, после чего в следующей кадреWidgetsBinding.drawFrame
производит перерисовку. Таким образом,setState
не действует мгновенно.
На самом деле, как подписка, так и преобразование потока в Dart уже имеют аналогичные эффекты RxJS. Однако, чтобы сделать использование этих функций более удобным для пользователей RxJS, ReactiveX разработал библиотеку rxdart
, которая предоставляет знакомый опыт, как показано на следующей диаграмме:
В rxdart
Observable
представляет собой Stream
, а Subject
наследуется от Observable
и также является Stream
. При этом Subject
реализует интерфейс StreamController
, поэтому он также выполняет роль контроллера.
Вот пример простого кода rxdart
, где скрыты требования к пониманию таких концепций, как StreamSubscription
и StreamSink
. Это делает его удобным для разработчиков, имеющих опыт работы с RxJS.
final subject = PublishSubject<String>();
subject.stream.listen(observerA);
subject.add("AAAA1");
subject.add("AAAA2");
subject.stream.listen(observerB);
subject.add("BBBB1");
subject.close();
Давайте кратко проанализируем этот код:
PublishSubject
фактически создаёт объект типа StreamController<T>.broadcast
;add
или addStream
в конечном итоге передаются методу StreamController.add
;onListen
, регистрируется слушатель на объекте StreamController
.rxdart
мы получаем Observable
, который представляет собой тот же самый объект, то есть сам PublishSubject
. Все последующие преобразования также основаны на объекте stream
, переданном при создании, например: @override
Observable<S> asyncMap<S>(FutureOr<S> convert(T value)) =>
Observable<S>(_stream.asyncMap(convert));
Это показывает, что rxdart
концептуально преобразует Stream
в более знакомые нам объекты и операторы, что объясняет возможность использования rxdart
непосредственно в StreamBuilder
.
Так вот, вы теперь полностью понимаете работу с Stream
в Flutter?
Наконец, завершается одиннадцатая глава! (///▽///)
Открытый проект на Flutter: https://github.com/CarGuo/GSYGithubAppFlutter
Много примеров учебного проекта на Flutter: https://github.com/CarGuo/GSYFlutterDemo
**Открытая электронная книга по Flutter: https://github.com/CarGuo/GSYFlutterBook**#### Рекомендация полного открытого проекта:
Вы можете оставить комментарий после Вход в систему
Неприемлемый контент может быть отображен здесь и не будет показан на странице. Вы можете проверить и изменить его с помощью соответствующей функции редактирования.
Если вы подтверждаете, что содержание не содержит непристойной лексики/перенаправления на рекламу/насилия/вульгарной порнографии/нарушений/пиратства/ложного/незначительного или незаконного контента, связанного с национальными законами и предписаниями, вы можете нажать «Отправить» для подачи апелляции, и мы обработаем ее как можно скорее.
Опубликовать ( 0 )