def snapshot_resource(self, resource, description, tags):
    """Request a DB snapshot of ``resource`` through ``self.conn``.

    A cluster snapshot is requested when ``resource`` contains the
    ``DBClusterIdentifier`` key; otherwise an instance snapshot is requested.
    The snapshot identifier is ``<period>-<resource id>-<timestamp>-<suffix>``,
    built from ``self.period``, ``self.resolve_backupable_id(resource)``, the
    current local time and ``self.date_suffix``.

    :param resource: mapping describing the database to snapshot
    :param description: accepted but not used by this method
    :param tags: mapping of tag name to tag value
    """
    # Convert the plain mapping into the Key/Value list passed as ``Tags``.
    tag_list = [{"Key": name, "Value": tags[name]} for name in tags]
    timestamp = datetime.today().strftime('%d-%m-%Y-%H-%M-%S')
    snapshot_id = '-'.join([
        self.period,
        self.resolve_backupable_id(resource),
        timestamp,
        self.date_suffix,
    ])
    if 'DBClusterIdentifier' not in resource:
        self.conn.create_db_snapshot(
            DBInstanceIdentifier=self.resolve_backupable_id(resource),
            DBSnapshotIdentifier=snapshot_id,
            Tags=tag_list,
        )
        return
    self.conn.create_db_cluster_snapshot(
        DBClusterIdentifier=self.resolve_backupable_id(resource),
        DBClusterSnapshotIdentifier=snapshot_id,
        Tags=tag_list,
    )
# Example source code using Python's today() (original heading: python类today()的实例源码)
def create(self, vals):
    """Create a name record, closing out the currently open names if needed.

    For an existing person record:

    * If the new name is preferred, every open name (``end_date`` is False)
      of the same person is marked not preferred and its end date is set to
      the start date of the new preferred name (today when none was given).
    * If the new name is not preferred, the old name records are left
      unchanged and the new record's ``start_date`` is set to today.

    NOTE(review): block nesting was reconstructed from a whitespace-stripped
    source; confirm the outer ``else`` pairs with the ``is_preferred`` test.
    """
    person_obj = self.env['hc.res.person']
    open_names = self.search([
        ('person_id', '=', vals.get('person_id')),
        ('end_date', '=', False),
    ])
    if not (vals and vals.get('is_preferred')):
        vals.update({'start_date': datetime.today()})
        return super(PersonName, self).create(vals)

    for old_name in open_names:
        old_name.is_preferred = False
        if vals.get('start_date'):
            old_name.end_date = vals.get('start_date')
            continue
        # No start date supplied: close the old name now and start the new one now.
        old_name.end_date = datetime.today()
        vals.update({'start_date': datetime.today()})
    return super(PersonName, self).create(vals)
def get_event_kicker(event):
    """
    Work out the kicker text to show for ``event``.

    Template usage:
    {% get_event_kicker event as event_kicker %}

    * ``'event'``: the category name when a category is set, else ``'Event'``.
    * ``'traveling exhibition'``: ``'Traveling Exhibition'``.
    * ``'current exhibition'``: ``'Past'``, ``'Upcoming'`` or ``'Current'``
      followed by ``' Exhibition'``, decided by comparing the event's end and
      start dates with the current local time.
    * anything else: the title-cased result of ``event.base_type()``.
    """
    kind = event.event_type
    if kind == 'traveling exhibition':
        return 'Traveling Exhibition'
    if kind == 'event':
        return event.category.name if event.category is not None else 'Event'
    if kind != 'current exhibition':
        return event.base_type().title()

    # The end date is checked first, so an exhibition that is over is always "Past".
    if event.end_date() < datetime.today():
        period = 'Past'
    elif event.start_date() > datetime.today():
        period = 'Upcoming'
    else:
        period = 'Current'
    return '%s Exhibition' % period
