Skip to content

Commit 97633c9

Browse files
author
Benjamin Calef
committed
[0.1.0] handle idempotent and non-idempotent query + simplify parameters
1 parent 9a2b7c8 commit 97633c9

11 files changed

+83
-78
lines changed

Model/ForkedArrayProcessor.php

+2-4
Original file line numberDiff line numberDiff line change
@@ -33,22 +33,20 @@ public function __construct(
3333
* @param callable $callback
3434
* @param int $pageSize
3535
* @param int $maxChildrenProcess
36-
* @param bool $isParallelize
3736
* @return void
3837
*/
3938
public function process(
4039
array $array,
4140
callable $callback,
4241
int $pageSize = 1000,
43-
int $maxChildrenProcess = 10,
44-
bool $isParallelize = true
42+
int $maxChildrenProcess = 10
4543
): void {
4644
/** @var ArrayWrapper $itemProvider */
4745
$itemProvider = $this->arrayWrapperFactory->create([
4846
'items' => $array,
4947
'pageSize' => $pageSize
5048
]);
5149

52-
$this->forkedProcessorRunner->run($itemProvider, $callback, $maxChildrenProcess, $isParallelize, false);
50+
$this->forkedProcessorRunner->run($itemProvider, $callback, $maxChildrenProcess);
5351
}
5452
}

Model/ForkedCollectionProcessor.php

+5-6
Original file line numberDiff line numberDiff line change
@@ -34,25 +34,24 @@ public function __construct(
3434
* @param callable $callback
3535
* @param int $pageSize
3636
* @param int $maxChildrenProcess
37-
* @param bool $isParallelize
38-
* @param bool $isFallBackEnabled
37+
* @param bool $isIdempotent
3938
* @return void
4039
*/
4140
public function process(
4241
Collection $collection,
4342
callable $callback,
4443
int $pageSize = 1000,
4544
int $maxChildrenProcess = 10,
46-
bool $isParallelize = true,
47-
bool $isFallBackEnabled = false
45+
bool $isIdempotent = true
4846
): void {
4947
/** @var CollectionWrapper $itemProvider */
5048
$itemProvider = $this->collectionWrapperFactory->create([
5149
'collection' => $collection,
52-
'maxChildrenProcess' => $maxChildrenProcess,
5350
'pageSize' => $pageSize,
51+
'maxChildrenProcess' => $maxChildrenProcess,
52+
'isIdempotent' => $isIdempotent
5453
]);
5554

56-
$this->forkedProcessorRunner->run($itemProvider, $callback, $maxChildrenProcess, $isParallelize, $isFallBackEnabled);
55+
$this->forkedProcessorRunner->run($itemProvider, $callback, $maxChildrenProcess);
5756
}
5857
}

Model/ForkedSearchResultProcessor.php

+5-6
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,7 @@ public function __construct(
3232
* @param callable $callback
3333
* @param int $pageSize
3434
* @param int $maxChildrenProcess
35-
* @param bool $isParallelize
36-
* @param bool $isFallBackEnabled
35+
* @param bool $isIdempotent
3736
* @return void
3837
*/
3938
public function process(
@@ -42,8 +41,7 @@ public function process(
4241
callable $callback,
4342
int $pageSize = 1000,
4443
int $maxChildrenProcess = 10,
45-
bool $isParallelize = true,
46-
bool $isFallBackEnabled = false
44+
bool $isIdempotent = true
4745
): void {
4846
if (!method_exists($repository, 'getList')) {
4947
throw new InvalidArgumentException('The repository class must have a method called "getList"');
@@ -54,9 +52,10 @@ public function process(
5452
'searchCriteria' => $searchCriteria,
5553
'repository' => $repository,
5654
'pageSize' => $pageSize,
57-
'maxChildrenProcess' => $maxChildrenProcess
55+
'maxChildrenProcess' => $maxChildrenProcess,
56+
'isIdempotent' => $isIdempotent
5857
]);
5958

60-
$this->forkedProcessorRunner->run($itemProvider, $callback, $maxChildrenProcess, $isParallelize, $isFallBackEnabled);
59+
$this->forkedProcessorRunner->run($itemProvider, $callback, $maxChildrenProcess);
6160
}
6261
}

Model/ItemProvider/ArrayWrapper.php

+8
Original file line numberDiff line numberDiff line change
@@ -66,4 +66,12 @@ public function getItems(): array
6666

6767
return array_slice($this->items, $offset, $this->pageSize);
6868
}
69+
70+
/**
71+
* @inheritDoc
72+
*/
73+
public function isIdempotent(): bool
74+
{
75+
return true;
76+
}
6977
}

