Skip to content
This repository has been archived by the owner on Sep 14, 2020. It is now read-only.

Commit

Permalink
Merge pull request #71 from nolar/extract-apis-no-scopes
Browse files Browse the repository at this point in the history
Consolidate all K8s API calls in one place
  • Loading branch information
Sergey Vasilyev authored May 28, 2019
2 parents 36412f7 + 47f6777 commit 0c55dcb
Show file tree
Hide file tree
Showing 22 changed files with 755 additions and 215 deletions.
44 changes: 4 additions & 40 deletions kopf/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,9 @@
TODO
"""

import datetime
import sys

import kubernetes

from kopf.structs import hierarchies
from kopf.k8s import events


# TODO: rename it it kopf.log()? kopf.events.log()? kopf.events.warn()?
Expand All @@ -27,41 +23,9 @@ def event(obj, *, type, reason, message=''):
"""
if isinstance(obj, (list, tuple)):
for item in obj:
event(obj, type=type, reason=reason, message=message)
return

now = datetime.datetime.utcnow()
namespace = obj['metadata']['namespace']

meta = kubernetes.client.V1ObjectMeta(
namespace=namespace,
generate_name='kopf-event-',
)
body = kubernetes.client.V1beta1Event(
metadata=meta,

action='Action?',
type=type,
reason=reason,
note=message,
# message=message,

reporting_controller='kopf',
reporting_instance='dev',
deprecated_source=kubernetes.client.V1EventSource(component='kopf'), # used in the "From" column in `kubectl describe`.

regarding=hierarchies.build_object_reference(obj),
# related=hierarchies.build_object_reference(obj),

event_time=now.isoformat() + 'Z', # '2019-01-28T18:25:03.000000Z'
deprecated_first_timestamp=now.isoformat() + 'Z', # used in the "Age" column in `kubectl describe`.
)

api = kubernetes.client.EventsV1beta1Api()
api.create_namespaced_event(
namespace=namespace,
body=body,
)
events.post_event(obj=item, type=type, reason=reason, message=message)
else:
events.post_event(obj=obj, type=type, reason=reason, message=message)


# Shortcuts for the only two officially documented event types as of now.
Expand Down
21 changes: 21 additions & 0 deletions kopf/k8s/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
"""
All the routines to talk to Kubernetes API.
This library is supposed to be mocked when the mocked K8s client is needed,
and only the high-level logic has to be tested, not the API calls themselves.
Beware: this is NOT a Kubernetes client. It is set of dedicated helpers
specially tailored to do the framework-specific tasks, not the generic
Kubernetes object manipulation.
The operators MUST NOT rely on how the framework communicates with the cluster.
Specifically:
Currently, all the routines use the official Kubernetes client.
Eventually, it can be replaced with anything else (e.g. pykube-ng).
Currently, most of the routines are synchronous, i.e. blocking
from the asyncio's point of view. Later, they can be replaced
to async coroutines (if the client supports that),
or put into the asyncio executors (thread pools).
"""
52 changes: 52 additions & 0 deletions kopf/k8s/events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import datetime

import kubernetes


def post_event(*, obj, type, reason, message=''):
"""
Issue an event for the object.
"""

now = datetime.datetime.utcnow()
namespace = obj['metadata']['namespace']

# Object reference - similar to the owner reference, but different.
# TODO: reconstruct from `resource` once kind<->plural mapping is done. #57
ref = dict(
apiVersion=obj['apiVersion'], # resource.version
kind=obj['kind'], # resource.kind (~resource.plural)
name=obj['metadata']['name'],
uid=obj['metadata']['uid'],
namespace=obj['metadata']['namespace'],
)

meta = kubernetes.client.V1ObjectMeta(
namespace=namespace,
generate_name='kopf-event-',
)
body = kubernetes.client.V1beta1Event(
metadata=meta,

action='Action?',
type=type,
reason=reason,
note=message,
# message=message,

reporting_controller='kopf',
reporting_instance='dev',
deprecated_source=kubernetes.client.V1EventSource(component='kopf'), # used in the "From" column in `kubectl describe`.

regarding=ref,
# related=ref,

event_time=now.isoformat() + 'Z', # '2019-01-28T18:25:03.000000Z'
deprecated_first_timestamp=now.isoformat() + 'Z', # used in the "Age" column in `kubectl describe`.
)

api = kubernetes.client.EventsV1beta1Api()
api.create_namespaced_event(
namespace=namespace,
body=body,
)
106 changes: 106 additions & 0 deletions kopf/k8s/fetching.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
import functools

import kubernetes

_UNSET_ = object()


def read_crd(*, resource, default=_UNSET_):
try:
name = f'{resource.plural}.{resource.group}'
api = kubernetes.client.ApiextensionsV1beta1Api()
rsp = api.read_custom_resource_definition(name=name)
return rsp
except kubernetes.client.rest.ApiException as e:
if e.status == 404 and default is not _UNSET_:
return default
raise


def read_obj(*, resource, namespace=None, name=None, default=_UNSET_):
try:
if namespace is None:
api = kubernetes.client.CustomObjectsApi()
rsp = api.get_cluster_custom_object(
group=resource.group,
version=resource.version,
plural=resource.plural,
name=name,
)
else:
api = kubernetes.client.CustomObjectsApi()
rsp = api.get_namespaced_custom_object(
group=resource.group,
version=resource.version,
plural=resource.plural,
namespace=namespace,
name=name,
)
return rsp
except kubernetes.client.rest.ApiException as e:
if e.status == 404 and default is not _UNSET_:
return default
raise


