def download_song(song_url, song_title):
    """
    Download a song using youtube url and song title.

    The audio is extracted to MP3 (preferred quality '192') and written to
    ``<song_title>.mp3`` relative to the current working directory.

    Args:
        song_url: URL passed to youtube-dl for extraction and download.
        song_title: Base name (without extension) for the output file.

    Returns:
        The info dict returned by ``YoutubeDL.extract_info``.
    """
    # youtube-dl expands '%' sequences in the output template, so a literal
    # percent sign in the title must be doubled to come out unchanged.
    outtmpl = song_title.replace('%', '%%') + '.%(ext)s'
    ydl_opts = {
        'format': 'bestaudio/best',
        'outtmpl': outtmpl,
        'postprocessors': [
            {
                'key': 'FFmpegExtractAudio',
                'preferredcodec': 'mp3',
                'preferredquality': '192',
            },
            {'key': 'FFmpegMetadata'},
        ],
    }
    with youtube_dl.YoutubeDL(ydl_opts) as ydl:
        return ydl.extract_info(song_url, download=True)
# python类YoutubeDL()的实例源码 (example source code for the Python class YoutubeDL())
def __init__(self, requester, item_info):
    """Populate the item from a youtube-dl style info dict.

    Args:
        requester: Stored unchanged on ``self.requester``.
        item_info: Mapping that must contain ``'title'``; ``'webpage_url'``,
            ``'id'``, ``'uploader'``, ``'thumbnail'`` and ``'duration'`` are
            optional and fall back to defaults when missing or empty.
    """
    placeholder_thumbnail = 'https://i.imgur.com/CGPNJDT.png'

    self.requester = requester
    self.item_info = item_info
    info = self.item_info

    self.url = info.get('webpage_url')
    self.video_id = info.get('id') or self.url
    self.uploader = info.get('uploader') or 'Unknown'
    self.title = info['title']
    # A missing key and an empty value both end up on the placeholder image.
    self.thumbnail = info.get('thumbnail') or placeholder_thumbnail
    self.duration = int(info.get('duration') or 0)

    self.downloaded = False
    self.loop = asyncio.get_event_loop()
    self.threads = ThreadPoolExecutor()
    self.ytdl_params = ytdl_params
    self.ytdl = youtube_dl.YoutubeDL(self.ytdl_params)
    self.token = self.tokenize()
    self.location = None
    print(self.thumbnail)
def download_song(song_url, song_title):
    """
    Downloads song from youtube-dl.

    The audio is extracted to MP3 (preferred quality '192') and written to
    ``<song_title>.mp3`` relative to the current working directory.

    Args:
        song_url: URL passed to youtube-dl for extraction and download.
        song_title: Base name (without extension) for the output file.

    Returns:
        The info dict returned by ``YoutubeDL.extract_info``.
    """
    # A literal '%' in the title would be read as a template field by
    # youtube-dl; doubling it keeps the file name equal to the title.
    outtmpl = song_title.replace('%', '%%') + '.%(ext)s'
    ydl_opts = {
        'format': 'bestaudio/best',
        'outtmpl': outtmpl,
        'postprocessors': [
            {
                'key': 'FFmpegExtractAudio',
                'preferredcodec': 'mp3',
                'preferredquality': '192',
            },
            {'key': 'FFmpegMetadata'},
        ],
    }
    with youtube_dl.YoutubeDL(ydl_opts) as ydl:
        return ydl.extract_info(song_url, download=True)
def test_create_basenames_from_ydl_info_dict_playlist(self):
    """Basenames built from the playlist fixture must match the known set."""
    expected_result = {
        'Live Streaming Rafid Aslam-7gjgkH5iPaE',
        'Live Streaming Rafid Aslam-q92kxPm-pqM',
        'Cara Membuat Laptop Menjadi Hotspot WiFi Dengan CMD-YjFwMSDNphM',
        '[CSO] Defeat Boss in Dead End With Thanatos 7-EEm6MwXLse0',
        'Cara Bermain Minecraft Multiplayer Dengan LAN-g2vTZ2ka-tM',
        'Live Streaming Rafid Aslam-AXhuSS5_9YU',
        'Cara Membuat Disk Baru di Komputer-KDOygJnK7Sw',
        'Cara Mendownload Lewat Torrent-cC-9RghkvXs',
    }
    ydl = YoutubeDL()
    result = self.tu.create_basenames_from_ydl_info_dict(
        ydl, info_dict_playlist)
    self.assertEqual(result, expected_result)
def download(title, video_url):
    """Download ``video_url`` as ``<title>.mp3`` and return it opened for reading.

    Args:
        title: Base name (without extension) of the output file.
        video_url: URL passed to youtube-dl.

    Returns:
        dict with ``'audio'`` (the MP3 opened in binary mode; the caller is
        responsible for closing it) and ``'title'``.
    """
    ydl_opts = {
        # '%' is doubled so youtube-dl's template expansion leaves the title
        # untouched and the file really ends up at the '<title>.mp3' path
        # that is opened below.
        'outtmpl': '{}.%(ext)s'.format(title.replace('%', '%%')),
        'format': 'bestaudio/best',
        'postprocessors': [{
            'key': 'FFmpegExtractAudio',
            'preferredcodec': 'mp3',
            'preferredquality': '192',
        }],
    }
    with youtube_dl.YoutubeDL(ydl_opts) as ydl:
        ydl.download([video_url])
    return {
        'audio': open('{}.mp3'.format(title), 'rb'),
        'title': title,
    }
def downloadURL(url, preferredCodec=None, preferredQuality=None):
    """ Downloads song using youtube_dl and the song's youtube
    url.

    Args:
        url: URL passed to youtube-dl.
        preferredCodec: Audio codec for the extraction step; when ``None``
            the codec from ``getCodecAndQuality()`` is used.
        preferredQuality: Audio quality for the extraction step; when
            ``None`` the quality from ``getCodecAndQuality()`` is used.

    Returns:
        The info dict from ``extract_info``, or ``None`` if the download
        failed (a message is printed in that case).
    """
    codec, quality = getCodecAndQuality()
    # Explicit arguments take precedence over the configured defaults;
    # previously they were accepted but silently ignored.
    if preferredCodec is not None:
        codec = preferredCodec
    if preferredQuality is not None:
        quality = preferredQuality

    ydl_opts = {
        'format': 'bestaudio/best',
        'quiet': True,
        'no_warnings': True,
        'postprocessors': [
            {
                'key': 'FFmpegExtractAudio',
                'preferredcodec': codec,
                'preferredquality': quality,
            },
            {'key': 'FFmpegMetadata'},
        ],
    }
    try:
        with YoutubeDL(ydl_opts) as ydl:
            return ydl.extract_info(url, download=True)
    except Exception:
        # Best-effort download: report and carry on, but no longer swallow
        # KeyboardInterrupt / SystemExit the way a bare ``except`` did.
        print("Problem downloading " + url)
        return None
def expect_info_dict(self, got_dict, expected_dict):
    """Assert that ``got_dict`` satisfies ``expected_dict`` and is complete.

    After delegating to ``expect_dict`` this checks the mandatory fields and
    then fails if ``got_dict`` carries checkable fields that the test
    definition (``expected_dict``) does not mention; in that case a
    ready-to-paste ``'info_dict'`` snippet is written to stderr first.
    """
    expect_dict(self, got_dict, expected_dict)

    # Check for the presence of mandatory fields
    if got_dict.get('_type') not in ('playlist', 'multi_video'):
        for field in ('id', 'url', 'title', 'ext'):
            self.assertTrue(got_dict.get(field), 'Missing mandatory field %s' % field)
    # Check for mandatory fields that are automatically set by YoutubeDL
    for field in ['webpage_url', 'extractor', 'extractor_key']:
        self.assertTrue(got_dict.get(field), 'Missing field: %s' % field)

    # Are checkable fields missing from the test case definition?
    checkable_fields = (
        'id', 'title', 'description', 'uploader', 'upload_date',
        'timestamp', 'uploader_id', 'location', 'age_limit')

    def _shorten(value):
        # Strings of 250 characters or more are represented by their md5.
        if isinstance(value, compat_str) and len(value) >= 250:
            return 'md5:' + md5(value)
        return value

    test_info_dict = dict(
        (field, _shorten(value))
        for field, value in got_dict.items()
        if value and field in checkable_fields)
    missing_keys = set(test_info_dict) - set(expected_dict)
    if not missing_keys:
        return

    def _repr(v):
        if isinstance(v, compat_str):
            return "'%s'" % v.replace('\\', '\\\\').replace("'", "\\'").replace('\n', '\\n')
        return repr(v)

    snippet_parts = []
    if len(missing_keys) != len(expected_dict):
        # Fields the test definition already covers come first ...
        known = ''.join(
            ' %s: %s,\n' % (_repr(k), _repr(v))
            for k, v in test_info_dict.items() if k not in missing_keys)
        if known:
            snippet_parts.append(known + '\n')
    # ... followed by the fields that still have to be added.
    snippet_parts.append(''.join(
        ' %s: %s,\n' % (_repr(k), _repr(test_info_dict[k]))
        for k in missing_keys))
    info_dict_str = ''.join(snippet_parts)

    write_string(
        '\n\'info_dict\': {\n' + info_dict_str + '},\n', out=sys.stderr)
    self.assertFalse(
        missing_keys,
        'Missing keys in test definition: %s' % (
            ', '.join(sorted(missing_keys))))
def test_postprocessors(self):
    """Check which files remain on disk after ``YoutubeDL.post_process``.

    Three runs are made against a throw-away input file: ``SimplePP`` with
    ``keepvideo`` on (input and audio file must both exist), ``SimplePP``
    with ``keepvideo`` off (only the audio file must exist), and
    ``ModifierPP`` with ``keepvideo`` off (the input file must still exist).
    Every file created is removed again before the next run.
    """
    filename = 'post-processor-testfile.mp4'
    audiofile = filename + '.mp3'

    # Writes ``audiofile`` and returns the input path in the first element
    # of its result -- presumably the list of files youtube-dl may delete
    # afterwards (TODO confirm against the PostProcessor contract).
    class SimplePP(PostProcessor):
        def run(self, info):
            with open(audiofile, 'wt') as f:
                f.write('EXAMPLE')
            return [info['filepath']], info

    # (Re)creates the input file, then runs one instance of ``PP`` over it
    # through a YoutubeDL configured with ``params``.
    def run_pp(params, PP):
        with open(filename, 'wt') as f:
            f.write('EXAMPLE')
        ydl = YoutubeDL(params)
        ydl.add_post_processor(PP())
        ydl.post_process(filename, {'filepath': filename})

    # keepvideo=True: both the input and the produced audio file must exist.
    run_pp({'keepvideo': True}, SimplePP)
    self.assertTrue(os.path.exists(filename), '%s doesn\'t exist' % filename)
    self.assertTrue(os.path.exists(audiofile), '%s doesn\'t exist' % audiofile)
    os.unlink(filename)
    os.unlink(audiofile)

    # keepvideo=False: the input must be gone, the audio file must exist.
    run_pp({'keepvideo': False}, SimplePP)
    self.assertFalse(os.path.exists(filename), '%s exists' % filename)
    self.assertTrue(os.path.exists(audiofile), '%s doesn\'t exist' % audiofile)
    os.unlink(audiofile)

    # Overwrites the input file in place and returns an empty first element.
    class ModifierPP(PostProcessor):
        def run(self, info):
            with open(info['filepath'], 'wt') as f:
                f.write('MODIFIED')
            return [], info

    # Even with keepvideo=False the input file must still exist here.
    run_pp({'keepvideo': False}, ModifierPP)
    self.assertTrue(os.path.exists(filename), '%s doesn\'t exist' % filename)
    os.unlink(filename)
def _download_restricted(url, filename, age):
    """Return True if youtube-dl produced the info JSON for ``url``.

    The media itself is skipped (``skip_download``); what is checked is
    whether ``<filename without extension>.info.json`` exists after running
    with ``age_limit`` set to ``age``. The JSON file is removed before and
    after the run.
    """
    ydl = YoutubeDL({
        'age_limit': age,
        'skip_download': True,
        'writeinfojson': True,
        'outtmpl': '%(id)s.%(ext)s',
    })
    ydl.add_default_info_extractors()

    stem = os.path.splitext(filename)[0]
    info_json = stem + '.info.json'

    try_rm(info_json)  # make sure a stale file cannot fake a success
    ydl.download([url])
    was_written = os.path.exists(info_json)
    try_rm(info_json)
    return was_written
def __init__(self, url, progress_hook):
    """Fetch the metadata for ``url`` without downloading it.

    ``progress_hook`` is called twice with a status dict: once with
    ``'getting_information'`` before the lookup, and once with
    ``'information_downloaded'`` plus the detected ``'type'``
    (``'playlist'`` or ``'video'``) afterwards.
    """
    self._url = url
    self._progress_hook = progress_hook
    self._INFO_OPTS = {'logger': YoutubeDLLogger(), 'socket_timeout': 10}

    extractor = ytdl.YoutubeDL(self._INFO_OPTS)

    self._progress_hook({'status': 'getting_information'})
    self._info = extractor.extract_info(self._url, download=False)

    if self.is_playlist():
        kind = 'playlist'
    else:
        kind = 'video'
    self._progress_hook({'status': 'information_downloaded', 'type': kind})
def descargarPorLink(self):
    """Download ``self.link`` into ``~/Descargas/GUIYoutube`` and notify the user.

    The download uses ``self.format`` and converts the result to
    ``self.preferedformat``. The process temporarily changes into the
    target directory; the previous working directory is restored even if
    the download raises. After a successful download a desktop notification
    is sent through ``notify-send``.
    """
    ydl_opts = {
        'format': self.format,
        'postprocessors': [{
            'key': 'FFmpegVideoConvertor',
            'preferedformat': self.preferedformat
        }]
    }
    with YT(ydl_opts) as yt:
        # expanduser still resolves the home directory when $HOME is unset.
        destino = os.path.join(os.path.expanduser("~"), "Descargas", "GUIYoutube")
        # exdir is the current directory, remembered so we can return to it
        # once the download has finished.
        exdir = os.getcwd()
        if not os.path.exists(destino):
            os.makedirs(destino)
        os.chdir(destino)
        try:
            yt.download([self.link])
        finally:
            # Always go back, otherwise a failed download would leave the
            # whole application running inside the download folder.
            os.chdir(exdir)
    subprocess.Popen(["notify-send", "-t", "4000", "Descarga finalizada (revise su carpeta de descargas)"])
def descargar(self):
    """Trigger the quality selection and pass on the selected link.

    Calls ``seleccionDeCalidad()`` on the widget five ``parent()`` levels
    above this one, then stores ``self.descarga[0]`` in that widget's
    ``link`` attribute.
    """
    def _ancestor():
        # Walk five parent() hops up from this widget.
        widget = self
        for _ in range(5):
            widget = widget.parent()
        return widget

    _ancestor().seleccionDeCalidad()
    _ancestor().link = self.descarga[0]
    # The original kept a disabled inline download routine here as a bare
    # string literal; it never executed and has been left out.
def get_info(self):
    """Resolve ``self.url`` and store the result as ``self.song``.

    The youtube-dl instance is created on first use. When the URL contains
    the ``[SEARCH:]`` marker, the marker-length prefix is cut off, the
    remainder is looked up, and the URL is replaced by the watch URL of the
    first entry. The unprocessed info dict of the final URL is then
    expanded into ``Song``.
    """
    search_marker = "[SEARCH:]"

    if self._yt is None:
        self._yt = youtube_dl.YoutubeDL(youtube_dl_options)

    if search_marker in self.url:
        self.url = self.url[len(search_marker):]
        first_hit = self._yt.extract_info(self.url, download=False)["entries"][0]
        # Should handle errors here ^
        self.url = "https://youtube.com/watch?v={}".format(first_hit["id"])

    video = self._yt.extract_info(self.url, download=False, process=False)
    self.song = Song(**video)
def run(self):
    """Download every URL in ``self.song_urls`` until done or aborted.

    After each URL ``self.downloaded`` grows by 100 and
    ``parent.download_update`` is scheduled through ``wx.CallAfter``. URLs
    whose download raises ``DownloadError`` are collected and handed to
    ``parent.download_complete`` once the list is exhausted. If
    ``is_aborted()`` turns true first, the loop ends without that call.
    """
    exhausted = object()  # marks the end of the URL iterator
    pending = iter(self.song_urls)
    errors = []
    with youtube_dl.YoutubeDL(self.opts) as yt:
        while not self.is_aborted():
            song_url = next(pending, exhausted)
            if song_url is exhausted:
                wx.CallAfter(self.parent.download_complete, errors=errors)
                break

            logger.info("Downloading audio/video from {url}".format(url=song_url))
            try:
                yt.download([song_url])
            except youtube_dl.DownloadError:
                errors.append(song_url)
            self.downloaded += 100
            wx.CallAfter(self.parent.download_update, message=self.downloaded)
def yt_extract(links):
    """Expand playlist links into their individual entry URLs.

    Links whose info has no ``'entries'`` are passed through unchanged. If
    youtube-dl raises ``DownloadError`` for a link, the URLs collected so
    far are returned immediately and the remaining links are not visited.

    Args:
        links (list[str]): A list of the URLs.
    Returns:
        list[str]: The list of extracted URLs.
    """
    songs = []
    with youtube_dl.YoutubeDL() as yt_dl:
        for link in links:
            try:
                info = yt_dl.extract_info(link, download=False, process=False)
            except youtube_dl.DownloadError:
                return songs

            if 'entries' not in info:
                songs.append(link)
                continue
            for entry in info['entries']:
                songs.append(entry['url'])
    return songs
def get_videos(url):
    """Download all videos in the course.

    Login data, subtitle handling, the download archive and the external
    downloader come from the module-level settings. Failures are reported
    on stdout and not re-raised.

    Args:
        url: Course URL passed to youtube-dl.
    """
    # Lynda.com login and video filename options
    options = {
        'username': USERNAME,
        'password': PASSWORD,
        'outtmpl': u'%(playlist_index)s-%(title)s.%(ext)s',
        'writesubtitles': SUBTITLES,
        'allsubtitles': SUBTITLES,
        'download_archive': ARCHIVE,
        'external_downloader': EXTERNAL_DL
    }
    try:
        with youtube_dl.YoutubeDL(options) as ydl:
            ydl.download([url])
    except Exception:
        # Still best-effort, but Ctrl-C / SystemExit now propagate instead
        # of being reported as a failed download.
        print('Could not download the video in course: {}'.format(url))
def download(self, command, id):
    """Run a download command, then build a GIF preview and publish it.

    Steps: run ``command`` through the shell, cut 20 seconds (starting at
    second 20) of ``<id>.mp4`` into a 320x240 ``<id>.gif`` with ffmpeg, and
    copy the GIF to ``/home/www/public/media``. Exit codes of the external
    programs are not checked.

    Args:
        command: Complete shell command line that produces ``<id>.mp4``.
        id: Identifier used as the base name of the video and GIF files.

    Raises:
        OSError: If ``ffmpeg`` or ``cp`` cannot be started.
    """
    # `command` arrives as a ready-made command line, so it still needs the
    # shell; only pass trusted input here.
    subprocess.call(command, shell=True)
    print('download video successfully')

    video_name = str(id) + '.mp4'
    gif_name = str(id) + '.gif'

    # Argument lists bypass the shell, so spaces or metacharacters in `id`
    # can no longer split or alter the command.
    subprocess.call([
        'ffmpeg', '-ss', '20', '-t', '20', '-i', video_name,
        '-s', '320x240', '-f', 'gif', gif_name,
    ])
    print('generate gif successfully')

    subprocess.call(['cp', gif_name, '/home/www/public/media'])
    print('copy gif to media successfully')
def youtube_download(self, url, id):
    """Download ``url`` as ``<id>.mp4``, publish a GIF preview, drop the video.

    Steps: fetch the video with the ``youtube-dl`` command line tool, cut
    20 seconds (starting at second 20) into a 320x240 ``<id>.gif`` with
    ffmpeg, copy the GIF to ``/home/www/public/media`` and delete the MP4.
    Exit codes of the external programs are not checked.

    Args:
        url: Video URL handed to youtube-dl.
        id: Identifier used as the base name of the video and GIF files.

    Raises:
        OSError: If one of the external programs cannot be started.
    """
    video_name = str(id) + '.mp4'
    gif_name = str(id) + '.gif'

    # Everything is passed as an argument list (no shell): URLs routinely
    # contain '&' and the format selector contains '[...]', both of which a
    # shell would interpret instead of handing them to youtube-dl.
    subprocess.call([
        'youtube-dl',
        '-f', 'bestvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]/best',
        '-o', video_name,
        url,
    ])
    print('download video successfully')

    subprocess.call([
        'ffmpeg', '-ss', '20', '-t', '20', '-i', video_name,
        '-s', '320x240', '-f', 'gif', gif_name,
    ])
    print('generate gif successfully')

    subprocess.call(['cp', gif_name, '/home/www/public/media'])
    print('copy gif to media successfully')

    subprocess.call(['rm', video_name])
    print('rm mp4 successfully')
def videoDownloader(string, passedTitle, yearVar):
    """Download a trailer and set up its folder.

    The video at ``string`` is saved as ``<title> (<year>).mp4`` inside a
    folder of the same name, duplicated as ``<title> (<year>)-trailer.mp4``,
    the stock ``poster.jpg`` is copied next to it, and ``updatePlex()`` is
    called.
    """
    movie_name = '{0} ({1})'.format(passedTitle, yearVar)
    video_file = movie_name + '.mp4'
    trailer_file = movie_name + '-trailer.mp4'
    movie_dir = os.path.join(trailerLocation, movie_name)

    # Options for the video downloader
    ydl1_opts = {
        'outtmpl': os.path.join(TheaterTrailersHome, 'Trailers', movie_name, video_file),
        'ignoreerrors': True,
        'format': 'mp4',
    }
    with youtube_dl.YoutubeDL(ydl1_opts) as ydl:
        logger.info("downloading {0} from {1}".format(passedTitle, string))
        ydl.download([string])
        shutil.copy2(
            os.path.join(movie_dir, video_file),
            os.path.join(movie_dir, trailer_file)
        )
        shutil.copy2(
            os.path.join(TheaterTrailersHome, 'res', 'poster.jpg'),
            movie_dir
        )
        updatePlex()
# Downloads info for the videos from the playlist
def get_info(self):
    """Look up ``self.url`` and build ``self.song`` from the result.

    A youtube-dl instance is created lazily. URLs containing ``[SEARCH:]``
    lose a marker-length prefix, are looked up, and are rewritten to the
    watch URL of the first entry before the final, unprocessed lookup whose
    info dict is expanded into ``Song``.
    """
    marker = "[SEARCH:]"

    if self._yt is None:
        self._yt = youtube_dl.YoutubeDL(youtube_dl_options)

    if marker in self.url:
        self.url = self.url[len(marker):]
        results = self._yt.extract_info(self.url, download=False)
        yt_id = results["entries"][0]["id"]
        # Should handle errors here ^
        self.url = "https://youtube.com/watch?v={}".format(yt_id)

    video = self._yt.extract_info(self.url, download=False, process=False)
    self.song = Song(**video)
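# test_age_restriction.py 文件源码 (file source)
# 项目 (project): get-youtube-subtitle-url-node
# 作者 (author): joegesualdo
# 项目源码 (project source)
# 文件源码 (file source)
# 阅读 (views): 18
# 收藏 (favorites): 0
# 点赞 (likes): 0
# 评论 (comments): 0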
def _download_restricted(url, filename, age):
    """Return True if an info JSON was written for ``url`` at age limit ``age``.

    Runs youtube-dl with the media download skipped and checks for
    ``<filename without extension>.info.json``. The file is deleted both
    before the run and after the check.
    """
    options = dict(
        age_limit=age,
        skip_download=True,
        writeinfojson=True,
        outtmpl='%(id)s.%(ext)s',
    )
    ydl = YoutubeDL(options)
    ydl.add_default_info_extractors()

    root, _ext = os.path.splitext(filename)
    json_path = root + '.info.json'

    try_rm(json_path)
    ydl.download([url])
    found = os.path.exists(json_path)
    try_rm(json_path)
    return found
def get_info(self):
    """Fill ``self.song`` with the info for ``self.url``.

    Creates the youtube-dl instance on first use. If ``self.url`` contains
    ``[SEARCH:]``, a marker-length prefix is removed, the rest is looked
    up, and the URL becomes the watch URL of the first entry. The final URL
    is then fetched unprocessed and expanded into ``Song``.
    """
    prefix = "[SEARCH:]"

    if self._yt is None:
        self._yt = youtube_dl.YoutubeDL(youtube_dl_options)

    is_search = prefix in self.url
    if is_search:
        self.url = self.url[len(prefix):]
        lookup = self._yt.extract_info(self.url, download=False)
        # Should handle errors here ^
        self.url = "https://youtube.com/watch?v={}".format(
            lookup["entries"][0]["id"])

    video = self._yt.extract_info(self.url, download=False, process=False)
    self.song = Song(**video)
def _request(self):
    """Resolve ``self._baseurl`` to stream URLs and report via ``_onResult``.

    On success ``_onResult(True, url, fmt, suburi)`` is called, where
    ``url`` is the first requested format with a video codec (falling back
    to the entry's own URL) and ``suburi`` the first remaining one with an
    audio codec. Any exception is logged and reported as
    ``_onResult(False, None, -1)``.
    """
    if "live" in self._baseurl.lower():
        ie_key = "YoutubeLive"
    else:
        ie_key = "Youtube"
    try:
        self._setupFormatMap()
        with YoutubeDL(self._params) as ytdl:
            result = ytdl.extract_info(self._baseurl, ie_key=ie_key, download=False, process=True)
            # Either a playlist / list of videos (first one is used) or a
            # single video.
            if self.KEY_ENTRIES in result:
                entry = result[self.KEY_ENTRIES][0]  # TODO handle properly
            else:
                entry = result
            fmt = entry.get(self.KEY_FORMAT_ID)
            url, suburi = "", ""
            for candidate in entry.get(self.KEY_REQUESTED_FORMATS, []):
                if not url and candidate.get(self.KEY_VCODEC, u"none") != u"none":
                    url = str(candidate.get(self.KEY_URL, ""))
                elif not suburi and candidate.get(self.KEY_ACODEC, u"none") != u"none":
                    suburi = str(candidate.get(self.KEY_URL, ""))
            if not url:
                url = str(entry.get(self.KEY_URL, ""))
        self._onResult(True, url, fmt, suburi)
    except Exception as e:
        Log.w(e)
        self._onResult(False, None, -1)
def getCaptions(url, progress_cb, so_far, task_weight):
    """Fetch and parse the English WebVTT captions of a video.

    Args:
        url: Video URL passed to youtube-dl.
        progress_cb: Callable invoked as ``progress_cb(done, done)`` with
            ``done = so_far + task_weight`` once captions have been parsed.
        so_far: Progress accumulated before this task.
        task_weight: Progress contributed by this task.

    Returns:
        The ``'en-US'`` captions from the parsed file, or an empty list
        when the video has no English subtitle track.
    """
    ydl = youtube_dl.YoutubeDL({'writesubtitles': True, 'allsubtitles': True, 'writeautomaticsub': True})
    with ydl:
        res = ydl.extract_info(url, download=False)

    # Use .get() throughout: videos that only have other languages (or no
    # subtitles at all) used to raise KeyError on the 'en' lookup.
    english = (res.get('requested_subtitles') or {}).get('en')
    if not english:
        # This message sat behind a `return` before and was never printed.
        print('Youtube Video does not have any english captions')
        return []

    print('Grabbing vtt file from ' + english['url'])
    response = requests.get(english['url'])
    # WebVTT files are UTF-8; decoding as ASCII failed on any accented
    # character or typographic quote in the captions.
    arr = WebVTTReader().read(response.content.decode('utf-8'))
    progress_cb(so_far + task_weight, so_far + task_weight)
    return arr.get_captions('en-US')
def download(self):
    """Download ``self.video`` from YouTube using the configured quality.

    The format selector prefers ``<quality>[ext=<format>]+bestaudio`` and
    falls back to ``<quality>+bestaudio``; it is stored in
    ``self.options['format']`` before the download starts.
    """
    config = self.configuration.get_config()
    quality = config['quality']
    container = config['format']
    self.options['format'] = "".join(
        [quality, "[ext=", container, "]+bestaudio/", quality, "+bestaudio"])

    watch_url = "http://www.youtube.com/watch?v=" + self.video
    with youtube_dl.YoutubeDL(self.options) as ydl:
        ydl.download([watch_url])
async def youtube(self, *, search : str):
    '''Find a Youtube video'''
    # NOTE(review): docstring left untouched -- presumably surfaced as the
    # command's help text by the bot framework; confirm before rewording.
    # Declared ``async`` because the body awaits; as a plain ``def`` it does
    # not compile.
    ydl = youtube_dl.YoutubeDL({"default_search": "auto", "noplaylist": True, "quiet": True})
    # extract_info blocks on network I/O, so it runs in the loop's default
    # executor instead of on the event loop itself.
    func = functools.partial(ydl.extract_info, search, download = False)
    info = await self.bot.loop.run_in_executor(None, func)
    if "entries" in info:
        # Search results come back as a list of entries; use the first hit.
        info = info["entries"][0]
    await self.bot.reply(info.get("webpage_url"))
async def _get_song_info(self, song):
    """Return the youtube-dl info dict for ``song`` without downloading it.

    When the lookup yields a list of entries, the first entry is returned.
    The blocking lookup runs in the event loop's default executor.
    """
    # Declared ``async`` because the body awaits; as a plain ``def`` it does
    # not compile.
    ydl = youtube_dl.YoutubeDL(self.ytdl_options)
    func = functools.partial(ydl.extract_info, song, download = False)
    info = await self.bot.loop.run_in_executor(None, func)
    if "entries" in info:
        info = info["entries"][0]
    logging.getLogger("discord").info("playing URL {}".format(song))
    return info
async def _download_song(self, song):
    """Download ``song`` and return the file name youtube-dl derives for it.

    The blocking download runs in the event loop's default executor.
    """
    # Declared ``async`` because the body awaits; as a plain ``def`` it does
    # not compile.
    ydl = youtube_dl.YoutubeDL(self.ytdl_download_options)
    func = functools.partial(ydl.extract_info, song, download = True)
    info = await self.bot.loop.run_in_executor(None, func)
    return ydl.prepare_filename(info)
def __init__(self, download_folder=None):
    """Create the worker pool and two youtube-dl instances.

    ``unsafe_ytdl`` uses ``ytdl_format_options`` as given; ``safe_ytdl``
    additionally sets ``ignoreerrors``. When ``download_folder`` is given,
    the output template of both instances is prefixed with it.
    """
    self.thread_pool = ThreadPoolExecutor(max_workers=2)
    self.unsafe_ytdl = youtube_dl.YoutubeDL(ytdl_format_options)
    self.safe_ytdl = youtube_dl.YoutubeDL(ytdl_format_options)
    self.safe_ytdl.params['ignoreerrors'] = True
    self.download_folder = download_folder

    if not download_folder:
        return

    for ytdl in (self.unsafe_ytdl, self.safe_ytdl):
        template = ytdl.params['outtmpl']
        ytdl.params['outtmpl'] = os.path.join(download_folder, template)
def download_next_song(self, song):
"""Downloads the next song and starts playing it"""
dl_ydl_opts = dict(ydl_opts)
dl_ydl_opts["progress_hooks"] = [self.ytdl_progress_hook]
dl_ydl_opts["outtmpl"] = self.output_format
# Move the songs from the next cache to the current cache
self.move_next_cache()
self.state = 'ready'
self.play_empty()
# Download the file and create the stream
with youtube_dl.YoutubeDL(dl_ydl_opts) as ydl:
try:
ydl.download([song])
except DownloadStreamException:
# This is a livestream, use the appropriate player
future = asyncio.run_coroutine_threadsafe(self.create_stream_player(song, dl_ydl_opts), client.loop)
try:
future.result()
except Exception as e:
logger.exception(e)
self.vafter_ts()
return
except PermissionError:
# File is still in use, it'll get cleared next time
pass
except youtube_dl.utils.DownloadError as e:
self.logger.exception(e)
self.statuslog.error(e)
self.vafter_ts()
return
except Exception as e:
self.logger.exception(e)
self.vafter_ts()
return