@classmethod
def setUpClass(cls):
    super(TestMJMLTCPServer, cls).setUpClass()
root_dir = os.path.dirname(settings.BASE_DIR)
tcpserver_path = os.path.join(root_dir, 'mjml', 'node', 'tcpserver.js')
env = os.environ.copy()
env['NODE_PATH'] = root_dir
    # start one Node TCP server per configured (host, port) pair
    for host, port in mjml_settings.MJML_TCPSERVERS:
        p = subprocess.Popen(['node', tcpserver_path, str(port), host],
                             stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env=env)
        cls.processes.append(p)
    time.sleep(5)  # give the servers a moment to start listening
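
The spawned node servers should be reaped when the test class finishes; a minimal tearDownClass sketch, assuming cls.processes is the class-level list used above (not from the original source):

@classmethod
def tearDownClass(cls):
    # terminate every node TCP server spawned in setUpClass
    for p in cls.processes:
        p.terminate()
        p.wait()
    super(TestMJMLTCPServer, cls).tearDownClass()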
def delete(self, request, *args, **kwargs):
"""
This does not actually delete the file, only the database record. But
that is easy to implement.
"""
self.object = self.get_object()
individual_id = self.object.id
if self.object.user:
username = self.object.user.username
else:
username = 'public'
    # delete the files stored for this individual
if self.object.vcf_file:
self.object.vcf_file.delete()
# if self.object.strs_file:
# self.object.strs_file.delete()
# if self.object.cnvs_file:
# self.object.cnvs_file.delete()
os.system('rm -rf %s/genomes/%s/%s' % (settings.BASE_DIR, username, individual_id))
self.object.delete()
# response = JSONResponse(True, {}, response_mimetype(self.request))
# response['Content-Disposition'] = 'inline; filename=files.json'
# return response
    messages.add_message(request, messages.INFO, "Individual deleted successfully!")
return redirect('individuals_list')
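
The rm -rf shell call above works but is fragile; shutil.rmtree does the same job without spawning a shell. A minimal sketch using the names from this view, as a behavioural equivalent rather than the project's actual code:

import os
import shutil

genome_dir = os.path.join(settings.BASE_DIR, 'genomes', username, str(individual_id))
# ignore_errors=True mirrors rm -rf: a missing directory is not an error
shutil.rmtree(genome_dir, ignore_errors=True)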
def clean_individuals():
print("Running periodic task!")
individuals = Individual.objects.filter(user=None)
for individual in individuals:
        time_difference = datetime.datetime.now() - individual.creation_date
        if time_difference.days > 0:
            # delete unclaimed public individuals older than one day
            os.system('rm -rf %s/genomes/public/%s' % (settings.BASE_DIR, individual.id))
individual.delete()
def get_upload_path(self, filename):
    if self.user is not None:
        string = "%s/genomes/%s/%s/%s" % (settings.BASE_DIR, slugify(self.user.username), self.id, filename)#.replace(' ', '_')
    else:
        string = "%s/genomes/public/%s/%s" % (settings.BASE_DIR, self.id, filename)#.replace(' ', '_')
    print('string', string)
    return string
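
Django calls an upload_to callable with (instance, filename), which is exactly this method's signature, so it can be attached to a FileField directly. A hedged sketch; the model and field names are assumptions, not the original project's code:

from django.contrib.auth.models import User
from django.db import models

class Individual(models.Model):
    user = models.ForeignKey(User, null=True, blank=True, on_delete=models.CASCADE)

    # ... get_upload_path(self, filename) as defined above ...

    # inside the class body, the method object is passed straight to upload_to;
    # Django invokes it as get_upload_path(instance, filename)
    vcf_file = models.FileField(upload_to=get_upload_path, max_length=1000, null=True)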
def download_files(self):
print('Download Files')
file_list = open('%s/data/files/all_files.txt' % (settings.BASE_DIR), 'w')
s3credentials = S3Credential.objects.all()
for s3credential in s3credentials:
print(s3credential.name)
for bucket_name in s3credential.buckets.splitlines():
session = boto3.Session(
aws_access_key_id=s3credential.access_key,
aws_secret_access_key=s3credential.secret_key
)
s3 = session.resource('s3')
bucket = s3.Bucket(bucket_name)
print(bucket)
for key in bucket.objects.all():
if key.size != 0:
file = [str(key.last_modified), str(key.size), bucket.name, key.key]
                    file_list.writelines('%s\n' % ('\t'.join(file)))
    file_list.close()  # flush the inventory to disk before reporting success
    self.stdout.write(self.style.SUCCESS('Successfully downloaded files!'))
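
The self.stdout.write(self.style.SUCCESS(...)) call marks this as a method on a Django management command; a hedged sketch of the surrounding class (the method split is an assumption):

from django.core.management.base import BaseCommand

class Command(BaseCommand):
    help = 'Write an inventory of all S3 objects for every stored credential'

    def handle(self, *args, **options):
        self.download_files()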
def django_db_setup():
    settings.DATABASES['default']['NAME'] = os.path.join(
        settings.BASE_DIR, 'db.sqlite3')
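
This is the usual pytest-django override of the django_db_setup fixture; with its decorator and imports restored it would look roughly like this (the session scope matches pytest-django's default, but is an assumption here):

import os
import pytest
from django.conf import settings

@pytest.fixture(scope='session')
def django_db_setup():
    # point the test run at a file-based sqlite database next to the project
    settings.DATABASES['default']['NAME'] = os.path.join(
        settings.BASE_DIR, 'db.sqlite3')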
def handle(self, *args, **options):
call_subprocess('./node_modules/.bin/webpack --config webpack.config.js')
for each in settings.WEB_PACK_FILES:
directory = settings.BASE_DIR + '/static/webpack_bundles/'
        css_file = max([os.path.join(directory, d) for d in os.listdir(directory)
                        if d.startswith(each['webpack_js']) and d.endswith('css')],
                       key=os.path.getmtime)
        js_file = max([os.path.join(directory, d) for d in os.listdir(directory)
                       if d.startswith(each['webpack_js']) and d.endswith('js')],
                      key=os.path.getmtime)
if settings.ENABLE_DJANGO_WEBPACK_S3_STORAGES:
upload_to_s3(css_file)
upload_to_s3(js_file)
import re
regex = r'(.*?<link rel="stylesheet" type="text/css" href=")(.*?)(" id="packer_css"/>.*?<script id="packer_js" src=")(.*?)(" type="text/javascript"></script>.*)'
with open(each['html_file_name'], 'r+') as f:
content = f.read()
m = re.match(regex, content, re.DOTALL)
href = settings.STATIC_URL + css_file.split('/static/')[-1]
src = settings.STATIC_URL + js_file.split('/static/')[-1]
content = m.group(1) + href + m.group(3) + src + m.group(5)
with open(each['html_file_name'], 'w') as f:
f.write(content)
result = {'message': "Successfully Created Compressed CSS, JS Files"}
return json.dumps(result)
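
The command above only makes sense with matching settings; a hedged sketch of their shape, inferred from the keys the code reads ('webpack_js', 'html_file_name') with placeholder values:

WEB_PACK_FILES = [
    {
        'webpack_js': 'main',                     # bundle filename prefix
        'html_file_name': 'templates/base.html',  # template carrying the packer_css/packer_js tags
    },
]
ENABLE_DJANGO_WEBPACK_S3_STORAGES = False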
def get_next_migration_filename(app_name, connection=None, migration_type='data'):
'''
Return name (including the absolute path) of the next migration to insert for this app
'''
latest_migration_name = get_latest_migration(app_name)
next_migration_name = '{0:04d}_i18n_{1}_migration.py'.format(
int(latest_migration_name[0:4]) + 1,
migration_type
)
app_path = os.path.join(*apps.get_app_config(app_name).name.split('.'))
return os.path.join(settings.BASE_DIR, app_path, 'migrations', next_migration_name)
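
A usage sketch with a hypothetical app name and migration history:

# given migrations up to 0006 in a hypothetical 'blog' app:
#   get_next_migration_filename('blog', migration_type='data')
#   -> '<BASE_DIR>/blog/migrations/0007_i18n_data_migration.py'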
def webpack_dev_server(config_path=None):
config_path = config_path or 'webpack.config.js'
with open(config_path, 'r') as f:
config = f.read()
munged = get_munged_config(config)
    handle, name = tempfile.mkstemp(prefix='webpack-config')
    os.close(handle)  # close the low-level fd; the path is re-opened below
with open(name, 'w') as f:
f.write(munged)
result = subprocess.run(['npm', 'bin'], stdout=subprocess.PIPE)
bin_path = result.stdout.decode().rstrip()
dev_server_path = os.path.join(bin_path, 'webpack-dev-server')
args = [dev_server_path, '--config', name, '--hot']
    return subprocess.Popen(
        args,
        cwd=settings.BASE_DIR,
        stdout=subprocess.PIPE,
        # merge with the current environment so PATH stays visible to node
        env=dict(os.environ,
                 NODE_PATH=os.path.join(settings.BASE_DIR, 'node_modules'))
    )
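
The caller owns the returned Popen handle; a usage sketch:

proc = webpack_dev_server()
try:
    proc.wait()                # block while the dev server runs
except KeyboardInterrupt:
    proc.terminate()           # forward Ctrl-C to the node process
    proc.wait()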
def contribute_json(request):
    """The advantage of having our own custom view over using
    django.views.static.serve is that we get the right content-type,
    and as a view we can write a unit test that checks that the JSON
    is valid and can be deserialized."""
with open(os.path.join(settings.BASE_DIR, 'contribute.json')) as f:
contribute_json_dict = json.load(f)
return http.JsonResponse(
contribute_json_dict,
json_dumps_params={'indent': 3}
)
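
The docstring's promised unit test could look roughly like this; the pytest-django client fixture, URL, and assertion are assumptions:

def test_contribute_json(client):
    response = client.get('/contribute.json')
    assert response.status_code == 200
    # deserializes cleanly and carries the spec's required 'name' key
    assert response.json()['name']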
def current_versions(request):
"""return a JSON dict of a selection of keys and their versions
"""
context = {
'versions': []
}
with connection.cursor() as cursor:
cursor.execute('select version()')
row = cursor.fetchone()
value, = row
context['versions'].append({
'key': 'PostgreSQL',
'value': value.split(' on ')[0].replace('PostgreSQL', '').strip()
})
context['versions'].append({
'key': 'Tecken',
'value': dockerflow_get_version(settings.BASE_DIR)
})
context['versions'].append({
'key': 'Django',
'value': get_version(),
})
redis_store_info = get_redis_connection('store').info()
context['versions'].append({
'key': 'Redis Store',
'value': redis_store_info['redis_version']
})
try:
redis_cache_info = get_redis_connection('default').info()
except NotImplementedError:
redis_cache_info = {'redis_version': 'fakeredis'}
context['versions'].append({
'key': 'Redis Cache',
'value': redis_cache_info['redis_version']
})
context['versions'].sort(key=lambda x: x['key'])
return http.JsonResponse(context)
def get_python_rpc_source_path():
return os.path.join(settings.BASE_DIR, "tasks", "rpc.py")
def get_markdown_directory(self):
return os.path.join(
settings.BASE_DIR,
'apps',
'api',
'documentation',
)
def get_markdown_directory(self):
return os.path.join(
settings.BASE_DIR,
'apps',
'staticpages',
'pages',
)
def get_default_text(file_name):
    TEMPLATE_DIR = os.path.join(settings.BASE_DIR, 'templates')
    with open(os.path.join(TEMPLATE_DIR, file_name), 'r') as template:
        output = join_as_compacted_paragraphs(template.readlines())
    return output
# Model Fields
def create_states(apps, schema_editor):
State = apps.get_model('core', 'State')
fixture_file = ('InternetSemLimites', 'core', 'fixtures', 'states.csv')
fixture_path = path.join(settings.BASE_DIR, *fixture_file)
with open(fixture_path, encoding='utf-8') as fh:
for line in reader(fh):
State.objects.create(name=line[1], abbr=line[0])
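
A hedged sketch of how a data migration typically wires this function in; the dependency is a placeholder:

from django.db import migrations

class Migration(migrations.Migration):

    dependencies = [
        ('core', '0001_initial'),  # placeholder dependency
    ]

    operations = [
        migrations.RunPython(create_states),
    ]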
def remove_media_archive(self):
    filename = safe_join(settings.BASE_DIR, "media.zip")
    if os.path.exists(filename):
        print("Removing old media archive ...")
        os.remove(filename)
def zipdir(self):
shutil.make_archive("media", 'zip', safe_join(
settings.BASE_DIR, "media"))
# 0016_load_country_income_thresholds.py (project: micromasters, author: mitodl)
def get_country_income_thresholds_data():
"""
Returns a list of dictionaries of data imported from financialaid/fixtures/country_income_threshold_data.json.
"""
fixture_path = os.path.join(settings.BASE_DIR, "financialaid/fixtures/country_income_threshold_data.json")
with open(fixture_path, "r") as f:
country_data = json.loads(f.read())
return [
{
"country_code": country["fields"]["country_code"],
"income_threshold": DEFAULT_INCOME_THRESHOLD
}
for country in country_data
]
def test_exam_read_no_shows(self):
"""Test that a typical no-show result from Perason does not result in any errors"""
test_file_path = '{}/exams/pearson/test_resources/noshow.dat'.format(settings.BASE_DIR)
reader = EXAMReader()
with open(test_file_path, 'r') as test_file:
results = reader.read(test_file)
# Assert that there are no error messages in the results tuple
assert len(results[1]) == 0