-
Notifications
You must be signed in to change notification settings - Fork 185
fix watching with a specified resource version #109
fix watching with a specified resource version #109
Conversation
5cab3de
to
5eb6633
Compare
/assign @yliaog |
Any update @yliaog ? Thanks! |
watch/watch.py
Outdated
@@ -83,13 +83,14 @@ def unmarshal_event(self, data, return_type): | |||
obj = SimpleNamespace(data=json.dumps(js['raw_object'])) | |||
js['object'] = self._api_client.deserialize(obj, return_type) | |||
if hasattr(js['object'], 'metadata'): | |||
self.resource_version = js['object'].metadata.resource_version | |||
self.resource_version = int( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
resource version is an opaque string, it cannot be assumed to be an int
watch/watch.py
Outdated
# For custom objects that we don't have model defined, json | ||
# deserialization results in dictionary | ||
elif (isinstance(js['object'], dict) and 'metadata' in js['object'] | ||
and 'resourceVersion' in js['object']['metadata']): | ||
self.resource_version = js['object']['metadata'][ | ||
'resourceVersion'] | ||
self.resource_version = int( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
watch/watch.py
Outdated
@@ -122,6 +123,7 @@ def stream(self, func, *args, **kwargs): | |||
return_type = self.get_return_type(func) | |||
kwargs['watch'] = True | |||
kwargs['_preload_content'] = False | |||
min_resource_version = int(kwargs.get('resource_version', 0)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
resource version is an string, the resource version string "0" is kind of treated special on the server side. but please don't assume "0" is the minimum resource version
watch/watch.py
Outdated
# continue to watch from the requested resource version | ||
# does not handle overflow though that should take a few | ||
# hundred years | ||
kwargs['resource_version'] = max( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
similarly, no max or min can be done on resource version. Instead, there is only special "0" versus non-special other opaque strings. so what can be done is to test for equal to "0" or not
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do you know where I can find the exact definition of the resource version?
There has to be be some kind of ordering possible, what else is the resource version for?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
kubernetes-client/python#693 (comment)
has some useful references
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do you have a suggestion how to fix this issue then?
my best idea is to ditch the while loop that caused the issue in the first place if a resource version is passed in by the caller
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the while loop is still needed, i think probably check if kwargs['resource_version'] is present at the beginning, if it does, then start watching with kwargs['resource_version'], instead of starting with "0" (which is set in Init to self.resource_version)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hm assuming receive all events in order from the first watch it would work, you'd restart at a lower resource version than inputted but if you have received them all you should still be fine. Makes the tests a bit more tricky to implement but I'll post an update soon
Any progress @juliantaylor ? |
c40c9cc
to
3219b84
Compare
finally figured out a way to write a test and updated treating the rv as opaque, please have a look again. |
3219b84
to
1e559d9
Compare
@juliantaylor you need to lint the files as some of the are failing the style checks: |
1e559d9
to
9d521a1
Compare
should be fixed |
watch/watch_test.py
Outdated
@@ -62,6 +64,69 @@ def test_watch_with_decode(self): | |||
fake_resp.close.assert_called_once() | |||
fake_resp.release_conn.assert_called_once() | |||
|
|||
def test_watch_resource_version_set(self): | |||
# gh-700 ensure watching from a resource version does reset to resource |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what is gh-700 ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
github issue 700, the convention I'm used to. How do you reference issues in this project?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
better to use a direct link to the issue
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
watch/watch_test.py
Outdated
|
||
from .watch import Watch | ||
|
||
callcount = 0 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
better to avoid using global
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't know a way to avoid it, any ideas?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
could you move it to class WatchTests?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
right, just putting it the class works ... I had just tested a function local variable ...
updated
fake_resp.close = Mock() | ||
fake_resp.release_conn = Mock() | ||
values = [ | ||
'{"type": "ADDED", "object": {"metadata": {"name": "test1",' |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
could you use a real k8s object, like a pod? that could make the test easier to understand
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
one could though it does not matter for the test
none of the existing tests use real objects, should all be updated?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fine to leave it as is
watch/watch_test.py
Outdated
fake_api.get_namespaces.__doc__ = ':return: V1NamespaceList' | ||
|
||
w = Watch() | ||
count = 1 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what does count do? the number of the for loop iterations? usually it starts with 0
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes its the loop count, again a copy paste from other tests, using enumerate would be a bit nicer
if count == len(values) * 3: | ||
w.stop() | ||
|
||
fake_api.get_namespaces.assert_has_calls(calls) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
how do you simulate the case the 'connection got reset'?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
via the global variable changing the return value of the mock read_chunked
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if 'resource_version' in kwargs:
self.resource_version = kwargs['resource_version']
===================================================
In the above test, the code in the function stream() from its beginning to "while True" is executed only once, i.e. the two lines of fix code you added taken above is executed only once.
when the above two lines are executed, kwargs is None, hence essentially the two lines you added did not make any effect. I'm wondering how the test code tests the fix? am I missing anything?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the testcode does run with kwargs['resource_version]
set which sets the rv member of the watch object.
The function then calls the get_namespaces
mock which would direct k8s to provide a watch from the provided resource version, in our mock it is just ignored and returns nothing simulating the k8s api not returning anything and resetting the connection.
Now instead of resetting the resource version member variable back to zero and issuing a watch from zero in the next iteration it will reuse the input resource version which is what we want.
The mock in the second call returns something but what doesn't really matter we just want to register that the second mocked call was with the input resource version and not with zero which causes the linked issues.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
here the failure when you remove the two lines I added, it matches what happens in the real testcase I posted in the linked issue:
E AssertionError: Calls not found.
E Expected: [call(_preload_content=False, resource_version='5', watch=True),
E call(_preload_content=False, resource_version='3', watch=True),
E call(_preload_content=False, resource_version='3', watch=True)]
E Actual: [call(_preload_content=False, resource_version='5', watch=True),
E call(_preload_content=False, resource_version=0, watch=True), <<<<<<<<< ERROR HERE
E call(_preload_content=False, resource_version='3', watch=True),
E call(_preload_content=False, resource_version='3', watch=True)]
watch/watch_test.py
Outdated
calls.append(call(_preload_content=False, watch=True, | ||
resource_version=rv)) | ||
# returned | ||
if count == len(values) * 3: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
so you expect the loop above iterates 3*3 times?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, the mock needs a way to identify when it is done, this and other tests in the file use the loop iteration count
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
so for each iteration of the for loop, it what's returned from w.stream(...) is:
calls: call<rv=5> | call<rv=5> | call<rv=3> | call<rv=3>
result: nothing | test1 test2 test3 | test1 test2 test3 | test1 test2
count: 1 | 2 3 4 | 5 6 7 | 8 9
it looks that there should be 4 calls as illustrated above.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes one could reduce it to 4, the amount of iterations doesn't matter as long as we iterate at least twice in the watch.
going a bit beyond what is needed is currently unnecessary but may make it more robust to future changes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i mean there are calls, call<rv=5> | call<rv=5> | call<rv=3> | call<rv=3>
currently, the test expects 3 calls, (call<rv=5> | call<rv=3> | call<rv=3>)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
correct, assert_has_calls does not care about calls before and after the wanted ones, I'll update the test to be more strict
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done, changed it a bit to be hopefully more clear what result we expect
97174be
to
82c710c
Compare
The watch code reset the version to the last found in the response. When you first list existing objects and then start watching from that resource version the existing versions are older than the version you wanted and the watch starts from the wrong version after the first restart. This leads to for example already deleted objects ending in the stream again. Fix this by setting the minimum resource version to reset from to the input resource version. As long as k8s returns all objects in order in the watch this should work. We cannot use the integer value of the resource version to order it as one should be treat the value as opaque. Closes kubernetes-client/python#700
82c710c
to
3c30a30
Compare
Codecov Report
@@ Coverage Diff @@
## master #109 +/- ##
=========================================
+ Coverage 91.74% 91.94% +0.2%
=========================================
Files 13 13
Lines 1187 1217 +30
=========================================
+ Hits 1089 1119 +30
Misses 98 98
Continue to review full report at Codecov.
|
/lgtm |
/approve |
[APPROVALNOTIFIER] This PR is APPROVED This pull-request has been approved by: juliantaylor, yliaog The full list of commands accepted by this bot can be found here. The pull request process is described here
Needs approval from an approver in each of these files:
Approvers can indicate their approval by writing |
This PR won't stop the 410 error, right? It will just stop the out of order processing? |
the 410 resource version to old error is not handled by this. |
The watch code reset the version to the last found in the
response.
When you first list existing objects and then start watching from that
resource version the existing versions are older than the version you
wanted and the watch starts from the wrong version after the first
restart.
This leads to for example already deleted objects ending in the stream
again.
Fix this by not resetting to an older version than the one specified in
the watch.
It does not handle overflows of the resource version but they are 64 bit
integers so they should not realistically overflow even in the most loaded
clusters.
Closes kubernetes-client/python#700