def get_or_create(self, session, model, **kwargs):
    """Fetch the first ``model`` row matching ``kwargs``, inserting one if absent.

    Args:
        self: Not used by this function.
        session: Object exposing ``query``, ``add``, ``commit``, ``refresh``
            and ``rollback`` (presumably a SQLAlchemy session -- confirm).
        model: Class that is queried and, when no row matches, instantiated.
        **kwargs: Equality filters handed to ``filter_by``.  Values that are
            ``ClauseElement`` instances are used for filtering only and are
            left out of the constructor arguments.

    Returns:
        ``(instance, False)`` when a matching row already exists,
        ``(instance, True)`` after a successful insert, or ``None`` when the
        insert fails with ``IntegrityError`` (the session is rolled back).
        NOTE(review): callers that unpack the result must handle ``None``.
    """
    instance = session.query(model).filter_by(**kwargs).first()
    if instance:
        return instance, False

    # ``items()`` rather than ``iteritems()`` so this runs on Python 2 and 3.
    params = {
        key: value
        for key, value in kwargs.items()
        if not isinstance(value, ClauseElement)
    }
    try:
        instance = model(**params)
        session.add(instance)
        session.commit()
        session.refresh(instance)
    except IntegrityError:
        # The insert was rejected: roll back and carry on without a row.
        session.rollback()
        # Parenthesised form prints the same text on Python 2 and 3.
        print("[-]Failed to add, continuing")
        return None
    return instance, True
# Example source code using the Python class ClauseElement()
def compiles(class_, *specs):
    """Return a decorator that registers a compiler function for ``class_``.

    ``class_`` is a :class:`.ClauseElement` type.  Each name in ``specs``
    is a key under which the decorated function is stored; when no names
    are given the function is stored under ``'default'``.  The decorated
    function itself is returned unchanged.
    """
    def decorate(fn):
        # Only attributes set directly on class_ are consulted here.
        own_attrs = class_.__dict__
        dispatcher = own_attrs.get('_compiler_dispatcher')
        original_dispatch = own_attrs.get('_compiler_dispatch')

        if not dispatcher:
            # First registration on this class: create its dispatcher and
            # keep any dispatch function it already had as the default.
            dispatcher = _dispatcher()
            if original_dispatch:
                dispatcher.specs['default'] = original_dispatch

            # TODO: is the lambda required?  Presumably a plain function is
            # wanted so that it binds like a method -- confirm.
            class_._compiler_dispatch = (
                lambda *arg, **kw: dispatcher(*arg, **kw))
            class_._compiler_dispatcher = dispatcher

        for spec in (specs or ('default',)):
            dispatcher.specs[spec] = fn
        return fn
    return decorate
def deregister(class_):
    """Undo every custom compiler registration made on ``class_``.

    ``class_`` is a :class:`.ClauseElement` type.  Nothing happens unless
    it exposes a ``_compiler_dispatcher`` attribute.
    """
    if not hasattr(class_, '_compiler_dispatcher'):
        return
    # Regenerate the default _compiler_dispatch first ...
    visitors._generate_dispatch(class_)
    # ... then drop the custom dispatcher.
    del class_._compiler_dispatcher
def create(self, model, defaults=None, **kwargs):
    """Instantiate ``model``, add it to ``self.session`` and flush.

    Args:
        model: Class to instantiate.
        defaults: Optional mapping of extra constructor arguments; its
            entries override same-named entries from ``kwargs``.
        **kwargs: Constructor arguments.  Values that are ``ClauseElement``
            instances are dropped before the constructor is called.

    Returns:
        The newly built instance, after ``add`` and ``flush`` were called.
    """
    params = {
        key: value
        for key, value in kwargs.items()
        if not isinstance(value, ClauseElement)
    }
    if defaults:
        params.update(defaults)
    created = model(**params)
    self.session.add(created)
    self.session.flush()
    return created
