python类get()的实例源码

models.py 文件源码 项目:sentry-plugins 作者: getsentry 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def for_request(self, request, body=None):
        """Resolve the tenant and decoded JWT claims for an incoming request.

        When ``body`` carries an ``oauth_client_id`` the tenant is looked up by
        that primary key and returned together with an empty claims dict.
        Otherwise a JWT is read from the ``signed_request`` query parameter or,
        failing that, from an ``Authorization: JWT <token>`` header.  The
        token's ``iss`` claim, read without signature verification, selects
        the tenant; the token is then decoded again with that tenant's secret.

        Returns a ``(tenant, claims)`` tuple.  Raises ``BadTenantError`` when
        no JWT is present or when decoding fails with ``DecodeError``.
        """
        # NOTE(review): the ``is not None`` checks only matter if this manager's
        # ``get`` can return None; a stock Django manager raises DoesNotExist
        # instead -- confirm which applies here.
        if body and 'oauth_client_id' in body:
            tenant = Tenant.objects.get(pk=body['oauth_client_id'])
            if tenant is not None:
                return tenant, {}

        token = request.GET.get('signed_request')
        if not token:
            auth_header = request.META.get('HTTP_AUTHORIZATION', '')
            if auth_header.startswith('JWT '):
                # Strip the 4-character "JWT " scheme prefix.
                token = auth_header[4:]
            else:
                token = None

        if not token:
            raise BadTenantError('Could not find JWT')

        try:
            # First pass is unverified: it is only used to find the issuer.
            unverified_claims = jwt.decode(token, verify=False)
            tenant = Tenant.objects.get(pk=unverified_claims['iss'])
            if tenant is not None:
                return tenant, jwt.decode(token, tenant.secret)
        except jwt.exceptions.DecodeError:
            pass

        raise BadTenantError('Could not find tenant')
models.py 文件源码 项目:sentry-plugins 作者: getsentry 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def get_event_from_url_params(self, group_id, event_id=None, slug_vars=None):
        """Look up the event addressed by URL parameters.

        With an ``event_id`` the event is fetched by primary key and must
        belong to the group named by ``group_id``; without one, the latest
        event of that group is used.  When ``slug_vars`` is given, its
        ``org_slug`` and ``proj_slug`` entries must match the group's
        organization and project slugs.

        Returns the event after passing it through
        ``_ensure_and_bind_event``, or ``None`` if any lookup or check fails.
        """
        if event_id is None:
            try:
                group = Group.objects.get(pk=int(group_id))
            except (ValueError, Group.DoesNotExist):
                return None
            event = group.get_latest_event()
        else:
            try:
                event = Event.objects.get(pk=int(event_id))
            except (ValueError, Event.DoesNotExist):
                return None
            group = event.group
            # The comparison is textual: group_id is used as given, not as int.
            if six.text_type(group.id) != group_id:
                return None

        event = self._ensure_and_bind_event(event)
        if event is None:
            return None

        if slug_vars is not None:
            org_matches = slug_vars['org_slug'] == group.organization.slug
            if not org_matches or slug_vars['proj_slug'] != group.project.slug:
                return None

        return event
models.py 文件源码 项目:django-powerpages 作者: Open-E-WEB 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def url_by_alias(page_alias):
        """
        Return the URL stored for ``page_alias``, or None.

        The alias-to-URL mapping is read from the cache under
        ``cachekeys.URL_LIST_CACHE``; when that entry is absent the mapping is
        rebuilt through ``PageURLCache.refresh()``.  A falsy alias, or an alias
        missing from the mapping, yields None so that callers can decide which
        exception (if any) to raise.
        """
        if not page_alias:
            return None

        alias_map = cache.get(cachekeys.URL_LIST_CACHE)
        if alias_map is None:
            alias_map = PageURLCache.refresh()
        return alias_map.get(page_alias)


# Models:
views.py 文件源码 项目:sensu_drive 作者: ilavender 项目源码 文件源码 阅读 49 收藏 0 点赞 0 评论 0
def clients(request):
    """Render the clients page from every ``client_*`` cache entry.

    Each matching cache key is stripped of its ``client_`` prefix and mapped
    to the cached value.  The template also receives a ``ContactForm`` bound
    to the requesting user's ``Contact`` row.
    """
    data = {}
    for key in cache.keys("client_*"):
        name = re.sub(r'^client_', '', key)
        data[name] = cache.get(key)

    contact = Contact.objects.get(user=request.user.id)
    context = {'DATA': data, 'profile_form': ContactForm(instance=contact)}
    return render(request, 'isubscribe/clients.html', context)
