Key-Value - Input

This tutorial works with key-value records. A key-value record pairs a key with its corresponding value. The example reads input from a key-value topic and writes the results to a topic that has no key.

Prerequisites

This guide uses a local Fluvio cluster. If you need to install it, please follow the instructions here.

Dataflow

Overview

This example reads records from a key-value topic and applies a filter-map transform that keeps only the students in a certain class.

Visual of defined dataflow

Define the types

The records in this example are objects rather than primitive types. We define the following objects.

types:
  student-info:
    type: object
    properties:
      age:
        type: u32
      teacher:
        type: string
      grade:
        type: f32
  student-in-class:
    type: object
    properties:
      name:
        type: string
      grade:
        type: f32

There are two object types here: student-info, which describes the value of each source record, and student-in-class, which describes the records written to the sink.
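The transform code later in this guide refers to these objects as StudentInfo and StudentInClass. As a rough sketch of what those types look like on the Rust side, the structs below are written by hand; the derives and the exact layout are assumptions, and the code SDF actually generates may differ.

```rust
// Hand-written stand-ins for the two object types. Field names and
// primitive types follow the `types` section; the derives are assumptions.
#[allow(dead_code)]
#[derive(Debug, Clone, PartialEq)]
struct StudentInfo {
    age: u32,
    teacher: String,
    grade: f32,
}

#[allow(dead_code)]
#[derive(Debug, Clone, PartialEq)]
struct StudentInClass {
    name: String,
    grade: f32,
}

fn main() {
    // A source value and a sink record that carries over its grade.
    let info = StudentInfo { age: 13, teacher: "profX".to_string(), grade: 90.1 };
    let in_class = StudentInClass { name: "Jerry".to_string(), grade: info.grade };
    println!("{:?} -> {:?}", info, in_class);
}
```

Note that the student's name is not a field of StudentInfo; in this example it travels as the record key.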

Topic List

The following is our list of topics.

topics:
  student-list:
    schema:
      key:
        type: string
      value:
        type: student-info
  profx-students:
    schema:
      value:
        type: student-in-class

Source

The student-list is the source topic. It maps a primitive string to a student-info. For our example, the primitive string will be the name of the student.

Sink

The profx-students is the sink topic. It contains the type student-in-class defined above.

Transform

The transform converts each (String, StudentInfo) key-value entry into a StudentInClass. It uses a filter-map operator to drop students whose teacher field is not profX and to map the remaining students to the sink type.

transforms:
  - operator: filter-map
    run: |
      fn student_filter(name: Option<String>, info: StudentInfo) -> Result<Option<StudentInClass>> {
        if info.teacher != "profX" {
            return Ok(None);
        }
        let ret = StudentInClass{
            name: name.unwrap_or("".to_string()),
            grade: info.grade
        };
        Ok(Some(ret))
      }

In this transform function, the input parameters are name, which carries the key, and info, which carries the value. The key arrives as an Option<String>; if it is missing, the name falls back to an empty string.
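To try the filter logic outside of SDF, a minimal standalone sketch can help. The structs and the Result alias below are hand-written stand-ins (assumptions, since SDF supplies its own generated types and error type); only the body of student_filter follows the transform above.

```rust
// Stand-in types mirroring student-info and student-in-class.
// In SDF these come from the `types` section; here they are hypothetical.
#[allow(dead_code)]
#[derive(Debug, Clone, PartialEq)]
struct StudentInfo {
    age: u32,
    teacher: String,
    grade: f32,
}

#[derive(Debug, Clone, PartialEq)]
struct StudentInClass {
    name: String,
    grade: f32,
}

// Assumption: a String error is used as a placeholder for SDF's error type.
type Result<T> = std::result::Result<T, String>;

// Returns Ok(None) to drop a record and Ok(Some(..)) to forward a mapped one.
fn student_filter(name: Option<String>, info: StudentInfo) -> Result<Option<StudentInClass>> {
    if info.teacher != "profX" {
        return Ok(None);
    }
    Ok(Some(StudentInClass {
        // A missing key becomes an empty name.
        name: name.unwrap_or_default(),
        grade: info.grade,
    }))
}

fn main() {
    let jerry = StudentInfo { age: 13, teacher: "profX".to_string(), grade: 90.1 };
    let tom = StudentInfo { age: 14, teacher: "profY".to_string(), grade: 99.2 };
    // Jerry is kept (Some), Tom is filtered out (None).
    println!("{:?}", student_filter(Some("Jerry".to_string()), jerry));
    println!("{:?}", student_filter(Some("Tom".to_string()), tom));
}
```

Returning Option inside Result is what lets a single function both filter and map: None removes the record, while Some carries the converted value to the sink.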

Running the Example

Full Code

Copy and paste the following config and save it as dataflow.yaml.

apiVersion: 0.5.0

meta:
  name: key-value-type
  version: 0.1.0
  namespace: example

config:
  converter: json
  consumer:
    default_starting_offset:
      value: 0
      position: End

types:
  student-info:
    type: object
    properties:
      age:
        type: u32
      teacher:
        type: string
      grade:
        type: f32
  student-in-class:
    type: object
    properties:
      name:
        type: string
      grade:
        type: f32

topics:
  student-list:
    schema:
      key:
        type: string
      value:
        type: student-info
  profx-students:
    schema:
      value:
        type: student-in-class

services:
  filter-profx:
    sources:
      - type: topic
        id: student-list

    transforms:
      - operator: filter-map
        run: |
          fn student_filter(name: Option<String>, info: StudentInfo) -> Result<Option<StudentInClass>>{
            if info.teacher != "profX" {
                return Ok(None);
            }
            let ret = StudentInClass{
                name: name.unwrap_or("".to_string()),
                grade: info.grade
            };
            Ok(Some(ret))
          }

    sinks:
      - type: topic
        id: profx-students

Running SDF

To run the example:

$ sdf run

Produce data

We will produce some data by first writing it into a file named student.txt.

Jerry>{"age":13,"teacher":"profX","grade":90.1} 
Tom>{"age":14,"teacher":"profY","grade":99.2} 
Jane>{"age":-1,"teacher":"profZ","grade":99.11} 
Terry>{"age":13,"teacher":"profX","grade":50}

Produce the data with:

$ fluvio produce student-list --key-separator ">" -f student.txt

We should see the following output if we consume the source topic with -k:

$ fluvio consume student-list -Bdk
[Jerry] {"age":13,"teacher":"profX","grade":90.1}
[Tom] {"age":14,"teacher":"profY","grade":99.2}
[Jane] {"age":-1,"teacher":"profZ","grade":99.11}
[Terry] {"age":13,"teacher":"profX","grade":50}
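The relationship between the lines in student.txt and the key-value records above can be sketched in a few lines of Rust. This is only an illustration and not the Fluvio CLI's implementation: split_key_value is a hypothetical helper, and splitting at the first occurrence of the separator is an assumption.

```rust
// Split one input line into (key, value) at the first occurrence of the
// separator. Returns None when the separator does not appear in the line.
fn split_key_value<'a>(line: &'a str, separator: &str) -> Option<(&'a str, &'a str)> {
    line.split_once(separator)
}

fn main() {
    let line = r#"Jerry>{"age":13,"teacher":"profX","grade":90.1}"#;
    if let Some((key, value)) = split_key_value(line, ">") {
        // Print the record in a [key] value layout.
        println!("[{}] {}", key, value);
    }
}
```

Whatever separator you pick should not appear in the keys, since everything before it is treated as the key.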

Consume data

Consume the data from the sink topic:

$ fluvio consume profx-students -Bd
{"grade":90.1,"name":"Jerry"}
{"grade":50.0,"name":"Terry"}

Only students whose teacher is profX are sent to the sink.

Cleanup

Exit the sdf terminal and clean up. The --force flag removes the topics:

$ sdf clean --force

Conclusion

This how-to focused on using key-values as inputs. The following page contains another example of key-values as inputs.

  1. Key Value Input