def Get(icpe, from_date=None, to_date=None):
    """Return aggregated power readings for *icpe* between two dates.

    Args:
        icpe: MAC address of the iCPE whose power data is requested.
        from_date (datetime, optional): start of the window; defaults to
            seven days before *to_date*, resolved at call time.
        to_date (datetime, optional): end of the window; defaults to now,
            resolved at call time.

    Returns:
        dict with 'icpe' and a 'power' list of per-date aggregates,
        or False when no rows match.
    """
    # BUG FIX: the original signature used `datetime.now()` expressions as
    # default values, so they were evaluated once at import time and the
    # window never moved for the lifetime of the process. Resolve per call.
    if to_date is None:
        to_date = datetime.now()
    if from_date is None:
        from_date = to_date - timedelta(days=7)
    power_data = SQL.session.query(
        PowerModel,
        label('low', func.min(PowerModel.low)),
        label('high', func.max(PowerModel.high)),
        label('total', func.sum(PowerModel.average)),
        label('date', PowerModel.date)).\
        join(iCPEModel).\
        filter(iCPEModel.mac_address == icpe).\
        filter(PowerModel.date > from_date).\
        filter(PowerModel.date < to_date).\
        group_by(PowerModel.date).all()
    if not power_data:
        return False
    ret_json = {'icpe': icpe}
    ret_json['power'] = []
    for data in power_data:
        ret_json['power'].append({'date': str(data.date), 'low': data.low,
                                  'high': data.high, 'total': data.total})
    return ret_json
# Section marker from the original listing: "Python func.max() usage examples"
def latest(icpe):
    """Return the most recent aggregated heat reading for *icpe*.

    Args:
        icpe: MAC address of the iCPE.

    Returns:
        dict with 'icpe', 'date', 'low', 'high' and 'total' keys,
        or False when no heat data exists for the device.
    """
    heat_data = SQL.session.query(
        HeatModel,
        label('low', func.min(HeatModel.low)),
        label('high', func.max(HeatModel.high)),
        label('total', func.sum(HeatModel.average)),
        label('date', HeatModel.date)).\
        join(iCPEModel).\
        filter(iCPEModel.mac_address == icpe).\
        order_by(HeatModel.date.desc()).\
        group_by(HeatModel.date).first()
    if not heat_data:
        return False
    # BUG FIX: the original referenced the undefined name `power_data` for
    # the 'low' and 'total' keys, raising NameError whenever data was found.
    return {'icpe': icpe, 'date': str(heat_data.date), 'low': heat_data.low,
            'high': heat_data.high, 'total': heat_data.total}
def get(icpe, from_date=None, to_date=None):
    """Return aggregated heat readings for *icpe* between two dates.

    Args:
        icpe: MAC address of the iCPE whose heat data is requested.
        from_date (datetime, optional): start of the window; defaults to
            seven days before *to_date*, resolved at call time.
        to_date (datetime, optional): end of the window; defaults to now,
            resolved at call time.

    Returns:
        dict with 'icpe' and a 'heat' list of per-date aggregates,
        or False when no rows match.
    """
    # BUG FIX: the original defaults called datetime.now() at import time,
    # so the date window was frozen for the process lifetime.
    if to_date is None:
        to_date = datetime.now()
    if from_date is None:
        from_date = to_date - timedelta(days=7)
    heat_data = SQL.session.query(
        HeatModel,
        label('low', func.min(HeatModel.low)),
        label('high', func.max(HeatModel.high)),
        label('total', func.sum(HeatModel.average)),
        label('date', HeatModel.date)).\
        join(iCPEModel).\
        filter(iCPEModel.mac_address == icpe).\
        filter(HeatModel.date > from_date).\
        filter(HeatModel.date < to_date).\
        group_by(HeatModel.date).all()
    if not heat_data:
        return False
    ret_json = {'icpe': icpe}
    ret_json['heat'] = []
    for data in heat_data:
        ret_json['heat'].append({'date': str(data.date), 'low': data.low,
                                 'high': data.high, 'total': data.total})
    return ret_json
def latest(icpe):
    """Return the newest aggregated power reading for *icpe*, or False
    when the device has no power data."""
    query = SQL.session.query(
        PowerModel,
        label('low', func.min(PowerModel.low)),
        label('high', func.max(PowerModel.high)),
        label('total', func.sum(PowerModel.average)),
        label('date', PowerModel.date))
    query = query.join(iCPEModel)
    query = query.filter(iCPEModel.mac_address == icpe)
    query = query.order_by(PowerModel.date.desc())
    newest = query.group_by(PowerModel.date).first()
    if not newest:
        return False
    return {
        'icpe': icpe,
        'date': str(newest.date),
        'low': newest.low,
        'high': newest.high,
        'total': newest.total,
    }
