def _install_message(self, message):
"""Format a message and blindly write to self._file."""
from_line = None
if isinstance(message, str) and message.startswith('From '):
newline = message.find('\n')
if newline != -1:
from_line = message[:newline]
message = message[newline + 1:]
else:
from_line = message
message = ''
elif isinstance(message, _mboxMMDFMessage):
from_line = 'From ' + message.get_from()
elif isinstance(message, email.message.Message):
from_line = message.get_unixfrom() # May be None.
if from_line is None:
from_line = 'From MAILER-DAEMON %s' % time.asctime(time.gmtime())
start = self._file.tell()
self._file.write(from_line + os.linesep)
self._dump_message(message, self._file, self._mangle_from_)
stop = self._file.tell()
return (start, stop)
python类linesep()的实例源码
def _generate_toc(self):
"""Generate key-to-(start, stop) table of contents."""
starts, stops = [], []
self._file.seek(0)
while True:
line_pos = self._file.tell()
line = self._file.readline()
if line.startswith('From '):
if len(stops) < len(starts):
stops.append(line_pos - len(os.linesep))
starts.append(line_pos)
elif line == '':
stops.append(line_pos)
break
self._toc = dict(enumerate(zip(starts, stops)))
self._next_key = len(self._toc)
self._file_length = self._file.tell()
def get_message(self, key):
"""Return a Message representation or raise a KeyError."""
start, stop = self._lookup(key)
self._file.seek(start)
self._file.readline() # Skip '1,' line specifying labels.
original_headers = StringIO.StringIO()
while True:
line = self._file.readline()
if line == '*** EOOH ***' + os.linesep or line == '':
break
original_headers.write(line.replace(os.linesep, '\n'))
visible_headers = StringIO.StringIO()
while True:
line = self._file.readline()
if line == os.linesep or line == '':
break
visible_headers.write(line.replace(os.linesep, '\n'))
body = self._file.read(stop - self._file.tell()).replace(os.linesep,
'\n')
msg = BabylMessage(original_headers.getvalue() + body)
msg.set_visible(visible_headers.getvalue())
if key in self._labels:
msg.set_labels(self._labels[key])
return msg
def get_string(self, key):
"""Return a string representation or raise a KeyError."""
start, stop = self._lookup(key)
self._file.seek(start)
self._file.readline() # Skip '1,' line specifying labels.
original_headers = StringIO.StringIO()
while True:
line = self._file.readline()
if line == '*** EOOH ***' + os.linesep or line == '':
break
original_headers.write(line.replace(os.linesep, '\n'))
while True:
line = self._file.readline()
if line == os.linesep or line == '':
break
return original_headers.getvalue() + \
self._file.read(stop - self._file.tell()).replace(os.linesep,
'\n')
def fix_script(path):
"""Replace #!python with #!/path/to/python
Return True if file was changed."""
# XXX RECORD hashes will need to be updated
if os.path.isfile(path):
script = open(path, 'rb')
try:
firstline = script.readline()
if not firstline.startswith(binary('#!python')):
return False
exename = sys.executable.encode(sys.getfilesystemencoding())
firstline = binary('#!') + exename + binary(os.linesep)
rest = script.read()
finally:
script.close()
script = open(path, 'wb')
try:
script.write(firstline)
script.write(rest)
finally:
script.close()
return True
def main(initial_args=None):
if initial_args is None:
initial_args = sys.argv[1:]
autocomplete()
try:
cmd_name, cmd_args = parseopts(initial_args)
except PipError:
e = sys.exc_info()[1]
sys.stderr.write("ERROR: %s" % e)
sys.stderr.write(os.linesep)
sys.exit(1)
command = commands[cmd_name]()
return command.main(cmd_args)
def main(args=None):
if args is None:
args = sys.argv[1:]
# Configure our deprecation warnings to be sent through loggers
deprecation.install_warning_logger()
autocomplete()
try:
cmd_name, cmd_args = parseopts(args)
except PipError as exc:
sys.stderr.write("ERROR: %s" % exc)
sys.stderr.write(os.linesep)
sys.exit(1)
# Needed for locale.getpreferredencoding(False) to work
# in pip._internal.utils.encoding.auto_decode
try:
locale.setlocale(locale.LC_ALL, '')
except locale.Error as e:
# setlocale can apparently crash if locale are uninitialized
logger.debug("Ignoring error %s when setting locale", e)
command = commands_dict[cmd_name](isolated=check_isolated(cmd_args))
return command.main(cmd_args)
def remove(path, pth_file=None):
pth_file = pth_file or PATH
try:
with open(pth_file, "r") as f:
lines = f.read().splitlines()[2:]
except IOError:
raise ValueError("not found: {}".format(path))
try:
lines.remove(path)
except ValueError:
raise ValueError("not found: {}".format(path))
lines.insert(0, HEADER)
with open(pth_file, "w") as f:
f.write(os.linesep.join(lines))
def _printf(fmt, *args, **print3opts):
'''Formatted print to sys.stdout or given stream.
*print3opts* -- print keyword arguments, like Python 3.+
'''
if print3opts: # like Python 3.0
f = print3opts.get('file', None) or sys.stdout
if args:
f.write(fmt % args)
else:
f.write(fmt)
f.write(print3opts.get('end', linesep))
if print3opts.get('flush', False):
f.flush()
elif args:
print(fmt % args)
else:
print(fmt)
def set_exit_code(bld):
"""
If any of the tests fail waf will exit with that exit code.
This is useful if you have an automated build system which need
to report on errors from the tests.
You may use it like this:
def build(bld):
bld(features='cxx cxxprogram test', source='main.c', target='app')
from waflib.Tools import waf_unit_test
bld.add_post_fun(waf_unit_test.set_exit_code)
"""
lst = getattr(bld, 'utest_results', [])
for (f, code, out, err) in lst:
if code:
msg = []
if out:
msg.append('stdout:%s%s' % (os.linesep, out.decode('utf-8')))
if err:
msg.append('stderr:%s%s' % (os.linesep, err.decode('utf-8')))
bld.fatal(os.linesep.join(msg))
def write_json(self, data, pretty=True):
"""
Writes a python object as JSON to disk. Files are always written as UTF8 as per the JSON standard::
def build(bld):
bld.path.find_node('xyz.json').write_json(199)
:type data: object
:param data: The data to write to disk
:type pretty: boolean
:param pretty: Determines if the JSON will be nicely space separated
"""
import json # Python 2.6 and up
indent = 2
separators = (',', ': ')
sort_keys = pretty
newline = os.linesep
if not pretty:
indent = None
separators = (',', ':')
newline = ''
output = json.dumps(data, indent=indent, separators=separators, sort_keys=sort_keys) + newline
self.write(output, encoding='utf-8')
def set_exit_code(bld):
"""
If any of the tests fail waf will exit with that exit code.
This is useful if you have an automated build system which need
to report on errors from the tests.
You may use it like this:
def build(bld):
bld(features='cxx cxxprogram test', source='main.c', target='app')
from waflib.Tools import waf_unit_test
bld.add_post_fun(waf_unit_test.set_exit_code)
"""
lst = getattr(bld, 'utest_results', [])
for (f, code, out, err) in lst:
if code:
msg = []
if out:
msg.append('stdout:%s%s' % (os.linesep, out.decode('utf-8')))
if err:
msg.append('stderr:%s%s' % (os.linesep, err.decode('utf-8')))
bld.fatal(os.linesep.join(msg))
def write_json(self, data, pretty=True):
"""
Writes a python object as JSON to disk. Files are always written as UTF8 as per the JSON standard::
def build(bld):
bld.path.find_node('xyz.json').write_json(199)
:type data: object
:param data: The data to write to disk
:type pretty: boolean
:param pretty: Determines if the JSON will be nicely space separated
"""
import json # Python 2.6 and up
indent = 2
separators = (',', ': ')
sort_keys = pretty
newline = os.linesep
if not pretty:
indent = None
separators = (',', ':')
newline = ''
output = json.dumps(data, indent=indent, separators=separators, sort_keys=sort_keys) + newline
self.write(output, encoding='utf-8')
def set_exit_code(bld):
"""
If any of the tests fail waf will exit with that exit code.
This is useful if you have an automated build system which need
to report on errors from the tests.
You may use it like this:
def build(bld):
bld(features='cxx cxxprogram test', source='main.c', target='app')
from waflib.Tools import waf_unit_test
bld.add_post_fun(waf_unit_test.set_exit_code)
"""
lst = getattr(bld, 'utest_results', [])
for (f, code, out, err) in lst:
if code:
msg = []
if out:
msg.append('stdout:%s%s' % (os.linesep, out.decode('utf-8')))
if err:
msg.append('stderr:%s%s' % (os.linesep, err.decode('utf-8')))
bld.fatal(os.linesep.join(msg))
def __init__(self, to=sys.stdout, only=(), *args, **kwargs):
super(DebugStream, self).__init__(*args, **kwargs)
def safe_str(chunk):
if isinstance(chunk, bytes):
chunk = chunk.decode("utf-8")
elif not isinstance(chunk, str):
chunk = str(chunk)
return chunk
class Bugger(object):
__before__ = __after__ = lambda *args: None
def __getattr__(self, event):
def inner(*args, **kwargs):
to.write(event.upper() + " ")
to.write("; ".join(map(safe_str, args)))
to.write(" ")
to.write(", ".join("{0}: {1}".format(k, safe_str(v))
for k, v in kwargs.items()))
to.write(os.linesep)
return inner
self.attach(Bugger(), only=only)
def generate():
global sound
statement = coremeraco.constructRegularStatement() + os.linesep
textArea.insert(Tkinter.END, statement)
textArea.yview(Tkinter.END)
if readOption and readOutputState.get():
if sound:
sound.stop()
sound = playsnd.playsnd()
threading.Thread(target=sound.play, args=(procfest.text2wave(statement),)).start()
if saveOutputState.get():
try:
outputFileHandler = open(saveOutputDestination.get(), 'a') # 'a' means append mode
outputFileHandler.write(statement)
outputFileHandler.close()
except IOError as Argument:
tkMessageBox.showerror(productName, meracolocale.getLocalisedString(localisationFile, "ioErrorMessage") + str(Argument))
except Exception as Argument:
tkMessageBox.showerror(productName, meracolocale.getLocalisedString(localisationFile, "genericErrorMessage") + str(Argument))
def test_append_metadata_file(tmpdir):
initial_data = 'some data'
my_metadata = {
'some': 'data',
'for': 'this metadata'
}
output_file = tmpdir.join('metadata')
output_file.write_binary((initial_data + os.linesep).encode('utf-8'))
with metadata.MetadataWriter(suppress_output=False)(str(output_file)) as writer:
writer.write_metadata(**my_metadata)
lines = output_file.readlines()
assert len(lines) == 2
assert lines[0].strip() == initial_data
assert json.loads(lines[1].strip()) == my_metadata
def test_overwrite_metdata_file(tmpdir):
initial_data = 'some data'
my_metadata = {
'some': 'data',
'for': 'this metadata'
}
output_file = tmpdir.join('metadata')
output_file.write(initial_data + os.linesep)
overwrite_writer = metadata.MetadataWriter(suppress_output=False)(str(output_file))
overwrite_writer.force_overwrite()
with overwrite_writer as writer:
writer.write_metadata(**my_metadata)
lines = output_file.readlines()
assert len(lines) == 1
assert json.loads(lines[0].strip()) == my_metadata
def write_metadata(self, **metadata):
# type: (**Any) -> Optional[int]
"""Writes metadata to the output stream if output is not suppressed.
:param **metadata: JSON-serializeable metadata kwargs to write
"""
if self.suppress_output:
return 0 # wrote 0 bytes
metadata_line = json.dumps(metadata, sort_keys=True) + os.linesep
metadata_output = '' # type: Union[str, bytes]
if 'b' in self._output_mode:
metadata_output = metadata_line.encode('utf-8')
else:
metadata_output = metadata_line
return self._output_stream.write(metadata_output)
def _collect(self, room_name, compress):
event = yield idiokit.next()
while True:
current = datetime.utcnow().day
with _open_archive(self.archive_dir, time.time(), room_name) as archive:
self.log.info("Opened archive {0!r}".format(archive.name))
while current == datetime.utcnow().day:
json_dict = dict((key, event.values(key)) for key in event.keys())
archive.write(json.dumps(json_dict) + os.linesep)
event = yield idiokit.next()
yield compress.queue(0.0, _rename(archive.name))
def output(self):
if self._params['delimiter'] is None:
delim = ','
else:
delim = self._params['delimiter']
if self._params['linesep'] is None:
eol = os.linesep
else:
eol = self._params['linesep']
lines = eol.join([delim.join(map(str, row)) for row in self._params['data_matrix']])
if self._params['header_list']:
header = delim.join(self._params['header_list'])
else:
header = ''
return header + eol + lines
def main(args=None):
if args is None:
args = sys.argv[1:]
# Configure our deprecation warnings to be sent through loggers
deprecation.install_warning_logger()
autocomplete()
try:
cmd_name, cmd_args = parseopts(args)
except PipError as exc:
sys.stderr.write("ERROR: %s" % exc)
sys.stderr.write(os.linesep)
sys.exit(1)
# Needed for locale.getpreferredencoding(False) to work
# in pip.utils.encoding.auto_decode
locale.setlocale(locale.LC_ALL, '')
command = commands_dict[cmd_name](isolated=check_isolated(cmd_args))
return command.main(cmd_args)
# ###########################################################
# # Writing freeze files
def keyPressEvent(self, qkeyevent):
if qkeyevent.matches(QKeySequence.Copy):
selected_rows = [x.row() for x in self.selectionModel().selectedRows()]
logger.info('Copy to clipboard requested [%r rows]' % len(selected_rows))
out_string = ''
for row in selected_rows:
out_string += self.get_row_as_string(row) + os.linesep
if out_string:
QApplication.clipboard().setText(out_string)
else:
super(BasicTable, self).keyPressEvent(qkeyevent)
if qkeyevent.matches(QKeySequence.InsertParagraphSeparator):
if self.hasFocus():
self.on_enter_pressed([(x.row(), x.column()) for x in self.selectedIndexes()])
def print_help():
"""Print help encoded as initial script's comment between #< and #>."""
show_line = False
fp = open(sys.argv[0], 'r')
for line in fp:
if line[0:2] == '#<': # start token
show_line = True
continue
elif line[0:2] == '#>': # end token
return
if show_line == True:
print(line.rstrip(os.linesep)[1:])
fp.close()
def gui_ask_for_api():
"""Gtk dialog for API key insert."""
message = gtk.MessageDialog(type=gtk.MESSAGE_QUESTION, buttons=gtk.BUTTONS_OK_CANCEL)
message.set_markup(colorize.MSG_ASK_API.replace(colorize.URL,"<u>" + colorize.URL +"</u>"))
entry = gtk.Entry(max=64)
entry.set_text("Enter your API key")
entry.show()
message.vbox.pack_end(entry)
entry.connect("activate", lambda _: d.response(gtk.RESPONSE_OK))
message.set_default_response(gtk.RESPONSE_OK)
message.run()
api_key = entry.get_text().decode('utf8')
fp = open(colorize.HOME + colorize.API_KEY_FILE, 'w')
fp.write("YOUR_API_KEY={0}{1}".format(api_key, os.linesep))
fp.close()
# process buttong click immediately
message.destroy()
while gtk.events_pending():
gtk.main_iteration()
def testOutputSignal(self):
# Use SIGKILL here because it's guaranteed to be delivered. Using
# SIGHUP might not work in, e.g., a buildbot slave run under the
# 'nohup' command.
exe = sys.executable
scriptFile = self.makeSourceFile([
"import sys, os, signal",
"sys.stdout.write('stdout bytes\\n')",
"sys.stderr.write('stderr bytes\\n')",
"sys.stdout.flush()",
"sys.stderr.flush()",
"os.kill(os.getpid(), signal.SIGKILL)"
])
def gotOutputAndValue(err):
(out, err, sig) = err.value # XXX Sigh wtf
self.assertEquals(out, "stdout bytes" + os.linesep)
self.assertEquals(err, "stderr bytes" + os.linesep)
self.assertEquals(sig, signal.SIGKILL)
d = utils.getProcessOutputAndValue(exe, ['-u', scriptFile])
return d.addErrback(gotOutputAndValue)
def fix_script(path):
"""Replace #!python with #!/path/to/python
Return True if file was changed."""
# XXX RECORD hashes will need to be updated
if os.path.isfile(path):
script = open(path, 'rb')
try:
firstline = script.readline()
if not firstline.startswith(binary('#!python')):
return False
exename = sys.executable.encode(sys.getfilesystemencoding())
firstline = binary('#!') + exename + binary(os.linesep)
rest = script.read()
finally:
script.close()
script = open(path, 'wb')
try:
script.write(firstline)
script.write(rest)
finally:
script.close()
return True
def main(initial_args=None):
if initial_args is None:
initial_args = sys.argv[1:]
autocomplete()
try:
cmd_name, cmd_args = parseopts(initial_args)
except PipError:
e = sys.exc_info()[1]
sys.stderr.write("ERROR: %s" % e)
sys.stderr.write(os.linesep)
sys.exit(1)
command = commands[cmd_name]()
return command.main(cmd_args)
def _install_message(self, message):
"""Format a message and blindly write to self._file."""
from_line = None
if isinstance(message, str) and message.startswith('From '):
newline = message.find('\n')
if newline != -1:
from_line = message[:newline]
message = message[newline + 1:]
else:
from_line = message
message = ''
elif isinstance(message, _mboxMMDFMessage):
from_line = 'From ' + message.get_from()
elif isinstance(message, email.message.Message):
from_line = message.get_unixfrom() # May be None.
if from_line is None:
from_line = 'From MAILER-DAEMON %s' % time.asctime(time.gmtime())
start = self._file.tell()
self._file.write(from_line + os.linesep)
self._dump_message(message, self._file, self._mangle_from_)
stop = self._file.tell()
return (start, stop)
def get_message(self, key):
"""Return a Message representation or raise a KeyError."""
start, stop = self._lookup(key)
self._file.seek(start)
self._file.readline() # Skip '1,' line specifying labels.
original_headers = StringIO.StringIO()
while True:
line = self._file.readline()
if line == '*** EOOH ***' + os.linesep or line == '':
break
original_headers.write(line.replace(os.linesep, '\n'))
visible_headers = StringIO.StringIO()
while True:
line = self._file.readline()
if line == os.linesep or line == '':
break
visible_headers.write(line.replace(os.linesep, '\n'))
body = self._file.read(stop - self._file.tell()).replace(os.linesep,
'\n')
msg = BabylMessage(original_headers.getvalue() + body)
msg.set_visible(visible_headers.getvalue())
if key in self._labels:
msg.set_labels(self._labels[key])
return msg