def get_token(self):
    """Fetch the login token from the passport ``getapi`` endpoint.

    Builds the getapi URL (the timestamp and the callback suffix come from
    ``utils``), requests it through ``self.get_response`` and stores the
    ``data.token`` field of the reply in ``self.token``.

    :returns: True when the token was stored, False when the reply could
        not be parsed.
    """
    getapi_url = passport_url + 'getapi&tpl=netdisk&apiver=v3&tt=%s' % utils.get_time()
    getapi_url += '&class=login&logintype=basicLogin&callback=bd__cbs__' + utils.get_callback_function()
    raw_reply = self.get_response(getapi_url)
    payloads = utils.get_json_from_response(raw_reply)
    try:
        # SECURITY(review): eval() executes whatever text the server sent
        # back. Kept because the payload format is not visible here; replace
        # with a real parser once the format is confirmed.
        reply = eval(payloads[0])
        self.token = reply['data']['token']
    except Exception:
        # format_exc() returns the traceback text; print_exc() returns None,
        # which is what used to be handed to show_msg.
        utils.show_msg(traceback.format_exc())
        utils.show_msg('???Can\'t get passport getapi\'s response json.')
        return False
    return True
# Example source code using Python's get_time()
def play(self, n_step=10000, n_episode=1000, test_ep=0.05, render=False):
    """Play evaluation episodes and print the best episode reward so far.

    :param n_step: maximum number of agent steps per episode.
    :param n_episode: number of episodes to play.
    :param test_ep: forwarded to ``self.predict`` as its second argument
        (presumably the exploration epsilon -- confirm); ``None`` selects
        ``self.ep_end``.
    :param render: accepted but not used by this method.
    """
    epsilon = self.ep_end if test_ep is None else test_ep
    frames = History(self.config, self.ob_shape_list)

    if not self.display:
        gym_dir = '/tmp/%s-%s' % (self.env_name, get_time())
        self.env.env.monitor.start(gym_dir)

    top_reward, top_episode = 0, 0
    for episode in xrange(n_episode):
        screen, reward, action, terminal = self.env.new_random_game()
        episode_reward = 0

        # Fill the whole history window with the opening screen.
        for _ in range(self.history_length):
            frames.add(screen)

        for t in tqdm(range(n_step), ncols=70):
            # Predict, act, then record the resulting screen.
            action = self.predict(frames.get(), epsilon)
            screen, reward, terminal = self.env.act(action, is_training=False)
            frames.add(screen)

            episode_reward += reward
            if terminal:
                break

        if episode_reward > top_reward:
            top_reward, top_episode = episode_reward, episode

        # Single-argument parenthesised print behaves the same on Python 2.
        banner = "=" * 30
        print(banner)
        print(" [%d] Best reward : %d" % (top_episode, top_reward))
        print(banner)

    if not self.display:
        self.env.env.monitor.close()
        # gym.upload(gym_dir, writeup='https://github.com/devsisters/DQN-tensorflow', api_key='')
def do_pan_api(self, api, args):
    """Call a pan web API endpoint and return the ``list`` field of its reply.

    :param api: endpoint name appended to ``pan_api_url``.
    :param args: mapping of extra query parameters; keys and values are
        appended to the URL as-is (no URL encoding is applied here).
    :returns: the reply's ``list`` entry when its ``errno`` is 0, otherwise
        False (after reporting the problem through ``utils.show_msg``).
    """
    api_url = pan_api_url + api + '?'
    api_url += 'channel=chunlei&clienttype=0&web=1&t=%s' % utils.get_time()
    api_url += '&bdstoken=' + self.token
    api_url += ''.join('&%s=%s' % (name, args[name]) for name in args)

    raw_reply = self.get_response(api_url)
    try:
        # SECURITY(review): eval() executes whatever text the server sent
        # back. Kept because the payload format is not visible here; replace
        # with a real parser once the format is confirmed.
        reply = eval(raw_reply)
        errno = str(reply['errno'])
        if errno == '0':
            return reply['list']
    except Exception:
        # format_exc() returns the traceback text; print_exc() returns None,
        # which is what used to be handed to show_msg.
        utils.show_msg(traceback.format_exc())
        utils.show_msg("??:Can't get pan api:" + api + " response json.")
        return False

    # The reply parsed but carried a non-zero errno: report it.
    utils.show_msg('??:?????api?' + api + '?????????' + errno + '??????' + errmsg.get_errmsg_by_errno(errno))
    return False
