def get_all_without_views(cls, formats=None):
    '''Returns all resources that have no resource views

    :param formats: if given (and non-empty), returns only resources that
        have no resource views and are in any of the received formats.
        Formats are compared case-insensitively.
    :type formats: list
    :rtype: list of ckan.model.Resource objects
    '''
    # The outer join keeps resources that have no matching view row; for
    # those rows the joined ResourceView.id is NULL, which the filter selects.
    query = meta.Session.query(cls).outerjoin(ckan.model.ResourceView) \
        .filter(ckan.model.ResourceView.id.is_(None))

    if formats:
        # Lower-case both sides so the format comparison ignores case.
        lowercase_formats = [f.lower() for f in formats]
        query = query.filter(func.lower(cls.format).in_(lowercase_formats))

    return query.all()
# Python example source code using lower()
def search(cls, search):
    """Return the tools matching a search string.

    The string is parsed with ``parse_search_query``. Each ``ComparisonNode``
    is applied as an exact, case-insensitive filter: the key ``topic``
    matches ``EDAMOperation.label`` and ``instance`` matches
    ``Instance.brand``. Any other key makes the whole search return an empty
    list. Every other node is joined into one lower-cased term that must
    appear in the tool's name, description or display name.

    :param search: the raw search string; ``None`` or empty returns ``[]``
    :return: list of query results
    """
    if search is None or not len(search):
        return []

    query = (
        Tool.query
        .outerjoin(EDAMOperation, Tool.edam_operations)
        .join(ToolVersion)
        .join(Instance, ToolVersion.instances)
    )

    for node in parse_search_query(search):
        is_comparison = type(node) == ComparisonNode
        if not is_comparison:
            # Free-text node: substring match on any of the text columns.
            term = "%{0}%".format(" ".join(node).lower())
            matches_text = (
                Tool.name.ilike(term)
                | Tool.description.ilike(term)
                | Tool.display_name.ilike(term)
            )
            query = query.filter(matches_text)
            continue

        key = node[0]
        value = u" ".join(node[2]).lower()
        if key == u"topic":
            column = EDAMOperation.label
        elif key == u"instance":
            column = Instance.brand
        else:
            # unknown key
            return []
        query = query.filter(func.lower(column) == value)

    return query.all()
def __get_stocks(self, symbol):
    """Query Stock rows inside a fresh session.

    Args:
        symbol: Symbol to look up, compared case-insensitively. When it is
            falsy (e.g. None), the query selects the stocks with
            ``activated=True`` instead.

    Returns:
        The list produced by running the query.
    """
    with start_session() as session:
        stocks = session.query(Stock)
        stocks = (
            stocks.filter(func.lower(Stock.symbol) == symbol.lower())
            if symbol
            else stocks.filter_by(activated=True)
        )
        return stocks.all()
def make_parameter(self, desc):
    """Return the parameter matching a description, creating one if needed.

    The description is stripped and compared case-insensitively with the
    stored ``Parameter.description`` values. When nothing matches, a new
    ``Parameter`` is built with the stripped description; it is not added to
    the session here.
    """
    candidates = self.database.get_session().query(self.Parameter)
    same_description = desc.strip().lower() == func.lower(self.Parameter.description)
    existing = candidates.filter(same_description).first()
    if existing is not None:
        return existing
    return self.Parameter(description=desc.strip())
