Skip to content

Commit

Permalink
Merge pull request openzfs#503 from delphix/projects/merge-upstream/master
Browse files Browse the repository at this point in the history

Merge remote-tracking branch '6.0/stage' into 'master'
  • Loading branch information
Prakash Surya authored Jun 29, 2022
2 parents 531eaf1 + 185313a commit c0d05f4
Show file tree
Hide file tree
Showing 10 changed files with 252 additions and 17 deletions.
4 changes: 4 additions & 0 deletions cmd/zfs_object_agent/util/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ pub mod watch_once;
pub mod write_stdout;
mod zcache_devices;
mod zcache_hits;
mod zcache_status;
pub mod zettacache_stats;

pub use alloc::with_alloctag;
Expand Down Expand Up @@ -60,3 +61,6 @@ pub use write_stdout::flush_stdout;
pub use zcache_devices::DeviceEntry;
pub use zcache_devices::DeviceList;
pub use zcache_hits::ReportHitsResponse;
pub use zcache_status::DeviceStatus;
pub use zcache_status::IndexStatus;
pub use zcache_status::ZcacheStatus;
1 change: 1 addition & 0 deletions cmd/zfs_object_agent/util/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ pub const TYPE_REPORT_HITS: &str = "report hits";
pub const TYPE_LIST_DEVICES: &str = "list devices";
pub const TYPE_ZCACHE_IOSTAT: &str = "zcache iostat";
pub const TYPE_ZCACHE_STATS: &str = "zcache stats";
pub const TYPE_ZCACHE_STATUS: &str = "zcache status";
pub const TYPE_ADD_DISK: &str = "add disk";
pub const TYPE_EXPAND_DISK: &str = "expand disk";
pub const TYPE_SYNC_CHECKPOINT: &str = "sync checkpoint";
Expand Down
29 changes: 29 additions & 0 deletions cmd/zfs_object_agent/util/src/zcache_status.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
//! This module provides common zcache structures that are collected by
//! the **zettacache** runtime and consumed by **zcache** subcommands.
//! These structures are serialized on the zettacache side and then
//! deserialized by the zcache subcommands.

use std::path::PathBuf;

use serde::Deserialize;
use serde::Serialize;

/// Summary of the zettacache index, displayed by `zcache status`.
///
/// Serialized as JSON by the agent and deserialized by the zcache CLI.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct IndexStatus {
    /// Size of the index, in bytes.
    pub bytes: u64,
    /// Number of entries in the index.
    pub entries: u64,
    /// Number of changes not yet merged into the index.
    pub pending_changes: u64,
}

/// Per-device status entry, displayed by `zcache status`.
///
/// No `Default` derive: a device with empty paths would be meaningless.
#[derive(Debug, Serialize, Deserialize)]
pub struct DeviceStatus {
    /// Device path as configured.
    pub path: PathBuf,
    /// Device path with all symbolic links resolved.
    pub canonical_path: PathBuf,
    /// Device size in bytes.
    pub size: u64,
}

