scala中cats.effect的io.async回调问题

wdebmtf2  于 2021-07-14  发布在  Java
关注(0)|答案(1)|浏览(302)

我试图通过java11重写httpclient HttpClient 在斯卡拉
这是我的密码:

import cats.effect._
import java.net.http._
import java.net.http.HttpResponse._
import java.net.http.HttpClient._

trait HttpClients[F[_]] {
  def send(req: HttpRequest)(implicit F: Async[F]): F[HttpResponse[_]]
}

object HttpClients {
  val client: HttpClient = HttpClient.newBuilder().followRedirects(Redirect.ALWAYS).build()
  def newClient[F[_] : Async](): HttpClients[F] = new HttpClients[F] {
    override def send(req: HttpRequest)(implicit F: Async[F]): F[HttpResponse[_]] = F.async { cb =>
      val resp = client.sendAsync(req, BodyHandlers.ofString())
      val s = resp.handle((res: HttpResponse[String], err: Throwable) => {
        if (err == null)
          cb(Right(res))
        else
          cb(Left(err))
      })
      s // TODO ?
      // Type missmatch
      // Required: F[Option[F[Unit]]]
      // Found:    Unit
    }
  }
}

来自此的句柄回调
我猜错误来自这里,但我不知道下一步该怎么写。
然后我做了一些改变:

def newClient[F[_] : Async](): HttpClients[F] = new HttpClients[F] {
    override def send(req: HttpRequest)(implicit F: Async[F]): F[HttpResponse[_]] = F.async[HttpResponse[_]] { cb =>
      val s = Sync[F](F: Async[F]).delay {
        val resp = client.sendAsync(req, BodyHandlers.ofString())
        resp.handle((res: HttpResponse[String], err: Throwable) => {
          if (err == null)
            cb(Right(res))
          else
            cb(Left(err))
        }).join()
      }
      F.delay(s.some)
    }
  }

这一次,没有错误,但我不知道如何得到响应的身体
谢谢你的回复!

xxb16uws

xxb16uws1#

@olegpyzhcov已经提供了在您使用ce3的情况下的洞察力,这个答案是在您想要的情况下使用ce2。
代码的第一个版本是正确的,下面是一个完整的运行示例,使用ammonite进行了一些样式改进,并确保为每个调用和对 newClient ```
// scala 2.13.5

import $ivy.org.typelevel::cats-effect:2.5.0

import cats.effect.{Async, IO}
import cats.syntax.all._
import java.net.URI
import java.net.http.{HttpClient, HttpRequest, HttpResponse}

trait HttpClients[F[_]] {
def send(req: HttpRequest): F[HttpResponse[String]]
}

object HttpClients {
def newClient[F[_]](implicit F: Async[F]): F[HttpClients[F]] =
F.delay {
HttpClient
.newBuilder
.followRedirects(HttpClient.Redirect.ALWAYS)
.build()
} map { client =>
new HttpClients[F] {
override def send(req: HttpRequest): F[HttpResponse[String]] =
F.async { cb =>
client.sendAsync(req, HttpResponse.BodyHandlers.ofString).handle {
(res: HttpResponse[String], err: Throwable) =>
if (err == null) cb(Right(res))
else cb(Left(err))
}
}
}
}
}

object Main {
private val request =
HttpRequest
.newBuilder
.GET
.uri(URI.create("https://stackoverflow.com/questions/tagged/scala?tab=Newest"))
.build()

private val program = for {
_ <- IO.delay(println("Hello, World!"))
client <- HttpClients.newClient[IO]
response <- client.send(request)
_ <- IO.delay(println(response))
_ <- IO.delay(println(response.body))
} yield ()

def run(): Unit = {
program.unsafeRunSync()
}
}

@main
def main(): Unit = {
Main.run()
}

相关问题