def count_water_stock(message):
    """Report the current bottled-water stock to the Slack channel.

    :param message: slackbot message object used to reply
    """
    session = Session()
    stock_number, latest_ctime = (
        session.query(
            func.sum(WaterHistory.delta),
            func.max(case(whens=((WaterHistory.delta != 0,
                                  WaterHistory.ctime),), else_=None))
        ).first()
    )
    if not stock_number:
        message.send('??????????')
        return
    # SQLite may hand back the timestamp as a string; normalize to datetime.
    if not isinstance(latest_ctime, datetime.datetime):
        latest_ctime = datetime.datetime.strptime(
            latest_ctime, '%Y-%m-%d %H:%M:%S')
    message.send('??: {}? ({:%Y?%m?%d?} ??)'
                 .format(stock_number, latest_ctime))
def get_latest_soil_humidity():
    """Gets latest info about soil humidity and predicts waterings.

    Returns:
        list: one publishable dict per sensor; when a prior watering is
        known, the dict also carries 'last_watering_timestamp' and
        'next_watering_timestamp' keys.
    """
    pub = []
    # Latest record per sensor (MySQL-style HAVING MAX(...) idiom);
    # __to_pub_list converts the rows into plain publishable dicts.
    records = __to_pub_list(
        HygroRecord.query.group_by(HygroRecord.sensor_uuid)
        .having(func.max(HygroRecord.timestamp)).all())
    for r in records:
        last_watering_timestamp = analytics\
            ._get_last_watering_timestamp(r['sensor_uuid'])
        if last_watering_timestamp:
            # Fit a drying-curve polynomial since the last watering and
            # extrapolate when the next watering will be needed.
            polyn = analytics._get_polynomial(
                r['sensor_uuid'], last_watering_timestamp)
            next_watering_timestamp = analytics\
                ._predict_next_watering(polyn, last_watering_timestamp)
            r['last_watering_timestamp'] = last_watering_timestamp
            r['next_watering_timestamp'] = next_watering_timestamp
        # NOTE(review): original indentation was lost; this assumes every
        # record is published even without watering data -- confirm.
        pub.append(r)
    return pub
def highest_block(cls, session):
    """
    Return integer result of MAX(block_num) db query.
    This does not have the same meaning as last irreversible block, ie, it
    makes no claim that all blocks lower than the MAX(block_num) exist
    in the database.
    Args:
        session (sqlalchemy.orm.session.Session):
    Returns:
        int: 0 when the table is empty.
    """
    result = session.query(func.max(cls.block_num)).scalar()
    return result if result else 0
def estimate_remaining_time(session, spawn_id, seen):
    """Estimate (soonest, latest) seconds remaining for a spawn sighting.

    Falls back to the default 90/1800 window when the spawn point has no
    recorded history.
    """
    first, last = get_first_last(session, spawn_id)
    if not first:
        return 90, 1800
    # Widen the observed window to include the current sighting.
    last = max(last, seen)
    first = min(first, seen)
    if last - first > 1710:
        # Window is nearly the full hour; bound by all four candidates.
        candidates = (first + 90, last + 90, first + 1800, last + 1800)
        estimates = [time_until_time(t, seen) for t in candidates]
        return min(estimates), max(estimates)
    return time_until_time(last + 90, seen), time_until_time(first + 1800, seen)
def get_latest_runs(cls, session):
    """Returns the latest DagRun for each DAG."""
    # Per-DAG maximum execution date, as a derived table.
    latest_dates = (
        session.query(cls.dag_id,
                      func.max(cls.execution_date).label('execution_date'))
        .group_by(cls.dag_id)
        .subquery()
    )
    # Join back to pick the full DagRun row matching each maximum.
    join_cond = and_(cls.dag_id == latest_dates.c.dag_id,
                     cls.execution_date == latest_dates.c.execution_date)
    return session.query(cls).join(latest_dates, join_cond).all()
def test_00_verify_db(self, testdb):
    """Sanity-check the seeded fixture data before the real tests run."""
    expected_projects = [
        (1, 'P1', 'ProjectOne', True),
        (2, 'P2', 'ProjectTwo', True),
        (3, 'P3Inactive', 'ProjectThreeInactive', False),
    ]
    for proj_id, name, notes, active in expected_projects:
        proj = testdb.query(Project).get(proj_id)
        assert proj is not None
        assert proj.name == name
        assert proj.notes == notes
        assert proj.is_active is active
    # The fixtures must not have grown beyond the expected max ids.
    assert testdb.query(Project).with_entities(
        func.max(Project.id)).scalar() == 3
    assert testdb.query(BoMItem).with_entities(
        func.max(BoMItem.id)).scalar() == 5