def on_status(self, status):
    """Handle one incoming status and always return True.

    The bot's own tweets are ignored, tweets that start with a mention of the
    bot are stored with ``insert_tweet``, and any other tweet is stored as a
    seed (flagged ``SHOULD_TWEET``) once ``self.next_tweet_time`` has passed.
    """
    print("{0}: {1}".format(status.text, status.author.screen_name))
    author_name = status.author.screen_name

    # ignore my tweets
    if author_name == self.api.me().screen_name:
        print("Ignored my tweet")
        return True

    # Save mentions
    mention_prefix = "@{0}".format(self.api.me().screen_name)
    if status.text.startswith(mention_prefix):
        print("Saved mention")
        insert_tweet(status.id, status)
        return True

    if self.next_tweet_time < datetime.today():
        print("Saving normal tweet as seed")
        self.next_tweet_time = self.get_next_tweet_time()
        insert_tweet(status.id, status, bot_flag=SHOULD_TWEET)
    # NOTE(review): nesting reconstructed from whitespace-stripped source;
    # this message is assumed to be printed whether or not a seed was saved.
    print("Ignored this tweet")
    return True
def get_national_geographic_data():
    """Fetch metadata for the first photo of the current month's gallery.

    The gallery file is requested from ``URL00 + '<year>-<month>.json'`` where
    the month is zero-padded to two digits.

    :return: a dict with ``url``, ``title``, ``caption`` and ``credit`` keys,
        or ``None`` when the response status is not 200, the payload has no
        ``items`` key, or ``items`` is empty.
    :raises: whatever ``requests.get`` raises on network failure, including a
        timeout after 30 seconds.
    """
    # Filename with data: .gallery.<currentYear>-<currentMonth>.json
    gallery_url = URL00 + datetime.today().strftime('%Y-%m') + '.json'
    # Without a timeout a stalled server would block the caller indefinitely.
    r = requests.get(gallery_url, timeout=30)
    if r.status_code != 200:
        return None

    data = r.json()
    if 'items' not in data:
        return None
    items = data['items']
    if not items:
        # An empty gallery previously raised IndexError on items[0].
        return None

    current_photo = items[0]
    # TODO: include preferred image size in configuration
    url = current_photo['url'] + current_photo['sizes']['1600']
    return dict(url=url,
                title=current_photo['title'],
                caption=current_photo['caption'],
                credit=current_photo['credit'])
def testSeriesWithoutTimeField(self, current_timestamp):
    """
    Check that ``time`` may be omitted when adding a point to a series.

    The first point is given an explicit time; the second one is not and is
    expected to carry the value returned by the patched ``current_timestamp``.
    """
    now = datetime.today()
    day_before = now - timedelta(days=1)
    current_timestamp.return_value = day_before

    # Point with an explicit timestamp.
    TestSeriesHelper.MySeriesHelper(
        server_name='us.east-1',
        other_tag='ello',
        some_stat=159,
        time=now,
    )
    # Point without a timestamp.
    TestSeriesHelper.MySeriesHelper(
        server_name='us.east-1',
        other_tag='ello',
        some_stat=158,
    )

    explicit, implicit = TestSeriesHelper.MySeriesHelper._json_body_()
    self.assertIn('time', explicit)
    self.assertIn('time', implicit)
    self.assertEqual(explicit['time'], now)
    self.assertEqual(implicit['time'], day_before)
def nth_day_of_week(self, subtree):
    """Store the next occurrence of the weekday named by an ordinal label.

    The label of ``subtree`` must start with one of ``first`` ... ``seventh``;
    the ordinal is mapped to the index 0-6 and passed, together with the
    current local time, to ``self.next_weekday``. The result is stored under
    ``self.dictionary['timestamp']``.

    :param subtree: object whose ``label()`` returns the ordinal label string
    :raises ValueError: when the label does not start with a known ordinal
        (this previously surfaced as an UnboundLocalError)
    """
    ordinals = ('first', 'second', 'third', 'fourth',
                'fifth', 'sixth', 'seventh')
    label = subtree.label()
    for index, ordinal in enumerate(ordinals):
        if label.startswith(ordinal):
            n = index
            break
    else:
        raise ValueError(
            "label %r does not start with an ordinal from 'first' to 'seventh'"
            % (label,))
    self.dictionary['timestamp'] = self.next_weekday(dt.today(), n)
