python类UUID的实例源码

2e00e70316c0_link_container_and_pod_states.py 文件源码 项目:kuberdock-platform 作者: cloudlinux 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def downgrade():
    op.add_column('container_states', sa.Column('pod_id', postgresql.UUID(),
                  autoincrement=False, nullable=True))
    op.create_foreign_key(u'container_states_pod_id_fkey',
                          'container_states', 'pods', ['pod_id'], ['id'])

    downgrade_data()

    op.drop_column('container_states', 'reason')
    op.drop_column('container_states', 'exit_code')
    op.drop_constraint('container_states_pod_state_id_fkey', 'container_states',
                       type_='foreignkey')
    op.drop_index('ix_pod_id_start_time', table_name='pod_states')
    op.drop_column('container_states', 'pod_state_id')
    op.execute("ALTER TABLE pod_states DROP CONSTRAINT pod_states_pkey, "
               "ADD CONSTRAINT pod_states_pkey PRIMARY KEY (pod_id, start_time);")
    op.drop_column('pod_states', 'id')
    op.execute(sa.schema.DropSequence(sa.Sequence('pod_states_id_seq')))
53489a9c1e2d_usage_ip_states_and_pd_states.py 文件源码 项目:kuberdock-platform 作者: cloudlinux 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def upgrade():
    op.create_table('pd_states',
                    sa.Column('user_id', sa.Integer(), nullable=False),
                    sa.Column('pd_name', sa.String(), nullable=False),
                    sa.Column('size', sa.Integer(), nullable=False),
                    sa.Column('start_time', sa.DateTime(), nullable=False),
                    sa.Column('end_time', sa.DateTime(), nullable=True),
                    sa.ForeignKeyConstraint(['user_id'], ['users.id']),
                    sa.PrimaryKeyConstraint('start_time'))
    op.create_table('ip_states',
                    sa.Column('pod_id', postgresql.UUID(), nullable=False),
                    sa.Column('ip_address', sa.BigInteger(), nullable=False),
                    sa.Column('start_time', sa.DateTime(), nullable=False),
                    sa.Column('end_time', sa.DateTime(), nullable=True),
                    sa.ForeignKeyConstraint(['pod_id'], ['pods.id']),
                    sa.PrimaryKeyConstraint('pod_id', 'start_time'))
test_uuid_database_type.py 文件源码 项目:TopChef 作者: TopChef 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_name_postgresql(self) -> None:
        """
        Tests that if the name ``postgresql`` is passed into the UUID
        database type, then PostgreSQL's UUID type is loaded as a dialect for
        the UUID type.

        The returned type descriptor should be the same as the postgresql UUID
        database type
        """
        uuid_instance = DB_UUID()
        dialect_impl = uuid_instance.load_dialect_impl(self.pg_sql_dialect)
        self.assertIsInstance(
            dialect_impl, self.pg_sql_dialect.type_descriptor(
                postgresql.UUID()
            ).__class__
        )
test_uuid_database_type.py 文件源码 项目:TopChef 作者: TopChef 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_bind_uuid_not_uuid_type(self, uuid: UUID) -> None:
        """
        Tests that if the parameter to write in is a string that looks like
        a UUID, then it is written to the DB as a de-hyphenated UUID. This
        means that the id ``7490bda6-7c69-47c2-ad97-c7453f15811c`` gets written
        as ``7490bda67c6947c2ad97c7453f15811c``. The length of the
        de-hyphenated UUID MUST be 32 characters.

        :param uuid: The uuid to write, randomly generated
        """
        db_uuid = DB_UUID()
        value_to_store = db_uuid.process_bind_param(
            str(uuid), self.sqlite_dialect
        )
        self.assertEqual(
            value_to_store, value_to_store.replace('-', '')
        )
        self.assertEqual(
            self.expected_de_hyphenated_uuid_length,
            len(value_to_store)
        )
