Skip to content

Commit d0053dc

Browse files
committed
feat: cleanup of table data is now an external service
Because the clean operation was producing a deadlock on table metadata, it was decided to move the cleanup to the main thread, where tables are cleaned consecutively. Moving the cleanup to the main thread unblocked the full potential of multiple workers importing data in parallel.
1 parent a068ff4 commit d0053dc

11 files changed

+127
-25
lines changed

src/Command/ImportCommand.php

+6-1
Original file line numberDiff line numberDiff line change
@@ -98,8 +98,13 @@ public function execute(InputInterface $input, OutputInterface $output): int
9898
)->create();
9999

100100
$inputSource = new JsonImportSourceFactory($input->getArgument('directory'), $config);
101+
$tables = $inputSource->listTables();
101102

102-
foreach ($inputSource->listTables() as $table) {
103+
foreach ($tables as $table) {
104+
$importer->cleanTable($table);
105+
}
106+
107+
foreach ($tables as $table) {
103108
$importer->importTable($table, $progressNotifier);
104109
}
105110

src/Export/TableListService.php

+2-1
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
use EcomDev\MySQL2JSONL\Configuration;
1313
use EcomDev\MySQL2JSONL\TableEntry;
14+
use PDO;
1415

1516
final readonly class TableListService
1617
{
@@ -21,12 +22,12 @@ public function __construct(private Configuration $configuration)
2122
public function tablesToExport(): array
2223
{
2324
$connection = $this->configuration->createPDOConnection();
24-
$connection->prepare('SET information_schema_stats_expiry=0')->execute();
2525

2626
$result = $connection->prepare(
2727
'SELECT TABLE_NAME, TABLE_ROWS FROM information_schema.tables'
2828
. ' WHERE TABLE_SCHEMA = SCHEMA() AND TABLE_TYPE = ?'
2929
);
30+
3031
$result->execute(['BASE TABLE']);
3132

3233
$tables = [];
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
<?php
2+
3+
/**
4+
* Copyright © EcomDev B.V. All rights reserved.
5+
* See LICENSE for license details.
6+
*/
7+
8+
declare(strict_types=1);
9+
10+
namespace EcomDev\MySQL2JSONL\Import;
11+
12+
use EcomDev\MySQL2JSONL\Configuration;
13+
use EcomDev\MySQL2JSONL\TableEntry;
14+
use PDO;
15+
16+
/**
 * Cleans up table data synchronously over a single PDO connection.
 *
 * When the configured import mode is "truncate", the table is emptied with a
 * TRUNCATE statement; in any other mode the cleanup is a no-op.
 */
final readonly class BlockingImportCleanupService implements ImportCleanupService
{
    private function __construct(
        private ImportMode $importMode,
        private PDO $connection,
    ) {
    }

    /**
     * Named constructor.
     *
     * @param ImportMode $importMode Import mode deciding whether tables get truncated.
     * @param PDO $connection Connection on which the TRUNCATE statement is executed.
     */
    public static function create(
        ImportMode $importMode,
        PDO $connection
    ): BlockingImportCleanupService {
        return new self($importMode, $connection);
    }

    /**
     * Truncates the given table when the import mode requires it.
     *
     * @param TableEntry $table Table whose data should be removed.
     */
    public function cleanTable(TableEntry $table): void
    {
        if (!$this->importMode->isTruncate()) {
            return;
        }

        // Identifiers cannot be bound as statement parameters, so the name is
        // embedded in the SQL text. A backtick inside a backtick-quoted
        // identifier must be doubled, otherwise it would terminate the quoting.
        $quotedName = str_replace('`', '``', $table->name);

        // TRUNCATE yields no result set, so exec() is used instead of query().
        $this->connection->exec(
            sprintf('TRUNCATE `%s`', $quotedName)
        );
    }
}

src/Import/BlockingImportService.php

+7-6
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,13 @@
77
use EcomDev\MySQL2JSONL\TableEntry;
88
use PDO;
99

10-
final readonly class BlockingImportService implements ImportService
10+
final readonly class BlockingImportService implements ImportService, ImportCleanupService
1111
{
1212
public function __construct(
1313
private PDO $connection,
1414
private InsertOnDuplicate $insertOnDuplicate,
1515
private ImportSourceFactory $sourceFactory,
16-
private ImportMode $mode,
16+
private BlockingImportCleanupService $cleanupService,
1717
private int $batchSize
1818
) {
1919
}
@@ -24,10 +24,6 @@ public function importTable(TableEntry $table, ProgressNotifier $notifier): void
2424

2525
$columns = $source->header();
2626

27-
if ($this->mode->isTruncate()) {
28-
$this->connection->prepare(sprintf('TRUNCATE TABLE `%s`', $table->name))->execute();
29-
}
30-
3127
$statementCache = InsertStatementCache::create(
3228
$this->connection,
3329
$this->insertOnDuplicate,
@@ -61,4 +57,9 @@ public function importTable(TableEntry $table, ProgressNotifier $notifier): void
6157
$this->connection->commit();
6258
$notifier->finish($table->name);
6359
}
60+
61+
public function cleanTable(TableEntry $table): void
62+
{
63+
$this->cleanupService->cleanTable($table);
64+
}
6465
}

src/Import/BlockingImportServiceFactory.php

+8-4
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,18 @@ public static function createFromConfiguration(Configuration $configuration, Imp
1919
return new self($configuration, $importSourceFactory, new InsertOnDuplicate());
2020
}
2121

22-
public function create(): ImportService
22+
public function create(): BlockingImportService
2323
{
24+
$connection = $this->configuration->createPDOConnection();
2425
return new BlockingImportService(
25-
$this->configuration->createPDOConnection(),
26+
$connection,
2627
$this->insertOnDuplicate,
2728
$this->importSourceFactory,
28-
$this->configuration->importMode,
29-
$this->configuration->batchSize
29+
BlockingImportCleanupService::create(
30+
$this->configuration->importMode,
31+
$connection,
32+
),
33+
$this->configuration->batchSize,
3034
);
3135
}
3236
}

src/Import/ImportCleanupService.php

+20
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
<?php
2+
3+
/**
4+
* Copyright © EcomDev B.V. All rights reserved.
5+
* See LICENSE for license details.
6+
*/
7+
8+
declare(strict_types=1);
9+
10+
namespace EcomDev\MySQL2JSONL\Import;
11+
12+
use EcomDev\MySQL2JSONL\TableEntry;
13+
14+
/**
 * Contract for services that clean up the data of a table as part of an import.
 */
interface ImportCleanupService
{
    /**
     * Cleans up the data of the given table.
     *
     * @param TableEntry $table Table to clean up.
     */
    public function cleanTable(TableEntry $table): void;
}

src/Import/WorkerImportService.php

+8-1
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,14 @@
1818

1919
use function Amp\async;
2020

21-
final readonly class WorkerImportService implements ImportService
21+
final readonly class WorkerImportService implements ImportService, ImportCleanupService
2222
{
2323
public function __construct(
2424
private Configuration $configuration,
2525
private WorkerPool $workerPool,
2626
private string $inputDirectory,
2727
private PendingExecution $pendingExecution,
28+
private ImportCleanupService $cleanupService,
2829
) {
2930
}
3031

@@ -51,4 +52,10 @@ public function await(): void
5152
$this->pendingExecution->await();
5253
$this->workerPool->shutdown();
5354
}
55+
56+
57+
public function cleanTable(TableEntry $table): void
58+
{
59+
$this->cleanupService->cleanTable($table);
60+
}
5461
}

src/Import/WorkerImportServiceFactory.php

+6-2
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
private function __construct(
2020
private Configuration $configuration,
2121
private WorkerPool $workerPool,
22-
private string $inputDirectory,
22+
private string $inputDirectory
2323
) {
2424
}
2525

@@ -38,7 +38,11 @@ public function create(): WorkerImportService
3838
$this->configuration,
3939
$this->workerPool,
4040
$this->inputDirectory,
41-
new PendingExecution()
41+
new PendingExecution(),
42+
BlockingImportCleanupService::create(
43+
$this->configuration->importMode,
44+
$this->configuration->createPDOConnection()
45+
),
4246
);
4347
}
4448
}

