State Example Arrow Row
This tutorial is a continuation from the previous state example. This tutorial shows how to use arrow-rows to store more complicated data types not available in primitive key-values. Read more about arrow rows. We will continue from the tutorial from the merge example to have a saved balance.
Prerequisites
This guide uses local
Fluvio cluster. If you need to install it, please follow the instructions at here.
Dataflow
Overview
In this example, we will first show how to create a state, update the state as data enters from the source, and how to interface with the state. The state and update will be defined in the mergeservice
and the interfacing will be defined in the interface
service.
Mergeservice
1. Define the state
For this state, we will simply only track the balance as a float.
states:
tracker:
type: keyed-state
properties:
key:
type: string
value:
type: arrow-row
properties:
balance:
type: f32
Here, the key
is a string but the value is stored as an arrow-row
which can contain multiple properties
(acts like columns).
2. Assign key
Like our previous example, we will use a trivial key to store the balance.
partition:
assign-key:
run: |
fn map_cash(order: f32) -> Result<String> {
Ok("cash".to_string())
}
update-state:
(...)
3. Updating State
To update the state in an arrow-row
, we need to update the individual row's columns manual and call an update()
.
partition:
assign-key:
(...)
update-state:
run: |
fn add_count(order: f32) -> Result<()> {
let mut tracker = tracker();
tracker.balance += order;
tracker.update()?;
Ok(())
States are terminal so no other action will be run.
Interface
The second service serves as a way to read from the state.
interface:
sources:
- type: topic
id: command
states:
tracker:
from: mergeservice.tracker
sinks:
- type: topic
id: message
transforms:
- operator: map
run: |
fn new_input(_input: String) -> Result<String> {
let track = tracker();
let trackrow = track.sql(&format!("select * from `tracker`"))?;
let rows = trackrow.rows()?;
if !rows.next() {
return Ok("empty".to_string())
}
let balancecol = trackrow.col("balance")?;
let balance = rows.f32(&balancecol)?;
Ok(format!("{:#?}",balance))
}
The service first has to refer to the state created by the mergeservice
. Inside the sink is the transform that will iterface with the state. For simplicity, whatever is sent to the source command
will result in the service message
outputting how much the balance is. For the transform function:
1. We use a sql statement to read from track
, a LazyDf
.
let trackrow = track.sql(&format!("select * from `tracker`"))?;
2. Afterwards, we can select the column balance
.
let rows = trackrow.rows()?;
if !rows.next() {
return Ok("empty".to_string())
}
let balancecol = trackrow.col("balance")?;
let balance = rows.f32(&balancecol)?;
Running the Example
Full Code
Copy and paste following config and save it as dataflow.yaml
.
# dataflow.yaml
apiVersion: 0.5.0
meta:
name: arrow-example
version: 0.1.0
namespace: examples
config:
converter: json
types:
order:
type: object
properties:
name:
type: string
amount:
type: u32
price:
type: f32
topics:
buy:
schema:
value:
type: order
sell:
schema:
value:
type: order
command:
schema:
value:
type: string
converter: raw
message:
schema:
value:
type: string
services:
interface:
sources:
- type: topic
id: command
states:
tracker:
from: mergeservice.tracker
sinks:
- type: topic
id: message
transforms:
- operator: map
run: |
fn new_input(_input: String) -> Result<String> {
let track = tracker();
let trackrow = track.sql(&format!("select * from `tracker`"))?;
let rows = trackrow.rows()?;
if !rows.next() {
return Ok("empty".to_string())
}
let balancecol = trackrow.col("balance")?;
let balance = rows.f32(&balancecol)?;
Ok(format!("{:#?}",balance))
}
mergeservice:
sources:
- type: topic
id: buy
transforms:
- operator: map
run: |
fn buy_order(order: Order) -> Result<f32> {
Ok(order.amount as f32 * order.price * -1.0)
}
- type: topic
id: sell
transforms:
- operator: map
run: |
fn sell_order(order: Order) -> Result<f32> {
Ok(order.amount as f32 * order.price)
}
states:
tracker:
type: keyed-state
properties:
key:
type: string
value:
type: arrow-row
properties:
balance:
type: f32
partition:
assign-key:
run: |
fn map_cash(order: f32) -> Result<String> {
Ok("cash".to_string())
}
update-state:
run: |
fn add_count(order: f32) -> Result<()> {
let mut tracker = tracker();
tracker.balance += order;
tracker.update()?;
Ok(())
}
Running SDF
To run example:
$ sdf run
Produce data
We will produce some data for the first service through the buy
and sell
topics.
$ echo '{"name":"AMZN","amount":20,"price":173.33}' | fluvio produce buy
$ echo '{"name":"TSLA","amount":20,"price":219.41}' | fluvio produce sell
Make sure the state exists by entering show state
in sdf. It should have the following states:
>> show state
Namespace Keys Type
(...)
mergeservice/tracker/state 1 u32
(...)
And when running a show state
on the that state
>> show state mergeservice/tracker/state
Key Window balance
cash * 921.6001
Consume data
Then lets send any string to command
and consume the output found in message
$ echo 'Do stuff' | fluvio produce command
$ fluvio consume message -Bd
921.6001
Cleanup
Exit sdf
terminal and clean-up. The --force
flag removes the topics:
$ sdf clean --force
Conclusion
We just implement example using arrow states. The following link contains another example with arrow-states.