python类ExpectedException()的实例源码

test_iprange.py 文件源码 项目:maas 作者: maas 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test__no_save_range_overlap_begin(self):
        # A range ending on the first address of an already-saved dynamic
        # range must fail validation, both as a dynamic and as a reserved
        # range.
        network = make_plain_subnet()
        existing = IPRange(
            subnet=network, type=IPRANGE_TYPE.DYNAMIC,
            start_ip="192.168.0.100", end_ip="192.168.0.150")
        existing.save()
        # The candidate shares 192.168.0.100 with the saved range.
        candidate = IPRange(
            subnet=network, type=IPRANGE_TYPE.DYNAMIC,
            start_ip="192.168.0.90", end_ip="192.168.0.100")
        with ExpectedException(ValidationError, self.dynamic_overlaps):
            candidate.save()
        # Same addresses again, this time as a reserved range.
        candidate.type = IPRANGE_TYPE.RESERVED
        with ExpectedException(ValidationError, self.reserved_overlaps):
            candidate.save()
test_iprange.py 文件源码 项目:maas 作者: maas 项目源码 文件源码 阅读 15 收藏 0 点赞 0 评论 0
def test__no_save_range_overlap_end(self):
        # A dynamic range that starts inside a saved dynamic range and runs
        # past its end_ip must fail validation.
        network = make_plain_subnet()
        existing = IPRange(
            subnet=network, type=IPRANGE_TYPE.DYNAMIC,
            start_ip="192.168.0.100", end_ip="192.168.0.150")
        existing.save()
        # 192.168.0.140-150 is common to both ranges.
        candidate = IPRange(
            subnet=network, type=IPRANGE_TYPE.DYNAMIC,
            start_ip="192.168.0.140", end_ip="192.168.0.160")
        with ExpectedException(ValidationError, self.dynamic_overlaps):
            candidate.save()
test_iprange.py 文件源码 项目:maas 作者: maas 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def test__no_save_range_within_ranges(self):
        # A dynamic range lying entirely inside a saved dynamic range must
        # fail validation.
        # NOTE(review): the body matches
        # test__no_save_range_within_existing_range line for line - confirm
        # whether both tests are still wanted.
        network = make_plain_subnet()
        outer = IPRange(
            subnet=network, type=IPRANGE_TYPE.DYNAMIC,
            start_ip="192.168.0.100", end_ip="192.168.0.150")
        outer.save()
        # 192.168.0.110-140 sits wholly within 192.168.0.100-150.
        inner = IPRange(
            subnet=network, type=IPRANGE_TYPE.DYNAMIC,
            start_ip="192.168.0.110", end_ip="192.168.0.140")
        with ExpectedException(ValidationError, self.dynamic_overlaps):
            inner.save()
test_iprange.py 文件源码 项目:maas 作者: maas 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test__no_save_range_within_existing_range(self):
        # Saving a dynamic range nested inside an existing dynamic range is
        # expected to raise ValidationError.
        network = make_plain_subnet()
        outer = IPRange(
            subnet=network, type=IPRANGE_TYPE.DYNAMIC,
            start_ip="192.168.0.100", end_ip="192.168.0.150")
        outer.save()
        # The nested range: every address is already covered by `outer`.
        inner = IPRange(
            subnet=network, type=IPRANGE_TYPE.DYNAMIC,
            start_ip="192.168.0.110", end_ip="192.168.0.140")
        with ExpectedException(ValidationError, self.dynamic_overlaps):
            inner.save()
test_iprange.py 文件源码 项目:maas 作者: maas 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test__no_save_range_within_existing_reserved_range(self):
        # A dynamic range nested inside a saved *reserved* range must fail
        # validation with the dynamic-overlap message.
        network = make_plain_subnet()
        reserved = IPRange(
            subnet=network, type=IPRANGE_TYPE.RESERVED,
            start_ip="192.168.0.100", end_ip="192.168.0.150")
        reserved.save()
        # Dynamic candidate wholly within the reserved addresses.
        dynamic = IPRange(
            subnet=network, type=IPRANGE_TYPE.DYNAMIC,
            start_ip="192.168.0.110", end_ip="192.168.0.140")
        with ExpectedException(ValidationError, self.dynamic_overlaps):
            dynamic.save()
