def get_action(driver, keyword):
    """Get the action class corresponding to the keyword in the driver."""
    drvmod = 'ProductDrivers.' + driver
    drvmodobj = importlib.import_module(drvmod)
    drvfile_methods = inspect.getmembers(drvmodobj, inspect.isroutine)
    main_method = [item[1] for item in drvfile_methods if item[0] == 'main'][0]
    main_src = inspect.getsource(main_method)
    pkglstmatch = re.search(r'package_list.*=.*\[(.*)\]', main_src, re.MULTILINE | re.DOTALL)
    pkglst = pkglstmatch.group(1).split(',')
    for pkg in pkglst:
        pkgobj = importlib.import_module(pkg)
        pkgdir = os.path.dirname(pkgobj.__file__)
        action_modules = [pkg + '.' + name for _, name, _ in pkgutil.iter_modules([pkgdir])]
        action_module_objs = [importlib.import_module(action_module) for action_module in action_modules]
        for action_module_obj in action_module_objs:
            for action_class in inspect.getmembers(action_module_obj, inspect.isclass):
                for func_name in inspect.getmembers(action_class[1], inspect.isroutine):
                    if keyword == func_name[0]:
                        return action_class[1]
    return None
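# A minimal, self-contained sketch of the same keyword-to-class lookup, with a
# locally defined class instead of the ProductDrivers package assumed above
# (ClickActions, click_element and find_class_with_keyword are hypothetical names):
import inspect

class ClickActions(object):
    def click_element(self, locator):
        return 'clicked %s' % locator

def find_class_with_keyword(classes, keyword):
    # Return the first class exposing a method whose name equals the keyword.
    for cls in classes:
        for name, _ in inspect.getmembers(cls, inspect.isroutine):
            if name == keyword:
                return cls
    return None

print(find_class_with_keyword([ClickActions], 'click_element'))  # -> ClickActions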
def papers_from_embedded_script(url, session=None):
    """
    Extract papers data from a script embedded in an ASP site's HTML.
    Note: looks for "var papers = " in a <script> </script> body.
    """
    if session:
        resp = session.get(url)
    else:
        resp = requests.get(url)
    soup = BeautifulSoup(resp.text, 'html.parser')
    scripts = soup.find_all('script')
    # p = re.compile(r'var papers = (\[.*?\];)', re.MULTILINE)
    pattern = re.compile(r'var papers = \[[.\s\S]*?\];')
    str_scripts = [str(script.string) for script in scripts if script.string]
    papers_js = [script for script in str_scripts if 'var papers = ' in script][0]
    papers = find_and_parse_papers_json(papers_js)
    return papers
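# find_and_parse_papers_json() is not shown here; a minimal sketch of the same
# extraction on inline HTML, using a character class so the match can span
# lines without re.DOTALL:
import json
import re

html = '<script>\nvar papers = [\n  {"title": "A", "year": 2016}\n];\n</script>'
m = re.search(r'var papers = (\[[\s\S]*?\]);', html)
if m:
    print(json.loads(m.group(1)))  # -> [{'title': 'A', 'year': 2016}]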
def check_qsub_job_status(job_id, desired_status="r"):
    '''
    Use 'qstat' to check on the run status of a qsub job;
    returns True or False if the job status matches the desired_status.

    job running:
        desired_status = "r"
    job waiting:
        desired_status = "qw"
    '''
    import re
    from sh import qstat
    job_id_pattern = r"^.*{0}.*\s{1}\s.*$".format(job_id, desired_status)
    # using the 'sh' package
    qstat_stdout = qstat()
    # using the standard subprocess package
    # qstat_stdout = subprocess_cmd('qstat', return_stdout=True)
    job_match = re.findall(str(job_id_pattern), str(qstat_stdout), re.MULTILINE)
    return bool(job_match)
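# The pattern relies on re.MULTILINE so ^ and $ anchor each line of the qstat
# table; a sketch against canned output (the job id and columns are made up):
import re

qstat_stdout = """job-ID  prior   name      user    state  submit-time
 12345  0.55   align.sh  nobody  r      05/25/2016
 12346  0.55   align.sh  nobody  qw     05/25/2016
"""
pattern = r"^.*{0}.*\s{1}\s.*$".format(12345, "r")
print(bool(re.findall(pattern, qstat_stdout, re.MULTILINE)))  # -> True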
def _make_boundary(text=None):
    # Craft a random boundary.  If text is given, ensure that the chosen
    # boundary doesn't appear in the text.
    token = random.randrange(sys.maxint)
    boundary = ('=' * 15) + (_fmt % token) + '=='
    if text is None:
        return boundary
    b = boundary
    counter = 0
    while True:
        cre = re.compile('^--' + re.escape(b) + '(--)?$', re.MULTILINE)
        if not cre.search(text):
            break
        b = boundary + '.' + str(counter)
        counter += 1
    return b
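# re.MULTILINE is what lets ^ and $ match at every line of the body, so an
# existing "--boundary" line anywhere in the text is detected; a short
# demonstration with a contrived body and token:
import re

boundary = ('=' * 15) + '0042=='
body = 'part one\n--' + boundary + '\npart two\n--' + boundary + '--\n'
cre = re.compile('^--' + re.escape(boundary) + '(--)?$', re.MULTILINE)
print(bool(cre.search(body)))  # -> True: this boundary already occurs in the text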
def register_options(self):
    # type: () -> None
    """Parse options from text like this:

    Preferences:
     [+|-]alignArguments                Enable/disable ...
     ...
     [+|-]spacesWithinPatternBinders    Enable/disable ...
     -alignSingleLineCaseStatements.maxArrowIndent=[1-100]    Set Maximum number ...
     -indentSpaces=[1-10]               Set Number of spaces ...
    """
    exeresult = run_executable(self.exe, ['--help'], cache=self.cache)
    options = []
    text = unistr(exeresult.stdout)
    for m in re.finditer(r'^ (\[\+\|-\]|-)([a-z][a-zA-Z.]+)(?:=\[(\d+)-(\d+)\])?', text,
                         re.MULTILINE):
        optionprefix, optionname, start, end = m.groups()
        if start is None:
            optiontype = 'bool'
            configs = [True, False]  # type: List[OptionValue]
        else:
            optiontype = 'int'
            configs = list(inclusiverange(int(start), int(end)))
        options.append(option_make(optionname, optiontype, configs))
    self.styledefinition = styledef_make(options)
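# A sketch of the option-parsing regex against help text shaped like the
# docstring above (the two option names are taken from it):
import re

help_text = (
    "Preferences:\n"
    " [+|-]alignArguments            Enable/disable ...\n"
    " -indentSpaces=[1-10]           Set Number of spaces ...\n"
)
for m in re.finditer(r'^ (\[\+\|-\]|-)([a-z][a-zA-Z.]+)(?:=\[(\d+)-(\d+)\])?',
                     help_text, re.MULTILINE):
    print(m.groups())
# ('[+|-]', 'alignArguments', None, None)
# ('-', 'indentSpaces', '1', '10')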
def pdf_as_matrix(buff, border):
    """\
    Reads the path in the PDF and returns it as list of 0, 1 lists.

    :param io.BytesIO buff: Buffer to read the matrix from.
    """
    pdf = buff.getvalue()
    h, w = re.search(br'/MediaBox \[0 0 ([0-9]+) ([0-9]+)\]', pdf,
                     flags=re.MULTILINE).groups()
    if h != w:
        raise ValueError('Expected equal height/width, got height="{}" width="{}"'.format(h, w))
    size = int(w) - 2 * border
    graphic = _find_graphic(buff)
    res = [[0] * size for i in range(size)]
    for x1, y1, x2, y2 in re.findall(r'\s*(\-?\d+)\s+(\-?\d+)\s+m\s+'
                                     r'(\-?\d+)\s+(\-?\d+)\s+l', graphic):
        x1, y1, x2, y2 = [int(i) for i in (x1, y1, x2, y2)]
        y = abs(y1)
        res[y][x1:x2] = [1] * (x2 - x1)
    return res
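# A sketch of the /MediaBox lookup on a stub PDF fragment (the 21x21 box is a
# made-up size):
import re

pdf = b'%PDF-1.4 ... /MediaBox [0 0 21 21] ...'
print(re.search(br'/MediaBox \[0 0 ([0-9]+) ([0-9]+)\]', pdf).groups())
# -> (b'21', b'21')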
def episode(self, url, imdb, tvdb, title, premiered, season, episode):
    try:
        if url is None:
            return
        result = client.request(url)
        # can't use the DOM parser here because the HTML is bugged: a div is not closed
        result = re.findall('<ul class="episodios">(.*?)</ul>', result, re.MULTILINE | re.DOTALL)
        for item in result:
            season_episodes = re.findall('<li>(.*?)</li>', item, re.MULTILINE | re.DOTALL)
            for row in season_episodes:
                s = client.parseDOM(row, 'div', attrs={'class': 'numerando'})[0].split('x')
                season_found = s[0].strip()
                episode_found = s[1].strip()
                if season_found != season:
                    break
                if episode_found == episode:
                    return client.parseDOM(row, 'a', ret='href')[0]
    except:
        return
def remove_cpp_comment(code):
    def blotOutNonNewlines(strIn):
        # Return a string containing only the newline chars contained in strIn.
        return "\n" * strIn.count('\n')

    def replacer(match):
        s = match.group(0)
        if s.startswith('/'):
            # Matched string is //...EOL or /*...*/ ==> blot out all non-newline chars.
            return blotOutNonNewlines(s)
        else:
            # Matched string is '...' or "..." ==> keep unchanged.
            return s

    pattern = re.compile(
        r'//.*?$|/\*.*?\*/|\'(?:\\.|[^\\\'])*\'|"(?:\\.|[^\\"])*"',
        re.DOTALL | re.MULTILINE
    )
    return re.sub(pattern, replacer, code)
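# Example usage of remove_cpp_comment(): comments are blotted out while string
# literals that merely look like comments survive (assumes `import re` at
# module level, as the function above does):
src = 'int x = 1; // trailing comment\nchar *s = "not // a comment";\n'
print(remove_cpp_comment(src))
# -> int x = 1;
# -> char *s = "not // a comment";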
def load_file(conn, csvfile):
    print(csvfile)
    with open(csvfile, "r") as handle:
        data = handle.read()
    names = []
    # AXFR_RE must be compiled with re.MULTILINE; finditer()'s second
    # argument is a start position, not a flags value.
    for match in AXFR_RE.finditer(data):
        names.append((match.group(1).strip('.'), match.group(3)))
    suffix = os.path.commonprefix([X[0][::-1] for X in names])[::-1]
    names = filter(lambda X: X[0],
                   [(X[0].replace(suffix, '').strip('.').lower(), X[1])
                    for X in names])
    lookup_names = []
    for name, rectype in set(names):
        if not name or name == '*':  # Ignore single wildcard or empty
            continue
        if name[:2] == '*.':  # Strip wildcard off beginning
            name = name[2:]
        lookup_names.append((name, rectype))
    update_vfy(conn, lookup_names)
def load_file(csvfile):
    with open(csvfile, "r") as handle:
        data = handle.read()
    names = []
    # AXFR_RE must be compiled with re.MULTILINE; finditer()'s second
    # argument is a start position, not a flags value.
    for match in AXFR_RE.finditer(data):
        names.append((match.group(1).strip('.'), match.group(3)))
    suffix = os.path.commonprefix([X[0][::-1] for X in names])[::-1]
    names = filter(lambda X: X[0],
                   [(X[0].replace(suffix, '').strip('.').lower(), X[1])
                    for X in names])
    for name, rectype in set(names):
        if not name or name == '*':  # Ignore single wildcard or empty
            continue
        if name[:2] == '*.':  # Strip wildcard off beginning
            name = name[2:]
        subnames = name.split('.')
        for subname in iter_names(subnames):
            yield subname, rectype
def setup(self, config):
    """
    Load name model (word list) and compile regexes for stop characters.

    :param config: Configuration object.
    :type config: ``dict``
    """
    reference_model = os.path.join(
        config[helper.CODE_ROOT], config[helper.NAME_MODEL])
    self.stopper = regex.compile(('(%s)' % '|'.join([
        'and', 'or', 'og', 'eller', r'\?', '&', '<', '>', '@', ':', ';', '/',
        r'\(', r'\)', 'i', 'of', 'from', 'to', r'\n', '!'])),
        regex.I | regex.MULTILINE)
    self.semistop = regex.compile(
        ('(%s)' % '|'.join([','])), regex.I | regex.MULTILINE)
    self.size_probability = [0.000, 0.000, 0.435, 0.489, 0.472, 0.004, 0.000]
    self.threshold = 0.25
    self.candidates = defaultdict(int)
    with gzip.open(reference_model, 'rb') as inp:
        self.model = json.loads(inp.read().decode('utf-8'))
    self.tokenizer = regex.compile(r'\w{2,20}')
def str_flags_to_int(str_flags):
    flags = 0
    if "i" in str_flags:
        flags |= re.IGNORECASE
    if "l" in str_flags:
        flags |= re.LOCALE
    if "m" in str_flags:
        flags |= re.MULTILINE
    if "s" in str_flags:
        flags |= re.DOTALL
    if "u" in str_flags:
        flags |= re.UNICODE
    if "x" in str_flags:
        flags |= re.VERBOSE
    return flags
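# Round-trip sketch: "im" maps to IGNORECASE|MULTILINE, which behaves as
# expected when passed back to re (assumes str_flags_to_int as defined above):
import re

flags = str_flags_to_int("im")
assert flags == re.IGNORECASE | re.MULTILINE
print(re.search("^b", "a\nB", flags).group())  # -> 'B'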
def __init__(self, pattern, flags=0):
    """BSON regular expression data.

    This class is useful to store and retrieve regular expressions that are
    incompatible with Python's regular expression dialect.

    :Parameters:
      - `pattern`: string
      - `flags`: (optional) an integer bitmask, or a string of flag
        characters like "im" for IGNORECASE and MULTILINE
    """
    if not isinstance(pattern, (text_type, bytes)):
        raise TypeError("pattern must be a string, not %s" % type(pattern))
    self.pattern = pattern
    if isinstance(flags, string_type):
        self.flags = str_flags_to_int(flags)
    elif isinstance(flags, int):
        self.flags = flags
    else:
        raise TypeError(
            "flags must be a string or int, not %s" % type(flags))
def _encode_regex(name, value, dummy0, dummy1):
    """Encode a python regex or bson.regex.Regex."""
    flags = value.flags
    # Python 2 common case
    if flags == 0:
        return b"\x0B" + name + _make_c_string_check(value.pattern) + b"\x00"
    # Python 3 common case
    elif flags == re.UNICODE:
        return b"\x0B" + name + _make_c_string_check(value.pattern) + b"u\x00"
    else:
        sflags = b""
        if flags & re.IGNORECASE:
            sflags += b"i"
        if flags & re.LOCALE:
            sflags += b"l"
        if flags & re.MULTILINE:
            sflags += b"m"
        if flags & re.DOTALL:
            sflags += b"s"
        if flags & re.UNICODE:
            sflags += b"u"
        if flags & re.VERBOSE:
            sflags += b"x"
        sflags += b"\x00"
        return b"\x0B" + name + _make_c_string_check(value.pattern) + sflags
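# On Python 3, str patterns carry re.UNICODE implicitly, so a pattern compiled
# with IGNORECASE|MULTILINE serializes its flags as b"imu"; a sketch of the
# same flag-to-character mapping:
import re

pat = re.compile(r'^foo', re.IGNORECASE | re.MULTILINE)
sflags = b""
for flag, ch in [(re.IGNORECASE, b"i"), (re.LOCALE, b"l"), (re.MULTILINE, b"m"),
                 (re.DOTALL, b"s"), (re.UNICODE, b"u"), (re.VERBOSE, b"x")]:
    if pat.flags & flag:
        sflags += ch
print(sflags)  # -> b'imu'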
def run(cls):
    """Check variables."""
    project = __import__(IMPORT, fromlist=[''])
    for expected, var in [('@Robpol86', '__author__'), (LICENSE, '__license__'), (VERSION, '__version__')]:
        if getattr(project, var) != expected:
            raise SystemExit('Mismatch: {0}'.format(var))
    # Check changelog.
    if not re.compile(r'^%s - \d{4}-\d{2}-\d{2}[\r\n]' % VERSION, re.MULTILINE).search(readme()):
        raise SystemExit('Version not found in readme/changelog file.')
    # Check tox.
    if INSTALL_REQUIRES:
        contents = readme('tox.ini')
        section = re.compile(r'[\r\n]+install_requires =[\r\n]+(.+?)[\r\n]+\w', re.DOTALL).findall(contents)
        if not section:
            raise SystemExit('Missing install_requires section in tox.ini.')
        in_tox = re.findall(r' ([^=]+)==[\w\d.-]+', section[0])
        if INSTALL_REQUIRES != in_tox:
            raise SystemExit('Missing/unordered pinned dependencies in tox.ini.')
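# A sketch of the changelog check with an inline readme string (the version
# and date are made up):
import re

readme_text = "Changelog\n1.2.3 - 2016-05-01\nFixed things.\n"
print(bool(re.compile(r'^%s - \d{4}-\d{2}-\d{2}[\r\n]' % '1.2.3',
                      re.MULTILINE).search(readme_text)))  # -> True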
def getCsfrtoken(self):
    fetch = self.request('si/fetch_headers/', None, True)
    header = fetch[0]
    response = ChallengeResponse(fetch[1])
    if not header or not response.isOk():
        raise InstagramException("Couldn't get challenge, check your connection")
        # return response  # fixme: unreachable code
    match = re.search(r'^Set-Cookie: csrftoken=([^;]+)', fetch[0], re.MULTILINE)
    if not match:
        raise InstagramException("Missing csrftoken")
        # return response  # fixme: unreachable code
    token = match.group(1)
    return token[22:]
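# A sketch of the csrftoken extraction on a canned header block (the token
# value is made up):
import re

headers = "HTTP/1.1 200 OK\nSet-Cookie: csrftoken=abcdef123456; Path=/\n"
m = re.search(r'^Set-Cookie: csrftoken=([^;]+)', headers, re.MULTILINE)
print(m.group(1))  # -> abcdef123456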
def get_threads_by_tag(self, filename):
    try:
        data = load_file(join('repos', self.name, 'master',
                              'source', filename + '.rst'))
    except:
        return []
    label_list = re.findall(r'^\.\. _([0-9a-z\-]+):\s$', data,
                            re.MULTILINE)
    File_Tag = application.threads.File_Tag
    Thread = application.threads.Thread
    threads_by_tag = (db.session.query(File_Tag.filename, Thread.title)
                      .filter(File_Tag.thread_id == Thread.id)
                      .filter(File_Tag.filename.in_(label_list)).all())
    return [{'name': l,
             'titles': [x[1] for x in threads_by_tag if x[0] == l]}
            for l in label_list]
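# A sketch of the label regex on inline reStructuredText; note the pattern
# needs a whitespace character after the colon (here a trailing space):
import re

rst = ".. _my-label: \nSome text\n.. _another-one: \n"
print(re.findall(r'^\.\. _([0-9a-z\-]+):\s$', rst, re.MULTILINE))
# -> ['my-label', 'another-one']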
def mounted_at(dev='', loopback=''):
    df = subprocess.check_output(['df'])
    if dev:
        fn = dev[dev.rfind('/') + 1:]
        dev_or_loop = dev
        m = re.search('^' + dev + r'\s.*\s(\S+)$', df, flags=re.MULTILINE)
    elif loopback:
        dev_or_loop = loopback
        fn = loopback[loopback.rfind('/') + 1:]
        m = re.search(r'\s(/lib/live/\S*' + fn + ')$', df, flags=re.MULTILINE)
    else:
        sys.exit('mounted_at() needs at least one arg')
    if m:
        return m.group(1)
    else:
        target_mp = '/tmp/mbootuz-' + str(os.getpid()) + '-' + fn
        subprocess.call(['mkdir', target_mp])
        try:
            subprocess.check_output(['mount', dev_or_loop, target_mp])
        except subprocess.CalledProcessError as e:
            subprocess.call(['rmdir', target_mp])
            sys.exit('mount failure [' + e.output +
                     '], mbootuz aborted')
        atexit.register(cleanup, target_mp)
        return target_mp
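# A sketch of the df lookup against canned df output (the device and mount
# point are made up):
import re

df = ("Filesystem     1K-blocks  Used Available Use% Mounted on\n"
      "/dev/sdb1        7812032  4096   7807936   1% /mnt/usb\n")
m = re.search('^' + '/dev/sdb1' + r'\s.*\s(\S+)$', df, flags=re.MULTILINE)
print(m.group(1))  # -> /mnt/usb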
def str_flags_to_int(str_flags):
    flags = 0
    if "i" in str_flags:
        flags |= re.IGNORECASE
    if "l" in str_flags:
        flags |= re.LOCALE
    if "m" in str_flags:
        flags |= re.MULTILINE
    if "s" in str_flags:
        flags |= re.DOTALL
    if "u" in str_flags:
        flags |= re.UNICODE
    if "x" in str_flags:
        flags |= re.VERBOSE
    return flags
def __init__(self, pattern, flags=0):
    """BSON regular expression data.

    This class is useful to store and retrieve regular expressions that are
    incompatible with Python's regular expression dialect.

    :Parameters:
      - `pattern`: string
      - `flags`: (optional) an integer bitmask, or a string of flag
        characters like "im" for IGNORECASE and MULTILINE
    """
    if not isinstance(pattern, string_types):
        raise TypeError("pattern must be a string, not %s" % type(pattern))
    self.pattern = pattern
    if isinstance(flags, string_types):
        self.flags = str_flags_to_int(flags)
    elif isinstance(flags, int):
        self.flags = flags
    else:
        raise TypeError(
            "flags must be a string or int, not %s" % type(flags))
def get_year(self):
    try:
        yre = '(dei:DocumentFiscalYearFocus$)'
        year = self.ins_sp.find(name=re.compile(yre, re.IGNORECASE | re.MULTILINE)).get_text()
    except AttributeError:
        try:
            yre = '(dei:DocumentPeriodEndDate$)'
            year = self.ins_sp.find(name=re.compile(yre, re.IGNORECASE | re.MULTILINE)).get_text()
            year = year[:4]
        except AttributeError:
            return False
    try:
        year = int(year)
        sure_years = [2001, 2002, 2003, 2004, 2005,
                      2006, 2007, 2008, 2009, 2011,
                      2012, 2013, 2014, 2016]
        if year in sure_years:
            self.xbrl_year = str(year)
        if year == 2010:
            self.xbrl_year = '2009'
        if year == 2015:
            self.xbrl_year = '2014'
        return True
    except ValueError:
        return False
def iter_comment_bodies(start_month, end_month, remove_links=True,
                        base_input_path='../partial/worldnews_comments_'):
    """Read and return comments from files.

    Args:
        start_month (int): start month from which the comments are read
        end_month (int): end month from which the comments are read
        remove_links (bool): if true, comments are returned without links (default: {True})
        base_input_path (str): base path of the files (default: {'../partial/worldnews_comments_'})

    Yields:
        str: a comment body
    """
    for i in range(start_month, end_month):
        with open(base_input_path + str(i) + '_2016.json', 'r') as input_file:
            for line in input_file:
                comment = json.loads(line)
                if remove_links:
                    yield re.sub(r"http\S+", '',
                                 comment['body'].encode('utf-8', errors='ignore')
                                                .decode('utf8', errors='ignore'),
                                 flags=re.MULTILINE)
                else:
                    yield comment['body']
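# The link stripping in the remove_links branch, in isolation (re.MULTILINE is
# kept as above, though it has no effect on a pattern without ^/$ anchors):
import re

body = "see https://example.com/a for details"
print(re.sub(r"http\S+", '', body, flags=re.MULTILINE))  # -> 'see  for details'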
def analyse_text(text):
    # Any limbo module implements something
    if re.search(r'^implement \w+;', text, re.MULTILINE):
        return 0.7

# TODO:
#    - Make lexers for:
#        - asm sources
#        - man pages
#        - mkfiles
#        - module definitions
#        - namespace definitions
#        - shell scripts
#        - maybe keyfiles and fonts
#      they all seem to be quite similar to their equivalents
#      from unix world, so there should not be a lot of problems
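# This looks like a Pygments-style analyse_text() hook used for lexer
# guessing; the regex in isolation:
import re

print(bool(re.search(r'^implement \w+;', 'implement Hello;\n', re.MULTILINE)))
# -> True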