Skip to content
This repository has been archived by the owner on Sep 14, 2020. It is now read-only.

Commit

Permalink
Merge pull request #149 from nolar/e2e-fixes
Browse files Browse the repository at this point in the history
Improve the stability of e2e tests
  • Loading branch information
Sergey Vasilyev authored Jul 16, 2019
2 parents ae57004 + 3fb00b5 commit a9e2030
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 43 deletions.
2 changes: 1 addition & 1 deletion examples/09-testing/test_example_09.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def test_resource_lifecycle(mocker):
mocker.patch('kopf.config.WatchersConfig.default_stream_timeout', 10)

# Run an operator and simulate some activity with the operated resource.
with kopf.testing.KopfRunner(['run', '--verbose', '--standalone', example_py]) as runner:
with kopf.testing.KopfRunner(['run', '--verbose', '--standalone', example_py], timeout=60) as runner:
subprocess.run(f"kubectl create -f {obj_yaml}", shell=True, check=True)
time.sleep(5) # give it some time to react
subprocess.run(f"kubectl delete -f {obj_yaml}", shell=True, check=True)
Expand Down
21 changes: 11 additions & 10 deletions examples/10-builtins/test_example_10.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,27 +9,27 @@
def test_pods_reacted():

example_py = os.path.join(os.path.dirname(__file__), 'example.py')
with kopf.testing.KopfRunner(['run', '--verbose', example_py]) as runner:
_create_pod()
with kopf.testing.KopfRunner(['run', '--standalone', '--verbose', example_py], timeout=60) as runner:
name = _create_pod()
time.sleep(5) # give it some time to react
_delete_pod()
_delete_pod(name)
time.sleep(1) # give it some time to react

assert runner.exception is None
assert runner.exit_code == 0

assert '[default/kopf-pod-1] Creation event:' in runner.stdout
assert '[default/kopf-pod-1] === Pod killing happens in 30s.' in runner.stdout
assert '[default/kopf-pod-1] Deletion event:' in runner.stdout
assert '[default/kopf-pod-1] === Pod killing is cancelled!' in runner.stdout
assert f'[default/{name}] Creation event:' in runner.stdout
assert f'[default/{name}] === Pod killing happens in 30s.' in runner.stdout
assert f'[default/{name}] Deletion event:' in runner.stdout
assert f'[default/{name}] === Pod killing is cancelled!' in runner.stdout


def _create_pod():
api = pykube.HTTPClient(pykube.KubeConfig.from_file())
pod = pykube.Pod(api, {
'apiVersion': 'v1',
'kind': 'Pod',
'metadata': {'name': 'kopf-pod-1', 'namespace': 'default'},
'metadata': {'generateName': 'kopf-pod-', 'namespace': 'default'},
'spec': {
'containers': [{
'name': 'the-only-one',
Expand All @@ -38,9 +38,10 @@ def _create_pod():
}]},
})
pod.create()
return pod.name


def _delete_pod():
def _delete_pod(name):
api = pykube.HTTPClient(pykube.KubeConfig.from_file())
pod = pykube.Pod.objects(api, namespace='default').get_by_name('kopf-pod-1')
pod = pykube.Pod.objects(api, namespace='default').get_by_name(name)
pod.delete()
62 changes: 32 additions & 30 deletions kopf/testing/runner.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
import concurrent.futures
import threading

