def update_p95(self, value):
    """Record ``value`` in the bounded sample reservoir.

    The reservoir list is created lazily on first use. While it holds fewer
    than ``self.RESERVOIR_SIZE`` entries, ``value`` is appended; once it is
    full, ``value`` overwrites one uniformly chosen existing slot.
    ``self.num_samples`` is incremented on every call.
    """
    # Identity test instead of ``== None``: equality would dispatch to a
    # custom ``__eq__`` if the reservoir were ever a non-list container.
    if self.reservoir is None:
        self.reservoir = []
    if len(self.reservoir) < self.RESERVOIR_SIZE:
        self.reservoir.append(value)
    else:
        # NOTE(review): every sample past capacity replaces a slot, which
        # weights the reservoir toward recent values. Classic reservoir
        # sampling would replace with probability RESERVOIR_SIZE/num_samples
        # -- confirm which behaviour is intended.
        self.reservoir[random.randint(0, self.RESERVOIR_SIZE - 1)] = value
    self.num_samples += 1
# Example source code using Python's randint()
def __setMail(self, email):
    """Store a verification code for ``email`` and mail it to that address.

    A four-digit code is generated, saved as a ``RecoveryEmail`` record for
    ``self.jid`` through ``self.s`` (merge + commit), and sent to ``email``
    via ``self.__sendMail``.

    :param email: address that should receive the verification code
    :return: confirmation text telling the user how to send the code back
    """
    # The code proves ownership of the address, so draw it from the OS
    # entropy source rather than the predictable default generator.
    code = random.SystemRandom().randint(1000, 9999)
    # Named ``record`` (not ``re``) so the stdlib ``re`` module is not shadowed.
    record = RecoveryEmail(jid=self.jid, email=email, code=code)
    self.s.merge(record)
    self.s.commit()
    msg = (
        'Please verify your e-mail address by sending '
        '"code %s" via XMPP back.'
    ) % str(code)
    self.__sendMail(email, 'verification code for pimux.de', msg)
    return (
        'A confirmation code was sent to %s. '
        'Please now send "code XXXX" back where XXXX is your '
        'code to verify your e-mail address.'
    ) % email
def random_context():
    """Build a context dict whose fields are each included at random.

    Every field is decided by its own ``random.getrandbits(1)`` draw, in the
    order listed below; the value generator for a field runs only when that
    field is selected.

    :return: dict holding the selected subset of context fields
    """
    # (key, value factory) pairs; the order fixes both the sequence of random
    # draws and the insertion order of the resulting dict. Factories are
    # lambdas so helper lookups happen only for fields that are selected.
    field_factories = (
        ("function_name", lambda: RunLambdaCliTest.random_string()),
        ("function_version", lambda: RunLambdaCliTest.random_string()),
        ("invoked_function_arn", lambda: RunLambdaCliTest.random_string()),
        ("memory_limit_in_mb", lambda: str(random.randint(100, 200))),
        ("aws_request_id", lambda: RunLambdaCliTest.random_string()),
        ("log_group_name", lambda: RunLambdaCliTest.random_string()),
        ("log_stream_name", lambda: RunLambdaCliTest.random_string()),
        ("identity", lambda: RunLambdaCliTest.random_identity()),
        ("client_context", lambda: RunLambdaCliTest.random_client_context()),
    )
    context = {}
    for key, make_value in field_factories:
        if random.getrandbits(1):
            context[key] = make_value()
    return context
def act(self, observation, last_reward):
    """Return the next action from the scripted queue ``self.fun``.

    When the queue is empty, ``self.actions`` is reversed and the queue is
    refilled with each action repeated a random 5-15 times. Actions are
    consumed from the end of the queue.

    :param observation: unused by this policy
    :param last_reward: reward from the previous step; a positive value
        replaces the queue with five ``0`` actions
    :return: the action popped from the end of ``self.fun``
    """
    if not self.fun:
        print("\nFUN", end="")
        fun_length = random.randint(5, 15)
        self.actions = self.actions[::-1]
        # Flatten directly; sum(list_of_lists, []) recopies the accumulator
        # for every action and is quadratic.
        self.fun = [a for a in self.actions for _ in range(fun_length)]
    elif self.fun[0] == self.actions[0]:
        print(">" if self.fun[0] == 2 else "<", end="")
    action = self.fun.pop(-1)
    if last_reward > 0:  # original comment labels this branch "lose"
        print("TENSAI!!!!!")
        self.fun = [0] * 5  # stop a while to shout
    return action
