python类IntegrityError()的实例源码

handler.py 文件源码 项目:aiowing 作者: embali 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
async def get_current_user(self):
    """Return the session e-mail of an active superuser, or None.

    The e-mail is read from the request session and the matching ``User``
    row is loaded. ``None`` is returned when the session holds no e-mail,
    when the lookup raises one of the errors listed below, or when the
    user is not both ``active`` and ``superuser``.

    Declared ``async`` because the body awaits the session and the query;
    ``await`` is not valid inside a plain ``def``.
    """
    session = await get_session(self.request)
    email = session.get('email')
    if email is None:
        return None

    try:
        user = await settings.manager.get(
            User.select().where(User.email == email))
    except (User.DoesNotExist, psycopg2.OperationalError,
            peewee.IntegrityError, peewee.ProgrammingError):
        # A missing user or a database failure is treated as "not logged in".
        return None

    if not (user.active and user.superuser):
        return None

    return email
test_challenge_set_fielding.py 文件源码 项目:farnsworth 作者: mechaphish 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_create(self):
    """Check fielding sha256 values and that a duplicate fielding fails."""
    round_0 = Round.create(num=0)
    round_1 = Round.create(num=1)
    our_team = Team.create(name=Team.OUR_NAME)
    challenge_set = ChallengeSet.create(name="foo")
    node_1 = ChallengeBinaryNode.create(name="foo_1", cs=challenge_set,
                                        blob="aaa1")
    node_2 = ChallengeBinaryNode.create(name="foo_2", cs=challenge_set,
                                        blob="aaa2")

    # Fielding with a single binary in round 0.
    single = ChallengeSetFielding.create(cs=challenge_set, cbns=[node_1],
                                         team=our_team,
                                         available_round=round_0)
    assert_equals(
        single.sha256,
        "04de190c8dbd04bdb5768118c2cd745c7918f6858eddd765354819fc59c6d46e")

    # Fielding with both binaries in round 1.
    pair = ChallengeSetFielding.create(cs=challenge_set,
                                       cbns=[node_1, node_2],
                                       team=our_team,
                                       available_round=round_1)
    assert_equals(
        pair.sha256,
        "277b0b746f1937a8f54797e2698e54f8646f0413ad353da19d93522c05817e73")

    # Inserting the same team + cs + round a second time must fail.
    assert_raises(IntegrityError, ChallengeSetFielding.create,
                  cs=challenge_set, cbns=[node_1], team=our_team,
                  available_round=round_0)
save_row.py 文件源码 项目:pymixup 作者: rdevost 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def save_row(row, **kwargs):
    """Save a database row.

    Parameters
    ----------
    row
        Object exposing ``save()`` and an ``id`` attribute.
    **kwargs
        Accepted for interface compatibility; not used by this function.

    Returns
    -------
    row_id : int

    Raises
    ------
    IntegrityError
        When the error message mentions 'unique', a new IntegrityError is
        raised whose message is prefixed with 'Record already exists.'.
        Any other IntegrityError is re-raised unchanged.
    """
    try:
        row.save()
    except IntegrityError as e:
        if 'unique' in e.message.lower():
            raise IntegrityError('\n'.join([
                'Record already exists.',
                e.message
                ]))
        # Re-raise the original exception so its message and traceback are
        # kept; raising a fresh, empty IntegrityError would discard them.
        raise
    return row.id
shows.py 文件源码 项目:argosd 作者: danielkoster 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def post():
    """Handle a POST request by creating and saving a new show.

    Returns the saved show as a dict together with status 201. An
    IntegrityError on save aborts with status 400 and the error text.
    """
    args = get_parser().parse_args()

    show = Show()
    for field in ('title', 'follow_from_season',
                  'follow_from_episode', 'minimum_quality'):
        setattr(show, field, args[field])

    # Only set when a truthy value was supplied.
    wait_minutes = args['wait_minutes_for_better_quality']
    if wait_minutes:
        show.wait_minutes_for_better_quality = wait_minutes

    try:
        show.save()
    except IntegrityError as error:
        abort(400, message=str(error))

    return show.to_dict(), 201
