python类isoformat()的实例源码

test_taskRunner.py 文件源码 项目:fileflow 作者: industrydive 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_write_timestamp_file(self, mock_dt):
        """
        Check that write_timestamp_file() hands {'RUN': <timestamp>} to write_json.

        datetime.now() is replaced by an object whose isoformat() returns a
        sentinel string, so seeing that sentinel under the "RUN" key shows the
        value was derived through isoformat().

        :param mock_dt: stand-in for the datetime module (presumably injected by
            a mock.patch decorator that is not visible here)
        """
        sentinel_stamp = "not iso formatted"

        # Object that plays the role of the value returned by datetime.now().
        fake_now = mock.MagicMock()
        fake_now.isoformat = mock.MagicMock(return_value=sentinel_stamp)
        mock_dt.datetime.now = mock.MagicMock(return_value=fake_now)

        runner = self.task_runner_instance
        runner.write_json = mock.MagicMock()
        runner.write_timestamp_file()

        runner.write_json.assert_called_once_with({'RUN': sentinel_stamp})
test_cluster.py 文件源码 项目:kubernetes-ec2-autoscaler 作者: openai 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def test_reap_dead_node(self):
        """
        A node whose Ready condition is 'Unknown' is left alone while its last
        heartbeat is 30 minutes old, and is deleted once the heartbeat is
        2 hours old.
        """
        node = copy.deepcopy(self.dummy_node)
        TestInstance = collections.namedtuple('TestInstance', ['launch_time'])
        instance = TestInstance(datetime.now(pytz.utc))

        # First condition of type 'Ready' (None when the fixture has none).
        ready_condition = next(
            (cond for cond in node['status']['conditions'] if cond['type'] == 'Ready'),
            None)
        ready_condition['status'] = 'Unknown'

        def maintain_with_heartbeat_age(age):
            # Age the heartbeat, wrap the node, and run one maintenance pass.
            stale_time = datetime.now(pytz.utc) - age
            ready_condition['lastHeartbeatTime'] = stale_time.isoformat()
            wrapped = KubeNode(pykube.Node(self.api, node))
            wrapped.delete = mock.Mock(return_value="mocked stuff")
            self.cluster.maintain([wrapped], {wrapped.instance_id: instance}, {}, [], [])
            return wrapped

        maintain_with_heartbeat_age(timedelta(minutes=30)).delete.assert_not_called()
        maintain_with_heartbeat_age(timedelta(hours=2)).delete.assert_called_once_with()
test_cluster.py 文件源码 项目:kubernetes-ec2-autoscaler 作者: openai 项目源码 文件源码 阅读 39 收藏 0 点赞 0 评论 0
def test_max_scale_in(self):
        """
        Two nodes whose Ready condition is 'Unknown' with a 2 hour old
        heartbeat are passed to maintain() together; neither may be deleted.
        """
        TestInstance = collections.namedtuple('TestInstance', ['launch_time'])
        node_specs = [copy.deepcopy(self.dummy_node), copy.deepcopy(self.dummy_node)]
        instances = [TestInstance(datetime.now(pytz.utc)),
                     TestInstance(datetime.now(pytz.utc))]

        for spec in node_specs:
            # Only the first 'Ready' condition of each node is altered.
            ready = next(
                (cond for cond in spec['status']['conditions'] if cond['type'] == 'Ready'),
                None)
            if ready is not None:
                ready['status'] = 'Unknown'
                ready['lastHeartbeatTime'] = (datetime.now(pytz.utc) - timedelta(hours=2)).isoformat()

        kube_nodes = []
        for spec in node_specs:
            wrapped = KubeNode(pykube.Node(self.api, spec))
            wrapped.delete = mock.Mock(return_value="mocked stuff")
            kube_nodes.append(wrapped)

        running = {wrapped.instance_id: inst for wrapped, inst in zip(kube_nodes, instances)}
        self.cluster.maintain(kube_nodes, running, {}, [], [])

        for wrapped in kube_nodes:
            wrapped.delete.assert_not_called()
