python类parse_date()的实例源码

parameters.py 文件源码 项目:ml-rest 作者: apinf 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def cast_primitive_value(spec, value):
    """Coerce ``value`` according to the ``type``/``format`` keys of ``spec``.

    The rules are tried in a fixed order and the first match wins:
    boolean, integer, number, byte (base64), binary, date, dateTime, string.
    A value that matches no rule is returned unchanged.

    :param spec: mapping that may contain ``'type'`` and ``'format'`` entries
    :param value: raw value to convert
    :return: the converted value
    """
    fmt = spec.get('format')
    kind = spec.get('type')

    if kind == 'boolean':
        truthy = ('1', 'yes', 'true')
        return force_text(value).lower() in truthy
    if kind == 'integer' or fmt in ('integer', 'long'):
        return int(value)
    if kind == 'number' or fmt in ('float', 'double'):
        return float(value)
    if fmt == 'byte':
        # base64 encoded characters
        return base64.b64decode(value)
    if fmt == 'binary':
        # any sequence of octets
        return force_bytes(value)
    if fmt in ('date', 'dateTime'):
        # Both are ISO8601; 'date' keeps only the calendar date part.
        parsed = iso8601.parse_date(value)
        return parsed.date() if fmt == 'date' else parsed
    if kind == 'string':
        return force_text(value)
    return value
parameters.py 文件源码 项目:lepo 作者: akx 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def cast_primitive_value(spec, value):
    """Convert ``value`` to the primitive described by ``spec``.

    ``spec`` is read through ``.get('type')`` and ``.get('format')``. The
    checks below run top to bottom and the first one that applies decides
    the result; if none applies, ``value`` is handed back untouched.
    """
    value_format, value_type = spec.get('format'), spec.get('type')

    if value_type == 'boolean':
        lowered = force_text(value).lower()
        return lowered in ('1', 'yes', 'true')

    wants_int = value_type == 'integer' or value_format in ('integer', 'long')
    if wants_int:
        return int(value)

    wants_float = value_type == 'number' or value_format in ('float', 'double')
    if wants_float:
        return float(value)

    if value_format == 'byte':  # base64 encoded characters
        return base64.b64decode(value)
    if value_format == 'binary':  # any sequence of octets
        return force_bytes(value)
    if value_format == 'date':  # ISO8601 date
        parsed_day = iso8601.parse_date(value)
        return parsed_day.date()
    if value_format == 'dateTime':  # ISO8601 datetime
        return iso8601.parse_date(value)
    if value_type == 'string':
        return force_text(value)
    return value
repocheck.py 文件源码 项目:repocribro 作者: MarekSuchanek 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def _process_event(self, repo, event):
        """Run the registered hooks for an event that is newer than the last one

        :param repo: Repository related to event
        :type repo: ``repocribro.models.Repository``
        :param event: GitHub event data
        :type event: dict
        :return: ``False`` when the event is not newer than
                 ``repo.last_event``, ``True`` once the hooks were run
        :rtype: bool
        """
        # repo.last_event is localized to UTC so it can be compared with the
        # parsed ``created_at`` value.
        last_seen = pytz.utc.localize(repo.last_event)
        created = iso8601.parse_date(event['created_at'])
        if created <= last_seen:
            return False

        webhook = self.event2webhook.get(event['type'], 'uknown')
        processors = self.hooks.get(webhook, [])
        for process in processors:
            try:
                process(
                    db=self.db,
                    repo=repo,
                    payload=event['payload'],
                    actor=event['actor'],
                )
                message = 'Processed {} from {} event for {}'.format(
                    event['type'], event['created_at'], repo.full_name
                )
                print(message)
            except HTTPException:
                # A failing processor is reported and the remaining ones
                # still run.
                print('Error while processing #{}'.format(event['id']))
        return True
__init__.py 文件源码 项目:jsonschema2db 作者: better 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _is_valid_type(self, t, value):
        try:
            if t == 'number':
                float(value)
            elif t == 'integer':
                int(value)
            elif t == 'boolean':
                assert type(value) == bool
            elif t == 'timestamp':
                iso8601.parse_date(value)
            elif t == 'date':
                iso8601.parse_date(value + 'T00:00:00Z')
            elif t == 'string':
                # Allow coercing ints/floats, but nothing else
                assert type(value) in [str, int, float]
        except:
            return False
        return True
