def get_client_ip(request):
    """Return the first public (non-private, non-loopback, valid IPv4)
    address from the request's access route.

    When the route contains a single hop, that hop is returned as-is.
    Returns ``None`` implicitly when every hop is filtered out.
    """
    access_route = get_access_route(request)
    if len(access_route) == 1:
        return access_route[0]
    # FIX: raw string — '\.' inside a plain literal is an invalid escape
    # sequence (DeprecationWarning since 3.6, SyntaxWarning in 3.12+).
    expression = r"""
    (^(?!(?:[0-9]{1,3}\.){3}[0-9]{1,3}$).*$)|  # will match non valid ipV4
    (^127\.0\.0\.1)|   # will match 127.0.0.1
    (^10\.)|           # will match 10.0.0.0 - 10.255.255.255 IP-s
    (^172\.1[6-9]\.)|  # will match 172.16.0.0 - 172.19.255.255 IP-s
    (^172\.2[0-9]\.)|  # will match 172.20.0.0 - 172.29.255.255 IP-s
    (^172\.3[0-1]\.)|  # will match 172.30.0.0 - 172.31.255.255 IP-s
    (^192\.168\.)      # will match 192.168.0.0 - 192.168.255.255 IP-s
    """
    regex = re.compile(expression, re.X)
    for ip in access_route:
        if not ip:
            # it's possible that the first value from X_FORWARDED_FOR
            # will be null, so we need to pass that value
            continue
        if regex.search(ip):
            continue
        return ip
# Python: example sources using compile() (scraped snippet collection)
def _readRegExp(regExp):
    """
    Discard leading white space characters from standard input. Then read
    from standard input and return a string matching regular expression
    regExp. Raise an EOFError if no non-whitespace characters remain
    in standard input. Raise a ValueError if the next characters to
    be read from standard input do not match 'regExp'.
    """
    global _buffer
    if isEmpty():
        raise EOFError()
    # Anchor at the buffer start, allowing leading whitespace.
    anchored = re.compile(r'^\s*' + regExp)
    found = anchored.search(_buffer)
    if found is None:
        raise ValueError()
    token = found.group()
    # Consume the matched prefix from the shared input buffer.
    _buffer = _buffer[found.end():]
    return token.lstrip()
#-----------------------------------------------------------------------
def run(self):
    """Look up the FortiGuard web-filter category for a domain or URL.

    Reports ``{'category': <name>}`` on success; any other data type is
    routed to ``notSupported``.
    """
    Analyzer.run(self)
    if self.data_type == 'domain' or self.data_type == 'url':
        try:
            pattern = re.compile(r"(?:Category: )([\w\s]+)")
            baseurl = 'http://www.fortiguard.com/webfilter?q='
            url = baseurl + self.getData()
            req = requests.get(url)
            # FIX: req.content is bytes on Python 3 — searching it with a
            # str pattern raises TypeError (which the ValueError handler
            # below would not catch).  Use the decoded text instead.
            category_match = re.search(pattern, req.text, flags=0)
            if category_match is None:
                # FIX: previously .group(1) raised AttributeError here;
                # raise ValueError so the existing handler reports it.
                raise ValueError('category not found in FortiGuard response')
            self.report({
                'category': category_match.group(1)
            })
        except ValueError as e:
            self.unexpectedError(e)
    else:
        self.notSupported()
def findHeadings(self, lines, struc):
    """Split *lines* into sections delimited by heading lines.

    Builds a heading regex for the current structure level (the last
    element of *struc*), collects the index of every matching line, and
    returns one TextContainer per [heading, next-heading) span, using
    the heading line itself as the container name.

    NOTE(review): uses ``xrange`` — this snippet is Python 2 code.
    """
    linenum=len(lines)
    # The heading pattern template is parameterized by the level number.
    level=struc[-1]
    _h_re=re.compile(self.textparser._h_re_base % level, re.X | re.M)
    hidx=[]
    for ii in xrange(linenum):
        if _h_re.match(lines[ii]):
            hidx.append(ii)
    # Sentinel: the end of the text closes the last section.
    hidx.append(linenum)
    # Pair consecutive heading indices into [start, end) groups.
    groups=[[hidx[ii],hidx[ii+1]] for ii in xrange(len(hidx)-1)]
    result=[]
    for ii in groups:
        #--------Use heading line as container name--------
        result.append(TextContainer(lines[ii[0]],struc,ii))
    return result
