Skip to content

Commit 5711ef1

Browse files
committed
.
1 parent e55a85f commit 5711ef1

File tree

7 files changed

+119
-34
lines changed

7 files changed

+119
-34
lines changed

code/KustoCopyConsole/Entity/RowItem.cs

+5-5
Original file line numberDiff line numberDiff line change
@@ -66,17 +66,17 @@ private class RowTypeConverter : DefaultTypeConverter
6666
public string FileVersion { get; set; } = string.Empty;
6767

6868
[Index(1)]
69-
public DateTime? Created { get; set; } = DateTime.Now;
69+
[TypeConverter(typeof(RowTypeConverter))]
70+
public RowType RowType { get; set; } = RowType.Unspecified;
7071

7172
[Index(2)]
72-
public DateTime? Updated { get; set; } = DateTime.Now;
73+
public string State { get; set; } = string.Empty;
7374

7475
[Index(3)]
75-
[TypeConverter(typeof(RowTypeConverter))]
76-
public RowType RowType { get; set; } = RowType.Unspecified;
76+
public DateTime? Created { get; set; } = DateTime.Now;
7777

7878
[Index(4)]
79-
public string State { get; set; } = string.Empty;
79+
public DateTime? Updated { get; set; } = DateTime.Now;
8080

8181
[Index(5)]
8282
public string SourceClusterUri { get; set; } = string.Empty;

code/KustoCopyConsole/Kusto/DbClientFactory.cs

+6-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515

