def make_connection(self, host):
    """Return a connection for ``host``, via an HTTP proxy when one is set.

    The requested host is remembered in ``self.realhost``. The proxy URL is
    looked up in ``urllib.getproxies()`` under ``'http'`` and, only when that
    key is absent, under ``'all'``. ``self.proxy_is_used`` records which
    path was taken.
    """
    self.realhost = host
    env_proxies = urllib.getproxies()

    # An 'http' entry wins even when its value is empty; 'all' is consulted
    # only if there is no 'http' key at all.
    for scheme in ('http', 'all'):
        if scheme in env_proxies:
            proxy_url = env_proxies[scheme]
            break
    else:
        proxy_url = None

    if not proxy_url:
        self.proxy_is_used = False
        return Transport.make_connection(self, host)

    # Only the host part of the proxy URL is needed to open the connection.
    _, proxy_netloc = urllib.splittype(proxy_url)
    proxy_host, _ = urllib.splithost(proxy_netloc)
    connection = httplib.HTTP(proxy_host)
    self.proxy_is_used = True
    return connection
python类getproxies()的实例源码
def __init__(self, proxies=None):
    """Register one ``<scheme>_open`` handler per configured proxy.

    Args:
        proxies: Mapping of URL scheme to proxy URL. When ``None``, the
            mapping returned by ``getproxies()`` is used.

    Raises:
        AssertionError: If ``proxies`` does not look like a mapping.
    """
    if proxies is None:
        proxies = getproxies()
    # Check for ``keys`` rather than the Python 2-only ``has_key`` so that
    # any mapping (including Python 3 dicts) is accepted.
    assert hasattr(proxies, 'keys'), "proxies must be a mapping"
    self.proxies = proxies
    for scheme, url in proxies.items():
        # Bind the per-iteration values as lambda defaults; a plain closure
        # would only see the last scheme/url once the loop has finished.
        setattr(self, '%s_open' % scheme,
                lambda r, proxy=url, type=scheme, meth=self.proxy_open:
                meth(r, proxy, type))
def __init__(self, proxies=None):
    """Register one ``<scheme>_open`` handler per configured proxy.

    Args:
        proxies: Mapping of URL scheme to proxy URL. When ``None``, the
            mapping returned by ``getproxies()`` is used.

    Raises:
        AssertionError: If ``proxies`` does not look like a mapping.
    """
    if proxies is None:
        proxies = getproxies()
    # Check for ``keys`` rather than the Python 2-only ``has_key`` so that
    # any mapping (including Python 3 dicts) is accepted.
    assert hasattr(proxies, 'keys'), "proxies must be a mapping"
    self.proxies = proxies
    for scheme, url in proxies.items():
        # Bind the per-iteration values as lambda defaults; a plain closure
        # would only see the last scheme/url once the loop has finished.
        setattr(self, '%s_open' % scheme,
                lambda r, proxy=url, type=scheme, meth=self.proxy_open:
                meth(r, proxy, type))
def __init__(self):
    """Create the HTTP session and install the default request headers.

    The session's proxies come from ``urllib.getproxies()``. The same header
    dict is kept on ``self.headers`` and assigned to the session.
    """
    self.session = http_session = requests.session()
    http_session.proxies = urllib.getproxies()

    default_headers = {
        "Accept": "*/*",
        "Accept-Encoding": "gzip, deflate",
        "Accept-Language": "en;q=1, fr;q=0.9, de;q=0.8, ja;q=0.7, nl;q=0.6, it;q=0.5",
        "Content-Type": "application/x-www-form-urlencoded; charset=utf-8",
        "X-Robinhood-API-Version": "1.0.0",
        "Connection": "keep-alive",
        "User-Agent": "Robinhood/823 (iPhone; iOS 9.1.2; Scale/2.00)",
    }
    self.headers = default_headers
    # The session shares the very same dict object as self.headers.
    http_session.headers = default_headers
def __init__(self, proxies=None):
    """Register one ``<scheme>_open`` handler per configured proxy.

    Args:
        proxies: Mapping of URL scheme to proxy URL. When ``None``, the
            mapping returned by ``getproxies()`` is used.

    Raises:
        AssertionError: If ``proxies`` does not look like a mapping.
    """
    if proxies is None:
        proxies = getproxies()
    # Check for ``keys`` rather than the Python 2-only ``has_key`` so that
    # any mapping (including Python 3 dicts) is accepted.
    assert hasattr(proxies, 'keys'), "proxies must be a mapping"
    self.proxies = proxies
    for scheme, url in proxies.items():
        # Bind the per-iteration values as lambda defaults; a plain closure
        # would only see the last scheme/url once the loop has finished.
        setattr(self, '%s_open' % scheme,
                lambda r, proxy=url, type=scheme, meth=self.proxy_open:
                meth(r, proxy, type))