def get_list(self, dir, page=None, page_size=None, order='name', desc='1'):
    """List the entries of a remote directory through the ``list`` pan API.

    :param dir: remote directory path; it is URL-quoted before being sent.
    :param page: optional page number, sent as ``page`` when given.
    :param page_size: optional entries per page, sent as ``num`` when given.
    :param order: sort key sent as ``order``.
    :param desc: value sent as ``desc``.
    :returns: whatever ``self.do_pan_api`` returned. On success each entry's
        ``server_filename`` and ``path`` have been converted to unicode and
        the result is cached in ``self.file_list[dir]``; on failure the
        result is False.
    """
    query = {
        "_": utils.get_time(),
        "dir": urllib.quote(dir),
        "order": order,
        "desc": desc,
    }
    if page is not None:
        query['page'] = page
    if page_size is not None:
        query['num'] = page_size

    listing = self.do_pan_api('list', query)
    # Equality (not identity) is used on purpose to keep the original check.
    if listing == False:
        return listing

    for entry in listing:
        # SECURITY(review): eval() on server-supplied names; a name containing
        # a quote character can break out of the literal.
        entry['server_filename'] = eval('u\'' + entry['server_filename'] + '\'')
        entry['path'] = eval('u\'' + entry['path'] + '\'.replace(\'\\\\\',\'\')')
    self.file_list[dir] = listing
    return listing
def get_baiducloudclient_url(self, dir):
    """Resolve a download URL for ``dir`` using the desktop-client endpoints.

    First posts ``method=consume`` to the ``freqctrl`` endpoint, then asks the
    PCS ``locatedownload`` endpoint for the file's locations, both with a
    desktop-client User-Agent.

    :param dir: remote path, inserted into the query string as-is.
    :returns: the ``url`` of the first entry in the reply's ``urls`` list, or
        False when either request raised.
    :raises: whatever ``json.loads`` or the key/index lookups raise when the
        second reply does not have the expected structure.
    """
    headers = {
        'User-Agent': 'netdisk;2.1.0;pc;pc-mac;10.12.5;macbaiduyunguanjia'
    }
    url = 'https://pan.baidu.com/rest/2.0/membership/speeds/freqctrl'
    postdata = {
        'method': 'consume'
    }
    # NOTE(review): the original (partly unreadable) note describes two
    # methods for this endpoint, 'get' (mentions freq_cnt=1) and 'consume';
    # confirm their meaning against the API before changing this call.
    try:
        # Only the side effect of the request matters; the reply is unused.
        self.get_response(url, post_data=postdata, headers=headers)
    except Exception:
        # format_exc() returns the traceback text; print_exc() returns None.
        utils.show_msg(traceback.format_exc())
        utils.show_msg('???Get file size failed.url %s.' % url)
        return False

    url = 'https://d.pcs.baidu.com/rest/2.0/pcs/file?time=' + utils.get_time() + '&clienttype=21&version=2.1.0&vip=0&method=locatedownload&app_id=250528&esl=1&ver=4.0&dtype=1&ehps=1&check_blue=1&path=' + dir + '&err_ver=1.0'
    try:
        response = self.get_response(url, headers=headers)
    except Exception:
        utils.show_msg(traceback.format_exc())
        utils.show_msg('???Get file size failed.url %s.' % url)
        return False

    url_info = json.loads(response)
    return url_info['urls'][0]['url']
def rpush(self, msg, timeout=0):
    """Push ``msg`` onto ``self.queue``, optionally registering a deadline.

    :param msg: message handed to ``self._conn.rpush``.
    :param timeout: when truthy, ``msg`` suffixed with ``get_uuid()`` is
        passed to ``self.zadd`` with score ``get_time() + timeout``.
    :returns: the suffixed message when ``timeout`` is truthy, else True.
    """
    self._conn.rpush(self.queue, msg)
    if not timeout:
        return True
    tagged = "%s%s" % (msg, get_uuid())
    self.zadd(get_time() + timeout, tagged)
    return tagged
