Modern Concurrency in Swift

First Edition · iOS 15 · Swift 5.5 · Xcode 13


10. Actors in a Distributed System
Written by Marin Todorov


In the previous chapters, you learned how to run concurrent tasks in parallel on multiple CPU cores. Furthermore, you learned how to use actor types to make concurrency safe. In this last chapter of the book, you’ll cover the advanced topic of distributed actors: actors that run not only locally, but also in other processes — or even on different machines altogether.

[Diagram: actors running on a laptop and on a desktop]

At the time of this writing, Apple is:

  • Gathering feedback on an experimental distributed actors language feature through a pitch in the Swift Evolution process. Once the proposal is accepted, the feature will land in a future release of Swift.
  • Developing a “Swift Distributed Actors” package, a cluster library for the upcoming distributed actor language feature, and incorporating feedback on it.

Since both are still works in progress, in this chapter you’ll build your own custom distributed system to experiment with using actors in a distributed environment. You’ll only use local actors for now, and you’ll write your own logic to make them cooperate across the network.

Once Apple merges the distributed actors language feature in an upcoming Swift release, we’ll update this chapter to remove the custom implementation and redesign the step-by-step instructions to use the latest and greatest syntax.

Note: The overall design of the language syntax for distributed actors is described in detail in the proposal mentioned above. In a few places in this chapter, we’ll draw a parallel between your custom implementation and how the feature is likely to work when it ships.

The distributed actor model has been around for some time, and libraries offer actors and distributed actors for many languages. Since Apple hasn’t released the language feature yet, this chapter keeps theory to a minimum and covers only the model in general.

Without further ado — could distributed actors come to the stage, please?

Going from local to distributed

You’re already familiar with actors — they let you isolate state by putting an automatic barrier between the type’s synchronized internal state and access from “outside” the synchronized scope. That means that calls from other types are considered outside access and automatically made asynchronous:

[Diagram: OtherType’s otherMethod() reaches MyActor from outside the state isolation barrier, so its access is asynchronous; MyActor’s own myMethod() accesses myVar from inside, synchronously.]

Since accessing the actor’s state from outside is done asynchronously, that process may take an arbitrary amount of time. In fact, in Chapter 6, “Testing Asynchronous Code”, you had to implement a custom way to time out asynchronous calls that took longer than expected.

For local actors, the compiler transparently serializes calls that access the actor’s state:

[Diagram: the actor serializes calls from OtherType to protect MyActor’s shared state.]
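As a small illustration of these rules, the sketch below uses a hypothetical Counter actor that isn’t part of the Sky project. Inside the actor, increment() mutates value synchronously; from outside, every call needs await, and because the actor runs those calls one at a time, a thousand concurrent increments can’t race.

```swift
import Foundation

/// A hypothetical actor that illustrates state isolation.
actor Counter {
  private(set) var value = 0

  /// Inside the actor: synchronous access to its own state.
  func increment() {
    value += 1
  }
}

/// Outside the actor: each call crosses the isolation barrier,
/// so it's asynchronous and marked with `await`. The actor runs
/// the calls one at a time, so concurrent increments never race.
func incrementConcurrently(_ counter: Counter, times: Int) async -> Int {
  await withTaskGroup(of: Void.self, returning: Void.self) { group in
    for _ in 0..<times {
      group.addTask { await counter.increment() }
    }
  }
  return await counter.value
}
```

Calling counter.increment() without await from outside the actor is a compile-time error, which is the isolation barrier described above in action.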

But the compiler isn’t limited to always injecting the same logic into the isolation layer. For example, distributed actors take advantage of the isolation layer’s asynchronous nature and allow you to add a transport service there. A transport can relay calls to actor methods to another process, to another machine on the network or to a JSON API available on a web server.

Distributed actors introduce the notion of location transparency, which allows you to work with both local and distributed actors in much the same way. In fact, at the point-of-use in your app, they’re interchangeable with very few code changes. Furthermore, location transparency makes it easy to develop your actors locally and release them into a distributed environment. Or, vice versa, you can develop distributed actors and design their unit tests to run locally!

You can choose any kind of transport because the distributed actor language feature is transport agnostic! You can talk to an actor over the network, through Bluetooth connectivity or via a custom JSON API of your choice.

For example, in a shopping app, you could call a userOrders() actor method to get a user’s orders. A transport service forwards your call to the server, where an actor counterpart executes userOrders() and reads the orders from the server database:

[Diagram: on the device, MyView calls userOrders() on a Database Actor; the call crosses the state isolation layer and a network transport to the server’s Database Actor, which reads from the Orders Database.]

The diagram above shows an example of working with a local database actor that transparently forwards database queries to a server on the web. Your UI layer, MyView, doesn’t know if the database actor is local or distributed — it doesn’t make a difference at the point of use.
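Swift 5.5 has no distributed actors yet, but you can approximate the point-of-use side of location transparency with a protocol. In this sketch, OrdersProvider, LocalOrders, RemoteOrders and the transport closure are hypothetical names: the local actor answers from memory, while the remote one hands the call to whatever transport it was given and decodes the JSON reply. A real transport would send the request over the network; here it’s just an injected async closure.

```swift
import Foundation

/// What the UI layer depends on: it can't tell local and remote apart.
protocol OrdersProvider: Actor {
  func userOrders() async throws -> [String]
}

/// Answers from in-memory state.
actor LocalOrders: OrdersProvider {
  private let orders: [String]

  init(orders: [String]) {
    self.orders = orders
  }

  func userOrders() async throws -> [String] {
    orders
  }
}

/// Forwards the call through an injected, hypothetical transport.
actor RemoteOrders: OrdersProvider {
  // Takes a method name and returns the JSON-encoded reply.
  private let transport: @Sendable (String) async throws -> Data

  init(transport: @escaping @Sendable (String) async throws -> Data) {
    self.transport = transport
  }

  func userOrders() async throws -> [String] {
    let data = try await transport("userOrders")
    return try JSONDecoder().decode([String].self, from: data)
  }
}

/// Point of use: the same `await` call works for either implementation.
func orderCount(from provider: any OrdersProvider) async throws -> Int {
  try await provider.userOrders().count
}
```

orderCount(from:) plays the role of MyView: it only sees an OrdersProvider and makes the same await call either way.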

If that looks a little bit like magic, don’t fret; distributed actors function by strict rules that make it all possible.

Distributed actors have the following traits:

  • They allow no direct access to data from outside the actor. The model encourages async method calls, which are easy to relay over the transport layer.
  • Distributed actors are addressable. Depending on the type of transport, a distributed actor’s address may be a REST endpoint, a local process ID, a file path or something else that uniquely identifies it.
  • The input and output of actor methods are automatically serialized for transport.
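To make the last two traits concrete, here’s a sketch of the kind of message a hand-rolled transport could put on the wire for each call. Invocation and makeInvocation(target:method:arguments:) are hypothetical: the target field holds the actor’s address, the arguments are serialized to JSON, and a per-call ID lets the caller match the reply. This is roughly the plumbing that the language feature aims to build in.

```swift
import Foundation

/// A hypothetical wire format for one method call.
struct Invocation: Codable, Equatable {
  let target: String   // address of the actor, e.g. a peer name or an endpoint
  let method: String   // name of the method to run on the target
  let arguments: Data  // JSON-encoded arguments
  let id: UUID         // lets the caller match the reply to this call
}

/// Serializes `arguments` and wraps them, with a fresh ID, in an `Invocation`.
func makeInvocation<Arguments: Encodable>(
  target: String,
  method: String,
  arguments: Arguments
) throws -> Invocation {
  let encoded = try JSONEncoder().encode(arguments)
  return Invocation(target: target, method: method, arguments: encoded, id: UUID())
}
```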

Once the distributed actors language feature ships, most of the traits above will be built in, letting you focus exclusively on designing your business logic. Until then, if you want to create your own distributed systems, you’ll need to write a little more code — as you’ll do in this chapter.

Getting started with SkyNet

Once more, you’ll work on the Sky project from Chapter 7, “Concurrent Code With TaskGroup”. You’ll improve it by adding an actor system that connects to local network devices over Bonjour. This will let you perform more concurrent scans by using system resources across the network.

Enabling the network layer

At the end of Chapter 7, “Concurrent Code With TaskGroup”, you left the Sky project — an app that scans satellite images for alien life signs — in good shape. The user can start a scan by tapping Engage systems, and the app will concurrently iterate over sectors in satellite imagery and scan them.


Creating an actor system

In this section, you’ll add an actor system to the project — a system that controls how scan tasks execute on a single machine. The app model will execute tasks via the local system. By the end of the chapter, it will detect other devices on the network and use their systems, too.

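import Foundation

actor ScanSystem {
  let name: String
  let service: ScanTransport?

  init(name: String, service: ScanTransport? = nil) {
    self.name = name
    self.service = service
  }

  // Number of tasks this system is currently committed to.
  private(set) var count = 0

  // Reserves a slot before a task starts running on this system.
  func commit() {
    count += 1
  }

  // Runs the task and frees the slot when it finishes, even if it throws.
  func run(_ task: ScanTask) async throws -> String {
    defer { count -= 1 }
    return try await task.run()
  }
}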

Connecting to remote systems

To stay in touch with other systems on the network, you’ll need to keep a list of their addresses. ScanTransport already takes care of connecting all the nodes, but it’s up to you to do the “talking” between them.

import Foundation

actor Systems {
  private(set) var systems: [ScanSystem]

  init(_ localSystem: ScanSystem) {
    systems = [localSystem]
  }

  // The local system is always the first element.
  var localSystem: ScanSystem { systems[0] }

  // Replaces any existing entry with the same name, so a peer that
  // reconnects isn't listed twice.
  func addSystem(name: String, service: ScanTransport) {
    removeSystem(name: name)
    let newSystem = ScanSystem(name: name, service: service)
    systems.append(newSystem)
  }

  func removeSystem(name: String) {
    systems.removeAll { $0.name == name }
  }
}

Monitoring your systems’ connectivity

Open ScanModel.swift and add these new properties:

@MainActor @Published var isConnected = false
private var systems: Systems
private(set) var service: ScanTransport

// In ScanModel's init, set up the local system and the transport,
// then start monitoring connectivity:
let localSystem = ScanSystem(name: localName)
systems = Systems(localSystem)
service = ScanTransport(localSystem: localSystem)
service.taskModel = self
systemConnectivityHandler()

// A new ScanModel method:
func systemConnectivityHandler() {
  // Adds a system each time a peer connects.
  Task {
    for await notification in
      NotificationCenter.default.notifications(named: .connected) {

      guard let name = notification.object as? String else { continue }
      print("[Notification] Connected: \(name)")
      await systems.addSystem(name: name, service: self.service)
      // The local system is always listed, so more than one means a peer is connected.
      Task { @MainActor in
        isConnected = await systems.systems.count > 1
      }
    }
  }
  // Removes a system each time a peer disconnects.
  Task {
    for await notification in
      NotificationCenter.default.notifications(named: .disconnected) {

      // Skip malformed notifications instead of ending the loop.
      guard let name = notification.object as? String else { continue }
      print("[Notification] Disconnected: \(name)")
      await systems.removeSystem(name: name)
      Task { @MainActor in
        isConnected = await systems.systems.count > 1
      }
    }
  }
}
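Both loops boil down to keeping a list of connected peers up to date and deriving a flag from it. The sketch below isolates that bookkeeping so it can run outside the app: it swaps NotificationCenter for an AsyncStream of hypothetical ConnectivityEvent values, and PeerTracker is likewise made up for illustration. Because the set holds only remote peers, isConnected checks for a non-empty set rather than comparing a count with 1 as the app does.

```swift
import Foundation

/// Hypothetical stand-in for the .connected / .disconnected notifications.
enum ConnectivityEvent: Sendable {
  case connected(String)
  case disconnected(String)
}

/// Tracks which remote peers are reachable, similar to what Systems does in the app.
actor PeerTracker {
  private(set) var peers: Set<String> = []

  // True while at least one remote peer is known.
  var isConnected: Bool { !peers.isEmpty }

  func apply(_ event: ConnectivityEvent) {
    switch event {
    case .connected(let name):
      peers.insert(name)  // a Set ignores duplicate connects
    case .disconnected(let name):
      peers.remove(name)
    }
  }

  /// Consumes events until the stream finishes.
  func monitor(_ events: AsyncStream<ConnectivityEvent>) async {
    for await event in events {
      apply(event)
    }
  }
}
```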

Adding a connectivity indicator

Open SkyApp.swift and, just below the alert view modifier, insert:

.toolbar {
  if scanModel.isConnected {
    Image(systemName: "link.circle")
  }
}

[MCNearbyDiscoveryPeerConnection] Read failed.
[MCNearbyDiscoveryPeerConnection] Stream error occurred: Code=54 "Connection reset by peer"
[Notification] Connected: iPhone 12
[GCKSession] Failed to send a DTLS packet with 117 bytes; sendmsg error: No route to host (65).
[GCKSession] Something is terribly wrong; no clist for remoteID [1104778395] channelID [-1].
...

Sending a task to a remote system

Next, you want to send a ScanTask over the network for remote execution. To do this, you need to make it Codable. Open ScanTask.swift and add a Codable conformance to that type:

struct ScanTask: Identifiable, Codable {
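func send(
  task: ScanTask,
  to recipient: String
) async throws -> String {
  // Find the connected peer whose display name matches the recipient.
  // Throwing a String assumes the project makes String conform to Error.
  guard let targetPeer = session.connectedPeers.first(
    where: { $0.displayName == recipient }) else {
    throw "Peer '\(recipient)' not connected anymore."
  }

  // Encode the task as JSON and send it to that peer over a reliable channel.
  let payload = try JSONEncoder().encode(task)
  try session.send(payload, toPeers: [targetPeer], with: .reliable)

  // Not finished yet: the method still needs to wait for the response
  // and return a String, which the next sections add.
}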

Managing a network request lifetime

There are three possible outcomes of sending a network request to another SkyNet system:

  • The remote system sends back a response in time, and you return its result.
  • The request takes too long, and it fails with a timeout error.
  • The remote peer disconnects before responding, and you cancel the request.

actor TimeoutTask<Success> {
let networkRequest = TimeoutTask(seconds: 5) { () -> String in

}
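The chapter relies on the TimeoutTask type from Chapter 6, now declared as an actor. If you want to experiment without that code, here’s a minimal sketch with the same surface: init(seconds:operation:), an async value property and cancel(). It races the operation against a sleeping child task in a task group, which may differ from the Chapter 6 implementation. It starts the work as soon as it’s created and relies on cooperative cancellation, so an operation that ignores cancellation delays the result until it returns on its own.

```swift
import Foundation

/// Thrown when the operation doesn't finish in time.
struct TimeoutError: Error {}

/// A sketch of a timeout wrapper as an actor; not the book's implementation.
actor TimeoutTask<Success: Sendable> {
  private let task: Task<Success, Error>

  /// Starts `operation` right away and fails it with `TimeoutError`
  /// if it hasn't finished after `seconds`.
  init(seconds: Double, operation: @escaping @Sendable () async throws -> Success) {
    let nanoseconds = UInt64(seconds * 1_000_000_000)
    task = Task {
      try await withThrowingTaskGroup(of: Success.self, returning: Success.self) { group in
        // Child 1: the real work.
        group.addTask { try await operation() }
        // Child 2: a timer that wins the race if the work is too slow.
        group.addTask {
          try await Task.sleep(nanoseconds: nanoseconds)
          throw TimeoutError()
        }
        // The first child to finish decides the outcome; cancel the other one.
        defer { group.cancelAll() }
        guard let result = try await group.next() else {
          throw CancellationError()
        }
        return result
      }
    }
  }

  /// The operation's result, or the timeout or cancellation error.
  var value: Success {
    get async throws { try await task.value }
  }

  /// Cancels both children; waiting callers then typically get a CancellationError.
  func cancel() {
    task.cancel()
  }
}
```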

Receiving a response from a remote system

At this point, you need to add a new type that can transport the result of a scan back to the original system. You need a simple type that wraps the string that ScanTask.run() returns, together with the ID of the task that produced it.

struct TaskResponse: Codable {
  let result: String
  let id: UUID
}
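// Inside the TimeoutTask closure: wait for the response to this task.
for await notification in
  NotificationCenter.default.notifications(named: .response) {

  // Ignore responses that belong to other tasks.
  guard let response = notification.object as? TaskResponse,
        response.id == task.id else { continue }
  return "\(response.result) by \(recipient)"
}
// The notification sequence doesn't finish on its own,
// so execution isn't expected to reach this line.
fatalError("Will never execute")

// After creating networkRequest: cancel it if the recipient disconnects.
Task {
  for await notification in
    NotificationCenter.default.notifications(named: .disconnected) {

    guard notification.object as? String == recipient else { continue }
    await networkRequest.cancel()
  }
}
// Wait for whichever comes first: the response, a timeout or a cancellation.
return try await networkRequest.value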
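The loop above correlates each TaskResponse with its request by comparing IDs. As an alternative design (a swapped-in technique, not what this chapter uses), the same correlation can live in a single actor that parks one continuation per request ID and resumes it when the matching reply, or a failure such as a disconnect, arrives. ResponseRouter and its methods are hypothetical names.

```swift
import Foundation

/// Matches replies to requests by ID. A sketch, not the chapter's design.
actor ResponseRouter {
  private var waiting: [UUID: CheckedContinuation<String, Error>] = [:]
  // Replies or failures that arrived before anyone asked for them.
  private var early: [UUID: Result<String, Error>] = [:]

  /// Suspends until the reply for `id` arrives, or rethrows its failure.
  func response(for id: UUID) async throws -> String {
    if let result = early.removeValue(forKey: id) {
      return try result.get()
    }
    return try await withCheckedThrowingContinuation { continuation in
      waiting[id] = continuation
    }
  }

  /// Called by the transport when a reply comes in.
  func deliver(_ result: String, for id: UUID) {
    complete(id, with: .success(result))
  }

  /// Called when the peer disconnects or the request can't complete.
  func fail(_ id: UUID, with error: Error) {
    complete(id, with: .failure(error))
  }

  private func complete(_ id: UUID, with result: Result<String, Error>) {
    if let continuation = waiting.removeValue(forKey: id) {
      continuation.resume(with: result)  // each continuation is resumed exactly once
    } else {
      early[id] = result
    }
  }
}
```

The early dictionary covers the case where a reply arrives before response(for:) is called, so it isn’t lost.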

Executing requests from other systems

In this section, you’ll add a method to your model that executes a task when a remote system asks it to do so. After all, there’s no point in asking remote systems to run tasks if they don’t actually do it, right?

func run(_ task: ScanTask) async throws -> String {
  Task { @MainActor in scheduled += 1 }
  defer {
    Task { @MainActor in scheduled -= 1 }
  }
  return try await systems.localSystem.run(task)
}

Sending a result back

You’ve made a lot of progress in this chapter! You’ve now reached the point where you’ll send the result of a scan back to the original system.

func send(response: TaskResponse, to peerID: MCPeerID) throws {
  guard session.connectedPeers.contains(peerID) else {
    throw "Peer '\(peerID)' not connected anymore."
  }

  let payload = try JSONEncoder().encode(response)
  try session.send(payload, toPeers: [peerID], with: .reliable)
}

Handling incoming data

With the methods to send requests and responses in place, you also need to add the session handler method that accepts data and handles it correctly, depending on whether it’s an incoming request or a response.

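let decoder = JSONDecoder()
if let task = try? decoder.decode(ScanTask.self, from: data) {
  // A peer asked this system to run a scan: run it through the model
  // and send the result back to the peer that requested it.
  Task { [weak self] in
    guard let self = self,
          let taskModel = self.taskModel else { return }

    // Nothing awaits this task, so errors thrown here are silently dropped.
    let result = try await taskModel.run(task)
    let response = TaskResponse(result: result, id: task.id)
    try self.send(response: response, to: peerID)
  }
}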

Handling responses

Append the following to the bottom of session(_:didReceive:fromPeer:):

if let response = try? decoder
  .decode(TaskResponse.self, from: data) {

  NotificationCenter.default.post(
    name: .response, 
    object: response
  )
}
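Trying ScanTask first and TaskResponse second works as long as neither payload also decodes as the other type. If you add more message kinds, an explicit envelope removes that guesswork. Here’s a sketch using an enum with associated values, whose Codable conformance the compiler synthesizes as of Swift 5.5; Message, TaskStub, ResponseStub and handle(_:) are hypothetical stand-ins rather than the project’s types.

```swift
import Foundation

// Hypothetical stand-ins for ScanTask and TaskResponse.
struct TaskStub: Codable, Equatable {
  let input: Int
  let id: UUID
}

struct ResponseStub: Codable, Equatable {
  let result: String
  let id: UUID
}

/// One envelope for everything that travels between peers.
enum Message: Codable, Equatable {
  case task(TaskStub)
  case response(ResponseStub)
}

/// Decodes incoming data and routes it by case; returns a description for logging.
func handle(_ data: Data) throws -> String {
  switch try JSONDecoder().decode(Message.self, from: data) {
  case .task(let task):
    return "run task \(task.input)"
  case .response(let response):
    return "response \(response.result)"
  }
}
```

The case name becomes a key in the encoded JSON, so the receiver switches on the message kind instead of trying decoders in turn.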

Putting everything together

To review, you’ve taken care of the following issues so far:

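// In Systems: wait until some system has fewer than four tasks in flight.
func firstAvailableSystem() async -> ScanSystem {
  while true {
    for nextSystem in systems where await nextSystem.count < 4 {
      // Reserve a slot before handing the system out.
      await nextSystem.commit()
      return nextSystem
    }
    // Task.sleep(seconds:) isn't a standard library API; this assumes
    // the project provides it as a non-throwing helper.
    await Task.sleep(seconds: 0.1)
  }
  fatalError("Will never execute")
}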
func runAllTasks() async throws {
  started = Date()
  try await withThrowingTaskGroup(
    of: Result<String, Error>.self
  ) { [unowned self] group in
    // Hand each scan to the first system with a free slot.
    for number in 0 ..< total {
      let system = await systems.firstAvailableSystem()
      group.addTask {
        return await self.worker(number: number, system: system)
      }
    }

    for try await result in group {
      switch result {
      case .success(let result):
        print("Completed: \(result)")
      case .failure(let error):
        print("Failed: \(error.localizedDescription)")
      }
    }
    await MainActor.run {
      completed = 0
      countPerSecond = 0
      scheduled = 0
    }
    print("Done.")
  }
}

// Update worker's signature so it takes the system to run on:
func worker(number: Int, system: ScanSystem) async
  -> Result<String, Error> {

// ...and, inside the worker, run the task on that system:
result = try .success(await system.run(task))

// In ScanSystem.run(_:), replace `return try await task.run()` so remote
// systems forward the task over the transport:
if let service = service {
  return try await service.send(task: task, to: name)
} else {
  return try await task.run()
}
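firstAvailableSystem() hands each scan to the first system with fewer than four tasks in flight and polls every 0.1 seconds while all of them are busy. Here’s a standalone sketch of that capacity-limited dispatch; Node, tryReserve(), firstAvailable(in:) and runAll(_:on:) are hypothetical, and the nodes only simulate work with a short sleep. One deliberate difference: tryReserve() checks the count and claims a slot in a single actor call, whereas reading count and then calling commit() as two awaited calls would let another caller slip in between if several dispatch loops ran at once.

```swift
import Foundation

/// A hypothetical worker with a fixed number of slots.
actor Node {
  let name: String
  let capacity: Int
  private(set) var running = 0
  private(set) var peak = 0  // highest concurrency observed, to check the limit

  init(name: String, capacity: Int = 4) {
    self.name = name
    self.capacity = capacity
  }

  /// Checks for a free slot and claims it in one actor-isolated step.
  func tryReserve() -> Bool {
    guard running < capacity else { return false }
    running += 1
    peak = max(peak, running)
    return true
  }

  /// Runs a job in a reserved slot and frees the slot afterwards.
  func run(_ job: Int) async -> String {
    defer { running -= 1 }
    try? await Task.sleep(nanoseconds: 10_000_000)  // simulated work
    return "\(job) by \(name)"
  }
}

/// Polls until some node has a free slot.
func firstAvailable(in nodes: [Node]) async -> Node {
  while true {
    for node in nodes {
      if await node.tryReserve() { return node }
    }
    try? await Task.sleep(nanoseconds: 100_000_000)
  }
}

/// Dispatches every job without exceeding any node's capacity.
func runAll(_ jobs: Range<Int>, on nodes: [Node]) async -> [String] {
  await withTaskGroup(of: String.self, returning: [String].self) { group in
    for job in jobs {
      let node = await firstAvailable(in: nodes)
      group.addTask { await node.run(job) }
    }
    var results: [String] = []
    for await result in group {
      results.append(result)
    }
    return results
  }
}
```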

Adding some UI bling

While it’s pretty impressive to make simulators join SkyNet and work together, presentation is important, too. Right now, collaborating on the search for alien life seems a little… unspectacular.

@MainActor @Published var scheduled = 0 {
  didSet {
    Task {
      let systemCount = await systems.systems.count
      isCollaborating = scheduled > 0 && systemCount > 1
    }
  }
}

Retrying failed tasks

While it might seem like your work in this chapter is done, there’s one final task to take care of.

struct ScanTaskError: Error {
  let underlyingError: Error
  let task: ScanTask
}

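// In worker(number:system:), change the return type to:
Result<String, ScanTaskError>
// ...declare result with the new type:
let result: Result<String, ScanTaskError>
// ...and, in the catch block, wrap the error together with its task:
result = .failure(.init(
  underlyingError: error,
  task: task
))
// In runAllTasks(), update the task group's child result type:
withThrowingTaskGroup(of: Result<String, ScanTaskError>.self)
// Then replace the old failure case:
case .failure(let error):
  print("Failed: \(error.localizedDescription)")
// ...with one that re-runs the failed task on the local system:
case .failure(let error):
  group.addTask(priority: .high) {
    print("Re-run task: \(error.task.input).",
          "Failed with: \(error.underlyingError)")
    return await self.worker(
      number: error.task.input,
      system: self.systems.localSystem)
  }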
Completed: 11
Completed: 9 by Marin's iPod
Re-run task: 16. Failed with: UnreliableAPI.action(failingEvery:) failed. <---
Completed: 12
Completed: 13 by Ted's iPhone
Completed: 14 by Ted's iPhone
Completed: 17
Completed: 15
Completed: 18
Completed: 16
Re-run task: 19. Failed with: UnreliableAPI.action(failingEvery:) failed. <---
Completed: 19
Done.
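In the log, tasks 16 and 19 each fail once and complete when re-run. Here’s a self-contained sketch of the same retry pattern: each failure carries the input that produced it, so the loop that drains the group can add a new child task for it. JobError, RunSummary, attempt(_:number:failing:), runWithRetry and the maxAttempts cap are hypothetical; as shown above, the chapter’s version re-runs failed tasks on the local system without a retry limit.

```swift
import Foundation

/// A failure that remembers which input failed, like ScanTaskError.
struct JobError: Error {
  let input: Int
  let attempt: Int
}

struct RunSummary {
  var completed: [String] = []
  var gaveUp: [Int] = []
}

/// Hypothetical flaky operation: inputs in `failing` fail on their first attempt only.
func attempt(_ input: Int, number: Int, failing: Set<Int>) async -> Result<String, JobError> {
  if number == 1 && failing.contains(input) {
    return .failure(JobError(input: input, attempt: number))
  }
  return .success("Completed: \(input)")
}

/// Runs every input, re-adding failed ones to the same group up to `maxAttempts` times.
func runWithRetry(_ inputs: [Int], failing: Set<Int>, maxAttempts: Int = 3) async -> RunSummary {
  await withTaskGroup(of: Result<String, JobError>.self, returning: RunSummary.self) { group in
    for input in inputs {
      group.addTask { await attempt(input, number: 1, failing: failing) }
    }
    var summary = RunSummary()
    // Child tasks added inside the loop are picked up by the same loop.
    for await result in group {
      switch result {
      case .success(let line):
        summary.completed.append(line)
      case .failure(let error) where error.attempt < maxAttempts:
        group.addTask(priority: .high) {
          await attempt(error.input, number: error.attempt + 1, failing: failing)
        }
      case .failure(let error):
        summary.gaveUp.append(error.input)
      }
    }
    return summary
  }
}
```

Without a cap, a task that always fails would keep the group busy forever, since every failure adds another child task.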

Key points

  • An upcoming distributed actor language feature will allow developers to talk to remote actors almost as if they were local ones.
  • There is a work-in-progress “Swift Distributed Actors” package for running actors on server clusters.
  • Systems of distributed actors communicate over a transport layer that can use many different underlying services: local network, Bonjour, REST service, web socket and more.
  • Thanks to location transparency, regardless of whether the actor method calls are relayed to another process or a different machine, you use a simple await call at the point of use.
  • Building a custom distributed system isn’t difficult once you implement the transport layer.
  • In a system of distributed actors, each one needs a unique address so requests can be relayed reliably to the target peer and the responses delivered back to the original actor.
  • Calls to distributed actors can fail for a myriad of reasons, so asynchronous error handling plays an even more significant role in such apps.
  • Last but not least, a distributed app uses the same APIs as a local app: async/await, task groups and actors. The actor model allows for encapsulating the transport layer and keeping its implementation hidden from the API consumers.

Where to go from here?

Completing this book is no small feat!

© 2024 Kodeco Inc.