test_appcontroller.py 文件源码 项目:fuel-ccp-tests 作者: openstack 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def linear_check(self):
        """Wait for every linear item and verify their creation order.

        Each item is waited for with its own 300 second timeout (polled every
        2 seconds). After an item is ready its k8s object's creation
        timestamp is parsed and must not be earlier than the previous item's.
        """
        previous_created = None
        for item in self.__linear_order:
            resource_repr = getattr(item, '__resource_repr')
            timeout_text = "{} creation timeout reached".format(resource_repr)
            helpers.wait(
                lambda: self._linear_check(item),
                timeout=300,
                interval=2,
                timeout_msg=timeout_text,
            )
            k8s_obj = self.get_k8s_object(resource_repr)
            created = iso8601.parse_date(k8s_obj.metadata.creation_timestamp)
            if previous_created is not None:
                assert previous_created <= created, (
                    "The order of linear objects is broken!")
            previous_created = created
        LOG.info("Linear check passed!")
utils.py 文件源码 项目:graphene-mongo 作者: joaovitorsilvestre 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def parse_operators(args):
    """Return a copy of ``args`` with ISO 8601 strings turned into datetimes.

    Works around mongoengine's ``gte``/``gt``/``lt``/``lte`` operators not
    working with dates given in ISO format: every value that
    ``iso8601.parse_date`` accepts is replaced by the parsed result, all
    other values are kept as they are. The ``skip`` and ``limit`` keys are
    dropped. The input mapping is not modified.

    :param args: mapping of query arguments
    :return: new dict with the converted values
    """
    parsed_args = {}
    for key, value in args.items():
        if key in ('skip', 'limit'):
            continue
        try:
            as_date = iso8601.parse_date(value)
        except Exception:
            # Not a parseable date (wrong type or format): keep the value.
            # ``Exception`` instead of a bare ``except`` so that
            # KeyboardInterrupt/SystemExit are not swallowed.
            as_date = None
        parsed_args[key] = as_date if as_date else value

    return parsed_args
etcd.py 文件源码 项目:etc 作者: sublee 项目源码 文件源码 阅读 63 收藏 0 点赞 0 评论 0
def make_node(cls, data):
        """Build a ``Value``, ``Directory`` or ``Node`` from a response dict.

        * Without a ``'key'`` entry the node key is ``u'/'`` and no index
          keyword arguments are passed.
        * With a ``ttl`` that is not ``None``, ``data['expiration']`` is
          parsed as ISO 8601 and both are passed on as keyword arguments.
        * ``'value'`` in ``data`` yields a ``Value``; a truthy ``'dir'``
          yields a ``Directory`` whose children are built recursively from
          ``data['nodes']``; anything else yields a plain ``Node``.
        """
        if 'key' in data:
            key = data['key']
            kwargs = {
                'modified_index': int(data['modifiedIndex']),
                'created_index': int(data['createdIndex']),
            }
        else:
            key = u'/'
            kwargs = {}

        ttl = data.get('ttl')
        if ttl is not None:
            kwargs.update(
                ttl=ttl,
                expiration=iso8601.parse_date(data['expiration']),
            )

        if 'value' in data:
            return Value(key, data['value'], **kwargs)
        if data.get('dir', False):
            children = [cls.make_node(child) for child in data.get('nodes', ())]
            return Directory(key, children, **kwargs)
        return Node(key, **kwargs)