Model/ItemProvider/CollectionWrapper.php

+20-3
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,25 @@ class CollectionWrapper implements ItemProviderInterface
1717
/** @var int */
1818
private $maxChildrenProcess;
1919

20+
/** @var bool */
21+
private $isIdempotent;
22+
2023
/**
2124
* @param Collection $collection
2225
* @param int $pageSize
2326
* @param int $maxChildrenProcess
27+
* @param bool $isIdempotent
2428
*/
25-
public function __construct(Collection $collection, int $pageSize, int $maxChildrenProcess)
26-
{
29+
public function __construct(
30+
Collection $collection,
31+
int $pageSize,
32+
int $maxChildrenProcess,
33+
bool $isIdempotent
34+
) {
2735
$this->collection = $collection;
2836
$this->pageSize = $pageSize;
2937
$this->maxChildrenProcess = $maxChildrenProcess;
38+
$this->isIdempotent = $maxChildrenProcess > 1 ? $isIdempotent : true;
3039
}
3140

3241
/**
@@ -35,7 +44,7 @@ public function __construct(Collection $collection, int $pageSize, int $maxChild
3544
public function setCurrentPage(int $currentPage): void
3645
{
3746
$this->collection->setPageSize($this->getPageSize());
38-
if ($this->maxChildrenProcess > 1) {
47+
if (!$this->isIdempotent()) {
3948
$moduloPage = $currentPage % $this->maxChildrenProcess;
4049
$currentPage = $moduloPage === 0 ? $this->maxChildrenProcess : $moduloPage;
4150
}
@@ -79,4 +88,12 @@ public function getItems(): array
7988

8089
return $this->collection->getItems();
8190
}
91+
92+
/**
93+
* @inheritDoc
94+
*/
95+
public function isIdempotent(): bool
96+
{
97+
return $this->isIdempotent;
98+
}
8299
}

Model/ItemProvider/ItemProviderInterface.php

+5
Original file line numberDiff line numberDiff line change
@@ -31,4 +31,9 @@ public function getTotalPages(): int;
3131
* @return array
3232
*/
3333
public function getItems(): array;
34+
35+
/**
36+
* @return bool
37+
*/
38+
public function isIdemPotent(): bool;
3439
}

Model/ItemProvider/SearchResultWrapper.php

+16-2
Original file line numberDiff line numberDiff line change
@@ -21,22 +21,28 @@ class SearchResultWrapper implements ItemProviderInterface
2121
/** @var int */
2222
private $maxChildrenProcess;
2323

24+
/** @var bool */
25+
private $isIdempotent;
26+
2427
/**
2528
* @param SearchCriteria $searchCriteria
2629
* @param $repository
2730
* @param int $pageSize
2831
* @param int $maxChildrenProcess
32+
* @param bool $isIdempotent
2933
*/
3034
public function __construct(
3135
SearchCriteria $searchCriteria,
3236
$repository,
3337
int $pageSize,
34-
int $maxChildrenProcess
38+
int $maxChildrenProcess,
39+
bool $isIdempotent
3540
) {
3641
$this->searchCriteria = $searchCriteria;
3742
$this->repository = $repository;
3843
$this->pageSize = $pageSize;
3944
$this->maxChildrenProcess = $maxChildrenProcess;
45+
$this->isIdempotent = $maxChildrenProcess > 1 ? $isIdempotent : true;
4046
}
4147

