Issue #523/#498/#236 Fall back on EJR when no job metadata in ZK
soxofaan committed Sep 29, 2023
1 parent 4dea314 commit 9a89505
Showing 3 changed files with 70 additions and 8 deletions.
15 changes: 10 additions & 5 deletions openeogeotrellis/job_registry.py
@@ -1,3 +1,4 @@
+import contextlib
 import datetime as dt
 import json
 import logging
@@ -728,10 +729,12 @@ def get_job(self, job_id: str, user_id: str) -> dict:
         # TODO: eliminate get_job/get_job_metadata duplication?
         zk_job = ejr_job = None
         if self.zk_job_registry:
-            zk_job = self.zk_job_registry.get_job(job_id=job_id, user_id=user_id)
+            with contextlib.suppress(JobNotFoundException):
+                zk_job = self.zk_job_registry.get_job(job_id=job_id, user_id=user_id)
         if self.elastic_job_registry:
             with self._just_log_errors("get_job", job_id=job_id):
-                ejr_job = self.elastic_job_registry.get_job(job_id=job_id)
+                with contextlib.suppress(JobNotFoundException):
+                    ejr_job = self.elastic_job_registry.get_job(job_id=job_id)

         self._check_zk_ejr_job_info(job_id=job_id, zk_job_info=zk_job, ejr_job_info=ejr_job)
         return zk_job or ejr_job
@@ -741,11 +744,13 @@ def get_job_metadata(self, job_id: str, user_id: str) -> BatchJobMetadata:
         zk_job_info = ejr_job_info = None
         if self.zk_job_registry:
             with TimingLogger(f"self.zk_job_registry.get_job({job_id=}, {user_id=})", logger=_log.debug):
-                zk_job_info = self.zk_job_registry.get_job(job_id=job_id, user_id=user_id)
+                with contextlib.suppress(JobNotFoundException):
+                    zk_job_info = self.zk_job_registry.get_job(job_id=job_id, user_id=user_id)
         if self.elastic_job_registry:
             with self._just_log_errors("get_job_metadata", job_id=job_id):
                 with TimingLogger(f"self.elastic_job_registry.get_job({job_id=})", logger=_log.debug):
-                    ejr_job_info = self.elastic_job_registry.get_job(job_id=job_id)
+                    with contextlib.suppress(JobNotFoundException):
+                        ejr_job_info = self.elastic_job_registry.get_job(job_id=job_id)

         self._check_zk_ejr_job_info(job_id=job_id, zk_job_info=zk_job_info, ejr_job_info=ejr_job_info)
         job_metadata = zk_job_info_to_metadata(zk_job_info) if zk_job_info else ejr_job_info_to_metadata(ejr_job_info)
@@ -760,7 +765,7 @@ def _check_zk_ejr_job_info(self, job_id: str, zk_job_info: Union[dict, None], ej
         if zk_job_info != ejr_job_info:
             self._log.warning(f"DoubleJobRegistry mismatch {zk_job_info=} {ejr_job_info=}")
         elif zk_job_info is None and ejr_job_info is None:
-            raise DoubleJobRegistryException(f"None of ZK/EJR have {job_id=}")
+            raise JobNotFoundException(job_id=job_id)

     def set_status(self, job_id: str, user_id: str, status: str) -> None:
         if self.zk_job_registry:
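Taken together, the job_registry.py changes make both lookups tolerate a missing job in one backend and only fail when neither ZooKeeper nor EJR knows the job. A minimal, self-contained sketch of that fallback pattern (simplified stand-in registries and a local JobNotFound exception for illustration; not the actual DoubleJobRegistry code):

import contextlib


class JobNotFound(Exception):
    """Stand-in for the driver's JobNotFoundException."""


def get_job_with_fallback(zk_registry, ejr_registry, job_id: str) -> dict:
    """Try ZooKeeper first, then the Elastic Job Registry; raise only if both miss."""
    zk_job = ejr_job = None
    if zk_registry:
        # A miss in ZK is no longer fatal: suppress it and fall through to EJR.
        with contextlib.suppress(JobNotFound):
            zk_job = zk_registry.get_job(job_id)
    if ejr_registry:
        with contextlib.suppress(JobNotFound):
            ejr_job = ejr_registry.get_job(job_id)
    if zk_job is None and ejr_job is None:
        # Mirrors _check_zk_ejr_job_info: nothing in either backend.
        raise JobNotFound(job_id)
    return zk_job or ejr_job

With nothing in ZooKeeper the EJR result is returned unchanged; previously the unguarded ZK lookup raised before EJR was even consulted.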
10 changes: 7 additions & 3 deletions openeogeotrellis/testing.py
@@ -120,9 +120,13 @@ def set(self, path: Union[str, Path], value: bytes, version: int = -1):

     def delete(self, path: Union[str, Path], version: int = -1):
         path = Path(path)
-        self._get(path).assert_version(version)
-        parent = self._get(path.parent)
-        del parent.children[path.name]
+        znode = self._get(path).assert_version(version)
+        if znode is self.root:
+            # Special case: wipe everything, start over.
+            self.root = _ZNode()
+        else:
+            parent = self._get(path.parent)
+            del parent.children[path.name]

     def dump(self) -> Dict[str, bytes]:
         """Dump ZooKeeper data for inspection"""
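The root special case is needed because "/" has no parent node from which a child entry could be removed; resetting the whole tree is the closest equivalent. A simplified standalone sketch of the idea (hypothetical FakeZNode/FakeZk names, not the actual classes in openeogeotrellis.testing):

from typing import Dict


class FakeZNode:
    """Tiny stand-in for a fake ZooKeeper node: a value plus named children."""

    def __init__(self, value: bytes = b""):
        self.value = value
        self.children: Dict[str, "FakeZNode"] = {}


class FakeZk:
    """Minimal in-memory ZooKeeper-like store whose delete() handles the root specially."""

    def __init__(self):
        self.root = FakeZNode()

    def _get(self, parts):
        node = self.root
        for name in parts:
            node = node.children[name]
        return node

    def delete(self, path: str):
        parts = [p for p in path.split("/") if p]
        node = self._get(parts)
        if node is self.root:
            # "/" has no parent entry to delete from: reset the whole tree instead.
            self.root = FakeZNode()
        else:
            parent = self._get(parts[:-1])
            del parent.children[parts[-1]]

This is what the new test below relies on: zk_client.delete("/") empties the fake ZooKeeper store so that only EJR still has the job.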
53 changes: 53 additions & 0 deletions tests/test_job_registry.py
@@ -301,6 +301,14 @@ def test_get_job(self, double_jr, caplog):

         assert caplog.messages == []

+    def test_get_job_not_found(self, double_jr, caplog):
+        with double_jr:
+            with pytest.raises(JobNotFoundException):
+                _ = double_jr.get_job("j-nope", user_id="john")
+            with pytest.raises(JobNotFoundException):
+                _ = double_jr.get_job_metadata("j-nope", user_id="john")
+        assert caplog.messages == []
+
     def test_get_job_mismatch(self, double_jr, memory_jr, caplog):
         with double_jr:
             double_jr.create_job(
@@ -381,6 +389,51 @@ def test_get_job_consistency(

         assert caplog.messages == []

+    def test_get_job_deleted_from_zk(self, double_jr, caplog, zk_client, memory_jr):
+        """
+        Make sure to fall back on EJR if no data found in ZK
+        https://github.com/Open-EO/openeo-geopyspark-driver/issues/523
+        """
+        with double_jr:
+            double_jr.create_job(job_id="j-123", user_id="john", process=self.DUMMY_PROCESS)
+            # Wipe Zookeeper db
+            zk_client.delete("/")
+
+            job = double_jr.get_job("j-123", user_id="john")
+            job_metadata = double_jr.get_job_metadata("j-123", user_id="john")
+
+        expected_job = {
+            "job_id": "j-123",
+            "user_id": "john",
+            "created": "2023-02-15T17:17:17Z",
+            "status": "created",
+            "updated": "2023-02-15T17:17:17Z",
+            "api_version": None,
+            "application_id": None,
+            "title": "John's job",
+            "description": None,
+        }
+        assert job == DictSubSet(
+            {"job_id": "j-123", "user_id": "john", "created": "2023-02-15T17:17:17Z", "status": "created"}
+        )
+        assert job_metadata == BatchJobMetadata(
+            id="j-123",
+            status="created",
+            created=datetime.datetime(2023, 2, 15, 17, 17, 17),
+            process=dict(
+                process_graph={"add": {"process_id": "add", "arguments": {"x": 3, "y": 5}, "result": True}},
+                title="dummy",
+            ),
+            job_options=None,
+            title=None,
+            description=None,
+            updated=datetime.datetime(2023, 2, 15, 17, 17, 17),
+            started=None,
+            finished=None,
+        )
+
+        assert caplog.messages == []
+
     def test_set_status(self, double_jr, zk_client, memory_jr, time_machine):
         with double_jr:
             double_jr.create_job(job_id="j-123", user_id="john", process=self.DUMMY_PROCESS)
