本文主要是对Spark的内存管理模块进行了代码走读,从业务逻辑上Spark将内存划分为执行区(Execution区,内存主要用来进行shuffle,join,sort,aggregate的计算)、存储区(Storage区,内存主要用来进行缓存和data transfer)。为了优化JVM内存系统的一些问题,在堆内存和堆外内存的基础上抽象了Tungsten内存系统。文中对涉及到的内的关键方法进行了分析。

代码清单

  • org.apache.spark.memory.MemoryManager
  • org.apache.spark.memory.UnifiedMemoryManager
  • org.apache.spark.memory.MemoryPool
  • org.apache.spark.memory.ExecutionMemoryPool
  • org.apache.spark.memory.StorageMemoryPool
  • org.apache.spark.memory.MemoryConsumer
  • org.apache.spark.memory.TaskMemoryManager
  • org.apache.spark.unsafe.memory.MemoryLocation
  • org.apache.spark.unsafe.memory.MemoryBlock
  • org.apache.spark.unsafe.memory.MemoryAllocator
  • org.apache.spark.unsafe.memory.HeapMemoryAllocator
  • org.apache.spark.unsafe.memory.UnsafeMemoryAllocator
  • org.apache.spark.storage.memory.MemoryStore

总览

a18-1
全局只有唯一一个MemoryManager,里面维护了4个Pool。从业务上分为Execution和Storage,从存储位置分为OnHeap和OffHeap。每个task需要使用多个数据结构,每个数据结构都是一个MemoryConsumer的实现,每个task的这些consumer都通过TaskMemoryManager进行管理,多个TaskMemoryManager共同维护一个Tungsten的页结构。

Tungsten

为了解决JVM对象存储时的overhead问题,以及GC造成的性能损耗,而提出了一个新的内存模型。提供一套像C/C++一样可以直接操作内存的接口(实际操作的是堆外内存),再为了通用性提供了更高层的接口将堆内存和堆外内存进行了统一。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class MemoryLocation {

@Nullable
Object obj;

long offset;

public MemoryLocation(@Nullable Object obj, long offset) {
this.obj = obj;
this.offset = offset;
}

public MemoryLocation() {
this(null, 0);
}

public void setObjAndOffset(Object newObj, long newOffset) {
this.obj = newObj;
this.offset = newOffset;
}

public final Object getBaseObject() {
return obj;
}

public final long getBaseOffset() {
return offset;
}
}
1
2
3
4
5
public class MemoryBlock extends MemoryLocation {
private final long length;
public int pageNumber = NO_PAGE_NUMBER;
...
}

Tungsten提供了一套类似操作系统页内存管理一样的结构,每页会存储一个MemoryBlock结构。
length是整个Block实际占用的内存大小,pageNumber是在页数组中的index位置。MemoryLocation统一了堆内外内存的寻址,如果是off-heap,则obj为null,offset为绝对内存地址;如果是on-heap,则obj为对象的基地址,offset为偏移量。所以在实际使用过程当中就需要在物理地址与pageNumber,offsetInPage之间进行转换:

  • on-heap: address = page.obj + page.offset + inPageOffset
  • off-heap: address = page.offset + inPageOffset

但是在这套结构中物理地址不会直接的存储,pageNumer + offsetInPage的组合就能唯一的定位一个值的位置,所以提供了一个编码方法用64位的long值存储这个坐标,前13位是pageNumber,后51位是inPageOffset。在TaskMemoryManager当中提供了多个转换的方法:

  • long encodePageNumberAndOffset(MemoryBlock page, long offsetInPage):给定页和页内偏移量计算encode值
  • long encodePageNumberAndOffset(int pageNumer, long offsetInPage):给定页号和页内偏移量计算encode值
  • int decodePageNumber(long pagePlusOffsetAddress):给定encode值,解码pageNumber
  • long decodeOffset(long pagePlusOffsetAddress):给定encode值,解码offset
  • Object getPage(long pagePlusOffsetAddress):给定encode值,获取页
  • long getOffsetInPage(long pagePlusOffsetAddress):给定encode值,获取页内偏移

Memory

MemoryManager

