python类max()的实例源码

pgdb.py 文件源码 项目:pygresql 作者: Cito 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def cast_date(value, connection):
    """Convert the text form of a PostgreSQL date into a Python ``date``.

    ``'-infinity'`` and ``'infinity'`` map to ``date.min`` and ``date.max``.
    A value whose last whitespace-separated token is ``'BC'`` is clamped to
    ``date.min``, and a date token longer than ten characters (for example
    a five-digit year) is clamped to ``date.max``.  Anything else is parsed
    with the format returned by ``connection.date_format()``, because the
    day/month order of some DateStyle settings is ambiguous without it.
    """
    infinities = {'-infinity': date.min, 'infinity': date.max}
    if value in infinities:
        return infinities[value]
    parts = value.split()
    if parts[-1] == 'BC':
        return date.min
    text = parts[0]
    if len(text) > 10:
        return date.max
    return datetime.strptime(text, connection.date_format()).date()
pgdb.py 文件源码 项目:pygresql 作者: Cito 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def cast_timestamp(value, connection):
    """Convert the text form of a PostgreSQL timestamp into a ``datetime``.

    Infinite values map to ``datetime.min`` / ``datetime.max``; values ending
    in ``'BC'`` are clamped to ``datetime.min`` and values whose year/date
    token is too long to parse are clamped to ``datetime.max``.  The layout
    is chosen from ``connection.date_format()``: a format ending in ``-%Y``
    combined with more than two tokens is treated as the verbose layout
    where the year comes last, everything else as ``<date> <time>``.
    """
    if value == '-infinity':
        return datetime.min
    if value == 'infinity':
        return datetime.max
    tokens = value.split()
    if tokens[-1] == 'BC':
        return datetime.min
    date_fmt = connection.date_format()
    if date_fmt.endswith('-%Y') and len(tokens) > 2:
        # Skip the leading token and keep the next four:
        # day/month (in either order), time, year.
        tokens = tokens[1:5]
        year = tokens[3]
        if len(year) > 4:
            return datetime.max
        day_month = '%d %b' if date_fmt.startswith('%d') else '%b %d'
        # A time token longer than "HH:MM:SS" carries fractional seconds.
        time_fmt = '%H:%M:%S.%f' if len(tokens[2]) > 8 else '%H:%M:%S'
        formats = [day_month, time_fmt, '%Y']
    else:
        if len(tokens[0]) > 10:
            return datetime.max
        time_fmt = '%H:%M:%S.%f' if len(tokens[1]) > 8 else '%H:%M:%S'
        formats = [date_fmt, time_fmt]
    return datetime.strptime(' '.join(tokens), ' '.join(formats))
pg.py 文件源码 项目:pygresql 作者: Cito 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def cast_date(value, connection):
    """Turn a date string received from the server into a ``date`` object.

    Out-of-range inputs are clamped and never raise: ``'infinity'`` and
    date tokens longer than ten characters become ``date.max``, while
    ``'-infinity'`` and values flagged with a trailing ``'BC'`` become
    ``date.min``.  In-range values are parsed using the format supplied by
    ``connection.date_format()``, since the order of day and month cannot
    be guessed reliably for every DateStyle setting.
    """
    if value == 'infinity':
        return date.max
    if value == '-infinity':
        return date.min
    pieces = value.split()
    before_christ = pieces[-1] == 'BC'
    if before_christ:
        return date.min
    date_part = pieces[0]
    if len(date_part) <= 10:
        parsed = datetime.strptime(date_part, connection.date_format())
        return parsed.date()
    return date.max