def max_t(self):
    """
    The population number of the last populations.
    This is equivalent to ``n_populations - 1``.
    """
    row = (self._session.query(func.max(Population.t))
           .join(ABCSMC)
           .filter(ABCSMC.id == self.id)
           .one())
    return row[0]
def _cleanup(self):
    """Delete on-disk repository directories that no recently-deployed
    environment references anymore.

    A directory survives when its environment had a deployment queued
    within the last ``self.max_unused_age``.
    """
    if not os.path.exists(self.base_repos_path):
        logger.info("Base repo path {} does not exist yet, nothing to do.".format(self.base_repos_path))
        return
    now = datetime.utcnow()
    with session_scope() as session:
        # Start from everything on disk; subtract what must be kept.
        deletion_candidates = set(os.listdir(self.base_repos_path))
        # Per environment, the timestamp of its most recent queued deployment.
        subquery = session.query(m.Environment.id, func.max(m.DeploymentView.queued_date).label("max_queued_date")).\
            filter(m.Environment.id == m.DeploymentView.environment_id).\
            group_by(m.Environment.id).subquery()
        recently_deployed_envs = session.query(m.Environment).\
            join((subquery, subquery.c.id == m.Environment.id)).\
            filter(subquery.c.max_queued_date > now - self.max_unused_age).\
            all()
        to_keep = set(e.local_repo_directory_name for e in recently_deployed_envs)
        for path in deletion_candidates - to_keep:
            path = os.path.join(self.base_repos_path, path)
            if os.path.exists(path):
                # Hold both repo locks so no concurrent fetch or write
                # races the rmtree.
                with lock_repository_fetch(path), lock_repository_write(path):
                    rmtree(path)
                    logger.info(
                        "Deleted unused directory {}".format(path)
                    )
def _latest_version(cls, session, row, use_dirty=True):
    """
    :param session: a session instance to execute a select on the log table
    :param row: an instance of the user table row object
    :return: the maximum version ID recorded for the specified row or None
        if version_id has not been inserted
    :rtype: int
    """
    where_clause = utils.generate_and_clause(
        cls, row, cls._version_col_names, use_dirty=use_dirty)
    query = sa.select([func.max(cls.va_version)]).where(where_clause)
    first_row = session.execute(query).first()
    if first_row is None:
        return None
    return first_row[0]
def post_add_user_sql(name, password, email):
    """Gets current max uid + 1, and then creates new User.

    Raises:
        ValueError: when *name* is already taken.
    Returns:
        tuple: (uid, gid) assigned to the new user.
    """
    # Username must be unique.
    existing = DBS.query(User).filter_by(name=name).first()
    if existing:
        raise ValueError('Username already exists')
    # Allocate the next uid/gid; an empty table yields None -> start at 1.
    max_uid = DBS.query(func.max(User.uid)).first()[0]
    uid = max_uid + 1 if max_uid else 1
    max_gid = DBS.query(func.max(Group.gid)).first()[0]
    gid = max_gid + 1 if max_gid else 1
    DBS.add(User(gid, uid, name, hash_password(password), email))
    DBS.add(Group(gid, "No Name"))
    transaction.commit()
    return uid, gid
def prob_view_get(hid, pid):
    """Render the problem-view page together with a per-user ranking table.

    Args:
        hid: homework id, or -1 when the problem is viewed standalone.
        pid: problem id.
    """
    result, ok, problem, homework = checkValid(hid, pid)
    if not ok:
        # checkValid returned an error response; pass it straight through.
        return result
    form = SubmitForm()
    # Ranking table column headers.
    plist = ['rank', 'user name', problem.name, 'Entries', 'last submit time']
    # Aggregate per user: total real score, best score, total submissions,
    # latest submission time; only users with a positive score are ranked.
    result = db.session.query(ProbUserStatic.user_id, func.sum(ProbUserStatic.real_score), func.max(ProbUserStatic.score), func.sum(ProbUserStatic.submit_times), func.max(ProbUserStatic.last_time)). \
        filter(ProbUserStatic.prob_id == problem.id, ProbUserStatic.score > 0) \
        .group_by(ProbUserStatic.user_id).order_by(func.sum(ProbUserStatic.real_score)).all()
    prank = []
    # Query sorts ascending by total real score; reverse for best-first rank.
    result = reversed(result)
    for i, res in enumerate(result):
        name = User.query.filter_by(id=res[0]).first().name
        prank.append([i + 1, name, res[2], res[3], res[4]])
    if hid == -1:
        return render_template('prob_view.html', problem=problem, form=form, hid=-1, data=problem.data, active='problem', attach = False, plist = plist, prank = prank)
    attach = os.path.exists(os.path.join(app.config['UPLOAD_FOLDER'], problem.data.attach))
    return render_template('prob_view.html', problem=problem, form=form, hid=hid, data=problem.data, active='homework', attach =attach, plist = plist, prank = prank)
