Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enable GCP Filestore on cluster #215

Open
wants to merge 11 commits into
base: development
Choose a base branch
from
30 changes: 27 additions & 3 deletions src/xpk/commands/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
update_cluster_with_clouddns_if_necessary,
update_cluster_with_gcsfuse_driver_if_necessary,
update_cluster_with_workload_identity_if_necessary,
update_cluster_with_gcpfilestore_driver_if_necessary,
zone_to_region,
)
from ..core.kueue import (
Expand Down Expand Up @@ -89,7 +90,11 @@ def cluster_create(args) -> None:
xpk_exit(create_cluster_command_code)

# Enable WorkloadIdentity if not enabled already.
if args.enable_workload_identity or args.enable_gcsfuse_csi_driver:
if (
args.enable_workload_identity
or args.enable_gcsfuse_csi_driver
or args.enable_gcpfilestore_csi_driver
):
update_cluster_command_code = (
update_cluster_with_workload_identity_if_necessary(args)
)
Expand All @@ -104,6 +109,13 @@ def cluster_create(args) -> None:
if update_cluster_command_code != 0:
xpk_exit(update_cluster_command_code)

if args.enable_gcpfilestore_csi_driver:
update_cluster_command_code = (
update_cluster_with_gcpfilestore_driver_if_necessary(args)
)
if update_cluster_command_code != 0:
xpk_exit(update_cluster_command_code)

# Update Pathways clusters with CloudDNS if not enabled already.
if args.enable_pathways:
update_cluster_command_code = update_cluster_with_clouddns_if_necessary(
Expand Down Expand Up @@ -486,11 +498,23 @@ def run_gke_cluster_create_command(
f' --cluster-dns-domain={args.cluster}-domain'
)

if args.enable_workload_identity or args.enable_gcsfuse_csi_driver:
if (
args.enable_workload_identity
or args.enable_gcsfuse_csi_driver
or args.enable_gcpfilestore_csi_driver
):
command += f' --workload-pool={args.project}.svc.id.goog'

addons = []
if args.enable_gcsfuse_csi_driver:
command += ' --addons GcsFuseCsiDriver'
addons.append('GcsFuseCsiDriver')

if args.enable_gcpfilestore_csi_driver:
addons.append('GcpFilestoreCsiDriver')

if len(addons) > 0:
addons_str = ','.join(addons)
command += f' --addons={addons_str}'

return_code = run_command_with_updates(command, 'GKE Cluster Create', args)
if return_code != 0:
Expand Down
10 changes: 10 additions & 0 deletions src/xpk/commands/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@
setup_k8s_env,
update_cluster_with_gcsfuse_driver_if_necessary,
update_cluster_with_workload_identity_if_necessary,
update_cluster_with_gcpfilestore_driver_if_necessary,
)
from ..core.storage import (
GCS_FUSE_TYPE,
GCP_FILESTORE_TYPE,
STORAGE_CRD_KIND,
XPK_API_GROUP_NAME,
XPK_API_GROUP_VERSION,
Expand All @@ -49,6 +51,14 @@ def storage_create(args: Namespace) -> None:
xpk_exit(return_code)
apply_kubectl_manifest(k8s_api_client, args.manifest)

if args.type == GCP_FILESTORE_TYPE:
return_code = update_cluster_with_workload_identity_if_necessary(args)
if return_code > 0:
xpk_exit(return_code)
return_code = update_cluster_with_gcpfilestore_driver_if_necessary(args)
if return_code > 0:
xpk_exit(return_code)


def storage_list(args: Namespace) -> None:
k8s_api_client = setup_k8s_env(args)
Expand Down
75 changes: 64 additions & 11 deletions src/xpk/core/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -436,8 +436,8 @@ def update_gke_cluster_with_workload_identity_enabled(args) -> int:
return 0


def update_gke_cluster_with_gcsfuse_driver_enabled(args) -> int:
"""Run the GKE cluster update command for existing cluster and enable GCSFuse CSI driver.
def update_gke_cluster_with_addon(addon: str, args) -> int:
"""Run the GKE cluster update command for existing cluster and enabling passed addon.
Args:
args: user provided arguments for running the command.
Returns:
Expand All @@ -447,14 +447,12 @@ def update_gke_cluster_with_gcsfuse_driver_enabled(args) -> int:
'gcloud container clusters update'
f' {args.cluster} --project={args.project}'
f' --region={zone_to_region(args.zone)}'
' --update-addons GcsFuseCsiDriver=ENABLED'
f' --update-addons {addon}=ENABLED'
' --quiet'
)
xpk_print(
'Updating GKE cluster to enable GCSFuse CSI driver, may take a while!'
)
xpk_print(f'Updating GKE cluster to enable {addon}, may take a while!')
return_code = run_command_with_updates(
command, 'GKE Cluster Update to enable GCSFuse CSI driver', args
command, f'GKE Cluster Update to enable {addon}', args
)
if return_code != 0:
xpk_print(f'GKE Cluster Update request returned ERROR {return_code}')
Expand Down Expand Up @@ -1200,6 +1198,33 @@ def is_gcsfuse_driver_enabled_on_cluster(args) -> bool:
return False


def is_gcpfilestore_driver_enabled_on_cluster(args) -> bool:
"""Checks if GCPFilestore CSI driver is enabled on the cluster.
Args:
args: user provided arguments for running the command.
Returns:
True if GCPFilestore CSI driver is enabled on the cluster and False otherwise.
"""
command = (
f'gcloud container clusters describe {args.cluster}'
f' --project={args.project} --region={zone_to_region(args.zone)}'
' --format="value(addonsConfig.gcpFilestoreCsiDriverConfig.enabled)"'
)
return_code, gcpfilestore_driver_enabled = run_command_for_value(
command,
'Checks if GCPFilestore CSI driver is enabled in cluster describe.',
args,
)
if return_code != 0:
xpk_exit(return_code)
if gcpfilestore_driver_enabled.lower() == 'true':
xpk_print(
'GCPFilestore CSI driver is enabled on the cluster, no update needed.'
)
return True
return False


def update_cluster_with_clouddns_if_necessary(args) -> int:
"""Updates a GKE cluster to use CloudDNS, if not enabled already.

Expand Down Expand Up @@ -1275,8 +1300,8 @@ def update_cluster_with_gcsfuse_driver_if_necessary(args) -> int:

if is_gcsfuse_driver_enabled_on_cluster(args):
return 0
cluster_update_return_code = update_gke_cluster_with_gcsfuse_driver_enabled(
args
cluster_update_return_code = update_gke_cluster_with_addon(
'GcsFuseCsiDriver', args
)
if cluster_update_return_code > 0:
xpk_print('Updating GKE cluster to enable GCSFuse CSI driver failed!')
Expand All @@ -1285,6 +1310,26 @@ def update_cluster_with_gcsfuse_driver_if_necessary(args) -> int:
return 0


def update_cluster_with_gcpfilestore_driver_if_necessary(args) -> int:
"""Updates a GKE cluster to enable GCPFilestore CSI driver, if not enabled already.
Args:
args: user provided arguments for running the command.
Returns:
0 if successful and error code otherwise.
"""

if is_gcpfilestore_driver_enabled_on_cluster(args):
return 0
cluster_update_return_code = update_gke_cluster_with_addon(
'GcpFilestoreCsiDriver', args
)
if cluster_update_return_code > 0:
xpk_print('Updating GKE cluster to enable GCPFilestore CSI driver failed!')
return cluster_update_return_code

return 0


def get_nodepool_zone(args, nodepool_name) -> tuple[int, str]:
"""Return zone in which nodepool exists in the cluster.

Expand Down Expand Up @@ -1603,7 +1648,11 @@ def run_gke_node_pool_create_command(
node_pools_to_remain.append(node_pool_name)

# Workload Identity for existing nodepools
if args.enable_workload_identity or args.enable_gcsfuse_csi_driver:
if (
args.enable_workload_identity
or args.enable_gcsfuse_csi_driver
or args.enable_gcpfilestore_csi_driver
):
for node_pool_name in existing_node_pool_names:
if not node_pool_name in node_pools_to_delete:
# Check if workload identity is not already enabled:
Expand Down Expand Up @@ -1774,7 +1823,11 @@ def run_gke_node_pool_create_command(
command += f' --num-nodes={system.vms_per_slice}'
command += ' --scopes=storage-full,gke-default'

if args.enable_workload_identity or args.enable_gcsfuse_csi_driver:
if (
args.enable_workload_identity
or args.enable_gcsfuse_csi_driver
or args.enable_gcpfilestore_csi_driver
):
command += ' --workload-metadata=GKE_METADATA'

task = f'NodepoolCreate-{node_pool_name}'
Expand Down
1 change: 1 addition & 0 deletions src/xpk/core/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
XPK_API_GROUP_NAME = "xpk.x-k8s.io"
XPK_API_GROUP_VERSION = "v1"
GCS_FUSE_TYPE = "gcsfuse"
GCP_FILESTORE_TYPE = "gcpfilestore"


@dataclass
Expand Down
9 changes: 9 additions & 0 deletions src/xpk/parser/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -465,6 +465,15 @@ def add_shared_cluster_create_optional_arguments(args_parsers):
),
)

custom_parser.add_argument(
'--enable-gcpfilestore-csi-driver',
action='store_true',
help=(
'Enable GCPFilestore driver on the cluster. This enables Workload'
' Identity Federation.'
),
)


def add_shared_cluster_create_tensorboard_arguments(args_parsers):
"""Add shared tensorboard arguments in cluster create and Pathways cluster create.
Expand Down
7 changes: 5 additions & 2 deletions src/xpk/parser/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,11 @@ def add_storage_create_parser(
req_args.add_argument(
'--type',
type=str,
help='The type of storage. Currently supported types: ["gcsfuse"]',
choices=['gcsfuse'],
help=(
'The type of storage. Currently supported types: ["gcsfuse",'
' "gcpfilestore"]'
),
choices=['gcsfuse', 'gcpfilestore'],
required=True,
)
req_args.add_argument(
Expand Down
Loading