def make_sql_clause(s, constructor):
    """Return ``s`` as a clause element, wrapping it when necessary.

    An ``s`` that already is an ``ex.ClauseElement`` is returned as-is;
    anything else is passed through ``constructor`` and that result is
    returned.
    """
    if isinstance(s, ex.ClauseElement):
        return s
    return constructor(s)
def compiles(class_, *specs):
    """Return a decorator that registers a compiler function for ``class_``.

    ``class_`` is a :class:`.ClauseElement` type.  Each name in ``specs``
    is a key under which the decorated function is stored; when no names
    are given the function is stored under ``'default'``.  The decorated
    function itself is returned unchanged.
    """
    def decorate(fn):
        # Only attributes set directly on class_ are consulted here.
        own_attrs = class_.__dict__
        dispatcher = own_attrs.get('_compiler_dispatcher')
        original_dispatch = own_attrs.get('_compiler_dispatch')

        if not dispatcher:
            # First registration on this class: create its dispatcher and
            # keep any dispatch function it already had as the default.
            dispatcher = _dispatcher()
            if original_dispatch:
                dispatcher.specs['default'] = original_dispatch

            # TODO: is the lambda required?  Presumably a plain function is
            # wanted so that it binds like a method -- confirm.
            class_._compiler_dispatch = (
                lambda *arg, **kw: dispatcher(*arg, **kw))
            class_._compiler_dispatcher = dispatcher

        for spec in (specs or ('default',)):
            dispatcher.specs[spec] = fn
        return fn
    return decorate
def deregister(class_):
    """Undo every custom compiler registration made on ``class_``.

    ``class_`` is a :class:`.ClauseElement` type.  Nothing happens unless
    it exposes a ``_compiler_dispatcher`` attribute.
    """
    if not hasattr(class_, '_compiler_dispatcher'):
        return
    # Regenerate the default _compiler_dispatch first ...
    visitors._generate_dispatch(class_)
    # ... then drop the custom dispatcher.
    del class_._compiler_dispatcher
def compiles(class_, *specs):
    """Return a decorator that registers a compiler function for ``class_``.

    ``class_`` is a :class:`.ClauseElement` type.  Each name in ``specs``
    is a key under which the decorated function is stored; when no names
    are given the function is stored under ``'default'``.  The decorated
    function itself is returned unchanged.
    """
    def decorate(fn):
        # Only attributes set directly on class_ are consulted here.
        own_attrs = class_.__dict__
        dispatcher = own_attrs.get('_compiler_dispatcher')
        original_dispatch = own_attrs.get('_compiler_dispatch')

        if not dispatcher:
            # First registration on this class: create its dispatcher and
            # keep any dispatch function it already had as the default.
            dispatcher = _dispatcher()
            if original_dispatch:
                dispatcher.specs['default'] = original_dispatch

            # TODO: is the lambda required?  Presumably a plain function is
            # wanted so that it binds like a method -- confirm.
            class_._compiler_dispatch = (
                lambda *arg, **kw: dispatcher(*arg, **kw))
            class_._compiler_dispatcher = dispatcher

        for spec in (specs or ('default',)):
            dispatcher.specs[spec] = fn
        return fn
    return decorate
def deregister(class_):
    """Undo every custom compiler registration made on ``class_``.

    ``class_`` is a :class:`.ClauseElement` type.  Nothing happens unless
    it exposes a ``_compiler_dispatcher`` attribute.
    """
    if not hasattr(class_, '_compiler_dispatcher'):
        return
    # Regenerate the default _compiler_dispatch first ...
    visitors._generate_dispatch(class_)
    # ... then drop the custom dispatcher.
    del class_._compiler_dispatcher
