python类max()的实例源码

power.py 文件源码 项目:NodeDefender 作者: CTSNE 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def Get(icpe, from_date = (datetime.now() - timedelta(days=7)), to_date =
        datetime.now()):
    power_data = SQL.session.query(PowerModel, \
                                  label('low', func.min(PowerModel.low)),
                                  label('high', func.max(PowerModel.high)),
                                  label('total', func.sum(PowerModel.average)),
                                  label('date', PowerModel.date)).\
            join(iCPEModel).\
            filter(iCPEModel.mac_address == icpe).\
            filter(PowerModel.date > from_date).\
            filter(PowerModel.date < to_date).\
            group_by(PowerModel.date).all()

    if not power_data:
        return False

    ret_json = {'icpe' : icpe}
    ret_json['power'] = []
    for data in power_data:
        ret_json['power'].append({'date' : str(data.date), 'low' : data.low, 'high' : data.high,
                                    'total' : data.total})
    return ret_json
heat.py 文件源码 项目:NodeDefender 作者: CTSNE 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def latest(icpe):
    heat_data = SQL.session.query(HeatModel, \
                                  label('low', func.min(HeatModel.low)),
                                  label('high', func.max(HeatModel.high)),
                                  label('total', func.sum(HeatModel.average)),
                                  label('date', HeatModel.date)).\
            join(iCPEModel).\
            filter(iCPEModel.mac_address == icpe).\
            order_by(HeatModel.date.desc()).\
            group_by(HeatModel.date).first()

    if not heat_data:
        return False

    return {'icpe' : icpe, 'date' : str(heat_data.date), 'low' : power_data.low,\
            'high' : heat_data.high, 'total' : power_data.total}
heat.py 文件源码 项目:NodeDefender 作者: CTSNE 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def get(icpe, from_date = (datetime.now() - timedelta(days=7)), to_date =
        datetime.now()):
    heat_data = SQL.session.query(HeatModel, \
                                  label('low', func.min(HeatModel.low)),
                                  label('high', func.max(HeatModel.high)),
                                  label('total', func.sum(HeatModel.average)),
                                  label('date', HeatModel.date)).\
            join(iCPEModel).\
            filter(iCPEModel.mac_address == icpe).\
            filter(HeatModel.date > from_date).\
            filter(HeatModel.date < to_date).\
            group_by(HeatModel.date).all()

    if not heat_data:
        return False

    ret_json = {'icpe' : icpe}
    ret_json['heat'] = []
    for data in heat_data:
        ret_json['heat'].append({'date' : str(data.date), 'low' : data.low, 'high' : data.high,
                                    'total' : data.total})
    return ret_json
power.py 文件源码 项目:NodeDefender 作者: CTSNE 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def latest(icpe):
    power_data = SQL.session.query(PowerModel, \
                                  label('low', func.min(PowerModel.low)),
                                  label('high', func.max(PowerModel.high)),
                                  label('total', func.sum(PowerModel.average)),
                                  label('date', PowerModel.date)).\
            join(iCPEModel).\
            filter(iCPEModel.mac_address == icpe).\
            order_by(PowerModel.date.desc()).\
            group_by(PowerModel.date).first()

    if not power_data:
        return False

    return {'icpe' : icpe, 'date' : str(power_data.date), 'low' : power_data.low,\
            'high' : power_data.high, 'total' : power_data.total}
water.py 文件源码 项目:beproudbot 作者: beproud 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def count_water_stock(message):
    """????????????????

    :param message: slackbot?????????????class
    """
    s = Session()
    stock_number, latest_ctime = (
        s.query(func.sum(WaterHistory.delta),
                func.max(case(whens=((
                    WaterHistory.delta != 0,
                    WaterHistory.ctime),), else_=None))).first()
    )

    if stock_number:
        # SQLite????????????????
        if not isinstance(latest_ctime, datetime.datetime):
            latest_ctime = datetime.datetime.strptime(latest_ctime,
                                                      '%Y-%m-%d %H:%M:%S')
        message.send('??: {}? ({:%Y?%m?%d?} ??)'
                     .format(stock_number, latest_ctime))
    else:
        message.send('??????????')
