Skip to content

Commit

Permalink
[cdc-common] Introduce "pipeline.local-time-zone" config option which…
Browse files Browse the repository at this point in the history
… help handle time zone well (#2797)

This closes #2797.
  • Loading branch information
leonardBang authored Dec 4, 2023
1 parent e435004 commit 95921d5
Show file tree
Hide file tree
Showing 12 changed files with 188 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@
import java.util.Arrays;
import java.util.Collections;

import static com.ververica.cdc.common.pipeline.PipelineOptions.PIPELINE_LOCAL_TIME_ZONE;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;

/** Unit test for {@link YamlPipelineDefinitionParser}. */
class YamlPipelineDefinitionParserTest {
Expand Down Expand Up @@ -64,11 +66,81 @@ void testMinimizedDefinition() throws Exception {
void testOverridingGlobalConfig() throws Exception {
URL resource = Resources.getResource("definitions/pipeline-definition-full.yaml");
YamlPipelineDefinitionParser parser = new YamlPipelineDefinitionParser();
ImmutableMap.<String, String>builder().put("parallelism", "1").put("foo", "bar");
PipelineDef pipelineDef = parser.parse(Paths.get(resource.toURI()), new Configuration());
PipelineDef pipelineDef =
parser.parse(
Paths.get(resource.toURI()),
Configuration.fromMap(
ImmutableMap.<String, String>builder()
.put("parallelism", "1")
.put("foo", "bar")
.build()));
assertThat(pipelineDef).isEqualTo(fullDefWithGlobalConf);
}

@Test
void testEvaluateDefaultLocalTimeZone() throws Exception {
URL resource = Resources.getResource("definitions/pipeline-definition-minimized.yaml");
YamlPipelineDefinitionParser parser = new YamlPipelineDefinitionParser();
PipelineDef pipelineDef = parser.parse(Paths.get(resource.toURI()), new Configuration());
assertThat(pipelineDef.getConfig().get(PIPELINE_LOCAL_TIME_ZONE))
.isNotEqualTo(PIPELINE_LOCAL_TIME_ZONE.defaultValue());
}

@Test
void testValidTimeZone() throws Exception {
URL resource = Resources.getResource("definitions/pipeline-definition-minimized.yaml");
YamlPipelineDefinitionParser parser = new YamlPipelineDefinitionParser();
PipelineDef pipelineDef =
parser.parse(
Paths.get(resource.toURI()),
Configuration.fromMap(
ImmutableMap.<String, String>builder()
.put(PIPELINE_LOCAL_TIME_ZONE.key(), "Asia/Shanghai")
.build()));
assertThat(pipelineDef.getConfig().get(PIPELINE_LOCAL_TIME_ZONE))
.isEqualTo("Asia/Shanghai");

pipelineDef =
parser.parse(
Paths.get(resource.toURI()),
Configuration.fromMap(
ImmutableMap.<String, String>builder()
.put(PIPELINE_LOCAL_TIME_ZONE.key(), "GMT+08:00")
.build()));
assertThat(pipelineDef.getConfig().get(PIPELINE_LOCAL_TIME_ZONE)).isEqualTo("GMT+08:00");

pipelineDef =
parser.parse(
Paths.get(resource.toURI()),
Configuration.fromMap(
ImmutableMap.<String, String>builder()
.put(PIPELINE_LOCAL_TIME_ZONE.key(), "UTC")
.build()));
assertThat(pipelineDef.getConfig().get(PIPELINE_LOCAL_TIME_ZONE)).isEqualTo("UTC");
}

@Test
void testInvalidTimeZone() throws Exception {
URL resource = Resources.getResource("definitions/pipeline-definition-minimized.yaml");
YamlPipelineDefinitionParser parser = new YamlPipelineDefinitionParser();
assertThatThrownBy(
() ->
parser.parse(
Paths.get(resource.toURI()),
Configuration.fromMap(
ImmutableMap.<String, String>builder()
.put(
PIPELINE_LOCAL_TIME_ZONE.key(),
"invalid time zone")
.build())))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining(
"Invalid time zone. The valid value should be a Time Zone Database ID"
+ " such as 'America/Los_Angeles' to include daylight saving time. "
+ "Fixed offsets are supported using 'GMT-08:00' or 'GMT+08:00'. "
+ "Or use 'UTC' without time zone and daylight saving time.");
}

private final PipelineDef fullDef =
new PipelineDef(
new SourceDef(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@
import com.ververica.cdc.common.configuration.ConfigOption;
import com.ververica.cdc.common.configuration.Configuration;

import java.util.Collections;
import java.util.Map;
import java.util.Set;

/**
Expand Down Expand Up @@ -63,23 +61,21 @@ public interface Factory {
@PublicEvolving
interface Context {

/** Gives the configuration of the current session. */
Configuration getConfiguration();

/**
* Returns the class loader of the current session.
* Returns the factory options used to create the object instances.
*
* <p>The class loader is in particular useful for discovering factories.
* @return options of the current session.
*/
ClassLoader getClassLoader();
Configuration getFactoryConfiguration();

/** Returns the configuration of current pipeline. */
Configuration getPipelineConfiguration();

/**
* Returns the options of the current session.
* Returns the class loader of the current session.
*
* @return options of the current session.
* <p>The class loader is in particular useful for discovering factories.
*/
default Map<String, String> getEnrichmentOptions() {
return Collections.emptyMap();
}
ClassLoader getClassLoader();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,41 +19,39 @@
import com.ververica.cdc.common.annotation.PublicEvolving;
import com.ververica.cdc.common.configuration.Configuration;

import java.util.Map;

/** A helper for working with {@link Factory}. */
@PublicEvolving
public class FactoryHelper {

/** Default implementation of {@link Factory.Context}. */
public static class DefaultContext implements Factory.Context {

private final Map<String, String> enrichmentOptions;
private final Configuration factoryConfiguration;
private final ClassLoader classLoader;
private final Configuration configuration;
private final Configuration pipelineConfiguration;

public DefaultContext(
Map<String, String> enrichmentOptions,
Configuration configuration,
Configuration factoryConfiguration,
Configuration pipelineConfiguration,
ClassLoader classLoader) {
this.enrichmentOptions = enrichmentOptions;
this.configuration = configuration;
this.factoryConfiguration = factoryConfiguration;
this.pipelineConfiguration = pipelineConfiguration;
this.classLoader = classLoader;
}

@Override
public Configuration getConfiguration() {
return configuration;
public Configuration getFactoryConfiguration() {
return factoryConfiguration;
}

@Override
public ClassLoader getClassLoader() {
return classLoader;
public Configuration getPipelineConfiguration() {
return pipelineConfiguration;
}

@Override
public Map<String, String> getEnrichmentOptions() {
return enrichmentOptions;
public ClassLoader getClassLoader() {
return classLoader;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,26 @@ public class PipelineOptions {
"EXCEPTION: Throw an exception to terminate the sync pipeline.")))
.build());

public static final ConfigOption<String> PIPELINE_LOCAL_TIME_ZONE =
ConfigOptions.key("pipeline.local-time-zone")
.stringType()
// "systemDefault" is a special value to decide whether to use
// ZoneId.systemDefault() in
// PipelineOptions.getLocalTimeZone()
.defaultValue("systemDefault")
.withDescription(
Description.builder()
.text(
"The local time zone defines current session time zone id. ")
.linebreak()
.text(
"It is used when converting to/from <code>TIMESTAMP WITH LOCAL TIME ZONE</code>. "
+ "Internally, timestamps with local time zone are always represented in the UTC time zone. "
+ "However, when converting to data types that don't include a time zone (e.g. TIMESTAMP, STRING), "
+ "the session time zone is used during conversion. The input of option is either a full name "
+ "such as \"America/Los_Angeles\", or a custom timezone id such as \"GMT-08:00\".")
.build());

public static final ConfigOption<String> SCHEMA_OPERATOR_UID =
ConfigOptions.key("pipeline.schema.operator.uid")
.stringType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,16 @@

package com.ververica.cdc.composer.definition;

import com.ververica.cdc.common.annotation.VisibleForTesting;
import com.ververica.cdc.common.configuration.Configuration;
import com.ververica.cdc.common.types.LocalZonedTimestampType;

import java.time.ZoneId;
import java.util.List;
import java.util.Objects;
import java.util.TimeZone;

import static com.ververica.cdc.common.pipeline.PipelineOptions.PIPELINE_LOCAL_TIME_ZONE;

/**
* Definition of a pipeline.
Expand Down Expand Up @@ -58,7 +64,7 @@ public PipelineDef(
this.sink = sink;
this.routes = routes;
this.transforms = transforms;
this.config = config;
this.config = evaluatePipelineTimeZone(config);
}

public SourceDef getSource() {
Expand Down Expand Up @@ -117,4 +123,50 @@ public boolean equals(Object o) {
public int hashCode() {
return Objects.hash(source, sink, routes, transforms, config);
}

// ------------------------------------------------------------------------
// Utilities
// ------------------------------------------------------------------------

/**
* Returns the current session time zone id. It is used when converting to/from {@code TIMESTAMP
* WITH LOCAL TIME ZONE}.
*
* @see LocalZonedTimestampType
*/
@VisibleForTesting
private static Configuration evaluatePipelineTimeZone(Configuration configuration) {
final String zone = configuration.get(PIPELINE_LOCAL_TIME_ZONE);
ZoneId zoneId;
if (PIPELINE_LOCAL_TIME_ZONE.defaultValue().equals(zone)) {
zoneId = ZoneId.systemDefault();
} else {
validateTimeZone(zone);
zoneId = ZoneId.of(zone);
}
configuration.set(PIPELINE_LOCAL_TIME_ZONE, zoneId.toString());
return configuration;
}

/**
* Validates a time zone is valid or not.
*
* @param zone given time zone
*/
private static void validateTimeZone(String zone) {
boolean isValid;
try {
isValid = TimeZone.getTimeZone(zone).toZoneId().equals(ZoneId.of(zone));
} catch (Exception ignore) {
isValid = false;
}

if (!isValid) {
throw new IllegalArgumentException(
"Invalid time zone. The valid value should be a Time Zone Database ID "
+ "such as 'America/Los_Angeles' to include daylight saving time. "
+ "Fixed offsets are supported using 'GMT-08:00' or 'GMT+08:00'. "
+ "Or use 'UTC' without time zone and daylight saving time.");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import com.ververica.cdc.common.annotation.Internal;
import com.ververica.cdc.common.configuration.Configuration;
import com.ververica.cdc.common.event.Event;
import com.ververica.cdc.common.factories.DataSinkFactory;
import com.ververica.cdc.common.factories.FactoryHelper;
Expand Down Expand Up @@ -80,14 +81,14 @@ public PipelineExecution compose(PipelineDef pipelineDef) {
// Source
DataSourceTranslator sourceTranslator = new DataSourceTranslator();
DataStream<Event> stream =
sourceTranslator.translate(pipelineDef.getSource(), env, parallelism);
sourceTranslator.translate(pipelineDef.getSource(), env, pipelineDef.getConfig());

// Route
RouteTranslator routeTranslator = new RouteTranslator();
stream = routeTranslator.translate(stream, pipelineDef.getRoute());

// Create sink in advance as schema operator requires MetadataApplier
DataSink dataSink = createDataSink(pipelineDef.getSink());
DataSink dataSink = createDataSink(pipelineDef.getSink(), pipelineDef.getConfig());

// Schema operator
SchemaOperatorTranslator schemaOperatorTranslator =
Expand Down Expand Up @@ -118,7 +119,7 @@ public PipelineExecution compose(PipelineDef pipelineDef) {
env, pipelineDef.getConfig().get(PipelineOptions.PIPELINE_NAME), isBlocking);
}

private DataSink createDataSink(SinkDef sinkDef) {
private DataSink createDataSink(SinkDef sinkDef, Configuration pipelineConfig) {
// Search the data sink factory
DataSinkFactory sinkFactory =
FactoryDiscoveryUtils.getFactoryByIdentifier(
Expand All @@ -131,8 +132,8 @@ private DataSink createDataSink(SinkDef sinkDef) {
// Create data sink
return sinkFactory.createDataSink(
new FactoryHelper.DefaultContext(
sinkDef.getConfig().toMap(),
sinkDef.getConfig(),
pipelineConfig,
Thread.currentThread().getContextClassLoader()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import com.ververica.cdc.common.annotation.Internal;
import com.ververica.cdc.common.configuration.Configuration;
import com.ververica.cdc.common.event.Event;
import com.ververica.cdc.common.factories.DataSourceFactory;
import com.ververica.cdc.common.factories.FactoryHelper;
import com.ververica.cdc.common.pipeline.PipelineOptions;
import com.ververica.cdc.common.source.DataSource;
import com.ververica.cdc.common.source.EventSourceProvider;
import com.ververica.cdc.common.source.FlinkSourceFunctionProvider;
Expand All @@ -39,8 +41,9 @@
*/
@Internal
public class DataSourceTranslator {

public DataStreamSource<Event> translate(
SourceDef sourceDef, StreamExecutionEnvironment env, int sourceParallelism) {
SourceDef sourceDef, StreamExecutionEnvironment env, Configuration pipelineConfig) {
// Search the data source factory
DataSourceFactory sourceFactory =
FactoryDiscoveryUtils.getFactoryByIdentifier(
Expand All @@ -50,15 +53,16 @@ public DataStreamSource<Event> translate(
DataSource dataSource =
sourceFactory.createDataSource(
new FactoryHelper.DefaultContext(
sourceDef.getConfig().toMap(),
sourceDef.getConfig(),
pipelineConfig,
Thread.currentThread().getContextClassLoader()));

// Add source JAR to environment
FactoryDiscoveryUtils.getJarPathByIdentifier(sourceDef.getType(), DataSourceFactory.class)
.ifPresent(jar -> FlinkEnvironmentUtils.addJar(env, jar));

// Get source provider
final int sourceParallelism = pipelineConfig.get(PipelineOptions.GLOBAL_PARALLELISM);
EventSourceProvider eventSourceProvider = dataSource.getEventSourceProvider();
if (eventSourceProvider instanceof FlinkSourceProvider) {
// Source
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ public void testCreateDataSinkFromSinkDef() {
DataSink dataSink =
sinkFactory.createDataSink(
new FactoryHelper.DefaultContext(
sinkDef.getConfig().toMap(),
sinkDef.getConfig(),
new Configuration(),
Thread.currentThread().getContextClassLoader()));

Assert.assertTrue(dataSink instanceof DataSinkFactory1.TestDataSink);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ public void testCreateDataSourceFromSourceDef() {
DataSource dataSource =
sourceFactory.createDataSource(
new FactoryHelper.DefaultContext(
sourceDef.getConfig().toMap(),
sourceDef.getConfig(),
new Configuration(),
Thread.currentThread().getContextClassLoader()));

Assert.assertTrue(dataSource instanceof DataSourceFactory1.TestDataSource);
Expand Down
Loading

0 comments on commit 95921d5

Please sign in to comment.