Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve memory efficiency of EntityCache #3594

Merged
merged 8 commits into from
Aug 23, 2022
Merged
16 changes: 4 additions & 12 deletions chain/substreams/src/trigger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@ use anyhow::Error;
use graph::{
blockchain::{self, block_stream::BlockWithTriggers, BlockPtr},
components::{
store::{DeploymentLocator, EntityType, SubgraphFork},
store::{DeploymentLocator, EntityKey, SubgraphFork},
subgraph::{MappingError, ProofOfIndexingEvent, SharedProofOfIndexing},
},
data::store::scalar::Bytes,
prelude::{
anyhow, async_trait, BigDecimal, BigInt, BlockHash, BlockNumber, BlockState, Entity,
EntityKey, RuntimeHostBuilder, Value,
RuntimeHostBuilder, Value,
},
slog::Logger,
substreams::Modules,
Expand Down Expand Up @@ -188,11 +188,7 @@ where
};
let entity_id: String = String::from_utf8(entity_change.id.clone())
.map_err(|e| MappingError::Unknown(anyhow::Error::from(e)))?;
let key = EntityKey {
subgraph_id: self.locator.hash.clone(),
entity_type: EntityType::new(entity_type.into()),
entity_id: entity_id.clone(),
};
let key = EntityKey::data(entity_type.to_string(), entity_id.clone());

