def _generate_toc(self):
"""Generate key-to-(start, stop) table of contents."""
starts, stops = [], []
self._file.seek(0)
while True:
line_pos = self._file.tell()
line = self._file.readline()
if line.startswith('From '):
if len(stops) < len(starts):
stops.append(line_pos - len(os.linesep))
starts.append(line_pos)
elif line == '':
stops.append(line_pos)
break
self._toc = dict(enumerate(zip(starts, stops)))
self._next_key = len(self._toc)
self._file_length = self._file.tell()
# Example source code using Python's tell()
def get_message(self, key):
    """Return a Message representation or raise a KeyError.

    Builds a BabylMessage for the entry stored under *key*. The header
    lines found before the '*** EOOH ***' marker, followed by the body,
    form the message text; the header lines between the marker and the
    first blank line are attached through set_visible(). Labels recorded
    in self._labels for *key* are applied with set_labels(). Every
    os.linesep is converted to '\n'.
    """
    start, stop = self._lookup(key)
    fh = self._file
    fh.seek(start)
    fh.readline()  # Skip '1,' line specifying labels.
    eooh_marker = '*** EOOH ***' + os.linesep

    # Header lines up to the EOOH marker (or EOF).
    original_parts = []
    line = fh.readline()
    while line != eooh_marker and line != '':
        original_parts.append(line.replace(os.linesep, '\n'))
        line = fh.readline()

    # Header lines after the marker, up to the first blank line (or EOF).
    visible_parts = []
    line = fh.readline()
    while line != os.linesep and line != '':
        visible_parts.append(line.replace(os.linesep, '\n'))
        line = fh.readline()

    # Whatever is left before *stop* is the body.
    body = fh.read(stop - fh.tell()).replace(os.linesep, '\n')
    msg = BabylMessage(''.join(original_parts) + body)
    msg.set_visible(''.join(visible_parts))
    if key in self._labels:
        msg.set_labels(self._labels[key])
    return msg
def get_string(self, key):
    """Return a string representation or raise a KeyError.

    The result is the header lines found before the '*** EOOH ***'
    marker followed by the message body. The header lines between the
    marker and the first blank line are read and discarded. Every
    os.linesep is converted to '\n'.
    """
    start, stop = self._lookup(key)
    fh = self._file
    fh.seek(start)
    fh.readline()  # Skip '1,' line specifying labels.
    eooh_marker = '*** EOOH ***' + os.linesep

    # Header lines up to the EOOH marker (or EOF).
    original_parts = []
    line = fh.readline()
    while line != eooh_marker and line != '':
        original_parts.append(line.replace(os.linesep, '\n'))
        line = fh.readline()

    # Consume lines up to and including the first blank line (or EOF).
    line = fh.readline()
    while line != os.linesep and line != '':
        line = fh.readline()

    remainder = fh.read(stop - fh.tell())
    return ''.join(original_parts) + remainder.replace(os.linesep, '\n')
