Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RFC: VReplication based SplitClone #4604

Closed
sougou opened this issue Feb 9, 2019 · 11 comments
Closed

RFC: VReplication based SplitClone #4604

sougou opened this issue Feb 9, 2019 · 11 comments
Milestone

Comments

@sougou
Copy link
Contributor

sougou commented Feb 9, 2019

The new VReplication feature introduces many new flexibilities. One of the goals of this design is to allow for “resharding from anything to anything, from any key to any key”. This means that we need the ability to backfill/initialize the target data just like SplitClone did. However, the SplitClone commands were built for predefined and more rigid workflows. Additionally, these workflows had to accommodate statement based replication, which was functionally limited.

There are also other limitations:

  • Inability to support text-based primary keys.
  • If interrupted, it has to start from the beginning
  • There is always a worry that the final sync phase may take too long if too many events have occurred from the time the original copy has started.
  • Efficiency: It’s an external process that needs to pull data from source and target to perform comparisons, and then push data into target. This is potentially 3x the size of the database being transferred around across multiple hops.

We’d like to propose a new and simpler approach to performing the backfill functionality instead. This is based on the fact that VReplication is based on RBR and can do smarter things with the data.

High level design

The split clone functionality will become part of the existing VReplication functionality itself. It will just be a different state.

In the old way, a user would run SplitClone. At the end of its work, SplitClone will setup VReplication.

In the new way, a VReplication record (or many) will be created right in the beginning. There will be two differences:

  1. It will have no starting position, which will get set when the Copying phase begins.
  2. State will be “Copying”.

There will be two sub-states while in this mode:

  1. Copy state: In this state the target tablet will request a dump of all the rows through a streaming select statement, and will bulk insert them as they come. For large tables, this will be performed in chunks. The chunk size is TBD. Most likely, it will be time-based.
  2. Catch-up state: when a copy stops, VReplication will be started, and will perform a catch-up. However, it will only update the rows that have been copied so far. Once it’s caught up, we switch back to the Copy state.

Once all the copy is done. VReplication enters the traditional ‘Running’ state.

The biggest advantage of this approach is operational simplicity. Additionally, it’s more in line with the query-based model of VReplication. The query applied by VReplication is the same one used for performing the copy. And there is no row-by-row comparison any more; Once a row has been copied, it’s kept up-to-date by VReplication.

There are a few corner cases to be dealt with. Those details are covered below.

Details

The detailed steps of the Copying phase are as follows:

  • VReplicate into existing subset of tables/rows until lag is negligible. This is the “catch-up” state. The next few steps are the transition into the “Copy” state.
  • Stop the source tablet’s MySQL Replication.
  • Wait for VReplication catch-up and stop, without changing state in the vreplication table. The sub-states are in-process only.
  • Initiate copy of next set of rows from current table (or next table), but don’t process.
  • Verify source’s MySQL replication position hasn’t changed, and start the Replication.
  • Process streaming rows from the copy request. Due to repeatable read guarantee, all rows will be as of the time when replication was stopped.
  • Stop stream at “stop condition”.
  • Repeat.

New _vt.copy_state table

This table will keep track of the copy progress for each table. It will have a foreign key into vreplication.id, and there will be one entry per table to be copied. The state will be: “copied”, “in-progress”, or “not-started”. If the state is “in-progress”, the value (or set of values) of the last primary key is saved in the position column.

The copy is expected to be in primary key order of the target table.

Choosing the source

This new scheme does not cause replication to stop for substantial periods of time. This means that it’s not strictly necessary to perform these operations from an rdonly replica. Also, there is no need to watch out for min healthy replicas.

This simplifies the config parameters. The existing set of vreplication sources can be equally used for the copy phase also. The only exception is that masters will automatically be excluded because we don’t want to stop them from accepting write traffic.

VReplicating into existing subset

It’s important to note that rows greater than last PK may already be present in the target. This can happen if VReplication can be performing a merge, in which cases there will be multiple streams copying different subsets of the table simulatneously.

Additionally, the VReplication may also be performing rollups in stead of copies. In such cases, deletes may actually be rewritten as updates. The operations below must work correctly under all these circumstances.

For tables in the “copied” state, vreplication behaves just like in the “Running” state.
For tables in the “not-started” state, vreplication ignores all events.
For tables in the “in-progress” state, special processing is required as follows:

insert

For insert, we have to make sure that we only insert if the primary key value is less than or equal to the current position. Vitess cannot reliably perform this comparison, especially for text columns. In order to achieve this, we generate an insert as follows:

insert into t(pk1, pk2, col) select 'pk1', 'pk2', ‘col’ where 'pk1' <= 'pospk1' and 'pk2' <= 'pospk2'

This becomes a conditional insert that succeeds only if the primary key condition is satisfied.

delete

The delete will need to add the additional where clause 'pk1' <= 'pospk1' and 'pk2' <= 'pospk2'. This is important for situations where VReplication is performing aggregations. We only want to apply changes to rows that are within the range of rows that have been copied so far.

update