def queue_import_grade_results(year, csv, isgzip, encoding, email=None):
    """
    Run the grade results import for one year (Brio/Hyperion export) and keep
    the table status up to date while doing so.

    :param year: the year to import the results for (eg 2015)
    :type year: int
    :param csv: the CSV file to import, can be gzip compressed
    :type csv: str
    :param isgzip: true if GZIP compressed
    :type isgzip: bool
    :param encoding: the file encoding (eg utf-8)
    :type encoding: str
    :param email: the (optional) email address to send a notification to
    :type email: str
    """
    table = GradeResults._meta.db_table
    update_tablestatus(table, "Importing...")
    msg = import_grade_results(year, csv, isgzip, encoding, email=email)

    # query date from import is used when current year
    if msg is None and datetime.today().year == year:
        return
    update_tablestatus(table, msg=msg)
def get_voting_members():
    """Return the list of member uids that are counted as voting members.

    The list starts as active members minus intro members minus members with
    a co-op entry for the current semester created after the start of the
    year, and is then extended with every uid whose freshman evaluation
    result is "Passed".
    """
    year_end = datetime(start_of_year().year, 12, 31)
    semester = 'Fall' if datetime.today() < year_end else 'Spring'

    active = {member.uid for member in ldap_get_active_members()}
    intro = {member.uid for member in ldap_get_intro_members()}
    coop_rows = CurrentCoops.query.filter(
        CurrentCoops.date_created > start_of_year(),
        CurrentCoops.semester == semester).all()
    away = {member.uid for member in coop_rows}

    voters = list(active - intro - away)

    passed = FreshmanEvalData.query.filter(
        FreshmanEvalData.freshman_eval_result == "Passed"
    ).distinct()
    for record in passed:
        uid = record.uid
        if uid not in voters:
            voters.append(uid)
    return voters
def tick():
    """Redraw the date labels and clock hands, then re-arm the 100 ms timer."""
    now = datetime.today()
    # Fractional values so each hand moves smoothly between whole units.
    seconds = now.second + now.microsecond*0.000001
    minutes = now.minute + seconds/60.0
    hours = now.hour + minutes/60.0

    label_font = ("Courier", 14, "bold")
    tracer(False)
    writer.clear()
    writer.home()
    writer.forward(65)
    writer.write(wochentag(now), align="center", font=label_font)
    writer.back(150)
    writer.write(datum(now), align="center", font=label_font)
    writer.forward(85)
    tracer(True)

    # 6 degrees per second/minute, 30 degrees per hour.
    second_hand.setheading(6*seconds)
    minute_hand.setheading(6*minutes)
    hour_hand.setheading(30*hours)
    tracer(True)
    ontimer(tick, 100)
def __init__(self, base_log_folder, filename_template):
    """
    :param base_log_folder: Base log folder to place logs.
    :param filename_template: template filename string; when it contains
        ``{{`` it is also compiled into ``self.filename_jinja_template``.
    """
    super(FileProcessorHandler, self).__init__()
    self.handler = None
    self.base_log_folder = base_log_folder
    self.dag_dir = os.path.expanduser(conf.get('core', 'DAGS_FOLDER'))
    self.filename_template = filename_template
    self.filename_jinja_template = None

    if "{{" in self.filename_template:  # jinja mode
        self.filename_jinja_template = Template(self.filename_template)

    self._cur_date = datetime.today()

    # Create the log directory without a check-then-create race: another
    # process may create it between an exists() test and makedirs().
    log_directory = self._get_log_directory()
    try:
        os.makedirs(log_directory)
    except OSError:
        # Only swallow the error when the path is really there afterwards.
        if not os.path.exists(log_directory):
            raise

    self._symlink_latest_log_directory()
