Dart: Futures and Streams

Learn how to use Futures and Streams to write asynchronous code in Dart. By Michael Katz.

Listening to Streams

The Stream transform functions, like map, listen to the source stream and return a new Stream. Any function that receives events from a Stream is a listener.

With all the convenience methods, such as forEach, you'll rarely have to write your own listener. When you do write one with listen, you get finer control over what happens when data loads, and you can act on other events, such as when the Stream ends.

For example, add to main:

stream
  .listen(
    (city) => print("loaded `$city`"),
    onDone: () => print("all cities loaded")
);

The first callback runs each time a data event is ready, and onDone is called once when the stream completes. In this case, it prints an all-done message.

One other reason to use listen is that it returns a StreamSubscription object, which lets you pause, resume or cancel the subscription.

Replace what you just added with:

final subscription = stream.listen(
  (city) => print("loaded `$city`"),
  onDone: () => print("all cities loaded")
);

await Future.delayed(Duration(seconds: 2));
subscription.pause();
await Future.delayed(Duration(seconds: 2));
subscription.resume();
await Future.delayed(Duration(seconds: 2));
subscription.cancel();

This change stores the subscription object so it can be paused, resumed and then canceled, with a 2-second delay before each step. Look at the console output to see the effect of each operation:

CITY: POPULATION
Bangkok: 10899698
loaded `Bangkok`
Beijing: 21333332
loaded `Beijing`
Cairo: 21750020
loaded `Cairo`
Delhi: 32065760
Guangzhou: 13964274
Jakarta: 11074811
Kolkāta: 15133888
loaded `Delhi`
loaded `Guangzhou`
loaded `Jakarta`
loaded `Kolkāta`
Manila: 14406059
loaded `Manila`
Mexico City: 22085140
loaded `Mexico City`
Moscow: 12640818
loaded `Moscow`
Mumbai: 20961472
loaded `Mumbai`
New York: 8177025
São Paulo: 22429800
Seoul: 9975709
Shanghai: 28516904
Tokyo: 37274000
Total known population: 302688710

Because city names appear at regular intervals, you can inspect the output to learn what's happening with the Stream and with each of the listeners.

An alternating printout shows the population and the "loaded" message of each city. After 2 seconds (when Cairo was loaded), the listener displaying the "loaded" message was paused. For a while, only the population messages displayed.

After another 2 seconds, the "loaded" message listener resumed. Because it's listening to a broadcast stream, the events it missed were buffered and delivered all at once on resume. That's why several loaded messages appear together before Manila arrives and both listeners print in step again.

Two seconds later, the subscription cancels and the loaded messages stop altogether.
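
To see the buffering in isolation, here is a minimal sketch that doesn't rely on timing. It assumes a hand-built broadcast StreamController as a stand-in for the article's city stream. The controller, the bufferedWhilePaused name and the "resumed" marker are illustrative only and aren't part of the sample project.

```dart
import "dart:async";

/// Records the order of events so the buffering is visible.
Future<List<String>> bufferedWhilePaused() async {
  // Hypothetical stand-in for the article's broadcast stream of city names.
  final controller = StreamController<String>.broadcast();
  final log = <String>[];

  final subscription =
      controller.stream.listen((city) => log.add("loaded $city"));

  controller.add("Bangkok");
  await Future.delayed(Duration.zero); // let the event reach the listener

  subscription.pause();
  controller.add("Beijing"); // held back by the paused subscription
  controller.add("Cairo"); // held back as well
  await Future.delayed(Duration.zero);
  log.add("resumed");

  subscription.resume(); // buffered events are delivered now, in order
  await Future.delayed(Duration.zero);

  await subscription.cancel();
  controller.close();
  return log;
}

void main() async {
  print(await bufferedWhilePaused());
}
```

Running this should print [loaded Bangkok, resumed, loaded Beijing, loaded Cairo]: the two events added during the pause arrive only after the subscription resumes.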

With a single-subscription stream, pausing the subscription stops the source from generating events, and resuming restarts the event generator where it left off.

To try that out, replace main with:

void main() async {
  final stream = loadCityStream();

  final subscription = stream.listen(
    (city) => print("loaded `$city`"),
    onDone: () => print("all cities loaded")
  );

  await Future.delayed(Duration(seconds: 2));
  subscription.pause();
  await Future.delayed(Duration(seconds: 2));
  subscription.resume();
  await Future.delayed(Duration(seconds: 2));
  subscription.cancel();
}

The resulting console output will be:

loaded `Bangkok`
loaded `Beijing`
loaded `Cairo`
loaded `Delhi`
loaded `Guangzhou`
loaded `Jakarta`
loaded `Kolkāta`

No events were generated while the subscription was paused. By the time the subscription is canceled, the Stream has been active for only 4 of the 6 elapsed seconds, so it loads only as far as Kolkāta.

Note: Canceling the subscription means onDone will not get called. If cleanup is needed, do it in onDone and also after the Future returned by cancel completes.
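
One way to cover both paths is to route them through a single cleanup function. The sketch below assumes a short stand-in generator, demoCityStream, in place of loadCityStream. The cleanUp helper and the timings are likewise hypothetical.

```dart
// Hypothetical stand-in for the article's loadCityStream().
Stream<String> demoCityStream() async* {
  for (final city in ["Bangkok", "Beijing", "Cairo"]) {
    await Future.delayed(Duration(milliseconds: 20));
    yield city;
  }
}

/// Listens briefly, cancels early and reports which cleanup path ran.
Future<List<String>> cancelWithCleanup() async {
  final events = <String>[];
  void cleanUp(String reason) => events.add("cleanup after $reason");

  final subscription = demoCityStream().listen(
    (city) => events.add("loaded $city"),
    onDone: () => cleanUp("done"), // runs only if the stream finishes
  );

  await Future.delayed(Duration(milliseconds: 30));
  await subscription.cancel(); // onDone will not fire after this
  cleanUp("cancel"); // so clean up once cancel's Future completes
  return events;
}

void main() async {
  (await cancelWithCleanup()).forEach(print);
}
```

Because the subscription is canceled before the generator finishes, only the "cancel" cleanup runs. If the stream were left to complete, onDone would trigger the same function instead.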

You don't have to use listeners with Streams. Instead, you can use await for to iterate over a Stream's data, or even await to wait until the Stream is done.

For example, replacing main with this simple use brings back the old Future behavior of waiting for all the data to load before continuing.

void main() async {
  final cities = await loadCityStream().toList();
  printCities(cities);
}
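
The await for form handles each event as it arrives instead of waiting for the full list. Here is a small sketch, again assuming a stand-in generator named demoCityStream rather than the article's loadCityStream. The loadWithAwaitFor name is illustrative.

```dart
// Hypothetical stand-in for the article's loadCityStream().
Stream<String> demoCityStream() async* {
  for (final city in ["Bangkok", "Beijing", "Cairo"]) {
    await Future.delayed(Duration(milliseconds: 10));
    yield city;
  }
}

/// Collects one "loaded" line per city as each event arrives.
Future<List<String>> loadWithAwaitFor() async {
  final lines = <String>[];
  await for (final city in demoCityStream()) {
    lines.add("loaded `$city`"); // runs once per event
  }
  lines.add("all cities loaded"); // reached only after the stream is done
  return lines;
}

void main() async {
  (await loadWithAwaitFor()).forEach(print);
}
```

The code after the loop plays the role that onDone played in the listen version.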

Handling Stream Errors

Error handling with Stream is the same as with Futures. You can use catchError or a try/catch with await for.

For example, replace main with:

Stream<String> failingStream() async* {
  throw LoginError();
}

void main() async {
  failingStream()
    .forEach(print)
    .catchError((e) => print("Stream failed: $e"));
}

This adds a new helper, failingStream, that throws an exception. The catchError method catches the exception and prints it.

You can instead use catch with an await for in this example. Replace main with:

void main() async {
  try {
    await for (final city in failingStream()) {
      print(city);
    }
  } catch (e) {
    print("Caught error: $e");
  }
}

Adding Timeouts

Another condition you might want to handle is timeouts. A Stream will wait for the next event as long as there is a listener subscribed. If a computation might run indefinitely, or you're waiting for a remote resource to respond, you might want to build in a timeout.

The timeout method takes a duration. As long as each event arrives before the duration expires, the Stream continues. Otherwise, by default, the Stream emits a TimeoutException error. You can customize the timeout behavior by passing the optional onTimeout callback.

Replace main again:

void main() async {
  print("Loading Cities:");
  loadCityStream()
    .timeout(Duration(milliseconds: 400))
    .forEach(print)
    .catchError(
      (e) => print("Stream timed out, try again."),
      test: (e) => e is TimeoutException,
    );
}

This adds a 400-millisecond timeout to the operation. Because a city is yielded only every 500 milliseconds, the Stream times out before the first city arrives. The catchError callback catches the TimeoutException and prints the error message.
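
The callback form of timeout can be sketched as follows. Instead of raising a TimeoutException, onTimeout receives an EventSink for the resulting stream, so it can emit a placeholder event and close the stream. The slowCityStream generator, the loadWithFallback name and the "(timed out)" placeholder are assumptions made for this example.

```dart
// Hypothetical slow source: one city every 50 ms.
Stream<String> slowCityStream() async* {
  for (final city in ["Bangkok", "Beijing"]) {
    await Future.delayed(Duration(milliseconds: 50));
    yield city;
  }
}

/// Replaces the default TimeoutException with a placeholder event.
Future<List<String>> loadWithFallback() {
  return slowCityStream()
      .timeout(
        Duration(milliseconds: 10),
        onTimeout: (sink) {
          sink.add("(timed out)"); // emit a placeholder instead of an error
          sink.close(); // end the returned stream
        },
      )
      .toList();
}

void main() async {
  print(await loadWithFallback());
}
```

Since the first city needs 50 milliseconds and the limit is 10, the list should contain only the placeholder. Leaving out sink.close() would instead keep the stream open for later events.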

Where to Go From Here?

That's it for a quick introduction to Futures and Streams in Dart.

From here, you can continue to explore other ways to manipulate data. Many of the methods in the API are meant for building custom streams and event handling. You'll likely never need them and can build on the higher-order transforms instead.

The power of streams comes in handy for routing data and user events through Flutter widgets, like StreamBuilder. To learn more about asynchronous programming, there is a chapter in The Dart Apprentice that covers streams and more in greater depth.