pipeline.py 文件源码 项目:series-tiempo-ar-api 作者: datosgobar 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def run(self, query, args):
        """Read the start/end date parameters and add them to ``query`` as a filter.

        Stores the raw parameters on ``self.start`` / ``self.end`` and calls
        ``self.validate_start_end_dates()``; if that leaves anything in
        ``self.errors`` the method returns without touching ``query``.

        The start date is passed as the string form of the parsed date. The
        end date is passed as a ``datetime.date``; when only a year, or a
        year and month, was given it is moved to the last day of that period.
        """
        self.start = args.get(constants.PARAM_START_DATE)
        self.end = args.get(constants.PARAM_END_DATE)

        self.validate_start_end_dates()
        if self.errors:
            return

        start_date = None
        if self.start:
            start_date = str(iso8601.parse_date(self.start).date())

        end_date = None
        if self.end:
            end_date = iso8601.parse_date(self.end).date()
            dash_count = self.end.count('-')
            if dash_count == 0:
                # Only the year was specified: use December 31st.
                end_date = datetime.date(end_date.year, 12, 31)
            elif dash_count == 1:
                # Year and month were specified: use the month's last day.
                # monthrange returns (weekday of the 1st, days in month).
                last_day = monthrange(end_date.year, end_date.month)[1]
                end_date = datetime.date(end_date.year, end_date.month, last_day)

        query.add_filter(start_date, end_date)
pipeline.py 文件源码 项目:series-tiempo-ar-api 作者: datosgobar 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def validate_date(self, _date, param):
        """Validate and parse the given date.

        Args:
            _date (str): date string, ISO 8601
            param (str): name of the parameter being parsed (used in the
                error message)

        Returns:
            the result of ``iso8601.parse_date(_date)``

        Raises:
            ValueError: if the format is not valid; the error is also
                recorded through ``self._append_error`` first
        """
        try:
            return iso8601.parse_date(_date)
        except iso8601.ParseError:
            error_text = strings.INVALID_DATE.format(param, _date)
            self._append_error(error_text)
            raise ValueError
collapse_query_tests.py 文件源码 项目:series-tiempo-ar-api 作者: datosgobar 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_add_collapse(self):
        """Check that after adding a default collapse the results are
        yearly, i.e. each row is one year apart from the previous one."""
        self.query.add_series(self.single_series, self.rep_mode, self.series_periodicity)
        self.query.add_collapse()
        rows = self.query.run()

        previous = None
        for row in rows:
            raw_stamp = row[0]
            current = iso8601.parse_date(raw_stamp)
            if previous:
                gap = relativedelta(current, previous)
                self.assertTrue(gap.years == 1, raw_stamp)
            previous = current
collapse_query_tests.py 文件源码 项目:series-tiempo-ar-api 作者: datosgobar 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_query_fills_nulls(self):
        """Rows dated before the reference first date must hold ``None`` in
        column 1.

        The reference date is the first row of a separate ascending query
        over ``self.single_series``.
        NOTE(review): the threshold is named after the delayed series but is
        read from a ``single_series`` query - confirm this is intended.
        """
        self.query.add_series(self.single_series, self.rep_mode, self.series_periodicity)
        self.query.add_series(self.delayed_series,
                              self.rep_mode,
                              self.series_periodicity)

        reference = ESQuery(index=settings.TEST_INDEX)
        reference.add_series(self.single_series, self.rep_mode, self.series_periodicity)
        reference.sort('asc')
        first_date = iso8601.parse_date(reference.run()[0][0])
        rows = self.query.run()

        column = 1  # First series added
        for row in rows:
            if iso8601.parse_date(row[0]) >= first_date:
                break
            self.assertEqual(row[column], None)
collapse_query_tests.py 文件源码 项目:series-tiempo-ar-api 作者: datosgobar 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_query_fills_nulls_second_series(self):
        """Same null-filling check as the first-series variant, but with the
        delayed series added first and column 2 inspected.

        Rows whose date precedes the first row of an ascending
        ``single_series`` query must contain ``None`` in that column.
        """
        self.query.add_series(self.delayed_series,
                              self.rep_mode,
                              self.series_periodicity)
        self.query.add_series(self.single_series, self.rep_mode, self.series_periodicity)

        baseline = ESQuery(index=settings.TEST_INDEX)
        baseline.add_series(self.single_series, self.rep_mode, self.series_periodicity)
        baseline.sort('asc')
        baseline_rows = baseline.run()
        threshold = iso8601.parse_date(baseline_rows[0][0])
        rows = self.query.run()

        series_column = 2  # Second series added
        for row in rows:
            row_date = iso8601.parse_date(row[0])
            if row_date >= threshold:
                break
            self.assertEqual(row[series_column], None)
