def __init__(self, length=None, **kwargs):
super(VARCHAR, self).__init__(length=length, **kwargs)
python类VARCHAR的实例源码
def visit_VARCHAR(self, type_, **kw):
if not type_.length:
raise exc.CompileError(
"VARCHAR requires a length on dialect %s" %
self.dialect.name)
basic = super(FBTypeCompiler, self).visit_VARCHAR(type_, **kw)
return self._extend_string(type_, basic)
def __init__(self, length=None, **kwargs):
super(VARCHAR, self).__init__(length=length, **kwargs)
def visit_VARCHAR(self, type_, **kw):
if not type_.length:
raise exc.CompileError(
"VARCHAR requires a length on dialect %s" %
self.dialect.name)
basic = super(FBTypeCompiler, self).visit_VARCHAR(type_, **kw)
return self._extend_string(type_, basic)
def sqltype_to_stdtype(sqltype):
import sqlalchemy.types as sqltypes
if isinstance(sqltype, (sqltypes.VARCHAR, sqltypes.CHAR, sqltypes.TEXT, sqltypes.Enum, sqltypes.String)):
return _STRING_TYPE
if isinstance(sqltype, (sqltypes.DATETIME, sqltypes.DATE, sqltypes.TIME, sqltypes.TIMESTAMP)):
return _DATE_TYPE
if isinstance(sqltype, (sqltypes.INTEGER, sqltypes.BIGINT, sqltypes.SMALLINT, sqltypes.Integer)):
return _INTEGER_TYPE
if isinstance(sqltype, (sqltypes.REAL, sqltypes.DECIMAL, sqltypes.NUMERIC, sqltypes.FLOAT)):
return _DECIMAL_TYPE
if isinstance(sqltype, sqltypes.BOOLEAN):
return _BOOLEAN_TYPE
def stdtype_to_sqltype(stdtype):
import sqlalchemy.types as sqltypes
if isinstance(stdtype, stdtypes.StringType):
return sqltypes.VARCHAR(length=stdtype.max_len) if 0 < stdtype.max_len < 65536 else sqltypes.TEXT()
if isinstance(stdtype, stdtypes.BoolType):
return sqltypes.BOOLEAN()
if isinstance(stdtype, stdtypes.DateType):
return sqltypes.DATE() if stdtype.only_date else sqltypes.TIMESTAMP()
if isinstance(stdtype, stdtypes.IntegerType):
return sqltypes.BIGINT() if stdtype.length > 11 else sqltypes.INTEGER()
if isinstance(stdtype, stdtypes.DecimalType):
return sqltypes.DECIMAL()
if isinstance(stdtype, stdtypes.ArrayType):
return sqltypes.ARRAY(item_type=stdtype.item_type)
def __init__(self, length=None, **kwargs):
super(VARCHAR, self).__init__(length=length, **kwargs)
def visit_VARCHAR(self, type_, **kw):
if not type_.length:
raise exc.CompileError(
"VARCHAR requires a length on dialect %s" %
self.dialect.name)
basic = super(FBTypeCompiler, self).visit_VARCHAR(type_, **kw)
return self._extend_string(type_, basic)
def __init__(self, length=None, **kwargs):
super(VARCHAR, self).__init__(length=length, **kwargs)
def visit_VARCHAR(self, type_, **kw):
if not type_.length:
raise exc.CompileError(
"VARCHAR requires a length on dialect %s" %
self.dialect.name)
basic = super(FBTypeCompiler, self).visit_VARCHAR(type_, **kw)
return self._extend_string(type_, basic)
def __init__(self, length=None, **kwargs):
super(VARCHAR, self).__init__(length=length, **kwargs)
def visit_VARCHAR(self, type_, **kw):
if not type_.length:
raise exc.CompileError(
"VARCHAR requires a length on dialect %s" %
self.dialect.name)
basic = super(FBTypeCompiler, self).visit_VARCHAR(type_, **kw)
return self._extend_string(type_, basic)
def __init__(self, length=None, **kwargs):
super(VARCHAR, self).__init__(length=length, **kwargs)
def visit_VARCHAR(self, type_, **kw):
if not type_.length:
raise exc.CompileError(
"VARCHAR requires a length on dialect %s" %
self.dialect.name)
basic = super(FBTypeCompiler, self).visit_VARCHAR(type_, **kw)
return self._extend_string(type_, basic)
def tick_insert(data):
logging.debug("tick_data_rows: %d", len(data))
dtypes = { k: VARCHAR(32) for k, v in data.dtypes.items() if v.name == 'object'}
dtypes['time'] = DATETIME
with db_buffer.connect() as conn:
data.to_sql('tick_data', conn, if_exists="append", index=False, dtype=dtypes, chunksize=None)
def history_insert(data):
logging.debug("history_data_rows: %d", len(data))
dtypes = { k: VARCHAR(32) for k, v in data.dtypes.items() if v.name == 'object'}
with db_buffer.connect() as conn:
data.to_sql('history', conn, index=False, if_exists="append", dtype=dtypes, chunksize=None)
def history_index_insert(data):
logging.debug("history_index_data_rows: %d", len(data))
dtypes = { k: VARCHAR(32) for k, v in data.dtypes.items() if v.name == 'object'}
with db_buffer.connect() as conn:
data.to_sql('history_index', conn, index=False, if_exists="append", dtype=dtypes, chunksize=None)
def fetch_stock_basics(conn):
logging.debug("Fetch stocks")
df = ts.get_stock_basics()
df['timeToMarket'] = df['timeToMarket'].map(lambda s: datetime.strptime(str(s), '%Y%m%d') if s > 0 else None)
df.to_sql('stock_basics', conn, if_exists="replace", dtype={"code": VARCHAR(32)})
def fetch_index_list(conn):
logging.debug("Fetch indices")
df = ts.get_index()[['code', 'name']]
df.to_sql('stock_index', conn, if_exists="replace", dtype={"code": VARCHAR(32)})
def __init__(self, length=None, **kwargs):
super(VARCHAR, self).__init__(length=length, **kwargs)