python类datetime()的实例源码

scraper.py 文件源码 项目:Easysentiment 作者: Jflick58 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def default(self, obj):
        """Return a JSON-serializable stand-in for ``obj``.

        The checks run in this order and the first match wins:

        1. objects exposing ``__json__`` are asked to serialize themselves;
        2. any iterable is materialized into a list;
        3. ``datetime`` instances become ISO 8601 strings;
        4. objects with ``__getitem__`` and ``keys`` are passed to ``dict``;
        5. objects with a ``__dict__`` become a dict of their public,
           non-callable attributes.

        Anything else is delegated to ``json.JSONEncoder.default``.

        :param obj: the value the encoder could not serialize on its own
        :returns: a list, dict, string or whatever ``obj.__json__()`` returns
        """
        # ``collections.Iterable`` was a deprecated alias that Python 3.10
        # removed; the ABC itself lives in ``collections.abc``.
        from collections.abc import Iterable

        if hasattr(obj, '__json__'):
            return obj.__json__()
        elif isinstance(obj, Iterable):
            # NOTE(review): iterable mappings are caught here (as a list of
            # keys) before the dict-like branch below -- confirm intended.
            return list(obj)
        elif isinstance(obj, datetime):
            return obj.isoformat()
        elif hasattr(obj, '__getitem__') and hasattr(obj, 'keys'):
            return dict(obj)
        elif hasattr(obj, '__dict__'):
            # Keep only public attributes that are not callable.
            return {member: getattr(obj, member)
                    for member in dir(obj)
                    if not member.startswith('_') and
                    not hasattr(getattr(obj, member), '__call__')}

        return json.JSONEncoder.default(self, obj)
base.py 文件源码 项目:cbapi-python 作者: carbonblack 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def get_response_and_time(self, key, default=(None, None)):
        """Look up the cached response and its timestamp for ``key``.

        ``key`` is used directly when it is present in ``self.responses``;
        otherwise it is first translated through ``self.keys_map``.

        :param key: key of resource
        :param default: value returned when neither lookup finds ``key``
        :returns: tuple (restored response, timestamp), or ``default``

        .. note:: The stored response is passed through
                  :meth:`restore_response` before being returned.
        """
        try:
            lookup_key = key if key in self.responses else self.keys_map[key]
            cached, stamp = self.responses[lookup_key]
        except KeyError:
            return default
        return self.restore_response(cached), stamp
test_models.py 文件源码 项目:socialhome 作者: jaywink 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def setUpTestData(cls):
        """Create the users and content fixtures shared by the test class.

        Four pieces of content (public, site, limited, self) are collected in
        ``cls.ids`` and ``cls.set``; the remote public content is created too
        but is not part of either collection.
        """
        super().setUpTestData()
        cls.create_local_and_remote_user()
        cls.user2 = AnonymousUser()
        cls.local_user = UserFactory()

        # Creation order is kept as-is: the factories run one after another.
        cls.public_content = ContentFactory(
            visibility=Visibility.PUBLIC,
            text="**Foobar**",
            author=cls.profile,
        )
        cls.site_content = ContentFactory(visibility=Visibility.SITE, text="_Foobar_")
        cls.limited_content = ContentFactory(visibility=Visibility.LIMITED)
        cls.self_content = ContentFactory(visibility=Visibility.SELF)
        cls.remote_content = ContentFactory(
            visibility=Visibility.PUBLIC,
            remote_created=make_aware(datetime.datetime(2015, 1, 1)),
            author=cls.remote_profile,
        )

        collected = (
            cls.public_content,
            cls.site_content,
            cls.limited_content,
            cls.self_content,
        )
        cls.ids = [content.id for content in collected]
        cls.set = set(collected)
