手机版 欢迎访问伴佳68(www.banjia68.com)网站
首先在最底层是一个个 Cloud,包括计算、存储等。在这一基础之上,是数据采集层,采集一些原始数据,比如用户行为日志数据、RDBMS 关系型数据库的增量日志数据,以及其他一些文件系统等。
然后基于源头数据层(ODS 层)之上是数据存储和加工层,主要分为两大块:一是偏离线的部分,主要使用 Hive、Spark 计算,使用 AWS S3 存储;二是偏实时的部分,主要使用 Flink 计算,使用 Kafka 存储。
再往上是一个数据共享层,我们把一些聚合数据、Join 数据和宽表数据写入数据共享的一些分析引擎中,比如 ClickHouse、StarRocks、TiDB、HBase 等等。这些都是作为数据共享层数据存储的底座,以及计算分析引擎的一个入口。
最上面是应用层,我们基于这一层做报表、即时查询等,还会对数据做封装,打造一些统一的数据产品。
小红书采用的是典型的 Lambda 架构。实时链路主要使用 Flink 和 Kafka;离线、Spark 和 Hive。Lambda 的特点就是两条链路互相独立建设,互不影响。
① 实时和离线数据不一致,造成数据不一致的原因主要有三点:计算引擎不一致,相同 SQL 定义也容易产生不同结果;作业不同,开发人员需要维护两套代码,技术门槛高;数据 TTL 不同,Join 分析天然误差。
② Kafka 缺乏数据检索能力,对用户来说 Kafka 更像一个黑盒。不管 Kafka 中数据存储的是一些类似 protobuf 的数据还是 json 格式的数据,在做检索的时候都非常困难。如果用户想要根据某个条件去检索数据,这个数据很难被查找。KSQL 产品更像是一个 streaming 的处理,更注重的是实时流处理能力,用来做离线大规模检索并不适合。
③ 流存储存数据有限,回溯效率低。这一点最大的原因是成本高,数据不能无限存。而且如果要去回溯读,从历史上去回追数据,它读的性能也不及批量读。
基于 Lambda 带来的痛点,我们萌生了去开发一个流批存储的产品的想法来解决 Lambda 的痛点。下面就来介绍一些设计细节。
我们的流批统一存储叫 Morphing Server,对用户提供的 API 还是跟 Kafka 完全兼容,都是使用流式的方式去写入和消费,这些接口都没有变,所以用户的使用方式不会有任何变化。
区别在于用户写入数据到 Kafka,Kafka 内部会有一个线程,异步将数据同步到数据湖中。我们的数据湖是采用的 Iceberg,当数据写入到 Kafka 中,内部线程会去抓取 Leader 数据,经过一些 Schema 数据解析转换为 Table Format 格式写入到 Iceberg 中,这个过程是异步的,对用户来说是无感的。
Kafka 的数据会被其他 Flink 作业消费,消费完之后可以写到下一个 Kafka 中,在下一个 Kafka 依然是以异步的形式将数据落地到数据湖中。数据湖中的数据就可以提供批读取和批存储的能力。对于 Iceberg 中的数据如何去读取的问题,我们会根据实际情况选取一些高性能的分析引擎,比如 StarRocks、小红书自研的 RedCK 等来读取离线. 产品能力
② 无感写入:对外提供的写入接口为原生 Kafka API,用户无需关注落数据湖过程,自动异步写湖。
③ Schema 解析:数据在落湖前会提前进行 Schema 解析,以结构化、半结构化的 Table 形式提供查询。
④ 高速分析:借助 StarRocks 引擎的强大湖上查询能力,能够提供向量化、CBO 等高速查询能力。
接下去,我们介绍一下技术选项是如何去考量?关于技术选项分为两个部分:自动落湖的过程如何选择;对于数据湖中的数据如何选取合适的引擎去更加高效读取
对于自动落湖过程我们考虑了两种形式,Builtin(内嵌)和 Extension(外挂插件),这两种形式其实都是可以的。
在 Builtin 的形式下,我们看到只有一个独立的进程,在里面处理落日志之外,还会有一个异步的线程叫 Iceberg Syncer 去不断拉取日志中的数据,然后写入湖中,这种方式有优势也有劣势。
① 企业内生成集群版本难以升级,在企业中有一些集群并没有流批一体的功能,在升级中会非常困难。
② 进程隔离性弱,如果在异步线程中产生 bug,可能影响 Kafka 正常的读写功能。
针对 Builtin 形式的一些劣势,我们当初考虑了另外一种选项 - Extension,这个方式相对更加直观。
① 接入灵活,集群不需要升级,我们把 Kafka 落湖进程摘取到 Kafka 进程之外,是一个单独的进程,这是最大的一个好处。
目前我们落地的是 Builtin 的方式,所以后面介绍的一些细节方案都是基于 Builtin 方式的。
接下来介绍查询分析引擎的选型。我们希望找到一款 OLAP 产品,具备以下特点:
③ 大规模,离线分析数量大,数据种类多的情况下,在大规模数据量下性能不退化。
左边的分析引擎对分布式支持更好,对 SQL 协议兼容性高,提供更加一站式的查询平台。右边的分析引擎对单表性能更加优秀,在超大规模下的数据承载能力更强,特别是我们在 RedCK 上做了一些深度的定制化自研去满足更多应用场景。
StarRocks 支持湖上分析能力。它本身支持读数据湖,不需要将数据以任何形式同步到 StarRocks 上,更像一种外表的形式,可以通过 Iceberg 的 Catalog 去查询数据,还会做一些 Cache 缓存来加速查询。
我们对 StarRocks 和 Presto 在流批一体上做了查询性能的对比,主要分为两大类,四小类的 SQL 进行比对。
左边主要是 Scan 全表扫描相关,在这一方面 Presto 的性能更加优越,但是两者差距不大。右边主要是 GroupBy 相关的聚合场景,具有 MPP 架构的 StarRocks 在性能上明显更加优于 Presto。这也是我们选择 StarRocks 的原因。因为在这个应用场景下 Join 使用较少,所以这里没有进行对比。
还有一类分析引擎就是之前提到的 ClickHouse 和 RedCK,如何去更好的分析湖上的数据,这里介绍一下我们自研的 RedCK。
RedCK 通过 MergeTree 的格式跟其他查询引擎打通,比如 Spark、Flink 等计算可以直接读写 MergeTree 上的数据,然后通过 RedCK 在 MergeTree 上做 OLAP 分析。这样的好处是使用 Spark 在写数据的时候可以有一个更好的性能,做到了读和写两种引擎的解耦。
基于这个考虑,我们在 Kafka 流批一体的引擎在落湖的过程中,原本只支持传统的 Parquet 现在也支持写 MergeTree 格式,同时也去提交一些和 RedCK 相兼容的元数据信息。这样 RedCK 可以根据元数据信息直接找到 MergeTree 去做一些分析。
Broker 模块主要负责的是数据湖写入,利用 Kafka 本身的 Fetch 机制,将 Leader 上的最新数据进行解析并且不断写入,按照 Partition 维度来做单独的线)Broker 设计细节
Exactly-once 语义主要依托于两阶段提交来实现数据不丢不重,具体如下:
⑤ 最后一步,等第一阶段完成之后,发起第二阶段提交,发出一个 Commit 提交告诉 Iceberg 可以落盘。
实际生产中,常会出现一些故障。接下来介绍各种故障情况下,如何保证数据的不丢不重。
② Controller 故障:在第一阶段提交的时候失败,会被自动切换到别的机器上面去再起一个 Commiter 线程,会发现第一阶段还没完成,那么会重新向所有 Broker 发起一轮新的 RPC 请求,重新做一次 Checkpoint,这一次其它 Broker 在接受到 RPC 请求之后会发现不需要做 flush 操作,就会立刻返回 ACK。在收到所有 ACK 之后,会重新做一次第一阶段提交;第一阶段提交之后成功了,但是在第二阶段提交的时候失败了,那么 Controller 切换到另外的一个机器首先会去 Checkpoint Storage 中查询,如果第一阶段提交信息已经存在就会直接发起第二阶段提交工作。
③ Object Store 故障/HMS 故障:我们会做一个无限重试,并且将一些告警信息发送出来。
流批统一存储在公司内部落地之后,可以解决一些 Lambda 架构带来的问题,下面将从四个方面来介绍。1. Kafka 数据检索
在流批一体之前,开发同学去检索 Kafka 数据比较复杂,如左图显示:第一步需要去申请一个 topic,按照需要写数仓作业;第二步找 DBA 申请一个 OLAP 表;第三步再去写 Flink JOB 去解析 topic 数据写到刚刚申请的 OLAP 表中,这个表纯粹是用来查询和排障,整个链路比较长。在使用流批一体之后,开发同学申请一个 Topic,然后往 Topic 中写作业,这个时候开发同学可以直接查询流批统一存储。
流批统一的存储,可作为数仓 ODS 层,建设下游链路。因为流批统一存储是 Excatly-once 语义,所以可以做到实时和离线存储完全匹配,可以避免双链路带来的数据不一致问题。
结合 Flink 提供的流批统一的计算能力,同时从批存储和流存储回刷数据,极大提升回刷性能。与 Kafka 相比,批存储提供更长的数据生命周期,数据 SLA 更有保障。
利用 StarRocks 良好的湖上分析能力,充分发挥向量化引擎和 CBO 优势,在统一的计算引擎上实现多业务多维分析。例如用户行为分析、用户画像、自助报表、跨域分析等多种分析场景,都可以在一站式平台上去完成。
banjia68.com 版权所有 | 备案号:鲁ICP备2021038504号-2