Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RFC: Atomic Distributed Transactions #16245

Open
11 of 21 tasks
harshit-gangal opened this issue Jun 21, 2024 · 10 comments · May be fixed by #16844
Open
11 of 21 tasks

RFC: Atomic Distributed Transactions #16245

harshit-gangal opened this issue Jun 21, 2024 · 10 comments · May be fixed by #16844
Labels
Type: RFC Request For Comment

Comments

@harshit-gangal
Copy link
Member

harshit-gangal commented Jun 21, 2024

Introduction

This document focuses on reintroducing the atomic distributed transaction implementation and addressing the shortcomings with improved and robust support.

Background

Existing System Overview

Vitess has three transaction modes; those are Single, Multi and TwoPC.

In Single Mode, any transaction that spans more than one shard is rolled back immediately. This mode keeps the transaction to a single shard and provides ACID-compliant transactions.

In Multi Mode, a commit on a multi-shard transaction is handled with a best-effort commit. Any commit failure on a shard rolls back the non-committed transactions. The previously committed shard transactions and the failure shard need application-side handling.

In TwoPC Mode, a commit on a multi-shard transaction follows a sequence of steps to achieve an atomic distributed commit. The existing design document is extensive and explains all the component interactions needed to support it. It also highlights the different failure scenarios and how they should be handled.

Existing Implementation

A Two-Phase commit protocol requires a Transaction Manager (TM) and Resource Managers (RMs).

Resource Managers are the participating VTTablets for the transaction. Their role is to prepare the transaction and return a success or failure response. During the prepare phase, RMs store all the queries executed on that transaction in recovery logs as statements. If an RM fails, upon coming back online, it prepares all the transactions using the transaction recovery logs by executing the statements before accepting any further transactions or queries.

The Transaction Manager role is handled by VTGate. On commit, VTGate creates a transaction record and stores it in one of the participating RMs, designating it as the Metadata Manager (MM). VTGate then issues a prepare request to the other involved RMs. If any RM responds with a failure, VTGate decides to roll back the transaction and stores this decision in the MM. VTGate then issues a rollback prepared request to all the involved RMs.

If all RMs respond successfully, VTGate decides to commit the transaction. It issues a start commit to the MM, which commits the ongoing transaction and stores the commit decision in the transaction record. VTGate then issues a commit prepared request to the other involved RMs. After committing on all RMs, VTGate concludes by removing the transaction record from the MM.

All MMs have a watcher service that monitors unresolved transactions and transmits them to the TM for resolution.

Benefits of the Existing Approach:

  1. The application does not have to communicate upfront about transactions going cross-shard.
  2. TM maintains the transaction metadata with RM keeping itself stateless.
  3. Storing transaction metadata with one of the RM stating it as MM avoids the Prepare phase for the MM.
  4. The transaction is committed with non-2PC workflow if the transaction exists on a single shard.

Problem Statement

The existing implementation of atomic distributed commit is a modified version of the Two-Phase Commit (2PC) protocol that addresses its inherent issues while making practical trade-offs. This approach efficiently handles single-shard transactions and adopts a realistic method for managing transactions across multiple shards. However, there are issues with the watchdog design, as well as other reliability concerns. Additionally, there are workflow improvements and performance enhancements that need to be addressed. This document will highlight these issues and provide solutions with the rework.

Existing Issues and Proposal

1. Distributed Transaction Identifier (DTID) Generation

The Transaction Manager (TM) designates the first participant of the transaction as MM. It generates the DTID using MM’s shard prefix and the transaction ID. This method ensures uniqueness across shards, it introduces potential conflicts due to the auto-increment transaction ID being reset upon a VTTablet restart.

Impact:

  • Additional Recovery Workflow: To prevent collisions, on startup all VTTablets must ensure their last transaction ID is set to the maximum value of the in-flight distributed transactions.
  • Risk of DTID Collision: If synchronization is missed or fails, VTGate might generate DTIDs that collide with existing in-flight transactions. This could result in incorrect transactions being modified or committed, leading to data corruption and loss of transactional integrity.
  • Exhaustion of ID Space: The skipping of ID ranges will likely continue, and over time, there is a potential risk of reaching the limit of the auto-increment ID range.

Proposals:

  • Proposal 1: Use a centralized sequence generator that will ensure unique ID across keyspace and shards. This sequence will then be mapped to a shard using a hash function. That shard primary will become the MM for that transaction.
  • Proposal 2: TM will create the DTID using UUIDv7 or Nano ID and will follow similar steps as Proposal 1. There is a possibility that the DTID generated might not be unique and can fail on Create Transaction Record API on MM which will lead to transaction rollback.
  • Proposal 3: The first participant of the transaction will be the MM. It will create the DTID using a 32-byte keyspace-shard name as a prefix along with a 14-byte Nano ID when TM invokes the Create Transaction Record API.
  • Proposal 4: TM creates the DTID using Proposal 3 and sends it to MM to store as part of the Create Transaction Record API. There is a possibility that the DTID generated might not be unique and can fail on Create Transaction Record API on MM which will lead to transaction rollback.

Conclusion:

Proposal 1 is good but it adds a dependency on a new system to provide the DTID. Proposal 2 reduces that dependency by having TM generate the DTID, but it risks generating duplicate DTID which might fail on Create Transaction Record API, leading to transaction rollback. Proposal 3 ensures the DTID is unique but results in a long DTID key. Proposal 4 also risks DTID collisions, causing transaction rollback on the Create Transaction Record API call.

Proposals 1 & 2 can map the DTID to non-participating RMs, making it the MM. These additional network calls will increase the system’s commit latency. Proposals 3 & 4 avoid this extra hop but significantly increase the DTID size. The larger DTID size outweighs the efficiency gains from using one of the participating RMs as MM in the overall commit process.

