Composing functions and operators: zip, combineLatest and withLatestFrom
In this article, we will look at operators that combine several observable sequences in different ways. I assume that you are familiar with RxJava and Kotlin Flow. We will review the zip, combineLatest and withLatestFrom operators in RxJava and see how to achieve the same with Flow.
zip
zip combines the emissions of multiple Observables via a specified function and emits a single item for each combination, based on the result of that function.

As can be seen in the diagram, we pass two observable sequences to zip and receive a combined value only after both observables have produced a value.
Observable.zip(publish1, publish2) { text, l ->
    "$text and l = $l"
}.subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe {
        Log.e("--->", it)
    }
Here both observables run in parallel. zip waits until every source has emitted a value at the same index and only then emits the combined result for that index, so the output is never longer than the shortest source.
Flow has an operator with the same name, zip, and the same behavior:
flowOne.zip(flowTwo) { firstString, secondString ->
    "$firstString $secondString"
}.collect {
    Log.d(TAG, it)
}
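To make the index-based pairing concrete, here is a minimal, self-contained sketch. The flows `numbers` and `letters` and the function `zipDemo` are names made up for this illustration, and it assumes the kotlinx-coroutines-core library is on the classpath. Because zip pairs items by position, the third number has no partner and is dropped.

```kotlin
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.flow.zip
import kotlinx.coroutines.runBlocking

// Hypothetical helper for this illustration: zips two small flows
// and gathers everything the zipped flow emits into a list.
fun zipDemo(): List<String> = runBlocking {
    val numbers = flowOf(1, 2, 3)   // three items
    val letters = flowOf("a", "b")  // only two items
    numbers.zip(letters) { n, s -> "$n $s" }.toList()
}

fun main() {
    println(zipDemo()) // [1 a, 2 b]
}
```

The zipped flow completes as soon as the shorter flow runs out, which is why `3` never shows up.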
combineLatest
combineLatest combines multiple observables into one whose values are computed from the latest value of each input. Once every input has emitted at least one value, it emits whenever any of them emits, regardless of index.

As we can see, any time there’s a new event from any of the sources, we get the latest values from all sources. It’s up to us to define how the values should be combined. For example, we could wrap the values in a Pair or a custom object.
Observable.combineLatest(publish1, publish2) { text, l ->
    "$text and l = $l"
}.subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe {
        Log.e("--->", it)
    }
combineLatest simply combines multiple sources and emits any time there’s a new value from any of them.
Flow has the combine operator for this:
flowOne.combine(flowTwo) { firstString, secondString ->
    "$firstString $secondString"
}.collect {
    Log.d(TAG, it)
}
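As a rough illustration of combining into a custom object, the sketch below wraps the latest values in a small data class. `Snapshot`, `combineDemo` and the delay values are assumptions made up for this example, and it assumes kotlinx-coroutines-core is available. The delays only stagger the emissions so that the two flows take turns.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical holder for the latest value of each flow.
data class Snapshot(val number: Int, val letter: String)

// Hypothetical helper: combines two staggered flows and collects every snapshot.
fun combineDemo(): List<Snapshot> = runBlocking {
    val numbers = flow {
        emit(1); delay(100)
        emit(2); delay(100)
        emit(3)
    }
    val letters = flow {
        delay(50); emit("a")
        delay(100); emit("b")
    }
    // Nothing is emitted until both flows have produced a first value.
    numbers.combine(letters) { n, s -> Snapshot(n, s) }.toList()
}

fun main() {
    combineDemo().forEach(::println)
}
```

The exact interleaving depends on timing, so no full output is listed here. What does hold regardless of timing is that the last snapshot pairs the last value of each flow, `Snapshot(number=3, letter=b)`.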
withLatestFrom
withLatestFrom combines the source observable with other observables to create a result observable. Its values are computed from the latest value of each, but only when the source emits, so the source emission is the trigger.

Whenever the source Observable emits a value, it computes a formula using that value plus the latest values from other input Observables, then emits the output of that formula.
publish1.withLatestFrom(publish2) { text, l ->
    "$text and l = $l"
}
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe {
        Log.e("--->", it)
    }
withLatestFrom simply fetches the latest value from the other observable whenever the source emits a value.
Flow currently does not have an operator for that, but we can achieve the same behavior with the workaround below:
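The trigger behavior is easiest to see with subjects that are pushed by hand. The sketch below assumes RxJava 3 (the `io.reactivex.rxjava3` packages); the article does not say which RxJava version it targets, and `source`, `other` and `withLatestFromDemo` are names invented for this illustration.

```kotlin
import io.reactivex.rxjava3.functions.BiFunction
import io.reactivex.rxjava3.subjects.PublishSubject

// Hypothetical helper: pushes values by hand and records what comes out.
fun withLatestFromDemo(): List<String> {
    val source = PublishSubject.create<String>()
    val other = PublishSubject.create<Long>()
    val results = mutableListOf<String>()

    source
        .withLatestFrom(other, BiFunction<String, Long, String> { text, l -> "$text and l = $l" })
        .subscribe { results.add(it) }

    source.onNext("A") // dropped: other has not emitted anything yet
    other.onNext(1L)   // no output: other is not the trigger
    source.onNext("B") // emits "B and l = 1"
    other.onNext(2L)   // no output
    other.onNext(3L)   // no output
    source.onNext("C") // emits "C and l = 3"
    return results
}

fun main() {
    println(withLatestFromDemo()) // [B and l = 1, C and l = 3]
}
```

Swapping `withLatestFrom` for `combineLatest` in the same sequence would also produce output for the emissions of `other`, which is the practical difference between the two operators.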
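import java.util.concurrent.atomic.AtomicReference
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.cancel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.launch

fun <A, B : Any, R> Flow<A>.withLatestFrom(
    other: Flow<B>,
    transform: suspend (A, B) -> R
): Flow<R> = flow {
    coroutineScope {
        // Holds the most recent value of `other`; null until it has emitted once.
        val latestB = AtomicReference<B?>()
        val outerScope = this
        // Collect `other` concurrently. coroutineScope waits for this child, so the
        // resulting flow completes only after `other` has completed too.
        launch {
            try {
                other.collect { latestB.set(it) }
            } catch (e: CancellationException) {
                outerScope.cancel(e) // cancel outer scope on cancellation exception, too
            }
        }
        // The source is the trigger; its values are skipped until `other` has a value.
        collect { a: A ->
            latestB.get()?.let { b -> emit(transform(a, b)) }
        }
    }
}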
To learn more, please check the linked thread. Now, using this workaround:
flowOne.withLatestFrom(flowTwo) { firstString, secondString ->
    "$firstString $secondString"
}.collect {
    Log.d(TAG, it)
}
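When the second stream happens to be a StateFlow, which always holds a current value, a helper may not be needed at all: mapping over the source and reading `value` gives similar behavior. This is only a sketch of an alternative, not the approach from the thread. `latestLetter`, `source` and `stateFlowDemo` are hypothetical names, and kotlinx-coroutines-core is assumed.

```kotlin
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper: the source flow is the trigger, the StateFlow supplies the latest value.
fun stateFlowDemo(): List<String> = runBlocking {
    val latestLetter = MutableStateFlow("a")
    val source = flow {
        emit(1)
        latestLetter.value = "b" // the other stream changes between two source emissions
        emit(2)
    }
    // Read the current state each time the source emits.
    source.map { n -> "$n ${latestLetter.value}" }.toList()
}

fun main() {
    println(stateFlowDemo()) // [1 a, 2 b]
}
```

One difference from the RxJava operator is that a StateFlow always has an initial value, so no source emission is ever skipped.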
That's it for today. I hope you learned something new. Happy learning!