python类mktime()的实例源码

smart-controller.py 文件源码 项目:yeelight-controller 作者: kevinxw 项目源码 文件源码 阅读 40 收藏 0 点赞 0 评论 0
def __compile_policy_value(self, str_value, value_dict):
        value = str_value
        if not '$' in value:
            return value
        self.__logger.debug("Compiling value %s", value)
        for key in value_dict:
            self.__logger.debug("Searching for key %s", key)
            val = value_dict[key]
            if id(type) and type(val) in (datetime, date):
                self.__logger.debug("Value in dictionary: %s -> %s", key, val)
                val = time.mktime(self.__get_localtime(val).timetuple())
                self.__logger.debug("Timestamp converted: %s -> %s", key, val)
            value = value.replace(self.__get_compiled_key(key), str(val))
            self.__logger.debug("Value after convertsion: %s", value)
        int_value = int(eval(value))
        compiled_value = self.__get_localtime(int_value)
        self.__logger.debug("Compiled value: %s", compiled_value)
        return compiled_value
log_api.py 文件源码 项目:geekcloud 作者: Mr-Linus 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def kill_invalid_connection():
    unfinished_logs = Log.objects.filter(is_finished=False)
    now = datetime.datetime.now()
    now_timestamp = int(time.mktime(now.timetuple()))

    for log in unfinished_logs:
        try:
            log_file_mtime = int(os.stat('%s.log' % log.log_path).st_mtime)
        except OSError:
            log_file_mtime = 0

        if (now_timestamp - log_file_mtime) > 3600:
            if log.login_type == 'ssh':
                try:
                    os.kill(int(log.pid), 9)
                except OSError:
                    pass
            elif (now - log.start_time).days < 1:
                continue

            log.is_finished = True
            log.end_time = now
            log.save()
            logger.warn('kill log %s' % log.log_path)
message.py 文件源码 项目:mobot 作者: JokerQyou 项目源码 文件源码 阅读 56 收藏 0 点赞 0 评论 0
def _totimestamp(dt_obj):
        """
        Args:
            dt_obj (:class:`datetime.datetime`):

        Returns:
            int:
        """
        if not dt_obj:
            return None

        try:
            # Python 3.3+
            return int(dt_obj.timestamp())
        except AttributeError:
            # Python 3 (< 3.3) and Python 2
            return int(mktime(dt_obj.timetuple()))
zabbix_api.py 文件源码 项目:zabbix_manager 作者: BillWang139967 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def history_get(self,item_ID,date_from,date_till): 
        '''
        return history of item
        [eg1]#zabbix_api history_get 23296 "2016-08-01 00:00:00" "2016-09-01 00:00:00"
        [note]The date_till time must be within the historical data retention time
        '''
        dateFormat = "%Y-%m-%d %H:%M:%S"
        try:
            startTime =  time.strptime(date_from,dateFormat)
            endTime =  time.strptime(date_till,dateFormat)
        except:
            err_msg("???? ['2016-05-01 00:00:00'] ['2016-06-01 00:00:00']")
        time_from = int(time.mktime(startTime))
        time_till = int(time.mktime(endTime))
        history_type=self.__item_search(item_ID)
        self.__history_get(history_type,item_ID,time_from,time_till)
statements.py 文件源码 项目:cqlmapper 作者: reddit 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def timestamp_normalized(self):
        """
        we're expecting self.timestamp to be either a long, int, a datetime, or a timedelta
        :return:
        """
        if not self.timestamp:
            return None

        if isinstance(self.timestamp, six.integer_types):
            return self.timestamp

        if isinstance(self.timestamp, timedelta):
            tmp = datetime.now() + self.timestamp
        else:
            tmp = self.timestamp

        return int(time.mktime(tmp.timetuple()) * 1e+6 + tmp.microsecond)
