
Commit e14b92b: merging latest changes from Microsoft/master

skaarthik authored Jan 29, 2017
2 parents da69c1f + 7d0f7ae
Showing 80 changed files with 1,668 additions and 929 deletions.
4 changes: 4 additions & 0 deletions .travis.yml

@@ -2,6 +2,8 @@ language: csharp
 solution: csharp/SparkCLR.sln
 sudo: required
 dist: trusty
+env:
+- JDK=openjdk7
 before_install:
 - sudo apt-get install xsltproc
 - nuget install NUnit.Runners -Version 3.0.0 -OutputDirectory testrunner
@@ -12,6 +14,8 @@ before_install:
 - export M2="$M2_HOME/bin"
 - export PATH="$M2:$PATH"
 - hash -r
+before_script:
+- jdk_switcher use $JDK
 script:
 - export MAVEN_OPTS="-XX:MaxPermSize=2g -Xmx4g"
 - export JAVA_OPTS="-XX:MaxPermSize=2g -Xmx4g"
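The two added stanzas pin the Travis build to a specific JDK: the env entry defines JDK=openjdk7, and before_script runs Travis's jdk_switcher utility to activate that JDK before the build script executes.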
5 changes: 3 additions & 2 deletions README.md

@@ -8,7 +8,7 @@ For example, the word count sample in Apache Spark can be implemented in C# as f
 ```c#
 var lines = sparkContext.TextFile(@"hdfs://path/to/input.txt");
 var words = lines.FlatMap(s => s.Split(' '));
-var wordCounts = words.Map(w => new KeyValuePair<string, int>(w.Trim(), 1))
+var wordCounts = words.Map(w => new Tuple<string, int>(w.Trim(), 1))
                       .ReduceByKey((x, y) => x + y);
 var wordCountCollection = wordCounts.Collect();
 wordCounts.SaveAsTextFile(@"hdfs://path/to/wordcount.txt");
@@ -63,7 +63,7 @@ StreamingContext sparkStreamingContext = StreamingContext.GetOrCreate(checkpoint
     .Map(kvp => Encoding.UTF8.GetString(kvp.Value))
     .Filter(line => line.Contains(","))
     .Map(line => line.Split(','))
-    .Map(columns => new KeyValuePair<string, int>(
+    .Map(columns => new Tuple<string, int>(
         string.Format("{0},{1}", columns[0], columns[1]), 1))
     .ReduceByKeyAndWindow((x, y) => x + y, (x, y) => x - y,
         windowDurationInSecs, slideDurationInSecs, 3)
@@ -119,6 +119,7 @@ Refer to the [docs folder](docs) for design overview and other info on Mobius
 * [Configuration parameters in Mobius](./notes/configuration-mobius.md)
 * [Troubleshoot errors in Mobius](./notes/troubleshooting-mobius.md)
 * [Debug Mobius apps](./notes/running-mobius-app.md#debug-mode)
+* [Implementing Spark Apps in F# using Mobius](./notes/spark-fsharp-mobius.md)

 ## Supported Spark Versions
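The README edits reflect the API migration running through this whole merge: pair-style RDD elements change from KeyValuePair<K, V> to Tuple<K, V>. A minimal before/after sketch of caller code (variable names are illustrative):

```c#
// Before: pair RDDs were built from KeyValuePair<K, V> and read via Key/Value.
var counts = words.Map(w => new KeyValuePair<string, int>(w, 1))
                  .ReduceByKey((x, y) => x + y);

// After this merge: the same pipeline uses Tuple<K, V>, read via Item1/Item2.
var counts2 = words.Map(w => new Tuple<string, int>(w, 1))
                   .ReduceByKey((x, y) => x + y);
```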
3 changes: 2 additions & 1 deletion build/localmode/RunSamples.cmd

@@ -49,7 +49,8 @@ if "%precheck%" == "bad" (goto :EOF)
 @rem
 set SPARK_VERSION=2.0.0
 set HADOOP_VERSION=2.6
-@echo [RunSamples.cmd] SPARK_VERSION=%SPARK_VERSION%, HADOOP_VERSION=%HADOOP_VERSION%
+set APACHE_DIST_SERVER=archive.apache.org
+@echo [RunSamples.cmd] SPARK_VERSION=%SPARK_VERSION%, HADOOP_VERSION=%HADOOP_VERSION%, APACHE_DIST_SERVER=%APACHE_DIST_SERVER%

 @rem download runtime dependencies
 pushd "%CMDHOME%"
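The same change recurs in downloadtools.ps1 and run-samples.sh below: the Apache download host becomes configurable through APACHE_DIST_SERVER and defaults to archive.apache.org. The likely motivation is that archive.apache.org retains every release, while the previously hard-coded www.us.apache.org/dist mirror carries only current releases, so URLs for older Spark and Maven versions go stale.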
28 changes: 22 additions & 6 deletions build/localmode/downloadtools.ps1

@@ -10,6 +10,9 @@
 #
 Param([string] $stage, [string] $verbose)

+$envValue = [Environment]::GetEnvironmentVariable("APACHE_DIST_SERVER")
+$apacheDistServer = if ($envValue -eq $null) { "archive.apache.org" } else { $envValue }
+
 if ($stage.ToLower() -eq "run")
 {
     # retrieve hadoop and spark versions from environment variables
@@ -18,8 +21,8 @@ if ($stage.ToLower() -eq "run")

     $envValue = [Environment]::GetEnvironmentVariable("SPARK_VERSION")
     $sparkVersion = if ($envValue -eq $null) { "1.6.1" } else { $envValue }
-    Write-Output "[downloadtools] hadoopVersion=$hadoopVersion, sparkVersion=$sparkVersion"

+    Write-Output "[downloadtools] hadoopVersion=$hadoopVersion, sparkVersion=$sparkVersion, apacheDistServer=$apacheDistServer"
 }

 function Get-ScriptDirectory
@@ -73,8 +76,16 @@ function Download-File($url, $output)
     $output = [System.IO.Path]::GetFullPath($output)
     if (test-path $output)
     {
-        Write-Output "[downloadtools.Download-File] $output exists. No need to download."
-        return
+        if ((Get-Item $output).Length -gt 0)
+        {
+            Write-Output "[downloadtools.Download-File] $output exists. No need to download."
+            return
+        }
+        else
+        {
+            Write-Output "[downloadtools.Download-File] [WARNING] $output exists but is empty. We need to download a new copy of the file."
+            Remove-Item $output
+        }
     }

     $start_time = Get-Date
@@ -122,6 +133,11 @@ function Download-File($url, $output)
     }

     Write-Output "[downloadtools.Download-File] Download completed. Time taken: $howlong"
+
+    if ( !(test-path $output) -or (Get-Item $output).Length -eq 0)
+    {
+        throw [System.IO.FileNotFoundException] "Failed to download file $output from $url"
+    }
 }

 function Unzip-File($zipFile, $targetDir)
@@ -252,7 +268,7 @@ function Download-BuildTools
     $mvnCmd = "$toolsDir\$mvnVer\bin\mvn.cmd"
     if (!(test-path $mvnCmd))
     {
-        $url = "http://www.us.apache.org/dist/maven/maven-3/3.3.9/binaries/$mvnVer-bin.tar.gz"
+        $url = "http://$apacheDistServer/dist/maven/maven-3/3.3.9/binaries/$mvnVer-bin.tar.gz"
         $output="$toolsDir\$mvnVer-bin.tar.gz"
         Download-File $url $output
         Untar-File $output $toolsDir
@@ -402,7 +418,7 @@ function Download-RuntimeDependencies
     $sparkSubmit="$S_HOME\bin\spark-submit.cmd"
     if (!(test-path $sparkSubmit))
     {
-        $url = "http://www.us.apache.org/dist/spark/spark-$sparkVersion/spark-$sparkVersion-bin-hadoop$hadoopVersion.tgz"
+        $url = "http://$apacheDistServer/dist/spark/spark-$sparkVersion/spark-$sparkVersion-bin-hadoop$hadoopVersion.tgz"
         $output = "$toolsDir\spark-$sparkVersion-bin-hadoop$hadoopVersion.tgz"
         Download-File $url $output
         Untar-File $output $toolsDir
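The Download-File changes add a simple robustness pattern: an existing zero-byte file is treated as the leftover of a failed download (deleted and re-fetched), and a missing or empty file after the download is reported as an error instead of a silent success. A minimal C# sketch of the same pattern, using hypothetical names and a plain WebClient rather than the script's download logic:

```c#
using System;
using System.IO;
using System.Net;

class DownloadGuard
{
    // Sketch of the guard added to Download-File: a zero-byte file is treated
    // as a broken download, removed, and re-fetched; a missing or empty file
    // after the fetch raises an error rather than passing silently.
    static void DownloadFileChecked(string url, string output)
    {
        if (File.Exists(output))
        {
            if (new FileInfo(output).Length > 0)
            {
                Console.WriteLine("{0} exists. No need to download.", output);
                return;
            }
            Console.WriteLine("[WARNING] {0} exists but is empty. Downloading a new copy.", output);
            File.Delete(output);
        }

        using (var client = new WebClient())
        {
            client.DownloadFile(url, output);
        }

        if (!File.Exists(output) || new FileInfo(output).Length == 0)
        {
            throw new FileNotFoundException(
                string.Format("Failed to download file {0} from {1}", output, url));
        }
    }
}
```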
5 changes: 3 additions & 2 deletions build/localmode/run-samples.sh

@@ -18,7 +18,8 @@ done
 # setup Hadoop and Spark versions
 export SPARK_VERSION=2.0.0
 export HADOOP_VERSION=2.6
-echo "[run-samples.sh] SPARK_VERSION=$SPARK_VERSION, HADOOP_VERSION=$HADOOP_VERSION"
+export APACHE_DIST_SERVER=archive.apache.org
+echo "[run-samples.sh] SPARK_VERSION=$SPARK_VERSION, HADOOP_VERSION=$HADOOP_VERSION, APACHE_DIST_SERVER=$APACHE_DIST_SERVER"

 export FWDIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"

@@ -30,7 +31,7 @@ export SPARK=spark-$SPARK_VERSION-bin-hadoop$HADOOP_VERSION
 export SPARK_HOME="$TOOLS_DIR/$SPARK"
 if [ ! -d "$SPARK_HOME" ];
 then
-    wget "http://www.us.apache.org/dist/spark/spark-$SPARK_VERSION/$SPARK.tgz" -O "$TOOLS_DIR/$SPARK.tgz"
+    wget "http://$APACHE_DIST_SERVER/dist/spark/spark-$SPARK_VERSION/$SPARK.tgz" -O "$TOOLS_DIR/$SPARK.tgz"
     tar xfz "$TOOLS_DIR/$SPARK.tgz" -C "$TOOLS_DIR"
 fi
 export PATH="$SPARK_HOME/bin:$PATH"
1 change: 1 addition & 0 deletions csharp/Adapter/Microsoft.Spark.CSharp/Adapter.csproj

@@ -157,6 +157,7 @@
     <Compile Include="Sql\SparkSession.cs" />
     <Compile Include="Sql\SqlContext.cs" />
     <Compile Include="Sql\Types.cs" />
+    <Compile Include="Sql\UdfRegistration.cs" />
     <Compile Include="Sql\UserDefinedFunction.cs" />
     <Compile Include="Streaming\ConstantInputDStream.cs" />
     <Compile Include="Streaming\DStream.cs" />
14 changes: 7 additions & 7 deletions csharp/Adapter/Microsoft.Spark.CSharp/Core/Accumulator.cs

@@ -216,19 +216,19 @@ internal int StartUpdateServer()
                 for (int i = 0; i < numUpdates; i++)
                 {
                     var ms = new MemoryStream(SerDe.ReadBytes(ns));
-                    KeyValuePair<int, dynamic> update = (KeyValuePair<int, dynamic>)formatter.Deserialize(ms);
+                    var update = (Tuple<int, dynamic>)formatter.Deserialize(ms);
-                    if (Accumulator.accumulatorRegistry.ContainsKey(update.Key))
+                    if (Accumulator.accumulatorRegistry.ContainsKey(update.Item1))
                     {
-                        Accumulator accumulator = Accumulator.accumulatorRegistry[update.Key];
-                        accumulator.GetType().GetMethod("Add").Invoke(accumulator, new object[] { update.Value });
+                        Accumulator accumulator = Accumulator.accumulatorRegistry[update.Item1];
+                        accumulator.GetType().GetMethod("Add").Invoke(accumulator, new object[] { update.Item2 });
                     }
                     else
                     {
-                        Console.Error.WriteLine("WARN: cann't find update.Key: {0} for accumulator, will create a new one", update.Key);
+                        Console.Error.WriteLine("WARN: cann't find update.Key: {0} for accumulator, will create a new one", update.Item1);
                         var genericAccumulatorType = typeof(Accumulator<>);
-                        var specificAccumulatorType = genericAccumulatorType.MakeGenericType(update.Value.GetType());
-                        Activator.CreateInstance(specificAccumulatorType, new object[] { update.Key, update.Value });
+                        var specificAccumulatorType = genericAccumulatorType.MakeGenericType(update.Item2.GetType());
+                        Activator.CreateInstance(specificAccumulatorType, new object[] { update.Item1, update.Item2 });
                     }
                 }
                 ns.WriteByte((byte)1); // acknowledge byte other than -1
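With this change, the accumulator update server expects each driver payload to deserialize as Tuple<int, dynamic> (accumulator id, increment) instead of KeyValuePair<int, dynamic>, and reads it through Item1/Item2. Below is a self-contained round-trip sketch of that payload shape; the class and variable names are illustrative, not Mobius APIs:

```c#
using System;
using System.IO;
using System.Runtime.Serialization.Formatters.Binary;

class AccumulatorUpdateRoundTrip
{
    static void Main()
    {
        var formatter = new BinaryFormatter();
        // (accumulator id, value): the shape StartUpdateServer now casts to.
        var update = new Tuple<int, dynamic>(0, 42);

        using (var ms = new MemoryStream())
        {
            formatter.Serialize(ms, update);
            ms.Position = 0;
            var received = (Tuple<int, dynamic>)formatter.Deserialize(ms);
            // Fields are read via Item1/Item2 instead of Key/Value.
            Console.WriteLine("id={0}, value={1}", received.Item1, received.Item2);
        }
    }
}
```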
30 changes: 15 additions & 15 deletions csharp/Adapter/Microsoft.Spark.CSharp/Core/OrderedRDDFunctions.cs

@@ -18,31 +18,31 @@ public static class OrderedRDDFunctions
 {

     /// <summary>
-    /// Sorts this RDD, which is assumed to consist of KeyValuePair pairs.
+    /// Sorts this RDD, which is assumed to consist of Tuple pairs.
     /// </summary>
     /// <typeparam name="K"></typeparam>
     /// <typeparam name="V"></typeparam>
     /// <param name="self"></param>
     /// <param name="ascending"></param>
     /// <param name="numPartitions"></param>
     /// <returns></returns>
-    public static RDD<KeyValuePair<K, V>> SortByKey<K, V>(this RDD<KeyValuePair<K, V>> self,
+    public static RDD<Tuple<K, V>> SortByKey<K, V>(this RDD<Tuple<K, V>> self,
         bool ascending = true, int? numPartitions = null)
     {
         return SortByKey<K, V, K>(self, ascending, numPartitions, new DefaultSortKeyFuncHelper<K>().Execute);
     }
     /// <summary>
-    /// Sorts this RDD, which is assumed to consist of KeyValuePairs. If key is type of string, case is sensitive.
+    /// Sorts this RDD, which is assumed to consist of Tuples. If Item1 is type of string, case is sensitive.
     /// </summary>
     /// <typeparam name="K"></typeparam>
     /// <typeparam name="V"></typeparam>
     /// <typeparam name="U"></typeparam>
     /// <param name="self"></param>
     /// <param name="ascending"></param>
     /// <param name="numPartitions">Number of partitions. Each partition of the sorted RDD contains a sorted range of the elements.</param>
-    /// <param name="keyFunc">RDD will sort by keyFunc(key) for every key in KeyValuePair. Must not be null.</param>
+    /// <param name="keyFunc">RDD will sort by keyFunc(key) for every Item1 in Tuple. Must not be null.</param>
     /// <returns></returns>
-    public static RDD<KeyValuePair<K, V>> SortByKey<K, V, U>(this RDD<KeyValuePair<K, V>> self,
+    public static RDD<Tuple<K, V>> SortByKey<K, V, U>(this RDD<Tuple<K, V>> self,
         bool ascending, int? numPartitions, Func<K, U> keyFunc)
     {
         if (keyFunc == null)
@@ -73,7 +73,7 @@ public static RDD<KeyValuePair<K, V>> SortByKey<K, V, U>(this RDD<KeyValuePair<K
         /* first compute the boundary of each part via sampling: we want to partition
          * the key-space into bins such that the bins have roughly the same
          * number of (key, value) pairs falling into them */
-        U[] samples = self.Sample(false, fraction, 1).Map(kv => kv.Key).Collect().Select(k => keyFunc(k)).ToArray();
+        U[] samples = self.Sample(false, fraction, 1).Map(kv => kv.Item1).Collect().Select(k => keyFunc(k)).ToArray();
         Array.Sort(samples, StringComparer.Ordinal); // case sensitive if key type is string

         List<U> bounds = new List<U>();
@@ -103,13 +103,13 @@ public static RDD<KeyValuePair<K, V>> SortByKey<K, V, U>(this RDD<KeyValuePair<K
     /// <param name="partitionFunc"></param>
     /// <param name="ascending"></param>
     /// <returns></returns>
-    public static RDD<KeyValuePair<K, V>> repartitionAndSortWithinPartitions<K, V>(
-        this RDD<KeyValuePair<K, V>> self,
+    public static RDD<Tuple<K, V>> repartitionAndSortWithinPartitions<K, V>(
+        this RDD<Tuple<K, V>> self,
         int? numPartitions = null,
         Func<K, int> partitionFunc = null,
         bool ascending = true)
     {
-        return self.MapPartitionsWithIndex<KeyValuePair<K, V>>((pid, iter) => ascending ? iter.OrderBy(kv => kv.Key) : iter.OrderByDescending(kv => kv.Key));
+        return self.MapPartitionsWithIndex<Tuple<K, V>>((pid, iter) => ascending ? iter.OrderBy(kv => kv.Item1) : iter.OrderByDescending(kv => kv.Item1));
     }

     [Serializable]
@@ -123,22 +123,22 @@ public SortByKeyHelper(Func<K, U> f, bool ascending = true)
             this.ascending = ascending;
         }

-        public IEnumerable<KeyValuePair<K, V>> Execute(int pid, IEnumerable<KeyValuePair<K, V>> kvs)
+        public IEnumerable<Tuple<K, V>> Execute(int pid, IEnumerable<Tuple<K, V>> kvs)
         {
-            IEnumerable<KeyValuePair<K, V>> ordered;
+            IEnumerable<Tuple<K, V>> ordered;
             if (ascending)
             {
                 if (typeof(K) == typeof(string))
-                    ordered = kvs.OrderBy(k => func(k.Key).ToString(), StringComparer.Ordinal);
+                    ordered = kvs.OrderBy(k => func(k.Item1).ToString(), StringComparer.Ordinal);
                 else
-                    ordered = kvs.OrderBy(k => func(k.Key));
+                    ordered = kvs.OrderBy(k => func(k.Item1));
             }
             else
             {
                 if (typeof(K) == typeof(string))
-                    ordered = kvs.OrderByDescending(k => func(k.Key).ToString(), StringComparer.Ordinal);
+                    ordered = kvs.OrderByDescending(k => func(k.Item1).ToString(), StringComparer.Ordinal);
                 else
-                    ordered = kvs.OrderByDescending(k => func(k.Key));
+                    ordered = kvs.OrderByDescending(k => func(k.Item1));
             }
             return ordered;
         }
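From the caller's side, the migrated sort API now operates on RDD<Tuple<K, V>>. A small usage sketch, assuming Mobius's SparkContext.Parallelize overload that takes a sequence and a slice count (variable names are illustrative):

```c#
// Hypothetical caller code after the migration. Pair RDDs are Tuple<K, V>,
// and keys are read via Item1.
var pairs = sparkContext.Parallelize(new[]
{
    new Tuple<string, int>("banana", 2),
    new Tuple<string, int>("apple", 1)
}, 2);

var sorted = pairs.SortByKey();  // string keys sort case-sensitively (ordinal)

foreach (var kv in sorted.Collect())
{
    Console.WriteLine("{0} -> {1}", kv.Item1, kv.Item2);
}
```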
(The remaining changed files in this commit are not shown in this view.)