异度部落格

学习是一种生活态度。

0%

论文原文:MLSys: The New Frontier of Machine Learning Systems

这是一篇发表意义大于内容本身的一篇论文,它是一个全新的会议 MLSys 的开篇。所谓 MLSys,顾名思义就是 Machine Learning 和 System 的交叉领域。

在论文中,作者从两个维度阐述了 MLSys 涉及的核心问题: 维度一:

  • How should software systems be designed to support the full machine learning lifecycle, from program- ming interfaces and data preprocessing to output interpretation, debugging and monitoring?
  • How should hardware systems be designed for machine learning?
  • How should machine learning systems be designed to satisfy metrics beyond predictive accuracy, such as power and memory efficiency, accessibility, cost, latency, privacy, security, fairness, and interpretability?

维度二:

  • high-level systems for ML that support interfaces and workflows for ML development—the analogue of traditional work on programming languages and software engineering.
  • low-level systems for ML that involve hardware or software—and that often blur the lines between the two—to support training and execution of models, the analogue of traditional work on compilers and architecture.

这篇论文最流弊的地方就是它的作者署名(亮点自寻): b88bb01a94ff3754ea89c7abcfaf4fcc.png

术语解释

  • AD: Active Directory。
  • Service Session Key: 服务会话密钥。
  • Logon Session Key: 登录会话密钥。
  • KDC: Key Distribution Center。
  • KAS: Key Kerberos Authentication Service。它是 KDC 的一个服务。
  • TGS: Ticket Granting Service;它是 KDC 的一个服务。
  • Service Ticket: 服务票据,通过 TGS 获取,主要包括用户信息与 Service Session Key。
  • TGT: Ticket Granting Ticket。通过 KAS 获取,主要包括用户信息与 Logon Session Key。
  • Authenticator: 交互双方预先知晓的信息,通过它来对对方做认证。

KDC 整体结构图

f211a61aa6562d3e013345cfbfced73e.jpg

Kerberos 认证的流程

  1. 客户端通过 KDC 的 KAS 服务获取 TGT。
  2. 通过 TGT 获取 Service Ticket。
  3. 通过 Service Ticket 访问服务资源 。

Client 与 Server 之间的消息交互

  1. 客户端发送消息到 KDC 的 KAS 服务一获取 TGT f8fa180d291d0565ff69fe7366cb569e.jpg
  2. KAS 发送信息到 Client 9be76573f94944dd2a59ad3bd838afa7.jpg
  3. 客户端发送消息到 KDC(Key Distribution Center)的 TGS(Ticket Granting Service)服务一获取 ST(Service Ticket) a87493a61a11c90c157a13010298288f.jpg
  4. TGS 服务返回消息给客户端 32a73a40855355657a004489aa958883.jpg
  5. 客户端通过 ST 访问服务器资源 bdca1c145075a7b1438d7a92b3b2542d.jpg
  6. 客户端对服务器的认证 7c5e6d20ec15393f57d83abdcfb9b305.jpg

之前BASE 理论中提到,分布式系统对一致性做出了权衡。因此涌现出很多分布式协议。其中比较著名的是:两段式提交(Two Phase Commit)和三段式提交(Three Phase Commit)。

两段式提交

定义

每个参与者将操作成败通知协调者,再由协调者根据所有参与者的反馈情报,决定各参与者是否要提交操作还是中止操作。

2pc

所谓的两个阶段分别是:

第一阶段:准备阶段/投票阶段

  1. 协调者节点向所有参与者节点询问是否可以执行提交操作,并开始等待各参与者节点的响应。
  2. 参与者节点执行询问发起为止的所有事务操作,并将 Undo 信息和 Redo 信息写入日志。
  3. 各参与者节点响应协调者节点发起的询问。如果参与者节点的事务操作实际执行成功,则它返回一个"同意"消息;如果参与者节点的事务操作实际执行失败,则它返回一个"中止"消息。

第二阶段:提交阶段/执行阶段

