python类Integer()的实例源码

postgres.py 文件源码 项目:annotated-py-sqlalchemy 作者: hhstore 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def get_column_specification(self, column, override_pk=False, **kwargs):
        colspec = column.name
        if column.primary_key and isinstance(column.type, types.Integer) and (column.default is None or (isinstance(column.default, schema.Sequence) and column.default.optional)):
            colspec += " SERIAL"
        else:
            colspec += " " + column.type.get_col_spec()
            default = self.get_column_default_string(column)
            if default is not None:
                colspec += " DEFAULT " + default

        if not column.nullable:
            colspec += " NOT NULL"
        if column.primary_key and not override_pk:
            colspec += " PRIMARY KEY"
        if column.foreign_key:
            colspec += " REFERENCES %s(%s)" % (column.column.foreign_key.column.table.fullname, column.column.foreign_key.column.name) 
        return colspec
mysql.py 文件源码 项目:annotated-py-sqlalchemy 作者: hhstore 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def get_column_specification(self, column, override_pk=False, first_pk=False):
        colspec = column.name + " " + column.type.get_col_spec()
        default = self.get_column_default_string(column)
        if default is not None:
            colspec += " DEFAULT " + default

        if not column.nullable:
            colspec += " NOT NULL"
        if column.primary_key:
            if not override_pk:
                colspec += " PRIMARY KEY"
            if first_pk and isinstance(column.type, types.Integer):
                colspec += " AUTO_INCREMENT"
        if column.foreign_key:
            colspec += ", FOREIGN KEY (%s) REFERENCES %s(%s)" % (column.name, column.column.foreign_key.column.table.name, column.column.foreign_key.column.name) 
        return colspec
model.py 文件源码 项目:osm-wikidata 作者: EdwardBetts 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def url(self):
        return '{}/{}/{}'.format(osm_api_base, self.osm_type, self.osm_id)

# class ItemCandidateTag(Base):
#     __tablename__ = 'item_candidate_tag'
#     __table_args__ = (
#         ForeignKeyConstraint(['item_id', 'osm_id', 'osm_type'],
#                              [ItemCandidate.item_id,
#                               ItemCandidate.osm_id,
#                               ItemCandidate.osm_type]),
#     )
#
#     item_id = Column(Integer, primary_key=True)
#     osm_id = Column(BigInteger, primary_key=True)
#     osm_type = Column(osm_type_enum, primary_key=True)
#     k = Column(String, primary_key=True)
#     v = Column(String, primary_key=True)
#
#     item_candidate = relationship(ItemCandidate,
#                                   backref=backref('tag_table', lazy='dynamic'))
base.py 文件源码 项目:pyetje 作者: rorlika 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def get_column_specification(self, column, **kw):
        colspec = self.preparer.format_column(column)
        first = None
        if column.primary_key and column.autoincrement:
            try:
                first = [c for c in column.table.primary_key.columns
                         if (c.autoincrement and
                             isinstance(c.type, sqltypes.Integer) and
                             not c.foreign_keys)].pop(0)
            except IndexError:
                pass

        if column is first:
            colspec += " SERIAL"
        else:
            colspec += " " + self.dialect.type_compiler.process(column.type)
            default = self.get_column_default_string(column)
            if default is not None:
                colspec += " DEFAULT " + default

        if not column.nullable:
            colspec += " NOT NULL"

        return colspec
base.py 文件源码 项目:pyetje 作者: rorlika 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def get_column_specification(self, column, **kwargs):
        coltype = self.dialect.type_compiler.process(column.type)
        colspec = self.preparer.format_column(column) + " " + coltype
        default = self.get_column_default_string(column)
        if default is not None:
            colspec += " DEFAULT " + default

        if not column.nullable:
            colspec += " NOT NULL"

        if (column.primary_key and
            column.table.kwargs.get('sqlite_autoincrement', False) and
            len(column.table.primary_key.columns) == 1 and
            issubclass(column.type._type_affinity, sqltypes.Integer) and
            not column.foreign_keys):
                colspec += " PRIMARY KEY AUTOINCREMENT"

        return colspec