1616
namespace KustoCopyConsole.Kusto
1717
{
18-
internal class DbClientFactory
18+
internal class DbClientFactory : IDisposable
1919
{
2020
private readonly ProviderFactory _providerFactory;
2121
private readonly IImmutableDictionary<Uri, PriorityExecutionQueue<KustoDbPriority>> _sourceClusterExportQueueMap;
@@ -107,6 +107,11 @@ .show capacity
107107
}
108108
#endregion
109109

110+
void IDisposable.Dispose()
111+
{
112+
((IDisposable) _providerFactory).Dispose();
113+
}
114+
110115
public DbQueryClient GetDbQueryClient(Uri sourceUri, string database)
111116
{
112117
try

code/KustoCopyConsole/Kusto/ProviderFactory.cs

+13-1
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010

1111
namespace KustoCopyConsole.Kusto
1212
{
13-
internal class ProviderFactory
13+
internal class ProviderFactory : IDisposable
1414
{
1515
private readonly ImmutableDictionary<Uri, ICslAdminProvider> _commandProviderMap;
1616
private readonly ImmutableDictionary<Uri, ICslQueryProvider> _queryProviderMap;
@@ -68,6 +68,18 @@ public ProviderFactory(MainJobParameterization parameterization, TokenCredential
6868
}
6969
#endregion
7070

71+
void IDisposable.Dispose()
72+
{
73+
var disposables = _commandProviderMap.Values.Cast<IDisposable>()
74+
.Concat(_queryProviderMap.Values)
75+
.Concat(_dmCommandProviderMap.Values);
76+
77+
foreach (var disposable in disposables)
78+
{
79+
disposable.Dispose();
80+
}
81+
}
82+
7183
public ICslAdminProvider GetCommandProvider(Uri clusterUri)
7284
{
7385
return _commandProviderMap[clusterUri];

code/KustoCopyConsole/KustoCopyConsole.csproj

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
<Project Sdk="Microsoft.NET.Sdk">
22

33
<PropertyGroup>
4-
<Version>0.0.2</Version>
4+
<Version>0.0.2.0</Version>
55
<OutputType>Exe</OutputType>
66
<TargetFramework>net8.0</TargetFramework>
77
<ImplicitUsings>enable</ImplicitUsings>

code/KustoCopyConsole/Orchestration/MainOrchestration.cs

+2
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ internal static async Task<MainOrchestration> CreateAsync(
2121
CancellationToken ct)
2222
{
2323
var parameterization = CreateParameterization(options);
24+
2425
var appendStorage = CreateAppendStorage();
2526
var rowItemGateway = new RowItemGateway(appendStorage, CompactItems);
2627
var dbClientFactory = await DbClientFactory.CreateAsync(
@@ -137,6 +138,7 @@ private static MainJobParameterization CreateParameterization(CommandLineOptions
137138
async ValueTask IAsyncDisposable.DisposeAsync()
138139
{
139140
await ((IAsyncDisposable)_rowItemGateway).DisposeAsync();
141+
((IDisposable)_dbClientFactory).Dispose();
140142
}
141143

142144
public MainJobParameterization Parameterization { get; }

code/KustoCopyConsole/Program.cs

+12-11
Original file line numberDiff line numberDiff line change
@@ -157,18 +157,19 @@ private static async Task RunOptionsAsync(CommandLineOptions options)
157157
};
158158
try
159159
{
160-
var orchestration = await MainOrchestration.CreateAsync(
160+
await using (var orchestration = await MainOrchestration.CreateAsync(
161161
options,
162-
cancellationTokenSource.Token);
163-
164-
Trace.WriteLine("");
165-
Trace.WriteLine("Parameterization:");
166-
Trace.WriteLine("");
167-
Trace.WriteLine(orchestration.Parameterization.ToYaml());
168-
Trace.WriteLine("");
169-
Trace.WriteLine("Processing...");
170-
Trace.WriteLine("");
171-
await orchestration.ProcessAsync(cancellationTokenSource.Token);
162+
cancellationTokenSource.Token))
163+
{
164+
Trace.WriteLine("");
165+
Trace.WriteLine("Parameterization:");
166+
Trace.WriteLine("");
167+
Trace.WriteLine(orchestration.Parameterization.ToYaml());
168+
Trace.WriteLine("");
169+
Trace.WriteLine("Processing...");
170+
Trace.WriteLine("");
171+
await orchestration.ProcessAsync(cancellationTokenSource.Token);
172+
}
172173
}
173174
finally
174175
{

code/KustoCopyConsole/Storage/RowItemGateway.cs

+80-15
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,12 @@ namespace KustoCopyConsole.Storage
1515
internal class RowItemGateway : IAsyncDisposable
1616
{
1717
private static readonly Version CURRENT_FILE_VERSION = new Version(0, 0, 1, 0);
18+
private static readonly TimeSpan MAX_BUFFER_TIME = TimeSpan.FromSeconds(5);
1819

1920
private readonly IAppendStorage _appendStorage;
2021
private readonly Func<IEnumerable<RowItem>, IEnumerable<RowItem>> _compactFunc;
21-
private readonly MemoryStream _memoryStream = new MemoryStream();
22+
private readonly MemoryStream _bufferStream = new MemoryStream();
23+
private DateTime? _bufferStartTime;
2224

2325
public RowItemGateway(
2426
IAppendStorage appendStorage,
@@ -30,7 +32,7 @@ public RowItemGateway(
3032

3133
async ValueTask IAsyncDisposable.DisposeAsync()
3234
{
33-
await FlushAsync();
35+
await FlushAsync(CancellationToken.None);
3436
await _appendStorage.DisposeAsync();
3537
}
3638

@@ -39,24 +41,35 @@ public async Task<IImmutableList<RowItem>> MigrateToLatestVersionAsync(Cancellat
3941
return await CompactAsync(ct);
4042
}
4143

42-
public Task AppendAsync(RowItem item, CancellationToken ct)
44+
public async Task AppendAsync(RowItem item, CancellationToken ct)
4345
{
44-
item.Validate();
45-
throw new NotImplementedException();
46-
}
47-
48-
public Task AppendAtomicallyAsync(IEnumerable<RowItem> items, CancellationToken ct)
49-
{
50-
//item.Validate();
46+
var bufferToWrite = AppendToBuffer(item);
5147

52-
throw new NotImplementedException();
48+
if (bufferToWrite == null
49+
&& _bufferStartTime != null
50+
&& DateTime.Now - _bufferStartTime > MAX_BUFFER_TIME)
51+
{
52+
await FlushAsync(ct);
53+
bufferToWrite = _bufferStream.ToArray();
54+
_bufferStream.SetLength(0);
55+
_bufferStartTime = null;
56+
}
57+
if (bufferToWrite != null)
58+
{
59+
await _appendStorage.AtomicAppendAsync(bufferToWrite, ct);
60+
}
5361
}
5462

55-
public async Task FlushAsync()
63+
public async Task FlushAsync(CancellationToken ct)
5664
{
57-
await Task.CompletedTask;
65+
var bufferToWrite = _bufferStream.ToArray();
66+
67+
_bufferStream.SetLength(0);
68+
_bufferStartTime = null;
69+
await _appendStorage.AtomicAppendAsync(bufferToWrite, ct);
5870
}
5971

72+
#region Compaction
6073
private async Task<IImmutableList<RowItem>> CompactAsync(CancellationToken ct)
6174
{
6275
var readBuffer = await _appendStorage.LoadAllAsync(ct);
@@ -103,14 +116,13 @@ private async Task<IImmutableList<RowItem>> CompactAsync(CancellationToken ct)
103116
csv.WriteHeader<RowItem>();
104117
csv.NextRecord();
105118
csv.WriteRecords(items);
106-
csv.NextRecord();
107119
csv.Flush();
108120
writer.Flush();
109121

110122
var writeBuffer = tempMemoryStream.ToArray();
111123

112124
await _appendStorage.AtomicReplaceAsync(writeBuffer, ct);
113-
125+
114126
return items;
115127
}
116128
}
@@ -146,5 +158,58 @@ private IImmutableList<RowItem> CompactBuffer(byte[] readBuffer)
146158
return allNewItems.ToImmutableArray();
147159
}
148160
}
161+
#endregion
162+
163+
private byte[]? AppendToBuffer(RowItem item)
164+
{
165+
item.Validate();
166+
lock (_bufferStream)
167+
{
168+
var lengthBefore = _bufferStream.Length;
169+
170+
using (var writer = new StreamWriter(_bufferStream, leaveOpen: true))
171+
using (var csv = new CsvWriter(writer, CultureInfo.InvariantCulture))
172+
{
173+
csv.WriteRecord(item);
174+
csv.NextRecord();
175+
csv.Flush();
176+
writer.Flush();
177+
}
178+
179+
var lengthAfter = _bufferStream.Length;
180+
181+
if (lengthAfter > _appendStorage.MaxBufferSize)
182+
{ // Buffer is too long: write buffer before this item
183+
if (lengthBefore == 0)
184+
{
185+
throw new CopyException(
186+
$"Buffer to write to the log is too long: {lengthAfter}",
187+
false);
188+
}
189+
_bufferStream.SetLength(lengthBefore);
190+
191+
var allBuffer = _bufferStream.ToArray();
192+
var beforeBuffer = new byte[lengthBefore];
193+
var remainBuffer = new byte[lengthAfter - lengthBefore];
194+
195+
Array.Copy(allBuffer, beforeBuffer, lengthBefore);
196+
Array.Copy(allBuffer, lengthBefore, remainBuffer, 0, remainBuffer.Length);
197+
_bufferStream.SetLength(0);
198+
_bufferStream.Write(
199+
allBuffer,
200+
(int)lengthBefore,
201+
(int)(lengthAfter - lengthBefore));
202+
_bufferStartTime = DateTime.Now;
203+
204+
return beforeBuffer;
205+
}
206+
else if (_bufferStartTime == null)
207+
{
208+
_bufferStartTime = DateTime.Now;
209+
}
210+
}
211+
212+
return null;
213+
}
149214
}
150215
}

0 commit comments

Comments
 (0)