Skip to content

Commit

Permalink
quickfix: avoid hanging ZK get_job calls #632
Browse files Browse the repository at this point in the history
  • Loading branch information
bossie authored Jan 16, 2024
1 parent 79340e9 commit 4b4fe78
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 73 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ without compromising stable operations.
### Bugfix

- Prevent usage duplication in ETL API ([#41](https://github.com/eu-cdse/openeo-cdse-infra/issues/41))
- Prevent Zookeeper from blocking requests ([#639](https://github.com/Open-EO/openeo-geopyspark-driver/pull/639))

## 0.22.0

Expand Down
2 changes: 1 addition & 1 deletion openeogeotrellis/_version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.22.1a1"
__version__ = "0.22.2a1"
2 changes: 1 addition & 1 deletion openeogeotrellis/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -1995,7 +1995,7 @@ def _start_job(self, job_id: str, user: User, get_vault_token: Callable[[str], s
else:
# New style job info (EJR based)
job_process_graph = job_info["process"]["process_graph"]
job_options = job_info.get("job_options", {})
job_options = job_info.get("job_options") or {} # can be None
job_specification_json = json.dumps({"process_graph": job_process_graph, "job_options": job_options})

job_title = job_info.get('title', '')
Expand Down
43 changes: 18 additions & 25 deletions openeogeotrellis/job_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -771,45 +771,38 @@ def create_job(

def get_job(self, job_id: str, user_id: str) -> dict:
# TODO: eliminate get_job/get_job_metadata duplication?
zk_job = ejr_job = None
if self.zk_job_registry:
with contextlib.suppress(JobNotFoundException):
zk_job = self.zk_job_registry.get_job(job_id=job_id, user_id=user_id)
ejr_job = None
if self.elastic_job_registry:
with self._just_log_errors("get_job", job_id=job_id):
with contextlib.suppress(JobNotFoundException):
ejr_job = self.elastic_job_registry.get_job(job_id=job_id)

self._check_zk_ejr_job_info(job_id=job_id, zk_job_info=zk_job, ejr_job_info=ejr_job)
return zk_job or ejr_job
if ejr_job:
return ejr_job

if self.zk_job_registry:
return self.zk_job_registry.get_job(job_id=job_id, user_id=user_id)

raise JobNotFoundException(job_id=job_id)

def get_job_metadata(self, job_id: str, user_id: str) -> BatchJobMetadata:
# TODO: eliminate get_job/get_job_metadata duplication?
zk_job_info = ejr_job_info = None
if self.zk_job_registry:
with TimingLogger(f"self.zk_job_registry.get_job({job_id=}, {user_id=})", logger=_log.debug):
with contextlib.suppress(JobNotFoundException):
zk_job_info = self.zk_job_registry.get_job(job_id=job_id, user_id=user_id)
ejr_job_info = None
if self.elastic_job_registry:
with self._just_log_errors("get_job_metadata", job_id=job_id):
with TimingLogger(f"self.elastic_job_registry.get_job({job_id=})", logger=_log.debug):
with contextlib.suppress(JobNotFoundException):
ejr_job_info = self.elastic_job_registry.get_job(job_id=job_id)

self._check_zk_ejr_job_info(job_id=job_id, zk_job_info=zk_job_info, ejr_job_info=ejr_job_info)
job_metadata = zk_job_info_to_metadata(zk_job_info) if zk_job_info else ejr_job_info_to_metadata(ejr_job_info)
return job_metadata

def _check_zk_ejr_job_info(self, job_id: str, zk_job_info: Union[dict, None], ejr_job_info: Union[dict, None]):
# TODO #236/#498 For now: compare job metadata between Zk and EJR
fields = ["job_id", "status", "created"]
if zk_job_info is not None and ejr_job_info is not None:
zk_job_info = {k: v for (k, v) in zk_job_info.items() if k in fields}
ejr_job_info = {k: v for (k, v) in ejr_job_info.items() if k in fields}
if zk_job_info != ejr_job_info:
self._log.warning(f"DoubleJobRegistry mismatch {zk_job_info=} {ejr_job_info=}")
elif zk_job_info is None and ejr_job_info is None:
raise JobNotFoundException(job_id=job_id)
if ejr_job_info:
return ejr_job_info_to_metadata(ejr_job_info)

if self.zk_job_registry:
with TimingLogger(f"self.zk_job_registry.get_job({job_id=}, {user_id=})", logger=_log.debug):
zk_job_info = self.zk_job_registry.get_job(job_id=job_id, user_id=user_id)
return zk_job_info_to_metadata(zk_job_info)

raise JobNotFoundException(job_id=job_id)

def set_status(self, job_id: str, user_id: str, status: str) -> None:
if self.zk_job_registry:
Expand Down
42 changes: 16 additions & 26 deletions tests/test_job_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ def test_list_user_jobs(self):

class TestDoubleJobRegistry:
DUMMY_PROCESS = {
"title": "dummy",
"description": "dummy",
"process_graph": {
"add": {"process_id": "add", "arguments": {"x": 3, "y": 5}, "result": True},
},
Expand Down Expand Up @@ -277,7 +277,10 @@ def test_get_job(self, double_jr, caplog):
assert job == {
"job_id": "j-123",
"user_id": "john",
"specification": '{"process_graph": {"add": {"process_id": "add", "arguments": {"x": 3, "y": 5}, "result": true}}, "job_options": {"prio": "low"}}',
"process": {"process_graph": {"add": {"process_id": "add", "arguments": {"x": 3, "y": 5}, "result": True}},
"description": "dummy"},
"parent_id": None,
"job_options": {"prio": "low"},
"created": "2023-02-15T17:17:17Z",
"status": "created",
"updated": "2023-02-15T17:17:17Z",
Expand All @@ -290,7 +293,8 @@ def test_get_job(self, double_jr, caplog):
id="j-123",
status="created",
created=datetime.datetime(2023, 2, 15, 17, 17, 17),
process={"process_graph": {"add": {"process_id": "add", "arguments": {"x": 3, "y": 5}, "result": True}}},
process={"process_graph": {"add": {"process_id": "add", "arguments": {"x": 3, "y": 5}, "result": True}},
"description": "dummy"},
job_options={"prio": "low"},
title="John's job",
description=None,
Expand All @@ -309,25 +313,11 @@ def test_get_job_not_found(self, double_jr, caplog):
_ = double_jr.get_job_metadata("j-nope", user_id="john")
assert caplog.messages == []

def test_get_job_mismatch(self, double_jr, memory_jr, caplog):
with double_jr:
double_jr.create_job(
job_id="j-123", user_id="john", process=self.DUMMY_PROCESS
)
memory_jr.db["j-123"]["status"] = "c0rRupt"
job = double_jr.get_job("j-123", user_id="john")
assert job == DictSubSet({"job_id": "j-123", "status": "created"})
assert caplog.messages == [
"DoubleJobRegistry mismatch"
" zk_job_info={'job_id': 'j-123', 'status': 'created', 'created': '2023-02-15T17:17:17Z'}"
" ejr_job_info={'job_id': 'j-123', 'status': 'c0rRupt', 'created': '2023-02-15T17:17:17Z'}"
]

@pytest.mark.parametrize(
["with_zk", "with_ejr", "expected_process_extra"],
[
(True, True, {}),
(False, True, {"title": "dummy"}),
(True, True, {"description": "dummy"}),
(False, True, {"description": "dummy"}),
(True, False, {}),
],
)
Expand Down Expand Up @@ -362,14 +352,14 @@ def test_get_job_consistency(
"title": "John's job",
"description": None,
}
if with_zk:
expected_job[
"specification"
] = '{"process_graph": {"add": {"process_id": "add", "arguments": {"x": 3, "y": 5}, "result": true}}, "job_options": {"prio": "low"}}'
elif with_ejr:
if with_ejr:
expected_job["process"] = self.DUMMY_PROCESS
expected_job["job_options"] = {"prio": "low"}
expected_job["parent_id"] = None
elif with_zk:
expected_job[
"specification"
] = '{"process_graph": {"add": {"process_id": "add", "arguments": {"x": 3, "y": 5}, "result": true}}, "job_options": {"prio": "low"}}'
assert job == expected_job
assert job_metadata == BatchJobMetadata(
id="j-123",
Expand Down Expand Up @@ -422,7 +412,7 @@ def test_get_job_deleted_from_zk(self, double_jr, caplog, zk_client, memory_jr):
created=datetime.datetime(2023, 2, 15, 17, 17, 17),
process=dict(
process_graph={"add": {"process_id": "add", "arguments": {"x": 3, "y": 5}, "result": True}},
title="dummy",
description="dummy",
),
job_options=None,
title=None,
Expand Down Expand Up @@ -546,7 +536,7 @@ def test_get_user_jobs_no_zk(self, double_jr_no_zk, caplog):
["with_zk", "with_ejr", "expected_process_extra", "expected_log"],
[
(True, True, {}, "zk_jobs=1 ejr_jobs=1"),
(False, True, {"title": "dummy"}, "zk_jobs=None ejr_jobs=1"),
(False, True, {"description": "dummy"}, "zk_jobs=None ejr_jobs=1"),
(True, False, {}, "zk_jobs=1 ejr_jobs=None"),
],
)
Expand Down
55 changes: 35 additions & 20 deletions tests/test_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -509,7 +509,8 @@ def test_create_and_get_user_jobs(self, api):
}

@mock.patch("openeogeotrellis.logs.Elasticsearch.search")
def test_create_and_start_and_download(self, mock_search, api, tmp_path, monkeypatch, batch_job_output_root):
def test_create_and_start_and_download(self, mock_search, api, tmp_path, monkeypatch, batch_job_output_root,
job_registry):
with self._mock_kazoo_client() as zk, \
self._mock_utcnow() as un, \
mock.patch.dict("os.environ", {"OPENEO_SPARK_SUBMIT_PY_FILES": "data/deps/custom_processes.py,data/deps/foolib.whl"}):
Expand Down Expand Up @@ -580,11 +581,14 @@ def test_create_and_start_and_download(self, mock_search, api, tmp_path, monkeyp
assert res["logs"] == []

# Fake update from job tracker
# TODO #236/#498/#632 eliminate direct dependency on deprecated ZkJobRegistry and related mocking (e.g. `self._mock_kazoo_client()` above)
with openeogeotrellis.job_registry.ZkJobRegistry() as reg:
reg.set_status(
job_id=job_id, user_id=TEST_USER, status=JOB_STATUS.RUNNING
)
dbl_job_registry = DoubleJobRegistry(
# TODO #236/#498/#632 phase out ZkJobRegistry
zk_job_registry_factory=(lambda: ZkJobRegistry(zk_client=zk)),
elastic_job_registry=job_registry,
)
with dbl_job_registry as jr:
jr.set_status(job_id=job_id, user_id=TEST_USER, status=JOB_STATUS.RUNNING)

meta_data = zk.get_json_decoded(
f"/openeo.test/jobs/ongoing/{TEST_USER}/{job_id}"
)
Expand All @@ -607,9 +611,8 @@ def test_create_and_start_and_download(self, mock_search, api, tmp_path, monkeyp
metadata = api.load_json(JOB_METADATA_FILENAME)
json.dump(metadata, f)

# TODO #236/#498/#632 eliminate direct dependency on deprecated ZkJobRegistry and related mocking (e.g. `self._mock_kazoo_client()` above)
with openeogeotrellis.job_registry.ZkJobRegistry() as reg:
reg.set_status(
with dbl_job_registry as jr:
jr.set_status(
job_id=job_id, user_id=TEST_USER, status=JOB_STATUS.FINISHED
)
res = (
Expand Down Expand Up @@ -665,7 +668,7 @@ def test_create_and_start_and_download(self, mock_search, api, tmp_path, monkeyp

assert res["logs"] == expected_log_entries

def test_providers_present(self, api, tmp_path, monkeypatch, batch_job_output_root):
def test_providers_present(self, api, tmp_path, monkeypatch, batch_job_output_root, job_registry):
with self._mock_kazoo_client() as zk, self._mock_utcnow() as un, mock.patch.dict(
"os.environ", {"OPENEO_SPARK_SUBMIT_PY_FILES": "data/deps/custom_processes.py,data/deps/foolib.whl"}
):
Expand Down Expand Up @@ -741,9 +744,13 @@ def test_providers_present(self, api, tmp_path, monkeypatch, batch_job_output_ro
json.dump(job_metadata_contents, f)

# Fake update from job tracker
# TODO #236/#498/#632 eliminate direct dependency on deprecated ZkJobRegistry and related mocking (e.g. `self._mock_kazoo_client()` above)
with openeogeotrellis.job_registry.ZkJobRegistry() as reg:
reg.set_status(job_id=job_id, user_id=TEST_USER, status=JOB_STATUS.FINISHED)
dbl_job_registry = DoubleJobRegistry(
# TODO #236/#498/#632 phase out ZkJobRegistry
zk_job_registry_factory=(lambda: ZkJobRegistry(zk_client=zk)),
elastic_job_registry=job_registry,
)
with dbl_job_registry as jr:
jr.set_status(job_id=job_id, user_id=TEST_USER, status=JOB_STATUS.FINISHED)
res = api.get(f"/jobs/{job_id}", headers=TEST_USER_AUTH_HEADER).assert_status_code(200).json
assert res["status"] == "finished"

Expand Down Expand Up @@ -1066,8 +1073,13 @@ def test_cancel_job(self, api, job_registry):

# Fake running
# TODO #236/#498/#632 eliminate direct dependency on deprecated ZkJobRegistry and related mocking (e.g. `self._mock_kazoo_client()` above)
with openeogeotrellis.job_registry.ZkJobRegistry() as reg:
reg.set_status(job_id=job_id, user_id=TEST_USER, status=JOB_STATUS.RUNNING)
dbl_job_registry = DoubleJobRegistry(
# TODO #236/#498/#632 phase out ZkJobRegistry
zk_job_registry_factory=(lambda: ZkJobRegistry(zk_client=zk)),
elastic_job_registry=job_registry,
)
with dbl_job_registry as jr:
jr.set_status(job_id=job_id, user_id=TEST_USER, status=JOB_STATUS.RUNNING)
res = api.get('/jobs/{j}'.format(j=job_id), headers=TEST_USER_AUTH_HEADER).assert_status_code(200).json
assert res["status"] == "running"

Expand Down Expand Up @@ -1120,11 +1132,13 @@ def test_delete_job(self, api, job_registry):
run.assert_called_once()

# Fake running
# TODO #236/#498/#632 eliminate direct dependency on deprecated ZkJobRegistry and related mocking (e.g. `self._mock_kazoo_client()` above)
with openeogeotrellis.job_registry.ZkJobRegistry() as reg:
reg.set_status(
job_id=job_id, user_id=TEST_USER, status=JOB_STATUS.RUNNING
)
dbl_job_registry = DoubleJobRegistry(
# TODO #236/#498/#632 phase out ZkJobRegistry
zk_job_registry_factory=(lambda: ZkJobRegistry(zk_client=zk)),
elastic_job_registry=job_registry,
)
with dbl_job_registry as jr:
jr.set_status(job_id=job_id, user_id=TEST_USER, status=JOB_STATUS.RUNNING)
res = (
api.get(f"/jobs/{job_id}", headers=TEST_USER_AUTH_HEADER)
.assert_status_code(200)
Expand Down Expand Up @@ -1529,6 +1543,7 @@ def test_api_job_results_contains_proj_metadata_at_item_level(self, api100, batc
}
]


class TestSentinelHubBatchJobs:
"""Tests for batch jobs involving SentinelHub collections and batch processes"""

Expand Down

0 comments on commit 4b4fe78

Please sign in to comment.