# Example for reference
# form[Division] = navn
# form[Parameter1] = en
# form[Type1] = Enum
# form[Min1] =
# form[Max1] =
# form[Option1_1] = a
# form[Option1_2] = b
# form[Option1_3] = c
# form[Parameter2] = nu
# form[Type2] = Number
# form[Min2] = 0
# form[Max2] = 10
def update_setting():
    """Update one attribute of the current settings object from form data.

    Reads ``setting`` (the attribute name) and ``data`` (the new value) from
    ``request.form``. The strings ``"true"`` and ``"false"`` are converted to
    booleans. When ``min_age`` is updated, ``min_age_word`` is refreshed from
    ``age_to_words`` as well. The settings object is then added to the
    session and committed.

    Returns a ``bad_request`` response when the setting does not exist or the
    ``min_age`` value is not an integer, otherwise a JSON ``OK`` status with
    HTTP 200.
    """
    setting_name = request.form["setting"]
    data = request.form["data"]

    # Form values arrive as strings; map the boolean literals to real bools.
    if data == "true":
        data = True
    elif data == "false":
        data = False

    # Validate and assign using the SAME normalized name. Previously the
    # lower-cased name was validated but the raw name was assigned, so a
    # differently-cased name passed the check and set an unrelated attribute.
    attr_name = setting_name.lower()
    # Names starting with "_" are rejected so internal state stored in
    # __dict__ (e.g. the ORM's _sa_instance_state) cannot be overwritten.
    if attr_name.startswith("_") or attr_name not in g.settings.__dict__:
        return bad_request(f"setting field {setting_name} does not exist")

    if attr_name == 'min_age' and data is not None:
        try:
            min_age = int(data)
        except (TypeError, ValueError):
            # Reject non-numeric input instead of failing with a server error.
            return bad_request(f"invalid value for setting field {setting_name}")
        setattr(g.settings, 'min_age_word', age_to_words(min_age))

    setattr(g.settings, attr_name, data)
    db.session.add(g.settings)
    db.session.commit()
    return jsonify(status="OK"), 200
def settings():
    """Return the current, user-modifiable settings as a JSON response.

    The columns of the ``Settings`` table are read through the engine
    inspector. Each column that is not in the hidden list is reported under
    its lower-cased name, with the current value taken from ``g.settings``
    and the name of that value's Python type.
    """
    # Columns that can't be modified in the settings menu.
    hidden = [
        'id',
        'questions',
        'required_ids',
        'response_body',
        'min_age_word']
    columns = inspect(db.engine).get_columns("Settings")

    out = {}
    for col in columns:
        name = col["name"]
        key = name.lower()
        if key in hidden:
            continue
        value = getattr(g.settings, name)
        out[key] = {"value": value, "type": type(value).__name__}

    return Response(json.dumps(out), mimetype="application/json")
def add_exemption():
    """Mark a user as exempt from restrictions.

    Reads ``username`` from ``request.form`` and looks the user up
    case-insensitively. When found, the user is flagged ``is_exempt``, any
    stored response is cleared, and the change is committed.

    Returns a ``bad_request`` response when the username is missing or no
    such user exists, otherwise a JSON ``OK`` status with HTTP 200.
    """
    username = request.form["username"]
    if not username:
        return bad_request("username not provided")

    # Compare the stored column with the submitted name. The previous filter
    # compared the submitted name with itself, which is always true, so it
    # returned the first user in the table whatever was submitted.
    # NOTE(review): assumes the User model exposes the name as
    # ``User.username`` -- confirm against the model definition.
    user = User.query.filter(
        func.lower(User.username) == func.lower(username)).first()
    if not user:
        return bad_request("User does not exist yet")

    user.is_exempt = True
    if user.response:
        user.response = ""
    db.session.add(user)
    db.session.commit()
    return jsonify(status="OK"), 200
def post(self):
    """Attach a new Slack identity to the current user.

    Reads the ``slack_id`` argument and raises HTTP 400 when it is missing
    or empty. Otherwise a ``SlackIdentityModel`` is created from the
    lower-cased id, linked to ``self.current_user``, committed and logged,
    and the request is redirected to the success page.
    """
    slack_id = self.get_argument("slack_id", None)
    if not slack_id:
        raise tornado.web.HTTPError(400)

    identity = SlackIdentityModel(slack_id.lower())
    identity.user = self.current_user
    session.add(identity)
    session.commit()

    message = "slackidentity {} added to {}".format(identity, identity.user)
    sec_log.info(message)

    target = "/services/add_slack_identity/success?slackidentity_id={id}".format(
        id=identity.id)
    return self.redirect(target)