mailer.py 文件源码 项目:abusehelper 作者: Exploit-install 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def next_time(time_string):
    try:
        parsed = list(time.strptime(time_string, "%H:%M"))
    except (TypeError, ValueError):
        return float(time_string)

    now = time.localtime()

    current = list(now)
    current[3:6] = parsed[3:6]

    current_time = time.time()
    delta = time.mktime(current) - current_time
    if delta <= 0.0:
        current[2] += 1
        return time.mktime(current) - current_time
    return delta
botbuster.py 文件源码 项目:twitch-botbuster 作者: Leo675 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def get_creation_date(user):
    client_id = 'jzkbprff40iqj646a697cyrvl0zt2m6'
    headers = { 'Client-ID' : client_id }
    # Loop ends when a value is returned
    while 1:
        # Uses try in case of request timeout
        try:
            r = requests.get('https://api.twitch.tv/kraken/users/{}'.format(user), headers = headers)
        except:
            time.sleep(1)
            continue

        if r.status_code == 200:
            # Captures only YYYY-MM-DD
            date = re.match(
                '([\d]{4}-[\d]{2}-[\d]{2})',
                json.loads(r.text)['created_at']
            )
            epoch = datetime.datetime.strptime("{}".format(date.group(1)) , "%Y-%m-%d")
            epoch = int(time.mktime(epoch.timetuple()) / 3600)
            # except:
                # print('Failed to get time')
                # return
            return epoch
cleanup-maildir.py 文件源码 项目:atoolbox 作者: liweitianux 项目源码 文件源码 阅读 39 收藏 0 点赞 0 评论 0
def getDateSent(self):
        """Get the time of sending from the Date header

        Returns a time object using time.mktime.  Not very reliable, because
        the Date header can be missing or spoofed (and often is, by spammers).
        Throws a MessageDateError if the Date header is missing or invalid.
        """
        dh = self.getheader('Date')
        if dh == None: 
            return None
        try:
            return time.mktime(rfc822.parsedate(dh))
        except ValueError:
            raise MessageDateError("message has missing or bad Date")
        except TypeError:  # gets thrown by mktime if parsedate returns None
            raise MessageDateError("message has missing or bad Date")
        except OverflowError:
            raise MessageDateError("message has missing or bad Date")
peewee.py 文件源码 项目:harbour-mercury 作者: chstem 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def db_value(self, value):
        if value is None:
            return

        if isinstance(value, datetime.datetime):
            pass
        elif isinstance(value, datetime.date):
            value = datetime.datetime(value.year, value.month, value.day)
        else:
            return int(round(value * self.resolution))

        if self.utc:
            timestamp = calendar.timegm(value.utctimetuple())
        else:
            timestamp = time.mktime(value.timetuple())
        timestamp += (value.microsecond * .000001)
        if self.resolution > 1:
            timestamp *= self.resolution
        return int(round(timestamp))
cookies.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def morsel_to_cookie(morsel):
    """Convert a Morsel object into a Cookie containing the one k/v pair."""

    expires = None
    if morsel['max-age']:
        expires = time.time() + morsel['max-age']
    elif morsel['expires']:
        time_template = '%a, %d-%b-%Y %H:%M:%S GMT'
        expires = time.mktime(
            time.strptime(morsel['expires'], time_template)) - time.timezone
    return create_cookie(
        comment=morsel['comment'],
        comment_url=bool(morsel['comment']),
        discard=False,
        domain=morsel['domain'],
        expires=expires,
        name=morsel.key,
        path=morsel['path'],
        port=None,
        rest={'HttpOnly': morsel['httponly']},
        rfc2109=False,
        secure=bool(morsel['secure']),
        value=morsel.value,
        version=morsel['version'] or 0,
    )