def compiles(class_, *specs):
    """Return a decorator that registers a compiler function for ``class_``.

    ``class_`` is a :class:`.ClauseElement` type.  Each name in ``specs``
    is a key under which the decorated function is stored; when no names
    are given the function is stored under ``'default'``.  The decorated
    function itself is returned unchanged.
    """
    def decorate(fn):
        # Only attributes set directly on class_ are consulted here.
        own_attrs = class_.__dict__
        dispatcher = own_attrs.get('_compiler_dispatcher')
        original_dispatch = own_attrs.get('_compiler_dispatch')

        if not dispatcher:
            # First registration on this class: create its dispatcher and
            # keep any dispatch function it already had as the default.
            dispatcher = _dispatcher()
            if original_dispatch:
                dispatcher.specs['default'] = original_dispatch

            # TODO: is the lambda required?  Presumably a plain function is
            # wanted so that it binds like a method -- confirm.
            class_._compiler_dispatch = (
                lambda *arg, **kw: dispatcher(*arg, **kw))
            class_._compiler_dispatcher = dispatcher

        for spec in (specs or ('default',)):
            dispatcher.specs[spec] = fn
        return fn
    return decorate
def deregister(class_):
    """Undo every custom compiler registration made on ``class_``.

    ``class_`` is a :class:`.ClauseElement` type.  Nothing happens unless
    it exposes a ``_compiler_dispatcher`` attribute.
    """
    if not hasattr(class_, '_compiler_dispatcher'):
        return
    # Regenerate the default _compiler_dispatch first ...
    visitors._generate_dispatch(class_)
    # ... then drop the custom dispatcher.
    del class_._compiler_dispatcher
def compiles(class_, *specs):
    """Return a decorator that registers a compiler function for ``class_``.

    ``class_`` is a :class:`.ClauseElement` type.  Each name in ``specs``
    is a key under which the decorated function is stored; when no names
    are given the function is stored under ``'default'``.  The decorated
    function itself is returned unchanged.
    """
    def decorate(fn):
        # Only attributes set directly on class_ are consulted here.
        own_attrs = class_.__dict__
        dispatcher = own_attrs.get('_compiler_dispatcher')
        original_dispatch = own_attrs.get('_compiler_dispatch')

        if not dispatcher:
            # First registration on this class: create its dispatcher and
            # keep any dispatch function it already had as the default.
            dispatcher = _dispatcher()
            if original_dispatch:
                dispatcher.specs['default'] = original_dispatch

            # TODO: is the lambda required?  Presumably a plain function is
            # wanted so that it binds like a method -- confirm.
            class_._compiler_dispatch = (
                lambda *arg, **kw: dispatcher(*arg, **kw))
            class_._compiler_dispatcher = dispatcher

        for spec in (specs or ('default',)):
            dispatcher.specs[spec] = fn
        return fn
    return decorate
def deregister(class_):
    """Undo every custom compiler registration made on ``class_``.

    ``class_`` is a :class:`.ClauseElement` type.  Nothing happens unless
    it exposes a ``_compiler_dispatcher`` attribute.
    """
    if not hasattr(class_, '_compiler_dispatcher'):
        return
    # Regenerate the default _compiler_dispatch first ...
    visitors._generate_dispatch(class_)
    # ... then drop the custom dispatcher.
    del class_._compiler_dispatcher
def get_or_create(session, model, defaults=None, **kwargs):
    """Get a ``model`` row matching ``kwargs`` or create it.

    Args:
        session: Object exposing ``query``, ``begin``, ``add``, ``commit``
            and ``rollback`` (presumably a SQLAlchemy session -- confirm).
        model: Class that is queried and, when no row matches, instantiated.
        defaults: Optional mapping of extra constructor arguments used only
            when creating; its entries override same-named ``kwargs``.
        **kwargs: Equality filters handed to ``filter_by``.  Values that are
            ``ClauseElement`` instances are used for filtering only and are
            left out of the constructor arguments.

    Returns:
        ``(instance, created)`` where ``created`` is ``True`` only when this
        call inserted the row.

    Raises:
        Whatever ``query.one()`` raises if the insert fails with
        ``IntegrityError`` and the re-query does not find exactly one row.
        Any other exception propagates unchanged.
    """
    query = session.query(model).filter_by(**kwargs)
    instance = query.first()
    if instance:
        return instance, False

    # Build the constructor arguments before opening the nested
    # transaction so that a bad ``defaults`` value cannot leave it open.
    params = {
        key: value
        for key, value in kwargs.items()
        if not isinstance(value, ClauseElement)
    }
    if defaults is not None:
        params.update(defaults)

    session.begin(nested=True)
    try:
        instance = model(**params)
        session.add(instance)
        session.commit()
    except IntegrityError:
        # The insert was rejected, presumably because the row was created
        # concurrently -- roll back and fetch the existing row instead.
        session.rollback()
        return query.one(), False
    return instance, True