当协调者节点从所有参与者节点获得的相应消息都为"同意"时:

  1. 协调者节点向所有参与者节点发出"正式提交"的请求。
  2. 参与者节点正式完成操作,并释放在整个事务期间内占用的资源。
  3. 参与者节点向协调者节点发送"完成"消息。
  4. 协调者节点收到所有参与者节点反馈的"完成"消息后,完成事务。

如果任一参与者节点在第一阶段返回的响应消息为"终止",或者 协调者节点在第一阶段的询问超时之前无法获取所有参与者节点的响应消息时:

  1. 协调者节点向所有参与者节点发出"回滚操作"的请求。
  2. 参与者节点利用之前写入的 Undo 信息执行回滚,并释放在整个事务期间内占用的资源。
  3. 参与者节点向协调者节点发送"回滚完成"消息。
  4. 协调者节点收到所有参与者节点反馈的"回滚完成"消息后,取消事务。

优缺点

  • 优点:原理简单,实现方便。
  • 缺点:同步阻塞,单点问题,数据不一致,容错性不好

三段式提交

定义

由于两段式提交存在诸多缺陷,因此研究者在两段式基础上进行改进,实现了三段式提交。与两阶段提交不同的是,三阶段提交有两个改动点:

  • 引入超时机制 - 同时在协调者和参与者中都引入超时机制。
  • 在第一阶段和第二阶段中插入一个准备阶段,保证了在最后提交阶段之前各参与节点的状态是一致的。

3pc

所谓的三个阶段分别是:

第一阶段:CanCommit

  1. 协调者向参与者发送 CanCommit 请求。询问是否可以执行事务提交操作。然后开始等待参与者的响应。
  2. 参与者接到 CanCommit 请求之后,正常情况下,如果其自身认为可以顺利执行事务,则返回 Yes 响应,并进入预备状态;否则反馈 No。

第二阶段:PreCommit

  1. 协调者向所有参与者节点发出 preCommit 的请求,并进入 prepared 状态。
  2. 参与者受到 preCommit 请求后,会执行事务操作,对应 2PC 准备阶段中的 “执行事务”,也会 Undo 和 Redo 信息记录到事务日志中。
  3. 如果参与者成功执行了事务,就反馈 ACK 响应,同时等待指令:提交(commit) 或终止(abort)。

第三阶段:Do Commit

  1. 协调者接收到各参与者发送的 ACK 响应,那么他将从预提交状态进入到提交状态。并向所有参与者发送 doCommit 请求。
  2. 参与者接收到 doCommit 请求之后,执行正式的事务提交。并在完成事务提交之后释放所有事务资源。
  3. 事务提交完之后,向协调者发送 ACK 响应。
  4. 协调者接收到所有参与者的 ACK 响应之后,完成事务。

优缺点

  • 优点:相对于二阶段提交,三阶段提交主要解决的单点故障问题,并减少了阻塞的时间。
  • 缺点:三阶段提交也会导致数据一致性问题。由于网络原因,协调者发送的 abort 响应没有及时被参与者接收到,那么参与者在等待超时之后执行了 commit 操作。这样就和其他接到 abort 命令并执行回滚的参与者之间存在数据不一致的情况。

参考资料

  • https://en.wikipedia.org/wiki/Two-phase_commit_protocol
  • https://en.wikipedia.org/wiki/Three-phase_commit_protocol
  • https://juejin.im/post/5b26648e5188257494641b9f

BASE定理简介

BASE理论是Basically Available(基本可用),Soft State(软状态)和Eventually Consistent(最终一致性)三个短语的缩写,由eBay架构师Dan Pritchett提出来的。

BASE是对CAP中一致性和可用性权衡的结果,其来源于对大规模互联网分布式系统实践的总结,是基于CAP定律逐步演化而来。其核心思想是即使无法做到强一致性,但每个应用都可以根据自身业务特点,才用适当的方式来使系统打到最终一致。

BASE理论的内容

基本可用(Basically Available)

所谓基本可用就是在出现不可预知的故障时,系统主体功能依然可用。一个比较典型的例子就是在电商促销时,为保护购物系统,部分消费有可能会被导到一个降级页面。

软状态(Soft State)

所谓软状态是指允许系统中的数据存在中间状态,并认为这种状态不影响系统的整体可用性。典型的例子如在分布式文件系统中,数据的写入往往是先写入一份,再异步生成多个副本(同步生成副本不属于这种情况)。

