# Path: blob/master/thirdparty/keepalive/keepalive.py
# 2992 views
#!/usr/bin/env python1# -*- coding: utf-8 -*-23# This library is free software; you can redistribute it and/or4# modify it under the terms of the GNU Lesser General Public5# License as published by the Free Software Foundation; either6# version 2.1 of the License, or (at your option) any later version.7#8# This library is distributed in the hope that it will be useful,9# but WITHOUT ANY WARRANTY; without even the implied warranty of10# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU11# Lesser General Public License for more details.12#13# You should have received a copy of the GNU Lesser General Public14# License along with this library; if not, write to the15# Free Software Foundation, Inc.,16# 59 Temple Place, Suite 330,17# Boston, MA 02111-1307 USA1819# This file was part of urlgrabber, a high-level cross-protocol url-grabber20# Copyright 2002-2004 Michael D. Stenner, Ryan Tomayko21# Copyright 2015 Sergio Fernández2223"""An HTTP handler for urllib2 that supports HTTP 1.1 and keepalive.2425>>> import urllib226>>> from keepalive import HTTPHandler27>>> keepalive_handler = HTTPHandler()28>>> opener = _urllib.request.build_opener(keepalive_handler)29>>> _urllib.request.install_opener(opener)30>>>31>>> fo = _urllib.request.urlopen('http://www.python.org')3233If a connection to a given host is requested, and all of the existing34connections are still in use, another connection will be opened. 
If35the handler tries to use an existing connection but it fails in some36way, it will be closed and removed from the pool.3738To remove the handler, simply re-run build_opener with no arguments, and39install that opener.4041You can explicitly close connections by using the close_connection()42method of the returned file-like object (described below) or you can43use the handler methods:4445close_connection(host)46close_all()47open_connections()4849NOTE: using the close_connection and close_all methods of the handler50should be done with care when using multiple threads.51* there is nothing that prevents another thread from creating new52connections immediately after connections are closed53* no checks are done to prevent in-use connections from being closed5455>>> keepalive_handler.close_all()5657EXTRA ATTRIBUTES AND METHODS5859Upon a status of 200, the object returned has a few additional60attributes and methods, which should not be used if you want to61remain consistent with the normal urllib2-returned objects:6263close_connection() - close the connection to the host64readlines() - you know, readlines()65status - the return status (ie 404)66reason - english translation of status (ie 'File not found')6768If you want the best of both worlds, use this inside an69AttributeError-catching try:7071>>> try: status = fo.status72>>> except AttributeError: status = None7374Unfortunately, these are ONLY there if status == 200, so it's not75easy to distinguish between non-200 responses. The reason is that76urllib2 tries to do clever things with error codes 301, 302, 401,77and 407, and it wraps the object upon return.7879For python versions earlier than 2.4, you can avoid this fancy error80handling by setting the module-level global HANDLE_ERRORS to zero.81You see, prior to 2.4, it's the HTTP Handler's job to determine what82to handle specially, and what to just pass up. HANDLE_ERRORS == 083means "pass everything up". 
In python 2.4, however, this job no84longer belongs to the HTTP Handler and is now done by a NEW handler,85HTTPErrorProcessor. Here's the bottom line:8687python version < 2.488HANDLE_ERRORS == 1 (default) pass up 200, treat the rest as89errors90HANDLE_ERRORS == 0 pass everything up, error processing is91left to the calling code92python version >= 2.493HANDLE_ERRORS == 1 pass up 200, treat the rest as errors94HANDLE_ERRORS == 0 (default) pass everything up, let the95other handlers (specifically,96HTTPErrorProcessor) decide what to do9798In practice, setting the variable either way makes little difference99in python 2.4, so for the most consistent behavior across versions,100you probably just want to use the defaults, which will give you101exceptions on errors.102103"""104105from __future__ import print_function106107try:108from thirdparty.six.moves import http_client as _http_client109from thirdparty.six.moves import range as _range110from thirdparty.six.moves import urllib as _urllib111except ImportError:112from six.moves import http_client as _http_client113from six.moves import range as _range114from six.moves import urllib as _urllib115116import socket117import threading118119DEBUG = None120121import sys122if sys.version_info < (2, 4): HANDLE_ERRORS = 1123else: HANDLE_ERRORS = 0124125class ConnectionManager:126"""127The connection manager must be able to:128* keep track of all existing129"""130def __init__(self):131self._lock = threading.Lock()132self._hostmap = {} # map hosts to a list of connections133self._connmap = {} # map connections to host134self._readymap = {} # map connection to ready state135136def add(self, host, connection, ready):137self._lock.acquire()138try:139if host not in self._hostmap: self._hostmap[host] = []140self._hostmap[host].append(connection)141self._connmap[connection] = host142self._readymap[connection] = ready143finally:144self._lock.release()145146def remove(self, connection):147self._lock.acquire()148try:149try:150host = 
self._connmap[connection]151except KeyError:152pass153else:154del self._connmap[connection]155del self._readymap[connection]156self._hostmap[host].remove(connection)157if not self._hostmap[host]: del self._hostmap[host]158finally:159self._lock.release()160161def set_ready(self, connection, ready):162try: self._readymap[connection] = ready163except KeyError: pass164165def get_ready_conn(self, host):166conn = None167try:168self._lock.acquire()169if host in self._hostmap:170for c in self._hostmap[host]:171if self._readymap.get(c):172self._readymap[c] = 0173conn = c174break175finally:176self._lock.release()177return conn178179def get_all(self, host=None):180if host:181return list(self._hostmap.get(host, []))182else:183return dict(self._hostmap)184185class KeepAliveHandler:186def __init__(self):187self._cm = ConnectionManager()188189#### Connection Management190def open_connections(self):191"""return a list of connected hosts and the number of connections192to each. [('foo.com:80', 2), ('bar.org', 1)]"""193return [(host, len(li)) for (host, li) in self._cm.get_all().items()]194195def close_connection(self, host):196"""close connection(s) to <host>197host is the host:port spec, as in 'www.cnn.com:8080' as passed in.198no error occurs if there is no connection to that host."""199for h in self._cm.get_all(host):200self._cm.remove(h)201h.close()202203def close_all(self):204"""close all open connections"""205for host, conns in self._cm.get_all().items():206for h in conns:207self._cm.remove(h)208h.close()209210def _request_closed(self, request, host, connection):211"""tells us that this request is now closed and the the212connection is ready for another request"""213self._cm.set_ready(connection, 1)214215def _remove_connection(self, host, connection, close=0):216if close: connection.close()217self._cm.remove(connection)218219#### Transaction Execution220def do_open(self, req):221host = req.host222if not host:223raise _urllib.error.URLError('no host given')224225try:226h = 
self._cm.get_ready_conn(host)227while h:228r = self._reuse_connection(h, req, host)229230# if this response is non-None, then it worked and we're231# done. Break out, skipping the else block.232if r: break233234# connection is bad - possibly closed by server235# discard it and ask for the next free connection236h.close()237self._cm.remove(h)238h = self._cm.get_ready_conn(host)239else:240# no (working) free connections were found. Create a new one.241h = self._get_connection(host)242if DEBUG: DEBUG.info("creating new connection to %s (%d)",243host, id(h))244self._cm.add(host, h, 0)245self._start_transaction(h, req)246r = h.getresponse()247except (socket.error, _http_client.HTTPException) as err:248raise _urllib.error.URLError(err)249250if DEBUG: DEBUG.info("STATUS: %s, %s", r.status, r.reason)251252# if not a persistent connection, don't try to reuse it253if r.will_close:254if DEBUG: DEBUG.info('server will close connection, discarding')255self._cm.remove(h)256257r._handler = self258r._host = host259r._url = req.get_full_url()260r._connection = h261r.code = r.status262r.headers = r.msg263r.msg = r.reason264265if r.status == 200 or not HANDLE_ERRORS:266return r267else:268return self.parent.error('http', req, r,269r.status, r.msg, r.headers)270271def _reuse_connection(self, h, req, host):272"""start the transaction with a re-used connection273return a response object (r) upon success or None on failure.274This DOES not close or remove bad connections in cases where275it returns. However, if an unexpected exception occurs, it276will close and remove the connection before re-raising.277"""278try:279self._start_transaction(h, req)280r = h.getresponse()281# note: just because we got something back doesn't mean it282# worked. 
We'll check the version below, too.283except (socket.error, _http_client.HTTPException):284r = None285except:286# adding this block just in case we've missed287# something we will still raise the exception, but288# lets try and close the connection and remove it289# first. We previously got into a nasty loop290# where an exception was uncaught, and so the291# connection stayed open. On the next try, the292# same exception was raised, etc. The tradeoff is293# that it's now possible this call will raise294# a DIFFERENT exception295if DEBUG: DEBUG.error("unexpected exception - closing " + \296"connection to %s (%d)", host, id(h))297self._cm.remove(h)298h.close()299raise300301if r is None or r.version == 9:302# httplib falls back to assuming HTTP 0.9 if it gets a303# bad header back. This is most likely to happen if304# the socket has been closed by the server since we305# last used the connection.306if DEBUG: DEBUG.info("failed to re-use connection to %s (%d)",307host, id(h))308r = None309else:310if DEBUG: DEBUG.info("re-using connection to %s (%d)", host, id(h))311312return r313314def _start_transaction(self, h, req):315try:316if req.data:317data = req.data318if hasattr(req, 'selector'):319h.putrequest(req.get_method() or 'POST', req.selector, skip_host=req.has_header("Host"), skip_accept_encoding=req.has_header("Accept-encoding"))320else:321h.putrequest(req.get_method() or 'POST', req.get_selector(), skip_host=req.has_header("Host"), skip_accept_encoding=req.has_header("Accept-encoding"))322if 'Content-type' not in req.headers:323h.putheader('Content-type',324'application/x-www-form-urlencoded')325if 'Content-length' not in req.headers:326h.putheader('Content-length', '%d' % len(data))327else:328if hasattr(req, 'selector'):329h.putrequest(req.get_method() or 'GET', req.selector, skip_host=req.has_header("Host"), skip_accept_encoding=req.has_header("Accept-encoding"))330else:331h.putrequest(req.get_method() or 'GET', req.get_selector(), 
skip_host=req.has_header("Host"), skip_accept_encoding=req.has_header("Accept-encoding"))332except (socket.error, _http_client.HTTPException) as err:333raise _urllib.error.URLError(err)334335if 'Connection' not in req.headers:336req.headers['Connection'] = 'keep-alive'337338for args in self.parent.addheaders:339if args[0] not in req.headers:340h.putheader(*args)341for k, v in req.headers.items():342h.putheader(k, v)343h.endheaders()344if req.data:345h.send(data)346347def _get_connection(self, host):348return NotImplementedError349350class HTTPHandler(KeepAliveHandler, _urllib.request.HTTPHandler):351def __init__(self):352KeepAliveHandler.__init__(self)353354def http_open(self, req):355return self.do_open(req)356357def _get_connection(self, host):358return HTTPConnection(host)359360class HTTPSHandler(KeepAliveHandler, _urllib.request.HTTPSHandler):361def __init__(self, ssl_factory=None):362KeepAliveHandler.__init__(self)363if not ssl_factory:364try:365import sslfactory366ssl_factory = sslfactory.get_factory()367except ImportError:368pass369self._ssl_factory = ssl_factory370371def https_open(self, req):372return self.do_open(req)373374def _get_connection(self, host):375try: return self._ssl_factory.get_https_connection(host)376except AttributeError: return HTTPSConnection(host)377378class HTTPResponse(_http_client.HTTPResponse):379# we need to subclass HTTPResponse in order to380# 1) add readline() and readlines() methods381# 2) add close_connection() methods382# 3) add info() and geturl() methods383384# in order to add readline(), read must be modified to deal with a385# buffer. example: readline must read a buffer and then spit back386# one line at a time. The only real alternative is to read one387# BYTE at a time (ick). 
Once something has been read, it can't be388# put back (ok, maybe it can, but that's even uglier than this),389# so if you THEN do a normal read, you must first take stuff from390# the buffer.391392# the read method wraps the original to accomodate buffering,393# although read() never adds to the buffer.394# Both readline and readlines have been stolen with almost no395# modification from socket.py396397398def __init__(self, sock, debuglevel=0, strict=0, method=None):399if method: # the httplib in python 2.3 uses the method arg400_http_client.HTTPResponse.__init__(self, sock, debuglevel, method)401else: # 2.2 doesn't402_http_client.HTTPResponse.__init__(self, sock, debuglevel)403self.fileno = sock.fileno404self.code = None405self._method = method406self._rbuf = b""407self._rbufsize = 8096408self._handler = None # inserted by the handler later409self._host = None # (same)410self._url = None # (same)411self._connection = None # (same)412413_raw_read = _http_client.HTTPResponse.read414415def close(self):416if self.fp:417self.fp.close()418self.fp = None419if self._handler:420self._handler._request_closed(self, self._host,421self._connection)422423# Note: Patch for Python3 (otherwise, connections won't be reusable)424def _close_conn(self):425self.close()426427def close_connection(self):428self._handler._remove_connection(self._host, self._connection, close=1)429self.close()430431def info(self):432return self.headers433434def geturl(self):435return self._url436437def read(self, amt=None):438# the _rbuf test is only in this first if for speed. 
It's not439# logically necessary440if self._rbuf and not amt is None:441L = len(self._rbuf)442if amt > L:443amt -= L444else:445s = self._rbuf[:amt]446self._rbuf = self._rbuf[amt:]447return s448449s = self._rbuf + self._raw_read(amt)450self._rbuf = b""451return s452453def readline(self, limit=-1):454data = b""455i = self._rbuf.find('\n')456while i < 0 and not (0 < limit <= len(self._rbuf)):457new = self._raw_read(self._rbufsize)458if not new: break459i = new.find('\n')460if i >= 0: i = i + len(self._rbuf)461self._rbuf = self._rbuf + new462if i < 0: i = len(self._rbuf)463else: i = i+1464if 0 <= limit < len(self._rbuf): i = limit465data, self._rbuf = self._rbuf[:i], self._rbuf[i:]466return data467468def readlines(self, sizehint = 0):469total = 0470list = []471while 1:472line = self.readline()473if not line: break474list.append(line)475total += len(line)476if sizehint and total >= sizehint:477break478return list479480481class HTTPConnection(_http_client.HTTPConnection):482# use the modified response class483response_class = HTTPResponse484485class HTTPSConnection(_http_client.HTTPSConnection):486response_class = HTTPResponse487488#########################################################################489##### TEST FUNCTIONS490#########################################################################491492def error_handler(url):493global HANDLE_ERRORS494orig = HANDLE_ERRORS495keepalive_handler = HTTPHandler()496opener = _urllib.request.build_opener(keepalive_handler)497_urllib.request.install_opener(opener)498pos = {0: 'off', 1: 'on'}499for i in (0, 1):500print(" fancy error handling %s (HANDLE_ERRORS = %i)" % (pos[i], i))501HANDLE_ERRORS = i502try:503fo = _urllib.request.urlopen(url)504foo = fo.read()505fo.close()506try: status, reason = fo.status, fo.reason507except AttributeError: status, reason = None, None508except IOError as e:509print(" EXCEPTION: %s" % e)510raise511else:512print(" status = %s, reason = %s" % (status, reason))513HANDLE_ERRORS = orig514hosts = 
keepalive_handler.open_connections()515print("open connections:", hosts)516keepalive_handler.close_all()517518def continuity(url):519from hashlib import md5520format = '%25s: %s'521522# first fetch the file with the normal http handler523opener = _urllib.request.build_opener()524_urllib.request.install_opener(opener)525fo = _urllib.request.urlopen(url)526foo = fo.read()527fo.close()528m = md5(foo)529print(format % ('normal urllib', m.hexdigest()))530531# now install the keepalive handler and try again532opener = _urllib.request.build_opener(HTTPHandler())533_urllib.request.install_opener(opener)534535fo = _urllib.request.urlopen(url)536foo = fo.read()537fo.close()538m = md5(foo)539print(format % ('keepalive read', m.hexdigest()))540541fo = _urllib.request.urlopen(url)542foo = ''543while 1:544f = fo.readline()545if f: foo = foo + f546else: break547fo.close()548m = md5(foo)549print(format % ('keepalive readline', m.hexdigest()))550551def comp(N, url):552print(' making %i connections to:\n %s' % (N, url))553554sys.stdout.write(' first using the normal urllib handlers')555# first use normal opener556opener = _urllib.request.build_opener()557_urllib.request.install_opener(opener)558t1 = fetch(N, url)559print(' TIME: %.3f s' % t1)560561sys.stdout.write(' now using the keepalive handler ')562# now install the keepalive handler and try again563opener = _urllib.request.build_opener(HTTPHandler())564_urllib.request.install_opener(opener)565t2 = fetch(N, url)566print(' TIME: %.3f s' % t2)567print(' improvement factor: %.2f' % (t1/t2, ))568569def fetch(N, url, delay=0):570import time571lens = []572starttime = time.time()573for i in _range(N):574if delay and i > 0: time.sleep(delay)575fo = _urllib.request.urlopen(url)576foo = fo.read()577fo.close()578lens.append(len(foo))579diff = time.time() - starttime580581j = 0582for i in lens[1:]:583j = j + 1584if not i == lens[0]:585print("WARNING: inconsistent length on read %i: %i" % (j, i))586587return diff588589def 
test_timeout(url):590global DEBUG591dbbackup = DEBUG592class FakeLogger:593def debug(self, msg, *args): print(msg % args)594info = warning = error = debug595DEBUG = FakeLogger()596print(" fetching the file to establish a connection")597fo = _urllib.request.urlopen(url)598data1 = fo.read()599fo.close()600601i = 20602print(" waiting %i seconds for the server to close the connection" % i)603while i > 0:604sys.stdout.write('\r %2i' % i)605sys.stdout.flush()606time.sleep(1)607i -= 1608sys.stderr.write('\r')609610print(" fetching the file a second time")611fo = _urllib.request.urlopen(url)612data2 = fo.read()613fo.close()614615if data1 == data2:616print(' data are identical')617else:618print(' ERROR: DATA DIFFER')619620DEBUG = dbbackup621622623def test(url, N=10):624print("checking error hander (do this on a non-200)")625try: error_handler(url)626except IOError as e:627print("exiting - exception will prevent further tests")628sys.exit()629print()630print("performing continuity test (making sure stuff isn't corrupted)")631continuity(url)632print()633print("performing speed comparison")634comp(N, url)635print()636print("performing dropped-connection check")637test_timeout(url)638639if __name__ == '__main__':640import time641import sys642try:643N = int(sys.argv[1])644url = sys.argv[2]645except:646print("%s <integer> <url>" % sys.argv[0])647else:648test(url, N)649650651