pg.py 文件源码 项目:pygresql 作者: Cito 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def cast_timestamp(value, connection):
    """Turn a timestamp string received from the server into a ``datetime``.

    ``'-infinity'`` / ``'infinity'`` map to ``datetime.min`` /
    ``datetime.max``.  Values with a trailing ``'BC'`` are clamped to
    ``datetime.min``; values whose year (verbose layout) or date token
    (compact layout) is too long are clamped to ``datetime.max``.  The
    layout is selected from ``connection.date_format()``.
    """

    def time_format(text):
        # More than the eight characters of "HH:MM:SS" means that a
        # fractional-seconds part is present.
        return '%H:%M:%S.%f' if len(text) > 8 else '%H:%M:%S'

    if value == '-infinity':
        return datetime.min
    if value == 'infinity':
        return datetime.max
    fields = value.split()
    if fields[-1] == 'BC':
        return datetime.min
    base_fmt = connection.date_format()
    verbose = base_fmt.endswith('-%Y') and len(fields) > 2
    if verbose:
        # Drop the first token; keep day/month, time and year.
        fields = fields[1:5]
        if len(fields[3]) > 4:
            return datetime.max
        if base_fmt.startswith('%d'):
            leading = '%d %b'
        else:
            leading = '%b %d'
        pattern = [leading, time_format(fields[2]), '%Y']
    else:
        if len(fields[0]) > 10:
            return datetime.max
        pattern = [base_fmt, time_format(fields[1])]
    return datetime.strptime(' '.join(fields), ' '.join(pattern))
test_classic_dbwrapper.py 文件源码 项目:pygresql 作者: Cito 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def testTruncateRestart(self):
        """Check that truncate() only resets the serial with restart=True.

        A non-boolean ``restart`` must raise ``TypeError``.  After a plain
        truncate the serial column keeps counting (4..6); after a truncate
        with ``restart=True`` it starts again from 1.
        """
        truncate = self.db.truncate
        self.assertRaises(TypeError, truncate, 'test_table', restart='invalid')
        query = self.db.query
        self.createTable('test_table', 'n serial, t text')
        stats_sql = "select count(n), min(n), max(n) from test_table"

        def insert_three():
            for _ in range(3):
                query("insert into test_table (t) values ('test')")

        def stats():
            return query(stats_sql).getresult()[0]

        insert_three()
        self.assertEqual(stats(), (3, 1, 3))

        # Plain truncate: rows are gone, the sequence continues.
        truncate('test_table')
        self.assertEqual(stats(), (0, None, None))
        insert_three()
        self.assertEqual(stats(), (3, 4, 6))

        # Truncate with restart: rows are gone and the sequence is reset.
        truncate('test_table', restart=True)
        self.assertEqual(stats(), (0, None, None))
        insert_three()
        self.assertEqual(stats(), (3, 1, 3))
test_classic_dbwrapper.py 文件源码 项目:pygresql 作者: Cito 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def testDate(self):
        """Round-trip dates through the server under every datestyle.

        For each datestyle an ordinary date must come back unchanged, and
        out-of-range dates (five-digit year, BC) must be clamped to
        ``date.max`` / ``date.min``.  Infinite dates are checked once after
        the loop, with the datestyle left at its last setting.
        """
        run = self.db.query
        sample = date(2016, 3, 14)

        def check_clamped(sql):
            # First column must be clamped to date.max, second to date.min.
            row = run(sql).getresult()[0]
            self.assertIsInstance(row[0], date)
            self.assertIsInstance(row[1], date)
            self.assertEqual(row[0], date.max)
            self.assertEqual(row[1], date.min)

        datestyles = ('ISO', 'Postgres, MDY', 'Postgres, DMY',
                      'SQL, MDY', 'SQL, DMY', 'German')
        for datestyle in datestyles:
            self.db.set_parameter('datestyle', datestyle)
            echoed = run("select $1::date", (sample,)).getresult()[0][0]
            self.assertIsInstance(echoed, date)
            self.assertEqual(echoed, sample)
            check_clamped("select '10000-08-01'::date, '0099-01-08 BC'::date")
        check_clamped("select 'infinity'::date, '-infinity'::date")