Proposal 3 looks like the most balanced and reliable option here.

2. Transaction Resolution Design

The MM is currently being provided with a fixed IP address of the TM on startup to invoke TM ResolveTransaction API for unresolved transactions.

Impact:

  • Operational Overhead: If the TM's IP Address changes then MM will not be able to contact the TM without a manual update and restart of MM.
  • Single Point of Failure: If the TM is down or not reachable, then MM will not be able to provide unresolved transactions.
  • Scalability: Having a single TM to resolve the transactions will limit the ability to scale horizontally.

Proposals:

  • Proposal 1: MM will notify VTGates via the vttablet health stream for pending unresolved transactions. VTGate will invoke MM to retrieve the unresolved transaction details. MM will manage the multiple invocations from VTGates and provide the different sets of unresolved transactions to each VTGate. VTGate will then resolve the transaction based on the current state of the transaction.
  • Proposal 2: VTOrc will track the unresolved transactions from the MM. VTOrch will then notify the VTGates to resolve the transaction. This would need service discovery for VTGates.

Conclusion:

Proposal 1 is the more practical choice as it utilizes existing infrastructure, which is proven and already used for other purposes like real-time stats and schema tracking. Unlike Proposal 2, which requires full-fledged development of a VTGate service discovery system.

3. Connection Settings

The current implementation does not store changes in the connection settings in the transaction recovery logs. Its omission risks the integrity and consistency of the distributed transaction during a failure recovery scenario.

Impact:

  • Data Integrity Risk: When the transaction is recovered using the recovery logs, the session state may not reflect the connection settings that were in effect when the transaction was originally executed. This can lead to inconsistencies in transaction behaviour such as differences in character sets, time zones, isolation levels and other session-specific settings.
  • Failure Recovery Issue: During the failure recovery, the lack of stored connection settings can lead to some statements failing to get applied which will lead to failed prepared transactions and loss of atomicity of the transaction.

Proposal: Along with redo statement logs, the connections settings as set statements will be stored in the sequence of when they were executed. On recovery, the same sequence will be used to prepare the transaction.

4. Prepared Transactions Connection Stability

The current implementation assumes a stable MySQL connection after preparing a transaction on an RM. Any connection disruption will roll back the transaction and may cause data inconsistency due to modifications by other concurrent transactions.

Impact:

  • Transaction Atomicity Loss: Recovery of the prepared transaction can fail as the underlying data might have changed and hence this transaction will be called as unrecoverable losing the atomicity of the transaction.

Proposals:

  • Proposal 1: All the database connections that are part of the distributed transaction to MySQL would only be allowed on the Unix socket. Prepare Transaction step will fail if they are using a network connection.
  • Proposal 2: The locking mechanism needs to be moved to MySQL this will ensure the rows part of the transaction remains locked for modification by other transactions even when the connection is disconnected. MySQL should be able to provide a recovery mechanism for such locked rows. MySQL does solve this problem via the XA protocol.

Conclusion:

Proposal 1 is recommended for immediate adoption to enhance connection stability and prevent unreliable TCP connections. If testing identifies issues with Unix socket stability, Proposal 2 will be implemented to leverage MySQL's XA protocol for transactional integrity and recovery.

5. Transaction Recovery Logs Application Reliability

The current implementation stores the transaction recovery logs as DML statements. On transaction recovery, while applying the statements from these logs it is not expected to fail as the current shutdown and startup workflow ensure that no other DMLs leak into the database. Still, there remains a risk of statement failure during the redo log application, potentially resulting in lost modifications without clear tracking of modified rows.

Impact:

  • Lack of Visibility: In case of a failed statement application, it becomes unclear which rows were originally modified, complicating data recovery.

Proposals:

  • Proposal 1: Implement a copy-on-write approach, where raw mutations are stored in a separate shadow table. Upon commit, these mutations are materialized to the primary data tables. This will ensure that any failure during redo log application can be traced back to the original modifications, improving data recovery and maintaining transactional integrity.
  • Proposal 2: MySQL provides XA protocol support which handles the transaction recovery logs for distributed transactions. This will eliminate the need to store the recovery logs by Vitess.

Conclusion:

Currently, neither proposal will be implemented, as the expectation is that redo log applications should not fail during recovery. Should any recovery tests fail due to redo log application issues, Proposal 2 will be prioritized for its inherent advantages over Proposal 1.

6. Unsupported Consistent Lookup Vindex

The current implementation disallows the use of consistent lookup vindexes and upfront rejects any distributed transaction commit involving them.

Impact:

  • Operational Disruption: Existing Vitess clusters using consistent lookup vindexes must drop them before enabling distributed commits, causing operational disruption and requiring significant changes to the existing setup.

Proposal: Allow the consistent lookup vindex to continue. The pre-transaction will continue to work as-is. Any failure on the pre-transaction commit will roll back the main transaction. The post-transaction will only continue once the distributed transaction is completed. Otherwise, the post-transaction will be rolled back.

7. Resharding, Move Tables and Online Schema Change not Accounted

The current implementation has not handled the complications of running a resharding workflow, a move tables workflow, or an online schema change workflow in parallel with in-flight prepared distributed transactions.

Impact:

  • Transaction Atomicity Loss: Running these workflows in parallel with distributed transactions can destabilize prepared transactions, potentially compromising the atomicity guarantee.

Proposals:

  • Proposal 1: All the VReplication-related workflow needs to account for ongoing distributed transactions during their cut-over. A new tablet manager API will be added to check if it is safe to continue the cutover. This API will take a lock on the involved table and the workflow needs to unlock them at the end of it. VTOrc needs to handle the unlocking of the table if the workflow is abandoned for any reason.
  • Proposal 2: Look into different kinds of workflow and see if a safe cutover can happen and have different case-by-case implementations to support distributed transactions along the cutover.

