def test_parse_invalid_port(self):
"""
When a listen address is parsed with an invalid port, an error is
raised.
"""
with ExpectedException(
ValueError,
r"'foo' does not appear to be a valid port number"):
parse_listen_addr(':foo')
with ExpectedException(
ValueError,
r"'0' does not appear to be a valid port number"):
parse_listen_addr(':0')
with ExpectedException(
ValueError,
r"'65536' does not appear to be a valid port number"):
parse_listen_addr(':65536')
with ExpectedException(
ValueError,
r"'' does not appear to be a valid port number"):
parse_listen_addr(':')
python类ExpectedException()的实例源码
def test_image_required(self, capfd):
"""
When the main function is given no image argument, it should exit with
a return code of 2 and inform the user of the missing argument.
"""
with ExpectedException(SystemExit, MatchesStructure(code=Equals(2))):
main(['--tag', 'abc'])
out, err = capfd.readouterr()
assert_that(out, Equals(''))
# More useful error message added to argparse in Python 3
if sys.version_info >= (3,):
# Use re.DOTALL so that '.*' also matches newlines
assert_that(err, MatchesRegex(
r'.*error: the following arguments are required: image$',
re.DOTALL
))
else:
assert_that(
err, MatchesRegex(r'.*error: too few arguments$', re.DOTALL))
def test_semver_precision_requires_version_semver(self, capfd):
"""
When the main function is given the `--semver-precision` option but no
`--version-semver` option, it should exit with a return code of 2 and
inform the user of the missing option.
"""
with ExpectedException(SystemExit, MatchesStructure(code=Equals(2))):
main(['--semver-precision', '2', 'test-image:abc'])
out, err = capfd.readouterr()
assert_that(out, Equals(''))
assert_that(err, MatchesRegex(
r'.*error: the --semver-precision option requires '
r'--version-semver$',
re.DOTALL
))
def test_version_semver_requires_argument(self, capfd):
"""
When the main function is given the `--version-semver` option without
an argument, an error should be raised.
"""
with ExpectedException(SystemExit, MatchesStructure(code=Equals(2))):
main([
'--version', '1.2.3',
'--version-semver',
'--semver-precision',
'--', 'test-image',
])
out, err = capfd.readouterr()
assert_that(out, Equals(''))
assert_that(err, MatchesRegex(
r'.*error: argument -P/--semver-precision: expected one argument$',
re.DOTALL
))
def test_validate_provider_segment(self):
segment = {api.NETWORK_TYPE: self.TYPE,
api.PHYSICAL_NETWORK: "phys_net",
api.SEGMENTATION_ID: None}
with testtools.ExpectedException(exc.InvalidInput, ".*specified.*"):
self.driver.validate_provider_segment(segment)
segment[api.PHYSICAL_NETWORK] = None
self.driver.validate_provider_segment(segment)
segment[api.SEGMENTATION_ID] = 1
self.driver.validate_provider_segment(segment)
segment = {api.NETWORK_TYPE: self.TYPE,
api.SEGMENTATION_ID: None,
'bad_key': "bad_value"}
with testtools.ExpectedException(exc.InvalidInput, ".*prohibited.*"):
self.driver.validate_provider_segment(segment)
def test_status_with_file_bad_encoder_fails(self):
node = factory.make_Node(
interface=True, status=NODE_STATUS.COMMISSIONING)
contents = b'These are the contents of the file.'
encoded_content = encode_as_base64(bz2.compress(contents))
payload = {
'event_type': 'finish',
'result': 'FAILURE',
'origin': 'curtin',
'name': 'commissioning',
'description': 'Commissioning',
'timestamp': datetime.utcnow(),
'files': [
{
"path": "sample.txt",
"encoding": "uuencode",
"compression": "bzip2",
"content": encoded_content
}
]
}
with ExpectedException(ValueError):
self.processMessage(node, payload)
def test_status_with_file_bad_compression_fails(self):
node = factory.make_Node(
interface=True, status=NODE_STATUS.COMMISSIONING)
contents = b'These are the contents of the file.'
encoded_content = encode_as_base64(bz2.compress(contents))
payload = {
'event_type': 'finish',
'result': 'FAILURE',
'origin': 'curtin',
'name': 'commissioning',
'description': 'Commissioning',
'timestamp': datetime.utcnow(),
'files': [
{
"path": "sample.txt",
"encoding": "base64",
"compression": "jpeg",
"content": encoded_content
}
]
}
with ExpectedException(ValueError):
self.processMessage(node, payload)
def test__create_node_fails_with_invalid_hostname(self):
self.prepare_rack_rpc()
mac_addresses = [
factory.make_mac_address() for _ in range(3)]
architecture = make_usable_architecture(self)
hostname = random.choice([
"---",
"Microsoft Windows",
])
power_type = random.choice(self.power_types)['name']
power_parameters = {}
with ExpectedException(ValidationError):
create_node(
architecture, power_type, power_parameters,
mac_addresses, hostname=hostname)
def test_cannot_save_if_size_larger_than_volume_group(self):
filesystem_group = factory.make_FilesystemGroup(
group_type=FILESYSTEM_GROUP_TYPE.LVM_VG)
factory.make_VirtualBlockDevice(
filesystem_group=filesystem_group,
size=filesystem_group.get_size() / 2)
new_block_device_size = filesystem_group.get_size()
human_readable_size = human_readable_bytes(new_block_device_size)
with ExpectedException(
ValidationError,
re.escape(
"{'__all__': ['There is not enough free space (%s) "
"on volume group %s.']}" % (
human_readable_size,
filesystem_group.name,
))):
factory.make_VirtualBlockDevice(
filesystem_group=filesystem_group,
size=new_block_device_size)
def test_cannot_save_volume_group_if_logical_volumes_larger(self):
node = factory.make_Node()
filesystem_one = factory.make_Filesystem(
fstype=FILESYSTEM_TYPE.LVM_PV,
block_device=factory.make_PhysicalBlockDevice(node=node))
filesystem_two = factory.make_Filesystem(
fstype=FILESYSTEM_TYPE.LVM_PV,
block_device=factory.make_PhysicalBlockDevice(node=node))
filesystems = [
filesystem_one,
filesystem_two,
]
volume_group = factory.make_FilesystemGroup(
group_type=FILESYSTEM_GROUP_TYPE.LVM_VG,
filesystems=filesystems)
factory.make_VirtualBlockDevice(
size=volume_group.get_size(), filesystem_group=volume_group)
filesystem_two.delete()
with ExpectedException(
ValidationError,
re.escape(
"['Volume group cannot be smaller than its "
"logical volumes.']")):
volume_group.save()
def test_cannot_save_raid_0_with_spare_raid_devices(self):
node = factory.make_Node()
filesystems = [
factory.make_Filesystem(
fstype=FILESYSTEM_TYPE.RAID,
block_device=factory.make_PhysicalBlockDevice(node=node))
for _ in range(2)
]
filesystems.append(
factory.make_Filesystem(
fstype=FILESYSTEM_TYPE.RAID_SPARE,
block_device=factory.make_PhysicalBlockDevice(node=node)))
with ExpectedException(
ValidationError,
re.escape(
"{'__all__': ['RAID level 0 must have at least 2 raid "
"devices and no spares.']}")):
factory.make_FilesystemGroup(
group_type=FILESYSTEM_GROUP_TYPE.RAID_0,
filesystems=filesystems)
def test_cannot_save_raid_5_with_less_than_3_raid_devices(self):
node = factory.make_Node()
filesystems = [
factory.make_Filesystem(
fstype=FILESYSTEM_TYPE.RAID,
block_device=factory.make_PhysicalBlockDevice(node=node))
for _ in range(random.randint(1, 2))
]
with ExpectedException(
ValidationError,
re.escape(
"{'__all__': ['RAID level 5 must have at least 3 raid "
"devices and any number of spares.']}")):
factory.make_FilesystemGroup(
group_type=FILESYSTEM_GROUP_TYPE.RAID_5,
filesystems=filesystems)
def test_cannot_save_raid_10_with_less_than_3_raid_devices(self):
node = factory.make_Node()
filesystems = [
factory.make_Filesystem(
fstype=FILESYSTEM_TYPE.RAID,
block_device=factory.make_PhysicalBlockDevice(node=node))
for _ in range(random.randint(1, 2))
]
with ExpectedException(
ValidationError,
re.escape(
"{'__all__': ['RAID level 10 must have at least 3 raid "
"devices and any number of spares.']}")):
factory.make_FilesystemGroup(
group_type=FILESYSTEM_GROUP_TYPE.RAID_10,
filesystems=filesystems)
def test_cannot_save_bcache_without_cache_set(self):
node = factory.make_Node()
filesystems = [
factory.make_Filesystem(
fstype=FILESYSTEM_TYPE.BCACHE_BACKING,
block_device=factory.make_PhysicalBlockDevice(node=node)),
]
with ExpectedException(
ValidationError,
re.escape(
"{'__all__': ['Bcache requires an assigned cache "
"set.']}")):
filesystem_group = factory.make_FilesystemGroup(
group_type=FILESYSTEM_GROUP_TYPE.BCACHE,
filesystems=filesystems)
filesystem_group.cache_set = None
filesystem_group.save()
def test_cannot_save_bcache_with_logical_volume_as_backing(self):
node = factory.make_Node()
cache_set = factory.make_CacheSet(node=node)
filesystems = [
factory.make_Filesystem(
fstype=FILESYSTEM_TYPE.BCACHE_BACKING,
block_device=factory.make_VirtualBlockDevice(node=node)),
]
with ExpectedException(
ValidationError,
re.escape(
"{'__all__': ['Bcache cannot use a logical volume as a "
"backing device.']}")):
factory.make_FilesystemGroup(
group_type=FILESYSTEM_GROUP_TYPE.BCACHE,
cache_set=cache_set,
filesystems=filesystems)
def test_cannot_save_bcache_with_multiple_backings(self):
node = factory.make_Node()
cache_set = factory.make_CacheSet(node=node)
filesystems = [
factory.make_Filesystem(
fstype=FILESYSTEM_TYPE.BCACHE_BACKING,
block_device=factory.make_PhysicalBlockDevice(node=node))
for _ in range(random.randint(2, 10))
]
with ExpectedException(
ValidationError,
re.escape(
"{'__all__': ['Bcache can only contain one backing "
"device.']}")):
factory.make_FilesystemGroup(
group_type=FILESYSTEM_GROUP_TYPE.BCACHE,
cache_set=cache_set,
filesystems=filesystems)
def test_create_raid_0_with_one_element_fails(self):
node = factory.make_Node()
block_device = factory.make_PhysicalBlockDevice(node=node)
uuid = str(uuid4())
with ExpectedException(
ValidationError,
re.escape(
"{'__all__': ['RAID level 0 must have at least 2 raid "
"devices and no spares.']}")):
RAID.objects.create_raid(
name='md0',
level=FILESYSTEM_GROUP_TYPE.RAID_0,
uuid=uuid,
block_devices=[block_device],
partitions=[],
spare_devices=[],
spare_partitions=[])
def test_create_raid_1_with_one_element_fails(self):
node = factory.make_Node()
block_device = factory.make_PhysicalBlockDevice(node=node)
uuid = str(uuid4())
with ExpectedException(
ValidationError,
re.escape(
"{'__all__': ['RAID level 1 must have at least 2 raid "
"devices and any number of spares.']}")):
RAID.objects.create_raid(
name='md0',
level=FILESYSTEM_GROUP_TYPE.RAID_1,
uuid=uuid,
block_devices=[block_device],
partitions=[],
spare_devices=[],
spare_partitions=[])
def test_create_raid_5_with_2_elements_fails(self):
node = factory.make_Node()
block_devices = [
factory.make_PhysicalBlockDevice(node=node, size=10 * 1000 ** 4)
for _ in range(2)
]
uuid = str(uuid4())
with ExpectedException(
ValidationError,
re.escape(
"{'__all__': ['RAID level 5 must have at least 3 raid "
"devices and any number of spares.']}")):
RAID.objects.create_raid(
name='md0',
level=FILESYSTEM_GROUP_TYPE.RAID_5,
uuid=uuid,
block_devices=block_devices,
partitions=[],
spare_devices=[],
spare_partitions=[])
def test_create_raid_6_with_3_elements_fails(self):
node = factory.make_Node()
block_devices = [
factory.make_PhysicalBlockDevice(node=node)
for _ in range(3)
]
uuid = str(uuid4())
with ExpectedException(
ValidationError,
re.escape(
"{'__all__': ['RAID level 6 must have at least 4 raid "
"devices and any number of spares.']}")):
RAID.objects.create_raid(
name='md0',
level=FILESYSTEM_GROUP_TYPE.RAID_6,
uuid=uuid,
block_devices=block_devices,
partitions=[],
spare_devices=[],
spare_partitions=[])
def test_create_raid_with_block_device_from_other_node_fails(self):
node1 = factory.make_Node()
node2 = factory.make_Node()
block_devices_1 = [
factory.make_PhysicalBlockDevice(node=node1)
for _ in range(5)
]
block_devices_2 = [
factory.make_PhysicalBlockDevice(node=node2)
for _ in range(5)
]
uuid = str(uuid4())
with ExpectedException(
ValidationError,
re.escape(
"{'__all__': ['All added filesystems must belong to the "
"same node.']}")):
RAID.objects.create_raid(
name='md0',
level=FILESYSTEM_GROUP_TYPE.RAID_1,
uuid=uuid,
block_devices=block_devices_1 + block_devices_2,
partitions=[],
spare_devices=[],
spare_partitions=[])
def test_add_device_from_another_node_to_array_fails(self):
node = factory.make_Node()
other_node = factory.make_Node()
device_size = 10 * 1000 ** 4
block_devices = [
factory.make_PhysicalBlockDevice(node=node, size=device_size)
for _ in range(10)
]
uuid = str(uuid4())
raid = RAID.objects.create_raid(
name='md0',
level=FILESYSTEM_GROUP_TYPE.RAID_5,
uuid=uuid,
block_devices=block_devices)
device = factory.make_PhysicalBlockDevice(
node=other_node, size=device_size)
with ExpectedException(
ValidationError,
re.escape(
"['Device needs to be from the same node as the rest of the "
"array.']")):
raid.add_device(device, FILESYSTEM_TYPE.RAID)
self.assertEqual(10, raid.filesystems.count()) # Still 10 devices
self.assertEqual(9 * device_size, raid.get_size())
def test_add_partition_from_another_node_to_array_fails(self):
node = factory.make_Node()
other_node = factory.make_Node()
device_size = 10 * 1000 ** 4
block_devices = [
factory.make_PhysicalBlockDevice(node=node, size=device_size)
for _ in range(10)
]
uuid = str(uuid4())
raid = RAID.objects.create_raid(
name='md0',
level=FILESYSTEM_GROUP_TYPE.RAID_5,
uuid=uuid,
block_devices=block_devices)
partition = factory.make_PartitionTable(
block_device=factory.make_PhysicalBlockDevice(
node=other_node, size=device_size)).add_partition()
with ExpectedException(
ValidationError,
re.escape(
"['Partition must be on a device from the same node as "
"the rest of the array.']")):
raid.add_partition(partition, FILESYSTEM_TYPE.RAID)
self.assertEqual(10, raid.filesystems.count()) # Nothing added
self.assertEqual(9 * device_size, raid.get_size())
def test_add_already_used_device_to_array_fails(self):
node = factory.make_Node()
device_size = 10 * 1000 ** 4
block_devices = [
factory.make_PhysicalBlockDevice(node=node, size=device_size)
for _ in range(10)
]
uuid = str(uuid4())
raid = RAID.objects.create_raid(
name='md0',
level=FILESYSTEM_GROUP_TYPE.RAID_5,
uuid=uuid,
block_devices=block_devices)
device = factory.make_PhysicalBlockDevice(node=node, size=device_size)
Filesystem.objects.create(
block_device=device, mount_point='/export/home',
fstype=FILESYSTEM_TYPE.EXT4)
with ExpectedException(
ValidationError,
re.escape(
"['There is another filesystem on this device.']")):
raid.add_device(device, FILESYSTEM_TYPE.RAID)
self.assertEqual(10, raid.filesystems.count()) # Nothing added.
self.assertEqual(9 * device_size, raid.get_size())
def test_remove_device_from_array_fails(self):
node = factory.make_Node()
device_size = 10 * 1000 ** 4
block_devices = [
factory.make_PhysicalBlockDevice(node=node, size=device_size)
for _ in range(10)
]
uuid = str(uuid4())
raid = RAID.objects.create_raid(
name='md0',
level=FILESYSTEM_GROUP_TYPE.RAID_5,
uuid=uuid,
block_devices=block_devices)
with ExpectedException(
ValidationError,
re.escape("['Device does not belong to this array.']")):
raid.remove_device(
factory.make_PhysicalBlockDevice(node=node, size=device_size))
self.assertEqual(10, raid.filesystems.count())
self.assertEqual(9 * device_size, raid.get_size())
def test_allows_multiple_records_unless_cname(self):
dnsdata = factory.make_DNSData(no_ip_addresses=True)
if dnsdata.rrtype == 'CNAME':
with ExpectedException(
ValidationError,
re.escape(
"{'__all__': ['%s']}" % MULTI_CNAME_MSG)):
factory.make_DNSData(
dnsresource=dnsdata.dnsresource,
rrtype='CNAME')
else:
factory.make_DNSData(
dnsresource=dnsdata.dnsresource,
rrtype=dnsdata.rrtype)
self.assertEqual(2, DNSData.objects.filter(
dnsresource=dnsdata.dnsresource).count())
def test__switch_static_to_already_used_ip_address(self):
interface = factory.make_Interface(INTERFACE_TYPE.PHYSICAL)
subnet = factory.make_Subnet(vlan=interface.vlan)
static_ip = factory.make_StaticIPAddress(
alloc_type=IPADDRESS_TYPE.STICKY,
ip=factory.pick_ip_in_Subnet(subnet),
subnet=subnet, interface=interface)
other_interface = factory.make_Interface(INTERFACE_TYPE.PHYSICAL)
used_ip_address = factory.pick_ip_in_Subnet(
subnet, but_not=[static_ip.ip])
factory.make_StaticIPAddress(
alloc_type=IPADDRESS_TYPE.STICKY,
ip=used_ip_address,
subnet=subnet, interface=other_interface)
with ExpectedException(StaticIPAddressUnavailable):
interface.update_ip_address(
static_ip, INTERFACE_LINK_TYPE.STATIC, subnet,
ip_address=used_ip_address)
def test__claim_fails_if_subnet_missing(self):
with transaction.atomic():
interface = factory.make_Interface(INTERFACE_TYPE.PHYSICAL)
subnet = factory.make_Subnet(vlan=interface.vlan)
ip = factory.make_StaticIPAddress(
alloc_type=IPADDRESS_TYPE.AUTO, ip="",
subnet=subnet, interface=interface)
ip.subnet = None
ip.save()
maaslog = self.patch_autospec(interface_module, "maaslog")
with transaction.atomic():
with ExpectedException(StaticIPAddressUnavailable):
interface.claim_auto_ips()
self.expectThat(maaslog.error, MockCalledOnceWith(
"Could not find subnet for interface %s." %
interface.get_log_string()))
def test__lock_released_on_error(self):
exception = factory.make_exception()
self.patch(node_module, 'refresh').side_effect = exception
region = yield deferToDatabase(factory.make_RegionController)
self.patch(node_module, 'get_maas_id').return_value = region.system_id
self.patch(node_module, 'get_sys_info').return_value = {
'hostname': region.hostname,
'architecture': region.architecture,
'osystem': '',
'distro_series': '',
'interfaces': {},
}
with ExpectedException(type(exception)):
yield region.refresh()
lock = NamedLock('refresh')
self.assertFalse(lock.is_locked())
def test__no_save_duplicate_ipranges(self):
subnet = make_plain_subnet()
IPRange(
subnet=subnet,
type=IPRANGE_TYPE.DYNAMIC,
start_ip="192.168.0.100",
end_ip="192.168.0.150",
).save()
# Make the same range again, should fail to save.
iprange = IPRange(
subnet=subnet,
type=IPRANGE_TYPE.DYNAMIC,
start_ip="192.168.0.100",
end_ip="192.168.0.150",
)
with ExpectedException(ValidationError, self.dynamic_overlaps):
iprange.save()