conversation.py 文件源码 项目:facebook-message-analysis 作者: szheng17 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def get_user_to_damped_n_messages(self, dt_max, alpha):
        """
        Compute, per user, an exponentially damped count of the messages
        sent up to a reference datetime.

        Args:
            dt_max: A datetime; only messages with datetimes <= dt_max are
                considered.
            alpha: A nonnegative float, the damping factor.

        Returns:
            The mapping produced by get_user_to_message_statistic, where
            each message contributes
            self.exp_damped_day_difference(dt_max, message.timestamp, alpha)
            (per the helper's name, exp(-alpha * t) with t the difference
            in days between dt_max and the message). If no message falls
            at or before dt_max, every user is mapped to 0.

        Raises:
            ValueError: If alpha is negative.
        """
        if alpha < 0:
            raise ValueError('Must have alpha >= 0')

        def no_contribution(message):
            return 0

        def damped_contribution(message):
            return self.exp_damped_day_difference(
                dt_max, message.timestamp, alpha)

        try:
            # Drop everything that was sent after the reference datetime.
            up_to_reference = self.filter_by_datetime(end_dt=dt_max)
        except EmptyConversationError:
            # dt_max precedes all messages, so nobody has any weight.
            return self.get_user_to_message_statistic(no_contribution)
        return up_to_reference.get_user_to_message_statistic(
            damped_contribution)
shortcut.py 文件源码 项目:black-hole 作者: liuhao1024 项目源码 文件源码 阅读 53 收藏 0 点赞 0 评论 0
def tslice(unit, start=None, end=None, step=1, count=float('inf')):
    """
    tslice(unit, start=None, end=None, step=1, count=inf) -> generator of Blackhole object
    unit in ['year', 'month', 'day', 'hour', 'minute', 'second', 'microsecond']
    this is some kind xrange-like

    The default for ``start`` is resolved when the generator starts running
    rather than once at import time, so calls without ``start`` no longer
    share a single stale object.

    :param unit: name of the field to shift by; must be in Blackhole._units
    :param start: first value to yield; a string is converted with ben(),
        None means ben() called with no arguments
    :param end: exclusive bound; a string is converted with ben(). A falsy
        value means ben(datetime.max) for a positive step and
        ben(datetime.min) for a negative step
    :param step: amount of ``unit`` to shift by on each iteration; a step
        of 0 yields nothing
    :param count: maximum number of values to yield
    :return: generator yielding the successive shifted objects
    :raises AttributeError: if ``unit`` is not in Blackhole._units
    """
    if unit not in Blackhole._units:
        raise AttributeError('unknown unit: %r' % (unit,))
    if start is None:
        # Evaluated per call; a ben() default in the signature would be
        # computed only once, when the function is defined.
        start = ben()
    elif isinstance(start, basestring):
        start = ben(start)
    if isinstance(end, basestring):
        end = ben(end)

    cnt = 0
    if step > 0:
        end = end or ben(datetime.max)
        while start < end and cnt < count:
            yield start
            start = start.shifted(**{unit: step})
            cnt += 1
    elif step < 0:
        end = end or ben(datetime.min)
        while start > end and cnt < count:
            yield start
            start = start.shifted(**{unit: step})
            cnt += 1
