Skip to main content
Version: 0.12.0

HTTP Connector

Read HTTP Responses given input HTTP request configuration options and produce them to Fluvio topics.

This connector can be configured to operate in three modes.

  • Polling: Unless otherwise specified, the endpoint will be polled periodically, with the polling interval specified by providing the interval config option. Each response will be produced as an individual Fluvio record.
  • Streaming: When the stream config option is provided, the HTTP response will be processed as a data stream. A record will be produced to Fluvio every time a delimiter segment is encountered, which is set to \n by default.
  • WebSocket: When the provided endpoint config option is prefixed with ws://, a WebSocket connection will be established, and each incoming message will be produced.

Supports HTTP/1.0, HTTP/1.1, HTTP/2.0 protocols.

Tutorial for [HTTP to SQL Pipeline].

Configuration

Optiondefaulttypedescription
interval10sStringInterval between each HTTP Request. This is in the form of "1s", "10ms", "1m", "1ns", etc.
methodGETStringGET, POST, PUT, HEAD
endpoint-StringHTTP URL endpoint. Use ws:// for websocket URLs.
headers-Array<String>Request header(s) "Key:Value" pairs
body-StringRequest body e.g. in POST
user-agent"fluvio/http-source 0.1.0"StringRequest user-agent
output_typetextStringtext = UTF-8 String Output, json = UTF-8 JSON Serialized String
output_partsbodyStringbody = body only, full = all status, header and body parts
streamfalseboolFlag to indicate HTTP streaming mode
delimiter'\n'StringDelimiter to separate records when producing from an HTTP streaming endpoint

Record Type Output

MatrixOutput
output_type = text (default), output_parts = body (default)Only the body of the HTTP Response
output_type = text (default), output_parts = fullThe full HTTP Response
output_type = json, output_parts = body (default)Only the "body" in JSON struct
output_type = json, output_parts = fullHTTP "status", "body" and "header" JSON

Usage Example

info

All versions are marked with x.y.z. To find the latest version, run:

  • fluvio hub connector list
  • fluvio hub smartmodule list

This is an example of simple connector config file for polling an endpoint:

# config-example.yaml
apiVersion: 0.1.0
meta:
version: x.y.z
name: cat-facts
type: http-source
topic: cat-facts
secrets:
- name: AUTHORIZATION_TOKEN
http:
endpoint: "https://catfact.ninja/fact"
interval: 10s
headers:
- "Authorization: token ${{ secrets.AUTHORIZATION_TOKEN }}"
- "Cache-Control: no-cache"

The produced record in Fluvio topic will be:

{
"fact": "The biggest wildcat today is the Siberian Tiger. It can be more than 12 feet (3.6 m) long (about the size of a small car) and weigh up to 700 pounds (317 kg).",
"length": 158
}

Secrets

Fluvio HTTP Source Connector supports Secrets in the endpoint and in the headers parameters:

# config-example.yaml
apiVersion: 0.1.0
meta:
version: x.y.z
name: cat-facts
type: http-source
topic: cat-facts
secrets:
- name: MY_SECRET_URL
- name: MY_AUTHORIZATION_HEADER
http:
endpoint:
secret:
name: MY_SECRET_URL
headers:
- "Authorization: ${{ secrets.MY_AUTHORIZATION_HEADER }}
interval: 10s

Transformations

Fluvio HTTP Source Connector supports Transformations. Records can be modified before sending to Fluvio topic.

The previous example can be extended to add extra transformations to outgoing records:

apiVersion: 0.1.0
meta:
version: x.y.z
name: cat-facts
type: http-source
topic: cat-facts
http:
endpoint: "https://catfact.ninja/fact"
interval: 10s
transforms:
- uses: infinyon/jolt@x.y.z
with:
spec:
- operation: default
spec:
source: "http-connector"
- operation: remove
spec:
length: ""

In this case, additional transformation will be performed before records are sent to Fluvio topic: field length will be removed and field source with string value http-connector will be added.

Now produced records will have a different shape, for example:

{
"fact": "A cat has more bones than a human; humans have 206, and the cat - 230.",
"source": "http-connector"
}

Read more about JSON to JSON transformations.

Streaming Mode

Provide the stream configuration option to enable streaming mode with delimiter to determine how the incoming records are separated.

apiVersion: 0.1.0
meta:
version: x.y.z
name: wiki-updates
type: http-source
topic: wiki-updates
http:
endpoint: "https://stream.wikimedia.org/v2/stream/recentchange"
method: GET
stream: true
delimiter: "\n\n"

Websocket Mode

Connect to a websocket endpoint using a ws:// URL. When reading text messages, they are emitted as equivalent records. Binary messages are initially attempted to be converted into strings.

apiVersion: 0.1.0
meta:
version: x.y.z
name: websocket-connector
type: http-source
topic: websocket-updates
http:
endpoint: ws://websocket.example/websocket