def setUpModule():
    """
    Module-level fixture hook invoked by the test runner.

    Does nothing when ``django`` is falsy. Otherwise it snapshots
    ``sys.path``, ``sys.modules`` and ``os.environ`` into the module-global
    ``context`` and, when ``init_django()`` returns a truthy value, sets up
    the test environment, creates the test database (its name is stored
    under ``context['DB_NAME']``) and binds ``storage`` to a
    ``FileSystemStorage`` rooted in a fresh temporary directory.
    """
    global context, models, storage

    if not django:
        return

    # Copies of the interpreter state as it is before Django is initialised.
    snapshot = {}
    snapshot['sys.path'] = list(sys.path)
    snapshot['sys.modules'] = sys.modules.copy()
    snapshot['os.environ'] = os.environ.copy()
    context = snapshot

    if not init_django():
        return

    # Imported only after init_django() has succeeded.
    from django.core.files.storage import FileSystemStorage
    from django_app.adapters import models  # noqa

    setup_test_environment()
    context['DB_NAME'] = create_test_db(verbosity=0, autoclobber=True)
    storage = FileSystemStorage(mkdtemp())
# Example source code using Python's FileSystemStorage() class
def upload_avatar(request):
    """
    Store an uploaded avatar image for the current user.

    On a POST carrying a ``user_avatar`` file whose extension is .jpg,
    .jpeg or .png, the file is saved as ``<username>.jpg`` (replacing any
    existing file of that name) and the avatar value cached in the session
    is refreshed. Always redirects to ``account_settings``.
    """
    allowed_extensions = ['.jpg', '.jpeg', '.png']
    account_owner = request.user
    upload = request.FILES.get('user_avatar', None)

    if request.method == 'POST' and upload:
        extension = os.path.splitext(upload.name)[1].lower()
        if extension in allowed_extensions:
            target_name = account_owner.username + '.jpg'
            avatar_store = FileSystemStorage()
            # Drop the previous avatar so the new one keeps the same name.
            if avatar_store.exists(target_name):
                avatar_store.delete(target_name)
            avatar_store.save(target_name, upload)

            account = Account.get_by_user(user=account_owner)
            request.session['user_avatar'] = account.user_avatar
            request.session.save()

    return redirect('account_settings')
def post(self, request):
    """
    Create a question from the submitted form.

    An invalid form re-renders ``question/ask.html`` with the form and all
    topics. A valid form creates a ``Question`` (with an optional uploaded
    picture), links it to the requesting user's profile and to the topic
    given by the ``topic`` POST field, saves it and redirects to "/".
    """
    form = self.form_class(request.POST)

    if not form.is_valid():
        topics = Topic.objects.all()
        return render(request, "question/ask.html", {"form": form, "topics": topics})

    question = Question()
    question.content = form.cleaned_data['content']

    uploaded_picture = request.FILES.get('picture')
    if uploaded_picture:
        picture_store = FileSystemStorage()
        question.picture = picture_store.save('questions/question.jpg', uploaded_picture)

    question.profile = Profile.objects.get(user=request.user)
    topic_id = int(request.POST.get('topic'))
    question.topic = Topic.objects.get(id=topic_id)
    question.save()
    return redirect("/")
def post(self, request, id):
    """
    Create an answer to the question identified by ``id``.

    An invalid form re-renders ``question/answer.html`` with the form and
    the question. A valid form creates an ``Answer`` owned by the
    requesting user's profile (with an optional uploaded picture), saves it
    and redirects back to the question page.
    """
    question = Question.objects.get(id=id)
    form = self.form_class(request.POST)

    if not form.is_valid():
        return render(request, "question/answer.html", {
            "form": form, "question": question
        })

    author = Profile.objects.get(user=request.user)
    answer = Answer(profile=author)
    answer.content = form.cleaned_data['content']

    uploaded_picture = request.FILES.get('picture')
    if uploaded_picture:
        picture_store = FileSystemStorage()
        answer.picture = picture_store.save('answers/answer.jpg', uploaded_picture)

    answer.question = question
    answer.save()
    return redirect('question:question', id=id)