def getDetailList(self,content):
    """Extract every (href, title) anchor pair from *content* and run one
    ``workthread`` per match, joining them all before returning.

    Side effect: dumps the raw page to 'file.txt' (gbk-encoded).
    """
    link_re = re.compile(r'<h2><a target="_blank" href="(.*?)" title="(.*?)"', re.S)
    result = link_re.findall(content)
    with open('file.txt','w',encoding='gbk') as f:
        f.write(content)
    if not result:
        print('???????..............')
    workers = []
    for item in result:
        worker = threading.Thread(target=workthread, args=(item, self.user_agent, self.path))
        workers.append(worker)
        worker.start()
    for worker in workers:
        worker.join()
def __init__(
    self, index_url="https://pypi.python.org/simple", hosts=('*',),
    ca_bundle=None, verify_ssl=True, *args, **kw
):
    """Initialize the package-index scanner.

    Normalizes *index_url* to end with a slash, precompiles the host
    allow-list matcher, and selects an SSL-verifying opener when
    certificate support is available.
    """
    Environment.__init__(self, *args, **kw)
    # Append a trailing slash only when one is missing (equivalent to the
    # original index_url + "/"[:not index_url.endswith('/')] trick).
    if index_url.endswith('/'):
        self.index_url = index_url
    else:
        self.index_url = index_url + '/'
    self.scanned_urls = {}
    self.fetched_urls = {}
    self.package_pages = {}
    self.allows = re.compile('|'.join(map(translate, hosts))).match
    self.to_scan = []
    if (verify_ssl and ssl_support.is_available
            and (ca_bundle or ssl_support.find_ca_bundle())):
        self.opener = ssl_support.opener_for(ca_bundle)
    else:
        self.opener = urllib.request.urlopen
# audio_converter.py — source file
# project: subtitle-synchronization
# author: AlbertoSabater
# (scraped listing metadata: 33 reads, 0 bookmarks, 0 likes, 0 comments)
def getAudio(freq, audio_files=None):
    """Extract a WAV track at *freq* Hz from each .mkv/.avi file in DATA_DIR.

    audio_files -- optional whitelist of base names (without extension);
                   when given, only those files are converted.
    Returns the list of output .wav paths.
    """
    files = os.listdir(DATA_DIR)
    # FIX: the original pattern '.*\.[mkv|avi]' used a character class,
    # which matched any single character of {m,k,v,|,a,i} after a dot
    # (e.g. 'x.m').  Use an end-anchored alternation instead.
    p = re.compile(r'.*\.(mkv|avi)$')
    files = [f for f in files if p.match(f)]
    if audio_files:
        files = [f for f in files if os.path.splitext(f)[0] in audio_files]
    audio_dirs = []
    for f in files:
        name, extension = os.path.splitext(f)
        # NOTE(review): assumes DATA_DIR ends with a path separator — the
        # command concatenates DATA_DIR + name directly.  shell=True with
        # interpolated file names is unsafe for untrusted names.
        command = "ffmpeg -i {0}{1}{2} -ab 160k -ac 2 -ar {3} -vn {0}{1}_{3}.wav".format(DATA_DIR, name, extension, freq)
        audio_dirs.append(DATA_DIR + name + '_' + str(freq) + '.wav')
        subprocess.call(command, shell=True)
    return audio_dirs
# Convert timestamp to seconds
def getRegEx(pattern):
    """Compiles and returns a 'regular expression' object for the given address-pattern.
    """
    # Translate OSC-address syntax to python 're' syntax: escape regex
    # metacharacters that are literal in OSC addresses, then map OSC
    # wildcards onto their regex equivalents.
    for literal, escaped in (('.', r'\.'), ('(', r'\('), (')', r'\)')):
        pattern = pattern.replace(literal, escaped)
    pattern = pattern.replace('*', r'.*')   # '*' matches 0 or more characters
    pattern = pattern.translate(OSCtrans)   # change '?' to '.' and '{,}' to '(|)'
    return re.compile(pattern)