最终一致性(Eventually Consistent)

上面提到了软状态,但是系统不可以一直处于软状态,必须有一个期限。在期限过后,应当保证所有副本数据是的一致的,从而达到数据的最终一致性。而在实际工程实践中,最终一致性分为5种:

因果一致性(Causal consistency)

如果节点A在更新完某个数据后通知了节点B,那么节点B之后对该数据的访问和修改都是基于A更新后的值。于此同时,和节点A无因果关系的节点C的数据访问则没有这样的限制。

读己之所写(Read your writes)

节点A更新一个数据后,它自身总是能访问到自身更新过的最新值,而不会看到旧值。其实也算一种因果一致性。

会话一致性(Session consistency)

会话一致性将对系统数据的访问过程框定在了一个会话当中:系统能保证在同一个有效的会话中实现 “读己之所写” 的一致性,也就是说,执行更新操作之后,客户端能够在同一个会话中始终读取到该数据项的最新值。

单调读一致性(Monotonic read consistency)

单调读一致性指的是:如果一个节点从系统中读取出一个数据项的某个值后,那么系统对于该节点后续的任何数据访问都不应该返回更旧的值。

单调写一致性(Monotonic write consistency)

单调写一致性指的是:一个系统要能够保证来自同一个节点的写操作被顺序的执行。

在实际的实践中,这5种系统往往会结合使用,以构建一个具有最终一致性的分布式系统。事实上,最终一致性并不是只有那些大型分布式系统才涉及的特性,许多现代的关系型数据库都采用了最终一致性模型。在现代关系型数据库中,大多都会采用同步和异步方式来实现主备数据复制技术。在同步方式中,数据的复制过程通常是更新事务的一部分,因此在事务完成后,主备数据库的数据就会达到一致。而在异步方式中,备库的更新往往会存在延时,这取决于事务日志在主备数据库之间传输的时间长短,如果传输时间过长或者甚至在日志传输过程中出现异常导致无法及时将事务应用到备库上,那么很显然,从备库中读取的数据将是旧的,因此就出现了数据不一致的情况。当然,无论是采用多次重试还是人为数据订正,关系型数据库还是能够保证最终数据达到一致。

总结

总的来说,BASE理论面向的是大型高可用可扩展的分布式系统,和传统事务的ACID特性是相反的,它完全不同于ACID的强一致性模型,而是提出通过牺牲强一致性来获得可用性,并允许数据在一段时间内是不一致的,但最终达到一致状态。

参考资料

  • https://juejin.im/post/5b2663fcf265da59a401e6f8
  • https://xinklabi.iteye.com/blog/2341034

综述

这篇文章记录了 Spark 应用开发过程遇到的各种问题及解决方案。主要来自于个人开发的实践,也有部分解决方案来自互联网,如有不足之处欢迎批评指正。本人会持续更新转载请保留原文地址

问题描述及解决方案

java.lang.OutOfMemoryError: Java heap space 错误

这个是 Spark 应用常见错误。JVM 堆内存空间不足。解决方案如下:

  • 首先要判断是 Driver 或者 Executor 出现 OOM,通过--driver-memory 或者--executor-memory 进行调整。
  • 如果是 Spark SQL 或者 Spark Streaming 的程序,建议适当地提高 heap size。

java.lang.OutOfMemoryError: GC overhead limit exceeded 错误

这个也是 Spark 应用常见错误,由于 GC 时间过长导致的。解决方案如下:

  • 直接通过--driver-memory 或者--executor-memory 增加 heap size
  • 修改 GC policy。可以使用-XX:UseG1GC 或者-XX:UseParallelGC

编译 OK,运行时出 NoClassDefineError 错误

这个错误非常清晰,根本原因就是 jar 没有放入 classpath 之中。首先需要判断到底是 Driver 还是 Executor 缺少这个 jar 包。

  • 将 jar 包路径配置到 spark.driver.extraClassPath 或者 spark.executor.extraClassPath。
  • 将 jar 包路径通过 spark-submit 的--driver-class-path 或者--executor-class-path 指定。