该类是内存管理的统筹类,定义了所有的内存管理动作。因为是一个抽象类,所以这些动作有的会下放给实现类实现,有些动作会委托MemoryPool类实现。下面是接口的分类:

  • 获取内存大小:

    • abstract maxOnHeapStorageMemory:获取Storage区最大能使用的堆内存大小(动态变化的)
    • abstract maxOffHeapStorageMemory:获取Storage区最大能使用的堆外内存大小(动态变化的)
    • storageMemoryUsed: Storage区已使用的内存大小
    • onHeapStorageMemoryUsed:Storage区已使用的堆内存大小
    • offHeapStorageMemoryUsed:Storage区已使用的堆外内存大小
    • executionMemoryUsed: Execution区已使用的内存大小
    • onHeapExecutionMemoryUsed:Execution区已使用的堆内存大小
    • offHeapExecutionMemoryUsed:Execution区已使用的堆外内存大小
    • getExecutionMemoryUsageForTask:获取一个task在Execution区占用的内存大小
  • 获取更多的内存空间:

    • abstract acquireStorageMemory(blockId: BlockId, numBytes: Long, memoryMode: MemoryMode):为一个Block获取numBytes的Storage区内存空间,如果获取不到足够的空间可能会删除一个存在的Block。
    • abstract acquireExecutionMemory(numBytes: Long, taskAttemptId: Long, memoryMode: MemoryMode):为一个task获取numBytes的Execution区内存空间,当不能获取到足够执行的内存空间时,这个方法会阻塞,直到获取到足够多的内存。
  • 释放内存空间:

    • releaseAllExecutionMemoryForTask(taskAttemptId: Long):释放一个task占用的所有Execution区内存空间
    • releaseStorageMemory(numBytes: Long, memoryMode: MemoryMode):释放numBytes的Storage区内存空间
    • releaseAllStorageMemory():释放所有的Storage区内存空间
    • releaseUnrollMemory(numBytes: Long, memoryMode: MemoryMode):释放numBytes的用于unroll block的内存空间
  • Tungsten相关

UnifiedMemoryManager

1
2
3
4
5
private[spark] class UnifiedMemoryManager(
conf: SparkConf,
val maxHeapMemory: Long,
onHeapStorageRegionSize: Long,
numCores: Int)

构造函数中的:

  • maxHeapMemory:是堆内存的总大小
  • onHeapStorageRegionSize:是堆内存中Storage区的起始大小
1
2
3
4
5
6
7
override def maxOnHeapStorageMemory: Long = synchronized {
maxHeapMemory - onHeapExecutionMemoryPool.memoryUsed
}

override def maxOffHeapStorageMemory: Long = synchronized {
maxOffHeapMemory - offHeapExecutionMemoryPool.memoryUsed
}

这两个方法就是简单的计算,不过maxHeapMemory是创建UnifiedMemoryManager时传入的参数,而maxOffHeapMemory是从spark.memory.offHeap.size参数中读入。

::acquireExecutionMemory::
acquireExecutionMemory中主要的任务就是要给出MemoryPool.acquireMemory()中的两个回调,一个是获取更多的Execution区内存的回调,一个是获取Execution区最多能获取到的内存大小。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
def maybeGrowExecutionPool(extraMemoryNeeded: Long): Unit = {
if (extraMemoryNeeded > 0) {
// There is not enough free memory in the execution pool, so try to reclaim memory from
// storage. We can reclaim any free memory from the storage pool. If the storage pool
// has grown to become larger than `storageRegionSize`, we can evict blocks and reclaim
// the memory that storage has borrowed from execution.
val memoryReclaimableFromStorage = math.max(
storagePool.memoryFree,
storagePool.poolSize - storageRegionSize)
if (memoryReclaimableFromStorage > 0) {
// Only reclaim as much space as is necessary and available:
val spaceToReclaim = storagePool.freeSpaceToShrinkPool(
math.min(extraMemoryNeeded, memoryReclaimableFromStorage))
storagePool.decrementPoolSize(spaceToReclaim)
executionPool.incrementPoolSize(spaceToReclaim)
}
}
}

👆获取更多的Execution区内存的回调,这里最重要的是计算可以归还内存大小的逻辑,在memoryFree(空闲的内存大小)和poolSize-storageRegionSize(向Executions区借的内存大小)中取一个更大的值。然后真正归还的内存大小是在memoryReclaimableFromStorage(可以归还的内存大小)和extraMemoryNeeded(Executions区需要扩大的内存大小)之间取一个更小的值。

