Problem with multi-threaded Python app and socket connections
You could try gevent:
from gevent.pool import Pool
from gevent import monkey; monkey.patch_all() # patches stdlib
import sys
import logging
from httplib import HTTPSConnection
from timeit import default_timer as timer
# Shorthand for the root logger's info(); every progress message below uses it.
info = logging.getLogger().info
def connect(hostname):
    """Try to open an HTTPS connection to `hostname` and log the outcome.

    The connection attempt uses a 2 second timeout. An IOError raised while
    connecting is logged as "error ... reason: ..." rather than propagated;
    any other exception type propagates to the caller. The connection object
    is always closed before returning.

    Returns None.
    """
    info("connecting %s", hostname)
    h = HTTPSConnection(hostname, timeout=2)
    try:
        h.connect()
    # `except IOError, e` only parses on Python 2; the `as` form is accepted
    # by Python 2.6+ and Python 3 alike.
    except IOError as e:
        info("error %s reason: %s", hostname, e)
    else:
        info("done %s", hostname)
    finally:
        h.close()
def main():
    """Try to connect to every host listed in a file, 20 at a time.

    The file name is taken from the first command-line argument and defaults
    to "hosts.txt"; each line of the file is treated as one host name.
    Progress and the total elapsed time are written through `info`.
    """
    logging.basicConfig(level=logging.INFO, format="%(asctime)s %(message)s")
    info("getting hostname list")
    hosts_file = sys.argv[1] if len(sys.argv) > 1 else "hosts.txt"
    # Read inside `with` so the file handle is closed deterministically
    # instead of being left for the garbage collector.
    with open(hosts_file) as f:
        hosts_list = f.read().splitlines()
    info("spawning jobs")
    pool = Pool(20)  # limit number of concurrent connections
    start = timer()
    for _ in pool.imap(connect, hosts_list):
        # connect() returns nothing; the loop only drains the iterator so
        # that every job has finished before the timer is read.
        pass
    info("%d hosts took us %.2g seconds", len(hosts_list), timer() - start)
if __name__ == "__main__":
    # Run the check only when executed as a script, not when imported.
    main()
It can process more than one host per second.
Output
2011-01-31 11:08:29,052 getting hostname list
2011-01-31 11:08:29,052 spawning jobs
2011-01-31 11:08:29,053 connecting www.yahoo.com
2011-01-31 11:08:29,053 connecting www.abc.com
2011-01-31 11:08:29,053 connecting www.google.com
2011-01-31 11:08:29,053 connecting stackoverflow.com
2011-01-31 11:08:29,053 connecting facebook.com
2011-01-31 11:08:29,054 connecting youtube.com
2011-01-31 11:08:29,054 connecting live.com
2011-01-31 11:08:29,054 connecting baidu.com
2011-01-31 11:08:29,054 connecting wikipedia.org
2011-01-31 11:08:29,054 connecting blogspot.com
2011-01-31 11:08:29,054 connecting qq.com
2011-01-31 11:08:29,055 connecting twitter.com
2011-01-31 11:08:29,055 connecting msn.com
2011-01-31 11:08:29,055 connecting yahoo.co.jp
2011-01-31 11:08:29,055 connecting taobao.com
2011-01-31 11:08:29,055 connecting google.co.in
2011-01-31 11:08:29,056 connecting sina.com.cn
2011-01-31 11:08:29,056 connecting amazon.com
2011-01-31 11:08:29,056 connecting google.de
2011-01-31 11:08:29,056 connecting google.com.hk
2011-01-31 11:08:29,188 done www.google.com
2011-01-31 11:08:29,189 done google.com.hk
2011-01-31 11:08:29,224 error wikipedia.org reason: [Errno 111] Connection refused
2011-01-31 11:08:29,225 done google.co.in
2011-01-31 11:08:29,227 error msn.com reason: [Errno 111] Connection refused
2011-01-31 11:08:29,228 error live.com reason: [Errno 111] Connection refused
2011-01-31 11:08:29,250 done google.de
2011-01-31 11:08:29,262 done blogspot.com
2011-01-31 11:08:29,271 error www.abc.com reason: [Errno 111] Connection refused
2011-01-31 11:08:29,465 done amazon.com
2011-01-31 11:08:29,467 error sina.com.cn reason: [Errno 111] Connection refused
2011-01-31 11:08:29,496 done www.yahoo.com
2011-01-31 11:08:29,521 done stackoverflow.com
2011-01-31 11:08:29,606 done youtube.com
2011-01-31 11:08:29,939 done twitter.com
2011-01-31 11:08:33,056 error qq.com reason: timed out
2011-01-31 11:08:33,057 error taobao.com reason: timed out
2011-01-31 11:08:33,057 error yahoo.co.jp reason: timed out
2011-01-31 11:08:34,466 done facebook.com
2011-01-31 11:08:35,056 error baidu.com reason: timed out
2011-01-31 11:08:35,057 20 hosts took us 6 seconds
I wonder if writing this using twisted might help? Could anyone show what my example would look like written using twisted?
This variant is much faster than the code that uses gevent:
#!/usr/bin/env python
import sys
from timeit import default_timer as timer
from twisted.internet import defer, protocol, reactor, ssl, task
from twisted.python import log
# Shorthand: every progress message below goes through Twisted's log.msg.
info = log.msg
class NoopProtocol(protocol.Protocol):
    """Protocol that hangs up as soon as it is handed a transport."""

    def makeConnection(self, transport):
        # No data is exchanged: the connection is dropped immediately.
        transport.loseConnection()