def compiles(class_, *specs):
    """Return a decorator that registers a compiler function for ``class_``.

    ``class_`` is a :class:`.ClauseElement` type.  Each name in ``specs``
    is a key under which the decorated function is stored; when no names
    are given the function is stored under ``'default'``.  The decorated
    function itself is returned unchanged.
    """
    def decorate(fn):
        # A dispatcher left by an earlier @compiles on this exact class.
        dispatcher = class_.__dict__.get('_compiler_dispatcher')
        # The original handler, looked up with inheritance.  All
        # ClauseElement classes have one of these, but some TypeEngine
        # classes will not.
        original_dispatch = getattr(class_, '_compiler_dispatch', None)

        if not dispatcher:
            dispatcher = _dispatcher()

            if original_dispatch:
                def _wrap_existing_dispatch(element, compiler, **kw):
                    # Report a missing default handler as a CompileError.
                    try:
                        return original_dispatch(element, compiler, **kw)
                    except exc.UnsupportedCompilationError:
                        raise exc.CompileError(
                            "%s construct has no default "
                            "compilation handler." % type(element))
                dispatcher.specs['default'] = _wrap_existing_dispatch

            # TODO: is the lambda required?  Presumably a plain function is
            # wanted so that it binds like a method -- confirm.
            class_._compiler_dispatch = (
                lambda *arg, **kw: dispatcher(*arg, **kw))
            class_._compiler_dispatcher = dispatcher

        for spec in (specs or ('default',)):
            dispatcher.specs[spec] = fn
        return fn
    return decorate
def deregister(class_):
    """Undo every custom compiler registration made on ``class_``.

    ``class_`` is a :class:`.ClauseElement` type.  Nothing happens unless
    it exposes a ``_compiler_dispatcher`` attribute.
    """
    if not hasattr(class_, '_compiler_dispatcher'):
        return
    # Regenerate the default _compiler_dispatch first ...
    visitors._generate_dispatch(class_)
    # ... then drop the custom dispatcher.
    del class_._compiler_dispatcher
def compiles(class_, *specs):
    """Return a decorator that registers a compiler function for ``class_``.

    ``class_`` is a :class:`.ClauseElement` type.  Each name in ``specs``
    is a key under which the decorated function is stored; when no names
    are given the function is stored under ``'default'``.  The decorated
    function itself is returned unchanged.
    """
    def decorate(fn):
        # Only attributes set directly on class_ are consulted here.
        own_attrs = class_.__dict__
        dispatcher = own_attrs.get('_compiler_dispatcher')
        original_dispatch = own_attrs.get('_compiler_dispatch')

        if not dispatcher:
            # First registration on this class: create its dispatcher and
            # keep any dispatch function it already had as the default.
            dispatcher = _dispatcher()
            if original_dispatch:
                dispatcher.specs['default'] = original_dispatch

            # TODO: is the lambda required?  Presumably a plain function is
            # wanted so that it binds like a method -- confirm.
            class_._compiler_dispatch = (
                lambda *arg, **kw: dispatcher(*arg, **kw))
            class_._compiler_dispatcher = dispatcher

        for spec in (specs or ('default',)):
            dispatcher.specs[spec] = fn
        return fn
    return decorate