sensor_hub.py 文件源码 项目:iot 作者: jdupl 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def get_latest_soil_humidity():
    """Gets latest info about soil humdity and predict waterings.
    """

    pub = []
    records = __to_pub_list(
        HygroRecord.query.group_by(HygroRecord.sensor_uuid)
        .having(func.max(HygroRecord.timestamp)).all())

    for r in records:
        last_watering_timestamp = analytics\
            ._get_last_watering_timestamp(r['sensor_uuid'])

        if last_watering_timestamp:
            polyn = analytics._get_polynomial(
                r['sensor_uuid'], last_watering_timestamp)

            next_watering_timestamp = analytics\
                ._predict_next_watering(polyn, last_watering_timestamp)

            r['last_watering_timestamp'] = last_watering_timestamp
            r['next_watering_timestamp'] = next_watering_timestamp
        pub.append(r)

    return pub
core.py 文件源码 项目:sbds 作者: steemit 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def highest_block(cls, session):
        """
        Return integer result of MAX(block_num) db query.

        This does not have the same meaning as last irreversible block, ie, it
        makes no claim that the all blocks lower than the MAX(block_num) exist
        in the database.

        Args:
            session (sqlalchemy.orm.session.Session):

        Returns:
            int:
        """
        highest = session.query(func.max(cls.block_num)).scalar()
        if not highest:
            return 0
        else:
            return highest
db.py 文件源码 项目:Monocle 作者: Noctem 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def estimate_remaining_time(session, spawn_id, seen):
    first, last = get_first_last(session, spawn_id)

    if not first:
        return 90, 1800

    if seen > last:
        last = seen
    elif seen < first:
        first = seen

    if last - first > 1710:
        estimates = [
            time_until_time(x, seen)
            for x in (first + 90, last + 90, first + 1800, last + 1800)]
        return min(estimates), max(estimates)

    soonest = last + 90
    latest = first + 1800
    return time_until_time(soonest, seen), time_until_time(latest, seen)
models.py 文件源码 项目:incubator-airflow-old 作者: apache 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def get_latest_runs(cls, session):
        """Returns the latest DagRun for each DAG. """
        subquery = (
            session
            .query(
                cls.dag_id,
                func.max(cls.execution_date).label('execution_date'))
            .group_by(cls.dag_id)
            .subquery()
        )
        dagruns = (
            session
            .query(cls)
            .join(subquery,
                  and_(cls.dag_id == subquery.c.dag_id,
                       cls.execution_date == subquery.c.execution_date))
            .all()
        )
        return dagruns
test_projects.py 文件源码 项目:biweeklybudget 作者: jantman 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_00_verify_db(self, testdb):
        b = testdb.query(Project).get(1)
        assert b is not None
        assert b.name == 'P1'
        assert b.notes == 'ProjectOne'
        assert b.is_active is True
        b = testdb.query(Project).get(2)
        assert b is not None
        assert b.name == 'P2'
        assert b.notes == 'ProjectTwo'
        assert b.is_active is True
        b = testdb.query(Project).get(3)
        assert b is not None
        assert b.name == 'P3Inactive'
        assert b.notes == 'ProjectThreeInactive'
        assert b.is_active is False
        assert testdb.query(Project).with_entities(
            func.max(Project.id)
        ).scalar() == 3
        assert testdb.query(BoMItem).with_entities(
            func.max(BoMItem.id)
        ).scalar() == 5
history.py 文件源码 项目:pyabc 作者: neuralyzer 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def max_t(self):
        """
        The population number of the last populations.
        This is equivalent to ``n_populations - 1``.
        """
        max_t = (self._session.query(func.max(Population.t))
                 .join(ABCSMC).filter(ABCSMC.id == self.id).one()[0])
        return max_t