其实,spark-submit 还有一个--packages 参数,这个参数让 Spark 通过 Maven 从本地或者远程的 repository 处获取 jar 包。这个参数看似非常方便,但实际使用的时候不是很实用。因为 Hadoop 和 Spark 集群应用一般都是部署在内网的,为了数据安全,一般情况都是无法访问外网的。

org.apache.spark.SparkException: Task not serializable

关于这个问题有单独的一篇文章进行分析,详见:Spark Troubleshooting - Task not serializable 问题分析

java.io.IOException: No space left on device 错误

具体 stack trace 如下:

stage 89.3 failed 4 times, most recent failure:
Lost task 38.4 in stage 89.3 (TID 30100, rhel4.cisco.com): java.io.IOException: No space left on device
at java.io.FileOutputStream.writeBytes(Native Method)
at java.io.FileOutputStream.write(FileOutputStream.java:326)
at org.apache.spark.storage.TimeTrackingOutputStream.write(TimeTrackingOutputStream.java:58)
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)

这个错误是由于,Spark "scratch" space 不足,具体路径通过 spark.local.dir 参数设置,默认是/tmp。官方对于 scratch space 的解释是

Directory to use for "scratch" space in Spark, including map output files and RDDs that get stored on disk.
This should be on a fast, local disk in your system.
It can also be a comma-separated list of multiple directories on different disks.

spark.driver.maxResultSize 超出错误

数据拉回 Driver 端是有限制的,通过 spark.driver.maxResultSize 控制:

  • 默认是 1g
  • 可以设置为 0 或者 unlimited
  • 如果设置成 unlimited 就不会再遇到这个错误,取而代之的是 OOM。

java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE

具体 stack trace 如下:

java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828) at
org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:123) at
org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132) at
org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:51 7) at
org.apache.spark.storage.BlockManager.getLocal(BlockManager.scala:432) at
org.apache.spark.storage.BlockManager.get(BlockManager.scala:618) at
org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:146 ) at
org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70)

这个问题是由于 shuffle block 大于 2GB 导致的,这个是 Spark 实现上的一个问题。Spark 使用 ByteBuffer 作为 storing blocks。

val buf = ByteBuffer.allocate(length.toInt) * ByteBufferislimitedbyInteger.MAX_SIZE

这就是 2GB 的由来。

参考资料

  • http://spark.apache.org/docs/latest/configuration.html

CAP 定理简介

在理论计算机科学中,CAP 定理(CAP theorem),又被称作布鲁尔定理(Brewer’s theorem),指的是在一个分布式系统中,Consistency(一致性)、 Availability(可用性)、Partition tolerance(分区容错性)这三个基本需求,最多只能同时满足其中的 2 个

  • 一致性(Consistency):数据在多个副本之间能够保持一致的特性
  • 可用性(Availability):系统提供的服务必须一直处于可用的状态,每次请求都能获取到非错的响应(不保证获取的数据为最新数据)
  • 分区容错性(Partition tolerance):分布式系统在遇到任何网络分区故障的时候,仍然能够对外提供满足一致性和可用性的服务,除非整个网络环境都发生了故障

CAP 权衡

既然根据 CAP 定理,我们无法同时满足一致性,可用性和分区容错性,那要舍弃哪个呢?

  • CA without P 如果不要求 P(不允许分区),则 C(强一致性)和 A(可用性)是可以保证的。但其实分区不是你想不想的问题,而是始终会存在,因此 CA 的系统更多的是允许分区后各子系统依然保持 CA。
  • CP without A 如果不要求 A(可用性),相当于每个请求都需要在 Server 之间强一致,而 P(分区)会导致同步时间无限延长,如此 CP 也是可以保证的。很多传统的数据库分布式事务都属于这种模式。
  • AP wihtout C 要高可用并允许分区,则需放弃一致性。一旦分区发生,节点之间可能会失去联系,为了高可用,每个节点只能用本地数据提供服务,而这样会导致全局数据的不一致性。实际上,目前大部分 NoSQL 都属于这一类。

参考文档

  • https://en.wikipedia.org/wiki/CAP_theorem

