diff --git a/docs/samples/python/batch_processing.ipynb b/docs/samples/python/batch_processing.ipynb index d4bb5c795..ea2ee329e 100644 --- a/docs/samples/python/batch_processing.ipynb +++ b/docs/samples/python/batch_processing.ipynb @@ -10,8 +10,9 @@ "outputs": [], "source": [ "# download presidio\n", - "!pip install presidio_analyzer presidio_anonymizer\n", - "!python -m spacy download en_core_web_lg" + "#!pip install presidio_analyzer presidio_anonymizer\n", + "#!python -m spacy download en_core_web_lg\n", + "#!pip install pandas" ] }, { @@ -145,8 +146,8 @@ "" ], "text/plain": [ - " name phrase phone number phrase integer \n", - "0 Charlie likes this Please call 212-555-1234 after 2pm 1 \\\n", + " name phrase phone number phrase integer \\\n", + "0 Charlie likes this Please call 212-555-1234 after 2pm 1 \n", "1 You should talk to Mike his number is 978-428-7111 2 \n", "2 Mary had a little startup Phone number: 202-342-1234 3 \n", "\n", @@ -322,8 +323,8 @@ "" ], "text/plain": [ - " name phrase \n", - "0 likes this \\\n", + " name phrase \\\n", + "0 likes this \n", "1 You should talk to \n", "2 had a little startup \n", "\n", @@ -496,11 +497,104 @@ "\n", "Are not yet supported. Consider breaking the JSON to parts if needed." ] + }, + { + "cell_type": "markdown", + "id": "c708ff56", + "metadata": {}, + "source": [ + "## Multiprocessing\n", + "\n", + "`BatchAnalyzerEngine` builds upon spaCy's pipelines. For more info about multiprocessing, see https://spacy.io/usage/processing-pipelines#multiprocessing.\n", + "\n", + "In Presidio, one can pass the `n_process` argument and the `batch_size` parameter to define how processing is done in parallel." + ] + }, + { + "cell_type": "markdown", + "id": "81316c6c", + "metadata": {}, + "source": [] + }, + { + "cell_type": "code", + "execution_count": 25, + "id": "09a80e87", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "[Monitor] Active Python processes: 4 - [38773, 38774, 45860, 109966]\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "[Monitor] Active Python processes: 8 - [38773, 38774, 45860, 109966, 109973, 109976, 109977, 109978]\n", + "[Monitor] Active Python processes: 8 - [38773, 38774, 45860, 109966, 109973, 109976, 109977, 109978]\n", + "[Monitor] Active Python processes: 8 - [38773, 38774, 45860, 109966, 109973, 109976, 109977, 109978]\n", + "[Monitor] Active Python processes: 8 - [38773, 38774, 45860, 109966, 109973, 109976, 109977, 109978]\n", + "[Monitor] Active Python processes: 4 - [38773, 38774, 45860, 109966]\n" + ] + } + ], + "source": [ + "import multiprocessing\n", + "import psutil\n", + "import time\n", + "\n", + "def analyze_batch_multiprocess(n_process=12, batch_size=4):\n", + " \"\"\"Run BatchAnalyzer with `n_process` processes and batch size of `batch_size`.\"\"\"\n", + " list_of_texts = [\"My name is mike\"]*1000\n", + "\n", + " results = batch_analyzer.analyze_iterator(\n", + " texts=list_of_texts, \n", + " language=\"en\",\n", + " n_process=n_process, \n", + " batch_size=batch_size\n", + " )\n", + "\n", + " return list(results)\n", + "\n", + "\n", + "\n", + "def monitor_processes():\n", + " \"\"\"Monitor all Python processes dynamically.\"\"\"\n", + " while True:\n", + " processes = [p for p in psutil.process_iter(attrs=['pid', 'name']) if \"python\" in p.info['name']]\n", + " print(f\"[Monitor] Active Python processes: {len(processes)} - {[p.info['pid'] for p in processes]}\")\n", + " time.sleep(1)\n", + "\n", + "\n", + "# Run interactive monitoring\n", + "monitor_proc = multiprocessing.Process(target=monitor_processes, daemon=True)\n", + "monitor_proc.start()\n", + "\n", + "# Run the batch analyzer process\n", + "analyze_batch_multiprocess(n_process=4, batch_size=2)\n", + "\n", + "# Wait for everything to conclude\n", + "time.sleep(1) \n", + "\n", + "# Clean up (not needed if daemon=True, but useful if stopping manually)\n", + "monitor_proc.terminate()\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "7b7b6c64", + "metadata": {}, + "outputs": [], + "source": [] } ], "metadata": { "kernelspec": { - "display_name": "Python 3 (ipykernel)", + "display_name": "presidio-analyzer-sAyh6tzK-py3.12", "language": "python", "name": "python3" }, @@ -514,7 +608,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.10.5" + "version": "3.12.1" } }, "nbformat": 4, diff --git a/presidio-analyzer/presidio_analyzer/batch_analyzer_engine.py b/presidio-analyzer/presidio_analyzer/batch_analyzer_engine.py index aba6b9096..805d2a7a4 100644 --- a/presidio-analyzer/presidio_analyzer/batch_analyzer_engine.py +++ b/presidio-analyzer/presidio_analyzer/batch_analyzer_engine.py @@ -27,7 +27,8 @@ def analyze_iterator( self, texts: Iterable[Union[str, bool, float, int]], language: str, - batch_size: Optional[int] = None, + batch_size: int = 1, + n_process: int = 1, **kwargs, ) -> List[List[RecognizerResult]]: """ @@ -36,6 +37,7 @@ def analyze_iterator( :param texts: An list containing strings to be analyzed. :param language: Input language :param batch_size: Batch size to process in a single iteration + :param n_process: Number of processors to use. Defaults to `1` :param kwargs: Additional parameters for the `AnalyzerEngine.analyze` method. (default value depends on the nlp engine implementation) """ @@ -46,7 +48,10 @@ def analyze_iterator( # Process the texts as batch for improved performance nlp_artifacts_batch: Iterator[Tuple[str, NlpArtifacts]] = ( self.analyzer_engine.nlp_engine.process_batch( - texts=texts, language=language, batch_size=batch_size + texts=texts, + language=language, + batch_size=batch_size, + n_process=n_process, ) ) @@ -65,6 +70,8 @@ def analyze_dict( input_dict: Dict[str, Union[Any, Iterable[Any]]], language: str, keys_to_skip: Optional[List[str]] = None, + batch_size: int = 1, + n_process: int = 1, **kwargs, ) -> Iterator[DictAnalyzerResult]: """ @@ -75,6 +82,9 @@ def analyze_dict( :param input_dict: The input dictionary for analysis :param language: Input language :param keys_to_skip: Keys to ignore during analysis + :param batch_size: Batch size to process in a single iteration + :param n_process: Number of processors to use. Defaults to `1` + :param kwargs: Additional keyword arguments for the `AnalyzerEngine.analyze` method. Use this to pass arguments to the analyze method, @@ -119,6 +129,8 @@ def analyze_dict( texts=value, language=language, context=specific_context, + n_process=n_process, + batch_size=batch_size, **kwargs, ) else: diff --git a/presidio-analyzer/presidio_analyzer/nlp_engine/nlp_engine.py b/presidio-analyzer/presidio_analyzer/nlp_engine/nlp_engine.py index a4d248824..a916c732d 100644 --- a/presidio-analyzer/presidio_analyzer/nlp_engine/nlp_engine.py +++ b/presidio-analyzer/presidio_analyzer/nlp_engine/nlp_engine.py @@ -1,5 +1,5 @@ from abc import ABC, abstractmethod -from typing import Iterable, Iterator, List, Optional, Tuple +from typing import Iterable, Iterator, List, Tuple from presidio_analyzer.nlp_engine import NlpArtifacts @@ -29,7 +29,8 @@ def process_batch( self, texts: Iterable[str], language: str, - batch_size: Optional[int] = None, + batch_size: int = 1, + n_process: int = 1, **kwargs, # noqa ANN003 ) -> Iterator[Tuple[str, NlpArtifacts]]: """Execute the NLP pipeline on a batch of texts. diff --git a/presidio-analyzer/presidio_analyzer/nlp_engine/spacy_nlp_engine.py b/presidio-analyzer/presidio_analyzer/nlp_engine/spacy_nlp_engine.py index f844454ea..2ecf6a299 100644 --- a/presidio-analyzer/presidio_analyzer/nlp_engine/spacy_nlp_engine.py +++ b/presidio-analyzer/presidio_analyzer/nlp_engine/spacy_nlp_engine.py @@ -6,7 +6,11 @@ from spacy.language import Language from spacy.tokens import Doc, Span -from presidio_analyzer.nlp_engine import NerModelConfiguration, NlpArtifacts, NlpEngine +from presidio_analyzer.nlp_engine import ( + NerModelConfiguration, + NlpArtifacts, + NlpEngine, +) logger = logging.getLogger("presidio-analyzer") @@ -111,13 +115,16 @@ def process_batch( self, texts: Union[List[str], List[Tuple[str, object]]], language: str, - batch_size: Optional[int] = None, + batch_size: int = 1, + n_process: int = 1, as_tuples: bool = False, ) -> Iterator[Optional[NlpArtifacts]]: """Execute the NLP pipeline on a batch of texts using spacy pipe. :param texts: A list of texts to process. :param language: The language of the texts. + :param batch_size: Default batch size for pipe and evaluate. + :param n_process: Number of processors to process texts. :param as_tuples: If set to True, inputs should be a sequence of (text, context) tuples. Output will then be a sequence of (doc, context) tuples. Defaults to False. @@ -128,7 +135,7 @@ def process_batch( texts = (str(text) for text in texts) docs = self.nlp[language].pipe( - texts, as_tuples=as_tuples, batch_size=batch_size + texts, as_tuples=as_tuples, batch_size=batch_size, n_process=n_process ) for doc in docs: yield doc.text, self._doc_to_nlp_artifact(doc, language) diff --git a/presidio-analyzer/tests/test_analyzer_engine_provider.py b/presidio-analyzer/tests/test_analyzer_engine_provider.py index 042fe5f67..8ac60396e 100644 --- a/presidio-analyzer/tests/test_analyzer_engine_provider.py +++ b/presidio-analyzer/tests/test_analyzer_engine_provider.py @@ -5,9 +5,9 @@ from presidio_analyzer import AnalyzerEngineProvider, RecognizerResult from presidio_analyzer.nlp_engine import SpacyNlpEngine, NlpArtifacts -from presidio_analyzer.nlp_engine.transformers_nlp_engine import TransformersNlpEngine from presidio_analyzer.predefined_recognizers import AzureAILanguageRecognizer +import pytest def get_full_paths(analyzer_yaml, nlp_engine_yaml=None, recognizer_registry_yaml=None): this_path = Path(__file__).parent.absolute() @@ -155,7 +155,7 @@ def test_analyzer_engine_provider_with_files_per_provider(): assert len(recognizer_registry.recognizers) == 6 assert recognizer_registry.supported_languages == ["en", "es"] - +@pytest.mark.skipif(pytest.importorskip("azure"), reason="Optional dependency not installed") # noqa: E501 def test_analyzer_engine_provider_with_azure_ai_language(): analyzer_yaml, _, _ = get_full_paths( "conf/test_azure_ai_language_reco.yaml", diff --git a/presidio-analyzer/tests/test_batch_analyzer_engine.py b/presidio-analyzer/tests/test_batch_analyzer_engine.py index 0945353fc..07acdaf94 100644 --- a/presidio-analyzer/tests/test_batch_analyzer_engine.py +++ b/presidio-analyzer/tests/test_batch_analyzer_engine.py @@ -34,15 +34,17 @@ def test_analyze_iterator_returns_list_of_recognizer_results( texts, expected_output, batch_analyzer_engine_simple ): - results = batch_analyzer_engine_simple.analyze_iterator(texts=texts, language="en") + results = batch_analyzer_engine_simple.analyze_iterator(texts=texts, language="en", batch_size=2) assert len(results) == len(expected_output) for result, expected_result in zip(results, expected_output): assert result == expected_result # fmt: on - -def test_analyze_dict_one_value_per_key(batch_analyzer_engine_simple): +@pytest.mark.parametrize("n_process, batch_size", [(1,1), (4,2), (2,1)]) +def test_analyze_dict_one_value_per_key(batch_analyzer_engine_simple, + n_process, + batch_size): d = { "url": "https://microsoft.com", @@ -51,7 +53,10 @@ def test_analyze_dict_one_value_per_key(batch_analyzer_engine_simple): "misc": "microsoft.com or (202)-555-1234", } - results = batch_analyzer_engine_simple.analyze_dict(input_dict=d, language="en") + results = batch_analyzer_engine_simple.analyze_dict(input_dict=d, + language="en", + n_process=n_process, + batch_size=batch_size) results = list(results) # url @@ -267,14 +272,19 @@ def test_analyze_dict_with_nones_returns_empty_result(batch_analyzer_engine_simp for r in res: assert not r + +@pytest.mark.parametrize("n_process, batch_size", [(1,1), (4,2), (2,1)]) def test_batch_analyze_iterator_returns_list_of_recognizer_results( - batch_analyzer_engine_simple + batch_analyzer_engine_simple, n_process, batch_size ): texts = ["My name is David", "Call me at 2352351232", "I was born at 1/5/1922"] expected_output = [[], [RecognizerResult(entity_type="PHONE_NUMBER", start=11, end=21, score= 0.4)], []] - results = batch_analyzer_engine_simple.analyze_iterator(texts=texts, language="en", batch_size=2) + results = batch_analyzer_engine_simple.analyze_iterator(texts=texts, + language="en", + batch_size=batch_size, + n_process=n_process) assert len(results) == len(expected_output) for result, expected_result in zip(results, expected_output): - assert result == expected_result \ No newline at end of file + assert result == expected_result diff --git a/presidio-analyzer/tests/test_spacy_nlp_engine.py b/presidio-analyzer/tests/test_spacy_nlp_engine.py index e930e244f..a63d12bd4 100644 --- a/presidio-analyzer/tests/test_spacy_nlp_engine.py +++ b/presidio-analyzer/tests/test_spacy_nlp_engine.py @@ -21,6 +21,7 @@ def test_simple_process_text(spacy_nlp_engine): assert nlp_artifacts.lemmas[1] == "text" + def test_process_batch_strings(spacy_nlp_engine): nlp_artifacts_batch = spacy_nlp_engine.process_batch( ["simple text", "simple text"], language="en" diff --git a/presidio-structured/presidio_structured/analysis_builder.py b/presidio-structured/presidio_structured/analysis_builder.py index 6be47c515..9fe3d0749 100644 --- a/presidio-structured/presidio_structured/analysis_builder.py +++ b/presidio-structured/presidio_structured/analysis_builder.py @@ -26,8 +26,16 @@ def __init__( self, analyzer: Optional[AnalyzerEngine] = None, analyzer_score_threshold: Optional[float] = None, + n_process: int = 1, + batch_size: int = 1 ) -> None: - """Initialize the configuration generator.""" + """Initialize the configuration generator. + + :param analyzer: AnalyzerEngine instance + :param analyzer_score_threshold: threshold for filtering out results + :param batch_size: Batch size to process in a single iteration + :param n_process: Number of processors to use. Defaults to `1` + """ default_score_threshold = ( analyzer_score_threshold if analyzer_score_threshold is not None else 0 ) @@ -37,6 +45,8 @@ def __init__( else analyzer ) self.batch_analyzer = BatchAnalyzerEngine(analyzer_engine=self.analyzer) + self.n_process = n_process + self.batch_size = batch_size @abstractmethod def generate_analysis( @@ -92,7 +102,10 @@ def generate_analysis( """ logger.debug("Starting JSON BatchAnalyzer analysis") analyzer_results = self.batch_analyzer.analyze_dict( - input_dict=data, language=language + input_dict=data, + language=language, + n_process=self.n_process, + batch_size=self.batch_size ) key_recognizer_result_map = self._generate_analysis_from_results_json( @@ -240,7 +253,10 @@ def _batch_analyze_df( for column in df.columns: logger.debug(f"Finding most common PII entity for column {column}") analyzer_results = self.batch_analyzer.analyze_iterator( - [val for val in df[column]], language=language + [val for val in df[column]], + language=language, + n_process=self.n_process, + batch_size=self.batch_size ) column_analyzer_results_map[column] = analyzer_results diff --git a/presidio-structured/tests/test_analysis_builder.py b/presidio-structured/tests/test_analysis_builder.py index 99a0e3e59..1b97ebe32 100644 --- a/presidio-structured/tests/test_analysis_builder.py +++ b/presidio-structured/tests/test_analysis_builder.py @@ -112,6 +112,19 @@ def test_analysis_tabular_when_default_threshold_is_zero_then_all_results_pass( assert len(structured_analysis.entity_mapping) == 3 +def test_analysis_tabular_when_multiprocess_then_results_are_correct( + sample_df, +): + analyzer_engine = AnalyzerEngine(default_score_threshold=0) + tabular_analysis_builder = PandasAnalysisBuilder(analyzer_engine, + n_process=4, + batch_size=2) + structured_analysis = tabular_analysis_builder.generate_analysis(sample_df) + + assert len(structured_analysis.entity_mapping) == 3 + + + def test_generate_analysis_json(json_analysis_builder, sample_json): structured_analysis = json_analysis_builder.generate_analysis(sample_json)