views.py 文件源码 项目:sensu_drive 作者: ilavender 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def subscriptions(request):
    """Render the subscriptions page from every ``subscription_*`` key in ``r``.

    Keys returned by ``r.keys`` are decoded as UTF-8 and stripped of their
    ``subscription_`` prefix; each maps to the full list stored under that
    key (``lrange 0 -1``).  The template also receives a ``ContactForm``
    bound to the requesting user's ``Contact`` row.
    """
    # NOTE(review): ``r`` is presumably a module-level Redis client -- confirm.
    data = {}
    for raw_key in r.keys("subscription_*"):
        name = re.sub(r'^subscription_', '', str(raw_key.decode('utf-8')))
        data[name] = r.lrange(raw_key, 0, -1)

    contact = Contact.objects.get(user=request.user.id)
    context = {'DATA': data, 'profile_form': ContactForm(instance=contact)}
    return render(request, 'isubscribe/subscriptions.html', context)




#@login_required(login_url=reverse_lazy('login'))
views.py 文件源码 项目:sensu_drive 作者: ilavender 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def user_settings(request):
    """Update the requesting user's contact settings from POSTed form data.

    Binds ``request.POST`` to a ``ContactForm`` for the current user's
    ``Contact`` row.  Responds ``200`` with body ``Done`` when the form
    validates and saves; otherwise responds ``409`` with the form errors
    serialized as JSON.
    """
    logger.debug('settings view triggered by %s' % (request.user.username))

    contact = Contact.objects.get(user=request.user.id)
    form = ContactForm(request.POST, instance=contact)

    # is_valid has to be *called*: the bare bound method is always truthy, so
    # the validation-failure branch could never be taken before.
    if not form.is_valid():
        return HttpResponse(json.dumps(form.errors), status=409)

    try:
        form.save()
    except Exception:
        # A failed save is reported the same way as a validation failure.
        return HttpResponse(json.dumps(form.errors), status=409)
    return HttpResponse('Done', status=200)
views.py 文件源码 项目:sensu_drive 作者: ilavender 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def check_config(request):
    """Return the cached configuration of a check as a JSON response.

    For a POST with a non-empty ``entity`` field the value stored in the
    cache under ``'check_' + entity`` is serialized; in every other case an
    empty JSON object is returned.
    """
    mimetype = 'application/json'
    data = {}

    has_entity = (
        request.method == 'POST'
        and 'entity' in request.POST
        and request.POST['entity'] != ''
    )
    if has_entity:
        entity = request.POST['entity']
        # The two names are otherwise unused, but the unpacking raises
        # ValueError unless entity splits into exactly two parts on ':'.
        client_name, check_name = entity.split(':')
        data = cache.get('check_' + entity)

    return HttpResponse(json.dumps(data), mimetype)



#@login_required(login_url=reverse_lazy('login'))
notify.py 文件源码 项目:sensu_drive 作者: ilavender 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def onduty_members(self):
        """Return the primary keys of the users currently on duty.

        A cached ``OnDuty`` entry is returned as-is.  Otherwise the next
        occurrence of the scheduled events with ``event=0`` is looked up; if
        the current UTC time lies within it, the pks of the event's members
        are collected and cached for ``settings.ON_DUTY_CACHE_MEMBERS``.
        Any error during the lookup is logged and whatever was collected so
        far (possibly an empty list) is returned.
        """
        if 'OnDuty' in cache.keys('OnDuty'):
            return cache.get('OnDuty')

        members = []
        try:
            duty_events = ScheduledEvent.objects.filter(event=0)
            occurrences = ScheduledOccurrence.objects.filter(event__in=duty_events)
            event_start, event_end, instance = occurrences.next_occurrence()
            now_ts = datetime.datetime.now(datetime.timezone.utc).timestamp()
            if event_start.timestamp() <= now_ts <= event_end.timestamp():
                for user in instance.event.members_list():
                    members.append(user.pk)
                logger.debug('onduty_members found: %s' % members)
                cache.set('OnDuty', members, timeout=settings.ON_DUTY_CACHE_MEMBERS)
            else:
                logger.debug('onduty_members can not find onduty_members')
        except:
            logger.error('onduty_members failed finding onduty_members')

        return members
