def _struct_typedef():
    """Build the parser element for a typedef'd struct or union.

    The expression matches, in order: ``_TYPEDEF``; ``_STRUCT`` or ``_UNION``
    (results name "type"); an optional ``_IDENTIFIER`` (results name "id");
    whatever ``parsers.anything_in_curly()`` matches; an optional ``_STAR``;
    an ``_IDENTIFIER`` (results name "typedef_name"); and everything up to
    and including ``_SEMICOLON``.

    Returns:
        The combined expression, with results name "_struct_typedef".
    """
    kind = _STRUCT.setResultsName("type") | _UNION.setResultsName("type")
    tag = pyparsing.Optional(_IDENTIFIER).setResultsName("id")
    body = parsers.anything_in_curly()
    pointer = pyparsing.Optional(_STAR)
    alias = _IDENTIFIER.setResultsName("typedef_name")
    # Anything between the typedef name and the semicolon is skipped over.
    remainder = pyparsing.SkipTo(_SEMICOLON)

    declaration = (
        _TYPEDEF + kind + tag + body + pointer + alias + remainder + _SEMICOLON
    )
    return declaration.setResultsName("_struct_typedef")
python类Optional()的实例源码
def _enum():
    """Build the parser element for an enum declaration.

    The expression matches ``_ENUM``, an optional ``_IDENTIFIER`` (results
    name "id"), whatever ``parsers.anything_in_curly()`` matches, and a
    closing ``_SEMICOLON``.

    Returns:
        The combined expression, with results name "_enum".
    """
    tag = pyparsing.Optional(_IDENTIFIER).setResultsName("id")
    body = parsers.anything_in_curly()
    declaration = _ENUM + tag + body + _SEMICOLON
    return declaration.setResultsName("_enum")
def _define_function_like(self):
    """Build the parser element for a function-like macro definition.

    The expression matches an identifier (results name "name") followed by
    an opening parenthesis, an optional comma-delimited list of identifiers
    or ``...`` (results name "arguments"), a closing parenthesis, and the
    rest of the line (results name "replacement").

    Returns:
        The combined expression, with ``self._add_function_like`` installed
        as its parse action.
    """
    # leaveWhitespace() is applied to the name/parenthesis pair only.
    # NOTE(review): presumably so that "NAME (" is not taken for a
    # function-like macro -- confirm against the callers.
    head = (
        _IDENTIFIER.setResultsName("name") + _OPEN_PARENTHESES
    ).leaveWhitespace()

    parameter = _IDENTIFIER | pyparsing.Literal("...")  # vararg macro.
    parameters = pyparsing.Optional(
        pyparsing.delimitedList(parameter)
    ).setResultsName("arguments")

    replacement = pyparsing.restOfLine.setResultsName("replacement")

    definition = head + parameters + _CLOSE_PARENTHESES + replacement
    return definition.setParseAction(self._add_function_like)
def parse_line(attribute, string):
    """Return the values of the first ``mpc.<attribute> = ...;`` statement.

    Args:
        attribute: Name that follows ``mpc.`` in the statement to look for.
        string: Text to scan.

    Returns:
        A list with each token of the "data" result converted by
        ``int_else_float_except_string``.

    Raises:
        StopIteration: If *string* contains no matching statement.
    """
    grammar = (
        Suppress(Keyword('mpc.{}'.format(attribute)) + Keyword('='))
        + String('data')
        + Suppress(Literal(';') + Optional(Comments))
    )
    # Built-in next() works on Python 2.6+ and Python 3; the generator
    # method ``.next()`` only exists on Python 2.
    result, _start, _end = next(grammar.scanString(string))
    return [int_else_float_except_string(s) for s in result['data'].asList()]
