Skip to content

Commit

Permalink
update not to create consumer groups for EventHub Kafka
Browse files Browse the repository at this point in the history
  • Loading branch information
kjcho-msft committed May 18, 2019
1 parent 597ccd4 commit b00905a
Show file tree
Hide file tree
Showing 8 changed files with 64 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public override async Task<string> Process(FlowDeploymentSession flowToDeploy)

// Create consumer group only if the resource creation flag is set. This is to support scenario where EventHub
// is in a different subscription than where services are deployed
if (Configuration[Constants.ConfigSettingName_ResourceCreation].ToLower(CultureInfo.InvariantCulture) == "true")
if (Configuration[Constants.ConfigSettingName_ResourceCreation].ToLower(CultureInfo.InvariantCulture) == "true" && (inputType == Constants.InputType_EventHub || inputType == Constants.InputType_IoTHub))
{
var serviceKeyVaultName = Configuration[Constants.ConfigSettingName_ServiceKeyVaultName];
Ensure.NotNull(serviceKeyVaultName, "serviceKeyVaultName");
Expand All @@ -126,49 +126,46 @@ public override async Task<string> Process(FlowDeploymentSession flowToDeploy)
var inputSubscriptionId = string.IsNullOrEmpty(config?.Properties?.InputSubscriptionId) ? await KeyVaultClient.ResolveSecretUriAsync(flowToDeploy.GetTokenString(TokenName_InputEventHubSubscriptionId)) : await KeyVaultClient.ResolveSecretUriAsync(config?.Properties?.InputSubscriptionId);
var inputResourceGroupName = string.IsNullOrEmpty(config?.Properties?.InputResourceGroup) ? flowToDeploy.GetTokenString(TokenName_InputEventHubResourceGroupName) : await KeyVaultClient.ResolveSecretUriAsync(config?.Properties?.InputResourceGroup);

if (inputType != Constants.InputType_Kafka)
Result result = null;
switch (inputType)
{
Result result = null;
switch (inputType)
{
case Constants.InputType_EventHub:
case Constants.InputType_KafkaEventHub:
//Check for required parameters
if (string.IsNullOrEmpty(hubInfo.Namespace) || string.IsNullOrEmpty(hubInfo.Name))
{
throw new ConfigGenerationException("Could not parse Event Hub connection string; please check input.");
}
result = await EventHubUtil.CreateEventHubConsumerGroups(
clientId: clientId,
tenantId: tenantId,
secretKey: resolvedSecretKey,
subscriptionId: inputSubscriptionId,
resourceGroupName: inputResourceGroupName,
hubNamespace: hubInfo.Namespace,
hubNames: hubInfo.Name,
consumerGroupName: consumerGroupName);
break;
case Constants.InputType_IoTHub:
//Check for required parameters
if (string.IsNullOrEmpty(hubInfo.Name))
{
throw new ConfigGenerationException("Could not parse IoT Hub connection string; please check input.");
}
result = await EventHubUtil.CreateIotHubConsumerGroup(
clientId: clientId,
tenantId: tenantId,
secretKey: resolvedSecretKey,
subscriptionId: inputSubscriptionId,
resourceGroupName: inputResourceGroupName,
hubName: hubInfo.Name,
consumerGroupName: consumerGroupName);
break;
default:
throw new ConfigGenerationException($"unexpected inputtype '{inputType}'.");
}

Ensure.IsSuccessResult(result);
case Constants.InputType_EventHub:
case Constants.InputType_KafkaEventHub:
//Check for required parameters
if (string.IsNullOrEmpty(hubInfo.Namespace) || string.IsNullOrEmpty(hubInfo.Name))
{
throw new ConfigGenerationException("Could not parse Event Hub connection string; please check input.");
}
result = await EventHubUtil.CreateEventHubConsumerGroups(
clientId: clientId,
tenantId: tenantId,
secretKey: resolvedSecretKey,
subscriptionId: inputSubscriptionId,
resourceGroupName: inputResourceGroupName,
hubNamespace: hubInfo.Namespace,
hubNames: hubInfo.Name,
consumerGroupName: consumerGroupName);
break;
case Constants.InputType_IoTHub:
//Check for required parameters
if (string.IsNullOrEmpty(hubInfo.Name))
{
throw new ConfigGenerationException("Could not parse IoT Hub connection string; please check input.");
}
result = await EventHubUtil.CreateIotHubConsumerGroup(
clientId: clientId,
tenantId: tenantId,
secretKey: resolvedSecretKey,
subscriptionId: inputSubscriptionId,
resourceGroupName: inputResourceGroupName,
hubName: hubInfo.Name,
consumerGroupName: consumerGroupName);
break;
default:
throw new ConfigGenerationException($"unexpected inputtype '{inputType}'.");
}

Ensure.IsSuccessResult(result);
}

