Chapters

Hide chapters

Kotlin Coroutines by Tutorials

Second Edition · Android 10 · Kotlin 1.3 · Android Studio 3.5

Section I: Introduction to Coroutines

Section 1: 9 chapters
Show chapters Hide chapters

12. Broadcast Channels
Written by Nishant Srivastava

Heads up... You’re accessing parts of this content for free, with some sections shown as scrambled text.

Heads up... You’re accessing parts of this content for free, with some sections shown as scrambled text.

Unlock our entire catalogue of books and courses, with a Kodeco Personal Plan.

Unlock now

A channel is all about transferring a stream of values. It is quite common to put a stream of items in the channel and then have receivers consume the items as they are emitted. It works when an item is sent in a basic channel and when emitted it is consumed by a receiver. Other receivers do not get the same item; instead, they wait for another item to consume from the channel.

Often times, you will encounter use cases in which you would like all the receivers to consume the same value. This is where a broadcast channel comes into the picture. This and much more related to broadcast channels are covered in this chapter.

Broadcast Channels
Broadcast Channels

Getting started with broadcast channels

With the channel, if you have many receivers waiting to receive items from the channel, the emitted item will be consumed by the first receiver and all other receivers will not get the item individually. In fact, in such a scenario wherein there are more than one receivers, there is the possibility of a race condition.

Take a look at this code snippet:


fun main() {
 
  // 1
  val fruitArray = arrayOf("Apple", "Banana", "Pear", "Grapes",
      "Strawberry")
  // 2
  val kotlinChannel = Channel<String>()

  // 3
  runBlocking {

    // 4 Producer
    GlobalScope.launch {
     // Send data in channel
     kotlinChannel.send(fruitArray[0])
   }

    // 5 Consumers
    GlobalScope.launch {
      kotlinChannel.consumeEach { value ->
        println("Consumer 1: $value")
      }
    }
    GlobalScope.launch {
      kotlinChannel.consumeEach { value ->
        println("Consumer 2: $value")
      }
    }

    // 6
    println("Press a key to exit...")
    readLine()

    // 7
    kotlinChannel.close()
  }
}

Here:

  1. A string array of fruit names is created, named fruitArray.
  2. A basic channel is created named kotlinChannel.
  3. Next, a runBlocking section is defined to run coroutines in our main function.
  4. Start producing items and send them in the channel, all inside a launch coroutine builder.
  5. Start consuming items from the channel, all inside two different launch coroutine builders.
  6. Wait for a keystroke to exit the program. readLine() basically waits for standard input, and it is used here to stop the program from exiting before finishing its async operations.
  7. Close the channel so that the consumers on it are canceled, too.

The output of this code snippet when run will be:

Press a key to exit...
Consumer 1: Apple

Note: To finish the program, you need to press the Enter key.

Here, you can see that there is one channel to which some values are sent. Then there are two consumers — i.e., two consumeEach calls on the channel being executed to consume the values being emitted by the channel. Now, which of these two consumers gets the value is not obvious. In fact, if you run the same program many times you might see the below output, too:

Consumer 2: Apple
Press a key to exit...

Note: You can find the executable version of the above snippet of code in the starter project in the file called RaceConditionChannel.kt.

Thus, as you can see, it is not obvious which consumer will get the value every time the program is executed. Based on which consumer receives the value first, the value is consumed by that consumer and the other consumer does not get the value.

To mitigate this, the Kotlin Standard Library provides another type of channel called the BroadcastChannel.

The BroadcastChannel is non-blocking by nature and maintains a stream of values between the sender and the many receivers that subscribe.

Note: This is an experimental API. It may be changed in future updates.

To do this BroadcastChannel uses the openSubscription function and subscribes to values being sent into the channel. It is important to understand here that only when the subscription is obtained will the consumer receive the values being sent into the channel. Anything sent before obtaining the subscription is not received by the subscribed consumers of the channel.

You will use a similar code snippet of the channel’s race condition when there are many receivers. But this time you will make a slight modification. Take a look:

