[mysql] Add a module to periodically check ip on DNS (for DB Switching) #2457

Closed
wants to merge 23 commits into from

@SML0127 SML0127 commented Sep 2, 2023

First, we'd like to thank everyone who contributes to Flink CDC.

Our team uses Flink CDC to capture changes from MySQL.

In real-world deployments, DBs are organized in a master-slave structure and DB switching occurs periodically.
When a DB switch occurs, the slave server we are connected to is promoted to master.

The problem is that the Flink CDC connector keeps its existing connection, while our operational policy forbids CDC jobs from attaching to the master server.

To resolve this issue, our team implemented a DnsIpChecker module. (We found a similar issue and answer.)

Here's how it works:

  1. Allocate one thread when the MySqlSourceReader (subtaskId=0) is created, so that the thread shares the reader's lifecycle.
  2. That thread periodically looks up the IP in DNS for the configured FQDN.
  3. If the IP changes, it sends a SourceEvent to MySqlSourceEnumerator, which results in a FlinkRuntimeException that restarts the Flink job and resets the DB connection.
  4. Add a close function to MySqlSourceReader, so that the thread is also terminated when the Flink CDC job is terminated.
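
To make the steps above concrete, here is a minimal sketch in plain Java of a periodic DNS check with a change callback. It is not the code from this PR: the class name `DnsIpCheckerSketch`, the injectable `resolver`, and the `onChange` callback are all assumptions made for illustration. In the design described above, the callback is roughly where the reader would send its SourceEvent to the enumerator.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;

/** Hypothetical sketch of a periodic DNS IP check; not the class added by this PR. */
final class DnsIpCheckerSketch implements AutoCloseable {
    private final String hostname;
    private final Function<String, String> resolver; // hostname -> IP; injectable so it can be faked
    private final Consumer<String> onChange;         // invoked with the new IP when it differs
    private final ScheduledExecutorService executor;
    private volatile String lastIp;                  // last IP seen, null before the first lookup

    public DnsIpCheckerSketch(String hostname,
                              Function<String, String> resolver,
                              Consumer<String> onChange) {
        this.hostname = hostname;
        this.resolver = resolver;
        this.onChange = onChange;
        // Step 1: a single dedicated thread, created together with its owner.
        this.executor = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "dns-ip-checker");
            t.setDaemon(true);
            return t;
        });
    }

    /** Resolver backed by the JVM's own lookup; returns null if the name cannot be resolved. */
    public static String resolveWithJvm(String hostname) {
        try {
            return InetAddress.getByName(hostname).getHostAddress();
        } catch (UnknownHostException e) {
            return null;
        }
    }

    /** Step 2: run the check periodically on the dedicated thread. */
    public void start(long period, TimeUnit unit) {
        executor.scheduleWithFixedDelay(() -> {
            try {
                checkOnce();
            } catch (RuntimeException e) {
                // An uncaught exception would cancel all future runs, so report and continue.
                System.err.println("DNS check failed: " + e);
            }
        }, 0, period, unit);
    }

    /** Step 3: one lookup; returns true and fires the callback if the IP changed. */
    public boolean checkOnce() {
        String current = resolver.apply(hostname);
        if (current == null) {
            return false; // lookup failed; keep the last known IP
        }
        String previous = lastIp;
        lastIp = current;
        if (previous != null && !previous.equals(current)) {
            onChange.accept(current);
            return true;
        }
        return false;
    }

    /** Step 4: stop the thread when the owner is closed. */
    @Override
    public void close() {
        executor.shutdownNow();
    }
}
```

A caller would construct it with `DnsIpCheckerSketch::resolveWithJvm`, call `start(60, TimeUnit.SECONDS)`, and call `close()` from the reader's own close path. Note that the JVM caches successful DNS lookups (controlled by the `networkaddress.cache.ttl` security property), so the effective detection delay depends on both the check period and that cache TTL.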

This is a portion of our code (we assume that only one table is captured per Flink CDC job):

  def getMySQLSourceOperator(): MySqlSource[String] = {
    MySqlSource.builder[String]()
      .hostname(mySqlConfig.host)
      .port(mySqlConfig.port)
      .serverTimeZone(mySqlConfig.timeZone)
      .databaseList(mySqlConfig.database)
      .tableList(mySqlConfig.table)
      .username(mySqlConfig.user)
      .serverId(mySqlConfig.serverIdRange)
      .password(mySqlConfig.password)
      .startupOptions(mySqlConfig.startupMode)
      .fetchSize(mySqlConfig.fetchSize)
      .splitSize(mySqlConfig.splitSize)
      .chunkKeyColumn(new ObjectPath(mySqlConfig.database, mySqlConfig.table), mySqlConfig.chunkKeyColumn)
      .connectionPoolSize(mySqlConfig.poolSize)
      .scanNewlyAddedTableEnabled(false)
      .includeSchemaChanges(false)
      .debeziumProperties(mySqlConfig.dbzProps)
      .closeIdleReaders(true)
      .restartOnDbSwitch(true) // the option added by this PR
      .deserializer(new JsonDebeziumDeserializationSchema(true, mySqlConfig.jsonConverterProps))
      .build()
  }

We considered resetting only the DB connection,
but it was difficult to determine a recovery point, so we chose to restart the Flink job and recover from Flink's checkpoints.

As shown in the log below, we also log the lookup result periodically to verify that the IP check is running normally.
(Hostname and IP are masked.)

2023-09-02 07:10:45.003 INFO  com.ververica.cdc.connectors.mysql.source.utils.DNSIpChecker [] - [264361] Current IP address for hostname test_host_name.db.server is XXX
...
2023-09-02 07:11:45.026 INFO  com.ververica.cdc.connectors.mysql.source.utils.DNSIpChecker [] - [264421] Current IP address for hostname test_host_name.db.server is XXX

If you are interested in this feature, please feel free to check it out. Any comments or feedback would also be appreciated.
Have a great day, all Flink users 😄

ruanhang1993 and others added 23 commits June 25, 2023 10:21
* [docs] add docs for the mysql tables without primary keys
…the supported features of each connector

This closes apache#1736.
Co-authored-by: rookiegao <rookiegao712@gmail>

* [hotfix] Fix sqlserver monitor same table in other database

* Add unit test

---------

Co-authored-by: gongzhongqiang <[email protected]>
@SML0127 SML0127 closed this Sep 2, 2023
8 participants