背景

最近迁移数据库的时候发现了一个问题,相信也是很多 MongoDB 使用者都会遇到的问题。我在使用 MongoSpark 批量的写入数据的时候会造成严重的数据库抖动。主要的原因是现在大多 MongoDB 的配置都是 replica set 模式,这样写操作只会到 Master 节点上,写操作就会成为整个系统的瓶颈,大量的写操作会使 Master 节点的读操作变慢,并且会让 secondary 节点同步速度变慢,从而出现从 secondary 节点上读到老数据的问题。

而通过对 MongoDB 进行 shard 是一个代价非常昂贵且只能线性提高写吞吐的方法,非常的不经济实惠。所以我们也只能牺牲速率来保证线上服务的稳定。所以解决方案就是在 MongoSpark 中添加上写限速功能。

Guava.RateLimiter

选择的限速器是 Guava 提供的令牌桶。算法的原理很简单,会定时的向桶中放置令牌,服务只有获得令牌以后才能进行后续的操作,比如希望对一个服务进行每秒10次的限速,那么每秒中就会向桶中放置10个令牌。而获取的方式有两种,一种是阻塞等待令牌或者取不到令牌直接返回失败。

那么下面就简单的介绍一下API,主要是分两个动作,创建桶和获取令牌:

创建令牌桶

  • RateLimiter create(double permitsPerSecond): 根据一个稳定的速率创建令牌桶,也就是每秒插入 permitsPerSecond 个令牌。
  • RateLimiter create(double permitsPerSecond, long warmupPeriod, TimeUnit unit): 通过一个预热时间段达到稳定的速率创建令牌桶,在 warmupPeriod 这段时间内每秒令牌数量平稳的爬升至 permitsPerSecond,而后保持稳定。这里的速率也是值每秒插入 permitsPerSecond 个令牌。

获取令牌

  • void acquire(): 获取一个令牌,会一直阻塞等待直到拿到。
  • void acquire(int permits): 获取 permits 个令牌,会一直阻塞等到直到全部拿到。
  • boolean tryAcquire(): 获取一个令牌,成功则返回 true,失败直接返回 false。
  • boolean tryAcquire(long timeout, TimeUnit unit): 获取一个令牌,成功则返回 true,没有足够的令牌时等到 timeout 时间,如果还没有则返回 false。
  • boolean tryAcquire(int permits): 获取 permits 个令牌,成功则返回 true,失败直接返回 false。
  • boolean tryAcquire(int permits, long timeout, TimeUnit unit): 获取 permits 个令牌,成功则返回 true,没有足够的令牌时等到 timeout 时间,如果还没有则返回 false。

通过上面的API可以很灵活的对 RateLimiter 进行使用,但是在阅读源码的过程中,我也发现了 Guava 的实现中有一个不尽如人意的地方。那就是限制能力只能在每秒多少个令牌桶,但是我想实现将少一秒的剩余令牌留给下一秒继续用,也就是几秒甚至更高时间单位的限流是不行的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private static class Bursty extends RateLimiter {
Bursty(RateLimiter.SleepingTicker ticker) {
super(ticker, null);
}

void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
double oldMaxPermits = this.maxPermits;
this.maxPermits = permitsPerSecond;
this.storedPermits = oldMaxPermits == 0.0D ? 0.0D : this.storedPermits * this.maxPermits / oldMaxPermits;
}

long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
return 0L;
}
}

在上面的代码中,Bursty 就是始终保持平稳速率的令牌桶类,其中 maxPermits 是桶中最多有多少的令牌,storedPermits 是现在桶中的令牌个数。可以看到maxPermits 始终等于 permitsPerSecond,是不能乘上时间系数的,并且在重新设置 maxPermits 后会按照比例缩放之前的桶中令牌数量。

MongoSpark.save

save() 方法是我们需要修改的主要方法,但是在 MongoSpark 中又存在多种的 save 方法,我们需要分别为这些 save 方法加上限流功能,或者你已经很明确将使用的函数。

