我正在尝试从配置单元中获取一列,并根据该列的值,执行 curl
命令。
这就是我生成列列表的方式
val list = hiveContext.sql("select application_number from t").collect()
for(l1 <- list) {
val z = a.toString().replace("[", "").replace("]", "")
val cmd = Seq("curl", "-X", "POST", "--insecure", "--header",
"Content-Type: application/json", "--header",
"Accept: application/json", "-d",
"{\"searchText\":\"+'z'+\",\"qf\":\"applId\"}", "https://link")
val r = cmd.!!
}
现在,我把数据存储在 r
以完美的方式变化。
但我的名单上有1000万张唱片。所以在迭代过程中需要花费大量的时间。
有没有更好的办法?
1条答案
按热度按时间hsgswve41#
你也可以用Spark
.map
或者一个udf来迭代每个记录并执行来自工作节点的curl请求。这样做的好处是,您永远不必收集主服务器上的所有记录,然后运行10m curl请求。缺点是所有的curl请求都是异步的,所以顺序不能保证,如果集群足够大,那么实际上可以关闭curl请求的接收端。另一个缺点是,这会带来严重的副作用。如果spark作业中途失败,则已发布500万个请求。除了使用从系统激发的curl请求,我还将使用本机jvmhttp
import scalaj.http.Http
像这样的scalajhttp是阻塞的,但是线程安全的,因此为了获得更高的性能和风险,您可以将它 Package 在一个执行框架(futures)中,并带有一个超时时间