Parag Kesar & Ben Liu
Software Engineers, Data Engineering
At Pinterest, we run thousands of experiments every day and rely mostly on daily metrics to evaluate performance. The daily pipelines can take 10+ hours to run and are sometimes delayed, which makes it hard to verify the setup of an experiment, the correctness of its triggering, and its expected performance. This is especially a problem when there are bugs in the code, which might take several days to catch and cause greater damage to user experience and top-line metrics.
As a solution, we developed a near real-time experimentation platform for fresher experiment metrics to help in catching these issues as soon as possible. Some examples of issues that may come up are:
The dashboard above shows the volume (i.e. number of actions) and propensity (i.e. number of unique users) of the control and treatment groups of an experiment for a selected event. The counts are aggregated for three days after an experiment gets launched or ramped up. If a re-ramp (increase in user allocation of control and treatment groups) occurs after the three days, the counts accumulate again from zero for another three days.
We then perform several statistical tests to ensure the results and comparisons between control and treatment are statistically valid. Since metrics are delivered in real time, we run these tests sequentially, every time a new record is received. We use methods other than traditional fixed-horizon tests so as not to inflate the false positive rate. We considered several sequential testing methods, including Gambler’s Ruin, Bayesian A/B testing, and the Alpha-Spending Method. For the sake of numerical stability, our initial implementation uses a t-test with Bonferroni correction (treating our case as multiple testing), with the number of tests determined in advance.
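As a rough illustration of the t-test with Bonferroni correction, the sketch below splits an overall significance level evenly across a pre-determined number of looks and compares a Welch-style statistic against that stricter threshold. The function names, the default of 10 planned tests, and the large-sample normal approximation used for the p-value are assumptions made for this example, not details of the production implementation.

```python
from math import sqrt
from statistics import NormalDist, mean, variance
from typing import Sequence


def bonferroni_alpha(alpha: float, planned_tests: int) -> float:
    """Per-test significance level when `planned_tests` tests are planned."""
    return alpha / planned_tests


def welch_statistic(control: Sequence[float], treatment: Sequence[float]) -> float:
    """Welch's t statistic for the difference in means (treatment - control).

    Requires at least two observations per group and non-zero variance.
    """
    standard_error = sqrt(
        variance(control) / len(control) + variance(treatment) / len(treatment)
    )
    return (mean(treatment) - mean(control)) / standard_error


def is_significant(
    control: Sequence[float],
    treatment: Sequence[float],
    alpha: float = 0.05,
    planned_tests: int = 10,  # hypothetical number of pre-determined tests
) -> bool:
    """Two-sided test at the Bonferroni-corrected level."""
    t = welch_statistic(control, treatment)
    # Assumption: samples are large enough that the t distribution is
    # well approximated by a standard normal.
    p_value = 2 * (1 - NormalDist().cdf(abs(t)))
    return p_value < bonferroni_alpha(alpha, planned_tests)
```

With 10 planned tests and an overall level of 0.05, each individual test has to clear 0.005, which is what keeps the false positive rate under control when the same experiment is tested repeatedly.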
High level design
The realtime experimentation pipeline consists of the following main components:
- Recently ramped experiment groups job → Publishes a CSV file every five minutes to an S3 location. The CSV is a snapshot of the experiment groups that had an increase in user allocation in the past three days. This information is obtained by querying the MySQL database of an internal Analytics application that hosts experiment metadata.
- Filter events job → We analyze hundreds of user actions at Pinterest. This job keeps only the most business-critical events, which are inserted into the ‘filtered_events’ Kafka topic. These events are stripped of fields that are not needed, so the filtered_events topic is fairly lightweight. The job runs in Flink processing time and its progress is saved via Flink’s incremental checkpointing to HDFS every five seconds.
- Filter experiment activations job → Whenever a user gets triggered into an experiment, an activation record is created. Triggering rules depend upon experiment logic and a user can be triggered into an experiment hundreds of times. We only need activations of experiments that launched or had an increase in group allocation in the last three days.
To filter the activations, this job uses Flink’s Broadcast State pattern. The CSV published by the ‘recently ramped experiment groups’ job is checked every 10 seconds for changes and published to every partition of a KeyedBroadcastProcessFunction that also consumes activations.
By joining the broadcasted CSV with the activation stream, the KeyedBroadcastProcessFunction filters out activation records for experiments that were not ramped up within the last three days. Additionally, the ‘group-ramp-up-time’ is added to the activation record, which is then inserted into the ‘filtered_experiment_activations’ Kafka topic.
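The broadcast-and-filter logic of the activations job can be sketched in plain Python as follows. This is a simulation of the idea rather than Flink API code: the class and method names are hypothetical, the CSV is assumed to have already been parsed into (experiment_hash, experiment_group, ramp_up_time) rows, and the field names on the activation record are assumptions.

```python
from dataclasses import dataclass, replace
from typing import Dict, Iterable, Optional, Tuple


@dataclass(frozen=True)
class Activation:
    experiment_hash: str
    experiment_group: str
    user_id: int
    group_ramp_up_time: Optional[int] = None  # filled in by the filter


class RampedGroupFilter:
    """Keeps only activations whose group appears in the broadcast snapshot."""

    def __init__(self) -> None:
        # (experiment_hash, experiment_group) -> ramp-up time
        self._ramped: Dict[Tuple[str, str], int] = {}

    def on_broadcast(self, csv_rows: Iterable[Tuple[str, str, int]]) -> None:
        """Replace the snapshot whenever a new CSV is published."""
        self._ramped = {(h, g): ramp_time for h, g, ramp_time in csv_rows}

    def on_activation(self, activation: Activation) -> Optional[Activation]:
        """Return the enriched activation, or None if it is filtered out."""
        key = (activation.experiment_hash, activation.experiment_group)
        ramp_time = self._ramped.get(key)
        if ramp_time is None:
            return None  # group was not ramped up recently
        return replace(activation, group_ramp_up_time=ramp_time)
```

In the real job the snapshot lives in broadcast state on every parallel instance, so each instance can filter its share of the activation stream locally.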
Real-time experiments aggregation job
Before we dive into the real-time experiments job, it’s worth looking at the objects that are inserted into the intermediate Kafka topics. SimpleEvent objects are inserted into the ‘filtered_events’ topic and ExperimentActivationWithRampedUpTime objects are inserted into the ‘filtered_experiment_activations’ topic.
Above is a high level overview of the real time aggregation Flink job. Some of the operators are covered here briefly, while some are described in detail in later sections. The source operators read data from Kafka, while the sinks write to our internal Analytics store using a REST interface.
De-duplicate events → This is implemented as a KeyedProcessFunction that is keyed by (event.user_id, event.event_type, event.timestamp). The idea is that if events from the same user for the same event-type have the same timestamps, they are duplicate events. The first such event is sent downstream but is also cached in state for five minutes. Any subsequent events are discarded. After five minutes, a timer runs and clears the state. The assumption is that all duplicate events are within this amount of time of each other.
Find first trigger time → This is a Flink KeyedProcessFunction, keyed by (experiment_hash, experiment_group, user_id). The assumption is that the first experiment activation record received for a user is also the activation with the first trigger time. The first activation received is sent downstream and saved as state until three days after the experiment ramp-up (we aggregate counts for three days after an experiment group is ramped up). A timer clears the state after three days of ramp time.
15 minute processing time tumbling windows → Both the Numerator Computer and the Denominator Computer aggregate counts as events come in and send results downstream. This amounts to millions of records, but we don’t need to send results to the Analytics store that frequently. We accomplish this more efficiently with a 15 minute Flink tumbling window that runs on processing time. In the case of the Numerator Computer, this window is keyed by (“experiment_hash”, “experiment_group”, “event_type”, “timestamp”). When the window fires after 15 minutes, the record with the max_users is taken and sent downstream to the Analytics Store sink.
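Two of the patterns above can be sketched in plain Python: keyed state that a timer clears after a fixed interval (as in the de-duplication operator), and a processing-time tumbling window that keeps only the record with the highest user count per key. Neither class uses the Flink API. The class names, the explicit `now` argument standing in for processing time, and the `users` field on each record are all assumptions for illustration.

```python
from typing import Dict, Hashable, List, Tuple


class EventDeduplicator:
    """Keyed state with a TTL, standing in for a KeyedProcessFunction plus timer."""

    def __init__(self, ttl_seconds: int = 300) -> None:
        self.ttl = ttl_seconds
        self._expiry: Dict[Tuple, int] = {}  # key -> time at which state is cleared

    def process(self, user_id, event_type, timestamp, now: int) -> bool:
        """Return True if the event should be sent downstream."""
        key = (user_id, event_type, timestamp)
        expiry = self._expiry.get(key)
        if expiry is not None and now < expiry:
            return False  # duplicate seen within the TTL: discard
        self._expiry[key] = now + self.ttl
        return True

    def on_timer(self, now: int) -> None:
        """Clear state whose TTL has elapsed (the timer callback)."""
        self._expiry = {k: e for k, e in self._expiry.items() if e > now}


class MaxUsersTumblingWindow:
    """Per key, keeps the record with the most users until the window fires."""

    def __init__(self, size_seconds: int = 900) -> None:
        self.size = size_seconds
        self._window_start = None
        self._best: Dict[Hashable, dict] = {}

    def add(self, key: Hashable, record: dict, now: int) -> List[dict]:
        """Add a record; return whatever the previous window emitted, if it closed."""
        emitted: List[dict] = []
        start = now - now % self.size
        if self._window_start is not None and start != self._window_start:
            emitted = self.fire()
        self._window_start = start
        current = self._best.get(key)
        if current is None or record["users"] > current["users"]:
            self._best[key] = record
        return emitted

    def fire(self) -> List[dict]:
        """Emit one record per key and reset the window contents."""
        out = list(self._best.values())
        self._best = {}
        return out
```

The window reduces the write load on the sink: however many intermediate results arrive within 15 minutes, only one record per key leaves the window.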
Join Activations with Events
We implement the stream-stream join with Flink’s IntervalJoin operator. IntervalJoin buffers the single activation record per user for the next three days and all matching events are sent downstream with additional experiment metadata from the activation record.
Limitations with this approach
We’ve looked into Flink’s IntervalJoin source code. It does buffer activations for three days in a ‘left-buffer’. However, events are deleted immediately, and there currently appears to be no way to change this behaviour via configuration, so an event that is processed before its delayed activation can be missed. We are looking into implementing this activation-event join using Flink’s CoProcessFunction, a more general-purpose function for stream-stream joins. With it, we can buffer events for X minutes so that even if the activation stream gets delayed by X minutes, the pipeline can handle the delay without undercounting. This will also help us avoid double joins for the same user, and can result in a more dynamic pipeline that is immediately aware of re-ramps of experiment groups and supports more dynamic behaviour, such as automatically extending the coverage of aggregations when groups are re-ramped.
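A minimal sketch of the buffered join idea, written as a plain Python simulation rather than Flink code. The class and method names are hypothetical, state is keyed by user only for brevity (the real job would also key by experiment group), `now` stands in for processing time, and the three-day expiry of activations is omitted.

```python
from collections import defaultdict
from typing import Any, Dict, List, Tuple


class BufferedActivationEventJoin:
    """Two-input join that tolerates activations arriving after their events."""

    def __init__(self, event_buffer_seconds: int) -> None:
        self.buffer = event_buffer_seconds
        self._activation: Dict[Any, Any] = {}  # user_id -> first activation
        # user_id -> [(arrival_time, event)] waiting for an activation
        self._pending: Dict[Any, List[Tuple[int, Any]]] = defaultdict(list)

    def on_event(self, user_id: Any, event: Any, now: int) -> List[Tuple[Any, Any]]:
        activation = self._activation.get(user_id)
        if activation is not None:
            return [(event, activation)]  # normal case: join immediately
        self._pending[user_id].append((now, event))  # activation not seen yet
        return []

    def on_activation(self, user_id: Any, activation: Any, now: int) -> List[Tuple[Any, Any]]:
        if user_id in self._activation:
            return []  # keep only the first activation to avoid double joins
        self._activation[user_id] = activation
        joined = []
        for arrival, event in self._pending.pop(user_id, []):
            if now - arrival <= self.buffer:  # still inside the buffer window
                joined.append((event, activation))
        return joined
```

Events that wait longer than the buffer window are dropped when the activation finally arrives, so the buffer size bounds both the tolerated delay and the extra state held per user.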
Join results deduplicator
The Join Results Deduplicator is a Flink KeyedProcessFunction that is keyed by experiment_hash, experiment_group, event_type, user_id. The primary purpose of this operator is to insert a ‘user_first_time_seen’ flag when sending records downstream. This flag is used by the downstream Numerator Computer to compute propensity numbers (# unique users) without using a set data structure.
This operator stores state till the last-ramp-time + three days, after which the state is cleared.
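The flagging step might look like the sketch below. It is a plain Python stand-in, not Flink code: the Python set here plays the role of Flink's per-key state (in a keyed operator this would be one boolean per key), and the class name, method names, and output record layout are assumptions.

```python
from typing import Dict, Set, Tuple


class JoinResultsDeduplicator:
    """Marks the first record seen for each (experiment, group, event type, user)."""

    def __init__(self) -> None:
        # Stand-in for keyed state: one entry per key that has been seen.
        self._seen: Set[Tuple[str, str, str, int]] = set()

    def process(
        self, experiment_hash: str, experiment_group: str, event_type: str, user_id: int
    ) -> Dict[str, object]:
        key = (experiment_hash, experiment_group, event_type, user_id)
        first_time = key not in self._seen
        self._seen.add(key)
        return {
            "experiment_hash": experiment_hash,
            "experiment_group": experiment_group,
            "event_type": event_type,
            "user_id": user_id,
            "user_first_time_seen": first_time,
        }

    def clear(self) -> None:
        """Timer callback: drop all state at last-ramp-time + three days."""
        self._seen.clear()
```

Because uniqueness is decided here, the downstream operator only has to add up flags instead of tracking every user it has seen.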
The Numerator Computer is a KeyedProcessFunction that is keyed by experiment_hash, experiment_group, event_type. It maintains rolling 15 minute buckets for the last two hours, which are updated every time a new record comes in.
For volume, every action counts, so for every event, action counts are incremented.
For propensity numbers (unique users), the count is incremented only if the ‘first_time_seen’ flag is true.
The buckets roll/rotate as time passes. The bucket data is flushed downstream to the 15 minute tumbling windows every time a new event comes in.
A three day timer (from ramp-time → three days) clears all state upon firing, effectively resetting counts to zero three days after ramp-up.
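The counting rules can be illustrated with a small plain Python sketch for a single (experiment_hash, experiment_group, event_type) key. It deliberately leaves out the rolling 15 minute buckets and shows only how volume and propensity are incremented and reset. The class and method names are hypothetical.

```python
from typing import Tuple


class NumeratorCounter:
    """Volume and propensity counts for one key of the Numerator Computer."""

    def __init__(self) -> None:
        self.volume = 0      # every action counts
        self.propensity = 0  # unique users, driven by the upstream flag

    def process(self, first_time_seen: bool) -> Tuple[int, int]:
        """Update counts for one joined event and return (volume, propensity)."""
        self.volume += 1
        if first_time_seen:
            self.propensity += 1
        return self.volume, self.propensity

    def on_timer(self) -> None:
        """Three-day timer callback: reset counts to zero."""
        self.volume = 0
        self.propensity = 0
```

Since the flag already encodes whether a user is new, the propensity count is a plain integer and the operator's state stays constant in size regardless of how many users trigger the event.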
Spammers & capping
To make our streaming pipeline fault-tolerant, we used Flink’s incremental checkpointing with the RocksDB state backend to save application checkpoints. One of the interesting challenges we faced was checkpoint failure. The checkpointing process appeared to take an extremely long time and eventually timed out. We also noticed that checkpoint failures typically coincided with high back-pressure.
After taking a closer look inside the checkpoint failure, we found that the timeout was caused by some subtasks not sending acknowledgment to the checkpoint coordinator and the whole checkpoint process was stuck, as shown below.
Several debugging steps were then applied to root cause the failure:
It turned out the subtask was functioning normally and was simply too busy processing messages. As a result, this specific subtask had high back-pressure, which prevented barriers from flowing through. Without receiving the barriers, the checkpoint process could not move forward.
After further checking Flink metrics for all subtasks, we found that one of them was producing 100x more messages than its peers. Since messages were partitioned by user_id across subtasks, this indicated that some users were producing many more messages than others, which pointed to spamming. This result was also confirmed by ad hoc querying of our spam_adjusted data sets.
To mitigate the problem, we applied a capping rule in the Filter Events job: if we see more than X messages from a user within one hour, we only send the first X messages. We were glad to see no more checkpoint failures after we applied the capping rule.
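A capping rule of this kind could be sketched as follows. The sketch assumes fixed, clock-aligned one-hour windows (the article does not say whether the hour is fixed or sliding), and the class name, method name, and `now` argument are hypothetical.

```python
from typing import Dict, Hashable, Tuple


class PerUserCap:
    """Lets through at most `max_messages` per user per window."""

    def __init__(self, max_messages: int, window_seconds: int = 3600) -> None:
        self.max_messages = max_messages
        self.window = window_seconds
        # user_id -> (window start, messages let through in that window)
        self._counts: Dict[Hashable, Tuple[int, int]] = {}

    def allow(self, user_id: Hashable, now: int) -> bool:
        """Return True if this message should be sent downstream."""
        window_start = now - now % self.window
        start, count = self._counts.get(user_id, (window_start, 0))
        if start != window_start:
            start, count = window_start, 0  # a new hour: start counting again
        if count >= self.max_messages:
            return False  # over the cap: drop the message
        self._counts[user_id] = (start, count + 1)
        return True
```

For example, with `PerUserCap(max_messages=2)`, the third message from the same user within the hour is dropped while other users are unaffected. Capping at the filter stage keeps a single heavy user from overloading the subtask that owns their key.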
Data robustness and validation
Data accuracy could not be more important for computing experiment metrics. To ensure our real-time experiment pipeline behaves as expected and always delivers accurate metrics, we launched a separate daily workflow that performs the same computation as the streaming job does, but in an ad hoc way. Developers are alerted if the streaming job results violate any of the following conditions:
By querying experiment metadata, we run the validation on experiments under 3 cases respectively:
This workflow can be visualized as below.
In this section we present some basic stats to show the scale of real-time experiment pipeline:
2. 100G checkpoint
3. 200~300 experiment groups
4. 8 masters and 50 workers, each an EC2 c5d.9xlarge instance
5. Computation runs with a parallelism of 256
Real-time Experiment Analytics is the first Flink-based application in production at Pinterest. Huge thanks to our Big Data Platform team (special thanks to Steven Bairos-Novak, Jooseong Kim, and Ang Zhang) for building out the Flink platform and providing it as a service. Thanks also to the Analytics Platform team (Bo Sun) for the amazing visualization, the Logging Platform team for providing real-time data ingestion, and the Data Science team (Brian Karfunkel) for statistical consultancy!
We’re building the world’s first visual discovery engine. More than 250 million people around the world use Pinterest to dream about, plan and prepare for things they want to do in life. Come join us!
Real-time experiment analytics at Pinterest using Apache Flink was originally published in Pinterest Engineering Blog on Medium, where people are continuing the conversation by highlighting and responding to this story.