Skip to content

Commit

Permalink
Fix reconnecting in watch for custom resources (#321)
Browse files Browse the repository at this point in the history
* Fix reconnecting in watch for custom resources

* add test for watch crd without timeout

* clean up tests
  • Loading branch information
tomplus authored Jul 23, 2024
1 parent b7ec53c commit 4ac32bb
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 154 deletions.
2 changes: 2 additions & 0 deletions kubernetes_asyncio/dynamic/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,8 @@ async def watch(resource, namespace=None, name=None, label_selector=None, field_
serialize=False,
**kwargs
):
if event == "":
break
event['object'] = ResourceInstance(resource, event['object'])
yield event

Expand Down
232 changes: 84 additions & 148 deletions kubernetes_asyncio/dynamic/client_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,32 +42,46 @@ async def test_cluster_custom_resources(self):
await client.resources.get(api_version='apps.example.com/v1', kind='ClusterChangeMe')

crd_api = await client.resources.get(
api_version='apiextensions.k8s.io/v1beta1',
api_version='apiextensions.k8s.io/v1',
kind='CustomResourceDefinition')
name = 'clusterchangemes.apps.example.com'
crd_manifest = {
'apiVersion': 'apiextensions.k8s.io/v1beta1',
'kind': 'CustomResourceDefinition',
'metadata': {
'name': name,
"apiVersion": "apiextensions.k8s.io/v1",
"kind": "CustomResourceDefinition",
"metadata": {
"name": name,
},
'spec': {
'group': 'apps.example.com',
'names': {
'kind': 'ClusterChangeMe',
'listKind': 'ClusterChangeMeList',
'plural': 'clusterchangemes',
'singular': 'clusterchangeme',
"spec": {
"group": "apps.example.com",
"names": {
"kind": "ClusterChangeMe",
"listKind": "ClusterChangeMeList",
"plural": "clusterchangemes",
"singular": "clusterchangeme",
},
'scope': 'Cluster',
'version': 'v1',
'subresources': {
'status': {}
}
}
"scope": "Cluster",
"versions": [
{
"name": "v1",
"served": True,
"storage": True,
"schema": {
"openAPIV3Schema": {
"type": "object",
"properties": {
"spec": {
"type": "object",
"properties": {"size": {"type": "integer"}},
}
},
}
},
}
],
},
}
resp = await crd_api.create(crd_manifest)

resp = await crd_api.create(crd_manifest)
self.assertEqual(name, resp.metadata.name)
self.assertTrue(resp.status)

Expand Down Expand Up @@ -99,6 +113,19 @@ async def test_cluster_custom_resources(self):
resp = await changeme_api.create(body=changeme_manifest)
self.assertEqual(resp.metadata.name, changeme_name)

# watch with timeout
count = 0
async for _ in client.watch(changeme_api, timeout=3, namespace="default", name=changeme_name):
count += 1
self.assertTrue(count > 0, msg="no events received for watch")

# without timeout, should be longer than the previous check
async def _watch_no_timeout():
async for _ in client.watch(changeme_api, namespace="default", name=changeme_name):
pass
with self.assertRaises(asyncio.exceptions.TimeoutError):
await asyncio.wait_for(_watch_no_timeout(), timeout=5)

resp = await changeme_api.get(name=changeme_name)
self.assertEqual(resp.metadata.name, changeme_name)

Expand Down Expand Up @@ -127,111 +154,6 @@ async def test_cluster_custom_resources(self):
with self.assertRaises(ResourceNotFoundError):
await client.resources.get(api_version='apps.example.com/v1', kind='ClusterChangeMe')

# async def test_async_namespaced_custom_resources(self):
# async with api_client.ApiClient(configuration=self.config) as apic:
# client = await DynamicClient.newclient(apic)
#
# with self.assertRaises(ResourceNotFoundError):
# await client.resources.get(api_version='apps.example.com/v1', kind='ChangeMe')
#
# crd_api = await client.resources.get(
# api_version='apiextensions.k8s.io/v1beta1',
# kind='CustomResourceDefinition')
#
# name = 'changemes.apps.example.com'
#
# crd_manifest = {
# 'apiVersion': 'apiextensions.k8s.io/v1beta1',
# 'kind': 'CustomResourceDefinition',
# 'metadata': {
# 'name': name,
# },
# 'spec': {
# 'group': 'apps.example.com',
# 'names': {
# 'kind': 'ChangeMe',
# 'listKind': 'ChangeMeList',
# 'plural': 'changemes',
# 'singular': 'changeme',
# },
# 'scope': 'Namespaced',
# 'version': 'v1',
# 'subresources': {
# 'status': {}
# }
# }
# }
# async_resp = await crd_api.create(crd_manifest, async_req=True)
#
# self.assertEqual(name, async_resp.metadata.name)
# self.assertTrue(async_resp.status)
#
# async_resp = await crd_api.get(name=name, async_req=True)
# self.assertEqual(name, async_resp.metadata.name)
# self.assertTrue(async_resp.status)
#
# try:
# changeme_api = await client.resources.get(
# api_version='apps.example.com/v1', kind='ChangeMe')
# except ResourceNotFoundError:
# # Need to wait a sec for the discovery layer to get updated
# await asyncio.sleep(2)
# changeme_api = await client.resources.get(
# api_version='apps.example.com/v1', kind='ChangeMe')
#
# async_resp = await changeme_api.get(async_req=True)
# self.assertEqual(async_resp.items, [])
#
# changeme_name = 'custom-resource' + short_uuid()
# changeme_manifest = {
# 'apiVersion': 'apps.example.com/v1',
# 'kind': 'ChangeMe',
# 'metadata': {
# 'name': changeme_name,
# },
# 'spec': {}
# }
#
# async_resp = await changeme_api.create(body=changeme_manifest, namespace='default', async_req=True)
# self.assertEqual(async_resp.metadata.name, changeme_name)
#
# async_resp = await changeme_api.get(name=changeme_name, namespace='default', async_req=True)
# self.assertEqual(async_resp.metadata.name, changeme_name)
#
# changeme_manifest['spec']['size'] = 3
# async_resp = await changeme_api.patch(
# body=changeme_manifest,
# namespace='default',
# content_type='application/merge-patch+json',
# async_req=True
# )
# self.assertEqual(async_resp.spec.size, 3)
#
# async_resp = await changeme_api.get(name=changeme_name, namespace='default', async_req=True)
# self.assertEqual(async_resp.spec.size, 3)
#
# async_resp = await changeme_api.get(namespace='default', async_req=True)
# self.assertEqual(len(async_resp.items), 1)
#
# async_resp = await changeme_api.get(async_req=True)
# self.assertEqual(len(async_resp.items), 1)
#
# await changeme_api.delete(name=changeme_name, namespace='default', async_req=True)
#
# async_resp = await changeme_api.get(namespace='default', async_req=True)
# self.assertEqual(len(async_resp.items), 0)
#
# async_resp = await changeme_api.get(async_req=True)
# self.assertEqual(len(async_resp.items), 0)
#
# await crd_api.delete(name=name, async_req=True)
#
# await asyncio.sleep(2)
# await client.resources.invalidate_cache()
# with self.assertRaises(ResourceNotFoundError):
# await client.resources.get(
# api_version='apps.example.com/v1', kind='ChangeMe')

async def test_namespaced_custom_resources(self):
async with api_client.ApiClient(configuration=self.config) as apic:
client = await DynamicClient(apic)
Expand All @@ -240,32 +162,46 @@ async def test_namespaced_custom_resources(self):
await client.resources.get(api_version='apps.example.com/v1', kind='ChangeMe')

crd_api = await client.resources.get(
api_version='apiextensions.k8s.io/v1beta1',
api_version='apiextensions.k8s.io/v1',
kind='CustomResourceDefinition')
name = 'changemes.apps.example.com'
name = 'clusterchangemes.apps.example.com'
crd_manifest = {
'apiVersion': 'apiextensions.k8s.io/v1beta1',
'kind': 'CustomResourceDefinition',
'metadata': {
'name': name,
"apiVersion": "apiextensions.k8s.io/v1",
"kind": "CustomResourceDefinition",
"metadata": {
"name": name,
},
'spec': {
'group': 'apps.example.com',
'names': {
'kind': 'ChangeMe',
'listKind': 'ChangeMeList',
'plural': 'changemes',
'singular': 'changeme',
"spec": {
"group": "apps.example.com",
"names": {
"kind": "ClusterChangeMe",
"listKind": "ClusterChangeMeList",
"plural": "clusterchangemes",
"singular": "clusterchangeme",
},
'scope': 'Namespaced',
'version': 'v1',
'subresources': {
'status': {}
}
}
"scope": "Namespaced",
"versions": [
{
"name": "v1",
"served": True,
"storage": True,
"schema": {
"openAPIV3Schema": {
"type": "object",
"properties": {
"spec": {
"type": "object",
"properties": {"size": {"type": "integer"}},
}
},
}
},
}
],
},
}
resp = await crd_api.create(crd_manifest)

resp = await crd_api.create(crd_manifest)
self.assertEqual(name, resp.metadata.name)
self.assertTrue(resp.status)

Expand All @@ -276,18 +212,18 @@ async def test_namespaced_custom_resources(self):
self.assertTrue(resp.status)

try:
await client.resources.get(api_version='apps.example.com/v1', kind='ChangeMe')
await client.resources.get(api_version='apps.example.com/v1', kind='ClusterChangeMe')
except ResourceNotFoundError:
# Need to wait a sec for the discovery layer to get updated
await asyncio.sleep(2)
changeme_api = await client.resources.get(
api_version='apps.example.com/v1', kind='ChangeMe')
api_version='apps.example.com/v1', kind='ClusterChangeMe')
resp = await changeme_api.get()
self.assertEqual(resp.items, [])
changeme_name = 'custom-resource' + short_uuid()
changeme_manifest = {
'apiVersion': 'apps.example.com/v1',
'kind': 'ChangeMe',
'kind': 'ClusterChangeMe',
'metadata': {
'name': changeme_name,
},
Expand Down Expand Up @@ -446,7 +382,7 @@ async def test_configmap_apis(self):
self.assertEqual(name, resp.metadata.name)

count = 0
async for _ in client.watch(api, timeout=10, namespace="default", name=name):
async for _ in client.watch(api, timeout=3, namespace="default", name=name):
count += 1
self.assertTrue(count > 0, msg="no events received for watch")

Expand Down
23 changes: 17 additions & 6 deletions kubernetes_asyncio/watch/watch.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,12 +104,13 @@ def unmarshal_event(self, data: str, response_type):

# If possible, compile the JSON response into a Python native response
# type, eg `V1Namespace` or `V1Pod`,`ExtensionsV1beta1Deployment`, ...
if response_type and js['type'].lower() != 'bookmark':
if response_type:
js['object'] = self._api_client.deserialize(
response=SimpleNamespace(data=json.dumps(js['raw_object'])),
response_type=response_type
)

if js['type'].lower() != 'bookmark':
# decode and save resource_version to continue watching
if hasattr(js['object'], 'metadata'):
self.resource_version = js['object'].metadata.resource_version
Expand All @@ -120,6 +121,7 @@ def unmarshal_event(self, data: str, response_type):
and 'metadata' in js['object']
and 'resourceVersion' in js['object']['metadata']):
self.resource_version = js['object']['metadata']['resourceVersion']

elif js['type'].lower() == 'bookmark':
self.resource_version = js['object']['metadata']['resourceVersion']

Expand All @@ -135,6 +137,12 @@ async def __anext__(self):
await self.close()
raise

def _reconnect(self):
self.resp.close()
self.resp = None
if self.resource_version:
self.func.keywords['resource_version'] = self.resource_version

async def next(self):

while 1:
Expand All @@ -153,11 +161,12 @@ async def next(self):
try:
line = await self.resp.content.readline()
except asyncio.TimeoutError:
# This exception can be raised by aiohttp (client timeout)
# but we don't retry if server side timeout is applied.
# The base scenario would be to restart watching with timeout_seconds
# reduced by time spent in previous iterations.
if 'timeout_seconds' not in self.func.keywords:
self.resp.close()
self.resp = None
if self.resource_version:
self.func.keywords['resource_version'] = self.resource_version
self._reconnect()
continue
else:
raise
Expand All @@ -167,7 +176,9 @@ async def next(self):
# Stop the iterator if K8s sends an empty response. This happens when
# eg the supplied timeout has expired.
if line == '':
raise StopAsyncIteration
if 'timeout_seconds' not in self.func.keywords:
self._reconnect()
continue

# Special case for faster log streaming
if self.return_type == 'str':
Expand Down

0 comments on commit 4ac32bb

Please sign in to comment.