diff --git a/poetry.lock b/poetry.lock index bdede89f4..bf2a4a7e0 100644 --- a/poetry.lock +++ b/poetry.lock @@ -751,11 +751,12 @@ adal = ["adal (>=1.0.2)"] [[package]] name = "kubernetes-asyncio" -version = "12.1.1" +version = "12.1.2" description = "Kubernetes asynchronous python client" category = "main" optional = false python-versions = "*" +develop = false [package.dependencies] aiohttp = ">=3.7.0,<4.0.0" @@ -765,6 +766,12 @@ pyyaml = ">=3.12" six = ">=1.9.0" urllib3 = ">=1.24.2" +[package.source] +type = "git" +url = "https://github.com/opsani/kubernetes_asyncio" +reference = "v12.1.2-custom-resource-patch-fix" +resolved_reference = "63b0d36288a867212d24f7d03e0f77c985343fc0" + [[package]] name = "kubetest" version = "0.9.3" @@ -1792,7 +1799,7 @@ testing = ["pytest (>=4.6)", "pytest-checkdocs (>=1.2.3)", "pytest-flake8", "pyt [metadata] lock-version = "1.1" python-versions = "^3.8" -content-hash = "bc2f0ccb76c2e05a2c6b69ad6850da78996567014a441af7fde4352daf7e8f6b" +content-hash = "ac88e3e0073d282f08d044699dd98fdd4641e5151123070a94b033c57e13b4b0" [metadata.files] aiohttp = [ @@ -2191,9 +2198,7 @@ kubernetes = [ {file = "kubernetes-17.17.0-py3-none-any.whl", hash = "sha256:225a95a0aadbd5b645ab389d941a7980db8cdad2a776fde64d1b43fc3299bde9"}, {file = "kubernetes-17.17.0.tar.gz", hash = "sha256:c69b318696ba797dcf63eb928a8d4370c52319f4140023c502d7dfdf2080eb79"}, ] -kubernetes-asyncio = [ - {file = "kubernetes_asyncio-12.1.1.tar.gz", hash = "sha256:826d79ab68f7e158895da624826e5ae9c75c1b69f3f3b7aa3447bd8bba022445"}, -] +kubernetes-asyncio = [] kubetest = [] loguru = [ {file = "loguru-0.5.3-py3-none-any.whl", hash = "sha256:f8087ac396b5ee5f67c963b495d615ebbceac2796379599820e324419d53667c"}, diff --git a/pyproject.toml b/pyproject.toml index 88a2aaf76..074f2398d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -28,7 +28,6 @@ typer = "^0.3.0" bullet = "^2.1.0" jsonschema = "^3.2.0" timeago = "^1.0.14" -kubernetes_asyncio = ">=11.3,<13.0" orjson = "^3.5.0" uvloop = "^0.15.2" statesman = "^1.0.0" @@ -37,6 +36,7 @@ toml = "^0.10.2" colorama = "^0.4.4" pyfiglet = "^0.8.post1" curlify2 = "^1.0.0" +kubernetes-asyncio = {git = "https://github.com/opsani/kubernetes_asyncio", rev = "v12.1.2-custom-resource-patch-fix"} [tool.poetry.dev-dependencies] pytest = "^6.2.4" @@ -81,6 +81,7 @@ pytest-vscodedebug = "^0.1.0" pytest-html = "^3.1.1" bandit = "^1.7.0" watchgod = "^0.7" +filelock = "^3.0.12" [tool.poetry.scripts] servo = "servo.entry_points:run_cli" diff --git a/servo/checks.py b/servo/checks.py index 11c1eaf19..fa97b4c05 100644 --- a/servo/checks.py +++ b/servo/checks.py @@ -552,7 +552,7 @@ async def run_all( spec = getattr(method, "__check__", None) if spec: # once all filtered methods are removed, only run non-decorated - if not spec.critical or not filtered_methods: + if not spec.critical or not filtered_methods or (matching and matching.exclusive): continue check = await method() if asyncio.iscoroutinefunction(method) else method() diff --git a/servo/cli.py b/servo/cli.py index 228096f78..156e650f1 100644 --- a/servo/cli.py +++ b/servo/cli.py @@ -1729,8 +1729,8 @@ def inject_sidecar( """ Inject an Envoy sidecar to capture metrics """ - if not target.startswith(("deploy/", "deployment/", "pod/")): - raise typer.BadParameter("target must prefixed with Kubernetes object kind of \"deployment\" or \"pod\"") + if not target.startswith(("deploy/", "deployment/", "pod/", "rollout/")): + raise typer.BadParameter("target must prefixed with Kubernetes object kind of \"deployment\", \"rollout\" or \"pod\"") 
if not (service or port): raise typer.BadParameter("service or port must be given") @@ -1759,6 +1759,19 @@ def inject_sidecar( ) typer.echo(f"Envoy sidecar injected to Deployment {deployment.name} in {namespace}") + elif target.startswith("rollout"): + rollout = run_async( + servo.connectors.kubernetes.Rollout.read( + target.split('/', 1)[1], namespace + ) + ) + run_async( + rollout.inject_sidecar( + 'opsani-envoy', ENVOY_SIDECAR_IMAGE_TAG, service=service, port=port + ) + ) + typer.echo(f"Envoy sidecar injected to Rollout {rollout.name} in {namespace}") + elif target.startswith("pod"): raise typer.BadParameter("Pod sidecar injection is not yet implemented") else: diff --git a/servo/connectors/kubernetes.py b/servo/connectors/kubernetes.py index cdb9dff9a..01babc8ce 100644 --- a/servo/connectors/kubernetes.py +++ b/servo/connectors/kubernetes.py @@ -12,6 +12,7 @@ import enum import functools import itertools +import json import operator import os import pathlib @@ -1588,6 +1589,10 @@ def find_container(self, name: str) -> Optional[Container]: """ return next(filter(lambda c: c.name == name, self.containers), None) + async def get_target_container(self, config: ContainerConfiguration) -> Optional[Container]: + """Return the container targeted by the supplied configuration""" + return self.find_container(config.name) + def set_container(self, name: str, container: Container) -> None: """Set the container with the given name to a new value.""" index = next(filter(lambda i: self.containers[i].name == name, range(len(self.containers)))) @@ -1632,6 +1637,15 @@ def pod_template_spec(self) -> kubernetes_asyncio.client.models.V1PodTemplateSpe """Return the pod template spec for instances of the Deployment.""" return self.obj.spec.template + async def get_pod_template_spec_copy(self) -> kubernetes_asyncio.client.models.V1PodTemplateSpec: + """Return a deep copy of the pod template spec. Eg. for creation of a tuning pod""" + return copy.deepcopy(self.pod_template_spec) + + def update_pod(self, pod: kubernetes_asyncio.client.models.V1Pod) -> kubernetes_asyncio.client.models.V1Pod: + """Update the pod with the latest state of the controller if needed""" + # NOTE: Deployment currently needs no updating + return pod + @property def pod_spec(self) -> kubernetes_asyncio.client.models.V1PodSpec: """Return the pod spec for instances of the Deployment.""" @@ -2003,6 +2017,575 @@ async def get_restart_count(self) -> int: return count +# Workarounds to allow use of api_client.deserialize() public method instead of private api_client._ApiClient__deserialize +# TODO: is this workaround worth it just to avoid using the private method? 
+# fix for https://github.com/kubernetes-client/python/issues/977#issuecomment-594045477 +def default_kubernetes_json_serializer(o: Any) -> Any: + if isinstance(o, (datetime.datetime, datetime.date)): + return o.isoformat() + raise TypeError(f'Object of type {o.__class__.__name__} ' + f'is not JSON serializable') + +# https://github.com/kubernetes-client/python/issues/977#issuecomment-592030030 +class FakeKubeResponse: + """Mocks the RESTResponse object as a workaround for kubernetes python api_client deserialization""" + def __init__(self, obj): + self.data = json.dumps(obj, default=default_kubernetes_json_serializer) + +# Use alias generator so that dromedary case can be parsed to snake case properties to match k8s python client behaviour +def to_dromedary_case(string: str) -> str: + split = string.split('_') + return split[0] + ''.join(word.capitalize() for word in split[1:]) + +class RolloutBaseModel(pydantic.BaseModel): + class Config: + # arbitrary_types_allowed = True + alias_generator = to_dromedary_case + allow_population_by_field_name = True + +# Pydantic type models for argo rollout spec: https://argoproj.github.io/argo-rollouts/features/specification/ +# https://github.com/argoproj/argo-rollouts/blob/master/manifests/crds/rollout-crd.yaml +# NOTE/TODO: fields typed with Any should maintain the same form when dumped as when they are parsed. Should the need +# arise to interact with such fields, they will need to have an explicit type defined so the alias_generator is applied +class RolloutV1LabelSelector(RolloutBaseModel): # must type out k8s models as well to allow parse_obj to work + match_expressions: Any + match_labels: Optional[Dict[str, str]] + +class RolloutV1ObjectMeta(RolloutBaseModel): + annotations: Optional[Dict[str, str]] + cluster_name: Optional[str] + creation_timestamp: Optional[datetime.datetime] + deletion_grace_period_seconds: Optional[int] + deletion_timestamp: Optional[datetime.datetime] + finalizers: Optional[List[str]] + generate_name: Optional[str] + generation: Optional[int] + labels: Optional[Dict[str, str]] + managed_fields: Any + name: Optional[str] + namespace: Optional[str] + owner_references: Any + resource_version: Optional[str] + self_link: Optional[str] + uid: Optional[str] + +class RolloutV1EnvVar(RolloutBaseModel): + name: str + value: Optional[str] + value_from: Any + +class RolloutV1ContainerPort(RolloutBaseModel): + container_port: int + host_ip: Optional[str] + host_port: Optional[int] + name: Optional[str] + protocol: Optional[str] + +class RolloutV1ResourceRequirements(RolloutBaseModel): + limits: Optional[Dict[str, str]] + requests: Optional[Dict[str, str]] + +class RolloutV1Container(RolloutBaseModel): + args: Optional[List[str]] + command: Optional[List[str]] + env: Optional[List[RolloutV1EnvVar]] + env_from: Any + image: str + image_pull_policy: Optional[str] + lifecycle: Any + liveness_probe: Any + name: str + ports: Optional[List[RolloutV1ContainerPort]] + readiness_probe: Any + resources: Optional[RolloutV1ResourceRequirements] + security_context: Any + startup_probe: Any + stdin: Optional[bool] + stdin_once: Optional[bool] + termination_message_path: Optional[str] + termination_message_policy: Optional[str] + tty: Optional[bool] + volume_devices: Any + volume_mounts: Any + working_dir: Optional[str] + +class RolloutV1PodSpec(RolloutBaseModel): + active_deadline_seconds: Optional[int] + affinity: Any + automount_service_account_token: Optional[bool] + containers: List[RolloutV1Container] + dns_config: Any + dns_policy: 
Optional[str] + enable_service_links: Optional[bool] + ephemeral_containers: Any + host_aliases: Any + host_ipc: Optional[bool] + host_network: Optional[bool] + host_pid: Optional[bool] + hostname: Optional[str] + image_pull_secrets: Any + init_containers: Optional[List[RolloutV1Container]] + node_name: Optional[str] + node_selector: Optional[Dict[str, str]] + overhead: Optional[Dict[str, str]] + preemption_policy: Optional[str] + priority: Optional[int] + priority_class_name: Optional[str] + readiness_gates: Any + restart_policy: Optional[str] + runtime_class_name: Optional[str] + scheduler_name: Optional[str] + security_context: Any + service_account: Optional[str] + service_account_name: Optional[str] + share_process_namespace: Optional[bool] + subdomain: Optional[str] + termination_grace_period_seconds: Optional[int] + tolerations: Any + topology_spread_constraints: Any + volumes: Any + +class RolloutV1PodTemplateSpec(RolloutBaseModel): + metadata: RolloutV1ObjectMeta + spec: RolloutV1PodSpec + +class RolloutSpec(RolloutBaseModel): + replicas: int + selector: RolloutV1LabelSelector + template: RolloutV1PodTemplateSpec + min_ready_seconds: Optional[int] + revision_history_limit: Optional[int] + paused: Optional[bool] + progress_deadline_seconds: Optional[int] + restart_at: Optional[datetime.datetime] + strategy: Any + +class RolloutBlueGreenStatus(RolloutBaseModel): + active_selector: Optional[str] + post_promotion_analysis_run: Optional[str] + post_promotion_analysis_run_status: Any + pre_promotion_analysis_run: Optional[str] + pre_promotion_analysis_run_status: Any + preview_selector: Optional[str] + previous_active_selector: Optional[str] + scale_down_delay_start_time: Optional[datetime.datetime] + scale_up_preview_check_point: Optional[bool] + +class RolloutStatusCondition(RolloutBaseModel): + last_transition_time: datetime.datetime + last_update_time: datetime.datetime + message: str + reason: str + status: str + type: str + +class RolloutStatus(RolloutBaseModel): + hpa_replicas: Optional[int] = pydantic.Field(..., alias="HPAReplicas") + abort: Optional[bool] + aborted_at: Optional[datetime.datetime] + available_replicas: Optional[int] + blue_green: RolloutBlueGreenStatus + canary: Any # TODO type this out if connector needs to interact with it + collision_count: Optional[int] + conditions: List[RolloutStatusCondition] + controller_pause: Optional[bool] + current_pod_hash: str + current_step_hash: Optional[str] + current_step_index: Optional[int] + observed_generation: str + pause_conditions: Any + ready_replicas: Optional[int] + replicas: Optional[int] + restarted_at: Optional[datetime.datetime] + selector: str + stable_RS: Optional[str] + updated_replicas: Optional[int] + +class RolloutObj(RolloutBaseModel): # TODO is this the right base to inherit from? + api_version: str + kind: str + metadata: RolloutV1ObjectMeta + spec: RolloutSpec + status: Optional[RolloutStatus] + +# TODO expose to config if needed +ROLLOUT_GROUP = "argoproj.io" +ROLLOUT_VERSION = "v1alpha1" +ROLLOUT_PURAL = "rollouts" + +class Rollout(KubernetesModel): + """Wrapper around an ArgoCD Kubernetes `Rollout` Object. + The actual instance that this + wraps can be accessed via the ``obj`` instance member. + This wrapper provides some convenient functionality around the + API Object and provides some state management for the `Rollout`. + .. 
Rollout: + https://argoproj.github.io/argo-rollouts/features/specification/ + """ + + obj: RolloutObj + + _rollout_const_args: Dict[str, str] = dict( + group=ROLLOUT_GROUP, + version=ROLLOUT_VERSION, + plural=ROLLOUT_PURAL, + ) + + api_clients: ClassVar[Dict[str, Type]] = { + "preferred":kubernetes_asyncio.client.CustomObjectsApi, + f"{ROLLOUT_GROUP}/{ROLLOUT_VERSION}":kubernetes_asyncio.client.CustomObjectsApi, + } + + async def create(self, namespace: str = None) -> None: + """Create the Rollout under the given namespace. + Args: + namespace: The namespace to create the Rollout under. + """ + if namespace is None: + namespace = self.namespace + + self.logger.info( + f'creating rollout "{self.name}" in namespace "{namespace}"' + ) + self.logger.debug(f"rollout: {self.obj}") + + async with self.api_client() as api_client: + self.obj = RolloutObj.parse_obj(await api_client.create_namespaced_custom_object( + namespace=namespace, + body=self.obj.dict(by_alias=True, exclude_none=True), + **self._rollout_const_args, + )) + + @classmethod + async def read(cls, name: str, namespace: str) -> "Rollout": + """Read a Rollout by name under the given namespace. + Args: + name: The name of the Rollout to read. + namespace: The namespace to read the Rollout from. + """ + + async with cls.preferred_client() as api_client: + obj = await api_client.get_namespaced_custom_object( + namespace=namespace, + name=name, + **cls._rollout_const_args, + ) + return Rollout(RolloutObj.parse_obj(obj)) + + async def patch(self) -> None: + """Update the changed attributes of the Rollout.""" + async with self.api_client() as api_client: + self.obj = RolloutObj.parse_obj(await api_client.patch_namespaced_custom_object( + namespace=self.namespace, + name=self.name, + body=self.obj.dict(by_alias=True, exclude_none=True), + **self._rollout_const_args, + )) + + async def delete(self, options:kubernetes_asyncio.client.V1DeleteOptions = None) ->kubernetes_asyncio.client.V1Status: + """Delete the Rollout. + This method expects the Rollout to have been loaded or otherwise + assigned a namespace already. If it has not, the namespace will need + to be set manually. + Args: + options: Unsupported, options for Rollout deletion. + Returns: + The status of the delete operation. + """ + if options is not None: + raise RuntimeError("Rollout deletion does not support V1DeleteOptions") + + self.logger.info(f'deleting rollout "{self.name}"') + self.logger.trace(f"rollout: {self.obj}") + + async with self.api_client() as api_client: + return await api_client.delete_namespaced_custom_object( + namespace=self.namespace, + name=self.name, + **self._rollout_const_args, + ) + + async def refresh(self) -> None: + """Refresh the underlying Kubernetes Rollout resource.""" + async with self.api_client() as api_client: + self.obj = RolloutObj.parse_obj(await api_client.get_namespaced_custom_object_status( + namespace=self.namespace, + name=self.name, + **self._rollout_const_args + )) + + async def rollback(self) -> None: + # TODO rollbacks are automated in Argo Rollouts, not sure if making this No Op will cause issues + # but I was unable to locate a means of triggering a rollout rollback manually + raise TypeError( + ( + "rollback is not supported under the optimization of rollouts because rollbacks are applied to " + "Kubernetes Deployment objects whereas this is automated by argocd" + ) + ) + + async def get_status(self) -> RolloutStatus: + """Get the status of the Rollout. + Returns: + The status of the Rollout. 
+ """ + self.logger.info(f'checking status of rollout "{self.name}"') + # first, refresh the rollout state to ensure the latest status + await self.refresh() + + # return the status from the rollout + return self.obj.status + + async def get_pods(self) -> List[Pod]: + """Get the pods for the Rollout. + + Returns: + A list of pods that belong to the rollout. + """ + self.logger.debug(f'getting pods for rollout "{self.name}"') + + async with Pod.preferred_client() as api_client: + label_selector = self.obj.spec.selector.match_labels + pod_list:kubernetes_asyncio.client.V1PodList = await api_client.list_namespaced_pod( + namespace=self.namespace, label_selector=selector_string(label_selector) + ) + + pods = [Pod(p) for p in pod_list.items] + return pods + + @property + def status(self) -> RolloutStatus: + """Return the status of the Rollout. + Returns: + The status of the Rollout. + """ + return self.obj.status + + async def is_ready(self) -> bool: + """Check if the Rollout is in the ready state. + + Returns: + True if in the ready state; False otherwise. + """ + await self.refresh() + + # if there is no status, the deployment is definitely not ready + status = self.obj.status + if status is None: + return False + + # check the status for the number of total replicas and compare + # it to the number of ready replicas. if the numbers are + # equal, the deployment is ready; otherwise it is not ready. + total = status.replicas + ready = status.ready_replicas + + if total is None: + return False + + return total == ready + + @property + def containers(self) -> List[Container]: + """ + Return a list of Container objects from the underlying pod template spec. + """ + return list( + map(lambda c: Container(c, None), self.obj.spec.template.spec.containers) + ) + + def find_container(self, name: str) -> Optional[Container]: + """ + Return the container with the given name. + """ + return next(filter(lambda c: c.name == name, self.containers), None) + + async def get_target_container(self, config: ContainerConfiguration) -> Optional[Container]: + """Return the container targeted by the supplied configuration""" + target_container = self.find_container(config.name) + if target_container is not None: + async with kubernetes_asyncio.client.ApiClient() as api_client: + target_container.obj = api_client.deserialize( + response=FakeKubeResponse(target_container.obj.dict(by_alias=True, exclude_none=True)), + response_type=kubernetes_asyncio.client.models.V1Container + ) + return target_container + + @property + def replicas(self) -> int: + """ + Return the number of desired pods. + """ + return self.obj.spec.replicas + + @replicas.setter + def replicas(self, replicas: int) -> None: + """ + Set the number of desired pods. + """ + self.obj.spec.replicas = replicas + + @property + def pod_template_spec(self) -> RolloutV1PodTemplateSpec: + """Return the pod template spec for instances of the Rollout.""" + return self.obj.spec.template + + async def get_pod_template_spec_copy(self) -> kubernetes_asyncio.client.models.V1PodTemplateSpec: + """Return a deep copy of the pod template spec. Eg. 
for creation of a tuning pod""" + async with kubernetes_asyncio.client.ApiClient() as api_client: + return api_client.deserialize( + response=FakeKubeResponse(self.pod_template_spec.dict(by_alias=True, exclude_none=True)), + response_type=kubernetes_asyncio.client.models.V1PodTemplateSpec + ) + + def update_pod(self, pod: kubernetes_asyncio.client.models.V1Pod) -> kubernetes_asyncio.client.models.V1Pod: + """Update the pod with the latest state of the controller if needed. In the case of Argo Rollouts, the + pod labels are updated with the latest template hash so that it will be routed to by the appropriate service""" + # Apply the latest template hash so the active service registers the tuning pod as an endpoint + pod.metadata.labels["rollouts-pod-template-hash"] = self.obj.status.current_pod_hash + return pod + + @backoff.on_exception(backoff.expo, kubernetes_asyncio.client.exceptions.ApiException, max_tries=3) + async def inject_sidecar( + self, + name: str, + image: str, + *, + service: Optional[str] = None, + port: Optional[int] = None, + index: Optional[int] = None, + service_port: int = 9980 + ) -> None: + """ + Injects an Envoy sidecar into the target Rollout that proxies a service + or literal TCP port, generating scrapeable metrics usable for optimization. + + The service or port argument must be provided to define how traffic is proxied + between the Envoy sidecar and the container responsible for fulfilling the request. + + Args: + name: The name of the sidecar to inject. + image: The container image for the sidecar container. + service: Name of the service to proxy. Envoy will accept ingress traffic + on the service port and reverse proxy requests back to the original + target container. + port: The name or number of a port within the Rollout to wrap the proxy around. + index: The index at which to insert the sidecar container. When `None`, the sidecar is appended. + service_port: The port to receive ingress traffic from an upstream service. 
+ """ + + await self.refresh() + + if not (service or port): + raise ValueError(f"a service or port must be given") + + if isinstance(port, str) and port.isdigit(): + port = int(port) + + # check for a port conflict + container_ports = list(itertools.chain(*map(operator.attrgetter("ports"), self.containers))) + if service_port in list(map(operator.attrgetter("container_port"), container_ports)): + raise ValueError(f"Port conflict: Rollout '{self.name}' already exposes port {service_port} through an existing container") + + # lookup the port on the target service + if service: + try: + service_obj = await Service.read(service, self.namespace) + except kubernetes_asyncio.client.exceptions.ApiException as error: + if error.status == 404: + raise ValueError(f"Unknown Service '{service}'") from error + else: + raise error + if not port: + port_count = len(service_obj.obj.spec.ports) + if port_count == 0: + raise ValueError(f"Target Service '{service}' does not expose any ports") + elif port_count > 1: + raise ValueError(f"Target Service '{service}' exposes multiple ports -- target port must be specified") + port_obj = service_obj.obj.spec.ports[0] + else: + if isinstance(port, int): + port_obj = next(filter(lambda p: p.port == port, service_obj.obj.spec.ports), None) + elif isinstance(port, str): + port_obj = next(filter(lambda p: p.name == port, service_obj.obj.spec.ports), None) + else: + raise TypeError(f"Unable to resolve port value of type {port.__class__} (port={port})") + + if not port_obj: + raise ValueError(f"Port '{port}' does not exist in the Service '{service}'") + + # resolve symbolic name in the service target port to a concrete container port + if isinstance(port_obj.target_port, str): + container_port_obj = next(filter(lambda p: p.name == port_obj.target_port, container_ports), None) + if not container_port_obj: + raise ValueError(f"Port '{port_obj.target_port}' could not be resolved to a destination container port") + + container_port = container_port_obj.container_port + else: + container_port = port_obj.target_port + + else: + # find the container port + container_port_obj = next(filter(lambda p: p.container_port == port, container_ports), None) + if not container_port_obj: + raise ValueError(f"Port '{port}' could not be resolved to a destination container port") + + container_port = container_port_obj.container_port + + # build the sidecar container + container = RolloutV1Container( + name=name, + image=image, + image_pull_policy="IfNotPresent", + resources=RolloutV1ResourceRequirements( + requests={ + "cpu": "125m", + "memory": "128Mi" + }, + limits={ + "cpu": "250m", + "memory": "256Mi" + } + ), + env=[ + RolloutV1EnvVar(name="OPSANI_ENVOY_PROXY_SERVICE_PORT", value=str(service_port)), + RolloutV1EnvVar(name="OPSANI_ENVOY_PROXIED_CONTAINER_PORT", value=str(container_port)), + RolloutV1EnvVar(name="OPSANI_ENVOY_PROXY_METRICS_PORT", value="9901") + ], + ports=[ + RolloutV1ContainerPort(name="opsani-proxy", container_port=service_port, protocol="TCP"), + RolloutV1ContainerPort(name="opsani-metrics", container_port=9901, protocol="TCP"), + ] + ) + + # add the sidecar to the Deployment + if index is None: + self.obj.spec.template.spec.containers.append(container) + else: + self.obj.spec.template.spec.containers.insert(index, container) + + # patch the deployment + await self.patch() + + # TODO: convert to rollout logic + async def eject_sidecar(self, name: str) -> bool: + """Eject an Envoy sidecar from the Deployment. + + Returns True if the sidecar was ejected. 
+ """ + await self.refresh() + container = self.remove_container(name) + if container: + await self.replace() + return True + + return False + + # TODO: rebase this and _check_conditions for saturation mode + @contextlib.asynccontextmanager + async def rollout(self, *, timeout: Optional[servo.Duration] = None) -> None: + raise NotImplementedError('To be implemented in future update') + class Millicore(int): """ The Millicore class represents one one-thousandth of a vCPU or hyperthread in Kubernetes. @@ -2524,11 +3107,13 @@ class CanaryOptimization(BaseOptimization): """ # The deployment and container stanzas from the configuration - deployment_config: "DeploymentConfiguration" + deployment_config: Optional["DeploymentConfiguration"] + rollout_config: Optional["RolloutConfiguration"] container_config: "ContainerConfiguration" # State for mainline resources. Read from the cluster - deployment: Deployment + deployment: Optional[Deployment] + rollout: Optional[Rollout] main_container: Container # State for tuning resources @@ -2537,32 +3122,69 @@ class CanaryOptimization(BaseOptimization): _tuning_pod_template_spec: Optional[kubernetes_asyncio.client.models.V1PodTemplateSpec] = pydantic.PrivateAttr() + + @pydantic.root_validator + def check_deployment_and_rollout(cls, values): + if values.get('deployment_config') is not None and values.get('rollout_config') is not None: + raise ValueError("Cannot create a CanaryOptimization with both rollout and deployment configurations") + if values.get('deployment') is not None and values.get('rollout') is not None: + raise ValueError("Cannot create a CanaryOptimization with both rollout and deployment") + + if values.get('deployment_config') is None and values.get('rollout_config') is None: + raise ValueError("CanaryOptimization must be initialized with either a rollout or deployment configuration") + if values.get('deployment') is None and values.get('rollout') is None: + raise ValueError("CanaryOptimization must be initialized with either a rollout or deployment") + + return values + + @property + def target_controller_config(self) -> Union["DeploymentConfiguration", "RolloutConfiguration"]: + return self.deployment_config or self.rollout_config + + @property + def target_controller(self) -> Union[Deployment, Rollout]: + return self.deployment or self.rollout + + @property + def target_controller_type(self) -> str: + return type(self.target_controller).__name__ + @classmethod async def create( - cls, deployment_config: "DeploymentConfiguration", **kwargs + cls, deployment_or_rollout_config: Union["DeploymentConfiguration", "RolloutConfiguration"], **kwargs ) -> "CanaryOptimization": - deployment = await Deployment.read(deployment_config.name, cast(str, deployment_config.namespace)) - if not deployment: + read_args = (deployment_or_rollout_config.name, cast(str, deployment_or_rollout_config.namespace)) + if isinstance(deployment_or_rollout_config, DeploymentConfiguration): + controller_type = "Deployment" + deployment_or_rollout = await Deployment.read(*read_args) + init_args = dict(deployment_config = deployment_or_rollout_config, deployment = deployment_or_rollout) + elif isinstance(deployment_or_rollout_config, RolloutConfiguration): + controller_type = "Rollout" + deployment_or_rollout = await Rollout.read(*read_args) + init_args = dict(rollout_config = deployment_or_rollout_config, rollout = deployment_or_rollout) + else: + raise NotImplementedError(f"Unknown configuration type '{type(deployment_or_rollout_config).__name__}'") + if not 
deployment_or_rollout: raise ValueError( - f'cannot create CanaryOptimization: target Deployment "{deployment_config.name}" does not exist in Namespace "{deployment_config.namespace}"' + f'cannot create CanaryOptimization: target {controller_type} "{deployment_or_rollout_config.name}"' + f' does not exist in Namespace "{deployment_or_rollout_config.namespace}"' ) # NOTE: Currently only supporting one container - assert len(deployment_config.containers) == 1, "CanaryOptimization currently only supports a single container" - container_config = deployment_config.containers[0] - main_container = deployment.find_container(container_config.name) + assert len(deployment_or_rollout_config.containers) == 1, "CanaryOptimization currently only supports a single container" + container_config = deployment_or_rollout_config.containers[0] + main_container = await deployment_or_rollout.get_target_container(container_config) name = ( - deployment_config.strategy.alias - if isinstance(deployment_config.strategy, CanaryOptimizationStrategyConfiguration) - and deployment_config.strategy.alias - else f"{deployment.name}/{main_container.name}-tuning" + deployment_or_rollout_config.strategy.alias + if isinstance(deployment_or_rollout_config.strategy, CanaryOptimizationStrategyConfiguration) + and deployment_or_rollout_config.strategy.alias + else f"{deployment_or_rollout.name}/{main_container.name}-tuning" ) optimization = cls( name=name, - deployment_config=deployment_config, + **init_args, container_config=container_config, - deployment=deployment, main_container=main_container, **kwargs, ) @@ -2573,7 +3195,7 @@ async def create( async def _load_tuning_state(self) -> None: # Find an existing tuning Pod/Container if available try: - tuning_pod = await Pod.read(self.tuning_pod_name, cast(str, self.deployment_config.namespace)) + tuning_pod = await Pod.read(self.tuning_pod_name, cast(str, self.namespace)) tuning_container = tuning_pod.get_container(self.container_config.name) except kubernetes_asyncio.client.exceptions.ApiException as e: @@ -2649,14 +3271,14 @@ async def apply(self) -> None: @property def namespace(self) -> str: - return self.deployment_config.namespace + return self.target_controller_config.namespace @property def tuning_pod_name(self) -> str: """ Return the name of tuning Pod for this optimization. """ - return f"{self.deployment_config.name}-tuning" + return f"{self.target_controller_config.name}-tuning" async def delete_tuning_pod(self, *, raise_if_not_found: bool = True) -> Optional[Pod]: """ @@ -2688,8 +3310,8 @@ async def delete_tuning_pod(self, *, raise_if_not_found: bool = True) -> Optiona return None @property - def deployment_name(self) -> str: - return self.deployment_config.name + def target_controller_name(self) -> str: + return self.target_controller_config.name @property def container_name(self) -> str: @@ -2698,7 +3320,7 @@ def container_name(self) -> str: # TODO: Factor into another class? 
async def _configure_tuning_pod_template_spec(self) -> None: # Configure a PodSpecTemplate for the tuning Pod state - pod_template_spec: kubernetes_asyncio.client.models.V1PodTemplateSpec = copy.deepcopy(self.deployment.pod_template_spec) + pod_template_spec: kubernetes_asyncio.client.models.V1PodTemplateSpec = await self.target_controller.get_pod_template_spec_copy() pod_template_spec.metadata.name = self.tuning_pod_name if pod_template_spec.metadata.annotations is None: @@ -2786,7 +3408,7 @@ async def create_tuning_pod(self) -> Pod: assert self.tuning_pod is None, "Tuning Pod already exists" assert self.tuning_container is None, "Tuning Pod Container already exists" self.logger.debug( - f"creating tuning pod '{self.tuning_pod_name}' based on deployment '{self.deployment_name}' in namespace '{self.namespace}'" + f"creating tuning pod '{self.tuning_pod_name}' based on {self.target_controller_type} '{self.target_controller_name}' in namespace '{self.namespace}'" ) # Setup the tuning Pod -- our settings are updated on the underlying PodSpec template @@ -2795,6 +3417,9 @@ async def create_tuning_pod(self) -> Pod: metadata=self._tuning_pod_template_spec.metadata, spec=self._tuning_pod_template_spec.spec ) + # Update pod with latest controller state + pod_obj = self.target_controller.update_pod(pod_obj) + tuning_pod = Pod(obj=pod_obj) # Create the Pod and wait for it to get ready @@ -2906,7 +3531,7 @@ def on_failure(self) -> FailureMode: Return the configured failure behavior. If not set explicitly, this will be cascaded from the base kubernetes configuration (or its default) """ - return self.deployment_config.on_failure + return self.target_controller_config.on_failure @property def main_cpu(self) -> CPU: @@ -2955,7 +3580,7 @@ def main_replicas(self) -> servo.Replicas: return servo.Replicas( min=0, max=99999, - value=self.deployment.replicas, + value=self.target_controller.replicas, pinned=True, ) @@ -2968,7 +3593,7 @@ def main_name(self) -> str: """ return ( self.container_config.alias - or f"{self.deployment_config.name}/{self.container_config.name}" + or f"{self.target_controller_config.name}/{self.container_config.name}" ) def to_components(self) -> List[servo.Component]: @@ -3033,7 +3658,7 @@ async def handle_error(self, error: Exception) -> bool: self.logger.info( "creating new tuning pod against baseline following failed adjust" ) - await self._configure_tuning_pod_template_spec() # reset to baseline from the Deployment + await self._configure_tuning_pod_template_spec() # reset to baseline from the target controller self.tuning_pod = await self.create_or_recreate_tuning_pod() raise error # Always communicate errors to backend unless ignored @@ -3087,18 +3712,20 @@ async def create( runtime_ids = {} pod_tmpl_specs = {} - for deployment_config in config.deployments: - if deployment_config.strategy == OptimizationStrategy.default: + for deployment_or_rollout_config in (config.deployments or []) + (config.rollouts or []): + if deployment_or_rollout_config.strategy == OptimizationStrategy.default: + if isinstance(deployment_or_rollout_config, RolloutConfiguration): + raise NotImplementedError("Saturation mode not currently supported on Argo Rollouts") optimization = await DeploymentOptimization.create( - deployment_config, timeout=config.timeout + deployment_or_rollout_config, timeout=deployment_or_rollout_config.timeout ) - deployment = optimization.deployment + deployment_or_rollout = optimization.deployment container = optimization.container - elif deployment_config.strategy == 
OptimizationStrategy.canary: + elif deployment_or_rollout_config.strategy == OptimizationStrategy.canary: optimization = await CanaryOptimization.create( - deployment_config, timeout=config.timeout + deployment_or_rollout_config, timeout=deployment_or_rollout_config.timeout ) - deployment = optimization.deployment + deployment_or_rollout = optimization.target_controller container = optimization.main_container # Ensure the canary is available @@ -3108,15 +3735,15 @@ async def create( await optimization.create_tuning_pod() else: raise ValueError( - f"unknown optimization strategy: {deployment_config.strategy}" + f"unknown optimization strategy: {deployment_or_rollout_config.strategy}" ) optimizations.append(optimization) # compile artifacts for checksum calculation - pods = await deployment.get_pods() + pods = await deployment_or_rollout.get_pods() runtime_ids[optimization.name] = [pod.uid for pod in pods] - pod_tmpl_specs[deployment.name] = deployment.obj.spec.template.spec + pod_tmpl_specs[deployment_or_rollout.name] = deployment_or_rollout.obj.spec.template.spec images[container.name] = container.image # Compute checksums for change detection @@ -3433,6 +4060,14 @@ class PermissionSet(pydantic.BaseModel): ), ] +ROLLOUT_PERMISSIONS = [ + PermissionSet( + group="argoproj.io", + resources=["rollouts", "rollouts/status"], + verbs=["get", "list", "watch", "update", "patch"], + ), +] + class BaseKubernetesConfiguration(servo.BaseConfiguration): """ @@ -3483,6 +4118,16 @@ class DeploymentConfiguration(BaseKubernetesConfiguration): strategy: StrategyTypes = OptimizationStrategy.default replicas: servo.Replicas +class RolloutConfiguration(BaseKubernetesConfiguration): + """ + The RolloutConfiguration class models the configuration of an optimizable Argo Rollout. 
+ """ + + name: DNSSubdomainName + containers: List[ContainerConfiguration] + strategy: StrategyTypes = OptimizationStrategy.canary + replicas: servo.Replicas + class KubernetesConfiguration(BaseKubernetesConfiguration): namespace: DNSSubdomainName = DNSSubdomainName("default") @@ -3492,10 +4137,21 @@ class KubernetesConfiguration(BaseKubernetesConfiguration): description="Permissions required by the connector to operate in Kubernetes.", ) - deployments: List[DeploymentConfiguration] = pydantic.Field( + deployments: Optional[List[DeploymentConfiguration]] = pydantic.Field( description="Deployments to be optimized.", ) + rollouts: Optional[List[RolloutConfiguration]] = pydantic.Field( + description="Argo rollouts to be optimized.", + ) + + @pydantic.root_validator + def check_deployment_and_rollout(cls, values): + if (not values.get('deployments')) and (not values.get('rollouts')): + raise ValueError("No optimization target(s) were specified") + + return values + @classmethod def generate(cls, **kwargs) -> "KubernetesConfiguration": return cls( @@ -3542,6 +4198,9 @@ def cascade_common_settings(self, *, overwrite: bool = False) -> None: for obj in ( attribute if isinstance(attribute, Collection) else [attribute] ): + # don't cascade if optional and not set + if obj is None: + continue for ( field_name, field, @@ -3552,7 +4211,7 @@ def cascade_common_settings(self, *, overwrite: bool = False) -> None: if field_name in obj.__fields_set__ and not overwrite: self.logger.trace( - f"skipping config cascade for unset field '{field_name}'" + f"skipping config cascade for field '{field_name}' set with value '{getattr(obj, field_name)}'" ) continue @@ -3616,7 +4275,10 @@ async def check_version(self) -> None: async def check_permissions(self) -> None: async with kubernetes_asyncio.client.api_client.ApiClient() as api: v1 = kubernetes_asyncio.client.AuthorizationV1Api(api) - for permission in self.config.permissions: + required_permissions = list(self.config.permissions) + if self.config.rollouts: + required_permissions.extend(ROLLOUT_PERMISSIONS) + for permission in required_permissions: for resource in permission.resources: for verb in permission.verbs: attributes = kubernetes_asyncio.client.models.V1ResourceAttributes( @@ -3643,36 +4305,62 @@ async def check_namespace(self) -> None: @servo.multicheck('Deployment "{item.name}" is readable') async def check_deployments(self) -> Tuple[Iterable, servo.CheckHandler]: - async def check_dep(dep_config: DeploymentConfiguration) -> str: + async def check_dep(dep_config: DeploymentConfiguration) -> None: await Deployment.read(dep_config.name, dep_config.namespace) - return self.config.deployments, check_dep + return (self.config.deployments or []), check_dep + + @servo.multicheck('Rollout "{item.name}" is readable') + async def check_rollouts(self) -> Tuple[Iterable, servo.CheckHandler]: + async def check_rol(rol_config: RolloutConfiguration) -> None: + await Rollout.read(rol_config.name, rol_config.namespace) + + return (self.config.rollouts or []), check_rol + + async def _check_container_resource_requirements( + self, + target_controller: Union[Deployment, Rollout], + target_config: Union[DeploymentConfiguration, RolloutConfiguration] + ) -> None: + for cont_config in target_config.containers: + container = target_controller.find_container(cont_config.name) + assert container, f"{type(target_controller).__name__} {target_config.name} has no container {cont_config.name}" + + for resource in Resource.values(): + current_state = None + container_requirements = 
container.get_resource_requirements(resource) + get_requirements = getattr(cont_config, resource).get + for requirement in get_requirements: + current_state = container_requirements.get(requirement) + if current_state: + break + + assert current_state, ( + f"{type(target_controller).__name__} {target_config.name} target container {cont_config.name} spec does not define the resource {resource}. " + f"At least one of the following must be specified: {', '.join(map(lambda req: req.resources_key, get_requirements))}" + ) @servo.multicheck('Containers in the "{item.name}" Deployment have resource requirements') async def check_resource_requirements(self) -> Tuple[Iterable, servo.CheckHandler]: async def check_dep_resource_requirements( dep_config: DeploymentConfiguration, - ) -> str: + ) -> None: deployment = await Deployment.read(dep_config.name, dep_config.namespace) - for cont_config in dep_config.containers: - container = deployment.find_container(cont_config.name) - assert container, f"Deployment {dep_config.name} has no container {cont_config.name}" - - for resource in Resource.values(): - current_state = None - container_requirements = container.get_resource_requirements(resource) - get_requirements = getattr(cont_config, resource).get - for requirement in get_requirements: - current_state = container_requirements.get(requirement) - if current_state: - break + await self._check_container_resource_requirements(deployment, dep_config) + + return (self.config.deployments or []), check_dep_resource_requirements - assert current_state, ( - f"Deployment {dep_config.name} target container {cont_config.name} spec does not define the resource {resource}. " - f"At least one of the following must be specified: {', '.join(map(lambda req: req.resources_key, get_requirements))}" - ) - return self.config.deployments, check_dep_resource_requirements + @servo.multicheck('Containers in the "{item.name}" Rollout have resource requirements') + async def check_rollout_resource_requirements(self) -> Tuple[Iterable, servo.CheckHandler]: + async def check_rol_resource_requirements( + rol_config: RolloutConfiguration, + ) -> None: + rollout = await Rollout.read(rol_config.name, rol_config.namespace) + await self._check_container_resource_requirements(rollout, rol_config) + + return (self.config.rollouts or []), check_rol_resource_requirements + @servo.multicheck('Deployment "{item.name}" is ready') async def check_deployments_are_ready(self) -> Tuple[Iterable, servo.CheckHandler]: @@ -3681,7 +4369,16 @@ async def check_deployment(dep_config: DeploymentConfiguration) -> None: if not await deployment.is_ready(): raise RuntimeError(f'Deployment "{deployment.name}" is not ready') - return self.config.deployments, check_deployment + return (self.config.deployments or []), check_deployment + + @servo.multicheck('Rollout "{item.name}" is ready') + async def check_rollouts_are_ready(self) -> Tuple[Iterable, servo.CheckHandler]: + async def check_rollout(rol_config: RolloutConfiguration) -> None: + rollout = await Rollout.read(rol_config.name, rol_config.namespace) + if not await rollout.is_ready(): + raise RuntimeError(f'Rollout "{rollout.name}" is not ready') + + return (self.config.rollouts or []), check_rollout @servo.metadata( diff --git a/servo/connectors/opsani_dev.py b/servo/connectors/opsani_dev.py index 0db90cbe2..00cecd44b 100644 --- a/servo/connectors/opsani_dev.py +++ b/servo/connectors/opsani_dev.py @@ -1,7 +1,8 @@ +import abc import json import operator import os -from typing import List, Optional, Union 
+from typing import Dict, List, Optional, Union, Type import kubernetes_asyncio import pydantic @@ -48,7 +49,8 @@ class Memory(servo.connectors.kubernetes.Memory): class OpsaniDevConfiguration(servo.BaseConfiguration): namespace: str - deployment: str + deployment: Optional[str] + rollout: Optional[str] container: str service: str port: Optional[Union[pydantic.StrictInt, str]] = None @@ -60,6 +62,16 @@ class OpsaniDevConfiguration(servo.BaseConfiguration): description="Duration to observe the application after an adjust to ensure the deployment is stable. May be overridden by optimizer supplied `control.adjust.settlement` value." ) + @pydantic.root_validator + def check_deployment_and_rollout(cls, values): + if values.get('deployment') is not None and values.get('rollout') is not None: + raise ValueError("Configuration cannot specify both rollout and deployment") + + if values.get('deployment') is None and values.get('rollout') is None: + raise ValueError("Configuration must specify either rollout or deployment") + + return values + @classmethod def generate(cls, **kwargs) -> "OpsaniDevConfiguration": return cls( @@ -79,32 +91,38 @@ def generate_kubernetes_config( Returns: A Kubernetes connector configuration object. """ + main_config = servo.connectors.kubernetes.DeploymentConfiguration( + name=(self.deployment or self.rollout), + strategy=servo.connectors.kubernetes.CanaryOptimizationStrategyConfiguration( + type=servo.connectors.kubernetes.OptimizationStrategy.canary, + alias="tuning" + ), + replicas=servo.Replicas( + min=0, + max=1, + ), + containers=[ + servo.connectors.kubernetes.ContainerConfiguration( + name=self.container, + alias="main", + cpu=self.cpu, + memory=self.memory, + ) + ], + ) + if self.deployment: + main_arg = { 'deployments': [ main_config ] } + elif self.rollout: + main_arg = { 'rollouts': [ servo.connectors.kubernetes.RolloutConfiguration.parse_obj( + main_config.dict(exclude_none=True) + ) ] } + return servo.connectors.kubernetes.KubernetesConfiguration( namespace=self.namespace, description="Update the namespace, deployment, etc. to match your Kubernetes cluster", timeout=self.timeout, settlement=self.settlement, - deployments=[ - servo.connectors.kubernetes.DeploymentConfiguration( - name=self.deployment, - strategy=servo.connectors.kubernetes.CanaryOptimizationStrategyConfiguration( - type=servo.connectors.kubernetes.OptimizationStrategy.canary, - alias="tuning" - ), - replicas=servo.Replicas( - min=0, - max=1, - ), - containers=[ - servo.connectors.kubernetes.ContainerConfiguration( - name=self.container, - alias="main", - cpu=self.cpu, - memory=self.memory, - ) - ], - ) - ], + **main_arg, **kwargs, ) @@ -199,10 +217,50 @@ def generate_prometheus_config( **kwargs, ) - -class OpsaniDevChecks(servo.BaseChecks): +class BaseOpsaniDevChecks(servo.BaseChecks, abc.ABC): config: OpsaniDevConfiguration + @property + @abc.abstractmethod + def controller_type_name(self) -> str: + ... + + @property + @abc.abstractmethod + def config_controller_name(self) -> str: + ... + + @property + @abc.abstractmethod + def controller_class(self) -> Type[Union[ + servo.connectors.kubernetes.Deployment, + servo.connectors.kubernetes.Rollout + ]]: + ... + + @property + @abc.abstractmethod + def required_permissions(self) -> List[servo.connectors.kubernetes.PermissionSet]: + ... + + @abc.abstractmethod + async def _get_port_forward_target(self) -> str: + ... 
+ + @abc.abstractmethod + def _get_generated_controller_config(self, config: servo.connectors.kubernetes.KubernetesConfiguration) -> Union[ + servo.connectors.kubernetes.DeploymentConfiguration, + servo.connectors.kubernetes.RolloutConfiguration + ]: + ... + + @abc.abstractmethod + def _get_controller_service_selector(self, controller: Union[ + servo.connectors.kubernetes.Deployment, + servo.connectors.kubernetes.Rollout + ]) -> Dict[str, str]: + ... + ## # Kubernetes essentials @@ -225,7 +283,7 @@ async def check_version(self) -> None: async def check_permissions(self) -> None: async with kubernetes_asyncio.client.api_client.ApiClient() as api: v1 = kubernetes_asyncio.client.AuthorizationV1Api(api) - for permission in KUBERNETES_PERMISSIONS: + for permission in self.required_permissions: for resource in permission.resources: for verb in permission.verbs: attributes = kubernetes_asyncio.client.models.V1ResourceAttributes( @@ -250,24 +308,22 @@ async def check_permissions(self) -> None: async def check_kubernetes_namespace(self) -> None: await servo.connectors.kubernetes.Namespace.read(self.config.namespace) - @servo.checks.require('Deployment "{self.config.deployment}" is readable') - async def check_kubernetes_deployment(self) -> None: - await servo.connectors.kubernetes.Deployment.read(self.config.deployment, self.config.namespace) + @servo.checks.require('{self.controller_type_name} "{self.config_controller_name}" is readable') + async def check_kubernetes_controller(self) -> None: + await self.controller_class.read(self.config_controller_name, self.config.namespace) @servo.checks.require('Container "{self.config.container}" is readable') async def check_kubernetes_container(self) -> None: - deployment = await servo.connectors.kubernetes.Deployment.read( - self.config.deployment, self.config.namespace - ) - container = deployment.find_container(self.config.container) + controller = await self.controller_class.read(self.config_controller_name, self.config.namespace) + container = controller.find_container(self.config.container) assert ( container - ), f"failed reading Container '{self.config.container}' in Deployment '{self.config.deployment}'" + ), f"failed reading Container '{self.config.container}' in {self.controller_type_name} '{self.config_controller_name}'" @servo.require('Container "{self.config.container}" has resource requirements') async def check_resource_requirements(self) -> None: - deployment = await servo.connectors.kubernetes.Deployment.read(self.config.deployment, self.config.namespace) - container = deployment.find_container(self.config.container) + controller = await self.controller_class.read(self.config_controller_name, self.config.namespace) + container = controller.find_container(self.config.container) assert container assert container.resources, "missing container resources" @@ -284,21 +340,21 @@ async def check_resource_requirements(self) -> None: break assert current_state, ( - f"Deployment {self.config.deployment} target container {self.config.container} spec does not define the resource {resource}. " + f"{self.controller_type_name} {self.config_controller_name} target container {self.config.container} spec does not define the resource {resource}. 
" f"At least one of the following must be specified: {', '.join(map(lambda req: req.resources_key, get_requirements))}" ) @servo.checks.require("Target container resources fall within optimization range") async def check_target_container_resources_within_limits(self) -> None: - # Load the Deployment - deployment = await servo.connectors.kubernetes.Deployment.read( - self.config.deployment, + # Load the Controller + controller = await self.controller_class.read( + self.config_controller_name, self.config.namespace ) - assert deployment, f"failed to read deployment '{self.config.deployment}' in namespace '{self.config.namespace}'" + assert controller, f"failed to read {self.controller_type_name} '{self.config_controller_name}' in namespace '{self.config.namespace}'" # Find the target Container - target_container = next(filter(lambda c: c.name == self.config.container, deployment.containers), None) + target_container = next(filter(lambda c: c.name == self.config.container, controller.containers), None) assert target_container, f"failed to find container '{self.config.container}' when verifying resource limits" # Apply any defaults/overrides from the config @@ -330,11 +386,11 @@ async def check_target_container_resources_within_limits(self) -> None: assert container_memory_value >= config_memory_min, f"target container Memory value {container_memory_value.human_readable()} must be greater than optimizable minimum {config_memory_min.human_readable()}" assert container_memory_value <= config_memory_max, f"target container Memory value {container_memory_value.human_readable()} must be less than optimizable maximum {config_memory_max.human_readable()}" - @servo.require('Deployment "{self.config.deployment}" is ready') - async def check_deployment(self) -> None: - deployment = await servo.connectors.kubernetes.Deployment.read(self.config.deployment, self.config.namespace) - if not await deployment.is_ready(): - raise RuntimeError(f'Deployment "{deployment.name}" is not ready') + @servo.require('{self.controller_type_name} "{self.config_controller_name}" is ready') + async def check_controller_readiness(self) -> None: + controller = await self.controller_class.read(self.config_controller_name, self.config.namespace) + if not await controller.is_ready(): + raise RuntimeError(f'{self.controller_type_name} "{controller.name}" is not ready') @servo.checks.require("service") async def check_kubernetes_service(self) -> None: @@ -379,21 +435,19 @@ async def check_kubernetes_service_port(self) -> None: return f"Service Port: {port.name} {port.port}:{port.target_port}/{port.protocol}" - @servo.checks.check('Service routes traffic to Deployment Pods') - async def check_service_routes_traffic_to_deployment(self) -> None: + @servo.checks.check('Service routes traffic to {self.controller_type_name} Pods') + async def check_service_routes_traffic_to_controller(self) -> None: service = await servo.connectors.kubernetes.Service.read( self.config.service, self.config.namespace ) - deployment = await servo.connectors.kubernetes.Deployment.read( - self.config.deployment, self.config.namespace - ) + controller = await self.controller_class.read(self.config_controller_name, self.config.namespace) - # NOTE: The Service labels should be a subset of the Deployment labels - deployment_labels = deployment.obj.spec.selector.match_labels - delta = dict(set(service.selector.items()) - set(deployment_labels.items())) + # NOTE: The Service labels should be a subset of the controller labels + controller_labels = 
self._get_controller_service_selector(controller) + delta = dict(set(service.selector.items()) - set(controller_labels.items())) if delta: desc = ' '.join(map('='.join, delta.items())) - raise RuntimeError(f"Service selector does not match Deployment labels. Missing labels: {desc}") + raise RuntimeError(f"Service selector does not match {self.controller_type_name} labels. Missing labels: {desc}") ## # Prometheus sidecar @@ -524,47 +578,41 @@ async def _list_servo_pods(self) -> List[servo.connectors.kubernetes.Pod]: return pods ## - # Kubernetes Deployment edits + # Kubernetes Controller edits - @servo.checks.require("Deployment PodSpec has expected annotations") - async def check_deployment_annotations(self) -> None: - deployment = await servo.connectors.kubernetes.Deployment.read( - self.config.deployment, - self.config.namespace - ) - assert deployment, f"failed to read deployment '{self.config.deployment}' in namespace '{self.config.namespace}'" + @servo.checks.require("{self.controller_type_name} PodSpec has expected annotations") + async def check_controller_annotations(self) -> None: + controller = await self.controller_class.read(self.config_controller_name, self.config.namespace) + assert controller, f"failed to read {self.controller_type_name} '{self.config_controller_name}' in namespace '{self.config.namespace}'" # Add optimizer annotation to the static Prometheus values required_annotations = PROMETHEUS_ANNOTATION_DEFAULTS.copy() required_annotations['servo.opsani.com/optimizer'] = self.config.optimizer.id # NOTE: Only check for annotation keys - annotations = deployment.pod_template_spec.metadata.annotations or dict() + annotations = controller.pod_template_spec.metadata.annotations or dict() actual_annotations = set(annotations.keys()) delta = set(required_annotations.keys()).difference(actual_annotations) if delta: annotations = dict(map(lambda k: (k, required_annotations[k]), delta)) patch = {"spec": {"template": {"metadata": {"annotations": annotations}}}} patch_json = json.dumps(patch, indent=None) - command = f"kubectl --namespace {self.config.namespace} patch deployment {self.config.deployment} -p '{patch_json}'" + # NOTE: custom resources don't support strategic merge type. 
json merge is acceptable for both cases because the patch json doesn't contain lists + command = f"kubectl --namespace {self.config.namespace} patch {self.controller_type_name} {self.config_controller_name} --type='merge' -p '{patch_json}'" desc = ', '.join(sorted(delta)) raise servo.checks.CheckError( - f"deployment '{deployment.name}' is missing annotations: {desc}", + f"{self.controller_type_name} '{controller.name}' is missing annotations: {desc}", hint=f"Patch annotations via: `{command}`", remedy=lambda: _stream_remedy_command(command) ) - @servo.checks.require("Deployment PodSpec has expected labels") - async def check_deployment_labels(self) -> None: - deployment = await servo.connectors.kubernetes.Deployment.read( - self.config.deployment, - self.config.namespace - ) - assert deployment, f"failed to read deployment '{self.config.deployment}' in namespace '{self.config.namespace}'" - - labels = deployment.pod_template_spec.metadata.labels - assert labels, f"deployment '{deployment.name}' does not have any labels" + @servo.checks.require("{self.controller_type_name} PodSpec has expected labels") + async def check_controller_labels(self) -> None: + controller = await self.controller_class.read(self.config_controller_name, self.config.namespace) + assert controller, f"failed to read {self.controller_type_name} '{self.config_controller_name}' in namespace '{self.config.namespace}'" + labels = controller.pod_template_spec.metadata.labels + assert labels, f"{self.controller_type_name} '{controller.name}' does not have any labels" # Add optimizer label to the static values required_labels = ENVOY_SIDECAR_LABELS.copy() required_labels['servo.opsani.com/optimizer'] = servo.connectors.kubernetes.dns_labelize(self.config.optimizer.id) @@ -575,23 +623,21 @@ async def check_deployment_labels(self) -> None: desc = ', '.join(sorted(map('='.join, delta.items()))) patch = {"spec": {"template": {"metadata": {"labels": delta}}}} patch_json = json.dumps(patch, indent=None) - command = f"kubectl --namespace {self.config.namespace} patch deployment {self.config.deployment} -p '{patch_json}'" + # NOTE: custom resources don't support strategic merge type. 
json merge is acceptable for both cases because the patch json doesn't contain lists + command = f"kubectl --namespace {self.config.namespace} patch {self.controller_type_name} {controller.name} --type='merge' -p '{patch_json}'" raise servo.checks.CheckError( - f"deployment '{deployment.name}' is missing labels: {desc}", + f"{self.controller_type_name} '{controller.name}' is missing labels: {desc}", hint=f"Patch labels via: `{command}`", remedy=lambda: _stream_remedy_command(command) ) - @servo.checks.require("Deployment has Envoy sidecar container") - async def check_deployment_envoy_sidecars(self) -> None: - deployment = await servo.connectors.kubernetes.Deployment.read( - self.config.deployment, - self.config.namespace - ) - assert deployment, f"failed to read deployment '{self.config.deployment}' in namespace '{self.config.namespace}'" + @servo.checks.require("{self.controller_type_name} has Envoy sidecar container") + async def check_controller_envoy_sidecars(self) -> None: + controller = await self.controller_class.read(self.config_controller_name, self.config.namespace) + assert controller, f"failed to read {self.controller_type_name} '{self.config_controller_name}' in namespace '{self.config.namespace}'" # Search the containers list for the sidecar - for container in deployment.containers: + for container in controller.containers: if container.name == "opsani-envoy": return @@ -599,23 +645,24 @@ async def check_deployment_envoy_sidecars(self) -> None: f" --port {self.config.port}" if self.config.port is not None else '' ) - command = f"kubectl exec -n {self.config.namespace} -c servo {self._servo_resource_target} -- servo --token-file /servo/opsani.token inject-sidecar --namespace {self.config.namespace} --service {self.config.service}{port_switch} deployment/{self.config.deployment}" + command = ( + f"kubectl exec -n {self.config.namespace} -c servo {self._servo_resource_target} -- " + f"servo --token-file /servo/opsani.token inject-sidecar --namespace {self.config.namespace} --service {self.config.service}{port_switch} " + f"{self.controller_type_name.lower()}/{self.config_controller_name}" + ) raise servo.checks.CheckError( - f"deployment '{deployment.name}' pod template spec does not include envoy sidecar container ('opsani-envoy')", + f"{self.controller_type_name} '{controller.name}' pod template spec does not include envoy sidecar container ('opsani-envoy')", hint=f"Inject Envoy sidecar container via: `{command}`", remedy=lambda: _stream_remedy_command(command) ) @servo.checks.require("Pods have Envoy sidecar containers") async def check_pod_envoy_sidecars(self) -> None: - deployment = await servo.connectors.kubernetes.Deployment.read( - self.config.deployment, - self.config.namespace - ) - assert deployment, f"failed to read deployment '{self.config.deployment}' in namespace '{self.config.namespace}'" + controller = await self.controller_class.read(self.config_controller_name, self.config.namespace) + assert controller, f"failed to read {self.controller_type_name} '{self.config_controller_name}' in namespace '{self.config.namespace}'" pods_without_sidecars = [] - for pod in await deployment.get_pods(): + for pod in await controller.get_pods(): # Search the containers list for the sidecar if not pod.get_container('opsani-envoy'): pods_without_sidecars.append(pod) @@ -659,9 +706,14 @@ async def check_envoy_sidecar_metrics(self) -> str: assert response.data.result_type == servo.connectors.prometheus.ResultType.vector, f"expected a vector result but found 
{response.data.result_type}" assert len(response.data) == 1, f"expected Prometheus API to return a single result for metric '{metric.name}' but found {len(response.data)}" result = response.data[0] - timestamp, value = result.value + _, value = result.value if value in {None, 0.0}: - command = f"kubectl exec -n {self.config.namespace} -c servo {self._servo_resource_target} -- sh -c \"kubectl port-forward --namespace={self.config.namespace} deploy/{self.config.deployment} 9980 & echo 'GET http://localhost:9980/' | vegeta attack -duration 10s | vegeta report -every 3s\"" + port_forward_target = await self._get_port_forward_target() + command = ( + f"kubectl exec -n {self.config.namespace} -c servo {self._servo_resource_target} -- " + f"sh -c \"kubectl port-forward --namespace={self.config.namespace} {port_forward_target} 9980 & " + f"echo 'GET http://localhost:9980/' | vegeta attack -duration 10s | vegeta report -every 3s\"" + ) raise servo.checks.CheckError( f"Envoy is not reporting any traffic to Prometheus for metric '{metric.name}' ({metric.query})", hint=f"Send traffic to your application on port 9980. Try `{command}`", @@ -696,9 +748,9 @@ async def check_service_proxy(self) -> str: async def check_tuning_is_running(self) -> None: # Generate a KubernetesConfiguration to initialize the optimization class kubernetes_config = self.config.generate_kubernetes_config() - deployment_config = kubernetes_config.deployments[0] + controller_config = self._get_generated_controller_config(kubernetes_config) optimization = await servo.connectors.kubernetes.CanaryOptimization.create( - deployment_config, timeout=kubernetes_config.timeout + controller_config, timeout=kubernetes_config.timeout ) # Ensure the tuning pod is available @@ -751,6 +803,121 @@ def _servo_resource_target(self) -> str: else: return "deployment/servo" +class OpsaniDevChecks(BaseOpsaniDevChecks): + """Opsani dev checks against standard kubernetes Deployments""" + + @property + def controller_type_name(self) -> str: + return "Deployment" + + @property + def config_controller_name(self) -> str: + return self.config.deployment + + @property + def controller_class(self) -> Type[servo.connectors.kubernetes.Deployment]: + return servo.connectors.kubernetes.Deployment + + @property + def required_permissions(self) -> List[servo.connectors.kubernetes.PermissionSet]: + return KUBERNETES_PERMISSIONS + + async def _get_port_forward_target(self) -> str: + return f"deploy/{self.config.deployment}" + + def _get_generated_controller_config( + self, + config: servo.connectors.kubernetes.KubernetesConfiguration + ) -> servo.connectors.kubernetes.DeploymentConfiguration: + return config.deployments[0] + + def _get_controller_service_selector(self, controller: servo.connectors.kubernetes.Deployment) -> Dict[str, str]: + return controller.obj.spec.selector.match_labels + +class OpsaniDevRolloutChecks(BaseOpsaniDevChecks): + """Opsani dev checks against argoproj.io Rollouts""" + @property + def controller_type_name(self) -> str: + return "Rollout" + + @property + def config_controller_name(self) -> str: + return self.config.rollout + + @property + def controller_class(self) -> Type[servo.connectors.kubernetes.Rollout]: + return servo.connectors.kubernetes.Rollout + + @property + def required_permissions(self) -> List[servo.connectors.kubernetes.PermissionSet]: + return KUBERNETES_PERMISSIONS + [servo.connectors.kubernetes.PermissionSet( + group="argoproj.io", + resources=["rollouts", "rollouts/status"], + verbs=["get", "list", "watch", "update", 
"patch"], + )] + + async def _get_port_forward_target(self) -> str: + # NOTE rollouts don't support kubectl port-forward, have to target the current replicaset instead + rollout = await servo.connectors.kubernetes.Rollout.read( + self.config.rollout, + self.config.namespace + ) + assert rollout, f"failed to read rollout '{self.config.rollout}' in namespace '{self.config.namespace}'" + assert rollout.status, f"unable to verify envoy proxy. rollout '{self.config.rollout}' in namespace '{self.config.namespace}' has no status" + assert rollout.status.current_pod_hash, f"unable to verify envoy proxy. rollout '{self.config.rollout}' in namespace '{self.config.namespace}' has no currentPodHash" + return f"replicaset/{rollout.name}-{rollout.status.current_pod_hash}" + + def _get_generated_controller_config( + self, + config: servo.connectors.kubernetes.KubernetesConfiguration + ) -> servo.connectors.kubernetes.RolloutConfiguration: + return config.rollouts[0] + + def _get_controller_service_selector(self, controller: servo.connectors.kubernetes.Rollout) -> Dict[str, str]: + match_labels = dict(controller.obj.spec.selector.match_labels) + assert controller.status, f"unable to determine service selector. rollout '{self.config.rollout}' in namespace '{self.config.namespace}' has no status" + assert controller.status.current_pod_hash, f"unable to determine service selector. rollout '{self.config.rollout}' in namespace '{self.config.namespace}' has no currentPodHash" + match_labels['rollouts-pod-template-hash'] = controller.status.current_pod_hash + return match_labels + + @servo.checks.require("Rollout Selector and PodSpec has opsani_role label") + async def check_rollout_selector_labels(self) -> None: + if os.environ.get("POD_NAME") and os.environ.get("POD_NAMESPACE"): + return # Setting owner reference to servo should prevent tuning pod from being adopted by the rollout controller + + rollout = await servo.connectors.kubernetes.Rollout.read(self.config.rollout, self.config.namespace) + assert rollout, f"failed to read Rollout '{self.config.rollout}' in namespace '{self.config.namespace}'" + + spec_patch = {} + match_labels = rollout.obj.spec.selector.match_labels or dict() + opsani_role_selector = match_labels.get("opsani_role") + if opsani_role_selector is None or opsani_role_selector == "tuning": + opsani_role_selector = "mainline" + spec_patch["selector"] = {"matchLabels": {"opsani_role": opsani_role_selector}} + + labels = rollout.pod_template_spec.metadata.labels or dict() + opsani_role_label = labels.get("opsani_role") + if opsani_role_label is None or opsani_role_label == "tuning" or opsani_role_label != opsani_role_selector: + spec_patch["template"] = {"metadata": {"labels": {"opsani_role": opsani_role_selector }}} + + if spec_patch: # Check failed if spec needs patching + patch = {"spec": spec_patch} + patch_json = json.dumps(patch, indent=None) + # NOTE: custom resources don't support strategic merge type. json merge is acceptable because the patch json doesn't contain lists + command = f"kubectl --namespace {self.config.namespace} patch rollout {self.config.rollout} --type='merge' -p '{patch_json}'" + replicasets = [ f"rs/{rollout.name}-{rollout.status.current_pod_hash}" ] + if rollout.status.stable_RS and rollout.status.stable_RS != rollout.status.current_pod_hash: + replicasets.append(f"rs/{rollout.name}-{rollout.status.stable_RS}") + raise servo.checks.CheckError( + ( + f"Rollout '{self.config.rollout}' has missing/mismatched opsani_role selector and/or label." 
+ " Label opsani_role with value != \"tuning\" is required to prevent the rollout controller from adopting and destroying the tuning pod" + ), + hint=( + f"NOTE: Running this patch will require that you manually scale down or delete the replicaset(s) ({', '.join(replicasets)})" + f" orphaned by the selector update. Patch selector and labels via: `{command}`" + ) + ) @servo.metadata( description="Optimize a single service via a tuning instance and an Envoy sidecar", @@ -786,7 +953,12 @@ async def check( matching: Optional[servo.CheckFilter], halt_on: Optional[servo.ErrorSeverity] = servo.ErrorSeverity.critical, ) -> List[servo.Check]: - return await OpsaniDevChecks.run( + if self.config.deployment: + checks_class = OpsaniDevChecks + elif self.config.rollout: + checks_class = OpsaniDevRolloutChecks + + return await checks_class.run( self.config, matching=matching, halt_on=halt_on ) diff --git a/tests/conftest.py b/tests/conftest.py index 536b68042..768503c7d 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -13,6 +13,7 @@ import chevron import devtools import fastapi +import filelock import httpx import kubetest import pytest @@ -170,6 +171,11 @@ def parents(self) -> List['Environment']: 'minikube_profile: marks tests using minikube to run under the profile specified' 'in the first marker argument. Eg. pytest.mark.minikube_profile.with_args(MINIKUBE_PROFILE)' ) +ROLLOUT_MANIFEST_INI = ( + 'rollout_manifest: mark tests using argo rollouts to apply the manifest from' + 'the specified path in the first marker argument. Eg.' + '@pytest.mark.rollout_manifest.with_args("tests/connectors/opsani_dev/argo_rollouts/rollout.yaml")' +) def pytest_configure(config) -> None: """Register custom markers for use in the test suite.""" @@ -178,6 +184,7 @@ def pytest_configure(config) -> None: config.addinivalue_line("markers", SYSTEM_INI) config.addinivalue_line("markers", EVENT_LOOP_POLICY_INI) config.addinivalue_line("markers", MINIKUBE_PROFILE_INI) + config.addinivalue_line("markers", ROLLOUT_MANIFEST_INI) # Add generic description for all environments for key, value in Environment.__members__.items(): @@ -679,3 +686,67 @@ def _pod_loader(deployment: str) -> kubetest.objects.Pod: return pod return _pod_loader + +# NOTE: kubetest does not support CRD or CR objects, rollout fixtures utilize kubectl for needed setup +# NOTE: session scope doesnt work under xdist, rollout CRD setup has been factored to be idempotent so running it +# multiple times does not cause issues (https://github.com/pytest-dev/pytest-xdist/issues/271) +@pytest.fixture +async def install_rollout_crds(tmp_path_factory: pytest.TempPathFactory, subprocess, kubeconfig): + # Use lock file to ensure CRD setup doesn't fail due to multiple xdist runners trying to do it at the same time + # https://github.com/pytest-dev/pytest-xdist#making-session-scoped-fixtures-execute-only-once + + # get the temp directory shared by all workers + root_tmp_dir = tmp_path_factory.getbasetemp().parent + lock_file = root_tmp_dir / "argo_rollout_crd_apply.lock" + with filelock.FileLock(str(lock_file)): + # NOTE: Rollout CRDs are picky about the installation namespace + ns_cmd = [ "kubectl", f"--kubeconfig={kubeconfig}", "get", "namespace", "argo-rollouts" ] + exit_code, _, stderr = await subprocess(" ".join(ns_cmd), print_output=True, timeout=None) + if exit_code != 0: + ns_cmd[2] = "create" + exit_code, _, stderr = await subprocess(" ".join(ns_cmd), print_output=True, timeout=None) + assert exit_code == 0, f"argo-rollouts namespace creation failed: {stderr}" 
+ + rollout_crd_cmd = ["kubectl", f"--kubeconfig={kubeconfig}", "apply", "-n", "argo-rollouts", "-f", "https://raw.githubusercontent.com/argoproj/argo-rollouts/stable/manifests/install.yaml"] + exit_code, _, stderr = await subprocess(" ".join(rollout_crd_cmd), print_output=True, timeout=None) + assert exit_code == 0, f"argo-rollouts crd apply failed: {stderr}" + + wait_cmd = [ "kubectl", f"--kubeconfig={kubeconfig}", "wait", "--for=condition=available", "--timeout=60s", "-n", "argo-rollouts", "deployment", "argo-rollouts" ] + exit_code, _, stderr = await subprocess(" ".join(wait_cmd), print_output=True, timeout=None) + assert exit_code == 0, f"argo-rollouts wait for CRD controller available failed: {stderr}" + + # NOTE: under xdist, we're unable to guarantee all tests using the CRD have finished running prior to teardown + # yield # Tests run + + # rollout_crd_cmd[2] = "delete" + # exit_code, _, stderr = await subprocess(" ".join(rollout_crd_cmd), print_output=True, timeout=None) + # assert exit_code == 0, f"argo-rollouts crd delete failed: {stderr}" + + # ns_cmd[2] = "delete" + # exit_code, _, stderr = await subprocess(" ".join(ns_cmd), print_output=True, timeout=None) + # assert exit_code == 0, f"argo-rollouts namespace delete failed: {stderr}" + +@pytest.fixture +async def manage_rollout(request, kube, rootpath, kubeconfig, subprocess, install_rollout_crds): + """ + Apply the manifest of the target rollout being tested against + + The applied manifest is determined using the parametrized `rollout_manifest` marker + or else uses "tests/manifests/argo_rollouts/fiber-http-opsani-dev.yaml". + """ + marker = request.node.get_closest_marker("rollout_manifest") + if marker: + assert len(marker.args) == 1, f"rollout_manifest marker accepts a single argument but received: {repr(marker.args)}" + manifest = marker.args[0] + else: + manifest = "tests/manifests/argo_rollouts/fiber-http-opsani-dev.yaml" + + rollout_cmd = ["kubectl", f"--kubeconfig={kubeconfig}", "apply", "-n", kube.namespace, "-f", str(rootpath / manifest)] + exit_code, _, stderr = await subprocess(" ".join(rollout_cmd), print_output=True, timeout=None) + assert exit_code == 0, f"argo-rollouts CR manifest apply failed: {stderr}" + + wait_cmd = [ "kubectl", f"--kubeconfig={kubeconfig}", "wait", "--for=condition=available", "--timeout=60s", "-n", kube.namespace, "rollout", "fiber-http" ] + exit_code, _, stderr = await subprocess(" ".join(wait_cmd), print_output=True, timeout=None) + assert exit_code == 0, f"argo-rollouts CR manifest wait for available failed: {stderr}" + + # NOTE: it is assumed kubetest will handle the needed teardown of the rollout diff --git a/tests/connectors/kubernetes_test.py b/tests/connectors/kubernetes_test.py index e0658b3ec..212d08722 100644 --- a/tests/connectors/kubernetes_test.py +++ b/tests/connectors/kubernetes_test.py @@ -40,6 +40,8 @@ OptimizationStrategy, Pod, ResourceRequirement, + Rollout, + RolloutConfiguration, ) from servo.errors import AdjustmentFailedError, AdjustmentRejectedError import servo.runner @@ -1027,6 +1029,7 @@ async def test_adjust_memory(self, config): async def test_adjust_deployment_insufficient_resources(self, config: KubernetesConfiguration): config.timeout = "3s" + config.cascade_common_settings(overwrite=True) config.deployments[0].containers[0].memory.max = "256Gi" connector = KubernetesConnector(config=config) @@ -1057,6 +1060,7 @@ async def test_adjust_deployment_image_pull_backoff( ) -> None: servo.logging.set_level("TRACE") config.timeout = "10s" + 
config.cascade_common_settings(overwrite=True) connector = KubernetesConnector(config=config) adjustment = Adjustment( component_name="fiber-http/fiber-http", @@ -1108,6 +1112,7 @@ async def test_adjust_tuning_insufficient_mem( tuning_config: KubernetesConfiguration ) -> None: tuning_config.timeout = "10s" + tuning_config.cascade_common_settings(overwrite=True) tuning_config.deployments[0].containers[0].memory = Memory(min="128MiB", max="128GiB", step="32MiB") connector = KubernetesConnector(config=tuning_config) @@ -1136,6 +1141,7 @@ async def test_adjust_tuning_insufficient_cpu_and_mem( tuning_config: KubernetesConfiguration ) -> None: tuning_config.timeout = "10s" + tuning_config.cascade_common_settings(overwrite=True) tuning_config.deployments[0].containers[0].memory = Memory(min="128MiB", max="128GiB", step="32MiB") tuning_config.deployments[0].containers[0].cpu = CPU(min="125m", max="200", step="125m") connector = KubernetesConnector(config=tuning_config) @@ -1236,6 +1242,7 @@ async def test_adjust_tuning_cpu_with_settlement(self, tuning_config, namespace, async def test_adjust_handle_error_respects_nested_config(self, config: KubernetesConfiguration, kube: kubetest.client.TestClient): config.timeout = "3s" config.on_failure = FailureMode.destroy + config.cascade_common_settings(overwrite=True) config.deployments[0].on_failure = FailureMode.exception config.deployments[0].containers[0].memory.max = "256Gi" connector = KubernetesConnector(config=config) @@ -1418,7 +1425,8 @@ def kubetest_deployment_becomes_unready(self, kubetest_deployment: KubetestDeplo async def test_adjust_deployment_never_ready(self, config: KubernetesConfiguration, kubetest_deployment_never_ready: KubetestDeployment) -> None: - config.timeout = "3s" + config.timeout = "5s" + config.cascade_common_settings(overwrite=True) connector = KubernetesConnector(config=config) adjustment = Adjustment( @@ -1439,6 +1447,7 @@ async def test_adjust_deployment_never_ready(self, config: KubernetesConfigurati async def test_adjust_deployment_oom_killed(self, config: KubernetesConfiguration, kubetest_deployemnt_oom_killed: KubetestDeployment) -> None: config.timeout = "10s" + config.cascade_common_settings(overwrite=True) connector = KubernetesConnector(config=config) adjustment = Adjustment( @@ -1499,7 +1508,7 @@ async def test_adjust_tuning_never_ready( ) -> None: tuning_config.timeout = "30s" tuning_config.on_failure = FailureMode.destroy - tuning_config.deployments[0].on_failure = FailureMode.destroy + tuning_config.cascade_common_settings(overwrite=True) connector = KubernetesConnector(config=tuning_config) adjustment = Adjustment( @@ -1532,7 +1541,7 @@ async def test_adjust_tuning_oom_killed( ) -> None: tuning_config.timeout = "25s" tuning_config.on_failure = FailureMode.destroy - tuning_config.deployments[0].on_failure = FailureMode.destroy + tuning_config.cascade_common_settings(overwrite=True) connector = KubernetesConnector(config=tuning_config) adjustment = Adjustment( @@ -2013,11 +2022,11 @@ def namespace(self, kube: kubetest.client.TestClient) -> str: @pytest.mark.applymanifests("../manifests/sidecar_injection", files=["fiber-http_single_port.yaml"]) @pytest.mark.parametrize( - "service, port", + "port, service", [ - ('fiber-http', None), - ('fiber-http', 80), - ('fiber-http', 'http'), + (None, 'fiber-http'), + (80, 'fiber-http'), + ('http', 'fiber-http'), ], ) async def test_inject_single_port_deployment(self, namespace: str, service: str, port: Union[str, int]) -> None: @@ -2081,11 +2090,11 @@ async def 
test_inject_single_port_deployment(self, namespace: str, service: str, @pytest.mark.applymanifests("../manifests/sidecar_injection", files=["fiber-http_multiple_ports.yaml"]) @pytest.mark.parametrize( - "service, port, error", + "port, service, error", [ - ('fiber-http', None, ValueError("Target Service 'fiber-http' exposes multiple ports -- target port must be specified")), - ('fiber-http', 80, None), - ('fiber-http', 'http', None), + (None, 'fiber-http', ValueError("Target Service 'fiber-http' exposes multiple ports -- target port must be specified")), + (80, 'fiber-http', None), + ('http', 'fiber-http', None), ], ) async def test_inject_multiport_deployment(self, namespace: str, service: str, port: Union[str, int], error: Optional[Exception]) -> None: @@ -2153,14 +2162,15 @@ async def test_inject_multiport_deployment(self, namespace: str, service: str, p @pytest.mark.applymanifests("../manifests/sidecar_injection", files=["fiber-http_multiple_ports_symbolic_targets.yaml"]) @pytest.mark.parametrize( - "service, port", + "port, service", [ - ('fiber-http', None), - ('fiber-http', 80), - ('fiber-http', 'http'), + (None, 'fiber-http'), + (80, 'fiber-http'), + ('http', 'fiber-http'), ], ) - async def test_inject_by_source_port_name_with_symbolic_target_port(self, namespace: str, service: str, port: Union[str, int]) -> None: + async def test_inject_symbolic_target_port(self, namespace: str, service: str, port: Union[str, int]) -> None: + """test_inject_by_source_port_name_with_symbolic_target_port""" deployment = await servo.connectors.kubernetes.Deployment.read('fiber-http', namespace) assert len(deployment.containers) == 1, "expected a single container" service = await servo.connectors.kubernetes.Service.read('fiber-http', namespace) @@ -2256,3 +2266,145 @@ async def test_telemetry_hello(self, namespace: str, config: KubernetesConfigura assert request.called print(request.calls.last.request.content.decode()) assert expected in request.calls.last.request.content.decode() + + +## +# Tests against an ArgoCD rollout +@pytest.mark.integration +@pytest.mark.clusterrolebinding('cluster-admin') +@pytest.mark.usefixtures("kubernetes_asyncio_config", "manage_rollout") +@pytest.mark.rollout_manifest.with_args("tests/manifests/argo_rollouts/fiber-http-opsani-dev.yaml") +class TestKubernetesConnectorRolloutIntegration: + @pytest.fixture + def namespace(self, kube: kubetest.client.TestClient) -> str: + return kube.namespace + + @pytest.fixture() + def _rollout_tuning_config(self, tuning_config: KubernetesConfiguration) -> KubernetesConfiguration: + tuning_config.rollouts = [ RolloutConfiguration.parse_obj(d) for d in tuning_config.deployments ] + tuning_config.deployments = None + return tuning_config + + ## + # Canary Tests + async def test_create_rollout_tuning(self, _rollout_tuning_config: KubernetesConfiguration, namespace: str) -> None: + connector = KubernetesConnector(config=_rollout_tuning_config) + rol = await Rollout.read("fiber-http", namespace) + await connector.describe() + + # verify tuning pod is registered as service endpoint + service = await servo.connectors.kubernetes.Service.read("fiber-http", namespace) + endpoints = await service.get_endpoints() + tuning_name = f"{_rollout_tuning_config.rollouts[0].name}-tuning" + tuning_endpoint = next(filter( + lambda epa: epa.target_ref.name == tuning_name, + endpoints[0].subsets[0].addresses + ), None) + if tuning_endpoint is None: + raise AssertionError(f"Tuning pod {tuning_name} not contained in service endpoints: {endpoints}") + + async def 
test_adjust_rollout_tuning_cpu_with_settlement(self, _rollout_tuning_config, namespace): + connector = KubernetesConnector(config=_rollout_tuning_config) + adjustment = Adjustment( + component_name="fiber-http/fiber-http-tuning", + setting_name="cpu", + value=".250", + ) + control = servo.Control(settlement='1s') + description = await connector.adjust([adjustment], control) + assert description is not None + setting = description.get_setting('fiber-http/fiber-http-tuning.cpu') + assert setting + assert setting.value == 250 + + async def test_adjust_rollout_tuning_insufficient_resources(self, _rollout_tuning_config: KubernetesConfiguration, namespace) -> None: + _rollout_tuning_config.timeout = "10s" + _rollout_tuning_config.cascade_common_settings(overwrite=True) + _rollout_tuning_config.rollouts[0].containers[0].memory.max = "256Gi" + connector = KubernetesConnector(config=_rollout_tuning_config) + + adjustment = Adjustment( + component_name="fiber-http/fiber-http-tuning", + setting_name="mem", + value="128Gi", # impossible right? + ) + with pytest.raises(AdjustmentRejectedError) as rejection_info: + description = await connector.adjust([adjustment]) + + rej_msg = str(rejection_info.value) + assert "Insufficient memory." in rej_msg or "Pod Node didn't have enough resource: memory" in rej_msg + +@pytest.mark.integration +@pytest.mark.clusterrolebinding('cluster-admin') +@pytest.mark.usefixtures("kubernetes_asyncio_config", "manage_rollout") +class TestRolloutSidecarInjection: + @pytest.fixture + def namespace(self, kube: kubetest.client.TestClient) -> str: + return kube.namespace + + @pytest.mark.parametrize( + "port, service", + [ + (None, 'fiber-http'), + (80, 'fiber-http'), + ('http', 'fiber-http'), + ], + ) + @pytest.mark.rollout_manifest.with_args("tests/manifests/argo_rollouts/fiber-http_single_port.yaml") + async def test_inject_single_port_rollout(self, namespace: str, service: str, port: Union[str, int]) -> None: + rollout = await servo.connectors.kubernetes.Rollout.read('fiber-http', namespace) + assert len(rollout.containers) == 1, "expected a single container" + service = await servo.connectors.kubernetes.Service.read('fiber-http', namespace) + assert len(service.ports) == 1 + port_obj = service.ports[0] + + if isinstance(port, int): + assert port_obj.port == port + elif isinstance(port, str): + assert port_obj.name == port + assert port_obj.target_port == 8480 + + await rollout.inject_sidecar( + 'opsani-envoy', ENVOY_SIDECAR_IMAGE_TAG, service='fiber-http', port=port + ) + + # Examine new sidecar + await rollout.refresh() + assert len(rollout.containers) == 2, "expected an injected container" + sidecar_container = rollout.containers[1] + assert sidecar_container.name == 'opsani-envoy' + + # Check ports and env + assert sidecar_container.ports == [ + servo.connectors.kubernetes.RolloutV1ContainerPort( + container_port=9980, + host_ip=None, + host_port=None, + name='opsani-proxy', + protocol='TCP' + ), + servo.connectors.kubernetes.RolloutV1ContainerPort( + container_port=9901, + host_ip=None, + host_port=None, + name='opsani-metrics', + protocol='TCP' + ) + ] + assert sidecar_container.obj.env == [ + servo.connectors.kubernetes.RolloutV1EnvVar( + name='OPSANI_ENVOY_PROXY_SERVICE_PORT', + value='9980', + value_from=None + ), + servo.connectors.kubernetes.RolloutV1EnvVar( + name='OPSANI_ENVOY_PROXIED_CONTAINER_PORT', + value='8480', + value_from=None + ), + servo.connectors.kubernetes.RolloutV1EnvVar( + name='OPSANI_ENVOY_PROXY_METRICS_PORT', + value='9901', + value_from=None + 
), + ] diff --git a/tests/connectors/opsani_dev/argo_rollouts/rollout.yaml b/tests/connectors/opsani_dev/argo_rollouts/rollout.yaml new file mode 100644 index 000000000..2a675fecc --- /dev/null +++ b/tests/connectors/opsani_dev/argo_rollouts/rollout.yaml @@ -0,0 +1,40 @@ +--- +apiVersion: argoproj.io/v1alpha1 +kind: Rollout + +metadata: + name: fiber-http + labels: + app.kubernetes.io/name: fiber-http + +spec: + replicas: 2 + revisionHistoryLimit: 2 + selector: + matchLabels: + app.kubernetes.io/name: fiber-http + # This label selector is required to prevent the canary pod from being adopted by the rollout controller when servo running outside the k8s cluster + opsani_role: mainline + strategy: + blueGreen: + activeService: fiber-http + template: + metadata: + labels: + app.kubernetes.io/name: fiber-http + opsani_role: mainline + spec: + containers: + - name: fiber-http + image: opsani/fiber-http:latest + imagePullPolicy: IfNotPresent + resources: + requests: + cpu: 125m + memory: 128Mi + limits: + cpu: 500m + memory: 1Gi + ports: + - containerPort: 8480 + protocol: TCP diff --git a/tests/connectors/opsani_dev/argo_rollouts/rollout_no_cpu_limit.yaml b/tests/connectors/opsani_dev/argo_rollouts/rollout_no_cpu_limit.yaml new file mode 100644 index 000000000..3a1eae514 --- /dev/null +++ b/tests/connectors/opsani_dev/argo_rollouts/rollout_no_cpu_limit.yaml @@ -0,0 +1,39 @@ +--- +apiVersion: argoproj.io/v1alpha1 +kind: Rollout + +metadata: + name: fiber-http + labels: + app.kubernetes.io/name: fiber-http + +spec: + replicas: 2 + revisionHistoryLimit: 2 + selector: + matchLabels: + app.kubernetes.io/name: fiber-http + # This label selector is required to prevent the canary pod from being adopted by the rollout controller when servo running outside the k8s cluster + opsani_role: mainline + strategy: + blueGreen: + activeService: fiber-http + template: + metadata: + labels: + app.kubernetes.io/name: fiber-http + opsani_role: mainline + spec: + containers: + - name: fiber-http + image: opsani/fiber-http:latest + imagePullPolicy: IfNotPresent + resources: + requests: + cpu: 125m + memory: 128Mi + limits: + memory: 1Gi + ports: + - containerPort: 8480 + protocol: TCP diff --git a/tests/connectors/opsani_dev/argo_rollouts/rollout_no_mem.yaml b/tests/connectors/opsani_dev/argo_rollouts/rollout_no_mem.yaml new file mode 100644 index 000000000..1dd0fb318 --- /dev/null +++ b/tests/connectors/opsani_dev/argo_rollouts/rollout_no_mem.yaml @@ -0,0 +1,38 @@ +--- +apiVersion: argoproj.io/v1alpha1 +kind: Rollout + +metadata: + name: fiber-http + labels: + app.kubernetes.io/name: fiber-http + +spec: + replicas: 2 + revisionHistoryLimit: 2 + selector: + matchLabels: + app.kubernetes.io/name: fiber-http + # This label selector is required to prevent the canary pod from being adopted by the rollout controller when servo running outside the k8s cluster + opsani_role: mainline + strategy: + blueGreen: + activeService: fiber-http + template: + metadata: + labels: + app.kubernetes.io/name: fiber-http + opsani_role: mainline + spec: + containers: + - name: fiber-http + image: opsani/fiber-http:latest + imagePullPolicy: IfNotPresent + resources: + requests: + cpu: 125m + limits: + cpu: 500m + ports: + - containerPort: 8480 + protocol: TCP diff --git a/tests/connectors/opsani_dev/argo_rollouts/rollout_no_selector.yaml b/tests/connectors/opsani_dev/argo_rollouts/rollout_no_selector.yaml new file mode 100644 index 000000000..640778a39 --- /dev/null +++ b/tests/connectors/opsani_dev/argo_rollouts/rollout_no_selector.yaml @@ 
-0,0 +1,37 @@ +--- +apiVersion: argoproj.io/v1alpha1 +kind: Rollout + +metadata: + name: fiber-http + labels: + app.kubernetes.io/name: fiber-http + +spec: + replicas: 2 + revisionHistoryLimit: 2 + selector: + matchLabels: + app.kubernetes.io/name: fiber-http + strategy: + blueGreen: + activeService: fiber-http + template: + metadata: + labels: + app.kubernetes.io/name: fiber-http + spec: + containers: + - name: fiber-http + image: opsani/fiber-http:latest + imagePullPolicy: IfNotPresent + resources: + requests: + cpu: 125m + memory: 128Mi + limits: + cpu: 500m + memory: 1Gi + ports: + - containerPort: 8480 + protocol: TCP diff --git a/tests/connectors/opsani_dev_test.py b/tests/connectors/opsani_dev_test.py index d62005e43..69c5c40e4 100644 --- a/tests/connectors/opsani_dev_test.py +++ b/tests/connectors/opsani_dev_test.py @@ -46,15 +46,31 @@ def config(kube) -> servo.connectors.opsani_dev.OpsaniDevConfiguration: __optimizer__=servo.configuration.Optimizer(id="test.com/foo", token="12345") ) +@pytest.fixture +def rollout_config(kube) -> servo.connectors.opsani_dev.OpsaniDevConfiguration: + return servo.connectors.opsani_dev.OpsaniDevConfiguration( + namespace=kube.namespace, + rollout="fiber-http", + container="fiber-http", + service="fiber-http", + cpu=servo.connectors.kubernetes.CPU(min="125m", max="4000m", step="125m"), + memory=servo.connectors.kubernetes.Memory(min="128 MiB", max="4.0 GiB", step="128 MiB"), + __optimizer__=servo.configuration.Optimizer(id="test.com/foo", token="12345") + ) + @pytest.fixture def checks(config: servo.connectors.opsani_dev.OpsaniDevConfiguration) -> servo.connectors.opsani_dev.OpsaniDevChecks: return servo.connectors.opsani_dev.OpsaniDevChecks(config=config) +@pytest.fixture +def rollout_checks(rollout_config: servo.connectors.opsani_dev.OpsaniDevConfiguration) -> servo.connectors.opsani_dev.OpsaniDevRolloutChecks: + return servo.connectors.opsani_dev.OpsaniDevRolloutChecks(config=rollout_config) + class TestConfig: def test_generate(self) -> None: config = servo.connectors.opsani_dev.OpsaniDevConfiguration.generate() - assert list(config.dict().keys()) == ['description', 'namespace', 'deployment', 'container', 'service', 'port', 'cpu', 'memory', 'prometheus_base_url', 'timeout', 'settlement'] + assert list(config.dict().keys()) == ['description', 'namespace', 'deployment', 'rollout', 'container', 'service', 'port', 'cpu', 'memory', 'prometheus_base_url', 'timeout', 'settlement'] def test_generate_yaml(self) -> None: config = servo.connectors.opsani_dev.OpsaniDevConfiguration.generate() @@ -75,6 +91,19 @@ def test_assign_optimizer(self) -> None: config = servo.connectors.opsani_dev.OpsaniDevConfiguration.generate() config.__optimizer__ = None + def test_generate_rollout_config(self) -> None: + rollout_config = servo.connectors.opsani_dev.OpsaniDevConfiguration( + namespace="test", + rollout="fiber-http", + container="fiber-http", + service="fiber-http", + cpu=servo.connectors.kubernetes.CPU(min="125m", max="4000m", step="125m"), + memory=servo.connectors.kubernetes.Memory(min="128 MiB", max="4.0 GiB", step="128 MiB"), + __optimizer__=servo.configuration.Optimizer(id="test.com/foo", token="12345") + ) + k_config = rollout_config.generate_kubernetes_config() + assert k_config.rollouts[0].namespace == "test" # validate config cascade + @pytest.mark.applymanifests( "opsani_dev", @@ -112,7 +141,7 @@ async def load_manifests( @pytest.mark.parametrize( - "resource", ["namespace", "deployment", "container", "service"] + "resource", ["namespace", "controller", 
"container", "service"] ) async def test_resource_exists( self, resource: str, checks: servo.connectors.opsani_dev.OpsaniDevChecks @@ -143,7 +172,7 @@ async def test_target_container_resources_outside_of_limits( async def test_service_routes_traffic_to_deployment( self, kube, checks: servo.connectors.opsani_dev.OpsaniDevChecks ) -> None: - result = await checks.run_one(id=f"check_service_routes_traffic_to_deployment") + result = await checks.run_one(id=f"check_service_routes_traffic_to_controller") assert result.success, f"Failed with message: {result.message}" async def test_prometheus_configmap_exists( @@ -273,6 +302,133 @@ async def test_check_resource_requirements_config_defaults( result = await checks.run_one(id=f"check_resource_requirements") assert result.success, f"Expected success but got: {result}" +@pytest.mark.applymanifests( + "opsani_dev", + files=[ + "service.yaml", + "prometheus.yaml", + ], +) +@pytest.mark.integration +@pytest.mark.clusterrolebinding('cluster-admin') +@pytest.mark.usefixtures("kubeconfig", "kubernetes_asyncio_config") +@pytest.mark.rollout_manifest.with_args("tests/connectors/opsani_dev/argo_rollouts/rollout.yaml") +class TestRolloutIntegration: + class TestChecksOriginalState: + @pytest.fixture(autouse=True) + async def load_manifests( + self, kube, rollout_checks: servo.connectors.opsani_dev.OpsaniDevRolloutChecks, kubeconfig, + manage_rollout # NOTE: rollout must be specified as a dependency, otherwise kube.wait_for_registered runs + # indefinitely waiting for the service to have endpoints + ) -> None: + kube.wait_for_registered() + rollout_checks.config.namespace = kube.namespace + + # Fake out the servo metadata in the environment + # These env vars are set by our manifests + pods = kube.get_pods(labels={ "app.kubernetes.io/name": "servo"}) + assert pods, "servo is not deployed" + try: + os.environ['POD_NAME'] = list(pods.keys())[0] + os.environ["POD_NAMESPACE"] = kube.namespace + + yield + + finally: + os.environ.pop('POD_NAME', None) + os.environ.pop('POD_NAMESPACE', None) + + + @pytest.mark.parametrize( + "resource", ["namespace", "controller", "container", "service"] + ) + async def test_rollout_resource_exists( + self, resource: str, rollout_checks: servo.connectors.opsani_dev.OpsaniDevRolloutChecks + ) -> None: + result = await rollout_checks.run_one(id=f"check_kubernetes_{resource}") + assert result.success + + async def test_rollout_target_container_resources_within_limits( + self, kube, rollout_checks: servo.connectors.opsani_dev.OpsaniDevRolloutChecks, rollout_config: servo.connectors.opsani_dev.OpsaniDevConfiguration + ) -> None: + rollout_config.cpu.min = "125m" + rollout_config.cpu.max = "2000m" + rollout_config.memory.min = "128MiB" + rollout_config.memory.max = "4GiB" + result = await rollout_checks.run_one(id=f"check_target_container_resources_within_limits") + assert result.success, f"Expected success but got: {result}" + + async def test_rollout_target_container_resources_outside_of_limits( + self, kube, rollout_checks: servo.connectors.opsani_dev.OpsaniDevRolloutChecks, rollout_config: servo.connectors.opsani_dev.OpsaniDevConfiguration + ) -> None: + rollout_config.cpu.min = "4000m" + rollout_config.cpu.max = "5000m" + rollout_config.memory.min = "2GiB" + rollout_config.memory.min = "4GiB" + result = await rollout_checks.run_one(id=f"check_target_container_resources_within_limits") + assert result.exception + + async def test_service_routes_traffic_to_rollout( + self, kube, rollout_checks: 
servo.connectors.opsani_dev.OpsaniDevRolloutChecks + ) -> None: + result = await rollout_checks.run_one(id=f"check_service_routes_traffic_to_controller") + assert result.success, f"Failed with message: {result.message}" + + async def test_rollout_check_resource_requirements( + self, rollout_checks: servo.connectors.opsani_dev.OpsaniDevRolloutChecks + ) -> None: + result = await rollout_checks.run_one(id=f"check_resource_requirements") + assert result.success, f"Expected success but got: {result}" + + @pytest.mark.rollout_manifest.with_args("tests/connectors/opsani_dev/argo_rollouts/rollout_no_mem.yaml") + async def test_rollout_check_mem_resource_requirements_fails( + self, rollout_checks: servo.connectors.opsani_dev.OpsaniDevRolloutChecks + ): + result = await rollout_checks.run_one(id=f"check_resource_requirements") + assert result.exception, f"Expected exception but got: {result}" + + @pytest.mark.rollout_manifest.with_args("tests/connectors/opsani_dev/argo_rollouts/rollout_no_cpu_limit.yaml") + async def test_rollout_check_cpu_limit_resource_requirements_fails( + self, rollout_checks: servo.connectors.opsani_dev.OpsaniDevRolloutChecks + ): + rollout_checks.config.cpu.get = [ servo.connectors.kubernetes.ResourceRequirement.limit ] + result = await rollout_checks.run_one(id=f"check_resource_requirements") + assert result.exception, f"Expected exception but got: {result}" + + @pytest.mark.rollout_manifest.with_args("tests/connectors/opsani_dev/argo_rollouts/rollout_no_selector.yaml") + async def test_check_rollout_selector_labels_fails( + self, rollout_checks: servo.connectors.opsani_dev.OpsaniDevRolloutChecks + ): + # simulate servo running outside of cluster + os.environ.pop('POD_NAME', None) + os.environ.pop('POD_NAMESPACE', None) + + result = await rollout_checks.run_one(id=f"check_rollout_selector_labels", skip_requirements=True) + assert result.exception, f"Expected exception but got: {result}" + + @pytest.mark.rollout_manifest.with_args("tests/connectors/opsani_dev/argo_rollouts/rollout.yaml") + async def test_check_rollout_mainline_selector_labels_pass( + self, rollout_checks: servo.connectors.opsani_dev.OpsaniDevRolloutChecks + ): + # simulate servo running outside of cluster + os.environ.pop('POD_NAME', None) + os.environ.pop('POD_NAMESPACE', None) + + result = await rollout_checks.run_one(id=f"check_rollout_selector_labels", skip_requirements=True) + assert result.success, f"Expected success but got: {result}" + + @pytest.mark.rollout_manifest.with_args("tests/connectors/opsani_dev/argo_rollouts/rollout_no_selector.yaml") + async def test_check_rollout_servo_in_cluster_selector_labels_pass( + self, rollout_checks: servo.connectors.opsani_dev.OpsaniDevRolloutChecks + ): + # servo running in cluster, owner reference will be set on tuning pod + result = await rollout_checks.run_one(id=f"check_rollout_selector_labels", skip_requirements=True) + assert result.success, f"Expected success but got: {result}" + + # NOTE: Prometheus checks are redundant in this case, covered by standard integration tests + + # TODO: port TestInstall class to rollouts by refactoring deployment specific helper code + @pytest.mark.applymanifests( "opsani_dev", files=[ @@ -416,9 +572,9 @@ async def test_process( servo.logger.critical("Step 1 - Annotate the Deployment PodSpec") async with assert_check_raises_in_context( servo.checks.CheckError, - match="deployment 'fiber-http' is missing annotations" + match="Deployment 'fiber-http' is missing annotations" ) as assertion: - 
assertion.set(checks.run_one(id=f"check_deployment_annotations")) + assertion.set(checks.run_one(id=f"check_controller_annotations")) # Add a subset of the required annotations to catch partial setup cases async with change_to_resource(deployment): @@ -430,9 +586,9 @@ async def test_process( } ) await assert_check_raises( - checks.run_one(id=f"check_deployment_annotations"), + checks.run_one(id=f"check_controller_annotations"), servo.checks.CheckError, - re.escape("deployment 'fiber-http' is missing annotations: prometheus.opsani.com/scheme, prometheus.opsani.com/scrape") + re.escape("Deployment 'fiber-http' is missing annotations: prometheus.opsani.com/scheme, prometheus.opsani.com/scrape") ) # Fill in the missing annotations @@ -443,14 +599,14 @@ async def test_process( "prometheus.opsani.com/scheme": "http", } ) - await assert_check(checks.run_one(id=f"check_deployment_annotations")) + await assert_check(checks.run_one(id=f"check_controller_annotations")) # Step 2: Verify the labels are set on the Deployment pod spec servo.logger.critical("Step 2 - Label the Deployment PodSpec") await assert_check_raises( - checks.run_one(id=f"check_deployment_labels"), + checks.run_one(id=f"check_controller_labels"), servo.checks.CheckError, - re.escape("deployment 'fiber-http' is missing labels: servo.opsani.com/optimizer=test.com_foo, sidecar.opsani.com/type=envoy") + re.escape("Deployment 'fiber-http' is missing labels: servo.opsani.com/optimizer=test.com_foo, sidecar.opsani.com/type=envoy") ) async with change_to_resource(deployment): @@ -460,14 +616,14 @@ async def test_process( "servo.opsani.com/optimizer": servo.connectors.kubernetes.dns_labelize(checks.config.optimizer.id), } ) - await assert_check(checks.run_one(id=f"check_deployment_labels")) + await assert_check(checks.run_one(id=f"check_controller_labels")) # Step 3 servo.logger.critical("Step 3 - Inject Envoy sidecar container") await assert_check_raises( - checks.run_one(id=f"check_deployment_envoy_sidecars"), + checks.run_one(id=f"check_controller_envoy_sidecars"), servo.checks.CheckError, - re.escape("deployment 'fiber-http' pod template spec does not include envoy sidecar container ('opsani-envoy')") + re.escape("Deployment 'fiber-http' pod template spec does not include envoy sidecar container ('opsani-envoy')") ) # servo.logging.set_level("DEBUG") @@ -475,7 +631,7 @@ async def test_process( servo.logger.info(f"injecting Envoy sidecar to Deployment {deployment.name} PodSpec") await deployment.inject_sidecar('opsani-envoy', 'opsani/envoy-proxy:latest', service="fiber-http") - await wait_for_check_to_pass(functools.partial(checks.run_one, id=f"check_deployment_envoy_sidecars")) + await wait_for_check_to_pass(functools.partial(checks.run_one, id=f"check_controller_envoy_sidecars")) await wait_for_check_to_pass(functools.partial(checks.run_one, id=f"check_pod_envoy_sidecars")) # Step 4 @@ -547,7 +703,7 @@ async def wait_for_targets_to_be_scraped() -> List[servo.connectors.prometheus.A # TODO: why is the tuning pod being created here when the check will recreate it anyway? 
kubernetes_config = checks.config.generate_kubernetes_config() canary_opt = await servo.connectors.kubernetes.CanaryOptimization.create( - deployment_config=kubernetes_config.deployments[0], timeout=kubernetes_config.timeout + deployment_or_rollout_config=kubernetes_config.deployments[0], timeout=kubernetes_config.timeout ) await canary_opt.create_tuning_pod() await assert_check(checks.run_one(id=f"check_tuning_is_running")) @@ -812,9 +968,11 @@ async def change_to_resource(resource: servo.connectors.kubernetes.KubernetesMod await resource.refresh() # wait for the change to roll out - if isinstance(resource, servo.connectors.kubernetes.Deployment): - await resource.wait_until_ready() - elif isinstance(resource, servo.connectors.kubernetes.Pod): + if isinstance(resource, ( + servo.connectors.kubernetes.Deployment, + servo.connectors.kubernetes.Rollout, + servo.connectors.kubernetes.Pod, + )): await resource.wait_until_ready() elif isinstance(resource, servo.connectors.kubernetes.Service): pass @@ -954,7 +1112,7 @@ async def _loop_check() -> servo.Check: async def _remedy_check(id: str, *, config, deployment, kube_port_forward, load_generator, checks) -> None: envoy_proxy_port = servo.connectors.opsani_dev.ENVOY_SIDECAR_DEFAULT_PORT servo.logger.warning(f"Remedying failing check '{id}'...") - if id == 'check_deployment_annotations': + if id == 'check_controller_annotations': ## Step 1 servo.logger.critical("Step 1 - Annotate the Deployment PodSpec") async with change_to_resource(deployment): @@ -968,7 +1126,7 @@ async def _remedy_check(id: str, *, config, deployment, kube_port_forward, load_ } ) - elif id == 'check_deployment_labels': + elif id == 'check_controller_labels': # Step 2: Verify the labels are set on the Deployment pod spec servo.logger.critical("Step 2 - Label the Deployment PodSpec") async with change_to_resource(deployment): @@ -979,7 +1137,7 @@ async def _remedy_check(id: str, *, config, deployment, kube_port_forward, load_ } ) - elif id == 'check_deployment_envoy_sidecars': + elif id == 'check_controller_envoy_sidecars': # Step 3 servo.logger.critical("Step 3 - Inject Envoy sidecar container") async with change_to_resource(deployment): @@ -1018,8 +1176,11 @@ async def _remedy_check(id: str, *, config, deployment, kube_port_forward, load_ elif id == 'check_tuning_is_running': servo.logger.critical("Step 7 - Bring tuning Pod online") - async with change_to_resource(deployment): - await deployment.create_or_recreate_tuning_pod() + kubernetes_config = config.generate_kubernetes_config() + canary_opt = await servo.connectors.kubernetes.CanaryOptimization.create( + deployment_or_rollout_config=kubernetes_config.deployments[0], timeout=kubernetes_config.timeout + ) + await canary_opt.create_tuning_pod() elif id == 'check_traffic_metrics': # Step 8 diff --git a/tests/manifests/argo_rollouts/fiber-http-opsani-dev.yaml b/tests/manifests/argo_rollouts/fiber-http-opsani-dev.yaml new file mode 100644 index 000000000..8937a47f9 --- /dev/null +++ b/tests/manifests/argo_rollouts/fiber-http-opsani-dev.yaml @@ -0,0 +1,157 @@ +--- +apiVersion: argoproj.io/v1alpha1 +kind: Rollout + +metadata: + name: fiber-http + labels: + app.kubernetes.io/name: fiber-http + +spec: + replicas: 1 + revisionHistoryLimit: 2 + selector: + matchLabels: + app.kubernetes.io/name: fiber-http + # This label selector is required to prevent the canary pod from being adopted by the rollout controller when servo running outside the k8s cluster + opsani_role: mainline + strategy: + blueGreen: + activeService: fiber-http + 
template: + metadata: + labels: + app.kubernetes.io/name: fiber-http + opsani_role: mainline + # Attach a label for identifying Pods that have been augmented with + # an Opsani Envoy sidecar. + sidecar.opsani.com/type: "envoy" + annotations: + # These annotations are scraped by the Prometheus sidecar + # running alongside the servo Pod. The port must match the + # `METRICS_PORT` defined in the Envoy container definition + # below. The metrics are provided by the Envoy administration + # module. It should not be necessary to change the path or port + # unless the proxied service happens to have a namespace collision. + # Any divergence from the defaults will require corresponding + # changes to the container ports, service definition, and/or the + # Envoy proxy configuration file. + prometheus.opsani.com/scheme: http + prometheus.opsani.com/path: /stats/prometheus + prometheus.opsani.com/port: "9901" + prometheus.opsani.com/scrape: "true" + spec: + # Prefer deployment onto a Node labeled node.opsani.com=app + # This ensures physical isolation and network transport if possible + affinity: + nodeAffinity: + preferredDuringSchedulingIgnoredDuringExecution: + - weight: 1 + preference: + matchExpressions: + - key: node.opsani.com/role + operator: In + values: + - app + containers: + # Primary container providing the fiber-http web service + - name: fiber-http + image: opsani/fiber-http:latest + imagePullPolicy: IfNotPresent + resources: + requests: + cpu: 125m + memory: 128Mi + limits: + cpu: 125m + memory: 128Mi + ports: + # The ingress port that Envoy will reverse proxy requests + # to for handling. Before Envoy sidecar injection this port + # would typically be the `targetPort` of the Service defined + # below. + - containerPort: 8480 + protocol: TCP + + # Opsani Envoy Sidecar + # Provides metrics for consumption by the Opsani Servo + - name: opsani-envoy + image: opsani/envoy-proxy:latest + imagePullPolicy: IfNotPresent + resources: + requests: + cpu: 125m + memory: 128Mi + limits: + cpu: 250m + memory: 256Mi + env: + # The container port of Pods in the target Deployment responsible for + # handling requests. This port is equal to the original port value of + # the Kubernetes Service prior to injection of the Envoy sidecar. This + # port is the destination for inbound traffic that Envoy will proxy from + # the `OPSANI_ENVOY_PROXY_SERVICE_PORT` value configured above. + - name: OPSANI_ENVOY_PROXIED_CONTAINER_PORT + value: "8480" + + # Uncomment if the upstream is serving TLS traffic + # - name: OPSANI_ENVOY_PROXIED_CONTAINER_TLS_ENABLED + # value: "true" + + # The ingress port accepting traffic from the Kubernetes Service destined + # for Pods that are part of the target Deployment (Default: 9980). + # The Envoy proxy listens on this port and reverse proxies traffic back + # to `OPSANI_ENVOY_PROXIED_CONTAINER_PORT` for handling. This port must + # be equal to the newly assigned port in the updated Kubernetes Service + # and must be configured in the `ports` section below. + - name: OPSANI_ENVOY_PROXY_SERVICE_PORT + value: "9980" + + # The port that exposes the metrics produced by Envoy while it proxies + # traffic (Default: 9901). The corresponding entry in the `ports` stanza + # below must match the value configured here. + - name: OPSANI_ENVOY_PROXY_METRICS_PORT + value: "9901" + + ports: + # Traffic ingress from the Service endpoint. Must match the + # `OPSANI_ENVOY_PROXY_SERVICE_PORT` env above and the `targetPort` of + # the Service routing traffic into the Pod. 
+ - containerPort: 9980 + name: service + protocol: TCP + + # Metrics port exposed by the Envoy proxy that will be scraped by the + # Prometheus sidecar running alongside the Servo. Must match the + # `OPSANI_ENVOY_PROXY_METRICS_PORT` env and `prometheus.opsani.com/port` + # annotation entries above. + - containerPort: 9901 + name: metrics + protocol: TCP + +--- + +apiVersion: v1 +kind: Service + +metadata: + name: fiber-http + labels: + app.kubernetes.io/name: fiber-http + annotations: + service.beta.kubernetes.io/aws-load-balancer-internal: "true" + +spec: + type: ClusterIP + sessionAffinity: None + selector: + app.kubernetes.io/name: fiber-http + ports: + # Send ingress traffic from the service to Envoy listening on port 9980. + # Envoy will reverse proxy back to localhost:8480 for the real service + # to handle the request. Must match `OPSANI_ENVOY_PROXY_SERVICE_PORT` above + # and be exposed as a `containerPort`. + - name: http + protocol: TCP + port: 80 + targetPort: 9980 diff --git a/tests/manifests/argo_rollouts/fiber-http_single_port.yaml b/tests/manifests/argo_rollouts/fiber-http_single_port.yaml new file mode 100644 index 000000000..e81553998 --- /dev/null +++ b/tests/manifests/argo_rollouts/fiber-http_single_port.yaml @@ -0,0 +1,76 @@ +--- +apiVersion: argoproj.io/v1alpha1 +kind: Rollout + +metadata: + name: fiber-http + labels: + app.kubernetes.io/name: fiber-http + +spec: + replicas: 1 + revisionHistoryLimit: 2 + selector: + matchLabels: + app.kubernetes.io/name: fiber-http + # This label selector is required to prevent the canary pod from being adopted by the rollout controller when servo running outside the k8s cluster + opsani_role: mainline + strategy: + blueGreen: + activeService: fiber-http + template: + metadata: + labels: + app.kubernetes.io/name: fiber-http + opsani_role: mainline + spec: + # Prefer deployment onto a Node labeled node.opsani.com=app + # This ensures physical isolation and network transport if possible + affinity: + nodeAffinity: + preferredDuringSchedulingIgnoredDuringExecution: + - weight: 1 + preference: + matchExpressions: + - key: node.opsani.com/role + operator: In + values: + - app + containers: + # Primary container providing the fiber-http web service + - name: fiber-http + image: opsani/fiber-http:latest + imagePullPolicy: IfNotPresent + resources: + requests: + cpu: 125m + memory: 64Mi + limits: + cpu: 250m + memory: 128Mi + ports: + - containerPort: 8480 + protocol: TCP + +--- + +apiVersion: v1 +kind: Service + +metadata: + name: fiber-http + labels: + app.kubernetes.io/name: fiber-http + annotations: + service.beta.kubernetes.io/aws-load-balancer-internal: "true" + +spec: + type: ClusterIP + sessionAffinity: None + selector: + app.kubernetes.io/name: fiber-http + ports: + - name: http + protocol: TCP + port: 80 + targetPort: 8480
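As a usage note for the fixtures and manifests introduced above, a test opts into the Rollout setup by combining the manage_rollout fixture with the rollout_manifest marker. A minimal hypothetical example follows; the class name and assertion are illustrative only, and it assumes async tests run the same way as the other async tests in this suite:

import pytest

import servo.connectors.kubernetes

@pytest.mark.integration
@pytest.mark.usefixtures("kubernetes_asyncio_config", "manage_rollout")
@pytest.mark.rollout_manifest.with_args("tests/manifests/argo_rollouts/fiber-http-opsani-dev.yaml")
class TestRolloutSmoke:
    async def test_rollout_is_readable(self, kube) -> None:
        # manage_rollout has already applied the manifest named by the marker
        rollout = await servo.connectors.kubernetes.Rollout.read("fiber-http", kube.namespace)
        assert rollout, "expected the fiber-http Rollout to exist in the test namespace"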