test_iprange.py 文件源码 项目:maas 作者: maas 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test__reserved_range_cannot_overlap_reserved_ranges(self):
        # Two reserved ranges on the same subnet that share an address
        # (here 192.168.0.250) cannot both be saved.
        network = factory.make_Subnet(
            cidr='192.168.0.0/24', gateway_ip='192.168.0.1',
            dns_servers=['192.168.0.50', '192.168.0.200'])
        first = IPRange(
            subnet=network, type=IPRANGE_TYPE.RESERVED,
            start_ip="192.168.0.1", end_ip="192.168.0.250")
        first.save()
        second = IPRange(
            subnet=network, type=IPRANGE_TYPE.RESERVED,
            start_ip="192.168.0.250", end_ip="192.168.0.254")
        with ExpectedException(ValidationError, self.reserved_overlaps):
            second.save()
test_iprange.py 文件源码 项目:maas 作者: maas 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test__dynamic_range_cannot_overlap_auto_address(self):
        # With an AUTO address allocated on the subnet, saving a dynamic
        # range over 192.168.0.2-254 is expected to raise ValidationError.
        network = make_plain_subnet()
        # Any address from the subnet's CIDR other than 192.168.0.1.
        taken_ip = factory.pick_ip_in_network(
            IPNetwork(network.cidr), but_not=['192.168.0.1'])
        factory.make_StaticIPAddress(
            subnet=network, alloc_type=IPADDRESS_TYPE.AUTO, ip=taken_ip)
        candidate = IPRange(
            subnet=network, type=IPRANGE_TYPE.DYNAMIC,
            start_ip="192.168.0.2", end_ip="192.168.0.254")
        with ExpectedException(ValidationError, self.dynamic_overlaps):
            candidate.save()
test_iprange.py 文件源码 项目:maas 作者: maas 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test__dynamic_range_cannot_overlap_sticky_address(self):
        # With a STICKY address allocated on the subnet, saving a dynamic
        # range over 192.168.0.2-254 is expected to raise ValidationError.
        network = make_plain_subnet()
        # Any address from the subnet's CIDR other than 192.168.0.1.
        taken_ip = factory.pick_ip_in_network(
            IPNetwork(network.cidr), but_not=['192.168.0.1'])
        factory.make_StaticIPAddress(
            subnet=network, alloc_type=IPADDRESS_TYPE.STICKY, ip=taken_ip)
        candidate = IPRange(
            subnet=network, type=IPRANGE_TYPE.DYNAMIC,
            start_ip="192.168.0.2", end_ip="192.168.0.254")
        with ExpectedException(ValidationError, self.dynamic_overlaps):
            candidate.save()
test_iprange.py 文件源码 项目:maas 作者: maas 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def test__dynamic_range_cannot_overlap_user_reserved_address(self):
        # With a USER_RESERVED address allocated on the subnet, saving a
        # dynamic range over 192.168.0.2-254 is expected to raise
        # ValidationError.
        network = make_plain_subnet()
        # Any address from the subnet's CIDR other than 192.168.0.1.
        taken_ip = factory.pick_ip_in_network(
            IPNetwork(network.cidr), but_not=['192.168.0.1'])
        factory.make_StaticIPAddress(
            subnet=network, alloc_type=IPADDRESS_TYPE.USER_RESERVED,
            ip=taken_ip)
        candidate = IPRange(
            subnet=network, type=IPRANGE_TYPE.DYNAMIC,
            start_ip="192.168.0.2", end_ip="192.168.0.254")
        with ExpectedException(ValidationError, self.dynamic_overlaps):
            candidate.save()

    # Regression for lp:1580772.
