如何实现比spark集群中的核数更高的并行化?

mum43rcc  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(358)

我有一个spark工作,最后一步就是通过网络发送数据。另一端的接收器可以处理大约10到50倍于我当前发送的并发请求,所以我在寻找一种比内核数量更进一步的并行化方法。我找到了给一个执行器多个核心的方法,但没有找到如何为每个核心使用多个执行器的方法。 --num-executors 选择 spark-submit 没用。选项也没有

spark.dynamicAllocation.enabled
spark.dynamicAllocation.minExecutors
spark.dynamicAllocation.maxExecutors
siotufzp

siotufzp1#

一般来说,spark非常倾向于并行计算,而不是并行i/o,因此对于超出内核数量的并行化,您只能靠自己。
听起来你用来发送请求的api是阻塞的,也就是说,签名基本上是

type Request = ...
type Response = ...

def sendRequest(req: Request): Response

// alternatively...
trait Request { // maybe built by some builder?
  def perform(): Response
}

您可以通过rdd api调用它:

rdd.map(x => sendRequest(makeRequestFrom(x)))

其基本思想是首先构建一个 scala.concurrent.ExecutionContext 为了形成请求。由于此ec上的任务大部分时间都被阻塞,因此我们可以安全地使此ec大于核心数。然后,我们安排任务并将它们组合到该ec中。

object AsyncRequestor {
  import scala.concurrent.{ ExecutionContext, Future }
  import java.util.concurrent.Executors

  val requestsPerCore = 10 // or 22, or 50...
  implicit val ctx = {
    // Number of cores the JVM thinks are available... may vary depending on the JVM version, running in a docker container, etc.
    val cores = Runtime.getRuntime.availableProcessors
    ExecutionContext.fromExecutorService(Executors.newWorkStealingPool(requestsPerCore * cores))
  }

  def performRequests(reqs: Iterator[Request]): Iterator[Response] = {
    val batched: Iterator[Seq[Request]] = reqs.grouped(requestsPerCore).withPartial(true)
    batched.flatMap { subseq =>
      import scala.concurrent.Await
      import scala.concurrent.Duration

      val futs = Future.sequence(
        subseq.map { req =>
          Future(sendRequest(req))
        }
      )

      Await.result(futs, Duration.Inf)
    }
  }
}

然后您可以:

rdd.map(makeRequestFrom)
  .mapPartitions(AsyncRequestor.performRequests)

请注意,如果spark作业主要只是发出这些异步请求,那么使用类似akka streams或fs2这样更倾向于并行i/o的东西来组织和调度操作可能是有意义的。

相关问题