scraper_and_analysis.py 文件源码 项目:Easysentiment 作者: Jflick58 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def default(self, obj):
        """Return a JSON-serializable stand-in for ``obj``.

        The checks run in this order and the first match wins:

        1. objects exposing ``__json__`` are asked to serialize themselves;
        2. any iterable is materialized into a list;
        3. ``datetime`` instances become ISO 8601 strings;
        4. objects with ``__getitem__`` and ``keys`` are passed to ``dict``;
        5. objects with a ``__dict__`` become a dict of their public,
           non-callable attributes.

        Anything else is delegated to ``json.JSONEncoder.default``.

        :param obj: the value the encoder could not serialize on its own
        :returns: a list, dict, string or whatever ``obj.__json__()`` returns
        """
        # ``collections.Iterable`` was a deprecated alias that Python 3.10
        # removed; the ABC itself lives in ``collections.abc``.
        from collections.abc import Iterable

        if hasattr(obj, '__json__'):
            return obj.__json__()
        elif isinstance(obj, Iterable):
            # NOTE(review): iterable mappings are caught here (as a list of
            # keys) before the dict-like branch below -- confirm intended.
            return list(obj)
        elif isinstance(obj, datetime):
            return obj.isoformat()
        elif hasattr(obj, '__getitem__') and hasattr(obj, 'keys'):
            return dict(obj)
        elif hasattr(obj, '__dict__'):
            # Keep only public attributes that are not callable.
            return {member: getattr(obj, member)
                    for member in dir(obj)
                    if not member.startswith('_') and
                    not hasattr(getattr(obj, member), '__call__')}

        return json.JSONEncoder.default(self, obj)
test_is_time_to_run.py 文件源码 项目:monasca-transform 作者: openstack 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_am_pm_behaviour(self):
        """is_time_to_run is truthy when the last run was twelve hours earlier.

        The check time is 22:10:00.000001, so the last processed time falls
        at the same clock position in the morning.
        """
        check_time = datetime.datetime(2016, 11, 7, 22, 10, 0, 1)
        twelve_hours_earlier = check_time - datetime.timedelta(hours=12)
        provider = PreHourlyProcessorUtil.get_data_provider()
        provider.set_last_processed(date_time=twelve_hours_earlier)
        self.assertTrue(PreHourlyProcessorUtil.is_time_to_run(check_time))
__init__.py 文件源码 项目:stalker_pyramid 作者: eoyilmaz 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def get_time(cls, request, time_attr):
        """Build a time-of-day offset from a request parameter.

        The parameter value has its last four characters dropped and the
        remainder is parsed as ``'%a, %d %b %Y %H:%M:%S'``; only the hour and
        minute of the parsed value are kept.

        :param request: object whose ``params`` mapping holds the value
        :param time_attr: key of the parameter inside ``request.params``
        :return: datetime.timedelta made of the parsed hours and minutes
        """
        raw_value = request.params[time_attr]
        # presumably the dropped suffix is a trailing " GMT" -- confirm with caller
        parsed = datetime.datetime.strptime(
            raw_value[:-4], '%a, %d %b %Y %H:%M:%S'
        )
        return datetime.timedelta(minutes=parsed.minute, hours=parsed.hour)
__init__.py 文件源码 项目:stalker_pyramid 作者: eoyilmaz 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def get_datetime(cls, request, date_attr, time_attr):
        """Combine the date of one request parameter with the time of another.

        Both parameter values have their last four characters dropped and are
        parsed as ``'%a, %d %b %Y %H:%M:%S'``. The result is naive: no tzinfo
        is attached by this method.

        :param request: object whose ``params`` mapping holds the values
        :param date_attr: key of the parameter supplying year, month and day
        :param time_attr: key of the parameter supplying the time of day
        :return: datetime.datetime
        """
        stamp_format = '%a, %d %b %Y %H:%M:%S'
        params = request.params

        date_source = datetime.datetime.strptime(params[date_attr][:-4], stamp_format)
        time_source = datetime.datetime.strptime(params[time_attr][:-4], stamp_format)

        # calendar date from the first value, clock time from the second
        return datetime.datetime.combine(date_source.date(), time_source.time())