def compute(self):
    """Resolve the attribute references in ``self.config`` and render it.

    Every ``@name`` or ``!name`` reference found in ``self.config`` is looked
    up on ``self.targetobject`` and stored in ``self.__attribs__``.  When no
    stored value is None, ``self.computable`` is set to True.  If the
    instance is computable, ``self.__result__`` is set to ``self.config``
    transformed with ``self.substitute`` as the parse action for ``@name``
    references.
    """

    def getname(obj, name):
        """Return attribute *name* of *obj* rendered as a string, or None."""
        value = getattr(obj, name, None) if hasattr(obj, name) else None
        if value is None:
            return value
        try:
            # TODO make this work for non-attributes, non-dynamics
            # (use .issingleton? - what about a concat mode?)
            if value.isdynamic:
                raise ValueError('Combine plugin cannot process %s because it contains a dynamic class' % name)
        except AttributeError:
            raise TypeError('Expected an attribute but got a %s' % type(value))
        if value.issingleton():
            return '%s' % value[0].raw()
        return ', '.join(['%s' % item.raw() for item in value])

    # Pass 1: collect every referenced attribute name ("@name" or "!name").
    marker = p.Literal('@') | p.Literal('!')
    reference = marker.suppress() + p.Word(p.alphanums)
    for match in reference.scanString(self.config):
        attr_name = match[0][0]
        self.__attribs__[attr_name] = getname(self.targetobject, attr_name)

    if not any(value is None for value in self.__attribs__.values()):
        self.computable = True

    if not self.computable:
        return

    # Pass 2: only "@name" references get the substitute parse action.
    substitution = p.Literal('@').suppress() + p.Word(p.alphanums)
    substitution.setParseAction(self.substitute)
    padded = p.Optional(p.White()) + substitution + p.Optional(p.White())
    self.__result__ = p.ZeroOrMore(padded).transformString(self.config)
def parseTerms():
    """
    Return the arithmetic expression grammar, building it on first use.

    The grammar is cached in the module-level ``terms``:

    expop   :: '^'
    multop  :: '*' | '/'
    addop   :: '+' | '-'
    integer :: ['+' | '-'] '0'..'9'+
    atom    :: PI | E | real | fn '(' expr ')' | '(' expr ')'
    factor  :: atom [ expop factor ]*
    term    :: factor [ multop factor ]*
    expr    :: term [ addop term ]*
    """
    global terms
    if terms:
        return terms

    # Numeric literal: sign/digits, optional fraction, optional exponent.
    point = Literal(".")
    e = CaselessLiteral("E")
    fraction = Optional(point + Optional(Word(nums)))
    exponent = Optional(e + Word("+-" + nums, nums))
    fnumber = Combine(Word("+-" + nums, nums) + fraction + exponent)

    ident = Word(alphas, alphas + nums + "_$")
    pi = CaselessLiteral("PI")

    # Operators and (suppressed) parentheses.
    addop = Literal("+") | Literal("-")
    multop = Literal("*") | Literal("/")
    expop = Literal("^")
    lpar = Literal("(").suppress()
    rpar = Literal(")").suppress()

    expr = Forward()

    function_call = ident + lpar + expr + rpar
    operand = (pi | e | fnumber | function_call).setParseAction(pushFirst)
    parenthesized = lpar + expr.suppress() + rpar
    atom = (Optional("-") + operand | parenthesized).setParseAction(pushUMinus)

    # By defining exponentiation as "atom [ ^ factor ]..." instead of
    # "atom [ ^ atom ]...", we get right-to-left exponents instead of
    # left-to-right; that is, 2^3^2 = 2^(3^2), not (2^3)^2.
    factor = Forward()
    factor << (atom + ZeroOrMore((expop + factor).setParseAction(pushFirst)))
    term = factor + ZeroOrMore((multop + factor).setParseAction(pushFirst))
    expr << (term + ZeroOrMore((addop + term).setParseAction(pushFirst)))

    terms = expr
    return terms
def __init__(self):
    """Build the four pyparsing patterns stored on the instance.

    Sets ``single_number``, ``single_number_parens``,
    ``number_then_punctuation`` and ``punctuation_then_number``.
    """
    # Building blocks shared by the patterns below.
    word_with_dashes = Word(pyparsing.alphas + '-')
    any_punctuation = Word('.!?:,;-')
    dashless_punctuation = Word('.!?:,;')
    reference_punctuation = Word('.:,;-')
    any_printable = Word(pyparsing.printables, exact=1)
    single_letter = Word(pyparsing.alphas, exact=1)
    lettered_reference = reference_punctuation + single_letter

    # Digits, an optional letter, then any number of
    # "<punctuation><letter>" groups.
    number = (
        Word(pyparsing.nums)
        + Optional(single_letter)
        + ZeroOrMore(lettered_reference)
    )

    # Optional closing brackets followed by a word boundary.
    closers = Word(')') | Word('}') | Word(']')
    tail = pyparsing.ZeroOrMore(closers) + WordEnd()

    self.single_number = WordStart() + word_with_dashes + number + tail

    bracketed_number = (
        Word('([{', exact=1)
        + pyparsing.OneOrMore(number | Word('-'))
        + Word(')]}', exact=1)
    )
    self.single_number_parens = (
        any_printable
        + single_letter
        + Optional(dashless_punctuation)
        + pyparsing.OneOrMore(bracketed_number)
        + tail
    )

    self.number_then_punctuation = (
        any_printable
        + single_letter
        + number
        + any_punctuation
        + pyparsing.ZeroOrMore(number | any_punctuation)
        + tail
    )

    self.punctuation_then_number = (
        any_printable
        + single_letter
        + dashless_punctuation
        + number
        + pyparsing.ZeroOrMore(any_punctuation | number)
        + tail
    )
interpreter.py 文件源码
项目:Python_Master-the-Art-of-Design-Patterns
作者: PacktPublishing
项目源码
文件源码
阅读 32
收藏 0
点赞 0
评论 0
def main():
    """Parse a fixed list of ``command -> device [-> argument]`` strings
    and dispatch each one to the matching device method.

    Commands containing "open"/"turn on" or "increase" use ``open_actions``;
    those containing "close"/"turn off" or "decrease" use ``close_actions``.
    Commands with an argument are dispatched only when the first word of the
    argument is a positive integer.
    """
    word = Word(alphanums)
    command = Group(OneOrMore(word))
    token = Suppress("->")
    device = Group(OneOrMore(word))
    argument = Group(OneOrMore(word))
    event = command + token + device + Optional(token + argument)

    gate = Gate()
    garage = Garage()
    airco = Aircondition()
    heating = Heating()
    boiler = Boiler()
    fridge = Fridge()

    tests = ('open -> gate',
             'close -> garage',
             'turn on -> aircondition',
             'turn off -> heating',
             'increase -> boiler temperature -> 20 degrees',
             'decrease -> fridge temperature -> 6 degree')

    open_actions = {'gate': gate.open,
                    'garage': garage.open,
                    'aircondition': airco.turn_on,
                    'heating': heating.turn_on,
                    'boiler temperature': boiler.increase_temperature,
                    'fridge temperature': fridge.increase_temperature}
    close_actions = {'gate': gate.close,
                     'garage': garage.close,
                     'aircondition': airco.turn_off,
                     'heating': heating.turn_off,
                     'boiler temperature': boiler.decrease_temperature,
                     'fridge temperature': fridge.decrease_temperature}

    for t in tests:
        # Parse once per command instead of re-parsing in every branch.
        parsed = event.parseString(t)
        if len(parsed) == 2:  # no argument
            cmd, dev = parsed
            cmd_str, dev_str = ' '.join(cmd), ' '.join(dev)
            if 'open' in cmd_str or 'turn on' in cmd_str:
                open_actions[dev_str]()
            elif 'close' in cmd_str or 'turn off' in cmd_str:
                close_actions[dev_str]()
        elif len(parsed) == 3:  # argument
            cmd, dev, arg = parsed
            cmd_str, dev_str, arg_str = ' '.join(cmd), ' '.join(dev), ' '.join(arg)
            first_word = arg_str.split()[0]  # the numeric part, if valid
            num_arg = 0  # stays 0 (no dispatch) when the argument is not a number
            try:
                num_arg = int(first_word)
            except ValueError:
                # Report the whole offending word, not just its first character.
                print("expected number but got: '{}'".format(first_word))
            if 'increase' in cmd_str and num_arg > 0:
                open_actions[dev_str](num_arg)
            elif 'decrease' in cmd_str and num_arg > 0:
                close_actions[dev_str](num_arg)
