Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add multiprocessing parameters #1521

Open
wants to merge 15 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 102 additions & 8 deletions docs/samples/python/batch_processing.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@
"outputs": [],
"source": [
"# download presidio\n",
"!pip install presidio_analyzer presidio_anonymizer\n",
"!python -m spacy download en_core_web_lg"
"#!pip install presidio_analyzer presidio_anonymizer\n",
"#!python -m spacy download en_core_web_lg\n",
"#!pip install pandas"
]
},
{
Expand Down Expand Up @@ -145,8 +146,8 @@
"</div>"
],
"text/plain": [
" name phrase phone number phrase integer \n",
"0 Charlie likes this Please call 212-555-1234 after 2pm 1 \\\n",
" name phrase phone number phrase integer \\\n",
"0 Charlie likes this Please call 212-555-1234 after 2pm 1 \n",
"1 You should talk to Mike his number is 978-428-7111 2 \n",
"2 Mary had a little startup Phone number: 202-342-1234 3 \n",
"\n",
Expand Down Expand Up @@ -322,8 +323,8 @@
"</div>"
],
"text/plain": [
" name phrase \n",
"0 <PERSON> likes this \\\n",
" name phrase \\\n",
"0 <PERSON> likes this \n",
"1 You should talk to <PERSON> \n",
"2 <PERSON> had a little startup \n",
"\n",
Expand Down Expand Up @@ -496,11 +497,104 @@
"\n",
"Are not yet supported. Consider breaking the JSON to parts if needed."
]
},
{
"cell_type": "markdown",
"id": "c708ff56",
"metadata": {},
"source": [
"## Multiprocessing\n",
"\n",
"`BatchAnalyzerEngine` builds upon spaCy's pipelines. For more info about multiprocessing, see https://spacy.io/usage/processing-pipelines#multiprocessing.\n",
"\n",
"In Presidio, pass the `n_process` and `batch_size` arguments to control how texts are processed in parallel."
]
},
{
"cell_type": "markdown",
"id": "81316c6c",
"metadata": {},
"source": []
},
{
"cell_type": "code",
"execution_count": 25,
"id": "09a80e87",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[Monitor] Active Python processes: 4 - [38773, 38774, 45860, 109966]\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"[Monitor] Active Python processes: 8 - [38773, 38774, 45860, 109966, 109973, 109976, 109977, 109978]\n",
"[Monitor] Active Python processes: 8 - [38773, 38774, 45860, 109966, 109973, 109976, 109977, 109978]\n",
"[Monitor] Active Python processes: 8 - [38773, 38774, 45860, 109966, 109973, 109976, 109977, 109978]\n",
"[Monitor] Active Python processes: 8 - [38773, 38774, 45860, 109966, 109973, 109976, 109977, 109978]\n",
"[Monitor] Active Python processes: 4 - [38773, 38774, 45860, 109966]\n"
]
}
],
"source": [
"import multiprocessing\n",
"import psutil\n",
"import time\n",
"\n",
"def analyze_batch_multiprocess(n_process=12, batch_size=4):\n",
" \"\"\"Run BatchAnalyzer with `n_process` processes and batch size of `batch_size`.\"\"\"\n",
" list_of_texts = [\"My name is mike\"]*1000\n",
"\n",
" results = batch_analyzer.analyze_iterator(\n",
" texts=list_of_texts, \n",
" language=\"en\",\n",
" n_process=n_process, \n",
" batch_size=batch_size\n",
" )\n",
"\n",
" return list(results)\n",
"\n",
"\n",
"\n",
"def monitor_processes():\n",
" \"\"\"Monitor all Python processes dynamically.\"\"\"\n",
" while True:\n",
" processes = [p for p in psutil.process_iter(attrs=['pid', 'name']) if \"python\" in p.info['name']]\n",
" print(f\"[Monitor] Active Python processes: {len(processes)} - {[p.info['pid'] for p in processes]}\")\n",
" time.sleep(1)\n",
"\n",
"\n",
"# Run interactive monitoring\n",
"monitor_proc = multiprocessing.Process(target=monitor_processes, daemon=True)\n",
"monitor_proc.start()\n",
"\n",
"# Run the batch analyzer process\n",
"analyze_batch_multiprocess(n_process=4, batch_size=2)\n",
"\n",
"# Wait for everything to conclude\n",
"time.sleep(1) \n",
"\n",
"# Clean up (not needed if daemon=True, but useful if stopping manually)\n",
"monitor_proc.terminate()\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "7b7b6c64",
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"display_name": "presidio-analyzer-sAyh6tzK-py3.12",
"language": "python",
"name": "python3"
},
Expand All @@ -514,7 +608,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.5"
"version": "3.12.1"
}
},
"nbformat": 4,
Expand Down
16 changes: 14 additions & 2 deletions presidio-analyzer/presidio_analyzer/batch_analyzer_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ def analyze_iterator(
self,
texts: Iterable[Union[str, bool, float, int]],
language: str,
batch_size: Optional[int] = None,
batch_size: int = 1,
n_process: int = 1,
**kwargs,
) -> List[List[RecognizerResult]]:
"""
Expand All @@ -36,6 +37,7 @@ def analyze_iterator(
:param texts: A list containing strings to be analyzed.
:param language: Input language
:param batch_size: Batch size to process in a single iteration
:param n_process: Number of processes to use. Defaults to `1`
:param kwargs: Additional parameters for the `AnalyzerEngine.analyze` method.
(default value depends on the nlp engine implementation)
"""
Expand All @@ -46,7 +48,10 @@ def analyze_iterator(
# Process the texts as batch for improved performance
nlp_artifacts_batch: Iterator[Tuple[str, NlpArtifacts]] = (
self.analyzer_engine.nlp_engine.process_batch(
texts=texts, language=language, batch_size=batch_size
texts=texts,
language=language,
batch_size=batch_size,
n_process=n_process,
)
)

Expand All @@ -65,6 +70,8 @@ def analyze_dict(
input_dict: Dict[str, Union[Any, Iterable[Any]]],
language: str,
keys_to_skip: Optional[List[str]] = None,
batch_size: int = 1,
n_process: int = 1,
**kwargs,
) -> Iterator[DictAnalyzerResult]:
"""
Expand All @@ -75,6 +82,9 @@ def analyze_dict(
:param input_dict: The input dictionary for analysis
:param language: Input language
:param keys_to_skip: Keys to ignore during analysis
:param batch_size: Batch size to process in a single iteration
:param n_process: Number of processes to use. Defaults to `1`

:param kwargs: Additional keyword arguments
for the `AnalyzerEngine.analyze` method.
Use this to pass arguments to the analyze method,
Expand Down Expand Up @@ -119,6 +129,8 @@ def analyze_dict(
texts=value,
language=language,
context=specific_context,
n_process=n_process,
batch_size=batch_size,
**kwargs,
)
else:
Expand Down
5 changes: 3 additions & 2 deletions presidio-analyzer/presidio_analyzer/nlp_engine/nlp_engine.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from abc import ABC, abstractmethod
from typing import Iterable, Iterator, List, Optional, Tuple
from typing import Iterable, Iterator, List, Tuple

from presidio_analyzer.nlp_engine import NlpArtifacts

Expand Down Expand Up @@ -29,7 +29,8 @@ def process_batch(
self,
texts: Iterable[str],
language: str,
batch_size: Optional[int] = None,
batch_size: int = 1,
n_process: int = 1,
**kwargs, # noqa ANN003
) -> Iterator[Tuple[str, NlpArtifacts]]:
"""Execute the NLP pipeline on a batch of texts.
Expand Down
13 changes: 10 additions & 3 deletions presidio-analyzer/presidio_analyzer/nlp_engine/spacy_nlp_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@
from spacy.language import Language
from spacy.tokens import Doc, Span

from presidio_analyzer.nlp_engine import NerModelConfiguration, NlpArtifacts, NlpEngine
from presidio_analyzer.nlp_engine import (
NerModelConfiguration,
NlpArtifacts,
NlpEngine,
)

logger = logging.getLogger("presidio-analyzer")

Expand Down Expand Up @@ -111,13 +115,16 @@ def process_batch(
self,
texts: Union[List[str], List[Tuple[str, object]]],
language: str,
batch_size: Optional[int] = None,
batch_size: int = 1,
n_process: int = 1,
as_tuples: bool = False,
) -> Iterator[Optional[NlpArtifacts]]:
"""Execute the NLP pipeline on a batch of texts using spacy pipe.

:param texts: A list of texts to process.
:param language: The language of the texts.
:param batch_size: Default batch size for pipe and evaluate.
:param n_process: Number of processes used to process the texts.
:param as_tuples: If set to True, inputs should be a sequence of
(text, context) tuples. Output will then be a sequence of
(doc, context) tuples. Defaults to False.
Expand All @@ -128,7 +135,7 @@ def process_batch(

texts = (str(text) for text in texts)
docs = self.nlp[language].pipe(
texts, as_tuples=as_tuples, batch_size=batch_size
texts, as_tuples=as_tuples, batch_size=batch_size, n_process=n_process
)
for doc in docs:
yield doc.text, self._doc_to_nlp_artifact(doc, language)
Expand Down
4 changes: 2 additions & 2 deletions presidio-analyzer/tests/test_analyzer_engine_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
from presidio_analyzer import AnalyzerEngineProvider, RecognizerResult
from presidio_analyzer.nlp_engine import SpacyNlpEngine, NlpArtifacts

from presidio_analyzer.nlp_engine.transformers_nlp_engine import TransformersNlpEngine
from presidio_analyzer.predefined_recognizers import AzureAILanguageRecognizer

import pytest

def get_full_paths(analyzer_yaml, nlp_engine_yaml=None, recognizer_registry_yaml=None):
this_path = Path(__file__).parent.absolute()
Expand Down Expand Up @@ -155,7 +155,7 @@ def test_analyzer_engine_provider_with_files_per_provider():
assert len(recognizer_registry.recognizers) == 6
assert recognizer_registry.supported_languages == ["en", "es"]


@pytest.mark.skipif(pytest.importorskip("azure"), reason="Optional dependency not installed") # noqa: E501
def test_analyzer_engine_provider_with_azure_ai_language():
analyzer_yaml, _, _ = get_full_paths(
"conf/test_azure_ai_language_reco.yaml",
Expand Down
24 changes: 17 additions & 7 deletions presidio-analyzer/tests/test_batch_analyzer_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,17 @@ def test_analyze_iterator_returns_list_of_recognizer_results(
texts, expected_output, batch_analyzer_engine_simple
):

results = batch_analyzer_engine_simple.analyze_iterator(texts=texts, language="en")
results = batch_analyzer_engine_simple.analyze_iterator(texts=texts, language="en", batch_size=2)

assert len(results) == len(expected_output)
for result, expected_result in zip(results, expected_output):
assert result == expected_result
# fmt: on


def test_analyze_dict_one_value_per_key(batch_analyzer_engine_simple):
@pytest.mark.parametrize("n_process, batch_size", [(1,1), (4,2), (2,1)])
def test_analyze_dict_one_value_per_key(batch_analyzer_engine_simple,
n_process,
batch_size):

d = {
"url": "https://microsoft.com",
Expand All @@ -51,7 +53,10 @@ def test_analyze_dict_one_value_per_key(batch_analyzer_engine_simple):
"misc": "microsoft.com or (202)-555-1234",
}

results = batch_analyzer_engine_simple.analyze_dict(input_dict=d, language="en")
results = batch_analyzer_engine_simple.analyze_dict(input_dict=d,
language="en",
n_process=n_process,
batch_size=batch_size)
results = list(results)

# url
Expand Down Expand Up @@ -267,14 +272,19 @@ def test_analyze_dict_with_nones_returns_empty_result(batch_analyzer_engine_simp
for r in res:
assert not r


@pytest.mark.parametrize("n_process, batch_size", [(1,1), (4,2), (2,1)])
def test_batch_analyze_iterator_returns_list_of_recognizer_results(
batch_analyzer_engine_simple
batch_analyzer_engine_simple, n_process, batch_size
):
texts = ["My name is David", "Call me at 2352351232", "I was born at 1/5/1922"]
expected_output = [[], [RecognizerResult(entity_type="PHONE_NUMBER", start=11, end=21, score= 0.4)], []]

results = batch_analyzer_engine_simple.analyze_iterator(texts=texts, language="en", batch_size=2)
results = batch_analyzer_engine_simple.analyze_iterator(texts=texts,
language="en",
batch_size=batch_size,
n_process=n_process)

assert len(results) == len(expected_output)
for result, expected_result in zip(results, expected_output):
assert result == expected_result
assert result == expected_result
1 change: 1 addition & 0 deletions presidio-analyzer/tests/test_spacy_nlp_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ def test_simple_process_text(spacy_nlp_engine):
assert nlp_artifacts.lemmas[1] == "text"



omri374 marked this conversation as resolved.
Show resolved Hide resolved
def test_process_batch_strings(spacy_nlp_engine):
nlp_artifacts_batch = spacy_nlp_engine.process_batch(
["simple text", "simple text"], language="en"
Expand Down
22 changes: 19 additions & 3 deletions presidio-structured/presidio_structured/analysis_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,16 @@ def __init__(
self,
analyzer: Optional[AnalyzerEngine] = None,
analyzer_score_threshold: Optional[float] = None,
n_process: int = 1,
batch_size: int = 1
) -> None:
"""Initialize the configuration generator."""
"""Initialize the configuration generator.

:param analyzer: AnalyzerEngine instance
:param analyzer_score_threshold: threshold for filtering out results
:param batch_size: Batch size to process in a single iteration
:param n_process: Number of processes to use. Defaults to `1`
"""
default_score_threshold = (
analyzer_score_threshold if analyzer_score_threshold is not None else 0
)
Expand All @@ -37,6 +45,8 @@ def __init__(
else analyzer
)
self.batch_analyzer = BatchAnalyzerEngine(analyzer_engine=self.analyzer)
self.n_process = n_process
self.batch_size = batch_size

@abstractmethod
def generate_analysis(
Expand Down Expand Up @@ -92,7 +102,10 @@ def generate_analysis(
"""
logger.debug("Starting JSON BatchAnalyzer analysis")
analyzer_results = self.batch_analyzer.analyze_dict(
input_dict=data, language=language
input_dict=data,
language=language,
n_process=self.n_process,
batch_size=self.batch_size
)

key_recognizer_result_map = self._generate_analysis_from_results_json(
Expand Down Expand Up @@ -240,7 +253,10 @@ def _batch_analyze_df(
for column in df.columns:
logger.debug(f"Finding most common PII entity for column {column}")
analyzer_results = self.batch_analyzer.analyze_iterator(
[val for val in df[column]], language=language
[val for val in df[column]],
language=language,
n_process=self.n_process,
batch_size=self.batch_size
)
column_analyzer_results_map[column] = analyzer_results

Expand Down
Loading
Loading