「@JIKE」 Data engineer/Code player
post @ 2018-10-16

Environment

  • Spark version: 2.2.1
  • EMR: Amazon EMR
    • Master: m4.xlarge [8vCore, 16GB] * 1
    • Task: r4.xlarge[4vCore, 30.5GB] * (0-20)
    • Core: r4.xlarge[4vCore, 30.5GB] * (1-15)
  • Mongo Collection:
    • A: 73.4G
    • B: 28.7G

Task 和 Core 都是 Auto Scaling, B表与A表是一对多的关系。

操作非常简单,从 Mongo 中分别读取 A , B 表。再将两表 join 后,选取字段,存入一个已经按照 C 字段 shard 的 Mongo 当中。并且 C 字段不是 _id

Trap

Full Scan

在 MongoSpark 中如果使用 schema,并且在 schema 中对一些参数设置了 nullable=false 会出现在 NodeManager 进行 sample partition统计的时候需要使用这个 filter 条件对全表进行 scan,所以如果有些字段没有索引的话,会发现 load 数据的时间特别长。(还好 Dreamsome 踩过这个坑,不然不知道猴年马月能发现。

1
2
3
4
5
6
7
8
val count = if (matchQuery.isEmpty) {
results.getNumber("count").longValue()
} else {
connector.withCollectionDo(readConfig, { coll: MongoCollection[BsonDocument] => coll.countDocuments(matchQuery) })
}
val avgObjSizeInBytes = results.get("avgObjSize", new BsonInt64(0)).asNumber().longValue()
val numDocumentsPerPartition: Int = math.floor(partitionSizeInBytes.toFloat / avgObjSizeInBytes).toInt
val numberOfSamples = math.floor(samplesPerPartition * count / numDocumentsPerPartition.toFloat).toInt

mongo-spark/MongoSamplePartitioner.scala at master · mongodb/mongo-spark · GitHub

源码中可以看到,如果不包含 matchQuery 是没有问题的,如果有的话会使用 matchQuery 进行 count。

NodeManger Restart

在任务执行中间,偶尔会出现 java.lang.RuntimeException: Executor is not registered 的报错。查看后主要原因是因为 NodeManager 在任务运行中挂掉重启以后,本来在它管理下的 Executor 没有办法重现注册导致的。但是看到 Spark 社区有人报这个bug,并且被标记为在 1.6.0 版本已经 fix 了。黑人问号脸。

SPARK-9439 ExternalShuffleService should be robust to NodeManager restarts in yarn - ASF JIRA

Mongo Spark Upsert

Mongo 中的 Upsert 操作不是原子操作,所以在两个线程同时 upsert 一个不存在的 _id 时,是可能出现报错的。

1
2
3
4
5
6
7
8
To prevent MongoDB from inserting the same document more than once, create a unique index on the name field. With a unique index, if multiple applications issue the same update with upsert: true, exactly one update() would successfully insert a new document.

The remaining operations would either:

1. update the newly inserted document, or

2. fail when they attempted to insert a duplicate.
If the operation fails because of a duplicate index key error, applications may retry the operation which will succeed as an update operation.

db.collection.update() — MongoDB Manual

以上是官方文档中的说明,可能出现同时插入时,后一个 upsert 报错的情况。而对于这种情况来说,使用 Mongo spark 是很难处理的,没办法 catch 住后一个异常。先来看看 Mongo spark 的 save 方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
def save[D](dataset: Dataset[D], writeConfig: WriteConfig): Unit = {
val mongoConnector = MongoConnector(writeConfig.asOptions)
val dataSet = dataset.toDF()
val mapper = rowToDocumentMapper(dataSet.schema)
val documentRdd: RDD[BsonDocument] = dataSet.rdd.map(row => mapper(row))
val fieldNames = dataset.schema.fieldNames.toList
val queryKeyList = BsonDocument.parse(writeConfig.shardKey.getOrElse("{_id: 1}")).keySet().asScala.toList

if (writeConfig.forceInsert || !queryKeyList.forall(fieldNames.contains(_))) {
MongoSpark.save(documentRdd, writeConfig)
} else {
documentRdd.foreachPartition(iter => if (iter.nonEmpty) {
mongoConnector.withCollectionDo(writeConfig, { collection: MongoCollection[BsonDocument] =>
iter.grouped(writeConfig.maxBatchSize).foreach(batch => {
val updateOptions = new UpdateOptions().upsert(true)
val requests = batch.map(doc =>
if (queryKeyList.forall(doc.containsKey(_))) {
val queryDocument = new BsonDocument()
queryKeyList.foreach(key => queryDocument.append(key, doc.get(key)))
if (writeConfig.replaceDocument) {
new ReplaceOneModel[BsonDocument](queryDocument, doc, updateOptions)
} else {
queryDocument.keySet().asScala.foreach(doc.remove(_))
new UpdateOneModel[BsonDocument](queryDocument, new BsonDocument("$set", doc), updateOptions)
}
} else {
new InsertOneModel[BsonDocument](doc)
})
collection.bulkWrite(requests.toList.asJava)
})
})
})
}
}

在集合一些 ReplaceOneModel, UpdateOneModel, InsertOneModel 最后调用的是 collection.bulkWrite 方法。对使用者来说是没有办法 catch 其中一条的异常的,所以可能导致整个 task 失败重试。在设计上就应该尽量规避有两个 partition 同时 upsert 一个 _id 对象的情况。

Mongo Shard

当 mongo 的 shard key 不是 _id,而是其他 filed 的时候,会出现同一个 _id 的多个元素被写入到数据库中。因为 mongo 内部只能保证在一个 shard 中的 collection 强制的唯一性。并且在进行 shard 的时候,会自动对 shard 进行索引,但是不会创建唯一性索引。所以在可能会出现多个同一 _id 的情况下,需要注意。

mongodb - Duplicate documents on _id (in mongo) - Stack Overflow

而且经过实验发现,在 Mongo Spark save 的过程当中需要指定 WriteConfig 中的 shardKey ,而且必须包含 {_id: 1} ,不然会报错。比如 shardKeyuser,需要写成 {_id: 1, user: 1}。这是因为 user 并不是数据库的 unique index,而 _id 在这个 shard 中是 unique index 并且是 immutable 的, 所以如果我用 user 做 query 条件去更新 _id 就会出错。

Yarn Resource

在分 Executor Container 的时候是一台物理机器一台的分,所以可能出现内存碎片。比如一台16G内存的机器,4.5G一个 Executor Container,那么只能产生3个Executor Container,还剩下 2.5G 的内存不够启一个 Executor Container,所以产生碎片。并且这个碎片是不会在 Yarn UI 中表现出来, 所以会导致在 UI 中出现 total <> used + reserved 的情况。

并且每个 Executor Container 的内存使用不只是通过 spark.executor.memory 设置的大小,会有多余的内存来作为 Container 的运行使用。

Optimization

Spark Join Shuffle

Spark 在进行 join 操作的之前会对 join key 进行 repartition。而 Mongo Spark 在从 Mongo 中读取数据的时候会使用 _id 进行 partition,这样会多做一次较为耗时的工作。可以在 MongoSpark 读取数据的时候直接通过 join key 进行 partition

但是 MongoSpark 中没有支持一种 partition 策略,保证一些 join key 对应的 Document 全部分在一个 partition 当中,基本都是按照 partitionSize 结合 join key 来做切分,所以需要自己实现,并且如果数据分布不均匀的话可能导致数据倾斜而造成内存问题。所以需要对自己的数据集有一定认识以后再选取合适的方法。

下文中有详细的指出各种 partitioner 的策略:
mongo-spark/2-configuring.md at master · mongodb/mongo-spark · GitHub

并且如果是一个小的集合和另外一个大的集合进行 join 的时候,可以考虑 broadcast join 通过将小的集合广播到其他 Excutor 上的形式来避免 shuffle。

EMR Auto Scaling

EMR 的自动收缩会导致一些并没有完成所有 Task 的机器被回收,导致一些机器重启,而之上的所有 Excuter 执行的任务都需要重新运行。如果需要依赖 cache 的任务还需要重跑上游 Task,在跑大体量的任务的时候,不应该再把这个风险引入。

Resource assignment

Spark性能优化:资源调优篇 – 过往记忆
浅谈Spark应用程序的性能调优 | 青云QingCloud 社区

Read More
post @ 2018-08-19

Target

在不暂停服务的前提下将一个 290G 的 Mongo 数据表中的部分字段迁移到 Postgresql 数据库,保证足够低的差异性。

Strategy

一切的工作的基础是 MongoDB oplog 的幂等原则:

  • 首先启动一个服务将 MongoDB 中实时生产的 oplog 同步到 kafka 中。
  • 启动 Spark 任务将目标表中的字段批量同步到 Postgresql 中。
  • 启动 Storm 服务将 kafka 中的 oplog 回放到 Postgresql 中。
  • 启动 Spark 任务进行一致性的检查。

a1-1

必须保证第一个过程在第二个过程之前启动,第三个任务在第二个任务完成之后启动。任务流程如图,oplog 被同步到 kafka 的一个 topic 中,这个 topic 被分成了8个 partition,启动了 3个 broker。然后将这个 kafka topic 作为 Storm 任务的 KafkaSpout,并发数为4。下游进行持久化任务的 Bolt 的并发数也为4。

同步 Oplog

这一步需要注意的是同一个 _id 的 oplog 的乱序问题,我们在回放 oplog 的时候必须按照发生的时间顺序进行回放,不然会出现丢失数据的情况。所以一定要保证 oplog 是按照生成的顺序放入 kafka 队列中的。在这种情况下肯定是单线程的服务来同步数据更加的合适,就不需要担心由于并发带来的乱序问题。

第二个可能导致乱序的点是 oplog 在 kafka 中的存储位置,我们为了保证同一个 _id 的对象 oplog 不乱序,那么必须保证它们被存储在同一个 partition 当中。如果存储在不同的 partition 当中的话可能会在 Storm 的不同 Spout Consumer thread 中被处理,那么就有可能会出现创建时间在后面的 oplog 先被回放到 Postgresql 当中。而如何能保证同一个 _id 的对象放入到同一个 kafka partition 当中?只需要将 _id 作为 kafka message 的 key,因为 kafka 的 partition 机制就是如果有 message key 就按照 message key 进行 hash 以后进行分区。参考源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
/**
* Compute the partition for the given record.
*
* @param topic The topic name
* @param key The key to partition on (or null if no key)
* @param keyBytes serialized key to partition on (or null if no key)
* @param value The value to partition on or null
* @param valueBytes serialized value to partition on or null
* @param cluster The current cluster metadata
*/
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if (keyBytes == null) {
int nextValue = nextValue(topic);
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
if (availablePartitions.size() > 0) {
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return availablePartitions.get(part).partition();
} else {
// no partitions are available, give a non-available partition
return Utils.toPositive(nextValue) % numPartitions;
}
} else {
// hash the keyBytes to choose a partition
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}
private int nextValue(String topic) {
AtomicInteger counter = topicCounterMap.get(topic);
if (null == counter) {
counter = new AtomicInteger(new Random().nextInt());
AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
if (currentCounter != null) {
counter = currentCounter;
}
}
return counter.getAndIncrement();
}

如果消息的 key 为 null,则先根据 topic 名获取上次计算分区时使用的一个整数并加一。然后判断 topic 的可用分区数是否大于0,如果大于0则使用获取的 nextValue 的值和可用分区数进行取模操作。如果 topic 的可用分区数小于等于0,则用获取的 nextValue 的值和总分区数进行取模操作(随机选择了一个不可用分区)

如果消息的 key 不为 null,就是根据 hash 算法 murmur2 算出 key 的 hash 值,然后和分区数进行取模运算。

Storm 回放

乱序

现在我们已经保证同 _id 的消息会进入到同一个 Storm Spout thread 当中,现在还需要保证在发送到 Bolt Task 的时候也进入到同一个。这就需要考虑 Storm 的 grouping 策略了,其中只有 FieldGrouping 能满足要求,FieldGrouping 是通过 parentBolt 发出的 stream 当中声明的某一个或者几个 field 来做 grouping,比如 parentBolt 发出的 stream 中有:usernamepassword 两个 field,而在声明 FieldGrouping 的时候设置按照 username 这个 field 来做 grouping。那么 username 相同的stream 则一定会进入到同一个 BoltThread 当中。

但是这次的设计当中只有1个 Bolt,它的父级 Bolt 是一个 Spout。而没有找到合适的api来对一个 Spout 的 stream 声明 field,默认只有一个 bytes field。通过分析发现在 KafkaSpout 类中有一个 public 方法:

1
2
3
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(this._spoutConfig.scheme.getOutputFields();
}

是用来声明 Fields 的,通过参看文档,没有携带 streamName 参数的 declare 方法会声明一个 streamNamedefault 的 stream。

所以我们只要使用自己的 scheme ,并且重载它的 getOutputFields 参数即可完成对这个 Spout 的 stream 声明 field。并且发现 KafkaConfig 类中 stream 是一个 MultiScheme 类型的 public 参数,构造函数里使用 RawMultiScheme 类进行初始化它的值,而该类中的 getOutputFields 方法:

1
2
3
public Fields getOutputFields() {
return new Fields(new String[]{"bytes"});
}

所以默认的 Fieldsbytes。那么只要继承 backtype.storm.spout.Scheme 类重载 getOutputFieldsdeserialize 即可。由于传入 Spout 的要么是 bytes,要么是一个 String,所以如果不重载 deserialize 方法对其进行反序列化,那么设定的 field 也是没有实际意义的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class MyScheme implements Scheme {
@Override
public List<Object> deserialize(byte[] ser) {
try {
...
} catch (Exception ex) {
return null;
}
}

@Override
public Fields getOutputFields() {
return new Fields(...);
}
}

网络不稳定性

上面说了怎么保证 Storm 内部的不乱序问题,而仅凭以上的方法还不足以保证数据的完整性。因为在 Storm 内部传输信息是需要通过网络进行传输,所以数据是有丢失的风险的。而 Storm 本身为了应对这种风险也设立了一种容错机制。具体可以参考: Guaranteeing Message Processing

总得来说就是 Storm 内部通过 SpoutOutputCollector 给每一个从 Spout 发出的 tuple 给定一个 id,如果在设定的 timeout 时间内没有完成这个 tuple 应该完成的所有 task,那么会发出一个 fail 信号让 Storm 通过这个 id 来对这个 tuple 进行重试。而使用者仅需要设定 timeout 时间,并且在 collector.emit 的时候将 tuple 作为第一个参数。

TIPS

  • 无论是离线的批量导入还是在线的回放,insertupdate 操作需要使用 upsert 操作代替,不然会出现大量的由于 id 重复或者 id 不存在的报错。比如在表中有一个 id 为 a 的数据,而在第一步中记录了对 a 的 update 操作和 delete 操作,所以在第二步启动的时候这个 a 数据已经不存在了,而在回放的时候直接使用 update 操作回放会报错。

  • 由于是对表中的部分数据进行迁移,所以在 Bolt 当中需要对 oplog 进行过滤,只对包含目标 field 的 oplog 进行重放,否则会存在大量的垃圾数据。

  • MongoDB oplog 的 offset 最好进行缓存,做好容错工作。

Read More

原始数据类型

  • boolean
  • number
  • string
  • void: 只能为定义为 void 的变量赋值为 undefinednull
  • null / undefined: 是所有类型的子类型
  • any: 定义为 any 的变量可以被赋值为任意类型的值,可以使用任意方法,任意属性也能被访问
  • 联合类型: 可以使用 string | number 来定义联合属性,当不能确定一个联合属性类型的变量到底是哪一个类型的时候,只能访问此联合类型的所有类型里共有的属性或方法

定义的时候没有赋值的变量都会被类型推断为 any

使用 type 关键字可以用来创建类型别名和字符串字面量

1
2
3
type Name = string
type NameResolver = () => string
type NameOrResolver = Name | NameResolver
1
2
3
4
5
6
7
type EventNames = 'click' | 'scroll' | 'mousemove'
function handleEvent(ele: Element, event: EventNames) {
// do something
}

handleEvent(document.getElementById('hello'), 'scroll') // 没问题
handleEvent(document.getElementById('world'), 'dbclick') // 报错,event 不能为 'dbclick'

接口

可选属性

1
2
3
4
interface Person {
name: string,
age?: number,
}

任意属性

1
2
3
4
5
interface Person {
name: string,
age?: number,
[propName: string]: any,
}

一旦定义了任意属性,那么确定属性和可选属性都必须是它的子属性

1
2
3
4
5
6
interface Person {
name: string,
age?: number,
// error
[propName: string]: string,
}

只读属性

1
2
3
4
5
6
interface Person {
readonly id: number,
name: string,
age?: number,
[propName: string]: any,
}

只读属性只能在对象初始化的时候赋值,后面再赋值会报错

接口定义数组

1
2
3
4
5
interface MyArray {
(index: number): number
}

const _myArray: MyArray = [1, 2, 3]

接口定义函数

1
2
3
4
5
6
7
8
interface SearchFunc {
(source: string, subString: string): boolean
}

let mySearch: SearchFunc
mySearch = function(source: string, subString: string) {
return source.search(subString) !== -1
}

接口定义混合类型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
interface Counter {
(start: number): string
interval: number
reset(): void
}

function getCounter(): Counter {
let counter = <Counter>function (start: number) { }
counter.interval = 123
counter.reset = function () { }
return counter
}

let c = getCounter()
c(10)
c.reset()
c.interval = 5.0

这样函数也有了自己的属性

函数

1
2
3
4
5
6
7
let mySum = function(x: number, y: number): number {
return x + y
}

let mySum: (x: number, y: number) => number = function(x: number, y: number): number {
return x + y
}

mySum 的类型会通过等号右边的函数表达式推断得到,所以上下两个函数表达式等价
这里需要区分 es6 里的 =>Typescript 中的 =>, ts 中只是用来做函数的定义。

1
2
3
function buildName(firstName: string, lastName: string = 'Cat') {
return firstName + ' ' + lastName
}

TypeScript 会将添加了默认值的参数识别为可选参数,此时就不受 可选参数必须接在必需参数后面 的限制

1
2
3
4
5
6
7
8
function push(array: any[], ...items: any[]) {
items.forEach(function(item) {
array.push(item)
})
}

let a = []
push(a, 1, 2, 3)

剩余参数可以用数组来进行定义

1
2
3
4
5
6
7
8
9
function reverse(x: number): number
function reverse(x: string): string
function reverse(x: number | string): number | string {
if (typeof x === 'number') {
return Number(x.toString().split('').reverse().join(''))
} else if (typeof x === 'string') {
return x.split('').reverse().join('')
}
}

可以将精确的函数定义写在前面,多类型的函数实现写在后面来进行函数的重载。
其实这就是声明的合并,适用于接口、函数、类,合并的属性的类型必须是唯一的。

元组

数组合并了相同类型的对象,而元组合并了不同类型的对象

1
2
let tuple: [string, number] = ['a', 25]
let tuple1: [string, number] = ['a', 25, 'b']

类似第二种赋值,是一种越界的赋值,越界的元素会被定义为元组中每个类型的联合类型。所以第三项的类型为 string | number

多数与 C++ 类特性相识,这里提一点特殊的

1
2
3
4
5
6
7
class Animal {
name = 'Jack'

constructor() {
// ...
}
}

ES7 提案中的实例属性在 Typescript 中实现了,可以不在构造函数当中定义变量

1
2
3
4
5
6
7
class Animal {
static num = 42

constructor() {
// ...
}
}

ES7 提案中的静态属性在这里也是支持的

需要注意的是,Typescript 编译之后的代码中,并没有限制 private 属性在外部的可访问性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
abstract class Animal {
public name
public constructor(name) {
this.name = name
}
public abstract sayHi()
}

class Cat extends Animal {
public sayHi() {
console.log(`Meow, My name is ${this.name}`)
}
}

let cat = new Cat('Tom')

抽象类与其他面向对象语言也类似,抽象类不能被实例化,抽象方法必须在子类中进行实现

类与接口

多个类之间的公用方法可以抽象在接口中,然后在多个类中自己实现。这是在保证类只继承自另一个类的时候,还能有很高的灵活性(一个类可以实现多个接口)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
interface Alarm {
alert()
}

class Door {
}

class SecurityDoor extends Door implements Alarm {
alert() {
console.log('SecurityDoor alert')
}
}

class Car implements Alarm {
alert() {
console.log('Car alert')
}
}

当然接口也是可以继承接口

1
2
3
4
5
6
7
8
interface Alarm {
alert()
}

interface LightableAlarm extends Alarm {
lightOn()
lightOff()
}

接口也可以继承类

1
2
3
4
5
6
7
8
9
10
class Point {
x: number
y: number
}

interface Point3d extends Point {
z: number
}

let point3d: Point3d = {x: 1, y: 2, z: 3}

泛型

1
2
3
4
5
6
7
8
9
function createArray<T>(length: number, value: T): Array<T> {
let result: T[] = [];
for (let i = 0; i < length; i++) {
result[i] = value
}
return result
}

createArray(3, 'x') // ['x', 'x', 'x']

在函数名后先申明泛型,然后在函数的申明和实现中进行使用,类似于 C++ 的模板函数进行使用。更好用的是利用泛型约束来对参数类型进行更灵活的约束。

1
2
3
4
5
6
7
8
9
10
interface Lengthwise {
length: number;
}

function loggingIdentity<T extends Lengthwise>(arg: T): T {
console.log(arg.length);
return arg;
}

loggingIdentity(7) // Error

这样可以约束传入的参数必须包含 length

1
2
3
4
5
6
7
8
9
10
function copyFields<T extends U, U>(target: T, source: U): T {
for (let id in source) {
target[id] = (<T>source)[id];
}
return target;
}

let x = { a: 1, b: 2, c: 3, d: 4 }

copyFields(x, { b: 10, d: 20 })

多个参数也可以互相约束,约束 U 中不会出现 T 中不存在的字段,非常好用。
在使用接口来定义一个函数的输入输出类型的时候,也可以使用泛型来定义。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// 普通
interface SearchFunc {
(source: string, subString: string): boolean;
}

let mySearch: SearchFunc;
mySearch = function(source: string, subString: string) {
return source.search(subString) !== -1;
}

// 泛型
interface CreateArrayFunc {
<T>(length: number, value: T): Array<T>;
}

let createArray: CreateArrayFunc;
createArray = function<T>(length: number, value: T): Array<T> {
let result: T[] = [];
for (let i = 0; i < length; i++) {
result[i] = value;
}
return result;
}

createArray(3, 'x'); // ['x', 'x', 'x']

// 泛型参数提前到接口名上
interface CreateArrayFunc<T> {
(length: number, value: T): Array<T>;
}

let createArray: CreateArrayFunc<any>;
createArray = function<T>(length: number, value: T): Array<T> {
let result: T[] = [];
for (let i = 0; i < length; i++) {
result[i] = value;
}
return result;
}

createArray(3, 'x'); // ['x', 'x', 'x']

同样在泛型也能用于类的定义

1
2
3
4
5
6
7
8
class GenericNumber<T> {
zeroValue: T;
add: (x: T, y: T) => T;
}

let myGenericNumber = new GenericNumber<number>();
myGenericNumber.zeroValue = 0;
myGenericNumber.add = function(x, y) { return x + y; };

只要泛型的申明提前到接口名,类名上的时候就需要在实例化的时候带上泛型的类型
Typescript 2.3 以后,可以使用泛型参数的默认类型

1
2
3
4
5
6
7
function createArray<T = string>(length: number, value: T): Array<T> {
let result: T[] = [];
for (let i = 0; i < length; i++) {
result[i] = value;
}
return result;
}
Read More
post @ 2018-01-31

本篇是我学习 docker 多镜像编排的第一篇博客,首先通过一个实际的例子使用 docekr 原生命令来对应用栈进行编排连接,再通过 docker-compose 工具进行自动化编排。并从中介绍一些原生命令的使用以及相关的底层知识。

AUFS & Volume

参考:

Docker 本身的设计理念就与传统虚拟机不同,Docker 更倾向于进行资源的隔离。而对于文件系统,Docker 使用了 AUFS(Advanced union filesystem) 来进行文件的隔离(这里我觉得更准确的说是写保护)。那我们肯定要先从系统的层面了解 AUFS

其实在多个 Linux 系统发行版中都是有 AUFS 对应的实现方式: mount -t aufs **。其初衷是想将一个不想被修改的文件 A 与另一个空闲空间 B 联合,那么所有对 A 进行的修改都会保存在 B 中,不会改坏原来的东西。当然在这个初衷的刺激下就产生了功能更强大一些的 AUFS 命令,可以将多个文件/文件夹 union 到一个文件/文件夹上,并且可以为这多个文件/文件夹设置权限。

1
$ sudo mount -t aufs -o dirs=./a=rw:./b=ro none ./c
  • 该命令就是将 a 文件夹和 b 文件夹 unionc 文件夹,a 相对于 c 的权限是读写权限,意思就是 ac 各自的改变都会在对方身上显现。而 b 只是可读权限,意思就是 b 修改后,c 能够观察到,但是 c 修改 b 下属的文件不会在 b 中有任何作用。

  • ab 中有同名文件的时候,c 中的该文件依照先后顺序决定,越往前的优先级越高。

  • 当多个 rw 的文件被 union 在一起的,当我们创建文件的时候,会被轮流写到各个文件夹中

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    $ sudo mount -t aufs  -o dirs=./1=rw:./2=rw:./3=rw -o create=rr none ./mnt
    $ touch ./mnt/a ./mnt/b ./mnt/c
    $ tree
    .
    ├── 1
    │ └── a
    ├── 2
    │ └── c
    └── 3
    └── b

    当然可以设置一些轮询的策略,比如 create=mfs | most-free-space 选一个可用空间最大的分支; create=mfsrr:low 选一个空间大于 lowbranch

而这个 Docker 本身的文件策略也有一些不足的地方,就是当你删除 Docker 容器并通过该镜像重启的时候,之前的更改将会丢失。所以为了持久化保存数据并且共享容器间的数据,Docker 提出了 Volume 的概念,它可以绕过默认的 AUFS 而已正常的文件或者目录的形式存在于 hostcontainer 当中。

1
$ docker run -it --name debian-test -v ~/Projects/DebianTest/App1:/usr/src/app:rw debian /bin/bash

该命令就在 image 运行的时候初始化了 Volume,将 host~/Projects/DebianTest 文件夹与 container/usr/src/app 文件夹 union 了起来,权限是 rw

之后可以新开一个 terminal ,检查一下是否成功。

1
2
$ docker inspect --format "{{ .Volumes }}" debian-test
// docker inspect 命令可以查看镜像和容器的详细信息,默认列出全部信息,可以通过--format参数来指定输出的模板格式,以便输出特定的信息

但是我发现在我的 Mac 上这个命令会出现一些问题,可以尝试打出全部内容,手动过滤。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
$ docker inspect | grep Mount -A 10
// -A 限制输出的行数

"Mounts": [
{
"Type": "bind",
"Source": "/Users/CoderSong/Projects/DebianTest/App1",
"Destination": "/usr/src/app",
"Mode": "",
"RW": true,
"Propagation": "rprivate"
}
],
"Config": {

然后我们在 host:/Users/CoderSong/Projects/DebianTest/App1 中创建文件就能在 container 中看见。这里我们指定了 container 中的文件夹路径,但是同样的功能如果是使用 dockerfile 文件实现的话,是不能指定的。

1
2
FROM debian
VOLUME ~/Projects/Django/App1

当然更方便的是可以使用别的容器的 Volume

1
$ docker run -it --name debian-re-test --volumes-from debian-test /bin/bash

无论 debian-test 是否在运行,它都能起作用。只要有容器连接 Volume,它就不会被删除。

1
2
$ docker run -it --name redis-test redis /bin/bash
$ docker run -it --name node-test --link redis-test:redis node /bin/bash

以上的命令,我们先启动了一个 redis 镜像,然后启动了一个 node 镜像,并将它连接到了 redis 镜像。这里 node 容器叫做接收容器,或者父容器;redis 容器叫做子容器或者源容器。一个接收容器可以连接多个源容器,多个源容器可以连接多个接收容器。

--link 指令主要做了三件事情:

  • 设置接收容器的环境变量
  • 更新接收容器的 /etc/hosts 文件
  • 建立 iptables 规则进行通信

设置环境变量

  • 每有一个源容器,接收容器就会设置一个名为 <alias>_NAME 环境变量。

  • 预先在源容器中设置的部分环境变量同样会设置在接受容器的环境变量中,这些环境变量包括 Dockerfile 中使用 ENV 命令设置的,以及 docker run 命令中使用 -e, --env=[] 参数设置的。

  • 接收容器同样会为源容器中暴露的端口设置环境变量。如 redis 容器的IP为 172.17.0.2, 且暴露了8000的 tcp 端口,则在web容器中会看到如下环境变量。

    1
    2
    3
    4
    5
    REDIS_PORT_8080_TCP_ADDR=172.17.0.2
    REDIS_PORT_8080_TCP_PORT=8080
    REDIS_PORT_8080_PROTO=tcp
    REDIS_PORT_8080_TCP=tcp://172.17.0.82:8080
    REDIS_PORT=tcp://172.17.0.82:8080

更新容器的 /etc/hosts 文件

Docker 容器的IP地址是不固定的,容器重启后IP地址可能和之前不同。所以 link 操作会在 /etc/hosts 中添加一项–源容器的IP和别名,以用来解析源容器的IP地址。并且当源容器重启以后,会自动更新接收容器的 /etc/hosts 文件。这样就不用担心IP的变化对连接的影响。

这个整个过程是在容器启动的时候完成的:

  • 先找到接收容器(将要启动的容器)的所有源容器,然后将源容器的别名和IP地址添加到接收容器的 /etc/hosts
  • 更新所有父sandbox的 hosts 文件

这样当一个容器重启以后,自身的 hosts 文件和以自身为源容器的接受容器的 hosts 文件更新。

建立 iptabls 规则

Docker 为了安全起见,默认会将 Docker daemon-icc 参数设置为 false,容器间的通信就被禁止了。当 redis 容器想要向外界提供服务时,必定暴露一定的端口,假如暴露了 tcp/5432 端口。这样仅需要 node 容器和 redis 容器的 tcp/5432 端口进行通信就可以了。假如 node IP地址为 172.17.0.2/16 ,db容器的IP为 172.17.0.1/16,则需建立如下 iptables 规则。

1
2
-A DOCKER -s 172.17.0.2/32 -d 172.17.0.1/32 -i docker0 -o docker0 -p tcp -m tcp --dport 5432 -j ACCEPT
-A DOCKER -s 172.17.0.1/32 -d 172.17.0.2/32 -i docker0 -o docker0 -p tcp -m tcp --sport 5432 -j ACCEPT

这样就能确保通信的流量不会被丢弃掉。

1
2
$ docker inspect --format='{{ .NetworkSettings.IPAddress }}' [name]
// 该命令可以查看容器的IP

Example

construct-image

拉取3个需要的 image

1
2
3
$ docker pull redis
$ docker pull node
$ docker pull haproxy

运行6个 container ,注意启动顺序和数据卷的挂载

1
2
3
4
5
6
$ docker run -it --name redis-master -v ~/Projects/redis/master:/data redis /bin/bash
$ docker run -it --name redis-slave1 --link redis-master:master -v ~/Projects/redis/slave1 redis:/data /bin/bash
$ docker run -it --name redis-slave2 --link redis-master:master -v ~/Projects/redis/slave2 redis:/data /bin/bash
$ docker run -it --name APP1 --link redis-master:db -v ~/Projects/Node/App1:/usr/src/app node /bin/bash
$ docker run -it --name APP2 --link redis-master:db -v ~/Projects/Node/App2:/usr/src/app node /bin/bash
$ docker run -it --name HAProxy --link APP1:APP1 --link APP2:APP2 -p 6301:6301 -v ~/Projects/HAProxy:/tmp haproxy /bin/bash

检查一下启动状态

1
2
3
4
5
6
7
8
9
$ docker ps

CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
4db8c514f5d2 haproxy "/docker-entrypoin..." 6 seconds ago Up 5 seconds 0.0.0.0:6301->6301/tcp HAProxy
f970b888ef3c node "/bin/bash" 9 minutes ago Up 9 minutes APP2
469506f852a9 node "/bin/bash" 20 minutes ago Up 19 minutes APP1
e0afd181685a redis "docker-entrypoint..." About an hour ago Up About an hour 6379/tcp redis-slave2
272b43e402cc redis "docker-entrypoint..." About an hour ago Up About an hour 6379/tcp redis-slave1
ea63586ce28c redis "docker-entrypoint..." About an hour ago Up About an hour 6379/tcp redis-master

现在将 redis 配置复制到 host-dir

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// redis配置

daemonize yes
pidfile /var/run/redis.pid
port 6379
timeout 300
loglevel debug
logfile /usr/local/bin/log-redis.log
databases 8
save 900 1
save 300 10
save 60 10000
rdbcompression yes
dbfilename dump.rdb
dir /usr/local/bin/db/
appendonly no
appendfsync everysec

// slave 节点需要填上这一行配置,master不用
slaveof master 6379

然后进入 redis container 中修改配置并启动 redis(下面以 redis-master 为例)

1
2
3
4
5
6
7
8
9
// 连入正在运行的容器
$ docker attach redis-master
$ cp redis.conf /usr/local/bin/redis.conf
// 新建本地数据库的位置(这是在配置中写的地址)
$ mkdir db
// 用配置文件启动服务
$ redis-server redis.conf
// 用客户端检查一下服务是否启动
$ redis-cli

接下来测试一下3个 redis 节点是否连通

  • 先到 redis-master 节点放入值

    1
    2
    3
    4
    5
    $ docker attach redis-master
    $ redis-cli
    $ 127.0.0.1:6379> set master testtest
    $ 127.0.0.1:6379> get master
    $ 127.0.0.1:6379> testtest
  • 分别到两个 slave 节点检查

    1
    2
    3
    4
    $ docker attach redis-slave1
    $ redis-cli
    $ 127.0.0.1:6379> get master
    $ 127.0.0.1:6379> testtest

初始化 App 节点(下面以 App1 为例)

1
2
3
4
5
6
7
$ docker attach APP1
$ npm i -g koa-generator pm2
$ cd /usr/src/app
$ koa2 APP1
$ cd APP1
$ npm i
$ npm i -s ioredis

然后用下面的文件覆盖 /router/index.js

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
const router = require('koa-router')()
const redis = require('ioredis')
const redisClient = new redis(6379, 'db')

router.get('/', async (ctx, next) => {
// 利用我们刚放入 redis 里的值
const result = await redisClient.get('master')
await ctx.render('index', {
title: `Hello Koa 2! -- ${result}`
})
})

router.get('/string', async (ctx, next) => {
ctx.body = 'koa2 string'
})

router.get('/json', async (ctx, next) => {
ctx.body = {
title: 'koa2 json'
}
})

module.exports = router

最后用 pm2 守护进程

1
2
3
$ pm2 start bin/www
// 测试
curl localhost:3000

在配置 APP2 的时候注意更换一个端口,然后配置 HAProxy 节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
global
log 127.0.0.1 local0
maxconn 4096
chroot /usr/local/sbin
daemon
nbproc 4
pidfile /usr/local/sbin/haproxy.pid

defaults
log 127.0.0.1 local3
mode http
option dontlognull
option redispatch
retries 2
maxconn 2000
balance roundrobin
timeout connect 5000ms
timeout client 50000ms
timeout server 50000ms

listen redis_proxy
bind 0.0.0.0:6301
stats enable
stats uri /haproxy-stats
server APP1 APP1:3000 check inter 2000 rise 2 fall 5
server APP2 APP2:4000 check inter 2000 rise 2 fall 5

然后进入 HAProxy 节点启动服务

1
2
3
4
5
$ docker attach HAProxy
$ cd /tmp
$ cp haproxy.conf /usr/local/sbin/
$ cd /usr/local/sbin/
$ haproxy -f haproxy.conf

然后就能在本地访问 http://[harpoxy-ip]:6301

Read More
post @ 2017-11-02

本文以 UNIX 环境为主,结合一些技术博客和 <<C++ Primer>> 做一些总结和整理。

编译过程

单文件

这里的单文件指的是单独的 .cpp/.c 文件,因为 .h 文件只是进行一些变量的声明,是不需要进行编译的。

  • 预处理(预处理器 cpp ): 预处理器cpp将对源文件中的宏进行展开

    1
    2
    3
    $ gcc -E hello.cpp -o hello.i
    // or
    $ cpp hello.cpp -o hello.i
  • 编译(编译器 gcc/g++ ): gcc将c文件编译成汇编文件,然后编译成机器码。(编译器将 .i 文件编译成汇编文件 .s)。

    1
    $ gcc -S hello.i
  • 汇编(汇编器 as ): 汇编器将汇编文件编译成机器码 (可以通过 nm -a hello.o 查看机器码文件)

    1
    2
    3
    $ gcc -c hello.s -o hello.o
    // or
    $ as hello.s -o hello.o
  • 链接(连接器 ld ): 链接器ld将目标文件和外部符号进行连接,得到一个可执行二进制文件。

    1
    $ gcc hello.o -o hello

头文件

我们在第一步当中可以看到,这一步的作用就是把宏进行了展开。那么我们的头文件也是在这里被以宏的方式引入到了 hello.cpp 当中。那么我们下面展开介绍一下 #include与头文件中的一些注意事项。

#include

#include 是c语言的宏命令,会在第一步(预处理)中起作用。会将后面那个头文件完完整整的引入到当前的文件当中。而且仅是做替换,而不会有其他的副作用。

1
2
3
// math.h
int add(int a, int b);
int del(int a, int b);
1
2
3
4
5
6
// main.cpp
#include "math.h"
int main() {
int c = add(2, 3);
int d = del(3, 2);
}

经过第一步预处理以后:

1
2
3
4
5
6
7
# main.i
int add(int a, int b);
int del(int a, int b);
int main() {
int c = add(2, 3);
int d = del(3, 2);
}

而对于头文件的声明当中 " "< > 是有区别的,如果头文件名在 < > 中,就会被认为是标准头文件。编译器会在预定义的位置查找该头文件,如果是 " " 就认为它是非系统头文件,非系统文件查找通常开始于源文件所在路径。

头文件保护符

头文件保护符是为了保证头文件在一个 .cpp 文件当中被多次引用不会出现问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// file1.h  
class file1
{
};

// file2.h
#include "file1.h"
class file2
{
};

// file3.h
#include "file1.h"
#include "file2.h"

上面file3.h的代码展开以后就变成了

1
2
3
4
5
6
7
8
9
10
11
12
// file1.h展开的内容  
class file1
{
};

// file2.h展开的内容
class file1
{
};
class file2
{
};

class file1 被引用了两次,导致编译器报错。这时就可以加上头文件保护符来解决这个问题。

1
2
3
4
5
6
7
8
9
// file1.h  
#ifndef _FILE1_H_
#define _FILE1_H_

class file1
{
};

#endif // !_FILE1_H_
1
2
3
4
5
6
7
8
9
10
// file2.h  
#ifndef _FILE2_H_
#define _FILE2_H_

#include "file1.h"
class file2
{
};

#endif // !_FILE2_H_
1
2
3
4
5
6
7
8
// file3.h  
#ifndef _FILE3_H_
#define _FILE3_H_

#include "file1.h"
#include "file2.h"

#endif // !_FILE3_H_

这时因为 _FILE1_H_ 只出现了一次,就不会出现重定义的问题。

注意事项

头文件中需要区别 声明定义 两个概念。声明因为不涉及到内存的分配,所以是允许多次出现的,而定义则会进行内存的分配,所以定义只能出现一次。而在头文件中是只允许出现声明和一些特殊的定义的( 类定义, 值在编译时已知的 const 对象和 inline 函数)

  • 值在编译时就已知的 const 对象:
    如:const char c = 'c' 这个是在编译时就已经确定值的,之后程序不能改变。
    const char *c = 'c' 是不可以的,因为指针不是在编译时确定值的。
    并且全局的 const 对象是没有 extend 的声明的,所以只对当前 .cpp 文件有效。所以将它放在头文件中进行引用后,仅对引用文件有效,而对其他文件不可见。所以不会出现重定义。

  • inline:因为在函数的调用执行过程当中,我们需要将实参、局部变量、返回地址以及若干寄存器都压入栈中,然后才能执行函数体中的代码;函数体中的代码执行完毕后还要清理现场,将之前压入栈中的数据都出栈,才能接着执行函数调用位置以后的代码。如果一个运行时间很长的函数,那么这些调用代价也是可以忽略的。不过对于一些简单的函数,这些调用代价是很昂贵的。我们就可以使用 inline 函数,它会像宏定义一样进行代码的替换,而不进行调用过程。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    #include <iostream>
    using namespace std;
    inline void swap(int *a, int *b){
    int temp;
    temp = *a;
    *a = *b;
    *b = temp;
    }

    int main(){
    int m, n;
    cin>>m>>n;
    cout<<m<<", "<<n<<endl;
    swap(&m, &n);
    cout<<m<<", "<<n<<endl;
    return 0;
    }

    在编译器遇到函数调用 swap(&m, &n) 的时候就会进行替换,并用实参代替形参。

    1
    2
    3
    4
    int temp;
    temp = *(&m);
    *(&m) = *(&n);
    *(&n) = temp;
  • class:对于类的定义成员函数是可以写在定义体内的,这样的话编译器会默认这个函数是 inline 的,也可以声明在定义体内然后在外面进行实现。

多文件

多文件互相依赖的情况,只需要先单独将各文件编译成 .o 文件,然后 link 一下就行了。

1
2
3
4
5
6
7
8
9
// Circle.h

#ifndef CIRCLE_H
#define CIRCLE_H
class Circle
{

};
#endif
1
2
3
4
5
6
7
// Circle.cpp
#include "Circle.h"

#include <iostream>
using namespace std;

...
1
2
3
4
5
6
7
8
9
10
// main.cpp
#include "Circle.h"
#include <iostream>

using namespace std;

int main(int argc, char *argv[])
{
...
}

进行编译:

1
2
3
$ g++ -c Circle.cpp -o Circle.o
$ g++ -c main.cpp -o main.o
$ g++ main.o Circle.o -o main

如果有修改,每次只需要对增量文件进行编译就行了。如果项目比较大,可以使用 makefile 文件来进行自动化编译。(后面会有文章继续介绍 makefile 和其他的自动化编译)

Read More
post @ 2017-10-24

本文参考阮一峰老师的文章,着重从数据流的角度分析 redux

redux

同步数据流

参照图例,这是一次用户行为的数据流图。

  • (1) 用户操作 View

  • (2)(3) View 通过 Action Creator 发出相应的 Action

    • Action Creator 就是一个 Action 工厂,统一管理所有的 Action。让代码更好管理,互用性更强。
  • (4) Store 通过 dispatch 函数获取相应的 Action,并且触发 Reduer 计算新的 State

    • dispatch 之所以可以自动触发 Reducer,是因为在生成 Store 的时候就已经绑定好了。

      1
      2
      import { createStore } from 'redux';
      const store = createStore(reducer);
    • dispatch 的接口只接收对象。

  • (5) Reduer 接收现在的 State,以及 Action。做出相应的状态变化计算,得到新的 State。并且通过 Storesubscibe 监听 State 的变化,并回调对应的 Listener 函数。

    • Reducer 是状态机的核心,定义了状态转移的计算方法。 也正是因为这些 Action 是我们手动绑定并进行处理,保证了数据流的单向性。
    • 当然对于比较繁琐的 Reducer 的设计也有更好的设计模式,比如提供了 combineReducers 函数,详细用法可见阮一峰老师的文档。
    • 对应的 Listener 函数也是在申明的时候已经做好了绑定。
      1
      2
      3
      4
      import { createStore } from 'redux';
      const store = createStore(reducer);

      store.subscribe(listener);
  • (6)Listener 中获取现在的 State 并用它重新渲染 View

    • 可以通过 store.getState() 获取现在的 State
      1
      2
      3
      4
      function listerner() {
      let newState = store.getState();
      component.setState(newState);
      }

异步数据流

由于 Reduxstore.dispatch 的接口要求很严格,只能传递对象类型的 Action,所以这里我们需要先引入中间件来完成我们理想的设计。

中间件

  • (4) 在原第4步过程的基础上,我们引入中间件。 Action 会先被中间件逐步拦截处理以后传递给 Reducer。并且多个中间件是支持通过 applyMiddlewares() 函数来连接在一起。

异步实现

  • (3) 异步数据流首先对传统的仅发送对象的 Action 做了修改,这里发送函数类型的 Action,在这个返回函数当中先发送了一个异步开始的对象 Action ,在结束的时候再发送异步成功/失败的对象 Action

    这里是我纠结比较久的地方,最开始我很不能接受这种将IO操作放在 Action 里的设计,因为我觉得像这样的数据处理相关的操作是应该放在 Reducer 里的。后来我和室友讨论了以后,有了一些新的想法:

    • 首先异步操作是应该被放在网络层(或者叫IO层),而 Reducer 是担任数据计算的任务,所以把异步操作放在 Reducer 里也是一种不适当的分层。
    • 然后因为 Creator 的任务本来就很轻松,只用生成一些 Action 对象,所以这边把网络层放入其中也比较合适。
    • 最后这种设计其实也没有和最初的思想违背,Creator 还是只是生成一些 Action,而并没有执行这些 Action

    以上只是个人看法,欢迎拍砖讨论。

  • 至于实现的方案就有很多了,比如阮一峰老师文章里的 redux-thunkredux-promise,还有现在比较流行的基于 Generatorredux-sage。这里就不一一赘述。

React-Reduce

当直接使用 React-Reduce 的时候,需要按照规定的范式将组件拆分为 UI组件容器组件。详细可见阮一峰老师的文档,这里主要介绍使用了 React-Reduce 以后对于我们的数据流有哪些影响。

  • (1)(2) 通过 mapDispatchToProps 函数,可以设置哪些用户的操作会被当做 Action,并且当做哪个 Action 传递给 Store,也就是我们不需要在第一步中 Hard code 一些动作到 Action 的代码。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    const mapDispatchToProps = (
    dispatch,
    ownProps
    ) => {
    return {
    onClick: () => {
    dispatch({
    type: 'SET_VISIBILITY_FILTER',
    filter: ownProps.filter
    });
    }
    };
    }
  • (6) 通过 mapStateToProps 函数,可以进行 StateUI组件prop 的映射。并对 View 进行重新渲染,也就是说我们不需要在第6步当中再写一些获取现在 State 做渲染的工作。(非常适合做一些过滤、分析、或者担任数据的组织层)

    1
    2
    3
    4
    5
    const mapStateToProps = (state) => {
    return {
    todos: getVisibleTodos(state.todos, state.visibilityFilter)
    }
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
// 根据是否完成进行过滤
const getVisibleTodos = (todos, filter) => {
switch (filter) {
case 'SHOW_ALL':
return todos
case 'SHOW_COMPLETED':
return todos.filter(t => t.completed)
case 'SHOW_ACTIVE':
return todos.filter(t => !t.completed)
default:
throw new Error('Unknown filter: ' + filter)
}
}
Read More
post @ 2017-09-14

我们先从最近手里的一个项目说起,其实这个项目的需求很简单:从多数据源抓取数据,进行数据整合以后分别存入到不同的数据库中。当然不同数据库存储的数据结构是不一样的,但是也只是对数据源 attribute 的重新过滤组合而已。

当每一份数据的体量以及整个数据的体量变得很大的时候,系统的实时性大大降低(因为在数据库的另一端是有客户端等着将数据进行可视化展示的),这个时候我就不得不好好审视一下我的代码结构了。早先在设计的时候,因为没有考虑到数据这么大的体量,并且为了增强日志文件的可读性和一些客观限制,我没有选择大量的异步并发。

为了后面描述比较方便,我先罗列一下这些客观限制:

  • Azure Cosmos DB 是对并发数量有限制的,大概在 20次/s 的时候会发出警告。
  • Azure Cosmos DB 是对 Stored Procedurerequest body 以及 response body 有限制的。
  • Node 的异步数量是存在限制的,数据库连接数也是存在限制的。

初步模型

单进程单线程同步跑任务,这个速度肯定是我们接受不了的。那我们开始优化我们的架构。对于这种多数据源多任务的场景,生产者消费者 的行为模式作为基础应该是最合适不过了。这样我们就可以把业务逻辑完全解耦为:

  • 生产者从数据源抓数据整理为 数据单元 放到缓存队列。
  • 消费者从缓存队列拿出 数据单元 进行处理。

解耦完成后,我们对这两部分一一的审视。这样单一性的任务我们能很好的进行多线程、多进程的处理,这里我选择在生产者的 单一数据源 ,消费者的 文件-数据库(一个数据源会产生多个文件,每份文件要存储到多个数据库,这里的意思是一个文件存储到一个数据库) 粒度上选择多进程处理。

  • 生产者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
const cluster = require('cluster');
const cpuNum = require('os').cpus().length;
const redis = require('redis');
const sourceList = [...];
const sourceIndex = 0;

if (cluster.isMaster) {
for (let i = 0; i < cpuNum; i++) {
cluster.fork();
}
} else {
if (sourceIndex < sourceList.length) {
// 选择数据源
let source = sourceList[++sourceIndex];
// 抓数据并过滤成对象,存入redis
let dataObj = catchAndParser(source);
...
}
}
  • 消费者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
const cluster = require('cluster');
const cpuNum = require('os').cpus().length;
const redis = require('redis');
const worker = require('./worker');

let workers = {
mssql: worker.mssql,
documentdb: worker.documentdb,
...
}

if (cluster.isMaster) {
for (let i = 0; i < cpuNum; i++) {
cluster.fork();
}
} else {
// 从redis里取任务
let task = getTaskFromRedis();
// 进行数据重组以及存储
worker[task.type](task);
}

这样我们会发现我们起的进程数达到 CUP 的两倍了,其实这边可以自己根据任务是否会吃满 CUP ,会吃多少,并结合进程切换的时间以及内存问题来调整进程数量。到现在我们完成了我们的初步模型,但是有一个很麻烦的问题暴露在了我们面前,我们现在的逻辑没有去维护连接池,也就是说我们会在 Worker 进程中让它自己去进行连接,完成任务再断开连接。这样显然也是我们不希望去看到的。

连接池

最初我有两种设想:

  • 在主进程建立连接池,然后所有 Worker 需要连接的时候就去这个地方取来用。
  • 在每一个进程当中去维护自己的连接池。

思来想去,咨询前辈后发现第一种想法有点太傻了。先不说父子进程间监听 socket 的问题,光是子进程对取回来的连接进行复用这一点上也是对性能的浪费。所以当然是每个进程维护自己的连接池比较好,这样不论是多线程还是异步的情况下,都能不让连接成为性能的瓶颈。

在实现上,我们需要放在全局声明连接池,这样因为子进程是 fork 出去的,也会在自己的进程当中声明连接池。完成了我们每个进程一个连接池的目的。

参考:Nodejs Cluster with MySQL connections

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
var cluster = require('cluster');
var http = require('http');
var numCPUs = require('os').cpus().length;

var mysql = require('mysql');
var pool = mysql.createPool({
connectionLimit : 10,
host : 'example.org',
user : 'bob',
password : 'secret'
});

if (cluster.isMaster) {
...
} else {
...
}

数据库层面

其实我们在数据库层面的操作也比较简单:

  • 根据某个字段获取数据库里的内容(避免设计子增主键的部分重复存储)
  • 批量存储

批量存储

可以看到性能瓶颈的地方就在第二点,这里不同的数据库就会有不同的支持。对于 Documentdb,可以将批量数据封装成数组全部丢给 Stored procedure ,但其实 Stored procedure 中也是一个一个数据存入数据库中。(这里与数据库是多线程还是单线程有关,后面会去研究一下 Documentdb 的底层再来补坑)。对于前面提到的客观条件1、2,其实是可以把数据包分包再异步,如果并发数量超过限制再使用队列管理异步。参考:Azure Cosmos DB server-side programming: Stored procedures

而对于 Sql server,就可以选择用 bulk 还是 insertsql 语句中 OPENROWSET(BULK...) 选项都是可行的,但是从代码组织来看用 bulk 是更好的选择。 参考:使用 BULK INSERT 或 OPENROWSET(BULK…) 导入批量数据

批量查找

数量较少时,我觉得放在服务器端异步并发比较好。数量比较多时就把任务交给服务器端的 Stored procedure 处理。比较麻烦的是, Sql serverSP 是不接受数组的,可以通过字符串操作分隔符来模拟数组。参考:sql server 模拟数组


2017年9月21日更新

上一次的文章当中有一些地方有错误的地方和一些需要完善的地方,这边进行指出并更新。

错误

生产者的伪代码中直接使用了数组来分发数据源的方法是完全错误的,其实每个进程都会拷贝一份代码去执行,这种方法需要让每个进程中去共享 sourceIndex 才能够实现。而我们这边的子进程是通过父进程 fork 所得,所以需要在父进程来维护 sourceIndex,分发给子进程。或者也直接使用消息队列来实现这一部分的功能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
const cluster = require('cluster');
const cpuNum = require('os').cpus().length;
const redis = require('redis');
const sourceList = [...];
const sourceIndex = 0;

if (cluster.isMaster) {
for (let i = 0; i < cpuNum; i++) {
let worker = cluster.fork();
worker.send(sourceList[sourceIndex]);
sourceindex++;
}
} else {
process.on('message', (source) => {
let dataObj = catchAndParser(source);
})
}

完善

rsmq 是一个基于 redis 封装好的消息队列的库,使用起来也比较方便。唯一不太好的地方是没有封装 循环队列 ,这使得场景下处理起来比较麻烦。比如我现在的任务是循环不变的,做完了又重头做。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
const cluster = require('cluster');
const cpuNum = require('os').cpus().length;
const rsmq = require('rsmq');

let worker = async (worker, process) => {
// 封装好的promise对象
let task = await rsmq.receiveMessage('qName');
if (!task) {
worker.send('finished');
process.kill();
} else {
// 执行任务
...
process.kill();
}
}

if (cluster.isMaster) {
for (let i = 0; i < cpuNum; i++) {
cluster.fork();
}

cluster.on('exit', (worker, code, signal) => {
console.log('Worker ' + worker.process.pid + ' died with code: ' + code + ', and signal: ' + signal);
console.log('Starting a new worker');
// 退出后重启进程
cluster.fork();
});

cluster.on('message', (msg) => {
if (msg === 'finished') {
// 关闭退出重启cluster.removeAllListener('exit');
// 重新填充数据源
...
// 重新启动所有进程
for (let i = 0; i < cpuNum; i++) {
cluster.fork();
}

cluster.on('exit', (worker, code, signal) => {
console.log('Worker ' + worker.process.pid + ' died with code: ' + code + ', and signal: ' + signal);
console.log('Starting a new worker');
// 退出后重启进程
cluster.fork();
});
}

})
} else {
worker(cluster.worker, process);
}

当然这种实现只是一个思路,而且不是特别好,因为每次做完任务之后都需要重启一个 worker 进程,可以长期保持几个固定的进程,主进程通过通信来分发任务。不用浪费资源去重启进程。

补充

  • 在一个场景下,我希望使用前台的开关来控制我后台的进程。这种时候因为 fork 进程的句柄在打开开关的那个进程中,我无法在后面关闭的进程当中获取。所以其实可以在启动进程的时候,将进程号发送到消息队列中,关闭时进行关闭。(我这里描述的场景是一个开关同时控制多个任务,如果一个开关控制一个还是直接以任务名做 key, 进程号作为 value 存入 redis 中比较好)
  • 通过 fork 出来的子进程,stdin, stdout, stderr 三个流都会复制父进程,如果需要重定向,需要在 fork 时进行配置。
1
2
3
4
5
6
7
8
9
10
11
const cp = require('child_process');
const fs = require('fs');

let main = () => {
let worker = cp.fork('./test.js', { silent: true });
let fileStream = fs.createWriteStream('./output');

worker.stdout.pipe(fileStream);
};

main();
Read More
post @ 2017-08-16

Schema Type

  • String
  • Number
  • Date
  • Buffer
  • Boolean
  • Mixed
  • Decimal Type (在3.4中新增,最多支持34位小数,并且存储的是实际值而不是浮点数)
  • Object
  • Objectid
  • Array

Usage notes

Date

内建的 ```Date``` 方法没有被纳入 ```Mongoose``` 。所以如果使用像 ```setMonth``` 这样的方法是不会执行的,如果一定要使用需要通过 ```markModified``` 方法告诉 ```Mongoose``` 。
1
2


var Assignment = mongoose.model(‘Assignment’, { dueDate: Date });
Assignment.findOne(function (err, doc) {
doc.dueDate.setMonth(3);
doc.save(callback); // THIS DOES NOT SAVE YOUR CHANGE

doc.markModified(‘dueDate’);
doc.save(callback); // works
})

1
2
### Mixed
当改变了 Mixed 元素的结构类型时,需要通过 ```markModified``` 函数告诉 ```Mongoose``` 后才会自动生效。

person.anything = { x: [3, 4, { y: “changed” }] };
person.markModified(‘anything’);
person.save(); // anything will now get saved

1
2
3
4
5
6
7
8
### ObjectId
声明时需要使用 ```Schema.Types.ObjectId```。

### Arrays
当将元素声明为 ```Array``` 以后,默认值会是空数组 ```[]```,如果想修复这个问题,需要添加 ```default``` 属性。被声明为 ```Array``` 的元素如果被指定为 ```required``` ,那么至少需要一个元素在其中。

## Schema Type handle definition
在 ```Schmea``` 中对于每一个元素都有一些定义好的属性供开发者使用。可以直接定义也可以通过 ```path``` 定义。

// directly
var schema = new Schema({
test: {
type: String,
lowercase: true
}
});

// use path
var schema2 = new Schema({test: String});

schema2.path(test).lowercase(true);

1
2
3
4
5
6
7
8
9
10

### Common
+ **default(val)**

这个属性就是为元素添加默认值,只要在修改之前都会是这个值。
对于文档里提到的对于 ```mixed``` 属性,如果不设置特殊的函数返回默认值,那么多个实例会指向第一个实例。但是我在 ```4.11.7``` 版本测试,这个问题已经得到修复。

+ **validate(obj, [errorMsg], [type])**

为元素添加检验器,来检验输入的值是否符合要求。第一个参数是检验器,支持 ```RegExp```, ```Function```, ```Object```。正则表达式可以进行最简单的字面检验,函数可以进行较为复杂的逻辑检验,对象可以支持多个检验器的组合,并且可以携带错误信息。
var many = [
    { validator: validator, msg: 'uh oh' }
  , { validator: anotherValidator, msg: 'failed' }
]
new Schema({ name: { type: String, validate: many }});
1
2
3
4
5
更加细节的是,错误信息可以自定义,当然如果没有自定义,内部为准备了一些错误信息模板。自定义错误信息中可以使用模板来获得一些内部属性。

+ **{PATH} :** 非法的 document path
+ **{VALUE} :** 非法的值
+ **{TYPE} :** 检验器的类型,比如 ```Regexp```, ```min```, ```user defined
+ **{MIN} :** 检验器设定的最大值 + **{Max} :** 检验器设定的最小值 同时也提供了异步检验器,可以通过设置
属性来告诉 ```Mongoose``` 这是一个异步检验器。在回调中需要返回 ```true``` 或者 ```false``` 来告诉检验器是否成功。**(利用这个属性可以方便的写一个如果不存在则存入数据库的逻辑。)** 这个方法会在 ```save``` 动作之前执行,如果需要的话也可以自己进行[调用](http://mongoosejs.com/docs/api.html#document_Document-validate),```document.validate(function(err){})```。 如果在 ```save``` 之前检验器失败,但是没有错误处理的话,异常会被先抛到 ```Model``` 再到 ```db connection``` ,可以通过监听 ```error``` 捕获。
1
2
3
4

+ **get(fn)**

为元素添加返回转换器,可以对元素的内容进行转换以后再返回。转换函数中可以接受两个值,第一个是需要过滤的参数,第二个是这个元素对应的 ```SchemaType```,可以使用```SchemaType``` 来定制一些功能。官方文档中举了一个日期处理和信用卡卡号隐藏中间几位数字的例子,还是蛮实用的。

function inspector (val, schematype) {
if (schematype.options.required) {
return schematype.path + ‘ is required’;
} else {
return schematype.path + ‘ is not’;
}
}

var VirusSchema = new Schema({
  name: { type: String, required: true, get: inspector },
  taxonomy: { type: String, get: inspector }
})

var Virus = db.model('Virus', VirusSchema);

Virus.findById(id, function (err, virus) {
  console.log(virus.name);     // name is required
  console.log(virus.taxonomy); // taxonomy is not
})
1
2
3
4
5
6
7
+ **set(fn)**

为元素添加保存转换器,先将元素转换成相应格式以后再存入数据库。同样定义 ```get``` 方法, ```set``` 方法的转换函数也支持两个参数,第一个是需要过滤的参数,第二个是这个元素对应的 ```SchemaType```,可以使用```SchemaType``` 来定制一些功能。

+ **select(bool)**

用来决定该元素是否要包含在搜索结果当中,但是这个属性会被 ```query``` 级别的声明覆盖。
T = db.model('T', new Schema({ x: { type: String, select: true }})); T.find(..); // field x will always be selected .. // .. unless overridden; T.find().select('-x').exec(callback);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
     
+ **index(options)**

来定义是否为该元素添加索引,支持 ```Object```, ```Boolean```, ```String```, ```Object``` 用来定义复合索引。**(先留flag,后面会来填MongoDB index的相关问题)**

+ **required(required, [options.isRequired], [options.ErrorConstructor], [message])**

为元素添加存在检查器,算是检查器的一个特例,所以同样有错误信息等。并且 ```required``` 属性同样支持函数来自定义存在检查,如果没有进行自定义那么 ```Mongoose``` 会检查这个元素的值是否等于 ```null``` 或者 ```undefined``` 来判断是否存在。**(但是这边的自定义功能感觉和 ```validate``` 有点冗余)**

+ **sparse(bool)**

为元素添加稀疏索引。**作用是当该元素为空是不进入索引。**

+ **unique(bool)**

为元素添加唯一索引。**作用是只允许一条索引字段为空的记录存在,之后就不允许插入了。再次插入 记录时会报错。**

+ **text(bool)**

为元素添加全文索引。**(全文索引的坑下次和符合索引一起填。)**

### String

+ **checkRequired(value, doc)**

对于 ```String``` 属性特殊定制的 ```required``` 属性,```required``` 会直接调用字符串的 ```checkRequired``` 方法。会针对字符串,不仅检查是否为空,还会检查长度、类型和原型链,空字符串会被判 ```fail``` 。

+ **enum([args...])**

一种特殊的检查器,会检查输入的字符串是否在规定的串中,同样有错误信息。
var states = ['opening', 'open', 'closing', 'closed'] var s = new Schema({ state: { type: String, enum: states }}) var M = db.model('M', s) var m = new M({ state: 'invalid' }) m.save(function (err) { console.error(String(err)) // ValidationError: `invalid` is not a valid enum value for path `state`. m.state = 'open' m.save(callback) // success }) // or with custom error messages var enum = { values: ['opening', 'open', 'closing', 'closed'], message: 'enum validator failed for path `{PATH}` with value `{VALUE}`' } var s = new Schema({ state: { type: String, enum: enum }) var M = db.model('M', s) var m = new M({ state: 'invalid' }) m.save(function (err) { console.error(String(err)) // ValidationError: enum validator failed for path `state` with value `invalid` m.state = 'open' m.save(callback) // success })
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
    
+ **lowercase(bool)**

一种特殊的 ```set``` 属性,会将输入的字符串全部转为小写之后再存入数据库。

+ **uppercase(bool)**

一种特殊的 ```set``` 属性,会将输入的字符串全部转为大写之后再存入数据库。

+ **match(regExp, [message])**

一种特殊的检查器,会检查输入的字符串是否满足正则表达式。空字符串和 ```null``` 会通过这个检查器,所以如果要过滤这些特殊情况还需要添加 ```required``` 属性。

+ **maxlength(value, [message])**

一种特殊的检查器,会检查输入的字符串的长度是否比阈值小。会对应错误信息模板中的 ```{MAXLENGTH}``` 。

+ **minlength(value, [message])**

一种特殊的检查器,会检查输入的字符串的长度是否比阈值大。会对应错误信息模板中的 ```{MINLENGTH}``` 。

+ **trim(bool)**

一种特殊的 ```set``` 属性,会将输入字符串先修剪两端的空格再存入数据库。

### Number

+ **checkRequired(value, doc)**

对于 ```Number``` 属性特殊定制的 ```required``` 属性,```required``` 会直接调用字符串的 ```checkRequired``` 方法。不仅检查是否为 ```null``` 还会检查类型和原型链。

+ **max(maximum, [message])**

一种特殊的检查器,会检查输入的数字是否比阈值小。会对应错误信息模板中的 ```{MAX}``` 。

+ **min(value, [message])**

一种特殊的检查器,会检查输入的数字是否比阈值大。会对应错误信息模板中的 ```{MIN}``` 。

### Date

+ **checkRequired(value, doc)**

对于 ```Date``` 属性特殊定制的 ```required``` 属性,```required``` 会直接调用字符串的 ```checkRequired``` 方法。不仅检查是否为 ```null``` 还会检查类型和原型链。

+ **expires(when)**

为元素添加 ```TTL Index```,这个属性是 ```Date``` 独有的。传入的属性可以是秒数,也可是比较友好的字符串形式。
// expire in 24 hours new Schema({ createdAt: { type: Date, expires: 60*60*24 }}); // expire in 24 hours new Schema({ createdAt: { type: Date, expires: '24h' }}); // expire in 1.5 hours new Schema({ createdAt: { type: Date, expires: '1.5h' }});
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
    
+ **max(maximum, [message])**

一种特殊的检查器,会检查输入的数字是否比阈值小。会对应错误信息模板中的 ```{MAX}``` 。输入的值必须是 ```Date``` 类型。

+ **min(maximum, [message])**

一种特殊的检查器,会检查输入的数字是否比阈值大。会对应错误信息模板中的 ```{MIN}``` 。输入的值必须是 ```Date``` 类型。

### Object

+ **auto(bool)**

如果打开会自动添加 ```ObjectId``` 到元素的值中。

## Custom
在 ```Mongoose 4.4.0``` 中,支持了自定义 ```Schema Type``` 的功能,新建的 ```Schema Type``` 需要继承自 ```mongoose.SchemaType``` ,与 ```mongoose.SchemaTypes``` 的原型链保持一致,并且需要实现 ```cast()``` 方法。

function Int8(key, options) {
mongoose.SchemaType.call(this, key, options, ‘Int8’);
}
Int8.prototype = Object.create(mongoose.SchemaType.prototype);

// cast() takes a parameter that can be anything. You need to
// validate the provided val and throw a CastError if you
// can’t convert it.
Int8.prototype.cast = function(val) {
var _val = Number(val);
if (isNaN(_val)) {
throw new Error(‘Int8: ‘ + val + ‘ is not a number’);
}
_val = Math.round(_val);
if (_val < -0x80 || _val > 0x7F) {
throw new Error(‘Int8: ‘ + val +
‘ is outside of the range of valid 8-bit ints’);
}
return _val;
};

// Don’t forget to add Int8 to the type registry
mongoose.Schema.Types.Int8 = Int8;

var testSchema = new Schema({ test: Int8 });
var Test = mongoose.model(‘Test’, testSchema);

var t = new Test();
t.test = ‘abc’;
assert.ok(t.validateSync());
assert.equal(t.validateSync().errors[‘test’].name, ‘CastError’);
assert.equal(t.validateSync().errors[‘test’].message,
‘Cast to Int8 failed for value “abc” at path “test”‘);
assert.equal(t.validateSync().errors[‘test’].reason.message,
‘Int8: abc is not a number’);

1
2
3
4
5

## Creating from ES6 Classes Using
```Mongoose``` 同样支持了 ```ES6``` 的 ```Class``` 特性,可以使用 ```Class``` 新建类之后使用 ```loadClass()``` 导入,

+ ```getter/setter``` 函数对应 ```get/set
  • 类方法对应
    的方法
    1
    + 静态方法对应 ```Model``` 静态方法。

const schema = new Schema({ firstName: String, lastName: String });

class PersonClass {
// fullName becomes a virtual
get fullName() {
return ${this.firstName} ${this.lastName};
}

set fullName(v) {
const firstSpace = v.indexOf(‘ ‘);
this.firstName = v.split(‘ ‘)[0];
this.lastName = firstSpace === -1 ? ‘’ : v.substr(firstSpace + 1);
}

// getFullName() becomes a document method
getFullName() {
return ${this.firstName} ${this.lastName};
}

// findByFullName() becomes a static
static findByFullName(name) {
const firstSpace = name.indexOf(‘ ‘);
const firstName = name.split(‘ ‘)[0];
const lastName = firstSpace === -1 ? ‘’ : name.substr(firstSpace + 1);
return this.findOne({ firstName, lastName });
}
}

schema.loadClass(PersonClass);
var Person = db.model(‘Person’, schema);

Person.create({ firstName: ‘Jon’, lastName: ‘Snow’ }).
then(doc => {
assert.equal(doc.fullName, ‘Jon Snow’);
doc.fullName = ‘Jon Stark’;
assert.equal(doc.firstName, ‘Jon’);
assert.equal(doc.lastName, ‘Stark’);
return Person.findByFullName(‘Jon Snow’);
}).
then(doc => {
assert.equal(doc.fullName, ‘Jon Snow’);
});
```

Read More
⬆︎TOP