Conclusion:

Proposal 1 is relatively easier to argue about the expectation. All workflows will use the same strategy. The new API can be extended to be used for other flows as well.

Exploratory Work

MySQL XA was considered as an alternative to having RMs manage the transaction recovery logs and hold up the row locks until a commit or rollback occurs.

There are currently 20 open bugs on XA. On MySQL 8.0.33, reproduction steps were followed for all the bugs, and 8 still persist. Out of these 8 bugs, 4 have patches attached that resolve the issues when applied. For the remaining 4 issues, changes will need to be made either in the code or the workflow to ensure they are resolved.

MySQL’s XA seems a probable candidate if we encounter issues with our implementation of handling distributed transactions that XA can resolve. XA’s usage is currently neither established nor ruled out in this design.

Rework Design

Commit Phase Interaction

The Component interaction for different cases.

Any error in the commit phase is indicated to the application with a warning flag. If an application's transaction receives a warning signal, it can execute a show warnings to know the distributed transaction ID for that transaction. It can watch the transaction status with show transaction status for <dtid>.

Case 1: All components respond with success.

sequenceDiagram
  participant App as App
  participant G as VTGate
  participant MM as VTTablet/MM
  participant RM1 as VTTablet/RM1
  participant RM2 as VTTablet/RM2

  App ->>+ G: Commit
  G ->> MM: Create Transaction Record
  MM -->> G: Success
  par
  G ->> RM1: Prepare Transaction
  G ->> RM2: Prepare Transaction
  RM1 -->> G: Success
  RM2 -->> G: Success
  end
  G ->> MM: Store Commit Decision
  MM -->> G: Success
  par
  G ->> RM1: Commit Prepared Transaction
  G ->> RM2: Commit Prepared Transaction
  RM1 -->> G: Success
  RM2 -->> G: Success
  end
  opt Any failure here does not impact the reponse to the application
  G ->> MM: Delete Transaction Record
  end
  G ->>- App: OK Packet
Loading

Case 2: When the Commit Prepared Transaction from the RM responds with an error. In this case, the watcher service needs to resolve the transaction and commit the pending prepared transactions.

sequenceDiagram
  participant App as App
  participant G as VTGate
  participant MM as VTTablet/MM
  participant RM1 as VTTablet/RM1
  participant RM2 as VTTablet/RM2

  App ->>+ G: Commit
  G ->> MM: Create Transaction Record
  MM -->> G: Success
  par
  G ->> RM1: Prepare Transaction
  G ->> RM2: Prepare Transaction
  RM1 -->> G: Success
  RM2 -->> G: Success
  end
  G ->> MM: Store Commit Decision
  MM -->> G: Success
  par
  G ->> RM1: Commit Prepared Transaction
  G ->> RM2: Commit Prepared Transaction
  RM1 -->> G: Success
  RM2 -->> G: Failure
  end
  G ->>- App: Err Packet
Loading

Case 3: When the Commit Descision from MM responds with an error. In this case, the watcher service needs to resolve the transaction as it is not certain whether the commit decision persisted or not.

sequenceDiagram
  participant App as App
  participant G as VTGate
  participant MM as VTTablet/MM
  participant RM1 as VTTablet/RM1
  participant RM2 as VTTablet/RM2

  App ->>+ G: Commit
  G ->> MM: Create Transaction Record
  MM -->> G: Success
  par
  G ->> RM1: Prepare Transaction
  G ->> RM2: Prepare Transaction
  RM1 -->> G: Success
  RM2 -->> G: Success
  end
  G ->> MM: Store Commit Decision
  MM -->> G: Failure
  G ->>- App: Err Packet
Loading

Case 4: When a Prepare Transaction fails. TM will decide to roll back the transaction. If any rollback returns a failure, the watcher service will resolve the transaction.

sequenceDiagram
  participant App as App
  participant G as VTGate
  participant MM as VTTablet/MM
  participant RM1 as VTTablet/RM1
  participant RM2 as VTTablet/RM2

  App ->>+ G: Commit
  G ->> MM: Create Transaction Record
  MM -->> G: Success
  par
  G ->> RM1: Prepare Transaction
  G ->> RM2: Prepare Transaction
  RM1 -->> G: Failure
  RM2 -->> G: Success
  end
  par
  G ->> MM: Store Rollback Decision
  G ->> RM1: Rollback Prepared Transaction
  G ->> RM2: Rollback Prepared Transaction
  MM -->> G: Success / Failure
  RM1 -->> G: Success / Failure
  RM2 -->> G: Success / Failure
  end
  opt Rollback success on MM and RMs
    G ->> MM: Delete Transaction Record
  end
  G ->>- App: Err Packet
Loading

Case 5: When Create Transaction Record fails. TM will roll back the transaction.

sequenceDiagram
  participant App as App
  participant G as VTGate
  participant MM as VTTablet/MM
  participant RM1 as VTTablet/RM1
  participant RM2 as VTTablet/RM2

  App ->>+ G: Commit
  G ->> MM: Create Transaction Record
  MM -->> G: Failure
  par
  G ->> RM1: Rollback Transaction
  G ->> RM2: Rollback Transaction
  RM1 -->> G: Success / Failure
  RM2 -->> G: Success / Failure
  end
  G ->>- App: Err Packet
Loading

Transaction Resolution Watcher

If there are long pending distributed transactions in the MM. This watcher service will ensure that TM is invoked to resolve them.

