def _check_iteration(self, method, do_keys, do_values, repetitions=10):
    # Shared helper for the iteration tests.
    #
    # Fails if method() yields anything before messages are added here,
    # then adds `repetitions` messages built from self._template and checks
    # what method() yields afterwards.
    #
    # method      -- zero-argument callable returning the iterable under test
    # do_keys     -- true if the returned keys should be checked
    # do_values   -- true if the returned messages should be checked
    # repetitions -- number of messages to add
    for value in method():
        self.fail("Not empty")
    keys, values = [], []
    # range() instead of xrange(): identical iteration, and it exists on
    # both Python 2 and Python 3.
    for i in range(repetitions):
        keys.append(self._box.add(self._template % i))
        values.append(self._template % i)
    if do_keys and not do_values:
        returned_keys = list(method())
    elif do_values and not do_keys:
        returned_values = list(method())
    else:
        # method() yields (key, value) pairs here; split them apart.
        returned_keys, returned_values = [], []
        for key, value in method():
            returned_keys.append(key)
            returned_values.append(value)
    if do_keys:
        # Only membership is compared; the order of the keys is not checked.
        self.assertEqual(len(keys), len(returned_keys))
        self.assertEqual(set(keys), set(returned_keys))
    if do_values:
        count = 0
        for value in returned_values:
            self.assertEqual(value['from'], 'foo')
            self.assertLess(int(value.get_payload()), repetitions)
            count += 1
        self.assertEqual(len(values), count)
# python类get_payload()的实例源码 (Example source code for Python's get_payload())
def test_popitem(self, iterations=10):
    # Get and remove an arbitrary (key, message) using popitem()
    #
    # iterations -- number of messages added and then popped again. It is
    # now honoured by both loops instead of a hard-coded 10.
    keys = [self._box.add(self._template % i) for i in range(iterations)]
    seen = []
    for _ in range(iterations):
        key, msg = self._box.popitem()
        self.assertIn(key, keys)
        # Each key may be handed out only once.
        self.assertNotIn(key, seen)
        seen.append(key)
        # The payload is expected to parse to the position at which the
        # message was added.
        self.assertEqual(int(msg.get_payload()), keys.index(key))
    self.assertEqual(len(self._box), 0)
    for key in keys:
        self.assertRaises(KeyError, lambda: self._box[key])
def test_initialize_with_nothing(self):
    # Initialize without arguments
    message = self._factory()
    self._post_initialize_hook(message)
    # The fresh object must pass all three isinstance checks.
    for expected_type in (email.message.Message, mailbox.Message,
                          self._factory):
        self.assertIsInstance(message, expected_type)
    # It carries no headers, is not multipart, and has no payload.
    self.assertEqual(message.keys(), [])
    self.assertFalse(message.is_multipart())
    self.assertIsNone(message.get_payload())
def _check_sample(self, msg):
    # Inspect a mailbox.Message representation of the sample message:
    # every sample header value must be present and every part must carry
    # the matching sample payload.
    for expected_type in (email.message.Message, mailbox.Message):
        self.assertIsInstance(msg, expected_type)
    for name, expected in _sample_headers.items():
        self.assertIn(expected, msg.get_all(name))
    self.assertTrue(msg.is_multipart())
    parts = msg.get_payload()
    self.assertEqual(len(parts), len(_sample_payloads))
    for index, expected_payload in enumerate(_sample_payloads):
        part = msg.get_payload(index)
        # Sub-parts are plain email messages, not mailbox.Message objects.
        self.assertIsInstance(part, email.message.Message)
        self.assertNotIsInstance(part, mailbox.Message)
        self.assertEqual(part.get_payload(), expected_payload)
def test_add_8bit_body(self):
    # Add the binary message self._non_latin_bin_msg and read it back three
    # ways: as raw bytes, through a file object, and as a parsed message.
    key = self._box.add(self._non_latin_bin_msg)
    self.assertEqual(self._box.get_bytes(key),
                     self._non_latin_bin_msg)
    with self._box.get_file(key) as f:
        # The file contents are compared against the message with every
        # b'\n' replaced by the platform line separator.
        self.assertEqual(f.read(),
                         self._non_latin_bin_msg.replace(
                             b'\n', os.linesep.encode()))
    # The expected text had been reduced to question marks ("??, ??? ?????.")
    # by an encoding-lossy copy; restored to the Cyrillic sentence.
    # NOTE(review): confirm this matches the body of _non_latin_bin_msg.
    self.assertEqual(self._box[key].get_payload(),
                     "Да, они летят.\n")
def test_get(self):
    # Retrieve messages using get()
    first_key = self._box.add(self._template % 0)
    first = self._box.get(first_key)
    self.assertEqual(first['from'], 'foo')
    self.assertEqual(first.get_payload(), '0\n')
    # A key that was never added yields None, or the supplied default.
    missing_key = 'foo'
    self.assertIs(self._box.get(missing_key), None)
    self.assertIs(self._box.get(missing_key, False), False)
    # Close the mailbox, recreate it from the same path, and repeat the
    # round trip with a second message.
    self._box.close()
    self._box = self._factory(self._path)
    second_key = self._box.add(self._template % 1)
    second = self._box.get(second_key)
    self.assertEqual(second['from'], 'foo')
    self.assertEqual(second.get_payload(), '1\n')
