Swift并发任务是否按顺序运行?

ogsagwnx  于 5个月前  发布在  Swift
关注(0)|答案(1)|浏览(48)

我找不到讨论任务是否并发运行的文档,或者任务是否在某个不可见的队列中按顺序运行。
下面是我的应用程序遇到的一个简化问题,它可以在操场上运行,这引发了这个问题。

import UIKit
import Foundation
import Combine

struct Info {
    var id: String
    var value: Int
}

class DataStore {
        // pretend this is storing into core data
    func store(info: Info, id: String) {
        print("    store \(info)")
        let start = CACurrentMediaTime()
        while CACurrentMediaTime() - start < 2 { }  
    }
}

let dataStore = DataStore()
let subj = PassthroughSubject<Info, Never>()
let cancel = subj.sink { info in
    print("Start task for \(info)")
    // is there a way to queue tasks so that we
    Task {
        print("  start \(info)")
        dataStore.store(info: info, id: info.id)
        print("  finish: \(info)")
    }
}

subj.send(Info(id: "A", value: 1))
subj.send(Info(id: "A", value: 2))
subj.send(Info(id: "A", value: 3))
subj.send(Info(id: "A", value: 4))

let queueA = DispatchQueue(label: "A", attributes: .concurrent)
let queueB = DispatchQueue(label: "B", attributes: .concurrent)

queueA.async {
    subj.send(Info(id: "A", value: 1))
    subj.send(Info(id: "A", value: 2))
    subj.send(Info(id: "A", value: 3))
    subj.send(Info(id: "A", value: 4))
}

queueB.async {
    subj.send(Info(id: "B", value: 1))
    subj.send(Info(id: "B", value: 2))
    subj.send(Info(id: "B", value: 3))
    subj.send(Info(id: "B", value: 4))
}

queueA.async {
    subj.send(Info(id: "A", value: 1))
    subj.send(Info(id: "A", value: 2))
    subj.send(Info(id: "A", value: 3))
    subj.send(Info(id: "A", value: 4))
}

queueB.async {
    subj.send(Info(id: "B", value: 1))
    subj.send(Info(id: "B", value: 2))
    subj.send(Info(id: "B", value: 3))
    subj.send(Info(id: "B", value: 4))
}

// Note that a closure is not started until the other one has finished

字符串
请注意,在前一个闭包完成之前,闭包永远不会开始,现在我不知道这是因为passthrough主题使事情保持顺序,还是因为发布者的其他原因。
我知道这不是一个完美的例子,因为出版商,但我的应用程序有旧的合并代码接口与新的alcoc-await代码。
P.S.如果我使用BLOG序列而不是出版商,会有什么不同吗?

aemubtdh

aemubtdh1#

博士
你可以控制任务是否并发运行。但Swift并发不是队列,你不能保证它们一定会按照你创建它们的顺序运行。需要其他技术来实现严格的FIFO行为。
有几个问题:

  1. Task {…}是否同时运行?
    是的,这是一个并发系统。一旦你到达一个await挂起点,同一个actor上的其他任务就可以自由交错,从而产生并发。
    现在,您的示例不包含await挂起点,因此没有机会享受并发性。但是,更抽象地说,整个async-await系统都是考虑到并发性而设计的。
    1.单独的Task {…}可以并行运行吗?
    它不是用于此目的,但是,从技术上讲,这取决于您是否从参与者隔离的上下文中启动它。
    Task {…}的全部意义在于代表当前的参与者启动一个新的顶级任务 因此,如果您从一个参与者隔离的上下文启动它,这个新的顶级任务将被隔离到同一个参与者,从而防止并行执行(至少在该参与者上)。
    但是如果Task {…}是从一个非参与者隔离的上下文中启动的,那么结果是它也没有被隔离到任何特定的参与者。因此,它可以并行执行。
    所以,底线,这取决于。但是如果意图是并行执行,我不会倾向于依赖这种“如果从非参与者隔离的上下文调用,那么Task也不是参与者隔离的”行为。我更愿意明确并行执行的意图。具体来说,我要么使用Task.detached {…},一个任务组,或者async let。您确实不希望调用者环境中的更改改变您预期的并行执行。
    1.让我们假设你有一系列任务,它们不是以并发的方式运行的,这是否意味着它们是顺序运行的,即FIFO?
    答案是否定的,Swift并发系统不一定会按照任务提交的顺序运行任务。并发系统会倾向于按照任务提交的顺序启动任务,但并不严格如此。(参见下面的示例)
    如果在Swift并发系统中需要严格的FIFO顺序行为,则必须使用awaitAsyncSequence(例如AsyncStreamAsyncChannel)。
    有关更多信息,请参阅WWDC 2021视频Swift concurrency: Behind the scenesMeet AsyncSequence
    一个Task“[运行]给定的.操作异步作为一个新的顶级任务的一部分,代表当前的演员。”正如Swift编程语言:并发性说,“演员只允许一个任务访问他们的可变状态在同一时间”。所以,简而言之,如果你的pasc-await任务是在一个演员,这将阻止并行执行。
    为了说明这一点,我将在Instruments中分析该应用程序(使用OSSignposter间隔),因此我们可以很容易地看到正在发生的事情的时间轴。
    无论如何,考虑这个SwiftUI示例,其中Task * 不 * 隔离到任何特定的actor:
import SwiftUI
import os.log

let poi = OSSignposter(subsystem: "Test", category: .pointsOfInterest)

struct ContentView: View {
    var body: some View {
        VStack {
            Button("Launch Ten Tasks") {
                launchTenTasks()
            }
        }
        .padding()
    }

    func launchTenTasks() {
        let experiment = Experiment()

        for i in 0 ..< 10 {
            Task {
                experiment.spin(index: i, for: .seconds(1))
            }
        }
    }
}

class Experiment {
    func spin(index: Int, for duration: Duration) {
        let id = poi.makeSignpostID()
        poi.withIntervalSignpost(#function, id: id, "\(index)") {
            let start = ContinuousClock().now
            while start.duration(to: .now) < duration { }              // blocking thread is bad idea, but just simulating some slow, synchronous task
        }
    }
}

字符串
这产生:
x1c 0d1x的数据
您可以看到任务正在并行运行。
这是一个例外,这只是一个演示,说明单独使用Task不一定足以避免并行执行。
但是我们经常将任务与参与者结合使用,在这种情况下,您确实实现了非并行执行。
因此,如果我将Experiment设置为actor,则这些方法将是actor隔离的,因此不会并行运行。或者,如果Task {…}是actor隔离的,则这些同步函数调用也不会并行运行。
为了说明这一点,可以让它在带有@MainActor限定符的主actor上运行(在这个例子中这是不好的,因为它会阻塞主线程)。或者你可以将它隔离到你自己的actor上:

actor Experiment {
    func spin(index: Int, for interval: TimeInterval) {
        ...
    }
}


屈服:



我想提醒你注意的是,这实现了串行行为,但这不是一个队列。请注意,第8和第9项不是按顺序的。(它们基本上是按顺序处理的,但不是严格按顺序处理的。)如果你想要以严格的FIFO方式处理项的类似于队列的行为,你可以使用AsyncSequence,比如AsyncStreamAsyncChannel
在你的例子中,如果你的DataStore是一个actor,那将阻止并发执行。而且,FWIW,store不应该被标记为async方法,因为它没有做任何异步的事情(至少,在这一点上)。
关于你的例子还有一些其他的警告:
1.你的任务是如此之快,以至于很难看出它是顺序运行还是并行运行:请注意,我用一个人为的较慢的例子来执行我的测试,精确地表明并行与非并发执行。

1.你在操场上做了你的测试:恕我直言,操场不是一个测试并发的好环境。你想在一个测试应用程序中运行这个,这个测试应用程序运行在一个与你最终目标平台平行的环境中。操场上有各种各样的特殊行为,很容易得出错误的结论。
1.我建议不要将合并和(特别是)GCD与GCD-await代码混合在一起。
1.我会避免使用非结构化并发(例如,Task {…}Task.detached {…})。当然,当从同步上下文桥接到异步上下文时,我们必须使用这些模式,但否则我们会谨慎使用它。对于非结构化并发,我们负责手动将取消传播到我们创建的任务。如果您不将结果Task对象保存在某个属性中,你就失去了以后取消它的能力。
所以,在一个真实的应用程序中运行它,如果你想确保它不会并行运行,请确保你的方法与一个actor隔离。

相关问题