def get_all_statements_with_value(request, value):
    """
    Collects the statements of the request's issue whose text contains the value

    Only the text version with the lowest uid is checked for each statement,
    and the comparison ignores case. Every hit also gets a 'url' entry built
    by the UrlManager.

    :param request: request
    :param value: string
    :return: list of dicts, sorted by __sort_array and cut to list_length
    """
    issue_uid = issue_helper.get_issue_id(request)
    statements = get_not_disabled_statement_as_query().filter_by(issue_uid=issue_uid).all()
    slug = DBDiscussionSession.query(Issue).get(issue_uid).slug
    url_manager = UrlManager(request.application_url, for_api=False, slug=slug)

    matches = []
    for statement in statements:
        first_version = (
            DBDiscussionSession.query(TextVersion)
            .filter_by(statement_uid=statement.uid)
            .order_by(TextVersion.uid.asc())
            .first()
        )
        if value.lower() not in first_version.content.lower():
            continue
        entry = __get_fuzzy_string_dict(current_text=value,
                                        return_text=first_version.content,
                                        uid=first_version.statement_uid)
        entry['url'] = url_manager.get_url_for_statement_attitude(False, first_version.statement_uid)
        matches.append(entry)

    return __sort_array(matches)[:list_length]
def get_strings_for_start(value, issue, is_startpoint):
    """
    Collects the statements of an issue whose text contains the given value

    Statements are selected by issue and by their is_startpoint flag. Only
    the text version with the lowest uid is checked for each statement, and
    the comparison ignores case.

    :param value: string
    :param issue: int
    :param is_startpoint: boolean
    :return: list of dicts, sorted by __sort_array and cut to list_length
    """
    selection = and_(Statement.is_startpoint == is_startpoint,
                     Statement.issue_uid == issue)
    statements = get_not_disabled_statement_as_query().filter(selection).all()

    matches = []
    for statement in statements:
        first_version = (
            DBDiscussionSession.query(TextVersion)
            .filter_by(statement_uid=statement.uid)
            .order_by(TextVersion.uid.asc())
            .first()
        )
        if value.lower() not in first_version.content.lower():
            continue
        matches.append(__get_fuzzy_string_dict(current_text=value,
                                               return_text=first_version.content,
                                               uid=first_version.statement_uid))

    return __sort_array(matches)[:list_length]
def get_strings_for_edits(value, statement_uid):
    """
    Collects the text versions of a statement whose content contains the value

    The comparison ignores case.

    :param value: string
    :param statement_uid: Statement.uid
    :return: list of dicts, sorted by __sort_array and cut to list_length
    """
    db_tvs = DBDiscussionSession.query(TextVersion).filter_by(statement_uid=statement_uid).all()  # TODO #432

    # The former 'index' counter was incremented but never read, so it is gone.
    return_array = []
    for textversion in db_tvs:
        if value.lower() in textversion.content.lower():
            rd = __get_fuzzy_string_dict(current_text=value, return_text=textversion.content, uid=textversion.statement_uid)  # TODO #432
            return_array.append(rd)

    return_array = __sort_array(return_array)
    return return_array[:list_length]
def get_strings_for_public_nickname(value, nickname):
    """
    Collects users whose public nickname contains the value (ignoring case)

    The given nickname, 'admin' and the anonymous user's nickname are left
    out. Each hit holds its index, the distance to the value, the public
    nickname and the avatar.

    :param value: String
    :param nickname: current users nickname
    :return: list of dicts, sorted by __sort_array and cut to list_length
    """
    excluded = [nickname, 'admin', nick_of_anonymous_user]
    db_users = DBDiscussionSession.query(User).filter(
        func.lower(User.public_nickname).contains(func.lower(value)),
        ~User.public_nickname.in_(excluded)
    ).all()

    candidates = [
        {
            'index': index,
            'distance': get_distance(value, user.public_nickname),
            'text': user.public_nickname,
            'avatar': get_public_profile_picture(user),
        }
        for index, user in enumerate(db_users)
    ]

    return __sort_array(candidates)[:list_length]
