Connectors: connect to data sources and sinks

A Feldera pipeline can process data from multiple heterogeneous sources and produce outputs to multiple heterogeneous destinations. To this end it relies on a growing library of input and output connectors.

Basics

A connector is a Feldera API object that describes an external data source or sink such as a database table or a Kafka topic. Connectors work together with other API objects, namely programs and pipelines, to build end-to-end streaming analytics pipelines.

  • A program defines a set of SQL tables and views.
  • A pipeline consists of a program with input and output connectors attached to it.
  • A connector must be explicitly attached to a table (for input) or a view (for output) of a pipeline.
  • A connector can be attached to the tables/views of one or more pipelines, and can even be attached to multiple tables (for input) or views (for output) within the same pipeline.
  • Connectors are managed through the API under the /v0/connectors/... endpoints. They can be retrieved individually or as a list, created, overwritten, modified and deleted.
  • If changes are made to the connectors of a running pipeline, they will only be applied when the pipeline is restarted.

A connector specification consists of three parts:

  • Generic attributes common to all connectors, such as backpressure thresholds.
  • Transport specification, which defines the data transport to be used by the connector (e.g., Kafka, URL, Delta Lake, etc.) and its configuration.
  • Data format specification, which defines the data format for the connector (e.g., CSV, JSON, Parquet, or Avro), and its specific dialect.

This architecture allows the user to combine different transports and data formats.
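As a rough illustration of how the transport and format parts combine, the sketch below assembles connector bodies in Python. The field layout (`description`, `config.transport`, `config.format`) mirrors the curl examples on this page. The helper `make_connector`, the `csv` format name, and the `.csv` URL are assumptions made for illustration. Generic attributes are left out because this page does not name their fields.

```python
import json


def make_connector(description, transport_name, transport_config,
                   format_name, format_config=None):
    """Assemble a connector body from a transport part and a format part.

    Hypothetical helper: it only builds a dict, it does not call the API.
    """
    return {
        "description": description,
        "config": {
            "transport": {"name": transport_name, "config": transport_config},
            "format": {"name": format_name, "config": format_config or {}},
        },
    }


# URL transport combined with the JSON format (as in the curl example).
url_json = make_connector(
    "URL input connector for tools products",
    "url_input", {"path": "https://example.com/tools-data.json"},
    "json",
)

# Same transport, different format: only the "format" part changes.
# The "csv" name and the .csv path are assumptions, not taken from this page.
url_csv = make_connector(
    "URL input connector for tools products (CSV)",
    "url_input", {"path": "https://example.com/tools-data.csv"},
    "csv",
)

print(json.dumps(url_json, indent=2))
```

Keeping the two parts as separate arguments reflects the point above: swapping the format does not require touching the transport configuration.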

Note: these basics apply to all connectors except the HTTP input and output connectors. Those are not managed by the user, because they feed data into and fetch data from a pipeline directly via dedicated pipeline endpoints.

Example usage

curl

Retrieve individual connector

Retrieve connector named product-tools.

curl -s -X GET http://localhost:8080/v0/connectors/product-tools | jq

Retrieve list of connectors

curl -s -X GET http://localhost:8080/v0/connectors | jq

Create connector

Create a URL input connector (an HTTP GET connector) named product-tools.

curl -i -X PUT http://localhost:8080/v0/connectors/product-tools \
  -H 'Content-Type: application/json' \
  -d '{
    "description": "URL input connector for tools products",
    "config": {
      "transport": {
        "name": "url_input",
        "config": {
          "path": "https://example.com/tools-data.json"
        }
      },
      "format": {
        "name": "json",
        "config": {}
      }
    }
  }'

Edit connector description

curl -i -X PATCH http://localhost:8080/v0/connectors/product-tools \
  -H 'Content-Type: application/json' \
  -d '{
    "description": "New description"
  }'

Overwrite the entire connector (description and configuration)

curl -i -X PUT http://localhost:8080/v0/connectors/product-tools \
  -H 'Content-Type: application/json' \
  -d '{
    "description": "Another description",
    "config": {
      "transport": {
        "name": "url_input",
        "config": {
          "path": "https://example.com/new-tools-data.json"
        }
      },
      "format": {
        "name": "json",
        "config": {}
      }
    }
  }'

Delete connector

Delete connector named product-tools.

curl -i -X DELETE http://localhost:8080/v0/connectors/product-tools

Attach input and output connector

A Feldera pipeline consists of an SQL program with input and output connectors attached to it. The user lists connectors to be attached to the pipeline as part of the pipeline configuration. In this example, we create pipeline supply-chain-pipeline with program supply-chain-program. The pipeline attaches input connector product-tools to table product and output connector kafka-average-price to view average_price.

curl -i -X PUT http://localhost:8080/v0/pipelines/supply-chain-pipeline \
  -H 'Content-Type: application/json' \
  -d '{
    "description": "Supply Chain pipeline",
    "program_name": "supply-chain-program",
    "config": {"workers": 4},
    "connectors": [
      {
        "connector_name": "product-tools",
        "is_input": true,
        "name": "product-tools",
        "relation_name": "product"
      },
      {
        "connector_name": "kafka-average-price",
        "is_input": false,
        "name": "kafka-average-price",
        "relation_name": "average_price"
      }
    ]
  }'
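The same pipeline body can be assembled programmatically, which may help when many connectors are attached. The following is a minimal sketch: the helpers `attach` and `pipeline_body` are hypothetical names, and defaulting the attachment `name` to the connector name simply copies what the curl example above does.

```python
import json


def attach(connector_name, relation_name, is_input, name=None):
    """Describe one connector attachment (hypothetical helper).

    `relation_name` is a table for input connectors and a view for
    output connectors.
    """
    return {
        "connector_name": connector_name,
        "is_input": is_input,
        "name": name or connector_name,  # assumption: reuse the connector name
        "relation_name": relation_name,
    }


def pipeline_body(description, program_name, attachments, workers=4):
    """Build the JSON body used with PUT /v0/pipelines/<pipeline-name>."""
    return {
        "description": description,
        "program_name": program_name,
        "config": {"workers": workers},
        "connectors": list(attachments),
    }


body = pipeline_body(
    "Supply Chain pipeline",
    "supply-chain-program",
    [
        attach("product-tools", "product", is_input=True),
        attach("kafka-average-price", "average_price", is_input=False),
    ],
)

print(json.dumps(body, indent=2))
```

The printed JSON can be passed to curl with `-d`, or sent from Python as the request body.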

Python (direct API calls)

Retrieve list of connectors while providing authorization:

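import requests

api_url = "http://localhost:8080"
# Replace <API-KEY> with your own API key.
headers = {"authorization": "Bearer <API-KEY>"}

response = requests.get(f"{api_url}/v0/connectors", headers=headers)
response.raise_for_status()  # fail early on an HTTP error status

# Print each connector returned by the list endpoint.
for connector in response.json():
    print(connector)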
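The create, edit, and delete calls shown with curl above can be expressed in Python in a similar way. The sketch below uses the standard library's `urllib.request` rather than `requests`, so that it runs without extra dependencies, and it only builds the request objects without sending them. The helper `connector_request` is a hypothetical name, and the methods and paths are the ones from the curl examples.

```python
import json
import urllib.request

API_URL = "http://localhost:8080"


def connector_request(method, name, body=None, api_key=None):
    """Build (but do not send) a request against /v0/connectors/<name>."""
    headers = {}
    data = None
    if body is not None:
        data = json.dumps(body).encode("utf-8")
        headers["Content-Type"] = "application/json"
    if api_key is not None:
        headers["Authorization"] = f"Bearer {api_key}"
    return urllib.request.Request(
        f"{API_URL}/v0/connectors/{name}",
        data=data,
        headers=headers,
        method=method,
    )


# PUT creates (or overwrites) the connector.
create = connector_request("PUT", "product-tools", {
    "description": "URL input connector for tools products",
    "config": {
        "transport": {
            "name": "url_input",
            "config": {"path": "https://example.com/tools-data.json"},
        },
        "format": {"name": "json", "config": {}},
    },
})

# PATCH edits only the fields provided, here the description.
edit = connector_request("PATCH", "product-tools",
                         {"description": "New description"})

# DELETE removes the connector and carries no body.
delete = connector_request("DELETE", "product-tools")

for req in (create, edit, delete):
    print(req.get_method(), req.full_url)
```

To send one of these against a running instance, pass it to `urllib.request.urlopen(req)`.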

Additional resources

For more information, see: