def download_url_picture(img_url, file_name, folder_name, folder_base):
    try:
        file_path = folder_base + '/' + folder_name
        if not os.path.exists(file_path):
            print('Folder', file_path, 'does not exist; creating it')
            os.makedirs(file_path)
        # Get the file extension from the URL
        file_suffix = os.path.splitext(img_url)[1]
        # Build the full local path for the saved image
        filename = '{}{}{}{}'.format(file_path, os.sep, file_name, file_suffix)
        # Download the image and save it to the local path
        request.urlretrieve(img_url, filename=filename)
    except IOError as e:
        print('File I/O error:', e)
    except Exception as e:
        print('Error:', e)
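# Minimal usage sketch, assuming `import os` and `from urllib import request`
# are in scope; the URL and folder names below are illustrative only.
download_url_picture('https://example.com/images/cat.jpg', 'cat', 'pets', './downloads')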
def feed_queue(dest_dir, queue, queue_url, campaign_id):
    s = requests.Session()
    while True:
        logger.debug('fetching %s', queue_url)
        analysis_queue = s.get(queue_url).json()['crashes']
        for crash in analysis_queue:
            crash_name = str(crash['crash_id'])
            logger.info('downloading %s', crash_name)
            local_filename = os.path.join(dest_dir, crash_name)
            urllib_request.urlretrieve(crash['download'], filename=local_filename)
            logger.debug('%d crashes waiting', queue.qsize())
            queue.put((crash['crash_id'], local_filename))
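# Usage sketch: feed_queue loops forever, so it is typically run in a worker
# thread; the directory, queue URL, and campaign id below are illustrative only.
import queue, threading
crash_queue = queue.Queue()
worker = threading.Thread(target=feed_queue,
                          args=('/tmp/crashes', crash_queue,
                                'http://triage.example/api/crashes', 1),
                          daemon=True)
worker.start()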
def download_neo4j():
    if sys.platform.startswith('win'):
        dist_string = 'windows.zip'
        path = os.path.join(settings.POLYGLOT_TEMP_DIR, 'neo4j.zip')
    else:
        dist_string = 'unix.tar.gz'
        path = os.path.join(settings.POLYGLOT_TEMP_DIR, 'neo4j.tar.gz')
    if os.path.exists(path):
        return path
    os.makedirs(settings.POLYGLOT_TEMP_DIR, exist_ok=True)
    download_link = 'https://neo4j.com/artifact.php?name=neo4j-community-{version}-{dist_string}'.format(
        version=settings.NEO4J_VERSION, dist_string=dist_string)
    archive_path, headers = urlretrieve(download_link, path, data=None)
    return archive_path
def loadData(src):
    print('Downloading ' + src)
    fname, h = urlretrieve(src, './delete.me')
    print('Done.')
    try:
        print('Extracting files...')
        with tarfile.open(fname) as tar:
            tar.extractall()
        print('Done.')
        print('Preparing train set...')
        trn = np.empty((0, numFeature + 1), dtype=int)
        for i in range(5):
            batchName = './cifar-10-batches-py/data_batch_{0}'.format(i + 1)
            trn = np.vstack((trn, readBatch(batchName)))
        print('Done.')
        print('Preparing test set...')
        tst = readBatch('./cifar-10-batches-py/test_batch')
        print('Done.')
    finally:
        os.remove(fname)
    return (trn, tst)
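# `readBatch` and `numFeature` are referenced above but not shown; a plausible
# sketch for the pickled CIFAR-10 batch format (3072 pixel features plus one
# label column per row), assuming Python 3, `import pickle`, and numpy as np:
numFeature = 32 * 32 * 3
def readBatch(src):
    with open(src, 'rb') as f:
        d = pickle.load(f, encoding='latin1')
    features = np.array(d['data'], dtype=int)
    labels = np.array(d['labels'], dtype=int).reshape(-1, 1)
    return np.hstack((features, labels))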
def loadData(src, cimg):
    gzfname, h = urlretrieve(src, './delete.me')
    try:
        with gzip.open(gzfname) as gz:
            # Read and check the magic number.
            n = struct.unpack('I', gz.read(4))
            if n[0] != 0x3080000:
                raise Exception('Invalid file: unexpected magic number.')
            # Read the number of entries.
            n = struct.unpack('>I', gz.read(4))[0]
            if n != cimg:
                raise Exception('Invalid file: expected {0} entries.'.format(cimg))
            crow = struct.unpack('>I', gz.read(4))[0]
            ccol = struct.unpack('>I', gz.read(4))[0]
            if crow != 28 or ccol != 28:
                raise Exception('Invalid file: expected 28 rows/cols per image.')
            # Read the raw image data.
            res = np.frombuffer(gz.read(cimg * crow * ccol), dtype=np.uint8)
    finally:
        os.remove(gzfname)
    return res.reshape((cimg, crow * ccol))
def load_or_download_mnist_files(filename, num_samples, local_data_dir):
    if local_data_dir:
        local_path = os.path.join(local_data_dir, filename)
    else:
        local_path = os.path.join(os.getcwd(), filename)
    if os.path.exists(local_path):
        gzfname = local_path
    else:
        local_data_dir = os.path.dirname(local_path)
        if not os.path.exists(local_data_dir):
            os.makedirs(local_data_dir)
        filename = "http://yann.lecun.com/exdb/mnist/" + filename
        print("Downloading from " + filename, end=" ")
        gzfname, h = urlretrieve(filename, local_path)
        print("[Done]")
    return gzfname
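# Usage sketch with the standard MNIST archive name; note that `num_samples`
# is accepted but never used inside this helper.
gzfname = load_or_download_mnist_files('train-images-idx3-ubyte.gz', 60000, './mnist')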
def fetch_data():
    """Download the data."""
    train_file = tempfile.NamedTemporaryFile()
    test_file = tempfile.NamedTemporaryFile()
    req.urlretrieve("http://mlr.cs.umass.edu/ml/machine-learning-databases"
                    "/adult/adult.data", train_file.name)
    req.urlretrieve("http://mlr.cs.umass.edu/ml/machine-learning-databases/"
                    "adult/adult.test", test_file.name)
    df_train = pd.read_csv(train_file, names=COLUMNS, skipinitialspace=True)
    df_test = pd.read_csv(test_file, names=COLUMNS, skipinitialspace=True,
                          skiprows=1)
    df_train[LABEL_COLUMN] = (df_train["income_bracket"]
                              .apply(lambda x: ">50K" in x)).astype(int)
    df_test[LABEL_COLUMN] = (df_test["income_bracket"]
                             .apply(lambda x: ">50K" in x)).astype(int)
    return df_train, df_test
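# Usage sketch: `COLUMNS` and `LABEL_COLUMN` are assumed to be module-level
# constants naming the census columns and the derived label column.
df_train, df_test = fetch_data()
print(df_train.shape, df_test.shape)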
def loadData(src):
    print('Downloading ' + src)
    fname, h = urlretrieve(src, './delete.me')
    print('Done.')
    try:
        print('Extracting files...')
        with tarfile.open(fname) as tar:
            tar.extractall()
        print('Done.')
        print('Preparing train set...')
        trn = np.empty((0, NumFeat + 1), dtype=int)
        for i in range(5):
            batchName = './cifar-10-batches-py/data_batch_{0}'.format(i + 1)
            trn = np.vstack((trn, readBatch(batchName)))
        print('Done.')
        print('Preparing test set...')
        tst = readBatch('./cifar-10-batches-py/test_batch')
        print('Done.')
    finally:
        os.remove(fname)
    return (trn, tst)
def cq_download_pic(filename: str):
    """
    download image by cqimg file
    :param filename: cqimg file name
    """
    try:
        path = os.path.join(CQ_IMAGE_ROOT, filename)
        if os.path.exists(path):
            return
        cqimg = os.path.join(CQ_IMAGE_ROOT, filename + '.cqimg')
        parser = ConfigParser()
        parser.read(cqimg)
        url = parser['image']['url']
        urlretrieve(url, path)
    except:
        logger.error(filename)
        traceback.print_exc()
def tg_get_pic_url(file_id: str, pic_type: str):
    """
    download image from the Telegram Server, and generate a new image link to send to the QQ group
    :param file_id: telegram file id
    :param pic_type: picture extension name
    :return: pic url
    """
    file = global_vars.tg_bot.getFile(file_id)
    # urlretrieve(file.file_path, os.path.join(CQ_IMAGE_ROOT, file_id))  # download image
    file.download(custom_path=os.path.join(CQ_IMAGE_ROOT, file_id))
    if pic_type == 'jpg':
        create_jpg_image(CQ_IMAGE_ROOT, file_id)
        pic_url = get_short_url(SERVER_PIC_URL + file_id + '.jpg')
        return pic_url
    elif pic_type == 'png':
        create_png_image(CQ_IMAGE_ROOT, file_id)
        pic_url = get_short_url(SERVER_PIC_URL + file_id + '.png')
        return pic_url
    return ''

# endregion
def download_image(url: str = '', save_path: str = '',
                   unverified_ctx: bool = False) -> Union[None, str]:
    """Download image and save in current directory on local machine.

    :param str url: URL to image.
    :param str save_path: Saving path.
    :param bool unverified_ctx: Create unverified context.
    :return: Image name.
    :rtype: str or None
    """
    if unverified_ctx:
        ssl._create_default_https_context = ssl._create_unverified_context
    if url:
        image_name = url.rsplit('/')[-1]
        request.urlretrieve(url, save_path + image_name)
        return image_name
    return None
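# Usage sketch; the URL is illustrative, and `unverified_ctx=True` disables
# TLS certificate checks, so enable it only for hosts you trust.
name = download_image('https://example.com/pics/logo.png', save_path='./')
print(name)  # -> 'logo.png'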
def maybe_download_and_extract():
    main_directory = "./data_set/"
    cifar_10_directory = main_directory + "cifar_10/"
    if not os.path.exists(main_directory):
        os.makedirs(main_directory)

        url = "http://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz"
        filename = url.split('/')[-1]
        file_path = os.path.join(main_directory, filename)
        zip_cifar_10 = file_path
        file_path, _ = urlretrieve(url=url, filename=file_path, reporthook=_print_download_progress)

        print()
        print("Download finished. Extracting files.")
        if file_path.endswith(".zip"):
            zipfile.ZipFile(file=file_path, mode="r").extractall(main_directory)
        elif file_path.endswith((".tar.gz", ".tgz")):
            tarfile.open(name=file_path, mode="r:gz").extractall(main_directory)
        print("Done.")

        os.rename(main_directory + "cifar-10-batches-py", cifar_10_directory)
        os.remove(zip_cifar_10)
def getLDIcon(pseudo):
    """Fetch the leveldown icon from their site."""
    pseudo = str(pseudo)
    url = "http://leveldown.fr/profile/{}".format(pseudo)
    fileName = "./ldsource/{}.html".format(pseudo)
    request.urlretrieve(url, fileName)
    with open(fileName) as f:
        lines = f.read().splitlines()
    for line in lines:
        if line.startswith(' <div class="avatar"'):
            # Strip the trailing ');"></div>' and keep the URL inside 'url(...)'
            getLine = line.rstrip(');"></div>')
            Line = getLine.split("(")
            Line = Line[1]
            return Line
#####################################################################################################################################################
def setUp(self):
    # Create a list of temporary files. Each item in the list is a file
    # name (absolute path or relative to the current working directory).
    # All files in this list will be deleted in the tearDown method. Note,
    # this only helps to make sure temporary files get deleted, but it
    # does nothing about trying to close files that may still be open. It
    # is the responsibility of the developer to properly close files even
    # when exceptional conditions occur.
    self.tempFiles = []

    # Create a temporary file.
    self.registerFileForCleanUp(support.TESTFN)
    self.text = b'testing urllib.urlretrieve'
    try:
        FILE = open(support.TESTFN, 'wb')
        FILE.write(self.text)
        FILE.close()
    finally:
        try: FILE.close()
        except: pass
def test_copy(self):
    # Test that setting the filename argument works.
    second_temp = "%s.2" % support.TESTFN
    self.registerFileForCleanUp(second_temp)
    result = urllib_request.urlretrieve(self.constructLocalFileUrl(
        support.TESTFN), second_temp)
    self.assertEqual(second_temp, result[0])
    self.assertTrue(os.path.exists(second_temp), "copy of the file was not "
                    "made")
    FILE = open(second_temp, 'rb')
    try:
        text = FILE.read()
        FILE.close()
    finally:
        try: FILE.close()
        except: pass
    self.assertEqual(self.text, text)
def test_short_content_raises_ContentTooShortError(self):
    self.fakehttp(b'''HTTP/1.1 200 OK
Date: Wed, 02 Jan 2008 03:03:54 GMT
Server: Apache/1.3.33 (Debian GNU/Linux) mod_ssl/2.8.22 OpenSSL/0.9.7e
Connection: close
Content-Length: 100
Content-Type: text/html; charset=iso-8859-1

FF
''')

    def _reporthook(par1, par2, par3):
        pass

    with self.assertRaises(urllib_error.ContentTooShortError):
        try:
            urllib_request.urlretrieve('http://example.com/',
                                       reporthook=_reporthook)
        finally:
            self.unfakehttp()
def getfile(url, outdir=None):
    """Function to fetch files using urllib
    Works with ftp
    """
    fn = os.path.split(url)[-1]
    if outdir is not None:
        fn = os.path.join(outdir, fn)
    if not os.path.exists(fn):
        # Find appropriate urlretrieve for Python 2 and 3
        try:
            from urllib.request import urlretrieve
        except ImportError:
            from urllib import urlretrieve
        print("Retrieving: %s" % url)
        # Add progress bar
        urlretrieve(url, fn)
    return fn

# Function to get files using requests
# Works with https authentication
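# A hedged sketch of that requests-based variant; the original body is not
# included in this excerpt, so the name, signature, and `auth` handling below
# are assumptions for illustration only.
def getfile_requests(url, outdir=None, auth=None):
    import requests
    fn = os.path.split(url)[-1]
    if outdir is not None:
        fn = os.path.join(outdir, fn)
    if not os.path.exists(fn):
        print("Retrieving: %s" % url)
        r = requests.get(url, auth=auth, stream=True)
        r.raise_for_status()
        # Stream the response to disk in chunks
        with open(fn, 'wb') as f:
            for chunk in r.iter_content(chunk_size=8192):
                f.write(chunk)
    return fn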
def TestDownload(self):
    try:
        a = datetime.now()
        info('Executing regular download test for network speed')
        url = self.config.cloudConfig.DownloadSpeedTestUrl if 'DownloadSpeedTestUrl' in self.config.cloudConfig else defaultUrl
        debug(url + ' ' + download_path)
        request.urlretrieve(url, download_path)
        request.urlcleanup()
        b = datetime.now()
        c = b - a
        if path.exists(download_path):
            size = path.getsize(download_path) / mb
            self.downloadSpeed = size / c.total_seconds()
            remove(download_path)
            return True
    except socket_error as serr:
        error('TestDownload:' + str(serr))
        Daemon.OnFailure('cloud', serr.errno)
        return False
    except:
        exception('TestDownload Failed')
        return False
async def url_request(self, url, file_path, proxy_url=None):
    try:
        urllib_request.urlretrieve(url, file_path)
        print(file_path)
    except HTTPError as e:
        print(e.code)
        if proxy_url is not None:
            print("Trying proxy URL")
            url = proxy_url
            await self.url_request(url, file_path)
        else:
            raise e
    except UnicodeEncodeError:
        # Special retry logic for IDN domains
        url = "http://" + url.replace("http://", "").encode("idna").decode("utf-8")
        await self.url_request(url, file_path)
def run(self):
    print(self.url)
    for link, t in set(re.findall(r'(http://699pic.com/tupian[^\s]*?(html))', str(reqfun(self.url)))):
        #res = request.urlopen(link)
        #if re.split("\.", os.path.basename(link))[1].lower() == 'gif' or len(res.read()) < 10000:
        #    continue
        #print(link)
        for piclink, t in set(re.findall(r'(http://seopic.699pic.com/photo/[^\s]*?jpg[^\s]*?(jpg))', str(reqfun(link)))):
            print(piclink, "loading...")
            try:
                request.urlretrieve(piclink, self.saveFile(piclink))
                time.sleep(1)
                # Retry once if the file did not land on disk
                if os.path.exists(targetPath + os.path.basename(piclink)) is False:
                    request.urlretrieve(piclink, self.saveFile(piclink))
                    print("err")
            except:
                print('download failed')
    q.task_done()
def download_for_anki(url):
    if not url:
        return ""
    try:
        fileext = os.path.splitext(url)[1]
        filename = hashlib.md5(url.encode()).hexdigest() + fileext
        filepath = os.path.join("collection.media", filename)
        if not os.path.exists("collection.media"):
            os.makedirs("collection.media")
        request.urlretrieve(url, filepath)
        if fileext in ('.jpg', '.jpeg', '.gif', '.png', '.svg'):
            filename = '<img src="{}"></img>'.format(filename)
        elif fileext in ('.mp3', '.mp4', '.wav'):
            filename = '[sound:{}]'.format(filename)
    except Exception as e:
        print("fetch {} error:{}".format(url, e), file=sys.stderr)
        filename = ""
    return filename
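# Usage sketch: the returned string is ready to paste into an Anki note field.
field = download_for_anki('https://example.com/audio/word.mp3')
# -> '[sound:<md5 of the url>.mp3]', or '' if the download failed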
def fetch_load_letters(data_dir=None):
    path = os.path.join(get_data_home(data_dir), 'letter-recognition.data')
    if not os.path.exists(path):
        from urllib import request
        url = 'https://archive.ics.uci.edu/ml/machine-learning-databases/letter-recognition/letter-recognition.data'
        print('Downloading letter-recognition dataset from {}...'.format(url))
        request.urlretrieve(url=url, filename=path)
    else:
        print('Found letter-recognition in {}!'.format(path))
    X, y = [], []
    with open(path) as f:
        reader = csv.reader(f)
        for row in reader:
            y.append(row[0])
            X.append(row[1:])
    labels, label_idx = np.unique(y, return_inverse=True)
    return np.asarray(X, dtype=float), label_idx
def fetch(self, filename=None):
    """ Fetch the remote url to the local file

    Kwargs:
        - filename(string) : Specify path to download files to. Default=None
    """
    if self.url:
        if filename:
            return urlretrieve(self.url, filename)
        else:
            return urlretrieve(self.url, self.file)
    else:
        logger.warning("File %s was not generated on Kotta. Nothing to fetch", self.file)
        return False
def addToInventory(self, octoPart, quantity):
    """
    Adds quantity of Octopart to the inventory. Performs a searchInventory to determine if the part is already in
    the inventory, then either adds the part to the inventory or simply updates the quantity if the part already
    exists.

    :param octoPart: Octopart object to add
    :type octoPart: octo_utils.Octopart
    :param quantity: Quantity of part to add to inventory
    :type quantity: int
    :return: None
    :rtype: None
    """
    index = self.searchInventory(octoPart)
    if index == -1:
        octoPart.quantity = quantity
        self.inventory.append(octoPart)
    else:
        self.inventory[index].quantity += quantity
    self.saveInventory()
    if octoPart.dataFile:
        request.urlretrieve(octoPart.dataURL, octoPart.dataFile)
def json_extract_info(filename):
    """Return a list of entries extracted from a file using json. If an
    error occurs during the extraction, return None.
    """
    try:
        if not isfile(filename):
            filename = urlretrieve(filename)[0]
        with open(filename) as f:
            raw_info, info = json.load(f), []
        for i in raw_info:
            e = DEFAULT_ENTRY.copy()
            # Copy only keys whose values match the template's types
            for k in e:
                if k in i and isinstance(i[k], type(e[k])):
                    e[k] = i[k]
            info.append(e)
    except:
        return None
    else:
        return info
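# Usage sketch: `DEFAULT_ENTRY` is assumed to be a module-level template dict;
# both local paths and URLs are accepted thanks to the urlretrieve fallback.
entries = json_extract_info('https://example.com/feed.json')
if entries is None:
    print('extraction failed')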
def download_and_extract(f, subfolder=''):
    print("downloading %s..." % f)
    local_filename, headers = urlretrieve(f)
    print("decompressing the archive %s" % local_filename)
    if not os.path.isdir(extraction_directory):
        os.mkdir(extraction_directory)
    if f[-3:] == 'zip':
        with zipfile.ZipFile(local_filename) as zf:
            zf.extractall(extraction_directory + subfolder)
    elif f[-2:] == 'gz':
        with tarfile.open(local_filename, "r") as tar:
            tar.extractall(extraction_directory + subfolder)
    else:
        raise ValueError('unrecognized archive type: %s' % f)
    print("done")
def item_Load(itemID, bid, region):
    if bid:
        orderType = 'buy'
    else:
        orderType = 'sell'
    url = "https://crest-tq.eveonline.com/market/" + str(region) + "/orders/" + orderType + "/?type=https://crest-tq.eveonline.com/inventory/types/" + str(itemID) + "/"
    data_file = request.urlopen(url).read().decode('utf-8')
    #request.urlretrieve(url, "D:/result.json")
    #with open('D:/result.json') as data_file:
    data = json.loads(data_file)
    return data
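# Usage sketch: 34 is the type ID for Tritanium and 10000002 is The Forge
# region; note that the CREST endpoint used above has since been retired in
# favour of ESI, so this call will fail against the live servers today.
buy_orders = item_Load(34, True, 10000002)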