######
#
# OSCMultiClient class
#
######
def getRegEx(pattern):
    """Compiles and returns a 'regular expression' object for the given address-pattern.

    OSC address patterns treat '.', '(' and ')' literally, use '*' as a
    multi-character wildcard, '?' as a single-character wildcard and
    '{a,b}' as alternation; this maps each onto regex syntax.
    """
    # Translate OSC-address syntax to python 're' syntax
    pattern = pattern.replace(".", r"\.")       # first, escape all '.'s in the pattern.
    pattern = pattern.replace("(", r"\(")       # escape all '('s.
    pattern = pattern.replace(")", r"\)")       # escape all ')'s.
    pattern = pattern.replace("*", r".*")       # replace a '*' by '.*' (match 0 or more characters)
    pattern = pattern.translate(OSCtrans)       # change '?' to '.' and '{,}' to '(|)'
    return re.compile(pattern)
######
#
# OSCMultiClient class
#
######
def getDetailList(self,content):
    """Find all (href, title) anchor pairs in *content*, dump the raw
    page to 'file.txt' (gbk-encoded), and run one ``workthread`` per
    match, joining them all before returning.
    """
    # Pattern: <h2><a target="_blank" href="..." title="..."
    s2 = r'<h2><a target="_blank" href="(.*?)" title="(.*?)"'
    pattern =re.compile(s2 , re.S
                        )
    result = re.findall(pattern, content)
    with open('file.txt','w',encoding='gbk') as f:
        f.write(content)
    if not result:
        # NOTE(review): mis-encoded status message preserved from the
        # original source.
        print('???????..............')
    threadsList=[]
    for item in result:
        t = threading.Thread(target = workthread, args=(item, self.user_agent, self.path))
        threadsList.append(t)
        t.start()
    # Wait for every worker before returning.
    for threadid in threadsList:
        threadid.join()
def characterErrorsUCS2(self, data):
    """Record "invalid-codepoint" errors found in *data* on narrow
    (UCS-2) Python builds, where astral characters are represented as
    surrogate pairs.
    """
    # Someone picked the wrong compile option
    # You lose
    skip = False
    for match in invalid_unicode_re.finditer(data):
        if skip:
            # Second half of a surrogate pair already handled above.
            continue
        codepoint = ord(match.group())
        pos = match.start()
        # Pretty sure there should be endianness issues here
        if _utils.isSurrogatePair(data[pos:pos + 2]):
            # We have a surrogate pair!
            char_val = _utils.surrogatePairToCodepoint(data[pos:pos + 2])
            if char_val in non_bmp_invalid_codepoints:
                self.errors.append("invalid-codepoint")
            skip = True
        elif (codepoint >= 0xD800 and codepoint <= 0xDFFF and
              pos == len(data) - 1):
            # Lone surrogate at the very end of the data.
            self.errors.append("invalid-codepoint")
        else:
            skip = False
            self.errors.append("invalid-codepoint")
def run_script(self, script_name, namespace):
    """Execute the named metadata script in *namespace*.

    Raises ResolutionError when the script is absent.  Runs the on-disk
    copy when present; otherwise compiles the embedded text, seeding
    linecache so tracebacks can show the script source.
    """
    script = 'scripts/' + script_name
    if not self.has_metadata(script):
        raise ResolutionError("No script named %r" % script_name)
    # Normalize all newline conventions to '\n'.
    script_text = self.get_metadata(script).replace('\r\n', '\n')
    script_text = script_text.replace('\r', '\n')
    script_filename = self._fn(self.egg_info, script)
    namespace['__file__'] = script_filename
    if os.path.exists(script_filename):
        # FIX: close the file deterministically instead of leaking the
        # handle until GC.
        with open(script_filename) as fh:
            source = fh.read()
        code = compile(source, script_filename, 'exec')
        exec(code, namespace, namespace)
    else:
        from linecache import cache
        cache[script_filename] = (
            len(script_text), 0, script_text.split('\n'), script_filename
        )
        script_code = compile(script_text, script_filename, 'exec')
        exec(script_code, namespace, namespace)