tests/Export/TableListServiceTest.php

+25-10
Original file line numberDiff line numberDiff line change
@@ -26,15 +26,30 @@ public function usesConfigurationToListExportableTables()
2626
->withExcludeCondition(TableNameCondition::endsWith('_varchar'))
2727
->withExcludeCondition(TableNameCondition::contains('gallery'));
2828

29-
$this->assertEquals(
30-
[
31-
TableEntry::fromName('catalog_product_entity_datetime')->withRows(6),
32-
TableEntry::fromName('catalog_product_entity_decimal')->withRows(3903),
33-
TableEntry::fromName('catalog_product_entity_int')->withRows(12480),
34-
TableEntry::fromName('catalog_product_entity_text')->withRows(2652),
35-
TableEntry::fromName('catalog_product_entity_tier_price')->withRows(0)
36-
],
37-
(new TableListService($config))->tablesToExport()
38-
);
29+
30+
$expectedTableValues = [
31+
TableEntry::fromName('catalog_product_entity_datetime')->withRows(6),
32+
TableEntry::fromName('catalog_product_entity_decimal')->withRows(3903),
33+
TableEntry::fromName('catalog_product_entity_int')->withRows(12480),
34+
TableEntry::fromName('catalog_product_entity_text')->withRows(2652),
35+
TableEntry::fromName('catalog_product_entity_tier_price')->withRows(0)
36+
];
37+
38+
$actualTableValues = (new TableListService($config))->tablesToExport();
39+
40+
foreach ($expectedTableValues as $index => $expectedTable) {
41+
$this->assertArrayHasKey(
42+
$index,
43+
$actualTableValues,
44+
'Table list does not match'
45+
);
46+
47+
$this->assertEquals($expectedTable->name, $actualTableValues[$index]->name, 'Table name does not match expected');
48+
$this->assertEqualsWithDelta(
49+
$expectedTable->rowCount,
50+
$actualTableValues[$index]->rowCount,
51+
$expectedTable->rowCount * 0.05
52+
);
53+
}
3954
}
4055
}