def send_request(self, method: str, params):
    """Send a JSON-RPC 2.0 request over ``self.conn`` and return the reply.

    The request is serialized compactly, prefixed with ``Content-Length`` and
    ``Content-Type`` headers, logged, and written to the connection.

    :param method: JSON-RPC method name
    :param params: value placed under the ``params`` key
    :return: whatever ``self.read_message`` returns for the request id
    """
    request_id = random.randint(0, 2 ** 16)  # TODO(renfred) guarantee uniqueness.
    payload = json.dumps(
        {
            "jsonrpc": "2.0",
            "id": request_id,
            "method": method,
            "params": params,
        },
        separators=(",", ":"),
    )
    headers = (
        "Content-Length: {}\r\n"
        "Content-Type: application/vscode-jsonrpc; charset=utf8\r\n\r\n"
    ).format(len(payload))
    request = headers + payload
    log("SENDING REQUEST: ", request)
    self.conn.write(request)
    return self.read_message(request_id)
def fetch(src, save, db, collection, p, f):
    """This is the worker function to get the next recipe from
    the pending queue, save it, and put all the related urls
    on the pending queue for other workers to process.

    Runs forever; it is meant to be driven as a queue worker.

    :param src: callable that takes a url and returns a recipe object
    :param save: when truthy, ``recipe.save()`` is called
    :param db: passed to ``recipe.store`` when neither it nor
        ``collection`` is None
    :param collection: see ``db``
    :param p: pending-url queue (``get`` / ``put`` / ``task_done``)
    :param f: fetched-url queue; its ``queue`` attribute is searched to
        skip urls that were already processed
    """
    while True:
        url = p.get()
        if url in f.queue:
            # Already fetched: just acknowledge the queue item.
            p.task_done()
            continue
        try:
            recipe = src(url)
            if save:
                recipe.save()
            if db is not None and collection is not None:
                recipe.store(db, collection)
            f.put(url)
            # Explicit loop: map() is lazy on Python 3, so using it purely
            # for side effects would silently enqueue nothing.
            for link in recipe.getOtherRecipeLinks():
                if link != url:
                    p.put(link)
        except ValueError:
            # Single pre-formatted argument prints the same text on
            # Python 2 and Python 3.
            print('[warning] could not fetch: %s' % (url,))
        p.task_done()
        # NOTE(review): the pause is assumed to apply only after a fetch
        # attempt, not after skipping a duplicate -- confirm against the
        # original layout.
        if PAUSE_CRAWLER:
            # pause a random interval between PAUSE_TIME_RANGE seconds before continuing
            sleep(randint(PAUSE_TIME_RANGE[0], PAUSE_TIME_RANGE[1]))
def get_batch():
    """Load one batch of flattened frames starting at a random index.

    A start index is drawn from ``[600, data_size]``. The inputs are
    ``batch_size * n_steps`` consecutive frames from ``cropedoriginalPixel2``.
    The two label lists hold ``batch_size`` frames each, taken at offsets
    ``batch_size * (i + 1)`` from the start, from ``cropedoriginalPixel2``
    and ``cropedoriginalUS2`` respectively.

    :return: tuple ``(image, label, label_0)`` of lists of 1-D arrays
    """
    def load_flat(path):
        # Read with imread flag 0, resize to LONGITUDE x LONGITUDE, flatten.
        frame = cv2.imread(path, 0)
        frame = cv2.resize(frame, (LONGITUDE, LONGITUDE))
        return np.array(frame).reshape(-1)

    start = random.randint(600, data_size)

    image = [
        load_flat('./cropedoriginalPixel2/%d.jpg' % (start + offset))
        for offset in range(batch_size * n_steps)
    ]
    label = [
        load_flat('./cropedoriginalPixel2/%d.jpg' % (start + batch_size * (k + 1)))
        for k in range(batch_size)
    ]
    label_0 = [
        load_flat('./cropedoriginalUS2/%d.jpg' % (start + batch_size * (k + 1)))
        for k in range(batch_size)
    ]
    return image, label, label_0
def get_train_batch(noise=0):
    """Load one training batch of flattened frames with noisy inputs.

    A start index is drawn from ``[600, data_size]``. The inputs are
    ``batch_size`` consecutive frames from ``cropedoriginalPixel2``, each
    passed through ``add_noise(frame, n=noise)`` before resizing. The labels
    are ``batch_size`` clean frames from the same folder at offsets
    ``batch_size * (i + 1)`` from the start.

    :param noise: forwarded to ``add_noise`` as ``n``
    :return: tuple ``(image, label)`` of lists of 1-D arrays
    """
    def shrink_and_flatten(frame):
        # Resize to LONGITUDE x LONGITUDE and flatten to one dimension.
        resized = cv2.resize(frame, (LONGITUDE, LONGITUDE))
        return np.array(resized).reshape(-1)

    start = random.randint(600, data_size)

    image = []
    for offset in range(batch_size):
        raw = cv2.imread('./cropedoriginalPixel2/%d.jpg' % (start + offset), 0)
        image.append(shrink_and_flatten(add_noise(raw, n=noise)))

    label = []
    for k in range(batch_size):
        target = cv2.imread(
            './cropedoriginalPixel2/%d.jpg' % (start + batch_size * (k + 1)), 0)
        label.append(shrink_and_flatten(target))

    return image, label
