1
1
import re
2
+ import hmac
2
3
import ipaddress
3
- import httpx
4
+
5
+ import requests
4
6
from CloudFlare import CloudFlare
5
7
from CloudFlare .exceptions import CloudFlareAPIError
6
8
from flask import Flask , request
@@ -68,27 +70,6 @@ def contains_malicious_patterns(_input: str) -> bool:
68
70
EMAIL : str = config_instance ().CLOUDFLARE_SETTINGS .EMAIL
69
71
TOKEN : str = config_instance ().CLOUDFLARE_SETTINGS .TOKEN
70
72
71
- async_client = httpx .AsyncClient (http2 = True , limits = httpx .Limits (max_connections = 100 , max_keepalive_connections = 20 ))
72
-
73
-
74
- async def send_request (api_url : str , headers : dict [str , str | int ], method : str = 'get' ,
75
- data : str | None = None ):
76
- try :
77
- if method .lower () == "get" :
78
- response = await async_client .get (url = api_url , headers = headers , timeout = 360000 )
79
- elif method .lower () == "post" :
80
- if data :
81
- response = await async_client .post (url = api_url , json = data , headers = headers , timeout = 360000 )
82
- else :
83
- response = await async_client .post (url = api_url , headers = headers , timeout = 360000 )
84
- else :
85
- return None
86
- except httpx .HTTPError as http_err :
87
- raise http_err
88
- except Exception as err :
89
- raise err
90
- return response .json ()
91
-
92
73
93
74
class Firewall :
94
75
"""
@@ -116,6 +97,7 @@ def init_app(self, app: Flask):
116
97
app .before_request (self .is_host_valid )
117
98
app .before_request (self .is_edge_ip_allowed )
118
99
app .before_request (self .check_if_request_malicious )
100
+ app .before_request (self .verify_client_secret_token )
119
101
# obtain the latest cloudflare edge servers
120
102
ipv4 , ipv6 = self .get_ip_ranges ()
121
103
# updating the ip ranges
@@ -161,6 +143,19 @@ def check_if_request_malicious(self):
161
143
if any ((pattern .match (path ) for pattern in self .compiled_bad_patterns )):
162
144
raise UnAuthenticatedError ('Payload is suspicious' )
163
145
146
+ @staticmethod
147
+ def verify_client_secret_token ():
148
+ client_secret_token = request .headers .get ('X_CLIENT_SECRET_TOKEN' )
149
+ if not client_secret_token :
150
+ raise UnAuthenticatedError ('Missing client secret token' )
151
+
152
+ expected_secret_token = config_instance ().CLOUDFLARE_SETTINGS .get ('X_CLIENT_SECRET_TOKEN' )
153
+ if not expected_secret_token :
154
+ raise ValueError ('Missing expected client secret token' )
155
+
156
+ if not hmac .compare_digest (client_secret_token , expected_secret_token ):
157
+ raise UnAuthenticatedError ('Invalid client secret token' )
158
+
164
159
@staticmethod
165
160
def get_client_ip () -> str :
166
161
"""
@@ -184,10 +179,12 @@ def get_ip_ranges() -> tuple[list[str], list[str]]:
184
179
_uri = 'https://api.cloudflare.com/client/v4/ips'
185
180
_headers = {'Accept' : 'application/json' , 'X-Auth-Email' : EMAIL }
186
181
try :
187
- response = await send_request (api_url = _uri , headers = _headers )
188
- ipv4_cidrs = response .get ('result' , {}).get ('ipv4_cidrs' , DEFAULT_IPV4 )
189
- ipv6_cidrs = response .get ('result' , {}).get ('ipv6_cidrs' , [])
190
- return ipv4_cidrs , ipv6_cidrs
182
+ with requests .Session () as send_request :
183
+ response = send_request .get (url = _uri , headers = _headers )
184
+ response_data : dict [str , dict [str , str ] | list [str ]] = response .json ()
185
+ ipv4_cidrs = response_data .get ('result' , {}).get ('ipv4_cidrs' , DEFAULT_IPV4 )
186
+ ipv6_cidrs = response_data .get ('result' , {}).get ('ipv6_cidrs' , [])
187
+ return ipv4_cidrs , ipv6_cidrs
191
188
192
189
except CloudFlareAPIError :
193
190
return [], []
0 commit comments