Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion Examples/AvroExamples/AvroExamples/2-RegisterAssemblies.usql
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
DROP ASSEMBLY IF EXISTS [Avro];
USE DATABASE [Avro];

DROP ASSEMBLY IF EXISTS [Avro];
CREATE ASSEMBLY [Avro] FROM @"/Assemblies/Avro/Avro.dll";
DROP ASSEMBLY IF EXISTS [Microsoft.Analytics.Samples.Formats];
CREATE ASSEMBLY [Microsoft.Analytics.Samples.Formats] FROM @"/Assemblies/Avro/Microsoft.Analytics.Samples.Formats.dll";
Expand Down
4 changes: 2 additions & 2 deletions Examples/AvroExamples/AvroExamples/3-SimpleAvro.usql
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ REFERENCE ASSEMBLY [log4net];
REFERENCE ASSEMBLY [Avro];
REFERENCE ASSEMBLY [Microsoft.Analytics.Samples.Formats];

DECLARE @input_file string = @"\TwitterStream\{*}\{*}\{*}.avro";
DECLARE @output_file string = @"\output\twitter.csv";
DECLARE @input_file string = @"/Avro/Samples/twitter.avro";
DECLARE @output_file string = @"/output/twitter.csv";

@rs =
EXTRACT
Expand Down
64 changes: 64 additions & 0 deletions Examples/AvroExamples/AvroExamples/4-EventHubsCapture.usql
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
USE DATABASE Avro;

REFERENCE ASSEMBLY [Newtonsoft.Json];
REFERENCE ASSEMBLY [log4net];
REFERENCE ASSEMBLY [Avro];
REFERENCE ASSEMBLY [Microsoft.Analytics.Samples.Formats];

DECLARE @input_file string = @"/Avro/Samples/EHC.avro";
DECLARE @output_file string = @"/output/EHC.csv";

