Chapters

Hide chapters

Reactive Programming with Kotlin

Second Edition · Android 10 · Kotlin 1.3 · Android Studio 4.0

Before You Begin

Section 0: 3 chapters
Show chapters Hide chapters

Section II: Operators & Best Practices

Section 2: 7 chapters
Show chapters Hide chapters

5. Filtering Operators
Written by Alex Sullivan & Scott Gardner

Heads up... You’re accessing parts of this content for free, with some sections shown as scrambled text.

Heads up... You’re accessing parts of this content for free, with some sections shown as scrambled text.

Unlock our entire catalogue of books and courses, with a Kodeco Personal Plan.

Unlock now

Learning a new technology stack is a bit like building a skyscraper: You’ve got to build a solid foundation before you can kiss the sky. By now, you’ve established a solid RxJava foundation, and it’s time to start building up your knowledge base and skill set, one floor at a time.

This chapter will teach you about RxJava’s filtering operators that you can use to apply conditional constraints to next events, so that the subscriber only receives the elements it wants to deal with. If you’ve ever used the filter method in the Kotlin Standard Library, you’re already half way there. But if not, no worries; you’re going to be an expert at this filtering business by the end of this chapter.

Getting started

The starter project for this chapter is an IntelliJ project. Open it up and give it a build. You won’t see anything in the console yet.

Ignoring operators

Without further ado, you’re going to jump right in and look at some useful filtering operators in RxJava, beginning with ignoreElements. As depicted in the following marble diagram, ignoreElements will do just that: ignore next event elements. It will, however, allow through stop events, i.e., complete or error events. Allowing through stop events is usually implied in marble diagrams.

exampleOf("ignoreElements") {

  val subscriptions = CompositeDisposable()
  // 1
  val strikes = PublishSubject.create<String>()
  // 2
  subscriptions.add(
      strikes.ignoreElements() 
          // 3
          .subscribeBy {
            println("You’re out!")
          })
}
strikes.onNext("X")
strikes.onNext("X")
strikes.onNext("X")
strikes.onComplete()
--- Example of: ignoreElements ---
You’re out!

elementAt operator

There may be times when you only want to handle the nth (ordinal) element emitted by an observable, such as the third strike. For that you can use elementAt, which takes the index of the element you want to receive, and it ignores everything else.

exampleOf("elementAt") {

  val subscriptions = CompositeDisposable()
  // 1
  val strikes = PublishSubject.create<String>()
  // 2
  subscriptions.add(
      strikes.elementAt(2) 
          // 3
          .subscribeBy(
              onSuccess = { println("You’re out!") }
          ))
}
strikes.onNext("X")
strikes.onNext("X")
strikes.onNext("X")
--- Example of: elementAt ---
You’re out!

filter operator

ignoreElements and elementAt are filtering elements emitted by an observable. When your filtering needs go beyond all or one, there’s filter. filter takes a predicate lambda, which it applies to each element, allowing through only those elements for which the predicate resolves to true.

