Dart: Futures and Streams

Learn how to use Futures and Streams to write asynchronous code in Dart. By Michael Katz.

Error Handling with Await

You previously used catchError blocks to handle Future errors. By using await, you can instead use a regular try/catch. For example, rewrite the previous error handling by replacing main with the following:

Future<int> getCityCount() async {
  throw NetworkError();
}

void main() async {
  try {
    final cityCount = await getCityCount();
    print("Got a value: $cityCount");
  } catch(e) {
    print("Got an error: $e");
  }
}

The getCityCount function should theoretically return the number of cities, but it instead throws a NetworkError. In main, you can wrap the call to getCityCount with a try/catch to catch the error and avoid a crash.

In the above example, the first print never executes because the Future returned by getCityCount completes with an error, which await rethrows as an exception. It’s caught in the catch block and printed to the console.

Just as you can pass test to the catchError method on Future, you can also stack on and catch blocks to handle different errors in different ways. Change main once again:

void main() async {
  try {
    final cityCount = await getCityCount();
    print("Got a value: $cityCount");
  } on LoginError {
    print("Invalid username or password.");
  } on NetworkError {
    print("Network failed, try again.");
  } catch(e) {
    print("Got an error: $e");
  }
}

With this stack of on and catch blocks, the console outputs the specific NetworkError message instead of the generic one. You’ll also see this code is more straightforward to read and understand than the earlier example with error and test callbacks.
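To compare the two styles side by side, here is a minimal sketch that handles the same failure once with catchError and test, and once with await and on clauses. The NetworkError and LoginError classes below are stand-ins written for this sketch, and the describeWithCallbacks and describeWithAwait names are hypothetical.

```dart
// Stand-in error types, assumed to resemble the ones used earlier.
class NetworkError implements Exception {}

class LoginError implements Exception {}

Future<int> getCityCount() async {
  throw NetworkError();
}

// Callback style: each catchError uses test to pick the errors it handles.
Future<String> describeWithCallbacks() {
  return getCityCount()
      .then((count) => "Got a value: $count")
      .catchError((Object e) => "Invalid username or password.",
          test: (e) => e is LoginError)
      .catchError((Object e) => "Network failed, try again.",
          test: (e) => e is NetworkError);
}

// await style: the same dispatch written with on clauses.
Future<String> describeWithAwait() async {
  try {
    final count = await getCityCount();
    return "Got a value: $count";
  } on LoginError {
    return "Invalid username or password.";
  } on NetworkError {
    return "Network failed, try again.";
  }
}

void main() async {
  print(await describeWithCallbacks()); // Network failed, try again.
  print(await describeWithAwait()); // Network failed, try again.
}
```

Both functions produce the same message. The await version keeps the success path and each error path at the same indentation level, which is what makes it easier to scan.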

Using Streams

Streams are another aspect of asynchronous programming in Dart. A Future represents a one-time value: the app performs an operation and comes back with some data. A Stream represents a sequence of data delivered over time. For example, you can use a Future to get the entire contents of a file or use a Stream to get the contents one line at a time. A Future could represent the value of a web form after the user presses “Enter,” and a Stream can encapsulate the changing data as the user types.

Replace main with the following:

void main() async {
  var stream = Stream<int>.periodic(const Duration(seconds: 1), (i) => i);
  stream.forEach(print);
}

Here a periodic Stream is created that emits an increasing integer every second. Using forEach, it prints the next value as it’s available.

If you run this, the console prints a new number each second, starting at 0, and keeps going until you stop the program.
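A periodic stream never ends on its own. If you want the program to finish, one option is to cap the stream with take and await the Future that forEach returns. The sketch below assumes a hypothetical helper named firstTicks.

```dart
// Emits 0, 1, 2, ... at the given interval and stops after count values.
Stream<int> firstTicks(int count, Duration interval) =>
    Stream<int>.periodic(interval, (i) => i).take(count);

void main() async {
  // forEach returns a Future that completes when the stream is done.
  await firstTicks(5, const Duration(seconds: 1)).forEach(print);
  print("Done");
}
```

This prints 0 through 4, one per second, followed by Done.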

Creating Streams

You can create a Stream by repeatedly sending data from some source such as a file or a network server or by transforming an existing Stream.

Going back to the city example, add the following:

Stream<String> loadCityStream() async* {
  for(final city in fetchCityList()) {
    yield city;
  }
}

A few new things are happening here:

  • The return type is now a Stream, which means this returns a Stream providing String values over time.
  • This has the async* modifier. Like an async function, it returns immediately, but instead of a single Future it returns a Stream that delivers a series of values. The body doesn’t start running until something listens to that Stream.
  • The body iterates over the city list and uses yield to send each value to the stream as the for loop iterates.
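One detail worth seeing in action is that calling an async* function only creates the Stream. The body starts once something listens. The following sketch records the order of events in a list. The countTo and runDemo names are hypothetical and not part of the city example.

```dart
// A small generator that notes when its body actually begins.
Stream<int> countTo(int max, List<String> log) async* {
  log.add("body started");
  for (var i = 1; i <= max; i++) {
    yield i;
  }
}

Future<List<String>> runDemo() async {
  final log = <String>[];
  final stream = countTo(3, log); // Returns immediately; the body has not run.
  log.add("stream created");
  final values = await stream.toList(); // Listening starts the body.
  log.add("values: $values");
  return log;
}

void main() async {
  print(await runDemo());
  // [stream created, body started, values: [1, 2, 3]]
}
```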

To see how it’s used, replace main once again:

void main() async {
  await for (final city in loadCityStream()) {
    print(city);
  }
}

This gets the stream from loadCityStream and uses await for to iterate over each value as it’s yielded. In the integer example, you used forEach. On a Stream, forEach does the same job as await for: it runs a callback for each value and returns a Future that completes when the stream ends. The relationship is the same as the one between a list’s forEach and a regular for loop.
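To make the comparison concrete, here is a small sketch that collects the same values both ways. It uses a hypothetical sampleCities stream built from a fixed list instead of loadCityStream, so it runs on its own.

```dart
// A stand-in stream with three fixed values.
Stream<String> sampleCities() =>
    Stream.fromIterable(["Tokyo", "Delhi", "Cairo"]);

// Loop style: the body runs once per value.
Future<List<String>> collectWithAwaitFor() async {
  final seen = <String>[];
  await for (final city in sampleCities()) {
    seen.add(city);
  }
  return seen;
}

// Callback style: awaiting forEach waits for the stream to finish.
Future<List<String>> collectWithForEach() async {
  final seen = <String>[];
  await sampleCities().forEach(seen.add);
  return seen;
}

void main() async {
  print(await collectWithAwaitFor()); // [Tokyo, Delhi, Cairo]
  print(await collectWithForEach()); // [Tokyo, Delhi, Cairo]
}
```

Note that forEach only behaves like await for when you await its result. Without the await, the code after it continues while the stream is still emitting.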

When you run this code, all the output appears at once, too quickly to see the values arriving one by one. Change loadCityStream so you see each value appear one at a time:

Stream<String> loadCityStream() async* {
  for(final city in fetchCityList()) {
    await Future.delayed(Duration(milliseconds: 500));
    yield city;
  }
}

Note that because the function is already async*, you don’t have to update the signature to accommodate the await.

Re-running the program will print the cities one at a time at a noticeable pace.

A common way to create a Stream is by transforming an existing one. Many of the methods on List and Iterable are also available on Stream. You can use them to change a Stream by skipping, filtering and mapping values. You might do this by taking network bytes, deserializing them into objects and then extracting one field from each object. For example, if you had an address book you might load all the entries from a local database, collect the first and last names of each contact and then format them to get a single Stream of names.
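The address book idea might look something like the sketch below. The Contact class, the loadContacts stream and the sample entries are all made up for illustration, and a real app would read them from a database rather than a fixed list.

```dart
// Hypothetical address book entry.
class Contact {
  const Contact(this.firstName, this.lastName);
  final String firstName;
  final String lastName;
}

// Stand-in for loading entries from a local database.
Stream<Contact> loadContacts() => Stream.fromIterable(const [
      Contact("Ada", "Lovelace"),
      Contact("", ""),
      Contact("Grace", "Hopper"),
      Contact("Alan", "Turing"),
    ]);

// Filters, skips and maps the contact stream into a stream of names.
Stream<String> contactNames() => loadContacts()
    .where((c) => c.lastName.isNotEmpty) // Drop blank entries.
    .skip(1) // Skip the first remaining entry.
    .map((c) => "${c.lastName}, ${c.firstName}"); // Format each name.

void main() async {
  await contactNames().forEach(print);
  // Hopper, Grace
  // Turing, Alan
}
```

Each method returns a new Stream, so the calls chain the same way they do on a list.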

Try out this example. First add a population helper function:

int calculatePopulationOf(String city) {
  final populations = {
    'Tokyo': 37274000,
    'Delhi': 32065760,
    'Shanghai': 28516904,
    'São Paulo': 22429800,
    'Mexico City': 22085140,
    'Cairo': 21750020,
    'Beijing': 21333332,
    'Mumbai': 20961472,
    'Kolkāta': 15133888,
    'Manila': 14406059,
    'Guangzhou': 13964274,
    'Moscow': 12640818,
    'Jakarta': 11074811,
    'Bangkok': 10899698,
    'Seoul': 9975709,
    'London': 9540576,
    'New York': 8177025,
  };
  return populations[city] ?? -1;
}

That function returns the population for a known city, or -1 if the city isn’t in the map.

Now update main:

void main() async {
  print("CITY: POPULATION");
  loadCityStream()
    .map((city) => "$city: " + calculatePopulationOf(city).toString())
    .forEach(print);

  loadCityStream()
    .map(calculatePopulationOf)
    .reduce((value, element) => value + element)
    .then((total) => print("Total known population: $total"));
}

This uses the city-loading stream in two ways. The first uses map to construct an output string with the name and population. The second chains a map to turn the city names into population numbers and a reduce to sum them up. Finally, it’s capped with a then, which takes the final value and prints it. Because neither chain is awaited, both run concurrently, and the total prints only after the second stream finishes.
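The summing step can also be written with await instead of then. The sketch below uses two hypothetical helpers, sumWithReduce and sumWithFold, on a short fixed list of populations. It also shows fold as an alternative: reduce completes with an error on an empty stream, while fold starts from an initial value and so returns that value instead.

```dart
// Sums a stream of populations with reduce; the stream must not be empty.
Future<int> sumWithReduce(Stream<int> populations) =>
    populations.reduce((value, element) => value + element);

// Sums with fold, which starts from 0 and also handles an empty stream.
Future<int> sumWithFold(Stream<int> populations) =>
    populations.fold<int>(0, (sum, element) => sum + element);

void main() async {
  final sample = [37274000, 32065760, 28516904]; // Tokyo, Delhi, Shanghai
  print(await sumWithReduce(Stream.fromIterable(sample))); // 97856664
  print(await sumWithFold(Stream.fromIterable(sample))); // 97856664
  print(await sumWithFold(Stream<int>.empty())); // 0
}
```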

Subscription and Broadcast Streams

In the previous example, you used two calls to loadCityStream, which looks odd. Try reusing the stream by modifying main:

void main() async {
  final stream = loadCityStream();

  print("CITY: POPULATION");
  stream
    .map((city) => "$city: " + calculatePopulationOf(city).toString())
    .forEach(print);

  stream
    .map(calculatePopulationOf)
    .reduce((value, element) => value + element)
    .then((total) => print("Total known population: $total"));
}

When you run this, you’ll see the following in the console and the total won’t be computed:

Uncaught Error: Bad state: Stream has already been listened to.

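The error occurs because loadCityStream returns a single-subscription stream. It can only be listened to once and has a definite start and end. In this example, the stream starts when the first listener is attached, which happens in the forEach call on the mapped stream, and continues to the last city. The second chain then tries to listen to the same stream again and fails.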
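You can reproduce the failure in isolation. This sketch uses a hypothetical two-value numbers stream and a listenTwice helper that catches the StateError thrown by the second listen call.

```dart
// A tiny single-subscription stream.
Stream<int> numbers() async* {
  yield 1;
  yield 2;
}

// Attaches two listeners to the same stream and reports what happens.
String listenTwice() {
  final stream = numbers();
  stream.listen((_) {}); // First listener: allowed.
  try {
    stream.listen((_) {}); // Second listener: throws StateError.
    return "no error";
  } on StateError catch (e) {
    return e.message;
  }
}

void main() {
  print(listenTwice()); // Stream has already been listened to.
}
```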

In contrast, a broadcast stream can have many listeners and those listeners receive events for as long as they are attached. Creating a broadcast stream can be simple. In main, change the first line:

final stream = loadCityStream().asBroadcastStream();

This converts the single-subscription stream to a broadcast one. Now the list prints and the total is calculated.
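Listeners on a broadcast stream only receive events sent while they are attached. To show that with predictable timing, the sketch below swaps in a StreamController.broadcast from dart:async rather than asBroadcastStream. The lateListenerDemo name and the sample values are assumptions made for this example.

```dart
import 'dart:async';

// Adds events before and after a second listener attaches.
Future<List<List<int>>> lateListenerDemo() async {
  final controller = StreamController<int>.broadcast();
  final earlyValues = <int>[];
  final lateValues = <int>[];

  controller.stream.listen(earlyValues.add);
  controller.add(1);
  controller.add(2);

  // This listener attaches after 1 and 2 were sent, so it misses them.
  controller.stream.listen(lateValues.add);
  controller.add(3);

  await controller.close(); // Completes once listeners have been notified.
  return [earlyValues, lateValues];
}

void main() async {
  print(await lateListenerDemo()); // [[1, 2, 3], [3]]
}
```

In the city example both listeners attach before any city is emitted, which is why both chains see the full list.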