cleaner.py 文件源码 项目:nokia-deployer 作者: nokia 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def _cleanup(self):
        if not os.path.exists(self.base_repos_path):
            logger.info("Base repo path {} does not exist yet, nothing to do.".format(self.base_repos_path))
            return
        now = datetime.utcnow()
        with session_scope() as session:
            deletion_candidates = set(os.listdir(self.base_repos_path))
            subquery = session.query(m.Environment.id, func.max(m.DeploymentView.queued_date).label("max_queued_date")).\
                filter(m.Environment.id == m.DeploymentView.environment_id).\
                group_by(m.Environment.id).subquery()
            recently_deployed_envs = session.query(m.Environment).\
                join((subquery, subquery.c.id == m.Environment.id)).\
                filter(subquery.c.max_queued_date > now - self.max_unused_age).\
                all()
            to_keep = set(e.local_repo_directory_name for e in recently_deployed_envs)
            for path in deletion_candidates - to_keep:
                path = os.path.join(self.base_repos_path, path)
                if os.path.exists(path):
                    with lock_repository_fetch(path), lock_repository_write(path):
                        rmtree(path)
                    logger.info(
                        "Deleted unused directory {}".format(path)
                    )
__init__.py 文件源码 项目:versionalchemy 作者: NerdWalletOSS 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _latest_version(cls, session, row, use_dirty=True):
        """
        :param session: a session instance to execute a select on the log table
        :param row: an instance of the user table row object

        :return: the maximum version ID recorded for the specified row or None if version_id
        has not been inserted
        :rtype: int
        """
        and_clause = \
            utils.generate_and_clause(cls, row, cls._version_col_names, use_dirty=use_dirty)
        result = session.execute(
            sa.select([func.max(cls.va_version)]).
            where(and_clause)
        ).first()
        return None if result is None else result[0]
db.py 文件源码 项目:thunderingplains 作者: lyctc 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def post_add_user_sql(name, password, email):
    """Gets current max uid + 1, and then creates new User."""
    # check if username already exists -- raise exception if it does

    q = DBS.query(User).filter_by(name=name)
    r = q.first()

    if r:
        raise ValueError('Username already exists')
    else:
        r = DBS.query(func.max(User.uid)).first()[0]
        uid = 1 if not r else r + 1  # set to 1 for the first user
        r = DBS.query(func.max(Group.gid)).first()[0]
        gid = 1 if not r else r + 1
        DBS.add(User(gid, uid, name, hash_password(password), email))
        DBS.add(Group(gid, "No Name"))
        transaction.commit()
        return uid, gid
views.py 文件源码 项目:xiaoxiang-oj 作者: hanfei19910905 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def prob_view_get(hid, pid):
    result, ok, problem, homework = checkValid(hid, pid)
    if not ok:
        return result
    form = SubmitForm()
    plist = ['rank', 'user name', problem.name, 'Entries', 'last submit time']
    result = db.session.query(ProbUserStatic.user_id, func.sum(ProbUserStatic.real_score), func.max(ProbUserStatic.score), func.sum(ProbUserStatic.submit_times), func.max(ProbUserStatic.last_time)). \
        filter(ProbUserStatic.prob_id == problem.id, ProbUserStatic.score > 0) \
        .group_by(ProbUserStatic.user_id).order_by(func.sum(ProbUserStatic.real_score)).all()
    prank = []
    result = reversed(result)
    for i, res in enumerate(result):
        name = User.query.filter_by(id=res[0]).first().name
        prank.append([i + 1, name, res[2], res[3], res[4]])
    if hid == -1:
        return render_template('prob_view.html', problem=problem, form=form, hid=-1, data=problem.data, active='problem', attach = False, plist = plist, prank = prank)
    attach = os.path.exists(os.path.join(app.config['UPLOAD_FOLDER'], problem.data.attach))
    return render_template('prob_view.html', problem=problem, form=form, hid=hid, data=problem.data, active='homework', attach =attach, plist = plist, prank = prank)
