Chengcheng Hu | Engineering Manager, Ads Indexing
Ads indexing is a data processing pipeline that builds servable ad documents for the ads delivery stack at Pinterest, including ads targeting, retrieval, ranking and auction. The pipeline converts ad campaigns from their raw format to a servable format through a computationally heavy process with a lot of data joining. It is a key component of the ads delivery stack because all data used in ads delivery must go through this pipeline. The freshness and quality of the ad documents have a direct impact on Pinner and partner experiences. As the Pinterest ads business has grown rapidly over the past years, there was an urgent need to scale up the size of the index and lower the end-to-end (E2E) indexing latency at the same time, which traditional batch index builders cannot meet. Thus, we built a scalable incremental indexing system inspired by Google’s web indexing system “Caffeine”, which achieves seconds-level E2E indexing latency at a scale of 100M+ documents.
The ads indexing system is tailored to ads indexing business requirements. Let’s walk through the basic ads indexing knowledge to understand the requirements for this system.
All the data processed by the ads indexing pipeline falls into two categories: ads control data and ads content data. Ads control data is the ads metadata that advertisers set up and that is used in ads targeting, auction bidding, etc. Ads content data consists of the various signals we use to understand the ads Pin in order to optimize ads delivery performance. Here are a few representatives of each category:
Ads control data
- Targeting specs
- Creative type
Ads content data
- Image signature
- Pin interests (Coteries)
- Text annotation
- Pinvisual embedding
- Historical performance data
Different data sources may have different SLAs on data freshness and integrity. Here are the general guidelines for the two categories:
For ads control data, we need low latency and absolute data correctness, because data freshness directly impacts advertisers’ experience and data correctness is the key to earning advertisers’ trust. Ads content data, on the other hand, has a relatively loose SLA on data freshness, as many upstream data sources have a low update frequency. However, it still requires strong data correctness, as low-quality data can lead to severe ads delivery performance issues. To meet these requirements, we designed the ads indexing system as a combination of real-time incremental and batch pipelines. Here are the major responsibilities of each pipeline.
Real-time incremental pipeline
- Support distributed transactions on data processing to ensure the data consistency during large-scale concurrent processing
- Support push-based notifications on ads control data changes to achieve seconds-level update-to-serve E2E latency
- Run as two logical pipelines, with High and Medium priorities
Batch pipeline
- Keep the ads index up to date on ads content data via a periodic refresh of the pull-based data
- Achieve eventual consistency for both ads control and content data, as it covers any possible message loss or process failure in the real-time pipeline
- Run as one logical pipeline, with Low priority
In addition, this combined incremental and base indexing system provides better pipeline availability. If the real-time pipeline is clogged, the serving index falls back to the base index generated by the batch pipeline. If the batch pipeline fails once, the real-time serving index can still cover all the delta updates since the previous batch index. As a tradeoff, there is additional logic at serving time to merge the two indices.
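The article does not spell out the serving-side merge, so the following Python sketch is only one plausible shape for it. It assumes each document carries a monotonically increasing `version`, that the newest version wins, and that deletions are carried as tombstones; `AdDocument`, `lookup`, and the `realtime_enabled` switch are hypothetical names for illustration.

```python
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class AdDocument:
    ad_id: str
    version: int           # assumed: higher version means newer document
    payload: dict
    deleted: bool = False  # assumed tombstone marker for removed ads


def lookup(ad_id: str,
           base_index: Dict[str, AdDocument],
           realtime_index: Dict[str, AdDocument],
           realtime_enabled: bool = True) -> Optional[AdDocument]:
    """Return the newest live document for ad_id across the two indices."""
    base_doc = base_index.get(ad_id)
    # When the real-time pipeline is clogged, serving falls back to base only.
    rt_doc = realtime_index.get(ad_id) if realtime_enabled else None
    candidates = [doc for doc in (base_doc, rt_doc) if doc is not None]
    if not candidates:
        return None
    newest = max(candidates, key=lambda doc: doc.version)
    return None if newest.deleted else newest
```

With `realtime_enabled=False` the lookup degrades to the base index alone, which mirrors the fallback behavior described above.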
High Level System Introduction
Fig. 1: An overview of the ads indexing system with data flow
The figure above shows the high-level ads indexing system. The highlighted components are the four core components: Gateway, Updater, Storage Repo and Argus.
These four components are connected but sufficiently isolated. There are event queues among Gateway, Updater and Argus, which allow each component to process at its own speed. Gateway, Updater and Argus are all stateless services, which makes them easy to scale up and down. Updater and Argus each have three cluster instances, for High, Medium and Low priorities. All four components can have their own release schedules, as the APIs between them are designed to be backward compatible.
In addition, there are a few batch workflows in the system as a supplement to the real-time incremental processing. They take inputs from database dumps to either trigger actions to Updater and Argus, or build base index outputs for serving. We will describe more details about each workflow below.
Gateway is a lightweight stateless streamer service that we built on Kafka Streams, as it provides good support for at-least-once processing. Its major upstream is the MySQL binlog from the ads database. As the MySQL binlog contains only the delta change per transaction per table, Gateway needs to materialize it by querying the ads database again. Gateway also provides a shortcut path from the ads database to serving that skips the heavy indexing process for some lightweight serving documents, such as budget updates. For example, if a campaign budget is updated, Gateway will directly push this change to the real-time budget control service via Kafka.
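To make the materialize-then-route behavior of Gateway concrete, here is a minimal Python sketch. It is not the production code: the topic names, the event fields, and the rule that a change touching only shortcut columns bypasses indexing are all assumptions, and `fetch_row` and `publish` stand in for the ads database query and the Kafka producer.

```python
from typing import Callable, Optional

# Hypothetical topic names and shortcut rules; the article does not name them.
INDEXING_TOPIC = "ads_indexing_updates"
BUDGET_TOPIC = "realtime_budget_updates"
SHORTCUT_COLUMNS = {("campaign", "budget")}


def route_binlog_event(event: dict,
                       fetch_row: Callable[[str, str], Optional[dict]],
                       publish: Callable[[str, dict], None]) -> Optional[str]:
    """Materialize one binlog delta and publish it; returns the topic used."""
    # The binlog only carries the changed columns, so read the full row back.
    row = fetch_row(event["table"], event["primary_key"])
    if row is None:
        return None  # row no longer exists, so there is nothing to materialize
    changed = {(event["table"], col) for col in event["changed_columns"]}
    # Assumption: a change touching only shortcut columns skips heavy indexing.
    if changed and changed <= SHORTCUT_COLUMNS:
        topic = BUDGET_TOPIC
    else:
        topic = INDEXING_TOPIC
    publish(topic, {"table": event["table"], "key": event["primary_key"], "row": row})
    return topic
```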
Updater is a lightweight data ingestion service built on Kafka Streams. It tails Kafka topics, extracts structured data from messages and writes them into Storage Repo under transaction protection. For versioned data, it also performs a version check before updating, which preserves the update sequence even when upstream events arrive out of order. Besides updates from Kafka, Updater also accepts updates over RPC from batch processes for backfill purposes.
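The version check can be illustrated with a small Python sketch. It assumes each update carries an integer version and that an update is applied only when its version is strictly newer than the stored one; `VersionedStore` is a hypothetical in-memory stand-in, whereas in the real system the read and the write would happen inside one Storage Repo transaction.

```python
from typing import Dict, Optional, Tuple


class VersionedStore:
    """In-memory stand-in for a versioned table (illustrative only)."""

    def __init__(self) -> None:
        self._rows: Dict[str, Tuple[int, dict]] = {}

    def apply(self, key: str, version: int, data: dict) -> bool:
        """Write data unless an equal or newer version is already stored."""
        current = self._rows.get(key)
        if current is not None and current[0] >= version:
            return False  # stale or duplicate event that arrived late
        self._rows[key] = (version, data)
        return True

    def get(self, key: str) -> Optional[dict]:
        row = self._rows.get(key)
        return None if row is None else row[1]
```

Rejecting equal versions also makes redelivered messages harmless, which fits an at-least-once delivery model.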
Storage Repo is a columnar key-value store that provides cross-table transactional operations for Updater and Argus. It also pushes column-level change notifications to Argus. Because the ads indexing pipeline needs to support full batch index building, Storage Repo also needs to integrate well with common big data workflows, such as snapshot dumps and MapReduce jobs. Thus, we chose Apache Omid and HBase as the transactional NoSQL database for Storage Repo. Omid is a transaction management service on top of HBase, inspired by Google’s Percolator. In Percolator, change notifications are stored in a Bigtable column, and workers constantly scan partitioned ranges to pull change notifications. Because HBase can’t afford heavy scanning operations, we built the change notification service as an HBase proxy, which converts the HBase WAL to change notifications and publishes them to Kafka for data processing.
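As a rough illustration of turning WAL entries into column-level change notifications, the Python sketch below groups changed cells by table and row. The `(table, row_key, column)` tuple shape, the notification fields, and the coalescing of repeated cells into one notification per row are assumptions for illustration; it does not reflect HBase's actual WAL format or the real proxy.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Set, Tuple


def wal_to_notifications(wal_cells: Iterable[Tuple[str, str, str]]) -> List[dict]:
    """Coalesce (table, row_key, column) cells into one notification per row."""
    changed: Dict[Tuple[str, str], Set[str]] = defaultdict(set)
    for table, row_key, column in wal_cells:
        changed[(table, row_key)].add(column)
    # Dicts preserve insertion order, so rows come out in first-seen order.
    return [
        {"table": table, "row_key": row_key, "columns": sorted(columns)}
        for (table, row_key), columns in changed.items()
    ]
```

Each resulting dictionary would then be published to Kafka for downstream processing.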
Argus is the service that monitors changes in Storage Repo and further processes the changed data. It is built on a Kafka consumer with data APIs to Storage Repo. It tails the change notifications from Storage Repo and runs various handlers according to the event type. Each handler reads multiple columns across different rows and tables in Storage Repo, generates derived data objects via data joins or enrichments, and writes them back to Storage Repo within a single transaction. We built ads handlers inside Argus to do the following work:
- Ads handlers contain all the ads business logic of generating final servable Ads documents.
- Ads handlers pull some of the ads content data from external data sources for data enrichment. There is an in-memory cache layer inside the handlers to improve efficiency.
- Ads handlers have options to publish the final servable documents to serving services via Kafka.
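The handler flow described above (dispatch by event type, join rows across tables, enrich from a cached external source, write the result back) might look roughly like this in Python. Every name here is hypothetical: the table names, the document fields, and `InMemoryStorage`, which stands in for Storage Repo and omits the transaction that would wrap the reads and the write.

```python
from typing import Callable, Dict, Optional


class InMemoryStorage:
    """Stand-in for Storage Repo; real reads and writes would share one transaction."""

    def __init__(self, tables: Dict[str, Dict[str, dict]]) -> None:
        self.tables = tables

    def read(self, table: str, key: str) -> Optional[dict]:
        return self.tables.get(table, {}).get(key)

    def write(self, table: str, key: str, value: dict) -> None:
        self.tables.setdefault(table, {})[key] = value


class AdsHandler:
    def __init__(self, storage: InMemoryStorage,
                 fetch_pin_signals: Callable[[str], dict]) -> None:
        self.storage = storage
        self.fetch_pin_signals = fetch_pin_signals  # external pull-type source
        self._cache: Dict[str, dict] = {}  # unbounded here; a real cache would expire entries

    def _pin_signals(self, pin_id: str) -> dict:
        if pin_id not in self._cache:
            self._cache[pin_id] = self.fetch_pin_signals(pin_id)
        return self._cache[pin_id]

    def handle(self, notification: dict) -> Optional[dict]:
        ad = self.storage.read("ads", notification["row_key"])
        if ad is None:
            return None
        # Join across tables on advertiser ID, then enrich by Pin ID.
        advertiser = self.storage.read("advertisers", ad["advertiser_id"]) or {}
        doc = {
            "ad_id": notification["row_key"],
            "targeting": ad["targeting"],
            "advertiser_name": advertiser.get("name"),
            "pin_signals": self._pin_signals(ad["pin_id"]),
        }
        self.storage.write("servable_docs", notification["row_key"], doc)
        return doc


def dispatch(notification: dict,
             handlers: Dict[str, Callable[[dict], Optional[dict]]]) -> Optional[dict]:
    """Run the handler registered for the notification's event type, if any."""
    handler = handlers.get(notification["event_type"])
    return handler(notification) if handler else None
```

Two ads that share a Pin trigger only one external fetch, which is the efficiency gain the cache layer is meant to provide.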
Batch process workflows
A few batch process workflows run in this system to ensure data integrity. Because we already have the real-time incremental pipelines, the batch workflows mainly compensate for the incremental pipeline, and they share the same execution binary for data processing. Here is the list of workflows running in the system:
- Base index builder workflows: they run every few hours to generate the base index
- Data sync workflows: 1) the data sync between Ads Database and Storage Repo ensures the data consistency of ads control data. 2) the data sync among different tables in Storage Repo to check the data consistency within Storage Repo.
- Refresh workflows: they run periodically to mark all ads documents as newly updated, which triggers reprocessing and refreshes all pull-type signals.
- GC workflows: they run infrequently to remove very old inactive ad documents, so that Storage Repo does not grow without bound and hurt its performance.
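The core of a data sync workflow is a comparison between two views of the same data. The Python sketch below shows the idea on two in-memory snapshots keyed by document ID; the function name and the snapshot shape are assumptions, and a production workflow would run this comparison as a batch job over database dumps instead.

```python
from typing import Dict, List

_MISSING = object()  # sentinel, so a stored None still differs from an absent key


def find_unsynced(source_rows: Dict[str, object],
                  repo_rows: Dict[str, object]) -> List[str]:
    """Return the keys whose values differ or that exist on only one side."""
    unsynced = []
    for key in sorted(set(source_rows) | set(repo_rows)):
        if source_rows.get(key, _MISSING) != repo_rows.get(key, _MISSING):
            unsynced.append(key)
    return unsynced
```

The length of the returned list corresponds to a count of unsynced documents, and the listed keys are the ones a sync workflow would need to repair.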
The major downstream consumers are the Ads Manas cluster (inverted index) and the key-value store for Ads Scorpion (forward index). Both of them use a delta serving architecture, taking a base index plus real-time per-document updates. The base index is published through S3, while the real-time document updates are broadcast via Kafka.
The final servable ad document contains more than 100 ads control and content fields, and many engineers actively contribute to ads indexing by adding new fields or modifying existing ones. The platform therefore needs to provide good E2E visibility so that engineers can improve their development velocity. To that end, we provide the following features in the ads indexing system:
- Integration test and dev run: Gateway, Updater and Argus all provide RPC interfaces that correspond to their Kafka message ingestion interfaces. This setup simplifies dev runs for integration tests on these components. Each component has its own dev run with a before/after comparison report, so developers can understand the impact of their code changes on the ads index.
- Code release: Each service has its own release cycle, but we set up an E2E staging pipeline for continuous deployment.
- Presubmit integration test: Argus handlers have the most complicated application logic, and many more engineers work in that code base, so we set up a presubmit integration test for Argus handlers.
- Debugging UI: Storage Repo keeps all intermediate data and additional debugging information. We built a web-based UI for developers to easily access this information.
- Historical records: In addition to HBase versioning, we periodically snapshot HBase tables to the data warehouse for historical records.
In practice, the ads indexing team is often involved in ads delivery debugging tickets, such as “why is my ads campaign not spending?” The visibility features above allow engineers to quickly locate the root cause.
The ads indexing system is one of the most critical components in the ads delivery service, so we built comprehensive monitoring of its system health in two major areas: 1) data freshness, 2) data coverage. The following are the key metrics in our dashboard.
- Incremental pipeline health: E2E latency per document, latency breakdown by stage, processing throughput, a daily report on the number of dropped messages in the ads incremental pipeline, etc.
- Base pipeline health: Base index staleness, index data volume change, the coverage of critical fields in the base index, etc.
- Data consistency: Number of unsynced documents reported by various sync workflows.
The ads indexing system has been running smoothly in production for more than one year. The hybrid system not only provides seconds-level E2E latency most of the time, but also ensures high system availability and data integrity. Here are a few highlights from the production results:
- Ads control data update-to-serve p90 latency < 60 seconds 99.9% of the time
- Ads control data update-to-serve max latency < 24 hours 100% of the time
- Single-digit daily number of dropped messages in incremental pipeline
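One way to read the latency highlights above is as a two-level metric: compute the p90 latency within each time window, then report the fraction of windows whose p90 stays under the threshold. That reading is an interpretation, not something the article confirms, and the Python sketch below uses a simple nearest-rank percentile with hypothetical function names.

```python
from typing import List, Sequence


def p90(latencies_s: Sequence[float]) -> float:
    """Nearest-rank 90th percentile of a non-empty sample."""
    ordered = sorted(latencies_s)
    rank = (9 * len(ordered) + 9) // 10  # ceil(0.9 * n) in integer arithmetic
    return ordered[rank - 1]


def slo_compliance(windows: List[Sequence[float]], threshold_s: float = 60.0) -> float:
    """Fraction of non-empty windows whose p90 latency is below threshold_s."""
    evaluated = [window for window in windows if window]
    if not evaluated:
        return 1.0  # no traffic means no violation
    ok = sum(1 for window in evaluated if p90(window) < threshold_s)
    return ok / len(evaluated)
```

Under this reading, the first highlight corresponds to `slo_compliance(...)` being at least 0.999 for a 60-second threshold.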
The ads indexing system is highly resilient to various anomalies in production. With simple manual operations, it can recover very quickly from incidents to ensure data freshness and integrity. Here are some common production issues.
- Ads real-time pipeline clogging: This type of problem is usually caused by a huge spike in upstream updates. In this case, we can choose to shut down data publishing from the incremental pipeline to real-time serving so that serving won’t be overwhelmed. The pipeline then relies on the sync workflow to keep the base index up to date. Gateway and Updater are both set to drop stale messages to quickly clear the messages buffered in Kafka, so that the ads real-time pipeline can catch up very fast after the spike passes.
- Incorrect data introduced by a release: This is usually caught by staging vs. prod monitoring, the data coverage check in the base index workflow, or performance degradation in prod. We can temporarily shut down real-time serving and roll back the base index to a good version. To clean up the polluted data in Storage Repo, we can trigger the refresh workflow to reprocess all documents. If we know that only part of the data is polluted, we can refresh just the affected documents to shorten the recovery time.
- Data quality/coverage drop: This often starts in the external pull-type data sources and gets caught by the coverage check in the base index workflow. We can roll back the external data sources and backfill the data through the refresh workflow. Because the backfill goes through the low priority pipeline, it does not slow down the high and medium priority pipelines that process updates from critical data sources.
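The stale-message dropping mentioned for real-time pipeline clogging can be sketched as a simple age filter. The message shape, the `event_time_s` field, and the age threshold are assumptions; the article does not state how staleness is defined.

```python
from typing import List, Tuple


def drop_stale(messages: List[dict], now_s: float, max_age_s: float) -> Tuple[List[dict], int]:
    """Split buffered messages into fresh ones and a count of dropped stale ones."""
    fresh = []
    dropped = 0
    for message in messages:
        if now_s - message["event_time_s"] > max_age_s:
            dropped += 1  # left for the batch sync workflow to reconcile later
        else:
            fresh.append(message)
    return fresh, dropped
```

Dropping is safe only because the batch pipeline provides eventual consistency; the dropped count is the kind of number that feeds a daily dropped-messages report.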
Brief technical discussions
While building the ads indexing system from scratch to the point where it meets all business requirements with production maturity, we made a few design decisions to balance system flexibility, scalability and stability against implementation difficulty.
Data joining via remote storage vs. multi-stream joining with local storage: Ads indexing involves many data joining operations upon update notifications, as ads document enrichment needs data joined on several keys such as adID, pinID, imageSignatureID, advertiserID, etc. We chose to join data by fetching it from remote storage with transactions instead of doing multi-stream joins from local state. This gives distributed data processing applications more flexibility, as they don’t need to choose a data partition key to avoid race conditions. As a tradeoff, the system pays an extra roundtrip for data fetching. But since the ads indexing processor is computationally heavy, this roundtrip cost is a tiny proportion of the total infra cost.
Decoupled services vs. merged services: During the design phase, we considered merging services to have fewer components in the ads indexing system, which is a tradeoff between system isolation and simplicity. Gateway and Updater could be merged to save one message hop. However, we chose to separate them, as Gateway is a stream processing service while Updater is more of a data ingestion service. Gateway and Updater have since evolved along quite different paths. Gateway became a popular lightweight streamer framework for a few other use cases, while Updater has started to ingest ads content updates, which don’t need the signal materialization that ads control data requires. Updater and Argus could also be merged, as both are built on Kafka consumers that interact with Storage Repo. We chose to keep them as separate services because they have quite different directions for performance optimization: Updater is a very lightweight data extractor and sink, while Argus is a heavy-lifting computational component.
System scalability: There is one scalability bottleneck in the ads indexing system, as Omid relies on a centralized transaction manager for timestamp allocation and conflict resolution. To address this limitation, we customized Omid at Pinterest to improve its throughput, but the implementation details are beyond the scope of this tech blog.
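To show why a centralized transaction manager is a natural bottleneck, here is a deliberately simplified Python sketch of timestamp allocation plus commit-time write-write conflict detection. It is a toy model of the general technique, not Omid's actual implementation or Pinterest's customization, and `TransactionManager` with its methods is a hypothetical name.

```python
from typing import Dict, Iterable, Optional


class TransactionManager:
    """Toy centralized manager: one clock, one table of last commit timestamps."""

    def __init__(self) -> None:
        self._clock = 0
        self._last_commit: Dict[str, int] = {}  # key -> timestamp of last committed write

    def begin(self) -> int:
        """Allocate a start timestamp for a new transaction."""
        self._clock += 1
        return self._clock

    def commit(self, start_ts: int, write_set: Iterable[str]) -> Optional[int]:
        """Return a commit timestamp, or None if another writer committed first."""
        keys = list(write_set)
        for key in keys:
            if self._last_commit.get(key, 0) > start_ts:
                return None  # write-write conflict: the caller must abort and retry
        self._clock += 1
        for key in keys:
            self._last_commit[key] = self._clock
        return self._clock
```

Every `begin` and `commit` from every worker passes through this single object, which is why its throughput caps the throughput of the whole pipeline.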
The ads indexing system has evolved since its initial launch. We’ve been focusing on improving system stability and visibility. In addition, some of the core components have been adopted by other applications, as they were designed from the beginning to run independently. For example, Gateway is widely used as a lightweight streaming framework in ads retargeting and advertiser experiences. Moving forward, we plan to focus on improving system usability to further improve developer velocity in the following areas:
- Build a centralized config-based component that lets developers manage data flows through the ads indexing system in one place. Right now, developers have to edit configs in multiple places to cherry-pick signals and fields across different data schemas. We also plan to build a config-based visualization tool that lets developers easily browse the existing signals and their dependencies for data exploration.
- Enhance pipeline health monitoring by integrating with a probing-type test framework, as the current data health monitoring relies mostly on aggregated data views. Vertical feature developers have a hard time using aggregated metrics for monitoring, and they have to run ad hoc offline integration tests or data analysis reports to monitor the health of narrow data segments. The probing framework would allow developers to launch their own real-time validation tests on particular types of documents in production.
Acknowledgments: Many engineers at Pinterest worked together to build ads indexing system, including Ji Hong, Grace Chin, Sreshta Vijayaraghavan, Lianghong Xu, Qi Li, Kapil Bajaj, Kevin Lin, Mingsi Liu, Mingjian Liu, Susan Liu and Ang Xu. Huge thanks to Caijie Zhang, Sam Meder, Chiyoung Seo, Zheng Liu, Zack Drach, Liquan Pei and the entire Ads Infra team for design discussions. We would also like to thank our peer teams for supporting — Storage & Caching, Logging, Search Infra, Serving Systems, Ads SRE, Dev Tool, etc.
We’re building the world’s first visual discovery engine. More than 320 million people around the world use Pinterest to dream about, plan and prepare for things they want to do in life. Come join us!