How to Analyze Unbounded Time-Series Data using Bounded State

Leonid Ryzhyk, CTO / Co-Founder | November 1, 2024

Financial transactions, IoT events, and cloud observability data are all examples of time series: continuous, real-time streams of events. In many cases, this data must be analyzed in real time, as it arrives, with results produced within milliseconds.

Here’s an appealing idea: why don't we write these real-time analyses in SQL? SQL is the world’s most widely used query language. Modern SQL databases are routinely used for analyzing time-series data, so the language itself is more than capable. The missing piece? A query engine that transforms SQL queries into real-time streaming computations.

Many have tried and failed

The idea isn’t new. For nearly a decade, people have been building streaming SQL engines for time-series data, with Flink SQL being the most notable example. These efforts have consistently led to systems that only experts with deep knowledge of the engine’s internals can use effectively. Even when these systems offer a SQL-like interface, they miss the core appeal of SQL—its simplicity and ease of use.

Why is this such a challenging problem? Several factors contribute, but today we
focus on one: time-series data is unbounded. It continuously grows as new
events are generated in real-time, often at a high rate. Modeling this data as
SQL tables means those tables also become unbounded, potentially requiring
unbounded resources (memory and storage) to process.

Here’s the good news: many types of SQL queries can be efficiently evaluated over unbounded streams using bounded memory. Achieving this automatically, while preserving standard SQL semantics and maintaining SQL’s ease of use, is
challenging—but not impossible. At last, we have a system that does just that.

Original xkcd comic: https://xkcd.com/1433

How is Feldera's approach different?

How do we succeed where others have failed? Unlike our predecessors, we know exactly what we're doing (in a strict mathematical sense, that is). Feldera is powered by DBSP, a formal framework for computing on changing data. DBSP provides exact semantics for incremental computation and a provably correct algorithm that converts any SQL query into a program that computes efficiently over streams of changes. It precisely defines the state that the program must maintain and specifies how operators access this state. As a result, we can automatically identify and discard parts of the state that are no longer needed. We implemented this analysis in the Feldera SQL compiler, enabling it to handle a large class of SQL queries over unbounded time-series data using only bounded memory, while preserving full SQL syntax and semantics.
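To make "computing over streams of changes" concrete, here is a minimal Python sketch (our own illustration, not Feldera's implementation or API). It follows the DBSP convention of representing a change as a Z-set, a map from rows to integer weights where +1 marks an insertion and -1 a deletion, and checks that a linear operator such as a filter can be kept up to date by applying it to the change alone. The `zset_add` and `filter_big` helpers and the `(customer_id, amount)` row shape are hypothetical.

```python
from collections import Counter

# A Z-set maps each row to an integer weight: +1 = inserted, -1 = deleted.
# Rows here are hypothetical (customer_id, amount) tuples.

def zset_add(a, b):
    """Hypothetical helper: sum two Z-sets, dropping rows whose weights cancel out."""
    out = Counter(a)
    for row, weight in b.items():
        out[row] += weight
    return {row: w for row, w in out.items() if w != 0}

def filter_big(zset, threshold=100):
    """Hypothetical view: SELECT * FROM purchase WHERE amount > threshold.
    Filtering is linear: filter_big(a + b) == filter_big(a) + filter_big(b)."""
    return {row: w for row, w in zset.items() if row[1] > threshold}

table = {(1, 50): 1, (2, 150): 1, (3, 300): 1}   # current contents of the table
delta = {(4, 120): 1, (2, 150): -1}              # one insert, one delete
view = filter_big(table)                          # current contents of the view

# Incremental update: process only the change, not the whole table.
new_view = zset_add(view, filter_big(delta))
# Recomputing from scratch gives the same answer.
recomputed = filter_big(zset_add(table, delta))
```

Non-linear operators such as MAX or joins need more than the change itself, which is exactly where the question of how much state to keep arises.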

Examples

Before we describe the solution, let's examine a few examples that will help us develop intuition around the types of queries that can be evaluated using bounded state.

Example 1: Window-based aggregation

Consider a table of purchases and a view that calculates the maximum purchase
amount for each day.

CREATE TABLE purchase (
  customer_id INT,
  ts TIMESTAMP NOT NULL,
  amount BIGINT
);

CREATE VIEW daily_max AS
SELECT
  TIMESTAMP_TRUNC(ts, DAY) as d,
  MAX(amount) AS max_amount
FROM
  purchase
GROUP BY
  TIMESTAMP_TRUNC(ts, DAY);

Generally, maintaining an aggregate—like MAX—over a dynamically changing table would require storing the entire contents of the table indefinitely. In this example, we may be able to do better. The query organizes data into one-day windows based on the value of the ts column, using the GROUP BY clause, and computes a MAX within each window. If new purchase records are only added for recent transactions (e.g., within the last hour), then new inputs affect only the aggregates for the current or previous date. Aggregate values for older dates remain unchanged, allowing us to safely discard older data used to compute them.

Note that, to safely discard old data, we rely on a property of the table: changes to it arrive approximately in timestamp order, and can be delayed or reordered by at most one hour.

This optimization applies to other aggregate queries over bounded time
windows, including tumbling, hopping and rolling aggregates.
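The reasoning in Example 1 can be sketched in a few lines of Python. This is a hypothetical model, not Feldera's code: `DailyMax` keeps a multiset of amounts per day (so that deletions can be handled), tracks the largest timestamp seen, and, assuming every update satisfies `ts >= max_ts - lateness`, discards the input state for days that can no longer change while keeping their rows in the output.

```python
from collections import Counter, defaultdict
from datetime import datetime, timedelta

def trunc_day(ts: datetime) -> datetime:
    """Equivalent of TIMESTAMP_TRUNC(ts, DAY)."""
    return ts.replace(hour=0, minute=0, second=0, microsecond=0)

class DailyMax:
    """Hypothetical model of daily_max with lateness-based state discarding.

    Assumes every update satisfies ts >= (largest ts seen so far) - lateness,
    and that deletions only remove rows that were previously inserted.
    """

    def __init__(self, lateness: timedelta):
        self.lateness = lateness
        self.max_ts = None                 # largest timestamp seen so far
        self.state = defaultdict(Counter)  # day -> multiset of amounts (needed for deletions)
        self.output = {}                   # day -> MAX(amount), i.e. the view contents

    def update(self, ts: datetime, amount: int, weight: int = 1) -> None:
        """Apply an insertion (weight=+1) or a deletion (weight=-1)."""
        day = trunc_day(ts)
        bag = self.state[day]
        bag[amount] += weight
        if bag[amount] == 0:
            del bag[amount]
        if bag:
            self.output[day] = max(bag)
        else:                              # all rows for this day were deleted
            self.output.pop(day, None)
            del self.state[day]
        self.max_ts = ts if self.max_ts is None else max(self.max_ts, ts)
        self._discard_old_state()

    def _discard_old_state(self) -> None:
        # Future updates have ts >= max_ts - lateness, so they can only touch days
        # >= trunc_day(max_ts - lateness). Older days are final: drop their inputs
        # but keep their rows in self.output.
        bound = trunc_day(self.max_ts - self.lateness)
        for day in [d for d in self.state if d < bound]:
            del self.state[day]

view = DailyMax(lateness=timedelta(hours=1))
view.update(datetime(2024, 11, 1, 23, 30), 50)
view.update(datetime(2024, 11, 1, 22, 45), 70)  # 45 minutes out of order: allowed
view.update(datetime(2024, 11, 2, 0, 30), 10)   # Nov 1 may still change; its state is kept
view.update(datetime(2024, 11, 2, 1, 15), 20)   # Nov 1 is now final; its state is discarded
```

The per-day multiset is what lets this model handle deletions within the lateness window; with lateness, only the last one or two days of it are ever held.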

Example 2: Global aggregation

What if we wanted to compute a global aggregate over the entire history of the
time series?

CREATE VIEW global_aggregates AS
SELECT
  SUM(amount) AS total_amount,
  MAX(amount) AS max_amount
FROM
  purchase;


We might still be in luck! SUM is an example of a linear aggregate: an aggregate function that can be updated incrementally by storing only its current value. As new inputs arrive, we simply adjust the aggregate by adding new values and subtracting removed ones, without storing the entire input table. COUNT is another linear aggregate, and AVG and STDDEV can be maintained the same way, since they are computed from a small number of linear aggregates (counts, sums, and sums of squares).
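A minimal Python sketch of this idea (a hypothetical class, not a Feldera API): SUM and COUNT are running totals that rows with weight +1 (inserts) or -1 (deletes) adjust in place, and AVG and the population standard deviation are derived from those totals plus a running sum of squares. The sum-of-squares formula is the simplest to show, but it is numerically less stable than what a production engine would likely use.

```python
import math

class LinearAggregates:
    """Hypothetical O(1)-state aggregates over a table with inserts and deletes."""

    def __init__(self):
        self.count = 0      # COUNT(amount)
        self.total = 0      # SUM(amount)
        self.total_sq = 0   # SUM(amount * amount), used only for the standard deviation

    def update(self, amount: int, weight: int = 1) -> None:
        """weight=+1 for an inserted row, -1 for a deleted row."""
        self.count += weight
        self.total += weight * amount
        self.total_sq += weight * amount * amount

    def avg(self):
        return self.total / self.count if self.count else None

    def stddev_pop(self):
        """Population standard deviation (STDDEV_POP); None for an empty table."""
        if not self.count:
            return None
        mean = self.total / self.count
        return math.sqrt(max(self.total_sq / self.count - mean * mean, 0.0))

agg = LinearAggregates()
for amount in (10, 20, 30):
    agg.update(amount)
agg.update(20, weight=-1)   # delete the row with amount 20; no input rows are stored
```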

In contrast, MAX is not linear and typically requires tracking the entire input set. This ensures that if the current maximum is removed, the next largest value can be found. However, time-series tables are often append-only, meaning events cannot be modified or deleted once they are recorded. For an append-only table, we can compute MAX incrementally by only storing the current maximum value.

As before, this optimization relies on a property of the input time series—in
this case, that it is append-only.
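For an append-only input, the state for MAX collapses to a single value. A hypothetical Python sketch, where a deletion is rejected because the class no longer has the data needed to find the next-largest value:

```python
class AppendOnlyMax:
    """Hypothetical MAX(amount) over an append-only table: one value of state."""

    def __init__(self):
        self.current = None   # MAX of all rows so far; None (SQL NULL) for an empty table

    def insert(self, amount: int) -> None:
        if self.current is None or amount > self.current:
            self.current = amount

    def delete(self, amount: int) -> None:
        # If `amount` were the current maximum, finding the next-largest value
        # would require the full input, which this class does not keep.
        raise ValueError("MAX over an append-only table does not support deletions")

m = AppendOnlyMax()
for amount in (5, 3, 9, 7):
    m.insert(amount)
```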


------

There are other SQL constructs and patterns that can be evaluated using bounded state, including time-based joins, as-of joins, SQL window functions, and temporal filters. We have started a catalog of such patterns.

The solution

At a high level, Feldera lets users compute efficiently on time-series data by adding LATENESS annotations to tables and views. These annotations describe the data source and tell Feldera the maximum out-of-orderness in the data. For example, a user can give Feldera a hint of the form "I know that updates to the purchase table will never be delayed by more than one hour/day/week/...":

CREATE TABLE purchase (
  customer_id INT,
  ts TIMESTAMP NOT NULL LATENESS INTERVAL 1 HOURS,
  amount BIGINT
);

Feldera automatically interprets these cues and propagates them across all views in the program to determine when it’s safe to discard state without affecting any outputs. For example, Feldera deduces that, once it receives a purchase record with a timestamp of 1:00 a.m. or later on a given date, no further updates for earlier dates will arrive (accounting for the 1-hour lateness). As a result, the MAX value for those earlier dates won’t need recalculating, and purchase records from before the current day can be safely discarded.
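The deduction above amounts to pushing the input lateness bound through the view's grouping expression. A hypothetical Python sketch, assuming the only thing tracked is the largest `ts` received so far: because `TIMESTAMP_TRUNC(ts, DAY)` is monotone, a lower bound on the `ts` of future purchases yields a lower bound on the group key `d` of future changes to `daily_max`.

```python
from datetime import datetime, timedelta

def trunc_day(ts: datetime) -> datetime:
    """Equivalent of TIMESTAMP_TRUNC(ts, DAY); monotone in ts."""
    return ts.replace(hour=0, minute=0, second=0, microsecond=0)

def key_lower_bound(max_ts_seen: datetime, lateness: timedelta) -> datetime:
    """Hypothetical helper: lower bound on the key d of any future change to daily_max.

    Future purchases satisfy ts >= max_ts_seen - lateness. Because trunc_day is
    monotone, their group key satisfies d >= trunc_day(max_ts_seen - lateness).
    Groups with d below this bound are final.
    """
    return trunc_day(max_ts_seen - lateness)

one_hour = timedelta(hours=1)
print(key_lower_bound(datetime(2024, 11, 2, 0, 59), one_hour))  # 2024-11-01 00:00:00 (Nov 1 still open)
print(key_lower_bound(datetime(2024, 11, 2, 1, 0), one_hour))   # 2024-11-02 00:00:00 (Nov 1 and earlier final)
```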

Feldera continuously runs background jobs that garbage collect old records that
fall below computed retention bounds.

Importantly, this garbage collection mechanism is purely an optimization: it allows running your SQL queries more efficiently without altering their output. This stands in contrast to many existing solutions, where users must manually define state retention policies, often at the risk of altering query results. If you have had such a traumatic experience with other streaming query engines, a few clarifications are in order:

  • Lateness does NOT affect the output of the program. Assuming lateness
    annotations are accurate, i.e., input records do not arrive more than the
    declared lateness out of order, the output of the program with lateness
    annotations is guaranteed to be identical to the output of the same program
    without annotations.
  • Lateness does NOT delay computation. Feldera does not wait until all out-of-order records have been received. It always computes the output of the queries given all inputs received so far and incrementally updates these outputs as new out-of-order inputs arrive. (We are trialing another experimental feature that allows delaying computation on demand.)
  • Lateness is NOT time-to-live. Lateness should not be confused with time-to-live annotations used by some stream processing systems. Declaring a column with LATENESS 1 hour does not mean that the data will be discarded after an hour. The compiler decides what data to store and for how long by analyzing the entire SQL program. LATENESS annotations simply inform this analysis.
  • Discarding old records is NOT equivalent to deleting them. These are fundamentally different operations. Deleting a record means that any outputs derived from it must be updated. For example, deleting a year-old purchase requires recomputing the daily_max query for the corresponding date. In contrast, a discarded record is conceptually still part of the relation; however, it will not participate in computing any future incremental updates and therefore does not need to be stored.
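The first and last points can be checked on a toy model. The Python sketch below (hypothetical, not Feldera's implementation) replays the same stream of inserts and deletes through a daily-MAX computation twice: once keeping all input state, and once discarding per-day state according to a one-hour lateness bound. When the stream respects the bound, the two views are identical while the garbage-collected run holds far less state, and the rows of discarded days remain in the view, unlike deleted rows.

```python
from collections import Counter, defaultdict
from datetime import datetime, timedelta

def trunc_day(ts):
    """Equivalent of TIMESTAMP_TRUNC(ts, DAY)."""
    return ts.replace(hour=0, minute=0, second=0, microsecond=0)

def run_daily_max(events, lateness=None):
    """Hypothetical replay of (ts, amount, weight) updates through a daily-MAX view.

    Returns (view, peak), where peak is the largest number of days held in input
    state at once. With a lateness, state for days below the bound is discarded;
    this is only correct if every update has ts >= max_ts_seen - lateness.
    """
    state = defaultdict(Counter)   # day -> multiset of amounts
    view = {}                      # day -> MAX(amount)
    max_ts, peak = None, 0
    for ts, amount, weight in events:
        day = trunc_day(ts)
        bag = state[day]
        bag[amount] += weight
        if bag[amount] == 0:
            del bag[amount]
        if bag:
            view[day] = max(bag)
        else:                      # every row for this day was deleted
            view.pop(day, None)
            del state[day]
        max_ts = ts if max_ts is None else max(max_ts, ts)
        if lateness is not None:
            bound = trunc_day(max_ts - lateness)
            for d in [d for d in state if d < bound]:
                del state[d]       # discarded, not deleted: view[d] stays
        peak = max(peak, len(state))
    return view, peak

# Five days of purchases, including a row that arrives 20 minutes out of order
# and a retraction, all within the one-hour bound.
base = datetime(2024, 11, 1)
events = []
for i in range(5):
    day = base + timedelta(days=i)
    events += [
        (day + timedelta(hours=10), 10 * i + 1, 1),
        (day + timedelta(hours=23, minutes=50), 10 * i + 5, 1),
        (day + timedelta(hours=23, minutes=30), 10 * i + 3, 1),
        (day + timedelta(hours=23, minutes=50), 10 * i + 5, -1),
    ]

full, peak_full = run_daily_max(events)
gc, peak_gc = run_daily_max(events, lateness=timedelta(hours=1))
```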

Other time-series features

We list a few other Feldera features that facilitate time series analysis. See our Time Series Analysis Guide for a detailed description.

  • The append_only annotation on a table instructs Feldera that the table will only receive INSERT updates, enabling additional optimizations, such as the one described in Example 2 above:
CREATE TABLE purchase (
  customer_id INT,
  ts TIMESTAMP NOT NULL LATENESS INTERVAL 1 HOURS,
  amount BIGINT
) WITH (
  'append_only' = 'true'
);
  • The emit_final attribute instructs Feldera to only output final rows of a view, i.e., rows that are guaranteed to never get deleted or updated:
CREATE VIEW daily_total_final
WITH ('emit_final' = 'd')
AS
SELECT
  TIMESTAMP_TRUNC(ts, DAY) as d,
  SUM(amount) AS total
FROM
  purchase
GROUP BY
  TIMESTAMP_TRUNC(ts, DAY);
  • The WATERMARK annotation on a column of a table delays the processing of the input rows by a specified amount of time.
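A rough Python model of what `emit_final` means for `daily_total_final` (our own sketch, assuming an append-only `purchase` table with a one-hour lateness on `ts`; it is not how Feldera implements the feature): running totals are kept per open day, and a day's row is released exactly once, when the lateness bound guarantees that no further purchases for that day can arrive.

```python
from datetime import datetime, timedelta

def trunc_day(ts: datetime) -> datetime:
    """Equivalent of TIMESTAMP_TRUNC(ts, DAY)."""
    return ts.replace(hour=0, minute=0, second=0, microsecond=0)

class EmitFinalDailyTotal:
    """Hypothetical model of daily_total_final with 'emit_final' = 'd'.

    Assumes an append-only input whose ts values are at most `lateness`
    behind the largest ts seen so far.
    """

    def __init__(self, lateness: timedelta):
        self.lateness = lateness
        self.max_ts = None
        self.totals = {}    # open day -> running SUM(amount)
        self.emitted = []   # (d, total) rows released to the output, each exactly once

    def insert(self, ts: datetime, amount: int) -> None:
        day = trunc_day(ts)
        self.totals[day] = self.totals.get(day, 0) + amount
        self.max_ts = ts if self.max_ts is None else max(self.max_ts, ts)
        # Days below the bound can receive no more rows, so their totals are final.
        bound = trunc_day(self.max_ts - self.lateness)
        for d in sorted(d for d in self.totals if d < bound):
            self.emitted.append((d, self.totals.pop(d)))

view = EmitFinalDailyTotal(lateness=timedelta(hours=1))
view.insert(datetime(2024, 11, 1, 10, 0), 5)
view.insert(datetime(2024, 11, 1, 23, 30), 7)
view.insert(datetime(2024, 11, 2, 0, 30), 1)   # nothing emitted yet: Nov 1 is still open
view.insert(datetime(2024, 11, 2, 2, 0), 2)    # Nov 1 is final: (Nov 1, 12) is emitted
```

Unlike a regular view, this model never emits a row that it later has to retract, at the cost of holding each day's output back until the day is final.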

What's next?

We've developed the first rigorous garbage collection framework for an incremental query engine and are actively exploring its full potential. Here’s a look at what’s coming:

  • As we continue to identify new GC-able query patterns, Feldera will
    handle an even broader range of time-series queries with bounded state.
  • We are working on new tooling that will help users make the most of GC
    optimizations.
  • We are continuously improving Feldera's storage layer, so that queries that are not amenable to GC or that require large state even with GC can run efficiently on larger than memory data sets.

Stay tuned!