types.py 文件源码 项目:pyetje 作者: rorlika 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def _expression_adaptations(self):
        return {
            operators.add: {
                Integer: self.__class__,
                Interval: DateTime,
                Time: DateTime,
            },
            operators.sub: {
                # date - integer = date
                Integer: self.__class__,

                # date - date = integer.
                Date: Integer,

                Interval: DateTime,

                # date - datetime = interval,
                # this one is not in the PG docs
                # but works
                DateTime: Interval,
            },
        }
mysql.py 文件源码 项目:Price-Comparator 作者: Thejas-1 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def visit_column(self, delta):
        table = delta.table
        colspec = self.get_column_specification(delta.result_column)
        if delta.result_column.autoincrement:
            primary_keys = [c for c in table.primary_key.columns
                       if (c.autoincrement and
                            isinstance(c.type, sqltypes.Integer) and
                            not c.foreign_keys)]

            if primary_keys:
                first = primary_keys.pop(0)
                if first.name == delta.current_name:
                    colspec += " AUTO_INCREMENT"
        old_col_name = self.preparer.quote(delta.current_name, table.quote)

        self.start_alter_table(table)

        self.append("CHANGE COLUMN %s " % old_col_name)
        self.append(colspec)
        self.execute()
test_querying.py 文件源码 项目:asyncpgsa 作者: CanopyTax 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def test_querying_table(metadata):
    """
    Create an object for test table.

    """

    # When using pytest-xdist, we don't want concurrent table creations
    # across test processes so we assign a unique name for table based on
    # the current worker id.
    worker_id = os.environ.get('PYTEST_XDIST_WORKER', 'master')
    return Table(
        'test_querying_table_' + worker_id, metadata,
        Column('id', types.Integer, autoincrement=True, primary_key=True),
        Column('t_string', types.String(60)),
        Column('t_list', types.ARRAY(types.String(60))),
        Column('t_enum', types.Enum(MyEnum)),
        Column('t_int_enum', types.Enum(MyIntEnum)),
        Column('t_datetime', types.DateTime()),
        Column('t_date', types.DateTime()),
        Column('t_interval', types.Interval()),
        Column('uniq_uuid', PG_UUID, nullable=False, unique=True, default=uuid4),
    )
mysql.py 文件源码 项目:Hawkeye 作者: tozhengxq 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def visit_column(self, delta):
        table = delta.table
        colspec = self.get_column_specification(delta.result_column)
        if delta.result_column.autoincrement:
            primary_keys = [c for c in table.primary_key.columns
                       if (c.autoincrement and
                            isinstance(c.type, sqltypes.Integer) and
                            not c.foreign_keys)]

            if primary_keys:
                first = primary_keys.pop(0)
                if first.name == delta.current_name:
                    colspec += " AUTO_INCREMENT"
        q = util.safe_quote(table)
        old_col_name = self.preparer.quote(delta.current_name, q)

        self.start_alter_table(table)

        self.append("CHANGE COLUMN %s " % old_col_name)
        self.append(colspec)
        self.execute()
test_reflection.py 文件源码 项目:Flask_Blog 作者: sugarguo 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def define_tables(cls, metadata):
        Table('test_table', metadata,
              Column('id', Integer, primary_key=True),
              Column('data', String(50))
              )
test_reflection.py 文件源码 项目:Flask_Blog 作者: sugarguo 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_nullable_reflection(self):
        t = Table('t', self.metadata,
                  Column('a', Integer, nullable=True),
                  Column('b', Integer, nullable=False))
        t.create()
        eq_(
            dict(
                (col['name'], col['nullable'])
                for col in inspect(self.metadata.bind).get_columns('t')
            ),
            {"a": True, "b": False}
        )
mysql.py 文件源码 项目:Flask_Blog 作者: sugarguo 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def compare_server_default(self, inspector_column,
                               metadata_column,
                               rendered_metadata_default,
                               rendered_inspector_default):
        # partially a workaround for SQLAlchemy issue #3023; if the
        # column were created without "NOT NULL", MySQL may have added
        # an implicit default of '0' which we need to skip
        if metadata_column.type._type_affinity is sqltypes.Integer and \
            inspector_column.primary_key and \
                not inspector_column.autoincrement and \
                not rendered_metadata_default and \
                rendered_inspector_default == "'0'":
            return False
        else:
            return rendered_inspector_default != rendered_metadata_default