@rs =
EXTRACT
SequenceNumber long,
Offset string,
EnqueuedTimeUtc string,
SystemProperties SQL.MAP<string,string>,
Properties SQL.MAP<string,string>,
Body byte[]
FROM @input_file
USING new Microsoft.Analytics.Samples.Formats.ApacheAvro.AvroExtractor(@"
{
""type"" : ""record"",
""name"" : ""EventData"",
""namespace"" : ""Microsoft.ServiceBus.Messaging"",
""fields"" : [ {
""name"" : ""SequenceNumber"",
""type"" : ""long""
}, {
""name"" : ""Offset"",
""type"" : ""string""
}, {
""name"" : ""EnqueuedTimeUtc"",
""type"" : ""string""
}, {
""name"" : ""SystemProperties"",
""type"" : {
""type"" : ""map"",
""values"" : [ ""long"", ""double"", ""string"", ""bytes"" ]
}
}, {
""name"" : ""Properties"",
""type"" : {
""type"" : ""map"",
""values"" : [ ""long"", ""double"", ""string"", ""bytes"", ""null"" ]
}
}, {
""name"" : ""Body"",
""type"" : [ ""null"", ""bytes"" ]
} ]
}
");

@mapexplode =
SELECT SequenceNumber,
Offset,
EnqueuedTimeUtc,
Encoding.UTF8.GetString(Body) AS Body,
p.key AS PropertiesKey,
p.value AS PropertiesValue
FROM @rs
OUTER APPLY EXPLODE(Properties) AS p(key, value);

OUTPUT @mapexplode TO @output_file USING Outputters.Text();

12 changes: 12 additions & 0 deletions Examples/AvroExamples/AvroExamples/4-EventHubsCapture.usql.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
using Microsoft.Analytics.Interfaces;
using Microsoft.Analytics.Types.Sql;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;

namespace AvroExamples
{

}
10 changes: 8 additions & 2 deletions Examples/AvroExamples/AvroExamples/AvroExamples.usqlproj
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
<?xml version="1.0" encoding="utf-8"?>
<Project ToolsVersion="4.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<PropertyGroup>
<USQLProjectVersion>2</USQLProjectVersion>
<Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
<Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
<SchemaVersion>2.0</SchemaVersion>
Expand All @@ -11,7 +12,6 @@
<Name>AvroExamples</Name>
<RootNamespace>AvroExamples</RootNamespace>
<RuntimeVersion>default</RuntimeVersion>
<OutputStreamPath>C:\Users\flmader\AppData\Local\USQLDataRoot</OutputStreamPath>
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
<DebugSymbols>true</DebugSymbols>
Expand All @@ -25,6 +25,7 @@
<Script Include="1-CreateDB.usql" />
<Script Include="2-RegisterAssemblies.usql" />
<Script Include="3-SimpleAvro.usql" />
<Script Include="4-EventHubsCapture.usql" />
<ScriptCode Include="1-CreateDB.usql.cs">
<DependentUpon>1-CreateDB.usql</DependentUpon>
</ScriptCode>
Expand All @@ -34,6 +35,11 @@
<ScriptCode Include="3-SimpleAvro.usql.cs">
<DependentUpon>3-SimpleAvro.usql</DependentUpon>
</ScriptCode>
<ScriptCode Include="4-EventHubsCapture.usql.cs">
<DependentUpon>4-EventHubsCapture.usql</DependentUpon>
</ScriptCode>
</ItemGroup>
<Import Project="$(AppData)\Microsoft\DataLake\MsBuild\1.0\Usql.targets" />
<!--target for CICD build added by project migration tool-->
<Import Project="USqlSDKBuild.targets" Condition="Exists('USqlSDKBuild.targets')" />
<Import Project="$(USQLSDKPath)\USqlSDKBuild.targets" Condition="!Exists('USqlSDKBuild.targets') And '$(USQLSDKPath)' != '' And Exists('$(USQLSDKPath)\USqlSDKBuild.targets')" />
</Project>
39 changes: 39 additions & 0 deletions Examples/AvroExamples/AvroExamples/AvroExamples.usqlproj.orig
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
<?xml version="1.0" encoding="utf-8"?>
<Project ToolsVersion="4.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<PropertyGroup>
<Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
<Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
<SchemaVersion>2.0</SchemaVersion>
<ProjectGuid>3013db4a-4f98-4ee0-9796-3172fb40c065</ProjectGuid>
<OutputType>File</OutputType>
<AssemblyName>Algebra.xml</AssemblyName>
<TargetFrameworkVersion>v4.5.2</TargetFrameworkVersion>
<Name>AvroExamples</Name>
<RootNamespace>AvroExamples</RootNamespace>
<RuntimeVersion>default</RuntimeVersion>
<OutputStreamPath>C:\Users\flmader\AppData\Local\USQLDataRoot</OutputStreamPath>
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
<DebugSymbols>true</DebugSymbols>
<OutputPath>bin\Debug\</OutputPath>
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
<DebugSymbols>false</DebugSymbols>
<OutputPath>bin\Release\</OutputPath>
</PropertyGroup>
<ItemGroup>
<Script Include="1-CreateDB.usql" />
<Script Include="2-RegisterAssemblies.usql" />
<Script Include="3-SimpleAvro.usql" />
<ScriptCode Include="1-CreateDB.usql.cs">
<DependentUpon>1-CreateDB.usql</DependentUpon>
</ScriptCode>
<ScriptCode Include="2-RegisterAssemblies.usql.cs">
<DependentUpon>2-RegisterAssemblies.usql</DependentUpon>
</ScriptCode>
<ScriptCode Include="3-SimpleAvro.usql.cs">
<DependentUpon>3-SimpleAvro.usql</DependentUpon>
</ScriptCode>
</ItemGroup>
<Import Project="$(AppData)\Microsoft\DataLake\MsBuild\1.0\Usql.targets" />
</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,165 @@ public void AvroExtractor_DatatypeNullableString_Extracted()
Assert.IsTrue(result[1].Get<string>("Value") == null);
}

[TestMethod]
public void AvroExtractor_DatatypeMap_MapOfInt_Extracted()
{
var schema = @"{""fields"":[{""name"": ""Value"",""type"": { ""type"": ""map"",""values"": ""int""}}],""name"": ""SingleColumnPoco"",""namespace"": ""Microsoft.Analytics.Samples.Formats.Tests"",""type"": ""record""}";
var dict = new Dictionary<string, object> { { "item1", 1 } };
var data = new List<SingleColumnPoco<Dictionary<string, object>>>
{
new SingleColumnPoco<Dictionary<string, object>>() { Value = dict }
};

var result = ExecuteExtract<Dictionary<string, object>, SqlMap<string, int?>>(data, schema);

Assert.IsNotNull(result[0].Get<SqlMap<string, int?>>("Value"));
Assert.IsTrue(result[0].Get<SqlMap<string, int?>>("Value").Values[0] == 1);
}