def get_all_paginate(page, paginate_by, period, **kwargs):
    """Return one page of battery records plus the total match count.

    :param page: page number passed to ``paginate``
    :param paginate_by: page size passed to ``paginate``
    :param period: when greater than 0, only records whose ``rent_date`` is
        within the last ``period`` days (today counts as day one) are kept
    :param kwargs: ``serial_number`` (optional) - pattern matched against
        ``BatteryRecord.battery`` with ``regexp``; a missing key is treated
        the same as an empty value instead of raising KeyError
    :return: ``(records, total)`` where ``total`` is counted before paging
    """
    serial_number = kwargs.get("serial_number")
    battery_record = BatteryRecord.select()

    if period > 0:
        # A period of 1 means "today only", hence the off-by-one adjustment.
        period -= 1
        today = datetime.today().replace(hour=0, minute=0, second=0, microsecond=0)
        before = today - timedelta(days=period)
        battery_record = battery_record.where(
            BatteryRecord.rent_date >= before
        )

    if serial_number:
        battery_record = battery_record.where(
            BatteryRecord.battery.regexp(serial_number)
        )

    total = battery_record.count()
    battery_record = battery_record.order_by(BatteryRecord.id.desc())
    battery_record = battery_record.paginate(page=page, paginate_by=paginate_by)
    return battery_record, total
def on_status(self, status):
    """Process a received status; the return value is always True.

    Own tweets are skipped, tweets beginning with ``@<own screen name>`` are
    saved via ``insert_tweet``, and other tweets are saved as a seed with
    ``bot_flag=SHOULD_TWEET`` when ``self.next_tweet_time`` is in the past.
    """
    print("{0}: {1}".format(status.text, status.author.screen_name))
    sender = status.author.screen_name

    # ignore my tweets
    if sender == self.api.me().screen_name:
        print("Ignored my tweet")
        return True

    # Save mentions
    if status.text.startswith("@{0}".format(self.api.me().screen_name)):
        print("Saved mention")
        insert_tweet(status.id, status)
        return True

    seed_due = self.next_tweet_time < datetime.today()
    if seed_due:
        print("Saving normal tweet as seed")
        self.next_tweet_time = self.get_next_tweet_time()
        insert_tweet(status.id, status, bot_flag=SHOULD_TWEET)
    # NOTE(review): nesting reconstructed from whitespace-stripped source;
    # this message is assumed to be printed on every pass through this branch.
    print("Ignored this tweet")
    return True
def close(self, cr, uid, ids, reason_id, context=None):
    """Close the given posts, recording who closed them, when and why.

    Returns False without changing anything when any of the posts has a
    ``parent_id``. When the reason is the offensive or spam reason, karma is
    added to each post's creator using five times the forum's
    ``karma_gen_question_downvote`` value.
    """
    for post in self.browse(cr, uid, ids, context=context):
        if post.parent_id:
            return False

    model_data = self.pool['ir.model.data']
    reason_offensive = model_data.xmlid_to_res_id(cr, uid, 'website_forum.reason_7')
    reason_spam = model_data.xmlid_to_res_id(cr, uid, 'website_forum.reason_8')

    if reason_id in (reason_offensive, reason_spam):
        users = self.pool['res.users']
        for post in self.browse(cr, uid, ids, context=context):
            _logger.info('Downvoting user <%s> for posting spam/offensive contents',
                         post.create_uid)
            # TODO: in master, consider making this a tunable karma parameter
            users.add_karma(cr, SUPERUSER_ID, [post.create_uid.id],
                            post.forum_id.karma_gen_question_downvote * 5,
                            context=context)

    closing_values = {
        'state': 'close',
        'closed_uid': uid,
        'closed_date': datetime.today().strftime(tools.DEFAULT_SERVER_DATETIME_FORMAT),
        'closed_reason_id': reason_id,
    }
    self.pool['forum.post'].write(cr, uid, ids, closing_values, context=context)
def mac802154_BM(self,raw,size):
    """Print one received frame whose header is 0xa821 or 0xa802.

    Fields are read from ``raw`` with ``struct.unpack_from``: header at
    offset 0, sequence number at 2, receiver PAN id at 3, receiver address
    at 5, sender address at 7 and the RSSI in the last byte (``size-1``).
    For header 0xa821 the bytes from offset 9 up to the RSSI byte are printed
    as hex; for header 0xa802 the line is marked ``(ACK)``. Any other header
    is handed to ``self.mac802154_unsupported_format``.
    """
    global separator

    def read(fmt, offset):
        # Single field at a byte offset in the raw buffer.
        return struct.unpack_from(fmt, raw, offset)[0]

    header = read('H', 0)
    if header != 0xa821 and header != 0xa802:
        self.mac802154_unsupported_format(raw,size)
        return

    seq = read('B', 2)
    rxPanid = read('H', 3)
    rxAddr = read('H', 5)
    txAddr = read('H', 7)
    rssi = read('B', size-1)
    columns = (datetime.today(),"0x%04x"%header,"0x%02x"%seq,"0x%04x"%rxPanid,
               "0x%04x"%rxAddr,"0x%04x"%txAddr,"%03d "%rssi)

    if header == 0xa802:
        print(*(columns + ("(ACK)",)),sep=separator)
        return

    # Data frame: keep the cursor on the same line and append the payload bytes.
    print(*columns,sep=separator,end=separator)
    for offset in range(9,size-1):
        print(str("%02x"%read('B', offset)),sep="",end=separator)
    print("")
    return
