Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: make oura v1 compatible with pallas 0.29.0 for Convway Era #805

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
441 changes: 390 additions & 51 deletions Cargo.lock

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ authors = ["Santiago Carmuega <[email protected]>"]


[dependencies]
pallas = "0.18.2"
pallas = "0.29.0"
# pallas = { git = "https://github.com/txpipe/pallas" }
# pallas = { path = "../pallas/pallas" }
hex = "0.4.3"
Expand All @@ -30,6 +30,7 @@ strum = "0.24"
strum_macros = "0.24"
prometheus_exporter = { version = "0.8.5", default-features = false }
unicode-truncate = "0.2.0"
tokio-retry = "0.3"

# feature logs
file-rotate = { version = "0.7.1", optional = true }
Expand Down
8 changes: 7 additions & 1 deletion src/mapper/babbage.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::ops::Deref;

use pallas::codec::utils::KeepRaw;

use pallas::ledger::primitives::babbage::{
Expand Down Expand Up @@ -101,7 +103,11 @@ impl EventWriter {
.into();

record.native_witnesses = self
.collect_native_witness_records(&witnesses.native_script)?
.collect_native_witness_records(&witnesses.native_script.as_ref().map(|x| {
x.iter()
.map(|ws| ws.deref().clone())
.collect::<Vec<pallas::ledger::primitives::alonzo::NativeScript>>()
}))?
.into();

record.plutus_witnesses = self
Expand Down
2 changes: 1 addition & 1 deletion src/mapper/byron.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@
hash: &Hash<32>,
cbor: &[u8],
) -> Result<BlockRecord, Error> {
let abs_slot = pallas::ledger::traverse::time::byron_epoch_slot_to_absolute(
let abs_slot = pallas::ledger::traverse::time::relative_slot_to_absolute(

Check failure on line 171 in src/mapper/byron.rs

View workflow job for this annotation

GitHub Actions / Matrix Check (macOS-latest, stable)

cannot find function `relative_slot_to_absolute` in module `pallas::ledger::traverse::time`

Check failure on line 171 in src/mapper/byron.rs

View workflow job for this annotation

GitHub Actions / Matrix Check (ubuntu-latest, stable)

cannot find function `relative_slot_to_absolute` in module `pallas::ledger::traverse::time`

Check failure on line 171 in src/mapper/byron.rs

View workflow job for this annotation

GitHub Actions / Matrix Check (windows-latest, stable)

cannot find function `relative_slot_to_absolute` in module `pallas::ledger::traverse::time`
source.header.consensus_data.0.epoch,
source.header.consensus_data.0.slot,
);
Expand Down Expand Up @@ -234,7 +234,7 @@
hash: &Hash<32>,
cbor: &[u8],
) -> Result<BlockRecord, Error> {
let abs_slot = pallas::ledger::traverse::time::byron_epoch_slot_to_absolute(

Check failure on line 237 in src/mapper/byron.rs

View workflow job for this annotation

GitHub Actions / Matrix Check (macOS-latest, stable)

cannot find function `byron_epoch_slot_to_absolute` in module `pallas::ledger::traverse::time`

Check failure on line 237 in src/mapper/byron.rs

View workflow job for this annotation

GitHub Actions / Matrix Check (ubuntu-latest, stable)

cannot find function `byron_epoch_slot_to_absolute` in module `pallas::ledger::traverse::time`

Check failure on line 237 in src/mapper/byron.rs

View workflow job for this annotation

GitHub Actions / Matrix Check (windows-latest, stable)

cannot find function `byron_epoch_slot_to_absolute` in module `pallas::ledger::traverse::time`
source.header.consensus_data.epoch_id,
0,
);
Expand Down Expand Up @@ -288,7 +288,7 @@
) -> Result<(), Error> {
let hash = block.header.original_hash();

let abs_slot = pallas::ledger::traverse::time::byron_epoch_slot_to_absolute(

Check failure on line 291 in src/mapper/byron.rs

View workflow job for this annotation

GitHub Actions / Matrix Check (macOS-latest, stable)

cannot find function `byron_epoch_slot_to_absolute` in module `pallas::ledger::traverse::time`

Check failure on line 291 in src/mapper/byron.rs

View workflow job for this annotation

GitHub Actions / Matrix Check (ubuntu-latest, stable)

cannot find function `byron_epoch_slot_to_absolute` in module `pallas::ledger::traverse::time`

Check failure on line 291 in src/mapper/byron.rs

View workflow job for this annotation

GitHub Actions / Matrix Check (windows-latest, stable)

cannot find function `byron_epoch_slot_to_absolute` in module `pallas::ledger::traverse::time`
block.header.consensus_data.0.epoch,
block.header.consensus_data.0.slot,
);
Expand Down Expand Up @@ -328,7 +328,7 @@
if self.config.include_byron_ebb {
let hash = block.header.original_hash();

let abs_slot = pallas::ledger::traverse::time::byron_epoch_slot_to_absolute(

Check failure on line 331 in src/mapper/byron.rs

View workflow job for this annotation

GitHub Actions / Matrix Check (macOS-latest, stable)

cannot find function `byron_epoch_slot_to_absolute` in module `pallas::ledger::traverse::time`

Check failure on line 331 in src/mapper/byron.rs

View workflow job for this annotation

GitHub Actions / Matrix Check (ubuntu-latest, stable)

cannot find function `byron_epoch_slot_to_absolute` in module `pallas::ledger::traverse::time`

Check failure on line 331 in src/mapper/byron.rs

View workflow job for this annotation

GitHub Actions / Matrix Check (windows-latest, stable)

cannot find function `byron_epoch_slot_to_absolute` in module `pallas::ledger::traverse::time`
block.header.consensus_data.epoch_id,
0,
);
Expand Down
34 changes: 24 additions & 10 deletions src/mapper/map.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use std::collections::HashMap;
use std::ops::Deref;

use pallas::codec::utils::Nullable;
use pallas::ledger::primitives::alonzo::MintedWitnessSet;
use pallas::ledger::primitives::babbage::MintedDatumOption;
use pallas::ledger::traverse::{ComputeHash, OriginalHash};
Expand Down Expand Up @@ -68,19 +70,24 @@ fn relay_to_string(relay: &Relay) -> String {
match relay {
Relay::SingleHostAddr(port, ipv4, ipv6) => {
let ip = match (ipv6, ipv4) {
(None, None) => "".to_string(),
(_, Some(x)) => ip_string_from_bytes(x.as_ref()),
(Some(x), _) => ip_string_from_bytes(x.as_ref()),
(Nullable::Null, Nullable::Null) => "".to_string(),
(_, Nullable::Some(x)) => ip_string_from_bytes(x.as_ref()),
(Nullable::Some(x), _) => ip_string_from_bytes(x.as_ref()),
(Nullable::Null, Nullable::Undefined) => "".to_string(),
(Nullable::Undefined, Nullable::Null) => "".to_string(),
(Nullable::Undefined, Nullable::Undefined) => "".to_string(),
};

match port {
Some(port) => format!("{ip}:{port}"),
None => ip,
Nullable::Some(port) => format!("{ip}:{port}"),
Nullable::Null => ip,
Nullable::Undefined => ip,
}
}
Relay::SingleHostName(port, host) => match port {
Some(port) => format!("{host}:{port}"),
None => host.clone(),
Nullable::Some(port) => format!("{host}:{port}"),
Nullable::Null => host.clone(),
Nullable::Undefined => host.clone(),
},
Relay::MultiHostName(host) => host.clone(),
}
Expand Down Expand Up @@ -342,8 +349,11 @@ impl EventWriter {
reward_account: reward_account.to_hex(),
pool_owners: pool_owners.iter().map(|p| p.to_hex()).collect(),
relays: relays.iter().map(relay_to_string).collect(),
pool_metadata: pool_metadata.as_ref().map(|m| m.url.clone()),
pool_metadata_hash: pool_metadata.as_ref().map(|m| m.hash.clone().to_hex()),
pool_metadata: pool_metadata.clone().map(|m| m.url.clone()).into(),
pool_metadata_hash: pool_metadata
.clone()
.map(|m| m.hash.clone().to_hex())
.into(),
},
Certificate::PoolRetirement(pool, epoch) => EventData::PoolRetirement {
pool: pool.to_hex(),
Expand Down Expand Up @@ -454,7 +464,11 @@ impl EventWriter {
.into();

record.native_witnesses = self
.collect_native_witness_records(&witnesses.native_script)?
.collect_native_witness_records(&witnesses.native_script.as_ref().map(|x| {
x.iter()
.map(|ws| ws.deref().clone())
.collect::<Vec<pallas::ledger::primitives::alonzo::NativeScript>>()
}))?
.into();

record.plutus_witnesses = self
Expand Down
2 changes: 1 addition & 1 deletion src/model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ impl From<PlutusWitnessRecord> for EventData {
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
pub struct PlutusRedeemerRecord {
pub purpose: String,
pub ex_units_mem: u32,
pub ex_units_mem: u64,
pub ex_units_steps: u64,
pub input_idx: u32,
pub plutus_data: JsonValue,
Expand Down
2 changes: 1 addition & 1 deletion src/pipelining.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::thread::JoinHandle;
use tokio::task::JoinHandle;

use crate::{model::Event, Error};

Expand Down
67 changes: 40 additions & 27 deletions src/sources/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,22 @@
ledger::traverse::{probe, Era},
network::{
miniprotocols::{chainsync, Point, MAINNET_MAGIC, TESTNET_MAGIC},
multiplexer::{bearers::Bearer, StdChannel, StdPlexer},
multiplexer::{Bearer, Plexer},
},
};

use tokio_retry::strategy::{jitter, ExponentialBackoff};
use tokio_retry::Retry;

use serde::{de::Visitor, Deserializer};
use serde::{Deserialize, Serialize};

use std::boxed::Box;

Check warning on line 18 in src/sources/common.rs

View workflow job for this annotation

GitHub Actions / Matrix Check (macOS-latest, stable)

unused import: `std::boxed::Box`

Check warning on line 18 in src/sources/common.rs

View workflow job for this annotation

GitHub Actions / Matrix Check (ubuntu-latest, stable)

unused import: `std::boxed::Box`

Check warning on line 18 in src/sources/common.rs

View workflow job for this annotation

GitHub Actions / Matrix Check (windows-latest, stable)

unused import: `std::boxed::Box`
use std::error::Error as StdError;

Check warning on line 19 in src/sources/common.rs

View workflow job for this annotation

GitHub Actions / Matrix Check (macOS-latest, stable)

unused import: `std::error::Error as StdError`

Check warning on line 19 in src/sources/common.rs

View workflow job for this annotation

GitHub Actions / Matrix Check (ubuntu-latest, stable)

unused import: `std::error::Error as StdError`

Check warning on line 19 in src/sources/common.rs

View workflow job for this annotation

GitHub Actions / Matrix Check (windows-latest, stable)

unused import: `std::error::Error as StdError`

use crate::{
mapper::EventWriter,
utils::{retry, SwallowResult, Utils},

Check warning on line 23 in src/sources/common.rs

View workflow job for this annotation

GitHub Actions / Matrix Check (macOS-latest, stable)

unused import: `retry`

Check warning on line 23 in src/sources/common.rs

View workflow job for this annotation

GitHub Actions / Matrix Check (ubuntu-latest, stable)

unused import: `retry`

Check warning on line 23 in src/sources/common.rs

View workflow job for this annotation

GitHub Actions / Matrix Check (windows-latest, stable)

unused import: `retry`
Error,
};

Expand Down Expand Up @@ -182,36 +188,43 @@
}
}

pub fn setup_multiplexer_attempt(bearer: &BearerKind, address: &str) -> Result<StdPlexer, Error> {
pub async fn setup_multiplexer_attempt(
bearer: &BearerKind,
address: &str,
) -> Result<Plexer, Error> {
match bearer {
BearerKind::Tcp => {
let bearer = Bearer::connect_tcp(address)?;
Ok(StdPlexer::new(bearer))
let bearer = Bearer::connect_tcp(address).await?;
Ok(Plexer::new(bearer))
}
#[cfg(target_family = "unix")]
BearerKind::Unix => {
let unix = Bearer::connect_unix(address)?;
Ok(StdPlexer::new(unix))
let unix = Bearer::connect_unix(address).await?;
Ok(Plexer::new(unix))
}
}
}

pub fn setup_multiplexer(
pub async fn setup_multiplexer(
bearer: &BearerKind,
address: &str,
retry: &Option<RetryPolicy>,
) -> Result<StdPlexer, Error> {
) -> Result<Plexer, Error> {
match retry {
Some(policy) => retry::retry_operation(
|| setup_multiplexer_attempt(bearer, address),
&retry::Policy {
max_retries: policy.connection_max_retries,
backoff_unit: Duration::from_secs(1),
backoff_factor: 2,
max_backoff: Duration::from_secs(policy.connection_max_backoff as u64),
},
),
None => setup_multiplexer_attempt(bearer, address),
Some(policy) => {
let retry_strategy =
ExponentialBackoff::from_millis(1000) // backoff_unit
.factor(2) // backoff_factor
.max_delay(Duration::from_secs(policy.connection_max_backoff as u64)) // max_backoff
.map(jitter) // add jitter to delays
.take(policy.connection_max_retries.try_into().unwrap()); // max_retries

Retry::spawn(retry_strategy, || async {
setup_multiplexer_attempt(bearer, address).await
})
.await
}
None => setup_multiplexer_attempt(bearer, address).await,
}
}

Expand Down Expand Up @@ -262,8 +275,8 @@
false
}

pub(crate) fn intersect_starting_point<O>(
client: &mut chainsync::Client<StdChannel, O>,
pub(crate) async fn intersect_starting_point<O>(
client: &mut chainsync::Client<O>,
intersect_arg: &Option<IntersectArg>,
since_arg: &Option<PointArg>,
utils: &Utils,
Expand All @@ -277,7 +290,7 @@
Some(cursor) => {
log::info!("found persisted cursor, will use as starting point");
let desired = cursor.try_into()?;
let (point, _) = client.find_intersect(vec![desired])?;
let (point, _) = client.find_intersect(vec![desired]).await?;

Ok(point)
}
Expand All @@ -286,29 +299,29 @@
log::info!("found 'fallbacks' intersect argument, will use as starting point");
let options: Result<Vec<_>, _> = x.iter().map(|x| x.clone().try_into()).collect();

let (point, _) = client.find_intersect(options?)?;
let (point, _) = client.find_intersect(options?).await?;

Ok(point)
}
Some(IntersectArg::Origin) => {
log::info!("found 'origin' intersect argument, will use as starting point");

let point = client.intersect_origin()?;
let point = client.intersect_origin().await?;

Ok(Some(point))
}
Some(IntersectArg::Point(x)) => {
log::info!("found 'point' intersect argument, will use as starting point");
let options = vec![x.clone().try_into()?];

let (point, _) = client.find_intersect(options)?;
let (point, _) = client.find_intersect(options).await?;

Ok(point)
}
Some(IntersectArg::Tip) => {
log::info!("found 'tip' intersect argument, will use as starting point");

let point = client.intersect_tip()?;
let point = client.intersect_tip().await?;

Ok(Some(point))
}
Expand All @@ -318,14 +331,14 @@
log::warn!("`since` value is deprecated, please use `intersect`");
let options = vec![x.clone().try_into()?];

let (point, _) = client.find_intersect(options)?;
let (point, _) = client.find_intersect(options).await?;

Ok(point)
}
None => {
log::info!("no starting point specified, will use tip of chain");

let point = client.intersect_tip()?;
let point = client.intersect_tip().await?;

Ok(Some(point))
}
Expand Down
Loading
Loading