diff --git a/Examples/DataFormats/.vs/Microsoft.Analytics.Samples/v15/sqlite3/storage.ide b/Examples/DataFormats/.vs/Microsoft.Analytics.Samples/v15/sqlite3/storage.ide new file mode 100644 index 0000000..d11cf2c Binary files /dev/null and b/Examples/DataFormats/.vs/Microsoft.Analytics.Samples/v15/sqlite3/storage.ide differ diff --git a/Examples/DataFormats/Microsoft.Analytics.Samples.Formats.Tests/EventDataExtractorTest.cs b/Examples/DataFormats/Microsoft.Analytics.Samples.Formats.Tests/EventDataExtractorTest.cs new file mode 100644 index 0000000..c62a59c --- /dev/null +++ b/Examples/DataFormats/Microsoft.Analytics.Samples.Formats.Tests/EventDataExtractorTest.cs @@ -0,0 +1,129 @@ +using Avro; +using Avro.File; +using Avro.Generic; +using Avro.IO; +using Microsoft.Analytics.Interfaces; +using Microsoft.Analytics.Samples.Formats.ApacheAvro; +using Microsoft.Analytics.UnitTest; +using Microsoft.VisualStudio.TestTools.UnitTesting; +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace Microsoft.Analytics.Samples.Formats.Tests +{ + [TestClass] + + public class EventDataExtractorTest + { + + private const string avroSchema = @" +{ + ""type"":""record"", + ""name"":""EventData"", + ""namespace"":""Microsoft.ServiceBus.Messaging"", + ""fields"":[ + {""name"":""SequenceNumber"",""type"":""long""}, + {""name"":""Offset"",""type"":""string""}, + {""name"":""EnqueuedTimeUtc"",""type"":""string""}, + {""name"":""SystemProperties"",""type"":{""type"":""map"",""values"":[""long"",""double"",""string"",""bytes""]}}, + {""name"":""Properties"",""type"":{""type"":""map"",""values"":[""long"",""double"",""string"",""bytes""]}}, + {""name"":""Body"",""type"":[""null"",""bytes""]} + ] + }"; + + private const string extractMap = @" +{ + ""SequenceNumber"": {""AvroField"": ""SequenceNumber""}, + ""EnqueuedTimeUtc"": {""AvroField"": ""EnqueuedTimeUtc""}, + ""Body"": {""AvroField"": ""Body""}, + ""Route"": {""AvroField"": ""Properties"",""Key"": ""internal_source""} +}"; + + + + + + [TestMethod] + public void SchemaTest() + { + // + var output = SingleColumnRowGenerator("Route").AsUpdatable(); + + var data = new List + { + new EventDataPoco() + { + Body = Encoding.UTF8.GetBytes("This is a test"), + EnqueuedTimeUtc = DateTime.UtcNow.ToString("s"), + Offset = Guid.NewGuid().ToString(), + Properties = new Dictionary(){ + { "internal_source", "mapping"} + }, + SequenceNumber = 1000, + SystemProperties = new Dictionary() + } + }; + + var result = ExecuteExtract(data, output); + + Assert.IsTrue(result[0].Get("Route") == "mapping"); + + + } + + + private IList ExecuteExtract(List data, IUpdatableRow output) + { + + using (var dataStream = new MemoryStream()) + { + SerializeAvro(dataStream, data, avroSchema); + + var reader = new USqlStreamReader(dataStream); + var extractor = new EventDataExtractor(avroSchema, extractMap); + return extractor.Extract(reader, output).ToList(); + + } + } + + + private void SerializeAvro(MemoryStream dataStream, List data, string schema) + { + var avroSchema = Schema.Parse(schema); + var writer = new GenericWriter(avroSchema); + var fileWriter = DataFileWriter.OpenWriter(writer, dataStream); + var encoder = new BinaryEncoder(dataStream); + + foreach (EventDataPoco record in data) + { + var genericRecord = new GenericRecord(avroSchema as RecordSchema); + + genericRecord.Add("Body", record.Body); + genericRecord.Add("Offset", record.Offset); + genericRecord.Add("Properties", record.Properties); + genericRecord.Add("SystemProperties", record.SystemProperties); + genericRecord.Add("EnqueuedTimeUtc", record.EnqueuedTimeUtc); + genericRecord.Add("SequenceNumber", record.SequenceNumber); + + fileWriter.Append(genericRecord); + } + + fileWriter.Flush(); + dataStream.Seek(0, SeekOrigin.Begin); + } + + + + public IRow SingleColumnRowGenerator(string name) + { + var foo = new USqlColumn(name); + var columns = new List { foo }; + var schema = new USqlSchema(columns); + return new USqlRow(schema, null); + } + } +} diff --git a/Examples/DataFormats/Microsoft.Analytics.Samples.Formats.Tests/Microsoft.Analytics.Samples.Formats.Tests.csproj b/Examples/DataFormats/Microsoft.Analytics.Samples.Formats.Tests/Microsoft.Analytics.Samples.Formats.Tests.csproj index fa6a760..4c929f8 100644 --- a/Examples/DataFormats/Microsoft.Analytics.Samples.Formats.Tests/Microsoft.Analytics.Samples.Formats.Tests.csproj +++ b/Examples/DataFormats/Microsoft.Analytics.Samples.Formats.Tests/Microsoft.Analytics.Samples.Formats.Tests.csproj @@ -67,6 +67,7 @@ + diff --git a/Examples/DataFormats/Microsoft.Analytics.Samples.Formats/Avro/EventDataExtractor.cs b/Examples/DataFormats/Microsoft.Analytics.Samples.Formats/Avro/EventDataExtractor.cs new file mode 100644 index 0000000..8504118 --- /dev/null +++ b/Examples/DataFormats/Microsoft.Analytics.Samples.Formats/Avro/EventDataExtractor.cs @@ -0,0 +1,101 @@ +// +// Copyright (c) Microsoft and contributors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// +// See the License for the specific language governing permissions and +// limitations under the License. +// +using System.Collections.Generic; +using Microsoft.Analytics.Interfaces; +using Avro.File; +using Avro.Generic; +using System.IO; +using System.Runtime.Serialization; +using System; +using System.Text; +using Newtonsoft.Json; +using Microsoft.Analytics.Types.Sql; + +namespace Microsoft.Analytics.Samples.Formats.ApacheAvro +{ + + + public class OutputToAvroMap + { + public string AvroField { get; set; } + public string Key { get; set; } + } + + + + [SqlUserDefinedExtractor(AtomicFileProcessing = true)] + public class EventDataExtractor : IExtractor + { + + private readonly string avroSchema; + private readonly Dictionary _extractMap; + + public EventDataExtractor(string avroSchema, string extractMap) + { + this.avroSchema = avroSchema; + _extractMap = JsonConvert.DeserializeObject>(extractMap); + + } + + public override IEnumerable Extract(IUnstructuredReader input, IUpdatableRow output) + { + var avschema = Avro.Schema.Parse(avroSchema); + var reader = new GenericDatumReader(avschema, avschema); + + using (var ms = new MemoryStream()) + { + CreateSeekableStream(input, ms); + ms.Position = 0; + + + var fileReader = DataFileReader.OpenReader(ms, avschema); + + while (fileReader.HasNext()) + { + var avroRecord = fileReader.Next(); + + var userprops = (Dictionary)avroRecord["Properties"]; + + foreach (var column in output.Schema) + { + var map = _extractMap[column.Name]; + if (map.AvroField == "Properties") + { + output.Set(column.Name, userprops[map.Key]); + } + else if (avroRecord[column.Name] != null) + { + output.Set(column.Name, avroRecord[column.Name]); + } + else + { + output.Set(column.Name, null); + } + } + + yield return output.AsReadOnly(); + } + } + } + + + + private void CreateSeekableStream(IUnstructuredReader input, MemoryStream output) + { + input.BaseStream.CopyTo(output); + } + } +} \ No newline at end of file diff --git a/Examples/DataFormats/Microsoft.Analytics.Samples.Formats/Avro/EventDataPoco.cs b/Examples/DataFormats/Microsoft.Analytics.Samples.Formats/Avro/EventDataPoco.cs new file mode 100644 index 0000000..1c0d9a1 --- /dev/null +++ b/Examples/DataFormats/Microsoft.Analytics.Samples.Formats/Avro/EventDataPoco.cs @@ -0,0 +1,48 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Runtime.Serialization; +using System.Text; +using System.Threading.Tasks; + + +namespace Microsoft.Analytics.Samples.Formats.ApacheAvro +{ + + + [DataContract(Namespace = "Microsoft.ServiceBus.Messaging")] + public class EventDataPoco + { + [DataMember(Name = "SequenceNumber")] + public long SequenceNumber { get; set; } + + [DataMember(Name = "Offset")] + public string Offset { get; set; } + + [DataMember(Name = "EnqueuedTimeUtc")] + public string EnqueuedTimeUtc { get; set; } + + [DataMember(Name = "Body")] + public byte[] Body { get; set; } + + [DataMember(Name = "SystemProperties")] + public IDictionary SystemProperties { get; set; } + + [DataMember(Name = "Properties")] + public IDictionary Properties { get; set; } + + + public EventDataPoco(dynamic record) + { + SequenceNumber = (long)record.SequenceNumber; + Offset = (string)record.Offset; + EnqueuedTimeUtc = (string)record.EnqueuedTimeUtc; + SystemProperties = (Dictionary)record.SystemProperties; + Properties = (Dictionary)record.Properties; + Body = (byte[])record.Body; + } + + public EventDataPoco() { } + + } +} diff --git a/Examples/DataFormats/Microsoft.Analytics.Samples.Formats/Microsoft.Analytics.Samples.Formats.csproj b/Examples/DataFormats/Microsoft.Analytics.Samples.Formats/Microsoft.Analytics.Samples.Formats.csproj index a0f1d46..e200b76 100644 --- a/Examples/DataFormats/Microsoft.Analytics.Samples.Formats/Microsoft.Analytics.Samples.Formats.csproj +++ b/Examples/DataFormats/Microsoft.Analytics.Samples.Formats/Microsoft.Analytics.Samples.Formats.csproj @@ -48,11 +48,14 @@ + + + diff --git a/Examples/DataFormats/Microsoft.Analytics.Samples.Formats/Properties/AssemblyInfo.cs b/Examples/DataFormats/Microsoft.Analytics.Samples.Formats/Properties/AssemblyInfo.cs index 20efe82..7e57139 100644 --- a/Examples/DataFormats/Microsoft.Analytics.Samples.Formats/Properties/AssemblyInfo.cs +++ b/Examples/DataFormats/Microsoft.Analytics.Samples.Formats/Properties/AssemblyInfo.cs @@ -21,7 +21,7 @@ // associated with an assembly. [assembly: AssemblyDescription("")] [assembly: AssemblyConfiguration("")] -[assembly: AssemblyCompany("Microsoft")] +[assembly: AssemblyCompany("Microsoft, portions by Mesh Systems")] [assembly: AssemblyProduct("Microsoft.Analytics.Samples")] [assembly: AssemblyCopyright("Copyright © 2015")] [assembly: AssemblyTrademark("")] @@ -30,3 +30,5 @@ // The following GUID is for the ID of the typelib if this project is exposed to COM [assembly: Guid("48B4A89E-7DF7-4E25-B617-CCBEB63F6416")] +[assembly: AssemblyVersion("1.0.*")] +