
Who's accessing your S3 buckets? SecOps solution using CloudTrail, EventBridge and Feldera

3 May, 2024

Abhinav Gyawali

Software Engineer

Security operations, or SecOps, combines IT processes and security teams to prevent and mitigate attacks. Having the right degree of visibility into your IT systems is key to making this work. There is an entire industry of security products that helps enterprises toward this end by collecting and analyzing a firehose of raw data that can be used to infer security-relevant insights.

In this blog post, we'll show you an end-to-end example of how you'd build such a solution using Feldera. Consider using Feldera if you'd prefer to build and iterate quickly on your own SecOps analytics.

We'll analyze accesses to AWS S3 in real-time. The goal is to gather insights into which users are accessing sensitive buckets, and whether they have the right permissions to do so. Thanks to Michael Gasch for suggesting this idea!

Our ingredients for this recipe will be:

  • AWS CloudTrail, a service that helps log events for configured AWS Services. For example, if enabled for S3, CloudTrail generates a continuous log of events that records every operation against your S3 bucket. These events note the users/accounts, the bucket name, the type of operation and more.
  • AWS EventBridge, a serverless service that uses events to integrate different applications. It makes it extremely easy to pick up CloudTrail events and route them to...
  • Feldera, where we will analyze these events using SQL in real-time. In doing so, we'll demonstrate how Feldera can interoperate with a diverse ecosystem of data sources like EventBridge, run complex analytics in real-time and work with data sources that reside in different administrative domains.

We'll set this up in three steps. First, we'll configure CloudTrail and S3. Next, we'll prepare a Feldera pipeline that is ready to receive events. Finally, we'll use EventBridge as the integration point. We will use Feldera to aggregate and analyze S3 access patterns by IP address, bucket, and even by time. In future blog posts, we'll also show how to integrate such analytics with historical data from your lakehouse.

EventBridge Integration Diagram

When we are all done, we can see the data updating live in the views.

Error Analytics View

Set up CloudTrail for two S3 buckets

Go to your AWS Console, then search for CloudTrail (or go to https://<region>.console.aws.amazon.com/cloudtrailv2/home). Click on Create a trail, give the trail a name like tutorial-trail, and complete the form. We will refer to this name later.

Next, go to your AWS S3 console (e.g. https://<region>.console.aws.amazon.com/s3/get-started). Click on Create Bucket, give the bucket a name (which we will reference later), and complete the form. Once you've created the bucket, go back to the list of buckets (https://<region>.console.aws.amazon.com/s3/buckets?region=<region>), pick the bucket you just created, go to Properties, and under AWS CloudTrail data events, choose Configure in CloudTrail. You can see these steps in detail here.

Repeat these steps for two different S3 buckets.
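If you'd rather script this part than click through the console, the same setup can be expressed with boto3. The sketch below is an assumption on my part rather than part of the original walkthrough: it presumes boto3 is installed, your credentials can manage S3 and CloudTrail, and the bucket and trail names match the ones used in this post.

# Rough boto3 sketch (assumptions: admin-ish credentials, names as in this post).
# Creates the two demo buckets and enables CloudTrail data events for them.
import boto3

REGION = "us-east-1"                 # adjust to your region
TRAIL = "tutorial-trail"             # the trail created above
BUCKETS = ["bucket-1", "bucket-2"]   # bucket names must be globally unique

s3 = boto3.client("s3", region_name=REGION)
cloudtrail = boto3.client("cloudtrail", region_name=REGION)

for bucket in BUCKETS:
    if REGION == "us-east-1":
        s3.create_bucket(Bucket=bucket)
    else:
        # Outside us-east-1, S3 requires an explicit location constraint.
        s3.create_bucket(
            Bucket=bucket,
            CreateBucketConfiguration={"LocationConstraint": REGION},
        )

# Record object-level (data) events for the demo buckets on the trail.
cloudtrail.put_event_selectors(
    TrailName=TRAIL,
    EventSelectors=[{
        "ReadWriteType": "All",
        "IncludeManagementEvents": True,
        "DataResources": [{
            "Type": "AWS::S3::Object",
            "Values": ["arn:aws:s3:::%s/" % b for b in BUCKETS],
        }],
    }],
)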

Set up Feldera

With CloudTrail and S3 in place, let's log in to the Feldera Cloud Sandbox. We'll start by creating a SQL program. Go to SQL Programs and then click Add SQL Program. Give the program a name like s3-data (we'll refer to it when creating the pipeline), then add the following program, which will receive CloudTrail events and compute some views over them in real-time.

CREATE TABLE s3_data
(
    account STRING,
    eventname STRING,
    eventtime STRING,
    eventsource STRING,
    sourceip STRING,
    useragent STRING,
    eventid STRING,
    eventtype STRING,
    bucketname STRING,
    id STRING,
    requestid STRING,
    region STRING,
    usertype STRING,
    userarn STRING,
    userprincipalid STRING,
    useraccountid STRING,
    useraccesskeyid STRING,
    sessioncreationdate STRING,
    errorCode STRING,
    errorMessage STRING
);

CREATE VIEW ip_frequency AS
SELECT
    sourceip,
    count(sourceip) as frequency
FROM s3_data
GROUP BY sourceip;

CREATE VIEW bucket_frequency AS
SELECT
    bucketname,
    count(bucketname) as frequency
FROM s3_data
GROUP BY bucketname;

CREATE VIEW eventname_frequency_by_minute AS
SELECT
    *
FROM
(
    SELECT
        eventname,
        count(eventname) as ct,
        eventminute
    FROM
    (
        SELECT
            minute(
                -- convert the timestamp to Feldera TIMESTAMP literal then cast to TIMESTAMP type
                cast(
                    trim(
                        trailing 'Z'
                        FROM replace(eventtime, 'T', ' ')
                    ) as TIMESTAMP
                )
            ) as eventminute,
            eventname
        FROM s3_data
    )
    GROUP BY (eventname, eventminute)
) PIVOT (
    SUM(ct) FOR eventname IN (
        'GetObject' as getobject,
        'HeadObject' as headobject,
        'ListObjects' as listobjects,
        'PutObject' as putobject
    )
);

CREATE VIEW errors AS
SELECT
    substring(
        userarn from position('/' in userarn) + 1
    ) as username,
    region,
    bucketname,
    sourceip,
    eventname,
    cast(
        trim(
            trailing 'Z'
            FROM replace(eventtime, 'T', ' ')
        ) AS TIMESTAMP
    ) as eventtimestamp,
    errorcode,
    errormessage
FROM s3_data
WHERE errorcode != '' or errormessage != '';

CREATE VIEW error_analytics AS
SELECT
    username,
    sourceip,
    count(*) as frequency,
    TUMBLE_START(
        eventtimestamp,
        INTERVAL '5' MINUTES
    ) as tumble_start_time
FROM errors
GROUP BY
    TUMBLE(
        eventtimestamp,
        INTERVAL '5' MINUTES
    ), username, sourceip;
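One detail worth calling out: CloudTrail reports eventtime as an ISO 8601 string like 2024-05-03T10:15:30Z, so the views above strip the trailing 'Z' and replace the 'T' with a space before casting to TIMESTAMP. Purely for illustration, here is the same massaging in Python (the sample timestamp is made up):

def to_feldera_timestamp(eventtime: str) -> str:
    # Drop the trailing 'Z' and swap the 'T' separator for a space,
    # mirroring the trim/replace combination in the SQL views.
    return eventtime.replace("T", " ").rstrip("Z")

print(to_feldera_timestamp("2024-05-03T10:15:30Z"))  # -> 2024-05-03 10:15:30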

With the program in place, let's now create a pipeline that makes use of it. Go to Pipelines and click Add Pipeline. Select the SQL program you just created (s3-data), and name the pipeline s3pipeline. We'll need to refer to this pipeline's name in EventBridge. Next, run the pipeline by clicking the play button.

Set up a Feldera API Key

We now need to make sure EventBridge has credentials to stream data to your Feldera pipeline. To do so, click on your profile on the top right. Go to Settings and click Generate new key. Generate the key after giving it a name like aws-eventbridge-demo. Copy the key somewhere and keep it safe.

Set up EventBridge

Create an EventBridge API destination

Go to the EventBridge console at https://<region>.console.aws.amazon.com/events/home?region=<region>#/apidestinations. Click on Create API destination and give it a name (feldera-demo).

Next, set the API destination endpoint to the ingress URL for the Feldera pipeline you created above: https://try.feldera.com/v0/pipelines/s3pipeline/ingress/s3_data. Here, s3pipeline is the pipeline name, and s3_data is the table you created above. Then configure the rest of the API destination as follows:

  1. Set the HTTP method to POST
  2. In the connection type, select Create a new connection
  3. Set the connection name: TryFelderaConnection
  4. Set the destination type as Other
  5. Set the Authorization type as API Key
    1. Set the API key name as: Authorization
    2. Set the Value as: Bearer <feldera-api-key>, using the API key contents that you'd saved earlier.
  6. Click on Create
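Before creating the rule, you can optionally sanity-check the endpoint and API key from your own machine, independently of EventBridge. Below is a minimal sketch using Python's requests library; it assumes the pipeline, table, and key from this tutorial, and the sample event values are made up:

# Hypothetical sanity check: POST a single newline-delimited JSON insert
# record to the pipeline's HTTP ingress endpoint, authenticated with the API key.
import requests

FELDERA_API_KEY = "<feldera-api-key>"  # the key generated earlier
URL = "https://try.feldera.com/v0/pipelines/s3pipeline/ingress/s3_data"

event = '{"insert":{"account":"123456789012","eventname":"GetObject","bucketname":"bucket-1","sourceip":"203.0.113.7","eventtime":"2024-05-03T10:15:30Z"}}'

resp = requests.post(
    URL,
    params={"format": "json"},  # the same query parameter EventBridge will send
    headers={"Authorization": "Bearer " + FELDERA_API_KEY},
    data=event,  # one JSON object per line (NDJSON)
)
# Expect a 2xx status; if the pipeline complains about missing fields,
# extend the record with the remaining columns of the s3_data table.
print(resp.status_code, resp.text)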

Create an EventBridge Rule

  1. Go to: https://<region>.console.aws.amazon.com/events/home
  2. Click on Create Rule
  3. Give it a name: s3logstofeldera
  4. Select the default Event bus
  5. Set the rule type to: Rule with an event pattern
  6. Set the event source to: Other
  7. Set the creation method to Custom pattern (JSON editor)
  8. Set the event pattern to the following, which filters out S3 API calls made by the CloudTrail service itself (so the trail's own log deliveries don't loop back into the pipeline):
{
  "source": ["aws.s3"],
  "detail": {
    "userAgent": [{
      "anything-but": ["cloudtrail.amazonaws.com"]
    }]
  }
}
  9. Click on Next
  10. Set Target type as EventBridge API destination
  11. Select Use an existing API destination
  12. Select the previously created API destination: feldera-demo
  13. For the query string parameters:
    • Set Key to: format
    • Set Value to: json
  14. Click on Additional settings
  15. Set Configure target input to: Input transformer
  16. Click on Configure input transformer
  17. Set the Sample event type to: AWS events
  18. Search for S3 in the Sample events input section, and select: AWS API Call via CloudTrail - Simple Storage Service (S3)
  19. You should see a sample event where the source field says "aws.s3".
  20. Set the target input transformer as follows:
{
  "account": "$.account",
  "bucketName": "$.detail.requestParameters.bucketName",
  "errorCode": "$.detail.errorCode",
  "errorMessage": "$.detail.errorMessage",
  "eventID": "$.detail.eventID",
  "eventId": "$.detail.eventID",
  "eventName": "$.detail.eventName",
  "eventSource": "$.detail.eventSource",
  "eventTime": "$.detail.eventTime",
  "eventType": "$.detail.eventType",
  "id": "$.id",
  "region": "$.region",
  "requestID": "$.detail.requestID",
  "sessionCreationDate": "$.detail.userIdentity.sessionContext.attributes.creationDate",
  "sessionMFA": "$.detail.userIdentity.sessionContext.attributes.mfaAuthenticated",
  "sourceIP": "$.detail.sourceIPAddress",
  "time": "$.time",
  "userARN": "$.detail.userIdentity.arn",
  "userAccessKeyId": "$.detail.userIdentity.accessKeyId",
  "userAccountId": "$.detail.userIdentity.accountId",
  "userPrincipalId": "$.detail.userIdentity.principalId",
  "userType": "$.detail.userIdentity.type",
  "useragent": "$.detail.userAgent"
}
  21. Set the template as follows:
{"insert":{"account":"<account>","time":"<time>","eventName":"<eventName>","eventTime":"<eventTime>","eventSource":"<eventSource>","sourceIP":"<sourceIP>","useragent":"<useragent>","eventId":"<eventId>","eventType":"<eventType>","bucketName":"<bucketName>","id":"<id>","requestID":"<requestID>","eventID":"<eventID>","region":"<region>","userType":"<userType>","userARN":"<userARN>","userPrincipalId":"<userPrincipalId>","userAccountId":"<userAccountId>","userAccessKeyId":"<userAccessKeyId>","sessionCreationDate":"<sessionCreationDate>","sessionMFA":"<sessionMFA>", "errorCode": "<errorCode>", "errorMessage": "<errorMessage>"}}

Note: we use the minified (single-line) template because Feldera's HTTP input endpoint currently accepts NDJSON (newline-delimited JSON), which lets you send a batch of JSON events, one object per line, in a single request; a short sketch of what such a batch looks like follows the final step below. See here to learn more. We also define an input transformer to keep the Feldera schema simple; this transformation can be done from within Feldera once JSON support lands.

  22. Click Confirm and create the rule.
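As promised, here's a small illustrative sketch, with made-up payloads, of how a batch of insert records is serialized as NDJSON before being posted to the ingress endpoint:

# Illustration only: a batch of insert records encoded as NDJSON,
# i.e. one minified JSON object per line.
import json

events = [
    {"insert": {"eventname": "GetObject", "bucketname": "bucket-1", "sourceip": "203.0.113.7"}},
    {"insert": {"eventname": "PutObject", "bucketname": "bucket-2", "sourceip": "198.51.100.23"}},
]

ndjson_body = "\n".join(json.dumps(e) for e in events)
print(ndjson_body)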

Let's generate some test data!

Next, we'll simulate some users accessing S3 buckets. We'll create three IAM users with differing permissions.

Create three IAM users

  1. Go to AWS Console IAM Users: https://<region>.console.aws.amazon.com/iam/home#/users
  2. Click on Create User
  3. Set the User name: elliotalderson
  4. In Set permissions, create a group: Select the AmazonS3FullAccess policy and give the group a name: s3users
  5. Create the user
  6. In the user details, create an access key
    1. Select Command Line Interface (CLI)
    2. Check the confirmation (I understand) box
    3. Give it a tag, describing what the access key is for
    4. Copy the access key
    5. Copy the secret access key
    6. Click done
  7. Go to create another user: mrrobot
  8. Set the group to: s3users
  9. Create the user
  10. Go to create another user: terminator
  11. In Set permissions, create a group: Select the AmazonS3ReadOnlyAccess policy and give the group a name: s3readonly
  12. Create the user
  13. Create an access key for both mrrobot and terminator, and copy the information (if you prefer to script these steps, see the boto3 sketch below)
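If you prefer to script the IAM setup, a rough boto3 sketch along these lines should give an equivalent result. Again, this is an assumption rather than part of the original walkthrough: it presumes boto3 and credentials with IAM admin rights, and it prints each user's access keys to stdout, so treat the output as sensitive.

# Rough boto3 sketch (assumption: IAM admin credentials configured locally).
# Mirrors the console steps: two groups with S3 policies, three users, access keys.
import boto3

iam = boto3.client("iam")

GROUPS = {
    "s3users": "arn:aws:iam::aws:policy/AmazonS3FullAccess",
    "s3readonly": "arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess",
}
USERS = {
    "elliotalderson": "s3users",
    "mrrobot": "s3users",
    "terminator": "s3readonly",
}

for group, policy_arn in GROUPS.items():
    iam.create_group(GroupName=group)
    iam.attach_group_policy(GroupName=group, PolicyArn=policy_arn)

for user, group in USERS.items():
    iam.create_user(UserName=user)
    iam.add_user_to_group(GroupName=group, UserName=user)
    # Access keys for the AWS CLI profiles used later in this post.
    key = iam.create_access_key(UserName=user)["AccessKey"]
    print(user, key["AccessKeyId"], key["SecretAccessKey"])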

Configure AWS CLI

  1. Install AWS CLI: sudo apt install awscli
  2. Configure the profiles for the previously created users
    1. Run: aws configure --profile <username>
    2. Paste and set the access key and secret access key

Access the S3 objects using the different IAM users

Create a new file s3-accesses.py with the contents below and run it with python3 s3-accesses.py. You can pass the number of accesses to generate as an argument (e.g. python3 s3-accesses.py 50); it defaults to 20.

import random
import subprocess
import sys

PROFILES = ["elliotalderson", "mrrobot", "terminator"]
BUCKETS = ["bucket-1", "bucket-2"]  # change to match the created bucket names
SUBCOMMANDS = ["ls", "cp"]


def select_profile():
    # elliotalderson generates the most traffic, terminator the least
    return random.choices(PROFILES, weights=[50, 30, 20], k=1)[0]


def select_bucket():
    return random.choice(BUCKETS)


def run_command():
    cmd_type = random.choice(SUBCOMMANDS)
    profile = select_profile()
    bucket = select_bucket()
    filename = ""

    if cmd_type == "ls":
        command = subprocess.Popen(
            ["aws", "s3", cmd_type, "s3://%s/" % bucket, "--profile", profile],
            stdout=subprocess.DEVNULL,
        )
    else:
        # upload a small temporary file for "cp"
        filename = "file%s.txt" % str(random.randint(0, 10000))
        subprocess.run("echo hello > %s" % filename, shell=True)
        command = subprocess.Popen(
            ["aws", "s3", cmd_type, filename, "s3://%s/" % bucket, "--profile", profile],
            stdout=subprocess.DEVNULL,
        )
    command.wait()

    print(" ".join(command.args), ": return code: ", command.returncode)

    if filename != "":
        subprocess.run(["rm", filename])


runs = int(sys.argv[1]) if 1 < len(sys.argv) else 20

print(f"generating {runs} data points")

for _ in range(runs):
    run_command()

Now go back to the Feldera Console and check the output views

  1. Go to: https://try.feldera.com/streaming/management/#s3pipeline
  2. Click on the eye icon in the Output table and explore the different views. You should see data arriving in Feldera and the views being updated:
    1. ip_frequency shows the source IP addresses and total access count
    2. bucket_frequency shows the bucket name and total access count
    3. eventname_frequency_by_minute pivots on the different types of S3 events and shows their frequency per minute
    4. errors shows rows that contain an error message
    5. error_analytics shows analytics on the errors for a tumbling window of 5 minutes

Conclusion

If you're on AWS, you can conveniently use Feldera and EventBridge to analyze CloudTrail events from your data sources of choice, in real-time.

More broadly, we showed how Feldera can be used to engineer SecOps use cases, consuming data from diverse sources across administrative domains, and analyzing these results in real-time to produce insights. We've worked on a couple of these use cases already and are excited to see what our early users are coming up with. Don't hesitate to connect with us below to learn more.

Error Analytics View