collapse_query_tests.py 文件源码 项目:series-tiempo-ar-api 作者: datosgobar 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_end_of_period(self):
        """Compare yearly ``end_of_period`` values with the raw December rows.

        For every collapsed row, the first original row with the same year
        and month 12 must match its value to 5 decimal places. Years with no
        December row in the original data are not checked.
        """
        raw_query = ESQuery(index=settings.TEST_INDEX)
        raw_query.add_series(self.single_series, self.rep_mode, self.series_periodicity)
        raw_query.add_pagination(start=0, limit=1000)
        raw_query.sort('asc')
        raw_query.add_filter(start="1970")
        raw_rows = raw_query.run()

        self.query.add_series(self.single_series,
                              self.rep_mode,
                              self.series_periodicity,
                              'end_of_period')
        self.query.add_filter(start="1970")
        self.query.add_collapse('year')
        collapsed_rows = self.query.run()

        for collapsed in collapsed_rows:
            collapsed_value = collapsed[1]
            target_year = iso8601.parse_date(collapsed[0]).year
            for raw in raw_rows:
                raw_date = iso8601.parse_date(raw[0])
                is_december = raw_date.year == target_year and raw_date.month == 12
                if not is_december:
                    continue
                # EOP loses some precision, hence the approximate comparison
                self.assertAlmostEqual(collapsed_value, raw[1], 5)
                break
es_query_tests.py 文件源码 项目:series-tiempo-ar-api 作者: datosgobar 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_query_fills_nulls(self):
        """With both queries sorted ascending, rows dated before the
        reference first date must hold ``None`` in column 1.

        The reference date is the first row of a separate ascending query
        over ``self.single_series``.
        """
        self.query.add_series(self.single_series, self.rep_mode, self.series_periodicity)
        self.query.add_series(self.delayed_series,
                              self.rep_mode,
                              self.series_periodicity)

        reference = ESQuery(index=settings.TEST_INDEX)
        reference.add_series(self.single_series, self.rep_mode, self.series_periodicity)
        reference.sort('asc')
        first_date = iso8601.parse_date(reference.run()[0][0])

        self.query.sort('asc')
        rows = self.query.run()

        column = 1  # First series added
        for row in rows:
            if iso8601.parse_date(row[0]) >= first_date:
                break
            self.assertEqual(row[column], None)
es_query_tests.py 文件源码 项目:series-tiempo-ar-api 作者: datosgobar 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_query_fills_nulls_second_series(self):
        """Ascending variant of the null-filling check that inspects column 2,
        with the delayed series added before the single series.

        Rows whose date precedes the first row of an ascending
        ``single_series`` query must contain ``None`` in that column.
        """
        self.query.add_series(self.delayed_series,
                              self.rep_mode,
                              self.series_periodicity)
        self.query.add_series(self.single_series, self.rep_mode, self.series_periodicity)

        baseline = ESQuery(index=settings.TEST_INDEX)
        baseline.add_series(self.single_series, self.rep_mode, self.series_periodicity)
        baseline.sort('asc')
        baseline_rows = baseline.run()
        threshold = iso8601.parse_date(baseline_rows[0][0])

        self.query.sort('asc')
        rows = self.query.run()

        series_column = 2  # Second series added
        for row in rows:
            row_date = iso8601.parse_date(row[0])
            if row_date >= threshold:
                break
            self.assertEqual(row[series_column], None)
es_query_tests.py 文件源码 项目:series-tiempo-ar-api 作者: datosgobar 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_index_continuity(self):
        """Consecutive rows of the filtered, ascending query must be exactly
        one month apart.

        NOTE(review): the second ``ESQuery`` below is configured but never
        run in this test - confirm whether that is intentional.
        """
        self.query.add_series(self.delayed_series,
                              self.rep_mode,
                              self.series_periodicity)
        self.query.add_series(self.single_series, self.rep_mode, self.series_periodicity)
        self.query.add_filter(start="1910", end="1920")
        self.query.sort('asc')

        other_query = ESQuery(index=settings.TEST_INDEX)
        other_query.add_series(self.single_series, self.rep_mode, self.series_periodicity)
        other_query.add_filter(start="1921")  # Guarantees empty data between 1920-1921
        other_query.add_pagination(start=0, limit=1000)
        other_query.sort('asc')

        rows = self.query.run()
        previous = iso8601.parse_date(rows[0][0])
        for row in rows[1:]:
            current = iso8601.parse_date(row[0])
            expected = previous + relativedelta(months=1)
            self.assertEqual(expected, current)
            previous = current
