def play_music(p, l):
    """Build an M3U playlist of every music file and open it.

    The playlist is written to ``playlist.m3u`` under ``BASE_DIRECTORY``,
    one path per line, and then handed to ``os.startfile``. The ``p`` and
    ``l`` arguments are accepted but not used here.
    """
    respond("playing music", prepend_positive_response=True)
    playlist_path = os.path.join(BASE_DIRECTORY, 'playlist.m3u')
    music_extensions = [".mp3", ".m4a", ".ogg", ".flac"]
    tracks = _get_all_files(preferences.get_music_dir(), music_extensions)
    with open(playlist_path, 'w') as playlist_file:
        playlist_file.writelines(track + "\n" for track in tracks)
    os.startfile(playlist_path)
# Example source code using Python's open()
def execute(self, context):
    """Open Twitter's tweet-intent page pre-filled from this object's fields.

    Reads ``self.link``, ``self.text``, ``self.hashtags`` and ``self.via``;
    fields that are empty strings are left out of the query. ``context`` is
    not used.

    Returns:
        ``{"FINISHED"}`` after the URL has been passed to ``webbrowser.open``.
    """
    from urllib.parse import urlencode

    fields = (
        ("url", self.link),
        ("text", self.text),
        ("hashtags", self.hashtags),
        ("via", self.via),
    )
    params = [(key, value) for key, value in fields if value != ""]
    # urlencode escapes characters such as '#', '&' and non-ASCII text that
    # would otherwise truncate or corrupt the query, and turns spaces into '+'.
    # ':', '/' and ',' stay readable, giving e.g.
    # ...tweet?url=https://example.com&text=Hello+World&hashtags=a,b&via=name
    url = "https://twitter.com/intent/tweet?" + urlencode(params, safe=":/,")
    webbrowser.open(url)
    return {"FINISHED"}
def show_about(self):
    """Create the About dialog and handle clicks on the links it reports."""

    def events_callback(instance_label, text_link):
        """Callback given to ``AboutDialog``; dispatches on ``text_link``."""

        def answer_callback(answer):
            """Open ``answer`` in the browser when it is a known redirect."""
            redirect_targets = [core.string_lang_click_dimonvideo_redirect,
                                core.string_lang_click_ptyhon_redirect,
                                core.string_lang_click_kivy_redirect]
            if answer in redirect_targets:
                # The scheme separator must be "//"; the previous raw string
                # r"http:\\" produced two backslashes, i.e. an invalid URL.
                webbrowser.open(answer.replace("www.", "http://"))

        if text_link in ["HeaTTheatR", "Virtuos86", "dimy44"]:
            # These names are routed to the mail/profile handler, deferred
            # by 0.1 through Clock.
            Clock.schedule_once(
                lambda *args: self.send_mail_or_show_profile(text_link),
                0.1)
        else:
            about_entry = core.text_for_about[text_link]
            self.create_window(
                callback=answer_callback, param="query",
                text=about_entry["text"],
                button_ok=about_entry["ok"],
                button_cancel=about_entry["cancel"])

    AboutDialog(events_callback=events_callback,
                background_image=core.theme_decorator_window,
                name_program=core.string_lang_about_name_program,
                logo_program="Data/Images/logo.png",
                info_program=core.info_program)
def synopsis(filename, cache={}):
    """Get the one-line summary out of a module file.

    Results are memoised in ``cache`` under ``filename`` together with the
    file's mtime and are recomputed only when the file is newer. The shared
    default dict is deliberate: it is what makes the cache persist between
    calls.

    Returns None when the file cannot be opened, when a binary module fails
    to import, or when a binary module has an empty docstring.
    """
    mtime = os.stat(filename).st_mtime
    lastupdate, result = cache.get(filename, (0, None))
    if lastupdate < mtime:
        info = inspect.getmoduleinfo(filename)
        try:
            file = open(filename)
        except IOError:
            # module can't be opened, so skip it
            return None
        try:
            if info and 'b' in info[2]:  # binary modules have to be imported
                try:
                    module = imp.load_module('__temp__', file, filename, info[1:])
                except:
                    # Deliberately broad: importing an arbitrary module may
                    # raise anything (including SystemExit); just skip it.
                    return None
                # ''.splitlines() is [], so guard before taking line 0.
                doc_lines = (module.__doc__ or '').splitlines()
                result = doc_lines[0] if doc_lines else None
                del sys.modules['__temp__']
            else:  # text modules can be directly examined
                result = source_synopsis(file)
        finally:
            # Close on every path; the binary branch used to leak the handle.
            file.close()
        cache[filename] = (mtime, result)
    return result
