一种多数据源索引同步设计
背景
在我接触的很多应用开发中,搜索引擎(ElasticSearch)是充当业务宽表使用,也就是聚合多张业务表,免去线上查询的join操作。
初始方案通过canal来同步多表数据源,因为canal同步binlog消息是单线程运行的,所以不存在顺序问题,另外多表之间也没有重叠的业务字段,所以多表更新也不要求顺序性。
在基础服务平台化之后,如果多个业务(索引同步算是业务方之一)各自根据业务需求使用canal订阅binlog变更消息,会对db造成一定的压力,而且其实canal订阅这部分工作是重复的,也就是可以从业务中拆分出数据同步的生命周期(数据变更->binlog->获取变更),作为数据总线的一部分,数据交互通过消息队列解耦。
这样拆分之后,因为mq这个环节的加入,使得消息到达同步程序时可能发生乱序现象(原本状态是“支付”->“发货”,实际可能是“发货”->“支付”),导致了一条脏数据;如果要保证消息顺序,可以使用mq的局部有序特性(服务端分为若干partition,每个partition单线程生产、单线程消费,指定字段的消息只路由到某一个partition),但这样会严重影响消息吞吐量。
所以初始方案加入mq之后就只有两个选择,要么容忍一定程度的脏数据,通过数据补偿实现最终一致性,要么降低吞吐量使用局部顺序消息数据。
目标
消息乱序问题是使用mq过程中经常遇到的,多线程生产/多线程消费/ack处理不当都可能引起消息乱序,另外mq的特性就是最多一次或者最少一次,为了保证数据不丢失,一般我们的选择都是最少一次(保证每条消息至少会被consumer消费一次)。
这里我们的目标就是在保持现有同步程序吞吐量的情况下解决消息乱序问题,简单归纳就是:
- 使用mq同步消息
- 不依赖局部有序特性
- 保证数据一致性
方案
上面说到通过mq来同步数据,其实binlog消息是可以有顺序标记的(filename+offset),如果将其转化为一个long类型数值就可以作为版本号使用,既然有版本号,那么只要保证新数据的版本号高于旧数据即可(按上面的转换方式天然保证),然而Es并不支持列级别版本号,只能设置行级别,多数据源有多个版本号,所以使用原生的Es版本号并不可行。
作为NoSql届的一哥,HBase是可以支持列版本号的,row+column+version才可以唯一确定某个cell中的值,默认保留3个version的数值,get操作如果不指定version则返回cell中version最大的数值。
写到这里熟悉的同学应该已经猜到我们的方案了:
- 在HBase中建立HTable,作为镜像索引
- 通过mq消费多个数据源的binlog消息,消息中包含binlog序号转换而来的version
- 将带version消息写入HBase,即使旧数据后于新数据到达,只要保证version按更新顺序递增就可以保证get到最新数据
- 通过get操作从HBase中获取单条记录(不用指定version),建议是保持rowkey和索引唯一键一致,方便两侧的读写
- 判断记录是否完整(是否所有非空字段都有值,涉及业务操作,此步骤可选)
- 通过index操作回写到索引(因为是完整数据,所以直接覆盖更方便,还可以省去update的取/合并操作)
- 正常返回结束,否则重试即可
具体需要实现一个消费多个数据源消息的consumer,需要同时访问HBase和Es:
补充说明几点:
- HTable字段配置和索引配置一样,也可以根据需要加上其它的字段,只要保证索引字段是HTable字段的子集就可以了
- cf可以配置为一个表一个cf或者全部合并为一个cf
- 因为HBase中有全量数据,如果索引需要重建,在维持增量同步程序运行状态下,只需要单独启动程序遍历HTable写入Es。这样就可以做到不暂停线上任务的情况下,在任意时间点全量重刷索引数据了
- 从HBase取数据写索引是个幂等操作,重试简单可行
结语
这个方案论证了挺久,理论上是实际可行的,然而因为各种原因没有真正实现,所以可能有些坑并没有踩到,比如性能/cf设计等。
抛砖引玉,希望对大家有所帮助。
End.
转载请注明来自36大数据(36dsj.com): 36大数据 » 一种多数据源索引同步设计