Chapters

Hide chapters

RxSwift: Reactive Programming with Swift

Fourth Edition · iOS 13 · Swift 5.1 · Xcode 11

15. Intro to Schedulers
Written by Florent Pillet

Heads up... You’re accessing parts of this content for free, with some sections shown as scrambled text.

Heads up... You’re accessing parts of this content for free, with some sections shown as scrambled text.

Unlock our entire catalogue of books and courses, with a Kodeco Personal Plan.

Unlock now

Until now, you’ve managed to work with schedulers while avoiding any explanation about what they actually are and how they handle threading or concurrency. In earlier chapters, you used methods which implicitly used some sort of concurrency/threading level, such as the buffer, delaySubscription or interval operators.

You might feel like schedulers have some sort of magic under the hood, but before you understand schedulers, you’ll also need to understand what that observeOn operator is all about.

This chapter is going to cover the beauty behind schedulers, where you’ll learn why the RxSwift abstraction is so powerful and why working with asynchronous programming is far less painful than using locks or queues.

Note: Creating custom schedulers is beyond of the scope of this book. Keep in mind that the schedulers and initializers provided by RxSwift, RxCocoa and RxBlocking generally cover 99% of cases. Always try to use the built-in schedulers.

What is a scheduler?

Before getting your hands dirty with schedulers, it’s important to understand what they are — and what they are not. To summarize, a scheduler is a context where a process takes place. This context can be a thread, a dispatch queue or similar entities, or even an Operation used inside the OperationQueueScheduler.

Here’s a good example as to how schedulers can be used:

In this diagram, you have the concept of a cache operator. An observable makes a request to a server and retrieves some data. This data is processed by a custom operator named cache, which stores the data somewhere. After this, the data is passed to all subscribers on a different scheduler, most likely the MainScheduler which sits on top of the main thread, making the update of the UI possible.

Demystifying the scheduler

One common misconception about schedulers is that they are equally related to threads. And that might seem logical at first — after all, schedulers do work similarly to GCD‘s dispatch queues.

Setting up the project

Time to write some code! In this project, you are going to create a simple command-line tool for macOS. Why a command-line tool? Since you are playing with threads and concurrency, plain-text output will be easier to understand than any visual elements you could create in an app.

===== Schedulers =====

00s | [E] [dog] emitted on Main Thread
00s | [S] [dog] received on Main Thread

Switching schedulers

One of the most important things in RxSwift is the ability to switch schedulers at any time, without any restrictions except for ones imposed by the inner process generating events. There are good reasons why you want to be able to control which scheduler an operator receives elements on:

let fruit = Observable<String>.create { observer in
  observer.onNext("[apple]")
  sleep(2)
  observer.onNext("[pineapple]")
  sleep(2)
  observer.onNext("[strawberry]")
  return Disposables.create()
}

fruit
  .dump()
  .dumpingSubscription()
  .disposed(by: bag)
===== Schedulers =====

00s | [E] [dog] emitted on Main Thread
00s | [S] [dog] received on Main Thread
00s | [E] [apple] emitted on Main Thread
00s | [S] [apple] received on Main Thread
02s | [E] [pineapple] emitted on Main Thread
02s | [S] [pineapple] received on Main Thread
04s | [E] [strawberry] emitted on Main Thread
04s | [S] [strawberry] received on Main Thread

Using subscribeOn

In some cases you might want to change on which scheduler the observable computation code runs — not the code in any of the subscription operators, but the code that is actually emitting the observable events.

let globalScheduler = ConcurrentDispatchQueueScheduler(queue: DispatchQueue.global())
fruit
  .subscribeOn(globalScheduler)
  .dump()
  .dumpingSubscription()
  .disposed(by: bag)
RunLoop.main.run(until: Date(timeIntervalSinceNow: 13))
00s | [E] [dog] emitted on Main Thread
00s | [S] [dog] received on Main Thread
00s | [E] [apple] emitted on Anonymous Thread
00s | [S] [apple] received on Anonymous Thread
02s | [E] [pineapple] emitted on Anonymous Thread
02s | [S] [pineapple] received on Anonymous Thread
04s | [E] [strawberry] emitted on Anonymous Thread
04s | [S] [strawberry] received on Anonymous Thread

Using observeOn

Observing is one of the three fundamental concepts of Rx. It involves an entity producing events, and an observer for those events. In this case, and in opposition to subscribeOn, the observeOn operator changes the scheduler where the observation happens.

fruit
  .subscribeOn(globalScheduler)
  .dump()
  .observeOn(MainScheduler.instance)
  .dumpingSubscription()
  .disposed(by: bag)
00s | [E] [dog] emitted on Main Thread
00s | [S] [dog] received on Main Thread
00s | [E] [apple] emitted on Anonymous Thread
00s | [S] [apple] received on Main Thread
02s | [E] [pineapple] emitted on Anonymous Thread
02s | [S] [pineapple] received on Main Thread
04s | [E] [strawberry] emitted on Anonymous Thread
04s | [S] [strawberry] received on Main Thread

Pitfalls

The ability to switch schedulers and threads looks amazing, but it comes with some pitfalls. To see why, you’ll push some events to the subject using a new thread. Since you need to track on which thread the computation takes place, a good solution is to use an OS Thread.