shows.py 文件源码 项目:argosd 作者: danielkoster 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def put(show_id):
    """Handle a PUT request by updating the show with id ``show_id``.

    Aborts with 404 when the show does not exist and with 400 when saving
    raises an IntegrityError. Returns the show as a dict with status 201.
    """
    args = get_parser().parse_args()

    try:
        show = Show.get(Show.id == show_id)
    except DoesNotExist:
        abort(404)

    for field in ('title', 'follow_from_season',
                  'follow_from_episode', 'minimum_quality'):
        setattr(show, field, args[field])

    # Only overwritten when a truthy value was supplied.
    wait_minutes = args['wait_minutes_for_better_quality']
    if wait_minutes:
        show.wait_minutes_for_better_quality = wait_minutes

    try:
        show.save()
    except IntegrityError as error:
        abort(400, message=str(error))

    # NOTE(review): an update answers 201 like the create handler does;
    # confirm clients rely on this before changing it to 200.
    return show.to_dict(), 201
manage.py 文件源码 项目:PyHub 作者: 521xueweihan 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def update(blog_id):
    """Show or process the edit form for the blog identified by ``blog_id``.

    GET renders ``update.html`` with a form pre-filled from the blog.
    Any other method validates the form, saves the changes and redirects
    to ``/manage``; validation or integrity failures re-render the form
    with an error flash. An unknown ``blog_id`` aborts with status 400.
    """
    # peewee's Model.get raises DoesNotExist instead of returning a falsy
    # value, so the missing-row case has to be caught, not tested with `if`.
    try:
        blog = Blog.get(Blog.blog_id == blog_id)
    except Blog.DoesNotExist:
        abort(400)
    form = BlogForm(name=blog.name, url=blog.url, description=blog.description)

    if request.method == 'GET':
        return render_template('update.html', blog=blog, form=form)

    if not form.validate_on_submit():
        flash(u'?????????', 'error')
        return render_template('update.html', blog=blog, form=form)

    blog.name = form.name.data
    blog.url = form.url.data
    blog.description = form.description.data
    blog.update_time = datetime.now()
    try:
        blog.save()
    except IntegrityError:
        flash(u'?? {name} ?????????'.format(name=form.name.data), 'error')
        return render_template('update.html', blog=blog, form=form)

    flash(u'?? {name} ??'.format(name=form.name.data))
    return redirect('/manage')
test_indexes.py 文件源码 项目:open-syllabus-project 作者: davidmcclure 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_unique_pairs(add_doc, add_institution):

    """
    A second link for the same institution -> document pair is rejected.
    """

    institution = add_institution()
    document = add_doc()
    link = dict(institution=institution, document=document)

    # The first link is accepted.
    Institution_Document.create(**link)

    # The identical link must raise.
    with pytest.raises(IntegrityError):
        Institution_Document.create(**link)
test_indexes.py 文件源码 项目:open-syllabus-project 作者: davidmcclure 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_unique_pairs(add_subfield, add_doc):

    """
    A second link for the same subfield -> document pair is rejected,
    even when offset and snippet differ.
    """

    subfield = add_subfield()
    document = add_doc()

    # The first link is accepted.
    Subfield_Document.create(subfield=subfield, document=document,
                             offset=1, snippet='abc')

    # Same pair with a different offset and snippet must still raise.
    with pytest.raises(IntegrityError):
        Subfield_Document.create(subfield=subfield, document=document,
                                 offset=2, snippet='def')
model.py 文件源码 项目:aiopeewee 作者: kszucs 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
async def get_or_create(cls, **kwargs):
    """Fetch the row matching ``kwargs`` or create it.

    Keyword arguments containing ``__`` are applied with ``filter``; the
    others become equality conditions. ``defaults`` (optional dict) is
    only used when creating.

    Returns a ``(instance, created)`` tuple. If creation raises
    IntegrityError the lookup is retried once; when that retry also finds
    nothing, the IntegrityError is re-raised.

    Declared ``async`` because the body uses ``await`` and ``async with``,
    which are not valid inside a plain ``def``.
    """
    defaults = kwargs.pop('defaults', {})
    query = cls.select()
    for field, value in kwargs.items():
        if '__' in field:
            query = query.filter(**{field: value})
        else:
            query = query.where(getattr(cls, field) == value)

    try:
        return await query.get(), False
    except cls.DoesNotExist:
        try:
            # Lookup-style keys (with '__') are not column values.
            params = {k: v for k, v in kwargs.items() if '__' not in k}
            params.update(defaults)

            async with cls._meta.database.atomic():
                return await cls.create(**params), True
        except IntegrityError as exc:
            # The create failed; try the lookup once more before giving up.
            try:
                return await query.get(), False
            except cls.DoesNotExist:
                raise exc
