Skip to content

Commit

Permalink
cdc: fix old value config glitch when changefeeds with different sett…
Browse files Browse the repository at this point in the history
…ings connect to one region (tikv#9515) (tikv#9565)

cherry-pick tikv#9515 to release-4.0
You can switch your code base to this Pull Request by using [git-extras](https://github.com/tj/git-extras):
```bash
# In tikv repo:
git pr tikv#9565
```

After applying your modifications, you can push your changes to this PR via:
```bash
git push [email protected]:ti-srebot/tikv.git pr/9565:release-4.0-927e36f952f8
```

---

<!--
Thank you for contributing to TiKV!

If you haven't already, please read TiKV's [CONTRIBUTING](https://github.com/tikv/tikv/blob/master/CONTRIBUTING.md) document.

If you're unsure about anything, just ask; somebody should be along to answer within a day or two.

PR Title Format:
1. module [, module2, module3]: what's changed
2. *: what's changed

If you want to open the **Challenge Program** pull request, please use the following template:
https://raw.githubusercontent.com/tikv/.github/master/.github/PULL_REQUEST_TEMPLATE/challenge-program.md
You can use it with query parameters: https://github.com/tikv/tikv/compare/master...${your branch}?template=challenge-program.md
-->

### What problem does this PR solve?

Problem Summary:
When two TiCDC changefeeds connect to the same region, but one changefeed requires old value but the other doesn't, the one that doesn't need old value still receives old value.

### What is changed and how it works?

What's Changed: Now we record the old value setting in each `Downstream` and filter the old value appropriately before sinking the events.

### Related changes

- Need to cherry-pick to the release branch

### Check List <!--REMOVE the items that are not applicable-->

Tests <!-- At least one of them must be included. -->

- Unit test


### Release note <!-- bugfixes or new feature need a release note -->
- Fix old value config glitch when changefeeds with different settings connect to one region
  • Loading branch information
ti-srebot authored and gengliqi committed Feb 19, 2021
1 parent 80d0ae9 commit 1333fa0
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 16 deletions.
20 changes: 15 additions & 5 deletions components/cdc/src/delegate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ pub struct Downstream {
region_epoch: RegionEpoch,
sink: Option<BatchSender<CdcEvent>>,
state: Arc<AtomicCell<DownstreamState>>,
enable_old_value: bool,
}

impl Downstream {
Expand All @@ -92,6 +93,7 @@ impl Downstream {
region_epoch: RegionEpoch,
req_id: u64,
conn_id: ConnID,
enable_old_value: bool,
) -> Downstream {
Downstream {
id: DownstreamID::new(),
Expand All @@ -101,6 +103,7 @@ impl Downstream {
region_epoch,
sink: None,
state: Arc::new(AtomicCell::new(DownstreamState::default())),
enable_old_value,
}
}

Expand Down Expand Up @@ -362,13 +365,20 @@ impl Delegate {
self.region_id,
change_data_event,
);
for i in 0..downstreams.len() - 1 {
for i in 0..downstreams.len() {
if normal_only && downstreams[i].state.load() != DownstreamState::Normal {
continue;
}
downstreams[i].sink_event(change_data_event.clone());
let mut event = change_data_event.clone();
if !downstreams[i].enable_old_value && self.txn_extra_op == TxnExtraOp::ReadOldValue {
if let Some(Event_oneof_event::Entries(ref mut entries)) = event.event {
for entry in entries.mut_entries().iter_mut() {
entry.mut_old_value().clear();
}
}
}
downstreams[i].sink_event(event);
}
downstreams.last().unwrap().sink_event(change_data_event);
}

/// Install a resolver and return pending downstreams.
Expand Down Expand Up @@ -802,7 +812,7 @@ mod tests {
let rx = BatchReceiver::new(rx, 1, Vec::new, VecCollector);
let request_id = 123;
let mut downstream =
Downstream::new(String::new(), region_epoch, request_id, ConnID::new());
Downstream::new(String::new(), region_epoch, request_id, ConnID::new(), true);
downstream.set_sink(sink);
let mut delegate = Delegate::new(region_id);
delegate.subscribe(downstream);
Expand Down Expand Up @@ -936,7 +946,7 @@ mod tests {
let rx = BatchReceiver::new(rx, 1, Vec::new, VecCollector);
let request_id = 123;
let mut downstream =
Downstream::new(String::new(), region_epoch, request_id, ConnID::new());
Downstream::new(String::new(), region_epoch, request_id, ConnID::new(), true);
let downstream_id = downstream.get_id();
downstream.set_sink(sink);
let mut delegate = Delegate::new(region_id);
Expand Down
20 changes: 10 additions & 10 deletions components/cdc/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1178,7 +1178,7 @@ mod tests {
let mut req = ChangeDataRequest::default();
req.set_region_id(1);
let region_epoch = req.get_region_epoch().clone();
let downstream = Downstream::new("".to_string(), region_epoch, 0, conn_id);
let downstream = Downstream::new("".to_string(), region_epoch, 0, conn_id, true);
ep.run(Task::Register {
request: req,
downstream,
Expand Down Expand Up @@ -1226,7 +1226,7 @@ mod tests {
let mut req = ChangeDataRequest::default();
req.set_region_id(1);
let region_epoch = req.get_region_epoch().clone();
let downstream = Downstream::new("".to_string(), region_epoch.clone(), 1, conn_id);
let downstream = Downstream::new("".to_string(), region_epoch.clone(), 1, conn_id, true);
ep.run(Task::Register {
request: req.clone(),
downstream,
Expand All @@ -1236,7 +1236,7 @@ mod tests {
assert_eq!(ep.capture_regions.len(), 1);

// duplicate request error.
let downstream = Downstream::new("".to_string(), region_epoch.clone(), 2, conn_id);
let downstream = Downstream::new("".to_string(), region_epoch.clone(), 2, conn_id, true);
ep.run(Task::Register {
request: req.clone(),
downstream,
Expand All @@ -1260,7 +1260,7 @@ mod tests {
assert_eq!(ep.capture_regions.len(), 1);

// Compatibility error.
let downstream = Downstream::new("".to_string(), region_epoch, 3, conn_id);
let downstream = Downstream::new("".to_string(), region_epoch, 3, conn_id, true);
ep.run(Task::Register {
request: req,
downstream,
Expand Down Expand Up @@ -1314,7 +1314,7 @@ mod tests {
let mut req = ChangeDataRequest::default();
req.set_region_id(1);
let region_epoch = req.get_region_epoch().clone();
let downstream = Downstream::new("".to_string(), region_epoch.clone(), 0, conn_id);
let downstream = Downstream::new("".to_string(), region_epoch.clone(), 0, conn_id, true);
ep.run(Task::Register {
request: req.clone(),
downstream,
Expand All @@ -1339,7 +1339,7 @@ mod tests {

// Register region 2 to the conn.
req.set_region_id(2);
let downstream = Downstream::new("".to_string(), region_epoch.clone(), 0, conn_id);
let downstream = Downstream::new("".to_string(), region_epoch.clone(), 0, conn_id, true);
ep.run(Task::Register {
request: req.clone(),
downstream,
Expand Down Expand Up @@ -1372,7 +1372,7 @@ mod tests {
let conn_id = conn.get_id();
ep.run(Task::OpenConn { conn });
req.set_region_id(3);
let downstream = Downstream::new("".to_string(), region_epoch, 3, conn_id);
let downstream = Downstream::new("".to_string(), region_epoch, 3, conn_id, true);
ep.run(Task::Register {
request: req,
downstream,
Expand Down Expand Up @@ -1439,7 +1439,7 @@ mod tests {
let mut req = ChangeDataRequest::default();
req.set_region_id(1);
let region_epoch = req.get_region_epoch().clone();
let downstream = Downstream::new("".to_string(), region_epoch.clone(), 0, conn_id);
let downstream = Downstream::new("".to_string(), region_epoch.clone(), 0, conn_id, true);
let downstream_id = downstream.get_id();
ep.run(Task::Register {
request: req.clone(),
Expand Down Expand Up @@ -1473,7 +1473,7 @@ mod tests {
}
assert_eq!(ep.capture_regions.len(), 0);

let downstream = Downstream::new("".to_string(), region_epoch.clone(), 0, conn_id);
let downstream = Downstream::new("".to_string(), region_epoch.clone(), 0, conn_id, true);
let new_downstream_id = downstream.get_id();
ep.run(Task::Register {
request: req.clone(),
Expand Down Expand Up @@ -1516,7 +1516,7 @@ mod tests {
assert_eq!(ep.capture_regions.len(), 0);

// Stale deregister should be filtered.
let downstream = Downstream::new("".to_string(), region_epoch, 0, conn_id);
let downstream = Downstream::new("".to_string(), region_epoch, 0, conn_id, true);
ep.run(Task::Register {
request: req,
downstream,
Expand Down
10 changes: 9 additions & 1 deletion components/cdc/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use grpcio::{
use kvproto::cdcpb::{
ChangeData, ChangeDataEvent, ChangeDataRequest, Compatibility, Event, ResolvedTs,
};
use kvproto::kvrpcpb::ExtraOp as TxnExtraOp;
use protobuf::Message;
use security::{check_common_name, SecurityManager};
use tikv_util::collections::HashMap;
Expand Down Expand Up @@ -307,6 +308,7 @@ impl ChangeData for Service {
let recv_req = stream.for_each(move |request| {
let region_epoch = request.get_region_epoch().clone();
let req_id = request.get_request_id();
let enable_old_value = request.get_extra_op() == TxnExtraOp::ReadOldValue;
let version = match semver::Version::parse(request.get_header().get_ticdc_version()) {
Ok(v) => v,
Err(e) => {
Expand All @@ -316,7 +318,13 @@ impl ChangeData for Service {
semver::Version::new(0, 0, 0)
}
};
let downstream = Downstream::new(peer.clone(), region_epoch, req_id, conn_id);
let downstream = Downstream::new(
peer.clone(),
region_epoch,
req_id,
conn_id,
enable_old_value,
);
scheduler
.schedule(Task::Register {
request,
Expand Down
89 changes: 89 additions & 0 deletions components/cdc/tests/integrations/test_cdc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -877,3 +877,92 @@ fn test_old_value_basic() {
event_feed_wrap.as_ref().replace(None);
suite.stop();
}

#[test]
fn test_old_value_multi_changefeeds() {
    let mut suite = TestSuite::new(1);
    let mut req = suite.new_changedata_request(1);

    // Changefeed 1 subscribes to region 1 with old value capture enabled.
    req.set_extra_op(ExtraOp::ReadOldValue);
    let (req_tx_1, event_feed_wrap_1, receive_event_1) =
        new_event_feed(suite.get_region_cdc_client(1));
    let _req_tx_1 = req_tx_1
        .send((req.clone(), WriteFlags::default()))
        .wait()
        .unwrap();

    // Changefeed 2 subscribes to the same region with old value disabled.
    req.set_extra_op(ExtraOp::Noop);
    let (req_tx_2, event_feed_wrap_2, receive_event_2) =
        new_event_feed(suite.get_region_cdc_client(1));
    let _req_tx_2 = req_tx_2.send((req, WriteFlags::default())).wait().unwrap();

    sleep_ms(1000);

    // Insert the initial value for k1.
    let key = b"k1".to_vec();
    let mut insert = Mutation::default();
    insert.set_op(Op::Put);
    insert.key = key.clone();
    insert.value = b"v1".to_vec();
    let insert_start_ts = suite.cluster.pd_client.get_tso().wait().unwrap();
    suite.must_kv_prewrite(1, vec![insert], key.clone(), insert_start_ts);
    let insert_commit_ts = suite.cluster.pd_client.get_tso().wait().unwrap();
    suite.must_kv_commit(1, vec![key.clone()], insert_start_ts, insert_commit_ts);

    // Overwrite k1 so the update prewrite carries an old value.
    let mut update = Mutation::default();
    update.set_op(Op::Put);
    update.key = key.clone();
    update.value = vec![b'3'; 5120];
    let update_start_ts = suite.cluster.pd_client.get_tso().wait().unwrap();
    suite.must_kv_prewrite(1, vec![update], key.clone(), update_start_ts);
    let update_commit_ts = suite.cluster.pd_client.get_tso().wait().unwrap();
    suite.must_kv_commit(1, vec![key], update_start_ts, update_commit_ts);

    // Changefeed 1 must see "v1" as the old value on the update prewrite and
    // an empty old value on the initial insert prewrite.
    let mut seen = 0;
    while seen < 2 {
        for event in receive_event_1(false).events.to_vec() {
            match event.event.unwrap() {
                Event_oneof_event::Entries(mut es) => {
                    for row in es.take_entries().to_vec() {
                        if row.get_type() != EventLogType::Prewrite {
                            continue;
                        }
                        if row.get_start_ts() == update_start_ts.into_inner() {
                            assert_eq!(row.get_old_value(), b"v1");
                        } else {
                            assert_eq!(row.get_old_value(), b"");
                        }
                        seen += 1;
                    }
                }
                other => panic!("unknown event {:?}", other),
            }
        }
    }

    // Changefeed 2 opted out of old values, so every prewrite it receives
    // must carry an empty old value.
    let mut seen = 0;
    while seen < 2 {
        for event in receive_event_2(false).events.to_vec() {
            match event.event.unwrap() {
                Event_oneof_event::Entries(mut es) => {
                    for row in es.take_entries().to_vec() {
                        if row.get_type() == EventLogType::Prewrite {
                            assert_eq!(row.get_old_value(), b"");
                            seen += 1;
                        }
                    }
                }
                other => panic!("unknown event {:?}", other),
            }
        }
    }

    event_feed_wrap_1.replace(None);
    event_feed_wrap_2.replace(None);
    suite.stop();
}

0 comments on commit 1333fa0

Please sign in to comment.