notify.py 文件源码 项目:sensu_drive 作者: ilavender 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def user_dnd(self, user_pk):
        """Return True when a do-not-disturb flag applies to ``user_pk`` now.

        A ``DnD_<user_pk>`` key already present in the cache counts as True.
        Otherwise the next occurrence of the scheduled events with ``event=1``
        that include the user is looked up; when the current UTC time lies
        within it the result is True and is cached until that occurrence
        ends.  Any error during the lookup yields False.
        """
        cache_key = 'DnD_' + str(user_pk)
        if cache_key in cache.keys("DnD_*"):
            return True

        DnD = False
        try:
            event_start, event_end, instance = ScheduledOccurrence.objects.filter(event__in=ScheduledEvent.objects.filter(event=1, members__in=[user_pk])).next_occurrence()
            NOW = datetime.datetime.now(datetime.timezone.utc).timestamp()
            if event_start.timestamp() <= NOW <= event_end.timestamp():
                DnD = True
                # The cache timeout is a duration in seconds, not an absolute
                # epoch time: passing event_end.timestamp() directly kept the
                # flag alive for decades.  Cache only for the time remaining,
                # with a 1-second floor so the entry is never set with 0.
                remaining = event_end.timestamp() - NOW
                cache.set(cache_key, DnD, timeout=max(int(remaining), 1))
        except Exception:
            # Best effort: presumably also reached when there is no upcoming
            # occurrence to unpack -- treated as "not in DnD".
            pass

        return DnD
tests.py 文件源码 项目:django-lrucache-backend 作者: kogan 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def test_binary_string(self):
        # Binary strings should be cacheable
        cache = self.cache
        from zlib import compress, decompress
        value = 'value_to_be_compressed'
        compressed_value = compress(value.encode())

        # Test set
        cache.set('binary1', compressed_value)
        compressed_result = cache.get('binary1')
        self.assertEqual(compressed_value, compressed_result)
        self.assertEqual(value, decompress(compressed_result).decode())

        # Test add
        cache.add('binary1-add', compressed_value)
        compressed_result = cache.get('binary1-add')
        self.assertEqual(compressed_value, compressed_result)
        self.assertEqual(value, decompress(compressed_result).decode())

        # Test set_many
        cache.set_many({'binary1-set_many': compressed_value})
        compressed_result = cache.get('binary1-set_many')
        self.assertEqual(compressed_value, compressed_result)
        self.assertEqual(value, decompress(compressed_result).decode())
tests.py 文件源码 项目:django-lrucache-backend 作者: kogan 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_forever_timeout(self):
        """
        Passing in None into timeout results in a value that is cached forever
        """
        cache = self.cache
        cache.set('key1', 'eggs', None)
        self.assertEqual(cache.get('key1'), 'eggs')

        cache.add('key2', 'ham', None)
        self.assertEqual(cache.get('key2'), 'ham')
        added = cache.add('key1', 'new eggs', None)
        self.assertIs(added, False)
        self.assertEqual(cache.get('key1'), 'eggs')

        cache.set_many({'key3': 'sausage', 'key4': 'lobster bisque'}, None)
        self.assertEqual(cache.get('key3'), 'sausage')
        self.assertEqual(cache.get('key4'), 'lobster bisque')
test_views.py 文件源码 项目:tecken 作者: mozilla-services 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_client_task_tester(client, clear_redis_store):
    """Walk the task_tester view through its GET / POST / GET sequence.

    ``sample_task.delay`` is patched with a function that writes to the
    cache directly, so no task worker is needed.
    """
    url = reverse('task_tester')

    def run_inline(key, value, expires):
        cache.set(key, value, expires)

    def check(response, status, fragment):
        assert response.status_code == status
        assert fragment in response.content

    with mock.patch('tecken.views.sample_task.delay', new=run_inline):
        # Nothing has been posted yet.
        check(client.get(url), 400, b'Make a POST request to this URL first')
        check(client.post(url), 201, b'Now make a GET request to this URL')
        check(client.get(url), 200, b'It works!')
