Chapters

Hide chapters

Reactive Programming with Kotlin

Second Edition · Android 10 · Kotlin 1.3 · Android Studio 4.0

Before You Begin

Section 0: 3 chapters
Show chapters Hide chapters

Section II: Operators & Best Practices

Section 2: 7 chapters
Show chapters Hide chapters

9. Combining Operators
Written by Alex Sullivan & Florent Pillet

Heads up... You're reading this book for free, with parts of this chapter shown beyond this point as scrambled text.

In earlier chapters, you learned how to create, filter and transform Observable sequences. RxJava filtering and transformation operators behave much like Kotlin’s standard collection operators. You got a glimpse into the true power of RxJava with flatMap, the workhorse operator that lets you perform a lot of tasks with very little code.

This chapter will show you several different ways to assemble sequences, and how to combine the data within each sequence. Some operators you’ll work with are very similar to Kotlin collection functions. They help combine elements from asynchronous sequences, just as you do with Kotlin lists.

Getting started

This chapter uses IntelliJ to demonstrate some of the concepts. It also uses the exampleOf method you’ve become so familiar with. Open the starter project and run the Main.kt file. It’s empty, so you won’t see any output other than a “process finished” message in the run tab.

RxJava is all about working with and mastering asynchronous sequences. But you’ll often need to make order out of chaos! There is a lot you can accomplish by combining Observables.

Prefixing and concatenating

One of the more obvious needs when working with Observables is to guarantee that an observer receives an initial value. There are situations where you’ll need the “current state” first. Good use cases for this are “current location” and “network connectivity status.” These are some Observables you’ll want to prefix with the current state.

Using startWith

The diagram below should make it clear what this operator does:

exampleOf("startWith") {

  val subscriptions = CompositeDisposable()
  // 1
  val missingNumbers = Observable.just(3, 4, 5)
  // 2
  val completeSet =
    missingNumbers.startWithIterable(listOf(1, 2))

  completeSet
    .subscribe { number ->
      println(number)
    }
    .addTo(subscriptions)
}
--- Example of: startWith ---
1
2
3
4
5

Using concat

As it turns out, the startWith operators are a simple variant of the more general concat family of operators. Your initial value is a stream of one or more elements, to which RxJava appends the sequence that startWith chains to. The Observable.concat static function chains two sequences.

exampleOf("concat") {

  val subscriptions = CompositeDisposable()
  // 1
  val first = Observable.just(1, 2, 3)
  val second = Observable.just(4, 5, 6)
  // 2
  Observable.concat(first, second)
    .subscribe { number ->
    println(number)
  }
  .addTo(subscriptions)
}

Using concatWith

Another way to append sequences together is the concatWith operator (an instance method of Observable, not a class method). Add this code to the function:

exampleOf("concatWith") {
  val subscriptions = CompositeDisposable()

  val germanCities =
    Observable.just("Berlin", "Münich", "Frankfurt")
  val spanishCities =
    Observable.just("Madrid", "Barcelona", "Valencia")

  germanCities
    .concatWith(spanishCities)
    .subscribe { number ->
      println(number)
    }
    .addTo(subscriptions)
}

Using concatMap

A final operator of interest is concatMap, closely related to flatMap which you learned about in Chapter 7, “Transforming Operators.” The lambda you pass to flatMap returns an Observable sequence which is subscribed to, and the emitted Observables are all merged. concatMap guarantees that each sequence produced by the lambda will run to completion before the next is subscribed to. concatMap is therefore a handy way to guarantee sequential order.

exampleOf("concatMap") {
  val subscriptions = CompositeDisposable()
  // 1
  val countries = Observable.just("Germany", "Spain")
  // 2
  val observable = countries
    .concatMap {
    when (it) {
      "Germany" ->
        Observable.just("Berlin", "Münich", "Frankfurt")
      "Spain" ->
        Observable.just("Madrid", "Barcelona", "Valencia")
      else -> Observable.empty<String>()
      }
    }
  // 3
  observable
    .subscribe { city ->
      println(city)
    }
    .addTo(subscriptions)
}
--- Example of: concatMap ---
Berlin
Münich
Frankfurt
Madrid
Barcelona
Valencia

