Scaling Uber’s Elasticsearch as a Geo-Temporal Database
2020-02-23
- 1. Scaling Uber’s Elasticsearch as a Geo-Temporal Database - Danny Yuan @ Uber
- 2. Use Cases for a Geo-Temporal Database
- 3. Real-time Decisions on Global Scale
- 4. Dynamic Pricing: Every Hexagon, Every Minute
- 6. Metrics: how many UberXs were in a trip in the past 10 minutes
- 8. Market Analysis: Travel Times
- 9. Forecasting: Granular Forecasting of Rider Demand
- 10. How Can We Produce Geo-Temporal Data for Ever-Changing Business Needs?
- 11. Key Question: What Is the Right Abstraction?
- 12. Abstraction: Single-Table OLAP on Geo-Temporal Data
- 13. Abstraction: Single-Table OLAP on Geo-Temporal Data - SELECT …, … FROM … WHERE … GROUP BY … HAVING … ORDER BY … LIMIT …
- 15. Why Elasticsearch? - Arbitrary boolean query - Sub-second response time - Built-in distributed aggregation functions - High-cardinality queries - Idempotent insertion to deduplicate data - Second-level data freshness - Scales with data volume - Operable by small team
- 16. Current Scale: An Important Context - Ingestion: 850K to 1.3M messages/second - Ingestion volume: 12 TB/day - Doc scans: 100M to 4B docs/second - Data size: 1 PB - Cluster size: 700 Elasticsearch machines - Ingestion pipeline: 100+ data pipeline jobs
- 17. Our Story of Scaling Elasticsearch
- 18. Three Dimensions of Scale - Ingestion - Query - Operation
- 19. Driving Principles - Optimize for fast iteration - Optimize for simple operations - Optimize for automation and tools - Optimize for being reasonably fast
- 20. The Past: We Started Small
- 21. Constraints for Being Small - Three-person team - Two data centers - Small set of requirements: common analytics for machines
- 22. First Order of Business: Take Care of the Basics
- 23. Get Single-Node Right: Follow the 20-80 Rule - One table <—> multiple indices by time range - Disable _source field - Disable _all field - Use doc_values for storage - Disable analyzed field - Tune JVM parameters
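The single-node settings on this slide can be pictured as an index mapping. The following is a minimal sketch, assuming an older Elasticsearch release that still uses 1.x/2.x-style mappings (`string` fields with `"index": "not_analyzed"`, and an `_all` field that can be disabled). The field names and the `build_mapping` helper are hypothetical and do not come from the talk.

```python
import json


def build_mapping(dimensions, metrics):
    """Build a 1.x/2.x-style mapping body (assumed version) for one table."""
    properties = {}
    for name in dimensions:
        # Exact-match strings: no text analysis, columnar doc_values for aggregations.
        properties[name] = {"type": "string", "index": "not_analyzed", "doc_values": True}
    for name in metrics:
        properties[name] = {"type": "double", "doc_values": True}
    properties["timestamp"] = {"type": "date", "doc_values": True}
    return {
        "_source": {"enabled": False},  # do not store the original JSON document
        "_all": {"enabled": False},     # do not build the catch-all search field
        "properties": properties,
    }


# Hypothetical dimensions and metrics, for illustration only.
mapping = build_mapping(["city_id", "hexagon_id", "product_id"], ["surge_multiplier"])
print(json.dumps(mapping, indent=2, sort_keys=True))
```

Disabling `_source` saves storage but gives up features that need the original document, such as reindexing from the index itself, so it suits data that can be replayed from an upstream source.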
- 24. Make Decisions with Numbers - What’s the maximum number of recovery threads? - What’s the maximum size of request queue? - What should the refresh rate be? - How many shards should an index have? - What’s the throttling threshold? - Solution: Set up end-to-end stress testing framework
- 25. Deployment in Two Data Centers - Each data center has an exclusive set of cities - Should tolerate failure of a single data center - Ingestion should continue to work - Querying any city should return correct results
- 26. Deployment in Two Data Centers: trade space for availability
- 29. Discretize Geo Locations: H3
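To make "every hexagon, every minute" concrete, here is a minimal sketch of turning a raw event into a discrete (cell, minute) bucket key. It deliberately does not use H3: a plain square latitude/longitude grid stands in for H3's hexagons so the example needs only the standard library. The cell size and all function names are assumptions.

```python
import math
from datetime import datetime, timezone

CELL_DEGREES = 0.01  # hypothetical cell size; H3 uses hexagon resolutions instead


def cell_id(lat, lng, cell_degrees=CELL_DEGREES):
    """Map a coordinate to a square grid cell (a stand-in for an H3 hexagon index)."""
    row = math.floor((lat + 90.0) / cell_degrees)
    col = math.floor((lng + 180.0) / cell_degrees)
    return f"{row}:{col}"


def minute_bucket(ts):
    """Truncate a timestamp to the start of its minute."""
    return ts.replace(second=0, microsecond=0)


def bucket_key(lat, lng, ts):
    """Combine the space bucket and the time bucket into one grouping key."""
    return (cell_id(lat, lng), minute_bucket(ts).isoformat())


t = datetime(2018, 2, 3, 10, 15, 42, tzinfo=timezone.utc)
print(bucket_key(37.7749, -122.4194, t))
```

Once every event carries such a key, geo-temporal aggregation reduces to an ordinary GROUP BY on two discrete dimensions.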
- 30. Optimizations to Ingestion
- 32. Dealing with Large Volume of Data - An event source produces more than 3 TB every day - Key insight: humans do not need overly granular data - Key insight: stream data usually has lots of redundancy
- 33. Dealing with Large Volume of Data - Prune unnecessary fields - Devise algorithms to remove redundancy - 3 TB -> 42 GB, a reduction of more than 70x! - Bulk write
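The slides do not say which redundancy-removal algorithms were used. As one illustration of the idea, the sketch below prunes each event to a few fields and then drops events that repeat the previous state for the same entity. The field names and the "emit only on change" rule are assumptions.

```python
KEEP_FIELDS = ("driver_id", "status", "hexagon_id")  # hypothetical fields worth keeping


def prune(event):
    """Keep only the fields that downstream queries need."""
    return {k: event[k] for k in KEEP_FIELDS if k in event}


def drop_redundant(events):
    """Yield an event only when its pruned content differs from the last one for that driver."""
    last_seen = {}
    for event in events:
        slim = prune(event)
        key = slim.get("driver_id")
        if last_seen.get(key) == slim:
            continue  # same state as before: redundant for analytics
        last_seen[key] = slim
        yield {**slim, "ts": event["ts"]}


stream = [
    {"driver_id": "d1", "status": "open", "hexagon_id": "h1", "ts": 1, "debug": "x"},
    {"driver_id": "d1", "status": "open", "hexagon_id": "h1", "ts": 2, "debug": "y"},
    {"driver_id": "d1", "status": "on_trip", "hexagon_id": "h1", "ts": 3, "debug": "z"},
    {"driver_id": "d2", "status": "open", "hexagon_id": "h2", "ts": 3, "debug": "w"},
    {"driver_id": "d1", "status": "on_trip", "hexagon_id": "h1", "ts": 4, "debug": "v"},
]
compact = list(drop_redundant(stream))
print(len(stream), "->", len(compact))
```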
- 34. Data Modeling Matters
- 35. Example: Efficient and Reliable Join - Example: Calculate Completed/Requested ratio with two different event streams
- 36. Example: Efficient and Reliable Join: Use Elasticsearch - Calculate Completed/Requested ratio from two Kafka topics - Can we use streaming join? - Can we join on the query side? - Solution: rendezvous at Elasticsearch on trip ID
    TripID | Pickup Time  | Completed
    1      | 2018-02-03T… | TRUE
    2      | 2018-02-3T…  | FALSE
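The rendezvous idea can be sketched with an in-memory stand-in for the index: both event streams upsert partial documents under the same trip ID, so arrival order and replays do not matter. The `DocStore` class and the event shapes are hypothetical. In Elasticsearch itself this could map to update requests with `doc_as_upsert`, although the slides do not say which API was used.

```python
class DocStore:
    """In-memory stand-in for an index that supports upsert by document ID."""

    def __init__(self):
        self.docs = {}

    def upsert(self, doc_id, partial):
        # Merge the partial document into whatever already exists under this ID.
        self.docs.setdefault(doc_id, {}).update(partial)


def on_requested(store, event):
    store.upsert(event["trip_id"], {"requested": True, "pickup_time": event["pickup_time"]})


def on_completed(store, event):
    store.upsert(event["trip_id"], {"completed": True})


def completed_ratio(store):
    """Completed/Requested ratio over all trips that have a request record."""
    requested = [d for d in store.docs.values() if d.get("requested")]
    if not requested:
        return 0.0
    return sum(1 for d in requested if d.get("completed")) / len(requested)


store = DocStore()
on_completed(store, {"trip_id": 1})  # arrives before its request
on_requested(store, {"trip_id": 1, "pickup_time": "2018-02-03T10:00:00Z"})
on_requested(store, {"trip_id": 2, "pickup_time": "2018-02-03T10:05:00Z"})
on_requested(store, {"trip_id": 2, "pickup_time": "2018-02-03T10:05:00Z"})  # replayed
print(completed_ratio(store))
```

Because each handler writes only its own fields, a late or duplicated message leaves the joined document unchanged, which is what makes the join both idempotent and independent of ordering.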
- 37. Example: aggregation on state transitions
- 38. Optimize Querying Elasticsearch
- 39. Hide Query Optimization from Users - Do we really expect every user to write Elasticsearch queries? - What if someone issues a very expensive query? - Solution: Isolation with a query layer
- 40. Query Layer with Multiple Clusters
- 42. Query Layer with Multiple Clusters - Generate efficient Elasticsearch queries - Reject expensive queries - Route queries (hardcoded at first)
- 43. Efficient Query Generation - “GROUP BY a, b”
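The slide does not show the query that the layer generates for "GROUP BY a, b". One common translation is a chain of nested `terms` aggregations, sketched below. The `by_` bucket names, the default `size`, and the helper itself are assumptions, and the actual generator may emit something different.

```python
import json


def group_by_to_aggs(fields, size=100):
    """Translate GROUP BY f1, f2, ... into nested Elasticsearch terms aggregations."""
    aggs = {}
    # Build from the innermost field outward so each level wraps the next one.
    for field in reversed(fields):
        node = {"terms": {"field": field, "size": size}}
        if aggs:
            node["aggs"] = aggs
        aggs = {f"by_{field}": node}
    return aggs


body = {"size": 0, "aggs": group_by_to_aggs(["a", "b"])}
print(json.dumps(body, indent=2))
```

Setting the top-level `size` to 0 asks only for aggregation buckets, not matching documents, which is what an OLAP-style query needs.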
- 44. Rejecting Expensive Queries - 10,000 hexagons/city x 1,440 minutes/day x 800 cities - Cardinality: about 11.5 billion (!) buckets -> Out Of Memory Error
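The arithmetic on this slide suggests a simple guard: multiply the cardinalities of the grouped dimensions and reject the query before it reaches the cluster if the product is too large. The sketch below assumes the query layer knows an upper bound for each dimension. The `MAX_BUCKETS` threshold and the names are hypothetical.

```python
MAX_BUCKETS = 1_000_000  # hypothetical limit; a real threshold would come from stress tests


class QueryTooExpensive(Exception):
    """Raised when a query would create more aggregation buckets than allowed."""


def estimated_buckets(cardinalities):
    """Worst-case bucket count: the product of per-dimension cardinalities."""
    total = 1
    for cardinality in cardinalities:
        total *= cardinality
    return total


def check_query(cardinalities, limit=MAX_BUCKETS):
    buckets = estimated_buckets(cardinalities)
    if buckets > limit:
        raise QueryTooExpensive(f"{buckets} buckets exceeds limit of {limit}")
    return buckets


# Hexagons per city x minutes per day x cities, as on the slide.
print(estimated_buckets([10_000, 1_440, 800]))
```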
- 45. Routing Queries
    "DEMAND": {
      "CLUSTERS": {
        "TIER0": { "CLUSTERS": ["ES_CLUSTER_TIER0"] },
        "TIER2": { "CLUSTERS": ["ES_CLUSTER_TIER2"] }
      },
      "INDEX": "MARKETPLACE_DEMAND-",
      "SUFFIXFORMAT": "YYYYMM.WW",
      "ROUTING": "PRODUCT_ID"
    }
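A routing entry like the one on this slide can drive both cluster selection and the choice of time-ranged indices. The sketch below is one possible reading. The slides do not define `YYYYMM.WW`, so treating `WW` as the week within the month is an assumption, as are the `resolve` helper and the tier argument.

```python
from datetime import date, timedelta

ROUTES = {
    "DEMAND": {
        "CLUSTERS": {
            "TIER0": {"CLUSTERS": ["ES_CLUSTER_TIER0"]},
            "TIER2": {"CLUSTERS": ["ES_CLUSTER_TIER2"]},
        },
        "INDEX": "MARKETPLACE_DEMAND-",
        "SUFFIXFORMAT": "YYYYMM.WW",
        "ROUTING": "PRODUCT_ID",
    }
}


def index_suffix(day):
    """Format a date as YYYYMM.WW. ASSUMPTION: WW is the week within the month."""
    week_of_month = (day.day - 1) // 7 + 1
    return f"{day.year:04d}{day.month:02d}.{week_of_month:02d}"


def resolve(table, tier, start, end):
    """Return the clusters to query and the indices covering [start, end]."""
    route = ROUTES[table]
    clusters = route["CLUSTERS"][tier]["CLUSTERS"]
    indices = []
    day = start
    while day <= end:
        name = route["INDEX"] + index_suffix(day)
        if name not in indices:
            indices.append(name)
        day += timedelta(days=1)
    return clusters, indices


print(resolve("DEMAND", "TIER0", date(2018, 2, 6), date(2018, 2, 9)))
```

Keeping this mapping as data rather than code means a table can move between clusters by changing configuration, without touching callers.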
- 49. Summary of First Iteration
- 50. Evolution: Success Breeds Failures
- 51. Unexpected Surges
- 52. Applications Went Haywire
- 53. Solution: Distributed Rate Limiting
- 54. Solution: Distributed Rate Limiting - Per-Cluster Rate Limit
- 55. Solution: Distributed Rate Limiting - Per-Instance Rate Limit
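The slides name per-cluster and per-instance limits but not the algorithm behind them. As an illustration only, the sketch below uses a token bucket in each query-layer instance and derives its rate by dividing a cluster-wide budget evenly across instances. Both the token bucket and the static split are assumptions.

```python
import time


class TokenBucket:
    """Token bucket limiter; `now` is injectable so the logic can be tested."""

    def __init__(self, rate_per_sec, capacity, now=time.monotonic):
        self.rate = rate_per_sec
        self.capacity = capacity
        self.tokens = capacity
        self.now = now
        self.updated = now()

    def allow(self, cost=1.0):
        current = self.now()
        # Refill in proportion to elapsed time, never above capacity.
        self.tokens = min(self.capacity, self.tokens + (current - self.updated) * self.rate)
        self.updated = current
        if self.tokens >= cost:
            self.tokens -= cost
            return True
        return False


def per_instance_rate(cluster_rate, instance_count):
    """Split a cluster-wide budget evenly across query-layer instances."""
    return cluster_rate / instance_count


# Hypothetical numbers: 100 queries/second for the cluster, 50 instances.
limiter = TokenBucket(per_instance_rate(100, 50), capacity=2)
print(limiter.allow())
```

A static split is simple but wastes budget when traffic is uneven across instances. A shared counter or periodic rebalancing would address that at the cost of extra coordination.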
- 56. Workload Evolved - Users query months of data for modeling and complex analytics - Key insight: Data can be a little stale for long-range queries - Solution: Caching layer and delayed execution
- 57. Time Series Cache
- 58. Time Series Cache - Redis as the cache store - Cache key is based on normalized query content and time range
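A cache key "based on normalized query content and time range" could be built as sketched below. The query is serialized with sorted keys so that equivalent queries hash identically, and the requested range is split into fixed buckets so that overlapping ranges reuse entries. A plain dict stands in for Redis, and the hourly bucket size, key format, and function names are all assumptions.

```python
import hashlib
import json
from datetime import datetime, timedelta, timezone


def normalize(query):
    """Serialize with sorted keys so equivalent queries produce identical text."""
    return json.dumps(query, sort_keys=True, separators=(",", ":"))


def cache_key(query, bucket_start, bucket_end):
    digest = hashlib.sha256(normalize(query).encode("utf-8")).hexdigest()[:16]
    return f"tsc:{digest}:{int(bucket_start.timestamp())}:{int(bucket_end.timestamp())}"


def split_hours(start, end):
    """Split [start, end) into one-hour buckets; assumes both ends are on the hour."""
    buckets = []
    cursor = start
    while cursor < end:
        buckets.append((cursor, cursor + timedelta(hours=1)))
        cursor += timedelta(hours=1)
    return buckets


def fetch(query, start, end, cache, run_query):
    """Serve each bucket from the cache, querying the backend only on a miss."""
    results = []
    for bucket_start, bucket_end in split_hours(start, end):
        key = cache_key(query, bucket_start, bucket_end)
        if key not in cache:
            cache[key] = run_query(query, bucket_start, bucket_end)
        results.append(cache[key])
    return results


t0 = datetime(2018, 2, 3, tzinfo=timezone.utc)
print(cache_key({"table": "demand", "city": 1}, t0, t0 + timedelta(hours=1)))
```

With per-bucket keys, a sliding window such as "the last 24 hours" only needs to query the newest bucket on each refresh.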
- 64. Delayed Execution - Allow registering long-running queries - Provide cached but stale data for such queries - Dedicated cluster and queued executions - Rationale: three months of data vs a few hours of staleness - Example: [-30d, 0d] —> [-30d, -1d]
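The range rewrite in the example, [-30d, 0d] to [-30d, -1d], can be sketched as a small function that trims the freshest part of a long-range query so it can be answered from cached results. The one-day staleness matches the slide's example. The seven-day threshold for what counts as "long-range" is an assumption.

```python
from datetime import datetime, timedelta

STALENESS = timedelta(days=1)   # from the slide's example: end moves from 0d to -1d
LONG_RANGE = timedelta(days=7)  # hypothetical threshold for "long-range" queries


def rewrite_range(start, end, now):
    """Trim the freshest part of a long-range query; leave short queries untouched."""
    is_long = end - start >= LONG_RANGE
    touches_fresh_data = end > now - STALENESS
    if is_long and touches_fresh_data:
        return start, now - STALENESS
    return start, end


now = datetime(2018, 3, 31)
print(rewrite_range(now - timedelta(days=30), now, now))
```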
- 65. Scale Operations
- 66. Driving Principles - Make the system transparent - Optimize for MTTR (mean time to recover) - Strive for consistency - Automation is the most effective way to get consistency
- 67. Challenge: Diagnosis - Cluster slowed down with all metrics being normal - Requires additional instrumentation - ES Plugin as a solution
- 68. Challenge: Cluster Size Becomes an Enemy - Elasticsearch cluster becomes harder to operate as its size increases - MTTR increases as cluster size increases - Multi-tenancy becomes a huge issue - Can’t have too many shards
- 69. Federation - 3 clusters —> many smaller clusters - Dynamic routing - Meta-data driven
- 70. Federation
- 76. How Can We Trust the Data?
- 77. Self-Serving Trust System
- 81. Too Much Manual Maintenance Work
- 82. Too Much Manual Maintenance Work - Adjusting queue sizes - Restarting machines - Relocating shards
- 83. Auto Ops
- 85. Ongoing Work for the Future
- 86. Future Work - Strong reliability - Strong consistency among replicas - Multi-tenancy
- 87. Summary - Three dimensions of scaling: ingestion, query, and operations - Be simple and practical: successful systems emerge from simple ones - Abstraction and data modeling matter - Invest in thorough instrumentation - Invest in automation and tools