def import_dataset(request):
    """
    Start a dataset import in a separate process from the POSTed parameters.

    Responds with 403 when ``DocumentStorer.exists`` reports the target
    already exists. For formats other than postgres, mongodb and elastic,
    an uploaded ``file`` is stored in the import directory; if neither a
    file nor a ``url`` parameter is present the response body is 'failed'.
    Otherwise the import process is started and an empty response returned.
    """
    # List-valued POST entries are collapsed to their first element.
    parameters = {}
    for key, value in request.POST.items():
        parameters[key] = value[0] if isinstance(value, list) else value

    parameters['directory'] = prepare_import_directory(IMPORTER_DIRECTORY)
    parameters['elastic_url'] = es_url

    if DocumentStorer.exists(**parameters):
        return HttpResponse('Index and mapping exist', status=403)

    database_formats = {'postgres', 'mongodb', 'elastic'}
    if parameters['format'] not in database_formats:
        if 'file' in request.FILES:
            upload = request.FILES['file']
            upload_store = FileSystemStorage(location=parameters['directory'])
            stored_name = upload_store.save(upload.name, upload)
            parameters['file_path'] = upload_store.path(stored_name)
        elif 'url' not in parameters:
            return HttpResponse('failed')

    worker = Process(target=_import_dataset, args=(parameters,))
    worker.start()
    return HttpResponse()
def post(self, request, *args, **kwargs):
    """
    Store an uploaded file and queue an asynchronous scan of it.

    The upload goes under ``media/AnonymousUser/`` or
    ``media/user/<user>/`` depending on the requesting user. A scan id is
    created, the scan task is queued with the stored file's path, and the
    response redirects to ``/resultscan/<scan_id>``.
    """
    form = self.form_class(request.POST, request.FILES)
    if not form.is_valid():
        # No response is produced for an invalid form.
        return None

    requester = str(request.user)
    if requester == 'AnonymousUser':
        user = None
        path = 'media/AnonymousUser/'
    else:
        user = request.user
        path = 'media/user/' + requester + '/'

    # Make sure the target directory exists before saving into it.
    subprocess.call(['mkdir', '-p', path])

    upload = request.FILES['upload_from_local']
    upload_store = FileSystemStorage(path)
    filename = upload_store.save(upload.name, upload)

    path = path + str(filename)
    scan_directory = filename
    url = upload_store.url(filename)
    scan_start_time = timezone.now()

    scan_id = create_scan_id(user, url, scan_directory, scan_start_time)
    apply_scan_async.delay(path, scan_id)
    return HttpResponseRedirect('/resultscan/' + str(scan_id))
def upload_file(request):
    """
    Handle an audio upload and render the feature page for it.

    On a POST carrying a ``music-input`` file, the file is saved under
    ``analyser/uploaded_files/``, its tags are read with
    ``id3tags.get_tags``, ``convert_to_wav`` is called on it, and
    ``analyser/feature_home.html`` is rendered with the stored file name
    and the artist/title/album tags. Any other request (including a POST
    without the file) renders ``analyser/index.html``.
    """
    # TODO: Validate form on client side
    # .get() avoids a MultiValueDictKeyError when the POST has no file.
    audio_file = request.FILES.get('music-input')
    if request.method == 'POST' and audio_file:
        fs = FileSystemStorage()
        filename = fs.save('analyser/uploaded_files/' + audio_file.name, audio_file)
        logging.error('FILENAME:%s', filename)
        tags = id3tags.get_tags(filename)
        convert_to_wav(filename)
        return render(request, 'analyser/feature_home.html', {
            'uploaded_filename': filename,
            'show_loading_animation': True,
            'artist': tags['artist'],
            'title': tags['title'],
            'album': tags['album'],
        })
    return render(request, 'analyser/index.html')
