Environment

  • Spark version: 2.2.1
  • EMR: Amazon EMR
    • Master: m4.xlarge [8vCore, 16GB] * 1
    • Task: r4.xlarge[4vCore, 30.5GB] * (0-20)
    • Core: r4.xlarge[4vCore, 30.5GB] * (1-15)
  • Mongo Collection:
    • A: 73.4G
    • B: 28.7G

Task 和 Core 都是 Auto Scaling, B表与A表是一对多的关系。

操作非常简单,从 Mongo 中分别读取 A , B 表。再将两表 join 后,选取字段,存入一个已经按照 C 字段 shard 的 Mongo 当中。并且 C 字段不是 _id

Trap

Full Scan

在 MongoSpark 中如果使用 schema,并且在 schema 中对一些参数设置了 nullable=false 会出现在 NodeManager 进行 sample partition统计的时候需要使用这个 filter 条件对全表进行 scan,所以如果有些字段没有索引的话,会发现 load 数据的时间特别长。(还好 Dreamsome 踩过这个坑,不然不知道猴年马月能发现。

1
2
3
4
5
6
7
8
val count = if (matchQuery.isEmpty) {
results.getNumber("count").longValue()
} else {
connector.withCollectionDo(readConfig, { coll: MongoCollection[BsonDocument] => coll.countDocuments(matchQuery) })
}
val avgObjSizeInBytes = results.get("avgObjSize", new BsonInt64(0)).asNumber().longValue()
val numDocumentsPerPartition: Int = math.floor(partitionSizeInBytes.toFloat / avgObjSizeInBytes).toInt
val numberOfSamples = math.floor(samplesPerPartition * count / numDocumentsPerPartition.toFloat).toInt

mongo-spark/MongoSamplePartitioner.scala at master · mongodb/mongo-spark · GitHub

源码中可以看到,如果不包含 matchQuery 是没有问题的,如果有的话会使用 matchQuery 进行 count。

NodeManger Restart

在任务执行中间,偶尔会出现 java.lang.RuntimeException: Executor is not registered 的报错。查看后主要原因是因为 NodeManager 在任务运行中挂掉重启以后,本来在它管理下的 Executor 没有办法重现注册导致的。但是看到 Spark 社区有人报这个bug,并且被标记为在 1.6.0 版本已经 fix 了。黑人问号脸。

SPARK-9439 ExternalShuffleService should be robust to NodeManager restarts in yarn - ASF JIRA

Mongo Spark Upsert

Mongo 中的 Upsert 操作不是原子操作,所以在两个线程同时 upsert 一个不存在的 _id 时,是可能出现报错的。

1
2
3
4
5
6
7
8
To prevent MongoDB from inserting the same document more than once, create a unique index on the name field. With a unique index, if multiple applications issue the same update with upsert: true, exactly one update() would successfully insert a new document.

The remaining operations would either:

1. update the newly inserted document, or

2. fail when they attempted to insert a duplicate.
If the operation fails because of a duplicate index key error, applications may retry the operation which will succeed as an update operation.

db.collection.update() — MongoDB Manual

以上是官方文档中的说明,可能出现同时插入时,后一个 upsert 报错的情况。而对于这种情况来说,使用 Mongo spark 是很难处理的,没办法 catch 住后一个异常。先来看看 Mongo spark 的 save 方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
def save[D](dataset: Dataset[D], writeConfig: WriteConfig): Unit = {
val mongoConnector = MongoConnector(writeConfig.asOptions)
val dataSet = dataset.toDF()
val mapper = rowToDocumentMapper(dataSet.schema)
val documentRdd: RDD[BsonDocument] = dataSet.rdd.map(row => mapper(row))
val fieldNames = dataset.schema.fieldNames.toList
val queryKeyList = BsonDocument.parse(writeConfig.shardKey.getOrElse("{_id: 1}")).keySet().asScala.toList

if (writeConfig.forceInsert || !queryKeyList.forall(fieldNames.contains(_))) {
MongoSpark.save(documentRdd, writeConfig)
} else {
documentRdd.foreachPartition(iter => if (iter.nonEmpty) {
mongoConnector.withCollectionDo(writeConfig, { collection: MongoCollection[BsonDocument] =>
iter.grouped(writeConfig.maxBatchSize).foreach(batch => {
val updateOptions = new UpdateOptions().upsert(true)
val requests = batch.map(doc =>
if (queryKeyList.forall(doc.containsKey(_))) {
val queryDocument = new BsonDocument()
queryKeyList.foreach(key => queryDocument.append(key, doc.get(key)))
if (writeConfig.replaceDocument) {
new ReplaceOneModel[BsonDocument](queryDocument, doc, updateOptions)
} else {
queryDocument.keySet().asScala.foreach(doc.remove(_))
new UpdateOneModel[BsonDocument](queryDocument, new BsonDocument("$set", doc), updateOptions)
}
} else {
new InsertOneModel[BsonDocument](doc)
})
collection.bulkWrite(requests.toList.asJava)
})
})
})
}
}

在集合一些 ReplaceOneModel, UpdateOneModel, InsertOneModel 最后调用的是 collection.bulkWrite 方法。对使用者来说是没有办法 catch 其中一条的异常的,所以可能导致整个 task 失败重试。在设计上就应该尽量规避有两个 partition 同时 upsert 一个 _id 对象的情况。

Mongo Shard

当 mongo 的 shard key 不是 _id,而是其他 filed 的时候,会出现同一个 _id 的多个元素被写入到数据库中。因为 mongo 内部只能保证在一个 shard 中的 collection 强制的唯一性。并且在进行 shard 的时候,会自动对 shard 进行索引,但是不会创建唯一性索引。所以在可能会出现多个同一 _id 的情况下,需要注意。

mongodb - Duplicate documents on _id (in mongo) - Stack Overflow

而且经过实验发现,在 Mongo Spark save 的过程当中需要指定 WriteConfig 中的 shardKey ,而且必须包含 {_id: 1} ,不然会报错。比如 shardKeyuser,需要写成 {_id: 1, user: 1}。这是因为 user 并不是数据库的 unique index,而 _id 在这个 shard 中是 unique index 并且是 immutable 的, 所以如果我用 user 做 query 条件去更新 _id 就会出错。

Yarn Resource

在分 Executor Container 的时候是一台物理机器一台的分,所以可能出现内存碎片。比如一台16G内存的机器,4.5G一个 Executor Container,那么只能产生3个Executor Container,还剩下 2.5G 的内存不够启一个 Executor Container,所以产生碎片。并且这个碎片是不会在 Yarn UI 中表现出来, 所以会导致在 UI 中出现 total <> used + reserved 的情况。

并且每个 Executor Container 的内存使用不只是通过 spark.executor.memory 设置的大小,会有多余的内存来作为 Container 的运行使用。

Optimization

Spark Join Shuffle

Spark 在进行 join 操作的之前会对 join key 进行 repartition。而 Mongo Spark 在从 Mongo 中读取数据的时候会使用 _id 进行 partition,这样会多做一次较为耗时的工作。可以在 MongoSpark 读取数据的时候直接通过 join key 进行 partition

但是 MongoSpark 中没有支持一种 partition 策略,保证一些 join key 对应的 Document 全部分在一个 partition 当中,基本都是按照 partitionSize 结合 join key 来做切分,所以需要自己实现,并且如果数据分布不均匀的话可能导致数据倾斜而造成内存问题。所以需要对自己的数据集有一定认识以后再选取合适的方法。

下文中有详细的指出各种 partitioner 的策略:
mongo-spark/2-configuring.md at master · mongodb/mongo-spark · GitHub

并且如果是一个小的集合和另外一个大的集合进行 join 的时候,可以考虑 broadcast join 通过将小的集合广播到其他 Excutor 上的形式来避免 shuffle。

EMR Auto Scaling

EMR 的自动收缩会导致一些并没有完成所有 Task 的机器被回收,导致一些机器重启,而之上的所有 Excuter 执行的任务都需要重新运行。如果需要依赖 cache 的任务还需要重跑上游 Task,在跑大体量的任务的时候,不应该再把这个风险引入。

Resource assignment

Spark性能优化:资源调优篇 – 过往记忆
浅谈Spark应用程序的性能调优 | 青云QingCloud 社区

Comments

⬆︎TOP