def _append_message(self, message):
"""Append message to mailbox and return (start, stop) offsets."""
self._file.seek(0, 2)
before = self._file.tell()
if len(self._toc) == 0 and not self._pending:
# This is the first message, and the _pre_mailbox_hook
# hasn't yet been called. If self._pending is True,
# messages have been removed, so _pre_mailbox_hook must
# have been called already.
self._pre_mailbox_hook(self._file)
try:
self._pre_message_hook(self._file)
offsets = self._install_message(message)
self._post_message_hook(self._file)
except BaseException:
self._file.truncate(before)
raise
self._file.flush()
self._file_length = self._file.tell() # Record current length of mailbox
return offsets
def _install_message(self, message):
"""Format a message and blindly write to self._file."""
from_line = None
if isinstance(message, str) and message.startswith('From '):
newline = message.find('\n')
if newline != -1:
from_line = message[:newline]
message = message[newline + 1:]
else:
from_line = message
message = ''
elif isinstance(message, _mboxMMDFMessage):
from_line = 'From ' + message.get_from()
elif isinstance(message, email.message.Message):
from_line = message.get_unixfrom() # May be None.
if from_line is None:
from_line = 'From MAILER-DAEMON %s' % time.asctime(time.gmtime())
start = self._file.tell()
self._file.write(from_line + os.linesep)
self._dump_message(message, self._file, self._mangle_from_)
stop = self._file.tell()
return (start, stop)
def get_message(self, key):
    """Return a Message representation or raise a KeyError.

    Builds a BabylMessage for the entry stored under *key*. The header
    lines found before the '*** EOOH ***' marker, followed by the body,
    form the message text; the header lines between the marker and the
    first blank line are attached through set_visible(). Labels recorded
    in self._labels for *key* are applied with set_labels(). Every
    os.linesep is converted to '\n'.
    """
    start, stop = self._lookup(key)
    fh = self._file
    fh.seek(start)
    fh.readline()  # Skip '1,' line specifying labels.
    eooh_marker = '*** EOOH ***' + os.linesep

    # Header lines up to the EOOH marker (or EOF).
    original_parts = []
    line = fh.readline()
    while line != eooh_marker and line != '':
        original_parts.append(line.replace(os.linesep, '\n'))
        line = fh.readline()

    # Header lines after the marker, up to the first blank line (or EOF).
    visible_parts = []
    line = fh.readline()
    while line != os.linesep and line != '':
        visible_parts.append(line.replace(os.linesep, '\n'))
        line = fh.readline()

    # Whatever is left before *stop* is the body.
    body = fh.read(stop - fh.tell()).replace(os.linesep, '\n')
    msg = BabylMessage(''.join(original_parts) + body)
    msg.set_visible(''.join(visible_parts))
    if key in self._labels:
        msg.set_labels(self._labels[key])
    return msg
def get_string(self, key):
    """Return a string representation or raise a KeyError.

    The result is the header lines found before the '*** EOOH ***'
    marker followed by the message body. The header lines between the
    marker and the first blank line are read and discarded. Every
    os.linesep is converted to '\n'.
    """
    start, stop = self._lookup(key)
    fh = self._file
    fh.seek(start)
    fh.readline()  # Skip '1,' line specifying labels.
    eooh_marker = '*** EOOH ***' + os.linesep

    # Header lines up to the EOOH marker (or EOF).
    original_parts = []
    line = fh.readline()
    while line != eooh_marker and line != '':
        original_parts.append(line.replace(os.linesep, '\n'))
        line = fh.readline()

    # Consume lines up to and including the first blank line (or EOF).
    line = fh.readline()
    while line != os.linesep and line != '':
        line = fh.readline()

    remainder = fh.read(stop - fh.tell())
    return ''.join(original_parts) + remainder.replace(os.linesep, '\n')
def next(self):
    """Return the next non-empty message, or None when none is left.

    Starting at self.seekp, self._search_start() and self._search_end()
    position self.fp at the beginning and end of a candidate message.
    Empty candidates (start == stop) are skipped. A non-empty one is
    wrapped in _PartialFile and passed to self.factory. When
    self._search_start() raises EOFError, self.seekp is set to the
    current file position and None is returned.
    """
    while True:
        self.fp.seek(self.seekp)
        try:
            self._search_start()
        except EOFError:
            self.seekp = self.fp.tell()
            return None
        start = self.fp.tell()
        self._search_end()
        stop = self.fp.tell()
        # Remember where to resume on the following call.
        self.seekp = stop
        if start != stop:
            return self.factory(_PartialFile(self.fp, start, stop))