test_websocket_listener.py 文件源码 项目:maas 作者: maas 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test__skips_notify_on_controllerinfo_interface_update(self):
        # After set_interface_update_info() runs, the handler registered on
        # self.listener should never fire: waiting on its value is expected
        # to raise CancelledError.
        yield deferToDatabase(register_websocket_triggers)
        service = self.make_listener_without_delay()
        received = DeferredValue()
        node_params = self.params.copy()
        controller = yield deferToDatabase(self.create_node, node_params)
        yield deferToDatabase(self.set_version, controller, '')
        service.register(self.listener, lambda *args: received.set(args))
        yield service.startService()
        try:
            # NOTE(review): '{]' looks like it might be a typo for '{}' -
            # confirm against set_interface_update_info before changing.
            yield deferToDatabase(
                self.set_interface_update_info, controller, '{]', '{}')
            with ExpectedException(CancelledError):
                yield received.get(timeout=0.2)
        finally:
            # Always stop the service, even when the expectation fails.
            yield service.stopService()
test_system_listener.py 文件源码 项目:maas 作者: maas 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_doesnt_sends_message_for_update_on_node_alloc_type_no_ip(self):
        # Updating only alloc_type on the node's static IP should not
        # trigger the "sys_dns" handler: waiting on its value is expected to
        # raise CancelledError.
        yield deferToDatabase(register_system_triggers)
        node = yield deferToDatabase(self.create_node_with_interface)
        address = yield deferToDatabase(self.get_node_ip_address, node)
        yield self.capturePublication()
        received = DeferredValue()
        service = self.make_listener_without_delay()
        service.register("sys_dns", lambda *args: received.set(args))
        yield service.startService()
        try:
            changes = {"alloc_type": IPADDRESS_TYPE.STICKY}
            yield deferToDatabase(
                self.update_staticipaddress, address.id, changes)
            with ExpectedException(CancelledError):
                yield received.get(timeout=1)
        finally:
            # Always stop the service, even when the expectation fails.
            yield service.stopService()
test_system_listener.py 文件源码 项目:maas 作者: maas 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_doesnt_send_message_for_nic_link_non_authorative_domain(self):
        # Creating a static IP for an interface whose node lives in a
        # domain created with authoritative=False should not trigger the
        # "sys_dns" handler.
        yield deferToDatabase(register_system_triggers)
        domain_params = {'authoritative': False}
        domain = yield deferToDatabase(self.create_domain, domain_params)
        node = yield deferToDatabase(self.create_node, {'domain': domain})
        nic = yield deferToDatabase(self.create_interface, {'node': node})
        network = yield deferToDatabase(self.create_subnet)
        yield self.capturePublication()
        received = DeferredValue()
        service = self.make_listener_without_delay()
        service.register("sys_dns", lambda *args: received.set(args))
        yield service.startService()
        try:
            address_params = {
                "interface": nic,
                "subnet": network,
            }
            yield deferToDatabase(
                self.create_staticipaddress, address_params)
            with ExpectedException(CancelledError):
                yield received.get(timeout=1)
        finally:
            # Always stop the service, even when the expectation fails.
            yield service.stopService()
test_system_listener.py 文件源码 项目:maas 作者: maas 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_doesnt_send_message_for_nic_unlink_non_authorative_domain(self):
        # Deleting the static IP of an interface whose node lives in a
        # domain created with authoritative=False should not trigger the
        # "sys_dns" handler.
        yield deferToDatabase(register_system_triggers)
        domain_params = {'authoritative': False}
        domain = yield deferToDatabase(self.create_domain, domain_params)
        node = yield deferToDatabase(self.create_node, {'domain': domain})
        nic = yield deferToDatabase(self.create_interface, {'node': node})
        network = yield deferToDatabase(self.create_subnet)
        address_params = {
            "interface": nic,
            "subnet": network,
        }
        address = yield deferToDatabase(
            self.create_staticipaddress, address_params)
        # Listening starts only after the address already exists.
        yield self.capturePublication()
        received = DeferredValue()
        service = self.make_listener_without_delay()
        service.register("sys_dns", lambda *args: received.set(args))
        yield service.startService()
        try:
            yield deferToDatabase(self.delete_staticipaddress, address.id)
            with ExpectedException(CancelledError):
                yield received.get(timeout=1)
        finally:
            # Always stop the service, even when the expectation fails.
            yield service.stopService()