/// Top-level response payload for the `zcache status` subcommand.
///
/// `Default` yields an empty status (zeroed index, no devices), used when
/// no zettacache is active.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct ZcacheStatus {
    /// Index summary.
    pub index: IndexStatus,
    /// One entry per cache device.
    pub devices: Vec<DeviceStatus>,
}
4 changes: 2 additions & 2 deletions cmd/zfs_object_agent/zcache/src/iostat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,9 +248,9 @@ impl IoStatDisplay {
for disk_stat in stat_delta.disk_stats.iter() {
if self.show_time {
if self.interval_is_subsecond {
writeln_stdout!("{}", Local::now().format("%Y-%m-%d %H:%M:%S%.3f UTC"));
writeln_stdout!("{}", Local::now().format("%Y-%m-%d %H:%M:%S%.3f %z"));
} else {
writeln_stdout!("{}", Local::now().format("%Y-%m-%d %H:%M:%S UTC"));
writeln_stdout!("{}", Local::now().format("%Y-%m-%d %H:%M:%S %z"));
}
}
writeln_stdout!();
Expand Down
3 changes: 3 additions & 0 deletions cmd/zfs_object_agent/zcache/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ mod labelclear;
mod list;
mod remote_channel;
mod stats;
mod status;
mod subcommand;
mod sync;

Expand Down Expand Up @@ -63,6 +64,7 @@ enum Commands {
Expand(expand::Expand),
Sync(sync::Sync),
Labelclear(labelclear::Labelclear),
Status(status::Status),

// clear_hit_data is deprecated/hidden
#[clap(rename_all = "snake_case")]
Expand All @@ -81,6 +83,7 @@ impl Commands {
Commands::Sync(sync) => sync,
Commands::Labelclear(labelclear) => labelclear,
Commands::ClearHitData(clear_hit_data) => clear_hit_data,
Commands::Status(status) => status,
}
}
}
Expand Down
147 changes: 147 additions & 0 deletions cmd/zfs_object_agent/zcache/src/status.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
//! `zcache status` subcommand

use std::thread::sleep;
use std::time::Duration;

use anyhow::Result;
use async_trait::async_trait;
use chrono::Local;
use clap::Parser;
use util::flush_stdout;
use util::message::TYPE_ZCACHE_STATUS;
use util::nice_p2size;
use util::write_stdout;
use util::writeln_stdout;
use util::DeviceStatus;
use util::ZcacheStatus;

use crate::remote_channel::RemoteChannel;
use crate::subcommand::ZcacheSubCommand;

/// Command-line arguments for the `zcache status` subcommand.
#[derive(Parser)]
#[clap(about = "Display zettacache status.")]
pub struct Status {
    /// Display a timestamp
    #[clap(short = 't', long)]
    timestamp: bool,

    /// Display real paths for devices resolving all symbolic links.
    #[clap(short = 'r', long)]
    real_paths: bool,

    /// Use JSON output format.
    //
    // Timestamps and path presentation are human-CLI formatting concerns,
    // so they conflict with machine-readable JSON output.
    #[clap(
        short = 'j',
        long,
        conflicts_with = "timestamp",
        // BUG FIX: `conflicts_with` matches the clap argument id, which
        // defaults to the *field name* (`real_paths`), not the kebab-case
        // long-flag spelling.  "real-paths" names no argument, so the
        // conflict was never enforced (and trips clap's debug assertions).
        conflicts_with = "real_paths"
    )]
    json: bool,

    /// Status is printed every <interval> seconds
    #[clap()]
    interval: Option<f64>,

    /// Stop after <count> status reports have been displayed
    #[clap()]
    count: Option<u64>,
}

impl Status {
    // Maximum width from the set of possible keys ("devices", "index", etc.)
    // Used to right-align the key column in CLI output; 7 == "devices".len().
    const MAXIMUM_KEY_WIDTH: usize = 7;

    /// Derive the device path to display: the canonical (symlink-resolved)
    /// path when `--real-paths` was given, otherwise the configured path.
    fn derive_path(&self, device_status: &DeviceStatus) -> String {
        let device_path = if self.real_paths {
            &device_status.canonical_path
        } else {
            &device_status.path
        };
        // Lossy conversion: non-UTF-8 path bytes are replaced, display-only.
        device_path.to_string_lossy().into()
    }

    /// Width of the longest displayed device path, for column alignment.
    /// Returns 0 for an empty device list.
    fn max_path_length(&self, devices: &[DeviceStatus]) -> usize {
        devices
            .iter()
            .map(|d| self.derive_path(d).len())
            .max()
            .unwrap_or_default()
    }

