def interpolate(self, token):
pos = 0
end = len(token.value)
lineno = token.lineno
while 1:
match = _gettext_re.search(token.value, pos)
if match is None:
break
value = token.value[pos:match.start()]
if value:
yield Token(lineno, 'data', value)
lineno += count_newlines(token.value)
yield Token(lineno, 'variable_begin', None)
yield Token(lineno, 'name', 'gettext')
yield Token(lineno, 'lparen', None)
yield Token(lineno, 'string', match.group(1))
yield Token(lineno, 'rparen', None)
yield Token(lineno, 'variable_end', None)
pos = match.end()
if pos < end:
yield Token(lineno, 'data', token.value[pos:])
python类Token()的实例源码
def interpolate(self, token):
pos = 0
end = len(token.value)
lineno = token.lineno
while 1:
match = _gettext_re.search(token.value, pos)
if match is None:
break
value = token.value[pos:match.start()]
if value:
yield Token(lineno, 'data', value)
lineno += count_newlines(token.value)
yield Token(lineno, 'variable_begin', None)
yield Token(lineno, 'name', 'gettext')
yield Token(lineno, 'lparen', None)
yield Token(lineno, 'string', match.group(1))
yield Token(lineno, 'rparen', None)
yield Token(lineno, 'variable_end', None)
pos = match.end()
if pos < end:
yield Token(lineno, 'data', token.value[pos:])
def interpolate(self, token):
pos = 0
end = len(token.value)
lineno = token.lineno
while 1:
match = _gettext_re.search(token.value, pos)
if match is None:
break
value = token.value[pos:match.start()]
if value:
yield Token(lineno, 'data', value)
lineno += count_newlines(token.value)
yield Token(lineno, 'variable_begin', None)
yield Token(lineno, 'name', 'gettext')
yield Token(lineno, 'lparen', None)
yield Token(lineno, 'string', match.group(1))
yield Token(lineno, 'rparen', None)
yield Token(lineno, 'variable_end', None)
pos = match.end()
if pos < end:
yield Token(lineno, 'data', token.value[pos:])
def interpolate(self, token):
pos = 0
end = len(token.value)
lineno = token.lineno
while 1:
match = _gettext_re.search(token.value, pos)
if match is None:
break
value = token.value[pos:match.start()]
if value:
yield Token(lineno, 'data', value)
lineno += count_newlines(token.value)
yield Token(lineno, 'variable_begin', None)
yield Token(lineno, 'name', 'gettext')
yield Token(lineno, 'lparen', None)
yield Token(lineno, 'string', match.group(1))
yield Token(lineno, 'rparen', None)
yield Token(lineno, 'variable_end', None)
pos = match.end()
if pos < end:
yield Token(lineno, 'data', token.value[pos:])
def interpolate(self, token):
pos = 0
end = len(token.value)
lineno = token.lineno
while 1:
match = _gettext_re.search(token.value, pos)
if match is None:
break
value = token.value[pos:match.start()]
if value:
yield Token(lineno, 'data', value)
lineno += count_newlines(token.value)
yield Token(lineno, 'variable_begin', None)
yield Token(lineno, 'name', 'gettext')
yield Token(lineno, 'lparen', None)
yield Token(lineno, 'string', match.group(1))
yield Token(lineno, 'rparen', None)
yield Token(lineno, 'variable_end', None)
pos = match.end()
if pos < end:
yield Token(lineno, 'data', token.value[pos:])
transformer.py 文件源码
项目:airflow-declarative
作者: rambler-digital-solutions
项目源码
文件源码
阅读 21
收藏 0
点赞 0
评论 0
def filter_stream(self, stream):
"""
We convert
{{ some.variable | filter1 | filter 2}}
to
{{ some.variable | filter1 | filter 2 | yaml}}
... for all variable declarations in the template
This function is called by jinja2 immediately
after the lexing stage, but before the parser is called.
"""
while not stream.eos:
token = next(stream)
if token.test('variable_begin'):
var_expr = []
while not token.test('variable_end'):
var_expr.append(token)
token = next(stream)
variable_end = token
last_token = var_expr[-1]
if last_token.test('name') and last_token.value == 'yaml':
# don't yaml twice
continue
var_expr.append(Token(10, 'pipe', u'|'))
var_expr.append(Token(10, 'name', u'yaml'))
var_expr.append(variable_end)
for token in var_expr:
yield token
else:
yield token
def filter_stream(self, stream):
"""
We convert
{{ some.variable | filter1 | filter 2}}
to
{{ some.variable | filter1 | filter 2 | bind}}
... for all variable declarations in the template
This function is called by jinja2 immediately
after the lexing stage, but before the parser is called.
"""
while not stream.eos:
token = next(stream)
if token.test("variable_begin"):
var_expr = []
while not token.test("variable_end"):
var_expr.append(token)
token = next(stream)
variable_end = token
last_token = var_expr[-1]
if (not last_token.test("name")
or not last_token.value in ('bind', 'inclause', 'sqlsafe')):
param_name = self.extract_param_name(var_expr)
# don't bind twice
var_expr.append(Token(10, 'pipe', u'|'))
var_expr.append(Token(10, 'name', u'bind'))
var_expr.append(Token(2, 'lparen', u'('))
var_expr.append(Token(10, 'string', param_name))
var_expr.append(Token(2, 'rparen', u')'))
var_expr.append(variable_end)
for token in var_expr:
yield token
else:
yield token
def filter_stream(self, stream):
ctx = StreamProcessContext(stream)
for token in stream:
if token.type != 'data':
yield token
continue
ctx.token = token
value = self.normalize(ctx)
yield Token(token.lineno, 'data', value)
def filter_stream(self, stream):
ctx = StreamProcessContext(stream)
strip_depth = 0
while 1:
if stream.current.type == 'block_begin':
if stream.look().test('name:strip') or \
stream.look().test('name:endstrip'):
stream.skip()
if stream.current.value == 'strip':
strip_depth += 1
else:
strip_depth -= 1
if strip_depth < 0:
ctx.fail('Unexpected tag endstrip')
stream.skip()
if stream.current.type != 'block_end':
ctx.fail('expected end of block, got %s' %
describe_token(stream.current))
stream.skip()
if strip_depth > 0 and stream.current.type == 'data':
ctx.token = stream.current
value = self.normalize(ctx)
yield Token(stream.current.lineno, 'data', value)
else:
yield stream.current
stream.next()
def filter_stream(self, stream):
ctx = StreamProcessContext(stream)
for token in stream:
if token.type != 'data':
yield token
continue
ctx.token = token
value = self.normalize(ctx)
yield Token(token.lineno, 'data', value)
def filter_stream(self, stream):
ctx = StreamProcessContext(stream)
strip_depth = 0
while True:
if stream.current.type == 'block_begin':
if stream.look().test('name:strip') or stream.look().test(
'name:endstrip'):
stream.skip()
if stream.current.value == 'strip':
strip_depth += 1
else:
strip_depth -= 1
if strip_depth < 0:
ctx.fail('Unexpected tag endstrip')
stream.skip()
if stream.current.type != 'block_end':
ctx.fail(
'expected end of block, got %s' % describe_token(
stream.current))
stream.skip()
if strip_depth > 0 and stream.current.type == 'data':
ctx.token = stream.current
value = self.normalize(ctx)
yield Token(stream.current.lineno, 'data', value)
else:
yield stream.current
next(stream)