def _needs_hiding(mod_name):
"""
>>> _needs_hiding('setuptools')
True
>>> _needs_hiding('pkg_resources')
True
>>> _needs_hiding('setuptools_plugin')
False
>>> _needs_hiding('setuptools.__init__')
True
>>> _needs_hiding('distutils')
True
>>> _needs_hiding('os')
False
>>> _needs_hiding('Cython')
True
"""
pattern = re.compile(r'(setuptools|pkg_resources|distutils|Cython)(\.|$)')
return bool(pattern.match(mod_name))
def byte_compile(self, to_compile):
    """Byte-compile *to_compile*, honoring ``sys.dont_write_bytecode``
    and this command's ``optimize`` / ``dry_run`` settings.
    """
    if sys.dont_write_bytecode:
        self.warn('byte-compiling is disabled, skipping.')
        return
    from distutils.util import byte_compile
    try:
        # try to make the byte compile messages quieter
        log.set_verbosity(self.verbose - 1)
        byte_compile(to_compile, optimize=0, force=1, dry_run=self.dry_run)
        if self.optimize:
            # Second pass producing optimized bytecode.
            byte_compile(
                to_compile, optimize=self.optimize, force=1,
                dry_run=self.dry_run,
            )
    finally:
        log.set_verbosity(self.verbose)  # restore original verbosity
def filterwarnings(action, message="", category=Warning, module="", lineno=0,
                   append=False):
    """Insert an entry into the list of warnings filters (at the front).

    'action' -- one of "error", "ignore", "always", "default", "module",
                or "once"
    'message' -- a regex that the warning message must match
    'category' -- a class that the warning must be a subclass of
    'module' -- a regex that the module name must match
    'lineno' -- an integer line number, 0 matches all warnings
    'append' -- if true, append to the list of filters
    """
    import re
    valid_actions = ("error", "ignore", "always", "default", "module",
                     "once")
    assert action in valid_actions, "invalid action: %r" % (action,)
    assert isinstance(message, str), "message must be a string"
    assert isinstance(category, type), "category must be a class"
    assert issubclass(category, Warning), "category must be a Warning subclass"
    assert isinstance(module, str), "module must be a string"
    assert isinstance(lineno, int) and lineno >= 0, \
        "lineno must be an int >= 0"
    # Message matching is case-insensitive; module matching is not.
    message_re = re.compile(message, re.I)
    module_re = re.compile(module)
    _add_filter(action, message_re, category, module_re, lineno,
                append=append)
def getDetailList(self,content):
    """Find all (href, title) anchor pairs in *content*, dump the raw
    page to 'file.txt' (gbk-encoded), and fetch a detail picture for
    every match via ``getDetailPic``.
    """
    # NOTE(review): the concatenated pattern has no space before
    # 'title=' — preserved as-is; confirm against the pages it targets.
    pattern = re.compile(r'<h2><a target="_blank" href="(.*?)"'
                         + r'title="(.*?)">', re.S)
    # FIX: context manager closes the dump file even if write() raises.
    # (Original comment here was mis-encoded; the file uses gbk encoding.)
    with open('file.txt', 'w', encoding='gbk') as dump:
        dump.write(content)
    result = re.findall(pattern, content)
    if not result:
        print('???????..............')
    for item in result:
        self.getDetailPic(item)
def create_text_file(self, name):
    """
    Create a new text file, change its name and return it.

    Blender names new text datablocks 'Text', 'Text.001', 'Text.002', …;
    this finds the block just created (highest numeric suffix, or plain
    'Text'), renames it to *name* and returns it.

    Args:
    - name, a string to use as the new text file's name
    """
    import re
    bpy.ops.text.new()
    # FIX: escape the dot — r'^Text.[0-9]{3}$' let '.' match any
    # character, so unrelated names like 'TextX001' also matched.
    re_text = re.compile(r'^Text\.[0-9]{3}$')
    text_name = ''
    text_index, max_index = 0, 0
    for text in bpy.data.texts:
        if re_text.match(text.name):
            text_index = int(text.name[-3:])
            if text_index > max_index:
                max_index = text_index
                text_name = text.name
    if not text_name:
        text_name = 'Text'
    bpy.data.texts[text_name].name = name
    return bpy.data.texts[name]
