「@JIKE」 Data engineer/Node player
post @ 2018-11-16

最近需要实现的一个场景是对于社区里的每条动态进行多次的检测和评估,暂且不论检测和评估的具体功能,统称为「计算」。

而这些计算由其他同事提供现成的接口,但是由于其具体功能的差异这些计算有些依赖了外部接口,并且可能依赖外部的长延时接口,例如视频分析。所以这里从计算的实现上将其划分为「瞬时计算」和「长时计算」,而对于上层使用方来说是需要构造出一种使用协议来同时兼容两种类型。而这样的三层结构中是有两次网络传输过程,更需要一种足够健壮的重试策略。并且在动态流量基数较大,峰值与谷值差异较大的情况下,横向扩容能力也非常的重要。

模型

第一次讨论的方案是 「push模型」,多方使用我暴露的接口,在完成数据消费计算之后推送到我的服务中进行下游逻辑。这种方案有几个弊端,第一是多方需要实现消费 Kafka 的逻辑并且需要管理 Kafka offset,保证在事故不会导致数据丢失。第二是网络传输导致的重试逻辑复杂,需要接入第二个重试队列,或者提供重试接口,这样数据流的流向就从单向变为双向,增加了之后的维护和排查成本。但是这种方案还是有一个好处,因为计算方是接触数据的第一层,所以在计算方内部出现错误引起的重试或者报错机制实现其他非常简单。

在否定以后转换为 「pull模型」,多方暴露接口让我进行统一的调用,仅由我方消费 Kafka 并统一维护状态和提供重试策略。但是在实现细节上还是有两种方案可供选择,一种是以「动态为单位」,一种是以「计算为单位」。动态为单位是说,主线程消费 Kafka,然后交由下游异步或多线程方式调用多个接口,全部成功后算作一个动态计算完成。计算为单位是说,多个线程各自使用独立的 group id 消费 Kafka 并维护重试队列,然后在线程内进行接口调用。两种方式的区别是一个计算失败之后的 block 代价不同,第一种方案会 block 整个动态,第二种方案只会 block 动态的一个计算。并且由于计算之间的效率差,第一种的效率取决于最慢的一个计算,第二种在动态为单位的角度来看,也是以最慢的一个计算决定,但是由于计算之间不会互相影响,所以之后想对「慢计算」进行降级的话能很方便的完成。

三个问题

在决定模型以后,需要考虑第一个问题,「构造出一个通用的协议兼容同步计算和异步计算的调用」。一种是同步轮询,一种是异步调用。由于底层依赖的外部长时计算有请求的次数限制,所以同步轮询需要记录请求时间来控制轮询的时间间隔,但是没有性能问题的风险。而对于异步调用,如果下游系统被某些原因 block 住的时候会无限的建立连接,在超过线程池的上限以后会 block 住调用方。所以比较下来还是使用轮询的方式简单实用,只需要维护一个调用队列,每个请求带上「上一次请求的时间戳」和「重试次数」,如果在消费到一个还没有超过调用间隔的请求,不累加重试次数,直接放入队列末尾;如果一个请求失败则直接修改时间戳,累加重试次数放入队列末尾;如果超过了重试次数,直接抛出系统记为一个 bad case。

在确定轮询的方式以后,第二个重试问题也迎刃而解,仅由顶部调用方来控制请求状态,并且提供重试,这样单向的数据流在后期的问题排查和维护过程中是非常重要的。而第三个问题,横向扩展能力,由于使用了「以计算为单位的pull模型」,扩展新的计算可见是非常方便的,只需要添加独立的线程。并且扩展每个计算调用系统本身也是非常方便的,需要对「分发任务逻辑」和「调用逻辑」进行解耦,扩展时只扩展「调用逻辑」部分,不然的话还需要保证每个线程之间分发任务的队列的一致性,是很冗余的设计。那么整个轮询系统内部也被划分为了两个部分,第一个部分消费 Kafka,维护调用队列,第二个部分消费调用队列进行底层接口调用,并且会反馈调用结果给第一个部分,使其进行调用队列的状态维护。

IMG1

而上面也提到了单向数据流的好处,所以这里为了规避掉双向数据流,将请求完成后的队列维护工作也放在接口调用部分。所以就变成了「状态维护」部分只管往调用队列里放请求,「接口调用」部分负责调用接口并且使用 response 来维护调用队列里的请求状态,例如请求次数加一之后放入队列末尾。

IMG2

复用Storm

上面的结构一看非常像 Storm 的流式结构了,并且 Storm 能保证一条 Kafka message 在轮询系统中一定会被成功消费并且是顺序消费,还能帮我们管理 Kafka offset 状态,还不需要写多线程,扩容起来也非常方便。那么何乐为不为呢?

IMG3

将设计图一改,瞬间转变成一个 Storm 架构,其实上面单向数据流的设计也规避了 Storm 中不能由下游 Bolt 给上游 Bolt 传递消息的情况。而 Storm 本身也提供重试机制,在该重试机制下我们可以重新考虑之前数据结构的设计。

因为我们需要考虑的是两类重试情况,一种是通过重试能解决的网络问题,一种是通过重试不能解决的系统问题。而当我们在遇到超过重试次数没有解决的问题时,之前的解决方案是抛出系统持久化到一个地方,之后再想办法解决,但是这样又增添了该系统的复杂程度,需要通过自动化的方案能区分这个 case 到底是网络问题还是系统问题,之后再通过一套方案将它解决掉再写入系统。

