For databricks livequery mount storage account container #118

Merged · 3 commits · Aug 15, 2019
@@ -55,6 +55,7 @@ public static class Constants
public const string PrefixKeyVault = "keyvault";
public const string PrefixHdfs = "hdfs://";
public const string PrefixDbfs = "dbfs:/";
public const string PrefixDbfsMount = "mnt/livequery/";

public const string AccountSecretPrefix = "datax-sa-";
}
5 changes: 4 additions & 1 deletion Services/DataX.Flow/DataX.Flow.Common/EngineEnvironment.cs
@@ -121,7 +121,10 @@ public async Task<ApiResult> GetEnvironmentVariables()

FlowBlobConnectionString = KeyVault.GetSecretFromKeyvault(serviceKeyvaultName, flowConfigObj.ConfiggenSecretPrefix + flowConfigObj.StorageAccountName + "-blobconnectionstring");
OpsBlobConnectionString = KeyVault.GetSecretFromKeyvault(serviceKeyvaultName, flowConfigObj.ConfiggenSecretPrefix + flowConfigObj.OpsStorageAccountName + "-blobconnectionstring");
SparkConnInfo = Helper.ParseConnectionString(Helper.PathResolver(flowConfigObj.SparkConnectionString));
if (EngineFlowConfig.SparkType != DataX.Config.ConfigDataModel.Constants.SparkTypeDataBricks)
{
SparkConnInfo = Helper.ParseConnectionString(Helper.PathResolver(flowConfigObj.SparkConnectionString));
}
return ApiResult.CreateSuccess("");
}

43 changes: 42 additions & 1 deletion Services/DataX.Flow/DataX.Flow.Common/Helper.cs
@@ -6,6 +6,7 @@
using DataX.Utilities.KeyVault;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Security.Cryptography;
using System.Text;
@@ -352,7 +353,47 @@ public static SparkConnectionInfo ParseConnectionString(string connectionString)
UserName = match.Groups[2].Value,
Password = match.Groups[3].Value
};
}
}

/// <summary>
/// This method converts wasbs path to dbfs file path
/// </summary>
/// <param name="filePath">wasbs file path</param>
/// <param name="fileName">file name</param>
/// <returns>Returns dbfs file path</returns>
public static string ConvertToDbfsFilePath(string filePath, string fileName = "")
{
Regex opsPath = new Regex(@"wasbs:\/\/(.*)@(.*).blob.core.windows.net\/(.*)$", RegexOptions.IgnoreCase);
var match = opsPath.Match(filePath);
if (match.Success)
{
string result = Path.Combine(Config.ConfigDataModel.Constants.PrefixDbfs, Config.ConfigDataModel.Constants.PrefixDbfsMount + match.Groups[1].Value + "/", match.Groups[3].Value, fileName);
return result;
}
else
{
throw new Exception("Cannot convert to DBFS file path");
}
}

/// <summary>
/// This method returns a string value based on the spark type
/// </summary>
/// <param name="sparkType">sparkType</param>
/// <param name="valueForHDInsightEnv">Value to be used in case of HDInsight environment</param>
/// <param name="valueForDatabricksEnv">Value to be used in case of Databricks environment</param>
/// <returns>Returns string value based on spark type</returns>
public static string SetValueBasedOnSparkType(string sparkType, string valueForHDInsightEnv, string valueForDatabricksEnv)
{
if (sparkType != Config.ConfigDataModel.Constants.SparkTypeDataBricks)
{
return valueForHDInsightEnv;
}
else
{
return valueForDatabricksEnv;
}
}
}

/// <summary>
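The two new helpers above are used together in the InteractiveQueryManager changes later in this diff to pick the sample-data path for the current cluster type. As a point of reference, a minimal usage sketch follows; the flow name, file name, and wasbs path are illustrative values, not values taken from this PR:

using System.IO;
using DataX.Flow.Common;

// Illustrative only: resolve the sample-data path for either cluster type.
class SampleDataPathSketch
{
    static string Resolve(string sparkType, string opsSparkSamplePath)
    {
        // opsSparkSamplePath is assumed to be a wasbs:// path, e.g.
        // "wasbs://[email protected]/".
        string fileName = "myflow-123.json";

        // HDInsight keeps the original wasbs:// location; Databricks rewrites it to the
        // dbfs:/mnt/livequery/... mount path introduced by this PR.
        return Helper.SetValueBasedOnSparkType(
            sparkType,
            Path.Combine(opsSparkSamplePath, fileName),
            Helper.ConvertToDbfsFilePath(opsSparkSamplePath, fileName));
    }
}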
@@ -0,0 +1,19 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>netcoreapp2.2</TargetFramework>

<IsPackable>false</IsPackable>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.8.0" />
<PackageReference Include="MSTest.TestAdapter" Version="1.3.2" />
<PackageReference Include="MSTest.TestFramework" Version="1.3.2" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\DataX.Flow.InteractiveQuery\DataX.Flow.InteractiveQuery.csproj" />
</ItemGroup>

</Project>
@@ -0,0 +1,48 @@
// *********************************************************************
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License
// *********************************************************************
using DataX.Flow.Common;
using Microsoft.VisualStudio.TestTools.UnitTesting;

namespace DataX.Flow.InteractiveQuery.Tests
{
[TestClass]
public class InteractiveQueryTests
{
[TestMethod]
public void ConvertWasbsToDbfsFilePath()
{
//Test ConvertToDbfsFilePath with both filePath and fileName parameters
string wasbsPath = "wasbs://[email protected]/";
string fileName = "testFile.json";
string actualValue = Helper.ConvertToDbfsFilePath(wasbsPath, fileName);
string expectedValue = "dbfs:/mnt/livequery/mycontainer/testFile.json";
Assert.AreEqual(expectedValue, actualValue, "DBFS file path is incorrect");

//Test ConvertToDbfsFilePath with only filePath parameter
wasbsPath = "wasbs://[email protected]/testfolder/testFile.json";
actualValue = Helper.ConvertToDbfsFilePath(wasbsPath);
expectedValue = "dbfs:/mnt/livequery/mycontainer/testfolder/testFile.json";
Assert.AreEqual(expectedValue, actualValue, "DBFS file path is incorrect");
}

[TestMethod]
public void TestMountCode()
{
string expectedValue = "dbutils.fs.mount(source = \"wasbs://[email protected]/\", mountPoint = \"/mnt/livequery//mycontainer\", extraConfigs = Map(\"fs.azure.account.key.teststorageaccount.blob.core.windows.net\"->dbutils.secrets.get(scope = \"testkeyvault\", key = \"datax-sa-teststorageaccount\")))";

//Test unnested file path
string dbfsPath = "dbfs:/mnt/livequery/mycontainer/testFile.json";
string opsStorageAccountName = "teststorageaccount";
string sparkKeyVaultName = "testkeyvault";
string actualValue = KernelService.CreateMountCode(dbfsPath, opsStorageAccountName, sparkKeyVaultName);
Assert.AreEqual(expectedValue, actualValue, "Mount code is incorrect");

//Test nested file path
dbfsPath = "dbfs:/mnt/livequery/mycontainer/folder1/folder2/testFile.json";
actualValue = KernelService.CreateMountCode(dbfsPath, opsStorageAccountName, sparkKeyVaultName);
Assert.AreEqual(expectedValue, actualValue, "Mount code is incorrect");
}
}
}
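The expected value in TestMountCode is the Scala dbutils.fs.mount call that mounts the WASB container under /mnt/livequery, pulling the storage key from the Spark key vault via the datax-sa- secret prefix defined in Constants. A hypothetical sketch of how such a snippet could be assembled from a DBFS path is shown below; it is an illustration derived from the test expectations above, not the actual KernelService.CreateMountCode implementation:

using System;
using System.Text.RegularExpressions;

// Illustrative sketch only: build the Scala mount snippet the test above expects.
static class MountCodeSketch
{
    public static string Create(string dbfsPath, string opsStorageAccountName, string sparkKeyVaultName)
    {
        // "dbfs:/mnt/livequery/<container>/..." -> "<container>"
        var match = Regex.Match(dbfsPath, @"^dbfs:/mnt/livequery/([^/]+)/");
        if (!match.Success)
        {
            throw new ArgumentException("Not a live-query DBFS path", nameof(dbfsPath));
        }
        string container = match.Groups[1].Value;

        return $"dbutils.fs.mount(source = \"wasbs://{container}@{opsStorageAccountName}.blob.core.windows.net/\", " +
            $"mountPoint = \"/mnt/livequery//{container}\", " +
            $"extraConfigs = Map(\"fs.azure.account.key.{opsStorageAccountName}.blob.core.windows.net\"->" +
            $"dbutils.secrets.get(scope = \"{sparkKeyVaultName}\", key = \"datax-sa-{opsStorageAccountName}\")))";
    }
}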
@@ -13,6 +13,7 @@
using System.IO;
using System.Threading.Tasks;
using Microsoft.Extensions.Configuration;
using System.Text.RegularExpressions;

namespace DataX.Flow.InteractiveQuery
{
@@ -37,8 +38,8 @@ public InteractiveQueryManager(ILogger logger, IConfiguration configuration)
_engineEnvironment = new EngineEnvironment(_configuration);
}

private string SetupSteps { get; set; } = string.Empty;

private string SetupSteps { get; set; } = string.Empty;
/// <summary>
/// This method gets called for api/kernel
/// </summary>
Expand All @@ -65,8 +66,10 @@ public async Task<ApiResult> CreateAndInitializeKernel(JObject jObject)
}

var hashValue = Helper.GetHashCode(diag.UserName);
string sampleDataPath = Path.Combine(_engineEnvironment.OpsSparkSamplePath, $"{diag.Name}-{hashValue}.json");

string sampleDataPath = Helper.SetValueBasedOnSparkType(_engineEnvironment.EngineFlowConfig.SparkType,
Path.Combine(_engineEnvironment.OpsSparkSamplePath, $"{diag.Name}-{hashValue}.json"),
Helper.ConvertToDbfsFilePath(_engineEnvironment.OpsSparkSamplePath, $"{diag.Name}-{hashValue}.json"));

response = await CreateAndInitializeKernelHelper(diag.InputSchema, diag.UserName, diag.Name, sampleDataPath, diag.NormalizationSnippet, diag.ReferenceDatas, diag.Functions, diag.DatabricksToken);
if (response.Error.HasValue && response.Error.Value)
{
@@ -214,8 +217,11 @@ public async Task<ApiResult> RecycleKernelHelper(InteractiveQueryObject diag, bo
KernelService kernelService = CreateKernelService(diag.DatabricksToken);

//Create the xml with the scala steps to execute to initialize the kernel
var hashValue = Helper.GetHashCode(diag.UserName);
var sampleDataPath = Path.Combine(_engineEnvironment.OpsSparkSamplePath, $"{diag.Name}-{hashValue}.json");
var hashValue = Helper.GetHashCode(diag.UserName);
var sampleDataPath = Helper.SetValueBasedOnSparkType(_engineEnvironment.EngineFlowConfig.SparkType,
Path.Combine(_engineEnvironment.OpsSparkSamplePath, $"{diag.Name}-{hashValue}.json"),
Helper.ConvertToDbfsFilePath(_engineEnvironment.OpsSparkSamplePath, $"{diag.Name}-{hashValue}.json"));

DiagnosticInputhelper(diag.InputSchema, sampleDataPath, diag.NormalizationSnippet, diag.Name);

response = await kernelService.RecycleKernelAsync(diag.KernelId, diag.UserName, diag.Name, _engineEnvironment.OpsBlobConnectionString, Path.Combine(_engineEnvironment.OpsDiagnosticPath, _GarbageCollectBlobName), SetupSteps, isReSample, diag.ReferenceDatas, diag.Functions);
@@ -329,6 +335,11 @@ private async Task<ApiResult> CreateAndInitializeKernelHelper(string rawSchema,
return ApiResult.CreateError(response.Message);
}

if(_engineEnvironment.EngineFlowConfig.SparkType == Config.ConfigDataModel.Constants.SparkTypeDataBricks)
{
kernelService.MountStorage(_engineEnvironment.EngineFlowConfig, sampleDataPath, kernelId);
}

response = await kernelService.CreateandInitializeKernelAsync(kernelId, SetupSteps, false, referenceDatas, functions);
if (response.Error.HasValue && response.Error.Value)
{
@@ -479,7 +490,7 @@ private void DiagnosticInputhelper(string rawSchema, string sampleDataPath, stri
{
["RawSchema"] = rawSchema,
["SampleDataPath"] = sampleDataPath,
["NormalizationSnippet"] = finalNormalizationString,
["NormalizationSnippet"] = finalNormalizationString,
["BinName"] = TranslateBinNames(_engineEnvironment.EngineFlowConfig.BinaryName, _engineEnvironment.EngineFlowConfig.OpsStorageAccountName, _engineEnvironment.EngineFlowConfig.InteractiveQueryDefaultContainer),
["KernelDisplayName"] = _engineEnvironment.GenerateKernelDisplayName(flowId)
};