在这之前,我们需要做一些准备工作,既然要限流,那我们肯定需要一个参数来控制流速,而在 MongoSpark 中是有一个配置类供我们设置参数的,我们需要修改一下 WriteConfig 这个类。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
/**
* Write Configuration for writes to MongoDB
*
* @param databaseName the database name
* @param collectionName the collection name
* @param connectionString the optional connection string used in the creation of this configuration.
* @param replaceDocument replaces the whole document, when saving a Dataset that contains an `_id` field.
* If false only updates / sets the fields declared in the Dataset.
* @param maxBatchSize the maxBatchSize when performing a bulk update/insert. Defaults to 512.
* @param localThreshold the local threshold in milliseconds used when choosing among multiple MongoDB servers to send a request.
* Only servers whose ping time is less than or equal to the server with the fastest ping time plus the local
* threshold will be chosen.
* @param writeConcernConfig the write concern configuration
* @param shardKey an optional shardKey in extended json form: `"{key: 1, key2: 1}"`. Used when upserting DataSets in sharded clusters.
* @param forceInsert if true forces the writes to be inserts, even if a Dataset contains an `_id` field. Default `false`.
* @param ordered configures the bulk operation ordered property. Defaults to true.
* @param secondLatch the maxBatchSize when performing a bulk update/insert per second per partition.
* @since 1.0
*/
case class WriteConfig(
databaseName: String,
collectionName: String,
connectionString: Option[String] = None,
replaceDocument: Boolean = WriteConfig.DefaultReplaceDocument,
maxBatchSize: Int = WriteConfig.DefaultMaxBatchSize,
localThreshold: Int = MongoSharedConfig.DefaultLocalThreshold,
writeConcernConfig: WriteConcernConfig = WriteConcernConfig.Default,
shardKey: Option[String] = None,
forceInsert: Boolean = WriteConfig.DefaultForceInsert,
ordered: Boolean = WriteConfig.DefautOrdered,
secondLatch: Option[Int] = None
) extends MongoCollectionConfig with MongoClassConfig {
require(maxBatchSize >= 1, s"maxBatchSize ($maxBatchSize) must be greater or equal to 1")
require(localThreshold >= 0, s"localThreshold ($localThreshold) must be greater or equal to 0")
require(Try(connectionString.map(uri => new ConnectionString(uri))).isSuccess, s"Invalid uri: '${connectionString.get}'")
require(Try(shardKey.map(json => BsonDocument.parse(json))).isSuccess, s"Invalid shardKey: '${shardKey.get}'")

type Self = WriteConfig

override def withOption(key: String, value: String): WriteConfig = WriteConfig(this.asOptions + (key -> value))

override def withOptions(options: collection.Map[String, String]): WriteConfig = WriteConfig(options, Some(this))

override def asOptions: collection.Map[String, String] = {
val options = mutable.Map("database" -> databaseName, "collection" -> collectionName,
WriteConfig.replaceDocumentProperty -> replaceDocument.toString,
WriteConfig.localThresholdProperty -> localThreshold.toString,
WriteConfig.forceInsertProperty -> forceInsert.toString) ++ writeConcernConfig.asOptions
connectionString.map(uri => options += (WriteConfig.mongoURIProperty -> uri))
shardKey.map(json => options += (WriteConfig.shardKeyProperty -> json))
secondLatch.map(number => options += (WriteConfig.secondLatchProperty -> number.toString))
options.toMap
}

override def withOptions(options: util.Map[String, String]): WriteConfig = withOptions(options.asScala)

override def asJavaOptions: util.Map[String, String] = asOptions.asJava

/**
* The `WriteConcern` that this config represents
*
* @return the WriteConcern
*/
def writeConcern: WriteConcern = writeConcernConfig.writeConcern
}

这是修改以后的 WriteConfig 类代码,我们添加上了一个 secondLatch 的参数作为流速控制参数。在使用的时候可以:

1
2
3
4
val writeConfig = new WriteConfig(conf.mongoDatabase, conf.mongoCollection)
.withOption(WriteConfig.replaceDocumentProperty, conf.replaceDocument.toString)
.withOption(WriteConfig.mongoURIProperty, conf.mongoUri)
.withOption(WriteConfig.secondLatchProperty, conf.secondLatch.toString)