试想最简单的处理方案是遇到网络问题通过重试自然的解决它,而遇到系统问题直接 block 整个系统,等待计算提供方解决问题后再继续。而 Storm 的机制刚好提供了这种方案的解决策略,如果遇到重试的请求都进行无限次的重试,因为短暂的问题肯定是会在有限次重试的过程中恢复,而系统问题是无限次重试都不能解决的,那么遇到很多的系统问题 case 不是会浪费IO,并且也没有 block 整个系统吗?

其实并不会,Storm 内部设计的时候一次会从 Kafka 中拿出多个 message 形成多个 tuple,只有在这些 tuple 全部 ack 掉以后才会继续向后面拿数据。所以如果在一批数据中出现了一些系统问题的 case,他们通过无限的 fail 重试是会 block 整个 topology 的,并且他们的个数不会很多,所以不会对下游造成 IO 的压力。对 Kafka 的 offset 进行检测接上警报以后,很快消费能力就跟不上生产能力,就能知道出现了这样的系统问题,在下游的服务修好以后,再接着进行消费,也不会丢失数据。

思考

系统的设计中每一部分一定要简单纯粹,不要想让一个部分做多件事情。

Read More
post @ 2018-10-29

洪荒纪元 「? - 2004」

我非常认可时代发展是存在「间断点」的,因为一些大事件的发生导致发展函数打破连续性,跃迁到一个更高的阶段。而在整部电竞史上,我觉得「星际争霸」和「Counter-Strike」的完成应该是从0到1跃迁的关键。

1997年秋天的美国,Blizzard 的程序员 Strain 坐在医院里忙着设计BlizzardEntertainment 公司Starcraft的战役编辑器,他在医院完成了5到6个小时的编程工作。她的妻子Annie从生产前的麻醉休息中醒来后,看到她的丈夫正在把手从键盘上拿开,她大叫道:“在我们女儿出生的时候你还在为这个该死的游戏工作吗?”。Strain的手指停在了键盘上。他扫了一眼显示屏幕反驳说:“Annie,这可不是’该死的游戏’。它是Starcraft(星际争霸)1。“

而在之后的1年,由于《雷神之锤》在业界的风靡程度,各个公司、游戏爱好者都开始了类似第一人称FPS游戏的开发与mod制作。而其中有一个划时代的作品脱颖而出,正是前微软核心员工G胖所创建的V社推出的FPS游戏《半条命》。在之后的两年时间里,Minh开发的《反恐精英》mod深受玩家喜爱,为此V社对其重新整合开发以后,向外界发布了《半条命:反恐精英》。这也是之后让全世界狂热的作品。

《星际争霸》和《反恐精英》在推出以后开始席卷全球,出现在每一台青少年的个人电脑中。而当时的韩国正受到亚洲金融危机的沉重打击,大批韩国银行倒闭、公司裁员,连一直被视为最后一根救命稻草的足球都在1998年法国世界杯上以糟糕的成绩出局,这时候韩国急需一个新兴产业来重振经济、一批勇于奋斗充满正能量的明星来恢复民族的自信力。而电竞行业与《星际争霸》也就随理成章的被选中,很快形成了赛事IP + 俱乐部IP + 选手IP + 电视转播推广的行业模式。一场场制作精美、奖金丰硕、跌宕起伏的赛事和一位位外表稚嫩、怀揣梦想、勇于拼搏的选手在超大流量电视转播的推广效应下成功发挥了其功效,成功打造了以《星际争霸》为核心的电竞行业与文化,并在2000年第一届WCG上夺得冠军,开启了制霸全球的王朝。(趣事:那一年呼声最高的欧洲星际王者Grrrr在比赛中遭遇“美人计”,在复赛前一天晚上与新认识的韩国女友约会,导致第二天比赛迟到,被判负出局)

而反观国内当时还仅仅是以地方网吧为主要据点,玩家仅凭热爱与杯水车薪的收入在苦苦坚持的蛮荒时代。在当时的舆论氛围中还对“电子竞技”没有正确的认识,职业选手的社会地位极低,精神境遇也非常惨淡,得不到家人的支持理解。而经济环境也很差,在一些职业选手的回忆里大致能拼凑出那个时代,网吧俱乐部一般很少提供固定工资,仅提供训练场地,而唯一的收入就是比赛奖金。但是由于当时行业发展并不成熟,缺少法律监管,经常出现奖金拖欠、拒给的事情,也让当时的职业选手更加的难以坚持。所以很多人还是选择有一份正式工作的同时坚持自己的电子竞技事业。

不过在如此艰难的环境下,在2001年的WCG比赛中,中国选手 MTY 和 DEEP 在星际2v2的比赛中收获了中国的第一个电竞冠军。而该阶段国内电竞的商业模式也非常的初期,媒体通过广告获利、大赛主办方通过赞助商获利。

至暗时代「2004-2008」

2004年4月,广电一纸禁令宣告电竞的至暗时代来临。“2004年4月12日,国家广电总局就电脑网络游戏类节目的问题发出《关于禁止播出电脑网络游戏类节目的通知》,《通知》指出,各级广播电视播出机构一律不得开设电脑网络游戏类栏目,不得播出电脑网络游戏节目。同时,在相应的节目中宣传电脑网络游戏可能给未成年人健康成长带来的负面影响,积极引导他们正确利用电脑网络的有益功能,正确对待电脑网络游戏。”