__init__.py 文件源码 项目:directory-tests 作者: uktrade 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def take_screenshot(driver: webdriver, page_name: str):
    """Save a screenshot of the current page when TAKE_SCREENSHOTS is enabled.

    The file is written under the "screenshots" directory (relative to the
    current working directory) and is named
    "<utc-iso-timestamp>-<page_name>-<browser>-<version>-<platform>-<session_id>.png".
    When TAKE_SCREENSHOTS is falsy only a debug message is logged.

    :param driver: Any of the WebDrivers
    :param page_name: page name which will be used in the screenshot filename
    """
    if not TAKE_SCREENSHOTS:
        logging.debug(
            "Taking screenshots is disabled. In order to turn it on please set"
            " an environment variable TAKE_SCREENSHOTS=true")
        return

    session_id = driver.session_id
    capabilities = driver.capabilities
    # Fall back to placeholders so the filename always has six components.
    browser = capabilities.get("browserName", "unknown_browser")
    version = capabilities.get("version", "unknown_version")
    platform = capabilities.get("platform", "unknown_platform")
    stamp = datetime.isoformat(datetime.utcnow())
    filename = ("{}-{}-{}-{}-{}-{}.png"
                .format(stamp, page_name, browser, version, platform,
                        session_id))
    file_path = abspath(join("screenshots", filename))
    driver.save_screenshot(file_path)
    logging.debug(
        "Screenshot of %s page saved in: %s", page_name, filename)
IWN-ingest.py 文件源码 项目:IWN-Ingest 作者: LimnoTech 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def accumulate_data(unique_station_sensor,data,date_filter):
    """
    Group the rows of a CSV file by (station id, parameter).

    The first CSV row is treated as a header and skipped. A data row is kept
    only when its entry in ``date_filter`` is truthy and its value column
    (index 4) is not empty. Columns 1 and 2 are joined and parsed as the
    observation date/time.

    :param unique_station_sensor: not used by this function
    :param data: path of the CSV file to read
    :param date_filter: sequence of flags, one per data row (header excluded)
    :return: dict mapping (row[0], row[3]) to
        {'values': [(iso_datetime, row[4]), ...], 'count': number_of_values}
    """
    grouped = {}
    with open(data,'r') as csv_file:
        reader = csv.reader(csv_file)
        next(reader, None)  # drop the header row
        for position, row in enumerate(reader):
            # Skip rows filtered out by date or with a missing value.
            if not date_filter[position] or row[4] == "":
                continue
            series_key = (row[0],row[3])  # (stationid, parameter)
            observed_at = dateutil.parser.parse("{} {}".format(row[1],row[2]))
            sample = (datetime.isoformat(observed_at),row[4])
            grouped.setdefault(series_key, {}).setdefault('values',[]).append(sample)

    for series in grouped.values():
        series['count'] = len(series['values'])
    return grouped
api.py 文件源码 项目:zun 作者: openstack 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def create_zun_service(self, values):
        """Build a models.ZunService from ``values`` and call save() on it.

        ``values`` is modified in place: a 'created_at' entry holding the
        ISO-formatted result of timeutils.utcnow() is added before the model
        is constructed.

        :returns: the ZunService instance after save() was called.
        """
        created_at = datetime.isoformat(timeutils.utcnow())
        values['created_at'] = created_at
        service = models.ZunService(values)
        service.save()
        return service
api.py 文件源码 项目:zun 作者: openstack 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def update_zun_service(self, host, binary, values):
        """Merge ``values`` into the stored service record for host/binary.

        The record is read from the key '/zun_services/<host>_<binary>',
        updated with ``values`` (which first receives an 'updated_at' ISO
        timestamp, in place) and written back through the client.

        :raises exception.ZunServiceNotFound: when the key does not exist.
        Any other error is logged and re-raised unchanged.
        """
        try:
            record = self.client.read('/zun_services/' + host + '_' + binary)
            stored = json.loads(record.value)
            values['updated_at'] = datetime.isoformat(timeutils.utcnow())
            stored.update(values)
            record.value = json.dump_as_bytes(stored)
            self.client.update(record)
        except etcd.EtcdKeyNotFound:
            raise exception.ZunServiceNotFound(host=host, binary=binary)
        except Exception as err:
            LOG.error('Error occurred while updating service: %s',
                      six.text_type(err))
            raise