sequenceDiagram
  participant G1 as VTGate
  participant G2 as VTGate
  participant MM as VTTablet/MM
  participant RM1 as VTTablet/RM1
  participant RM2 as VTTablet/RM2

  MM -) G1: Unresolved Transaction
  MM -) G2: Unresolved Transaction
  Note over G1,MM: MM sends this over health stream.
  loop till no more unresolved transactions
  G1 ->> MM: Provide Transaction details
  G2 ->> MM: Provide Transaction details
  MM -->> G2: Distributed Transaction ID details
  Note over G2,MM: This VTGate recieves the transaction to resolve.
  end
  alt Transaction State: Commit
  G2 ->> RM1: Commit Prepared Transaction
  G2 ->> RM2: Commit Prepared Transaction
  else Transaction State: Rollback
  G2 ->> RM1: Rollback Prepared Transaction
  G2 ->> RM2: Rollback Prepared Transaction
  else Transaction State: Prepare
  G2 ->> MM: Store Rollback Decision
  MM -->> G2: Success
  opt Only when Rollback Decision is stored
  G2 ->> RM1: Rollback Prepared Transaction
  G2 ->> RM2: Rollback Prepared Transaction
  end
  end
  opt Commit / Rollback success on MM and RMs
  G2 ->> MM: Delete Transaction Record
  end
Loading

Improvements and Enhancements

  1. Track which shards have DML applied as not all transactions open will have DML. This can reduce the participating RMs in the distributed transaction. Any shard that does not have a DML applied but an open transaction can be closed without impacting the atomicity of the transaction.
  2. All the atomic transactions related APIs will be idempotent so that if TM tries to resolve the same DTID multiple times, the outcome will not be impacted.
  3. During Commit if DTID is generated and there is any error in the commit flow, it will be notified via a warning message to the app. The DTID provided can be tracked by the app via the show transaction status for <dtid> command.

Implementation Plan

Task Breakdown:

  • Modify the commit phase component interaction in TM based on new flows.
  • Implement the new transaction resolution design
  • Add support for tracking DTID state
  • Modify existing API implementation to be idempotent
  • Add prepared transaction protection from VTGate Restart
  • Add prepared transaction protection from VTTablet Restart
  • Add prepared transaction protection from MySQL Restart
  • Add prepared transaction protection from PRS & ERS
  • Add prepared transaction protection from Online DDL
  • Add prepared transaction protection from Move Tables
  • Add prepared transaction protection from Resharding - Making Reshard work smoothly with Atomic Transactions #16844
  • Add prepared transaction protection from modifying query rules
  • In flight Distributed transaction visibility and actions through VTAdmin
  • Record and store connection settings to the transaction recovery log
  • Record and store savepoints to the transaction recovery log
  • Reject distributed commit on a network connection
  • Tracking DMLs on shard transactions and using them for improved commit phase.
  • Emitting metrics and documenting the use case for them
  • Documentation (User Guide, Troubleshooting)
  • Troubleshooting tools
  • Add support for consistent lookup vindex with distributed transaction
  • [ ] Implement new DTID generator logic

Testing Strategy

This is the most important piece to ensure all cases are covered, and APIs are tested thoroughly to ensure correctness and determine scalability.

Test Plan

Basic Tests

Commit or rollback of transactions, and handling prepare failures leading to transaction rollbacks.

Test Case Expectation
Distributed Transaction - Commit Transaction committed, transaction record cleared, and metrics updated
Distributed Transaction - Rollback Transaction rollbacked, transaction record cleared, and metrics updated
Distributed Transaction - Commit (Prepare to Fail on MM) Transaction rollbacked, metrics updated
Distributed Transaction - Commit (Prepare to Fail on RM) Transaction rollbacked, transaction record updated, metrics updated

Resilient Tests

Handling failures of components like VTGate, VTTablet, or MySQL during the commit or recovery steps.

Test Case Expectation
Distributed Transaction - Store Commit Decision fails on MM Transaction recovered based on transaction state.
Distributed Transaction - Prepared Commit fail on RM Transaction recovered and committed
Distributed Transaction - Delete Transaction Record fail Recovery and transaction record removed
Distributed Transaction - VTGate restart on commit received Transaction rolled back on timeout
Distributed Transaction - VTGate restart after transaction record created on MM Recovery and transaction rolled back
Distributed Transaction - VTGate restart after transaction prepared on a subset of RMs Recovery and transaction rolled back
Distributed Transaction - VTGate restart after transaction prepared on all RMs Recovery and transaction rolled back
Distributed Transaction - VTGate restart after storing the commit decision on MM Recovery and transaction committed
Distributed Transaction - VTGate restart after transaction prepared commit on a subset of RMs Recovery and transaction committed
Distributed Transaction - VTGate restart after transaction prepared commit on all RMs Recovery and transaction record removed

The failure on MM and RM includes the VTTablet and MySQL interuption cases.

System Tests

Tests Involving multiple moving parts such as distributed transactions with Reparenting (PRS & ERS), Resharding, OnlineDDL, and MoveTables.

Stress Tests

Tests will run conflicting transactions (single and multi-shard), and validate on error metrics related to distributed transaction failure.

Reliability tests

A continuous stream of transactions (single and distributed) will be executed, with all successful commits recorded along with the expected rows. The binary log events will be streamed continuously and validated against the ordering of the change stream and the successful transactions.

This test should run over an extended period, potentially lasting a few days or a week, and must endure various scenarios including:

  • Failure of different components (e.g., VTGate, VTTablets, MySQL)
  • Reparenting (PRS & ERS)
  • Resharding
  • Online DDL operations

Deployment Plan

The existing implementation has remained experimental therefore no compatibility guarantees will be maintained with the new design changes.

Monitoring

The existing monitoring support will continue as per the old design.

Future Enhancements

1. Read Isolation Guarantee

The current system lacks isolation guarantees, placing the burden on the application to manage it. Implementing read isolation will enable true cross-shard ACID transactions.

