Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ephemeral resource grpc wrappers #35784

Merged
merged 10 commits into from
Oct 1, 2024
13 changes: 6 additions & 7 deletions docs/plugin-protocol/tfplugin5.7.proto
Original file line number Diff line number Diff line change
Expand Up @@ -629,21 +629,21 @@ message OpenEphemeralResource {
message Request {
string type_name = 1;
DynamicValue config = 2;
ClientCapabilities client_capabilities = 3;
}
message Response {
repeated Diagnostic diagnostics = 1;
optional google.protobuf.Timestamp renew_at = 2;
bool is_closable = 3;
DynamicValue state = 4;
optional bytes private = 5;
DynamicValue result = 3;
optional bytes private = 4;
Deferred deferred = 5;
Comment on lines +637 to +639
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will match the provider side with the new field/naming once this gets merged 👍🏻

For the removal of is_closable, is the plan for Close to be called for all ephemeral resources regardless of if they were renewed/not renewed?

Additionally, will returned diagnostics from Close stop execution or will they behave more like the Stop RPC for providers and just log?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, Close would always be called. Making Close a defined part of the lifecycle regardless of whether it does anything or not makes the implementations much simpler, and since the plugin has to implement the calls anyway, it would have to at least be a noop regardless of whether it's called.

Copy link
Member Author

@jbardin jbardin Oct 1, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh yeah, and diagnostics from Close would only be able to pass on some info to the user, and make Terraform exit with an error. The Close has to happen after all dependencies have used the value, so if there were an error there are no more operations to stop execution of.

}
}

