def removePastReminders(self):
    """Rewrite the reminder file, keeping only reminders that are not yet due.

    Each line of ``reminder_filename`` is expected to hold tab-separated
    fields whose second field is a timestamp in ``'%d %b %Y %I %M %p'``
    format. Lines whose timestamp is earlier than the current time are
    dropped; the remaining lines are written back unchanged.

    A missing reminder file is silently ignored. Any other failure (for
    example a malformed line) leaves the file untouched and shows
    "Error occured" through ``self.frame.displayText``.
    """
    try:
        with open(reminder_filename, 'r') as file_hand:
            reminder_list = file_hand.readlines()

        now = datetime.now()
        new_list = []
        for reminder in reminder_list:
            stamp = reminder.split('\t')[1].replace('\n', '')
            # Direct comparison replaces the old days/seconds test; the
            # ``seconds`` part of a timedelta is never negative.
            if datetime.strptime(stamp, '%d %b %Y %I %M %p') >= now:
                new_list.append(reminder)

        # Only reached when every line parsed, so a bad line cannot cause
        # the file to be truncated.
        with open(reminder_filename, 'w') as file_hand:
            file_hand.writelines(new_list)
    except FileNotFoundError:
        pass
    except Exception:
        # ``Exception`` (not a bare except) so Ctrl-C and interpreter exit
        # are not swallowed.
        self.frame.displayText("Error occured")
# Example source code using Python's open()
def send_msg(self, name, word, isfile=False):
    """Send a message, or every line of a text file, to the user ``name``.

    :param name: name passed to ``self.get_user_id`` to look up the target
    :param word: the message text, or a file path when ``isfile`` is true
    :param isfile: when true, ``word`` is opened and each line is sent as a
        separate message with a one second pause after each
    :return: ``True`` when every ``send_msg_by_uid`` call reported success,
        ``False`` otherwise. ``True`` is also returned when the user id
        cannot be resolved (existing behaviour, kept for callers).
    """
    uid = self.get_user_id(name)
    if uid is None:
        if self.DEBUG:
            print('[ERROR] This user does not exist .')
        # NOTE(review): reporting success for an unknown user looks odd but
        # is what callers currently receive; left unchanged.
        return True

    if not isfile:
        return bool(self.send_msg_by_uid(self.to_unicode(word), uid))

    result = True
    with open(word, 'r') as f:
        for line in f:
            line = line.replace('\n', '')
            # Single-argument print() behaves the same on Python 2 and 3.
            print('-> ' + name + ': ' + line)
            if not self.send_msg_by_uid(line, uid):
                result = False
            time.sleep(1)
    return result
def get_icon(self, uid, gid=None):
    """
    Download the icon for ``uid`` and save it as a ``.jpg`` file.

    :param uid: id sent as the ``username`` query parameter
    :param gid: optional key into ``self.encry_chat_room_id_list``; when
        given, the mapped value is sent as the ``chatroomid`` parameter
    :return: the file name (``icon_<uid>.jpg``) written inside
        ``self.temp_pwd``
    """
    if gid is not None:
        url = self.base_uri + '/webwxgeticon?username=%s&skey=%s&chatroomid=%s' % (
            uid, self.skey, self.encry_chat_room_id_list[gid])
    else:
        url = self.base_uri + '/webwxgeticon?username=%s&skey=%s' % (uid, self.skey)

    payload = self.session.get(url).content
    fn = 'icon_' + uid + '.jpg'
    target = os.path.join(self.temp_pwd, fn)
    with open(target, 'wb') as out:
        out.write(payload)
    return fn
def update_repo(self, name, url=None):
    """Fetch the KMETA file of repo ``name`` into ``~/.kcli/repo_<name>``.

    :param name: repo name; used for the target directory and, when ``url``
        is not given, as the key looked up in ``~/.kcli/repos.yml``
    :param url: optional repo URL; ``/KMETA`` is appended unless the URL
        already contains ``KMETA``
    :return: ``{'result': 'success'}`` after ``common.fetch`` returns
    :raises SystemExit: (exit code 1) when ``url`` is omitted and
        ``repos.yml`` is missing, empty, unparsable, or has no entry for
        ``name``
    """
    home = os.environ.get('HOME')
    reposfile = "%s/.kcli/repos.yml" % home
    repodir = "%s/.kcli/repo_%s" % (home, name)
    if url is None:
        if not os.path.exists(reposfile) or os.path.getsize(reposfile) == 0:
            common.pprint("Empty .kcli/repos.yml. Leaving...", color='red')
            sys.exit(1)
        with open(reposfile, 'r') as entries:
            try:
                # NOTE(review): yaml.load without an explicit Loader can
                # construct arbitrary objects and is rejected by newer
                # PyYAML releases; consider yaml.safe_load here.
                repos = yaml.load(entries)
            except yaml.scanner.ScannerError:
                common.pprint("Couldn't properly parse .kcli/repos.yml. Leaving...", color='red')
                sys.exit(1)
        # A file holding only comments parses to None, so guard before the
        # membership test. The message previously claimed the entry was
        # "allready there", which is the opposite of this condition.
        if not repos or name not in repos:
            common.pprint("Entry for name %s not found. Leaving..." % name, color='red')
            sys.exit(1)
        url = "%s/KMETA" % repos[name]
    elif 'KMETA' not in url:
        url = "%s/KMETA" % url
    common.fetch(url, repodir)
    return {'result': 'success'}