schemaobj.py 文件源码 项目:Flask_Blog 作者: sugarguo 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def check_constraint(self, name, source, condition, schema=None, **kw):
        t = sa_schema.Table(source, self.metadata(),
                            sa_schema.Column('x', Integer), schema=schema)
        ck = sa_schema.CheckConstraint(condition, name=name, **kw)
        t.append_constraint(ck)
        return ck
postgres.py 文件源码 项目:annotated-py-sqlalchemy 作者: hhstore 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def get_column_default(self, column):
        if column.primary_key and isinstance(column.type, types.Integer) and (column.default is None or (isinstance(column.default, schema.Sequence) and column.default.optional)):
            c = self.proxy("select nextval('%s_%s_seq')" % (column.table.name, column.name))
            return c.fetchone()[0]
        else:
            return ansisql.ANSIDefaultRunner.get_column_default(self, column)
sql.py 文件源码 项目:annotated-py-sqlalchemy 作者: hhstore 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def _oid_col(self):
        # OID remains a little hackish so far
        if not hasattr(self, '_oid_column'):
            if self.table.engine.oid_column_name() is not None:
                self._oid_column = schema.Column(self.table.engine.oid_column_name(), sqltypes.Integer, hidden=True)
                self._oid_column._set_parent(self.table)
            else:
                self._oid_column = None
        return self._oid_column
meta.py 文件源码 项目:scrobbler 作者: hatarist 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def tag(name=None):
    sort_by = request.args.get('sort_by')

    if sort_by == 'play_count':
        sort_by = [Artist.playcount.desc()]
    elif sort_by == 'local_play_count':
        sort_by = [Artist.local_playcount.desc()]
    elif sort_by == 'tag_strength':
        sort_by = [desc('strength')]
    else:
        sort_by = [desc('strength'), Artist.local_playcount.desc()]

    name = name.lower()

    top_artists = (
        db.session.query(Artist.name, Artist.tags[name].cast(Integer).label('strength'), Artist.local_playcount, Artist.playcount)
        .filter(Artist.tags.has_key(name))
        .order_by(*sort_by)
        .all()
    )

    top_artists = enumerate(top_artists, start=1)

    return render_template(
        'meta/tag.html',
        tag=tag,
        top_artists=top_artists,
    )
test_reflection.py 文件源码 项目:QXSConsolas 作者: qxsch 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def define_tables(cls, metadata):
        Table('test_table', metadata,
              Column('id', Integer, primary_key=True),
              Column('data', String(50))
              )
test_reflection.py 文件源码 项目:QXSConsolas 作者: qxsch 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_nullable_reflection(self):
        t = Table('t', self.metadata,
                  Column('a', Integer, nullable=True),
                  Column('b', Integer, nullable=False))
        t.create()
        eq_(
            dict(
                (col['name'], col['nullable'])
                for col in inspect(self.metadata.bind).get_columns('t')
            ),
            {"a": True, "b": False}
        )
base.py 文件源码 项目:flasky 作者: RoseOu 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def visit_typeclause(self, typeclause):
        type_ = typeclause.type.dialect_impl(self.dialect)
        if isinstance(type_, sqltypes.Integer):
            return 'INTEGER'
        else:
            return super(DrizzleCompiler, self).visit_typeclause(typeclause)
test_reflection.py 文件源码 项目:flasky 作者: RoseOu 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def define_tables(cls, metadata):
        Table('test_table', metadata,
              Column('id', Integer, primary_key=True),
              Column('data', String(50))
              )
test_reflection.py 文件源码 项目:flasky 作者: RoseOu 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_nullable_reflection(self):
        t = Table('t', self.metadata,
                  Column('a', Integer, nullable=True),
                  Column('b', Integer, nullable=False))
        t.create()
        eq_(
            dict(
                (col['name'], col['nullable'])
                for col in inspect(self.metadata.bind).get_columns('t')
            ),
            {"a": True, "b": False}
        )