fun main() {

  val fruitArray = arrayOf("Apple", "Banana", "Pear", "Grapes",
      "Strawberry")
  // 1
  val kotlinChannel = BroadcastChannel<String>(3)

  runBlocking {

    // 2
    kotlinChannel.apply {
      send(fruitArray[0])
      send(fruitArray[1])
      send(fruitArray[2])
    }

    //3  Consumers
    GlobalScope.launch {
      // 4
      kotlinChannel.openSubscription().let { channel ->
        // 5
        for (value in channel) {
          println("Consumer 1: $value")
        }
        // 6
      }
    }
    GlobalScope.launch {
      kotlinChannel.openSubscription().let { channel ->
        for (value in channel) {
          println("Consumer 2: $value")
        }
      }
    }

  // 7
    kotlinChannel.apply {
      send(fruitArray[3])
      send(fruitArray[4])
    }

  // 8
    println("Press a key to exit...")
    readLine()

    // 9
    kotlinChannel.close()
  }
}

Here’s what’s going on above:

  1. A BroadcastChannel with a capacity of three is created named kotlinChannel.
  2. Start producing items and send them in the channel, all inside a launch coroutine builder. The first three items from fruitArray have already been sent in kotlinChannel.
  3. Start consuming items from the channel.
  4. Here, a subscription is opened on the kotlinChannel using the openSubscription() function — i.e., start listening to values being sent in the kotlinChannel.
  5. Iterate over all the values in the channel and print them out.
  6. When finished iterating over the values in the channel, the subscription is closed — i.e., stop listening to values being sent in the kotlinChannel.
  7. Now that the subscription has been obtained on the kotlinChannel, send two more values in the kotlinChannel.
  8. Wait for a keystroke to exit the program.
  9. Close the channel so that the consumers on it are canceled, too.

The output of executing this code snippet will be:

Press a key to exit...
Consumer 2: Grapes
Consumer 1: Grapes
Consumer 2: Strawberry
Consumer 1: Strawberry

Note: To finish the program, you need to press the Enter key.

As you can see, both the consumers that had opened subscription on the BroadcastChannel received both the values sent into the channel — i.e., Grapes and Strawberry. That is how the broadcast channel simplifies the whole process of broadcasting the values in the channel to all receivers.

Note: You can find the executable version of the above snippet of code in the starter project in the file called BroadcastChannelOpenSubscriptionExample.kt.

Yet, there is one optimization that you can still do. Similar to how there is a consumeEach helper DSL defined for a channel, there is one defined for BroadcastChannel, which subscribes and performs the specified operation for each received item.

Take a look at the implementation of the consumeEach method:

public suspend inline fun <E> BroadcastChannel<E>.consumeEach(action: (E) -> Unit) =
    consume {
        for (element in this) action(element)
    }

Diving deeper into the source code:

public inline fun <E, R> BroadcastChannel<E>.consume(block: ReceiveChannel<E>.() -> R): R {
    val channel = openSubscription()
    try {
        return channel.block()
    } finally {
        channel.cancel()
    }
}

You will notice that consumeEach will call the openSubscription() method. That means we can replace openSubscription() calls with just the consumeEach DSL straight up.

In the last code snippet, simply replace the following lines of code:

kotlinChannel.openSubscription().let { channel ->
        for (value in channel) {
          println("Consumer 1: $value")
        }
        // subscription will be closed
}

With:

kotlinChannel.consumeEach { value ->
   println("Consumer 1: $value")
}

Note: The replacement code snippet shown is only for Consumer 1, you will need to do the same replacement for Consumer 2

Now run the code snippet again. You will see that the results are the same. This is just a more concise and idiomatic Kotlin way of consuming values on a channel.

Note: You can find the updated executable version of the above snippet of code in the starter project in the file called BroadcastChannelExample.kt.

Often times, one of the common use cases is to be able to get, at least, the most recently emitted value on subscription. This is where ConflatedBroadcast channel comes into play and is explained in the next section.

ConflatedBroadcast channel