exampleOf("filter") {

  val subscriptions = CompositeDisposable()

  subscriptions.add(
      // 1
      Observable.fromIterable(
        listOf(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
          // 2
          .filter { number ->
            number > 5
          }.subscribe {
            // 3
            println(it)
          })
}
--- Example of: filter ---
6
7
8
9
10

Skipping operators

It might be that you need to skip a certain number of elements. Consider observing a weather forecast, where maybe you don’t want to start receiving hourly forecast data until later in the day, because you’re stuck in a cubicle until then anyway. The skip operator allows you to ignore from the first to the number you pass as its parameter. All subsequent elements will then pass through.

exampleOf("skip") {
  val subscriptions = CompositeDisposable()

  subscriptions.add(
      // 1
      Observable.just("A", "B", "C", "D", "E", "F")
          // 2
          .skip(3)
          .subscribe {
            println(it)
          })
}
--- Example of: skip ---
D
E
F

skipWhile operator

There’s a small family of skip operators. Like filter, skipWhile lets you include a predicate to determine what should be skipped. However, unlike filter, which will filter elements for the life of the subscription, skipWhile will only skip up until something is not skipped, and then it will let everything else through from that point on. Also, with skipWhile, returning true will cause the element to be skipped, and returning false will let it through: it uses the return value in the opposite way to filter.

exampleOf("skipWhile") {

  val subscriptions = CompositeDisposable()

  subscriptions.add(
      // 1
      Observable.just(2, 2, 3, 4)
          // 2
          .skipWhile { number ->
            number % 2 == 0
          }.subscribe {
            println(it)
          })

}
--- Example of: skipWhile ---
3
4

skipUntil operator

So far, the filtering has been based on some static condition. What if you wanted to dynamically filter elements based on some other observable? There are a couple of operators that you’ll learn about here that can do this. The first is skipUntil, which will keep skipping elements from the source observable (the one you’re subscribing to) until some other trigger observable emits.

exampleOf("skipUntil") {

  val subscriptions = CompositeDisposable()
  // 1
  val subject = PublishSubject.create<String>()
  val trigger = PublishSubject.create<String>()

  subscriptions.add(
      // 2
      subject.skipUntil(trigger)
          .subscribe {
            println(it)
          })
}
subject.onNext("A")
subject.onNext("B")
trigger.onNext("X")
subject.onNext("C")
--- Example of: skipUntil ---
C

Taking operators

Taking is the opposite of skipping. When you want to only take certain elements, RxJava has you covered. The first taking operator you’ll learn about is take. As shown in this marble diagram, the result will take the first of the number of elements you specified and ignore everything that follows.

exampleOf("take") {
  val subscriptions = CompositeDisposable()

  subscriptions.add(
      // 1
      Observable.just(1, 2, 3, 4, 5, 6)
          // 2
          .take(3)
          .subscribe {
            println(it)
          })
}
--- Example of: take ---
1
2
3

takeWhile operator

There’s also a takeWhile operator that works similarly to skipWhile, except you’re taking instead of skipping. takeWhile works like take, but uses a predicate instead of a number of next events, as in this marble diagram:

exampleOf("takeWhile") {
  val subscriptions = CompositeDisposable()

  subscriptions.add(
      // 1
      Observable.fromIterable(
        listOf(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 1))
          // 2
          .takeWhile { number ->
            number < 5
          }.subscribe {
            println(it)
          })
}
--- Example of: takeWhile ---
1
2
3
4

takeUntil operator

Like skipUntil, there’s also a takeUntil operator, shown in the next marble diagram. It takes from the source observable until the trigger observable emits an element.

exampleOf("takeUntil") {
  val subscriptions = CompositeDisposable()
  // 1
  val subject = PublishSubject.create<String>()
  val trigger = PublishSubject.create<String>()

  subscriptions.add(
      // 2
      subject.takeUntil(trigger)
          .subscribe {
            println(it)
          })
  // 3
  subject.onNext("1")
  subject.onNext("2")
}
--- Example of: takeUntil ---
1
2
trigger.onNext("X")

subject.onNext("3")

Distinct operators

The next couple of operators you’re going to learn about let you prevent duplicate items one-after-another from getting through. As shown in this marble diagram, distinctUntilChanged only prevents duplicates that are right next to each other. The second 2 does not emit but second 1 gets through since it is a change relative to what came before it.

exampleOf("distinctUntilChanged") {
  val subscriptions = CompositeDisposable()

  subscriptions.add(
      // 1
      Observable.just("Dog", "Cat", "Cat", "Dog")
          // 2
          .distinctUntilChanged()
          .subscribe {
            println(it)
          })
}
--- Example of: distinctUntilChanged ---
Dog
Cat
Dog

exampleOf("distinctUntilChangedPredicate") {
  val subscriptions = CompositeDisposable()

  subscriptions.add(
      // 1
      Observable.just(
        "ABC", "BCD", "CDE", "FGH", "IJK", "JKL", "LMN")
          // 2
          .distinctUntilChanged { first, second ->
            // 3
            second.any { it in first }
          }
          // 4
          .subscribe {
            println(it)
          }
  )
}
--- Example of: distinctUntilChangedPredicate ---
ABC
FGH
IJK

Challenge

Challenge: Create a phone number lookup

Open the challenge starter project and have a look at what’s to be found inside!

val contacts = mapOf(
    "603-555-1212" to "Florent",
    "212-555-1212" to "Junior",
    "408-555-1212" to "Marin",
    "617-555-1212" to "Scott")
fun phoneNumberFrom(inputs: List<Int>): String {
  val phone = inputs.map { it.toString() }.toMutableList()
  phone.add(3, "-")
  phone.add(7, "-")
  return phone.joinToString("")
}
val input = PublishSubject.create<Int>()
input.onNext(0)
input.onNext(603)

input.onNext(2)
input.onNext(1)

// Confirm that 7 results in "Contact not found", and then
// change to 2 and confirm that Junior is found
input.onNext(2)

"5551212".forEach {
  // Need toString() or else Char conversion is done
  input.onNext(it.toString().toInt())
}

input.onNext(9)
if (contact != null) {
  println("Dialing $contact ($phone)...")
} else {
  println("Contact not found")
}

Key points

  • Ignoring operators like ignoreElements, elementAt, and filter let you remove certain elements from an observable stream.
  • Skipping operators let you skip certain elements and then begin emitting.
  • Conversely, taking operators let you take certain elements and then stop emitting.
  • Distinct operators let you prevent duplicates from being emitted back-to-back in an observable stream.

Where to go from here?

You’ve seen the theory behind filttering operators in an IntelliJ project. Next up, transfer that knowledge into a real Android app by going back to the Combinestagram photo collage app.

Have a technical question? Want to report a bug? You can ask questions and report bugs to the book authors in our official book forum here.
© 2025 Kodeco Inc.

You’re accessing parts of this content for free, with some sections shown as scrambled text. Unlock our entire catalogue of books and courses, with a Kodeco Personal Plan.

Unlock now