views.py 文件源码 项目:tecken 作者: mozilla-services 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def task_tester(request):
    """Exercise the task queue through the cache.

    POST stores ``marco -> ping`` in the cache and queues ``sample_task``
    with ``('marco', 'polo', 10)``, answering 201.  Any other method answers
    400 if ``marco`` is not cached, otherwise polls the cache once per second
    for the value ``polo`` and answers 200 when it appears or 500 when the
    polling gives up.
    """
    if request.method != 'POST':
        if not cache.get('marco'):
            return http.HttpResponseBadRequest(
                'Make a POST request to this URL first\n'
            )

        # NOTE(review): this polls 3 times (about 3 seconds), although the
        # failure message below says 4 -- confirm which one is intended.
        attempts_left = 3
        while attempts_left:
            if cache.get('marco') == 'polo':
                return http.HttpResponse('It works!\n')
            time.sleep(1)
            attempts_left -= 1

        return http.HttpResponseServerError(
            'Tried 4 times (4 seconds) and no luck :(\n'
        )

    cache.set('marco', 'ping', 100)
    sample_task.delay('marco', 'polo', 10)
    return http.HttpResponse(
        'Now make a GET request to this URL\n',
        status=201,
    )
models.py 文件源码 项目:zing 作者: evernote 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def get_canonical(cls, language_code):
        """Returns the canonical `Language` object matching `language_code`.

        The code is matched case-insensitively.  If that fails and the code
        contains a ``-`` (or, failing that, a ``_``), a second lookup is made
        with that separator swapped for the other one.

        If no language can be matched, `None` will be returned.

        :param language_code: the code of the language to retrieve.
        """
        try:
            return cls.objects.get(code__iexact=language_code)
        except cls.DoesNotExist:
            pass

        if "-" in language_code:
            alternate_code = language_code.replace("-", "_")
        elif "_" in language_code:
            alternate_code = language_code.replace("_", "-")
        else:
            # No separator to swap: a retry would only repeat the query that
            # has just failed.
            return None

        try:
            return cls.objects.get(code__iexact=alternate_code)
        except cls.DoesNotExist:
            return None
models.py 文件源码 项目:zing 作者: evernote 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def validate_email(self):
        """Ensure emails are unique across the models tracking emails.

        Since it's essential to keep email addresses unique to support our
        workflows, a `ValidationError` will be raised if the email trying
        to be saved is already assigned to some other user.
        """
        lookup = Q(email__iexact=self.email)
        if self.pk is not None:
            # When there's an update, ensure no one else has this address
            lookup &= ~Q(user=self)

        # exists() rather than get(): get() raises MultipleObjectsReturned
        # when several rows already share the address, which would surface as
        # an unhandled error instead of the intended ValidationError.
        if EmailAddress.objects.filter(lookup).exists():
            raise ValidationError({
                'email': [_('This email address already exists.')]
            })
views.py 文件源码 项目:valhalla 作者: LCOGT 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def get(self, request):
        """Return per-day telescope availability for the requested window.

        The window comes from ``get_start_end_paramters`` (defaulting to one
        day back); a ``ValueError`` from it becomes a 400 response.  The
        ``site`` and ``telescope`` query parameters are passed on as lists,
        and a truthy ``combine`` parameter merges the result by site and
        class.  Keys of the result are converted to strings for the response.
        If ElasticSearch raises, the response body is ``'ConnectionError'``.
        """
        try:
            start, end = get_start_end_paramters(request, default_days_back=1)
        except ValueError as e:
            return HttpResponseBadRequest(str(e))

        params = request.query_params
        combine = params.get('combine')
        sites = params.getlist('site')
        telescopes = params.getlist('telescope')

        try:
            availability = get_telescope_availability_per_day(
                start, end, sites=sites, telescopes=telescopes
            )
        except ElasticSearchException:
            logger.warning('Error connecting to ElasticSearch. Is SBA reachable?')
            return Response('ConnectionError')

        if combine:
            availability = combine_telescope_availabilities_by_site_and_class(availability)

        return Response({str(key): value for key, value in availability.items()})
models.py 文件源码 项目:valhalla 作者: LCOGT 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def archive_bearer_token(self):
        """Return an access token string for the 'Archive' OAuth application.

        The last unexpired ``AccessToken`` of ``self.user`` for that
        application is reused; if there is none, a new one with a random hex
        token and a 30-day expiry is saved and returned.  Returns an empty
        string (and logs an error) when the application does not exist.
        """
        # During testing, you will probably have to copy access tokens from prod for this to work
        try:
            app = Application.objects.get(name='Archive')
        except Application.DoesNotExist:
            logger.error('Archive application not found. Oauth applications need to be populated.')
            return ''

        unexpired = AccessToken.objects.filter(
            user=self.user, application=app, expires__gt=timezone.now()
        )
        access_token = unexpired.last()
        if access_token:
            return access_token.token

        access_token = AccessToken(
            user=self.user,
            application=app,
            token=uuid.uuid4().hex,
            expires=timezone.now() + timedelta(days=30)
        )
        access_token.save()
        return access_token.token