def importfile(path):
    """Import a Python source file or compiled file given its path.

    The file is treated as compiled when it starts with the interpreter's
    magic number, otherwise as source. Any failure while loading is
    re-raised as ``ErrorDuringImport`` carrying the path and exception info.
    """
    magic = imp.get_magic()
    # Sniff the header in binary mode so newline translation cannot alter
    # the magic bytes; the handle is closed as soon as the check is done.
    with open(path, 'rb') as probe:
        if probe.read(len(magic)) == magic:
            kind = imp.PY_COMPILED
        else:
            kind = imp.PY_SOURCE
    filename = os.path.basename(path)
    name, ext = os.path.splitext(filename)
    # The with-block closes the file even when the import fails.
    with open(path, 'r') as file:
        try:
            module = imp.load_module(name, file, path, (ext, 'r', kind))
        except:
            raise ErrorDuringImport(path, sys.exc_info())
    return module
def reformat(self, sourcefile, destfile, configfile):
    # type: (str, str, str) -> None
    """Format ``sourcefile`` into ``destfile`` using options from ``configfile``.

    Only config lines that start with ``--`` are used. ``--name=value``
    sets ``name`` to ``value``; when there is no ``=`` (or it is the first
    character) the whole remainder is set with ``OPTION_PRESENT``. If
    formatting yields None, an empty file is written.
    """
    formatstyle = style_make()
    with open(configfile) as fp:
        for rawline in fp:
            text = rawline.rstrip()
            if not text.startswith('--'):
                continue
            remainder = text[2:]
            name, sep, rest = remainder.partition('=')
            if sep and name:
                set_option(formatstyle, name, rest)
            else:
                set_option(formatstyle, remainder, OPTION_PRESENT)
    sourcedata = readbinary(sourcefile)
    formatted = self.formatcode(formatstyle, sourcedata, filename=sourcefile)
    writebinary(destfile, b'' if formatted is None else formatted)
# ----------------------------------------------------------------------
def reformat(self, sourcefile, destfile, configfile):
    # type: (str, str, str) -> None
    """Format ``sourcefile`` into ``destfile`` using options from ``configfile``.

    Config lines starting with ``#`` are skipped. A line is applied only
    when splitting it on ``=`` gives exactly two parts (name and value).
    If formatting yields None, an empty file is written.
    """
    formatstyle = style_make()
    with open(configfile) as fp:
        for rawline in fp:
            entry = rawline.rstrip()
            if entry.startswith('#'):
                continue
            fields = entry.split('=')
            if len(fields) != 2:
                continue
            set_option(formatstyle, fields[0], fields[1])
    sourcedata = readbinary(sourcefile)
    formatted = self.formatcode(formatstyle, sourcedata, filename=sourcefile)
    writebinary(destfile, b'' if formatted is None else formatted)
# ----------------------------------------------------------------------
def reformat(self, sourcefile, destfile, configfile):
    # type: (str, str, str) -> None
    """Format ``sourcefile`` into ``destfile`` using options from ``configfile``.

    Config lines starting with ``#`` are skipped. A line is applied only
    when splitting it on an ``=`` surrounded by whitespace gives exactly
    two parts (name and value). If formatting yields None, an empty file
    is written.
    """
    formatstyle = style_make()
    with open(configfile) as fp:
        for rawline in fp:
            entry = rawline.rstrip()
            if entry.startswith('#'):
                continue
            pieces = re.split(r'\s+=\s+', entry)
            if len(pieces) != 2:
                continue
            set_option(formatstyle, pieces[0], pieces[1])
    sourcedata = readbinary(sourcefile)
    formatted = self.formatcode(formatstyle, sourcedata, filename=sourcefile)
    writebinary(destfile, b'' if formatted is None else formatted)
