1. Streaming 101: A high-level tour of modern data-processing concepts
1.1. Terminology
What is streaming? A type of data processing engine that is designed with infinite data sets in mind.
Unbounded data: A type of ever-growing, essentially infinite data set. Infinite “streaming” data sets can be referred to as unbounded data, and finite “batch” data sets as bounded data.
Unbounded data processing: An ongoing mode of data processing, applied to the aforementioned type of unbounded data.
1.2. Capabilities: what streaming systems can and can’t do
Lambda Architecture: the basic idea is that you run a streaming system alongside a batch system, both performing essentially the same calculation. The streaming system gives you low-latency, inaccurate results (either because of the use of an approximation algorithm, or because the streaming system itself does not provide correctness), and some time later a batch system rolls along and provides you with correct output. But maintaining a Lambda system is a hassle: you need to build, provision, and maintain two independent versions of your pipeline, and then also somehow merge the results from the two pipelines at the end.
Well-designed streaming systems actually provide a strict superset of batch functionality.
a. Correctness — This gets you parity with batch.
At the core, correctness boils down to consistent storage. Streaming systems need a method for checkpointing persistent state over time (something Kreps has talked about in his post “Why local state is a fundamental primitive in stream processing”), and that method must be well-designed enough to remain consistent in light of machine failures.
Strong consistency is required for exactly-once processing, which is required for correctness, which is a requirement for any system that’s going to have a chance at meeting or exceeding the capabilities of batch systems.
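To make the consistent-storage requirement concrete, here is a minimal Python sketch of why persistent state and input position should be checkpointed together. The `store` dictionary stands in for durable storage, and `Checkpoint`, `run`, and the `crash_at` failure injection are hypothetical names chosen for illustration; real systems use far more elaborate mechanisms.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Checkpoint:
    """Hypothetical checkpoint: input position and state, persisted as one unit."""
    offset: int = 0  # index of the next record to process
    count: int = 0   # aggregate over every record before `offset`


def run(records, store, checkpoint_every=2, crash_at=None):
    """Count records, resuming from the last checkpoint found in `store`.

    `store` is a plain dict standing in for durable storage. `crash_at`
    simulates a machine failure just before processing that record index,
    which loses all in-memory progress since the last checkpoint.
    """
    ckpt = store.get("ckpt", Checkpoint())
    offset, count = ckpt.offset, ckpt.count
    while offset < len(records):
        if offset == crash_at:
            raise RuntimeError("simulated machine failure")
        count += 1
        offset += 1
        if offset % checkpoint_every == 0 or offset == len(records):
            # Offset and state are written in a single step, never separately.
            store["ckpt"] = Checkpoint(offset, count)
    return count


store = {}
records = ["r0", "r1", "r2", "r3", "r4"]
try:
    run(records, store, crash_at=3)  # last checkpoint written was offset 2
except RuntimeError:
    pass
print(run(records, store))  # restarts from offset 2 and prints 5
```

After the simulated failure, the restart replays record `r2`, but because the count was restored from the same checkpoint as the offset, that record contributes to the result only once. If the offset and the count were committed in separate writes, a crash between the two could double-count or lose records.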
b. Tools for reasoning about time — This gets you beyond batch.
Good tools for reasoning about time are essential for dealing with unbounded, unordered data of varying event-time skew.
1.3. Time domains
There are two domains of time to consider:
- Event time, which is the time at which events actually occurred.
- Processing time, which is the time at which events are observed in the system.
In an ideal world, event time and processing time would always be equal, with events being processed immediately as they occur. Reality is not so kind, however, and the skew between event time and processing time is not only non-zero, but often a highly variable function of the characteristics of the underlying input sources, execution engine, and hardware.
Because the mapping between event time and processing time is not static, you cannot analyze your data solely in the context of when they are observed in your pipeline if you care about their event times (i.e., when the events actually occurred). Unfortunately, this is the way most existing systems designed for unbounded data operate.
The proposal: instead of attempting to groom unbounded data into finite batches of information that eventually become complete, we should be designing tools that allow us to live in the world of uncertainty imposed by these complex data sets.
1.4. Data processing patterns
- Bounded data
- Unbounded data — batch
a. Fixed windows: the most common way to process an unbounded data set using repeated runs of a batch engine is by windowing the input data into fixed-sized windows, then processing each of those windows as a separate, bounded data source.
b. Sessions: Sessions are typically defined as periods of activity (e.g., for a specific user) terminated by a gap of inactivity. When calculating sessions using a typical batch engine, you often end up with sessions that are split across batches. The number of splits can be reduced by increasing batch sizes, but at the cost of increased latency. Another option is to add logic to stitch up sessions from previous runs, but at the cost of further complexity.
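The session-splitting problem can be reproduced in a few lines. This Python sketch assumes integer timestamps in minutes, an inactivity gap of 5, and a hypothetical batch size of 10 minutes; `sessions` and `sessions_per_batch` are illustrative helpers, not part of any batch engine's API.

```python
def sessions(timestamps, gap):
    """Group event timestamps into sessions; a gap larger than `gap` ends a session."""
    result = []
    for t in sorted(timestamps):
        if result and t - result[-1][-1] <= gap:
            result[-1].append(t)  # still within the inactivity timeout
        else:
            result.append([t])    # gap exceeded (or first event): new session
    return result


def sessions_per_batch(timestamps, gap, batch_size):
    """Compute sessions independently inside each fixed batch, as a batch job would."""
    batches = {}
    for t in timestamps:
        batches.setdefault(t // batch_size, []).append(t)
    out = []
    for key in sorted(batches):
        out.extend(sessions(batches[key], gap))
    return out


events = [1, 3, 9, 11, 12, 25]  # one user's activity, in minutes
print(sessions(events, gap=5))                           # [[1, 3], [9, 11, 12], [25]]
print(sessions_per_batch(events, gap=5, batch_size=10))  # [[1, 3], [9], [11, 12], [25]]
```

The session `[9, 11, 12]` straddles the boundary at minute 10, so the per-batch computation splits it in two. A larger batch size would avoid this particular split at the cost of latency, which is exactly the trade-off described above.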
- Unbounded data — streaming
In this case you are not only dealing with unbounded data, but also with data that are:
- Highly unordered with respect to event times, meaning you need some sort of time-based shuffle in your pipeline if you want to analyze the data in the context in which they occurred.
- Of varying event-time skew, meaning you can’t just assume you’ll always see most of the data for a given event time X within some constant epsilon of time Y.
a. Time-agnostic: Time-agnostic processing is used in cases where time is essentially irrelevant, i.e., all relevant logic is data-driven.
b. Filtering: A very basic form of time-agnostic processing is filtering. Imagine you’re processing Web traffic logs, and you want to filter out all traffic that didn’t originate from a specific domain. You would look at each record as it arrived, see if it belonged to the domain of interest, and drop it if not.
c. Inner-joins: Another time-agnostic example is an inner-join (or hash-join). When joining two unbounded data sources, if you only care about the result of a join when an element from each source has arrived, there’s no temporal element to the logic.
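Both time-agnostic examples can be sketched in Python. The log record shape (`LogRecord` with `domain` and `path`), the sample domains and ad keys, and the symmetric hash join strategy are assumptions made for illustration, not taken from any particular system. Neither function ever looks at a timestamp: filtering decides per record, and the join emits a row as soon as both halves have arrived.

```python
import itertools
from collections import defaultdict
from typing import Hashable, Iterable, Iterator, NamedTuple, Tuple


class LogRecord(NamedTuple):
    """Hypothetical Web traffic log record."""
    domain: str
    path: str


def filter_domain(records: Iterable[LogRecord], domain: str) -> Iterator[LogRecord]:
    """Filtering: keep or drop each record based on that record alone."""
    for record in records:
        if record.domain == domain:
            yield record


def streaming_inner_join(
    events: Iterable[Tuple[str, Hashable, object]]
) -> Iterator[Tuple[Hashable, object, object]]:
    """Inner join over an interleaved stream of ("left" | "right", key, value).

    Each element is stored in its side's hash table and probed against the
    other side's table, so a joined row is emitted as soon as both halves
    have arrived, whatever their timestamps.
    """
    tables = {"left": defaultdict(list), "right": defaultdict(list)}
    for side, key, value in events:
        other = "right" if side == "left" else "left"
        tables[side][key].append(value)
        for match in tables[other][key]:
            # Always report (key, left_value, right_value).
            yield (key, value, match) if side == "left" else (key, match, value)


# Both functions are lazy generators, so they also work on never-ending input.
infinite_logs = itertools.cycle(
    [LogRecord("example.com", "/a"), LogRecord("other.org", "/b")]
)
print(list(itertools.islice(filter_domain(infinite_logs, "example.com"), 2)))

stream = [
    ("left", "ad1", "impression"),
    ("right", "ad2", "click-x"),
    ("right", "ad1", "click-y"),
    ("left", "ad2", "impression"),
]
print(list(streaming_inner_join(stream)))
# [('ad1', 'impression', 'click-y'), ('ad2', 'impression', 'click-x')]
```

Note that this join keeps every element it has ever seen, so its state grows without bound; a real deployment on unbounded input would need some policy for limiting that state.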
d. Approximation algorithms
After time-agnostic processing, the second major category of approaches is approximation algorithms, such as approximate Top-N, streaming K-means, etc. They take an unbounded source of input and provide output data that, if you squint at them, look more or less like what you were hoping to get. The upside of approximation algorithms is that, by design, they are low overhead and designed for unbounded data. The downsides are that only a limited set of them exists, the algorithms themselves are often complicated (which makes it difficult to conjure up new ones), and their approximate nature limits their utility.
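As one concrete example of an approximation algorithm, the sketch below uses the Misra-Gries frequent-items algorithm (a technique chosen here for illustration, not one named in the text) to produce an approximate Top-N with a fixed number of counters. It shows the trade-off described above: memory stays bounded no matter how long the stream runs, but the counts are only lower-bound estimates. The helper names and the sample stream are hypothetical.

```python
from typing import Dict, Hashable, Iterable, List


def misra_gries(stream: Iterable[Hashable], k: int) -> Dict[Hashable, int]:
    """Misra-Gries summary using at most k - 1 counters.

    Any item occurring more than n / k times in a stream of length n is
    guaranteed to keep a counter, and each stored count underestimates the
    true count by at most n / k.
    """
    counters: Dict[Hashable, int] = {}
    for item in stream:
        if item in counters:
            counters[item] += 1
        elif len(counters) < k - 1:
            counters[item] = 1
        else:
            # No free counter: decrement all, dropping any that reach zero.
            for key in list(counters):
                counters[key] -= 1
                if counters[key] == 0:
                    del counters[key]
    return counters


def approximate_top_n(stream: Iterable[Hashable], n: int, k: int) -> List[Hashable]:
    """Rank the surviving items by their (under)estimated counts."""
    counts = misra_gries(stream, k)
    return sorted(counts, key=counts.get, reverse=True)[:n]


stream = list("aaaaabbbcd") * 10  # true counts: a=50, b=30, c=10, d=10
print(misra_gries(stream, k=4))             # {'a': 40, 'b': 20}
print(approximate_top_n(stream, n=2, k=4))  # ['a', 'b']
```

Here n = 100 and k = 4, so the error bound is 25; the estimates of 40 and 20 are each 10 below the true counts, and the ranking is still correct.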
e. Windowing
Windowing is simply the notion of taking a data source (either unbounded or bounded) and chopping it up along temporal boundaries into finite chunks for processing. It is a common approach for coping with the fact that unbounded data sources technically may never end. There are three common windowing patterns:
Fixed windows: Fixed windows slice up time into segments with a fixed-size temporal length.
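Sliding windows: A generalization of fixed windows, sliding windows are defined by a fixed length and a fixed period. If the period is less than the length, the windows overlap; if the period equals the length, you have fixed windows.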
Sessions: An example of dynamic windows, sessions are composed of sequences of events terminated by a gap of inactivity greater than some timeout. Sessions are commonly used for analyzing user behavior over time, by grouping together a series of temporally-related events.
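Fixed and sliding windows can be assigned to an element from its timestamp alone, as in the Python sketch below. Window bounds are half-open `[start, end)` and aligned to time zero; both choices, and the function names, are assumptions made for illustration. Sessions cannot be assigned this way, because their bounds depend on the surrounding events.

```python
from typing import List, Tuple

Window = Tuple[int, int]  # [start, end) in event-time units


def fixed_window(t: int, size: int) -> Window:
    """The single fixed window of length `size` that contains timestamp t."""
    start = t - (t % size)
    return (start, start + size)


def sliding_windows(t: int, size: int, period: int) -> List[Window]:
    """Every window of length `size`, starting every `period` units, that contains t."""
    windows = []
    start = t - (t % period)  # latest window start at or before t
    while start > t - size:   # the window [start, start + size) still covers t
        windows.append((start, start + size))
        start -= period
    return sorted(windows)


print(fixed_window(7, size=10))                # (0, 10)
print(sliding_windows(7, size=10, period=5))   # [(0, 10), (5, 15)]
print(sliding_windows(7, size=10, period=10))  # [(0, 10)]
```

When the period divides the length, each timestamp falls into `size // period` overlapping windows; with a period of 5 and a length of 10, every element is counted in two windows.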
Watermarks: A watermark is a notion of input completeness with respect to event times. A watermark with a value of time X makes the statement: “all input data with event times less than X have been observed.” As such, watermarks act as a metric of progress when observing an unbounded data source with no known end.
Triggers: A trigger is a mechanism for declaring when the output for a window should be materialized relative to some external signal. Triggers provide flexibility in choosing when outputs should be emitted.
Accumulation: An accumulation mode specifies the relationship between multiple results that are observed for the same window.
Windowing by processing time
There are a few nice properties of processing time windowing:
- It’s simple.
- Judging window completeness is straightforward.
- If you want to infer information about the source as it is observed, processing-time windowing is exactly what you want.
The big downside to processing-time windowing: if the data in question have event times associated with them, those data must arrive in event-time order for the processing-time windows to reflect when those events actually happened. Unfortunately, event-time-ordered data are uncommon in many real-world, distributed input sources.
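The following Python sketch, using made-up timestamps in minutes, shows how a single delayed record changes the answer. Bucketing by processing time credits the record to the window in which it was observed, while bucketing by event time credits it to the window in which it occurred. `count_per_window` is a hypothetical helper.

```python
from collections import Counter
from typing import Dict, List, Tuple

# Hypothetical records: (event_time, processing_time), in minutes.
records: List[Tuple[int, int]] = [
    (1, 2),
    (3, 4),
    (8, 12),  # delayed: occurred in [0, 10) but observed in [10, 20)
    (11, 13),
    (14, 15),
]


def count_per_window(records, size, use_event_time) -> Dict[int, int]:
    """Count records per fixed window (keyed by window start) in one time domain."""
    counts = Counter()
    for event_time, processing_time in records:
        t = event_time if use_event_time else processing_time
        counts[t - t % size] += 1
    return dict(counts)


print(count_per_window(records, 10, use_event_time=False))  # {0: 2, 10: 3}
print(count_per_window(records, 10, use_event_time=True))   # {0: 3, 10: 2}
```

Processing-time windows are trivially complete once the wall clock passes their end, but they misattribute the delayed record; event-time windows attribute it correctly, at the price of having to wait for it.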
Windowing by event time
Event time windowing is what you use when you need to observe a data source in finite chunks that reflect the times at which those events actually happened. It’s the gold standard of windowing. Sadly, most data processing systems in use today lack native support for it.
Event time windows have two notable drawbacks due to the fact that windows must often live longer (in processing time) than the actual length of the window itself:
Buffering: Due to extended window lifetimes, more buffering of data is required.
Completeness: Given that we often have no good way of knowing when we’ve seen all the data for a given window, how do we know when the results for the window are ready to materialize? In truth, we simply don’t.
2. Streaming 102: The what, where, when, and how of unbounded data processing
2.1. Four questions are critical to every unbounded data processing problem:
- What results are calculated? This question is answered by the types of transformations within the pipeline.
- Where in event time are results calculated? This question is answered by the use of event-time windowing within the pipeline.
- When in processing time are results materialized? This question is answered by the use of watermarks and triggers.
- How do refinements of results relate? This question is answered by the type of accumulation used: discarding (where results are all independent and distinct), accumulating (where later results build upon prior ones), or accumulating and retracting (where both the new accumulated value and a retraction for the previously triggered value(s) are emitted).
2.2. Fixed windowing in event time
2.2.1. When: watermarks
Watermarks are temporal notions of input completeness in the event-time domain.
Perfect watermarks: In the case where we have perfect knowledge of all of the input data, it’s possible to construct a perfect watermark; in such a case, there is no such thing as late data; all data are early or on time.
Heuristic watermarks: For many distributed input sources, perfect knowledge of the input data is impractical, in which case the next best option is to provide a heuristic watermark. Heuristic watermarks use whatever information is available about the inputs (partitions, ordering within partitions if any, growth rates of files, etc.) to provide an estimate of progress that is as accurate as possible.
Watermarks (and any other notion of completeness) have two shortcomings: they can be
Too slow: When a watermark of any type is correctly delayed due to known unprocessed data (e.g., slowly growing input logs due to network bandwidth constraints), that translates directly into delays in output if advancement of the watermark is the only thing you depend on for stimulating results.
Too fast: When a heuristic watermark is incorrectly advanced earlier than it should be, it’s possible for data with event times before the watermark to arrive some time later, creating late data.
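One simple heuristic, sketched below in Python, applies to a partitioned source where each partition is assumed to deliver data roughly in event-time order: the watermark is the minimum, across partitions, of the latest event time seen so far. The class name and the in-order assumption are illustrative, not taken from any specific system. When a partition violates the assumption, the watermark has advanced too fast and the out-of-order record shows up as late data.

```python
from typing import Dict, Iterable, Optional


class PartitionWatermark:
    """Hypothetical heuristic watermark for a partitioned source.

    Assumes every partition delivers records roughly in event-time order, so
    the slowest partition's latest event time bounds overall progress.
    """

    def __init__(self, partitions: Iterable[str]):
        # Latest event time observed per partition (None until it reports).
        self.latest: Dict[str, Optional[int]] = {p: None for p in partitions}

    def value(self) -> Optional[int]:
        """Watermark X: the claim that all event times below X have been seen."""
        if any(t is None for t in self.latest.values()):
            return None  # no estimate until every partition has reported
        return min(self.latest.values())

    def observe(self, partition: str, event_time: int) -> bool:
        """Record an element and return True if it arrived behind the watermark."""
        wm = self.value()
        late = wm is not None and event_time < wm
        prev = self.latest[partition]
        self.latest[partition] = event_time if prev is None else max(prev, event_time)
        return late


wm = PartitionWatermark(["p0", "p1"])
results = [
    wm.observe("p0", 5),
    wm.observe("p1", 3),  # watermark becomes 3
    wm.observe("p1", 9),  # watermark becomes 5
    wm.observe("p0", 4),  # p0 was out of order: 4 < 5, so this is late data
]
print(results, wm.value())  # [False, False, False, True] 5
```

Because each partition's latest event time only grows, the minimum never moves backwards, so the watermark is monotonic even when individual records are out of order.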
2.2.2. When: The wonderful thing about triggers, is triggers are wonderful things!
Triggers declare when output for a window should happen in processing time.
With a heuristic watermark, the persistent state for each window has to linger around so that late data can be handled appropriately when/if they arrive. But while it’d be great to be able to keep around all of our persistent state until the end of time, in reality, when dealing with an unbounded data source, it’s often not practical to keep state (including metadata) for a given window indefinitely; we’ll eventually run out of disk space. A clean and concise way of bounding that state is to define a horizon on the allowed lateness within the system, i.e., to place a bound on how late any given record may be (relative to the watermark) for the system to bother processing it; any data that arrive after this horizon are simply dropped, and the window’s state can be garbage-collected.
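The sketch below combines a watermark trigger with an allowed-lateness horizon for fixed event-time windows over integer timestamps. The class `FixedWindowSum`, its method names, and the choice to fire one late pane per late element are assumptions made for illustration; panes are emitted in accumulating mode.

```python
from collections import defaultdict
from typing import Dict, List, Tuple


class FixedWindowSum:
    """Hypothetical per-window sums with a watermark trigger and allowed lateness.

    Simplifying assumptions: a single key, integer event times, an on-time
    pane when the watermark passes a window's end, one late pane per late
    element inside the horizon, and accumulating panes.
    """

    def __init__(self, size: int, allowed_lateness: int):
        self.size = size
        self.allowed_lateness = allowed_lateness
        self.watermark = float("-inf")
        self.state: Dict[int, int] = defaultdict(int)  # window start -> running sum
        self.panes: List[Tuple[int, str, int]] = []    # (window start, timing, sum)
        self.dropped: List[int] = []                   # event times past the horizon

    def _horizon_passed(self, start: int) -> bool:
        return start + self.size + self.allowed_lateness <= self.watermark

    def add(self, event_time: int, value: int) -> None:
        start = event_time - event_time % self.size
        if self._horizon_passed(start):
            self.dropped.append(event_time)  # too late: state already discarded
            return
        self.state[start] += value
        if start + self.size <= self.watermark:
            # The watermark has already passed the window's end: late firing.
            self.panes.append((start, "late", self.state[start]))

    def advance_watermark(self, watermark: int) -> None:
        old, self.watermark = self.watermark, watermark
        for start in sorted(self.state):
            if old < start + self.size <= watermark:
                self.panes.append((start, "on_time", self.state[start]))
            if self._horizon_passed(start):
                del self.state[start]  # garbage-collect the window's state


windows = FixedWindowSum(size=10, allowed_lateness=10)
windows.add(3, 1)
windows.add(7, 2)
windows.add(12, 5)
windows.advance_watermark(10)  # on-time pane for [0, 10): 3
windows.add(8, 4)              # late but within the horizon: late pane of 7
windows.advance_watermark(20)  # on-time pane for [10, 20); [0, 10) is freed
windows.add(5, 1)              # beyond the horizon: dropped
print(windows.panes)    # [(0, 'on_time', 3), (0, 'late', 7), (10, 'on_time', 5)]
print(windows.dropped)  # [5]
```

Once the watermark reaches 20 (the end of `[0, 10)` plus 10 units of allowed lateness), that window's state is freed, so memory is bounded by the horizon rather than by the lifetime of the pipeline.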
2.2.3. How: accumulation
There are actually three different modes of accumulation:
Discarding: Every time a pane is materialized, any stored state is discarded. This means each successive pane is independent from any that came before.
Accumulating: Every time a pane is materialized, any stored state is retained, and future inputs are accumulated into the existing state.
Accumulating & retracting: Like accumulating mode, but when producing a new pane, also produces independent retractions for the previous pane(s). Retractions (combined with the new accumulated result) are essentially an explicit way of saying, “I previously told you the result was X, but I was wrong. Get rid of the X I told you last time, and replace it with Y.”
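The three modes differ only in what a window keeps between firings and what it emits. The Python sketch below replays the same three batches of input for a single window in each mode; the `panes` function and the `("value", x)` / `("retract", x)` output encoding are hypothetical.

```python
from typing import List, Tuple


def panes(batches: List[List[int]], mode: str) -> List[Tuple[str, int]]:
    """Hypothetical per-window sum emitted once per trigger firing.

    `batches[i]` holds the values that arrived between firing i - 1 and
    firing i. Output records are ("value", x) or ("retract", x).
    """
    emitted: List[Tuple[str, int]] = []
    stored = 0         # window state kept between firings
    last_value = None  # previously emitted value, needed for retractions
    for batch in batches:
        stored += sum(batch)
        if mode == "discarding":
            emitted.append(("value", stored))
            stored = 0  # drop state: the next pane is independent
        elif mode == "accumulating":
            emitted.append(("value", stored))
        elif mode == "accumulating_and_retracting":
            if last_value is not None:
                emitted.append(("retract", last_value))  # undo the previous pane
            emitted.append(("value", stored))
            last_value = stored
        else:
            raise ValueError(f"unknown accumulation mode: {mode!r}")
    return emitted


batches = [[3, 4], [2], [5]]
for mode in ("discarding", "accumulating", "accumulating_and_retracting"):
    print(mode, panes(batches, mode))
```

A downstream consumer that sums everything it receives gets the correct total of 14 in discarding mode (7 + 2 + 5) and in accumulating-and-retracting mode (values minus retractions), but overcounts in plain accumulating mode (7 + 9 + 14 = 30).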
2.3. When/Where: Processing-time windows
- Event-time windowing
- Processing-time windowing via triggers
- Processing-time windowing via ingress time
If you care about the times at which your events actually happened, you must use event-time windowing or your results will be meaningless.
2.4. Session windows
Sessions are a special type of window that captures a period of activity in the data that is terminated by a gap of inactivity. They’re particularly useful in data analysis because they can provide a view of the activities for a specific user over a specific period of time where they were engaged in some activity.
From a windowing perspective, sessions are particularly interesting in two ways:
They are an example of a data-driven window: the location and sizes of the windows are a direct consequence of the input data themselves, rather than being based on some predefined pattern within time, like fixed and sliding windows are.
They are also an example of an unaligned window, i.e., a window that does not apply uniformly across the data, but instead only to a specific subset of the data (e.g., per user). This is in contrast to aligned windows like fixed and sliding windows, which typically apply uniformly across the data.
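A common way to build sessions, sketched here in Python, is to give each event a proto-window that extends `gap` units past its timestamp and then merge overlapping proto-windows for the same key. The function name, the `(key, timestamp)` input shape, and the choice to merge windows that merely touch (so that a gap of exactly `gap` does not end a session) are assumptions made for illustration. The output shows both properties: the window bounds come from the data, and each key gets its own windows.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple

Window = Tuple[int, int]  # [start, end) in event time


def session_windows(
    events: Iterable[Tuple[str, int]], gap: int
) -> Dict[str, List[Window]]:
    """Build per-key session windows by merging proto-windows.

    Each event at time t starts a proto-window [t, t + gap); proto-windows
    for the same key that overlap or touch are merged. Events may arrive in
    any order.
    """
    proto: Dict[str, List[Window]] = defaultdict(list)
    for key, t in events:
        proto[key].append((t, t + gap))

    sessions: Dict[str, List[Window]] = {}
    for key, windows in proto.items():
        merged: List[Window] = []
        for start, end in sorted(windows):
            if merged and start <= merged[-1][1]:
                # Overlaps or touches the previous session: extend it.
                merged[-1] = (merged[-1][0], max(merged[-1][1], end))
            else:
                merged.append((start, end))
        sessions[key] = merged
    return sessions


events = [("alice", 1), ("bob", 2), ("alice", 4), ("alice", 20), ("bob", 30), ("alice", 6)]
print(session_windows(events, gap=5))
# {'alice': [(1, 11), (20, 25)], 'bob': [(2, 7), (30, 35)]}
```

The late-arriving `("alice", 6)` extends alice's first session from `(1, 9)` to `(1, 11)`, which is why merging, rather than assigning each element to a precomputed window, is needed for data-driven windows.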