Skip to content

Commit

Permalink
Added Debezium Source for MS SQL Server (apache#12256)
Browse files Browse the repository at this point in the history
(cherry picked from commit 5c28686)
  • Loading branch information
dlg99 authored and eolivelli committed Mar 8, 2022
1 parent 7ae3a97 commit 0e98fbd
Show file tree
Hide file tree
Showing 11 changed files with 515 additions and 1 deletion.
1 change: 1 addition & 0 deletions distribution/io/src/assemble/io.xml
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
<file><source>${basedir}/../../pulsar-io/debezium/mysql/target/pulsar-io-debezium-mysql-${project.version}.nar</source></file>
<file><source>${basedir}/../../pulsar-io/debezium/postgres/target/pulsar-io-debezium-postgres-${project.version}.nar</source></file>
<file><source>${basedir}/../../pulsar-io/debezium/oracle/target/pulsar-io-debezium-oracle-${project.version}.nar</source></file>
<file><source>${basedir}/../../pulsar-io/debezium/mssql/target/pulsar-io-debezium-mssql-${project.version}.nar</source></file>
<file><source>${basedir}/../../pulsar-io/debezium/mongodb/target/pulsar-io-debezium-mongodb-${project.version}.nar</source></file>
<file><source>${basedir}/../../pulsar-io/influxdb/target/pulsar-io-influxdb-${project.version}.nar</source></file>
<file><source>${basedir}/../../pulsar-io/redis/target/pulsar-io-redis-${project.version}.nar</source></file>
Expand Down
58 changes: 58 additions & 0 deletions pulsar-io/debezium/mssql/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-io-debezium</artifactId>
<version>2.9.0-SNAPSHOT</version>
</parent>

<artifactId>pulsar-io-debezium-mssql</artifactId>
<name>Pulsar IO :: Debezium :: Microsoft SQL</name>

<dependencies>

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-io-debezium-core</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-sqlserver</artifactId>
<version>${debezium.version}</version>
</dependency>

</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-nar-maven-plugin</artifactId>
</plugin>
</plugins>
</build>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.io.debezium.mssql;

import java.util.Map;

import org.apache.kafka.connect.runtime.TaskConfig;
import org.apache.pulsar.io.debezium.DebeziumSource;


/**
* A pulsar source that runs debezium mssql source
*/
public class DebeziumMsSqlSource extends DebeziumSource {
private static final String DEFAULT_TASK = "io.debezium.connector.sqlserver.SqlServerConnectorTask";

@Override
public void setDbConnectorTask(Map<String, Object> config) throws Exception {
throwExceptionIfConfigNotMatch(config, TaskConfig.TASK_CLASS_CONFIG, DEFAULT_TASK);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

name: debezium-mssql
description: Debezium Microsoft SQL Server Source
sourceClass: org.apache.pulsar.io.debezium.mssql.DebeziumMsSqlSource
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

tenant: "public"
namespace: "default"
name: "debezium-mssql-source"
topicName: "debezium-mssql-topic"
archive: "connectors/pulsar-io-debezium-mssql-2.9.0-SNAPSHOT.nar"

parallelism: 1

configs:
database.hostname: "localhost"
database.port: "1521"
database.user: "sa"
database.password: "MyP@ssword1"
database.dbname: "MyDB"
database.server.name: "mssql"

database.history.pulsar.service.url: "pulsar://127.0.0.1:6650"
1 change: 1 addition & 0 deletions pulsar-io/debezium/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
<module>postgres</module>
<module>mongodb</module>
<module>oracle</module>
<module>mssql</module>
</modules>

</project>
19 changes: 18 additions & 1 deletion site2/docs/io-connectors.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ title: Built-in connector
sidebar_label: Built-in connector
---

Pulsar distribution includes a set of common connectors that have been packaged and tested with the rest of Apache Pulsar. These connectors import and export data from some of the most commonly used data systems. For a full set of third-party connectors that Pulsar supports, refer to [StreamNative Hub](https://hub.streamnative.io/).
Pulsar distribution includes a set of common connectors that have been packaged and tested with the rest of Apache Pulsar. These connectors import and export data from some of the most commonly used data systems.

Using any of these connectors is as easy as writing a simple connector and running the connector locally or submitting the connector to a Pulsar Functions cluster.

Expand Down Expand Up @@ -45,6 +45,23 @@ Pulsar has various source connectors, which are sorted alphabetically as below.

* [Java class](https://github.com/apache/pulsar/blob/master/pulsar-io/debezium/mongodb/src/main/java/org/apache/pulsar/io/debezium/mongodb/DebeziumMongoDbSource.java)

### Debezium Oracle

* [Configuration](io-debezium-source.md#configuration)

* [Example](io-debezium-source.md#example-of-oracle)

* [Java class](https://github.com/apache/pulsar/blob/master/pulsar-io/debezium/oracle/src/main/java/org/apache/pulsar/io/debezium/oracle/DebeziumOracleSource.java)

### Debezium Microsoft SQL Server

* [Configuration](io-debezium-source.md#configuration)

* [Example](io-debezium-source.md#example-of-microsoft-sql)

* [Java class](https://github.com/apache/pulsar/blob/master/pulsar-io/debezium/mssql/src/main/java/org/apache/pulsar/io/debezium/mssql/DebeziumMsSqlSource.java)


### DynamoDB

* [Configuration](io-dynamodb-source.md#configuration)
Expand Down
66 changes: 66 additions & 0 deletions site2/docs/io-debezium-source.md
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,72 @@ This example shows how to change the data of a MongoDB table using the Pulsar De
{"schema":{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"}],"optional":false,"name":"dbserver1.inventory.products.Key"},"payload":{"id":"104"}}, value = {"schema":{"type":"struct","fields":[{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"after"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"patch"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":false,"field":"rs"},{"type":"string","optional":false,"field":"collection"},{"type":"int32","optional":false,"field":"ord"},{"type":"int64","optional":true,"field":"h"}],"optional":false,"name":"io.debezium.connector.mongo.Source","field":"source"},{"type":"string","optional":true,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"}],"optional":false,"name":"dbserver1.inventory.products.Envelope"},"payload":{"after":"{\"_id\": {\"$numberLong\": \"104\"},\"name\": \"hammer\",\"description\": \"12oz carpenter's hammer\",\"weight\": 1.25,\"quantity\": 4}","patch":null,"source":{"version":"0.10.0.Final","connector":"mongodb","name":"dbserver1","ts_ms":1573541905000,"snapshot":"true","db":"inventory","rs":"rs0","collection":"products","ord":1,"h":4983083486544392763},"op":"r","ts_ms":1573541909761}}.
```

## Example of Microsoft SQL

### Configuration

Debezium [requires](https://debezium.io/documentation/reference/1.5/connectors/sqlserver.html#sqlserver-overview) SQL Server with CDC enabled.
Steps outlined in the [documentation](https://debezium.io/documentation/reference/1.5/connectors/sqlserver.html#setting-up-sqlserver) and used in the [integration test](https://github.com/apache/pulsar/blob/master/tests/integration/src/test/java/org/apache/pulsar/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMsSqlSourceTester.java).
For more information, see [Enable and disable change data capture in Microsoft SQL Server](https://docs.microsoft.com/en-us/sql/relational-databases/track-changes/enable-and-disable-change-data-capture-sql-server).

Similarly to other connectors, you can use JSON or YAMl to configure the connector.

* JSON

```json
{
"database.hostname": "localhost",
"database.port": "1433",
"database.user": "sa",
"database.password": "MyP@ssw0rd!",
"database.dbname": "MyTestDB",
"database.server.name": "mssql",
"snapshot.mode": "schema_only",
"topic.namespace": "public/default",
"task.class": "io.debezium.connector.sqlserver.SqlServerConnectorTask",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"typeClassName": "org.apache.pulsar.common.schema.KeyValue",
"database.history": "org.apache.pulsar.io.debezium.PulsarDatabaseHistory",
"database.tcpKeepAlive": "true",
"decimal.handling.mode": "double",
"database.history.pulsar.topic": "debezium-mssql-source-history-topic",
"database.history.pulsar.service.url": "pulsar://127.0.0.1:6650"
}
```
* YAML

```yaml
tenant: "public"
namespace: "default"
name: "debezium-mssql-source"
topicName: "debezium-mssql-topic"
parallelism: 1

className: "org.apache.pulsar.io.debezium.mssql.DebeziumMsSqlSource"
database.dbname: "mssql"

configs:
database.hostname: "localhost"
database.port: "1433"
database.user: "sa"
database.password: "MyP@ssw0rd!"
database.dbname: "MyTestDB"
database.server.name: "mssql"
snapshot.mode: "schema_only"
topic.namespace: "public/default"
task.class: "io.debezium.connector.sqlserver.SqlServerConnectorTask"
value.converter: "org.apache.kafka.connect.json.JsonConverter"
key.converter: "org.apache.kafka.connect.json.JsonConverter"
typeClassName: "org.apache.pulsar.common.schema.KeyValue"
database.history: "org.apache.pulsar.io.debezium.PulsarDatabaseHistory"
database.tcpKeepAlive: "true"
decimal.handling.mode: "double"
database.history.pulsar.topic: "debezium-mssql-source-history-topic"
database.history.pulsar.service.url: "pulsar://127.0.0.1:6650"
```
For the full list of configuration properties supported by Debezium, see [Debezium Connector for MS SQL](https://debezium.io/documentation/reference/1.5/connectors/sqlserver.html#sqlserver-connector-properties).
## FAQ
### Debezium postgres connector will hang when create snap
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.tests.integration.containers;


import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy;

import java.time.Duration;
import java.time.temporal.ChronoUnit;

public class DebeziumMsSqlContainer extends ChaosContainer<DebeziumMsSqlContainer> {

// This password needs to include at least 8 characters of at least three of these four categories:
// uppercase letters, lowercase letters, numbers and non-alphanumeric symbols
public static final String SA_PASSWORD = "p@ssw0rD";
public static final String NAME = "debezium-mssql";
static final Integer[] PORTS = { 1433 };

// https://hub.docker.com/_/microsoft-mssql-server
// EULA: https://go.microsoft.com/fwlink/?linkid=857698
// "You may install and use copies of the software on any device,
// including third party shared devices, to design, develop, test and demonstrate your programs.
// You may not use the software on a device or server in a production environment."
private static final String IMAGE_NAME = "mcr.microsoft.com/mssql/server:2019-latest";

public DebeziumMsSqlContainer(String clusterName) {
super(clusterName, IMAGE_NAME);
}

@Override
public String getContainerName() {
return clusterName;
}

@Override
protected void configure() {
super.configure();
// leaving default MSSQL_PID (aka Developer edition)
this.withNetworkAliases(NAME)
.withExposedPorts(PORTS)
.withEnv("ACCEPT_EULA", "Y")
.withEnv("SA_PASSWORD", SA_PASSWORD)
.withEnv("MSSQL_AGENT_ENABLED", "true")
.withStartupTimeout(Duration.of(300, ChronoUnit.SECONDS))
.withCreateContainerCmdModifier(createContainerCmd -> {
createContainerCmd.withHostName(NAME);
createContainerCmd.withName(getContainerName());
})
.waitingFor(new HostPortWaitStrategy());
}

}
Loading

0 comments on commit 0e98fbd

Please sign in to comment.