Functional Programming in Kotlin by Tutorials

First Edition · Android 12 · Kotlin 1.6 · IntelliJ IDEA 2022

Before You Begin

Section 0: 5 chapters

Section II: Data Types & Typeclasses

Section 2: 5 chapters

17. Sequence & Flow Written by Massimo Carli

Heads up... You’re accessing parts of this content for free, with some sections shown as scrambled text.

Heads up... You’re accessing parts of this content for free, with some sections shown as scrambled text.

Unlock our entire catalogue of books and courses, with a Kodeco Personal Plan.

Unlock now

In Chapter 16, “Handling Side Effects”, you learned how to use the `IO<T>` monad as a special case of the `State<S, T>` data type. You also learned that Kotlin provides coroutines to handle side effects as pure functions in a more idiomatic way. In this chapter, you’ll learn everything you need to know about the following special Kotlin data types:

• `Sequence<T>`
• `Flow<T>`

You’ll also have a quick overview of `SharedFlow<T>` and `StateFlow<T>`.

You’ll learn how these data types work from a functional programming point of view. In particular, you’ll answer the following questions for each of these:

• What is the context they provide?
• Are they a functor?
• Are they an applicative functor?

Note: If you don’t know coroutines yet, Kotlin Coroutines by Tutorials is the book for you.

This chapter is a big exercise that helps you take the concepts you’ve learned so far and apply them to the types you use every day in your job. It’s time to have fun!

The Sequence<T> data type

In Chapter 9, “Data Types”, you learned that `List<T>` is a data type with the functor and monad superpowers with the `map` and `flatMap` functions. In Chapter 4, “Expression Evaluation, Laziness & More About Functions”, you also learned that laziness is one of the main characteristics of functional programming. To remind you what this means, open ListDataType.kt in this chapter’s material and write the following code:

``````fun main() {
listOf(1, 2, 3, 4, 5) // 1
.filter(filterOdd.logged("filterOdd")) // 2
.map(double.logged("double")) // 3
}
``````

In this code, you:

1. Use `listOf` as a builder for a `List<Int>` with five elements of type `Int`.
2. Invoke `filter`, passing the reference to the logged version of `filterOdd`.
3. Use `map` to transform the filter’s values using a logged version of `double`.

Note: `filterOdd` and `double` are two very simple functions you find in Util.kt in the lib sub-package. `logged` is a utility higher-order function that decorates another function with a log message. Take a look at their simple implementation, if you want.

The interesting fact about the previous code happens when you run it, getting the following output:

``````filterOdd(1) = false
filterOdd(2) = true
filterOdd(3) = false
filterOdd(4) = true
filterOdd(5) = false
double(2) = 4
double(4) = 8
``````

This happens because, in each line, you:

1. Create a `List<Int>` with five elements.
2. Invoke `filter`, which returns another `List<Int>` containing only the even values. It’s crucial to see that `filterOdd` has been invoked for all the elements of the original `List<Int>`.
3. Use `map`, getting a new `List<Int>` with the double of the values in the previous `List<Int>`.

With this code, you basically created three lists without using any of the individual lists’ values. What happens if you don’t really need the values in the `List<Int>`? In this case, you started with a `List<Int>` of five elements. What if the list has a lot more elements? What if the elements in the `List<T>` are infinite?

Well, you don’t have to blame the `List<T>` data type because its job is to contain an ordered collection of elements of type `T`. That’s why it’s been created that way. That’s its context, or purpose, if you will. Another way to say it is that `List<T>` is eager.

If you don’t want to keep all the possible values in a `List<T>`, Kotlin provides the `Sequence<T>` data type.

Open SequenceDataType.kt and write the following code:

``````fun main() {
sequenceOf(1, 2, 3, 4, 5) // HERE
.filter(filterOdd.logged("filterOddSeq"))
.map(double.logged("doubleSeq"))
}
``````

This code differs from the previous one because of the use of `sequenceOf` instead of `listOf`. More importantly, if you run the code, you’ll get nothing as output. This is because `Sequence<T>` is lazy. If you want to actually consume the values in the `Sequence<Int>` you just created, you need to consume them using a terminal operator. To see how, add `.count()` to the end of the method chain. It should now look like this:

``````fun main() {
sequenceOf(1, 2, 3, 4, 5)
.filter(filterOdd.logged("filterOddSeq"))
.map(double.logged("doubleSeq"))
.count() // HERE
}
``````

Here, you’re just counting the elements in the sequence and, to do it, you need to consume all of them. This time, running the code, you’ll get the following:

``````filterOddSeq(1) = false
filterOddSeq(2) = true
doubleSeq(2) = 4
filterOddSeq(3) = false
filterOddSeq(4) = true
doubleSeq(4) = 8
filterOddSeq(5) = false
``````

Note how the order of the log messages is different from the one you got from the `List<T>`. In that case, each operator read the values from the input `List<T>`. Now, the chain of operators is called for each value you consume.

Note: If you’re curious and want to look at the definition of `Sequence<T>`, you’ll find that it differs from the `Iterable<T>` interface in the use of the `operator` keyword, which allows its use in an enhanced form.

This clarifies the context for a `Sequence<T>` as a container that produces the values it contains only when required. That means it’s lazy. But is `Sequence<T>` a functor?

Sequence<T> as a functor

Looking at the `Sequence<T>` documentation, you see the definition of `map` with the following signature:

``````public fun <T, R> Sequence<T>.map(transform: (T) -> R): Sequence<R>
``````
``````fun interface Generator<T> {
fun generate(n: Int): List<T>
}
``````
``````fun <T, R> Generator<T>.map(fn: (T) -> R): Generator<R> = object : Generator<R> {
override fun generate(n: Int): List<R> = this@map.generate(n).map(fn)
}
``````
``````fun <A, B> funGenerator(bGen: Generator<B>): Generator<Fun<A, B>> =
bGen.map { b: B -> { b } }
``````
``````  @Test
fun `Identity Functor Law for Sequences`() {
val intToStringFunGenerator =
funGenerator<Int, String>(StringGenerator(5)) // 1
val i = { s: String -> s } // 2
100.times {  // 3
val f = intToStringFunGenerator.one() // 4
val seq = IntGenerator.generate(5).asSequence() // 5
val list1 = seq.map(f compose i).toList() // 6
val list2 = seq.map(f).toList() // 7
Truth.assertThat(list1).isEqualTo(list2) // 8
}
}
``````
``````  @Test
fun `Composition Functor Law for Sequences`() {
val intToStringFunGenerator =
funGenerator<Int, String>(StringGenerator(5))
val stringToLongFunGenerator =
funGenerator<String, Long>(LongGenerator) // 1
100.times {
val f = intToStringFunGenerator.one()
val g = stringToLongFunGenerator.one() // 2
val seq = IntGenerator.generate(5).asSequence()
val list1 = seq.map(f compose g).toList() // 3
val list2 = seq.map(f).map(g).toList() // 4
Truth.assertThat(list1).isEqualTo(list2) // 5
}
}
``````

Sequence<T> as an applicative functor

You just proved that the `Sequence<T>` data type is a functor because of the existing implementation of `map`. But what about applicative functors? Looking at the Kotlin documentation, you don’t see any higher-order functions like your `ap` and `app`. No problem — you can do this!

``````fun <A, B> Sequence<A>.ap(
fn: Sequence<(A) -> B>
): Sequence<B> = TODO()
``````
``````fun <A, B> Sequence<A>.ap(fn: Sequence<(A) -> B>): Sequence<B> =
sequence { // 1
val iterator = iterator() // 2
while (iterator.hasNext()) {
val fnIterator = fn.iterator() // 3
val item = iterator.next()
while (fnIterator.hasNext()) {
yield(fnIterator.next().invoke(item)) // 4
}
}
}
``````
``````infix fun <A, B> Sequence<(A) -> B>.appl(a: Sequence<A>) = a.ap(this)
``````
``````fun main() {
data class User(  // 1
val id: Int,
val name: String,
val email: String
)

val userBuilder = ::User.curry() // 2
val userBuilderSeq = sequenceOf(userBuilder) // 3
val idSeq = sequenceOf(10, 20, 30) // 4
val nameSeq = sequenceOf("Minnie", "Donald", "Mickey") // 4
val emailSeq =
sequenceOf("aaaaaa@aaaaa.com", "bbbbb@bbbbbb.com") // 4

val userSeq =
userBuilderSeq appl idSeq appl nameSeq appl emailSeq // 5

userSeq.forEach(::println) // 6
}
``````
``````User(id=10, name=Minnie, email=aaaaaa@aaaaa.com)
User(id=20, name=Minnie, email=aaaaaa@aaaaa.com)
// ...
User(id=20, name=Mickey, email=bbbbb@bbbbbb.com)
User(id=30, name=Mickey, email=bbbbb@bbbbbb.com)
``````