def create_text_file(name):
    """
    Create a new text file, change its name and return it.

    Args:
    - name, the name of the text file, a non-empty string

    Raises:
    - TypeError when *name* is empty or not a string.
    """
    # FIX: the original guard `if not name and isinstance(name, str)` only
    # fired for the empty string and never rejected non-strings.
    if not name or not isinstance(name, str):
        raise TypeError('The name of the text file has to be a string')
    bpy.ops.text.new()
    import re
    # FIX: escape the dot so only Blender's 'Text.NNN' names match.
    re_text = re.compile(r'^Text\.[0-9]{3}$')
    text_name = ''
    text_index, max_index = 0, 0
    for text in bpy.data.texts:
        if re_text.match(text.name):
            text_index = int(text.name[-3:])
            if text_index > max_index:
                max_index = text_index
                text_name = text.name
    if not text_name:
        text_name = 'Text'
    bpy.data.texts[text_name].name = name
    return bpy.data.texts[name]
def load_params(self, f_, filter_=None):
    """Load pickled parameter values from file object *f_* into this
    model's variables.

    filter_ -- optional regex string; when given, only parameters whose
               name fully matches are restored.

    Raises ValueError on any shape mismatch.
    NOTE(review): pickle.load on an untrusted file can execute arbitrary
    code — only load trusted checkpoints.
    """
    di = pickle.load(f_)
    # Deduplicated: the two original branches repeated the same loop body.
    pat = re.compile(filter_) if filter_ is not None else None
    for k, v in di.items():
        if pat is not None and not pat.fullmatch(k):
            continue
        p = self._vars_di[k].get_value(borrow=True)
        if p.shape != v.shape:
            raise ValueError('Shape mismatch, need %s, got %s'%(v.shape, p.shape), p.shape)
        self._vars_di[k].set_value(v)
def characterErrorsUCS2(self, data):
    """Record "invalid-codepoint" errors found in *data* on narrow
    (UCS-2) Python builds, where astral characters are represented as
    surrogate pairs.
    """
    # Someone picked the wrong compile option
    # You lose
    skip = False
    for match in invalid_unicode_re.finditer(data):
        if skip:
            # Second half of a surrogate pair already handled above.
            continue
        codepoint = ord(match.group())
        pos = match.start()
        # Pretty sure there should be endianness issues here
        if _utils.isSurrogatePair(data[pos:pos + 2]):
            # We have a surrogate pair!
            char_val = _utils.surrogatePairToCodepoint(data[pos:pos + 2])
            if char_val in non_bmp_invalid_codepoints:
                self.errors.append("invalid-codepoint")
            skip = True
        elif (codepoint >= 0xD800 and codepoint <= 0xDFFF and
              pos == len(data) - 1):
            # Lone surrogate at the very end of the data.
            self.errors.append("invalid-codepoint")
        else:
            skip = False
            self.errors.append("invalid-codepoint")
def run_script(self, script_name, namespace):
    """Execute the named metadata script in *namespace*.

    Raises ResolutionError when the script is absent.  Runs the on-disk
    copy when present; otherwise compiles the embedded text, seeding
    linecache so tracebacks can show the script source.
    """
    script = 'scripts/' + script_name
    if not self.has_metadata(script):
        raise ResolutionError("No script named %r" % script_name)
    # Normalize all newline conventions to '\n'.
    script_text = self.get_metadata(script).replace('\r\n', '\n')
    script_text = script_text.replace('\r', '\n')
    script_filename = self._fn(self.egg_info, script)
    namespace['__file__'] = script_filename
    if os.path.exists(script_filename):
        # NOTE(review): handle is never closed explicitly; relies on GC.
        source = open(script_filename).read()
        code = compile(source, script_filename, 'exec')
        exec(code, namespace, namespace)
    else:
        from linecache import cache
        cache[script_filename] = (
            len(script_text), 0, script_text.split('\n'), script_filename
        )
        script_code = compile(script_text, script_filename, 'exec')
        exec(script_code, namespace, namespace)
