Combine framework serialize async operations

How do I get the asynchronous pipelines that constitute the Combine framework to line up synchronously (serially)?

Suppose I have 50 URLs from which I want to download the corresponding resources, and let's say I want to do it one at a time. I know how to do that with Operation / OperationQueue, e.g. using an Operation subclass that doesn't declare itself finished until the download is complete. How would I do the same thing using Combine?

At the moment all that occurs to me is to keep a global list of the remaining URLs and pop one off, set up that one pipeline for one download, do the download, and in the sink of the pipeline, repeat. That doesn't seem very Combine-like.

I did try making an array of the URLs and mapping it to an array of publishers. I know I can "produce" a publisher and cause it to publish on down the pipeline using flatMap. But then I'm still doing all the downloading simultaneously. There isn't any Combine way to walk the array in a controlled manner, or is there?

(I also imagined doing something with Future but I became hopelessly confused. I'm not used to this way of thinking.)


Solution 1:

Use flatMap(maxPublishers:transform:) with .max(1), e.g.

func imagesPublisher(for urls: [URL]) -> AnyPublisher<UIImage, URLError> {
    Publishers.Sequence(sequence: urls.map { self.imagePublisher(for: $0) })
        .flatMap(maxPublishers: .max(1)) { $0 }
        .eraseToAnyPublisher()
}

Where

func imagePublisher(for url: URL) -> AnyPublisher<UIImage, URLError> {
    URLSession.shared.dataTaskPublisher(for: url)
        .compactMap { UIImage(data: $0.data) }
        .receive(on: RunLoop.main)
        .eraseToAnyPublisher()
}

and

var imageRequests: AnyCancellable?

func fetchImages() {
    imageRequests = imagesPublisher(for: urls).sink { completion in
        switch completion {
        case .finished:
            print("done")
        case .failure(let error):
            print("failed", error)
        }
    } receiveValue: { image in
        // do whatever you want with the images as they come in
    }
}

That resulted in:

[image: serial]

But we should recognize that you take a big performance hit by downloading them sequentially like that. For example, if I bump maxPublishers up to 6 at a time, it’s more than twice as fast:

[image: concurrent]

Personally, I’d recommend downloading sequentially only if you absolutely must (which, when downloading a series of images/files, is almost certainly not the case). Yes, performing requests concurrently can result in them not finishing in any particular order, but you can handle that by storing the results in a structure that is order independent (e.g. a dictionary rather than a simple array). The performance gains are so significant that it’s generally worth it.
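As a rough sketch of the order-independent idea, the following keys each result by its URL so the completion order stops mattering. The helper name dataByURL, its limit parameter, and the injected fetch closure are all assumptions made for illustration (the stub below stands in for a real network call such as a dataTaskPublisher pipeline):

```swift
import Combine
import Foundation

// Hypothetical helper: runs at most `limit` fetches at a time and collects
// the results into a dictionary keyed by URL.
func dataByURL(for urls: [URL],
               limit: Int = 6,
               fetch: @escaping (URL) -> AnyPublisher<Data, URLError>) -> AnyPublisher<[URL: Data], URLError> {
    urls.publisher
        .setFailureType(to: URLError.self)
        .flatMap(maxPublishers: .max(limit)) { url in
            // Pair every payload with the URL it came from.
            fetch(url).map { (url, $0) }
        }
        .collect()
        .map { pairs in Dictionary(pairs, uniquingKeysWith: { first, _ in first }) }
        .eraseToAnyPublisher()
}

// Usage with a stub fetch (an assumption, so the sketch runs without a network).
let urls = ["https://example.com/a", "https://example.com/b"].compactMap { URL(string: $0) }
var result: [URL: Data] = [:]
let token = dataByURL(for: urls) { url in
    Just(Data(url.path.utf8)).setFailureType(to: URLError.self).eraseToAnyPublisher()
}
.sink(receiveCompletion: { _ in }, receiveValue: { result = $0 })
```

Because the lookup is by URL, it does not matter which request happens to finish first.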

But, if you want them downloaded sequentially, the maxPublishers parameter can achieve that.
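To see the one-at-a-time behavior without touching the network, here is a small sketch in which each "download" is modeled by a PassthroughSubject that is completed by hand. The subjects and the started/received arrays are illustrative stand-ins, not part of the code above:

```swift
import Combine

// Three stand-in "downloads" that we can drive manually.
let subjects = (0..<3).map { _ in PassthroughSubject<String, Never>() }
var started: [Int] = []       // indices of subjects flatMap has subscribed to
var received: [String] = []   // values that reached the sink

let cancellable = subjects.indices.publisher
    .flatMap(maxPublishers: .max(1)) { index in
        subjects[index].handleEvents(receiveSubscription: { _ in started.append(index) })
    }
    .sink { received.append($0) }

// With .max(1), only the first inner publisher should be subscribed so far.
let startedInitially = started

subjects[0].send("a")
subjects[0].send(completion: .finished)

// Finishing the first inner publisher lets flatMap pull the next element.
let startedAfterFirst = started

subjects[1].send("b")
subjects[1].send(completion: .finished)
subjects[2].send("c")
subjects[2].send(completion: .finished)
```

Changing .max(1) to .max(3) in this sketch should cause all three subjects to be subscribed up front, which is the concurrent case.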

Solution 2:

I've only briefly tested this, but at first pass it appears that each request waits for the previous request to finish before starting.

I'm posting this solution in search of feedback. Please be critical if this isn't a good solution.

extension Collection where Element: Publisher {

    func serialize() -> AnyPublisher<Element.Output, Element.Failure>? {
        // If the collection is empty, we can't just create an arbitrary publisher
        // so we return nil to indicate that we had nothing to serialize.
        if isEmpty { return nil }

        // We know at this point that it's safe to grab the first publisher.
        let first = self.first!

        // If there was only a single publisher then we can just return it.
        if count == 1 { return first.eraseToAnyPublisher() }

        // We're going to build up the output starting with the first publisher.
        var output = first.eraseToAnyPublisher()

        // We iterate over the rest of the publishers (skipping over the first.)
        for publisher in self.dropFirst() {
            // We build up the output by appending the next publisher.
            output = output.append(publisher).eraseToAnyPublisher()
        }

        return output
    }
}


A more concise version of this solution (provided by @matt):

extension Collection where Element: Publisher {
    func serialize() -> AnyPublisher<Element.Output, Element.Failure>? {
        guard let start = self.first else { return nil }
        return self.dropFirst().reduce(start.eraseToAnyPublisher()) {
            $0.append($1).eraseToAnyPublisher()
        }
    }
}
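By way of feedback, one possible variation: Combine's Empty publisher can serve as the starting value for the reduce, so an empty collection gives a publisher that simply finishes and the return type no longer has to be optional. The name serialized() is hypothetical, chosen only to keep it distinct from serialize() above, and the Deferred jobs are stand-ins that log when each one actually starts:

```swift
import Combine

extension Collection where Element: Publisher {
    // Hypothetical non-optional variant: seeds the chain with Empty, which
    // completes immediately, then appends each publisher in order.
    func serialized() -> AnyPublisher<Element.Output, Element.Failure> {
        reduce(Empty<Element.Output, Element.Failure>().eraseToAnyPublisher()) {
            $0.append($1).eraseToAnyPublisher()
        }
    }
}

// Each job records when it is subscribed, so the log shows the ordering.
var log: [String] = []
let jobs = ["a", "b", "c"].map { name in
    Deferred { () -> Just<String> in
        log.append("start \(name)")
        return Just(name)
    }
}
let token = jobs.serialized().sink { log.append("value \($0)") }

// An empty collection should just complete without emitting anything.
var emptyFinished = false
let noJobs: [Just<String>] = []
let emptyToken = noJobs.serialized().sink(
    receiveCompletion: { _ in emptyFinished = true },
    receiveValue: { _ in }
)
```

Since append subscribes to the next publisher only after the previous one finishes, each "start" entry in the log should appear after the previous job's value.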