download.py 文件源码 项目:pmi_sprint_reporter 作者: cumc-dbmi 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def download_latest():
    """
    Download and save the latest submission files possibly overwriting existing, presumably older files
    :return:
    """
    q1 = select([table.c.file_name, func.max(table.c.sent_time_epoch).label('sent_time_max_epoch')]).group_by('file_name')
    latest_files = engine.execute(q1).fetchall()
    permitted = list(permitted_file_names())
    for f in latest_files:
        selection = [table.c.sender_name, table.c.file_name, table.c.url]
        q2 = select(selection).where(
            and_(table.c.file_name == f['file_name'], table.c.sent_time_epoch == f['sent_time_max_epoch']))
        r = engine.execute(q2).fetchone()
        file_name = r['file_name'].lower()
        sender_name = r['sender_name']
        if file_name not in permitted:
            print 'Notify %(sender_name)s that "%(file_name)s" is not a valid file name' % locals()
        # download either way just in case
        dest = os.path.join(settings.csv_dir, file_name)
        content = file_transfer.download(r['url'])
        if 'MD5 token has expired' not in content:
            with open(dest, 'wb') as out:
                out.write(content)
stats.py 文件源码 项目:scrobbler 作者: hatarist 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def dashboard(period=None):
    period, days = PERIODS.get(period, PERIODS['1w'])

    col_year = func.extract('year', Scrobble.played_at)
    year_from, year_to = db.session.query(func.min(col_year), func.max(col_year)).first()
    year_from, year_to = int(year_from), int(year_to)

    return render_template(
        'dashboard.html',
        period=period,
        year_min=year_from,
        year_max=year_to,
    )
check_status.py 文件源码 项目:HTSOHM-dev 作者: akaija 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def run_ids():
    cols = [materials.c.run_id,
            func.max(materials.c.generation),
            func.count(materials.c.id)]
    rows = or_(materials.c.retest_passed == None, materials.c.retest_passed == True)
    sort = materials.c.run_id
    s = select(cols, rows).group_by(sort).order_by(asc(sort))
    print('\nrun-id\t\t\t\tgenerations\tmaterial')
    result = engine.execute(s)
    for row in result:
        print('%s\t%s\t\t%s' % (row[0], row[1], row[2]))
    result.close()
sensor_hub.py 文件源码 项目:iot 作者: jdupl 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def get_lastest_dht11():
    return __to_pub_list(
        DHT11Record.query.group_by(DHT11Record.sensor_uuid)
        .having(func.max(DHT11Record.timestamp)).all())
sensor_hub.py 文件源码 项目:iot 作者: jdupl 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def get_lastest_photocell():
    return __to_pub_list(
        PhotocellRecord.query.group_by(PhotocellRecord.sensor_uuid)
        .having(func.max(PhotocellRecord.timestamp)).all())
sensor_hub.py 文件源码 项目:iot 作者: jdupl 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def get_lastest_rain():
    return __to_pub_list(
        RainRecord.query.group_by(RainRecord.sensor_uuid)
        .having(func.max(RainRecord.timestamp)).all())
sensor_hub.py 文件源码 项目:iot 作者: jdupl 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def get_lastest_bmp():
    return __to_pub_list(
        BMPRecord.query.group_by(BMPRecord.sensor_uuid)
        .having(func.max(BMPRecord.timestamp)).all())
db.py 文件源码 项目:Monocle 作者: Noctem 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def get_session_stats(session):
    query = session.query(func.min(Sighting.expire_timestamp),
        func.max(Sighting.expire_timestamp))
    if conf.REPORT_SINCE:
        query = query.filter(Sighting.expire_timestamp > SINCE_TIME)
    min_max_result = query.one()
    length_hours = (min_max_result[1] - min_max_result[0]) // 3600
    if length_hours == 0:
        length_hours = 1
    # Convert to datetime
    return {
        'start': datetime.fromtimestamp(min_max_result[0]),
        'end': datetime.fromtimestamp(min_max_result[1]),
        'length_hours': length_hours
    }
