Merge pull request #70 from my-dev-app/fix/invalid_request_data
Fix/invalid request data
0x78f1935 authored May 25, 2024
2 parents cff511a + 604621f commit 777b08e
Showing 5 changed files with 27 additions and 26 deletions.
6 changes: 3 additions & 3 deletions .vscode/launch.json
@@ -6,15 +6,15 @@
"configurations": [
{
"name": "Start: Example",
"type": "python",
"type": "debugpy",
"request": "launch",
"program": "${workspaceFolder}/example.py",
"program": "${workspaceFolder}/ipython_config.py",
"console": "integratedTerminal",
"justMyCode": true
},
{
"name": "Test: Pytest",
"type": "python",
"type": "debugpy",
"request": "launch",
"module": "pytest",
"justMyCode": true
6 changes: 3 additions & 3 deletions aproxyrelay/__init__.py
@@ -88,7 +88,7 @@ async def _main(self) -> Queue:
if self.proxies.qsize() > 0:
await self.process_targets()
else:
-self.logger.error('Could not establish any available proxy! Please try again later.')
+self.logger.error('[aProxyRelay] Could not establish any available proxy! Please try again later.')
return self._queue_result

def start(self) -> Queue:
@@ -99,13 +99,13 @@ def start(self) -> Queue:
Queue: A queue containing the scraped data from the API.
"""
self.started = datetime.now(UTC)
-self.logger.info(f'Started proxy relay at {self.started} ... Please wait ...!')
+self.logger.info(f'[aProxyRelay] Started proxy relay at {self.started} ... Please wait ...!')

loop = get_event_loop()
loop.set_debug(self.debug)
results = loop.run_until_complete(gather(self._main()))
result = results.pop()

-self.logger.info(f'Data scraped! Took {datetime.now(UTC) - self.started}, enjoy!')
+self.logger.info(f'[aProxyRelay] Data scraped! Took {datetime.now(UTC) - self.started}, enjoy!')

return result
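An editorial aside, not part of the diff: a minimal sketch of how `start()` is meant to be used, based only on what this hunk shows (it drives `_main()` on an event loop and returns a `Queue` of scraped results). The constructor arguments below are hypothetical placeholders, not the library's documented signature.

```python
from queue import Queue

from aproxyrelay import AProxyRelay  # import path assumed from the package name

# Hypothetical arguments; consult the project README for the real signature.
relay = AProxyRelay(
    targets=['https://example.com/api/items?page=1'],  # made-up target list
    debug=False,
)

results: Queue = relay.start()  # blocks until the event loop running _main() finishes

while not results.empty():
    print(results.get())
```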
22 changes: 10 additions & 12 deletions aproxyrelay/core.py
@@ -54,11 +54,11 @@ async def get_proxies(self) -> None:
"""
Asynchronously fill the self.proxies queue with fresh proxies.
"""
-self.logger.info('Initializing parsers ...')
+self.logger.info('[aProxyRelay] Initializing parsers ...')
ggs = []
scrapes = []
for item in proxy_list:
-self.logger.info(f'Loading: {item["parser"].__name__}')
+self.logger.info(f'[aProxyRelay] Loading: {item["parser"].__name__}')
parser = item['parser']
for zone in self.zones:
url = await parser.format_url(url=item['url'], zone=zone)
@@ -68,34 +68,32 @@ async def get_proxies(self) -> None:
scrapes.append(url)
ggs = list(set(ggs))
scrapes = list(set(scrapes))
-self.logger.info(f'Parsers loaded: GG: {len(ggs)}, Other: {len(scrapes)}, Total: {len(ggs + scrapes)} ...')
+self.logger.info(f'[aProxyRelay] Parsers loaded: GG: {len(ggs)}, Other: {len(scrapes)}, Total: {len(ggs + scrapes)} ...')

if self.scrape:
async with ClientSession(conn_timeout=self.timeout) as session:
await self._fetch_proxy_page(scrapes, session)
-self.logger.info(f'Scraper: Found {self._queue_filter.qsize()} competent proxy servers')
+self.logger.info(f'[aProxyRelay] Scraper: Found {self._queue_filter.qsize()} competent proxy servers')
else:
-self.logger.info('Scraper: Skip discovery of new proxy servers ...')
+self.logger.info('[aProxyRelay] Scraper: Skip discovery of new proxy servers ...')

if self.filter and self.scrape:
-self.logger.info(
-    f'Validating: Proxies ({self._queue_filter.qsize()}), checking if proxies meet connection requirements ...'
-)
+self.logger.info(f'[aProxyRelay] Validating: Proxies ({self._queue_filter.qsize()}), checking if proxies meet connection requirements ...')  # noqa: B950
async with ClientSession(conn_timeout=15) as session:
await self._test_all_proxies(session)
-self.logger.info(f'Filter: Found {self._filtered_failed} incompetent and {self._filtered_available} available proxy servers in {datetime.now(UTC) - self.started}')  # noqa: B950
+self.logger.info(f'[aProxyRelay] Filter: Found {self._filtered_failed} incompetent and {self._filtered_available} available proxy servers in {datetime.now(UTC) - self.started}')  # noqa: B950
else:
while not self._queue_filter.empty():
_target = self._queue_filter.get()
_target['proxy'] = f"{_target['protocol'].replace('https', 'http')}://{_target['ip']}:{_target['port']}"
self.proxies.put(_target)
-self.logger.info('Filter: Skip tests for scraped proxy servers ...')
+self.logger.info('[aProxyRelay] Filter: Skip tests for scraped proxy servers ...')

async with ClientSession(conn_timeout=self.timeout) as session:
await self._fetch_proxy_servers(ggs, session)

-self.logger.info(f'Scraper: Found {self._filtered_ggs} additional available proxy servers')
-self.logger.info(f'Found {self.proxies.qsize()} working proxies, took {datetime.now(UTC) - self.started}, Please wait...')
+self.logger.info(f'[aProxyRelay] Scraper: Found {self._filtered_ggs} additional available proxy servers')
+self.logger.info(f'[aProxyRelay] Found {self.proxies.qsize()} working proxies, took {datetime.now(UTC) - self.started}, Please wait...')  # noqa: B950

async def process_targets(self) -> None:
"""
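Side note on the unchanged `else` branch above: when proxy testing is skipped, each scraped record is turned into a proxy URL and the `https` scheme is downgraded to `http`. A small standalone sketch of that expression on a made-up record:

```python
# Made-up record, shaped like the items taken off the filter queue.
_target = {'protocol': 'https', 'ip': '203.0.113.10', 'port': '8080'}

# Same expression as in core.py: 'https' in the scheme becomes 'http'.
_target['proxy'] = f"{_target['protocol'].replace('https', 'http')}://{_target['ip']}:{_target['port']}"

print(_target['proxy'])  # -> http://203.0.113.10:8080
```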
4 changes: 2 additions & 2 deletions aproxyrelay/process.py
@@ -30,7 +30,7 @@ async def _process_targets_main(self) -> None:
When they fail, we delete them from memory. Once the proxy queue is empty, we look for new proxies
before we continue with our targets.
"""
-self.logger.info('Processing ...')
+self.logger.info('[aProxyRelay] Processing ...')

async with ClientSession(
connector=ProxyConnector(remote_resolve=True),
@@ -53,7 +53,7 @@ async def _process_targets_main(self) -> None:
# Use asyncio.gather to concurrently execute all tasks
await gather(*tasks)

-self.logger.info(f'Processing ({self._queue_target_process.qsize()}) items in Queue ... Please wait...')
+self.logger.info(f'[aProxyRelay] Processing ({self._queue_target_process.qsize()}) items in Queue ... Please wait...')

if self.proxies.empty() and self._queue_target_process.qsize() > 0:
await self.get_proxies()
15 changes: 9 additions & 6 deletions aproxyrelay/req.py
@@ -29,7 +29,7 @@ def __init__(self) -> None:
"""
Initialize an instance of AProxyRelayRequests.
"""
self.logger.debug("AProxyRelay Request module initialized!")
self.logger.info("[aProxyRelay] Request module initialized!")

async def _fetch_proxy_page(self, urls, session):
"""
@@ -57,7 +57,7 @@ async def _request_proxy_page(self, url, session) -> None:
return

async with session.get(url, headers=self._get_header()) as response:
self.logger.info(f"Scraper: {url}, Status Code: {response.status}")
self.logger.info(f"[aProxyRelay] Scraper: {url}, Status Code: {response.status}")
if response.status == 200:
new_queue = await parser.scrape(parser.zone, response)
while not new_queue.empty():
@@ -151,7 +151,7 @@ async def _request_proxy_servers(self, url, session) -> None:
zone = url.split('zone=')[1].split('&')[0]

async with session.get(url, headers=self._get_header()) as response:
self.logger.info(f"Scraper: {url}, Status Code: {response.status}")
self.logger.info(f"[aProxyRelay] Scraper: {url}, Status Code: {response.status}")
if response.status == 200:
new_queue = await parser.scrape(zone, response)
while not new_queue.empty():
@@ -181,10 +181,13 @@ async def _obtain_targets(self, proxy_url, target, session) -> None:
if status == 200:
    self.proxies.put(proxy_url)
    data = await response.json()
-   if pack := self.unpack(data, target):
-       self._queue_result.put(pack)
+   if data:
+       if pack := self.unpack(data, target):
+           self._queue_result.put(pack)
+       else:
+           self.logger.warning(f'[aProxyRelay] Could not unpack data for: {target}')
    else:
-       self.logger.warning(f'Could not unpack data for: {target}')
+       self.logger.warning(f'[aProxyRelay] Target {target} Data seems to be None: {data}')
else:
    self._queue_target_process.put(target)
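The `if data:` guard added above appears to be the substantive fix behind this PR: a 200 response can still decode to `None` (or another falsy payload), and the old code handed that straight to `self.unpack`. A minimal sketch of the difference, using a hypothetical `unpack` callback that indexes into the payload:

```python
# Hypothetical unpack callback: it indexes into the decoded JSON, so a None
# payload would raise a TypeError before any warning could be logged.
def unpack(data, target):
    return {'target': target, 'rows': data['results']}

target = 'https://example.com/api/items?page=1'
data = None  # e.g. an empty body decoded by response.json()

# Old flow: `if pack := unpack(data, target)` raises TypeError on a None payload.
# New flow: the falsy payload is caught first and only a warning is emitted.
if data:
    if pack := unpack(data, target):
        print('queued:', pack)
    else:
        print(f'[aProxyRelay] Could not unpack data for: {target}')
else:
    print(f'[aProxyRelay] Target {target} Data seems to be None: {data}')
```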
