Learning how to think in pipelines with Combine

Drew Fitzpatrick

7 min read

May 18, 2021

In this post, we’re going to follow an iterative, real-world example of how to solve a complex problem with Combine. You’ll also learn how to overcome the pitfalls of procedural thinking when designing Combine pipelines. Let’s get started.

How would you solve this problem?

You’re implementing a complex data loading system.

  • You have data sources A, B, and C to read from
  • Each needs to be connected/initialized before reading any data from it
  • To initialize B and C, you must read a configuration object from A
  • All the data sources are synced from a cloud service automatically when initialized, which could take a variable amount of time for each
  • An auth token is required to open the data sources, which must be fetched from a web service

With each of these requirements, the complexity grows. In a real project, these requirements may have been added over months and multiple shipping versions of the app. Without the full context from the start, accounting for the final complexity becomes very difficult.

An experienced reader may have already recognized these as asynchronous problems. Knowing that, the complexity compounds further: we have to manage callbacks and dispatch queues to avoid blocking the main thread, which is tricky but nothing too painful. You may even reach for operation queues, which would also help with the dependency management for this data.
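To make the operation-queue idea concrete, here is a minimal sketch of how the ordering requirements could be expressed as operation dependencies. The `StepLog` type and the placeholder operations are hypothetical stand-ins, not code from the playground, and passing values (the token, the configuration object) between operations is left out entirely.

```swift
import Foundation

// Thread-safe log so we can observe the order in which steps ran.
final class StepLog: @unchecked Sendable {
    private let lock = NSLock()
    private var steps: [String] = []

    func record(_ step: String) {
        lock.lock(); defer { lock.unlock() }
        steps.append(step)
    }

    var snapshot: [String] {
        lock.lock(); defer { lock.unlock() }
        return steps
    }
}

let stepLog = StepLog()

// Hypothetical placeholders for the real work (fetching a token, opening sources).
let fetchToken = BlockOperation { stepLog.record("token") }
let openA = BlockOperation { stepLog.record("A") }
let openB = BlockOperation { stepLog.record("B") }
let openC = BlockOperation { stepLog.record("C") }

// Dependencies encode the ordering: token -> A -> (B, C).
openA.addDependency(fetchToken)
openB.addDependency(openA)
openC.addDependency(openA)

let loadQueue = OperationQueue()
// Blocking here keeps the sketch deterministic; a real app would not block the main thread.
loadQueue.addOperations([openC, openB, openA, fetchToken], waitUntilFinished: true)

let order = stepLog.snapshot
print(order)
```

The token step always runs first and A second, while B and C may finish in either order. The dependencies handle sequencing, but moving results from one operation to the next still has to be wired up by hand.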

You can download the full Swift Playground and follow along. There are multiple pages, each corresponding to one of the steps below, and a Common.swift file that contains some of the convenience functions and type definitions used in these examples.

Simplicity is Key, Right?

In a naive, single-threaded case (or our glorious async/await future, but that’s another blog post), your code may look something like this:

// From page "01 - Sequential BG Queue"
func getAllDataSources(userName: String) -> MyDataSourceFacade {
    let token = getTokenFromServer()

    let A = getDataSourceA(token)

    let userData = A.getData(for: userName)

    let B = getDataSourceB(userData, token)
    let C = getDataSourceC(userData, token)

    return MyDataSourceFacade(userData, A, B, C)
}

You may notice one big thing that’s missing from this example: error handling. In reality it would be a bit more complex, but the structure would be roughly the same.
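As a rough idea of what that error handling might look like, here is a sketch of two sequential steps written as throwing functions. `LoadError`, `fetchToken(expired:)`, `loadUserData(for:token:)`, and `loadUserConfig` are all hypothetical names invented for this illustration; the playground's real functions may report failures differently.

```swift
// Hypothetical error type for the loading steps.
enum LoadError: Error, Equatable {
    case tokenExpired
    case userNotFound(String)
}

// Stand-in for the token request; fails when the token has expired.
func fetchToken(expired: Bool) throws -> String {
    if expired { throw LoadError.tokenExpired }
    return "token-123"
}

// Stand-in for reading the configuration object from data source A.
func loadUserData(for userName: String, token: String) throws -> String {
    guard userName == "Jim" else { throw LoadError.userNotFound(userName) }
    return "config for \(userName)"
}

// Same top-to-bottom shape as the sequential version; each `try` can bail out early.
func loadUserConfig(userName: String, tokenExpired: Bool = false) throws -> String {
    let token = try fetchToken(expired: tokenExpired)
    return try loadUserData(for: userName, token: token)
}

do {
    print(try loadUserConfig(userName: "Jim"))
    print(try loadUserConfig(userName: "Jim", tokenExpired: true))
} catch {
    print("Loading failed: \(error)")
}
```

The control flow stays linear, which is what makes this style attractive. It still has to run somewhere other than the main thread.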

To get this off the main thread, we’d need something like the following:

// From page "01 - Sequential BG Queue"
DispatchQueue.global(qos: .userInitiated).async {
    let facade = getAllDataSources(userName: "Jim")

    DispatchQueue.main.async {
        print("done!")
        // do something with facade
    }
}

It’s a familiar pattern, but it’s very brittle and is prone to simple errors when adding functionality. It’s also very static. What if someone refactors the code and forgets to dispatch the code off and on the main thread properly? What if the auth token expires and we need to start the process over?

A First Try with Combine

Thankfully, these things are much easier in a pipeline-oriented paradigm like Combine. A very natural way to update this for Combine is to replace the variables with Subjects or @Published properties, then fuse them all together like this:

class FacadeProvider {

    @Published private var token: String
    @Published private var A: MyDataSource
    @Published private var B: MyDataSource
    @Published private var C: MyDataSource
    @Published private var userData: MyUserData

    private var cancellables: [AnyCancellable] = []

    func getAllDataSources(userName: String) -> AnyPublisher<MyDataSourceFacade, Never> {

        cancellables = []

        getTokenPublisher()
            .logError()
            .subscribe(on: backgroundQueue)
            .assign(to: \.token, on: self)
            .store(in: &cancellables)

        $token
            .tryMap { getDataSourceA($0) }
            .logError()
            .subscribe(on: backgroundQueue)
            .assign(to: \.A, on: self)
            .store(in: &cancellables)

        $A
            .tryMap { $0.getData(for: userName) }
            .logError()
            .subscribe(on: backgroundQueue)
            .assign(to: \.userData, on: self)
            .store(in: &cancellables)

        let userAndTokenPub = $userData.combineLatest($token)

        userAndTokenPub
            .tryMap { getDataSourceB($0.0, $0.1) }
            .logError()
            .subscribe(on: backgroundQueue)
            .assign(to: \.B, on: self)
            .store(in: &cancellables)

        userAndTokenPub
            .tryMap { getDataSourceC($0.0, $0.1) }
            .logError()
            .subscribe(on: backgroundQueue)
            .assign(to: \.C, on: self)
            .store(in: &cancellables)

        return $userData.combineLatest($A, $B, $C)
            .map { (userData, A, B, C) -> MyDataSourceFacade in
                return MyDataSourceFacade(userData, A, B, C)
            }
            .subscribe(on: backgroundQueue)
            .receive(on: DispatchQueue.main)
            .eraseToAnyPublisher()
    }
}

This is a pretty direct translation from our naive example, and it’s easy to figure out what’s happening. I purposely chose this because it’s what those new to Combine will likely think to do when hearing about @Published, myself included. It’s a bit more verbose, but we constructed valid pipelines, logged errors (albeit with a helper function) and guaranteed the threading behavior we wanted.
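The `logError()` helper lives in the playground's Common.swift and is not shown in this post. One plausible shape for such a helper, offered purely as a guess, is an operator that logs the failure and then finishes without emitting anything. The name `logErrorSketch` and its logging closure parameter are assumptions made for this illustration.

```swift
import Combine

extension Publisher {
    /// Hypothetical sketch: logs any failure, then finishes without emitting.
    /// The resulting publisher can no longer fail, so its Failure type is Never.
    func logErrorSketch(
        _ log: @escaping (Failure) -> Void = { print("Pipeline error: \($0)") }
    ) -> AnyPublisher<Output, Never> {
        self.catch { error -> Empty<Output, Never> in
            log(error)
            return Empty()   // completes immediately with no values
        }
        .eraseToAnyPublisher()
    }
}

// Usage: a token request that always fails (hypothetical error type).
struct TokenError: Error {}

var loggedErrors: [String] = []
var receivedTokens: [String] = []
var didFinish = false

let subscription = Fail<String, TokenError>(error: TokenError())
    .logErrorSketch { loggedErrors.append("\($0)") }
    .sink(
        receiveCompletion: { _ in didFinish = true },
        receiveValue: { receivedTokens.append($0) }
    )
```

Converting the failure type to `Never` is what makes `assign(to:on:)` usable afterwards, since that operator is only available on publishers that cannot fail. The cost is that the error stops at the log and nothing downstream learns about it.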

Better? Or Worse…

However, I’ve glossed over a pretty big problem with this implementation: it doesn’t actually work. We’ve defined our properties as non-optional, so when we create this type, each property must contain a value. However, we don’t have initial values for these complex data types.

So let’s change this to actually work, using optional properties where needed:

// From page "02 - Combine First Try"
class FacadeProvider {

    @Published private var token: String?
    @Published private var A: MyDataSource?
    @Published private var B: MyDataSource?
    @Published private var C: MyDataSource?
    @Published private var userData: MyUserData?

    private var cancellables: [AnyCancellable] = []

    func getAllDataSources(userName: String) -> AnyPublisher<MyDataSourceFacade, Never> {

        cancellables = []

        getTokenPublisher()
            .logError()
            .subscribe(on: backgroundQueue)
            .assign(to: \.token, on: self)
            .store(in: &cancellables)

        $token
            .ignoreNil()
            .tryMap { getDataSourceA($0) }
            .logError()
            .subscribe(on: backgroundQueue)
            .assign(to: \.A, on: self)
            .store(in: &cancellables)

        $A
            .ignoreNil()
            .tryMap { $0.getData(for: userName) }
            .logError()
            .subscribe(on: backgroundQueue)
            .assign(to: \.userData, on: self)
            .store(in: &cancellables)

        let userAndTokenPub = $userData.ignoreNil().combineLatest($token.ignoreNil())

        userAndTokenPub
            .tryMap { getDataSourceB($0.0, $0.1) }
            .logError()
            .subscribe(on: backgroundQueue)
            .assign(to: \.B, on: self)
            .store(in: &cancellables)

        userAndTokenPub
            .tryMap { getDataSourceC($0.0, $0.1) }
            .logError()
            .subscribe(on: backgroundQueue)
            .assign(to: \.C, on: self)
            .store(in: &cancellables)

        return $userData.combineLatest($A, $B, $C)
            .compactMap { (userData, A, B, C) -> MyDataSourceFacade? in
                guard let userData = userData,
                      let A = A,
                      let B = B,
                      let C = C else {
                    return nil
                }

                return MyDataSourceFacade(userData, A, B, C)
            }
            .subscribe(on: backgroundQueue)
            .receive(on: DispatchQueue.main)
            .eraseToAnyPublisher()
    }
}

This is starting to get messy. Not to mention that our error handling could use some improvement. In this implementation, the caller of this function will never receive an Error, because the Publisher returned to them is only connected to the @Published properties (whose Failure types are Never). This is a problem because if any setup goes awry and the process needs to start over, the caller will just wait quietly for a value/error that will never come. That’s obviously not ideal.
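The silent-failure behavior can be reproduced in a few lines. In this sketch, `SilentProvider` and `SetupError` are hypothetical names, and the failing token request is simulated with `Fail`. The error is swallowed before it reaches the @Published property, so a subscriber to that property sees nothing at all.

```swift
import Combine

struct SetupError: Error {}

final class SilentProvider {
    @Published private(set) var token: String?
    private var cancellables: Set<AnyCancellable> = []

    // Simulated token fetch that always fails; the error is dropped
    // before assignment, much as a log-and-continue helper would do.
    func start() {
        Fail<String, SetupError>(error: SetupError())
            .map { Optional($0) }
            .catch { _ in Empty<String?, Never>() }
            .assign(to: \.token, on: self)
            .store(in: &cancellables)
    }

    // What a caller would observe: Failure is Never, so no error can arrive.
    var tokenPublisher: AnyPublisher<String, Never> {
        $token.compactMap { $0 }.eraseToAnyPublisher()
    }
}

let silentProvider = SilentProvider()
var callerEvents: [String] = []

let callerSubscription = silentProvider.tokenPublisher.sink(
    receiveCompletion: { _ in callerEvents.append("completed") },
    receiveValue: { callerEvents.append("value: \($0)") }
)

silentProvider.start()
print(callerEvents)   // nothing was delivered to the caller
```

The caller gets no value, no error, and no completion, because a @Published publisher only emits when its property changes and it cannot fail.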

Wield the Pipeline(s)

The problem here is with how we’ve decided to model the problem with Combine. We did something that seemed natural to a developer who has worked almost exclusively with procedural code, which I’d bet is most of us in the iOS/Mac developer community. But that’s not what Combine is made for. We need to model this as a reactive stream: multiple signals that come together to give a complex output value.

Here’s a more “Combine-flavored” solution:

// From page "03 - Combine Flavored"
func getAllDataSources(userName: String) -> AnyPublisher<MyDataSourceFacade, Error> {

    let tokenPub = getTokenPublisher()

    let APub = tokenPub
        .tryMap { getDataSourceA($0) }

    let userDataPub = APub
        .tryMap { $0.getData(for: userName) }

    let userAndTokenPub = userDataPub.combineLatest(tokenPub)

    let BPub = userAndTokenPub
        .tryMap { getDataSourceB($0.0, $0.1) }

    let CPub = userAndTokenPub
        .tryMap { getDataSourceC($0.0, $0.1) }

    return userDataPub.combineLatest(APub, BPub, CPub)
        .compactMap { (userData, A, B, C) -> MyDataSourceFacade? in
            print("Returning facade")
            return MyDataSourceFacade(userData, A, B, C)
        }
        .subscribe(on: backgroundQueue)
        .receive(on: DispatchQueue.main)
        .eraseToAnyPublisher()
}

This is so much better! No more managing subscriptions with AnyCancellable! No more assigning to properties! We’re returning errors properly to the caller! But there is one wrinkle in this code that can trip you up. When we run this, we notice in our server logs that we’re contacting the auth server six times for the token every time we create a facade. Huh, that’s weird… Let’s take a look at why this is happening.

The diagram above compares what we expected our code to do with what actually happened. On the left, we see the simple data flow we intended to define in the previous code sample, where each value is only created once. On the right, we see the actual outcome, where every intermediate value is being duplicated for every receiver. This is because the publishers we defined are only “recipes” for creating Subscriptions.

Subscriptions are the actual data-flow connections that are created when a subscriber connects to the pipeline. The subscription process happens in reverse, against the flow of data. By default, publishers don’t know about their existing subscriptions, so they must create a new subscription to their upstream source each time they receive a downstream connection. That’s what you want in most cases, where stateless, value-type semantics offer safety and convenience, but in our case we only need these intermediate publishers to load their data a single time.
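The “recipe” behavior is easy to see in isolation. In this sketch, a hypothetical token request is modeled with `Deferred` and `Future` so that the work runs once per subscription, and a `FetchCounter` class (also hypothetical) records how often it ran. Two derived publishers are then joined with `combineLatest`, mirroring the shape of the pipeline above on a smaller scale.

```swift
import Combine

// Counts how many times the simulated token request actually runs.
final class FetchCounter { var value = 0 }
let tokenFetches = FetchCounter()

// Deferred creates a fresh Future, and so repeats the work, for every subscription.
let tokenRecipe = Deferred {
    Future<String, Never> { promise in
        tokenFetches.value += 1
        promise(.success("token-\(tokenFetches.value)"))
    }
}

// Two downstream publishers derived from the same upstream "recipe".
let upperPub = tokenRecipe.map { $0.uppercased() }
let lengthPub = tokenRecipe.map { $0.count }

var combinedResults: [String] = []
let recipeSubscription = upperPub
    .combineLatest(lengthPub)
    .sink { combinedResults.append("\($0.0) \($0.1)") }

print(tokenFetches.value)   // the request ran once per branch, not once overall
print(combinedResults)
```

There is only one call to `sink`, yet the token request runs twice, because each branch of `combineLatest` creates its own subscription all the way back to the source.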

Spread the Word with Reference-type Publishers

Luckily, Combine has a solution for this: class-type publishers like Share, Multicast, and Autoconnect. Share is the simplest to use since (per Apple’s documentation) it’s “effectively a combination of Multicast and PassthroughSubject, with an implicit .autoconnect().” We’ll update our re-used publishers to use .share() so they can publish to multiple downstreams.

// From page "04 - Shared Publishers"
func getAllDataSources(userName: String) -> AnyPublisher<MyDataSourceFacade, Error> {

    let tokenPub = getTokenPublisher()
        .share()

    let APub = tokenPub
        .tryMap { getDataSourceA($0) }
        .share()

    let userDataPub = APub
        .tryMap { $0.getData(for: userName) }
        .share()

    let userAndTokenPub = userDataPub.combineLatest(tokenPub)
        .share()

    let BPub = userAndTokenPub
        .tryMap { getDataSourceB($0.0, $0.1) }

    let CPub = userAndTokenPub
        .tryMap { getDataSourceC($0.0, $0.1) }

    return userDataPub.combineLatest(APub, BPub, CPub)
        .compactMap { (userData, A, B, C) -> MyDataSourceFacade? in
            print("Returning facade on \(Thread.current.description)")
            return MyDataSourceFacade(userData, A, B, C)
        }
        .subscribe(on: backgroundQueue)
        .receive(on: DispatchQueue.main)
        .eraseToAnyPublisher()
}
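To see the effect of `.share()` on its own, here is a small sketch under simplified assumptions: a `PassthroughSubject` named `nameTrigger` stands in for the real upstream, and a hypothetical `LoadCounter` records how many times the mapping work runs. Both branches of `combineLatest` subscribe before any value is sent.

```swift
import Combine

// Counts how many times the simulated load actually runs.
final class LoadCounter { var value = 0 }
let userDataLoads = LoadCounter()

// Stand-in for the real upstream; values are sent manually below.
let nameTrigger = PassthroughSubject<String, Never>()

let sharedUserData = nameTrigger
    .map { name -> String in
        userDataLoads.value += 1
        return "data for \(name)"
    }
    .share()   // one upstream subscription, fanned out to every downstream

var sharedResults: [String] = []
let sharedSubscription = sharedUserData
    .combineLatest(sharedUserData.map { $0.count })
    .sink { sharedResults.append("\($0.0) (\($0.1) chars)") }

nameTrigger.send("Jim")

print(userDataLoads.value)   // the load ran once for both branches
print(sharedResults)
```

One thing to keep in mind is that a shared publisher does not replay earlier values. A subscriber that attaches after the upstream has already emitted will miss that value, so the timing of subscriptions matters more once `.share()` is involved.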

 

And that’s it! For real this time. Sorry for the deception, but I wanted to present this in a realistic, iterative-problem-solving way, so you could directly see what sort of issues you may run into when using Combine in the real world.

In fact, this blog post is almost exactly the path I took in a recent project (minus a lot of frustration and soul-searching along the way). But once I had a breakthrough on how to use Combine “The Right Way,” I was honestly giddy. And I never use that word (it sounds gross to me). So I felt the need to share and hopefully help anyone else out there struggling to take their first steps into the reactive world.