Merging

RxJava offers several ways to combine sequences. The easiest to start with is merge.

Using merge

Can you picture what merge does from the diagram below?

exampleOf("merge") {
  val subscriptions = CompositeDisposable()

  val left = PublishSubject.create<Int>()
  val right = PublishSubject.create<Int>()
}
Observable.merge(left, right)
  .subscribe {
    println(it)
  }
  .addTo(subscriptions)
left.onNext(0)
left.onNext(1)
right.onNext(3)
left.onNext(4)
right.onNext(5)
right.onNext(6)
--- Example of: merge ---
0
1
3
4
5
6

Using mergeWith

Just like for concat and concatWith, there’s also a mergeWith method you can use instead of the statically resolved Observable.merge method. Add the following example:

exampleOf("mergeWith") {

  val subscriptions = CompositeDisposable()

  val germanCities = PublishSubject.create<String>()
  val spanishCities = PublishSubject.create<String>()

  germanCities.mergeWith(spanishCities)
    .subscribe {
      println(it)
    }
    .addTo(subscriptions)
}
germanCities.onNext("Frankfurt")
germanCities.onNext("Berlin")
spanishCities.onNext("Madrid")
germanCities.onNext("Münich")
spanishCities.onNext("Barcelona")
spanishCities.onNext("Valencia")
--- Example of: mergeWith ---
Frankfurt
Berlin
Madrid
Münich
Barcelona
Valencia

Combining elements

Using combineLatest

An essential operator in RxJava is the combineLatest operator. It combines values from several sequences:

exampleOf("combineLatest") {

  val subscriptions = CompositeDisposable()

  val left = PublishSubject.create<String>()
  val right = PublishSubject.create<String>()
}
Observables
  .combineLatest(left, right) { leftString, rightString ->
    "$leftString $rightString"
}.subscribe {
  println(it)
}.addTo(subscriptions)
left.onNext("Hello")
right.onNext("World")
left.onNext("It’s nice to")
right.onNext("be here!")
left.onNext("Actually, it’s super great to")
--- Example of: combineLatest ---
Hello World
It’s nice to World
It’s nice to be here!
Actually, it’s super great to be here!
val observable = Observables
  .combineLatest(left, right) {
    leftString: String, rightString: String ->
   
    leftString to rightString
  }
  .filter { !it.first.isEmpty() }
Observable.combineLatest<String, String, String>(left, right,
      BiFunction { leftString, rightString ->
  "$leftString $rightString"
})

Using zip

Another combination operator is the zip family of operators. Like the combineLatest family, it comes in several variants:

exampleOf("zip") {

  val subscriptions = CompositeDisposable()

  val left = PublishSubject.create<String>()
  val right = PublishSubject.create<String>()
}
Observables.zip(left, right) { weather, city ->
  "It’s $weather in $city"
}.subscribe {
  println(it)
}.addTo(subscriptions)
left.onNext("sunny")
right.onNext("Lisbon")
left.onNext("cloudy")
right.onNext("Copenhagen")
left.onNext("cloudy")
right.onNext("London")
left.onNext("sunny")
right.onNext("Madrid")
right.onNext("Vienna")
--- Example of: zip ---
It’s sunny in Lisbon
It’s cloudy in Copenhagen
It’s cloudy in London
It’s sunny in Madrid

Triggers

Apps have diverse needs and must manage multiple input sources. You’ll often need to accept input from several Observables at once. Some will simply trigger actions in your code, while others will provide data. RxJava has you covered with powerful operators that will make your life easier. Well, your coding life at least!

Using withLatestFrom

You’ll first look at withLatestFrom. Often overlooked by beginners, it’s a useful companion tool when dealing with user interfaces, among other things.

exampleOf("withLatestFrom") {
  val subscriptions = CompositeDisposable()

  // 1
  val button = PublishSubject.create<Unit>()
  val editText = PublishSubject.create<String>()

  // 2
  button.withLatestFrom(editText) { _: Unit, value: String ->
    value
  }.subscribe {
    println(it)
  }.addTo(subscriptions)

  // 3
  editText.onNext("Par")
  editText.onNext("Pari")
  editText.onNext("Paris")
  button.onNext(Unit)
  button.onNext(Unit)
}
--- Example of: withLatestFrom ---
Paris
Paris

