Skip to content
This repository has been archived by the owner on Nov 7, 2024. It is now read-only.

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
cgwalters committed Nov 22, 2021
1 parent d512c25 commit 8ce5b41
Showing 1 changed file with 55 additions and 17 deletions.
72 changes: 55 additions & 17 deletions lib/src/container/unencapsulate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,41 +192,79 @@ pub async fn unencapsulate_from_manifest(
return Err(anyhow!("containers-policy.json specifies a default of `insecureAcceptAnything`; refusing usage"));
}
let options = options.unwrap_or_default();
let remote = match &imgref.sigverify {
SignatureSource::OstreeRemote(remote) => Some(remote.clone()),
SignatureSource::ContainerPolicy | SignatureSource::ContainerPolicyAllowInsecure => None,
};
let commit_layer = manifest
.layers()
.last()
.ok_or_else(|| anyhow!("No layers found"))?;
let component_layers: Vec<_> = manifest
.layers()
.iter()
.filter(|&l| l != commit_layer)
.collect();
let component_layer_size = component_layers.iter().fold(0, |acc, s| acc + s.size());
event!(
Level::DEBUG,
"commit blob digest:{} size: {}",
"commit blob digest:{} size: {} components: {} size: {}",
commit_layer.digest().as_str(),
commit_layer.size()
commit_layer.size(),
component_layers.len(),
component_layer_size,
);
let mut proxy = ImageProxy::new().await?;
let oi = proxy.open_image(&imgref.imgref.to_string()).await?;
let (blob, driver) = fetch_layer_decompress(&mut proxy, &oi, commit_layer).await?;
let blob = ProgressReader {
reader: blob,
progress: options.progress,
};
let blob = tokio_util::io::SyncIoBridge::new(blob);
let remote = match &imgref.sigverify {
SignatureSource::OstreeRemote(remote) => Some(remote.clone()),
SignatureSource::ContainerPolicy | SignatureSource::ContainerPolicyAllowInsecure => None,
};
let mut importer = crate::tar::Importer::new(&repo, remote);
let (tx, rx) = tokio::sync::mpsc::channel(1);
let repo = repo.clone();
let import = crate::tokio_util::spawn_blocking_cancellable(move |cancellable| {
let mut archive = tar::Archive::new(blob);
let mut rx = rx;
let mut importer = crate::tar::Importer::new(&repo, remote);
let txn = repo.auto_transaction(Some(cancellable))?;

// First, import the commit
let commit_blob = rx.blocking_recv().unwrap();
let commit_blob = tokio_util::io::SyncIoBridge::new(commit_blob);
let mut archive = tar::Archive::new(commit_blob);
importer.import_commit(&mut archive, Some(cancellable))?;

// Then, all component/split blobs
while let Some(blob) = rx.blocking_recv() {
let blob = tokio_util::io::SyncIoBridge::new(blob);
let mut archive = tar::Archive::new(blob);
importer.import_objects(&mut archive, Some(cancellable))?;
}

let checksum: String = importer.finish_import();
txn.commit(Some(cancellable))?;
repo.mark_commit_partial(&checksum, false)?;
Ok::<_, anyhow::Error>(checksum)
});
let (import, driver) = tokio::join!(import, driver);
driver?;
let ostree_commit = import.with_context(|| format!("Parsing blob {}", layer.digest()))?;
for &layer in std::iter::once(&commit_layer).chain(component_layers.iter()) {
let (blob, driver) = fetch_layer_decompress(&mut proxy, &oi, commit_layer).await?;
let blob = ProgressReader {
reader: blob,
progress: options.progress,
};
if tx.send(blob).await.is_err() {
drop(tx);
return match import.await? {
Ok(_) => {
return Err(anyhow::anyhow!(
"internal error: import worker thread did not set error"
))
}
Err(e) => Err(e),
};
}
driver.await?;
}
drop(tx);

let import = import.await?;
let ostree_commit =
import.with_context(|| format!("Parsing blob {}", commit_layer.digest()))?;
// FIXME write ostree commit after proxy finalization
proxy.finalize().await?;
event!(Level::DEBUG, "created commit {}", ostree_commit);
Expand Down

0 comments on commit 8ce5b41

Please sign in to comment.