def MetricSchema(definition):
    """Validate a metric definition and check the "create metric" policy.

    The definition is validated with a voluptuous schema.  If it carries no
    archive policy name, one is looked up from the indexer by metric name
    and written back into the definition.  ``abort(400, ...)`` is called
    when the name contains '/', when no archive policy rule matches, or when
    a resource id is given without a name.

    Returns:
        The validated definition.
    """
    creator = pecan.request.auth_helper.get_current_user(pecan.request)

    # First basic validation
    unit_validator = voluptuous.All(six.text_type, voluptuous.Length(max=31))
    validate = voluptuous.Schema({
        "archive_policy_name": six.text_type,
        "resource_id": functools.partial(ResourceID, creator=creator),
        "name": six.text_type,
        voluptuous.Optional("unit"): unit_validator,
    })
    definition = validate(definition)

    archive_policy_name = definition.get('archive_policy_name')
    name = definition.get('name')

    if name and '/' in name:
        abort(400, "'/' is not supported in metric name")

    if archive_policy_name is None:
        try:
            ap = pecan.request.indexer.get_archive_policy_for_metric(name)
        except indexer.NoArchivePolicyRuleMatch:
            # NOTE(jd) Since this is a schema-like function, we
            # should/could raise ValueError, but if we do so, voluptuous
            # just returns a "invalid value" with no useful message – so we
            # prefer to use abort() to make sure the user has the right
            # error message
            abort(400, "No archive policy name specified "
                       "and no archive policy rule found matching "
                       "the metric name %s" % name)
        else:
            definition['archive_policy_name'] = ap.name

    # A validated resource id is unpacked as (original id, id).
    resource_id = definition.get('resource_id')
    if resource_id is not None:
        if name is None:
            abort(400,
                  {"cause": "Attribute value error",
                   "detail": "name",
                   "reason": "Name cannot be null "
                             "if resource_id is not null"})
        original_resource_id, resource_id = resource_id
    else:
        original_resource_id = None

    policy_target = {
        "creator": creator,
        "archive_policy_name": archive_policy_name,
        "resource_id": resource_id,
        "original_resource_id": original_resource_id,
        "name": name,
        "unit": definition.get('unit'),
    }
    enforce("create metric", policy_target)

    return definition
def main():
    """Parse a fixed list of ``command -> device [-> argument]`` strings
    and dispatch each one to the matching device method.

    Commands containing "open"/"turn on" or "increase" use ``open_actions``;
    those containing "close"/"turn off" or "decrease" use ``close_actions``.
    Commands with an argument are dispatched only when the first word of the
    argument is a positive integer.
    """
    word = Word(alphanums)
    command = Group(OneOrMore(word))
    token = Suppress("->")
    device = Group(OneOrMore(word))
    argument = Group(OneOrMore(word))
    event = command + token + device + Optional(token + argument)

    gate = Gate()
    garage = Garage()
    airco = Aircondition()
    heating = Heating()
    boiler = Boiler()
    fridge = Fridge()

    tests = ('open -> gate',
             'close -> garage',
             'turn on -> aircondition',
             'turn off -> heating',
             'increase -> boiler temperature -> 5 degrees',
             'decrease -> fridge temperature -> 2 degrees')

    open_actions = {'gate': gate.open,
                    'garage': garage.open,
                    'aircondition': airco.turn_on,
                    'heating': heating.turn_on,
                    'boiler temperature': boiler.increase_temperature,
                    'fridge temperature': fridge.increase_temperature}
    close_actions = {'gate': gate.close,
                     'garage': garage.close,
                     'aircondition': airco.turn_off,
                     'heating': heating.turn_off,
                     'boiler temperature': boiler.decrease_temperature,
                     'fridge temperature': fridge.decrease_temperature}

    for t in tests:
        # Parse once per command instead of re-parsing in every branch.
        parsed = event.parseString(t)
        if len(parsed) == 2:  # no argument
            cmd, dev = parsed
            cmd_str, dev_str = ' '.join(cmd), ' '.join(dev)
            if 'open' in cmd_str or 'turn on' in cmd_str:
                open_actions[dev_str]()
            elif 'close' in cmd_str or 'turn off' in cmd_str:
                close_actions[dev_str]()
        elif len(parsed) == 3:  # argument
            cmd, dev, arg = parsed
            cmd_str, dev_str, arg_str = ' '.join(cmd), ' '.join(dev), ' '.join(arg)
            first_word = arg_str.split()[0]  # the numeric part, if valid
            num_arg = 0  # stays 0 (no dispatch) when the argument is not a number
            try:
                num_arg = int(first_word)
            except ValueError:
                # Report the whole offending word, not just its first character.
                print("expected number but got: '{}'".format(first_word))
            if 'increase' in cmd_str and num_arg > 0:
                open_actions[dev_str](num_arg)
            elif 'decrease' in cmd_str and num_arg > 0:
                close_actions[dev_str](num_arg)
