Skip to content

Commit

Permalink
Implement client native object reference fetching (#1511)
Browse files Browse the repository at this point in the history
Implement native object reference fetching

Signed-off-by: Danil-Grigorev <[email protected]>
  • Loading branch information
Danil-Grigorev authored Jun 11, 2024
1 parent b7f564e commit 9df1b43
Show file tree
Hide file tree
Showing 3 changed files with 321 additions and 4 deletions.
289 changes: 285 additions & 4 deletions kube-client/src/client/client_ext.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
use crate::{Client, Error, Result};
use k8s_openapi::api::core::v1::Namespace as k8sNs;
use k8s_openapi::{
api::core::v1::{LocalObjectReference, Namespace as k8sNs, ObjectReference},
apimachinery::pkg::apis::meta::v1::OwnerReference,
};
use kube_core::{
object::ObjectList,
params::{GetParams, ListParams},
request::Request,
ClusterResourceScope, DynamicResourceScope, NamespaceResourceScope, Resource,
ApiResource, ClusterResourceScope, DynamicResourceScope, NamespaceResourceScope, Resource,
};
use serde::{de::DeserializeOwned, Serialize};
use std::fmt::Debug;
Expand Down Expand Up @@ -42,9 +45,109 @@ pub struct Cluster;
/// You can create this directly, or convert `From` a `String` / `&str`, or `TryFrom` an `k8s_openapi::api::core::v1::Namespace`
pub struct Namespace(String);

/// Referenced object name resolution
pub trait ObjectRef<K>: ObjectUrl<K> {
fn name(&self) -> Option<&str>;
}

/// Reference resolver for a specified namespace
pub trait NamespacedRef<K> {
/// Resolve reference in the provided namespace
fn within(&self, namespace: impl Into<Option<String>>) -> impl ObjectRef<K>;
}

impl<K> ObjectUrl<K> for ObjectReference
where
K: Resource,
{
fn url_path(&self) -> String {
url_path(
&ApiResource::from_gvk(&self.clone().into()),
self.namespace.clone(),
)
}
}

impl<K> ObjectRef<K> for ObjectReference
where
K: Resource,
{
fn name(&self) -> Option<&str> {
self.name.as_deref()
}
}

impl<K> NamespacedRef<K> for ObjectReference
where
K: Resource,
K::Scope: NamespaceScope,
{
fn within(&self, namespace: impl Into<Option<String>>) -> impl ObjectRef<K> {
Self {
namespace: namespace.into(),
..self.clone()
}
}
}

impl<K> ObjectUrl<K> for OwnerReference
where
K: Resource,
K::Scope: ClusterScope,
{
fn url_path(&self) -> String {
url_path(&ApiResource::from_gvk(&self.clone().into()), None)
}
}

impl<K> ObjectRef<K> for OwnerReference
where
K: Resource,
K::Scope: ClusterScope,
{
fn name(&self) -> Option<&str> {
self.name.as_str().into()
}
}

impl<K> NamespacedRef<K> for OwnerReference
where
K: Resource,
K::Scope: NamespaceScope,
{
fn within(&self, namespace: impl Into<Option<String>>) -> impl ObjectRef<K> {
ObjectReference {
api_version: self.api_version.clone().into(),
namespace: namespace.into(),
name: self.name.clone().into(),
uid: self.uid.clone().into(),
kind: self.kind.clone().into(),
..Default::default()
}
}
}

impl<K> NamespacedRef<K> for LocalObjectReference
where
K: Resource,
K::DynamicType: Default,
K::Scope: NamespaceScope,
{
fn within(&self, namespace: impl Into<Option<String>>) -> impl ObjectRef<K> {
let dt = Default::default();
ObjectReference {
api_version: K::api_version(&dt).to_string().into(),
namespace: namespace.into(),
name: self.name.clone(),
kind: K::kind(&dt).to_string().into(),
..Default::default()
}
}
}

/// Scopes for `unstable-client` [`Client#impl-Client`] extension methods
pub mod scope {
pub use super::{Cluster, Namespace};
pub use super::{Cluster, Namespace, NamespacedRef};
}

// All objects can be listed cluster-wide
Expand Down Expand Up @@ -184,6 +287,69 @@ impl Client {
self.request::<K>(req).await
}

/// Fetch a single instance of a `Resource` from a provided object reference.
///
/// ```no_run
/// # use k8s_openapi::api::rbac::v1::ClusterRole;
/// # use k8s_openapi::api::core::v1::Service;
/// # use k8s_openapi::api::core::v1::Secret;
/// # use k8s_openapi::api::core::v1::ObjectReference;
/// # use k8s_openapi::api::core::v1::LocalObjectReference;
/// # use k8s_openapi::api::core::v1::{Node, Pod};
/// # use kube::{Resource, ResourceExt, api::GetParams};
/// # use kube::client::scope::NamespacedRef;
/// # use kube::api::DynamicObject;
/// # async fn wrapper() -> Result<(), Box<dyn std::error::Error>> {
/// # let client: kube::Client = todo!();
/// // cluster scoped
/// let cr: ClusterRole = todo!();
/// let cr: ClusterRole = client.fetch(&cr.object_ref(&())).await?;
/// assert_eq!(cr.name_unchecked(), "cluster-admin");
/// // namespace scoped
/// let svc: Service = todo!();
/// let svc: Service = client.fetch(&svc.object_ref(&())).await?;
/// assert_eq!(svc.name_unchecked(), "kubernetes");
/// // Fetch an owner of the resource
/// let pod: Pod = todo!();
/// let owner = pod
/// .owner_references()
/// .to_vec()
/// .into_iter()
/// .find(|r| r.kind == Node::kind(&()))
/// .ok_or("Not Found")?;
/// let node: Node = client.fetch(&owner).await?;
/// // Namespace scoped objects require namespace
/// let pod: Pod = client.fetch(&owner.within("ns".to_string())).await?;
/// // Fetch dynamic object to resolve type later
/// let dynamic: DynamicObject = client.fetch(&owner.within("ns".to_string())).await?;
/// // Fetch using local object reference
/// let secret_ref = pod
/// .spec
/// .unwrap_or_default()
/// .image_pull_secrets
/// .unwrap_or_default()
/// .get(0)
/// .unwrap_or(&LocalObjectReference{name: Some("pull_secret".into())});
/// let secret: Secret = client.fetch(&secret_ref.within(pod.namespace())).await?;
/// # Ok(())
/// # }
/// ```
pub async fn fetch<K>(&self, reference: &impl ObjectRef<K>) -> Result<K>
where
K: Resource + Serialize + DeserializeOwned + Clone + Debug,
{
let mut req = Request::new(reference.url_path())
.get(
reference
.name()
.ok_or(Error::RefResolve("Reference is empty".to_string()))?,
&GetParams::default(),
)
.map_err(Error::BuildRequest)?;
req.extensions_mut().insert("get");
self.request::<K>(req).await
}

/// List instances of a `Resource` implementing type `K` at the specified scope.
///
/// ```no_run
Expand Down Expand Up @@ -216,13 +382,32 @@ impl Client {
}
}

