Connectors: connect to data sources and sinks

A Feldera pipeline can process data from multiple heterogeneous sources and produce outputs to multiple heterogeneous destinations. To this end, it relies on a growing library of input and output connectors.

Basics

A connector is a Feldera API object that describes an external data source or sink such as a database table or a Kafka topic. Connectors work together with other API objects, namely programs and pipelines, to build end-to-end streaming analytics pipelines.

  • A program defines a set of SQL tables and views.
  • A pipeline consists of a program with input and output connectors attached to it.
  • A connector must be explicitly attached to a table (for input) or a view (for output) of a pipeline.
  • A connector can be attached to the tables/views of one or more pipelines, and can even be attached to multiple tables (for input) or views (for output) within the same pipeline.
  • Connectors are managed through the API under the /v0/connectors/... endpoints. They can be retrieved individually or as a list, created, overwritten, modified and deleted.
  • If changes are made to the connectors of a running pipeline, they will only be applied when the pipeline is restarted.

A connector specification consists of three parts:

  • Generic attributes common to all connectors, such as backpressure thresholds.
  • Transport specification, which defines the data transport to be used by the connector (e.g., Kafka, URL, Delta Lake, etc.) and its configuration.
  • Data format specification, which defines the data format for the connector (e.g., CSV, JSON, Parquet, or Avro), and its specific dialect.

This architecture allows the user to combine different transports and data formats.
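
For illustration, a connector's config object combining all three parts might look like the following sketch. The transport and format sections mirror the url_input/JSON example used later on this page; max_queued_records is shown here only as an example of a generic backpressure attribute (treat the exact attribute names as an assumption and consult the API reference):

{
  "max_queued_records": 100000,
  "transport": {
    "name": "url_input",
    "config": {
      "path": "https://example.com/tools-data.json"
    }
  },
  "format": {
    "name": "json",
    "config": {}
  }
}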

Note: these basics apply to all connectors except the HTTP input and output connectors, which are not managed by the user: they feed data into and fetch data from a pipeline directly via dedicated pipeline endpoints, as sketched below.
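
For example, records can be pushed into a table of a running pipeline with a plain HTTP request. This is a minimal sketch: it assumes a running pipeline named supply-chain-pipeline with a table product, uses the /v0/pipelines/<pipeline>/ingress/<table> endpoint pattern, and encodes the record in the JSON insert/delete update format:

curl -i -X POST 'http://localhost:8080/v0/pipelines/supply-chain-pipeline/ingress/product?format=json' \
  -H 'Content-Type: application/json' \
  -d '{"insert": {"pid": 0, "name": "hammer", "price": 5}}'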

Configuring the output buffer

By default, a Feldera pipeline sends a batch of changes to the output transport for each batch of input updates it processes. This can result in a stream of small updates, which is normal and even preferable for output transports like Kafka; however, it can cause performance problems for other connectors, such as the Delta Lake connector, by creating a large number of small files.

The output buffer mechanism is designed to solve this problem by decoupling the rate at which the pipeline pushes changes to the output transport from the rate of input changes. It works by accumulating updates inside the pipeline, for up to a user-defined period of time or until a user-defined number of updates has accumulated, and then writing them to the output transport as a single batch.

The output buffer can be set up for each individual output connector as part of the connector configuration. The following parameters are used to configure it:

  • enable_output_buffer - Enables the output buffer.

  • max_output_buffer_time_millis - Maximum time in milliseconds data is kept in the output buffer.

    When not specified, data is kept in the buffer indefinitely until one of the other trigger conditions is satisfied. When this option is set, data is kept in the buffer for at most max_output_buffer_time_millis milliseconds before being flushed to the output transport.

    NOTE: this configuration option requires the enable_output_buffer flag to be set.

  • max_output_buffer_size_records - Maximum number of updates to be kept in the output buffer.

    This parameter bounds the maximum size of the buffer.
    Note that the size of the buffer is not always equal to the total number of updates output by the pipeline: updates to the same record can overwrite or cancel previous updates.

    When not specified, the buffer can grow indefinitely until one of the other trigger conditions is satisfied.

    NOTE: this configuration option requires the enable_output_buffer flag to be set.

Note: when the enable_output_buffer flag is set, at least one of max_output_buffer_time_millis or max_output_buffer_size_records must be specified.

See the Delta Lake output connector documentation for an example of configuring the output buffer.
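
As an illustration, the sketch below creates a hypothetical buffered Kafka output connector that flushes accumulated changes after at most 10 seconds or once 1,000,000 updates have accumulated. The kafka_output transport settings are placeholders; the three buffer parameters are the ones described above:

curl -i -X PUT http://localhost:8080/v0/connectors/kafka-average-price \
  -H 'Content-Type: application/json' \
  -d '{
    "description": "Buffered Kafka output connector",
    "config": {
      "enable_output_buffer": true,
      "max_output_buffer_time_millis": 10000,
      "max_output_buffer_size_records": 1000000,
      "transport": {
        "name": "kafka_output",
        "config": {
          "bootstrap.servers": "localhost:9092",
          "topic": "average_price"
        }
      },
      "format": {
        "name": "json",
        "config": {}
      }
    }
  }'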

Example usage

curl

Retrieve individual connector

Retrieve connector named product-tools.

curl -s -X GET http://localhost:8080/v0/connectors/product-tools | jq

Retrieve list of connectors

curl -s -X GET http://localhost:8080/v0/connectors | jq

Create connector

Create a URL input connector named product-tools that fetches data over HTTP GET.

curl -i -X PUT http://localhost:8080/v0/connectors/product-tools \
  -H 'Content-Type: application/json' \
  -d '{
    "description": "URL input connector for tools products",
    "config": {
      "transport": {
        "name": "url_input",
        "config": {
          "path": "https://example.com/tools-data.json"
        }
      },
      "format": {
        "name": "json",
        "config": {}
      }
    }
  }'

Edit connector description

curl -i -X PATCH http://localhost:8080/v0/connectors/product-tools \
  -H 'Content-Type: application/json' \
  -d '{
    "description": "New description"
  }'

Overwrite the entire connector (description and configuration)

curl -i -X PUT http://localhost:8080/v0/connectors/product-tools \
  -H 'Content-Type: application/json' \
  -d '{
    "description": "Another description",
    "config": {
      "transport": {
        "name": "url_input",
        "config": {
          "path": "https://example.com/new-tools-data.json"
        }
      },
      "format": {
        "name": "json",
        "config": {}
      }
    }
  }'

Delete connector

Delete connector named product-tools.

curl -i -X DELETE http://localhost:8080/v0/connectors/product-tools

Attach input and output connector

A Feldera pipeline consists of an SQL program with input and output connectors attached to it. The connectors to attach are listed as part of the pipeline configuration. In this example, we create a pipeline named supply-chain-pipeline with the program supply-chain-program. The pipeline attaches the input connector product-tools to the table product and the output connector kafka-average-price to the view average_price.

curl -i -X PUT http://localhost:8080/v0/pipelines/supply-chain-pipeline \
  -H 'Content-Type: application/json' \
  -d '{
    "description": "Supply Chain pipeline",
    "program_name": "supply-chain-program",
    "config": {"workers": 4},
    "connectors": [
      {
        "connector_name": "product-tools",
        "is_input": true,
        "name": "product-tools",
        "relation_name": "product"
      },
      {
        "connector_name": "kafka-average-price",
        "is_input": false,
        "name": "kafka-average-price",
        "relation_name": "average_price"
      }
    ]
  }'

Python (direct API calls)

Retrieve list of connectors while providing authorization:

import requests

api_url = "http://localhost:8080"
headers = {"authorization": "Bearer <API-KEY>"}

# List all connectors and print each one.
for connector in requests.get(f"{api_url}/v0/connectors", headers=headers).json():
    print(connector)
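
The same pattern works for the other connector endpoints. For instance, here is a sketch of creating (or overwriting) the product-tools connector from the curl example above; the request body is the same JSON document:

import requests

api_url = "http://localhost:8080"
headers = {"authorization": "Bearer <API-KEY>"}

# Connector body mirroring the curl PUT example above.
connector = {
    "description": "URL input connector for tools products",
    "config": {
        "transport": {
            "name": "url_input",
            "config": {"path": "https://example.com/tools-data.json"},
        },
        "format": {"name": "json", "config": {}},
    },
}

# PUT creates the connector, or overwrites it if it already exists.
response = requests.put(f"{api_url}/v0/connectors/product-tools", headers=headers, json=connector)
print(response.status_code)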