[TestMethod]
public void AvroExtractor_DatatypeMap_MapOfBool_Extracted()
{
var schema = @"{""fields"":[{""name"": ""Value"",""type"": { ""type"": ""map"",""values"": ""boolean""}}],""name"": ""SingleColumnPoco"",""namespace"": ""Microsoft.Analytics.Samples.Formats.Tests"",""type"": ""record""}";
var dict = new Dictionary<string, object> { { "item1", true } };
var data = new List<SingleColumnPoco<Dictionary<string, object>>>
{
new SingleColumnPoco<Dictionary<string, object>>() { Value = dict }
};

var result = ExecuteExtract<Dictionary<string, object>, SqlMap<string, bool?>>(data, schema);

Assert.IsNotNull(result[0].Get<SqlMap<string, bool?>>("Value"));
Assert.IsTrue(result[0].Get<SqlMap<string, bool?>>("Value").Values[0] == true);
}

[TestMethod]
public void AvroExtractor_DatatypeMap_MapOfLong_Extracted()
{
var schema = @"{""fields"":[{""name"": ""Value"",""type"": { ""type"": ""map"",""values"": ""long""}}],""name"": ""SingleColumnPoco"",""namespace"": ""Microsoft.Analytics.Samples.Formats.Tests"",""type"": ""record""}";
var dict = new Dictionary<string, object> { { "item1", 0x7FFFFFFFFFFFFFFF } };
var data = new List<SingleColumnPoco<Dictionary<string, object>>>
{
new SingleColumnPoco<Dictionary<string, object>>() { Value = dict }
};

var result = ExecuteExtract<Dictionary<string, object>, SqlMap<string, long?>>(data, schema);

Assert.IsNotNull(result[0].Get<SqlMap<string, long?>>("Value"));
Assert.IsTrue(result[0].Get<SqlMap<string, long?>>("Value").Values[0] == 0x7FFFFFFFFFFFFFFF);
}

[TestMethod]
public void AvroExtractor_DatatypeMap_MapOfDouble_Extracted()
{
var schema = @"{""fields"":[{""name"": ""Value"",""type"": { ""type"": ""map"",""values"": ""double""}}],""name"": ""SingleColumnPoco"",""namespace"": ""Microsoft.Analytics.Samples.Formats.Tests"",""type"": ""record""}";
var dict = new Dictionary<string, object> { { "item1", 3D } };
var data = new List<SingleColumnPoco<Dictionary<string, object>>>
{
new SingleColumnPoco<Dictionary<string, object>>() { Value = dict }
};

var result = ExecuteExtract<Dictionary<string, object>, SqlMap<string, double?>>(data, schema);

Assert.IsNotNull(result[0].Get<SqlMap<string, double?>>("Value"));
Assert.IsTrue(result[0].Get<SqlMap<string, double?>>("Value").Values[0] == 3D);
}

[TestMethod]
public void AvroExtractor_DatatypeMap_MapOfString_Extracted()
{
var schema = @"{""fields"":[{""name"": ""Value"",""type"": { ""type"": ""map"",""values"": ""string""}}],""name"": ""SingleColumnPoco"",""namespace"": ""Microsoft.Analytics.Samples.Formats.Tests"",""type"": ""record""}";
var dict = new Dictionary<string, object> { { "item1", "value1" } };
var data = new List<SingleColumnPoco<Dictionary<string, object>>>
{
new SingleColumnPoco<Dictionary<string, object>>() { Value = dict }
};

var result = ExecuteExtract<Dictionary<string, object>, SqlMap<string, string>>(data, schema);

Assert.IsNotNull(result[0].Get<SqlMap<string, string>>("Value"));
Assert.IsTrue(result[0].Get<SqlMap<string, string>>("Value").Values[0] == "value1");
}

[TestMethod]
public void AvroExtractor_DatatypeMap_MapOfFloat_Extracted()
{
var schema = @"{""fields"":[{""name"": ""Value"",""type"": { ""type"": ""map"",""values"": ""float""}}],""name"": ""SingleColumnPoco"",""namespace"": ""Microsoft.Analytics.Samples.Formats.Tests"",""type"": ""record""}";
var dict = new Dictionary<string, object> { { "item1", 3.5F } };
var data = new List<SingleColumnPoco<Dictionary<string, object>>>
{
new SingleColumnPoco<Dictionary<string, object>>() { Value = dict }
};

var result = ExecuteExtract<Dictionary<string, object>, SqlMap<string, float?>>(data, schema);

Assert.IsNotNull(result[0].Get<SqlMap<string, float?>>("Value"));
Assert.IsTrue(result[0].Get<SqlMap<string, float?>>("Value").Values[0] == 3.5F);
}