# ----------------------------------------------------------------------
# Functions for the in-memory cache
def read_train_files(indir, separator=" "):
    """Gather local features and ground truths from every train file.

    Every file in ``indir`` whose name ends with ``csv`` is read line by
    line. Each line is split on ``separator``: all fields but the last are
    converted to floats (the features) and the last field is kept as the
    ground-truth label.

    Returns:
        ``(features, groundtruths)``: two parallel lists with one entry
        per row read.
    """
    utils.print_success("Reading multiple train files")
    indir = utils.abs_path_dir(indir) + "/"
    groundtruths = []
    features = []
    filenames = [fn for fn in os.listdir(indir) if fn.endswith("csv")]
    total = len(filenames)
    for index, filename in enumerate(filenames):
        print(str(index + 1) + "/" + str(total) + " " + filename)
        sys.stdout.write("\033[F")  # Cursor up one line
        sys.stdout.write("\033[K")  # Clear line
        with open(indir + filename, "r") as filep:
            for row in filep:
                # Strip only the line terminator. Blindly slicing off the
                # last character ate part of the label on a final row
                # without a trailing newline.
                line = row.rstrip("\r\n").split(separator)
                features.append([float(i) for i in line[:-1]])
                groundtruths.append(line[-1])
    sys.stdout.write("\033[K")  # Clear line
    return features, groundtruths
def read_test_file(filename):
    """
    Read ONE test file with content like:
    feat1 feat2 ... featN
    feat1 feat2 ... featN
    ...
    feat1 feat2 ... featN

    Returns a list with one list of floats per line.
    """
    features = []
    filename = utils.abs_path_file(filename)
    with open(filename, "r") as filep:
        for line in filep:
            # Remove only the line terminator. Dropping the last character
            # unconditionally truncated the final value when the file did
            # not end with a newline.
            fields = line.rstrip("\r\n").split(" ")
            features.append([float(field) for field in fields])
    return features
def test_models(models_dir, test_dir, out_dir):
    """Run every model in ``models_dir`` on every file in ``test_dir``.

    For each model directory ``<model>``, the classifier is loaded from
    ``<model>/<model>.pkl``. For each test file the first column of
    ``predict_proba`` is written, one value per line, to
    ``out_dir/<model>/<test file name>``.
    """
    models_dir = utils.abs_path_dir(models_dir) + "/"
    test_dir = utils.abs_path_dir(test_dir) + "/"
    utils.create_dir(out_dir)
    test_files = os.listdir(test_dir)
    models = os.listdir(models_dir)
    for model in models:
        utils.print_success(model)
        # Join instead of concatenating so the prediction directory lands
        # inside out_dir whether or not out_dir ends with a separator.
        pred_dir = os.path.join(out_dir, model, "")
        utils.create_dir(pred_dir)
        clf = joblib.load(models_dir + model + "/" + model + ".pkl")
        for index, test_file in enumerate(test_files):
            print(str(index) + "\t" + test_file)
            sys.stdout.write("\033[F")  # Cursor up one line
            sys.stdout.write("\033[K")  # Clear line
            test_features = read_test_file(test_dir + test_file)
            predictions = clf.predict_proba(test_features)
            with open(pred_dir + test_file, "w") as filep:
                filep.writelines(str(pred[0]) + "\n" for pred in predictions)
        sys.stdout.write("\033[K")  # Clear line
def handle_ref_error(self):
    """Check the reference file at ``self.ref`` and report any problem.

    The file must be non-empty, its first line must start with ``>`` and
    its second line may only contain the letters A, T, G, C and N
    (case-insensitive).

    Returns:
        The result of the message box when a problem is reported,
        otherwise None.
    """
    def report(title, message):
        return QtGui.QMessageBox.question(self, title, message,
                                          QtGui.QMessageBox.Ok)

    try:
        if os.stat(self.ref).st_size <= 0:
            return report('Warning !',
                          'The selected reference file is empty, please check !')
        with open(self.ref) as f:
            # next(f, None) works on Python 2 and 3 (f.next() is Python 2
            # only) and yields None instead of raising when a line is
            # missing.
            header = next(f, None)
            sequence = next(f, None)
    except Exception:
        # Best effort: any failure to locate or read the file is shown to
        # the user rather than crashing the GUI.
        return report('Error !', 'Please input a reference file !')

    if header is None or sequence is None:
        return report('Error !', 'Please check your input reference !')
    if not header.strip().startswith('>'):
        return report('Error !', 'Please check your input reference !')
    if re.search("[^ATGCN]", sequence.strip().upper()):
        return report('Error !', 'Please check your input reference !')
    return None