def __init__(self, proxies=None):
    """Register one ``<scheme>_open`` handler per configured proxy.

    Args:
        proxies: Mapping of URL scheme to proxy URL. When ``None``, the
            mapping returned by ``getproxies()`` is used.

    Raises:
        AssertionError: If ``proxies`` does not look like a mapping.
    """
    if proxies is None:
        proxies = getproxies()
    # Check for ``keys`` rather than the Python 2-only ``has_key`` so that
    # any mapping (including Python 3 dicts) is accepted.
    assert hasattr(proxies, 'keys'), "proxies must be a mapping"
    self.proxies = proxies
    for scheme, url in proxies.items():
        # Bind the per-iteration values as lambda defaults; a plain closure
        # would only see the last scheme/url once the loop has finished.
        setattr(self, '%s_open' % scheme,
                lambda r, proxy=url, type=scheme, meth=self.proxy_open:
                meth(r, proxy, type))
def get_proxies():
    """Return the ``getproxies()`` entries whose key starts with ``http``.

    Values that do not already start with ``http`` are prefixed with
    ``http://``; entries under any other key are dropped.
    """
    return {
        scheme: (url if url.startswith('http') else 'http://%s' % url)
        for scheme, url in getproxies().items()
        if scheme.startswith('http')
    }
def __init__(self, proxies=None):
    """Register one ``<scheme>_open`` handler per configured proxy.

    Args:
        proxies: Mapping of URL scheme to proxy URL. When ``None``, the
            mapping returned by ``getproxies()`` is used.

    Raises:
        AssertionError: If ``proxies`` does not look like a mapping.
    """
    if proxies is None:
        proxies = getproxies()
    # Check for ``keys`` rather than the Python 2-only ``has_key`` so that
    # any mapping (including Python 3 dicts) is accepted.
    assert hasattr(proxies, 'keys'), "proxies must be a mapping"
    self.proxies = proxies
    for scheme, url in proxies.items():
        # Bind the per-iteration values as lambda defaults; a plain closure
        # would only see the last scheme/url once the loop has finished.
        setattr(self, '%s_open' % scheme,
                lambda r, proxy=url, type=scheme, meth=self.proxy_open:
                meth(r, proxy, type))
def __init__(self, proxies=None):
    """Register one ``<scheme>_open`` handler per configured proxy.

    Args:
        proxies: Mapping of URL scheme to proxy URL. When ``None``, the
            mapping returned by ``getproxies()`` is used.

    Raises:
        AssertionError: If ``proxies`` does not look like a mapping.
    """
    if proxies is None:
        proxies = getproxies()
    # Check for ``keys`` rather than the Python 2-only ``has_key`` so that
    # any mapping (including Python 3 dicts) is accepted.
    assert hasattr(proxies, 'keys'), "proxies must be a mapping"
    self.proxies = proxies
    for scheme, url in proxies.items():
        # Bind the per-iteration values as lambda defaults; a plain closure
        # would only see the last scheme/url once the loop has finished.
        setattr(self, '%s_open' % scheme,
                lambda r, proxy=url, type=scheme, meth=self.proxy_open:
                meth(r, proxy, type))
def __init__(self, proxies=None):
    """Register one ``<scheme>_open`` handler per configured proxy.

    Args:
        proxies: Mapping of URL scheme to proxy URL. When ``None``, the
            mapping returned by ``getproxies()`` is used.

    Raises:
        AssertionError: If ``proxies`` does not look like a mapping.
    """
    if proxies is None:
        proxies = getproxies()
    # Check for ``keys`` rather than the Python 2-only ``has_key`` so that
    # any mapping (including Python 3 dicts) is accepted.
    assert hasattr(proxies, 'keys'), "proxies must be a mapping"
    self.proxies = proxies
    for scheme, url in proxies.items():
        # Bind the per-iteration values as lambda defaults; a plain closure
        # would only see the last scheme/url once the loop has finished.
        setattr(self, '%s_open' % scheme,
                lambda r, proxy=url, type=scheme, meth=self.proxy_open:
                meth(r, proxy, type))