def public_port_exists(rline, authorized_keys_file, restrictions, public_key, port):
    """Add ``permitopen`` rules for ``port`` to the entry of ``public_key``.

    ``restrictions`` is split on newlines. For every line containing
    ``public_key``:

    * if the line already contains ``:<port>"``, a message is printed and
      the function returns without writing anything;
    * otherwise each ``no-pty,`` in the line is expanded with ``permitopen``
      rules for ``localhost`` and ``127.0.0.1`` on that port.

    Finally ``authorized_keys_file`` is overwritten with the updated lines
    when a line was changed, or with ``rline`` when none was.

    NOTE(review): in the no-match case the file ends up containing only
    ``rline`` -- confirm that overwriting (rather than appending) is intended.
    """
    port_text = str(port)
    port_marker = ":" + port_text + '"'
    permit_rules = 'no-pty,permitopen="localhost:{0}",permitopen="127.0.0.1:{0}",'.format(port_text)

    updated_lines = []
    key_updated = False
    for entry in restrictions.split("\n"):
        if public_key in entry:
            if port_marker in entry:
                print("public_key and port already exists in", authorized_keys_file)
                return
            entry = entry.replace("no-pty,", permit_rules)
            key_updated = True
        updated_lines.append(entry)

    print("Adding key+port rule to file")
    output = "\n".join(updated_lines) if key_updated else rline
    with open(authorized_keys_file, "w") as handle:
        handle.write(output + "\n")
def build_login_menu(self):
    """Build the Login / Quit menu and install it on ``self.INDICATOR``.

    "Login" opens ``self.cfg_cls.LOGIN_URI`` in a new browser window;
    "Quit" is wired to ``self.quit``.
    """
    def _launch_login_page(_widget):
        webbrowser.open(self.cfg_cls.LOGIN_URI, new=1, autoraise=True)

    menu = Gtk.Menu()
    login_entry = Gtk.MenuItem('Login')
    divider = Gtk.SeparatorMenuItem()
    quit_entry = Gtk.MenuItem('Quit')

    login_entry.connect('activate', _launch_login_page)
    quit_entry.connect('activate', self.quit)

    for entry in (login_entry, divider, quit_entry):
        menu.append(entry)

    menu.show_all()
    self.INDICATOR.set_menu(menu)
def send_msg(self, name, word, isfile=False):
    """Send a message, or every line of a text file, to the user ``name``.

    :param name: name passed to ``self.get_user_id`` to look up the target
    :param word: the message text, or a file path when ``isfile`` is true
    :param isfile: when true, ``word`` is opened and each line is sent as a
        separate message with a one second pause after each
    :return: ``True`` when every ``send_msg_by_uid`` call reported success,
        ``False`` otherwise. ``True`` is also returned when the user id
        cannot be resolved (existing behaviour, kept for callers).
    """
    uid = self.get_user_id(name)
    if uid is None:
        if self.DEBUG:
            print('[ERROR] This user does not exist .')
        # NOTE(review): reporting success for an unknown user looks odd but
        # is what callers currently receive; left unchanged.
        return True

    if not isfile:
        return bool(self.send_msg_by_uid(self.to_unicode(word), uid))

    result = True
    with open(word, 'r') as f:
        for line in f:
            line = line.replace('\n', '')
            # Single-argument print() behaves the same on Python 2 and 3.
            print('-> ' + name + ': ' + line)
            if not self.send_msg_by_uid(line, uid):
                result = False
            time.sleep(1)
    return result
