def modify_samples(samples, random_data, add_snitch, random_labels):
minval = 1
maxval = cfg.vocabulary_size - 1
pb = Progress_bar(len(samples)-1)
for sample in samples:
int_vector = sample['int_vector']
sample_class = sample['sample_class']
if random_data:
int_vector = [rt(minval, maxval) for _ in range(cfg.max_sequence)]
if add_snitch:
int_vector.extend([cfg.vocabulary_size-1])
if random_labels:
sample_class = random.randint(1,2)
sample['int_vector'] = int_vector
sample['sample_class'] = sample_class
pb.tick()
python类triangular()的实例源码
def _get_waiting_in_secs(waiting_in_secs,
num_retries,
max_waiting_in_secs):
"""Retrieve the waiting time in seconds.
This method uses exponential back-off in figuring out the number of
seconds to wait; however, the max wait time shouldn't be more than
what is specified via max_waiting_in_seconds.
Args:
waiting_in_secs: waiting time in seconds.
num_retries: number of retries, starting from 0.
max_waiting_in_secs: maximum waiting time in seconds.
Returns:
The number of seconds to wait.
"""
# make the backoff going up even faster
waiting_in_secs *= 2**num_retries
jitter = waiting_in_secs * 0.2
waiting_in_secs += random.triangular(-jitter, jitter)
return min(waiting_in_secs, max_waiting_in_secs)
def create_data(config={}):
"""Create data and write to a JSON file."""
max_weight = config.setdefault("max_weight", 15)
items = []
if "num_items" in config:
num_items = config["num_items"]
del config["num_items"]
else:
num_items = 32
# Generate items
digits = int(math.ceil(math.log(num_items, 16)))
fmt = "%0" + str(digits) + "X"
for i in range(0, num_items):
name = fmt % (i + 1)
weight = random.triangular(1.0, max_weight // 3, max_weight)
value = random.random() * 100
items.append({"name": name, "weight": weight, "value": value})
config["items"] = items
configuration.write_file(config)
def single_point(parent1, parent2, locus=None):
"""Return a new chromosome created with single-point crossover.
This is suitable for use with list or value encoding, and will work with
chromosomes of heterogenous lengths.
Args:
parent1 (List): A parent chromosome.
parent2 (List): A parent chromosome.
locus (int): The locus at which to crossover or ``None`` for a randomly
selected locus.
Returns:
List[List]: Two new chromosomes descended from the given parents.
"""
if len(parent1) > len(parent2):
parent1, parent2 = parent2, parent1
if locus is None:
locus = int(random.triangular(1, len(parent1) / 2, len(parent1) - 2))
child1 = parent1[0:locus] + parent2[locus:]
child2 = parent2[0:locus] + parent1[locus:]
return [child1, child2]
def cut_and_splice(parent1, parent2, loci=None):
"""Return a new chromosome created with cut and splice crossover.
This is suitable for use with list or value encoding, and will work with
chromosomes of heterogeneous lengths.
Args:
parent1 (List): A parent chromosome.
parent2 (List): A parent chromosome.
loci (Tuple[int, int]): A crossover locus for each parent.
Returns:
List[List]: Two new chromosomes descended from the given parents.
"""
if loci is None:
loci = []
loci[0] = int(random.triangular(1, len(parent1) / 2, len(parent1) - 2))
loci[1] = int(random.triangular(1, len(parent2) / 2, len(parent2) - 2))
child1 = parent1[0:loci[0]] + parent2[loci[0]:]
child2 = parent2[0:loci[1]] + parent1[loci[1]:]
return [child1, child2]
def __init__(self):
self.generate()
self.ConsumptionYearly = profilegentools.gaussMinMax(3360,600)*config.consumptionFactor #kWh http://www.nibud.nl/uitgaven/huishouden/gas-elektriciteit-en-water.html
age = random.triangular(65, 85, 70)
self.Persons = [ persons.PersonRetired(age), persons.PersonRetired(age)]
if(random.randint(1,2) == 1):
self.Fridges = [ devices.DeviceFridge(random.randint(config.ConsumptionFridgeBigMin,config.ConsumptionFridgeBigMax)) ]
else:
self.Fridges = [ devices.DeviceFridge(random.randint(config.ConsumptionFridgeSmallMin,config.ConsumptionFridgeSmallMax)), devices.DeviceFridge(random.randint(config.ConsumptionFridgeSmallMin,config.ConsumptionFridgeSmallMax)) ]
self.hasDishwasher = random.randint(0,5) < 3 #40%
#Determine washing days
self.generateWashingdays(random.randint(3,4))
#Dermine Dishwasher times
if self.hasDishwasher:
self.generateDishwashdays(3)
def __init__(self):
self.generate()
self.ConsumptionYearly = profilegentools.gaussMinMax(2010,400)*config.consumptionFactor #kWh http://www.nibud.nl/uitgaven/huishouden/gas-elektriciteit-en-water.html
age = random.triangular(65, 85, 70)
self.Persons = [ persons.PersonRetired(age)]
if(random.randint(1,2) == 1):
self.Fridges = [ devices.DeviceFridge(random.randint(config.ConsumptionFridgeBigMin,config.ConsumptionFridgeBigMax)) ]
else:
self.Fridges = [ devices.DeviceFridge(random.randint(config.ConsumptionFridgeSmallMin,config.ConsumptionFridgeSmallMax)), devices.DeviceFridge(random.randint(config.ConsumptionFridgeSmallMin,config.ConsumptionFridgeSmallMax)) ]
self.hasDishwasher = random.randint(0,5) < 3 #40%
#Determine washing days
self.generateWashingdays(random.randint(2, 3))
#Dermine Dishwasher times
if self.hasDishwasher:
self.generateDishwashdays(3)
def makedata():
'''Generates the necessary "data"'''
coord = (random.triangular((-90),90,0), random.triangular((-180),180,0))
names = ['J.B.', 'E.A.', 'K.M.', 'D.M', 'W.R.']
actions = ['EAD', 'TAR', 'OAE', 'AD']
name = random.choice(names)
action = random.choice(actions)
pcoord = "{:6f}\t{:6f}\n{} :: {}".format(coord[0], coord[1], name, action)
return pcoord
def generate_circle(image_width, image_height, min_diameter, max_diameter):
radius = random.triangular(min_diameter, max_diameter,
max_diameter * 0.8 + min_diameter * 0.2) / 2
angle = random.uniform(0, math.pi * 2)
distance_from_center = random.uniform(0, image_width * 0.48 - radius)
x = image_width * 0.5 + math.cos(angle) * distance_from_center
y = image_height * 0.5 + math.sin(angle) * distance_from_center
return x, y, radius
def generate(self, minval, maxval, steps):
vals = [round(rt(minval, maxval), 5) for _ in range(steps)]
for val in vals:
yield val
def __init__(self, host_provider, expire_time=600, retry_time=60,
invalidation_threshold=0.2):
"""Initialize the host selection.
Args:
host_provider: A ``HostProvider``, used to get the current list
of live hosts.
expire_time: An integer, expire time in seconds.
retry_time: An integer, retry time in seconds.
invalidation_threshold: A float, when the number of entries
being invalidated divided by the number of all valid hosts
is above this threshold, we stop accepting invalidation
requests. We do this to stay on the conservative side to
avoid invalidating hosts too fast to starve requests.
"""
assert host_provider.initialized
# Current host.
self._current = None
# Last host, works even when current host invalided.
self._last = None
# Time when we selected the current host.
self._select_time = None
# Adjust expire time by +/- 10%, but 0 is special for testing purpose.
self._expire_time = expire_time
if expire_time:
self._expire_time = expire_time + int(
random.triangular(-expire_time * 0.1, expire_time * 0.1))
self._retry_time = retry_time
self._invalidation_threshold = invalidation_threshold
# Host name -> time when marked bad.
self._bad_hosts = {}
self._host_provider = host_provider
def single_point_bin(parent1, parent2, length=None, locus=None):
"""Return a new chromosome through a single-point crossover.
This is suitable for use with binary encoding.
Args:
parent1 (List): A parent chromosome.
parent2 (List): A parent chromosome.
locus (int): The crossover point or ``None`` to choose one at random.
If ``None``, then ``length`` must be the number of bits used in the
parent chromosomes.
length(int): The number of bits used. Not used if a locus is provided.
Returns:
List[int]: Two new chromosomes descended from the given parents.
Raises:
ValueError: if neither ``locus`` or ``length`` is specified.
"""
if locus is None and length is None:
raise ValueError("Either the length or a locus is required.")
if locus is None:
locus = int(random.triangular(1, length / 2, length - 2))
if length is None:
length = 2 * locus
maskr = 2 ** locus - 1
maskl = (2 ** length - 1) & ~maskr
child1 = parent1 & maskl | parent2 & maskr
child2 = parent2 & maskl | parent1 & maskr
return [child1, child2]
fake_hsm_coordinator.py 文件源码
项目:intel-manager-for-lustre
作者: intel-hpdd
项目源码
文件源码
阅读 27
收藏 0
点赞 0
评论 0
def get_random_size(self):
MIN_FILESIZE = 1024
MAX_FILESIZE = 2 ** 41 # 2TB
MODE = MAX_FILESIZE * .025 # cluster around 51GB
return int(random.triangular(MIN_FILESIZE, MAX_FILESIZE, MODE))
def update(self):
if self._start is None:
self._start = time.perf_counter()
if self.is_finished():
return
self.distance += random.triangular(0, 10, 3)
if self.is_finished() and self._end is None:
self._end = time.perf_counter()
def draw_vertical_lines(draw, boxes, doc_bounding_box, line_width):
line_weight_factor = random.triangular(0.005, 1.2)
current_x = doc_bounding_box[0] - line_width / 2
color = get_color()
while current_x < doc_bounding_box[2]:
start_x = current_x
start_y = doc_bounding_box[1] - line_width / 2
end_x = start_x
end_y = doc_bounding_box[3] - line_width / 2
bx0 = start_x
bx1 = start_x + line_width
select_boxes = []
for box in boxes:
wx0 = box.position[0][0] - BOUND_PADDING
wx1 = box.position[1][0] + BOUND_PADDING
if bx0 < wx0 and wx1 < bx1 or \
wx0 < bx1 and bx1 < wx1 or \
wx0 < bx0 and bx0 < wx1:
select_boxes.append(box)
if select_boxes:
y0 = start_y
y1 = end_y
for box in select_boxes:
y1 = box.position[0][1] - BOX_PADDING
draw_line(draw, [start_x, y0, end_x, y1], line_width=line_width, color=color,
line_weight_factor=line_weight_factor, dir='v')
y0 = box.position[1][1] + BOX_PADDING
draw_line(draw, [start_x, y0, end_x, end_y], line_width=line_width, color=color,
line_weight_factor=line_weight_factor, dir='v')
else:
draw_line(draw, [start_x, start_y, end_x, end_y], line_width=line_width, color=color,
line_weight_factor=line_weight_factor, dir='v')
current_x = start_x + line_width
def get_color():
if random.randint(0, 100) == 0:
color = (179, 27, 27)
else:
color = (int(random.triangular(0, 10, 1)),
int(random.triangular(0, 10, 1)),
int(random.triangular(0, 10, 1)),
)
return color
def draw_horizontal_lines(draw, boxes, doc_bounding_box, line_width):
"""Draw black horizontal lines across the page _except_ for that word"""
line_weight_factor = random.triangular(0.005, 1.2)
color = get_color()
start_x = doc_bounding_box[0]
current_y = doc_bounding_box[1]
end_x = doc_bounding_box[2]
end_y = doc_bounding_box[3] - line_width / 2
while current_y < doc_bounding_box[3]:
by0 = current_y
by1 = current_y + line_width
select_boxes = []
for box in boxes:
wy0 = box.position[0][1]
wy1 = box.position[1][1]
if by0 <= wy0 and wy1 <= by1 or \
wy0 <= by1 and by1 <= wy1 or \
wy0 <= by0 and by0 <= wy1:
select_boxes.append(box)
if select_boxes:
x0 = start_x
x1 = end_x
for box in select_boxes:
x1 = box.position[0][0] - BOX_PADDING
draw_line(draw, [x0, current_y, x1, current_y],
line_width=line_width,
line_weight_factor=line_weight_factor, color=color,
dir="h")
x0 = box.position[1][0] + BOX_PADDING
draw_line(draw, [x0 + BOX_PADDING, current_y, end_x, current_y],
line_width=line_width, line_weight_factor=line_weight_factor, dir="h", color=color)
else:
draw_line(draw, [start_x, current_y, end_x, current_y],
line_width=line_width, color=color,
line_weight_factor=line_weight_factor,
dir="h")
current_y = by1
def draw_line(draw, pos, line_width, dir="h", color=(0, 0, 0), line_weight_factor=1):
# Draw a fuzzy line of randomish width repeat times
repeat = random.randint(10, 20)
width = int(line_width) * line_weight_factor
default_padding = line_width / 3
margin_extent = 20 # random.randint(1, 20)
# Slide the center of the line down width/2 based on dir
if dir == 'h':
pos[1] += width / 2
pos[3] += width / 2
# Introduce some randomness into the margins
pos[0] -= random.triangular(width / margin_extent, width * margin_extent)
pos[2] += random.triangular(width / margin_extent, width * margin_extent)
else:
pos[0] -= width / 2
pos[2] -= width / 2
# Introduce some randomness into the margins
pos[1] -= random.triangular(width / margin_extent, width * margin_extent)
pos[3] += random.triangular(width / margin_extent, width * margin_extent)
for i in range(0, repeat):
width = int(random.uniform(line_width - default_padding, line_width))
padding = default_padding * 4
pos[0] = random.triangular(pos[0] - padding, pos[0] + padding)
pos[1] = random.triangular(pos[1] - padding, pos[1] + padding)
pos[2] = random.triangular(pos[2] - padding, pos[2] + padding)
pos[3] = random.triangular(pos[3] - padding, pos[3] + padding)
opacity = 240 + i
width_factor = random.triangular(1, 10, 1)
draw.line(pos, width=int(width / width_factor), fill=(*color, opacity))
def one(o):
return o.final(random.triangular(o.lo, o.hi, o.mode))
def new_time_bounce(self):
left_bound = timeutils.current_unix_timestamp() + BOUNCE_TIMEOUT
right_bound = left_bound + self.bounced * BOUNCE_TIMEOUT
bounce_time = random.triangular(left_bound, right_bound)
bounce_time = int(bounce_time)
return bounce_time
def initTarget():
return GRAF_LENGTH + int(math.floor(random.triangular(0-GRAF_RANGE, GRAF_RANGE)))
def random(low,up = None):
if type(low) not in [int,float]:
return low[randint(0,len(low)-1)]
return triangular(low,up)
def setUp(self):
self.config = {
"max_population": 10,
"tree_generation": {
"method": "FULL_METHOD",
"initial_max_depth": 3
},
"selection": {
"method": "ROULETTE_SELECTION"
},
"function_nodes": [
{"type": "FUNCTION", "name": "ADD", "arity": 2},
{"type": "FUNCTION", "name": "SUB", "arity": 2},
{"type": "FUNCTION", "name": "MUL", "arity": 2},
{"type": "FUNCTION", "name": "DIV", "arity": 2},
{"type": "FUNCTION", "name": "COS", "arity": 1},
{"type": "FUNCTION", "name": "SIN", "arity": 1},
{"type": "FUNCTION", "name": "RAD", "arity": 1}
],
"terminal_nodes": [
{"type": "CONSTANT", "value": 1.0},
{"type": "CONSTANT", "value": 2.0},
{"type": "CONSTANT", "value": 2.0},
{"type": "CONSTANT", "value": 3.0},
{"type": "CONSTANT", "value": 4.0},
{"type": "CONSTANT", "value": 5.0},
{"type": "CONSTANT", "value": 6.0},
{"type": "CONSTANT", "value": 7.0},
{"type": "CONSTANT", "value": 8.0},
{"type": "CONSTANT", "value": 9.0},
{"type": "CONSTANT", "value": 10.0}
],
"input_variables": [
{"type": "INPUT", "name": "x"}
]
}
self.functions = FunctionRegistry("SYMBOLIC_REGRESSION")
self.generator = TreeGenerator(self.config)
self.selection = Selection(self.config)
self.population = self.generator.init()
# give population random scores
for inidividual in self.population.individuals:
inidividual.score = random.triangular(1, 100)
def _execute(self, sources, alignment_stream, interval):
if alignment_stream is None:
raise ToolExecutionError("Alignment stream expected")
for ti, _ in alignment_stream.window(interval, force_calculation=True):
yield StreamInstance(ti, random.triangular(low=self.low, high=self.high, mode=self.mode))
def render(self, chromosome, filepath):
"""Render a chromosome to an SVG file."""
import svgwrite
margin = 100
unit = 200
radius = 50
pad = 10
width = (self.width + 1) * unit + margin * 2
height = (self.height + 1) * unit + margin * 2
doc = svgwrite.Drawing(filename=filepath, size=(width, height))
# Color theme to match the talk...
colors = ["#ff9999", "#9999ff", "#99ff99", "#ffffff"]
# Fill colors at random
def channel():
return int(random.triangular(0, 255, 175))
while len(colors) < len(self.roles):
colors.append("#%02x%02x%02x" % (channel(), channel(), channel()))
# Map row, col to pixels
def origin(row, col):
x = row * unit + margin
y = col * unit + margin
return (x, y)
def color_of_group(group):
idx = self.roles.index(group)
return colors[idx]
def color_of_person(person_id):
group = self.people[person_id][1]
return color_of_group(group)
# Render seating assignments
for seat, person in enumerate(chromosome):
row, col = self.map.point_at(seat)
x, y = origin(row, col)
x, y = (x + radius, y + radius)
doc.add(doc.circle(
center=(x, y),
r=radius,
stroke_width=8,
stroke="#000",
fill=color_of_person(person)
))
doc.save()
# pylint: disable=too-many-locals
def generate(random, timestamp):
projects = range(1, PROJECT_POOL_INITIAL_SIZE + 1)
session_sequences = defaultdict(lambda: itertools.count(1))
sessions = defaultdict(lambda: itertools.count(0))
# Initialize the session pool.
for project in projects:
for session in range(1, SESSION_POOL_INITIAL_SIZE + 1):
sid = next(session_sequences[project])
sessions[(project, sid)] # touch
logger.debug('Initialized session pool, %s sessions '
'currently active.', len(sessions))
def create_session():
project = random.choice(projects)
sid = next(session_sequences[project])
key = (project, sid)
sessions[key] # touch
logger.debug('Created session %r, %s sessions currently '
'active.', key, len(sessions))
return key
while True:
if random.random() <= TICK_PROBABILITY:
timestamp = timestamp + TICK_DURATION
if random.random() <= SESSION_CREATION_PROBABILITY:
key = create_session()
else:
try:
key = random.sample(sessions.keys(), 1)[0]
except ValueError:
key = create_session()
project, sid = key
# TODO: Skip or generate out-of-order operation IDs.
oid = next(sessions[key])
ty = 'op'
if random.random() <= SESSION_FALSE_CLOSE_PROBABILITY:
ty = 'cl'
if random.random() <= SESSION_CLOSE_PROBABILITY:
del sessions[key]
logger.debug('Deleted session %r, %s sessions currently '
'active.', key, len(sessions))
ty = 'cl'
elif random.random() <= SESSION_DROP_PROBABILITY:
del sessions[key]
logger.debug('Dropped session %r, %s sessions currently '
'active.', key, len(sessions))
yield timestamp, project, {
'sid': sid,
'oid': oid,
'ts': timestamp + random.triangular(-5 * 60, 10 * 60, 0),
'ty': ty,
}