python类timedelta()的实例源码

client.py 文件源码 项目:sentry-plugins 作者: getsentry 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def get_jwt(self):
        """Build a signed JSON Web Token for the GitHub integration.

        The claims carry the current time as ``iat``, an expiry ten minutes
        ahead as ``exp`` and the configured integration app id as ``iss``.
        The token is signed with RS256 using the configured private key.
        """
        expires_at = datetime.datetime.utcnow() + datetime.timedelta(minutes=10)
        claims = {
            # issued-at time, as whole epoch seconds
            'iat': int(time.time()),
            # expiry, ten minutes out, converted from the UTC struct_time
            'exp': calendar.timegm(expires_at.timetuple()),
            # the integration's GitHub identifier
            'iss': options.get('github.integration-app-id'),
        }
        signing_key = options.get('github.integration-private-key')
        return jwt.encode(claims, signing_key, algorithm='RS256')
views.py 文件源码 项目:stregsystemet 作者: f-klubben 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def razzia_wizard(request):
    """Render the razzia wizard form, or redirect to the razzia view on POST.

    On POST the submitted start/end date parts (coerced with ``int``), the
    product list and the title are packed into the query string of the
    ``razzia_view`` URL and the client is redirected there. For any other
    method the wizard template is rendered with two date pickers: the start
    picker is pre-set to 180 days ago and the end picker to the current time.

    :param request: the incoming HTTP request
    :return: a redirect response on POST, otherwise the rendered wizard page
    """
    if request.method == 'POST':
        return redirect(
            reverse("razzia_view") + "?start={0}-{1}-{2}&end={3}-{4}-{5}&products={6}&username=&razzia_title={7}"
            .format(int(request.POST['start_year']),
                    int(request.POST['start_month']),
                    int(request.POST['start_day']),
                    int(request.POST['end_year']), int(request.POST['end_month']),
                    int(request.POST['end_day']),
                    request.POST.get('products'),
                    request.POST.get('razzia_title')))

    # Read the clock once so both suggestions and the year list agree.
    now = timezone.now()

    # Subtract a *positive* delta: subtracting timedelta(days=-180) would move
    # the suggested start 180 days into the future, i.e. after the end date.
    suggested_start_date = now - datetime.timedelta(days=180)
    suggested_end_date = now

    # Years offered by both pickers: 2000 up to and including the current year.
    selectable_years = list(range(2000, now.year + 1))

    start_date_picker = fields.DateField(
        widget=extras.SelectDateWidget(years=selectable_years))
    end_date_picker = fields.DateField(
        widget=extras.SelectDateWidget(years=selectable_years))

    return render(request, 'admin/stregsystem/razzia/wizard.html',
                  {
                      'start_date_picker': start_date_picker.widget.render("start", suggested_start_date),
                      'end_date_picker': end_date_picker.widget.render("end", suggested_end_date)},
                  )
views.py 文件源码 项目:socialhome 作者: jaywink 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def nodeinfo_view(request):
    """Serve this site's NodeInfo document as a JSON response.

    Usage statistics are only filled in when ``settings.SOCIALHOME_STATISTICS``
    is truthy; otherwise only an empty ``users`` mapping is reported.
    """
    def _logged_in_within(days):
        # Count users whose last login is no older than ``days`` days.
        cutoff = now() - datetime.timedelta(days=days)
        return User.objects.filter(last_login__gte=cutoff).count()

    def _authored_count(kind):
        # Count content of the given type whose author is linked to a user.
        return Content.objects.filter(author__user__isnull=False, content_type=kind).count()

    site = Site.objects.get_current()
    if settings.SOCIALHOME_STATISTICS:
        usage = {
            "users": {
                "total": User.objects.count(),
                "activeHalfyear": _logged_in_within(180),
                "activeMonth": _logged_in_within(30),
            },
            "localPosts": _authored_count(ContentType.CONTENT),
            "localComments": _authored_count(ContentType.REPLY),
        }
    else:
        usage = {"users": {}}

    document = NodeInfo(
        software={"name": "socialhome", "version": version},
        protocols={"inbound": ["diaspora"], "outbound": ["diaspora"]},
        services={"inbound": [], "outbound": []},
        open_registrations=settings.ACCOUNT_ALLOW_REGISTRATION,
        usage=usage,
        metadata={"nodeName": site.name}
    )
    return JsonResponse(document.doc)