def json_accounts_user_search():
    """Return, as JSON, the finance accounts of users matching a query.

    Reads ``query`` from ``request.args``. A user matches when the
    lower-cased name contains the lower-cased query or when the user id
    starts with the query. The response has one ``accounts`` list whose
    entries hold ``account_id``, ``user_id`` and ``user_name``.
    """
    query = request.args['query']
    rows = session.query(FinanceAccount.id, User.id, User.name).filter(
        FinanceAccount.id == User.finance_account_id,
        or_(
            func.lower(User.name).like(func.lower("%{0}%".format(query))),
            User.id.like("{0}%".format(query))
        )
    ).all()

    # Build a real list: on Python 3 ``map`` returns a lazy iterator, which
    # the JSON encoder cannot serialize.
    return jsonify(accounts=[
        {
            "account_id": account_id,
            "user_id": user_id,
            "user_name": user_name
        }
        for account_id, user_id, user_name in rows
    ])
def get_account_id(s: sqlalchemy.orm.session.Session,
                   name: str,
                   server: Server) -> int:
    """Return the id of the account for a name on a server, creating it if absent.

    Names are matched without regard to case: the stored name keeps the
    capitalisation it was created with, and lookups compare lower-cased
    values. A newly created account is committed before its id is returned.
    """
    player_id = get_player_id(s, name)

    lookup = s.query(Account.id).filter(
        func.lower(Account.name) == name.lower(),
        Account.server == server)
    row = lookup.one_or_none()
    if row:
        return row[0]

    account = Account(name=name, server=server, player_id=player_id)
    s.add(account)
    s.commit()
    return account.id
def test_composed_multiple(self):
    """Order by two labeled, composed expressions: 'lx' ascending, then 'ly' descending."""
    columns = self.tables.some_table.c
    lx = (columns.x + columns.y).label('lx')
    ly = (func.lower(columns.q) + columns.p).label('ly')

    statement = select([lx, ly]).order_by(lx, ly.desc())
    expected_rows = [
        (3, util.u('q1p3')),
        (5, util.u('q2p2')),
        (7, util.u('q3p1')),
    ]
    self._assert_result(statement, expected_rows)
def search_by_name(self, keyword):
    """Return the users whose name or full name contains the keyword.

    The match ignores case. An empty keyword returns an empty list without
    querying.
    """
    if not len(keyword):
        return []

    users = self.db.session.query(User)
    pattern = '%' + keyword.lower() + '%'
    name_matches = func.lower(User.name).like(pattern)
    fullname_matches = func.lower(User.fullname).like(pattern)
    return users.filter(or_(name_matches, fullname_matches)).all()
def test_composed_multiple(self):
    """Order by two labeled, composed expressions: 'lx' ascending, then 'ly' descending."""
    columns = self.tables.some_table.c
    lx = (columns.x + columns.y).label('lx')
    ly = (func.lower(columns.q) + columns.p).label('ly')

    statement = select([lx, ly]).order_by(lx, ly.desc())
    expected_rows = [
        (3, util.u('q1p3')),
        (5, util.u('q2p2')),
        (7, util.u('q3p1')),
    ]
    self._assert_result(statement, expected_rows)
def test_composed_multiple(self):
    """Order by two labeled, composed expressions: 'lx' ascending, then 'ly' descending."""
    columns = self.tables.some_table.c
    lx = (columns.x + columns.y).label('lx')
    ly = (func.lower(columns.q) + columns.p).label('ly')

    statement = select([lx, ly]).order_by(lx, ly.desc())
    expected_rows = [
        (3, util.u('q1p3')),
        (5, util.u('q2p2')),
        (7, util.u('q3p1')),
    ]
    self._assert_result(statement, expected_rows)
def _validate_unique_case_insensitive(self, model_field, field, value):
    """Report an error on ``field`` when ``value`` is already used, ignoring case.

    The model is taken from ``model_field.class_``. When ``self.id`` is set
    (an update), the row with that id is left out of the check.
    """
    model = model_field.class_
    same_value = func.lower(model_field) == func.lower(value)
    conflicts = model.query.filter(same_value)

    updating = self.id is not None
    if updating:
        # Don't let the record being updated conflict with itself.
        conflicts = conflicts.filter(model.id != self.id)

    if conflicts.first() is None:
        return
    self._error(field, 'has already been taken')