def start(self, callback=None):
    """Start the server, open ``index.html`` in a browser and run the loop.

    ``self.server`` is first run to completion on ``self.loop``; the
    ``index.html`` located next to this module is then opened and the loop
    runs until interrupted with Ctrl-C. On the way out the server and the
    loop are closed and ``callback`` is invoked if it is callable.
    """
    from pathlib import Path

    self.loop.run_until_complete(self.server)
    try:
        page = Path(os.path.realpath(__file__)).parent / 'index.html'
        # as_uri() builds a well-formed, percent-encoded file:// URL;
        # gluing 'file:///' onto the path gave four slashes on POSIX and
        # left spaces and other special characters unescaped.
        webbrowser.open(page.as_uri())
        self.loop.run_forever()
    except KeyboardInterrupt:
        pass
    finally:
        self.server.close()
        self.loop.close()
        if callable(callback):
            callback()
def main(cls, RequestHandlerClass, port=80):
    """Start a server with the given handler and point the browser at it.

    An instance of ``cls`` is created on ``port``. Whether or not that
    succeeds, the web browser is opened on ``http://localhost`` (with the
    port appended unless it is 80). If the server was created it then
    serves until a keyboard interrupt and is closed afterwards; a
    ``socket.error`` while creating it is swallowed.
    """
    try:
        server = cls(('', port), RequestHandlerClass)
        active = True
    except socket.error:
        active = False
    else:
        host, port = server.socket.getsockname()
        print('Serving HTTP on', host, 'port', port, '...')
    finally:
        # Runs on success and on failure, so the browser opens either way.
        suffix = ':' + str(port) if port != 80 else ''
        webbrowser.open('http://localhost' + suffix + '/')
    if not active:
        return
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        print('Keyboard interrupt received: EXITING')
    finally:
        server.server_close()
def get_settings(self):
    """Load the pickled settings, falling back to the defaults.

    The file ``self.FILENAME`` inside ``self.__path`` must unpickle to a
    dict whose keys are all strings present in ``self.DEFAULT``. Keys
    missing from the file are filled in from ``self.DEFAULT``.

    Returns:
        ``(False, settings)`` when the stored settings were usable, or
        ``(True, self.DEFAULT)`` when the file is missing, unreadable or
        fails validation.
    """
    filename = os.path.join(self.__path, self.FILENAME)
    try:
        # NOTE(review): pickle can execute arbitrary code on load; this is
        # only safe as long as the settings file is trusted.
        with open(filename, 'rb') as file:
            settings = pickle.load(file)
    except (IOError, pickle.UnpicklingError, EOFError,
            AttributeError, ImportError, IndexError):
        # Truncated or corrupt pickles raise more than UnpicklingError
        # (an empty file raises EOFError, for instance).
        return True, self.DEFAULT
    # Explicit checks instead of assert, which is stripped under -O.
    if not isinstance(settings, dict):
        return True, self.DEFAULT
    for key in settings:
        if not isinstance(key, str) or key not in self.DEFAULT:
            return True, self.DEFAULT
    # Add new settings as needed (when new ones are created).
    for key in self.DEFAULT:
        if key not in settings:
            settings[key] = self.DEFAULT[key]
    return False, settings
def update(self, callback):
    """Pass text appended to the watched file since the last call to ``callback``.

    Nothing happens when the file's modification time is unchanged.
    Otherwise the file is read from the remembered position to EOF, the
    new position is stored, and ``callback(path, text)`` is invoked. When
    the new data cannot be decoded the problem is printed and the callback
    is skipped.
    """
    # Find out if the file has been modified.
    modified = os.path.getmtime(self.__path)
    if modified == self.__modified:
        return
    # Remember the present time (we are getting an update).
    self.__modified = modified
    text = None
    with open(self.__path, 'r') as file:
        # Go to present location, read to EOF, and remember position.
        file.seek(self.__position)
        try:
            text = file.read()
        except UnicodeError:
            print('Please report problem with:', repr(self.__path))
            traceback.print_exc()
            print('-' * 80)
        self.__position = file.tell()
    # Execute callback with file ID and new text update. After a decode
    # failure there is no text; calling the callback anyway used to fail
    # on the unbound variable.
    if text is not None:
        callback(self.__path, text)
