本文主要是对Spark的内存管理模块进行了代码走读,从业务逻辑上Spark将内存划分为执行区(Execution区,内存主要用来进行shuffle,join,sort,aggregate的计算)、存储区(Storage区,内存主要用来进行缓存和data transfer)。为了优化JVM内存系统的一些问题,在堆内存和堆外内存的基础上抽象了Tungsten内存系统。文中对涉及到的内的关键方法进行了分析。
代码清单
- org.apache.spark.memory.MemoryManager
- org.apache.spark.memory.UnifiedMemoryManager
- org.apache.spark.memory.MemoryPool
- org.apache.spark.memory.ExecutionMemoryPool
- org.apache.spark.memory.StorageMemoryPool
- org.apache.spark.memory.MemoryConsumer
- org.apache.spark.memory.TaskMemoryManager
- org.apache.spark.unsafe.memory.MemoryLocation
- org.apache.spark.unsafe.memory.MemoryBlock
- org.apache.spark.unsafe.memory.MemoryAllocator
- org.apache.spark.unsafe.memory.HeapMemoryAllocator
- org.apache.spark.unsafe.memory.UnsafeMemoryAllocator
- org.apache.spark.storage.memory.MemoryStore
总览
全局只有唯一一个MemoryManager,里面维护了4个Pool。从业务上分为Execution和Storage,从存储位置分为OnHeap和OffHeap。每个task需要使用多个数据结构,每个数据结构都是一个MemoryConsumer
的实现,每个task的这些consumer都通过TaskMemoryManager
进行管理,多个TaskMemoryManager
共同维护一个Tungsten
的页结构。
Tungsten
为了解决JVM对象存储时的overhead问题,以及GC造成的性能损耗,而提出了一个新的内存模型。提供一套像C/C++一样可以直接操作内存的接口(实际操作的是堆外内存),再为了通用性提供了更高层的接口将堆内存和堆外内存进行了统一。
1 | public class MemoryLocation { |
1 | public class MemoryBlock extends MemoryLocation { |
Tungsten提供了一套类似操作系统页内存管理一样的结构,每页会存储一个MemoryBlock
结构。length
是整个Block实际占用的内存大小,pageNumber
是在页数组中的index位置。MemoryLocation
统一了堆内外内存的寻址,如果是off-heap,则obj
为null,offset
为绝对内存地址;如果是on-heap,则obj
为对象的基地址,offset
为偏移量。所以在实际使用过程当中就需要在物理地址与pageNumber
,offsetInPage
之间进行转换:
- on-heap:
address = page.obj + page.offset + inPageOffset
- off-heap:
address = page.offset + inPageOffset
但是在这套结构中物理地址不会直接的存储,pageNumer
+ offsetInPage
的组合就能唯一的定位一个值的位置,所以提供了一个编码方法用64位的long值存储这个坐标,前13位是pageNumber,后51位是inPageOffset。在TaskMemoryManager
当中提供了多个转换的方法:
long encodePageNumberAndOffset(MemoryBlock page, long offsetInPage)
:给定页和页内偏移量计算encode值long encodePageNumberAndOffset(int pageNumer, long offsetInPage)
:给定页号和页内偏移量计算encode值int decodePageNumber(long pagePlusOffsetAddress)
:给定encode值,解码pageNumberlong decodeOffset(long pagePlusOffsetAddress)
:给定encode值,解码offsetObject getPage(long pagePlusOffsetAddress)
:给定encode值,获取页long getOffsetInPage(long pagePlusOffsetAddress)
:给定encode值,获取页内偏移
Memory
MemoryManager
该类是内存管理的统筹类,定义了所有的内存管理动作。因为是一个抽象类,所以这些动作有的会下放给实现类实现,有些动作会委托MemoryPool
类实现。下面是接口的分类:
获取内存大小:
abstract maxOnHeapStorageMemory
:获取Storage区最大能使用的堆内存大小(动态变化的)abstract maxOffHeapStorageMemory
:获取Storage区最大能使用的堆外内存大小(动态变化的)storageMemoryUsed
: Storage区已使用的内存大小onHeapStorageMemoryUsed
:Storage区已使用的堆内存大小offHeapStorageMemoryUsed
:Storage区已使用的堆外内存大小executionMemoryUsed
: Execution区已使用的内存大小onHeapExecutionMemoryUsed
:Execution区已使用的堆内存大小offHeapExecutionMemoryUsed
:Execution区已使用的堆外内存大小getExecutionMemoryUsageForTask
:获取一个task在Execution区占用的内存大小
获取更多的内存空间:
abstract acquireStorageMemory(blockId: BlockId, numBytes: Long, memoryMode: MemoryMode)
:为一个Block获取numBytes
的Storage区内存空间,如果获取不到足够的空间可能会删除一个存在的Block。abstract acquireExecutionMemory(numBytes: Long, taskAttemptId: Long, memoryMode: MemoryMode)
:为一个task获取numBytes
的Execution区内存空间,当不能获取到足够执行的内存空间时,这个方法会阻塞,直到获取到足够多的内存。
释放内存空间:
releaseAllExecutionMemoryForTask(taskAttemptId: Long)
:释放一个task占用的所有Execution区内存空间releaseStorageMemory(numBytes: Long, memoryMode: MemoryMode)
:释放numBytes
的Storage区内存空间releaseAllStorageMemory()
:释放所有的Storage区内存空间releaseUnrollMemory(numBytes: Long, memoryMode: MemoryMode)
:释放numBytes
的用于unroll block的内存空间
Tungsten
相关
UnifiedMemoryManager
1 | private[spark] class UnifiedMemoryManager( |
构造函数中的:
maxHeapMemory
:是堆内存的总大小onHeapStorageRegionSize
:是堆内存中Storage区的起始大小
1 | override def maxOnHeapStorageMemory: Long = synchronized { |
这两个方法就是简单的计算,不过maxHeapMemory
是创建UnifiedMemoryManager
时传入的参数,而maxOffHeapMemory
是从spark.memory.offHeap.size
参数中读入。
::acquireExecutionMemory::acquireExecutionMemory
中主要的任务就是要给出MemoryPool.acquireMemory()
中的两个回调,一个是获取更多的Execution区内存的回调,一个是获取Execution区最多能获取到的内存大小。
1 | def maybeGrowExecutionPool(extraMemoryNeeded: Long): Unit = { |
👆获取更多的Execution区内存的回调,这里最重要的是计算可以归还内存大小的逻辑,在memoryFree
(空闲的内存大小)和poolSize-storageRegionSize
(向Executions区借的内存大小)中取一个更大的值。然后真正归还的内存大小是在memoryReclaimableFromStorage
(可以归还的内存大小)和extraMemoryNeeded
(Executions区需要扩大的内存大小)之间取一个更小的值。
计算完成以后需要真正的进行内存操作释放需要的内存,该方法在StorageMemoryPool
中:
1 | def freeSpaceToShrinkPool(spaceToFree: Long): Long = lock.synchronized { |
先计算空闲空间的大小,如果空闲空间大于等于需要释放的空间大小,则不需要进行内存对象操作。否则的话,需要删除一些内存Block。删除的方法在MemoryStore
中:
1 | private[spark] def evictBlocksToFreeSpace( |
该类当中有一个存储所有Block的Map,即:
1 | private val entries = new LinkedHashMap[BlockId, MemoryEntry[_]](32, 0.75f, true) |
LinkedHashMap
不是线程安全的,所以每次操作之前也是需要加锁。如果没有获取到需要释放的内存空间大小则遍历Block,判断遍历的Block与需要存储的Block是否是同一个存储区域(还判断了遍历的Block与需要存储的Block是否是同一个),如果通过了判断则需要先将该Block锁住,加入候选名单。
找够所有的候选者以后还没有达到需要释放的内存空间大小则将所有锁住的Block解锁,返回0,表示这个操作失败。如果达到了,则开始释放内存的过程。将每一个Block执行dropBlock
,afterDropAction
的操作。在dropBlock
中会删除该Block本身的数据(除非Block还在被操作),检查Block是否还在被其他的storage存储,如果是的话就先不删除其metadata,否则的话继续删除metadata。afterDropAction
是个hook,可以由调用方指定删除之后的动作。如果在删除过程当中失败的话,需要将没有删除的Block解锁。
1 | def computeMaxExecutionPoolSize(): Long = { |
👆获取Execution区最多能获取到的内存大小,是通过最大的内存大小减去Storage区最大能占用的内存大小。Storage区能占用的上限是storageRegionSize
。
下面来看真正进行内存分配的函数acquireMemory
,该方法在ExecutionMemoryPool
中:
1 | private[memory] def acquireMemory( |
首先关注一下锁的对象,在调用方MemoryManager
初始化的时候有声明锁的对象:
1 | "this") ( |
1 | private[memory] class ExecutionMemoryPool( |
通过上面两个代码片段可以看出,多个Pool的锁对象都是MemoryManger
,所以多个Pool之间是互斥的,不论是StorageMemoryPool
还是ExecutionMemoryPool
。
然后整个函数的工作方式:
- 如果是一个新的task,先帮它加入到
memoryForTask
中,内存设为0,然后唤醒所有等待队列里的线程开始等锁。memoryForTask
是一个保存taskId -> memory
的map。 - 进入一个死循环中,先查看是否需要获取更多的内存,如果需要的话则调用
maybeGrowPool
回调。计算一个task理论能分配到的最大内存和最小内存,即1/2N * maxPoolSize <= cache <= 1/N * maxPoolSize
。接着计算实际最大能分配到的内存以及最终实际分配的内存。 - 如果实际分配到的内存小于需要的内存或者这个任务分配到的总内存都没有达到理论最小内存的话,则将锁还掉以后继续等锁。如果拿到了需要的内存以后就更新
memoryForTask
并进行返回。
::acquireStorageMemory::
1 | override def acquireStorageMemory( |
如果需要申请的内存大于最大内存则返回false,申请的内存大于Storage区的剩余内存,则需要从Execution区借内存。Storage区不能将正在运行的task踢出Execution区,所以只能从中获取空闲的空间大小。数值计算完成以后,开始真正的分配。
1 | def acquireMemory( |
先从Storage中删除一些Block释放一些内存,如果有足够的内存申请就更新已使用的内存计数器,否则直接返回false。
MemoryPool
这是一个抽象类,整个类都在维护一个变量_poolSize
,表示内存使用量。提供了维护这个量的一些方法,如:
poolSize: Long
:获取_poolSize
memoryFree: Long
:获取空闲的内存空间大小incrementPoolSize(delta: Long)
:提高_poolSize
decrementPoolSize(delta: Long)
:降低_poolSize
以及一个抽象方法:
memoryUsed: Long
ExecutionMemoryPool
该类中维护了一个taskId -> memory
的Map:memoryForTask
来管理内存。
1 | override def memoryUsed: Long = lock.synchronized { |
实现父类的抽象方法,直接将memoryForTask
中的values累加。
acquireMemory
已经在上文中分析过了。releaseMemory
在MemoryManager.releaseExecutionMemory
中被调用:
1 | def releaseMemory(numBytes: Long, taskAttemptId: Long): Unit = lock.synchronized { |
在正式释放之前会先比较一下现在该task所占用的内存和需要释放的内存的大小,如果task所占内存小于需要释放的内存也只会释放task所占内存,不会再释放其他的task。因为有新的内存空间出现,所以可以唤醒等待队列里的线程,开始给新任务争取内存。
1 | def releaseAllMemoryForTask(taskAttemptId: Long): Long = lock.synchronized { |
该方法会释放一个task所有的内存,直接获取task所占用的内存以后调用上面的releaseMemory
方法。
StorageMemoryPool
该类负责Storage区的内存管理,在类中维护了一个_memoryUsed
参数,来表示使用了多少内存。并且会关联一个MemoryStore
对象,该对象会完成真正的内存管理操作。
重要的acquireMemory
和freeSpaceToShrinkPool
函数均在上文中进行了介绍。
TaskMemoryManager
该类负责管理一个task的内存,该类中不会直接操作内存,会通过MemoryManager
来进行管理。不过因为底层使用了Tungsten
内存模型,该类中还会维护内存模型使用的页机制相关的变量。所有的TaskMemoryManager
会共用一个MemoryManager
。
1 | public long acquireExecutionMemory(long required, MemoryConsumer consumer) { |
该方法是为一个task新的consumer分配内存,一进来会先尝试使用ExecutorPool申请required
大小的内存,如果能直接获取到就结束。否则的话需要从consumer中挑选合适的consumer进行spill操作(也就是将内存中的数据冲写到硬盘上)来释放足够多的内存。
挑选的过程也很常规,会选出大于需要的内存的consumer中最小的一个,如果不存在则从大到小依次spill,直到释放的内存达到需求。不过筛选大于需要的内存中最小的一个用了一个很简洁快速的方式,创建了一个memory -> List<MemoryConsumer>
的TreeMap,直接使用TreeMap.ceilingEntry
方法。每次释放完成以后都再重新申请更多的内存,直到申请到了足够多的内存。
如果在上面的操作执行完成以后(也就是能释放的都释放掉了)还是不够,那么就将这个要加入的新的consumer的部分数据冲写到硬盘上,使他能被放入MemoryPool中。
Allocate page
1 | public MemoryBlock allocatePage(long size, MemoryConsumer consumer) { |
页管理主要由一个 BitSet
(标示页位情况)和MemoryBlock[]
()实现,true表示页位被占。该方法会先调用acquireExecutionMemory
申请实际的物理内存,然后通过BitSet.nextClearBit()
函数获取第一个空位置,并进行占位。完成以后就会通过tungstenMemoryAllocator
来真正进行内存申请,下面会分析一下on-heap和off-heap两种不同的内存申请:
::Unsafe memory allocate::
1 | public MemoryBlock allocate(long size) throws OutOfMemoryError { |
Off-heap的所有内存操作都是通过Unsafe工具类来完成,这个方法非常的简单。会先通过Unsafe.allocateMemory
申请内存,然后初始化一个页结构,off-heap不会映射对象,所以obj传入null即可。
::Heap memory allocate::
1 | public MemoryBlock allocate(long size) throws OutOfMemoryError { |
这里多大内存有一个优化机制,类中有一个Map会保存大内存块的引用,减少GC和申请内存的时间。
1 | "this") ( |
触发这个机制的内存大小是1024 * 1024
,所以我们能看到在allocate方法中会先判断是否触发该机制,如果触发则从未被回收的大内存块中取出相应的块进行存储,否则会重新申请内存。
Free page
1 | public void freePage(MemoryBlock page, MemoryConsumer consumer) { |
对应于申请页也会有释放页的操作,这个过程比较简单,就是对页相关的数据结构进行更新,做一些清空操作。最后会调用tungstenMemoryAllocator.free
进行真正的释放,并且调用底层的Executor区的pool进行释放。下面也会分析一下on-heap和off-heap的不同释放操作。
::Unsafe memory free::
1 | public void free(MemoryBlock memory) { |
整个过程也很简单,调用Unsafe.freeMemory
进行内存释放,将页对象设置为一个清空后的状态。
::Heap memory free::
1 | public void free(MemoryBlock memory) { |
将内存区域置为空,如果是一个大内存块的话就保留弱引用,以供下次需要的时候直接进行使用。为了加大命中概率可以看到在计算占用内存的时候都会找到比当前内存大的最近的一个8的倍数,保证了从弱引用区域中找到的一定是足够能装的下数据中最小的一块。
参考
spark 源码分析之十五 — Spark内存管理剖析 - JohnnyBai - 博客园
GitHub - hustnn/TungstenSecret: Explore the project Tungsten
Java 6 thread states and life cycle UML protocol state machine diagram example.