gam.py 文件源码 项目:GAMADV-XTD 作者: taers232c 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def getYYYYMMDD(minLen=1, returnTimeStamp=False, returnDateTime=False, alternateValue=None):
  if Cmd.ArgumentsRemaining():
    argstr = Cmd.Current().strip()
    if argstr:
      if alternateValue is not None and argstr.lower() == alternateValue.lower():
        Cmd.Advance()
        return None
      if argstr[0] in [u'+', u'-']:
        argstr = getDeltaDate(argstr).strftime(YYYYMMDD_FORMAT)
      try:
        dateTime = datetime.datetime.strptime(argstr, YYYYMMDD_FORMAT)
        Cmd.Advance()
        if returnTimeStamp:
          return time.mktime(dateTime.timetuple())*1000
        if returnDateTime:
          return dateTime
        return argstr
      except ValueError:
        invalidArgumentExit(YYYYMMDD_FORMAT_REQUIRED)
    elif minLen == 0:
      Cmd.Advance()
      return u''
  missingArgumentExit(YYYYMMDD_FORMAT_REQUIRED)
flint.py 文件源码 项目:llk 作者: Tycx2ry 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def time_parse(self, s):
        try:
            epoch = int(s)
            return epoch
        except ValueError:
            pass

        try:
            epoch = int(time.mktime(time.strptime(s, '%Y-%m-%d')))
            return epoch
        except ValueError:
            pass

        try:
            epoch = int(time.mktime(time.strptime(s, '%Y-%m-%d %H:%M:%S')))
            return epoch
        except ValueError:
            pass

        raise ValueError('Invalid time: "%s"' % s)
bday.py 文件源码 项目:Abb1t 作者: k-freeman 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def seconds_until(until=9):
    """ Counts the seconds until it is a certain hour again.

    Keyword Arguments:
        until (int): the hour we want to count to (default: {9})

    Returns:
        (float): how many seconds until the specified time.
    """
    now = time.localtime()
    now_sec = time.mktime(now)

    if now.tm_hour >= until:
        delta = (until * 60 * 60) \
            + (60 * 60 * (24 - now.tm_hour)) \
            - (60 * now.tm_min) \
            - (now.tm_sec)
    else:
        delta = (until * 60 * 60) \
            - (60 * 60 * now.tm_hour) \
            - (60 * now.tm_min) \
            - (now.tm_sec)

    then = time.localtime(now_sec + delta)
    return time.mktime(then) - time.time()
test_sched.py 文件源码 项目:dsq 作者: baverman 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def test_crontab():
    c = Crontab()
    c.add('boo')
    c.add('foo', 0)
    c.add('bar', [1, 3], -5, -1, -1, 0)

    assert c.actions(0, 1, 1, 1, 1) == {'boo', 'foo'}
    assert c.actions(1, 1, 1, 1, 1) == {'boo'}
    assert c.actions(1, 5, 1, 1, 7) == {'boo', 'bar'}
    assert c.actions(3, 5, 1, 1, 7) == {'boo', 'bar'}

    ts = mktime(datetime(2016, 1, 17, 5, 1).timetuple())
    assert c.actions_ts(ts) == {'boo', 'bar'}
__init__.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def _get_date_and_size(zip_stat):
        size = zip_stat.file_size
        # ymdhms+wday, yday, dst
        date_time = zip_stat.date_time + (0, 0, -1)
        # 1980 offset already done
        timestamp = time.mktime(date_time)
        return timestamp, size
__init__.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def _get_date_and_size(zip_stat):
        size = zip_stat.file_size
        # ymdhms+wday, yday, dst
        date_time = zip_stat.date_time + (0, 0, -1)
        # 1980 offset already done
        timestamp = time.mktime(date_time)
        return timestamp, size
export_metrics.py 文件源码 项目:rca-evaluation 作者: sieve-microservices 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def scroll(query, begin, until, prefix=None):
    diff = timedelta(minutes=4)
    while begin < until:
        to = min(begin + diff, until)
        res = DB.query(query % (pad(begin), pad(to)))
        for batch in res:
            for row in batch:
                # truncate longer ids to match with shorter host names
                if "container_id" in row:
                    row["container_id"] = row["container_id"][0:11]

                time_col = row["time"][0:min(26, len(row["time"]) - 1)]
                if len(time_col) == 19:
                    t = time.strptime(time_col, "%Y-%m-%dT%H:%M:%S")
                else:
                    t = time.strptime(time_col, "%Y-%m-%dT%H:%M:%S.%f")

                if prefix is not None:
                    for key in row.iterkeys():
                        if (key not in SKIP_PREFIX) and ((prefix + "|") not in key):
                            row[APP_METRIC_DELIMITER.join((prefix, key))] = row.pop(key)
                yield (time.mktime(t), row)
        begin = to