test_utils.py 文件源码 项目:python-zhmcclient 作者: zhmcclient 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def find_max_value(test_func, initial_value):
    """
    Find and return the largest value for which the test function does not
    yet fail.

    `initial_value` must be a positive integer. The probe value is doubled
    until `FailsArray(test_func)` reports a failure, then `binary_search`
    is run over the last doubling interval to locate the boundary between
    passing (0) and failing (1).
    """
    assert isinstance(initial_value, int) and initial_value > 0

    fails = FailsArray(test_func)

    # Exponential phase: double until the probe is past the maximum.
    probe = initial_value
    while fails[probe] == 0:
        probe *= 2

    # The 0 -> 1 transition now lies between probe // 2 and probe; 0.5 is
    # the search target that sits between the two fails values.
    boundary = 0.5
    first_failing = binary_search(fails, boundary, probe // 2, probe)
    max_value = first_failing - 1

    # Sanity check: max_value passes and its successor fails.
    assert fails[max_value] == 0 and fails[max_value + 1] == 1, \
        "max_value={}, fails[+-2]: {}, {}, {}, {}, {}".format(
            max_value,
            fails[max_value - 2],
            fails[max_value - 1],
            fails[max_value],
            fails[max_value + 1],
            fails[max_value + 2])

    return max_value
test_utils.py 文件源码 项目:python-zhmcclient 作者: zhmcclient 项目源码 文件源码 阅读 114 收藏 0 点赞 0 评论 0
def x_test_print_datetime_max(self):
        """Write the repr of datetime.max to stdout and flush it."""
        template = "\nMax value for Python datetime (datetime.max): {!r}"
        message = template.format(datetime.max)
        print(message)
        sys.stdout.flush()
test_utils.py 文件源码 项目:python-zhmcclient 作者: zhmcclient 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def test_datetime_max(self):
        """Test timestamp_from_datetime() with datetime.max.

        The returned value is not inspected; the test passes as long as the
        call completes without raising.
        """

        # The test is that it does not raise an exception:
        timestamp_from_datetime(datetime.max)
arrow.py 文件源码 项目:Orator-Google-App-Engine 作者: MakarenaLabs 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _get_iteration_params(cls, end, limit):
        """Return the ``(end, limit)`` pair to iterate with.

        When ``end`` is given it is returned with ``sys.maxsize`` as the
        limit (any ``limit`` argument is ignored).  When only ``limit`` is
        given, ``cls.max`` is used as the end.  If both are ``None`` an
        ``Exception`` is raised.
        """
        if end is not None:
            return end, sys.maxsize
        if limit is None:
            raise Exception("one of 'end' or 'limit' is required")
        return cls.max, limit
test_classic_dbwrapper.py 文件源码 项目:slack-sql 作者: wang502 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def testTimestamp(self):
        """Round-trip timestamps through the server under every datestyle.

        Three sample values (date only, with seconds, with microseconds)
        must come back unchanged for each datestyle; out-of-range values
        must be clamped to ``datetime.max`` / ``datetime.min``.  Infinite
        timestamps are checked once after the loop.
        """
        run = self.db.query
        samples = (
            datetime(2016, 3, 14),
            datetime(2016, 3, 14, 15, 9, 26),
            datetime(2016, 3, 14, 15, 9, 26, 535897),
        )

        def check_clamped(sql):
            # First column must be clamped to datetime.max, second to
            # datetime.min.
            row = run(sql).getresult()[0]
            self.assertIsInstance(row[0], datetime)
            self.assertIsInstance(row[1], datetime)
            self.assertEqual(row[0], datetime.max)
            self.assertEqual(row[1], datetime.min)

        for datestyle in ('ISO', 'Postgres, MDY', 'Postgres, DMY',
                          'SQL, MDY', 'SQL, DMY', 'German'):
            self.db.set_parameter('datestyle', datestyle)
            for sample in samples:
                echoed = run(
                    "select $1::timestamp", (sample,)).getresult()[0][0]
                self.assertIsInstance(echoed, datetime)
                self.assertEqual(echoed, sample)
            check_clamped("select '10000-08-01 AD'::timestamp,"
                          " '0099-01-08 BC'::timestamp")
        check_clamped("select 'infinity'::timestamp, '-infinity'::timestamp")
test_classic_dbwrapper.py 文件源码 项目:slack-sql 作者: wang502 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def testTimestamptz(self):
        """Round-trip timestamptz values for several zones and datestyles.

        For every (timezone, datestyle) combination three sample values
        must come back unchanged, and out-of-range values must be clamped
        to ``datetime.max`` / ``datetime.min``.  Infinite values are
        checked once after the loops.
        """
        run = self.db.query
        sample_fields = (
            (2016, 3, 14),
            (2016, 3, 14, 15, 9, 26),
            (2016, 3, 14, 15, 9, 26, 535897),
        )

        def check_clamped(sql):
            # First column must be clamped to datetime.max, second to
            # datetime.min.
            row = run(sql).getresult()[0]
            self.assertIsInstance(row[0], datetime)
            self.assertIsInstance(row[1], datetime)
            self.assertEqual(row[0], datetime.max)
            self.assertEqual(row[1], datetime.min)

        utc_offsets = {'CET': 1, 'EET': 2, 'EST': -5, 'UTC': 0}
        for timezone, hours in sorted(utc_offsets.items()):
            tz = '%+03d00' % hours
            try:
                tzinfo = datetime.strptime(tz, '%z').tzinfo
            except ValueError:  # Python < 3.2
                tzinfo = pg._get_timezone(tz)
            self.db.set_parameter('timezone', timezone)
            for datestyle in ('ISO', 'Postgres, MDY', 'Postgres, DMY',
                              'SQL, MDY', 'SQL, DMY', 'German'):
                self.db.set_parameter('datestyle', datestyle)
                for fields in sample_fields:
                    sample = datetime(*fields, tzinfo=tzinfo)
                    echoed = run(
                        "select $1::timestamptz", (sample,)).getresult()[0][0]
                    self.assertIsInstance(echoed, datetime)
                    self.assertEqual(echoed, sample)
                check_clamped("select '10000-08-01 AD'::timestamptz,"
                              " '0099-01-08 BC'::timestamptz")
        check_clamped(
            "select 'infinity'::timestamptz, '-infinity'::timestamptz")
pgdb.py 文件源码 项目:slack-sql 作者: wang502 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def cast_timestamptz(value, connection):
    """Convert the text form of a PostgreSQL timestamptz into a ``datetime``.

    Infinite values map to ``datetime.min`` / ``datetime.max``; values
    ending in ``'BC'`` are clamped to ``datetime.min`` and values whose
    year/date token is too long are clamped to ``datetime.max``.  The time
    zone is taken from the last token, except for formats starting with
    ``%Y-`` where it is split off the time token with ``_re_timezone``
    (defaulting to ``'+0000'`` when nothing matches).  When
    ``_has_timezone`` is true the zone is parsed through ``%z``; otherwise
    it is attached afterwards via ``_get_timezone``.
    """
    if value == '-infinity':
        return datetime.min
    if value == 'infinity':
        return datetime.max
    tokens = value.split()
    if tokens[-1] == 'BC':
        return datetime.min
    date_fmt = connection.date_format()
    if date_fmt.endswith('-%Y') and len(tokens) > 2:
        # Verbose layout: drop the first token, year is at index 3 and the
        # zone name is the final token.
        tokens = tokens[1:]
        if len(tokens[3]) > 4:
            return datetime.max
        day_month = '%d %b' if date_fmt.startswith('%d') else '%b %d'
        time_fmt = '%H:%M:%S.%f' if len(tokens[2]) > 8 else '%H:%M:%S'
        formats = [day_month, time_fmt, '%Y']
        tz = tokens[-1]
        tokens = tokens[:-1]
    else:
        if date_fmt.startswith('%Y-'):
            matched = _re_timezone.match(tokens[1])
            if matched:
                tokens[1], tz = matched.groups()
            else:
                tz = '+0000'
        else:
            tz = tokens[-1]
            tokens = tokens[:-1]
        if len(tokens[0]) > 10:
            return datetime.max
        time_fmt = '%H:%M:%S.%f' if len(tokens[1]) > 8 else '%H:%M:%S'
        formats = [date_fmt, time_fmt]
    if _has_timezone:
        tokens.append(_timezone_as_offset(tz))
        formats.append('%z')
        return datetime.strptime(' '.join(tokens), ' '.join(formats))
    naive = datetime.strptime(' '.join(tokens), ' '.join(formats))
    return naive.replace(tzinfo=_get_timezone(tz))
pg.py 文件源码 项目:slack-sql 作者: wang502 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def cast_timestamptz(value, connection):
    """Turn a timestamptz string received from the server into a ``datetime``.

    ``'-infinity'`` / ``'infinity'`` map to ``datetime.min`` /
    ``datetime.max``; a trailing ``'BC'`` clamps to ``datetime.min`` and an
    over-long year or date token clamps to ``datetime.max``.  The layout is
    selected from ``connection.date_format()``.  The zone is either split
    off the time token via ``_re_timezone`` (formats starting with ``%Y-``,
    default ``'+0000'``) or taken from the last token.  It is parsed with
    ``%z`` when ``_has_timezone`` is true and otherwise attached through
    ``_get_timezone``.
    """

    def time_format(text):
        # More than the eight characters of "HH:MM:SS" means that a
        # fractional-seconds part is present.
        return '%H:%M:%S.%f' if len(text) > 8 else '%H:%M:%S'

    if value == '-infinity':
        return datetime.min
    if value == 'infinity':
        return datetime.max
    fields = value.split()
    if fields[-1] == 'BC':
        return datetime.min
    base_fmt = connection.date_format()
    verbose = base_fmt.endswith('-%Y') and len(fields) > 2
    if verbose:
        fields = fields[1:]
        if len(fields[3]) > 4:
            return datetime.max
        if base_fmt.startswith('%d'):
            leading = '%d %b'
        else:
            leading = '%b %d'
        pattern = [leading, time_format(fields[2]), '%Y']
        # The zone name is the final token; strip it from the parse input.
        tz = fields[-1]
        fields = fields[:-1]
    else:
        if base_fmt.startswith('%Y-'):
            tz = _re_timezone.match(fields[1])
            if tz:
                fields[1], tz = tz.groups()
            else:
                tz = '+0000'
        else:
            tz = fields[-1]
            fields = fields[:-1]
        if len(fields[0]) > 10:
            return datetime.max
        pattern = [base_fmt, time_format(fields[1])]
    if not _has_timezone:
        parsed = datetime.strptime(' '.join(fields), ' '.join(pattern))
        return parsed.replace(tzinfo=_get_timezone(tz))
    fields.append(_timezone_as_offset(tz))
    pattern.append('%z')
    return datetime.strptime(' '.join(fields), ' '.join(pattern))
result.py 文件源码 项目:cuny-bdif 作者: aristotle-tek 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def __init__(self, batch_name, sd, mimetype_files=None):
        """Set up empty statistics for one batch.

        ``sd`` is stored and used to look up the ``'output_queue'`` and
        ``'output_domain'`` objects.  ``mimetype_files`` is accepted but
        not used here.
        """
        self.sd, self.batch = sd, batch_name
        self.log_fp = None
        self.num_files = self.total_time = 0
        # Each min/max tracker starts at the opposite extreme -- presumably
        # so that the first observed value replaces it.
        self.min_time, self.max_time = timedelta.max, timedelta.min
        self.earliest_time, self.latest_time = datetime.max, datetime.min
        lookup = self.sd.get_obj
        self.queue = lookup('output_queue')
        self.domain = lookup('output_domain')
appcompat_mirlua_v1.py 文件源码 项目:appcompatprocessor 作者: mbevilacqua 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def calculateID(self, file_name_fullpath):
        """Derive an instance ID from the audit file's first PersistenceItem.

        Returns the 'created' attribute of the first PersistenceItem parsed
        as a datetime, ``datetime.max`` when that attribute has an invalid
        format (a warning is logged), or ``datetime.min`` when the file
        cannot be loaded or parsed or the item is missing (the traceback is
        printed to stdout).
        """
        # Get the creation date for the first PersistenceItem in the audit (they will all be the same)
        instanceID = datetime.min

        try:
            file_object = loadFile(file_name_fullpath)
            try:
                root = ET.parse(file_object).getroot()
            finally:
                # Release the handle even when the XML is malformed and
                # parse() raises.
                file_object.close()
            reg_key = root.find('PersistenceItem')
            reg_modified = reg_key.get('created')
            try:
                instanceID = datetime.strptime(reg_modified, "%Y-%m-%dT%H:%M:%SZ")
            except ValueError:
                logger.warning("Invalid reg_modified date found!: %s (%s)" % (reg_modified, file_name_fullpath))
                instanceID = datetime.max
        except Exception:
            traceback.print_exc(file=sys.stdout)

        # If we found no PersistenceItem date we go with plan B (but most probably this is corrupt and will fail later)
        # NOTE(review): instanceID starts as datetime.min and is never set to
        # None, so this fallback cannot currently run. Enabling it would
        # change the failure result from datetime.min to an md5 hex string;
        # confirm what callers expect before changing the condition.
        if instanceID is None:
            file_object = loadFile(file_name_fullpath)
            try:
                content = file_object.read()
            finally:
                file_object.close()
            instanceID = hashlib.md5(content).hexdigest()

        return instanceID
appcompat_mirlua_v2.py 文件源码 项目:appcompatprocessor 作者: mbevilacqua 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def calculateID(self, file_name_fullpath):
        """Derive an instance ID from the first AppCompatItemExtended element.

        Returns the 'created' attribute of the first AppCompatItemExtended
        parsed as a datetime, ``datetime.max`` when that attribute has an
        invalid format (a warning is logged), or ``datetime.min`` when the
        file cannot be loaded or parsed or the item is missing (the
        traceback is printed to stdout).
        """
        # Get the creation date for the first AppCompatItemExtended in the audit
        instanceID = datetime.min

        try:
            file_object = loadFile(file_name_fullpath)
            try:
                root = ET.parse(file_object).getroot()
            finally:
                # Release the handle even when the XML is malformed and
                # parse() raises.
                file_object.close()
            reg_key = root.find('AppCompatItemExtended')
            reg_modified = reg_key.get('created')
            try:
                instanceID = datetime.strptime(reg_modified, "%Y-%m-%dT%H:%M:%SZ")
            except ValueError:
                logger.warning("Invalid reg_modified date found!: %s (%s)" % (reg_modified, file_name_fullpath))
                instanceID = datetime.max
        except Exception:
            traceback.print_exc(file=sys.stdout)

        # If we found no item date we go with plan B (but most probably this is corrupt and will fail later)
        # NOTE(review): instanceID starts as datetime.min and is never set to
        # None, so this fallback cannot currently run. Enabling it would
        # change the failure result from datetime.min to an md5 hex string;
        # confirm what callers expect before changing the condition.
        if instanceID is None:
            file_object = loadFile(file_name_fullpath)
            try:
                content = file_object.read()
            finally:
                file_object.close()
            instanceID = hashlib.md5(content).hexdigest()

        return instanceID
appcompat_mirregistryaudit.py 文件源码 项目:appcompatprocessor 作者: mbevilacqua 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def calculateID(self, file_name_fullpath):
        """Derive an instance ID from the newest RegistryItem 'Modified' date.

        Every RegistryItem with a Modified child is parsed and the latest
        datetime is returned. An unparsable date counts as ``datetime.max``
        (a warning is logged); items without a Modified child are skipped
        with a warning. If the file cannot be loaded or parsed the error is
        logged and whatever was collected so far is returned, which is
        ``datetime.min`` when nothing was found.
        """
        instanceID = datetime.min

        try:
            file_object = loadFile(file_name_fullpath)
            try:
                root = ET.parse(file_object).getroot()
            finally:
                # Release the handle even when the XML is malformed and
                # parse() raises.
                file_object.close()
            for reg_key in root.findall('RegistryItem'):
                tmp_reg_key = reg_key.find('Modified')
                if tmp_reg_key is None:
                    logger.warning("Found RegistryItem with no Modified date (Mir bug?): %s" % file_name_fullpath)
                    continue
                reg_modified = tmp_reg_key.text
                try:
                    tmp_instanceID = datetime.strptime(reg_modified, "%Y-%m-%dT%H:%M:%SZ")
                except ValueError:
                    tmp_instanceID = datetime.max
                    logger.warning("Invalid reg_modified date found!: %s (%s)" % (reg_modified, file_name_fullpath))
                # Keep the most recent date seen so far.
                if instanceID < tmp_instanceID:
                    instanceID = tmp_instanceID
        except Exception:
            logger.exception("Error on calculateID for: %s" % file_name_fullpath)

        # If we found no Modified date in any of the RegistryItems we go with plan B (but most probably ShimCacheParser will fail to parse anyway)
        # NOTE(review): instanceID starts as datetime.min and is never set to
        # None, so this fallback cannot currently run. Enabling it would
        # change the failure result from datetime.min to an md5 hex string;
        # confirm what callers expect before changing the condition.
        if instanceID is None:
            file_object = loadFile(file_name_fullpath)
            try:
                content = file_object.read()
            finally:
                file_object.close()
            instanceID = hashlib.md5(content).hexdigest()

        return instanceID


问题


面经


文章

微信
公众号

扫码关注公众号