def deregister(class_):
    """Undo every custom compiler registration made on ``class_``.

    ``class_`` is a :class:`.ClauseElement` type.  Nothing happens unless
    it exposes a ``_compiler_dispatcher`` attribute.
    """
    if not hasattr(class_, '_compiler_dispatcher'):
        return
    # Regenerate the default _compiler_dispatch first ...
    visitors._generate_dispatch(class_)
    # ... then drop the custom dispatcher.
    del class_._compiler_dispatcher
def compiles(class_, *specs):
    """Return a decorator that registers a compiler function for ``class_``.

    ``class_`` is a :class:`.ClauseElement` type.  Each name in ``specs``
    is a key under which the decorated function is stored; when no names
    are given the function is stored under ``'default'``.  The decorated
    function itself is returned unchanged.
    """
    def decorate(fn):
        # A dispatcher left by an earlier @compiles on this exact class.
        dispatcher = class_.__dict__.get('_compiler_dispatcher')
        # The original handler, looked up with inheritance.  All
        # ClauseElement classes have one of these, but some TypeEngine
        # classes will not.
        original_dispatch = getattr(class_, '_compiler_dispatch', None)

        if not dispatcher:
            dispatcher = _dispatcher()

            if original_dispatch:
                def _wrap_existing_dispatch(element, compiler, **kw):
                    # Report a missing default handler as a CompileError.
                    try:
                        return original_dispatch(element, compiler, **kw)
                    except exc.UnsupportedCompilationError:
                        raise exc.CompileError(
                            "%s construct has no default "
                            "compilation handler." % type(element))
                dispatcher.specs['default'] = _wrap_existing_dispatch

            # TODO: is the lambda required?  Presumably a plain function is
            # wanted so that it binds like a method -- confirm.
            class_._compiler_dispatch = (
                lambda *arg, **kw: dispatcher(*arg, **kw))
            class_._compiler_dispatcher = dispatcher

        for spec in (specs or ('default',)):
            dispatcher.specs[spec] = fn
        return fn
    return decorate
def deregister(class_):
    """Undo every custom compiler registration made on ``class_``.

    ``class_`` is a :class:`.ClauseElement` type.  Nothing happens unless
    it exposes a ``_compiler_dispatcher`` attribute.
    """
    if not hasattr(class_, '_compiler_dispatcher'):
        return
    # Regenerate the default _compiler_dispatch first ...
    visitors._generate_dispatch(class_)
    # ... then drop the custom dispatcher.
    del class_._compiler_dispatcher
def get(cls, *args, **kwargs):
    """Fetch the first ``cls`` row matching the arguments, or raise.

    How the filter is built depends on the arguments:

    * Exactly one positional argument that is not a ``ClauseElement``:
      it is compared for equality against the attribute named by
      ``cls.PKEY``.  Keyword arguments are not used for the filter in
      this case.
    * Anything else: the positional arguments are used as filter terms
      as given (they should be clause elements), and each keyword
      argument adds an equality test on the attribute of that name.
      All terms are passed together to a single ``filter`` call.

    Returns:
        The first object returned by the query.

    Raises:
        ApiException: when the query yields nothing.  Its ``cause`` is the
            list of keyword names if keywords were given, otherwise the
            list of positional arguments.
    """
    pk_lookup = len(args) == 1 and not isinstance(args[0], ClauseElement)
    if pk_lookup:
        terms = [getattr(cls, cls.PKEY) == args[0]]
    else:
        keyword_terms = [
            getattr(cls, name) == value for name, value in kwargs.items()
        ]
        terms = list(args) + keyword_terms

    found = cls.query.filter(*terms).first()
    if found:
        return found

    # Nothing matched: describe what was asked for in the error.
    if kwargs:
        cause = dict(kwargs)
        reported = list(cause.keys())
    else:
        cause = list(args)
        reported = cause
    raise ApiException(
        "Failed to get %s: %s" % (cls.__name__, cause),
        cause=reported)