uuid_database_type.py 文件源码 项目:TopChef 作者: TopChef 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def load_dialect_impl(self, dialect: dialects) -> DialectType:
        """
        SQLAlchemy wraps all database-specific features into
        dialects, which are then responsible for generating the SQL code
        for a specific DB type when loading in data. ``load_dialect_impl``
        is called when CRUD (create, update, delete operations) needs to be
        done on the database. This method is responsible for telling
        SQLAlchemy how to configure the dialect to write this type

        :param dialect: The loaded dialect
        :return: The type descriptor for this type.
        """
        if dialect.name == 'postgresql':
            return dialect.type_descriptor(postgresql.UUID())
        else:
            return dialect.type_descriptor(CHAR(32))
test_querying.py 文件源码 项目:asyncpgsa 作者: CanopyTax 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def test_querying_table(metadata):
    """
    Create an object for test table.

    """

    # When using pytest-xdist, we don't want concurrent table creations
    # across test processes so we assign a unique name for table based on
    # the current worker id.
    worker_id = os.environ.get('PYTEST_XDIST_WORKER', 'master')
    return Table(
        'test_querying_table_' + worker_id, metadata,
        Column('id', types.Integer, autoincrement=True, primary_key=True),
        Column('t_string', types.String(60)),
        Column('t_list', types.ARRAY(types.String(60))),
        Column('t_enum', types.Enum(MyEnum)),
        Column('t_int_enum', types.Enum(MyIntEnum)),
        Column('t_datetime', types.DateTime()),
        Column('t_date', types.DateTime()),
        Column('t_interval', types.Interval()),
        Column('uniq_uuid', PG_UUID, nullable=False, unique=True, default=uuid4),
    )
convert.py 文件源码 项目:QualquerMerdaAPI 作者: tiagovizoto 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def _add_column_kwargs(self, kwargs, column):
        """Add keyword arguments to kwargs (in-place) based on the passed in
        `Column <sqlalchemy.schema.Column>`.
        """
        if column.nullable:
            kwargs['allow_none'] = True
        kwargs['required'] = not column.nullable and not _has_default(column)

        if hasattr(column.type, 'enums'):
            kwargs['validate'].append(validate.OneOf(choices=column.type.enums))

        # Add a length validator if a max length is set on the column
        # Skip UUID columns
        # (see https://github.com/marshmallow-code/marshmallow-sqlalchemy/issues/54)
        if hasattr(column.type, 'length'):
            try:
                python_type = column.type.python_type
            except (AttributeError, NotImplementedError):
                python_type = None
            if not python_type or not issubclass(python_type, uuid.UUID):
                kwargs['validate'].append(validate.Length(max=column.type.length))

        if hasattr(column.type, 'scale'):
            kwargs['places'] = getattr(column.type, 'scale', None)
e183d3958ae0_split_translated_strings.py 文件源码 项目:dila 作者: socialwifi 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def upgrade():
    op.create_table('base_string',
    sa.Column('id', postgresql.UUID(as_uuid=True), server_default=sa.text('uuid_generate_v4()'), nullable=False),
    sa.Column('resource_pk', postgresql.UUID(as_uuid=True), nullable=False),
    sa.Column('base_string', sa.Text(), nullable=False),
    sa.Column('comment', sa.Text(), nullable=False),
    sa.Column('context', sa.Text(), nullable=False),
    sa.ForeignKeyConstraint(['resource_pk'], ['resource.id'], ),
    sa.PrimaryKeyConstraint('id')
    )
    op.drop_table('translated_string')
    op.create_table('translated_string',
    sa.Column('id', sa.Integer(), nullable=False, autoincrement=True),
    sa.Column('base_string_pk', postgresql.UUID(as_uuid=True), nullable=False),
    sa.Column('language_pk', sa.Integer, nullable=False),
    sa.Column('translation', sa.Text(), nullable=False),
    sa.Column('translator_comment', sa.Text(), nullable=False),
    sa.ForeignKeyConstraint(['base_string_pk'], ['base_string.id'], ),
    sa.ForeignKeyConstraint(['language_pk'], ['language.id'], ),
    sa.PrimaryKeyConstraint('id'),
    sa.UniqueConstraint('base_string_pk', 'language_pk', name='base_string_language_uc'),
    )