def download_latest():
    """
    Download and save the latest submission files possibly overwriting existing, presumably older files
    :return:
    """
    # NOTE(review): this snippet uses the Python 2 print statement.
    # For each file name, find the epoch of its most recent submission.
    q1 = select([table.c.file_name, func.max(table.c.sent_time_epoch).label('sent_time_max_epoch')]).group_by('file_name')
    latest_files = engine.execute(q1).fetchall()
    permitted = list(permitted_file_names())
    for f in latest_files:
        # Fetch the full row matching the newest submission of this file.
        selection = [table.c.sender_name, table.c.file_name, table.c.url]
        q2 = select(selection).where(
            and_(table.c.file_name == f['file_name'], table.c.sent_time_epoch == f['sent_time_max_epoch']))
        r = engine.execute(q2).fetchone()
        file_name = r['file_name'].lower()
        sender_name = r['sender_name']
        if file_name not in permitted:
            print 'Notify %(sender_name)s that "%(file_name)s" is not a valid file name' % locals()
            # download either way just in case
        dest = os.path.join(settings.csv_dir, file_name)
        content = file_transfer.download(r['url'])
        # Skip saving when the download link had already expired.
        if 'MD5 token has expired' not in content:
            with open(dest, 'wb') as out:
                out.write(content)
def dashboard(period=None):
    """Render the dashboard page for the requested period (default '1w')."""
    period, days = PERIODS.get(period, PERIODS['1w'])
    # Overall year span of all recorded scrobbles.
    year_col = func.extract('year', Scrobble.played_at)
    first_year, last_year = db.session.query(
        func.min(year_col), func.max(year_col)).first()
    return render_template(
        'dashboard.html',
        period=period,
        year_min=int(first_year),
        year_max=int(last_year),
    )
def run_ids():
    """Print, per run_id, the max generation and the material count for
    materials whose retest passed or has not happened yet."""
    columns = [materials.c.run_id,
               func.max(materials.c.generation),
               func.count(materials.c.id)]
    # `== None` / `== True` compile to SQL IS NULL / = TRUE in SQLAlchemy.
    passed_or_untested = or_(materials.c.retest_passed == None,
                             materials.c.retest_passed == True)
    run_col = materials.c.run_id
    query = (select(columns, passed_or_untested)
             .group_by(run_col).order_by(asc(run_col)))
    print('\nrun-id\t\t\t\tgenerations\tmaterial')
    result = engine.execute(query)
    for row in result:
        print('%s\t%s\t\t%s' % (row[0], row[1], row[2]))
    result.close()
def get_lastest_dht11():
    """Return the most recent DHT11 record per sensor as a pub list."""
    newest_per_sensor = (DHT11Record.query
                         .group_by(DHT11Record.sensor_uuid)
                         .having(func.max(DHT11Record.timestamp))
                         .all())
    return __to_pub_list(newest_per_sensor)
def get_lastest_photocell():
    """Return the most recent photocell record per sensor as a pub list."""
    newest_per_sensor = (PhotocellRecord.query
                         .group_by(PhotocellRecord.sensor_uuid)
                         .having(func.max(PhotocellRecord.timestamp))
                         .all())
    return __to_pub_list(newest_per_sensor)
def get_lastest_rain():
    """Return the most recent rain record per sensor as a pub list."""
    newest_per_sensor = (RainRecord.query
                         .group_by(RainRecord.sensor_uuid)
                         .having(func.max(RainRecord.timestamp))
                         .all())
    return __to_pub_list(newest_per_sensor)
def get_lastest_bmp():
    """Return the most recent BMP record per sensor as a pub list."""
    newest_per_sensor = (BMPRecord.query
                         .group_by(BMPRecord.sensor_uuid)
                         .having(func.max(BMPRecord.timestamp))
                         .all())
    return __to_pub_list(newest_per_sensor)
