Skip to content

Commit

Permalink
Added Debezium Source for MS SQL Server (apache#12256)
Browse files Browse the repository at this point in the history
  • Loading branch information
dlg99 authored and ciaocloud committed Oct 28, 2021
1 parent 21912e7 commit e878ff8
Show file tree
Hide file tree
Showing 11 changed files with 506 additions and 0 deletions.
1 change: 1 addition & 0 deletions distribution/io/src/assemble/io.xml
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
<file><source>${basedir}/../../pulsar-io/debezium/mysql/target/pulsar-io-debezium-mysql-${project.version}.nar</source></file>
<file><source>${basedir}/../../pulsar-io/debezium/postgres/target/pulsar-io-debezium-postgres-${project.version}.nar</source></file>
<file><source>${basedir}/../../pulsar-io/debezium/oracle/target/pulsar-io-debezium-oracle-${project.version}.nar</source></file>
<file><source>${basedir}/../../pulsar-io/debezium/mssql/target/pulsar-io-debezium-mssql-${project.version}.nar</source></file>
<file><source>${basedir}/../../pulsar-io/debezium/mongodb/target/pulsar-io-debezium-mongodb-${project.version}.nar</source></file>
<file><source>${basedir}/../../pulsar-io/influxdb/target/pulsar-io-influxdb-${project.version}.nar</source></file>
<file><source>${basedir}/../../pulsar-io/redis/target/pulsar-io-redis-${project.version}.nar</source></file>
Expand Down
58 changes: 58 additions & 0 deletions pulsar-io/debezium/mssql/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-io-debezium</artifactId>
<version>2.9.0-SNAPSHOT</version>
</parent>

<artifactId>pulsar-io-debezium-mssql</artifactId>
<name>Pulsar IO :: Debezium :: Microsoft SQL</name>

<dependencies>

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-io-debezium-core</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-sqlserver</artifactId>
<version>${debezium.version}</version>
</dependency>

</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-nar-maven-plugin</artifactId>
</plugin>
</plugins>
</build>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.io.debezium.mssql;

import java.util.Map;

import org.apache.kafka.connect.runtime.TaskConfig;
import org.apache.pulsar.io.debezium.DebeziumSource;


/**
* A pulsar source that runs debezium mssql source
*/
public class DebeziumMsSqlSource extends DebeziumSource {
private static final String DEFAULT_TASK = "io.debezium.connector.sqlserver.SqlServerConnectorTask";

@Override
public void setDbConnectorTask(Map<String, Object> config) throws Exception {
throwExceptionIfConfigNotMatch(config, TaskConfig.TASK_CLASS_CONFIG, DEFAULT_TASK);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

name: debezium-mssql
description: Debezium Microsoft SQL Server Source
sourceClass: org.apache.pulsar.io.debezium.mssql.DebeziumMsSqlSource
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

tenant: "public"
namespace: "default"
name: "debezium-mssql-source"
topicName: "debezium-mssql-topic"
archive: "connectors/pulsar-io-debezium-mssql-2.9.0-SNAPSHOT.nar"

parallelism: 1

configs:
database.hostname: "localhost"
database.port: "1521"
database.user: "sa"
database.password: "MyP@ssword1"
database.dbname: "MyDB"
database.server.name: "mssql"

database.history.pulsar.service.url: "pulsar://127.0.0.1:6650"
1 change: 1 addition & 0 deletions pulsar-io/debezium/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
<module>postgres</module>
<module>mongodb</module>
<module>oracle</module>
<module>mssql</module>
</modules>

</project>
9 changes: 9 additions & 0 deletions site2/docs/io-connectors.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,15 @@ Pulsar has various source connectors, which are sorted alphabetically as below.
* [Example](io-debezium-source.md#example-of-oracle)

* [Java class](https://github.com/apache/pulsar/blob/master/pulsar-io/debezium/oracle/src/main/java/org/apache/pulsar/io/debezium/oracle/DebeziumOracleSource.java)

### Debezium Microsoft SQL Server

* [Configuration](io-debezium-source.md#configuration)

* [Example](io-debezium-source.md#example-of-microsoft-sql)

* [Java class](https://github.com/apache/pulsar/blob/master/pulsar-io/debezium/mssql/src/main/java/org/apache/pulsar/io/debezium/mssql/DebeziumMsSqlSource.java)


### DynamoDB

Expand Down
66 changes: 66 additions & 0 deletions site2/docs/io-debezium-source.md
Original file line number Diff line number Diff line change
Expand Up @@ -535,6 +535,72 @@ configs:
For the full list of configuration properties supported by Debezium, see [Debezium Connector for Oracle](https://debezium.io/documentation/reference/1.5/connectors/oracle.html#oracle-connector-properties).
## Example of Microsoft SQL
### Configuration
Debezium [requires](https://debezium.io/documentation/reference/1.5/connectors/sqlserver.html#sqlserver-overview) SQL Server with CDC enabled.
Steps outlined in the [documentation](https://debezium.io/documentation/reference/1.5/connectors/sqlserver.html#setting-up-sqlserver) and used in the [integration test](https://github.com/apache/pulsar/blob/master/tests/integration/src/test/java/org/apache/pulsar/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMsSqlSourceTester.java).
For more information, see [Enable and disable change data capture in Microsoft SQL Server](https://docs.microsoft.com/en-us/sql/relational-databases/track-changes/enable-and-disable-change-data-capture-sql-server).
Similarly to other connectors, you can use JSON or YAMl to configure the connector.
* JSON
```json
{
"database.hostname": "localhost",
"database.port": "1433",
"database.user": "sa",
"database.password": "MyP@ssw0rd!",
"database.dbname": "MyTestDB",
"database.server.name": "mssql",
"snapshot.mode": "schema_only",
"topic.namespace": "public/default",
"task.class": "io.debezium.connector.sqlserver.SqlServerConnectorTask",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"typeClassName": "org.apache.pulsar.common.schema.KeyValue",
"database.history": "org.apache.pulsar.io.debezium.PulsarDatabaseHistory",
"database.tcpKeepAlive": "true",
"decimal.handling.mode": "double",
"database.history.pulsar.topic": "debezium-mssql-source-history-topic",
"database.history.pulsar.service.url": "pulsar://127.0.0.1:6650"
}
```
* YAML

```yaml
tenant: "public"
namespace: "default"
name: "debezium-mssql-source"
topicName: "debezium-mssql-topic"
parallelism: 1

className: "org.apache.pulsar.io.debezium.mssql.DebeziumMsSqlSource"
database.dbname: "mssql"

configs:
database.hostname: "localhost"
database.port: "1433"
database.user: "sa"
database.password: "MyP@ssw0rd!"
database.dbname: "MyTestDB"
database.server.name: "mssql"
snapshot.mode: "schema_only"
topic.namespace: "public/default"
task.class: "io.debezium.connector.sqlserver.SqlServerConnectorTask"
value.converter: "org.apache.kafka.connect.json.JsonConverter"
key.converter: "org.apache.kafka.connect.json.JsonConverter"
typeClassName: "org.apache.pulsar.common.schema.KeyValue"
database.history: "org.apache.pulsar.io.debezium.PulsarDatabaseHistory"
database.tcpKeepAlive: "true"
decimal.handling.mode: "double"
database.history.pulsar.topic: "debezium-mssql-source-history-topic"
database.history.pulsar.service.url: "pulsar://127.0.0.1:6650"
```
For the full list of configuration properties supported by Debezium, see [Debezium Connector for MS SQL](https://debezium.io/documentation/reference/1.5/connectors/sqlserver.html#sqlserver-connector-properties).
## FAQ
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.tests.integration.containers;


import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy;

import java.time.Duration;
import java.time.temporal.ChronoUnit;

public class DebeziumMsSqlContainer extends ChaosContainer<DebeziumMsSqlContainer> {

// This password needs to include at least 8 characters of at least three of these four categories:
// uppercase letters, lowercase letters, numbers and non-alphanumeric symbols
public static final String SA_PASSWORD = "p@ssw0rD";
public static final String NAME = "debezium-mssql";
static final Integer[] PORTS = { 1433 };

// https://hub.docker.com/_/microsoft-mssql-server
// EULA: https://go.microsoft.com/fwlink/?linkid=857698
// "You may install and use copies of the software on any device,
// including third party shared devices, to design, develop, test and demonstrate your programs.
// You may not use the software on a device or server in a production environment."
private static final String IMAGE_NAME = "mcr.microsoft.com/mssql/server:2019-latest";

public DebeziumMsSqlContainer(String clusterName) {
super(clusterName, IMAGE_NAME);
}

@Override
public String getContainerName() {
return clusterName;
}

@Override
protected void configure() {
super.configure();
// leaving default MSSQL_PID (aka Developer edition)
this.withNetworkAliases(NAME)
.withExposedPorts(PORTS)
.withEnv("ACCEPT_EULA", "Y")
.withEnv("SA_PASSWORD", SA_PASSWORD)
.withEnv("MSSQL_AGENT_ENABLED", "true")
.withStartupTimeout(Duration.of(300, ChronoUnit.SECONDS))
.withCreateContainerCmdModifier(createContainerCmd -> {
createContainerCmd.withHostName(NAME);
createContainerCmd.withName(getContainerName());
})
.waitingFor(new HostPortWaitStrategy());
}

}
Loading

0 comments on commit e878ff8

Please sign in to comment.