import click.testing
Expand Down Expand Up @@ -37,22 +38,20 @@ class KopfRunner:
but not across processes --- the mock's calls (counts, arrgs) are lost.
"""

def __init__(self, *args, reraise=True, **kwargs):
def __init__(self, *args, reraise=True, timeout=None, **kwargs):
super().__init__()
self.args = args
self.kwargs = kwargs
self.reraise = reraise
self._loop = None
self._loop_set = None
self._thread = None
self._result = None
self._invoke_exception = None
self.timeout = timeout
self._loop = asyncio.new_event_loop()
self._ready = threading.Event() # NB: not asyncio.Event!
self._thread = threading.Thread(target=self._target)
self._future = concurrent.futures.Future()

def __enter__(self):
self._loop_set = threading.Event() # NB: not asyncio.Event!
self._thread = threading.Thread(target=self._target)
self._thread.start()
self._loop_set.wait() # should be nanosecond-fast
self._ready.wait() # should be nanosecond-fast
return self

def __exit__(self, exc_type, exc_val, exc_tb):
Expand All @@ -71,70 +70,73 @@ async def shutdown():
# but instead wait for the thread+loop (CLI command) to finish.
if self._loop.is_running():
asyncio.run_coroutine_threadsafe(shutdown(), self._loop)
self._thread.join()
self._thread.join(timeout=self.timeout)

# If the thread is not finished, it is a bigger problem than exceptions.
if self._thread.is_alive():
raise Exception("The operator didn't stop, still running.")

# Re-raise the exceptions of the threading & invocation logic.
if self._invoke_exception is not None:
if self._future.exception() is not None:
if exc_val is None:
raise self._invoke_exception
raise self._future.exception()
else:
raise self._invoke_exception from exc_val
if self._result.exception is not None and self.reraise:
raise self._future.exception() from exc_val
if self._future.result().exception is not None and self.reraise:
if exc_val is None:
raise self._result.exception
raise self._future.result().exception
else:
raise self._result.exception from exc_val
raise self._future.result().exception from exc_val

def _target(self):

# Every thread must have its own loop. The parent thread (pytest)
# needs to know when the loop is set up, to be able to shut it down.
self._loop = asyncio.new_event_loop()
asyncio.set_event_loop(self._loop)
self._loop_set.set()
self._ready.set()

# Execute the requested CLI command in the thread & thread's loop.
# Remember the result & exception for re-raising in the parent thread.
try:
runner = click.testing.CliRunner()
result = runner.invoke(cli.main, *self.args, **self.kwargs)
except BaseException as e:
self._result = None
self._invoke_exception = e
self._future.set_exception(e)
else:
self._result = result
self._invoke_exception = None
self._future.set_result(result)

@property
def future(self):
return self._future

@property
def output(self):
return self._result.output
return self.future.result().output

@property
def stdout(self):
return self._result.stdout
return self.future.result().stdout

@property
def stdout_bytes(self):
return self._result.stdout_bytes
return self.future.result().stdout_bytes

@property
def stderr(self):
return self._result.stderr
return self.future.result().stderr

@property
def stderr_bytes(self):
return self._result.stderr_bytes
return self.future.result().stderr_bytes

@property
def exit_code(self):
return self._result.exit_code
return self.future.result().exit_code

@property
def exception(self):
return self._result.exception
return self.future.result().exception

@property
def exc_info(self):
return self._result.exc_info
return self.future.result().exc_info
4 changes: 2 additions & 2 deletions tests/e2e/test_examples.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from kopf.testing import KopfRunner


def test_all_examples_are_runnable(mocker, with_crd, with_peering, exampledir):
def test_all_examples_are_runnable(mocker, with_crd, exampledir):

# If the example has its own opinion on the timing, try to respect it.
# See e.g. /examples/99-all-at-once/example.py.
Expand Down Expand Up @@ -39,7 +39,7 @@ def test_all_examples_are_runnable(mocker, with_crd, with_peering, exampledir):
mocker.patch('kopf.config.WatchersConfig.default_stream_timeout', 10)

# Run an operator and simulate some activity with the operated resource.
with KopfRunner(['run', '--verbose', str(example_py)]) as runner:
with KopfRunner(['run', '--standalone', '--verbose', str(example_py)], timeout=60) as runner:
subprocess.run("kubectl apply -f examples/obj.yaml", shell=True, check=True)
time.sleep(e2e_create_time or 1) # give it some time to react and to sleep and to retry
subprocess.run("kubectl delete -f examples/obj.yaml", shell=True, check=True)
Expand Down

0 comments on commit a9e2030

Please sign in to comment.