Update has two cases:
If the primary key value has not changed, then it’s the original update statement, but with the additional where clause 'pk1' <= 'pospk1' and 'pk2' <= 'pospk2'.
If the primary key value has changed, then it’s delete of the old row and insert of the new row as per the insert rules. Doing this if the pk value hasn’t changed will also work, but it may not be as efficient as a direct update.

Stop the source tablet’s MySQL Replication.

This will be a new request made by the VReplication tablet. There may already be a StopSlave API for tablet manager. This new request has the following differences:
The command will return the current stopped position

  • It will also start a timer. If the replication is not restarted (within X time), it will be automatically restarted.
  • If the VReplication tablet crashes after the stop command, the replica will eventually resume replication due to the timer.

Copy phase

The copy command is a streaming request where the query is as follows: select...from t where 'pk1 >= 'pospk1' and 'pk2' > 'pospk2'.

We’ll configure the copy phase to stop after certain time, say one hour. We can later extend this flexibility to stop after N rows or X bytes.

The copy phase will perform bulk inserts. Based on research done so far, a single-threaded bulk insert is one of the fastest ways to populate data into MySQL. There doesn’t seem to be an advantage to performing multi-threaded inserts.

It’s still not known if we can get higher throughput if we parallelized inserts for more than one table. However, if a single table itself goes fast enough, there may be no need to do this parallelization. We also need to be mindful that the MySQL replicas need to keep up. We can revisit this after observing a few real-life scenarios.

In the case of aggregations, the bulk insert statement will be different:

insert into t(pk1, pk2, sumcol)
  select '1', '2', '3'
  union all select '4', '5', '6'
on duplicate key update
  sumcol = sumcol+values(sumcol)

It remains to be seen if this composite statement will perform better than individual inserts.

Conclusion

Beyond the fact that this design is much simpler than SplitClone, we believe that this will be more efficient and reliable. Reiterating the advantages:

  • Support for text-based primary keys: Because we delegate comparisons to mysql, we can support any column type.
  • Continue where we left off: If a copy is interrupted, we continue from where we left off instead of starting from the beginning.
  • No comparisons: Once a row is copied, we keep it up-to-date. So, there’s no need to compare. The comparison based update has often led to obscure dup key errors in the SpliClone phase.
  • Lower network load: We directly pull the data into the vttablet that needs to perform the insert. This eliminates all the bandwidth used by SplitClone to pull the rows for comparison.
  • No special provisioning of rdonly: We stop replication only until we issue the select statements. This means that we don’t need to provision additional rdonly replicas for this.
  • No fear of the final sync phase taking too long: If splitting multiple terabytes of data, there is a fear that the final sync phase may take too long because of how long it took to copy the original data. Too much might have changed since the copy started, and if binlogs start to expire before this phase completes, we won’t be able to start VReplication. In the new scheme, because we perform catch up after every copy phase, we are confident that we’ll never get into such a state.

There is one disadvantage: Each target performs its own scan of the source. However, this trade-off seems worth it because of all the above gains.

For now, the SplitDiff is expected to remain the way it is. We still need to think about how it will work for aggregations.

@dweitzman
Copy link
Member

Slick design. +1 to splits that don't depend consistent reads of entire shards or running a separate vtworker process!

@sougou
Copy link
Contributor Author

sougou commented Feb 17, 2019

As I've been drilling down, I encountered a few issues with the copy phase that require some modifications to the original design.

As an example, let's say the source table has pk1,pk2 as primary keys, and let's say the materialization expression is: select a, hour(b) hb, sum(c) sc from t group by a, hb.
I was initially planning to issue a query like this to fetch rows for copy: select a, hour(b) hb, sum(c) sc from t group by a, hb order by a, hb where a >= :pos_a and hb > :pos_hb

Issues:

  1. The materialization expression does not scan the source table in pk order. Although in the above case there will be no duplicates, there other are scenarios where duplicates are possible. This means that there is no reliable stopping point for results of such queries.
  2. The materialization query may take too long to execute. It will be inefficient to re-issue them if interrupted.
  3. We currently push-down the computation work of functions like hour etc. to the source tablet. This won't work well if the select expression also performs aggregations. This will require the vttablet to fetch the raw rows, apply the function, aggregate and then send the final row.

Options:

  1. Ignore the problem: having dups in the result probably means that the data wasn't modeled correctly. So, why support bad behavior? This will take care of the first problem, but not the second one.
  2. The source tablet can try to ensure that it applies changes only after receiving all rows with identical PK (of target). This is pretty much a no-go because we can't reliably compare pk columns for equality. Also, what if an entire chunk has all identical PK values?
  3. We scan the source table in primary key order and apply the aggregation on the target side as the results come. This guarantees that we'll get a reliable stopping point if interrupted. This addresses the first two problems and most of the third one (because source only has to compute the expressions and not aggregate). The downside of this option is that it's more write heavy (only for aggregations). However, it's more stable overall.

Drilling down on option 3, there are a few ways to do this. The most efficient and simplest option is to have the source tell us the last pk of the chunk it sent, which will be stored and sent back as part of the next request.

To completely solve problem number 3, It's better that the source just send only its column values, and we should have the target perform all computational work. This will essentially allow us to push this work down into the mysql itself. This approach will additionally allow us to support any function that mysql supports.