# Recommended to use PortableUnixMailbox instead!
def _generate_toc(self):
    """Generate key-to-(start, stop) table of contents.

    Scans self._file from the beginning. Every line starting with
    b'From ' opens a new message and closes the previous one. The
    results are stored on the instance:

    * self._toc -- dict mapping a 0-based index to (start, stop) offsets
    * self._next_key -- number of entries in self._toc
    * self._file_length -- file position once the scan has hit EOF
    """
    starts, stops = [], []
    # True when the previously read line was a bare line separator.
    last_was_empty = False
    self._file.seek(0)
    while True:
        line_pos = self._file.tell()
        line = self._file.readline()
        if line.startswith(b'From '):
            if len(stops) < len(starts):
                if last_was_empty:
                    # Leave the blank separator line out of the message.
                    stops.append(line_pos - len(linesep))
                else:
                    # No blank separator precedes this b'From ' line, so
                    # there is nothing to strip; subtracting here would
                    # cut into the previous message's last line.
                    stops.append(line_pos)
            starts.append(line_pos)
            last_was_empty = False
        elif not line:
            # EOF. A surplus stop (no open message) is dropped by zip().
            stops.append(line_pos)
            break
        else:
            last_was_empty = (line == linesep)
    self._toc = dict(enumerate(zip(starts, stops)))
    self._next_key = len(self._toc)
    self._file_length = self._file.tell()
def get_bytes(self, key):
    """Return a bytes representation or raise a KeyError.

    The result is the header lines found before the b'*** EOOH ***'
    marker followed by the message body. The header lines between the
    marker and the first blank line are read and discarded. Every
    occurrence of linesep is converted to b'\n'.
    """
    start, stop = self._lookup(key)
    fh = self._file
    fh.seek(start)
    fh.readline()  # Skip b'1,' line specifying labels.
    eooh_marker = b'*** EOOH ***' + linesep

    # Header lines up to the EOOH marker (or EOF).
    original_parts = []
    line = fh.readline()
    while line and line != eooh_marker:
        original_parts.append(line.replace(linesep, b'\n'))
        line = fh.readline()

    # Consume lines up to and including the first blank line (or EOF).
    line = fh.readline()
    while line and line != linesep:
        line = fh.readline()

    headers = b''.join(original_parts)
    n = stop - fh.tell()
    assert n >= 0
    data = fh.read(n)
    return headers + data.replace(linesep, b'\n')
def _append_message(self, message):
"""Append message to mailbox and return (start, stop) offsets."""
self._file.seek(0, 2)
before = self._file.tell()
if len(self._toc) == 0 and not self._pending:
# This is the first message, and the _pre_mailbox_hook
# hasn't yet been called. If self._pending is True,
# messages have been removed, so _pre_mailbox_hook must
# have been called already.
self._pre_mailbox_hook(self._file)
try:
self._pre_message_hook(self._file)
offsets = self._install_message(message)
self._post_message_hook(self._file)
except BaseException:
self._file.truncate(before)
raise
self._file.flush()
self._file_length = self._file.tell() # Record current length of mailbox
return offsets
def _install_message(self, message):
"""Format a message and blindly write to self._file."""
from_line = None
if isinstance(message, str) and message.startswith('From '):
newline = message.find('\n')
if newline != -1:
from_line = message[:newline]
message = message[newline + 1:]
else:
from_line = message
message = ''
elif isinstance(message, _mboxMMDFMessage):
from_line = 'From ' + message.get_from()
elif isinstance(message, email.message.Message):
from_line = message.get_unixfrom() # May be None.
if from_line is None:
from_line = 'From MAILER-DAEMON %s' % time.asctime(time.gmtime())
start = self._file.tell()
self._file.write(from_line + os.linesep)
self._dump_message(message, self._file, self._mangle_from_)
stop = self._file.tell()
return (start, stop)
def get_message(self, key):
    """Return a Message representation or raise a KeyError.

    Builds a BabylMessage for the entry stored under *key*. The header
    lines found before the '*** EOOH ***' marker, followed by the body,
    form the message text; the header lines between the marker and the
    first blank line are attached through set_visible(). Labels recorded
    in self._labels for *key* are applied with set_labels(). Every
    os.linesep is converted to '\n'.
    """
    start, stop = self._lookup(key)
    fh = self._file
    fh.seek(start)
    fh.readline()  # Skip '1,' line specifying labels.
    eooh_marker = '*** EOOH ***' + os.linesep

    # Header lines up to the EOOH marker (or EOF).
    original_parts = []
    line = fh.readline()
    while line != eooh_marker and line != '':
        original_parts.append(line.replace(os.linesep, '\n'))
        line = fh.readline()

    # Header lines after the marker, up to the first blank line (or EOF).
    visible_parts = []
    line = fh.readline()
    while line != os.linesep and line != '':
        visible_parts.append(line.replace(os.linesep, '\n'))
        line = fh.readline()

    # Whatever is left before *stop* is the body.
    body = fh.read(stop - fh.tell()).replace(os.linesep, '\n')
    msg = BabylMessage(''.join(original_parts) + body)
    msg.set_visible(''.join(visible_parts))
    if key in self._labels:
        msg.set_labels(self._labels[key])
    return msg
