swift 如何将URLSessionStreamTask与URLSession一起用于分块编码传输

vyu0f0g1  于 2023-02-18  发布在  Swift
关注(0)|答案(2)|浏览(320)

我正在尝试连接Twitter流媒体API端点。看起来URLSession支持通过URLSessionStreamTask进行流媒体传输,但是我不知道如何使用该API。我也没有找到任何示例代码。
我尝试测试以下内容,但未记录任何网络流量:

let session = URLSession(configuration: .default, delegate: self, delegateQueue: nil)
let stream = session.streamTask(withHostName: "https://stream.twitter.com/1.1/statuses/sample.json", port: 22)
stream.startSecureConnection()
stream.readData(ofMinLength: 0, maxLength: 100000, timeout: 60, completionHandler: { (data, bool, error) in
   print("bool = \(bool)")
   print("error = \(String(describing: error))")
})
stream.resume()

我还实现了委托方法(包括URLSessionStreamDelegate),但它们没有被调用。
如果有人能提供一个代码示例来说明如何为来自流端点的chunked响应打开一个持久连接,这将非常有帮助。另外,我正在寻找不涉及第三方库的解决方案。一个类似于https://stackoverflow.com/a/9473787/5897233但使用URLSession等价物更新的响应将是理想的。

  • 注意:上面的示例代码中省略了授权信息。*
jslywgbw

jslywgbw1#

收到了大量的信息礼貌奎因"爱斯基摩人"在苹果。
唉,你搞错了。URLSessionStreamTask是用来处理一个裸露的TCP(或TCP上的TLS)连接的,上面没有HTTP帧。你可以把它看作是BSD Sockets API的高级等价物。
分块传输编码是HTTP的一部分,因此所有其他URLSession任务类型(数据任务、上载任务、下载任务)都支持它。您不需要执行任何特殊操作来启用它。分块传输编码是HTTP 1.1标准的强制部分,因此总是启用。
但是,您可以选择如何接收返回的数据。(dataTask(with:completionHandler:)等等),URLSession将缓冲所有传入的数据,然后以一个大的Data值将其传递给完成处理程序。这在许多情况下很方便,但它不适用于流资源。在这种情况下,您需要使用URLSession委托-基于API(dataTask(with:)等),它将在数据块到达时调用urlSession(_:dataTask:didReceive:)会话委托方法。
至于我正在测试的特定终点,发现了以下内容:看起来服务器只有在客户端向其发送流请求时才启用其流响应(分块传输编码),这有点奇怪,而且HTTP规范肯定不需要。
幸运的是,可以强制URLSession发送流请求:
1.使用uploadTask(withStreamedRequest:)创建任务
1.实现urlSession(_:task:needNewBodyStream:)委托方法以返回输入流,该输入流在读取时返回请求正文
1.利润!
我已经附上了一些测试代码来演示这一点,在这个例子中,它使用了一个绑定的流对,将输入流传递给请求(按照上面的步骤2),并保留输出流。
如果您希望实际将数据作为请求主体的一部分发送,可以通过写入输出流来实现。

class NetworkManager : NSObject, URLSessionDataDelegate {

static var shared = NetworkManager()

private var session: URLSession! = nil

override init() {
    super.init()
    let config = URLSessionConfiguration.default
    config.requestCachePolicy = .reloadIgnoringLocalCacheData
    self.session = URLSession(configuration: config, delegate: self, delegateQueue: .main)
}

private var streamingTask: URLSessionDataTask? = nil

var isStreaming: Bool { return self.streamingTask != nil }

func startStreaming() {
    precondition( !self.isStreaming )

    let url = URL(string: "ENTER STREAMING URL HERE")!
    let request = URLRequest(url: url)
    let task = self.session.uploadTask(withStreamedRequest: request)
    self.streamingTask = task
    task.resume()
}

func stopStreaming() {
    guard let task = self.streamingTask else {
        return
    }
    self.streamingTask = nil
    task.cancel()
    self.closeStream()
}

var outputStream: OutputStream? = nil

private func closeStream() {
    if let stream = self.outputStream {
        stream.close()
        self.outputStream = nil
    }
}

func urlSession(_ session: URLSession, task: URLSessionTask, needNewBodyStream completionHandler: @escaping (InputStream?) -> Void) {
    self.closeStream()

    var inStream: InputStream? = nil
    var outStream: OutputStream? = nil
    Stream.getBoundStreams(withBufferSize: 4096, inputStream: &inStream, outputStream: &outStream)
    self.outputStream = outStream

    completionHandler(inStream)
}

func urlSession(_ session: URLSession, dataTask: URLSessionDataTask, didReceive data: Data) {
    NSLog("task data: %@", data as NSData)
}

func urlSession(_ session: URLSession, task: URLSessionTask, didCompleteWithError error: Error?) {
    if let error = error as NSError? {
        NSLog("task error: %@ / %d", error.domain, error.code)
    } else {
        NSLog("task complete")
    }
}
}

您可以从任何地方调用网络代码,例如:

class MainViewController : UITableViewController {

override func tableView(_ tableView: UITableView, didSelectRowAt indexPath: IndexPath) {
    if NetworkManager.shared.isStreaming {  
       NetworkManager.shared.stopStreaming() 
    } else {
       NetworkManager.shared.startStreaming() 
    }
    self.tableView.deselectRow(at: indexPath, animated: true)
}
}

希望这个有用。

uqjltbpv

uqjltbpv2#

因此,这比没有明确任务取消或写入流的示例要弱得多,但如果您只是YOLO侦听服务器发送事件流,则此示例将在2023年2月生效。它基于“Use async/await with URLSession”WWDC 21会话。该会话也有一个使用自定义委托的示例。
https://developer.apple.com/videos/play/wwdc2021/10095/

func streamReceiverTest(streamURL:URL, session:URLSession)  async throws {
        let (bytes, response) = try await session.bytes(from:streamURL)
        guard let httpResponse = response as? HTTPURLResponse else {
            throw APIngError("Not an HTTPResponse")
        } 
        guard httpResponse.statusCode == 200 else {
            throw APIngError("Not a success: \(httpResponse.statusCode)")
        }
    
        for try await line in bytes.lines {
            print(line)
            print()
        }
    }

try await streamReceiverTest(streamURL:URL(string:"https://httpbin.org/get")!, session:URLSession.shared)检查请求并没有显示设置了Accepts头,但是我使用的API似乎需要它来提供流,有些服务器可能会(?),所以我也会包括那个版本。

func streamReceiverTestWithManualHeader(streamURL:URL, session:URLSession)  async throws {
        var request = URLRequest(url:streamURL)
        request.setValue("text/event-stream", forHTTPHeaderField:"Accept") 
        let (bytes, response) = try await session.bytes(for:request)
        guard let httpResponse = response as? HTTPURLResponse else {
            throw APIngError("Not an HTTPResponse")
        } 
        guard httpResponse.statusCode == 200 else {
            throw APIngError("Not a success: \(httpResponse.statusCode)")
        }
    
        for try await line in bytes.lines {
            print(line)
            print()
        }
    }

相关问题