Introduce restriction operator for streams #172

Merged on Nov 24, 2021 (26 commits). Changes shown from 25 commits.
14 changes: 14 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,17 @@
## Version 0.17.0 (2021-11-23)

[PR 172](https://github.com/fluencelabs/aquavm/pull/172):
A new instruction intended to restrict the scope of variables was introduced to AquaVM.

[PR 168](https://github.com/fluencelabs/aquavm/pull/168):
The AIR parser and AST were heavily refactored to better fit the scalar/stream restriction scheme used in AIR instructions.

[PR 164](https://github.com/fluencelabs/aquavm/pull/164):
SecurityTetraplet was decoupled from marine-rs-sdk so that it has a single definition in AquaVM, which is then re-exported by marine-rs-sdk.

[PR 162](https://github.com/fluencelabs/aquavm/pull/162):
The scalar scoping scheme was improved in order to support more than two scope levels.

## Version 0.16.0 (2021-10-18)

[PR 154](https://github.com/fluencelabs/aquavm/pull/154)
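To make the restriction operator introduced by PR 172 concrete, here is a minimal hypothetical AIR snippet (written as a Rust raw-string constant, the way AquaVM test code typically embeds scripts). The peer, service, and stream names are invented for illustration and are not taken from this PR.

// Illustrative only: `$results` exists solely inside the body of `new`; once the
// scope ends, any outer `$results` definition becomes visible again.
const RESTRICTED_STREAM_EXAMPLE: &str = r#"
    (new $results
        (seq
            (call "peer_1" ("service" "get_value") [] $results)
            (call "peer_2" ("service" "consume") [$results])))
"#;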
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion air-interpreter/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "air-interpreter"
version = "0.16.0"
version = "0.17.0"
description = "Crate-wrapper for air"
authors = ["Fluence Labs"]
edition = "2018"
4 changes: 2 additions & 2 deletions air/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "air"
version = "0.16.0"
version = "0.17.0"
description = "Interpreter of AIR scripts intended to coordinate request flow in the Fluence network"
authors = ["Fluence Labs"]
edition = "2018"
@@ -30,6 +30,7 @@ serde = { version = "1.0.118", features = [ "derive", "rc" ] }
serde_json = "1.0.61"

boolinator = "2.4.0"
maplit = "1.0.2"
log = "0.4.11"
thiserror = "1.0.23"
strum = "0.21"
@@ -46,7 +47,6 @@ criterion = "0.3.3"
csv = "1.1.5"
once_cell = "1.4.1"
env_logger = "0.7.1"
maplit = "1.0.2"
pretty_assertions = "0.6.1"
serde_json = "1.0.61"

6 changes: 4 additions & 2 deletions air/src/execution_step/air/ap.rs
@@ -17,7 +17,6 @@
mod apply_to_arguments;
mod utils;

use super::call::call_result_setter::set_stream_result;
use super::ExecutionCtx;
use super::ExecutionResult;
use super::TraceHandler;
@@ -75,7 +74,10 @@ fn save_result<'ctx>(
Scalar(scalar) => exec_ctx.scalars.set_value(scalar.name, result).map(|_| ()),
Stream(stream) => {
let generation = ap_result_to_generation(merger_ap_result);
set_stream_result(result, generation, stream.name.to_string(), exec_ctx).map(|_| ())
exec_ctx
.streams
.add_stream_value(result, generation, stream.name, stream.position)
.map(|_| ())
}
}
}
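The `add_stream_value` call above belongs to a dedicated streams container whose definition is outside this diff. As a rough sketch of the idea, assuming the container keeps a per-name stack of descriptors so that a stream restricted by `new` shadows an outer one with the same name, it might look like the following. Every type, field, and method body here is an assumption made for illustration (it reuses the crate's existing Stream, ValueAggregate, Generation, and ExecutionResult types), not the actual AquaVM implementation.

use std::cell::RefCell;
use std::collections::HashMap;

// Hypothetical sketch of a per-name stream registry with scope shadowing.
struct StreamDescriptor {
    declared_at: usize,      // position of the declaring `new`; 0 for global streams
    stream: RefCell<Stream>, // the crate's existing stream value container
}

#[derive(Default)]
pub(crate) struct Streams {
    streams: HashMap<String, Vec<StreamDescriptor>>,
}

impl Streams {
    // Returns the innermost stream currently visible under `name`, if any.
    pub(crate) fn get(&self, name: &str, _position: usize) -> Option<&RefCell<Stream>> {
        self.streams
            .get(name)
            .and_then(|descriptors| descriptors.last())
            .map(|descriptor| &descriptor.stream)
    }

    // Adds a value to the innermost visible stream, creating a global one on the
    // first write, and returns the generation the value was placed into.
    pub(crate) fn add_stream_value(
        &mut self,
        value: ValueAggregate,
        generation: Generation,
        name: &str,
        position: usize,
    ) -> ExecutionResult<u32> {
        let descriptors = self.streams.entry(name.to_string()).or_default();
        match descriptors.last_mut() {
            Some(descriptor) => descriptor.stream.borrow_mut().add_value(value, generation),
            None => {
                descriptors.push(StreamDescriptor {
                    declared_at: position,
                    stream: RefCell::new(Stream::from_value(value)),
                });
                Ok(0)
            }
        }
    }
}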
5 changes: 3 additions & 2 deletions air/src/execution_step/air/ap/apply_to_arguments.rs
@@ -69,7 +69,7 @@ fn apply_scalar(
) -> ExecutionResult<ValueAggregate> {
// TODO: refactor this code after boxed value
match &scalar.lambda {
Some(lambda) => apply_scalar_wl_impl(scalar.name, lambda, exec_ctx, trace_ctx),
Some(lambda) => apply_scalar_wl_impl(scalar.name, scalar.position, lambda, exec_ctx, trace_ctx),
None => apply_scalar_impl(scalar.name, exec_ctx, trace_ctx, should_touch_trace),
}
}
@@ -104,11 +104,12 @@ fn apply_scalar_impl(

fn apply_scalar_wl_impl(
scalar_name: &str,
position: usize,
lambda: &LambdaAST<'_>,
exec_ctx: &ExecutionCtx<'_>,
trace_ctx: &TraceHandler,
) -> ExecutionResult<ValueAggregate> {
let variable = Variable::scalar(scalar_name);
let variable = Variable::scalar(scalar_name, position);
let (jvalue, mut tetraplets) = apply_lambda(variable, lambda, exec_ctx)?;

let tetraplet = tetraplets
2 changes: 1 addition & 1 deletion air/src/execution_step/air/ap/utils.rs
@@ -80,7 +80,7 @@ fn variable_to_generations(variable: &ast::Variable<'_>, exec_ctx: &ExecutionCtx
Stream(stream) => {
// unwrap here is safe because this function will be called only
// when this stream's been created
let stream = exec_ctx.streams.get(stream.name).unwrap();
let stream = exec_ctx.streams.get(stream.name, stream.position).unwrap();
let generation = match stream.borrow().generations_count() {
0 => 0,
n => n - 1,
45 changes: 15 additions & 30 deletions air/src/execution_step/air/call/call_result_setter.rs
@@ -17,7 +17,6 @@
use super::*;
use crate::execution_step::execution_context::*;
use crate::execution_step::Generation;
use crate::execution_step::Stream;
use crate::execution_step::ValueAggregate;

use air_interpreter_data::CallResult;
@@ -26,9 +25,6 @@ use air_parser::ast::CallOutputValue;
use air_parser::ast::Variable;
use air_trace_handler::TraceHandler;

use std::cell::RefCell;
use std::collections::hash_map::Entry::{Occupied, Vacant};

/// Writes result of a local `Call` instruction to `ExecutionCtx` at `output`.
/// Returns call result.
pub(crate) fn set_local_result<'i>(
@@ -44,12 +40,21 @@ pub(crate) fn set_local_result<'i>(
}
CallOutputValue::Variable(Variable::Stream(stream)) => {
// TODO: refactor this generation handling
let generation = match exec_ctx.streams.get(stream.name) {
Some(stream) => Generation::Nth(stream.borrow().generations_count() as u32 - 1),
let generation = match exec_ctx.streams.get(stream.name, stream.position) {
Some(stream) => {
let generation = match stream.borrow().generations_count() {
0 => 0,
n => n - 1,
};
Generation::Nth(generation as u32)
}
None => Generation::Last,
};

let generation = set_stream_result(executed_result, generation, stream.name.to_string(), exec_ctx)?;
let generation =
exec_ctx
.streams
.add_stream_value(executed_result, generation, stream.name, stream.position)?;
Ok(CallResult::executed_stream(result_value, generation))
}
CallOutputValue::None => Ok(CallResult::executed_scalar(result_value)),
@@ -71,7 +76,9 @@ pub(crate) fn set_result_from_value<'i>(
(CallOutputValue::Variable(Variable::Stream(stream)), Value::Stream { value, generation }) => {
let result = ValueAggregate::new(value, tetraplet, trace_pos);
let generation = Generation::Nth(generation);
let _ = set_stream_result(result, generation, stream.name.to_string(), exec_ctx)?;
let _ = exec_ctx
.streams
.add_stream_value(result, generation, stream.name, stream.position)?;
}
// there is no need to check here that the output and the value match, because
// it has already been checked in the trace handler
Expand All @@ -81,28 +88,6 @@ pub(crate) fn set_result_from_value<'i>(
Ok(())
}

// TODO: decouple this function to a separate module
pub(crate) fn set_stream_result(
executed_result: ValueAggregate,
generation: Generation,
stream_name: String,
exec_ctx: &mut ExecutionCtx<'_>,
) -> ExecutionResult<u32> {
let generation = match exec_ctx.streams.entry(stream_name) {
Occupied(mut entry) => {
// if result is an array, insert result to the end of the array
entry.get_mut().borrow_mut().add_value(executed_result, generation)?
}
Vacant(entry) => {
let stream = Stream::from_value(executed_result);
entry.insert(RefCell::new(stream));
0
}
};

Ok(generation)
}
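The `0 => 0, n => n - 1` match now used in set_local_result (and in ap/utils.rs above) simply picks the last existing generation of a stream. A small helper along these lines, a sketch rather than part of this PR, captures that intent; it assumes the crate's Stream and Generation types are in scope.

use std::cell::RefCell;

// Sketch: pick the generation a new call result should join: the last existing
// generation when the stream already has values, or `Generation::Last` when the
// stream has not been created yet.
fn target_generation(maybe_stream: Option<&RefCell<Stream>>) -> Generation {
    match maybe_stream {
        Some(stream) => {
            let last = stream.borrow().generations_count().saturating_sub(1);
            Generation::Nth(last as u32)
        }
        None => Generation::Last,
    }
}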

/// Writes an executed state of a particle being sent to remote node.
pub(crate) fn set_remote_call_result<'i>(
peer_pk: String,
11 changes: 6 additions & 5 deletions air/src/execution_step/air/call/resolved_call.rs
@@ -169,20 +169,21 @@ impl<'i> ResolvedCall<'i> {
fn check_output_name(output: &ast::CallOutputValue<'_>, exec_ctx: &ExecutionCtx<'_>) -> ExecutionResult<()> {
use crate::execution_step::boxed_value::ScalarRef;

let scalar_name = match output {
ast::CallOutputValue::Variable(ast::Variable::Scalar(scalar)) => scalar.name,
let scalar = match output {
ast::CallOutputValue::Variable(ast::Variable::Scalar(scalar)) => scalar,
_ => return Ok(()),
};

match exec_ctx.scalars.get(scalar_name) {
match exec_ctx.scalars.get(scalar.name) {
Ok(ScalarRef::Value(_)) => {
if exec_ctx.scalars.shadowing_allowed() {
Ok(())
} else {
crate::exec_err!(ExecutionError::MultipleVariablesFound(scalar_name.to_string()))
crate::exec_err!(ExecutionError::MultipleVariablesFound(scalar.name.to_string()))
}
}
Ok(ScalarRef::IterableValue(_)) => crate::exec_err!(ExecutionError::IterableShadowing(scalar_name.to_string())),
Ok(ScalarRef::IterableValue(_)) => crate::exec_err!(ExecutionError::IterableShadowing(scalar.name.to_string())),
Err(_) => Ok(()),
}
}
4 changes: 2 additions & 2 deletions air/src/execution_step/air/fold/utils.rs
@@ -51,10 +51,10 @@ pub(crate) fn construct_scalar_iterable_value<'ctx>(

/// Constructs iterable value for given stream iterable.
pub(crate) fn construct_stream_iterable_value<'ctx>(
stream_name: &'ctx str,
stream: &ast::Stream<'_>,
exec_ctx: &ExecutionCtx<'ctx>,
) -> ExecutionResult<FoldIterableStream> {
match exec_ctx.streams.get(stream_name) {
match exec_ctx.streams.get(stream.name, stream.position) {
Some(stream) => {
let stream = stream.borrow();
if stream.is_empty() {
2 changes: 1 addition & 1 deletion air/src/execution_step/air/fold_scalar.rs
@@ -29,7 +29,7 @@ impl<'i> ExecutableInstruction<'i> for FoldScalar<'i> {
fn execute(&self, exec_ctx: &mut ExecutionCtx<'i>, trace_ctx: &mut TraceHandler) -> ExecutionResult<()> {
log_instruction!(fold, exec_ctx, trace_ctx);

exec_ctx.scalars.meet_fold_begin();
exec_ctx.scalars.meet_fold_start();

let fold_result = match construct_scalar_iterable_value(&self.iterable, exec_ctx)? {
FoldIterableScalar::Empty => Ok(()),
4 changes: 2 additions & 2 deletions air/src/execution_step/air/fold_stream.rs
@@ -29,14 +29,14 @@ impl<'i> ExecutableInstruction<'i> for FoldStream<'i> {
fn execute(&self, exec_ctx: &mut ExecutionCtx<'i>, trace_ctx: &mut TraceHandler) -> ExecutionResult<()> {
log_instruction!(fold, exec_ctx, trace_ctx);

let iterables = match construct_stream_iterable_value(self.iterable.name, exec_ctx)? {
let iterables = match construct_stream_iterable_value(&self.iterable, exec_ctx)? {
FoldIterableStream::Empty => return Ok(()),
FoldIterableStream::Stream(iterables) => iterables,
};

let fold_id = exec_ctx.tracker.fold.seen_stream_count;
trace_to_exec_err!(trace_ctx.meet_fold_start(fold_id))?;
exec_ctx.scalars.meet_fold_begin();
exec_ctx.scalars.meet_fold_start();

for iterable in iterables {
let value = match iterable.peek() {
10 changes: 3 additions & 7 deletions air/src/execution_step/air/mod.rs
@@ -22,6 +22,7 @@ mod fold_scalar;
mod fold_stream;
mod match_;
mod mismatch;
mod new;
mod next;
mod null;
mod par;
@@ -126,6 +127,7 @@ impl<'i> ExecutableInstruction<'i> for Instruction<'i> {
Instruction::Ap(ap) => execute!(self, ap, exec_ctx, trace_ctx),
Instruction::FoldScalar(fold) => execute!(self, fold, exec_ctx, trace_ctx),
Instruction::FoldStream(fold) => execute_fold!(self, fold, exec_ctx, trace_ctx),
Instruction::New(new) => execute!(self, new, exec_ctx, trace_ctx),
Instruction::Next(next) => execute!(self, next, exec_ctx, trace_ctx),
Instruction::Null(null) => execute!(self, null, exec_ctx, trace_ctx),
Instruction::Par(par) => execute!(self, par, exec_ctx, trace_ctx),
@@ -147,16 +149,10 @@ macro_rules! log_instruction {
log::debug!(target: air_log_targets::INSTRUCTION, "> {}", stringify!($instr_name));

let mut variables = String::from(" scalars:");

variables.push_str(&format!("\n {}", $exec_ctx.scalars));

variables.push_str(" streams:");
if $exec_ctx.streams.is_empty() {
variables.push_str(" empty");
}
for (key, value) in $exec_ctx.streams.iter() {
variables.push_str(&format!("\n {} => {}", key, value.borrow()));
}
variables.push_str(&format!("\n {}", $exec_ctx.streams));

log::trace!(target: air_log_targets::DATA_CACHE, "{}", variables);
log::trace!(
66 changes: 66 additions & 0 deletions air/src/execution_step/air/new.rs
@@ -0,0 +1,66 @@
/*
* Copyright 2021 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

use super::ExecutionCtx;
use super::ExecutionResult;
use super::TraceHandler;
use crate::log_instruction;

use air_parser::ast::New;
use air_parser::ast::Variable;

impl<'i> super::ExecutableInstruction<'i> for New<'i> {
fn execute(&self, exec_ctx: &mut ExecutionCtx<'i>, trace_ctx: &mut TraceHandler) -> ExecutionResult<()> {
log_instruction!(new, exec_ctx, trace_ctx);

prolog(self, exec_ctx);
// any error must be evaluated lazily, after the epilog block has been executed, since
// it's necessary to return a restricted variable to its previous state in case of
// an error. It's highly important to distinguish between global and restricted streams
// at the end of execution in order to produce correct data.
let instruction_result = self.instruction.execute(exec_ctx, trace_ctx);
epilog(self, exec_ctx);

instruction_result
}
}

fn prolog<'i>(new: &New<'i>, exec_ctx: &mut ExecutionCtx<'i>) {
let position = new.span.left;
match &new.variable {
Variable::Stream(stream) => {
let iteration = exec_ctx.tracker.new_tracker.get_iteration(position);
exec_ctx
.streams
.meet_scope_start(stream.name, new.span.left, new.span.right, iteration);
}
// noop
Variable::Scalar(_) => {}
}

exec_ctx.tracker.meet_new(position);
}

fn epilog<'i>(new: &New<'i>, exec_ctx: &mut ExecutionCtx<'i>) {
let position = new.span.left;
match &new.variable {
Variable::Stream(stream) => exec_ctx
.streams
.meet_scope_end(stream.name.to_string(), position as u32),
// noop
Variable::Scalar(_) => {}
}
}
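The meet_scope_start/meet_scope_end pair used by prolog and epilog above is defined outside this diff. Continuing the hypothetical Streams sketch given after the ap.rs changes, its core could amount to a push/pop over the per-name descriptor stack; the real AquaVM version also receives span and iteration information and has to account for restricted streams when producing the resulting data, so treat this purely as an illustration of the shadow-and-restore behaviour.

// Hypothetical continuation of the Streams sketch above; signatures are simplified.
impl Streams {
    // prolog: shadow any outer stream of the same name with a fresh, empty one.
    pub(crate) fn meet_scope_start(&mut self, name: &str, declared_at: usize) {
        let descriptor = StreamDescriptor {
            declared_at,
            // assumes the crate's Stream type has an empty/Default constructor
            stream: RefCell::new(Stream::default()),
        };
        self.streams.entry(name.to_string()).or_default().push(descriptor);
    }

    // epilog: runs even when the nested instruction failed, so the restricted stream
    // is always dropped and the outer definition (if any) becomes visible again.
    pub(crate) fn meet_scope_end(&mut self, name: String, _position: u32) {
        if let Some(descriptors) = self.streams.get_mut(&name) {
            descriptors.pop();
            if descriptors.is_empty() {
                self.streams.remove(&name);
            }
        }
    }
}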