def start_appium(self):
    """
    Start appium on two randomly chosen ports in 4700-4900.
    p: appium port
    bp: bootstrap port
    :return: the appium port that was chosen
    """
    # Draw both ports in a single sample so they are guaranteed to differ;
    # two independent randint() calls could hand appium and its bootstrap
    # the same port.
    aport, bpport = random.sample(range(4700, 4901), 2)
    self.__start_driver(aport, bpport)
    U.Logging.debug(
        'start appium :p %s bp %s device:%s' %
        (aport, bpport, self.device))
    # Fixed wait after launching the driver before handing the port back.
    U.sleep(10)
    return aport
def mkdir_file(self):
    """
    Create the log directory tree for one run.

    The base directory comes from the ``log_file`` option of the
    ``test_case`` section. A per-run directory named with the current
    timestamp plus a random two-digit suffix is created under it, with
    ``log``, ``per``, ``img`` and ``status`` subdirectories.

    :return: path of the per-run directory
    """
    config = U.ConfigIni()
    base_dir = str(config.get_ini('test_case', 'log_file'))

    # The random suffix is baked into the strftime pattern itself.
    pattern = "%Y-%m-%d_%H_%M_%S{}".format(random.randint(10, 99))
    run_dir = base_dir + '/' + time.strftime(pattern, time.localtime(time.time()))

    if not os.path.exists(base_dir):
        os.mkdir(base_dir)

    wanted = [base_dir, run_dir]
    wanted.extend(run_dir + '/' + leaf for leaf in ('log', 'per', 'img', 'status'))
    for path in wanted:
        if os.path.exists(path):
            continue
        os.mkdir(path)
    return run_dir
def client(host, port, n, task=None):
    """Connect to ``(host, port)`` and send one variable-length message.

    Generator function: it yields on the asynchronous socket operations, so
    it is presumably meant to be scheduled as a pycos task.

    :param host: server host to connect to
    :param port: server port to connect to
    :param n: client number, used as the message prefix and in the log line
    :param task: not used in the body; presumably supplied by the scheduler
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock = pycos.AsyncSocket(sock)
    try:
        yield sock.connect((host, port))
        print('%s connected' % n)
        # send arbitrary length of data
        msg = '%d: ' % n + '-' * random.randint(100, 300) + '/'
        msg = msg.encode()
        yield sock.sendall(msg)
    finally:
        # Close even when connect/sendall fails so the socket is not leaked.
        sock.close()
# pycos.logger.setLevel(pycos.Logger.DEBUG)
# run 10 client tasks
def client(host, port, n, task=None):
    """Connect to ``(host, port)`` and send one variable-length message.

    Generator function: it yields on the asynchronous socket operations, so
    it is presumably meant to be scheduled as a pycos task.

    :param host: server host to connect to
    :param port: server port to connect to
    :param n: client number, used as the message prefix and in the log line
    :param task: not used in the body; presumably supplied by the scheduler
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock = pycos.AsyncSocket(sock)
    try:
        yield sock.connect((host, port))
        print('%s connected' % n)
        # send arbitrary length of data
        msg = '%d: ' % n + '-' * random.randint(100, 300) + '/'
        msg = msg.encode()
        yield sock.sendall(msg)
    finally:
        # Close even when connect/sendall fails so the socket is not leaked.
        sock.close()
# pycos.logger.setLevel(pycos.Logger.DEBUG)
# run 10 client tasks
def generateHash(tamanho=10):
    """Return a random string of ``tamanho`` characters.

    Each character is ``chr(rand(65, 90))``; code points 65-90 are the ASCII
    letters 'A'-'Z'. A ``tamanho`` of zero or less yields an empty string.
    """
    # join() builds the string in one pass instead of repeated concatenation.
    return "".join(chr(rand(65, 90)) for _ in range(tamanho))
def generateIntHash(tamanho=10):
    """Return an integer parsed from ``tamanho`` random decimal digits.

    Each digit is ``rand(0, 9)``. Leading zeros disappear in the ``int``
    conversion, so the result can have fewer than ``tamanho`` digits.

    :raises ValueError: if ``tamanho`` is zero or negative, because the empty
        digit string cannot be converted to an int
    """
    # join() builds the digit string in one pass instead of repeated
    # concatenation.
    digits = "".join(str(rand(0, 9)) for _ in range(tamanho))
    return int(digits)
