# Path: blob/master/thirdparty/keepalive/keepalive.py
# (code-viewer metadata: 3554 views)
#!/usr/bin/env python1# -*- coding: utf-8 -*-23# This library is free software; you can redistribute it and/or4# modify it under the terms of the GNU Lesser General Public5# License as published by the Free Software Foundation; either6# version 2.1 of the License, or (at your option) any later version.7#8# This library is distributed in the hope that it will be useful,9# but WITHOUT ANY WARRANTY; without even the implied warranty of10# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU11# Lesser General Public License for more details.12#13# You should have received a copy of the GNU Lesser General Public14# License along with this library; if not, write to the15# Free Software Foundation, Inc.,16# 59 Temple Place, Suite 330,17# Boston, MA 02111-1307 USA1819# This file was part of urlgrabber, a high-level cross-protocol url-grabber20# Copyright 2002-2004 Michael D. Stenner, Ryan Tomayko21# Copyright 2015 Sergio Fernández2223"""An HTTP handler for urllib2 that supports HTTP 1.1 and keepalive.2425>>> import urllib226>>> from keepalive import HTTPHandler27>>> keepalive_handler = HTTPHandler()28>>> opener = _urllib.request.build_opener(keepalive_handler)29>>> _urllib.request.install_opener(opener)30>>>31>>> fo = _urllib.request.urlopen('http://www.python.org')3233If a connection to a given host is requested, and all of the existing34connections are still in use, another connection will be opened. 
If35the handler tries to use an existing connection but it fails in some36way, it will be closed and removed from the pool.3738To remove the handler, simply re-run build_opener with no arguments, and39install that opener.4041You can explicitly close connections by using the close_connection()42method of the returned file-like object (described below) or you can43use the handler methods:4445close_connection(host)46close_all()47open_connections()4849NOTE: using the close_connection and close_all methods of the handler50should be done with care when using multiple threads.51* there is nothing that prevents another thread from creating new52connections immediately after connections are closed53* no checks are done to prevent in-use connections from being closed5455>>> keepalive_handler.close_all()5657EXTRA ATTRIBUTES AND METHODS5859Upon a status of 200, the object returned has a few additional60attributes and methods, which should not be used if you want to61remain consistent with the normal urllib2-returned objects:6263close_connection() - close the connection to the host64readlines() - you know, readlines()65status - the return status (ie 404)66reason - english translation of status (ie 'File not found')6768If you want the best of both worlds, use this inside an69AttributeError-catching try:7071>>> try: status = fo.status72>>> except AttributeError: status = None7374Unfortunately, these are ONLY there if status == 200, so it's not75easy to distinguish between non-200 responses. The reason is that76urllib2 tries to do clever things with error codes 301, 302, 401,77and 407, and it wraps the object upon return.7879For python versions earlier than 2.4, you can avoid this fancy error80handling by setting the module-level global HANDLE_ERRORS to zero.81You see, prior to 2.4, it's the HTTP Handler's job to determine what82to handle specially, and what to just pass up. HANDLE_ERRORS == 083means "pass everything up". 
In python 2.4, however, this job no84longer belongs to the HTTP Handler and is now done by a NEW handler,85HTTPErrorProcessor. Here's the bottom line:8687python version < 2.488HANDLE_ERRORS == 1 (default) pass up 200, treat the rest as89errors90HANDLE_ERRORS == 0 pass everything up, error processing is91left to the calling code92python version >= 2.493HANDLE_ERRORS == 1 pass up 200, treat the rest as errors94HANDLE_ERRORS == 0 (default) pass everything up, let the95other handlers (specifically,96HTTPErrorProcessor) decide what to do9798In practice, setting the variable either way makes little difference99in python 2.4, so for the most consistent behavior across versions,100you probably just want to use the defaults, which will give you101exceptions on errors.102103"""104105from __future__ import print_function106107try:108from thirdparty.six.moves import http_client as _http_client109from thirdparty.six.moves import range as _range110from thirdparty.six.moves import urllib as _urllib111except ImportError:112from six.moves import http_client as _http_client113from six.moves import range as _range114from six.moves import urllib as _urllib115116import socket117import threading118119DEBUG = None120121import sys122if sys.version_info < (2, 4): HANDLE_ERRORS = 1123else: HANDLE_ERRORS = 0124125class ConnectionManager:126"""127The connection manager must be able to:128* keep track of all existing129"""130def __init__(self):131self._lock = threading.Lock()132self._hostmap = {} # map hosts to a list of connections133self._connmap = {} # map connections to host134self._readymap = {} # map connection to ready state135136def add(self, host, connection, ready):137self._lock.acquire()138try:139if host not in self._hostmap: self._hostmap[host] = []140self._hostmap[host].append(connection)141self._connmap[connection] = host142self._readymap[connection] = ready143finally:144self._lock.release()145146def remove(self, connection):147self._lock.acquire()148try:149try:150host = 
self._connmap[connection]151except KeyError:152pass153else:154del self._connmap[connection]155del self._readymap[connection]156try:157self._hostmap[host].remove(connection)158except ValueError:159pass160if not self._hostmap[host]: del self._hostmap[host]161finally:162self._lock.release()163164def set_ready(self, connection, ready):165self._lock.acquire()166try:167if connection in self._readymap: self._readymap[connection] = ready168finally:169self._lock.release()170171def get_ready_conn(self, host):172conn = None173try:174self._lock.acquire()175if host in self._hostmap:176for c in self._hostmap[host]:177if self._readymap.get(c):178self._readymap[c] = 0179conn = c180break181finally:182self._lock.release()183return conn184185def get_all(self, host=None):186self._lock.acquire()187try:188if host:189return list(self._hostmap.get(host, []))190else:191return dict(self._hostmap)192finally:193self._lock.release()194195class KeepAliveHandler:196def __init__(self):197self._cm = ConnectionManager()198199#### Connection Management200def open_connections(self):201"""return a list of connected hosts and the number of connections202to each. 
[('foo.com:80', 2), ('bar.org', 1)]"""203return [(host, len(li)) for (host, li) in self._cm.get_all().items()]204205def close_connection(self, host):206"""close connection(s) to <host>207host is the host:port spec, as in 'www.cnn.com:8080' as passed in.208no error occurs if there is no connection to that host."""209for h in self._cm.get_all(host):210self._cm.remove(h)211h.close()212213def close_all(self):214"""close all open connections"""215for host, conns in self._cm.get_all().items():216for h in conns:217self._cm.remove(h)218h.close()219220def _request_closed(self, request, host, connection):221"""tells us that this request is now closed and the the222connection is ready for another request"""223self._cm.set_ready(connection, 1)224225def _remove_connection(self, host, connection, close=0):226if close: connection.close()227self._cm.remove(connection)228229#### Transaction Execution230def do_open(self, req):231host = req.host232if not host:233raise _urllib.error.URLError('no host given')234235try:236h = self._cm.get_ready_conn(host)237while h:238r = self._reuse_connection(h, req, host)239240# if this response is non-None, then it worked and we're241# done. Break out, skipping the else block.242if r: break243244# connection is bad - possibly closed by server245# discard it and ask for the next free connection246h.close()247self._cm.remove(h)248h = self._cm.get_ready_conn(host)249else:250# no (working) free connections were found. 
Create a new one.251h = self._get_connection(host)252if DEBUG: DEBUG.info("creating new connection to %s (%d)",253host, id(h))254self._start_transaction(h, req)255r = h.getresponse()256self._cm.add(host, h, 0)257except (socket.error, _http_client.HTTPException) as err:258raise _urllib.error.URLError(err)259260if DEBUG: DEBUG.info("STATUS: %s, %s", r.status, r.reason)261262if not r.will_close:263try:264headers = getattr(r, 'msg', None)265if headers:266c_head = headers.get("connection")267if c_head and "close" in c_head.lower():268r.will_close = True269except Exception:270pass271272# if not a persistent connection, don't try to reuse it273if r.will_close:274if DEBUG: DEBUG.info('server will close connection, discarding')275self._cm.remove(h)276h.close()277278r._handler = self279r._host = host280r._url = req.get_full_url()281r._connection = h282r.code = r.status283r.headers = r.msg284285if r.status == 200 or not HANDLE_ERRORS:286return r287else:288return self.parent.error('http', req, r,289r.status, r.reason, r.headers)290291def _reuse_connection(self, h, req, host):292"""start the transaction with a re-used connection293return a response object (r) upon success or None on failure.294This DOES not close or remove bad connections in cases where295it returns. However, if an unexpected exception occurs, it296will close and remove the connection before re-raising.297"""298try:299self._start_transaction(h, req)300r = h.getresponse()301# note: just because we got something back doesn't mean it302# worked. We'll check the version below, too.303except (socket.error, _http_client.HTTPException):304r = None305except Exception:306# adding this block just in case we've missed307# something we will still raise the exception, but308# lets try and close the connection and remove it309# first. We previously got into a nasty loop310# where an exception was uncaught, and so the311# connection stayed open. On the next try, the312# same exception was raised, etc. 
The tradeoff is313# that it's now possible this call will raise314# a DIFFERENT exception315if DEBUG: DEBUG.error("unexpected exception - closing " + \316"connection to %s (%d)", host, id(h))317self._cm.remove(h)318h.close()319raise320321if r is None or r.version == 9:322# httplib falls back to assuming HTTP 0.9 if it gets a323# bad header back. This is most likely to happen if324# the socket has been closed by the server since we325# last used the connection.326if DEBUG: DEBUG.info("failed to re-use connection to %s (%d)",327host, id(h))328r = None329else:330if DEBUG: DEBUG.info("re-using connection to %s (%d)", host, id(h))331332return r333334def _start_transaction(self, h, req):335try:336if req.data:337data = req.data338if hasattr(req, 'selector'):339h.putrequest(req.get_method() or 'POST', req.selector, skip_host=req.has_header("Host"), skip_accept_encoding=req.has_header("Accept-encoding"))340else:341h.putrequest(req.get_method() or 'POST', req.get_selector(), skip_host=req.has_header("Host"), skip_accept_encoding=req.has_header("Accept-encoding"))342if 'Content-type' not in req.headers:343h.putheader('Content-type',344'application/x-www-form-urlencoded')345if 'Content-length' not in req.headers:346h.putheader('Content-length', '%d' % len(data))347else:348if hasattr(req, 'selector'):349h.putrequest(req.get_method() or 'GET', req.selector, skip_host=req.has_header("Host"), skip_accept_encoding=req.has_header("Accept-encoding"))350else:351h.putrequest(req.get_method() or 'GET', req.get_selector(), skip_host=req.has_header("Host"), skip_accept_encoding=req.has_header("Accept-encoding"))352except (socket.error, _http_client.HTTPException) as err:353raise _urllib.error.URLError(err)354355if 'Connection' not in req.headers:356h.putheader('Connection', 'keep-alive')357358for args in self.parent.addheaders:359if args[0] not in req.headers:360h.putheader(*args)361for k, v in req.headers.items():362h.putheader(k, v)363h.endheaders()364if 
req.data:365h.send(req.data)366367def _get_connection(self, host):368raise NotImplementedError()369370class HTTPHandler(KeepAliveHandler, _urllib.request.HTTPHandler):371def __init__(self):372KeepAliveHandler.__init__(self)373374def http_open(self, req):375return self.do_open(req)376377def _get_connection(self, host):378return HTTPConnection(host)379380class HTTPSHandler(KeepAliveHandler, _urllib.request.HTTPSHandler):381def __init__(self, ssl_factory=None):382KeepAliveHandler.__init__(self)383if not ssl_factory:384try:385import sslfactory386ssl_factory = sslfactory.get_factory()387except ImportError:388pass389self._ssl_factory = ssl_factory390391def https_open(self, req):392return self.do_open(req)393394def _get_connection(self, host):395if self._ssl_factory:396return self._ssl_factory.get_https_connection(host)397else:398return HTTPSConnection(host)399400class HTTPResponse(_http_client.HTTPResponse):401# we need to subclass HTTPResponse in order to402# 1) add readline() and readlines() methods403# 2) add close_connection() methods404# 3) add info() and geturl() methods405406# in order to add readline(), read must be modified to deal with a407# buffer. example: readline must read a buffer and then spit back408# one line at a time. The only real alternative is to read one409# BYTE at a time (ick). 
Once something has been read, it can't be410# put back (ok, maybe it can, but that's even uglier than this),411# so if you THEN do a normal read, you must first take stuff from412# the buffer.413414# the read method wraps the original to accomodate buffering,415# although read() never adds to the buffer.416# Both readline and readlines have been stolen with almost no417# modification from socket.py418419420def __init__(self, sock, debuglevel=0, strict=0, method=None):421if method:422_http_client.HTTPResponse.__init__(self, sock, debuglevel, method=method)423else:424_http_client.HTTPResponse.__init__(self, sock, debuglevel)425self.fileno = sock.fileno426self.code = None427self._method = method428self._rbuf = b""429self._rbufsize = 8096430self._handler = None # inserted by the handler later431self._host = None # (same)432self._url = None # (same)433self._connection = None # (same)434435_raw_read = _http_client.HTTPResponse.read436437def close(self):438if self.fp:439self.fp.close()440self.fp = None441if self._handler:442self._handler._request_closed(self, self._host,443self._connection)444445# Note: Patch for Python3 (otherwise, connections won't be reusable)446def _close_conn(self):447self.close()448449def close_connection(self):450self._handler._remove_connection(self._host, self._connection, close=1)451self.close()452453def info(self):454return self.headers455456def geturl(self):457return self._url458459def read(self, amt=None):460# the _rbuf test is only in this first if for speed. 
It's not461# logically necessary462if self._rbuf and not amt is None:463L = len(self._rbuf)464if amt > L:465amt -= L466else:467s = self._rbuf[:amt]468self._rbuf = self._rbuf[amt:]469return s470471s = self._rbuf + self._raw_read(amt)472self._rbuf = b""473return s474475def readline(self, limit=-1):476data = b""477i = self._rbuf.find(b'\n')478while i < 0 and not (0 < limit <= len(self._rbuf)):479new = self._raw_read(self._rbufsize)480if not new: break481i = new.find(b'\n')482if i >= 0: i = i + len(self._rbuf)483self._rbuf = self._rbuf + new484if i < 0: i = len(self._rbuf)485else: i = i+1486if 0 <= limit < len(self._rbuf): i = limit487data, self._rbuf = self._rbuf[:i], self._rbuf[i:]488return data489490def readlines(self, sizehint = 0):491total = 0492lines = []493while 1:494line = self.readline()495if not line: break496lines.append(line)497total += len(line)498if sizehint and total >= sizehint:499break500return lines501502503class HTTPConnection(_http_client.HTTPConnection):504# use the modified response class505response_class = HTTPResponse506507class HTTPSConnection(_http_client.HTTPSConnection):508response_class = HTTPResponse509510#########################################################################511##### TEST FUNCTIONS512#########################################################################513514def error_handler(url):515global HANDLE_ERRORS516orig = HANDLE_ERRORS517keepalive_handler = HTTPHandler()518opener = _urllib.request.build_opener(keepalive_handler)519_urllib.request.install_opener(opener)520pos = {0: 'off', 1: 'on'}521for i in (0, 1):522print(" fancy error handling %s (HANDLE_ERRORS = %i)" % (pos[i], i))523HANDLE_ERRORS = i524try:525fo = _urllib.request.urlopen(url)526foo = fo.read()527fo.close()528try: status, reason = fo.status, fo.reason529except AttributeError: status, reason = None, None530except IOError as e:531print(" EXCEPTION: %s" % e)532raise533else:534print(" status = %s, reason = %s" % (status, reason))535HANDLE_ERRORS = orig536hosts 
= keepalive_handler.open_connections()537print("open connections:", hosts)538keepalive_handler.close_all()539540def continuity(url):541from hashlib import md5542format = '%25s: %s'543544# first fetch the file with the normal http handler545opener = _urllib.request.build_opener()546_urllib.request.install_opener(opener)547fo = _urllib.request.urlopen(url)548foo = fo.read()549fo.close()550m = md5(foo)551print(format % ('normal urllib', m.hexdigest()))552553# now install the keepalive handler and try again554opener = _urllib.request.build_opener(HTTPHandler())555_urllib.request.install_opener(opener)556557fo = _urllib.request.urlopen(url)558foo = fo.read()559fo.close()560m = md5(foo)561print(format % ('keepalive read', m.hexdigest()))562563fo = _urllib.request.urlopen(url)564foo = b''565while 1:566f = fo.readline()567if f: foo += f568else: break569fo.close()570m = md5(foo)571print(format % ('keepalive readline', m.hexdigest()))572573def comp(N, url):574print(' making %i connections to:\n %s' % (N, url))575576sys.stdout.write(' first using the normal urllib handlers')577# first use normal opener578opener = _urllib.request.build_opener()579_urllib.request.install_opener(opener)580t1 = fetch(N, url)581print(' TIME: %.3f s' % t1)582583sys.stdout.write(' now using the keepalive handler ')584# now install the keepalive handler and try again585opener = _urllib.request.build_opener(HTTPHandler())586_urllib.request.install_opener(opener)587t2 = fetch(N, url)588print(' TIME: %.3f s' % t2)589print(' improvement factor: %.2f' % (t1/t2, ))590591def fetch(N, url, delay=0):592import time593lens = []594starttime = time.time()595for i in _range(N):596if delay and i > 0: time.sleep(delay)597fo = _urllib.request.urlopen(url)598foo = fo.read()599fo.close()600lens.append(len(foo))601diff = time.time() - starttime602603j = 0604for i in lens[1:]:605j = j + 1606if not i == lens[0]:607print("WARNING: inconsistent length on read %i: %i" % (j, i))608609return diff610611def 
test_timeout(url):612global DEBUG613dbbackup = DEBUG614class FakeLogger:615def debug(self, msg, *args): print(msg % args)616info = warning = error = debug617DEBUG = FakeLogger()618print(" fetching the file to establish a connection")619fo = _urllib.request.urlopen(url)620data1 = fo.read()621fo.close()622623i = 20624print(" waiting %i seconds for the server to close the connection" % i)625while i > 0:626sys.stdout.write('\r %2i' % i)627sys.stdout.flush()628time.sleep(1)629i -= 1630sys.stderr.write('\r')631632print(" fetching the file a second time")633fo = _urllib.request.urlopen(url)634data2 = fo.read()635fo.close()636637if data1 == data2:638print(' data are identical')639else:640print(' ERROR: DATA DIFFER')641642DEBUG = dbbackup643644645def test(url, N=10):646print("checking error hander (do this on a non-200)")647try: error_handler(url)648except IOError as e:649print("exiting - exception will prevent further tests")650sys.exit()651print()652print("performing continuity test (making sure stuff isn't corrupted)")653continuity(url)654print()655print("performing speed comparison")656comp(N, url)657print()658print("performing dropped-connection check")659test_timeout(url)660661if __name__ == '__main__':662import time663import sys664try:665N = int(sys.argv[1])666url = sys.argv[2]667except:668print("%s <integer> <url>" % sys.argv[0])669else:670test(url, N)671672673