handle task expiration sweep
v9n committed Oct 10, 2023
1 parent 8c9bbcf commit 71b63f9
Showing 2 changed files with 210 additions and 11 deletions.
114 changes: 103 additions & 11 deletions pallets/automation-price/src/lib.rs
@@ -288,6 +288,12 @@ pub mod pallet {
BTreeMap<AssetPrice, TaskIdList<T>>,
>;

// SortedTasksByExpiration maps an expiration timestamp to the tasks expiring at that time,
// so the expiration sweep can walk tasks in expiry order
#[pallet::storage]
#[pallet::getter(fn get_sorted_tasks_by_expiration)]
pub type SortedTasksByExpiration<T> =
StorageValue<_, BTreeMap<u128, TaskIdList<T>>, ValueQuery>;

// All active tasks, but organized by account
// In this storage, we are only interested in returning the tasks that belong to an account; we also
// want fast lookups when tasks are inserted into or removed from the storage
@@ -361,6 +367,8 @@ pub mod pallet {
TaskRemoveFailure,
/// Task not found when canceling
TaskDoesNotExist,
/// Failed to update the task expiration storage
TaskExpiredStorageFailedToUpdate,
/// Insufficient Balance
InsufficientBalance,
/// Restrictions on Liquidity in Account
@@ -460,6 +468,10 @@ pub mod pallet {
);
Self::trigger_tasks(max_weight)
}

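// Use any weight left over at the end of the block to sweep tasks that have
// passed their expiration time out of storage.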
fn on_idle(_: T::BlockNumber, remaining_weight: Weight) -> Weight {
Self::sweep_expired_task(remaining_weight)
}
}

#[pallet::call]
@@ -774,10 +786,8 @@ pub mod pallet {
let who = ensure_signed(origin)?;

if let Some(task) = Self::get_task(&who, &task_id) {
Tasks::<T>::remove(&who, &task.task_id);
Self::remove_task(&task);
let key = (&task.chain, &task.exchange, &task.asset_pair, &task.trigger_function);
SortedTasksIndex::<T>::remove(&key);

Self::deposit_event(Event::TaskCancelled {
who: task.owner_id.clone(),
task_id: task.task_id.clone(),
@@ -1135,13 +1145,7 @@ pub mod pallet {
};

Self::remove_task(&task);
Tasks::<T>::remove(task.owner_id.clone(), task_id.clone());
TaskStats::<T>::insert(StatType::TotalTasksOverall, total_task - 1);
AccountStats::<T>::insert(
task.owner_id.clone(),
StatType::TotalTasksPerAccount,
total_task_per_account - 1,
);


if let Some(err) = task_dispatch_error {
Self::deposit_event(Event::<T>::TaskExecutionFailed {
@@ -1185,8 +1189,92 @@ pub mod pallet {
}
}

// TODO: check atomic
fn remove_task(task: &Task<T>) {
//AccountTasks::<T>::remove(task.owner_id.clone(), task.task_id.clone());
Tasks::<T>::remove(task.owner_id.clone(), task.task_id.clone());

// TODO: pluck the task from SortedTasksIndex
// let key = (&task.chain, &task.exchange, &task.asset_pair, &task.trigger_function);
// SortedTasksIndex::<T>::remove(&key);

// Update the overall and per-account task counters
let total_task = Self::get_task_stat(StatType::TotalTasksOverall).unwrap_or(0);
let total_task_per_account =
	Self::get_account_stat(&task.owner_id, StatType::TotalTasksPerAccount).unwrap_or(0);

if total_task >= 1 {
TaskStats::<T>::insert(StatType::TotalTasksOverall, total_task - 1);
}

if total_task_per_account >= 1 {
AccountStats::<T>::insert(
task.owner_id.clone(),
StatType::TotalTasksPerAccount,
total_task_per_account - 1,
);
}
}

// Sweep as many expired tasks as we can within the remaining weight and return the unused weight
pub fn sweep_expired_task(remaining_weight: Weight) -> Weight {
	if remaining_weight.ref_time() <= T::DbWeight::get().reads(1u64).ref_time() {
		return remaining_weight;
	}

	let unused_weight = remaining_weight.saturating_sub(T::DbWeight::get().reads(1u64));

	let current_block_time = Self::get_current_block_time();
	if current_block_time.is_err() {
		// The block time is not available yet, so there is nothing we can expire
		return unused_weight;
	}

	let now = current_block_time.unwrap();

	// Visit every expiration shard whose timestamp is at or before now and remove its tasks
	use core::ops::Bound::Included;
	let tasks_by_expiration = Self::get_sorted_tasks_by_expiration();
	for (_expired_time, task_ids) in tasks_by_expiration.range((Included(&0), Included(&now))) {
		for (owner_id, task_id) in task_ids.iter() {
			// Now remove the task from chain storage
			if let Some(task) = Self::get_task(owner_id, task_id) {
				Self::remove_task(&task);
			}
		}
	}

	unused_weight
}

// Tasks are written into sorted storage, represented by a BTreeMap, so we can expire them by time
pub fn put_task_to_expired_queue(task: &Task<T>) -> Result<bool, Error<T>> {
if !SortedTasksByExpiration::<T>::exists() {
let mut tasks_by_expiration = BTreeMap::<u128, TaskIdList<T>>::new();
tasks_by_expiration.insert(
task.expired_at.clone(),
vec![(task.owner_id.clone(), task.task_id.clone())],
);
SortedTasksByExpiration::<T>::put(tasks_by_expiration);
return Ok(true)
}

// First read the map back from the underlying storage, then write the task into the
// shard matching its expiration time, and finally store the updated map back.
let mut tasks_by_expiration = Self::get_sorted_tasks_by_expiration();
if let Some(task_shard) = tasks_by_expiration.get_mut(&task.expired_at) {
task_shard.push((task.owner_id.clone(), task.task_id.clone()));
} else {
tasks_by_expiration.insert(
task.expired_at.clone(),
vec![(task.owner_id.clone(), task.task_id.clone())],
);
}
SortedTasksByExpiration::<T>::put(tasks_by_expiration);

return Ok(true)
}

/// Using a transaction will protect against a partial success where N of M execution times might be full,
@@ -1248,6 +1336,10 @@ pub mod pallet {
SortedTasksIndex::<T>::insert(key, sorted_task_index);
}

Self::put_task_to_expired_queue(&task)
	.map_err(|_| Error::<T>::TaskExpiredStorageFailedToUpdate)?;

Self::deposit_event(Event::TaskScheduled {
who: task.owner_id.clone(),
task_id: task.task_id,
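For orientation, the following is a minimal, self-contained sketch of the pattern this commit introduces in lib.rs: an expiration-sorted BTreeMap whose keys are expiry timestamps and whose values are lists of (owner, task id) pairs; tasks are appended to their expiry shard when scheduled and drained with a range scan up to the current time when block weight is left over. It uses plain std types rather than pallet storage, the names (ExpirationQueue, put, sweep) are illustrative only, and unlike the pallet code it also drops the swept shards from the map.

use std::collections::BTreeMap;
use std::ops::Bound::Included;

type Owner = String;
type TaskId = Vec<u8>;

// Stand-in for SortedTasksByExpiration: expiration time -> tasks expiring at that time.
struct ExpirationQueue {
    shards: BTreeMap<u128, Vec<(Owner, TaskId)>>,
}

impl ExpirationQueue {
    fn new() -> Self {
        Self { shards: BTreeMap::new() }
    }

    // Analogue of put_task_to_expired_queue: append the task to the shard for its expiry time.
    fn put(&mut self, expired_at: u128, owner: Owner, task_id: TaskId) {
        self.shards.entry(expired_at).or_default().push((owner, task_id));
    }

    // Analogue of sweep_expired_task: visit every shard with expiry <= now,
    // collect its tasks for removal, and prune the emptied shards.
    fn sweep(&mut self, now: u128) -> Vec<(Owner, TaskId)> {
        let expired: Vec<u128> = self
            .shards
            .range((Included(&0), Included(&now)))
            .map(|(expiry, _)| *expiry)
            .collect();
        let mut removed = Vec::new();
        for expiry in expired {
            if let Some(tasks) = self.shards.remove(&expiry) {
                removed.extend(tasks);
            }
        }
        removed
    }
}

fn main() {
    let mut queue = ExpirationQueue::new();
    queue.put(1005, "alice".into(), b"task-1".to_vec());
    queue.put(5678, "bob".into(), b"task-2".to_vec());

    // At time 2000 only the task expiring at 1005 is swept ...
    assert_eq!(queue.sweep(2000).len(), 1);
    // ... and the later one is swept once its expiry has passed.
    assert_eq!(queue.sweep(10_000).len(), 1);
}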
107 changes: 107 additions & 0 deletions pallets/automation-price/src/tests.rs
@@ -406,6 +406,113 @@ fn test_schedule_xcmp_task_ok() {
})
}

#[test]
fn test_schedule_put_task_to_expiration_queue() {
new_test_ext(START_BLOCK_TIME).execute_with(|| {
let para_id: u32 = 1000;
let creator = AccountId32::new(ALICE);
let call: Vec<u8> = vec![2, 4, 5];
let destination = MultiLocation::new(1, X1(Parachain(para_id)));

setup_asset(&creator, chain1.to_vec());

assert_ok!(AutomationPrice::schedule_xcmp_task(
RuntimeOrigin::signed(creator.clone()),
chain1.to_vec(),
exchange1.to_vec(),
asset1.to_vec(),
asset2.to_vec(),
1005u128,
"gt".as_bytes().to_vec(),
vec!(100),
Box::new(destination.into()),
Box::new(NATIVE_LOCATION.into()),
Box::new(AssetPayment {
asset_location: MultiLocation::new(0, Here).into(),
amount: 10000000000000
}),
call.clone(),
Weight::from_ref_time(100_000),
Weight::from_ref_time(200_000)
));
let task_ids = get_task_ids_from_events();
let task_id = task_ids.last().expect("task failed to schedule");

let task_expiration_map = AutomationPrice::get_sorted_tasks_by_expiration();
assert_eq!(
task_expiration_map.get(&1005u128).expect("missing task expiration shard"),
&vec![(creator, task_id.clone())]
);
})
}

#[test]
fn test_schedule_put_task_to_expiration_queue_multi() {
new_test_ext(START_BLOCK_TIME).execute_with(|| {
let para_id: u32 = 1000;
let creator1 = AccountId32::new(ALICE);
let creator2 = AccountId32::new(BOB);
let call: Vec<u8> = vec![2, 4, 5];
let destination = MultiLocation::new(1, X1(Parachain(para_id)));

setup_asset(&creator1, chain1.to_vec());

assert_ok!(AutomationPrice::schedule_xcmp_task(
RuntimeOrigin::signed(creator1.clone()),
chain1.to_vec(),
exchange1.to_vec(),
asset1.to_vec(),
asset2.to_vec(),
1234u128,
"gt".as_bytes().to_vec(),
vec!(100),
Box::new(destination.into()),
Box::new(NATIVE_LOCATION.into()),
Box::new(AssetPayment {
asset_location: MultiLocation::new(0, Here).into(),
amount: 10000000000000
}),
call.clone(),
Weight::from_ref_time(100_000),
Weight::from_ref_time(200_000)
));
let task_ids1 = get_task_ids_from_events();
let task_id1 = task_ids1.last().expect("task failed to schedule");

assert_ok!(AutomationPrice::schedule_xcmp_task(
RuntimeOrigin::signed(creator2.clone()),
chain1.to_vec(),
exchange1.to_vec(),
asset1.to_vec(),
asset2.to_vec(),
5678u128,
"lt".as_bytes().to_vec(),
vec!(100),
Box::new(destination.into()),
Box::new(NATIVE_LOCATION.into()),
Box::new(AssetPayment {
asset_location: MultiLocation::new(0, Here).into(),
amount: 10000000000000
}),
call.clone(),
Weight::from_ref_time(100_000),
Weight::from_ref_time(200_000)
));
let task_ids2 = get_task_ids_from_events();
let task_id2 = task_ids2.last().expect("task failed to schedule");

let task_expiration_map = AutomationPrice::get_sorted_tasks_by_expiration();
assert_eq!(
task_expiration_map.get(&1234u128).expect("missing task expiration shard"),
&vec![(creator1, task_id1.clone())]
);
assert_eq!(
task_expiration_map.get(&5678u128).expect("missing task expiration shard"),
&vec![(creator2, task_id2.clone())]
);
})
}
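
#[test]
fn test_sweep_expired_task_removes_task_sketch() {
	// A sketch, not part of this commit: exercise the sweep path end to end by scheduling
	// a task with the same expired_at (1005) used in the tests above, then handing
	// sweep_expired_task a weight budget well above a single DB read and checking the task
	// is gone. It assumes 1005 is below the block time derived from START_BLOCK_TIME and
	// that schedule_xcmp_task accepts an already-elapsed expired_at; if the extrinsic
	// validates expiry, the value needs adjusting.
	new_test_ext(START_BLOCK_TIME).execute_with(|| {
		let para_id: u32 = 1000;
		let creator = AccountId32::new(ALICE);
		let call: Vec<u8> = vec![2, 4, 5];
		let destination = MultiLocation::new(1, X1(Parachain(para_id)));

		setup_asset(&creator, chain1.to_vec());

		assert_ok!(AutomationPrice::schedule_xcmp_task(
			RuntimeOrigin::signed(creator.clone()),
			chain1.to_vec(),
			exchange1.to_vec(),
			asset1.to_vec(),
			asset2.to_vec(),
			1005u128,
			"gt".as_bytes().to_vec(),
			vec!(100),
			Box::new(destination.into()),
			Box::new(NATIVE_LOCATION.into()),
			Box::new(AssetPayment {
				asset_location: MultiLocation::new(0, Here).into(),
				amount: 10000000000000
			}),
			call.clone(),
			Weight::from_ref_time(100_000),
			Weight::from_ref_time(200_000)
		));
		let task_ids = get_task_ids_from_events();
		let task_id = task_ids.last().expect("task failed to schedule");

		// The task exists right after scheduling ...
		assert!(AutomationPrice::get_task(&creator, task_id).is_some());

		// ... and is removed once the expiration sweep has run.
		AutomationPrice::sweep_expired_task(Weight::from_ref_time(1_000_000_000_000));
		assert!(AutomationPrice::get_task(&creator, task_id).is_none());
	})
}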

#[test]
fn test_schedule_return_error_when_reaching_max_tasks_overall_limit() {
new_test_ext(START_BLOCK_TIME).execute_with(|| {