2. Distributed Deadlock Avoidance

The current system can encounter cross-shard deadlocks, which are only resolved when one of the transactions times out and is rolled back. Implementing distributed deadlock avoidance will address this issue more efficiently.

@harshit-gangal harshit-gangal added the Type: RFC Request For Comment label Jun 21, 2024
@deepthi
Copy link
Member

deepthi commented Jun 24, 2024

This is very well written. Everything I could think of on a first read has already been addressed.
One minor suggestion is to change this wording to be clearer (it has been used in 2 places):
Distributed Transaction - VTGate restart after transaction prepared on partial RMs -> Distributed Transaction - VTGate restart after transaction prepared on a subset of RMs

@GuptaManan100
Copy link
Member

Just some thoughts and questions -

With respect to Distributed Transaction Identifier (DTID) Generation. In proposal 3 why do we need a nano ID? If we are going to get the DTID from the MM, then can't it just use auto-incrementing numbers after the keyspace shard? Since the transaction will be written to the MM's mysql table for the creation to be successful. We can initialize the initial value with 1 more than the max available in the said table. This will prevent collisions even across restarts, reparents, or any other issues.

For Transaction Resolution Design, In proposal 2, we can get VTOrc to just run the Resolution code too. It is only going to entail calling RPCs against vttablet. If we create a common function in a package, we can get VTOrc to do it too. For example, we don't get VTOrc to tell vtctld to run PRS/ERS, it just runs it. The same concept here can be used.

For Prepared Transactions Connection Stability. I'm just summarizing what we already discussed before. If we decide to go ahead with Proposal 1, then we need to make sure that the Unix socket connections can't be dropped by MySQL. If for whatever reason MySQL drops a connection that is running a distributed transaction, any other write that was blocked on the same rows might go through, and then the distributed transaction's state would be unrecoverable.
There is a proper fix for read and write exclusion but it entails doing locking at a level higher than MySQL connection using a shadow table as implemented by dropbox. That has a significant performance penalty however.
Edit: ☝️ is essentially the same as proposed in the next section's first proposal too. We would just overload the shadow table for also read/write exclusion.

@systay
Copy link
Collaborator

systay commented Jun 28, 2024

Could you explain case 4? Why do we rollback the transactions on RM1 and RM2? There was no transaction preparation done before this, right?

@harshit-gangal
Copy link
Member Author

Could you explain case 4? Why do we rollback the transactions on RM1 and RM2? There was no transaction preparation done before this, right?

We still have open transactions so we should roll them back otherwise they will remain holding locks till transaction timeout is not achieved.

@GuptaManan100
Copy link
Member

Me and @harshit-gangal were looking at what we would need to do to integrate atomic transactions into SwitchTraffic and OnlineDDL workflows.

We found that both SwitchTraffic and OnlineDDL use the Query Rules stored in the QueryEngine field queryRuleSources.
SwitchTraffic updates this by changing the DeniedTables in ShardInfo and then asking the vttablets to refresh their state. OnlineDDL engine has the advantage that is also lives on the vttablet and doesn't require a RPC, it can make the changes by calling a local function.

Changes to these query rules govern how vttablet would run incoming queries. There is one gotcha here though, currently query rules don't impact the queries that have already run and are part of a transaction. So, when these transactions are committed, there is a possibility of going against the query rules even after they've been updated.
SwitchTraffic tries to address this race by trying to run LOCK TABLES twice giving the transactions some more time to finish running.

This is a problem for atomic transactions because once we have prepared a transaction and the transaction manager decides to commit it, then we cannot refuse. So, for example, if a transaction is in a prepared state, and then online ddl changes the rules and proceeds with the cutover, the transaction might not be able to commit because of a structural change to the underlying table.

Because both the operations are relying on query rules to communicate with the query server on what kind of queries to allow and deny, we think it's a good idea to use them for the transactions going into the prepared state too. Here is a timing diagram with the order of operations that we think will ensure that no atomic transaction interferes with the workflows -

sequenceDiagram
    participant SwitchTraffic
    participant vttablet
    participant onlineDDL

    Note over SwitchTraffic: Update Denied<br/>Tables in Vschema
    Note over onlineDDL: Register new Query Rules
    SwitchTraffic-)vttablet: RefreshState

    Note over vttablet: Refresh ShardInfo<br/>From Vschema
    Note over vttablet: Update Query Rules
    Note over vttablet: All new transactions while<br/>being Prepared will consult<br/>new query rules
    vttablet-)SwitchTraffic: Success

    SwitchTraffic-)vttablet: WaitForPrepared
    onlineDDL-)vttablet: WaitForPrepared
    Note over vttablet: Wait for all current set<br/>of prepared transactions<br/>to succeed.
    Note over vttablet: New transactions are<br/>guaranteed to be using<br/>the new query rules.
    vttablet-)SwitchTraffic: Success
    vttablet-)onlineDDL: Success


    Note over SwitchTraffic: Proceed
    Note over onlineDDL: Proceed
Loading

@GuptaManan100
Copy link
Member

GuptaManan100 commented Jul 31, 2024

Atomic Transactions with PRS and MySQL Restarts

Current state and the problems

I have been looking at the code that tries to redo the prepared transactions in case of a MySQL restart or PRS, etc, and have found a few shortcomings.
To start with, the order of operations that we follow when we promote a new primary has a race condition.

  1. We start by calling ChangeTabletType.
  2. This call sets MySQL to read-write.
  3. When we transition to the primary state, we open the query engine first as part of sm.connect in servePrimary.
  4. After this step we open the transaction engine, and it starts the prepareFromRedo function in a go routine.

