apache-druid

2 posts

toss

Customers Never Wait: How to

Toss Payments addressed the challenge of serving rapidly growing transaction data within a microservices architecture (MSA) by evolving their data platform from simple Elasticsearch indexing to a robust CQRS pattern. While Apache Druid initially provided high-performance time-series aggregation and significant cost savings, the team eventually integrated StarRocks to overcome limitations in data consistency and complex join operations. This architectural journey highlights the necessity of balancing real-time query performance with operational scalability and domain decoupling.

### Transitioning to MSA and Early Search Solutions

* The shift from a monolithic structure to MSA decoupled application logic but created "data silos" where joining ledgers across domains became difficult.
* The initial solution used Elasticsearch to index specific fields for merchant transaction lookups and basic refunds.
* As transaction volumes doubled between 2022 and 2024, the need for complex OLAP-style aggregations led to the adoption of a CQRS (Command Query Responsibility Segregation) architecture.

### Adopting Apache Druid for Time-Series Data

* Druid was selected for its optimization toward time-series data, offering low-latency aggregation over massive datasets.
* It offered a low learning curve through Druid SQL support, plus automatic bitmap indexing for all columns, including nested JSON keys.
* The system decoupled reads from writes, allowing the data team to serve billions of records without consuming the primary transaction databases' resources.

### Data Ingestion: Message Publishing over CDC

* The team chose a message-publishing approach via Kafka rather than Change Data Capture (CDC) to minimize domain dependency.
* In this model, domain teams publish finalized data packets, sparing the data team from maintaining complex internal business logic for over 20 different payment methods (a minimal sketch of this flow follows this list).
* This strategy simplified system dependencies and leveraged Druid's ability to automatically index incoming JSON fields.
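The post does not include code for this publishing flow, so the snippet below is only an illustrative sketch, assuming Python with the `confluent-kafka` client; the topic name (`payment.completed.v1`) and event fields are hypothetical. The point it shows is the domain team emitting a finalized, self-describing event that the data platform can ingest into Druid directly, instead of exposing its internal tables through CDC.

```python
# Illustrative sketch of "message publishing over CDC" (not from the post).
# Assumes the confluent-kafka client; topic and field names are hypothetical.
import json
from confluent_kafka import Producer

producer = Producer({"bootstrap.servers": "kafka:9092"})

def publish_payment_completed(payment: dict) -> None:
    """Publish a finalized payment event; the data platform ingests the topic
    into Druid and auto-indexes the JSON fields, so it never needs to know the
    domain's internal schema or business logic."""
    event = {
        "event_type": "PAYMENT_COMPLETED",
        "payment_key": payment["payment_key"],    # business key, useful for dedup downstream
        "merchant_id": payment["merchant_id"],
        "method": payment["method"],              # one of the 20+ payment methods
        "amount": payment["amount"],
        "completed_at": payment["completed_at"],  # event time, mapped to Druid's __time
    }
    producer.produce(
        topic="payment.completed.v1",
        key=str(event["payment_key"]),            # keyed for per-payment ordering
        value=json.dumps(event),
    )

producer.flush()
```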
### Infrastructure and Cost Optimization in AWS

* The architecture separates compute from storage, using AWS S3 as deep storage to keep costs low.
* Performance was optimized by using instances with high-performance local storage instead of network-attached EBS, resulting in up to 9x faster I/O.
* Spot Instances were used for development and testing environments, contributing to a monthly cloud cost reduction of approximately 50 million KRW.

### Operational Challenges and Druid's Limitations

* **Idempotency and Consistency:** Druid offers little native support for idempotent writes, requiring complex "Merge on Read" logic to handle duplicate messages and state changes.
* **Data Fragmentation:** Transaction cancellations often targeted old partitions, causing fragmentation; the team implemented a 60-second detection process to trigger automatic compaction.
* **Join Constraints:** While Druid supports joins, its capabilities are limited, making it difficult to link complex lifecycles across the payment, purchase, and settlement domains.

### Hybrid Search and Rollup Performance

* To ensure high-speed lookups across 10 billion records, a hybrid architecture was built: Elasticsearch handles keyword searches to retrieve matching IDs, which are then used to fetch full details from Druid (a minimal sketch of this two-step lookup appears at the end of this summary).
* Druid's "Rollup" feature was used to pre-aggregate data at ingestion time.
* Implementing Rollup reduced average query response times from tens of seconds to under 1 second, a 99% performance improvement for aggregate views.

### Moving Toward StarRocks

* To address Druid's limitations around idempotency and multi-table joins, Toss Payments began transitioning to StarRocks.
* StarRocks provides a more stable environment for managing inconsistent events and simplifies the data flow by aligning with existing analytical infrastructure.
* This shift supports the need for a "Unified Ledger" that can track the entire lifecycle of a transaction, from payment to net profit, across disparate database sources.
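As a rough illustration of the hybrid lookup described above, the sketch below pairs an Elasticsearch keyword search with a Druid SQL query over HTTP. It is not taken from the post: the index name, datasource name, hosts, and field names are all assumptions; only the general pattern (Elasticsearch returns IDs, Druid returns the full rows via its `/druid/v2/sql` endpoint) mirrors the summary.

```python
# Illustrative sketch of the hybrid lookup: Elasticsearch resolves keywords to IDs,
# then Druid returns the full transaction rows. All names here are hypothetical.
import requests
from elasticsearch import Elasticsearch

es = Elasticsearch("http://elasticsearch:9200")
DRUID_SQL_URL = "http://druid-broker:8082/druid/v2/sql"

def search_transactions(merchant_name: str, limit: int = 100) -> list[dict]:
    # Step 1: keyword search in Elasticsearch, returning only transaction IDs.
    hits = es.search(
        index="transactions",
        query={"match": {"merchant_name": merchant_name}},
        size=limit,
    )["hits"]["hits"]
    ids = [h["_source"]["transaction_id"] for h in hits]
    if not ids:
        return []

    # Step 2: fetch full details for those IDs from Druid via its SQL API,
    # using dynamic parameters instead of string interpolation.
    placeholders = ", ".join(["?"] * len(ids))
    response = requests.post(
        DRUID_SQL_URL,
        json={
            "query": f"SELECT * FROM transactions WHERE transaction_id IN ({placeholders})",
            "parameters": [{"type": "VARCHAR", "value": i} for i in ids],
            "resultFormat": "object",
        },
        timeout=10,
    )
    response.raise_for_status()
    return response.json()
```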

netflix

Scaling Muse: How Netflix Powers Data-Driven Creative Insights at Trillion-Row Scale | Netflix TechBlog

Netflix's Muse platform has evolved from a simple dashboard into a high-scale Online Analytical Processing (OLAP) system that processes trillions of rows to provide creative insights for promotional media. To meet growing demands for complex audience-affinity analysis and advanced filtering, the engineering team modernized the data serving layer by moving beyond basic batch pipelines. By integrating HyperLogLog sketches for approximate counting and leveraging in-memory precomputed aggregates, the system now delivers low-latency performance and high data accuracy at immense scale.

### Approximate Counting with HyperLogLog (HLL) Sketches

To track metrics like unique impressions and qualified plays without the massive overhead of comparing billions of profile IDs, Muse uses the Apache DataSketches library.

* The system trades a small margin of error (approximately 0.8% with a logK of 17) for significant gains in processing speed and memory efficiency.
* Sketches are built during Druid ingestion using the HLLSketchBuild aggregator with rollup enabled to reduce data volume.
* In the Spark ETL process, all-time aggregates are maintained by merging new daily HLL sketches into existing ones using the `hll_union` function (a PySpark sketch of this merge step appears at the end of this summary).

### Utilizing Hollow for In-Memory Aggregates

To reduce the query load on the Druid cluster, Netflix uses Hollow, an internal open-source tool designed for high-density, near-cache datasets.

* Muse stores precomputed, all-time aggregates, such as lifetime impressions per asset, within Hollow's in-memory data structures.
* When a user requests "all-time" data, the application retrieves results from the Hollow cache instead of forcing Druid to scan months or years of historical segments.
* This approach significantly lowers latency for the most common queries and frees up Druid resources for more complex, dynamic filtering tasks.

### Optimizing the Druid Data Layer

Efficient data retrieval from Druid is critical for supporting the application's advanced grouping and filtering capabilities.

* The team transitioned from hash-based partitioning to range-based partitioning on frequently filtered dimensions like `video_id` to improve data locality and segment pruning.
* Background compaction tasks merge small segments into larger ones, reducing metadata overhead and improving scan speeds across the cluster.
* The Druid brokers and historical nodes were tuned, including adjustments to processing threads and buffer sizes, to handle the high-concurrency demands of the Muse UI.

### Validation and Data Accuracy

Because the move to HLL sketches introduces approximation, the team implemented rigorous validation processes to ensure the data remained actionable.

* Internal debugging tools were developed to compare results from the new architecture against the "ground truth" provided by legacy batch systems.
* Continuous monitoring ensures that HLL error rates remain within the expected 1–2% range and that data remains consistent across different time grains.

For organizations building large-scale OLAP applications, the Muse architecture demonstrates that performance bottlenecks can often be solved by combining approximate data structures with specialized in-memory caches to offload heavy computations from the primary database.
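The post names the `hll_union` function but does not show the ETL code, so the fragment below is a speculative PySpark sketch of the incremental merge. It assumes Spark 3.5+'s built-in, DataSketches-based HLL functions (`hll_sketch_agg`, `hll_union`, `hll_sketch_estimate`); the table and column names are invented, and Netflix's actual job may rely on custom DataSketches UDFs instead.

```python
# Speculative sketch of merging daily HLL sketches into all-time aggregates.
# Assumes Spark 3.5+ built-in HLL functions; table and column names are made up.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("muse-alltime-hll-merge").getOrCreate()

# Daily sketches: one HLL sketch of unique profile IDs per asset, built with
# lgConfigK=17, the logK the post cites for its ~0.8% error bound.
daily = (
    spark.table("muse.daily_impressions")
    .where(F.col("ds") == "2024-06-01")
    .groupBy("asset_id")
    .agg(F.hll_sketch_agg("profile_id", 17).alias("daily_sketch"))
)

# Existing all-time aggregates: one sketch per asset.
# For brevity this sketch assumes every asset already has an all-time row; a real
# job would also insert rows for assets seen for the first time.
alltime = spark.table("muse.alltime_impressions")  # columns: asset_id, alltime_sketch

# Merge the new daily sketch into the running all-time sketch with hll_union,
# so the lifetime aggregate is updated without rescanning history.
merged = alltime.join(daily, "asset_id").select(
    "asset_id",
    F.hll_union("alltime_sketch", "daily_sketch").alias("alltime_sketch"),
)

# Estimated lifetime unique impressions per asset (approximate distinct count).
merged.select(
    "asset_id",
    F.hll_sketch_estimate("alltime_sketch").alias("unique_impressions"),
).show()
```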