Using sample

A close relative to withLatestFrom is the sample operator.

exampleOf("sample") {
  val subscriptions = CompositeDisposable()

  val button = PublishSubject.create<Unit>()
  val editText = PublishSubject.create<String>()

  editText.sample(button)
    .subscribe {
      println(it)
    }.addTo(subscriptions)

  editText.onNext("Par")
  editText.onNext("Pari")
  editText.onNext("Paris")
  button.onNext(Unit)
  button.onNext(Unit)
}

Switches

Using amb

RxJava comes with one main so-called “switching” operator: amb. It allows you to produce an Observable sequence by switching between the events of the combined source sequences. This allows you to decide which sequence’s events the subscriber will receive at runtime.

exampleOf("amb") {

  val subscriptions = CompositeDisposable()

  val left = PublishSubject.create<String>()
  val right = PublishSubject.create<String>()

  // 1
  left.ambWith(right)
    .subscribe {
      println(it)
    }
    .addTo(subscriptions)

  // 2
  left.onNext("Lisbon")
  right.onNext("Copenhagen")
  left.onNext("London")
  left.onNext("Madrid")
  right.onNext("Vienna")
}

Combining elements within a sequence

All cooks know that the more you reduce, the tastier your sauce will be. Although not aimed at chefs, RxJava has the tools to reduce your sauce to its most flavorful components!

Using reduce

Through your coding adventures in Kotlin, you may already know about its reduce collection operator. If you don’t, here’s a great opportunity to learn about it, as this knowledge applies to pure Kotlin collections as well.

exampleOf("reduce") {

  val subscriptions = CompositeDisposable()

  val source = Observable.just(1, 3, 5, 7, 9)
  source
    .reduce(0) { a, b -> a + b }
    .subscribeBy(onSuccess = {
      println(it)
    })
    .addTo(subscriptions)
}
--- Example of: reduce ---
25

Using scan

A close relative to reduce is the scan operator. Can you spot the difference in the diagram below, comparing to the last one above?

exampleOf("scan") {

  val subscriptions = CompositeDisposable()

  val source = Observable.just(1, 3, 5, 7, 9)

  source
    .scan(0) { a, b -> a + b }
    .subscribe {
      println(it)
    }
    .addTo(subscriptions)
}
--- Example of: scan ---
1
4
9
16
25

Challenge: The zip case

You learned a great deal about many combining operators in this chapter. But there is so much more to learn (and more fun to be had) about sequence combination!

Key points

  • You can prepend or append Observable sequences to one another using operators like startWith, concatWith, and concatMap.
  • The merge family of operators lets you merge sequences together so that items are received in the order that they are emitted.
  • The combineLatest operator lets you combine heterogeneous observables into a type that gets emitted each time one of the inner observables emits.
  • The zip operators emit only when each of the inner Observables have all emitted a new value, called indexed sequencing; the overall Observable completes when any of the inner Observables complete.
  • In combined sequences, if an inner sequence emits an error, then generally the overall Observable emits the error and the sequence terminates.
  • Triggering operators like withLatestFrom and sample let you limit the emitting of elements to only when certain triggering events occur.
  • The amb or “ambiguous” operator lets you switch between multiple Observables by sticking to the first one that is active.
  • The reduce and scan operators let you combine the elements in a sequence based on an input lambda; reduce only emits the final value when it receives the complete event, whereas scan emits intermediate accumulated values.

Where to go from here?

Having been introduced to combining operators, in the next chapter you’ll see them in action in an Android app. The app project will retrieve data from a NASA API that you will combine in various ways. Despite being Earth-based data, it’s sure to be out of this world!

Have a technical question? Want to report a bug? You can ask questions and report bugs to the book authors in our official book forum here.
© 2024 Kodeco Inc.

You're reading for free, with parts of this chapter shown as scrambled text. Unlock this book, and our entire catalogue of books and videos, with a Kodeco Personal Plan.

Unlock now