In other words, the request we'll send to the source will become:

query: select a, b, c from t
position: last_pk1, last_pk2

On the target, the apply statement will look like this:

insert into t(a, hb, sc) values
  select '1', from_unixtime(1234, '%Y%m%d%H'), 3
on duplicate key update
  sc = sc+values(sc)

There will also be a subtle change on the vplayer that applies the statements: The additional where condition will reference the primary key values of the source table.

@sougou sougou changed the title CFP: VReplication based SplitClone RFP: VReplication based SplitClone Feb 18, 2019
@mpawliszyn
Copy link
Collaborator

I think you will need to do the gtid/vreplication dance in order to get a table ready for the next chunk. The reason being unique indexes on that table. If we allow foreign keys then we might need the same gtid db-wide.

@sougou
Copy link
Contributor Author

sougou commented Mar 1, 2019

Yeah. The switch between copy and vreplication is based on gtid stopping position. Is that what you meant?

As for foreign keys, the best approach (as recommended by @dweitzman ) is to disable the checks until we finish the copying.

@mpawliszyn
Copy link
Collaborator

Even when doing the next chunk I think you will want to stop replication and sync up the previous chunk before doing the copy. Disabling checks did not work well at all when we tried that for secondary unique indexes.

I really like how even under extreme load each chunk ratchets you towards a split. It's like chunked offline clone.

Andres walked us through it this morning. The only other thing I am thinking about is that this chunked method should have a compare mode too. In case the copy failed, you don't want to throw all your work away and have to start from an empty table again.

@sougou
Copy link
Contributor Author

sougou commented Mar 1, 2019

Right. The intent is do a catchup before doing the next chunk.

I believe the trouble you were having with secondary and foreign keys was because of the dirty copy that split clone was doing.

The new scheme does not do a dirty copy, and every copied row remains consistent with the rest of the rows copied so far. I haven't been able to think of a situation where consistency can be broken.

As for the last issue, the copy is made by pulling the rows as a streaming query and updates are performed in chunks (likely 32K each). If something fails, the restart will first perform a catch-up, and then resume copy. There's not much difference between a voluntary or involuntary stopping of a copy.

@dweitzman
Copy link
Member

The vreplication player could potentially do set foreign_key_checks = 0 for just the sessions that are executing vreplication while the rest of the db sessions have enforcement still enabled.

Same for CopySchemaShard and ApplySchema. I just sent out #4696 for ApplySchema, since we already have the code sitting around. I haven't bothered to implement it yet for CopySchemaShard because we have to turn off foreign key checks globally anyway to satisfy SplitClone, which uses QueryService so I can't easily override the foreign key checks setting on a per-connection basis.

sougou added a commit to planetscale/vitess that referenced this issue Mar 16, 2019
This new way, as described in vitessio#4604, is more flexible and will
make it easer to support copy functionality.

Signed-off-by: Sugu Sougoumarane <[email protected]>
@sougou
Copy link
Contributor Author

sougou commented Apr 5, 2019

The initial cut for SplitClone and VerticalSplitClone have been committed to https://github.com/planetscale/vitess/tree/ss-vcopy. This branch has a few commits ahead of master. So, it will take some time to get this submitted. However, you're welcome to try it out and report any early issues or bugs.

Since this is an offline process, it should not affect prod traffic. So, it should be safe to try out on large prod data. The main input I'm looking for is: how fast does the copy finish.

For VerticalSplitClone, the command looks like: ./lvtctl.sh VerticalSplitClone commerce customer 'customer,corder'
For SplitClone, the command looks like: ./lvtctl.sh SplitClone customer '0' '-80,80-'.

There is currently no validation of input parameters. This will be added in subsequent commits.

Once you've issued the commands, the copy will start immediately. You can monitor the copy by watching these two tables: vt.vreplication and vt.copy_state. The vreplication state will be Copying. Once the copy finishes, it will go into the Running state. After this, you should run SplitDiff to see if there are any discrepancies.

Note that the new SplitClone will work with text columns also.

The Migrate commands will also work as before.

@tirsen
Copy link
Collaborator

tirsen commented Apr 9, 2019

Will this support consistent snapshot based cloning? We're not in a position where we can stop MySQL replication.

@sougou
Copy link
Contributor Author

sougou commented Apr 9, 2019

Yes. I made improvements to the algorithm, but forgot to update here. The new scheme does work using snapshot:

On the source:

  • Lock the table.
  • Obtain the GTID.
  • Issue the select, obtain field metadata, and send it along with the GTID.
  • Unlock the table.
  • Stream rows.

On the target:

  • Stop replication.
  • Request a read, wait for field metadata and GTID.
  • Fast-forward replication till GTID and stop.
  • Apply streaming rows.

@sougou sougou changed the title RFP: VReplication based SplitClone RFC: VReplication based SplitClone May 4, 2019
@sougou
Copy link
Contributor Author

sougou commented Sep 15, 2020

This is now well beyond done, with new workflows, commands, etc.

@sougou sougou closed this as completed Sep 15, 2020
@askdba askdba added this to the v8.0 milestone Oct 6, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

5 participants