Reactive Programming with Kotlin

Second Edition · Android 10 · Kotlin 1.3 · Android Studio 4.0

Section II: Operators & Best Practices

11. Time-Based Operators
Written by Alex Sullivan & Florent Pillet

Timing is everything. The core idea behind reactive programming is to model asynchronous data flow over time.

In this respect, RxJava provides a range of operators that allow you to deal with time and the way that sequences react and transform over time. As you’ll see throughout this chapter, managing the time dimension of your sequences is easy and straightforward.

To learn about time-based operators, you’ll practice with an animated app that demonstrates visually how data flows over time. This chapter comes with a basic app with several buttons that lead to different pages. You’ll use each page to exercise one or more related operators. The app also includes a number of ready-made classes that’ll come in handy to build the examples.

Getting started

Open the starter project for this section, then build and run the app. You should see a white screen with five gray buttons:

Clicking any of these buttons will send you to another screen that, for now, just has some text. As you work through this chapter, you’ll flesh out each page to demonstrate a different set of time-based reactive operators.

Buffering operators

The first group of time-based operators deals with buffering. These operators either replay past elements to new subscribers, or buffer them and deliver them in bursts. They let you control how and when past and new elements get delivered.

Replaying past elements

When a sequence emits items, you’ll often need to make sure that a future subscriber receives some or all of the past items. This is the purpose of the replay operator and its overloads.

val elementsPerSecond = 1
val replayedElements = 1
val replayDelayInMs = 3500L
val maxElements = 5
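val sourceObservable = Observable.create<Int> { emitter ->
  var value = 1
  // Emit 1..maxElements, one value per timer tick
  val disposable = timer(elementsPerSecond) {
    if (value <= maxElements) {
      emitter.onNext(value)
      value++
    }
  }
  // Stop the timer when the subscription is disposed, so it doesn't leak
  emitter.setDisposable(disposable)
}
// Keep the last replayedElements items for future subscribers
.replay(replayedElements)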
sourceObservable.subscribe(replay_1)
dispatchAfter(replayDelayInMs) {
  sourceObservable.subscribe(replay_2)
}
sourceObservable.connect()
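
To check the replay behavior without running the app, here's a minimal sketch that reproduces the scenario above on a virtual clock. It isn't the chapter's app code: TestScheduler stands in for the app's timers, replayDemo is a hypothetical name, and the imports assume RxJava 3 package names (with RxJava 2, the packages start with io.reactivex instead).

```kotlin
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.schedulers.TestScheduler
import java.util.concurrent.TimeUnit

// Hypothetical helper: returns what an early and a late subscriber receive from replay(1).
fun replayDemo(): Pair<List<Long>, List<Long>> {
  val scheduler = TestScheduler() // assumption: virtual time instead of real timers
  val source = Observable.interval(1, TimeUnit.SECONDS, scheduler)
    .map { it + 1 } // 1, 2, 3, ... one element per second
    .take(5)
    .replay(1) // buffer only the most recent element

  val early = source.test() // subscribes before the source starts
  source.connect() // start emitting
  scheduler.advanceTimeBy(3500, TimeUnit.MILLISECONDS)

  val late = source.test() // subscribes after 3.5 virtual seconds
  scheduler.advanceTimeBy(2, TimeUnit.SECONDS)

  return early.values().toList() to late.values().toList()
}
```

Calling replayDemo() should return [1, 2, 3, 4, 5] for the early subscriber and [3, 4, 5] for the late one: the late subscriber first gets the single buffered element, then the live ones.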

Unlimited replay

In addition to the replay operator that takes a maximum number of elements, there’s an overloaded version of replay that takes no arguments. Used this way, replay ensures that every item in your Observable is replayed. Use this overload with caution: Only apply it in scenarios where you know the total number of buffered elements will stay reasonable. For example, it’s appropriate to use replay with no arguments in the context of HTTP requests, because you know the approximate memory impact of retaining the data returned by a query. On the other hand, using it on a sequence that may not terminate and may produce a lot of data will quickly clog your memory. The buffer could grow to the point where you see an OutOfMemoryError!

// Before: replay at most replayedElements past items
.replay(replayedElements)
// After: replay every past item
.replay()
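
As a rough illustration of the no-argument overload, the sketch below subscribes late to a sequence that uses replay() and collects what arrives. The function name unlimitedReplayDemo and the use of TestScheduler are assumptions made for the example, and the imports assume RxJava 3 package names.

```kotlin
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.schedulers.TestScheduler
import java.util.concurrent.TimeUnit

// Hypothetical helper: returns what a late subscriber receives from an unbounded replay().
fun unlimitedReplayDemo(): List<Long> {
  val scheduler = TestScheduler() // assumption: virtual time instead of real timers
  val source = Observable.interval(1, TimeUnit.SECONDS, scheduler)
    .map { it + 1 } // 1, 2, 3, ...
    .take(5)
    .replay() // no size limit: every element is retained

  source.connect()
  scheduler.advanceTimeBy(3500, TimeUnit.MILLISECONDS) // 1, 2 and 3 already emitted

  val late = source.test() // receives the whole history first
  scheduler.advanceTimeBy(2, TimeUnit.SECONDS) // then 4 and 5 arrive live
  return late.values().toList()
}
```

Here the late subscriber should end up with [1, 2, 3, 4, 5]. The buffer holds five elements because the source is bounded by take(5); without such a bound it would keep growing.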

Controlled buffering

Now that you’ve touched on replayable sequences, you can look at a more advanced topic: controlled buffering. You’ll first look at the buffer operator. Switch to the second page in the app, called BUFFER. As in the previous example, you’ll begin with some constants. Add the following to the top of the BufferActivity.kt file:

private val bufferMaxCount = 2
private val bufferTimeSpan = 4L
val sourceObservable = PublishSubject.create<String>()
sourceObservable
  .subscribe(buffer_1)
sourceObservable
  .buffer(bufferTimeSpan, TimeUnit.SECONDS, bufferMaxCount)
  .map { it.size }
  .subscribe(buffer_2)
dispatchAfter(5000) {
  sourceObservable.onNext("🐱")
  sourceObservable.onNext("🐱")
  sourceObservable.onNext("🐱")
}

val elementsPerSecond = 1

timer(elementsPerSecond) {
  sourceObservable.onNext("🐱")
}.addTo(disposables)
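
To make the interplay between the time span and the maximum count more concrete, here's a small sketch that mirrors the constants above (4 seconds, 2 elements) on a virtual clock. The names bufferSizesDemo and the "a"/"b"/"c" payloads are made up for the example, TestScheduler replaces the default scheduler, and the imports assume RxJava 3 package names.

```kotlin
import io.reactivex.rxjava3.schedulers.TestScheduler
import io.reactivex.rxjava3.subjects.PublishSubject
import java.util.concurrent.TimeUnit

// Hypothetical helper: returns the size of each buffer emitted during 9 virtual seconds.
fun bufferSizesDemo(): List<Int> {
  val scheduler = TestScheduler() // assumption: virtual time instead of real timers
  val source = PublishSubject.create<String>()

  val sizes = source
    .buffer(4, TimeUnit.SECONDS, scheduler, 2) // flush every 4 s, or as soon as 2 items are buffered
    .map { it.size }
    .test()

  // Three elements in a burst: the first two fill a buffer immediately.
  source.onNext("a")
  source.onNext("b")
  source.onNext("c")

  // The 4 s tick flushes the leftover element; the 8 s tick flushes an empty buffer.
  scheduler.advanceTimeBy(9, TimeUnit.SECONDS)
  return sizes.values().toList()
}
```

With this input the sizes should come out as [2, 1, 0]. Note the trailing 0: a timed buffer still emits when its time span elapses, even if nothing arrived.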

Windows of buffered Observables

A last buffering technique, very close to buffer, is window. It has roughly the same signature and does nearly the same thing. The only difference is that it emits an Observable of the buffered items, instead of emitting a list.

private val elementsPerSecond = 3
private val windowTimeSpan = 4L
private val windowMaxCount = 10L
val sourceObservable = PublishSubject.create<String>()
timer(elementsPerSecond) {
  sourceObservable.onNext("🐱")
}.addTo(disposables)
sourceObservable.subscribe(windowSource)
sourceObservable.window(windowTimeSpan, TimeUnit.SECONDS, AndroidSchedulers.mainThread(), windowMaxCount)
.flatMap { windowedObservable ->
  val marbleView = MarbleView(this)
  marble_views.addView(marbleView)
  windowedObservable
    .map { value -> value to marbleView}
    .concatWith(Observable.just("" to marbleView))
}
.subscribe { (value, marbleView) ->
  if (value.isEmpty()) {
    marbleView.onComplete()
  } else {
    marbleView.onNext(value)
  }
}
.addTo(disposables)
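
Stripped of the marble views, the core idea of window can be sketched as follows. This is an illustration only: it uses the time-only overload of window rather than the one with a maximum count, windowDemo is a hypothetical name, TestScheduler replaces AndroidSchedulers.mainThread(), and the imports assume RxJava 3 package names.

```kotlin
import io.reactivex.rxjava3.schedulers.TestScheduler
import io.reactivex.rxjava3.subjects.PublishSubject
import java.util.concurrent.TimeUnit

// Hypothetical helper: returns the contents of every window that has completed.
fun windowDemo(): List<List<String>> {
  val scheduler = TestScheduler() // assumption: virtual time instead of the main thread
  val source = PublishSubject.create<String>()

  val windows = source
    .window(4, TimeUnit.SECONDS, scheduler) // a new inner Observable every 4 s
    .flatMapSingle { window -> window.toList() } // gather each window once it completes
    .test()

  source.onNext("a")
  source.onNext("b")
  source.onNext("c")
  scheduler.advanceTimeBy(5, TimeUnit.SECONDS) // first window closes at 4 s

  source.onNext("d")
  scheduler.advanceTimeBy(4, TimeUnit.SECONDS) // second window closes at 8 s

  return windows.values().map { it.toList() }
}
```

windowDemo() should return [[a, b, c], [d]]. A third window is open at that point, but it hasn't completed, so toList() hasn't emitted anything for it yet.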

Time-shifting operators

Every now and again, you need to travel in time. While RxJava can’t help with fixing your past relationship mistakes, it has the ability to freeze time for a little while to let you wait until self-cloning is available.

Delayed subscriptions

Start off by adding the constants to the top of the class:

private val elementsPerSecond = 1
private val delayInSeconds = 3L
val sourceObservable = PublishSubject.create<Int>()
var current = 1
timer(elementsPerSecond) {
  sourceObservable.onNext(current)
  current++
}
sourceObservable.subscribe(source)
sourceObservable
  .delaySubscription(delayInSeconds, TimeUnit.SECONDS, AndroidSchedulers.mainThread())
  .subscribe(delayed)
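
Because the source here is a PublishSubject, anything it emits before the delayed subscription takes place is lost to the delayed subscriber. The sketch below shows that effect on a virtual clock. The function name delaySubscriptionDemo is hypothetical, TestScheduler replaces AndroidSchedulers.mainThread(), and the imports assume RxJava 3 package names.

```kotlin
import io.reactivex.rxjava3.schedulers.TestScheduler
import io.reactivex.rxjava3.subjects.PublishSubject
import java.util.concurrent.TimeUnit

// Hypothetical helper: returns the elements seen by a subscriber that joins 3 s late.
fun delaySubscriptionDemo(): List<Int> {
  val scheduler = TestScheduler() // assumption: virtual time instead of the main thread
  val source = PublishSubject.create<Int>()

  val delayed = source
    .delaySubscription(3, TimeUnit.SECONDS, scheduler)
    .test()

  source.onNext(1) // t = 0 s: nobody is subscribed to the subject yet
  scheduler.advanceTimeBy(2, TimeUnit.SECONDS)
  source.onNext(2) // t = 2 s: still too early
  scheduler.advanceTimeBy(2, TimeUnit.SECONDS) // the real subscription happens at t = 3 s
  source.onNext(3) // t = 4 s: delivered
  source.onNext(4)

  return delayed.values().toList()
}
```

The result should be [3, 4]: elements 1 and 2 were emitted before the subscription actually happened.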

Delayed elements

The other kind of delay in RxJava lets you time-shift the whole sequence. Instead of subscribing late, the operator subscribes immediately to the source observable, but delays every emitted element by the specified amount of time. The net result is a concrete time-shift.

sourceObservable
  .delay(delayInSeconds, TimeUnit.SECONDS,
    AndroidSchedulers.mainThread())
  .subscribe(delayed)
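
Here's a minimal sketch of that time-shift, checked on a virtual clock. As before, this isn't the app's code: delayDemo is a hypothetical name, TestScheduler replaces AndroidSchedulers.mainThread(), and the imports assume RxJava 3 package names.

```kotlin
import io.reactivex.rxjava3.schedulers.TestScheduler
import io.reactivex.rxjava3.subjects.PublishSubject
import java.util.concurrent.TimeUnit

// Hypothetical helper: returns what the delayed subscriber has seen at t = 2 s and at t = 4 s.
fun delayDemo(): Pair<List<Int>, List<Int>> {
  val scheduler = TestScheduler() // assumption: virtual time instead of the main thread
  val source = PublishSubject.create<Int>()

  val delayed = source
    .delay(3, TimeUnit.SECONDS, scheduler) // subscribes right away, shifts every element by 3 s
    .test()

  source.onNext(1) // emitted at t = 0 s, due at t = 3 s
  source.onNext(2)

  scheduler.advanceTimeBy(2, TimeUnit.SECONDS)
  val atTwoSeconds = delayed.values().toList() // nothing delivered yet

  scheduler.advanceTimeBy(2, TimeUnit.SECONDS)
  val atFourSeconds = delayed.values().toList() // both elements delivered

  return atTwoSeconds to atFourSeconds
}
```

delayDemo() should return an empty list for the first snapshot and [1, 2] for the second. No element is lost; each one simply arrives three seconds after it was emitted.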

Timer operators

A common need in any kind of application is a timer. Android comes with a few methods to accomplish timing tasks. Typically, Android developers use the Handler class to accomplish this sort of task. Handler works OK, but the API is somewhat complicated unless you wrap it, like we did in this app with the dispatchAfter function.

Intervals

This chapter used the timer function several times to create interval timers through a handy custom function. In fact, the timer function uses another special RxJava function to achieve its timing tasks. Specifically, it uses the Observable.interval function. It produces an infinite Observable sequence of Long values (effectively a counter) sent at the selected interval on the specified scheduler.

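// Compute the period in milliseconds: 1L / elementsPerSecond truncates to 0 for rates above one per second
val sourceObservable = Observable.interval(1000L / elementsPerSecond,
  TimeUnit.MILLISECONDS,
  AndroidSchedulers.mainThread()).replay(replayedElements)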
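
The following sketch shows the counter that Observable.interval produces, and one way to turn an elements-per-second rate into a period without losing precision to integer division. The function name intervalDemo is hypothetical, TestScheduler replaces AndroidSchedulers.mainThread(), and the imports assume RxJava 3 package names.

```kotlin
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.schedulers.TestScheduler
import java.util.concurrent.TimeUnit

// Hypothetical helper: returns the values emitted during the first 1.1 virtual seconds.
fun intervalDemo(elementsPerSecond: Int): List<Long> {
  val scheduler = TestScheduler() // assumption: virtual time instead of the main thread

  // Work in milliseconds so that rates above 1 per second don't truncate to a period of 0.
  val periodInMs = 1000L / elementsPerSecond

  val observer = Observable
    .interval(periodInMs, TimeUnit.MILLISECONDS, scheduler)
    .test()

  scheduler.advanceTimeBy(1100, TimeUnit.MILLISECONDS)
  observer.dispose() // the sequence is infinite, so stop it explicitly
  return observer.values().toList()
}
```

For example, intervalDemo(4) should return [0, 1, 2, 3], one value every 250 ms, and intervalDemo(1) should return [0]. The first value is 0 and arrives only after one full period has elapsed.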

One-shot or repeating timers

You may want a more powerful timer Observable. You can use the Observable.timer operator that is very much like Observable.interval but adds the following features:

Observable.timer(3, TimeUnit.SECONDS)
  .flatMap {
    sourceObservable.delay(delayInSeconds, TimeUnit.SECONDS)
  }
  .subscribe(delayed)
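
On its own, Observable.timer in RxJava is a one-shot: it waits for the given delay, emits a single 0 and completes. The sketch below checks that on a virtual clock. The name oneShotTimerDemo is hypothetical, TestScheduler replaces the default scheduler, and the imports assume RxJava 3 package names.

```kotlin
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.schedulers.TestScheduler
import java.util.concurrent.TimeUnit

// Hypothetical helper: returns everything a 3-second timer emits.
fun oneShotTimerDemo(): List<Long> {
  val scheduler = TestScheduler() // assumption: virtual time instead of real timers
  val observer = Observable.timer(3, TimeUnit.SECONDS, scheduler).test()

  scheduler.advanceTimeBy(2, TimeUnit.SECONDS)
  check(observer.values().isEmpty()) // nothing before the delay has elapsed

  scheduler.advanceTimeBy(10, TimeUnit.SECONDS)
  observer.assertComplete() // the timer completes after its single emission
  return observer.values().toList()
}
```

oneShotTimerDemo() should return [0], however long you keep advancing the clock. That single emission is what triggers the flatMap in the snippet above.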

Timeouts

You’ll complete this roundup of time-based operators with a special one: timeout. Its primary purpose is to semantically distinguish an actual timer from a timeout (error) condition. Therefore, when a timeout operator fires, it emits a TimeoutException error event; if not caught, it terminates the sequence.

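button.clicks()
  .map { "•" }
  .timeout(5, TimeUnit.SECONDS)
  .subscribe(timeout)
// Alternative to the timeout line above: switch to a fallback Observable instead of emitting an error
.timeout(5, TimeUnit.SECONDS, Observable.just("X"))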
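
To compare the two timeout variants side by side, here's a sketch that simulates two clicks followed by silence. A PublishSubject stands in for button.clicks(), "tap" replaces the bullet character, TestScheduler replaces the default scheduler, timeoutDemo is a hypothetical name, and the imports assume RxJava 3 package names.

```kotlin
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.schedulers.TestScheduler
import io.reactivex.rxjava3.subjects.PublishSubject
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException

// Hypothetical helper: returns the values seen without and with a fallback Observable.
fun timeoutDemo(): Pair<List<String>, List<String>> {
  val scheduler = TestScheduler() // assumption: virtual time instead of real timers
  val clicks = PublishSubject.create<String>() // assumption: stands in for button.clicks()

  val plain = clicks
    .timeout(5, TimeUnit.SECONDS, scheduler)
    .test()
  val withFallback = clicks
    .timeout(5, TimeUnit.SECONDS, scheduler, Observable.just("X"))
    .test()

  clicks.onNext("tap") // t = 0 s
  scheduler.advanceTimeBy(4, TimeUnit.SECONDS)
  clicks.onNext("tap") // t = 4 s: restarts the 5 s countdown
  scheduler.advanceTimeBy(6, TimeUnit.SECONDS) // silence until t = 10 s, timeout fires at 9 s

  plain.assertError(TimeoutException::class.java) // the plain variant ends with an error
  withFallback.assertComplete() // the fallback variant ends normally
  return plain.values().toList() to withFallback.values().toList()
}
```

The plain variant should have received [tap, tap] before terminating with a TimeoutException, while the fallback variant should have received [tap, tap, X] and completed.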

Challenge

Challenge: Circumscribe side effects

In the discussion of the window operator, you created timelines on the fly inside the closure of a flatMap operator. While this was done to keep the code short, one of the guidelines of reactive programming is to “not leave the monad”. In other words, avoid side effects except for specific areas created to apply side effects. Here, the “side effect” is the creation of a new marble view in a spot where only a transformation should occur.

Key points

  • When a sequence emits items, you’ll often need to make sure that a future subscriber receives some or all of the past items. This is the purpose of the replay operator and its overloads.
  • Buffering operators are a group of time-based operators that deal with buffering. They will either replay past elements to new subscribers, or buffer them and deliver them in bursts. They allow you to control how and when past and new elements get delivered.
  • dispatchAfter is a helper function in the sample app that makes it easier to dispatch one-off delayed actions. In the replay example, it delays the second subscription, whose elements are displayed in another marble view.
  • The delaySubscription operator delays the moment a subscriber starts receiving elements from its subscription. The delay operator shifts the elements themselves so that they arrive later.
  • The Observable.interval function produces an infinite Observable sequence of Long values (effectively a counter) sent at the selected interval on the specified scheduler.
  • timeout is an operator that semantically distinguishes an actual timer from a timeout (error) condition. Therefore, when a timeout operator fires, it emits a TimeoutException error event; if not caught, it terminates the sequence.
© 2024 Kodeco Inc.