def __init__(self, apps=None, *args, **kwargs):
    """
    Collect the static file locations listed in ``STATICFILES_DIRS``.

    Each entry is either a root path or a ``(prefix, root)`` pair.
    Duplicates are dropped, and one ``FileSystemStorage`` per root is kept
    in ``self.storages`` with its ``prefix`` attribute set.

    Raises ``ImproperlyConfigured`` when ``STATICFILES_DIRS`` is not a
    list or tuple, or when it contains ``STATIC_ROOT``.
    """
    # List of locations with static files
    self.locations = []
    # Maps dir paths to an appropriate storage instance
    self.storages = SortedDict()

    static_dirs = settings.STATICFILES_DIRS
    if not isinstance(static_dirs, (list, tuple)):
        raise ImproperlyConfigured(
            "Your STATICFILES_DIRS setting is not a tuple or list; "
            "perhaps you forgot a trailing comma?")

    for entry in static_dirs:
        # Bare paths get an empty prefix.
        prefix, root = entry if isinstance(entry, (list, tuple)) else ('', entry)
        if os.path.abspath(settings.STATIC_ROOT) == os.path.abspath(root):
            raise ImproperlyConfigured(
                "The STATICFILES_DIRS setting should "
                "not contain the STATIC_ROOT setting")
        location = (prefix, root)
        if location not in self.locations:
            self.locations.append(location)

    for prefix, root in self.locations:
        root_storage = FileSystemStorage(location=root)
        root_storage.prefix = prefix
        self.storages[root] = root_storage

    super(FileSystemFinder, self).__init__(*args, **kwargs)
def post(self, request, *args, **kwargs):
    """
    Store an uploaded log file and bulk-insert its parsed lines.

    The ``myfile`` upload is saved, then read back line by line. Every
    line for which ``parse_line`` returns tokens becomes a ``ParsedLog``
    owned by the requesting user, and all of them are inserted in one
    ``bulk_create`` call. ``account/main.html`` is rendered with a status
    message in the ``error`` context key.
    """
    myfile = request.FILES.get('myfile')
    if myfile is None:
        # A POST without the file would otherwise raise
        # MultiValueDictKeyError on request.FILES['myfile'].
        return render(request, 'account/main.html', {'error': "No file was uploaded"})

    fs = FileSystemStorage()
    fs.save(myfile.name, myfile)

    # Saving consumed the upload; rewind before reading it again.
    myfile.seek(0)
    parsed_logs = []
    # Create ParsedLog object for each line in log file
    for line in myfile.read().splitlines():
        tokens = parse_line(line)
        # parse_line returns None if regex fails to match
        if tokens is not None:
            parsed_logs.append(ParsedLog(
                owner=request.user,
                ip_address=tokens[0],
                rfc_id=tokens[1],
                user_id=tokens[2],
                date_time=tokens[3],
                request_line=tokens[4],
                http_status=tokens[5],
                num_bytes=tokens[6],
            ))

    # Bulk insert into database
    ParsedLog.objects.bulk_create(parsed_logs)
    return render(request, 'account/main.html', {'error': "Upload Successful"})
def is_local_storage(self):
    """Return True when ``self.storage`` is a ``FileSystemStorage`` instance."""
    backend = self.storage
    return isinstance(backend, FileSystemStorage)
def __init__(self, location=None, base_url=None, *args, **kwargs):
    """
    Initialise the storage, defaulting to the static-files settings.

    ``location`` falls back to ``settings.STATIC_ROOT`` and ``base_url``
    to ``settings.STATIC_URL`` when passed as ``None``. ``base_url`` is
    checked with ``check_settings`` before the parent initialiser runs.
    """
    location = settings.STATIC_ROOT if location is None else location
    base_url = settings.STATIC_URL if base_url is None else base_url
    check_settings(base_url)
    super(StaticFilesStorage, self).__init__(
        location, base_url, *args, **kwargs)
    if location:
        return
    # FileSystemStorage falls back to MEDIA_ROOT when location
    # is empty, so we restore the empty value.
    self.base_location = None
    self.location = None
def is_local_storage(self):
    """Return True when ``self.storage`` is a ``FileSystemStorage`` instance."""
    backend = self.storage
    return isinstance(backend, FileSystemStorage)
def user_avatar(self):
    """
    Return the URL path of this user's avatar image.

    When a file named ``<username>.jpg`` exists in the default file
    storage, the matching ``/static/avatar/`` path is returned; otherwise
    the default profile image path is returned.
    """
    avatar_name = self.user.username + '.jpg'
    if FileSystemStorage().exists(avatar_name):
        return '/static/avatar/' + avatar_name
    return '/static/img/profile/default.jpg'