flowToDeploy.SetStringToken(TokenName_InputEventHubConsumerGroup, consumerGroupName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ public async Task<ApiResult> DeleteFlow(JObject jObject)

// ResourceCreation is one of the environment variables.
// If you don't want to create resource, you can set this to false.
if (_engineEnvironment.ResourceCreation && diag.InputType != Constants.InputType_Kafka)
if (_engineEnvironment.ResourceCreation && (diag.InputType == Constants.InputType_EventHub || diag.InputType == Constants.InputType_IoTHub))
{
var inputSubscriptionId = string.IsNullOrEmpty(diag.InputSubscriptionId) ? Helper.GetSecretFromKeyvaultIfNeeded(_engineEnvironment.EngineFlowConfig.SubscriptionId) : Helper.GetSecretFromKeyvaultIfNeeded(diag.InputSubscriptionId);
var inputResourceGroup = string.IsNullOrEmpty(diag.InputResourceGroup) ? _engineEnvironment.EngineFlowConfig.EventHubResourceGroupName : Helper.GetSecretFromKeyvaultIfNeeded(diag.InputResourceGroup);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,4 @@
[assembly: System.Diagnostics.CodeAnalysis.SuppressMessage("Build", "CA2227:Change 'Errors' to be read-only by removing the property setter.", Justification = "Critical issues only", Scope = "member", Target = "~P:DataX.Flow.SchemaInference.SchemaResult.Errors")]
[assembly: System.Diagnostics.CodeAnalysis.SuppressMessage("Build", "CA2227:Change 'fields' to be read-only by removing the property setter.", Justification = "Critical issues only", Scope = "member", Target = "~P:DataX.Flow.SchemaInference.StructObject.fields")]
[assembly: System.Diagnostics.CodeAnalysis.SuppressMessage("Build", "CA1724:The type name Metadata conflicts in whole or in part with the namespace name 'System.Runtime.Remoting.Metadata' defined in the .NET Framework. Rename the type to eliminate the conflict.", Justification = "Critical issues only", Scope = "type", Target = "~T:DataX.Flow.SchemaInference.Metadata")]
[assembly: System.Diagnostics.CodeAnalysis.SuppressMessage("Style", "IDE1006:Naming Styles", Justification = "Critical issues only", Scope = "member", Target = "~F:DataX.Flow.SchemaInference.SchemaGenerator._certLocation")]
[assembly: System.Diagnostics.CodeAnalysis.SuppressMessage("Style", "IDE1006:Naming Styles", Justification = "Critical issues only", Scope = "member", Target = "~F:DataX.Flow.SchemaInference.Kafka.KafkaMessageBus._cacertLocation")]
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,14 @@
using Confluent.Kafka;
using DataX.Config;
using DataX.Config.ConfigDataModel;
using DataX.ServiceHost.ServiceFabric;
using DataX.Utilities.KeyVault;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.IO;
using System.Net;
using System.Text;
using System.Threading.Tasks;
using System.Timers;
Expand All @@ -20,21 +24,26 @@ public class KafkaMessageBus : IMessageBus
private readonly string _brokerList = string.Empty;
private readonly string _connectionString = null;
private readonly List<string> _topics = null;
private readonly string _cacertLocation = string.Empty;
private readonly string _cacertLocation = @".\cacert.pem";
private readonly string _consumerGroup = string.Empty;
private readonly string _inputType = string.Empty;
private readonly ILogger _logger;
private bool _timeout = false;
private Timer _timer;


//public KafkaMessageBus(List<string> eventhubNames, string consumerGroup, string eventhubConnectionString, string inputType, ILogger logger)
public KafkaMessageBus(string brokerList, string connectionString, List<string> topics, string cacertLocation, string consumerGroup, string inputType, ILogger logger)
public KafkaMessageBus(string brokerList, string connectionString, List<string> topics, string consumerGroup, string inputType, ILogger logger)
{
if (!File.Exists(_cacertLocation))
{
var certSource = KeyVault.GetSecretFromKeyvault(ServiceFabricUtil.GetServiceFabricConfigSetting("CACertificateLocation").Result.ToString());

WebClient webClient = new WebClient();
webClient.DownloadFile(certSource, _cacertLocation);
}

_brokerList = brokerList;
_connectionString = connectionString;
_topics = topics;
_cacertLocation = cacertLocation;
_consumerGroup = consumerGroup;
_inputType = inputType;
_logger = logger;
Expand Down
22 changes: 5 additions & 17 deletions Services/DataX.Flow/DataX.Flow.SchemaInference/SchemaGenerator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,16 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License
// *********************************************************************
using Microsoft.Extensions.Logging;
using DataX.Flow.Common;
using DataX.Utilities.Blob;
using System;
using System.Collections.Generic;
using System.IO;
using System.Net;
using System.Threading.Tasks;
using DataX.Config.ConfigDataModel;
using DataX.Flow.Common;
using DataX.Flow.SchemaInference.Eventhub;
using DataX.Flow.SchemaInference.Kafka;
using DataX.Config.ConfigDataModel;
using DataX.Utilities.KeyVault;
using DataX.ServiceHost.ServiceFabric;
using DataX.Utilities.Blob;
using Microsoft.Extensions.Logging;

namespace DataX.Flow.SchemaInference
{
Expand All @@ -25,7 +22,6 @@ public class SchemaGenerator
private readonly string _checkpointContainerName = "";
private IMessageBus _messageBus = null;
private ILogger _logger;
private const string _certLocation = @".\cacert.pem";

/// <summary>
/// Schema Generator constructor called for generating schema
Expand All @@ -45,17 +41,9 @@ public SchemaGenerator(string brokerList, string connectionString, List<string>
_blobDirectory = blobDirectory;
_checkpointContainerName = checkpointContainerName;

if (!File.Exists(_certLocation))
{
var certSource = KeyVault.GetSecretFromKeyvault(ServiceFabricUtil.GetServiceFabricConfigSetting("CACertificateLocation").Result.ToString());

WebClient webClient = new WebClient();
webClient.DownloadFile(certSource, _certLocation);
}

if (inputType == Constants.InputType_Kafka || inputType == Constants.InputType_KafkaEventHub)
{
_messageBus = new KafkaMessageBus(brokerList, connectionString, hubNames, _certLocation, consumerGroup, inputType, logger);
_messageBus = new KafkaMessageBus(brokerList, connectionString, hubNames, consumerGroup, inputType, logger);
}
else
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public async Task<ApiResult> GetInputSchema(JObject jObject)

// ResourceCreation is one of the environment variables.
// If you don't want to create resource, you can set this to false.
if (_engineEnvironment.ResourceCreation && diag.InputType != Constants.InputType_Kafka)
if (_engineEnvironment.ResourceCreation && (diag.InputType == Constants.InputType_EventHub || diag.InputType == Constants.InputType_IoTHub))
{
var inputSubscriptionId = string.IsNullOrEmpty(diag.InputSubscriptionId) ? Helper.GetSecretFromKeyvaultIfNeeded(_engineEnvironment.EngineFlowConfig.SubscriptionId) : Helper.GetSecretFromKeyvaultIfNeeded(diag.InputSubscriptionId);
var inputResourceGroup = string.IsNullOrEmpty(diag.InputResourceGroup) ? _engineEnvironment.EngineFlowConfig.EventHubResourceGroupName : Helper.GetSecretFromKeyvaultIfNeeded(diag.InputResourceGroup);
Expand Down Expand Up @@ -104,7 +104,7 @@ public async Task<ApiResult> RefreshSample(JObject jObject)

// ResourceCreation is one of the environment variables.
// If you don't want to create resource, you can set this to false.
if (_engineEnvironment.ResourceCreation && diag.InputType != Constants.InputType_Kafka)
if (_engineEnvironment.ResourceCreation && (diag.InputType == Constants.InputType_EventHub || diag.InputType == Constants.InputType_IoTHub))
{
var inputSubscriptionId = string.IsNullOrEmpty(diag.InputSubscriptionId) ? Helper.GetSecretFromKeyvaultIfNeeded(_engineEnvironment.EngineFlowConfig.SubscriptionId) : Helper.GetSecretFromKeyvaultIfNeeded(diag.InputSubscriptionId);
var inputResourceGroup = string.IsNullOrEmpty(diag.InputResourceGroup) ? _engineEnvironment.EngineFlowConfig.EventHubResourceGroupName : Helper.GetSecretFromKeyvaultIfNeeded(diag.InputResourceGroup);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ Licensed under the MIT License
<Parameter Name="cosmosDBConfigDatabaseName" Value="[cosmosDBConfigDatabaseName]" />
<Parameter Name="cosmosDBConfigCollectionName" Value="[cosmosDBConfigCollectionName]" />
<Parameter Name="AppInsightsIntrumentationKey" Value="[AppInsightsIntrumentationKey]" />
<Parameter Name="CACertificateLocation" Value="[CACertificateLocation]" />
</Section>
</Settings>
</ConfigOverride>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,6 @@ Licensed under the MIT License
<Parameter Name="cosmosDBConfigDatabaseName" Value="" MustOverride="true" />
<Parameter Name="cosmosDBConfigCollectionName" Value="" MustOverride="true" />
<Parameter Name="AppInsightsIntrumentationKey" Value="" MustOverride="true" />
<Parameter Name="CACertificateLocation" Value="" MustOverride="true" />
</Section>
</Settings>

0 comments on commit b00905a

Please sign in to comment.