如何打造一款极速数据湖分析引擎
2022-03-04 10:25:00 来源:网络 阅读量:8186
前言
随着数字产业化和产业数字化成为经济驱动的重要动力,企业的数据分析场景越来越丰富,对数据分析架构的要求也越来越高。新的数据分析场景催生了新的需求,主要包括三个方面:
用户希望用更加低廉的成本,更加实时的方式导入并存储任何数量的关系数据数据和非关系数据(例如,来自移动应用程序、IoT 设备和社交媒体的运营数据库和数据)用户希望自己的数据资产受到严密的保护用户希望数据分析的速度变得更快、更灵活、更实时
之后我们会继续发表两篇文章,来更详细地介绍极速数据湖分析引擎的内核和使用案例:
代码走读篇:通过走读 StarRocks 这个开源分析型数据库内核的关键数据结构和算法,帮助读者进一步理解极速数据湖分析引擎的原理和具体实现。Case Study 篇:介绍大型企业如何使用 StarRocks 在数据湖上实时且灵活的洞察数据的价值,从而帮助业务进行更好的决策,帮助读者进一步理解理论是如何在实际场景落地的。什么是数据湖
什么是数据湖,根据 Wikipedia 的定义,ldquo;A data lake is a system or repository of data stored in its natural/raw format, usually object blobs or filesrdquo;。通俗来说可以将数据湖理解为在廉价的对象存储或分布式文件系统之上包了一层,使这些存储系统中离散的 object 或者 file 结合在一起对外展现出一个统一的语义,例如关系型数据库常见的ldquo;表rdquo;语义等。
了解完数据湖的定义之后,我们自然而然地想知道数据湖能为我们提供什么独特的能力,我们为什么要使用数据湖?
在数据湖这个概念出来之前,已经有很多企业或组织大量使用 HDFS 或者 S3 来存放业务日常运作中产生的各式各样的数据。因为这些数据的价值不一定能够在短时间内被发现,所以找一个廉价的存储系统将它们暂存,期待在将来的一天这些数据能派上用场的时候再从中将有价值的信息提取出来。然而 HDFS 和 S3 对外提供的语义毕竟比较单一(HDFS 对外提供文件的语义,S3对外提供对象的语义),随着时间的推移工程师们可能都无法回答他们到底在这里面存储了些什么数据。为了防止后续使用数据的时候必须将数据一一解析才能理解数据的含义,聪明的工程师想到将定义一致的数据组织在一起,然后再用额外的数据来描述这些数据,这些额外的数据被称之为ldquo;元rdquo;数据,因为他们是描述数据的数据。这样后续通过解析元数据就能够回答这些数据的具体含义。这就是数据湖最原始的作用。
随着用户对于数据质量的要求越来越高,数据湖开始丰富其他能力。例如为用户提供类似数据库的 ACID 语义,帮助用户在持续写入数据的过程中能够拿到 point-in-time 的视图,防止读取数据过程中出现各种错误。或者是提供用户更高性能的数据导入能力等,发展到现在,数据湖已经从单纯的元数据管理变成现在拥有更加丰富,更加类似数据库的语义了。
用一句不太准确的话描述数据湖,就是一个存储成本更廉价的ldquo;AP 数据库rdquo;。但是数据湖仅仅提供数据存储和组织的能力,一个完整的数据库不仅要有数据存储的能力,还需要有数据分析能力。因此怎么为数据湖打造一款高效的分析引擎,为用户提供洞察数据的能力,将是本文所要重点阐述的部分。下面通过如下几个章节一起逐步拆解一款现代的 OLAP 分析引擎的内部构造和实现:
怎么在数据湖上进行极速分析现代数据湖分析引擎的架构怎么在数据湖上进行极速分析?
从这一节开始,让我们开始回到数据库课程,一个用于数据湖的分析引擎和一个用于数据库的分析引擎在架构上别无二致,通常我们认为都会分为下面几个部分:
Parser:将用户输入的查询语句解析成一棵抽象语法树Analyzer:分析查询语句的语法和语义是否正确,符合定义Optimizer:为查询生成性能更高、代价更低的物理查询计划Execution Engine:执行物理查询计划,收集并返回查询结果
对于一个数据湖分析引擎而言,Optimizer 和 Execution Engine 是影响其性能两个核心模块,下面我们将从三个维度入手,逐一拆解这两个模块的核心技术原理,并通过不同技术方案的对比,帮助读者理解一个现代的数据湖分析引擎的始末。
RBO vs CBO
基本上来讲,优化器的工作就是对给定的一个查询,生成查询代价最低的执行计划。不同的执行计划性能会有成千上万倍的差距,查询越复杂,数据量越大,查询优化越重要。
Rule Based Optimization 是传统分析引擎常用的优化策略。RBO 的本质是核心是基于关系代数的等价变换,通过一套预先制定好的规则来变换查询,从而获得代价更低的执行计划。常见的 RBO 规则谓词下推、Limit 下推、常量折叠等。在 RBO 中,有着一套严格的使用规则,只要你按照规则去写查询语句,无论数据表中的内容怎样,生成的执行计划都是固定的。但是在实际的业务环境中,数据的量级会严重影响查询的性能,而 RBO 是没法通过这些信息来获取更优的执行计划。
为了解决 RBO 的局限性,Cost Based Optimization 的优化策略应运而生。CBO 通过收集数据的统计信息来估算执行计划的代价,这些统计信息包括数据集的大小,列的数量和列的基数等信息。举个例子,假设我们现在有三张表 A,B 和 C,在进行 A join B join C 的查询时如果没有对应的统计信息我们是无法判断不同 join 的执行顺序代价上的差异。如果我们收集到这三张表的统计信息,发现 A 表和 B 表的数据量都是 1M 行,但是 C 表的 数据量仅为 10 行,那么通过先执行 B join C 可以大大减少中间结果的数据量,这在没有统计信息的情况下基本不可能判断。
随着查询复杂度的增加,执行计划的状态空间会变的非常巨大。刷过算法题的小伙伴都知道,一旦状态空间非常大,通过暴力搜索的方式是不可能 AC 的,这时候一个好的搜索算法格外重要。通常 CBO 使用动态规划算法来得到最优解,并且减少重复计算子空间的代价。当状态空间达到一定程度之后,我们只能选择贪心算法或者其他一些启发式算法来得到局部最优。本质上搜索算法是一种在搜索时间和结果质量做 trade-off 的方法。
Record Oriented vs Block Oriented
执行计划可以认为是一串 operator首尾相连串起来的执行流,前一个 operator 的 output 是下一个 operator 的 input。传统的分析引擎是 Row Oriented 的,也就是说 operator 的 output 和 input 是一行一行的数据。
举一个简单的例子,假设我们有下面一个表和查询:
CREATE TABLE t ; SELECT o FROM t WHERE m lt; n + 1;
上述查询语句展开为执行计划的时候大致如下图所示:
通常情况下,在 Row Oriented 的模型中,执行计划的执行过程可以用如下伪码表示:
next: for: row = source.next if filterExpr.Eval(row): // return a new row containing just column o returnedRow row for col in selectedCols: returnedRow.append(row(col)) return returnedRow
根据 DBMSs On A Modern Processor: Where Does Time Go? 的评估,这种执行方式存在大量的 L2 data stalls 和 L1 I-cache stalls、分支预测的效率低等问题。
随着磁盘等硬件技术的蓬勃发展,各种通过 CPU 换 IO 的压缩算法、Encoding 算法和存储技术的广泛使用,CPU 的性能逐渐成为成为分析引擎的瓶颈。为了解决 Row Oriented 执行所存在的问题,学术界开始思考解决方案,Block oriented processing of Relational Database operations in modern Computer Architectures 这篇论文提出使用按 block 的方式在 operator 之间传递数据,能够平摊条件检查和分支预测的工作的耗时,MonetDB/X100: Hyper-Pipelining Query Execution 在此基础上更进一步,提出将通过将数据从原来的 Row Oriented,改变成 Column Oriented,进一步提升 CPU Cache 的效率,也更有利于编译器进行优化。在 Column Oriented 的模型中,执行计划的执行过程可以用如下伪码表示:
// first create an n + 1 result, for all values in the n column projPlusIntIntConst.Next: batch = source.Next for i lt; batch.n: outCol(i) = intCol(i) + constArg return batch // then, compare the new column to the m column, putting the result into // a selection vector: a list of the selected indexes in the column batch selectLTIntInt.Next: batch = source.Next for i lt; batch.n: if int1Col lt; int2Col: selectionVector.append(i) return batch with selectionVector // finally, we materialize the batch, returning actual rows to the user, // containing just the columns requested: materialize.Next: batch = source.Next for s lt; batch.n: i = selectionVector(i) returnedRow row for col in selectedCols: returnedRow.append(cols(col)(i)) yield returnedRow
可以看到,Column Oriented 拥有更好的数据局部性和指令局部性,有利于提高 CPU Cache 的命中率,并且编译器更容易执行 SIMD 优化等。
Pull Based vs Push Based
数据库系统中,通常是将输入的 SQL 语句转化为一系列的算子,然后生成物理执行计划用于实际的计算并返回结果。在生成的物理执行计划中,通常会对算子进行 pipeline。常见的 pipeline 方式通常有两种:
基于数据驱动的 Push Based 模式,上游算子推送数据到下游算子基于需求的 Pull Based 模式,下游算子主动从上游算子拉取数据。经典的火山模型就是 Pull Based 模式。
Push Based 的执行模式提高了缓存效率,能够更好地提升查询性能。
参考:Push vs. Pull-Based Loop Fusion in Query Engines
现代数据湖分析引擎的架构
通过上一节的介绍,相信读者已经对数据湖分析引擎的前沿理论有了相应了解。在本节中,我们以 StarRocks 为例,进一步介绍数据湖分析引擎是怎么有机的结合上述先进理论,并且通过优雅的系统架构将其呈现给用户。
如上图所示,StarRocks 的架构非常简洁,整个系统的核心只有 Frontend 、Backend (BE) 两类进程,不依赖任何外部组件,方便部署与维护。其中 FE 主要负责解析查询语句(SQL),优化查询以及查询的调度,而 BE 则主要负责从数据湖中读取数据,并完成一系列的 Filter 和 Aggregate 等操作。
Frontend
FE 的主要作用将 SQL 语句通过一系列转化和优化,最终转换成 BE 能够认识的一个个 Fragment。一个不那么准确但易于理解的比喻,如果把 BE 集群当成一个分布式的线程池的话,那么 Fragment 就是线程池中的 Task。从 SQL 文本到 Fragment,FE 的主要工作包含以下几个步骤:
SQL Parse:将 SQL 文本转换成一个 ASTAnalyze:基于 AST 进行语法和语义分析Logical Plan:将 AST 转换成逻辑计划Optimize:基于关系代数,统计信息,Cost 模型对逻辑计划进行重写,转换,选择出 Cost ldquo;最低rdquo; 的物理执行计划生成 Fragment:将 Optimizer 选择的物理执行计划转换为 BE 可以直接执行的 FragmentCoordinate:将 Fragment 调度到合适的 BE 上执行Backend
BE 是 StarRocks 的后端节点,负责接收 FE 传下来的 Fragment 执行并返回结果给 FE。StarRocks 的 BE 节点都是完全对等的,FE 按照一定策略将数据分配到对应的 BE 节点。常见的 Fragment 工作流程是读取数据湖中的部分文件,并调用对应的 Reader 解析这些文件中的数据,使用向量化执行引擎进一步过滤和聚合解析后的数据后,返回给其他 BE 或 FE。
总结
本篇文章主要介绍了极速数据湖分析引擎的核心技术原理,从多个维度对比了不同技术实现方案。
声明:本网转发此文章,旨在为读者提供更多信息资讯,所涉内容不构成投资、消费建议。文章事实如有疑问,请与有关方核实,文章观点非本网观点,仅供读者参考。
猜您喜欢