d4e81c8c52b9_add_india_ethnicity_table.py 文件源码 项目:radar 作者: renalreg 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def upgrade():
    # ### commands auto generated by Alembic - please adjust! ###
    op.create_table('india_ethnicities',
    sa.Column('id', postgresql.UUID(), server_default=sa.text(u'uuid_generate_v4()'), nullable=False),
    sa.Column('patient_id', sa.Integer(), nullable=False),
    sa.Column('source_group_id', sa.Integer(), nullable=False),
    sa.Column('source_type', sa.String(), nullable=False),
    sa.Column('father_ancestral_state', sa.String(), nullable=True),
    sa.Column('father_language', sa.String(), nullable=True),
    sa.Column('mother_ancestral_state', sa.String(), nullable=True),
    sa.Column('mother_language', sa.String(), nullable=True),
    sa.Column('created_user_id', sa.Integer(), nullable=False),
    sa.Column('created_date', sa.DateTime(timezone=True), server_default=sa.text(u'now()'), nullable=False),
    sa.Column('modified_user_id', sa.Integer(), nullable=False),
    sa.Column('modified_date', sa.DateTime(timezone=True), server_default=sa.text(u'now()'), nullable=False),
    sa.ForeignKeyConstraint(['created_user_id'], ['users.id'], ),
    sa.ForeignKeyConstraint(['modified_user_id'], ['users.id'], ),
    sa.ForeignKeyConstraint(['patient_id'], ['patients.id'], onupdate='CASCADE', ondelete='CASCADE'),
    sa.ForeignKeyConstraint(['source_group_id'], ['groups.id'], ),
    sa.PrimaryKeyConstraint('id')
    )
    op.create_index('india_ethnicity_patient_idx', 'india_ethnicities', ['patient_id'], unique=False)
    # ### end Alembic commands ###
cb7650cd6a4d_add_web_hook_and_web_hook_token.py 文件源码 项目:Albireo 作者: lordfriend 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def upgrade():
    # ### commands auto generated by Alembic - please adjust! ###
    op.create_table('web_hook',
    sa.Column('id', postgresql.UUID(as_uuid=True), nullable=False),
    sa.Column('name', sa.TEXT(), nullable=False, unique=True),
    sa.Column('description', sa.TEXT(), nullable=True),
    sa.Column('url', sa.TEXT(), nullable=False),
    sa.Column('shared_secret', sa.TEXT(), nullable=False),
    sa.Column('status', sa.Integer(), nullable=False),
    sa.Column('consecutive_failure_count', sa.Integer(), nullable=False),
    sa.Column('register_time', sa.TIMESTAMP(), nullable=False),
    sa.Column('created_by_uid', postgresql.UUID(as_uuid=True), nullable=True),
    sa.Column('permissions', sa.TEXT(), nullable=False, server_default='"[]"'),
    sa.PrimaryKeyConstraint('id')
    )
    op.create_table('web_hook_token',
    sa.Column('token_id', sa.TEXT(), nullable=False),
    sa.Column('user_id', postgresql.UUID(as_uuid=True), nullable=False),
    sa.Column('web_hook_id', postgresql.UUID(as_uuid=True), nullable=False),
    sa.ForeignKeyConstraint(['user_id'], ['users.id'], ),
    sa.ForeignKeyConstraint(['web_hook_id'], ['web_hook.id'], ),
    sa.PrimaryKeyConstraint('user_id', 'web_hook_id')
    )
    # ### end Alembic commands ###
storage.py 文件源码 项目:snovault 作者: ENCODE-DCC 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def _update_rels(self, model, links):
        session = self.DBSession()
        source = model.rid

        rels = {(k, uuid.UUID(target)) for k, targets in links.items() for target in targets}

        existing = {
            (link.rel, link.target_rid)
            for link in model.rels
        }

        to_remove = existing - rels
        to_add = rels - existing

        for rel, target in to_remove:
            link = session.query(Link).get((source, rel, target))
            session.delete(link)

        for rel, target in to_add:
            link = Link(source_rid=source, rel=rel, target_rid=target)
            session.add(link)

        return to_add, to_remove
custom_types.py 文件源码 项目:Brightside 作者: BrighterCommand 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def load_dialect_impl(self, dialect):
        if dialect.name == 'postgresql':
            return dialect.type_descriptor(UUID())
        else:
            return dialect.type_descriptor(CHAR(32))