def get_icon(self, uid, gid=None):
    """
    Download the icon for ``uid`` and save it as a ``.jpg`` file.

    :param uid: id sent as the ``username`` query parameter
    :param gid: optional key into ``self.encry_chat_room_id_list``; when
        given, the mapped value is sent as the ``chatroomid`` parameter
    :return: the file name (``icon_<uid>.jpg``) written inside
        ``self.temp_pwd``
    """
    if gid is not None:
        url = self.base_uri + '/webwxgeticon?username=%s&skey=%s&chatroomid=%s' % (
            uid, self.skey, self.encry_chat_room_id_list[gid])
    else:
        url = self.base_uri + '/webwxgeticon?username=%s&skey=%s' % (uid, self.skey)

    payload = self.session.get(url).content
    fn = 'icon_' + uid + '.jpg'
    target = os.path.join(self.temp_pwd, fn)
    with open(target, 'wb') as out:
        out.write(payload)
    return fn
def headless_authorize(self):
    """
    Authorize from a terminal, without launching a browser locally.

    Prints the authorization URL, asks the user to paste back the full URL
    they were redirected to, extracts its ``code`` query parameter, prints
    it and passes it to ``self.authenticate_code``.
    """
    url, _ = self.oauth.authorize_token_url(redirect_uri=self.redirect_uri)

    # The user has to open this URL on some machine that has a browser.
    instructions = (
        '\n-------------------------------------------------------------------------',
        '\t\tOpen the below URL in your browser\n',
        url,
        '\n-------------------------------------------------------------------------\n',
        'NOTE: After authenticating on Fitbit website, you will redirected to a URL which ',
        'throws an ERROR. This is expected! Just copy the full redirected here.\n',
    )
    for message in instructions:
        print(message)

    redirected_url = input('Full redirected URL: ')
    query_string = urlparse.urlparse(redirected_url).query
    params = urlparse.parse_qs(query_string)
    code = params['code'][0]
    print(code)
    self.authenticate_code(code=code)
def main():
    """Run the OAuth2 flow from the command line and save the credentials.

    Parses ``--id`` / ``--secret`` (both required) and ``--console``,
    authorizes through ``OAuth2Server`` (headless when ``--console`` is
    given, via the browser otherwise) and writes the client id, client
    secret, access token and refresh token to ``fitbit.json`` in the
    current directory.
    """
    # Arguments parsing
    parser = argparse.ArgumentParser("Client ID and Secret are mandatory arguments")
    parser.add_argument("-i", "--id", required=True, help="Client id", metavar='<client-id>')
    parser.add_argument("-s", "--secret", required=True, help="Client secret",
                        metavar='<client-secret>')
    parser.add_argument("-c", "--console", default=False,
                        help="Authenticate only using console (for headless systems)", action="store_true")
    args = parser.parse_args()

    server = OAuth2Server(args.id, args.secret)
    if args.console:
        server.headless_authorize()
    else:
        server.browser_authorize()

    credentials = dict(
        client_id=args.id,
        client_secret=args.secret,
        access_token=server.oauth.token['access_token'],
        refresh_token=server.oauth.token['refresh_token'])
    # Context manager so the file is flushed and closed deterministically;
    # the handle was previously never closed.
    with open('fitbit.json', 'w') as credentials_file:
        json.dump(credentials, credentials_file)
def print_menu(arg=0):
    """Print one of the built-in help texts.

    :param arg: ``0`` prints the full command help menu, ``1`` prints the
        short quit/help hint; any other value prints nothing
    """
    # print(<single string>) produces the same output on Python 2 and 3,
    # unlike the former print statements, which fail to parse on Python 3.
    if arg == 0:
        print('''
------ Help Menu -------
Available Commands :
1. m<result number> - Print magnet link of selected torrent
2. c<result number> - Copy magnet link of selected torrent to clipboard
3. d<result number> - Download torrent using default torrent client
4. o<result number> - Open the torrent page of the selected torrent in the default browser
5. cs<result number> - Copy magnet link and open seedr.cc
6. cz<result number> - Copy magnet link and open zbigz
7. p[optional:<choice>] - Print top 10 results from each website for the given query
<choice> : [{default : 1}, {0 : Print formatted result}, {1 : Pretty print results}]
8. s - Enter a new query to search for over all avilable torrent websites
9. r - Repeat last search (with same query)
------------------------''')
    elif arg == 1:
        print('''
Enter 'q' to exit and 'h' to see all available commands.
''')
def execute(self, context):
    """Open the hosted PayPal button page in the browser and report FINISHED."""
    paypal_url = "https://www.paypal.com/cgi-bin/webscr?cmd=_s-xclick&hosted_button_id=8TB6CNT9G8LEN"
    webbrowser.open(paypal_url)
    status = {"FINISHED"}
    return status
