「@JIKE」 Data engineer/Code player
post @ 2020-09-10

本文主要是对Spark的内存管理模块进行了代码走读,从业务逻辑上Spark将内存划分为执行区(Execution区,内存主要用来进行shuffle,join,sort,aggregate的计算)、存储区(Storage区,内存主要用来进行缓存和data transfer)。为了优化JVM内存系统的一些问题,在堆内存和堆外内存的基础上抽象了Tungsten内存系统。文中对涉及到的内的关键方法进行了分析。

代码清单

  • org.apache.spark.memory.MemoryManager
  • org.apache.spark.memory.UnifiedMemoryManager
  • org.apache.spark.memory.MemoryPool
  • org.apache.spark.memory.ExecutionMemoryPool
  • org.apache.spark.memory.StorageMemoryPool
  • org.apache.spark.memory.MemoryConsumer
  • org.apache.spark.memory.TaskMemoryManager
  • org.apache.spark.unsafe.memory.MemoryLocation
  • org.apache.spark.unsafe.memory.MemoryBlock
  • org.apache.spark.unsafe.memory.MemoryAllocator
  • org.apache.spark.unsafe.memory.HeapMemoryAllocator
  • org.apache.spark.unsafe.memory.UnsafeMemoryAllocator
  • org.apache.spark.storage.memory.MemoryStore

总览

a18-1
全局只有唯一一个MemoryManager,里面维护了4个Pool。从业务上分为Execution和Storage,从存储位置分为OnHeap和OffHeap。每个task需要使用多个数据结构,每个数据结构都是一个MemoryConsumer的实现,每个task的这些consumer都通过TaskMemoryManager进行管理,多个TaskMemoryManager共同维护一个Tungsten的页结构。

Tungsten

为了解决JVM对象存储时的overhead问题,以及GC造成的性能损耗,而提出了一个新的内存模型。提供一套像C/C++一样可以直接操作内存的接口(实际操作的是堆外内存),再为了通用性提供了更高层的接口将堆内存和堆外内存进行了统一。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class MemoryLocation {

@Nullable
Object obj;

long offset;

public MemoryLocation(@Nullable Object obj, long offset) {
this.obj = obj;
this.offset = offset;
}

public MemoryLocation() {
this(null, 0);
}

public void setObjAndOffset(Object newObj, long newOffset) {
this.obj = newObj;
this.offset = newOffset;
}

public final Object getBaseObject() {
return obj;
}

public final long getBaseOffset() {
return offset;
}
}
1
2
3
4
5
public class MemoryBlock extends MemoryLocation {
private final long length;
public int pageNumber = NO_PAGE_NUMBER;
...
}

Tungsten提供了一套类似操作系统页内存管理一样的结构,每页会存储一个MemoryBlock结构。
length是整个Block实际占用的内存大小,pageNumber是在页数组中的index位置。MemoryLocation统一了堆内外内存的寻址,如果是off-heap,则obj为null,offset为绝对内存地址;如果是on-heap,则obj为对象的基地址,offset为偏移量。所以在实际使用过程当中就需要在物理地址与pageNumber,offsetInPage之间进行转换:

  • on-heap: address = page.obj + page.offset + inPageOffset
  • off-heap: address = page.offset + inPageOffset

但是在这套结构中物理地址不会直接的存储,pageNumer + offsetInPage的组合就能唯一的定位一个值的位置,所以提供了一个编码方法用64位的long值存储这个坐标,前13位是pageNumber,后51位是inPageOffset。在TaskMemoryManager当中提供了多个转换的方法:

  • long encodePageNumberAndOffset(MemoryBlock page, long offsetInPage):给定页和页内偏移量计算encode值
  • long encodePageNumberAndOffset(int pageNumer, long offsetInPage):给定页号和页内偏移量计算encode值
  • int decodePageNumber(long pagePlusOffsetAddress):给定encode值,解码pageNumber
  • long decodeOffset(long pagePlusOffsetAddress):给定encode值,解码offset
  • Object getPage(long pagePlusOffsetAddress):给定encode值,获取页
  • long getOffsetInPage(long pagePlusOffsetAddress):给定encode值,获取页内偏移

Memory

MemoryManager

该类是内存管理的统筹类,定义了所有的内存管理动作。因为是一个抽象类,所以这些动作有的会下放给实现类实现,有些动作会委托MemoryPool类实现。下面是接口的分类:

  • 获取内存大小:

    • abstract maxOnHeapStorageMemory:获取Storage区最大能使用的堆内存大小(动态变化的)
    • abstract maxOffHeapStorageMemory:获取Storage区最大能使用的堆外内存大小(动态变化的)
    • storageMemoryUsed: Storage区已使用的内存大小
    • onHeapStorageMemoryUsed:Storage区已使用的堆内存大小
    • offHeapStorageMemoryUsed:Storage区已使用的堆外内存大小
    • executionMemoryUsed: Execution区已使用的内存大小
    • onHeapExecutionMemoryUsed:Execution区已使用的堆内存大小
    • offHeapExecutionMemoryUsed:Execution区已使用的堆外内存大小
    • getExecutionMemoryUsageForTask:获取一个task在Execution区占用的内存大小
  • 获取更多的内存空间:

    • abstract acquireStorageMemory(blockId: BlockId, numBytes: Long, memoryMode: MemoryMode):为一个Block获取numBytes的Storage区内存空间,如果获取不到足够的空间可能会删除一个存在的Block。
    • abstract acquireExecutionMemory(numBytes: Long, taskAttemptId: Long, memoryMode: MemoryMode):为一个task获取numBytes的Execution区内存空间,当不能获取到足够执行的内存空间时,这个方法会阻塞,直到获取到足够多的内存。
  • 释放内存空间:

    • releaseAllExecutionMemoryForTask(taskAttemptId: Long):释放一个task占用的所有Execution区内存空间
    • releaseStorageMemory(numBytes: Long, memoryMode: MemoryMode):释放numBytes的Storage区内存空间
    • releaseAllStorageMemory():释放所有的Storage区内存空间
    • releaseUnrollMemory(numBytes: Long, memoryMode: MemoryMode):释放numBytes的用于unroll block的内存空间
  • Tungsten相关

UnifiedMemoryManager

1
2
3
4
5
private[spark] class UnifiedMemoryManager(
conf: SparkConf,
val maxHeapMemory: Long,
onHeapStorageRegionSize: Long,
numCores: Int)

构造函数中的:

  • maxHeapMemory:是堆内存的总大小
  • onHeapStorageRegionSize:是堆内存中Storage区的起始大小
1
2
3
4
5
6
7
override def maxOnHeapStorageMemory: Long = synchronized {
maxHeapMemory - onHeapExecutionMemoryPool.memoryUsed
}

override def maxOffHeapStorageMemory: Long = synchronized {
maxOffHeapMemory - offHeapExecutionMemoryPool.memoryUsed
}

这两个方法就是简单的计算,不过maxHeapMemory是创建UnifiedMemoryManager时传入的参数,而maxOffHeapMemory是从spark.memory.offHeap.size参数中读入。

::acquireExecutionMemory::
acquireExecutionMemory中主要的任务就是要给出MemoryPool.acquireMemory()中的两个回调,一个是获取更多的Execution区内存的回调,一个是获取Execution区最多能获取到的内存大小。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
def maybeGrowExecutionPool(extraMemoryNeeded: Long): Unit = {
if (extraMemoryNeeded > 0) {
// There is not enough free memory in the execution pool, so try to reclaim memory from
// storage. We can reclaim any free memory from the storage pool. If the storage pool
// has grown to become larger than `storageRegionSize`, we can evict blocks and reclaim
// the memory that storage has borrowed from execution.
val memoryReclaimableFromStorage = math.max(
storagePool.memoryFree,
storagePool.poolSize - storageRegionSize)
if (memoryReclaimableFromStorage > 0) {
// Only reclaim as much space as is necessary and available:
val spaceToReclaim = storagePool.freeSpaceToShrinkPool(
math.min(extraMemoryNeeded, memoryReclaimableFromStorage))
storagePool.decrementPoolSize(spaceToReclaim)
executionPool.incrementPoolSize(spaceToReclaim)
}
}
}

👆获取更多的Execution区内存的回调,这里最重要的是计算可以归还内存大小的逻辑,在memoryFree(空闲的内存大小)和poolSize-storageRegionSize(向Executions区借的内存大小)中取一个更大的值。然后真正归还的内存大小是在memoryReclaimableFromStorage(可以归还的内存大小)和extraMemoryNeeded(Executions区需要扩大的内存大小)之间取一个更小的值。