tests/Import/ImportTableTest.php

+5
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ public function importsTableDataFromSourceWithTruncate()
4343
);
4444

4545
$import = $factory->create();
46+
47+
$import->cleanTable($source->table);
4648
$import->importTable($source->table, $this);
4749

4850
$connection = $this->containerWithConfig->configuration->createPDOConnection();
@@ -73,6 +75,7 @@ public function appendsExistingStoresWhenUpdateStrategyIsUsed()
7375
);
7476

7577
$import = $factory->create();
78+
$import->cleanTable($source->table);
7679
$import->importTable($source->table, $this);
7780

7881
$connection = $this->containerWithConfig->configuration->createPDOConnection();
@@ -105,6 +108,7 @@ public function notifiesOfTheImportPorgress()
105108
);
106109

107110
$import = $factory->create();
111+
$import->cleanTable($source->table);
108112
$import->importTable($source->table, $this);
109113

110114
$this->assertEquals(
@@ -138,6 +142,7 @@ public function batchesLongInputs()
138142
);
139143

140144
$import = $factory->create();
145+
$import->cleanTable($source->table);
141146
$import->importTable($source->table, $this);
142147

143148
$connection = $this->containerWithConfig->configuration->createPDOConnection();

tests/Import/WorkerImportTableTest.php

+1
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ public function importsDataFromSource()
3838
$service = WorkerImportServiceFactory::fromConfiguration($container->configuration, $this->path)
3939
->create();
4040

41+
$service->cleanTable(TableEntry::fromName('store'));
4142
$service->importTable(TableEntry::fromName('store')->withRows(2), $this);
4243
$service->await();
4344

0 commit comments

Comments
 (0)