Merge pull request #80 from my-dev-app/hotfix/bandwitdh
Hotfix/bandwitdh
0x78f1935 authored Nov 25, 2024
2 parents 89ca549 + 9cc0d2c commit 350b3e4
Showing 4 changed files with 34 additions and 12 deletions.
5 changes: 0 additions & 5 deletions aproxyrelay/__init__.py
@@ -61,11 +61,6 @@ def __init__(
         # Configure the logger
         logging.basicConfig(level=logging.INFO if not debug else logging.DEBUG)
         self.logger = logging.getLogger(__name__)
-        console_handler = logging.StreamHandler()
-        console_handler.setLevel(logging.DEBUG if debug else logging.INFO)
-        formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
-        console_handler.setFormatter(formatter)
-        self.logger.addHandler(console_handler)
 
         # Initialize Core
         AProxyRelayCore.__init__(self)
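
Why removing those five lines fixes duplicate output: `logging.basicConfig` already installs a StreamHandler on the root logger, and records logged on `self.logger` propagate up to it, so the extra handler printed every message twice. A minimal standalone sketch of the effect (not code from this repository):

    import logging

    logging.basicConfig(level=logging.INFO)  # installs a handler on the root logger
    logger = logging.getLogger("demo")

    # Adding a second StreamHandler means each record is emitted by this
    # handler AND, via propagation, by the root handler installed above.
    logger.addHandler(logging.StreamHandler())

    logger.info("hello")  # printed twice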
4 changes: 3 additions & 1 deletion aproxyrelay/core.py
@@ -73,12 +73,14 @@ async def get_proxies(self) -> None:
         if self.scrape:
             async with ClientSession(conn_timeout=self.timeout) as session:
                 await self._fetch_proxy_page(scrapes, session)
-            self.logger.info(f'[aProxyRelay] Scraper: Found {self._queue_filter.qsize()} competent proxy servers')
+            self.logger.info(f'[aProxyRelay] Scraper: Found ({self._queue_filter.qsize()}) competent proxy servers')
         else:
             self.logger.info('[aProxyRelay] Scraper: Skip discovery of new proxy servers ...')
 
         if self.filter and self.scrape:
             self.logger.info(f'[aProxyRelay] Validating: Proxies ({self._queue_filter.qsize()}), checking if proxies meet connection requirements ...')  # noqa: B950
+            self.logger.info(f'[aProxyRelay] Keep an eye on "pending task name": once it reaches ({self._queue_filter.qsize()}), all tests have been completed')  # noqa: B950
+            self.logger.info('[aProxyRelay] Grab some coffee ... please wait ...')
             await self._test_all_proxies()
             self.logger.info(f'[aProxyRelay] Filter: Found {self._filtered_failed} incompetent and {self._filtered_available} available proxy servers in {datetime.now(UTC) - self.started}')  # noqa: B950
         else:
16 changes: 14 additions & 2 deletions aproxyrelay/process.py
@@ -22,6 +22,10 @@ def __init__(self) -> None:
         """
         self._queue_result = Queue()  # Holds target results
 
+    def _chunk_list(self, lst, chunk_size):
+        """Chunk a list into pieces of the desired size."""
+        return [lst[i:i + chunk_size] for i in range(0, len(lst), chunk_size)]
+
     async def _process_targets_main(self) -> None:
         """
         Start the Proxy Relay Processor. Proxies in the queue are nothing less than burners.
@@ -41,8 +45,16 @@ async def _process_targets_main(self) -> None:
             # Append the coroutine object to the tasks list
             tasks.append(self._obtain_targets(proxy, target))
 
-        # Use asyncio.gather to concurrently execute all tasks
-        await gather(*tasks)
+        # We have to chunk our tasks, otherwise the internet bandwidth might be saturated
+        chunks = self._chunk_list(tasks, 10000)
+        i = 0
+        for chunk in chunks:
+            self.logger.info(f"[aProxyRelay] Processing ({i}/{len(tasks)}) ... please wait ...")
+            i += len(chunk)
+            # Use asyncio.gather to concurrently execute all tasks in this chunk
+            await gather(*chunk)
+        # # Use asyncio.gather to concurrently execute all tasks
+        # await gather(*tasks)
 
         self.logger.info(f'[aProxyRelay] Processing ({self._queue_target_process.qsize()}) items in Queue using ({self.proxies.qsize()}) proxies ... Please wait...')  # noqa: B950

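
The chunking pattern above caps how many coroutines `gather` has in flight at once. A minimal standalone sketch of the same idea (the names and the chunk size of 3 are illustrative, not from the library):

    import asyncio

    def chunk_list(lst, chunk_size):
        # Split a list into consecutive slices of at most chunk_size items.
        return [lst[i:i + chunk_size] for i in range(0, len(lst), chunk_size)]

    async def fetch(n):
        await asyncio.sleep(0.01)  # stand-in for a real network request
        return n

    async def main():
        tasks = [fetch(n) for n in range(10)]
        done = 0
        for chunk in chunk_list(tasks, 3):
            await asyncio.gather(*chunk)  # only one chunk in flight at a time
            done += len(chunk)
            print(f"processed {done}/{len(tasks)}")

    asyncio.run(main())

An `asyncio.Semaphore` held inside each task would bound concurrency more smoothly, since new work starts as soon as a slot frees up rather than waiting for the whole batch, but batching is the simpler change for a hotfix.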
21 changes: 17 additions & 4 deletions aproxyrelay/req.py
@@ -31,6 +31,10 @@ def __init__(self) -> None:
         """
         self.logger.info("[aProxyRelay] Request module initialized!")
 
+    def _chunk_list(self, lst, chunk_size):
+        """Chunk a list into pieces of the desired size."""
+        return [lst[i:i + chunk_size] for i in range(0, len(lst), chunk_size)]
+
     async def _fetch_proxy_page(self, urls, session):
         """
         Use asyncio.gather to run multiple requests concurrently by executing `self._request_proxy_page`.
@@ -84,16 +88,25 @@ async def _test_all_proxies(self):
         Use asyncio.gather to run multiple requests concurrently by executing `self._test_proxy_link`.
         """
         # Use asyncio.gather to run multiple tests concurrently
-        to_filter = []
+        raw = []
         while not self._queue_filter.empty():
             _target = self._queue_filter.get()
             _target['proxy'] = f"{_target['protocol'].replace('https', 'http')}://{_target['ip']}:{_target['port']}"
-            to_filter.append(_target)
+            raw.append(_target)
 
         # Remove duplicate entries
-        to_filter = [dict(x) for x in list(set([tuple(item.items()) for item in to_filter]))]
+        to_filter = [dict(x) for x in list(set([tuple(item.items()) for item in raw]))]
+        self.logger.info(f"[aProxyRelay] Found ({len(raw) - len(to_filter)}) duplicates which have been removed")
         tasks = [self._test_proxy_link(proxy['proxy'], proxy) for proxy in to_filter]
-        await gather(*tasks)
+        # We have to chunk our tasks, otherwise the internet bandwidth might be saturated
+        chunks = self._chunk_list(tasks, 10000)
+        i = 0
+        for chunk in chunks:
+            self.logger.info(f"[aProxyRelay] Brewing ({i}/{len(tasks)}) ... please wait ...")
+            i += len(chunk)
+            # Use asyncio.gather to concurrently execute all tasks in this chunk
+            await gather(*chunk)
+        # await gather(*tasks)
 
     async def _test_proxy_link(self, proxy_url, data) -> None:
         """
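
A note on the deduplication one-liner in this hunk: dicts are unhashable, so each proxy dict is converted to a tuple of its items, pushed through a set, and rebuilt. A standalone sketch with illustrative data:

    proxies = [
        {"ip": "10.0.0.1", "port": 8080, "protocol": "http"},
        {"ip": "10.0.0.1", "port": 8080, "protocol": "http"},  # exact duplicate
        {"ip": "10.0.0.2", "port": 3128, "protocol": "http"},
    ]

    # tuple(p.items()) is hashable, so a set drops exact duplicates;
    # dict(t) then restores the original mapping.
    unique = [dict(t) for t in {tuple(p.items()) for p in proxies}]
    print(len(proxies) - len(unique), "duplicate(s) removed")  # -> 1

One caveat: item tuples are order-sensitive, so this only collapses dicts whose keys were inserted in the same order; that holds here because every entry is built by the same scraper pipeline.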