def list_objs(*, resource, namespace=None):
"""
List the objects of specific resource type.
The cluster-scoped call is used in two cases:
* The resource itself is cluster-scoped, and namespacing makes not sense.
* The operator serves all namespaces for the namespaced custom resource.
Otherwise, the namespace-scoped call is used:
* The resource is namespace-scoped AND operator is namespaced-restricted.
"""
api = kubernetes.client.CustomObjectsApi()
if namespace is None:
rsp = api.list_cluster_custom_object(
group=resource.group,
version=resource.version,
plural=resource.plural,
)
else:
rsp = api.list_namespaced_custom_object(
group=resource.group,
version=resource.version,
plural=resource.plural,
namespace=namespace,
)
return rsp


def make_list_fn(*, resource, namespace=None):
"""
Return a function to be called to receive the list of objects.
Needed in that form for the API streaming calls (see watching.py).
However, the returned function is already bound to the specified
resource type, and requires no resource-identifying parameters.
Docstrings are important! Kubernetes client uses them to guess
the returned object types and the parameters type.
Function wrapping does that: preserves the docstrings.
"""
api = kubernetes.client.CustomObjectsApi()
if namespace is None:
@functools.wraps(api.list_cluster_custom_object)
def list_fn(**kwargs):
return api.list_cluster_custom_object(
group=resource.group,
version=resource.version,
plural=resource.plural,
**kwargs)
else:
@functools.wraps(api.list_cluster_custom_object)
def list_fn(**kwargs):
return api.list_namespaced_custom_object(
group=resource.group,
version=resource.version,
plural=resource.plural,
namespace=namespace,
**kwargs)
return list_fn
39 changes: 39 additions & 0 deletions kopf/k8s/patching.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import kubernetes


def patch_obj(*, resource, patch, namespace=None, name=None, body=None):
"""
Patch a resource of specific kind.
Either the namespace+name should be specified, or the body,
which is used only to get namespace+name identifiers.
Unlike the object listing, the namespaced call is always
used for the namespaced resources, even if the operator serves
the whole cluster (i.e. is not namespace-restricted).
"""

if body is not None and (name is not None or namespace is not None):
raise TypeError("Either body, or name+namespace can be specified. Got both.")

namespace = body.get('metadata', {}).get('namespace') if body is not None else namespace
name = body.get('metadata', {}).get('name') if body is not None else name

api = kubernetes.client.CustomObjectsApi()
if namespace is None:
api.patch_cluster_custom_object(
group=resource.group,
version=resource.version,
plural=resource.plural,
name=name,
body=patch,
)
else:
api.patch_namespaced_custom_object(
group=resource.group,
version=resource.version,
plural=resource.plural,
namespace=namespace,
name=name,
body=patch,
)
20 changes: 4 additions & 16 deletions kopf/reactor/watching.py → kopf/k8s/watching.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import kubernetes

from kopf.k8s import fetching
from kopf.reactor import registries

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -76,22 +77,9 @@ async def streaming_watch(
"""
kwargs = dict(timeout_seconds=DEFAULT_STREAM_TIMEOUT) if DEFAULT_STREAM_TIMEOUT else {}
loop = asyncio.get_event_loop()
w = kubernetes.watch.Watch()
api = kubernetes.client.CustomObjectsApi()
if namespace is None:
stream = w.stream(api.list_cluster_custom_object,
group=resource.group,
version=resource.version,
plural=resource.plural,
**kwargs)
else:
stream = w.stream(api.list_namespaced_custom_object,
group=resource.group,
version=resource.version,
plural=resource.plural,
namespace=namespace,
**kwargs)

fn = fetching.make_list_fn(resource=resource, namespace=namespace)
watch = kubernetes.watch.Watch()
stream = watch.stream(fn, **kwargs)
async for event in streaming_aiter(stream, loop=loop):

# "410 Gone" is for the "resource version too old" error, we must restart watching.
Expand Down
13 changes: 2 additions & 11 deletions kopf/reactor/handling.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,8 @@
from contextvars import ContextVar
from typing import NamedTuple, Optional, Any, MutableMapping, Text, Callable, Iterable

import kubernetes

from kopf import events
from kopf.k8s import patching
from kopf.reactor import invocation
from kopf.reactor import registries
from kopf.structs import diffs
Expand Down Expand Up @@ -198,15 +197,7 @@ async def custom_object_handler(
# But only once, to reduce the number of API calls and the generated irrelevant events.
if patch:
logger.debug("Patching with: %r", patch)
api = kubernetes.client.CustomObjectsApi()
api.patch_namespaced_custom_object(
group=resource.group,
version=resource.version,
plural=resource.plural,
namespace=body['metadata']['namespace'],
name=body['metadata']['name'],
body=patch,
)
patching.patch_obj(resource=resource, patch=patch, body=body)

# Sleep strictly after patching, never before -- to keep the status proper.
if delay:
Expand Down
Loading

0 comments on commit 0c55dcb

Please sign in to comment.