def combinable_paths_no_loops(draw):
"""Makes varying-length paths, but no loops."""
path_points = draw(lists(ipoints, min_size=2, max_size=200, unique_by=tuple))
rand = draw(randoms())
paths = [[]]
length = rand.randint(2, 4)
for pt in path_points:
paths[-1].append(pt)
length -= 1
if length == 0:
paths.append([])
length = rand.randint(2, 4)
joinable = (rand.random() > .5)
if joinable:
paths[-1].append(pt)
if len(paths[-1]) < 2:
paths.pop()
rand.shuffle(paths)
return [Path(p) for p in paths]
python类lists()的实例源码
def test_structure_simple_from_dict_default(converter, cl_and_vals, data):
"""Test structuring non-nested attrs classes with default value."""
cl, vals = cl_and_vals
obj = cl(*vals)
attrs_with_defaults = [a for a in fields(cl)
if a.default is not NOTHING]
to_remove = data.draw(lists(elements=sampled_from(attrs_with_defaults),
unique=True))
for a in to_remove:
if isinstance(a.default, Factory):
setattr(obj, a.name, a.default.factory())
else:
setattr(obj, a.name, a.default)
dumped = asdict(obj)
for a in to_remove:
del dumped[a.name]
assert obj == converter.structure(dumped, cl)
def spark_application(app_id):
"""Mock of the Spark jobs REST resource."""
if 'last' in request.args:
return jsonify(redis.get(request.base_url))
d = st.fixed_dictionaries({
'jobId': st.integers(0),
'name': st.text(),
'submissionTime': st.text(),
'completionTime': st.text(),
'stageIds': st.lists(st.integers(0), average_size=3),
'status': st.sampled_from(['SUCCEEDED', 'RUNNING', 'FAILED']),
'numTasks': st.integers(0),
'numActiveTasks': st.integers(0),
'numCompletedTasks': st.integers(0),
'numSkippedTasks': st.integers(0),
'numFailedTasks': st.integers(0),
'numActiveStages': st.integers(0),
'numCompletedStages': st.integers(0),
'numSkippedStages': st.integers(0),
'numFailedStages': st.integers(0),
})
result = json.dumps(st.lists(d, average_size=3).example())
redis.set(request.base_url, result)
return jsonify(result)
def pattern_to_statements(pattern):
if isinstance(pattern, template):
return lists(just(pattern), min_size=1, max_size=1)
rule, value = pattern
if rule == 'sequence':
return tuples(*map(pattern_to_statements, value)).map(unpack_list).map(list)
elif rule == 'alternates':
return one_of(*map(pattern_to_statements, value))
elif rule == 'zeroOrMore':
return lists(pattern_to_statements(value)).map(unpack_list).map(list)
elif rule == 'oneOrMore':
return lists(pattern_to_statements(value), min_size=1).map(unpack_list).map(list)
elif rule == 'optional':
return lists(pattern_to_statements(value), min_size=0, max_size=1).map(unpack_list).map(list)
else:
raise Exception("impossible!", rule)
# this replicates the current scorm pattern, a realistic example of medium
# complexity. Note it has repeated elements, just not in ambiguous ways.
def arguments_node(draw, annotated=False):
n = draw(hs.integers(min_value=1, max_value=5))
args = draw(hs.lists(name_node(None), min_size=n, max_size=n))
if annotated:
annotations = draw(hs.lists(name_node(annotation), min_size=n, max_size=n))
else:
annotations = None
node = astroid.Arguments()
node.postinit(
args,
None,
None,
None,
annotations
)
return node
def gen_data_from_z(z):
"""
Given the z axis values (2d list of failures) generate data that would have
given us this z value.
:param z: a list of lists of failures.
:return: the data that would have given this z value, the z value, the x value
(stages) and the y value (projects).
"""
data = []
projects_len = len(z)
stages_len = 0 if projects_len == 0 else len(z[0])
projects = [uuid4() for _ in xrange(projects_len)]
stages = [uuid4() for _ in xrange(stages_len)]
for pidx, project in enumerate(projects):
for sidx, stage in enumerate(stages):
failures = z[pidx][sidx]
repo = strings.example()
for _ in xrange(failures):
data.append(MockPipelineRun(stage_failed=stage, project=project, repository=repo))
return data, z, stages, projects
def build_failure_map(stages, projects, z):
"""
Build a dict mapping the key "projects + stages" to the number
of failures.
:param stages: list of stages.
:param projects: list of projects.
:param z: a list of lists of failures.
:return: the failures map.
"""
failure_lookup = dict()
for stages_index, failures_list in enumerate(z):
for failures_list_index, failures in enumerate(failures_list):
project = projects[stages_index]
stage = stages[failures_list_index]
failure_lookup[str(project) + str(stage)] = failures
return failure_lookup
def two_band_eo_dataset(draw):
crs, height, width, times = draw(dataset_shape())
coordinates = {dim: np.arange(size) for dim, size in zip(crs.dimensions, (height, width))}
coordinates['time'] = times
dimensions = ('time',) + crs.dimensions
shape = (len(times), height, width)
arr = np.random.random_sample(size=shape)
data1 = xr.DataArray(arr,
dims=dimensions,
coords=coordinates,
attrs={'crs': crs})
arr = np.random.random_sample(size=shape)
data2 = xr.DataArray(arr,
dims=dimensions,
coords=coordinates,
attrs={'crs': crs})
name1, name2 = draw(st.lists(variable_name, min_size=2, max_size=2, unique=True))
dataset = xr.Dataset(data_vars={name1: data1, name2: data2},
attrs={'crs': crs})
return dataset
def combinable_paths_maybe_loops(draw):
"""Makes single-segment paths, with loops a possibility."""
endpoints = draw(lists(ipoints, min_size=2, max_size=200, unique_by=tuple))
rand = draw(randoms())
paths = set()
point_use = collections.defaultdict(int)
target_number = len(endpoints) / 3
if target_number < 1:
target_number = 1
while len(paths) < target_number:
# Choose two points at random from the possible endpoints, and make a
# segment.
a, b = rand.sample(endpoints, k=2)
if (a, b) in paths:
continue
paths.add((a, b))
# Track how many times the points have been used.
point_use[a] += 1
point_use[b] += 1
# Any point in two segments is no longer a candidate as an endpoint.
if point_use[a] == 2:
endpoints.remove(a)
if point_use[b] == 2:
endpoints.remove(b)
return [Path(p) for p in paths]
def random_prefix(draw):
#limited to unicode letters, see
#https://en.wikipedia.org/wiki/Unicode_character_property#General_Category
categories = ['Ll', 'Lt', 'Lm', 'Lo']
characters = st.lists(st.characters(whitelist_categories=categories), min_size = 1)
prefix = st.text(alphabet = draw(characters), min_size = 1)
return draw(prefix)
def totally_random_references(draw):
'''generates random sorted lists of references like ['IASDHAH1', 'ZKJDJAD1569', ...]'''
parts = draw(st.lists(random_reference()))
parts.sort()
return list(map(toRef, parts))
def random_references(draw):
'''generates random sorted lists of references with the same prefix like ['IASDHAH1', 'IASDHAH1569', ...]'''
prefix = st.just(draw(random_prefix()))
parts = draw(st.lists(random_reference(prefix = prefix)))
parts.sort()
return list(map(toRef, parts))
def add_bools(list_of_lists):
"""
Given recursive list that can contain other lists, return tuple of that plus
a booleans strategy for each list.
"""
l = []
def count(recursive):
l.append(1)
for child in recursive:
if isinstance(child, list):
count(child)
count(list_of_lists)
return st.tuples(st.just(list_of_lists), st.tuples(*[st.sampled_from([True, False]) for i in l]))
def steps(self):
result = add_strategy | replace_strategy
# Replace or add to a known service cluster:
if self.fake.services:
result |= st.tuples(st.just("replace"),
st.tuples(st.sampled_from(list(self.fake.services.keys())),
st.lists(nice_strings)))
result |= st.tuples(st.just("add"),
st.tuples(st.sampled_from(list(self.fake.services.keys())),
nice_strings))
# Remove a known address from known cluster:
if not self.fake.is_empty():
result |= self.remove_strategy()
return result
def default(self, obj):
"""
"""
if isinstance(obj, dict) and _is_swagger_parameter(obj):
parameter_type = obj.get('format', obj.get('type'))
parameter_schema = obj.get('schema')
parameter_ref = obj.get('$ref')
if parameter_type in SWAGGER_FORMAT_MAPPING:
return SWAGGER_FORMAT_MAPPING[parameter_type]
elif parameter_ref:
return self.transform(self.get_ref(parameter_ref, self.spec))
elif parameter_type == 'array':
if obj['items'].get('enum'):
return st.lists(elements=st.sampled_from(obj['items']['enum']))
elif obj['items'].get('type'):
return st.lists(elements=SWAGGER_FORMAT_MAPPING[obj['items']['type']])
elif obj['items'].get('$ref'):
schema = self.get_ref(obj['items']['$ref'], self.spec)
return st.lists(elements=self.transform(schema))
raise Exception('array', obj)
elif parameter_type == 'object':
properties = {}
for property_name, property_ in obj['properties'].items():
properties[property_name] = self.transform(property_)
return st.fixed_dictionaries(properties)
elif parameter_schema:
if parameter_schema.get('type') == 'array':
schema = self.get_ref(parameter_schema['items']['$ref'], self.spec)
return st.lists(elements=self.transform(schema))
else:
schema = self.get_ref(parameter_schema['$ref'], self.spec)
transformed = self.transform(schema)
return transformed
else:
raise Exception("Invalid", obj, parameter_type)
return obj
def lists_of_primitives(draw):
"""Generate a strategy that yields tuples of list of primitives and types.
For example, a sample value might be ([1,2], List[int]).
"""
prim_strat, t = draw(primitive_strategies)
list_t = draw(list_types.map(lambda list_t: list_t[t]) | list_types)
return draw(st.lists(prim_strat)), list_t
def lists_of_attrs(defaults=None):
# Python functions support up to 255 arguments.
return (st.lists(simple_attrs(defaults), average_size=5, max_size=20)
.map(lambda l: sorted(l,
key=lambda t: t[0]._default is not NOTHING)))
def _device_list(minimum):
"""
Get a device generating strategy.
:param int minimum: the minimum number of devices, must be at least 0
"""
return strategies.lists(
strategies.text(
alphabet=string.ascii_letters + "/",
min_size=1
),
min_size=minimum
)
def _device_list(minimum):
"""
Get a device generating strategy.
:param int minimum: the minimum number of devices, must be at least 0
"""
return strategies.lists(
strategies.text(
alphabet=string.ascii_letters + "/",
min_size=1
),
min_size=minimum
)
def service_lists(
draw,
min_size=None, average_size=None, max_size=None,
unique_by=None, unique=False
) -> _ServiceList:
service_generator = lists(
services(), min_size=min_size, average_size=average_size,
max_size=max_size, unique_by=unique_by, unique=unique
)
return _ServiceList(draw(service_generator))
def job_lists(
draw, min_size=None, max_size=None, average_size=None, jobs=jobs()
) -> JobListInterface:
job_list_maker = lists(jobs, min_size=min_size, max_size=max_size,
average_size=average_size)
return JobList(draw(job_list_maker))
def user_actions(draw, skip=None, **lists_kwargs):
if skip is None:
skip = []
return draw(lists(sampled_from(k for k in USER_ACTIONS if k not in skip), **lists_kwargs))
def _device_list(minimum):
"""
Get a device generating strategy.
:param int minimum: the minimum number of devices, must be at least 0
"""
return strategies.lists(
strategies.text(
alphabet=string.ascii_letters + "/",
min_size=1
),
min_size=minimum
)
def test_continious_bools_can_be_packed_unpacked(data):
bools = data.draw(hs.lists(strategies.bools))
stream = io.BytesIO()
at.Bool.many_to_bytestream(bools, stream)
stream.seek(0)
new = at.Bool.many_from_bytestream(stream, len(bools))
assert bools == new
def file(draw, filename=st.lists(
st.characters(blacklist_categories=('Cc',),
blacklist_characters=('\0\n\r/\\|><')),
min_size=1, average_size=20, max_size=80),
content=st.binary(max_size=10000, average_size=100)):
return {'filename': ''.join(draw(filename)),
'content': draw(content)}
def files(draw):
# TODO: use st.recursive to generate files in folders
return draw(st.lists(valid_file, average_size=5, max_size=MAX_FILES)\
.filter(has_no_duplicate))
#valid_vault = vault().filter(has_no_duplicate)
def create_dummy_rate_file(rate_file):
rates = lists(floats(min_value=0.00001, allow_nan=False, allow_infinity=False), min_size=0, max_size=100).example()
max_year = datetime.datetime.now().year
date_times = lists(datetimes(min_year=2016, max_year=max_year), min_size=len(rates),
max_size=len(rates)).map(sorted).example()
with open(rate_file, 'a') as f:
for date_time, rate in zip(date_times, rates):
writer = csv.writer(f, lineterminator='\n')
market_data = [date_time.strftime("%Y-%m-%d %H:%M:%S"), rate]
writer.writerow(market_data)
return rates, date_times
def homogeneous_list(**kwargs):
"""Return a strategy which generates a list of uniform type."""
return primitive_types.flatmap(lambda s: hs.lists(s(), **kwargs))
def random_list(**kwargs):
"""Return a strategy which generates a random list."""
return hs.lists(primitive_values, **kwargs)
def boolop_node(draw, value=None, op=binary_bool_operator, **kwargs):
value = value or const_node()
node = astroid.BoolOp(draw(op))
if kwargs.get('min_size', 0) < 2:
kwargs['min_size'] = 2
node.postinit(draw(hs.lists(value, **kwargs)))
return node