api.py 文件源码 项目:zun 作者: openstack 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
def create_compute_node(self, context, values):
        """Build a models.ComputeNode from ``values`` and call save() on it.

        ``values`` is modified in place: 'created_at' is set to the
        ISO-formatted result of timeutils.utcnow(), and a 'uuid' is generated
        when none (or a falsy one) was supplied. ``context`` is not used here.

        :returns: the ComputeNode instance after save() was called.
        """
        created_at = datetime.isoformat(timeutils.utcnow())
        values['created_at'] = created_at
        has_uuid = bool(values.get('uuid'))
        if not has_uuid:
            values['uuid'] = uuidutils.generate_uuid()
        node = models.ComputeNode(values)
        node.save()
        return node
test_cluster.py 文件源码 项目:Kubernetes-acs-engine-autoscaler 作者: wbuchwalter 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def setUp(self):
        """Load the YAML fixtures and build the Cluster under test.

        The pod and node specs are read from the ``data`` directory next to
        this file. Heartbeat and transition timestamps of the node fixture
        are converted to ISO strings, and the heartbeat of every condition
        that is 'Ready' with status 'True' is first reset to "now".
        """
        dir_path = os.path.dirname(os.path.realpath(__file__))

        def load_fixture(relative_path):
            # safe_load instead of load: yaml.load() without an explicit
            # Loader can construct arbitrary objects and is rejected by
            # PyYAML >= 6; the fixtures only contain plain YAML types.
            with open(os.path.join(dir_path, relative_path), 'r') as f:
                return yaml.safe_load(f.read())

        # load dummy kube specs
        self.dummy_pod = load_fixture('data/busybox.yaml')
        self.dummy_ds_pod = load_fixture('data/ds-pod.yaml')
        self.dummy_rc_pod = load_fixture('data/rc-pod.yaml')
        self.dummy_node = load_fixture('data/node.yaml')

        conditions = self.dummy_node['status']['conditions']
        for condition in conditions:
            if condition['type'] == 'Ready' and condition['status'] == 'True':
                # Refresh the heartbeat, keeping the tzinfo of the parsed value.
                condition['lastHeartbeatTime'] = datetime.now(condition['lastHeartbeatTime'].tzinfo)
        # Convert timestamps to strings to match PyKube
        for condition in conditions:
            condition['lastHeartbeatTime'] = datetime.isoformat(condition['lastHeartbeatTime'])
            condition['lastTransitionTime'] = datetime.isoformat(condition['lastTransitionTime'])

        # this isn't actually used here
        # only needed to create the KubePod object...
        self.api = pykube.HTTPClient(pykube.KubeConfig.from_file(os.path.join(dir_path, './data/kube_config.yaml')))

        self.cluster = Cluster(
            kubeconfig='~/.kube/config',
            idle_threshold=60,
            spare_agents=1,
            instance_init_time=60,
            resource_group='my-rg',
            notifier=None,
            service_principal_app_id='dummy',
            service_principal_secret='dummy',
            service_principal_tenant_id='dummy',
            kubeconfig_private_key='dummy',
            client_private_key='dummy',
            ca_private_key='dummy',
            ignore_pools='',
            over_provision=0
        )
conftest.py 文件源码 项目:indy-node 作者: hyperledger 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def validUpgrade(nodeIds, tconf):
    """Build the parameters of upgrade 'upgrade-13' with a well-spaced schedule.

    The first node is scheduled 100 seconds from the current UTC time and each
    following node ``tconf.MinSepBetweenNodeUpgrades + 4`` seconds after the
    previous one. Schedule values are ISO-formatted, timezone-aware timestamps.
    """
    first_start = (datetime.utcnow().replace(tzinfo=dateutil.tz.tzutc())
                   + timedelta(seconds=100))
    acceptableDiff = tconf.MinSepBetweenNodeUpgrades + 1
    spacing = timedelta(seconds=acceptableDiff + 3)

    schedule = {}
    for position, node_id in enumerate(nodeIds):
        schedule[node_id] = datetime.isoformat(first_start + position * spacing)

    return dict(name='upgrade-13', version=bumpedVersion(), action=START,
                schedule=schedule,
                # sha256=get_valid_code_hash(),
                sha256='db34a72a90d026dae49c3b3f0436c8d3963476c77468ad955845a1ccf7b03f55',
                timeout=1)
