Functional Programming in Kotlin by Tutorials

17
Sequence & Flow Written by Massimo Carli

In Chapter 16, “Handling Side Effects”, you learned how to use the IO<T> monad as a special case of the State<S, T> data type. You also learned that Kotlin provides coroutines to handle side effects as pure functions in a more idiomatic way. In this chapter, you’ll learn everything you need to know about the following special Kotlin data types:

  • Sequence<T>
  • Flow<T>

You’ll also have a quick overview of SharedFlow<T> and StateFlow<T>.

You’ll learn how these data types work from a functional programming point of view. In particular, you’ll answer the following questions for each of these:

  • What is the context they provide?
  • Are they a functor?
  • Are they an applicative functor?
  • Are they a monad?

Note: If you don’t know coroutines yet, Kotlin Coroutines by Tutorials is the book for you.

This chapter is a big exercise that helps you take the concepts you’ve learned so far and apply them to the types you use every day in your job. It’s time to have fun!

The Sequence<T> data type

In Chapter 9, “Data Types”, you learned that List<T> is a data type that gets its functor and monad superpowers from the map and flatMap functions. In Chapter 4, “Expression Evaluation, Laziness & More About Functions”, you also learned that laziness is one of the main characteristics of functional programming. To remind you what this means, open ListDataType.kt in this chapter’s material and write the following code:

fun main() {
  listOf(1, 2, 3, 4, 5) // 1
    .filter(filterOdd.logged("filterOdd")) // 2
    .map(double.logged("double")) // 3
}

In this code, you:

  1. Use listOf as a builder for a List<Int> with five elements of type Int.
  2. Invoke filter, passing the reference to the logged version of filterOdd.
  3. Use map to transform the filter’s values using a logged version of double.

Note: filterOdd and double are two very simple functions you find in Util.kt in the lib sub-package. logged is a utility higher-order function that decorates another function with a log message. Take a look at their simple implementation, if you want.
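If you don’t have the chapter’s material at hand, here’s a minimal sketch of what these helpers could look like, together with the same pipeline. The implementations are assumptions inferred from how the functions are used in this section, so the actual code in Util.kt may differ. evenDoubles is a hypothetical wrapper that makes the pipeline reusable.

```kotlin
// Assumed implementations; the book's Util.kt may differ.

// Returns true for even values, so filter() drops the odd ones.
val filterOdd: (Int) -> Boolean = { n -> n % 2 == 0 }

// Doubles its input.
val double: (Int) -> Int = { n -> n * 2 }

// Decorates a function so every invocation prints "name(input) = output".
fun <A, B> ((A) -> B).logged(name: String): (A) -> B = { a ->
  val result = this(a)
  println("$name($a) = $result")
  result
}

// Hypothetical wrapper around the same eager pipeline.
fun evenDoubles(values: List<Int>): List<Int> =
  values
    .filter(filterOdd.logged("filterOdd"))
    .map(double.logged("double"))

fun main() {
  evenDoubles(listOf(1, 2, 3, 4, 5))
}
```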

The interesting fact about the previous code happens when you run it, getting the following output:

filterOdd(1) = false
filterOdd(2) = true
filterOdd(3) = false
filterOdd(4) = true
filterOdd(5) = false
double(2) = 4
double(4) = 8

This happens because, in each line, you:

  1. Create a List<Int> with five elements.
  2. Invoke filter, which returns another List<Int> containing only the even values. It’s crucial to see that filterOdd has been invoked for all the elements of the original List<Int>.
  3. Use map, getting a new List<Int> with the double of the values in the previous List<Int>.

With this code, you basically created three lists without using any of the individual lists’ values. What happens if you don’t really need the values in the List<Int>? In this case, you started with a List<Int> of five elements. What if the list has a lot more elements? What if the elements in the List<T> are infinite?

Well, you don’t have to blame the List<T> data type because its job is to contain an ordered collection of elements of type T. That’s why it’s been created that way. That’s its context, or purpose, if you will. Another way to say it is that List<T> is eager.

If you don’t want to keep all the possible values in a List<T>, Kotlin provides the Sequence<T> data type.

Open SequenceDataType.kt and write the following code:

fun main() {
  sequenceOf(1, 2, 3, 4, 5) // HERE
    .filter(filterOdd.logged("filterOddSeq"))
    .map(double.logged("doubleSeq"))
}

This code differs from the previous one only in its use of sequenceOf instead of listOf. More importantly, if you run the code, you’ll get no output. This is because Sequence<T> is lazy: to actually consume the values in the Sequence<Int> you just created, you need a terminal operator. To see how, add .count() to the end of the method chain. It should now look like this:

fun main() {
  sequenceOf(1, 2, 3, 4, 5)
    .filter(filterOdd.logged("filterOddSeq"))
    .map(double.logged("doubleSeq"))
    .count() // HERE
}

Here, you’re just counting the elements in the sequence and, to do it, you need to consume all of them. This time, running the code, you’ll get the following:

filterOddSeq(1) = false
filterOddSeq(2) = true
doubleSeq(2) = 4
filterOddSeq(3) = false
filterOddSeq(4) = true
doubleSeq(4) = 8
filterOddSeq(5) = false

Note how the order of the log messages is different from the one you got from the List<T>. In that case, each operator read the values from the input List<T>. Now, the chain of operators is called for each value you consume.
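Laziness is also what makes an unbounded source usable, which answers the earlier question about infinite elements. Here’s a minimal sketch using the standard generateSequence and take functions. firstEvenDoubles is a hypothetical name.

```kotlin
// Hypothetical helper: the first `count` even numbers, doubled,
// taken from a source with no upper bound.
fun firstEvenDoubles(count: Int): List<Int> =
  generateSequence(1) { it + 1 } // 1, 2, 3, ... never ends on its own
    .filter { it % 2 == 0 }
    .map { it * 2 }
    .take(count)                 // stops pulling values after `count` results
    .toList()                    // terminal operator

fun main() {
  println(firstEvenDoubles(3)) // [4, 8, 12]
}
```

An eager List<T> can’t express this because every element has to exist before filter runs.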

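Note: If you’re curious and look at the definition of Sequence<T>, you’ll find that it declares the same operator fun iterator() as the Iterable<T> interface. The difference isn’t in the interface itself but in its extension functions, such as filter and map, which return another Sequence<T> and do their work only when the values are consumed.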

This clarifies the context for a Sequence<T> as a container that produces the values it contains only when required. That means it’s lazy. But is Sequence<T> a functor?

Sequence<T> as a functor

Looking at the Sequence<T> documentation, you see the definition of map with the following signature:

public fun <T, R> Sequence<T>.map(transform: (T) -> R): Sequence<R>

fun interface Generator<T> {
  fun generate(n: Int): List<T>
}

fun <T, R> Generator<T>.map(fn: (T) -> R): Generator<R> = object : Generator<R> {
  override fun generate(n: Int): List<R> = this@map.generate(n).map(fn)
}

fun <A, B> funGenerator(bGen: Generator<B>): Generator<Fun<A, B>> =
  bGen.map { b: B -> { b } }

  @Test
  fun `Identity Functor Law for Sequences`() {
    val intToStringFunGenerator =
      funGenerator<Int, String>(StringGenerator(5)) // 1
    val i = { s: String -> s } // 2    
    100.times {  // 3
      val f = intToStringFunGenerator.one() // 4
      val seq = IntGenerator.generate(5).asSequence() // 5
      val list1 = seq.map(f compose i).toList() // 6
      val list2 = seq.map(f).toList() // 7
      Truth.assertThat(list1).isEqualTo(list2) // 8
    }
  }

  @Test
  fun `Composition Functor Law for Sequences`() {
    val intToStringFunGenerator =
      funGenerator<Int, String>(StringGenerator(5))
    val stringToLongFunGenerator =
      funGenerator<String, Long>(LongGenerator) // 1
    100.times {
      val f = intToStringFunGenerator.one()
      val g = stringToLongFunGenerator.one() // 2
      val seq = IntGenerator.generate(5).asSequence()
      val list1 = seq.map(f compose g).toList() // 3
      val list2 = seq.map(f).map(g).toList() // 4
      Truth.assertThat(list1).isEqualTo(list2) // 5
    }
  }
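
The tests above depend on the generators from the chapter’s material. As a rough, self-contained illustration of the same two laws, the sketch below checks them on fixed sample data instead of random values, so it’s a spot check and not a property-based test. The compose helper is an assumption that mirrors the left-to-right order used in the tests.

```kotlin
// Assumed helper: left-to-right composition, matching how the tests use `compose`.
infix fun <A, B, C> ((A) -> B).compose(g: (B) -> C): (A) -> C =
  { a -> g(this(a)) }

// Identity law: mapping the identity function leaves the values unchanged.
fun identityLawHolds(values: List<Int>): Boolean =
  values.asSequence().map { it }.toList() == values

// Composition law: mapping `f compose g` equals mapping f and then g.
fun compositionLawHolds(values: List<Int>): Boolean {
  val f: (Int) -> String = { "n$it" }
  val g: (String) -> Int = { it.length }
  val seq = values.asSequence()
  return seq.map(f compose g).toList() == seq.map(f).map(g).toList()
}

fun main() {
  val sample = listOf(1, 22, 333)
  println(identityLawHolds(sample))    // true
  println(compositionLawHolds(sample)) // true
}
```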

Sequence<T> as an applicative functor

You just proved that the Sequence<T> data type is a functor because of the existing implementation of map. But what about applicative functors? Looking at the Kotlin documentation, you don’t see any higher-order functions like your ap and appl. No problem: you can implement them yourself!

fun <A, B> Sequence<A>.ap(
  fn: Sequence<(A) -> B>
): Sequence<B> = TODO()

fun <A, B> Sequence<A>.ap(fn: Sequence<(A) -> B>): Sequence<B> =
  sequence { // 1
    val iterator = iterator() // 2
    while (iterator.hasNext()) {
      val fnIterator = fn.iterator() // 3
      val item = iterator.next()
      while (fnIterator.hasNext()) {
        yield(fnIterator.next().invoke(item)) // 4
      }
    }
  }

infix fun <A, B> Sequence<(A) -> B>.appl(a: Sequence<A>) = a.ap(this)

fun main() {
  data class User(  // 1
    val id: Int,
    val name: String,
    val email: String
  )

  val userBuilder = ::User.curry() // 2
  val userBuilderSeq = sequenceOf(userBuilder) // 3
  val idSeq = sequenceOf(10, 20, 30) // 4
  val nameSeq = sequenceOf("Minnie", "Donald", "Mickey") // 4
  val emailSeq =
    sequenceOf("aaaaaa@aaaaa.com", "bbbbb@bbbbbb.com") // 4

  val userSeq =
    userBuilderSeq appl idSeq appl nameSeq appl emailSeq // 5

  userSeq.forEach(::println) // 6
}

User(id=10, name=Minnie, email=aaaaaa@aaaaa.com)
User(id=20, name=Minnie, email=aaaaaa@aaaaa.com)
// ...
User(id=20, name=Mickey, email=bbbbb@bbbbbb.com)
User(id=30, name=Mickey, email=bbbbb@bbbbbb.com)
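
To make the evaluation order easier to see, here’s a small sketch that applies two functions to three numbers. It repeats ap in an equivalent for-loop form so it runs on its own. inc and timesTen are hypothetical names.

```kotlin
// Same behavior as the ap above, written with for loops so the example is self-contained.
fun <A, B> Sequence<A>.ap(fn: Sequence<(A) -> B>): Sequence<B> =
  sequence {
    for (item in this@ap) { // outer loop: the values
      for (f in fn) {       // inner loop: the functions
        yield(f(item))
      }
    }
  }

fun main() {
  val inc: (Int) -> Int = { it + 1 }
  val timesTen: (Int) -> Int = { it * 10 }
  val result = sequenceOf(1, 2, 3).ap(sequenceOf(inc, timesTen)).toList()
  println(result) // [2, 10, 3, 20, 4, 30]
}
```

The outer loop runs over the values and the inner loop over the functions. That’s why, in the User output above, the id changes fastest and the email slowest.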

Sequence<T> as a monad

Finally, is Sequence<T> a monad? Yes, because of the flatMap operation the Kotlin APIs provide. Its transform parameter has the shape (T) -> Sequence<R>, like the arrows of a Kleisli category:

fun <T, R> Sequence<T>.flatMap(
  transform: (T) -> Sequence<R>
): Sequence<R>

fun main() {
  // ...
  val seqTo = { n: Int -> (1..n).toList().asSequence() }
  val seqOfSeq = sequenceOf(1, 2, 3, 4, 5).flatMap(seqTo)
  seqOfSeq.forEach { print("$it ") }
}

1 1 2 1 2 3 1 2 3 4 1 2 3 4 5
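
One way to see why flatMap is the more general operation: together with a function that wraps a single value, it’s enough to recover map. Here’s a minimal sketch, where lift and mapWithFlatMap are hypothetical names chosen to avoid clashing with the standard library.

```kotlin
// Wraps a single value into a Sequence.
fun <T> lift(value: T): Sequence<T> = sequenceOf(value)

// map expressed only in terms of flatMap and lift.
fun <A, B> Sequence<A>.mapWithFlatMap(fn: (A) -> B): Sequence<B> =
  flatMap { a -> lift(fn(a)) }

fun main() {
  println(sequenceOf(1, 2, 3).mapWithFlatMap { it * 2 }.toList()) // [2, 4, 6]
}
```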

The Flow<T> data type

In Chapter 16, “Handling Side Effects”, you learned that Kotlin allows you to achieve with suspendable functions what you can do with the IO<T> monad. In this chapter, you’ve already learned how to produce a theoretically infinite sequence of values in a lazy way. If the values you want to generate are the result of a suspendable function, the Flow<T> data type is what you need.
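
As a rough illustration, the sketch below mirrors the lazy sequence idea with a suspendable producer. It assumes the kotlinx.coroutines library is on the classpath. slowSquare and squares are hypothetical names.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical suspendable source of values; delay stands in for real work.
suspend fun slowSquare(n: Int): Int {
  delay(10)
  return n * n
}

// An unbounded flow: nothing runs until a collector asks for values.
fun squares(): Flow<Int> = flow {
  var n = 1
  while (true) {
    emit(slowSquare(n++))
  }
}

fun main() = runBlocking {
  println(squares().take(3).toList()) // [1, 4, 9]
}
```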

Flow<T> as a functor

To prove that Flow<T> is a functor, you could repeat the same process you did for Sequence<T> using property-based testing. In this case, you’ll keep things easier, implementing some practical examples.
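
If you’d like a quick, non-interactive check first, the following sketch verifies the two functor laws for Flow<T> on fixed sample data. It’s a spot check under the assumption that kotlinx.coroutines is available, not a proof. flowFunctorLawsHold is a hypothetical name.

```kotlin
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Spot check of the identity and composition laws on three sample values.
fun flowFunctorLawsHold(): Boolean = runBlocking {
  val f: (Int) -> String = { "n$it" }
  val g: (String) -> Int = { it.length }
  val source = flowOf(1, 22, 333)

  val identity = source.map { it }.toList() == source.toList()
  val composition =
    source.map { g(f(it)) }.toList() ==
      source.map { f(it) }.map { g(it) }.toList()
  identity && composition
}

fun main() {
  println(flowFunctorLawsHold()) // true
}
```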

fun inputStringFlow(question: String = "") = flow { // 1
    val scanner = java.util.Scanner(System.`in`) // 2
    print(question) // 3
    while (scanner.hasNextLine()) { // 4
        val line = scanner.nextLine() // 4
        if (line.isNullOrEmpty()) { // 5
            break
        }
        emit(line) // 6
        print(question) // 3
    }
    scanner.close() // 7
}

fun main() {
  val strLengthFlow = inputStringFlow("Insert a word: ") // 1
    .map { str -> // 2
      str to str.length
    }
  runBlocking { // 3
    strLengthFlow.collect { strInfo -> // 4
      println("${strInfo.first} has length ${strInfo.second}")
    }
  }
}

Figure 17.1: Testing the flow data type

Flow<T> as an applicative functor

To see if the Flow<T> also behaves as an applicative functor, either repeat what you did for the Sequence<T> or just follow along. In FlowDataType.kt, add the following code:

fun <A, B> Flow<A>.ap(fn: Flow<(A) -> B>): Flow<B> = flow { // 1
  collect { a -> // 2
    fn.collect { f -> // 3
      emit(f(a)) // 4
    }
  }
}

infix fun <A, B> Flow<(A) -> B>.appl(
  a: Flow<A>
) = a.ap(this) // 5

fun main() {
  val userBuilder = { id: Int ->
    { name: String ->
      { email: String -> User(id, name, email) }
    }
  }

  val userBuilderFlow = flowOf(userBuilder)
  val idFlow = listOf(10, 20, 30).asFlow()
  val nameFlow = listOf("Pippo", "Pippo2", "Pippo3").asFlow()
  val emailFlow = listOf(
    "pippo@pippo.com", "pippo2@pippo.com", "pippo3@pippo.com"
  ).asFlow()

  val userFlow =
    userBuilderFlow appl idFlow appl nameFlow appl emailFlow
  runBlocking {
    userFlow.collect(::println)
  }
}

User(id=10, name=Pippo, email=pippo@pippo.com)
User(id=20, name=Pippo, email=pippo@pippo.com)
// ...
User(id=20, name=Pippo3, email=pippo3@pippo.com)
User(id=30, name=Pippo3, email=pippo3@pippo.com)
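
The same ordering applies here as for sequences. The sketch below repeats ap so it runs on its own and applies two hypothetical functions, inc and timesTen, to three numbers. It assumes a recent kotlinx.coroutines version.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Same ap as above, repeated so the example is self-contained.
fun <A, B> Flow<A>.ap(fn: Flow<(A) -> B>): Flow<B> = flow {
  this@ap.collect { a ->
    fn.collect { f -> // fn is collected again for every value a
      emit(f(a))
    }
  }
}

// Hypothetical helper that runs the example and returns the emitted values.
fun appliedValues(): List<Int> = runBlocking {
  val inc: (Int) -> Int = { it + 1 }
  val timesTen: (Int) -> Int = { it * 10 }
  flowOf(1, 2, 3).ap(flowOf(inc, timesTen)).toList()
}

fun main() {
  println(appliedValues()) // [2, 10, 3, 20, 4, 30]
}
```

Because these flows are cold, fn restarts from the beginning for every value. That’s harmless with flowOf or asFlow, but with a source like inputStringFlow, each new collection would prompt the user again.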

Flow<T> as a monad

To answer the last question, you’ll implement a more complex example using some of the code you already implemented in Chapter 14, “Error Handling With Functional Programming”, that you can find in the material for this project. You basically want to use inputStringFlow to allow a user to insert some text to search for in the TV show database using the TVmaze API. Now, you’re in the world of coroutines, so you should use their power. It’s time for an interesting exercise to improve your functional thinking.

fun doSomeWork(name: String): Int = 10
suspend fun doSomeBgWork(
  ctx: CoroutineContext,
  name: String
): Int = withContext(ctx) {
    doSomeWork(name)
}

fun main() {
  doSomeBgWork.curry()
}

Figure 17.2: Unresolved reference: curry

typealias SuspendFun<A, B> = suspend (A) -> B // 1
typealias SuspendFun2<A, B, C> = suspend (A, B) -> C // 2
typealias SuspendChain2<A, B, C> =
  suspend (A) -> suspend (B) -> C // 3

fun <A, B, C> SuspendFun2<A, B, C>.curry(): SuspendChain2<A, B, C> =
  { a: A ->
    { b: B ->
      this(a, b)
    }
  }

Figure 17.3: Curry for suspendable function
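
Here’s a minimal sketch of how the curried version could be used. It repeats the type aliases and curry so it runs on its own, and it assumes kotlinx.coroutines is available. repeatSlowly is a hypothetical two-parameter suspendable function.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

typealias SuspendFun2<A, B, C> = suspend (A, B) -> C
typealias SuspendChain2<A, B, C> = suspend (A) -> suspend (B) -> C

fun <A, B, C> SuspendFun2<A, B, C>.curry(): SuspendChain2<A, B, C> =
  { a: A ->
    { b: B ->
      this(a, b)
    }
  }

// Hypothetical suspendable function; delay stands in for real work.
val repeatSlowly: suspend (Int, String) -> String = { times, text ->
  delay(10)
  text.repeat(times)
}

fun main() = runBlocking {
  // Partial application: fixes the first parameter and returns suspend (String) -> String.
  val repeatTwice = repeatSlowly.curry()(2)
  println(repeatTwice("ab")) // abab
}
```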

CoroutineContext as a state

To relate the State<S, T> monad to what you saw about suspendable functions, look again at doSomeBgWork, which you wrote in Basic.kt:

suspend fun doSomeBgWork(ctx: CoroutineContext, name: String): Int =
  withContext(ctx) {
    doSomeWork(name)
  }

suspend fun doSomeMoreBgWork(
  ctx: CoroutineContext,
  name: String
): Pair<CoroutineContext, Int> = withContext(ctx) {
  ctx to doSomeWork(name)
}

typealias SuspendStateTransformer<S, T> =
  suspend (S) -> Pair<S, T>

data class SuspendableState<S, T>(
  val sst: SuspendStateTransformer<S, T>
) {

  companion object {
    @JvmStatic
    fun <S, T> lift(
      value: T
    ): SuspendableState<S, T> =
      SuspendableState { state -> state to value }
  }
}

fun <S, A, B> SuspendableState<S, A>.map(
  fn: SuspendFun<A, B>
): SuspendableState<S, B> =
  SuspendableState { s0: S ->
    val (s1, a) = this.sst(s0)
    s1 to fn(a)
  }

fun <S, A, B> SuspendableState<S, A>.flatMap(
  fn: suspend (A) -> SuspendableState<S, B>
): SuspendableState<S, B> =
  SuspendableState { s0: S ->
    val (s1, a) = this.sst(s0)
    fn(a).sst(s1)
  }
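
To see how map and flatMap thread the state, here’s a minimal sketch that uses an Int counter as the state instead of a CoroutineContext. It repeats the definitions above so it runs on its own. nextId and twoIds are hypothetical names.

```kotlin
import kotlinx.coroutines.runBlocking

typealias SuspendStateTransformer<S, T> = suspend (S) -> Pair<S, T>

data class SuspendableState<S, T>(val sst: SuspendStateTransformer<S, T>)

fun <S, A, B> SuspendableState<S, A>.map(
  fn: suspend (A) -> B
): SuspendableState<S, B> =
  SuspendableState { s0: S ->
    val (s1, a) = this.sst(s0)
    s1 to fn(a)
  }

fun <S, A, B> SuspendableState<S, A>.flatMap(
  fn: suspend (A) -> SuspendableState<S, B>
): SuspendableState<S, B> =
  SuspendableState { s0: S ->
    val (s1, a) = this.sst(s0)
    fn(a).sst(s1)
  }

// Hypothetical example: the state is a counter that hands out increasing ids.
val nextId = SuspendableState<Int, Int> { counter -> (counter + 1) to counter }

// Asks for two ids in a row; flatMap passes the updated counter to the second request.
val twoIds: SuspendableState<Int, String> =
  nextId.flatMap { first ->
    nextId.map { second -> "id-$first, id-$second" }
  }

fun main() = runBlocking {
  println(twoIds.sst(0)) // (2, id-0, id-1)
}
```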

Back to the TV show

In the previous section, you created the SuspendableState<S, T> data type and implemented lift, map and flatMap. How can you use these for getting data about a TV show? In the tools sub-package in this chapter’s material, you find TvShowFetcher and TvShowParser for, respectively, fetching and parsing data using the TVmaze API.

suspend fun fetchTvShowResult( // 1
  ctx: CoroutineContext,
  query: String
): Result<String> = // 2
  withContext(ctx) { // 3
    try {
      Result.success(TvShowFetcher.fetch(query)) // 4
    } catch (ioe: IOException) {
      Result.failure(ioe) // 5
    }
  }

suspend fun parseTvShowResult(
  ctx: CoroutineContext,
  json: String
): Result<List<ScoredShow>> =
  withContext(ctx) {
    try {
      Result.success(TvShowParser.parse(json))
    } catch (e: Exception) {
      Result.failure(e)
    }
  }

val fetchSuspend: (String) -> SuspendableState<
  CoroutineContext, Result<String>> = { query ->
    SuspendableState { ctx: CoroutineContext ->
      ctx to fetchTvShowResult(ctx, query)
    }
  }

val parseSuspend: (String) -> SuspendableState<
  CoroutineContext, Result<List<ScoredShow>>> = { json ->
    SuspendableState { ctx: CoroutineContext ->
      ctx to parseTvShowResult(ctx, json)
    }
  }

Composing SuspendableState<CoroutineContext, Result<T>>

Implementing composition is now simpler than it seems. Open SuspendableStateResult.kt, and add the following code:

typealias SuspendStateResultTransformer<S, T> =
  suspend (S) -> Pair<S, Result<T>> // 1

data class SuspendableStateResult<S, T>( // 2
  val sst: SuspendStateResultTransformer<S, T>
) {

  companion object {
    @JvmStatic
    fun <S, T> lift( // 3
      value: T
    ): SuspendableStateResult<S, T> =
      SuspendableStateResult { state ->
        state to Result.success(value)
      }
  }
}

fun <S, A, B> SuspendableStateResult<S, A>.map( // 4
  fn: SuspendFun<A, B>
): SuspendableStateResult<S, B> =
  SuspendableStateResult { s0: S ->
    val (s1, a) = this.sst(s0)
    s1 to a.fold(
      onSuccess = { Result.success(fn(it)) },
      onFailure = { Result.failure(it) }
    )
  }

fun <S, A, B> SuspendableStateResult<S, A>.flatMap( // 5
  fn: suspend (A) -> SuspendableStateResult<S, B>
): SuspendableStateResult<S, B> = SuspendableStateResult { s0 ->
  val (s1, res) = sst(s0)
  res.fold(onSuccess = { a: A ->
    fn(a).sst(s1)
  }, onFailure = { throwable ->
    s1 to Result.failure(throwable)
  })
}
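
The point of this flatMap is that a failure stops the chain while the state keeps flowing. Here’s a minimal sketch of that behavior, with the type and flatMap repeated so it runs on its own. parseInt, half and pipeline are hypothetical, and the Int state simply counts how many steps ran.

```kotlin
import kotlinx.coroutines.runBlocking

typealias SuspendStateResultTransformer<S, T> = suspend (S) -> Pair<S, Result<T>>

data class SuspendableStateResult<S, T>(
  val sst: SuspendStateResultTransformer<S, T>
)

fun <S, A, B> SuspendableStateResult<S, A>.flatMap(
  fn: suspend (A) -> SuspendableStateResult<S, B>
): SuspendableStateResult<S, B> = SuspendableStateResult { s0 ->
  val (s1, res) = sst(s0)
  res.fold(
    onSuccess = { a: A -> fn(a).sst(s1) },
    onFailure = { throwable -> s1 to Result.failure(throwable) }
  )
}

// Hypothetical step: fails with NumberFormatException for non-numeric text.
fun parseInt(text: String) = SuspendableStateResult<Int, Int> { steps ->
  (steps + 1) to runCatching { text.toInt() }
}

// Hypothetical step that always succeeds.
fun half(n: Int) = SuspendableStateResult<Int, Int> { steps ->
  (steps + 1) to Result.success(n / 2)
}

fun pipeline(text: String): SuspendableStateResult<Int, Int> =
  parseInt(text).flatMap { n -> half(n) }

fun main() = runBlocking {
  val (okSteps, ok) = pipeline("42").sst(0)
  println("$okSteps ${ok.getOrNull()}")       // 2 21
  val (failedSteps, failed) = pipeline("oops").sst(0)
  println("$failedSteps ${failed.isFailure}") // 1 true
}
```

When parsing fails, half never runs, so the counter stops at 1.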

Finally flatMap

It’s finally time to put everything together so you can access the TVmaze database. Open ShowSearchService.kt, and add the following code:

val fetchSuspendResult: (String) -> SuspendableStateResult<
  CoroutineContext, String> = { query ->
    SuspendableStateResult { ctx: CoroutineContext ->
      ctx to fetchTvShowResult(ctx, query)
    }
  }

val parseSuspendResult: (String) -> SuspendableStateResult<
  CoroutineContext, List<ScoredShow>> = { json ->
    SuspendableStateResult { ctx: CoroutineContext ->
      ctx to parseTvShowResult(ctx, json)
    }
  }

@OptIn(FlowPreview::class) // 1
suspend fun searchTvShow(ctx: CoroutineContext) = // 2
  withContext(ctx) {
    inputStringFlow("Search Your Show: ") // 3
      .flatMapConcat { query ->  // 4
        fetchSuspendResult(query)
          .flatMap(parseSuspendResult).sst(ctx) // 5
          .second.fold(
            onSuccess = { it.asFlow() }, // 6
            onFailure = { emptyFlow() }) // 7
      }
}

@OptIn(FlowPreview::class)
fun main() {
  runBlocking { // 1
    searchTvShow(Dispatchers.IO) // 2
      .collect { // 3
        println("Score: ${it.score}  " +
          "Name: ${it.show.name} " +
          "Genres: ${it.show.genres}") // 4
        println(it.show.summary)
        println("--------------------------")
      }
  }
}

Figure 17.4: Querying the TVmaze API
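
If you want to try the flatMapConcat pattern without a network connection, the sketch below replaces fetching and parsing with a hypothetical search function that fails for blank queries. searchAll is also a hypothetical name, and the example assumes kotlinx.coroutines is available.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.emptyFlow
import kotlinx.coroutines.flow.flatMapConcat
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for fetch + parse: fails for blank queries.
fun search(query: String): Result<List<String>> =
  if (query.isBlank()) Result.failure(IllegalArgumentException("blank query"))
  else Result.success(listOf("$query 1", "$query 2"))

// The required opt-in annotation differs between kotlinx.coroutines versions,
// so both are listed here.
@OptIn(FlowPreview::class, ExperimentalCoroutinesApi::class)
fun searchAll(queries: Flow<String>): Flow<String> =
  queries.flatMapConcat { query ->
    search(query).fold(
      onSuccess = { it.asFlow() },            // one emission per result
      onFailure = { emptyFlow<String>() }     // failures contribute nothing
    )
  }

fun main() = runBlocking {
  println(searchAll(flowOf("lost", " ", "dark")).toList())
  // [lost 1, lost 2, dark 1, dark 2]
}
```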

The SharedFlow<T> & StateFlow<T> data types

SharedFlow<T> and StateFlow<T> are two additional flavors the coroutines API provides for flows. In terms of data types and the functions they provide, you can think of SharedFlow<T> and StateFlow<T> as implementations of Flow<T> with specific behavior when collected by multiple collectors.
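
Because both are Flow<T> subtypes, the usual operators apply to them. Here’s a minimal sketch with StateFlow<T>, assuming kotlinx.coroutines is available. doubledCurrent is a hypothetical helper. Note that map returns a regular Flow<Int>, not a StateFlow<Int>.

```kotlin
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.runBlocking

// Hypothetical helper: maps over a StateFlow and reads the first mapped value,
// which comes from the state's current value.
fun doubledCurrent(state: StateFlow<Int>): Int = runBlocking {
  state.map { it * 2 }.first()
}

fun main() {
  val counter = MutableStateFlow(1) // a StateFlow always holds a current value
  println(doubledCurrent(counter))  // 2
  counter.value = 5
  println(doubledCurrent(counter))  // 10
}
```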

Key points

  • The List<T> data type allows you to store an ordered collection of elements of type T in an eager way.
  • All the elements of a List<T>, which is read-only, are present at the moment you create it.
  • The List<T> data type is a functor and monad because of the presence of map and flatMap. You can also make it an applicative functor by implementing ap.
  • The Sequence<T> data type allows you to generate a sequence of values of type T in a lazy way.
  • In a Sequence<T>, map and flatMap are invoked only when the values need to be collected and consumed.
  • A Sequence<T> can work as a functor, applicative functor and monad.
  • The Flow<T> data type is similar to Sequence<T> but in the context of a coroutine.
  • Suspendable functions are an idiomatic and powerful tool to handle side effects in Kotlin.
  • A Flow<T> allows you to generate a sequence, or flow, of values of type T that can be generated from suspendable functions.
  • You can implement curry and composition for suspendable functions as you did for non-suspendable ones, just following the functional programming principles you learned in the previous chapters.
  • You can repeat for SharedFlow<T> and StateFlow<T> the same process you followed for a Flow<T>.

Where to go from here?

Congratulations! In this chapter, you had the opportunity to apply concepts you learned in the previous chapters in a concrete example that allowed you to fetch information about your favorite TV shows. You’ve learned how to create a Sequence<T> and how to use Flow<T> in an environment of concurrency. Finally, you’ve empowered your functional thinking by implementing abstractions for composing suspendable functions, returning a Result<T> monad. It’s been a lot of work and also a lot of fun!

© 2022 Razeware LLC