def test_composed_multiple(self):
    """Order by two labeled, composed expressions: 'lx' ascending, then 'ly' descending."""
    columns = self.tables.some_table.c
    lx = (columns.x + columns.y).label('lx')
    ly = (func.lower(columns.q) + columns.p).label('ly')

    statement = select([lx, ly]).order_by(lx, ly.desc())
    expected_rows = [
        (3, util.u('q1p3')),
        (5, util.u('q2p2')),
        (7, util.u('q3p1')),
    ]
    self._assert_result(statement, expected_rows)
def test_composed_multiple(self):
    """Order by two labeled, composed expressions: 'lx' ascending, then 'ly' descending."""
    columns = self.tables.some_table.c
    lx = (columns.x + columns.y).label('lx')
    ly = (func.lower(columns.q) + columns.p).label('ly')

    statement = select([lx, ly]).order_by(lx, ly.desc())
    expected_rows = [
        (3, util.u('q1p3')),
        (5, util.u('q2p2')),
        (7, util.u('q3p1')),
    ]
    self._assert_result(statement, expected_rows)
def test_composed_multiple(self):
    """Order by two labeled, composed expressions: 'lx' ascending, then 'ly' descending."""
    columns = self.tables.some_table.c
    lx = (columns.x + columns.y).label('lx')
    ly = (func.lower(columns.q) + columns.p).label('ly')

    statement = select([lx, ly]).order_by(lx, ly.desc())
    expected_rows = [
        (3, util.u('q1p3')),
        (5, util.u('q2p2')),
        (7, util.u('q3p1')),
    ]
    self._assert_result(statement, expected_rows)
def get(uid=None, limit=60, offset=0, maxlines=None, colored=None, scene=None, name=None):
    """Fetch AsciiModel rows, each with its ``html`` attribute filled in.

    When ``uid`` is a truthy int and a row with that id exists, a one-element
    list with that row is returned and the other arguments are ignored. If no
    such row exists, the call falls through to the filtered listing.

    :param uid: id of a single row to fetch
    :param limit: maximum number of rows (applied when a truthy int)
    :param offset: number of rows to skip (applied when a truthy int)
    :param maxlines: keep rows with ``numlines`` less than or equal to this value
    :param colored: when a bool, keep rows whose ``colored`` equals it
    :param scene: when a bool, keep rows whose ``scene`` equals it
    :param name: case-insensitive substring to look for in the row name
    :return: list of AsciiModel rows
    """
    q = db.session.query(AsciiModel)

    if uid and isinstance(uid, int):
        result = q.filter(AsciiModel.id == uid).first()
        if result:
            result.html = AsciiController.irc_to_html(result.path)
            return [result]

    if maxlines and isinstance(maxlines, int):
        q = q.filter(AsciiModel.numlines <= maxlines)

    if name and isinstance(name, str):
        # Drop LIKE wildcards so callers cannot inject their own pattern.
        name = name.replace("%", "").replace("_", "")
        # The column is lower-cased, so the search term must be too;
        # otherwise a term containing an uppercase letter can never match.
        q = q.filter(func.lower(AsciiModel.name).like("%" + name.lower() + "%"))

    if isinstance(colored, bool):
        q = q.filter(AsciiModel.colored == colored)

    if isinstance(scene, bool):
        q = q.filter(AsciiModel.scene == scene)

    if offset and isinstance(offset, int):
        q = q.offset(offset)

    if limit and isinstance(limit, int):
        q = q.limit(limit)

    rtn = []
    for result in q.all():
        result.html = AsciiController.irc_to_html(result.path)
        rtn.append(result)
    return rtn
def test_composed_multiple(self):
    """Order by two labeled, composed expressions: 'lx' ascending, then 'ly' descending."""
    columns = self.tables.some_table.c
    lx = (columns.x + columns.y).label('lx')
    ly = (func.lower(columns.q) + columns.p).label('ly')

    statement = select([lx, ly]).order_by(lx, ly.desc())
    expected_rows = [
        (3, util.u('q1p3')),
        (5, util.u('q2p2')),
        (7, util.u('q3p1')),
    ]
    self._assert_result(statement, expected_rows)