test_views.py 文件源码 项目:socialhome 作者: jaywink 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def test_view_responds_stats_on(self):
        """The ``usage`` section of the NodeInfo document matches database counts."""
        self.get(NODEINFO_DOCUMENT_PATH)
        self.response_200()

        reported_usage = json.loads(decode_if_bytes(self.last_response.content))["usage"]

        half_year_ago = now() - datetime.timedelta(days=180)
        month_ago = now() - datetime.timedelta(days=30)
        expected_usage = {
            "users": {
                "total": User.objects.count(),
                "activeHalfyear": User.objects.filter(last_login__gte=half_year_ago).count(),
                "activeMonth": User.objects.filter(last_login__gte=month_ago).count(),
            },
            "localPosts": Content.objects.filter(
                author__user__isnull=False, content_type=ContentType.CONTENT).count(),
            "localComments": Content.objects.filter(
                author__user__isnull=False, content_type=ContentType.REPLY).count(),
        }

        self.assertEqual(reported_usage, expected_usage)
test_integrates_with_template_rendering.py 文件源码 项目:django-performance-testing 作者: PaesslerAG 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_has_support_for_elapsed_time_in_template_render(settings):
    """A template render that exceeds the configured time limit is reported.

    The total time limit for ``Template.render`` is set to zero and the clock
    is frozen; the iterable handed to the template advances the frozen clock
    by five seconds between its two items, so the render must raise
    ``LimitViolationError`` naming the template in its context.
    """
    settings.PERFORMANCE_LIMITS = {'Template.render': {'time': {'total': 0}}}
    template = loader.get_template('all-group-names.markdown')

    with freeze_time('2016-09-29 15:52:01') as clock:
        class _ClockAdvancingGroups(object):
            # Yields two group names, moving the frozen clock in between.
            def __iter__(self):
                yield 'foo'
                clock.tick(timedelta(seconds=5))
                yield 'bar'

        with pytest.raises(LimitViolationError) as excinfo:
            template.render(context={'groups': _ClockAdvancingGroups()})

    expected_context = {'template': ['all-group-names.markdown']}
    assert excinfo.value.context == expected_context
iam_checks.py 文件源码 项目:ThreatPrep 作者: ThreatResponse 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test(self):
        """Check whether the user's password was changed within the rotation window.

        Reads ``password_enabled`` and ``password_last_changed`` from
        ``self.user_dict`` and sets ``self.status`` (and ``self.reason`` for
        every outcome except a passing rotation check):

        * password enabled and older than ``PASSWORD_ROTATION_DAYS`` -> FAIL
        * password enabled and recent enough -> PASS
        * password not enabled and last-changed is ``'not_supported'`` -> ERROR
        * password not enabled otherwise -> PASS

        Raises whatever ``dateutil.parser.parse`` raises when the password is
        enabled but the last-changed value is not a parseable timestamp.
        """
        if self.user_dict['password_enabled'] == 'true':
            last_changed = dateutil.parser.parse(self.user_dict['password_last_changed'])
            # Take "now" in the timezone of the parsed timestamp. Stamping a
            # UTC reading with a non-UTC tzinfo (utcnow().replace(tzinfo=...))
            # would skew the computed age by that zone's UTC offset.
            if last_changed.tzinfo is None:
                # NOTE(review): naive timestamps are treated as UTC, as the
                # original code did -- confirm against the report format.
                now = datetime.datetime.utcnow()
            else:
                now = datetime.datetime.now(last_changed.tzinfo)
            diff = now - last_changed
            delta = datetime.timedelta(
                days=config.config['PASSWORD_ROTATION_DAYS']
            )
            if diff > delta:
                # The message reports the configured threshold, not the age.
                self.reason = 'Password has not been changed in {0} days'.format(
                    delta.days
                )
                self.status = common.CheckState.FAIL
            else:
                self.status = common.CheckState.PASS
        elif self.user_dict['password_last_changed'] == 'not_supported':
            self.reason = 'password_last_changed field is not supported'
            self.status = common.CheckState.ERROR
        else:
            self.reason = 'Password is not enabled'
            self.status = common.CheckState.PASS
