def test_structure_simple_from_dict_default(converter, cl_and_vals, data):
"""Test structuring non-nested attrs classes with default value."""
cl, vals = cl_and_vals
obj = cl(*vals)
attrs_with_defaults = [a for a in fields(cl)
if a.default is not NOTHING]
to_remove = data.draw(lists(elements=sampled_from(attrs_with_defaults),
unique=True))
for a in to_remove:
if isinstance(a.default, Factory):
setattr(obj, a.name, a.default.factory())
else:
setattr(obj, a.name, a.default)
dumped = asdict(obj)
for a in to_remove:
del dumped[a.name]
assert obj == converter.structure(dumped, cl)
python类sampled_from()的实例源码
def spark_application(app_id):
"""Mock of the Spark jobs REST resource."""
if 'last' in request.args:
return jsonify(redis.get(request.base_url))
d = st.fixed_dictionaries({
'jobId': st.integers(0),
'name': st.text(),
'submissionTime': st.text(),
'completionTime': st.text(),
'stageIds': st.lists(st.integers(0), average_size=3),
'status': st.sampled_from(['SUCCEEDED', 'RUNNING', 'FAILED']),
'numTasks': st.integers(0),
'numActiveTasks': st.integers(0),
'numCompletedTasks': st.integers(0),
'numSkippedTasks': st.integers(0),
'numFailedTasks': st.integers(0),
'numActiveStages': st.integers(0),
'numCompletedStages': st.integers(0),
'numSkippedStages': st.integers(0),
'numFailedStages': st.integers(0),
})
result = json.dumps(st.lists(d, average_size=3).example())
redis.set(request.base_url, result)
return jsonify(result)
def functiondef_node(draw, name=None, annotated=False, returns=False):
name = name or draw(valid_identifier())
args = draw(arguments_node(annotated))
body = []
returns_node = astroid.Return()
arg_node, arg_type_node = draw(hs.sampled_from(list(zip(args.args, args.annotations))))
if returns:
returns_node.postinit(arg_node)
else:
returns_node.postinit(const_node(None))
body.append(returns_node)
node = astroid.FunctionDef(name=name)
node.postinit(
args,
body,
None,
arg_type_node
)
return node
def enum_to_strategy(enum):
"""Generate strategy for enum."""
return st.sampled_from([
value.number
for value in enum.DESCRIPTOR.values
])
def add_bools(list_of_lists):
"""
Given recursive list that can contain other lists, return tuple of that plus
a booleans strategy for each list.
"""
l = []
def count(recursive):
l.append(1)
for child in recursive:
if isinstance(child, list):
count(child)
count(list_of_lists)
return st.tuples(st.just(list_of_lists), st.tuples(*[st.sampled_from([True, False]) for i in l]))
def remove_strategy(self):
def get_address(service_name):
return st.tuples(st.just(service_name), st.sampled_from(
self.fake.services[service_name]))
return st.tuples(st.just("remove"), (
st.sampled_from(list(self.fake.services.keys())).flatmap(get_address)))
def steps(self):
result = add_strategy | replace_strategy
# Replace or add to a known service cluster:
if self.fake.services:
result |= st.tuples(st.just("replace"),
st.tuples(st.sampled_from(list(self.fake.services.keys())),
st.lists(nice_strings)))
result |= st.tuples(st.just("add"),
st.tuples(st.sampled_from(list(self.fake.services.keys())),
nice_strings))
# Remove a known address from known cluster:
if not self.fake.is_empty():
result |= self.remove_strategy()
return result
def default(self, obj):
"""
"""
if isinstance(obj, dict) and _is_swagger_parameter(obj):
parameter_type = obj.get('format', obj.get('type'))
parameter_schema = obj.get('schema')
parameter_ref = obj.get('$ref')
if parameter_type in SWAGGER_FORMAT_MAPPING:
return SWAGGER_FORMAT_MAPPING[parameter_type]
elif parameter_ref:
return self.transform(self.get_ref(parameter_ref, self.spec))
elif parameter_type == 'array':
if obj['items'].get('enum'):
return st.lists(elements=st.sampled_from(obj['items']['enum']))
elif obj['items'].get('type'):
return st.lists(elements=SWAGGER_FORMAT_MAPPING[obj['items']['type']])
elif obj['items'].get('$ref'):
schema = self.get_ref(obj['items']['$ref'], self.spec)
return st.lists(elements=self.transform(schema))
raise Exception('array', obj)
elif parameter_type == 'object':
properties = {}
for property_name, property_ in obj['properties'].items():
properties[property_name] = self.transform(property_)
return st.fixed_dictionaries(properties)
elif parameter_schema:
if parameter_schema.get('type') == 'array':
schema = self.get_ref(parameter_schema['items']['$ref'], self.spec)
return st.lists(elements=self.transform(schema))
else:
schema = self.get_ref(parameter_schema['$ref'], self.spec)
transformed = self.transform(schema)
return transformed
else:
raise Exception("Invalid", obj, parameter_type)
return obj
def strategy(self):
'Returns resulting strategy that generates configured char set'
max_codepoint = None if self._unicode else 127
strategies = []
if self._negate:
if self._categories or self._whitelist_chars:
strategies.append(
hs.characters(
blacklist_categories=self._categories | set(['Cc', 'Cs']),
blacklist_characters=self._whitelist_chars,
max_codepoint=max_codepoint,
)
)
if self._blacklist_chars:
strategies.append(
hs.sampled_from(
list(self._blacklist_chars - self._whitelist_chars)
)
)
else:
if self._categories or self._blacklist_chars:
strategies.append(
hs.characters(
whitelist_categories=self._categories,
blacklist_characters=self._blacklist_chars,
max_codepoint=max_codepoint,
)
)
if self._whitelist_chars:
strategies.append(
hs.sampled_from(
list(self._whitelist_chars - self._blacklist_chars)
)
)
return hs.one_of(*strategies) if strategies else hs.just(u'')
def jobs(
draw,
ids=uuids(),
statuses=sampled_from(JobInterface.JobStatus),
parameters=dictionaries(text(), text()),
results=dictionaries(text(), text()),
dates_submitted=datetimes(),
registration_schemas=dictionaries(text(), text()),
result_schemas=dictionaries(text(), text())
) -> JobInterface:
"""
:param draw: A function that can take a strategy and draw a datum from it
:param ids: A hypothesis strategy (statisticians should read "random
variable"), that represents the set of all valid job IDs
:param statuses: A hypothesis strategy that samples from the set of all
allowed job statuses
:param parameters: A hypothesis strategy that samples from all job
parameters
:param results: A hypothesis strategy that represents the possible results
:param dates_submitted: A hypothesis strategy that represents the
possible dates that can be submitted
:param registration_schemas: The possible job registration schemas
:param result_schemas: The possible job result schemas
:return: A randomly-generated implementation of :class:`JobInterface`
"""
return Job(
draw(ids), draw(statuses), draw(parameters), draw(results),
draw(dates_submitted),
draw(registration_schemas),
draw(result_schemas)
)
def user_actions(draw, skip=None, **lists_kwargs):
if skip is None:
skip = []
return draw(lists(sampled_from(k for k in USER_ACTIONS if k not in skip), **lists_kwargs))
def test_numeric_type_instances_can_be_compared(data):
first = data.draw(hs.sampled_from(NUMERIC_TYPES))()
second = data.draw(hs.sampled_from(NUMERIC_TYPES))()
# We don't actually care if it's True or False
assert (first < second) in (True, False)
def models():
return sampled_from([v1_5_model])
def durations(draw):
num = draw(st.integers(1, 100))
suffix = draw(st.sampled_from('ymd'))
return f'{num}{suffix}'
def provide_require_st(draw, filter_=True): # pragma: no cover
commands = draw(range_intagers_st)
provides = draw(
st.lists(
st.lists(range_intagers_st, max_size=10),
min_size = commands,
max_size = commands
),
)
is_func = draw(
st.lists(
st.booleans(),
min_size = commands,
max_size = commands
)
)
provides_set = set()
for command in provides:
provides_set.update(command)
requires = []
if provides_set:
for command in provides:
if command:
max_prov = max(command)
else:
max_prov = 0
if filter_:
provides_filter = [x for x in provides_set if x > max_prov]
else:
provides_filter = provides_set
if provides_filter:
sample = st.sampled_from(provides_filter)
requires.append(draw(st.lists(sample, max_size=10)))
else:
requires.append([])
else:
requires = [[]] * commands
return (provides, requires, is_func)
def get_request(data, spec, spec_host):
endpoint_path = data.draw(st.sampled_from(spec['paths'].keys()))
endpoint = spec['paths'][endpoint_path]
method_name = data.draw(st.sampled_from(endpoint.keys()))
endpoint = endpoint[method_name]
path_params = _get_filtered_parameter(endpoint, 'path', spec)
path_args = data.draw(st.fixed_dictionaries(path_params))
query_params = _get_filtered_parameter(endpoint, 'query', spec)
query_args = data.draw(st.fixed_dictionaries(query_params))
body_params = _get_filtered_parameter(endpoint, 'body', spec)
if body_params:
body_args = data.draw(body_params['body'])
else:
body_args = None
valid_request_body_format = get_item_path_acceptable_format(endpoint, spec)
request_data = None
request_headers = {}
if body_args:
# no_body_format_declaration(body_args, valid_request_body_format, endpoint)
if body_args and valid_request_body_format is None:
# Force a request format, swagger ui seems to force json format
valid_request_body_format = ["application/json"]
request_body_format = data.draw(st.sampled_from(valid_request_body_format), 'request_body_format')
request_headers['Content-Type'] = request_body_format
if request_body_format == 'application/x-www-form-urlencoded':
request_data = body_args
elif request_body_format == 'application/json':
request_data = json.dumps(body_args, cls=CustomJsonEncoder)
elif request_body_format == 'application/xml':
pass
# TODO Implement XML
else:
raise Exception(request_body_format)
endpoint_url = endpoint_path.format(**path_args)
assume('\x00' not in endpoint_url)
# Generate request
URL = furl(spec_host)
URL = URL.join(endpoint_url.lstrip('/'))
if query_args:
URL = URL.add(args=query_args)
request = Request(method_name, URL.url, data=request_data,
headers=request_headers).prepare()
request.build_context = locals()
return request
def applications():
"""Mock of the YARN cluster apps REST resource."""
if 'last' in request.args:
return jsonify(redis.get(request.base_url))
d = st.fixed_dictionaries({
'allocatedMB': st.integers(-1),
'allocatedVCores': st.integers(-1),
'amContainerLogs': st.text(),
'amHostHttpAddress': st.text(),
'applicationTags': st.text(),
'applicationType': st.sampled_from(['MAPREDUCE', 'SPARK']),
'clusterId': st.integers(0),
'diagnostics': st.text(),
'elapsedTime': st.integers(0),
'finalStatus': st.sampled_from(['UNDEFINED', 'SUCCEEDED', 'FAILED', 'KILLED']),
'finishedTime': st.integers(0),
'id': st.text(string.ascii_letters, min_size=5, max_size=25),
'memorySeconds': st.integers(0),
'name': st.text(min_size=5),
'numAMContainerPreempted': st.integers(0),
'numNonAMContainerPreempted': st.integers(0),
'preemptedResourceMB': st.integers(0),
'preemptedResourceVCores': st.integers(0),
'progress': st.floats(0, 100),
'queue': st.text(),
'runningContainers': st.integers(-1),
'startedTime': st.integers(0),
'state': st.sampled_from(['NEW', 'NEW_SAVING', 'SUBMITTED', 'ACCEPTED', 'RUNNING', 'FINISHED', 'FAILED', 'KILLED']),
'trackingUI': st.text(),
'trackingUrl': st.just(os.environ['YARN_ENDPOINT']),
'user': st.text(),
'vcoreSeconds': st.integers(0)
})
result = json.dumps({
'apps': {
'app': st.lists(d, min_size=4, average_size=10).example()
}
})
redis.set(request.base_url, result)
return jsonify(result)
def mapreduce_application():
"""Mock of the mapreduce jobs REST resource."""
if 'last' in request.args:
return jsonify(redis.get(request.base_url))
d = st.fixed_dictionaries({
'startTime': st.integers(0),
'finishTime': st.integers(0),
'elapsedTime': st.integers(0),
'id': st.integers(0),
'name': st.text(),
'user': st.text(),
'state': st.sampled_from(['NEW', 'SUCCEEDED', 'RUNNING', 'FAILED', 'KILLED']),
'mapsTotal': st.integers(0),
'mapsCompleted': st.integers(0),
'reducesTotal': st.integers(0),
'reducesCompleted': st.integers(0),
'mapProgress': st.floats(0, 100),
'reduceProgress': st.floats(0, 100),
'mapsPending': st.integers(0),
'mapsRunning': st.integers(0),
'reducesPending': st.integers(0),
'reducesRunning': st.integers(0),
'uberized': st.booleans(),
'diagnostics': st.text(),
'newReduceAttempts': st.integers(0),
'runningReduceAttempts': st.integers(0),
'failedReduceAttempts': st.integers(0),
'killedReduceAttempts': st.integers(0),
'successfulReduceAttempts': st.integers(0),
'newMapAttempts': st.integers(0),
'runningMapAttempts': st.integers(0),
'failedMapAttempts': st.integers(0),
'killedMapAttempts': st.integers(0),
'successfulMapAttempts': st.integers(0)
})
result = json.dumps({
'jobs': {
'job': st.lists(d, average_size=3).example()
}
})
redis.set(request.base_url, result)
return jsonify(result)