def exists(cls, *args, **kwargs):
    """Return the primary-key value of the first matching row, else ``False``.

    A single positional argument that is not a ``ClauseElement`` is treated
    as a primary-key value and compared against the attribute named by
    ``cls.PKEY``.  Otherwise the positional arguments are used as filter
    terms as given, and each keyword argument adds an equality test on the
    attribute of that name.

    Returns:
        The selected ``cls.PKEY`` value of the first match, or ``False``
        when nothing matches.
    """
    pkey_column = getattr(cls, cls.PKEY)

    terms = None
    if len(args) == 1 and not isinstance(args[0], ClauseElement):
        terms = [pkey_column == args[0]]
    if not terms:
        terms = list(args) + \
            [getattr(cls, k) == v for k, v in kwargs.items()]

    res = db.query(pkey_column).filter(*terms).limit(1).first()
    if res:
        # Only the PKEY column is selected, so read it by position rather
        # than as ``res.id``, which is only present when PKEY is "id".
        # NOTE(review): assumes the result row is indexable -- confirm.
        return res[0]
    return False
def compiles(class_, *specs):
    """Return a decorator that registers a compiler function for ``class_``.

    ``class_`` is a :class:`.ClauseElement` type.  Each name in ``specs``
    is a key under which the decorated function is stored; when no names
    are given the function is stored under ``'default'``.  The decorated
    function itself is returned unchanged.
    """
    def decorate(fn):
        # Only attributes set directly on class_ are consulted here.
        own_attrs = class_.__dict__
        dispatcher = own_attrs.get('_compiler_dispatcher')
        original_dispatch = own_attrs.get('_compiler_dispatch')

        if not dispatcher:
            # First registration on this class: create its dispatcher and
            # keep any dispatch function it already had as the default.
            dispatcher = _dispatcher()
            if original_dispatch:
                dispatcher.specs['default'] = original_dispatch

            # TODO: is the lambda required?  Presumably a plain function is
            # wanted so that it binds like a method -- confirm.
            class_._compiler_dispatch = (
                lambda *arg, **kw: dispatcher(*arg, **kw))
            class_._compiler_dispatcher = dispatcher

        for spec in (specs or ('default',)):
            dispatcher.specs[spec] = fn
        return fn
    return decorate
def deregister(class_):
    """Undo every custom compiler registration made on ``class_``.

    ``class_`` is a :class:`.ClauseElement` type.  Nothing happens unless
    it exposes a ``_compiler_dispatcher`` attribute.
    """
    if not hasattr(class_, '_compiler_dispatcher'):
        return
    # Regenerate the default _compiler_dispatch first ...
    visitors._generate_dispatch(class_)
    # ... then drop the custom dispatcher.
    del class_._compiler_dispatcher
def compiles(class_, *specs):
    """Return a decorator that registers a compiler function for ``class_``.

    ``class_`` is a :class:`.ClauseElement` type.  Each name in ``specs``
    is a key under which the decorated function is stored; when no names
    are given the function is stored under ``'default'``.  The decorated
    function itself is returned unchanged.
    """
    def decorate(fn):
        # Only attributes set directly on class_ are consulted here.
        own_attrs = class_.__dict__
        dispatcher = own_attrs.get('_compiler_dispatcher')
        original_dispatch = own_attrs.get('_compiler_dispatch')

        if not dispatcher:
            # First registration on this class: create its dispatcher and
            # keep any dispatch function it already had as the default.
            dispatcher = _dispatcher()
            if original_dispatch:
                dispatcher.specs['default'] = original_dispatch

            # TODO: is the lambda required?  Presumably a plain function is
            # wanted so that it binds like a method -- confirm.
            class_._compiler_dispatch = (
                lambda *arg, **kw: dispatcher(*arg, **kw))
            class_._compiler_dispatcher = dispatcher

        for spec in (specs or ('default',)):
            dispatcher.specs[spec] = fn
        return fn
    return decorate
