def update_crawl_message(crawl_messages):
crawl_messages.set_message(0, config.EVENT_NAME)
crawl_messages.set_message_colors(0, graphics.BRIGHT_BLUE, graphics.BLACK)
now = datetime.utcnow()
crawl_messages.set_message(1, datetime.strftime(now, '%H:%M:%S'))
if now < config.EVENT_START_TIME:
delta = config.EVENT_START_TIME - now
seconds = delta.total_seconds()
bg = graphics.BLUE if seconds > 3600 else graphics.RED
crawl_messages.set_message(2, '%s starts in %s' % (config.EVENT_NAME, delta_time_to_string(delta)))
crawl_messages.set_message_colors(2, graphics.WHITE, bg)
elif now < config.EVENT_END_TIME:
delta = config.EVENT_END_TIME - now
seconds = delta.total_seconds()
fg = graphics.YELLOW if seconds > 3600 else graphics.ORANGE
crawl_messages.set_message(2, '%s ends in %s' % (config.EVENT_NAME, delta_time_to_string(delta)))
crawl_messages.set_message_colors(2, fg, graphics.BLACK)
else:
crawl_messages.set_message(2, '%s is over.' % config.EVENT_NAME)
crawl_messages.set_message_colors(2, graphics.RED, graphics.BLACK)
python类strftime()的实例源码
def _compute_name(self):
comp_name = '/'
for hc_res_clinical_impression in self:
if hc_res_clinical_impression.subject_type == 'patient':
comp_name = hc_res_clinical_impression.subject_patient_id.name
if hc_res_clinical_impression.subject_patient_id.birth_date:
subject_patient_birth_date = datetime.strftime(datetime.strptime(hc_res_clinical_impression.subject_patient_id.birth_date, DF), "%Y-%m-%d")
comp_name = comp_name + "("+ subject_patient_birth_date + "),"
if hc_res_clinical_impression.subject_type == 'group':
comp_name = hc_res_clinical_impression.subject_group_id.name + ","
if hc_res_clinical_impression.code_id:
comp_name = comp_name + " " + hc_res_clinical_impression.code_id.name + "," or ''
if hc_res_clinical_impression.date:
patient_date = datetime.strftime(datetime.strptime(hc_res_clinical_impression.date, DTF), "%Y-%m-%d")
comp_name = comp_name + " " + patient_date
hc_res_clinical_impression.name = comp_name
def _compute_name(self):
comp_name = '/'
for hc_res_condition in self:
if hc_res_condition.subject_type == 'patient':
comp_name = hc_res_condition.subject_patient_id.name
if hc_res_condition.subject_patient_id.birth_date:
subject_patient_birth_date = datetime.strftime(datetime.strptime(hc_res_condition.subject_patient_id.birth_date, DF), "%Y-%m-%d")
comp_name = comp_name + "("+ subject_patient_birth_date + ")"
if hc_res_condition.subject_type == 'group':
comp_name = hc_res_condition.subject_group_id.name
if hc_res_condition.code_id:
comp_name = comp_name + ", " + hc_res_condition.code_id.name or ''
if hc_res_condition.asserted_date:
patient_asserted_date = datetime.strftime(datetime.strptime(hc_res_condition.asserted_date, DTF), "%Y-%m-%d")
comp_name = comp_name + ", " + patient_asserted_date
hc_res_condition.name = comp_name
def _compute_name(self):
comp_name = '/'
for hc_res_encounter in self:
if hc_res_encounter.subject_type == 'patient':
comp_name = hc_res_encounter.subject_patient_id.name
if hc_res_encounter.subject_patient_id.birth_date:
subject_patient_birth_date = datetime.strftime(datetime.strptime(hc_res_encounter.subject_patient_id.birth_date, DF), "%Y-%m-%d")
comp_name = comp_name + "("+ subject_patient_birth_date + ")"
if hc_res_encounter.subject_type == 'group':
comp_name = hc_res_encounter.subject_group_id.name
# if hc_res_encounter.type_id:
# comp_name = comp_name + ", " + hc_res_encounter.type_id.name or ''
if hc_res_encounter.start_date:
subject_start_date = datetime.strftime(datetime.strptime(hc_res_encounter.start_date, DTF), "%Y-%m-%d")
comp_name = comp_name + ", " + subject_start_date
hc_res_encounter.name = comp_name
def _compute_name(self):
comp_name = '/'
for hc_coverage in self:
if hc_coverage.network_id:
comp_name = hc_coverage.network_id.name or ''
if hc_coverage.policy_holder_name:
comp_name = comp_name + ", " + hc_coverage.policy_holder_name or ''
if hc_coverage.subscriber_name:
comp_name = comp_name + ", " + hc_coverage.subscriber_name or ''
if hc_coverage.beneficiary_id:
comp_name = comp_name + ", " + hc_coverage.beneficiary_id.name or ''
if hc_coverage.type_id:
comp_name = comp_name + ", " + hc_coverage.type_id.name or ''
if hc_coverage.start_date:
comp_name = comp_name + ", " + datetime.strftime(datetime.strptime(hc_coverage.start_date, DTF), "%Y-%m-%d")
hc_coverage.name = comp_name
def checkSQL(self,symbol):
self.cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
res = self.cursor.fetchall()[0]
if symbol in res:
for i in range(self.delta.days + 1):
if (self.startDate + timedelta(days=i)).weekday() in [6, 7]:
continue
self.cursor.execute("SELECT * FROM %s WHERE Date = %s" % (symbol, datetime.strftime(self.startDate + timedelta(days=i),"%Y-%m-%d")))
r = self.cursor.fetchall()
if r == []:
self.missing_data.append(symbol)
break
self.included_data.append(symbol)
else:
self.missing_data.append(symbol)
def setup(config=None):
root_logger = logging.getLogger()
log_path = config.get_safe('global', '--log-path', '.')
if not os.path.exists(log_path):
raise RuntimeError('configured ``--log-path`` value does not exist: %s' % log_path)
date = datetime.strftime(datetime.utcnow(), '%Y-%m-%d')
log_file = os.path.join(log_path, 'ceph-medic-%s.log' % date)
root_logger.setLevel(logging.DEBUG)
# File Logger
fh = logging.FileHandler(log_file)
fh.setLevel(logging.DEBUG)
fh.setFormatter(logging.Formatter(FILE_FORMAT))
root_logger.addHandler(fh)
def get_stock(symbol):
last_year_date = datetime.strftime(datetime.now() - relativedelta(years=1), "%Y-%m-%d")
date = get_last_trading_date()
url = requests.get('https://www.quandl.com/api/v3/datasets/WIKI/{}.json?start_date={}&end_date={}'.format(symbol, last_year_date, date))
json_dataset = url.json()
json_data = json_dataset['dataset']['data']
dates = []
closing = []
for day in json_data:
dates.append(datetime.strptime(day[0], "%Y-%m-%d"))
closing.append(day[4])
plt.plot_date(dates, closing, '-')
plt.title(symbol)
plt.xlabel('Date')
plt.ylable('Stock Price')
plt.savefig('foo.png')
def get_game(self):
# if the movelist is positioned part way through the game then
# we must redo all moves to get the full game
redo_count = len(gv.jcchess.get_redolist())
for i in range(0, redo_count):
gv.jcchess.redo_move()
game = chess.pgn.Game.from_board(self.chessboard)
# if we did any redo moves then undo them now to get things back
# the way they were
for i in range(0, redo_count):
gv.jcchess.undo_move()
game.headers["Event"] = "Computer Chess Game"
game.headers["Site"] = socket.gethostname()
game.headers["Date"] = datetime.strftime(datetime.now(), '%Y.%m.%d')
game.headers["Round"] = "-"
game.headers["White"] = gv.jcchess.get_player(WHITE)
game.headers["Black"] = gv.jcchess.get_player(BLACK)
return game
def monta_caminho_inutilizacao(self, ambiente=None, data=None, serie=None, numero_inicial=None, numero_final=None):
caminho = self.caminho + u'ArquivosXML/CTe/'
if ambiente == 1:
caminho = os.path.join(caminho, 'producao/')
else:
caminho = os.path.join(caminho, 'homologacao/')
if data is None:
data = datetime.now()
caminho = os.path.join(caminho, data.strftime(u'%Y-%m') + u'/')
serie = unicode(serie).strip().rjust(3, u'0')
numero_inicial = unicode(numero_inicial).strip().rjust(9, u'0')
numero_final = unicode(numero_final).strip().rjust(9, u'0')
caminho = os.path.join(caminho, serie + u'-' + numero_inicial + u'-' + numero_final + u'/')
return caminho
def _get_email_thread_attachment(ticket, email_category=None):
try:
_emails = ImplementationFactory.instance.get_singleton_of(
'MailerServiceBase'
).get_emails(ticket)
except (KeyError, MailerServiceException) as ex:
raise InternalServerError(str(ex))
emails = [email for email in _emails if email.category.lower() == email_category]
try:
content, filetype = utils.get_email_thread_content(ticket, emails)
except (utils.EmailThreadTemplateNotFound, utils.EmailThreadTemplateSyntaxError) as ex:
raise InternalServerError(str(ex))
content = base64.b64encode(content)
name = 'ticket_{}_emails_{}{}'.format(
ticket.publicId,
datetime.strftime(datetime.now(), '%d-%m-%Y_%H-%M-%S'),
mimetypes.guess_extension(filetype),
)
return {'filetype': filetype, 'content': content, 'name': name}
def __str__(self):
if self.user is None or self.date is None:
error("Stringify `Tweet` without `user` or `date`.")
raise ValueError()
dt = self.date.astimezone(timezone(timedelta(hours=9)))
ds = datetime.strftime(dt, "%Y-%m-%d %H:%M:%S JST")
results = [self.user, ds]
for t in [self.ja, self.zh]:
if len(t) == 0:
continue
# Fix GBK encoding
t = t.replace('?', '·')
t = t.replace('?', '×')
t = t.replace('#???', '')
t = t.replace('#????????', '')
results.extend(['', t.strip()])
results.extend([str(m) for m in self.media])
return '\n'.join(results)
def __str__(self):
if self.user is None or self.date is None:
error("Stringify `Tweet` without `user` or `date`.")
raise ValueError()
dt = self.date.astimezone(timezone(timedelta(hours=9)))
ds = datetime.strftime(dt, "%Y-%m-%d %H:%M:%S JST")
results = [self.user, ds]
for t in [self.ja, self.zh]:
if len(t) == 0:
continue
# Fix GBK encoding
t = t.replace('?', '·')
t = t.replace('?', '×')
t = t.replace('?', '')
t = t.replace('#???', '')
t = t.replace('#????????', '')
results.extend(['', t.strip()])
results.extend([str(m) for m in self.media])
return '\n'.join(results)
def enable():
## get datetime for timestamp ##
dt = datetime.now()
date = datetime.strftime(dt, '%Y-%m-%d')
fileOut = "/capture-data/" + date + ".pcap"
#set timer
t = time.time()
pcap = subprocess.Popen(['/usr/sbin/tcpdump', '-n', '-e', '-w', fileOut], stdout=subprocess.PIPE)
Ps.pcap = pcap.pid
while t != 0:
pcap
return False
## end enable function
def create_key(session, name, pubkey, check_mode):
if check_mode:
from datetime import datetime
now = datetime.utcnow()
return {
'id': 0,
'key': pubkey,
'title': name,
'url': 'http://example.com/CHECK_MODE_GITHUB_KEY',
'created_at': datetime.strftime(now, '%Y-%m-%dT%H:%M:%SZ'),
'read_only': False,
'verified': False
}
else:
return session.request(
'POST',
API_BASE + '/user/keys',
data=json.dumps({'title': name, 'key': pubkey})).json()
def _createGameFromData(self, gameData):
temp = int(gameData['Template'])
tempID, tempSettings, overrides = self._getTempSettings(temp)
teams = self._assembleTeams(gameData)
try:
wlID = self.handler.createGame(tempID, self._getGameName(gameData),
teams, settingsDict=tempSettings,
overridenBonuses=overrides,
teamless=self.teamless,
message=self._getGameMessage(gameData))
self._adjustTemplateGameCount(temp, 1)
createdStr = datetime.strftime(datetime.now(), self.TIMEFORMAT)
self._updateEntityValue(self.games, gameData['ID'],
WarlightID=wlID, Created=createdStr)
return gameData
except Exception as e:
sides = gameData['Sides']
self.parent.log("Failed to make game with %s on %d because of %s" %
(sides, temp, repr(e)), self.name, error=True)
self._deleteGame(gameData, False, False)
def test_createGame(self, parser, datetime, delete):
self.games.findEntities.return_value = [{'Template': '43',
'Sides': '1/2', 'Vetos': '8', 'ID': 'gameID'},]
self.templates.findEntities.return_value = [{'ID': 'tempID',
'WarlightID': 4904, 'SET_A#B': '490', 'SET_SETTING': 314,
'OVERRIDE_Bonus': 12, 'Usage': '8', 'Name': 'TempName'},]
self.teams.findEntities.return_value = [{'ID': '1', 'Players':
'3022124041', 'Name': 'Team Name', 'Rating': '4034', 'Rank': '1'},]
self.handler.createGame.return_value = "WLID"
datetime.strftime.return_value = "strftime"
self.league._createGame('gameID')
self.games.updateMatchingEntities.assert_called_with({'ID':
{'value': 'gameID', 'type': 'positive'}}, {'WarlightID': "WLID",
'Created': "strftime"})
self.handler.createGame.side_effect = IOError
self.league._createGame('gameID')
failStr = "Failed to make game with 1/2 on 43 because of IOError()"
self.parent.log.assert_called_with(failStr, self.league.name,
error=True)
delete.assert_called_once_with(self.games.findEntities.return_value[0],
False, False)
def test_decayRatings(self):
oldCount = self.teams.updateMatchingEntities.call_count
self._setProp(self.league.SET_RATING_DECAY, "0")
current = datetime.strftime(datetime.now(), self.league.TIMEFORMAT)
self._setProp(self.league.SET_LATEST_RUN, current)
self.league._decayRatings()
self._setProp(self.league.SET_RATING_DECAY, "10")
self.league._decayRatings()
assert_equals(self.teams.updateMatchingEntities.call_count, oldCount)
self._setProp(self.league.SET_LATEST_RUN, "")
self.teams.findEntities.return_value = [{'ID': 1, 'Rating': '33/5',
'Ongoing': '0', 'Finished': '8', 'Limit': '0', 'Confirmations': ''},
{'ID': 2, 'Rating': '34/8', 'Ongoing': '1', 'Finished': '0',
'Limit': '3', 'Confirmations': 'TRUE,FALSE,FALSE'},
{'ID': 3, 'Rating': '39/1', 'Ongoing': '2', 'Finished': '121',
'Limit': '12', 'Confirmations': 'TRUE,TRUE,TRUE'},
{'ID': 4, 'Rating': '12/0', 'Ongoing': '0', 'Finished': '0',
'Limit': '0', 'Confirmations': 'FALSE,FALSE,FALSE'}]
self.league._decayRatings()
self.teams.updateMatchingEntities.assert_called_with({'ID':
{'value': 2, 'type': 'positive'}}, {'Rating': "24/8"})
assert_equals(self.teams.updateMatchingEntities.call_count, 2)
def test_run(self, update, execute, validate, restore, create,
rescale, decay, calculate):
self._setProp(self.league.SET_WAIT_PERIOD, 500)
self._setProp(self.league.SET_LATEST_RUN,
datetime.strftime(datetime.now() - timedelta(minutes=400),
self.league.TIMEFORMAT))
self.league.run()
update.assert_not_called()
execute.assert_called_once_with()
self._setProp(self.league.SET_WAIT_PERIOD, 300)
self._setProp(self.league.SET_ACTIVE, "FALSE")
self.league.run()
create.assert_not_called()
for fn in {update, validate, restore, rescale, decay, calculate}:
fn.assert_called_once_with()
assert_equals(execute.call_count, 2)
execute.assert_called_with()
self._setProp(self.league.SET_ACTIVE, "TRUE")
self.teams.findEntities.return_value = [{'Limit': '10'},] * 10
self.templates.findEntities.return_value = xrange(5)
assert_true(self.league.active)
self.league.run()
create.assert_called_once_with()
def _get_next_report_date(self, cr, uid, ids, field_name, arg, context=None):
"""Return the next report date based on the last report date and report
period.
:return: a string in DEFAULT_SERVER_DATE_FORMAT representing the date"""
res = {}
for challenge in self.browse(cr, uid, ids, context=context):
last = datetime.strptime(challenge.last_report_date, DF).date()
if challenge.report_message_frequency == 'daily':
next = last + timedelta(days=1)
res[challenge.id] = next.strftime(DF)
elif challenge.report_message_frequency == 'weekly':
next = last + timedelta(days=7)
res[challenge.id] = next.strftime(DF)
elif challenge.report_message_frequency == 'monthly':
month_range = calendar.monthrange(last.year, last.month)
next = last.replace(day=month_range[1]) + timedelta(days=1)
res[challenge.id] = next.strftime(DF)
elif challenge.report_message_frequency == 'yearly':
res[challenge.id] = last.replace(year=last.year + 1).strftime(DF)
# frequency == 'once', reported when closed only
else:
res[challenge.id] = False
return res
def default(self, o):
# to support arbitrary iterators
try:
iterable = iter(o)
except TypeError:
pass
else:
return list(iterable)
# support datetime
if isinstance(o, datetime):
# return o.isoformat() if o else ''
return datetime.strftime(o, '%Y-%m-%d %H:%M:%S') if o else ''
elif hasattr(o, '__dict__'):
return o.__dict__
return str(o)
# return JSONEncoder.default(self, o)
def _filter_bs_D(self):
dateline = sys.argv[2]
d_unix = time.mktime(time.strptime(dateline, '%Y%m%d'))
unix_time = d_unix - 86000
yestoday = datetime.fromtimestamp(unix_time).strftime('%Y%m%d')
sql = "select * from s_stock_fenbi_daily where kp_xc > 15000000 and dateline in (%s, %s)" % (yestoday, dateline)
data = self.mysql.getRecord(sql)
tmp_list = {}
for i in range(0, len(data)):
if data[i]['s_code'] not in tmp_list.keys():
tmp_list[data[i]['s_code']] = []
tmp_list[data[i]['s_code']].append(data[i])
c = []
for k, v in tmp_list.items():
if len(v) == 2:
c.append(k)
return c
def __str__(self):
for url in self.media:
filename = os.path.basename(url)
dir = os.path.join(config['cq_root_dir'], config['cq_image_dir'], 'twitter')
path = os.path.join(dir, filename)
# ??twitter??????????
if not os.path.exists(dir):
os.mkdir(dir)
# ??
if not os.path.exists(path):
resp = requests.get(url, timeout=60, proxies=config.get('proxies'))
with open(path, 'wb') as f:
f.write(resp.content)
dt = self.date.astimezone(timezone(timedelta(hours=9)))
ds = datetime.strftime(dt, "%Y-%m-%d %H:%M:%S JST")
results = [ds, ]
text = self.text
text = text.replace('?', '·').replace('?', '×').replace('#????????', '').replace('?', '')
results.extend(['', text.strip()])
results.extend([str(CQImage(os.path.join('twitter', os.path.basename(m)))) for m in self.media])
return '\n'.join(results)
def test_fill_with_duplicate_status(self):
sample_user=twitter.User(
id=718443,
name='Bob Loblaw',
screen_name='bobby',
location='Vancouver, Canada',
verified=True
)
sample_status=twitter.Status(
created_at=datetime.strftime(timezone.make_aware(datetime.now()), '%a %b %d %H:%M:%S +0000 %Y'),
id=1234567,
text='Hello world, again!',
user=sample_user,
retweet_count=1,
favorite_count=5,
)
movie = Movie.objects.get(imdbID="123456")
duplicate_tweet = Tweet().fillWithStatusObject(sample_status, movie)
self.assertEqual(duplicate_tweet.text, 'Hello world')
def add_user(username: str, password: str, facebook_id: int):
"""
Inserts a user to the database
:param username: users username
:param password: users password
:param facebook_id: users unique facebook id
:return: nothing
"""
data = []
data.append(username)
data.append(password)
data.append(facebook_id)
data.append(datetime.strftime(datetime.now(), '%Y-%m-%d %H:%M:%S'))
# Adds the data to the table
conn = database.DatabaseConnector.connection
cur = conn.cursor()
try:
cur.execute("INSERT INTO `user`(`username`,`password`,`facebook_id`,`registry_date`) VALUES (?,?,?,?)", data)
except:
cur.execute("UPDATE `user` SET password = ?, facebook_id = ? where username = \"" + username + "\"", data[1:3])
conn.commit()
def assemble_scene_id_list(ref_time, prow, sat, end_date, delta=16):
scene_id_list = []
padded_pr, date_part, location, archive = find_valid_scene(ref_time, prow, sat)
while ref_time < end_date:
scene_str = '{}{}{}{}{}'.format(sat, padded_pr, date_part, location, archive)
print('add scene: {}, for {}'.format(scene_str,
datetime.strftime(ref_time, '%Y-%m-%d')))
scene_id_list.append(scene_str)
ref_time += timedelta(days=delta)
date_part = datetime.strftime(ref_time, '%Y%j')
return scene_id_list
def save_result(self, alpha, epochs, filename, save_path='./'):
#self.result.to_csv(
# os.path.join(save_path,
# filename + datetime.strftime(datetime.now(),
# '%Y%m%d-%H%M.csv'))
#)
ax = self.result.plot(title="Average adherence rate of patients",
legend=True,
yticks=[0.5, 0.6, 0.7, 0.8, 0.9])
ax.set(xlabel = "alpha = {}, epochs = {}".format(alpha, epochs))
fig = ax.get_figure()
fig.savefig(
os.path.join(save_path,
filename + datetime.strftime(datetime.now(),
'plot_%Y%m%d-%H%M.png'))
)
def initLastTime(self, path):
if not os.path.exists(path):
cur_time = dself._now.strftime("%y%m%d %H:%M:%S")
self._last_time = datetime.strptime(cur_time, "%y%m%d %H:%M:%S").isoformat()
return False
last_re = re.compile("last_time : (.*)\\n?")
ifs = open(path, "r")
lines = ifs.readlines()
for l in reversed(lines):
if not l: continue
m = last_re.match(l)
if m is not None:
self._last_time = (m.groups(0))[0]
ifs.close()
return True
ifs.close()
cur_time = self._now.strftime("%y%m%d %H:%M:%S")
self._last_time = datetime.strptime(cur_time, "%y%m%d %H:%M:%S").isoformat()
return False
def create_or_update_user_variable(self, name, value, var_type=2):
try:
if int(var_type) == 3:
if isinstance(value, datetime):
var_value = datetime.strftime(value, '%Y-%m-%d')
elif int(var_type) == 4:
if isinstance(value, time):
var_value = time.strftime(value, '%H:%M')
finally:
var_value = str(value)
params = uservariables['save'].copy()
params['vname'] = name
params['vvalue'] = var_value
params['vtype'] = str(var_type)
req = requests.get(self.base_url, params)
status = req.json()
if status == 'Variable name already exists!':
params['param'] = uservariables['update']['param']
req = requests.get(self.base_url, params)
status = req.json()
return status
def current_time():
ts = time.time()
return datetime.fromtimestamp(ts).strftime(TIME_FORMAT)
# converts a string to a timestamp