diff --git a/connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/internal/DateTimeParser.java b/connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/internal/DateTimeParser.java new file mode 100644 index 000000000..7d3da165c --- /dev/null +++ b/connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/internal/DateTimeParser.java @@ -0,0 +1,82 @@ +/* + * Copyright 2022 StreamThoughts. + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package io.streamthoughts.kafka.connect.filepulse.internal;
+
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeFormatterBuilder;
+import java.time.temporal.ChronoField;
+import java.time.temporal.TemporalAccessor;
+import java.time.temporal.TemporalField;
+import java.time.temporal.TemporalQueries;
+import java.util.Locale;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.function.Function;
+
+public final class DateTimeParser {
+
+    private static final Function<ZoneId, ZonedDateTime> DEFAULT_ZONED_DATE_TIME =
+        zid -> ZonedDateTime.of(1970, 1, 1, 0, 0, 0, 0, zid);
+
+    private final DateTimeFormatter formatter;
+
+
+    /**
+     * Creates a new {@link DateTimeParser} instance.
+     *
+     * @param pattern the datetime pattern.
+     */
+    public DateTimeParser(final String pattern) {
+        this(pattern, Locale.ROOT);
+    }
+
+    /**
+     * Creates a new {@link DateTimeParser} instance.
+     *
+     * @param pattern the datetime pattern.
+     */
+    public DateTimeParser(final String pattern, final Locale locale) {
+        Objects.requireNonNull(pattern, "'pattern' should not be null");
+        Objects.requireNonNull(locale, "'locale' should not be null");
+        this.formatter = new DateTimeFormatterBuilder()
+            .parseCaseInsensitive()
+            .appendPattern(pattern)
+            .toFormatter(locale);
+    }
+
+    public ZonedDateTime parse(final String datetime, final ZoneId zoneId) {
+        final TemporalAccessor parsed = formatter.parse(datetime);
+
+        // Get the target ZoneId from the parsed datetime, or default to the one passed in arguments.
+        final ZoneId parsedZoneId = TemporalQueries.zone().queryFrom(parsed);
+        final ZoneId atZonedId = Optional.ofNullable(parsedZoneId).orElse(zoneId);
+
+        // Get a new default ZonedDateTime for the ZoneID, then override each temporal field.
+ ZonedDateTime resolved = DEFAULT_ZONED_DATE_TIME.apply(atZonedId); + for (final TemporalField override : ChronoField.values()) { + if (parsed.isSupported(override)) { + final long value = parsed.getLong(override); + resolved = resolved.with(override, value); + } + } + return resolved; + } +} diff --git a/connect-file-pulse-expression/src/main/java/io/streamthoughts/kafka/connect/filepulse/expression/function/datetime/ToTimestamp.java b/connect-file-pulse-expression/src/main/java/io/streamthoughts/kafka/connect/filepulse/expression/function/datetime/ToTimestamp.java index 11876ab2e..c59f6899e 100644 --- a/connect-file-pulse-expression/src/main/java/io/streamthoughts/kafka/connect/filepulse/expression/function/datetime/ToTimestamp.java +++ b/connect-file-pulse-expression/src/main/java/io/streamthoughts/kafka/connect/filepulse/expression/function/datetime/ToTimestamp.java @@ -26,19 +26,10 @@ import io.streamthoughts.kafka.connect.filepulse.expression.function.Arguments; import io.streamthoughts.kafka.connect.filepulse.expression.function.EvaluatedExecutionContext; import io.streamthoughts.kafka.connect.filepulse.expression.function.ExpressionFunction; +import io.streamthoughts.kafka.connect.filepulse.internal.DateTimeParser; import java.time.ZoneId; import java.time.ZonedDateTime; -import java.time.format.DateTimeFormatter; -import java.time.format.DateTimeFormatterBuilder; -import java.time.temporal.ChronoField; -import java.time.temporal.TemporalAccessor; -import java.time.temporal.TemporalField; -import java.time.temporal.TemporalQueries; -import java.util.Locale; -import java.util.Objects; -import java.util.Optional; -import java.util.function.Function; public class ToTimestamp implements ExpressionFunction { @@ -89,43 +80,4 @@ public TypedValue invoke(final EvaluatedExecutionContext context) throws Express } }; } - - static final class DateTimeParser { - - private static final Function DEFAULT_ZONED_DATE_TIME = - zid -> ZonedDateTime.of(1970, 1, 1, 0, 0, 0, 0, 
zid); - - private final DateTimeFormatter formatter; - - /** - * Creates a new {@link DateTimeParser} instance. - * - * @param pattern the datetime formatter. - */ - public DateTimeParser(final String pattern) { - Objects.requireNonNull(pattern, "'pattern' should not be null"); - this.formatter = new DateTimeFormatterBuilder() - .parseCaseInsensitive() - .appendPattern(pattern) - .toFormatter(Locale.ROOT); - } - - public ZonedDateTime parse(final String datetime, final ZoneId zoneId) { - final TemporalAccessor parsed = formatter.parse(datetime); - - // Get the target ZoneId from the parsed datetime, or default to the one passed in arguments. - final ZoneId parsedZoneId = TemporalQueries.zone().queryFrom(parsed); - final ZoneId atZonedId = Optional.ofNullable(parsedZoneId).orElse(zoneId); - - // Get a new default ZonedDateTime for the ZoneID, then override each temporal field. - ZonedDateTime resolved = DEFAULT_ZONED_DATE_TIME.apply(atZonedId); - for (final TemporalField override : ChronoField.values()) { - if (parsed.isSupported(override)) { - final long value = parsed.getLong(override); - resolved = resolved.with(override, value); - } - } - return resolved; - } - } } diff --git a/connect-file-pulse-filters/src/main/java/io/streamthoughts/kafka/connect/filepulse/filter/DateFilter.java b/connect-file-pulse-filters/src/main/java/io/streamthoughts/kafka/connect/filepulse/filter/DateFilter.java index f4b6c7816..51965151e 100644 --- a/connect-file-pulse-filters/src/main/java/io/streamthoughts/kafka/connect/filepulse/filter/DateFilter.java +++ b/connect-file-pulse-filters/src/main/java/io/streamthoughts/kafka/connect/filepulse/filter/DateFilter.java @@ -23,13 +23,13 @@ import io.streamthoughts.kafka.connect.filepulse.expression.Expression; import io.streamthoughts.kafka.connect.filepulse.expression.StandardEvaluationContext; import io.streamthoughts.kafka.connect.filepulse.expression.parser.ExpressionParsers; +import 
io.streamthoughts.kafka.connect.filepulse.internal.DateTimeParser; import io.streamthoughts.kafka.connect.filepulse.reader.RecordsIterable; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.connect.errors.ConnectException; -import java.time.Instant; import java.time.ZoneId; -import java.time.format.DateTimeFormatter; +import java.time.ZonedDateTime; import java.util.ArrayList; import java.util.List; import java.util.Locale; @@ -43,7 +43,9 @@ public class DateFilter extends AbstractRecordFilter { private Expression targetExpression; - private List dtf; + private ZoneId zoneId; + + private List dtp; /** * {@inheritDoc} @@ -61,15 +63,13 @@ public void configure(final Map props) { throw new ConnectException("Invalid configuration, at least one date format must be provided"); } + + zoneId = config.timezone(); + dtp = new ArrayList<>(config.formats().size()); final Locale locale = config.locale(); - final ZoneId timezone = config.timezone(); - dtf = new ArrayList<>(config.formats().size()); for (String format : config.formats()) { try { - final DateTimeFormatter formatter = DateTimeFormatter - .ofPattern(format, locale) - .withZone(timezone); - dtf.add(formatter); + dtp.add(new DateTimeParser(format, locale)); } catch (IllegalArgumentException e) { throw new ConnectException("Invalid configuration, cannot parse date format : " + format); } @@ -100,19 +100,20 @@ public RecordsIterable apply(final FilterContext context, ); final Expression field = mayEvaluateFieldExpression(evaluationContext); - final String date = field.readValue(evaluationContext, String.class); + final String datetime = field.readValue(evaluationContext, String.class); - if (date == null) { + if (datetime == null) { throw new FilterException("Invalid field name '" + config.field() + "'"); } Exception lastException = null; final Expression target = mayEvaluateTargetExpression(evaluationContext); - for (DateTimeFormatter formatter : dtf) { + for (DateTimeParser parser : dtp) { long 
epochMilli = -1; try { - epochMilli = Instant.from(formatter.parse(date)).toEpochMilli(); + final ZonedDateTime zdt = parser.parse(datetime, zoneId); + epochMilli = zdt.toInstant().toEpochMilli(); } catch (Exception e) { lastException = e; continue; @@ -122,7 +123,7 @@ public RecordsIterable apply(final FilterContext context, } throw new FilterException( - String.format("Failed to parse date from field '%s' with value '%s'", config.field() , date), + String.format("Failed to parse date from field '%s' with value '%s'", config.field() , datetime), lastException ); } diff --git a/connect-file-pulse-filters/src/test/java/io/streamthoughts/kafka/connect/filepulse/filter/DateFilterTest.java b/connect-file-pulse-filters/src/test/java/io/streamthoughts/kafka/connect/filepulse/filter/DateFilterTest.java index fb0e433dc..a4c990cb4 100644 --- a/connect-file-pulse-filters/src/test/java/io/streamthoughts/kafka/connect/filepulse/filter/DateFilterTest.java +++ b/connect-file-pulse-filters/src/test/java/io/streamthoughts/kafka/connect/filepulse/filter/DateFilterTest.java @@ -22,9 +22,9 @@ import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct; import io.streamthoughts.kafka.connect.filepulse.source.FileRecordOffset; import io.streamthoughts.kafka.connect.filepulse.source.GenericFileObjectMeta; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import java.util.Collections; import java.util.HashMap; @@ -37,7 +37,7 @@ public class DateFilterTest { private FilterContext context; private Map configs; - @Before + @BeforeEach public void setUp() { filter = new DateFilter(); configs = new HashMap<>(); @@ -47,6 +47,21 @@ public void setUp() { .build(); } + @Test + public void shouldConvertToEpochTimeGivenDate() { + configs.put(DateFilterConfig.DATE_FIELD_CONFIG, "$.date"); + configs.put(DateFilterConfig.DATE_TARGET_CONFIG, 
"$.timestamp"); + configs.put(DateFilterConfig.DATE_FORMATS_CONFIG, Collections.singletonList("yyyy-MM-dd")); + + filter.configure(configs, alias -> null); + TypedStruct struct = TypedStruct.create().put("date", "2001-07-04"); + List results = filter.apply(context, struct, false).collect(); + + TypedStruct record = results.get(0); + + Assertions.assertEquals(994204800000L, record.getLong("timestamp").longValue()); + } + @Test public void shouldConvertToEpochTimeGivenNoTimezoneAndNoLocale() { configs.put(DateFilterConfig.DATE_FIELD_CONFIG, "$.date"); @@ -59,7 +74,7 @@ public void shouldConvertToEpochTimeGivenNoTimezoneAndNoLocale() { TypedStruct record = results.get(0); - Assert.assertEquals(994248536000L, record.getLong("timestamp").longValue()); + Assertions.assertEquals(994248536000L, record.getLong("timestamp").longValue()); } @Test @@ -75,7 +90,7 @@ public void shouldConvertToEpochTimeGivenTimezone() { TypedStruct record = results.get(0); - Assert.assertEquals(994248536000L, record.getLong("timestamp").longValue()); + Assertions.assertEquals(994248536000L, record.getLong("timestamp").longValue()); } @Test @@ -91,7 +106,7 @@ public void shouldConvertToEpochTimeGivenLocale() { TypedStruct record = results.get(0); - Assert.assertEquals(994248536000L, record.getLong("timestamp").longValue()); + Assertions.assertEquals(994248536000L, record.getLong("timestamp").longValue()); } } \ No newline at end of file