Is `Sequence<T>` finally a monad? Of course it is, because of the `flatMap` operation that Kotlin APIs provide with the following signature, similar to the Kleisli category:

``````fun <T, R> Sequence<T>.flatMap(
transform: (T) -> Sequence<R>
): Sequence<R>
``````
``````fun main() {
// ...
val seqTo = { n: Int -> (1..n).toList().asSequence() }
val seqOfSeq = sequenceOf(1, 2, 3, 4, 5).flatMap(seqTo)
seqOfSeq.forEach { print("\$it ") }
}
``````
``````1 1 2 1 2 3 1 2 3 4 1 2 3 4 5
``````

The Flow<T> data type

In Chapter 16, “Handling Side Effects”, you learned that Kotlin allows you to achieve with suspendable functions what you can do with the `IO<T>` monad. In this chapter, you’ve already learned how to produce a theoretically infinite sequence of values in a lazy way. If the values you want to generate are the result of a suspendable function, the `Flow<T>` data type is what you need.

Flow<T> as a functor

To prove that `Flow<T>` is a functor, you could repeat the same process you did for `Sequence<T>` using property-based testing. In this case, you’ll keep things easier, implementing some practical examples.

``````fun inputStringFlow(question: String = "") = flow { // 1
val scanner = java.util.Scanner(System.`in`) // 2
print(question) // 3
while (scanner.hasNextLine()) { // 4
val line = scanner.nextLine() // 4
if (line.isNullOrEmpty()) { // 5
break
}
emit(line) // 6
print(question) // 3
}
scanner.close() // 7
}
``````
``````fun main() {
val strLengthFlow = inputStringFlow("Insert a word: ") // 1
.map { str -> // 2
str to str.length
}
runBlocking { // 3
strLengthFlow.collect { strInfo -> // 4
println("\${strInfo.first} has length \${strInfo.second}")
}
}
}
``````

Flow<T> as an applicative functor

To see if the `Flow<T>` also behaves as an applicative functor, either repeat what you did for the `Sequence<T>` or just follow along. In FlowDataType.kt, add the following code:

``````fun <A, B> Flow<A>.ap(fn: Flow<(A) -> B>): Flow<B> = flow { // 1
collect { a -> // 2
fn.collect { f -> // 3
emit(f(a)) // 4
}
}
}

infix fun <A, B> Flow<(A) -> B>.appl(
a: Flow<A>
) = a.ap(this) // 5
``````
``````fun main() {
val userBuilder = { id: Int ->
{ name: String ->
{ email: String -> User(id, name, email) }
}
}

val userBuilderFlow = flowOf(userBuilder)
val idFlow = listOf(10, 20, 30).asFlow()
val nameFlow = listOf("Pippo", "Pippo2", "Pippo3").asFlow()
val emailFlow = listOf(
"pippo@pippo.com", "pippo2@pippo.com", "pippo3@pippo.com"
).asFlow()

val userFlow =
userBuilderFlow appl idFlow appl nameFlow appl emailFlow
runBlocking {
userFlow.collect(::println)
}
}
``````
``````User(id=10, name=Pippo, email=pippo@pippo.com)
User(id=20, name=Pippo, email=pippo@pippo.com)
// ...
User(id=20, name=Pippo3, email=pippo3@pippo.com)
User(id=30, name=Pippo3, email=pippo3@pippo.com)
``````

To answer the last question, you’ll implement a more complex example using some of the code you already implemented in Chapter 14, “Error Handling With Functional Programming”, that you can find in the material for this project. You basically want to use `inputStringFlow` to allow a user to insert some text to search for in the TV show database using the TVmaze API. Now, you’re in the world of coroutines, so you should use their power. It’s time for an interesting exercise to improve your functional thinking.