def generateHash(tamanho=10):
    """Return a random string of ``tamanho`` characters.

    Each character is ``chr(rand(65, 90))``; code points 65-90 are the ASCII
    letters 'A'-'Z'. A ``tamanho`` of zero or less yields an empty string.
    """
    # join() builds the string in one pass instead of repeated concatenation.
    return "".join(chr(rand(65, 90)) for _ in range(tamanho))
def generateIntHash(tamanho=10):
    """Return an integer parsed from ``tamanho`` random decimal digits.

    Each digit is ``rand(0, 9)``. Leading zeros disappear in the ``int``
    conversion, so the result can have fewer than ``tamanho`` digits.

    :raises ValueError: if ``tamanho`` is zero or negative, because the empty
        digit string cannot be converted to an int
    """
    # join() builds the digit string in one pass instead of repeated
    # concatenation.
    digits = "".join(str(rand(0, 9)) for _ in range(tamanho))
    return int(digits)
def stick_mesh(width, height, depth):
    """Build a cube frame filled with 2000 randomly placed thin sticks.

    The side length is ``max(width, height, depth) * unit + unit`` and the
    bar thickness is ``unit / 20``. Twelve bars trace the edges of the cube;
    the sticks are randomly rotated and offset around the cube centre, and
    the combined shape is finally combined with a bounding cube using ``*``
    (presumably an intersection that clips overhanging sticks -- confirm
    against the CAD library in use).

    :return: the combined solid object
    """
    side = max(width, height, depth) * unit + unit
    thick = unit/20
    stick = cube([side, thick, thick])

    # Four bottom edges: two along x, two along y.
    b = cube([side, thick, thick])
    b += translate([0, side, 0])(cube([side, thick, thick]))
    b += cube([thick, side, thick])
    b += translate([side, 0, 0])(cube([thick, side, thick]))
    # Copy the bottom ring up to the top face.
    b = b + translate([0, 0, side])(b)
    # Four vertical edges.
    b += cube([thick, thick, side])
    b += translate([0, side, 0])(cube([thick, thick, side]))
    b += translate([side, 0, 0])(cube([thick, thick, side]))
    b += translate([side, side, 0])(cube([thick, thick, side]))

    # Loop-invariant offset bounds, in hundredths of a unit.
    lowest = int(-100 * side*2/3)
    highest = int(side*100*2/3)
    half = side/2

    sticks = []
    for _ in range(2000):
        # Three offset draws first, then three angle draws, per stick.
        offset = [randint(lowest, highest)/100.0 for _ in range(3)]
        angles = [randint(0, 3600)/10. for _ in range(3)]
        placed = translate(offset)(rotate(angles)(stick))
        sticks.append(translate([half, half, half])(placed))

    b = b + union()(*sticks)
    return b * cube(side+thick)
def random_different_from(top_range, number_to_not_repeat):
    """Return a random int in ``[0, top_range)`` other than a given value.

    :param top_range: exclusive upper bound of the result
    :param number_to_not_repeat: value that must not be returned; values
        outside the range (or non-numbers) simply exclude nothing
    :return: a uniformly chosen allowed integer
    :raises ValueError: if ``top_range`` is less than 1, or if the only
        candidate (``top_range == 1``) is the excluded value
    """
    result = random.randint(0, top_range - 1)
    if result != number_to_not_repeat:
        return result
    if top_range == 1:
        # The rejection loop this replaces would have spun forever here.
        raise ValueError("no value in range differs from number_to_not_repeat")
    # Shift by a non-zero offset modulo the range: lands uniformly on one of
    # the other values with a single extra draw instead of an unbounded
    # retry loop.
    return (result + random.randint(1, top_range - 1)) % top_range
def get_item(player):
    """Announce a randomly chosen item and restore the player's HP to 100.

    :param player: dict with an ``'HP'`` entry; modified in place
    :return: the same ``player`` dict
    """
    item_list = ["MRE", "First Aid Kit", "Meth-derived stim-pack"]
    # Index bound derived from the list so adding items cannot go out of sync.
    found = item_list[random.randint(0, len(item_list) - 1)]
    # Top up to 100 and never below zero: abs() would have *added* HP to a
    # player who was already above 100.
    heal = max(0, 100 - player['HP'])
    print("You find a ", found, "your health increased by ",
          heal, "HP")
    player['HP'] += heal
    return player
def attack(opponent, min_damage=8, max_damage=32):
    """Deal a random amount of damage to ``opponent`` and print it.

    :param opponent: dict with an ``'HP'`` entry; modified in place
    :param min_damage: smallest damage that can be rolled (inclusive)
    :param max_damage: largest damage that can be rolled (inclusive)
    :return: the same ``opponent`` dict
    """
    rand_damage = random.randint(min_damage, max_damage)
    opponent['HP'] -= rand_damage
    print(rand_damage, " damage!")
    return opponent