Skip to content

Commit f520b51

Browse files
committed
.
1 parent d54f72f commit f520b51

16 files changed

+174
-130
lines changed

code/KustoCopyConsole/CommandLineOptions.cs

+3
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,9 @@ public class CommandLineOptions
4040
HelpText = "Continuous run: if set, runs continuously, otherwise, stop after one iteration")]
4141
public bool IsContinuousRun { get; set; } = false;
4242

43+
[Option('q', "query", Required = false, HelpText = "Set query.")]
44+
public string Query { get; set; } = string.Empty;
45+
4346
[Option("job-name", Required = false, HelpText = "Set job name.")]
4447
public string JobName { get; set; } = "default";
4548

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
using System.Collections.Immutable;
2+
3+
namespace KustoCopyConsole.JobParameter
4+
{
5+
public class ActivityParameterization
6+
{
7+
public TableParameterization Source { get; set; } = new();
8+
9+
public IImmutableList<TableParameterization> Destinations { get; set; } =
10+
ImmutableArray<TableParameterization>.Empty;
11+
12+
public string Query { get; set; } = string.Empty;
13+
14+
public TableOption TableOption { get; set; } = new();
15+
16+
internal void Validate()
17+
{
18+
Source.Validate();
19+
if (string.IsNullOrWhiteSpace(Source.DatabaseName))
20+
{
21+
throw new CopyException("Source database name is required", false);
22+
}
23+
if (string.IsNullOrWhiteSpace(Source.TableName))
24+
{
25+
throw new CopyException("Source table name is required", false);
26+
}
27+
foreach (var d in Destinations)
28+
{
29+
d.Validate();
30+
}
31+
TableOption.Validate();
32+
}
33+
}
34+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+

2+
namespace KustoCopyConsole.JobParameter
3+
{
4+
public class ClusterOption
5+
{
6+
public string ClusterUri { get; set; } = string.Empty;
7+
8+
public int ConcurrentQueryCount { get; set; } = 0;
9+
10+
public int ConcurrentExportCommandCount { get; set; } = 0;
11+
12+
public void Validate()
13+
{
14+
if (ConcurrentQueryCount < 0)
15+
{
16+
throw new CopyException(
17+
$"{nameof(ConcurrentQueryCount)} should be above zero "
18+
+ $"but is {ConcurrentQueryCount}",
19+
false);
20+
}
21+
if (ConcurrentExportCommandCount < 0)
22+
{
23+
throw new CopyException(
24+
$"{nameof(ConcurrentExportCommandCount)} should be above zero "
25+
+ $"but is {ConcurrentExportCommandCount}",
26+
false);
27+
}
28+
}
29+
}
30+
}

code/KustoCopyConsole/JobParameter/DestinationParameterization.cs

-9
This file was deleted.

code/KustoCopyConsole/JobParameter/MainJobParameterization.cs

+9-15
Original file line numberDiff line numberDiff line change
@@ -10,27 +10,21 @@ namespace KustoCopyConsole.JobParameter
1010
{
1111
internal class MainJobParameterization
1212
{
13-
public IImmutableList<SourceClusterParameterization> SourceClusters { get; set; } =
14-
ImmutableArray<SourceClusterParameterization>.Empty;
13+
public IImmutableList<ActivityParameterization> Activities { get; set; } =
14+
ImmutableArray<ActivityParameterization>.Empty;
15+
16+
public IImmutableList<ClusterOption> ClusterOptions { get; set; } =
17+
ImmutableArray<ClusterOption>.Empty;
1518

1619
internal void Validate()
1720
{
18-
var sourceUriDuplicate = SourceClusters
19-
.Select(s => NormalizedUri.NormalizeUri(s.SourceClusterUri))
20-
.GroupBy(s => s)
21-
.Where(g => g.Count() > 1)
22-
.Select(g => g.Key)
23-
.FirstOrDefault();
24-
25-
if (sourceUriDuplicate!=null)
21+
foreach (var a in Activities)
2622
{
27-
throw new CopyException(
28-
$"Cluster URI '{sourceUriDuplicate}' appears twice in the parameterization",
29-
false);
23+
a.Validate();
3024
}
31-
foreach (var s in SourceClusters)
25+
foreach (var c in ClusterOptions)
3226
{
33-
s.Validate();
27+
c.Validate();
3428
}
3529
}
3630

code/KustoCopyConsole/JobParameter/SourceClusterParameterization.cs

-27
This file was deleted.

code/KustoCopyConsole/JobParameter/SourceDatabaseParameterization.cs

-15
This file was deleted.

code/KustoCopyConsole/JobParameter/SourceTableParameterization.cs

-11
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
namespace KustoCopyConsole.JobParameter
2+
{
3+
public class TableOption
4+
{
5+
public bool IsContinuousRun { get; set; } = false;
6+
7+
public TimeSpan ExtentTimeRange { get; set; } = TimeSpan.FromDays(1);
8+
9+
public ExportMode ExportMode { get; set; } = ExportMode.BackFillAndNew;
10+
11+
public TimeSpan IterationWait { get; set; } = TimeSpan.FromMinutes(5);
12+
13+
internal void Validate()
14+
{
15+
}
16+
}
17+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+

2+
namespace KustoCopyConsole.JobParameter
3+
{
4+
public class TableParameterization
5+
{
6+
public string ClusterUri { get; set; } = string.Empty;
7+
8+
public string DatabaseName { get; set; } = string.Empty;
9+
10+
public string TableName { get; set; } = string.Empty;
11+
12+
internal void Validate()
13+
{
14+
throw new NotImplementedException();
15+
}
16+
}
17+
}

code/KustoCopyConsole/Kusto/DbClientFactory.cs

+15-6
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,21 @@ public static async Task<DbClientFactory> CreateAsync(
2828
CancellationToken ct)
2929
{
3030
var providerFactory = new ProviderFactory(parameterization, credentials);
31-
var sourceClusters = parameterization.SourceClusters
32-
.Select(s => new
31+
var clusterOptionMap = parameterization.ClusterOptions
32+
.ToImmutableDictionary(o => NormalizedUri.NormalizeUri(o.ClusterUri));
33+
var sourceClusters = parameterization.Activities
34+
.Select(a => NormalizedUri.NormalizeUri(a.Source.ClusterUri))
35+
.Distinct()
36+
.Select(uri => new
3337
{
34-
Uri = NormalizedUri.NormalizeUri(s.SourceClusterUri),
35-
s.ConcurrentExportCommandCount,
36-
s.ConcurrentQueryCount
38+
Uri = uri,
39+
Option = clusterOptionMap.ContainsKey(uri) ? clusterOptionMap[uri] : null
40+
})
41+
.Select(o => new
42+
{
43+
o.Uri,
44+
ConcurrentExportCommandCount = o.Option?.ConcurrentExportCommandCount ?? 0,
45+
ConcurrentQueryCount = o.Option?.ConcurrentQueryCount ?? 0
3746
});
3847
var countTasks = sourceClusters
3948
.Select(s => new
@@ -109,7 +118,7 @@ .show capacity
109118

110119
void IDisposable.Dispose()
111120
{
112-
((IDisposable) _providerFactory).Dispose();
121+
((IDisposable)_providerFactory).Dispose();
113122
}
114123

115124
public DbQueryClient GetDbQueryClient(Uri sourceUri, string database)

code/KustoCopyConsole/Kusto/ProviderFactory.cs

+5-7
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@ internal class ProviderFactory : IDisposable
1919
#region Constructor
2020
public ProviderFactory(MainJobParameterization parameterization, TokenCredential credentials)
2121
{
22-
var sourceClusterUris = parameterization.SourceClusters
23-
.Select(s => NormalizedUri.NormalizeUri(s.SourceClusterUri))
22+
var sourceClusterUris = parameterization.Activities
23+
.Select(a => NormalizedUri.NormalizeUri(a.Source.ClusterUri))
2424
.Distinct();
2525
var sourceBuilders = sourceClusterUris
2626
.Select(uri => new
@@ -29,11 +29,9 @@ public ProviderFactory(MainJobParameterization parameterization, TokenCredential
2929
Builder = new KustoConnectionStringBuilder(uri.ToString())
3030
.WithAadAzureTokenCredentialsAuthentication(credentials)
3131
});
32-
var destinationClusterUris = parameterization.SourceClusters
33-
.Select(s => s.Databases.Select(db => db.Destinations.Select(d => d.DestinationClusterUri)))
34-
.SelectMany(e => e)
35-
.SelectMany(e => e)
36-
.Select(s => NormalizedUri.NormalizeUri(s))
32+
var destinationClusterUris = parameterization.Activities
33+
.SelectMany(a => a.Destinations)
34+
.Select(d => NormalizedUri.NormalizeUri(d.ClusterUri))
3735
.Distinct();
3836
var destinationIngestionBuilders = destinationClusterUris
3937
.Select(uri => new

code/KustoCopyConsole/Orchestration/MainOrchestration.cs

+16-15
Original file line numberDiff line numberDiff line change
@@ -107,24 +107,25 @@ private static MainJobParameterization CreateParameterization(CommandLineOptions
107107

108108
return new MainJobParameterization
109109
{
110-
SourceClusters = ImmutableList.Create(
111-
new SourceClusterParameterization
110+
Activities = ImmutableList.Create(
111+
new ActivityParameterization
112112
{
113-
IsContinuousRun = options.IsContinuousRun,
114-
SourceClusterUri = sourceBuilder.ToString(),
115-
Databases = ImmutableList.Create(new SourceDatabaseParameterization
113+
Source = new TableParameterization
116114
{
115+
ClusterUri = sourceBuilder.ToString(),
117116
DatabaseName = sourceDb,
118-
Tables = ImmutableList.Create(new SourceTableParameterization
119-
{
120-
TableName = sourceTable
121-
}),
122-
Destinations = ImmutableList.Create(new DestinationParameterization
123-
{
124-
DestinationClusterUri = destinationBuilder.ToString(),
125-
DatabaseName = destinationDb
126-
})
127-
})
117+
TableName = sourceTable
118+
},
119+
Destinations = ImmutableList.Create(new TableParameterization
120+
{
121+
ClusterUri = destinationBuilder.ToString(),
122+
DatabaseName = destinationDb
123+
}),
124+
Query = options.Query,
125+
TableOption = new TableOption
126+
{
127+
IsContinuousRun = options.IsContinuousRun
128+
}
128129
})
129130
};
130131
}

code/KustoCopyConsole/Orchestration/SourceDatabaseOrchestration.cs

+10-10
Original file line numberDiff line numberDiff line change
@@ -35,16 +35,16 @@ public override async Task ProcessAsync(CancellationToken ct)
3535
.Select(c => c.RowItem)
3636
.Where(i => i.ParseState<SourceDatabaseState>() != SourceDatabaseState.Completed)
3737
.ToImmutableArray();
38-
var allSourceDatabases = Parameterization.SourceClusters
39-
.Select(c => c.Databases.Select(db => new
40-
{
41-
ClusterUri = NormalizedUri.NormalizeUri(c.SourceClusterUri),
42-
db.DatabaseName,
43-
ClusterParameters = c,
44-
DatabaseParameters = db
45-
}))
46-
.SelectMany(a => a)
47-
.ToImmutableDictionary(o => (o.ClusterUri, o.DatabaseName));
38+
//var allSourceDatabases = Parameterization.SourceClusters
39+
// .Select(c => c.Databases.Select(db => new
40+
// {
41+
// ClusterUri = NormalizedUri.NormalizeUri(c.SourceClusterUri),
42+
// db.DatabaseName,
43+
// ClusterParameters = c,
44+
// DatabaseParameters = db
45+
// }))
46+
// .SelectMany(a => a)
47+
// .ToImmutableDictionary(o => (o.ClusterUri, o.DatabaseName));
4848

4949
await Task.CompletedTask;
5050
//if (_sourceCluster.ExportMode == ExportMode.BackFillOnly && completedItems.Any())

code/KustoCopyConsole/Storage/RowItemGateway.cs

-1
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
using System.Linq;
1212
using System.Text;
1313
using System.Threading.Tasks;
14-
using static Kusto.Cloud.Platform.Utils.CachedBufferEncoder;
1514

1615
namespace KustoCopyConsole.Storage
1716
{

design/kusto-copy-v2.yaml

+18-14
Original file line numberDiff line numberDiff line change
@@ -2,19 +2,23 @@
22
# * Data lake accounts
33
# * Authentication
44

5-
sourceClusters:
6-
- sourceClusterUri: https://inttestsyenycav4i2vma.eastus.kusto.windows.net
7-
continuousRun: false # If true, it goes on, if false, it stops after one iteration completes
8-
exportMode: "backfillOnly" # One of the following values: backfillOnly, newOnly, backFillAndNew
9-
iterationWait: 00:05:00 # Time to wait between two iterations
5+
clusterOptions:
6+
- clusterUri: https://inttestsyenycav4i2vma.eastus.kusto.windows.net
107
concurrentQueryCount: 5 # Number of queries / commands sent in parallel to a given cluster (Default is %10 of query slots)
118
concurrentExportCommandCount: 20 # Number of export commands sent in parallel to a given cluster (Default will max export slots)
12-
databases:
13-
- databaseName: github
14-
tables:
15-
- tableName: Table1
16-
extentTimeRange: 24:00:00 # The max range of extent creation time to be merge together
17-
query: "| project ColA, ColB"
18-
destinations:
19-
- destinationClusterUri: https://inttestsyenycav4i2vma.eastus.kusto.windows.net
20-
databaseName: github
9+
activities:
10+
- source:
11+
clusterUri: https://inttestsyenycav4i2vma.eastus.kusto.windows.net
12+
databaseName: github
13+
tableName: Table1
14+
destinations:
15+
- clusterUri: https://inttestsyenycav4i2vma.eastus.kusto.windows.net
16+
databaseName: github # Optional, if not provided take same value as source
17+
tableName: Table1 # Optional, if not provided take same value as source
18+
query: "| project ColA, ColB"
19+
tableOption:
20+
isContinuousRun: false # If true, it goes on, if false, it stops after one iteration completes
21+
extentTimeRange: 24:00:00 # The max range of extent creation time to be merge together
22+
exportMode: "backfillOnly" # One of the following values: backfillOnly, newOnly, backFillAndNew
23+
iterationWait: 00:05:00 # Time to wait between two iterations
24+

0 commit comments

Comments
 (0)