retry.py 文件源码 项目:my-first-blog 作者: AnkurBegining 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def parse_retry_after(self, retry_after):
        # Whitespace: https://tools.ietf.org/html/rfc7230#section-3.2.4
        if re.match(r"^\s*[0-9]+\s*$", retry_after):
            seconds = int(retry_after)
        else:
            retry_date_tuple = email.utils.parsedate(retry_after)
            if retry_date_tuple is None:
                raise InvalidHeader("Invalid Retry-After header: %s" % retry_after)
            retry_date = time.mktime(retry_date_tuple)
            seconds = retry_date - time.time()

        if seconds < 0:
            seconds = 0

        return seconds
__init__.py 文件源码 项目:my-first-blog 作者: AnkurBegining 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _get_date_and_size(zip_stat):
        size = zip_stat.file_size
        # ymdhms+wday, yday, dst
        date_time = zip_stat.date_time + (0, 0, -1)
        # 1980 offset already done
        timestamp = time.mktime(date_time)
        return timestamp, size
__init__.py 文件源码 项目:my-first-blog 作者: AnkurBegining 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def _get_date_and_size(zip_stat):
        size = zip_stat.file_size
        # ymdhms+wday, yday, dst
        date_time = zip_stat.date_time + (0, 0, -1)
        # 1980 offset already done
        timestamp = time.mktime(date_time)
        return timestamp, size
discover.py 文件源码 项目:docker-monitoring-zabbix-agent 作者: digiapulssi 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def uptime(args):
    with os.popen("docker inspect -f '{{json .State}}' " + args.container + " 2>&1") as pipe:
        status = pipe.read().strip()
    if "No such image or container" in status:
        print "0"
    else:
        statusjs = json.loads(status)
        if statusjs["Running"]:
            uptime = statusjs["StartedAt"]
            start = time.strptime(uptime[:19], "%Y-%m-%dT%H:%M:%S")
            print int(time.time() - time.mktime(start))
        else:
            print "0"

# get the approximate disk usage
# alt docker inspect -s -f {{.SizeRootFs}} 49219085bdaa
# alt docker exec " + args.container + " du -s -b / 2> /dev/null
helpers.py 文件源码 项目:core-framework 作者: RedhawkSDR 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def convert(timeString):
    if timeString == 'now':
        return now()
    _sets = timeString.split(':')
    if len(_sets) != 7:
        return CF.UTCTime(0,0,0)
    _year, _month, _day, _blank, _hours, _minutes, _seconds = timeString.split(':')
    _full_seconds = float(_seconds)
    _time = time.mktime((int(_year),int(_month),int(_day),int(_hours),int(_minutes),int(_full_seconds),0,0,0))-time.timezone
    return CF.UTCTime(1, _time, _full_seconds - int(_full_seconds))
    # Break out the whole seconds into a GMT time

