Skip to content

Commit

Permalink
tugger-debian: refactor HTTP client path handling
Browse files Browse the repository at this point in the history
Our assumption that the root URL was 2 directories up from the
distribution URL was incorrect. In fact, the distribution path can
have as many path/subdirectory components as wanted.

This commit refactors the HTTP client to support arbitrary path
layouts.
  • Loading branch information
indygreg committed Nov 30, 2021
1 parent 0a65803 commit 62a2863
Showing 1 changed file with 139 additions and 93 deletions.
232 changes: 139 additions & 93 deletions tugger-debian/src/repository/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,28 @@
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.

/*! Debian repository HTTP client.
This module provides functionality for interfacing with HTTP based Debian
repositories.
See <https://wiki.debian.org/DebianRepository/Format> for a definition of a
Debian repository layout. Essentially, there's a root URL. Under that root URL
are `dists/<distribution>/` directories. Each of these directories (which can
have multiple path separators) has an `InRelease` and/or `Release` file. These
files define the contents of a given *distribution*. This includes which
architectures are supported, what *components* are available, etc.
Our [HttpRepositoryClient] models a client bound to a root URL.
Our [HttpDistributionClient] models a client bound to a virtual sub-directory
under the root URL. You can obtain instances by calling [HttpRepositoryClient.distribution_client()].
The `InRelease`/`Release` files define the contents of a given *distribution*. Our
[HttpReleaseClient] models a client bound to a parsed file. You can obtain instances
by calling [HttpDistributionClient.fetch_inrelease()].
*/

use {
crate::{
binary_package_control::BinaryPackageControlFile,
Expand Down Expand Up @@ -46,24 +68,38 @@ pub enum HttpError {
PackagesIndicesEntryNotFound,
}

async fn transform_http_response(
res: Response,
compression: IndexFileCompression,
) -> Result<Pin<Box<dyn AsyncRead>>, HttpError> {
let stream = res
.bytes_stream()
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, format!("{:?}", e)));

Ok(match compression {
IndexFileCompression::None => Box::pin(stream.into_async_read()),
IndexFileCompression::Gzip => Box::pin(GzipDecoder::new(stream.into_async_read())),
IndexFileCompression::Xz => Box::pin(XzDecoder::new(stream.into_async_read())),
IndexFileCompression::Bzip2 => Box::pin(BzDecoder::new(stream.into_async_read())),
IndexFileCompression::Lzma => Box::pin(LzmaDecoder::new(stream.into_async_read())),
})
}

/// Client for a Debian repository served via HTTP.
///
/// Instances are bound to a base URL, which represents the base directory.
/// That URL should have an `InRelease` or `Release` file under it. From
/// that main entrypoint, all other repository state can be discovered and
/// retrieved.
///
/// Distributions (typically) exist in a `dists/<distribution>` directory.
/// Distributions have an `InRelease` and/or `Release` file under it.
#[derive(Debug)]
pub struct HttpRepositoryClient {
/// HTTP client to use.
client: Client,

/// Base URL for Debian projects.
/// Base URL for this Debian archive.
///
/// Pool paths are relative to this.
debian_base_url: Url,

/// Base URL for this repository (where the `InRelease` file is).
repository_url: Url,
/// Contains both distributions and the files pool.
root_url: Url,
}