def get_string(self, key):
    """Return a string representation or raise a KeyError.

    The result is the header lines found before the '*** EOOH ***'
    marker followed by the message body. The header lines between the
    marker and the first blank line are read and discarded. Every
    os.linesep is converted to '\n'.
    """
    start, stop = self._lookup(key)
    fh = self._file
    fh.seek(start)
    fh.readline()  # Skip '1,' line specifying labels.
    eooh_marker = '*** EOOH ***' + os.linesep

    # Header lines up to the EOOH marker (or EOF).
    original_parts = []
    line = fh.readline()
    while line != eooh_marker and line != '':
        original_parts.append(line.replace(os.linesep, '\n'))
        line = fh.readline()

    # Consume lines up to and including the first blank line (or EOF).
    line = fh.readline()
    while line != os.linesep and line != '':
        line = fh.readline()

    remainder = fh.read(stop - fh.tell())
    return ''.join(original_parts) + remainder.replace(os.linesep, '\n')
def next(self):
    """Return the next non-empty message, or None when none is left.

    Starting at self.seekp, self._search_start() and self._search_end()
    position self.fp at the beginning and end of a candidate message.
    Empty candidates (start == stop) are skipped. A non-empty one is
    wrapped in _PartialFile and passed to self.factory. When
    self._search_start() raises EOFError, self.seekp is set to the
    current file position and None is returned.
    """
    while True:
        self.fp.seek(self.seekp)
        try:
            self._search_start()
        except EOFError:
            self.seekp = self.fp.tell()
            return None
        start = self.fp.tell()
        self._search_end()
        stop = self.fp.tell()
        # Remember where to resume on the following call.
        self.seekp = stop
        if start != stop:
            return self.factory(_PartialFile(self.fp, start, stop))
# Recommended to use PortableUnixMailbox instead!
def _append_message(self, message):
"""Append message to mailbox and return (start, stop) offsets."""
self._file.seek(0, 2)
before = self._file.tell()
if len(self._toc) == 0 and not self._pending:
# This is the first message, and the _pre_mailbox_hook
# hasn't yet been called. If self._pending is True,
# messages have been removed, so _pre_mailbox_hook must
# have been called already.
self._pre_mailbox_hook(self._file)
try:
self._pre_message_hook(self._file)
offsets = self._install_message(message)
self._post_message_hook(self._file)
except BaseException:
self._file.truncate(before)
raise
self._file.flush()
self._file_length = self._file.tell() # Record current length of mailbox
return offsets
def _install_message(self, message):
"""Format a message and blindly write to self._file."""
from_line = None
if isinstance(message, str) and message.startswith('From '):
newline = message.find('\n')
if newline != -1:
from_line = message[:newline]
message = message[newline + 1:]
else:
from_line = message
message = ''
elif isinstance(message, _mboxMMDFMessage):
from_line = 'From ' + message.get_from()
elif isinstance(message, email.message.Message):
from_line = message.get_unixfrom() # May be None.
if from_line is None:
from_line = 'From MAILER-DAEMON %s' % time.asctime(time.gmtime())
start = self._file.tell()
self._file.write(from_line + os.linesep)
self._dump_message(message, self._file, self._mangle_from_)
stop = self._file.tell()
return (start, stop)
def get_message(self, key):
    """Return a Message representation or raise a KeyError.

    Builds a BabylMessage for the entry stored under *key*. The header
    lines found before the '*** EOOH ***' marker, followed by the body,
    form the message text; the header lines between the marker and the
    first blank line are attached through set_visible(). Labels recorded
    in self._labels for *key* are applied with set_labels(). Every
    os.linesep is converted to '\n'.
    """
    start, stop = self._lookup(key)
    fh = self._file
    fh.seek(start)
    fh.readline()  # Skip '1,' line specifying labels.
    eooh_marker = '*** EOOH ***' + os.linesep

    # Header lines up to the EOOH marker (or EOF).
    original_parts = []
    line = fh.readline()
    while line != eooh_marker and line != '':
        original_parts.append(line.replace(os.linesep, '\n'))
        line = fh.readline()

    # Header lines after the marker, up to the first blank line (or EOF).
    visible_parts = []
    line = fh.readline()
    while line != os.linesep and line != '':
        visible_parts.append(line.replace(os.linesep, '\n'))
        line = fh.readline()

    # Whatever is left before *stop* is the body.
    body = fh.read(stop - fh.tell()).replace(os.linesep, '\n')
    msg = BabylMessage(''.join(original_parts) + body)
    msg.set_visible(''.join(visible_parts))
    if key in self._labels:
        msg.set_labels(self._labels[key])
    return msg
