Skip to content

Commit

Permalink
Add several features/optimizations for SPIRE (tektoncd#3)
Browse files Browse the repository at this point in the history
* Record pod latency before SPIRE entry creation

Signed-off-by: Brandon Lum <[email protected]>

* SPIRE client connection caching

Signed-off-by: Brandon Lum <[email protected]>

* Optimize spire entry creation

Signed-off-by: Brandon Lum <[email protected]>

* Add TTL for workload entry based on taskrun timeout

Signed-off-by: Brandon Lum <[email protected]>
  • Loading branch information
lumjjb authored Feb 11, 2022
1 parent 53678cc commit 60a7b21
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 89 deletions.
3 changes: 2 additions & 1 deletion pkg/reconciler/taskrun/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/tektoncd/pipeline/pkg/pod"
cloudeventclient "github.com/tektoncd/pipeline/pkg/reconciler/events/cloudevent"
"github.com/tektoncd/pipeline/pkg/reconciler/volumeclaim"
"github.com/tektoncd/pipeline/pkg/spire"
"github.com/tektoncd/pipeline/pkg/taskrunmetrics"
"k8s.io/client-go/tools/cache"
kubeclient "knative.dev/pkg/client/injection/kube/client"
Expand Down Expand Up @@ -62,7 +63,7 @@ func NewController(opts *pipeline.Options, clock clock.Clock) func(context.Conte
KubeClientSet: kubeclientset,
PipelineClientSet: pipelineclientset,
Images: opts.Images,
SpireConfig: opts.SpireConfig,
SpireClient: spire.NewSpireServerApiClient(opts.SpireConfig),
Clock: clock,
taskRunLister: taskRunInformer.Lister(),
resourceLister: resourceInformer.Lister(),
Expand Down
24 changes: 8 additions & 16 deletions pkg/reconciler/taskrun/taskrun.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ import (
"github.com/tektoncd/pipeline/pkg/reconciler/taskrun/resources"
"github.com/tektoncd/pipeline/pkg/reconciler/volumeclaim"
"github.com/tektoncd/pipeline/pkg/spire"
spireconfig "github.com/tektoncd/pipeline/pkg/spire/config"
"github.com/tektoncd/pipeline/pkg/taskrunmetrics"
_ "github.com/tektoncd/pipeline/pkg/taskrunmetrics/fake" // Make sure the taskrunmetrics are setup
"github.com/tektoncd/pipeline/pkg/workspace"
Expand All @@ -70,7 +69,7 @@ type Reconciler struct {
KubeClientSet kubernetes.Interface
PipelineClientSet clientset.Interface
Images pipeline.Images
SpireConfig spireconfig.SpireConfig
SpireClient *spire.SpireServerApiClient
Clock clock.Clock

// listers index properties about resources
Expand Down Expand Up @@ -431,18 +430,14 @@ func (c *Reconciler) reconcile(ctx context.Context, tr *v1beta1.TaskRun, rtr *re
}

if podconvert.SidecarsReady(pod.Status) {
if err := c.metrics.RecordPodLatency(pod, tr); err != nil {
logger.Warnf("Failed to log the metrics : %v", err)
}

if config.FromContextOrDefaults(ctx).FeatureFlags.EnableSpire {
logger.Infof("Registering SPIRE entry: %v/%v", pod.Namespace, pod.Name)
spiffeclient, err := spire.NewSpiffeServerApiClient(ctx, c.SpireConfig)
if err != nil {
logger.Errorf("Failed to establish client with SPIRE server: %v", err)
return err
}
if err = spiffeclient.CreateNodeEntry(ctx, pod.Spec.NodeName); err != nil {
logger.Errorf("Failed to create node SPIFFE entry for node %v: %v", pod.Spec.NodeName, err)
return err
}
if err = spiffeclient.CreateWorkloadEntry(ctx, tr, pod); err != nil {
// TTL is in seconds
ttl := config.FromContextOrDefaults(ctx).Defaults.DefaultTimeoutMinutes * 60
if err = c.SpireClient.CreateEntries(ctx, tr, pod, ttl); err != nil {
logger.Errorf("Failed to create workload SPIFFE entry for taskrun %v: %v", tr.Name, err)
return err
}
Expand All @@ -453,9 +448,6 @@ func (c *Reconciler) reconcile(ctx context.Context, tr *v1beta1.TaskRun, rtr *re
return err
}

if err := c.metrics.RecordPodLatency(pod, tr); err != nil {
logger.Warnf("Failed to log the metrics : %v", err)
}
}

// Convert the Pod's status to the equivalent TaskRun Status.
Expand Down
156 changes: 84 additions & 72 deletions pkg/spire/spire.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,37 +34,54 @@ import (
"google.golang.org/grpc/credentials"
)

type SpiffeServerApiClient struct {
type SpireServerApiClient struct {
config spireconfig.SpireConfig
serverConn *grpc.ClientConn
workloadConn *workloadapi.X509Source
entryClient entryv1.EntryClient
config spireconfig.SpireConfig
}

func NewSpiffeServerApiClient(ctx context.Context, c spireconfig.SpireConfig) (*SpiffeServerApiClient, error) {
// Create X509Source
source, err := workloadapi.NewX509Source(ctx, workloadapi.WithClientOptions(workloadapi.WithAddr("unix://"+c.SocketPath)))
if err != nil {
return nil, fmt.Errorf("Unable to create X509Source for SPIFFE client: %w", err)
func (sc *SpireServerApiClient) checkClient(ctx context.Context) error {
if sc.entryClient == nil || sc.workloadConn == nil || sc.serverConn == nil {
return sc.dial(ctx)
}
return nil
}

// Create connection
tlsConfig := tlsconfig.MTLSClientConfig(source, source, tlsconfig.AuthorizeAny())
conn, err := grpc.DialContext(ctx, c.ServerAddr, grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)))
if err != nil {
source.Close()
return nil, fmt.Errorf("Unable to dial SPIRE server: %w", err)
func (sc *SpireServerApiClient) dial(ctx context.Context) error {
if sc.workloadConn == nil {
// Create X509Source
source, err := workloadapi.NewX509Source(ctx, workloadapi.WithClientOptions(workloadapi.WithAddr("unix://"+sc.config.SocketPath)))
if err != nil {
return fmt.Errorf("Unable to create X509Source for SPIFFE client: %w", err)
}
sc.workloadConn = source
}

return &SpiffeServerApiClient{
serverConn: conn,
workloadConn: source,
entryClient: entryv1.NewEntryClient(conn),
config: c,
}, nil
if sc.serverConn == nil {
// Create connection
tlsConfig := tlsconfig.MTLSClientConfig(sc.workloadConn, sc.workloadConn, tlsconfig.AuthorizeAny())
conn, err := grpc.DialContext(ctx, sc.config.ServerAddr, grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)))
if err != nil {
sc.workloadConn.Close()
sc.workloadConn = nil
return fmt.Errorf("Unable to dial SPIRE server: %w", err)
}
sc.serverConn = conn
}

sc.entryClient = entryv1.NewEntryClient(sc.serverConn)

return nil
}

func NewSpireServerApiClient(c spireconfig.SpireConfig) *SpireServerApiClient {
return &SpireServerApiClient{
config: c,
}
}

func (sc *SpiffeServerApiClient) CreateNodeEntry(ctx context.Context, nodeName string) error {
func (sc *SpireServerApiClient) NodeEntry(nodeName string) *spiffetypes.Entry {
selectors := []*spiffetypes.Selector{
{
Type: "k8s_psat",
Expand All @@ -76,43 +93,20 @@ func (sc *SpiffeServerApiClient) CreateNodeEntry(ctx context.Context, nodeName s
},
}

entries := []*spiffetypes.Entry{
{
SpiffeId: &spiffetypes.SPIFFEID{
TrustDomain: sc.config.TrustDomain,
Path: fmt.Sprintf("%v%v", sc.config.NodeAliasPrefix, nodeName),
},
ParentId: &spiffetypes.SPIFFEID{
TrustDomain: sc.config.TrustDomain,
Path: "/spire/server",
},
Selectors: selectors,
return &spiffetypes.Entry{
SpiffeId: &spiffetypes.SPIFFEID{
TrustDomain: sc.config.TrustDomain,
Path: fmt.Sprintf("%v%v", sc.config.NodeAliasPrefix, nodeName),
},
ParentId: &spiffetypes.SPIFFEID{
TrustDomain: sc.config.TrustDomain,
Path: "/spire/server",
},
Selectors: selectors,
}

req := entryv1.BatchCreateEntryRequest{
Entries: entries,
}

resp, err := sc.entryClient.BatchCreateEntry(ctx, &req)
if err != nil {
return err
}

if len(resp.Results) != 1 {
return fmt.Errorf("Batch create entry failed, malformed response expected 1 result")
}

res := resp.Results[0]
if codes.Code(res.Status.Code) == codes.AlreadyExists ||
codes.Code(res.Status.Code) == codes.OK {
return nil
}

return fmt.Errorf("Batch create entry failed, code: %v", res.Status.Code)
}

func (sc *SpiffeServerApiClient) CreateWorkloadEntry(ctx context.Context, tr *v1beta1.TaskRun, pod *corev1.Pod) error {
func (sc *SpireServerApiClient) WorkloadEntry(tr *v1beta1.TaskRun, pod *corev1.Pod, ttl int32) *spiffetypes.Entry {
// Note: We can potentially add attestation on the container images as well since
// the information is available here.
selectors := []*spiffetypes.Selector{
Expand All @@ -126,18 +120,28 @@ func (sc *SpiffeServerApiClient) CreateWorkloadEntry(ctx context.Context, tr *v1
},
}

entries := []*spiffetypes.Entry{
{
SpiffeId: &spiffetypes.SPIFFEID{
TrustDomain: sc.config.TrustDomain,
Path: fmt.Sprintf("/ns/%v/taskrun/%v", tr.Namespace, tr.Name),
},
ParentId: &spiffetypes.SPIFFEID{
TrustDomain: sc.config.TrustDomain,
Path: fmt.Sprintf("%v%v", sc.config.NodeAliasPrefix, pod.Spec.NodeName),
},
Selectors: selectors,
return &spiffetypes.Entry{
SpiffeId: &spiffetypes.SPIFFEID{
TrustDomain: sc.config.TrustDomain,
Path: fmt.Sprintf("/ns/%v/taskrun/%v", tr.Namespace, tr.Name),
},
ParentId: &spiffetypes.SPIFFEID{
TrustDomain: sc.config.TrustDomain,
Path: fmt.Sprintf("%v%v", sc.config.NodeAliasPrefix, pod.Spec.NodeName),
},
Selectors: selectors,
Ttl: ttl,
}
}

func (sc *SpireServerApiClient) CreateEntries(ctx context.Context, tr *v1beta1.TaskRun, pod *corev1.Pod, ttl int) error {
err := sc.checkClient(ctx)
if err != nil {
return err
}
entries := []*spiffetypes.Entry{
sc.NodeEntry(pod.Spec.NodeName),
sc.WorkloadEntry(tr, pod, int32(ttl)),
}

req := entryv1.BatchCreateEntryRequest{
Expand All @@ -149,20 +153,28 @@ func (sc *SpiffeServerApiClient) CreateWorkloadEntry(ctx context.Context, tr *v1
return err
}

if len(resp.Results) != 1 {
return fmt.Errorf("Batch create entry failed, malformed response expected 1 result")
if len(resp.Results) != len(entries) {
return fmt.Errorf("Batch create entry failed, malformed response expected %v result", len(entries))
}

res := resp.Results[0]
if codes.Code(res.Status.Code) == codes.AlreadyExists ||
codes.Code(res.Status.Code) == codes.OK {
return nil
var errPaths []string
var errCodes []int32

for _, r := range resp.Results {
if codes.Code(r.Status.Code) != codes.AlreadyExists &&
codes.Code(r.Status.Code) != codes.OK {
errPaths = append(errPaths, r.Entry.SpiffeId.Path)
errCodes = append(errCodes, r.Status.Code)
}
}

return fmt.Errorf("Batch create entry failed, code: %v", res.Status.Code)
if len(errPaths) != 0 {
return fmt.Errorf("Batch create entry failed for entries %+v with codes %+v", errPaths, errCodes)
}
return nil
}

func (sc *SpiffeServerApiClient) Close() {
func (sc *SpireServerApiClient) Close() {
err := sc.serverConn.Close()
if err != nil {
// Log error
Expand Down

0 comments on commit 60a7b21

Please sign in to comment.