Window Processing
Window processing, a fundamental concept in stream processing detailed extensively in the "The Dataflow Model" whitepaper, addresses the challenge of analyzing unbounded data streams by partitioning them into finite, bounded sets of records known as windows. A window operation is characterized by its window size, which defines the temporal or count-based boundaries of each window, and is typically governed by a watermark mechanism that manages event-time progression and late-arriving data. SDF performs a window processing operation by chaining multiple operators that assign timestamps to records, group them by key, and apply custom operations.
While there are several types of windows, and SDF will eventually implement all of them, this preview focuses on two: tumbling windows and sliding windows.
Tumbling windows are equal-sized, contiguous, and non-overlapping: each record is present in exactly one window.
Sliding windows are equal-sized, overlapping windows created at a regular interval: each record may be present in one or more windows.
Configuring Windows in SDF
In SDF, window processing is configured within a service definition using the window block. This block specifies the type of window, its parameters, and the sequence of operations to be performed on the data within those windows.
Here's a brief overview of the typical structure of the window block:
# Overview of the 'window' block structure
window:
  # --- Window Type (Choose one) ---
  tumbling:                        # Or 'sliding:'
    duration: <time_value>         # e.g., 10s
    # slide: <time_value>          # Only for sliding windows; specifies the window advance interval
  # --- Watermark Configuration (Optional) ---
  # Defines how to handle late data and advance event time
  watermark:
    grace-period: <time_value>     # e.g., 3s; allows data to arrive late
    # idleness: <time_value>       # Advances the watermark when an input is detected as inactive
  # --- Event Timestamp Assignment (Mandatory for event-time windowing) ---
  # Assigns a logical event timestamp to each incoming record
  assign-timestamp:
    run: # ... Rust function (TopicData, system_event_time) -> Result<i64> ...
  # --- Data Transformation (Optional, applied before partitioning) ---
  # Operations like map, flat-map, filter on records entering the window
  transforms:
    - operator: <operator_type>    # e.g., map, flat-map
      run: # ... Rust function (InputType) -> Result<OutputType> ...
  # --- Partitioning and State Update (Mandatory for keyed window processing) ---
  # Groups records by key and updates state for each record
  partition:
    assign-key:
      run: # ... Rust function (RecordType) -> Result<KeyType> ...
    update-state:
      run: # ... Rust function (RecordForPartitionType) -> Result<()> ...
      # This function typically interacts with 'states' defined in the service
  # --- Window Output Generation (Mandatory) ---
  # Defines what happens when a window closes (e.g., aggregation and emission of results)
  flush:
    run: # ... Rust function () -> Result<OutputMessageType> ...
    # This function typically queries the aggregated state for the window
Now, let's explore these components in more detail.
Tumbling Window
Tumbling windows are defined by a single parameter: duration. This specifies the fixed length of each window. Windows are contiguous and non-overlapping.
Example:
window:
  tumbling:        # Defines the window type
    duration: 10s  # Specifies the window length
  ...
In this configuration, window.tumbling.duration: 10s sets up tumbling windows that collect records for 10 seconds based on their assigned event timestamps.
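To make the mapping concrete, the following sketch (not SDF's internal code; the function name and standalone form are ours for illustration) computes the bounds of the tumbling window that contains a given event timestamp.

// Illustrative sketch only: the [start, end) bounds of the tumbling window
// that contains a record, given its event timestamp in milliseconds.
fn tumbling_window_bounds(event_ts_ms: i64, duration_ms: i64) -> (i64, i64) {
    let start = event_ts_ms - event_ts_ms.rem_euclid(duration_ms);
    (start, start + duration_ms)
}

fn main() {
    // With a 10s duration, a record at 00:00:12 (12_000 ms) lands in the
    // window covering 00:00:10 to 00:00:20.
    assert_eq!(tumbling_window_bounds(12_000, 10_000), (10_000, 20_000));
}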
Sliding Window
Sliding windows are defined by two parameters:
- duration: The length of the window (how much data it covers).
- slide: The interval at which new windows are created. The slide must be less than or equal to the duration. If slide < duration, windows overlap, meaning a single record can belong to multiple windows.
Example:
window:
  sliding:         # Defines the window type
    duration: 60s  # Specifies the window length
    slide: 10s     # Specifies the interval for new windows
  ...
Here, window.sliding.duration: 60s and window.sliding.slide: 10s mean that every 10 seconds, a new window is initiated, covering the last 60 seconds of data.
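To illustrate the overlap (again, an illustrative sketch rather than SDF's internal code), the function below lists every sliding window, for a given duration and slide, that contains a particular event timestamp.

// Illustrative sketch only: the [start, end) bounds of every sliding window
// (with the given duration and slide, in milliseconds) containing the timestamp.
fn sliding_windows_containing(event_ts_ms: i64, duration_ms: i64, slide_ms: i64) -> Vec<(i64, i64)> {
    let mut windows = Vec::new();
    // Start of the most recent window opened at or before the event time.
    let mut start = event_ts_ms - event_ts_ms.rem_euclid(slide_ms);
    // Walk backwards while the window still covers the event timestamp.
    while start + duration_ms > event_ts_ms {
        windows.push((start, start + duration_ms));
        start -= slide_ms;
    }
    windows
}

fn main() {
    // With duration 60s and slide 10s, a record at 00:01:12 (72_000 ms) belongs
    // to six windows, starting at 00:00:20, 00:00:30, ..., 00:01:10.
    let windows = sliding_windows_containing(72_000, 60_000, 10_000);
    assert_eq!(windows.len(), 6);
    assert_eq!(windows[0], (70_000, 130_000));
}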
assign-timestamp Operator
The first crucial step in event-time window processing is to determine the event time of each incoming record. This is handled by the assign-timestamp operator. The event timestamp is vital because it dictates which window(s) a particular record belongs to.
You must define a custom function (e.g., in Rust) that takes as input a record (whose type must match the output of the preceding steps in the service) and its system event time (the time it was received by SDF), and returns the logical event timestamp (as an i64 Unix timestamp in milliseconds) to be used for windowing.
Example:
# Inside the 'window' block
assign-timestamp:
  run: |
    fn assign_timestamp_fn(value: InputMessage, _event_time: i64) -> Result<i64> {
      // Logic to extract or assign a timestamp from the 'value'
      // For example, if 'value' has a 'timestamp' field:
      Ok(value.timestamp)
    }
In this example, InputMessage is a defined type that includes a timestamp field. The function extracts this field to be used as the event time. If the actual event time from the source system is not present in the message, you might use _event_time (SDF's ingestion time) or implement logic to derive it.
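If the record may not carry a usable timestamp, a common pattern is to fall back to the ingestion time. The sketch below assumes a hypothetical SensorReading input type whose timestamp field is 0 when the source did not set it; adapt the type and field names to your own service.

# Inside the 'window' block (hypothetical 'SensorReading' input type)
assign-timestamp:
  run: |
    fn assign_event_time(value: SensorReading, event_time: i64) -> Result<i64> {
      // Prefer the timestamp carried by the record; otherwise fall back to
      // SDF's ingestion time so the record can still be placed in a window.
      if value.timestamp > 0 {
        Ok(value.timestamp)
      } else {
        Ok(event_time)
      }
    }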
Watermark
Watermarks are a fundamental concept in stream processing, representing the point in event time up to which SDF assumes all data has been received for a given partition. They are crucial for triggering window computations (i.e., deciding when a window is "closed") and handling late-arriving data. SDF advances watermarks based on the timestamps assigned by assign-timestamp.
The watermark configuration is nested within the window block.
- grace-period: The grace-period (also known as allowed lateness) defines an additional amount of time after a window's end time (as determined by the watermark) during which late-arriving records are still accepted and processed for that window. Records whose timestamps fall within the window's original range but that arrive after the watermark has passed the window's end (and before the grace period expires) are still included.
- idleness: The idleness configuration helps in scenarios where no new events arrive for an extended period. If an input is idle for the specified duration (i.e., no new messages arrive), SDF can advance the watermark anyway and therefore close the window, preventing the overall system watermark from stalling. It is generally recommended that the idleness timeout be larger than the window duration to avoid premature window closure.
Example:
# ... (sources, states) ...
window:
  tumbling:
    duration: 10s
  watermark:
    idleness: 12s      # Consider a partition idle after 12s of no data
    grace-period: 3s   # Allow data to be 3s late for this window
In this setup, with a 10-second tumbling window (e.g., covering event times 00:00:00 to 00:00:10), records with timestamps in the [00:00:00, 00:00:10) interval can still be included even after the watermark has advanced past 00:00:10, as long as they arrive before the watermark passes 00:00:13 (window end + grace period). Additionally, if SDF receives no data for 12 seconds, the watermark is advanced anyway, which closes the current window and creates a new one.
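In other words, a window instance keeps accepting late records until the watermark passes its end plus the grace period. The check below only illustrates that rule; it is not SDF's implementation.

// Illustrative sketch only: a window covering [window_start, window_end) stops
// accepting records once the watermark passes window_end + grace_period.
fn window_is_closed(watermark_ms: i64, window_end_ms: i64, grace_period_ms: i64) -> bool {
    watermark_ms >= window_end_ms + grace_period_ms
}

fn main() {
    // A window ending at 00:00:10 with a 3s grace period closes at 00:00:13.
    assert!(!window_is_closed(12_000, 10_000, 3_000));
    assert!(window_is_closed(13_000, 10_000, 3_000));
}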
Inner Window operators
Within the window block, several operators define how data is processed once its timestamp is assigned and it's notionally part of one or more window instances:
transforms (within window context):
As shown in the overview, transforms can be defined as a direct child of the window block. These transformations (e.g., map, flat-map, filter, filter-map) are applied to each record after its timestamp has been assigned by assign-timestamp but before it is passed to the partition operator. This allows for data shaping, enrichment, or filtering specific to the windowing logic before key-based operations commence.
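For example, a word-count service might split each incoming sentence into individual words inside the window before partitioning. The sketch below assumes the windowed input is a plain String holding one sentence per record; the operator name matches the SDF transform operators listed above, while the function body is only an illustration.

# Inside the 'window' block (hypothetical word-count shaping step)
transforms:
  - operator: flat-map
    run: |
      fn split_sentence(sentence: String) -> Result<Vec<String>> {
        // Emit one lowercase record per word so partitioning can key by word.
        Ok(sentence
            .split_whitespace()
            .map(|word| word.to_lowercase())
            .collect())
      }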
partition
Windowed operations are typically stateful and performed on a per-key basis. The partition operator defines how incoming records (which have already been timestamped and possibly transformed) are grouped by a key, and how the state associated with that key is updated for each record. A configuration sketch follows the list below.
- assign-key: This sub-operator contains a function that takes a record and returns a key. All records sharing the same key will be processed together within their respective window instances, allowing for stateful computations per key.
- transforms: An optional group of operations to run before the update-state step; these can be any of the SDF transform operators (filter, map, filter-map, flat-map).
- update-state: This sub-operator contains a function that is executed for each record within its assigned partition (i.e., for its key). It typically interacts with a keyed state (defined in the states section of the service) to accumulate information or perform calculations.
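Putting these pieces together for the word-count scenario used by the flush example below, a partition block might look like the following sketch. The assign-key function keys each record by the word itself; the update-state body assumes a keyed count_per_word state with a generated accessor of the same name, whose exact API may differ in your SDF version, so treat this as a sketch rather than a verbatim recipe.

# Inside the 'window' block (word-count sketch; the state accessor API is assumed)
partition:
  assign-key:
    run: |
      fn key_by_word(word: String) -> Result<String> {
        // Each distinct word becomes its own key (and its own state row).
        Ok(word)
      }
  update-state:
    run: |
      fn increment_word_count(_word: String) -> Result<()> {
        // Assumed API: 'count_per_word()' returns the state row for the current key.
        let mut state = count_per_word();
        state.count += 1;
        state.update()?;
        Ok(())
      }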
flush Operator
The flush operator defines the action to be taken when a window instance closes (triggered by the watermark advancing past the window's end plus any grace period). This is typically where final aggregations are computed from the accumulated state and the results for that window instance are emitted. The flush function has access to the state that was built up by the update-state operations during the window that is closing.
Example:
# Inside the 'window' block
flush:
  run: |
    fn aggregate_wordcount() -> Result<TopWords> {
      // 'count_per_word' is the keyed state holding the word counts accumulated for the closing window.
      // Perform a SQL query on the state accumulated for that window.
      // Note: the query operates on the state associated with the window for which the flush was triggered.
      let top10 = sql("select * from count_per_word order by count desc limit 10")?;
      let rows = top10.rows()?;
      let mut top_words = vec![];
      // 'key()' refers to the key column of the 'count_per_word' state table.
      let key_col_name = top10.key()?;
      let count_col_name = top10.col("count")?; // 'col("count")' refers to the 'count' column.
      // Iterate over the query results to build the TopWords list
      while rows.next() {
        let word = rows.str(&key_col_name)?;
        let count = rows.u32(&count_col_name)?;
        let word_count = WordCount { word, count }; // 'WordCount' is a defined output type
        top_words.push(word_count);
      }
      // 'TopWords' is the defined output type for the sink
      Ok(top_words)
    }
When a window instance closes, the aggregate_wordcount function is executed. In this case, it queries the count_per_word state (which contains the per-word counts accumulated during the window via update-state), selects the top 10 entries, and prepares them for output. The result of the flush function (e.g., TopWords) is then sent to the configured sinks.
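For completeness, the WordCount and TopWords types referenced above would be declared in the service's types section. The exact schema syntax can vary between SDF versions, so treat the following as an assumed sketch rather than the definitive form.

# Assumed sketch of the output types referenced by the flush example
types:
  word-count:
    type: object
    properties:
      word:
        type: string
      count:
        type: u32
  top-words:
    type: list
    items:
      type: word-count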
These components—window type definition, timestamp assignment, optional watermarking rules, pre-partitioning transformations, data partitioning, state updates per key, and the flushing logic—provide a comprehensive and flexible framework for stateful stream processing over bounded windows of data in SDF.
As a running example, take a look at the How-To example of tumbling windows.