Skip to content

Commit

Permalink
refactor: introduce abstraction for passing window's time clauses
Browse files Browse the repository at this point in the history
  • Loading branch information
vinothchandar committed Mar 10, 2020
1 parent 795fc08 commit bc6bc8d
Show file tree
Hide file tree
Showing 16 changed files with 225 additions and 165 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.confluent.ksql.execution.streams.ExecutionStepFactory;
import io.confluent.ksql.execution.windows.KsqlWindowExpression;
import io.confluent.ksql.execution.windows.SessionWindowExpression;
import io.confluent.ksql.execution.windows.WindowTimeClause;
import io.confluent.ksql.function.FunctionRegistry;
import io.confluent.ksql.function.InternalFunctionRegistry;
import io.confluent.ksql.metastore.model.KeyField;
Expand Down Expand Up @@ -78,7 +79,7 @@ public class SchemaKGroupedStreamTest {
);

private static final KsqlWindowExpression KSQL_WINDOW_EXP = new SessionWindowExpression(
100, TimeUnit.SECONDS
new WindowTimeClause(100, TimeUnit.SECONDS)
);

private static final List<ColumnName> NON_AGGREGATE_COLUMNS = ImmutableList.of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,71 +21,51 @@
import io.confluent.ksql.model.WindowType;
import io.confluent.ksql.parser.NodeLocation;
import io.confluent.ksql.serde.WindowInfo;
import java.time.Duration;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