views.py 文件源码 项目:nav 作者: UNINETT 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def get_times(request):
    """Read ``starttime`` and ``endtime`` from the request's GET parameters.

    As NAV uses no timezone, it is removed from the parsed timestamps.
    Missing or empty parameters are returned as they were received.

    :param request: django.http.HttpRequest
    :return: ``(starttime, endtime)`` tuple
    :raises Iso8601ParseError: if a given value is not valid ISO 8601
    """
    def _to_naive(raw):
        return iso8601.parse_date(raw).replace(tzinfo=None)

    params = request.GET
    raw_start = params.get('starttime')
    raw_end = params.get('endtime')
    try:
        start = _to_naive(raw_start) if raw_start else raw_start
        end = _to_naive(raw_end) if raw_end else raw_end
    except iso8601.ParseError:
        raise Iso8601ParseError

    return start, end
utils.py 文件源码 项目:trees-api 作者: codeforberlin 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def get_timestamp(filename):
    """Return the ``timeStamp`` of the ``wfs:FeatureCollection`` element.

    The file is read with a SAX parser. The attribute is parsed as ISO 8601
    with ``default_timezone=None`` and the result is localized to
    ``settings.TIME_ZONE``.
    """

    class _CollectionHandler(xml.sax.ContentHandler):
        # Stays None until a wfs:FeatureCollection start tag is seen.
        timestamp = None

        def startElement(self, name, attrs):
            if name != "wfs:FeatureCollection":
                return
            self.timestamp = attrs['timeStamp']

    collector = _CollectionHandler()
    sax_parser = xml.sax.make_parser()
    sax_parser.setContentHandler(collector)
    sax_parser.parse(filename)

    parsed = iso8601.parse_date(collector.timestamp, default_timezone=None)
    local_zone = pytz.timezone(settings.TIME_ZONE)
    return local_zone.localize(parsed)
mixins.py 文件源码 项目:django-daiquiri 作者: aipescience 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def _test_get_job_destruction(self, username):
        '''
        GET /{jobs}/{job-id}/destruction returns the destruction instant for
        {job-id} as [std:iso8601].

        For ``username == 'user'`` a 200 is expected and the body must equal
        the job's destruction time (or be empty when the job has none); for
        any other username a 404 is expected.
        '''
        for job in self.jobs:
            url = reverse(self.url_names['destruction'], kwargs={'pk': job.pk})
            response = self.client.get(url)

            if username != 'user':
                self.assertEqual(response.status_code, 404)
                continue

            self.assertEqual(response.status_code, 200)
            if job.destruction_time:
                returned = iso8601.parse_date(response.content.decode())
                self.assertEqual(returned, job.destruction_time)
            else:
                self.assertEqual(response.content.decode(), '')
mixins.py 文件源码 项目:django-daiquiri 作者: aipescience 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _test_post_job_destruction(self, username):
        '''
        POST /{jobs}/{job-id}/destruction with DESTRUCTION={std:iso8601}
        (application/x-www-form-urlencoded) sets the destruction instant for
        {job-id} and redirects to /{jobs}/{job-id} as 303.

        Any username other than ``'user'`` is expected to get a 404.
        '''
        destruction_time = '2016-01-01T00:00:00'
        form_body = urlencode({'DESTRUCTION': destruction_time})

        for job in self.jobs:
            url = reverse(self.url_names['destruction'], kwargs={'pk': job.pk})
            response = self.client.post(
                url, form_body,
                content_type='application/x-www-form-urlencoded')

            if username != 'user':
                self.assertEqual(response.status_code, 404)
                continue

            detail_path = reverse(self.url_names['detail'], kwargs={'pk': job.pk})
            redirect_url = 'http://testserver' + detail_path
            self.assertRedirects(response, redirect_url, status_code=303)
            stored = self.jobs.get(pk=job.pk).destruction_time
            self.assertEqual(stored, iso8601.parse_date(destruction_time))
