12
12
# limitations under the License.
13
13
# ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. =========
14
14
15
+ import asyncio
15
16
from abc import ABC , abstractmethod
16
17
from types import TracebackType
17
- from typing import Any , Dict , Optional , Type
18
-
19
- from typing_extensions import Self
18
+ from typing import Any , Dict , List , Optional , Type
20
19
21
20
from camel .logger import get_logger
22
21
from camel .utils import BatchProcessor
23
22
24
23
logger = get_logger (__name__ )
25
24
26
25
27
- class BaseExtractor (ABC ):
28
- r"""Base class for all response extractors.
26
class BaseExtractorStrategy(ABC):
    r"""Abstract base class for extraction strategies.

    A strategy is one asynchronous text-to-text extraction step. Concrete
    subclasses must implement :meth:`extract`; the class cannot be
    instantiated until they do.
    """

    @abstractmethod
    async def extract(self, text: str) -> Optional[str]:
        r"""Asynchronously extracts relevant parts from text.

        Args:
            text (str): The input text to process.

        Returns:
            Optional[str]: Extracted str if successful, otherwise None.
        """
40
+
41
+
42
+ class BaseExtractor :
43
+ r"""Base class for response extractors with a fixed strategy pipeline.
29
44
30
- An extractor takes the response and extracts the relevant parts,
31
- converting them into a format that the verifier can handle.
32
- Implements async context manager protocol for proper resource management.
45
+ This extractor:
46
+ - Uses a **fixed multi-stage pipeline** of extraction strategies.
47
+ - Tries **each strategy in order** within a stage until one succeeds.
48
+ - Feeds the **output of one stage into the next** for processing.
49
+ - Supports **async execution** for efficient processing.
50
+ - Provides **batch processing and resource monitoring** options.
33
51
"""
34
52
35
53
def __init__ (
36
54
self ,
55
+ pipeline : List [List [BaseExtractorStrategy ]],
37
56
cache_templates : bool = True ,
38
57
max_cache_size : int = 1000 ,
39
58
extraction_timeout : float = 30.0 ,
@@ -43,9 +62,12 @@ def __init__(
43
62
memory_threshold : float = 85.0 ,
44
63
** kwargs ,
45
64
):
46
- r"""Initialize the extractor.
65
+ r"""Initialize the extractor with a multi-stage strategy pipeline .
47
66
48
67
Args:
68
+ pipeline (List[List[BaseExtractorStrategy]]):
69
+ A fixed list of lists where each list represents a stage
70
+ containing extractor strategies executed in order.
49
71
cache_templates (bool): Whether to cache extraction templates.
50
72
(default: :obj:`True`)
51
73
max_cache_size (int): Maximum number of templates to cache.
@@ -61,11 +83,8 @@ def __init__(
61
83
memory_threshold (float): Memory usage percentage threshold for
62
84
scaling down. (default: :obj:`85.0`)
63
85
**kwargs: Additional extractor parameters.
64
-
65
- Raises:
66
- ValueError: If invalid parameter values are provided
67
86
"""
68
- # Store all parameters in metadata dict for compatibility
87
+
69
88
self ._metadata = {
70
89
'cache_templates' : cache_templates ,
71
90
'max_cache_size' : max_cache_size ,
@@ -81,14 +100,7 @@ def __init__(
81
100
self ._cache : Dict [str , Any ] = {}
82
101
self ._batch_processor : Optional [BatchProcessor ] = None
83
102
84
- # Store configuration parameters
85
- self ._cache_templates = cache_templates
86
- self ._max_cache_size = max_cache_size
87
- self ._extraction_timeout = extraction_timeout
88
- self ._batch_size = batch_size
89
- self ._monitoring_interval = monitoring_interval
90
- self ._cpu_threshold = cpu_threshold
91
- self ._memory_threshold = memory_threshold
103
+ self ._pipeline = pipeline
92
104
93
105
async def setup (self ) -> None :
94
106
r"""Set up the extractor with necessary resources.
@@ -106,17 +118,15 @@ async def setup(self) -> None:
106
118
return
107
119
108
120
try :
109
- # Initialize template cache if enabled
110
- if self ._cache_templates :
121
+ if self ._metadata ["cache_templates" ]:
111
122
self ._template_cache : Dict [str , Any ] = {}
112
123
113
- # Set up batch processing if needed
114
- if self ._batch_size > 1 :
124
+ if self ._metadata ["batch_size" ] > 1 :
115
125
self ._batch_processor = BatchProcessor (
116
- initial_batch_size = self ._batch_size ,
117
- monitoring_interval = self ._monitoring_interval ,
118
- cpu_threshold = self ._cpu_threshold ,
119
- memory_threshold = self ._memory_threshold ,
126
+ initial_batch_size = self ._metadata [ "batch_size" ] ,
127
+ monitoring_interval = self ._metadata [ "monitoring_interval" ] ,
128
+ cpu_threshold = self ._metadata [ "cpu_threshold" ] ,
129
+ memory_threshold = self ._metadata [ "memory_threshold" ] ,
120
130
)
121
131
122
132
self ._is_setup = True
@@ -171,13 +181,6 @@ async def cleanup(self) -> None:
171
181
)
172
182
173
183
# Preserve init config in metadata
174
- self ._metadata = {
175
- 'cache_templates' : self ._cache_templates ,
176
- 'max_cache_size' : self ._max_cache_size ,
177
- 'extraction_timeout' : self ._extraction_timeout ,
178
- 'batch_size' : self ._batch_size ,
179
- }
180
-
181
184
if not errors :
182
185
logger .info (
183
186
f"{ self .__class__ .__name__ } cleaned up successfully"
@@ -187,23 +190,19 @@ async def cleanup(self) -> None:
187
190
errors .append (f"Unexpected error during cleanup: { e } " )
188
191
189
192
finally :
190
- # Always mark as uninitialized, even if cleanup fails
191
193
self ._is_setup = False
192
194
self ._batch_processor = None
193
195
194
196
if errors :
195
- error_msg = (
196
- f"Errors during { self .__class__ .__name__ } cleanup: "
197
- f"{ '; ' .join (errors )} "
198
- )
197
+ error_msg = f"Errors during cleanup: { '; ' .join (errors )} "
199
198
logger .error (error_msg )
200
199
raise RuntimeError (error_msg )
201
200
202
async def __aenter__(self) -> "BaseExtractor":
    r"""Async context manager entry.

    Awaits :meth:`setup` and then hands back this same instance, so it
    can be bound with ``async with extractor as e``.

    Returns:
        BaseExtractor: The initialized extractor instance.
    """
    await self.setup()
    return self
@@ -226,38 +225,61 @@ async def __aexit__(
226
225
"""
227
226
await self .cleanup ()
228
227
229
async def extract(self, response: str) -> Optional[str]:
    r"""Extracts a normalized, comparable part of the LLM response
    using the fixed multi-stage strategy pipeline.

    Stages run in order. Within a stage, strategies are tried in order
    until one returns a value that is not None; that value becomes the
    input of the next stage. A strategy that times out or raises is
    logged as a warning and skipped. If no strategy in a stage yields a
    value, extraction stops and None is returned. With an empty
    pipeline the response is returned unchanged.

    Args:
        response (str): The raw response text.

    Returns:
        Optional[str]: Extracted data if successful, otherwise None.

    Raises:
        ValueError: If response is empty or invalid.
        RuntimeError: If extractor is not initialized.
    """
    if not self._is_setup:
        raise RuntimeError(
            "Extractor must be initialized before extraction"
        )
    if not response or not response.strip():
        raise ValueError("Empty or whitespace-only response")

    # The per-strategy timeout does not change during a run, so read it
    # once instead of on every attempt.
    timeout = self._metadata["extraction_timeout"]
    current_input = response

    for stage in self._pipeline:
        for strategy in stage:
            # Keep the try body to the awaited call only, so the handlers
            # cover exactly the strategy's own failure modes.
            try:
                result = await asyncio.wait_for(
                    strategy.extract(current_input),
                    timeout=timeout,
                )
            except asyncio.TimeoutError:
                logger.warning(
                    f"Strategy {strategy.__class__.__name__} timed out "
                    f"after {timeout} seconds"
                )
                continue
            except Exception as e:
                # Best effort: a failing strategy must not abort the
                # stage; fall through to the next candidate.
                logger.warning(
                    f"Strategy {strategy.__class__.__name__} failed: {e}"
                )
                continue

            if result is not None:
                # First successful strategy wins; feed it to the next
                # stage.
                current_input = result
                break
        else:
            # Reached only when the inner loop finished without `break`,
            # i.e. no strategy in this stage produced a value.
            logger.debug(
                "No strategy in stage succeeded, stopping extraction."
            )
            return None

    return current_input
0 commit comments