Commit 8ce03c4

Merge pull request #1 from Emurgo/evgenii/s3_json

S3 Json

pedromtcosta committed May 23, 2023
2 parents 7623ac4 + f59c547
Showing 9 changed files with 549 additions and 99 deletions.
book/src/sinks/aws_s3.md (4 changes: 3 additions & 1 deletion)
@@ -29,7 +29,7 @@ max_retries = 5
- `bucket`: The name of the bucket to store the blocks.
- `prefix`: A prefix to prepend on each object's key.
- `naming`: One of the available naming conventions (see section below)
- `content`: Either `Cbor` for binary encoding or `CborHex` for plain text hex representation of the CBOR
- `content`: One of `Cbor` for binary encoding, `CborHex` for a plain-text hex representation of the CBOR, or `Json` for a JSON string representation of the block.
- `max_retries`: The max number of send retries before exiting the pipeline with an error.

IMPORTANT: For this sink to work correctly, the `include_block_cbor` option should be enabled in the source's mapper configuration (see [mapper options](../advanced/mapper_options.md)).
@@ -42,6 +42,7 @@ S3 Buckets allow the user to query by object prefix. It's important to use a nam
- `Hash`: formats the key using `"{hash}"`
- `SlotHash`: formats the key using `"{slot}.{hash}"`
- `BlockHash`: formats the key using `"{block_num}.{hash}"`
- `BlockNumber`: formats the key using `"{block_num}"`
- `EpochHash`: formats the key using `"{epoch}.{hash}"`
- `EpochSlotHash`: formats the key using `"{epoch}.{slot}.{hash}"`
- `EpochBlockHash`: formats the key using `"{epoch}.{block_num}.{hash}"`
@@ -52,6 +53,7 @@ The sink provides two options for encoding the content of the object:

- `Cbor`: the S3 object will contain the raw, unmodified CBOR value in binary format. The content type of the object in this case will be "application/cbor".
- `CborHex`: the S3 object will contain the CBOR payload of the block encoded as a hex string. The content type of the object in this case will be "text/plain".
- `Json`: the S3 object will contain the block encoded as a JSON string. The content type of the object in this case will be "application/json".


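For context, a minimal sink configuration exercising the options documented above might look like the sketch below. It assumes the sink is selected with a `type` key and an AWS `region`, as in the project's other sink examples; the bucket name and prefix values are purely illustrative.

```toml
[sink]
type = "AwsS3"
region = "us-east-1"            # illustrative region
bucket = "my-cardano-blocks"    # illustrative bucket name
prefix = "mainnet/"             # illustrative key prefix
naming = "BlockNumber"          # object keys become "{block_num}"
content = "Json"                # JSON string representation of the block
max_retries = 5
```

With this setup, each object key is the block number and the object body is the JSON-encoded block, served with the "application/json" content type.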
## Metadata
src/mapper/babbage.rs (106 changes: 100 additions & 6 deletions)
@@ -1,14 +1,13 @@
use std::collections::HashMap;
use pallas::codec::utils::KeepRaw;

use pallas::ledger::primitives::babbage::{
AuxiliaryData, MintedBlock, MintedDatumOption, MintedPostAlonzoTransactionOutput,
MintedTransactionBody, MintedTransactionOutput, MintedWitnessSet, NetworkId,
};
use pallas::ledger::primitives::babbage::{AuxiliaryData, CostMdls, Language, MintedBlock, MintedDatumOption, MintedPostAlonzoTransactionOutput, MintedTransactionBody, MintedTransactionOutput, MintedWitnessSet, NetworkId, ProtocolParamUpdate, Update};

use pallas::crypto::hash::Hash;
use pallas::ledger::traverse::OriginalHash;
use serde_json::json;

use crate::model::{BlockRecord, Era, TransactionRecord};
use crate::model::{BlockRecord, CostModelRecord, CostModelsRecord, Era, LanguageVersionRecord, ProtocolParamUpdateRecord, TransactionRecord, UpdateRecord};
use crate::utils::time::TimeProvider;
use crate::{
model::{EventContext, EventData},
@@ -65,14 +64,37 @@ impl EventWriter {
}
}

if let Some(certs) = &body.certificates {
let certs = self.collect_certificate_records(certs);
record.certificate_count = certs.len();

if self.config.include_transaction_details {
record.certs = certs.into();
}
}

// collateral info
record.collateral_input_count = body.collateral.as_deref().map(|inputs| inputs.len()).unwrap_or_default();
record.has_collateral_output = body.collateral_return.is_some();

if let Some(update) = &body.update {
if self.config.include_transaction_details {
record.update = Some(self.to_babbage_update_record(update));
}
}

if let Some(req_signers) = &body.required_signers {
let req_signers = self.collect_required_signers_records(req_signers)?;
record.required_signers_count = req_signers.len();

if self.config.include_transaction_details {
record.required_signers = Some(req_signers);
}
}

// TODO
// TransactionBodyComponent::ScriptDataHash(_)
// TransactionBodyComponent::AuxiliaryDataHash(_)

if self.config.include_transaction_details {
@@ -361,6 +383,78 @@ impl EventWriter {
Ok(())
}

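/// Maps the Babbage cost models (Plutus V1 / V2) into a CostModelsRecord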
pub fn to_babbage_cost_models_record(&self, cost_models: &Option<CostMdls>) -> Option<CostModelsRecord> {
match cost_models {
Some(cost_models) => {
let mut cost_models_record = HashMap::new();
if let Some(cost_model_v1) = &cost_models.plutus_v1 {
let language_version_record = LanguageVersionRecord::PlutusV1;
let cost_model_record = CostModelRecord(cost_model_v1.clone());
cost_models_record.insert(language_version_record, cost_model_record);
}
if let Some(cost_model_v2) = &cost_models.plutus_v2 {
let language_version_record = LanguageVersionRecord::PlutusV2;
let cost_model_record = CostModelRecord(cost_model_v2.clone());
cost_models_record.insert(language_version_record, cost_model_record);
}

Some(CostModelsRecord(cost_models_record))
},
None => None,
}
}

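/// Maps a Babbage Plutus language version into its record equivalent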
pub fn to_babbage_language_version_record(&self, language_version: &Language) -> LanguageVersionRecord {
match language_version {
Language::PlutusV1 => LanguageVersionRecord::PlutusV1,
Language::PlutusV2 => LanguageVersionRecord::PlutusV2,
}
}

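/// Maps a Babbage protocol parameter update into a ProtocolParamUpdateRecord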
pub fn to_babbage_protocol_update_record(&self, update: &ProtocolParamUpdate) -> ProtocolParamUpdateRecord {
ProtocolParamUpdateRecord {
minfee_a: update.minfee_a,
minfee_b: update.minfee_b,
max_block_body_size: update.max_block_body_size,
max_transaction_size: update.max_transaction_size,
max_block_header_size: update.max_block_header_size,
key_deposit: update.key_deposit,
pool_deposit: update.pool_deposit,
maximum_epoch: update.maximum_epoch,
desired_number_of_stake_pools: update.desired_number_of_stake_pools,
pool_pledge_influence: self.to_rational_number_record_option(&update.pool_pledge_influence),
expansion_rate: self.to_unit_interval_record(&update.expansion_rate),
treasury_growth_rate: self.to_unit_interval_record(&update.treasury_growth_rate),
decentralization_constant: None,
extra_entropy: None,
protocol_version: update.protocol_version,
min_pool_cost: update.min_pool_cost,
ada_per_utxo_byte: update.ada_per_utxo_byte,
cost_models_for_script_languages: self.to_babbage_cost_models_record(&update.cost_models_for_script_languages),
execution_costs: update.execution_costs.as_ref().map(|execution_costs| json!(execution_costs)),
max_tx_ex_units: self.to_ex_units_record(&update.max_tx_ex_units),
max_block_ex_units: self.to_ex_units_record(&update.max_block_ex_units),
max_value_size: update.max_value_size,
collateral_percentage: update.collateral_percentage,
max_collateral_inputs: update.max_collateral_inputs,
}
}

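/// Maps a Babbage update (proposed protocol parameter updates plus epoch) into an UpdateRecord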
pub fn to_babbage_update_record(&self, update: &Update) -> UpdateRecord {
let mut updates = HashMap::new();
for (genesis_hash, proposal) in update.proposed_protocol_parameter_updates.clone().to_vec() {
updates.insert(genesis_hash.to_hex(), self.to_babbage_protocol_update_record(&proposal));
}

UpdateRecord {
proposed_protocol_parameter_updates: updates,
epoch: update.epoch,
}
}

/// Mapper entry-point for decoded Babbage blocks
///
/// Entry-point to start crawling a blocks for events. Meant to be used when
src/mapper/byron.rs (1 change: 1 addition & 0 deletions)
@@ -54,6 +54,7 @@ impl EventWriter {
assets: None,
datum_hash: None,
inline_datum: None,
inlined_script: None,
})
}

src/mapper/collect.rs (22 changes: 22 additions & 0 deletions)
@@ -14,6 +14,7 @@ use pallas::{
traverse::OriginalHash,
},
};
use pallas::ledger::primitives::alonzo::{Certificate, RequiredSigners};

use crate::{
model::{
@@ -23,6 +24,7 @@ use crate::{
},
Error,
};
use crate::model::{CertificateRecord, RequiredSignerRecord};

use super::{map::ToHex, EventWriter};

@@ -91,6 +93,16 @@ impl EventWriter {
.collect()
}

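/// Collects the certificate records for a transaction body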
pub fn collect_certificate_records(
&self,
certificates: &Vec<Certificate>,
) -> Vec<CertificateRecord> {
certificates
.iter()
.map(|cert| self.to_certificate_record(cert))
.collect()
}

pub fn collect_withdrawal_records(
&self,
withdrawls: &KeyValuePairs<RewardAccount, Coin>,
@@ -221,4 +233,14 @@ impl EventWriter {
})
.collect()
}

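/// Collects the required-signer key hashes as hex-encoded records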
pub fn collect_required_signers_records(&self, req_signers: &RequiredSigners) -> Result<Vec<RequiredSignerRecord>, Error> {
let mut signers = vec![];
for req_sign in req_signers {
let hex = req_sign.to_hex();
signers.push(RequiredSignerRecord(hex));
}

Ok(signers)
}
}