diff --git a/deb/postinst b/deb/postinst
index f7b2ef5d..98db2deb 100644
--- a/deb/postinst
+++ b/deb/postinst
@@ -35,8 +35,8 @@ case "$1" in
     [ -e /etc/pagekite/pagekite.rc.dpkg-bak ] \
       && mv /etc/pagekite/pagekite.rc.dpkg-bak /etc/pagekite.d/89_old_pagekite.rc
-    chmod 644 /etc/pagekite.d/* || true
-    chmod 600 /etc/pagekite.d/[019]* || true
+    chmod 644 /etc/pagekite.d/*.rc* || true
+    chmod 600 /etc/pagekite.d/[019]*rc* || true
     [ -d /etc/pagekite ] && rmdir /etc/pagekite || true
     ;;
diff --git a/doc/HISTORY.txt b/doc/HISTORY.txt
index 38857f9c..8134562a 100644
--- a/doc/HISTORY.txt
+++ b/doc/HISTORY.txt
@@ -1,10 +1,15 @@
 Version history - highlights
 ============================
 
-v1.5.0.200412
+v1.5.????????
 -------------
+ - This release (v?) is all about performance and efficiency!
  - Create ping.pagekite fast-path in dedicated thread
-
+ - Make select loop timing and read sizes configurable, tweak defaults
+ - Remove 0.4.x flow-control, fix major bugs in current flow control code
+ - Fix locking-related deadlocks under PyPy
+ - Add --watchdog=N, to self-reap locked-up processes
+ - Disable old ssl workarounds on modern versions of Python (broke PyPy)
 
 v1.5.0.200327
 -------------
diff --git a/etc/pagekite.d/accept.acl.sample b/etc/pagekite.d/accept.acl.sample
index 8f80b0ae..e00fda1b 100644
--- a/etc/pagekite.d/accept.acl.sample
+++ b/etc/pagekite.d/accept.acl.sample
@@ -2,7 +2,10 @@
 #
 # This is a file for use on frontend relays to restrict access. Note
 # that this effects both tunnels and client connections and is really
-# only intended for blacklisting abusive clients on a temporary basis.
+# only intended for blocking abusive clients on a temporary basis.
+#
+# WARNING: This is inefficient and slow. Every line added to this file
+# has a cost.
 #
 # To enable these rules, rename the file and add the following to one
 # of the `/etc/pagekite.d/*.rc` files:
diff --git a/pagekite/common.py b/pagekite/common.py
index 274abe2e..9bde96e4 100755
--- a/pagekite/common.py
+++ b/pagekite/common.py
@@ -75,8 +75,17 @@ LOOPBACK_BE = LOOPBACK_HN + ':2'
 LOOPBACK = {'FE': LOOPBACK_FE, 'BE': LOOPBACK_BE}
 
+# This is how many bytes we are willing to read per cycle.
+MAX_READ_BYTES = 16 * 1024
+MAX_READ_TUNNEL_X = 3.1  # 3x above, + fudge factor
+
+# Higher values save CPU and prevent individual tunnels
+# from hogging all our resources, but hurt latency and
+# reduce per-tunnel throughput.
+SELECT_LOOP_MIN_MS = 5
+
 # Re-evaluate our choice of frontends every 45-60 minutes.
-FE_PING_INTERVAL = (45 * 60) + random.randint(0, 900)
+FE_PING_INTERVAL = (45 * 60) + random.randint(0, 900)
 
 # This is a global count of disconnect errors; we use this
 # to adjust the ping interval over time.
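As a rough sketch of what these new defaults imply (illustrative Python, not part of the patch): a 5ms minimum loop time caps the select loop at ~200 iterations per second, so the read sizes translate directly into per-connection throughput ceilings.

# Sketch: throughput ceilings implied by the common.py defaults above.
MAX_READ_BYTES = 16 * 1024         # plain connections
MAX_READ_TUNNEL_X = 3.1            # tunnels may read ~3x as much
SELECT_LOOP_MIN_MS = 5

loops_per_sec = 1000.0 / SELECT_LOOP_MIN_MS               # ~200 Hz
conn_cap = loops_per_sec * MAX_READ_BYTES                 # ~3.1 MB/s
tunnel_cap = loops_per_sec * int(MAX_READ_BYTES * MAX_READ_TUNNEL_X)
print('conn cap: %.1f MB/s, tunnel cap: %.1f MB/s'
      % (conn_cap / (1024 * 1024), tunnel_cap / (1024 * 1024)))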
diff --git a/pagekite/compat.py b/pagekite/compat.py
index b18a3b37..fd436fee 100755
--- a/pagekite/compat.py
+++ b/pagekite/compat.py
@@ -104,7 +104,6 @@ def format_exc():
 except ImportError:
   from queue import Queue
 
-
 # SSL/TLS strategy: prefer pyOpenSSL, as it comes with built-in Context
 # objects. If that fails, look for Python 2.6+ native ssl support and
 # create a compatibility wrapper. If both fail, bomb with a ConfigError
@@ -112,7 +111,7 @@ def format_exc():
 #
 import sockschain
 socks = sockschain
-if socks.HAVE_PYOPENSSL:
+if socks.HAVE_PYOPENSSL or tuple(sys.version_info) > (2, 7, 10):
   SSL = socks.SSL
   SEND_ALWAYS_BUFFERS = False
   SEND_MAX_BYTES = 16 * 1024
@@ -122,7 +121,7 @@ def format_exc():
   SSL = socks.SSL
   SEND_ALWAYS_BUFFERS = True
   SEND_MAX_BYTES = 4 * 1024
-  TUNNEL_SOCKET_BLOCKS = True # Workaround for http://bugs.python.org/issue8240
+  TUNNEL_SOCKET_BLOCKS = True  # Workaround for http://bugs.python.org/issue8240
 else:
   SEND_ALWAYS_BUFFERS = False
diff --git a/pagekite/httpd.py b/pagekite/httpd.py
index f9176d5e..ff146e91 100755
--- a/pagekite/httpd.py
+++ b/pagekite/httpd.py
@@ -420,10 +420,8 @@ def do_POST(self, command='POST'):
                        "string (%s bytes).") % clength)
         posted = cgi.parse_qs(self.rfile.read(clength), 1)
       elif self.host_config.get('xmlrpc', False):
-        # We wrap the XMLRPC request handler in _BEGIN/_END in order to
-        # expose the request environment to the RPC functions.
-        RCI = self.server.RCI
-        return RCI._END(SimpleXMLRPCRequestHandler.do_POST(RCI._BEGIN(self)))
+        with self.server.RCI.lock:
+          return SimpleXMLRPCRequestHandler.do_POST(self)
 
       self.post_data.seek(0)
     except socket.error:
@@ -861,7 +859,12 @@ def handleHttpRequest(self, scheme, netloc, path, params, query, frag,
     photobackup = self.host_config.get('photobackup', False)
 
     if path == self.host_config.get('yamon', False):
-      if common.gYamon:
+      if qs.get('view', [None])[0] == 'conns':
+        from pagekite.pk import Watchdog
+        llines = []
+        Watchdog.DumpConnState(self.server.pkite.conns, logfunc=llines.append)
+        data['body'] = '\n'.join(llines) + '\n'
+      elif common.gYamon:
         self.server.pkite.Overloaded(yamon=common.gYamon)
         data['body'] = common.gYamon.render_vars_text(qs.get('view', [None])[0])
       else:
@@ -964,7 +967,7 @@ def __init__(self, httpd, pkite, conns):
     self.conns = conns
     self.modified = False
 
-    self.lock = threading.Lock()
+    self.lock = threading.RLock()
     self.request = None
 
     self.auth_tokens = {httpd.secret: self.ACL_READ}
@@ -978,17 +981,6 @@ def __init__(self, httpd, pkite, conns):
                  'tokens': self.auth_tokens,
                  'data': logging.LOG}}
 
-  def _BEGIN(self, request_object):
-    self.lock.acquire()
-    self.request = request_object
-    return request_object
-
-  def _END(self, rv=None):
-    if self.request:
-      self.request = None
-      self.lock.release()
-    return rv
-
   def connections(self, auth_token):
     if (not self.request.host_config.get('console', False) or
         self.ACL_READ not in self.auth_tokens.get(auth_token, self.ACL_OPEN)):
@@ -1135,15 +1127,12 @@ def __init__(self, sspec, pkite, conns,
       gYamon = common.gYamon = yamond.YamonD(sspec)
       gYamon.vset('started', int(time.time()))
       gYamon.vset('version', APPVER)
+      gYamon.vset('version_python', sys.version.replace('\n', ' '))
       gYamon.vset('httpd_ssl_enabled', self.enable_ssl)
       gYamon.vset('errors', 0)
       gYamon.lcreate("tunnel_rtt", 100)
       gYamon.lcreate("tunnel_wrtt", 100)
       gYamon.lists['buffered_bytes'] = [1, 0, common.buffered_bytes]
-      gYamon.views['selectables'] = (selectables.SELECTABLES, {
-        'idle': [0, 0, self.conns.idle],
-        'conns': [0, 0, self.conns.conns]
-      })
     except:
       pass
diff --git a/pagekite/pk.py b/pagekite/pk.py
index f7b295ef..4af2ffd5 100755
--- a/pagekite/pk.py
+++ b/pagekite/pk.py
@@ -95,13 +95,13 @@
              'fe_certname=', 'fe_nocertcheck', 'ca_certs=',
              'kitename=', 'kitesecret=', 'fingerpath=', 'backend=',
              'define_backend=', 'be_config=',
-             'insecure', 'ratelimit_ips=',
+             'insecure', 'ratelimit_ips=', 'max_read_bytes=', 'select_loop_min_ms=',
              'service_on=', 'service_off=', 'service_cfg=', 'tunnel_acl=',
             'client_acl=', 'accept_acl_file=',
             'frontend=', 'nofrontend=', 'frontends=', 'keepalive=',
             'torify=', 'socksify=', 'proxy=', 'noproxy',
             'new', 'all', 'noall', 'dyndns=', 'nozchunks', 'sslzlib', 'wschunks',
-             'buffers=', 'noprobes', 'debugio', 'watch=', 'loglevel=',
+             'buffers=', 'noprobes', 'debugio', 'watch=', 'loglevel=', 'watchdog=',
             'overload=', 'overload_cpu=', 'overload_mem=', 'overload_file=',
             # DEPRECATED:
             'reloadfile=', 'autosave', 'noautosave', 'webroot=',
@@ -175,6 +175,85 @@ def supports_auth(self):
     return ('AUTH' in self.capabilities)
 
 
+class Watchdog(threading.Thread):
+  """Kill the app if it locks up."""
+  daemon = True
+
+  def __init__(self, timeout):
+    threading.Thread.__init__(self)
+    self.pid = os.getpid()
+    self.conns = []
+    self.timeout = timeout
+    self.updated = time.time()
+    self.locks = {}
+
+  @classmethod
+  def DumpConnState(cls, conns, close=False, logfunc=None):
+    for fpc in copy.copy(conns.ping_helper.clients):
+      try:
+        if close:
+          (logfunc or logging.LogError)('Closing FPC %s' % (fpc,))
+          fpc[1].close()
+        else:
+          (logfunc or logging.LogInfo)('FastPing: %s' % (fpc,))
+      except:
+        pass
+    for conn in copy.copy(conns.conns):
+      try:
+        if close:
+          (logfunc or logging.LogError)('Closing %s' % conn)
+          conn.fd.close()
+        else:
+          (logfunc or logging.LogInfo)('Conn %s' % conn)
+      except:
+        pass
+
+  def patpatpat(self):
+    self.updated = time.time()
+
+  def run(self):
+    import signal
+    if self.timeout:
+      self.timeout = max(15, self.timeout)  # Lower than this won't work!
+    if common.gYamon and self.timeout:
+      common.gYamon.vset('watchdog', self.timeout)
+
+    failed = 5  # Log happy message after first sleep
+    worries = 0
+    last_update = self.updated
+    while self.timeout and (failed < 10) and (worries < self.timeout):
+      time.sleep(self.timeout / 10.0)
+      if self.updated == last_update:
+        failed += 1
+        worries += 1
+        logging.LogInfo(
+          'Watchdog is worried (timeout=%ds, failures=%d/10, worries=%.1f/%d)'
+          % (self.timeout, failed, worries, self.timeout))
+        if common.gYamon:
+          common.gYamon.vadd('watchdog_worried', 1)
+        if failed in (1, 6):
+          os.kill(self.pid, signal.SIGUSR1)
+      else:
+        if failed:
+          logging.LogInfo('Watchdog is happy (timeout=%ds)' % self.timeout)
+        failed = 0
+        worries *= 0.9
+      last_update = self.updated
+
+    if self.timeout:
+      try:
+        for lock_name, lock in self.locks.items():
+          logging.LogDebug('Lock %s %s' % (
+            lock_name,
+            lock.acquire(blocking=False) and 'is free' or 'is LOCKED'))
+        self.DumpConnState(self.conns, close=True)
+      finally:
+        logging.LogError('Watchdog is sad: kill -INT %s' % self.pid)
+        os.kill(self.pid, signal.SIGINT)
+        time.sleep(2)
+        logging.LogError('Watchdog is sad: kill -9 %s' % self.pid)
+        os.kill(self.pid, 9)
+
+
 class AuthThread(threading.Thread):
   """Handle authentication work in a separate thread."""
   daemon = True
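The Watchdog above boils down to a liveness heartbeat checked from a daemon thread, escalating (dump state, SIGUSR1, SIGINT, SIGKILL) when the main loop stops checking in. A self-contained toy version of the pattern (TinyWatchdog is illustrative, not from the patch):

import threading, time

class TinyWatchdog(threading.Thread):
    daemon = True
    def __init__(self, timeout):
        threading.Thread.__init__(self)
        self.timeout = timeout
        self.updated = time.time()
    def patpatpat(self):
        self.updated = time.time()       # the main loop proves liveness
    def run(self):
        while True:
            time.sleep(self.timeout / 10.0)
            if time.time() - self.updated > self.timeout:
                print('Watchdog is sad: main loop stalled')
                break                    # real code: dump state, then kill

wd = TinyWatchdog(timeout=2.0)
wd.start()
for _ in range(10):
    time.sleep(0.1)
    wd.patpatpat()                       # each iteration pats the dog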
@@ -187,16 +266,14 @@ def __init__(self, conns):
     self.qtime = 0.250  # A decent initial estimate
 
   def check(self, requests, conn, callback):
-    self.qc.acquire()
-    self.jobs.append((requests, conn, callback))
-    self.qc.notify()
-    self.qc.release()
+    with self.qc:
+      self.jobs.append((requests, conn, callback))
+      self.qc.notify()
 
   def quit(self):
-    self.qc.acquire()
-    self.keep_running = False
-    self.qc.notify()
-    self.qc.release()
+    with self.qc:
+      self.keep_running = False
+      self.qc.notify()
     try:
       self.join()
     except RuntimeError:
@@ -213,128 +290,127 @@ def run(self):
     logging.LogDebug('AuthThread: done')
 
   def _run(self):
-    self.qc.acquire()
-    while self.keep_running:
-      now = int(time.time())
-      if not self.jobs:
-        (requests, conn, callback) = None, None, None
-        self.qc.wait()
-      else:
-        (requests, conn, callback) = self.jobs.pop(0)
-        if logging.DEBUG_IO: print('=== AUTH REQUESTS\n%s\n===' % requests)
-        self.qc.release()
-
-        quotas = []
-        q_conns = []
-        q_days = []
-        ip_limits = []
-        results = []
-        log_info = []
-        session = '%x:%s:' % (now, globalSecret())
-        for request in requests:
-          try:
-            proto, domain, srand, token, sign, prefix = request
-          except:
-            logging.LogError('Invalid request: %s' % (request, ))
-            continue
-
-          what = '%s:%s:%s' % (proto, domain, srand)
-          session += what
-          if not token or not sign:
-            # Send a challenge. Our challenges are time-stamped, so we can
-            # put stict bounds on possible replay attacks (20 minutes atm).
-            results.append(('%s-SignThis' % prefix,
-                            '%s:%s' % (what, signToken(payload=what,
-                                                       timestamp=now))))
-          else:
-            # Note: These 15 seconds are a magic number which should be well
-            # below the timeout in proto.conns.Tunnel._Connect().
-            if ((not self.conns.config.authfail_closed)
-                and len(self.jobs) >= (15 / self.qtime)):  # Float division
-              logging.LogWarning('Quota lookup skipped, over 15s worth of jobs queued')
-              (quota, days, conns, ipc, ips, reason) = (
-                -2, None, None, None, None, None)
-            else:
-              # This is a bit lame, but we only check the token if the quota
-              # for this connection has never been verified.
-              t0 = time.time()
-              (quota, days, conns, ipc, ips, reason) = (
-                self.conns.config.GetDomainQuota(
-                  proto, domain, srand, token, sign,
-                  check_token=(conn.quota is None)))
-              elapsed = (time.time() - t0)
-              self.qtime = max(0.2, (0.9 * self.qtime) + (0.1 * elapsed))
-
-            duplicates = self.conns.Tunnel(proto, domain)
-            if not quota:
-              if not reason: reason = 'quota'
-              results.append(('%s-Invalid' % prefix, what))
-              results.append(('%s-Invalid-Why' % prefix,
-                              '%s;%s' % (what, reason)))
-              log_info.extend([('rejected', domain),
-                               ('quota', quota),
-                               ('reason', reason)])
-            elif duplicates:
-              # Duplicates... is the old one dead? Trigger a ping.
-              for conn in duplicates:
-                conn.TriggerPing()
-              results.append(('%s-Duplicate' % prefix, what))
-              log_info.extend([('rejected', domain),
-                               ('duplicate', 'yes')])
-            else:
-              results.append(('%s-OK' % prefix, what))
-              quotas.append((quota, request))
-              if conns: q_conns.append(conns)
-              if days: q_days.append(days)
-              if not ipc:
-                try:
-                  ipc, ips = self.conns.config.GetDefaultIPsPerSecond(domain)
-                except ValueError:
-                  pass
-              if ipc and ips:
-                ip_limits.append((float(ipc)/ips, ipc, ips))  # Float division
-              if (proto.startswith('http') and
-                  self.conns.config.GetTlsEndpointCtx(domain)):
-                results.append(('%s-SSL-OK' % prefix, what))
-
-        results.append(('%s-SessionID' % prefix,
-                        '%x:%s' % (now, sha1hex(session))))
-        results.append(('%s-Misc' % prefix, urlencode({
-                          'motd': (self.conns.config.motd_message or ''),
-                        })))
-        for upgrade in self.conns.config.upgrade_info:
-          results.append(('%s-Upgrade' % prefix, ';'.join(upgrade)))
-
-        if quotas:
-          min_qconns = min(q_conns or [0])
-          if q_conns and min_qconns:
-            results.append(('%s-QConns' % prefix, min_qconns))
-
-          min_qdays = min(q_days or [0])
-          if q_days and min_qdays:
-            results.append(('%s-QDays' % prefix, min_qdays))
-
-          min_ip_limits = min(ip_limits or [(0, None, None)])[1:]
-          if ip_limits and min_ip_limits[0]:
-            results.append(('%s-IPsPerSec' % prefix, '%s/%s' % min_ip_limits))
-
-          nz_quotas = [qp for qp in quotas if qp[0] and qp[0] > 0]
-          if nz_quotas:
-            quota = min(nz_quotas)[0]
-            conn.quota = [quota, [qp[1] for qp in nz_quotas], time.time()]
-            results.append(('%s-Quota' % prefix, quota))
-          elif requests:
-            if not conn.quota:
-              conn.quota = [None, requests, time.time()]
-            else:
-              conn.quota[2] = time.time()
-
-        if logging.DEBUG_IO: print('=== AUTH RESULTS\n%s\n===' % results)
-        callback(results, log_info)
-        self.qc.acquire()
+    with self.qc:
+      while self.keep_running:
+        now = int(time.time())
+        if not self.jobs:
+          (requests, conn, callback) = None, None, None
+          self.qc.wait()
+        else:
+          (requests, conn, callback) = self.jobs.pop(0)
+          if logging.DEBUG_IO: print('=== AUTH REQUESTS\n%s\n===' % requests)
+          self.qc.release()
+
+          quotas = []
+          q_conns = []
+          q_days = []
+          ip_limits = []
+          results = []
+          log_info = []
+          session = '%x:%s:' % (now, globalSecret())
+          for request in requests:
+            try:
+              proto, domain, srand, token, sign, prefix = request
+            except:
+              logging.LogError('Invalid request: %s' % (request, ))
+              continue
+
+            what = '%s:%s:%s' % (proto, domain, srand)
+            session += what
+            if not token or not sign:
+              # Send a challenge. Our challenges are time-stamped, so we can
+              # put strict bounds on possible replay attacks (20 minutes atm).
+              results.append(('%s-SignThis' % prefix,
+                              '%s:%s' % (what, signToken(payload=what,
+                                                         timestamp=now))))
+            else:
+              # Note: These 15 seconds are a magic number which should be well
+              # below the timeout in proto.conns.Tunnel._Connect().
+              if ((not self.conns.config.authfail_closed)
+                  and len(self.jobs) >= (15 / self.qtime)):  # Float division
+                logging.LogWarning('Quota lookup skipped, over 15s worth of jobs queued')
+                (quota, days, conns, ipc, ips, reason) = (
+                  -2, None, None, None, None, None)
+              else:
+                # This is a bit lame, but we only check the token if the quota
+                # for this connection has never been verified.
+                t0 = time.time()
+                (quota, days, conns, ipc, ips, reason) = (
+                  self.conns.config.GetDomainQuota(
+                    proto, domain, srand, token, sign,
+                    check_token=(conn.quota is None)))
+                elapsed = (time.time() - t0)
+                self.qtime = max(0.2, (0.9 * self.qtime) + (0.1 * elapsed))
+
+              duplicates = self.conns.Tunnel(proto, domain)
+              if not quota:
+                if not reason: reason = 'quota'
+                results.append(('%s-Invalid' % prefix, what))
+                results.append(('%s-Invalid-Why' % prefix,
+                                '%s;%s' % (what, reason)))
+                log_info.extend([('rejected', domain),
+                                 ('quota', quota),
+                                 ('reason', reason)])
+              elif duplicates:
+                # Duplicates... is the old one dead? Trigger a ping.
+                for conn in duplicates:
+                  conn.TriggerPing()
+                results.append(('%s-Duplicate' % prefix, what))
+                log_info.extend([('rejected', domain),
+                                 ('duplicate', 'yes')])
+              else:
+                results.append(('%s-OK' % prefix, what))
+                quotas.append((quota, request))
+                if conns: q_conns.append(conns)
+                if days: q_days.append(days)
+                if not ipc:
+                  try:
+                    ipc, ips = self.conns.config.GetDefaultIPsPerSecond(domain)
+                  except ValueError:
+                    pass
+                if ipc and ips:
+                  ip_limits.append((float(ipc)/ips, ipc, ips))  # Float division
+                if (proto.startswith('http') and
+                    self.conns.config.GetTlsEndpointCtx(domain)):
+                  results.append(('%s-SSL-OK' % prefix, what))
+
+          results.append(('%s-SessionID' % prefix,
+                          '%x:%s' % (now, sha1hex(session))))
+          results.append(('%s-Misc' % prefix, urlencode({
+                            'motd': (self.conns.config.motd_message or ''),
+                          })))
+          for upgrade in self.conns.config.upgrade_info:
+            results.append(('%s-Upgrade' % prefix, ';'.join(upgrade)))
+
+          if quotas:
+            min_qconns = min(q_conns or [0])
+            if q_conns and min_qconns:
+              results.append(('%s-QConns' % prefix, min_qconns))
+
+            min_qdays = min(q_days or [0])
+            if q_days and min_qdays:
+              results.append(('%s-QDays' % prefix, min_qdays))
+
+            min_ip_limits = min(ip_limits or [(0, None, None)])[1:]
+            if ip_limits and min_ip_limits[0]:
+              results.append(('%s-IPsPerSec' % prefix, '%s/%s' % min_ip_limits))
+
+            nz_quotas = [qp for qp in quotas if qp[0] and qp[0] > 0]
+            if nz_quotas:
+              quota = min(nz_quotas)[0]
+              conn.quota = [quota, [qp[1] for qp in nz_quotas], time.time()]
+              results.append(('%s-Quota' % prefix, quota))
+            elif requests:
+              if not conn.quota:
+                conn.quota = [None, requests, time.time()]
+              else:
+                conn.quota[2] = time.time()
+
+          if logging.DEBUG_IO: print('=== AUTH RESULTS\n%s\n===' % results)
+          callback(results, log_info)
+          self.qc.acquire()
 
     self.buffering = 0
-    self.qc.release()
 
 
 ##[ Selectables ]##############################################################
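The self.qtime update in _run above is an exponential moving average over measured quota-lookup times, with a 0.2s floor; the backlog check then skips lookups once len(self.jobs) >= 15 / qtime, i.e. more than ~15 seconds of queued work. A tiny restatement (function name illustrative, not from the patch):

def update_qtime(qtime, elapsed):
    # 90% old estimate, 10% new sample, never below 0.2s
    return max(0.2, (0.9 * qtime) + (0.1 * elapsed))

qtime = 0.250  # initial estimate, as in AuthThread.__init__
for elapsed in (0.05, 0.05, 2.0, 0.3):
    qtime = update_qtime(qtime, elapsed)
print('estimated lookup time: %.3fs' % qtime)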
@@ -345,7 +421,7 @@ class Connections(object):
   def __init__(self, config):
     self.config = config
     self.ip_tracker = {}
-    self.lock = threading.Lock()
+    self.lock = threading.RLock()
     self.idle = []
     self.conns = []
     self.conns_by_id = {}
@@ -388,7 +464,7 @@ def SetIdle(self, conn, seconds):
     self.idle.append((time.time() + seconds, conn.last_activity, conn))
 
   def TrackIP(self, ip, domain):
-    tick = '%d' % (time.time()/12)
+    tick = '%d' % (time.time()//12)
     with self.lock:
       if tick not in self.ip_tracker:
         deadline = int(tick)-10
@@ -783,7 +859,7 @@ def DoFrontendWork(self, loop_count):
 
     # Update our idea of what it means to be overloaded.
     if self.pkite.overload and (1 == loop_count % 20):
-      self.pkite.CalculateOverload()
+      self.pkite.CalculateOverload(yamon=common.gYamon)
 
     # FIXME: Front-ends should close dead back-end tunnels.
     for tid in self.conns.tunnels:
@@ -1056,6 +1132,8 @@ def ResetConfiguration(self):
     self.main_loop = True
     self.watch_level = [None]
 
+    self.watchdog = None
+
     self.overload = None
     self.overload_cpu = 0.75
     self.overload_mem = 0.85
@@ -1492,7 +1570,6 @@ def addManualFrontends():
       com + ('overload_mem = %-5s  # 0=fixed' % self.overload_cpu),
       com + ('overload_cpu = %-5s  # 0=fixed' % self.overload_mem)
     ])
-
     config.extend([
       '',
       '##[ Front-end access controls (default=deny, if configured) ]##',
@@ -1527,8 +1604,13 @@ def addManualFrontends():
       (self.no_probes and 'noprobes' or '# noprobes'),
       (self.crash_report_url and '# nocrashreport' or 'nocrashreport'),
       p('savefile = %s', safe and self.savefile, '/path/to/savefile'),
-      '',
     ])
+    if common.MAX_READ_BYTES != 16*1024:
+      config.append('max_read_bytes = %sx%.3f'
+                    % (common.MAX_READ_BYTES, common.MAX_READ_TUNNEL_X))
+    if common.SELECT_LOOP_MIN_MS != 5:
+      config.append('select_loop_min_ms = %s' % common.SELECT_LOOP_MIN_MS)
+    config.append('')
 
     if self.daemonize or self.setuid or self.setgid or self.pidfile or new:
       config.extend([
@@ -1630,6 +1712,20 @@ def CanSaveConfig(self, savefile=None, _raise=None):
         return False
     return savefile
 
+  def CanSaveConfig(self, savefile=None, _raise=None):
+    savefile = savefile or self.savefile or self.rcfile
+    try:
+      if os.path.exists(savefile):
+        open(savefile, 'r+').close()
+      else:
+        open(savefile, 'w').close()  # FIXME: Python3.3 adds mode=x, use it!
+        os.remove(savefile)
+    except (IOError, OSError):
+      if _raise is not None:
+        raise _raise("Could not write to: %s" % savefile)
+      return False
+    return savefile
+
   def SaveUserConfig(self, quiet=False):
     self.savefile = self.savefile or self.rcfile
     try:
@@ -1880,14 +1976,15 @@ def _get_overload_factor(self):
     if common.gYamon is not None:
       return (
         common.gYamon.values.get('backends_live', 0) +
-        common.gYamon.values.get('selectables_live', 1))
+        common.gYamon.values.get('selectables_live', 1)) or 1
     return (len(self.conns.tunnels) or 1)
 
-  def CalculateOverload(self, cload=None):
+  def CalculateOverload(self, cload=None, yamon=None):
     # Check overload file first, it overrides everything
     if self.overload_file:
       try:
-        new_overload = int(open(self.overload_file, 'r').read().strip())
+        with open(self.overload_file, 'r') as fd:
+          new_overload = int(fd.read().strip())
         if new_overload != self.overload_current:
           self.overload_current = new_overload
           logging.LogInfo(
@@ -1900,25 +1997,21 @@ def CalculateOverload(self, cload=None):
     # FIXME: This is almost certainly linux specific.
     # FIXME: There are too many magic numbers in here.
     try:
-      # Check internal load, abort if load is low anyway.
-      cload = cload or self._get_overload_factor()
-      if ((cload <= (self.overload // 2)) and
-          (self.overload == self.overload_current)):
-        return
-
       # If both are disabled, just bail out.
       if not (self.overload_cpu or self.overload_mem):
         return
 
       # Check system load.
-      loadavg = float(open('/proc/loadavg', 'r').read().strip().split()[1])
+      with open('/proc/loadavg', 'r') as fd:
+        loadavg = float(fd.read().strip().split()[1])
       meminfo = {}
-      for line in open('/proc/meminfo', 'r'):
-        try:
-          key, val = line.lower().split(':')
-          meminfo[key] = int(val.strip().split()[0])
-        except ValueError:
-          pass
+      with open('/proc/meminfo', 'r') as fd:
+        for line in fd:
+          try:
+            key, val = line.lower().split(':')
+            meminfo[key] = int(val.strip().split()[0])
+          except ValueError:
+            pass
 
       # Figure out how much RAM is available
       memfree = meminfo.get('memavailable')
@@ -1929,14 +2022,19 @@ def CalculateOverload(self, cload=None):
       if not self.overload_membase:
         self.overload_membase = float(meminfo['memtotal']) - memfree
         # Sanity checks... are these really necessary?
-        self.overload_membase = max(75000, self.overload_membase)
+        self.overload_membase = max(50000, self.overload_membase)
         self.overload_membase = min(self.overload_membase,
                                     0.9 * meminfo['memtotal'])
 
+      # Check internal load, abort if load is low anyway.
+      cload = cload or self._get_overload_factor()
+      if cload < 50:
+        return
+
       # Calculate the implied unit cost of every live connection
       memtotal = float(meminfo['memtotal'] - self.overload_membase)
-      munit = max(75, float(memtotal - memfree) / cload)  # 75KB/conn=optimism!  # Float division
-      lunit = loadavg / cload  # Float division
+      munit = max(32, float(memtotal - memfree) / cload)  # 32KB/conn=optimism!  # Float division
+      lunit = max(0.10, loadavg) / cload
 
       # Calculate overload factors based on the unit costs
       moverload = int(self.overload_mem * float(memtotal) / munit)  # Integer division
@@ -1962,6 +2060,9 @@ def CalculateOverload(self, cload=None, yamon=None):
           self.overload, moverload, loverload,
           cload, munit, lunit, memfree, memtotal, loadavg))
 
+      if yamon is not None:
+        yamon.vset('overload_unit_mem', munit)
+        yamon.vset('overload_unit_cpu', lunit)
     except (IOError, OSError, ValueError, KeyError, TypeError):
       pass
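A worked example of the unit-cost arithmetic above, with made-up numbers (illustrative only): 1000 live connections, ~780MB of RAM in use beyond the baseline, and a loadavg of 2.0.

overload_mem, overload_cpu = 0.85, 0.75
cload = 1000                   # live connections (must be >= 50)
memtotal = 1000000.0           # kB, with overload_membase subtracted
memfree = 220000.0             # kB
loadavg = 2.0

munit = max(32, (memtotal - memfree) / cload)    # 780 kB per connection
lunit = max(0.10, loadavg) / cload               # 0.002 load per connection
moverload = int(overload_mem * memtotal / munit) # 1089 conns fit in RAM
loverload = int(overload_cpu / lunit)            # 375 conns fit in CPU
print('mem limit %d conns, cpu limit %d conns' % (moverload, loverload))
# pagekite derives the active overload limit from these two bounds.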
@@ -2133,9 +2234,8 @@ def BindUiSspec(self, force=False):
   def LoadMOTD(self):
     if self.motd:
       try:
-        f = open(self.motd, 'r')
-        self.motd_message = ''.join(f.readlines()).strip()[:8192]
-        f.close()
+        with open(self.motd, 'r') as f:
+          self.motd_message = ''.join(f.readlines()).strip()[:8192]
       except (OSError, IOError):
         pass
 
@@ -2337,6 +2437,15 @@ def Configure(self, argv):
           which, limit = '*', arg
         self.GetDefaultIPsPerSecond(None, limit.strip())  # ValueErrors if bad
         self.ratelimit_ips[which.strip()] = limit.strip()
+      elif opt == '--max_read_bytes':
+        if 'x' in arg:
+          base, tmul = arg.split('x')
+          common.MAX_READ_BYTES = max(1024, int(base))
+          common.MAX_READ_TUNNEL_X = max(1, float(tmul))
+        else:
+          common.MAX_READ_BYTES = max(1024, int(arg))
+      elif opt == '--select_loop_min_ms':
+        common.SELECT_LOOP_MIN_MS = max(0, min(int(arg), 100))
       elif opt == '--accept_acl_file':
         self.accept_acl_file = arg
       elif opt == '--client_acl':
@@ -2453,6 +2562,8 @@ def Configure(self, argv):
       elif opt == '--sslzlib': self.enable_sslzlib = True
       elif opt == '--watch':
         self.watch_level[0] = int(arg)
+      elif opt == '--watchdog':
+        self.watchdog = Watchdog(int(arg))
       elif opt == '--overload':
         self.overload_current = self.overload = int(arg)
       elif opt == '--overload_file':
@@ -3238,9 +3349,10 @@ def GetHostIpAddrs(self, host):
     rv = []
     if host[:1] == '@':
       try:
-        for line in (l.strip() for l in open(host[1:], 'r')):
-          if line and line[:1] not in ('#', ';'):
-            rv.append(line)
+        with open(host[1:], 'r') as fd:
+          for line in (l.strip() for l in fd):
+            if line and line[:1] not in ('#', ';'):
+              rv.append(line)
         logging.LogDebug('Loaded %d IPs from %s' % (len(rv), host[1:]))
       except:
         logging.LogDebug('Failed to load IPs from %s' % host[1:])
@@ -3795,28 +3907,30 @@ def Epoll(self, epoll, waittime):
     evs = []
     broken = False
     try:
-      bbc = 0
       with self.conns.lock:
-        for c in self.conns.conns:
-          fd, mask = c.fd, 0
-          if not c.IsDead():
-            if c.IsBlocked():
-              bbc += len(c.write_blocked)
-              mask |= select.EPOLLOUT
-            if c.IsReadable(now):
-              mask |= select.EPOLLIN
+        clist = copy.copy(self.conns.conns)
 
-          if mask:
-            try:
-              fdc[fd.fileno()] = fd
-            except socket.error:
-              # If this fails, then the socket has HUPed, however we need to
-              # bypass epoll to make sure that's reflected in iready below.
-              bid = 'dead-%d' % len(evs)
-              fdc[bid] = fd
-              evs.append((bid, select.EPOLLHUP))
-              # Trigger removal of c.fd, if it was still in the epoll.
-              fd, mask = None, 0
+      bbc = 0
+      for c in clist:
+        fd, mask = c.fd, 0
+        if not c.IsDead():
+          if c.IsBlocked():
+            bbc += len(c.write_blocked)
+            mask |= select.EPOLLOUT
+          if c.IsReadable(now):
+            mask |= select.EPOLLIN
+
+        if mask:
+          try:
+            fdc[fd.fileno()] = fd
+          except socket.error:
+            # If this fails, then the socket has HUPed, however we need to
+            # bypass epoll to make sure that's reflected in iready below.
+            bid = 'dead-%d' % len(evs)
+            fdc[bid] = fd
+            evs.append((bid, select.EPOLLHUP))
+            # Trigger removal of c.fd, if it was still in the epoll.
+            fd, mask = None, 0
 
         if mask:
           try:
@@ -3872,13 +3986,30 @@ def Loop(self):
     if self.tunnel_manager: self.tunnel_manager.start()
     if self.ui_comm: self.ui_comm.start()
 
+    if self.watchdog:
+      self.watchdog.conns = self.conns.conns
+      try:
+        self.watchdog.locks['httpd.RCI.lock'] = self.ui_httpd.httpd.RCI.lock
+      except AttributeError:
+        pass
+      if common.gYamon:
+        self.watchdog.locks['YamonD.lock'] = common.gYamon.lock
+      # FIXME: Add the AuthApp locks?
+      for i in range(0, len(self.conns.auth_pool)):
+        lock_name = 'conns.auth_pool[%d].qc' % i
+        self.watchdog.locks[lock_name] = self.conns.auth_pool[i].qc
+      self.watchdog.locks.update({
+        'Connections.lock': self.conns.lock,
+        'SELECTABLE_LOCK': SELECTABLE_LOCK})
+      self.watchdog.start()
+
     epoll, mypoll = self.CreatePollObject()
-    self.last_barf = self.last_loop = time.time()
+    self.last_loop = time.time()
 
     logging.LogDebug('Entering main %s loop' % (epoll and 'epoll' or 'select'))
     loop_count = 0
     while self.keep_looping:
-      epoll, iready, oready, eready = mypoll(epoll, 1.1)
+      epoll, iready, oready, eready = mypoll(epoll, 1.10)
       now = time.time()
 
       if oready:
@@ -3898,24 +4029,35 @@ def Loop(self):
         self.last_loop = now
 
       loop_count += 1
-      if now - self.last_barf > (logging.DEBUG_IO and 15 or 600):
-        self.last_barf = now
-        if epoll:
-          epoll.close()
-        epoll, mypoll = self.CreatePollObject()
-        with SELECTABLE_LOCK:
-          gc.collect()
-          log_info, lvl = [('main_loop', loop_count)], logging.LOG_LEVEL_INFO
-          if logging.LOG_LEVEL >= logging.LOG_LEVEL_DEBUG:
-            log_info.append(('selectable_map', '%s' % SELECTABLES))
-            lvl = logging.LOG_LEVEL_DEBUG
-          logging.Log(log_info, level=lvl)
-        if logging.DEBUG_IO:
-          for obj in gc.get_objects():
-            if isinstance(obj, Selectable):
-              if obj.dead:
-                holders = gc.get_referrers(obj)
-                logging.LogDebug('Dead: %s held by %s' % (obj, str(holders[-1])[:50]))
+
+      # This delay does things!
+      # Pro:
+      #  - Reduce overhead by batching IO events together
+      # Mixed:
+      #  - Along with Tunnel.maxread, this caps the per-stream/tunnel
+      #    bandwidth. The default SELECT_LOOP_MIN_MS=5, combined with
+      #    a MAX_READ_BYTES=16kB (scaled up ~3x for tunnels, see
+      #    MAX_READ_TUNNEL_X) lets us read from the socket 200x/second:
+      #    200 * 50kB =~ 10MB/s. This is the MAXIMUM outgoing bandwidth
+      #    of any live tunnel, limiting how much load any single
+      #    connection can generate. Total incoming bandwidth per-conn
+      #    is roughly a third of that.
+      # Con:
+      #  - Adds latency
+      #
+      if self.isfrontend:
+        snooze = max(0, (now + common.SELECT_LOOP_MIN_MS/1000.0) - time.time())
+        if snooze:
+          if oready:
+            snooze /= 2
+          time.sleep(snooze)
+      else:
+        snooze = 0
+
+      if 0 == (loop_count % (5 if logging.DEBUG_IO else 250)):
+        logging.LogDebug('Loop #%d (i=%d, o=%d, e=%d, s=%.3fs) v%s'
+                         % (loop_count, len(iready), len(oready), len(eready),
+                            snooze, APPVER))
+
+      if self.watchdog:
+        self.watchdog.patpatpat()
 
     if epoll:
       epoll.close()
@@ -3937,8 +4079,11 @@ def Start(self, howtoquit='CTRL+C = Stop'):
                    alignright='[%s]' % howtoquit)
     config_report = [('started', self.pyfile), ('version', APPVER),
                      ('platform', sys.platform),
+                     ('python', sys.version.replace('\n', ' ')),
                      ('argv', ' '.join(sys.argv[1:])),
-                     ('ca_certs', self.ca_certs)]
+                     ('ca_certs', self.ca_certs),
+                     ('send_always_buffers', SEND_ALWAYS_BUFFERS),
+                     ('tunnel_socket_blocks', TUNNEL_SOCKET_BLOCKS)]
     for optf in self.rcfiles_loaded:
       config_report.append(('optfile_%s' % optf, 'ok'))
     logging.Log(config_report, level=logging.LOG_LEVEL)
@@ -4018,6 +4163,16 @@ def reopen(x,y):
       logging.LogWarning(
         'Warning: signal handler unavailable, logrotate will not work.')
 
+    # Set up SIGUSR1 handler.
+    try:
+      import signal
+      def dumpconns(x,y):
+        logging.LogInfo('SIGUSR1 received, dumping conn state')
+        Watchdog.DumpConnState(self.conns)
+      signal.signal(signal.SIGUSR1, dumpconns)
+    except Exception:
+      logging.LogError('Warning: signal handler unavailable, kill -USR1 will not work.')
+
     # Disable compression in OpenSSL
     if socks.HAVE_SSL and not self.enable_sslzlib:
       socks.DisableSSLCompression()
@@ -4028,9 +4183,8 @@ def reopen(x,y):
 
     # Create PID file
     if self.pidfile:
-      pf = open(self.pidfile, 'w')
-      pf.write('%s\n' % os.getpid())
-      pf.close()
+      with open(self.pidfile, 'w') as pf:
+        pf.write('%s\n' % os.getpid())
 
     # Do this after creating the PID and log-files.
     if self.daemonize:
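The snooze logic above can be summarized in one small function (a sketch under the same assumptions; the name is illustrative): sleep away whatever remains of SELECT_LOOP_MIN_MS, halving the delay when writes are still pending so buffered output flushes sooner.

import time

SELECT_LOOP_MIN_MS = 5

def snooze_after_loop(loop_started, have_pending_writes):
    # Sleep out the remainder of the minimum loop time, if any.
    snooze = max(0, (loop_started + SELECT_LOOP_MIN_MS / 1000.0) - time.time())
    if snooze and have_pending_writes:
        snooze /= 2  # flush buffered output sooner
    if snooze:
        time.sleep(snooze)
    return snooze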
diff --git a/pagekite/proto/conns.py b/pagekite/proto/conns.py
index ea8c69f4..dee76ff2 100755
--- a/pagekite/proto/conns.py
+++ b/pagekite/proto/conns.py
@@ -66,10 +66,6 @@ def __init__(self, conns):
     self.server_info = ['x.x.x.x:x', [], [], [], False, False, None, False]
     self.Init(conns)
-    # We want to be sure to read the entire chunk at once, including
-    # headers to save cycles, so we double the size we're willing to
-    # read here.
-    self.maxread *= 2
 
   def Init(self, conns):
     self.conns = conns
@@ -82,6 +78,7 @@ def Init(self, conns):
     self.using_tls = False
     self.filters = []
     self.ip_limits = None
+    self.maxread = int(common.MAX_READ_BYTES * common.MAX_READ_TUNNEL_X)
 
   def Cleanup(self, close=True):
     if self.users:
@@ -751,11 +748,14 @@ def SendData(self, conn, data, sid=None, host=None, proto=None, port=None,
       sending.append(data)
       return self.SendChunked(sending, zhistory=self.zhistory.get(sid))
 
-    # Larger amounts we break into fragments to work around bugs in
-    # some of our small-buffered embedded clients. We aim for roughly
-    # one fragment per packet, assuming an MTU of 1500 bytes.
+    # Larger amounts we break into fragments at the FE, to work around bugs
+    # in some of our small-buffered embedded clients. We aim for roughly
+    # one fragment per packet, assuming an MTU of 1500 bytes. We use
+    # much larger fragments at the back-end, as relays can be assumed to
+    # be up-to-date, and larger chunks save CPU and improve throughput.
+    frag_size = self.conns.config.isfrontend and 1024 or (self.maxread+1024)
     sending.append('')
-    frag_size = max(1024, 1400-len(''.join(sending)))
+    frag_size = max(frag_size, 1400-len(''.join(sending)))
     first = True
     while data or first:
       sending[-1] = data[:frag_size]
@@ -764,7 +764,7 @@ def SendData(self, conn, data, sid=None, host=None, proto=None, port=None,
         data = data[frag_size:]
       if first:
         sending = ['SID: %s\r\n' % sid, '\r\n', '']
-        frag_size = max(1024, 1400-len(''.join(sending)))
+        frag_size = max(frag_size, 1400-len(''.join(sending)))
         first = False
 
     return True
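In short: MTU-sized fragments when the relay sends toward (possibly embedded) clients, jumbo fragments from the back-end upward. A condensed, self-contained restatement (the function name is illustrative, and it ignores the chunk-header resize applied to the first fragment):

def fragment(data, is_frontend, maxread, header_len):
    # FE: ~one fragment per 1500-byte packet; BE: big frames, less CPU.
    frag_size = 1024 if is_frontend else (maxread + 1024)
    frag_size = max(frag_size, 1400 - header_len)
    return [data[i:i + frag_size] for i in range(0, len(data), frag_size)]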
@@ -865,14 +865,11 @@ def SendQuota(self, pong=''):
         ) % (pong, self.quota[0]),
         compress=False, just_buffer=True)
 
-  def SendProgress(self, sid, conn, throttle=False):
-    # FIXME: Optimize this away unless meaningful progress has been made?
+  def SendProgress(self, sid, conn):
     msg = ('NOOP: 1\r\n'
            'SID: %s\r\n'
-           'SKB: %d\r\n') % (sid, (conn.all_out + conn.wrote_bytes)//1024)
-    throttle = throttle and ('SPD: %d\r\n' % conn.write_speed) or ''
-    return self.SendChunked('%s%s\r\n!' % (msg, throttle),
-                            compress=False, just_buffer=True)
+           'SKB: %d\r\n\r\n') % (sid, (conn.all_out + conn.wrote_bytes)//1024)
+    return self.SendChunked(msg, compress=False, just_buffer=True)
 
   def ProcessCorruptChunk(self, data):
     self.ResetRemoteZChunks()
@@ -888,17 +885,12 @@ def Probe(self, host):
         return False
     return True
 
-  def AutoThrottle(self, max_speed=None, remote=False, delay=0.2):
-    # Never throttle tunnels.
-    return True
-
   def ProgressTo(self, parse):
     try:
       sid = int(parse.Header('SID')[0])
-      bps = int((parse.Header('SPD') or [-1])[0])
       skb = int((parse.Header('SKB') or [-1])[0])
       if sid in self.users:
-        self.users[sid].RecordProgress(skb, bps)
+        self.users[sid].RecordProgress(skb)
     except:
       logging.LogError(('Tunnel::ProgressTo: That made no sense! %s'
                         ) % format_exc())
@@ -1148,9 +1140,6 @@ def ProcessChunk(self, data):
           # select/epoll loop catch and handle it.
           pass
 
-        if len(conn.write_blocked) > 0 and conn.created < time.time()-3:
-          return self.SendProgress(sid, conn, throttle=True)
-
       else:
         # No connection? Close this stream.
         self.CloseStream(sid)
@@ -1168,7 +1157,6 @@ def __init__(self, conns, which, backends):
     if self.fd:
       self.fd = None
     self.weighted_rtt = -1000
-    self.lock = WithableStub()
     self.backends = backends
     self.require_all = True
     self.server_info[self.S_NAME] = LOOPBACK[which]
@@ -1581,12 +1569,6 @@ def ProcessData(self, data):
         self.LogDebug('Send to tunnel failed')
         return False
 
-    # Back off if tunnel is stuffed.
-    if self.tunnel and len(self.tunnel.write_blocked) > 1024000:
-      # FIXME: think about this...
-      self.Throttle(delay=(len(self.tunnel.write_blocked)-204800)//max(50000,
-                          self.tunnel.write_speed))
-
     if self.read_eof:
       return self.ProcessEofRead()
     return True
@@ -1598,6 +1580,12 @@ class UnknownConn(MagicProtocolParser):
   def __init__(self, fd, address, on_port, conns):
     MagicProtocolParser.__init__(self, fd, address, on_port, ui=conns.config.ui)
     self.peeking = True
+    self.sid = -1
+    self.host = None
+    self.proto = None
+    self.said_hello = False
+    self.bad_loops = 0
+    self.error_details = {}
 
     # Set up our parser chain.
     self.parsers = [HttpLineParser]
@@ -1611,13 +1599,6 @@ def __init__(self, fd, address, on_port, conns):
     self.conns.Add(self)
     self.conns.SetIdle(self, 10)
 
-    self.sid = -1
-    self.host = None
-    self.proto = None
-    self.said_hello = False
-    self.bad_loops = 0
-    self.error_details = {}
-
   def Cleanup(self, close=True):
     MagicProtocolParser.Cleanup(self, close=close)
     self.conns = self.parser = None
@@ -1936,10 +1917,10 @@ def __init__(self, fd, address, on_port, conns):
     self.Send('PageKite? %s\r\n' % self.challenge)
 
   def readline(self):
-    self.qc.acquire()
-    while not self.lines: self.qc.wait()
-    line = self.lines.pop(0)
-    self.qc.release()
+    with self.qc:
+      while not self.lines:
+        self.qc.wait()
+      line = self.lines.pop(0)
     return line
 
   def write(self, data):
@@ -1959,10 +1940,9 @@ def Disconnect(self):
 
   def ProcessLine(self, line, lines):
     if self.state == self.STATE_LIVE:
-      self.qc.acquire()
-      self.lines.append(line)
-      self.qc.notify()
-      self.qc.release()
+      with self.qc:
+        self.lines.append(line)
+        self.qc.notify()
       return True
     elif self.state == self.STATE_PASSWORD:
       if line.strip() == self.expect:
@@ -2006,7 +1986,7 @@ def __init__(self, conns):
     self.clients = []
     self.rejection = None
     self.overloaded = False
-    self.processing = 0
+    self.waiting = True
    self.sleeptime = 0.03
     self.fast_pinged = []
     self.next_pinglog = time.time() + 1
@@ -2020,20 +2000,14 @@ def up_rejection(self):
                                advertise=False)
 
   def add_client(self, client, addr, handler):
+    client.setblocking(0)
     with self.lock:
-      if self.processing < 1 and not self.clients:
-        ping_queue = True
-      else:
-        ping_queue = False
-
-      client.setblocking(0)
       self.clients.append((time.time(), client, addr, handler))
-      if ping_queue:
-        self.wq.put(1)
+      if self.waiting:
+        self.wq.put(1)
 
   def run_once(self):
     now = time.time()
-    self.processing = len(self.clients)
     with self.lock:
       _clients, self.clients = self.clients, []
     for ts, client, addr, handler in _clients:
@@ -2059,16 +2033,14 @@ def run_once(self):
           logging.LogDebug('IOError, dropping ' + obfuIp(addr[0]))
           # No action: just let the client get garbage collected
       except:
-        pass
-      self.processing -= 1
+        logging.LogDebug('Error in FastPing: ' + format_exc())
 
     if now > self.next_pinglog:
-      if self.fast_pinged:
-        logging.LogDebug('Fast ping %s %d clients: %s' % (
-          'discouraged' if self.overloaded else 'welcomed',
-          len(self.fast_pinged),
-          ', '.join(self.fast_pinged)))
-        self.fast_pinged = []
+      logging.LogDebug('Fast ping %s %d clients: %s' % (
+        'discouraged' if self.overloaded else 'welcomed',
+        len(self.fast_pinged),
+        ', '.join(self.fast_pinged)))
+      self.fast_pinged = []
       self.up_rejection()
       self.next_pinglog = now + 1
 
@@ -2076,10 +2048,12 @@ def run_once(self):
 
   def run_until(self, deadline):
     try:
-      self.sleeptime = 0.03
       while (time.time() + self.sleeptime) < deadline and self.clients:
+        with self.lock:
+          self.waiting = True
         while not self.wq.empty():
           self.wq.get()
+        self.waiting = False
         time.sleep(self.sleeptime)
         self.run_once()
     except:
@@ -2089,9 +2063,11 @@ def run(self):
     while True:
       try:
         while True:
+          with self.lock:
+            self.waiting = True
           while not self.clients or not self.wq.empty():
             self.wq.get()
-            self.sleeptime = 0.03
+          self.waiting = False
           time.sleep(self.sleeptime)
           self.run_once()
       except:
@@ -2129,23 +2105,24 @@ def check_acl(self, ipaddr, default=True):
       try:
         ipaddr = '%s' % ipaddr
         lc = 0
-        for line in open(self.acl, 'r'):
-          line = line.lower().strip()
-          lc += 1
-          if line.startswith('#') or not line:
-            continue
-          try:
-            words = line.split()
-            pattern, rule = words[:2]
-            reason = ' '.join(words[2:])
-            if ipaddr == pattern:
-              self.acl_match = (lc, pattern, rule, reason)
-              return bool('allow' in rule)
-            elif re.compile(pattern).match(ipaddr):
-              self.acl_match = (lc, pattern, rule, reason)
-              return bool('allow' in rule)
-          except IndexError:
-            self.LogDebug('Invalid line %d in ACL %s' % (lc, self.acl))
+        with open(self.acl, 'r') as fd:
+          for line in fd:
+            line = line.lower().strip()
+            lc += 1
+            if line.startswith('#') or not line:
+              continue
+            try:
+              words = line.split()
+              pattern, rule = words[:2]
+              reason = ' '.join(words[2:])
+              if ipaddr == pattern:
+                self.acl_match = (lc, pattern, rule, reason)
+                return bool('allow' in rule)
+              elif re.compile(pattern).match(ipaddr):
+                self.acl_match = (lc, pattern, rule, reason)
+                return bool('allow' in rule)
+            except IndexError:
+              self.LogDebug('Invalid line %d in ACL %s' % (lc, self.acl))
       except:
        self.LogDebug(
          'Failed to read/parse %s: %s' % (self.acl, format_exc()))
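This first-match-wins check compiles one regex per ACL line on every connection, which is why accept.acl.sample warns that every line has a cost. A self-contained sketch of the same matching rules (names and sample data are illustrative):

import re

ACL = """
# pattern      rule   reason...
10.1.2.3       deny   known abuser
^192\.0\.2\.   allow  test network
"""

def check_acl(ipaddr, acl_text, default=True):
    for line in acl_text.splitlines():
        line = line.lower().strip()
        if not line or line.startswith('#'):
            continue
        words = line.split()
        if len(words) < 2:
            continue
        pattern, rule = words[:2]
        if ipaddr == pattern or re.match(pattern, ipaddr):
            return 'allow' in rule
    return default

print(check_acl('10.1.2.3', ACL))   # False (deny)
print(check_acl('192.0.2.7', ACL))  # True  (allow)
print(check_acl('8.8.8.8', ACL))    # True  (default)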
@@ -2153,48 +2130,46 @@ def check_acl(self, ipaddr, default=True):
     return default
 
   def HandleClient(self, client, address):
+    log_info = [('port', self.port)]
     if self.check_acl(address[0]):
-      log_info = [('accept', '%s:%s' % (obfuIp(address[0]), address[1]))]
+      log_info += [('accept', '%s:%s' % (obfuIp(address[0]), address[1]))]
       uc = self.connclass(client, address, self.port, self.conns)
     else:
-      log_info = [('reject', '%s:%s' % (obfuIp(address[0]), address[1]))]
+      log_info += [('reject', '%s:%s' % (obfuIp(address[0]), address[1]))]
       client.close()
       if self.acl:
-        log_info.extend([('acl_line', '%s' % self.acl_match[0]),
-                         ('reason', self.acl_match[3])])
+        log_info += [('acl_line', '%s' % self.acl_match[0]),
+                     ('reason', self.acl_match[3])]
     self.Log(log_info)
     return True
 
   def ReadData(self, maxread=None):
     try:
+      self.sstate = 'accept'
       self.last_activity = time.time()
       client, address = self.fd.accept()
-      if client:
-        if self.port not in SMTP_PORTS:
-          self.conns.ping_helper.add_client(client, address, self.HandleClient)
-        else:
-<<<<<<< HEAD
-          log_info = [('reject', '%s:%s' % (obfuIp(address[0]), address[1]))]
-          client.close()
-          if self.acl:
-            log_info.extend([('acl_line', '%s' % self.acl_match[0]),
-                             ('reason', self.acl_match[3])])
-          self.Log(log_info)
-          return True
-
-    except IOError as err:
-=======
-          self.HandleClient(client, address)
+      if self.port not in SMTP_PORTS:
+        while client:
+          try:
+            self.conns.ping_helper.add_client(client, address, self.HandleClient)
+            client, address = self.fd.accept()
+          except IOError:
+            client = None
+      elif client:
+        self.sstate = 'client'
+        self.HandleClient(client, address)
+      self.sstate = (self.dead and 'dead' or 'idle')
       return True
 
     except IOError, err:
->>>>>>> 7c04593... Create an efficient fast-path for PageKite ping traffic
+      self.sstate += '/ioerr=%s' % (err.errno,)
       self.LogDebug('Listener::ReadData: error: %s (%s)' % (err, err.errno))
-    except socket.error as err:
-      (errno, msg) = err.args
+    except socket.error, (errno, msg):
+      self.sstate += '/sockerr=%s' % (errno,)
       self.LogInfo('Listener::ReadData: error: %s (errno=%s)' % (msg, errno))
-    except Exception as e:
+    except Exception, e:
+      self.sstate += '/exc'
       self.LogDebug('Listener::ReadData: %s' % e)
 
     return True
diff --git a/pagekite/proto/parsers.py b/pagekite/proto/parsers.py
index 29ca57ea..731354dc 100755
--- a/pagekite/proto/parsers.py
+++ b/pagekite/proto/parsers.py
@@ -185,13 +185,8 @@ def Parse(self, line):
       elif (self.state == self.IN_BODY):
         return self.ParseBody(line)
 
-<<<<<<< HEAD
     except ValueError as err:
-      logging.LogDebug('Parse failed: %s, %s, %s' % (self.state, err, self.lines))
-=======
-    except ValueError, err:
       logging.LogDebug('HTTP parse failed: %s, %s, %s' % (self.state, err, self.lines))
->>>>>>> 7c04593... Create an efficient fast-path for PageKite ping traffic
       self.state = BaseLineParser.PARSE_FAILED
       return False
 
@@ -272,13 +267,8 @@ def Parse(self, line):
         self.lines[-1] = '%s %s %s\n' % (ocmd, arg0, ' '.join(args[1:]))
       else:
         self.state = BaseLineParser.PARSE_FAILED
-<<<<<<< HEAD
     except Exception as err:
-      logging.LogDebug('Parse failed: %s, %s, %s' % (self.state, err, self.lines))
-=======
-    except Exception, err:
       logging.LogDebug('IRC parse failed: %s, %s, %s' % (self.state, err, self.lines))
->>>>>>> 7c04593... Create an efficient fast-path for PageKite ping traffic
       self.state = BaseLineParser.PARSE_FAILED
 
     return (self.state != BaseLineParser.PARSE_FAILED)
diff --git a/pagekite/proto/proto.py b/pagekite/proto/proto.py
index f0c00737..9c72d30f 100755
--- a/pagekite/proto/proto.py
+++ b/pagekite/proto/proto.py
@@ -49,9 +49,10 @@ def globalSecret():
     # Next, see if we can augment that with some real randomness.
     try:
-      newSecret = sha1hex(s(open('/dev/urandom', 'rb').read(64)) + gSecret)
-      gSecret = newSecret
-      logging.LogDebug('Seeded signatures using /dev/urandom, hooray!')
+      with open('/dev/urandom', 'rb') as fd:
+        newSecret = sha1hex(s(fd.read(64)) + gSecret)
+      gSecret = newSecret
+      logging.LogDebug('Seeded signatures using /dev/urandom, hooray!')
     except:
       try:
         newSecret = sha1hex(s(os.urandom(64)) + gSecret)
@@ -62,6 +63,7 @@ def globalSecret():
   return gSecret
 
 
+
 TOKEN_LENGTH=36
 
 def signToken(token=None, secret=None, payload='', timestamp=None,
               length=TOKEN_LENGTH):
diff --git a/pagekite/proto/selectables.py b/pagekite/proto/selectables.py
index 6eace9e3..3b669e55 100755
--- a/pagekite/proto/selectables.py
+++ b/pagekite/proto/selectables.py
@@ -48,22 +48,22 @@ def obfuIp(ip):
   return '~%s' % '.'.join([q for q in quads[-2:]])
 
 
-SELECTABLE_LOCK = threading.Lock()
+SELECTABLE_LOCK = threading.RLock()  # threading.Lock() will deadlock on pypy!
 SELECTABLE_ID = 0
-SELECTABLES = {}
+SELECTABLES = set([])
 def getSelectableId(what):
   global SELECTABLES, SELECTABLE_ID, SELECTABLE_LOCK
   with SELECTABLE_LOCK:
     count = 0
+    SELECTABLE_ID += 1
+    SELECTABLE_ID %= 0x20000
     while SELECTABLE_ID in SELECTABLES:
       SELECTABLE_ID += 1
-      SELECTABLE_ID %= 0x40000
-      if (SELECTABLE_ID % 0x00800) == 0:
-        logging.LogDebug('Selectable map: %s' % (SELECTABLES, ))
+      SELECTABLE_ID %= 0x20000
       count += 1
-      if count > 0x40000:
+      if count > 0x20000:
         raise ValueError('Too many conns!')
-    SELECTABLES[SELECTABLE_ID] = what
+    SELECTABLES.add(SELECTABLE_ID)
     return SELECTABLE_ID
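The "deadlock on pypy" comment deserves unpacking: under PyPy, __del__ can fire at an arbitrary point, including while the same thread already holds this lock, and a plain Lock would then block forever. A minimal sketch of why the re-entrant lock is safe (names illustrative, not from the patch):

import threading

lock = threading.RLock()

def cleanup():
    # imagine this being Selectable.__del__, triggered mid-allocation
    with lock:
        pass   # safe: RLock may be re-acquired by its owning thread

with lock:
    cleanup()  # with threading.Lock() this line would deadlock
print('no deadlock')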
@@ -74,7 +74,7 @@ class Selectable(object):
                      errno.EDEADLK, errno.EWOULDBLOCK, errno.ENOBUFS,
                      errno.EALREADY)
 
-  def __init__(self, fd=None, address=None, on_port=None, maxread=16*1024,
+  def __init__(self, fd=None, address=None, on_port=None, maxread=None,
                ui=None, tracked=True, bind=None, backlog=100):
     self.fd = None
 
@@ -98,6 +98,7 @@ def __init__(self, fd=None, address=None, on_port=None, maxread=16*1024,
     self.address = address
     self.on_port = on_port
     self.created = self.bytes_logged = time.time()
+    self.lock = threading.RLock()
     self.last_activity = 0
     self.dead = False
     self.ui = ui
@@ -108,7 +109,7 @@ def __init__(self, fd=None, address=None, on_port=None, maxread=16*1024,
     self.q_days = None
 
     # Read-related variables
-    self.maxread = maxread
+    self.maxread = maxread or common.MAX_READ_BYTES
     self.read_bytes = self.all_in = 0
     self.read_eof = False
     self.peeking = False
@@ -121,14 +122,10 @@ def __init__(self, fd=None, address=None, on_port=None, maxread=16*1024,
     self.write_eof = False
     self.write_retry = None
 
-    # Flow control v1
-    self.throttle_until = (time.time() - 1)
-    self.max_read_speed = 96*1024
     # Flow control v2
     self.acked_kb_delta = 0
 
     # Compression stuff
-    self.lock = threading.Lock()
     self.zw = None
     self.zlevel = 1
     self.zreset = False
@@ -138,6 +135,7 @@ def __init__(self, fd=None, address=None, on_port=None, maxread=16*1024,
     self.ws_zero_mask = False
 
     # Logging
+    self.sstate = 'new'
     self.alt_id = None
     self.countas = 'selectables_live'
     self.sid = self.gsid = getSelectableId(self.countas)
@@ -153,12 +151,11 @@ def __init__(self, fd=None, address=None, on_port=None, maxread=16*1024,
       common.gYamon.vadd('selectables', 1)
 
   def CountAs(self, what):
-    if common.gYamon:
-      common.gYamon.vadd(self.countas, -1)
-      common.gYamon.vadd(what, 1)
-    self.countas = what
-    global SELECTABLES
-    SELECTABLES[self.gsid] = '%s %s' % (self.countas, self)
+    with self.lock:
+      if common.gYamon:
+        common.gYamon.vadd(self.countas, -1)
+        common.gYamon.vadd(what, 1)
+      self.countas = what
 
   def Cleanup(self, close=True):
     self.peeked = self.zw = ''
@@ -171,27 +168,34 @@ def Cleanup(self, close=True):
       self.fd = None
     if not self.dead:
       self.dead = True
+      self.sstate = 'dead'
       self.CountAs('selectables_dead')
       if close:
        self.LogTraffic(final=True)
+    try:
+      global SELECTABLES, SELECTABLE_LOCK
+      with SELECTABLE_LOCK:
+        SELECTABLES.remove(self.gsid)
+    except KeyError:
+      pass
 
   def __del__(self):
+    # Important: This can run at random times, especially under pypy, so all
+    # locks must be re-entrant (RLock), otherwise we deadlock.
     try:
-      if common.gYamon:
-        common.gYamon.vadd(self.countas, -1)
-        common.gYamon.vadd('selectables', -1)
+      with self.lock:
+        if common.gYamon and self.countas:
+          common.gYamon.vadd(self.countas, -1)
+          common.gYamon.vadd('selectables', -1)
+          self.countas = None
     except AttributeError:
       pass
-    with SELECTABLE_LOCK:
-      global SELECTABLES
-      if self.gsid in SELECTABLES:
-        del SELECTABLES[self.gsid]
 
   def __str__(self):
-    return '%s: %s<%s%s%s>' % (self.log_id, self.__class__,
-                               self.read_eof and '-' or 'r',
-                               self.write_eof and '-' or 'w',
-                               len(self.write_blocked))
+    return '%s: %s<%s|%s%s%s>' % (self.log_id, self.__class__, self.sstate,
+                                  self.read_eof and '-' or 'r',
+                                  self.write_eof and '-' or 'w',
+                                  len(self.write_blocked))
 
   def __html__(self):
     try:
@@ -219,7 +223,7 @@ def __html__(self):
             self.all_out + self.wrote_bytes,
             time.strftime('%Y-%m-%d %H:%M:%S',
                           time.localtime(self.created)),
-            self.dead and 'dead' or 'alive')
+            self.sstate)
 
   def ResetZChunks(self):
     with self.lock:
@@ -267,12 +271,12 @@ def Log(self, values, level=logging.LOG_LEVEL_DEFAULT):
 
   def LogError(self, error, params=None):
     values = params or []
-    if self.log_id: values.append(('id', self.log_id))
+    if self.log_id: values.extend([('id', self.log_id), ('s', self.sstate)])
     logging.LogError(error, values)
 
   def LogDebug(self, message, params=None):
     values = params or []
-    if self.log_id: values.append(('id', self.log_id))
+    if self.log_id: values.extend([('id', self.log_id), ('s', self.sstate)])
     logging.LogDebug(message, values)
 
   def LogWarning(self, warning, params=None):
@@ -282,7 +286,7 @@ def LogWarning(self, warning, params=None):
 
   def LogInfo(self, message, params=None):
     values = params or []
-    if self.log_id: values.append(('id', self.log_id))
+    if self.log_id: values.extend([('id', self.log_id), ('s', self.sstate)])
     logging.LogInfo(message, values)
 
   def LogTrafficStatus(self, final=False):
@@ -301,26 +305,20 @@ def LogTraffic(self, final=False):
         common.gYamon.vadd("bytes_all",
                            self.wrote_bytes + self.read_bytes, wrap=1000000000)
 
-      if final:
-        self.Log([('wrote', '%d' % self.wrote_bytes),
-                  ('wbps', '%d' % self.write_speed),
-                  ('read', '%d' % self.read_bytes),
-                  ('eof', '1')],
-                 level=logging.LOG_LEVEL_MACH)
-      else:
-        self.Log([('wrote', '%d' % self.wrote_bytes),
+      log_info = [('wrote', '%d' % self.wrote_bytes),
                   ('wbps', '%d' % self.write_speed),
-                  ('read', '%d' % self.read_bytes)],
-                 level=logging.LOG_LEVEL_MACH)
+                  ('read', '%d' % self.read_bytes)]
+      if self.acked_kb_delta:
+        log_info.append(('delta', '%d' % self.acked_kb_delta))
+      if final:
+        log_info.append(('eof', '1'))
+      self.Log(log_info)
 
       self.bytes_logged = now
       self.wrote_bytes = self.read_bytes = 0
     elif final:
       self.Log([('eof', '1')], level=logging.LOG_LEVEL_MACH)
 
-    global SELECTABLES
-    SELECTABLES[self.gsid] = '%s %s' % (self.countas, self)
-
   def SayHello(self):
     pass
 
@@ -329,8 +327,6 @@ def ProcessData(self, data):
     return False
 
   def ProcessEof(self):
-    global SELECTABLES
-    SELECTABLES[self.gsid] = '%s %s' % (self.countas, self)
     if self.read_eof and self.write_eof and not self.write_blocked:
       self.Cleanup()
       return False
@@ -352,7 +348,9 @@ def EatPeeked(self, eat_bytes=None, keep_peeking=False):
     discard = ''
     while len(discard) < eat_bytes:
       try:
-        discard += s(self.fd.recv(eat_bytes - len(discard)))
+        bytecount = eat_bytes - len(discard)
+        self.sstate = 'eat(%d)' % bytecount
+        discard += s(self.fd.recv(bytecount))
       except socket.error as err:
         (errno, msg) = err.args
         self.LogInfo('Error reading (%d/%d) socket: %s (errno=%s)' % (
@@ -361,6 +359,7 @@ def EatPeeked(self, eat_bytes=None, keep_peeking=False):
 
     if logging.DEBUG_IO:
       print('===[ ATE %d PEEKED BYTES ]===\n' % eat_bytes)
+    self.sstate = 'ate(%d)' % eat_bytes
     self.peeked -= eat_bytes
     self.peeking = keep_peeking
     return
@@ -371,33 +370,24 @@ def ReadData(self, maxread=None):
     now = time.time()
     maxread = maxread or self.maxread
 
-    flooded = self.Flooded(now)
-    if flooded > self.max_read_speed and not self.acked_kb_delta:
-      # FIXME: This is v1 flow control, kill it when 0.4.7 is "everywhere"
-      last = self.throttle_until
-      # Disable local throttling for really slow connections; remote
-      # throttles (trigged by blocked sockets) still work.
-      if self.max_read_speed > 1024:
-        self.AutoThrottle()
-        maxread = 1024
-      if now > last and self.all_in > 2*self.max_read_speed:
-        self.max_read_speed *= 1.25
-        self.max_read_speed += maxread
-
     try:
       if self.peeking:
+        self.sstate = 'peek(%d)' % maxread
         data = s(self.fd.recv(maxread, socket.MSG_PEEK))
         self.peeked = len(data)
         if logging.DEBUG_IO:
           print('<== PEEK =[%s]==(\n%s)==' % (self, data[:320]))
       else:
+        self.sstate = 'read(%d)' % maxread
         data = s(self.fd.recv(maxread))
         if logging.DEBUG_IO:
-          print(('<== IN =[%s @ %dbps]==(\n%s)=='
-                 ) % (self, self.max_read_speed, data[:320]))
+          print('<== IN =[%s]==(\n%s)==' % (self, data[:160]))
+      self.sstate = 'data(%d)' % len(data)
-    except (SSL.WantReadError, SSL.WantWriteError) as err:
+    except (SSL.WantReadError, SSL.WantWriteError):
+      self.sstate += '/SSL.WRE'
       return True
     except IOError as err:
+      self.sstate += '/ioerr=%s' % (err.errno,)
       if err.errno not in self.HARMLESS_ERRNOS:
         self.LogDebug('Error reading socket: %s (%s)' % (err, err.errno))
         common.DISCONNECT_COUNT += 1
@@ -405,11 +395,13 @@ def ReadData(self, maxread=None):
       else:
         return True
     except (SSL.Error, SSL.ZeroReturnError, SSL.SysCallError) as err:
+      self.sstate += '/SSL.Error'
       self.LogDebug('Error reading socket (SSL): %s' % err)
       common.DISCONNECT_COUNT += 1
       return False
     except socket.error as err:
       (errno, msg) = err.args
+      self.sstate += '/sockerr=%s' % (err.errno,)
       if errno in self.HARMLESS_ERRNOS:
         return True
       else:
@@ -417,60 +409,28 @@ def ReadData(self, maxread=None):
         common.DISCONNECT_COUNT += 1
         return False
 
-    self.last_activity = now
-    if data is None or data == '':
-      self.read_eof = True
-      if logging.DEBUG_IO:
-        print('<== IN =[%s]==(EOF)==' % self)
-      return self.ProcessData('')
-    else:
-      if not self.peeking:
-        self.read_bytes += len(data)
-        if self.acked_kb_delta:
-          self.acked_kb_delta += (len(data)//1024)
-        if self.read_bytes > logging.LOG_THRESHOLD: self.LogTraffic()
-      return self.ProcessData(data)
-
-  def Flooded(self, now=None):
-    delta = ((now or time.time()) - self.created)
-    if delta >= 1:
-      flooded = self.read_bytes + self.all_in
-      flooded -= self.max_read_speed * 0.95 * delta
-      return flooded
-    else:
-      return 0
-
-  def RecordProgress(self, skb, bps):
+    try:
+      self.last_activity = now
+      if data is None or data == '':
+        self.sstate += '/EOF'
+        self.read_eof = True
+        if logging.DEBUG_IO:
+          print('<== IN =[%s]==(EOF)==' % self)
+        return self.ProcessData('')
+      else:
+        if not self.peeking:
+          self.read_bytes += len(data)
+          if self.acked_kb_delta:
+            self.acked_kb_delta += (len(data)//1024)
+          if self.read_bytes > logging.LOG_THRESHOLD: self.LogTraffic()
+        return self.ProcessData(data)
+    finally:
+      self.sstate = (self.dead and 'dead' or 'idle')
+
+  def RecordProgress(self, skb):
     if skb >= 0:
       all_read = (self.all_in + self.read_bytes) // 1024
-      if self.acked_kb_delta:
-        self.acked_kb_delta = max(1, all_read - skb)
-        self.LogDebug('Delta is: %d' % self.acked_kb_delta)
-    elif bps >= 0:
-      self.Throttle(max_speed=bps, remote=True)
-
-  def Throttle(self, max_speed=None, remote=False, delay=0.2):
-    if max_speed:
-      self.max_read_speed = max_speed
-
-    flooded = max(-1, self.Flooded())
-    if self.max_read_speed:
-      delay = min(10, max(0.1, flooded/self.max_read_speed))  # Float division
-      if flooded < 0: delay = 0
-
-    if delay:
-      ot = self.throttle_until
-      self.throttle_until = time.time() + delay
-      if ((self.throttle_until - ot) > 30 or
-          (int(ot) != int(self.throttle_until) and delay > 8)):
-        self.LogInfo('Throttled %.1fs until %x (flood=%d, bps=%s, %s)' % (
-          delay, int(self.throttle_until), flooded,
-          self.max_read_speed, remote and 'remote' or 'local'))
-
-    return True
-
-  def AutoThrottle(self, max_speed=None, remote=False, delay=0.2):
-    return self.Throttle(max_speed, remote, delay)
+      self.acked_kb_delta = max(1, all_read - skb)
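With the v1 speed-based throttling gone, the remaining flow control reduces to two small rules, sketched here (function names illustrative, not from the patch): the relay acknowledges forwarded kilobytes via SKB headers, and a connection stops being readable once it is about three max-reads ahead of the last acknowledgement (this is the IsReadable() check further down).

def record_progress(all_in, read_bytes, skb):
    # The relay acked skb kB; our backlog is whatever it has not seen.
    all_read = (all_in + read_bytes) // 1024
    return max(1, all_read - skb)       # new acked_kb_delta

def is_readable(acked_kb_delta, maxread):
    # Stop reading once ~3 max-reads are un-acked downstream.
    return acked_kb_delta < (3 * maxread / 1024)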
 
   def Send(self, data, try_flush=False, activity=False,
                        just_buffer=False, allow_blocking=False):
@@ -491,6 +451,7 @@ def Send(self, data, try_flush=False, activity=False,
     try:
       want_send = self.write_retry or min(len(sending), SEND_MAX_BYTES)
       sent_bytes = None
+      self.sstate = 'send(%d)' % (want_send)
       # Try to write for up to 5 seconds before giving up
       for try_wait in (0, 0, 0.1, 0.2, 0.2, 0.2, 0.3, 0.5, 0.5, 1, 1, 1, 0):
         try:
@@ -505,13 +466,16 @@ def Send(self, data, try_flush=False, activity=False,
           if logging.DEBUG_IO:
             print('=== WRITE SSL RETRY: =[%s: %s bytes]==' % (self, want_send))
           if try_wait:
+            self.sstate = 'send/SSL.WRE(%d,%.1f)' % (want_send, try_wait)
             time.sleep(try_wait)
       if sent_bytes is None:
+        self.sstate += '/retries'
        self.LogInfo('Error sending: Too many SSL write retries')
         self.ProcessEofWrite()
         common.DISCONNECT_COUNT += 1
         return False
     except IOError as err:
+      self.sstate += '/ioerr=%s' % (err.errno,)
       if err.errno not in self.HARMLESS_ERRNOS:
         self.LogInfo('Error sending: %s' % err)
         self.ProcessEofWrite()
@@ -523,6 +487,7 @@ def Send(self, data, try_flush=False, activity=False,
         self.write_retry = want_send
     except socket.error as err:
       (errno, msg) = err.args
+      self.sstate += '/sockerr=%s' % (errno,)
       if errno not in self.HARMLESS_ERRNOS:
         self.LogInfo('Error sending: %s (errno=%s)' % (msg, errno))
         self.ProcessEofWrite()
@@ -533,11 +498,13 @@ def Send(self, data, try_flush=False, activity=False,
         print('=== WRITE HICCUP: =[%s: %s bytes]==' % (self, want_send))
       self.write_retry = want_send
     except (SSL.Error, SSL.ZeroReturnError, SSL.SysCallError) as err:
+      self.sstate += '/SSL.Error'
       self.LogInfo('Error sending (SSL): %s' % err)
       self.ProcessEofWrite()
       common.DISCONNECT_COUNT += 1
       return False
     except AttributeError:
+      self.sstate += '/AttrError'
       # This has been seen in the wild, is most likely some sort of
       # race during shutdown. :-(
       self.LogInfo('AttributeError, self.fd=%s' % self.fd)
@@ -554,6 +521,8 @@ def Send(self, data, try_flush=False, activity=False,
 
     if self.write_eof and not self.write_blocked:
       self.ProcessEofWrite()
+
+    self.sstate = (self.dead and 'dead' or 'idle')
     return True
 
   def SendChunked(self, data, compress=True, zhistory=None, just_buffer=False):
@@ -609,8 +578,7 @@ def Flush(self, loops=50, wait=False, allow_blocking=False):
 
 def IsReadable(s, now):
   return (s.fd and (not s.read_eof)
-          and (s.acked_kb_delta < 64)  # FIXME
-          and (s.throttle_until <= now))
+          and (s.acked_kb_delta < (3 * s.maxread/1024)))
 
 def IsBlocked(s):
   return (s.fd and (len(s.write_blocked) > 0))
diff --git a/pagekite/ui/remote.py b/pagekite/ui/remote.py
index 05e2248c..20507ba1 100755
--- a/pagekite/ui/remote.py
+++ b/pagekite/ui/remote.py
@@ -275,9 +275,9 @@ def reset(self):
 
   # These routines are used by the PageKite UI, to communicate with us...
   def readline(self):
-    try:
-      self.pk_readlock.acquire()
-      while (not self.pk_incoming) and (not self.pk_eof): self.pk_readlock.wait()
+    with self.pk_readlock:
+      while (not self.pk_incoming) and (not self.pk_eof):
+        self.pk_readlock.wait()
       if self.pk_incoming:
         line = self.pk_incoming.pop(0)
       else:
@@ -285,48 +285,37 @@ def readline(self):
       if self.debug:
         print('>>PK>> %s' % line.strip())
       return line
-    finally:
-      self.pk_readlock.release()
 
   def write(self, data):
     if self.debug:
       print('>>GUI>> %s' % data.strip())
-    try:
-      self.gui_readlock.acquire()
+    with self.gui_readlock:
       if data:
         self.gui_incoming += data
       else:
         self.gui_eof = True
       self.gui_readlock.notify()
-    finally:
-      self.gui_readlock.release()
 
   # And these are used by the GUI, to communicate with PageKite.
   def recv(self, bytecount):
-    try:
-      self.gui_readlock.acquire()
+    with self.gui_readlock:
       while (len(self.gui_incoming) < bytecount) and (not self.gui_eof):
         self.gui_readlock.wait()
       data = self.gui_incoming[0:bytecount]
       self.gui_incoming = self.gui_incoming[bytecount:]
       return data
-    finally:
-      self.gui_readlock.release()
 
   def send(self, data):
     if not data.endswith('\n') and data != '':
       raise ValueError('Please always send whole lines')
     if self.debug:
       print('<
diff --git a/pagekite/yamond.py b/pagekite/yamond.py
--- a/pagekite/yamond.py
+++ b/pagekite/yamond.py
   def vmax(self, var, value):
-    try:
-      self.lock.acquire()
-      if value > self.values[var]:
-        self.values[var] = value
-    finally:
-      self.lock.release()
+    # Unlocked, since we don't change the size of self.values
+    if value > self.values[var]:
+      self.values[var] = value
+
+  def vmin(self, var, value):
+    # Unlocked, since we don't change the size of self.values
+    if value < self.values[var]:
+      self.values[var] = value
 
   def vscale(self, var, ratio, add=0):
-    try:
-      self.lock.acquire()
-      if var not in self.values:
-        self.values[var] = 0
-      self.values[var] *= ratio
-      self.values[var] += add
-    finally:
-      self.lock.release()
+    if var not in self.values:
+      with self.lock:
+        self.values[var] = self.values.get(var, 0)
+    # Unlocked, since we don't change the size of self.values
+    self.values[var] *= ratio
+    self.values[var] += add
 
   def vset(self, var, value):
-    try:
-      self.lock.acquire()
+    with self.lock:
       self.values[var] = value
-    finally:
-      self.lock.release()
 
   def vadd(self, var, value, wrap=None):
-    try:
-      self.lock.acquire()
-      if var not in self.values:
-        self.values[var] = 0
-      self.values[var] += value
-      if wrap is not None and self.values[var] >= wrap:
-        self.values[var] -= wrap
-    finally:
-      self.lock.release()
-
-  def vmin(self, var, value):
-    try:
-      self.lock.acquire()
-      if value < self.values[var]:
-        self.values[var] = value
-    finally:
-      self.lock.release()
+    if var not in self.values:
+      with self.lock:
+        self.values[var] = self.values.get(var, 0)
+    # We assume the GIL will guarantee these do sane things
+    self.values[var] += value
+    if wrap:
+      self.values[var] %= wrap
 
   def vdel(self, var):
-    try:
-      self.lock.acquire()
-      if var in self.values:
+    if var in self.values:
+      with self.lock:
         del self.values[var]
-    finally:
-      self.lock.release()
 
   def lcreate(self, listn, elems):
-    try:
-      self.lock.acquire()
-      self.lists[listn] = [elems, 0, ['' for x in range(0, elems)]]
-    finally:
-      self.lock.release()
+    with self.lock:
+      self.lists[listn] = [elems, 0, ['' for x in xrange(0, elems)]]
 
   def ladd(self, listn, value):
-    try:
-      self.lock.acquire()
+    with self.lock:
       lst = self.lists[listn]
       lst[2][lst[1]] = value
       lst[1] += 1
       lst[1] %= lst[0]
-    finally:
-      self.lock.release()
 
   def render_vars_text(self, view=None):
     if view:
@@ -193,12 +177,23 @@ def render_vars_text(self, view=None):
     data = []
     for var in values:
       data.append('%s: %s\n' % (var, values[var]))
+      if var == 'started':
+        data.append(
+          'started_days_ago: %.3f\n' % ((time.time() - values[var]) / 86400))
 
     for lname in lists:
       (elems, offset, lst) = lists[lname]
       l = lst[offset:]
       l.extend(lst[:offset])
       data.append('%s: %s\n' % (lname, ' '.join(['%s' % (x, ) for x in l])))
+      try:
+        slist = sorted([float(i) for i in l if i])
+        if len(slist) >= 10:
+          data.append('%s_m50: %.2f\n' % (lname, slist[int(len(slist) * 0.5)]))
+          data.append('%s_m90: %.2f\n' % (lname, slist[int(len(slist) * 0.9)]))
+          data.append('%s_avg: %.2f\n' % (lname, sum(slist) / len(slist)))
+      except (ValueError, TypeError, IndexError, ZeroDivisionError):
+        pass
 
     data.sort()
     return ''.join(data)
@@ -212,7 +207,8 @@ def run(self):
       self.httpd = self.server(self, self.handler)
       self.sspec = self.httpd.server_address
       self.running = True
-      while self.running: self.httpd.handle_request()
+      while self.running:
+        self.httpd.handle_request()
 
 
 if __name__ == '__main__':
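The new list statistics in render_vars_text amount to a median, a 90th percentile, and a mean over each ring buffer of samples. A self-contained restatement of that computation (function name illustrative, not from the patch):

def list_stats(samples):
    # Ignore empty slots, sort, then index the percentiles directly.
    slist = sorted(float(i) for i in samples if i)
    if len(slist) < 10:
        return {}
    return {'m50': slist[int(len(slist) * 0.5)],
            'm90': slist[int(len(slist) * 0.9)],
            'avg': sum(slist) / len(slist)}

print(list_stats(range(1, 101)))  # {'m50': 51.0, 'm90': 91.0, 'avg': 50.5}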