filters.py 文件源码 项目:django-daiquiri 作者: aipescience 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def filter_queryset(self, request, queryset, view):
        """Apply the PHASE, AFTER and LAST query parameters to the job list.

        Only the ``list_jobs`` action is filtered; for every other action the
        queryset is returned unchanged.

        * ``PHASE`` (repeatable): keep jobs in those phases; without it,
          archived jobs are excluded.
        * ``AFTER``: keep jobs created after the given ISO 8601 instant.
        * ``LAST``: keep the given number of started jobs, newest start
          first.
        """
        query_dict = make_query_dict_upper_case(request.GET)

        # apply only for list
        if view.action != 'list_jobs':
            return queryset

        phases = query_dict.getlist('PHASE')
        if phases:
            queryset = queryset.filter(phase__in=phases)
        else:
            queryset = queryset.exclude(phase__exact=Job.PHASE_ARCHIVED)

        after = query_dict.get('AFTER')
        if after:
            created_after = iso8601.parse_date(after)
            queryset = queryset.filter(creation_time__gt=created_after)

        last = query_dict.get('LAST')
        if last:
            started = queryset.filter(start_time__isnull=False)
            queryset = started.order_by('-start_time')[:int(last)]

        return queryset
models.py 文件源码 项目:aw-core 作者: ActivityWatch 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def _timestamp_parse(ts: ConvertableTimestamp) -> datetime:
    """
    Normalize a timestamp: strings are parsed as ISO 8601, the value is
    truncated to millisecond resolution, and UTC is attached (with a logged
    warning) when no timezone is set.
    """
    if isinstance(ts, str):
        ts = iso8601.parse_date(ts)

    # Set resolution to milliseconds instead of microseconds
    # (Fixes incompatibility with software based on unix time, for example mongodb)
    whole_millis = ts.microsecond - ts.microsecond % 1000
    ts = ts.replace(microsecond=whole_millis)

    if ts.tzinfo:
        return ts

    # The timezone is optional in iso8601, so a timestamp may arrive without
    # one; fall back to UTC.
    logger.warning("timestamp without timezone found, using UTC: {}".format(ts))
    return ts.replace(tzinfo=timezone.utc)
tender.py 文件源码 项目:openprocurement.auctions.dgf 作者: openprocurement 项目源码 文件源码 阅读 15 收藏 0 点赞 0 评论 0
def test_create_auction_auctionPeriod(self):
        """Create an auction and check the periods in the response.

        The response must contain ``tenderPeriod`` and ``auctionPeriod``, the
        latter without ``startDate`` and with ``shouldStartAfter`` on the
        same day as the requested start date. In sandbox mode the requested
        start may be at most 70 seconds after the tender period's end;
        otherwise the tender period must end at 20:00 on the day before the
        requested start.
        """
        data = self.initial_data.copy()
        #tenderPeriod = data.pop('tenderPeriod')
        #data['auctionPeriod'] = {'startDate': tenderPeriod['endDate']}
        response = self.app.post_json('/auctions', {'data': data})
        self.assertEqual(response.status, '201 Created')
        self.assertEqual(response.content_type, 'application/json')

        auction = response.json['data']
        for field in ('tenderPeriod', 'auctionPeriod'):
            self.assertIn(field, auction)
        self.assertNotIn('startDate', auction['auctionPeriod'])

        requested_day = parse_date(data['auctionPeriod']['startDate']).date()
        should_start_day = parse_date(
            auction['auctionPeriod']['shouldStartAfter'], TZ).date()
        self.assertEqual(requested_day, should_start_day)

        if SANDBOX_MODE:
            start = parse_date(data['auctionPeriod']['startDate'], None)
            if not start.tzinfo:
                start = TZ.localize(start)
            end = parse_date(auction['tenderPeriod']['endDate'], None)
            if not end.tzinfo:
                end = TZ.localize(end)
            self.assertLessEqual((start - end).total_seconds(), 70)
        else:
            tender_end = parse_date(auction['tenderPeriod']['endDate'])
            start_day = parse_date(data['auctionPeriod']['startDate'], TZ).date()
            self.assertEqual(tender_end.date(), start_day - timedelta(days=1))
            self.assertEqual(tender_end.time(), time(20, 0))