def get_encodings_from_content(content):
    """Returns encodings from given content string.

    :param content: bytestring to extract encodings from.
    """
    warnings.warn((
        'In requests 3.0, get_encodings_from_content will be removed. For '
        'more information, please see the discussion on issue #2266. (This'
        ' warning should only appear once.)'),
        DeprecationWarning)
    # Probe, in order: <meta charset=...>, the http-equiv pragma form,
    # and the XML declaration.  Concatenate every hit found.
    probes = (
        re.compile(r'<meta.*?charset=["\']*(.+?)["\'>]', flags=re.I),
        re.compile(r'<meta.*?content=["\']*;?charset=(.+?)["\'>]', flags=re.I),
        re.compile(r'^<\?xml.*?encoding=["\']*(.+?)["\'>]'),
    )
    found = []
    for probe in probes:
        found.extend(probe.findall(content))
    return found
def run_script(self, script_name, namespace):
    """Execute the named metadata script in *namespace*.

    Raises ResolutionError when the script is absent.  Runs the on-disk
    copy when present; otherwise compiles the embedded text, seeding
    linecache so tracebacks can show the script source.
    """
    script = 'scripts/' + script_name
    if not self.has_metadata(script):
        raise ResolutionError("No script named %r" % script_name)
    # Normalize all newline conventions to '\n'.
    script_text = self.get_metadata(script).replace('\r\n', '\n')
    script_text = script_text.replace('\r', '\n')
    script_filename = self._fn(self.egg_info, script)
    namespace['__file__'] = script_filename
    if os.path.exists(script_filename):
        # NOTE(review): handle is never closed explicitly; relies on GC.
        source = open(script_filename).read()
        code = compile(source, script_filename, 'exec')
        exec(code, namespace, namespace)
    else:
        from linecache import cache
        cache[script_filename] = (
            len(script_text), 0, script_text.split('\n'), script_filename
        )
        script_code = compile(script_text, script_filename, 'exec')
        exec(script_code, namespace, namespace)
def get_encodings_from_content(content):
    """Returns encodings from given content string.

    :param content: bytestring to extract encodings from.
    :return: list of every encoding name found — <meta charset=...>
        matches first, then http-equiv pragma matches, then the XML
        declaration's encoding.
    """
    warnings.warn((
        'In requests 3.0, get_encodings_from_content will be removed. For '
        'more information, please see the discussion on issue #2266. (This'
        ' warning should only appear once.)'),
        DeprecationWarning)
    # Case-insensitive probes for the two HTML forms; the XML declaration
    # is only honored at the very start of the content.
    charset_re = re.compile(r'<meta.*?charset=["\']*(.+?)["\'>]', flags=re.I)
    pragma_re = re.compile(r'<meta.*?content=["\']*;?charset=(.+?)["\'>]', flags=re.I)
    xml_re = re.compile(r'^<\?xml.*?encoding=["\']*(.+?)["\'>]')
    return (charset_re.findall(content) +
            pragma_re.findall(content) +
            xml_re.findall(content))
def _needs_hiding(mod_name):
    """
    >>> _needs_hiding('setuptools')
    True
    >>> _needs_hiding('pkg_resources')
    True
    >>> _needs_hiding('setuptools_plugin')
    False
    >>> _needs_hiding('setuptools.__init__')
    True
    >>> _needs_hiding('distutils')
    True
    >>> _needs_hiding('os')
    False
    >>> _needs_hiding('Cython')
    True
    """
    # The (\.|$) tail matches the package itself or any submodule, while
    # rejecting mere name prefixes such as 'setuptools_plugin'.
    pattern = re.compile(r'(setuptools|pkg_resources|distutils|Cython)(\.|$)')
    return bool(pattern.match(mod_name))