test_system_listener.py 文件源码 项目:maas 作者: maas 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_doesnt_send_message_for_subnet_delete_disabled_rdns(self):
        # Deleting a subnet created with rdns_mode=RDNS_MODE.DISABLED should
        # not trigger the "sys_dns" handler: waiting on its value is
        # expected to raise CancelledError.
        yield deferToDatabase(register_system_triggers)
        subnet_params = {'rdns_mode': RDNS_MODE.DISABLED}
        network = yield deferToDatabase(self.create_subnet, subnet_params)
        yield self.capturePublication()
        received = DeferredValue()
        service = self.make_listener_without_delay()
        service.register("sys_dns", lambda *args: received.set(args))
        yield service.startService()
        try:
            yield deferToDatabase(self.delete_subnet, network.id)
            with ExpectedException(CancelledError):
                yield received.get(timeout=1)
        finally:
            # Always stop the service, even when the expectation fails.
            yield service.stopService()
test_listener.py 文件源码 项目:maas 作者: maas 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test__tryConnection_logs_error(self):
        # When startConnection raises, tryConnection is expected to let the
        # error propagate and to emit exactly one error-level log event
        # carrying the exception message.
        service = PostgresListenerService()

        error_type = factory.make_exception_type()
        error_text = factory.make_name("message")

        # Make every connection attempt fail with the generated error.
        patched_start = self.patch(service, "startConnection")
        patched_start.side_effect = error_type(error_text)

        with TwistedLoggerFixture() as logger, ExpectedException(error_type):
            yield service.tryConnection()

        self.assertThat(logger.events, HasLength(1))
        expected_event = ContainsDict({
            "log_format": Equals("Unable to connect to database: {error}"),
            "log_level": Equals(LogLevel.error),
            "error": Equals(error_text),
        })
        self.assertThat(logger.events[0], expected_event)
test_listener.py 文件源码 项目:maas 作者: maas 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test__tryConnection_will_not_retry_if_autoReconnect_not_set(self):
        # With autoReconnect switched off, a failing startConnection should
        # propagate out of tryConnection without deferLater being called.
        service = PostgresListenerService()
        service.autoReconnect = False

        error_type = factory.make_exception_type()
        error_text = factory.make_name("message")

        # Make every connection attempt fail with the generated error.
        patched_start = self.patch(service, "startConnection")
        patched_start.side_effect = error_type(error_text)
        # Replace deferLater so any call to it can be detected.
        patched_later = self.patch(listener_module, "deferLater")
        patched_later.return_value = sentinel.retry

        with ExpectedException(error_type):
            yield service.tryConnection()

        self.assertThat(patched_later, MockNotCalled())
test_listener.py 文件源码 项目:maas 作者: maas 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test__stopping_cancels_start(self):
        # Stopping the service while it is still starting is expected to
        # cancel the pending start.
        service = PostgresListenerService()

        # Kick off the start and, without waiting for it, request a stop.
        # The spy observes `start_d` so its outcome can be read later.
        start_d = service.startService()
        start_outcome = DeferredValue()
        start_outcome.observe(start_d)
        stop_d = service.stopService()

        # At this point both Deferreds still hold unfired callbacks.
        has_pending = Not(Equals([]))
        self.assertThat(start_d.callbacks, has_pending)
        self.assertThat(stop_d.callbacks, has_pending)

        # Let the stop run to completion.
        yield stop_d

        # Both callback lists are now empty; the original test attributes
        # this to `stopping` having chained itself onto the end of
        # `starting`.
        self.assertThat(start_d.callbacks, Equals([]))
        self.assertThat(stop_d.callbacks, Equals([]))

        # The observed start must have ended in cancellation.
        with ExpectedException(CancelledError):
            yield start_outcome.get()