def action_asignado(self):
    """Move the ticket to the 'asignado' state and notify the assignee.

    Raises ``osv.except_osv`` when ``self.asignacion`` is empty. Otherwise the
    assignment date is stamped, the number of days since ``fecha_recibido``
    is stored, a status message is sent, the assignee is subscribed to the
    record and an e-mail is created and sent. Returns True.
    """
    if not self.asignacion:
        raise osv.except_osv(('Error'),('Debes llenar el campo: asignado a'))

    self.fecha_asignado_a = datetime.today()
    elapsed = self.calcular_dias(self.fecha_recibido, self.fecha_asignado_a)
    self.dia_asignado_a = elapsed.days
    self.state = 'asignado'
    self.enviar_mensaje_status()
    self.message_subscribe_users(user_ids=[self.asignacion.id])

    # SEND E-MAIL
    body_template = (
        "Se le ha asignado una Ticket en Help Desk:<br>\n"
        "Codigo: %s,<br>\n"
        "Asunto: %s,<br>\n"
        "Descripcion: %s,<br> "
    )
    cuerpo_mensaje = body_template % (self.codigo, self.denominacion, self.descripcion)
    const_mail = {
        'email_from': self.solicitante_id.email,
        'email_to': self.asignacion.login,
        #'partner_ids' : [(0,0,{'res_partner_id':self.asignacion.partner_id, 'mail_message_id': ids_mail})],
        'subject': "Re: %s" % self.codigo,
        'body_html': cuerpo_mensaje,
    }
    ids_mail = self.env['mail.mail'].create(const_mail).send()
    # END OF E-MAIL
    return True
def flag_forgotten_entries(session, today=None):
    """Flag entries from earlier days that were never signed out.

    Every entry with no ``time_out``, not yet flagged as forgotten and dated
    before ``today`` is passed to ``sign_out(..., forgot=True)`` and added to
    the session, which is then committed.

    :param session: SQLAlchemy session through which to access the database.
    :param today: (optional) The current date as a `datetime.date` object. Used for testing.
    """ # noqa
    if today is None:
        today = date.today()

    query = session.query(Entry)
    query = query.filter(Entry.time_out.is_(None))
    query = query.filter(Entry.forgot_sign_out.is_(False))
    query = query.filter(Entry.date < today)

    for stale in query:
        flagged = sign_out(stale, forgot=True)
        logger.debug('Signing out forgotten entry: {}'.format(flagged))
        session.add(flagged)

    # NOTE(review): nesting reconstructed; commit is assumed to run once, after the loop.
    session.commit()
def sign_out(entry, time_out=None, forgot=False):
    """Sign out of an existing timesheet entry, or flag it as forgotten.

    When ``forgot`` is true the entry is only flagged and its ``time_out`` is
    left untouched; otherwise ``time_out`` is stored on the entry.

    :param entry: `models.Entry` object. The entry to sign out.
    :param time_out: (optional) `datetime.time` object. Specify the sign out time. Defaults to the current local time.
    :param forgot: (optional) If true, user forgot to sign out. Entry will be flagged as forgotten.
    :return: The signed out entry.
    """ # noqa
    sign_out_time = datetime.today().time() if time_out is None else time_out

    if not forgot:
        entry.time_out = sign_out_time
    else:
        entry.forgot_sign_out = True
        logger.info(
            '{} forgot to sign out on {}.'.format(entry.user_id, entry.date)
        )

    # NOTE(review): nesting reconstructed; this message is assumed to be logged in both cases.
    logger.info('{} ({}) signed out.'.format(entry.user_id, entry.user_type))
    return entry