conftest.py 文件源码 项目:indy-node 作者: hyperledger 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def invalidUpgrade(nodeIds, tconf):
    """Build the parameters of upgrade 'upgrade-14' with a too-tight schedule.

    The first node is scheduled 60 seconds from the current UTC time and each
    following node only ``tconf.MinSepBetweenNodeUpgrades - 2`` seconds after
    the previous one, i.e. closer together than the configured separation.
    """
    utc_now = datetime.utcnow().replace(tzinfo=dateutil.tz.tzutc())
    next_start = utc_now + timedelta(seconds=60)
    acceptableDiff = tconf.MinSepBetweenNodeUpgrades + 1
    gap = timedelta(seconds=acceptableDiff - 3)

    start_times = {}
    for node_id in nodeIds:
        start_times[node_id] = next_start.isoformat()
        next_start += gap

    return dict(name='upgrade-14', version=bumpedVersion(), action=START,
                schedule=start_times,
                # sha256=get_valid_code_hash(),
                sha256='46c715a90b1067142d548cb1f1405b0486b32b1a27d418ef3a52bd976e9fae50',
                timeout=10)
mixins.py 文件源码 项目:exchange 作者: boundlessgeo 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def _timefmt(val):
        """Return the ISO-8601 string of the naive UTC datetime for POSIX timestamp ``val``."""
        as_utc = datetime.utcfromtimestamp(val)
        return as_utc.isoformat()
utils.py 文件源码 项目:SigMF 作者: gnuradio 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def get_sigmf_iso8601_datetime_now():
    """Return the current UTC time as an ISO-8601 string with a trailing 'Z'."""
    utc_now = datetime.utcnow()
    return "{}Z".format(utc_now.isoformat())
mapper.py 文件源码 项目:python-yakumo 作者: yosshy 项目源码 文件源码 阅读 41 收藏 0 点赞 0 评论 0
def to_json(manager_class, attr):
        """Return ``attr`` as an ISO-8601 string, or None when it cannot be formatted.

        :param manager_class: not used by this function
        :param attr: value expected to be a datetime instance
        :returns: datetime.isoformat(attr), or None if that call fails
        """
        try:
            return datetime.isoformat(attr)
        except Exception:
            # Best effort: values that are not datetimes (None, strings, ...)
            # map to None. Unlike a bare ``except`` this lets
            # KeyboardInterrupt and SystemExit propagate.
            return None
utils.py 文件源码 项目:pipelines 作者: gis-rpd 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
def generate_timestamp():
    """Return the current local time as an ISO 8601 timestamp (including
    microseconds) in which every colon is replaced by a dash, so the
    result can safely be used in a file name.
    """
    iso_now = datetime.now().isoformat()
    return iso_now.replace(":", "-")
conftest.py 文件源码 项目:old-sovrin 作者: sovrin-foundation 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def validUpgrade(nodeIds, tconf):
    """Build the parameters of upgrade 'upgrade-13' with a well-spaced schedule.

    Node start times begin 90 seconds from the current UTC time and are
    ``tconf.MinSepBetweenNodeUpgrades + 4`` seconds apart; each is stored as an
    ISO-formatted, timezone-aware timestamp keyed by node id.
    """
    utc_now = datetime.utcnow().replace(tzinfo=dateutil.tz.tzutc())
    first_start = utc_now + timedelta(seconds=90)
    acceptableDiff = tconf.MinSepBetweenNodeUpgrades + 1
    step = timedelta(seconds=acceptableDiff + 3)
    schedule = {
        node_id: datetime.isoformat(first_start + index * step)
        for index, node_id in enumerate(nodeIds)
    }
    return dict(name='upgrade-13', version=bumpedVersion(), action=START,
                schedule=schedule, sha256='aad1242', timeout=10)