国内电竞圈想效仿韩国进行电竞生态、经济发展的梦彻底破碎,整个电竞圈开始陷入迷茫,因为先前在电视+电竞的布局中投入的大量资金都付诸东流,导致很多从业人员不得不逃离电竞开始寻找新的机会维系生活。由于失去了当时流量最大的电视台的支持,电竞游戏也逐渐淡出了人们的视野当中。并且在这几年的时间里《热血传奇》、《梦幻西游》等网络游戏大力发展、不断吸收新的玩家,也让传统的电子竞技游戏陷入了一个尴尬的境地,新网民在内容更加非常、学习成本更加低廉的网络RPG游戏与新手不友好、学习成本很高的电竞游戏当中更加容易选择前者,导致电竞游戏在这几年里玩家的供血也如流星坠落般极具直下。

但是也正是在这样的时代里出现的英雄会更加的耀眼。2005年sky横空出世,在War3 项目中获得 ESWC 中国区冠军,ESWC 世界赛殿军,WCG 中国区季军,WCG 世界赛冠军。在星际项目转 War3 项目一年以后,黄袍加身,斩获最高荣誉。这也是中国第一个个人项目的冠军,在当下多个门户网站开始关注这颗耀眼的新星,都给出了很大的版面介绍,尤其是那张身披五星红旗的照片也开始在一些青年玩家的心中埋下了种子。并且在之后的2006年的WCG比赛当中 sky 再此蝉联了冠军,第一个中国玩家IP从此打响,“中国电竞第一人”的slogan 也出现在了各种电竞杂志和主流门户网站当中。而两次全球总冠军也给 sky 带来了 40000 美元的收入,在当时国内电竞环境来看,完全算是一笔巨款了。

而 War3 强大的地图编辑器也开启了另外一个电竞时代的到来,Dota。这是一个更加复杂的对抗游戏,每边由五名玩家挑选五名英雄进行对抗,最终的获胜条件是摧毁对方的基地。由于其中有几十名英雄可供选择,几十种道具可供装备,让这个游戏的战术与玩法呈指数级的增加,也吸引了非常多的玩家进入到这个游戏当中。而2005年末,国内也开始组建一些dota战队,不过比起 War3,星际等已经较为成熟的电子竞技项目,Dota的起步也是布满荆棘的一条路,在现在一些退役玩家的描述中那段峥嵘岁月多数都是苦涩的回忆,扛着棉絮打比赛,睡网吧,吃不上饭等都是家常便饭。但是也正是这群人成为了日后世界对抗中国的 Dota 格局中的中坚力量。

在 2007 年,另外一名电竞新星也悄然诞生,正在浙江大学求学的2009选择翘掉几个月的课程和队友进行 WCG2007 前最后的集训。在同年的比赛当中,这只浙江大学校队 AVNC 居然淘汰了夺冠大热 HTML 战队。之后他也展现出了惊人的指挥能力和操作,不过他在职业大赛中的惊艳发挥才只是它的电竞行业之路的开始。

在这4年的时间里,国内的电竞大环境发展停步不前,而网络游戏呈现井喷式的发展。但是在这场寒冬里,却也涌现出了一些天才式的电竞人物,他们为日后电竞行业的发展起到了榜样作用,并且也将这些种子埋在了新一批玩家的心中。2008年,国家体育总局将电子竞技重新定义为第78号体育运动,随后电子竞技的地位逐步获得官方认可,寒冬总算过去了。

萌芽时期「2008-2013」

中国电子竞技的模式也在几年的摸索以后得到了答案,互联网+电竞的时代正式开启。由于电子竞技项目的专业门槛极高,玩家之间的水平参差不齐,普通玩家想学习更加新颖的战术、操作无门。而正好在国内井喷发展的互联网提供了一种解决方案,不能进行电视转播的游戏比赛被发到了互联网上,职业选手的教学也被放到了互联网当中。这样的流量运营模式也逐渐成型。并且在08年,某跳水冠军在互联网中否定电子竞技的价值被网民群起而攻之,最后以道歉的方式灰溜溜的收尾,可以看出电子竞技在舆论中也终于拨开云雾见晴天。

2011年注定是会被记录在史策上的一年,为了摆脱 War3 编辑器对 dota 创造性的限制,V社与冰蛙重新打造了 dota2 并且在这一年举办了第一届 TI 比赛,而这届比赛的奖金池是突破历史的 100 万美元,而同年 WCG 大赛多个项目的总奖金也不过 30 万美元,这个噱头足够刺激整个电竞圈,让职业选手们看到了通过刻苦的训练来改变生活的机会。并且同年一个之后都会和 dota 一起津津乐道的 moba 游戏 《英雄联盟》正式在大陆上线,由国内第一游戏运营商腾讯代理。由于比 dota 更加精简的操作方式和moba游戏固有的激烈华丽的战斗体验,很快风靡了全球。而也是这一年,王思聪回国,组建了 IG 战队 Dota 分部,重金挖走了其他战队的主力队员,也标志着资本开始进入国内电竞行业。传统的电竞格局就此打破,并重新洗牌。

而也是在期间,War 3 和星际等老牌电竞项目受到了这些新兴电竞项目的巨大冲击,新手不友好以及巨大的学习成本消磨掉了年轻新玩家的耐心,显得非常疲软,几乎再没有新鲜血液的注入。而 WCG 比赛也由于被类似 TI 类的专项比赛抢走了噱头与人气,也逐步的走入了低谷。

而之前提到的 Dota 职业选手 2009 在2010年也宣布退役,并开始经商,通过自己在职业生涯当中积攒的人气开始制作《从0单排》视频来交流游戏经验,并开设了自己的淘宝店。搭乘着 Dota 飞速发展的快车,迅速在玩家当中扩大自己的知名度,并且通过精心制作的游戏视频来获得玩家的喜爱,将自己的 IP 效应扩大之后通过淘宝店售卖零食、外设等物品将自己的 IP 价值变现。并且在获得大量收入以后又将部分的资金注入到游戏行业当中,开创了 Dota 业界传奇战队 LGD 的前身 FTD,并举办一些大型比赛。以这样的方式再为电竞行业的发展注入能量,形成了一个完美的生态闭环,不仅仅为很多职业选手退役之后的生活方式作出了一个示范,也算是通过电子竞技来进行变现的第一人。而同期的 sky 由于还有比赛压力则选择了另外一条路,通过写自己的自传来扩大 IP 效应,并开始进入WE俱乐部的管理层来更加深入的了解电竞行业上游的运营模式。