rise_set_utils.py 文件源码 项目:valhalla 作者: LCOGT 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def get_rise_set_intervals(request_dict, site=''):
    """Return the rise/set intervals of a request as a list of tuples.

    ``site`` falls back to the request's ``location['site']`` when empty.
    Telescopes are selected through ``configdb`` by the first molecule's
    instrument name and the location's site, observatory and telescope; with
    no matching telescope an empty list is returned.  Otherwise the per-site
    intervals are mapped onto those telescopes, downtime is filtered out, and
    the union of what remains is returned via ``toTupleList()``.
    """
    if not site:
        site = request_dict['location'].get('site', '')

    instrument_name = request_dict['molecules'][0]['instrument_name']
    location = request_dict['location']
    telescope_details = configdb.get_telescopes_with_instrument_type_and_location(
        instrument_name,
        site,
        location.get('observatory', ''),
        location.get('telescope', '')
    )
    if not telescope_details:
        return []

    by_site = get_rise_set_intervals_by_site(request_dict)
    by_telescope = intervals_by_site_to_intervalsets_by_telescope(by_site, telescope_details.keys())
    without_downtime = filter_out_downtime_from_intervalsets(by_telescope)
    combined = Intervals().union(without_downtime.values())
    return combined.toTupleList()
telescope_states.py 文件源码 项目:valhalla 作者: LCOGT 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def __init__(self, start, end, telescopes=None, sites=None, instrument_types=None):
        """Set up the ElasticSearch client, time window and event data.

        ``start`` and ``end`` are stored with their tzinfo replaced by UTC and
        microseconds zeroed.  Missing ``sites`` default to every site of the
        available telescopes; missing ``telescopes`` default to every
        telescope at those sites.  Event data is taken from the
        ``tel_event_data`` cache entry when present, otherwise fetched and
        cached for 1800 seconds.

        Raises ``ImproperlyConfigured`` when the client cannot be created
        because of a ``LocationValueError``.
        """
        try:
            self.es = Elasticsearch([settings.ELASTICSEARCH_URL])
        except LocationValueError:
            logger.error('Could not find host. Make sure ELASTICSEARCH_URL is set.')
            raise ImproperlyConfigured('ELASTICSEARCH_URL')

        self.instrument_types = instrument_types
        self.available_telescopes = self._get_available_telescopes()

        if not sites:
            sites = list({tk.site for tk in self.available_telescopes})
        if not telescopes:
            telescopes = list(
                {tk.telescope for tk in self.available_telescopes if tk.site in sites}
            )

        self.start = start.replace(tzinfo=timezone.utc, microsecond=0)
        self.end = end.replace(tzinfo=timezone.utc, microsecond=0)

        # NOTE(review): the cache key does not vary with sites/telescopes, so
        # a cached result is reused whatever was asked for -- confirm intended.
        cached = cache.get('tel_event_data')
        if not cached:
            self.event_data = self._get_es_data(sites, telescopes)
            cache.set('tel_event_data', self.event_data, 1800)
        else:
            self.event_data = cached
telescope_states.py 文件源码 项目:valhalla 作者: LCOGT 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def get(self):
        """Fold ``self.event_data`` into telescope states and return them.

        Events whose telescope is not in ``self.available_telescopes`` are
        skipped.  Consecutive events that belong together are merged into one
        "lump"; when an event does not belong to the current lump, the lump is
        recorded via ``_update_states`` and a new lump is started from that
        event, but only if the lump's end is not before ``self.start``.  A
        lump still open after the last event is recorded as well.
        """
        telescope_states = {}
        lump = dict(reasons=None, types=None, start=None)

        for event in self.event_data:
            source = event['_source']
            if self._telescope(source) not in self.available_telescopes:
                continue

            if lump['start'] is None:
                # First usable event: it opens the first lump.
                lump = self._set_lump(event)
            elif self._belongs_in_lump(source, lump):
                lump = self._update_lump(lump, event)
            else:
                lump_end = self._lump_end(lump, source)
                if lump_end >= self.start:
                    telescope_states = self._update_states(telescope_states, lump, lump_end)
                    lump = self._set_lump(event)

        if lump['start']:
            closing_end = self._lump_end(lump)
            telescope_states = self._update_states(telescope_states, lump, closing_end)

        return telescope_states
