Percolator

Percolator主要使用在Bigtable系统中提供分布式事务能力,其本身的实现是利用Bigtable的单行事务能力以及在行内设置lock列来进行悲观事务。数据结构:

1
<key, version>: <data>, <lock>, <write>

Transaction Write

  • 开启事务:向集中的Timestamp server oracle(TSO)节点申请一个事务开始时间戳t(s),并且以此时间戳做为事务id
  • 2PC第一阶段:Percolator在一些Write set当中选择一行做为Primary,其他的行做为Secondaries。先对Primary做prewrite操作,如果成功则对其他的Secondaries做prewrite操作。
    • Prewrite:这个操作当中有多个原子操作,被封装在一个事务当中,由Bigtable的单行事务性来保证。
      • 首先检查write是否有时间戳大于t(s)的版本,如果有则说明这行数据已经被新的事务提交过了,直接返回事务冲突
      • 然后检查lock是否有任意版本的数据存在,如果有则说明这行资源还被别的事务持有,返回事务冲突
      • 如果前面的操作都成功了,那么在data写入版本为t(s)的value数据
      • 并且在lock中写入版本为t(s)值为primary位置的数据
        prewrite的第二部检查当中,发生冲突是有三种情况:
    • 获得锁的版本小于t(s),该资源正在被一个事务持有
    • 获得锁的版本小于t(s),有一个老事务因为某种原因没有成功的还掉锁
    • 获得锁的版本大于t(s),该资源正在被一个事务持有
      上述情况当中的1,3都是典型的写-写冲突,client就进行正常的backoff重试即可。而第2种情况是客户端在2PC的第二阶段发生了异常导致,这时需要rollback之前的事务来释放掉这个异常的锁。并且这里是很难区分1,2的,毕竟锁的版本都小于t(s),所以需要一个附件条件锁的ttl时间,如果锁处于ttl时间内则说明是第1种情况,在ttl时间外则是第2种情况。
  • 2PC第二阶段:向集中的TSO节点申请一个事务提交时间戳t(c),之后检查Primary的lock是否还存在t(s)版本的数据,如果不存在则说明该事务锁已经超过ttl时长,被其他的事务中断了。如果存在的话,则向write写入版本为t(c)值为t(s)的数据并且清掉锁,这时整个事务已经成功。最后异步的完成Secondaries写write并且释放锁的操作。这个阶段当中检查、写入、清锁的过程被包装在一个事务当中。

Transaction Read

读事务就要简单很多,Percolator向集中的TSO节点申请一个事务开始的时间戳t(s),然后检查所有的Read set中的锁,如果存在时间戳小于t(s)的锁:

  • 锁还处于TTL时间内,说明该资源正在被另外一个事务持有,Client进行backoff操作
  • 锁已经超时,这时可以通过锁中记录的primary位置找到primary行的write列,检查是否存在锁版本的数据。如果存在则说明该事务已经成功,只是没有正常的还锁,这时将锁对应的事务进行提交,如果不存在则说明该事务2PC第二阶段出现问题,将该事务进行rollback