def is_local_storage(self):
    """Return True when ``self.storage`` is a ``FileSystemStorage`` instance."""
    backend = self.storage
    return isinstance(backend, FileSystemStorage)
def __init__(self, location=None, base_url=None, *args, **kwargs):
    """
    Initialise the storage, defaulting to the static-files settings.

    ``location`` falls back to ``settings.STATIC_ROOT`` and ``base_url``
    to ``settings.STATIC_URL`` when passed as ``None``. ``base_url`` is
    checked with ``check_settings`` before the parent initialiser runs.
    """
    location = settings.STATIC_ROOT if location is None else location
    base_url = settings.STATIC_URL if base_url is None else base_url
    check_settings(base_url)
    super(StaticFilesStorage, self).__init__(
        location, base_url, *args, **kwargs)
    if location:
        return
    # FileSystemStorage falls back to MEDIA_ROOT when location
    # is empty, so we restore the empty value.
    self.base_location = None
    self.location = None
def post(self, request):
    """
    Update the requesting user's profile from the submitted form.

    An invalid form re-renders ``account/edit.html``. A valid form copies
    the education, profession and employment fields onto the profile. If
    an ``avatar`` file was uploaded, it is stored in a temporary cache
    location, cropped to a centred square, saved as the profile avatar,
    and the cache file is removed. The profile is then saved and the
    response redirects to the user's profile page.
    """
    form = self.form_class(request.POST)
    if not form.is_valid():
        return render(request, "account/edit.html", {"form": form})

    profile = Profile.objects.get(user=request.user)
    profile.education = form.cleaned_data['education']
    profile.profession = form.cleaned_data['profession']
    profile.employment = form.cleaned_data['employment']

    avatar = request.FILES.get('avatar')
    if avatar is not None:
        fs = FileSystemStorage()
        source_file = fs.save('cache/avatar.jpg', avatar)
        # Ask the storage where the file lives instead of assuming the
        # process runs next to a 'media/' directory.
        source_path = fs.path(source_file)
        try:
            img = Image.open(source_path)
            width, height = img.size
            # Largest centred square that fits inside the image.
            if width >= height:
                upper_x = int((width / 2) - (height / 2))
                upper_y = 0
                lower_x = int((width / 2) + (height / 2))
                lower_y = height
            else:
                upper_x = 0
                upper_y = int((height / 2) - (width / 2))
                lower_x = width
                lower_y = int((height / 2) + (width / 2))
            box = (upper_x, upper_y, lower_x, lower_y)
            img = img.crop(box)
            img.save(source_path)
            # Close the handle once the storage has copied the file.
            with open(source_path, 'rb') as cropped_file:
                profile.avatar = fs.save('avatar/avatar.jpg', cropped_file)
        finally:
            # Remove the cache copy even when cropping or saving failed.
            fs.delete(source_file)

    profile.save()
    return redirect("account:profile", username=request.user.username)
def simple_upload(request):
    """
    Save an uploaded file and show its URL.

    On a POST carrying a ``myfile`` upload, the file is stored in the
    default file storage and ``core/simple_upload.html`` is rendered with
    ``uploaded_file_url``. Any other request (including a POST without the
    file) renders the same template without context.
    """
    # .get() avoids a MultiValueDictKeyError when the POST has no file.
    myfile = request.FILES.get('myfile')
    if request.method == 'POST' and myfile:
        fs = FileSystemStorage()
        filename = fs.save(myfile.name, myfile)
        uploaded_file_url = fs.url(filename)
        return render(request, 'core/simple_upload.html', {
            'uploaded_file_url': uploaded_file_url
        })
    return render(request, 'core/simple_upload.html')
def is_local_storage(self):
    """Return True when ``self.storage`` is a ``FileSystemStorage`` instance."""
    backend = self.storage
    return isinstance(backend, FileSystemStorage)
