导读: 随着业务量快速增长,云积互动对数据的实时性及灵活性提出更高要求,早期基于 CDH 的大数据平台已无法满足当前难度以及复杂度较高的的业务需求,因此云积互动于 2021 引进 Apache Doris 在部分业务中使用,并在使用过程中逐渐发掘出 Apache Doris 更多强大之处以及优势,最终决定在 2022 年全面应用 Apache Doris ,基于 Apache Doris 来构建云积互动企业级实时离线统一数仓。
作者|云积互动大数据团队 王杰&蒙磊
云积互动,全称深圳市云积分科技有限公司,成立于 2014 年,是国内领先的 AI 驱动的消费者运营服务提供商,致力于发展消费者运营相关理论、技术、算法、模型及软件工具,为全球消费性企业提供基于 AI 的消费者运营系统及运营策略服务,打造消费者运营领域最佳服务和实践标准,帮助企业构建消费者运营核心能力,以应对当前及未来的场景化运营挑战。目前已成为天猫、京东、抖音、腾讯等主流电商和社交平台深度合作伙伴,服务客户 2300+,其中世界 500 强客户超过 18 家,包括全球第一的美妆、日化、医药集团,均深度服务超过 7 年。
云积互动的主要业务是以消费者运营为核心,包含了会员通,CRM,策略营销,数据资产等一系列业务,如下图所示。
早期云积互动的大数据需求较少,起步也比较晚,2019 年才开始搭建基于 CDH 的大数据平台,因此大数据平台的主要目的是为了满足早期较为单一的 BI 数据看板及报表功能。近年来,随着业务量快速增长,数据量的增长,业务对数据的实时性及灵活性提出更高的要求,大数据平台也从早期的只需要满足单一的 BI 服务需求,扩展到需要支持各业务线,包含圈人服务,人群分析,AI智能数据等多种业务需求。早期基于 CDH 的大数据平台已无法满足当前难度以及复杂度较高的的业务需求。
早期公司业务量较少,基于 Hive+Spark 构建的离线数仓即可满足早期大数据的需求。早期架构主要用于支持 BI 相关功能,数据大屏,自助报表等应用,大部分的指标仅要求 T+1 的指标。
下图为云积互动早期数仓架构,早期的数据源主要为业务数据库 MySQL 以及日志,数据通过 Streamsets 实时采集数据并经 ETL 后传入ODS层,存储到 Kudu 中,通过 Impala 对 ODS 层的数据进行处理,实现实时查询业务的需求。通过 Hive 构建了离线数仓的 DWD、DWS 以及 DIM 层,使用 Spark 进行离线任务的计算与调度,最终处理并计算完成的数据输出到 MySQL 和 Kylin 中,应用于上层业务应用及分析。
存在的问题
基于业务对数据实时性及灵活性更高的要求,我们在 2021 年初对当前市面上较为流行的分析引擎 ClickHouse 和 Apache Doris 进行了调研,调研中我们发现, Apache Doris 具有高性能、简单易用、实现成本低等诸多优势。基于此,我们决定在部分业务上开始使用 Apache Doris,在使用的过程中逐渐发掘出 Apache Doris 更多强大之处以及优势,Apache Doris 在很多方面十分贴合我们的诉求,因此,我们决定在 2022 年全面应用 Apache Doris 在数据仓库中,基于 Apache Doris 构建云积互动企业级数仓,选择 Apache Doris 的主要原因如下:
1. 该架构开发效率高,查询性能远高于 Hive。
2. 该架构对 OLAP 支持更好,支持更为灵活的查询。
3. 支持主键唯一和聚合模型的表,极大的减少了开发难度。
最开始使用新的数仓架构主要是要解决圈人速度慢的问题,圈人服务的核心在于人群圈选,通过 SQL 代码或标签取值组合等多种方式,实现人群查找,帮客户找到符合画像的人群,现在各行各业都会设计广告营销场景,其中也包括云积互动,而如何快速准确找到对的人推送广告就成了大数据场景需要解决的问题。当时我们只是在部分功能上使用了 Apache Doris,用 Apache Doris 替代了 Spark+Impala 来实现实时圈人功能,出乎意料的是,Apache Doris 投入使用之后效果极佳,新版架构的实时圈人业务平均每个任务耗时由 3~5 分钟降低到 10 秒左右,并且在人群落地方面,使用存储更小的 Bitmap 代替原来的人群落地为表,不仅数据管理方便,而且磁盘空间占用减少了 80% 左右。
除此之外,在一段时间的使用和学习中我们发现 Apache Doris 丰富的功能和核心优势,综上原因,我们产生了用 Apache Doris 数仓替代 Hive 数仓的想法,并迅速的付诸于实践。
当确定数仓使用 Apache Doris 之后,结合当前的业务需求以及早期架构需要解决的问题,需要将多平台数据打通,构建统一数据口径和数据指标,我们将数据仓库构建分为:ODS 层,DWD 层,DWS 层和 ADS 层,下图为各个分层主要负责的数据类型。
新数仓的分层逻辑如下:
新架构设计
数据接入:
业务数据 MySQL 存储在多台 RDS 中,因 Binlog 的留存时间较短,且数据存放于多服务器,同时还进行了分库存储,因此如何接入历史数据以及如何同时接入多个库的数据成了棘手的问题。在调研过程中我们发现 FlinkCDC 可以完美解决上述问题:FlinkCDC 可以在接入历史数据之后自动切换为读取 Binlog,且 2.x 版本已经支持断点续传,支持水平扩展,支持动态添加表。
日志和埋点数据我们采用 Kafka + Doris Routine Load 导入方式,Routine Load 支持支持用户提交一个常驻的导入任务,通过不断的从指定的数据源读取数据,将数据导入到 Doris 中,支持 Json 解析,并且可以做一些简单的 ETL,极大的减少了代码开发的工作。
数据加工:
数据加工采用了 Doris SQL、Insert into 的方式将增量计算完的结果导入到数仓分层中(ODS/DWD/DWS/ADS)。因业务需求对数据的实时性允许存在一点延迟性,因此将 Dolphinscheduler 设置为每 5 分钟调度一次增量 SQL;同时设置数仓每一层错峰执行任务,避免任务堵塞。
对于数据量比较小的表可以用一个 SQL 完成导入,而对于数据量大的表,为避免 union,需要分成多个 insert into 来执行。但有些大表的逻辑是多个大表的 Join 结果,对于这种场景,我们应用 AGGREGATE KEY 模型的表来解决,利用表的聚合特性来代替 SQL 的 Join 操作。
任务调度:
任务调度一直沿用 Dolphinscheduler,页面化的操作简单方便,且对 SQL 的支持友好,整个大数据平台的任务都是通过该调度器完成。目前使用了2.x以上的版本,支持使用钉钉报警和邮件报警的功能来监控任务,任务失败将通过钉钉或者邮件发送。
监控:
使用 Prometheus+Grafana 对 Doris 集群和 Flink 任务进行监控管理,页面化的监控,极大的减少运维成本。
3. Bitmap 存储散列用户 ID,使用 Bitmap 相关函数计算时性能较差。
目前新的数据仓库已经建设完成,基于 Apache Doris 较多优异特性以及与业务需求较高吻合性,当前团队已经在着手搭建基于 Apache Doris 的数据质量管理和数据血缘,后续我们计划基于 Apache Doris 搭建数据指标体系。下面简单分享一下我们在数据质量管理和数据管理的实现想法。
1. 数据质量管理: 在 Apache Doris 的 ODS 层建表时设定一些非空主键,这些字段都是业务逻辑上的必须字段,当数据接入会给定一些默认值,这样就可以清晰的分类出这些数据,在质量分析中进行输出;在 ETL 中也会存在一些逻辑错误数据,这类数据会通过定时的 Doris SQL 脚本进行输出,同时也可以反馈到业务侧进行数据修复。2. 数据血缘: 依托 Doris 提供的 SQL 审计功能,使用采集工具 Filebeat/Logstash 持续采集审计日志发送到Kafka,使用开源的 SQL 解析工具或者抽取 Doris 的 SQL 解析模块针对 DDL 或者 DML 进行解析,解析后的数据存入图数据库或者关系型数据库供业务端展示;该功能的实现对于数据问题排查、数据资产管理均有意义。
围绕着 Apache Doris 为核心的数据平台建设目前也在一直迭代发展,当然在使用中也发现了该产品的一些需要优化的地方,但不可否认它优秀的性能和丰富的功能,后续我们也将持续不断地进行优化,将优化方案贡献给 Apache Doris 社区。
作者介绍:
王杰:云积互动大数据团队leader,负责数据平台研发及数据治理
蒙磊:云积互动大数据高级开发,负责数据平台研发和数仓开发