iam_checks.py 文件源码 项目:ThreatPrep 作者: ThreatResponse 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def key_rotated(self, key_id):
        """Tell whether access key ``key_id`` counts as rotated.

        :param key_id: identifier interpolated into the ``access_key_<id>_*``
            keys of ``self.user_dict``
        :return: ``True`` when the key is not active, or when its last rotation
            is no older than ``ACCESS_KEY_ROTATION_DAYS``; ``False`` when the
            last-rotated value cannot be parsed as a date or is older than
            the configured window
        """
        active_key = 'access_key_{0}_active'.format(key_id)
        if self.user_dict[active_key] != 'true':
            return True  # since the key is not active, call it rotated
        last_rotated_key = 'access_key_{0}_last_rotated'.format(key_id)
        last_rotated = self.user_dict[last_rotated_key]
        try:
            last_rotated_date = dateutil.parser.parse(last_rotated)
        except ValueError:
            return False  # The key has not been rotated so the value is N/A
        delta = datetime.timedelta(days=config.config['ACCESS_KEY_ROTATION_DAYS'])
        # Take "now" in the timezone of the parsed timestamp. Stamping the
        # host's local wall-clock time with that tzinfo (now().replace(...))
        # would skew the age by the host's UTC offset.
        if last_rotated_date.tzinfo is None:
            # NOTE(review): naive timestamps are treated as UTC here, matching
            # the password check in this file -- confirm against the report.
            now = datetime.datetime.utcnow()
        else:
            now = datetime.datetime.now(last_rotated_date.tzinfo)
        return now - last_rotated_date <= delta