def test_getitem(self):
    # Retrieve message using __getitem__()
    key = self._box.add(self._template % 0)
    message = self._box[key]
    self.assertEqual(message['from'], 'foo')
    self.assertEqual(message.get_payload(), '0\n')
    # A key that was never added raises KeyError.
    with self.assertRaises(KeyError):
        self._box['foo']
    # So does a key whose message has been discarded.
    self._box.discard(key)
    with self.assertRaises(KeyError):
        self._box[key]
def test_get_message(self):
    # Get Message representations of messages
    template_key = self._box.add(self._template % 0)
    sample_key = self._box.add(_sample_message)
    # The templated message comes back as a mailbox.Message.
    template_msg = self._box.get_message(template_key)
    self.assertIsInstance(template_msg, mailbox.Message)
    self.assertEqual(template_msg['from'], 'foo')
    self.assertEqual(template_msg.get_payload(), '0\n')
    # The sample message is checked by the shared helper.
    sample_msg = self._box.get_message(sample_key)
    self._check_sample(sample_msg)
def test_popitem(self, iterations=10):
    # Get and remove an arbitrary (key, message) using popitem()
    #
    # iterations -- how many messages to add and then pop. Both loops now
    # use it; previously the count was fixed at 10 and the parameter ignored.
    keys = []
    for i in range(iterations):
        keys.append(self._box.add(self._template % i))
    seen = []
    for i in range(iterations):
        key, msg = self._box.popitem()
        self.assertIn(key, keys)
        # popitem() must not return the same key twice.
        self.assertNotIn(key, seen)
        seen.append(key)
        # The payload is expected to parse to the index at which the
        # message was added.
        self.assertEqual(int(msg.get_payload()), keys.index(key))
    self.assertEqual(len(self._box), 0)
    for key in keys:
        self.assertRaises(KeyError, lambda: self._box[key])
def test_initialize_with_nothing(self):
    # Initialize without arguments
    msg = self._factory()
    self._post_initialize_hook(msg)
    self.assertIsInstance(msg, email.message.Message)
    self.assertIsInstance(msg, mailbox.Message)
    self.assertIsInstance(msg, self._factory)
    self.assertEqual(msg.keys(), [])
    self.assertFalse(msg.is_multipart())
    # Identity check against None rather than an equality comparison.
    self.assertIsNone(msg.get_payload())
def _check_sample(self, msg):
    # Inspect a mailbox.Message representation of the sample message
    self.assertIsInstance(msg, email.message.Message)
    self.assertIsInstance(msg, mailbox.Message)
    # items() instead of iteritems(): available on both Python 2 and 3.
    for key, value in _sample_headers.items():
        self.assertIn(value, msg.get_all(key))
    self.assertTrue(msg.is_multipart())
    self.assertEqual(len(msg.get_payload()), len(_sample_payloads))
    for i, payload in enumerate(_sample_payloads):
        part = msg.get_payload(i)
        # Sub-parts are plain email messages, not mailbox.Message objects.
        self.assertIsInstance(part, email.message.Message)
        self.assertNotIsInstance(part, mailbox.Message)
        self.assertEqual(part.get_payload(), payload)
def test_get(self):
    # Retrieve messages using get()
    key0 = self._box.add(self._template % 0)
    msg = self._box.get(key0)
    self.assertEqual(msg['from'], 'foo')
    self.assertEqual(msg.get_payload(), '0\n')
    self.assertIsNone(self._box.get('foo'))
    # assertIs rather than assertFalse: None is falsy too, so assertFalse
    # would not notice a get() that ignores the supplied default.
    self.assertIs(self._box.get('foo', False), False)
    self._box.close()
    # Recreate the mailbox from the same path with rfc822.Message as the
    # message factory.
    self._box = self._factory(self._path, factory=rfc822.Message)
    key1 = self._box.add(self._template % 1)
    msg = self._box.get(key1)
    try:
        self.assertEqual(msg['from'], 'foo')
        self.assertEqual(msg.fp.read(), '1' + os.linesep)
    finally:
        # Close the underlying file even if an assertion above fails.
        msg.fp.close()
def test_getitem(self):
    # Retrieve message using __getitem__()
    stored_key = self._box.add(self._template % 0)
    fetched = self._box[stored_key]
    self.assertEqual(fetched['from'], 'foo')
    self.assertEqual(fetched.get_payload(), '0\n')
    # Indexing with a key that was never added raises KeyError.
    with self.assertRaises(KeyError):
        self._box['foo']
    # After discard() the original key raises KeyError as well.
    self._box.discard(stored_key)
    with self.assertRaises(KeyError):
        self._box[stored_key]
def test_get_message(self):
    # Get Message representations of messages
    plain_key = self._box.add(self._template % 0)
    sample_key = self._box.add(_sample_message)
    # First the templated message: type, sender header and payload.
    plain = self._box.get_message(plain_key)
    self.assertIsInstance(plain, mailbox.Message)
    self.assertEqual(plain['from'], 'foo')
    self.assertEqual(plain.get_payload(), '0\n')
    # Then the sample message, verified by the shared helper.
    sample = self._box.get_message(sample_key)
    self._check_sample(sample)