From 1b3108961bf8f9ad1936766f3188bc25078e2d71 Mon Sep 17 00:00:00 2001 From: Evgeny Grigorenko Date: Thu, 23 Jun 2022 13:09:33 +0000 Subject: [PATCH 1/8] Add batch_size to analyze_iterator --- .../presidio_analyzer/batch_analyzer_engine.py | 8 +++++--- .../presidio_analyzer/nlp_engine/nlp_engine.py | 2 +- .../presidio_analyzer/nlp_engine/spacy_nlp_engine.py | 10 ++++++---- 3 files changed, 12 insertions(+), 8 deletions(-) diff --git a/presidio-analyzer/presidio_analyzer/batch_analyzer_engine.py b/presidio-analyzer/presidio_analyzer/batch_analyzer_engine.py index 5245b707c..031d814df 100644 --- a/presidio-analyzer/presidio_analyzer/batch_analyzer_engine.py +++ b/presidio-analyzer/presidio_analyzer/batch_analyzer_engine.py @@ -1,7 +1,7 @@ import logging -from typing import List, Iterable, Dict, Union, Any, Optional, Iterator, Tuple +from typing import Any, Dict, Iterable, Iterator, List, Optional, Tuple, Union -from presidio_analyzer import DictAnalyzerResult, RecognizerResult, AnalyzerEngine +from presidio_analyzer import AnalyzerEngine, DictAnalyzerResult, RecognizerResult from presidio_analyzer.nlp_engine import NlpArtifacts logger = logging.getLogger("presidio-analyzer") @@ -28,6 +28,7 @@ def analyze_iterator( self, texts: Iterable[Union[str, bool, float, int]], language: str, + batch_size: int = None, **kwargs, ) -> List[List[RecognizerResult]]: """ @@ -35,6 +36,7 @@ def analyze_iterator( :param texts: An list containing strings to be analyzed. :param language: Input language + :param batch_size: Batch size to process in a single iteration :param kwargs: Additional parameters for the `AnalyzerEngine.analyze` method. """ @@ -45,7 +47,7 @@ def analyze_iterator( nlp_artifacts_batch: Iterator[ Tuple[str, NlpArtifacts] ] = self.analyzer_engine.nlp_engine.process_batch( - texts=texts, language=language + texts=texts, language=language, batch_size=batch_size ) list_results = [] diff --git a/presidio-analyzer/presidio_analyzer/nlp_engine/nlp_engine.py b/presidio-analyzer/presidio_analyzer/nlp_engine/nlp_engine.py index 14262e267..dc89c210a 100644 --- a/presidio-analyzer/presidio_analyzer/nlp_engine/nlp_engine.py +++ b/presidio-analyzer/presidio_analyzer/nlp_engine/nlp_engine.py @@ -18,7 +18,7 @@ def process_text(self, text: str, language: str) -> NlpArtifacts: @abstractmethod def process_batch( - self, texts: Iterable[str], language: str, **kwargs + self, texts: Iterable[str], language: str, batch_size: int = None, **kwargs ) -> Iterator[Tuple[str, NlpArtifacts]]: """Execute the NLP pipeline on a batch of texts. 
diff --git a/presidio-analyzer/presidio_analyzer/nlp_engine/spacy_nlp_engine.py b/presidio-analyzer/presidio_analyzer/nlp_engine/spacy_nlp_engine.py index 7bd3c4a99..3c33660c0 100644 --- a/presidio-analyzer/presidio_analyzer/nlp_engine/spacy_nlp_engine.py +++ b/presidio-analyzer/presidio_analyzer/nlp_engine/spacy_nlp_engine.py @@ -1,12 +1,11 @@ import logging -from typing import Optional, Dict, Iterator, Tuple, Union, List +from typing import Dict, Iterator, List, Optional, Tuple, Union import spacy +from presidio_analyzer.nlp_engine import NlpArtifacts, NlpEngine from spacy.language import Language from spacy.tokens import Doc -from presidio_analyzer.nlp_engine import NlpArtifacts, NlpEngine - logger = logging.getLogger("presidio-analyzer") @@ -48,11 +47,14 @@ def process_batch( self, texts: Union[List[str], List[Tuple[str, object]]], language: str, + batch_size: int = None, as_tuples: bool = False, ) -> Iterator[Optional[NlpArtifacts]]: """Execute the NLP pipeline on a batch of texts using spacy pipe.""" texts = (str(text) for text in texts) - docs = self.nlp[language].pipe(texts, as_tuples=as_tuples) + docs = self.nlp[language].pipe( + texts, as_tuples=as_tuples, batch_size=batch_size + ) for doc in docs: yield doc.text, self._doc_to_nlp_artifact(doc, language) From 590769a455de3017d82efc5c7acbb6d50b2589f9 Mon Sep 17 00:00:00 2001 From: Evgeny Grigorenko Date: Thu, 23 Jun 2022 13:13:02 +0000 Subject: [PATCH 2/8] Add batch_size to tests --- presidio-analyzer/tests/test_batch_analyzer_engine.py | 2 +- presidio-analyzer/tests/test_spacy_nlp_engine.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/presidio-analyzer/tests/test_batch_analyzer_engine.py b/presidio-analyzer/tests/test_batch_analyzer_engine.py index 6469c82cf..a694673a4 100644 --- a/presidio-analyzer/tests/test_batch_analyzer_engine.py +++ b/presidio-analyzer/tests/test_batch_analyzer_engine.py @@ -34,7 +34,7 @@ def test_analyze_iterator_returns_list_of_recognizer_results( texts, expected_output, batch_analyzer_engine_simple ): - results = batch_analyzer_engine_simple.analyze_iterator(texts=texts, language="en") + results = batch_analyzer_engine_simple.analyze_iterator(texts=texts, language="en", batch_size=2) assert len(results) == len(expected_output) for result, expected_result in zip(results, expected_output): diff --git a/presidio-analyzer/tests/test_spacy_nlp_engine.py b/presidio-analyzer/tests/test_spacy_nlp_engine.py index 313405da5..6b4e3d9bd 100644 --- a/presidio-analyzer/tests/test_spacy_nlp_engine.py +++ b/presidio-analyzer/tests/test_spacy_nlp_engine.py @@ -13,7 +13,7 @@ def test_simple_process_text(nlp_engine): def test_process_batch_strings(nlp_engine): nlp_artifacts_batch = nlp_engine.process_batch( - ["simple text", "simple text"], language="en" + ["simple text", "simple text"], language="en", batch_size=1 ) assert isinstance(nlp_artifacts_batch, Iterator) nlp_artifacts_batch = list(nlp_artifacts_batch) From dc380ea6f91b6aeb43488e8207260d666fe277bc Mon Sep 17 00:00:00 2001 From: Evgeny Grigorenko Date: Mon, 27 Jun 2022 09:57:43 +0000 Subject: [PATCH 3/8] Make batch_size optional --- presidio-analyzer/presidio_analyzer/batch_analyzer_engine.py | 2 +- presidio-analyzer/presidio_analyzer/nlp_engine/nlp_engine.py | 4 ++-- .../presidio_analyzer/nlp_engine/spacy_nlp_engine.py | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/presidio-analyzer/presidio_analyzer/batch_analyzer_engine.py b/presidio-analyzer/presidio_analyzer/batch_analyzer_engine.py index 8e7d4f57b..4a75fb0c4 
100644 --- a/presidio-analyzer/presidio_analyzer/batch_analyzer_engine.py +++ b/presidio-analyzer/presidio_analyzer/batch_analyzer_engine.py @@ -28,7 +28,7 @@ def analyze_iterator( self, texts: Iterable[Union[str, bool, float, int]], language: str, - batch_size: int = None, + batch_size: Optional[int] = None, **kwargs, ) -> List[List[RecognizerResult]]: """ diff --git a/presidio-analyzer/presidio_analyzer/nlp_engine/nlp_engine.py b/presidio-analyzer/presidio_analyzer/nlp_engine/nlp_engine.py index dc89c210a..8ab91c8a0 100644 --- a/presidio-analyzer/presidio_analyzer/nlp_engine/nlp_engine.py +++ b/presidio-analyzer/presidio_analyzer/nlp_engine/nlp_engine.py @@ -1,5 +1,5 @@ from abc import ABC, abstractmethod -from typing import Iterable, Iterator, Tuple +from typing import Iterable, Iterator, Optional, Tuple from presidio_analyzer.nlp_engine import NlpArtifacts @@ -18,7 +18,7 @@ def process_text(self, text: str, language: str) -> NlpArtifacts: @abstractmethod def process_batch( - self, texts: Iterable[str], language: str, batch_size: int = None, **kwargs + self, texts: Iterable[str], language: str, batch_size: Optional[int] = None, **kwargs ) -> Iterator[Tuple[str, NlpArtifacts]]: """Execute the NLP pipeline on a batch of texts. diff --git a/presidio-analyzer/presidio_analyzer/nlp_engine/spacy_nlp_engine.py b/presidio-analyzer/presidio_analyzer/nlp_engine/spacy_nlp_engine.py index 3c33660c0..f41c65028 100644 --- a/presidio-analyzer/presidio_analyzer/nlp_engine/spacy_nlp_engine.py +++ b/presidio-analyzer/presidio_analyzer/nlp_engine/spacy_nlp_engine.py @@ -47,7 +47,7 @@ def process_batch( self, texts: Union[List[str], List[Tuple[str, object]]], language: str, - batch_size: int = None, + batch_size: Optional[int] = None, as_tuples: bool = False, ) -> Iterator[Optional[NlpArtifacts]]: """Execute the NLP pipeline on a batch of texts using spacy pipe.""" From 0957e8b19f1ca8243dc9e0d7f5d3102ec29c94fa Mon Sep 17 00:00:00 2001 From: Evgeny Grigorenko Date: Mon, 27 Jun 2022 10:50:44 +0000 Subject: [PATCH 4/8] Fix linting --- .../presidio_analyzer/nlp_engine/nlp_engine.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/presidio-analyzer/presidio_analyzer/nlp_engine/nlp_engine.py b/presidio-analyzer/presidio_analyzer/nlp_engine/nlp_engine.py index 8ab91c8a0..9e9813ee4 100644 --- a/presidio-analyzer/presidio_analyzer/nlp_engine/nlp_engine.py +++ b/presidio-analyzer/presidio_analyzer/nlp_engine/nlp_engine.py @@ -18,7 +18,11 @@ def process_text(self, text: str, language: str) -> NlpArtifacts: @abstractmethod def process_batch( - self, texts: Iterable[str], language: str, batch_size: Optional[int] = None, **kwargs + self, + texts: Iterable[str], + language: str, + batch_size: Optional[int] = None, + **kwargs, ) -> Iterator[Tuple[str, NlpArtifacts]]: """Execute the NLP pipeline on a batch of texts. From e9c82613bb4e654fc6f9f5fad66789663ac651bf Mon Sep 17 00:00:00 2001 From: Evgeny Grigorenko Date: Thu, 28 Jul 2022 13:51:04 +0200 Subject: [PATCH 5/8] Update batch_size param description --- presidio-analyzer/presidio_analyzer/batch_analyzer_engine.py | 1 + 1 file changed, 1 insertion(+) diff --git a/presidio-analyzer/presidio_analyzer/batch_analyzer_engine.py b/presidio-analyzer/presidio_analyzer/batch_analyzer_engine.py index 9d102d8f8..63a37ed9d 100644 --- a/presidio-analyzer/presidio_analyzer/batch_analyzer_engine.py +++ b/presidio-analyzer/presidio_analyzer/batch_analyzer_engine.py @@ -37,6 +37,7 @@ def analyze_iterator( :param texts: An list containing strings to be analyzed. 
:param language: Input language :param batch_size: Batch size to process in a single iteration + (default value depends on the nlp engine implementation) :param kwargs: Additional parameters for the `AnalyzerEngine.analyze` method. """ From d98c69e9a4672fd9168a03298b6f0d8ce314df79 Mon Sep 17 00:00:00 2001 From: Omri Mendels Date: Mon, 3 Feb 2025 13:14:53 +0000 Subject: [PATCH 6/8] adding n_process argument and updating the notebook --- docs/samples/python/batch_processing.ipynb | 110 ++++++++++++++++-- .../batch_analyzer_engine.py | 15 ++- .../nlp_engine/nlp_engine.py | 4 +- .../nlp_engine/spacy_nlp_engine.py | 15 ++- .../tests/test_analyzer_engine_provider.py | 4 +- .../tests/test_batch_analyzer_engine.py | 9 +- 6 files changed, 133 insertions(+), 24 deletions(-) diff --git a/docs/samples/python/batch_processing.ipynb b/docs/samples/python/batch_processing.ipynb index d4bb5c795..ea2ee329e 100644 --- a/docs/samples/python/batch_processing.ipynb +++ b/docs/samples/python/batch_processing.ipynb @@ -10,8 +10,9 @@ "outputs": [], "source": [ "# download presidio\n", - "!pip install presidio_analyzer presidio_anonymizer\n", - "!python -m spacy download en_core_web_lg" + "#!pip install presidio_analyzer presidio_anonymizer\n", + "#!python -m spacy download en_core_web_lg\n", + "#!pip install pandas" ] }, { @@ -145,8 +146,8 @@ "" ], "text/plain": [ - " name phrase phone number phrase integer \n", - "0 Charlie likes this Please call 212-555-1234 after 2pm 1 \\\n", + " name phrase phone number phrase integer \\\n", + "0 Charlie likes this Please call 212-555-1234 after 2pm 1 \n", "1 You should talk to Mike his number is 978-428-7111 2 \n", "2 Mary had a little startup Phone number: 202-342-1234 3 \n", "\n", @@ -322,8 +323,8 @@ "" ], "text/plain": [ - " name phrase \n", - "0 likes this \\\n", + " name phrase \\\n", + "0 likes this \n", "1 You should talk to \n", "2 had a little startup \n", "\n", @@ -496,11 +497,104 @@ "\n", "Are not yet supported. Consider breaking the JSON to parts if needed." ] + }, + { + "cell_type": "markdown", + "id": "c708ff56", + "metadata": {}, + "source": [ + "## Multiprocessing\n", + "\n", + "`BatchAnalyzerEngine` builds upon spaCy's pipelines. For more info about multiprocessing, see https://spacy.io/usage/processing-pipelines#multiprocessing.\n", + "\n", + "In Presidio, one can pass the `n_process` argument and the `batch_size` parameter to define how processing is done in parallel." 
+ ] + }, + { + "cell_type": "markdown", + "id": "81316c6c", + "metadata": {}, + "source": [] + }, + { + "cell_type": "code", + "execution_count": 25, + "id": "09a80e87", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "[Monitor] Active Python processes: 4 - [38773, 38774, 45860, 109966]\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "[Monitor] Active Python processes: 8 - [38773, 38774, 45860, 109966, 109973, 109976, 109977, 109978]\n", + "[Monitor] Active Python processes: 8 - [38773, 38774, 45860, 109966, 109973, 109976, 109977, 109978]\n", + "[Monitor] Active Python processes: 8 - [38773, 38774, 45860, 109966, 109973, 109976, 109977, 109978]\n", + "[Monitor] Active Python processes: 8 - [38773, 38774, 45860, 109966, 109973, 109976, 109977, 109978]\n", + "[Monitor] Active Python processes: 4 - [38773, 38774, 45860, 109966]\n" + ] + } + ], + "source": [ + "import multiprocessing\n", + "import psutil\n", + "import time\n", + "\n", + "def analyze_batch_multiprocess(n_process=12, batch_size=4):\n", + " \"\"\"Run BatchAnalyzer with `n_process` processes and batch size of `batch_size`.\"\"\"\n", + " list_of_texts = [\"My name is mike\"]*1000\n", + "\n", + " results = batch_analyzer.analyze_iterator(\n", + " texts=list_of_texts, \n", + " language=\"en\",\n", + " n_process=n_process, \n", + " batch_size=batch_size\n", + " )\n", + "\n", + " return list(results)\n", + "\n", + "\n", + "\n", + "def monitor_processes():\n", + " \"\"\"Monitor all Python processes dynamically.\"\"\"\n", + " while True:\n", + " processes = [p for p in psutil.process_iter(attrs=['pid', 'name']) if \"python\" in p.info['name']]\n", + " print(f\"[Monitor] Active Python processes: {len(processes)} - {[p.info['pid'] for p in processes]}\")\n", + " time.sleep(1)\n", + "\n", + "\n", + "# Run interactive monitoring\n", + "monitor_proc = multiprocessing.Process(target=monitor_processes, daemon=True)\n", + "monitor_proc.start()\n", + "\n", + "# Run the batch analyzer process\n", + "analyze_batch_multiprocess(n_process=4, batch_size=2)\n", + "\n", + "# Wait for everything to conclude\n", + "time.sleep(1) \n", + "\n", + "# Clean up (not needed if daemon=True, but useful if stopping manually)\n", + "monitor_proc.terminate()\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "7b7b6c64", + "metadata": {}, + "outputs": [], + "source": [] } ], "metadata": { "kernelspec": { - "display_name": "Python 3 (ipykernel)", + "display_name": "presidio-analyzer-sAyh6tzK-py3.12", "language": "python", "name": "python3" }, @@ -514,7 +608,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.10.5" + "version": "3.12.1" } }, "nbformat": 4, diff --git a/presidio-analyzer/presidio_analyzer/batch_analyzer_engine.py b/presidio-analyzer/presidio_analyzer/batch_analyzer_engine.py index 5072de09b..542e4354e 100644 --- a/presidio-analyzer/presidio_analyzer/batch_analyzer_engine.py +++ b/presidio-analyzer/presidio_analyzer/batch_analyzer_engine.py @@ -27,7 +27,8 @@ def analyze_iterator( self, texts: Iterable[Union[str, bool, float, int]], language: str, - batch_size: Optional[int] = None, + batch_size: Optional[int] = 1, + n_process: Optional[int] = 1, **kwargs, ) -> List[List[RecognizerResult]]: """ @@ -36,6 +37,7 @@ def analyze_iterator( :param texts: An list containing strings to be analyzed. 
:param language: Input language :param batch_size: Batch size to process in a single iteration + :param n_process: Number of processors to use. Defaults to `1` :param kwargs: Additional parameters for the `AnalyzerEngine.analyze` method. (default value depends on the nlp engine implementation) """ @@ -44,10 +46,13 @@ def analyze_iterator( texts = self._validate_types(texts) # Process the texts as batch for improved performance - nlp_artifacts_batch: Iterator[ - Tuple[str, NlpArtifacts] - ] = self.analyzer_engine.nlp_engine.process_batch( - texts=texts, language=language, batch_size=batch_size + nlp_artifacts_batch: Iterator[Tuple[str, NlpArtifacts]] = ( + self.analyzer_engine.nlp_engine.process_batch( + texts=texts, + language=language, + batch_size=batch_size, + n_process=n_process, + ) ) list_results = [] diff --git a/presidio-analyzer/presidio_analyzer/nlp_engine/nlp_engine.py b/presidio-analyzer/presidio_analyzer/nlp_engine/nlp_engine.py index 33664566c..892506b0f 100644 --- a/presidio-analyzer/presidio_analyzer/nlp_engine/nlp_engine.py +++ b/presidio-analyzer/presidio_analyzer/nlp_engine/nlp_engine.py @@ -1,5 +1,4 @@ from abc import ABC, abstractmethod - from typing import Iterable, Iterator, List, Optional, Tuple from presidio_analyzer.nlp_engine import NlpArtifacts @@ -30,7 +29,8 @@ def process_batch( self, texts: Iterable[str], language: str, - batch_size: Optional[int] = None, + batch_size: Optional[int] = 1, + n_process: Optional[int] = 1, **kwargs, # noqa ANN003 ) -> Iterator[Tuple[str, NlpArtifacts]]: """Execute the NLP pipeline on a batch of texts. diff --git a/presidio-analyzer/presidio_analyzer/nlp_engine/spacy_nlp_engine.py b/presidio-analyzer/presidio_analyzer/nlp_engine/spacy_nlp_engine.py index c1b498d6d..bfd58397a 100644 --- a/presidio-analyzer/presidio_analyzer/nlp_engine/spacy_nlp_engine.py +++ b/presidio-analyzer/presidio_analyzer/nlp_engine/spacy_nlp_engine.py @@ -1,14 +1,16 @@ import logging - from pathlib import Path from typing import Dict, Iterator, List, Optional, Tuple, Union import spacy -from presidio_analyzer.nlp_engine import NlpArtifacts, NlpEngine from spacy.language import Language from spacy.tokens import Doc, Span -from presidio_analyzer.nlp_engine import NerModelConfiguration, NlpArtifacts, NlpEngine +from presidio_analyzer.nlp_engine import ( + NerModelConfiguration, + NlpArtifacts, + NlpEngine, +) logger = logging.getLogger("presidio-analyzer") @@ -113,13 +115,16 @@ def process_batch( self, texts: Union[List[str], List[Tuple[str, object]]], language: str, - batch_size: Optional[int] = None, + batch_size: Optional[int] = 1, + n_process: Optional[int] = 1, as_tuples: bool = False, ) -> Iterator[Optional[NlpArtifacts]]: """Execute the NLP pipeline on a batch of texts using spacy pipe. :param texts: A list of texts to process. :param language: The language of the texts. + :param batch_size: Default batch size for pipe and evaluate. + :param n_process: Number of processors to process texts. :param as_tuples: If set to True, inputs should be a sequence of (text, context) tuples. Output will then be a sequence of (doc, context) tuples. Defaults to False. 
@@ -130,7 +135,7 @@ def process_batch( texts = (str(text) for text in texts) docs = self.nlp[language].pipe( - texts, as_tuples=as_tuples, batch_size=batch_size + texts, as_tuples=as_tuples, batch_size=batch_size, n_process=n_process ) for doc in docs: yield doc.text, self._doc_to_nlp_artifact(doc, language) diff --git a/presidio-analyzer/tests/test_analyzer_engine_provider.py b/presidio-analyzer/tests/test_analyzer_engine_provider.py index 042fe5f67..8ac60396e 100644 --- a/presidio-analyzer/tests/test_analyzer_engine_provider.py +++ b/presidio-analyzer/tests/test_analyzer_engine_provider.py @@ -5,9 +5,9 @@ from presidio_analyzer import AnalyzerEngineProvider, RecognizerResult from presidio_analyzer.nlp_engine import SpacyNlpEngine, NlpArtifacts -from presidio_analyzer.nlp_engine.transformers_nlp_engine import TransformersNlpEngine from presidio_analyzer.predefined_recognizers import AzureAILanguageRecognizer +import pytest def get_full_paths(analyzer_yaml, nlp_engine_yaml=None, recognizer_registry_yaml=None): this_path = Path(__file__).parent.absolute() @@ -155,7 +155,7 @@ def test_analyzer_engine_provider_with_files_per_provider(): assert len(recognizer_registry.recognizers) == 6 assert recognizer_registry.supported_languages == ["en", "es"] - +@pytest.mark.skipif(pytest.importorskip("azure"), reason="Optional dependency not installed") # noqa: E501 def test_analyzer_engine_provider_with_azure_ai_language(): analyzer_yaml, _, _ = get_full_paths( "conf/test_azure_ai_language_reco.yaml", diff --git a/presidio-analyzer/tests/test_batch_analyzer_engine.py b/presidio-analyzer/tests/test_batch_analyzer_engine.py index fdde1bbb2..6a1672268 100644 --- a/presidio-analyzer/tests/test_batch_analyzer_engine.py +++ b/presidio-analyzer/tests/test_batch_analyzer_engine.py @@ -267,13 +267,18 @@ def test_analyze_dict_with_nones_returns_empty_result(batch_analyzer_engine_simp for r in res: assert not r + +@pytest.mark.parametrize("n_process, batch_size", [(1,1), (4,2), (2,1)]) def test_batch_analyze_iterator_returns_list_of_recognizer_results( - batch_analyzer_engine_simple + batch_analyzer_engine_simple, n_process, batch_size ): texts = ["My name is David", "Call me at 2352351232", "I was born at 1/5/1922"] expected_output = [[], [RecognizerResult(entity_type="PHONE_NUMBER", start=11, end=21, score= 0.4)], []] - results = batch_analyzer_engine_simple.analyze_iterator(texts=texts, language="en", batch_size=2) + results = batch_analyzer_engine_simple.analyze_iterator(texts=texts, + language="en", + batch_size=batch_size, + n_process=n_process) assert len(results) == len(expected_output) for result, expected_result in zip(results, expected_output): From bfc09c35dab1a7b300e7d93170701f602bef4de1 Mon Sep 17 00:00:00 2001 From: Omri Name <3776619+omri374@users.noreply.github.com> Date: Mon, 3 Feb 2025 21:09:57 +0000 Subject: [PATCH 7/8] added more tests and support in structured --- .../batch_analyzer_engine.py | 7 ++++++ .../tests/test_batch_analyzer_engine.py | 13 +++++++---- .../presidio_structured/analysis_builder.py | 22 ++++++++++++++++--- .../tests/test_analysis_builder.py | 13 +++++++++++ 4 files changed, 48 insertions(+), 7 deletions(-) diff --git a/presidio-analyzer/presidio_analyzer/batch_analyzer_engine.py b/presidio-analyzer/presidio_analyzer/batch_analyzer_engine.py index 542e4354e..e3b01350b 100644 --- a/presidio-analyzer/presidio_analyzer/batch_analyzer_engine.py +++ b/presidio-analyzer/presidio_analyzer/batch_analyzer_engine.py @@ -70,6 +70,8 @@ def analyze_dict( input_dict: Dict[str, 
Union[Any, Iterable[Any]]], language: str, keys_to_skip: Optional[List[str]] = None, + batch_size: Optional[int] = 1, + n_process: Optional[int] = 1, **kwargs, ) -> Iterator[DictAnalyzerResult]: """ @@ -80,6 +82,9 @@ def analyze_dict( :param input_dict: The input dictionary for analysis :param language: Input language :param keys_to_skip: Keys to ignore during analysis + :param batch_size: Batch size to process in a single iteration + :param n_process: Number of processors to use. Defaults to `1` + :param kwargs: Additional keyword arguments for the `AnalyzerEngine.analyze` method. Use this to pass arguments to the analyze method, @@ -124,6 +129,8 @@ def analyze_dict( texts=value, language=language, context=specific_context, + n_process=n_process, + batch_size=batch_size, **kwargs, ) else: diff --git a/presidio-analyzer/tests/test_batch_analyzer_engine.py b/presidio-analyzer/tests/test_batch_analyzer_engine.py index 6a1672268..07acdaf94 100644 --- a/presidio-analyzer/tests/test_batch_analyzer_engine.py +++ b/presidio-analyzer/tests/test_batch_analyzer_engine.py @@ -41,8 +41,10 @@ def test_analyze_iterator_returns_list_of_recognizer_results( assert result == expected_result # fmt: on - -def test_analyze_dict_one_value_per_key(batch_analyzer_engine_simple): +@pytest.mark.parametrize("n_process, batch_size", [(1,1), (4,2), (2,1)]) +def test_analyze_dict_one_value_per_key(batch_analyzer_engine_simple, + n_process, + batch_size): d = { "url": "https://microsoft.com", @@ -51,7 +53,10 @@ def test_analyze_dict_one_value_per_key(batch_analyzer_engine_simple): "misc": "microsoft.com or (202)-555-1234", } - results = batch_analyzer_engine_simple.analyze_dict(input_dict=d, language="en") + results = batch_analyzer_engine_simple.analyze_dict(input_dict=d, + language="en", + n_process=n_process, + batch_size=batch_size) results = list(results) # url @@ -282,4 +287,4 @@ def test_batch_analyze_iterator_returns_list_of_recognizer_results( assert len(results) == len(expected_output) for result, expected_result in zip(results, expected_output): - assert result == expected_result \ No newline at end of file + assert result == expected_result diff --git a/presidio-structured/presidio_structured/analysis_builder.py b/presidio-structured/presidio_structured/analysis_builder.py index 6be47c515..b0396e24d 100644 --- a/presidio-structured/presidio_structured/analysis_builder.py +++ b/presidio-structured/presidio_structured/analysis_builder.py @@ -26,8 +26,16 @@ def __init__( self, analyzer: Optional[AnalyzerEngine] = None, analyzer_score_threshold: Optional[float] = None, + n_process: Optional[int] = 1, + batch_size: Optional[int] = 1 ) -> None: - """Initialize the configuration generator.""" + """Initialize the configuration generator. + + :param analyzer: AnalyzerEngine instance + :param analyzer_score_threshold: threshold for filtering out results + :param batch_size: Batch size to process in a single iteration + :param n_process: Number of processors to use. 
Defaults to `1` + """ default_score_threshold = ( analyzer_score_threshold if analyzer_score_threshold is not None else 0 ) @@ -37,6 +45,8 @@ def __init__( else analyzer ) self.batch_analyzer = BatchAnalyzerEngine(analyzer_engine=self.analyzer) + self.n_process = n_process + self.batch_size = batch_size @abstractmethod def generate_analysis( @@ -92,7 +102,10 @@ def generate_analysis( """ logger.debug("Starting JSON BatchAnalyzer analysis") analyzer_results = self.batch_analyzer.analyze_dict( - input_dict=data, language=language + input_dict=data, + language=language, + n_process=self.n_process, + batch_size=self.batch_size ) key_recognizer_result_map = self._generate_analysis_from_results_json( @@ -240,7 +253,10 @@ def _batch_analyze_df( for column in df.columns: logger.debug(f"Finding most common PII entity for column {column}") analyzer_results = self.batch_analyzer.analyze_iterator( - [val for val in df[column]], language=language + [val for val in df[column]], + language=language, + n_process=self.n_process, + batch_size=self.batch_size ) column_analyzer_results_map[column] = analyzer_results diff --git a/presidio-structured/tests/test_analysis_builder.py b/presidio-structured/tests/test_analysis_builder.py index 99a0e3e59..1b97ebe32 100644 --- a/presidio-structured/tests/test_analysis_builder.py +++ b/presidio-structured/tests/test_analysis_builder.py @@ -112,6 +112,19 @@ def test_analysis_tabular_when_default_threshold_is_zero_then_all_results_pass( assert len(structured_analysis.entity_mapping) == 3 +def test_analysis_tabular_when_multiprocess_then_results_are_correct( + sample_df, +): + analyzer_engine = AnalyzerEngine(default_score_threshold=0) + tabular_analysis_builder = PandasAnalysisBuilder(analyzer_engine, + n_process=4, + batch_size=2) + structured_analysis = tabular_analysis_builder.generate_analysis(sample_df) + + assert len(structured_analysis.entity_mapping) == 3 + + + def test_generate_analysis_json(json_analysis_builder, sample_json): structured_analysis = json_analysis_builder.generate_analysis(sample_json) From 64aaf1db485109fa60695489d2487403b777c2f2 Mon Sep 17 00:00:00 2001 From: omri374 <3776619+omri374@users.noreply.github.com> Date: Tue, 4 Feb 2025 10:07:02 +0000 Subject: [PATCH 8/8] removed optional from default int value --- .../presidio_analyzer/batch_analyzer_engine.py | 8 ++++---- .../presidio_analyzer/nlp_engine/nlp_engine.py | 6 +++--- .../nlp_engine/spacy_nlp_engine.py | 4 ++-- .../presidio_structured/analysis_builder.py | 14 +++++++------- 4 files changed, 16 insertions(+), 16 deletions(-) diff --git a/presidio-analyzer/presidio_analyzer/batch_analyzer_engine.py b/presidio-analyzer/presidio_analyzer/batch_analyzer_engine.py index e3b01350b..805d2a7a4 100644 --- a/presidio-analyzer/presidio_analyzer/batch_analyzer_engine.py +++ b/presidio-analyzer/presidio_analyzer/batch_analyzer_engine.py @@ -27,8 +27,8 @@ def analyze_iterator( self, texts: Iterable[Union[str, bool, float, int]], language: str, - batch_size: Optional[int] = 1, - n_process: Optional[int] = 1, + batch_size: int = 1, + n_process: int = 1, **kwargs, ) -> List[List[RecognizerResult]]: """ @@ -70,8 +70,8 @@ def analyze_dict( input_dict: Dict[str, Union[Any, Iterable[Any]]], language: str, keys_to_skip: Optional[List[str]] = None, - batch_size: Optional[int] = 1, - n_process: Optional[int] = 1, + batch_size: int = 1, + n_process: int = 1, **kwargs, ) -> Iterator[DictAnalyzerResult]: """ diff --git a/presidio-analyzer/presidio_analyzer/nlp_engine/nlp_engine.py 
b/presidio-analyzer/presidio_analyzer/nlp_engine/nlp_engine.py index 892506b0f..a916c732d 100644 --- a/presidio-analyzer/presidio_analyzer/nlp_engine/nlp_engine.py +++ b/presidio-analyzer/presidio_analyzer/nlp_engine/nlp_engine.py @@ -1,5 +1,5 @@ from abc import ABC, abstractmethod -from typing import Iterable, Iterator, List, Optional, Tuple +from typing import Iterable, Iterator, List, Tuple from presidio_analyzer.nlp_engine import NlpArtifacts @@ -29,8 +29,8 @@ def process_batch( self, texts: Iterable[str], language: str, - batch_size: Optional[int] = 1, - n_process: Optional[int] = 1, + batch_size: int = 1, + n_process: int = 1, **kwargs, # noqa ANN003 ) -> Iterator[Tuple[str, NlpArtifacts]]: """Execute the NLP pipeline on a batch of texts. diff --git a/presidio-analyzer/presidio_analyzer/nlp_engine/spacy_nlp_engine.py b/presidio-analyzer/presidio_analyzer/nlp_engine/spacy_nlp_engine.py index bfd58397a..2ecf6a299 100644 --- a/presidio-analyzer/presidio_analyzer/nlp_engine/spacy_nlp_engine.py +++ b/presidio-analyzer/presidio_analyzer/nlp_engine/spacy_nlp_engine.py @@ -115,8 +115,8 @@ def process_batch( self, texts: Union[List[str], List[Tuple[str, object]]], language: str, - batch_size: Optional[int] = 1, - n_process: Optional[int] = 1, + batch_size: int = 1, + n_process: int = 1, as_tuples: bool = False, ) -> Iterator[Optional[NlpArtifacts]]: """Execute the NLP pipeline on a batch of texts using spacy pipe. diff --git a/presidio-structured/presidio_structured/analysis_builder.py b/presidio-structured/presidio_structured/analysis_builder.py index b0396e24d..9fe3d0749 100644 --- a/presidio-structured/presidio_structured/analysis_builder.py +++ b/presidio-structured/presidio_structured/analysis_builder.py @@ -26,11 +26,11 @@ def __init__( self, analyzer: Optional[AnalyzerEngine] = None, analyzer_score_threshold: Optional[float] = None, - n_process: Optional[int] = 1, - batch_size: Optional[int] = 1 + n_process: int = 1, + batch_size: int = 1 ) -> None: """Initialize the configuration generator. - + :param analyzer: AnalyzerEngine instance :param analyzer_score_threshold: threshold for filtering out results :param batch_size: Batch size to process in a single iteration @@ -102,7 +102,7 @@ def generate_analysis( """ logger.debug("Starting JSON BatchAnalyzer analysis") analyzer_results = self.batch_analyzer.analyze_dict( - input_dict=data, + input_dict=data, language=language, n_process=self.n_process, batch_size=self.batch_size @@ -253,9 +253,9 @@ def _batch_analyze_df( for column in df.columns: logger.debug(f"Finding most common PII entity for column {column}") analyzer_results = self.batch_analyzer.analyze_iterator( - [val for val in df[column]], - language=language, - n_process=self.n_process, + [val for val in df[column]], + language=language, + n_process=self.n_process, batch_size=self.batch_size ) column_analyzer_results_map[column] = analyzer_results
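Below is a minimal usage sketch of the two parameters this series adds to `BatchAnalyzerEngine` (`batch_size` and `n_process`), based on the signatures in the diffs and the notebook example above. It assumes presidio-analyzer is installed with a spaCy model such as en_core_web_lg; the sample texts are illustrative only.

    from presidio_analyzer import AnalyzerEngine, BatchAnalyzerEngine

    analyzer = AnalyzerEngine()
    batch_analyzer = BatchAnalyzerEngine(analyzer_engine=analyzer)

    texts = ["My name is David", "Call me at 2352351232", "I was born at 1/5/1922"]

    # batch_size is forwarded to spaCy's nlp.pipe; n_process controls how many
    # worker processes spaCy spawns for the batch.
    results = batch_analyzer.analyze_iterator(
        texts=texts,
        language="en",
        batch_size=2,
        n_process=2,
    )

    # analyze_dict accepts the same two parameters and forwards them to
    # analyze_iterator for list-valued keys.
    dict_results = batch_analyzer.analyze_dict(
        input_dict={"notes": texts},
        language="en",
        batch_size=2,
        n_process=2,
    )

    print(results)
    print(list(dict_results))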
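The presidio-structured side of the change (patches 7-8) exposes the same knobs through the analysis builders. A sketch mirroring the new test; the import path and DataFrame contents are assumptions for illustration.

    import pandas as pd

    from presidio_analyzer import AnalyzerEngine
    from presidio_structured import PandasAnalysisBuilder

    df = pd.DataFrame(
        {
            "name": ["John Doe", "Jane Smith"],
            "phone": ["212-555-1234", "978-428-7111"],
        }
    )

    # n_process and batch_size are stored on the builder and passed to
    # BatchAnalyzerEngine.analyze_iterator for each column.
    builder = PandasAnalysisBuilder(AnalyzerEngine(), n_process=2, batch_size=2)
    structured_analysis = builder.generate_analysis(df)
    print(structured_analysis.entity_mapping)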