Scaling Uber’s Elasticsearch as a Geo-Temporal Database

2020-02-23

  • 1. Scaling Uber’s Elasticsearch as a Geo-Temporal Database Danny Yuan @ Uber
  • 2. Use Cases for a Geo-Temporal Database
  • 3. Real-time Decisions on Global Scale
  • 4–5. Dynamic Pricing: Every Hexagon, Every Minute
  • 6–7. Metrics: how many UberXs were on a trip in the past 10 minutes
  • 8. Market Analysis: Travel Times
  • 9. Forecasting: Granular Forecasting of Rider Demand
  • 10. How Can We Produce Geo-Temporal Data for Ever-Changing Business Needs?
  • 11. Key Question: What Is the Right Abstraction?
  • 12. Abstraction: Single-Table OLAP on Geo-Temporal Data
  • 13–14. Abstraction: Single-Table OLAP on Geo-Temporal Data (SELECT … FROM … WHERE … GROUP BY … HAVING … ORDER BY … LIMIT …)
  • 15. Why Elasticsearch? - Arbitrary boolean queries - Sub-second response time - Built-in distributed aggregation functions - High-cardinality queries - Idempotent insertion to deduplicate data - Second-level data freshness - Scales with data volume - Operable by a small team
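The deduplication point deserves a concrete illustration. A minimal sketch of idempotent insertion, assuming elasticsearch-py 8.x and an illustrative index/field layout (not Uber’s actual schema): deriving the document ID deterministically from the event means at-least-once redelivery from the pipeline collapses into exactly-once storage.

```python
# Minimal sketch: idempotent insertion via deterministic document IDs.
# elasticsearch-py 8.x; index and field names are illustrative, not Uber's.
import hashlib
from elasticsearch import Elasticsearch

es = Elasticsearch("http://localhost:9200")

def ingest(event: dict) -> None:
    # Same event -> same ID, so retried deliveries overwrite the same
    # document instead of creating duplicates.
    doc_id = hashlib.sha1(
        f"{event['trip_id']}:{event['state']}:{event['ts']}".encode()
    ).hexdigest()
    es.index(index="marketplace_demand-201802.05", id=doc_id, document=event)
```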
  • 16. Current Scale: An Important Context - Ingestion: 850K to 1.3M messages/second - Ingestion volume: 12 TB/day - Doc scans: 100M to 4B docs/second - Data size: 1 PB - Cluster size: 700 Elasticsearch machines - Ingestion pipeline: 100+ data pipeline jobs
  • 17. Our Story of Scaling Elasticsearch
  • 18. Three Dimensions of Scale: Ingestion, Query, Operation
  • 19. Driving Principles - Optimize for fast iteration - Optimize for simple operations - Optimize for automation and tools - Optimize for being reasonably fast
  • 20. The Past: We Started Small
  • 21. Constraints for Being Small - Three-person team - Two data centers - Small set of requirements: common analytics for machines
  • 22. First Order of Business: Take Care of the Basics
  • 23. Get Single-Node Right: Follow the 20-80 Rule - One table ↔ multiple indices by time range - Disable the _source field - Disable the _all field - Use doc_values for storage - Disable analyzed fields - Tune JVM parameters
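To make the checklist concrete, here is a sketch of those mapping tweaks in the Elasticsearch 2.x-era syntax the talk dates from. The index, type, and field names are illustrative, and the numeric settings stand in for values derived from the stress testing described on the next slide.

```python
# Sketch of the single-node tweaks in Elasticsearch 2.x-era mapping syntax.
# Index/type/field names and numbers are illustrative.
index_body = {
    "settings": {
        "index": {
            "refresh_interval": "30s",   # refresh rate chosen by measurement
            "number_of_shards": 8,       # shard count chosen by measurement
        }
    },
    "mappings": {
        "demand_event": {
            "_source": {"enabled": False},   # disable _source field
            "_all": {"enabled": False},      # disable _all field
            "properties": {
                "hexagon": {                 # not analyzed, doc_values-backed
                    "type": "string",
                    "index": "not_analyzed",
                    "doc_values": True,
                },
                "ts": {"type": "date", "doc_values": True},
            },
        }
    },
}
# es.indices.create(index="marketplace_demand-201802.05", body=index_body)
```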
  • 24. Make Decisions with Numbers - What’s the maximum number of recovery threads? - What’s the maximum size of the request queue? - What should the refresh rate be? - How many shards should an index have? - What’s the throttling threshold? - Solution: set up an end-to-end stress-testing framework
  • 25. Deployment in Two Data Centers - Each data center has an exclusive set of cities - Should tolerate failure of a single data center - Ingestion should continue to work - Querying any city should return correct results
  • 26–28. Deployment in Two Data Centers: trade space for availability
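One way to read “trade space for availability”: each data center stores every city’s data even though it serves only its own exclusive set, so either can take over the other’s cities. A minimal dual-write sketch, assuming kafka-python with hypothetical broker addresses and topic name:

```python
# Sketch: trade space for availability by dual-writing every event to both
# data centers, so either DC can serve any city after a failover.
# kafka-python; broker addresses and the topic name are hypothetical.
from kafka import KafkaProducer

producers = [
    KafkaProducer(bootstrap_servers="kafka-dc1.example.com:9092"),
    KafkaProducer(bootstrap_servers="kafka-dc2.example.com:9092"),
]

def publish(event: bytes) -> None:
    # Double the storage, but queries stay correct if one DC is lost.
    for producer in producers:
        producer.send("marketplace_demand", event)
```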
  • 29. Discretize Geo Locations: H3
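H3 discretizes the globe into hexagonal cells at fixed resolutions, so a raw (lat, lng) becomes a string cell ID that can be indexed, grouped, and counted like any other field. A quick sketch with the h3-py bindings (v4 API):

```python
# Sketch: discretize a pickup location into an H3 hexagon,
# using the h3-py bindings (v4 API).
import h3

lat, lng = 37.7749, -122.4194               # San Francisco
cell = h3.latlng_to_cell(lat, lng, 8)       # resolution-8 hexagon ID (a string)
ring = h3.grid_disk(cell, 1)                # the cell plus its six neighbors
print(cell, len(ring))                      # e.g. 8828308281fffff 7
```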
  • 30. Optimizations to Ingestion
  • 31. Optimizations to Ingestion
  • 32. Dealing with Large Volumes of Data - An event source produces more than 3 TB every day - Key insight: humans do not need overly granular data - Key insight: stream data usually has lots of redundancy
  • 33. Dealing with Large Volumes of Data - Prune unnecessary fields - Devise algorithms to remove redundancy (sketched below) - 3 TB → 42 GB, a more than 70x reduction! - Bulk writes
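A sketch of the redundancy-removal idea, under the assumption (not spelled out in the deck) that supply-style streams repeat each driver’s state every few seconds: keep only the queried fields and drop events that do not change a driver’s (hexagon, state) pair.

```python
# Sketch of redundancy removal: prune unindexed fields and drop events
# that don't change a driver's (hexagon, state) pair. Illustrative only.
last_seen: dict[str, tuple[str, str]] = {}
KEEP_FIELDS = ("driver_id", "hexagon", "state", "ts")   # prune the rest

def compact(event: dict) -> dict | None:
    key = (event["hexagon"], event["state"])
    if last_seen.get(event["driver_id"]) == key:
        return None                          # redundant: nothing changed
    last_seen[event["driver_id"]] = key
    return {f: event[f] for f in KEEP_FIELDS}
```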
  • 34. Data Modeling Matters
  • 35. Example: Efficient and Reliable Join - Calculate the Completed/Requested ratio from two different event streams
  • 36. Example: Efficient and Reliable Join: Use Elasticsearch - Calculate the Completed/Requested ratio from two Kafka topics - Can we use a streaming join? - Can we join on the query side? - Solution: rendezvous at Elasticsearch on trip ID

        TripID | Pickup Time  | Completed
        1      | 2018-02-03T… | TRUE
        2      | 2018-02-03T… | FALSE
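A minimal sketch of the rendezvous: consumers of the two topics upsert into the same document keyed by trip ID, so the Completed/Requested ratio reduces to a single-index aggregation. Assumes elasticsearch-py 8.x; index and field names are illustrative.

```python
# Sketch of the rendezvous join: both Kafka consumers upsert the same
# document, keyed by trip ID. elasticsearch-py 8.x; names are illustrative.
from elasticsearch import Elasticsearch

es = Elasticsearch("http://localhost:9200")

def on_requested(event: dict) -> None:       # consumer of the "requested" topic
    es.update(index="trips", id=event["trip_id"],
              doc={"pickup_time": event["ts"], "completed": False},
              doc_as_upsert=True)

def on_completed(event: dict) -> None:       # consumer of the "completed" topic
    es.update(index="trips", id=event["trip_id"],
              doc={"completed": True}, doc_as_upsert=True)

# The Completed/Requested ratio is then one filtered aggregation over "trips".
```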
  • 37. Example: aggregation on state transitions
  • 38. Optimize Querying Elasticsearch
  • 39. Hide Query Optimization from Users - Do we really expect every user to write Elasticsearch queries? - What if someone issues a very expensive query? - Solution: Isolation with a query layer
  • 40–41. Query Layer with Multiple Clusters
  • 42. Query Layer with Multiple Clusters - Generate efficient Elasticsearch queries - Reject expensive queries - Route queries (hardcoded at first)
  • 43. Efficient Query Generation - “GROUP BY a, b”
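The deck does not show the generated query, but one plausible compilation of “GROUP BY a, b” is nested terms aggregations with hits suppressed. A sketch (bucket sizes are illustrative):

```python
# Sketch: compile "GROUP BY a, b" into nested terms aggregations with hits
# suppressed; bucket sizes are illustrative.
def group_by(fields: list[str], size: int = 1000) -> dict:
    aggs: dict = {}
    for field in reversed(fields):           # build innermost bucket first
        terms = {"terms": {"field": field, "size": size}}
        if aggs:
            terms["aggs"] = aggs
        aggs = {field: terms}
    return {"size": 0, "aggs": aggs}         # size 0: buckets only, no hits

# group_by(["a", "b"]) -> a terms agg on "a" wrapping a terms agg on "b".
```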
  • 44. Rejecting Expensive Queries - 10,000 hexagons/city × 1,440 minutes/day × 800 cities - Cardinality: 11.5 billion (!) buckets → OutOfMemoryError
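The arithmetic is the guard: 10,000 × 1,440 × 800 = 11.52 billion buckets, far beyond what a coordinating node can hold. A sketch of a pre-execution check in the query layer (the threshold is illustrative):

```python
# Sketch of a pre-execution guard in the query layer: estimate the bucket
# count and reject queries that would exhaust coordinator memory.
MAX_BUCKETS = 1_000_000                      # illustrative threshold

def estimate_buckets(cardinalities: list[int]) -> int:
    total = 1
    for c in cardinalities:
        total *= c
    return total

def admit(cardinalities: list[int]) -> bool:
    return estimate_buckets(cardinalities) <= MAX_BUCKETS

# The slide's example: 10,000 hexagons x 1,440 minutes x 800 cities.
assert estimate_buckets([10_000, 1_440, 800]) == 11_520_000_000
assert not admit([10_000, 1_440, 800])       # rejected, not OOM-ed
```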
  • 45–48. Routing Queries

        "DEMAND": {
          "CLUSTERS": {
            "TIER0": { "CLUSTERS": ["ES_CLUSTER_TIER0"] },
            "TIER2": { "CLUSTERS": ["ES_CLUSTER_TIER2"] }
          },
          "INDEX": "MARKETPLACE_DEMAND-",
          "SUFFIXFORMAT": "YYYYMM.WW",
          "ROUTING": "PRODUCT_ID"
        }
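A sketch of how a query layer might consume such metadata: pick a cluster by tier, then expand the query’s time into a suffixed index name. The helper and the lower-cased config are hypothetical; “YYYYMM.WW” is read here as year-month plus ISO week.

```python
# Sketch of metadata-driven routing: pick a cluster by tier, then compute a
# time-bucketed index name. Helper and lower-cased config are hypothetical.
from datetime import date

ROUTING = {
    "demand": {
        "clusters": {"tier0": "es_cluster_tier0", "tier2": "es_cluster_tier2"},
        "index": "marketplace_demand-",
    }
}

def route(table: str, tier: str, day: date) -> tuple[str, str]:
    cfg = ROUTING[table]
    suffix = f"{day:%Y%m}.{day.isocalendar()[1]:02d}"    # e.g. 201802.05
    return cfg["clusters"][tier], cfg["index"] + suffix

# route("demand", "tier0", date(2018, 2, 3))
#   -> ("es_cluster_tier0", "marketplace_demand-201802.05")
```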
  • 49. Summary of First Iteration
  • 50. Evolution: Success Breeds Failures
  • 51. Unexpected Surges
  • 52. Applications Went Haywire
  • 53. Solution: Distributed Rate Limiting
  • 54. Solution: Distributed Rate Limiting - Per-Cluster Rate Limit
  • 55. Solution: Distributed Rate Limiting - Per-Instance Rate Limit
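A minimal sketch of the two limits, assuming redis-py and illustrative thresholds: the per-instance cap needs no coordination, while the per-cluster cap is a shared one-second counter in Redis.

```python
# Sketch of the two limits: a local per-instance cap (no coordination) and a
# shared per-cluster counter in Redis. redis-py; thresholds are illustrative.
import time
import redis

r = redis.Redis(host="localhost", port=6379)
CLUSTER_LIMIT_PER_SEC = 500
INSTANCE_LIMIT_PER_SEC = 50

_window, _count = 0, 0                       # this instance's current second

def admit(cluster: str) -> bool:
    global _window, _count
    now = int(time.time())
    if now != _window:
        _window, _count = now, 0
    _count += 1
    if _count > INSTANCE_LIMIT_PER_SEC:      # per-instance rate limit
        return False
    key = f"ratelimit:{cluster}:{now}"       # per-cluster rate limit
    n = r.incr(key)
    if n == 1:
        r.expire(key, 2)                     # let old windows age out
    return n <= CLUSTER_LIMIT_PER_SEC
```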
  • 56. Workload Evolved - Users query months of data for modeling and complex analytics - Key insight: Data can be a little stale for long-range queries - Solution: Caching layer and delayed execution
  • 57–63. Time Series Cache - Redis as the cache store - Cache key is based on normalized query content and time range
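A sketch of the cache-key idea: hash the normalized query with the time range stripped out, then cache one entry per time bucket so overlapping ranges share entries. Assumes redis-py; the key layout and hourly granularity are illustrative.

```python
# Sketch of the cache key scheme: hash the normalized query without its time
# range and cache one entry per time bucket. redis-py; layout illustrative.
import hashlib
import json
import redis

r = redis.Redis()

def bucket_keys(query: dict, start_hour: int, end_hour: int) -> list[str]:
    normalized = json.dumps(
        {k: v for k, v in query.items() if k != "time_range"},
        sort_keys=True)                      # field order must not matter
    qhash = hashlib.sha1(normalized.encode()).hexdigest()
    return [f"tscache:{qhash}:{h}" for h in range(start_hour, end_hour)]

def fetch(query: dict, start_hour: int, end_hour: int) -> list:
    cached = r.mget(bucket_keys(query, start_hour, end_hour))
    # Buckets returned as None are misses: fetch those from Elasticsearch
    # and write them back (elided here).
    return cached
```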
  • 64. Delayed Execution - Allow registering long-running queries - Provide cached but stale data for such queries - Dedicated cluster and queued executions - Rationale: three months of data vs. a few hours of staleness - Example: [-30d, 0d] → [-30d, -1d]
  • 65. Scale Operations
  • 66. Driving Principles - Make the system transparent - Optimize for MTTR (mean time to recover) - Strive for consistency - Automation is the most effective way to get consistency
  • 67. Challenge: Diagnosis - Cluster slowed down while all metrics looked normal - Required additional instrumentation - ES plugin as the solution
  • 68. Challenge: Cluster Size Becomes an Enemy - Elasticsearch cluster becomes harder to operate as its size increases - MTTR increases as cluster size increases - Multi-tenancy becomes a huge issue - Can’t have too many shards
  • 69. Federation - 3 clusters → many smaller clusters - Dynamic routing - Metadata-driven
  • 70–75. Federation
  • 76. How Can We Trust the Data?
  • 77–80. Self-Serving Trust System
  • 81–82. Too Much Manual Maintenance Work - Adjusting queue sizes - Restarting machines - Relocating shards
  • 83–84. Auto Ops
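The deck gives no implementation details for Auto Ops, but one of the listed chores, relocating shards, maps directly onto Elasticsearch’s cluster reroute API. A sketch, assuming elasticsearch-py 8.x and hypothetical node names:

```python
# Sketch of one automatable chore: relocating a shard off a hot node via the
# cluster reroute API. elasticsearch-py 8.x; node names are hypothetical.
from elasticsearch import Elasticsearch

es = Elasticsearch("http://localhost:9200")

def relocate(index: str, shard: int, src: str, dst: str) -> None:
    es.cluster.reroute(commands=[{
        "move": {"index": index, "shard": shard,
                 "from_node": src, "to_node": dst}
    }])

# relocate("marketplace_demand-201802.05", 3, "es-node-17", "es-node-42")
```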
  • 85. Ongoing Work for the Future
  • 86. Future Work - Strong reliability - Strong consistency among replicas - Multi-tenancy
  • 87. Summary - Three dimensions of scaling: ingestion, query, and operations - Be simple and practical: successful systems emerge from simple ones - Abstraction and data modeling matter - Invest in thorough instrumentation - Invest in automation and tools