Merge pull request #78 from my-dev-app/fix/rotation
Fix/rotation
0x78f1935 authored Nov 24, 2024
2 parents c0a280e + 14e1ea2 commit bde0159
Showing 7 changed files with 114 additions and 92 deletions.
2 changes: 1 addition & 1 deletion README.md
@@ -50,7 +50,7 @@ targets = [
# Initialize proxy relay
proxy_relay = AProxyRelay(
targets=targets,
timeout=5,
timeout=30,
scrape=True,
filter=True,
zones=['us'],
16 changes: 11 additions & 5 deletions aproxyrelay/__init__.py
@@ -14,18 +14,19 @@
"""
from asyncio import get_event_loop, gather
from datetime import datetime, UTC
from logging import basicConfig, INFO, DEBUG, getLogger
from typing import Callable
from queue import Queue

import logging

from .core import AProxyRelayCore


class AProxyRelay(AProxyRelayCore):
def __init__(
self,
targets: list[str],
timeout: int = 5,
timeout: int = 30,
scrape: bool = True,
filter: bool = True,
zones: list[str] = ['US'], # noqa: B006
@@ -48,7 +49,7 @@ def __init__(
```py
proxy_relay = AProxyRelay(
targets=targets,
timeout=5,
timeout=30,
scrape=True,
filter=True,
zones=['US', 'DE'],
@@ -58,8 +59,13 @@ def __init__(
```
"""
# Configure the logger
basicConfig(level=INFO if not debug else DEBUG)
self.logger = getLogger(__name__)
logging.basicConfig(level=logging.INFO if not debug else logging.DEBUG)
self.logger = logging.getLogger(__name__)
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.DEBUG if debug else logging.INFO)
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
console_handler.setFormatter(formatter)
self.logger.addHandler(console_handler)

# Initialize Core
AProxyRelayCore.__init__(self)
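The constructor now configures logging through the `logging` module directly and attaches its own console handler on top of `basicConfig`. A minimal standalone sketch of that pattern, outside the class (the logger name and messages are illustrative, not part of the diff):

```py
import logging

def make_logger(debug: bool = False) -> logging.Logger:
    level = logging.DEBUG if debug else logging.INFO
    logging.basicConfig(level=level)  # root handler, as in AProxyRelay.__init__
    logger = logging.getLogger("aproxyrelay.demo")  # illustrative name
    handler = logging.StreamHandler()
    handler.setLevel(level)
    handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
    logger.addHandler(handler)
    # Records also propagate to the root handler set up by basicConfig,
    # so each message may be emitted twice (once plain, once formatted).
    return logger

make_logger(debug=True).debug("proxy rotation enabled")
```

If duplicate console output is unwanted, either `basicConfig` or the extra handler would be sufficient on its own.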
3 changes: 1 addition & 2 deletions aproxyrelay/core.py
@@ -79,8 +79,7 @@ async def get_proxies(self) -> None:

if self.filter and self.scrape:
self.logger.info(f'[aProxyRelay] Validating: Proxies ({self._queue_filter.qsize()}), checking if proxies meet connection requirements ...') # noqa: B950
async with ClientSession(conn_timeout=15) as session:
await self._test_all_proxies(session)
await self._test_all_proxies()
self.logger.info(f'[aProxyRelay] Filter: Found {self._filtered_failed} incompetent and {self._filtered_available} available proxy servers in {datetime.now(UTC) - self.started}') # noqa: B950
else:
while not self._queue_filter.empty():
36 changes: 16 additions & 20 deletions aproxyrelay/process.py
@@ -11,8 +11,6 @@
Process class, once all proxies have been received, we are going to obtain the data for the targets.
This class contains the core mechanics for scraping the targets.
"""
from aiosocks2.connector import ProxyConnector, ProxyClientRequest
from aiohttp import ClientSession
from asyncio import gather
from queue import Queue

@@ -32,31 +30,29 @@ async def _process_targets_main(self) -> None:
"""
self.logger.info('[aProxyRelay] Processing ...')

async with ClientSession(
connector=ProxyConnector(remote_resolve=True),
request_class=ProxyClientRequest,
conn_timeout=self.timeout
) as session:
tasks = []
tasks = []

while not self._queue_target_process.empty():
proxy = self.proxies.get()
if isinstance(proxy, dict):
proxy = f"{proxy['protocol'].replace('https', 'http')}://{proxy['ip']}:{proxy['port']}"
target = self._queue_target_process.get()
while not self._queue_target_process.empty():
proxy = self.proxies.get()
if isinstance(proxy, dict):
proxy = f"{proxy['protocol'].replace('https', 'http')}://{proxy['ip']}:{proxy['port']}"
target = self._queue_target_process.get()

# Append the coroutine object to the tasks list
tasks.append(self._obtain_targets(proxy, target, session))
self.proxies.put(proxy)
# Append the coroutine object to the tasks list
tasks.append(self._obtain_targets(proxy, target))

self.proxies = Queue()
# Use asyncio.gather to concurrently execute all tasks
await gather(*tasks)
# Use asyncio.gather to concurrently execute all tasks
await gather(*tasks)

self.logger.info(f'[aProxyRelay] Processing ({self._queue_target_process.qsize()}) items in Queue ... Please wait...')
self.logger.info(f'[aProxyRelay] Processing ({self._queue_target_process.qsize()}) items in Queue using ({self.proxies.qsize()}) proxies ... Please wait...') # noqa: B950

# Proxy queue is empty but targets are available
if self.proxies.empty() and self._queue_target_process.qsize() > 0:
self.logger.info(
f'[aProxyRelay] All Proxies exhausted ({self._queue_target_process.qsize()}) items left in Queue ... Please wait...'
)
await self.get_proxies()
await self.process_targets()
# Proxy queue has proxies, targets are available
elif not self.proxies.empty() and self._queue_target_process.qsize() > 0:
await self.process_targets()
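With the shared aiosocks2 session gone, `_process_targets_main` simply drains the two queues into coroutines, recycles each proxy, and awaits them together. A rough standalone sketch of that fan-out pattern (`fetch_one` and the sample URLs are illustrative stand-ins for `_obtain_targets` and real targets):

```py
from asyncio import gather, run
from queue import Queue

async def fetch_one(proxy: str, target: str) -> None:
    # Stand-in for AProxyRelayCore._obtain_targets, which opens a proxied session per call
    print(f"fetching {target} via {proxy}")

async def drain(targets: Queue, proxies: Queue) -> None:
    tasks = []
    while not targets.empty():
        proxy = proxies.get()
        target = targets.get()
        tasks.append(fetch_one(proxy, target))
        proxies.put(proxy)  # put the proxy back so it can serve later targets
    await gather(*tasks)   # run every request concurrently

targets, proxies = Queue(), Queue()
targets.put("https://example.com/a")
targets.put("https://example.com/b")
proxies.put("socks4://127.0.0.1:1080")
run(drain(targets, proxies))
```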
145 changes: 83 additions & 62 deletions aproxyrelay/req.py
@@ -10,15 +10,15 @@
Class which handles all requests made throughout the library.
"""
from aiohttp.client_exceptions import ClientHttpProxyError, \
ServerDisconnectedError, \
ClientProxyConnectionError, \
from ssl import SSLCertVerificationError, SSLError
from aiohttp import ClientSession, ClientTimeout
from aiohttp.client_exceptions import ServerDisconnectedError, \
ClientResponseError, \
ClientOSError, \
ServerTimeoutError, \
InvalidURL
from aiosocks2.errors import SocksError
from asyncio import gather, TimeoutError
InvalidURL, \
ConnectionTimeoutError
from aiohttp_socks import ProxyConnectionError, ProxyConnector, ProxyError
from asyncio import IncompleteReadError, gather, TimeoutError
from json import dumps

from .scrapers import proxy_list
@@ -79,12 +79,9 @@ async def _request_proxy_page(self, url, session) -> None:
else:
self.proxies.put(row)

async def _test_all_proxies(self, session):
async def _test_all_proxies(self):
"""
Use asyncio.gather to run multiple requests concurrently by executing `self._test_proxy_link`.
Args:
session: aiohttp session without proxy support
"""
# Use asyncio.gather to run multiple tests concurrently
to_filter = []
@@ -95,10 +92,10 @@ async def _test_all_proxies(self, session):

# Remove duplicate entries
to_filter = [dict(x) for x in list(set([tuple(item.items()) for item in to_filter]))]
tasks = [self._test_proxy_link(proxy['proxy'], proxy, session) for proxy in to_filter]
tasks = [self._test_proxy_link(proxy['proxy'], proxy) for proxy in to_filter]
await gather(*tasks)

async def _test_proxy_link(self, proxy_url, data, session) -> None:
async def _test_proxy_link(self, proxy_url, data) -> None:
"""
Asynchronously call gg.my-dev.app, a website built by the creator of this package.
If the connection was successful, the proxy works!
@@ -109,31 +106,48 @@ async def _test_proxy_link(self, proxy_url, data, session) -> None:
proxy_url: The URL of the proxy to be tested.
data: Additional data for the proxy test.
"""
# If port is empty, assume port 80
if data['port'] == '':
data['port'] = '80'
# Make sure the port is in range
if int(data['port']) < 0 or int(data['port']) > 65535: return
try:
async with session.post(
'https://gg.my-dev.app/api/v1/proxies/validate/lib',
proxy=proxy_url,
headers={
**self._get_header(),
'Content-Type': 'application/json'
},
data=dumps(data)
) as response:
if response.status == 200:
self.proxies.put(data)
self._filtered_available = self._filtered_available + 1
else:
self._filtered_failed = self._filtered_failed + 1
self.logger.debug(f'[aProxyRelay] Processing: {proxy_url} -> Added to queue')
connector = ProxyConnector.from_url(proxy_url.replace('unknown', 'socks4'))
timeout = ClientTimeout(total=self.timeout, connect=self.timeout)
async with ClientSession(connector=connector, timeout=timeout) as session:
async with session.post(
'https://gg.my-dev.app/api/v1/proxies/validate/lib',
headers={
**self._get_header(),
'Content-Type': 'application/json'
},
data=dumps(data)
) as response:
if response.status == 200:
self.proxies.put(data)
self._filtered_available = self._filtered_available + 1
self.logger.debug(f'[aProxyRelay] Succeed: {proxy_url} -> Freshly Discovered')
else:
self._filtered_failed = self._filtered_failed + 1
self.logger.debug(f'[aProxyRelay] Succeed: {proxy_url} -> Address Known')
except (
ClientHttpProxyError,
ServerDisconnectedError,
ClientProxyConnectionError,
ClientResponseError,
ClientOSError,
ServerTimeoutError,
InvalidURL,
ConnectionResetError,
):
ProxyError,
SSLCertVerificationError,
ProxyConnectionError,
ConnectionTimeoutError,
IncompleteReadError,
UnicodeEncodeError,
SSLError,
ConnectionAbortedError,
ServerDisconnectedError,
ClientResponseError,
TimeoutError
) as e:
self.logger.debug(f'[aProxyRelay] Failed: {proxy_url} -> {repr(e)}')
self._filtered_failed = self._filtered_failed + 1

async def _fetch_proxy_servers(self, urls, session):
@@ -172,7 +186,7 @@ async def _request_proxy_servers(self, url, session) -> None:
self.proxies.put(row)
self._filtered_ggs = self._filtered_ggs + 1

async def _obtain_targets(self, proxy_url, target, session) -> None:
async def _obtain_targets(self, proxy_url, target) -> None:
"""
Asynchronously fetch the targets with our proxies.
The 'steam' variable should be defaulted to False and should only be used when targeting Steam.
@@ -182,37 +196,44 @@ async def _obtain_targets(self, proxy_url, target, session) -> None:
proxy_url: The URL of the proxy to be used for the request.
"""
try:
async with session.get(
target,
proxy=proxy_url,
headers={
**self._get_header(),
'Content-Type': 'application/json'
},
) as response:
status = response.status
if status in (200, 202,):
self.proxies.put(proxy_url)
data = await response.json()
if data:
if pack := self.unpack(data, target):
self._queue_result.put(pack)
connector = ProxyConnector.from_url(proxy_url.replace('unknown', 'socks4'))
timeout = ClientTimeout(total=self.timeout, connect=self.timeout)
async with ClientSession(connector=connector, timeout=timeout) as session:
async with session.get(
target,
headers={
**self._get_header(),
'Content-Type': 'application/json'
},
) as response:
status = response.status
if status in (200, 202,):
self.proxies.put(proxy_url)
data = await response.json()
if data:
if pack := self.unpack(data, target):
self._queue_result.put(pack)
else:
self.logger.warning(f'[aProxyRelay] Could not unpack data for: {target}')
else:
self.logger.warning(f'[aProxyRelay] Could not unpack data for: {target}')
self.logger.warning(f'[aProxyRelay] Target {target} Data seems to be None: {data}')
else:
self.logger.warning(f'[aProxyRelay] Target {target} Data seems to be None: {data}')
else:
self._queue_target_process.put(target)

self._queue_target_process.put(target)
except (
ClientHttpProxyError,
ServerDisconnectedError,
ClientProxyConnectionError,
ClientResponseError,
ClientOSError,
ServerTimeoutError,
InvalidURL,
SocksError,
TimeoutError,
):
ConnectionResetError,
ProxyError,
SSLCertVerificationError,
ProxyConnectionError,
ConnectionTimeoutError,
IncompleteReadError,
UnicodeEncodeError,
SSLError,
ConnectionAbortedError,
ServerDisconnectedError,
ClientResponseError,
TimeoutError
) as e:
self.logger.debug(f'[aProxyRelay] Failed: {target} -> {repr(e)}')
self._queue_target_process.put(target)
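Both `_test_proxy_link` and `_obtain_targets` now build a short-lived `ClientSession` around an `aiohttp_socks.ProxyConnector`, with the library's timeout applied to the whole request. A minimal sketch of that pattern under the same assumption the diff makes (proxies with an `unknown` protocol fall back to SOCKS4); the target URL and timeout value here are illustrative:

```py
from asyncio import run

from aiohttp import ClientSession, ClientTimeout
from aiohttp_socks import ProxyConnector

async def check_proxy(proxy_url: str, timeout_s: int = 30) -> bool:
    # Treat an unknown protocol as SOCKS4, as the diff does
    connector = ProxyConnector.from_url(proxy_url.replace('unknown', 'socks4'))
    timeout = ClientTimeout(total=timeout_s, connect=timeout_s)
    async with ClientSession(connector=connector, timeout=timeout) as session:
        async with session.get('https://gg.my-dev.app') as response:  # illustrative target
            return response.status == 200

print(run(check_proxy('socks5://127.0.0.1:1080')))
```

Because the connector is created per proxy, a failing proxy only tears down its own session; the expanded exception list in the diff is what routes those failures back into the retry bookkeeping.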
2 changes: 1 addition & 1 deletion example.py
@@ -9,7 +9,7 @@
# Initialize proxy relay
proxy_relay = AProxyRelay(
targets=targets,
timeout=5,
timeout=30,
scrape=True,
filter=True,
zones=['us'],
2 changes: 1 addition & 1 deletion setup.py
@@ -14,7 +14,7 @@
packages=find_packages(),
install_requires=[
'aiohttp',
'aiosocks2',
'aiohttp_socks',
'beautifulsoup4',
],
extras_require={