问题描述

出现“org.apache.spark.SparkException: Task not serializable”这个错误,一般是因为在 map、filter 等的参数使用了外部的变量,但是这个变量不能序列化。其中最普遍的情形是:当引用了某个类(经常是当前类)的成员函数或变量时,会导致这个类的所有成员(整个类)都需要支持序列化。虽然许多情形下,当前类使用了“extends Serializable”声明支持序列化,但是由于某些字段不支持序列化,仍然会导致整个类序列化时出现问题,最终导致出现 Task 未序列化问题。

解决办法与编程建议

这个问题主要是引用了某类的成员变量或函数,并且相应的类没有做好序列化处理导致的。因此解决这个问题无非以下两种方法:

不在(或不直接在)map 等闭包内部直接引用某类成员函数或成员变量

  • 对于依赖某类成员变量的情形 如果程序依赖的值相对固定,可取固定的值,或定义在 map、filter 等操作内部,或定义在 scala object 对象中。 如果依赖值需要程序调用时动态指定(以函数参数形式),则在 map、filter 等操作时,可不直接引用该成员变量,而是根据成员变量的值重新定义一个局部变量,这样 map 等算子就无需引用类的成员变量。

  • 对于依赖某类成员函数的情形 如果函数功能独立,可定义在 scala object 对象中(类似于 Java 中的 static 方法),这样就无需一来特定的类。

如果引用了某类的成员函数或变量,则需对相应的类做好序列化处理

首先该类继承 Serializable 接口,然后对于不能序列化的成员变量使用“@transent”标注,告诉编译器不需要序列化。 此外如果可以,可将依赖的变量独立放到一个小的 class 中,让这个 class 支持序列化,这样做可以减少网络传输量,提高效率。

参考资料

  • https://blog.csdn.net/javastart/article/details/51206715

概述

在我们编写 Spark Application 的的过程中,会涉及到很多概念,下面会介绍一些基本概念用于更好地理解 Spark 的设计以及应用开发。

RDD(Resillent Distributed Dataset)

弹性数据集。它是 Spark 的基本数据结构,Spark 中的所有数据都是通过 RDD 的形式进行组织。RDD 是不可变的数据集合,不可变的意思是 RDD 中的每个分区数据是只读的。RDD 数据集是要做逻辑分区的,每个分区可以单独在集群节点进行计算。

DAG(Directed Acycle Graph)

有向无环图。在图论中,如果一个有向图无法从有一个定点出发经过若干条边回到该点,则这个图是有向无环图。Spark 中用 DAG 来表示 RDD 之间的血缘关系。

NarrowDependency

窄依赖,子 RDD 依赖于父 RDD 中固定的 Partition。NarrowDependency 分为 OneToOneDependency 和 RangeDependency。

ShuffleDependency

Shuffle 依赖,也称作宽依赖,子 RDD 可能对父 RDD 中所有的 Partition 都有依赖。子 RDD 对父 RDD 各个 Partition 的依赖取决于分区计算器(Partitioner)的算法。

Job

所谓一个 Job,就是由一个 RDD 的 action 触发的动作,可以简单的理解为,当你需要执行一个 RDD 的 action 的时候,会生成一个 job。

Stage

Stage 是一个 Job 的组成单位,就是说,一个 Job 会被切分成 1 个或多个的 Stage,然后各个 Stage 会按照执行顺序依次执行。至于 Job 根据什么标准来切分 Stage,可以简单理解为按照是否需要 shuffle 来划分 stage。当一个操作需要 shuffle 时,Spark 就会将其划分为一个 Stage。具体细节后续会有文章单独介绍。

Task

Task 是 Spark 的执行单元,每个 Stage 都会包含一个或者一组 Task。一般来说,数据有多少个 partition 就会有多少个 Task。

Partition

Partition 类似 Hadoop 的 Split,是数据的一种划分方式,计算以 Partition 为单位进行的。Partition 的划分依据有很多,也可以自己定义。例如 HDFS 的文件的划分方式就是按照 HDFS Block 大小来划分的。

Shuffle