custom_types.py 文件源码 项目:Brightside 作者: BrighterCommand 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def process_bind_param(self, value, dialect):
        if value is None:
            return value
        elif dialect.name == 'postgresql':
            return str(value)
        else:
            if not isinstance(value, uuid.UUID):
                return "%.32x" % uuid.UUID(value).int
            else:
                # hexstring
                return "%.32x" % value.int
custom_types.py 文件源码 项目:Brightside 作者: BrighterCommand 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def process_result_value(self, value, dialect):
        if value is None:
            return value
        else:
            return uuid.UUID(value)
79a6e3998d6_add_pod_states_table.py 文件源码 项目:kuberdock-platform 作者: cloudlinux 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def upgrade():
    ### commands auto generated by Alembic - please adjust! ###
    op.create_table('pod_states',
    sa.Column('pod_id', postgresql.UUID(), nullable=False),
    sa.Column('start_time', sa.DateTime(), nullable=False),
    sa.Column('end_time', sa.DateTime(), nullable=True),
    sa.Column('last_event_time', sa.DateTime(), nullable=True),
    sa.Column('last_event', sa.String(length=255), nullable=True),
    sa.Column('hostname', sa.String(length=255), nullable=True),
    sa.ForeignKeyConstraint(['pod_id'], ['pods.id'], ),
    sa.PrimaryKeyConstraint('pod_id', 'start_time')
    )
    ### end Alembic commands ###
56f9182bf415_add_persistent_disk.py 文件源码 项目:kuberdock-platform 作者: cloudlinux 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def upgrade():
    op.create_table(
        'persistent_disk',
        sa.Column('id', sa.String(length=32), nullable=False),
        sa.Column('drive_name', sa.String(length=64), nullable=False),
        sa.Column('name', sa.String(length=64), nullable=False),
        sa.Column('owner_id', sa.Integer(), nullable=False),
        sa.Column('size', sa.Integer(), nullable=False),
        sa.Column('pod_id', postgresql.UUID(), nullable=True),
        sa.ForeignKeyConstraint(['owner_id'], ['users.id'], ),
        sa.ForeignKeyConstraint(['pod_id'], ['pods.id'], ),
        sa.PrimaryKeyConstraint('id'),
        sa.UniqueConstraint('drive_name'),
        sa.UniqueConstraint('name', 'owner_id')
    )
889fee1f3c80_add_meta_to_dataset.py 文件源码 项目:ml-annotate 作者: planbrothers 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def downgrade():
    op.add_column('dataset', sa.Column('organization_id', postgresql.UUID(), autoincrement=False, nullable=True))
    op.execute('''
        UPDATE "dataset" SET
            organization_id = (meta->>'organization_id')::uuid
        WHERE
            meta->>'organization_id' IS NOT NULL
    ''')
    op.drop_column('dataset', 'meta')
test_uuid_database_type.py 文件源码 项目:TopChef 作者: TopChef 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_bind_uuid_postgres(self, uuid: UUID) -> None:
        """
        Tests that if the UUID is passed to a Postgres dialect, that it is
        returned as a string with hyphens

        :param uuid: A randomly-generated UUID to store
        """
        db_uuid = DB_UUID()
        value_to_store = db_uuid.process_bind_param(
            uuid, self.pg_sql_dialect
        )
        self.assertEqual(value_to_store, str(uuid))
test_uuid_database_type.py 文件源码 项目:TopChef 作者: TopChef 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_bind_uuid_something_else(self, uuid: UUID) -> None:
        """
        Tests that the UUID gets de-hyphenated if using the CHAR 32 type, same
        as always.

        :param uuid: A randomly-generated UUID to store
        """
        db_uuid = DB_UUID()
        value_to_store = db_uuid.process_bind_param(
            uuid, self.sqlite_dialect
        )
        self.assertEqual(
            value_to_store, "%.32x" % int(uuid)
        )
json_type.py 文件源码 项目:TopChef 作者: TopChef 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def process_result_value(
            self, value: Optional[str], dialect: dialects
    ) -> Optional[dict]:
        """

        :param value: The value to process from the SQL query
        :param dialect: The dialect to use for the processing
        :return: The value as a UUID
        """
        if value is None:
            return value
        else:
            return json.loads(value)