4248
/**
@@ -45,7 +51,7 @@ public function __construct(
4551
public function setCurrentPage(int $currentPage): void
4652
{
4753
$this->searchCriteria->setPageSize($this->getPageSize());
48-
if ($this->maxChildrenProcess > 1) {
54+
if (!$this->isIdempotent()) {
4955
$moduloPage = $currentPage % $this->maxChildrenProcess;
5056
$currentPage = $moduloPage === 0 ? $this->maxChildrenProcess : $moduloPage;
5157
}
@@ -87,6 +93,14 @@ public function getItems(): array
8793
return $this->getSearchResults()->getItems();
8894
}
8995

96+
/**
97+
* @inheritDoc
98+
*/
99+
public function isIdempotent(): bool
100+
{
101+
return $this->isIdempotent;
102+
}
103+
90104
/**
91105
* @return SearchResultsInterface
92106
*/

Model/Processor/ForkedProcessor.php

+3-11
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,6 @@ class ForkedProcessor
2222
/** @var int */
2323
private $maxChildrenProcess;
2424

25-
/** @var bool */
26-
private $isFallBackEnabled;
27-
2825
/** @var bool */
2926
private $running = true;
3027

@@ -33,22 +30,17 @@ class ForkedProcessor
3330
* @param ItemProviderInterface $itemProvider
3431
* @param callable $callback
3532
* @param int $maxChildrenProcess
36-
* @param bool $isParallelize
37-
* @param bool $isFallBackEnabled
3833
*/
3934
public function __construct(
4035
LoggerInterface $logger,
4136
ItemProviderInterface $itemProvider,
4237
callable $callback,
43-
int $maxChildrenProcess = 10,
44-
bool $isParallelize = true,
45-
bool $isFallBackEnabled = false
38+
int $maxChildrenProcess = 10
4639
) {
4740
$this->logger = $logger;
4841
$this->itemProvider = $itemProvider;
4942
$this->callback = $callback;
50-
$this->maxChildrenProcess = $isParallelize ? $maxChildrenProcess : 1;
51-
$this->isFallBackEnabled = $isFallBackEnabled;
43+
$this->maxChildrenProcess = $maxChildrenProcess;
5244
pcntl_signal(SIGINT, [$this, 'handleSigInt']);
5345
}
5446

@@ -157,7 +149,7 @@ private function handleMultipleChildProcesses(): void
157149
}
158150