[TestMethod]
public void AvroExtractor_DatatypeMap_MapOfByte_Extracted()
{
var schema = @"{""fields"":[{""name"": ""Value"",""type"": { ""type"": ""map"",""values"": ""bytes""}}],""name"": ""SingleColumnPoco"",""namespace"": ""Microsoft.Analytics.Samples.Formats.Tests"",""type"": ""record""}";
byte[] bytes = { 2, 4, 6 };
var dict = new Dictionary<string, object> { { "item1", bytes } };
var data = new List<SingleColumnPoco<Dictionary<string, object>>>
{
new SingleColumnPoco<Dictionary<string, object>>() { Value = dict }
};

var result = ExecuteExtract<Dictionary<string, object>, SqlMap<string, byte[]>>(data, schema);

Assert.IsNotNull(result[0].Get<SqlMap<string, byte[]>>("Value"));
Assert.IsTrue(result[0].Get<SqlMap<string, byte[]>>("Value").Values[0].SequenceEqual(bytes));
}

[TestMethod]
public void AvroExtractor_DatatypeMap_MapOfInt_EmptyMap_Extracted()
{
var schema = @"{""fields"":[{""name"": ""Value"",""type"": { ""type"": ""map"",""values"": [""int"",""null""]}}],""name"": ""SingleColumnPoco"",""namespace"": ""Microsoft.Analytics.Samples.Formats.Tests"",""type"": ""record""}";
var dict = new Dictionary<string, object>();
var data = new List<SingleColumnPoco<Dictionary<string, object>>>
{
new SingleColumnPoco<Dictionary<string, object>>() { Value = dict }
};

var result = ExecuteExtract<Dictionary<string, object>, SqlMap<string, int?>>(data, schema);

Assert.IsNotNull(result[0].Get<SqlMap<string, int?>>("Value"));
Assert.IsTrue(result[0].Get<SqlMap<string, int?>>("Value").Values.Count == 0);
}

[TestMethod]
[ExpectedException(typeof(Exception), "Unsupported datatype for SQL.MAP.")]
public void AvroExtractor_DatatypeMap_MapOfUnspportedType_Exception()
{
var schema = @"{""fields"":[{""name"": ""Value"",""type"": { ""type"": ""map"",""values"": ""int""}}],""name"": ""SingleColumnPoco"",""namespace"": ""Microsoft.Analytics.Samples.Formats.Tests"",""type"": ""record""}";
var dict = new Dictionary<string, object> { { "item1", 1 } };
var data = new List<SingleColumnPoco<Dictionary<string, object>>>
{
new SingleColumnPoco<Dictionary<string, object>>() { Value = dict }
};

var result = ExecuteExtract<Dictionary<string, object>, SqlMap<string, int[]>>(data, schema);

Assert.IsNotNull(result[0].Get<SqlMap<string, int>>("Value"));
}

[TestMethod]
[ExpectedException(typeof(Exception), "")]
public void AvroExtractor_DatatypeMap_DatatypeMismatch_Exception()
{
var schema = @"{""fields"":[{""name"": ""Value"",""type"": { ""type"": ""map"",""values"": ""string""}}],""name"": ""SingleColumnPoco"",""namespace"": ""Microsoft.Analytics.Samples.Formats.Tests"",""type"": ""record""}";
var dict = new Dictionary<string, object> { { "item1", "asdf" } };
var data = new List<SingleColumnPoco<Dictionary<string, object>>>
{
new SingleColumnPoco<Dictionary<string, object>>() { Value = dict }
};

var result = ExecuteExtract<Dictionary<string, object>, SqlMap<string, int?>>(data, schema);
}

[TestMethod]
public void AvroExtractor_EmptyFile_ReturnNoRow()
{
Expand All @@ -318,7 +477,12 @@ public void AvroExtractor_EmptyFile_ReturnNoRow()

private IList<IRow> ExecuteExtract<T>(List<SingleColumnPoco<T>> data, string schema)
{
var output = SingleColumnRowGenerator<T>().AsUpdatable();
return ExecuteExtract<T,T>(data, schema);
}

private IList<IRow> ExecuteExtract<T,O>(List<SingleColumnPoco<T>> data, string schema)
{
var output = SingleColumnRowGenerator<O>().AsUpdatable();

using (var dataStream = new MemoryStream())
{
Expand Down
Loading