def get_domain(self):
    """Yield the non-empty domains produced for every configured seed.

    Configuration keys read from ``self.configuration``: ``date`` (a datetime
    or a ``YYYY-MM-DD`` string, defaulting to now), and either both
    ``min_seed`` and ``max_seed`` or a single ``seed``. For each seed in the
    inclusive range, positions 0-11 are passed to ``self._lockyDGA`` and
    falsy results are skipped. The process exits when no seed is configured.
    """
    config = self.configuration
    date = config.get("date", datetime.today())
    single = config.get("seed")
    low = config.get("min_seed")
    high = config.get("max_seed")

    if low and high:
        low, high = int(low), int(high)
    elif single:
        low = high = int(single)
    else:
        log.error("Seed is missing from config")
        sys.exit()

    if isinstance(date, str):
        date = datetime.strptime(date, "%Y-%m-%d")

    for seed in range(low, high + 1):
        for pos in range(12):
            candidate = self._lockyDGA(pos, seed, date)
            if candidate:
                yield candidate
def get_domain(self):
    """Yield the result of ``self._lockyDGA`` for positions 0-7 of every seed.

    Configuration keys read from ``self.configuration``: ``date`` (a datetime
    or a ``YYYY-MM-DD`` string, defaulting to now), and either both
    ``min_seed`` and ``max_seed`` or a single ``seed``. Every result is
    yielded as-is, including falsy ones. The process exits when no seed is
    configured.
    """
    settings = self.configuration
    date = settings.get("date", datetime.today())
    lone_seed = settings.get("seed")
    first = settings.get("min_seed")
    last = settings.get("max_seed")

    if first and last:
        first, last = int(first), int(last)
    elif lone_seed:
        first = last = int(lone_seed)
    else:
        log.error("Seed is missing from config")
        sys.exit()

    if isinstance(date, str):
        date = datetime.strptime(date, "%Y-%m-%d")

    for seed in range(first, last + 1):
        for pos in range(8):
            yield self._lockyDGA(pos, seed, date)
def tick():
    """Redraw the clock for the current time and re-arm the 100 ms timer.

    A ``Terminator`` raised while drawing is swallowed so the timer chain
    simply stops.
    """
    now = datetime.today()
    # Fractional values so each hand moves smoothly between whole units.
    seconds = now.second + now.microsecond*0.000001
    minutes = now.minute + seconds/60.0
    hours = now.hour + minutes/60.0
    label_font = ("Courier", 14, "bold")
    try:
        tracer(False)  # Terminator can occur here
        writer.clear()
        writer.home()
        writer.forward(65)
        writer.write(wochentag(now), align="center", font=label_font)
        writer.back(150)
        writer.write(datum(now), align="center", font=label_font)
        writer.forward(85)
        tracer(True)
        second_hand.setheading(6*seconds)  # or here
        minute_hand.setheading(6*minutes)
        hour_hand.setheading(30*hours)
        tracer(True)
        ontimer(tick, 100)
    except Terminator:
        pass  # turtledemo user pressed STOP
def main():
    """Download today's pictures, average their colours and optionally tweet.

    Pictures are downloaded for every account in ``top100`` and the average
    image is built from the ``.jpg`` files in ``pictureFolder``. The two
    twitter bots are only updated when a first command-line argument longer
    than two characters is supplied; when the argument is missing or too
    short the function reports that and finishes without tweeting.
    """
    print('downloading pictures:')
    for c, account in enumerate(top100, 1):
        print(c, 'Pictures from today on '+account+'\'s Instagram')
        picDownloader(account)

    print('finding dominant colour')
    milpy.directory_image_average(pictureFolder, '.jpg')

    # The argument is optional: indexing sys.argv[1] unguarded raised
    # IndexError instead of reaching the "no password" message below.
    has_password = len(sys.argv) > 1 and len(sys.argv[1]) > 2
    if not has_password:
        print('no twitter password entered, terminating without tweeting')
        return

    api = make_twitter_api('InstaTopEmoji')  # updates InstaTopEmoji twitterbot
    for i, j in emojiCounter():
        api.update_status('The most popular emoji on Celebrity Instagram yesterday was: '+ i +' which was used ' +str(j)+' times')

    api = make_twitter_api('instaverage')  # updates instaverage twitterbot
    tweetpic = os.path.join(pictureFolder, '1_average_colors.jpg')
    api.update_with_media(tweetpic, status='the dominant colours on Celebrity Instagram yesterday were...')