也正是这个时期,电子竞技最大的催化剂直播平台已经开始生根发芽。YY直播上已经开始出现成体系的直播社区,在当时最热门的dota直播社区 90016 已经招募了 10 多名主播,分时段的进行直播,让水友无论什么时候进入直播平台都能够收看到顶尖玩家的游戏操作。并且经常举办社区活动,让玩家在这里寻找到归属感,并将这份崇拜与羁绊变现成为送给主播的礼品。虽然还是直播的初期阶段,但是到2010年,当年YY直播的当年营收达到了3600万左右。

在2012年9月,TI2 160万美金的总奖金池,100万的头筹被中国战队 IG 抢走。并且前6名中有4支中国队,世界对抗中国的格局基本形成,而 IG 的冠军也离不开资本的注入,将多名各个位置上最强力的选手集结在一起是第一步,更加科学的管理团队、分析团队是第二步。这样一支队伍已经和蛮荒时代的队伍有了明显的差距,特别是 IG 在夺冠后的一年,拿下了去参加的所有比赛冠军。也为其他战队的管理、资本运作好好上了一课。

在上一个阶段中,玩家人数自然增长到了一定的数量级。政策的放宽,电竞项目的丰富、大赛IP的丰富,互联网的加速推广都成为了这个自然增长的助推器。资本也正是看到了这个助推器,开始将资金涌入行业当中来点燃这个助推器。电竞行业的格局初步形成,下游产业也露出了苗头,整个社会环境也更加的包容,形势一片大好。

黄金盛世「2013年-今」

随着IG战队的大获成功,众多资本纷纷进入到中国的电竞圈中,孕育了多个职业战队。并且不同的电竞项目都形成了自己的管理协会,制订了详细的职业转会规则以及联赛模式,顶尖职业选手的转会费用已经高达数百万人民币,提供了更加规范的监管模式,整个行业趋向于更加专业的竞技体育模式发h展。在这过程当中发展出了众多的分支产业,例如数据分析、星探、教练等行业,吸引更多的优秀人才投身到电竞行业中来,并且能够感受到一场比赛早已不是早期的几名选手之间的对抗了,而是背后的团队、资本之间的对抗,也正是因为多方能量的积攒才让比赛时的碰撞更加的精彩、吸引人。并且也逐步形成了上游各大赛事主办方、中游俱乐部、下游转播平台,电视台的格局。越来越好的环境也让职业选手有了基本的生活保障以及社会地位,有开明的父母能够接受自己的孩子选择电子竞技这条路,而不是一味的阻拦。不过近几年但凡输掉了比赛,都会听到是好的条件磨灭的选手的斗志的言论,我并不认同这种看法,不过这也是发展中不可避免的阵痛。

2014年,ACFUN生放送直播正式改名为「斗鱼tv」。随后火猫直播、熊猫tv等直播平台陆续出现。随着4G技术的普及,在移动端畅快的观看直播视频的时代已经到来。每个直播平台开始从职业选手当中招募人气选手作为平台的支柱,随着《英雄联盟》若风、PDD,《Dota2》FFY、LongDD等电竞明星的加盟,直播平台的人气日益剧增,每天晚上都有上千万人守在电脑前等待他们开始直播。原本只能在世界大赛上一睹他们风采的日子过去了,现在只要打开手机、电脑看到他们的精彩操作,并通过弹幕与他们进行交流。这样一个个电竞明星的形象变得鲜活,早已不在是高高在上的英雄,而更像在身边的朋友。不仅仅塑造了一个个百万观看的IP,也大大冲击了早期变现退役选手的录播模式,改变了电子竞技的下游生态。而直播平台的商业模式也丰富了传统互联网,传统的「虚拟道具」是会对购买方在产品中有一定的增强效果或者提供某些便利,而在直播中的「虚拟道具」变成了一种单方面的情感表达,这已经将粉丝经济融入到了互联网中,为游戏玩家提供了另外一种游戏盈利途径。也可以说部分玩家玩游戏的初衷已经不再单纯、职业选手花了更多的时间在直播上而不是训练上,但是这对于整个游戏行业来说肯定是利大于弊的,这更加有助于游戏氛围、圈层的形成,能和自己的同好在一个直播间中进行交流算是一种最简单但是最高效的兴趣社交模式。并且这样的圈层概念形成以后,不仅能给玩家提供归属感,更是一种比广告更加自然的推广途径。

