Skip to content
This repository has been archived by the owner on Mar 14, 2023. It is now read-only.

Commit

Permalink
Update with review comment (3)
Browse files Browse the repository at this point in the history
electrum.rs
 - Simplify reorg detection logic.
 - Remove reorg height from Reorg error type. Add a Display message specifying
 how to handle a reorg.

 main.rs
 - Use the latest API to efficiently discover new txids to fetch, and inflate
 the ChainGraph changeset.
 - Move common code between Sync and Scan into new function `complete_update`.
  • Loading branch information
rajarshimaitra committed Dec 22, 2022
1 parent 21592c6 commit 837b0e7
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 112 deletions.
48 changes: 20 additions & 28 deletions bdk_electrum_example/src/electrum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,17 @@ use electrum_client::{Client, Config, ElectrumApi};
#[derive(Debug)]
pub enum ElectrumError {
Client(electrum_client::Error),
Reorg(u32),
Reorg,
}

impl core::fmt::Display for ElectrumError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
ElectrumError::Client(e) => write!(f, "{}", e),
ElectrumError::Reorg(height) => write!(f, "Reorg detected at height : {}", height),
ElectrumError::Reorg => write!(
f,
"Reorg detected at sync time. Please run the sync call again"
),
}
}
}
Expand Down Expand Up @@ -119,11 +122,12 @@ impl ElectrumClient {
.insert_checkpoint(BlockId {
height: tip_height,
hash: tip_hash,
}).is_err() {
// This means our existing chain tip has been reorged out.
return Err(ElectrumError::Reorg(tip_height));
}

})
.is_err()
{
// There has been an reorg since the last call to `block_header` in the loop above at the current tip.
return Err(ElectrumError::Reorg);
}

let mut keychain_index_update = BTreeMap::new();

