最近需要实现的一个场景是对于社区里的每条动态进行多次的检测和评估,暂且不论检测和评估的具体功能,统称为「计算」。

而这些计算由其他同事提供现成的接口,但是由于其具体功能的差异这些计算有些依赖了外部接口,并且可能依赖外部的长延时接口,例如视频分析。所以这里从计算的实现上将其划分为「瞬时计算」和「长时计算」,而对于上层使用方来说是需要构造出一种使用协议来同时兼容两种类型。而这样的三层结构中是有两次网络传输过程,更需要一种足够健壮的重试策略。并且在动态流量基数较大,峰值与谷值差异较大的情况下,横向扩容能力也非常的重要。

模型

第一次讨论的方案是 「push模型」,多方使用我暴露的接口,在完成数据消费计算之后推送到我的服务中进行下游逻辑。这种方案有几个弊端,第一是多方需要实现消费 Kafka 的逻辑并且需要管理 Kafka offset,保证在事故不会导致数据丢失。第二是网络传输导致的重试逻辑复杂,需要接入第二个重试队列,或者提供重试接口,这样数据流的流向就从单向变为双向,增加了之后的维护和排查成本。但是这种方案还是有一个好处,因为计算方是接触数据的第一层,所以在计算方内部出现错误引起的重试或者报错机制实现其他非常简单。

在否定以后转换为 「pull模型」,多方暴露接口让我进行统一的调用,仅由我方消费 Kafka 并统一维护状态和提供重试策略。但是在实现细节上还是有两种方案可供选择,一种是以「动态为单位」,一种是以「计算为单位」。动态为单位是说,主线程消费 Kafka,然后交由下游异步或多线程方式调用多个接口,全部成功后算作一个动态计算完成。计算为单位是说,多个线程各自使用独立的 group id 消费 Kafka 并维护重试队列,然后在线程内进行接口调用。两种方式的区别是一个计算失败之后的 block 代价不同,第一种方案会 block 整个动态,第二种方案只会 block 动态的一个计算。并且由于计算之间的效率差,第一种的效率取决于最慢的一个计算,第二种在动态为单位的角度来看,也是以最慢的一个计算决定,但是由于计算之间不会互相影响,所以之后想对「慢计算」进行降级的话能很方便的完成。

三个问题

在决定模型以后,需要考虑第一个问题,「构造出一个通用的协议兼容同步计算和异步计算的调用」。一种是同步轮询,一种是异步调用。由于底层依赖的外部长时计算有请求的次数限制,所以同步轮询需要记录请求时间来控制轮询的时间间隔,但是没有性能问题的风险。而对于异步调用,如果下游系统被某些原因 block 住的时候会无限的建立连接,在超过线程池的上限以后会 block 住调用方。所以比较下来还是使用轮询的方式简单实用,只需要维护一个调用队列,每个请求带上「上一次请求的时间戳」和「重试次数」,如果在消费到一个还没有超过调用间隔的请求,不累加重试次数,直接放入队列末尾;如果一个请求失败则直接修改时间戳,累加重试次数放入队列末尾;如果超过了重试次数,直接抛出系统记为一个 bad case。

在确定轮询的方式以后,第二个重试问题也迎刃而解,仅由顶部调用方来控制请求状态,并且提供重试,这样单向的数据流在后期的问题排查和维护过程中是非常重要的。而第三个问题,横向扩展能力,由于使用了「以计算为单位的pull模型」,扩展新的计算可见是非常方便的,只需要添加独立的线程。并且扩展每个计算调用系统本身也是非常方便的,需要对「分发任务逻辑」和「调用逻辑」进行解耦,扩展时只扩展「调用逻辑」部分,不然的话还需要保证每个线程之间分发任务的队列的一致性,是很冗余的设计。那么整个轮询系统内部也被划分为了两个部分,第一个部分消费 Kafka,维护调用队列,第二个部分消费调用队列进行底层接口调用,并且会反馈调用结果给第一个部分,使其进行调用队列的状态维护。

a3-1

而上面也提到了单向数据流的好处,所以这里为了规避掉双向数据流,将请求完成后的队列维护工作也放在接口调用部分。所以就变成了「状态维护」部分只管往调用队列里放请求,「接口调用」部分负责调用接口并且使用 response 来维护调用队列里的请求状态,例如请求次数加一之后放入队列末尾。

a3-2

复用Storm

上面的结构一看非常像 Storm 的流式结构了,并且 Storm 能保证一条 Kafka message 在轮询系统中一定会被成功消费并且是顺序消费,还能帮我们管理 Kafka offset 状态,还不需要写多线程,扩容起来也非常方便。那么何乐为不为呢?

a3-3

将设计图一改,瞬间转变成一个 Storm 架构,其实上面单向数据流的设计也规避了 Storm 中不能由下游 Bolt 给上游 Bolt 传递消息的情况。而 Storm 本身也提供重试机制,在该重试机制下我们可以重新考虑之前数据结构的设计。

因为我们需要考虑的是两类重试情况,一种是通过重试能解决的网络问题,一种是通过重试不能解决的系统问题。而当我们在遇到超过重试次数没有解决的问题时,之前的解决方案是抛出系统持久化到一个地方,之后再想办法解决,但是这样又增添了该系统的复杂程度,需要通过自动化的方案能区分这个 case 到底是网络问题还是系统问题,之后再通过一套方案将它解决掉再写入系统。

试想最简单的处理方案是遇到网络问题通过重试自然的解决它,而遇到系统问题直接 block 整个系统,等待计算提供方解决问题后再继续。而 Storm 的机制刚好提供了这种方案的解决策略,如果遇到重试的请求都进行无限次的重试,因为短暂的问题肯定是会在有限次重试的过程中恢复,而系统问题是无限次重试都不能解决的,那么遇到很多的系统问题 case 不是会浪费IO,并且也没有 block 整个系统吗?

其实并不会,Storm 内部设计的时候一次会从 Kafka 中拿出多个 message 形成多个 tuple,只有在这些 tuple 全部 ack 掉以后才会继续向后面拿数据。所以如果在一批数据中出现了一些系统问题的 case,他们通过无限的 fail 重试是会 block 整个 topology 的,并且他们的个数不会很多,所以不会对下游造成 IO 的压力。对 Kafka 的 offset 进行检测接上警报以后,很快消费能力就跟不上生产能力,就能知道出现了这样的系统问题,在下游的服务修好以后,再接着进行消费,也不会丢失数据。

思考

系统的设计中每一部分一定要简单纯粹,不要想让一个部分做多件事情。

Comments

⬆︎TOP