def __init__(self, proxies=None):
    """Register one ``<scheme>_open`` handler per configured proxy.

    Args:
        proxies: Mapping of URL scheme to proxy URL. When ``None``, the
            mapping returned by ``getproxies()`` is used.

    Raises:
        AssertionError: If ``proxies`` does not look like a mapping.
    """
    if proxies is None:
        proxies = getproxies()
    # Check for ``keys`` rather than the Python 2-only ``has_key`` so that
    # any mapping (including Python 3 dicts) is accepted.
    assert hasattr(proxies, 'keys'), "proxies must be a mapping"
    self.proxies = proxies
    for scheme, url in proxies.items():
        # Bind the per-iteration values as lambda defaults; a plain closure
        # would only see the last scheme/url once the loop has finished.
        setattr(self, '%s_open' % scheme,
                lambda r, proxy=url, type=scheme, meth=self.proxy_open:
                meth(r, proxy, type))
def __init__(self, proxies=None, proxy_bypass=None):
    """Register one ``<scheme>_open`` handler per configured proxy.

    Args:
        proxies: Mapping of URL scheme to proxy URL. When ``None``, the
            mapping returned by ``getproxies()`` is used.
        proxy_bypass: Object stored on ``self._proxy_bypass``. When
            ``None``, ``urllib.proxy_bypass`` is used.

    Raises:
        AssertionError: If ``proxies`` does not look like a mapping.
    """
    if proxies is None:
        proxies = getproxies()
    # Check for ``keys`` rather than the Python 2-only ``has_key`` so that
    # any mapping (including Python 3 dicts) is accepted.
    assert hasattr(proxies, 'keys'), "proxies must be a mapping"
    self.proxies = proxies
    for scheme, url in proxies.items():
        # Bind the per-iteration values as lambda defaults; a plain closure
        # would only see the last scheme/url once the loop has finished.
        setattr(self, '%s_open' % scheme,
                lambda r, proxy=url, type=scheme, meth=self.proxy_open:
                meth(r, proxy, type))
    if proxy_bypass is None:
        proxy_bypass = urllib.proxy_bypass
    self._proxy_bypass = proxy_bypass
_urllib2_fork.py 文件源码
项目:plugin.video.streamondemand-pureita
作者: orione7
项目源码
文件源码
阅读 26
收藏 0
点赞 0
评论 0
def __init__(self, proxies=None, proxy_bypass=None):
    """Register one ``<scheme>_open`` handler per configured proxy.

    Args:
        proxies: Mapping of URL scheme to proxy URL. When ``None``, the
            mapping returned by ``getproxies()`` is used.
        proxy_bypass: Object stored on ``self._proxy_bypass``. When
            ``None``, ``urllib.proxy_bypass`` is used.

    Raises:
        AssertionError: If ``proxies`` does not look like a mapping.
    """
    if proxies is None:
        proxies = getproxies()
    # Check for ``keys`` rather than the Python 2-only ``has_key`` so that
    # any mapping (including Python 3 dicts) is accepted.
    assert hasattr(proxies, 'keys'), "proxies must be a mapping"
    self.proxies = proxies
    for scheme, url in proxies.items():
        # Bind the per-iteration values as lambda defaults; a plain closure
        # would only see the last scheme/url once the loop has finished.
        setattr(self, '%s_open' % scheme,
                lambda r, proxy=url, type=scheme, meth=self.proxy_open:
                meth(r, proxy, type))
    if proxy_bypass is None:
        proxy_bypass = urllib.proxy_bypass
    self._proxy_bypass = proxy_bypass
def __init__(self, proxies=None):
    """Register one ``<scheme>_open`` handler per configured proxy.

    Args:
        proxies: Mapping of URL scheme to proxy URL. When ``None``, the
            mapping returned by ``getproxies()`` is used.

    Raises:
        AssertionError: If ``proxies`` does not look like a mapping.
    """
    if proxies is None:
        proxies = getproxies()
    # Check for ``keys`` rather than the Python 2-only ``has_key`` so that
    # any mapping (including Python 3 dicts) is accepted.
    assert hasattr(proxies, 'keys'), "proxies must be a mapping"
    self.proxies = proxies
    for scheme, url in proxies.items():
        # Bind the per-iteration values as lambda defaults; a plain closure
        # would only see the last scheme/url once the loop has finished.
        setattr(self, '%s_open' % scheme,
                lambda r, proxy=url, type=scheme, meth=self.proxy_open:
                meth(r, proxy, type))
def __init__(self, proxies=None):
    """Register one ``<scheme>_open`` handler per configured proxy.

    Args:
        proxies: Mapping of URL scheme to proxy URL. When ``None``, the
            mapping returned by ``getproxies()`` is used.

    Raises:
        AssertionError: If ``proxies`` does not look like a mapping.
    """
    if proxies is None:
        proxies = getproxies()
    # Check for ``keys`` rather than the Python 2-only ``has_key`` so that
    # any mapping (including Python 3 dicts) is accepted.
    assert hasattr(proxies, 'keys'), "proxies must be a mapping"
    self.proxies = proxies
    for scheme, url in proxies.items():
        # Bind the per-iteration values as lambda defaults; a plain closure
        # would only see the last scheme/url once the loop has finished.
        setattr(self, '%s_open' % scheme,
                lambda r, proxy=url, type=scheme, meth=self.proxy_open:
                meth(r, proxy, type))
