State Example Arrow Row

This tutorial continues from the previous state example. It shows how to use arrow-rows to store more complex data types than primitive key-values allow. Read more about arrow rows. We build on the merge example so that the balance is saved in a state.

Prerequisites

This guide uses a local Fluvio cluster. If you need to install one, please follow the instructions here.

Dataflow

Overview

In this example, we first show how to create a state, how to update it as data arrives from the source, and how to read from it. The state and its update are defined in the mergeservice service, and the read path is defined in the interface service.

Visual of defined dataflow

Mergeservice

1. Define the state

For this state, we track only the balance, stored as a float (f32).

states:
  tracker: 
    type: keyed-state
    properties:
      key: 
        type: string
      value: 
        type: arrow-row
        properties:
          balance:
            type: f32

Here, the key is a string, but the value is stored as an arrow-row, which can contain multiple properties (they act like columns).
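To make the shape of this state concrete, here is a minimal sketch in plain Rust. It is only a mental model, not the SDF API: `TrackerRow`, `Tracker`, and `row_for` are hypothetical names, and the zero-initialized default row is an assumption of the sketch.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for one arrow-row: each field plays the role of a column.
#[derive(Debug, Clone, Default, PartialEq)]
struct TrackerRow {
    balance: f32,
}

/// Hypothetical stand-in for the keyed state: a string key mapped to one row.
type Tracker = HashMap<String, TrackerRow>;

/// Returns the row stored under `key`, creating a default row on first use.
/// (The default of 0.0 is an assumption made for this sketch.)
fn row_for<'a>(state: &'a mut Tracker, key: &str) -> &'a mut TrackerRow {
    state.entry(key.to_string()).or_default()
}
```

Adding another property to the arrow-row would correspond to adding another field to the struct in this model.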

2. Assign key

Like our previous example, we will use a trivial key to store the balance.

partition:
  assign-key:
    run: |
      fn map_cash(order: f32) -> Result<String> {
        Ok("cash".to_string())
      }
  update-state:
    (...)

3. Updating State

To update the state in an arrow-row, we update the row's columns individually and then call update().

partition:
  assign-key:
    (...)
  update-state:
    run: |
      fn add_count(order: f32) -> Result<()> {
        let mut tracker = tracker();
        tracker.balance += order; 
        tracker.update()?;
        Ok(())
      }

States are terminal, so no other action runs after the state update.
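The two partition steps can be simulated outside SDF to see how they fit together. The sketch below is plain Rust over a `HashMap`, not the SDF runtime; `assign_key` and `update_state` are hypothetical names that mirror `map_cash` and `add_count`, and starting a missing balance at 0.0 is an assumption.

```rust
use std::collections::HashMap;

/// Mirrors `map_cash`: every record is assigned the same trivial key.
fn assign_key(_order: f32) -> String {
    "cash".to_string()
}

/// Mirrors `add_count` on a plain map: look up the row for the assigned key
/// and add the signed order value to its balance.
fn update_state(state: &mut HashMap<String, f32>, order: f32) {
    let key = assign_key(order);
    // Assumption: a balance that does not exist yet starts at 0.0.
    *state.entry(key).or_insert(0.0) += order;
}
```

Because every record maps to the key "cash", the map only ever holds a single entry, which matches the single key reported by show state later in this tutorial.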

Interface

The second service serves as a way to read from the state.

interface:
  sources:
    - type: topic
      id: command
  states:
    tracker:
      from: mergeservice.tracker
  sinks:
    - type: topic
      id: message
      transforms:
        - operator: map 
          run: |
            fn new_input(_input: String) -> Result<String> {
              let track = tracker();
              let trackrow = track.sql(&format!("select * from `tracker`"))?;
              let rows = trackrow.rows()?;
              if !rows.next() {
                return Ok("empty".to_string())
              }
              let balancecol = trackrow.col("balance")?;
              let balance = rows.f32(&balancecol)?;
              Ok(format!("{:#?}",balance))
            }

The service first has to refer to the state created by mergeservice. The transform inside the sink interfaces with the state. For simplicity, anything sent to the command topic causes the service to output the current balance to the message topic. In the transform function:

1. We use a SQL statement to read from track, a LazyDf.
let trackrow = track.sql(&format!("select * from `tracker`"))?;
2. Afterwards, we check that a row exists and read the balance column from it.
let rows = trackrow.rows()?;
if !rows.next() {
  return Ok("empty".to_string())
}
let balancecol = trackrow.col("balance")?;
let balance = rows.f32(&balancecol)?;
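The control flow of this transform (return "empty" when no row exists, otherwise format the balance) can be sketched in plain Rust. This is a hypothetical model: `report_balance` is not part of SDF, and a slice of `f32` values stands in for the query result.

```rust
/// Hypothetical model of the transform. `rows` stands in for the rows
/// returned by the SQL query; each element is one row's balance.
fn report_balance(rows: &[f32]) -> String {
    match rows.first() {
        // No row yet: nothing has been written to the state.
        None => "empty".to_string(),
        // Same formatting as the dataflow: pretty debug output of the f32.
        Some(balance) => format!("{:#?}", balance),
    }
}
```

Only the first row is inspected, which is enough here because the state holds a single key.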

Running the Example

Full Code

Copy and paste the following config and save it as dataflow.yaml.

# dataflow.yaml
apiVersion: 0.5.0
meta:
  name: arrow-example
  version: 0.1.0
  namespace: examples

config:
  converter: json

types:
  order:
    type: object
    properties:
      name:
        type: string
      amount:
        type: u32
      price:
        type: f32  

topics:
  buy:
    schema:
      value:
        type: order
  sell:
    schema:
      value:
        type: order
  command:
    schema:
      value:
        type: string
        converter: raw
  message:
    schema:
      value:
        type: string
  

services:
  interface:
    sources:
      - type: topic
        id: command
    states:
      tracker:
        from: mergeservice.tracker
    sinks:
      - type: topic
        id: message
        transforms:
          - operator: map 
            run: |
              fn new_input(_input: String) -> Result<String> {
                let track = tracker();
                let trackrow = track.sql(&format!("select * from `tracker`"))?;
                let rows = trackrow.rows()?;
                if !rows.next() {
                  return Ok("empty".to_string())
                }
                let balancecol = trackrow.col("balance")?;
                let balance = rows.f32(&balancecol)?;
                Ok(format!("{:#?}",balance))
              }
  mergeservice:
    sources:
      - type: topic
        id: buy
        transforms:
          - operator: map
            run: |
              fn buy_order(order: Order) -> Result<f32> {
                Ok(order.amount as f32 * order.price * -1.0)
              }
      - type: topic
        id: sell 
        transforms:
          - operator: map
            run: |
              fn sell_order(order: Order) -> Result<f32> {
                Ok(order.amount as f32 * order.price)
              }
    states:
      tracker: 
        type: keyed-state
        properties:
          key: 
            type: string
          value: 
            type: arrow-row
            properties:
              balance:
                type: f32
    partition:
      assign-key:
        run: |
          fn map_cash(order: f32) -> Result<String> {
            Ok("cash".to_string())
          }
      update-state:
        run: |
          fn add_count(order: f32) -> Result<()> {
            let mut tracker = tracker();
            tracker.balance += order; 
            tracker.update()?;
            Ok(())
          }

Running SDF

To run the example:

$ sdf run

Produce data

We will produce some data for the first service through the buy and sell topics.

$ echo '{"name":"AMZN","amount":20,"price":173.33}' | fluvio produce buy
$ echo '{"name":"TSLA","amount":20,"price":219.41}' | fluvio produce sell

Make sure the state exists by entering show state in the sdf terminal. The output should include the following state:

>> show state
 Namespace                                Keys  Type   
 (...)
 mergeservice/tracker/state               1     u32    
 (...)

Running show state on that state displays:

>> show state mergeservice/tracker/state
 Key   Window  balance  
 cash  *       921.6001
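The displayed balance can be checked by hand: the buy contributes -(20 × 173.33) = -3466.6 and the sell contributes 20 × 219.41 = 4388.2, for a total of about 921.6. The sketch below repeats that arithmetic in plain Rust using the same expressions as `buy_order` and `sell_order`. The helper names `buy_value` and `sell_value` are hypothetical, and the trailing digits of the displayed value are presumably due to f32 rounding.

```rust
/// Signed value of a buy order, using the same expression as `buy_order`.
fn buy_value(amount: u32, price: f32) -> f32 {
    amount as f32 * price * -1.0
}

/// Signed value of a sell order, using the same expression as `sell_order`.
fn sell_value(amount: u32, price: f32) -> f32 {
    amount as f32 * price
}
```

Summing `buy_value(20, 173.33)` and `sell_value(20, 219.41)` gives a value close to 921.6, in line with the balance column above.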

Consume data

Then let's send any string to command and consume the output from message:

$ echo 'Do stuff' | fluvio produce command
$ fluvio consume message -Bd
921.6001

Cleanup

Exit the sdf terminal and clean up. The --force flag removes the topics:

$ sdf clean --force

Conclusion

We just implemented an example using arrow states. The following link contains another example with arrow states.

  1. Temperature Example