Composing functions and operators: zip, combineLatest, and withLatestFrom

jigar mori
Oct 2, 2020

In this article, we will learn about functions and operators that combine several observable sequences in different ways. I assume that you are familiar with RxJava and Kotlin Flow. We will review the zip, combineLatest, and withLatestFrom operators in RxJava and see how to achieve the same behavior using Flow.

zip

zip combines the emissions of multiple Observables via a specified function and emits a single item for each combination, based on the result of that function.

RxMarble — Zip

As the diagram shows, we pass two observable sequences to the zip function and receive a combined value only after both observables have produced a value.

Observable.zip(publish1, publish2) { text, l ->
    "$text and l = $l"
}.subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe {
        Log.e("--->", it)
    }

Here both observables run concurrently, and zip waits until every source observable has emitted a value with the same index number. Only then does it emit the result of combining those values.

Flow has an operator with the same name, zip, and the same functionality:

flowOne.zip(flowTwo) { firstString, secondString ->
    "$firstString $secondString"
}.collect {
    Log.d(TAG, it)
}
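
To see the index-based pairing in isolation, here is a small self-contained sketch. The function name zipDemo and the sample values are made up for illustration, and it assumes the kotlinx.coroutines library is on the classpath. Because the second flow has only two items, the third item of the first flow is never paired.

```kotlin
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.flow.zip
import kotlinx.coroutines.runBlocking

// Hypothetical helper: zips two small cold flows and returns every emitted value.
fun zipDemo(): List<String> = runBlocking {
    val letters = flowOf("A", "B", "C")
    val numbers = flowOf(1, 2)
    // Items are paired by position: first with first, second with second.
    // The result completes as soon as the shorter flow completes.
    letters.zip(numbers) { letter, number -> "$letter$number" }.toList()
}

fun main() {
    println(zipDemo()) // [A1, B2]
}
```

Collecting into a list with toList() is only a convenience for the demo; in an app you would normally use collect as in the snippet above.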

combineLatest

combineLatest combines multiple observables into one observable whose values are calculated from the latest value of each input observable. Once every input has emitted at least one value, it emits whenever any of them emits, irrespective of index.

RxMarble — combineLatest

As we can see, any time there’s a new event from any of the sources, we get the latest values from all sources. It’s up to us to define how the values should be combined. For example, we could wrap the values in a Pair or a custom object.

Observable.combineLatest(publish1, publish2) { text, l ->
    "$text and l = $l"
}.subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe {
        Log.e("--->", it)
    }

combineLatest simply combines multiple sources and emits any time there’s a new value from any of them.

Flow provides the combine operator for this functionality:

flowOne.combine(flowTwo) { firstString, secondString ->
    "$firstString $secondString"
}.collect {
    Log.d(TAG, it)
}
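
The difference from zip is easiest to see when the two flows emit at different times. The sketch below is only an illustration: the name combineDemo, the sample values, and the delays are assumptions, and the output depends on the delays keeping the emissions clearly apart.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper: combines two flows that emit at different rates.
fun combineDemo(): List<String> = runBlocking {
    // Emits 1 and 2 at roughly 100 ms and 200 ms.
    val numbers = flowOf(1, 2).onEach { delay(100) }
    // Emits a, b and c at roughly 150 ms, 300 ms and 450 ms.
    val letters = flowOf("a", "b", "c").onEach { delay(150) }
    // Nothing is emitted until both flows have produced a value;
    // after that, every new value is combined with the latest of the other flow.
    numbers.combine(letters) { number, letter -> "$number$letter" }.toList()
}

fun main() {
    // With these delays the expected result is [1a, 2a, 2b, 2c].
    println(combineDemo())
}
```

Note that the value 1 on its own produces no output: combine waits until letters has emitted "a", and then pairs it with the latest number.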

withLatestFrom

withLatestFrom combines the source observable with other observables to create a result observable. Its values are calculated from the latest value of each input, but only when the source emits, so the source emission is the trigger.

RxMarble — withLatestFrom

Whenever the source Observable emits a value, it computes a formula using that value plus the latest values from other input Observables, then emits the output of that formula.

publish1.withLatestFrom(publish2) { text, l ->
    "$text and l = $l"
}
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe {
        Log.e("--->", it)
    }

withLatestFrom simply fetches the latest value from the other observable whenever the source emits a value.

Flow currently does not have an operator for this, but we can achieve the same functionality with the workaround below:

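import java.util.concurrent.atomic.AtomicReference
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.cancel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.launch

fun <A, B : Any, R> Flow<A>.withLatestFrom(other: Flow<B>, transform: suspend (A, B) -> R): Flow<R> = flow {
    coroutineScope {
        // Holds the most recent value of `other`; null until it emits for the first time.
        val latestB = AtomicReference<B?>()
        val outerScope = this
        // Collect `other` concurrently. Note: the scope waits for this child,
        // so the resulting flow completes only after `other` completes.
        launch {
            try {
                other.collect { latestB.set(it) }
            } catch (e: CancellationException) {
                outerScope.cancel(e) // cancel outer scope on cancellation exception, too
            }
        }
        // The source is the trigger: emit only if `other` has already produced a value.
        collect { a: A ->
            latestB.get()?.let { b -> emit(transform(a, b)) }
        }
    }
}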

To learn more, please check this thread here. Now, using this workaround:

flowOne.withLatestFrom(flowTwo) { firstString, secondString ->
    "$firstString $secondString"
}.collect {
    Log.d(TAG, it)
}
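
The workaround above keeps collecting the other flow until that flow itself completes. As a rough variation, the sketch below cancels the helper collector once the source is done, and adds delays so that the trigger behavior is visible. The names withLatestFromSketch and withLatestFromDemo, the sample values, and the delays are all assumptions for illustration; this is not an official Flow operator.

```kotlin
import java.util.concurrent.atomic.AtomicReference
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical variant of the workaround: the source flow is the only trigger.
fun <A, B : Any, R> Flow<A>.withLatestFromSketch(
    other: Flow<B>,
    transform: suspend (A, B) -> R
): Flow<R> = flow {
    coroutineScope {
        // Most recent value of `other`; null until it has emitted at least once.
        val latest = AtomicReference<B?>(null)
        val otherJob = launch { other.collect { latest.set(it) } }
        this@withLatestFromSketch.collect { a ->
            // Source values that arrive before `other` has emitted are dropped.
            latest.get()?.let { b -> emit(transform(a, b)) }
        }
        // The source is done, so stop listening to `other` and let the flow complete.
        otherJob.cancel()
    }
}

// Hypothetical demo: the source emits at ~100, ~200, ~300 ms; `other` at ~130 and ~260 ms.
fun withLatestFromDemo(): List<String> = runBlocking {
    val source = flowOf("A", "B", "C").onEach { delay(100) }
    val other = flowOf(1, 2).onEach { delay(130) }
    source.withLatestFromSketch(other) { letter, number -> "$letter$number" }.toList()
}

fun main() {
    // With these delays the expected result is [B1, C2]: "A" arrives before
    // `other` has emitted anything, so it is dropped.
    println(withLatestFromDemo())
}
```

Unlike combine, a new value from the other flow never produces an output by itself here; it is only picked up the next time the source emits.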

So, that's it for today. I hope you learned something new. Happy learning!