numpy_utils.py 文件源码 项目:zipline-chinese 作者: zhanghan1990 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def coerce_to_dtype(dtype, value):
    """
    Convert ``value`` to a scalar of the given numpy dtype.

    Non-datetime dtypes are converted with ``dtype.type``. For datetime
    dtypes only datetime64[ns] and datetime64[D] are handled; any other
    datetime64 unit raises TypeError.
    """
    dtype_name = dtype.name
    if not dtype_name.startswith('datetime64'):
        return dtype.type(value)

    if dtype_name == 'datetime64[D]':
        return make_datetime64D(value)
    if dtype_name == 'datetime64[ns]':
        return make_datetime64ns(value)

    raise TypeError(
        "Don't know how to coerce values of dtype %s" % dtype
    )
tradingcalendar_tse.py 文件源码 项目:zipline-chinese 作者: zhanghan1990 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def get_open_and_close(day, early_closes):
    """Return the UTC open and close timestamps for ``day``.

    The open is 09:31 and the close is 16:00 (13:00 when ``day`` is in
    ``early_closes``), both taken as US/Eastern wall-clock times and then
    converted to UTC.

    :param day: object exposing ``year``, ``month`` and ``day``
    :param early_closes: container checked with ``day in early_closes``
    :returns: tuple (market_open, market_close) of pd.Timestamp
    """
    def eastern_to_utc(hour, minute=0):
        # Localize a wall-clock time on ``day`` and express it in UTC.
        wall_clock = datetime(
            year=day.year,
            month=day.month,
            day=day.day,
            hour=hour,
            minute=minute,
        )
        return pd.Timestamp(wall_clock, tz='US/Eastern').tz_convert('UTC')

    market_open = eastern_to_utc(9, 31)
    # 1 PM if early close, 4 PM otherwise
    if day in early_closes:
        close_hour = 13
    else:
        close_hour = 16
    market_close = eastern_to_utc(close_hour)

    return market_open, market_close
factory.py 文件源码 项目:zipline-chinese 作者: zhanghan1990 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def create_test_panel_ohlc_source(sim_params, env):
    """Build a one-item panel of synthetic OHLC data and wrap it in a source.

    The date range comes from ``sim_params`` when it is truthy, otherwise
    from fixed 1990-01-03 .. 1990-01-08 UTC defaults. Prices start at 100 and
    rise by one per day in ``env.days_in_range``.

    :param sim_params: object with ``first_open`` / ``last_close``, or falsy
    :param env: object providing ``days_in_range(start, end)``
    :returns: tuple (DataPanelSource(panel), panel)
    """
    # NOTE(review): pd.datetime and pd.Panel are not available in recent
    # pandas releases -- confirm the pandas version this file is pinned to.
    if sim_params:
        start = sim_params.first_open
        end = sim_params.last_close
    else:
        start = pd.datetime(1990, 1, 3, 0, 0, 0, 0, pytz.utc)
        end = pd.datetime(1990, 1, 8, 0, 0, 0, 0, pytz.utc)

    index = env.days_in_range(start, end)
    n_days = len(index)
    price = np.arange(0, n_days) + 100

    columns = {
        'price': price,
        'high': price * 1.05,
        'low': price * 0.95,
        # alternate slightly below / above the price depending on parity
        'open': price + .1 * (price % 2 - .5),
        'volume': np.ones(n_days) * 1000,
        'arbitrary': np.ones(n_days),
    }
    frame = pd.DataFrame(columns, index=index)
    panel = pd.Panel.from_dict({0: frame})

    return DataPanelSource(panel), panel
