
SQL Interface

Dataframe states

States are a key concept in SDF. A state either holds a single value or is divided into multiple partitions based on a key. The value of each partition can be one of several types.

One of these types is arrow-row. For example, the following snippet defines a state count-per-word that stores each row in an Arrow dataframe.

    states:
      count-per-word:
        type: keyed-state
        properties:
          key:
            type: string
          value:
            type: arrow-row
            properties:
              count: 
                type: u32

An arrow-row state can be seen as an SQL table. In the previous example, we define a state count_per_word to track the frequency of words. Each key has one row, and that row has a column count that stores how many times the word has been seen. For example, suppose we have the following words: apple, orange, banana, orange, grape, orange, banana.

These words are mapped to an Arrow dataframe as follows:

_key      count
apple     1
orange    3
banana    2
grape     1
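
To make the mapping from words to rows concrete, here is a minimal sketch in plain Rust that computes the same per-key counts with a standard HashMap. It does not use any SDF API; the function name count_words is hypothetical and only illustrates the idea that each distinct key owns one row with a count column.

```rust
use std::collections::HashMap;

/// Hypothetical helper (not part of SDF): count how often each word occurs.
/// Each map entry plays the role of one row: key = word, value = `count`.
fn count_words(words: &[&str]) -> HashMap<String, u32> {
    let mut counts = HashMap::new();
    for word in words {
        // Insert the key with 0 on first sight, then increment its count.
        *counts.entry(word.to_string()).or_insert(0) += 1;
    }
    counts
}
```

Calling count_words with the word list above yields one entry per distinct word, which is what the keyed state stores as rows.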

Updating a Dataframe state

To update the state, you can use the update-state operator as below:

update-state:
    run: |
        fn count_word(_word: String) -> Result<()> {
            // Row of count_per_word for the key being processed.
            let mut state = count_per_word();
            state.count += 1;
            // Write the modified row back to the state.
            state.update()?;
            Ok(())
        }

Note that the state value is accessed through the count_per_word state function, which is automatically injected by the SDF builder.

This API is invoked by the update-state operator, which can only update the value of the partition state.

In the example, count_per_word represents one row of the dataframe. If the operator sees apple, the state is the first row of the dataframe above.

SQL function

Aggregate operators such as flush, or external services that reference a state, can run SQL queries on the aggregated data of all partitions of a state. To make this possible, a sql function is available in the context. The sql function runs the SQL statement passed as its parameter against the aggregated view of the states rather than against their individual rows. The snippet below shows how to use SQL to get the three most frequent words.

flush:
    run: |
        fn aggregate_wordcount() -> Result<TopWords> {
            // Query the aggregated view across all partitions of the state.
            let top3 = sql("select * from count_per_word order by count desc limit 3")?;
            let rows = top3.rows()?;

            let mut top_words = vec![];
            // Look up the key and count columns once, before iterating.
            let key = top3.key()?;
            let count = top3.col("count")?;

            // Walk the result rows and collect them into the output.
            while rows.next() {
                let word = rows.str(&key)?;
                let count = rows.u32(&count)?;
                let word_count = WordCount { word, count };
                top_words.push(word_count);
            }

            Ok(top_words)
        }
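
For readers less familiar with SQL, the query in the snippet above can be read as "sort by count, largest first, and keep three rows". The sketch below expresses the same idea in plain Rust over (word, count) pairs. It is only an illustration: top_n is a hypothetical helper, not an SDF function, and it does not involve dataframes.

```rust
/// Hypothetical helper (not part of SDF): plain-Rust reading of
/// `order by count desc limit n` over (word, count) pairs.
fn top_n(mut rows: Vec<(String, u32)>, n: usize) -> Vec<(String, u32)> {
    // Stable sort, descending by count. Ties keep their input order here,
    // whereas SQL leaves the order of tied rows unspecified.
    rows.sort_by(|a, b| b.1.cmp(&a.1));
    // Keep at most the first n rows.
    rows.truncate(n);
    rows
}
```

One design point worth noting: when two words have the same count, the SQL query gives no guarantee about which one makes the cut unless a second sort column is added.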

The output of the sql function also provides the following methods, which are described below: sql, rows, col, key, next.

SQL API

For any state that is a dataframe, you can use the SQL API to perform dataframe operations. SDF uses Polars SQL to perform these operations.
The result of a SQL operation is always a dataframe, so you can run multiple SQL operations in sequence to get the desired result.

The SQL is executed in the context of all available dataframes, so you can perform JOINs or other complex SQL operations across them. Each dataframe is represented as a table, and each table name is the state name with hyphens (-) replaced by underscores (_), as illustrated below.

let top3 = sql("select * from count_per_word order by count desc limit 3")?;
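
The naming rule can be sketched as a one-line string transformation. The helper table_name below is hypothetical and exists only to illustrate the rule; SDF applies the mapping itself, so you normally just write the underscore form directly in your query.

```rust
/// Hypothetical helper (not part of SDF): derive the SQL table name
/// from a state name by replacing hyphens with underscores.
fn table_name(state_name: &str) -> String {
    state_name.replace('-', "_")
}
```

For the state declared as count-per-word, this yields count_per_word, the table name used in the query above.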

Row API

Once you have a dataframe, you can use the row API to access the values of individual rows. To get the rows, call the rows function on the dataframe.

let rows = top3.rows()?;

You can think of rows as an iterator that lets you visit one row at a time. The following functions are available on the rows object. To access row values, you must first get the columns, as shown below:

let count = top3.col("count")?;

To get the key column, you can use the helper function key:

let key = top3.key()?;

Use the next function to go to the next row. It returns true if it moved to a row, and false if there are no more rows.

while rows.next() {
    ...
}

As long as next returns true, you can access the current row's values by passing a column. Each column has a type associated with it, so you can only read values with an accessor that is compatible with the column type; otherwise an error is returned.

For example, to get a string value, use the str function. To get a u32 value, use the u32 function.

let word = rows.str(&key)?;
let count = rows.u32(&count)?;
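
The cursor behavior described in this section (advance with next, then read typed values, with an error on a type mismatch) can be modeled in a few lines of plain Rust. The sketch below is not the SDF implementation: the Rows and Value types are hypothetical stand-ins, columns are addressed by index rather than by column handle, and errors are plain strings. It only aims to show the access pattern.

```rust
/// Stand-in for a typed cell value (hypothetical, not an SDF type).
#[derive(Debug)]
enum Value {
    Str(String),
    U32(u32),
}

/// Hypothetical in-memory cursor that mimics the next-then-read pattern.
struct Rows {
    data: Vec<Vec<Value>>,
    pos: Option<usize>, // None until next() has been called once
}

impl Rows {
    fn new(data: Vec<Vec<Value>>) -> Self {
        Rows { data, pos: None }
    }

    /// Advance to the next row; returns false when no rows remain.
    fn next(&mut self) -> bool {
        let candidate = self.pos.map_or(0, |p| p + 1);
        if candidate < self.data.len() {
            self.pos = Some(candidate);
            true
        } else {
            false
        }
    }

    /// Borrow the cell at `col` in the current row.
    fn cell(&self, col: usize) -> Result<&Value, String> {
        let row = self.pos.ok_or("call next() before reading")?;
        self.data[row]
            .get(col)
            .ok_or_else(|| format!("no column {col}"))
    }

    /// Read a string cell; errors if the column holds another type.
    fn str(&self, col: usize) -> Result<String, String> {
        match self.cell(col)? {
            Value::Str(s) => Ok(s.clone()),
            other => Err(format!("column {col} is not a string: {other:?}")),
        }
    }

    /// Read a u32 cell; errors if the column holds another type.
    fn u32(&self, col: usize) -> Result<u32, String> {
        match self.cell(col)? {
            Value::U32(n) => Ok(*n),
            other => Err(format!("column {col} is not a u32: {other:?}")),
        }
    }
}
```

In this model, reading before the first call to next, or reading a column with the wrong accessor, returns an error instead of a value. This mirrors the type check described above.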