tfidf_vader.py 文件源码 项目:Informed-Finance-Canary 作者: Darthone 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def vader(term):
    """Sum per-sentence VADER scores for an article and store them.

    Parameters
    ----------
    term
        Indexable pair: ``term[0]`` is stored under ``'article'`` and
        ``term[1]`` is an iterable of sentences to score.

    The 'neg', 'neu', 'pos' and 'compound' scores of every sentence are
    summed and inserted as 'negative', 'neutral', 'positive' and
    'compound'. On IntegrityError the row is skipped and ``term[0]`` is
    printed.
    """
    article = term[0]
    sentences = term[1]

    totals = {'compound': 0, 'neg': 0, 'neu': 0, 'pos': 0}
    sid = SentimentIntensityAnalyzer()
    for sentence in sentences:
        scores = sid.polarity_scores(sentence)
        for key in totals:
            totals[key] += scores[key]

    # Map each score by name. Relying on dict key order would put the
    # compound total into 'negative' on insertion-ordered dicts.
    db_data = {
        'article': article,
        'negative': totals['neg'],
        'neutral': totals['neu'],
        'positive': totals['pos'],
        'compound': totals['compound'],
    }
    try:
        with db.atomic():
            ArticleFeature.insert(db_data).execute()
    except peewee.IntegrityError:
        print(article)
        #print 'Skipping Duplicate'
model.py 文件源码 项目:torpeewee 作者: snower 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def get_or_create(cls, **kwargs):
    """Generator-style get-or-create yielding the query and create calls.

    Keyword arguments containing ``__`` are applied with ``filter``; the
    others become equality conditions. ``defaults`` (optional dict) is
    only used when creating. The ``(instance, created)`` tuple is
    delivered through ``gen.Return``. If creation raises IntegrityError
    the lookup is retried once before that error is re-raised.
    """
    defaults = kwargs.pop('defaults', {})
    lookup = cls.select()
    for name, value in kwargs.items():
        if '__' in name:
            lookup = lookup.filter(**{name: value})
        else:
            lookup = lookup.where(getattr(cls, name) == value)

    try:
        instance = yield lookup.get()
        created = False
    except cls.DoesNotExist:
        try:
            # Lookup-style keys (with '__') are not column values.
            create_args = {key: val for key, val in kwargs.items()
                           if '__' not in key}
            create_args.update(defaults)
            instance = yield cls.create(**create_args)
            created = True
        except IntegrityError as exc:
            # The create failed; try the lookup once more before giving up.
            try:
                instance = yield lookup.get()
                created = False
            except cls.DoesNotExist:
                raise exc
    raise gen.Return((instance, created))
handlers.py 文件源码 项目:aiowing 作者: embali 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
async def get(self):
    """Build the template context for one page of active records.

    The page number comes from the ``page`` route parameter (default 1).
    One record more than ``settings.RECORDS_PER_PAGE`` is requested so the
    presence of a next page can be detected. Database errors listed below
    are treated as an empty result. An empty page other than page 1
    returns a redirect to the ``web_records`` route.

    Declared ``async`` because the body awaits the query; ``await`` is not
    valid inside a plain ``def``.
    """
    page = int(self.request.match_info.get('page', 1))
    per_page = settings.RECORDS_PER_PAGE

    try:
        records = await settings.manager.execute(
            Record
            .select()
            .where(Record.active == True)  # noqa: E712 - builds a query expression
            .order_by(Record.name.asc())
            .offset((page - 1) * per_page)
            .limit(per_page + 1))
    except (psycopg2.OperationalError, peewee.IntegrityError,
            peewee.ProgrammingError):
        records = []

    count = len(records)

    if count == 0 and page != 1:
        return web.HTTPFound(self.request.app.router['web_records'].url())

    # The extra row fetched above signals that another page exists.
    next_page = page + 1 if count > per_page else None
    prev_page = page - 1 if page != 1 else None

    return dict(request=self.request,
                records=records[:per_page],
                prev_page=prev_page,
                page=page,
                next_page=next_page)