################################################################################
def getchar(echo):
    """Read one keypress (up to 32 bytes) from the terminal in raw mode.

    Input comes from stdin when it is a TTY, otherwise from ``/dev/tty``.
    When ``echo`` is true and stdout is a TTY the bytes read are written
    back to stdout. The terminal settings are restored afterwards.

    Returns:
        The bytes read, decoded with the best encoding for stdin
        (undecodable bytes replaced). An empty string is returned when the
        terminal attributes could not be read.
    """
    if not isatty(sys.stdin):
        f = open('/dev/tty')
        fd = f.fileno()
    else:
        fd = sys.stdin.fileno()
        f = None
    # Pre-bind so a termios failure before the read does not leave ``ch``
    # undefined for the code after the try block.
    ch = b''
    try:
        try:
            old_settings = termios.tcgetattr(fd)
            try:
                tty.setraw(fd)
                ch = os.read(fd, 32)
                if echo and isatty(sys.stdout):
                    # NOTE(review): ch is bytes; writing it to a text-mode
                    # stdout raises TypeError on Python 3 - confirm the
                    # intended interpreter before changing.
                    sys.stdout.write(ch)
            finally:
                termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
                sys.stdout.flush()
        except termios.error:
            pass
    finally:
        # Close /dev/tty on every path, including when tcgetattr fails.
        if f is not None:
            f.close()
    _translate_ch_to_exc(ch)
    return ch.decode(get_best_encoding(sys.stdin), 'replace')
def body(self, master):
    """Fill the dialog with the read-only expiration notice.

    A ``Text`` widget is created in ``master``; the URLs are inserted with
    the ``href`` tag, which is drawn blue and underlined and is bound to
    the click / enter / leave handlers of this object. The widget is
    disabled once all the text has been inserted.
    """
    self.t = Text(master)
    self.t.pack()
    self.t.tag_configure("href", foreground='blue', underline=1)
    for sequence, handler in (("<Button-1>", self.openHREF),
                              ("<Enter>", self.show_hand_cursor),
                              ("<Leave>", self.show_arrow_cursor)):
        self.t.tag_bind("href", sequence, handler)
    self.t.config(cursor="arrow", bg="SystemButtonFace", wrap=WORD)

    # Each entry is (text,) for plain text or (text, tag) for a link.
    segments = (
        ("Thank you for using PyKeylogger, a versatile backup and system monitoring solution.",),
        ("\n\nAs you may remember from reading the \"welcome screen\", this binary expires "
         "after 4 days of use, as a method of encouraging donations to this open source software project. This installation of "
         "PyKeylogger has now *EXPIRED*. There are two ways "
         "to restore PyKeylogger's functionality: \n\n 1. Donate to PyKeylogger by following the simple instructions at ",),
        ("http://pykeylogger.sourceforge.net/wiki/index.php/PyKeylogger:Download_Instructions", "href"),
        (" and you will get a binary build of PyKeylogger without any nagscreens or expiration, "
         "by E-mail, HTTP, or FTP.",),
        ("\n\n 2. Get the source code, then find and toggle the nag control. You can then run "
         "PyKeylogger from source, or even build your own executable, by following the instructions at ",),
        ("http://pykeylogger.sourceforge.net/wiki/index.php/PyKeylogger:Installation_Instructions", "href"),
        ("\n\nIf you run into any trouble, feel free to ask for help on the PyKeylogger forums: ",),
        ("http://sourceforge.net/forum/?group_id=147501", "href"),
    )
    for segment in segments:
        self.t.insert(END, *segment)
    self.t.config(state=DISABLED)
def recaptcha(self):
    """Check whether a captcha has to be solved and, if so, open it.

    The captcha endpoint is queried first. When the JSON reply says a
    captcha must be solved, the recaptcha link is opened in the default
    browser, printed, and the method waits for the user to press enter.

    If we choose not to solve a required captcha we will be considered a
    'lurker': we will not be able to chat and our name will be shown as a
    guest name in the room.
    """
    rand_value = str(random.uniform(0.9, 0.10))
    captcha_url = 'https://tinychat.com/cauth/captcha?{0}'.format(rand_value)
    reply = util.web.http_get(url=captcha_url, json=True, proxy=self.proxy)
    log.debug('recaptcha response: %s' % reply)

    if reply['json'] is None:
        return
    if reply['json']['need_to_solve_captcha'] != 1:
        return

    link = 'https://tinychat.com/cauth/recaptcha?token={0}'.format(reply['json']['token'])
    webbrowser.open(link, new=True)
    print (link)
    raw_input('Solve the captcha and click enter to continue.')