    /// Render one human-readable status report to stdout.
    fn cli_status(&self, status: &ZcacheStatus) {
        if self.timestamp {
            writeln_stdout!("{}", Local::now().to_rfc2822());
        }

        // Display index status.  "{:>3$}" right-aligns "index" to
        // MAXIMUM_KEY_WIDTH (positional argument 3).
        // NOTE(review): status.index.entries is not displayed here.
        writeln_stdout!(
            "{:>3$}: current size {} with {} pending changes",
            "index",
            nice_p2size(status.index.bytes),
            status.index.pending_changes,
            Status::MAXIMUM_KEY_WIDTH
        );

        // Display devices status, one line per device, paths padded to the
        // widest path so the size column lines up.
        let path_width = self.max_path_length(&status.devices);
        writeln_stdout!("{:>1$}:", "devices", Status::MAXIMUM_KEY_WIDTH);
        for device_status in status.devices.iter() {
            // example: "  /dev/nvme6n1p2   56.0GB <status>"
            // "{:>3$}" with "" produces MAXIMUM_KEY_WIDTH+2 spaces of indent;
            // "{:<4$}" left-pads the path to path_width.
            write_stdout!(
                "{:>3$}{:<4$} {:>6}",
                "",
                self.derive_path(device_status),
                nice_p2size(device_status.size),
                Status::MAXIMUM_KEY_WIDTH + 2,
                path_width,
            );

            // ToDo: write optional device status column here

            writeln_stdout!();
        }
    }

    /// Render one status report as pretty-printed JSON.
    //
    // expect/unwrap is acceptable here: ZcacheStatus derives Serialize with
    // no fallible fields, so serialization failure would be a bug.
    fn json_status(&self, status: &ZcacheStatus) {
        writeln_stdout!("{}", serde_json::to_string_pretty(&status).unwrap());
    }

    /// Fetch status from the agent and print it, repeating every
    /// `self.interval` seconds (forever, or `self.count` times) when an
    /// interval was given; otherwise print once and return.
    async fn display_status(&self) -> Result<()> {
        let interval = self.interval.map(Duration::from_secs_f64);
        let mut iteration = 0;
        let mut remote = RemoteChannel::new(false).await?;

        loop {
            // Request the serialized ZcacheStatus from the agent and decode it.
            let response = remote.call(TYPE_ZCACHE_STATUS, None).await?;
            let status_json = response.lookup_string("status_json")?;
            let status: ZcacheStatus = serde_json::from_str(status_json.to_str()?)?;
            if self.json {
                self.json_status(&status);
            } else {
                self.cli_status(&status);
                flush_stdout()?;
            }

            // No interval: single-shot mode, we're done.
            let interval: Duration = match interval {
                None => return Ok(()),
                Some(interval) => interval,
            };

            iteration += 1;
            if let Some(count) = self.count {
                if iteration >= count {
                    return Ok(());
                }
            }
            // NOTE(review): std::thread::sleep blocks the executor thread
            // inside an async fn — presumably acceptable for this
            // single-task CLI, but consider tokio::time::sleep; confirm.
            sleep(interval);
        }
    }
}

#[async_trait]
impl ZcacheSubCommand for Status {
    /// Entry point called by the zcache command dispatcher; delegates to
    /// the display loop.
    async fn invoke(&self) -> Result<()> {
        self.display_status().await
    }
}
17 changes: 16 additions & 1 deletion cmd/zfs_object_agent/zettacache/src/block_access.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ use util::AlignedBytes;
use util::AlignedVec;
use util::DeviceEntry;
use util::DeviceList;
use util::DeviceStatus;
use util::From64;
use uuid::Uuid;

Expand Down Expand Up @@ -662,7 +663,7 @@ impl BlockAccess {
})
}