def connect(host, port, contextFactory=ssl.ClientContextFactory(), timeout=30):
    """Start an SSL connection attempt to `host`:`port` and log how it ends.

    `contextFactory` and `timeout` are handed straight to
    ClientCreator.connectSSL.

    Returns the Deferred of the attempt with the logging callbacks attached.
    The errback returns normally, so a connection failure is logged and not
    passed further down the callback chain.
    """
    def on_connected(_):
        return info("done %s" % host)

    def on_failed(failure):
        return info("error %s reason: %s" % (host, failure.value))

    info("connecting %s" % host)
    creator = protocol.ClientCreator(reactor, NoopProtocol)
    attempt = creator.connectSSL(host, port, contextFactory, timeout)
    attempt.addCallbacks(on_connected, on_failed)
    return attempt
def n_at_a_time(it, n):
    """Iterate over `it` concurrently, `n` items at a time.

    `it` - an iterator (or any iterable) creating Deferreds
    `n` - number of concurrent iterations

    return a deferred that fires on completion
    """
    # Normalise to a single iterator so that all n tasks draw from the same
    # stream of jobs (a no-op when `it` already is an iterator).
    it = iter(it)
    # Each coiterate() task waits for the Deferred it pulled before asking
    # for the next item, so at most n jobs are in flight at any time.
    # `range` instead of `xrange`: works on both Python 2 and Python 3.
    return defer.DeferredList([task.coiterate(it) for _ in range(n)])
def main():
    """Read the host list and connect to every host, 20 at a time.

    The file name is taken from the first command-line argument and defaults
    to "hosts.txt"; each line is treated as one host name, contacted on port
    443 with a 2 second timeout. The reactor is stopped once all attempts
    have finished, or immediately if the set-up itself raises.
    """
    try:
        log.startLogging(sys.stderr, setStdout=False)
        info("getting hostname list")
        hosts_file = sys.argv[1] if len(sys.argv) > 1 else "hosts.txt"
        # Read inside `with` so the file handle is closed deterministically.
        with open(hosts_file) as f:
            hosts_list = f.read().splitlines()
        info("spawning jobs")
        start = timer()
        # Lazy generator: a connection is only started when a worker task
        # pulls the next item.
        jobs = (connect(host, 443, timeout=2) for host in hosts_list)
        d = n_at_a_time(jobs, n=20)  # limit number of simultaneous connections
        d.addCallback(lambda _: info("%d hosts took us %.2g seconds" % (
            len(hosts_list), timer() - start)))
        # addBoth: stop the reactor whether or not the chain above succeeded.
        d.addBoth(lambda _: (info("the end"), reactor.stop()))
    except Exception:
        # Set-up failed (e.g. unreadable hosts file): log it and shut down.
        # `except Exception` rather than a bare `except` so that SystemExit
        # and KeyboardInterrupt are not swallowed here.
        log.err()
        reactor.stop()