dump_mcqs.py 文件源码 项目:malvo 作者: shivamMg 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def set_mcqs_in_cache():
    """
    Populate the cache with MCQs unless ``mcqs_flag`` says they are current.

    When the flag is missing or falsy, the MCQs of each language code are
    extracted and stored under that language's cache key, and the flag is
    then set to True.
    """
    if cache.get('mcqs_flag', False):
        return

    cache_key_by_language = dict(C='c_mcqs', J='java_mcqs')
    for lang_code in cache_key_by_language:
        cache.set(cache_key_by_language[lang_code], extract_mcqs(lang_code))

    # Mark MCQs as unchanged
    cache.set('mcqs_flag', True)
views.py 文件源码 项目:malvo 作者: shivamMg 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _get_question_statuses(team):
    """
    Map each question number to the team's status for that question.

    Covers the questions whose language equals ``team.lang_pref``.  The
    status is:
        'S': the team has an MCQ answer for the question (Solved)
        'U': it has none (Unattempted)
    """
    def status_for(question_no):
        try:
            team.teammcqanswer_set.get(question_no=question_no)
        except TeamMcqAnswer.DoesNotExist:
            return 'U'
        return 'S'

    questions = Question.objects.filter(language=team.lang_pref)
    return {ques.question_no: status_for(ques.question_no) for ques in questions}
base.py 文件源码 项目:django-wechat-base 作者: ChanMo 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def get_token(self):
        """Return the wechat access token, using the cache when possible.

        On a cache miss the token is requested with the client credentials
        (``self.appid`` / ``self.appsecret``) and cached under
        ``wx_access_token`` for the ``expires_in`` seconds of the reply.
        """
        cached_token = cache.get('wx_access_token')
        if cached_token:
            return cached_token

        query = dict(
            grant_type='client_credential',
            appid=self.appid,
            secret=self.appsecret,
        )
        reply = self.get_data(self.get_url('token', query))
        cache.set('wx_access_token', reply['access_token'], int(reply['expires_in']))
        return reply['access_token']
api.py 文件源码 项目:django-wechat-base 作者: ChanMo 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def get_token(self):
        """Return the wechat access token, using the cache when possible.

        On a cache miss the token is requested with the client credentials
        (``self.appid`` / ``self.appsecret``) and cached under
        ``wx_access_token`` for the ``expires_in`` seconds of the reply.
        """
        cached_token = cache.get('wx_access_token')
        if cached_token:
            return cached_token

        query = dict(
            grant_type='client_credential',
            appid=self.appid,
            secret=self.appsecret,
        )
        reply = self.get_data(self.get_url('token', query))
        cache.set('wx_access_token', reply['access_token'], int(reply['expires_in']))
        return reply['access_token']
views.py 文件源码 项目:endorsementdb.com 作者: endorsementdb 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def search_endorsers(request):
    """Return up to five endorsers matching the ``q`` query parameter as JSON.

    Endorsers whose name starts with the query come first (at most five).
    If fewer than five names start with it, endorsers whose name merely
    contains the query are appended, skipping the ones already listed, until
    five are collected.  Without a query the list is empty.
    """
    query = request.GET.get('q')
    matches = []
    if query:
        # First find the endorsers whose names start with this query.
        prefix_hits = Endorser.objects.filter(name__istartswith=query)
        matches = list(prefix_hits[:5])
        seen_pks = {endorser.pk for endorser in matches}

        if prefix_hits.count() < 5:
            for candidate in Endorser.objects.filter(name__icontains=query):
                if candidate.pk in seen_pks:
                    continue
                matches.append(candidate)
                if len(matches) == 5:
                    break

    payload = [{'pk': endorser.pk, 'name': endorser.name} for endorser in matches]
    return JsonResponse({
        'endorsers': payload,
    })
views.py 文件源码 项目:endorsementdb.com 作者: endorsementdb 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def progress_wikipedia_missing(request, slug):
    """Render the endorsements that have no imported counterpart.

    For ``NEWSPAPER_SLUG`` these are endorsements by endorsers tagged
    'Publication' that have no imported newspaper.  For any other slug they
    are the endorsements of the position mapped from the slug in
    ``SLUG_MAPPING`` whose endorser has no imported endorsement.
    """
    if slug != NEWSPAPER_SLUG:
        position = Position.objects.get(slug=SLUG_MAPPING[slug])
        filters = dict(
            position=position,
            endorser__importedendorsement=None,
        )
    else:
        filters = dict(
            endorser__importednewspaper=None,
            endorser__tags=Tag.objects.get(name='Publication'),
        )
    endorsements = Endorsement.objects.filter(**filters)

    return render(request, 'progress/wikipedia_missing.html', {
        'slug': slug,
        'endorsements': endorsements,
    })
