Flow kotlin что это
kotlinx.coroutines 1.4.0: представляем StateFlow и SharedFlow
В преддверии старта курса «Kotlin Backend Developer» приглашаем всех желающих записаться на открытый урок по теме «Kotlin multiplatform: front/back на одном языке».
А сейчас предлагаем к прочтению традиционный перевод статьи.
API-интерфейс Flow в Kotlin предназначен для асинхронной обработки потока данных, который выполняется последовательно. По сути, Flow — это последовательность. В Kotlin с помощью Flow можно выполнять такие же операции, как с помощью Sequences: преобразовывать, фильтровать, сопоставлять и т. д. Основное различие между Sequences и Flow в Kotlin заключается в том, что Flow позволяет приостанавливать выполнение.
Во Flow приостановку можно выполнить в любом месте: в функции сборки или в любом из операторов, предоставляемых API-интерфейсом Flow. Приостановка во Flow работает как контроль backpressure, при этом вам не нужно ничего делать — всю работу выполняет компилятор.
Интерфейс Flow так же прост в использовании, как и Sequences. Однако Flow несет в себе все преимущества реактивного программирования, в котором не требуется управлять backpressure.
Flow является удобным API-интерфейсом, однако он не обеспечивает возможность управлять состоянием, которая требуется в некоторых случаях. Например, у процесса может быть несколько промежуточных и одно конечное состояние. Примером такого процесса является загрузка файла: процесс загрузки длится некоторое время и мы можем определить такие промежуточные состояния процесса, как «Запущен» и «Выполняется», и конечные состояния «Успешно» и «Сбой». В этом случае нам интересны только результаты: успешно была выполнена загрузка или нет.
При реализации описанного выше сценария с помощью API-интерфейса Flow нам нужно публиковать изменения состояния для наблюдателей, которые могут совершать те или иные действия исходя из этих изменений. Ранее мы всегда рекомендовали использовать для этого ConflatedBroadcastChannel. Однако ConflatedBroadcastChannel является слишком сложным для этой задачи. Кроме того, имеются некоторые логические нестыковки, которые возникают при использовании каналов для управления состоянием. Например, канал может быть закрыт или отменен. Это не очень хорошо сочетается с управлением состоянием, поскольку состояние-то нельзя отменить!
Мы решили отказаться от ConflatedBroadcastChannel и вместо этого внедрить пару новых API-интерфейсов — StateFlow и SharedFlow!
StateFlow
StateFlow имеет две разновидности: StateFlow и MutableStateFlow:
Состояние представлено значением. Любое изменение значения будет отражено во всех коллекторах потока путем выдачи значения с изменениями состояния.
Давайте посмотрим, как можно реализовать описанный ранее пример с загрузкой файла с помощью нового API-интерфейса.
Как видите, никаких API для работы с каналами здесь не используется. Мы не запускаем никаких дополнительных корутин, и нет нужды изучать какие-либо новые концепции. Только простой императивный код, в котором для описания реализации используется переменная, а клиентам предоставляется state как Flow.
SharedFlow
Что, если вместо управления состоянием нам потребуется управлять рядом обновлений состояния, то есть потоком событий? Для таких случаев у нас есть новый API-интерфейс под названием SharedFlow. Этот API-интерфейс подходит для обработки ряда выдаваемых значений, например для вычисления скользящего среднего из потока данных.
Общий поток — это просто поток, где есть кэш повтора, который можно использовать в качестве атомарного моментального снимка. Каждый новый подписчик сначала получает значения из кэша повтора, а затем получает новые выданные значения.
Вместе с SharedFlow мы также предоставляем MutableSharedFlow.
С помощью MutableSharedFlow можно выдавать значения из приостанавливающего и неприостанавливающего контекста. Как можно заключить из имени, кэш повтора MutableSharedFlow можно сбрасывать. Кроме того, он предоставляет количество своих коллекторов как поток.
Реализовать собственный MutableSharedFlow может быть довольно сложно. Поэтому мы предоставляем несколько удобных методов для работы с SharedFlow.
Чтобы инициализировать экземпляр MutableSharedFlow с помощью приведенной выше функции, можно указать количество значений, которые повторяются для новых подписчиков, емкость буфера, а также действия, которые следует выполнять в случае заполнения буфера. Например, если буфер заполнится, можно приостановить поток.
Резюме
Новые API-интерфейсы StateFlow и SharedFlow обеспечивают более элегантный способ работы с состоянием в программах на Kotlin с корутинами. Они намного проще и понятнее, чем использование широковещательных каналов для публикации изменений состояния из контекста потока.
Попробуйте новые API, испытайте их на прочность и отправьте нам свой отзыв!
Подробные сведения о нововведениях в Kotlin Coroutines можно узнать, посмотрев выступление Всеволода Толстопятова на конференции Kotlin 1.4 Online Event.
Корутинная эволюция в Kotlin. Чем отличаются Channels, Broadcast channels, Shared flows, State flows
Эта публикация — перевод поста Романа Елизарова «Shared flows, broadcast channels». Опубликовано с одобрения автора оригинала. Примечания переводчика выделены курсивом.
Давным-давно в Kotlin были представлены корутины, одной из особенностей которых является легковесность (создание корутин дешевле, чем с запуск новых Threads). Мы можем запускать несколько корутин, и нам нужен способ взаимодействия между ними избегая “mutable shared state” (неконсистентности данных при записи и чтении из разных корутин).
Для этого был придуман Channel как примитив для связи между корутинами. Channels — отличное изобретение. Они поддерживают связь между корутинами «один к одному», «один ко многим», «многие к одному» и «многие ко многим», но каждое значение, отправляемое в Channel, принимается один раз (в одной из корутин с запущенной подпиской).
Вы не можете использовать Channel для распространения событий или обновлений состояния так, чтобы несколько подписчиков могли независимо получать и реагировать на них.
Для решения этой проблемы был дополнительно добавлен интерфейс BroadcastChannel, хранящий состояние, доступное каждому подписчику, и его реализацию — ConflatedBroadcastChannel. Некоторое время они хорошо выполняли свою задачу, но их развитие оказалось тупиковым. Начиная с версии kotlinx-coroutines 1.4, мы представили новое решение — shared flows. Это была предыстория, а теперь поехали!
Flows are simple
В ранних версиях библиотеки у нас были только Channels, и мы пытались реализовать различные преобразования последовательностей данных как функции, которые принимают один Channel в качестве аргумента и в результате возвращают другой Channel. Это означает, что, например, оператор filter будет работать в своей собственной корутине.
Производительность такого подхода была далека от идеала, особенно по сравнению с простым написанием оператора if. И это неудивительно, потому что Channel — это примитив синхронизации доступа к данным (в общем случае из разных потоков). Любой Channel, даже реализация, оптимизированная для одного producer и одного consumer, должен поддерживать консистентный доступ к данным из разных потоков, а значит между ними требуется синхронизация, которая в современных многоядерных системах обходится дорого. Когда вы начинаете строить архитектуру приложения на основе асинхронных потоков данных, почти сразу возникает необходимость в преобразованиях данных, приходящих от producer. Тяжеловесность решения с каждым преобразованием возрастает.
Первым решением это проблемы можно назвать Flow, который позволяет эффективно добавлять операторы преобразования. По умолчанию данные передаются, преобразуются и собираются в одной корутине без необходимости в синхронизации.
Синхронизация возникает только в том случае, когда producer и consumer работают в разных корутинах (при этом emit и filter из примера на картинке будут работать в одной корутине, что лучше ситуации, описанной двумя абзацами выше).
Flows are cold
Однако вычисления данных для Flow обычно холодные (cold) — Flow, созданный билдером flow <…>, является пассивной сущностью. Рассмотрим следующий код:
Сами Flow не начинают вычисляться и не хранят состояния пока на них не подпишется collector. Каждая корутина с collector-ом создает новый экземпляр кода, упаковывающего данные во Flow. Статья “Cold flow, hot channels” описывает причины, лежащие в основе такой работы Flows, и показывает примеры использования, для которых они подходят лучше, чем Channels.
Но что насчет таких событий, как действия пользователя, события из операционной системы от датчиков устройства или о изменении состояния? Они появляются независимо от того, есть ли сейчас какой-либо collector, который в них потенциально заинтересован. Они также должны поддерживать нескольких collectors внутри приложения. Это так называемые горячие источники данных…
Shared flows
Вот здесь-то и появляется концепция SharedFlow. Shared Flow существует независимо от того, есть-ли сейчас collectors или нет. Collector у SharedFlow называется подписчиком (observer). Все observers получают одинаковую последовательность значений. Он работает как BroadcastChannel, но эффективнее и делает концепцию BroadcastChannel устаревшей.
SharedFlow — это легковесная широковещательный event bus, который вы можете создать и использовать в своей архитектуре приложения.
Он имеет параметры для настройки, такие как количество старых событий, которые нужно сохранить и воспроизвести для новых подписчиков, и extraBufferCapacity, чтобы настроить поведение в случае быстрых emmiters и медленных observers.
Все observers SharedFlow асинхронно собирают данные в своем собственном coroutine context. Emmiter не ждет, пока подписчики закончат обработку данных. Однако, когда общий буфер SharedFlow заполнен, emmiter приостанавливается, пока в буфере не появится место. Альтернативные стратегии работы с переполненным буфером настраиваются параметром BufferOverlow.
State flows
Частый способ справиться с переполнением буфера — отбрасывать старые данные и сохранять только новые. В частности, при единичном размере буфера мы имеем дело со state variable. Это настолько распространенный вариант использования, что у него есть собственный специализированный тип — StateFlow. Он служит заменой ConflatedBroadcastChannel, который также устарел.
Смотрите на StateFlow как на изменяемую переменную, на изменения которой можно подписаться. Его последнее значение всегда доступно, и, фактически, последнее значение — единственное, что важно для observers.
Разница в производительности StateFlow с Channel и обычным Flow становится очевидной — StateFlow обновляет состояние без выделения памяти.
Что будет с Channels
По мере того, как разные виды Flow заменяют разные виды BroadcastChannel, возникает популярный вопрос: что произойдет с Channels? Они останутся в следующих версиях языка по многим причинам. Одна из причин заключается в том, что Channels представляют из себя низкоуровневые примитивы, которые используются для реализации многих сложных операторов, на которых базируется Flow.
Но у Channels также есть свои варианты использования. Channels могут быть использованы для обработки событий, которые должны быть обработаны ровно один раз. Это происходит в проекте с типом события, которое обычно имеет одного подписчика, но периодически (при запуске или во время некоторой реконфигурации) подписчиков вообще нет, и есть требование, чтобы все опубликованные события сохранялись до тех пор, пока не появился подписчик.
В SharedFlow события транслируются неизвестному количеству (⩾0) подписчиков. При отсутствии подписчика любое опубликованное событие немедленно удаляется. Это шаблон проектирования можно использовать для событий, которые должны обрабатываться немедленно или не обрабатываться вообще.
С помощью Channel каждое событие доставляется только одному подписчику. Попытка опубликовать событие без подписчиков будет приостановлена, как только буфер канала заполнится, ожидая появления подписчика. По умолчанию опубликованные события не удаляются.
Как безболезненно мигрировать с RxJava на Kotlin Coroutines+Flow
Для выполнения асинхронных операций в Android-приложениях, где нужна загрузка и обработка любых данных, долгое время использовали RxJava — и о том, как перейти на RxJava 3, мы уже писали в нашем блоге. Сейчас на смену фреймворку постепенно приходят инструменты Kotlin — Coroutines+Flow. Актуальность этой связки подтверждается тем, что Google сделал Kotlin приоритетным языком для Android-разработки.
Корутины позволяют тратить меньше системных ресурсов, чем RxJava. Кроме того, поскольку они являются частью Kotlin, Android предоставляет удобные инструменты для работы с ними — например, viewModelScope и lifecycleScope. В этой статье мы рассмотрим use cases, распространенные в Rx Java, и то, какие возможности вы получите при переходе на Flow.
Переключение потоков и создание
Для начала сравним, как происходит переключение потоков в RxJava и Flow.
RxJava
При этом сложение выполняется в IO шедулере, умножение — в computation шедулере, а подписка — в single.
Повторим этот же пример для Flow:
В результате можно отметить следующее:
1) observeOn переключает поток, в котором будут выполняться последующие операторы, а flowOn определяет диспетчер выполнения для предыдущих операторов.
2) Метод collect() будет выполняться в том же диспетчере, что и launch, а emit данных будет происходить в Dispatchers.IO. Метод subscribe() будет выполняться в Schedulers.single(), потому что идет после него.
3) Flow также имеет стандартные методы создания flow:
5) В RxJava нужно явно вызывать emitter.onComplete(). В Flow метод onCompletion() будет автоматически вызываться после окончания блока flow < >.
6) При попытке сделать эмит данных из другого диспетчера, с помощью withContext, например, приведет к ошибке.
Exception in thread «main» java.lang.IllegalStateException: Flow invariant is violated:
Flow was collected in [BlockingCoroutine
but emission happened in [DispatchedCoroutine
Please refer to ‘flow’ documentation or use ‘flowOn’ instead
Подписка и отписка на источник данных
В RxJava метод Observable.subscribe() возвращает объект Disposable. Он служит для отписки от источника данных, когда новые порции данных от текущего источника уже не нужны. Важно иметь доступ к этому объекту, чтобы вовремя отписываться и избегать утечек.
Для Flow ситуация схожа: так как метод collect() — suspend метод, он может быть запущен только внутри корутины.
Следовательно, отписка от flow происходит в момент отмены Job корутины:
В случае же использования viewModelScope об этом заботиться не нужно: все корутины, запущенные в рамках этого scope, будут отменены, когда ViewModel будет очищена, т.е. вызовется метод ViewModel.onCleared(). Для lifecycleScope ситуация аналогична: запущенные в его рамках корутины будут отменены, когда соответствующий Lifecycle будет уничтожен.
Обработка ошибок
В RxJava есть метод onError(), который будет вызван в случае возникновения какой-либо ошибки и на вход получит данные о ней. В Flow тоже есть такой метод, он называется catch(). Рассмотрим следующий пример.
RxJava
При возникновении ArithmeticException будет срабатывать onError(), и информация об ошибке будет напечатана в консоль.
Этот же пример, переписанный на flow, можно представить с помощью catch < >, который под капотом имеет вид привычной конструкции try/catch.
Операторы RxJava onErrorResumeNext и onErrorReturn можно представить в виде:
В Flow, как и в RxJava, есть операторы retry и retryWhen, позволяющие повторить операции в случае возникновения ошибки.
Операторы
Рассмотрим наиболее распространенные операторы RxJava и найдем их аналоги из Flow.
Подробнее с операторами Flow можно познакомиться здесь.
Некоторые операторы Flow (например, merge) помечены как экспериментальные или отсутствующие. Их api может измениться (как, например, для flatMapMerge), или их могут задепрекейтить, то есть они станут недоступны. Это важно помнить при работе с Flow. При этом отсутствие некоторых операторов компенсируется тем, что flow всегда можно собрать в список и работать уже с ним. В стандартной библиотеке Kotlin есть множество функций для работы со списками.
Также у Flow отсутствуют отдельные операторы троттлинга и другие операторы, которые работают с временными промежутками. Это можно объяснить «молодостью» библиотеки, а также тем, что, согласно словам разработчика Kotlin Романа Елизарова, команда Jetbrains не планирует «раздувать» библиотеку множеством операторов, оставляя разработчикам возможность компоновать нужные операторы самостоятельно, предоставляя им удобные «блоки» для сборки.
Backpressure
Backpressure – это ситуация, когда производитель данных выдает элементы подписчику быстрее, чем тот их может обработать. Готовые данные, в ожидании того, как подписчик сможет их обработать, складываются в буфер Observable. Проблема такого подхода в том, что буфер может переполниться, вызвав OutOfMemoryError.
В ситуациях, когда возможен backpressure, для Observable нужно применять различные механизмы для предотвращения ошибки MissingBackpressureException.
После появления в RxJava 2 Flowable произошло разделение на источники данных с поддержкой backpressure (Flowable) и Observable, которые теперь не поддерживают backpressure. При работе с RxJava требуется правильно выбрать тип источника данных для корректной работы с ним.
У Flow backpressure заложена в Kotlin suspending functions. Если сборщик flow не может принимать новые данные в настоящий момент, он приостанавливает источник. Возобновление происходит позднее, когда сборщик flow снова сможет получать данные. Таким образом, в Kotlin нет необходимости выбирать тип источника данных, в отличие от RxJava.
Hot streams
«Горячий» источник рассылает новые порции данных по мере их появления, вне зависимости от того, есть ли активные подписчики. Новые подписчики получат не всю сгенерированную последовательность данных с самого начала, а только те данные, что были сгенерированы после подписки. В этом отличие горячих и холодных источников: холодные не начинают генерацию данных, пока нет хотя бы одного подписчика, а новые подписчики получают всю последовательность.
Горячие источники данных полезны, например, при подписке на события от View: при этом нужно получать только новые события, нет смысла обрабатывать заново все пользовательские действия. Также мы не можем запретить пользователю нажимать на экран до тех пор, пока мы не будем готовы обрабатывать его действия. Для обработки событий от View в реактивном виде существует библиотека RxBinding, которая имеет поддержку RxJava3.
В Kotlin Flow есть свои возможности для работы с горячим flow, который производит данные вне зависимости от наличия подписчиков и выдает новые данные одновременно всем имеющимся подписчикам. Для этого можно использовать Channel, SharedFlow, чтобы отправлять новые порции данных одновременно всем подписанным сборщикам.
Кстати, для Flow тоже есть отличная библиотека для обработки событий от View – Corbind. В ней есть поддержка большинства Android-виджетов.
RxJava Subjects
Subject в RxJava – это специальный элемент, который одновременно является источником данных и подписчиком. Он может подписаться на один или несколько источников данных, получать от них порции данных и отдавать их своим подписчикам.
Аналог Subject в Flow – это Channel, в частности, BroadcastChannel. Существуют различные варианты их реализации: с буферизацией данных (ArrayBroadcastChannel), с хранением только последнего элемента (ConflatedBroadcastChannel). Но важно помнить, что, так как библиотека Kotlin Flow молода и постоянно развивается, ее части могут меняться. Так получилось и в случае с BroadcastChannel: в своей статье Роман Елизаров сообщил, что, начиная с версии 1.4 будет предложено лучшее решение – shared flows, а BroadcastChannel ждет deprecation в ближайшем будущем.
Заключение
В данной статье мы сравнили RxJava и Kotlin Flow, рассмотрели их схожие моменты и аналоги частей RxJava в Flow. При этом Flow хорошо подойдет в качестве инструмента для обработки событий в реактивном стиле в проектах на Kotlin, использующих паттерн MVVM: благодаря viewModelScope и lifecycleScope запускать корутины можно быстро и удобно, не боясь утечек. В связи с тем, что популярность Kotlin и его инструментов растет, а также этот язык является приоритетным для разработки Android-приложений, в ближайшие годы связка Coroutines+Flow может заменить RxJava – скорее всего, новые проекты будут написаны именно с помощью нее. На первый взгляд, миграция с RxJava на Flow не представляется болезненной, потому что в обоих случаях есть похожие операторы и разделение общей концепции Reactive streams. Кроме того, Kotlin имеет достаточно большое комьюнити, которое постоянно развивается и помогает разработчикам в изучении новых возможностей.
А вы готовы мигрировать на корутины? Приглашаем поделиться мнениями!
Android, Kotlin Flow во ViewModel — все сложно
Загрузка данных для UI в приложении Android может быть непростой задачей. Нам надо учитывать жизненный цикл компонентов Android и изменения конфигурации, потому что все это приводит к уничтожению и восстановлению Activity.
Отдельные экраны приложения постоянно переключаются между активным и неактивным состоянием, когда пользователь ходит вперед назад по экранам, переключается с одного приложения на другое, блокирует и разблокирует экран. Каждый компонент должен выполнять активную работу в нужном состоянии экрана.
Изменения конфигурации происходят в случаях:
при изменении ориентации экрана;
когда приложение переключается в мульти-оконный режим;
при переключении визуальной темы смартфона;
Повышаем эффективность
Для улучшения пользовательского опыта, эффективная загрузка данных во Fragment и Activity должна учитывать следующие правила:
Кеширование: актуальные загруженные данные, должны быть доставлены немедленно и не загружаться повторно. В частности, когда существующий Fragment или Activity становятся видимыми снова или Activity пересоздается после изменения конфигурации.
Избегать фоновую работу: когда Activity или Fragment скрываются (состояние изменяется со STARTED на STOPPED ), любая работа по загрузке внешних данных должна вставать на паузу или отменяться для экономии ресурсов. Это особенно важно для бесконeчных потоков данных, как например геолокация или периодическое обновление каких-либо данных.
Работа не прерывается при изменении конфигурации: это исключение из правила #2, Во время смены конфигурации, текущая Activity заменяется новым экземпляром с сохранением состояния, поэтому отменять текущую работу в старом экземпляре Activity и перезапускать работу при создании нового экземпляра Activiti было бы контр продуктивно.
Современный подход: ViewModel и LiveData
В 2017 Google зарелизила первый набор библиотек Architecture Components<:target="_blank">, там появились ViewModel и LiveData компоненты, которые помогают разработчикам эффективно работать с данными, поддерживая все 3 правила выше.
ViewModel<:target="_blank">, сохраняет данные при изменении конфигурации, используется для достижения правил #1 и #3: операции загрузки выполняются непрерывно во время изменения конфигурации, полученные данные могут кешироваться и совместно использоваться одним, или несколькими Fragment или Activity.
LiveData<:target="_blank">, простой контейнер данных, поддерживающий подписку на изменения и учитывающий жизненный цикл компонентов Android. Новые данные отправляются наблюдателям только когда их жизненный цикл в состоянии не менее STARTED (видимый), наблюдатели отписываются автоматически, что избавляет от утечек памяти. LiveData используется для достижения правил #1 и #2: кеширует последнее значение данных и это значение автоматически отправляется новым наблюдателям. В дополнение, LiveData уведомляет, когда в состоянии STARTED больше нет наблюдателей и можно избежать ненужной фоновой работы.
ViewModel scope.
Опытный разработчик, как правило уже знаком со всем этим. Но важно вспомнить все возможности, чтобы сравнить их с Flow.
LiveData + Coroutines
LiveData довольна ограничена по сравнению с реактивными решениями (например RxJava):
передает и берет данные только на главном потоке (main thread). Интересно, что оператор map выполняет трансформацию объектов на главном потоке и не может использоваться на I/O потоках или для тяжелых вычислений на CPU. Для этого используется оператор switchMap совместно с ручным запуском асинхронной операции в нужном потоке, даже если в основной поток надо отправить единственное значение.
Чтобы преодолеть эти ограничения, библиотеки Jetpack дают специальные «мосты» из LiveData для других технологий, таких как RxJava или Kotlin корутины.
Самый простой и наиболее элегантный из них, по мнению автора, это LiveData coroutine builder, подключается через androidx.lifecycle:lifecycle-livedata-ktx Gradle зависимость. Этот функционал похож на flow <> builder function из библиотеки Kotlin Coroutines и позволяет грамотно обернуть корутину в экземпляр LiveData:
Вы можете использовать все силу корутин, их контекстов для написания асинхронного кода в синхронной манере без колбеков, автоматически переключаясь между нужными потоками;
Новые значения отправляются наблюдателям LiveData в главном потоке через suspending методы emit() или emitSource() из корутины;
Корутина использует специальную область видимости (scope) и жизненный цикл привязанный к экземпляру LiveData. Когда LiveData становится неактивной (это значит, что больше нет наблюдателей в состоянии STARTED ), то корутина будет автоматически отменена, работает правило #2;
В реальности отмена корутины будет задержана на 5 секунд после того как LiveData станет неактивной для правильной обработки смены конфигурации: если новая Activity немедленно заменит старую и LiveData станет активной до срабатывания задержки, то отмена корутины не будет и цена перезапуска будет нулевой (правило #3);
если пользователь вернется назад на экран и LiveData станет активной, то корутина автоматически перезапустится, но только если она была отменена до завершения. Как только корутина завершится, она больше не будет перезапускаться, те же данные не будут загружаться дважды, если входные параметры не изменились (правило #1).
Вывод: используйте LiveData coroutines builder, это дает простой код и лучшее поведение по умолчанию.
А что если, репозиторий возвращает поток значений в форме Flow (вместо suspend функций с единственным значением)? В этом случае также возможно сконвертировать поток в LiveData и использовать все преимущества перечисленные выше, используя asLiveData() функцию-расширение.
Внутри asLiveData() также использует LiveData coroutines builder для создания простой корутины обрабатывающий Flow пока LiveData активна:
Но давайте остановимся ненадолго – что такое Flow и можно ли им полностью заменить LiveData?
Введение в Kotlin Flow
Flow это класс из библиотеки Kotlin Coroutines представленной в 2019 году, класс является потоком значений, вычисляемый асинхронно. Концептуально похож на RxJava Observable, но основан на корутинах и имеет более простой API.
Изначально были доступны только холодные потоки (cold flows): потоки без состояний, которые создаются по требованию каждый раз, когда наблюдатель начинает собирать значения в области видимости (scope) корутины. Каждый наблюдатель получает собственную последовательность значений, они не общие.
SharedFlow публикует данные, которые распространяются всем слушателям. Класс может управлять дополнительным кешем и/или буфером и фактически заменяет все варианты устаревшего BroadcastChannel API.
StateFlow и LiveData много общего:
Эти классы наблюдаемые (observable)
Они хранят и распространяют последнее значение любому количеству наблюдателей
Но есть и важные отличия:
StateFlow не учитывает жизненный цикл (not lifecycle-aware). Однако, Flow может быть использовано в корутине с учетом жизненного цикла, это требует некоторого кода для настройки без использования LiveData (детали ниже).
Наблюдение за LiveData против сбора данных в Flow
Организовать наблюдение за экземпляром LiveData довольно просто:
Эта операция однократная и дальше LiveData берет на себя синхронизацию потока данных с жизненным циклом наблюдателей.
Аналогичная операция для Flow называется сбором (collecting) и сбор должен выполняться в корутине. Из-за того, что Flow не знает ничего о жизненном цикле, ответственность за жизненный цикл возлагают на корутину, работающую с Flow.
Чтобы создать корутину для работы с Flow, учитывающую жизненный цикл Activity/Fragment (запускать работу с данными при состоянии STARTED и автоматически отменять эту работу при уничтожении):
Однако, есть и другие виды Flow:
горячие потоки, которые всегда активные и посылают результаты всем текущим наблюдателями (включая приостановленные);
холодные потоки с колбэком или поддержкой канала, которые подписываются на активный источник данных, когда сбор данных запускается и останавливает подписку, когда сбор данных отменяется (не приостанавливается).
В этих случаях, основной производитель Flow будет оставаться активным даже когда корутина будет приостановлена, сохраняя (в буфер) новые результаты в фоновом режиме. Ресурсы расходуются впустую, правило #2 нарушается.
Как видно, эффективно и безопасно работать с данными в Actvivity или Fragment проще с помощью LiveData.
Заменяем LiveData на StateFlow во ViewModel
Давайте-ка вернемся к ViewModel. Мы убедились, что это простой и эффективный способ работы с данными в асинхронном режиме:
Похоже, что мы сумели выполнить 3 наших правила и воспроизвести почти такое же поведение как у LiveData с использованием более сложного кода.
Это проблематично? Для простых случаев нет, Activity или Fragment могут сделать дополнительную проверку, чтобы не делать лишнее обновление UI, если данные не изменились.
Проблемы возникают в более сложных, реальных случаях, как мы увидим в следующем разделе.
Использование StateFlow как триггер во ViewModel
MutableLiveData для этого работает очень хорошо:
При обновлении, оператор switchMap() подключает наблюдателей к новому источнику LiveData, заменяя старый. И так как в примере выше, используется LiveData coroutine builder, старая LiveData автоматически отменит связанную с ним корутину через 5 секунд после отключения от своих наблюдателей. Работа с устаревшими данными прекращается с небольшой задержкой.
Этот код достаточно прост и соблюдает все правила по эффективности выше.
Наивный подход
API MutableLiveData и MutableLiveData выглядят очень похоже, код триггера выглядит почти одинаково. Самое большое различие это использование mapLatest, это эквивалент функции switchMap() в LiveData для возвращения единственного значения (для возвращения нескольких значений, надо использовать flatMapLatest).
Вроде выглядит неплохо. Однако здесь всплывает основная проблема: так как StateFlow не поддерживает версионность, триггер отправит повторно последнее значение, когда Flow перезапустится. Это случается каждый раз, когда Activity/Frgament становится видимым опять, после того, как был невидимым более 5 секунд.
Триггер выдает значение повторно, mapLatest() снова запускается, еще раз дергается метод в репозитории с теми же аргументами, хотя результат уже был получен и обработан. Правило #1 не работает: актуальные данные не должны загружаться повторно.
Чиним повторную отправку последнего сообщения
Вопросы приходящие на ум: должны ли мы предотвращать повторную отправку и как это сделать? StateFlow уже позаботился об этом внутри коллекции flow и оператор distinctUntilChanged() делает то же самое для других типов flow. При этом нет стандартного оператора для отмены повторной отправки среди множества коллекций одного и того же flow, потому что flow коллекции должны быть самодостаточные. Это главная разница с LiveData.
К сожалению, логика в коде выше несовершенна и перестанет работать, как задумано, когда трансформация flow будет отменена до завершения.
TL;DR Использование StateFlow в качестве триггера в ViewModel приводит к дублирующейся работе каждый раз когда Activity/Fragment становится видимой повторно и здесь нет простого пути избежать такого поведения.
Выводы
Мои рекомендации на основании примеров выше:
Продолжайте использовать LiveData в вашем Android-UI слое и ViewModels, особенно в качестве триггера. Используйте это везде, для передачи данных в Activity/Fragment: код будет простым и эффективным;
LiveData coroutine builder ваш друг и может заменить Flows во ViewModels в большинстве случаев;
Вы можете использовать мощь Flow операторов при необходимости, конвертируя Flows в LiveData;
Теперь вы знаете все компромиcсы при переходе от LiveData к подходу «полностью на Flow» в вашем Android-UI слое.