def get_string(self, key):
    """Return a string representation or raise a KeyError.

    The result is the header lines found before the '*** EOOH ***'
    marker followed by the message body. The header lines between the
    marker and the first blank line are read and discarded. Every
    os.linesep is converted to '\n'.
    """
    start, stop = self._lookup(key)
    fh = self._file
    fh.seek(start)
    fh.readline()  # Skip '1,' line specifying labels.
    eooh_marker = '*** EOOH ***' + os.linesep

    # Header lines up to the EOOH marker (or EOF).
    original_parts = []
    line = fh.readline()
    while line != eooh_marker and line != '':
        original_parts.append(line.replace(os.linesep, '\n'))
        line = fh.readline()

    # Consume lines up to and including the first blank line (or EOF).
    line = fh.readline()
    while line != os.linesep and line != '':
        line = fh.readline()

    remainder = fh.read(stop - fh.tell())
    return ''.join(original_parts) + remainder.replace(os.linesep, '\n')
def next(self):
    """Return the next non-empty message, or None when none is left.

    Starting at self.seekp, self._search_start() and self._search_end()
    position self.fp at the beginning and end of a candidate message.
    Empty candidates (start == stop) are skipped. A non-empty one is
    wrapped in _PartialFile and passed to self.factory. When
    self._search_start() raises EOFError, self.seekp is set to the
    current file position and None is returned.
    """
    while True:
        self.fp.seek(self.seekp)
        try:
            self._search_start()
        except EOFError:
            self.seekp = self.fp.tell()
            return None
        start = self.fp.tell()
        self._search_end()
        stop = self.fp.tell()
        # Remember where to resume on the following call.
        self.seekp = stop
        if start != stop:
            return self.factory(_PartialFile(self.fp, start, stop))