// Gather a list of devices for zcache list_devices command.
/// Gather a list of devices for `zcache list` command.
pub fn list_devices(&self) -> DeviceList {
let devices = self
.disks
Expand All @@ -677,6 +678,20 @@ impl BlockAccess {
DeviceList { devices }
}

/// Gather per-device information (configured path, canonical path, and
/// current size) for the `zcache status` command.
pub fn list_device_status(&self) -> Vec<DeviceStatus> {
    // Take the read lock once and build the result with a plain loop.
    let disks = self.disks.read().unwrap();
    let mut statuses = Vec::with_capacity(disks.len());
    for disk in disks.iter() {
        statuses.push(DeviceStatus {
            path: disk.path.clone(),
            canonical_path: disk.canonical_path.clone(),
            size: *disk.size.lock().unwrap(),
        });
    }
    statuses
}

pub fn disk_size(&self, disk: DiskId) -> u64 {
*self.disks.read().unwrap()[disk.index()]
.size
Expand Down
24 changes: 24 additions & 0 deletions cmd/zfs_object_agent/zettacache/src/zettacache/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use std::mem;
use std::mem::size_of;
use std::path::Path;
use std::path::PathBuf;
use std::sync::atomic::Ordering::Relaxed;
use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;
Expand Down Expand Up @@ -61,8 +62,10 @@ use util::zettacache_stats::IoStatsRef;
use util::AlignedBytes;
use util::DeviceList;
use util::From64;
use util::IndexStatus;
use util::LockSet;
use util::LockedItem;
use util::ZcacheStatus;
use uuid::Uuid;

use self::merge::MergeMessage;
Expand Down Expand Up @@ -431,6 +434,13 @@ impl ZettaCache {
}
}

pub async fn status(&self) -> ZcacheStatus {
match &*self.inner.load() {
Some(inner) => inner.status().await,
None => Default::default(),
}
}

pub fn io_stats<'a>(&self) -> IoStatsRef<'a> {
match &*self.inner.load() {
Some(inner) => inner.io_stats(),
Expand Down Expand Up @@ -1533,6 +1543,20 @@ impl Inner {
self.block_access.list_devices()
}

/// Build a point-in-time status snapshot for the `zcache status` command:
/// index size/entry counts, pending-change counter, and per-device info.
async fn status(&self) -> ZcacheStatus {
    let indices = self.indices.read().await;
    // NOTE(review): only the `old` index is reported here — confirm whether
    // a newer/in-progress index should also be reflected in these numbers.
    let pending_changes = &self.stats.stats[PendingChanges];

    ZcacheStatus {
        index: IndexStatus {
            bytes: indices.old.num_bytes(),
            entries: indices.old.len(),
            // Relaxed is fine: a single monitoring read, no ordering needed.
            pending_changes: pending_changes.0.load(Relaxed),
        },
        devices: self.block_access.list_device_status(),
    }
}

fn io_stats<'a>(&self) -> IoStatsRef<'a> {
self.block_access.io_stats(self.cache_runtime_id)
}
Expand Down
24 changes: 10 additions & 14 deletions cmd/zfs_object_agent/zettaobject/src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1032,16 +1032,10 @@ impl Pool {
async fn get_recovered_objects(
state: &Arc<PoolState>,
shared_state: &Arc<PoolSharedState>,
final_write: BlockId,
last_replayed_write: BlockId,
) -> BTreeMap<ObjectId, DataObject> {
if shared_state.object_access.supports_list_after() {
let recovered = recover_list(state, shared_state).await;
assert!(recovered
.iter()
.next_back()
.map(|(k, _)| k.as_min_block() <= final_write)
.unwrap_or(true));
return recovered;
return recover_list(state, shared_state).await;
}

let mut recovered = BTreeMap::new();
Expand All @@ -1066,14 +1060,15 @@ impl Pool {
next_id = ObjectId::new(object.header.next_block);
recovered.insert(object.header.object, object);
}
if next_id.as_min_block() >= final_write {
break;
}
if let Some(object) = DataObject::next_uncached(
&shared_state.object_access,
shared_state.guid,
next_id,
ObjectId::new(final_write),
// The kernel knows about all writes after last_replayed_write. Therefore, all
// objects after it must be in the object store. As long as we get
// the first one (which, if it exists, must start at last_replayed_write + 1), they
// will all be found in the next get_uncached loop.
ObjectId::new(last_replayed_write + 1),
)
.await
{
Expand All @@ -1089,7 +1084,7 @@ impl Pool {

pub async fn resume_complete(&self) {
let state = &self.state;
let (txg, final_write) = self.state.with_syncing_state(|syncing_state| {
let (txg, last_replayed_write) = self.state.with_syncing_state(|syncing_state| {
// verify that we're in resuming state
assert!(!syncing_state.pending_object.is_pending());
(
Expand All @@ -1099,7 +1094,8 @@ impl Pool {
});
let shared_state = &state.shared_state;

let recovered_objects = Self::get_recovered_objects(state, shared_state, final_write).await;
let recovered_objects =
Self::get_recovered_objects(state, shared_state, last_replayed_write).await;

self.state.with_syncing_state(|syncing_state| {
let mut recovered_objects_iter = recovered_objects.into_iter().peekable();
Expand Down
Loading

0 comments on commit c0d05f4

Please sign in to comment.