let mut data: HashMap<String, Value> = HashMap::from_iter(vec![]);
for field in entity_change.fields.iter() {
Expand Down Expand Up @@ -253,11 +249,7 @@ where
let entity_type: &str = &entity_change.entity;
let entity_id: String = String::from_utf8(entity_change.id.clone())
.map_err(|e| MappingError::Unknown(anyhow::Error::from(e)))?;
let key = EntityKey {
subgraph_id: self.locator.hash.clone(),
entity_type: EntityType::new(entity_type.into()),
entity_id: entity_id.clone(),
};
let key = EntityKey::data(entity_type.to_string(), entity_id.clone());

state.entity_cache.remove(key);

Expand Down
10 changes: 4 additions & 6 deletions core/src/subgraph/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::subgraph::stream::new_block_stream;
use atomic_refcell::AtomicRefCell;
use graph::blockchain::block_stream::{BlockStreamEvent, BlockWithTriggers, FirehoseCursor};
use graph::blockchain::{Block, Blockchain, DataSource, TriggerFilter as _};
use graph::components::store::EntityKey;
use graph::components::{
store::ModificationsAndCache,
subgraph::{CausalityRegion, MappingError, ProofOfIndexing, SharedProofOfIndexing},
Expand Down Expand Up @@ -298,7 +299,6 @@ where
update_proof_of_indexing(
proof_of_indexing,
&self.metrics.host.stopwatch,
&self.inputs.deployment.hash,
&mut block_state.entity_cache,
)
.await?;
Expand Down Expand Up @@ -346,7 +346,7 @@ where
// If a deterministic error has happened, make the PoI the only entity that will be stored.
if has_errors && !is_non_fatal_errors_active {
let is_poi_entity =
|entity_mod: &EntityModification| entity_mod.entity_key().entity_type.is_poi();
|entity_mod: &EntityModification| entity_mod.entity_ref().entity_type.is_poi();
mods.retain(is_poi_entity);
// Confidence check
assert!(
Expand Down Expand Up @@ -860,7 +860,6 @@ where
async fn update_proof_of_indexing(
proof_of_indexing: ProofOfIndexing,
stopwatch: &StopwatchMetrics,
deployment_id: &DeploymentHash,
entity_cache: &mut EntityCache,
) -> Result<(), Error> {
let _section_guard = stopwatch.start_section("update_proof_of_indexing");
Expand All @@ -870,9 +869,8 @@ async fn update_proof_of_indexing(
for (causality_region, stream) in proof_of_indexing.drain() {
// Create the special POI entity key specific to this causality_region
let entity_key = EntityKey {
subgraph_id: deployment_id.clone(),
entity_type: POI_OBJECT.to_owned(),
entity_id: causality_region,
entity_id: causality_region.into(),
};

// Grab the current digest attribute on this entity
Expand All @@ -892,7 +890,7 @@ async fn update_proof_of_indexing(
// Put this onto an entity with the same digest attribute
// that was expected before when reading.
let new_poi_entity = entity! {
id: entity_key.entity_id.clone(),
id: entity_key.entity_id.to_string(),
digest: updated_proof_of_indexing,
};

Expand Down
3 changes: 2 additions & 1 deletion core/src/subgraph/state.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use graph::{
prelude::{Entity, EntityKey},
components::store::EntityKey,
prelude::Entity,
util::{backoff::ExponentialBackoff, lfu_cache::LfuCache},
};
use std::time::Instant;
Expand Down
11 changes: 1 addition & 10 deletions graph/src/components/ethereum/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,7 @@ use web3::types::{
U64,
};

use crate::{
blockchain::BlockPtr,
prelude::{BlockNumber, DeploymentHash, EntityKey, ToEntityKey},
};
use crate::{blockchain::BlockPtr, prelude::BlockNumber};

pub type LightEthereumBlock = Block<Transaction>;

Expand Down Expand Up @@ -173,9 +170,3 @@ impl<'a> From<&'a EthereumCall> for BlockPtr {
BlockPtr::from((call.block_hash, call.block_number))
}
}

// Lets a `BlockPtr` be turned into the `EntityKey` under which it is addressed
// in a given subgraph.
impl ToEntityKey for BlockPtr {
/// Builds the key for this block pointer within `subgraph`.
///
/// The entity type is the literal string `"Block"` and the entity id is
/// whatever `self.hash_hex()` returns.
fn to_entity_key(&self, subgraph: DeploymentHash) -> EntityKey {
// Uses the three-argument `EntityKey::data(subgraph, entity_type, entity_id)` form.
EntityKey::data(subgraph, "Block".into(), self.hash_hex())
}
}
34 changes: 14 additions & 20 deletions graph/src/components/store/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,14 +101,14 @@ impl EntityCache {
self.handler_updates.clear();
}

pub fn get(&mut self, key: &EntityKey) -> Result<Option<Entity>, s::QueryExecutionError> {
pub fn get(&mut self, eref: &EntityKey) -> Result<Option<Entity>, s::QueryExecutionError> {
// Get the current entity, apply any updates from `updates`, then
// from `handler_updates`.
let mut entity = self.current.get_entity(&*self.store, key)?;
if let Some(op) = self.updates.get(key).cloned() {
let mut entity = self.current.get_entity(&*self.store, eref)?;
if let Some(op) = self.updates.get(eref).cloned() {
entity = op.apply_to(entity)
}
if let Some(op) = self.handler_updates.get(key).cloned() {
if let Some(op) = self.handler_updates.get(eref).cloned() {
entity = op.apply_to(entity)
}
Ok(entity)
Expand All @@ -125,7 +125,7 @@ impl EntityCache {
/// returned.
pub fn set(&mut self, key: EntityKey, mut entity: Entity) -> Result<(), anyhow::Error> {
fn check_id(key: &EntityKey, prev_id: &str) -> Result<(), anyhow::Error> {
if prev_id != key.entity_id {
if prev_id != key.entity_id.as_str() {
return Err(anyhow!(
"Value of {} attribute 'id' conflicts with ID passed to `store.set()`: \
{} != {}",
Expand Down Expand Up @@ -243,27 +243,21 @@ impl EntityCache {
let missing =
missing.filter(|key| !self.store.input_schema().is_immutable(&key.entity_type));

let mut missing_by_subgraph: BTreeMap<_, BTreeMap<&EntityType, Vec<&str>>> =
BTreeMap::new();
let mut missing_by_type: BTreeMap<&EntityType, Vec<&str>> = BTreeMap::new();
for key in missing {
missing_by_subgraph
.entry(&key.subgraph_id)
.or_default()
missing_by_type
.entry(&key.entity_type)
.or_default()
.push(&key.entity_id);
}

for (subgraph_id, keys) in missing_by_subgraph {
for (entity_type, entities) in self.store.get_many(keys)? {
for entity in entities {
let key = EntityKey {
subgraph_id: subgraph_id.clone(),
entity_type: entity_type.clone(),
entity_id: entity.id().unwrap(),
};
self.current.insert(key, Some(entity));
}
for (entity_type, entities) in self.store.get_many(missing_by_type)? {
for entity in entities {
let key = EntityKey {
entity_type: entity_type.clone(),
entity_id: entity.id().unwrap().into(),
};
self.current.insert(key, Some(entity));
}
}

Expand Down
84 changes: 28 additions & 56 deletions graph/src/components/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,32 +22,32 @@ use std::time::Duration;
use crate::blockchain::{Block, Blockchain};
use crate::data::store::scalar::Bytes;
use crate::data::store::*;
use crate::data::value::Word;
use crate::prelude::*;
use crate::util::stable_hash_glue::impl_stable_hash;

/// The type name of an entity. This is the string that is used in the
/// subgraph's GraphQL schema as `type NAME @entity { .. }`
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct EntityType(String);
pub struct EntityType(Word);

impl EntityType {
/// Construct a new entity type. Ideally, this is only called when
/// `entity_type` either comes from the GraphQL schema, or from
/// the database from fields that are known to contain a valid entity type
pub fn new(entity_type: String) -> Self {
Self(entity_type)
Self(entity_type.into())
}

pub fn as_str(&self) -> &str {
&self.0
self.0.as_str()
}

pub fn into_string(self) -> String {
self.0
self.0.to_string()
}

pub fn is_poi(&self) -> bool {
&self.0 == "Poi$"
self.0.as_str() == "Poi$"
}
}

Expand Down Expand Up @@ -93,55 +93,26 @@ impl EntityFilterDerivative {
}
}

// Note: Do not modify fields without making a backward compatible change to
// the StableHash impl (below)
/// Key by which an individual entity in the store can be accessed.
/// Key by which an individual entity in the store can be accessed. Stores
/// only the entity type and id. The deployment must be known from context.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct EntityKey {
/// ID of the subgraph.
pub subgraph_id: DeploymentHash,

/// Name of the entity type.
pub entity_type: EntityType,

/// ID of the individual entity.
pub entity_id: String,
pub entity_id: Word,
}

impl_stable_hash!(EntityKey {
subgraph_id,
entity_type: EntityType::as_str,
entity_id
});

impl EntityKey {
pub fn data(subgraph_id: DeploymentHash, entity_type: String, entity_id: String) -> Self {
pub fn data(entity_type: String, entity_id: String) -> Self {
Self {
subgraph_id,
entity_type: EntityType::new(entity_type),
entity_id,
entity_id: entity_id.into(),
}
}
}

// Regression test: pins the legacy stable hash of a fixed `EntityKey` to a
// known hex digest, so a change to the key's `StableHash` impl or field layout
// makes this test fail.
#[test]
fn key_stable_hash() {
use stable_hash_legacy::crypto::SetHasher;
use stable_hash_legacy::utils::stable_hash;

// Hashes `key` with `SetHasher`, hex-encodes the result and compares it to `exp`.
// `#[track_caller]` makes a failing assertion report the caller's location
// rather than a line inside this helper.
#[track_caller]
fn hashes_to(key: &EntityKey, exp: &str) {
let hash = hex::encode(stable_hash::<SetHasher, _>(&key));
assert_eq!(exp, hash.as_str());
}

// Fixed inputs: deployment hash, entity type "Account", entity id "0xdeadbeef".
let id = DeploymentHash::new("QmP9MRvVzwHxr3sGvujihbvJzcTz2LYLMfi5DyihBg6VUd").unwrap();
let key = EntityKey::data(id.clone(), "Account".to_string(), "0xdeadbeef".to_string());
// Expected digest for the inputs above; it must stay byte-for-byte stable.
hashes_to(
&key,
"905b57035d6f98cff8281e7b055e10570a2bd31190507341c6716af2d3c1ad98",
);
}
#[derive(Clone, Debug, PartialEq)]
pub struct Child {
pub attr: Attribute,
Expand Down Expand Up @@ -565,9 +536,9 @@ pub enum EntityChange {
}

impl EntityChange {
pub fn for_data(key: EntityKey) -> Self {
pub fn for_data(subgraph_id: DeploymentHash, key: EntityKey) -> Self {
Self::Data {
subgraph_id: key.subgraph_id,
subgraph_id: subgraph_id,
entity_type: key.entity_type,
}
}
Expand Down Expand Up @@ -608,31 +579,32 @@ pub struct StoreEvent {
pub changes: HashSet<EntityChange>,
}

impl<'a> FromIterator<&'a EntityModification> for StoreEvent {
fn from_iter<I: IntoIterator<Item = &'a EntityModification>>(mods: I) -> Self {
impl StoreEvent {
pub fn new(changes: Vec<EntityChange>) -> StoreEvent {
static NEXT_TAG: AtomicUsize = AtomicUsize::new(0);

let tag = NEXT_TAG.fetch_add(1, Ordering::Relaxed);
let changes = changes.into_iter().collect();
StoreEvent { tag, changes }
}

pub fn from_mods<'a, I: IntoIterator<Item = &'a EntityModification>>(
subgraph_id: &DeploymentHash,
mods: I,
) -> Self {
let changes: Vec<_> = mods
.into_iter()
.map(|op| {
use self::EntityModification::*;
match op {
Insert { key, .. } | Overwrite { key, .. } | Remove { key } => {
EntityChange::for_data(key.clone())
EntityChange::for_data(subgraph_id.clone(), key.clone())
}
}
})
.collect();
StoreEvent::new(changes)
}
}

impl StoreEvent {
pub fn new(changes: Vec<EntityChange>) -> StoreEvent {
static NEXT_TAG: AtomicUsize = AtomicUsize::new(0);

let tag = NEXT_TAG.fetch_add(1, Ordering::Relaxed);
let changes = changes.into_iter().collect();
StoreEvent { tag, changes }
}

/// Extend `ev1` with `ev2`. If `ev1` is `None`, just set it to `ev2`
fn accumulate(logger: &Logger, ev1: &mut Option<StoreEvent>, ev2: StoreEvent) {
Expand Down Expand Up @@ -901,7 +873,7 @@ pub enum EntityModification {
}

impl EntityModification {
pub fn entity_key(&self) -> &EntityKey {
pub fn entity_ref(&self) -> &EntityKey {
use EntityModification::*;
match self {
Insert { key, .. } | Overwrite { key, .. } | Remove { key } => key,
Expand Down
1 change: 1 addition & 0 deletions graph/src/components/subgraph/instance.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::blockchain::Blockchain;
use crate::components::store::EntityKey;
use crate::prelude::*;
use crate::util::lfu_cache::LfuCache;
use crate::{components::store::WritableStore, data::subgraph::schema::SubgraphError};
Expand Down
2 changes: 1 addition & 1 deletion graph/src/data/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -626,7 +626,7 @@ impl Schema {
.get_base_type();

match base_type {
"ID" | "String" => Ok(store::Value::String(key.entity_id.clone())),
"ID" | "String" => Ok(store::Value::String(key.entity_id.to_string())),
"Bytes" => Ok(store::Value::Bytes(scalar::Bytes::from_str(
&key.entity_id,
)?)),
Expand Down
14 changes: 1 addition & 13 deletions graph/src/data/store/ethereum.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use super::scalar;
use crate::prelude::*;
use web3::types::{Address, Bytes, H160, H2048, H256, H64, U128, U256, U64};
use web3::types::{Address, Bytes, H2048, H256, H64, U128, U256, U64};

impl From<U128> for Value {
fn from(n: U128) -> Value {
Expand Down Expand Up @@ -49,15 +49,3 @@ impl From<U256> for Value {
Value::BigInt(BigInt::from_unsigned_u256(&n))
}
}

// Allows an `H160` value to be used directly as an entity id.
impl ToEntityId for H160 {
/// Returns the value rendered with the `{:x}` (lower-case hex) format spec.
///
/// The alternate flag (`#`) is not used, so no alternate form is requested
/// from the type's `LowerHex` implementation.
fn to_entity_id(&self) -> String {
format!("{:x}", self)
}
}

// Allows an `H256` value to be used directly as an entity id.
impl ToEntityId for H256 {
/// Returns the value rendered with the `{:x}` (lower-case hex) format spec.
///
/// The alternate flag (`#`) is not used, so no alternate form is requested
/// from the type's `LowerHex` implementation.
fn to_entity_id(&self) -> String {
format!("{:x}", self)
}
}
Loading