下面是论文源码👇:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
class Transaction {
struct Write{ Row row; Column: col; string value;};
vector<Write> writes_;
int start_ts_;

Transaction():start_ts_(orcle.GetTimestamp()) {}
void Set(Write w) {writes_.push_back(w);}
bool Get(Row row, Column c, string* value) {
while(true) {
bigtable::Txn = bigtable::StartRowTransaction(row);
// Check for locks that signal concurrent writes.
if (T.Read(row, c+"locks", [0, start_ts_])) {
// There is a pending lock; try to clean it and wait
BackoffAndMaybeCleanupLock(row, c);
continue;
}
}

// Find the latest write below our start_timestamp.
latest_write = T.Read(row, c+"write", [0, start_ts_]);
if(!latest_write.found()) return false; // no data
int data_ts = latest_write.start_timestamp();
*value = T.Read(row, c+"data", [data_ts, data_ts]);
return true;
}
// prewrite tries to lock cell w, returning false in case of conflict.
bool Prewrite(Write w, Write primary) {
Column c = w.col;
bigtable::Txn T = bigtable::StartRowTransaction(w.row);

// abort on writes after our start stimestamp ...
if (T.Read(w.row, c+"write", [start_ts_, max])) return false;
// ... or locks at any timestamp.
if (T.Read(w.row, c+"lock", [0, max])) return false;

T.Write(w.row, c+"data", start_ts_, w.value);
T.Write(w.row, c+"lock", start_ts_,
{primary.row, primary.col}); // The primary's location.
return T.Commit();
}
bool Commit() {
Write primary = write_[0];
vector<Write> secondaries(write_.begin() + 1, write_.end());
if (!Prewrite(primary, primary)) return false;
for (Write w : secondaries)
if (!Prewrite(w, primary)) return false;

int commit_ts = orcle.GetTimestamp();

// Commit primary first.
Write p = primary;
bigtable::Txn T = bigtable::StartRowTransaction(p.row);
if (!T.Read(p.row, p.col+"lock", [start_ts_, start_ts_]))
return false; // aborted while working
T.Write(p.row, p.col+"write", commit_ts,
start_ts_); // Pointer to data written at start_ts_
T.Erase(p.row, p.col+"lock", commit_ts);
if(!T.Commit()) return false; // commit point

// Second phase: write our write records for secondary cells.
for (Write w:secondaries) {
bigtable::write(w.row, w.col+"write", commit_ts, start_ts_);
bigtable::Erase(w.row, w.col+"lock", commit_ts);
}
return true;
}
}; // class Transaction

Example

  1. 初始化Bob和Joe的账户,Bob有10元,Joe有2元
    a15-1
  2. 有一个事务出现,这个事务要将Bob的7元给Joe,这时获得了一个新的时间戳7,选择Bob做为primary,锁住该行写入Bob减掉7元以后的数据
    a15-2
  3. 将Joe选为Secondary,并指向Primary的Bob,锁住该行写入Joe加7元以后的数据
    a15-3
  4. 这时候2PC第一阶段完成,开始第二阶段,申请一个提交时间戳8,将时间戳7写入Bob的write的8版本中
    a15-4
  5. 将时间戳7写入Joe的write的8版本中
    a15-5

Percolator in TiDB

有了上面对Percolator的解释,我们现在很容易理解在TiDB中是如果使用Percolator来实现事务逻辑。首先我们来看其乐观事务的实现:
a15-6
我们从图上可以看出,Percolator是被使用在TiDB和其下面的TiKV进行事务通信的协议。最开始的时候我很奇怪,Percolator的实现不是一个悲观事务模型吗?但是为什么TiDB里称其为乐观事务,是因为暴露给Client的不是底层的KV而是DB这一层,而加锁的过程被放在了Commit阶段,所以对于Client来说,这就是一个乐观事务模型。当事务开始以后,首先执行DML操作,得到Write set,然后将Write set放到Percolator中执行2PC,在第一阶段上锁。

下面再看看他们在这基础上修改的悲观事务模型,很巧妙:
a15-7
将上锁的过程提前到开始执行Percolator事务之前,先对所有的Write set上一个和Percolator同样的锁,不过锁里面没有记录Primary的位置,而是空的,仅做占位符使用。等开始执行Percolator事务以后,锁会被写入正确的值。这样做的好处是,在数据真正开始发生变更之前就锁住了所有资源。不会发生回滚行为,在资源竞争密集的场景下效率大大优于乐观事务。写请求看到这个空锁直接等锁,读请求可以直接从TiKV中读取数据即可。

Omid

Omid主要使用在Phoenix系统中提供分布式事务能力,其本身的实现是利用Hbase的单行事务能力以及在行内设置version, commit列来进行乐观事务,数据结构:

1
2
Data table   <key, version>: <value> <commit>
Commit table <version>: <commit>