def _type_instance(self):
    """A type declaration.

    The modifiers of a typedef:
        struct s *P[];
                 ^^^^<- The type instance.

    Returns:
        A ``pyparsing.Group`` of the type instance followed by whatever
        ``self._maybe_attributes()`` matches.
    """
    # Function pointer: (*f)(int foobar)
    function_pointer_form = (
        pyparsing.ZeroOrMore(_STAR)
        + _OPEN_PARENTHESIS
        + pyparsing.Optional(_STAR("function_pointer"))
        + self._identifier()("type_instance_name")
        + _CLOSE_PARENTHESIS
        + parsers.anything_in_parentheses()("function_args")
    )

    # Function object: f(foo bar *)
    function_form = (
        pyparsing.ZeroOrMore(_STAR)
        + self._identifier()("type_instance_name")
        + parsers.anything_in_parentheses()("function_args")
    )

    # Simple form: *foo[10];
    pointer_stars = pyparsing.ZeroOrMore(_STAR)("type_pointer")
    instance_name = self._identifier()("type_instance_name")
    # Possibly array: [] , [][]
    bracket_content = pyparsing.SkipTo(_CLOSE_BRACKET)(
        "brackets_with_expression_inside*")
    array_suffixes = pyparsing.ZeroOrMore(
        _OPEN_BRACKET + bracket_content + _CLOSE_BRACKET)
    # Bitfields: int x: 7;
    bitfield_width = pyparsing.SkipTo(_SEMICOLON | _COMMA)("bitfield")
    bitfield = pyparsing.Optional(_COLON + bitfield_width)
    simple_form = pointer_stars + instance_name + array_suffixes + bitfield

    # Alternatives are tried in this order.
    type_instance = function_pointer_form | function_form | simple_form

    return pyparsing.Group(type_instance + self._maybe_attributes())