def first_auth(self, client_secrets):
    """Authenticate with Google API.

    Builds a flow from ``client_secrets``, opens the authorization URL in
    the browser, asks for the resulting code on stdin and stores the
    exchanged credentials in ``self.credentials``.
    """
    scopes = [
        'https://www.googleapis.com/auth/admin.directory.group.readonly',  # noqa
        'https://www.googleapis.com/auth/admin.directory.group.member.readonly',  # noqa
        'https://www.googleapis.com/auth/apps.groups.settings'
    ]
    flow = client.flow_from_clientsecrets(
        client_secrets,
        scope=scopes,
        redirect_uri='urn:ietf:wg:oauth:2.0:oob')

    logger.debug('Generating authorization URL.')
    webbrowser.open(flow.step1_get_authorize_url())
    auth_code = input('Enter the auth code: ')

    logger.debug('Generating credentials.')
    self.credentials = flow.step2_exchange(auth_code)
def server_port(project, ignored=None):
    """Return the port of the project's server, starting one if needed.

    Lookup order: the port cached on ``project``, then the ``.tern-port``
    file in ``project.dir``, then a freshly started server. A port equal
    to ``ignored`` is never returned from the first two sources; when the
    cached port equals ``ignored`` the server is killed first.

    Returns:
        ``(port, True)`` when an existing port was found, or
        ``(started, False)`` with the result of ``start_server`` (which
        may be None).
    """
    if project.port is not None and project.port != ignored:
        return (project.port, True)
    if project.port == ignored:
        kill_server(project)

    port_file = os.path.join(project.dir, ".tern-port")
    if os.path.isfile(port_file):
        try:
            # The with-block closes the handle, which used to be leaked.
            with open(port_file, "r") as handle:
                port = int(handle.read())
        except (IOError, OSError, ValueError):
            # Unreadable or corrupt port file: fall back to starting a
            # server instead of crashing.
            port = None
        if port is not None and port != ignored:
            project.port = port
            return (port, True)

    started = start_server(project)
    if started is not None:
        project.port = started
    return (started, False)
def run(self, edit, **args):
    """Jump to the definition of the symbol at the cursor.

    A "definition" query is issued for the current view. When the reply
    names a file, the current position is pushed on ``jump_stack`` (capped
    at 50 entries) and the target file is opened at the reported position.
    When it only carries a URL, that URL is opened in the browser;
    otherwise an error message is shown.
    """
    data = run_command(self.view, {"type": "definition", "lineCharPositions": True})
    if data is None:
        return

    target = data.get("file", None)
    if target is None:
        url = data.get("url", None)
        if url is None:
            sublime.error_message("Could not find a definition")
        else:
            webbrowser.open(url)
        return

    # Found an actual definition
    row, col = self.view.rowcol(self.view.sel()[0].b)
    here = self.view.file_name() + ":" + str(row + 1) + ":" + str(col + 1)
    jump_stack.append(here)
    if len(jump_stack) > 50:
        jump_stack.pop(0)
    target_path = os.path.join(get_pfile(self.view).project.dir, target)
    start = data["start"]
    destination = target_path + ":" + str(start["line"] + 1) + ":" + str(start["ch"] + 1)
    sublime.active_window().open_file(destination, sublime.ENCODED_POSITION)
def synopsis(filename, cache={}):
    """Get the one-line summary out of a module file.

    Results are memoised in ``cache`` under ``filename`` together with the
    file's mtime and are recomputed only when there is no entry yet or the
    file is newer. The shared default dict is deliberate: it is what makes
    the cache persist between calls.

    Returns None when the file cannot be opened, when a binary module fails
    to import, or when a binary module has an empty docstring.
    """
    mtime = os.stat(filename).st_mtime
    lastupdate, result = cache.get(filename, (None, None))
    if lastupdate is None or lastupdate < mtime:
        info = inspect.getmoduleinfo(filename)
        try:
            file = open(filename)
        except IOError:
            # module can't be opened, so skip it
            return None
        try:
            if info and 'b' in info[2]:  # binary modules have to be imported
                try:
                    module = imp.load_module('__temp__', file, filename, info[1:])
                except:
                    # Deliberately broad: importing an arbitrary module may
                    # raise anything (including SystemExit); just skip it.
                    return None
                # ''.splitlines() is [], so guard before taking line 0.
                doc_lines = (module.__doc__ or '').splitlines()
                result = doc_lines[0] if doc_lines else None
                del sys.modules['__temp__']
            else:  # text modules can be directly examined
                result = source_synopsis(file)
        finally:
            # Close on every path; the binary branch used to leak the handle.
            file.close()
        cache[filename] = (mtime, result)
    return result