db.py 文件源码 项目:Monocle 作者: Noctem 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def get_first_last(session, spawn_id):
    return session.query(func.min(Mystery.first_seconds), func.max(Mystery.last_seconds)) \
        .filter(Mystery.spawn_id == spawn_id) \
        .filter(Mystery.first_seen > conf.LAST_MIGRATION) \
        .first()
db.py 文件源码 项目:Monocle 作者: Noctem 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def get_widest_range(session, spawn_id):
    return session.query(func.max(Mystery.seen_range)) \
        .filter(Mystery.spawn_id == spawn_id) \
        .filter(Mystery.first_seen > conf.LAST_MIGRATION) \
        .scalar()
seed.py 文件源码 项目:game_recommendations 作者: ceorourke 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def set_val_user_id():
    """Set value for the next user_id after seeding database"""

    # Get the Max user_id in the database
    result = db.session.query(func.max(User.user_id)).one()
    max_id = int(result[0])

    # Set the value for the next user_id to be max_id + 1
    query = "SELECT setval('users_user_id_seq', :new_id)"
    db.session.execute(query, {'new_id': max_id + 1})
    db.session.commit()


#*****************************************************************************#
base.py 文件源码 项目:ozelot 作者: trycs 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def get_max_id(cls, session):
        """Get the current max value of the ``id`` column.

        When creating and storing ORM objects in bulk, :mod:`sqlalchemy` does not automatically
        generate an incrementing primary key ``id``. To do this manually, one needs to know the
        current max ``id``. For ORM object classes that are derived from other ORM object classes,
        the max ``id`` of the lowest base class is returned. This is designed to be used with
        inheritance by joining, in which derived and base class objects have identical ``id`` values.

        Args:
            session: database session to operate in
        """
        # sqlalchemy allows only one level of inheritance, so just check this class and all its bases
        id_base = None
        for c in [cls] + list(cls.__bases__):
            for base_class in c.__bases__:
                if base_class.__name__ == 'Base':
                    if id_base is None:
                        # we found our base class for determining the ID
                        id_base = c
                    else:
                        raise RuntimeError("Multiple base object classes for class " + cls.__name__)

        # this should never happen
        if id_base is None:
            raise RuntimeError("Error searching for base class of " + cls.__name__)

        # get its max ID
        max_id = session.query(func.max(id_base.id)).scalar()

        # if no object is present, None is returned
        if max_id is None:
            max_id = 0

        return max_id
base.py 文件源码 项目:ozelot 作者: trycs 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def truncate_to_field_length(self, field, value):
        """Truncate the value of a string field to the field's max length.

        Use this in a validator to check/truncate values before inserting them into the database.
        Copy the below example code after ``@validates`` to your model class and replace ``field1`` and ``field2`` with
        your field name(s).

        :Example:

            from sqlalchemy.orm import validates
            # ... omitting other imports ...

            class MyModel(base.Base):

                field1 = Column(String(128))
                field2 = Column(String(64))

                @validates('field1', 'field2')
                def truncate(self, field, value):
                    return self.truncate_to_field_length(field, value)

        Args:
            field (str): field name to validate
            value (str/unicode): value to validate

        Returns:
            str/unicode: value truncated to field max length

        """
        max_len = getattr(self.__class__, field).prop.columns[0].type.length
        if value and len(value) > max_len:
            return value[:max_len]
        else:
            return value
sqldb.py 文件源码 项目:BlogSpider 作者: hack4code 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def get_end_day():
    r = db.session.query(
        func.max(Article.crawl_date).label('end')
        ).one()
    return r.end.date()
sqldb.py 文件源码 项目:BlogSpider 作者: hack4code 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def get_last_aid(category):
    r = db.session.query(
        func.max(Article.id).label('max_aid')
        ).filter_by(category=category).one()
    return r.max_aid


问题


面经


文章

微信
公众号

扫码关注公众号