tradingcalendar_bmf.py 文件源码 项目:zipline-chinese 作者: zhanghan1990 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def get_open_and_close(day, early_closes):
    """Return the UTC open and close timestamps for ``day``.

    The open is 10:00, or 13:00 when ``day`` is in the module-level
    ``quarta_cinzas`` collection; the close is always 16:00. Both are taken
    as America/Sao_Paulo wall-clock times and converted to UTC.

    :param day: object exposing ``year``, ``month`` and ``day``
    :param early_closes: accepted for signature compatibility; not read here
    :returns: tuple (market_open, market_close) of pd.Timestamp
    """
    def sao_paulo_to_utc(hour):
        # Localize ``hour``:00 on ``day`` and express it in UTC.
        wall_clock = datetime(
            year=day.year,
            month=day.month,
            day=day.day,
            hour=hour,
            minute=0,
        )
        return pd.Timestamp(wall_clock, tz='America/Sao_Paulo').tz_convert('UTC')

    # only "early close" event in Bovespa actually is a late start
    # as the market only opens at 1pm
    if day in quarta_cinzas:
        open_hour = 13
    else:
        open_hour = 10

    return sao_paulo_to_utc(open_hour), sao_paulo_to_utc(16)
events.py 文件源码 项目:zipline-chinese 作者: zhanghan1990 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def _coerce_datetime(maybe_dt):
    """Turn ``maybe_dt`` into a ``datetime.datetime``.

    * ``datetime.datetime`` values are returned unchanged.
    * ``datetime.date`` values and 3-item tuples/lists ``(year, month, day)``
      become midnight of that day with ``tzinfo=pytz.utc``.

    :raises TypeError: for any other input
    """
    # datetime.datetime is a subclass of datetime.date, so test it first.
    if isinstance(maybe_dt, datetime.datetime):
        return maybe_dt

    if isinstance(maybe_dt, datetime.date):
        year, month, day = maybe_dt.year, maybe_dt.month, maybe_dt.day
    elif isinstance(maybe_dt, (tuple, list)) and len(maybe_dt) == 3:
        year, month, day = maybe_dt
    else:
        raise TypeError('Cannot coerce %s into a datetime.datetime'
                        % type(maybe_dt).__name__)

    return datetime.datetime(year=year, month=month, day=day, tzinfo=pytz.utc)
tradingcalendar_china.py 文件源码 项目:zipline-chinese 作者: zhanghan1990 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def get_open_and_close(day, early_closes):
    """Return the UTC open and close timestamps for ``day``.

    The open is 09:31 and the close is 16:00 (13:00 when ``day`` is in
    ``early_closes``). Both are taken as Asia/Shanghai wall-clock times and
    converted to UTC, so the open always precedes the close.

    :param day: object exposing ``year``, ``month`` and ``day``
    :param early_closes: container checked with ``day in early_closes``
    :returns: tuple (market_open, market_close) of pd.Timestamp
    """
    # The open used to be localized in 'US/Eastern' while the close used
    # 'Asia/Shanghai', which put the open after the close once in UTC.
    # NOTE(review): the 09:31 / 16:00 / 13:00 hours are kept unchanged --
    # confirm they match the intended session times.
    market_open = pd.Timestamp(
        datetime(
            year=day.year,
            month=day.month,
            day=day.day,
            hour=9,
            minute=31),
        tz='Asia/Shanghai').tz_convert('UTC')
    # 1 PM if early close, 4 PM otherwise
    close_hour = 13 if day in early_closes else 16
    market_close = pd.Timestamp(
        datetime(
            year=day.year,
            month=day.month,
            day=day.day,
            hour=close_hour),
        tz='Asia/Shanghai').tz_convert('UTC')

    return market_open, market_close
test_algorithm_gen.py 文件源码 项目:zipline-chinese 作者: zhanghan1990 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_generator_dates(self):
        """
        Run a one-year daily simulation and check the generator pipeline
        stayed in sync, at least as far as the current dates: the generator
        yields something and both the algorithm and its slippage model end
        up with a latest date.
        """
        params = factory.create_simulation_parameters(
            start=datetime(2011, 7, 30, tzinfo=pytz.utc),
            end=datetime(2012, 7, 30, tzinfo=pytz.utc),
            env=self.env,
        )
        algo = TestAlgo(self, sim_params=params, env=self.env)
        daily_trades = factory.create_daily_trade_source(
            [8229], params, env=self.env,
        )
        algo.set_sources([daily_trades])

        emitted = list(algo.get_generator())
        self.assertTrue(emitted)

        self.assertTrue(algo.slippage.latest_date)
        self.assertTrue(algo.latest_date)
