
Commit f1c5ffe: fix code

Signed-off-by: Jorge Turrado <[email protected]>
JorTurFer committed May 26, 2024
1 parent 6ade487 commit f1c5ffe
Showing 19 changed files with 31 additions and 62 deletions.
2 changes: 1 addition & 1 deletion pkg/eventemitter/azure_event_grid_topic_handler.go
@@ -58,7 +58,7 @@ func NewAzureEventGridTopicHandler(context context.Context, clusterName string,
}
client, err = publisher.NewClientWithSharedKeyCredential(spec.Endpoint, azcore.NewKeyCredential(authParams["accessKey"]), nil)
case kedav1alpha1.PodIdentityProviderAzureWorkload:
creds, chainedErr := azure.NewChainedCredential(logger, podIdentity.GetIdentityID(), podIdentity.GetIdentityTenantID(), podIdentity.Provider)
creds, chainedErr := azure.NewChainedCredential(logger, podIdentity)
if chainedErr != nil {
err = chainedErr
break
2 changes: 1 addition & 1 deletion pkg/metricsservice/api/metrics.pb.go

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion pkg/metricsservice/api/metrics_grpc.pb.go

Some generated files are not rendered by default.

4 changes: 2 additions & 2 deletions pkg/scalers/azure/azure_azidentity_chain.go
@@ -26,7 +26,7 @@ func NewChainedCredential(logger logr.Logger, podIdentity v1alpha1.AuthPodIdenti
}
}

switch podIdentity {
switch podIdentity.Provider {
case v1alpha1.PodIdentityProviderAzureWorkload:
wiCred, err := NewADWorkloadIdentityCredential(podIdentity.GetIdentityID(), podIdentity.GetIdentityTenantID())
if err != nil {
@@ -36,7 +36,7 @@ func NewChainedCredential(logger logr.Logger, podIdentity v1alpha1.AuthPodIdenti
creds = append(creds, wiCred)
}
default:
return nil, fmt.Errorf("pod identity %s not supported for azure credentials chain", podIdentity)
return nil, fmt.Errorf("pod identity %s not supported for azure credentials chain", podIdentity.Provider)
}

// Create the chained credential based on the previous 3
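
With this change, NewChainedCredential receives the whole AuthPodIdentity object and reads the provider, identity ID, and tenant ID from it itself, so call sites no longer unpack those fields. A minimal sketch of the new call shape under that assumption; the helper below is illustrative only and not part of the commit:

```go
package example

import (
	"github.com/go-logr/logr"

	"github.com/Azure/azure-sdk-for-go/sdk/azcore"

	kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
	"github.com/kedacore/keda/v2/pkg/scalers/azure"
)

// credentialForWorkloadIdentity is a hypothetical helper showing the new call
// shape: the AuthPodIdentity struct is passed through as-is instead of being
// unpacked into identity ID, tenant ID, and provider arguments.
func credentialForWorkloadIdentity(logger logr.Logger, podIdentity kedav1alpha1.AuthPodIdentity) (azcore.TokenCredential, error) {
	return azure.NewChainedCredential(logger, podIdentity)
}
```

Passing the struct intact also leaves room for additional identity fields (such as the authority host hinted at in the Key Vault TODO further down) to reach the credential chain without touching every caller again.
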
3 changes: 1 addition & 2 deletions pkg/scalers/azure/azure_eventhub.go
@@ -38,8 +38,7 @@ func GetEventHubClient(info EventHubInfo, logger logr.Logger) (*azeventhubs.Prod
return nil, fmt.Errorf("failed to create hub client: %w", err)
}
return hub, nil
// config.PodIdentity.GetIdentityID(), config.PodIdentity.GetIdentityTenantID()
case kedav1alpha1.PodIdentityProviderAzure, kedav1alpha1.PodIdentityProviderAzureWorkload:
case kedav1alpha1.PodIdentityProviderAzureWorkload:
creds, chainedErr := NewChainedCredential(logger, info.PodIdentity)
if chainedErr != nil {
return nil, chainedErr
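
This hunk also drops the case for the legacy azure pod-identity provider, leaving only azure-workload on the credential path; the same one-line removal repeats below in the storage, Log Analytics, Monitor, and Key Vault files. A reduced sketch of the resulting provider switch, with the client construction of the real scalers elided (the helper name is invented for illustration):

```go
package example

import (
	"fmt"

	"github.com/go-logr/logr"

	"github.com/Azure/azure-sdk-for-go/sdk/azcore"

	kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
	"github.com/kedacore/keda/v2/pkg/scalers/azure"
)

// tokenCredentialFor sketches the post-commit provider switch: connection
// strings still bypass pod identity, workload identity goes through the
// credential chain, and every other provider is rejected.
func tokenCredentialFor(logger logr.Logger, podIdentity kedav1alpha1.AuthPodIdentity) (azcore.TokenCredential, error) {
	switch podIdentity.Provider {
	case "", kedav1alpha1.PodIdentityProviderNone:
		// The real scalers build a connection-string client on this path and
		// never need a token credential.
		return nil, nil
	case kedav1alpha1.PodIdentityProviderAzureWorkload:
		return azure.NewChainedCredential(logger, podIdentity)
	default:
		return nil, fmt.Errorf("pod identity %s not supported", podIdentity.Provider)
	}
}
```
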
8 changes: 0 additions & 8 deletions pkg/scalers/azure/azure_eventhub_checkpoint.go
@@ -258,7 +258,6 @@ func getCheckpointFromStorageMetadata(get *azblob.DownloadStreamResponse, partit
metadata := get.Metadata

var sequencenumber *string
var offset *string
ok := false
if sequencenumber, ok = metadata["sequencenumber"]; !ok {
if sequencenumber, ok = metadata["Sequencenumber"]; !ok {
@@ -271,13 +270,6 @@ func getCheckpointFromStorageMetadata(get *azblob.DownloadStreamResponse, partit
} else {
return Checkpoint{}, fmt.Errorf("sequencenumber is not a valid int64 value: %w", err)
}

if offset, ok = metadata["offset"]; !ok {
if offset, ok = metadata["Offset"]; !ok {
return Checkpoint{}, fmt.Errorf("offset on blob not found")
}
}
checkpoint.Offset = *offset
return checkpoint, nil
}

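
After this removal, only the sequence number is read from the blob metadata; the offset lookup and the checkpoint.Offset assignment are gone. A self-contained sketch of the surviving lookup logic, assuming the same case-insensitive key handling as the surrounding function (the helper itself is not in the repository):

```go
package example

import (
	"fmt"
	"strconv"
)

// sequenceNumberFromMetadata mirrors, in isolation, the remaining metadata
// handling: look up "sequencenumber" (or "Sequencenumber") and parse it as an
// int64; there is no offset key to resolve any more.
func sequenceNumberFromMetadata(metadata map[string]*string) (int64, error) {
	value, ok := metadata["sequencenumber"]
	if !ok {
		if value, ok = metadata["Sequencenumber"]; !ok {
			return 0, fmt.Errorf("sequencenumber on blob not found")
		}
	}
	n, err := strconv.ParseInt(*value, 10, 64)
	if err != nil {
		return 0, fmt.Errorf("sequencenumber is not a valid int64 value: %w", err)
	}
	return n, nil
}
```
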
30 changes: 4 additions & 26 deletions pkg/scalers/azure/azure_eventhub_test.go
@@ -33,7 +33,7 @@ func TestCheckpointFromBlobStorageAzureFunction(t *testing.T) {

containerName := "azure-webjobs-eventhub"
checkpointFormat := "{\"Offset\":\"%s\",\"SequenceNumber\":%d,\"PartitionId\":\"%s\",\"Owner\":\"\",\"Token\":\"\",\"Epoch\":0}"
checkpoint := fmt.Sprintf(checkpointFormat, offset, sequencenumber, partitionID)
checkpoint := fmt.Sprintf(checkpointFormat, sequencenumber, partitionID)
urlPath := fmt.Sprintf("eventhubnamespace.servicebus.windows.net/hub/%s/%s", consumerGroup, partitionID)
eventHubInfo := EventHubInfo{
EventHubConnection: "Endpoint=sb://eventhubnamespace.servicebus.windows.net/;EntityPath=hub",
@@ -54,8 +54,6 @@ func TestCheckpointFromBlobStorageAzureFunction(t *testing.T) {
}

check, _ := GetCheckpointFromBlobStorage(ctx, client, eventHubInfo, "0")
_ = check.Offset
_ = expectedCheckpoint.Offset
assert.Equal(t, check, expectedCheckpoint)
}

@@ -72,7 +70,7 @@ func TestCheckpointFromBlobStorageDefault(t *testing.T) {

containerName := "defaultcontainer"
checkpointFormat := "{\"Offset\":\"%s\",\"SequenceNumber\":%d,\"PartitionId\":\"%s\",\"Owner\":\"\",\"Token\":\"\",\"Epoch\":0}"
checkpoint := fmt.Sprintf(checkpointFormat, offset, sequencenumber, partitionID)
checkpoint := fmt.Sprintf(checkpointFormat, sequencenumber, partitionID)
urlPath := fmt.Sprintf("%s/%s", consumerGroup, partitionID)

eventHubInfo := EventHubInfo{
@@ -94,8 +92,6 @@ func TestCheckpointFromBlobStorageDefault(t *testing.T) {
}

check, _ := GetCheckpointFromBlobStorage(ctx, client, eventHubInfo, partitionID)
_ = check.Offset
_ = expectedCheckpoint.Offset
assert.Equal(t, check, expectedCheckpoint)
}

@@ -112,7 +108,7 @@ func TestCheckpointFromBlobStorageDefaultDeprecatedPythonCheckpoint(t *testing.T

containerName := "defaultcontainerpython"
checkpointFormat := "{\"Offset\":\"%s\",\"sequence_number\":%d,\"partition_id\":\"%s\",\"Owner\":\"\",\"Token\":\"\",\"Epoch\":0}"
checkpoint := fmt.Sprintf(checkpointFormat, offset, sequencenumber, partitionID)
checkpoint := fmt.Sprintf(checkpointFormat, sequencenumber, partitionID)
urlPath := fmt.Sprintf("%s/%s", consumerGroup, partitionID)

eventHubInfo := EventHubInfo{
@@ -130,16 +126,11 @@ func TestCheckpointFromBlobStorageDefaultDeprecatedPythonCheckpoint(t *testing.T
assert.NoError(t, err, "error creating checkoiunt")

expectedCheckpoint := Checkpoint{
baseCheckpoint: baseCheckpoint{
Offset: offset,
},
PartitionID: partitionID,
SequenceNumber: sequencenumber,
}

check, _ := GetCheckpointFromBlobStorage(ctx, client, eventHubInfo, partitionID)
_ = check.Offset
_ = expectedCheckpoint.Offset
assert.Equal(t, check, expectedCheckpoint)
}

@@ -155,7 +146,6 @@ func TestCheckpointFromBlobStorageWithBlobMetadata(t *testing.T) {
sequencenumber := int64(1)
sequencenumberString := strconv.FormatInt(sequencenumber, 10)
metadata := map[string]*string{
"offset": &offset,
"sequencenumber": &sequencenumberString,
}

@@ -178,16 +168,11 @@ func TestCheckpointFromBlobStorageWithBlobMetadata(t *testing.T) {
assert.NoError(t, err, "error creating checkoiunt")

expectedCheckpoint := Checkpoint{
baseCheckpoint: baseCheckpoint{
Offset: offset,
},
PartitionID: partitionID,
SequenceNumber: sequencenumber,
}

check, _ := GetCheckpointFromBlobStorage(ctx, client, eventHubInfo, partitionID)
_ = check.Offset
_ = expectedCheckpoint.Offset
assert.Equal(t, check, expectedCheckpoint)
}

@@ -222,16 +207,11 @@ func TestCheckpointFromBlobStorageGoSdk(t *testing.T) {
assert.NoError(t, err, "error creating checkoiunt")

expectedCheckpoint := Checkpoint{
baseCheckpoint: baseCheckpoint{
Offset: offset,
},
PartitionID: partitionID,
SequenceNumber: sequencenumber,
}

check, _ := GetCheckpointFromBlobStorage(ctx, client, eventHubInfo, partitionID)
_ = check.Offset
_ = expectedCheckpoint.Offset
assert.Equal(t, check, expectedCheckpoint)
}

@@ -249,7 +229,7 @@ func TestCheckpointFromBlobStorageDapr(t *testing.T) {

containerName := "dapr-container"
checkpointFormat := "{\"partitionID\":\"%s\",\"epoch\":0,\"owner\":\"\",\"checkpoint\":{\"offset\":\"%s\",\"sequenceNumber\":%d,\"enqueueTime\":\"\"},\"state\":\"\",\"token\":\"\"}"
checkpoint := fmt.Sprintf(checkpointFormat, partitionID, offset, sequencenumber)
checkpoint := fmt.Sprintf(checkpointFormat, partitionID, sequencenumber)

urlPath := fmt.Sprintf("dapr-%s-%s-%s", eventhubName, consumerGroup, partitionID)

@@ -274,8 +254,6 @@ func TestCheckpointFromBlobStorageDapr(t *testing.T) {
}

check, _ := GetCheckpointFromBlobStorage(ctx, client, eventHubInfo, partitionID)
_ = check.Offset
_ = expectedCheckpoint.Offset
assert.Equal(t, check, expectedCheckpoint)
}

4 changes: 2 additions & 2 deletions pkg/scalers/azure/azure_storage.go
@@ -98,7 +98,7 @@ func GetStorageBlobClient(logger logr.Logger, podIdentity kedav1alpha1.AuthPodId
}
return blobClient, nil
// config.PodIdentity.GetIdentityID(), config.PodIdentity.GetIdentityTenantID()
case kedav1alpha1.PodIdentityProviderAzure, kedav1alpha1.PodIdentityProviderAzureWorkload:
case kedav1alpha1.PodIdentityProviderAzureWorkload:
creds, chainedErr := NewChainedCredential(logger, podIdentity)
if chainedErr != nil {
return nil, chainedErr
@@ -125,7 +125,7 @@ func GetStorageQueueClient(logger logr.Logger, podIdentity kedav1alpha1.AuthPodI
return nil, fmt.Errorf("failed to create hub client: %w", err)
}
return queueClient, nil
case kedav1alpha1.PodIdentityProviderAzure, kedav1alpha1.PodIdentityProviderAzureWorkload:
case kedav1alpha1.PodIdentityProviderAzureWorkload:
creds, chainedErr := NewChainedCredential(logger, podIdentity)
if chainedErr != nil {
return nil, chainedErr
2 changes: 1 addition & 1 deletion pkg/scalers/azure_eventhub_scaler.go
@@ -269,7 +269,7 @@ func parseAzureEventHubAuthenticationMetadata(logger logr.Logger, config *scaler
// GetUnprocessedEventCountInPartition gets number of unprocessed events in a given partition
func (s *azureEventHubScaler) GetUnprocessedEventCountInPartition(ctx context.Context, partitionInfo azeventhubs.PartitionProperties) (newEventCount int64, checkpoint azure.Checkpoint, err error) {
// if partitionInfo.LastEnqueuedSequenceNumber = -1, that means event hub partition is empty
if partitionInfo == nil || partitionInfo.LastEnqueuedSequenceNumber == -1 {
if partitionInfo.LastEnqueuedSequenceNumber == -1 {
return 0, azure.Checkpoint{}, nil
}

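
The dropped partitionInfo == nil comparison is the likely compile fix in this file: azeventhubs.PartitionProperties is passed by value, so it can never be nil and the comparison itself is invalid Go. The empty-partition case is therefore detected solely through the -1 sentinel, as in this small sketch (the helper name is invented):

```go
package example

import "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"

// partitionIsEmpty shows the surviving guard: PartitionProperties is a plain
// struct value, so the only signal for a partition that has never received an
// event is the sentinel LastEnqueuedSequenceNumber of -1.
func partitionIsEmpty(p azeventhubs.PartitionProperties) bool {
	return p.LastEnqueuedSequenceNumber == -1
}
```
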
18 changes: 9 additions & 9 deletions pkg/scalers/azure_eventhub_scaler_test.go
@@ -205,50 +205,50 @@ var parseEventHubMetadataDatasetWithPodIdentity = []parseEventHubMetadataTestDat

var calculateUnprocessedEventsDataset = []calculateUnprocessedEventsTestData{
{
checkpoint: azure.NewCheckpoint("1", 5),
checkpoint: azure.NewCheckpoint(5),
partitionInfo: azeventhubs.PartitionProperties{LastEnqueuedSequenceNumber: 10, LastEnqueuedOffset: 2},
unprocessedEvents: 5,
},
{
checkpoint: azure.NewCheckpoint("1002", 4611686018427387903),
checkpoint: azure.NewCheckpoint(4611686018427387903),
partitionInfo: azeventhubs.PartitionProperties{LastEnqueuedSequenceNumber: 4611686018427387905, LastEnqueuedOffset: 1000},
unprocessedEvents: 2,
},
{
checkpoint: azure.NewCheckpoint("900", 4611686018427387900),
checkpoint: azure.NewCheckpoint(4611686018427387900),
partitionInfo: azeventhubs.PartitionProperties{LastEnqueuedSequenceNumber: 4611686018427387905, LastEnqueuedOffset: 1000},
unprocessedEvents: 5,
},
{
checkpoint: azure.NewCheckpoint("800", 4000000000000200000),
checkpoint: azure.NewCheckpoint(4000000000000200000),
partitionInfo: azeventhubs.PartitionProperties{LastEnqueuedSequenceNumber: 4000000000000000000, LastEnqueuedOffset: 750},
unprocessedEvents: 9223372036854575807,
},
// Empty checkpoint
{
checkpoint: azure.NewCheckpoint("", 0),
checkpoint: azure.NewCheckpoint(0),
partitionInfo: azeventhubs.PartitionProperties{LastEnqueuedSequenceNumber: 1, LastEnqueuedOffset: 1},
unprocessedEvents: 2,
},
// Stale PartitionInfo
{
checkpoint: azure.NewCheckpoint("5", 15),
checkpoint: azure.NewCheckpoint(15),
partitionInfo: azeventhubs.PartitionProperties{LastEnqueuedSequenceNumber: 10, LastEnqueuedOffset: 2},
unprocessedEvents: 0,
},
{
checkpoint: azure.NewCheckpoint("1000", 4611686018427387910),
checkpoint: azure.NewCheckpoint(4611686018427387910),
partitionInfo: azeventhubs.PartitionProperties{LastEnqueuedSequenceNumber: 4611686018427387905, LastEnqueuedOffset: 900},
unprocessedEvents: 0,
},
{
checkpoint: azure.NewCheckpoint("1", 5),
checkpoint: azure.NewCheckpoint(5),
partitionInfo: azeventhubs.PartitionProperties{LastEnqueuedSequenceNumber: 9223372036854775797, LastEnqueuedOffset: 10000},
unprocessedEvents: 0,
},
// Circular buffer reset
{
checkpoint: azure.NewCheckpoint("100000", 9223372036854775797),
checkpoint: azure.NewCheckpoint(9223372036854775797),
partitionInfo: azeventhubs.PartitionProperties{LastEnqueuedSequenceNumber: 5, LastEnqueuedOffset: 1},
unprocessedEvents: 15,
},
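
The test table now builds checkpoints with a single argument, which implies NewCheckpoint lost its string offset parameter. The constructor's definition is not part of this diff, so the following is only an assumed sketch of its post-commit shape, with a local stand-in for the Checkpoint type:

```go
package example

// Checkpoint is a local stand-in listing only the fields the updated tests
// exercise; the real type lives in pkg/scalers/azure.
type Checkpoint struct {
	PartitionID    string
	SequenceNumber int64
}

// NewCheckpoint sketches the assumed reduced constructor: the offset argument
// used by the old test data ("1", "1002", ...) is gone.
func NewCheckpoint(sequenceNumber int64) Checkpoint {
	return Checkpoint{SequenceNumber: sequenceNumber}
}
```
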
2 changes: 1 addition & 1 deletion pkg/scalers/azure_log_analytics_scaler.go
@@ -95,7 +95,7 @@ func CreateAzureLogsClient(config *scalersconfig.ScalerConfig, meta *azureLogAna
switch config.PodIdentity.Provider {
case "", kedav1alpha1.PodIdentityProviderNone:
creds, err = azidentity.NewClientSecretCredential(meta.tenantID, meta.clientID, meta.clientSecret, nil)
case kedav1alpha1.PodIdentityProviderAzure, kedav1alpha1.PodIdentityProviderAzureWorkload:
case kedav1alpha1.PodIdentityProviderAzureWorkload:
creds, err = azure.NewChainedCredential(logger, config.PodIdentity)
default:
return nil, fmt.Errorf("azure monitor does not support pod identity provider - %s", config.PodIdentity.Provider)
2 changes: 1 addition & 1 deletion pkg/scalers/azure_monitor_scaler.go
@@ -120,7 +120,7 @@ func CreateAzureMetricsClient(config *scalersconfig.ScalerConfig, meta *azureMon
switch config.PodIdentity.Provider {
case "", kedav1alpha1.PodIdentityProviderNone:
creds, err = azidentity.NewClientSecretCredential(meta.azureMonitorInfo.TenantID, meta.azureMonitorInfo.ClientID, meta.azureMonitorInfo.ClientPassword, nil)
case kedav1alpha1.PodIdentityProviderAzure, kedav1alpha1.PodIdentityProviderAzureWorkload:
case kedav1alpha1.PodIdentityProviderAzureWorkload:
creds, err = azure.NewChainedCredential(logger, config.PodIdentity)
default:
return nil, fmt.Errorf("azure monitor does not support pod identity provider - %s", config.PodIdentity.Provider)
2 changes: 1 addition & 1 deletion pkg/scalers/azure_pipelines_scaler.go
@@ -194,7 +194,7 @@ func getAuthMethod(logger logr.Logger, config *scalersconfig.ScalerConfig) (stri
case "", kedav1alpha1.PodIdentityProviderNone:
return "", nil, kedav1alpha1.AuthPodIdentity{}, fmt.Errorf("no personalAccessToken given or PodIdentity provider configured")
case kedav1alpha1.PodIdentityProviderAzureWorkload:
cred, err := azure.NewChainedCredential(logger, config.PodIdentity.GetIdentityID(), config.PodIdentity.GetIdentityTenantID(), config.PodIdentity.Provider)
cred, err := azure.NewChainedCredential(logger, config.PodIdentity)
if err != nil {
return "", nil, kedav1alpha1.AuthPodIdentity{}, err
}
2 changes: 1 addition & 1 deletion pkg/scalers/azure_servicebus_scaler.go
@@ -308,7 +308,7 @@ func (s *azureServiceBusScaler) getServiceBusAdminClient() (*admin.Client, error
case "", kedav1alpha1.PodIdentityProviderNone:
client, err = admin.NewClientFromConnectionString(s.metadata.connection, opts)
case kedav1alpha1.PodIdentityProviderAzureWorkload:
creds, chainedErr := azure.NewChainedCredential(s.logger, s.podIdentity.GetIdentityID(), s.podIdentity.GetIdentityTenantID(), s.podIdentity.Provider)
creds, chainedErr := azure.NewChainedCredential(s.logger, s.podIdentity)
if chainedErr != nil {
return nil, chainedErr
}
2 changes: 1 addition & 1 deletion pkg/scalers/externalscaler/externalscaler.pb.go

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion pkg/scalers/externalscaler/externalscaler_grpc.pb.go

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion pkg/scalers/liiklus/LiiklusService.pb.go

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion pkg/scalers/liiklus/LiiklusService_grpc.pb.go

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion pkg/scaling/resolver/azure_keyvault_handler.go
@@ -91,7 +91,7 @@ func (vh *AzureKeyVaultHandler) getCredentials(ctx context.Context, client clien
}
//TODO (review jorturfer): podIdentity.GetIdentityTenantID(), podIdentity.GetIdentityAuthorityHost()
return azidentity.NewClientSecretCredential(tenantID, clientID, clientSecret, nil)
case kedav1alpha1.PodIdentityProviderAzure, kedav1alpha1.PodIdentityProviderAzureWorkload:
case kedav1alpha1.PodIdentityProviderAzureWorkload:
return azure.NewChainedCredential(logger, *podIdentity)
default:
return nil, fmt.Errorf("key vault does not support pod identity provider - %s", podIdentity.Provider)