// Resource url_path resolver
fn url_path(r: &ApiResource, namespace: Option<String>) -> String {
let n = if let Some(ns) = namespace {
format!("namespaces/{ns}/")
} else {
"".into()
};
format!(
"/{group}/{api_version}/{namespaces}{plural}",
group = if r.group.is_empty() { "api" } else { "apis" },
api_version = r.api_version,
namespaces = n,
plural = r.plural
)
}

#[cfg(test)]
mod test {
use crate::client::client_ext::NamespacedRef;

use super::{
scope::{Cluster, Namespace},
Client, ListParams,
};
use kube_core::ResourceExt;
use k8s_openapi::api::core::v1::LocalObjectReference;
use kube_core::{DynamicObject, Resource, ResourceExt};

#[tokio::test]
#[ignore = "needs cluster (will list/get namespaces, pods, jobs, svcs, clusterroles)"]
Expand Down Expand Up @@ -256,4 +441,100 @@ mod test {

Ok(())
}

#[tokio::test]
#[ignore = "needs cluster (will get svcs, clusterroles, pods, nodes)"]
async fn client_ext_fetch_ref_pods_svcs() -> Result<(), Box<dyn std::error::Error>> {
use k8s_openapi::api::{
core::v1::{Node, ObjectReference, Pod, Service},
rbac::v1::ClusterRole,
};

let client = Client::try_default().await?;
// namespaced fetch
let svc: Service = client
.fetch(&ObjectReference {
kind: Some(Service::kind(&()).into()),
api_version: Some(Service::api_version(&()).into()),
name: Some("kubernetes".into()),
namespace: Some("default".into()),
..Default::default()
})
.await?;
assert_eq!(svc.name_unchecked(), "kubernetes");
// global fetch
let ca: ClusterRole = client
.fetch(&ObjectReference {
kind: Some(ClusterRole::kind(&()).into()),
api_version: Some(ClusterRole::api_version(&()).into()),
name: Some("cluster-admin".into()),
..Default::default()
})
.await?;
assert_eq!(ca.name_unchecked(), "cluster-admin");
// namespaced fetch untyped
let svc: DynamicObject = client.fetch(&svc.object_ref(&())).await?;
assert_eq!(svc.name_unchecked(), "kubernetes");
// global fetch untyped
let ca: DynamicObject = client.fetch(&ca.object_ref(&())).await?;
assert_eq!(ca.name_unchecked(), "cluster-admin");

// Fetch using local object reference
let svc: Service = client
.fetch(
&LocalObjectReference {
name: svc.name_any().into(),
}
.within(svc.namespace()),
)
.await?;
assert_eq!(svc.name_unchecked(), "kubernetes");

let kube_system: Namespace = "kube-system".into();
for pod in client
.list::<Pod>(
&ListParams::default().labels("component=kube-apiserver"),
&kube_system,
)
.await?
{
let owner = pod
.owner_references()
.to_vec()
.into_iter()
.find(|r| r.kind == Node::kind(&()))
.ok_or("Not found")?;
let _: Node = client.fetch(&owner).await?;
}

Ok(())
}

#[tokio::test]
#[ignore = "needs cluster (will get svcs, clusterroles, pods, nodes)"]
async fn fetch_fails() -> Result<(), Box<dyn std::error::Error>> {
use crate::error::Error;
use k8s_openapi::api::core::v1::{ObjectReference, Pod, Service};

let client = Client::try_default().await?;
// namespaced fetch
let svc: Service = client
.fetch(&ObjectReference {
kind: Some(Service::kind(&()).into()),
api_version: Some(Service::api_version(&()).into()),
name: Some("kubernetes".into()),
namespace: Some("default".into()),
..Default::default()
})
.await?;
let err = client.fetch::<Pod>(&svc.object_ref(&())).await.unwrap_err();
assert!(matches!(err, Error::SerdeError(_)));
assert_eq!(err.to_string(), "Error deserializing response: invalid value: string \"Service\", expected Pod at line 1 column 17".to_string());

let obj: DynamicObject = client.fetch(&svc.object_ref(&())).await?;
let err = obj.try_parse::<Pod>().unwrap_err();
assert_eq!(err.to_string(), "failed to parse this DynamicObject into a Resource: invalid value: string \"Service\", expected Pod".to_string());

Ok(())
}
}
6 changes: 6 additions & 0 deletions kube-client/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,12 @@ pub enum Error {
#[cfg_attr(docsrs, doc(cfg(feature = "client")))]
#[error("auth error: {0}")]
Auth(#[source] crate::client::AuthError),

/// Error resolving resource reference
#[cfg(feature = "unstable-client")]
#[cfg_attr(docsrs, doc(cfg(feature = "unstable-client")))]
#[error("Reference resolve error: {0}")]
RefResolve(String),
}

#[derive(Error, Debug)]
Expand Down
30 changes: 30 additions & 0 deletions kube-core/src/gvk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
use std::str::FromStr;

use crate::TypeMeta;
use k8s_openapi::{api::core::v1::ObjectReference, apimachinery::pkg::apis::meta::v1::OwnerReference};
use serde::{Deserialize, Serialize};
use thiserror::Error;

Expand Down Expand Up @@ -47,6 +48,35 @@ impl TryFrom<TypeMeta> for GroupVersionKind {
}
}

impl From<OwnerReference> for GroupVersionKind {
fn from(value: OwnerReference) -> Self {
let (group, version) = match value.api_version.split_once("/") {
Some((group, version)) => (group, version),
None => ("", value.api_version.as_str()),
};
Self {
group: group.into(),
version: version.into(),
kind: value.kind,
}
}
}

impl From<ObjectReference> for GroupVersionKind {
fn from(value: ObjectReference) -> Self {
let api_version = value.api_version.unwrap_or_default();
let (group, version) = match api_version.split_once("/") {
Some((group, version)) => (group, version),
None => ("", api_version.as_str()),
};
Self {
group: group.into(),
version: version.into(),
kind: value.kind.unwrap_or_default(),
}
}
}

/// Core information about a family of API Resources
#[derive(Deserialize, Serialize, Debug, Clone, PartialEq, Eq, Hash)]
pub struct GroupVersion {
Expand Down

0 comments on commit 9df1b43

Please sign in to comment.