How about combining a state-of-the-art real-time analytics engine with a state-of-the-art real-time feature store? In this post, we team up with our friends at Hopsworks.ai to do exactly that. We'll show you how to do real-time feature engineering with Feldera computing ML features and Hopsworks handling the storage layer.
We selected credit card fraud detection for this demo because of its stringent feature engineering requirements. Accurate fraud detection demands near-perfect feature freshness and low-latency feature serving. Feldera ensures the former, while Hopsworks guarantees the latter.
Let's start with a high-level overview of the feature engineering pipeline and the role of a feature store in it.
Anatomy of a feature engineering pipeline
At a high level, any feature engineering pipeline consists of three parts:
- Storage. Stores raw input data and computed features.
- Compute. Computes feature vectors from raw data.
- ML. Performs model training and inference using feature vectors as inputs.
What makes feature engineering challenging is that this pipeline must operate in two modes. During model training, it computes, stores, and serves features from historical data in batch mode. During inference, the pipeline computes, stores, and serves features from real-time data in streaming mode. The two modes require different compute and storage engines. This is problematic because the capabilities of streaming engines differ from those of batch engines. As a result, developers must effectively build two separate feature pipelines.
Feldera eliminates the need for two compute engines by unifying batch and stream analytics. With Feldera, the user defines features once using SQL queries, and the platform evaluates these queries over batch or streaming inputs.
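To make the "define once, evaluate in either mode" idea concrete, here is a minimal sketch in plain Python. It is not Feldera's API: the `AvgAmountPerCard` class and its methods are hypothetical names chosen for illustration. The feature is written once as an incremental update, and batch mode is simply the case where the whole history arrives as a single change set.

```python
from collections import defaultdict


class AvgAmountPerCard:
    """Hypothetical feature: transaction count and average amount per card."""

    def __init__(self):
        self._count = defaultdict(int)
        self._total = defaultdict(float)

    def ingest(self, rows):
        """Apply a change set of (cc_num, amount) rows of any size."""
        for cc_num, amount in rows:
            self._count[cc_num] += 1
            self._total[cc_num] += amount

    def features(self):
        """Return {cc_num: (count, average amount)} for all cards seen so far."""
        return {cc: (n, self._total[cc] / n) for cc, n in self._count.items()}


history = [("1111", 10.0), ("2222", 5.0), ("1111", 30.0)]

# Batch mode: the whole history arrives as one change set.
batch = AvgAmountPerCard()
batch.ingest(history)

# Streaming mode: the same definition, fed one event at a time.
streaming = AvgAmountPerCard()
for event in history:
    streaming.ingest([event])

batch_result = batch.features()
streaming_result = streaming.features()
```

Both evaluations end in the same feature values, which is the property that removes the need for a second implementation.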
A feature store solves a similar problem for the storage component of the pipeline. A feature store is a database designed specifically for storing, organizing, and serving features, offering a unified API for batch and real-time data ingress and egress.
By combining Feldera with a modern feature store such as Hopsworks, users can reduce the complexity of building and operating feature pipelines, and focus on designing the best features for their applications.
Hopsworks
Hopsworks.ai is a data platform for ML with a Python-centric Feature Store and MLOps capabilities. Hopsworks is a modular platform. You can use it as a standalone Feature Store, you can use it to manage, govern, and serve your models, and you can even use it to develop and operate feature pipelines and training pipelines. Hopsworks brings collaboration for ML teams, providing a secure, governed platform for developing, managing, and sharing ML assets - features, models, training data, batch scoring data, logs, and more.
Architecture
In the rest of this article, we will build a feature engineering pipeline for real-time credit card fraud detection using Feldera and Hopsworks.
Feldera processes inputs in real-time and writes computed features to Hopsworks using the Hopsworks storage API. Internally, Hopsworks stores feature vectors in the offline store where they are available for model training. In addition, the most recent feature vectors are kept in the online store, where they can be looked up during real-time inference.
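The split between the two stores can be pictured with a small in-memory model. This is only a sketch under stated assumptions, not how Hopsworks is implemented: `ToyFeatureGroup` is a hypothetical class, and we assume "most recent" means the row with the greatest event time for a given primary key.

```python
from datetime import datetime


class ToyFeatureGroup:
    """Hypothetical in-memory model of a feature group with both stores."""

    def __init__(self, primary_key, event_time):
        self.primary_key = primary_key
        self.event_time = event_time
        self.offline = []  # every feature vector ever written (used for training)
        self.online = {}   # latest feature vector per primary key (used for inference)

    def insert(self, row):
        # The offline store keeps the full history.
        self.offline.append(row)
        # The online store keeps only the newest row for each key.
        key = row[self.primary_key]
        current = self.online.get(key)
        if current is None or row[self.event_time] >= current[self.event_time]:
            self.online[key] = row


fg = ToyFeatureGroup(primary_key="cc_num", event_time="date_time")
fg.insert({"cc_num": "1111", "date_time": datetime(2024, 1, 1, 10), "avg_amt": 12.5})
fg.insert({"cc_num": "1111", "date_time": datetime(2024, 1, 1, 11), "avg_amt": 20.0})
fg.insert({"cc_num": "2222", "date_time": datetime(2024, 1, 1, 11), "avg_amt": 7.0})

training_rows = fg.offline      # all three rows
latest = fg.online["1111"]      # only the 11:00 row for card 1111
```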
Prerequisites
This demo is structured as a series of Python notebooks. To run the notebooks, you will need a Hopsworks account, which you can create for free at https://app.hopsworks.ai/.
You will also need access to an instance of Feldera. You can either install it locally using Docker or create a free account at try.feldera.com.
Step 0. Data preparation
We use a synthetic data generator provided by Hopsworks to populate a Kafka topic with data that simulates a stream of credit card transactions. In reality this stream would arrive in real time from points of sale terminals. The generator also creates a set of user profiles, which we store as a feature group (see below) inside the Hopsworks feature store. For complete data preparation code, see Python notebook.

Step 1. Real-time feature computation
See the Python notebook for the complete implementation of this section.
Step 1.1. Create Hopsworks feature groups
Hopsworks organizes features into feature groups. A feature group is a set of related features with a common primary key. In this example, we define two feature groups:
- COMBINED: features that extend credit card transaction records with attributes extracted from the card holder's profile, such as their age at the time of the transaction and the number of days until the credit card expires.
- WINDOWED: frequency of transactions and other metrics in the span of a few hours, modeled as hopping window aggregates.
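For readers unfamiliar with hopping windows, the following plain-Python sketch shows the idea behind the windowed features. It is an illustration, not the pipeline's code: the function names are hypothetical, window starts are assumed to be aligned to multiples of the hop since the Unix epoch, and the example assumes a 4-hour window advancing every hour. A single-transaction window reports a standard deviation of 0.

```python
from collections import defaultdict
from datetime import datetime, timedelta
from statistics import mean, stdev

EPOCH = datetime(1970, 1, 1)


def hopping_windows(ts, size, hop):
    """Return start times of all windows [start, start + size) that contain ts."""
    # Latest window start at or before ts, aligned to a multiple of `hop`.
    start = ts - (ts - EPOCH) % hop
    starts = []
    while start > ts - size:
        starts.append(start)
        start -= hop
    return sorted(starts)


def windowed_aggregates(transactions, size, hop):
    """Aggregate (cc_num, timestamp, amount) rows per card and per window."""
    groups = defaultdict(list)
    for cc_num, ts, amount in transactions:
        for start in hopping_windows(ts, size, hop):
            groups[(cc_num, start)].append(amount)
    return {
        key: {
            "trans": len(amounts),
            "avg_amt": mean(amounts),
            # Sample standard deviation is undefined for one value; report 0.
            "stddev_amt": stdev(amounts) if len(amounts) > 1 else 0.0,
        }
        for key, amounts in groups.items()
    }


transactions = [
    ("1111", datetime(2024, 1, 1, 10, 30), 10.0),
    ("1111", datetime(2024, 1, 1, 11, 15), 30.0),
]
windows = hopping_windows(transactions[0][1], timedelta(hours=4), timedelta(hours=1))
aggregates = windowed_aggregates(transactions, timedelta(hours=4), timedelta(hours=1))
```

Each transaction falls into four overlapping windows, so the window starting at 10:00 sees both transactions while the one starting at 07:00 sees only the first.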
Create Hopsworks feature groups
import hopsworks
from hsfs import engine
from hsfs.feature import Feature
import json
# Connect to Hopsworks.
project = hopsworks.login(host="c.app.hopsworks.ai", api_key_value=KEY)
kafka_api = project.get_kafka_api()
KAFKA_OUTPUT_TOPICS = ["transactions_fraud_streaming_fg_" + str(project.id), "transactions_aggs_fraud_streaming_fg_" + str(project.id)]
fs = project.get_feature_store()
# Create feature groups to store Feldera outputs.
# COMBINED - features that extend credit card transaction records with attributes extracted from the card
# holder's profile, such as their age at the time of the transaction and the number of days until the credit card expires.
combined_fg = fs.get_or_create_feature_group(
    name=KAFKA_OUTPUT_TOPICS[0],
    primary_key=["cc_num"],
    online_enabled=True,
    version=1,
    topic_name=KAFKA_OUTPUT_TOPICS[0],
    event_time="date_time",
    stream=True,
    features=[
        Feature("tid", type="string"),
        Feature("date_time", type="timestamp"),
        Feature("cc_num", type="string"),
        Feature("category", type="string"),
        Feature("amount", type="double"),
        Feature("latitude", type="double"),
        Feature("longitude", type="double"),
        Feature("city", type="string"),
        Feature("country", type="string"),
        Feature("fraud_label", type="int"),
        Feature("age_at_transaction", type="int"),
        Feature("days_until_card_expires", type="int"),
        Feature("cc_expiration_date", type="timestamp"),
    ],
)
# Register the feature group; print any error and continue.
try:
    combined_fg.save()
except Exception as e:
    print(e)
# Create the Avro schema and the Kafka topic for this feature group if the topic does not exist yet.
if KAFKA_OUTPUT_TOPICS[0] not in [topic.name for topic in kafka_api.get_topics()]:
    kafka_api.create_schema(KAFKA_OUTPUT_TOPICS[0], json.loads(combined_fg.avro_schema))
    kafka_api.create_topic(KAFKA_OUTPUT_TOPICS[0], KAFKA_OUTPUT_TOPICS[0], 1, replicas=1, partitions=1)
# WINDOWED - frequency of transactions and other metrics in the span of a few hours, modeled as hopping window aggregates.
windowed_fg = fs.get_or_create_feature_group(
    name=str(KAFKA_OUTPUT_TOPICS[1]),
    primary_key=["cc_num"],
    online_enabled=True,
    version=1,
    topic_name=KAFKA_OUTPUT_TOPICS[1],
    event_time="date_time",
    stream=True,
    features=[
        Feature("avg_amt", type="double"),
        Feature("trans", type="bigint"),
        Feature("stddev_amt", type="double"),
        Feature("date_time", type="timestamp"),
        Feature("cc_num", type="string"),
    ],
)
# Register the feature group; print any error and continue.
try:
    windowed_fg.save()
except Exception as e:
    print(e)
# Create the Avro schema and the Kafka topic for this feature group if the topic does not exist yet.
if KAFKA_OUTPUT_TOPICS[1] not in [topic.name for topic in kafka_api.get_topics()]:
    kafka_api.create_schema(KAFKA_OUTPUT_TOPICS[1], json.loads(windowed_fg.avro_schema))
    kafka_api.create_topic(KAFKA_OUTPUT_TOPICS[1], KAFKA_OUTPUT_TOPICS[1], 1, replicas=1, partitions=1)
Step 1.2. Create Feldera pipeline
We build a Feldera pipeline to transform raw transaction and profile data into features. In Feldera, feature groups are modeled as SQL views. Thus, we create a SQL program with two input tables (TRANSACTIONS and PROFILES) and two output views, one for each feature group.
Declare input tables and feature views
# Create a SQL program parameterized by source and sink connector configurations.
def build_sql(transaction_source_config: str, combined_sink_config: str, windowed_sink_config: str) -> str:
    return f"""
CREATE TABLE TRANSACTIONS(
    tid STRING,
    date_time TIMESTAMP,
    cc_num STRING,
    category STRING,
    amount DOUBLE,
    latitude DOUBLE,
    longitude DOUBLE,
    city STRING,
    country STRING,
    fraud_label INT
) WITH (
    'connectors' = '[{transaction_source_config}]'
);

CREATE TABLE PROFILES(
    cc_num STRING,
    cc_provider STRING,
    cc_type STRING,
    cc_expiration_date STRING,
    name STRING,
    mail STRING,
    birthdate TIMESTAMP,
    age INT,
    city STRING,
    country_of_residence STRING
);

-- Convert credit card expiration date from MM/YY formatted string to a TIMESTAMP,
-- so that we can perform computations on it.
CREATE LOCAL VIEW CC_EXPIRATION AS
SELECT
    cc_num,
    CAST(
        CONCAT(
            '20',
            SUBSTRING(cc_expiration_date, 4, 2),
            '-',
            SUBSTRING(cc_expiration_date, 1, 2),
            '-01 00:00:00'
        ) AS TIMESTAMP
    ) AS cc_expiration_date
FROM PROFILES;

-- Compute the age of the individual during the transaction, and the number of days until the
-- credit card expires from `PROFILES` and `TRANSACTIONS` tables.
CREATE VIEW COMBINED
WITH (
    'connectors' = '[{combined_sink_config}]'
)
AS
SELECT
    T1.*,
    T2.cc_expiration_date,
    TIMESTAMPDIFF(YEAR, T3.birthdate, T1.date_time) AS age_at_transaction,
    TIMESTAMPDIFF(DAY, T1.date_time, T2.cc_expiration_date) AS days_until_card_expires
FROM
    TRANSACTIONS T1
    JOIN CC_EXPIRATION T2 ON T1.cc_num = T2.cc_num
    JOIN PROFILES T3 ON T1.cc_num = T3.cc_num;

-- Create a 4-hour hopping window over the TRANSACTIONS table.
CREATE LOCAL VIEW HOP AS
SELECT *
FROM TABLE(HOP(TABLE TRANSACTIONS, DESCRIPTOR(date_time), INTERVAL 4 HOURS, INTERVAL 1 HOURS));

-- Compute per-card aggregates for each window.
CREATE LOCAL VIEW AGG AS
SELECT
    AVG(amount) AS avg_amt,
    STDDEV(amount) AS stddev_amt,
    COUNT(cc_num) AS trans,
    ARRAY_AGG(date_time) AS moments,
    cc_num
FROM HOP
GROUP BY cc_num, window_start;

-- Final output view: one row per transaction timestamp collected in each window.
CREATE VIEW WINDOWED
WITH (
    'connectors' = '[{windowed_sink_config}]'
)
AS
SELECT
    avg_amt,
    trans,
    COALESCE(stddev_amt, 0) AS stddev_amt,
    date_time,
    cc_num
FROM AGG CROSS JOIN UNNEST(moments) AS date_time;
"""
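As a cross-check of the string manipulation in the CC_EXPIRATION view, here is a plain-Python sketch of the same conversion, together with the day difference used for days_until_card_expires. The function names are hypothetical, and the day count is assumed to mean whole elapsed days for a card that has not yet expired; the SQL views remain the actual definition.

```python
from datetime import datetime


def parse_cc_expiration(mm_yy):
    """Convert an 'MM/YY' string to midnight on the first day of that month."""
    month = mm_yy[0:2]  # SUBSTRING(cc_expiration_date, 1, 2)
    year = mm_yy[3:5]   # SUBSTRING(cc_expiration_date, 4, 2)
    return datetime.strptime(f"20{year}-{month}-01 00:00:00", "%Y-%m-%d %H:%M:%S")


def days_until_card_expires(transaction_time, expiration):
    """Whole days from the transaction to the expiration timestamp."""
    return (expiration - transaction_time).days


expiration = parse_cc_expiration("03/25")
remaining = days_until_card_expires(datetime(2024, 1, 1, 12, 0, 0), expiration)
```

Here "03/25" becomes 2025-03-01 00:00:00, and a transaction at noon on 2024-01-01 is 424 whole days before that date.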
We use the Kafka topic created during the data prep step as the input for the TRANSACTIONS table. The output views are also connected to the Hopsworks feature store via Kafka. Hopsworks ingests data from Kafka using the Avro format, so we configure Feldera Kafka connectors with Avro schemas generated by Hopsworks for each feature group.
Instantiate the program with input and output connectors
from feldera import FelderaClient, PipelineBuilder
# Connect to the Feldera API
# Use Feldera online sandbox
# client = FelderaClient("https://try.feldera.com", api_key = get_secret('FELDERA_API_KEY'))
# Use local Feldera instance
client = FelderaClient("http://localhost:8080")
# Get Hopsworks public Kafka servers.
kafka_config = engine.get_instance()._get_kafka_config(fs.id, {})
KAFKA_INPUT_TOPIC = "transactions_topic_" + str(project.id)
# Connect the Kafka topic created during data prep to the TRANSACTIONS table.
transaction_source_config = json.dumps({
    "transport": {
        "name": "kafka_input",
        "config": kafka_config | {"topics": [KAFKA_INPUT_TOPIC], "auto.offset.reset": "earliest"}
    },
    "format": {
        "name": "json",
        "config": {
            "update_format": "raw",
            "array": False
        }
    }
})
# Build the Kafka output transport config for a feature group, including
# the message headers that identify the project, feature group, and schema subject.
def create_sink_config(kafka_config: dict, fg, project_id):
    return kafka_config | {
        "topic": fg.topic_name,
        "auto.offset.reset": "earliest",
        "headers": [
            {
                'key': 'projectId',
                'value': str(project_id),
            },
            {
                'key': 'featureGroupId',
                'value': str(fg.id),
            },
            {
                'key': 'subjectId',
                'value': str(fg.subject["id"]),
            },
        ]
    }
# Set the output format to use the avro schema from the feature group.
combined_sink_config = json.dumps({
    "transport": {
        "name": "kafka_output",
        "config": create_sink_config(kafka_config, combined_fg, project.id)
    },
    "format": {
        "name": "avro",
        "config": {
            "schema": combined_fg.avro_schema,
            "skip_schema_id": True
        }
    }
})
windowed_sink_config = json.dumps({
    "transport": {
        "name": "kafka_output",
        "config": create_sink_config(kafka_config, windowed_fg, project.id)
    },
    "format": {
        "name": "avro",
        "config": {
            "schema": windowed_fg.avro_schema,
            "skip_schema_id": True
        }
    }
})
sql = build_sql(transaction_source_config, combined_sink_config, windowed_sink_config)
pipeline = PipelineBuilder(client, "hopsworks_kafka", sql).create_or_replace()
Step 1.3. Run the pipeline
We are now ready to start the Feldera pipeline. Once the pipeline is initialized, we populate the PROFILES table from a pandas DataFrame. The pipeline continuously ingests credit card transactions from Kafka and sends computed features to Hopsworks. Hopsworks accumulates all historical features in the offline store, while the latest features for each primary key are instantly available for lookup in the online store.
# Start the Feldera pipeline.
pipeline.start()
# Read profile data from the feature store and write it to the `PROFILES` table.
profile_fg = fs.get_or_create_feature_group(
name="profile",
version=1
)
profile_df = profile_fg.read()
pipeline.input_pandas("PROFILES", profile_df)
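Because the pipeline starts before the profiles are loaded, some transactions may be ingested before their matching profile row exists. The toy model below, which is not Feldera's implementation, sketches why this ordering is harmless when a join is maintained incrementally: rows that arrive later produce the outputs that were missing earlier. The `ToyIncrementalJoin` class is hypothetical and assumes each profile is inserted exactly once.

```python
from collections import defaultdict


class ToyIncrementalJoin:
    """Toy incremental join of transactions with profiles on cc_num."""

    def __init__(self):
        self.profiles = {}                     # cc_num -> profile row
        self.transactions = defaultdict(list)  # cc_num -> transactions seen so far

    def add_transaction(self, txn):
        """Ingest one streamed transaction; return the join rows it produces."""
        self.transactions[txn["cc_num"]].append(txn)
        profile = self.profiles.get(txn["cc_num"])
        return [{**txn, **profile}] if profile is not None else []

    def add_profiles(self, profiles):
        """Ingest a batch of profiles; return join rows for earlier transactions."""
        new_rows = []
        for profile in profiles:
            self.profiles[profile["cc_num"]] = profile
            for txn in self.transactions[profile["cc_num"]]:
                new_rows.append({**txn, **profile})
        return new_rows


join = ToyIncrementalJoin()
# A transaction arrives before any profile is loaded: no output yet.
early = join.add_transaction({"cc_num": "1111", "tid": "t1", "amount": 10.0})
# Loading the profiles as a batch emits the row for the earlier transaction.
backfilled = join.add_profiles([{"cc_num": "1111", "birthdate": "1990-05-17"}])
# Later transactions are joined as soon as they arrive.
late = join.add_transaction({"cc_num": "1111", "tid": "t2", "amount": 30.0})
```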
Steps 2 and 3. Training and inference
We can now train an ML model using feature vectors accumulated in the offline store and run real-time inference using the online store. These steps are identical to other demos available from Hopsworks. We provide training and inference notebooks as part of the demo for completeness.
Takeaways
Feature engineering poses a unique challenge: how does one build a data pipeline that can operate in both batch and streaming modes? Feldera solves this problem for the compute stage of the pipeline by offering a unified engine for stream and batch analytics. In this demo we ingested transaction data from a streaming data source (Kafka), while reading profile data as a batch from the Hopsworks feature store. In general, we can use arbitrary combinations of streaming and batch inputs without changing a line in our SQL code.
In a similar vein, the Hopsworks team has built a unified storage engine optimized for ML workloads. By combining the two platforms, we obtain an end-to-end solution that:
- Reduces the costly platform engineering required for real-time feature pipelines
- Eliminates the need for writing two implementations of the same feature queries
- Allows users to concentrate on designing the best features for their applications
We will continue exploring Hopsworks integration in a future blog post; in particular we will examine the performance achieved with Feldera and provide a comparison with Flink.