def execute_command(self, command, content=None, cwd=None):
pipe = subprocess.Popen(command, shell=True, cwd=cwd,
stdout=subprocess.PIPE, stdin=subprocess.PIPE,
stderr=subprocess.PIPE)
if content:
content = smart_bytes(content)
stdout, stderr = pipe.communicate(content)
if isinstance(stderr, bytes):
try:
stderr = stderr.decode()
except UnicodeDecodeError:
pass
if stderr.strip():
raise CompilerError(stderr, error_output=stderr)
if self.verbose:
print(stderr)
if pipe.returncode != 0:
msg = "Command '{0}' returned non-zero exit status {1}".format(command, pipe.returncode)
raise CompilerError(msg, error_output=msg)
return stdout
python类smart_bytes()的实例源码
def to_python(self, value):
if value is None:
return None
if isinstance(value, oauth2client.client.Credentials):
return value
return pickle.loads(base64.b64decode(smart_bytes(value)))
def default_username_algo(email):
# store the username as a base64 encoded sha1 of the email address
# this protects against data leakage because usernames are often
# treated as public identifiers (so we can't use the email address).
return base64.urlsafe_b64encode(
hashlib.sha1(smart_bytes(email)).digest()
).rstrip(b'=')
def to_python(self, value):
"""Overrides ``models.Field`` method. This is used to convert
bytes (from serialization etc) to an instance of this class"""
if value is None:
return None
elif isinstance(value, oauth2client.client.Credentials):
return value
else:
try:
return jsonpickle.decode(
base64.b64decode(encoding.smart_bytes(value)).decode())
except ValueError:
return pickle.loads(
base64.b64decode(encoding.smart_bytes(value)))
def to_python(self, value):
"""Overrides ``models.Field`` method. This is used to convert
bytes (from serialization etc) to an instance of this class"""
if value is None:
return None
elif isinstance(value, oauth2client.client.Credentials):
return value
else:
try:
return jsonpickle.decode(
base64.b64decode(encoding.smart_bytes(value)).decode())
except ValueError:
return pickle.loads(
base64.b64decode(encoding.smart_bytes(value)))
def to_python(self, value):
"""Overrides ``models.Field`` method. This is used to convert
bytes (from serialization etc) to an instance of this class"""
if value is None:
return None
elif isinstance(value, oauth2client.client.Credentials):
return value
else:
try:
return jsonpickle.decode(
base64.b64decode(encoding.smart_bytes(value)).decode())
except ValueError:
return pickle.loads(
base64.b64decode(encoding.smart_bytes(value)))
def decrypt_pdf(src):
decrypted = TemporaryFile()
popen = subprocess.Popen(['qpdf', '--decrypt', '/dev/stdin', '-'],
stdin=src, stdout=decrypted, stderr=subprocess.PIPE)
stdout, stderr = popen.communicate()
if popen.returncode in (0, 3): # 0 == ok, 3 == warning
if popen.returncode == 3:
logger.warn('qpdf warning:\n%s', smart_bytes(stderr, errors='backslashreplace'))
else:
from ecs.users.utils import get_current_user
user = get_current_user()
logger.warn('qpdf error (returncode=%s):\nUser: %s (%s)\n%s', popen.returncode, user, user.email if user else 'anonymous', smart_bytes(stderr, errors='backslashreplace'))
raise ValueError('pdf broken')
decrypted.seek(0)
return decrypted
def to_python(self, value):
if value is None:
return None
if isinstance(value, oauth2client.client.Credentials):
return value
return pickle.loads(base64.b64decode(smart_bytes(value)))
def render_style(self, style_path, force=False, **kwargs):
"""
Renders the CSS template at *style_path* using *kwargs* and returns the
path to the rendered result. If the given style has already been
rendered the existing cache path will be returned.
If *force* is ``True`` the stylesheet will be rendered even if it
already exists (cached).
This method also cleans up older versions of the same rendered template.
"""
cache_dir = self.settings['cache_dir']
if not isinstance(cache_dir, str):
cache_dir = cache_dir.decode('utf-8')
if not isinstance(style_path, str):
style_path = style_path.decode('utf-8')
mtime = os.stat(style_path).st_mtime
shortened_path = short_hash(style_path)
rendered_filename = 'rendered_%s_%s' % (shortened_path, int(mtime))
rendered_path = os.path.join(cache_dir, rendered_filename)
if not os.path.exists(rendered_path) or force:
style_css = self.render_string(
style_path,
**kwargs
)
# NOTE: Tornado templates are always rendered as bytes. That is why
# we're using 'wb' below...
with io.open(rendered_path, 'wb') as f:
f.write(smart_bytes(style_css))
# Remove older versions of the rendered template if present
for fname in os.listdir(cache_dir):
if fname == rendered_filename:
continue
elif shortened_path in fname:
# Older version present.
# Remove it (and it's minified counterpart).
os.remove(os.path.join(cache_dir, fname))
return rendered_path
def to_python(self, value):
if value is None:
return None
if isinstance(value, oauth2client.client.Credentials):
return value
return pickle.loads(base64.b64decode(smart_bytes(value)))
def get_hexdigest(plaintext, length=None):
digest = hashlib.md5(smart_bytes(plaintext)).hexdigest()
if length:
return digest[:length]
return digest
def get_precompiler_cachekey(command, contents):
return hashlib.sha1(smart_bytes('precompiler.%s.%s' % (command, contents))).hexdigest()
def to_python(self, value):
if value is None:
return None
if isinstance(value, oauth2client.client.Credentials):
return value
return pickle.loads(base64.b64decode(smart_bytes(value)))
def to_python(self, value):
if value is None:
return None
if isinstance(value, oauth2client.client.Credentials):
return value
return pickle.loads(base64.b64decode(smart_bytes(value)))
def to_python(self, value):
if value is None:
return None
if isinstance(value, oauth2client.client.Credentials):
return value
return pickle.loads(base64.b64decode(smart_bytes(value)))
def verify_token(self, token, **kwargs):
"""Validate the token signature."""
nonce = kwargs.get('nonce')
if self.OIDC_RP_SIGN_ALGO.startswith('RS'):
key = self.OIDC_RP_IDP_SIGN_KEY
else:
key = self.OIDC_RP_CLIENT_SECRET
# Verify the token
verified_token = self._verify_jws(
force_bytes(token),
# Use smart_bytes here since the key string comes from settings.
smart_bytes(key),
)
# The 'verified_token' will always be a byte string since it's
# the result of base64.urlsafe_b64decode().
# The payload is always the result of base64.urlsafe_b64decode().
# In Python 3 and 2, that's always a byte string.
# In Python3.6, the json.loads() function can accept a byte string
# as it will automagically decode it to a unicode string before
# deserializing https://bugs.python.org/issue17909
token_nonce = json.loads(verified_token.decode('utf-8')).get('nonce')
if import_from_settings('OIDC_USE_NONCE', True) and nonce != token_nonce:
msg = 'JWT Nonce verification failed.'
raise SuspiciousOperation(msg)
return True
def to_python(self, value):
"""Overrides ``models.Field`` method. This is used to convert
bytes (from serialization etc) to an instance of this class"""
if value is None:
return None
elif isinstance(value, oauth2client.client.Credentials):
return value
else:
return pickle.loads(base64.b64decode(encoding.smart_bytes(value)))
def to_python(self, value):
"""Overrides ``models.Field`` method. This is used to convert
bytes (from serialization etc) to an instance of this class"""
if value is None:
return None
elif isinstance(value, oauth2client.client.Credentials):
return value
else:
try:
return jsonpickle.decode(
base64.b64decode(encoding.smart_bytes(value)).decode())
except ValueError:
return pickle.loads(
base64.b64decode(encoding.smart_bytes(value)))
def generate_sha1(string, salt=None):
if not isinstance(string, (str, text_type)):
string = str(string)
if not salt:
salt = sha1(str(random.random()).encode('utf-8')).hexdigest()[:5]
salted_bytes = (smart_bytes(salt) + smart_bytes(string))
hash_ = sha1(salted_bytes).hexdigest()
return salt, hash_
def to_python(self, value):
if value is None:
return None
if isinstance(value, oauth2client.client.Credentials):
return value
return pickle.loads(base64.b64decode(smart_bytes(value)))
def dumps(self, value):
return force_bytes(json.dumps(value))
def to_python(self, value):
if value is None:
return None
if isinstance(value, oauth2client.client.Credentials):
return value
return pickle.loads(base64.b64decode(smart_bytes(value)))
def fill_template(template_name, context, output_format='odt'):
"""
Fill a document with data and convert it to the requested format.
Returns an absolute path to the generated file.
"""
if not isinstance(context, Context):
context = Context(context)
context['output_format'] = output_format
source_file = find_template_file(template_name)
source_extension = os.path.splitext(source_file)[1]
source = zipfile.ZipFile(source_file, 'r')
dest_file = NamedTemporaryFile(delete=False, suffix=source_extension)
dest = zipfile.ZipFile(dest_file, 'w')
manifest_data = ''
for name in source.namelist():
data = source.read(name)
if name.endswith('.xml'):
data = smart_str(data)
if any(name.endswith(file) for file in ('content.xml', 'styles.xml')):
template = Template(fix_inline_tags(data))
data = template.render(context)
elif name == 'META-INF/manifest.xml':
manifest_data = data[:-20] # Cut off the closing </manifest> tag
continue # We will append it at the very end
dest.writestr(name, smart_bytes(data))
for _, image in context.dicts[0].get(IMAGES_CONTEXT_KEY, {}).items():
filename = os.path.basename(image.name)
ext = os.path.splitext(filename)[1][1:]
manifest_data += ('<manifest:file-entry '
'manifest:media-type="image/%(ext)s" '
'manifest:full-path="Pictures/%(filename)s"/>\n'
) % locals()
image.open()
dest.writestr('Pictures/%s' % filename, image.read())
image.close()
manifest_data += '</manifest:manifest>'
dest.writestr('META-INF/manifest.xml', manifest_data)
source.close()
dest.close()
if source_extension[1:] != output_format:
results = Queue()
convertor = Process(target=_convert_subprocess,
args=(str(dest_file.name), output_format, results))
convertor.start()
return results.get()
else:
return dest_file.name