github.py 文件源码 项目:flash_services 作者: textbook 项目源码 文件源码 阅读 42 收藏 0 点赞 0 评论 0
def half_life(issues):
        """Calculate the half life of the service's issues.

        Only issues for which both ``created_at`` and ``closed_at`` parse to
        a truthy value contribute a lifetime. The result is the lower-middle
        element of the sorted lifetimes.

        Args:
          issues (:py:class:`list`): The service's issue data.

        Returns:
          :py:class:`datetime.timedelta`: The half life of the issues, or
            ``None`` when no issue yields a lifetime.

        """
        durations = []
        for issue in issues:
            opened = safe_parse(issue.get('created_at'))
            closed = safe_parse(issue.get('closed_at'))
            if not (opened and closed):
                continue
            durations.append(closed - opened)
        if not durations:
            return None
        ordered = sorted(durations)
        # (n - 1) // 2 picks the middle item for odd n and the lower of the
        # two middle items for even n.
        return ordered[(len(ordered) - 1) // 2]
github.py 文件源码 项目:flash_services 作者: textbook 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def health_summary(self, half_life):
        """Classify the service's health from its issue half life.

        Args:
          half_life (:py:class:`datetime.timedelta`): The half life of
            the service's issues, or ``None`` when unknown.

        Returns:
          :py:class:`str`: ``'neutral'`` when there is no half life,
            ``'ok'`` when it is within ``self.ok_threshold`` days,
            ``'neutral'`` when it is within ``self.neutral_threshold``
            days, otherwise ``'error'``.

        """
        if half_life is None:
            return 'neutral'
        ok_limit = timedelta(days=self.ok_threshold)
        if half_life <= ok_limit:
            return 'ok'
        neutral_limit = timedelta(days=self.neutral_threshold)
        return 'neutral' if half_life <= neutral_limit else 'error'
test_is_time_to_run.py 文件源码 项目:monasca-transform 作者: openstack 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_am_pm_behaviour(self):
        """It is time to run when the last processing was twelve hours earlier."""
        check_time = datetime.datetime(2016, 11, 7, 22, 10, 0, 1)
        twelve_hours_before = check_time - datetime.timedelta(hours=12)

        provider = PreHourlyProcessorUtil.get_data_provider()
        provider.set_last_processed(date_time=twelve_hours_before)

        should_run = PreHourlyProcessorUtil.is_time_to_run(check_time)
        self.assertTrue(should_run)
analyzer.py 文件源码 项目:toll_road 作者: idosekely 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _time_frame(self, tf):
        _fmt = '%Y%m%d%H%M'
        delta = {
            "last_day": datetime.timedelta(days=1),
            "last_3_days": datetime.timedelta(days=3),
            "last_week": datetime.timedelta(days=7),
            "all": None,
            "custom": None,
        }
        if delta[tf]:
            now = datetime.datetime.now()
            to_time = now.strftime(_fmt)
            from_time = now - delta[tf]
            from_time = from_time.strftime(_fmt)
        else:
            from_time = None
            to_time = None
        return from_time, to_time
exporter.py 文件源码 项目:safetyculture-sdk-python 作者: SafetyCulture 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def check_if_media_sync_offset_satisfied(logger, settings, audit):
    """
    Decide whether an audit has been left untouched for longer than the media sync offset.

    The offset is a number of seconds read from the settings. An audit whose ``modified_at`` timestamp is not yet
    older than that offset is skipped, with an informational log line naming its ``audit_id``.
    :param logger:    The logger
    :param settings:  Settings mapping; the offset is read under MEDIA_SYNC_OFFSET_IN_SECONDS
    :param audit:     Audit JSON; ``modified_at`` and ``audit_id`` are read
    :return:          Boolean - True if more time than the offset has elapsed since modification, otherwise False.
    """
    modified_at = dateutil.parser.parse(audit['modified_at'])
    utc_now = pytz.utc.localize(datetime.utcnow())
    time_since_modification = utc_now - modified_at
    required_wait = timedelta(seconds=settings[MEDIA_SYNC_OFFSET_IN_SECONDS])
    if time_since_modification > required_wait:
        return True
    # Too fresh: leave it for a later sync cycle.
    logger.info('Audit {0} modified too recently, some media may not have completed syncing. Skipping export until next sync cycle'.format(
        audit['audit_id']))
    return False
si.py 文件源码 项目:Starfish 作者: BillWang139967 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def uptime(self):
        """Report system up time and idle time read from ``/proc/uptime``.

        Returns a dict with the formatted up time (``'up'``), the formatted
        idle time (``'idle'``) and the idle/up ratio from ``div_percent``
        (``'idle_rate'``).
        """
        with open('/proc/uptime', 'r') as proc_file:
            raw_up, raw_idle = proc_file.readline().split()

        up_seconds = int(float(raw_up))
        idle_seconds = int(float(raw_idle))

        # The reported idle time can exceed the up time on some machines
        # (e.g. a Linode VPS); scale it down by the CPU count first.
        if idle_seconds > up_seconds:
            idle_seconds = idle_seconds/multiprocessing.cpu_count()
            # It may still be too large (presumably the host has more cores
            # than the guest sees), so approximate with the first divisor in
            # 2..9 that brings it below the up time.
            if idle_seconds > up_seconds:
                for divisor in range(2, 10):
                    scaled = idle_seconds/divisor
                    if scaled < up_seconds:
                        idle_seconds = scaled
                        break

        fmt = '{days} ? {hours} ?? {minutes} ? {seconds} ?'
        up_text = strfdelta(datetime.timedelta(seconds = up_seconds), fmt)
        idle_text = strfdelta(datetime.timedelta(seconds = idle_seconds), fmt)
        return {
            'up': up_text,
            'idle': idle_text,
            'idle_rate': div_percent(idle_seconds, up_seconds),
        }
models.py 文件源码 项目:IotCenter 作者: panjanek 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def computeTrends(self, trends):
        """Set ``trend`` on each of this device's values from recent samples.

        For every value, the samples stored in ``trends`` under
        ``"<deviceId>.<value id>"`` are split into two windows relative to the
        current local time: the last 3 minutes ("current") and the 7 minutes
        before that ("previous"). When the previous window holds at least 5
        samples and the current one at least 2, their averages are compared:
        a rise of more than 0.02 sets ``value.trend`` to ``1``, a drop of more
        than 0.02 sets it to ``-1``. In every other case ``trend`` is left
        untouched.

        :param trends: mapping of ``"<deviceId>.<value id>"`` to a sequence of
            samples exposing ``time`` and ``value`` attributes
        :return: ``self``, to allow chaining
        """
        # Named ``now`` rather than ``time`` so the local does not shadow the
        # standard ``time`` module name.
        now = datetime.datetime.now()
        current_start = now - datetime.timedelta(minutes=3)
        previous_start = now - datetime.timedelta(minutes=10)
        for value in self.values:
            key = "{0}.{1}".format(self.deviceId, value.id)
            if key not in trends or len(trends[key]) <= 2:
                continue
            samples = trends[key]
            # Lists, not filter(): on Python 3 filter() returns an iterator,
            # which has no len() and could only be consumed once.
            previous_values = [s.value for s in samples
                               if previous_start < s.time <= current_start]
            current_values = [s.value for s in samples
                              if current_start < s.time <= now]
            if len(previous_values) < 5 or len(current_values) < 2:
                continue
            previous_avg = sum(previous_values)/len(previous_values)
            current_avg = sum(current_values)/len(current_values)
            if current_avg > previous_avg + 0.02:
                value.trend = 1
            if current_avg < previous_avg - 0.02:
                value.trend = -1
        return self
backuper.py 文件源码 项目:Telebackup 作者: LonamiWebs 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def calculate_etl(self, downloaded, total, start=None):
        """Estimate the time left for the remaining ``total - downloaded`` items.

           With a start time, the average time spent per downloaded item so far
           is projected onto the remaining items (zero if nothing has been
           downloaded yet).

           Without a start time, the estimate is the number of chunks still
           needed multiplied by the delay between chunk downloads; as the
           original author notes, this does NOT work for media downloads.

           Returns a timedelta whose seconds are rounded to one decimal."""
        remaining = total - downloaded
        if start:
            if downloaded:
                seconds_per_item = (datetime.now() - start).total_seconds() / downloaded
                seconds_left = remaining * seconds_per_item
            else:
                seconds_left = 0
        else:
            # Ceiling division: adding (chunk size - 1) before the floor
            # division rounds a partially filled chunk up to a whole one,
            # e.g. with a chunk size of 8, 7 or 8 items need 1 chunk, 9 need 2.
            pending_chunks = (remaining + self.download_chunk_size - 1) // self.download_chunk_size
            seconds_left = pending_chunks * self.download_delay

        return timedelta(seconds=round(seconds_left, 1))
testing.py 文件源码 项目:stalker_pyramid 作者: eoyilmaz 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def tearDown(self):
        """Undo the Pyramid test setup and drop the test database tables."""
        import datetime

        import transaction
        from pyramid import testing
        from stalker import defaults
        from stalker.db.declarative import Base
        from stalker.db.session import DBSession

        testing.tearDown()

        # Grab the engine from a live connection, release the connection,
        # then drop every table and discard the session.
        db_connection = DBSession.connection()
        db_engine = db_connection.engine
        db_connection.close()
        Base.metadata.drop_all(db_engine)
        transaction.commit()
        DBSession.remove()

        # Put the timing resolution back to one hour for the next test.
        defaults.timing_resolution = datetime.timedelta(hours=1)
testing.py 文件源码 项目:stalker_pyramid 作者: eoyilmaz 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def tearDown(self):
        """Tear down the Pyramid test configuration and wipe the test database."""
        import datetime

        import transaction
        from pyramid import testing
        from stalker import defaults
        from stalker.db.declarative import Base
        from stalker.db.session import DBSession

        testing.tearDown()

        # The engine is taken from a connection that is closed again before
        # the tables are dropped; afterwards the session is removed.
        active_connection = DBSession.connection()
        bound_engine = active_connection.engine
        active_connection.close()
        Base.metadata.drop_all(bound_engine)
        transaction.commit()
        DBSession.remove()

        # Reset the timing resolution to one hour.
        defaults.timing_resolution = datetime.timedelta(hours=1)
__init__.py 文件源码 项目:stalker_pyramid 作者: eoyilmaz 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def get_time(cls, request, time_attr):
        """Read the time of day from a date string in the request parameters.

        The last four characters of the parameter value are dropped before it
        is parsed with the format ``'%a, %d %b %Y %H:%M:%S'``; only the hour
        and minute of the parsed value are kept.

        :param request: the request object
        :param time_attr: the name of the request parameter to read
        :return: datetime.timedelta holding the parsed hours and minutes
        """
        raw_value = request.params[time_attr]
        parsed = datetime.datetime.strptime(raw_value[:-4], '%a, %d %b %Y %H:%M:%S')
        return datetime.timedelta(hours=parsed.hour, minutes=parsed.minute)
test_oauth2_models.py 文件源码 项目:django-tdd-restful-api 作者: elastic7327 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_create_oauth2_token(self):
        """An access token can be created for a superuser-owned application."""
        admin_user = mixer.blend('auth.User', is_staff=True, is_superuser=True)

        application_fields = dict(
            name='SuperAPI OAUTH2 APP',
            user=admin_user,
            client_type=Application.CLIENT_PUBLIC,
            authorization_grant_type=Application.GRANT_PASSWORD,
        )
        app = Application.objects.create(**application_fields)
        assert Application.objects.count() == 1, "Should be equal"

        token_seed = get_random_string(length=16)

        # The token expires five minutes from now.
        expiry = timezone.now() + timedelta(minutes=5)
        admin_token = AccessToken.objects.create(
            user=admin_user,
            scope='read write',
            expires=expiry,
            token=f'{token_seed}---{admin_user.username}',
            application=app,
        )

        assert admin_token is not None, "??? ???"
trading.py 文件源码 项目:zipline-chinese 作者: zhanghan1990 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def next_market_minute(self, start):
        """
        Return the first market minute after @start.

        On a trading day this is the market open when @start precedes it, or
        @start plus one minute when @start is before the close. In every other
        case it is the open of the next trading day after @start.
        """
        if self.is_trading_day(start):
            session_open, session_close = self.get_open_and_close(start)
            # Before the bell: the open itself is the next market minute.
            if start < session_open:
                return session_open
            # Inside the session: simply step one minute forward.
            if start < session_close:
                return start + datetime.timedelta(minutes=1)
        # Not a trading day, or at/after the close: use the *next* open.
        upcoming_session = self.next_open_and_close(start)
        return upcoming_session[0]
trading.py 文件源码 项目:zipline-chinese 作者: zhanghan1990 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def previous_market_minute(self, start):
        """
        Return the last market minute before @start.

        On a trading day this is the market close when @start is after it, or
        @start minus one minute when @start is after the open. In every other
        case it is the close of the trading day before @start.
        """
        if self.is_trading_day(start):
            session_open, session_close = self.get_open_and_close(start)
            # After the bell: the close itself is the previous market minute.
            if start > session_close:
                return session_close
            # Inside the session: simply step one minute back.
            if start > session_open:
                return start - datetime.timedelta(minutes=1)
        # Not a trading day, or at/before the open: use the *previous* close.
        earlier_session = self.previous_open_and_close(start)
        return earlier_session[1]
factory.py 文件源码 项目:zipline-chinese 作者: zhanghan1990 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def create_trade_history(sid, prices, amounts, interval, sim_params, env,
                         source_id="test_factory"):
    """Build a list of trades, one per (price, amount) pair.

    Trade times start at ``sim_params.first_open`` and advance through
    ``get_next_trading_dt`` by ``interval``. When the interval is a day or
    longer, each trade's hour and minute are reset to zero. Asserts that one
    trade was produced per price.
    """
    snap_to_midnight = interval >= timedelta(days=1)
    history = []
    cursor = sim_params.first_open
    for price, amount in zip(prices, amounts):
        stamp = cursor.replace(hour=0, minute=0) if snap_to_midnight else cursor
        history.append(create_trade(sid, price, amount, stamp, source_id))
        cursor = get_next_trading_dt(cursor, interval, env)

    assert len(history) == len(prices)
    return history
test_algorithm.py 文件源码 项目:zipline-chinese 作者: zhanghan1990 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def setUp(self):
        """Prepare simulation parameters plus trade, DataFrame and Panel sources."""
        setup_logger(self)
        self.sim_params = factory.create_simulation_parameters(
            num_days=4, env=self.env)

        # Four daily trades for sid 133.
        prices = [10.0, 10.0, 11.0, 11.0]
        volumes = [100, 100, 100, 300]
        history = factory.create_trade_history(
            133, prices, volumes, timedelta(days=1), self.sim_params, self.env)
        self.source = SpecificEquityTrades(event_list=history, env=self.env)

        df_fixture = factory.create_test_df_source(self.sim_params, self.env)
        self.df_source, self.df = df_fixture

        panel_fixture = factory.create_test_panel_source(self.sim_params, self.env)
        self.panel_source, self.panel = panel_fixture
test_algorithm.py 文件源码 项目:zipline-chinese 作者: zhanghan1990 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def setUp(self):
        """Create a fresh trading environment with a trade source and a DataFrame source."""
        setup_logger(self)
        self.env = TradingEnvironment()
        self.sim_params = factory.create_simulation_parameters(
            num_days=4, env=self.env)
        self.env.write_data(equities_identifiers=[1, 133])

        # Four daily trades for sid 1.
        prices = [10.0, 10.0, 11.0, 11.0]
        volumes = [100, 100, 100, 300]
        history = factory.create_trade_history(
            1, prices, volumes, timedelta(days=1), self.sim_params, self.env)
        self.source = SpecificEquityTrades(event_list=history, env=self.env)

        df_fixture = factory.create_test_df_source(self.sim_params, self.env)
        self.df_source, self.df = df_fixture
test_algorithm.py 文件源码 项目:zipline-chinese 作者: zhanghan1990 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def setUp(self):
        """Build simulation parameters and a four-trade source for ``self.sid``."""
        self.sim_params = factory.create_simulation_parameters(
            num_days=4, env=self.env)

        prices = [10.0, 10.0, 11.0, 11.0]
        volumes = [100, 100, 100, 300]
        one_day = timedelta(days=1)
        self.trade_history = factory.create_trade_history(
            self.sid, prices, volumes, one_day, self.sim_params, self.env)

        self.source = SpecificEquityTrades(
            event_list=self.trade_history, env=self.env)
test_algorithm.py 文件源码 项目:zipline-chinese 作者: zhanghan1990 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def setUp(self):
        """Build simulation parameters and a four-trade source for ``self.sidint``."""
        self.sim_params = factory.create_simulation_parameters(
            num_days=4, env=self.env)

        prices = [10.0, 10.0, 11.0, 11.0]
        volumes = [100, 100, 100, 300]
        one_day = timedelta(days=1)
        self.trade_history = factory.create_trade_history(
            self.sidint, prices, volumes, one_day, self.sim_params, self.env)

        self.source = SpecificEquityTrades(
            event_list=self.trade_history, env=self.env)
test_security_list.py 文件源码 项目:zipline-chinese 作者: zhanghan1990 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_iterate_over_rl(self):
        """Running ``IterateRLAlgo`` over BZQ trades sets its ``found`` flag."""
        first_listed_date = list(LEVERAGED_ETFS.keys())[0]
        sim_params = factory.create_simulation_parameters(
            start=first_listed_date, num_days=4, env=self.env)

        prices = [10.0, 10.0, 11.0, 11.0]
        volumes = [100, 100, 100, 300]
        history = factory.create_trade_history(
            'BZQ', prices, volumes, timedelta(days=1), sim_params, env=self.env)
        self.source = SpecificEquityTrades(event_list=history, env=self.env)

        algo = IterateRLAlgo(symbol='BZQ', sim_params=sim_params, env=self.env)
        algo.run(self.source)
        self.assertTrue(algo.found)
test_security_list.py 文件源码 项目:zipline-chinese 作者: zhanghan1990 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_algo_without_rl_violation(self):
        """``RestrictedAlgoWithoutCheck`` runs over AAPL trades without raising."""
        first_listed_date = list(LEVERAGED_ETFS.keys())[0]
        sim_params = factory.create_simulation_parameters(
            start=first_listed_date, num_days=4, env=self.env)

        prices = [10.0, 10.0, 11.0, 11.0]
        volumes = [100, 100, 100, 300]
        history = factory.create_trade_history(
            'AAPL', prices, volumes, timedelta(days=1), sim_params, env=self.env)
        self.source = SpecificEquityTrades(event_list=history, env=self.env)

        algo = RestrictedAlgoWithoutCheck(
            symbol='AAPL', sim_params=sim_params, env=self.env)
        algo.run(self.source)
test_security_list.py 文件源码 项目:zipline-chinese 作者: zhanghan1990 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_algo_with_rl_violation_after_knowledge_date(self):
        """Trading BZQ a week after the first list date raises a control violation."""
        first_listed_date = list(LEVERAGED_ETFS.keys())[0]
        sim_params = factory.create_simulation_parameters(
            start=first_listed_date + timedelta(days=7), num_days=5,
            env=self.env)

        prices = [10.0, 10.0, 11.0, 11.0]
        volumes = [100, 100, 100, 300]
        history = factory.create_trade_history(
            'BZQ', prices, volumes, timedelta(days=1), sim_params, env=self.env)
        self.source = SpecificEquityTrades(event_list=history, env=self.env)

        algo = RestrictedAlgoWithoutCheck(
            symbol='BZQ', sim_params=sim_params, env=self.env)
        with self.assertRaises(TradingControlViolation) as ctx:
            algo.run(self.source)

        self.check_algo_exception(algo, ctx, 0)
test_security_list.py 文件源码 项目:zipline-chinese 作者: zhanghan1990 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_algo_without_rl_violation_after_delete(self):
        """After a delete statement for BZQ is added, trading BZQ does not raise."""
        with security_list_copy():
            # Write a new delete statement to disk that removes BZQ
            # (nothing added, BZQ deleted).
            add_security_data([], ['BZQ'])
            sim_params = factory.create_simulation_parameters(
                start=self.extra_knowledge_date, num_days=3)

            prices = [10.0, 10.0, 11.0, 11.0]
            volumes = [100, 100, 100, 300]
            history = factory.create_trade_history(
                'BZQ', prices, volumes, timedelta(days=1), sim_params,
                env=self.env)
            self.source = SpecificEquityTrades(event_list=history,
                                               env=self.env)

            algo = RestrictedAlgoWithoutCheck(
                symbol='BZQ', sim_params=sim_params, env=self.env)
            algo.run(self.source)


问题


面经


文章

微信
公众号

扫码关注公众号