def zrangebyscore(self):
    """Return ``self.ack_queue`` members scored between 0 and ``get_time()``.

    :returns: the connection's reply, wrapped in a list when it is not
        already one.
    """
    due = self._conn.zrangebyscore(self.ack_queue, 0, get_time())
    return due if isinstance(due, list) else [due]
# def sadd(self, queue, value):
# return self._conn.sadd(queue, value)
#
# def spop(self, queue):
# msg = self._conn.spop(queue)
# return msg if msg else msg
#
# def srem(self, queue, value):
# return self._conn.srem(queue, value)
def play(self, sv, is_chief, n_step=10000, n_episode=100, test_ep=None, render=False):
    """Play evaluation episodes and print the best episode reward so far.

    :param sv: accepted but not used by this method.
    :param is_chief: accepted but not used by this method.
    :param n_step: maximum number of agent steps per episode.
    :param n_episode: number of episodes to play.
    :param test_ep: forwarded to ``self.predict`` as its second argument
        (presumably the exploration epsilon -- confirm); ``None`` selects
        ``self.ep_end``.
    :param render: accepted but not used by this method.
    """
    epsilon = self.ep_end if test_ep is None else test_ep
    frames = History(self.config)

    if not self.display:
        gym_dir = '/tmp/%s-%s' % (self.env_name, get_time())
        self.env.env.monitor.start(gym_dir)

    top_reward, top_episode = 0, 0
    for episode in xrange(n_episode):
        screen, reward, action, terminal = self.env.new_random_game()
        episode_reward = 0

        # Fill the whole history window with the opening screen.
        for _ in xrange(self.history_length):
            frames.add(screen)

        for t in tqdm(xrange(n_step), ncols=70):
            # Predict, act, then record the resulting screen.
            action = self.predict(frames.get(), epsilon)
            screen, reward, terminal = self.env.act(action, is_training=False)
            frames.add(screen)

            episode_reward += reward
            if terminal:
                break

        if episode_reward > top_reward:
            top_reward, top_episode = episode_reward, episode

        # Single-argument parenthesised print behaves the same on Python 2.
        banner = "=" * 30
        print(banner)
        print(" [%d] Best reward : %d" % (top_episode, top_reward))
        print(banner)

    if not self.display:
        self.env.env.monitor.close()
        #gym.upload(gym_dir, writeup='https://github.com/devsisters/DQN-tensorflow', api_key='')
def check_login(self, username):
    """Run the passport ``logincheck`` step for ``username``.

    Fetches the home page, obtains a token through ``self.get_token`` and
    then queries the ``logincheck`` endpoint.

    :param username: account name; it is URL-quoted before being sent.
    :returns: the ``data.codeString`` value of the logincheck reply, or False
        when the home page is empty, the token could not be obtained, or the
        reply could not be parsed.
    """
    response = self.get_response(home_url)
    if response == '':
        return False

    try:
        # The match result is not used anywhere; the call is retained so that
        # a response the regex engine cannot scan still aborts the login.
        re.findall(r'id="dv_Input" type="hidden" value="(.*?)"', response)
    except Exception:
        # format_exc() returns the traceback text; print_exc() returns None.
        utils.show_msg(traceback.format_exc())
        utils.show_msg('???Can\'t get dv_Input.')
        return False

    if not self.get_token():
        return False

    logincheck_url = passport_url + 'logincheck&&token=%s' % self.token
    logincheck_url += '&tpl=netdisk&apiver=v3&tt=%s' % utils.get_time()
    logincheck_url += '&username=%s' % urllib.quote(username)
    logincheck_url += '&isphone=false&callback=bd__cbs__' + utils.get_callback_function()
    raw_reply = self.get_response(logincheck_url)
    payloads = utils.get_json_from_response(raw_reply)
    try:
        # SECURITY(review): eval() executes whatever text the server sent
        # back. Kept because the payload format is not visible here; replace
        # with a real parser once the format is confirmed.
        reply = eval(payloads[0])
        code_string = reply['data']['codeString']
    except Exception:
        utils.show_msg(traceback.format_exc())
        utils.show_msg('??:Can\'t get passport logincheck\'s response json.')
        return False
    return code_string
