【源码解读】|SparkEnv源码解读

**可以关注下公众号:857Hub,专注数据开发、数据架构之路,热衷于分享技术干货。 **

导读

:: DeveloperApi ::
Holds all the runtime environment objects for a running Spark instance (either master or worker),
保存一个运行中的Spark实例的所有运行时环境对象(master或worker),
including the serializer, RpcEnv, block manager, map output tracker, etc. Currently
包括序列化器、RpcEnv、块管理器、映射输出跟踪器等
Spark code finds the SparkEnv through a global variable, so all the threads can access the same
SparkEnv. It can be accessed by SparkEnv.get (e.g. after creating a SparkContext).
目前Spark代码通过一个全局变量查找SparkEnv,这样所有线程都可以访问它

SparkEnv。它可以被SparkEnv访问。get(例如在创建SparkContext之后)。
NOTE: This is not intended for external use. This is exposed for Shark and may be made private in a future release.

SparkEnv 创建入口

「SparkContext.scala」 中创建,老版本参数中还有actorsystem

    // Create the Spark execution environment (cache, map output tracker, etc)
    // 创建SparkEev 执行环境(cache, map输出追踪器, 等等)
    _env = createSparkEnv(_conf, isLocal, listenerBus)
    SparkEnv.set(_env)

SparkEnv 架构组件

组件英文名 组件中文含义
rpcEnv Spark 通讯组件环境
serializer 序列化器
closureSerializer 闭包序列化器
serializerManager 给各种 Spark 组件提供序列化、压缩及加密的服务
mapOutputTracker Master/Slave 架构,管理 mapTask 输出状态
shuffleManager 管理整个 shuffle 过程包括执行、计算
broadcastManager 广播管理器
blockManager Spark 运行时任务的数据读写管理
securityManager 安全管理器,用来验证权限
metriCSSystem 指标监控系统
memoryManager 内存管理器,整个 Spark 运行时的执行内存管理
outputCommitCoordinator 决定任务是否可以向 HDFS 提交输出的权限。使用“第一个提交者获胜”政策。

前置

判断是否是 Driver

   //判断其是不是driver
    val isDriver = executorId == SparkContext.DRIVER_IDENTIFIER

    // Listener bus is only used on the driver
    // 监听总线只能在Driver用
    if (isDriver) {
      assert(listenerBus != null"Attempted to create driver SparkEnv with null listener bus!")
    }

创建SecurityManager安全管理器

Spark class responsible for security.

Spark 负责安全的类

  val securityManager = new SecurityManager(conf, ioEncryptionKey)
    if (isDriver) {
      securityManager.initializeAuth()
    }

    ioEncryptionKey.foreach { _ =>
      if (!securityManager.isEncryptionEnabled()) {
        logWarning("I/O encryption enabled without RPC encryption: keys will be visible on the " +
          "wire.")
      }
    }

创建RPCEnv 环境

A RpcEnv implementation must have a [[RpcEnvFactory]] implementation with an empty constructor

一个RpcEnv实现必须有一个[[RpcEnvFactory]]实现和一个空的构造函数

「Server  :=> endpoint」

「Client  :=> endpointRef」

    //判断是driverSystemName 还是executorSystemName
    // private[spark] val driverSystemName = "sparkDriver"
    // private[spark] val executorSystemName = "sparkExecutor"
    val systemName = if (isDriver) driverSystemName else executorSystemName
    //传入系统名称、通讯地址(socket)、rpc通讯地址、端口、配置、安全管理器、corenum 、是否是driver(因为会根据是否是driver来创建rpcendpoint)
    val rpcEnv = RpcEnv.create(systemName, bindAddress, advertiseAddress, port.getOrElse(-1), conf,
      securityManager, numUsableCores, !isDriver)

    // Figure out which port RpcEnv actually bound to in case the original port is 0 or occupied.
    //配置端口
    if (isDriver) {
      conf.set("spark.driver.port", rpcEnv.address.port.toString)
    }

    // Create an instance of the class with the given name, possibly initializing it with our conf
    // 使用给定名称创建该类SparkEnv的实例,使用conf进行初始化
    def instantiateClass[T](className: String): T = {
      val cls = Utils.classForName(className)
      // Look for a constructor taking a SparkConf and a boolean isDriver, then one taking just
      // SparkConf, then one taking no arguments
      // 寻找一个接受SparkConf和一个布尔型isDriver的构造函数,如果没有找到NoSuchMethodException
      // 会用一个仅使用 SparkConf 参数的构造函数,如果还是没有找到NoSuchMethodException
      // 然后会使用一个不使用任何参数的构造函数
      try {
        cls.getConstructor(claSSOf[SparkConf], Java.lang.Boolean.TYPE)
          .newInstance(conf, new java.lang.Boolean(isDriver))
          .asInstanceOf[T]
      } catch {
        case _: NoSuchMethodException =>
          try {
            cls.getConstructor(classOf[SparkConf]).newInstance(conf).asInstanceOf[T]
          } catch {
            case _: NoSuchMethodException =>
              cls.getConstructor().newInstance().asInstanceOf[T]
          }
      }
    }

    // Create an instance of the class named by the given SparkConf property, or defaultClassName
    // if the property is not set, possibly initializing it with our conf
    // 创建一个由SparkConf属性或defaultClassName命名的类实例