test_pool_upgrade.py 文件源码 项目:old-sovrin 作者: sovrin-foundation 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def invalidUpgrade(nodeIds, tconf):
    """Build the parameters of upgrade 'upgrade-14' with a too-tight schedule.

    Node start times begin 60 seconds from the current UTC time and are only
    ``tconf.MinSepBetweenNodeUpgrades - 2`` seconds apart, i.e. closer together
    than the configured minimum separation.
    """
    when = datetime.utcnow().replace(tzinfo=dateutil.tz.tzutc()) + timedelta(seconds=60)
    acceptableDiff = tconf.MinSepBetweenNodeUpgrades + 1
    too_short = timedelta(seconds=acceptableDiff - 3)

    schedule = {}
    for node_id in nodeIds:
        schedule[node_id] = when.isoformat()
        when = when + too_short

    upgrade = dict(name='upgrade-14', version=bumpedVersion(), action=START,
                   schedule=schedule, sha256='ffd1224', timeout=10)
    return upgrade
logger.py 文件源码 项目:TripletEmbedding 作者: hrantzsch 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def __init__(self, args, optimizer, name, extra_msg=''):
        """Initialise the running statistics and start the log file.

        :param args: parsed command line arguments, forwarded to write_config()
        :param optimizer: optimizer object kept on the instance
        :param name: run name; an empty string selects the default 'signdist'
        :param extra_msg: optional extra text forwarded to write_config()
        """
        self.iteration = 0
        self.sum_loss = 0
        self.sum_acc = 0
        self.sum_mean_diff = 0
        self.sum_max_diff = 0
        self.current_section = ''
        self.optimizer = optimizer

        # setup according to arguments
        # Compare by value: ``is not ''`` tests object identity, which only
        # works by accident of string interning.
        self.name = name if name != '' else 'signdist'
        # Output files are prefixed with today's date (YYYY-MM-DD).
        self.out_file = "{}_{}".format(date.isoformat(date.today()), self.name)
        self.log_file = "{}.log".format(self.out_file)
        # write config to head of the log file
        self.write_config(args, extra_msg)
logger.py 文件源码 项目:TripletEmbedding 作者: hrantzsch 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def write_config(self, args, extra_msg):
        """Append a commented header describing this run to the log file.

        The header lists the run name and start time, the command line
        arguments, the optimizer settings, the optional ``extra_msg`` and
        finally ``sys.argv``. Every line is written through self._comment().

        :param args: parsed arguments; the attributes data, batchsize, test,
            skilled, gpu, initmodel, resume, lrinterval and weight_decay are read
        :param extra_msg: additional line to include; skipped when falsy
        """
        with open(self.log_file, 'a+') as f:
            self._comment("=" * 40, f)
            self._comment("{} initiated at {}".format(
                self.name, datetime.isoformat(datetime.now())
            ), f)
            self._comment("-" * 40, f)  # arguments passed
            self._comment("Data: " + args.data, f)
            self._comment("Batchsize: {}".format(args.batchsize), f)
            self._comment("Test ratio: {}".format(args.test), f)
            self._comment("Hard triplet ratio: {}".format(args.skilled), f)
            # A negative gpu id means CPU; otherwise report which GPU is used
            # (the placeholder was missing, so the id used to be dropped).
            dev = "CPU" if args.gpu < 0 else "GPU {}".format(args.gpu)
            self._comment("Device: " + dev, f)
            if args.initmodel:
                self._comment("Init model: " + args.initmodel, f)
            if args.resume:
                self._comment("Resume state: " + args.resume, f)
            self._comment("-" * 40, f)  # parameters set in script
            self._comment("Optimizer: " + self.optimizer.__class__.__name__, f)
            self._comment("Initial LR: {}".format(self.optimizer.lr), f)
            self._comment("LR interval: {}".format(args.lrinterval), f)
            self._comment("Weight decay: {}".format(args.weight_decay), f)
            self._comment("Epoch: {}".format(self.optimizer.epoch), f)
            if extra_msg:
                self._comment(extra_msg, f)
            self._comment("-" * 40, f)  # complete call
            self._comment("{}".format(sys.argv), f)
            self._comment("=" * 40, f)
