python类delete()的实例源码

models.py 文件源码 项目:zing 作者: evernote 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def delete(self, *args, **kwargs):
        directory = self.directory

        # Just doing a plain delete will collect all related objects in memory
        # before deleting: translation projects, stores, units, quality checks,
        # suggestions, and submissions.
        # This can easily take down a process. If we do a translation project
        # at a time and force garbage collection, things stay much more
        # managable.
        import gc
        gc.collect()
        for tp in self.translationproject_set.iterator():
            tp.delete()
            gc.collect()

        super(Project, self).delete(*args, **kwargs)

        directory.delete()
models.py 文件源码 项目:zing 作者: evernote 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def invalidate_resources_cache(**kwargs):
    instance = kwargs["instance"]
    if instance.__class__.__name__ not in ['Directory', 'Store']:
        return

    # Don't invalidate if the save didn't create new objects
    no_new_objects = (
        ('created' in kwargs
         and 'raw' in kwargs)
        and (not kwargs['created']
             or kwargs['raw']))

    if no_new_objects and instance.parent.get_children():
        return

    proj_code = split_pootle_path(instance.pootle_path)[1]
    if proj_code is not None:
        cache.delete(make_method_key(Project, 'resources', proj_code))
celery.py 文件源码 项目:fantasie 作者: shawn1m 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def single_instance_task(timeout):
    def task_exc(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            lock_id = "celery-single-instance-" + func.__name__

            def acquire_lock():
                return cache.add(lock_id, "true", timeout)

            def release_lock():
                return cache.delete(lock_id)
            if acquire_lock():
                try:
                    func(*args, **kwargs)
                finally:
                    release_lock()
        return wrapper
    return task_exc
views.py 文件源码 项目:dart 作者: lmco 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def business_area_handler(request, pk):
    if request.method == 'POST':
        data = json.loads(request.body)
        if pk is None:
            # New business area
            BusinessArea.objects.create(name=data['name'])
            return JsonResponse(ReturnStatus(True, 'OK').to_dict())
        else:
            # existing business area update
            try:
                ba = BusinessArea.objects.get(pk=pk)
                ba.name = data['name']
                ba.save()
            except BusinessArea.DoesNotExist:
                return JsonResponse(ReturnStatus(False, 'Key does not exist').to_dict())
            return JsonResponse(ReturnStatus(True, 'OK').to_dict())
    elif request.method == 'DELETE':
        try:
            ba = BusinessArea.objects.get(pk=pk)
            if ba.mission_set.all().count() != 0:
                return JsonResponse(ReturnStatus(False, 'Business Areas can not be deleted while missions are still associated with them.').to_dict())
            ba.delete()
        except BusinessArea.DoesNotExist:
            return JsonResponse(ReturnStatus(False, 'Key does not exist').to_dict())
        return JsonResponse(ReturnStatus(True, 'OK').to_dict())
models.py 文件源码 项目:steemprojects.com 作者: noisy 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def merge(self, profile):
        for account in profile.account_set.all():
            account.profile = self

            if account.user_social_auth:
                account.user_social_auth.user = self.user
                account.user_social_auth.save()

        from package.models import Project

        change = False
        for project in Project.objects.filter(usage=profile.user):
            if self.user not in project.usage.all():
                project.usage.add(self.user)
                change = True

        if change:
            cache.delete("sitewide_used_packages_list_{}".format(self.user.pk))

        # TODO: add merge of verified_by, and email

        profile.user.delete()
        profile.delete()
sfmc.py 文件源码 项目:basket 作者: mozmeao 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def refresh_auth_tokens_from_cache(self):
        """Refresh the auth token and other values from cache"""
        if self.authToken is not None and time() + MAX_BUFFER < self.authTokenExpiration:
            # no need to refresh if the current tokens are still good
            return

        tokens = cache.get(self.token_cache_key)
        if tokens:
            if not isinstance(tokens, dict):
                # something wrong was cached
                cache.delete(self.token_cache_key)
                return

            for prop, value in tokens.items():
                if prop in self.token_property_names:
                    setattr(self, prop, value)

            # set the value so we can detect if it changed later
            self._old_authToken = self.authToken
            self.build_soap_client()
sfmc.py 文件源码 项目:basket 作者: mozmeao 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def delete_row(self, de_name, token=None, email=None):
        """
        Delete a row from a data extension. Either token or email are required.

        @param de_name: name of the data extension
        @param token: user's token
        @param email: user's email address
        @return: None
        """
        assert token or email, 'token or email required'
        if token:
            values = {'TOKEN': token}
        else:
            values = {'EMAIL_ADDRESS_': email}

        row = self._get_row_obj(de_name, values)
        resp = row.delete()
        assert_response(resp)
tasks.py 文件源码 项目:videofront 作者: openfun 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def transcode_video(public_video_id, delete=True):
    """
    Args:
        public_video_id (str)
        delete (bool): delete video on failure
    """
    with Lock('TASK_LOCK_TRANSCODE_VIDEO:' + public_video_id, 3600) as lock:
        if lock.is_acquired:
            try:
                models.invalidate_cache(public_video_id)
                _transcode_video(public_video_id, delete=delete)
            except Exception as e:
                # Store error message
                message = "\n".join([str(arg) for arg in e.args])
                models.ProcessingState.objects.filter(
                    video__public_id=public_video_id
                ).update(
                    status=models.ProcessingState.STATUS_FAILED,
                    message=message,
                )
                raise
            finally:
                models.invalidate_cache(public_video_id)
views.py 文件源码 项目:ecs 作者: ecs-org 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def download_once(request, ref_key=None):
    cache_key = 'document-ref-{}'.format(ref_key)
    doc_id = cache.get(cache_key)

    if not doc_id:
        raise Http404()

    cache.delete(cache_key)

    doc = get_object_or_404(Document, pk=doc_id)
    response = FileResponse(doc.retrieve(request.user, 'view'),
        content_type=doc.mimetype)

    # Disable browser caching, so the PDF won't end up on the users hard disk.
    response['Cache-Control'] = 'no-cache, no-store, must-revalidate'
    response['Pragma'] = 'no-cache'
    response['Expires'] = '0'
    response['Vary'] = '*'
    return response
node.py 文件源码 项目:esdc-ce 作者: erigones 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def all_nictags(cls, clear_cache=False):
        """Return dictionary with of nictags {name:type}"""
        if clear_cache:
            cache.delete(NICTAGS_ALL_KEY)

        nictags = cache.get(NICTAGS_ALL_KEY)

        if not nictags:
            nodes = cls.objects.all()
            nictags = {}

            for node in nodes:
                for nic in node.nictags:
                    nic_name = nic['name']
                    nic_type = nic['type'].replace('aggr', 'normal')
                    # if nictag name is already present and types are not equal
                    if nic_name in nictags and nic_type != nictags[nic_name]:
                        raise ValueError('Duplicate NIC tag name with different type exists on another compute node!')

                    nictags[nic_name] = nic_type

            cache.set(NICTAGS_ALL_KEY, nictags, NICTAGS_ALL_EXPIRES)

        return nictags
userHandler.py 文件源码 项目:sengladmin 作者: yufajieluo 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def register(self, account, password, fullname, phone, captcha):
        if not account or not password or not fullname or not phone or not captcha:
            rsp = self.rsp_handler.generate_rsp_msg(29001, None)
            return rsp
        user = User.objects(account = account)
        if user:
            rsp = self.rsp_handler.generate_rsp_msg(21001, None)
            return rsp
        key = 'register:captcha:%s' % account
        if cache.get(key) != captcha:
            rsp = self.rsp_handler.generate_rsp_msg(21002, None)
            return rsp
        cache.delete(key)    

        now_time = time.strftime('%Y-%m-%d %H:%M:%S')
        user = User(account = account, password = self.crypt_handler.encrypt(password), username = fullname, phone = phone, create_time = now_time)
        user.save()

        rsp = self.rsp_handler.generate_rsp_msg(200, None)
        return rsp
userHandler.py 文件源码 项目:sengladmin 作者: yufajieluo 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def remove_permission(self, name):
        if not name:
            rsp = self.rsp_handler.generate_rsp_msg(29001, None)
            return rsp
        permissions = Permission.objects(name = name)
        if not permissions:
            rsp = self.rsp_handler.generate_rsp_msg(22002, None)
            return rsp
        permission = permissions[0]
        permission.delete()

        roles = Role.objects()
        for role in roles:
            if name in role.permissions:
                role.permissions.remove(name)
                role.save()
        rsp = self.rsp_handler.generate_rsp_msg(200, None)
        return rsp
access.py 文件源码 项目:c3nav 作者: c3nav 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def redeem(self, user=None):
        if (user is None and self.redeemed) or self.accesspermissions.exists():
            raise self.RedeemError('Already redeemed.')

        if timezone.now() > self.valid_until + timedelta(minutes=5 if self.redeemed else 0):
            raise self.RedeemError('No longer valid.')

        if user:
            with transaction.atomic():
                if self.author_id and self.unique_key:
                    AccessPermission.objects.filter(author_id=self.author_id, unique_key=self.unique_key).delete()
                for restriction in self.restrictions:
                    AccessPermission.objects.create(
                        user=user,
                        access_restriction_id=restriction.pk,
                        author_id=self.author_id,
                        expire_date=restriction.expire_date,
                        can_grant=self.can_grant,
                        unique_key=self.unique_key,
                        token=self if self.pk else None,
                    )

        if self.pk and not self.unlimited:
            self.redeemed = True
            self.save()
models.py 文件源码 项目:wger-lycan-clan 作者: andela 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def save(self, *args, **kwargs):
        '''
        Reset all cached infos
        '''
        self.name = smart_capitalize(self.name_original)
        super(Exercise, self).save(*args, **kwargs)

        # Cached objects
        cache.delete(cache_mapper.get_exercise_muscle_bg_key(self))

        # Cached template fragments
        for language in Language.objects.all():
            delete_template_fragment_cache('muscle-overview', language.id)
            delete_template_fragment_cache('exercise-overview', language.id)
            delete_template_fragment_cache('exercise-overview-mobile', language.id)
            delete_template_fragment_cache('equipment-overview', language.id)

        # Cached workouts
        for set in self.set_set.all():
            reset_workout_canonical_form(set.exerciseday.training_id)
models.py 文件源码 项目:wger-lycan-clan 作者: andela 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def delete(self, *args, **kwargs):
        '''
        Reset all cached infos
        '''

        # Cached objects
        cache.delete(cache_mapper.get_exercise_muscle_bg_key(self))

        # Cached template fragments
        for language in Language.objects.all():
            delete_template_fragment_cache('muscle-overview', language.id)
            delete_template_fragment_cache('exercise-overview', language.id)
            delete_template_fragment_cache('exercise-overview-mobile', language.id)
            delete_template_fragment_cache('equipment-overview', language.id)

        # Cached workouts
        for set in self.set_set.all():
            reset_workout_canonical_form(set.exerciseday.training.pk)

        super(Exercise, self).delete(*args, **kwargs)
models.py 文件源码 项目:wger-lycan-clan 作者: andela 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def delete(self, *args, **kwargs):
        '''
        Reset all cached infos
        '''
        super(ExerciseImage, self).delete(*args, **kwargs)

        for language in Language.objects.all():
            delete_template_fragment_cache('muscle-overview', language.id)
            delete_template_fragment_cache('exercise-overview', language.id)
            delete_template_fragment_cache('exercise-overview-mobile', language.id)
            delete_template_fragment_cache('equipment-overview', language.id)

        # Make sure there is always a main image
        if not ExerciseImage.objects.accepted() \
                .filter(exercise=self.exercise, is_main=True).count() \
           and ExerciseImage.objects.accepted() \
                .filter(exercise=self.exercise) \
                .filter(is_main=False) \
                .count():

                image = ExerciseImage.objects.accepted() \
                    .filter(exercise=self.exercise, is_main=False)[0]
                image.is_main = True
                image.save()
models.py 文件源码 项目:talkativot 作者: lablup 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def remove_obj_perms_connected_with_user(sender, instance, **kwargs):
    """ Remove user's permissions upon user deletion (``pre_delete.connect``).

    :param sender: sender object
    :param instance: user instance
    :param kwargs: dictionary argument
    :return: None
    """
    filters = Q(content_type=ContentType.objects.get_for_model(instance),
                object_pk=instance.pk)
    UserObjectPermission.objects.filter(filters).delete()
    GroupObjectPermission.objects.filter(filters).delete()

    if instance.profile:
        instance.profile.delete()
    if instance.preference:
        for i in instance.preference.all():
            i.delete()
models.py 文件源码 项目:django-powerpages 作者: Open-E-WEB 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def page_changed(sender, **kwargs):
    """
    post_save receiver for Page model:
    * clears Page-related cache keys,
    * refresh mappings: alias <-> page real url,
    * clears cache keys related to PageChanges.
    """
    page = kwargs['instance']
    cache_key = cachekeys.template_source(page.pk)
    cache.delete(cache_key)
    PageURLCache.refresh()
tasks.py 文件源码 项目:sensu_drive 作者: ilavender 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def sensu_client_list():

    API_URL = settings.SENSU_API_URL + '/clients'
    userAndPass = base64.b64encode(str.encode("%s:%s" % (settings.SENSU_API_USER, settings.SENSU_API_PASSWORD))).decode("ascii")
    headers = { 'X_REQUESTED_WITH' :'XMLHttpRequest',
               'Accept': 'application/json, text/javascript, */*; q=0.01',
               'Authorization' : 'Basic %s' %  userAndPass }

    try:

        request = http.request('GET', API_URL, None, headers, preload_content=False)        
        response = request.status

        if response == 200:
            reader = codecs.getreader('utf-8')
            data = json.load(reader(request))
            request.release_conn()
        else:
            logger.error('response: %s' % str(response))

    except:
        logger.error("sensu_client_list failed")
        raise

    subscriptions = []

    [ r.delete(subscription) for subscription in r.keys("subscription_*") ]
    #[ cache.delete(client) for client in cache.keys("client_*") ]

    for object in data:

        cache.set('client_' + object['name'], object, timeout=settings.CACHE_CLIENT_TTL + 300)

        if 'subscriptions' in object:            
            subscriptions.extend(object['subscriptions'])

            for subscription in object['subscriptions']:                
                logger.debug("sensu_client_list update subscription_%s adding %s" % (subscription, object['name']))
                r.rpush('subscription_' + subscription, object['name'])

    cache.set('subscriptions', list(set(subscriptions)), timeout=settings.CACHE_CLIENT_TTL + 300)
tasks.py 文件源码 项目:sensu_drive 作者: ilavender 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def user_rules(message):
    users_rules = {}
    try:        
        for obj in Rule.objects.filter(owner=message['user_id']):

            if obj.owner.id not in users_rules:
                users_rules[obj.owner.id] = {}

            for status in obj.status:
                if int(status) not in users_rules[obj.owner.id]:
                    users_rules[obj.owner.id][int(status)] = []
                users_rules[obj.owner.id][int(status)].append(obj.regex_string)            

    except:
        logger.error("user_rules failed")
        raise

    for user_id in users_rules:
        for status in users_rules[user_id]:
            uni_regex = '|'.join(users_rules[user_id][status])
            logger.debug("user_rules build rule for user: %s status: %s" % (user_id, status))
            r.set('regexrule_%s_%s' % (user_id, status), pickle.dumps(re.compile(r'(?:%s)' % uni_regex, re.IGNORECASE)))
        for status in [1,2]:
            if status not in users_rules[user_id]:
                logger.debug("user_rules clean rule for user: %s status: %s" % (user_id, status))
                r.delete('regexrule_%s_%s' % (user_id, status))
    return
models.py 文件源码 项目:MetaCI 作者: SalesforceFoundation 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def unlock(self):
        if not self.scratch:
            cache.delete(self.lock_id)
tasks.py 文件源码 项目:MetaCI 作者: SalesforceFoundation 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def delete_scratch_orgs():
    reset_database_connection()

    from metaci.cumulusci.models import ScratchOrgInstance
    count = 0
    for org in ScratchOrgInstance.objects.filter(deleted=False,
                                                 delete_error__isnull=False):
        delete_scratch_org.delay(org.id)
        count += 1

    if not count:
        return 'No orgs found to delete'

    return 'Scheduled deletion attempts for {} orgs'.format(count)
tasks.py 文件源码 项目:MetaCI 作者: SalesforceFoundation 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def delete_scratch_org(org_instance_id):
    reset_database_connection()
    from metaci.cumulusci.models import ScratchOrgInstance
    try:
        org = ScratchOrgInstance.objects.get(id=org_instance_id)
    except ScratchOrgInstance.DoesNotExist:
        return 'Failed: could not find ScratchOrgInstance with id {}'.format(
            org_instance_id)

    org.delete_org()
    if org.deleted:
        return 'Deleted org instance #{}'.format(org.id)
    else:
        return 'Failed to delete org instance #{}'.format(org.id)
tests.py 文件源码 项目:django-lrucache-backend 作者: kogan 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_delete(self):
        # Cache keys can be deleted
        cache = self.cache
        cache.set("key1", "spam")
        cache.set("key2", "eggs")
        self.assertEqual(cache.get("key1"), "spam")
        cache.delete("key1")
        self.assertIsNone(cache.get("key1"))
        self.assertEqual(cache.get("key2"), "eggs")
tests.py 文件源码 项目:django-lrucache-backend 作者: kogan 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_unicode(self):
        # Unicode values can be cached
        cache = self.cache
        stuff = {
            'ascii': 'ascii_value',
            'unicode_ascii': 'Iñtërnâtiônàlizætiøn1',
            'Iñtërnâtiônàlizætiøn': 'Iñtërnâtiônàlizætiøn2',
            'ascii2': {'x': 1}
        }
        # Test `set`
        for (key, value) in stuff.items():
            cache.set(key, value)
            self.assertEqual(cache.get(key), value)

        # Test `add`
        for (key, value) in stuff.items():
            cache.delete(key)
            cache.add(key, value)
            self.assertEqual(cache.get(key), value)

        # Test `set_many`
        for (key, value) in stuff.items():
            cache.delete(key)
        cache.set_many(stuff)
        for (key, value) in stuff.items():
            self.assertEqual(cache.get(key), value)
tests.py 文件源码 项目:django-lrucache-backend 作者: kogan 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_cache_versioning_delete(self):
        cache = self.cache
        cache2 = LRUObjectCache('lru2', dict(VERSION=2))
        cache2._cache = cache._cache

        cache.set('answer1', 37, version=1)
        cache.set('answer1', 42, version=2)
        cache.delete('answer1')
        self.assertIsNone(cache.get('answer1', version=1))
        self.assertEqual(cache.get('answer1', version=2), 42)

        cache.set('answer2', 37, version=1)
        cache.set('answer2', 42, version=2)
        cache.delete('answer2', version=2)
        self.assertEqual(cache.get('answer2', version=1), 37)
        self.assertIsNone(cache.get('answer2', version=2))

        cache.set('answer3', 37, version=1)
        cache.set('answer3', 42, version=2)
        cache2.delete('answer3')
        self.assertEqual(cache.get('answer3', version=1), 37)
        self.assertIsNone(cache.get('answer3', version=2))

        cache.set('answer4', 37, version=1)
        cache.set('answer4', 42, version=2)
        cache2.delete('answer4', version=1)
        self.assertIsNone(cache.get('answer4', version=1))
        self.assertEqual(cache.get('answer4', version=2), 42)
permissions.py 文件源码 项目:zing 作者: evernote 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def save(self, *args, **kwargs):
        super(PermissionSet, self).save(*args, **kwargs)
        # FIXME: can we use `post_save` signals or invalidate caches in model
        # managers, please?
        key = iri_to_uri('Permissions:%s' % self.user.username)
        cache.delete(key)
permissions.py 文件源码 项目:zing 作者: evernote 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def delete(self, *args, **kwargs):
        super(PermissionSet, self).delete(*args, **kwargs)
        # FIXME: can we use `post_delete` signals or invalidate caches in model
        # managers, please?
        key = iri_to_uri('Permissions:%s' % self.user.username)
        cache.delete(key)
models.py 文件源码 项目:zing 作者: evernote 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def invalidate_accessible_projects_cache(**kwargs):
    instance = kwargs["instance"]
    # XXX: maybe use custom signals or simple function calls?
    if (instance.__class__.__name__ not in
        ['Project', 'TranslationProject', 'PermissionSet']):
        return

    cache.delete_pattern(make_method_key('Project', 'cached_dict', '*'))
    cache.delete('projects:all')
    cache.delete_pattern('projects:accessible:*')
views.py 文件源码 项目:dart 作者: lmco 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def post(self, request, *args, **kwargs):

        # Delete cached data for the legends
        legend_top_key = make_template_fragment_key('legend_partial_top')
        cache.delete(legend_top_key)
        legend_bottom_key = make_template_fragment_key('legend_partial_bottom')
        cache.delete(legend_bottom_key)

        # Delete cached data for the host format string
        cache.delete('host_output_format_string')

        return super(UpdateDynamicSettingsView, self).post(request, *args, **kwargs)


问题


面经


文章

微信
公众号

扫码关注公众号