test_algorithm_gen.py 文件源码 项目:zipline-chinese 作者: zhanghan1990 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_progress(self):
        """
        Run a short simulation (2008-01-01 to 2008-01-05) and check that the
        second-to-last item produced by the generator reports a progress
        of 1.0.
        """
        params = factory.create_simulation_parameters(
            start=datetime(2008, 1, 1, tzinfo=pytz.utc),
            end=datetime(2008, 1, 5, tzinfo=pytz.utc),
            env=self.env,
        )
        algo = TestAlgo(self, sim_params=params, env=self.env)
        daily_trades = factory.create_daily_trade_source(
            [8229], params, env=self.env,
        )
        algo.set_sources([daily_trades])

        emitted = list(algo.get_generator())
        second_to_last = emitted[-2]
        self.assertEqual(second_to_last['progress'], 1.0)
test_events.py 文件源码 项目:zipline-chinese 作者: zhanghan1990 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_checks_should_trigger(self):
        """handle_data consults should_trigger once per registered event.

        Five events share a rule class that counts its ``should_trigger``
        calls; after one ``handle_data`` call the count must be five.
        """
        class CountingRule(Always):
            # class-level tally shared by every instance
            count = 0

            def should_trigger(self, dt, env):
                CountingRule.count += 1
                return True

        for _ in range(5):
            rule = CountingRule()
            self.em.add_event(Event(rule, lambda context, data: None))

        # minimal stand-in exposing only the attribute named below
        FakeAlgo = namedtuple('FakeAlgo', ['trading_environment'])
        fake_algo = FakeAlgo(trading_environment="fake_env")
        self.em.handle_data(fake_algo, None, datetime.datetime.now())

        self.assertEqual(CountingRule.count, 5)
plistlib.py 文件源码 项目:kinect-2-libras 作者: inessadl 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def writeValue(self, value):
        """Emit the element matching the type of ``value``.

        Strings, booleans, integers, floats and datetimes go through
        ``simpleElement``; dicts, ``Data`` objects and tuples/lists are handed
        to ``writeDict``, ``writeData`` and ``writeArray`` respectively.

        Raises TypeError for any other type.
        """
        if isinstance(value, (str, unicode)):
            self.simpleElement("string", value)
            return
        if isinstance(value, bool):
            # must switch for bool before int, as bool is a
            # subclass of int...
            self.simpleElement("true" if value else "false")
            return
        if isinstance(value, (int, long)):
            self.simpleElement("integer", "%d" % value)
            return
        if isinstance(value, float):
            self.simpleElement("real", repr(value))
            return
        if isinstance(value, dict):
            self.writeDict(value)
            return
        if isinstance(value, Data):
            self.writeData(value)
            return
        if isinstance(value, datetime.datetime):
            self.simpleElement("date", _dateToString(value))
            return
        if isinstance(value, (tuple, list)):
            self.writeArray(value)
            return
        raise TypeError("unsuported type: %s" % type(value))