def __init__(self, proxies=None):
    """Register one ``<scheme>_open`` handler per configured proxy.

    Args:
        proxies: Mapping of URL scheme to proxy URL. When ``None``, the
            mapping returned by ``getproxies()`` is used.

    Raises:
        AssertionError: If ``proxies`` does not look like a mapping.
    """
    if proxies is None:
        proxies = getproxies()
    # Check for ``keys`` rather than the Python 2-only ``has_key`` so that
    # any mapping (including Python 3 dicts) is accepted.
    assert hasattr(proxies, 'keys'), "proxies must be a mapping"
    self.proxies = proxies
    for scheme, url in proxies.items():
        # Bind the per-iteration values as lambda defaults; a plain closure
        # would only see the last scheme/url once the loop has finished.
        setattr(self, '%s_open' % scheme,
                lambda r, proxy=url, type=scheme, meth=self.proxy_open:
                meth(r, proxy, type))
def __init__(self, proxies=None):
    """Register one ``<scheme>_open`` handler per configured proxy.

    Args:
        proxies: Mapping of URL scheme to proxy URL. When ``None``, the
            mapping returned by ``getproxies()`` is used.

    Raises:
        AssertionError: If ``proxies`` does not look like a mapping.
    """
    if proxies is None:
        proxies = getproxies()
    # Check for ``keys`` rather than the Python 2-only ``has_key`` so that
    # any mapping (including Python 3 dicts) is accepted.
    assert hasattr(proxies, 'keys'), "proxies must be a mapping"
    self.proxies = proxies
    for scheme, url in proxies.items():
        # Bind the per-iteration values as lambda defaults; a plain closure
        # would only see the last scheme/url once the loop has finished.
        setattr(self, '%s_open' % scheme,
                lambda r, proxy=url, type=scheme, meth=self.proxy_open:
                meth(r, proxy, type))
def __init__(self, proxies=None):
    """Register one ``<scheme>_open`` handler per configured proxy.

    Args:
        proxies: Mapping of URL scheme to proxy URL. When ``None``, the
            mapping returned by ``getproxies()`` is used.

    Raises:
        AssertionError: If ``proxies`` does not look like a mapping.
    """
    if proxies is None:
        proxies = getproxies()
    # Check for ``keys`` rather than the Python 2-only ``has_key`` so that
    # any mapping (including Python 3 dicts) is accepted.
    assert hasattr(proxies, 'keys'), "proxies must be a mapping"
    self.proxies = proxies
    for scheme, url in proxies.items():
        # Bind the per-iteration values as lambda defaults; a plain closure
        # would only see the last scheme/url once the loop has finished.
        setattr(self, '%s_open' % scheme,
                lambda r, proxy=url, type=scheme, meth=self.proxy_open:
                meth(r, proxy, type))
def __init__(self, proxies=None, proxy_bypass=None):
    """Register one ``<scheme>_open`` handler per configured proxy.

    Args:
        proxies: Mapping of URL scheme to proxy URL. When ``None``, the
            mapping returned by ``getproxies()`` is used.
        proxy_bypass: Object stored on ``self._proxy_bypass``. When
            ``None``, ``urllib.proxy_bypass`` is used.

    Raises:
        AssertionError: If ``proxies`` does not look like a mapping.
    """
    if proxies is None:
        proxies = getproxies()
    # Check for ``keys`` rather than the Python 2-only ``has_key`` so that
    # any mapping (including Python 3 dicts) is accepted.
    assert hasattr(proxies, 'keys'), "proxies must be a mapping"
    self.proxies = proxies
    for scheme, url in proxies.items():
        # Bind the per-iteration values as lambda defaults; a plain closure
        # would only see the last scheme/url once the loop has finished.
        setattr(self, '%s_open' % scheme,
                lambda r, proxy=url, type=scheme, meth=self.proxy_open:
                meth(r, proxy, type))
    if proxy_bypass is None:
        proxy_bypass = urllib.proxy_bypass
    self._proxy_bypass = proxy_bypass