def byte_compile(self, to_compile):
    """Byte-compile *to_compile*, honoring ``sys.dont_write_bytecode``
    and this command's ``optimize`` / ``dry_run`` settings.
    """
    if sys.dont_write_bytecode:
        self.warn('byte-compiling is disabled, skipping.')
        return
    from distutils.util import byte_compile as do_compile
    try:
        # Quieter byte-compile messages while the passes run.
        log.set_verbosity(self.verbose - 1)
        do_compile(to_compile, optimize=0, force=1, dry_run=self.dry_run)
        if self.optimize:
            # Second pass producing optimized bytecode.
            do_compile(
                to_compile, optimize=self.optimize, force=1,
                dry_run=self.dry_run,
            )
    finally:
        # Restore the original verbosity no matter what happened above.
        log.set_verbosity(self.verbose)
def expect_match(self, pattern, on_error=None):
    """
    Require @item to match at the current `position`.
    Raises a ValueError if @item does not match.
    @pattern
        A regular expression.
    @on_error
        A function that returns an error. The error returned overrides the
        default ValueError.
    """
    found = re.compile(pattern).match(self.source, self.position)
    if found is None:
        # No match at the cursor: raise the caller's error if supplied.
        if on_error:
            raise on_error()
        raise ValueError('expected match with \'{0}\', at \'{1}\''.format(pattern, self.source[self.position:]))
    # Advance the cursor past the matched span.
    self.position += found.end() - found.start()
    return found
def match(self, pattern):
    """
    Return the match obtained by searching @pattern.
    The current `position` will advance as many characters as the match's
    length.
    @pattern
        A regular expression.
    """
    found = re.compile(pattern).match(self.source, self.position)
    if found is None:
        # No match at the cursor: leave `position` untouched.
        return None
    # Advance the cursor past the matched span.
    self.position += found.end() - found.start()
    return found
def craw_last_index(ptt_class_name):
    """Return the newest page index number of a PTT board.

    Fetches the board's index page, locates the paging button, and
    derives the latest index from its href (+1, since the button links
    to the previous page).

    NOTE(review): ``re.compile('??')`` is mis-encoded in this source —
    it originally matched the Chinese label of the paging button;
    confirm against the upstream repository.
    """
    #ptt_class_name = 'Soft_Job'
    index_url = 'https://www.ptt.cc/bbs/' + ptt_class_name + '/index.html'
    res = requests.get(index_url,verify = True)
    soup3 = BeautifulSoup(res.text, "lxml")
    # Navigation buttons carry class "btn wide"; filter by button text.
    x = soup3('',{'class':"btn wide"},text = re.compile('??'))
    last_index = x[0]['href']
    # href looks like '/bbs/<board>/index<N>.html'; strip down to N.
    last_index = last_index.replace('/bbs/' + ptt_class_name + '/index','')
    last_index = int( last_index.replace('.html','') )+1
    return last_index
#---------------------------------------------------------------------------------
# NOTE: the original comments below were mis-encoded; readable fragments
# indicate: runs on a schedule via ubuntu `crontab -e` to collect data;
# crawls PTT board pages, using the index above to fetch the most recent
# posts' data.
def get_iface_from_addr(addr):
    """Work out on which interface the provided address is configured.

    Scans every interface/address family known to netifaces; strips the
    '%<scope>' suffix from link-local addresses before comparing.
    Raises Exception when no interface carries *addr*.
    """
    # Hoisted out of the loops: the pattern is loop-invariant, so compile
    # it once instead of on every address visited.
    ll_key = re.compile("(.+)%.*")
    for iface in netifaces.interfaces():
        addresses = netifaces.ifaddresses(iface)
        for inet_type in addresses:
            for _addr in addresses[inet_type]:
                _addr = _addr['addr']
                # link local: drop the '%scope' suffix (e.g. 'fe80::1%eth0')
                raw = ll_key.match(_addr)
                if raw:
                    _addr = raw.group(1)
                if _addr == addr:
                    log("Address '%s' is configured on iface '%s'" %
                        (addr, iface))
                    return iface
    msg = "Unable to infer net iface on which '%s' is configured" % (addr)
    raise Exception(msg)