Fix dynamic allocation specs handling for custom launcher (apache#38223)
* Add unit test for Spark Kubernetes custom launcher

Signed-off-by: Hai Nguyen <[email protected]>

* Test on cloned entries correctly

Signed-off-by: Hai Nguyen <[email protected]>

* Fix lint issue

Signed-off-by: Hai Nguyen <[email protected]>

* Move cloned entries declaration out of raise assertion scope

Signed-off-by: Hai Nguyen <[email protected]>

---------

Signed-off-by: Hai Nguyen <[email protected]>
sudohainguyen authored and utkarsharma2 committed Apr 22, 2024
1 parent 5a6cbdc commit aca612d
Showing 2 changed files with 132 additions and 3 deletions.
airflow/providers/cncf/kubernetes/operators/custom_object_launcher.py

@@ -60,9 +60,9 @@ def validate(self):
         if self.spec.get("dynamicAllocation", {}).get("enabled"):
             if not all(
                 [
-                    self.spec["dynamicAllocation"]["initialExecutors"],
-                    self.spec["dynamicAllocation"]["minExecutors"],
-                    self.spec["dynamicAllocation"]["maxExecutors"],
+                    self.spec["dynamicAllocation"].get("initialExecutors"),
+                    self.spec["dynamicAllocation"].get("minExecutors"),
+                    self.spec["dynamicAllocation"].get("maxExecutors"),
                 ]
             ):
                 raise AirflowException("Make sure initial/min/max value for dynamic allocation is passed")
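
The substance of the fix is the switch from direct indexing to dict.get(): with indexing, a spec that enables dynamic allocation but omits one of the three executor-count keys fails with a bare KeyError before validate() can report anything useful, while .get() returns None for the missing key, so all([...]) is falsy and the descriptive AirflowException is raised. A minimal standalone sketch (the spec dict below is illustrative, not taken from the repository):

    spec = {"dynamicAllocation": {"enabled": True, "minExecutors": 1, "maxExecutors": 2}}

    # Before the fix: a missing key aborts validation with a bare KeyError.
    try:
        spec["dynamicAllocation"]["initialExecutors"]
    except KeyError as err:
        print(f"KeyError: {err}")  # KeyError: 'initialExecutors'

    # After the fix: .get() yields None for the missing key, all() is falsy,
    # and validate() can raise its descriptive AirflowException instead.
    values = [
        spec["dynamicAllocation"].get("initialExecutors"),
        spec["dynamicAllocation"].get("minExecutors"),
        spec["dynamicAllocation"].get("maxExecutors"),
    ]
    print(all(values))  # False

As a side effect, all() also rejects an explicit value of 0, so the validation demands positive executor counts rather than merely present keys.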
tests/providers/cncf/kubernetes/operators/test_custom_object_launcher.py

@@ -14,3 +14,132 @@
 # KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations
 # under the License.
+from __future__ import annotations
+
+import copy
+from unittest.mock import patch
+
+import pytest
+
+from airflow.exceptions import AirflowException
+from airflow.providers.cncf.kubernetes.operators.custom_object_launcher import (
+    SparkJobSpec,
+    SparkResources,
+)


+class TestSparkJobSpec:
+    @patch("airflow.providers.cncf.kubernetes.operators.custom_object_launcher.SparkJobSpec.update_resources")
+    @patch("airflow.providers.cncf.kubernetes.operators.custom_object_launcher.SparkJobSpec.validate")
+    def test_spark_job_spec_initialization(self, mock_validate, mock_update_resources):
+        entries = {
+            "spec": {
+                "dynamicAllocation": {
+                    "enabled": True,
+                    "initialExecutors": 1,
+                    "minExecutors": 1,
+                    "maxExecutors": 2,
+                },
+                "driver": {},
+                "executor": {},
+            }
+        }
+        SparkJobSpec(**entries)
+        mock_validate.assert_called_once()
+        mock_update_resources.assert_called_once()
+
+    def test_spark_job_spec_dynamicAllocation_enabled(self):
+        entries = {
+            "spec": {
+                "dynamicAllocation": {
+                    "enabled": True,
+                    "initialExecutors": 1,
+                    "minExecutors": 1,
+                    "maxExecutors": 2,
+                },
+                "driver": {},
+                "executor": {},
+            }
+        }
+        spark_job_spec = SparkJobSpec(**entries)
+
+        assert spark_job_spec.spec["dynamicAllocation"]["enabled"]
+
+    def test_spark_job_spec_dynamicAllocation_enabled_with_invalid_config(self):
+        entries = {
+            "spec": {
+                "dynamicAllocation": {
+                    "enabled": True,
+                    "initialExecutors": 1,
+                    "minExecutors": 1,
+                    "maxExecutors": 2,
+                },
+                "driver": {},
+                "executor": {},
+            }
+        }
+
+        cloned_entries = copy.deepcopy(entries)
+        cloned_entries["spec"]["dynamicAllocation"]["initialExecutors"] = None
+        with pytest.raises(
+            AirflowException,
+            match="Make sure initial/min/max value for dynamic allocation is passed",
+        ):
+            SparkJobSpec(**cloned_entries)
+
+        cloned_entries = copy.deepcopy(entries)
+        cloned_entries["spec"]["dynamicAllocation"]["minExecutors"] = None
+        with pytest.raises(
+            AirflowException,
+            match="Make sure initial/min/max value for dynamic allocation is passed",
+        ):
+            SparkJobSpec(**cloned_entries)
+
+        cloned_entries = copy.deepcopy(entries)
+        cloned_entries["spec"]["dynamicAllocation"]["maxExecutors"] = None
+        with pytest.raises(
+            AirflowException,
+            match="Make sure initial/min/max value for dynamic allocation is passed",
+        ):
+            SparkJobSpec(**cloned_entries)

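The invalid-config test clones with copy.deepcopy rather than dict.copy() because dict.copy() is shallow: the nested "dynamicAllocation" dict would be shared between entries and cloned_entries, and each `= None` assignment would leak into the next scenario, which is exactly what the "Test on cloned entries correctly" commit above is guarding against. A quick illustration:

    import copy

    entries = {"spec": {"dynamicAllocation": {"initialExecutors": 1}}}

    # Shallow copy: the nested dict is shared, so the mutation leaks back.
    shallow = entries.copy()
    shallow["spec"]["dynamicAllocation"]["initialExecutors"] = None
    print(entries["spec"]["dynamicAllocation"]["initialExecutors"])  # None

    # Deep copy: the nested dict is cloned, so the original stays intact.
    entries = {"spec": {"dynamicAllocation": {"initialExecutors": 1}}}
    deep = copy.deepcopy(entries)
    deep["spec"]["dynamicAllocation"]["initialExecutors"] = None
    print(entries["spec"]["dynamicAllocation"]["initialExecutors"])  # 1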

+class TestSparkResources:
+    @patch(
+        "airflow.providers.cncf.kubernetes.operators.custom_object_launcher.SparkResources.convert_resources"
+    )
+    def test_spark_resources_initialization(self, mock_convert_resources):
+        driver = {
+            "gpu": {"name": "nvidia", "quantity": 1},
+            "cpu": {"request": "1", "limit": "2"},
+            "memory": {"request": "1Gi", "limit": "2Gi"},
+        }
+        executor = {
+            "gpu": {"name": "nvidia", "quantity": 2},
+            "cpu": {"request": "2", "limit": "4"},
+            "memory": {"request": "2Gi", "limit": "4Gi"},
+        }
+        SparkResources(driver=driver, executor=executor)
+        mock_convert_resources.assert_called_once()
+
+    def test_spark_resources_conversion(self):
+        driver = {
+            "gpu": {"name": "nvidia", "quantity": 1},
+            "cpu": {"request": "1", "limit": "2"},
+            "memory": {"request": "1Gi", "limit": "2Gi"},
+        }
+        executor = {
+            "gpu": {"name": "nvidia", "quantity": 2},
+            "cpu": {"request": "2", "limit": "4"},
+            "memory": {"request": "2Gi", "limit": "4Gi"},
+        }
+        spark_resources = SparkResources(driver=driver, executor=executor)
+
+        assert spark_resources.driver["memory"]["limit"] == "1462m"
+        assert spark_resources.executor["memory"]["limit"] == "2925m"
+        assert spark_resources.driver["cpu"]["request"] == 1
+        assert spark_resources.driver["cpu"]["limit"] == "2"
+        assert spark_resources.executor["cpu"]["request"] == 2
+        assert spark_resources.executor["cpu"]["limit"] == "4"
+        assert spark_resources.driver["gpu"]["quantity"] == 1
+        assert spark_resources.executor["gpu"]["quantity"] == 2
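
The expected memory strings in the conversion test ("1462m" for a "2Gi" driver limit, "2925m" for "4Gi") are consistent with convert_resources dividing the limit, expressed in MiB, by a memory-overhead factor of 1.4; that factor is an assumption inferred from the asserted numbers rather than from the provider's documentation. The arithmetic, as a sketch:

    # Assumed rule behind the asserted values: MiB limit divided by a 1.4
    # overhead factor, floored. OVERHEAD_FACTOR is a hypothetical name used
    # only for this illustration.
    OVERHEAD_FACTOR = 1.4

    for gibibytes, expected in ((2, "1462m"), (4, "2925m")):
        mebibytes = gibibytes * 1024
        converted = f"{int(mebibytes / OVERHEAD_FACTOR)}m"
        assert converted == expected
        print(f"{gibibytes}Gi -> {converted}")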