let animalsThread = Thread() {
  sleep(3)
  animal.onNext("[cat]")
  sleep(3)
  animal.onNext("[tiger]")
  sleep(3)
  animal.onNext("[fox]")
  sleep(3)
  animal.onNext("[leopard]")
}
animalsThread.name = "Animals Thread"
animalsThread.start()
...
03s | [E] [cat] emitted on Animals Thread
03s | [S] [cat] received on Animals Thread
04s | [E] [strawberry] emitted on Anonymous Thread
04s | [S] [strawberry] received on Main Thread
06s | [E] [tiger] emitted on Animals Thread
06s | [S] [tiger] received on Animals Thread
09s | [E] [fox] emitted on Animals Thread
09s | [S] [fox] received on Animals Thread
12s | [E] [leopard] emitted on Animals Thread
12s | [S] [leopard] received on Animals Thread
animal
  .dump()
  .observeOn(globalScheduler)
  .dumpingSubscription()
  .disposed(by: bag)
...
03s | [E] [cat] emitted on Animals Thread
03s | [S] [cat] received on Anonymous Thread
04s | [E] [strawberry] emitted on Anonymous Thread
04s | [S] [strawberry] received on Main Thread
06s | [E] [tiger] emitted on Animals Thread
06s | [S] [tiger] received on Anonymous Thread
09s | [E] [fox] emitted on Animals Thread
09s | [S] [fox] received on Anonymous Thread
12s | [E] [leopard] emitted on Animals Thread
12s | [S] [leopard] received on Anonymous Thread
animal
  .subscribeOn(MainScheduler.instance)
  .dump()
  .observeOn(globalScheduler)
  .dumpingSubscription()
  .disposed(by: bag)
03s | [E] [cat] emitted on Animals Thread
03s | [S] [cat] received on Anonymous Thread
04s | [E] [strawberry] emitted on Anonymous Thread
04s | [S] [strawberry] received on Main Thread
06s | [E] [tiger] emitted on Animals Thread
06s | [S] [tiger] received on Anonymous Thread
09s | [E] [fox] emitted on Animals Thread
09s | [S] [fox] received on Anonymous Thread
12s | [E] [leopard] emitted on Animals Thread
12s | [S] [leopard] received on Anonymous Thread

Hot vs. cold

The section above touched on the topic of hot and cold observables. The topic of hot and cold observables is quite opinionated and generates a lot of debate, so let‘s briefly look into it here. The concept can be reduced to a very simple question:

Best practices and built-in schedulers

Schedulers are a non-trivial topic, so they come with some best practices for the most common use cases. In this section, you’ll get a quick introduction to serial and concurrent schedulers, learn how they process the data and see which type works better for a particular context.

Serial vs concurrent schedulers

Considering that a scheduler is simply a context, which could be anything (dispatch queue, thread, custom context), and that all operators transforming sequences need to preserve the implicit guarantees, you need to be sure you’re using the right scheduler.

MainScheduler

MainScheduler sits on top of the main thread. This scheduler is used to process changes on the user interface and perform other high-priority tasks. As a general practice when developing applications on iOS, tvOS or macOS, long-running tasks should not be performed using this scheduler, so avoid things like server requests or other heavy tasks.

SerialDispatchQueueScheduler

SerialDispatchQueueScheduler manages to abstract the work on a serial DispatchQueue. This scheduler has the great advantage of several optimizations when using observeOn.

ConcurrentDispatchQueueScheduler

ConcurrentDispatchQueueScheduler, similar to SerialDispatchQueueScheduler, manages to abstract work on a DispatchQueue. The main difference here is that instead of a serial queue, the scheduler uses a concurrent one.

OperationQueueScheduler

OperationQueueScheduler is similar to ConcurrentDispatchQueueScheduler, but instead of abstracting the work over a DispatchQueue, it performs the job over an OperationQueue. Sometimes you need more control over the concurrent jobs you are running, which you can’t do with a concurrent DispatchQueue.

TestScheduler

TestScheduler is a special kind of beast. It’s meant only to be used in testing, so you should not use it in production code. This special scheduler simplifies operator testing; it’s part of the RxTest library. You will have a look into using this scheduler in the dedicated chapter about testing, but let‘s have a quick look since you‘re doing the grand tour of RxSwift schedulers.

let scheduler = TestScheduler(initialClock: 0)
let xs = scheduler.createColdObservable([
  next(50, 42),
  next(60, 43),
  completed(70)
])
let res = scheduler.start {
  xs.delaySubscription(30, scheduler: scheduler)
}
XCTAssertEqual(res.events, [
  next(280, 42),
  next(290, 43),
  completed(300)
])
XCTAssertEqual(xs.subscriptions, [
  Subscription(230, 300)
])

Where to go from here?

Schedulers are a non-trivial topic in the RxSwift space; they’re responsible for computing and performing all tasks in RxSwift. The golden rule of a Scheduler is that it can be anything. Keep this in mind, and you’ll get along just fine when working with observables and using and changing schedulers.

Have a technical question? Want to report a bug? You can ask questions and report bugs to the book authors in our official book forum here.
© 2024 Kodeco Inc.

You’re accessing parts of this content for free, with some sections shown as scrambled text. Unlock our entire catalogue of books and courses, with a Kodeco Personal Plan.

Unlock now