renderer.py 文件源码 项目:feincms3 作者: matthiask 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def render(self, region, context, timeout=None):
        """render(self, region, context, *, timeout=None)
        Render one region with the given context and return the HTML

        The output of every plugin in the region is concatenated and marked
        safe.  With a ``timeout`` the result is looked up in, and stored to,
        the cache under ``self.cache_key(region)``; with ``timeout=None`` the
        cache is not touched.

        .. note::
           You should treat anything except for the ``region`` and ``context``
           argument as keyword-only.
        """
        use_cache = timeout is not None
        if use_cache:
            key = self.cache_key(region)
            cached_html = cache.get(key)
            if cached_html is not None:
                return cached_html

        fragments = (
            self._renderer.render_plugin_in_context(plugin, context)
            for plugin in self._contents[region]
        )
        html = mark_safe(''.join(fragments))

        if use_cache:
            cache.set(key, html, timeout=timeout)
        return html
django_hello.py 文件源码 项目:base_function 作者: Rockyzsu 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def general_image(self, image_format='PNG'):
        """Return a placeholder image as an in-memory byte stream.

        The image has the ``width`` and ``height`` from ``self.cleaned_data``
        and is labelled ``<width>x<height>`` in its centre when the label
        fits.  The encoded stream is cached for one hour under a key built
        from width, height and ``image_format``.

        :param image_format: format name handed to ``Image.save``.
        :returns: a ``BytesIO`` positioned at the start of the image data,
            or whatever the cache returned for the key.
        """
        fm_width = self.cleaned_data['width']
        fm_height = self.cleaned_data['height']

        key = '{}.{}.{}'.format(fm_width, fm_height, image_format)
        content = cache.get(key)
        if content is None:
            image = Image.new('RGB', (fm_width, fm_height), color=122)

            draw = ImageDraw.Draw(image)
            text = '{}x{}'.format(fm_width, fm_height)
            text_width, text_height = draw.textsize(text)

            if text_width < fm_width and text_height < fm_height:
                text_top = (fm_height - text_height) // 2
                text_left = (fm_width - text_width) // 2

                # ImageDraw.text expects (x, y), i.e. (left, top).  The two
                # were swapped before, which misplaced the label on any
                # non-square image.
                draw.text((text_left, text_top), text, fill=(255, 255, 255))

            content = BytesIO()
            image.save(content, image_format)
            content.seek(0)
            cache.set(key, content, 60 * 60)

        return content
tests.py 文件源码 项目:django-redis-sentinel-redux 作者: danigosa 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_setnx(self):
        # we should ensure there is no test_key_nx in redis
        self.cache.delete("test_key_nx")
        res = self.cache.get("test_key_nx", None)
        self.assertEqual(res, None)

        res = self.cache.set("test_key_nx", 1, nx=True)
        self.assertTrue(res)
        # test that second set will have
        res = self.cache.set("test_key_nx", 2, nx=True)
        self.assertFalse(res)
        res = self.cache.get("test_key_nx")
        self.assertEqual(res, 1)

        self.cache.delete("test_key_nx")
        res = self.cache.get("test_key_nx", None)
        self.assertEqual(res, None)
tests.py 文件源码 项目:django-redis-sentinel-redux 作者: danigosa 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_setnx_timeout(self):
        """Timeouts are honoured by nx writes and ignored by refused ones."""
        cache = self.cache
        key = "test_key_nx"

        # test that timeout still works for nx=True
        self.assertTrue(cache.set(key, 1, timeout=2, nx=True))
        time.sleep(3)
        self.assertEqual(cache.get(key, None), None)

        # test that timeout will not affect key, if it was there
        cache.set(key, 1)
        self.assertFalse(cache.set(key, 2, timeout=2, nx=True))
        time.sleep(3)
        self.assertEqual(cache.get(key, None), 1)

        cache.delete(key)
        self.assertEqual(cache.get(key, None), None)


问题


面经


文章

微信
公众号

扫码关注公众号