Modern Concurrency: Beyond the Basics

Oct 20 2022 · Swift 5.5, iOS 15, Xcode 13.4

Part 1: AsyncStream & Continuations

02. AsyncStream

Episode complete

Play next episode

Next
About this episode

Leave a rating/review

See forum comments
Cinema mode Mark complete Download course materials
Previous episode: 01. Introduction Next episode: 03. Using AsyncStream to Count Down

Get immediate access to this and 4,000+ other videos and books.

Take your career further with a Kodeco Personal Plan. With unlimited access to over 40+ books and 4,000+ professional videos in a single subscription, it's simply the best investment you can make in your development career.

Learn more Already a subscriber? Sign in.

Notes: 02. AsyncStream

  • For more about push-based vs pull-based AsyncStreams, see AsyncSequence & AsyncStream Tutorial for iOS.
  • This video uses Xcode 14’s Task.sleep(until:clock:). If you use Xcode 13, replace this with Task.sleep(nanoseconds: 1_000_000_000).

Heads up... You’re accessing parts of this content for free, with some sections shown as obfuscated text.

Heads up... You’re accessing parts of this content for free, with some sections shown as obfuscated text.

Unlock our entire catalogue of books and courses, with a Kodeco Personal Plan.

Unlock now

In this episode, you’ll learn about AsyncStream, an easier way to create a custom AsyncSequence.

Custom AsyncSequence from episode 4 of Getting Started

In the preceding course, you created this simple typewriter with a custom AsyncSequence that “types” a phrase, adding a character every second.

struct Typewriter: AsyncSequence {
  typealias Element = String
  let phrase: String

  func makeAsyncIterator() -> TypewriterIterator {
    return TypewriterIterator(phrase)
  }
}

struct TypewriterIterator: AsyncIteratorProtocol {
  typealias Element = String
  let phrase: String
  var index: String.Index

  init(_ phrase: String) {
    self.phrase = phrase
    self.index = phrase.startIndex
  }

  mutating func next() async throws -> String? {
    guard index < phrase.endIndex else { return nil }
    try await Task.sleep(until: .now + .seconds(1),
                         clock: .continuous)
    defer { index = phrase.index(after: index) }
    return String(phrase[phrase.startIndex...index])
  }
}
Task {
  for try await item in Typewriter(phrase: "Hello, world!") {
    print(item)
  }
  print("AsyncSequence Done")
}
He
Hel
Hell
Hello
Hello,
Hello, 
Hello, w
Hello, wo
Hello, wor
Hello, worl
Hello, world
Hello, world!
AsyncSequence Done

AsyncStream Typewriter

In the next section, the starter has already set up test phrase and index properties:

let phrase = "Hello, world!"
var index = phrase.startIndex
let stream_pull = AsyncStream<String> {

}
let stream_pull = AsyncStream<String> {
  🟩
  guard index < phrase.endIndex else { return nil }
  try await Task.sleep(until: .now + .seconds(1),
                       clock: .continuous)
  defer { index = phrase.index(after: index) }
  return String(phrase[phrase.startIndex...index])
  🟥
}
let stream_pull = 🟩AsyncThrowingStream<String, Error🟥> {
do {
  try await Task.sleep(until: .now + .seconds(1),
                       clock: .continuous)
} catch {
  return nil
}
Task {
  for try await item in stream_pull {
    print(item)
  }
  print("Pull AsyncStream Done")
}

AsyncStream: pull or push?

Option-click AsyncStream then Open in Developer Documentation. Scroll down to Topics.

// These two lines are already above the pull-based stream code
let phrase = "Hello, world!"   
var index = phrase.startIndex

let stream_push = AsyncStream<String> { continuation in  // closure receives a continuation
  Task {  // wrap any asynchronous code in a Task
  }
}
while index < phrase.endIndex {  // while index hasn't reached the end of phrase

}
while index < phrase.endIndex {
  do {
    try await Task.sleep(until: .now + .seconds(1), clock: .continuous)
  } catch {
    // delete return nil
  }
}
continuation.yield(String(phrase[phrase.startIndex...index])) // copy pull-based return value
index = phrase.index(after: index)  // copy index increment from pull-based defer
continuation.finish()
// These two lines are already above the pull-based stream code
var phrase = "Hello, world!"   
var index = phrase.startIndex

let stream_push = AsyncStream<String> { continuation in
  Task {
    while index < phrase.endIndex {
      do {
        try await Task.sleep(until: .now + .seconds(1),
                             clock: .continuous)
        continuation.yield(String(phrase[phrase.startIndex...index]))
        index = phrase.index(after: index)
      } catch {
        continuation.finish()
      }
    }
    continuation.finish()
  }
}

🟩Task {
  for try await item in stream_push {  // change pull to push
    print(item)
  }
  print("Push AsyncStream Done")  // change pull to push
}🟥