通过 withOption() 的方法设定 secondLatch 参数,然后我们跟踪一下上面的 withOption() 方法的实现,是通过一个 WriteConfig(options: util.Map[String, String]) 的构造函数进行了构造。所以也需要修改这个函数的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
override def apply(options: collection.Map[String, String], default: Option[WriteConfig]): WriteConfig = {
val cleanedOptions = stripPrefix(options)
val cachedConnectionString = connectionString(cleanedOptions)
val defaultDatabase = default.map(conf => conf.databaseName).orElse(Option(cachedConnectionString.getDatabase))
val defaultCollection = default.map(conf => conf.collectionName).orElse(Option(cachedConnectionString.getCollection))

WriteConfig(
databaseName = databaseName(databaseNameProperty, cleanedOptions, defaultDatabase),
collectionName = collectionName(collectionNameProperty, cleanedOptions, defaultCollection),
connectionString = cleanedOptions.get(mongoURIProperty).orElse(default.flatMap(conf => conf.connectionString)),
replaceDocument = getBoolean(cleanedOptions.get(replaceDocumentProperty), default.map(conf => conf.replaceDocument),
defaultValue = DefaultReplaceDocument),
maxBatchSize = getInt(cleanedOptions.get(maxBatchSizeProperty), default.map(conf => conf.maxBatchSize),
DefaultMaxBatchSize),
localThreshold = getInt(cleanedOptions.get(localThresholdProperty), default.map(conf => conf.localThreshold),
MongoSharedConfig.DefaultLocalThreshold),
writeConcernConfig = WriteConcernConfig(cleanedOptions, default.map(writeConf => writeConf.writeConcernConfig)),
shardKey = cleanedOptions.get(shardKeyProperty).orElse(default.flatMap(conf => conf.shardKey).orElse(None)),
forceInsert = getBoolean(cleanedOptions.get(forceInsertProperty), default.map(conf => conf.forceInsert),
defaultValue = DefaultForceInsert),
ordered = getBoolean(cleanedOptions.get(orderedProperty), default.map(conf => conf.ordered), DefautOrdered),
// 流速控制参数
secondLatch = cleanedOptions
.get(secondLatchProperty).orElse(default.flatMap(conf => Try(conf.secondLatch.toString).toOption).orElse(None))
.flatMap(s => Try(s.toInt).toOption)
)
}

可以看到这个构造函数调用的是一个要传递所有参数的构造函数进行构造的,所以我们还需要实现这样一个传递所有参数的构造函数,然后加上:

1
2
3
secondLatch = cleanedOptions
.get(secondLatchProperty).orElse(default.flatMap(conf => Try(conf.secondLatch.toString).toOption).orElse(None))
.flatMap(s => Try(s.toInt).toOption)

由于 options 的 value 都是字符串,所以这边需要转一下 Int,传递所有参数的构造函数如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
* Creates a WriteConfig
*
* @param databaseName the database name
* @param collectionName the collection name
* @param connectionString the optional connection string used in the creation of this configuration
* @param replaceDocument replaces the whole document, when saving a Dataset that contains an `_id` field.
* If false only updates / sets the fields declared in the Dataset.
* @param maxBatchSize the maxBatchSize when performing a bulk update/insert. Defaults to 512.
* @param localThreshold the local threshold in milliseconds used when choosing among multiple MongoDB servers to send a request.
* Only servers whose ping time is less than or equal to the server with the fastest ping time plus the local
* threshold will be chosen.
* @param writeConcern the WriteConcern to use
* @param shardKey an optional shardKey in extended form: `"{key: 1, key2: 1}"`. Used when upserting DataSets in sharded clusters.
* @param forceInsert if true forces the writes to be inserts, even if a Dataset contains an `_id` field. Default `false`.
* @param ordered configures if the bulk operation is ordered property.
* @param secondLatch the maxBatchSize when performing a bulk update/insert per second per partition.
* @return the write config
* @since jike-1.0.0
*/
def apply(databaseName: String, collectionName: String, connectionString: Option[String], replaceDocument: Boolean, maxBatchSize: Int,
localThreshold: Int, writeConcern: WriteConcern, shardKey: Option[String], forceInsert: Boolean, ordered: Boolean, secondLatch: Option[Int]): WriteConfig = {
apply(databaseName, collectionName, connectionString, replaceDocument, maxBatchSize, localThreshold, WriteConcernConfig(writeConcern),
shardKey, forceInsert, ordered, secondLatch)
}

使用 case class 提供的默认构造函数进行构造,至此我们就能愉快的对包含 secondLatch 字段的 WriteConfig 进行构造了。

Save Method

然后我们就需要分别对多个 save() 添加限速器,原理都大同小异,就是在 foreachPartition 的函数中构建令牌桶,然后在 foreach 的写Mongodb 函数之前进行阻塞的令牌获取,这里就展示常用的 Datasets 和 RDD 类型 save 方法的修改:

save[D] (dataset: Dataset[D]): Unit

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
/**
* Save data to MongoDB
*
* '''Note:''' If the dataFrame contains an `_id` field the data will upserted and replace any existing documents in the collection.
*
* @param dataset the dataset to save to MongoDB
* @param writeConfig the writeConfig
* @tparam D
* @since 1.1.0
*/
def save[D](dataset: Dataset[D], writeConfig: WriteConfig): Unit = {
val mongoConnector = MongoConnector(writeConfig.asOptions)
val dataSet = dataset.toDF()
val mapper = rowToDocumentMapper(dataSet.schema)
val documentRdd: RDD[BsonDocument] = dataSet.rdd.map(row => mapper(row))
val fieldNames = dataset.schema.fieldNames.toList
val queryKeyList = BsonDocument.parse(writeConfig.shardKey.getOrElse("{_id: 1}")).keySet().asScala.toList

if (writeConfig.forceInsert || !queryKeyList.forall(fieldNames.contains(_))) {
MongoSpark.save(documentRdd, writeConfig)
} else {
documentRdd.foreachPartition(iter => if (iter.nonEmpty) {
// **INIT RateLimiter
var rateLimiter: Option[RateLimiter] = None
if (writeConfig.secondLatch.isDefined) {
// If secondLatch < maxBatchSize, it will destroy rate limit rule.
val permitSize = if (writeConfig.secondLatch.get >= writeConfig.maxBatchSize) (writeConfig.secondLatch.get / writeConfig.maxBatchSize).floor else 1
rateLimiter = Option.apply(RateLimiter.create(permitSize))
}
mongoConnector.withCollectionDo(writeConfig, { collection: MongoCollection[BsonDocument] =>
iter.grouped(writeConfig.maxBatchSize).foreach(batch => {
// **Acquire
if (rateLimiter.isDefined) {
rateLimiter.get.acquire(1)
}
val requests = batch.map(doc =>
if (queryKeyList.forall(doc.containsKey(_))) {
val queryDocument = new BsonDocument()
queryKeyList.foreach(key => queryDocument.append(key, doc.get(key)))
if (writeConfig.replaceDocument) {
new ReplaceOneModel[BsonDocument](queryDocument, doc, new ReplaceOptions().upsert(true))
} else {
queryDocument.keySet().asScala.foreach(doc.remove(_))
new UpdateOneModel[BsonDocument](queryDocument, new BsonDocument("$set", doc), new UpdateOptions().upsert(true))
}
} else {
new InsertOneModel[BsonDocument](doc)
})
collection.bulkWrite(requests.toList.asJava, new BulkWriteOptions().ordered(writeConfig.ordered))
})
})
})
}
}

save[D: ClassTag] (rdd: RDD[D]): Unit

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
* Save data to MongoDB
*
* @param rdd the RDD data to save to MongoDB
* @param writeConfig the writeConfig
* @tparam D the type of the data in the RDD
*/
def save[D: ClassTag](rdd: RDD[D], writeConfig: WriteConfig): Unit = {
val mongoConnector = MongoConnector(writeConfig.asOptions)
rdd.foreachPartition(iter => if (iter.nonEmpty) {
// **INIT RateLimiter
var rateLimiter: Option[RateLimiter] = None
if (writeConfig.secondLatch.isDefined) {
// If secondLatch < maxBatchSize, it will destroy rate limit rule.
val permitSize = if (writeConfig.secondLatch.get >= writeConfig.maxBatchSize) (writeConfig.secondLatch.get / writeConfig.maxBatchSize).floor else 1
rateLimiter = Option.apply(RateLimiter.create(permitSize))
}
mongoConnector.withCollectionDo(writeConfig, { collection: MongoCollection[D] =>
iter.grouped(writeConfig.maxBatchSize).foreach(batch => {
// **Acquire
if (rateLimiter.isDefined) {
rateLimiter.get.acquire(1)
}
collection.insertMany(
batch.toList.asJava,
new InsertManyOptions().ordered(writeConfig.ordered)
)
})
})
})
}

需要注意的是不要在 Driver 上去初始化 RateLimiter,还要注意与 WriteConfig 中已存在的 maxBatchSize 参数的关系。

到这里就能愉快的进行限速了。

Comments

⬆︎TOP