在我们的业务当中有这样一个场景,用户每收到一笔转账以后,就会通过推送服务给用户发送一个通知,而这些通知之间需要保证先后顺序(即帐单的产生时序)。假设我们有一个订单数据结构Bill

1
2
3
4
5
6
7
{
"_id": ObjectId
"fromUserId": String,
"toUserId": String,
"amount": Double,
"createdAt": Long
}

fromUserId是转账用户的id,toUserId是收款用户的id,amount是转账金额,createdAt是记录创建的日期。

系统中使用安装了debezium connector for mongodb插件的kafka connect来将mongodb oplog收集到kafka中,下游使用flink streaming task来进行消费并触发推送服务。整个过程中数据会流过多个分布式系统,如何保证在流经这些系统以后还能保证记录的产生顺序就是今天要讨论的问题。

Kafka connect && Kafka

所有的Bill oplog都会被发送到同一个Kafka topic中,所以只需要保证在发往Kafka的过程当中使用toUserId作为 Partition Key即可。但是由于使用了开源组件来帮助我们完成了收集oplog的任务,所以需要保证debezium connector for mongodb能如我们的愿。

首先需要知道的是Kafka connect默认情况下使用的分区策略是org.apache.kafka.clients.producer.DefaultPartitioner,其中partition方法的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
/**
* Compute the partition for the given record.
*
* @param topic The topic name
* @param key The key to partition on (or null if no key)
* @param keyBytes serialized key to partition on (or null if no key)
* @param value The value to partition on or null
* @param valueBytes serialized value to partition on or null
* @param cluster The current cluster metadata
*/
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if (keyBytes == null) {
int nextValue = nextValue(topic);
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
if (availablePartitions.size() > 0) {
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return availablePartitions.get(part).partition();
} else {
// no partitions are available, give a non-available partition
return Utils.toPositive(nextValue) % numPartitions;
}
} else {
// hash the keyBytes to choose a partition
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}

private int nextValue(String topic) {
AtomicInteger counter = topicCounterMap.get(topic);
if (null == counter) {
counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
if (currentCounter != null) {
counter = currentCounter;
}
}
return counter.getAndIncrement();
}

如果key是空的,则以轮训的方式来分配partition,否则则使用一种murmur2的HASH算法来分配partition。所以我们只需要将message key设置为toUserId即可达成我们的目的。但是 Debezium Connector for MongoDB :: Change event’s key 中写道,message key只能是_id,而toUserId不具有唯一性,所以不能作为_id;如果加上时间戳或者其他的字段保证了唯一性,又失去了要将相同toUserId放在一个分区的语义,所以这条路基本是走不通的。

The MongoDB connector does not make any explicit determination of the topic partitions for events. Instead, it allows Kafka to determine the partition based upon the key. You can change Kafka’s partitioning logic by defining in the Kafka Connect worker configuration the name of the Partitioner implementation.
Be aware that Kafka only maintains total order for events written to a single topic partition. Partitioning the events by key does mean that all events with the same key will always go to the same partition, ensuring that all events for a specific document are always totally ordered.

继续寻找其他的变通方法,Debezium Connector for MongoDB :: Partitions 文档中写到可以通过设置 Kafka connect worker 的Partitioner设置来指定分区策略。也就是 Apache Kafka 文档中提到的partitioner.class

partitioner.class: Partitioner class that implements the org.apache.kafka.clients.producer.Partitioner interface.
Type: classDefault: org.apache.kafka.clients.producer.internals.DefaultPartitioner

即需要自己通过继承Partitioner接口来实现自己的分区策略,要实现partition方法

1
2
3
 public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
...
}

实现了之后将包含该类的JAR包放入Kafka connect能扫描的路径下,再在connect-standalone.properties或者connect-distributed.properties配置文件中进行全局的设定。可以参见下面两个链接👇:

那么试想一下我们的方案是不是就可以实现为,将toUserId和时间戳拼接为唯一性的_id,在partition函数中将toUserId提取出来并以此作为partition key进行分区以实现在kafka中保证顺序的目的。但是这样的实现方案也存在问题,比如这个Kafka connect 仅来为这个任务服务,因为这种分区策略并不具有通用性,如果有多个类似的需求则要部署多份Kafka connect。同时也存在一些好处,例如拼接的_id是可以直接作为Mongodb的shard key来对该场景的上游进行横向扩展的。可惜的是debezium的实现并不能保证shard cluster传入kafka时保证事件的发生顺序,虽然能将拼接的_id作为shard key,但是由于这种架构并没有能力保证顺序性,所以这种扩展也是无效的,参见👇:
Debezium Connector for MongoDB :: MongoDB sharded cluster

综上在该架构中如果想在Kafka层保证事件的有序性是非常困难,并且很不经济实惠。