handlers.py 文件源码 项目:aiowing 作者: embali 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
async def get_page_context(self, page):
    """Build the template context for one page of all records.

    Counts the records, lets ``self.paging`` derive the paging values,
    then loads the requested page ordered by ``active`` and ``uts``
    (both descending). The database errors listed below fall back to a
    count of 0 and an empty record list respectively.

    Declared ``async`` because the body awaits queries and helpers;
    ``await`` is not valid inside a plain ``def``.
    """
    db_errors = (psycopg2.OperationalError, peewee.IntegrityError,
                 peewee.ProgrammingError)

    try:
        count = await settings.manager.count(Record.select())
    except db_errors:
        count = 0

    page_count, prev_page, page, next_page = \
        await self.paging(count, settings.RECORDS_PER_PAGE, page)

    try:
        records = await settings.manager.execute(
            Record
            .select()
            .order_by(
                Record.active.desc(),
                Record.uts.desc())
            .paginate(page, paginate_by=settings.RECORDS_PER_PAGE))
    except db_errors:
        records = []

    return {'request': self.request,
            'current_user': (await self.get_current_user()),
            'records': records,
            'count': count,
            'page_count': page_count,
            'prev_page': prev_page,
            'page': page,
            'next_page': next_page}
test_challenge_binary_node.py 文件源码 项目:farnsworth 作者: mechaphish 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_cs_name_and_sha256_uniqueness(self):
    """Only a repeated (cs, name, blob) combination is rejected."""
    foo_set = ChallengeSet.create(name="foo")
    bar_set = ChallengeSet.create(name="bar")

    # Combinations that are expected to be accepted.
    ChallengeBinaryNode.create(name="test1", cs=foo_set, blob=BLOB)   # first binary
    ChallengeBinaryNode.create(name="test2", cs=foo_set, blob=BLOB)   # other name
    ChallengeBinaryNode.create(name="test1", cs=bar_set, blob=BLOB)   # other cs
    ChallengeBinaryNode.create(name="test1", cs=bar_set, blob=BLOB2)  # other binary

    # Repeating cs, name and binary of the first row must raise.
    assert_raises(IntegrityError, ChallengeBinaryNode.create,
                  name="test1", cs=foo_set, blob=BLOB)
indexed_blob_model.py 文件源码 项目:farnsworth 作者: mechaphish 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def create_or_get(cls, **kwargs):
    """Create a row for ``blob`` or fetch the existing one.

    ``blob`` is required. ``sha256`` may be supplied by the caller; it is
    computed from ``blob`` only when absent, so a caller-provided digest
    avoids hashing the blob.

    Returns ``(instance, True)`` after a successful create,
    ``(instance, False)`` when the create raised IntegrityError and the
    row was found by its digest, and ``(None, False)`` when that lookup
    finds nothing.
    """
    blob = kwargs.pop('blob')
    if 'sha256' in kwargs:
        sha256 = kwargs.pop('sha256')
    else:
        sha256 = hashlib.sha256(blob).hexdigest()

    try:
        with cls._meta.database.atomic():
            return cls.create(blob=blob, sha256=sha256, **kwargs), True
    except peewee.IntegrityError:
        try:
            return cls.get(sha256=sha256, **kwargs), False
        except cls.DoesNotExist: # this could happen with master-slave sync delay
            return None, False
