The first step of any Spark application is to create and initialize a SparkContext. Initialization creates and prepares the internal components, covering network communication, distribution, messaging, storage, computation, scheduling, caching, metrics, cleanup, file serving, the UI, and more. The SparkContext is the entry point to Spark's main functionality: it connects to the Spark cluster and creates RDDs, accumulators, and broadcast variables, and only one SparkContext may be running per JVM. Because the SparkContext turns external data into the application's first RDD, it forms the root of RDD lineage and is therefore the origin of the DAG.
Before walking through the SparkContext initialization process, it helps to know two fields in the SparkContext companion object: activeContext: AtomicReference[SparkContext] and contextBeingConstructed: Option[SparkContext].

activeContext: AtomicReference[SparkContext] records whether a SparkContext is currently active. When one is, activeContext holds that SparkContext; otherwise its value is null.

contextBeingConstructed: Option[SparkContext] marks a SparkContext that is still starting up. Initialization sets up many components and takes time, and during that window Spark must still guarantee that only one SparkContext is being created. While a SparkContext is under construction, contextBeingConstructed = Some(sc); otherwise contextBeingConstructed = None.
The source code is as follows:
/**
* The active, fully-constructed SparkContext. If no SparkContext is active, then this is `null`.
*
* Access to this field is guarded by SPARK_CONTEXT_CONSTRUCTOR_LOCK.
*/
private val activeContext: AtomicReference[SparkContext] =
new AtomicReference[SparkContext](null)
/**
* Points to a partially-constructed SparkContext if some thread is in the SparkContext
* constructor, or `None` if no SparkContext is being constructed.
* Access to this field is guarded by SPARK_CONTEXT_CONSTRUCTOR_LOCK
*/
private var contextBeingConstructed: Option[SparkContext] = None
// In order to prevent multiple SparkContexts from being active at the same time, mark this
// context as having started construction.
// NOTE: this must be placed at the beginning of the SparkContext constructor.
SparkContext.markPartiallyConstructed(this, allowMultipleContexts)
/**
*
* Called at the beginning of the SparkContext constructor to ensure that no SparkContext is
* running. Throws an exception if a running context is detected and logs a warning if another
* thread is constructing a SparkContext. This warning is necessary because the current locking
* scheme prevents us from reliably distinguishing between cases where another context is being
* constructed and cases where another constructor threw an exception.
*/
private[spark] def markPartiallyConstructed(
sc: SparkContext,
allowMultipleContexts: Boolean): Unit = {
SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
assertNoOtherContextIsRunning(sc, allowMultipleContexts)
// If assertNoOtherContextIsRunning did not throw, record that this thread is now
// constructing a SparkContext
contextBeingConstructed = Some(sc)
}
}
/**
* Called to ensure that no other SparkContext is running in this JVM.
*
* Throws an exception if a running context is detected and logs a warning if another thread is
* constructing a SparkContext. This warning is necessary because the current locking scheme
* prevents us from reliably distinguishing between cases where another context is being
* constructed and cases where another constructor threw an exception.
*/
private def assertNoOtherContextIsRunning(
sc: SparkContext,
allowMultipleContexts: Boolean): Unit = {
SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
Option(activeContext.get()).filter(_ ne sc).foreach { ctx =>
val errMsg = "Only one SparkContext may be running in this JVM (see SPARK-2243)." +
" To ignore this error, set spark.driver.allowMultipleContexts = true. " +
s"The currently running SparkContext was created at:\n${ctx.creationSite.longForm}"
val exception = new SparkException(errMsg)
if (allowMultipleContexts) {
logWarning("Multiple running SparkContexts detected in the same JVM!", exception)
} else {
throw exception
}
}
contextBeingConstructed.filter(_ ne sc).foreach { otherContext =>
// Since otherContext might point to a partially-constructed context, guard against
// its creationSite field being null:
val otherContextCreationSite =
Option(otherContext.creationSite).map(_.longForm).getOrElse("unknown location")
val warnMsg = "Another SparkContext is being constructed (or threw an exception in its" +
" constructor). This may indicate an error, since only one SparkContext may be" +
" running in this JVM (see SPARK-2243)." +
s" The other SparkContext was created at:\n$otherContextCreationSite"
logWarning(warnMsg)
}
}
}
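The locking pattern above can be reproduced in a small standalone sketch. ContextGuard below is our own class, not Spark's; it only mimics the shape of the real guard: one lock protects both the active-context check and the "being constructed" flag.

```scala
import java.util.concurrent.atomic.AtomicReference

// Standalone mimic of SparkContext's construction guard.
object ContextGuard {
  private val lock = new Object
  private val activeContext = new AtomicReference[AnyRef](null)
  private var contextBeingConstructed: Option[AnyRef] = None

  def markPartiallyConstructed(ctx: AnyRef): Unit = lock.synchronized {
    // Mirrors assertNoOtherContextIsRunning: refuse if another context is already active
    Option(activeContext.get()).filter(_ ne ctx).foreach { _ =>
      throw new IllegalStateException("Only one context may be running in this JVM")
    }
    contextBeingConstructed = Some(ctx)
  }

  def setActiveContext(ctx: AnyRef): Unit = lock.synchronized {
    contextBeingConstructed = None
    activeContext.set(ctx)
  }
}
```

Once a first context is marked active, a second markPartiallyConstructed call fails, which is exactly the behavior enforced at the start and end of the real constructor.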
// log out Spark Version in Spark driver log
logInfo(s"Running Spark version $SPARK_VERSION")
The initialization itself involves more than a dozen steps, falling into three broad phases. It starts with the configuration:
// Obtain the SparkConf by cloning the conf that was passed in
_conf = config.clone()
// Validate the settings in the conf (deploy mode, appName, YARN checks, and so on)
_conf.validateSettings()
// The master URL spark.master is required
if (!_conf.contains("spark.master")) {
throw new SparkException("A master URL must be set in your configuration")
}
// The application name spark.app.name is required
if (!_conf.contains("spark.app.name")) {
throw new SparkException("An application name must be set in your configuration")
}
// log out spark.app.name in the Spark driver logs
logInfo(s"Submitted application: $appName")
// System property spark.yarn.app.id must be set if user code ran by AM on a YARN cluster
// In yarn-cluster mode, spark.yarn.app.id is required
if (master == "yarn" && deployMode == "cluster" && !_conf.contains("spark.yarn.app.id")) {
throw new SparkException("Detected yarn cluster mode, but isn't running on a cluster. " +
"Deployment to YARN is not supported directly by SparkContext. Please use spark-submit.")
}
if (_conf.getBoolean("spark.logConf", false)) {
logInfo("Spark configuration:\n" + _conf.toDebugString)
}
// Set Spark driver host and port system properties. This explicitly sets the configuration
// instead of relying on the default value of the config constant.
// Set the driver's host address and port
_conf.set(DRIVER_HOST_ADDRESS, _conf.get(DRIVER_HOST_ADDRESS))
_conf.setIfMissing("spark.driver.port", "0")
// Mark this process as the driver by setting its executor ID to DRIVER_IDENTIFIER
_conf.set("spark.executor.id", SparkContext.DRIVER_IDENTIFIER)
// Resolve the user-supplied jar and file paths
_jars = Utils.getUserJars(_conf)
_files = _conf.getOption("spark.files").map(_.split(",")).map(_.filter(_.nonEmpty))
.toSeq.flatten
// Resolve the event log directory and whether event logs are compressed
_eventLogDir =
if (isEventLogEnabled) {
val unresolvedDir = conf.get("spark.eventLog.dir", EventLoggingListener.DEFAULT_LOG_DIR)
.stripSuffix("/")
Some(Utils.resolveURI(unresolvedDir))
} else {
None
}
_eventLogCodec = {
val compress = _conf.getBoolean("spark.eventLog.compress", false)
if (compress && isEventLogEnabled) {
Some(CompressionCodec.getCodecName(_conf)).map(CompressionCodec.getShortName)
} else {
None
}
}
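The required-settings check above can be modeled with a plain Map standing in for SparkConf. MiniConf is our own illustrative name; only the two mandatory keys and the thrown-on-missing behavior come from the source.

```scala
// Minimal stand-in for SparkConf that enforces the two mandatory settings.
final case class MiniConf(settings: Map[String, String]) {
  def contains(key: String): Boolean = settings.contains(key)

  def validate(): Unit = {
    if (!contains("spark.master"))
      throw new IllegalArgumentException("A master URL must be set in your configuration")
    if (!contains("spark.app.name"))
      throw new IllegalArgumentException("An application name must be set in your configuration")
  }
}
```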
The main core components are as follows:
LiveListenerBus
// Create the event bus
_listenerBus = new LiveListenerBus(_conf)
AppStatusStore
// Create the application status store and add its listener to the event bus
_statusStore = AppStatusStore.createLiveStore(conf)
listenerBus.addToStatusQueue(_statusStore.listener.get)
SparkEnv
// Create the Spark execution environment
_env = createSparkEnv(_conf, isLocal, listenerBus)
SparkEnv.set(_env)
// This function allows components created by SparkEnv to be mocked in unit tests:
private[spark] def createSparkEnv(
conf: SparkConf,
isLocal: Boolean,
listenerBus: LiveListenerBus): SparkEnv = {
// Create the driver-side runtime environment
SparkEnv.createDriverEnv(conf, isLocal, listenerBus, SparkContext.numDriverCores(master, conf))
}
SparkStatusTracker
// Create the status tracker
_statusTracker = new SparkStatusTracker(this, _statusStore)
progressBar
// Create the console progress bar
_progressBar =
if (_conf.get(UI_SHOW_CONSOLE_PROGRESS) && !log.isInfoEnabled) {
Some(new ConsoleProgressBar(this))
} else {
None
}
SparkUI
_ui =
// By default the application's web UI is enabled; it renders job state from the
// status store, which is populated by listeners on the event bus
if (conf.get(UI_ENABLED)) {
Some(SparkUI.create(Some(this), _statusStore, _conf, _env.securityManager, appName, "",
startTime))
} else {
// For tests, do not enable the UI
None
}
// Bind the UI before starting the task scheduler to communicate
// the bound port to the cluster manager properly
// The default port is 4040; if it is occupied, successive ports are tried
_ui.foreach(_.bind())
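The "increment on conflict" behavior comes from Spark's generic port-retry logic (bounded by spark.port.maxRetries). A simplified model of the search, with the set of occupied ports passed in explicitly instead of attempting real socket binds:

```scala
// Starting from `start` (4040 for the UI), try successive ports until one is free.
// `occupied` stands in for ports that would fail to bind.
def findFreePort(start: Int, maxRetries: Int, occupied: Set[Int]): Int =
  (start to start + maxRetries)
    .find(p => !occupied.contains(p))
    .getOrElse(throw new RuntimeException(s"No free port in $maxRetries retries from $start"))
```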
HadoopConfiguration
Build the Hadoop configuration (when SPARK_YARN_MODE=true, YARN configuration is used):
_hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(_conf)
Load Jar && Files
// Add each JAR given through the constructor
if (jars != null) {
jars.foreach(addJar)
}
if (files != null) {
files.foreach(addFile)
}
HeartbeatReceiver
// We need to register "HeartbeatReceiver" before "createTaskScheduler" because Executor will
// retrieve "HeartbeatReceiver" in the constructor. (SPARK-6640)
_heartbeatReceiver = env.rpcEnv.setupEndpoint(
HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this))
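The receiver's essential job is liveness bookkeeping: executors send periodic heartbeats, and executors that go silent past a timeout are expired. A toy version of that bookkeeping (the class name and timeout handling here are illustrative, not Spark's actual implementation):

```scala
import scala.collection.mutable

// Record the last heartbeat time per executor and expire those silent for too long.
class MiniHeartbeatReceiver(timeoutMs: Long) {
  private val lastSeen = mutable.Map.empty[String, Long]

  def heartbeat(executorId: String, nowMs: Long): Unit =
    lastSeen(executorId) = nowMs

  // Returns the executors considered dead and forgets them
  def expireDeadHosts(nowMs: Long): Seq[String] = {
    val dead = lastSeen.collect { case (id, t) if nowMs - t > timeoutMs => id }.toSeq
    dead.foreach(lastSeen.remove)
    dead
  }
}
```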
schedulerBackend && taskScheduler
// Create and start the scheduler
val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
_schedulerBackend = sched
_taskScheduler = ts
/**
* Create a task scheduler based on a given master URL.
* Return a 2-tuple of the scheduler backend and the task scheduler.
*
*/
private def createTaskScheduler(
sc: SparkContext,
master: String,
deployMode: String): (SchedulerBackend, TaskScheduler) = {
import SparkMasterRegex._
// When running locally, don't try to re-execute tasks on failure.
val MAX_LOCAL_TASK_FAILURES = 1
// Create the TaskScheduler and SchedulerBackend appropriate for the deploy mode
master match {
case "local" =>
val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
val backend = new LocalSchedulerBackend(sc.getConf, scheduler, 1)
scheduler.initialize(backend)
(backend, scheduler)
case LOCAL_N_REGEX(threads) =>
def localCpuCount: Int = Runtime.getRuntime.availableProcessors()
// local[*] estimates the number of cores on the machine; local[N] uses exactly N threads.
val threadCount = if (threads == "*") localCpuCount else threads.toInt
if (threadCount <= 0) {
throw new SparkException(s"Asked to run locally with $threadCount threads")
}
val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
val backend = new LocalSchedulerBackend(sc.getConf, scheduler, threadCount)
scheduler.initialize(backend)
(backend, scheduler)
case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>
def localCpuCount: Int = Runtime.getRuntime.availableProcessors()
// local[*, M] means the number of cores on the computer with M failures
// local[N, M] means exactly N threads with M failures
val threadCount = if (threads == "*") localCpuCount else threads.toInt
val scheduler = new TaskSchedulerImpl(sc, maxFailures.toInt, isLocal = true)
val backend = new LocalSchedulerBackend(sc.getConf, scheduler, threadCount)
scheduler.initialize(backend)
(backend, scheduler)
case SPARK_REGEX(sparkUrl) =>
val scheduler = new TaskSchedulerImpl(sc)
val masterUrls = sparkUrl.split(",").map("spark://" + _)
val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
scheduler.initialize(backend)
(backend, scheduler)
case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
// Check to make sure memory requested <= memoryPerSlave. Otherwise Spark will just hang.
val memoryPerSlaveInt = memoryPerSlave.toInt
if (sc.executorMemory > memoryPerSlaveInt) {
throw new SparkException(
"Asked to launch cluster with %d MB RAM / worker but requested %d MB/worker".format(
memoryPerSlaveInt, sc.executorMemory))
}
val scheduler = new TaskSchedulerImpl(sc)
val localCluster = new LocalSparkCluster(
numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt, sc.conf)
val masterUrls = localCluster.start()
val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
scheduler.initialize(backend)
backend.shutdownCallback = (backend: StandaloneSchedulerBackend) => {
localCluster.stop()
}
(backend, scheduler)
case masterUrl =>
val cm = getClusterManager(masterUrl) match {
// For master "yarn", the ClusterManager found here is
// org.apache.spark.scheduler.cluster.YarnClusterManager
case Some(clusterMgr) => clusterMgr
case None => throw new SparkException("Could not parse Master URL: '" + master + "'")
}
try {
val scheduler = cm.createTaskScheduler(sc, masterUrl)
val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler)
cm.initialize(scheduler, backend)
(backend, scheduler)
} catch {
case se: SparkException => throw se
case NonFatal(e) =>
throw new SparkException("External scheduler cannot be instantiated", e)
}
}
}
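The dispatch on the master string can be mimicked with the same kind of regex patterns SparkMasterRegex defines. The regexes and descriptions below are our simplified versions, not Spark's exact ones:

```scala
// Simplified versions of the SparkMasterRegex patterns used in the match above.
val LOCAL_N_REGEX = """local\[([0-9]+|\*)\]""".r
val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+|\*)\s*,\s*([0-9]+)\]""".r
val SPARK_REGEX = """spark://(.*)""".r

def describeMaster(master: String): String = master match {
  case "local"                            => "local mode, 1 thread, no task retries"
  case LOCAL_N_REGEX(threads)             => s"local mode, $threads thread(s)"
  case LOCAL_N_FAILURES_REGEX(threads, f) => s"local mode, $threads thread(s), $f max failures"
  case SPARK_REGEX(hostPort)              => s"standalone cluster at $hostPort"
  case other                              => s"external cluster manager: $other"
}
```

Anything that matches none of the built-in patterns (e.g. "yarn") falls through to the external ClusterManager lookup, just like the final case in the real method.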
DAGScheduler
The DAGScheduler is created (it reaches the taskScheduler through this SparkContext), and the heartbeat receiver is then told that the TaskScheduler has been set.
_dagScheduler = new DAGScheduler(this)
_heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)
Starting the TaskScheduler
Start the taskScheduler, use the _applicationId it reports to start the metrics system, and expose the metrics through the SparkUI.
// start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
// constructor
_taskScheduler.start()
// The ID of the current application
_applicationId = _taskScheduler.applicationId()
// The current attempt ID: 1 on the first attempt, 2 on the second, and so on
_applicationAttemptId = taskScheduler.applicationAttemptId()
_conf.set("spark.app.id", _applicationId)
if (_conf.getBoolean("spark.ui.reverseProxy", false)) {
System.setProperty("spark.ui.proxyBase", "/proxy/" + _applicationId)
}
// Show the application under its ID in the Spark UI
_ui.foreach(_.setAppId(_applicationId))
// Initialize the block manager with the application ID
_env.blockManager.initialize(_applicationId)
// The metrics system for Driver need to be set spark.app.id to app ID.
// So it should start after we get app ID from the task scheduler and set spark.app.id.
_env.metricsSystem.start()
// Attach the driver metrics servlet handler to the web ui after the metrics system is started.
_env.metricsSystem.getServletHandlers.foreach(handler => ui.foreach(_.attachHandler(handler)))
EventLoggingListener
// Create the event logging listener and add it to the event bus
_eventLogger =
if (isEventLogEnabled) {
val logger =
new EventLoggingListener(_applicationId, _applicationAttemptId, _eventLogDir.get,
_conf, _hadoopConfiguration)
logger.start()
listenerBus.addToEventLogQueue(logger)
Some(logger)
} else {
None
}
ExecutorAllocationManager
Create and start the manager for dynamic executor allocation.
// Optionally scale number of executors dynamically based on workload. Exposed for testing.
val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf)
_executorAllocationManager =
if (dynamicAllocationEnabled) {
schedulerBackend match {
case b: ExecutorAllocationClient =>
Some(new ExecutorAllocationManager(
schedulerBackend.asInstanceOf[ExecutorAllocationClient], listenerBus, _conf,
_env.blockManager.master))
case _ =>
None
}
} else {
None
}
_executorAllocationManager.foreach(_.start())
ContextCleaner
Create and start the context cleaner.
_cleaner =
if (_conf.getBoolean("spark.cleaner.referenceTracking", true)) {
Some(new ContextCleaner(this))
} else {
None
}
_cleaner.foreach(_.start())
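ContextCleaner works by registering weak references to RDDs, shuffles, and broadcasts with a reference queue and running cleanup tasks as the references are enqueued. A compact sketch of that mechanism follows; MiniCleaner is our own class, and the test enqueues a reference by hand rather than waiting for the GC.

```scala
import java.lang.ref.{ReferenceQueue, WeakReference}

// Track objects weakly; once a reference lands on the queue (normally via GC),
// run the cleanup task that was registered with it.
class MiniCleaner {
  private val queue = new ReferenceQueue[AnyRef]
  private val tasks = new java.util.IdentityHashMap[AnyRef, () => Unit]

  def register(obj: AnyRef, cleanup: () => Unit): WeakReference[AnyRef] = {
    val ref = new WeakReference[AnyRef](obj, queue)
    tasks.put(ref, cleanup)
    ref
  }

  // Drain everything currently on the queue, returning how many cleanups ran
  def drain(): Int = {
    var cleaned = 0
    var ref = queue.poll()
    while (ref != null) {
      Option(tasks.remove(ref)).foreach { task => task(); cleaned += 1 }
      ref = queue.poll()
    }
    cleaned
  }
}
```

The real cleaner runs drain-style processing on a dedicated daemon thread so that cleanup never blocks user code.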
setupAndStartListenerBus
Set up and start the event bus.
setupAndStartListenerBus()
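The bus-and-listener shape that this step relies on can be reduced to a few lines. The real LiveListenerBus posts asynchronously from dedicated queue threads; this sketch is synchronous, and the Mini* names are ours:

```scala
import scala.collection.mutable

// Toy synchronous event bus: listeners register, every posted event reaches each listener.
trait MiniListener {
  def onEvent(event: String): Unit
}

class MiniListenerBus {
  private val listeners = mutable.ArrayBuffer.empty[MiniListener]
  def addListener(listener: MiniListener): Unit = listeners += listener
  def post(event: String): Unit = listeners.foreach(_.onEvent(event))
}
```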
postEnvironmentUpdate
Post the environment-update event.
postEnvironmentUpdate()
postApplicationStart
Post the application-start event.
postApplicationStart()
metricsSystem.registerSource
Register the dagScheduler's metrics source, a BlockManagerSource, and the ExecutorAllocationManager's source with the metrics system.
_env.metricsSystem.registerSource(_dagScheduler.metricsSource)
_env.metricsSystem.registerSource(new BlockManagerSource(_env.blockManager))
_executorAllocationManager.foreach { e =>
_env.metricsSystem.registerSource(e.executorAllocationManagerSource)
}
// In order to prevent multiple SparkContexts from being active at the same time, mark this
// context as having finished construction.
// NOTE: this must be placed at the end of the SparkContext constructor.
SparkContext.setActiveContext(this, allowMultipleContexts)
Finally, consider SparkContext's runJob method, which delegates through several overloaded layers.
/**
* Run a job on all partitions in an RDD and return the results in an array.
*
* @param rdd target RDD to run tasks on
* @param func a function to run on each partition of the RDD
* @return in-memory collection with a result of the job (each collection element will contain
* a result from one partition)
*/
def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U] = {
runJob(rdd, func, 0 until rdd.partitions.length)
}
The first layer of runJob:
/**
* Run a function on a given set of partitions in an RDD and return the results as an array.
*
* @param rdd target RDD to run tasks on
* @param func a function to run on each partition of the RDD
* @param partitions set of partitions to run on; some jobs may not want to compute on all
* partitions of the target RDD, e.g. for operations like `first()`
* @return in-memory collection with a result of the job (each collection element will contain
* a result from one partition)
*/
def runJob[T, U: ClassTag](
rdd: RDD[T],
func: Iterator[T] => U,
partitions: Seq[Int]): Array[U] = {
val cleanedFunc = clean(func)
runJob(rdd, (ctx: TaskContext, it: Iterator[T]) => cleanedFunc(it), partitions)
}
The second layer of runJob:
/**
* Run a function on a given set of partitions in an RDD and return the results as an array.
* The function that is run against each partition additionally takes `TaskContext` argument.
*
* @param rdd target RDD to run tasks on
* @param func a function to run on each partition of the RDD
* @param partitions set of partitions to run on; some jobs may not want to compute on all
* partitions of the target RDD, e.g. for operations like `first()`
* @return in-memory collection with a result of the job (each collection element will contain
* a result from one partition)
*/
def runJob[T, U: ClassTag](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int]): Array[U] = {
val results = new Array[U](partitions.size)
// The handler writes each partition's result into the results array at its partition index
runJob[T, U](rdd, func, partitions, (index, res) => results(index) = res)
results
}
The third layer of runJob, which ultimately calls DAGScheduler's runJob method:
/**
* Run a function on a given set of partitions in an RDD and pass the results to the given
* handler function. This is the main entry point for all actions in Spark.
*
* @param rdd target RDD to run tasks on
* @param func a function to run on each partition of the RDD
* @param partitions set of partitions to run on; some jobs may not want to compute on all
* partitions of the target RDD, e.g. for operations like `first()`
* @param resultHandler callback to pass each result to
*/
def runJob[T, U: ClassTag](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
resultHandler: (Int, U) => Unit): Unit = {
if (stopped.get()) {
throw new IllegalStateException("SparkContext has been shutdown")
}
val callSite = getCallSite
// Clean the closure so that it can be serialized and sent to the tasks
val cleanedFunc = clean(func)
logInfo("Starting job: " + callSite.shortForm)
if (conf.getBoolean("spark.logLineage", false)) {
logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
}
// Hand the job to the dagScheduler, which was constructed during SparkContext initialization
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
// Finish the console progress bar for this job
progressBar.foreach(_.finishAll())
rdd.doCheckpoint()
}
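The layering can be captured in plain Scala, with a Seq of Seqs standing in for an RDD's partitions: the outer overload allocates the results array and supplies a resultHandler that writes each partition's result by index, just as the second-layer runJob above does. This is a sketch of the delegation shape only, not Spark's implementation.

```scala
import scala.reflect.ClassTag

// Outer layer: build the results array and delegate with an index-writing handler.
def runJob[T, U: ClassTag](partitions: Seq[Seq[T]], func: Iterator[T] => U): Array[U] = {
  val results = new Array[U](partitions.size)
  runJob(partitions, func, (index: Int, res: U) => results(index) = res)
  results
}

// Inner layer: run `func` over every partition, passing each result to the handler
// (in Spark this is the layer that invokes DAGScheduler.runJob).
def runJob[T, U](partitions: Seq[Seq[T]], func: Iterator[T] => U,
                 resultHandler: (Int, U) => Unit): Unit =
  partitions.zipWithIndex.foreach { case (part, index) =>
    resultHandler(index, func(part.iterator))
  }
```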