``````fun doSomeWork(name: String): Int = 10
``````
``````suspend fun doSomeBgWork(
ctx: CoroutineContext,
name: String
): Int = withContext(ctx) {
doSomeWork(name)
}
``````
``````fun main() {
doSomeBgWork.curry()
}
``````

``````typealias SuspendFun<A, B> = suspend (A) -> B // 1
typealias SuspendFun2<A, B, C> = suspend (A, B) -> C // 2
typealias SuspendChain2<A, B, C> =
suspend (A) -> suspend (B) -> C // 3
``````
``````fun <A, B, C> SuspendFun2<A, B, C>.curry(): SuspendChain2<A, B, C> =
{ a: A ->
{ b: B ->
this(a, b)
}
}
``````

CoroutineContext as a state

To relate the `State<S, T>` monad to what you saw about suspendable functions, look again at `doSomeBgWork`, which you wrote in Basic.kt:

``````suspend fun doSomeBgWork(ctx: CoroutineContext, name: String): Int =
withContext(ctx) {
doSomeWork(name)
}
``````
``````suspend fun doSomeMoreBgWork(
ctx: CoroutineContext,
name: String
): Pair<CoroutineContext, Int> = withContext(ctx) {
ctx to doSomeWork(name)
}
``````
``````typealias SuspendStateTransformer<S, T> =
suspend (S) -> Pair<S, T>
``````
``````data class SuspendableState<S, T>(
val sst: SuspendStateTransformer<S, T>
) {

companion object {
@JvmStatic
fun <S, T> lift(
value: T
): SuspendableState<S, T> =
SuspendableState { state -> state to value }
}
}
``````
``````fun <S, A, B> SuspendableState<S, A>.map(
fn: SuspendFun<A, B>
): SuspendableState<S, B> =
SuspendableState { s0: S ->
val (s1, a) = this.sst(s0)
s1 to fn(a)
}
``````
``````fun <S, A, B> SuspendableState<S, A>.flatMap(
fn: suspend (A) -> SuspendableState<S, B>
): SuspendableState<S, B> =
SuspendableState { s0: S ->
val (s1, a) = this.sst(s0)
fn(a).sst(s1)
}
``````

Back to the TV show

In the previous section, you created the `SuspendableState<S, T>` data type and implemented `lift`, `map` and `flatMap`. How can you use these for getting data about a TV show? In the tools sub-package in this chapter’s material, you find `TvShowFetcher` and `TvShowParser` for, respectively, fetching and parsing data using the TVmaze API.

``````suspend fun fetchTvShowResult( // 1
ctx: CoroutineContext,
query: String
): Result<String> = // 2
withContext(ctx) { // 3
try {
Result.success(TvShowFetcher.fetch(query)) // 4
} catch (ioe: IOException) {
Result.failure(ioe) // 5
}
}
``````
``````suspend fun parseTvShowResult(
ctx: CoroutineContext,
json: String
): Result<List<ScoredShow>> =
withContext(ctx) {
try {
Result.success(TvShowParser.parse(json))
} catch (e: Exception) {
Result.failure(e)
}
}
``````
``````val fetchSuspend: (String) -> SuspendableState<
CoroutineContext, Result<String>> = { query ->
SuspendableState { ctx: CoroutineContext ->
ctx to fetchTvShowResult(ctx, query)
}
}

val parseSuspend: (String) -> SuspendableState<
CoroutineContext, Result<List<ScoredShow>>> = { json ->
SuspendableState { ctx: CoroutineContext ->
ctx to parseTvShowResult(ctx, json)
}
}
``````

Composing SuspendableState<CoroutineContext, Result<T>>

To implement composition now is simpler than it seems. Open SuspendableStateResult.kt, and add the following code:

``````typealias SuspendStateResultTransformer<S, T> =
suspend (S) -> Pair<S, Result<T>> // 1

data class SuspendableStateResult<S, T>( // 2
val sst: SuspendStateResultTransformer<S, T>
) {

companion object {
@JvmStatic
fun <S, T> lift( // 3
value: T
): SuspendableStateResult<S, T> =
SuspendableStateResult { state ->
state to Result.success(value)
}
}
}

