def get_message(self, key):
"""Return a Message representation or raise a KeyError."""
start, stop = self._lookup(key)
self._file.seek(start)
self._file.readline() # Skip '1,' line specifying labels.
original_headers = StringIO.StringIO()
while True:
line = self._file.readline()
if line == '*** EOOH ***' + os.linesep or line == '':
break
original_headers.write(line.replace(os.linesep, '\n'))
visible_headers = StringIO.StringIO()
while True:
line = self._file.readline()
if line == os.linesep or line == '':
break
visible_headers.write(line.replace(os.linesep, '\n'))
body = self._file.read(stop - self._file.tell()).replace(os.linesep,
'\n')
msg = BabylMessage(original_headers.getvalue() + body)
msg.set_visible(visible_headers.getvalue())
if key in self._labels:
msg.set_labels(self._labels[key])
return msg
python类replace()的实例源码
def get_message(self, key):
"""Return a Message representation or raise a KeyError."""
start, stop = self._lookup(key)
self._file.seek(start)
self._file.readline() # Skip '1,' line specifying labels.
original_headers = StringIO.StringIO()
while True:
line = self._file.readline()
if line == '*** EOOH ***' + os.linesep or line == '':
break
original_headers.write(line.replace(os.linesep, '\n'))
visible_headers = StringIO.StringIO()
while True:
line = self._file.readline()
if line == os.linesep or line == '':
break
visible_headers.write(line.replace(os.linesep, '\n'))
body = self._file.read(stop - self._file.tell()).replace(os.linesep,
'\n')
msg = BabylMessage(original_headers.getvalue() + body)
msg.set_visible(visible_headers.getvalue())
if key in self._labels:
msg.set_labels(self._labels[key])
return msg
def get_bytes(self, key):
"""Return a string representation or raise a KeyError."""
start, stop = self._lookup(key)
self._file.seek(start)
self._file.readline() # Skip b'1,' line specifying labels.
original_headers = io.BytesIO()
while True:
line = self._file.readline()
if line == b'*** EOOH ***' + linesep or not line:
break
original_headers.write(line.replace(linesep, b'\n'))
while True:
line = self._file.readline()
if line == linesep or not line:
break
headers = original_headers.getvalue()
n = stop - self._file.tell()
assert n >= 0
data = self._file.read(n)
data = data.replace(linesep, b'\n')
return headers + data
def get_message(self, key):
"""Return a Message representation or raise a KeyError."""
start, stop = self._lookup(key)
self._file.seek(start)
self._file.readline() # Skip '1,' line specifying labels.
original_headers = StringIO.StringIO()
while True:
line = self._file.readline()
if line == '*** EOOH ***' + os.linesep or line == '':
break
original_headers.write(line.replace(os.linesep, '\n'))
visible_headers = StringIO.StringIO()
while True:
line = self._file.readline()
if line == os.linesep or line == '':
break
visible_headers.write(line.replace(os.linesep, '\n'))
body = self._file.read(stop - self._file.tell()).replace(os.linesep,
'\n')
msg = BabylMessage(original_headers.getvalue() + body)
msg.set_visible(visible_headers.getvalue())
if key in self._labels:
msg.set_labels(self._labels[key])
return msg
def get_message(self, key):
"""Return a Message representation or raise a KeyError."""
start, stop = self._lookup(key)
self._file.seek(start)
self._file.readline() # Skip '1,' line specifying labels.
original_headers = StringIO.StringIO()
while True:
line = self._file.readline()
if line == '*** EOOH ***' + os.linesep or line == '':
break
original_headers.write(line.replace(os.linesep, '\n'))
visible_headers = StringIO.StringIO()
while True:
line = self._file.readline()
if line == os.linesep or line == '':
break
visible_headers.write(line.replace(os.linesep, '\n'))
body = self._file.read(stop - self._file.tell()).replace(os.linesep,
'\n')
msg = BabylMessage(original_headers.getvalue() + body)
msg.set_visible(visible_headers.getvalue())
if key in self._labels:
msg.set_labels(self._labels[key])
return msg
def get_message(self, key):
"""Return a Message representation or raise a KeyError."""
start, stop = self._lookup(key)
self._file.seek(start)
self._file.readline() # Skip '1,' line specifying labels.
original_headers = StringIO.StringIO()
while True:
line = self._file.readline()
if line == '*** EOOH ***' + os.linesep or line == '':
break
original_headers.write(line.replace(os.linesep, '\n'))
visible_headers = StringIO.StringIO()
while True:
line = self._file.readline()
if line == os.linesep or line == '':
break
visible_headers.write(line.replace(os.linesep, '\n'))
body = self._file.read(stop - self._file.tell()).replace(os.linesep,
'\n')
msg = BabylMessage(original_headers.getvalue() + body)
msg.set_visible(visible_headers.getvalue())
if key in self._labels:
msg.set_labels(self._labels[key])
return msg
def get_message(self, key):
"""Return a Message representation or raise a KeyError."""
start, stop = self._lookup(key)
self._file.seek(start)
self._file.readline() # Skip '1,' line specifying labels.
original_headers = StringIO.StringIO()
while True:
line = self._file.readline()
if line == '*** EOOH ***' + os.linesep or line == '':
break
original_headers.write(line.replace(os.linesep, '\n'))
visible_headers = StringIO.StringIO()
while True:
line = self._file.readline()
if line == os.linesep or line == '':
break
visible_headers.write(line.replace(os.linesep, '\n'))
body = self._file.read(stop - self._file.tell()).replace(os.linesep,
'\n')
msg = BabylMessage(original_headers.getvalue() + body)
msg.set_visible(visible_headers.getvalue())
if key in self._labels:
msg.set_labels(self._labels[key])
return msg
def _create_tmp(self):
"""Create a file in the tmp subdirectory and open and return it."""
now = time.time()
hostname = socket.gethostname()
if '/' in hostname:
hostname = hostname.replace('/', r'\057')
if ':' in hostname:
hostname = hostname.replace(':', r'\072')
uniq = "%s.M%sP%sQ%s.%s" % (int(now), int(now % 1 * 1e6), os.getpid(),
Maildir._count, hostname)
path = os.path.join(self._path, 'tmp', uniq)
try:
os.stat(path)
except FileNotFoundError:
Maildir._count += 1
try:
return _create_carefully(path)
except FileExistsError:
pass
# Fall through to here if stat succeeded or open raised EEXIST.
raise ExternalClashError('Name clash prevented file creation: %s' %
path)
def get_bytes(self, key):
"""Return a string representation or raise a KeyError."""
start, stop = self._lookup(key)
self._file.seek(start)
self._file.readline() # Skip b'1,' line specifying labels.
original_headers = io.BytesIO()
while True:
line = self._file.readline()
if line == b'*** EOOH ***' + linesep or not line:
break
original_headers.write(line.replace(linesep, b'\n'))
while True:
line = self._file.readline()
if line == linesep or not line:
break
headers = original_headers.getvalue()
n = stop - self._file.tell()
assert n >= 0
data = self._file.read(n)
data = data.replace(linesep, b'\n')
return headers + data
def get_message(self, key):
"""Return a Message representation or raise a KeyError."""
start, stop = self._lookup(key)
self._file.seek(start)
self._file.readline() # Skip '1,' line specifying labels.
original_headers = StringIO.StringIO()
while True:
line = self._file.readline()
if line == '*** EOOH ***' + os.linesep or line == '':
break
original_headers.write(line.replace(os.linesep, '\n'))
visible_headers = StringIO.StringIO()
while True:
line = self._file.readline()
if line == os.linesep or line == '':
break
visible_headers.write(line.replace(os.linesep, '\n'))
body = self._file.read(stop - self._file.tell()).replace(os.linesep,
'\n')
msg = BabylMessage(original_headers.getvalue() + body)
msg.set_visible(visible_headers.getvalue())
if key in self._labels:
msg.set_labels(self._labels[key])
return msg
def get_message(self, key):
"""Return a Message representation or raise a KeyError."""
start, stop = self._lookup(key)
self._file.seek(start)
self._file.readline() # Skip '1,' line specifying labels.
original_headers = StringIO.StringIO()
while True:
line = self._file.readline()
if line == '*** EOOH ***' + os.linesep or line == '':
break
original_headers.write(line.replace(os.linesep, '\n'))
visible_headers = StringIO.StringIO()
while True:
line = self._file.readline()
if line == os.linesep or line == '':
break
visible_headers.write(line.replace(os.linesep, '\n'))
body = self._file.read(stop - self._file.tell()).replace(os.linesep,
'\n')
msg = BabylMessage(original_headers.getvalue() + body)
msg.set_visible(visible_headers.getvalue())
if key in self._labels:
msg.set_labels(self._labels[key])
return msg
def _create_tmp(self):
"""Create a file in the tmp subdirectory and open and return it."""
now = time.time()
hostname = socket.gethostname()
if '/' in hostname:
hostname = hostname.replace('/', r'\057')
if ':' in hostname:
hostname = hostname.replace(':', r'\072')
uniq = "%s.M%sP%sQ%s.%s" % (int(now), int(now % 1 * 1e6), os.getpid(),
Maildir._count, hostname)
path = os.path.join(self._path, 'tmp', uniq)
try:
os.stat(path)
except FileNotFoundError:
Maildir._count += 1
try:
return _create_carefully(path)
except FileExistsError:
pass
# Fall through to here if stat succeeded or open raised EEXIST.
raise ExternalClashError('Name clash prevented file creation: %s' %
path)
def get_bytes(self, key):
"""Return a string representation or raise a KeyError."""
start, stop = self._lookup(key)
self._file.seek(start)
self._file.readline() # Skip b'1,' line specifying labels.
original_headers = io.BytesIO()
while True:
line = self._file.readline()
if line == b'*** EOOH ***' + linesep or not line:
break
original_headers.write(line.replace(linesep, b'\n'))
while True:
line = self._file.readline()
if line == linesep or not line:
break
headers = original_headers.getvalue()
n = stop - self._file.tell()
assert n >= 0
data = self._file.read(n)
data = data.replace(linesep, b'\n')
return headers + data
def _dump_message(self, message, target, mangle_from_=False):
# Most files are opened in binary mode to allow predictable seeking.
# To get native line endings on disk, the user-friendly \n line endings
# used in strings and by email.Message are translated here.
"""Dump message contents to target file."""
if isinstance(message, email.message.Message):
buffer = StringIO.StringIO()
gen = email.generator.Generator(buffer, mangle_from_, 0)
gen.flatten(message)
buffer.seek(0)
target.write(buffer.read().replace('\n', os.linesep))
elif isinstance(message, str):
if mangle_from_:
message = message.replace('\nFrom ', '\n>From ')
message = message.replace('\n', os.linesep)
target.write(message)
elif hasattr(message, 'read'):
while True:
line = message.readline()
if line == '':
break
if mangle_from_ and line.startswith('From '):
line = '>From ' + line[5:]
line = line.replace('\n', os.linesep)
target.write(line)
else:
raise TypeError('Invalid message type: %s' % type(message))
def _create_tmp(self):
"""Create a file in the tmp subdirectory and open and return it."""
now = time.time()
hostname = socket.gethostname()
if '/' in hostname:
hostname = hostname.replace('/', r'\057')
if ':' in hostname:
hostname = hostname.replace(':', r'\072')
uniq = "%s.M%sP%sQ%s.%s" % (int(now), int(now % 1 * 1e6), os.getpid(),
Maildir._count, hostname)
path = os.path.join(self._path, 'tmp', uniq)
try:
os.stat(path)
except OSError, e:
if e.errno == errno.ENOENT:
Maildir._count += 1
try:
return _create_carefully(path)
except OSError, e:
if e.errno != errno.EEXIST:
raise
else:
raise
# Fall through to here if stat succeeded or open raised EEXIST.
raise ExternalClashError('Name clash prevented file creation: %s' %
path)
def get_message(self, key):
"""Return a Message representation or raise a KeyError."""
start, stop = self._lookup(key)
self._file.seek(start)
from_line = self._file.readline().replace(os.linesep, '')
string = self._file.read(stop - self._file.tell())
msg = self._message_factory(string.replace(os.linesep, '\n'))
msg.set_from(from_line[5:])
return msg
def get_string(self, key, from_=False):
"""Return a string representation or raise a KeyError."""
start, stop = self._lookup(key)
self._file.seek(start)
if not from_:
self._file.readline()
string = self._file.read(stop - self._file.tell())
return string.replace(os.linesep, '\n')
def get_file(self, key):
"""Return a file-like representation or raise a KeyError."""
return StringIO.StringIO(self.get_string(key).replace('\n',
os.linesep))
def _dump_message(self, message, target, mangle_from_=False):
# Most files are opened in binary mode to allow predictable seeking.
# To get native line endings on disk, the user-friendly \n line endings
# used in strings and by email.Message are translated here.
"""Dump message contents to target file."""
if isinstance(message, email.message.Message):
buffer = StringIO.StringIO()
gen = email.generator.Generator(buffer, mangle_from_, 0)
gen.flatten(message)
buffer.seek(0)
data = buffer.read().replace('\n', os.linesep)
target.write(data)
if self._append_newline and not data.endswith(os.linesep):
# Make sure the message ends with a newline
target.write(os.linesep)
elif isinstance(message, str):
if mangle_from_:
message = message.replace('\nFrom ', '\n>From ')
message = message.replace('\n', os.linesep)
target.write(message)
if self._append_newline and not message.endswith(os.linesep):
# Make sure the message ends with a newline
target.write(os.linesep)
elif hasattr(message, 'read'):
lastline = None
while True:
line = message.readline()
if line == '':
break
if mangle_from_ and line.startswith('From '):
line = '>From ' + line[5:]
line = line.replace('\n', os.linesep)
target.write(line)
lastline = line
if self._append_newline and lastline and not lastline.endswith(os.linesep):
# Make sure the message ends with a newline
target.write(os.linesep)
else:
raise TypeError('Invalid message type: %s' % type(message))
def _create_tmp(self):
"""Create a file in the tmp subdirectory and open and return it."""
now = time.time()
hostname = socket.gethostname()
if '/' in hostname:
hostname = hostname.replace('/', r'\057')
if ':' in hostname:
hostname = hostname.replace(':', r'\072')
uniq = "%s.M%sP%sQ%s.%s" % (int(now), int(now % 1 * 1e6), os.getpid(),
Maildir._count, hostname)
path = os.path.join(self._path, 'tmp', uniq)
try:
os.stat(path)
except OSError, e:
if e.errno == errno.ENOENT:
Maildir._count += 1
try:
return _create_carefully(path)
except OSError, e:
if e.errno != errno.EEXIST:
raise
else:
raise
# Fall through to here if stat succeeded or open raised EEXIST.
raise ExternalClashError('Name clash prevented file creation: %s' %
path)
def get_message(self, key):
"""Return a Message representation or raise a KeyError."""
start, stop = self._lookup(key)
self._file.seek(start)
from_line = self._file.readline().replace(os.linesep, '')
string = self._file.read(stop - self._file.tell())
msg = self._message_factory(string.replace(os.linesep, '\n'))
msg.set_from(from_line[5:])
return msg
def get_string(self, key, from_=False):
"""Return a string representation or raise a KeyError."""
start, stop = self._lookup(key)
self._file.seek(start)
if not from_:
self._file.readline()
string = self._file.read(stop - self._file.tell())
return string.replace(os.linesep, '\n')
def get_file(self, key):
"""Return a file-like representation or raise a KeyError."""
return StringIO.StringIO(self.get_string(key).replace('\n',
os.linesep))
def get_bytes(self, key):
"""Return a bytes representation or raise a KeyError."""
f = open(os.path.join(self._path, self._lookup(key)), 'rb')
try:
return f.read().replace(linesep, b'\n')
finally:
f.close()
def _create_tmp(self):
"""Create a file in the tmp subdirectory and open and return it."""
now = time.time()
hostname = socket.gethostname()
if '/' in hostname:
hostname = hostname.replace('/', r'\057')
if ':' in hostname:
hostname = hostname.replace(':', r'\072')
uniq = "%s.M%sP%sQ%s.%s" % (int(now), int(now % 1 * 1e6), os.getpid(),
Maildir._count, hostname)
path = os.path.join(self._path, 'tmp', uniq)
try:
os.stat(path)
except OSError as e:
if e.errno == errno.ENOENT:
Maildir._count += 1
try:
return _create_carefully(path)
except OSError as e:
if e.errno != errno.EEXIST:
raise
else:
raise
# Fall through to here if stat succeeded or open raised EEXIST.
raise ExternalClashError('Name clash prevented file creation: %s' %
path)
def get_message(self, key):
"""Return a Message representation or raise a KeyError."""
start, stop = self._lookup(key)
self._file.seek(start)
from_line = self._file.readline().replace(linesep, b'')
string = self._file.read(stop - self._file.tell())
msg = self._message_factory(string.replace(linesep, b'\n'))
msg.set_from(from_line[5:].decode('ascii'))
return msg
def get_bytes(self, key, from_=False):
"""Return a string representation or raise a KeyError."""
start, stop = self._lookup(key)
self._file.seek(start)
if not from_:
self._file.readline()
string = self._file.read(stop - self._file.tell())
return string.replace(linesep, b'\n')
def get_message(self, key):
"""Return a Message representation or raise a KeyError."""
start, stop = self._lookup(key)
self._file.seek(start)
self._file.readline() # Skip b'1,' line specifying labels.
original_headers = io.BytesIO()
while True:
line = self._file.readline()
if line == b'*** EOOH ***' + linesep or not line:
break
original_headers.write(line.replace(linesep, b'\n'))
visible_headers = io.BytesIO()
while True:
line = self._file.readline()
if line == linesep or not line:
break
visible_headers.write(line.replace(linesep, b'\n'))
# Read up to the stop, or to the end
n = stop - self._file.tell()
assert n >= 0
body = self._file.read(n)
body = body.replace(linesep, b'\n')
msg = BabylMessage(original_headers.getvalue() + body)
msg.set_visible(visible_headers.getvalue())
if key in self._labels:
msg.set_labels(self._labels[key])
return msg
def get_file(self, key):
"""Return a file-like representation or raise a KeyError."""
return io.BytesIO(self.get_bytes(key).replace(b'\n', linesep))
def _dump_message(self, message, target, mangle_from_=False):
# Most files are opened in binary mode to allow predictable seeking.
# To get native line endings on disk, the user-friendly \n line endings
# used in strings and by email.Message are translated here.
"""Dump message contents to target file."""
if isinstance(message, email.message.Message):
buffer = StringIO.StringIO()
gen = email.generator.Generator(buffer, mangle_from_, 0)
gen.flatten(message)
buffer.seek(0)
data = buffer.read().replace('\n', os.linesep)
target.write(data)
if self._append_newline and not data.endswith(os.linesep):
# Make sure the message ends with a newline
target.write(os.linesep)
elif isinstance(message, str):
if mangle_from_:
message = message.replace('\nFrom ', '\n>From ')
message = message.replace('\n', os.linesep)
target.write(message)
if self._append_newline and not message.endswith(os.linesep):
# Make sure the message ends with a newline
target.write(os.linesep)
elif hasattr(message, 'read'):
lastline = None
while True:
line = message.readline()
if line == '':
break
if mangle_from_ and line.startswith('From '):
line = '>From ' + line[5:]
line = line.replace('\n', os.linesep)
target.write(line)
lastline = line
if self._append_newline and lastline and not lastline.endswith(os.linesep):
# Make sure the message ends with a newline
target.write(os.linesep)
else:
raise TypeError('Invalid message type: %s' % type(message))