def get_session_stats(session):
    """Summarize the sighting window stored in the database.

    Returns a dict with the start/end datetimes and the window length in
    whole hours (minimum 1).
    """
    query = session.query(func.min(Sighting.expire_timestamp),
                          func.max(Sighting.expire_timestamp))
    if conf.REPORT_SINCE:
        query = query.filter(Sighting.expire_timestamp > SINCE_TIME)
    earliest, latest = query.one()
    # Integer hours; `or 1` keeps a sub-hour window from reporting zero.
    length_hours = (latest - earliest) // 3600 or 1
    return {
        'start': datetime.fromtimestamp(earliest),
        'end': datetime.fromtimestamp(latest),
        'length_hours': length_hours
    }
def get_first_last(session, spawn_id):
    """Return (min first_seconds, max last_seconds) for one spawn point,
    ignoring records that predate the last migration."""
    query = session.query(func.min(Mystery.first_seconds),
                          func.max(Mystery.last_seconds))
    query = query.filter(Mystery.spawn_id == spawn_id)
    query = query.filter(Mystery.first_seen > conf.LAST_MIGRATION)
    return query.first()
def get_widest_range(session, spawn_id):
    """Return the largest seen_range recorded for one spawn point,
    ignoring records that predate the last migration."""
    query = session.query(func.max(Mystery.seen_range))
    query = query.filter(Mystery.spawn_id == spawn_id)
    query = query.filter(Mystery.first_seen > conf.LAST_MIGRATION)
    return query.scalar()
def set_val_user_id():
    """Set value for the next user_id after seeding database.

    Advances the ``users_user_id_seq`` PostgreSQL sequence past the
    current MAX(user_id) so future inserts do not collide with seeds.
    """
    # Get the Max user_id in the database.
    result = db.session.query(func.max(User.user_id)).one()
    # BUG FIX: MAX() yields NULL (None) on an empty table, and int(None)
    # raises TypeError; fall back to 0 so the sequence starts at 1.
    max_id = int(result[0]) if result[0] is not None else 0
    # Set the value for the next user_id to be max_id + 1.
    query = "SELECT setval('users_user_id_seq', :new_id)"
    db.session.execute(query, {'new_id': max_id + 1})
    db.session.commit()
#*****************************************************************************#
def get_max_id(cls, session):
    """Get the current max value of the ``id`` column.
    When creating and storing ORM objects in bulk, :mod:`sqlalchemy` does not
    automatically generate an incrementing primary key ``id``. To do this
    manually, one needs to know the current max ``id``. For ORM object classes
    that are derived from other ORM object classes, the max ``id`` of the
    lowest base class is returned. This is designed to be used with
    inheritance by joining, in which derived and base class objects have
    identical ``id`` values.
    Args:
        session: database session to operate in
    """
    # sqlalchemy allows only one level of inheritance, so just check this
    # class and all its bases.
    id_base = None
    for c in [cls] + list(cls.__bases__):
        for base_class in c.__bases__:
            # A class whose direct base is the declarative 'Base' owns the
            # id column; exactly one such class may exist in the chain.
            if base_class.__name__ == 'Base':
                if id_base is None:
                    # we found our base class for determining the ID
                    id_base = c
                else:
                    raise RuntimeError("Multiple base object classes for class " + cls.__name__)
    # this should never happen
    if id_base is None:
        raise RuntimeError("Error searching for base class of " + cls.__name__)
    # get its max ID
    max_id = session.query(func.max(id_base.id)).scalar()
    # if no object is present, None is returned
    if max_id is None:
        max_id = 0
    return max_id
def truncate_to_field_length(self, field, value):
    """Truncate *value* so it fits the declared max length of *field*.

    Use this in a validator to check/truncate values before inserting them
    into the database. Copy the example below after ``@validates`` into your
    model class, replacing ``field1``/``field2`` with your field name(s).

    :Example:
        from sqlalchemy.orm import validates
        # ... omitting other imports ...
        class MyModel(base.Base):
            field1 = Column(String(128))
            field2 = Column(String(64))
            @validates('field1', 'field2')
            def truncate(self, field, value):
                return self.truncate_to_field_length(field, value)

    Args:
        field (str): field name to validate
        value (str/unicode): value to validate
    Returns:
        str/unicode: value truncated to field max length
    """
    column_type = getattr(self.__class__, field).prop.columns[0].type
    limit = column_type.length
    # Empty/None values and values already within the limit pass through.
    if not value or len(value) <= limit:
        return value
    return value[:limit]
def get_end_day():
    """Return the date of the most recently crawled article."""
    newest = db.session.query(
        func.max(Article.crawl_date).label('end')).one()
    return newest.end.date()
def get_last_aid(category):
    """Return the highest article id within *category*."""
    row = (db.session.query(func.max(Article.id).label('max_aid'))
           .filter_by(category=category)
           .one())
    return row.max_aid