计算完成以后需要真正的进行内存操作释放需要的内存,该方法在StorageMemoryPool中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
def freeSpaceToShrinkPool(spaceToFree: Long): Long = lock.synchronized {
val spaceFreedByReleasingUnusedMemory = math.min(spaceToFree, memoryFree)
val remainingSpaceToFree = spaceToFree - spaceFreedByReleasingUnusedMemory
if (remainingSpaceToFree > 0) {
// If reclaiming free memory did not adequately shrink the pool, begin evicting blocks:
val spaceFreedByEviction =
memoryStore.evictBlocksToFreeSpace(None, remainingSpaceToFree, memoryMode)
// When a block is released, BlockManager.dropFromMemory() calls releaseMemory(), so we do
// not need to decrement _memoryUsed here. However, we do need to decrement the pool size.
spaceFreedByReleasingUnusedMemory + spaceFreedByEviction
} else {
spaceFreedByReleasingUnusedMemory
}
}
`

先计算空闲空间的大小,如果空闲空间大于等于需要释放的空间大小,则不需要进行内存对象操作。否则的话,需要删除一些内存Block。删除的方法在MemoryStore中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
private[spark] def evictBlocksToFreeSpace(
blockId: Option[BlockId],
space: Long,
memoryMode: MemoryMode): Long = {
assert(space > 0)
memoryManager.synchronized {
var freedMemory = 0L
val rddToAdd = blockId.flatMap(getRddId)
val selectedBlocks = new ArrayBuffer[BlockId]
def blockIsEvictable(blockId: BlockId, entry: MemoryEntry[_]): Boolean = {
entry.memoryMode == memoryMode && (rddToAdd.isEmpty || rddToAdd != getRddId(blockId))
}
// This is synchronized to ensure that the set of entries is not changed
// (because of getValue or getBytes) while traversing the iterator, as that
// can lead to exceptions.
entries.synchronized {
val iterator = entries.entrySet().iterator()
while (freedMemory < space && iterator.hasNext) {
val pair = iterator.next()
val blockId = pair.getKey
val entry = pair.getValue
if (blockIsEvictable(blockId, entry)) {
// We don't want to evict blocks which are currently being read, so we need to obtain
// an exclusive write lock on blocks which are candidates for eviction. We perform a
// non-blocking "tryLock" here in order to ignore blocks which are locked for reading:
if (blockInfoManager.lockForWriting(blockId, blocking = false).isDefined) {
selectedBlocks += blockId
freedMemory += pair.getValue.size
}
}
}
}

def dropBlock[T](blockId: BlockId, entry: MemoryEntry[T]): Unit = {
val data = entry match {
case DeserializedMemoryEntry(values, _, _) => Left(values)
case SerializedMemoryEntry(buffer, _, _) => Right(buffer)
}
val newEffectiveStorageLevel =
blockEvictionHandler.dropFromMemory(blockId, () => data)(entry.classTag)
if (newEffectiveStorageLevel.isValid) {
// The block is still present in at least one store, so release the lock
// but don't delete the block info
blockInfoManager.unlock(blockId)
} else {
// The block isn't present in any store, so delete the block info so that the
// block can be stored again
blockInfoManager.removeBlock(blockId)
}
}

if (freedMemory >= space) {
var lastSuccessfulBlock = -1
try {
logInfo(s"${selectedBlocks.size} blocks selected for dropping " +
s"(${Utils.bytesToString(freedMemory)} bytes)")
(0 until selectedBlocks.size).foreach { idx =>
val blockId = selectedBlocks(idx)
val entry = entries.synchronized {
entries.get(blockId)
}
// This should never be null as only one task should be dropping
// blocks and removing entries. However the check is still here for
// future safety.
if (entry != null) {
dropBlock(blockId, entry)
afterDropAction(blockId)
}
lastSuccessfulBlock = idx
}
logInfo(s"After dropping ${selectedBlocks.size} blocks, " +
s"free memory is ${Utils.bytesToString(maxMemory - blocksMemoryUsed)}")
freedMemory
} finally {
// like BlockManager.doPut, we use a finally rather than a catch to avoid having to deal
// with InterruptedException
if (lastSuccessfulBlock != selectedBlocks.size - 1) {
// the blocks we didn't process successfully are still locked, so we have to unlock them
(lastSuccessfulBlock + 1 until selectedBlocks.size).foreach { idx =>
val blockId = selectedBlocks(idx)
blockInfoManager.unlock(blockId)
}
}
}
} else {
blockId.foreach { id =>
logInfo(s"Will not store $id")
}
selectedBlocks.foreach { id =>
blockInfoManager.unlock(id)
}
0L
}
}
}

该类当中有一个存储所有Block的Map,即:

1
private val entries = new LinkedHashMap[BlockId, MemoryEntry[_]](32, 0.75f, true)

LinkedHashMap不是线程安全的,所以每次操作之前也是需要加锁。如果没有获取到需要释放的内存空间大小则遍历Block,判断遍历的Block与需要存储的Block是否是同一个存储区域(还判断了遍历的Block与需要存储的Block是否是同一个),如果通过了判断则需要先将该Block锁住,加入候选名单。

找够所有的候选者以后还没有达到需要释放的内存空间大小则将所有锁住的Block解锁,返回0,表示这个操作失败。如果达到了,则开始释放内存的过程。将每一个Block执行dropBlockafterDropAction的操作。在dropBlock中会删除该Block本身的数据(除非Block还在被操作),检查Block是否还在被其他的storage存储,如果是的话就先不删除其metadata,否则的话继续删除metadata。afterDropAction是个hook,可以由调用方指定删除之后的动作。如果在删除过程当中失败的话,需要将没有删除的Block解锁。

1
2
3
def computeMaxExecutionPoolSize(): Long = {
maxMemory - math.min(storagePool.memoryUsed, storageRegionSize)
}

👆获取Execution区最多能获取到的内存大小,是通过最大的内存大小减去Storage区最大能占用的内存大小。Storage区能占用的上限是storageRegionSize

下面来看真正进行内存分配的函数acquireMemory,该方法在ExecutionMemoryPool中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
private[memory] def acquireMemory(
numBytes: Long,
taskAttemptId: Long,
maybeGrowPool: Long => Unit = (additionalSpaceNeeded: Long) => (),
computeMaxPoolSize: () => Long = () => poolSize): Long = lock.synchronized {
assert(numBytes > 0, s"invalid number of bytes requested: $numBytes")

// TODO: clean up this clunky method signature

// Add this task to the taskMemory map just so we can keep an accurate count of the number
// of active tasks, to let other tasks ramp down their memory in calls to `acquireMemory`
if (!memoryForTask.contains(taskAttemptId)) {
memoryForTask(taskAttemptId) = 0L
// This will later cause waiting tasks to wake up and check numTasks again
lock.notifyAll()
}

// Keep looping until we're either sure that we don't want to grant this request (because this
// task would have more than 1 / numActiveTasks of the memory) or we have enough free
// memory to give it (we always let each task get at least 1 / (2 * numActiveTasks)).
// TODO: simplify this to limit each task to its own slot
while (true) {
val numActiveTasks = memoryForTask.keys.size
val curMem = memoryForTask(taskAttemptId)

// In every iteration of this loop, we should first try to reclaim any borrowed execution
// space from storage. This is necessary because of the potential race condition where new
// storage blocks may steal the free execution memory that this task was waiting for.
maybeGrowPool(numBytes - memoryFree)

// Maximum size the pool would have after potentially growing the pool.
// This is used to compute the upper bound of how much memory each task can occupy. This
// must take into account potential free memory as well as the amount this pool currently
// occupies. Otherwise, we may run into SPARK-12155 where, in unified memory management,
// we did not take into account space that could have been freed by evicting cached blocks.
val maxPoolSize = computeMaxPoolSize()
val maxMemoryPerTask = maxPoolSize / numActiveTasks
val minMemoryPerTask = poolSize / (2 * numActiveTasks)

// How much we can grant this task; keep its share within 0 <= X <= 1 / numActiveTasks
val maxToGrant = math.min(numBytes, math.max(0, maxMemoryPerTask - curMem))
// Only give it as much memory as is free, which might be none if it reached 1 / numTasks
val toGrant = math.min(maxToGrant, memoryFree)

// We want to let each task get at least 1 / (2 * numActiveTasks) before blocking;
// if we can't give it this much now, wait for other tasks to free up memory
// (this happens if older tasks allocated lots of memory before N grew)
if (toGrant < numBytes && curMem + toGrant < minMemoryPerTask) {
logInfo(s"TID $taskAttemptId waiting for at least 1/2N of $poolName pool to be free")
lock.wait()
} else {
memoryForTask(taskAttemptId) += toGrant
return toGrant
}
}
0L // Never reached
}

首先关注一下锁的对象,在调用方MemoryManager初始化的时候有声明锁的对象:

1
2
3
4
5
6
7
8
@GuardedBy("this")
protected val onHeapStorageMemoryPool = new StorageMemoryPool(this, MemoryMode.ON_HEAP)
@GuardedBy("this")
protected val offHeapStorageMemoryPool = new StorageMemoryPool(this, MemoryMode.OFF_HEAP)
@GuardedBy("this")
protected val onHeapExecutionMemoryPool = new ExecutionMemoryPool(this, MemoryMode.ON_HEAP)
@GuardedBy("this")
protected val offHeapExecutionMemoryPool = new ExecutionMemoryPool(this, MemoryMode.OFF_HEAP)
1
2
3
4
private[memory] class ExecutionMemoryPool(
lock: Object,
memoryMode: MemoryMode
)

通过上面两个代码片段可以看出,多个Pool的锁对象都是MemoryManger,所以多个Pool之间是互斥的,不论是StorageMemoryPool还是ExecutionMemoryPool

然后整个函数的工作方式:

  • 如果是一个新的task,先帮它加入到memoryForTask中,内存设为0,然后唤醒所有等待队列里的线程开始等锁。memoryForTask是一个保存taskId -> memory的map。
  • 进入一个死循环中,先查看是否需要获取更多的内存,如果需要的话则调用maybeGrowPool回调。计算一个task理论能分配到的最大内存和最小内存,即1/2N * maxPoolSize <= cache <= 1/N * maxPoolSize。接着计算实际最大能分配到的内存以及最终实际分配的内存。
  • 如果实际分配到的内存小于需要的内存或者这个任务分配到的总内存都没有达到理论最小内存的话,则将锁还掉以后继续等锁。如果拿到了需要的内存以后就更新memoryForTask并进行返回。

::acquireStorageMemory::

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
override def acquireStorageMemory(
blockId: BlockId,
numBytes: Long,
memoryMode: MemoryMode): Boolean = synchronized {
assertInvariants()
assert(numBytes >= 0)
val (executionPool, storagePool, maxMemory) = memoryMode match {
case MemoryMode.ON_HEAP => (
onHeapExecutionMemoryPool,
onHeapStorageMemoryPool,
maxOnHeapStorageMemory)
case MemoryMode.OFF_HEAP => (
offHeapExecutionMemoryPool,
offHeapStorageMemoryPool,
maxOffHeapStorageMemory)
}
if (numBytes > maxMemory) {
// Fail fast if the block simply won't fit
logInfo(s"Will not store $blockId as the required space ($numBytes bytes) exceeds our " +
s"memory limit ($maxMemory bytes)")
return false
}
if (numBytes > storagePool.memoryFree) {
// There is not enough free memory in the storage pool, so try to borrow free memory from
// the execution pool.
val memoryBorrowedFromExecution = Math.min(executionPool.memoryFree,
numBytes - storagePool.memoryFree)
executionPool.decrementPoolSize(memoryBorrowedFromExecution)
storagePool.incrementPoolSize(memoryBorrowedFromExecution)
}
storagePool.acquireMemory(blockId, numBytes)
}

如果需要申请的内存大于最大内存则返回false,申请的内存大于Storage区的剩余内存,则需要从Execution区借内存。Storage区不能将正在运行的task踢出Execution区,所以只能从中获取空闲的空间大小。数值计算完成以后,开始真正的分配。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
def acquireMemory(
blockId: BlockId,
numBytesToAcquire: Long,
numBytesToFree: Long): Boolean = lock.synchronized {
assert(numBytesToAcquire >= 0)
assert(numBytesToFree >= 0)
assert(memoryUsed <= poolSize)
if (numBytesToFree > 0) {
memoryStore.evictBlocksToFreeSpace(Some(blockId), numBytesToFree, memoryMode)
}
// NOTE: If the memory store evicts blocks, then those evictions will synchronously call
// back into this StorageMemoryPool in order to free memory. Therefore, these variables
// should have been updated.
val enoughMemory = numBytesToAcquire <= memoryFree
if (enoughMemory) {
_memoryUsed += numBytesToAcquire
}
enoughMemory
}

先从Storage中删除一些Block释放一些内存,如果有足够的内存申请就更新已使用的内存计数器,否则直接返回false。

MemoryPool

这是一个抽象类,整个类都在维护一个变量_poolSize,表示内存使用量。提供了维护这个量的一些方法,如:

  • poolSize: Long:获取_poolSize
  • memoryFree: Long:获取空闲的内存空间大小
  • incrementPoolSize(delta: Long):提高_poolSize
  • decrementPoolSize(delta: Long):降低_poolSize

以及一个抽象方法:

  • memoryUsed: Long

ExecutionMemoryPool

该类中维护了一个taskId -> memory的Map:memoryForTask来管理内存。

1
2
3
override def memoryUsed: Long = lock.synchronized {
memoryForTask.values.sum
}

实现父类的抽象方法,直接将memoryForTask中的values累加。

acquireMemory已经在上文中分析过了。releaseMemoryMemoryManager.releaseExecutionMemory中被调用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
def releaseMemory(numBytes: Long, taskAttemptId: Long): Unit = lock.synchronized {
val curMem = memoryForTask.getOrElse(taskAttemptId, 0L)
val memoryToFree = if (curMem < numBytes) {
logWarning(
s"Internal error: release called on $numBytes bytes but task only has $curMem bytes " +
s"of memory from the $poolName pool")
curMem
} else {
numBytes
}
if (memoryForTask.contains(taskAttemptId)) {
memoryForTask(taskAttemptId) -= memoryToFree
if (memoryForTask(taskAttemptId) <= 0) {
memoryForTask.remove(taskAttemptId)
}
}
lock.notifyAll() // Notify waiters in acquireMemory() that memory has been freed
}

在正式释放之前会先比较一下现在该task所占用的内存和需要释放的内存的大小,如果task所占内存小于需要释放的内存也只会释放task所占内存,不会再释放其他的task。因为有新的内存空间出现,所以可以唤醒等待队列里的线程,开始给新任务争取内存。

1
2
3
4
5
def releaseAllMemoryForTask(taskAttemptId: Long): Long = lock.synchronized {
val numBytesToFree = getMemoryUsageForTask(taskAttemptId)
releaseMemory(numBytesToFree, taskAttemptId)
numBytesToFree
}

该方法会释放一个task所有的内存,直接获取task所占用的内存以后调用上面的releaseMemory方法。

StorageMemoryPool

该类负责Storage区的内存管理,在类中维护了一个_memoryUsed参数,来表示使用了多少内存。并且会关联一个MemoryStore对象,该对象会完成真正的内存管理操作。

重要的acquireMemoryfreeSpaceToShrinkPool函数均在上文中进行了介绍。

TaskMemoryManager

该类负责管理一个task的内存,该类中不会直接操作内存,会通过MemoryManager来进行管理。不过因为底层使用了Tungsten内存模型,该类中还会维护内存模型使用的页机制相关的变量。所有的TaskMemoryManager会共用一个MemoryManager

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
public long acquireExecutionMemory(long required, MemoryConsumer consumer) {
assert(required >= 0);
assert(consumer != null);
MemoryMode mode = consumer.getMode();
// If we are allocating Tungsten pages off-heap and receive a request to allocate on-heap
// memory here, then it may not make sense to spill since that would only end up freeing
// off-heap memory. This is subject to change, though, so it may be risky to make this
// optimization now in case we forget to undo it late when making changes.
synchronized (this) {
long got = memoryManager.acquireExecutionMemory(required, taskAttemptId, mode);

// Try to release memory from other consumers first, then we can reduce the frequency of
// spilling, avoid to have too many spilled files.
if (got < required) {
// Call spill() on other consumers to release memory
// Sort the consumers according their memory usage. So we avoid spilling the same consumer
// which is just spilled in last few times and re-spilling on it will produce many small
// spill files.
TreeMap<Long, List<MemoryConsumer>> sortedConsumers = new TreeMap<>();
for (MemoryConsumer c: consumers) {
if (c != consumer && c.getUsed() > 0 && c.getMode() == mode) {
long key = c.getUsed();
List<MemoryConsumer> list =
sortedConsumers.computeIfAbsent(key, k -> new ArrayList<>(1));
list.add(c);
}
}
while (!sortedConsumers.isEmpty()) {
// Get the consumer using the least memory more than the remaining required memory.
Map.Entry<Long, List<MemoryConsumer>> currentEntry =
sortedConsumers.ceilingEntry(required - got);
// No consumer has used memory more than the remaining required memory.
// Get the consumer of largest used memory.
if (currentEntry == null) {
currentEntry = sortedConsumers.lastEntry();
}
List<MemoryConsumer> cList = currentEntry.getValue();
MemoryConsumer c = cList.get(cList.size() - 1);
try {
long released = c.spill(required - got, consumer);
if (released > 0) {
logger.debug("Task {} released {} from {} for {}", taskAttemptId,
Utils.bytesToString(released), c, consumer);
got += memoryManager.acquireExecutionMemory(required - got, taskAttemptId, mode);
if (got >= required) {
break;
}
} else {
cList.remove(cList.size() - 1);
if (cList.isEmpty()) {
sortedConsumers.remove(currentEntry.getKey());
}
}
} catch (ClosedByInterruptException e) {
// This called by user to kill a task (e.g: speculative task).
logger.error("error while calling spill() on " + c, e);
throw new RuntimeException(e.getMessage());
} catch (IOException e) {
logger.error("error while calling spill() on " + c, e);
// checkstyle.off: RegexpSinglelineJava
throw new SparkOutOfMemoryError("error while calling spill() on " + c + " : "
+ e.getMessage());
// checkstyle.on: RegexpSinglelineJava
}
}
}

// call spill() on itself
if (got < required) {
try {
long released = consumer.spill(required - got, consumer);
if (released > 0) {
logger.debug("Task {} released {} from itself ({})", taskAttemptId,
Utils.bytesToString(released), consumer);
got += memoryManager.acquireExecutionMemory(required - got, taskAttemptId, mode);
}
} catch (ClosedByInterruptException e) {
// This called by user to kill a task (e.g: speculative task).
logger.error("error while calling spill() on " + consumer, e);
throw new RuntimeException(e.getMessage());
} catch (IOException e) {
logger.error("error while calling spill() on " + consumer, e);
// checkstyle.off: RegexpSinglelineJava
throw new SparkOutOfMemoryError("error while calling spill() on " + consumer + " : "
+ e.getMessage());
// checkstyle.on: RegexpSinglelineJava
}
}

consumers.add(consumer);
logger.debug("Task {} acquired {} for {}", taskAttemptId, Utils.bytesToString(got), consumer);
return got;
}
}

该方法是为一个task新的consumer分配内存,一进来会先尝试使用ExecutorPool申请required大小的内存,如果能直接获取到就结束。否则的话需要从consumer中挑选合适的consumer进行spill操作(也就是将内存中的数据冲写到硬盘上)来释放足够多的内存。

挑选的过程也很常规,会选出大于需要的内存的consumer中最小的一个,如果不存在则从大到小依次spill,直到释放的内存达到需求。不过筛选大于需要的内存中最小的一个用了一个很简洁快速的方式,创建了一个memory -> List<MemoryConsumer>的TreeMap,直接使用TreeMap.ceilingEntry方法。每次释放完成以后都再重新申请更多的内存,直到申请到了足够多的内存。

如果在上面的操作执行完成以后(也就是能释放的都释放掉了)还是不够,那么就将这个要加入的新的consumer的部分数据冲写到硬盘上,使他能被放入MemoryPool中。

Allocate page

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public MemoryBlock allocatePage(long size, MemoryConsumer consumer) {
assert(consumer != null);
assert(consumer.getMode() == tungstenMemoryMode);
if (size > MAXIMUM_PAGE_SIZE_BYTES) {
throw new TooLargePageException(size);
}

long acquired = acquireExecutionMemory(size, consumer);
if (acquired <= 0) {
return null;
}

final int pageNumber;
synchronized (this) {
pageNumber = allocatedPages.nextClearBit(0);
if (pageNumber >= PAGE_TABLE_SIZE) {
releaseExecutionMemory(acquired, consumer);
throw new IllegalStateException(
"Have already allocated a maximum of " + PAGE_TABLE_SIZE + " pages");
}
allocatedPages.set(pageNumber);
}
MemoryBlock page = null;
try {
page = memoryManager.tungstenMemoryAllocator().allocate(acquired);
} catch (OutOfMemoryError e) {
logger.warn("Failed to allocate a page ({} bytes), try again.", acquired);
// there is no enough memory actually, it means the actual free memory is smaller than
// MemoryManager thought, we should keep the acquired memory.
synchronized (this) {
acquiredButNotUsed += acquired;
allocatedPages.clear(pageNumber);
}
// this could trigger spilling to free some pages.
return allocatePage(size, consumer);
}
page.pageNumber = pageNumber;
pageTable[pageNumber] = page;
if (logger.isTraceEnabled()) {
logger.trace("Allocate page number {} ({} bytes)", pageNumber, acquired);
}
return page;
}

页管理主要由一个 BitSet(标示页位情况)和MemoryBlock[]()实现,true表示页位被占。该方法会先调用acquireExecutionMemory申请实际的物理内存,然后通过BitSet.nextClearBit()函数获取第一个空位置,并进行占位。完成以后就会通过tungstenMemoryAllocator来真正进行内存申请,下面会分析一下on-heap和off-heap两种不同的内存申请:

::Unsafe memory allocate::

1
2
3
4
5
6
7
8
public MemoryBlock allocate(long size) throws OutOfMemoryError {
long address = Platform.allocateMemory(size);
MemoryBlock memory = new MemoryBlock(null, address, size);
if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) {
memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_CLEAN_VALUE);
}
return memory;
}

Off-heap的所有内存操作都是通过Unsafe工具类来完成,这个方法非常的简单。会先通过Unsafe.allocateMemory申请内存,然后初始化一个页结构,off-heap不会映射对象,所以obj传入null即可。

::Heap memory allocate::

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public MemoryBlock allocate(long size) throws OutOfMemoryError {
int numWords = (int) ((size + 7) / 8);
long alignedSize = numWords * 8L;
assert (alignedSize >= size);
if (shouldPool(alignedSize)) {
synchronized (this) {
final LinkedList<WeakReference<long[]>> pool = bufferPoolsBySize.get(alignedSize);
if (pool != null) {
while (!pool.isEmpty()) {
final WeakReference<long[]> arrayReference = pool.pop();
final long[] array = arrayReference.get();
if (array != null) {
assert (array.length * 8L >= size);
MemoryBlock memory = new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, size);
if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) {
memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_CLEAN_VALUE);
}
return memory;
}
}
bufferPoolsBySize.remove(alignedSize);
}
}
}
long[] array = new long[numWords];
MemoryBlock memory = new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, size);
if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) {
memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_CLEAN_VALUE);
}
return memory;
}

这里多大内存有一个优化机制,类中有一个Map会保存大内存块的引用,减少GC和申请内存的时间。

1
2
@GuardedBy("this")
private final Map<Long, LinkedList<WeakReference<long[]>>> bufferPoolsBySize = new HashMap<>();

触发这个机制的内存大小是1024 * 1024,所以我们能看到在allocate方法中会先判断是否触发该机制,如果触发则从未被回收的大内存块中取出相应的块进行存储,否则会重新申请内存。

Free page

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public void freePage(MemoryBlock page, MemoryConsumer consumer) {
assert (page.pageNumber != MemoryBlock.NO_PAGE_NUMBER) :
"Called freePage() on memory that wasn't allocated with allocatePage()";
assert (page.pageNumber != MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER) :
"Called freePage() on a memory block that has already been freed";
assert (page.pageNumber != MemoryBlock.FREED_IN_TMM_PAGE_NUMBER) :
"Called freePage() on a memory block that has already been freed";
assert(allocatedPages.get(page.pageNumber));
pageTable[page.pageNumber] = null;
synchronized (this) {
allocatedPages.clear(page.pageNumber);
}
if (logger.isTraceEnabled()) {
logger.trace("Freed page number {} ({} bytes)", page.pageNumber, page.size());
}
long pageSize = page.size();
// Clear the page number before passing the block to the MemoryAllocator's free().
// Doing this allows the MemoryAllocator to detect when a TaskMemoryManager-managed
// page has been inappropriately directly freed without calling TMM.freePage().
page.pageNumber = MemoryBlock.FREED_IN_TMM_PAGE_NUMBER;
memoryManager.tungstenMemoryAllocator().free(page);
releaseExecutionMemory(pageSize, consumer);
}

对应于申请页也会有释放页的操作,这个过程比较简单,就是对页相关的数据结构进行更新,做一些清空操作。最后会调用tungstenMemoryAllocator.free进行真正的释放,并且调用底层的Executor区的pool进行释放。下面也会分析一下on-heap和off-heap的不同释放操作。

::Unsafe memory free::

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public void free(MemoryBlock memory) {
assert (memory.obj == null) :
"baseObject not null; are you trying to use the off-heap allocator to free on-heap memory?";
assert (memory.pageNumber != MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER) :
"page has already been freed";
assert ((memory.pageNumber == MemoryBlock.NO_PAGE_NUMBER)
|| (memory.pageNumber == MemoryBlock.FREED_IN_TMM_PAGE_NUMBER)) :
"TMM-allocated pages must be freed via TMM.freePage(), not directly in allocator free()";

if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) {
memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_FREED_VALUE);
}
Platform.freeMemory(memory.offset);
// As an additional layer of defense against use-after-free bugs, we mutate the
// MemoryBlock to reset its pointer.
memory.offset = 0;
// Mark the page as freed (so we can detect double-frees).
memory.pageNumber = MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER;
}

整个过程也很简单,调用Unsafe.freeMemory进行内存释放,将页对象设置为一个清空后的状态。

::Heap memory free::

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public void free(MemoryBlock memory) {
assert (memory.obj != null) :
"baseObject was null; are you trying to use the on-heap allocator to free off-heap memory?";
assert (memory.pageNumber != MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER) :
"page has already been freed";
assert ((memory.pageNumber == MemoryBlock.NO_PAGE_NUMBER)
|| (memory.pageNumber == MemoryBlock.FREED_IN_TMM_PAGE_NUMBER)) :
"TMM-allocated pages must first be freed via TMM.freePage(), not directly in allocator " +
"free()";

final long size = memory.size();
if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) {
memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_FREED_VALUE);
}

// Mark the page as freed (so we can detect double-frees).
memory.pageNumber = MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER;

// As an additional layer of defense against use-after-free bugs, we mutate the
// MemoryBlock to null out its reference to the long[] array.
long[] array = (long[]) memory.obj;
memory.setObjAndOffset(null, 0);

long alignedSize = ((size + 7) / 8) * 8;
if (shouldPool(alignedSize)) {
synchronized (this) {
LinkedList<WeakReference<long[]>> pool = bufferPoolsBySize.get(alignedSize);
if (pool == null) {
pool = new LinkedList<>();
bufferPoolsBySize.put(alignedSize, pool);
}
pool.add(new WeakReference<>(array));
}
} else {
// Do nothing
}
}

将内存区域置为空,如果是一个大内存块的话就保留弱引用,以供下次需要的时候直接进行使用。为了加大命中概率可以看到在计算占用内存的时候都会找到比当前内存大的最近的一个8的倍数,保证了从弱引用区域中找到的一定是足够能装的下数据中最小的一块。

参考

spark 源码分析之十五 — Spark内存管理剖析 - JohnnyBai - 博客园
GitHub - hustnn/TungstenSecret: Explore the project Tungsten
Java 6 thread states and life cycle UML protocol state machine diagram example.

Comments

⬆︎TOP