operations.py 文件源码 项目:flasky 作者: RoseOu 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def _check_constraint(self, name, source, condition, schema=None, **kw):
        t = sa_schema.Table(source, sa_schema.MetaData(),
                    sa_schema.Column('x', Integer), schema=schema)
        ck = sa_schema.CheckConstraint(condition, name=name, **kw)
        t.append_constraint(ck)
        return ck
operations.py 文件源码 项目:flasky 作者: RoseOu 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def bulk_insert(self, table, rows):
        """Issue a "bulk insert" operation using the current
        migration context.

        This provides a means of representing an INSERT of multiple rows
        which works equally well in the context of executing on a live
        connection as well as that of generating a SQL script.   In the
        case of a SQL script, the values are rendered inline into the
        statement.

        e.g.::

            from alembic import op
            from datetime import date
            from sqlalchemy.sql import table, column
            from sqlalchemy import String, Integer, Date

            # Create an ad-hoc table to use for the insert statement.
            accounts_table = table('account',
                column('id', Integer),
                column('name', String),
                column('create_date', Date)
            )

            op.bulk_insert(accounts_table,
                [
                    {'id':1, 'name':'John Smith',
                            'create_date':date(2010, 10, 5)},
                    {'id':2, 'name':'Ed Williams',
                            'create_date':date(2007, 5, 27)},
                    {'id':3, 'name':'Wendy Jones',
                            'create_date':date(2008, 8, 15)},
                ]
            )
          """
        self.impl.bulk_insert(table, rows)
base.py 文件源码 项目:oa_qian 作者: sunqb 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def visit_typeclause(self, typeclause):
        type_ = typeclause.type.dialect_impl(self.dialect)
        if isinstance(type_, sqltypes.Integer):
            return 'INTEGER'
        else:
            return super(DrizzleCompiler, self).visit_typeclause(typeclause)
test_reflection.py 文件源码 项目:oa_qian 作者: sunqb 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def define_tables(cls, metadata):
        Table('test_table', metadata,
              Column('id', Integer, primary_key=True),
              Column('data', String(50))
              )
test_reflection.py 文件源码 项目:oa_qian 作者: sunqb 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_nullable_reflection(self):
        t = Table('t', self.metadata,
                  Column('a', Integer, nullable=True),
                  Column('b', Integer, nullable=False))
        t.create()
        eq_(
            dict(
                (col['name'], col['nullable'])
                for col in inspect(self.metadata.bind).get_columns('t')
            ),
            {"a": True, "b": False}
        )
test_reflection.py 文件源码 项目:chihu 作者: yelongyu 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def define_tables(cls, metadata):
        Table('test_table', metadata,
              Column('id', Integer, primary_key=True),
              Column('data', String(50))
              )
test_reflection.py 文件源码 项目:chihu 作者: yelongyu 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def test_nullable_reflection(self):
        t = Table('t', self.metadata,
                  Column('a', Integer, nullable=True),
                  Column('b', Integer, nullable=False))
        t.create()
        eq_(
            dict(
                (col['name'], col['nullable'])
                for col in inspect(self.metadata.bind).get_columns('t')
            ),
            {"a": True, "b": False}
        )
mysql.py 文件源码 项目:chihu 作者: yelongyu 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def compare_server_default(self, inspector_column,
                               metadata_column,
                               rendered_metadata_default,
                               rendered_inspector_default):
        # partially a workaround for SQLAlchemy issue #3023; if the
        # column were created without "NOT NULL", MySQL may have added
        # an implicit default of '0' which we need to skip
        if metadata_column.type._type_affinity is sqltypes.Integer and \
            inspector_column.primary_key and \
                not inspector_column.autoincrement and \
                not rendered_metadata_default and \
                rendered_inspector_default == "'0'":
            return False
        else:
            return rendered_inspector_default != rendered_metadata_default
schemaobj.py 文件源码 项目:chihu 作者: yelongyu 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def check_constraint(self, name, source, condition, schema=None, **kw):
        t = sa_schema.Table(source, self.metadata(),
                            sa_schema.Column('x', Integer), schema=schema)
        ck = sa_schema.CheckConstraint(condition, name=name, **kw)
        t.append_constraint(ck)
        return ck


问题


面经


文章

微信
公众号

扫码关注公众号