def _on_navigate(href):
    """Log the target ``href`` to stdout, then open it in the web browser."""
    notice = 'Navigate to: %s' % (href,)
    print(notice)
    webbrowser.open(href)
def show_license(self, *args):
    """Show the GNU license dialog, or open a link in the browser.

    With more than one positional argument, the second one is treated as a
    link and opened in the web browser. Otherwise the first argument is used
    as a key into ``core.dict_language`` to pick the license file, a
    progress window is created, and the license dialog is scheduled to
    appear two seconds later.
    """
    def _display_license(progress, on_language):
        license_path = "LICENSE/GNU_LICENSE_{}.rst".format(
            core.dict_language[on_language])
        # Context manager so the license file is closed after reading; it
        # was previously left open.
        with open(license_path) as license_file:
            text_license = license_file.read()
        message = KDialog(underline_color="a6b4bcff",
                          base_font_size=self.window_text_size,
                          background_image=core.theme_decorator_window)
        message.show(title="GNU_LICENSE:", text=text_license, rst=True,
                     size_rst=(.9, .7))
        progress.body.dismiss()

    if len(args) > 1:
        # Second argument is the link to open.
        click_link = args[1]
        webbrowser.open(click_link)
        return

    # Single argument: the language selector for the license text.
    on_language = args[0]
    progress = self.create_window(text=core.string_lang_wait)
    Clock.schedule_once(lambda *args: _display_license(progress, on_language), 2)
def authorize(self):
    """Start authorization unless a session already exists.

    Requests a token pair, stores it on the instance and opens the
    authorize URL for that token in the web browser. Does nothing when
    ``self._session`` is already set.
    """
    if self._session is None:
        # NOTE(review): the token comes from ``self.gr`` but the URL from
        # ``self._gr`` -- presumably the same client; verify.
        token_pair = self.gr.get_request_token(header_auth=True)
        self._request_token, self._request_token_secret = token_pair
        webbrowser.open(self._gr.get_authorize_url(self._request_token))
def _write_credentials(self):
    """Dump the developer key/secret and session access token/secret as
    indented, key-sorted JSON to ``credentials_file_path``."""
    payload = dict(
        developer_key=self._developer_key,
        developer_secret=self._developer_secret,
        access_token=self.session.access_token,
        access_token_secret=self.session.access_token_secret,
    )
    with open(credentials_file_path, 'w+') as out:
        json.dump(payload, out, sort_keys=True, indent=4)
def read_credentials():
    """Return the JSON content of ``credentials_file_path``, or ``None``
    when that file does not exist."""
    if os.path.exists(credentials_file_path):
        with open(credentials_file_path, 'r') as stored:
            return json.load(stored)
    return None
def write_and_open_in_browser(html: str, filepath: Optional[str]):
    ''' Writes html to filepath and opens it.

    When ``filepath`` is None the HTML goes to a named temporary file that
    is kept on disk (``delete=False``) so the browser can read it. Files
    are written as UTF-8.
    '''
    if filepath is None:
        file = NamedTemporaryFile('w', encoding='utf-8', delete=False)
    else:
        file = open(filepath, 'w', encoding='utf-8')
    # ``with`` closes the handle even if the write fails.
    with file:
        file.write(html)
    webbrowser.open(file.name)
def tempfilepager(text, cmd):
    """Page through text by invoking a program on a temporary file.

    ``text`` is written to a freshly created temporary file, ``cmd`` is run
    through ``os.system`` with the quoted file name appended, and the file
    is removed afterwards whether or not the write or the command succeeds.
    """
    import tempfile
    # mkstemp creates the file atomically; mktemp only returned a name,
    # leaving a window in which another process could create that path.
    fd, filename = tempfile.mkstemp()
    try:
        with os.fdopen(fd, 'w') as file:
            file.write(text)
        os.system(cmd + ' "' + filename + '"')
    finally:
        os.unlink(filename)
def writedoc(thing, forceload=0):
    """Write HTML documentation to a file in the current directory.

    ``thing`` is resolved with ``resolve``; the rendered page is written to
    ``<name>.html`` and a confirmation is printed. ``ImportError`` and
    ``ErrorDuringImport`` are caught and printed instead of propagating.
    """
    try:
        # Local names avoid shadowing the ``object`` / ``file`` builtins.
        obj, name = resolve(thing, forceload)
        page = html.page(describe(obj), html.document(obj, name))
        with open(name + '.html', 'w') as out_file:
            out_file.write(page)
        # Concatenation keeps the output "wrote <name>.html" identical on
        # Python 2 and 3.
        print('wrote ' + name + '.html')
    except (ImportError, ErrorDuringImport) as value:
        print(value)