Coroutines are great. Period. When they were introduced along with Kotlin 1.3 we immediately knew that they were going to be a game changer. And they are indeed. Easy to learn. Easy to understand. And so easy to use. When coroutines 1.0.0
were released, many anticipated that it would be the end for RxJava on Android.
It turned out that despite coroutines being great, RxJava was still being picked for many new projects. Why? Probably one of the reasons was that it was well known, sure. But there was also a more important reason: coroutines still couldn’t be used in some use cases where RxJava could.
RxJava is not only Single
In the majority of the apps RxJava was used mostly for API requests handling. We all know that – make a REST API call with Retrofit, return RxJava Single, apply proper schedulers, and then map the response. In this scenario, coroutines are a great fit. We just need to change our endpoint methods to suspending ones, and we’re more or less ready to go. But of course, RxJava is much more than that, right? The real strength of reactive programming is processing of ongoing data streams.
Example 1: we have a few-step data upload process in the app (done in a background thread), and the update at each step should cause real-time update of the UI. Seems like a great case for good old Observable
and its onNext()
. This scenario could not be handled with plain coroutines, as suspend
functions can return only a single value. So we lacked a proper tool. It all changed with the introduction of coroutines 1.2.0
, when we were given Kotlin Flow. A mighty Kotlin Flow that handled (almost) all cases that previously could be handled only with RxJava, and what’s the most important – it was handling them using coroutines.
If you have any experience with both RxJava and coroutines, switching to Flows should be really seamless, as their syntax and logic is using both aforementioned concepts. Let’s see the example code for Example1.
Using RxJava:
fun uploadData(): Observable<UpdateStep> = Observable.create<UpdateStep> {
it.onNext(UpdateStep.Step1)
doUpdateStep1()
it.onNext(UpdateStep.Step2)
doUpdateStep2()
it.onNext(UpdateStep.Step3)
doUpdateStep3()
it.onNext(UpdateStep.Success)
}.onErrorReturn {
UpdateStep.Error(it)
}
And using Flow:
fun uploadData(): Flow<UpdateStep> = flow {
emit(UpdateStep.Step1)
doUpdateStep1()
emit(UpdateStep.Step2)
doUpdateStep2()
emit(UpdateStep.Step3)
doUpdateStep3()
emit(UpdateStep.Success)
}.catch {
emit(UpdateStep.Error(it))
}
Now we just need to subscribe()
our Observable
or collect()
our Flow
, and we’ve got all we need. Easy peasy. This way coroutines made another step to replace RxJava. However, that wasn’t the last step.
You’re hot then you’re cold
For anyone working with RxJava a bit more extensively, the concept of cold/hot streams is well known. Streams created by Observable.create()
or Flowable.just()
are cold, which means that they start executing the code and emitting values only when something subscribes to them. This is the case that can be also handled using flow{}
or flowOf()
- flows created with these methods are called cold flows (no surprise here).
The last big RxJava use case that coroutines were still missing was handling hot flows. In reactive programming hot streams are the ones that don’t have to be observed by anything to emit values. This makes them a perfect candidate eg. for handling BLE connection state - app knows about connection state changes at all times, and informs observers if there are any, or as soon as they arrive (Example 2). Hot streams in RxJava can be created using eg. PublishSubject
and BehaviorSubject
. Here is how we can use them to implement Example2:
class BleManager : BleConnectionObserver {
private val _bleConnectionStateSubject = BehaviorSubject.createDefault(ConnectionState.Disconnected)
val bleConnectionStateObservable: Observable<ConnectionState> = _bleConnectionStateSubject.hide()
val currentConnectionState: ConnectionState
get() = _bleConnectionStateSubject.value
override fun onDeviceConnected(device: BluetoothDevice) {
/* ... */
_bleConnectionStateSubject.onNext(ConnectionState.Connected)
}
override fun onDeviceDisconnected() {
/* ... */
_bleConnectionStateSubject.onNext(ConnectionState.Disconnected)
}
}
For a long time, this part of functionality was missing in stable coroutines API, and that changed with the release of coroutines 1.4.0
in October 2020. But let’s take a step back. Even before Kotlin Flow was released, we heard of a concept called channels
. It was designed to solve the Producer-Consumer problem - of course using coroutines. Channel
interface, which is offering suspendable send()
and receive()
methods, seemed to be quite straightforward but in reality is a bit more complex.
It is using independent coroutines on both sides of the communication that introduces concurrency which has to be handled by us, and may lead to some issues with concurrent access to the same resources. It also wasn’t helpful that channels API was still not fully stable – there were (and still are) many experimental/preview methods there. Lucky for us, the Kotlin team came up with a concept of SharedFlow and StateFlow that doesn’t mess with multiple coroutines and makes our data flow much cleaner. If you want to read more about channels-flow relation, take a look at this great article by Roman Elizarov.
Now to the best part. SharedFlow
is an equivalent of RxJava’s PublishSubject
. It allows us to create hot flows and specify strategies for handling backpressure and replay. StateFlow
is a special case of SharedFlow
which is an equivalent of RxJava’s BehaviorSubject
. It creates a hot flow that persists its latest value – which makes it a perfect candidate for handling BLE connection state from Example2.
class BleManager : BleConnectionObserver {
private val _bleConnectionStateFlow = MutableStateFlow(ConnectionState.Disconnected)
val bleConnectionStateFlow: StateFlow<ConnectionState> = _bleConnectionStateFlow
override fun onDeviceConnected(device: BluetoothDevice) {
/* ... */
_bleConnectionStateFlow.value = ConnectionState.Connected
}
override fun onDeviceDisconnected() {
/* ... */
_bleConnectionStateFlow.value = ConnectionState.Disconnected
}
}
It’s worth noticing that both Shared and State flows can be used in mutable form and be exposed as immutables – this concept is well known for anyone using LiveData
. In RxJava we had to hide BehaviorSubject behind plain Observable
and expose current state separately. With flows, the immutable form of StateFlow
still allows us to get its latest value.
To use SharedFlow
and StateFlow
correctly we need to remember a few things:
▸ StateFlow
is conflated, which means that if we update its value with a new value that is equal to the previous one, the update will not be propagated.
▸ SharedFlow
needs to have a proper replay/buffer configuration. In most cases, we just need to make sure that our producer will not get suspended if there’s no consumer, eg. by setting replay or buffer capacity to a positive value or setting bufferOverflow
to something else than SUSPEND.
▸ StateFlow
and SharedFlow
will never complete, so you cannot depend on their onCompletionCallback
, and need to remember that collecting them will never finish normally (neither will the coroutine in which we called their collect
).
Ok, cool, but do we even need that?
For me, SharedFlow
and StateFlow
were real game changers, and only when they were introduced I decided that I can fully quit using RxJava. What is more, recently StateFlow
also received alpha support for working with data binding. And what is great is that using StateFlow
in data binding is lifecycle aware – just like LiveData
! No more updating UI when it’s not ready ❤️ However, you need to remember that collecting SharedFlow
in the Fragment using lifecycleScope.launch{}
is not lifecycle aware – you need to use launchWhenStarted
or cancel the job when the app goes to background. This topic has been well discussed in this article.
Some of you may say: “Why do we even need SharedFlow? Most of the described use cases can be handled with LiveData!”. And then I’ll ask: “Have you ever heard of clean architecture?" 😉 Sure, when you need to observe changes from ViewModel in the Fragment, LiveData
is absolutely enough, but sometimes you need to observe events in different modules, or in general – somewhere behind the abstraction layer. And LiveData
unfortunately won’t work in this case because it is an Android class, and our domain module (or in general - base abstraction layer) should be pure Kotlin.
This is where SharedFlow
purely shines, because it is a Kotlin class. We can expose it from our database or API without tight coupling our domain code to Android classes. Sounds like a really nice approach 😀 What is more, it seems that SharedFlow
can be used as an alternative solution for the SingleLiveEvent problem that needs a special approach when using LiveData
.
I’ve been using SharedFlow and StateFlow in my latest project for about 7 months now, and they’re really fun to use!
Have you already tried SharedFlow or StateFlow? What are your thoughts about them? I know that I finally can stop using RxJava and just love the Flow 😀
About the Author
Aleksandra Krzemień has profound experience in building all sorts of mobile apps, including those connected with external devices. She is a natural mentor and loves to share her knowledge. Throughout her career, she has been actively engaged in many mentoring programs and IT-related organizations, such as Women in Technology or Google Developers Group. She likes to spend her free time singing and playing board games.