fun <S, A, B> SuspendableStateResult<S, A>.map( // 4
fn: SuspendFun<A, B>
): SuspendableStateResult<S, B> =
SuspendableStateResult { s0: S ->
val (s1, a) = this.sst(s0)
s1 to a.fold(
onSuccess = { Result.success(fn(it)) },
onFailure = { Result.failure(it) }
)
}

fun <S, A, B> SuspendableStateResult<S, A>.flatMap( // 5
fn: suspend (A) -> SuspendableStateResult<S, B>
): SuspendableStateResult<S, B> = SuspendableStateResult { s0 ->
val (s1, res) = sst(s0)
res.fold(onSuccess = { a: A ->
fn(a).sst(s1)
}, onFailure = { thowable ->
s1 to Result.failure(thowable)
})
}
``````

Finally flatMap

It’s finally time to put everything together so you can access the TVmaze database. Open ShowSearchService.kt, and add the following code:

``````val fetchSuspendResult: (String) -> SuspendableStateResult<
CoroutineContext, String> = { query ->
SuspendableStateResult { ctx: CoroutineContext ->
ctx to fetchTvShowResult(ctx, query)
}
}

val parseSuspendResult: (String) -> SuspendableStateResult<
CoroutineContext, List<ScoredShow>> = { json ->
SuspendableStateResult { ctx: CoroutineContext ->
ctx to parseTvShowResult(ctx, json)
}
}
``````
``````@OptIn(FlowPreview::class) // 1
suspend fun searchTvShow(ctx: CoroutineContext) = // 2
withContext(ctx) {
inputStringFlow("Search Your Show: ") // 3
.flatMapConcat { query ->  // 4
fetchSuspendResult(query)
.flatMap(parseSuspendResult).sst(ctx) // 5
.second.fold(
onSuccess = { it.asFlow() }, // 6
onFailure = { emptyFlow() }) // 7
}
}
``````
``````@OptIn(FlowPreview::class)
fun main() {
runBlocking { // 1
searchTvShow(Dispatchers.IO) // 2
.collect { // 3
println("Score: \${it.score}  " +
"Name: \${it.show.name} " +
"Genres: \${it.show.genres}") // 4
println(it.show.summary)
println("--------------------------")
}
}
}
``````

The SharedFlow<T> & StateFlow<T> data types

`SharedFlow<T>` and `StateFlow<T>` are two additional flavors the coroutines API provides for flows. In terms of data types and the functions they provide, you can think of `SharedState<T>` and `StateFlow<T>` as implementations of `Flow<T>` with specific behavior when collected by multiple collectors.

Key points

• The `List<T>` data type allows you to store an ordered collection of elements of type `T` in an eager way.
• All the elements of a `List<T>`, which is immutable, are present at the moment you create it.
• The `List<T>` data type is a functor and monad because of the presence of `map` and `flatMap`. You can also make it an applicative functor by implementing `ap`.
• The `Sequence<T>` data type allows you to generate a sequence of values of type `T` in a lazy way.
• In a `Sequence<T>`, `map` and `flatMapConcat` are invoked when the values need to be collected and consumed.
• A `Sequence<T>` can work as a functor, applicative functor and monad.
• The `Flow<T>` data type is similar to `Sequence<T>` but in the context of a coroutine.
• Suspendable functions are an idiomatic and powerful tool to handle side effects in Kotlin.
• A `Flow<T>` allows you to generate a sequence, or flow, of values of type `T` that can be generated from suspendable functions.
• You can implement `curry` and composition for suspendable functions as you did for non-suspendable ones, just following the functional programming principles you learned in the previous chapters.
• You can repeat for `SharedFlow<T>` and `StateFlow<T>` the same process you followed for a `Flow<T>`.

Where to go from here?

Congratulations! In this chapter, you had the opportunity to apply concepts you learned in the previous chapter in a concrete example that allowed you to fetch information about your favorite TV shows. You’ve learned how to create `Sequence<T>` and how to use `Flow<T>` in an environment of concurrency. Finally, you’ve empowered your functional thinking by implementing abstractions for composing suspendable functions, returning a `Result<T>` monad. It’s been a lot of work and also a lot of fun!

Have a technical question? Want to report a bug? You can ask questions and report bugs to the book authors in our official book forum here.