# Database computations on Z-sets (part 1)

## Introduction

In a couple of blog posts we have introduced Z-sets as a mathematical representation for database tables and *changes* to database tables, and we have given a simple Java implementation of the basic Z-set operations. In this blog post we build a few additional methods for Z-sets which enable us to perform traditional database computations.

Recall that a Z-set is just a table where each row has an attached integer weight, which can be either positive or negative. We can then define operations of addition and negation for Z-sets, where the sum of two Z-sets contains all rows in either Z-set; the weight of a row in the result is the sum of the row's weights in both sets.
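As a quick refresher, the addition operation can be sketched with plain Java maps (a simplified stand-in for the `ZSet` class from the previous posts, with rows as strings and weights as `Integer`s — not the actual implementation):

```java
import java.util.HashMap;
import java.util.Map;

public class ZSetAddSketch {
    // Sum two Z-sets: weights of common rows are added,
    // and rows whose weight becomes zero are removed.
    static Map<String, Integer> add(Map<String, Integer> a, Map<String, Integer> b) {
        Map<String, Integer> result = new HashMap<>(a);
        for (Map.Entry<String, Integer> e : b.entrySet())
            result.merge(e.getKey(), e.getValue(), Integer::sum);
        result.values().removeIf(w -> w == 0);
        return result;
    }

    public static void main(String[] args) {
        Map<String, Integer> t = new HashMap<>(Map.of("joe", 1, "anne", 2));
        Map<String, Integer> delta = new HashMap<>(Map.of("anne", -2, "bob", 1));
        // "anne" cancels out; "joe" and "bob" remain
        System.out.println(add(t, delta));
    }
}
```

Note how a negative weight in the second Z-set acts as a deletion when the two are added.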

It helps to state exactly what we want to achieve. Our goal is the following: say we have a database table `I` and a query `Q` computing on table `I` to produce another table `O`.

The Java implementation has a constructor for converting a collection into a Z-set (`public ZSet(Collection<Data> data, WeightType<Weight> weightType)`), and a method `toCollection` that produces a collection from a Z-set.

We say that a computation `QZ` on Z-sets is *correct* if we have the relationship depicted in the following diagram:¹

In words: if we execute query `Q` on input `I`, we get the exact same result as if we convert `I` to a Z-set, compute using `QZ`, and then convert the result back to a table. This should be true for any input table `I`.

Notice that this rule does not say anything about what `QZ` does when the input is a Z-set that does not represent a table. For these cases our function can do anything. But we will strategically choose implementations that give useful results even when the weights are negative.
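This correctness condition can be checked mechanically for a concrete query. The following sketch (again using plain maps and lists instead of the real `ZSet` class; all names here are illustrative) verifies both paths of the diagram for a simple filter query:

```java
import java.util.*;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public class CorrectnessSketch {
    // convert a multiset (list) to a Z-set: weight = number of occurrences
    static Map<String, Integer> toZSet(List<String> rows) {
        Map<String, Integer> z = new HashMap<>();
        for (String r : rows) z.merge(r, 1, Integer::sum);
        return z;
    }

    // convert a positive Z-set back to a multiset
    static List<String> toCollection(Map<String, Integer> z) {
        List<String> rows = new ArrayList<>();
        z.forEach((r, w) -> { for (int i = 0; i < w; i++) rows.add(r); });
        return rows;
    }

    // the Z-set version of filtering: keep rows with their weights
    static Map<String, Integer> filterZ(Map<String, Integer> z, Predicate<String> keep) {
        Map<String, Integer> result = new HashMap<>();
        z.forEach((r, w) -> { if (keep.test(r)) result.put(r, w); });
        return result;
    }

    public static void main(String[] args) {
        List<String> table = List.of("a", "b", "a", "c");
        Predicate<String> q = r -> !r.equals("b");
        // query on the table directly...
        List<String> direct = table.stream().filter(q).collect(Collectors.toList());
        // ...and via the Z-set detour
        List<String> viaZSets = toCollection(filterZ(toZSet(table), q));
        // both paths of the diagram produce the same multiset
        System.out.println(toZSet(direct).equals(toZSet(viaZSets)));  // prints "true"
    }
}
```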

## Implementing database computations on Z-sets

All the code in this blog post is available in the Feldera repository in the `org.dbsp.simulator` Java namespace. In particular, all the following methods are in `collections/ZSet.java`.

### Filtering (SQL `WHERE`)

For the first task we consider filtering, which is expressed in SQL as the `WHERE` clause in a query, e.g.: `SELECT * FROM T WHERE T.COL > 10`. Which rows of the input table are preserved in the result is decided by a *predicate*, a function that computes a Boolean value for each row of a table. In the previous example the predicate is `T.COL > 10`. In Java we will use the `Predicate<T>` interface to describe a predicate. The filtering function on Z-sets can be written as:

```java
public class ZSet<Data, Weight> {
    // extending existing class
    public ZSet<Data, Weight> filter(Predicate<Data> keep) {
        Map<Data, Weight> result = new HashMap<>();
        for (Map.Entry<Data, Weight> entry: this.data.entrySet()) {
            Weight weight = entry.getValue();
            // keep the row with its original weight when the predicate holds
            if (keep.test(entry.getKey()))
                result.put(entry.getKey(), weight);
        }
        return new ZSet<>(result, this.weightType);
    }
}
```

The `filter` function just applies the predicate to each entry in a Z-set; when the predicate is true, the entry is added to the result with *its original weight* (even if the weight is negative). This clearly gives the correct result when the weights are all positive.

### Transforming (SQL `SELECT`)

Let us consider a SQL query with a `SELECT` statement, without aggregation or grouping. The `SELECT` statement is followed by a list of expressions that are evaluated for each row of the source collection. The expressions can be simple column references, as in `SELECT T.COLUMN1 FROM T`, or they can be complex arithmetic expressions that involve multiple columns: `SELECT T.COL1 + T.COL2 FROM T`. We define a corresponding method for Z-sets called `map`: `map` is parameterized by an arbitrary function implementing the Java `Function<T, R>` interface.

```java
public class ZSet<Data, Weight> {
    // extending existing class
    public <OData> ZSet<OData, Weight> map(Function<Data, OData> tupleTransform) {
        Map<OData, Weight> result = new HashMap<>();
        for (Map.Entry<Data, Weight> entry: this.data.entrySet()) {
            Weight weight = entry.getValue();
            OData out = tupleTransform.apply(entry.getKey());
            // two input rows may map to the same output row;
            // in that case their weights are added together
            result.merge(out, weight, this::merger);
        }
        return new ZSet<>(result, this.weightType);
    }
}
```

Similar to the `filter` function, the `map` function is applied to each row and copies the weight of the source row to the destination row.

Let us notice two properties of `map`:

- applying `map` to a Z-set with positive weights will also produce a Z-set with positive weights.
- applying `map` to a Z-set representing a set (where all the weights are 1) may produce a Z-set where some weights are greater than 1. This is true in SQL as well, where `SELECT 1 FROM T` will produce a multiset with as many `1` values as there are rows in `T`.
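The second property is easy to see with the simplified map representation used earlier (a hypothetical sketch, not the `ZSet.map` method itself): a constant transformation collapses every row to the same output value, and the weights accumulate.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class MapWeightSketch {
    // map over a Z-set: weights of rows that transform to the same
    // output value are added together
    static <I, O> Map<O, Integer> map(Map<I, Integer> z, Function<I, O> f) {
        Map<O, Integer> result = new HashMap<>();
        z.forEach((row, w) -> result.merge(f.apply(row), w, Integer::sum));
        return result;
    }

    public static void main(String[] args) {
        // a set: every weight is 1
        Map<String, Integer> t = Map.of("billy", 1, "barbara", 1, "john", 1);
        // SELECT 1 FROM T: all three rows collapse to the single value 1
        System.out.println(map(t, row -> 1));  // prints {1=3}
    }
}
```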

### Cartesian products

The Cartesian product of two sets builds all pairs of elements from the two sets. In SQL this can be expressed in multiple ways. The simplest one is to just use multiple tables in the `FROM` clause: `SELECT T.COL1, S.COL1 FROM T, S`. This is also called a cross-join.

We implement a slight generalization of the Cartesian product for Z-sets, where the final result is obtained by applying a user-supplied function to each pair of rows. For this purpose we use the Java `BiFunction<T, U, R>` interface.

We call our Z-set operation `multiply`:

```java
public class ZSet<Data, Weight> {
    // extending existing class
    public <OtherData, Result> ZSet<Result, Weight> multiply(
            ZSet<OtherData, Weight> other,
            BiFunction<Data, OtherData, Result> combiner) {
        ZSet<Result, Weight> result = new ZSet<>(this.weightType);
        for (Map.Entry<Data, Weight> entry: this.data.entrySet()) {
            for (Map.Entry<OtherData, Weight> otherEntry: other.data.entrySet()) {
                Result data = combiner.apply(entry.getKey(), otherEntry.getKey());
                // the weight of a pair is the product of the two row weights
                Weight weight = this.weightType.multiply(entry.getValue(), otherEntry.getValue());
                result.append(data, weight);
            }
        }
        return result;
    }
}
```

There are two reasons this operation is called `multiply`:

- the number of rows in the result Z-set is the product of the numbers of rows in the multiplied Z-sets
- the weight of each row in the result Z-set is the product of the weights of the corresponding rows in the source Z-sets
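The second point can be illustrated with the simplified map representation (a sketch, not the actual `multiply` method): pairing a row of weight 2 with a row of weight 3 yields a pair with weight 6.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

public class MultiplySketch {
    // Cartesian product of two Z-sets; the weight of each combined
    // row is the product of the two source weights
    static <A, B, R> Map<R, Integer> multiply(
            Map<A, Integer> left, Map<B, Integer> right,
            BiFunction<A, B, R> combiner) {
        Map<R, Integer> result = new HashMap<>();
        left.forEach((a, wa) ->
            right.forEach((b, wb) ->
                result.merge(combiner.apply(a, b), wa * wb, Integer::sum)));
        return result;
    }

    public static void main(String[] args) {
        Map<String, Integer> t = Map.of("x", 2);
        Map<String, Integer> s = Map.of("y", 3);
        System.out.println(multiply(t, s, (a, b) -> a + b));  // prints {xy=6}
    }
}
```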

### SQL `UNION`

The SQL `UNION` operation can be applied to two tables that have the same schema (same columns). It produces a table that contains all the rows that appear in either table. There are two forms of `UNION` in SQL: `UNION ALL`, which keeps duplicate records, and `UNION`, which strips away duplicates.

It turns out that we have already implemented the required operations on Z-sets: `UNION ALL` is the `add` method, while `UNION` is `add` followed by a call to `distinct`.

```java
public class ZSet<Data, Weight> {
    // extending existing class
    public ZSet<Data, Weight> union(ZSet<Data, Weight> other) {
        return this.add(other).distinct();
    }
    public ZSet<Data, Weight> union_all(ZSet<Data, Weight> other) {
        return this.add(other);
    }
}
```

### SQL `EXCEPT`

The SQL `EXCEPT` operation is only defined for two tables with the same schema, where both are sets (no duplicates allowed). This operation keeps all rows from the first table which do not appear in the second table. We already have all the ingredients to build `except`:

```java
public class ZSet<Data, Weight> {
    // extending existing class
    public ZSet<Data, Weight> except(ZSet<Data, Weight> other) {
        return this.distinct().subtract(other.distinct()).distinct();
    }
}
```

We need to apply the operation `distinct` three times: once for each input, to make sure that the inputs are sets, and once for the output, to eliminate rows with negative weights. (If we know that an input is always a set, the corresponding `distinct` can be omitted.)
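Using the simplified map representation, the three `distinct` calls and the subtraction can be sketched as follows (assumed semantics, mirroring the earlier posts: `distinct` keeps rows with positive weight at weight exactly 1, and subtraction adds negated weights):

```java
import java.util.HashMap;
import java.util.Map;

public class ExceptSketch {
    // distinct: keep each row with positive weight, at weight exactly 1
    static Map<String, Integer> distinct(Map<String, Integer> z) {
        Map<String, Integer> r = new HashMap<>();
        z.forEach((row, w) -> { if (w > 0) r.put(row, 1); });
        return r;
    }

    // subtract: negate the second argument's weights and add
    static Map<String, Integer> subtract(Map<String, Integer> a, Map<String, Integer> b) {
        Map<String, Integer> r = new HashMap<>(a);
        b.forEach((row, w) -> r.merge(row, -w, Integer::sum));
        r.values().removeIf(w -> w == 0);
        return r;
    }

    static Map<String, Integer> except(Map<String, Integer> a, Map<String, Integer> b) {
        // the outer distinct drops rows that ended up with negative weight
        return distinct(subtract(distinct(a), distinct(b)));
    }

    public static void main(String[] args) {
        Map<String, Integer> t = Map.of("billy", 1, "john", 1);
        Map<String, Integer> s = Map.of("billy", 1, "mary", 1);
        System.out.println(except(t, s));  // prints {john=1}
    }
}
```

After the subtraction the row "mary" has weight -1; the final `distinct` removes it.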

### Aggregations

An aggregation function in SQL transforms a collection into a single value. The most common aggregation functions are `COUNT`, `SUM`, `AVG`, `MIN`, and `MAX`. Each of these operations can be expressed as a loop that keeps updating an accumulator, with logic defined by the following pseudo-code:

```
accumulator = initialValue;
foreach (row in collection) {
    accumulator = update(accumulator, row)
}
return finalize(accumulator)
```

`initialValue` is the result produced when the collection is empty.² The `finalize` step is used for computing aggregates like "average," which uses it to divide the sum of the values by their count.

To compute on Z-sets we will generalize this scheme slightly to allow aggregating values together with their weights. We define a helper class `AggregateDescription` which bundles three pieces of information:

```java
public class AggregateDescription<Result, IntermediateResult, Data, Weight> {
    public final IntermediateResult initialValue;
    public final TriFunction<IntermediateResult, Data, Weight, IntermediateResult> update;
    public final Function<IntermediateResult, Result> finalize;

    public AggregateDescription(IntermediateResult initialValue,
            TriFunction<IntermediateResult, Data, Weight, IntermediateResult> update,
            Function<IntermediateResult, Result> finalize) {
        this.initialValue = initialValue;
        this.update = update;
        this.finalize = finalize;
    }
}
```

Unfortunately Java does not have a `TriFunction` interface similar to `BiFunction`, so we had to define our own in a separate file. The definition is very simple:

```java
@FunctionalInterface
public interface TriFunction<T, U, V, R> {
    R apply(T var1, U var2, V var3);
}
```

With these preparations, the implementation of `aggregate` looks very much like the pseudo-code above:

```java
public class ZSet<Data, Weight> {
    // extending existing class
    public <Result, IntermediateResult> Result aggregate(
            AggregateDescription<Result, IntermediateResult, Data, Weight> aggregate) {
        IntermediateResult result = aggregate.initialValue;
        for (Map.Entry<Data, Weight> entry : this.data.entrySet()) {
            Weight weight = entry.getValue();
            result = aggregate.update.apply(result, entry.getKey(), weight);
        }
        return aggregate.finalize.apply(result);
    }
}
```

In a future installment we will talk about additional computations on Z-sets, such as `GROUP BY` and `JOIN`.

## Putting everything together

We have built a lot of infrastructure, but we have not yet seen exactly how to use it to compute on concrete data.

### Reading and writing Z-sets from CSV data

We start by writing two helper functions to read and write positive Z-sets from CSV (comma-separated values) data. We use serialization facilities from the `jackson` project. The following two functions read/write POJOs (Plain Old Java Objects) from/into CSV strings, including headers:

```java
public static <T> ZSet<T, Integer> fromCSV(String data, Class<T> tclass) {
    try {
        CsvMapper mapper = new CsvMapper();
        CsvSchema schema = CsvSchema.emptySchema().withHeader();
        MappingIterator<T> it = mapper.readerFor(tclass)
                .with(schema)
                .readValues(data);
        List<T> collection = new ArrayList<>();
        it.readAll(collection);
        return new ZSet<>(collection, IntegerWeight.INSTANCE);
    } catch (IOException ex) {
        throw new RuntimeException(ex);
    }
}

public static <T> String toCsv(ZSet<T, Integer> data, Class<T> tclass) {
    try {
        Collection<T> values = data.toCollection();
        CsvMapper mapper = new CsvMapper();
        final CsvSchema schema = mapper.schemaFor(tclass).withUseHeader(true);
        ObjectWriter writer = mapper.writer(schema);
        StringWriter str = new StringWriter();
        writer.writeValue(str, values);
        return str.toString();
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}
```

If you want to compute on a database of persons you need to define a corresponding `Person` Java class:

```java
@JsonPropertyOrder({"name", "age"})
public class Person {
    @Nullable
    public String name;
    public int age;

    @SuppressWarnings("unused")
    public Person() {}
    public Person(String name, int age) { ... }

    @Override
    public String toString() { ... }
    @Override
    public boolean equals(Object o) { ... }
    @Override
    public int hashCode() { ... }
}
```

Remember that Z-sets use `HashMap`s to store the data, so every value that appears in a Z-set must implement proper `equals` and `hashCode` Java methods!

We can easily read `Person`s from CSV data:

```java
public static ZSet<Person, Integer> getPersons() {
    String data = "name,age\n" +
            "Billy,28\n" +
            "Barbara,36\n" +
            "John,12\n";
    return fromCSV(data, Person.class);
}
```

### Unit tests for Z-set operations

And now we can write a unit test for `WHERE`:

```java
@Test public void testWhere() {
    ZSet<Person, Integer> input = getPersons();
    ZSet<Person, Integer> adults = input.filter(p -> p.age >= 18);
    ZSet<Person, Integer> expected = fromCSV("name,age\n" +
            "Billy,28\n" +
            "Barbara,36\n", Person.class);
    Assert.assertTrue(expected.equals(adults));
}
```

We are using a helper function in Z-sets to check that two Z-sets are equal:

```java
public class ZSet<Data, Weight> {
    // extending existing class
    public boolean equals(ZSet<Data, Weight> other) {
        return this.subtract(other).isEmpty();
    }
}
```

Here are some tests for set operations:

```java
@Test public void testExcept() {
    ZSet<Person, Integer> input = getPersons();
    ZSet<Person, Integer> adults = input.filter(p -> p.age >= 18);
    ZSet<Person, Integer> children = input.except(adults);
    ZSet<Person, Integer> expected = fromCSV("name,age\n" +
            "John,12\n", Person.class);
    Assert.assertTrue(expected.equals(children));
}

@Test public void testUnion() {
    ZSet<Person, Integer> input = getPersons();
    ZSet<Person, Integer> adults = input.filter(p -> p.age >= 18);
    ZSet<Person, Integer> children = input.except(adults);
    ZSet<Person, Integer> all = adults.union(children);
    Assert.assertTrue(input.equals(all));
}

@Test public void testUnionAll() {
    ZSet<Person, Integer> input = getPersons();
    ZSet<Person, Integer> adults = input.filter(p -> p.age >= 18);
    ZSet<Person, Integer> all = input.union_all(adults);
    Integer johnCount = all.getWeight(new Person("John", 12));
    Assert.assertEquals(1, (int)johnCount);
    Integer billyCount = all.getWeight(new Person("Billy", 28));
    Assert.assertEquals(2, (int)billyCount);
    Integer tomCount = all.getWeight(new Person("Tom", 28));
    Assert.assertEquals(0, (int)tomCount);
}
```

If we want to write a unit test for `SELECT` we need to define an additional helper class for the results produced:

```java
class Age {
    public final int age;
    ...
}

@Test public void testSelect() {
    ZSet<Person, Integer> input = getPersons();
    ZSet<Age, Integer> ages = input.map(p -> new Age(p.age));
    ZSet<Age, Integer> expected = fromCSV("age\n28\n36\n12\n", Age.class);
    Assert.assertTrue(expected.equals(ages));
}
```

### Testing aggregates

Let us build four aggregate computations, in increasing order of complexity:

```java
@Test public void testAggregate() {
    ZSet<Person, Integer> input = getPersons();

    AggregateDescription<Integer, Integer, Person, Integer> count =
        new AggregateDescription<>(0, (a, p, w) -> a + w, r -> r);
    Integer personCount = input.aggregate(count);
    Assert.assertEquals(3, (int)personCount);

    AggregateDescription<Integer, Integer, Person, Integer> sum =
        new AggregateDescription<>(
            0, (a, p, w) -> a + p.age * w, r -> r);
    Integer ageSum = input.aggregate(sum);
    Assert.assertEquals(76, (int)ageSum);

    AggregateDescription<Integer, Integer, Person, Integer> max =
        new AggregateDescription<>(
            0, (a, p, w) -> Math.max(a, p.age), r -> r);
    Integer maxAge = input.aggregate(max);
    Assert.assertEquals(36, (int)maxAge);

    AggregateDescription<Integer, AvgHelper, Person, Integer> avg =
        new AggregateDescription<>(
            new AvgHelper(0, 0),
            (a, p, w) -> new AvgHelper(a.count + w, a.sum + p.age * w),
            r -> r.sum / r.count);
    Integer avgAge = input.aggregate(avg);
    Assert.assertEquals(25, (int)avgAge);
}
```

Notice how the aggregates use the weights of the records:

- `COUNT` adds up the weights. Clearly, if an item has a weight of 2 it has to be counted as 2 items.
- `SUM` multiplies each value by its weight. Clearly, adding a value of 5 with a weight of 2 should add 5*2 = 10 to the sum.
- `MAX` ignores the weights altogether and just computes on the supplied values. Duplicate values do not influence the result of `MAX`.
- `AVG` uses a simple helper class `AvgHelper` (not shown here) to keep both a count and a sum, and then uses the `finalize` step of the aggregation to perform the division.
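The weighted `COUNT` and `SUM` folds are easy to check by hand on a multiset encoded as a Z-set (a sketch with plain maps, not the `aggregate` method itself; here `{5=2}` means the value 5 appears twice):

```java
import java.util.Map;
import java.util.function.BiFunction;

public class WeightedAggSketch {
    // fold over (value, weight) pairs starting from an initial accumulator
    static int fold(Map<Integer, Integer> z, int init,
                    BiFunction<Integer, Integer, Integer> update) {
        int acc = init;
        for (Map.Entry<Integer, Integer> e : z.entrySet())
            acc = acc + update.apply(e.getKey(), e.getValue());
        return acc;
    }

    public static void main(String[] args) {
        Map<Integer, Integer> z = Map.of(5, 2, 7, 1);  // the multiset {5, 5, 7}
        int count = fold(z, 0, (value, w) -> w);          // COUNT adds up weights
        int sum   = fold(z, 0, (value, w) -> value * w);  // SUM: value times weight
        System.out.println(count + " " + sum);  // prints "3 17"
    }
}
```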

And finally, let's write a test for `multiply`; this test uses another helper class `NameAddress`:

```java
@Test
public void testMultiply() {
    ZSet<Person, Integer> input = getPersons();
    ZSet<Address, Integer> address = getAddress();
    ZSet<NameAddress, Integer> product = input.multiply(address,
            (p, a) -> new NameAddress(p.name, a.city));
    System.out.println(product);
    Assert.assertEquals(input.entryCount() * address.entryCount(), product.entryCount());
}
```

We have printed the value of `product` using a helper `toString()` method for Z-sets, which we may discuss in a future blog post:

```
{
    NameAddress{name='Billy', address='Seattle'} => 1,
    NameAddress{name='John', address='New York'} => 1,
    NameAddress{name='John', address='Miami'} => 1,
    NameAddress{name='John', address='San Francisco'} => 1,
    NameAddress{name='Barbara', address='Miami'} => 1,
    NameAddress{name='Billy', address='San Francisco'} => 1,
    NameAddress{name='Billy', address='New York'} => 1,
    NameAddress{name='Barbara', address='Seattle'} => 1,
    NameAddress{name='Billy', address='Miami'} => 1,
    NameAddress{name='Barbara', address='San Francisco'} => 1,
    NameAddress{name='Barbara', address='New York'} => 1,
    NameAddress{name='John', address='Seattle'} => 1
}
```

## Conclusions

In this blog post we have implemented a set of basic SQL computations and shown how they can be performed on data represented as Z-sets. We have started by defining a criterion which tells us when a Z-set computation correctly emulates a database computation. Z-sets are strictly more expressive than traditional database relations or multisets -- for example, Z-sets can describe collections where elements appear a negative number of times. We require our implementations of database computations to be correct for Z-sets that have positive weights.

We have implemented the following operators: `SELECT`, `WHERE`, `CROSS JOIN`, `UNION`, `EXCEPT`, and aggregations. A remarkable feature of this implementation is how short the code for each of these operations is: none of the implementations has more than 10 lines of Java. Let us remember, however, that these implementations are not optimized for performance. It is still handy to have a simple implementation around: if we build a more complicated one, optimized for speed or memory, we can compare its behavior against the simple one to more easily find bugs.

In the next blog post in the series we implement additional operations, such as `GROUP BY`, `JOIN`, and aggregations on groups.

## Footnotes

1. Mathematicians call such diagrams "commutative diagrams": no matter which path you take from a source to a sink in the diagram, you should obtain the same result.
2. SQL is very peculiar in this respect: for some functions, like `COUNT(*)`, the result for an empty collection is 0, but for functions like `COUNT()` or `SUM` the result is `NULL` for a collection which has only `NULL` values; doing this properly will require some additional machinery. We have not discussed at all how to implement `NULL` values; so far all our Z-set operations work with an abstract `Data` type; we will discuss NULLs in a future blog post. Here we build an implementation which returns 0 for an empty set.
