scala 在流中链化Akka-http-client请求

xghobddn  于 5个月前  发布在  Scala
关注(0)|答案(2)|浏览(47)

我想使用akka-http-client作为Stream来链接http请求。链中的每个http请求都依赖于先前请求的成功/响应,并使用它来构造新的请求。如果请求不成功,Stream应该返回不成功请求的响应。
如何在akka-http中构造这样的流?我应该使用哪个akka-http客户端级别的API?

vktxenjb

vktxenjb1#

如果你正在做一个网络爬虫,看看this post,这个答案解决了一个更简单的情况,比如下载分页的资源,其中到下一个页面的链接在当前页面响应的标题中。
您可以使用Source.unfoldAsync方法创建一个链式源-其中一个项目通向下一个项目。这需要一个函数,该函数接受元素S并返回Future[Option[(S, E)]]以确定流是否应该继续发出E类型的元素,并将状态传递给下一次调用。
在你的情况下,这有点像:
1.取初始值HttpRequest
1.生成Future[HttpResponse]
1.如果响应指向另一个URL,则返回Some(request -> response),否则返回None

然而,有一个问题,如果流中不包含指向下一个请求的指针,它将不会从流中发出响应。

为了解决这个问题,你可以让传递给unfoldAsync的函数返回Future[Option[(Option[HttpRequest], HttpResponse)]]。这允许你处理以下情况:

  • 当前响应为错误
  • 当前响应指向另一个请求
  • 当前响应不指向另一个请求

下面是一些注解代码,概述了这种方法,但首先是初步的:

  • 当HTTP请求流到Akka流中的响应时,您需要确保响应主体被消耗,否则会发生不好的事情(死锁等)。如果您不需要主体,您可以忽略它,但这里我们使用一个函数将HttpEntity从(潜在的)流转换为严格的实体:*
import scala.concurrent.duration._

def convertToStrict(r: HttpResponse): Future[HttpResponse] =
  r.entity.toStrict(10.minutes).map(e => r.withEntity(e))

字符串
接下来,几个函数从HttpResponse创建Option[HttpRequest]。这个例子使用了类似Github的分页链接的方案,其中Links头部包含,例如:<https://api.github.com/...> rel="next"

def nextUri(r: HttpResponse): Seq[Uri] = for {
  linkHeader <- r.header[Link].toSeq
  value <- linkHeader.values
  params <- value.params if params.key == "rel" && params.value() == "next"
} yield value.uri

def getNextRequest(r: HttpResponse): Option[HttpRequest] =
  nextUri(r).headOption.map(next => HttpRequest(HttpMethods.GET, next))


接下来,我们将传递给unfoldAsync的真实的函数。它使用Akka HTTP Http().singleRequest() API获取HttpRequest并生成Future[HttpResponse]

def chainRequests(reqOption: Option[HttpRequest]): Future[Option[(Option[HttpRequest], HttpResponse)]] =
  reqOption match {
    case Some(req) => Http().singleRequest(req).flatMap { response =>
      // handle the error case. Here we just return the errored response
      // with no next item.
      if (response.status.isFailure()) Future.successful(Some(None -> response))

      // Otherwise, convert the response to a strict response by
      // taking up the body and looking for a next request.
      else convertToStrict(response).map { strictResponse =>
        getNextRequest(strictResponse) match {
          // If we have no next request, return Some containing an
          // empty state, but the current value
          case None => Some(None -> strictResponse)

          // Otherwise, pass on the request...
          case next => Some(next -> strictResponse)
        }
      }
    }
    // Finally, there's no next request, end the stream by
    // returning none as the state.
    case None => Future.successful(None)
  }


请注意,如果我们得到一个错误响应,流将不会继续,因为我们在下一个状态中返回None
你可以调用它来获得一个HttpResponse对象流,如下所示:

val initialRequest = HttpRequest(HttpMethods.GET, "http://www.my-url.com")
Source.unfoldAsync[Option[HttpRequest], HttpResponse](
    Some(initialRequest)(chainRequests)


至于返回最后一个(或错误)响应的值,您只需使用Sink.last,因为流将在成功完成或第一个错误响应时结束。例如:

def getStatus: Future[StatusCode] = Source.unfoldAsync[Option[HttpRequest], HttpResponse](
      Some(initialRequest))(chainRequests)
    .map(_.status)
    .runWith(Sink.last)

n1bvdmb6

n1bvdmb62#

可以使用Source.unfoldAsync

class CatsHttpClientImpl(implicit system: ActorSystem[_], ec: ExecutionContext) extends CatsHttpClient {
  private val logger: Logger = LoggerFactory.getLogger(classOf[CatsHttpClientImpl])
  private val start: Option[String] = Some("https://catfact.ninja/breeds")

  override def getAllBreads: Future[Seq[Cat]] = {
    Source
      .unfoldAsync(start) {
        case Some(next) =>
          val nextChunkFuture: Future[CatsResponse] = sendRequest(next)

          nextChunkFuture.map { resp =>
            resp.nextPageUrl match {
              case Some(url) => Some((Some(url), resp.data))
              case None => Some((None, resp.data))
            }
          }
        case None => Future.successful(None)
      }
      .runWith(Sink.fold(Seq(): Seq[Cat])(_ ++ _))
  }

  private def sendRequest(url: String): Future[CatsResponse] = {
    logger.info(s"CatsHttpClientImpl: Sending request $url")

    val request = HttpRequest(
      uri = Uri(url),
      headers = List(
        RawHeader("Accept", "application/json")
      )
    )
    Http(system).singleRequest(request).flatMap { response =>
      response.status match {
        case StatusCodes.OK =>
          logger.info("CatsHttpClientImpl: Received success")
          Unmarshal(response.entity).to[CatsResponse]

        case _ =>
          logger.error("CatsHttpClientImpl: Received error")
          throw new CatsHttpClientException()
      }
    }
  }
}

字符串
完整的源代码和可运行的项目可以在over on GitHub中找到

相关问题