Shuffle 是 Spark Application 中一个非常重要的阶段。Shuffle 用于打通 map task(在 Spark 称为 ShuffleMapTask)的输出与 reduce task 的任务(在 Spark 中就是 ResultTask)的输入,map task 的输出结果按照指定的分区策略分配给某一个分区的 reduce task。

引言

Spark 作为一个基于内存的分布式计算引擎,其内存管理模块在整个系统中扮演着非常重要的角色。理解 Spark 内存管理的基本原理,有助于更好地开发 Spark 应用程序和进行性能调优。

一个 Spark Application 一般包括 Driver 和 Executor 两种 JVM 进程。Driver 为主控进程,负责创建 Context,提交 Job,并将 Job 转换为 Task,协调 Executor 间的 Task 执行。而 Executor 主要负责执行具体的计算任务,将结果返回 Driver。 由于 Driver 的内存管理比较简单,和一般的 JVM 程序区别不大,所以本文重点分析 Executor 的内存管理。所以,本文提到的内存管理都是指 Executor 的内存管理。

堆内内存和堆外内存

Executor 作为一个 JVM 进程,它的内存管理是基于 JVM 之上的。所以 JVM 的内存管理包括两种方式:

  • 堆内内存管理(On-Heap):对象分配的在 JVM 的堆上,对象会受 GC 束缚。
  • 堆外内存管理(Off-Heap):对象通过序列化分配在 JVM 之外的内存里,由应用程序对其进行管理,且不受 GC 束缚。这种内存管理方式可以避免频繁的 GC,但缺点是必须自己编写内存申请和释放的逻辑。

一般来说对象读写速度是:on-heap > off-heap > disk

内存空间分配

在 Spark 中,支持两种内存管理方式:静态内存管理(Static Memory Manager)和统一内存管理(Unified Memory Manager)。