def _create_primitives():
    """Create the module-level pyparsing primitives on first call.

    Later calls return immediately, because ``ident`` is already set.  The
    function assigns the globals named in the ``global`` statement below;
    it takes no arguments and returns nothing.
    """
    global binary, ident, rvalue, number, quoted_string, semi, time_interval, slot_id, comp, config_type, stream, comment, stream_trigger, selector

    if ident is not None:
        return

    semi = Literal(u';').suppress()
    ident = Word(alphas+u"_", alphas + nums + u"_")
    # Base 0 lets int() accept both the decimal and the 0x-prefixed form.
    number = Regex(u'((0x[a-fA-F0-9]+)|[+-]?[0-9]+)').setParseAction(lambda s, l, t: [int(t[0], 0)])
    # Strip the 4-character "hex:" prefix before decoding the hex digits.
    binary = Regex(u'hex:([a-fA-F0-9][a-fA-F0-9])+').setParseAction(lambda s, l, t: [unhexlify(t[0][4:])])
    quoted_string = dblQuotedString
    comment = Literal('#') + restOfLine

    rvalue = number | quoted_string

    # Convert all time intervals into an integer number of seconds.
    # Every unit accepted by ``time_unit`` below must have an entry here,
    # otherwise the ``time_interval`` parse action raises KeyError.
    time_unit_multipliers = {
        u'second': 1,
        u'seconds': 1,
        u'minute': 60,
        u'minutes': 60,
        u'hour': 60*60,
        u'hours': 60*60,
        u'day': 60*60*24,
        u'days': 60*60*24,
        u'week': 60*60*24*7,
        u'weeks': 60*60*24*7,
        u'month': 60*60*24*30,
        u'months': 60*60*24*30,
        u'year': 60*60*24*365,
        u'years': 60*60*24*365,
    }

    config_type = oneOf('uint8_t uint16_t uint32_t int8_t int16_t int32_t uint8_t[] uint16_t[] uint32_t[] int8_t[] int16_t[] int32_t[] string binary')
    comp = oneOf('> < >= <= == ~=')

    time_unit = oneOf(u"second seconds minute minutes hour hours day days week weeks month months year years")
    time_interval = (number + time_unit).setParseAction(lambda s, l, t: [t[0]*time_unit_multipliers[t[1]]])

    slot_id = Literal(u"controller") | (Literal(u'slot') + number)
    slot_id.setParseAction(lambda s,l,t: [SlotIdentifier.FromString(u' '.join([str(x) for x in t]))])

    stream_modifier = Literal("system") | Literal("user") | Literal("combined")

    # The matched tokens are re-joined with spaces and handed to the
    # corresponding FromString constructor.
    stream = Optional(Literal("system")) + oneOf("buffered unbuffered input output counter constant") + number + Optional(Literal("node"))
    stream.setParseAction(lambda s,l,t: [DataStream.FromString(u' '.join([str(x) for x in t]))])

    all_selector = Optional(Literal("all")) + Optional(stream_modifier) + oneOf("buffered unbuffered inputs outputs counters constants") + Optional(Literal("nodes"))
    all_selector.setParseAction(lambda s,l,t: [DataStreamSelector.FromString(u' '.join([str(x) for x in t]))])

    one_selector = Optional(Literal("system")) + oneOf("buffered unbuffered input output counter constant") + number + Optional(Literal("node"))
    one_selector.setParseAction(lambda s,l,t: [DataStreamSelector.FromString(u' '.join([str(x) for x in t]))])

    selector = one_selector | all_selector

    trigger_comp = oneOf('> < >= <= ==')
    stream_trigger = Group((Literal(u'count') | Literal(u'value')) + Literal(u'(').suppress() - stream - Literal(u')').suppress() - trigger_comp - number).setResultsName('stream_trigger')