def play(self, n_step=10000, n_episode=100, test_ep=True, render=False):
    """Play evaluation episodes on the raw gym env and print reward statistics.

    Screens are converted with ``rgb2gray``, resized to (110, 84) and cropped
    to rows 18..101 before being pushed into the history.

    :param n_step: maximum number of agent steps per episode.
    :param n_episode: number of episodes to play.
    :param test_ep: forwarded to ``self.predict`` as its second argument;
        ``None`` selects ``self.ep_end``.
        NOTE(review): the default is ``True`` -- confirm this is intended.
    :param render: accepted but not used by this method.
    """
    if test_ep is None:
        test_ep = self.ep_end

    test_history = History(self.config)

    # Previously the Monitor wrapper was only created when display was off,
    # yet it was stepped unconditionally, so display mode died with a
    # NameError. Step the unwrapped env in that case instead.
    if self.display:
        env = self.env.env
    else:
        gym_dir = './tmp/%s-%s' % (self.env_name, get_time())
        env = gym.wrappers.Monitor(self.env.env, gym_dir)

    def to_observation(frame):
        # Grayscale, resize, then crop to the rows fed to the history.
        resized = imresize(rgb2gray(frame), (110, 84))
        return resized[18:102, :]

    best_reward, best_idx = 0, 0
    ep_rewards = []
    for idx in tqdm(xrange(n_episode), ncols=70):
        screen = to_observation(env.reset())
        current_reward = 0

        # Fill the whole history window with the opening screen.
        for _ in range(self.history_length):
            test_history.add(screen)

        for t in range(n_step):
            # 1. predict
            action = self.predict(test_history.get(), test_ep)
            # 2. act
            screen, reward, terminal, _ = env.step(action)
            screen = to_observation(screen)
            # 3. observe
            test_history.add(screen)

            current_reward += reward
            if terminal:
                break

        print("GET REWARD %s" % (current_reward,))
        ep_rewards.append(current_reward)
        if current_reward > best_reward:
            best_reward = current_reward
            best_idx = idx

        # Best and average are printed on one line, as before.
        print("=" * 30)
        print(" [%d] Best reward : %d Average reward: %f"
              % (best_idx, best_reward, np.mean(ep_rewards)))
        print("=" * 30)

    if not self.display:
        env.close()
        #gym.upload(gym_dir, writeup='https://github.com/devsisters/DQN-tensorflow', api_key='')
def play(self, n_step=10000, n_episode=100, test_ep=None, render=False):
    """Play evaluation episodes and print the best episode reward so far.

    When ``self.display`` is false a ``gym.wrappers.Monitor`` is created
    around ``self.env.env`` and closed at the end; the episodes themselves
    are driven through ``self.env``.

    :param n_step: maximum number of agent steps per episode.
    :param n_episode: number of episodes to play.
    :param test_ep: forwarded to ``self.predict`` as its second argument
        (presumably the exploration epsilon -- confirm); ``None`` selects
        ``self.ep_end``.
    :param render: accepted but not used by this method.
    """
    epsilon = self.ep_end if test_ep is None else test_ep
    frames = History(self.config)

    if not self.display:
        gym_dir = './tmp/%s-%s' % (self.env_name, get_time())
        # self.env.env.monitor.start(gym_dir)
        monitor = gym.wrappers.Monitor(self.env.env, gym_dir)

    top_reward, top_episode = 0, 0
    for episode in tqdm(xrange(n_episode), ncols=70):
        screen, reward, action, terminal = self.env.new_random_game()
        episode_reward = 0

        # Fill the whole history window with the opening screen.
        for _ in range(self.history_length):
            frames.add(screen)

        for t in range(n_step):
            # Predict, act, then record the resulting screen.
            action = self.predict(frames.get(), epsilon)
            screen, reward, terminal = self.env.act(action, is_training=False)
            frames.add(screen)

            episode_reward += reward
            if terminal:
                break

        if episode_reward > top_reward:
            top_reward, top_episode = episode_reward, episode

        # Single-argument parenthesised print behaves the same on Python 2.
        banner = "=" * 30
        print(banner)
        print(" [%d] Best reward : %d" % (top_episode, top_reward))
        print(banner)

    if not self.display:
        monitor.close()
        #gym.upload(gym_dir, writeup='https://github.com/devsisters/DQN-tensorflow', api_key='')