def __init__(self, location=None, base_url=None, *args, **kwargs):
    """
    Initialise the storage, defaulting to the static-files settings.

    ``location`` falls back to ``settings.STATIC_ROOT`` and ``base_url``
    to ``settings.STATIC_URL`` when passed as ``None``. ``base_url`` is
    checked with ``check_settings`` before the parent initialiser runs.
    """
    location = settings.STATIC_ROOT if location is None else location
    base_url = settings.STATIC_URL if base_url is None else base_url
    check_settings(base_url)
    super(StaticFilesStorage, self).__init__(
        location, base_url, *args, **kwargs)
    if location:
        return
    # FileSystemStorage falls back to MEDIA_ROOT when location
    # is empty, so we restore the empty value.
    self.base_location = None
    self.location = None
def test_post_signed_url_where_not_supported(monkeypatch):
    """``signed_url`` answers 404 "Not found" when the storage is a FileSystemStorage."""
    monkeypatch.setattr(views, 'default_storage', FileSystemStorage())

    request = HttpRequest()
    request.method = 'POST'
    request.POST = {'key': 'file.txt'}

    response = views.signed_url(request)
    body = response.content.decode('utf-8')

    assert response.status_code == 404
    payload = json.loads(body)
    assert payload['error'] == "Not found"
def __init__(self, app_names=None, *args, **kwargs):
    """
    Register the ``static`` directory of every module in ``self.dependencies``.

    Each dependency is imported and ``<module dir>/static`` is added to
    ``self.locations`` with an empty prefix. One ``FileSystemStorage`` per
    location is kept in ``self.storages`` with its ``prefix`` attribute
    set.
    """
    # import_module returns the named (sub)module itself, whereas
    # __import__('a.b') returns the top-level package 'a', which would
    # point at the wrong directory for dotted dependency names.
    from importlib import import_module

    self.locations = []
    self.storages = collections.OrderedDict()
    for dependency in self.dependencies:
        module = import_module(dependency)
        path = '{0}/static'.format(os.path.dirname(module.__file__))
        self.locations.append(('', path))
    for prefix, root in self.locations:
        filesystem_storage = FileSystemStorage(location=root)
        filesystem_storage.prefix = prefix
        self.storages[root] = filesystem_storage
    # Starts the initialiser chain after FileSystemFinder.
    super(FileSystemFinder, self).__init__(*args, **kwargs)
def is_local_storage(self):
    """Return True when ``self.storage`` is a ``FileSystemStorage`` instance."""
    backend = self.storage
    return isinstance(backend, FileSystemStorage)
def __init__(self, location=None, base_url=None, *args, **kwargs):
    """
    Initialise the storage, defaulting to the static-files settings.

    ``location`` falls back to ``settings.STATIC_ROOT`` and ``base_url``
    to ``settings.STATIC_URL`` when passed as ``None``. ``base_url`` is
    checked with ``check_settings`` before the parent initialiser runs.
    """
    location = settings.STATIC_ROOT if location is None else location
    base_url = settings.STATIC_URL if base_url is None else base_url
    check_settings(base_url)
    super(StaticFilesStorage, self).__init__(
        location, base_url, *args, **kwargs)
    if location:
        return
    # FileSystemStorage falls back to MEDIA_ROOT when location
    # is empty, so we restore the empty value.
    self.base_location = None
    self.location = None
def is_local_storage(self):
    """Return True when ``self.storage`` is a ``FileSystemStorage`` instance."""
    backend = self.storage
    return isinstance(backend, FileSystemStorage)
def __init__(self, location=None, base_url=None, *args, **kwargs):
    """
    Initialise the storage, defaulting to the static-files settings.

    ``location`` falls back to ``settings.STATIC_ROOT`` and ``base_url``
    to ``settings.STATIC_URL`` when passed as ``None``. ``base_url`` is
    checked with ``check_settings`` before the parent initialiser runs.
    """
    location = settings.STATIC_ROOT if location is None else location
    base_url = settings.STATIC_URL if base_url is None else base_url
    check_settings(base_url)
    super(StaticFilesStorage, self).__init__(
        location, base_url, *args, **kwargs)
    if location:
        return
    # FileSystemStorage falls back to MEDIA_ROOT when location
    # is empty, so we restore the empty value.
    self.base_location = None
    self.location = None
