def pack(self):
"""Re-name messages to eliminate numbering gaps. Invalidates keys."""
sequences = self.get_sequences()
prev = 0
changes = []
for key in self.iterkeys():
if key - 1 != prev:
changes.append((key, prev + 1))
if hasattr(os, 'link'):
os.link(os.path.join(self._path, str(key)),
os.path.join(self._path, str(prev + 1)))
os.unlink(os.path.join(self._path, str(key)))
else:
os.rename(os.path.join(self._path, str(key)),
os.path.join(self._path, str(prev + 1)))
prev += 1
self._next_key = prev + 1
if len(changes) == 0:
return
for name, key_list in sequences.items():
for old, new in changes:
if old in key_list:
key_list[key_list.index(old)] = new
self.set_sequences(sequences)
python类get_sequences()的实例源码
def pack(self):
"""Re-name messages to eliminate numbering gaps. Invalidates keys."""
sequences = self.get_sequences()
prev = 0
changes = []
for key in self.iterkeys():
if key - 1 != prev:
changes.append((key, prev + 1))
if hasattr(os, 'link'):
os.link(os.path.join(self._path, str(key)),
os.path.join(self._path, str(prev + 1)))
os.unlink(os.path.join(self._path, str(key)))
else:
os.rename(os.path.join(self._path, str(key)),
os.path.join(self._path, str(prev + 1)))
prev += 1
self._next_key = prev + 1
if len(changes) == 0:
return
for name, key_list in sequences.items():
for old, new in changes:
if old in key_list:
key_list[key_list.index(old)] = new
self.set_sequences(sequences)
def pack(self):
"""Re-name messages to eliminate numbering gaps. Invalidates keys."""
sequences = self.get_sequences()
prev = 0
changes = []
for key in self.keys():
if key - 1 != prev:
changes.append((key, prev + 1))
if hasattr(os, 'link'):
os.link(os.path.join(self._path, str(key)),
os.path.join(self._path, str(prev + 1)))
os.unlink(os.path.join(self._path, str(key)))
else:
os.rename(os.path.join(self._path, str(key)),
os.path.join(self._path, str(prev + 1)))
prev += 1
self._next_key = prev + 1
if len(changes) == 0:
return
for name, key_list in sequences.items():
for old, new in changes:
if old in key_list:
key_list[key_list.index(old)] = new
self.set_sequences(sequences)
def pack(self):
"""Re-name messages to eliminate numbering gaps. Invalidates keys."""
sequences = self.get_sequences()
prev = 0
changes = []
for key in self.iterkeys():
if key - 1 != prev:
changes.append((key, prev + 1))
if hasattr(os, 'link'):
os.link(os.path.join(self._path, str(key)),
os.path.join(self._path, str(prev + 1)))
os.unlink(os.path.join(self._path, str(key)))
else:
os.rename(os.path.join(self._path, str(key)),
os.path.join(self._path, str(prev + 1)))
prev += 1
self._next_key = prev + 1
if len(changes) == 0:
return
for name, key_list in sequences.items():
for old, new in changes:
if old in key_list:
key_list[key_list.index(old)] = new
self.set_sequences(sequences)
def pack(self):
"""Re-name messages to eliminate numbering gaps. Invalidates keys."""
sequences = self.get_sequences()
prev = 0
changes = []
for key in self.iterkeys():
if key - 1 != prev:
changes.append((key, prev + 1))
if hasattr(os, 'link'):
os.link(os.path.join(self._path, str(key)),
os.path.join(self._path, str(prev + 1)))
os.unlink(os.path.join(self._path, str(key)))
else:
os.rename(os.path.join(self._path, str(key)),
os.path.join(self._path, str(prev + 1)))
prev += 1
self._next_key = prev + 1
if len(changes) == 0:
return
for name, key_list in sequences.items():
for old, new in changes:
if old in key_list:
key_list[key_list.index(old)] = new
self.set_sequences(sequences)
def pack(self):
"""Re-name messages to eliminate numbering gaps. Invalidates keys."""
sequences = self.get_sequences()
prev = 0
changes = []
for key in self.iterkeys():
if key - 1 != prev:
changes.append((key, prev + 1))
if hasattr(os, 'link'):
os.link(os.path.join(self._path, str(key)),
os.path.join(self._path, str(prev + 1)))
os.unlink(os.path.join(self._path, str(key)))
else:
os.rename(os.path.join(self._path, str(key)),
os.path.join(self._path, str(prev + 1)))
prev += 1
self._next_key = prev + 1
if len(changes) == 0:
return
for name, key_list in sequences.items():
for old, new in changes:
if old in key_list:
key_list[key_list.index(old)] = new
self.set_sequences(sequences)
def pack(self):
"""Re-name messages to eliminate numbering gaps. Invalidates keys."""
sequences = self.get_sequences()
prev = 0
changes = []
for key in self.iterkeys():
if key - 1 != prev:
changes.append((key, prev + 1))
if hasattr(os, 'link'):
os.link(os.path.join(self._path, str(key)),
os.path.join(self._path, str(prev + 1)))
os.unlink(os.path.join(self._path, str(key)))
else:
os.rename(os.path.join(self._path, str(key)),
os.path.join(self._path, str(prev + 1)))
prev += 1
self._next_key = prev + 1
if len(changes) == 0:
return
for name, key_list in sequences.items():
for old, new in changes:
if old in key_list:
key_list[key_list.index(old)] = new
self.set_sequences(sequences)
def get_message(self, key):
"""Return a Message representation or raise a KeyError."""
try:
if self._locked:
f = open(os.path.join(self._path, str(key)), 'rb+')
else:
f = open(os.path.join(self._path, str(key)), 'rb')
except OSError as e:
if e.errno == errno.ENOENT:
raise KeyError('No message with key: %s' % key)
else:
raise
with f:
if self._locked:
_lock_file(f)
try:
msg = MHMessage(f)
finally:
if self._locked:
_unlock_file(f)
for name, key_list in self.get_sequences().items():
if key in key_list:
msg.add_sequence(name)
return msg
def get_sequences(self):
"""Return a name-to-key-list dictionary to define each sequence."""
results = {}
with open(os.path.join(self._path, '.mh_sequences'), 'r', encoding='ASCII') as f:
all_keys = set(self.keys())
for line in f:
try:
name, contents = line.split(':')
keys = set()
for spec in contents.split():
if spec.isdigit():
keys.add(int(spec))
else:
start, stop = (int(x) for x in spec.split('-'))
keys.update(range(start, stop + 1))
results[name] = [key for key in sorted(keys) \
if key in all_keys]
if len(results[name]) == 0:
del results[name]
except ValueError:
raise FormatError('Invalid sequence specification: %s' %
line.rstrip())
return results
def pack(self):
"""Re-name messages to eliminate numbering gaps. Invalidates keys."""
sequences = self.get_sequences()
prev = 0
changes = []
for key in self.iterkeys():
if key - 1 != prev:
changes.append((key, prev + 1))
if hasattr(os, 'link'):
os.link(os.path.join(self._path, str(key)),
os.path.join(self._path, str(prev + 1)))
os.unlink(os.path.join(self._path, str(key)))
else:
os.rename(os.path.join(self._path, str(key)),
os.path.join(self._path, str(prev + 1)))
prev += 1
self._next_key = prev + 1
if len(changes) == 0:
return
for name, key_list in sequences.items():
for old, new in changes:
if old in key_list:
key_list[key_list.index(old)] = new
self.set_sequences(sequences)
def pack(self):
"""Re-name messages to eliminate numbering gaps. Invalidates keys."""
sequences = self.get_sequences()
prev = 0
changes = []
for key in self.iterkeys():
if key - 1 != prev:
changes.append((key, prev + 1))
if hasattr(os, 'link'):
os.link(os.path.join(self._path, str(key)),
os.path.join(self._path, str(prev + 1)))
os.unlink(os.path.join(self._path, str(key)))
else:
os.rename(os.path.join(self._path, str(key)),
os.path.join(self._path, str(prev + 1)))
prev += 1
self._next_key = prev + 1
if len(changes) == 0:
return
for name, key_list in sequences.items():
for old, new in changes:
if old in key_list:
key_list[key_list.index(old)] = new
self.set_sequences(sequences)
def pack(self):
"""Re-name messages to eliminate numbering gaps. Invalidates keys."""
sequences = self.get_sequences()
prev = 0
changes = []
for key in self.iterkeys():
if key - 1 != prev:
changes.append((key, prev + 1))
if hasattr(os, 'link'):
os.link(os.path.join(self._path, str(key)),
os.path.join(self._path, str(prev + 1)))
os.unlink(os.path.join(self._path, str(key)))
else:
os.rename(os.path.join(self._path, str(key)),
os.path.join(self._path, str(prev + 1)))
prev += 1
self._next_key = prev + 1
if len(changes) == 0:
return
for name, key_list in sequences.items():
for old, new in changes:
if old in key_list:
key_list[key_list.index(old)] = new
self.set_sequences(sequences)
def get_message(self, key):
"""Return a Message representation or raise a KeyError."""
try:
if self._locked:
f = open(os.path.join(self._path, str(key)), 'rb+')
else:
f = open(os.path.join(self._path, str(key)), 'rb')
except OSError as e:
if e.errno == errno.ENOENT:
raise KeyError('No message with key: %s' % key)
else:
raise
with f:
if self._locked:
_lock_file(f)
try:
msg = MHMessage(f)
finally:
if self._locked:
_unlock_file(f)
for name, key_list in self.get_sequences().items():
if key in key_list:
msg.add_sequence(name)
return msg
def get_sequences(self):
"""Return a name-to-key-list dictionary to define each sequence."""
results = {}
with open(os.path.join(self._path, '.mh_sequences'), 'r', encoding='ASCII') as f:
all_keys = set(self.keys())
for line in f:
try:
name, contents = line.split(':')
keys = set()
for spec in contents.split():
if spec.isdigit():
keys.add(int(spec))
else:
start, stop = (int(x) for x in spec.split('-'))
keys.update(range(start, stop + 1))
results[name] = [key for key in sorted(keys) \
if key in all_keys]
if len(results[name]) == 0:
del results[name]
except ValueError:
raise FormatError('Invalid sequence specification: %s' %
line.rstrip())
return results
def pack(self):
"""Re-name messages to eliminate numbering gaps. Invalidates keys."""
sequences = self.get_sequences()
prev = 0
changes = []
for key in self.iterkeys():
if key - 1 != prev:
changes.append((key, prev + 1))
if hasattr(os, 'link'):
os.link(os.path.join(self._path, str(key)),
os.path.join(self._path, str(prev + 1)))
os.unlink(os.path.join(self._path, str(key)))
else:
os.rename(os.path.join(self._path, str(key)),
os.path.join(self._path, str(prev + 1)))
prev += 1
self._next_key = prev + 1
if len(changes) == 0:
return
for name, key_list in sequences.items():
for old, new in changes:
if old in key_list:
key_list[key_list.index(old)] = new
self.set_sequences(sequences)
def get_message(self, key):
"""Return a Message representation or raise a KeyError."""
try:
if self._locked:
f = open(os.path.join(self._path, str(key)), 'r+')
else:
f = open(os.path.join(self._path, str(key)), 'r')
except IOError, e:
if e.errno == errno.ENOENT:
raise KeyError('No message with key: %s' % key)
else:
raise
try:
if self._locked:
_lock_file(f)
try:
msg = MHMessage(f)
finally:
if self._locked:
_unlock_file(f)
finally:
f.close()
for name, key_list in self.get_sequences().iteritems():
if key in key_list:
msg.add_sequence(name)
return msg
def get_sequences(self):
"""Return a name-to-key-list dictionary to define each sequence."""
results = {}
f = open(os.path.join(self._path, '.mh_sequences'), 'r')
try:
all_keys = set(self.keys())
for line in f:
try:
name, contents = line.split(':')
keys = set()
for spec in contents.split():
if spec.isdigit():
keys.add(int(spec))
else:
start, stop = (int(x) for x in spec.split('-'))
keys.update(range(start, stop + 1))
results[name] = [key for key in sorted(keys) \
if key in all_keys]
if len(results[name]) == 0:
del results[name]
except ValueError:
raise FormatError('Invalid sequence specification: %s' %
line.rstrip())
finally:
f.close()
return results
def _dump_sequences(self, message, key):
"""Inspect a new MHMessage and update sequences appropriately."""
pending_sequences = message.get_sequences()
all_sequences = self.get_sequences()
for name, key_list in all_sequences.iteritems():
if name in pending_sequences:
key_list.append(key)
elif key in key_list:
del key_list[key_list.index(key)]
for sequence in pending_sequences:
if sequence not in all_sequences:
all_sequences[sequence] = [key]
self.set_sequences(all_sequences)
def get_sequences(self):
"""Return a list of sequences that include the message."""
return self._sequences[:]
def get_message(self, key):
"""Return a Message representation or raise a KeyError."""
try:
if self._locked:
f = open(os.path.join(self._path, str(key)), 'r+')
else:
f = open(os.path.join(self._path, str(key)), 'r')
except IOError, e:
if e.errno == errno.ENOENT:
raise KeyError('No message with key: %s' % key)
else:
raise
try:
if self._locked:
_lock_file(f)
try:
msg = MHMessage(f)
finally:
if self._locked:
_unlock_file(f)
finally:
f.close()
for name, key_list in self.get_sequences().iteritems():
if key in key_list:
msg.add_sequence(name)
return msg
def get_sequences(self):
"""Return a name-to-key-list dictionary to define each sequence."""
results = {}
f = open(os.path.join(self._path, '.mh_sequences'), 'r')
try:
all_keys = set(self.keys())
for line in f:
try:
name, contents = line.split(':')
keys = set()
for spec in contents.split():
if spec.isdigit():
keys.add(int(spec))
else:
start, stop = (int(x) for x in spec.split('-'))
keys.update(range(start, stop + 1))
results[name] = [key for key in sorted(keys) \
if key in all_keys]
if len(results[name]) == 0:
del results[name]
except ValueError:
raise FormatError('Invalid sequence specification: %s' %
line.rstrip())
finally:
f.close()
return results
def _dump_sequences(self, message, key):
"""Inspect a new MHMessage and update sequences appropriately."""
pending_sequences = message.get_sequences()
all_sequences = self.get_sequences()
for name, key_list in all_sequences.iteritems():
if name in pending_sequences:
key_list.append(key)
elif key in key_list:
del key_list[key_list.index(key)]
for sequence in pending_sequences:
if sequence not in all_sequences:
all_sequences[sequence] = [key]
self.set_sequences(all_sequences)
def get_sequences(self):
"""Return a list of sequences that include the message."""
return self._sequences[:]
def get_message(self, key):
"""Return a Message representation or raise a KeyError."""
try:
if self._locked:
f = open(os.path.join(self._path, str(key)), 'rb+')
else:
f = open(os.path.join(self._path, str(key)), 'rb')
except IOError as e:
if e.errno == errno.ENOENT:
raise KeyError('No message with key: %s' % key)
else:
raise
try:
if self._locked:
_lock_file(f)
try:
msg = MHMessage(f)
finally:
if self._locked:
_unlock_file(f)
finally:
f.close()
for name, key_list in self.get_sequences().items():
if key in key_list:
msg.add_sequence(name)
return msg
def get_sequences(self):
"""Return a name-to-key-list dictionary to define each sequence."""
results = {}
f = open(os.path.join(self._path, '.mh_sequences'), 'r')
try:
all_keys = set(self.keys())
for line in f:
try:
name, contents = line.split(':')
keys = set()
for spec in contents.split():
if spec.isdigit():
keys.add(int(spec))
else:
start, stop = (int(x) for x in spec.split('-'))
keys.update(range(start, stop + 1))
results[name] = [key for key in sorted(keys) \
if key in all_keys]
if len(results[name]) == 0:
del results[name]
except ValueError:
raise FormatError('Invalid sequence specification: %s' %
line.rstrip())
finally:
f.close()
return results
def _dump_sequences(self, message, key):
"""Inspect a new MHMessage and update sequences appropriately."""
pending_sequences = message.get_sequences()
all_sequences = self.get_sequences()
for name, key_list in all_sequences.items():
if name in pending_sequences:
key_list.append(key)
elif key in key_list:
del key_list[key_list.index(key)]
for sequence in pending_sequences:
if sequence not in all_sequences:
all_sequences[sequence] = [key]
self.set_sequences(all_sequences)
def get_sequences(self):
"""Return a list of sequences that include the message."""
return self._sequences[:]
def get_message(self, key):
"""Return a Message representation or raise a KeyError."""
try:
if self._locked:
f = open(os.path.join(self._path, str(key)), 'r+')
else:
f = open(os.path.join(self._path, str(key)), 'r')
except IOError, e:
if e.errno == errno.ENOENT:
raise KeyError('No message with key: %s' % key)
else:
raise
try:
if self._locked:
_lock_file(f)
try:
msg = MHMessage(f)
finally:
if self._locked:
_unlock_file(f)
finally:
f.close()
for name, key_list in self.get_sequences().iteritems():
if key in key_list:
msg.add_sequence(name)
return msg
def get_sequences(self):
"""Return a name-to-key-list dictionary to define each sequence."""
results = {}
f = open(os.path.join(self._path, '.mh_sequences'), 'r')
try:
all_keys = set(self.keys())
for line in f:
try:
name, contents = line.split(':')
keys = set()
for spec in contents.split():
if spec.isdigit():
keys.add(int(spec))
else:
start, stop = (int(x) for x in spec.split('-'))
keys.update(range(start, stop + 1))
results[name] = [key for key in sorted(keys) \
if key in all_keys]
if len(results[name]) == 0:
del results[name]
except ValueError:
raise FormatError('Invalid sequence specification: %s' %
line.rstrip())
finally:
f.close()
return results
def _dump_sequences(self, message, key):
"""Inspect a new MHMessage and update sequences appropriately."""
pending_sequences = message.get_sequences()
all_sequences = self.get_sequences()
for name, key_list in all_sequences.iteritems():
if name in pending_sequences:
key_list.append(key)
elif key in key_list:
del key_list[key_list.index(key)]
for sequence in pending_sequences:
if sequence not in all_sequences:
all_sequences[sequence] = [key]
self.set_sequences(all_sequences)