test_vlan.py 文件源码 项目:maas 作者: maas 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test__non_superuser_reloads_user(self):
        # The handler is constructed while the user is still an admin; the
        # user is then saved with is_superuser=False, and configure_dhcp is
        # expected to fail with "Permission denied.".
        account = factory.make_admin()
        handler = VLANHandler(account, {})
        account.is_superuser = False
        account.save()
        # A VLAN with DHCP enabled, served by a rack with an interface on it.
        rack = factory.make_RackController()
        vlan = factory.make_VLAN()
        factory.make_Interface(INTERFACE_TYPE.PHYSICAL, node=rack, vlan=vlan)
        vlan.dhcp_on = True
        vlan.primary_rack = rack
        vlan.save()
        factory.make_ipv4_Subnet_with_IPRanges(vlan=vlan)
        request = {
            "id": vlan.id,
            "controllers": []
        }
        with ExpectedException(AssertionError, "Permission denied."):
            handler.configure_dhcp(request)
test_vlan.py 文件源码 项目:maas 作者: maas 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def test__configure_dhcp_gateway_outside_subnet_raises(self):
        # configure_dhcp is expected to raise ValueError when the requested
        # gateway (1.0.0.1) is not an address in the subnet (10.0.0.0/24).
        admin = factory.make_admin()
        handler = VLANHandler(admin, {})
        vlan = factory.make_VLAN()
        rack = factory.make_RackController()
        factory.make_Interface(INTERFACE_TYPE.PHYSICAL, node=rack, vlan=vlan)
        network = factory.make_Subnet(
            vlan=vlan, cidr="10.0.0.0/24", gateway_ip="")
        # Precondition: the fresh subnet has no dynamic ranges yet.
        self.assertThat(network.get_dynamic_ranges().count(), Equals(0))
        extra = {
            "subnet": network.id,
            "gateway": "1.0.0.1",
            "start": "10.0.0.2",
            "end": "10.0.0.99"
        }
        request = {
            "id": vlan.id,
            "controllers": [rack.system_id],
            "extra": extra
        }
        with ExpectedException(ValueError):
            handler.configure_dhcp(request)
test_vlan.py 文件源码 项目:maas 作者: maas 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test__configure_dhcp_gateway_inside_range_raises(self):
        # configure_dhcp is expected to raise ValueError when the requested
        # gateway (10.0.0.1) is also the first address of the requested
        # range (10.0.0.1-10.0.0.99).
        user = factory.make_admin()
        handler = VLANHandler(user, {})
        vlan = factory.make_VLAN()
        rack = factory.make_RackController()
        factory.make_Interface(INTERFACE_TYPE.PHYSICAL, node=rack, vlan=vlan)
        subnet = factory.make_Subnet(
            vlan=vlan, cidr="10.0.0.0/24", gateway_ip="")
        # Precondition: the fresh subnet has no dynamic ranges yet.
        self.assertThat(subnet.get_dynamic_ranges().count(), Equals(0))
        with ExpectedException(ValueError):
            handler.configure_dhcp({
                "id": vlan.id,
                "controllers": [rack.system_id],
                "extra": {
                    "subnet": subnet.id,
                    "gateway": "10.0.0.1",
                    "start": "10.0.0.1",
                    "end": "10.0.0.99"
                }
            })
        # The former trailing `vlan = reload_object(vlan)` was dropped: its
        # result was never read or asserted on.


问题


面经


文章

微信
公众号

扫码关注公众号