def test_create(self):
todo_text = 'text'
todo_id = str(uuid.uuid1())
t1 = TodoModel(todo_id=todo_id,
text=todo_text,
created_at=datetime.now())
t1.save()
self.assertEquals(t1.text, todo_text)
self.assertEquals(t1.checked, False)
t2 = TodoModel.get(hash_key=todo_id)
self.assertEquals(t2.text, todo_text)
r = [result for result in TodoModel.scan()]
self.assertEquals(len(r), 1)
self.assertEquals(r[0].text, todo_text)
self.assertEquals(r[0].todo_id, todo_id)
t2.delete()
python类uuid1()的实例源码
test_todo_model_integration.py 文件源码
项目:serverless-pynamodb-rest-example
作者: helveticafire
项目源码
文件源码
阅读 24
收藏 0
点赞 0
评论 0
def test_obtain_netmiko_filename():
"""Test file name and that directory is created."""
create_dir_test = True
file_name_test = '/home/gituser/.netmiko/tmp/test_device.txt'
file_name = obtain_netmiko_filename('test_device')
assert file_name == file_name_test
if create_dir_test:
uuid_str = str(uuid.uuid1())
junk_dir_base = '/home/gituser/JUNK/netmiko'
junk_dir = '{}/{}'.format(junk_dir_base, uuid_str)
base_dir, full_dir = find_netmiko_dir()
print(base_dir)
# Move base_dir and recreate it
if os.path.isdir(base_dir) and os.path.isdir(junk_dir_base):
shutil.move(src=base_dir, dst=junk_dir)
assert os.path.exists(base_dir) == False
assert os.path.exists(full_dir) == False
file_name = obtain_netmiko_filename('test_device')
ensure_dir_exists(base_dir)
ensure_dir_exists(full_dir)
assert os.path.exists(base_dir) == True
assert os.path.exists(full_dir) == True
def generate_uuid(cls, return_hex=False, seed=None):
"""
Generate uuid
:param return_hex: Return in hex format
:param seed: Seed value to generate a consistent uuid
:return:
"""
if seed:
m = hashlib.md5()
m.update(seed.encode('utf-8'))
new_uuid = uuid.UUID(m.hexdigest())
else:
new_uuid = uuid.uuid1()
if return_hex:
return new_uuid.hex
return str(new_uuid)
def __init__(self):
self.access_key_id = 'LTAIjHdzLIPJXaIZ'
self.access_key_secret = '6cVfaC47jGhxUAmW3nt14kktGeqvSu'
self.server_address = 'https://sms.aliyuncs.com'
self.parameters = {
'Format': 'JSON',
'Version': '2016-09-27',
'AccessKeyId': self.access_key_id,
'SignatureVersion': '1.0',
'SignatureMethod': 'HMAC-SHA1',
'SignatureNonce': str(uuid.uuid1()),
'Timestamp': time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(time.time()))
}
self.user_params = {
'Action': 'SingleSendSms',
'ParamString': '',
'RecNum': '',
'SignName': '?????',
'TemplateCode': 'SMS_39185168',
}
def download(dest_path, url):
try:
file_name = url.split('/')[-1]
path = os.path.realpath(os.path.join(dest_path, unquote_plus(file_name)))
if not os.path.exists(path):
f = urlopen(url)
headers = f.headers['content-type'].split('/')
md = 'w'
if 'html' in headers:
file_name = '{}.html'.format(uuid.uuid1())
else:
md = 'wb'
with open(path, md) as local_file:
local_file.write(f.read())
if os.path.exists(path):
return path
else:
logger.info("Wasn't able to find the file....!")
return None
except Exception as error:
logger.error('download error %s', error)
def __init__(self, organisation_name, organisation_uuid, threat_level_id, published, info, date):
dt = datetime.datetime.now()
if not organisation_name or not organisation_uuid:
raise ValueError('Organisation Name and UUID must be set')
if not threat_level_id:
raise ValueError('Threat Level must be set')
if not info:
raise ValueError('Info must be set')
self.__Info = date.strftime("%Y%m%d ") + info
self.__PublishTimestamp = dt.strftime("%s")
self.__Timestamp = dt.strftime("%s")
self.__Analysis = 2
self.__Attribute = list()
self.__Tags = list()
self.__Published = published
self.__Orgc = {'name': organisation_name, 'uuid': organisation_uuid}
self.__Threat_Level_ID = threat_level_id
self.__UUID = uuid.uuid1()
self.__Date = dt.strftime("%Y-%m-%d")
# Getter
def savePositive(imgs, processed, rawImg, captcha):
"""
save right captcha recognized by Tesseract
:param imgs: a list of four processed images, with only one digit in each img
:param processed: full processed img
:param rawImg: raw image without processing
:param captcha: result str of the captcha
:return:
"""
UUID = uuid.uuid1()
'''
for img in imgs:
filename = savingDir + '/' + captcha[i] + '/' + str(UUID) + '.jpg'
img.save(filename, 'JPEG')
'''
rawFilename = savingDir + '/rawData/' + captcha + '_' + str(UUID) + '.jpg'
rawImg.save(rawFilename, 'JPEG')
# processedFilename = savingDir + '/processed/' + captcha + '_' + str(UUID) + '.jpg'
# processed.save(processedFilename, 'JPEG')
def headers(self):
request_id = str(uuid.uuid1())
headers = {
'Cache-Control': 'no-cache',
'User-Agent': '%s/%s' % (
self.__agent_name, self.__agent_version),
'X-Bunq-Client-Request-Id': request_id,
'X-Bunq-Geolocation': '0 0 0 0 NL',
'X-Bunq-Language': 'en_US',
'X-Bunq-Region': 'en_US'
}
if self.session_token is not None:
headers['X-Bunq-Client-Authentication'] = self.session_token
elif self.installation_token is not None:
headers['X-Bunq-Client-Authentication'] = self.installation_token
return headers
def write_to_package_job(control, path, callback_version_id):
# copy to temporary
"""
This job will be called when any field in .deb file control part
has been edited.
:param control: New Control Dict
:type control: dict
:param path: Original Package Path
:type path: str
:param callback_version_id: Callback Version ID, for callback query
:type callback_version_id: int
"""
abs_path = os.path.join(settings.MEDIA_ROOT, path)
temp_path = os.path.join(settings.TEMP_ROOT, str(uuid.uuid1()) + '.deb')
shutil.copyfile(abs_path, temp_path)
# read new package
temp_package = DebianPackage(temp_path)
temp_package.control = control
# save new package
temp_package.save()
t_version = Version.objects.get(id=callback_version_id)
t_version.write_callback(temp_package.path)
def handle_uploaded_file(request):
"""
:param request: Django Request
:type request: HttpRequest
"""
f = request.FILES['package']
temp_root = settings.TEMP_ROOT
if not os.path.exists(temp_root):
mkdir_p(temp_root)
package_temp_path = os.path.join(temp_root, str(uuid.uuid1()) + '.deb')
with open(package_temp_path, 'wb+') as destination:
for chunk in f.chunks():
destination.write(chunk)
os.chmod(package_temp_path, 0755)
if settings.ENABLE_REDIS is True:
queue = django_rq.get_queue('high')
return queue.enqueue(handle_uploaded_package, package_temp_path)
else:
return handle_uploaded_package(package_temp_path)
def save_recipe(self, conf):
"""Save recipe with a unique UUID."""
"""Return a recipe page, recipe name, and notification's texts."""
recipe_additional_filters = conf.get('recipe',
'recipe_additional_filters')
recipe_action = conf.get('recipe', 'recipe_action')
recipe_name = str(uuid.uuid1().hex)
name_field = self.wait.until(EC.element_to_be_clickable(
self.LOCATORS.name))
name_field.clear()
name_field.send_keys(recipe_name)
self.find_element(*self.LOCATORS.filter_textbox).send_keys(
recipe_additional_filters)
self.action_configuration(conf, recipe_action)
save_new_recipe_button = self.wait.until(EC.element_to_be_clickable(
self.LOCATORS.save))
save_new_recipe_button.click()
messages_list = self.message_alert_helper()
recipe_page = Recipe(self.selenium, self.base_url)
return recipe_page.wait_for_request_button(), recipe_name, messages_list # noqa
def __init__(self, storage, expire_time, **kwargs):
'''expireTime should be in ISO format'''
self.__fields = ["_id", "_active", "_expires"]
self.__storage = storage
self._id = uuid.uuid1().hex
if isinstance(expire_time, datetime.datetime):
self._expires = expire_time.isoformat()
else:
self._expires = expire_time
self._update_item(**kwargs)
self._expiration_datetime = None
if self._expires:
self._expiration_datetime = Time.ISOtoDateTime(self._expires)
self.__storage.update_index(self)
def on_save(self, button):
"""
Save and exit.
"""
vcal = icalendar.Calendar()
event = icalendar.Event()
event['uid'] = self.calendar + str(uuid.uuid1())
# event['dtstart'] = self._datetime_to_ical(datetime.datetime(2016,5,15))
# event['dtend'] = self._datetime_to_ical(datetime.datetime(2016,5,15))
event['summary'] = self.msg.edit_text
vcal.add_component(event)
logging.debug("EventWidget:on_save:vcal: {}".format(vcal.to_ical()))
r = self.server.add_event(vcal.to_ical(), self.calendar)
logging.debug("EventWidget:on_save:add_event: {}".format(r))
raise urwid.ExitMainLoop()
rest_api.py 文件源码
项目:fabric8-analytics-stack-analysis
作者: fabric8-analytics
项目源码
文件源码
阅读 26
收藏 0
点赞 0
评论 0
def submit_kronos_evaluation():
app.logger.info("Submitting the evaluation job")
response = {
"status_description": "Failed to load model, Kronos Region not available"}
if not app.scoring_status:
return flask.jsonify(response)
result_id = str(uuid1())
input_json = request.get_json()
training_data_url = input_json.get("training_data_url")
response = submit_evaluation_job(input_bootstrap_file='/uranus_bootstrap_action.sh',
input_src_code_file='/tmp/testing.zip',
training_url=training_data_url,
result_id=result_id)
response["evaluation_S3_result_id"] = result_id
return flask.jsonify(response)
model_encoder.py 文件源码
项目:logistic-regression-sgd-mapreduce
作者: elsevierlabs-os
项目源码
文件源码
阅读 29
收藏 0
点赞 0
评论 0
def main(separator='\t'):
id = str(uuid.uuid1())
date_created = datetime.datetime.utcnow().isoformat() + 'Z'
mu = float(os.environ['MU']) if os.environ.has_key('MU') else 0.002
eta = float(os.environ['ETA']) if os.environ.has_key('ETA') else 0.5
n_models_key = os.environ['N_MODELS_KEY'] if os.environ.has_key('N_MODELS_KEY') else 'MODEL'
T = os.environ['T'] if os.environ.has_key('T') else 1
parameters = {}
for line in sys.stdin:
(feature, sigma) = line.strip().split(separator)
parameters[feature] = float(sigma)
n_models = float(parameters[n_models_key])
for f, sigma in parameters.items():
parameters[f] = parameters[f] / n_models
del parameters[n_models_key]
print json.dumps({
"id": id,
"date_created": date_created,
"models": n_models,
"mu": mu,
"eta": eta,
"T": T,
"parameters": parameters
})
def __init__(
self, module_name, func_name, args=None, kwargs=None,
countdown=0, send_after_commit=False,
apply_queue='queue', extra_celery_kwargs=None,
):
mod = importlib.import_module(module_name)
if not hasattr(mod, func_name):
raise ValueError('Invalid API Endpoint is provided.')
self.task_id = uuid.uuid1().hex
self.module_name = module_name
self.func_name = func_name
self.args = args if args is not None else ()
self.kwargs = kwargs if kwargs is not None else {}
self.countdown = countdown if countdown >= 0 else 0
self.send_after_commit = bool(send_after_commit)
self.extra_celery_kwargs = extra_celery_kwargs if extra_celery_kwargs is not None else {}
self.apply_queue = apply_queue
def process_in_message(self, message):
if 'fetch_uri' not in message:
raise InvalidAMQPMessage('Given message has no fetch_uri value.')
message_copy = message.copy()
jobid = uuid.uuid1().hex
_message = dict()
_message['job_id'] = jobid
_message['fetch_uri'] = message_copy.pop('fetch_uri')
# Encode additional fields to send back to client on fetch
try:
_message['settings'] = amqp_message_encode(message_copy)
except:
# @TODO define a more specific exception handling here
raise EncodingError("Can't encode message info. %s" % message)
self.scheduler.schedule(_message)
self.signal_manager.send_catch_log(signal=signals.request_received, jobid=jobid)
def add_permision(self):
lambda_client = boto3.client('lambda')
source_arn = 'arn:aws:execute-api:{}:{}:{}/*/*/*'.format(
self.client._client_config.region_name,
settings.ACCOUNT_ID,
self.api_id
)
try:
lambda_client.add_permission(
FunctionName=self.function_arn,
StatementId=str(uuid.uuid1()),
Action='lambda:InvokeFunction',
Principal='apigateway.amazonaws.com',
SourceArn=source_arn
)
except ClientError:
pass
def upload(processor, data):
user_token = data['user']
today = date.today()
blob = json.dumps(data, separators=(',', ':')).encode('utf-8')
blob = gzip.compress(blob, 7)
name = 'v2/sessions/%s/%s/%s/%s.json.gz' % (
today.year, today.month, user_token, uuid.uuid1().hex)
try:
processor.bucket.put(
name, blob,
ContentEncoding='gzip',
ContentType='application/json')
except ClientError: # pragma: no cover
processor.raven.captureException()
return False
return True
def insert(self, entry):
"""Insert an entry. Automatic generate **uuid1** as entry's ID.
Args:
``entry`` (dict): Entry to be inserted.
Return:
``string``: Entry's UUID.
"""
# Must generate uuid1 here, since id_=str(uuid.uuid1()) in def args
# will return the same value all the times after first call.
id_ = str(uuid.uuid1())
if isinstance(entry, dict):
self._shelf[id_] = entry
return id_
else:
raise Exception('Entry is not a dict object')
def __init__(self,mat:Material.material,A,J,I33,I22,W33,W22,name=None):
"""
mat: material
A: area
J: Torsional constant
I33,I22: Iteria momentum
W33,W22: Bending modulus
"""
self.__mat=mat
self.__A=A
self.__J=J
self.__I33=I33
self.__I22=I22
self.__W33=W33
self.__W22=W22
self.__name=uuid.uuid1() if name==None else name
def __init__(self,origin, pt1, pt2, name=None):
"""
origin: 3x1 vector
pt1: 3x1 vector
pt2: 3x1 vector
"""
self.__origin=origin
vec1 = np.array([pt1[0] - origin[0] , pt1[1] - origin[1] , pt1[2] - origin[2]])
vec2 = np.array([pt2[0] - origin[0] , pt2[1] - origin[1] , pt2[2] - origin[2]])
cos = np.dot(vec1, vec2)/np.linalg.norm(vec1)/np.linalg.norm(vec2)
if cos == 1 or cos == -1:
raise Exception("Three points should not in a line!!")
self.__x = vec1/np.linalg.norm(vec1)
z = np.cross(vec1, vec2)
self.__z = z/np.linalg.norm(z)
self.__y = np.cross(self.z, self.x)
self.__name=uuid.uuid1() if name==None else name
def __init__(self,x,y,z,name=None):
self.__x=x
self.__y=y
self.__z=z
o=[x,y,z]
pt1=[x+1,y,z]
pt2=[x,y+1,z]
self.__local_csys=CoordinateSystem.cartisian(o,pt1,pt2)
self.__restraint=[False]*6
self.__load=[0]*6
self.__disp=[0]*6
self.__name=uuid.uuid1() if name==None else name
self.__hid=None #hidden id
#results
self.__res_disp=None
self.__res_force=None
def setUp(self):
super(TestCase, self).setUp()
# We use Yakutsk, Russia timezone to check if they convert right
# with the local one
self.other_timezone = pytz.timezone('Asia/Yakutsk')
if get_localzone() == self.other_timezone:
self.other_timezone = pytz.timezone('Europe/Vienna')
self.client = Client(None, 'http://localhost').rest_client
self.client.timezone = self.other_timezone
self.a_date = datetime.datetime(
1986, 3, 6, 10, 28, 47,
tzinfo=pytz.UTC,
).astimezone(pytz.timezone('Europe/Vienna'))\
.astimezone(get_localzone())
self.a_uuid = uuid.uuid1()
def _make_context_header(
self,
switches=None,
correlation_id=None,
context_extra=None,
):
# Copy the underlying context object, if it was provided
context = dict(self.context.items()) if self.context else {}
# Either add on, reuse or generate a correlation ID
if correlation_id is not None:
context['correlation_id'] = correlation_id
elif 'correlation_id' not in context:
context['correlation_id'] = six.u(uuid.uuid1().hex)
# Switches can come from three different places, so merge them
# and ensure that they are unique
switches = set(switches or [])
if context_extra:
switches |= set(context_extra.pop('switches', []))
context['switches'] = list(set(context.get('switches', [])) | switches)
# Add any extra stuff
if context_extra:
context.update(context_extra)
return context
def oauth_post_request(uri, oauth_token, origin, msg):
uid = uuid.uuid1()
headers = {
'Authorization': 'Bearer %s' % oauth_token,
'Origin': origin,
'Client-Request-Id': 'WebSDK/%s' % uid,
'Accept': 'application/json',
'Content-Type': 'application/json',
'X-MS-Correlation-Id': uid,
'X-Ms-Namespace': 'internal',
'X-Ms-SDK-Instance': USER_AGENT,
'Referer': origin + '/'
}
response = requests.post(uri, data=json.dumps(msg), headers=headers, verify=False)
response.raise_for_status()
if response.text != '':
return response.json()
else:
return {}
def oauth_post_text_request(uri, oauth_token, origin, data):
uid = uuid.uuid1()
headers = {
'Authorization': 'Bearer %s' % oauth_token,
'Origin': origin,
'Client-Request-Id': 'WebSDK/%s' % uid,
'Accept': 'application/json',
'Content-Type': 'text/plain',
'X-MS-Correlation-Id': uid,
'X-Ms-Namespace': 'internal',
'X-Ms-SDK-Instance': USER_AGENT,
'Referer': origin + '/'
}
response = requests.post(uri, data=data, headers=headers, verify=False)
response.raise_for_status()
if response.text != '':
return response.json()
else:
return {}
def oauth_stream_request(uri, oauth_token, origin):
uid = uuid.uuid1()
headers = {
'Authorization': 'Bearer %s' % oauth_token,
'Origin': origin,
'Client-Request-Id': 'WebSDK/%s' % uid,
'Accept': 'application/json',
'X-Requested-With': 'XMLHttpRequest',
'X-MS-Correlation-Id': uid,
'X-Ms-Namespace': 'internal',
'X-Ms-SDK-Instance': USER_AGENT,
'Referer': origin + '/'
}
response = requests.get(uri, headers=headers, stream=True, verify=False)
response.raise_for_status()
return response
def oauth_request(uri, oauth_token, origin):
uid = uuid.uuid1()
headers = {
'Authorization': 'Bearer %s' % oauth_token,
'Origin': origin,
'Client-Request-Id': 'WebSDK/%s' % uid,
'Accept': 'application/json',
'X-Requested-With': 'XMLHttpRequest',
'X-MS-Correlation-Id': uid,
'X-Ms-Namespace': 'internal',
'X-Ms-SDK-Instance': USER_AGENT,
'Referer': origin + '/'
}
response = requests.get(uri, headers=headers, verify=False)
response.raise_for_status()
return response.json()
def __init__(self, api_version=1, request_id=None):
self.api_version = api_version
if request_id:
self.request_id = request_id
else:
self.request_id = str(uuid.uuid1())
self.ops = []