Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[wip] reqwest v0.12 async refactor #419

Draft
wants to merge 4 commits into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,598 changes: 677 additions & 921 deletions Cargo.lock

Large diffs are not rendered by default.

10 changes: 7 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,27 @@ categories = ["command-line-utilities"]
keywords = ["container", "devops", "docker", "deployment", "systemd"]

[dependencies]
abscissa_core = "0.7"
abscissa_tokio = "0.7.0"
bytes = "1"
clap = "4"
hex = "0.4"
hyper = "1.4"
percent-encoding = "2"
libflate = "2.1"
log = "0.4"
os_pipe = "1.2"
reqwest = "0.9"
reqwest = "0.12"
thiserror = "1"
serde = { version = "1", features = ["serde_derive"] }
serde_json = "1"
subtle-encoding = "0.5"
sha2 = "0.10"
tar = "0.4"
tokio-stream = "0.1"
walkdir = "2"
futures-util = "0.3"
url = "2.5.2"

[package.metadata.rpm.cargo]
buildflags = ["--release"]
Expand All @@ -37,5 +43,3 @@ canister = { path = "/usr/bin/canister" }
[dev-dependencies]
once_cell = "1"

[dependencies.abscissa_core]
version = "0.7"
9 changes: 6 additions & 3 deletions src/application.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use abscissa_core::{
config::{self, CfgCell},
trace, Application, FrameworkError, StandardPaths,
};
use abscissa_tokio::TokioComponent;

/// Application state
pub static APPLICATION: AppCell<CanisterApplication> = AppCell::new();
Expand Down Expand Up @@ -47,10 +48,12 @@ impl Application for CanisterApplication {
/// beyond the default ones provided by the framework, this is the place
/// to do so.
fn register_components(&mut self, command: &Self::Cmd) -> Result<(), FrameworkError> {
let components = self.framework_components(command)?;
let mut components = self.framework_components(command)?;

let mut component_registry = self.state.components_mut();
component_registry.register(components)
// Create `TokioComponent` and add it to your app's components here:
components.push(Box::new(TokioComponent::new()?));

self.state.components_mut().register(components)
}

/// Post-configuration lifecycle callback.
Expand Down
57 changes: 45 additions & 12 deletions src/commands/deploy.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
use crate::application::APPLICATION;
use crate::gcp::{Manifest, Storage, Token};
use crate::prelude::*;
use crate::unpacker::{HexDigest, Unpacker};
use crate::unpacker::HexDigest;
use abscissa_core::{Command, Runnable};
use clap::Parser;
use std::process;

use std::fs;
use std::io;
use std::os::unix;
use std::path::PathBuf;

#[derive(Command, Debug, Default, Parser)]
pub struct DeployCommand {
Expand All @@ -34,10 +36,38 @@
process::exit(1);
});

let (image_id, m) = Manifest::get(&token, project, image, tag, proxy).unwrap_or_else(|e| {
status_err!("Error, unable to fetch manifest: {}", e);
process::exit(1);
        [GitHub Actions annotation on line 40 in src/commands/deploy.rs — flagged by both the "Check" and "Test Suite" jobs: unused implementer of `Future` that must be used]
        let _ = abscissa_tokio::run(&APPLICATION, async {
            Self::perform(
                project,
                bucket,
                image,
                tag,
                object_path,
                path,
                proxy,
                &token,
            );
        });
}
}