def importfile(path):
    """Import a Python source file or compiled file given its path.

    The file is treated as compiled when it starts with the interpreter's
    magic number, otherwise as source. Any failure while loading is
    re-raised as ``ErrorDuringImport`` carrying the path and exception info.
    """
    magic = imp.get_magic()
    # Sniff the header in binary mode so newline translation cannot alter
    # the magic bytes; the handle is closed as soon as the check is done.
    with open(path, 'rb') as probe:
        if probe.read(len(magic)) == magic:
            kind = imp.PY_COMPILED
        else:
            kind = imp.PY_SOURCE
    filename = os.path.basename(path)
    name, ext = os.path.splitext(filename)
    # The with-block closes the file even when the import fails.
    with open(path, 'r') as file:
        try:
            module = imp.load_module(name, file, path, (ext, 'r', kind))
        except:
            raise ErrorDuringImport(path, sys.exc_info())
    return module
def basicWebServer(self):
    """Serve the current directory over HTTP and open the force graph page.

    Ports are tried upwards from 8000 until one can be bound. The browser
    is then pointed at ``/forceD3/force.html`` on that port and the server
    runs forever (this call does not return).
    """
    import socket
    import SimpleHTTPServer
    import SocketServer
    PORT = 8000
    Handler = SimpleHTTPServer.SimpleHTTPRequestHandler
    while True:
        try:
            print("Trying to open on port %s" % PORT)
            httpd = SocketServer.TCPServer(("", PORT), Handler)
        except socket.error:
            # Only a bind failure means "try the next port". Catching every
            # exception made the loop spin forever once PORT went past
            # 65535; that error now propagates instead.
            print("port %s did not work, checking next port" % PORT)
            PORT += 1
        else:
            break
    print("serving at port %s" % PORT)
    webbrowser.open("http://localhost:"+str(PORT)+"/forceD3/force.html")
    httpd.serve_forever()
### FUNCTIONS TO BE DEFINED IN SUBCLASSES ###
def graphD3(self, buds_visible = False, filter_assym_edges = False):
    """Write the graph as JSON for the D3 page and start the web server.

    The two flags are stored on the instance, the graph data is built in
    "d3" mode (and passed through ``useColorData`` when the instance has a
    ``nodeColorInfo`` attribute), dumped to ``forceD3/force.json`` and
    finally ``basicWebServer`` is started.
    """
    self.buds_visible = buds_visible
    self.filter_assym_edges = filter_assym_edges
    G = self.makeGraphData(mode = "d3")
    if hasattr(self, 'nodeColorInfo'):
        G = self.useColorData(G, "d3")
    # Close (and thereby flush) the file before the server starts handing
    # it out; the handle used to be left to the garbage collector.
    with open('forceD3/force.json', 'w') as json_file:
        json.dump(G, json_file)
    self.basicWebServer()
def openFile(filename, mode=u'rU', continueOnError=False, displayError=True):
    """Open ``filename`` and return the file object.

    ``-`` stands for the standard streams: for a mode starting with ``r``
    all of stdin is read and returned wrapped in a ``StringIOobject``,
    otherwise ``sys.stdout`` is returned. ``~`` in the name is expanded.

    On ``IOError`` the process exits through ``systemErrorExit`` unless
    ``continueOnError`` is true, in which case None is returned (after a
    warning and exit-code update when ``displayError`` is true).
    """
    try:
        if filename != u'-':
            open_mode = mode
            # The 'U' flag never had an effect in Python 3 and is rejected
            # outright since 3.11, so drop it there. Python 2 keeps it.
            if sys.version_info[0] >= 3 and u'U' in mode:
                open_mode = mode.replace(u'U', u'')
                if not any(flag in open_mode for flag in u'rwxa'):
                    open_mode = u'r' + open_mode
            return open(os.path.expanduser(filename), open_mode)
        if mode.startswith(u'r'):
            return StringIOobject(text_type(sys.stdin.read()))
        return sys.stdout
    except IOError as e:
        if continueOnError:
            if displayError:
                stderrWarningMsg(e)
                setSysExitRC(FILE_ERROR_RC)
            return None
        systemErrorExit(FILE_ERROR_RC, e)