Transaction Write

  • 开启事务:向集中的Timestamp server oracle(TSO)节点申请一个事务开始时间戳t(s),并且以此时间戳做为事务id
  • 2PC第一阶段:Client将Write set中的每行的修改数据写入Data table版本为t(s),对应的key的value当中,需要注意的是这时候的commit均为null
  • 2PC第二阶段:Client带上Write set和t(s)TSO提交commit请求,TSO会进行冲突检查,如果检查成功则返回t(c)给Client,否则的话整个事务被中断。Client拿到t(c)以后向Commit Table发起CAS(t(s), commit, null, t(c))操作,如果返回ABORT则将事务终止,并且异步的清除Data table中之前写入的数据。如果成功,则进行Post-commit流程,将写入Commit table中的t(c)异步复制到Data table的版本为t(s)commit当中。完成所有的异步复制以后进行垃圾回收,将Commit table当中的数据清除掉,完成整个事务。
    • TSO如何进行冲突检查:原理非常的简单,就是检查Write set当中的每一行是否有lastCommit > t(s)的数据,lastCommit是这一行最新的一个t(c)。如果有则说明在该事务执行过程当中已经有其他的事务完成,出现了写-写冲突,则中断该事务。但是要执行这个操作需要在TSO当中保存所有行的lastCommit数据才行,这个存储开销太大了,所以需要想办法优化。优化的手段也比较简单,就是维护一个LRU队列即可,只保存一定数量的行的lastCommit即可。那么不在队列当中的行的lastCommit一定小于等于队列中最小的一个lastCommit时间,这样可以检查lastCommit<=Smallest(lastCommit)<=t(s)的偏序关系,以检查冲突情况。但是由于队列中没有保存所有的数据,还是会有漏网之鱼,比如说现在队列里的Smallest(lastCommit) > t(s),并且要检查的行没有在队列当中,那么偏序关系就无从可知,这时候就直接将事务中断即可,也不会影响正确性。
    • CAS函数:这是一个在实现乐观锁当中经常会使用到的函数,CAS(a,b,c,d)是指比较a行b列,如果它现在的值等于c,则将其修改为d。并且这个函数需要保证原子性。在HBase当中可以使用行级事务来实现CAS函数,并保证其原子性。

Transaction Read

  • 向集中的Timestamp server oracle(TSO)节点申请一个事务开始时间戳t(s),并且以此时间戳做为事务id
  • 扫描所有的Read set,每一行从大版本到小版本扫描,找到第一个提交版本小于t(s)的value和对应的版本t(s2)。如果发现其commit==null,这时候有两种情况:
    • 这一次事务已经成功,只是正在进行Post-commit流程,从Commit table当中将t(c)复制过来
    • 这一次事务没有成功
      为了区分这两种情况,Omid会去Commit table当中检查版本t(s2)对应的commit是否有值,如果有值则说明是情况1,如果没有则说明是情况2。情况1的话很好办,继续向下遍历更小的版本。情况2的话就比较麻烦:
    • Client调用CAS(t(s2), commit, null, ABORT)来将其对应的事务设置为ABORT,这样在其重试的Commit环节会发现ABORT标志而使其事务进行中断操作
    • 如果设置成功,还需要检查一下是否是因为读事务太慢而导致的错误中断,去Data table读t(s2)版本的commit,如果发现存在值t(c2),并且t(c2)<t(s),则返回其value和版本。否则继续向下遍历更小的版本。

下面是论文源码👇:
a15-8

a15-9

End

通过上文的分析我们可以看出,Percolator的优点是分布式的Commit table,TSO逻辑简单,缺点是锁检查时需要扫描所有Write set的锁情况,并且需要额外的存储开销来记录锁。Omid的优点是执行效率上优于Percolator,但是又多了一个中心系统Commit table。大家可以根据自己的使用场景来进行选择。

参考资料:

Comments

⬆︎TOP