Skip to content

Commit

Permalink
Rename new watcher Event names and remove one that cannot happen (#1499
Browse files Browse the repository at this point in the history
)

* Rename watcher Event names and remove one that cannot happen

1. Event::Init now marks start, it is followed by one/more of:
2. Event::InitApply or EventInitPage
3. Event::Ready marks init complete

removed Event::RestartDelete because by docs it cannot happen:
https://kubernetes.io/docs/reference/using-api/api-concepts/#streaming-lists

have inserted a big error in case they break their own docs for this alpha feature.

Signed-off-by: clux <[email protected]>

* fmt

Signed-off-by: clux <[email protected]>

* reomve references to deleted event + fix reducer oversight

InitApply should be propagated here, pages are.

Signed-off-by: clux <[email protected]>

* avoid unnecessary collect

Signed-off-by: clux <[email protected]>

* rename Event::Ready -> Event::InitDone for consistency

Signed-off-by: clux <[email protected]>

---------

Signed-off-by: clux <[email protected]>
  • Loading branch information
clux authored May 29, 2024
1 parent bd84d65 commit 4d1ea12
Show file tree
Hide file tree
Showing 8 changed files with 89 additions and 104 deletions.
4 changes: 2 additions & 2 deletions kube-runtime/src/controller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1750,7 +1750,7 @@ mod tests {
let (queue_tx, queue_rx) = futures::channel::mpsc::unbounded::<ObjectRef<ConfigMap>>();
let (store_rx, mut store_tx) = reflector::store();
let mut applier = pin!(applier(
|obj, _| {
|_obj, _| {
Box::pin(async move {
// Try to flood the rescheduling buffer buffer by just putting it back in the queue immediately
//println!("reconciling {:?}", obj.metadata.name);
Expand All @@ -1763,7 +1763,7 @@ mod tests {
queue_rx.map(Result::<_, Infallible>::Ok),
Config::default(),
));
store_tx.apply_watcher_event(&watcher::Event::Restart);
store_tx.apply_watcher_event(&watcher::Event::InitDone);
for i in 0..items {
let obj = ConfigMap {
metadata: ObjectMeta {
Expand Down
39 changes: 20 additions & 19 deletions kube-runtime/src/reflector/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,9 +167,9 @@ pub(crate) mod test {
let st = stream::iter([
Ok(Event::Apply(foo.clone())),
Err(Error::TooManyObjects),
Ok(Event::RestartInit),
Ok(Event::RestartPage(vec![foo, bar])),
Ok(Event::Restart),
Ok(Event::Init),
Ok(Event::InitPage(vec![foo, bar])),
Ok(Event::InitDone),
]);

let (reader, writer) = reflector::store_shared(10);
Expand All @@ -192,15 +192,15 @@ pub(crate) mod test {
assert_eq!(reader.len(), 1);

let restarted = poll!(reflect.next());
assert!(matches!(restarted, Poll::Ready(Some(Ok(Event::RestartInit)))));
assert!(matches!(restarted, Poll::Ready(Some(Ok(Event::Init)))));
assert_eq!(reader.len(), 1);

let restarted = poll!(reflect.next());
assert!(matches!(restarted, Poll::Ready(Some(Ok(Event::RestartPage(_))))));
assert!(matches!(restarted, Poll::Ready(Some(Ok(Event::InitPage(_))))));
assert_eq!(reader.len(), 1);

let restarted = poll!(reflect.next());
assert!(matches!(restarted, Poll::Ready(Some(Ok(Event::Restart)))));
assert!(matches!(restarted, Poll::Ready(Some(Ok(Event::InitDone)))));
assert_eq!(reader.len(), 2);

assert!(matches!(poll!(reflect.next()), Poll::Ready(None)));
Expand All @@ -219,13 +219,13 @@ pub(crate) mod test {
Ok(Event::Delete(foo.clone())),
Ok(Event::Apply(foo.clone())),
Err(Error::TooManyObjects),
Ok(Event::RestartInit),
Ok(Event::RestartPage(vec![foo.clone(), bar.clone()])),
Ok(Event::Restart),
Ok(Event::Init),
Ok(Event::InitPage(vec![foo.clone(), bar.clone()])),
Ok(Event::InitDone),
]);

let foo = Arc::new(foo);
let bar = Arc::new(bar);
let _bar = Arc::new(bar);

let (_, writer) = reflector::store_shared(10);
let subscriber = writer.subscribe().unwrap();
Expand Down Expand Up @@ -257,17 +257,17 @@ pub(crate) mod test {

assert!(matches!(
poll!(reflect.next()),
Poll::Ready(Some(Ok(Event::RestartInit)))
Poll::Ready(Some(Ok(Event::Init)))
));

assert!(matches!(
poll!(reflect.next()),
Poll::Ready(Some(Ok(Event::RestartPage(_))))
Poll::Ready(Some(Ok(Event::InitPage(_))))
));

assert!(matches!(
poll!(reflect.next()),
Poll::Ready(Some(Ok(Event::Restart)))
Poll::Ready(Some(Ok(Event::InitDone)))
));

// these don't come back in order atm:
Expand All @@ -287,9 +287,9 @@ pub(crate) mod test {
let bar = testpod("bar");
let st = stream::iter([
Ok(Event::Apply(foo.clone())),
Ok(Event::RestartInit),
Ok(Event::RestartPage(vec![foo.clone(), bar.clone()])),
Ok(Event::Restart),
Ok(Event::Init),
Ok(Event::InitPage(vec![foo.clone(), bar.clone()])),
Ok(Event::InitDone),
]);

let foo = Arc::new(foo);
Expand All @@ -314,19 +314,19 @@ pub(crate) mod test {

assert!(matches!(
poll!(reflect.next()),
Poll::Ready(Some(Ok(Event::RestartInit)))
Poll::Ready(Some(Ok(Event::Init)))
));
assert_eq!(poll!(subscriber.next()), Poll::Pending);

assert!(matches!(
poll!(reflect.next()),
Poll::Ready(Some(Ok(Event::RestartPage(_))))
Poll::Ready(Some(Ok(Event::InitPage(_))))
));
assert_eq!(poll!(subscriber.next()), Poll::Pending);

assert!(matches!(
poll!(reflect.next()),
Poll::Ready(Some(Ok(Event::Restart)))
Poll::Ready(Some(Ok(Event::InitDone)))
));
drop(reflect);

Expand All @@ -346,6 +346,7 @@ pub(crate) mod test {
let foo = testpod("foo");
let bar = testpod("bar");
let st = stream::iter([
//TODO: include a ready event here to avoid dealing with Init?
Ok(Event::Apply(foo.clone())),
Ok(Event::Apply(bar.clone())),
Ok(Event::Apply(foo.clone())),
Expand Down
6 changes: 3 additions & 3 deletions kube-runtime/src/reflector/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,9 +242,9 @@ mod tests {
store_w,
stream::iter(vec![
Ok(watcher::Event::Apply(cm_a.clone())),
Ok(watcher::Event::RestartInit),
Ok(watcher::Event::RestartPage(vec![cm_b.clone()])),
Ok(watcher::Event::Restart),
Ok(watcher::Event::Init),
Ok(watcher::Event::InitPage(vec![cm_b.clone()])),
Ok(watcher::Event::InitDone),
]),
)
.map(|_| ())
Expand Down
25 changes: 10 additions & 15 deletions kube-runtime/src/reflector/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,17 +108,21 @@ where
let key = obj.to_object_ref(self.dyntype.clone());
self.store.write().remove(&key);
}
watcher::Event::RestartInit => {
watcher::Event::Init => {
self.buffer = AHashMap::new();
}
watcher::Event::RestartPage(new_objs) => {
watcher::Event::InitPage(new_objs) => {
let new_objs = new_objs
.iter()
.map(|obj| (obj.to_object_ref(self.dyntype.clone()), Arc::new(obj.clone())))
.collect::<AHashMap<_, _>>();
.map(|obj| (obj.to_object_ref(self.dyntype.clone()), Arc::new(obj.clone())));
self.buffer.extend(new_objs);
}
watcher::Event::Restart => {
watcher::Event::InitApply(obj) => {
let key = obj.to_object_ref(self.dyntype.clone());
let obj = Arc::new(obj.clone());
self.buffer.insert(key, obj);
}
watcher::Event::InitDone => {
let mut store = self.store.write();

// Swap the buffer into the store
Expand All @@ -134,15 +138,6 @@ where
ready_tx.init(())
}
}
watcher::Event::RestartApply(obj) => {
let key = obj.to_object_ref(self.dyntype.clone());
let obj = Arc::new(obj.clone());
self.buffer.insert(key, obj);
}
watcher::Event::RestartDelete(obj) => {
let key = obj.to_object_ref(self.dyntype.clone());
self.buffer.remove(&key);
}
}
}

Expand All @@ -157,7 +152,7 @@ where
dispatcher.broadcast(obj_ref).await;
}

watcher::Event::Restart => {
watcher::Event::InitDone => {
let obj_refs: Vec<_> = {
let store = self.store.read();
store.keys().cloned().collect()
Expand Down
10 changes: 5 additions & 5 deletions kube-runtime/src/utils/event_flatten.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,19 +37,19 @@ where
break Some(Ok(item));
}
let var_name = match ready!(me.stream.as_mut().poll_next(cx)) {
Some(Ok(Event::Apply(obj) | Event::RestartApply(obj))) => Some(Ok(obj)),
Some(Ok(Event::Delete(obj) | Event::RestartDelete(obj))) => {
Some(Ok(Event::Apply(obj) | Event::InitApply(obj))) => Some(Ok(obj)),
Some(Ok(Event::Delete(obj))) => {
if *me.emit_deleted {
Some(Ok(obj))
} else {
continue;
}
}
Some(Ok(Event::RestartPage(objs))) => {
Some(Ok(Event::InitPage(objs))) => {
*me.queue = objs.into_iter();
continue;
}
Some(Ok(Event::RestartInit | Event::Restart)) => continue,
Some(Ok(Event::Init | Event::InitDone)) => continue,
Some(Err(err)) => Some(Err(err)),
None => return Poll::Ready(None),
};
Expand All @@ -72,7 +72,7 @@ pub(crate) mod tests {
Ok(Event::Apply(1)),
Ok(Event::Delete(0)),
Ok(Event::Apply(2)),
Ok(Event::RestartPage(vec![1, 2])),
Ok(Event::InitPage(vec![1, 2])),
Err(Error::TooManyObjects),
Ok(Event::Apply(2)),
]);
Expand Down
4 changes: 2 additions & 2 deletions kube-runtime/src/utils/event_modify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ pub(crate) mod test {
let st = stream::iter([
Ok(Event::Apply(0)),
Err(Error::TooManyObjects),
Ok(Event::RestartPage(vec![10])),
Ok(Event::InitPage(vec![10])),
]);
let mut ev_modify = pin!(EventModify::new(st, |x| {
*x += 1;
Expand All @@ -75,7 +75,7 @@ pub(crate) mod test {
let restarted = poll!(ev_modify.next());
assert!(matches!(
restarted,
Poll::Ready(Some(Ok(Event::RestartPage(vec)))) if vec == [11]
Poll::Ready(Some(Ok(Event::InitPage(vec)))) if vec == [11]
));

assert!(matches!(poll!(ev_modify.next()), Poll::Ready(None)));
Expand Down
12 changes: 6 additions & 6 deletions kube-runtime/src/utils/reflect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,9 @@ pub(crate) mod test {
let st = stream::iter([
Ok(Event::Apply(foo.clone())),
Err(Error::TooManyObjects),
Ok(Event::RestartInit),
Ok(Event::RestartPage(vec![foo, bar])),
Ok(Event::Restart),
Ok(Event::Init),
Ok(Event::InitPage(vec![foo, bar])),
Ok(Event::InitDone),
]);
let (reader, writer) = reflector::store();

Expand All @@ -97,17 +97,17 @@ pub(crate) mod test {

assert!(matches!(
poll!(reflect.next()),
Poll::Ready(Some(Ok(Event::RestartInit)))
Poll::Ready(Some(Ok(Event::Init)))
));
assert_eq!(reader.len(), 1);

let restarted = poll!(reflect.next());
assert!(matches!(restarted, Poll::Ready(Some(Ok(Event::RestartPage(_))))));
assert!(matches!(restarted, Poll::Ready(Some(Ok(Event::InitPage(_))))));
assert_eq!(reader.len(), 1);

assert!(matches!(
poll!(reflect.next()),
Poll::Ready(Some(Ok(Event::Restart)))
Poll::Ready(Some(Ok(Event::InitDone)))
));
assert_eq!(reader.len(), 2);

Expand Down
Loading

0 comments on commit 4d1ea12

Please sign in to comment.