From 37003394c3d1d6437024f0e20df93e84354a70e2 Mon Sep 17 00:00:00 2001 From: vplauzon Date: Sun, 22 Dec 2024 17:33:19 -0500 Subject: [PATCH] Start runner approach --- code/KustoCopyConsole/KustoCopyConsole.csproj | 20 +- code/KustoCopyConsole/Program.cs | 10 +- code/KustoCopyConsole/Runner/MainRunner.cs | 174 ++++++++++++++++++ 3 files changed, 199 insertions(+), 5 deletions(-) create mode 100755 code/KustoCopyConsole/Runner/MainRunner.cs diff --git a/code/KustoCopyConsole/KustoCopyConsole.csproj b/code/KustoCopyConsole/KustoCopyConsole.csproj index 58c101e..0042b12 100755 --- a/code/KustoCopyConsole/KustoCopyConsole.csproj +++ b/code/KustoCopyConsole/KustoCopyConsole.csproj @@ -1,7 +1,7 @@  - 0.0.2.0 + 0.0.3.0 Exe net8.0 enable @@ -20,6 +20,15 @@ + + + + + + + + + @@ -38,4 +47,13 @@ + + + + + + + + + \ No newline at end of file diff --git a/code/KustoCopyConsole/Program.cs b/code/KustoCopyConsole/Program.cs index 43470f2..e8402e5 100755 --- a/code/KustoCopyConsole/Program.cs +++ b/code/KustoCopyConsole/Program.cs @@ -1,7 +1,9 @@ using CommandLine; using CommandLine.Text; using KustoCopyConsole.JobParameter; -using KustoCopyConsole.Orchestration; +using KustoCopyConsole.Kusto; +using KustoCopyConsole.Runner; +using KustoCopyConsole.Storage; using System; using System.Collections.Generic; using System.Collections.Immutable; @@ -155,18 +157,18 @@ private static async Task RunOptionsAsync(CommandLineOptions options) }; try { - await using (var orchestration = await MainOrchestration.CreateAsync( + await using (var mainRunner = await MainRunner.CreateAsync( options, cancellationTokenSource.Token)) { Trace.WriteLine(""); Trace.WriteLine("Parameterization:"); Trace.WriteLine(""); - Trace.WriteLine(orchestration.Parameterization.ToYaml()); + Trace.WriteLine(mainRunner.Parameterization.ToYaml()); Trace.WriteLine(""); Trace.WriteLine("Processing..."); Trace.WriteLine(""); - await orchestration.ProcessAsync(cancellationTokenSource.Token); + await mainRunner.RunAsync(cancellationTokenSource.Token); } } finally diff --git a/code/KustoCopyConsole/Runner/MainRunner.cs b/code/KustoCopyConsole/Runner/MainRunner.cs new file mode 100755 index 0000000..769ab6d --- /dev/null +++ b/code/KustoCopyConsole/Runner/MainRunner.cs @@ -0,0 +1,174 @@ +using Azure.Core; +using Azure.Identity; +using KustoCopyConsole.JobParameter; +using KustoCopyConsole.Kusto; +using KustoCopyConsole.Storage.LocalDisk; +using KustoCopyConsole.Storage; +using System; +using System.Collections.Generic; +using System.Collections.Immutable; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace KustoCopyConsole.Runner +{ + internal class MainRunner : IAsyncDisposable + { + private readonly RowItemGateway _rowItemGateway; + private readonly DbClientFactory _dbClientFactory; + + #region Constructors + internal static async Task CreateAsync( + CommandLineOptions options, + CancellationToken ct) + { + var parameterization = CreateParameterization(options); + + var appendStorage = CreateAppendStorage(options); + var rowItemGateway = await RowItemGateway.CreateAsync(appendStorage, ct); + var dbClientFactory = await DbClientFactory.CreateAsync( + parameterization, + CreateCredentials(options.Authentication), + ct); + + return new MainRunner(parameterization, dbClientFactory, rowItemGateway); + } + + private MainRunner( + MainJobParameterization parameterization, + DbClientFactory dbClientFactory, + RowItemGateway rowItemGateway) + { + Parameterization = parameterization; + _dbClientFactory = dbClientFactory; + _rowItemGateway = rowItemGateway; + } + + private static TokenCredential CreateCredentials(string authentication) + { + if (string.IsNullOrWhiteSpace(authentication)) + { + //return new DefaultAzureCredential(); + return new AzureCliCredential(); + } + else + { + throw new NotImplementedException(); + } + } + + private static IAppendStorage CreateAppendStorage(CommandLineOptions options) + { + return new LocalAppendStorage(GetLocalLogFilePath(options)); + } + + private static string GetLocalLogFilePath(CommandLineOptions options) + { + const string DEFAULT_FILE_NAME = "kusto-copy.log"; + + if (string.IsNullOrWhiteSpace(options.LogFilePath)) + { + return DEFAULT_FILE_NAME; + } + else if (Directory.Exists(options.LogFilePath)) + { + return Path.Combine(options.LogFilePath, DEFAULT_FILE_NAME); + } + else + { + return options.LogFilePath; + } + } + + private static MainJobParameterization CreateParameterization(CommandLineOptions options) + { + if (!string.IsNullOrWhiteSpace(options.Source)) + { + if (string.IsNullOrWhiteSpace(options.Destination)) + { + throw new CopyException( + $"Source is specified ('options.Source'): destination is expected", + false); + } + + if (!Uri.TryCreate(options.Source, UriKind.Absolute, out var source)) + { + throw new CopyException($"Can't parse source: '{options.Source}'", false); + } + if (!Uri.TryCreate(options.Destination, UriKind.Absolute, out var destination)) + { + throw new CopyException( + $"Can't parse destination: '{options.Destination}'", + false); + } + var sourceBuilder = new UriBuilder(source); + var sourcePathParts = sourceBuilder.Path.Split('/'); + var destinationBuilder = new UriBuilder(destination); + var destinationPathParts = destinationBuilder.Path.Split('/'); + + if (sourcePathParts.Length != 3) + { + throw new CopyException( + $"Source ('{options.Source}') should be of the form 'https://help.kusto.windows.net/Samples/nyc_taxi'", + false); + } + if (destinationPathParts.Length != 2) + { + throw new CopyException( + $"Destination ('{options.Destination}') should be of the form 'https://mycluster.eastus.kusto.windows.net/mydb'", + false); + } + + var sourceDb = sourcePathParts[1]; + var sourceTable = sourcePathParts[2]; + var destinationDb = sourcePathParts[1]; + + sourceBuilder.Path = string.Empty; + destinationBuilder.Path = string.Empty; + + return new MainJobParameterization + { + IsContinuousRun = options.IsContinuousRun, + Activities = ImmutableList.Create( + new ActivityParameterization + { + Source = new TableParameterization + { + ClusterUri = sourceBuilder.ToString(), + DatabaseName = sourceDb, + TableName = sourceTable + }, + Destinations = ImmutableList.Create(new TableParameterization + { + ClusterUri = destinationBuilder.ToString(), + DatabaseName = destinationDb + }), + Query = options.Query, + TableOption = new TableOption() + }) + }; + } + else + { + throw new NotImplementedException(); + } + } + #endregion + + async ValueTask IAsyncDisposable.DisposeAsync() + { + await ((IAsyncDisposable)_rowItemGateway).DisposeAsync(); + ((IDisposable)_dbClientFactory).Dispose(); + } + + public MainJobParameterization Parameterization { get; } + + public async Task RunAsync(CancellationToken token) + { + await Task.CompletedTask; + + return; + } + } +} \ No newline at end of file