def request(self, query):
    """
    Send ``query`` to the API together with the customer ID & key headers.

    This function was intended as an internal method - it just takes the
    query you pass it and sends it to the API along with your API ID & Key.
    If you know the API or REST real well, have at it.

    Args:
        query: String appended to ``self.host`` to form the request URL.

    Returns:
        The response object returned by ``requests.get`` when its status
        code is 200.

    Raises:
        Exception: If ``self.custid`` or ``self.custkey`` is missing, or if
            the response status code is anything other than 200.
    """
    if not self.custid or not self.custkey:
        raise Exception('Customer ID and Customer Key are required')
    fullQuery = self.host + query  # host part doesn't change
    if self.debug:
        # Single-argument call form prints the same on Python 2 and 3.
        print("fullQuery: " + fullQuery)
    headers = self.getHeaders()  # format the API key & ID
    # Check for proxy information. getproxies lives in urllib on Python 2
    # and in urllib.request on Python 3.
    try:
        from urllib.request import getproxies
    except ImportError:
        from urllib import getproxies
    proxies = getproxies()
    # use requests library to pull request
    r = requests.get(fullQuery, headers=headers, proxies=proxies)
    # Error handling for HTTP request
    # 400 - bad request
    if r.status_code == 400:
        raise Exception('HTTP Error 400 - Bad request.')
    # 404 - not found
    if r.status_code == 404:
        raise Exception('HTTP Error 404 - awww snap.')
    # catch-all for every other non-200 status
    if r.status_code != 200:
        raise Exception('HTTP Error: ' + str(r.status_code))
    return r
#end request()
def __init__(self, proxies=None):
    """Register one ``<scheme>_open`` handler per configured proxy.

    Args:
        proxies: Mapping of URL scheme to proxy URL. When ``None``, the
            mapping returned by ``getproxies()`` is used.

    Raises:
        AssertionError: If ``proxies`` does not look like a mapping.
    """
    if proxies is None:
        proxies = getproxies()
    # Check for ``keys`` rather than the Python 2-only ``has_key`` so that
    # any mapping (including Python 3 dicts) is accepted.
    assert hasattr(proxies, 'keys'), "proxies must be a mapping"
    self.proxies = proxies
    for scheme, url in proxies.items():
        # Bind the per-iteration values as lambda defaults; a plain closure
        # would only see the last scheme/url once the loop has finished.
        setattr(self, '%s_open' % scheme,
                lambda r, proxy=url, type=scheme, meth=self.proxy_open:
                meth(r, proxy, type))
def __init__(self, proxies=None):
    """Register one ``<scheme>_open`` handler per configured proxy.

    Args:
        proxies: Mapping of URL scheme to proxy URL. When ``None``, the
            mapping returned by ``getproxies()`` is used.

    Raises:
        AssertionError: If ``proxies`` does not look like a mapping.
    """
    if proxies is None:
        proxies = getproxies()
    # Check for ``keys`` rather than the Python 2-only ``has_key`` so that
    # any mapping (including Python 3 dicts) is accepted.
    assert hasattr(proxies, 'keys'), "proxies must be a mapping"
    self.proxies = proxies
    for scheme, url in proxies.items():
        # Bind the per-iteration values as lambda defaults; a plain closure
        # would only see the last scheme/url once the loop has finished.
        setattr(self, '%s_open' % scheme,
                lambda r, proxy=url, type=scheme, meth=self.proxy_open:
                meth(r, proxy, type))
def __init__(self, proxies=None):
    """Register one ``<scheme>_open`` handler per configured proxy.

    Args:
        proxies: Mapping of URL scheme to proxy URL. When ``None``, the
            mapping returned by ``getproxies()`` is used.

    Raises:
        AssertionError: If ``proxies`` does not look like a mapping.
    """
    if proxies is None:
        proxies = getproxies()
    # Check for ``keys`` rather than the Python 2-only ``has_key`` so that
    # any mapping (including Python 3 dicts) is accepted.
    assert hasattr(proxies, 'keys'), "proxies must be a mapping"
    self.proxies = proxies
    for scheme, url in proxies.items():
        # Bind the per-iteration values as lambda defaults; a plain closure
        # would only see the last scheme/url once the loop has finished.
        setattr(self, '%s_open' % scheme,
                lambda r, proxy=url, type=scheme, meth=self.proxy_open:
                meth(r, proxy, type))