xmlrpclib.py 文件源码 项目:kinect-2-libras 作者: inessadl 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def make_comparable(self, other):
        """Return a pair ``(s, o)`` of like-typed values for comparison.

        * ``DateTime``: both ``value`` attributes.
        * ``datetime.datetime`` (when the module is available): ``self.value``
          and ``other`` formatted as ``%Y%m%dT%H:%M:%S``.
        * string: ``self.value`` and ``other`` itself.
        * anything with ``timetuple``: both time tuples.

        Raises TypeError when ``other`` matches none of these.
        """
        if isinstance(other, DateTime):
            return self.value, other.value
        if datetime and isinstance(other, datetime.datetime):
            return self.value, other.strftime("%Y%m%dT%H:%M:%S")
        if isinstance(other, (str, unicode)):
            return self.value, other
        if hasattr(other, "timetuple"):
            return self.timetuple(), other.timetuple()

        # Use the class name when there is a non-empty one, else the type.
        class_name = hasattr(other, "__class__") and other.__class__.__name__
        otype = class_name or type(other)
        raise TypeError("Can't compare %s and %s" %
                        (self.__class__.__name__, otype))
base.py 文件源码 项目:cbapi-python 作者: carbonblack 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def get_response(self, key, default=None):
        """Look up the cache entry stored for ``key``.

        ``key`` is used directly when it is present in ``self.responses``;
        otherwise it is first translated through ``self.keys_map``.

        :param key: key of resource
        :param default: value returned when neither lookup finds ``key``
        :returns: the stored entry exactly as held in ``self.responses``,
                  or ``default``

        .. note:: Unlike :meth:`get_response_and_time`, the entry is returned
                  as stored; :meth:`restore_response` is not applied here.
        """
        try:
            lookup_key = key if key in self.responses else self.keys_map[key]
            return self.responses[lookup_key]
        except KeyError:
            return default
test_models.py 文件源码 项目:tts-bug-bounty-dashboard 作者: 18F 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_activity_sets_sla_triaged_at():
    """sla_triaged_at is set by the first qualifying activity and then kept."""
    three_hours = datetime.timedelta(hours=3)
    report = new_report()
    report.save()
    assert report.sla_triaged_at is None

    # A plain comment must leave sla_triaged_at untouched
    commented_at = now()
    report.activities.create(
        id=1, type='activity-comment', created_at=commented_at)
    assert report.sla_triaged_at is None

    # A "not applicable" activity sets it to the activity's creation time
    triaged_at = commented_at + three_hours
    report.activities.create(
        id=2, type='activity-bug-not-applicable', created_at=triaged_at)
    assert report.sla_triaged_at == triaged_at

    # A later activity that would also set the date must not overwrite it
    resolved_at = triaged_at + three_hours
    report.activities.create(
        id=3, type='activity-bug-resolved', created_at=resolved_at)
    assert report.sla_triaged_at == triaged_at
toml.py 文件源码 项目:otRebuilder 作者: Pal3love 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _load_date(val):
    """Parse ``YYYY-MM-DDTHH:MM:SS[.fraction][offset]`` into a datetime.

    Fields are read from fixed positions: the seconds end at index 19, an
    optional ``.`` followed by fractional digits may come next, and whatever
    follows is handed to ``TomlTz`` as the offset.

    Fractional seconds of any length are accepted: ``.5`` means 500000
    microseconds, and digits beyond microsecond precision are dropped.

    :param val: the candidate date string
    :returns: a ``datetime.datetime``, or ``None`` when the date and time
              fields are not valid integers or do not form a valid datetime
    """
    microsecond = 0
    tz = None
    try:
        if len(val) > 19:
            if val[19] == '.':
                # Find the end of the run of ASCII digits after the dot.
                end = 20
                while end < len(val) and '0' <= val[end] <= '9':
                    end += 1
                digits = val[20:end]
                if not digits:
                    # Same outcome as before: ignore fraction and offset.
                    raise ValueError("missing fractional-second digits")
                # Right-pad so a short fraction is scaled to microseconds
                # instead of being read as a raw microsecond count.
                microsecond = int(digits[:6].ljust(6, '0'))
                if len(val) > end:
                    tz = TomlTz(val[end:end + 6])
            else:
                tz = TomlTz(val[19:25])
    except ValueError:
        # A malformed fraction or offset is not fatal; fall back to no tz.
        tz = None
    try:
        d = datetime.datetime(
            int(val[:4]), int(val[5:7]), int(val[8:10]),
            int(val[11:13]), int(val[14:16]), int(val[17:19]),
            microsecond, tz)
    except ValueError:
        return None
    return d
