def saveFile(self, url, page, idx):
    """Download *url* and store it in ``self.store_dir``.

    The file name is built from the current date (``self.now_date()``),
    the page number, the zero-padded index and the extension reported by
    ``self.file_extension(url)``.

    :param url: Address of the resource to download.
    :param page: Page number, embedded in the file name.
    :param idx: Position of the item on the page, padded to two digits.
    :return: None. Failures are reported on stdout instead of being raised.
    """
    # Pad the index to two digits so that names sort in download order.
    user_define_name = (
        self.now_date() + '_p_' + str(page) + '_' + str(idx).zfill(2)
    )
    file_ext = self.file_extension(url)
    save_file_name = user_define_name + "_" + file_ext

    # Let the helper validate/normalise the url before requesting it.
    url = self.CheckUrlValidate(url)
    try:
        pic = requests.get(url, timeout=10)
        # The context manager closes the handle even when write() fails.
        with open(self.store_dir + os.sep + save_file_name, 'wb') as f:
            f.write(pic.content)
        print('\ndone save file ' + save_file_name)
    except ReadTimeout:
        print('save file %s failed. cause by timeout(10)' % (save_file_name))
    except MissingSchema:
        print('invalid url %s' % (url))
    except Exception as e:
        # Best-effort download: report any other failure and carry on.
        print(e)
# Check whether the url includes the http: scheme (reconstructed from a mis-encoded comment)
python类MissingSchema()的实例源码
def test_connerror_missing_url_schema(self):
    # GIVEN a configured server address that carries no URL schema
    self.config['server'] = '127.0.0.1'

    # WHEN the publish command runs, keep whatever it returns or raises
    try:
        outcome = self.invoke(cli, ['publish', ])
    except Exception as exc:
        outcome = exc

    # THEN the outcome is a MissingSchema error ...
    assert isinstance(outcome, MissingSchema)

    # ... AND its text explains that the schema is missing
    expected_text = "Invalid URL '127.0.0.1/api/auth/token': No schema supplied. Perhaps you meant http://127.0.0.1/api/auth/token?"
    assert expected_text in str(outcome)
def test_requests_instrumentation_malformed_none(elasticapm_client):
    """requests.get(None) raises MissingSchema inside a captured span."""
    elasticapm_client.begin_transaction("transaction.test")
    # Enter the span first, then the exception expectation.
    with capture_span("test_request", "test"), pytest.raises(MissingSchema):
        requests.get(None)
def test_requests_instrumentation_malformed_schema(elasticapm_client):
    """requests.get('') raises MissingSchema inside a captured span."""
    elasticapm_client.begin_transaction("transaction.test")
    # Enter the span first, then the exception expectation.
    with capture_span("test_request", "test"), pytest.raises(MissingSchema):
        requests.get('')
def gbf(bot, message):
    """Reply to *message* when it contains a link that leads to a 'granblue' URL.

    Each match of ``GBF_PATTERN`` in the message text is inspected in turn.
    URLs already recorded in ``match_urls`` are answered immediately; others
    are fetched and recorded once the final URL contains ``'granblue'``.

    :return: True after a reply was sent, False when scanning is aborted
        (more than eleven matches, or an image link), None when no match
        triggered a reply.
    """
    for position, found in enumerate(GBF_PATTERN.findall(message.text)):
        # Stop scanning after the eleventh match.
        if position > 10:
            return False

        url = found[0]
        suffix = url.split('.')[-1]
        if suffix in ['png', 'jpg', 'jpeg', 'gif']:
            return False

        # Known URL: no need to fetch it again.
        if url in match_urls:
            reply(bot, message, '?????????')
            return True

        try:
            resp = requests.get(url)
        except MissingSchema:
            # The pattern may capture bare host names; retry with a scheme.
            resp = requests.get('http://' + url)

        if 'granblue' not in resp.url:
            continue

        match_urls.append(url)
        reply(bot, message, '?????????')
        return True
def check_sites(self):
    """Check every site in ``self.sites`` and report the failing ones.

    A Slack message is created for each site whose ``check_status_code()``
    is falsy; every such message is posted to Slack and logged as an error.
    If the checks raise ConnectionError, MissingSchema or Timeout, the
    exception message is posted and logged instead and no site messages
    are reported.

    :return: Number of site failure messages that were reported.
    """
    errors = []
    try:
        # Collect into a scratch list so that a failure part-way through
        # leaves ``errors`` empty.
        failing = []
        for site in self.sites:
            if site.check_status_code():
                continue
            failing.append(site.create_slack_message())
        errors = failing
    except (ConnectionError, MissingSchema, Timeout) as exc:
        self.slack.post_message(unicode(exc.message))
        self.logger.error(unicode(exc.message))

    for report in errors:
        self.slack.post_message(report)
        self.logger.error(report)

    return len(errors)
