def downgrade():
op.add_column('container_states', sa.Column('pod_id', postgresql.UUID(),
autoincrement=False, nullable=True))
op.create_foreign_key(u'container_states_pod_id_fkey',
'container_states', 'pods', ['pod_id'], ['id'])
downgrade_data()
op.drop_column('container_states', 'reason')
op.drop_column('container_states', 'exit_code')
op.drop_constraint('container_states_pod_state_id_fkey', 'container_states',
type_='foreignkey')
op.drop_index('ix_pod_id_start_time', table_name='pod_states')
op.drop_column('container_states', 'pod_state_id')
op.execute("ALTER TABLE pod_states DROP CONSTRAINT pod_states_pkey, "
"ADD CONSTRAINT pod_states_pkey PRIMARY KEY (pod_id, start_time);")
op.drop_column('pod_states', 'id')
op.execute(sa.schema.DropSequence(sa.Sequence('pod_states_id_seq')))
python类UUID的实例源码
2e00e70316c0_link_container_and_pod_states.py 文件源码
项目:kuberdock-platform
作者: cloudlinux
项目源码
文件源码
阅读 27
收藏 0
点赞 0
评论 0
53489a9c1e2d_usage_ip_states_and_pd_states.py 文件源码
项目:kuberdock-platform
作者: cloudlinux
项目源码
文件源码
阅读 28
收藏 0
点赞 0
评论 0
def upgrade():
op.create_table('pd_states',
sa.Column('user_id', sa.Integer(), nullable=False),
sa.Column('pd_name', sa.String(), nullable=False),
sa.Column('size', sa.Integer(), nullable=False),
sa.Column('start_time', sa.DateTime(), nullable=False),
sa.Column('end_time', sa.DateTime(), nullable=True),
sa.ForeignKeyConstraint(['user_id'], ['users.id']),
sa.PrimaryKeyConstraint('start_time'))
op.create_table('ip_states',
sa.Column('pod_id', postgresql.UUID(), nullable=False),
sa.Column('ip_address', sa.BigInteger(), nullable=False),
sa.Column('start_time', sa.DateTime(), nullable=False),
sa.Column('end_time', sa.DateTime(), nullable=True),
sa.ForeignKeyConstraint(['pod_id'], ['pods.id']),
sa.PrimaryKeyConstraint('pod_id', 'start_time'))
def test_name_postgresql(self) -> None:
"""
Tests that if the name ``postgresql`` is passed into the UUID
database type, then PostgreSQL's UUID type is loaded as a dialect for
the UUID type.
The returned type descriptor should be the same as the postgresql UUID
database type
"""
uuid_instance = DB_UUID()
dialect_impl = uuid_instance.load_dialect_impl(self.pg_sql_dialect)
self.assertIsInstance(
dialect_impl, self.pg_sql_dialect.type_descriptor(
postgresql.UUID()
).__class__
)
def test_bind_uuid_not_uuid_type(self, uuid: UUID) -> None:
"""
Tests that if the parameter to write in is a string that looks like
a UUID, then it is written to the DB as a de-hyphenated UUID. This
means that the id ``7490bda6-7c69-47c2-ad97-c7453f15811c`` gets written
as ``7490bda67c6947c2ad97c7453f15811c``. The length of the
de-hyphenated UUID MUST be 32 characters.
:param uuid: The uuid to write, randomly generated
"""
db_uuid = DB_UUID()
value_to_store = db_uuid.process_bind_param(
str(uuid), self.sqlite_dialect
)
self.assertEqual(
value_to_store, value_to_store.replace('-', '')
)
self.assertEqual(
self.expected_de_hyphenated_uuid_length,
len(value_to_store)
)
def load_dialect_impl(self, dialect: dialects) -> DialectType:
"""
SQLAlchemy wraps all database-specific features into
dialects, which are then responsible for generating the SQL code
for a specific DB type when loading in data. ``load_dialect_impl``
is called when CRUD (create, update, delete operations) needs to be
done on the database. This method is responsible for telling
SQLAlchemy how to configure the dialect to write this type
:param dialect: The loaded dialect
:return: The type descriptor for this type.
"""
if dialect.name == 'postgresql':
return dialect.type_descriptor(postgresql.UUID())
else:
return dialect.type_descriptor(CHAR(32))
def test_querying_table(metadata):
"""
Create an object for test table.
"""
# When using pytest-xdist, we don't want concurrent table creations
# across test processes so we assign a unique name for table based on
# the current worker id.
worker_id = os.environ.get('PYTEST_XDIST_WORKER', 'master')
return Table(
'test_querying_table_' + worker_id, metadata,
Column('id', types.Integer, autoincrement=True, primary_key=True),
Column('t_string', types.String(60)),
Column('t_list', types.ARRAY(types.String(60))),
Column('t_enum', types.Enum(MyEnum)),
Column('t_int_enum', types.Enum(MyIntEnum)),
Column('t_datetime', types.DateTime()),
Column('t_date', types.DateTime()),
Column('t_interval', types.Interval()),
Column('uniq_uuid', PG_UUID, nullable=False, unique=True, default=uuid4),
)
def _add_column_kwargs(self, kwargs, column):
"""Add keyword arguments to kwargs (in-place) based on the passed in
`Column <sqlalchemy.schema.Column>`.
"""
if column.nullable:
kwargs['allow_none'] = True
kwargs['required'] = not column.nullable and not _has_default(column)
if hasattr(column.type, 'enums'):
kwargs['validate'].append(validate.OneOf(choices=column.type.enums))
# Add a length validator if a max length is set on the column
# Skip UUID columns
# (see https://github.com/marshmallow-code/marshmallow-sqlalchemy/issues/54)
if hasattr(column.type, 'length'):
try:
python_type = column.type.python_type
except (AttributeError, NotImplementedError):
python_type = None
if not python_type or not issubclass(python_type, uuid.UUID):
kwargs['validate'].append(validate.Length(max=column.type.length))
if hasattr(column.type, 'scale'):
kwargs['places'] = getattr(column.type, 'scale', None)
def upgrade():
op.create_table('base_string',
sa.Column('id', postgresql.UUID(as_uuid=True), server_default=sa.text('uuid_generate_v4()'), nullable=False),
sa.Column('resource_pk', postgresql.UUID(as_uuid=True), nullable=False),
sa.Column('base_string', sa.Text(), nullable=False),
sa.Column('comment', sa.Text(), nullable=False),
sa.Column('context', sa.Text(), nullable=False),
sa.ForeignKeyConstraint(['resource_pk'], ['resource.id'], ),
sa.PrimaryKeyConstraint('id')
)
op.drop_table('translated_string')
op.create_table('translated_string',
sa.Column('id', sa.Integer(), nullable=False, autoincrement=True),
sa.Column('base_string_pk', postgresql.UUID(as_uuid=True), nullable=False),
sa.Column('language_pk', sa.Integer, nullable=False),
sa.Column('translation', sa.Text(), nullable=False),
sa.Column('translator_comment', sa.Text(), nullable=False),
sa.ForeignKeyConstraint(['base_string_pk'], ['base_string.id'], ),
sa.ForeignKeyConstraint(['language_pk'], ['language.id'], ),
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('base_string_pk', 'language_pk', name='base_string_language_uc'),
)
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.create_table('india_ethnicities',
sa.Column('id', postgresql.UUID(), server_default=sa.text(u'uuid_generate_v4()'), nullable=False),
sa.Column('patient_id', sa.Integer(), nullable=False),
sa.Column('source_group_id', sa.Integer(), nullable=False),
sa.Column('source_type', sa.String(), nullable=False),
sa.Column('father_ancestral_state', sa.String(), nullable=True),
sa.Column('father_language', sa.String(), nullable=True),
sa.Column('mother_ancestral_state', sa.String(), nullable=True),
sa.Column('mother_language', sa.String(), nullable=True),
sa.Column('created_user_id', sa.Integer(), nullable=False),
sa.Column('created_date', sa.DateTime(timezone=True), server_default=sa.text(u'now()'), nullable=False),
sa.Column('modified_user_id', sa.Integer(), nullable=False),
sa.Column('modified_date', sa.DateTime(timezone=True), server_default=sa.text(u'now()'), nullable=False),
sa.ForeignKeyConstraint(['created_user_id'], ['users.id'], ),
sa.ForeignKeyConstraint(['modified_user_id'], ['users.id'], ),
sa.ForeignKeyConstraint(['patient_id'], ['patients.id'], onupdate='CASCADE', ondelete='CASCADE'),
sa.ForeignKeyConstraint(['source_group_id'], ['groups.id'], ),
sa.PrimaryKeyConstraint('id')
)
op.create_index('india_ethnicity_patient_idx', 'india_ethnicities', ['patient_id'], unique=False)
# ### end Alembic commands ###
cb7650cd6a4d_add_web_hook_and_web_hook_token.py 文件源码
项目:Albireo
作者: lordfriend
项目源码
文件源码
阅读 37
收藏 0
点赞 0
评论 0
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.create_table('web_hook',
sa.Column('id', postgresql.UUID(as_uuid=True), nullable=False),
sa.Column('name', sa.TEXT(), nullable=False, unique=True),
sa.Column('description', sa.TEXT(), nullable=True),
sa.Column('url', sa.TEXT(), nullable=False),
sa.Column('shared_secret', sa.TEXT(), nullable=False),
sa.Column('status', sa.Integer(), nullable=False),
sa.Column('consecutive_failure_count', sa.Integer(), nullable=False),
sa.Column('register_time', sa.TIMESTAMP(), nullable=False),
sa.Column('created_by_uid', postgresql.UUID(as_uuid=True), nullable=True),
sa.Column('permissions', sa.TEXT(), nullable=False, server_default='"[]"'),
sa.PrimaryKeyConstraint('id')
)
op.create_table('web_hook_token',
sa.Column('token_id', sa.TEXT(), nullable=False),
sa.Column('user_id', postgresql.UUID(as_uuid=True), nullable=False),
sa.Column('web_hook_id', postgresql.UUID(as_uuid=True), nullable=False),
sa.ForeignKeyConstraint(['user_id'], ['users.id'], ),
sa.ForeignKeyConstraint(['web_hook_id'], ['web_hook.id'], ),
sa.PrimaryKeyConstraint('user_id', 'web_hook_id')
)
# ### end Alembic commands ###
def _update_rels(self, model, links):
session = self.DBSession()
source = model.rid
rels = {(k, uuid.UUID(target)) for k, targets in links.items() for target in targets}
existing = {
(link.rel, link.target_rid)
for link in model.rels
}
to_remove = existing - rels
to_add = rels - existing
for rel, target in to_remove:
link = session.query(Link).get((source, rel, target))
session.delete(link)
for rel, target in to_add:
link = Link(source_rid=source, rel=rel, target_rid=target)
session.add(link)
return to_add, to_remove
def load_dialect_impl(self, dialect):
if dialect.name == 'postgresql':
return dialect.type_descriptor(UUID())
else:
return dialect.type_descriptor(CHAR(32))
def process_bind_param(self, value, dialect):
if value is None:
return value
elif dialect.name == 'postgresql':
return str(value)
else:
if not isinstance(value, uuid.UUID):
return "%.32x" % uuid.UUID(value).int
else:
# hexstring
return "%.32x" % value.int
def process_result_value(self, value, dialect):
if value is None:
return value
else:
return uuid.UUID(value)
79a6e3998d6_add_pod_states_table.py 文件源码
项目:kuberdock-platform
作者: cloudlinux
项目源码
文件源码
阅读 19
收藏 0
点赞 0
评论 0
def upgrade():
### commands auto generated by Alembic - please adjust! ###
op.create_table('pod_states',
sa.Column('pod_id', postgresql.UUID(), nullable=False),
sa.Column('start_time', sa.DateTime(), nullable=False),
sa.Column('end_time', sa.DateTime(), nullable=True),
sa.Column('last_event_time', sa.DateTime(), nullable=True),
sa.Column('last_event', sa.String(length=255), nullable=True),
sa.Column('hostname', sa.String(length=255), nullable=True),
sa.ForeignKeyConstraint(['pod_id'], ['pods.id'], ),
sa.PrimaryKeyConstraint('pod_id', 'start_time')
)
### end Alembic commands ###
56f9182bf415_add_persistent_disk.py 文件源码
项目:kuberdock-platform
作者: cloudlinux
项目源码
文件源码
阅读 21
收藏 0
点赞 0
评论 0
def upgrade():
op.create_table(
'persistent_disk',
sa.Column('id', sa.String(length=32), nullable=False),
sa.Column('drive_name', sa.String(length=64), nullable=False),
sa.Column('name', sa.String(length=64), nullable=False),
sa.Column('owner_id', sa.Integer(), nullable=False),
sa.Column('size', sa.Integer(), nullable=False),
sa.Column('pod_id', postgresql.UUID(), nullable=True),
sa.ForeignKeyConstraint(['owner_id'], ['users.id'], ),
sa.ForeignKeyConstraint(['pod_id'], ['pods.id'], ),
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('drive_name'),
sa.UniqueConstraint('name', 'owner_id')
)
889fee1f3c80_add_meta_to_dataset.py 文件源码
项目:ml-annotate
作者: planbrothers
项目源码
文件源码
阅读 20
收藏 0
点赞 0
评论 0
def downgrade():
op.add_column('dataset', sa.Column('organization_id', postgresql.UUID(), autoincrement=False, nullable=True))
op.execute('''
UPDATE "dataset" SET
organization_id = (meta->>'organization_id')::uuid
WHERE
meta->>'organization_id' IS NOT NULL
''')
op.drop_column('dataset', 'meta')
def test_bind_uuid_postgres(self, uuid: UUID) -> None:
"""
Tests that if the UUID is passed to a Postgres dialect, that it is
returned as a string with hyphens
:param uuid: A randomly-generated UUID to store
"""
db_uuid = DB_UUID()
value_to_store = db_uuid.process_bind_param(
uuid, self.pg_sql_dialect
)
self.assertEqual(value_to_store, str(uuid))
def test_bind_uuid_something_else(self, uuid: UUID) -> None:
"""
Tests that the UUID gets de-hyphenated if using the CHAR 32 type, same
as always.
:param uuid: A randomly-generated UUID to store
"""
db_uuid = DB_UUID()
value_to_store = db_uuid.process_bind_param(
uuid, self.sqlite_dialect
)
self.assertEqual(
value_to_store, "%.32x" % int(uuid)
)
def process_result_value(
self, value: Optional[str], dialect: dialects
) -> Optional[dict]:
"""
:param value: The value to process from the SQL query
:param dialect: The dialect to use for the processing
:return: The value as a UUID
"""
if value is None:
return value
else:
return json.loads(value)
def process_result_value(
self, value: Optional[str], dialect: dialects
) -> Optional[uuid.UUID]:
"""
:param value: The value to process from the SQL query
:param dialect: The dialect to use for the processing
:return: The value as a UUID
"""
if value is None:
return value
else:
return uuid.UUID(value)
def copy(self, *args, **kwargs) -> 'UUID':
"""
:param args: The arguments to the UUID constructor
:param kwargs: The keyword arguments to the UUID constructor
:return: A deep copy of this object
"""
return UUID(*args, **kwargs)
def UuidColumn(*args, **kwargs):
kwargs.setdefault('nullable', False)
kwargs['type_'] = UUID(as_uuid=True)
return Column(*args, **kwargs)
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_constraint(None, 'source', type_='foreignkey')
op.drop_index(op.f('ix_source_author_id'), table_name='source')
op.drop_column('source', 'author_id')
op.add_column(
'build', sa.Column('author_id', postgresql.UUID(), autoincrement=False, nullable=True)
)
op.create_foreign_key('build_author_id_fkey', 'build', 'author', ['author_id'], ['id'])
op.create_index('ix_build_author_id', 'build', ['author_id'], unique=False)
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.add_column('filecoverage', sa.Column(
'job_id', postgresql.UUID(), autoincrement=False, nullable=False))
op.drop_constraint(None, 'filecoverage', type_='foreignkey')
op.create_foreign_key('filecoverage_job_id_fkey', 'filecoverage', 'job', [
'job_id'], ['id'], ondelete='CASCADE')
op.create_unique_constraint(
'unq_job_filname', 'filecoverage', ['job_id', 'filename'])
op.drop_constraint('unq_coverage_filname', 'filecoverage', type_='unique')
op.drop_index(op.f('ix_filecoverage_build_id'), table_name='filecoverage')
op.drop_column('filecoverage', 'build_id')
# ### end Alembic commands ###
def load_dialect_impl(self, dialect):
if dialect.name == 'postgresql':
return dialect.type_descriptor(UUID())
else:
return dialect.type_descriptor(CHAR(32))
def process_bind_param(self, value, dialect):
if value is None:
return value
elif dialect.name == 'postgresql':
return str(value)
else:
if not isinstance(value, uuid.UUID):
return "%.32x" % uuid.UUID(value)
else:
# hexstring
return "%.32x" % value
def process_result_value(self, value, dialect):
if value is None:
return value
else:
return uuid.UUID(value)
def __init__(self, as_uuid=True):
super(UUID, self).__init__(binary=True, native=True)
def load_dialect_impl(self, dialect):
if dialect.name == 'postgresql' and self.native:
# Use the native UUID type.
return dialect.type_descriptor(postgresql.UUID())
else:
# Fallback to either a BINARY or a CHAR.
kind = types.BINARY(16)
return dialect.type_descriptor(kind)