正是在这样的大环境中塑造了一个又一个的大流量赛事IP、俱乐部IP以及选手IP。英雄联盟的S联赛正在如火如荼的展开,每一场战局都能刷爆微博的热搜榜, 最高在线观看人数达到了1.3亿。Ti 8刚在夏天以中国军队折戟收尾,超过2500万美元的奖金池、凌晨直播也能有几千万的观看人数,可见人气。而且中国上海历史性的获得了 TI 9 的主办权,也能看到中国在全球电子竞技大环境下的话语权。这样的比赛在一年中还有多次,每次都能吸引无数的赞助商冠名,并且可以预见在的5-10年内能实现类似足球顶级赛事一般的通过转播权分成进行获利。在国内已经形成了 IG、LGD、RNG 等多支顶尖的职业战队,他们背后有强大的资本加持、科学的管理方式,并且像明星经纪人一样开始运营手里的王牌选手,通过选手的粉丝转化为俱乐部的粉丝,从而实现俱乐部的流量变现。但是和传统的粉丝经济不同,粉丝的忠诚度会因为战队成绩、人员变更等因素有强烈的波动,而正是因为这种波动带来了粉丝督促战队,战队取得成绩再变现的良性竞争氛围。由于起步比较早的著名前职业选手大多都选择了直播,所以形成了多个顶级的流量明星,PDD手握530万微博粉丝,已经超过了很多二线明星,而若风更是拥有830万微博粉丝,已经逼近张继科等一线体育明星。而这些明星、俱乐部的粉丝数量也和负面舆论数量成正比的,在这个残酷的电子竞技行业当中,如果没有成绩,连呼吸都是错的。在比赛中的一个小失误,有上亿双眼睛看着,输掉比赛可能会引来无数的骂声,但是这在哪个竞技项目当中都没有办法避免,NBA不行、FIFA也不行。

近几年,又有《绝地求生》、《守望先锋》等新的电竞项目出现开始冲击市场,还有基于移动平台的《王者荣耀》、《刺激战场》等新鲜血液的出现,由于游戏形式的雷同可以说是端游的延伸。电子竞技市场是越来越大、越来越丰富。从分析报告数据来看移动电竞的占比从2015年的16.6%到2018年的52.9%,算是一场意味深远的变更,随着手机计算力的增强和网络技术的增强,不知道这场变更会不会以移动电竞代替端游电竞结束,但是一个行业的多样性发展也是我们更愿意看到的。让我们对未来拭目以待吧。

Read More
post @ 2018-10-23

还是不知道要怎么开头。

和二喜第一次见面是在周四的凌晨,捡到二喜的人家因为备孕不得不把它放在室外。入秋上海的夜对于小猫必定是难熬的,哪里舍得让小朋友冻得直哆嗦,就去接它回家。

它蜷缩在一个纸箱子里,把头埋得低低的,只想着暖和点,哪管外面的人来人往。笨拙的把它放在借来的包里,透着两侧的网第一次看到了它的正脸,小家伙睁着圆圆的大眼睛在审视外面的两脚怪,不停的用爪子勾住筛网。能看到它脸上或多或少流浪的痕迹,心里也暗暗的嘀咕,没事啦,跟着我有你胖的。

在同事家里抓了些它主子的营养膏就往二喜嘴里喂,小家伙应该是饿坏了,不等你挤它出来,自己就上口开始咬。咕咕嘟嘟几下就吃完一条,同事家里小猫隔着窗子馋的哇哇直叫,它才不管这么多,吃的满脸都是。

到家应该凌晨一点了,小朋友哪里有要睡觉的样子,这看看那看看,简单用鞋盒给它做了小厕所,铺上猫砂,它也不嫌弃,上去就开始舒服。小爪子熟练的把土埋得平平整整,肯定有一个爱干净的猫妈妈。

把从同事那众筹的用品布置了下,它也不闲着,一步不落的跟着我。从小就流浪的孩子哪里有一点安全感,不过放心啦,我不会丢下你了。啊?要上床,当然没问题了,来吧,睡觉了。

第一晚我是很惶恐的,不知道小猫咕噜是什么意思之前,我就在纳闷,是谁在这小家伙身上装了震动器吗?就没有停过,后来想起来也是,应该没有谁比它更渴望一个家了。

周五的早上给它喂饱了早餐,我就出门上班了。出门之前,只要我一离开,它就嗷嗷叫,这哪里是猫呀,比我家的狗还要粘人。后来假装离开,没有听到它的叫声,我就偷偷回去看了看,原来和窗帘打起来了,嘿,那今天你有得玩了。

下午的时候买的东西除了猫窝都到了,趁着晚饭的时候回家给它好好安置一下。装上了豪华的厕所,有了看上去还不错的猫粮。也不用再用泡沫碗将就了,还能自己和逗猫棒玩。这样的日子应该还不错吧。

因为小家伙有一点猫藓,晚上回家决定彻底的把家里打扫一下,也能让它早日康复。小朋友还是很顽皮的,老是和扫把过不去,那没办法只能先把它关在阳台上。但是哪里关的住那颗向往自由的心啊,一直对着我嗷嗷叫,我在做正事呢,可不能放你出来。

第二晚显然更有默契了,我躺好,手和枕头间露了一个缝,它就知道乖乖的窝在缝里,把头枕在手臂上。睡得比我可快多了,不一会小手就开始踩奶,肯定想妈妈了吧。

到了周末了,该去医院好好的检查一下身体了。第一次养猫的铲屎官哪里知道猫瘟到底是什么,嗨,这都8012年了,哪里有什么治不好的病呀。而且这不还没发作吗?肯定只是携带,这能吃能喝的,别蒙我了。行吧,那先回家观察吧。

回到家里,开始大吃大喝。吃饱喝足开始和逗猫棒大战三百回合,这能是病患?虽然很不情愿,还是瞟了一眼粑粑,这不挺正常的吗?

玩累了,我们都开始午睡。直到一声声剧烈的呕吐声把我吵醒,看着吐出来一颗颗没有消化的猫粮,我才意识到,这小家伙应该还没长牙吧,惨了,该给它泡一泡猫粮的。不能嚼还吃这么多,看来在外面真的饿坏了吧。