tests.py 文件源码 项目:decocare 作者: openaps 项目源码 文件源码 阅读 42 收藏 0 点赞 0 评论 0
def big_days(x=0):
  """
    Return entry ``x`` of the module-level ``_bad_days`` table.

    The doctests below feed each entry to parse_date() and pin the
    resulting ISO timestamp.

    # page 17, RECORD 11
    >>> parse_date( big_days(0) ).isoformat( )
    '2012-11-20T21:53:41'

    # page 17, ~ RECORD 12
    >>> parse_date( big_days(1) ).isoformat( )
    '2012-11-20T22:07:38'

    >>> parse_date( big_days(2) ).isoformat( )
    '2012-11-20T21:53:41'

    >>> parse_date( big_days(3) ).isoformat( )
    '2012-11-20T22:07:38'

    # page 16, RECORD ~15
    >>> parse_date( big_days(4) ).isoformat( )
    '2012-11-25T16:41:34'

    >>> parse_date( big_days(5) ).isoformat( )
    '2012-11-25T13:54:32'

    # page 15
    >>> parse_date( big_days(6) ).isoformat( )
    '2012-11-29T20:25:37'

    # page 0
    >>> parse_date( big_days(7) ).isoformat( )
    '2012-12-20T14:59:02'

    >>> parse_date( big_days(8) ).isoformat( )
    '2012-12-20T15:28:25'

  """
  return _bad_days[x]
__init__.py 文件源码 项目:stream2segment 作者: rizac 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def strptime(string, formats=None):
    """
    Convert a date-time string into a ``datetime`` object.

    The inverse can be obtained by calling ``datetime.isoformat()`` (which
    returns 'T' as date time separator, and optionally microseconds if they
    are not zero). This function is an easy version of
    ``dateutil.parser.parse`` for parsing iso-like datetime formats (e.g. the
    fdsnws standard) without the need of a module import.

    :param string: a ``datetime`` object (returned unchanged) or a string
        representing a date-time. Leading and trailing whitespace is ignored.
    :param formats: if list or iterable, it holds the strings denoting the
        formats to be tried, in order. If None (the default), the formats are
        guessed from the string:

        - if the string contains 'T' or a space, or ends with 'Z':
          ``'%Y-%m-%d<sep>%H:%M:%S.%f<Z>'`` and then
          ``'%Y-%m-%d<sep>%H:%M:%S<Z>'``, where ``<sep>`` is 'T' when the
          string contains 'T' (a space otherwise) and ``<Z>`` is 'Z' only when
          the string ends with 'Z'
        - otherwise ``'%Y-%m-%d'``
    :raise: ValueError if the string cannot be parsed with any of the formats,
        or if ``string`` is neither a datetime nor a string
    :return: a datetime object

    :Example:

        strptime("2016-06-01T09:04:00.5600Z")
        strptime("2016-06-01T09:04:00.5600")
        strptime("2016-06-01 09:04:00.5600Z")
        strptime("2016-06-01T09:04:00Z")
        strptime("2016-06-01T09:04:00")
        strptime("2016-06-01 09:04:00Z")
        strptime("2016-06-01")
    """
    if isinstance(string, datetime):
        return string

    try:
        string = string.strip()

        if formats is None:
            # endswith() also copes with the empty string, which then fails
            # below with the regular "invalid date time" message.
            has_z = string.endswith('Z')
            has_t = 'T' in string
            if has_t or has_z or ' ' in string:
                t_str, z_str = 'T' if has_t else ' ', 'Z' if has_z else ''
                formats = ['%Y-%m-%d{}%H:%M:%S.%f{}'.format(t_str, z_str),
                           '%Y-%m-%d{}%H:%M:%S{}'.format(t_str, z_str)]
            else:
                formats = ['%Y-%m-%d']

        for dtformat in formats:
            try:
                return datetime.strptime(string, dtformat)
            except ValueError:
                # This format does not match: try the next one.
                pass
        raise ValueError("invalid date time '%s'" % str(string))
    except ValueError:
        raise
    except Exception as exc:
        # Anything else (e.g. a non-string argument) is reported uniformly
        # as a ValueError, as documented.
        raise ValueError(str(exc))