计算完成以后需要真正的进行内存操作释放需要的内存,该方法在StorageMemoryPool中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
def freeSpaceToShrinkPool(spaceToFree: Long): Long = lock.synchronized {
val spaceFreedByReleasingUnusedMemory = math.min(spaceToFree, memoryFree)
val remainingSpaceToFree = spaceToFree - spaceFreedByReleasingUnusedMemory
if (remainingSpaceToFree > 0) {
// If reclaiming free memory did not adequately shrink the pool, begin evicting blocks:
val spaceFreedByEviction =
memoryStore.evictBlocksToFreeSpace(None, remainingSpaceToFree, memoryMode)
// When a block is released, BlockManager.dropFromMemory() calls releaseMemory(), so we do
// not need to decrement _memoryUsed here. However, we do need to decrement the pool size.
spaceFreedByReleasingUnusedMemory + spaceFreedByEviction
} else {
spaceFreedByReleasingUnusedMemory
}
}
`

先计算空闲空间的大小,如果空闲空间大于等于需要释放的空间大小,则不需要进行内存对象操作。否则的话,需要删除一些内存Block。删除的方法在MemoryStore中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
private[spark] def evictBlocksToFreeSpace(
blockId: Option[BlockId],
space: Long,
memoryMode: MemoryMode): Long = {
assert(space > 0)
memoryManager.synchronized {
var freedMemory = 0L
val rddToAdd = blockId.flatMap(getRddId)
val selectedBlocks = new ArrayBuffer[BlockId]
def blockIsEvictable(blockId: BlockId, entry: MemoryEntry[_]): Boolean = {
entry.memoryMode == memoryMode && (rddToAdd.isEmpty || rddToAdd != getRddId(blockId))
}
// This is synchronized to ensure that the set of entries is not changed
// (because of getValue or getBytes) while traversing the iterator, as that
// can lead to exceptions.
entries.synchronized {
val iterator = entries.entrySet().iterator()
while (freedMemory < space && iterator.hasNext) {
val pair = iterator.next()
val blockId = pair.getKey
val entry = pair.getValue
if (blockIsEvictable(blockId, entry)) {
// We don't want to evict blocks which are currently being read, so we need to obtain
// an exclusive write lock on blocks which are candidates for eviction. We perform a
// non-blocking "tryLock" here in order to ignore blocks which are locked for reading:
if (blockInfoManager.lockForWriting(blockId, blocking = false).isDefined) {
selectedBlocks += blockId
freedMemory += pair.getValue.size
}
}
}
}

def dropBlock[T](blockId: BlockId, entry: MemoryEntry[T]): Unit = {
val data = entry match {
case DeserializedMemoryEntry(values, _, _) => Left(values)
case SerializedMemoryEntry(buffer, _, _) => Right(buffer)
}
val newEffectiveStorageLevel =
blockEvictionHandler.dropFromMemory(blockId, () => data)(entry.classTag)
if (newEffectiveStorageLevel.isValid) {
// The block is still present in at least one store, so release the lock
// but don't delete the block info
blockInfoManager.unlock(blockId)
} else {
// The block isn't present in any store, so delete the block info so that the
// block can be stored again
blockInfoManager.removeBlock(blockId)
}
}

if (freedMemory >= space) {
var lastSuccessfulBlock = -1
try {
logInfo(s"${selectedBlocks.size} blocks selected for dropping " +
s"(${Utils.bytesToString(freedMemory)} bytes)")
(0 until selectedBlocks.size).foreach { idx =>
val blockId = selectedBlocks(idx)
val entry = entries.synchronized {
entries.get(blockId)
}
// This should never be null as only one task should be dropping
// blocks and removing entries. However the check is still here for
// future safety.
if (entry != null) {
dropBlock(blockId, entry)
afterDropAction(blockId)
}
lastSuccessfulBlock = idx
}
logInfo(s"After dropping ${selectedBlocks.size} blocks, " +
s"free memory is ${Utils.bytesToString(maxMemory - blocksMemoryUsed)}")
freedMemory
} finally {
// like BlockManager.doPut, we use a finally rather than a catch to avoid having to deal
// with InterruptedException
if (lastSuccessfulBlock != selectedBlocks.size - 1) {
// the blocks we didn't process successfully are still locked, so we have to unlock them
(lastSuccessfulBlock + 1 until selectedBlocks.size).foreach { idx =>
val blockId = selectedBlocks(idx)
blockInfoManager.unlock(blockId)
}
}
}
} else {
blockId.foreach { id =>
logInfo(s"Will not store $id")
}
selectedBlocks.foreach { id =>
blockInfoManager.unlock(id)
}
0L
}
}
}

该类当中有一个存储所有Block的Map,即:

1
private val entries = new LinkedHashMap[BlockId, MemoryEntry[_]](32, 0.75f, true)

LinkedHashMap不是线程安全的,所以每次操作之前也是需要加锁。如果没有获取到需要释放的内存空间大小则遍历Block,判断遍历的Block与需要存储的Block是否是同一个存储区域(还判断了遍历的Block与需要存储的Block是否是同一个),如果通过了判断则需要先将该Block锁住,加入候选名单。

找够所有的候选者以后还没有达到需要释放的内存空间大小则将所有锁住的Block解锁,返回0,表示这个操作失败。如果达到了,则开始释放内存的过程。将每一个Block执行dropBlockafterDropAction的操作。在dropBlock中会删除该Block本身的数据(除非Block还在被操作),检查Block是否还在被其他的storage存储,如果是的话就先不删除其metadata,否则的话继续删除metadata。afterDropAction是个hook,可以由调用方指定删除之后的动作。如果在删除过程当中失败的话,需要将没有删除的Block解锁。

1
2
3
def computeMaxExecutionPoolSize(): Long = {
maxMemory - math.min(storagePool.memoryUsed, storageRegionSize)
}

👆获取Execution区最多能获取到的内存大小,是通过最大的内存大小减去Storage区最大能占用的内存大小。Storage区能占用的上限是storageRegionSize

下面来看真正进行内存分配的函数acquireMemory,该方法在ExecutionMemoryPool中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
private[memory] def acquireMemory(
numBytes: Long,
taskAttemptId: Long,
maybeGrowPool: Long => Unit = (additionalSpaceNeeded: Long) => (),
computeMaxPoolSize: () => Long = () => poolSize): Long = lock.synchronized {
assert(numBytes > 0, s"invalid number of bytes requested: $numBytes")

// TODO: clean up this clunky method signature

// Add this task to the taskMemory map just so we can keep an accurate count of the number
// of active tasks, to let other tasks ramp down their memory in calls to `acquireMemory`
if (!memoryForTask.contains(taskAttemptId)) {
memoryForTask(taskAttemptId) = 0L
// This will later cause waiting tasks to wake up and check numTasks again
lock.notifyAll()
}

// Keep looping until we're either sure that we don't want to grant this request (because this
// task would have more than 1 / numActiveTasks of the memory) or we have enough free
// memory to give it (we always let each task get at least 1 / (2 * numActiveTasks)).
// TODO: simplify this to limit each task to its own slot
while (true) {
val numActiveTasks = memoryForTask.keys.size
val curMem = memoryForTask(taskAttemptId)

// In every iteration of this loop, we should first try to reclaim any borrowed execution
// space from storage. This is necessary because of the potential race condition where new
// storage blocks may steal the free execution memory that this task was waiting for.
maybeGrowPool(numBytes - memoryFree)

// Maximum size the pool would have after potentially growing the pool.
// This is used to compute the upper bound of how much memory each task can occupy. This
// must take into account potential free memory as well as the amount this pool currently
// occupies. Otherwise, we may run into SPARK-12155 where, in unified memory management,
// we did not take into account space that could have been freed by evicting cached blocks.
val maxPoolSize = computeMaxPoolSize()
val maxMemoryPerTask = maxPoolSize / numActiveTasks
val minMemoryPerTask = poolSize / (2 * numActiveTasks)

// How much we can grant this task; keep its share within 0 <= X <= 1 / numActiveTasks
val maxToGrant = math.min(numBytes, math.max(0, maxMemoryPerTask - curMem))
// Only give it as much memory as is free, which might be none if it reached 1 / numTasks
val toGrant = math.min(maxToGrant, memoryFree)

// We want to let each task get at least 1 / (2 * numActiveTasks) before blocking;
// if we can't give it this much now, wait for other tasks to free up memory
// (this happens if older tasks allocated lots of memory before N grew)
if (toGrant < numBytes && curMem + toGrant < minMemoryPerTask) {
logInfo(s"TID $taskAttemptId waiting for at least 1/2N of $poolName pool to be free")
lock.wait()
} else {
memoryForTask(taskAttemptId) += toGrant
return toGrant
}
}
0L // Never reached
}

首先关注一下锁的对象,在调用方MemoryManager初始化的时候有声明锁的对象:

1
2
3
4
5
6
7
8
@GuardedBy("this")
protected val onHeapStorageMemoryPool = new StorageMemoryPool(this, MemoryMode.ON_HEAP)
@GuardedBy("this")
protected val offHeapStorageMemoryPool = new StorageMemoryPool(this, MemoryMode.OFF_HEAP)
@GuardedBy("this")
protected val onHeapExecutionMemoryPool = new ExecutionMemoryPool(this, MemoryMode.ON_HEAP)
@GuardedBy("this")
protected val offHeapExecutionMemoryPool = new ExecutionMemoryPool(this, MemoryMode.OFF_HEAP)
1
2
3
4
private[memory] class ExecutionMemoryPool(
lock: Object,
memoryMode: MemoryMode
)

通过上面两个代码片段可以看出,多个Pool的锁对象都是MemoryManger,所以多个Pool之间是互斥的,不论是StorageMemoryPool还是ExecutionMemoryPool

然后整个函数的工作方式:

  • 如果是一个新的task,先帮它加入到memoryForTask中,内存设为0,然后唤醒所有等待队列里的线程开始等锁。memoryForTask是一个保存taskId -> memory的map。
  • 进入一个死循环中,先查看是否需要获取更多的内存,如果需要的话则调用maybeGrowPool回调。计算一个task理论能分配到的最大内存和最小内存,即1/2N * maxPoolSize <= cache <= 1/N * maxPoolSize。接着计算实际最大能分配到的内存以及最终实际分配的内存。
  • 如果实际分配到的内存小于需要的内存或者这个任务分配到的总内存都没有达到理论最小内存的话,则将锁还掉以后继续等锁。如果拿到了需要的内存以后就更新memoryForTask并进行返回。

::acquireStorageMemory::

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
override def acquireStorageMemory(
blockId: BlockId,
numBytes: Long,
memoryMode: MemoryMode): Boolean = synchronized {
assertInvariants()
assert(numBytes >= 0)
val (executionPool, storagePool, maxMemory) = memoryMode match {
case MemoryMode.ON_HEAP => (
onHeapExecutionMemoryPool,
onHeapStorageMemoryPool,
maxOnHeapStorageMemory)
case MemoryMode.OFF_HEAP => (
offHeapExecutionMemoryPool,
offHeapStorageMemoryPool,
maxOffHeapStorageMemory)
}
if (numBytes > maxMemory) {
// Fail fast if the block simply won't fit
logInfo(s"Will not store $blockId as the required space ($numBytes bytes) exceeds our " +
s"memory limit ($maxMemory bytes)")
return false
}
if (numBytes > storagePool.memoryFree) {
// There is not enough free memory in the storage pool, so try to borrow free memory from
// the execution pool.
val memoryBorrowedFromExecution = Math.min(executionPool.memoryFree,
numBytes - storagePool.memoryFree)
executionPool.decrementPoolSize(memoryBorrowedFromExecution)
storagePool.incrementPoolSize(memoryBorrowedFromExecution)
}
storagePool.acquireMemory(blockId, numBytes)
}

如果需要申请的内存大于最大内存则返回false,申请的内存大于Storage区的剩余内存,则需要从Execution区借内存。Storage区不能将正在运行的task踢出Execution区,所以只能从中获取空闲的空间大小。数值计算完成以后,开始真正的分配。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
def acquireMemory(
blockId: BlockId,
numBytesToAcquire: Long,
numBytesToFree: Long): Boolean = lock.synchronized {
assert(numBytesToAcquire >= 0)
assert(numBytesToFree >= 0)
assert(memoryUsed <= poolSize)
if (numBytesToFree > 0) {
memoryStore.evictBlocksToFreeSpace(Some(blockId), numBytesToFree, memoryMode)
}
// NOTE: If the memory store evicts blocks, then those evictions will synchronously call
// back into this StorageMemoryPool in order to free memory. Therefore, these variables
// should have been updated.
val enoughMemory = numBytesToAcquire <= memoryFree
if (enoughMemory) {
_memoryUsed += numBytesToAcquire
}
enoughMemory
}

先从Storage中删除一些Block释放一些内存,如果有足够的内存申请就更新已使用的内存计数器,否则直接返回false。

MemoryPool

这是一个抽象类,整个类都在维护一个变量_poolSize,表示内存使用量。提供了维护这个量的一些方法,如:

  • poolSize: Long:获取_poolSize
  • memoryFree: Long:获取空闲的内存空间大小
  • incrementPoolSize(delta: Long):提高_poolSize
  • decrementPoolSize(delta: Long):降低_poolSize

以及一个抽象方法:

  • memoryUsed: Long

ExecutionMemoryPool

该类中维护了一个taskId -> memory的Map:memoryForTask来管理内存。

1
2
3
override def memoryUsed: Long = lock.synchronized {
memoryForTask.values.sum
}

实现父类的抽象方法,直接将memoryForTask中的values累加。

acquireMemory已经在上文中分析过了。releaseMemoryMemoryManager.releaseExecutionMemory中被调用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
def releaseMemory(numBytes: Long, taskAttemptId: Long): Unit = lock.synchronized {
val curMem = memoryForTask.getOrElse(taskAttemptId, 0L)
val memoryToFree = if (curMem < numBytes) {
logWarning(
s"Internal error: release called on $numBytes bytes but task only has $curMem bytes " +
s"of memory from the $poolName pool")
curMem
} else {
numBytes
}
if (memoryForTask.contains(taskAttemptId)) {
memoryForTask(taskAttemptId) -= memoryToFree
if (memoryForTask(taskAttemptId) <= 0) {
memoryForTask.remove(taskAttemptId)
}
}
lock.notifyAll() // Notify waiters in acquireMemory() that memory has been freed
}

在正式释放之前会先比较一下现在该task所占用的内存和需要释放的内存的大小,如果task所占内存小于需要释放的内存也只会释放task所占内存,不会再释放其他的task。因为有新的内存空间出现,所以可以唤醒等待队列里的线程,开始给新任务争取内存。

1
2
3
4
5
def releaseAllMemoryForTask(taskAttemptId: Long): Long = lock.synchronized {
val numBytesToFree = getMemoryUsageForTask(taskAttemptId)
releaseMemory(numBytesToFree, taskAttemptId)
numBytesToFree
}

该方法会释放一个task所有的内存,直接获取task所占用的内存以后调用上面的releaseMemory方法。

StorageMemoryPool

该类负责Storage区的内存管理,在类中维护了一个_memoryUsed参数,来表示使用了多少内存。并且会关联一个MemoryStore对象,该对象会完成真正的内存管理操作。

重要的acquireMemoryfreeSpaceToShrinkPool函数均在上文中进行了介绍。

TaskMemoryManager

该类负责管理一个task的内存,该类中不会直接操作内存,会通过MemoryManager来进行管理。不过因为底层使用了Tungsten内存模型,该类中还会维护内存模型使用的页机制相关的变量。所有的TaskMemoryManager会共用一个MemoryManager

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
public long acquireExecutionMemory(long required, MemoryConsumer consumer) {
assert(required >= 0);
assert(consumer != null);
MemoryMode mode = consumer.getMode();
// If we are allocating Tungsten pages off-heap and receive a request to allocate on-heap
// memory here, then it may not make sense to spill since that would only end up freeing
// off-heap memory. This is subject to change, though, so it may be risky to make this
// optimization now in case we forget to undo it late when making changes.
synchronized (this) {
long got = memoryManager.acquireExecutionMemory(required, taskAttemptId, mode);

// Try to release memory from other consumers first, then we can reduce the frequency of
// spilling, avoid to have too many spilled files.
if (got < required) {
// Call spill() on other consumers to release memory
// Sort the consumers according their memory usage. So we avoid spilling the same consumer
// which is just spilled in last few times and re-spilling on it will produce many small
// spill files.
TreeMap<Long, List<MemoryConsumer>> sortedConsumers = new TreeMap<>();
for (MemoryConsumer c: consumers) {
if (c != consumer && c.getUsed() > 0 && c.getMode() == mode) {
long key = c.getUsed();
List<MemoryConsumer> list =
sortedConsumers.computeIfAbsent(key, k -> new ArrayList<>(1));
list.add(c);
}
}
while (!sortedConsumers.isEmpty()) {
// Get the consumer using the least memory more than the remaining required memory.
Map.Entry<Long, List<MemoryConsumer>> currentEntry =
sortedConsumers.ceilingEntry(required - got);
// No consumer has used memory more than the remaining required memory.
// Get the consumer of largest used memory.
if (currentEntry == null) {
currentEntry = sortedConsumers.lastEntry();
}
List<MemoryConsumer> cList = currentEntry.getValue();
MemoryConsumer c = cList.get(cList.size() - 1);
try {
long released = c.spill(required - got, consumer);
if (released > 0) {
logger.debug("Task {} released {} from {} for {}", taskAttemptId,
Utils.bytesToString(released), c, consumer);
got += memoryManager.acquireExecutionMemory(required - got, taskAttemptId, mode);
if (got >= required) {
break;
}
} else {
cList.remove(cList.size() - 1);
if (cList.isEmpty()) {
sortedConsumers.remove(currentEntry.getKey());
}
}
} catch (ClosedByInterruptException e) {
// This called by user to kill a task (e.g: speculative task).
logger.error("error while calling spill() on " + c, e);
throw new RuntimeException(e.getMessage());
} catch (IOException e) {
logger.error("error while calling spill() on " + c, e);
// checkstyle.off: RegexpSinglelineJava
throw new SparkOutOfMemoryError("error while calling spill() on " + c + " : "
+ e.getMessage());
// checkstyle.on: RegexpSinglelineJava
}
}
}

// call spill() on itself
if (got < required) {
try {
long released = consumer.spill(required - got, consumer);
if (released > 0) {
logger.debug("Task {} released {} from itself ({})", taskAttemptId,
Utils.bytesToString(released), consumer);
got += memoryManager.acquireExecutionMemory(required - got, taskAttemptId, mode);
}
} catch (ClosedByInterruptException e) {
// This called by user to kill a task (e.g: speculative task).
logger.error("error while calling spill() on " + consumer, e);
throw new RuntimeException(e.getMessage());
} catch (IOException e) {
logger.error("error while calling spill() on " + consumer, e);
// checkstyle.off: RegexpSinglelineJava
throw new SparkOutOfMemoryError("error while calling spill() on " + consumer + " : "
+ e.getMessage());
// checkstyle.on: RegexpSinglelineJava
}
}

consumers.add(consumer);
logger.debug("Task {} acquired {} for {}", taskAttemptId, Utils.bytesToString(got), consumer);
return got;
}
}

该方法是为一个task新的consumer分配内存,一进来会先尝试使用ExecutorPool申请required大小的内存,如果能直接获取到就结束。否则的话需要从consumer中挑选合适的consumer进行spill操作(也就是将内存中的数据冲写到硬盘上)来释放足够多的内存。

挑选的过程也很常规,会选出大于需要的内存的consumer中最小的一个,如果不存在则从大到小依次spill,直到释放的内存达到需求。不过筛选大于需要的内存中最小的一个用了一个很简洁快速的方式,创建了一个memory -> List<MemoryConsumer>的TreeMap,直接使用TreeMap.ceilingEntry方法。每次释放完成以后都再重新申请更多的内存,直到申请到了足够多的内存。

如果在上面的操作执行完成以后(也就是能释放的都释放掉了)还是不够,那么就将这个要加入的新的consumer的部分数据冲写到硬盘上,使他能被放入MemoryPool中。

Allocate page

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public MemoryBlock allocatePage(long size, MemoryConsumer consumer) {
assert(consumer != null);
assert(consumer.getMode() == tungstenMemoryMode);
if (size > MAXIMUM_PAGE_SIZE_BYTES) {
throw new TooLargePageException(size);
}

long acquired = acquireExecutionMemory(size, consumer);
if (acquired <= 0) {
return null;
}

final int pageNumber;
synchronized (this) {
pageNumber = allocatedPages.nextClearBit(0);
if (pageNumber >= PAGE_TABLE_SIZE) {
releaseExecutionMemory(acquired, consumer);
throw new IllegalStateException(
"Have already allocated a maximum of " + PAGE_TABLE_SIZE + " pages");
}
allocatedPages.set(pageNumber);
}
MemoryBlock page = null;
try {
page = memoryManager.tungstenMemoryAllocator().allocate(acquired);
} catch (OutOfMemoryError e) {
logger.warn("Failed to allocate a page ({} bytes), try again.", acquired);
// there is no enough memory actually, it means the actual free memory is smaller than
// MemoryManager thought, we should keep the acquired memory.
synchronized (this) {
acquiredButNotUsed += acquired;
allocatedPages.clear(pageNumber);
}
// this could trigger spilling to free some pages.
return allocatePage(size, consumer);
}
page.pageNumber = pageNumber;
pageTable[pageNumber] = page;
if (logger.isTraceEnabled()) {
logger.trace("Allocate page number {} ({} bytes)", pageNumber, acquired);
}
return page;
}

页管理主要由一个 BitSet(标示页位情况)和MemoryBlock[]()实现,true表示页位被占。该方法会先调用acquireExecutionMemory申请实际的物理内存,然后通过BitSet.nextClearBit()函数获取第一个空位置,并进行占位。完成以后就会通过tungstenMemoryAllocator来真正进行内存申请,下面会分析一下on-heap和off-heap两种不同的内存申请:

::Unsafe memory allocate::

1
2
3
4
5
6
7
8
public MemoryBlock allocate(long size) throws OutOfMemoryError {
long address = Platform.allocateMemory(size);
MemoryBlock memory = new MemoryBlock(null, address, size);
if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) {
memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_CLEAN_VALUE);
}
return memory;
}

Off-heap的所有内存操作都是通过Unsafe工具类来完成,这个方法非常的简单。会先通过Unsafe.allocateMemory申请内存,然后初始化一个页结构,off-heap不会映射对象,所以obj传入null即可。

::Heap memory allocate::

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public MemoryBlock allocate(long size) throws OutOfMemoryError {
int numWords = (int) ((size + 7) / 8);
long alignedSize = numWords * 8L;
assert (alignedSize >= size);
if (shouldPool(alignedSize)) {
synchronized (this) {
final LinkedList<WeakReference<long[]>> pool = bufferPoolsBySize.get(alignedSize);
if (pool != null) {
while (!pool.isEmpty()) {
final WeakReference<long[]> arrayReference = pool.pop();
final long[] array = arrayReference.get();
if (array != null) {
assert (array.length * 8L >= size);
MemoryBlock memory = new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, size);
if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) {
memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_CLEAN_VALUE);
}
return memory;
}
}
bufferPoolsBySize.remove(alignedSize);
}
}
}
long[] array = new long[numWords];
MemoryBlock memory = new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, size);
if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) {
memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_CLEAN_VALUE);
}
return memory;
}

这里多大内存有一个优化机制,类中有一个Map会保存大内存块的引用,减少GC和申请内存的时间。

1
2
@GuardedBy("this")
private final Map<Long, LinkedList<WeakReference<long[]>>> bufferPoolsBySize = new HashMap<>();

触发这个机制的内存大小是1024 * 1024,所以我们能看到在allocate方法中会先判断是否触发该机制,如果触发则从未被回收的大内存块中取出相应的块进行存储,否则会重新申请内存。

Free page

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public void freePage(MemoryBlock page, MemoryConsumer consumer) {
assert (page.pageNumber != MemoryBlock.NO_PAGE_NUMBER) :
"Called freePage() on memory that wasn't allocated with allocatePage()";
assert (page.pageNumber != MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER) :
"Called freePage() on a memory block that has already been freed";
assert (page.pageNumber != MemoryBlock.FREED_IN_TMM_PAGE_NUMBER) :
"Called freePage() on a memory block that has already been freed";
assert(allocatedPages.get(page.pageNumber));
pageTable[page.pageNumber] = null;
synchronized (this) {
allocatedPages.clear(page.pageNumber);
}
if (logger.isTraceEnabled()) {
logger.trace("Freed page number {} ({} bytes)", page.pageNumber, page.size());
}
long pageSize = page.size();
// Clear the page number before passing the block to the MemoryAllocator's free().
// Doing this allows the MemoryAllocator to detect when a TaskMemoryManager-managed
// page has been inappropriately directly freed without calling TMM.freePage().
page.pageNumber = MemoryBlock.FREED_IN_TMM_PAGE_NUMBER;
memoryManager.tungstenMemoryAllocator().free(page);
releaseExecutionMemory(pageSize, consumer);
}

对应于申请页也会有释放页的操作,这个过程比较简单,就是对页相关的数据结构进行更新,做一些清空操作。最后会调用tungstenMemoryAllocator.free进行真正的释放,并且调用底层的Executor区的pool进行释放。下面也会分析一下on-heap和off-heap的不同释放操作。

::Unsafe memory free::

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public void free(MemoryBlock memory) {
assert (memory.obj == null) :
"baseObject not null; are you trying to use the off-heap allocator to free on-heap memory?";
assert (memory.pageNumber != MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER) :
"page has already been freed";
assert ((memory.pageNumber == MemoryBlock.NO_PAGE_NUMBER)
|| (memory.pageNumber == MemoryBlock.FREED_IN_TMM_PAGE_NUMBER)) :
"TMM-allocated pages must be freed via TMM.freePage(), not directly in allocator free()";

if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) {
memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_FREED_VALUE);
}
Platform.freeMemory(memory.offset);
// As an additional layer of defense against use-after-free bugs, we mutate the
// MemoryBlock to reset its pointer.
memory.offset = 0;
// Mark the page as freed (so we can detect double-frees).
memory.pageNumber = MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER;
}

整个过程也很简单,调用Unsafe.freeMemory进行内存释放,将页对象设置为一个清空后的状态。

::Heap memory free::

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public void free(MemoryBlock memory) {
assert (memory.obj != null) :
"baseObject was null; are you trying to use the on-heap allocator to free off-heap memory?";
assert (memory.pageNumber != MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER) :
"page has already been freed";
assert ((memory.pageNumber == MemoryBlock.NO_PAGE_NUMBER)
|| (memory.pageNumber == MemoryBlock.FREED_IN_TMM_PAGE_NUMBER)) :
"TMM-allocated pages must first be freed via TMM.freePage(), not directly in allocator " +
"free()";

final long size = memory.size();
if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) {
memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_FREED_VALUE);
}

// Mark the page as freed (so we can detect double-frees).
memory.pageNumber = MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER;

// As an additional layer of defense against use-after-free bugs, we mutate the
// MemoryBlock to null out its reference to the long[] array.
long[] array = (long[]) memory.obj;
memory.setObjAndOffset(null, 0);

long alignedSize = ((size + 7) / 8) * 8;
if (shouldPool(alignedSize)) {
synchronized (this) {
LinkedList<WeakReference<long[]>> pool = bufferPoolsBySize.get(alignedSize);
if (pool == null) {
pool = new LinkedList<>();
bufferPoolsBySize.put(alignedSize, pool);
}
pool.add(new WeakReference<>(array));
}
} else {
// Do nothing
}
}

将内存区域置为空,如果是一个大内存块的话就保留弱引用,以供下次需要的时候直接进行使用。为了加大命中概率可以看到在计算占用内存的时候都会找到比当前内存大的最近的一个8的倍数,保证了从弱引用区域中找到的一定是足够能装的下数据中最小的一块。

参考

spark 源码分析之十五 — Spark内存管理剖析 - JohnnyBai - 博客园
GitHub - hustnn/TungstenSecret: Explore the project Tungsten
Java 6 thread states and life cycle UML protocol state machine diagram example.

Read More

在我们的业务当中有这样一个场景,用户每收到一笔转账以后,就会通过推送服务给用户发送一个通知,而这些通知之间需要保证先后顺序(即帐单的产生时序)。假设我们有一个订单数据结构Bill

1
2
3
4
5
6
7
{
"_id": ObjectId
"fromUserId": String,
"toUserId": String,
"amount": Double,
"createdAt": Long
}

fromUserId是转账用户的id,toUserId是收款用户的id,amount是转账金额,createdAt是记录创建的日期。

系统中使用安装了debezium connector for mongodb插件的kafka connect来将mongodb oplog收集到kafka中,下游使用flink streaming task来进行消费并触发推送服务。整个过程中数据会流过多个分布式系统,如何保证在流经这些系统以后还能保证记录的产生顺序就是今天要讨论的问题。

Kafka connect && Kafka

所有的Bill oplog都会被发送到同一个Kafka topic中,所以只需要保证在发往Kafka的过程当中使用toUserId作为 Partition Key即可。但是由于使用了开源组件来帮助我们完成了收集oplog的任务,所以需要保证debezium connector for mongodb能如我们的愿。

首先需要知道的是Kafka connect默认情况下使用的分区策略是org.apache.kafka.clients.producer.DefaultPartitioner,其中partition方法的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
/**
* Compute the partition for the given record.
*
* @param topic The topic name
* @param key The key to partition on (or null if no key)
* @param keyBytes serialized key to partition on (or null if no key)
* @param value The value to partition on or null
* @param valueBytes serialized value to partition on or null
* @param cluster The current cluster metadata
*/
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if (keyBytes == null) {
int nextValue = nextValue(topic);
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
if (availablePartitions.size() > 0) {
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return availablePartitions.get(part).partition();
} else {
// no partitions are available, give a non-available partition
return Utils.toPositive(nextValue) % numPartitions;
}
} else {
// hash the keyBytes to choose a partition
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}

private int nextValue(String topic) {
AtomicInteger counter = topicCounterMap.get(topic);
if (null == counter) {
counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
if (currentCounter != null) {
counter = currentCounter;
}
}
return counter.getAndIncrement();
}

如果key是空的,则以轮训的方式来分配partition,否则则使用一种murmur2的HASH算法来分配partition。所以我们只需要将message key设置为toUserId即可达成我们的目的。但是 Debezium Connector for MongoDB :: Change event’s key 中写道,message key只能是_id,而toUserId不具有唯一性,所以不能作为_id;如果加上时间戳或者其他的字段保证了唯一性,又失去了要将相同toUserId放在一个分区的语义,所以这条路基本是走不通的。

The MongoDB connector does not make any explicit determination of the topic partitions for events. Instead, it allows Kafka to determine the partition based upon the key. You can change Kafka’s partitioning logic by defining in the Kafka Connect worker configuration the name of the Partitioner implementation.
Be aware that Kafka only maintains total order for events written to a single topic partition. Partitioning the events by key does mean that all events with the same key will always go to the same partition, ensuring that all events for a specific document are always totally ordered.

继续寻找其他的变通方法,Debezium Connector for MongoDB :: Partitions 文档中写到可以通过设置 Kafka connect worker 的Partitioner设置来指定分区策略。也就是 Apache Kafka 文档中提到的partitioner.class

partitioner.class: Partitioner class that implements the org.apache.kafka.clients.producer.Partitioner interface.
Type: classDefault: org.apache.kafka.clients.producer.internals.DefaultPartitioner

即需要自己通过继承Partitioner接口来实现自己的分区策略,要实现partition方法

1
2
3
 public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
...
}

实现了之后将包含该类的JAR包放入Kafka connect能扫描的路径下,再在connect-standalone.properties或者connect-distributed.properties配置文件中进行全局的设定。可以参见下面两个链接👇:

那么试想一下我们的方案是不是就可以实现为,将toUserId和时间戳拼接为唯一性的_id,在partition函数中将toUserId提取出来并以此作为partition key进行分区以实现在kafka中保证顺序的目的。但是这样的实现方案也存在问题,比如这个Kafka connect 仅来为这个任务服务,因为这种分区策略并不具有通用性,如果有多个类似的需求则要部署多份Kafka connect。同时也存在一些好处,例如拼接的_id是可以直接作为Mongodb的shard key来对该场景的上游进行横向扩展的。可惜的是debezium的实现并不能保证shard cluster传入kafka时保证事件的发生顺序,虽然能将拼接的_id作为shard key,但是由于这种架构并没有能力保证顺序性,所以这种扩展也是无效的,参见👇:
Debezium Connector for MongoDB :: MongoDB sharded cluster

综上在该架构中如果想在Kafka层保证事件的有序性是非常困难,并且很不经济实惠。

通过上文的分析,我们不得不接受一个事实,Flink消费到的是乱序数据。但是因为其特性,能较为方便的对乱序数据进行处理。
在debezium收集到的oplog中,包含两个时间戳,一个是在Value payload中的ts_ms,表示的是debezium收集该oplog时的系统时间;另外一个是在Value payload中的source.ts_ms,表示的是mongodb产生oplog的时间。而我们的实体类中也写入了Create bill的时间createdAt,由于只关心Insert事件,所以在Value payload的after.createdAt中能获取Bill创建的真实时间。在使用过程中一般以第二个时间或者第三个时间为准,在没有采用shard cluster的mongodb中,我认为第二个时间的先后次序应该与第三个时间的先后次序相同;采用了shard cluster的mongodb中,应该以第三个时间为准。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
"payload": {
"after": ...,
"patch": null,
"source": {
"version": "1.0.3.Final",
"connector": "mongodb",
"name": "cdc_test",
"ts_ms": 1558965508000,
"snapshot": true,
"db": "inventory",
"rs": "rs0",
"collection": "bill",
"ord": 31,
"h": 1546547425148721999
},
"op": "r",
"ts_ms": 1558965515240
}

在Flink的时间概念中也有三种时间:

  • 事件时间:独立事件在产生它的设备上发生的事件,通常在进入Flink之前就已经嵌入到事件中。
  • 接入时间:数据进入Flink的时间,取决于Source Operator所在主机的系统时钟。
  • 处理时间:数据在操作算子计算过程中获取到的所在主机时间。

显然在该场景下应该选择Bill的创建时间作为Flink的事件时间:

1
2
3
4
5
6
7
env.addSource(Utils.providesAvroKafkaConsumer(kafkaConfig))
.map(recordTuple => AvroMongoOplog.newInstance(recordTuple._1.asInstanceOf[Record], recordTuple._2.asInstanceOf[Record]))
.filter(mongoOplog => mongoOplog.getOpType.eq(MongoOpType.Insert))
.assignAscendingTimestamps(mongoOplog => mongoOplog.getDocument.getLong("createdAt"))
.keyBy(mongoOplog => mongoOplog.getDocument.get("toUserId"))
.window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
.process(...)
  • 连接Kafka,使用实现的KeyedAvroDeserializationSchema进行反序列化

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient
    import org.apache.avro.generic.GenericRecord
    import org.apache.flink.api.common.typeinfo.TypeInformation
    import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema
    import org.apache.kafka.clients.consumer.ConsumerRecord

    @SerialVersionUID(1584533572114L)
    class KeyedAvroDeserializationSchema extends KafkaDeserializationSchema[(GenericRecord, GenericRecord)] {

    var keyDeserializer: ConfluentRegistryAvroDeserializationSchema[GenericRecord] = _
    var valueDeserializer: ConfluentRegistryAvroDeserializationSchema[GenericRecord] = _

    def this(topic: String, schemaRegistry: String) {
    this()
    val keySubject = topic + "-key"
    val valueSubject = topic + "-value"
    val schemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistry, 1000)
    val keySchema = schemaRegistryClient.getByID(schemaRegistryClient.getLatestSchemaMetadata(keySubject).getId)
    val valueSchema = schemaRegistryClient.getByID(schemaRegistryClient.getLatestSchemaMetadata(valueSubject).getId)
    keyDeserializer = ConfluentRegistryAvroDeserializationSchema.forGeneric(keySchema, schemaRegistry)
    valueDeserializer = ConfluentRegistryAvroDeserializationSchema.forGeneric(valueSchema, schemaRegistry)
    }

    override def isEndOfStream(t: (GenericRecord, GenericRecord)): Boolean = false

    override def deserialize(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]]): (GenericRecord, GenericRecord)
    = (keyDeserializer.deserialize(consumerRecord.key()), valueDeserializer.deserialize(consumerRecord.value()))

    override def getProducedType: TypeInformation[(GenericRecord, GenericRecord)] = createTypeInformation[(GenericRecord, GenericRecord)]
    }
  • 通过一个工具类将(Record,Record)转化为更好处理的MongoOplogEntry类型

  • 过滤出所有的Insert事件

  • Bill的创建时间作为Flink的事件时间

  • 按照toUserId进行分区

  • 设置10秒的时间窗口

  • 在窗口处理函数中对所有MongoOplogEntry对象按照createdAt再排序后依次触发推送服务

最后

以上介绍了在Mongodb CDC过程中保证数据有序性的两种思路,一种是在Kafka中就保证toUserId相同的数据均有序,这样在消费过程中不需要做窗口计算,只要在需要partition的地方继续使用toUserId进行partition,就能保证数据有序性。这种方案在Kafka connect侧需要做很多的工作,但是能为流式任务带来更好的消费性能,但是由于debezium的局限性,在shard cluster的Mongodb中不能发挥作用。第二种是让Flink消费乱序数据,使用其本身的事件时间窗口计算来重新纠正数据。这种方案会让Flink的效率大打折扣,并且需要保证窗口缓存数据不能超过限制,但是比较通用,使用的时候也需要注意迟到的数据该如何处理。大家可以根据自己的场景来挑选方案,或者如果有更好的方案欢迎一起交流。

Read More
post @ 2020-01-20

发现WebclientURLEncoder的表现非常不稳定,所以找了一下正确的使用方式,首先是一个issue TestRestTemplate does the url encoding twice if I pass the URI as a string · Issue #8888 · spring-projects/spring-boot · GitHub ,一个老哥发现当exchange 里传入 URI 类的时候,exchange不会有任何encode行为,但是在传入一个字符串的时候会被encode。

下面有bclozel的回答👇:

1
2
3
4
Hi  [@georgmittendorfer](https://github.com/georgmittendorfer) ,
I think this is the expected behavior in Spring Framework (see [SPR-16202](https://jira.spring.io/browse/SPR-16202) and [SPR-14828](https://jira.spring.io/browse/SPR-14828) for some background on this).
As a general rule, providing a URI String to RestTemplate means that this String should be expanded (using optional method parameters) and then encoded. If you want to take full control over the encoding, you should then use the method variant taking a URI as a parameter.
The Spring Framework team recently added [some more documentation on the subject of URI encoding](https://docs.spring.io/spring-framework/docs/current/spring-framework-reference/web.html#web-uri-encoding) . Does that help?

看来Spring认为这是一个正常的行为,即传入字符串的时候我就要对你进行encode,你传URI对象的时候我不管。继续点进去看看官方文档 Web on Servlet Stack
1.5.3 中被补上了详细的使用细节,首先它将URI的组成分为两个部分,一个叫做URI template,一个叫做URI variables。然后提供了4种encode的模式:

  • TEMPLATE_AND_VALUES:会先对template进行encode,然后在扩展的时候再对variables进行encode。
  • VALUES_ONLY:不会对template进行encode,在扩展之前对variables进行encode。
  • URI_COMPONENTS:和第二种类似,不过是在扩展之后对values进行encode。
  • NONE:不做任何的encode。

再来看看我们遇到的问题,在我们的一个库中,用错误的姿势使用的URI

1
2
3
4
return webClient
.get()
.uri("/doItemHighCommissionPromotionLinkByAll?" + Utils.pojo2UrlQuery(request))
.exchange();

直接传入了一个字符串,会发现调用的方法是:

1
2
3
4
5
@Override
public RequestBodySpec uri(String uriTemplate, Object... uriVariables) {
attribute(URI_TEMPLATE_ATTRIBUTE, uriTemplate);
return uri(uriBuilderFactory.expand(uriTemplate, uriVariables));
}

原来我们传入的是一个没有任何占位符的uriTemplate,而现在依赖的WebClient版本中默认的encode模式是EncodingMode.URI_COMPONENTS,也就是根本不会管template部分。所以我们发现为什么传入一个字符串的时候没有被自动encode。

那么是不是把encode模式改为EncodingMode. TEMPLATE_AND_VALUES,让它会encode template就没问题了呢?也不是,比如http://api.vephp.com/hcapi?detail=1&vekey=V00003484Y95498091&para=https://uland.taobao.com/coupon/edetail?activityId=8932eb9980234090851d448195fe363c&itemId=578614572836
这个url,para传入的又是一个url,跟了一下解析代码,会发现template在解析的时候会把query map解成:

1
2
3
4
5
6
{
"detail": 1,
"vekey": "V00003484Y95498091",
"para": "https://uland.taobao.com/coupon/edetail?activityId=8932eb9980234090851d448195fe363c",
"itemId": "578614572836"
}

因为它解析query params的正则长这样:

1
"([^&=]+)(=?)([^&]+)?"

所以后果是para会被encode(而且还是错误的encode,这是一个针对template的encode,不是正紧的urlEncode),但是itemId部分不会。所以最终也只是得到了一个错误的encode结果。

综上所述有三种使用方式:

  • 传入的还是一个字符串,不过在构造的时候自己去做urlEncode,然后在WebClient的设置里将encode模式设为后三种。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    return WebClient
    .builder()
    .exchangeStrategies(strategies)
    .baseUrl(endpoint)
    .uriBuilderFactory(providesUriBuilderFactory(endpoint));

    private static DefaultUriBuilderFactory providesUriBuilderFactory(String endpoint) {
    DefaultUriBuilderFactory factory = new DefaultUriBuilderFactory(endpoint);
    factory.setEncodingMode(EncodingMode.NONE);
    return factory;
    }
  • 传入一个URI对象,构造的时候自己去做urlEncode,不用关心WebClient里的设置。

  • 正确的使用url templateurl variables进行构造,那就不用自己去做urlEncode。

    1
    2
    3
    URI uri = UriComponentsBuilder.fromPath("/hotel list/{city}")
    .queryParam("q", "{q}")
    .build("New York", "foo+bar")

最后你可能会发现自己在进行urlEncode的时候,还是会有问题。可以阅读下 Java URL encoding: URLEncoder vs. URI - Stack OverflowJava equivalent to JavaScript’s encodeURIComponent that produces identical output? - Stack Overflow
简单点说就是 URLEncoder.encode() 方法不是你真正想用到的方法,你可以这样:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public static String encodeURIComponent(String s) {
String result;

try {
result = URLEncoder.encode(s, "UTF-8")
.replaceAll("\\+", "%20")
.replaceAll("\\%21", "!")
.replaceAll("\\%27", "'")
.replaceAll("\\%28", "(")
.replaceAll("\\%29", ")")
.replaceAll("\\%7E", "~");
} catch (UnsupportedEncodingException e) {
result = s;
}

return result;
}

(好傻啊

Read More
post @ 2019-11-08

Percolator

Percolator主要使用在Bigtable系统中提供分布式事务能力,其本身的实现是利用Bigtable的单行事务能力以及在行内设置lock列来进行悲观事务。数据结构:

1
<key, version>: <data>, <lock>, <write>

Transaction Write

  • 开启事务:向集中的Timestamp server oracle(TSO)节点申请一个事务开始时间戳t(s),并且以此时间戳做为事务id
  • 2PC第一阶段:Percolator在一些Write set当中选择一行做为Primary,其他的行做为Secondaries。先对Primary做prewrite操作,如果成功则对其他的Secondaries做prewrite操作。
    • Prewrite:这个操作当中有多个原子操作,被封装在一个事务当中,由Bigtable的单行事务性来保证。
      • 首先检查write是否有时间戳大于t(s)的版本,如果有则说明这行数据已经被新的事务提交过了,直接返回事务冲突
      • 然后检查lock是否有任意版本的数据存在,如果有则说明这行资源还被别的事务持有,返回事务冲突
      • 如果前面的操作都成功了,那么在data写入版本为t(s)的value数据
      • 并且在lock中写入版本为t(s)值为primary位置的数据
        prewrite的第二部检查当中,发生冲突是有三种情况:
    • 获得锁的版本小于t(s),该资源正在被一个事务持有
    • 获得锁的版本小于t(s),有一个老事务因为某种原因没有成功的还掉锁
    • 获得锁的版本大于t(s),该资源正在被一个事务持有
      上述情况当中的1,3都是典型的写-写冲突,client就进行正常的backoff重试即可。而第2种情况是客户端在2PC的第二阶段发生了异常导致,这时需要rollback之前的事务来释放掉这个异常的锁。并且这里是很难区分1,2的,毕竟锁的版本都小于t(s),所以需要一个附件条件锁的ttl时间,如果锁处于ttl时间内则说明是第1种情况,在ttl时间外则是第2种情况。
  • 2PC第二阶段:向集中的TSO节点申请一个事务提交时间戳t(c),之后检查Primary的lock是否还存在t(s)版本的数据,如果不存在则说明该事务锁已经超过ttl时长,被其他的事务中断了。如果存在的话,则向write写入版本为t(c)值为t(s)的数据并且清掉锁,这时整个事务已经成功。最后异步的完成Secondaries写write并且释放锁的操作。这个阶段当中检查、写入、清锁的过程被包装在一个事务当中。

Transaction Read

读事务就要简单很多,Percolator向集中的TSO节点申请一个事务开始的时间戳t(s),然后检查所有的Read set中的锁,如果存在时间戳小于t(s)的锁:

  • 锁还处于TTL时间内,说明该资源正在被另外一个事务持有,Client进行backoff操作
  • 锁已经超时,这时可以通过锁中记录的primary位置找到primary行的write列,检查是否存在锁版本的数据。如果存在则说明该事务已经成功,只是没有正常的还锁,这时将锁对应的事务进行提交,如果不存在则说明该事务2PC第二阶段出现问题,将该事务进行rollback

下面是论文源码👇:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
class Transaction {
struct Write{ Row row; Column: col; string value;};
vector<Write> writes_;
int start_ts_;

Transaction():start_ts_(orcle.GetTimestamp()) {}
void Set(Write w) {writes_.push_back(w);}
bool Get(Row row, Column c, string* value) {
while(true) {
bigtable::Txn = bigtable::StartRowTransaction(row);
// Check for locks that signal concurrent writes.
if (T.Read(row, c+"locks", [0, start_ts_])) {
// There is a pending lock; try to clean it and wait
BackoffAndMaybeCleanupLock(row, c);
continue;
}
}

// Find the latest write below our start_timestamp.
latest_write = T.Read(row, c+"write", [0, start_ts_]);
if(!latest_write.found()) return false; // no data
int data_ts = latest_write.start_timestamp();
*value = T.Read(row, c+"data", [data_ts, data_ts]);
return true;
}
// prewrite tries to lock cell w, returning false in case of conflict.
bool Prewrite(Write w, Write primary) {
Column c = w.col;
bigtable::Txn T = bigtable::StartRowTransaction(w.row);

// abort on writes after our start stimestamp ...
if (T.Read(w.row, c+"write", [start_ts_, max])) return false;
// ... or locks at any timestamp.
if (T.Read(w.row, c+"lock", [0, max])) return false;

T.Write(w.row, c+"data", start_ts_, w.value);
T.Write(w.row, c+"lock", start_ts_,
{primary.row, primary.col}); // The primary's location.
return T.Commit();
}
bool Commit() {
Write primary = write_[0];
vector<Write> secondaries(write_.begin() + 1, write_.end());
if (!Prewrite(primary, primary)) return false;
for (Write w : secondaries)
if (!Prewrite(w, primary)) return false;

int commit_ts = orcle.GetTimestamp();

// Commit primary first.
Write p = primary;
bigtable::Txn T = bigtable::StartRowTransaction(p.row);
if (!T.Read(p.row, p.col+"lock", [start_ts_, start_ts_]))
return false; // aborted while working
T.Write(p.row, p.col+"write", commit_ts,
start_ts_); // Pointer to data written at start_ts_
T.Erase(p.row, p.col+"lock", commit_ts);
if(!T.Commit()) return false; // commit point

// Second phase: write our write records for secondary cells.
for (Write w:secondaries) {
bigtable::write(w.row, w.col+"write", commit_ts, start_ts_);
bigtable::Erase(w.row, w.col+"lock", commit_ts);
}
return true;
}
}; // class Transaction

Example

  1. 初始化Bob和Joe的账户,Bob有10元,Joe有2元
    a15-1
  2. 有一个事务出现,这个事务要将Bob的7元给Joe,这时获得了一个新的时间戳7,选择Bob做为primary,锁住该行写入Bob减掉7元以后的数据
    a15-2
  3. 将Joe选为Secondary,并指向Primary的Bob,锁住该行写入Joe加7元以后的数据
    a15-3
  4. 这时候2PC第一阶段完成,开始第二阶段,申请一个提交时间戳8,将时间戳7写入Bob的write的8版本中
    a15-4
  5. 将时间戳7写入Joe的write的8版本中
    a15-5

Percolator in TiDB

有了上面对Percolator的解释,我们现在很容易理解在TiDB中是如果使用Percolator来实现事务逻辑。首先我们来看其乐观事务的实现:
a15-6
我们从图上可以看出,Percolator是被使用在TiDB和其下面的TiKV进行事务通信的协议。最开始的时候我很奇怪,Percolator的实现不是一个悲观事务模型吗?但是为什么TiDB里称其为乐观事务,是因为暴露给Client的不是底层的KV而是DB这一层,而加锁的过程被放在了Commit阶段,所以对于Client来说,这就是一个乐观事务模型。当事务开始以后,首先执行DML操作,得到Write set,然后将Write set放到Percolator中执行2PC,在第一阶段上锁。

下面再看看他们在这基础上修改的悲观事务模型,很巧妙:
a15-7
将上锁的过程提前到开始执行Percolator事务之前,先对所有的Write set上一个和Percolator同样的锁,不过锁里面没有记录Primary的位置,而是空的,仅做占位符使用。等开始执行Percolator事务以后,锁会被写入正确的值。这样做的好处是,在数据真正开始发生变更之前就锁住了所有资源。不会发生回滚行为,在资源竞争密集的场景下效率大大优于乐观事务。写请求看到这个空锁直接等锁,读请求可以直接从TiKV中读取数据即可。

Omid

Omid主要使用在Phoenix系统中提供分布式事务能力,其本身的实现是利用Hbase的单行事务能力以及在行内设置version, commit列来进行乐观事务,数据结构:

1
2
Data table   <key, version>: <value> <commit>
Commit table <version>: <commit>

Transaction Write

  • 开启事务:向集中的Timestamp server oracle(TSO)节点申请一个事务开始时间戳t(s),并且以此时间戳做为事务id
  • 2PC第一阶段:Client将Write set中的每行的修改数据写入Data table版本为t(s),对应的key的value当中,需要注意的是这时候的commit均为null
  • 2PC第二阶段:Client带上Write set和t(s)TSO提交commit请求,TSO会进行冲突检查,如果检查成功则返回t(c)给Client,否则的话整个事务被中断。Client拿到t(c)以后向Commit Table发起CAS(t(s), commit, null, t(c))操作,如果返回ABORT则将事务终止,并且异步的清除Data table中之前写入的数据。如果成功,则进行Post-commit流程,将写入Commit table中的t(c)异步复制到Data table的版本为t(s)commit当中。完成所有的异步复制以后进行垃圾回收,将Commit table当中的数据清除掉,完成整个事务。
    • TSO如何进行冲突检查:原理非常的简单,就是检查Write set当中的每一行是否有lastCommit > t(s)的数据,lastCommit是这一行最新的一个t(c)。如果有则说明在该事务执行过程当中已经有其他的事务完成,出现了写-写冲突,则中断该事务。但是要执行这个操作需要在TSO当中保存所有行的lastCommit数据才行,这个存储开销太大了,所以需要想办法优化。优化的手段也比较简单,就是维护一个LRU队列即可,只保存一定数量的行的lastCommit即可。那么不在队列当中的行的lastCommit一定小于等于队列中最小的一个lastCommit时间,这样可以检查lastCommit<=Smallest(lastCommit)<=t(s)的偏序关系,以检查冲突情况。但是由于队列中没有保存所有的数据,还是会有漏网之鱼,比如说现在队列里的Smallest(lastCommit) > t(s),并且要检查的行没有在队列当中,那么偏序关系就无从可知,这时候就直接将事务中断即可,也不会影响正确性。
    • CAS函数:这是一个在实现乐观锁当中经常会使用到的函数,CAS(a,b,c,d)是指比较a行b列,如果它现在的值等于c,则将其修改为d。并且这个函数需要保证原子性。在HBase当中可以使用行级事务来实现CAS函数,并保证其原子性。

Transaction Read

  • 向集中的Timestamp server oracle(TSO)节点申请一个事务开始时间戳t(s),并且以此时间戳做为事务id
  • 扫描所有的Read set,每一行从大版本到小版本扫描,找到第一个提交版本小于t(s)的value和对应的版本t(s2)。如果发现其commit==null,这时候有两种情况:
    • 这一次事务已经成功,只是正在进行Post-commit流程,从Commit table当中将t(c)复制过来
    • 这一次事务没有成功
      为了区分这两种情况,Omid会去Commit table当中检查版本t(s2)对应的commit是否有值,如果有值则说明是情况1,如果没有则说明是情况2。情况1的话很好办,继续向下遍历更小的版本。情况2的话就比较麻烦:
    • Client调用CAS(t(s2), commit, null, ABORT)来将其对应的事务设置为ABORT,这样在其重试的Commit环节会发现ABORT标志而使其事务进行中断操作
    • 如果设置成功,还需要检查一下是否是因为读事务太慢而导致的错误中断,去Data table读t(s2)版本的commit,如果发现存在值t(c2),并且t(c2)<t(s),则返回其value和版本。否则继续向下遍历更小的版本。

下面是论文源码👇:
a15-8

a15-9

End

通过上文的分析我们可以看出,Percolator的优点是分布式的Commit table,TSO逻辑简单,缺点是锁检查时需要扫描所有Write set的锁情况,并且需要额外的存储开销来记录锁。Omid的优点是执行效率上优于Percolator,但是又多了一个中心系统Commit table。大家可以根据自己的使用场景来进行选择。

参考资料:

Read More

post @ 2019-10-18

在很多的多线程编程场景下都会遇到多个线程对一个资源进行操作访问的情况,这种场景一旦发生就会牵扯到线程安全问题。为了保证程序的正确性,我们不得不花很大的力气去解决这些线程安全问题。在Java中解决线程安全问题的办法被分为了三种,其一是互斥同步,其二是非阻塞同步,其三是无同步方案。前两种的实现形式都是锁,第三种是通过设计模式的转变来将代码转变为不共享变量的形式,这不在这篇博客的讨论范围中。

线程不安全

首先我们来分析一下线程安全问题产生的底层原因到底是什么?先看下面的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package io.talkwithkeyboard.code;

public class MultiThreadIncrease {

public static int race = 0;

public static void increase() {
race = race + 1;
}

public static void main(String[] args) {
final int THREAD_COUNT = 20;
Thread[] threads = new Thread[THREAD_COUNT];
for (int i = 0; i < THREAD_COUNT; i++) {
threads[i] = new Thread(() -> {
System.out.format("This is %d thread.\n", Thread.currentThread().getId());
for (int j = 0; j < 10000; j++) {
increase();
}
});
threads[i].start();
}
while (Thread.activeCount() > 2) {
Thread.yield();
}
System.out.println(race);
}
}

首先解释一个细节,因为我的代码是在Idea里跑的,所以不仅是有Main thread,还有一个Monitor Ctrl-Break的守护线程。所以代码中是Thread.activeCount() > 2。如果直接使用java执行的话,这里是1即可。然后上面的代码做了一个很简单的事情,开了20个线程,每个线程做一件事情,对race这个变量累加10000次,最后输出。最后的结果显然不是200000,会小很多并且每次都不一样。这是为什么呢?

是因为race = race + 1这一行代码其实做了三件事情:

    1. 取出race现有的值
    1. race现有的值加上1
    1. 将更新后的值再附给race

我们理想的状态是,每个线程顺序的做完这三件事:

1
2
3
4
5
6
7
thread1.1  // race=0
thread1.2 // race=0
thread1.3 // race=1
thread2.1 // race=1
thread2.2 // race=1
thread2.3 // race=2
...

但实际是:

1
2
3
4
5
6
7
thread1.1  // race=0
thread2.1 // race=0
thread2.2 // race=0
thread1.2 // race=0
thread1.3 // race=1
thread2.3 // race=1
...

甚至更加的混乱,这就造成代码运行结果错误的现象,也就是出现了线程不安全行为。那么为了规避这样的行为,就需要引出锁的概念,悲观锁就是互斥同步的实现,乐观锁是非阻塞同步的实现。

互斥同步 悲观锁

首先我们看看《深入理解JVM》中对互斥同步的定义,”互斥同步是常见的一种并发正确性保障手段。同步是指在多个线程并发访问共享数据时,保证共享数据在同一个时刻只被一个(或者一些,使用信号量的时候)线程使用。而互斥是实现同步的一种手段,临界区、互斥量和信号量都是主要的互斥实现方式。因此,在这4个字里面,互斥是因,同步是果;互斥是方法,同步是目的。“

synchronized

而如何保证共享数据在同一个时刻只被一个线程使用?那么就需要在这个数据被使用之前就为期加上锁,只有获得锁的线程能够对其进行操作,而这样的锁就被称为悲观锁。在Java中,最基本的实现就是synchronized关键字。其实现的原理是在编译后会在同步块的前后分别形成monitorentermonitorexit这两个字节码指令,这两个字节码指令都需要一个reference类型的参数来指明要锁定和解锁的对象。如果指明的是对象参数,那就是这个对象的reference;如果没有明确指定,那就根据synchronized的是实例还是类方法,去取对应的对象实例或Class对象来作为锁对象。先看下面的例子,对比一下添加synchronized关键字前后的字节码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
package io.talkwithkeyboard.code;

public class NoSynchronized {

public static int race = 0;

public void increase() {
race = race + 1;
}

public static void main(String args[]) {
new NoSynchronized().increase();
}
}

通过javap工具来获取字节码:

1
$ javap -verbose -p io.talkwithkeyboard.code.NoSynchronized

我们只关注increase方法:

1
2
3
4
5
6
7
8
9
10
public void increase();
descriptor: ()V
flags: ACC_PUBLIC
Code:
stack=2, locals=1, args_size=1
0: getstatic #2 // Field race:I
3: iconst_1
4: iadd
5: putstatic #2 // Field race:I
8: return

那么在添加synchronized关键字后:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
package io.talkwithkeyboard.code;

public class WithSynchronized {
public static int race = 0;

public void increase() {
synchronized (this) {
race = race + 1;
}
}

public static void main(String args[]) {
new WithSynchronized().increase();
}
}

还是只关注increase方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public void increase();
descriptor: ()V
flags: ACC_PUBLIC
Code:
stack=2, locals=3, args_size=1
0: aload_0
1: dup
2: astore_1
3: monitorenter
4: getstatic #2 // Field race:I
7: iconst_1
8: iadd
9: putstatic #2 // Field race:I
12: aload_1
13: monitorexit
14: goto 22
17: astore_2
18: aload_1
19: monitorexit
20: aload_2
21: athrow
22: return

字节码描述的过程是:

    1. 将类对象入栈
    1. 复制栈顶元素(即类对象的引用)
    1. 将栈顶元素(类对象)存储到局部变量表Slot 1中
    1. 以栈顶元素做为锁开始同步
    1. 取获取类的静态字段(race),将其值压入栈顶
    1. int型常量1进栈
    1. 对操作数栈上的两个数值进行加法,结果压入栈顶
    1. 用栈顶元素给类的静态字段(race)赋值
    1. 将局部变量表Slot 1中的类对象入栈
    1. 退出同步
    1. 方法正常结束,跳转到22返回
    1. 从这步开始是异常路径,暂不赘述

在展示了整个synchronized关键字的代码流程以后,我们再深究一下monitorenter指令和monitorexit指令在机器码成面到底做了什么。为了阅读方便,我们先不展示机器码的内容,而是从虚拟机规范出发,在执行monitorenter指令的时,首先要尝试获取对象的锁。如果这个对象没有被锁定,或者当前线程已经拥有了那个对象的锁,把锁的计数器加1。相应的,在执行monitorexit指令的时候,把锁的计数器减1,当计数器为0的时候,锁就被释放掉。

ReentrantLock

以上就是锁的整个低层实现过程,在Java中其实还有更上层的锁封装能实现更多特性的锁,那就是ReentrantLock类,它和synchronized关键字一样都是悲观锁的实现。但是相比synchronized,增加了一些高级功能,主要是以下三点:

  • 等待可中断:当持有锁的线程长期不释放锁的时候,正在等待的线程可以选择放弃等待,改为处理其他事情,可中断特性对处理执行时间非常长的同步块很有帮助。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    // lock() 实现
    final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
    boolean interrupted = false;
    for (;;) {
    final Node p = node.predecessor();
    if (p == head && tryAcquire(arg)) {
    setHead(node);
    p.next = null; // help GC
    failed = false;
    return interrupted;
    }
    if (shouldParkAfterFailedAcquire(p, node) &&
    parkAndCheckInterrupt())
    interrupted = true;
    }
    } finally {
    if (failed)
    cancelAcquire(node);
    }
    }
    // lockInterruptibly() 实现
    private void doAcquireInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
    for (;;) {
    final Node p = node.predecessor();
    if (p == head && tryAcquire(arg)) {
    setHead(node);
    p.next = null; // help GC
    failed = false;
    return;
    }
    if (shouldParkAfterFailedAcquire(p, node) &&
    parkAndCheckInterrupt())
    throw new InterruptedException();
    }
    } finally {
    if (failed)
    cancelAcquire(node);
    }
    }

可以看到在lock()lockInterruptibly()源码的实现中,唯一的区别是在一直等待锁的过程中,lock()会吞掉中断,近记录中断状态,而lockInterruptibly()会抛异常到上层,交给上面的业务逻辑进行处理。

  • 公平锁:多个线程在等待同一个锁时,必须按照申请锁的时间顺序来依次获得锁,而非公平锁是不能保证这一点的。synchronized就是非公平锁,可以通过
    1
    final ReentrantLock lock = new ReentrantLock(true);

来创建公平锁。

  • 可以绑定多个条件:主要是处理生产者消费者模型,由于篇幅这里暂不赘述。

volatile

volatile可以说是Java中最轻量化级的同步机制,在一定程度上也是可以当作对象的锁来进行使用的,但是在功能上还是不能完全替代被synchronized作用的对象。当一个变量定义为volatile之后,它将具备两种特性,第一是当一个线程修改了这个变量的值,新值对于其他线程来说是立即得知的,这就是可见性。第二个语义是禁止指令重排序优化。

首先介绍一下可见性是如何实现的?普通变量在被一个线程修改之后,会向主内存进行回写,只有等到主内存回写完成以后,其他的线程才能读到新的值。 而被volatile修饰的变量在赋值后会产生一个lock addl $0x0,(%esp)向寄存器中加0的空操作,这个操作能使用本CPU的Cache写入内核,并使别的CPU或者别的内核无效化Cache。相当于将工作内存中的变量拿到了主内存当中,正是因为此让volatile修饰的变量马上对其他线程可见。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package io.talkwithkeyboard.code;

public class VolatileControl {

private volatile static boolean shutdownRequested = false;

public void shutdown() {
shutdownRequested = true;
}

public void doWork() {
long loopCount = 100000000;
System.out.println(Thread.currentThread().getId() + ":" + loopCount);
for (int i = 0; i < loopCount; i++) {
if (shutdownRequested) {
return;
}
}
System.out.println(Thread.currentThread().getId() + ":" + "shutdown!");
shutdown();
}

public static void main(String[] args) {
final int THREAD_COUNT = 10;
Thread[] threads = new Thread[THREAD_COUNT];
for (int i = 0; i < THREAD_COUNT; i ++) {
threads[i] = new Thread(() -> new VolatileControl().doWork());
threads[i].start();
}

while (Thread.activeCount() > 2) {
Thread.yield();
}
}
}

比如在这个例子当中,让每个线程都循环100000000次,在大多数情况下最后可以看到shutdown!只被打印了一次。但是一旦去掉volatile修饰以后,就会看到很多个shutdown!被打印出来,这就是因为很多线程在shutdownRequested被修改以后,都读到了老版本的值,出现了线程不安全的情况。而volatile从表现来看基本上达到了为shutdownRequested加锁的效果。但是刚才也提到了我们是在大多数情况下是只看到一次shutdown!,这是为什么呢?可以先看一个更加容易复现的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package io.talkwithkeyboard.code;

public class VolatileIncrease {
public static volatile int race = 0;

public static void increase() {
race = race + 1;
}

public static void main(String[] args) {
final int THREAD_COUNT = 20;
Thread[] threads = new Thread[THREAD_COUNT];
for (int i = 0; i < THREAD_COUNT; i++) {
threads[i] = new Thread(() -> {
for (int j = 0; j < 10000; j++) {
increase();
}
});
threads[i].start();
}

while (Thread.activeCount() > 2) {
Thread.yield();
}

System.out.println(race);
}
}

还是上面出现过的,每个线程都给race累加值的代码,只不过现在会用volatile进行修饰。volatile的特性又是值被修改后立即能被其他线程看见,那么这个例子就应该输出正确的结果200000,但是运行后会发现还是出现了上面提到的线程不安全的问题。那么这是不是和volatile的描述不符呢?我们还是输出字节码来看看:

1
2
3
4
5
6
7
8
9
10
public static void increase();
descriptor: ()V
flags: ACC_PUBLIC, ACC_STATIC
Code:
stack=2, locals=0, args_size=0
0: getstatic #2 // Field race:I
3: iconst_1
4: iadd
5: putstatic #2 // Field race:I
8: return

可以看到问题是出在race = race + 1上面,到字节码层面上这个操作已经被拆分成了4条指令,并且不具备事务性了。volatile能保证的是getstatic能取到最新的值,但是在iadd操作的时候其他线程可能已经把这个值加大了。在上面的例子当中同理,在将shutdownRequested赋值为true的时候,可能其他线程已经赋值成功,但是当前线程不可见。所以volatile的作用是非常轻微的,只能够保证在取值的时候能取到最新值,当一个操作的事务性无法保证的时候,volatile也不能提供锁的性质。至于防止指令重拍和题目相关性不强,这里先不做赘述。

总结

在《深入理解JVM虚拟机》中,有对synchronizedReentrantLock进行性能对比,通过对synchronized的优化,性能基本上持平。并且提供因为团队会更偏向于优化原生的synchronized关键字,所以当两个都能使用的时候可以优先使用synchronized关键字,需要更高阶的功能时,再选择ReentrantLock。但是因为阻塞的实现方式,这两种实现都会阻塞后面其他的线程进入,而Java的线程是映射到操作系统的原生线程之上的,如果一个线程要阻塞或唤醒,都是需要操作系统从用户态切换到核心态来进行帮忙的,所以需要非常谨慎的时候,在一定情况下是可以使用volatile关键字来进行替代的,以提高性能。

非阻塞同步 乐观锁

由于悲观锁的实现中涉及到加锁、用户态核心态切换、维护锁计数器和检查是否有被阻塞线程需要被唤醒等复杂的操作,在执行效率上大打折扣。随着硬件指令集的发展,又多了一种锁实现方案,也就是乐观锁,其主要的思想是:先进行操作,如果没有竞争则操作成功,如果有竞争,那就再采取其他的补偿措施。这种实现方式下,不需要将线程挂起,因此也称为非阻塞同步。

Compare-and-Swap

乐观锁的一个实现关键是需要让“现在的值等于旧预期值时,将新预期值写入”这个操作原子化,而这也依赖于硬件指令集的发展,出现了CAS(Compare-and-Swap)指令来完成这个任务。CAS指令需要三个操作数,分别是内存位置V、旧的预期值A、新的预期值B。CAS指令执行时,当且仅当V符合旧的预期值A的时候,处理器用新值B更新V的值,否则它就不执行更新,但是无论是否更新了V的值,都会返回V的旧值,并且上述的过程是原子性的。在JDK1.5之后,sun.misc.Unsafe类的compareAndSwapInt()compareAndSwapLong()等几个操作都依靠CAS指令执行,虚拟机内部对这些方法做了特殊处理,即时编译出来的结果就是一条平台相关的处理器CAS指令。在更上层中的接口中,AtomicInteger.incerementAndGet()等使用了Unsafe的低层接口。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package io.talkwithkeyboard.code;

import java.util.concurrent.atomic.AtomicInteger;

public class MultiThreadAtomicIncrease {

public static AtomicInteger race = new AtomicInteger(0);

public static void insert() {
race.incrementAndGet();
}

public static void main(String[] args) {
final int THREAD_COUNT = 20;
Thread[] threads = new Thread[THREAD_COUNT];
for (int i = 0; i < THREAD_COUNT; i++) {
threads[i] = new Thread(() -> {
for (int j = 0; j < 10000; j++) {
insert();
}
});
threads[i].start();
}

while (Thread.activeCount() > 2) {
Thread.yield();
}
System.out.println(race.get());
}
}

使用乐观锁来对上面多线程累加的程序进行优化,运行程序可以看到正确的结果。

1
2
3
public final int incrementAndGet() {
return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}

incrementAndGet方法就是对Unsafe类的getAndAddInt方法进行了封装,而getAndAddInt在低层使用了和CAS类似的指令Fetch-and-Increment,将获取值和累加两个操作进行原子化封装。而在早期的JDK实现中,是使用CAS指令进行完成,不断尝试将比现在值大1的值写入。这个优化也是硬件指令集的进一步丰富带来的。

1
2
3
4
5
6
7
8
public final int getAndIncrement() {
for (;;) {
int current = get();
int next = current + 1;
if (compareAndSet(current, next))
return current;
}
}

锁优化

自旋锁

因为互斥同步对性能的消耗非常大,并且JVM团队发现大量的锁定状态只会持续很短的一段时间,这个时间远小于对CPU的用户态和内核态切换时间。所以就想出来一个办法不轻易的对线程进行阻塞,而使用忙循环(自旋)来替代。不过自旋也不是完全优于阻塞的,虽然省下了线程切换的开销,不过忙循环会占用处理器时间,所以如果锁定状态时间较短使用自旋是划算的,锁定状态时间较长就会浪费处理器资源,带来性能的消耗。因此现在的实现中,会规定一个自旋的上限,当达到上限以后就转为重锁的方式挂起线程。现在高版本的JDK中自旋是默认开启的,Java用户可以通过-XX:PreBlockSpin来修改自旋的次数。

并且为了进一步的提高自旋锁的性能,在JDK1.6提出了自适应的自旋锁,每次自旋的时间不固定,而是由前一次在同一个锁上的自旋时间以及锁的拥有者的状态决定。如果在一个锁对象上,通过自旋的方式经常成功获得过锁,并且持有锁的进程正在运行中,那么这次自旋有较大可能获得锁,就可以等待较多的自旋次数。如果在一个锁对象上从来没有成功通过自旋获得锁,那么就直接省去自旋步骤,直接进入重锁。

锁消除

锁消除是指开发人员虽然要求一段代码块需要上锁,同步执行。但是被JVM检测到存在不可能存在共享数据竞争的锁,就会自动将其消除掉。这个检测主要依赖逃逸分析的数据支持,如果判断在一段代码中,堆上的所有数据都不会逃逸出去被其他线程访问到,那就可以把它当作栈上数据对待,认为他们是线程私有的,就不用加锁。

锁粗化

很多时候我们都希望加锁的作用范围限制的尽可能的小,这样可以缩短锁状态的持续时间,让等待的线程尽快的获得锁。但是偶尔会出现一系列连续的操作都对同一个对象反复加锁和解锁,甚至加锁操作出现在循环体中的,这样频繁的进行互斥同步会极大的降低执行效率,这时候虚拟机探测到有这样一串零碎的操作都对一个对象加锁,就会把加锁的范围粗化到整个操作序列的外部,这样加锁一次就可以了。就还是用上面的例子举例,每次increase操作都有加锁解锁的步骤,这时就会把锁粗化到for循环的外部。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package io.talkwithkeyboard.code;

public class MultiThreadIncreaseSync {

public static int race = 0;

public static synchronized void increase() {
race = race + 1;
}

public static void main(String[] args) {
final int THREAD_COUNT = 20;
Thread[] threads = new Thread[THREAD_COUNT];
for (int i = 0; i < THREAD_COUNT; i++) {
threads[i] = new Thread(() -> {
for (int j = 0; j < 10000; j++) {
increase();
}
});
threads[i].start();
}

while (Thread.activeCount() > 2) {
Thread.yield();
}

System.out.println(race);
}
}

轻量级锁

轻量级锁的优化方向是使用CAS代替互斥量的开销,并且依据经验“对于绝大多数的锁,在整个同步周期内都是不存在竞争的”,假设没有竞争那么CAS操作就避免了互斥量的开销,但是如果存在竞争,轻量锁最终会膨胀为重量锁,不仅有互斥量的开销,还多了CAS操作。在HotSpot虚拟机中,对象头由两部分组成,一部分是非固定数据结构的,用来储存对象自身的运行时数据,如哈希值,GC分带年龄等数据,官方称为”Mark word”;另一部分用于储存指向方法区对象类型数据的指针,这一部分先不关注。而“Mark word”就是锁实现的关键,我们以32位的HotSpot举例,32bit的”Mark word”中除了2bit用于存储锁标志位外,其他的30bit所保存的内容都根据锁状态发生变化:

  • 当处于未锁定(标志位为01)时,29bit存储了对象哈希值、对象分代年龄等
  • 需要加锁时,先检查是否处于未锁定状态,如果是,在当前线程的栈帧中建立一个名为锁记录(Lock Record)的空间,用于存储当前锁对象”Mark word”的拷贝
  • 使用CAS操作将其余30bit更新为指向锁记录的指针
    • 这些动作成功了,改变标志位(00是轻量级锁),这个线程就拥有了该对象的锁
    • 如果失败了,虚拟机会检查对象的Mark word是否指向当前线程,如果是则说明当前线程已经获得锁,则直接进入同步块执行。否则这个锁对象已经被其他的线程抢占了。这时候轻量锁膨胀为重量锁,标志位改为10,Mark word指向重量锁,后面的线程进入阻塞状态。
  • 当执行完同步块,使用CAS操作将对象当前的Mark word与之前存储的老Mark word拷贝进行交换,完成解锁。

偏向锁

偏向锁的优化方向是在不存在竞争时直接去掉同步原语,当锁对象第一次被线程获取的时候,虚拟机会将标志位改为01,即偏向模式,同时使用CAS操作把获取到的锁的线程ID记录在Mark word之中,如果操作成功,这个线程就拥有了该对象的锁。之后当持有偏向锁的线程进入同步块的时候,虚拟机不需要做任何操作,而在轻量锁中,还是需要尝试检查锁定状态,以及对象的Mark word是否指向自己。当有其他线程尝试获取锁的时候,偏向锁就膨胀为轻量锁。偏向锁可以提高带有同步但无竞争的程序性能,但是如果程序中大多数的锁总是被多个不同的线程访问,那偏向模式就是多余的。可以通过-XX:-UseBiasedLocking来进行禁止。

a-14-1

End

参考资料:《深入理解Java虚拟机》

Read More
post @ 2019-09-28

于9月25日,参加云栖大会elasticsearch专场后对阿里云的优化思路进行整理总结。行文分为两个部分,一个部分是对原生elasticsearch的解决方案进行大致介绍,再对阿里云的优化进行整理。

原生架构

在elasticsearch当中index是大家最熟悉的组织结构,它就像传统nosql数据库中的collection,组织着一批有相同数据结构的数据。当一个index中的数据量越来越大,就会很自然的将index进行分片,分到多个shard当中。这样通过水平扩展就能解决数据膨胀的问题,然后同时也引入了副本来提高整个集群的可用性。所以有两个index,每个index有3个shard,每个shard有2个replica的集群就是下面这个样子👇:
a6-1
index A有3个shard:A1A2A3A1’A1的副本,以此类推。

在这样的架构中,如果想向index A中写入文档,会先通过路由算法分配到一个primary所在的节点(比如A1)进行写入,完成以后会到该primary的副本A1’中进行写入,完成以后再返回成功。可以看出这样的流程中有几个瓶颈所在:

  • 如果副本的数量一旦较多,写请求的延时就会成倍增长
  • 副本会保存一份完整的数据,所以集群的存储成本也在成倍地增长
  • 如果副本数量超过了分片数量,会出现有节点只是用来做副本的情况,只能被读,浪费了资源
  • 当主节点出现故障时,会从新在副本当中选举主节点,并新启动一个节点,从新的主节点当中全量拷贝数据。全量拷贝的延时较长,在这一段时间内新副本节点是不可用的,所有的流量会打到主节点和其他副本节点上,可能出现性能问题

优化架构

先来看阿里云的架构图👇:
a6-2
使用了计算与存储分离的架构,通过底层的云储存来提供共享存储。这样主副之间的数据只用存在一个地方即可,而节点上只提供计算能力,由云存储来提供数据的多副本机制以保证数据的可靠性。如果要完整的理解上面这张图,我们需要再介绍一些概念:

在elasticsearch中,一个shard是由多个segment组成的,并对应一个translog文件。那么当写入一个文档时,会先在translong中进行相应的记录,并将其写入到primary节点的buffer当中。然后在primary节点上是有一个每秒执行一次的refresh操作,该操作会把buffer当中的所有document写入到内存中的一个新的segment当中,这时候文档变成了可被检索的状态,所以一个segment实际就是一个倒排索引。最后会有一个flush操作,该操作也会把buffer中的数据写入到一个新的segment中,并进一步将内存中的所有segment冲洗到硬盘上,并清空该shard上的translog。

在对整个索引过程解释了一遍以后就可以清晰的理解上图,NFS表示阿里云提供的分布式云存储。当一个index请求进来的时候,primary节点会先在translog中添加一条记录,并将该文档写入buffer当中。每隔一段时间,primary节点会进行refresh操作将buffer中的所有数据写入到nfs的refresh segment中进行保存。然后会由 nrt segment synchronizer进程定期的复制refresh segment中的数据到临时目录当中。最后会由flush操作,将refresh segment中的数据写入到commit segment中,并删除refresh segment,临时目录,以及translog。而读请求如果从primary节点读会读到refresh segment + commit segment的内容,如果从replica中读会读到临时目录 + commit segment的内容。

这种架构解决了原生架构的一些问题:

  • 降低了写请求的延时,因为现在的refresh segment到临时目录的复制没有通过网络传输,直接是同一文件系统中的复制,所以大大降低了主备延时。(据阿里所说从分钟级降低到了毫秒级,但是原生架构也没有分钟级这么夸张吧…)
  • commit segment只会保存一份,所以不会因为副本数量过多而导致集群的存储成本上升
  • 因为计算与存储分离,当出现主节点故障需要主副切换时,不需要长时间的拷贝全量数据,一个新的副节点启动以后,只要指向原来的临时目录即可

但是也引入了一些新的问题:

  • 由于现在是共享内存,所以refresh操作多了传输成本,并且在NFS的速度也低于原本机内存中的速度,所以这里的成本有提高。不过应该是小于主从复制的性能提升。
  • 引入了一些传统共享内存型分布式的问题,比如脑裂,双写等。不过在PPT中也看到阿里也实现了IO fencing来避免主从切换时带来的双写问题。

下面再简单介绍一下演讲中讲到的主从复制的优化,下面是给出的示意图👇:
a6-3
阿里说是使用了luence-replicator框架进行实现,我现在还没仔细阅读,有兴趣的可以看看Code Reduction: The Replicator
所以我下面的解释主要基于他们的演讲以及一些猜测:
在主从复制过程当中,主节点会定时的对meta进行快照,比如生成了图中的snapshot4,然后对它增加引用计数,再发送给从节点。从节点会和自己的快照进行比对,找到落后了多少个版本。将缺少的segment复制到临时目录当中,复制完成以后就可以通知primary节点复制结束,从而减少同步前从节点快照版本的引用计数,删除引用计数为0的文件。

其中还提到了一些针对segment merge的优化,那么还是先介绍一些什么是segment merge:
因为每秒都会进行refresh操作,生成一个小的segment文件,这些小的文件对内存的利用率是非常低的,而且每次query请求来的时候都会轮训这些小的segment文件,所以文件数量越多性能越差。elasticsearch会在后台进行异步的合并操作,从小的segment合并成大的segment,并且在合并阶段处理文件的删除和更新。

那么优化的部分是什么呢?就是在合并成大的segment以后会立即进行flush操作,来保证大的segment不会出现在主从复制当中。从而进一步的对主从复制进行提速。

优化索引速度

离线

离线全量写入一直是一个痛点问题,传统解决办法是维护两个elasticsearch集群,正常使用A集群,然后收集A集群的translog,并将全量快照写入B集群,完成以后回放translog,保证两边数据一致。完成后从A集群切到B集群,这样的维护成本是非常高的,并且全量快照写入的时间又非常的长。

阿里云针对这个点进行的优化是取消了translog,使用blink checkpoint天生的at least once的语义来保证故障恢复,相当于减少了一半的写工作量。但是这样相当于在elasticsearch外面套上了一个blink,并且需要和外部的blink进行通信,系统复杂度又上升了一个等级(不过反正也是云服务,又不要我们自己做DBA)。第二个是在segment合并上增加了一个limit,只有合并后达到一定的大小才会flush到磁盘当中,这样可以减少磁盘里的小segment再被读入到内存中进行反复的合并,减小IO次数。

前两个我觉得都是较小的优化,第三个优化比较大,如下图👇:
a6-4
优化的核心点是利用更高的并发提前计算好索引,直接让线上的elasticsearch集群来load索引。具体怎么做的呢,就是利用blink的流计算能力来取代client node的计算数据分片能力,然后模拟process,build,merge三个操作并将其分布式化,让这三个操作都可以进行水平扩展,让索引能力可以随着计算资源的提升而提升。原来如果想提高索引的计算能力,是需要对elasticsearch集群的资源进行拓展,但是现在将索引计算的步骤分离了出来,转变成了一个经典分布式计算框架能解决的问题。最后放到OSS当中,让线上的模型进行load。类似的优化之前也有人已经尝试过,有兴趣可以看看基于Hadoop快速构建离线elasticsearch索引

在线

在线增量导入的优化也是将client node路由计算的能力放到了blink上,因为他们发现在大数据增量导入的时候,瓶颈出现在了CPU上,所以将计算部分外移,并且blink可以更好的进行针对计算的弹性升缩。

参考材料:
从Elasticsearch来看分布式系统架构设计 - 知乎
Elasticsearch写入原理深入详解 - 铭毅天下(公众号同名) - CSDN博客
Elasticsearch 之 commit point | Segment | refresh | flush 索引分片内部原理 - 舒哥的blog - CSDN博客

Read More

背景

最近迁移数据库的时候发现了一个问题,相信也是很多 MongoDB 使用者都会遇到的问题。我在使用 MongoSpark 批量的写入数据的时候会造成严重的数据库抖动。主要的原因是现在大多 MongoDB 的配置都是 replica set 模式,这样写操作只会到 Master 节点上,写操作就会成为整个系统的瓶颈,大量的写操作会使 Master 节点的读操作变慢,并且会让 secondary 节点同步速度变慢,从而出现从 secondary 节点上读到老数据的问题。

而通过对 MongoDB 进行 shard 是一个代价非常昂贵且只能线性提高写吞吐的方法,非常的不经济实惠。所以我们也只能牺牲速率来保证线上服务的稳定。所以解决方案就是在 MongoSpark 中添加上写限速功能。

Guava.RateLimiter

选择的限速器是 Guava 提供的令牌桶。算法的原理很简单,会定时的向桶中放置令牌,服务只有获得令牌以后才能进行后续的操作,比如希望对一个服务进行每秒10次的限速,那么每秒中就会向桶中放置10个令牌。而获取的方式有两种,一种是阻塞等待令牌或者取不到令牌直接返回失败。

那么下面就简单的介绍一下API,主要是分两个动作,创建桶和获取令牌:

创建令牌桶

  • RateLimiter create(double permitsPerSecond): 根据一个稳定的速率创建令牌桶,也就是每秒插入 permitsPerSecond 个令牌。
  • RateLimiter create(double permitsPerSecond, long warmupPeriod, TimeUnit unit): 通过一个预热时间段达到稳定的速率创建令牌桶,在 warmupPeriod 这段时间内每秒令牌数量平稳的爬升至 permitsPerSecond,而后保持稳定。这里的速率也是值每秒插入 permitsPerSecond 个令牌。

获取令牌

  • void acquire(): 获取一个令牌,会一直阻塞等待直到拿到。
  • void acquire(int permits): 获取 permits 个令牌,会一直阻塞等到直到全部拿到。
  • boolean tryAcquire(): 获取一个令牌,成功则返回 true,失败直接返回 false。
  • boolean tryAcquire(long timeout, TimeUnit unit): 获取一个令牌,成功则返回 true,没有足够的令牌时等到 timeout 时间,如果还没有则返回 false。
  • boolean tryAcquire(int permits): 获取 permits 个令牌,成功则返回 true,失败直接返回 false。
  • boolean tryAcquire(int permits, long timeout, TimeUnit unit): 获取 permits 个令牌,成功则返回 true,没有足够的令牌时等到 timeout 时间,如果还没有则返回 false。

通过上面的API可以很灵活的对 RateLimiter 进行使用,但是在阅读源码的过程中,我也发现了 Guava 的实现中有一个不尽如人意的地方。那就是限制能力只能在每秒多少个令牌桶,但是我想实现将少一秒的剩余令牌留给下一秒继续用,也就是几秒甚至更高时间单位的限流是不行的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private static class Bursty extends RateLimiter {
Bursty(RateLimiter.SleepingTicker ticker) {
super(ticker, null);
}

void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
double oldMaxPermits = this.maxPermits;
this.maxPermits = permitsPerSecond;
this.storedPermits = oldMaxPermits == 0.0D ? 0.0D : this.storedPermits * this.maxPermits / oldMaxPermits;
}

long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
return 0L;
}
}

在上面的代码中,Bursty 就是始终保持平稳速率的令牌桶类,其中 maxPermits 是桶中最多有多少的令牌,storedPermits 是现在桶中的令牌个数。可以看到maxPermits 始终等于 permitsPerSecond,是不能乘上时间系数的,并且在重新设置 maxPermits 后会按照比例缩放之前的桶中令牌数量。

MongoSpark.save

save() 方法是我们需要修改的主要方法,但是在 MongoSpark 中又存在多种的 save 方法,我们需要分别为这些 save 方法加上限流功能,或者你已经很明确将使用的函数。

在这之前,我们需要做一些准备工作,既然要限流,那我们肯定需要一个参数来控制流速,而在 MongoSpark 中是有一个配置类供我们设置参数的,我们需要修改一下 WriteConfig 这个类。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
/**
* Write Configuration for writes to MongoDB
*
* @param databaseName the database name
* @param collectionName the collection name
* @param connectionString the optional connection string used in the creation of this configuration.
* @param replaceDocument replaces the whole document, when saving a Dataset that contains an `_id` field.
* If false only updates / sets the fields declared in the Dataset.
* @param maxBatchSize the maxBatchSize when performing a bulk update/insert. Defaults to 512.
* @param localThreshold the local threshold in milliseconds used when choosing among multiple MongoDB servers to send a request.
* Only servers whose ping time is less than or equal to the server with the fastest ping time plus the local
* threshold will be chosen.
* @param writeConcernConfig the write concern configuration
* @param shardKey an optional shardKey in extended json form: `"{key: 1, key2: 1}"`. Used when upserting DataSets in sharded clusters.
* @param forceInsert if true forces the writes to be inserts, even if a Dataset contains an `_id` field. Default `false`.
* @param ordered configures the bulk operation ordered property. Defaults to true.
* @param secondLatch the maxBatchSize when performing a bulk update/insert per second per partition.
* @since 1.0
*/
case class WriteConfig(
databaseName: String,
collectionName: String,
connectionString: Option[String] = None,
replaceDocument: Boolean = WriteConfig.DefaultReplaceDocument,
maxBatchSize: Int = WriteConfig.DefaultMaxBatchSize,
localThreshold: Int = MongoSharedConfig.DefaultLocalThreshold,
writeConcernConfig: WriteConcernConfig = WriteConcernConfig.Default,
shardKey: Option[String] = None,
forceInsert: Boolean = WriteConfig.DefaultForceInsert,
ordered: Boolean = WriteConfig.DefautOrdered,
secondLatch: Option[Int] = None
) extends MongoCollectionConfig with MongoClassConfig {
require(maxBatchSize >= 1, s"maxBatchSize ($maxBatchSize) must be greater or equal to 1")
require(localThreshold >= 0, s"localThreshold ($localThreshold) must be greater or equal to 0")
require(Try(connectionString.map(uri => new ConnectionString(uri))).isSuccess, s"Invalid uri: '${connectionString.get}'")
require(Try(shardKey.map(json => BsonDocument.parse(json))).isSuccess, s"Invalid shardKey: '${shardKey.get}'")

type Self = WriteConfig

override def withOption(key: String, value: String): WriteConfig = WriteConfig(this.asOptions + (key -> value))

override def withOptions(options: collection.Map[String, String]): WriteConfig = WriteConfig(options, Some(this))

override def asOptions: collection.Map[String, String] = {
val options = mutable.Map("database" -> databaseName, "collection" -> collectionName,
WriteConfig.replaceDocumentProperty -> replaceDocument.toString,
WriteConfig.localThresholdProperty -> localThreshold.toString,
WriteConfig.forceInsertProperty -> forceInsert.toString) ++ writeConcernConfig.asOptions
connectionString.map(uri => options += (WriteConfig.mongoURIProperty -> uri))
shardKey.map(json => options += (WriteConfig.shardKeyProperty -> json))
secondLatch.map(number => options += (WriteConfig.secondLatchProperty -> number.toString))
options.toMap
}

override def withOptions(options: util.Map[String, String]): WriteConfig = withOptions(options.asScala)

override def asJavaOptions: util.Map[String, String] = asOptions.asJava

/**
* The `WriteConcern` that this config represents
*
* @return the WriteConcern
*/
def writeConcern: WriteConcern = writeConcernConfig.writeConcern
}

这是修改以后的 WriteConfig 类代码,我们添加上了一个 secondLatch 的参数作为流速控制参数。在使用的时候可以:

1
2
3
4
val writeConfig = new WriteConfig(conf.mongoDatabase, conf.mongoCollection)
.withOption(WriteConfig.replaceDocumentProperty, conf.replaceDocument.toString)
.withOption(WriteConfig.mongoURIProperty, conf.mongoUri)
.withOption(WriteConfig.secondLatchProperty, conf.secondLatch.toString)

通过 withOption() 的方法设定 secondLatch 参数,然后我们跟踪一下上面的 withOption() 方法的实现,是通过一个 WriteConfig(options: util.Map[String, String]) 的构造函数进行了构造。所以也需要修改这个函数的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
override def apply(options: collection.Map[String, String], default: Option[WriteConfig]): WriteConfig = {
val cleanedOptions = stripPrefix(options)
val cachedConnectionString = connectionString(cleanedOptions)
val defaultDatabase = default.map(conf => conf.databaseName).orElse(Option(cachedConnectionString.getDatabase))
val defaultCollection = default.map(conf => conf.collectionName).orElse(Option(cachedConnectionString.getCollection))

WriteConfig(
databaseName = databaseName(databaseNameProperty, cleanedOptions, defaultDatabase),
collectionName = collectionName(collectionNameProperty, cleanedOptions, defaultCollection),
connectionString = cleanedOptions.get(mongoURIProperty).orElse(default.flatMap(conf => conf.connectionString)),
replaceDocument = getBoolean(cleanedOptions.get(replaceDocumentProperty), default.map(conf => conf.replaceDocument),
defaultValue = DefaultReplaceDocument),
maxBatchSize = getInt(cleanedOptions.get(maxBatchSizeProperty), default.map(conf => conf.maxBatchSize),
DefaultMaxBatchSize),
localThreshold = getInt(cleanedOptions.get(localThresholdProperty), default.map(conf => conf.localThreshold),
MongoSharedConfig.DefaultLocalThreshold),
writeConcernConfig = WriteConcernConfig(cleanedOptions, default.map(writeConf => writeConf.writeConcernConfig)),
shardKey = cleanedOptions.get(shardKeyProperty).orElse(default.flatMap(conf => conf.shardKey).orElse(None)),
forceInsert = getBoolean(cleanedOptions.get(forceInsertProperty), default.map(conf => conf.forceInsert),
defaultValue = DefaultForceInsert),
ordered = getBoolean(cleanedOptions.get(orderedProperty), default.map(conf => conf.ordered), DefautOrdered),
// 流速控制参数
secondLatch = cleanedOptions
.get(secondLatchProperty).orElse(default.flatMap(conf => Try(conf.secondLatch.toString).toOption).orElse(None))
.flatMap(s => Try(s.toInt).toOption)
)
}

可以看到这个构造函数调用的是一个要传递所有参数的构造函数进行构造的,所以我们还需要实现这样一个传递所有参数的构造函数,然后加上:

1
2
3
secondLatch = cleanedOptions
.get(secondLatchProperty).orElse(default.flatMap(conf => Try(conf.secondLatch.toString).toOption).orElse(None))
.flatMap(s => Try(s.toInt).toOption)

由于 options 的 value 都是字符串,所以这边需要转一下 Int,传递所有参数的构造函数如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
* Creates a WriteConfig
*
* @param databaseName the database name
* @param collectionName the collection name
* @param connectionString the optional connection string used in the creation of this configuration
* @param replaceDocument replaces the whole document, when saving a Dataset that contains an `_id` field.
* If false only updates / sets the fields declared in the Dataset.
* @param maxBatchSize the maxBatchSize when performing a bulk update/insert. Defaults to 512.
* @param localThreshold the local threshold in milliseconds used when choosing among multiple MongoDB servers to send a request.
* Only servers whose ping time is less than or equal to the server with the fastest ping time plus the local
* threshold will be chosen.
* @param writeConcern the WriteConcern to use
* @param shardKey an optional shardKey in extended form: `"{key: 1, key2: 1}"`. Used when upserting DataSets in sharded clusters.
* @param forceInsert if true forces the writes to be inserts, even if a Dataset contains an `_id` field. Default `false`.
* @param ordered configures if the bulk operation is ordered property.
* @param secondLatch the maxBatchSize when performing a bulk update/insert per second per partition.
* @return the write config
* @since jike-1.0.0
*/
def apply(databaseName: String, collectionName: String, connectionString: Option[String], replaceDocument: Boolean, maxBatchSize: Int,
localThreshold: Int, writeConcern: WriteConcern, shardKey: Option[String], forceInsert: Boolean, ordered: Boolean, secondLatch: Option[Int]): WriteConfig = {
apply(databaseName, collectionName, connectionString, replaceDocument, maxBatchSize, localThreshold, WriteConcernConfig(writeConcern),
shardKey, forceInsert, ordered, secondLatch)
}

使用 case class 提供的默认构造函数进行构造,至此我们就能愉快的对包含 secondLatch 字段的 WriteConfig 进行构造了。

Save Method

然后我们就需要分别对多个 save() 添加限速器,原理都大同小异,就是在 foreachPartition 的函数中构建令牌桶,然后在 foreach 的写Mongodb 函数之前进行阻塞的令牌获取,这里就展示常用的 Datasets 和 RDD 类型 save 方法的修改:

save[D] (dataset: Dataset[D]): Unit

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
/**
* Save data to MongoDB
*
* '''Note:''' If the dataFrame contains an `_id` field the data will upserted and replace any existing documents in the collection.
*
* @param dataset the dataset to save to MongoDB
* @param writeConfig the writeConfig
* @tparam D
* @since 1.1.0
*/
def save[D](dataset: Dataset[D], writeConfig: WriteConfig): Unit = {
val mongoConnector = MongoConnector(writeConfig.asOptions)
val dataSet = dataset.toDF()
val mapper = rowToDocumentMapper(dataSet.schema)
val documentRdd: RDD[BsonDocument] = dataSet.rdd.map(row => mapper(row))
val fieldNames = dataset.schema.fieldNames.toList
val queryKeyList = BsonDocument.parse(writeConfig.shardKey.getOrElse("{_id: 1}")).keySet().asScala.toList

if (writeConfig.forceInsert || !queryKeyList.forall(fieldNames.contains(_))) {
MongoSpark.save(documentRdd, writeConfig)
} else {
documentRdd.foreachPartition(iter => if (iter.nonEmpty) {
// **INIT RateLimiter
var rateLimiter: Option[RateLimiter] = None
if (writeConfig.secondLatch.isDefined) {
// If secondLatch < maxBatchSize, it will destroy rate limit rule.
val permitSize = if (writeConfig.secondLatch.get >= writeConfig.maxBatchSize) (writeConfig.secondLatch.get / writeConfig.maxBatchSize).floor else 1
rateLimiter = Option.apply(RateLimiter.create(permitSize))
}
mongoConnector.withCollectionDo(writeConfig, { collection: MongoCollection[BsonDocument] =>
iter.grouped(writeConfig.maxBatchSize).foreach(batch => {
// **Acquire
if (rateLimiter.isDefined) {
rateLimiter.get.acquire(1)
}
val requests = batch.map(doc =>
if (queryKeyList.forall(doc.containsKey(_))) {
val queryDocument = new BsonDocument()
queryKeyList.foreach(key => queryDocument.append(key, doc.get(key)))
if (writeConfig.replaceDocument) {
new ReplaceOneModel[BsonDocument](queryDocument, doc, new ReplaceOptions().upsert(true))
} else {
queryDocument.keySet().asScala.foreach(doc.remove(_))
new UpdateOneModel[BsonDocument](queryDocument, new BsonDocument("$set", doc), new UpdateOptions().upsert(true))
}
} else {
new InsertOneModel[BsonDocument](doc)
})
collection.bulkWrite(requests.toList.asJava, new BulkWriteOptions().ordered(writeConfig.ordered))
})
})
})
}
}

save[D: ClassTag] (rdd: RDD[D]): Unit

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
* Save data to MongoDB
*
* @param rdd the RDD data to save to MongoDB
* @param writeConfig the writeConfig
* @tparam D the type of the data in the RDD
*/
def save[D: ClassTag](rdd: RDD[D], writeConfig: WriteConfig): Unit = {
val mongoConnector = MongoConnector(writeConfig.asOptions)
rdd.foreachPartition(iter => if (iter.nonEmpty) {
// **INIT RateLimiter
var rateLimiter: Option[RateLimiter] = None
if (writeConfig.secondLatch.isDefined) {
// If secondLatch < maxBatchSize, it will destroy rate limit rule.
val permitSize = if (writeConfig.secondLatch.get >= writeConfig.maxBatchSize) (writeConfig.secondLatch.get / writeConfig.maxBatchSize).floor else 1
rateLimiter = Option.apply(RateLimiter.create(permitSize))
}
mongoConnector.withCollectionDo(writeConfig, { collection: MongoCollection[D] =>
iter.grouped(writeConfig.maxBatchSize).foreach(batch => {
// **Acquire
if (rateLimiter.isDefined) {
rateLimiter.get.acquire(1)
}
collection.insertMany(
batch.toList.asJava,
new InsertManyOptions().ordered(writeConfig.ordered)
)
})
})
})
}

需要注意的是不要在 Driver 上去初始化 RateLimiter,还要注意与 WriteConfig 中已存在的 maxBatchSize 参数的关系。

到这里就能愉快的进行限速了。

Read More
post @ 2019-04-06

写在前面

最近在学习 Druid,索性整理了一下从 Google 的 BigTable 论文衍生出的这一系列数据产品架构思路。该文章中通过对 BigTable 文章的介绍到 Hbase 再到 Druid,读者会发现这三者非常的相似,但是又由于不同的使用场景而做了其他的优化。由于作者也是初学者,所以多是对网上文章的整理汇总,也算初有脉络,但是总的来说还是一篇纸上谈兵的文章,仅作为对学习的记录。之后应该会在分布式系统上有更多的实战文章。

BigTable

Bigtable 是一个分布式的结构化数据存储系统,它被设计用来处理海量数据。

这是论文开篇的概括,也作为这篇博客的开始吧。

数据模型

1
(row: string, column: string, time: int64) -> string

a-13-1

整个 BigTable 的数据模型可以看作是一个很简单的映射,分别是 row,cloumn,time 到一个具体数据的映射。如上图是一个简单的例子,行名是一个反向 URL。contents 列族存放的是网页的内容,anchor 列族存放引用该网页的锚链接文本。 CNN 的主页被 Sports Illustrator 和 MY-look 的主页引用,因此该行包含了名为 anchor:cnnsi.comanchhor:my.look.ca 的列。每个锚链接只有一个版本,而 contents 列则有三个版本,分别由时间戳 t3,t5 和 t6 标识,最新的时间戳会放在最前面。

架构组件

这整个系统当中主要有三部分的组件,分别是 Master 服务器,Tablet 服务器和 Chubby 服务器构成。在后面对其他框架的介绍当中,会发现基础架构有很高的相似性。

Master 服务器(控制者)

主要工作:

  • 为 Tablet 服务器分配 Tablets。
  • 检测新加入的或者过期失效的 Tablet 服务器。
  • 对 Tablet 服务器进行负载均衡、以及对保存在 GFS 上的文件进行垃圾收集。
  • 除此之外,它还处理对模 式的相关修改操作,例如建立表和列族。

Tablet 服务器(工作者)

主要工作:管理一个 Tablet 的集合(通常每个服务器有大约数十个至上千个 Tablet)。每个 Tablet 服务器负责处理它所加载的 Tablet 的读写操作,以及在 Tablets 过大时,对其进行分割。

Chubby(协调者)

在说 Chubby 之前想插入一个小话题,到底是什么是 分布式一致性问题(Distributed consensus problem) 。其实在接触分布式系统开始这个问题基本上在每一篇分布式系统的文章中都会被提及,但是在感性的认识下,我发现自己很难用简洁的语言进行概括,所以去查阅了一些文章,有一段是我很认同的简介描述:

在一个分布式系统中,有一组的 Process,他们需要确定一个 Value。于是每个 Process 都提出一个 Value, 一致性就是指只有其中的一个 Value 能够被选中作为最后确定的值,并且当这个值被选出来后,所有的 Process 都需要被通知到。

表面上看,这个问题很容易解决。比如设置一个server,所有的process都 向这个server提交一个Value,这个server可以通过一个简单的规则来挑选出一个Value(例如最先到达的Value被选中),然后由这个 server通知所有的Process。但是在分布式系统中,就会有各种的问题发生,例如,这个server崩溃了怎么办,所以我们可能需要有几台 server共同决定。还有,Process提交Value的时间都不一样,网络传输过程中由于延迟这些Value到达server的顺序也都没有保证。

有一个很具象的例子:

在Google File System(GFS) 中,有很多的server,这些server需要选举其中的一台作为master server。这其实是一个很典型的consensus问题,Value就是master server的地址。GFS就是用Chubby来解决的这个问题,所有的server通过Chubby提供的通信协议到Chubby server上创建同一个文件,当然,最终只有一个server能够获准创建这个文件,这个server就成为了master,它会在这个文件中写入自己 的地址,这样其它的server通过读取这个文件就能知道被选出的master的地址。

对应于 Chubby 就会有一个众所周知的开源项目 Zookeeper,两者从作用到架构上都非常的相似。但是又存在一些差别,文章后面会简单叙述一下两者的不同,不过可以类比 Zookeeper 来理解 Chubby

前面说了这么多,那 Chubby 到底是一个什么服务。首先它是一个分布式的文件系统,可以提供机制使得 client 可以在 Chubby service 上创建文件和执行一些文件的基本操作。从更高一点的语义层面上。Chubby 是一个分布式的锁系统,“锁” 就是文件,加锁操作就是创建文件成功的那个 server 抢占到了 “锁”。用户通过打开、关闭和读取文件,获取共享锁或者独占锁;并且通过通信机制,向用户发送更新信息。

Tablet(数据聚合单位)

INDEX

a-13-2

Index 是一个三层的B树,由于 Root tablet 不会分裂,所以永远是三层。真正的 Tablet 位置信息存储在第三层每一个行关键字下,而第二层只是对第三层的索引。Root tablet 储存在 Chubby 中。在客户端会缓存 Tablet 的位置信息,如果客户端没有缓存或者发现它的缓存地址不正确,就在树状的存储结构中递归的查询 Tablet 位置信息。如果客户端缓存是空的,那么需要三次寻址。如果是过期了则需要6次,3次是在缓存中寻找,3次是更新缓存。

分配

在任何一个时刻,一个Tablet 只能分配给一个Tablet服务器。Master服务器记录了当前有哪些活跃的 Tablet 服务器、哪些 Tablet 分配给了哪些 Tablet 服务器、哪些 Tablet 还没有被分配。当一个 Tablet 还没有被分配、 并且刚好有一个 Tablet 服务器有足够的空闲空间装载该 Tablet 时,Master 服务器会给这个 Tablet 服务器发送一个装载请求,把 Tablet 分配给这个服务器。

BigTable 使用 Chubby 跟踪记录 Tablet 服务器的状态。当一个 Tablet 服务器启动时,它在 Chubby 的一个 指定目录下建立一个有唯一性名字的文件,并且获取该文件的独占锁。Master 服务器实时监控着这个目录(服务器目录),因此 Master 服务器能够知道有新的 Tablet 服务器加入了。如果 Tablet 服务器丢失了 Chubby 上的独占锁,比如由于网络断开导致 Tablet 服务器和 Chubby 的会话丢失 — 它就停止对 Tablet 提供服务。(Chubby 提供了一种高效的机制,利用这种机制,Tablet 服务器能够在不增加网络负担的情况下知道它是否 还持有锁)。只要文件还存在,Tablet 服务器就会试图重新获得对该文件的独占锁;如果文件不存在了,那么 Tablet 服务器就不能再提供服务了,它会自行退出。

冲写

在 BigTable 中一次写操作会先被记录在日志文件当中,然后被记入 memtable 中。这里 memtable 实际上就是一个写缓存,随着写操作的执行,memtable 的大小不断增加。当 memtable 的尺寸到达一个门限值的时候,这个 memtable 就会被冻结,然后创建一个新的 memtable;被冻结住 memtable 会被转换成 SSTable,然后写入 GFS。这个过程被称为 Minor Compaction, 它有两个目的:收缩 Tablet 服务器使用的内存,以及在服务器灾难恢复过程中,减少必须从 提交日志里读取的数据量。在 Compaction 过程中,正在进行的读写操作仍能继续。

而每一次 Minor Compaction 都会创建一个新的 SSTable。如果 Minor Compaction 过程不停滞的持续进行下去,读操作可能需要合并来自多个 SSTable 的更新。通过定期在后台执行 Tablet 的合并,来限制这类文件的数量,加速存储使用率和读速度,这个过程被称为 Merging Compaction

这两个步骤在 BIGTABLE 系统中非常的重要,这也是在读写分离架构中,得以优化读写效率的根本,在后面其他系统的设计当中能也能看到类似的优化设计。

存储结构

Tablet 是逻辑层面的存储单元,而实际的存储单元是 SSTable,SSTable 是一个持久化的、排序的、不可更改的 Map 结构,而 Map 是一个 key-value 映射的数据结构,key 和 value 的值都是任意的 Byte 串。可以对 SSTable 进行如下的操作:查询与一个 key 值相关的 value,或者遍历某个 key 值范围内的所有的 key-value 对。从内 部看,SSTable 是一系列的数据块(通常每个块的大小是 64KB,这个大小是可以配置的)。SSTable 使用块索 引(通常存储在 SSTable 的最后)来定位数据块;在打开 SSTable 的时候,索引被加载到内存,来提高查询、读取效率。

HBase

HMaster

没有单点故障问题,启动多个 HMaster,通过 Zookeeper 的 Master Election 机制保证同时只有一个 HMaster 处于 Active,其他处于热备份的状态,定期从 Active 的 Master 同步其最新状态。

主要作用:

  • 管理 HReigonServer,实现其负载均衡
    • 启动时HRegion的分配,以及负载均衡和修复时 HRegion 的重新分配
    • 监控集群中所有 HRegionServer 的状态(通过 Heartbeat 和监听 Zookeeper 中的状态)
  • 管理和分配 HRegion
  • 实现 DDL 操作 (Data Defintion Language, namespace 和 table 的增删改,column family的增删改等)
  • 管理 namespace 和 table 的元数据
  • 权限控制

HRegionServer(工作者)

HRegionServer一般和DataNode在同一台机器上运行,实现数据的本地性。

存放和管理本地HRegion。
读写HDFS,管理Table中的数据。
Client直接通过HRegionServer读写数据
可以发现 HRegionServer 的作用与 BigTable 中 Tablet 服务器的作用几乎一摸一样,下面介绍更多它内部的机制:

WAL(WRITE AHEAD LOG)

所有的写操作都会保证将数据写入 LOG 文件以后,才会真正更新 MemStore(写缓存),最后写入 HFile 中。采用这种模式,可以保证HRegionServer宕机后,我们依然可以从该Log文件中读取数据,Replay所有的操作,而不至于数据丢失。这个Log文件会定期Roll出新的文件而删除旧的文件(那些已持久化到HFile中的Log可以删除)。

BLOCKCACHE(读缓存)

基于分空间局部性和时间局部性原理,将数据预读取到内存中,以提升读性能。HBase 提供了两种 BlockCache 的实现,默认是 on-heap LRUBlockCache 和 BucketCache(off-heap)。通常BucketCache的性能要差于LruBlockCache,然而由于GC的影响,LruBlockCache的延迟会变的不稳定,而BucketCache由于是自己管理BlockCache,而不需要GC,因而它的延迟通常比较稳定,这也是有些时候需要选用BucketCache的原因。在 BlockCache 101 - Nick Dimiduk 中更加详细的对比。

HSTORE(最小单位)

a-13-3

一个Table可以有一个或多个Region,他们可以在一个相同的HRegionServer上,也可以分布在不同的HRegionServer上,一个HRegionServer可以有多个HRegion,他们分别属于不同的Table。HRegion由多个Store(HStore)构成,每个HStore对应了一个Table在这个HRegion中的一个Column Family,即每个Column Family就是一个集中的存储单元,因而最好将具有相近IO特性的Column存储在一个Column Famil。

MEMSTORE(写缓存)

所有数据的写在完成WAL日志写后,会 写入MemStore中,由MemStore根据一定的算法将数据Flush到地层HDFS文件中(HFile),通常每个HRegion中的每个 Column Family有一个自己的MemStore。

  • 每一次Put/Delete请求都是先写入到MemStore中,当MemStore满后会Flush成一个新的StoreFile(底层实现是HFile),即一个HStore(Column Family)可以有0个或多个StoreFile(HFile)。有以下三种情况可以触发MemStore的Flush动作,需要注意的是MEMSTORE的最小FLUSH单元是HREGION而不是单个MEMSTORE。
  • 当一个HRegion中的所有MemStore的大小总和超过了size。当前 HRegion 下的所有 MemStore 进行 flush。
  • 当全局MemStore的大小总和超过了size,当前 HRegionServer 下的所有 MemStore 进行 flush.
  • 当前HRegionServer中WAL的大小超过了size,当前 HRegionServer 下的所有 MemStore 进行 flush。依照时间先后顺序,直到 WAL 少于 size。

HFILE

用于存储HBase的数据。

Zookeeper

在产品定位上,Zookeeper 是一个 Distributed process coordinator 而 Chubby 是一个 Distributed lock service。而高一致性与它的使用场景密切相关,由于 Chubby 主要处理少量的写更多的读操作的场景,提供粗力度的锁,所以是需要缓存的。而如果需要进行一次数据更新,Chubby 会先保证所有的缓存都进行更新以后才宣布这次更新完成。而这样的缓存机制在 Zookeeper 中是不存在的,所以当你在 Zookeeper 中进行数据更新,更新作用到 A 副本上,但是没来得及同步到 B 副本上是,就会出现 AB 副本数据不一致的情况。

而开源有开源的好处,在 Zookeeper 客户端上有 Apache 项目 Curator 进行了高度的封装,提供了这样的缓存机制,提升了 Zookeeper 的一致性等级。所以使用 Zookeeper 加上 Curator 的话是等于 Chubby 的。

下面再介绍一下它是如何在 HBase 中的作用:

  • 在HMaster和HRegionServer连接到ZooKeeper后创建Ephemeral节点,并使用Heartbeat机制维持这个节点的存活状态,如果某个Ephemeral节点实效,则HMaster会收到通知,并做相应的处理。
    协调多个热备份的 HMaster 节点,通过监听 /hbase/master 来确认 Active Master 节点的状态,如果节点消失,则得到通知,并将自己转为 Activer HMaster。
  • Hbase 使用 RowKey 将表水平切割成多个 HRegion,每个 HRegion 都记录了它的 StartKey 和 EndKey,且有序。HRegion 由 HMaster 分配到相应的 HRegionServer 中,然后由 HRegionServer 复杂 HRegion 的启动和管理,和 Client 的通信,负责数据的读(使用 HDFS)。每个 HRegionServer 可以同时管理1000个左右的 HRegion。现在很多的分布式存储都使用这种横向切割,列式存储的方式。

HRegion(数据聚合单位)

Hbase 使用 RowKey 将表水平切割成多个 HRegion,每个 HRegion 都记录了它的 StartKey 和 EndKey,且有序。HRegion 由 HMaster 分配到相应的 HRegionServer 中,然后由 HRegionServer 复杂 HRegion 的启动和管理,和 Client 的通信,负责数据的读(使用 HDFS)。每个 HRegionServer 可以同时管理1000个左右的 HRegion。现在很多的分布式存储都使用这种横向切割,列式存储的方式。

INDEX

a-13-4

可以发现这张图和 BigTable 的 Tablet 索引图几乎一摸一样,不过有趣的是在 0.96 以后,HBase 觉得自己不需要这么大的地址空间,并以每次查询多一次寻址的代价。所以改为了两层结构,并且也保证了 META Table 像之前的 Root table 一样是不可切割的,保证这棵 B 树永远只有两层结构。同样提供客户端缓存。

Druid

底层模型的演进

在中插一个小话题,说说数据存储系统底层模型的一个演进之路。从最开始的平衡树到 B+ 树再到 LSM树:

  • 平衡树缺点:树高为 log2(N),对于索引树来说树高越高,意味着查找所要的花费的访问次数越多,查询效率越低。(常数代价高)况且主存从磁盘读数据一般以页为单位,每次访问磁盘读取多个扇区的数据(大约4kb),远大于单个二叉树节点的值,造成了不必要的查询浪费。

  • B+ tree 缺点:叶子节点慢慢分裂,可能导致逻辑上原本连续的数据实际上存放在不同的物理磁盘块位置上,在做范围查询的时候会导致较高的 IO,影响性能。

  • LSM-tree 特点:在磁盘的访问中,顺序访问的速度是远大于随机访问的速度。而 LSM-tree 正是顺应了这个特点,保证数据的有序性以将一个请求转化为顺序的磁盘访问。在 LSM-tree 中同时使用两部分类树的结构来存储数据,并同时提供查询。其中一部分数据存放在内存中,负责接受新的数据插入更新以及读请求,并直接在内存中对数据进行排序。另外一部分存放在硬盘上,它们是由存放在内存中的 c0 树冲写到磁盘而成,主要提供读,特点是有序且不可别更改。再通过 WAL 原则来容灾恢复,使用 bloom filter 来快速判断数据的存在性。而其更适合插入操作远多于数据更新删除操作与读操作的场景。

Druid 在架构上借鉴了 LSM-tree 的思想,但是也因为使用场景的关系有一些取舍。由于不支持数据修改,所以直接去掉了 WAL 原则。数据直接进入内存的堆区,到达条件后冲写到硬盘上形成一个数据块,同时实时节点又会立即将新生成的数据块加载到非堆区。会周期性的堆 Segment split 进行合并,合并好的会立即被实时节点上传到数据文件存储库中,随后协调节点会指导一个历史节点去文件存储库,将新生成的 Segment 下载到其本地磁盘中。当历史节点成功加载到 Segment 后,会通过协调服务在集群中声明其从此刻开始负责提供该 Segment 的查询。实时节点收到该声明后就不再提供 Segment 的查询服务。

系统结构

实时节点(工作者 内存读写)

主要负责即时摄入实时数据,以及生成 Segment 数据文件。如 Kafka 的消费,可以开多个节点对多个 Partition 进行同时消费,在 Zookeeper 上记录 partition offset,保证 at least one。

a-13-5

上图是实时节点的数据流图,可以看到实时数据会先被写入堆内存当中。在写到一定数量以后,会形成一个 Segement split 持久化保存到硬盘中,且堆内存能保证 Segement split 内部的有序性。而一个 Segement split 马上会被从硬盘里读取到非堆内存中,而此时堆内存中相同的数据会被清理掉。一个查询请求会同时到堆内存和非堆内存里查询数据再进行拼接以后进行返回。

协调节点(历史节点的控制者)

负责历史节点的数据负载均衡,以及通过规则管理数据的生命周期。Druid 通过对每个 DataSource 设置的规则来加载或丢弃具体的数据文件,以管理数据生命周期。在历史节点推出系统的时候,协调节点还没有把其身上携带的 Segment 负载到其他节点身上的时候,会出现短暂的数据无法访问。而Druid 允许通过 创建 Segment 的副本来解决该问题。

历史节点(工作者 读)

历史节点负责加载已经生成好的数据文件以提供查询,并且由协调节点来进行负载均衡。历史节点在启动的时候,首先检查自己的本地缓存中已经存在的 Segment 数据文件,然后从 DeepStorage 中下载属于自己但目前不在自己本地磁盘的 Segment 数据。无论是何种查询,历史节点都会将相关的 Segment 先加入到自己的内存中,然后提供查询服务。

历史节点的查询速度与其内存空间大小和所负责的Segment 数据文件大小之比成正比。Druid 对历史节点进行分层,可以根据数据温度来协调数据的存储位置。

通过 Zookeeper 来协调高可用和高拓展性,新的历史节点被添加后,会通过 Zookeeper 被协调节点发现,然后协调节点将会自动分配相关的 Segment 给它。原有的历史节点被移除的时候,同样会被协调节点发现。

查询节点(工作者)

对外提供查询服务,从历史节点和事实节点查询数据,合并后回传。提供缓存机制,使用多个查询节点来防止单点故障,使用 Nginx 来做负载均衡。保证每个查询节点在对同一个请求相应的时候返回相同的结果。

Segment(数据聚合单位)

a-13-6

总览

a-13-7

实线是数据请求的流向,虚线是实时数据的流向。

实时数据会先进入实时节点的堆内存,在堆内存的大小达到一定的数量以后,会形成 Segment split 冲写到硬盘里,同时这个 Segement split 会被加载到非堆内存中以供访问。并且会提供周期性的 Merging Compaction,由几个小的 Segment split 合成一个 Segement,并存入到存储节点中。存储完成以后会告之协调节点,由协调节点进行负载均衡,决定把这个 Segement 交给一个历史节点,之后这个历史节点会声明对这个 Segement 的查询提供服务,而实时节点收到这个消息以后就不再对这个 Segement 的查询提供服务了,并清理掉相关数据。

一个请求进入系统以后会先到查询节点,查询节点再通过现在各节点对数据的负责情况,分别到实时节点和历史节点上进行查询,最后将结果合并进行返回。并且在查询节点上会提供各等级的缓存,历史节点则提供 Block cache。

End

相信你在阅读以后能明显的感受到三个系统的相似与区别,并对这种分布式架构有个大体上的理解。本篇博客大部分内容都是对现有文章的整理,汇总。所以最后感谢这些文章及其作者:

Read More
post @ 2019-03-17

描述

最近一次在实现需求的时候发现 Storm 中的一个 Bolt 出现了 OOM 导致的长时间 GC 问题。最后虽然通过 review 新更新的代码找到了问题,但是深究其中还是有一些别的收获,所以在这里进行记录。

在 review 新更新的代码之后发现,我将 JedisPool 的实例化写到了 execute 中而不是 prepare 中,所以 Storm 每次执行 execute 的时候都会重新实例化 JedisPool 并且也没有显式的进行 close

虽然这个问题只是因为疏忽导致的,但是也让我对两个大问题进行了思考。一个是对于 Storm 中资源冲突的问题应该如何去发现、定位、处理,第二个是 Storm 中 Component 的生命周期。下面会讨论这两个问题。

Storm中的资源冲突

a4-1

要解决 Storm 中的资源冲突,那么需要先了解 Storm 中的资源分配。一个集群由一个 nimbus 节点和多个工作节点组成,每个工作节点由一个 Supervisor 管理着多个 Worker process,每个 Worker process 对应着一个 JVM,在其中有多个 Executor thread。每个 Executor thread 中可能存在多个 Task。而 Task 则是一个 bolt 或者 spout 的实例。

在此基础上,可以把资源竞争从所属结构从小到大划分为:

  • Worker process 中的内存冲突
  • Worker process 的冲突
  • 工作节点上的内存冲突
  • 工作节点上的 CPU 冲突
  • 工作节点上的网络 I/O 冲突
  • 工作节点上的磁盘 I/O 冲突

Worker process 中的内存冲突

首先这类冲突实际上是 JVM 中的内存占用过多,表现为 out-of-memory 或者进入长时间的垃圾回收,并且这类冲突会在 UI 上暴露出来。而解决办法无非是:

  • 减少一个 Worker process 中的 Executor thread 个数
    • 保证 Executor thread 数量不变的情况下加大 Worker process 数量
    • 保证 Worker process 数量不变的情况下减少 Executor thread 数量
  • 提高给 JVM 分配的内存:在 storm.yamlworker.childopts 属性是 JVM 相关的参数,可以通过设定 -Xms-Xmx 来进行修改。

而观察任务的 GC 日志是最直接也是最长用到来解决问题的途径,在 storm.yamlworker.childopts 中可以对 JVM 的 GC 日志进行配置。

1
2
3
4
5
6
7
8
9
worker.childopts: ""-XX +PrintGCTimeStamps  
-XX: +PrintGCDetails
-Xloggc: /opt/storm/worker-%ID%-jvm-gc.log
-XX: +UseGCLogFileRotation
-XX: NumberOfGCLogFiles=5
-XX: GCLogFileSize=1M
-XX: +PrintGCDateStamps
-XX: +PrintGCApplicationStoppedTime
-XX: +PrintGCApplicationConsurrentTime"
  • -XX +PrintGCTimeStamps: 打印垃圾回收的时间戳
  • -XX: +PrintGCDetails: 打印额外的 GC 细节
  • -Xloggc: /opt/storm/worker-%ID%-jvm-gc.log: 为每个工作进程分别创建日志文件
  • -XX: +UseGCLogFileRotation: 对 GC 日志文件使用日志转储
  • -XX: NumberOfGCLogFiles=5: 设置日志的分割个数
  • -XX: GCLogFileSize=1M: 设置日志的分割大小
  • -XX: +PrintGCDateStamps: 打印垃圾回收的日期和时间信息
  • -XX: +PrintGCApplicationStoppedTime: 打印应用程序停止时 GC 启动时间(时间在安全点内)
  • -XX: +PrintGCApplicationConsurrentTime: 打印 GC 执行期间程序启动的时间(时间不在安全点内)

Worker process 的冲突

这是由于需要的 Worker process 数量超过了集群中的数量,可以通过扩展集群,或者增加每个工作节点的 Worker process 数量来解决。也可以对减少集群里一些任务的 Worker process 占用来解决。

storm.yamlsupervisor.slots.ports 配置项可以配置一个工作节点的 Worker process 数量,每一个端口对应一个进程。添加添加、删除端口就能进行控制。

工作节点上的内存冲突

因为工作节点的内存需要支撑 Supervisor process, 操作系统,多个 Worker process 和其他的一些进程。如果工作节点在内存上发生了使用冲突,工作节点将开启进程间的内存调度(swapping),会造成有较高的延时发生。可以通过 sar 命令来进行监控:

1
2
3
4
5
6
7
8
9
10
$ sar -S 1 3
```