api.py 文件源码 项目:pyetesync 作者: etesync 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def save(self):
    """Save the cached object, translating IntegrityError.

    An IntegrityError whose text contains 'UNIQUE' becomes
    ``exceptions.AlreadyExists``; any other IntegrityError becomes
    ``exceptions.DoesNotExist``.
    """
    try:
        self._cache_obj.save()
    except peewee.IntegrityError as e:
        is_duplicate = 'UNIQUE' in str(e)
        error_cls = (exceptions.AlreadyExists if is_duplicate
                     else exceptions.DoesNotExist)
        raise error_cls(e)
api.py 文件源码 项目:pyetesync 作者: etesync 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def save(self):
    """Mark the cached object dirty and save it.

    Any IntegrityError raised by the save is re-raised as
    ``exceptions.AlreadyExists``.
    """
    cache_obj = self._cache_obj
    cache_obj.dirty = True
    try:
        cache_obj.save()
    except peewee.IntegrityError as error:
        raise exceptions.AlreadyExists(error)
data_import.py 文件源码 项目:teaching-equivalencies 作者: memorial-ece 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def import_people(url):
    """Download the page at ``url`` and create a Person per table row.

    Rows are taken from the first table of the page. A row is used only
    when it has exactly four cells and its e-mail cell ends with
    'mun.ca'. The name is cut at the first comma and '[at]' in the e-mail
    is replaced by '@'. An IntegrityError for a row is reported on stderr
    together with the existing people sharing that e-mail, and the import
    continues with the next row.

    Nothing is returned; the number of created people is printed.
    """
    r = requests.get(url)
    if r.status_code != 200:
        sys.stderr.write('Error retrieving %s:\n%s\n' % (url, r))
        return

    total_created = 0

    soup = bs4.BeautifulSoup(r.text, 'html.parser')
    for row in soup.table.find_all('tr'):
        columns = row.find_all('td')
        if len(columns) != 4:
            continue

        (name, room, phone, email) = [c.text for c in columns]
        if not email.endswith('mun.ca'):
            continue

        name = name.split(',')[0].strip()
        email = email.replace('[at]', '@').strip()

        try:
            _, created = db.Person.get_or_create(name=name, email=email)
            # `created` is a bool, so this counts the newly created rows.
            total_created += created

        # `as` form: valid on Python 2.6+ and Python 3, unlike `except X, e`.
        except peewee.IntegrityError as e:
            existing = ''.join(
                "    - '%s' (%s)\n" % (p.name, p.email)
                for p in db.Person.select().where(db.Person.email == email))
            sys.stderr.write("""error: failure to create person '%s' (%s)
  note: %s
  note: existing people with same email:
%s
""" % (name, email, e, existing))

    print('Imported details of %d individuals.' % total_created)
handlers.py 文件源码 项目:windseed 作者: embali 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def get_page_context(self):
    """
    Build the context holding the first page of active records
    """
    try:
        active = Record.select().where(Record.active == True)  # noqa: E712
        records = active.paginate(1, paginate_by=env.SITEMAP_PER_PAGE)
    except peewee.IntegrityError:
        records = []

    return {'records': records}
handlers.py 文件源码 项目:windseed 作者: embali 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def get_page_context(self):
    """
    Build the paging context for the requested page of all records
    """
    # A non-numeric ``page`` argument falls back to the first page.
    try:
        page = int(self.get_argument('page', 1))
    except ValueError:
        page = 1

    try:
        count = peewee.SelectQuery(Record).count()
    except peewee.IntegrityError:
        count = 0

    per_page = env.ADMIN_ITEMS_PER_PAGE
    # Full pages plus one more when a remainder is left.
    has_partial_page = bool(count % per_page)
    page_count = int(count/per_page) + int(has_partial_page)

    prev_page, page, next_page = self.paging(page, page_count)

    try:
        ordered = Record.select().order_by(
            Record.active.desc(),
            Record.uts.desc())
        records = ordered.paginate(page, paginate_by=per_page)
    except peewee.IntegrityError:
        records = []

    return {
        'records': records,
        'count': count,
        'page_count': page_count,
        'prev_page': prev_page,
        'page': page,
        'next_page': next_page,
    }