if __name__ == "__main__":
    # Schedule main() to run once the reactor is up, then start the loop.
    reactor.callWhenRunning(main)
    reactor.run()
Here's a variant that uses twisted.internet.defer.inlineCallbacks. It requires Python 2.5 or newer. It allows you to write asynchronous code in a synchronous (blocking) style:
#!/usr/bin/env python
import sys
from timeit import default_timer as timer
from twisted.internet import defer, protocol, reactor, ssl, task
from twisted.python import log
# Shorthand: every progress message below goes through Twisted's log.msg.
info = log.msg
class NoopProtocol(protocol.Protocol):
    """Protocol that hangs up as soon as it is handed a transport."""

    def makeConnection(self, transport):
        # No data is exchanged: the connection is dropped immediately.
        transport.loseConnection()
@defer.inlineCallbacks
def connect(host, port, contextFactory=ssl.ClientContextFactory(), timeout=30):
    """Attempt an SSL connection to `host`:`port` and log how it ends.

    `contextFactory` and `timeout` are handed straight to
    ClientCreator.connectSSL. Any Exception raised by the attempt is logged
    as "error ... reason: ..." and not re-raised.

    Because of the inlineCallbacks decorator, calling this returns a
    Deferred that fires once the attempt has been logged.
    """
    info("connecting %s" % host)
    cc = protocol.ClientCreator(reactor, NoopProtocol)
    try:
        yield cc.connectSSL(host, port, contextFactory, timeout)
    except Exception:
        # `except Exception, e` only parses on Python 2 and
        # `except Exception as e` needs 2.6+; sys.exc_info() works on
        # Python 2.5 and on Python 3 alike.
        e = sys.exc_info()[1]
        info("error %s reason: %s" % (host, e))
    else:
        info("done %s" % host)
def n_at_a_time(it, n):
    """Iterate over `it` concurrently, `n` items at a time.

    `it` - an iterator (or any iterable) creating Deferreds
    `n` - number of concurrent iterations

    return a deferred that fires on completion
    """
    # Normalise to a single iterator so that all n tasks draw from the same
    # stream of jobs (a no-op when `it` already is an iterator).
    it = iter(it)
    # Each coiterate() task waits for the Deferred it pulled before asking
    # for the next item, so at most n jobs are in flight at any time.
    # `range` instead of `xrange`: works on both Python 2 and Python 3.
    return defer.DeferredList([task.coiterate(it) for _ in range(n)])
@defer.inlineCallbacks
def main():
    """Read the host list and connect to every host, 20 at a time.

    The file name is taken from the first command-line argument and defaults
    to "hosts.txt"; each line is treated as one host name, contacted on port
    443 with a 2 second timeout. The reactor is always stopped at the end,
    whether the run succeeded or an exception was logged.
    """
    try:
        log.startLogging(sys.stderr, setStdout=False)
        info("getting hostname list")
        hosts_file = sys.argv[1] if len(sys.argv) > 1 else "hosts.txt"
        # Close the file explicitly instead of leaking the handle; plain
        # try/finally rather than `with` keeps this runnable on Python 2.5.
        f = open(hosts_file)
        try:
            hosts_list = f.read().splitlines()
        finally:
            f.close()
        info("spawning jobs")
        start = timer()
        # Lazy generator: a connection is only started when a worker task
        # pulls the next item.
        jobs = (connect(host, 443, timeout=2) for host in hosts_list)
        yield n_at_a_time(jobs, n=20)  # limit number of simultaneous connections
        info("%d hosts took us %.2g seconds" % (len(hosts_list), timer() - start))
        info("the end")
    except Exception:
        # `except Exception` rather than a bare `except` so that SystemExit
        # and KeyboardInterrupt are not swallowed here.
        log.err()
    finally:
        reactor.stop()
if __name__ == "__main__":
    # Schedule main() to run once the reactor is up, then start the loop.
    reactor.callWhenRunning(main)
    reactor.run()