test_db_api.py 文件源码 项目:Trusted-Platform-Module-nova 作者: BU-NU-CLOUD-SP16 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_instance_get_all_by_filters_changes_since(self):
        """Exercise the ``changes-since`` filter of ``instance_get_all_by_filters``.

        Two instances are created one second apart. Filtering from the
        earlier instant must return both, filtering from the later instant
        only the second one - also after the first instance was destroyed and
        is used as the ``marker``.
        """
        def fetch(filters):
            return db.instance_get_all_by_filters(self.ctxt, filters)

        older = self.create_instance_with_args(
            updated_at='2013-12-05T15:03:25.000000')
        newer = self.create_instance_with_args(
            updated_at='2013-12-05T15:03:26.000000')

        changes_since = iso8601.parse_date('2013-12-05T15:03:25.000000')
        self._assertEqualListsOfInstances(
            [older, newer], fetch({'changes-since': changes_since}))

        changes_since = iso8601.parse_date('2013-12-05T15:03:26.000000')
        self._assertEqualListsOfInstances(
            [newer], fetch({'changes-since': changes_since}))

        db.instance_destroy(self.ctxt, older['uuid'])
        filters = {
            'changes-since': changes_since,
            'marker': older['uuid'],
        }
        self._assertEqualListsOfInstances([newer], fetch(filters))
test_iso8601.py 文件源码 项目:deb-subunit 作者: openstack 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_parse_no_timezone():
    """issue 4 - Handle datetime string without timezone

    Parsing a date with no timezone is not strictly correct but quite
    common. The expectation checked here is that every date/time field is
    kept and the timezone comes back as UTC.
    """
    parsed = iso8601.parse_date("2007-01-01T08:00:00")
    fields = (
        parsed.year, parsed.month, parsed.day,
        parsed.hour, parsed.minute, parsed.second, parsed.microsecond,
    )
    assert fields == (2007, 1, 1, 8, 0, 0, 0)
    assert parsed.tzinfo == iso8601.UTC
logentries.py 文件源码 项目:log-tick 作者: xxv 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def load_logs(self, start, end, log_filter):
        """Download log lines and extract ``(timestamp, path)`` events.

        The URL comes from ``self.get_url(start, end, log_filter)``. Every
        line that contains an ISO 8601 timestamp (microseconds and UTC offset
        included) followed by ``path="..."`` produces one event; the path is
        passed through ``self.filter_path`` first. Other lines are ignored.

        :return: list of ``(datetime, path)`` tuples; an empty list if the
                 URL could not be opened (the error is printed)
        """
        try:
            response = urllib.request.urlopen(self.get_url(start, end, log_filter))
        except urllib.error.URLError as err:
            print("Error reading from network: %s" % err)
            return []
        # Close the HTTP response once the body has been read.
        with response:
            body = response.read().decode('utf-8')

        # The fraction separator is matched as a literal dot; an unescaped
        # '.' would accept any character there and hand an unparsable
        # string to iso8601.parse_date. Compiled once, outside the loop.
        line_pattern = re.compile(
            r'.*?(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}'
            r'\.\d{6}[-+]\d{2}:\d{2}).*path="([^"]+)".*')

        events = []
        for line in body.split('\n'):
            match = line_pattern.match(line)
            if match:
                raw_when, raw_path = match.groups()
                path = self.filter_path(raw_path)
                when = iso8601.parse_date(raw_when)
                events.append((when, path))

        return events
