Vivek Balakrishnan, Senior Data Engineer
Juho Autio, Principal Data Engineer
Many dashboard queries at Rovio are served by Apache Druid™ today. To get the data from our Data Lake to Druid easily and efficiently, we built the rovio-ingest library for Apache Spark™. This library is the first project that Rovio has published as open source. Our data stack relies heavily on open source components, so we also want to give back to the community.
The diagram below shows at a high level how Druid and rovio-ingest are used at Rovio.
Background: Low-latency dashboards with Druid
Data is at the heart of everything we do at Rovio. It enables us to continually improve our games and provide incredible experiences for the millions of players who play our games every day. A key component in that effort is our in-house game services cloud platform called Beacon, which is used by all of our games. Beacon provides user identity, analytics, advertising, segmentation, live ops, payments and integration with third-party services. Analytics dashboards are an important part of Beacon. They include standard reporting as well as custom dashboards that any Rovian can create and publish.
We needed a low-latency backend for pivot & time-series dashboards. Druid works well for those use cases. The challenge is in making batch ingestion scale.
Beacon Dashboards use various data sources. Our most used dashboards typically rely on Druid, as it offers sub-second latency even for rather big tables. Before Druid, we were using Postgres with custom indexes and caching, plus Redshift (also with caching), but the latency was not as low as we’d like, and it was not easy for different teams to add new datasets to those databases.
Druid as a backend for Pivot dashboards
Back at the end of 2018, we wanted to add support for pivot dashboards in Beacon. We expected to have at least tens of concurrent users. We wanted to offer an interactive response time, so that, for example, drilling down on pivot dashboards would be possible without long wait times after every click. We soon realized that none of our existing query engines was able to handle that requirement. We found Druid to work very well for this purpose: its indexing strategies fit the pivot query pattern and offer sub-second latency.
Druid as a backend for time-series batch data
In addition to pivoting, we soon realized that Druid is also a performant choice for aggregating time-series data in general, as long as the queries don’t require joins. In fact, that is clearly the more common use case for Druid out there. At the same time, we were in the process of replacing our standard reporting dashboards, and it became a no-brainer to also use Druid to run the queries behind the new standard charts that we were adding. We were moving our batch pipelines from Redshift to Spark to be able to scale compute resources on demand. Druid also solved the need for a performant query engine to replace Redshift for queries that span a longer history. As a result, we were able to simplify our platform and save costs, because a much smaller Druid server could serve the same queries that we had been running on a rather massive Redshift cluster.
Druid is often presented primarily as a tool for real-time analytics, but we are not using the streaming capabilities at the moment. It was not immediately obvious to us that it also makes sense to use Druid for very fast serving of non-real-time datasets that live primarily in a data lake and are products of batch processing. Using the existing batch ingestion mechanisms required some effort, but that was acceptable for a one-time test. We managed to load one of our biggest reporting tables to Druid for initial testing, and saw the potential. This is where the challenge presented itself: ingesting data in big batches to Druid was not as quick and easy as it needed to be to truly enable this for any Rovian who works with data.
The need to build a new library
Let’s have a look at the background for creating rovio-ingest.
Druid provides its own batch ingestion feature, but it requires manual configuration for each ingested dataset. It was also slow in our initial tests, although we didn’t put much effort into tuning the job parameters. We also didn’t want to use Druid’s native ingestion because we would have had to provision Druid services such as the indexer and middle manager. Instead, we looked into options where the heavy lifting for ingestion happens on isolated on-demand clusters.
We wanted to make it easy to ingest any dataset to Druid, so that any Rovian who builds data pipelines can do it as self-service. We also needed reasonable performance to have the data available early in the morning. Both goals were reached with the introduction of rovio-ingest.
The existing options for batch ingestion to Druid were difficult to deploy and scale, given our infrastructure. We tried Apache Hive™ for Druid ingestion, but it required manual patching and didn’t perform as well as desired. Making a library for Spark made it easy to isolate the plugin from peculiarities of the underlying infrastructure. We’ll go into more technical details in another article that we will publish later.
Getting hands-on with rovio-ingest
This section gives a tour of the main features of the library and demonstrates how to use it.
The rovio-ingest library makes it easy to accomplish this in a single Spark job:
- Read a Hive table with Spark
- Write the data to Druid
Naturally, the source data doesn’t have to be a Hive table; it can be any Spark DataFrame. In addition to being easy to use, rovio-ingest also aims to handle the transformation to Druid segment files as efficiently as possible. For the heavy lifting we rely on Spark.
The library handles two things:
- Writing the segment files to Druid deep storage
- Updating the segments in Druid metadata DB
The main steps for using rovio-ingest in a Spark job are:
- Use the provided extension to modify a Dataset, so that it satisfies the requirements of DruidSource
- Write the prepared Dataset with DruidSource format
Here’s a code example for using rovio-ingest with PySpark (Java & Scala are also supported):
The user only needs to specify the name of the target Druid datasource and the name of the column that should be mapped to Druid’s __time dimension. Settings for deep storage and metadata DB are usually the same within an organization, so those can be abstracted away in internal tooling, as we do.
Sensible defaults are used for the optional parameters.
Some of the optional parameters are:
- Segment granularity
- Query granularity
- Metrics spec (to define dimension vs. metric columns). By default metric columns are inferred from the data types.
The full set of supported write options can be found in the documentation.
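To illustrate what inferring metric columns from data types can look like, here is a minimal sketch in plain Python. The rule used here (numeric columns become metrics, everything else becomes a dimension), the type list and the function name `split_columns` are assumptions made for illustration; the library's actual inference rules are described in its documentation.

```python
from typing import Dict, List, Tuple

# Assumption: the Spark SQL type names treated as numeric in this sketch.
NUMERIC_TYPES = {"tinyint", "smallint", "int", "bigint", "float", "double"}


def split_columns(schema: Dict[str, str], time_column: str) -> Tuple[List[str], List[str]]:
    """Split column names into (dimensions, metrics) based on their type names."""
    dimensions: List[str] = []
    metrics: List[str] = []
    for name, type_name in schema.items():
        if name == time_column:
            # The time column is mapped to __time, so it is neither a dimension nor a metric.
            continue
        if type_name in NUMERIC_TYPES:
            metrics.append(name)
        else:
            dimensions.append(name)
    return dimensions, metrics


# Hypothetical table schema: column name -> type name.
schema = {
    "date": "date",
    "country": "string",
    "platform": "string",
    "revenue": "double",
    "sessions": "bigint",
}
dimensions, metrics = split_columns(schema, time_column="date")
print("dimensions:", dimensions)
print("metrics:", metrics)
```

An explicit metrics spec is the way to override this kind of default when, for example, a numeric identifier should be treated as a dimension.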
Tip: Internally we use a general-purpose PySpark script that enables users to ingest to Druid without writing any Spark code. In the basic case they only need to pass the source table name. Our script also offers some optional arguments for convenience, for example to customize the target types of columns. This kind of driver script is also a natural place for providing common options for Druid deep storage & DB.
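The idea of such a driver script can be sketched as follows. This is not our internal script: the argument names, the default time column and the option keys (prefixed with `example.`) are all hypothetical placeholders, not real rovio-ingest option names. The sketch only shows how organization-wide settings can be merged with a few per-job arguments.

```python
import argparse
from typing import Dict, List, Optional

# Hypothetical organization-wide settings. The keys and values are placeholders,
# not real rovio-ingest option names.
COMMON_OPTIONS = {
    "example.deep_storage.bucket": "my-druid-deep-storage",
    "example.metadata_db.uri": "jdbc:mysql://druid-metadata.example.internal/druid",
}


def build_write_options(argv: Optional[List[str]] = None) -> Dict[str, str]:
    """Parse command-line arguments and merge them with the common options."""
    parser = argparse.ArgumentParser(description="Ingest a table to Druid")
    parser.add_argument("source_table", help="e.g. my_database.my_table")
    parser.add_argument("--datasource", default=None, help="defaults to the table name")
    parser.add_argument("--time-column", default="date")
    args = parser.parse_args(argv)

    options = dict(COMMON_OPTIONS)  # copy, so the shared defaults stay untouched
    # Assumption: use the table name without the database prefix as the datasource name.
    options["example.datasource"] = args.datasource or args.source_table.split(".")[-1]
    options["example.time_column"] = args.time_column
    return options


# In the basic case the user passes only the source table name.
options = build_write_options(["analytics.daily_revenue"])
for key, value in sorted(options.items()):
    print(f"{key} = {value}")
```

The resulting dictionary would then be passed to the DataFrame writer by the script, so that users never need to know the deep storage or metadata DB settings.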
Incremental updates are made possible by overwriting segments. For example, we typically have daily batch jobs that update a date partition in a data lake table. In such a case we use segment granularity DAY, which allows inserting or overwriting just the daily segment(s) that changed. Internally, rovio-ingest determines the affected segments by checking what values of the time column are found in the input data.
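The way affected segments follow from the time column can be sketched in plain Python. This is an illustration of the idea for DAY granularity, not the library's implementation; the function name `affected_day_segments` and the sample timestamps are made up for the example.

```python
from datetime import date, datetime, timedelta
from typing import Iterable, List, Tuple


def affected_day_segments(timestamps: Iterable[datetime]) -> List[Tuple[date, date]]:
    """Return the sorted [start, end) day intervals that contain at least one input row."""
    days = {ts.date() for ts in timestamps}
    return [(day, day + timedelta(days=1)) for day in sorted(days)]


# Hypothetical values of the time column in one batch of input data.
rows = [
    datetime(2021, 4, 7, 23, 59),
    datetime(2021, 4, 8, 0, 0),
    datetime(2021, 4, 8, 12, 30),
]
segments = affected_day_segments(rows)
for start, end in segments:
    # Print each interval in start/end form.
    print(f"{start.isoformat()}/{end.isoformat()}")
```

Only the two daily intervals that appear in the input would be written; segments for all other days stay as they are.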
Appending to a segment is not supported, but for us it has been enough to be able to overwrite on segment level. Overwriting happens without downtime, as Druid uses the previous version of the segment to serve queries until the overwrite is complete. Overwriting full segments fits perfectly with the strategy of overwriting full partitions in the upstream data transformation jobs.
Druid has a kill task for cleaning unused old segment versions, but we don’t have it enabled in our setup because we’re not running Druid middle manager. As a workaround we have a scheduled script to do the same (it cleans up the overshadowed segments from deep storage and metadata).
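The selection logic of such a cleanup can be sketched in plain Python. This is a simplified illustration, not our actual script: it assumes that every overwrite replaces a segment with exactly the same interval and that version strings share one timestamp format, so they can be compared as strings. The `Segment` type and the sample data are hypothetical, and the deletion from deep storage and metadata is left out.

```python
from collections import defaultdict
from typing import Dict, List, NamedTuple


class Segment(NamedTuple):
    interval: str  # start/end of the segment
    version: str   # timestamp string; newer versions sort later


def find_overshadowed(segments: List[Segment]) -> List[Segment]:
    """Return segments for which a newer version of the same interval exists."""
    by_interval: Dict[str, List[Segment]] = defaultdict(list)
    for segment in segments:
        by_interval[segment.interval].append(segment)

    overshadowed: List[Segment] = []
    for versions in by_interval.values():
        newest = max(s.version for s in versions)
        # Everything older than the newest version no longer serves queries.
        overshadowed.extend(s for s in versions if s.version < newest)
    return sorted(overshadowed)


# Hypothetical metadata: April 8th was ingested twice, April 7th once.
segments = [
    Segment("2021-04-07/2021-04-08", "2021-04-08T03:00:00.000Z"),
    Segment("2021-04-08/2021-04-09", "2021-04-08T03:00:00.000Z"),
    Segment("2021-04-08/2021-04-09", "2021-04-09T03:00:00.000Z"),
]
stale = find_overshadowed(segments)
for segment in stale:
    print("can be cleaned up:", segment.interval, segment.version)
```

A real cleanup also needs to handle partially overlapping intervals and must only delete a version once the newer one has been fully loaded.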
Our decision to adopt Druid and Spark has helped us to scale our analytics workloads cost-effectively. Making the ingestion work seamlessly has paid off: we have not had any problems with Druid ingestion breaking or being a bottleneck for time-to-data in production. It has been a valuable learning experience for us as developers to create our first library in a truly productized form so that it can also be shared in public.
In the spirit of transparency, we should mention that there is an open pull request in the main Druid repository to add Spark Writer support, which will offer the same core features as rovio-ingest. We published rovio-ingest on April 8th, 2021, after using it internally in production for about one year. Back then, we didn’t notice that the Druid community was working on a similar effort, and we ended up creating our own library.
At the time of writing, we still see rovio-ingest as a viable option for organizations at large. We have paid special attention to making the library easy to use. Usage of rovio-ingest is documented in the GitHub repo. We encourage you to try it out and reach out to us with any questions or comments.
Apache®, Apache Hive™, Apache Spark™, the Apache Spark project logo, Apache Druid™, and the Apache Druid project logo are either registered trademarks or trademarks of the Apache Software Foundation in the United States and/or other countries. All other marks mentioned may be trademarks or registered trademarks of their respective owners. No endorsement by or affiliation with the Apache Software Foundation or any third-party trademark owner is claimed or implied by the use of these marks.