def __init__(self, tid):
    """Store ``tid``, capture the current local time twice and build the client.

    ``self.now`` and ``self.today`` come from separate ``datetime.now()`` and
    ``datetime.today()`` calls; ``self.ds`` is whatever ``self.get_client()``
    returns.
    """
    self.tid, self.now, self.today = tid, datetime.now(), datetime.today()
    self.ds = self.get_client()
def next_available_date(self):
    """Return the date on which the next checkout becomes possible.

    This is the ``due_on`` of the lendable with the earliest
    ``checked_out_on``, or the current local time when no such lendable exists.
    """
    try:
        earliest_checkout = self.lendables.earliest('checked_out_on')
    except ObjectDoesNotExist:
        return datetime.today()
    return earliest_checkout.due_on
def generate_wildcard_pem_bytes():
    """
    Generate a wildcard (subject name '*') self-signed certificate valid for
    10 years.

    The validity window starts one day before the current UTC time and ends
    3650 days after it. Both bounds derive from a single UTC reading because
    the certificate builder interprets naive datetimes as UTC; the previous
    mix of local ``today()`` and ``now()`` skewed the window by the local
    UTC offset.

    https://cryptography.io/en/latest/x509/tutorial/#creating-a-self-signed-certificate

    :return: Bytes representation of the PEM certificate data (private key
        followed by the certificate)
    """
    key = generate_private_key(u'rsa')
    name = x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, u'*')])
    now = datetime.utcnow()
    cert = (
        x509.CertificateBuilder()
        .issuer_name(name)
        .subject_name(name)
        # Back-date by a day so clock skew on clients does not reject the cert.
        .not_valid_before(now - timedelta(days=1))
        .not_valid_after(now + timedelta(days=3650))
        .serial_number(int(uuid.uuid4()))
        .public_key(key.public_key())
        .sign(
            private_key=key,
            algorithm=hashes.SHA256(),
            backend=default_backend())
    )
    return b''.join((
        key.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.TraditionalOpenSSL,
            encryption_algorithm=serialization.NoEncryption()),
        cert.public_bytes(serialization.Encoding.PEM)
    ))
def work_on(task_id=0, start_time_str=None):
    '''Start the task with the given id.

    Logs an error when no task has that id. When ``start_time_str`` is given
    it is prefixed with today's date (``YYYY-MM-DD``) and passed to the new
    ``Task`` as ``start_str``; otherwise ``start_str`` is None.
    '''
    matches = get_tasks(condition=lambda x: x.tid == task_id)
    matches = group_task_by(matches, group='name')
    if not matches:
        LOGGER.error("could not find task ID '%s'", task_id)
        return

    found = matches[0]
    start_time = None
    if start_time_str:
        today_str = datetime.today().strftime('%Y-%m-%d')
        start_time = today_str + ' ' + start_time_str
    Task(found.name, start_str=start_time).start()
def oncommand(self, command, ctx):
    """Record an invoked command in ``debug.txt`` when debugging is on.

    The record holds the prefixed command, the message content, a timestamp,
    the author name and the server name. It is appended (after a blank line)
    when the file exists, otherwise the file is created.
    """
    if not self.debug:
        return

    # We're Debugging
    timeStamp = datetime.today().strftime("%Y-%m-%d %H.%M")
    message = ctx.message
    record = '{}{}:\n"{}"\nRun at {}\nBy {}\nOn {}'.format(
        ctx.prefix, command, message.content, timeStamp,
        message.author.name, message.server.name)

    if os.path.exists('debug.txt'):
        # Exists - let's append
        mode = "ab"
        record = "\n\n" + record
    else:
        mode = "wb"

    payload = record.encode("utf-8")
    with open("debug.txt", mode) as log_file:
        log_file.write(payload)