impl DeployCommand {
async fn perform(
project: &String,
bucket: &String,
image: &String,
tag: &String,
object_path: &String,
path: &PathBuf,
proxy: Option<&str>,
token: &Token,
) {
let (image_id, m) = Manifest::get(&token, project, image, tag, proxy)
.await
.unwrap_or_else(|e| {
status_err!("Error, unable to fetch manifest: {}", e);
process::exit(1);
});
debug!("{}", image_id);
let layers_len = m.layers.len();
debug!("{:?}", layers_len);
Expand All @@ -50,21 +80,24 @@
debug!("{:?}", &layer_digest);

let object = format!("{}/sha256:{}", object_path, layer_digest.as_str());
let response = Storage::get(&token, bucket, &object, proxy).unwrap_or_else(|e| {
status_err!("Error, unable to download object from bucket: {}", e);
process::exit(1);
});
let mut unpacker = Unpacker::new(response, config.path.join(image_id.to_string()));
let response = Storage::get(&token, bucket, &object, proxy)
.await
.unwrap_or_else(|e| {
status_err!("Error, unable to download object from bucket: {}", e);
process::exit(1);
});
debug!("response: {:?}", response);
/* let mut unpacker = Unpacker::new(response, config.path.join(image_id.to_string()));
unpacker.unpack().unwrap_or_else(|e| {
status_err!("Error, unable to unpack archive: {}", e);
process::exit(1);
});
let digest = unpacker.hex_digest();
let digest = unpacker.hex_digest();*/
debug!("digest: ");
status_ok!("Downloaded", "{} object from {}", object, bucket);
debug!("hasher result: {}", digest.as_str());
// debug!("hasher result: {}", digest.as_str());
debug!("layer digest: {}", layer_digest.as_str());
assert_eq!(digest, layer_digest);
// assert_eq!(digest, layer_digest);
let full_path = path.join(image_id.to_string());
let full_tag = path.join("current");
if let Err(e) = unix::fs::symlink(&full_path, &full_tag) {
Expand Down
16 changes: 8 additions & 8 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,6 @@ impl From<io::Error> for Error {
}
}

impl From<reqwest::Error> for Error {
fn from(err: reqwest::Error) -> Self {
ErrorKind::ReqwestError.context(err).into()
}
}

impl From<serde_json::Error> for Error {
fn from(err: serde_json::Error) -> Self {
ErrorKind::ParseError.context(err).into()
Expand All @@ -84,8 +78,14 @@ impl From<FromUtf8Error> for Error {
}
}

impl From<reqwest::UrlError> for Error {
fn from(err: reqwest::UrlError) -> Self {
impl From<reqwest::Error> for Error {
fn from(err: reqwest::Error) -> Self {
Error(ErrorKind::ParseError.context(err).into())
}
}

impl From<url::ParseError> for Error {
fn from(err: url::ParseError) -> Self {
Error(ErrorKind::ParseError.context(err).into())
}
}
6 changes: 3 additions & 3 deletions src/gcp/gcr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ impl fmt::Display for ImageId {
}

impl Manifest {
pub fn get(
pub async fn get(
token: &oauth::Token,
project: &str,
image: &str,
Expand All @@ -62,7 +62,7 @@ impl Manifest {

let url = format!("https://gcr.io/v2/{}/{}/manifests/{}", project, image, tag);

let mut response = client.get(url.as_str()).send()?;
let response = client.get(url.as_str()).send().await?;

let docker_digest_header = response
.headers()
Expand All @@ -80,7 +80,7 @@ impl Manifest {
debug!("{:?}", docker_digest);
debug!("response = {:?}", response);

let body = response.text()?;
let body = response.text().await?;
debug!("body = {:?}", body);
let image_id = ImageId(hex::encode(Sha256::digest(body.as_bytes())));
assert_eq!(image_id.0, *docker_digest);
Expand Down
46 changes: 7 additions & 39 deletions src/gcp/storage.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
use super::oauth::{self, AuthHeader};
use crate::error::Error;
use percent_encoding::{percent_encode, NON_ALPHANUMERIC};
use reqwest::header::{HeaderValue, CONTENT_TYPE};
use reqwest::Url;
use std::fs::File;

pub struct Storage {
pub bucket: String,
Expand All @@ -12,7 +10,7 @@ pub struct Storage {

impl Storage {
// https://cloud.google.com/storage/docs/json_api/v1/objects/get
pub fn get(
pub async fn get(
token: &oauth::Token,
bucket: &str,
object: &str,
Expand All @@ -32,15 +30,15 @@ impl Storage {
.build(),
None => reqwest::Client::builder().default_headers(headers).build(),
}?;
let response = storage_client.get(url.as_str()).send()?;
let response = storage_client.get(url.as_str()).send().await?;
if !response.status().is_success() {
panic!("{}", response.status())
}
Ok(response)
}

// https://cloud.google.com/storage/docs/json_api/v1/objects/list
pub fn list(
pub async fn list(
token: &oauth::Token,
bucket: &str,
proxy: Option<&str>,
Expand All @@ -55,43 +53,13 @@ impl Storage {
.build(),
None => reqwest::Client::builder().default_headers(headers).build(),
}?;
let response = storage_client.get(url.as_str()).send()?;
if !response.status().is_success() {
panic!("{}", response.status())
}
Ok(response)
}

// https://cloud.google.com/storage/docs/json_api/v1/objects/insert
pub fn insert(
token: &oauth::Token,
bucket: &str,
object: File,
name: &str,
proxy: Option<&str>,
) -> Result<reqwest::Response, Error> {
let base = Url::parse("https://www.googleapis.com/upload/storage/v1/b/")?;
let mut url = base.join(&format!("{}/", bucket))?.join("o")?;
url.set_query(Some(&format!("uploadType=media&name={}", name)));

let mut headers = token.headers(AuthHeader::Bearer);
headers.insert(
CONTENT_TYPE,
HeaderValue::from_static("application/octet-stream"),
);

let storage_client = match proxy {
Some(p) => reqwest::Client::builder()
.default_headers(headers)
.proxy(reqwest::Proxy::all(p)?)
.build(),
None => reqwest::Client::builder().default_headers(headers).build(),
}?;

let response = storage_client.post(url.as_str()).body(object).send()?;
let response = storage_client.get(url.as_str()).send().await?;
if !response.status().is_success() {
panic!("{}", response.status())
}
Ok(response)
}
}

/*impl From<std::fs::File> for Into<reqwest::Body> {
}*/
47 changes: 26 additions & 21 deletions src/unpacker.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,31 @@
use crate::error::Error;
use libflate::gzip::Decoder;
use sha2::{Digest, Sha256};
use std::io::{self, Read};
use bytes::Bytes;
//use crate::error::Error;
//use libflate::gzip::Decoder;
//use sha2::{Digest, Sha256};
//use std::io::{self, Read};
use std::path::PathBuf;
use tokio_stream::Stream;

pub struct Unpacker<R: Read> {
hasher: Hasher<R>,
pub struct Unpacker<S> {
//hasher: Hasher<R>,
    [GitHub Actions annotation on line 11 in src/unpacker.rs — flagged by both the "Check" and "Test Suite" jobs: fields `path` and `stream` are never read]
    path: PathBuf,
    stream: S,
}

impl<R: Read> Unpacker<R> {
pub fn new(reader: R, path: impl Into<PathBuf>) -> Self {
let hasher = Hasher::new(reader);
impl<S> Unpacker<S>
where
S: Stream<Item = reqwest::Result<Bytes>>,
{
pub fn new(stream: S, path: impl Into<PathBuf>) -> Self {
//let hasher = Hasher::new(reader);
Self {
hasher,
//hasher,
stream,
path: path.into(),
}
}

pub fn unpack(&mut self) -> Result<(), Error> {
/* pub fn unpack(&mut self) -> Result<(), Error> {
let decoder = Decoder::new(&mut self.hasher).unwrap();
let mut archive = tar::Archive::new(decoder);
archive.unpack(&self.path).unwrap();
Expand All @@ -29,15 +36,15 @@
// drain remaining data in the tarball
io::copy(&mut self.hasher, &mut io::sink()).unwrap();
self.hasher.hex_digest()
}
}*/
}

struct Hasher<R: Read> {
/*struct Hasher<R: Stream<Item = reqwest::Result<Bytes>>> {
reader: R,
digest: Sha256,
}

impl<R: Read> Hasher<R> {
impl<R: Stream<Item = reqwest::Result<Bytes>>> Hasher<R> {
pub fn new(reader: R) -> Self {
Self {
reader,
Expand All @@ -48,15 +55,13 @@
pub fn hex_digest(self) -> HexDigest {
HexDigest(hex::encode(self.digest.finalize()))
}
}

impl<R: Read> Read for Hasher<R> {
fn read(&mut self, buffer: &mut [u8]) -> io::Result<usize> {
let nbytes = self.reader.read(buffer)?;
self.digest.update(&buffer[..nbytes]);
Ok(nbytes)
async fn read(&mut self, buffer: &mut [u8]) -> io::Result<Bytes> {
let bytes = self.reader.next().await?;
self.digest.update(&buffer[..bytes]);
Ok(bytes)
}
}
}*/

#[derive(Clone, Debug, Eq, PartialEq)]
pub struct HexDigest(pub String);
Expand Down
Loading