uuid_database_type.py 文件源码 项目:TopChef 作者: TopChef 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def process_result_value(
            self, value: Optional[str], dialect: dialects
    ) -> Optional[uuid.UUID]:
        """

        :param value: The value to process from the SQL query
        :param dialect: The dialect to use for the processing
        :return: The value as a UUID
        """
        if value is None:
            return value
        else:
            return uuid.UUID(value)
uuid_database_type.py 文件源码 项目:TopChef 作者: TopChef 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def copy(self, *args, **kwargs) -> 'UUID':
        """

        :param args: The arguments to the UUID constructor
        :param kwargs: The keyword arguments to the UUID constructor
        :return: A deep copy of this object
        """
        return UUID(*args, **kwargs)
columns.py 文件源码 项目:lymph-sqlalchemy 作者: deliveryhero 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def UuidColumn(*args, **kwargs):
    kwargs.setdefault('nullable', False)
    kwargs['type_'] = UUID(as_uuid=True)
    return Column(*args, **kwargs)
f7c8c10f5aea_rethink_authors.py 文件源码 项目:zeus 作者: getsentry 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def downgrade():
    # ### commands auto generated by Alembic - please adjust! ###
    op.drop_constraint(None, 'source', type_='foreignkey')
    op.drop_index(op.f('ix_source_author_id'), table_name='source')
    op.drop_column('source', 'author_id')
    op.add_column(
        'build', sa.Column('author_id', postgresql.UUID(), autoincrement=False, nullable=True)
    )
    op.create_foreign_key('build_author_id_fkey', 'build', 'author', ['author_id'], ['id'])
    op.create_index('ix_build_author_id', 'build', ['author_id'], unique=False)
    # ### end Alembic commands ###
71a34df585ed_migrate_coverage.py 文件源码 项目:zeus 作者: getsentry 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def downgrade():
    # ### commands auto generated by Alembic - please adjust! ###
    op.add_column('filecoverage', sa.Column(
        'job_id', postgresql.UUID(), autoincrement=False, nullable=False))
    op.drop_constraint(None, 'filecoverage', type_='foreignkey')
    op.create_foreign_key('filecoverage_job_id_fkey', 'filecoverage', 'job', [
                          'job_id'], ['id'], ondelete='CASCADE')
    op.create_unique_constraint(
        'unq_job_filname', 'filecoverage', ['job_id', 'filename'])
    op.drop_constraint('unq_coverage_filname', 'filecoverage', type_='unique')
    op.drop_index(op.f('ix_filecoverage_build_id'), table_name='filecoverage')
    op.drop_column('filecoverage', 'build_id')
    # ### end Alembic commands ###
guid.py 文件源码 项目:zeus 作者: getsentry 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def load_dialect_impl(self, dialect):
        if dialect.name == 'postgresql':
            return dialect.type_descriptor(UUID())
        else:
            return dialect.type_descriptor(CHAR(32))
guid.py 文件源码 项目:zeus 作者: getsentry 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def process_bind_param(self, value, dialect):
        if value is None:
            return value
        elif dialect.name == 'postgresql':
            return str(value)
        else:
            if not isinstance(value, uuid.UUID):
                return "%.32x" % uuid.UUID(value)
            else:
                # hexstring
                return "%.32x" % value
guid.py 文件源码 项目:zeus 作者: getsentry 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def process_result_value(self, value, dialect):
        if value is None:
            return value
        else:
            return uuid.UUID(value)
columns.py 文件源码 项目:websauna 作者: websauna 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def __init__(self, as_uuid=True):
        super(UUID, self).__init__(binary=True, native=True)
columns.py 文件源码 项目:websauna 作者: websauna 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def load_dialect_impl(self, dialect):
        if dialect.name == 'postgresql' and self.native:
            # Use the native UUID type.
            return dialect.type_descriptor(postgresql.UUID())
        else:
            # Fallback to either a BINARY or a CHAR.
            kind = types.BINARY(16)
            return dialect.type_descriptor(kind)


问题


面经


文章

微信
公众号

扫码关注公众号