def __init__(self, proxies=None, proxy_bypass=None):
    """Register one ``<scheme>_open`` handler per configured proxy.

    Args:
        proxies: Mapping of URL scheme to proxy URL. When ``None``, the
            mapping returned by ``getproxies()`` is used.
        proxy_bypass: Object stored on ``self._proxy_bypass``. When
            ``None``, ``urllib.proxy_bypass`` is used.

    Raises:
        AssertionError: If ``proxies`` does not look like a mapping.
    """
    if proxies is None:
        proxies = getproxies()
    # Check for ``keys`` rather than the Python 2-only ``has_key`` so that
    # any mapping (including Python 3 dicts) is accepted.
    assert hasattr(proxies, 'keys'), "proxies must be a mapping"
    self.proxies = proxies
    for scheme, url in proxies.items():
        # Bind the per-iteration values as lambda defaults; a plain closure
        # would only see the last scheme/url once the loop has finished.
        setattr(self, '%s_open' % scheme,
                lambda r, proxy=url, type=scheme, meth=self.proxy_open:
                meth(r, proxy, type))
    if proxy_bypass is None:
        proxy_bypass = urllib.proxy_bypass
    self._proxy_bypass = proxy_bypass
def __init__(self, proxies=None, proxy_bypass=None):
    """Register one ``<scheme>_open`` handler per configured proxy.

    Args:
        proxies: Mapping of URL scheme to proxy URL. When ``None``, the
            mapping returned by ``getproxies()`` is used.
        proxy_bypass: Object stored on ``self._proxy_bypass``. When
            ``None``, ``urllib.proxy_bypass`` is used.

    Raises:
        AssertionError: If ``proxies`` does not look like a mapping.
    """
    if proxies is None:
        proxies = getproxies()
    # Check for ``keys`` rather than the Python 2-only ``has_key`` so that
    # any mapping (including Python 3 dicts) is accepted.
    assert hasattr(proxies, 'keys'), "proxies must be a mapping"
    self.proxies = proxies
    for scheme, url in proxies.items():
        # Bind the per-iteration values as lambda defaults; a plain closure
        # would only see the last scheme/url once the loop has finished.
        setattr(self, '%s_open' % scheme,
                lambda r, proxy=url, type=scheme, meth=self.proxy_open:
                meth(r, proxy, type))
    if proxy_bypass is None:
        proxy_bypass = urllib.proxy_bypass
    self._proxy_bypass = proxy_bypass
def start_schedulers(options):
    """Start one scheduler per requested application.

    A single application without ``options.with_scheduler`` is run in the
    current process via ``run()``. Otherwise each application gets its own
    ``multiprocessing.Process`` and this call then joins every process.

    Args:
        options: Parsed options object. The attributes read here are
            ``scheduler``, ``scheduler_groups``, ``debuglevel``, ``folder``
            and ``with_scheduler``.
    """
    try:
        from multiprocessing import Process
    except ImportError:
        sys.stderr.write('Sorry, -K only supported for python 2.6-2.7\n')
        return
    processes = []
    apps = [(app.strip(), None) for app in options.scheduler.split(',')]
    if options.scheduler_groups:
        apps = options.scheduler_groups
    # Default loop code; get_code_for_scheduler() below rebinds ``code``
    # before it is passed to run().
    code = "from gluon.globals import current;current._scheduler.loop()"
    logging.getLogger().setLevel(options.debuglevel)
    if options.folder:
        os.chdir(options.folder)
    if len(apps) == 1 and not options.with_scheduler:
        app_, code = get_code_for_scheduler(apps[0], options)
        if not app_:
            return
        print('starting single-scheduler for "%s"...' % app_)
        run(app_, True, True, None, False, code)
        return
    # Work around OS X problem: http://bugs.python.org/issue9405
    # getproxies lives in urllib on Python 2 and urllib.request on Python 3.
    try:
        from urllib.request import getproxies
    except ImportError:
        from urllib import getproxies
    getproxies()
    for app in apps:
        app_, code = get_code_for_scheduler(app, options)
        if not app_:
            continue
        print('starting scheduler for "%s"...' % app_)
        args = (app_, True, True, None, False, code)
        p = Process(target=run, args=args)
        processes.append(p)
        print("Currently running %s scheduler processes" % (len(processes)))
        p.start()
        # Stagger start-up to avoid bashing the db at the same time.
        time.sleep(0.7)
        # NOTE(review): the source lost its indentation; this line is
        # assumed to sit inside the loop -- confirm.
        print("Processes started")
    for p in processes:
        try:
            p.join()
        except (KeyboardInterrupt, SystemExit):
            print("Processes stopped")
        except Exception:
            # Best-effort cleanup: stop the child, then reap it.
            p.terminate()
            p.join()
