def add_user_devices(self, serial):
    # (url, access_token, api_token) = self.get_api_conf()
    api_url = self.url + "/api/v1/user/devices"
    token = self.access_token + " " + self.api_token
    data = {'serial': serial}
    request = urllib2.Request(api_url, json.dumps(data))
    request.add_header('Authorization', token)
    request.add_header('Content-Type', 'application/json')
    try:
        urllib2.urlopen(request)
    except urllib2.HTTPError as e:
        # only HTTPError carries a status code and a readable body
        print e.code
        print e.read()
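# Usage sketch (hypothetical values): the method expects an STF-style server
# URL plus tokens on the instance; "Bearer <token>" is the usual Authorization
# scheme for STF, so access_token would hold the scheme word.
# client = StfClient()                      # hypothetical wrapper instance
# client.url = "http://stf.example.com"     # placeholder server
# client.access_token = "Bearer"
# client.api_token = "0123456789abcdef"     # placeholder token
# client.add_user_devices("emulator-5554")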
def download_file_insecure(url, target):
    """
    Use Python to download the file, even though it cannot authenticate the
    connection.
    """
    try:
        from urllib.request import urlopen
    except ImportError:
        from urllib2 import urlopen
    src = dst = None
    try:
        src = urlopen(url)
        # Read/write all in one block, so we don't create a corrupt file
        # if the download is interrupted.
        data = src.read()
        dst = open(target, "wb")
        dst.write(data)
    finally:
        if src:
            src.close()
        if dst:
            dst.close()
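# A minimal usage sketch (placeholder URL and path). "Insecure" means the TLS
# certificate is not validated, so only fetch artifacts you can verify, e.g.
# by comparing a known checksum afterwards:
def fetch_bootstrap_example():
    download_file_insecure('https://example.com/bootstrap.py', '/tmp/bootstrap.py')
    import hashlib
    digest = hashlib.sha256(open('/tmp/bootstrap.py', 'rb').read()).hexdigest()
    print digest  # compare against a published checksum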
def remove_devices_user(self, device_list):
    # (url, access_token, api_token) = self.get_api_conf("conf/stf.conf", "renguoliang")
    for device in device_list:
        serial = device["serial"]
        api_url = self.url + "/api/v1/user/devices/%s" % serial
        print api_url
        token = self.access_token + " " + self.api_token
        request = urllib2.Request(api_url)
        request.add_header('Authorization', token)
        request.get_method = lambda: 'DELETE'
        try:
            urllib2.urlopen(request)
        except urllib2.HTTPError as e:
            print e.code
            print e.read()
def get(self, url, proxy=None):
    if proxy:
        proxy_handler = urllib2.ProxyHandler({'http': proxy})
        opener = urllib2.build_opener(proxy_handler)
        urllib2.install_opener(opener)
    try:
        response = urllib2.urlopen(url)
    except HTTPError as e:
        resp = e.read()
        self.status_code = e.code
    except URLError as e:
        # URLError has no response body or HTTP status code, only a reason
        resp = str(e.reason)
        self.status_code = None
    else:
        self.status_code = response.code
        resp = response.read()
    return resp
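# Hypothetical caller for the get() helper above; HttpClient stands in for
# whatever class defines it, and the proxy address is a placeholder.
# client = HttpClient()
# body = client.get('http://example.com/', proxy='127.0.0.1:8080')
# print client.status_code, len(body)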
def run(self):
    data = self.getData()
    # the observable value itself is used as the JSON key
    value = {
        data: {
            "type": self.data_type
        }
    }
    json_data = json.dumps(value)
    post_data = json_data.encode('utf-8')
    headers = {'Content-Type': 'application/json'}
    try:
        request = urllib2.Request('{}/hippocampe/api/v1.0/{}'.format(self.url, self.service), post_data, headers)
        response = urllib2.urlopen(request)
        report = json.loads(response.read())
        self.report(report)
    except urllib2.HTTPError:
        self.error("Hippocampe: " + str(sys.exc_info()[1]))
    except urllib2.URLError:
        self.error("Hippocampe: service is not available")
    except Exception as e:
        self.unexpectedError(e)
def getMessagePayload(self):
    self.logger.debug("Preparing client->device message payload")
    salon = -127
    try:
        salon = read_temp()
    except Exception as e:
        self.logger.error("error reading local temp")
        self.logger.exception(e)
    piwnica = -127
    relay = 0
    try:
        os.system("sudo ifconfig eth0 192.168.1.101 netmask 255.255.255.0")
        txt = urllib2.urlopen(relay1_addr).read()
        lines = txt.split('\n')
        piwnica = float(lines[1])
        relay = int(lines[0])
    except Exception as e:
        self.logger.error("error reading data from {0}".format(relay1_addr))
        self.logger.exception(e)
    payloadDict = {"values": {}}
    payloadDict["values"]["relay"] = relay
    if salon > -127:
        payloadDict["values"]["salon"] = salon
    if piwnica > -127:
        payloadDict["values"]["piwnica"] = piwnica
    payload = json.dumps(payloadDict)
    return payload
def downloadFilesSave(links, fileFormat):  # main function
    if (links == 'EMPTY'):  # if links list is empty
        return ' NO LINKS FOUND !'
    for link in links:
        name = random.randint(0, 10000001)
        while (str(name) + '.' + fileFormat) in os.listdir(os.getcwd()):  # re-roll the random name until it is unused
            name = random.randint(0, 10000001)
        if (fileFormat not in ['zip', 'png', 'jpg', 'jpeg', 'tiff', 'bmp', 'svg', 'gif']):
            try:
                saveFile = open(str(name) + '.' + fileFormat, 'w')
                saveFile.write(urllib2.urlopen(link).read())
                saveFile.close()
            except urllib2.URLError:
                pass
        else:
            try:
                saveFile = open(str(name) + '.' + fileFormat, 'wb')
                saveFile.write(urllib2.urlopen(link).read())
                saveFile.close()
            except urllib2.URLError:
                pass
    return ' {} DOWNLOADS SUCCESSFUL SO FAR !'.format(len(os.listdir(os.getcwd())))
def get_system_status():
    session_attributes = {}
    card_title = "BART System Status"
    reprompt_text = ""
    should_end_session = False
    response = urllib2.urlopen(API_BASE + "/status")
    bart_system_status = json.load(response)
    speech_output = "There are currently " + str(bart_system_status["traincount"]) + " trains operating. "
    if len(bart_system_status["message"]) > 0:
        speech_output += bart_system_status["message"]
    else:
        speech_output += "The trains are running normally."
    return build_response(session_attributes, build_speechlet_response(
        card_title, speech_output, reprompt_text, should_end_session))
def test_start_object(self):
    server = PJFServer(configuration=PJFConfiguration(Namespace(ports={"servers": {"HTTP_PORT": 8080, "HTTPS_PORT": 8443}},
                                                                html=False, level=6, command=["radamsa"], stdin=True,
                                                                json={"a": "test"}, indent=True, strong_fuzz=False, url_encode=False,
                                                                parameters=[], notify=False, debug=False, content_type="text/plain",
                                                                utf8=False, nologo=True)))
    server.run()
    json_http = urllib2.urlopen("http://127.0.0.1:8080").read()
    try:
        import requests
        requests.packages.urllib3.disable_warnings()
        json_https = requests.get('https://127.0.0.1:8443', verify=False).content
        self.assertTrue(json_https)
    except ImportError:
        pass
    self.assertTrue(json_http)
    server.stop()
def getRelsFromURIMSinWARC(warc):
    urims = getURIMsFromTimeMapInWARC(warc)
    startReplay(warc)
    # Get Link header values for each memento
    linkHeaders = []
    for urim in urims:
        linkHeaders.append(urllib2.urlopen(urim).info().getheader('Link'))
    stopReplay()
    relsForURIMs = []
    for linkHeader in linkHeaders:
        relForURIM = ipwbTest.extractRelationEntriesFromLinkTimeMap(linkHeader)
        relsForURIMs.append(relForURIM)
    return relsForURIMs
def send_result(email, result, title, urn):
    """
    Args:
        email (str): address to send the results
        result (obj): results to send
        title (str):
        urn (str): uniform resource name
    Returns:
        str: response from endpoint
    """
    url = 'https://mongoaud.it/results'
    headers = {'Content-type': 'application/json',
               'Accept': 'application/json'}
    values = {'email': email, 'result': result, 'title': title, 'urn': urn, 'date': get_date()}
    try:
        req = urllib2.Request(url, json.dumps(values), headers)
        response = urllib2.urlopen(req)
        return response.read()
    except (urllib2.HTTPError, urllib2.URLError) as exc:
        return "Sadly enough, we are having technical difficulties at the moment, " \
               "please try again later.\n\n%s" % str(exc)
def check_version(version):
    # if application is binary then check for latest version
    if getattr(sys, 'frozen', False):
        try:
            url = "https://api.github.com/repos/stampery/mongoaudit/releases/latest"
            req = urllib2.urlopen(url)
            releases = json.loads(req.read())
            latest = releases["tag_name"]
            if version < latest:
                print("mongoaudit version " + version)
                print("There's a new version " + latest)
                _upgrade(releases)
        except (urllib2.HTTPError, urllib2.URLError):
            print("Couldn't check for upgrades")
        except os.error:
            print("Couldn't write mongoaudit binary")
def download_lyrics(artist, url):
    print url
    time.sleep(random() + 2)
    page = urllib2.urlopen(url).read()
    soup = BeautifulSoup(page, 'html.parser')
    # Get the song title
    song_title = soup.find('title').get_text().split(' - ')[1].lower().replace('/', ' ').replace(' ', '_')
    # Get the lyrics div
    lyrics = soup.findAll('div', {'class': ''})
    for i in lyrics:
        lyrics = i.get_text().strip()
        if len(lyrics) > 10:
            with open('artists/' + artist + '/' + song_title + '.txt', 'wb') as w:
                # u'\xa0' is a non-breaking space (the character was garbled in the source)
                cleaned_lyrics = lyrics.replace('\r\n', ' *BREAK* ').replace('\n', ' *BREAK* ').replace(u'\xa0', ' ')
                w.write(cleaned_lyrics.encode('utf-8'))
def download_songs(url):
    time.sleep(random.random() * 0.5)
    try:
        page = urllib2.urlopen(url).read()
        soup = BeautifulSoup(page, 'html.parser')
        # Get the artist name
        artist_name = soup.findAll('h1')[0].get_text()[:-7].lower().replace(' ', '_')
        # Store all songs for a given artist
        with open('artist_data/' + artist_name + '.txt', 'wb') as w:
            for song in soup.findAll('a', {'target': '_blank'}):
                if 'lyrics/' in song['href']:
                    song_url = song['href'][1:].strip()
                    w.write(song_url + '\n')
    except urllib2.HTTPError:
        print '404 not found'
def TestSite(url):
    protocheck(url)
    print "Trying: " + url
    try:
        urllib2.urlopen(url, timeout=3)
    except urllib2.HTTPError as e:
        if e.code == 405:
            print url + " found!"
            print "Now the brute force will begin! >:)"
        if e.code == 404:
            printout(str(e), YELLOW)
            print " - XMLRPC has been moved, removed, or blocked"
            sys.exit()
    except urllib2.URLError:
        printout("Could not identify XMLRPC. Please verify the domain.\n", YELLOW)
        sys.exit()
    except socket.timeout as e:
        print type(e)
        printout("The socket timed out, try it again.", YELLOW)
        sys.exit()
def paste(self):
    """Create a paste and return the paste id."""
    data = json.dumps({
        'description': 'Werkzeug Internal Server Error',
        'public': False,
        'files': {
            'traceback.txt': {
                'content': self.plaintext
            }
        }
    }).encode('utf-8')
    try:
        from urllib2 import urlopen
    except ImportError:
        from urllib.request import urlopen
    rv = urlopen('https://api.github.com/gists', data=data)
    resp = json.loads(rv.read().decode('utf-8'))
    rv.close()
    return {
        'url': resp['html_url'],
        'id': resp['id']
    }
def update(self, docs, commitwithin=None):
    """Post list of docs to Solr, return URL and status.
    Optionally tell Solr to "commitwithin" that many milliseconds."""
    url = self.url + '/update'
    add_xml = etree.Element('add')
    if commitwithin is not None:
        add_xml.set('commitWithin', str(commitwithin))
    for doc in docs:
        xdoc = etree.SubElement(add_xml, 'doc')
        for key, value in doc.iteritems():
            if value:
                field = etree.Element('field', name=key)
                field.text = (value if isinstance(value, unicode)
                              else str(value))
                xdoc.append(field)
    request = urllib2.Request(url)
    request.add_header('Content-Type', 'text/xml; charset=utf-8')
    request.add_data(etree.tostring(add_xml, pretty_print=True))
    response = urllib2.urlopen(request).read()
    status = etree.XML(response).findtext('lst/int')
    return url, status
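# Sketch of driving update() (field names and the core URL are assumptions);
# commitwithin=5000 asks Solr to make the docs searchable within 5 seconds.
# solr = Solr('http://localhost:8983/solr/mycore')   # hypothetical wrapper
# docs = [{'id': 'doc-1', 'title': u'First'}, {'id': 'doc-2', 'title': u'Second'}]
# url, status = solr.update(docs, commitwithin=5000)
# print url, status   # Solr replies with 0 in <lst>/<int> on success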
def post(self):
    site = GetSite()
    browser = detect(self.request)
    member = CheckAuth(self)
    l10n = GetMessages(self, member, site)
    if member:
        image = self.request.arguments['image'][0]
        if image is not None:
            import urllib, urllib2
            parameters = urllib.urlencode(dict(member_id=member.num, image=image))
            try:
                f = urllib2.urlopen('http://daydream/upload', parameters)
                data = f.read()
                f.close()
            except:
                self.session = Session()
                self.session['message'] = 'Image must not be larger than 1M'
            self.redirect('/images')
    else:
        self.redirect('/signin')
def _html_link_return(self, url, tag, key, value, deeper=False, second=False):
    """
    Returns links
    :param url: URL to filter
    :param tag: name of the tag to find
    :param key: name of the attribute to match on the tag
    :param value: value expected for that attribute
    """
    if url[0] == '/':
        url = '{0}{1}'.format(self.url, url)
    r = urllib2.Request(url)
    response = urllib2.urlopen(r)
    soup = BeautifulSoup(response, 'html.parser')
    matches = soup.findAll(tag, {key: value})
    if deeper:
        m = matches[0]
        matches = m.findAll('a')[0]['href']
    elif second:
        m = matches[0]
        matches = m.findAll('a')[1]['href']
    else:
        matches = matches[0]['href']
    return '{0}{1}'.format(self.url, matches)
def execute(self):
    if hasattr(Context.g_module, 'publish'):
        Context.Context.execute(self)
    mod = Context.g_module
    rfile = getattr(self, 'rfile', send_package_name())
    if not os.path.isfile(rfile):
        self.fatal('Create the release file with "waf release" first! %r' % rfile)
    fdata = Utils.readf(rfile, m='rb')
    data = safe_urlencode([('pkgdata', fdata), ('pkgname', mod.APPNAME), ('pkgver', mod.VERSION)])
    req = Request(get_upload_url(), data)
    response = urlopen(req, timeout=TIMEOUT)
    data = response.read().strip()
    if sys.hexversion > 0x300000f:
        data = data.decode('utf-8')
    if data != 'ok':
        self.fatal('Could not publish the package %r' % data)

def compute_dependencies(self, filename=REQUIRES):
    text = Utils.readf(filename)
    data = safe_urlencode([('text', text)])
    if '--offline' in sys.argv:
        self.constraints = self.local_resolve(text)
    else:
        req = Request(get_resolve_url(), data)
        try:
            response = urlopen(req, timeout=TIMEOUT)
        except URLError as e:
            Logs.warn('The package server is down! %r' % e)
            self.constraints = self.local_resolve(text)
        else:
            ret = response.read()
            try:
                ret = ret.decode('utf-8')
            except Exception:
                pass
            self.trace(ret)
            self.constraints = parse_constraints(ret)
    self.check_errors()
def download_from_url(url):
    proxy = env_server.get_proxy()
    if proxy['enabled']:
        server = proxy['server'].replace('http://', '')
        proxy_dict = {
            'http': 'http://{login}:{pass}@{0}'.format(server, **proxy)
        }
        proxy_handler = urllib2.ProxyHandler(proxy_dict)
        auth = urllib2.HTTPBasicAuthHandler()
        opener = urllib2.build_opener(proxy_handler, auth, urllib2.HTTPHandler)
        urllib2.install_opener(opener)
    run_thread = tc.ServerThread(env_inst.ui_main)
    run_thread.kwargs = dict(url=url, timeout=1)
    run_thread.routine = urllib2.urlopen
    run_thread.run()
    result_thread = tc.treat_result(run_thread, silent=True)
    if result_thread.isFailed():
        return False
    else:
        return result_thread.result
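# For reference, the proxy URL built above follows the usual
# user:password@host form (placeholder credentials):
#   {'http': 'http://alice:secret@proxy.example.com:3128'}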
def run(self):
    request = self.request
    try:
        if ((timeit.default_timer() - self.starttime) <= self.timeout and
                not SHUTDOWN_EVENT.isSet()):
            try:
                f = urlopen(request)
            except TypeError:
                # PY24 expects a string or buffer
                # This also causes issues with Ctrl-C, but we will concede
                # for the moment that Ctrl-C on PY24 isn't immediate
                request = build_request(self.request.get_full_url(),
                                        data=request.data.read(self.size))
                f = urlopen(request)
            f.read(11)
            f.close()
            self.result = sum(self.request.data.total)
        else:
            self.result = 0
    except (IOError, SpeedtestUploadTimeout):
        self.result = sum(self.request.data.total)
def stealth_mode(passwd):
    df = "http://10.5.5.9/"  # DEFAULT PARTS
    p1 = "?t="
    p2 = "&p=%"
    print("\n\r[" + extra.colors.yellow + ".." + extra.colors.end + "] Activating stealth mode")
    par1, par2, opt = no_vol()  # MUTE MODE
    urllib2.urlopen(df + par1 + "/" + par2 + p1 + passwd + p2 + opt)
    time.sleep(1.5)
    par1, par2, opt = no_leds()  # NO LEDS
    urllib2.urlopen(df + par1 + "/" + par2 + p1 + passwd + p2 + opt)
    time.sleep(1.5)
    par1, par2, opt = fov_wide()  # FOV WIDE FOR A BIGGER FIELD OF VIEW
    urllib2.urlopen(df + par1 + "/" + par2 + p1 + passwd + p2 + opt)
    time.sleep(1.5)
    print("\r\n[" + extra.colors.green + "+" + extra.colors.end + "] Stealth mode activated successfully\r\n")