def oncommandcompletion(self, command, ctx):
    """Record a completed command in ``debug.txt`` when debugging is on.

    The record holds the prefixed command, the message content, a timestamp,
    the author name and the server name. It is appended (after a blank line)
    when the file exists, otherwise the file is created.
    """
    if not self.debug:
        return

    # We're Debugging
    timeStamp = datetime.today().strftime("%Y-%m-%d %H.%M")
    source = ctx.message
    record = '{}{}:\n"{}"\nCompleted at {}\nBy {}\nOn {}'.format(
        ctx.prefix, command, source.content, timeStamp,
        source.author.name, source.server.name)

    already_there = os.path.exists('debug.txt')
    if already_there:
        # Exists - let's append
        record = "\n\n" + record
    payload = record.encode("utf-8")
    with open("debug.txt", "ab" if already_there else "wb") as log_file:
        log_file.write(payload)
async def heartbeat(self, ctx):
    """Write to the console and attempt to send a message (owner only).

    Declared ``async`` because the body awaits ``self.bot.send_message``; a
    plain ``def`` containing ``await`` does not compile.

    The owner id is read from ``self.settings.serverDict['Owner']``. When no
    owner is set, or the message author is not the owner, an explanatory
    message is sent to the channel and nothing else happens.
    """
    author = ctx.message.author
    channel = ctx.message.channel

    owner = self.settings.serverDict.get('Owner') if hasattr(
        self.settings.serverDict, 'get') else None
    if owner is None:
        try:
            owner = self.settings.serverDict['Owner']
        except KeyError:
            owner = None

    if owner is None:
        # No previous owner, let's set them
        msg = 'I cannot adjust debugging until I have an owner.'
        await self.bot.send_message(channel, msg)
        return

    if author.id != owner:
        # Not the owner
        msg = 'You are not the *true* owner of me. Only the rightful owner can change this setting.'
        await self.bot.send_message(channel, msg)
        return

    timeStamp = datetime.today().strftime("%Y-%m-%d %H.%M")
    print('Heartbeat tested at {}.'.format(timeStamp))
    # Message send
    message = await self.bot.send_message(ctx.message.channel, 'Heartbeat tested at {}.'.format(timeStamp))
    if message:
        print('Message:\n{}'.format(message))
    else:
        print('No message returned.')
async def backup(self):
    """Periodically flush the settings to a timestamped file and prune old ones.

    Declared ``async`` because the body awaits ``asyncio.sleep``; a plain
    ``def`` containing ``await`` does not compile.

    After an initial wait of ``self.backupWait`` seconds, the loop runs until
    ``self.bot.is_closed`` is truthy. Each pass creates ``self.backupDir``
    if needed, writes ``Backup-<timestamp>.json`` through
    ``self.flushSettings``, removes the oldest files (by modification time)
    beyond ``self.backupMax``, and sleeps ``self.backupTime`` seconds.

    Files are addressed by path joined onto ``self.backupDir`` rather than by
    changing the working directory, so a failure while pruning can no longer
    leave the process in the backup directory.
    """
    # Wait initial time - then start loop
    await asyncio.sleep(self.backupWait)
    while not self.bot.is_closed:
        # Initial backup - then wait
        if not os.path.exists(self.backupDir):
            # Create it
            os.makedirs(self.backupDir)
        # Flush backup
        timeStamp = datetime.today().strftime("%Y-%m-%d %H.%M")
        self.flushSettings("./{}/Backup-{}.json".format(self.backupDir, timeStamp))

        # Backups sorted oldest first, as paths relative to the current directory
        backups = sorted(
            (os.path.join(self.backupDir, name)
             for name in os.listdir(self.backupDir)),
            key=os.path.getmtime)

        # Prune whatever exceeds the configured maximum, oldest first
        numberToRemove = max(len(backups) - self.backupMax, 0)
        for stale in backups[:numberToRemove]:
            os.remove(stale)

        if numberToRemove:
            print("Settings Backed Up ({} removed): {}".format(numberToRemove, timeStamp))
        else:
            print("Settings Backed Up: {}".format(timeStamp))
        await asyncio.sleep(self.backupTime)