def start_schedulers(options):
    """Start one scheduler per requested application.

    A single application without ``options.with_scheduler`` is run in the
    current process via ``run()``. Otherwise each application gets its own
    ``multiprocessing.Process`` and this call then joins every process.

    Args:
        options: Parsed options object. The attributes read here are
            ``scheduler``, ``scheduler_groups``, ``debuglevel``, ``folder``
            and ``with_scheduler``.
    """
    try:
        from multiprocessing import Process
    except ImportError:
        sys.stderr.write('Sorry, -K only supported for python 2.6-2.7\n')
        return
    processes = []
    apps = [(app.strip(), None) for app in options.scheduler.split(',')]
    if options.scheduler_groups:
        apps = options.scheduler_groups
    # Default loop code; get_code_for_scheduler() below rebinds ``code``
    # before it is passed to run().
    code = "from gluon import current;current._scheduler.loop()"
    logging.getLogger().setLevel(options.debuglevel)
    if options.folder:
        os.chdir(options.folder)
    # Every print below uses the single-argument call form, which behaves
    # the same as a print statement on Python 2 and also parses on Python 3.
    if len(apps) == 1 and not options.with_scheduler:
        app_, code = get_code_for_scheduler(apps[0], options)
        if not app_:
            return
        print('starting single-scheduler for "%s"...' % app_)
        run(app_, True, True, None, False, code)
        return
    # Work around OS X problem: http://bugs.python.org/issue9405
    # getproxies lives in urllib on Python 2 and urllib.request on Python 3.
    try:
        from urllib.request import getproxies
    except ImportError:
        from urllib import getproxies
    getproxies()
    for app in apps:
        app_, code = get_code_for_scheduler(app, options)
        if not app_:
            continue
        print('starting scheduler for "%s"...' % app_)
        args = (app_, True, True, None, False, code)
        p = Process(target=run, args=args)
        processes.append(p)
        print("Currently running %s scheduler processes" % (len(processes)))
        p.start()
        # Stagger start-up to avoid bashing the db at the same time.
        time.sleep(0.7)
        # NOTE(review): the source lost its indentation; this line is
        # assumed to sit inside the loop -- confirm.
        print("Processes started")
    for p in processes:
        try:
            p.join()
        except (KeyboardInterrupt, SystemExit):
            print("Processes stopped")
        except Exception:
            # Best-effort cleanup: stop the child, then reap it.
            p.terminate()
            p.join()
def start_schedulers(options):
    """Start one scheduler per requested application.

    A single application without ``options.with_scheduler`` is run in the
    current process via ``run()``. Otherwise each application gets its own
    ``multiprocessing.Process`` and this call then joins every process.

    Args:
        options: Parsed options object. The attributes read here are
            ``scheduler``, ``scheduler_groups``, ``debuglevel``, ``folder``
            and ``with_scheduler``.
    """
    try:
        from multiprocessing import Process
    except ImportError:
        sys.stderr.write('Sorry, -K only supported for python 2.6-2.7\n')
        return
    processes = []
    apps = [(app.strip(), None) for app in options.scheduler.split(',')]
    if options.scheduler_groups:
        apps = options.scheduler_groups
    # Default loop code; get_code_for_scheduler() below rebinds ``code``
    # before it is passed to run().
    code = "from gluon import current;current._scheduler.loop()"
    logging.getLogger().setLevel(options.debuglevel)
    if options.folder:
        os.chdir(options.folder)
    # Every print below uses the single-argument call form, which behaves
    # the same as a print statement on Python 2 and also parses on Python 3.
    if len(apps) == 1 and not options.with_scheduler:
        app_, code = get_code_for_scheduler(apps[0], options)
        if not app_:
            return
        print('starting single-scheduler for "%s"...' % app_)
        run(app_, True, True, None, False, code)
        return
    # Work around OS X problem: http://bugs.python.org/issue9405
    # getproxies lives in urllib on Python 2 and urllib.request on Python 3.
    try:
        from urllib.request import getproxies
    except ImportError:
        from urllib import getproxies
    getproxies()
    for app in apps:
        app_, code = get_code_for_scheduler(app, options)
        if not app_:
            continue
        print('starting scheduler for "%s"...' % app_)
        args = (app_, True, True, None, False, code)
        p = Process(target=run, args=args)
        processes.append(p)
        print("Currently running %s scheduler processes" % (len(processes)))
        p.start()
        # Stagger start-up to avoid bashing the db at the same time.
        time.sleep(0.7)
        # NOTE(review): the source lost its indentation; this line is
        # assumed to sit inside the loop -- confirm.
        print("Processes started")
    for p in processes:
        try:
            p.join()
        except (KeyboardInterrupt, SystemExit):
            print("Processes stopped")
        except Exception:
            # Best-effort cleanup: stop the child, then reap it.
            p.terminate()
            p.join()