def test_get_status_code_with_bad_url(self):
    # Both candidates lack a scheme prefix: a bare word first, then a
    # bare domain name.
    for schemeless_url in ("url", "url.com"):
        monitor = MonitorSite(url=schemeless_url,
                              expected_status_code=200)
        self.assertRaises(MissingSchema, monitor.get_status_code)
def md5_sum_from_url(url):
    """Return the hexadecimal MD5 digest of the content served at *url*.

    The body is streamed in chunks of at most 1024 bytes so that it never
    has to be held in memory as a whole.

    :param url: Address of the resource to hash.
    :return: The hex digest, or None when the URL has a missing or invalid
        schema, or when more than 50 * 1024 non-empty chunks are received.
    """
    try:
        r = requests.get(url, stream=True)
    except (InvalidSchema, MissingSchema):
        return None

    # It is not possible to send greater than 50 MB via Telegram
    max_chunks = 50 * 1024
    chunk_counter = 0
    hash_store = hashlib.md5()
    try:
        for chunk in r.iter_content(chunk_size=1024):
            if not chunk:  # filter out keep-alive new chunks
                continue
            hash_store.update(chunk)
            chunk_counter += 1
            if chunk_counter > max_chunks:
                return None
    finally:
        # A streamed response keeps its connection open until the body is
        # fully consumed or the response is closed; close it on every path,
        # including the early return above.
        r.close()
    return hash_store.hexdigest()
def test_load_invalid_remote_template_invalid(self):
    """
    Loading a remote schema from a URL without a
    scheme must raise MissingSchema.
    """
    with self.assertRaises(MissingSchema):
        load_remote_schema("some.incorrect/..url")
def authenticate(func):
    """Decorator that authenticates credentials.

    The wrapper authenticates and saves the configuration of the instance
    it is called on, then runs `func` only if the authorization check
    passes. SSL, URL-schema and authentication errors raised by `func` are
    reported to the user instead of propagating.

    :type func: callable
    :param func: A method to execute if authorization passes.

    :return: The return value of `func` if authorization passes, or
        None if authorization fails or a handled error occurs.
    """
    import functools

    # functools.wraps keeps func's __name__, __doc__ and other metadata on
    # the wrapper so that introspection and help output stay meaningful.
    @functools.wraps(func)
    def auth_wrapper(self, *args, **kwargs):
        self.config.authenticate()
        self.config.save_config()
        if not self.config.check_auth():
            return None
        try:
            return func(self, *args, **kwargs)
        except SSLError:
            click.secho(('SSL cert verification failed.\n  Try running '
                         'gh configure --enterprise\n  and type '
                         "'n' when asked whether to verify SSL certs."),
                        fg=self.config.clr_error)
        except MissingSchema:
            click.secho('Invalid GitHub Enterprise url',
                        fg=self.config.clr_error)
        except AuthenticationFailed:
            self.config.print_auth_error()
        return None
    return auth_wrapper
def is_uri_reachable(uri: str) -> bool:
    """Report whether a HEAD request to *uri* answers with the OK status.

    Returns False when the response status differs from
    ``requests.codes.ok`` or when *uri* has no schema. Any other error
    raised by the request propagates to the caller.
    """
    try:
        response = requests.head(uri)
    except MissingSchema:
        return False
    return response.status_code == requests.codes.ok
##
# Identifies the kind of the source and initializes it
#
# @param args The arguments passed to the application
def load_synonyms(path):
    """Load synonym definitions from a URL or a local file.

    *path* is first requested as a URL. If it is rejected as a URL
    (missing schema, invalid schema or invalid URL) it is read as a
    UTF-8 text file instead. The text is then handed to
    ``SynParser.get_mapping`` and its result is returned.

    :raises TypeError: if *path* is not a usable URL and the file cannot
        be read.
    """
    try:
        content = requests.get(path).text
    except (MissingSchema, InvalidSchema, InvalidURL):
        # Not a usable URL: fall back to the file system.
        try:
            with open(path, encoding='utf-8') as handle:
                content = handle.read()
        except (OSError, IOError):
            template = 'Invalid Path: "{0}". Ensure it is either a URL or Correct FS Path.'
            raise TypeError(template.format(path))
    return SynParser.get_mapping(content)
def test_setup_missing_schema(self):
    """Platform setup with a scheme-less resource raises MissingSchema."""
    # 'localhost' has no http:// or https:// prefix.
    platform_config = {
        'platform': 'rest',
        'resource': 'localhost',
        'method': 'GET'
    }
    with self.assertRaises(MissingSchema):
        rest.setup_platform(self.hass, platform_config, None)