def build_comparision(self, field, value):
    """Build an OR clause matching ``field`` against comma-separated values.

    For ARRAY-typed fields each lower-cased value is passed to
    ``field.any``. For every other field each lower-cased value must appear
    as a substring of the lower-cased field.
    """
    is_array = issubclass(field.type.__class__, ARRAY)
    values = value.split(',')
    if is_array:
        clauses = [field.any(v.lower()) for v in values]
    else:
        lowered_field = func.lower(field)
        clauses = [lowered_field.like('%{}%'.format(v.lower())) for v in values]
    return or_(*clauses)
def __output(self, stock, indicators):
    """Write a stock's price history and indicator values to a CSV file.

    The file is created under ``const.OUTPUT_INDICATOR_DIR`` and named after
    the lower-cased stock symbol and the indicator class names. Each row
    holds the date, the open/high/low/close prices and the volume of one
    history entry, followed by one value per indicator. An indicator that
    raises ``NoDataError`` for a row contributes an empty string.

    Args:
        stock: object providing ``symbol`` and ``histories``.
        indicators: indicator objects providing ``name_with_args`` and
            ``get(index)``.
    """
    indicator_names = '-'.join([i.__class__.__name__ for i in indicators])
    csv_name = '{}-{}.csv'.format(stock.symbol.lower(), indicator_names)
    output_path = os.path.join(const.OUTPUT_INDICATOR_DIR, csv_name)
    self._prepare_directory(os.path.dirname(output_path))

    headers = ['Date', 'Open', 'High', 'Low', 'Close', 'Volume']
    headers.extend(i.name_with_args for i in indicators)

    with open(output_path, 'w', encoding=self.encoding, newline='') as fp:
        writer = csv.writer(fp)
        writer.writerow(headers)
        for idx, hist in enumerate(stock.histories):
            row = [
                hist.date.strftime('%Y-%m-%d'),
                hist.open_price,
                hist.high_price,
                hist.low_price,
                hist.close_price,
                hist.volume,
            ]
            for indicator in indicators:
                try:
                    val = indicator.get(idx)
                    if isinstance(val, float):
                        # Presumably turns float subclasses (the original
                        # note mentions numpy.float64) into a built-in
                        # float before writing the CSV -- TODO confirm.
                        val = float(val)
                except NoDataError:
                    val = ''
                row.append(val)
            writer.writerow(row)

    plogger.info('[{}] ????????????'.format(output_path))
def test_composed_multiple(self):
    """Order by two labeled, composed expressions: 'lx' ascending, then 'ly' descending."""
    columns = self.tables.some_table.c
    lx = (columns.x + columns.y).label('lx')
    ly = (func.lower(columns.q) + columns.p).label('ly')

    statement = select([lx, ly]).order_by(lx, ly.desc())
    expected_rows = [
        (3, util.u('q1p3')),
        (5, util.u('q2p2')),
        (7, util.u('q3p1')),
    ]
    self._assert_result(statement, expected_rows)
def test_composed_multiple(self):
    """Order by two labeled, composed expressions: 'lx' ascending, then 'ly' descending."""
    columns = self.tables.some_table.c
    lx = (columns.x + columns.y).label('lx')
    ly = (func.lower(columns.q) + columns.p).label('ly')

    statement = select([lx, ly]).order_by(lx, ly.desc())
    expected_rows = [
        (3, util.u('q1p3')),
        (5, util.u('q2p2')),
        (7, util.u('q3p1')),
    ]
    self._assert_result(statement, expected_rows)
def test_composed_multiple(self):
    """Order by two labeled, composed expressions: 'lx' ascending, then 'ly' descending."""
    columns = self.tables.some_table.c
    lx = (columns.x + columns.y).label('lx')
    ly = (func.lower(columns.q) + columns.p).label('ly')

    statement = select([lx, ly]).order_by(lx, ly.desc())
    expected_rows = [
        (3, util.u('q1p3')),
        (5, util.u('q2p2')),
        (7, util.u('q3p1')),
    ]
    self._assert_result(statement, expected_rows)
def prepare_queryset(query, model, key, value):
    """Filter ``query`` to rows whose ``key`` column contains ``value[0]``, ignoring case.

    Only the first element of ``value`` is used.
    """
    column = getattr(model, key)
    needle = value[0].lower()
    return query.filter(func.lower(column).contains(needle))