Skip to content

Commit

Permalink
fix(filters): enhance DateFilter to support parsing simple date (#277)
Browse files Browse the repository at this point in the history
Resolves: #277
  • Loading branch information
fhussonnois committed Jun 2, 2022
1 parent d85ce86 commit 5b5b501
Show file tree
Hide file tree
Showing 4 changed files with 120 additions and 70 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Copyright 2022 StreamThoughts.
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamthoughts.kafka.connect.filepulse.internal;

import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.time.temporal.ChronoField;
import java.time.temporal.TemporalAccessor;
import java.time.temporal.TemporalField;
import java.time.temporal.TemporalQueries;
import java.util.Locale;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;

public final class DateTimeParser {

    // Base value: the Unix epoch (1970-01-01T00:00:00) in the target zone.
    // Any temporal field the parsed input does not supply keeps this default,
    // which is what allows simple dates (e.g. "yyyy-MM-dd") to resolve with
    // the time-of-day defaulted to midnight.
    private static final Function<ZoneId, ZonedDateTime> DEFAULT_ZONED_DATE_TIME =
            zid -> ZonedDateTime.of(1970, 1, 1, 0, 0, 0, 0, zid);

    private final DateTimeFormatter formatter;

    /**
     * Creates a new {@link DateTimeParser} instance using {@link Locale#ROOT}.
     *
     * @param pattern the datetime format pattern; must not be {@code null}.
     * @throws NullPointerException if {@code pattern} is {@code null}.
     */
    public DateTimeParser(final String pattern) {
        this(pattern, Locale.ROOT);
    }

    /**
     * Creates a new {@link DateTimeParser} instance.
     *
     * @param pattern the datetime format pattern; must not be {@code null}.
     * @param locale  the locale used for parsing; must not be {@code null}.
     * @throws NullPointerException if {@code pattern} or {@code locale} is {@code null}.
     */
    public DateTimeParser(final String pattern, final Locale locale) {
        Objects.requireNonNull(pattern, "'pattern' should not be null");
        // Fix: the original checked 'pattern' twice and never validated 'locale',
        // while the message claimed to be about 'locale'.
        Objects.requireNonNull(locale, "'locale' should not be null");
        this.formatter = new DateTimeFormatterBuilder()
                .parseCaseInsensitive()
                .appendPattern(pattern)
                .toFormatter(locale);
    }

    /**
     * Parses the given datetime string into a {@link ZonedDateTime}, defaulting
     * every temporal field absent from the input to its Unix-epoch value.
     *
     * @param datetime the datetime string to parse.
     * @param zoneId   the zone to apply when the input itself carries no zone or offset.
     * @return the resolved {@link ZonedDateTime}.
     * @throws java.time.format.DateTimeParseException if the input does not match the pattern.
     */
    public ZonedDateTime parse(final String datetime, final ZoneId zoneId) {
        final TemporalAccessor parsed = formatter.parse(datetime);

        // Get the target ZoneId from the parsed datetime, or default to the one passed in arguments.
        final ZoneId parsedZoneId = TemporalQueries.zone().queryFrom(parsed);
        final ZoneId atZonedId = Optional.ofNullable(parsedZoneId).orElse(zoneId);

        // Get a new default ZonedDateTime for the ZoneID, then override each temporal field
        // that was actually parsed from the input.
        ZonedDateTime resolved = DEFAULT_ZONED_DATE_TIME.apply(atZonedId);
        for (final TemporalField override : ChronoField.values()) {
            if (parsed.isSupported(override)) {
                final long value = parsed.getLong(override);
                resolved = resolved.with(override, value);
            }
        }
        return resolved;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,10 @@
import io.streamthoughts.kafka.connect.filepulse.expression.function.Arguments;
import io.streamthoughts.kafka.connect.filepulse.expression.function.EvaluatedExecutionContext;
import io.streamthoughts.kafka.connect.filepulse.expression.function.ExpressionFunction;
import io.streamthoughts.kafka.connect.filepulse.internal.DateTimeParser;

import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.time.temporal.ChronoField;
import java.time.temporal.TemporalAccessor;
import java.time.temporal.TemporalField;
import java.time.temporal.TemporalQueries;
import java.util.Locale;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;

public class ToTimestamp implements ExpressionFunction {

Expand Down Expand Up @@ -89,43 +80,4 @@ public TypedValue invoke(final EvaluatedExecutionContext context) throws Express
}
};
}

// Inner parser removed by this commit in favor of the new top-level
// io.streamthoughts.kafka.connect.filepulse.internal.DateTimeParser
// (deletion side of the diff — kept here byte-identical).
static final class DateTimeParser {

// Base value: the Unix epoch (1970-01-01T00:00:00) in the target zone; any
// temporal field absent from the parsed input keeps this default.
private static final Function<ZoneId, ZonedDateTime> DEFAULT_ZONED_DATE_TIME =
zid -> ZonedDateTime.of(1970, 1, 1, 0, 0, 0, 0, zid);

private final DateTimeFormatter formatter;

/**
* Creates a new {@link DateTimeParser} instance.
*
* @param pattern the datetime format pattern; must not be {@code null}.
* @throws NullPointerException if {@code pattern} is {@code null}.
*/
public DateTimeParser(final String pattern) {
Objects.requireNonNull(pattern, "'pattern' should not be null");
// Case-insensitive parsing with the locale-neutral ROOT locale.
this.formatter = new DateTimeFormatterBuilder()
.parseCaseInsensitive()
.appendPattern(pattern)
.toFormatter(Locale.ROOT);
}

// Parses 'datetime' with the configured pattern; a zone or offset carried by
// the input itself takes precedence over the 'zoneId' argument.
public ZonedDateTime parse(final String datetime, final ZoneId zoneId) {
final TemporalAccessor parsed = formatter.parse(datetime);

// Get the target ZoneId from the parsed datetime, or default to the one passed in arguments.
final ZoneId parsedZoneId = TemporalQueries.zone().queryFrom(parsed);
final ZoneId atZonedId = Optional.ofNullable(parsedZoneId).orElse(zoneId);

// Get a new default ZonedDateTime for the ZoneID, then override each temporal field.
ZonedDateTime resolved = DEFAULT_ZONED_DATE_TIME.apply(atZonedId);
for (final TemporalField override : ChronoField.values()) {
if (parsed.isSupported(override)) {
final long value = parsed.getLong(override);
resolved = resolved.with(override, value);
}
}
return resolved;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@
import io.streamthoughts.kafka.connect.filepulse.expression.Expression;
import io.streamthoughts.kafka.connect.filepulse.expression.StandardEvaluationContext;
import io.streamthoughts.kafka.connect.filepulse.expression.parser.ExpressionParsers;
import io.streamthoughts.kafka.connect.filepulse.internal.DateTimeParser;
import io.streamthoughts.kafka.connect.filepulse.reader.RecordsIterable;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.errors.ConnectException;

import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
Expand All @@ -43,7 +43,9 @@ public class DateFilter extends AbstractRecordFilter<DateFilter> {

private Expression targetExpression;

private List<DateTimeFormatter> dtf;
private ZoneId zoneId;

private List<DateTimeParser> dtp;

/**
* {@inheritDoc}
Expand All @@ -61,15 +63,13 @@ public void configure(final Map<String, ?> props) {
throw new ConnectException("Invalid configuration, at least one date format must be provided");
}


zoneId = config.timezone();
dtp = new ArrayList<>(config.formats().size());
final Locale locale = config.locale();
final ZoneId timezone = config.timezone();
dtf = new ArrayList<>(config.formats().size());
for (String format : config.formats()) {
try {
final DateTimeFormatter formatter = DateTimeFormatter
.ofPattern(format, locale)
.withZone(timezone);
dtf.add(formatter);
dtp.add(new DateTimeParser(format, locale));
} catch (IllegalArgumentException e) {
throw new ConnectException("Invalid configuration, cannot parse date format : " + format);
}
Expand Down Expand Up @@ -100,19 +100,20 @@ public RecordsIterable<TypedStruct> apply(final FilterContext context,
);

final Expression field = mayEvaluateFieldExpression(evaluationContext);
final String date = field.readValue(evaluationContext, String.class);
final String datetime = field.readValue(evaluationContext, String.class);

if (date == null) {
if (datetime == null) {
throw new FilterException("Invalid field name '" + config.field() + "'");
}

Exception lastException = null;

final Expression target = mayEvaluateTargetExpression(evaluationContext);
for (DateTimeFormatter formatter : dtf) {
for (DateTimeParser parser : dtp) {
long epochMilli = -1;
try {
epochMilli = Instant.from(formatter.parse(date)).toEpochMilli();
final ZonedDateTime zdt = parser.parse(datetime, zoneId);
epochMilli = zdt.toInstant().toEpochMilli();
} catch (Exception e) {
lastException = e;
continue;
Expand All @@ -122,7 +123,7 @@ public RecordsIterable<TypedStruct> apply(final FilterContext context,
}

throw new FilterException(
String.format("Failed to parse date from field '%s' with value '%s'", config.field() , date),
String.format("Failed to parse date from field '%s' with value '%s'", config.field() , datetime),
lastException
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@
import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct;
import io.streamthoughts.kafka.connect.filepulse.source.FileRecordOffset;
import io.streamthoughts.kafka.connect.filepulse.source.GenericFileObjectMeta;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.util.Collections;
import java.util.HashMap;
Expand All @@ -37,7 +37,7 @@ public class DateFilterTest {
private FilterContext context;
private Map<String, Object> configs;

@Before
@BeforeEach
public void setUp() {
filter = new DateFilter();
configs = new HashMap<>();
Expand All @@ -47,6 +47,21 @@ public void setUp() {
.build();
}

@Test
public void shouldConvertToEpochTimeGivenDate() {
    // Configure the filter to parse a plain date (no time-of-day component).
    configs.put(DateFilterConfig.DATE_FIELD_CONFIG, "$.date");
    configs.put(DateFilterConfig.DATE_TARGET_CONFIG, "$.timestamp");
    configs.put(DateFilterConfig.DATE_FORMATS_CONFIG, Collections.singletonList("yyyy-MM-dd"));
    filter.configure(configs, alias -> null);

    final TypedStruct input = TypedStruct.create().put("date", "2001-07-04");
    final List<TypedStruct> output = filter.apply(context, input, false).collect();

    // Expected: 2001-07-04T00:00:00Z as epoch-millis.
    final TypedStruct first = output.get(0);
    Assertions.assertEquals(994204800000L, first.getLong("timestamp").longValue());
}

@Test
public void shouldConvertToEpochTimeGivenNoTimezoneAndNoLocale() {
configs.put(DateFilterConfig.DATE_FIELD_CONFIG, "$.date");
Expand All @@ -59,7 +74,7 @@ public void shouldConvertToEpochTimeGivenNoTimezoneAndNoLocale() {

TypedStruct record = results.get(0);

Assert.assertEquals(994248536000L, record.getLong("timestamp").longValue());
Assertions.assertEquals(994248536000L, record.getLong("timestamp").longValue());
}

@Test
Expand All @@ -75,7 +90,7 @@ public void shouldConvertToEpochTimeGivenTimezone() {

TypedStruct record = results.get(0);

Assert.assertEquals(994248536000L, record.getLong("timestamp").longValue());
Assertions.assertEquals(994248536000L, record.getLong("timestamp").longValue());
}

@Test
Expand All @@ -91,7 +106,7 @@ public void shouldConvertToEpochTimeGivenLocale() {

TypedStruct record = results.get(0);

Assert.assertEquals(994248536000L, record.getLong("timestamp").longValue());
Assertions.assertEquals(994248536000L, record.getLong("timestamp").longValue());
}

}

0 comments on commit 5b5b501

Please sign in to comment.