addidents.py 文件源码 项目:pymixup 作者: rdevost 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def add_identifiers(identifier_list=None, do_obfuscate=True):
    """Add identifier and obfuscated names to Identifiers table.

    Parameters
    ----------
    identifier_list : list or None
        Names to add. ``None`` (the default) means there is nothing to
        add, so the call returns immediately.
    do_obfuscate : bool
        When false, each name is stored with itself as obfuscated name,
        and an already stored identifier is updated to match.
    """
    if identifier_list is None:
        return

    for identifier_name in identifier_list:
        # Skip identifiers in reserved
        try:
            get_reserved_by_name(identifier_name)
        except DoesNotExist:
            pass
        else:
            continue

        # Names starting with '__' (which includes '__init__.py') and
        # test file names keep their own name.
        keep_name = (
            not do_obfuscate
            or identifier_name.startswith('__')
            or identifier_name.startswith('test_')
            or identifier_name.endswith('_test.py'))
        obfuscated_name = identifier_name if keep_name else ''

        identifier_row = get_identifier(None)
        try:
            save_identifier(identifier_row,
                            name=identifier_name,
                            obfuscated_name=obfuscated_name)
        except IntegrityError as e:
            if 'unique' not in e.message.lower():
                raise
            # If should not be obfuscated, replace obfuscated, o/w pass
            if not do_obfuscate:
                identifier_row = get_identifier_by_name(identifier_name)
                if identifier_row.obfuscated_name != identifier_name:
                    save_identifier(identifier_row,
                                    obfuscated_name=identifier_name)
save_row.py 文件源码 项目:pymixup 作者: rdevost 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def save_identifier(identifier_row, **kwargs):
    """Apply column values to an identifier row and save it.

    Parameters
    ----------
    identifier_row
        Row object to update and save.
    **kwargs
        Column name -> value pairs; each name must already be an
        attribute of ``identifier_row`` (AttributeError otherwise).

    Returns
    -------
    identifier_id : int
    """
    for column, value in kwargs.iteritems():
        getattr(identifier_row, column)  # Raises if the column is unknown
        setattr(identifier_row, column, value)

    with obfuscatedb.atomic():
        try:
            identifier_id = save_row(identifier_row, **kwargs)
        except IntegrityError as e:
            # Retry only for a uniqueness clash on obfuscated_name when the
            # caller did not supply an obfuscated_name itself.
            retry_without_name = (
                'unique' in e.message.lower()
                and not kwargs.get('obfuscated_name', None)
                and 'obfuscated_name' in e.message)
            if not retry_without_name:
                raise
            # NOTE(review): presumably a fresh obfuscated name is generated
            # when the row is saved with None - confirm in the model.
            identifier_row.obfuscated_name = None
            identifier_id = save_identifier(identifier_row)

        return identifier_id
addreserveds.py 文件源码 项目:pymixup 作者: rdevost 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def add_reserveds(package_name, reserved_list, name_prefix=''):
    """Store a package name and its reserved names in the Reserved table.

    Parameters
    ----------
    package_name : str
        Saved as a reserved name and used as ``primary_package`` for
        every row.
    reserved_list : list
        Names to reserve; '__init__.py' entries are skipped.
    name_prefix : str
        Prepended to each name from ``reserved_list``.

    IntegrityErrors whose message mentions 'unique' are ignored; any
    other IntegrityError propagates.
    """
    # The package itself is reserved first.
    try:
        package_row = get_reserved(None)
        save_reserved(package_row,
                      name=package_name,
                      primary_package=package_name)
    except IntegrityError as e:
        # Already stored is fine; anything else is an error.
        if 'unique' not in e.message.lower():
            raise

    # Then every reserved name of the package.
    for reserved_name in reserved_list:
        if reserved_name == '__init__.py':
            continue
        new_row = get_reserved(None)
        full_name = name_prefix + reserved_name
        try:
            save_reserved(new_row,
                          name=full_name,
                          primary_package=package_name)
        except IntegrityError as e:
            # Already stored is fine; anything else is an error.
            if 'unique' not in e.message.lower():
                raise