,如果没有设置该属性,可以使用我们的conf初始化它
    def instantiateClassFromConf[T](propertyName: String, defaultClassName: String): T = {
      instantiateClass[T](conf.get(propertyName, defaultClassName))
    }

    //调用反射生成序列化
    val serializer = instantiateClassFromConf[Serializer](
      "spark.serializer""org.apache.spark.serializer.JavaSerializer")
    logDebug(s"Using serializer: ${serializer.getClass}")

创建SerializerManager序列化管理器

Component which configures serialization, compression and encryption for various Spark components, including automatic selection of which [[Serializer]] to use for shuffles.


为各种Spark配置序列化、压缩和加密的组件,包括自动选择其中[[Serializer]]用于洗牌。

  //为各种Spark 组件配置序列化,压缩和加密的组件默认使用KryoSerializer
  //def setDefaultClassLoader(classLoader: ClassLoader): Unit =   //{
  //  kryoSerializer.setDefaultClassLoader(classLoader)
  //}
  
    val serializerManager = new SerializerManager(serializer, conf, ioEncryptionKey)
    // 闭包序列化,task任务很常见
    val closureSerializer = new JavaSerializer(conf)

    //判断是否是Driver来引用rpcendpoint
    def registerOrLookupEndpoint(
        name: String, endpointCreator: => RpcEndpoint):
      RpcEndpointRef = {
      if (isDriver) {
        //如果是dirver端那么调用创建函数endpointCreator: => RpcEndpoint,并注册进RpcEnv返回RpcEndpointRef
        logInfo("Registering " + name)
        rpcEnv.setupEndpoint(name, endpointCreator)
      } else {
        //如果是Executor端那么直接创建一个Ref
        RpcUtils.makeDriverRef(name, conf, rpcEnv)
      }
    }

创建BroadcastManager广播管理器

Spark2.x后使用Bit-torrent协议分发内容

「org.apache.spark.broadcast.TorrentBroadcast」

    val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)

创建mapOutputTracker

Class that keeps track of the location of the map output of a stage. This is abstract because the driver and executor have different versions of the MapOutputTracker. In principle the driver and executor-side classes don't need to share a common base class; the current shared base class is maintained primarily for backwards-compatibility in order to avoid having to update existing test code.

MapOutputTracker在spark shuffle过程中的map和reduce起着衔接作用。具体点就是:在shuffle map过程中,executor端MapOutputTrackerWorker会将task结束后产生的map状态上报给Driver端的MapOutputTrackerMaster,所以在MapOutputTrackerMaster端保存中spark在shuffle map过程中所有block的相关的详细(包括位置,block大小等信息)。在shuffle reduce的时候,通过读取MapOutputTrackerMaster中的这些位置大小信息,从而决定去远程或者本地fetch相关block数据。shuffleId会保存到mapStatuses返回~

    //根据是不是Driver来创建MapOutputTrackerMaster、MapOutputTrackerWorker
    val mapOutputTracker = if (isDriver) {
      new MapOutputTrackerMaster(conf, broadcastManager, isLocal)
    } else {
      new MapOutputTrackerWorker(conf)
    }

    // Have to assign trackerEndpoint after initialization as MapOutputTrackerEndpoint
    // requires the MapOutputTracker itself
    // 初始化后必须分配trackerEndpoint,因为MapOutputTrackerEndpoint需要MapOutputTracker本身
    mapOutputTracker.trackerEndpoint = registerOrLookupEndpoint(MapOutputTracker.ENDPOINT_NAME,
      new MapOutputTrackerMasterEndpoint(
        rpcEnv, mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], conf))

创建Shuffle管理器

Pluggable interface for shuffle systems. A ShuffleManager is created in SparkEnv on the driver and on each executor, based on the spark.shuffle.manager setting. The driver registers shuffles with it, and executors (or tasks running locally in the driver) can ask to read and write data.

