Target

在不暂停服务的前提下将一个 290G 的 Mongo 数据表中的部分字段迁移到 Postgresql 数据库,保证足够低的差异性。

Strategy

一切的工作的基础是 MongoDB oplog 的幂等原则:

  • 首先启动一个服务将 MongoDB 中实时生产的 oplog 同步到 kafka 中。
  • 启动 Spark 任务将目标表中的字段批量同步到 Postgresql 中。
  • 启动 Storm 服务将 kafka 中的 oplog 回放到 Postgresql 中。
  • 启动 Spark 任务进行一致性的检查。

a1-1

必须保证第一个过程在第二个过程之前启动,第三个任务在第二个任务完成之后启动。任务流程如图,oplog 被同步到 kafka 的一个 topic 中,这个 topic 被分成了8个 partition,启动了 3个 broker。然后将这个 kafka topic 作为 Storm 任务的 KafkaSpout,并发数为4。下游进行持久化任务的 Bolt 的并发数也为4。

同步 Oplog

这一步需要注意的是同一个 _id 的 oplog 的乱序问题,我们在回放 oplog 的时候必须按照发生的时间顺序进行回放,不然会出现丢失数据的情况。所以一定要保证 oplog 是按照生成的顺序放入 kafka 队列中的。在这种情况下肯定是单线程的服务来同步数据更加的合适,就不需要担心由于并发带来的乱序问题。

第二个可能导致乱序的点是 oplog 在 kafka 中的存储位置,我们为了保证同一个 _id 的对象 oplog 不乱序,那么必须保证它们被存储在同一个 partition 当中。如果存储在不同的 partition 当中的话可能会在 Storm 的不同 Spout Consumer thread 中被处理,那么就有可能会出现创建时间在后面的 oplog 先被回放到 Postgresql 当中。而如何能保证同一个 _id 的对象放入到同一个 kafka partition 当中?只需要将 _id 作为 kafka message 的 key,因为 kafka 的 partition 机制就是如果有 message key 就按照 message key 进行 hash 以后进行分区。参考源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
/**
* Compute the partition for the given record.
*
* @param topic The topic name
* @param key The key to partition on (or null if no key)
* @param keyBytes serialized key to partition on (or null if no key)
* @param value The value to partition on or null
* @param valueBytes serialized value to partition on or null
* @param cluster The current cluster metadata
*/
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if (keyBytes == null) {
int nextValue = nextValue(topic);
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
if (availablePartitions.size() > 0) {
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return availablePartitions.get(part).partition();
} else {
// no partitions are available, give a non-available partition
return Utils.toPositive(nextValue) % numPartitions;
}
} else {
// hash the keyBytes to choose a partition
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}
private int nextValue(String topic) {
AtomicInteger counter = topicCounterMap.get(topic);
if (null == counter) {
counter = new AtomicInteger(new Random().nextInt());
AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
if (currentCounter != null) {
counter = currentCounter;
}
}
return counter.getAndIncrement();
}

如果消息的 key 为 null,则先根据 topic 名获取上次计算分区时使用的一个整数并加一。然后判断 topic 的可用分区数是否大于0,如果大于0则使用获取的 nextValue 的值和可用分区数进行取模操作。如果 topic 的可用分区数小于等于0,则用获取的 nextValue 的值和总分区数进行取模操作(随机选择了一个不可用分区)

如果消息的 key 不为 null,就是根据 hash 算法 murmur2 算出 key 的 hash 值,然后和分区数进行取模运算。

Storm 回放

乱序

现在我们已经保证同 _id 的消息会进入到同一个 Storm Spout thread 当中,现在还需要保证在发送到 Bolt Task 的时候也进入到同一个。这就需要考虑 Storm 的 grouping 策略了,其中只有 FieldGrouping 能满足要求,FieldGrouping 是通过 parentBolt 发出的 stream 当中声明的某一个或者几个 field 来做 grouping,比如 parentBolt 发出的 stream 中有:usernamepassword 两个 field,而在声明 FieldGrouping 的时候设置按照 username 这个 field 来做 grouping。那么 username 相同的stream 则一定会进入到同一个 BoltThread 当中。

但是这次的设计当中只有1个 Bolt,它的父级 Bolt 是一个 Spout。而没有找到合适的api来对一个 Spout 的 stream 声明 field,默认只有一个 bytes field。通过分析发现在 KafkaSpout 类中有一个 public 方法:

1
2
3
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(this._spoutConfig.scheme.getOutputFields();
}

是用来声明 Fields 的,通过参看文档,没有携带 streamName 参数的 declare 方法会声明一个 streamNamedefault 的 stream。

所以我们只要使用自己的 scheme ,并且重载它的 getOutputFields 参数即可完成对这个 Spout 的 stream 声明 field。并且发现 KafkaConfig 类中 stream 是一个 MultiScheme 类型的 public 参数,构造函数里使用 RawMultiScheme 类进行初始化它的值,而该类中的 getOutputFields 方法:

1
2
3
public Fields getOutputFields() {
return new Fields(new String[]{"bytes"});
}

所以默认的 Fieldsbytes。那么只要继承 backtype.storm.spout.Scheme 类重载 getOutputFieldsdeserialize 即可。由于传入 Spout 的要么是 bytes,要么是一个 String,所以如果不重载 deserialize 方法对其进行反序列化,那么设定的 field 也是没有实际意义的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class MyScheme implements Scheme {
@Override
public List<Object> deserialize(byte[] ser) {
try {
...
} catch (Exception ex) {
return null;
}
}

@Override
public Fields getOutputFields() {
return new Fields(...);
}
}

网络不稳定性

上面说了怎么保证 Storm 内部的不乱序问题,而仅凭以上的方法还不足以保证数据的完整性。因为在 Storm 内部传输信息是需要通过网络进行传输,所以数据是有丢失的风险的。而 Storm 本身为了应对这种风险也设立了一种容错机制。具体可以参考: Guaranteeing Message Processing

总得来说就是 Storm 内部通过 SpoutOutputCollector 给每一个从 Spout 发出的 tuple 给定一个 id,如果在设定的 timeout 时间内没有完成这个 tuple 应该完成的所有 task,那么会发出一个 fail 信号让 Storm 通过这个 id 来对这个 tuple 进行重试。而使用者仅需要设定 timeout 时间,并且在 collector.emit 的时候将 tuple 作为第一个参数。

TIPS

  • 无论是离线的批量导入还是在线的回放,insertupdate 操作需要使用 upsert 操作代替,不然会出现大量的由于 id 重复或者 id 不存在的报错。比如在表中有一个 id 为 a 的数据,而在第一步中记录了对 a 的 update 操作和 delete 操作,所以在第二步启动的时候这个 a 数据已经不存在了,而在回放的时候直接使用 update 操作回放会报错。

  • 由于是对表中的部分数据进行迁移,所以在 Bolt 当中需要对 oplog 进行过滤,只对包含目标 field 的 oplog 进行重放,否则会存在大量的垃圾数据。

  • MongoDB oplog 的 offset 最好进行缓存,做好容错工作。

Comments

⬆︎TOP