通过上文的分析,我们不得不接受一个事实,Flink消费到的是乱序数据。但是因为其特性,能较为方便的对乱序数据进行处理。
在debezium收集到的oplog中,包含两个时间戳,一个是在Value payload中的ts_ms,表示的是debezium收集该oplog时的系统时间;另外一个是在Value payload中的source.ts_ms,表示的是mongodb产生oplog的时间。而我们的实体类中也写入了Create bill的时间createdAt,由于只关心Insert事件,所以在Value payload的after.createdAt中能获取Bill创建的真实时间。在使用过程中一般以第二个时间或者第三个时间为准,在没有采用shard cluster的mongodb中,我认为第二个时间的先后次序应该与第三个时间的先后次序相同;采用了shard cluster的mongodb中,应该以第三个时间为准。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
"payload": {
"after": ...,
"patch": null,
"source": {
"version": "1.0.3.Final",
"connector": "mongodb",
"name": "cdc_test",
"ts_ms": 1558965508000,
"snapshot": true,
"db": "inventory",
"rs": "rs0",
"collection": "bill",
"ord": 31,
"h": 1546547425148721999
},
"op": "r",
"ts_ms": 1558965515240
}

在Flink的时间概念中也有三种时间:

  • 事件时间:独立事件在产生它的设备上发生的事件,通常在进入Flink之前就已经嵌入到事件中。
  • 接入时间:数据进入Flink的时间,取决于Source Operator所在主机的系统时钟。
  • 处理时间:数据在操作算子计算过程中获取到的所在主机时间。

显然在该场景下应该选择Bill的创建时间作为Flink的事件时间:

1
2
3
4
5
6
7
env.addSource(Utils.providesAvroKafkaConsumer(kafkaConfig))
.map(recordTuple => AvroMongoOplog.newInstance(recordTuple._1.asInstanceOf[Record], recordTuple._2.asInstanceOf[Record]))
.filter(mongoOplog => mongoOplog.getOpType.eq(MongoOpType.Insert))
.assignAscendingTimestamps(mongoOplog => mongoOplog.getDocument.getLong("createdAt"))
.keyBy(mongoOplog => mongoOplog.getDocument.get("toUserId"))
.window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
.process(...)
  • 连接Kafka,使用实现的KeyedAvroDeserializationSchema进行反序列化

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient
    import org.apache.avro.generic.GenericRecord
    import org.apache.flink.api.common.typeinfo.TypeInformation
    import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema
    import org.apache.kafka.clients.consumer.ConsumerRecord

    @SerialVersionUID(1584533572114L)
    class KeyedAvroDeserializationSchema extends KafkaDeserializationSchema[(GenericRecord, GenericRecord)] {

    var keyDeserializer: ConfluentRegistryAvroDeserializationSchema[GenericRecord] = _
    var valueDeserializer: ConfluentRegistryAvroDeserializationSchema[GenericRecord] = _

    def this(topic: String, schemaRegistry: String) {
    this()
    val keySubject = topic + "-key"
    val valueSubject = topic + "-value"
    val schemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistry, 1000)
    val keySchema = schemaRegistryClient.getByID(schemaRegistryClient.getLatestSchemaMetadata(keySubject).getId)
    val valueSchema = schemaRegistryClient.getByID(schemaRegistryClient.getLatestSchemaMetadata(valueSubject).getId)
    keyDeserializer = ConfluentRegistryAvroDeserializationSchema.forGeneric(keySchema, schemaRegistry)
    valueDeserializer = ConfluentRegistryAvroDeserializationSchema.forGeneric(valueSchema, schemaRegistry)
    }

    override def isEndOfStream(t: (GenericRecord, GenericRecord)): Boolean = false

    override def deserialize(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]]): (GenericRecord, GenericRecord)
    = (keyDeserializer.deserialize(consumerRecord.key()), valueDeserializer.deserialize(consumerRecord.value()))

    override def getProducedType: TypeInformation[(GenericRecord, GenericRecord)] = createTypeInformation[(GenericRecord, GenericRecord)]
    }
  • 通过一个工具类将(Record,Record)转化为更好处理的MongoOplogEntry类型

  • 过滤出所有的Insert事件

  • Bill的创建时间作为Flink的事件时间

  • 按照toUserId进行分区

  • 设置10秒的时间窗口

  • 在窗口处理函数中对所有MongoOplogEntry对象按照createdAt再排序后依次触发推送服务

最后

以上介绍了在Mongodb CDC过程中保证数据有序性的两种思路,一种是在Kafka中就保证toUserId相同的数据均有序,这样在消费过程中不需要做窗口计算,只要在需要partition的地方继续使用toUserId进行partition,就能保证数据有序性。这种方案在Kafka connect侧需要做很多的工作,但是能为流式任务带来更好的消费性能,但是由于debezium的局限性,在shard cluster的Mongodb中不能发挥作用。第二种是让Flink消费乱序数据,使用其本身的事件时间窗口计算来重新纠正数据。这种方案会让Flink的效率大打折扣,并且需要保证窗口缓存数据不能超过限制,但是比较通用,使用的时候也需要注意迟到的数据该如何处理。大家可以根据自己的场景来挑选方案,或者如果有更好的方案欢迎一起交流。

Comments

⬆︎TOP