impl HttpRepositoryClient {
Expand All @@ -74,110 +110,99 @@ impl HttpRepositoryClient {

/// Construct an instance using the given [Client] and URL.
///
/// The URL should have an `InRelease` or `Release` file under it.
/// The given URL should be the value that follows the
/// `deb` line in apt sources files. e.g. for
/// `deb https://deb.debian.org/debian stable main`, the value would be
/// `https://deb.debian.org/debian`. The URL typically has a `dists/` directory
/// underneath.
pub fn new_client(client: Client, url: impl IntoUrl) -> Result<Self, HttpError> {
let repository_url = url.into_url()?;

// Pool paths are relative to what's known as the Debian base URL, which is
// typically 2 path components up from where we are.
let debian_base_url = repository_url
.join("../..")
.unwrap_or_else(|_| repository_url.clone());

Ok(Self {
client,
debian_base_url,
repository_url,
})
}

/// Debian base URL for this fetcher.
pub fn debian_base_url(&self) -> &Url {
&self.debian_base_url
}
let mut root_url = url.into_url()?;

/// Set the Debian base URL for this fetcher.
///
/// This is the directory from which package/pool paths are relative to. It is
/// typically 2 directory levels up from where the `InRelease` file is located.
pub fn set_debian_base_url(&mut self, url: impl IntoUrl) -> Result<(), HttpError> {
self.debian_base_url = url.into_url()?;
// Trailing URLs are significant to the Url type when we .join(). So ensure
// the URL has a trailing path.
if !root_url.path().ends_with('/') {
root_url.set_path(&format!("{}/", root_url.path()));
}

Ok(())
Ok(Self { client, root_url })
}

/// Repository URL for this fetcher.
pub fn repository_url(&self) -> &Url {
&self.repository_url
/// Base URL for this fetcher.
pub fn root_url(&self) -> &Url {
&self.root_url
}

async fn transform_http_response(
res: Response,
compression: IndexFileCompression,
) -> Result<Pin<Box<dyn AsyncRead>>, HttpError> {
let stream = res
.bytes_stream()
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, format!("{:?}", e)));

Ok(match compression {
IndexFileCompression::None => Box::pin(stream.into_async_read()),
IndexFileCompression::Gzip => Box::pin(GzipDecoder::new(stream.into_async_read())),
IndexFileCompression::Xz => Box::pin(XzDecoder::new(stream.into_async_read())),
IndexFileCompression::Bzip2 => Box::pin(BzDecoder::new(stream.into_async_read())),
IndexFileCompression::Lzma => Box::pin(LzmaDecoder::new(stream.into_async_read())),
})
}

/// Perform an HTTP GET for a path relative to the Debian root directory.
///
/// This is typically called to retrieve non-index files (e.g. .deb packages).
pub async fn get_debian_path(&self, path: &str) -> Result<Response, HttpError> {
let url = self.debian_base_url.join(path)?;
let res = self.client.get(url).send().await?;
/// Perform an HTTP GET for a path relative to the root directory/URL.
pub async fn get_path(&self, path: &str) -> Result<Response, HttpError> {
let res = self.client.get(self.root_url.join(path)?).send().await?;

Ok(res.error_for_status()?)
}

/// Perform an HTTP GET for a path relative to the Debian root directory.
/// Perform an HTTP GET for a path relative to the root directory/URL.
///
/// This transforms the response to an async reader for reading the HTTP response body.
pub async fn get_debian_path_reader(
pub async fn get_path_reader(
&self,
path: &str,
compression: IndexFileCompression,
) -> Result<Pin<Box<dyn AsyncRead>>, HttpError> {
let res = self.get_debian_path(path).await?;
let res = self.get_path(path).await?;

Self::transform_http_response(res, compression).await
transform_http_response(res, compression).await
}

/// Perform an HTTP GET for a path relative to the repository root directory.
pub async fn get_repository_path(&self, path: &str) -> Result<Response, HttpError> {
let url = self.repository_url.join(path)?;
let res = self.client.get(url).send().await?;

Ok(res.error_for_status()?)
/// Obtain a [HttpDistributionClient] for a given distribution name/path.
///
/// The returned client has its root URL set to `self.root_url().join("dists/{distribution}")`.
pub fn distribution_client(&self, distribution: &str) -> HttpDistributionClient<'_> {
HttpDistributionClient {
root_client: self,
distribution_path: format!("dists/{}", distribution.trim_matches('/')),
}
}

/// Perform an HTTP GET for a path relative to the repository root directory.
/// Obtain a [HttpDistributionClient] for a given sub-directory.
///
/// Returns a reader that can be used to read HTTP response body payload, with an
/// optional decompression content transformation transparently applied.
pub async fn get_repository_path_reader(
&self,
path: &str,
compression: IndexFileCompression,
) -> Result<Pin<Box<dyn AsyncRead>>, HttpError> {
let res = self.get_repository_path(path).await?;
/// The root URL of the returned client is `self.root_url().join(path)`, without
/// `dists/` prepended. This allows specifying non-standard paths to the distribution.
pub fn distribution_client_raw_path(&self, path: &str) -> HttpDistributionClient<'_> {
HttpDistributionClient {
root_client: self,
distribution_path: path.trim_matches('/').to_string(),
}
}
}

fn join_path(a: &str, b: &str) -> String {
format!("{}/{}", a.trim_matches('/'), b.trim_start_matches('/'))
}

/// An HTTP client bound to a specific distribution.
///
/// Debian repositories have the form `<root>/dists/<distribution>/` where the
/// *distribution* directory contains an `InRelease` and/or `Release` file.
///
/// This type models a client interface to a specific distribution path under a root
/// directory.
pub struct HttpDistributionClient<'client> {
root_client: &'client HttpRepositoryClient,
distribution_path: String,
}

Self::transform_http_response(res, compression).await
impl<'client> HttpDistributionClient<'client> {
/// Perform an HTTP GET for a path relative to the distribution's root directory.
pub async fn get_path(&self, path: &str) -> Result<Response, HttpError> {
self.root_client
.get_path(&join_path(&self.distribution_path, path))
.await
}

/// Fetch and parse the `InRelease` file from the repository.
///
/// Returns a new object bound to the parsed `InRelease` file.
pub async fn fetch_inrelease(&self) -> Result<HttpReleaseClient<'_>, HttpError> {
let res = self.get_repository_path("InRelease").await?;
pub async fn fetch_inrelease(&self) -> Result<HttpReleaseClient<'client>, HttpError> {
let res = self.get_path("InRelease").await?;

let data = res.bytes().await?;

Expand All @@ -192,7 +217,8 @@ impl HttpRepositoryClient {
let fetch_compression = IndexFileCompression::Xz;

Ok(HttpReleaseClient {
base_client: self,
root_client: self.root_client,
distribution_path: self.distribution_path.clone(),
release,
fetch_checksum: **fetch_checksum,
fetch_compression,
Expand All @@ -202,7 +228,8 @@ impl HttpRepositoryClient {

/// Repository HTTP client bound to a parsed `Release` or `InRelease` file.
pub struct HttpReleaseClient<'client> {
base_client: &'client HttpRepositoryClient,
root_client: &'client HttpRepositoryClient,
distribution_path: String,
release: ReleaseFile<'static>,
/// Which checksum flavor to fetch and verify.
fetch_checksum: ChecksumType,
Expand All @@ -216,6 +243,26 @@ impl<'client> AsRef<ReleaseFile<'static>> for HttpReleaseClient<'client> {
}

impl<'client> HttpReleaseClient<'client> {
/// Perform an HTTP GET for a path relative to the distribution's root directory.
pub async fn get_path(&self, path: &str) -> Result<Response, HttpError> {
self.root_client
.get_path(&join_path(&self.distribution_path, path))
.await
}

/// Perform an HTTP GET for a path relative to the distribution's root directory.
///
/// This transforms the response to an async reader for reading the HTTP response body.
pub async fn get_path_reader(
&self,
path: &str,
compression: IndexFileCompression,
) -> Result<Pin<Box<dyn AsyncRead>>, HttpError> {
let res = self.get_path(path).await?;

transform_http_response(res, compression).await
}

/// Fetch a `Packages` file and convert it to a stream of [BinaryPackageControlFile] instances.
pub async fn fetch_packages(
&self,
Expand All @@ -239,9 +286,7 @@ impl<'client> HttpReleaseClient<'client> {
// TODO make this stream output.

let mut reader = ControlParagraphAsyncReader::new(futures::io::BufReader::new(
self.base_client
.get_repository_path_reader(path, entry.compression)
.await?,
self.get_path_reader(path, entry.compression).await?,
));

let mut res = BinaryPackageList::default();
Expand Down Expand Up @@ -278,14 +323,15 @@ mod test {
},
};

const BULLSEYE_URL: &str =
"http://snapshot.debian.org/archive/debian/20211120T085721Z/dists/bullseye/";
const BULLSEYE_URL: &str = "http://snapshot.debian.org/archive/debian/20211120T085721Z";

#[tokio::test]
async fn bullseye_release() -> Result<()> {
let repo = HttpRepositoryClient::new(BULLSEYE_URL)?;
let root = HttpRepositoryClient::new(BULLSEYE_URL)?;

let dist = root.distribution_client("bullseye");

let release = repo.fetch_inrelease().await?;
let release = dist.fetch_inrelease().await?;

let packages = release.fetch_packages("main", "amd64", false).await?;
assert_eq!(packages.len(), 58606);
Expand Down

0 comments on commit 62a2863

Please sign in to comment.