test_ApplicationAutoscaling.py 文件源码 项目:deployfish 作者: caltechads 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def setUp(self):
        """Build ``self.appscaling`` against a mocked boto3 client.

        ``describe_scalable_targets`` on the mocked client returns a single
        scalable target, and ``ScalingPolicy.__init__`` is replaced by a mock
        returning ``None`` while the object under test is constructed.
        """
        scalable_target = {
            'ServiceNamespace': 'ecs',
            'ResourceId': 'service/my_cluster/my_service',
            'ScalableDimension': 'ecs:service:DesiredCount',
            'MinCapacity': 1,
            'MaxCapacity': 3,
            'RoleARN': 'my_role_arn',
            'CreationTime': datetime(2017, 4, 14),
        }
        aws_data = {'ScalableTargets': [scalable_target], 'NextToken': None}

        policy_init = Mock(return_value=None)
        appscaling_client = Mock()
        appscaling_client.describe_scalable_targets = Mock(return_value=aws_data)
        client_factory = Mock(return_value=appscaling_client)

        with Replacer() as replacer:
            replacer.replace('boto3.client', client_factory)
            replacer.replace(
                'deployfish.aws.appscaling.ScalingPolicy.__init__', policy_init)
            self.appscaling = ApplicationAutoscaling('my_service', 'my_cluster')
test_ApplicationAutoscaling.py 文件源码 项目:deployfish 作者: caltechads 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def setUp(self):
        """Build ``self.appscaling`` from pre-supplied AWS data.

        The scalable-target dict is passed via ``aws=``; the mocked
        ``describe_scalable_targets`` is kept on
        ``self.describe_scaling_targets`` so tests can inspect its calls.
        """
        aws_data = {
            'ServiceNamespace': 'ecs',
            'ResourceId': 'service/my_cluster/my_service',
            'ScalableDimension': 'ecs:service:DesiredCount',
            'MinCapacity': 1,
            'MaxCapacity': 3,
            'RoleARN': 'my_role_arn',
            'CreationTime': datetime(2017, 4, 14),
        }

        policy_init = Mock(return_value=None)
        appscaling_client = Mock()
        self.describe_scaling_targets = Mock(return_value=aws_data)
        appscaling_client.describe_scalable_targets = self.describe_scaling_targets
        client_factory = Mock(return_value=appscaling_client)

        with Replacer() as replacer:
            replacer.replace('boto3.client', client_factory)
            replacer.replace(
                'deployfish.aws.appscaling.ScalingPolicy.__init__', policy_init)
            self.appscaling = ApplicationAutoscaling(
                'my_service', 'my_cluster', aws=aws_data)
test_models.py 文件源码 项目:socialhome 作者: jaywink 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_edited_is_false_for_newly_created_content_within_15_minutes_grace_period(self):
        """Saving 14 minutes after creation must not mark the content edited."""
        content = self.public_content
        frozen_at = content.created + datetime.timedelta(minutes=14)
        with freeze_time(frozen_at):
            content.save()
            self.assertFalse(content.edited)
test_models.py 文件源码 项目:socialhome 作者: jaywink 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_edited_is_true_for_newly_created_content_after_15_minutes_grace_period(self):
        """Saving 16 minutes after creation must mark the content edited."""
        content = self.public_content
        frozen_at = content.created + datetime.timedelta(minutes=16)
        with freeze_time(frozen_at):
            content.save()
            self.assertTrue(content.edited)