一次次的呕吐把整个夜拉的很长,它每一次起身去猫厕所,我都赶紧偷偷摸摸的跟上去看看,心里虽然每次都在祈祷不要吐了,赶紧好吧。但是现实哪有那么尽如人意。

碎片般的睡眠好歹也撑过了整个夜,二喜早没了前两天的好胃口和活力。每一次吐完之后都会在厕所里静坐一会才会出来,一个月的小孩子哪里经得住这样的折腾。就想着快点带它飞到医院去。

有了前一天的经验,它对冷冰冰的观察台很是畏惧,但是早已没有力气再和我们抗争,蜷缩在台上,耷拉着眼皮。能做的不多,也只有摸摸它,告诉它我还在旁边的,别害怕。

在住院的小笼子里,每一次起身我都能感受到有多费劲。但是我心里一直不愿意相信那个高概率事件,我更愿意或者说只敢相信奇迹会发生。在医院陪伴它的时候还在逗笑它的病友是一只大肥兔子,你们肯定交流不了,还是赶紧好了回家吧,就不用这么无聊了。

我快走的时候,它从小笼子的远角起身走到了我面前,窝了下来。我摸摸它的头,还是能感觉到它有向上蹭的趋势,但是早已没了力气。小声告诉它一定要好起来,它对着我喵呜喵呜了两声,我以为是我们订好的小鱼干协议呢。

周一早上洗漱好准备去医院看它的时候,接到了医生的电话。二喜已经离开了。

Read More
post @ 2018-10-16

Environment

  • Spark version: 2.2.1
  • EMR: Amazon EMR
    • Master: m4.xlarge [8vCore, 16GB] * 1
    • Task: r4.xlarge[4vCore, 30.5GB] * (0-20)
    • Core: r4.xlarge[4vCore, 30.5GB] * (1-15)
  • Mongo Collection:
    • A: 73.4G
    • B: 28.7G

Task 和 Core 都是 Auto Scaling, B表与A表是一对多的关系。

操作非常简单,从 Mongo 中分别读取 A , B 表。再将两表 join 后,选取字段,存入一个已经按照 C 字段 shard 的 Mongo 当中。并且 C 字段不是 _id

Trap

Full Scan

