diff --git a/pkg/reconciler/taskrun/controller.go b/pkg/reconciler/taskrun/controller.go index 67965f8838e..7eda96da7f9 100644 --- a/pkg/reconciler/taskrun/controller.go +++ b/pkg/reconciler/taskrun/controller.go @@ -30,6 +30,7 @@ import ( "github.com/tektoncd/pipeline/pkg/pod" cloudeventclient "github.com/tektoncd/pipeline/pkg/reconciler/events/cloudevent" "github.com/tektoncd/pipeline/pkg/reconciler/volumeclaim" + "github.com/tektoncd/pipeline/pkg/spire" "github.com/tektoncd/pipeline/pkg/taskrunmetrics" "k8s.io/client-go/tools/cache" kubeclient "knative.dev/pkg/client/injection/kube/client" @@ -62,7 +63,7 @@ func NewController(opts *pipeline.Options, clock clock.Clock) func(context.Conte KubeClientSet: kubeclientset, PipelineClientSet: pipelineclientset, Images: opts.Images, - SpireConfig: opts.SpireConfig, + SpireClient: spire.NewSpireServerApiClient(opts.SpireConfig), Clock: clock, taskRunLister: taskRunInformer.Lister(), resourceLister: resourceInformer.Lister(), diff --git a/pkg/reconciler/taskrun/taskrun.go b/pkg/reconciler/taskrun/taskrun.go index 6cd4482345a..7f2d0d46700 100644 --- a/pkg/reconciler/taskrun/taskrun.go +++ b/pkg/reconciler/taskrun/taskrun.go @@ -48,7 +48,6 @@ import ( "github.com/tektoncd/pipeline/pkg/reconciler/taskrun/resources" "github.com/tektoncd/pipeline/pkg/reconciler/volumeclaim" "github.com/tektoncd/pipeline/pkg/spire" - spireconfig "github.com/tektoncd/pipeline/pkg/spire/config" "github.com/tektoncd/pipeline/pkg/taskrunmetrics" _ "github.com/tektoncd/pipeline/pkg/taskrunmetrics/fake" // Make sure the taskrunmetrics are setup "github.com/tektoncd/pipeline/pkg/workspace" @@ -70,7 +69,7 @@ type Reconciler struct { KubeClientSet kubernetes.Interface PipelineClientSet clientset.Interface Images pipeline.Images - SpireConfig spireconfig.SpireConfig + SpireClient *spire.SpireServerApiClient Clock clock.Clock // listers index properties about resources @@ -431,18 +430,14 @@ func (c *Reconciler) reconcile(ctx context.Context, tr *v1beta1.TaskRun, rtr *re } if podconvert.SidecarsReady(pod.Status) { + if err := c.metrics.RecordPodLatency(pod, tr); err != nil { + logger.Warnf("Failed to log the metrics : %v", err) + } + if config.FromContextOrDefaults(ctx).FeatureFlags.EnableSpire { - logger.Infof("Registering SPIRE entry: %v/%v", pod.Namespace, pod.Name) - spiffeclient, err := spire.NewSpiffeServerApiClient(ctx, c.SpireConfig) - if err != nil { - logger.Errorf("Failed to establish client with SPIRE server: %v", err) - return err - } - if err = spiffeclient.CreateNodeEntry(ctx, pod.Spec.NodeName); err != nil { - logger.Errorf("Failed to create node SPIFFE entry for node %v: %v", pod.Spec.NodeName, err) - return err - } - if err = spiffeclient.CreateWorkloadEntry(ctx, tr, pod); err != nil { + // TTL is in seconds + ttl := config.FromContextOrDefaults(ctx).Defaults.DefaultTimeoutMinutes * 60 + if err = c.SpireClient.CreateEntries(ctx, tr, pod, ttl); err != nil { logger.Errorf("Failed to create workload SPIFFE entry for taskrun %v: %v", tr.Name, err) return err } @@ -453,9 +448,6 @@ func (c *Reconciler) reconcile(ctx context.Context, tr *v1beta1.TaskRun, rtr *re return err } - if err := c.metrics.RecordPodLatency(pod, tr); err != nil { - logger.Warnf("Failed to log the metrics : %v", err) - } } // Convert the Pod's status to the equivalent TaskRun Status. diff --git a/pkg/spire/spire.go b/pkg/spire/spire.go index 5c1c932fe30..8d9f5a5c418 100644 --- a/pkg/spire/spire.go +++ b/pkg/spire/spire.go @@ -34,37 +34,54 @@ import ( "google.golang.org/grpc/credentials" ) -type SpiffeServerApiClient struct { +type SpireServerApiClient struct { + config spireconfig.SpireConfig serverConn *grpc.ClientConn workloadConn *workloadapi.X509Source entryClient entryv1.EntryClient - config spireconfig.SpireConfig } -func NewSpiffeServerApiClient(ctx context.Context, c spireconfig.SpireConfig) (*SpiffeServerApiClient, error) { - // Create X509Source - source, err := workloadapi.NewX509Source(ctx, workloadapi.WithClientOptions(workloadapi.WithAddr("unix://"+c.SocketPath))) - if err != nil { - return nil, fmt.Errorf("Unable to create X509Source for SPIFFE client: %w", err) +func (sc *SpireServerApiClient) checkClient(ctx context.Context) error { + if sc.entryClient == nil || sc.workloadConn == nil || sc.serverConn == nil { + return sc.dial(ctx) } + return nil +} - // Create connection - tlsConfig := tlsconfig.MTLSClientConfig(source, source, tlsconfig.AuthorizeAny()) - conn, err := grpc.DialContext(ctx, c.ServerAddr, grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig))) - if err != nil { - source.Close() - return nil, fmt.Errorf("Unable to dial SPIRE server: %w", err) +func (sc *SpireServerApiClient) dial(ctx context.Context) error { + if sc.workloadConn == nil { + // Create X509Source + source, err := workloadapi.NewX509Source(ctx, workloadapi.WithClientOptions(workloadapi.WithAddr("unix://"+sc.config.SocketPath))) + if err != nil { + return fmt.Errorf("Unable to create X509Source for SPIFFE client: %w", err) + } + sc.workloadConn = source } - return &SpiffeServerApiClient{ - serverConn: conn, - workloadConn: source, - entryClient: entryv1.NewEntryClient(conn), - config: c, - }, nil + if sc.serverConn == nil { + // Create connection + tlsConfig := tlsconfig.MTLSClientConfig(sc.workloadConn, sc.workloadConn, tlsconfig.AuthorizeAny()) + conn, err := grpc.DialContext(ctx, sc.config.ServerAddr, grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig))) + if err != nil { + sc.workloadConn.Close() + sc.workloadConn = nil + return fmt.Errorf("Unable to dial SPIRE server: %w", err) + } + sc.serverConn = conn + } + + sc.entryClient = entryv1.NewEntryClient(sc.serverConn) + + return nil +} + +func NewSpireServerApiClient(c spireconfig.SpireConfig) *SpireServerApiClient { + return &SpireServerApiClient{ + config: c, + } } -func (sc *SpiffeServerApiClient) CreateNodeEntry(ctx context.Context, nodeName string) error { +func (sc *SpireServerApiClient) NodeEntry(nodeName string) *spiffetypes.Entry { selectors := []*spiffetypes.Selector{ { Type: "k8s_psat", @@ -76,43 +93,20 @@ func (sc *SpiffeServerApiClient) CreateNodeEntry(ctx context.Context, nodeName s }, } - entries := []*spiffetypes.Entry{ - { - SpiffeId: &spiffetypes.SPIFFEID{ - TrustDomain: sc.config.TrustDomain, - Path: fmt.Sprintf("%v%v", sc.config.NodeAliasPrefix, nodeName), - }, - ParentId: &spiffetypes.SPIFFEID{ - TrustDomain: sc.config.TrustDomain, - Path: "/spire/server", - }, - Selectors: selectors, + return &spiffetypes.Entry{ + SpiffeId: &spiffetypes.SPIFFEID{ + TrustDomain: sc.config.TrustDomain, + Path: fmt.Sprintf("%v%v", sc.config.NodeAliasPrefix, nodeName), }, + ParentId: &spiffetypes.SPIFFEID{ + TrustDomain: sc.config.TrustDomain, + Path: "/spire/server", + }, + Selectors: selectors, } - - req := entryv1.BatchCreateEntryRequest{ - Entries: entries, - } - - resp, err := sc.entryClient.BatchCreateEntry(ctx, &req) - if err != nil { - return err - } - - if len(resp.Results) != 1 { - return fmt.Errorf("Batch create entry failed, malformed response expected 1 result") - } - - res := resp.Results[0] - if codes.Code(res.Status.Code) == codes.AlreadyExists || - codes.Code(res.Status.Code) == codes.OK { - return nil - } - - return fmt.Errorf("Batch create entry failed, code: %v", res.Status.Code) } -func (sc *SpiffeServerApiClient) CreateWorkloadEntry(ctx context.Context, tr *v1beta1.TaskRun, pod *corev1.Pod) error { +func (sc *SpireServerApiClient) WorkloadEntry(tr *v1beta1.TaskRun, pod *corev1.Pod, ttl int32) *spiffetypes.Entry { // Note: We can potentially add attestation on the container images as well since // the information is available here. selectors := []*spiffetypes.Selector{ @@ -126,18 +120,28 @@ func (sc *SpiffeServerApiClient) CreateWorkloadEntry(ctx context.Context, tr *v1 }, } - entries := []*spiffetypes.Entry{ - { - SpiffeId: &spiffetypes.SPIFFEID{ - TrustDomain: sc.config.TrustDomain, - Path: fmt.Sprintf("/ns/%v/taskrun/%v", tr.Namespace, tr.Name), - }, - ParentId: &spiffetypes.SPIFFEID{ - TrustDomain: sc.config.TrustDomain, - Path: fmt.Sprintf("%v%v", sc.config.NodeAliasPrefix, pod.Spec.NodeName), - }, - Selectors: selectors, + return &spiffetypes.Entry{ + SpiffeId: &spiffetypes.SPIFFEID{ + TrustDomain: sc.config.TrustDomain, + Path: fmt.Sprintf("/ns/%v/taskrun/%v", tr.Namespace, tr.Name), }, + ParentId: &spiffetypes.SPIFFEID{ + TrustDomain: sc.config.TrustDomain, + Path: fmt.Sprintf("%v%v", sc.config.NodeAliasPrefix, pod.Spec.NodeName), + }, + Selectors: selectors, + Ttl: ttl, + } +} + +func (sc *SpireServerApiClient) CreateEntries(ctx context.Context, tr *v1beta1.TaskRun, pod *corev1.Pod, ttl int) error { + err := sc.checkClient(ctx) + if err != nil { + return err + } + entries := []*spiffetypes.Entry{ + sc.NodeEntry(pod.Spec.NodeName), + sc.WorkloadEntry(tr, pod, int32(ttl)), } req := entryv1.BatchCreateEntryRequest{ @@ -149,20 +153,28 @@ func (sc *SpiffeServerApiClient) CreateWorkloadEntry(ctx context.Context, tr *v1 return err } - if len(resp.Results) != 1 { - return fmt.Errorf("Batch create entry failed, malformed response expected 1 result") + if len(resp.Results) != len(entries) { + return fmt.Errorf("Batch create entry failed, malformed response expected %v result", len(entries)) } - res := resp.Results[0] - if codes.Code(res.Status.Code) == codes.AlreadyExists || - codes.Code(res.Status.Code) == codes.OK { - return nil + var errPaths []string + var errCodes []int32 + + for _, r := range resp.Results { + if codes.Code(r.Status.Code) != codes.AlreadyExists && + codes.Code(r.Status.Code) != codes.OK { + errPaths = append(errPaths, r.Entry.SpiffeId.Path) + errCodes = append(errCodes, r.Status.Code) + } } - return fmt.Errorf("Batch create entry failed, code: %v", res.Status.Code) + if len(errPaths) != 0 { + return fmt.Errorf("Batch create entry failed for entries %+v with codes %+v", errPaths, errCodes) + } + return nil } -func (sc *SpiffeServerApiClient) Close() { +func (sc *SpireServerApiClient) Close() { err := sc.serverConn.Close() if err != nil { // Log error