models.py 文件源码 项目:mirandum 作者: google 项目源码 文件源码 阅读 15 收藏 0 点赞 0 评论 0
def as_dict(self):
        """Build a display dict from the JSON stored in ``self.details``.

        The donor name falls back to ``"Anonymous Donor"`` when ``donorName``
        is missing or empty, the comment falls back to an empty string, and
        the currency is always ``'USD'``.
        """
        details = json.loads(self.details)
        donor = details.get('donorName') or "Anonymous Donor"
        created_on = iso8601.parse_date(details['createdOn'])
        return {
            # general
            'name': donor,
            'comment': details.get('message', "") or '',
            'donation_amount': float(details['donationAmount']),
            'currency': 'USD',
            # Display-friendly
            'amount': "$%.2f" % details['donationAmount'],
            'timestamp': created_on,
        }
models.py 文件源码 项目:mirandum 作者: google 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def as_dict(self):
        """Build a display dict from the JSON stored in ``self.details``.

        The name is taken from ``user.displayName`` if a ``user`` entry
        exists, otherwise from ``username``, otherwise it is ``"Anonymous"``.
        The display amount is ``"<amount> <currencyCode>"``.
        """
        details = json.loads(self.details)
        currency = details['currencyCode']
        display_amount = " ".join([str(details['amount']), currency])

        if 'user' in details:
            donor = details['user']['displayName']
        elif 'username' in details:
            donor = details['username']
        else:
            donor = "Anonymous"

        when = iso8601.parse_date(details['date'])
        return {
            'name': donor,
            'amount': display_amount,
            'comment': details['note'],
            'donation_amount': float(details['amount']),
            'currency': details['currencyCode'],
            'timestamp': when,
        }
validation.py 文件源码 项目:sndlatr 作者: Schibum 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def parse_datetime(date):
    """
    Validates date is in iso8601 format. Returns parsed datetime in UTC as a
    naive datetime (tzinfo=None).

    Raises ``Invalid`` if ``date`` is not a string or cannot be parsed.
    """
    if not isinstance(date, basestring):
        raise Invalid('date is not a string')
    try:
        parsed = iso8601.parse_date(date)
        return parsed.astimezone(iso8601.UTC).replace(tzinfo=None)
    except Exception:
        # ``Exception`` rather than a bare ``except`` so that
        # KeyboardInterrupt/SystemExit are not turned into ``Invalid``.
        raise Invalid('date is not in iso8601 format')
test_billing.py 文件源码 项目:dj-paypal 作者: HearthSim 项目源码 文件源码 阅读 15 收藏 0 点赞 0 评论 0
def test_sync_all_active_plans():
    """Sync the active billing plans fixture and compare the stored rows.

    After syncing, the number of ``BillingPlan`` rows must equal the number
    of plans in the fixture. Each plan's detail fixture is then compared
    field by field with the stored plan, and every payment definition in it
    must exist and be linked to that plan exactly once.
    """
    listing = get_fixture("rest.billingplan.all.active.json")
    plan_summaries = listing["plans"]
    models.BillingPlan.objects.sync_data(plan_summaries)

    assert models.BillingPlan.objects.count() == len(listing["plans"])

    for summary in plan_summaries:
        fixture_name = "GET/v1/payments/billing-plans/{id}.json".format(
            id=summary["id"])
        expected = get_fixture(fixture_name)
        stored = models.BillingPlan.objects.get(id=expected["id"])

        assert stored.id == expected["id"]
        assert stored.state == getattr(enums.BillingPlanState, expected["state"])
        assert stored.type == getattr(enums.BillingPlanType, expected["type"])
        assert stored.name == expected["name"]
        assert stored.description == expected["description"]
        assert stored.merchant_preferences == expected["merchant_preferences"]
        assert stored.create_time == parse_date(expected["create_time"])
        assert stored.update_time == parse_date(expected["update_time"])

        for definition in expected["payment_definitions"]:
            stored_def = models.PaymentDefinition.objects.get(id=definition["id"])
            assert stored_def.id == definition["id"]
            assert stored_def.name == definition["name"]
            assert stored_def.type == getattr(
                enums.PaymentDefinitionType, definition["type"])
            linked = stored.payment_definitions.filter(id=stored_def.id)
            assert linked.count() == 1


问题


面经


文章

微信
公众号

扫码关注公众号