# Close a file
def readFile(filename, mode=u'rU', continueOnError=False, displayError=True, encoding=None):
    """Return the whole content of ``filename``.

    ``-`` reads stdin instead. Without ``encoding`` the file is read with
    the built-in ``open``; with ``encoding`` it is decoded through
    ``codecs.open`` and a leading byte-order mark is removed. ``~`` in the
    name is expanded.

    On ``IOError`` the process exits through ``systemErrorExit`` unless
    ``continueOnError`` is true, in which case None is returned (after a
    warning and exit-code update when ``displayError`` is true). Encoding
    lookup and decoding errors end in ``usageErrorExit``.
    """
    try:
        if filename != u'-':
            open_mode = mode
            # The 'U' flag never had an effect in Python 3 and is rejected
            # outright since 3.11, so drop it there. Python 2 keeps it.
            if sys.version_info[0] >= 3 and u'U' in mode:
                open_mode = mode.replace(u'U', u'')
                if not any(flag in open_mode for flag in u'rwxa'):
                    open_mode = u'r' + open_mode
            if not encoding:
                with open(os.path.expanduser(filename), open_mode) as f:
                    return f.read()
            with codecs.open(os.path.expanduser(filename), open_mode, encoding) as f:
                content = f.read()
            # codecs does not strip the UTF-8 BOM (ef:bb:bf) so we must.
            # The content is already decoded here, so the BOM is the single
            # character U+FEFF, not the three raw bytes.
            if content.startswith(u'\ufeff'):
                return content[1:]
            return content
        return text_type(sys.stdin.read())
    except IOError as e:
        if continueOnError:
            if displayError:
                stderrWarningMsg(e)
                setSysExitRC(FILE_ERROR_RC)
            return None
        systemErrorExit(FILE_ERROR_RC, e)
    except (LookupError, UnicodeDecodeError, UnicodeError) as e:
        Cmd.Backup()
        usageErrorExit(e)
# Write a file
def getchar(echo):
    """Read one keypress (up to 32 bytes) from the terminal in raw mode.

    Input comes from stdin when it is a TTY, otherwise from ``/dev/tty``.
    When ``echo`` is true and stdout is a TTY the bytes read are written
    back to stdout. The terminal settings are restored afterwards.

    Returns:
        The bytes read, decoded with the best encoding for stdin
        (undecodable bytes replaced). An empty string is returned when the
        terminal attributes could not be read.
    """
    if not isatty(sys.stdin):
        f = open('/dev/tty')
        fd = f.fileno()
    else:
        fd = sys.stdin.fileno()
        f = None
    # Pre-bind so a termios failure before the read does not leave ``ch``
    # undefined for the code after the try block.
    ch = b''
    try:
        try:
            old_settings = termios.tcgetattr(fd)
            try:
                tty.setraw(fd)
                ch = os.read(fd, 32)
                if echo and isatty(sys.stdout):
                    # NOTE(review): ch is bytes; writing it to a text-mode
                    # stdout raises TypeError on Python 3 - confirm the
                    # intended interpreter before changing.
                    sys.stdout.write(ch)
            finally:
                termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
                sys.stdout.flush()
        except termios.error:
            pass
    finally:
        # Close /dev/tty on every path, including when tcgetattr fails.
        if f is not None:
            f.close()
    _translate_ch_to_exc(ch)
    return ch.decode(get_best_encoding(sys.stdin), 'replace')
def collision_series(paperback=True, kindle=True):
    """Open the store pages for the Collision series in the web browser.

    With ``paperback`` true the two paperback pages are opened; otherwise,
    with ``kindle`` true, the two Kindle pages are opened; otherwise the
    donation page is opened. ``webbrowser`` is reloaded between the two
    pages of a pair, as the original code did.
    """
    import webbrowser
    # importlib.reload replaces imp.reload; the imp module was removed in
    # Python 3.12.
    import importlib

    if paperback:
        webbrowser.open("https://www.createspace.com/6043857")
        importlib.reload(webbrowser)
        webbrowser.open("https://www.createspace.com/7164863")
        return
    if kindle:
        webbrowser.open("https://www.amazon.com/Resolve-Immortal-Flesh-Collision-Book-ebook/dp/B01CO3MBVQ")
        importlib.reload(webbrowser)
        webbrowser.open("https://www.amazon.com/Formulacrum-Collision-Book-Rich-Colburn-ebook/dp/B0711P744G")
        return
    webbrowser.open("https://www.paypal.com/donate/?token=G1UymFn4CP8lSFn1r63jf_XOHAuSBfQJWFj9xjW9kWCScqkfYUCdTzP-ywiHIxHxYe7uJW&country.x=US&locale.x=US")
# ============================================================================================