test_rs_save_row.py 文件源码 项目:pymixup 作者: rdevost 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def test_save_reserved_duplicate():
    """Saving the name 'reserved_one' on a new row must raise IntegrityError."""
    from logic.reserved import get_reserved, save_reserved

    new_row = get_reserved(None)
    assert not new_row.id  # the new row has no id yet

    # NOTE(review): presumably 'reserved_one' was stored by earlier setup,
    # which makes this a duplicate add - confirm against the fixtures.
    with pytest.raises(IntegrityError):
        assert save_reserved(new_row, name=u'reserved_one')
tasks.py 文件源码 项目:argosd 作者: danielkoster 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _deferred(self):
    """Save and announce every parsed feed episode not yet in the database.

    An IntegrityError while handling an episode is logged and the loop
    continues with the next one.
    """
    episodes = self._parse_episodes_from_feed()

    logging.debug('Relevant episodes found in RSS feed: %d', len(episodes))

    for episode in episodes:
        logging.debug('Found episode: %s', episode)

        # Episodes that are already stored need no further work.
        if self._get_existing_episode_from_database(episode) is not None:
            continue

        try:
            episode.save()
            self._notify_user(episode)
            logging.info('Saved episode: %s', episode)
        except IntegrityError:
            logging.error('Could not save episode to database')
test_model.py 文件源码 项目:steamwatch 作者: akeil 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_app_integrity():
    """A repeated steamid and a missing 'kind' both raise IntegrityError."""
    app_fields = dict(steamid='3', kind='game')
    App.create(**app_fields)

    # Same values a second time.
    with pytest.raises(IntegrityError):
        App.create(**app_fields)

    # 'kind' is left out.
    with pytest.raises(IntegrityError):
        App.create(steamid='4')


# Package ---------------------------------------------------------------------
test_model.py 文件源码 项目:steamwatch 作者: akeil 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_package_integrity():
    """Creating the same package twice raises IntegrityError."""
    package_fields = dict(steamid='03', kind='game')
    Package.create(**package_fields)

    # Same values a second time.
    with pytest.raises(IntegrityError):
        Package.create(**package_fields)
manage.py 文件源码 项目:PyHub 作者: 521xueweihan 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def create():
    """Create a blog from the submitted form, then redirect to /manage.

    A flash message reports success, an IntegrityError on create, or a
    form that failed validation.
    """
    form = BlogForm()

    if not form.validate_on_submit():
        flash(u'?????????', 'error')
        return redirect('/manage')

    try:
        blog = Blog.create(
            blog_id=uuid.uuid4(),
            name=form.name.data,
            url=form.url.data,
            description=form.description.data)
        flash(u'?? {name} ??'.format(name=blog.name))
    except IntegrityError:
        flash(u'?? {name} ?????????'.format(name=form.name.data), 'error')

    return redirect('/manage')
user.py 文件源码 项目:airbnb_clone 作者: electrachong 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def create_user():
    """Create a User record from the submitted form fields.

    Returns ``new_user.to_hash()`` on success. A peewee OperationalError
    yields a 400 response and a peewee IntegrityError a 409 response,
    each as a ``(dict, status)`` pair.
    """
    form = request.form

    # Note: right now, this does not prevent empty strings being passed to API
    try:
        new_user = User(
            email=form.get('email'),
            password=form.get('password'),
            first_name=form.get('first_name'),
            last_name=form.get('last_name'),
            # NOTE(review): any non-empty string, including 'false', is
            # truthy here - confirm what clients send for is_admin.
            is_admin=bool(form.get('is_admin')),
        )
        new_user.set_password(new_user.password)
        new_user.save()
    except peewee.OperationalError:
        # Per the original author: raised when a required field was null.
        return {'code': 400, 'msg': "Bad request: missing data"}, 400
    except peewee.IntegrityError:
        # Per the original author: raised when the email was not unique.
        return {'code': 10000, 'msg': "Email already exists"}, 409

    return new_user.to_hash()
test_indexes.py 文件源码 项目:open-syllabus-project 作者: davidmcclure 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_unique_url(add_institution):

    """
    Adding a second institution with the same URL must fail.
    """

    url = 'http://test.edu'

    add_institution(url=url)

    with pytest.raises(peewee.IntegrityError):
        add_institution(url=url)


问题


面经


文章

微信
公众号

扫码关注公众号