Connectors: connect to data sources and sinks
A Feldera pipeline can process data from multiple heterogeneous sources and produce outputs to multiple heterogeneous destinations. To this end, it relies on a growing library of input and output connectors.
Basics
A connector is a Feldera API object that describes an external data source or sink such as a database table or a Kafka topic. Connectors work together with other API objects, namely programs and pipelines, to build end-to-end streaming analytics pipelines.
- A program defines a set of SQL tables and views.
- A pipeline consists of a program with input and output connectors attached to it.
- A connector must explicitly be attached to a table (for input) or view (for output) of a pipeline.
- A connector can be attached to the tables/views of one or more pipelines, and can even be attached to multiple tables (for input) or views (for output) within the same pipeline.
- Connectors are managed through the API under the /v0/connectors/... endpoints. They can be retrieved individually or as a list, created, overwritten, modified and deleted.
- If changes are made to the connectors of a running pipeline, they will only be applied when the pipeline is restarted.
A connector specification consists of three parts:
- Generic attributes common to all connectors, such as backpressure thresholds.
- Transport specification, which defines the data transport to be used by the connector (e.g., Kafka, URL, Delta Lake, etc.) and its configuration.
- Data format specification, which defines the data format for the connector (e.g., CSV, JSON, Parquet, or Avro), and its specific dialect.
This architecture allows the user to combine different transports and data formats.
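As a rough illustration of how the three parts fit together, the following Python sketch assembles a connector specification as a plain dictionary. The helper `make_connector_config` is hypothetical, and `max_queued_records` is only an assumed name for a generic backpressure attribute; the transport and format layout mirrors the curl examples in the "Example usage" section.

```python
import json


def make_connector_config(transport_name, transport_config,
                          format_name, format_config=None, **generic):
    """Combine generic attributes, a transport, and a data format into one spec."""
    config = dict(generic)  # generic attributes shared by all connectors
    config["transport"] = {"name": transport_name, "config": transport_config}
    config["format"] = {"name": format_name, "config": format_config or {}}
    return config


# URL transport combined with the JSON format.
url_json = make_connector_config(
    "url_input",
    {"path": "https://example.com/tools-data.json"},
    "json",
    max_queued_records=100000,  # assumed attribute name, for illustration only
)

# The same transport combined with a different format ("csv" is assumed here).
url_csv = make_connector_config(
    "url_input",
    {"path": "https://example.com/tools-data.csv"},
    "csv",
)

print(json.dumps(url_json, indent=2))
```

Because the transport and the format are separate entries, swapping one does not require touching the other.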
Note: these basics apply to all connectors except the HTTP input and output connectors. Those are not managed by the user, because they feed data into and fetch data from a pipeline directly via dedicated pipeline endpoints.
Configuring the output buffer
By default, a Feldera pipeline sends a batch of changes to the output transport for each batch of input updates it processes. This can result in a stream of small updates, which is normal and even preferable for output transports like Kafka. However, it can cause performance problems for other connectors, such as the Delta Lake connector, where it creates a large number of small files.
The output buffer mechanism is designed to solve this problem by decoupling the rate at which the pipeline pushes changes to the output transport from the rate of input changes. It accumulates updates inside the pipeline for up to a user-defined period of time, or until a user-defined number of updates has accumulated, and then writes them to the output transport as a single batch.
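The accumulate-then-flush behavior can be pictured with a toy model. The sketch below is not Feldera's implementation: the class `OutputBuffer`, its methods, and the representation of an update as a record with a weight of +1 (insert) or -1 (delete) are all assumptions made for illustration. Unlike a real buffer, it only checks the clock when an update arrives.

```python
class OutputBuffer:
    """Toy model of an output buffer that flushes on age or size."""

    def __init__(self, max_time_millis=None, max_size_records=None):
        self.max_time_millis = max_time_millis
        self.max_size_records = max_size_records
        self.weights = {}            # record -> net weight of buffered updates
        self.first_update_at = None  # arrival time of the oldest buffered update

    def push(self, record, weight, now_millis):
        """Buffer one update; return the flushed batch, or None if nothing was flushed."""
        if self.first_update_at is None:
            self.first_update_at = now_millis
        net = self.weights.get(record, 0) + weight
        if net == 0:
            self.weights.pop(record, None)  # an insert and a delete cancel out
        else:
            self.weights[record] = net
        return self.flush() if self._should_flush(now_millis) else None

    def _should_flush(self, now_millis):
        too_big = (self.max_size_records is not None
                   and len(self.weights) >= self.max_size_records)
        too_old = (self.max_time_millis is not None
                   and now_millis - self.first_update_at >= self.max_time_millis)
        return too_big or too_old

    def flush(self):
        """Hand over everything buffered so far as a single batch."""
        batch, self.weights, self.first_update_at = self.weights, {}, None
        return batch


buf = OutputBuffer(max_time_millis=1000, max_size_records=3)
results = [
    buf.push("a", +1, now_millis=0),
    buf.push("a", -1, now_millis=10),  # cancels the earlier insert of "a"
    buf.push("b", +1, now_millis=20),
    buf.push("c", +1, now_millis=30),
    buf.push("d", +1, now_millis=40),  # third buffered record triggers a flush
]
print(results[-1])
```

Five input updates produce one output batch of three records, because the insert and delete of "a" cancel inside the buffer before anything is written.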
The output buffer can be set up for each individual output connector as part of the connector configuration. The following parameters are used to configure the output buffer:
- enable_output_buffer: Enable the output buffer.
- max_output_buffer_time_millis: Maximum time in milliseconds that data is kept in the output buffer. When not specified, data is kept in the buffer indefinitely until one of the other trigger conditions is satisfied. When this option is set, data waits in the buffer for at most max_output_buffer_time_millis milliseconds before it is flushed. NOTE: this configuration option requires the enable_output_buffer flag to be set.
- max_output_buffer_size_records: Maximum number of updates to be kept in the output buffer. This parameter bounds the maximal size of the buffer. Note that the size of the buffer is not always equal to the total number of updates output by the pipeline, because updates to the same record can overwrite or cancel previous updates. When not specified, the buffer can grow indefinitely until one of the other trigger conditions is satisfied. NOTE: this configuration option requires the enable_output_buffer flag to be set.
When the enable_output_buffer flag is set, at least one of max_output_buffer_time_millis or max_output_buffer_size_records must be specified.
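The dependencies between these three options can be checked before a configuration is submitted. The following is a minimal sketch: the function `check_output_buffer` is hypothetical, the transport name is a placeholder, and placing the buffer options at the same level as the transport entry is an assumption rather than something this page confirms.

```python
def check_output_buffer(config):
    """Return a list of problems with the output buffer options in `config`."""
    enabled = config.get("enable_output_buffer", False)
    limits = [key for key in ("max_output_buffer_time_millis",
                              "max_output_buffer_size_records")
              if key in config]
    problems = []
    if enabled and not limits:
        problems.append("enable_output_buffer is set but no limit is specified")
    if limits and not enabled:
        problems.append(", ".join(limits) + " requires enable_output_buffer")
    return problems


connector_config = {
    # Placeholder transport entry; the name is assumed for illustration.
    "transport": {"name": "delta_table_output", "config": {}},
    "enable_output_buffer": True,
    "max_output_buffer_time_millis": 10000,
}

print(check_output_buffer(connector_config))
```

An empty list means the two rules stated above are satisfied: the flag has at least one limit, and no limit is given without the flag.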
See the Delta Lake output connector documentation for an example of configuring the output buffer.
Example usage
curl
Retrieve individual connector
Retrieve the connector named product-tools.
curl -s -X GET http://localhost:8080/v0/connectors/product-tools | jq
Retrieve list of connectors
curl -s -X GET http://localhost:8080/v0/connectors | jq
Create connector
Create an HTTP GET connector named product-tools.
curl -i -X PUT http://localhost:8080/v0/connectors/product-tools \
  -H 'Content-Type: application/json' \
  -d '{
    "description": "URL input connector for tools products",
    "config": {
      "transport": {
        "name": "url_input",
        "config": {
          "path": "https://example.com/tools-data.json"
        }
      },
      "format": {
        "name": "json",
        "config": {}
      }
    }
  }'
Edit connector description
curl -i -X PATCH http://localhost:8080/v0/connectors/product-tools \
  -H 'Content-Type: application/json' \
  -d '{
    "description": "New description"
  }'
Overwrite the entire connector (description and configuration)
curl -i -X PUT http://localhost:8080/v0/connectors/product-tools \
  -H 'Content-Type: application/json' \
  -d '{
    "description": "Another description",
    "config": {
      "transport": {
        "name": "url_input",
        "config": {
          "path": "https://example.com/new-tools-data.json"
        }
      },
      "format": {
        "name": "json",
        "config": {}
      }
    }
  }'
Delete connector
Delete the connector named product-tools.
curl -i -X DELETE http://localhost:8080/v0/connectors/product-tools
Attach input and output connectors
A Feldera pipeline consists of an SQL program with input and output connectors attached to it.
The user lists the connectors to be attached to the pipeline as part of the pipeline configuration.
In this example, we create the pipeline supply-chain-pipeline with the program supply-chain-program.
The pipeline attaches the input connector product-tools to the table product and the output connector kafka-average-price to the view average_price.
curl -i -X PUT http://localhost:8080/v0/pipelines/supply-chain-pipeline \
  -H 'Content-Type: application/json' \
  -d '{
    "description": "Supply Chain pipeline",
    "program_name": "supply-chain-program",
    "config": {"workers": 4},
    "connectors": [
      {
        "connector_name": "product-tools",
        "is_input": true,
        "name": "product-tools",
        "relation_name": "product"
      },
      {
        "connector_name": "kafka-average-price",
        "is_input": false,
        "name": "kafka-average-price",
        "relation_name": "average_price"
      }
    ]
  }'
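The same request body can be assembled programmatically, which may help when a pipeline attaches many connectors. This is a sketch only: the helper `attach` is hypothetical, and reusing the connector name as the attachment name simply mirrors the curl example above.

```python
import json


def attach(connector_name, relation_name, is_input):
    """Build one attachment entry linking a connector to a table or view."""
    return {
        "connector_name": connector_name,
        "is_input": is_input,
        "name": connector_name,  # same as the connector name, as in the curl example
        "relation_name": relation_name,
    }


pipeline = {
    "description": "Supply Chain pipeline",
    "program_name": "supply-chain-program",
    "config": {"workers": 4},
    "connectors": [
        attach("product-tools", "product", is_input=True),               # table
        attach("kafka-average-price", "average_price", is_input=False),  # view
    ],
}

# JSON body for PUT /v0/pipelines/supply-chain-pipeline
body = json.dumps(pipeline, indent=2)
print(body)
```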
Python (direct API calls)
Retrieve list of connectors while providing authorization:
import requests

api_url = "http://localhost:8080"
# Replace <API-KEY> with a valid API key.
headers = {"authorization": "Bearer <API-KEY>"}
for connector in requests.get(f"{api_url}/v0/connectors", headers=headers).json():
    print(connector)
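Creating or overwriting a connector from Python follows the same pattern as the curl PUT example. The sketch below uses only the standard library and builds the request without sending it, so it can be inspected offline. The helper `connector_put_request` is hypothetical; the endpoint, headers, and body mirror the examples above.

```python
import json
import urllib.request

API_URL = "http://localhost:8080"


def connector_put_request(name, description, config, api_key=None):
    """Build, but do not send, a PUT request that creates or overwrites a connector."""
    headers = {"Content-Type": "application/json"}
    if api_key is not None:
        headers["Authorization"] = f"Bearer {api_key}"
    body = json.dumps({"description": description, "config": config}).encode("utf-8")
    return urllib.request.Request(
        f"{API_URL}/v0/connectors/{name}",
        data=body,
        headers=headers,
        method="PUT",
    )


request = connector_put_request(
    "product-tools",
    "URL input connector for tools products",
    {
        "transport": {
            "name": "url_input",
            "config": {"path": "https://example.com/tools-data.json"},
        },
        "format": {"name": "json", "config": {}},
    },
)
print(request.get_method(), request.full_url)

# To send it to a running instance:
# with urllib.request.urlopen(request) as response:
#     print(response.status)
```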
Additional resources
For more information, see:
REST API documentation for connectors.