DO NOT MERGE: Visualisation of my changes #14

Draft · wants to merge 12 commits into `main`
7 changes: 6 additions & 1 deletion .gitignore
@@ -2,18 +2,23 @@
.idea
*.iml
.vscode
.metals
.bloop

# Scala related files
scala/project
scala/target
scala/project/target/
scala/project/project/target/
scala/target/stream/*
scala/src/test/scala/Playground.scala
# Avoid checking in proprietary jars
scala/lib
.bsp

# ignore build files
**/build
**/dist

# Mac files
.DS_Store
.DS_Store
102 changes: 12 additions & 90 deletions README.md
@@ -6,7 +6,6 @@ Apart from utilising existing high-level implementations, a couple of implementa

The thesis that this project laid the foundation for can be found here: http://www.diva-portal.org/smash/get/diva2:1695672/FULLTEXT01.pdf.

# This project is not being actively developed or maintained. If you wish to continue development on this project, feel free to make a fork!

## Table of contents

@@ -15,11 +14,7 @@ The thesis that this project laid the foundation for can be found here: http://w
- [Scala (Spark)](#scala-spark)
- [Python (PySpark)](#python-pyspark)
- [Quickstart (Python)](#quickstart-python)
1. [Creating the context](#1-creating-the-context)
2. [Performing a PIT join](#2-performing-a-pit-join)
1. [Early stop sort merge](#21-early-stop-sort-merge)
2. [Union merge](#22-union-merge)
3. [Exploding PIT join](#23-exploding-pit-join)
- [Early stop sort merge](#early-stop-sort-merge)
- [QuickStart (Scala)](#quickstart-scala)
- [Early stop sort merge](#early-stop-sort-merge)
- [Union ASOF merge](#union-asof-merge)
@@ -29,7 +24,7 @@ The thesis that this project laid the foundation for can be found here: http://w

| Dependency | Version |
| --------------- | ------- |
| Spark & PySpark | 3.2 |
| Spark & PySpark | 3.5.0 |
| Scala | 2.12 |
| Python | >=3.6 |

@@ -45,6 +40,9 @@ To make the artifacts available for the executors, set the configuration propert

Alternatively, set the configuration property `spark.jars` to include the path to the jar-file to make it available for both the driver and executors.

Additionally, set `spark.sql.extensions` to include `io.github.ackuq.pit.SparkPIT`. This configuration property is a comma-separated string.
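
As a minimal sketch (not from the original docs), these properties might be set when constructing the session; the JAR path below is a hypothetical placeholder:

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical path to the built artifact -- replace with your own.
val pitJarPath = "/path/to/spark-pit-assembly.jar"

val spark = SparkSession
  .builder()
  .appName("pit-example")
  // Make the jar available to both the driver and the executors.
  .config("spark.jars", pitJarPath)
  // Register the PIT extension (the property is a comma-separated list).
  .config("spark.sql.extensions", "io.github.ackuq.pit.SparkPIT")
  .getOrCreate()
```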

### Python (PySpark)

Configure Spark using the instructions in [the previous section](#scala-spark).
@@ -57,73 +55,28 @@ pip install spark-pit

## Quickstart (Python)

### 1. Creating the context

The object `PitContext` is the entry point for all of the functionality of the library. You can initialize this context with the following code:

```py
from pyspark import SQLContext
from ackuq.pit import PitContext

sql_context = SQLContext(spark.sparkContext)
pit_context = PitContext(sql_context)
```

### 2. Performing a PIT join

There are currently three ways of executing a PIT join: an early stop sort merge, a union merge algorithm, or exploding intermediate tables.

#### 2.1. Early stop sort merge

```py
pit_join = df1.join(df2, pit_context.pit_udf(df1.ts, df2.ts) & (df1.id == df2.id))
```

#### 2.2. Union merge

```py
pit_join = pit_context.union(
    left=df1,
    right=df2,
    left_prefix="df1_",
    right_prefix="df2_",
    left_ts_column="ts",
    right_ts_column="ts",
    partition_cols=["id"],
)
```

#### 2.3. Exploding PIT join

```py
pit_join = pit_context.exploding(
    left=df1,
    right=df2,
    left_ts_column=df1["ts"],
    right_ts_column=df2["ts"],
    partition_cols=[df1["id"], df2["id"]],
)
```

### Early stop sort merge

```py
from ackuq.pit.joinPIT import joinPIT

pit_join = joinPIT(spark, df1, df2, df1.ts, df2.ts, (df1.id == df2.id))
```

## Quickstart (Scala)

Unlike the Python implementation, which uses a context, all of the functionality is divided into objects.

### Early stop sort merge

```scala
import io.github.ackuq.pit.EarlyStopSortMerge.{pit, init}
import org.apache.spark.sql.functions.lit

// Pass the spark session; this will register the required strategies and optimizer rules.
init(spark)

val pitJoin = df1.join(df2, pit(df1("ts"), df2("ts"), lit(0)) && df1("id") === df2("id"))
```

```scala
import io.github.ackuq.pit.EarlyStopSortMerge.joinPIT

val pitJoin = joinPIT(df1, df2, df1("ts"), df2("ts"), df1("id") === df2("id"), "inner", 0)
```
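
To make the semantics concrete, here is a small illustrative sketch; the column names and values are made up for this example and not prescribed by the library:

```scala
import spark.implicits._

// Left side: events to enrich; right side: observations over time.
val df1 = Seq((1, 100L, "a"), (1, 105L, "b")).toDF("id", "ts", "event")
val df2 = Seq((1, 99L, 1.0), (1, 104L, 2.0)).toDF("id", "ts", "feature")

// A point-in-time join pairs each left row with the most recent right row
// at or before the left timestamp (within the same id):
//   (1, 100L, "a") -> (1,  99L, 1.0)
//   (1, 105L, "b") -> (1, 104L, 2.0)
```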

#### Adding tolerance

The UDF takes a required third argument for tolerance; when this argument is set to a non-null value, the PIT join does not return matches where the timestamps differ by more than that value. E.g. setting the third argument to `lit(3)` would only accept PIT matches that differ by at most 3 time units.
The final argument is the tolerance; when this argument is set to a non-zero value, the PIT join does not return matches where the timestamps differ by more than the specified value. E.g. setting tolerance to `3` would only accept PIT matches that differ by at most 3 time units.
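
For instance, reusing the `joinPIT` call from above with a tolerance of 3 (a sketch, assuming the signature shown in the previous example):

```scala
// Only keep matches whose timestamps differ by at most 3 time units.
val pitJoinWithTolerance =
  joinPIT(df1, df2, df1("ts"), df2("ts"), df1("id") === df2("id"), "inner", 3)
```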

#### Left outer join

@@ -132,36 +85,5 @@ The default join type for PIT joins is an inner join, but if you'd like to keep a
Usage:

```scala
val pitJoin = df1.join(
  df2, pit(df1("ts"), df2("ts"), lit(0)) && df1("id") === df2("id"),
  "left"
)
```

```scala
val pitJoin = joinPIT(df1, df2, df1("ts"), df2("ts"), df1("id") === df2("id"), "left")
```

### Union merge

```scala
import io.github.ackuq.pit.Union

val pitJoin = Union.join(
  df1,
  df2,
  leftPrefix = Some("df1_"),
  rightPrefix = "df2_",
  partitionCols = Seq("id")
)
```

### Exploding PIT join

```scala
import io.github.ackuq.pit.Exploding

val pitJoin = Exploding.join(
  df1,
  df2,
  leftTSColumn = df1("ts"),
  rightTSColumn = df2("ts"),
  partitionCols = Seq((df1("id"), df2("id")))
)
```
70 changes: 0 additions & 70 deletions python/README.md
@@ -8,76 +8,6 @@ This project aims to expose different ways of executing PIT-joins, also called

In order to run this project in PySpark, you will need to have the JAR file of the Scala implementation available inside your Spark session.

## Quickstart

### 1. Creating the context

The object `PitContext` is the entry point for all of the functionality of the library. You can initialize this context with the following code:

```py
from pyspark import SQLContext
from ackuq.pit import PitContext

sql_context = SQLContext(spark.sparkContext)
pit_context = PitContext(sql_context)
```

### 2. Performing a PIT join

There are currently three ways of executing a PIT join: an early stop sort merge, a union ASOF algorithm, or exploding intermediate tables.

#### 2.1. Early stop sort merge

```py
pit_join = df1.join(df2, pit_context.pit_udf(df1.ts, df2.ts) & (df1.id == df2.id))
```

##### 2.1.2. Adding tolerance

In this implementation, tolerance can be added to disallow matches whose timestamps differ by more than some value. To utilize this, set the third argument of the UDF to the desired integer value of the maximum difference between two timestamps.

```py
pit_join = df1.join(df2, pit_context.pit_udf(df1.ts, df2.ts, 100) & (df1.id == df2.id))
```

##### 2.1.3. Left outer join

Left outer joins are supported in this implementation. The main difference between a regular inner join and a left outer join is that, whether or not a left row gets matched with a right row, it will still be part of the resulting table. In the resulting table, all the left rows that did not find a match have the values of the right columns set to `null`.

```py
pit_join = df1.join(
df2,
pit_context.pit_udf(df1.ts, df2.ts, 100) & (df1.id == df2.id),
"left"
)
```

#### 2.2. Union merge

```py
pit_join = pit_context.union(
left=df1,
right=df2,
left_prefix="df1_",
right_prefix="df2_",
left_ts_column="ts",
right_ts_column="ts",
partition_cols=["id"],
)
```

#### 2.3. Exploding PIT join

```py
pit_join = pit_context.exploding(
left=df1,
right=df2,
left_ts_column=df1["ts"],
right_ts_column=df2["ts"],
partition_cols=[df1["id"], df2["id"]],
)
```

## Development

### Testing
2 changes: 1 addition & 1 deletion python/VERSION
@@ -1 +1 @@
0.5.0
0.6.1-tom
2 changes: 1 addition & 1 deletion python/ackuq/pit/__init__.py
@@ -22,4 +22,4 @@
# SOFTWARE.
#

from ackuq.pit.context import PitContext # noqa: F401
from ackuq.pit.joinPIT import joinPIT # noqa: F401