message RenewEphemeralResource {
message Request {
string type_name = 1;
DynamicValue state = 2;
optional bytes private = 4;
optional bytes private = 2;
}
message Response {
repeated Diagnostic diagnostics = 1;
Expand All @@ -655,8 +655,7 @@ message RenewEphemeralResource {
message CloseEphemeralResource {
message Request {
string type_name = 1;
DynamicValue state = 2;
optional bytes private = 3;
optional bytes private = 2;
}
message Response {
repeated Diagnostic diagnostics = 1;
Expand Down
13 changes: 6 additions & 7 deletions docs/plugin-protocol/tfplugin6.7.proto
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

drive-by comment 🚗: RenewEphemeralResource.Request and CloseEphemeralResource.Request still have state in this proto file

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks, I seemed to have somehow rebased that back in 😵

Original file line number Diff line number Diff line change
Expand Up @@ -609,21 +609,21 @@ message OpenEphemeralResource {
message Request {
string type_name = 1;
DynamicValue config = 2;
ClientCapabilities client_capabilities = 3;
}
message Response {
repeated Diagnostic diagnostics = 1;
optional google.protobuf.Timestamp renew_at = 2;
bool is_closable = 3;
DynamicValue state = 4;
optional bytes private = 5;
DynamicValue result = 3;
optional bytes private = 4;
Deferred deferred = 5;
}
}

message RenewEphemeralResource {
message Request {
string type_name = 1;
DynamicValue state = 2;
optional bytes private = 4;
optional bytes private = 2;
}
message Response {
repeated Diagnostic diagnostics = 1;
Expand All @@ -635,8 +635,7 @@ message RenewEphemeralResource {
message CloseEphemeralResource {
message Request {
string type_name = 1;
DynamicValue state = 2;
optional bytes private = 3;
optional bytes private = 2;
}
message Response {
repeated Diagnostic diagnostics = 1;
Expand Down
2 changes: 1 addition & 1 deletion internal/builtin/providers/terraform/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func (p *Provider) GetProviderSchema() providers.GetProviderSchemaResponse {
ResourceTypes: map[string]providers.Schema{
"terraform_data": dataStoreResourceSchema(),
},
EphemeralTypes: map[string]providers.Schema{},
EphemeralResourceTypes: map[string]providers.Schema{},
Functions: map[string]providers.FunctionDecl{
"encode_tfvars": {
Summary: "Produce a string representation of an object using the same syntax as for `.tfvars` files",
Expand Down
68 changes: 62 additions & 6 deletions internal/grpcwrap/provider6.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/zclconf/go-cty/cty/msgpack"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/hashicorp/terraform/internal/plugin6/convert"
"github.com/hashicorp/terraform/internal/providers"
Expand Down Expand Up @@ -39,9 +40,10 @@ func (p *provider6) GetMetadata(_ context.Context, req *tfplugin6.GetMetadata_Re

func (p *provider6) GetProviderSchema(_ context.Context, req *tfplugin6.GetProviderSchema_Request) (*tfplugin6.GetProviderSchema_Response, error) {
resp := &tfplugin6.GetProviderSchema_Response{
ResourceSchemas: make(map[string]*tfplugin6.Schema),
DataSourceSchemas: make(map[string]*tfplugin6.Schema),
Functions: make(map[string]*tfplugin6.Function),
ResourceSchemas: make(map[string]*tfplugin6.Schema),
DataSourceSchemas: make(map[string]*tfplugin6.Schema),
EphemeralResourceSchemas: make(map[string]*tfplugin6.Schema),
Functions: make(map[string]*tfplugin6.Function),
}

resp.Provider = &tfplugin6.Schema{
Expand Down Expand Up @@ -70,6 +72,12 @@ func (p *provider6) GetProviderSchema(_ context.Context, req *tfplugin6.GetProvi
Block: convert.ConfigSchemaToProto(dat.Block),
}
}
for typ, dat := range p.schema.EphemeralResourceTypes {
resp.EphemeralResourceSchemas[typ] = &tfplugin6.Schema{
Version: dat.Version,
Block: convert.ConfigSchemaToProto(dat.Block),
}
}
if decls, err := convert.FunctionDeclsToProto(p.schema.Functions); err == nil {
resp.Functions = decls
} else {
Expand Down Expand Up @@ -450,15 +458,63 @@ func (p *provider6) ReadDataSource(_ context.Context, req *tfplugin6.ReadDataSou
}

func (p *provider6) OpenEphemeralResource(_ context.Context, req *tfplugin6.OpenEphemeralResource_Request) (*tfplugin6.OpenEphemeralResource_Response, error) {
panic("unimplemented")
resp := &tfplugin6.OpenEphemeralResource_Response{}
ty := p.schema.EphemeralResourceTypes[req.TypeName].Block.ImpliedType()

configVal, err := decodeDynamicValue6(req.Config, ty)
if err != nil {
resp.Diagnostics = convert.AppendProtoDiag(resp.Diagnostics, err)
return resp, nil
}

openResp := p.provider.OpenEphemeralResource(providers.OpenEphemeralResourceRequest{
TypeName: req.TypeName,
Config: configVal,
})
resp.Diagnostics = convert.AppendProtoDiag(resp.Diagnostics, openResp.Diagnostics)
if openResp.Diagnostics.HasErrors() {
return resp, nil
}

resp.Result, err = encodeDynamicValue6(openResp.Result, ty)
if err != nil {
resp.Diagnostics = convert.AppendProtoDiag(resp.Diagnostics, err)
return resp, nil
}
resp.Private = openResp.Private
resp.RenewAt = timestamppb.New(openResp.RenewAt)

return resp, nil
}

func (p *provider6) RenewEphemeralResource(_ context.Context, req *tfplugin6.RenewEphemeralResource_Request) (*tfplugin6.RenewEphemeralResource_Response, error) {
panic("unimplemented")
resp := &tfplugin6.RenewEphemeralResource_Response{}
renewResp := p.provider.RenewEphemeralResource(providers.RenewEphemeralResourceRequest{
TypeName: req.TypeName,
Private: req.Private,
})
resp.Diagnostics = convert.AppendProtoDiag(resp.Diagnostics, renewResp.Diagnostics)
if renewResp.Diagnostics.HasErrors() {
return resp, nil
}

resp.Private = renewResp.Private
resp.RenewAt = timestamppb.New(renewResp.RenewAt)
return resp, nil
}

func (p *provider6) CloseEphemeralResource(_ context.Context, req *tfplugin6.CloseEphemeralResource_Request) (*tfplugin6.CloseEphemeralResource_Response, error) {
panic("unimplemented")
resp := &tfplugin6.CloseEphemeralResource_Response{}
closeResp := p.provider.CloseEphemeralResource(providers.CloseEphemeralResourceRequest{
TypeName: req.TypeName,
Private: req.Private,
})
resp.Diagnostics = convert.AppendProtoDiag(resp.Diagnostics, closeResp.Diagnostics)
if closeResp.Diagnostics.HasErrors() {
return resp, nil
}

return resp, nil
}

func (p *provider6) GetFunctions(context.Context, *tfplugin6.GetFunctions_Request) (*tfplugin6.GetFunctions_Response, error) {
Expand Down
4 changes: 4 additions & 0 deletions internal/plans/deferring/deferred.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,8 @@ func (d *Deferred) GetDeferredResourceInstanceValue(addr addrs.AbsResourceInstan
instancesMap = d.resourceInstancesDeferred
case addrs.DataResourceMode:
instancesMap = d.dataSourceInstancesDeferred
case addrs.EphemeralResourceMode:
return cty.NilVal, false
default:
panic(fmt.Sprintf("unexpected resource mode %q for %s", addr.Resource.Resource.Mode, addr))
}
Expand Down Expand Up @@ -238,6 +240,8 @@ func (d *Deferred) GetDeferredResourceInstances(addr addrs.AbsResource) map[addr
instancesMap = d.resourceInstancesDeferred
case addrs.DataResourceMode:
instancesMap = d.dataSourceInstancesDeferred
case addrs.EphemeralResourceMode:
return nil
default:
panic(fmt.Sprintf("unexpected resource mode %q for %s", addr.Resource.Mode, addr))
}
Expand Down
127 changes: 122 additions & 5 deletions internal/plugin/grpc_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ func (p *GRPCProvider) GetProviderSchema() providers.GetProviderSchemaResponse {

resp.ResourceTypes = make(map[string]providers.Schema)
resp.DataSources = make(map[string]providers.Schema)
resp.EphemeralResourceTypes = make(map[string]providers.Schema)

// Some providers may generate quite large schemas, and the internal default
// grpc response size limit is 4MB. 64MB should cover most any use case, and
Expand Down Expand Up @@ -143,6 +144,10 @@ func (p *GRPCProvider) GetProviderSchema() providers.GetProviderSchemaResponse {
resp.DataSources[name] = convert.ProtoToProviderSchema(data)
}

for name, ephem := range protoResp.EphemeralResourceSchemas {
resp.EphemeralResourceTypes[name] = convert.ProtoToProviderSchema(ephem)
}

if decls, err := convert.FunctionDeclsFromProto(protoResp.Functions); err == nil {
resp.Functions = decls
} else {
Expand Down Expand Up @@ -767,24 +772,136 @@ func (p *GRPCProvider) ReadDataSource(r providers.ReadDataSourceRequest) (resp p
return resp
}

func (p *GRPCProvider) ValidateEphemeralResourceConfig(req providers.ValidateEphemeralResourceConfigRequest) providers.ValidateEphemeralResourceConfigResponse {
func (p *GRPCProvider) ValidateEphemeralResourceConfig(r providers.ValidateEphemeralResourceConfigRequest) (resp providers.ValidateEphemeralResourceConfigResponse) {
logger.Trace("GRPCProvider: ValidateEphemeralResourceConfig")
panic("ephemeral resources not supported")

schema := p.GetProviderSchema()
if schema.Diagnostics.HasErrors() {
resp.Diagnostics = schema.Diagnostics
return resp
}

ephemSchema, ok := schema.EphemeralResourceTypes[r.TypeName]
if !ok {
resp.Diagnostics = resp.Diagnostics.Append(fmt.Errorf("unknown ephemeral resource %q", r.TypeName))
return resp
}

mp, err := msgpack.Marshal(r.Config, ephemSchema.Block.ImpliedType())
if err != nil {
resp.Diagnostics = resp.Diagnostics.Append(err)
return resp
}

protoReq := &proto.ValidateEphemeralResourceConfig_Request{
TypeName: r.TypeName,
Config: &proto.DynamicValue{Msgpack: mp},
}

protoResp, err := p.client.ValidateEphemeralResourceConfig(p.ctx, protoReq)
if err != nil {
resp.Diagnostics = resp.Diagnostics.Append(grpcErr(err))
return resp
}
resp.Diagnostics = resp.Diagnostics.Append(convert.ProtoToDiagnostics(protoResp.Diagnostics))
return resp
}

func (p *GRPCProvider) OpenEphemeralResource(r providers.OpenEphemeralResourceRequest) (resp providers.OpenEphemeralResourceResponse) {
logger.Trace("GRPCProvider: OpenEphemeralResource")
panic("ephemeral resources not supported")

schema := p.GetProviderSchema()
if schema.Diagnostics.HasErrors() {
resp.Diagnostics = schema.Diagnostics
return resp
}

ephemSchema, ok := schema.EphemeralResourceTypes[r.TypeName]
if !ok {
resp.Diagnostics = resp.Diagnostics.Append(fmt.Errorf("unknown ephemeral resource %q", r.TypeName))
return resp
}

config, err := msgpack.Marshal(r.Config, ephemSchema.Block.ImpliedType())
if err != nil {
resp.Diagnostics = resp.Diagnostics.Append(err)
return resp
}

protoReq := &proto.OpenEphemeralResource_Request{
TypeName: r.TypeName,
Config: &proto.DynamicValue{
Msgpack: config,
},
ClientCapabilities: &proto.ClientCapabilities{
DeferralAllowed: r.ClientCapabilities.DeferralAllowed,
},
}

protoResp, err := p.client.OpenEphemeralResource(p.ctx, protoReq)
if err != nil {
resp.Diagnostics = resp.Diagnostics.Append(grpcErr(err))
return resp
}
resp.Diagnostics = resp.Diagnostics.Append(convert.ProtoToDiagnostics(protoResp.Diagnostics))

state, err := decodeDynamicValue(protoResp.Result, ephemSchema.Block.ImpliedType())
if err != nil {
resp.Diagnostics = resp.Diagnostics.Append(err)
return resp
}

if protoResp.RenewAt != nil {
resp.RenewAt = protoResp.RenewAt.AsTime()
}

resp.Result = state
resp.Private = protoResp.Private
resp.Deferred = convert.ProtoToDeferred(protoResp.Deferred)

return resp
}

func (p *GRPCProvider) RenewEphemeralResource(r providers.RenewEphemeralResourceRequest) (resp providers.RenewEphemeralResourceResponse) {
logger.Trace("GRPCProvider: RenewEphemeralResource")
panic("ephemeral resources not supported")

protoReq := &proto.RenewEphemeralResource_Request{
TypeName: r.TypeName,
Private: r.Private,
}

protoResp, err := p.client.RenewEphemeralResource(p.ctx, protoReq)
if err != nil {
resp.Diagnostics = resp.Diagnostics.Append(grpcErr(err))
return resp
}
resp.Diagnostics = resp.Diagnostics.Append(convert.ProtoToDiagnostics(protoResp.Diagnostics))

if protoResp.RenewAt != nil {
resp.RenewAt = protoResp.RenewAt.AsTime()
}

resp.Private = protoResp.Private

return resp
}

func (p *GRPCProvider) CloseEphemeralResource(r providers.CloseEphemeralResourceRequest) (resp providers.CloseEphemeralResourceResponse) {
logger.Trace("GRPCProvider: CloseEphemeralResource")
panic("ephemeral resources not supported")

protoReq := &proto.CloseEphemeralResource_Request{
TypeName: r.TypeName,
Private: r.Private,
}

protoResp, err := p.client.CloseEphemeralResource(p.ctx, protoReq)
if err != nil {
resp.Diagnostics = resp.Diagnostics.Append(grpcErr(err))
return resp
}
resp.Diagnostics = resp.Diagnostics.Append(convert.ProtoToDiagnostics(protoResp.Diagnostics))

return resp
}

func (p *GRPCProvider) CallFunction(r providers.CallFunctionRequest) (resp providers.CallFunctionResponse) {
Expand Down
Loading
Loading