# Insert the arithmetic functions as operators on the PrecisionUTCTime class
track.py 文件源码 项目:zappa-bittorrent-tracker 作者: Miserlou 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def purge_expired_peers():
    """
    Removes peers who haven't announced in the last internval.

    Should be set as a recurring event source in your Zappa config.
    """

    if DATASTORE == "DynamoDB":
        # This is a costly operation, but I think it has to be done.
        # Optimizations (pagination? queries? batching?) welcomed.
        all_torrents = table.scan()
        for torrent in all_torrents['Items']:
            for peer_id in torrent['peers']:
                peer_last_announce = int(torrent['peers'][peer_id][0]['last_announce'])
                window = datetime.now() - timedelta(seconds=ANNOUNCE_INTERVAL)
                window_unix = int(time.mktime(window.timetuple()))

                if peer_last_announce < window_unix:
                    remove_peer_from_info_hash(torrent['info_hash'], peer_id)
    else:
        # There must be a better way to do this.
        # Also, it should probably be done as a recurring function and cache,
        # not dynamically every time.
        for key in s3_client.list_objects(Bucket=BUCKET_NAME)['Contents']:
            if 'peers.json' in key['Key']:
                remote_object = s3.Object(BUCKET_NAME, key['Key']).get()
                content = remote_object['Body'].read().decode('utf-8')
                torrent = json.loads(content)
                for peer_id in torrent['peers']:
                    peer_last_announce = int(torrent['peers'][peer_id]['last_announce'])
                    window = datetime.now() - timedelta(seconds=ANNOUNCE_INTERVAL)
                    window_unix = int(time.mktime(window.timetuple()))

                    if peer_last_announce < window_unix:
                        remove_peer_from_info_hash(torrent['info_hash'], peer_id)

    return

##
# Database
##
client.py 文件源码 项目:gimel 作者: Alephbet 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def expireat(self, name, when):
        """
        Set an expire flag on key ``name``. ``when`` can be represented
        as an integer indicating unix time or a Python datetime object.
        """
        if isinstance(when, datetime.datetime):
            when = int(mod_time.mktime(when.timetuple()))
        return self.execute_command('EXPIREAT', name, when)
client.py 文件源码 项目:gimel 作者: Alephbet 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def pexpireat(self, name, when):
        """
        Set an expire flag on key ``name``. ``when`` can be represented
        as an integer representing unix time in milliseconds (unix time * 1000)
        or a Python datetime object.
        """
        if isinstance(when, datetime.datetime):
            ms = int(when.microsecond / 1000)
            when = int(mod_time.mktime(when.timetuple())) * 1000 + ms
        return self.execute_command('PEXPIREAT', name, when)
binlog_analyzer.py 文件源码 项目:data_pipeline 作者: Yelp 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def _handle_header_line(self, line):
        m = re.search("\\#(\\d+)\\s+(\\d+:\\d+:\\d+)\\s+server\\s+id\\s+\\d+", line)
        datetime_str = "%s %s" % (m.group(1), m.group(2))
        dt = datetime.datetime.strptime(datetime_str, '%y%m%d %H:%M:%S')
        new_header_timestamp = int(time.mktime(dt.timetuple()))
        self.header_timestamp = new_header_timestamp
Poloniex.py 文件源码 项目:BitBot 作者: crack00r 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def createTimeStamp(datestr, format="%Y-%m-%d %H:%M:%S"):
    return time.mktime(time.strptime(datestr, format))
rfc822.py 文件源码 项目:kinect-2-libras 作者: inessadl 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def getdate(self, name):
        """Retrieve a date field from a header.

        Retrieves a date field from the named header, returning a tuple
        compatible with time.mktime().
        """
        try:
            data = self[name]
        except KeyError:
            return None
        return parsedate(data)
rfc822.py 文件源码 项目:kinect-2-libras 作者: inessadl 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def getdate_tz(self, name):
        """Retrieve a date field from a header as a 10-tuple.

        The first 9 elements make up a tuple compatible with time.mktime(),
        and the 10th is the offset of the poster's time zone from GMT/UTC.
        """
        try:
            data = self[name]
        except KeyError:
            return None
        return parsedate_tz(data)


    # Access as a dictionary (only finds *last* header of each type):
rfc822.py 文件源码 项目:kinect-2-libras 作者: inessadl 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def mktime_tz(data):
    """Turn a 10-tuple as returned by parsedate_tz() into a UTC timestamp."""
    if data[9] is None:
        # No zone info, so localtime is better assumption than GMT
        return time.mktime(data[:8] + (-1,))
    else:
        t = time.mktime(data[:8] + (0,))
        return t - data[9] - time.timezone


问题


面经


文章

微信
公众号

扫码关注公众号