Spark 为 Storage 内存和 Execution 内存的管理提供了统一的接口MemoryManager,同一个 Executor 内的任务都调用这个接口的方法来申请或释放内存。MemoryManager 的实现上,Spark 1.6 以前默认采用的是静态内存管理([StaticMemoryManager]((https://github.com/apache/spark/blob/branch-2.3/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala))的方式;而在Spark1.6以后,默认采用的是统一内存管理(UnifiedMemoryManager)的方式。在中Spark 1.6+中,可以通过 spark.memory.useLegacyMode 参数启用静态内存管理。

静态内存管理(Static Memory Manager)

静态内存管理机制下,Storage 内存、Execution 内存和其他内存的大小在 Spark 应用程序运行期间均为固定的,但用户可以应用程序启动前进行配置。由于这种分配已经逐渐被淘汰,但出于兼容性考虑,Spark 依然保留下来。有兴趣的话,可以参考:https://blog.csdn.net/Lin_wj1995/article/details/79924542

这边主要讲下静态内存管理的弊端:静态内存管理机制实现起来较为简单,但如果用户不熟悉 Spark 的存储机制,或没有根据具体的数据规模和计算任务或做相应的配置,很容易造成 Storage 内存和 Execution 内存中的一方剩余大量的空间,而另一方却早早被占满,不得不淘汰或移出旧的内容以存储新的内容。

统一内存管理(Unified Memory Manager)

Spark 1.6 之后引入了统一内存管理机制,该机制与静态内存管理的区别在于,Storage 内存和 Execution 内存是共享一块内存空间的,双方可以互相占用对方的空闲区域。

堆内模型

默认情况下,Spark 仅使用了堆内内存。堆内内存的大小由 Spark Application 启动时的--executor-memory 或 spark.executor.memory 参数配置。Executor 内运行的并发任务共享 JVM 堆内内存。

Executor 端的堆内内存区域大致可以分为以下四大块:

  • Storage 内存(Storage Memory):主要用于存储 Spark 的 cache 数据,例如 RDD 的缓存、Broadcast 变量,Unroll 数据等。
  • Execution 内存(Execution Memory):主要用于存放 Shuffle、Join、Sort、Aggregation 等计算过程中的临时数据。
  • 用户内存(User Memory):主要用于存储 RDD 转换操作所需要的数据,例如 RDD 依赖等信息。
  • 预留内存(Reserved Memory):系统预留内存,会用来存储 Spark 内部对象。

内存分布如下图所示: unified_memory_managment_on_heap

堆外模型

Spark 1.6 开始引入了 Off-heap memory(SPARK-11389)。默认情况下,堆外内存是关闭的,我们可以通过 spark.memory.offHeap.enabled 参数启用,通过 spark.memory.offHeap.size 设置堆外内存大小。相比堆内内存,堆外内存的模型比较简单,只包括 Storage 内存和 Execution 内存,其分布如下图所示: unified_memory_management_off_heap

如果堆外内存被启用,那么 Executor 内将同时存在堆内和堆外内存,两者的使用互补影响,这个时候 Executor 中的 Execution 内存是堆内的 Execution 内存和堆外的 Execution 内存之和,同理,Storage 内存也一样。下图为 Spark 堆内和堆外示意图 spark_on_heap_and_off_heap_memory

动态占用机制

  • 在程序提交时,会根据 spark.memory.storageFraction 参数设置 Storage 内存区域和 Execution 内存区域。
  • 在程序运行时,如果双方的空间不不足(存储空间不足以放下一个完整的 Block),则按照 LRU 规则存储到磁盘;如果己方空间不足而对方空间有空余,则借用对方的空间。
  • Storage 占用对方内存,可将占用的部分转存到硬盘,然后"归还"借用的空间。
  • Execution 占用对方内存,目前的实现是无法让对方"归还"的。因为 Shuffle 过程产生的文件在后面一定会被使用到,而 Cache 在内存的数据不一定在后面使用,归还内存可能会导致性能严重下降。

参考资料

Spark 架构简介

Spark 是一个 master/slave 架构的分布式系统,它的架构主要包含有

  • Spark Driver
  • Spark Executor
  • Cluster13366ce2e12e3b7d25579d4a574eff44.png%201.png)

一个 Spark 集群一般拥有单个的 Driver 和多个的 Executor。Spark Driver 和 Executor 都是独立运行的 JVM 进程,它们可以运行在单台机器上,也可以运行在多台机器上。

Spark Driver

Spark Driver 是一个 Spark Application 的主入口,它可以用 Scala,Python 或者 R 进行编写。一个 Spark Driver 包含有一个 SparkContext,这是整个 Spark Application 中最核心的组件。同时,还包含有 DAGScheduler, TaskScheduler, BackendScheduler 和 BlockManager 等组件用于将用户代码转换为 Spark job 运行在集群当中。Spark Driver 的主要功能包括:

  • 负责协调 Job 的运行和以及 Cluster Manager 进行交互。
  • 将 RDD 转换为执行的 DAG 图,同时把 DAG 图分为不同的 Stage
  • 将 Job 切割成更小的执行单元,Task,由 Executor 执行。
  • 启动一个 HTTP Server,端口为 4040。这个 Web UI 会把 Spark Application 运行时的信息展示出来。

Spark Executor

Spark Executor 是 Task 的实际执行者。每个 Application 的 Executor 数量可以通过配置指定(Static Allocation)或者有 Spark 动态分配(Dynamic Allocation)。Executor 的主要功能包括:

  • 负责所有的数据处理工作
  • 用于读取和写入外部数据源
  • 缓存着计算过程中的数据

Cluster Manager

严格上说 Cluster Manager 并不是 Spark 的一部分,而是一个外部的 Service(除了 Standalone)。Spark Driver 会和其进行交互用于从集群里获取资源(CPU,Memory 等)。目前 Spark 支持 4 种 Cluster Manager:

  • Standalone:这是一种 Spark 自带的集群管理模式,设计也比较简单。
  • Apache Mesos:Mesos 是一种通用的集群资源管理服务,用于管理 MapReduce 应用或者其他类型的应用。
  • Hadoop YARN :YARN 是由 Hadoop 2.0 引入的集群资源管理服务。
  • Kubernetes:Kubernetes 是一种管理 containerized 的应用的服务。Spark 2.3 以后引入了对 Kubernetes 的支持。

至于选择使用哪一种 Cluster Manager,完全取决于生产环境以及业务场景,并没有绝对的优劣。