在scala中使用curl时的性能问题

xxls0lw8  于 2021-06-26  发布在  Hive
关注(0)|答案(1)|浏览(447)

我正在尝试从配置单元中获取一列,并根据该列的值,执行 curl 命令。
这就是我生成列列表的方式

val list = hiveContext.sql("select application_number from t").collect()

for(l1 <- list) {
    val z = a.toString().replace("[", "").replace("]", "")
    val cmd = Seq("curl", "-X", "POST", "--insecure", "--header",
                  "Content-Type: application/json", "--header", 
                  "Accept: application/json", "-d", 
                  "{\"searchText\":\"+'z'+\",\"qf\":\"applId\"}", "https://link")
    val r = cmd.!!
}

现在,我把数据存储在 r 以完美的方式变化。
但我的名单上有1000万张唱片。所以在迭代过程中需要花费大量的时间。
有没有更好的办法?

hsgswve4

hsgswve41#

你也可以用Spark .map 或者一个udf来迭代每个记录并执行来自工作节点的curl请求。这样做的好处是,您永远不必收集主服务器上的所有记录,然后运行10m curl请求。缺点是所有的curl请求都是异步的,所以顺序不能保证,如果集群足够大,那么实际上可以关闭curl请求的接收端。另一个缺点是,这会带来严重的副作用。如果spark作业中途失败,则已发布500万个请求。
除了使用从系统激发的curl请求,我还将使用本机jvmhttp import scalaj.http.Http 像这样的

import org.apache.spark.sql.SparkSession
import scala.util.Try
import scalaj.http.Http

val spark = SparkSession.builder()
  .master("local[2]")
  .appName("test")
  .getOrCreate()

import spark.implicits._

// val list = (0 to 100).toDF("application_number").as[Int]
val list = hiveContext.sql("select application_number from t").as[Int]

val r = list.map(application_number => {
  Try(Http("https://link").postData(s"""{"searchText": "$application_number","qf":"applId"}""")
    .header("Content-Type", "application/json")
    .asString.body).toOption
}).collect()

println(r.toList)

scalajhttp是阻塞的,但是线程安全的,因此为了获得更高的性能和风险,您可以将它 Package 在一个执行框架(futures)中,并带有一个超时时间

相关问题