@Immutable
public class HoppingWindowExpression extends KsqlWindowExpression {

private final long size;
private final TimeUnit sizeUnit;
private final long advanceBy;
private final TimeUnit advanceByUnit;
private final WindowTimeClause size;
private final WindowTimeClause advanceBy;

public HoppingWindowExpression(
final long size,
final TimeUnit sizeUnit,
final long advanceBy,
final TimeUnit advanceByUnit
final WindowTimeClause size,
final WindowTimeClause advanceBy
) {
this(Optional.empty(),
size,
sizeUnit,
advanceBy,
advanceByUnit,
Optional.empty(),
Optional.empty());
}

public HoppingWindowExpression(
final Optional<NodeLocation> location,
final long size,
final TimeUnit sizeUnit,
final long advanceBy,
final TimeUnit advanceByUnit,
final Optional<Duration> retention,
final Optional<Duration> gracePeriod
final WindowTimeClause size,
final WindowTimeClause advanceBy,
final Optional<WindowTimeClause> retention,
final Optional<WindowTimeClause> gracePeriod
) {
super(location, retention, gracePeriod);
this.size = size;
this.sizeUnit = requireNonNull(sizeUnit, "sizeUnit");
this.advanceBy = advanceBy;
this.advanceByUnit = requireNonNull(advanceByUnit, "advanceByUnit");
this.size = requireNonNull(size, "size");
this.advanceBy = requireNonNull(advanceBy, "advanceBy");
}

@Override
public WindowInfo getWindowInfo() {
return WindowInfo.of(
WindowType.HOPPING,
Optional.of(Duration.ofNanos(sizeUnit.toNanos(size)))
Optional.of(size.toDuration())
);
}

public TimeUnit getSizeUnit() {
return sizeUnit;
}

public long getSize() {
public WindowTimeClause getSize() {
return size;
}

public TimeUnit getAdvanceByUnit() {
return advanceByUnit;
}

public long getAdvanceBy() {
public WindowTimeClause getAdvanceBy() {
return advanceBy;
}

Expand All @@ -96,13 +76,16 @@ public <R, C> R accept(final WindowVisitor<R, C> visitor, final C context) {

@Override
public String toString() {
return " HOPPING ( SIZE " + size + " " + sizeUnit + " , ADVANCE BY "
+ advanceBy + " " + "" + advanceByUnit + " ) ";
return " HOPPING ( SIZE " + size + " , "
+ "ADVANCE BY " + advanceBy
+ retention.map(w -> " , RETENTION " + w).orElse("")
+ gracePeriod.map(g -> " , GRACE PERIOD " + g).orElse("")
+ " ) ";
}

@Override
public int hashCode() {
return Objects.hash(size, sizeUnit, advanceBy, advanceByUnit);
return Objects.hash(size, advanceBy, retention, gracePeriod);
}

@Override
Expand All @@ -114,8 +97,9 @@ public boolean equals(final Object o) {
return false;
}
final HoppingWindowExpression hoppingWindowExpression = (HoppingWindowExpression) o;
return hoppingWindowExpression.size == size && hoppingWindowExpression.sizeUnit == sizeUnit
&& hoppingWindowExpression.advanceBy == advanceBy && hoppingWindowExpression
.advanceByUnit == advanceByUnit;
return Objects.equals(size, hoppingWindowExpression.size)
&& Objects.equals(advanceBy, hoppingWindowExpression.advanceBy)
&& Objects.equals(gracePeriod, hoppingWindowExpression.gracePeriod)
&& Objects.equals(retention, hoppingWindowExpression.retention);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,28 +19,27 @@
import io.confluent.ksql.parser.Node;
import io.confluent.ksql.parser.NodeLocation;
import io.confluent.ksql.serde.WindowInfo;
import java.time.Duration;
import java.util.Optional;

@Immutable
public abstract class KsqlWindowExpression extends Node {

protected final Optional<Duration> retention;
protected final Optional<Duration> gracePeriod;
protected final Optional<WindowTimeClause> retention;
protected final Optional<WindowTimeClause> gracePeriod;

KsqlWindowExpression(final Optional<NodeLocation> nodeLocation,
final Optional<Duration> retention,
final Optional<Duration> gracePeriod) {
final Optional<WindowTimeClause> retention,
final Optional<WindowTimeClause> gracePeriod) {
super(nodeLocation);
this.retention = retention;
this.gracePeriod = gracePeriod;
}

public Optional<Duration> getRetention() {
public Optional<WindowTimeClause> getRetention() {
return retention;
}

public Optional<Duration> getGracePeriod() {
public Optional<WindowTimeClause> getGracePeriod() {
return gracePeriod;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,38 +21,29 @@
import io.confluent.ksql.model.WindowType;
import io.confluent.ksql.parser.NodeLocation;
import io.confluent.ksql.serde.WindowInfo;
import java.time.Duration;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

@Immutable
public class SessionWindowExpression extends KsqlWindowExpression {

private final long gap;
private final TimeUnit sizeUnit;
private final WindowTimeClause gap;

public SessionWindowExpression(final long gap, final TimeUnit sizeUnit) {
this(Optional.empty(), gap, sizeUnit, Optional.empty(), Optional.empty());
public SessionWindowExpression(final WindowTimeClause gap) {
this(Optional.empty(), gap, Optional.empty(), Optional.empty());
}

public SessionWindowExpression(
final Optional<NodeLocation> location,
final long gap,
final TimeUnit sizeUnit,
final Optional<Duration> retention,
final Optional<Duration> gracePeriod
final WindowTimeClause gap,
final Optional<WindowTimeClause> retention,
final Optional<WindowTimeClause> gracePeriod
) {
super(location, retention, gracePeriod);
this.gap = gap;
this.sizeUnit = requireNonNull(sizeUnit, "sizeUnit");
this.gap = requireNonNull(gap, "gap");
}

public TimeUnit getSizeUnit() {
return sizeUnit;
}

public long getGap() {
public WindowTimeClause getGap() {
return gap;
}

Expand All @@ -68,12 +59,15 @@ public <R, C> R accept(final WindowVisitor<R, C> visitor, final C context) {

@Override
public String toString() {
return " SESSION ( " + gap + " " + sizeUnit + " ) ";
return " SESSION ( " + gap
+ retention.map(w -> " , RETENTION " + w).orElse("")
+ gracePeriod.map(g -> " , GRACE PERIOD " + g).orElse("")
+ " ) ";
}

@Override
public int hashCode() {
return Objects.hash(gap, sizeUnit);
return Objects.hash(gap, retention, gracePeriod);
}

@Override
Expand All @@ -85,6 +79,8 @@ public boolean equals(final Object o) {
return false;
}
final SessionWindowExpression sessionWindowExpression = (SessionWindowExpression) o;
return sessionWindowExpression.gap == gap && sessionWindowExpression.sizeUnit == sizeUnit;
return Objects.equals(gap, sessionWindowExpression.gap)
&& Objects.equals(gracePeriod, sessionWindowExpression.gracePeriod)
&& Objects.equals(retention, sessionWindowExpression.retention);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,46 +21,37 @@
import io.confluent.ksql.model.WindowType;
import io.confluent.ksql.parser.NodeLocation;
import io.confluent.ksql.serde.WindowInfo;
import java.time.Duration;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

@Immutable
public class TumblingWindowExpression extends KsqlWindowExpression {

private final long size;
private final TimeUnit sizeUnit;
private final WindowTimeClause size;

public TumblingWindowExpression(final long size, final TimeUnit sizeUnit) {
this(Optional.empty(), size, sizeUnit, Optional.empty(), Optional.empty());
public TumblingWindowExpression(final WindowTimeClause size) {
this(Optional.empty(), size, Optional.empty(), Optional.empty());
}

public TumblingWindowExpression(
final Optional<NodeLocation> location,
final long size,
final TimeUnit sizeUnit,
final Optional<Duration> retention,
final Optional<Duration> gracePeriod
final WindowTimeClause size,
final Optional<WindowTimeClause> retention,
final Optional<WindowTimeClause> gracePeriod
) {
super(location, retention, gracePeriod);
this.size = size;
this.sizeUnit = requireNonNull(sizeUnit, "sizeUnit");
this.size = requireNonNull(size, "size");
}

@Override
public WindowInfo getWindowInfo() {
return WindowInfo.of(
WindowType.TUMBLING,
Optional.of(Duration.ofNanos(sizeUnit.toNanos(size)))
Optional.of(size.toDuration())
);
}

public TimeUnit getSizeUnit() {
return sizeUnit;
}

public long getSize() {
public WindowTimeClause getSize() {
return size;
}

Expand All @@ -71,12 +62,15 @@ public <R, C> R accept(final WindowVisitor<R, C> visitor, final C context) {

@Override
public String toString() {
return " TUMBLING ( SIZE " + size + " " + sizeUnit + " ) ";
return " TUMBLING ( SIZE " + size
+ retention.map(w -> " , RETENTION " + w).orElse("")
+ gracePeriod.map(g -> " , GRACE PERIOD " + g).orElse("")
+ " ) ";
}

@Override
public int hashCode() {
return Objects.hash(size, sizeUnit);
return Objects.hash(size, retention, gracePeriod);
}

@Override
Expand All @@ -88,6 +82,8 @@ public boolean equals(final Object o) {
return false;
}
final TumblingWindowExpression tumblingWindowExpression = (TumblingWindowExpression) o;
return tumblingWindowExpression.size == size && tumblingWindowExpression.sizeUnit == sizeUnit;
return Objects.equals(size, tumblingWindowExpression.size)
&& Objects.equals(gracePeriod, tumblingWindowExpression.gracePeriod)
&& Objects.equals(retention, tumblingWindowExpression.retention);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.execution.windows;

import static java.util.Objects.requireNonNull;

import com.google.errorprone.annotations.Immutable;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.TimeUnit;

/**
* Pojo for a time clause added to a window expression
*/
@Immutable
public class WindowTimeClause {

private final long value;

private final TimeUnit unit;

public WindowTimeClause(final long value, final TimeUnit unit) {
this.value = value;
this.unit = requireNonNull(unit, "unit");
}

public Duration toDuration() {
return Duration.ofMillis(unit.toMillis(value));
}

public String toString() {
return value + " " + unit;
}

@Override
public int hashCode() {
return Objects.hash(value, unit);
}

@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}

final WindowTimeClause otherClause = (WindowTimeClause) o;
return otherClause.value == value && otherClause.unit == unit;
}
}
Loading

0 comments on commit bc6bc8d

Please sign in to comment.