Skip to content

Commit

Permalink
refactor(cognitarium): isolate insert business logic
Browse files Browse the repository at this point in the history
  • Loading branch information
amimart committed Jun 1, 2023
1 parent a5066ea commit 2bc8be5
Show file tree
Hide file tree
Showing 5 changed files with 188 additions and 235 deletions.
95 changes: 5 additions & 90 deletions contracts/okp4-cognitarium/src/contract.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,103 +43,18 @@ pub fn execute(

pub mod execute {
use super::*;
use crate::error::StoreError;
use crate::msg::DataInput;
use crate::rdf;
use crate::rdf::NSResolveFn;
use crate::state::{namespaces, triples, Namespace, NAMESPACE_KEY_INCREMENT};
use blake3::Hash;
use cosmwasm_std::Storage;
use std::collections::BTreeMap;
use crate::state::TripleStorer;

pub fn insert(deps: DepsMut, graph: DataInput) -> Result<Response, ContractError> {
let mut store = STORE.load(deps.storage)?;

let old_count = store.stat.triples_count;
let mut ns_key_inc = NAMESPACE_KEY_INCREMENT.load(deps.storage)?;
let mut ns_cache: BTreeMap<String, Namespace> = BTreeMap::new();

let mut triple_reader = rdf::read_triples(&graph);

loop {
let next = triple_reader.next(&mut ns_resolver(
deps.storage,
&mut ns_key_inc,
&mut ns_cache,
));

match next {
None => {
break;
}
Some(res) => {
let triple = res.map_err(ContractError::from)?;
store.stat.triples_count += Uint128::one();

if store.stat.triples_count > store.limits.max_triple_count {
Err(ContractError::from(StoreError::MaxTriplesLimitExceeded(
store.limits.max_triple_count,
)))?
}

let object_hash: Hash = triple.object.as_hash();
triples()
.save(
deps.storage,
(
object_hash.as_bytes(),
triple.predicate.clone(),
triple.subject.clone(),
),
&triple,
)
.map_err(ContractError::Std)?;
}
}
}

STORE.save(deps.storage, &store)?;
NAMESPACE_KEY_INCREMENT.save(deps.storage, &ns_key_inc)?;
for entry in ns_cache {
namespaces().save(deps.storage, entry.0, &entry.1)?;
}
let mut reader = rdf::read_triples(&graph);
let mut storer = TripleStorer::new(deps.storage)?;
let count = storer.store_all(&mut reader)?;

Ok(Response::new()
.add_attribute("action", "insert")
.add_attribute("triple_count", store.stat.triples_count - old_count))
}

fn ns_resolver<'a>(
store: &'a dyn Storage,
ns_key_inc: &'a mut u128,
ns_cache: &'a mut BTreeMap<String, Namespace>,
) -> NSResolveFn<'a> {
Box::new(|ns_str| -> Result<u128, StdError> {
match ns_cache.get_mut(ns_str.as_str()) {
Some(namespace) => {
namespace.counter += 1;
Ok(namespace.key)
}
None => {
let mut namespace = match namespaces().load(store, ns_str.clone()) {
Err(StdError::NotFound { .. }) => {
let n = Namespace {
key: *ns_key_inc,
counter: 0u128,
};
*ns_key_inc += 1;
Ok(n)
}
Ok(n) => Ok(n),
Err(e) => Err(e),
}?;

namespace.counter += 1;
ns_cache.insert(ns_str.clone(), namespace.clone());
Ok(namespace.key)
}
}
})
.add_attribute("triple_count", count))
}
}

Expand Down
18 changes: 12 additions & 6 deletions contracts/okp4-cognitarium/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,18 @@ pub enum ContractError {
Unauthorized,
}

impl From<RdfXmlError> for ContractError {
fn from(value: RdfXmlError) -> Self {
RDFParseError::from(value).into()
}
}

impl From<TurtleError> for ContractError {
fn from(value: TurtleError) -> Self {
RDFParseError::from(value).into()
}
}