SparkEnv基于spark.shuffle.manager设置在驱动程序和每个执行程序上创建一个ShuffleManager。驱动程序注册shuffle,执行者(或在驱动程序中本地运行的任务)可以请求读写数据。

 // Let the user specify short names for shuffle managers
    //通过反射拿到shuffle管理器,目前已经没有HashShuffleManager了,只有SortShuffleManager
    val shortShuffleMgrNames = Map(
      "sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName,
      "tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName)
    val shuffleMgrName = conf.get("spark.shuffle.manager""sort")
    val shuffleMgrClass =
      shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase(Locale.ROOT), shuffleMgrName)
    val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)

创建动态内存管理器

Spark 内存资源管理。参照[857思维导图]

   val useLegacyMemoryManager = conf.getBoolean("spark.memory.useLegacyMode"false)
    val memoryManager: MemoryManager =
      if (useLegacyMemoryManager) {
        new StaticMemoryManager(conf, numUsableCores)//静态内存管理器
      } else {
        UnifiedMemoryManager(conf, numUsableCores)//动态内存管理器 :统一内存管理器
      }

    val blockManagerPort = if (isDriver) {
      conf.get(DRIVER_BLOCK_MANAGER_PORT)
    } else {
      conf.get(BLOCK_MANAGER_PORT)
    }

创建BlockTransferService块传输服务

主要是用Netty来传输数据块服务

     val blockTransferService =
      new NettyBlockTransferService(conf, securityManager, bindAddress, advertiseAddress,
        blockManagerPort, numUsableCores)

创建BlockManagerMaster

负责整个应用程序在运行期间block元数据的管理和维护,以及向从节点发送指令执行命令

      val blockManagerMaster = new BlockManagerMaster(registerOrLookupEndpoint(
      BlockManagerMaster.DRIVER_ENDPOINT_NAME,
      new BlockManagerMasterEndpoint(rpcEnv, isLocal, conf, listenerBus)),
      conf, isDriver)

创建BlockManager

Manager running on every node (driver and executors) which provides interfaces for putting and retrieving blocks both locally and remotely into various stores (memory, disk, and off-heap).

在每个节点(驱动程序和执行程序)上运行的管理器,它提供了在本地和远程将块放入和检索到各种存储(内存、磁盘和堆外)的接口

    val blockManager = new BlockManager(executorId, rpcEnv, blockManagerMaster,
      serializerManager, conf, memoryManager, mapOutputTracker, shuffleManager,
      blockTransferService, securityManager, numUsableCores)

创建监控指标系统MetricsSystem

Spark Metrics System, created by a specific "instance", combined by source,sink, periodically polls source metrics data to sink destinations.

Spark Metrics System由一个特定的“实例”创建,由source和sink组合,定期轮询source Metrics数据到sink目的地。

    //如果是Driver 需要appId,executor 需要 executorId
    val metricsSystem = if (isDriver) {
      // Don't start metrics system right now for Driver.
      // We need to wait for the task scheduler to give us an app ID.
      // Then we can start the metrics system.
      MetricsSystem.createMetricsSystem("driver", conf, securityManager)
    } else {
      // We need to set the executor ID before the MetricsSystem is created because sources and
      // sinks specified in the metrics configuration file will want to incorporate this executor's
      // ID into the metrics they report.
      conf.set("spark.executor.id", executorId)
      val ms = MetricsSystem.createMetricsSystem("executor", conf, securityManager)
      ms.start()
      ms
    }

创建OutputCommitCoordinator

输出协调器,决定任务是否可以向 HDFS 提交输出的权限。使用“第一个提交者获胜”政策。

   val outputCommitCoordinator = mockOutputCommitCoordinator.getOrElse {
      new OutputCommitCoordinator(conf, isDriver)
    }
    val outputCommitCoordinatorRef = registerOrLookupEndpoint("OutputCommitCoordinator",
      new OutputCommitCoordinatorEndpoint(rpcEnv, outputCommitCoordinator))
    outputCommitCoordinator.coordinatorRef = Some(outputCommitCoordinatorRef)

包装SparkEnv组件

包装SparkEnv组件返回实例

  val envInstance = new SparkEnv(
      executorId,
      rpcEnv,
      serializer,
      closureSerializer,
      serializerManager,
      mapOutputTracker,
      shuffleManager,
      broadcastManager,
      blockManager,
      securityManager,
      metricsSystem,
      memoryManager,
      outputCommitCoordinator,
      conf)

总结

Spark Env 源码顺序大致就是上面的流程,更细致的后面的博文中会持续更新解读。

在深入学习之前,建议先了解RPC 调用流程。以及ListenerBus。



原文始发于微信公众号(857Hub):【源码解读】|SparkEnv源码解读

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/22634.html

(0)
小半的头像小半

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!