Like a BroadcastChannel, ConflatedBroadcastChannel enables many subscribed receivers to consume items sent in the channel but it differs in one aspect: a ConflatedBroadcastChannel only emits the most recently sent item while the older items are lost. Also, any future subscribers to this channel will receive the item that was most recently emitted.

fun main() {

  val fruitArray = arrayOf("Apple", "Banana", "Pear", "Grapes",
      "Strawberry")
 
  // 1
  val kotlinChannel = ConflatedBroadcastChannel<String>()

  runBlocking {

    // 2 
    kotlinChannel.apply {
      send(fruitArray[0])
      send(fruitArray[1])
      send(fruitArray[2])
    }

    // 3
    GlobalScope.launch {
      kotlinChannel.consumeEach { value ->
        println("Consumer 1: $value")
      }
    }
    GlobalScope.launch {
      kotlinChannel.consumeEach { value ->
        println("Consumer 2: $value")
      }
    }

  // 4
    kotlinChannel.apply {
      send(fruitArray[3])
      send(fruitArray[4])
    }

    // 5
    println("Press a key to exit...")
    readLine()

    // 6
    kotlinChannel.close()
  }
}

Press a key to exit...
Consumer 2: Strawberry
Consumer 1: Strawberry

ReactiveX vs. BroadcastChannel

Reactive programming uses a similar kind of approach to handle streams of data as the Kotlin coroutine channel. Like channels, reactive programming has observables with data sources that emit items and should be observed. Then you also have the observer, which is basically a consumer of items emitted by the observables. To track the flow of data, observers subscribe to observables, which emit data items and those are then consumed by observers. There can be many observers observing an observable.

implementation "io.reactivex.rxjava2:rxjava:2.2.6"

fun main() {

  val fruitArray = arrayOf("Apple", "Banana", "Pear", "Grapes",
      "Strawberry")
  // 1
  val subject = PublishSubject.create<String>()

  // 2
  subject.apply {
    onNext(fruitArray[0])
    onNext(fruitArray[1])
    onNext(fruitArray[2])
  }

  // 3
  subject.subscribe {
    println("Consumer 1: $it")
  }
  subject.subscribe {
    println("Consumer 2: $it")
  }

  // 4
  subject.apply {
    onNext(fruitArray[3])
    onNext(fruitArray[4])
  }

  // 5
  println("Press a key to exit...")
  readLine()
 
  // 6
  subject.onComplete()
}
Consumer 1: Grapes
Consumer 2: Grapes
Consumer 1: Strawberry
Consumer 2: Strawberry
Press a key to exit...
fun main() {

  val fruitArray = arrayOf("Apple", "Banana", "Pear", "Grapes",
      "Strawberry")
  // 1
  val subject = BehaviorSubject.create<String>()

  // 2
  subject.apply {
    onNext(fruitArray[0])
    onNext(fruitArray[1])
    onNext(fruitArray[2])
  }

  // 3
  subject.subscribe {
    println("Consumer 1: $it")
  }

  subject.subscribe {
    println("Consumer 2: $it")
  }

  // 4
  subject.apply {
    onNext(fruitArray[3])
    onNext(fruitArray[4])
  }

  // 5
  println("Press a key to exit...")
  readLine()

  // 6
  subject.onComplete()
}
Consumer 1: Pear
Consumer 2: Pear
Consumer 1: Grapes
Consumer 2: Grapes
Consumer 1: Strawberry
Consumer 2: Strawberry
Press a key to exit...

Key points

Where to go from here?

Kotlin channels introduce a very simplified approach to handling a stream of data with well set-up constructs to enable better and faster development. This is not all the information about channels because there are operators, which enable various operations on the consumption of results. You will read about those in detail in the next chapter.

Have a technical question? Want to report a bug? You can ask questions and report bugs to the book authors in our official book forum here.
© 2024 Kodeco Inc.

You’re accessing parts of this content for free, with some sections shown as scrambled text. Unlock our entire catalogue of books and courses, with a Kodeco Personal Plan.

Unlock now