159151
// Fallback based on database query
160-
if ($this->isFallBackEnabled) {
152+
if (!$this->itemProvider->isIdemPotent()) {
161153
$size = $this->itemProvider->getSize();
162154
$this->logger->info('Missing items from original query collection', ['total_items' => $size]);
163155
if ($size !== 0) {

Model/Processor/ForkedProcessorRunner.php

+1-7
Original file line numberDiff line numberDiff line change
@@ -22,24 +22,18 @@ public function __construct(
2222
* @param ItemProviderInterface $itemProvider
2323
* @param callable $callback
2424
* @param int $maxChildrenProcess
25-
* @param bool $isParallelize
26-
* @param bool $isFallBackEnabled
2725
* @return void
2826
*/
2927
public function run(
3028
ItemProviderInterface $itemProvider,
3129
callable $callback,
32-
int $maxChildrenProcess,
33-
bool $isParallelize,
34-
bool $isFallBackEnabled
30+
int $maxChildrenProcess
3531
): void {
3632
/** @var ForkedProcessor $forkedProcessor */
3733
$forkedProcessor = $this->forkedProcessorFactory->create([
3834
'itemProvider' => $itemProvider,
3935
'callback' => $callback,
4036
'maxChildrenProcess' => $maxChildrenProcess,
41-
'isParallelize' => $isParallelize,
42-
'isFallBackEnabled' => $isFallBackEnabled
4337
]);
4438

4539
$forkedProcessor->process();

README.md

+17-38
Original file line numberDiff line numberDiff line change
@@ -51,9 +51,9 @@ class MyAwesomeClass
5151
$searchCriteria,
5252
$productRepository,
5353
$callback,
54-
$pageSize = 100,
54+
$pageSize = 1000,
5555
$maxChildrenProcess = 10,
56-
$isParallelize = true
56+
$isIdempotent = true
5757
);
5858
}
5959
```
@@ -86,9 +86,9 @@ class MyAwesomeClass
8686
$searchCriteria,
8787
$productRepository,
8888
$callback,
89-
$pageSize = 100,
89+
$pageSize = 1000,
9090
$maxChildrenProcess = 10,
91-
$isParallelize = true
91+
$isIdempotent = true
9292
);
9393
}
9494
```
@@ -119,8 +119,7 @@ class MyAwesomeClass
119119
$array,
120120
$callback,
121121
$pageSize = 2,
122-
$maxChildrenProcess = 2,
123-
$isParallelize = true
122+
$maxChildrenProcess = 2
124123
);
125124
}
126125
```
@@ -166,16 +165,18 @@ function specified by the user on each item of that page.
166165

167166
- `$maxChildrenProcess`: This parameter is used to set the maximum number of child
168167
processes that can be run simultaneously. This is used to control the number of threads
169-
that will be used by the multi-threading process.
170-
171-
- `$isParallelize`: This parameter is used to set whether the multi-threading process should run
172-
in parallel or sequentially. If set to true, the process will run in parallel,
173-
if set to false, the process will run sequentially. Running task sequentially may be useful when
174-
you want to keep the sort order of your items.
175-
176-
- `$isFallBackEnabled`: This parameter is set to false by default and can be used for `ForkedSearchResultProcessor`
177-
or `ForkedCollectionProcessor` it can be useful only if your initial query has a flag
178-
that will be processed in your callback method.
168+
that will be used by the multi-threading process. If set to 1, by definition you will have no parallelization,
169+
the parent process will wait for the child process to finish before creating another one.
170+
171+
- `$isIdempotent`: This parameter is a flag set to `true` by default and can be used for `ForkedSearchResultProcessor`
172+
or `ForkedCollectionProcessor` when your `$maxChildrenProcess` is greater than one.
173+
While fetching data from the database with `ForkedSearchResultProcessor` and `ForkedCollectionProcessor` you may change values
174+
queried: by modifying items on columns queried you will change the nature of the initial collection query and at the end,
175+
the OFFSET limit in the query will be invalid because the native pagination system expects the pagination to be
176+
processed by only one process. To avoid that, set `$isIdempotent` to `false`.<br>
177+
E.G.: In your collection query, you request all products `disabled`, in your callback method you `enable` and save
178+
them in database, then in this particular case you are modifying the column that you request in your collection,
179+
so your query is not idempotent.
179180

180181
### Memory Limit
181182
This module allows to bypass the limitation of the memory limit, because the memory
@@ -188,28 +189,6 @@ system and adjust the parameters accordingly.
188189
### Limitations
189190
This module uses `pcntl_fork()` function which is not available on Windows.
190191

191-
There is limitation for Multi-threading pagination on `ForkedSearchResultProcessor` and `ForkedCollectionProcessor`:<br>
192-
Multi-threading pagination works only if your initial query has a flag that will be excluded
193-
from results on each pagination iteration.
194-
195-
For example:<br>
196-
You select all orders without a `gift_cards` value in your query, if you set a `gift_cards` value on your callback
197-
method and save it in database, then multi-threading and fallback system will work.
198-
Otherwise, children will go for the same items because your query doesn't flag a specific column modified
199-
by your callback function.<br>
200-
<b>So currently you need a flag to be sure to not proceed the same items</b>.
201-
202-
Workaround:<br>
203-
You can create your own flag by using a column like modified_at in your collection/searchCriteria query
204-
to request items with a modified_at value lower than the current timestamp, then you just have to update
205-
the modified_at column in your callback method and set the current timestamp.
206-
This way your query will fetch fresh items on each iteration.
207-
208-
I will provide support to handle this soon, sorry for the inconvenient.
209-
Until then if you want to process data, I recommend usage of single thread solution,
210-
or the `ForkedArrayProcessor` which is not impacted by pagination system.
211-
212-
213192
### Conclusion
214193
This module provides a useful tool for running commands or processing collections
215194
and search criteria in a multi-threaded way, making it a great solution for improving

composer.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
"name": "zepgram/module-multi-threading",
33
"description": "This module is a powerful tool for developers who want to process large data sets in a short amount of time",
44
"type": "magento2-module",
5-
"version": "0.0.3",
5+
"version": "0.1.0",
66
"authors": [
77
{
88
"name": "Benjamin Calef",

0 commit comments

Comments
 (0)