Because we turn off MySQL read-only and open the query engine first, we can potentially have a write that goes through before the transaction engine has had a chance to try and redo the prepared transactions. This can cause the prepare to fail since the write might be incompatible with the ones that the prepared transaction is running.

This problem is also present when MySQL restarts.

  1. When MySQL restarts, all current connections return a failure and it triggers the codepath in checkMySQL.
  2. We enter the non-connected state that closes all engines including transaction engine.
  3. This code too then sets the wanted state to serving and when we transition, we end up calling servePrimary which has the same race described above ☝️.
  4. It is also worth noting that when MySQL restarts, it starts with super-read-only turned on, and its VTOrc that fixes this by calling UndoDemotePrimary.

Solutions that were tried but didn't work

Me and @harshit-gangal have talked about this issue and I've thought off and tested a few different solutions. These are some of the solutions I tried but didn't work (If this is not interesting to read, then you can skip this segment and go to the proposed solution) -

  1. One of the things that we thought we can do is that before we turn off read only in MySQL globally, we try to redo the prepared transactions and while doing so we turn off read only in that session. This would have resolved the race, because this would have ensured that no writes could have gone through until the prepared transactions were restored. This however had to be scrapped because read_only is not a session setting.
mysql [localhost:8033] {msandbox} (performance_schema) > set session read_only = 'false';
ERROR 1229 (HY000): Variable 'read_only' is a GLOBAL variable and should be set with SET GLOBAL
  1. MySQL allows running FLUSH TABLES WITH READ LOCK even in the super_read_only state and this takes a lock on all the tables. So like the previous solution before we turn super_read_only off, we can take run the mentioned command to lock all the tables, and then turn off read_only and proceed with the prepared transaction and then UNLOCK TABLES. This however doesn't work because FLUSH TABLES doesn't let you acquire a write lock, so even if we lock the tables, we can't write to them...
mysql [localhost:8033] {msandbox} (test) > INSERT INTO t1 values (234);
ERROR 1223 (HY000): Can't execute the query because you have a conflicting read lock
  1. At this point, I tried using a LOCK TABLES ... WRITE command instead of the flush tables command above, and I had high hopes that this would indeed work for us because lock tables acquires a write lock. But unfortunately this doesn't work either because as per MySQL docs -
If a session begins a transaction (for example, with START TRANSACTION), an implicit UNLOCK TABLES is performed, which causes existing locks to be released.

So even if we acquire the lock, we can't start the transaction to restore the state. Also we can't acquire the table lock after we start the transaction because again as per docs -

LOCK TABLES is not transaction-safe and implicitly commits any active transaction before attempting to lock the tables.

Proposed solutions that should work

  1. We will store the last time the transaction engine restored the prepared state. When a new connection is being created to MySQL, we can check the MySQL uptime (which will tell us when MySQL restarted) against the stored time of last restore to infer whether it is safe to allow this connection to run writes or should we delay it.
    This solution works because we are indirectly preventing any writes to MySQL until the transaction engine has been able to redo the prepared transactions by preventing any new connections from being created. There will be some performance penalty because we will have to do this check everytime we create a new connection, but it shouldn't be too bad because of connection pooling.

  2. Before we turn off read_only in MySQL, we do the following -
    a. Turn off super_read_only but leave read_only on. This will continue to prevent any writes from the users from going through.
    b. Use the dbaPool that has admin privileges to restore the prepared transactions.
    c. Turn read_only off and start all the other engines.
    This solution relies on no writes going through from the dbaPool. It is only meant to be used for replication-related queries and to kill queries. This is an invariant that we'll need to verify and in case there are DMLs using this pool, we can move them to the appPool instead.

Of the 2 proposed solutions, we like the second one more. It is less hacky and it feels like we are less likely to run into any unexpected issues with this fix.

We still need to discuss what should we do in case the redoing of transactions fails. It shouldn't happen with proposed fixes ☝️, but if it does, then what should we do? Should we panic the vttablet to prevent any writes? Or should we just setup metrics for this so that the user is alerted but allow the vttablets and other writes to function?

EDIT: We decided to go ahead with solution 2. The expectation is that the default mode that Vitess uses on replicas (super_read_only) is not being overridden. Atomic transactions are reliant on the guarantees provided by super_read_only and assume that all writes are being done using an application user that does not have admin privileges. Also, we should check that the durability policy is semi-sync when atomic transactions are being used.

In case of a failure, we want to allow vttablets to continue to take writes. We will add metrics around this so that users can get notified of these failures. We want to build vtctld commands to be able to see and resolve atomic transactions. These should then be used to implement the same functionality in vtadmin so that users can resolve transactions using the UI as well.

Task List

@GuptaManan100
Copy link
Member

GuptaManan100 commented Aug 1, 2024

Atomic Transactions with OnlineDDL

What Online DDL does right now

