Skip to content

Commit

Permalink
reintroduce small fixes of 4b4fe78
Browse files Browse the repository at this point in the history
  • Loading branch information
bossie committed Jan 22, 2024
1 parent 38a2600 commit 05e2a3c
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 22 deletions.
2 changes: 1 addition & 1 deletion openeogeotrellis/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -2033,7 +2033,7 @@ def _start_job(self, job_id: str, user: User, get_vault_token: Callable[[str], s
else:
# New style job info (EJR based)
job_process_graph = job_info["process"]["process_graph"]
job_options = job_info.get("job_options", {})
job_options = job_info.get("job_options") or {} # can be None
job_specification_json = json.dumps({"process_graph": job_process_graph, "job_options": job_options})

job_title = job_info.get('title', '')
Expand Down
56 changes: 35 additions & 21 deletions tests/test_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -509,7 +509,8 @@ def test_create_and_get_user_jobs(self, api):
}

@mock.patch("openeogeotrellis.logs.Elasticsearch.search")
def test_create_and_start_and_download(self, mock_search, api, tmp_path, monkeypatch, batch_job_output_root):
def test_create_and_start_and_download(self, mock_search, api, tmp_path, monkeypatch, batch_job_output_root,
job_registry):
with self._mock_kazoo_client() as zk, \
self._mock_utcnow() as un, \
mock.patch.dict("os.environ", {"OPENEO_SPARK_SUBMIT_PY_FILES": "data/deps/custom_processes.py,data/deps/foolib.whl"}):
Expand Down Expand Up @@ -580,11 +581,14 @@ def test_create_and_start_and_download(self, mock_search, api, tmp_path, monkeyp
assert res["logs"] == []

# Fake update from job tracker
# TODO #236/#498/#632 eliminate direct dependency on deprecated ZkJobRegistry and related mocking (e.g. `self._mock_kazoo_client()` above)
with openeogeotrellis.job_registry.ZkJobRegistry() as reg:
reg.set_status(
job_id=job_id, user_id=TEST_USER, status=JOB_STATUS.RUNNING
)
dbl_job_registry = DoubleJobRegistry(
# TODO #236/#498/#632 phase out ZkJobRegistry
zk_job_registry_factory=(lambda: ZkJobRegistry(zk_client=zk)),
elastic_job_registry=job_registry,
)
with dbl_job_registry as jr:
jr.set_status(job_id=job_id, user_id=TEST_USER, status=JOB_STATUS.RUNNING)

meta_data = zk.get_json_decoded(
f"/openeo.test/jobs/ongoing/{TEST_USER}/{job_id}"
)
Expand All @@ -607,9 +611,8 @@ def test_create_and_start_and_download(self, mock_search, api, tmp_path, monkeyp
metadata = api.load_json(JOB_METADATA_FILENAME)
json.dump(metadata, f)

# TODO #236/#498/#632 eliminate direct dependency on deprecated ZkJobRegistry and related mocking (e.g. `self._mock_kazoo_client()` above)
with openeogeotrellis.job_registry.ZkJobRegistry() as reg:
reg.set_status(
with dbl_job_registry as jr:
jr.set_status(
job_id=job_id, user_id=TEST_USER, status=JOB_STATUS.FINISHED
)
res = (
Expand Down Expand Up @@ -665,7 +668,7 @@ def test_create_and_start_and_download(self, mock_search, api, tmp_path, monkeyp

assert res["logs"] == expected_log_entries

def test_providers_present(self, api, tmp_path, monkeypatch, batch_job_output_root):
def test_providers_present(self, api, tmp_path, monkeypatch, batch_job_output_root, job_registry):
with self._mock_kazoo_client() as zk, self._mock_utcnow() as un, mock.patch.dict(
"os.environ", {"OPENEO_SPARK_SUBMIT_PY_FILES": "data/deps/custom_processes.py,data/deps/foolib.whl"}
):
Expand Down Expand Up @@ -741,9 +744,14 @@ def test_providers_present(self, api, tmp_path, monkeypatch, batch_job_output_ro
json.dump(job_metadata_contents, f)

# Fake update from job tracker
# TODO #236/#498/#632 eliminate direct dependency on deprecated ZkJobRegistry and related mocking (e.g. `self._mock_kazoo_client()` above)
with openeogeotrellis.job_registry.ZkJobRegistry() as reg:
reg.set_status(job_id=job_id, user_id=TEST_USER, status=JOB_STATUS.FINISHED)
dbl_job_registry = DoubleJobRegistry(
# TODO #236/#498/#632 phase out ZkJobRegistry
zk_job_registry_factory=(lambda: ZkJobRegistry(zk_client=zk)),
elastic_job_registry=job_registry,
)
with dbl_job_registry as jr:
jr.set_status(job_id=job_id, user_id=TEST_USER, status=JOB_STATUS.FINISHED)

res = api.get(f"/jobs/{job_id}", headers=TEST_USER_AUTH_HEADER).assert_status_code(200).json
assert res["status"] == "finished"

Expand Down Expand Up @@ -1065,9 +1073,13 @@ def test_cancel_job(self, api, job_registry):
run.assert_called_once()

# Fake running
# TODO #236/#498/#632 eliminate direct dependency on deprecated ZkJobRegistry and related mocking (e.g. `self._mock_kazoo_client()` above)
with openeogeotrellis.job_registry.ZkJobRegistry() as reg:
reg.set_status(job_id=job_id, user_id=TEST_USER, status=JOB_STATUS.RUNNING)
dbl_job_registry = DoubleJobRegistry(
# TODO #236/#498/#632 phase out ZkJobRegistry
zk_job_registry_factory=(lambda: ZkJobRegistry(zk_client=zk)),
elastic_job_registry=job_registry,
)
with dbl_job_registry as jr:
jr.set_status(job_id=job_id, user_id=TEST_USER, status=JOB_STATUS.RUNNING)
res = api.get('/jobs/{j}'.format(j=job_id), headers=TEST_USER_AUTH_HEADER).assert_status_code(200).json
assert res["status"] == "running"

Expand Down Expand Up @@ -1120,11 +1132,13 @@ def test_delete_job(self, api, job_registry):
run.assert_called_once()

# Fake running
# TODO #236/#498/#632 eliminate direct dependency on deprecated ZkJobRegistry and related mocking (e.g. `self._mock_kazoo_client()` above)
with openeogeotrellis.job_registry.ZkJobRegistry() as reg:
reg.set_status(
job_id=job_id, user_id=TEST_USER, status=JOB_STATUS.RUNNING
)
dbl_job_registry = DoubleJobRegistry(
# TODO #236/#498/#632 phase out ZkJobRegistry
zk_job_registry_factory=(lambda: ZkJobRegistry(zk_client=zk)),
elastic_job_registry=job_registry,
)
with dbl_job_registry as jr:
jr.set_status(job_id=job_id, user_id=TEST_USER, status=JOB_STATUS.RUNNING)
res = (
api.get(f"/jobs/{job_id}", headers=TEST_USER_AUTH_HEADER)
.assert_status_code(200)
Expand Down

0 comments on commit 05e2a3c

Please sign in to comment.