美团点评 焦向 - 《SnappyData+在美团酒店实时数据分析中的应用》
2020-02-27 290浏览
- 1.《SnappyData在美团酒店实时数据分析中的应用》 演讲 / 焦向
- 2.
- 3.
- 4.This talk is ‘opinionated’ • How so? o 特定业务问题出发找方案 o 基于自己总结的方法论 • Why? • 分布式存储及业务出身
- 5.业务 交互式报表、实时情报分析
- 6.• 交互式报表(Interactive Reporting) o 实时数仓 • 实时情报分析(Intelligence analysis) o 类似日志流场景,但复杂得多
- 7.交互式报表( Interactive Reporting ) • 供给侧、售卖侧、流量数据多维度查询 ӱۓහഝପ ׀ᕳ ҁ32,̵*RRGV҂ ࠓܕ ҁᦈ̵ܔᲀ҂ ၞᰁ ᖌଶ ᲀࠓࢫᴚ ᕟᕢຝ ࣈऒ " ŏŏ q 数据新鲜度 q 低延迟(<3s) ग़ᖌັᧃ 'DVKERDUG q 研发效率
- 8.交互式报表( Interactive Reporting ) • 初期 • 基于MySQL,人肉lambda架构 • 以支持业务的思路实现 ၞᰁ ׀ᕳ ҁ32,̵*RRGV҂ ࠓܕ ҁᦈ̵ܔᲀ҂ හഝՙପ ҁᕟᕢᖌଶᤒᒵ҂ MQ Databus ETL 0\64/ Job ग़ᖌັᧃ 'DVKERDUG
- 9.交互式报表( Interactive Reporting ) • 初期 • 基于MySQL,人肉lambda架构 • 以支持业务的思路实现 ၞᰁ ׀ᕳ ҁ32,̵*RRGV҂ ࠓܕ ҁᦈ̵ܔᲀ҂ හഝՙପ ҁᕟᕢᖌଶᤒᒵ҂ MQ Databus ETL 逐渐失控 • 指标一致性 • 可维护性 • 数据准确度 • 0\64/ MySQL容ग़量ᖌັ、ᧃ性能瓶颈 'DVKERDUG Job
- 10.情报分析(Intelligence analysis) • 对比美团&友商数据,实时分析战况,自动跟进 ᖌଶ ᲀࠓࢫᴚ ᕟᕢຝ ࣈऒ ŏŏ ಬݐᔮᕹ ᒋහഝ ܃ᯈ ௳מᤑق ܻতၾ௳ " ᗦࢫᯌମ ӱۓᔮᕹ ᗦࢫහഝ ࣁᕚଫአ ҁᤑᩂഴګᒵ҂ ग़ᖌັᧃ q 原始消息6~120亿/天 q 8大类维度, 120亿种组合 q 实时+30天历史 q 低延迟(<3s)
- 11.情报分析(Intelligence analysis) • 从批处理开始,Job + MySQL + Hive + Kylin • 15min滑动窗口 • 定时任务 • 计算逻辑复杂,Java实现 • 多维查询基于该结果 ӱۓ ܻতၾ௳ Պॠ ჶۖᑻݗPLQ ᭦ᬋ॔෫ဩSQLṛපᤒᬡ PLQ PLQ PLQ PLQ ᕮຎ ᕮຎ ᕮຎ ᕮຎ ŏŏ ܻতၾ௳ ჶۖᑻݗ SHUPLQ ᶼ॒ቘ-RE JavaդᎱ ٟف᭦ᬋ 0\64/ ڠୌෛᤒ හഝ໊ḵ +LYH ᯈᗝ .\OLQ ᯈᗝ ᒵஇ ս۸ᛆᙡሲ ग़ᖌັᧃ JavaդᎱ ັᧃ᭦ᬋ
- 12.情报分析(Intelligence analysis) •问从题批处理开始,Job + MySQL + Hive + Kylin • 15min滑动窗口 • 研• 发定效时率任低务 ӱۓ ܻতၾ௳ •• 计链算条逻长,辑调复试杂周,期J长av,a实大量现等待 Պॠ ჶۖᑻݗPLQ • 需• 求多变维化查频询繁基于该结果 • 流程相似,体力活 ᭦ᬋ॔෫ဩSQLṛපᤒᬡ PLQ PLQ PLQ PLQ ᕮຎ ᕮຎ ᕮຎ ᕮຎ ŏŏ • 浪费资源,每个需求一套流程 • 无法支持实时多维分析 ܻতၾ௳ ჶۖᑻݗ SHUPLQ ᶼ॒ቘ-RE 0\64/ +LYH 30PD JavaդᎱ ٟف᭦ᬋ ڠୌෛᤒ හഝ໊ḵ ᯈᗝ .\OLQ ᯈᗝ ᒵஇ ս۸ᛆᙡሲ ग़ᖌັᧃ JavaդᎱ ັᧃ᭦ᬋ
- 13.情报分析(Intelligence analysis) • 而且,这样的流程还有N个…… …… $ ᶼ॒ቘ-RE 0\64/ +LYH .\OLQ % ᶼ॒ቘ-RE 0\64/ +LYH .\OLQ ܻতၾ௳ & ᶼ॒ቘ-RE ' ᶼ॒ቘ-RE 0\64/ 0\64/ +LYH +LYH .\OLQ .\OLQ ( ᶼ॒ቘ-RE 0\64/ +LYH .\OLQ ) ᶼ॒ቘ-RE 0\64/ +LYH .\OLQ ग़ᖌັᧃ ग़ᖌັᧃ ग़ᖌັᧃ ग़ᖌັᧃ ग़ᖌັᧃ ग़ᖌັᧃ
- 14.问题回顾 • 交互式报表( Interactive Reporting ) • 情报分析(Intelligence analysis) ᖌଶ ᲀࠓࢫᴚ ᕟᕢຝ ӱۓහഝପ ׀ᕳ ҁ32,̵*RRGV҂ ࠓܕ ҁᦈ̵ܔᲀ҂ ၞᰁ ࣈऒ " ŏŏ ग़ᖌັᧃ 'DVKERDUG q 可变数据,保证数据新鲜度 ᖌଶ ᲀࠓࢫᴚ ᕟᕢຝ ࣈऒ ŏŏ ܻতၾ௳ " ग़ᖌັᧃ q 复杂业务逻辑
- 15.选型之路 调研、问题剖析、总结方法论
- 16.大数据生态 • Google大数据的脉络完整清晰,业界鱼龙混杂错综复杂 Spanner:《Spanner:Google’s Globally-Distributed Database》 F1:《F1:A Distributed SQL Database That Scales》 Dremel :《Dremel:Interactive Analysis of Web-Scale Datasets》 PowerDrill :《Processing a Trillion Cells per Mouse Click》 Mesa:《Mesa:Geo-Replicated, Near Real-Time, Scalable Data Warehousing》 Shasta:《Shasta:Interactive Reporting at Scale》 FlumeJava:《FlumeJava:Easy, Efficient Data-Parallel Pipelines》 MillWheel:《MillWheel:Fault-Tolerant Stream Processing at Internet Scale》 DataFlow:《The Dataflow Model》
- 17.解构 • 全都是trade-off • OLAP vs OLTP • 流式处理 vs 批处理(Streaming vs Batch) • 预处理 vs 后处理(Pre-processing vs Post-processing) • 快照 vs 变更记录(Snapshot vs Changelog) • 平台 vs 业务 • 第一步是解构问题
- 18.数据处理简化模型 ᬌفහഝ ᶼᦇᓒ ਂؙ ᦇݸᓒ ᬌڊහഝ
- 19.引擎特性解构 • 存储和计算分离考虑 ᬌفහഝ ᶼᦇᓒ ਂؙ ᦇݸᓒ ᬌڊහഝ ⁃&XEHҁක҂ ⁃ᶼᘸݳҁක҂ ⁃හഝଘҁӱۓ҂ ⁃ၞୗ॒ቘ ⁃PHGLDғᶎ̵ਂٖݻᶎݻᏺፏ ⁃OD\RXWғ̵ਂڜᤈਂ ⁃VFKHPDғᕮ۸̵ᶋᕮ۸ ⁃PXWDEOHғඪ೮ๅෛ̵ӧඪ೮ๅෛ ⁃ඪ೮-RLQ ⁃ӧඪ೮-RLQ 举例:Druid、Kylin、Hbase、Spark、Linkedin Pinot ັᧃളݗ ⁃ᶋ64/ ⁃ᔄ64/ ⁃ӧਠෆ64/ ⁃ਠෆ64/ ⁃ਠෆ64/8')
- 20.The trade-offs
- 21.OLAP vs OLTP • OLAP:扫表性能,列存储 • OLTP:随机读写,行存储,索引 • 交互式报表是典型的HTAP(OLAP+OLTP)场景
- 22.Pre-processing vs Post-processing හഝ ᦇᓒ ਂؙ ᦇᓒ ັᧃ ᶼᦇᓒ ਂᦇ ؙᓒ 预处理优先 o 面向查询需求生成结果 o 灵活性低:需求变更需要重新生成数据 o 查询性能高 o 耗存储资源、人力 ᦇᓒ ਂؙ ᦇݸᓒ 后处理优先 o 在线计算需求数据 o 灵活性高 o 查询性能难以保证 o 耗计算资源
- 23.Pre-processing vs Post-processing හഝ ᦇᓒ ਂؙ ᦇᓒ ັᧃ ᶼᦇᓒ ਂᦇ ؙᓒ ᦇᓒ ਂؙ ᦇݸᓒ 预处理优先 后处理优先 需求不稳定(ad hoc查询),选后处理优先方案 人 vs 机器
- 24.Streaming vs Batch • The Dataflow Model by Google • 理论层面大一统 • Flink, Spark Streaming • 主要是预处理
- 25.Snapshot vs Changelog • 历史数据如何保存? • 数据量 • 信息损耗(数据更新) • 易用性
- 26.平台 vs 业务 • 平台关注通用,业务多特化需求 • 通用方案 vs 专有方案 • 资源利用率未必高 • 研发效率与deadline
- 27.业务场景解构 ٵᏟଶ ꧋ᦜഖ०ᔜଶ හഝᰁ *% *% ֗ ݺރ ᬴ PV V V ෛẌଶ V ᵱᰁ V PLQ PLQ ٵᏟ 牺牲少量精度,可实现巨大性能提升 7% PLQ 3% 单机 or 分布式,全内存 or 磁盘 ṛ 低吞吐、低延迟比较容易 KRXU 高吞吐、低延迟比较困难 • 分析场景下,高吞吐意味着预处理和Cache KRXU GD\ 流式 or 批处理 or 在线计算 ग़ 预处理 or 后处理
- 28.我们的场景 业务类型 准确度 数据量 吞吐要求 延迟要求 新鲜度 需求特点 分析型 100% 1TB~3TB,热数据500GB 低,<1qps 高,3s 高,多维度,数据可更新 量大、变更频繁 列存储 无法使用近似计算 分布式,全内存 低吞吐 低延迟 后处理,需要高效分布式Join 后处理
- 29.SnappyData
- 30.SnappyData • Database on Spark • OLTP + OLAP + Streaming • 深度整合GemFire XD和Spark • 100%兼容Spark API • Row or Column tables • Replicated and partitioned tables • In-memory计算,支持overflow • 近似计算(商业Feature) • 牺牲1%准确度,换取200x性能提升
- 31.整体结构
- 32.集群架构 • P2P • 三种角色 • Lead Node • Locator • Data Node • Long running Spark Executors • Driver HA • Data HA
- 33.SnappyData的优势 • 简化数据处理架构 • 流式处理、存储、计算引擎一体 • 高性能 • 相较于Spark+X方案,有7~142x查询性能提升 • 尤其是Join性能 Transaction (point lookups, small updates) Stream Processing Interactive Analytics
- 34.简化数据处理架构 • Lambda架构,不论系统维护还是业务开发都很复杂 • 两条链路,开发、调试都是两次 • 开发方式不同,逻辑一致难以保证 .DIND හഝრ 6WRUP .9 +,9( ୌFXEH .\OLQ ىᘶݱᐿᖌଶᤒ 0\64/ ਫහഝ ܲݥහഝ ັᧃ ᐏ ԈֺғਫఘಸړຉҁLambda Architecture҂
- 35.简化数据处理架构 • Lambda架构,不论系统维护还是业务开发都很复杂 • 两条链路,开发、调试都是两次 • 开发方式不同,逻辑一致难以保证 හഝრ .DIND Stream Processing 5RZ 7DEOH &ROXPQ 7DEOH 6DPSOH 7DEOH Spark SQL ັᧃ ᐏ ԈֺғਫఘಸړຉҁSnappyData҂
- 36.高性能 - 深度整合 • 计算数据与Spark在同一个JVM中 • 无数据拷贝/移动 6SDUN࣋ว • 内存中数据格式与Spark一致 6SDUN • 无序列化/反序列化 ཛྷࣳ ݒഘ +')6ٌ՜ਂؙක 6QDSS\'DWD࣋ว 6SDUN ፗള ֵአ ٖਂ'%
- 37.高性能 - Colocated Join • 控制相关表使用相同的数据分布策略,如POI_ID • 极大减少了Shuffle,使Join在日常使用成为可能
- 38.架构变化 交互式报表( Interactive Reporting ) ᖌଶ ᲀࠓࢫᴚ ᕟᕢຝ ӱۓහഝପ ׀ᕳ ҁ32,̵*RRGV҂ ࠓܕ ҁᦈ̵ܔᲀ҂ ၞᰁ ࣈऒ 6QDSS\'DWD ŏŏ ग़ᖌັᧃ 'DVKERDUG 结果 • 线上4节点集群(128G, 32CPUs, SSD) • 平均查询耗时 <2s • 10亿行事实表,5维度表Join,5s • 消除20+中间表,仅使用原始表查询 • 消除20+预计算任务 • 指标完全一致 • 研发效率大幅提升 • Java + 中间表 + SQL → SQL
- 39.架构变化 情报分析(Intelligence analysis) ᖌଶ ᲀࠓࢫᴚ ᕟᕢຝ ࣈऒ ŏŏ ܻতၾ௳ 6QDSS\'DWD ग़ᖌັᧃ 结果 • 线上36节点集群(128G, 32CPUs, SSD) • 核心业务(UDF实现) • 120行事实表,3表join,0.4s • 研发效率大幅提升 • Java + 中间表 + SQL → SQL
- 40.应用中遇到的几个问题 1. 复杂业务逻辑如何支持 2. 历史数据如何存储 3. 实时数仓如何建模 4. 首次执行性能优化
- 41.1. 复杂业务逻辑如何支持 • 情报分析核心逻辑用SQL无法高效表达 • 涉及对数亿行数据进行排序,无法在线使用 • Spark支持两种UDAF • ImperativeAggregate • 支持外部注册,用代码直接直接实现逻辑,易于开发,但有函数调用开销 • DeclarativeAggregate • 内部使用,无公开接口 • 直接操纵Catalyst表达式,支持代码生成,性能强大 • 写法类似于写Lisp Macro 修改SnappyData,开放Spark的声明式UDAF注册(释放了魔鬼)
- 42.2.历史数据如何存储 • 业务数据库的变更历史,如何存储、如何使用 • 例:查询某一历史时刻整个业务订单状态,进行多维度分析 • 两个选择:定期快照(Snapshot),变更记录(Changlog) • 方案: • 将Changelog转换为SCD Type2形式的事实表 # 当前状态 where current_flag = 1 # 历史状态 wherebetween start_time and end_time • 依赖SD的join update性能,将changelog表转成SCD表
- 43.3. 实时数仓建模 • 数仓建模是分层的,存在实体中间层 • 依赖于在线join的实现方式,中间层不存在实体
- 44.4. 首次查询性能优化 • SnappyData对Spark SQL代码生成进行了优化 • 基于Tokenization的QueryPlan复用 select name from person where age > 26 and city = 1 select name from person where age > 30 and city = 20 • 首次查询慢根因在于JIT优化需要Profiling数据 • 不同SQL模板对应不同QueryPlan,需重新生成代码 • QueryPlan缓存是Connection Scope • 优化 • 没有彻底的办法 • Query预热,适用于查询模式数量较少场景 • 减少连接数(限制连接池大小)
- 45.总结 ັᧃᚆ ؙਂڜಚᤒᚆ ॔᭦ᬋ ्กୗ8') ܲݥහഝ හഝᰁ 6&'Ԫਫᤒ ਂٖق FRORFDWHGMRLQ
- 46.Parting Thoughts
- 47.Lesson 1: 机器换人力很划算 • 我们常常低估人力成本、高估机器成本 • 高昂的人力成本通常伴随着低效率
- 48.Lesson 2: 预处理不该是数据业务首选 • 需求变更频繁的场景,预处理越多,通常意味维护成本越高 o 做的越多,错得越多 o 否则就需要一个强大的约束与检查系统 • 单个垂直业务(or 创业公司)的数据量真的没有那么大 o 不要搞“尬并行” • 作为数据服务,通常不用考虑吞吐量 o Just串行!
- 49.Lesson 3: SQL, not Java • 表达能力对于研发效率和可维护性至关重要 o 让机器生成low-level代码,而不是自己写 • 但SQL并不图灵完备 o UDF and RVL
- 50.谢谢!
- 51.
- 52.
- 53.
- 54.