def test_package_update(self):
idf.plugin.create_country_codes()
helpers.call_action(
'package_create', name='test_package',
custom_text='this is my custom text', country_code='uk',
resources=[{
'url': 'http://test.com/',
'custom_resource_text': 'my custom resource',
}])
result = helpers.call_action(
'package_update',
name='test_package',
custom_text='this is my updated text',
country_code='ie',
resources=[{
'url': 'http://test.com/',
'custom_resource_text': 'updated custom resource',
}]
)
nt.assert_equals('this is my updated text', result['custom_text'])
nt.assert_equals([u'ie'], result['country_code'])
nt.assert_equals('updated custom resource',
result['resources'][0]['custom_resource_text'])
python类assert_equals()的实例源码
def test_returns_no_tracking_when_no_execution_id_is_found(self):
self.engine.cwlogs.get_log_events.side_effect = CloudWatchStreamDoesNotExist()
workflow_state = {e: core.STATE_UNKNOWN for e in self.workflow_events}
execution_id = "transaction-id-123"
expected = {
"events_defined": workflow_state,
"events_received": [],
"tracking_summary": {
"last_received_event": None,
"subscribers": [],
"execution_path": self.engine \
._generate_execution_path(workflow_state)
}
}
actual = self.engine.track(self.workflow_id, self.execution_id)
nt.assert_equals(expected, actual)
def test_successfully_gets_log_events(self):
mocked_timestamp = 1476826208 * 1000
mocked_message = '{"foo": "bar"}'
mocked_events = {
"events": [
{
"timestamp": mocked_timestamp,
"message": mocked_message
}
],
"nextForwardToken": None
}
self.logs.cwlogs.get_log_events.return_value = mocked_events
expected = [{
"timestamp": utils.format_datetime(datetime.fromtimestamp(mocked_timestamp / 1000)),
"data": json.loads(mocked_message)
}]
actual = self.logs.get_log_events(self.log_group, self.log_stream)
nt.assert_equals(expected, actual)
def test_simple_engine():
engine = Engine({
'datasource': {
'class': TestDataSource,
'params': {
'csv': test_data_file,
},
},
'algorithm': {
'class': TestSimpleAlgorithm,
'params': {
'model.pickle': '~/.tidml/tests/model.pkl', # default built-in
},
},
})
engine.train()
models = engine.load_models()
prediction = engine.predict(models, 3)
nt.assert_equals(prediction, 6)
def test_multiple_algorithms_engine():
engine = Engine({
'datasource': TestEmptyDataSource,
'algorithms': {
'algo1': {
'class': TestMultiAlgorithm,
'params': {'p': 'A'},
},
'algo2': {
'class': TestMultiAlgorithm,
'params': {'p': 'B'},
},
},
'serving': TestIdentityServing,
})
models = {
'algo1': object(),
'algo2': object(),
}
nt.assert_equals(engine.predict(models, None), {
'algo1': 'A',
'algo2': 'B',
})
def test_ipv4_async():
global FLAG
FLAG = Value('i', 0)
nma = nmap.PortScannerAsync()
def callback_result(host, scan_result):
global FLAG
FLAG.value = 1
nma.scan(hosts='127.0.0.1',
arguments='-p 22 -Pn',
callback=callback_result)
while nma.still_scanning():
nma.wait(2)
assert_equals(FLAG.value, 1)
def test_ipv6_async():
global FLAG
FLAG = Value('i', 0)
nma = nmap.PortScannerAsync()
def callback_result(host, scan_result):
global FLAG
FLAG.value = 1
nma.scan(hosts='::1',
arguments='-6 -p 22 -Pn',
callback=callback_result)
while nma.still_scanning():
nma.wait(2)
assert_equals(FLAG.value, 1)
def test_multipe_osmatch():
assert('osmatch' in nm['127.0.0.1'])
assert('portused' in nm['127.0.0.1'])
for osm in nm['127.0.0.1']['osmatch']:
assert('accuracy' in osm)
assert('line' in osm)
assert('name' in osm)
assert('osclass' in osm)
assert('accuracy' in osm['osclass'][0])
assert('cpe' in osm['osclass'][0])
assert('osfamily' in osm['osclass'][0])
assert('osgen' in osm['osclass'][0])
assert('type' in osm['osclass'][0])
assert('vendor' in osm['osclass'][0])
# def test_host_and_port_as_unicode():
# # nosetests -x -s nmap/test_nmap.py:test_port_as_unicode
# # Covers bug : https://bitbucket.org/xael/python-nmap/issues/9/can-not-pass-ports-with-unicode-string-at
# nma = nm.scan(hosts=u'127.0.0.1', ports=u'22')
# assert_equals(nma['nmap']['scaninfo']['error'], '')
def test_Resources():
rANO = gantt.Resource('ANO')
rANO.add_vacations(
dfrom=datetime.date(2015, 2, 2),
dto=datetime.date(2015, 2, 4)
)
# test global vacations
assert_equals(rANO.is_available(datetime.date(2015, 1, 1)), False)
# test resource vacations
assert_equals(rANO.is_available(datetime.date(2015, 2, 1)), True)
assert_equals(rANO.is_available(datetime.date(2015, 2, 2)), False)
assert_equals(rANO.is_available(datetime.date(2015, 2, 3)), False)
assert_equals(rANO.is_available(datetime.date(2015, 2, 3)), False)
assert_equals(rANO.is_available(datetime.date(2015, 2, 5)), True)
# Second resource
rJLS = gantt.Resource('JLS')
return
def test_latex_completions():
from IPython.core.latex_symbols import latex_symbols
import random
ip = get_ipython()
# Test some random unicode symbols
keys = random.sample(latex_symbols.keys(), 10)
for k in keys:
text, matches = ip.complete(k)
nt.assert_equal(len(matches),1)
nt.assert_equal(text, k)
nt.assert_equal(matches[0], latex_symbols[k])
# Test a more complex line
text, matches = ip.complete(u'print(\\alpha')
nt.assert_equals(text, u'\\alpha')
nt.assert_equals(matches[0], latex_symbols['\\alpha'])
# Test multiple matching latex symbols
text, matches = ip.complete(u'\\al')
nt.assert_in('\\alpha', matches)
nt.assert_in('\\aleph', matches)
def test_image_to_obs(self, mock_ps, mock_dl):
session = jobs.db_session()
session.query(bm.Observation).delete()
session.commit()
obs = session.query(bm.Observation).all()
nt.assert_equals(obs, [])
teardown_singleton_pubsub()
mock_dl.return_value = [(7.0, 'mockingbird')]
jobs.image_to_observation(self.file_path_gps, self.file_path)
obs = session.query(bm.Observation).all()
# TODO: test better what's in this obs
nt.assert_equals(len(obs), 1)
mock_dl.assert_called_once_with(self.file_path)
mock_ps.assert_called_once_with()
# as long as we return the repr of geometry, we cannot do this assert:
# mock_ps().publish.assert_called_once_with(obs[0].as_public_dict())
nt.assert_equals(mock_ps().publish.call_count, 1)
def test_ipv4_async():
global FLAG
FLAG = Value('i', 0)
nma = nmap.PortScannerAsync()
def callback_result(host, scan_result):
global FLAG
FLAG.value = 1
nma.scan(hosts='127.0.0.1',
arguments='-p 22 -Pn',
callback=callback_result)
while nma.still_scanning():
nma.wait(2)
assert_equals(FLAG.value, 1)
def test_ipv6_async():
global FLAG_ipv6
FLAG_ipv6 = Value('i', 0)
nma_ipv6 = nmap.PortScannerAsync()
def callback_result(host, scan_result):
global FLAG_ipv6
FLAG_ipv6.value = 1
nma_ipv6.scan(hosts='::1',
arguments='-6 -p 22 -Pn',
callback=callback_result)
while nma_ipv6.still_scanning():
nma_ipv6.wait(2)
assert_equals(FLAG_ipv6.value, 1)
def test_ipv6_async():
global FLAG_ipv6
FLAG_ipv6 = Value('i', 0)
nma_ipv6 = nmap.PortScannerAsync()
def callback_result(host, scan_result):
global FLAG_ipv6
FLAG_ipv6.value = 1
nma_ipv6.scan(hosts='::1',
arguments='-6 -p 22 -Pn',
callback=callback_result)
while nma_ipv6.still_scanning():
nma_ipv6.wait(2)
assert_equals(FLAG_ipv6.value, 1)
def prepare_migration_attachments_ipv6(api):
engine = api.system_service()
hosts_service = engine.hosts_service()
for index, host in enumerate(
test_utils.hosts_in_cluster_v4(engine, CLUSTER_NAME),
start=1):
host_service = hosts_service.host_service(id=host.id)
ip_address = MIGRATION_NETWORK_IPv6_ADDR.format(index)
ip_configuration = network_utils_v4.create_static_ip_configuration(
ipv6_addr=ip_address,
ipv6_mask=MIGRATION_NETWORK_IPv6_MASK)
network_utils_v4.modify_ip_config(
engine, host_service, MIGRATION_NETWORK, ip_configuration)
actual_address = next(nic for nic in host_service.nics_service().list()
if nic.name == VLAN200_IF_NAME).ipv6.address
nt.assert_equals(IPAddress(actual_address), IPAddress(ip_address))
def attach_vm_network_to_host_static_config(api):
host = test_utils.hosts_in_cluster_v3(api, CLUSTER_NAME)[0]
ip_configuration = network_utils_v3.create_static_ip_configuration(
VM_NETWORK_IPv4_ADDR,
VM_NETWORK_IPv4_MASK,
VM_NETWORK_IPv6_ADDR,
VM_NETWORK_IPv6_MASK)
network_utils_v3.attach_network_to_host(
api,
host,
NIC_NAME,
VM_NETWORK,
ip_configuration)
# TODO: currently ost uses v3 SDK that doesn't report ipv6. once available,
# verify ipv6 as well.
nt.assert_equals(
host.nics.list(name=VLAN_IF_NAME)[0].ip.address,
VM_NETWORK_IPv4_ADDR)
def prepare_migration_attachments_ipv4(api):
for index, host in enumerate(
test_utils.hosts_in_cluster_v3(api, CLUSTER_NAME),
start=1):
ip_address = MIGRATION_NETWORK_IPv4_ADDR.format(index)
ip_configuration = network_utils_v3.create_static_ip_configuration(
ipv4_addr=ip_address,
ipv4_mask=MIGRATION_NETWORK_IPv4_MASK)
network_utils_v3.attach_network_to_host(
api, host, NIC_NAME, MIGRATION_NETWORK, ip_configuration)
nt.assert_equals(
host.nics.list(name=VLAN200_IF_NAME)[0].ip.address,
ip_address)
def prepare_migration_attachments_ipv4(api):
engine = api.system_service()
hosts_service = engine.hosts_service()
for index, host in enumerate(
test_utils.hosts_in_cluster_v4(engine, CLUSTER_NAME),
start=1):
host_service = hosts_service.host_service(id=host.id)
ip_address = MIGRATION_NETWORK_IPv4_ADDR.format(index)
ip_configuration = network_utils_v4.create_static_ip_configuration(
ipv4_addr=ip_address,
ipv4_mask=MIGRATION_NETWORK_IPv4_MASK)
network_utils_v4.attach_network_to_host(
host_service, NIC_NAME, MIGRATION_NETWORK, ip_configuration)
actual_address = next(nic for nic in host_service.nics_service().list()
if nic.name == VLAN200_IF_NAME).ip.address
nt.assert_equals(IPAddress(actual_address), IPAddress(ip_address))
def test_latex_completions():
from IPython.core.latex_symbols import latex_symbols
import random
ip = get_ipython()
# Test some random unicode symbols
keys = random.sample(latex_symbols.keys(), 10)
for k in keys:
text, matches = ip.complete(k)
nt.assert_equal(len(matches),1)
nt.assert_equal(text, k)
nt.assert_equal(matches[0], latex_symbols[k])
# Test a more complex line
text, matches = ip.complete(u'print(\\alpha')
nt.assert_equals(text, u'\\alpha')
nt.assert_equals(matches[0], latex_symbols['\\alpha'])
# Test multiple matching latex symbols
text, matches = ip.complete(u'\\al')
nt.assert_in('\\alpha', matches)
nt.assert_in('\\aleph', matches)
def test_property_docstring_is_in_info_for_detail_level_0():
class A(object):
@property
def foobar(self):
"""This is `foobar` property."""
pass
ip.user_ns['a_obj'] = A()
nt.assert_equals(
'This is `foobar` property.',
ip.object_inspect('a_obj.foobar', detail_level=0)['docstring'])
ip.user_ns['a_cls'] = A
nt.assert_equals(
'This is `foobar` property.',
ip.object_inspect('a_cls.foobar', detail_level=0)['docstring'])
def test_open_dois(self, test_data):
(doi, fulltext_url, license) = test_data
my_pub = pub.lookup_product_by_doi(doi)
my_pub.recalculate()
logger.info(u"was looking for {}, got {}\n\n".format(fulltext_url, my_pub.fulltext_url))
logger.info(u"doi: http://doi.org/{}".format(doi))
logger.info(u"title: {}".format(my_pub.best_title))
logger.info(u"evidence: {}\n\n".format(my_pub.evidence))
if my_pub.error:
logger.info(my_pub.error)
assert_not_equals(my_pub.fulltext_url, None)
# @data(*closed_dois)
# def test_closed_dois(self, test_data):
# (doi, fulltext_url, license) = test_data
# my_pub = pub.lookup_product_by_doi(doi)
# my_pub.recalculate()
#
# logger.info(u"was looking for {}, got {}\n\n".format(fulltext_url, my_pub.fulltext_url))
# logger.info(u"doi: http://doi.org/{}".format(doi))
# logger.info(u"title: {}".format(my_pub.best_title))
# logger.info(u"evidence: {}\n\n".format(my_pub.evidence))
# if my_pub.error:
# logger.info(my_pub.error)
#
# assert_equals(my_pub.fulltext_url, None)
#
# have to scrape the publisher pages to find these
def test_initialization(self):
assert_equals(Acl('test').name, 'test')
def test_initialization_default(self):
assert_equals(Acl().name, Acl._default_name)
def test_add(self):
acl = Acl()
acl.add(Ace())
assert_equals(len(acl), 1)
assert_equals(acl[-1]._line_number, 1)
acl.add(Ace())
assert_equals(len(acl), 1)
assert_equals(acl[-1]._line_number, 1)
acl.add(Ace(logging=2))
assert_equals(acl[-1]._line_number, 2)
def test_remove(self):
acl = Acl()
acl.add(Ace())
acl.remove(Ace())
assert_equals(len(acl), 0)
def test_getitem(self):
acl = Acl()
ace = Ace()
acl.add(ace)
assert_equals(id(acl[0]), id(ace))
def test_eqaulity(self):
assert_equals(Acl(), Acl())
acl01 = Acl()
acl01.add(Ace(logging=2))
acl02 = Acl()
acl02.add(Ace(logging=2))
assert_equals(acl01, acl02)
def test_repr(self):
expected = '<Acl test01 #0>'
assert_equals(Acl('test01').__repr__(), expected)
def test_str(self):
acl = Acl('test01')
expected = 'Acl test01 #0'
assert_equals(acl.__str__(), expected)
acl.add(Ace(permit=False, network='1.2.3.0/24 4.5.6.0/24'))
expected = 'Acl test01 #1\n\tdeny ip 1.2.3.0/24 4.5.6.0/24'
assert_equals(acl.__str__(), expected)
def test1(self):
"""r2 does not verify the formula."""
assert_equals(self.hierarchy.check_all_ancestors("g2"),
{'g1':
{'f1': "['r2']"}})
# def test2(self):
# """after adding an edge all the node are valid"""
# self.hie2.add_edge("r2", "a1")
# assert_equals(self.hie2.check(),
# {'g1': {'or(not cnt(Region),<1<=Adj>cnt(Agent))': "[]"}})