Expand Down Expand Up @@ -160,8 +164,7 @@ impl ElectrumClient {
if let Err(err) = sparse_chain.insert_tx(txid, index) {
match err {
InsertTxErr::TxTooHigh => {
/* We should not encounter this error as we ensured TxHeight <= tip_height */
unreachable!();
unreachable!("We should not encounter this error as we ensured TxHeight <= tip_height");
}
InsertTxErr::TxMoved => {
/* This means there is a reorg, we will handle this situation below */
Expand All @@ -179,25 +182,14 @@ impl ElectrumClient {
keychain_index_update.insert(keychain, last_active_index);
}

// To detect reorgs during syncing we re-apply last known
// 20 blocks again into sparsechain
// TODO: Handle reorg case here, so user don't need to handle it manually.
let (tip_height, _) = self.get_tip()?;
let reorged_headers = self
.block_headers((tip_height - 20) as usize, 21)?
.headers
.into_iter()
.map(|header| header.block_hash())
.zip((tip_height - 20)..=tip_height);

// Insert the new checkpoints
for (hash, height) in reorged_headers {
if sparse_chain
.insert_checkpoint(BlockId { height, hash })
.is_err()
{
return Err(ElectrumError::Reorg(height));
};
// Check for Reorg during the above sync process
let tip_blockid = {
let (height, hash) = self.get_tip()?;
BlockId { height, hash }
};

if sparse_chain.latest_checkpoint().expect("must exist") != tip_blockid {
return Err(ElectrumError::Reorg);
}

Ok((sparse_chain, keychain_index_update))
Expand Down
145 changes: 61 additions & 84 deletions bdk_electrum_example/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
mod electrum;

use std::fmt::Debug;
use std::{collections::BTreeMap, fmt::Debug};

use bdk_core::{
bitcoin::{Network, Transaction},
sparse_chain::SparseChain,
sparse_chain::{ChainIndex, SparseChain},
BlockId, TxHeight,
};
use bdk_keychain::{KeychainScan, KeychainTracker};
use bdk_keychain::{KeychainChangeSet, KeychainScan, KeychainTracker};
use electrum::ElectrumClient;

use bdk_cli::{
anyhow::{self, Context, Result},
anyhow::{self, anyhow, Context, Result},
clap::{self, Subcommand},
};
use log::debug;
Expand Down Expand Up @@ -40,46 +40,47 @@ enum ElectrumCommands {
},
}

fn fetch_transactions<K: Debug + Ord + Clone>(
new_sparsechain: &SparseChain<TxHeight>,
fn complete_update<K: Debug + Clone + Ord, I: ChainIndex>(
new_sparsechain: &SparseChain<I>,
tracker: &KeychainTracker<K, I>,
client: &ElectrumClient,
tracker: &KeychainTracker<K, TxHeight>,
) -> Result<Vec<(Transaction, Option<TxHeight>)>> {
// Changeset of txids, both new and old.
let txid_changeset = tracker.chain().determine_changeset(new_sparsechain)?.txids;

// Only filter for txids that are new to us.
let new_txids = txid_changeset
.iter()
.filter_map(|(txid, index)| {
if !tracker.graph().contains_txid(*txid) {
Some((txid, index))
} else {
None
}
})
.collect::<Vec<_>>();
) -> Result<KeychainChangeSet<K, I>> {
let sparsechain_changeset = match tracker.chain().determine_changeset(new_sparsechain) {
Ok(cs) => cs,
Err(update_failure) => {
return Err(anyhow!(
"Changeset determination failed : {:?}",
update_failure
))
}
};

// Remaining of the transactions that only changed in Index
let existing_txs = txid_changeset
.iter()
.filter_map(|(txid, index)| match tracker.graph().tx(*txid) {
Some(tx) => Some((tx.clone(), *index)),
// We don't keep the index for `TxNode::Partial`s
_ => None,
})
let new_txids = tracker
.chain()
.changeset_additions(&sparsechain_changeset)
.collect::<Vec<_>>();

let new_transactions = client.batch_transaction_get(new_txids.iter().map(|(txid, _)| *txid))?;
let new_txs = client.batch_transaction_get(new_txids.iter())?;

let chaingraph_changeset = match tracker
.chain_graph()
.inflate_changeset(sparsechain_changeset, new_txs)
{
Ok(cs) => cs,
Err(cs_error) => {
return Err(anyhow!(
"Chaingraph changeset creation failed : {:?}",
cs_error
))
}
};

// Add all the transaction, new and existing into scan_update
let transaction_update = new_transactions
.into_iter()
.zip(new_txids.into_iter().map(|(_, index)| *index))
.chain(existing_txs)
.collect::<Vec<_>>();
let keychain_changeset = KeychainChangeSet {
derivation_indices: BTreeMap::new(),
chain_graph: chaingraph_changeset,
};

Ok(transaction_update)
Ok(keychain_changeset)
}

fn main() -> anyhow::Result<()> {
Expand Down Expand Up @@ -113,33 +114,20 @@ fn main() -> anyhow::Result<()> {
ElectrumCommands::Scan { stop_gap } => {
let scripts = tracker.txout_index.iter_all_script_pubkeys_by_keychain();

let mut keychain_scan = KeychainScan::default();

// Wallet scan returns a sparse chain that contains new All the BlockIds and Txids
// relevant to the wallet, along with keychain index update if required.
let (new_sparsechain, keychain_index_update) =
client.wallet_txid_scan(scripts, Some(stop_gap), tracker.chain().checkpoints())?;

keychain_scan.last_active_indexes = keychain_index_update;
let mut keychain_changeset = complete_update(&new_sparsechain, &tracker, &client)?;

// Inserting everything from the new_sparsechain should be okay as duplicate
// data would be rejected at the time of update application.
for (height, hash) in new_sparsechain.checkpoints() {
let _ = keychain_scan.update.insert_checkpoint(BlockId {
height: *height,
hash: *hash,
})?;
}
keychain_changeset.derivation_indices = keychain_index_update;

// Fetch the new and old transactions to be added in update structure
for (tx, index) in fetch_transactions(&new_sparsechain, &client, &tracker)? {
keychain_scan.update.insert_tx(tx, index)?;
db.append_changeset(&keychain_changeset)?;
if let Err(err) = tracker.apply_changeset(keychain_changeset) {
return Err(anyhow!(
"Changeset application failed in tracker : {:?}",
err
));
}

// Apply the full scan update
let changeset = tracker.determine_changeset(&keychain_scan)?;
db.append_changeset(&changeset)?;
tracker.apply_changeset(changeset);
debug!("sync completed!!")
}
ElectrumCommands::Sync {
Expand All @@ -158,10 +146,12 @@ fn main() -> anyhow::Result<()> {
let mut spks: Box<dyn Iterator<Item = bdk_core::bitcoin::Script>> =
Box::new(core::iter::empty());
if unused {
spks = Box::new(spks.chain(txout_index.iter_unused().map(|(index, script)| {
eprintln!("Checking if address at {:?} has been used", index);
script.clone()
})));
spks = Box::new(spks.chain(txout_index.inner().unused(..).map(
|(index, script)| {
eprintln!("Checking if address at {:?} has been used", index);
script.clone()
},
)));
}

if all {
Expand All @@ -174,38 +164,25 @@ fn main() -> anyhow::Result<()> {
}

if unspent {
spks = Box::new(spks.chain(tracker.utxos().map(|(_index, ftxout)| {
spks = Box::new(spks.chain(tracker.full_utxos().map(|(_index, ftxout)| {
eprintln!("checking if {} has been spent", ftxout.outpoint);
ftxout.txout.script_pubkey
})));
}

let mut scan_update = KeychainScan::default();

// Wallet scan returns a sparse chain that contains new All the BlockIds and Txids
// relevant to the wallet, along with keychain index update if required.
let new_sparsechain = client
.spk_txid_scan(spks, tracker.chain().checkpoints())
.context("scanning the blockchain")?;

// Inserting everything from the new_sparsechain should be okay as duplicate
// data would be rejected at the time of update application.
for (height, hash) in new_sparsechain.checkpoints() {
let _ = scan_update.update.insert_checkpoint(BlockId {
height: *height,
hash: *hash,
})?;
}
let keychain_changeset = complete_update(&new_sparsechain, &tracker, &client)?;

// Fetch the new and old transactions to be added in update structure
for (tx, index) in fetch_transactions(&new_sparsechain, &client, &tracker)? {
scan_update.update.insert_tx(tx, index)?;
db.append_changeset(&keychain_changeset)?;
if let Err(err) = tracker.apply_changeset(keychain_changeset) {
return Err(anyhow!(
"Changeset application failed in tracker : {:?}",
err
));
}

// Apply the full scan update
let changeset = tracker.determine_changeset(&scan_update)?;
db.append_changeset(&changeset)?;
tracker.apply_changeset(changeset);
debug!("sync completed!!")
}
}
Expand Down

0 comments on commit 837b0e7

Please sign in to comment.