# Recommended to use PortableUnixMailbox instead!
def _append_message(self, message):
"""Append message to mailbox and return (start, stop) offsets."""
self._file.seek(0, 2)
before = self._file.tell()
if len(self._toc) == 0 and not self._pending:
# This is the first message, and the _pre_mailbox_hook
# hasn't yet been called. If self._pending is True,
# messages have been removed, so _pre_mailbox_hook must
# have been called already.
self._pre_mailbox_hook(self._file)
try:
self._pre_message_hook(self._file)
offsets = self._install_message(message)
self._post_message_hook(self._file)
except BaseException:
self._file.truncate(before)
raise
self._file.flush()
self._file_length = self._file.tell() # Record current length of mailbox
return offsets
def _install_message(self, message):
"""Format a message and blindly write to self._file."""
from_line = None
if isinstance(message, str) and message.startswith('From '):
newline = message.find('\n')
if newline != -1:
from_line = message[:newline]
message = message[newline + 1:]
else:
from_line = message
message = ''
elif isinstance(message, _mboxMMDFMessage):
from_line = 'From ' + message.get_from()
elif isinstance(message, email.message.Message):
from_line = message.get_unixfrom() # May be None.
if from_line is None:
from_line = 'From MAILER-DAEMON %s' % time.asctime(time.gmtime())
start = self._file.tell()
self._file.write(from_line + os.linesep)
self._dump_message(message, self._file, self._mangle_from_)
stop = self._file.tell()
return (start, stop)
def get_message(self, key):
    """Return a Message representation or raise a KeyError.

    Builds a BabylMessage for the entry stored under *key*. The header
    lines found before the '*** EOOH ***' marker, followed by the body,
    form the message text; the header lines between the marker and the
    first blank line are attached through set_visible(). Labels recorded
    in self._labels for *key* are applied with set_labels(). Every
    os.linesep is converted to '\n'.
    """
    start, stop = self._lookup(key)
    fh = self._file
    fh.seek(start)
    fh.readline()  # Skip '1,' line specifying labels.
    eooh_marker = '*** EOOH ***' + os.linesep

    # Header lines up to the EOOH marker (or EOF).
    original_parts = []
    line = fh.readline()
    while line != eooh_marker and line != '':
        original_parts.append(line.replace(os.linesep, '\n'))
        line = fh.readline()

    # Header lines after the marker, up to the first blank line (or EOF).
    visible_parts = []
    line = fh.readline()
    while line != os.linesep and line != '':
        visible_parts.append(line.replace(os.linesep, '\n'))
        line = fh.readline()

    # Whatever is left before *stop* is the body.
    body = fh.read(stop - fh.tell()).replace(os.linesep, '\n')
    msg = BabylMessage(''.join(original_parts) + body)
    msg.set_visible(''.join(visible_parts))
    if key in self._labels:
        msg.set_labels(self._labels[key])
    return msg
def get_string(self, key):
    """Return a string representation or raise a KeyError.

    The result is the header lines found before the '*** EOOH ***'
    marker followed by the message body. The header lines between the
    marker and the first blank line are read and discarded. Every
    os.linesep is converted to '\n'.
    """
    start, stop = self._lookup(key)
    fh = self._file
    fh.seek(start)
    fh.readline()  # Skip '1,' line specifying labels.
    eooh_marker = '*** EOOH ***' + os.linesep

    # Header lines up to the EOOH marker (or EOF).
    original_parts = []
    line = fh.readline()
    while line != eooh_marker and line != '':
        original_parts.append(line.replace(os.linesep, '\n'))
        line = fh.readline()

    # Consume lines up to and including the first blank line (or EOF).
    line = fh.readline()
    while line != os.linesep and line != '':
        line = fh.readline()

    remainder = fh.read(stop - fh.tell())
    return ''.join(original_parts) + remainder.replace(os.linesep, '\n')
def next(self):
    """Return the next non-empty message, or None when none is left.

    Starting at self.seekp, self._search_start() and self._search_end()
    position self.fp at the beginning and end of a candidate message.
    Empty candidates (start == stop) are skipped. A non-empty one is
    wrapped in _PartialFile and passed to self.factory. When
    self._search_start() raises EOFError, self.seekp is set to the
    current file position and None is returned.
    """
    while True:
        self.fp.seek(self.seekp)
        try:
            self._search_start()
        except EOFError:
            self.seekp = self.fp.tell()
            return None
        start = self.fp.tell()
        self._search_end()
        stop = self.fp.tell()
        # Remember where to resume on the following call.
        self.seekp = stop
        if start != stop:
            return self.factory(_PartialFile(self.fp, start, stop))