表示每隔1秒输出3条内存的活动信息,主要需要关注 `kbswpused` 和 `%swpused` 数据。`kbswpused` 是使用中的交换空间内存(KB) ,`%swpused` 是使用中的交换空间内存百分比。如果这两个值大于0则说明系统中存在内存交换。

### 工作节点上的 CPU 冲突
和上一个情况类似,也是因为对 CPU 的使用超过了节点所能提供的而造成的。也可以使用 `sar` 命令来进行监控:

```shell
$ sar -u 1 3

主要需要关注 %idle,即在系统没有任何磁盘 I/O 请求空闲CPU时间百分比,如果值偏低。再到 %user%nice%system 中去找事应用层面上的问题还是系统层面的问题。

工作节点上的 I/O 冲突

同样可以使用 sar 命令来进行监控:

1
$ sar -u 1 3

只是现在需要关注的值是 %iowait,CPU 时间空闲的百分比,在此期间系统将执行 I/O 请求。如果这个值约为 10.00,那么大概率出现因 I/O 冲突导致的性能问题,如果大于 25.00,一定面临比较严重的 I/O 冲突。

然后要做的就是定位问题是在网络 I/O 还是磁盘 I/O,可以先通过下面的方法检查网络 I/O。

1
2
3
4
5
6
7
8
9
10
11
12
13
// 获取进程id
$ ps aux | grep MY_TOPOLOGY_NAME
stormuser 12345 .....

// 获取端口
$ netstat -antup | grep "12345/"
tcp6 0 0 xx.xx.xx.xx: 12345 xx.xx.xx.xx:42474 ESTABLISHED 4576/java

// 查看限制
$ cat /proc/12345/limits

// 查看该端口的稳定的连接tcp连接
$ netstat -na | grep "tcp6" | grep "4576" | grep "ESTABLISHED" | wc -l

检查 Max open files 一行, soft limit 和 hard limit 的值,如果已经到达设置的极限值,那么就会发生网络 I/O 冲突。然后可以使用 iotop 来观察是否发生了磁盘 I/O。

主要观察 USER 为 storm 的进程, DISK READDISK WRITEIO> 即每秒该进程读取的字节数,每秒该进程写入的字节数,每秒 I/O 调用百分比。如果其中一个值较高就说明相关的任务出现了磁盘 I/O 冲突。

该章节主要参考《Storm应用实践》

Component 生命周期

这里可以先参考一下 Nathan on 自己的回答: The lifecycle of a bolt or spout

可以看到 prepare 仅仅只会被 worker 在开始的时候执行一次,但是 execute 会在每次有 tuple 进入的时候都被调用。也就是说如果我有一个 Bolt 定义了3个 worker,每个 worker 都有3个 executer。那么总共是有9个 task,也就是说 prepare 会被调用9次,而 execute 就会被无限调用。

所以我们一些连接和静态变量的初始化工作放到 prepare 中去完成会更加的实惠。但是由于每个 worker 对应一个单独的 JVM 进程,一个 JVM 进程中会有多个 executor 线程,所以就会有多个 task 同时执行 prepare 的操作。
那么一些连接的创建是需要线程安全的环境,线程安全的单例写法这里不赘述。

Read More
post @ 2018-11-16

最近需要实现的一个场景是对于社区里的每条动态进行多次的检测和评估,暂且不论检测和评估的具体功能,统称为「计算」。

而这些计算由其他同事提供现成的接口,但是由于其具体功能的差异这些计算有些依赖了外部接口,并且可能依赖外部的长延时接口,例如视频分析。所以这里从计算的实现上将其划分为「瞬时计算」和「长时计算」,而对于上层使用方来说是需要构造出一种使用协议来同时兼容两种类型。而这样的三层结构中是有两次网络传输过程,更需要一种足够健壮的重试策略。并且在动态流量基数较大,峰值与谷值差异较大的情况下,横向扩容能力也非常的重要。

模型

第一次讨论的方案是 「push模型」,多方使用我暴露的接口,在完成数据消费计算之后推送到我的服务中进行下游逻辑。这种方案有几个弊端,第一是多方需要实现消费 Kafka 的逻辑并且需要管理 Kafka offset,保证在事故不会导致数据丢失。第二是网络传输导致的重试逻辑复杂,需要接入第二个重试队列,或者提供重试接口,这样数据流的流向就从单向变为双向,增加了之后的维护和排查成本。但是这种方案还是有一个好处,因为计算方是接触数据的第一层,所以在计算方内部出现错误引起的重试或者报错机制实现其他非常简单。

在否定以后转换为 「pull模型」,多方暴露接口让我进行统一的调用,仅由我方消费 Kafka 并统一维护状态和提供重试策略。但是在实现细节上还是有两种方案可供选择,一种是以「动态为单位」,一种是以「计算为单位」。动态为单位是说,主线程消费 Kafka,然后交由下游异步或多线程方式调用多个接口,全部成功后算作一个动态计算完成。计算为单位是说,多个线程各自使用独立的 group id 消费 Kafka 并维护重试队列,然后在线程内进行接口调用。两种方式的区别是一个计算失败之后的 block 代价不同,第一种方案会 block 整个动态,第二种方案只会 block 动态的一个计算。并且由于计算之间的效率差,第一种的效率取决于最慢的一个计算,第二种在动态为单位的角度来看,也是以最慢的一个计算决定,但是由于计算之间不会互相影响,所以之后想对「慢计算」进行降级的话能很方便的完成。

三个问题

在决定模型以后,需要考虑第一个问题,「构造出一个通用的协议兼容同步计算和异步计算的调用」。一种是同步轮询,一种是异步调用。由于底层依赖的外部长时计算有请求的次数限制,所以同步轮询需要记录请求时间来控制轮询的时间间隔,但是没有性能问题的风险。而对于异步调用,如果下游系统被某些原因 block 住的时候会无限的建立连接,在超过线程池的上限以后会 block 住调用方。所以比较下来还是使用轮询的方式简单实用,只需要维护一个调用队列,每个请求带上「上一次请求的时间戳」和「重试次数」,如果在消费到一个还没有超过调用间隔的请求,不累加重试次数,直接放入队列末尾;如果一个请求失败则直接修改时间戳,累加重试次数放入队列末尾;如果超过了重试次数,直接抛出系统记为一个 bad case。

在确定轮询的方式以后,第二个重试问题也迎刃而解,仅由顶部调用方来控制请求状态,并且提供重试,这样单向的数据流在后期的问题排查和维护过程中是非常重要的。而第三个问题,横向扩展能力,由于使用了「以计算为单位的pull模型」,扩展新的计算可见是非常方便的,只需要添加独立的线程。并且扩展每个计算调用系统本身也是非常方便的,需要对「分发任务逻辑」和「调用逻辑」进行解耦,扩展时只扩展「调用逻辑」部分,不然的话还需要保证每个线程之间分发任务的队列的一致性,是很冗余的设计。那么整个轮询系统内部也被划分为了两个部分,第一个部分消费 Kafka,维护调用队列,第二个部分消费调用队列进行底层接口调用,并且会反馈调用结果给第一个部分,使其进行调用队列的状态维护。

a3-1

而上面也提到了单向数据流的好处,所以这里为了规避掉双向数据流,将请求完成后的队列维护工作也放在接口调用部分。所以就变成了「状态维护」部分只管往调用队列里放请求,「接口调用」部分负责调用接口并且使用 response 来维护调用队列里的请求状态,例如请求次数加一之后放入队列末尾。

a3-2

复用Storm

上面的结构一看非常像 Storm 的流式结构了,并且 Storm 能保证一条 Kafka message 在轮询系统中一定会被成功消费并且是顺序消费,还能帮我们管理 Kafka offset 状态,还不需要写多线程,扩容起来也非常方便。那么何乐为不为呢?

a3-3

将设计图一改,瞬间转变成一个 Storm 架构,其实上面单向数据流的设计也规避了 Storm 中不能由下游 Bolt 给上游 Bolt 传递消息的情况。而 Storm 本身也提供重试机制,在该重试机制下我们可以重新考虑之前数据结构的设计。

因为我们需要考虑的是两类重试情况,一种是通过重试能解决的网络问题,一种是通过重试不能解决的系统问题。而当我们在遇到超过重试次数没有解决的问题时,之前的解决方案是抛出系统持久化到一个地方,之后再想办法解决,但是这样又增添了该系统的复杂程度,需要通过自动化的方案能区分这个 case 到底是网络问题还是系统问题,之后再通过一套方案将它解决掉再写入系统。

试想最简单的处理方案是遇到网络问题通过重试自然的解决它,而遇到系统问题直接 block 整个系统,等待计算提供方解决问题后再继续。而 Storm 的机制刚好提供了这种方案的解决策略,如果遇到重试的请求都进行无限次的重试,因为短暂的问题肯定是会在有限次重试的过程中恢复,而系统问题是无限次重试都不能解决的,那么遇到很多的系统问题 case 不是会浪费IO,并且也没有 block 整个系统吗?

其实并不会,Storm 内部设计的时候一次会从 Kafka 中拿出多个 message 形成多个 tuple,只有在这些 tuple 全部 ack 掉以后才会继续向后面拿数据。所以如果在一批数据中出现了一些系统问题的 case,他们通过无限的 fail 重试是会 block 整个 topology 的,并且他们的个数不会很多,所以不会对下游造成 IO 的压力。对 Kafka 的 offset 进行检测接上警报以后,很快消费能力就跟不上生产能力,就能知道出现了这样的系统问题,在下游的服务修好以后,再接着进行消费,也不会丢失数据。

思考

系统的设计中每一部分一定要简单纯粹,不要想让一个部分做多件事情。

Read More
⬆︎TOP