def deregister(class_):
    """Undo every custom compiler registration made on ``class_``.

    ``class_`` is a :class:`.ClauseElement` type.  Nothing happens unless
    it exposes a ``_compiler_dispatcher`` attribute.
    """
    if not hasattr(class_, '_compiler_dispatcher'):
        return
    # Regenerate the default _compiler_dispatch first ...
    visitors._generate_dispatch(class_)
    # ... then drop the custom dispatcher.
    del class_._compiler_dispatcher
def compiles(class_, *specs):
    """Return a decorator that registers a compiler function for ``class_``.

    ``class_`` is a :class:`.ClauseElement` type.  Each name in ``specs``
    is a key under which the decorated function is stored; when no names
    are given the function is stored under ``'default'``.  The decorated
    function itself is returned unchanged.
    """
    def decorate(fn):
        # Only attributes set directly on class_ are consulted here.
        own_attrs = class_.__dict__
        dispatcher = own_attrs.get('_compiler_dispatcher')
        original_dispatch = own_attrs.get('_compiler_dispatch')

        if not dispatcher:
            # First registration on this class: create its dispatcher and
            # keep any dispatch function it already had as the default.
            dispatcher = _dispatcher()
            if original_dispatch:
                dispatcher.specs['default'] = original_dispatch

            # TODO: is the lambda required?  Presumably a plain function is
            # wanted so that it binds like a method -- confirm.
            class_._compiler_dispatch = (
                lambda *arg, **kw: dispatcher(*arg, **kw))
            class_._compiler_dispatcher = dispatcher

        for spec in (specs or ('default',)):
            dispatcher.specs[spec] = fn
        return fn
    return decorate
def deregister(class_):
    """Undo every custom compiler registration made on ``class_``.

    ``class_`` is a :class:`.ClauseElement` type.  Nothing happens unless
    it exposes a ``_compiler_dispatcher`` attribute.
    """
    if not hasattr(class_, '_compiler_dispatcher'):
        return
    # Regenerate the default _compiler_dispatch first ...
    visitors._generate_dispatch(class_)
    # ... then drop the custom dispatcher.
    del class_._compiler_dispatcher
def compiles(class_, *specs):
    """Return a decorator that registers a compiler function for ``class_``.

    ``class_`` is a :class:`.ClauseElement` type.  Each name in ``specs``
    is a key under which the decorated function is stored; when no names
    are given the function is stored under ``'default'``.  The decorated
    function itself is returned unchanged.
    """
    def decorate(fn):
        # Only attributes set directly on class_ are consulted here.
        own_attrs = class_.__dict__
        dispatcher = own_attrs.get('_compiler_dispatcher')
        original_dispatch = own_attrs.get('_compiler_dispatch')

        if not dispatcher:
            # First registration on this class: create its dispatcher and
            # keep any dispatch function it already had as the default.
            dispatcher = _dispatcher()
            if original_dispatch:
                dispatcher.specs['default'] = original_dispatch

            # TODO: is the lambda required?  Presumably a plain function is
            # wanted so that it binds like a method -- confirm.
            class_._compiler_dispatch = (
                lambda *arg, **kw: dispatcher(*arg, **kw))
            class_._compiler_dispatcher = dispatcher

        for spec in (specs or ('default',)):
            dispatcher.specs[spec] = fn
        return fn
    return decorate
def deregister(class_):
    """Undo every custom compiler registration made on ``class_``.

    ``class_`` is a :class:`.ClauseElement` type.  Nothing happens unless
    it exposes a ``_compiler_dispatcher`` attribute.
    """
    if not hasattr(class_, '_compiler_dispatcher'):
        return
    # Regenerate the default _compiler_dispatch first ...
    visitors._generate_dispatch(class_)
    # ... then drop the custom dispatcher.
    del class_._compiler_dispatcher