# Recommended to use PortableUnixMailbox instead!
def _append_message(self, message):
"""Append message to mailbox and return (start, stop) offsets."""
self._file.seek(0, 2)
before = self._file.tell()
if len(self._toc) == 0 and not self._pending:
# This is the first message, and the _pre_mailbox_hook
# hasn't yet been called. If self._pending is True,
# messages have been removed, so _pre_mailbox_hook must
# have been called already.
self._pre_mailbox_hook(self._file)
try:
self._pre_message_hook(self._file)
offsets = self._install_message(message)
self._post_message_hook(self._file)
except BaseException:
self._file.truncate(before)
raise
self._file.flush()
self._file_length = self._file.tell() # Record current length of mailbox
return offsets
def _install_message(self, message):
"""Format a message and blindly write to self._file."""
from_line = None
if isinstance(message, str) and message.startswith('From '):
newline = message.find('\n')
if newline != -1:
from_line = message[:newline]
message = message[newline + 1:]
else:
from_line = message
message = ''
elif isinstance(message, _mboxMMDFMessage):
from_line = 'From ' + message.get_from()
elif isinstance(message, email.message.Message):
from_line = message.get_unixfrom() # May be None.
if from_line is None:
from_line = 'From MAILER-DAEMON %s' % time.asctime(time.gmtime())
start = self._file.tell()
self._file.write(from_line + os.linesep)
self._dump_message(message, self._file, self._mangle_from_)
stop = self._file.tell()
return (start, stop)
def get_message(self, key):
    """Return a Message representation or raise a KeyError.

    Builds a BabylMessage for the entry stored under *key*. The header
    lines found before the '*** EOOH ***' marker, followed by the body,
    form the message text; the header lines between the marker and the
    first blank line are attached through set_visible(). Labels recorded
    in self._labels for *key* are applied with set_labels(). Every
    os.linesep is converted to '\n'.
    """
    start, stop = self._lookup(key)
    fh = self._file
    fh.seek(start)
    fh.readline()  # Skip '1,' line specifying labels.
    eooh_marker = '*** EOOH ***' + os.linesep

    # Header lines up to the EOOH marker (or EOF).
    original_parts = []
    line = fh.readline()
    while line != eooh_marker and line != '':
        original_parts.append(line.replace(os.linesep, '\n'))
        line = fh.readline()

    # Header lines after the marker, up to the first blank line (or EOF).
    visible_parts = []
    line = fh.readline()
    while line != os.linesep and line != '':
        visible_parts.append(line.replace(os.linesep, '\n'))
        line = fh.readline()

    # Whatever is left before *stop* is the body.
    body = fh.read(stop - fh.tell()).replace(os.linesep, '\n')
    msg = BabylMessage(''.join(original_parts) + body)
    msg.set_visible(''.join(visible_parts))
    if key in self._labels:
        msg.set_labels(self._labels[key])
    return msg
def get_string(self, key):
    """Return a string representation or raise a KeyError.

    The result is the header lines found before the '*** EOOH ***'
    marker followed by the message body. The header lines between the
    marker and the first blank line are read and discarded. Every
    os.linesep is converted to '\n'.
    """
    start, stop = self._lookup(key)
    fh = self._file
    fh.seek(start)
    fh.readline()  # Skip '1,' line specifying labels.
    eooh_marker = '*** EOOH ***' + os.linesep

    # Header lines up to the EOOH marker (or EOF).
    original_parts = []
    line = fh.readline()
    while line != eooh_marker and line != '':
        original_parts.append(line.replace(os.linesep, '\n'))
        line = fh.readline()

    # Consume lines up to and including the first blank line (or EOF).
    line = fh.readline()
    while line != os.linesep and line != '':
        line = fh.readline()

    remainder = fh.read(stop - fh.tell())
    return ''.join(original_parts) + remainder.replace(os.linesep, '\n')
def next(self):
    """Return the next non-empty message, or None when none is left.

    Starting at self.seekp, self._search_start() and self._search_end()
    position self.fp at the beginning and end of a candidate message.
    Empty candidates (start == stop) are skipped. A non-empty one is
    wrapped in _PartialFile and passed to self.factory. When
    self._search_start() raises EOFError, self.seekp is set to the
    current file position and None is returned.
    """
    while True:
        self.fp.seek(self.seekp)
        try:
            self._search_start()
        except EOFError:
            self.seekp = self.fp.tell()
            return None
        start = self.fp.tell()
        self._search_end()
        stop = self.fp.tell()
        # Remember where to resume on the following call.
        self.seekp = stop
        if start != stop:
            return self.factory(_PartialFile(self.fp, start, stop))
# Recommended to use PortableUnixMailbox instead!