def export_download(request, username, id_string, export_type, filename):
    """
    Serve a previously generated export of a form.

    Looks up the owner, form and export record (404 when any is missing)
    and returns 403 when ``has_permission`` denies access. Google-doc and
    external exports with an ``export_url`` are redirected there. The
    download is recorded through ``audit_log``. When the default storage
    is not a ``FileSystemStorage`` the response redirects to the storage
    URL of the export; otherwise the file is returned via
    ``response_with_mimetype_and_name``.
    """
    owner = get_object_or_404(User, username__iexact=username)
    xform = get_object_or_404(XForm, id_string__exact=id_string, user=owner)
    helper_auth_helper(request)
    if not has_permission(xform, owner, request):
        return HttpResponseForbidden(_(u'Not shared.'))

    # find the export entry in the db
    export = get_object_or_404(Export, xform=xform, filename=filename)

    url_backed_types = (Export.GDOC_EXPORT, Export.EXTERNAL_EXPORT)
    if export_type in url_backed_types and export.export_url is not None:
        return HttpResponseRedirect(export.export_url)

    ext, mime_type = export_def_from_filename(export.filename)

    audit = {
        "xform": xform.id_string,
        "export_type": export.export_type
    }
    message_fields = {
        'export_type': export.export_type.upper(),
        'filename': export.filename,
        'id_string': xform.id_string,
    }
    message = _("Downloaded %(export_type)s export '%(filename)s' "
                "on '%(id_string)s'.") % message_fields
    audit_log(
        Actions.EXPORT_DOWNLOADED, request.user, owner,
        message, audit, request)

    if request.GET.get('raw'):
        # NOTE(review): id_string is not read again after this point.
        id_string = None

    default_storage = get_storage_class()()
    if not isinstance(default_storage, FileSystemStorage):
        # Non-local storages serve the file themselves.
        return HttpResponseRedirect(default_storage.url(export.filepath))

    basename = os.path.splitext(export.filename)[0]
    return response_with_mimetype_and_name(
        mime_type, name=basename, extension=ext,
        file_path=export.filepath, show_date=False)
def is_local_storage(self):
    """Return True when ``self.storage`` is a ``FileSystemStorage`` instance."""
    backend = self.storage
    return isinstance(backend, FileSystemStorage)
def __init__(self, location=None, base_url=None, *args, **kwargs):
    """
    Initialise the storage, defaulting to the static-files settings.

    ``location`` falls back to ``settings.STATIC_ROOT`` and ``base_url``
    to ``settings.STATIC_URL`` when passed as ``None``. ``base_url`` is
    checked with ``check_settings`` before the parent initialiser runs.
    """
    location = settings.STATIC_ROOT if location is None else location
    base_url = settings.STATIC_URL if base_url is None else base_url
    check_settings(base_url)
    super(StaticFilesStorage, self).__init__(
        location, base_url, *args, **kwargs)
    if location:
        return
    # FileSystemStorage falls back to MEDIA_ROOT when location
    # is empty, so we restore the empty value.
    self.base_location = None
    self.location = None
def is_local_storage(self):
    """Return True when ``self.storage`` is a ``FileSystemStorage`` instance."""
    backend = self.storage
    return isinstance(backend, FileSystemStorage)
def __init__(self, location=None, base_url=None, *args, **kwargs):
    """
    Initialise the storage, defaulting to the static-files settings.

    ``location`` falls back to ``settings.STATIC_ROOT`` and ``base_url``
    to ``settings.STATIC_URL`` when passed as ``None``. ``base_url`` is
    checked with ``check_settings`` before the parent initialiser runs.
    """
    location = settings.STATIC_ROOT if location is None else location
    base_url = settings.STATIC_URL if base_url is None else base_url
    check_settings(base_url)
    super(StaticFilesStorage, self).__init__(
        location, base_url, *args, **kwargs)
    if location:
        return
    # FileSystemStorage falls back to MEDIA_ROOT when location
    # is empty, so we restore the empty value.
    self.base_location = None
    self.location = None