diff --git a/charts/k3k/crds/k3k.io_clusters.yaml b/charts/k3k/crds/k3k.io_clusters.yaml
index 0071d17..6019da8 100644
--- a/charts/k3k/crds/k3k.io_clusters.yaml
+++ b/charts/k3k/crds/k3k.io_clusters.yaml
@@ -77,6 +77,31 @@ spec:
                 x-kubernetes-validations:
                 - message: clusterDNS is immutable
                   rule: self == oldSelf
+              clusterLimit:
+                description: Limit is the limits that apply to the server/worker
+                  nodes.
+                properties:
+                  serverLimit:
+                    additionalProperties:
+                      anyOf:
+                      - type: integer
+                      - type: string
+                      pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$
+                      x-kubernetes-int-or-string: true
+                    description: ServerLimit is the limits (cpu/mem) that apply to
+                      the server nodes
+                    type: object
+                  workerLimit:
+                    additionalProperties:
+                      anyOf:
+                      - type: integer
+                      - type: string
+                      pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$
+                      x-kubernetes-int-or-string: true
+                    description: WorkerLimit is the limits (cpu/mem) that apply to
+                      the agent nodes
+                    type: object
+                type: object
               expose:
                 description: |-
                   Expose contains options for exposing the apiserver inside/outside of the cluster. By default, this is only exposed as a
@@ -111,6 +136,12 @@ spec:
                - loadbalancer
                - nodePort
                type: object
+              nodeSelector:
+                additionalProperties:
+                  type: string
+                description: NodeSelector is the node selector that will be applied
+                  to all server/agent pods
+                type: object
               persistence:
                 description: |-
                   Persistence contains options controlling how the etcd data of the virtual cluster is persisted. By default, no data
diff --git a/charts/k3k/crds/k3k.io_clustersets.yaml b/charts/k3k/crds/k3k.io_clustersets.yaml
new file mode 100644
index 0000000..a3b8c55
--- /dev/null
+++ b/charts/k3k/crds/k3k.io_clustersets.yaml
@@ -0,0 +1,176 @@
+---
+apiVersion: apiextensions.k8s.io/v1
+kind: CustomResourceDefinition
+metadata:
+  annotations:
+    controller-gen.kubebuilder.io/version: v0.14.0
+  name: clustersets.k3k.io
+spec:
+  group: k3k.io
+  names:
+    kind: ClusterSet
+    listKind: ClusterSetList
+    plural: clustersets
+    singular: clusterset
+  scope: Namespaced
+  versions:
+  - name: v1alpha1
+    schema:
+      openAPIV3Schema:
+        properties:
+          apiVersion:
+            description: |-
+              APIVersion defines the versioned schema of this representation of an object.
+              Servers should convert recognized schemas to the latest internal value, and
+              may reject unrecognized values.
+              More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources
+            type: string
+          kind:
+            description: |-
+              Kind is a string value representing the REST resource this object represents.
+              Servers may infer this from the endpoint the client submits requests to.
+              Cannot be updated.
+              In CamelCase.
+              More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds
+            type: string
+          metadata:
+            type: object
+          spec:
+            description: Spec is the spec of the ClusterSet
+            properties:
+              defaultLimits:
+                description: DefaultLimits are the limits used for servers/agents
+                  when a cluster in the set doesn't provide any
+                properties:
+                  serverLimit:
+                    additionalProperties:
+                      anyOf:
+                      - type: integer
+                      - type: string
+                      pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$
+                      x-kubernetes-int-or-string: true
+                    description: ServerLimit is the limits (cpu/mem) that apply to
+                      the server nodes
+                    type: object
+                  workerLimit:
+                    additionalProperties:
+                      anyOf:
+                      - type: integer
+                      - type: string
+                      pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$
+                      x-kubernetes-int-or-string: true
+                    description: WorkerLimit is the limits (cpu/mem) that apply to
+                      the agent nodes
+                    type: object
+                type: object
+              defaultNodeSelector:
+                additionalProperties:
+                  type: string
+                description: DefaultNodeSelector is the node selector that applies
+                  to all clusters (server + agent) in the set
+                type: object
+              maxLimits:
+                additionalProperties:
+                  anyOf:
+                  - type: integer
+                  - type: string
+                  pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$
+                  x-kubernetes-int-or-string: true
+                description: MaxLimits are the limits that apply to all clusters (server
+                  + agent) in the set
+                type: object
+            type: object
+          status:
+            description: Status is the status of the ClusterSet
+            properties:
+              conditions:
+                description: Conditions are the individual conditions for the cluster
+                  set
+                items:
+                  description: "Condition contains details for one aspect of the current
+                    state of this API Resource.\n---\nThis struct is intended for
+                    direct use as an array at the field path .status.conditions. For
+                    example,\n\n\n\ttype FooStatus struct{\n\t    // Represents the
+                    observations of a foo's current state.\n\t    // Known .status.conditions.type
+                    are: \"Available\", \"Progressing\", and \"Degraded\"\n\t    //
+                    +patchMergeKey=type\n\t    // +patchStrategy=merge\n\t    // +listType=map\n\t
+                    \   // +listMapKey=type\n\t    Conditions []metav1.Condition `json:\"conditions,omitempty\"
+                    patchStrategy:\"merge\" patchMergeKey:\"type\" protobuf:\"bytes,1,rep,name=conditions\"`\n\n\n\t
+                    \   // other fields\n\t}"
+                  properties:
+                    lastTransitionTime:
+                      description: |-
+                        lastTransitionTime is the last time the condition transitioned from one status to another.
+                        This should be when the underlying condition changed. If that is not known, then using the time when the API field changed is acceptable.
+                      format: date-time
+                      type: string
+                    message:
+                      description: |-
+                        message is a human readable message indicating details about the transition.
+                        This may be an empty string.
+                      maxLength: 32768
+                      type: string
+                    observedGeneration:
+                      description: |-
+                        observedGeneration represents the .metadata.generation that the condition was set based upon.
+                        For instance, if .metadata.generation is currently 12, but the .status.conditions[x].observedGeneration is 9, the condition is out of date
+                        with respect to the current state of the instance.
+                      format: int64
+                      minimum: 0
+                      type: integer
+                    reason:
+                      description: |-
+                        reason contains a programmatic identifier indicating the reason for the condition's last transition.
+                        Producers of specific condition types may define expected values and meanings for this field,
+                        and whether the values are considered a guaranteed API.
+                        The value should be a CamelCase string.
+                        This field may not be empty.
+                      maxLength: 1024
+                      minLength: 1
+                      pattern: ^[A-Za-z]([A-Za-z0-9_,:]*[A-Za-z0-9_])?$
+                      type: string
+                    status:
+                      description: status of the condition, one of True, False, Unknown.
+                      enum:
+                      - "True"
+                      - "False"
+                      - Unknown
+                      type: string
+                    type:
+                      description: |-
+                        type of condition in CamelCase or in foo.example.com/CamelCase.
+                        ---
+                        Many .condition.type values are consistent across resources like Available, but because arbitrary conditions can be
+                        useful (see .node.status.conditions), the ability to deconflict is important.
+                        The regex it matches is (dns1123SubdomainFmt/)?(qualifiedNameFmt)
+                      maxLength: 316
+                      pattern: ^([a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*/)?(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])$
+                      type: string
+                  required:
+                  - lastTransitionTime
+                  - message
+                  - reason
+                  - status
+                  - type
+                  type: object
+                type: array
+              lastUpdateTime:
+                description: LastUpdate is the timestamp when the status was last
+                  updated
+                type: string
+              observedGeneration:
+                description: ObservedGeneration was the generation at the time the
+                  status was updated.
+                format: int64
+                type: integer
+              summary:
+                description: Summary is a summary of the status (error, ready)
+                type: string
+            type: object
+        required:
+        - spec
+        type: object
+    served: true
+    storage: true
+    subresources:
+      status: {}
diff --git a/charts/k3k/templates/deployment.yaml b/charts/k3k/templates/deployment.yaml
index 8360721..5e5f32f 100644
--- a/charts/k3k/templates/deployment.yaml
+++ b/charts/k3k/templates/deployment.yaml
@@ -15,6 +15,10 @@ spec:
       labels:
         {{- include "k3k.selectorLabels" . | nindent 8 }}
     spec:
+      volumes:
+      - name: webhook-serving
+        secret:
+          secretName: webhook-secret
       containers:
       - image: "{{ .Values.image.repository }}:{{ .Values.image.tag | default .Chart.AppVersion }}"
         imagePullPolicy: {{ .Values.image.pullPolicy }}
@@ -23,4 +27,11 @@ spec:
         - containerPort: 8080
           name: https
           protocol: TCP
-      serviceAccountName: {{ include "k3k.serviceAccountName" . }}
\ No newline at end of file
+        - containerPort: 9443
+          name: https-webhook
+          protocol: TCP
+        volumeMounts:
+        - name: webhook-serving
+          readOnly: true
+          mountPath: "/tmp/k8s-webhook-server/serving-certs"
+      serviceAccountName: {{ include "k3k.serviceAccountName" . }}
diff --git a/charts/k3k/templates/service.yaml b/charts/k3k/templates/service.yaml
new file mode 100644
index 0000000..a777d82
--- /dev/null
+++ b/charts/k3k/templates/service.yaml
@@ -0,0 +1,15 @@
+apiVersion: v1
+kind: Service
+metadata:
+  name: k3k-webhook
+  labels:
+    {{- include "k3k.labels" . | nindent 4 }}
+  namespace: {{ .Values.namespace }}
+spec:
+  ports:
+  - port: 443
+    protocol: TCP
+    name: https-webhook
+    targetPort: 9443
+  selector:
+    {{- include "k3k.selectorLabels" . | nindent 6 }}
diff --git a/charts/k3k/templates/webhooks.yaml b/charts/k3k/templates/webhooks.yaml
new file mode 100644
index 0000000..b03188a
--- /dev/null
+++ b/charts/k3k/templates/webhooks.yaml
@@ -0,0 +1,46 @@
+apiVersion: admissionregistration.k8s.io/v1
+kind: ValidatingWebhookConfiguration
+metadata:
+  name: "k3k.io.clusterset"
+webhooks:
+- name: "clusters.k3k.io"
+  rules:
+  - apiGroups: ["k3k.io"]
+    apiVersions: ["v1alpha1"]
+    operations: ["CREATE", "UPDATE"]
+    resources: ["clusters"]
+    scope: "Namespaced"
+  clientConfig:
+    service:
+      name: {{ include "k3k.fullname" . }}-webhook
+      namespace: {{ .Values.namespace }}
+      path: /validate-k3k-io-v1alpha1-cluster
+    caBundle: >
+      ReplaceMe
+  admissionReviewVersions: ["v1"]
+  sideEffects: None
+  timeoutSeconds: 10
+---
+apiVersion: admissionregistration.k8s.io/v1
+kind: MutatingWebhookConfiguration
+metadata:
+  name: "k3k.io.clusterset-default"
+webhooks:
+- name: "clusters.k3k.io"
+  rules:
+  - apiGroups: ["k3k.io"]
+    apiVersions: ["v1alpha1"]
+    operations: ["CREATE"]
+    resources: ["clusters"]
+    scope: "Namespaced"
+  clientConfig:
+    service:
+      name: {{ include "k3k.fullname" . }}-webhook
+      namespace: {{ .Values.namespace }}
+      path: /mutate-k3k-io-v1alpha1-cluster
+    caBundle: >
+      ReplaceMe
+  admissionReviewVersions: ["v1"]
+  sideEffects: None
+  timeoutSeconds: 10
+
diff --git a/charts/k3k/values.yaml b/charts/k3k/values.yaml
index 5d889b0..1361ab1 100644
--- a/charts/k3k/values.yaml
+++ b/charts/k3k/values.yaml
@@ -2,7 +2,7 @@ replicaCount: 1
 namespace: k3k-system
 
 image:
-  repository: rancher/k3k
+  repository: rancher/k3k
   pullPolicy: Always
   # Overrides the image tag whose default is the chart appVersion.
   tag: "v0.2.1"
diff --git a/main.go b/main.go
index ba7eb0e..45bfd4a 100644
--- a/main.go
+++ b/main.go
@@ -7,6 +7,7 @@ import (
 
 	"github.com/rancher/k3k/pkg/apis/k3k.io/v1alpha1"
 	"github.com/rancher/k3k/pkg/controller/cluster"
+	"github.com/rancher/k3k/pkg/controller/clusterset"
 	"k8s.io/apimachinery/pkg/runtime"
 	clientgoscheme "k8s.io/client-go/kubernetes/scheme"
 	"k8s.io/client-go/tools/clientcmd"
@@ -38,12 +39,25 @@ func main() {
 	mgr, err := ctrl.NewManager(restConfig, manager.Options{
 		Scheme: Scheme,
 	})
+
 	if err != nil {
 		klog.Fatalf("Failed to create new controller runtime manager: %v", err)
 	}
 
 	if err := cluster.Add(ctx, mgr); err != nil {
-		klog.Fatalf("Failed to add the new controller: %v", err)
+		klog.Fatalf("Failed to add the new cluster controller: %v", err)
+	}
+
+	if err := cluster.AddPodController(ctx, mgr); err != nil {
+		klog.Fatalf("Failed to add the new cluster pod controller: %v", err)
+	}
+	klog.Info("adding clusterset controller")
+	if err := clusterset.Add(ctx, mgr); err != nil {
+		klog.Fatalf("Failed to add the clusterset controller: %v", err)
+	}
+
+	if err := cluster.AddWebhookHandler(ctx, mgr); err != nil {
+		klog.Fatalf("failed to add a webhook for the cluster type: %v", err)
 	}
 
 	if err := cluster.AddPodController(ctx, mgr); err != nil {
diff --git a/pkg/apis/k3k.io/v1alpha1/register.go b/pkg/apis/k3k.io/v1alpha1/register.go
index bb8a099..6c2a6f6 100644
--- a/pkg/apis/k3k.io/v1alpha1/register.go
+++ b/pkg/apis/k3k.io/v1alpha1/register.go
@@ -21,7 +21,10 @@ func Resource(resource string) schema.GroupResource {
 func addKnownTypes(s *runtime.Scheme) error {
 	s.AddKnownTypes(SchemeGroupVersion,
 		&Cluster{},
-		&ClusterList{})
+		&ClusterList{},
+		&ClusterSet{},
+		&ClusterSetList{},
+	)
 	metav1.AddToGroupVersion(s, SchemeGroupVersion)
 	return nil
 }
diff --git a/pkg/apis/k3k.io/v1alpha1/set_types.go b/pkg/apis/k3k.io/v1alpha1/set_types.go
new file mode 100644
index 0000000..abbeaad
--- /dev/null
+++ b/pkg/apis/k3k.io/v1alpha1/set_types.go
@@ -0,0 +1,50 @@
+package v1alpha1
+
+import (
+	v1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+)
+
+// +genclient
+// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
+// +kubebuilder:storageversion
+// +kubebuilder:subresource:status
+
+type ClusterSet struct {
+	metav1.ObjectMeta `json:"metadata,omitempty"`
+	metav1.TypeMeta   `json:",inline"`
+
+	// Spec is the spec of the ClusterSet
+	Spec ClusterSetSpec `json:"spec"`
+	// Status is the status of the ClusterSet
+	Status ClusterSetStatus `json:"status,omitempty"`
+}
+
+type ClusterSetSpec struct {
+	// MaxLimits are the limits that apply to all clusters (server + agent) in the set
+	MaxLimits v1.ResourceList `json:"maxLimits,omitempty"`
+	// DefaultLimits are the limits used for servers/agents when a cluster in the set doesn't provide any
+	DefaultLimits *ClusterLimit `json:"defaultLimits,omitempty"`
+	// DefaultNodeSelector is the node selector that applies to all clusters (server + agent) in the set
+	DefaultNodeSelector map[string]string `json:"defaultNodeSelector,omitempty"`
+}
+
+type ClusterSetStatus struct {
+	// ObservedGeneration was the generation at the time the status was updated.
+	ObservedGeneration int64 `json:"observedGeneration,omitempty"`
+	// LastUpdate is the timestamp when the status was last updated
+	LastUpdate string `json:"lastUpdateTime,omitempty"`
+	// Summary is a summary of the status (error, ready)
+	Summary string `json:"summary,omitempty"`
+	// Conditions are the individual conditions for the cluster set
+	Conditions []metav1.Condition `json:"conditions,omitempty" patchStrategy:"merge" patchMergeKey:"type" protobuf:"bytes,1,rep,name=conditions"`
+}
+
+// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
+
+type ClusterSetList struct {
+	metav1.ListMeta `json:"metadata,omitempty"`
+	metav1.TypeMeta `json:",inline"`
+
+	Items []ClusterSet `json:"items"`
+}
diff --git a/pkg/apis/k3k.io/v1alpha1/types.go b/pkg/apis/k3k.io/v1alpha1/types.go
index 58c8db2..e8fde02 100644
--- a/pkg/apis/k3k.io/v1alpha1/types.go
+++ b/pkg/apis/k3k.io/v1alpha1/types.go
@@ -27,6 +27,10 @@ type ClusterSpec struct {
 	// +kubebuilder:validation:XValidation:message="invalid value for agents",rule="self >= 0"
 	// Agents is the number of K3s pods to run in agent (worker) mode.
 	Agents *int32 `json:"agents"`
+	// NodeSelector is the node selector that will be applied to all server/agent pods
+	NodeSelector map[string]string `json:"nodeSelector,omitempty"`
+	// Limit is the limits that apply to the server/worker nodes.
+	Limit *ClusterLimit `json:"clusterLimit,omitempty"`
 	// +kubebuilder:validation:XValidation:message="token is immutable",rule="self == oldSelf"
 	// Token is the token used to join the worker nodes to the cluster.
 	Token string `json:"token"`
diff --git a/pkg/apis/k3k.io/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/k3k.io/v1alpha1/zz_generated.deepcopy.go
index 3fa29fe..c11f54a 100644
--- a/pkg/apis/k3k.io/v1alpha1/zz_generated.deepcopy.go
+++ b/pkg/apis/k3k.io/v1alpha1/zz_generated.deepcopy.go
@@ -6,6 +6,8 @@ package v1alpha1
 
 import (
+	v1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	runtime "k8s.io/apimachinery/pkg/runtime"
 )
 
@@ -53,6 +55,36 @@ func (in *Cluster) DeepCopyObject() runtime.Object {
 	return nil
 }
 
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *ClusterLimit) DeepCopyInto(out *ClusterLimit) {
+	*out = *in
+	if in.ServerLimit != nil {
+		in, out := &in.ServerLimit, &out.ServerLimit
+		*out = make(v1.ResourceList, len(*in))
+		for key, val := range *in {
+			(*out)[key] = val.DeepCopy()
+		}
+	}
+	if in.WorkerLimit != nil {
+		in, out := &in.WorkerLimit, &out.WorkerLimit
+		*out = make(v1.ResourceList, len(*in))
+		for key, val := range *in {
+			(*out)[key] = val.DeepCopy()
+		}
+	}
+	return
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ClusterLimit.
+func (in *ClusterLimit) DeepCopy() *ClusterLimit {
+	if in == nil {
+		return nil
+	}
+	out := new(ClusterLimit)
+	in.DeepCopyInto(out)
+	return out
+}
+
 // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
 func (in *ClusterList) DeepCopyInto(out *ClusterList) {
 	*out = *in
@@ -86,6 +118,125 @@ func (in *ClusterList) DeepCopyObject() runtime.Object {
 	return nil
 }
 
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *ClusterSet) DeepCopyInto(out *ClusterSet) {
+	*out = *in
+	in.ObjectMeta.DeepCopyInto(&out.ObjectMeta)
+	out.TypeMeta = in.TypeMeta
+	in.Spec.DeepCopyInto(&out.Spec)
+	in.Status.DeepCopyInto(&out.Status)
+	return
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ClusterSet.
+func (in *ClusterSet) DeepCopy() *ClusterSet {
+	if in == nil {
+		return nil
+	}
+	out := new(ClusterSet)
+	in.DeepCopyInto(out)
+	return out
+}
+
+// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object.
+func (in *ClusterSet) DeepCopyObject() runtime.Object {
+	if c := in.DeepCopy(); c != nil {
+		return c
+	}
+	return nil
+}
+
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *ClusterSetList) DeepCopyInto(out *ClusterSetList) {
+	*out = *in
+	in.ListMeta.DeepCopyInto(&out.ListMeta)
+	out.TypeMeta = in.TypeMeta
+	if in.Items != nil {
+		in, out := &in.Items, &out.Items
+		*out = make([]ClusterSet, len(*in))
+		for i := range *in {
+			(*in)[i].DeepCopyInto(&(*out)[i])
+		}
+	}
+	return
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ClusterSetList.
+func (in *ClusterSetList) DeepCopy() *ClusterSetList {
+	if in == nil {
+		return nil
+	}
+	out := new(ClusterSetList)
+	in.DeepCopyInto(out)
+	return out
+}
+
+// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object.
+func (in *ClusterSetList) DeepCopyObject() runtime.Object {
+	if c := in.DeepCopy(); c != nil {
+		return c
+	}
+	return nil
+}
+
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *ClusterSetSpec) DeepCopyInto(out *ClusterSetSpec) {
+	*out = *in
+	if in.MaxLimits != nil {
+		in, out := &in.MaxLimits, &out.MaxLimits
+		*out = make(v1.ResourceList, len(*in))
+		for key, val := range *in {
+			(*out)[key] = val.DeepCopy()
+		}
+	}
+	if in.DefaultLimits != nil {
+		in, out := &in.DefaultLimits, &out.DefaultLimits
+		*out = new(ClusterLimit)
+		(*in).DeepCopyInto(*out)
+	}
+	if in.DefaultNodeSelector != nil {
+		in, out := &in.DefaultNodeSelector, &out.DefaultNodeSelector
+		*out = make(map[string]string, len(*in))
+		for key, val := range *in {
+			(*out)[key] = val
+		}
+	}
+	return
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ClusterSetSpec.
+func (in *ClusterSetSpec) DeepCopy() *ClusterSetSpec {
+	if in == nil {
+		return nil
+	}
+	out := new(ClusterSetSpec)
+	in.DeepCopyInto(out)
+	return out
+}
+
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *ClusterSetStatus) DeepCopyInto(out *ClusterSetStatus) {
+	*out = *in
+	if in.Conditions != nil {
+		in, out := &in.Conditions, &out.Conditions
+		*out = make([]metav1.Condition, len(*in))
+		for i := range *in {
+			(*in)[i].DeepCopyInto(&(*out)[i])
+		}
+	}
+	return
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ClusterSetStatus.
+func (in *ClusterSetStatus) DeepCopy() *ClusterSetStatus {
+	if in == nil {
+		return nil
+	}
+	out := new(ClusterSetStatus)
+	in.DeepCopyInto(out)
+	return out
+}
+
 // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
 func (in *ClusterSpec) DeepCopyInto(out *ClusterSpec) {
 	*out = *in
@@ -99,6 +250,18 @@ func (in *ClusterSpec) DeepCopyInto(out *ClusterSpec) {
 		*out = new(int32)
 		**out = **in
 	}
+	if in.NodeSelector != nil {
+		in, out := &in.NodeSelector, &out.NodeSelector
+		*out = make(map[string]string, len(*in))
+		for key, val := range *in {
+			(*out)[key] = val
+		}
+	}
+	if in.Limit != nil {
+		in, out := &in.Limit, &out.Limit
+		*out = new(ClusterLimit)
+		(*in).DeepCopyInto(*out)
+	}
 	if in.ServerArgs != nil {
 		in, out := &in.ServerArgs, &out.ServerArgs
 		*out = make([]string, len(*in))
diff --git a/pkg/controller/cluster/agent/agent.go b/pkg/controller/cluster/agent/agent.go
index f588c9a..9df2b35 100644
--- a/pkg/controller/cluster/agent/agent.go
+++ b/pkg/controller/cluster/agent/agent.go
@@ -130,8 +130,12 @@ func (a *Agent) StatefulAgent(cluster *v1alpha1.Cluster) *apps.StatefulSet {
 
 func (a *Agent) podSpec(image, name string, args []string, statefulSet bool, affinitySelector *metav1.LabelSelector) v1.PodSpec {
 	var limit v1.ResourceList
+	// agent pods are capped by the worker limit, not the server limit
+	if a.cluster.Spec.Limit != nil && a.cluster.Spec.Limit.WorkerLimit != nil {
+		limit = a.cluster.Spec.Limit.WorkerLimit
+	}
 	args = append([]string{"agent", "--config", "/opt/rancher/k3s/config.yaml"}, args...)
 	podSpec := v1.PodSpec{
+		NodeSelector: a.cluster.Spec.NodeSelector,
 		Affinity: &v1.Affinity{
 			PodAntiAffinity: &v1.PodAntiAffinity{
 				RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{
diff --git a/pkg/controller/cluster/server/bootstrap/bootstrap.go b/pkg/controller/cluster/server/bootstrap/bootstrap.go
index 0500be0..103d366 100644
--- a/pkg/controller/cluster/server/bootstrap/bootstrap.go
+++ b/pkg/controller/cluster/server/bootstrap/bootstrap.go
@@ -62,6 +62,14 @@ func Generate(ctx context.Context, cluster *v1alpha1.Cluster, ip string) (*v1.Se
 		ObjectMeta: metav1.ObjectMeta{
 			Name:      cluster.Name + "-bootstrap",
 			Namespace: util.ClusterNamespace(cluster),
+			OwnerReferences: []metav1.OwnerReference{
+				{
+					APIVersion: cluster.APIVersion,
+					Kind:       cluster.Kind,
+					Name:       cluster.Name,
+					UID:        cluster.UID,
+				},
+			},
 		},
 		Data: map[string][]byte{
 			"bootstrap": bootstrapData,
diff --git a/pkg/controller/cluster/server/server.go b/pkg/controller/cluster/server/server.go
index ffc1b34..fa7ce0e 100644
--- a/pkg/controller/cluster/server/server.go
+++ b/pkg/controller/cluster/server/server.go
@@ -41,7 +41,12 @@ func New(cluster *v1alpha1.Cluster, client client.Client) *Server {
 }
 
 func (s *Server) podSpec(ctx context.Context, image, name string, persistent bool, affinitySelector *metav1.LabelSelector) v1.PodSpec {
+	var limit v1.ResourceList
+	if s.cluster.Spec.Limit != nil && s.cluster.Spec.Limit.ServerLimit != nil {
+		limit = s.cluster.Spec.Limit.ServerLimit
+	}
 	podSpec := v1.PodSpec{
+		NodeSelector: s.cluster.Spec.NodeSelector,
 		Affinity: &v1.Affinity{
 			PodAntiAffinity: &v1.PodAntiAffinity{
 				RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{
@@ -110,6 +115,9 @@ func (s *Server) podSpec(ctx context.Context, image, name string, persistent boo
 		{
 			Name:  name,
 			Image: image,
+			Resources: v1.ResourceRequirements{
+				Limits: limit,
+			},
 			Env: []v1.EnvVar{
 				{
 					Name: "POD_NAME",
diff --git a/pkg/controller/cluster/webhook.go b/pkg/controller/cluster/webhook.go
new file mode 100644
index 0000000..05b61d9
--- /dev/null
+++ b/pkg/controller/cluster/webhook.go
@@ -0,0 +1,213 @@
+package cluster
+
+import (
+	"context"
+	"fmt"
+
+	"github.com/rancher/k3k/pkg/apis/k3k.io/v1alpha1"
+	v1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/klog/v2"
+	ctrl "sigs.k8s.io/controller-runtime"
+	ctrlruntimeclient "sigs.k8s.io/controller-runtime/pkg/client"
+	"sigs.k8s.io/controller-runtime/pkg/manager"
+)
+
+type webhookHandler struct {
+	client ctrlruntimeclient.Client
+	scheme *runtime.Scheme
+}
+
+func AddWebhookHandler(ctx context.Context, mgr manager.Manager) error {
+	handler := webhookHandler{
+		client: mgr.GetClient(),
+		scheme: mgr.GetScheme(),
+	}
+	return ctrl.NewWebhookManagedBy(mgr).For(&v1alpha1.Cluster{}).WithDefaulter(&handler).WithValidator(&handler).Complete()
+}
+
+func (w *webhookHandler) Default(ctx context.Context, obj runtime.Object) error {
+	cluster, ok := obj.(*v1alpha1.Cluster)
+	if !ok {
+		return fmt.Errorf("invalid request: object was type %T not cluster", obj)
+	}
+	klog.Infof("received mutate request for %+v", cluster)
+	clusterSet, found, err := w.findClusterSet(ctx, cluster)
+	if err != nil {
+		return fmt.Errorf("error when finding cluster set: %w", err)
+	}
+	if !found {
+		// this cluster isn't in a cluster set, don't apply any defaults
+		return nil
+	}
+	clusterSetLimits := clusterSet.Spec.DefaultLimits
+	clusterLimits := cluster.Spec.Limit
+	if clusterSetLimits != nil {
+		if clusterLimits == nil {
+			clusterLimits = clusterSetLimits
+		}
+		defaultLimits(clusterSetLimits.ServerLimit, clusterLimits.ServerLimit)
+		defaultLimits(clusterSetLimits.WorkerLimit, clusterLimits.WorkerLimit)
+		cluster.Spec.Limit = clusterLimits
+	}
+	// values are overridden for node selector, which applies to all clusters in the set
+	if len(clusterSet.Spec.DefaultNodeSelector) > 0 && cluster.Spec.NodeSelector == nil {
+		cluster.Spec.NodeSelector = map[string]string{} // avoid writing into a nil map
+	}
+	for key, value := range clusterSet.Spec.DefaultNodeSelector {
+		cluster.Spec.NodeSelector[key] = value
+	}
+	return nil
+}
+
+// defaultLimits adds missing keys from default into current. Existing values are not replaced
+func defaultLimits(defaults v1.ResourceList, current v1.ResourceList) {
+	for name, limit := range defaults {
+		if _, ok := current[name]; !ok {
+			current[name] = limit
+		}
+	}
+}
+
+func (w *webhookHandler) ValidateCreate(ctx context.Context, obj runtime.Object) error {
+	cluster, ok := obj.(*v1alpha1.Cluster)
+	if !ok {
+		return fmt.Errorf("invalid request: object was type %T not cluster", obj)
+	}
+	clusterSet, found, err := w.findClusterSet(ctx, cluster)
+	if err != nil {
+		return fmt.Errorf("error when finding cluster set: %w", err)
+	}
+	if !found {
+		// this cluster isn't in a cluster set, don't do any validation
+		return nil
+	}
+	return w.validateLimits(ctx, cluster, clusterSet)
+}
+
+func (w *webhookHandler) ValidateUpdate(ctx context.Context, oldObj, newObj runtime.Object) error {
+	// note that since we only check the new cluster, if the old cluster was made invalid by changing the clusterSet
+	// the next operation on the cluster must correct the error, or no request can go through
+	cluster, ok := newObj.(*v1alpha1.Cluster)
+	if !ok {
+		return fmt.Errorf("invalid request: object was type %T not cluster", newObj)
+	}
+	clusterSet, found, err := w.findClusterSet(ctx, cluster)
+	if err != nil {
+		return fmt.Errorf("error when finding cluster set: %w", err)
+	}
+	if !found {
+		// this cluster isn't in a cluster set, don't do any validation
+		return nil
+	}
+	return w.validateLimits(ctx, cluster, clusterSet)
+}
+
+func (w *webhookHandler) ValidateDelete(ctx context.Context, obj runtime.Object) error {
+	// Deleting a cluster is always valid
+	return nil
+}
+
+func (w *webhookHandler) validateLimits(ctx context.Context, cluster *v1alpha1.Cluster, clusterSet *v1alpha1.ClusterSet) error {
+	maxLimits := clusterSet.Spec.MaxLimits
+	if maxLimits == nil {
+		// nothing to validate if there are no limits
+		return nil
+	}
+	if cluster.Spec.Limit == nil || cluster.Spec.Limit.ServerLimit == nil || cluster.Spec.Limit.WorkerLimit == nil {
+		// this should never happen because of the defaulter, but is done to guard against a nil/panic below
+		return fmt.Errorf("cluster %s has no limits set but is in a cluster set with limits", cluster.Name)
+	}
+	serverLimit := cluster.Spec.Limit.ServerLimit
+	workerLimit := cluster.Spec.Limit.WorkerLimit
+	for limit := range maxLimits {
+		if _, ok := serverLimit[limit]; !ok {
+			return fmt.Errorf("a limit was set for resource %s but no cluster limit was provided", limit)
+		}
+		if _, ok := workerLimit[limit]; !ok {
+			return fmt.Errorf("a limit was set for resource %s but no cluster limit was provided", limit)
+		}
+	}
+	usedLimits, err := w.findUsedLimits(ctx, clusterSet, cluster)
+	if err != nil {
+		return fmt.Errorf("unable to find current used limits: %w", err)
+	}
+	klog.Infof("used: %+v, max: %+v", usedLimits, maxLimits)
+	if exceedsLimits(maxLimits, usedLimits) {
+		return fmt.Errorf("new cluster would exceed limits")
+	}
+	return nil
+}
+
+func (w *webhookHandler) findClusterSet(ctx context.Context, cluster *v1alpha1.Cluster) (*v1alpha1.ClusterSet, bool, error) {
+	var clusterSets v1alpha1.ClusterSetList
+	err := w.client.List(ctx, &clusterSets, ctrlruntimeclient.InNamespace(cluster.Namespace))
+	if err != nil {
+		return nil, false, fmt.Errorf("unable to list cluster sets: %w", err)
+	}
+	switch len(clusterSets.Items) {
+	case 0:
+		return nil, false, nil
+	case 1:
+		return &clusterSets.Items[0], true, nil
+	default:
+		return nil, false, fmt.Errorf("expected only one clusterset, found %d", len(clusterSets.Items))
+	}
+}
+
+func (w *webhookHandler) findUsedLimits(ctx context.Context, clusterSet *v1alpha1.ClusterSet, newCluster *v1alpha1.Cluster) (v1.ResourceList, error) {
+	var clusterList v1alpha1.ClusterList
+	if err := w.client.List(ctx, &clusterList, ctrlruntimeclient.InNamespace(clusterSet.Namespace)); err != nil {
+		return nil, fmt.Errorf("unable to list clusters: %w", err)
+	}
+	usedLimits := v1.ResourceList{}
+	for _, cluster := range clusterList.Items {
+		// skip the new cluster - on update we need to add the new proposed values, not the existing values
+		if cluster.Spec.Limit == nil || cluster.Name == newCluster.Name {
+			// note that this will cause clusters with no values set to be left out of the usage calculation
+			continue
+		}
+		addClusterLimits(usedLimits, &cluster)
+	}
+	addClusterLimits(usedLimits, newCluster)
+	return usedLimits, nil
+}
+
+func addClusterLimits(usedLimits v1.ResourceList, cluster *v1alpha1.Cluster) {
+	for i := 0; i < int(*cluster.Spec.Agents); i++ {
+		usedLimits = addLimits(usedLimits, cluster.Spec.Limit.WorkerLimit)
+	}
+	for i := 0; i < int(*cluster.Spec.Servers); i++ {
+		usedLimits = addLimits(usedLimits, cluster.Spec.Limit.ServerLimit)
+	}
+}
+
+func addLimits(currentList v1.ResourceList, newList v1.ResourceList) v1.ResourceList {
+	for key, value := range newList {
+		current, ok := currentList[key]
+		if !ok {
+			// deep copy to avoid mutating the next time that we add something on
+			currentList[key] = value.DeepCopy()
+			continue
+		}
+		current.Add(value)
+		currentList[key] = current
+	}
+	return currentList
+}
+
+func exceedsLimits(maxLimit v1.ResourceList, usedLimit v1.ResourceList) bool {
+	for key, value := range maxLimit {
+		used, ok := usedLimit[key]
+		if !ok {
+			// not present means there is no usage currently
+			continue
+		}
+		// used > max, so this would exceed the max
+		if used.Cmp(value) == 1 {
+			return true
+		}
+	}
+	return false
+}
diff --git a/pkg/controller/clusterset/clusterset.go b/pkg/controller/clusterset/clusterset.go
new file mode 100644
index 0000000..bf56619
--- /dev/null
+++ b/pkg/controller/clusterset/clusterset.go
@@ -0,0 +1,80 @@
+package clusterset
+
+import (
+	"context"
+	"fmt"
+
+	"github.com/rancher/k3k/pkg/apis/k3k.io/v1alpha1"
+	v1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/klog/v2"
+	ctrlruntimeclient "sigs.k8s.io/controller-runtime/pkg/client"
+	"sigs.k8s.io/controller-runtime/pkg/controller"
+	"sigs.k8s.io/controller-runtime/pkg/handler"
+	"sigs.k8s.io/controller-runtime/pkg/manager"
+	"sigs.k8s.io/controller-runtime/pkg/reconcile"
+	"sigs.k8s.io/controller-runtime/pkg/source"
+)
+
+const (
+	clusterSetController    = "k3k-clusterset-controller"
+	maxConcurrentReconciles = 1
+)
+
+type ClusterSetReconciler struct {
+	Client ctrlruntimeclient.Client
+	Scheme *runtime.Scheme
+}
+
+// Add adds a new controller to the manager
+func Add(ctx context.Context, mgr manager.Manager) error {
+	// initialize a new Reconciler
+	reconciler := ClusterSetReconciler{
+		Client: mgr.GetClient(),
+		Scheme: mgr.GetScheme(),
+	}
+
+	// create a new controller and add it to the manager
+	// this can be replaced by the new builder functionality in controller-runtime
+	controller, err := controller.New(clusterSetController, mgr, controller.Options{
+		Reconciler:              &reconciler,
+		MaxConcurrentReconciles: maxConcurrentReconciles,
+	})
+	if err != nil {
+		return err
+	}
+
+	return controller.Watch(&source.Kind{Type: &v1alpha1.ClusterSet{}}, &handler.EnqueueRequestForObject{})
+}
+
+func (c *ClusterSetReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
+	var clusterSet v1alpha1.ClusterSet
+	if err := c.Client.Get(ctx, types.NamespacedName{Name: req.Name, Namespace: req.Namespace}, &clusterSet); err != nil {
+		return reconcile.Result{}, fmt.Errorf("unable to get the clusterset: %w", err)
+	}
+	klog.Infof("got a clusterset: %v", clusterSet)
+	if clusterSet.Spec.MaxLimits != nil {
+		quota := v1.ResourceQuota{
+			ObjectMeta: metav1.ObjectMeta{
+				Name:      "clusterset-quota",
+				Namespace: clusterSet.Namespace,
+				OwnerReferences: []metav1.OwnerReference{
+					{
+						UID:        clusterSet.UID,
+						Name:       clusterSet.Name,
+						APIVersion: clusterSet.APIVersion,
+						Kind:       clusterSet.Kind,
+					},
+				},
+			},
+		}
+		quota.Spec.Hard = clusterSet.Spec.MaxLimits
+		err := c.Client.Create(ctx, &quota)
+		if err != nil {
+			return reconcile.Result{}, fmt.Errorf("unable to create resource quota from cluster set: %w", err)
+		}
+	}
+	return reconcile.Result{}, nil
+}
diff --git a/pkg/controller/clusterset/webhook.go b/pkg/controller/clusterset/webhook.go
new file mode 100644
index 0000000..7ecd94a
--- /dev/null
+++ b/pkg/controller/clusterset/webhook.go
@@ -0,0 +1,4 @@
+package clusterset
+
+type clusterSetWebhookHandler struct {
+}
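
For reference, below is a minimal sketch (not part of the patch) of how the new resources might be used together: a ClusterSet that caps and defaults per-cluster limits for a namespace, and a Cluster that uses the new clusterLimit and nodeSelector fields. Field names follow the CRDs above; the namespace, node selector key, token placeholder, and quantity values are illustrative assumptions.

---
apiVersion: k3k.io/v1alpha1
kind: ClusterSet
metadata:
  name: example-set
  namespace: tenant-a            # assumed namespace; the webhook expects at most one ClusterSet per namespace
spec:
  maxLimits:                     # total cpu/memory allowed across all server and agent pods in the set
    cpu: "8"
    memory: 16Gi
  defaultLimits:                 # applied by the mutating webhook when a Cluster omits clusterLimit
    serverLimit:
      cpu: "1"
      memory: 2Gi
    workerLimit:
      cpu: "1"
      memory: 2Gi
  defaultNodeSelector:           # merged into every Cluster's nodeSelector in the set
    node-role.kubernetes.io/worker: "true"
---
apiVersion: k3k.io/v1alpha1
kind: Cluster
metadata:
  name: example-cluster
  namespace: tenant-a
spec:
  servers: 1
  agents: 2
  token: REPLACE_WITH_TOKEN      # placeholder join token
  clusterLimit:                  # must name every resource listed in the set's maxLimits to pass validation
    serverLimit:
      cpu: "1"
      memory: 2Gi
    workerLimit:
      cpu: "2"
      memory: 4Gi
  nodeSelector:
    node-role.kubernetes.io/worker: "true"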