def grammar():
    """Define the query grammar.

    Backus-Naur form (BNF) of the grammar::

            <grammar> ::= <item> | <item> <and_or> <grammar>
               <item> ::= [<neg>] <query-token> | [<neg>] "(" <grammar> ")"
        <query-token> ::= <token> | <hosts>
              <token> ::= <category>:<key> [<operator> <value>]

    Given that the pyparsing library defines the grammar in a BNF-like style, for the details of the tokens not
    specified above check directly the source code.

    Returns:
        pyparsing.ParserElement: the grammar parser.

    """
    # Boolean operators
    and_or = (pp.CaselessKeyword('and') | pp.CaselessKeyword('or'))('bool')
    # 'neg' is used as label to allow the use of dot notation, 'not' is a reserved word in Python
    neg = pp.CaselessKeyword('not')('neg')

    # Comparison operators
    operator = pp.oneOf(OPERATORS, caseless=True)('operator')
    # Both single and double quotes are allowed
    quoted_string = pp.quotedString.copy().addParseAction(pp.removeQuotes)

    # Parentheses
    lpar = pp.Literal('(')('open_subgroup')
    rpar = pp.Literal(')')('close_subgroup')

    # Hosts selection: glob (*) and clustershell (,!&^[]) syntaxes are allowed:
    # i.e. host10[10-42].*.domain
    bare_hosts = ~(and_or | neg) + pp.Word(pp.alphanums + '-_.*,!&^[]')
    hosts = quoted_string | bare_hosts

    # Key-value token for allowed categories using the available comparison operators
    # i.e. F:key = value
    category = pp.oneOf(CATEGORIES, caseless=True)('category')
    key = pp.Word(pp.alphanums + '-_.%@:')('key')
    selector = pp.Combine(category + ':' + key)  # i.e. F:key

    # All printables characters except the parentheses that are part of this or the global grammar
    all_but_par = ''.join(char for char in pp.printables if char not in '(){}')
    value = (quoted_string | pp.Word(all_but_par))('value')
    token = selector + pp.Optional(operator + value)

    # Final grammar, see the docstring for its BNF based on the tokens defined above
    # Groups are used to split the parsed results for an easy access
    full_grammar = pp.Forward()
    single_item = pp.Group(pp.Optional(neg) + (token | hosts('hosts')))
    subgroup = pp.Group(pp.Optional(neg) + lpar + full_grammar + rpar)
    item = single_item | subgroup
    full_grammar << item + pp.ZeroOrMore(pp.Group(and_or) + full_grammar)  # pylint: disable=expression-not-assigned

    return full_grammar
def parse_table(attribute, string):
    """Return the rows of the first ``mpc.<attribute> = [ ... ]`` table.

    Args:
        attribute: Name that follows ``mpc.`` in the table to look for.
        string: Text to scan.

    Returns:
        A list with one entry per table line; each entry is the list of that
        line's "data" tokens converted by ``int_else_float_except_string``.

    Raises:
        StopIteration: If *string* contains no matching table.
    """
    line = OneOrMore(Float)('data') + Literal(';') + Optional(Comments, default='')('name')
    grammar = (
        Suppress(Keyword('mpc.{}'.format(attribute)) + Keyword('=') + Keyword('[') + Optional(Comments))
        + OneOrMore(Group(line))
        + Suppress(Keyword(']') + Optional(Comments))
    )
    # Built-in next() works on Python 2.6+ and Python 3; the generator
    # method ``.next()`` only exists on Python 2.
    result, _start, _end = next(grammar.scanString(string))
    return [
        [int_else_float_except_string(s) for s in row['data'].asList()]
        for row in result
    ]
def __init__(self, comm_file_path):
    """Parse the file at *comm_file_path* into ``self.beam_data_model``.

    The grammar is assembled from pyparsing elements; the parse actions
    ``Value``, ``Values``, ``Keyword_argument``, ``Keyword_arguments``,
    ``Expression``, ``Paragraph`` and ``File`` are attached to the matching
    constructs.

    Args:
        comm_file_path: Path handed to ``parseFile``.
    """
    expression_spaced = Forward()
    expression = Forward()
    args_spaced = Forward()  # declared by the original code but never used here

    cb = Optional(',') + ')'  # closing bracket, may be preceded by a ','
    ob = Optional(' ') + '(' + Optional(' ')  # opening bracket, may be surrounded by a ' '

    # A single value: an identifier, a number, or a single-quoted string.
    value_alternatives = Or([
        pyparsing_common.identifier.copy().setResultsName('id'),
        pyparsing_common.number.copy().setResultsName('number'),
        QuotedString("'").setResultsName('string'),
    ])
    value = value_alternatives.setParseAction(Value).setResultsName('value')

    # Zero or more values, each optionally followed by a ','.
    listed_value = value.setResultsName('valueList', listAllMatches=True)
    values = ZeroOrMore(listed_value + Optional(',')).setParseAction(Values)

    # keyword=expression pairs, separated by ','.
    keyword = pyparsing_common.identifier.copy()
    keyword_argument = (
        keyword.setResultsName('keyword')
        + '='
        + expression_spaced.setResultsName('expression')
    ).setParseAction(Keyword_argument)
    keyword_arguments = (
        keyword_argument.setResultsName('keyword_argument', listAllMatches=True)
        + ZeroOrMore(
            ',' + keyword_argument.setResultsName('keyword_argument', listAllMatches=True)
        )
    ).setParseAction(Keyword_arguments)

    expression << (Or([
        value,
        (ob + values.setResultsName('values') + cb),
        '_F' + ob + keyword_arguments.setResultsName('keyword_arguments') + cb,
        ob + expression.setResultsName('expression') + cb,
    ])).setParseAction(Expression)
    expression_spaced << (Or([expression, ob + expression_spaced + cb]))

    # A paragraph: [left_side =] operator_name ( [keyword arguments] ) [;]
    left_side = pyparsing_common.identifier.setResultsName('left_side')
    operator_name = pyparsing_common.identifier.setResultsName('operator_name')
    optional_arguments = Optional(
        keyword_arguments.setResultsName('keyword_arguments'))
    paragraph = (
        Optional(left_side + "=")
        + operator_name
        + ob
        + optional_arguments
        + cb
        + Optional(';')
    ).setParseAction(Paragraph)

    document = OneOrMore(paragraph).setResultsName('paragraphs').setParseAction(File)
    self.beam_data_model = document.parseFile(comm_file_path)