I have looked at the Online DDL code and it is only the cutover step that needs to coordinate with atomic transactions. As it stands now, Online DDL does the following steps during a cutover (More details available at https://vitess.io/blog/2022-04-06-online-ddl-vitess-cut-over/) -

  1. It waits until vreplication has finished the copy phase and the lag between the table and the shadow table is not that high.
  2. The first thing we do is that we start buffering of queries for the given table in question. This is done by using query rules that exist in query engine. These are described in more detail in RFC: Atomic Distributed Transactions #16245 (comment). This affects all the new queries that are coming in, but if there are open transactions which have already executed some DMLs against the table, then they can still commit. Also it can happen that the check for query rules for a query happened right before they were updated, so it might still run.
  3. Online DDL then wait for 100 milliseconds to allow for queries that would have passed the query rules check before they were updated to complete. This step is not crucial for correctness, it just prevents some user errors. It can still happen though that a user query doesn't run in this time frame and runs when the RENAME has already started. In which case it will momentarily block and on unblocking it would find that the table no longer exists.
  4. If the force cutover has been specified, then online DDL goes ahead and actively kills all the queries and transactions that are holding any locks against the table. This can have false positives too (code comment says so). To find the connections that are holding the locks, we are using information_schema.processlist, performance_schema.data_locks and information_schema.innodb_trx tables.
  5. Online DDL now acquires write locks on both the original and the sentry table.
  6. It then proceeds to start a go routine to perform the renames of the tables which gets blocked because the other connection is holding the locks.
  7. After that we get the MySQL position after the lock and wait for vreplication to finish processing all the updates till that point and then we stop vreplication.
  8. Then we unlock the tables and wait for the rename to complete. (original table to a sentry table, shadow table to original, and then sentry to shadow table, effectively swapping original and shadow table)
  9. At this point online ddl cutover is complete, we asynchronously reload the schema and reenable the writes that were previously buffered.

From the safety of online-ddl standpoint is concerned, the buffering and then acquiring locks on writes is sufficient to guarantee that we don't lose any writes that happen on the original table when we swap.
However, the force cutover step that kills the transactions holding locks is going to cause issues for atomic transactions. For an atomic transaction that has entered the prepared state, we cannot under any circumstances kill it, because if the transaction manager decides to commit it, then we have to abide by that decision.

Proposed changes

Basically after we start the buffering and before we kill the transactions, we'll just need to wait for the prepared pool to have finished all the transactions present at that point in time. And we need to do something special for transactions trying to enter the prepared state. We have two alternates described below. Then we should be safe to kill all the remaining transactions (atomic or not).

I'm hoping this will work quite nicely in terms of timing, because prepared transactions should really be committed/rollbacked pretty quickly. Transactions are in the prepared state only after the user has already run the commit command on vtgate, and we have completed the first part of 2pc. So if things are not broken (mostly they won't be), then it should be a matter of less than a second for it to receive the result of the decision (commit/rollback). So, unless something catastrophic happens that leave a transaction in the prepared state and we need the resolution to happen by a transaction watcher, the transactions in prepared state would be very short lived. Waiting is different for a transaction that the user has currently "open" (in the sense they haven't committed it), so the user can run more DMLs. We don't need to wait for them. They can be killed. Atomic transaction guarantee is based on the premise that prepared transactions can't fail, and we only enter the prepared state after the transaction has reached a conclusion from the user perspective (they have issued commit).

Another thing that we need to consider apart from waiting for all prepared transactions to go through, is whether we allow other transactions to enter the prepared state -

  1. The safest and simplest to implement solution would be to not allow any transaction to enter the prepared state while the online-ddl cutover is ongoing. Much like how we buffer the queries for the table, we can buffer all the requests for entering the prepared state. When we try to enter the prepared state, we just need to then check if the connection was killed or not. If it was then we fail the prepare. The disadvantage of this approach is that we will stall atomic transactions that are not affecting this table too. Even though the cutover is a pretty fast process, the delay will be visible in the transaction commit latencies.
  2. The other option that we have is that we check which tables a transaction is accessing when it enters the prepared state, and we check the query rules again. If we have a query rule that asks to buffer or reject queries for a table that is being used, then we fail the prepare request. This approach allows us to continue preparing transactions that don't affect the table that is part of the online DDL. We will have to ensure that the killing logic for transactions doesn't accidentally end up killing any of the new prepared transactions.

Other Online DDL variants (ghost and pt-osc)

The above discussion was all limited to online DDL being run using Vreplication. This is the only mode in which we have any control over the cutover phase. Both ghost and pt-osc are running as separate binaries, so it is going to be very hard to collaborate with either of them. We need to ensure that neither of them is killing transactions during their cutover phase. If they are, then it is going to be impossible to guarantee atomic transactions with them. Even if they aren't killing transactions, it might be very hard for them to find a window for running the cutover if the system is under enough write load because they won't be able to acquire the locks on the table.

UPDATE: ghost has been deprecated and pt-osc has always been experimental. #15692 We can just state that their limitations with atomic transactions and move on.

UPDATE: We decided to go ahead with the second solution. We will need to make sure we handle all races correctly because it could happen that the check for query rules in prepare state happens before we update the rules, but it takes some time for the transaction to show up in the prepared pool, so we could potentially have a prepared transaction not in the list of transactions that we wait for, but still using the table in question. To handle this, we need to be defensive in what we kill in the online DDL segment. We need to consult the prepared transactions pool to make sure we don't kill something that is prepared.

Task List

  • Implement solution 2 - Atomic Transactions handling with Online DDL #16585
  • Online DDL Kill logic should be defensive wrt to what it kills. Check prepared pool to make sure we don't kill prepared transactions.
  • Check and verify races are handled. - Atomic Transactions handling with Online DDL #16585
  • Verify the additional load on vttablets because of atomic transactions with online DDL doesn't cause it to OOM (running atomic transactions will slow down online ddl so it could cause more queries to be buffered, and thus an OOM)

@GuptaManan100
Copy link
Member

GuptaManan100 commented Aug 7, 2024

Atomic Transactions with MoveTables and Reshard

Order of operations for MoveTables and Resharding

The only operation that we are concerned about wrt to atomic transactions is the switch write. The following is the order of operations for Switch Write -

  1. Read the workflow state that entails reading the shard and tablet records to find the shards that have already switched writes.
  2. Run validations that the list of shards between the source and destination matches.
  3. Lock the keyspace for both the source and target.
  4. We check the metadata for sequence tables and then whether we already have the journal entry. If we do, then a previous switch write would have already run a few of the following steps, so we skip them.
  5. We read information about existing streams running on the shards, and stop them.
  6. Next we stop writes on the source and this is the step that concerns atomic transactions. It is implemented slightly differently for both Resharding and MoveTables -
    a. For MoveTables, we want to disallow writes to only the tables involved. We use DeniedTables in ShardInfo to accomplish this. Both vtgate and vttablet use the DeniedTables. In vtgate, it is used in the keyspaceState to generate keyspace events to know when to stop buffering. On vttablets, it is used to set the queryRules as described previously in RFC: Atomic Distributed Transactions #16245 (comment). These query rules are used to reject any further DML against the tables and the returned error starts buffering on vtgate. After we update the topo server with the new DeniedTables, we make all the vttablets refresh their topo to ensure that they've registered the change. We also call RebuildSrvVschema which causes vtgates watcher on the srv vchema to cause it to recheck the shard states. The updation of denied tables on vtgate is however not synchronous like vttablets.
    b. For Resharding, we want to disallow all writes for the given shard range, so we set DisableQueryService in the srvVschema. This field too is something that vttablets monitor. And like before, after we set the disable query service field, we get vttablets to refresh the information from topo. The field is used to set tabletControls in the tablet state which is eventually used to transition the tablet into a non-serving state, effectively blocking any queries. In StartRequest , which encapsulates all DMLs and transactional calls etc, checks for this serving state before allowing any query.
  7. Once we have prevented new writes, we get the MySQL positions of the primary tablets.
  8. Next, for MoveTables, we try Locking the tables to ensure no existing write is pending. We do this twice to allow for races between the denied tables being set and some writes still pending. (This part I was a little confused about. Shouldn't we be getting the primary position after we have executed the lock on the tables?)
  9. We wait for vreplication to catch up to the position we stored previously.
  10. Next we migrate all the streams (information for which we stored previously). Similarly we handle sequences and then start reverse vreplication.
  11. We create the journals, and allow writes on the target. This is the exact opposite of what we do for stopping writes. i.e. we remove the denied tables (for MoveTables) and DisableQueryService (for Resharding)
  12. Finally, we update the routing rules to allow vtgates to direct traffic to the target.

Proposed Changes

We propose a similar fix to the one proposed for onlineDDL in #16245 (comment).

On the vttablets we can always tell if a reshard or moveTables is in progress because of the fields that they set (DisableQueryService, and DeniedTables respectively). So, after we have updated the said field, we'll just need to wait for the prepared pool to have finished all the transactions present at that point in time. And like before, we will have to prevent new prepared transactions from being prepared by consulting these fields -

  1. For MoveTables, the change we make for online DDL will work too, because it updates the query rules.
  2. For Reshard, we will have to check the tabletControls field to check if resharding is in progress.

The same race conditions that we need to handle for online DDL stand here too. One thing that is different though, is that we'll have to introduce a new RPC for getting vtctld to wait for the prepared transactions to finish on the vttablet side. In the case of OnlineDDL the executor was also running on the vttablet so no RPC was required. But in this case we will need to implement something like WaitForPreparedTransactionsToRoll.

Another important consideration is that the way that the code is written currently, for CommitPrepared (function we call to commit a prepared transaction), also goes through the StartRequest flow. If we don't fix this, we would end up in a situation, wherein Resharding is waiting for prepared transactions to roll (as stated above), but those transactions won't be able to be committed because startRequest will continue to block the CommitPrepared call. We need to allow running CommitPrepared even in the case where the tablet is not serving and then check for correctness in that call later.

Update: We will go ahead with the proposed fix. We discussed a few other considerations that we might run into with Resharding and CommitPrepared not being allowed in the non-serving state. Me and Harshit will talk about this more after looking at the code.
Another important thing to note is that atomic transactions won't work for shard-by-shard migration or tenant-based migration. If the unmanaged tablet being used for them is connected to the MySQL instance with a TCP connection, then it will reject any call to prepare the transaction.

Update: We went with an alternate way of making atomic transactions work with Resharding. Instead of doing the changes proposed ☝️, we instead went ahead and changed RefreshState RPC that Reshard uses. In that call, when the tablet sees that it should disable query service, it first turns off the prepared pool from accepting any new prepared transactions and waits for the existing prepared transactions to conclude. This ensures that Reshard only proceeds when all the prepared transactions have been resolved and there are no prepared transactions that are pending.

Task List

@GuptaManan100
Copy link
Member

Here are the state transitions for an atomic transaction on a single resource manager as me and @harshit-gangal discussed -

---
title: State Changes for each participant in an atomic transaction
---
stateDiagram-v2
    classDef action font-style:italic,font-weight:bold,fill:white
    state "Redo Prepared Logs" as rpl
    state "Delete Redo Logs" as drl
    state "Check Type of Failure" as ctf

    state "Prepared" as p
    state "Failure" as f
    state commitRPC <<choice>>
    state failureType <<choice>>
    state redoSuccess <<choice>>

    [*] --> p: Commit redo logs<br>Prepare Call Succeeds
    drl --> [*]
    p --> drl:::action: Receive Rollback<br>Rollback transaction
    p --> commitRPC: Receive CommitRPC
    commitRPC --> drl: If commit succeeds
    ctf --> failureType
    commitRPC --> ctf:::action: If commit fails
    failureType --> f: Unretriable failure<br>Wait for user intervention
    f --> drl: User calls ConcludeTransaction
    failureType --> rpl:::action: Failure because of MySQL restart
    rpl --> redoSuccess: Check if redo prepare fails
    redoSuccess --> p: Redo prepare succeeds
    redoSuccess --> ctf: Redo prepare fails
Loading

@GuptaManan100
Copy link
Member

  • As an extra TODO item, we need to also wait for atomic transactions to be rolled when a user externally changes the query rules.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Type: RFC Request For Comment
Projects
None yet
Development

Successfully merging a pull request may close this issue.

4 participants