tests.py 文件源码 项目:decocare 作者: openaps 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def _test_decode_bolus( ):
  """
  Doctest-only holder: parse_date() results for the sample byte sequences
  stored under ``_bewest_dates['page-19']``.

  ## correct
  >>> parse_date( bytearray( _bewest_dates['page-19'][6] ) ).isoformat( )
  '2012-11-12T00:55:42'

  ## correct
  >>> parse_date( bytearray( _bewest_dates['page-19'][0] ) ).isoformat( )
  '2012-11-12T00:55:42'

  ## this is wrong
  >>> parse_date( bytearray( _bewest_dates['page-19'][1] ) ).isoformat( )
  '2012-04-10T12:12:00'

  ## day,month is wrong, time H:M:S is correct
  # expected:
  >>> parse_date( bytearray( _bewest_dates['page-19'][2] ) ).isoformat( )
  '2012-02-08T03:11:12'

  ## correct
  >>> parse_date( bytearray( _bewest_dates['page-19'][3] ) ).isoformat( )
  '2012-11-12T08:03:11'

  #### not a valid date
  # >>> parse_date( bytearray( _bewest_dates['page-19'][4] ) ).isoformat( )

  ## correct
  >>> parse_date( bytearray( _bewest_dates['page-19'][5] ) ).isoformat( )
  '2012-11-12T08:03:13'


  """
  # The string below is not a docstring: it is an annotated dump of raw
  # record bytes kept for reference, and evaluating it has no effect.
  """

  0x5b 0x7e # bolus wizard,
  0xaa 0xf7 0x00 0x0c 0x0c # page-19[0]

  0x0f 0x50 0x0d 0x2d 0x6a 0x00 0x0b 0x00
  0x00 0x07 0x00 0x0b 0x7d 0x5c 0x08 0x58
  0x97 0x04 0x30 0x05 0x14 0x34 0xc8
  0x91 0xf8      # 0x91, 0xf8 = month=11, minute=56, seconds=17!
  0x00           # general parsing fails here
  0x00
  0xaa 0xf7 0x40 0x0c 0x0c # expected  - page-19[6]

  0x0a 0x0c
  0x8b 0xc3 0x28 0x0c 0x8c # page-19[3]


  0x5b 0x0c

  0x8d 0xc3 0x08 0x0c 0x0c # page-19[5]
  0x00 0x51 0x0d 0x2d 0x6a 0x1f 0x00 0x00 0x00 0x00 0x00
  """

### csv deconstructed
tests.py 文件源码 项目:decocare 作者: openaps 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
def decode_wizard(data):
  """
  Decode a bolus wizard record and return its fields as a dict.

  Layout of ``data`` as read by this function (0-based offsets):

    0-1 : head; head[1] supplies one of the two bytes of the BG input
    2-6 : timestamp, decoded with parse_date()
    7   : carb input
    8   : low nibble supplies the other byte of the BG input
    9   : carb ratio
    10  : BG target high
    11  : insulin sensitivity
    12  : BG target low

  The two BG bytes are combined by lib.BangInt (presumably as a big-endian
  pair; confirm against lib). The decoded timestamp is only printed, it is
  not part of the returned dict.

  :param data: indexable sequence of byte values (e.g. a bytearray)
  :returns: dict with the keys bg_input, carb_input, carb_ratio,
      insulin_sensitivity, bg_target_low and bg_target_high
  """
  head = data[:2]
  # Local names chosen so they do not shadow the date/datetime imports.
  timestamp = parse_date(data[2:7])
  body = data[7:]
  bg = lib.BangInt([ body[1] & 0x0f, head[1] ])
  carb_input = int(body[0])
  carb_ratio = int(body[2])
  bg_target_low = int(body[5])
  bg_target_high = int(body[3])
  sensitivity = int(body[4])

  # Single pre-formatted argument: prints the same text under the Python 2
  # print statement and the Python 3 print function.
  print("BOLUS WIZARD %s" % timestamp.isoformat( ))
  wizard = { 'bg_input': bg, 'carb_input': carb_input,
             'carb_ratio': carb_ratio,
             'insulin_sensitivity': sensitivity,
             'bg_target_low': bg_target_low,
             'bg_target_high': bg_target_high,
  }
  return wizard


问题


面经


文章

微信
公众号

扫码关注公众号