test_models.py 文件源码 项目:socialhome 作者: jaywink 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_dict_for_view_edited_post(self):
        """dict_for_view of content saved 16 minutes after creation.

        Checks the full dictionary, including the "(edited)" suffix on the
        humanized timestamp.
        """
        content = self.public_content
        with freeze_time(content.created + datetime.timedelta(minutes=16)):
            content.save()
            # Call the method under test before building the expectation so
            # the evaluation order matches a direct assertEqual(call, {...}).
            actual = content.dict_for_view(self.user)
            author = content.author
            pk_kwargs = {"pk": content.id}
            expected = {
                "author": content.author_id,
                "author_guid": author.guid,
                "author_handle": author.handle,
                "author_home_url": author.home_url,
                "author_image": author.safer_image_url_small,
                "author_is_local": bool(author.user),
                "author_name": author.handle,
                "author_profile_url": author.get_absolute_url(),
                "content_type": content.content_type.string_value,
                "delete_url": reverse("content:delete", kwargs=pk_kwargs),
                "detail_url": content.get_absolute_url(),
                "formatted_timestamp": content.timestamp,
                "guid": content.guid,
                "has_shared": False,
                "humanized_timestamp": "%s (edited)" % content.humanized_timestamp,
                "id": content.id,
                "is_authenticated": True,
                "is_author": True,
                "is_following_author": False,
                "parent": "",
                "profile_id": author.id,
                "rendered": content.rendered,
                "reply_count": 0,
                "reply_url": reverse("content:reply", kwargs=pk_kwargs),
                "shares_count": 0,
                "slug": content.slug,
                "through": content.id,
                "update_url": reverse("content:update", kwargs=pk_kwargs),
            }
            self.assertEqual(actual, expected)
test_is_time_to_run.py 文件源码 项目:monasca-transform 作者: openstack 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_is_time_to_run_before_late_metric_slack_time(self):
        """is_time_to_run is falsy at 11:09:59 when the last run was 1h earlier."""
        check_time = datetime.datetime(2016, 11, 7, 11, 9, 59, 0)
        one_hour_earlier = check_time - datetime.timedelta(hours=1)
        provider = PreHourlyProcessorUtil.get_data_provider()
        provider.set_last_processed(date_time=one_hour_earlier)
        self.assertFalse(PreHourlyProcessorUtil.is_time_to_run(check_time))
test_is_time_to_run.py 文件源码 项目:monasca-transform 作者: openstack 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_is_time_to_run_after_late_metric_slack_time(self):
        """is_time_to_run is truthy at 11:10:00.000001 when the last run was 1h earlier."""
        check_time = datetime.datetime(2016, 11, 7, 11, 10, 0, 1)
        one_hour_earlier = check_time - datetime.timedelta(hours=1)
        provider = PreHourlyProcessorUtil.get_data_provider()
        provider.set_last_processed(date_time=one_hour_earlier)
        self.assertTrue(PreHourlyProcessorUtil.is_time_to_run(check_time))
test_is_time_to_run.py 文件源码 项目:monasca-transform 作者: openstack 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_is_time_to_run_with_already_done_this_hour(self):
        """is_time_to_run is falsy when the last run equals the check time."""
        check_time = datetime.datetime(2016, 11, 7, 11, 30, 0, 0)
        provider = PreHourlyProcessorUtil.get_data_provider()
        provider.set_last_processed(date_time=check_time)
        self.assertFalse(PreHourlyProcessorUtil.is_time_to_run(check_time))
test_is_time_to_run.py 文件源码 项目:monasca-transform 作者: openstack 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_is_time_to_run_after_midnight_but_before_late_metric_slack_time(
            self):
        """is_time_to_run is falsy at 00:05 when the last run was 1h earlier.

        The last processed time therefore falls on the previous calendar day.
        """
        check_time = datetime.datetime(2016, 11, 7, 0, 5, 0, 0)
        one_hour_earlier = check_time - datetime.timedelta(hours=1)
        provider = PreHourlyProcessorUtil.get_data_provider()
        provider.set_last_processed(date_time=one_hour_earlier)
        self.assertFalse(PreHourlyProcessorUtil.is_time_to_run(check_time))


问题


面经


文章

微信
公众号

扫码关注公众号