Skip to content

Commit

Permalink
Use std::time::Duration in file_info_poller
Browse files Browse the repository at this point in the history
std::Durations were being turned into chrono::Durations only to be
turned right back into std::Durations.

It also doesn't make much sense for the file poller to support negative
time deltas, so we can remove that possibility.
  • Loading branch information
michaeldjeffrey committed May 14, 2024
1 parent 5c7d693 commit 3d416e9
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 16 deletions.
2 changes: 1 addition & 1 deletion boost_manager/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ impl Server {
file_upload::FileUpload::from_settings_tm(&settings.output).await?;
let store_base_path = path::Path::new(&settings.cache);

let reward_check_interval = chrono::Duration::from_std(settings.reward_check_interval)?;
let reward_check_interval = settings.reward_check_interval;

// setup the received for the rewards manifest files
let file_store = FileStore::from_settings(&settings.verifier).await?;
Expand Down
13 changes: 5 additions & 8 deletions file_store/src/file_info_poller.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use crate::{file_store, traits::MsgDecode, Error, FileInfo, FileStore, Result};
use aws_sdk_s3::types::ByteStream;
use chrono::{DateTime, Duration, Utc};
use chrono::{DateTime, Utc};
use derive_builder::Builder;
use futures::{future::LocalBoxFuture, stream::BoxStream, StreamExt};
use futures_util::TryFutureExt;
use retainer::Cache;
use std::{collections::VecDeque, marker::PhantomData, sync::Arc};
use std::{collections::VecDeque, marker::PhantomData, sync::Arc, time::Duration};
use task_manager::ManagedTask;
use tokio::sync::mpsc::{Receiver, Sender};

Expand Down Expand Up @@ -85,14 +85,14 @@ pub enum LookbackBehavior {
#[derive(Debug, Clone, Builder)]
#[builder(pattern = "owned")]
pub struct FileInfoPollerConfig<T, S, P> {
#[builder(default = "Duration::seconds(DEFAULT_POLL_DURATION_SECS)")]
#[builder(default = "DEFAULT_POLL_DURATION")]
poll_duration: Duration,
state: S,
store: FileStore,
prefix: String,
parser: P,
lookback: LookbackBehavior,
#[builder(default = "Duration::minutes(10)")]
#[builder(default = "Duration::from_secs(10 * 60)")]
offset: Duration,
#[builder(default = "5")]
queue_size: usize,
Expand Down Expand Up @@ -262,10 +262,7 @@ where
}

fn poll_duration(&self) -> std::time::Duration {
self.config
.poll_duration
.to_std()
.unwrap_or(DEFAULT_POLL_DURATION)
self.config.poll_duration
}

async fn is_already_processed(&self, file_info: &FileInfo) -> Result<bool> {
Expand Down
6 changes: 3 additions & 3 deletions iot_verifier/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,9 +149,9 @@ impl Server {
// *
// setup entropy requirements
// *
let max_lookback_age = chrono::Duration::from_std(settings.loader_window_max_lookback_age)?;
let max_lookback_age = settings.loader_window_max_lookback_age;
let entropy_store = FileStore::from_settings(&settings.entropy).await?;
let entropy_interval = chrono::Duration::from_std(settings.entropy_interval)?;
let entropy_interval = settings.entropy_interval;
let (entropy_loader_receiver, entropy_loader_server) =
file_source::continuous_source::<EntropyReport, _>()
.state(pool.clone())
Expand Down Expand Up @@ -184,7 +184,7 @@ impl Server {
.await?;

let packet_store = FileStore::from_settings(&settings.packet_ingest).await?;
let packet_interval = chrono::Duration::from_std(settings.packet_interval)?;
let packet_interval = settings.packet_interval;
let (pk_loader_receiver, pk_loader_server) =
file_source::continuous_source::<IotValidPacket, _>()
.state(pool.clone())
Expand Down
6 changes: 2 additions & 4 deletions reward_index/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,15 +75,13 @@ impl Server {
telemetry::initialize(&pool).await?;

let file_store = FileStore::from_settings(&settings.verifier).await?;
let interval = chrono::Duration::from_std(settings.interval)?;

let (receiver, server) = file_source::continuous_source::<RewardManifest, _>()
.state(pool.clone())
.store(file_store)
.prefix(FileType::RewardManifest.to_string())
.lookback(LookbackBehavior::StartAfter(settings.start_after))
.poll_duration(interval)
.offset(interval * 2)
.poll_duration(settings.interval)
.offset(settings.interval * 2)
.create()
.await?;
let source_join_handle = server.start(shutdown_listener.clone()).await?;
Expand Down

0 comments on commit 3d416e9

Please sign in to comment.