#[derive(Error, Debug, PartialEq)]
pub enum StoreError {
#[error("Maximum triples number exceeded: {0}")]
Expand All @@ -44,17 +56,11 @@ pub enum StoreError {

#[derive(Error, Debug, PartialEq)]
pub enum RDFParseError {
#[error("{0}")]
Std(#[from] StdError),

#[error("Error parsing XML RDF: {0}")]
XML(String),

#[error("Error parsing Turtle RDF: {0}")]
Turtle(String),

#[error("Unexpected error parsing RDF: {0}")]
Unexpected(String),
}

impl From<RdfXmlError> for RDFParseError {
Expand Down
154 changes: 15 additions & 139 deletions contracts/okp4-cognitarium/src/rdf.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,13 @@
use crate::error::RDFParseError;
use crate::msg::DataInput;
use crate::state;
use cosmwasm_std::StdError;
use rio_api::model::{Literal, NamedNode, Subject, Term, Triple};
use cosmwasm_std::{StdError, StdResult};
use rio_api::model::Triple;
use rio_api::parser::TriplesParser;
use rio_turtle::{NTriplesParser, TurtleParser};
use rio_xml::RdfXmlParser;
use rio_turtle::{NTriplesParser, TurtleError, TurtleParser};
use rio_xml::{RdfXmlError, RdfXmlParser};
use std::io::{BufRead, BufReader};

pub struct TripleReader<R: BufRead> {
parser: TriplesParserKind<R>,
buffer: Vec<state::Triple>,
}

pub enum TriplesParserKind<R: BufRead> {
Expand All @@ -33,142 +30,25 @@ pub fn read_triples(graph: &DataInput) -> TripleReader<BufReader<&[u8]>> {
})
}

pub type NSResolveFn<'a> = Box<dyn FnMut(String) -> Result<u128, StdError> + 'a>;

impl<R: BufRead> TripleReader<R> {
pub fn new(parser: TriplesParserKind<R>) -> Self {
TripleReader {
parser,
buffer: Vec::new(),
}
}

pub fn next(
&mut self,
ns_resolve_fn: &mut NSResolveFn,
) -> Option<Result<state::Triple, RDFParseError>> {
loop {
if let Some(t) = self.buffer.pop() {
return Some(Ok(t));
}

if let Err(e) = match &mut self.parser {
TriplesParserKind::NTriples(parser) => {
Self::read(parser, &mut self.buffer, ns_resolve_fn)
}
TriplesParserKind::Turtle(parser) => {
Self::read(parser, &mut self.buffer, ns_resolve_fn)
}
TriplesParserKind::RdfXml(parser) => {
Self::read(parser, &mut self.buffer, ns_resolve_fn)
}
}? {
return Some(Err(e));
}
}
TripleReader { parser }
}

fn read<P, E>(
parser: &mut P,
buffer: &mut Vec<state::Triple>,
ns_resolve_fn: &mut NSResolveFn,
) -> Option<Result<(), E>>
pub fn read_all<E, UF>(&mut self, mut use_fn: UF) -> Result<(), E>
where
P: TriplesParser,
E: From<P::Error> + From<RDFParseError>,
UF: FnMut(Triple) -> Result<(), E>,
E: From<TurtleError> + From<RdfXmlError>,
{
if parser.is_end() {
None?
}

if let Err(e) = parser.parse_step(&mut |t| {
buffer.push(Self::triple(&t, ns_resolve_fn)?);
Ok(())
}) {
Some(Err(e))
} else {
Some(Ok(()))
}
}

fn triple(
triple: &Triple,
ns_resolve_fn: &mut NSResolveFn,
) -> Result<state::Triple, RDFParseError> {
Ok(state::Triple {
subject: Self::subject(triple.subject, ns_resolve_fn)?,
predicate: Self::node(triple.predicate, ns_resolve_fn)?,
object: Self::object(triple.object, ns_resolve_fn)?,
})
}

fn subject(
subject: Subject,
ns_resolve_fn: &mut NSResolveFn,
) -> Result<state::Subject, RDFParseError> {
match subject {
Subject::NamedNode(node) => {
Self::node(node, ns_resolve_fn).map(|n| state::Subject::Named(n))
}
Subject::BlankNode(node) => Ok(state::Subject::Blank(node.id.to_string())),
_ => Err(RDFParseError::Unexpected(
"RDF star syntax unsupported".to_string(),
)),
}
}

fn node(
node: NamedNode,
ns_resolve_fn: &mut NSResolveFn,
) -> Result<state::Node, RDFParseError> {
let (ns, v) = explode_iri(node.iri)?;
Ok(state::Node {
namespace: ns_resolve_fn(ns)?,
value: v,
})
}

fn object(
object: Term,
ns_resolve_fn: &mut NSResolveFn,
) -> Result<state::Object, RDFParseError> {
match object {
Term::BlankNode(node) => Ok(state::Object::Blank(node.id.to_string())),
Term::NamedNode(node) => {
Self::node(node, ns_resolve_fn).map(|n| state::Object::Named(n))
}
Term::Literal(literal) => {
Self::literal(literal, ns_resolve_fn).map(|l| state::Object::Literal(l))
}
_ => Err(RDFParseError::Unexpected(
"RDF star syntax unsupported".to_string(),
)),
}
}

fn literal(
literal: Literal,
ns_resolve_fn: &mut NSResolveFn,
) -> Result<state::Literal, RDFParseError> {
match literal {
Literal::Simple { value } => Ok(state::Literal::Simple {
value: value.to_string(),
}),
Literal::LanguageTaggedString { value, language } => Ok(state::Literal::I18NString {
value: value.to_string(),
language: language.to_string(),
}),
Literal::Typed { value, datatype } => {
Self::node(datatype, ns_resolve_fn).map(|node| state::Literal::Typed {
value: value.to_string(),
datatype: node,
})
}
match &mut self.parser {
TriplesParserKind::NTriples(parser) => parser.parse_all(&mut use_fn),
TriplesParserKind::Turtle(parser) => parser.parse_all(&mut use_fn),
TriplesParserKind::RdfXml(parser) => parser.parse_all(&mut use_fn),
}
}
}

pub fn explode_iri(iri: &str) -> Result<(String, String), RDFParseError> {
pub fn explode_iri(iri: &str) -> StdResult<(String, String)> {
let mut marker_index: Option<usize> = None;
for delim in ['#', '/', ':'] {
if let Some(index) = iri.rfind(delim) {
Expand All @@ -183,9 +63,7 @@ pub fn explode_iri(iri: &str) -> Result<(String, String), RDFParseError> {
return Ok((iri[..index + 1].to_string(), iri[index + 1..].to_string()));
}

Err(RDFParseError::Unexpected(
"Couldn't extract IRI namespace".to_string(),
))
Err(StdError::generic_err("Couldn't extract IRI namespace"))
}

#[cfg(test)]
Expand Down Expand Up @@ -227,9 +105,7 @@ mod tests {
);
assert_eq!(
explode_iri("this_doesn't_work"),
Err(RDFParseError::Unexpected(
"Couldn't extract IRI namespace".to_string()
))
Err(StdError::generic_err("Couldn't extract IRI namespace"))
);
}
}
2 changes: 2 additions & 0 deletions contracts/okp4-cognitarium/src/state/mod.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
mod namespaces;
mod serde;
mod store;
mod storer;
mod triples;

pub use namespaces::*;
pub use store::*;
pub use storer::*;
pub use triples::*;
Loading

0 comments on commit 2bc8be5

Please sign in to comment.