def __init__(self, obj, config=str()):
    """Initialise the Xattrib plugin and resolve its search string.

    The config string is either ``[/path/]@attr`` (or ``!attr``), or the
    name of an attribute of *obj* whose single value is such a string.  On
    success ``self.targetobject`` and ``self.targetattribute`` are set from
    the parsed path and attribute.

    Args:
        obj: Object the plugin is attached to.
        config: Search string (see above).

    Raises:
        ValueError: If the config is missing or not a string, if the
            referenced attribute does not exist or holds several values, or
            if the search string cannot be processed.
    """

    def process(config):
        """Split a quoted search string into ``[path, attrib]`` or ``[attrib]``."""
        pathexpr = p.Literal("'").suppress() + \
            p.Optional(
                p.Combine(
                    p.OneOrMore(p.Literal("/") + p.Word(p.alphanums)) + p.Literal("/").suppress()
                )
            ).setResultsName('path') + \
            p.Combine(
                (p.Literal('@').suppress() | p.Literal('!').suppress()) +
                p.Word(p.alphanums) +
                p.Literal("'").suppress()
            ).setResultsName('attrib')
        expr = p.Group(pathexpr).setResultsName('search')
        match = expr.parseString(config)
        _ret = []
        if 'search' in match:
            if 'path' in match['search']:
                _ret.append(match['search']['path'])
            if 'attrib' in match['search']:
                _ret.append(match['search']['attrib'])
        return _ret

    super(Xattrib, self).__init__(obj, config=config, defer=True)

    # Check the type first so that a non-string config without a length
    # raises the documented ValueError rather than a TypeError from len().
    if not isinstance(self.config, str) or len(self.config) < 1:
        raise ValueError('Xattrib plugin function requires a config string')

    generic_error = 'An error occured when processing the search string for the Xattrib plugin function'
    try:
        _result = process("'%s'" % self.config)
        if len(_result) == 2:
            target = _result
        elif len(_result) == 1:
            # Only an attribute name was given: its value is the real
            # search string.
            _config = getattr(obj, _result[0], None)
            if _config is None:
                raise ValueError('Xattrib plugin received an attribute name that does not exist')
            # TODO len check only required for attributes, but not method plugins
            if len(_config) > 1:
                raise ValueError('Xattrib plugin received a attribute name that contains multiple values')
            target = process("'%s'" % _config[0])
            if len(target) != 2:
                raise ValueError(generic_error)
        else:
            raise ValueError(generic_error)
    except ValueError:
        # Keep the specific message chosen above instead of overwriting it.
        raise
    except Exception:
        # Parse errors and similar failures are reported as ValueError;
        # KeyboardInterrupt/SystemExit are no longer intercepted.
        raise ValueError(generic_error)
    self.targetobject, self.targetattribute = target