def start_schedulers(options):
    """Start one scheduler per requested application.

    A single application without ``options.with_scheduler`` is run in the
    current process via ``run()``. Otherwise each application gets its own
    ``multiprocessing.Process`` and this call then joins every process.

    Args:
        options: Parsed options object. The attributes read here are
            ``scheduler``, ``scheduler_groups``, ``debuglevel``, ``folder``
            and ``with_scheduler``.
    """
    try:
        from multiprocessing import Process
    except ImportError:
        sys.stderr.write('Sorry, -K only supported for python 2.6-2.7\n')
        return
    processes = []
    apps = [(app.strip(), None) for app in options.scheduler.split(',')]
    if options.scheduler_groups:
        apps = options.scheduler_groups
    # Default loop code; get_code_for_scheduler() below rebinds ``code``
    # before it is passed to run().
    code = "from gluon.globals import current;current._scheduler.loop()"
    logging.getLogger().setLevel(options.debuglevel)
    if options.folder:
        os.chdir(options.folder)
    if len(apps) == 1 and not options.with_scheduler:
        app_, code = get_code_for_scheduler(apps[0], options)
        if not app_:
            return
        print('starting single-scheduler for "%s"...' % app_)
        run(app_, True, True, None, False, code)
        return
    # Work around OS X problem: http://bugs.python.org/issue9405
    # getproxies lives in urllib on Python 2 and urllib.request on Python 3.
    try:
        from urllib.request import getproxies
    except ImportError:
        from urllib import getproxies
    getproxies()
    for app in apps:
        app_, code = get_code_for_scheduler(app, options)
        if not app_:
            continue
        print('starting scheduler for "%s"...' % app_)
        args = (app_, True, True, None, False, code)
        p = Process(target=run, args=args)
        processes.append(p)
        print("Currently running %s scheduler processes" % (len(processes)))
        p.start()
        # Stagger start-up to avoid bashing the db at the same time.
        time.sleep(0.7)
        # NOTE(review): the source lost its indentation; this line is
        # assumed to sit inside the loop -- confirm.
        print("Processes started")
    for p in processes:
        try:
            p.join()
        except (KeyboardInterrupt, SystemExit):
            print("Processes stopped")
        except Exception:
            # Best-effort cleanup: stop the child, then reap it.
            p.terminate()
            p.join()
def start_schedulers(options):
    """Start one scheduler per requested application.

    A single application without ``options.with_scheduler`` is run in the
    current process via ``run()``. Otherwise each application gets its own
    ``multiprocessing.Process`` and this call then joins every process.

    Args:
        options: Parsed options object. The attributes read here are
            ``scheduler``, ``scheduler_groups``, ``debuglevel``, ``folder``
            and ``with_scheduler``.
    """
    try:
        from multiprocessing import Process
    except ImportError:
        sys.stderr.write('Sorry, -K only supported for python 2.6-2.7\n')
        return
    processes = []
    apps = [(app.strip(), None) for app in options.scheduler.split(',')]
    if options.scheduler_groups:
        apps = options.scheduler_groups
    # Default loop code; get_code_for_scheduler() below rebinds ``code``
    # before it is passed to run().
    code = "from gluon import current;current._scheduler.loop()"
    logging.getLogger().setLevel(options.debuglevel)
    if options.folder:
        os.chdir(options.folder)
    # Every print below uses the single-argument call form, which behaves
    # the same as a print statement on Python 2 and also parses on Python 3.
    if len(apps) == 1 and not options.with_scheduler:
        app_, code = get_code_for_scheduler(apps[0], options)
        if not app_:
            return
        print('starting single-scheduler for "%s"...' % app_)
        run(app_, True, True, None, False, code)
        return
    # Work around OS X problem: http://bugs.python.org/issue9405
    # getproxies lives in urllib on Python 2 and urllib.request on Python 3.
    try:
        from urllib.request import getproxies
    except ImportError:
        from urllib import getproxies
    getproxies()
    for app in apps:
        app_, code = get_code_for_scheduler(app, options)
        if not app_:
            continue
        print('starting scheduler for "%s"...' % app_)
        args = (app_, True, True, None, False, code)
        p = Process(target=run, args=args)
        processes.append(p)
        print("Currently running %s scheduler processes" % (len(processes)))
        p.start()
        # Stagger start-up to avoid bashing the db at the same time.
        time.sleep(0.7)
        # NOTE(review): the source lost its indentation; this line is
        # assumed to sit inside the loop -- confirm.
        print("Processes started")
    for p in processes:
        try:
            p.join()
        except (KeyboardInterrupt, SystemExit):
            print("Processes stopped")
        except Exception:
            # Best-effort cleanup: stop the child, then reap it.
            p.terminate()
            p.join()