在 MongoSpark 中如果使用 schema,并且在 schema 中对一些参数设置了 nullable=false 会出现在 NodeManager 进行 sample partition统计的时候需要使用这个 filter 条件对全表进行 scan,所以如果有些字段没有索引的话,会发现 load 数据的时间特别长。(还好 Dreamsome 踩过这个坑,不然不知道猴年马月能发现。

1
2
3
4
5
6
7
8
val count = if (matchQuery.isEmpty) {
results.getNumber("count").longValue()
} else {
connector.withCollectionDo(readConfig, { coll: MongoCollection[BsonDocument] => coll.countDocuments(matchQuery) })
}
val avgObjSizeInBytes = results.get("avgObjSize", new BsonInt64(0)).asNumber().longValue()
val numDocumentsPerPartition: Int = math.floor(partitionSizeInBytes.toFloat / avgObjSizeInBytes).toInt
val numberOfSamples = math.floor(samplesPerPartition * count / numDocumentsPerPartition.toFloat).toInt

mongo-spark/MongoSamplePartitioner.scala at master · mongodb/mongo-spark · GitHub

源码中可以看到,如果不包含 matchQuery 是没有问题的,如果有的话会使用 matchQuery 进行 count。

NodeManger Restart

在任务执行中间,偶尔会出现 java.lang.RuntimeException: Executor is not registered 的报错。查看后主要原因是因为 NodeManager 在任务运行中挂掉重启以后,本来在它管理下的 Executor 没有办法重现注册导致的。但是看到 Spark 社区有人报这个bug,并且被标记为在 1.6.0 版本已经 fix 了。黑人问号脸。

SPARK-9439 ExternalShuffleService should be robust to NodeManager restarts in yarn - ASF JIRA

Mongo Spark Upsert

Mongo 中的 Upsert 操作不是原子操作,所以在两个线程同时 upsert 一个不存在的 _id 时,是可能出现报错的。

1
2
3
4
5
6
7
8
To prevent MongoDB from inserting the same document more than once, create a unique index on the name field. With a unique index, if multiple applications issue the same update with upsert: true, exactly one update() would successfully insert a new document.

The remaining operations would either:

1. update the newly inserted document, or

2. fail when they attempted to insert a duplicate.
If the operation fails because of a duplicate index key error, applications may retry the operation which will succeed as an update operation.

db.collection.update() — MongoDB Manual

以上是官方文档中的说明,可能出现同时插入时,后一个 upsert 报错的情况。而对于这种情况来说,使用 Mongo spark 是很难处理的,没办法 catch 住后一个异常。先来看看 Mongo spark 的 save 方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
def save[D](dataset: Dataset[D], writeConfig: WriteConfig): Unit = {
val mongoConnector = MongoConnector(writeConfig.asOptions)
val dataSet = dataset.toDF()
val mapper = rowToDocumentMapper(dataSet.schema)
val documentRdd: RDD[BsonDocument] = dataSet.rdd.map(row => mapper(row))
val fieldNames = dataset.schema.fieldNames.toList
val queryKeyList = BsonDocument.parse(writeConfig.shardKey.getOrElse("{_id: 1}")).keySet().asScala.toList

if (writeConfig.forceInsert || !queryKeyList.forall(fieldNames.contains(_))) {
MongoSpark.save(documentRdd, writeConfig)
} else {
documentRdd.foreachPartition(iter => if (iter.nonEmpty) {
mongoConnector.withCollectionDo(writeConfig, { collection: MongoCollection[BsonDocument] =>
iter.grouped(writeConfig.maxBatchSize).foreach(batch => {
val updateOptions = new UpdateOptions().upsert(true)
val requests = batch.map(doc =>
if (queryKeyList.forall(doc.containsKey(_))) {
val queryDocument = new BsonDocument()
queryKeyList.foreach(key => queryDocument.append(key, doc.get(key)))
if (writeConfig.replaceDocument) {
new ReplaceOneModel[BsonDocument](queryDocument, doc, updateOptions)
} else {
queryDocument.keySet().asScala.foreach(doc.remove(_))
new UpdateOneModel[BsonDocument](queryDocument, new BsonDocument("$set", doc), updateOptions)
}
} else {
new InsertOneModel[BsonDocument](doc)
})
collection.bulkWrite(requests.toList.asJava)
})
})
})
}
}

在集合一些 ReplaceOneModel, UpdateOneModel, InsertOneModel 最后调用的是 collection.bulkWrite 方法。对使用者来说是没有办法 catch 其中一条的异常的,所以可能导致整个 task 失败重试。在设计上就应该尽量规避有两个 partition 同时 upsert 一个 _id 对象的情况。

Mongo Shard

当 mongo 的 shard key 不是 _id,而是其他 filed 的时候,会出现同一个 _id 的多个元素被写入到数据库中。因为 mongo 内部只能保证在一个 shard 中的 collection 强制的唯一性。并且在进行 shard 的时候,会自动对 shard 进行索引,但是不会创建唯一性索引。所以在可能会出现多个同一 _id 的情况下,需要注意。

mongodb - Duplicate documents on _id (in mongo) - Stack Overflow

而且经过实验发现,在 Mongo Spark save 的过程当中需要指定 WriteConfig 中的 shardKey ,而且必须包含 {_id: 1} ,不然会报错。比如 shardKeyuser,需要写成 {_id: 1, user: 1}。这是因为 user 并不是数据库的 unique index,而 _id 在这个 shard 中是 unique index 并且是 immutable 的, 所以如果我用 user 做 query 条件去更新 _id 就会出错。

Yarn Resource

在分 Executor Container 的时候是一台物理机器一台的分,所以可能出现内存碎片。比如一台16G内存的机器,4.5G一个 Executor Container,那么只能产生3个Executor Container,还剩下 2.5G 的内存不够启一个 Executor Container,所以产生碎片。并且这个碎片是不会在 Yarn UI 中表现出来, 所以会导致在 UI 中出现 total <> used + reserved 的情况。

并且每个 Executor Container 的内存使用不只是通过 spark.executor.memory 设置的大小,会有多余的内存来作为 Container 的运行使用。

Optimization

Spark Join Shuffle

Spark 在进行 join 操作的之前会对 join key 进行 repartition。而 Mongo Spark 在从 Mongo 中读取数据的时候会使用 _id 进行 partition,这样会多做一次较为耗时的工作。可以在 MongoSpark 读取数据的时候直接通过 join key 进行 partition

但是 MongoSpark 中没有支持一种 partition 策略,保证一些 join key 对应的 Document 全部分在一个 partition 当中,基本都是按照 partitionSize 结合 join key 来做切分,所以需要自己实现,并且如果数据分布不均匀的话可能导致数据倾斜而造成内存问题。所以需要对自己的数据集有一定认识以后再选取合适的方法。

下文中有详细的指出各种 partitioner 的策略:
mongo-spark/2-configuring.md at master · mongodb/mongo-spark · GitHub

并且如果是一个小的集合和另外一个大的集合进行 join 的时候,可以考虑 broadcast join 通过将小的集合广播到其他 Excutor 上的形式来避免 shuffle。

EMR Auto Scaling

EMR 的自动收缩会导致一些并没有完成所有 Task 的机器被回收,导致一些机器重启,而之上的所有 Excuter 执行的任务都需要重新运行。如果需要依赖 cache 的任务还需要重跑上游 Task,在跑大体量的任务的时候,不应该再把这个风险引入。

Resource assignment

Spark性能优化:资源调优篇 – 过往记忆
浅谈Spark应用程序的性能调优 | 青云QingCloud 社区

Read More
post @ 2018-08-19

Target

在不暂停服务的前提下将一个 290G 的 Mongo 数据表中的部分字段迁移到 Postgresql 数据库,保证足够低的差异性。

Strategy

一切的工作的基础是 MongoDB oplog 的幂等原则:

  • 首先启动一个服务将 MongoDB 中实时生产的 oplog 同步到 kafka 中。
  • 启动 Spark 任务将目标表中的字段批量同步到 Postgresql 中。
  • 启动 Storm 服务将 kafka 中的 oplog 回放到 Postgresql 中。
  • 启动 Spark 任务进行一致性的检查。

design-model

必须保证第一个过程在第二个过程之前启动,第三个任务在第二个任务完成之后启动。任务流程如图,oplog 被同步到 kafka 的一个 topic 中,这个 topic 被分成了8个 partition,启动了 3个 broker。然后将这个 kafka topic 作为 Storm 任务的 KafkaSpout,并发数为4。下游进行持久化任务的 Bolt 的并发数也为4。

同步 Oplog

这一步需要注意的是同一个 _id 的 oplog 的乱序问题,我们在回放 oplog 的时候必须按照发生的时间顺序进行回放,不然会出现丢失数据的情况。所以一定要保证 oplog 是按照生成的顺序放入 kafka 队列中的。在这种情况下肯定是单线程的服务来同步数据更加的合适,就不需要担心由于并发带来的乱序问题。

第二个可能导致乱序的点是 oplog 在 kafka 中的存储位置,我们为了保证同一个 _id 的对象 oplog 不乱序,那么必须保证它们被存储在同一个 partition 当中。如果存储在不同的 partition 当中的话可能会在 Storm 的不同 Spout Consumer thread 中被处理,那么就有可能会出现创建时间在后面的 oplog 先被回放到 Postgresql 当中。而如何能保证同一个 _id 的对象放入到同一个 kafka partition 当中?只需要将 _id 作为 kafka message 的 key,因为 kafka 的 partition 机制就是如果有 message key 就按照 message key 进行 hash 以后进行分区。参考源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
/**
* Compute the partition for the given record.
*
* @param topic The topic name
* @param key The key to partition on (or null if no key)
* @param keyBytes serialized key to partition on (or null if no key)
* @param value The value to partition on or null
* @param valueBytes serialized value to partition on or null
* @param cluster The current cluster metadata
*/
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if (keyBytes == null) {
int nextValue = nextValue(topic);
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
if (availablePartitions.size() > 0) {
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return availablePartitions.get(part).partition();
} else {
// no partitions are available, give a non-available partition
return Utils.toPositive(nextValue) % numPartitions;
}
} else {
// hash the keyBytes to choose a partition
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}
private int nextValue(String topic) {
AtomicInteger counter = topicCounterMap.get(topic);
if (null == counter) {
counter = new AtomicInteger(new Random().nextInt());
AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
if (currentCounter != null) {
counter = currentCounter;
}
}
return counter.getAndIncrement();
}

如果消息的 key 为 null,则先根据 topic 名获取上次计算分区时使用的一个整数并加一。然后判断 topic 的可用分区数是否大于0,如果大于0则使用获取的 nextValue 的值和可用分区数进行取模操作。如果 topic 的可用分区数小于等于0,则用获取的 nextValue 的值和总分区数进行取模操作(随机选择了一个不可用分区)

如果消息的 key 不为 null,就是根据 hash 算法 murmur2 算出 key 的 hash 值,然后和分区数进行取模运算。

Storm 回放

乱序

现在我们已经保证同 _id 的消息会进入到同一个 Storm Spout thread 当中,现在还需要保证在发送到 Bolt Task 的时候也进入到同一个。这就需要考虑 Storm 的 grouping 策略了,其中只有 FieldGrouping 能满足要求,FieldGrouping 是通过 parentBolt 发出的 stream 当中声明的某一个或者几个 field 来做 grouping,比如 parentBolt 发出的 stream 中有:usernamepassword 两个 field,而在声明 FieldGrouping 的时候设置按照 username 这个 field 来做 grouping。那么 username 相同的stream 则一定会进入到同一个 BoltThread 当中。

但是这次的设计当中只有1个 Bolt,它的父级 Bolt 是一个 Spout。而没有找到合适的api来对一个 Spout 的 stream 声明 field,默认只有一个 bytes field。通过分析发现在 KafkaSpout 类中有一个 public 方法:

1
2
3
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(this._spoutConfig.scheme.getOutputFields();
}

是用来声明 Fields 的,通过参看文档,没有携带 streamName 参数的 declare 方法会声明一个 streamNamedefault 的 stream。

所以我们只要使用自己的 scheme ,并且重载它的 getOutputFields 参数即可完成对这个 Spout 的 stream 声明 field。并且发现 KafkaConfig 类中 stream 是一个 MultiScheme 类型的 public 参数,构造函数里使用 RawMultiScheme 类进行初始化它的值,而该类中的 getOutputFields 方法:

1
2
3
public Fields getOutputFields() {
return new Fields(new String[]{"bytes"});
}

所以默认的 Fieldsbytes。那么只要继承 backtype.storm.spout.Scheme 类重载 getOutputFieldsdeserialize 即可。由于传入 Spout 的要么是 bytes,要么是一个 String,所以如果不重载 deserialize 方法对其进行反序列化,那么设定的 field 也是没有实际意义的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class MyScheme implements Scheme {
@Override
public List<Object> deserialize(byte[] ser) {
try {
...
} catch (Exception ex) {
return null;
}
}

@Override
public Fields getOutputFields() {
return new Fields(...);
}
}

网络不稳定性

上面说了怎么保证 Storm 内部的不乱序问题,而仅凭以上的方法还不足以保证数据的完整性。因为在 Storm 内部传输信息是需要通过网络进行传输,所以数据是有丢失的风险的。而 Storm 本身为了应对这种风险也设立了一种容错机制。具体可以参考: Guaranteeing Message Processing

总得来说就是 Storm 内部通过 SpoutOutputCollector 给每一个从 Spout 发出的 tuple 给定一个 id,如果在设定的 timeout 时间内没有完成这个 tuple 应该完成的所有 task,那么会发出一个 fail 信号让 Storm 通过这个 id 来对这个 tuple 进行重试。而使用者仅需要设定 timeout 时间,并且在 collector.emit 的时候将 tuple 作为第一个参数。

TIPS

  • 无论是离线的批量导入还是在线的回放,insertupdate 操作需要使用 upsert 操作代替,不然会出现大量的由于 id 重复或者 id 不存在的报错。比如在表中有一个 id 为 a 的数据,而在第一步中记录了对 a 的 update 操作和 delete 操作,所以在第二步启动的时候这个 a 数据已经不存在了,而在回放的时候直接使用 update 操作回放会报错。

  • 由于是对表中的部分数据进行迁移,所以在 Bolt 当中需要对 oplog 进行过滤,只对包含目标 field 的 oplog 进行重放,否则会存在大量的垃圾数据。

  • MongoDB oplog 的 offset 最好进行缓存,做好容错工作。

Read More
⬆︎TOP