def test_write_timestamp_file(self, mock_dt):
    """
    Verify write_timestamp_file() delegates to write_json with a payload
    whose "RUN" value is taken from datetime.now().isoformat().
    :param mock_dt: the patched datetime module injected by the test decorator
    """
    sentinel_iso = "not iso formatted"
    fake_now = mock.MagicMock()
    fake_now.isoformat = mock.MagicMock(return_value=sentinel_iso)
    mock_dt.datetime.now = mock.MagicMock(return_value=fake_now)
    runner = self.task_runner_instance
    runner.write_json = mock.MagicMock()
    runner.write_timestamp_file()
    runner.write_json.assert_called_once_with({'RUN': sentinel_iso})
# Example snippets exercising Python's datetime.isoformat()
def test_reap_dead_node(self):
    """A node unready for 2 hours is reaped; one unready for 30 min is not."""
    node = copy.deepcopy(self.dummy_node)
    TestInstance = collections.namedtuple('TestInstance', ['launch_time'])
    instance = TestInstance(datetime.now(pytz.utc))
    # locate the 'Ready' condition on the dummy node spec
    ready_condition = next(
        (cond for cond in node['status']['conditions']
         if cond['type'] == 'Ready'),
        None)
    ready_condition['status'] = 'Unknown'
    # recent heartbeat (30 minutes ago): node must NOT be deleted
    ready_condition['lastHeartbeatTime'] = datetime.isoformat(
        datetime.now(pytz.utc) - timedelta(minutes=30))
    kube_node = KubeNode(pykube.Node(self.api, node))
    kube_node.delete = mock.Mock(return_value="mocked stuff")
    self.cluster.maintain([kube_node], {kube_node.instance_id: instance}, {}, [], [])
    kube_node.delete.assert_not_called()
    # stale heartbeat (2 hours ago): node must be deleted exactly once
    ready_condition['lastHeartbeatTime'] = datetime.isoformat(
        datetime.now(pytz.utc) - timedelta(hours=2))
    kube_node = KubeNode(pykube.Node(self.api, node))
    kube_node.delete = mock.Mock(return_value="mocked stuff")
    self.cluster.maintain([kube_node], {kube_node.instance_id: instance}, {}, [], [])
    kube_node.delete.assert_called_once_with()
def test_max_scale_in(self):
    """Two simultaneously dead nodes: maintain() must not delete either one
    (scale-in is capped, so neither delete() may fire in a single pass)."""
    node1 = copy.deepcopy(self.dummy_node)
    node2 = copy.deepcopy(self.dummy_node)
    TestInstance = collections.namedtuple('TestInstance', ['launch_time'])
    instance1 = TestInstance(datetime.now(pytz.utc))
    instance2 = TestInstance(datetime.now(pytz.utc))
    # mark the Ready condition of each node as stale (unready for 2 hours)
    for spec in (node1, node2):
        for cond in spec['status']['conditions']:
            if cond['type'] != 'Ready':
                continue
            cond['status'] = 'Unknown'
            cond['lastHeartbeatTime'] = datetime.isoformat(
                datetime.now(pytz.utc) - timedelta(hours=2))
            break
    kube_node1 = KubeNode(pykube.Node(self.api, node1))
    kube_node1.delete = mock.Mock(return_value="mocked stuff")
    kube_node2 = KubeNode(pykube.Node(self.api, node2))
    kube_node2.delete = mock.Mock(return_value="mocked stuff")
    instances = {kube_node1.instance_id: instance1,
                 kube_node2.instance_id: instance2}
    self.cluster.maintain([kube_node1, kube_node2], instances, {}, [], [])
    kube_node1.delete.assert_not_called()
    kube_node2.delete.assert_not_called()
def take_screenshot(driver: webdriver, page_name: str):
    """Will take a screenshot of current page.

    The filename embeds the timestamp, page name and browser capabilities
    so screenshots from parallel sessions cannot collide.

    :param driver: Any of the WebDrivers
    :param page_name: page name which will be used in the screenshot filename
    """
    if TAKE_SCREENSHOTS:
        session_id = driver.session_id
        # capabilities may omit any of these keys, hence the defaults
        browser = driver.capabilities.get("browserName", "unknown_browser")
        version = driver.capabilities.get("version", "unknown_version")
        platform = driver.capabilities.get("platform", "unknown_platform")
        stamp = datetime.isoformat(datetime.utcnow())
        filename = ("{}-{}-{}-{}-{}-{}.png"
                    .format(stamp, page_name, browser, version, platform,
                            session_id))
        file_path = abspath(join("screenshots", filename))
        driver.save_screenshot(file_path)
        logging.debug(
            "Screenshot of %s page saved in: %s", page_name, filename)
    else:
        # BUG FIX: the hint message previously read "please set n environment
        # variable" — corrected to "an environment variable".
        logging.debug(
            "Taking screenshots is disabled. In order to turn it on please set"
            " an environment variable TAKE_SCREENSHOTS=true")
def accumulate_data(unique_station_sensor, data, date_filter):
    """
    Prep the data in a format easy to push to the server
    """
    rolled_up_data = {}
    with open(data, 'r') as fi:
        reader = csv.reader(fi)
        for i, row in enumerate(reader):
            if i == 0:
                continue  # skip the CSV header row
            j = i - 1  # date_filter is indexed without the header
            # only process row if date_filter is true and value is not missing
            if not date_filter[j] or row[4] == "":
                continue
            data_id = (row[0], row[3])  # (stationid, parameter)
            date_time = dateutil.parser.parse("{} {}".format(row[1], row[2]))
            data_value = (datetime.isoformat(date_time), row[4])
            rolled_up_data.setdefault(data_id, {}).setdefault('values', []).append(data_value)
    # annotate each (station, parameter) bucket with its observation count
    for entry in rolled_up_data.values():
        entry['count'] = len(entry['values'])
    return rolled_up_data
def create_zun_service(self, values):
    """Persist a new ZunService record, stamping created_at (ISO-8601 UTC)."""
    values['created_at'] = timeutils.utcnow().isoformat()
    service = models.ZunService(values)
    service.save()
    return service
def update_zun_service(self, host, binary, values):
    """Merge *values* into the stored service record for host/binary.

    Raises ZunServiceNotFound when the etcd key is missing; any other
    failure is logged and re-raised unchanged.
    """
    key = '/zun_services/' + host + '_' + binary
    try:
        target = self.client.read(key)
        record = json.loads(target.value)
        values['updated_at'] = datetime.isoformat(timeutils.utcnow())
        record.update(values)
        # `json` here is oslo_serialization.jsonutils, hence dump_as_bytes
        target.value = json.dump_as_bytes(record)
        self.client.update(target)
    except etcd.EtcdKeyNotFound:
        raise exception.ZunServiceNotFound(host=host, binary=binary)
    except Exception as e:
        LOG.error('Error occurred while updating service: %s',
                  six.text_type(e))
        raise
def create_compute_node(self, context, values):
    """Persist a new ComputeNode, stamping created_at and ensuring a uuid."""
    values['created_at'] = timeutils.utcnow().isoformat()
    # generate a uuid only when the caller supplied none (or a falsy one)
    if not values.get('uuid'):
        values['uuid'] = uuidutils.generate_uuid()
    node = models.ComputeNode(values)
    node.save()
    return node
# Source file: test_cluster.py
# Project: Kubernetes-acs-engine-autoscaler
# Author: wbuchwalter
# (scraper metadata: 29 reads, 0 bookmarks, 0 likes, 0 comments)
def setUp(self):
    """Load dummy kube fixture specs and build the Cluster under test."""
    # load dummy kube specs
    dir_path = os.path.dirname(os.path.realpath(__file__))
    # BUG FIX: yaml.load() without an explicit Loader is deprecated and can
    # construct arbitrary objects; safe_load still parses plain mappings and
    # YAML timestamps (which the tzinfo access below relies on).
    with open(os.path.join(dir_path, 'data/busybox.yaml'), 'r') as f:
        self.dummy_pod = yaml.safe_load(f.read())
    with open(os.path.join(dir_path, 'data/ds-pod.yaml'), 'r') as f:
        self.dummy_ds_pod = yaml.safe_load(f.read())
    with open(os.path.join(dir_path, 'data/rc-pod.yaml'), 'r') as f:
        self.dummy_rc_pod = yaml.safe_load(f.read())
    with open(os.path.join(dir_path, 'data/node.yaml'), 'r') as f:
        self.dummy_node = yaml.safe_load(f.read())
    # refresh the Ready heartbeat so the fixture node looks alive "now"
    for condition in self.dummy_node['status']['conditions']:
        if condition['type'] == 'Ready' and condition['status'] == 'True':
            condition['lastHeartbeatTime'] = datetime.now(condition['lastHeartbeatTime'].tzinfo)
    # Convert timestamps to strings to match PyKube
    for condition in self.dummy_node['status']['conditions']:
        condition['lastHeartbeatTime'] = datetime.isoformat(condition['lastHeartbeatTime'])
        condition['lastTransitionTime'] = datetime.isoformat(condition['lastTransitionTime'])
    # this isn't actually used here
    # only needed to create the KubePod object...
    self.api = pykube.HTTPClient(pykube.KubeConfig.from_file(
        os.path.join(dir_path, './data/kube_config.yaml')))
    self.cluster = Cluster(
        kubeconfig='~/.kube/config',
        idle_threshold=60,
        spare_agents=1,
        instance_init_time=60,
        resource_group='my-rg',
        notifier=None,
        service_principal_app_id='dummy',
        service_principal_secret='dummy',
        service_principal_tenant_id='dummy',
        kubeconfig_private_key='dummy',
        client_private_key='dummy',
        ca_private_key='dummy',
        ignore_pools='',
        over_provision=0
    )
def validUpgrade(nodeIds, tconf):
    """Build an upgrade request whose per-node times respect the minimum
    separation (MinSepBetweenNodeUpgrades), so it should be accepted."""
    min_gap = tconf.MinSepBetweenNodeUpgrades + 1
    when = datetime.utcnow().replace(tzinfo=dateutil.tz.tzutc()) + timedelta(seconds=100)
    schedule = {}
    for node_id in nodeIds:
        schedule[node_id] = datetime.isoformat(when)
        # keep successive nodes comfortably beyond the minimum separation
        when = when + timedelta(seconds=min_gap + 3)
    return dict(name='upgrade-13', version=bumpedVersion(), action=START,
                schedule=schedule,
                # sha256=get_valid_code_hash(),
                sha256='db34a72a90d026dae49c3b3f0436c8d3963476c77468ad955845a1ccf7b03f55',
                timeout=1)
def invalidUpgrade(nodeIds, tconf):
    """Build an upgrade request whose per-node times violate the minimum
    separation (MinSepBetweenNodeUpgrades), so it should be rejected."""
    min_gap = tconf.MinSepBetweenNodeUpgrades + 1
    when = datetime.utcnow().replace(tzinfo=dateutil.tz.tzutc()) + timedelta(seconds=60)
    schedule = {}
    for node_id in nodeIds:
        schedule[node_id] = datetime.isoformat(when)
        # deliberately schedule successive nodes too close together
        when = when + timedelta(seconds=min_gap - 3)
    return dict(name='upgrade-14', version=bumpedVersion(), action=START,
                schedule=schedule,
                # sha256=get_valid_code_hash(),
                sha256='46c715a90b1067142d548cb1f1405b0486b32b1a27d418ef3a52bd976e9fae50',
                timeout=10)
def _timefmt(val):
return datetime.isoformat(datetime.utcfromtimestamp(val))
def get_sigmf_iso8601_datetime_now():
    """Current UTC time in the SigMF ISO-8601 form (naive UTC plus 'Z')."""
    return datetime.utcnow().isoformat() + 'Z'
def to_json(manager_class, attr):
    """Serialize *attr* to an ISO-8601 string, or None if it is not a date.

    :param manager_class: unused; kept for interface compatibility
    :param attr: value to serialize, expected to be a datetime/date
    :return: ISO-8601 string, or None when *attr* is not a date/datetime
    """
    try:
        return datetime.isoformat(attr)
    except TypeError:
        # datetime.isoformat raises TypeError for non-date arguments.
        # BUG FIX: the original bare `except:` also swallowed
        # KeyboardInterrupt/SystemExit and hid unrelated bugs.
        return None
def generate_timestamp():
    """generate ISO8601 timestamp incl microseconds, but with colons
    replaced to avoid problems if used as file name
    """
    return datetime.now().isoformat().replace(":", "-")
def validUpgrade(nodeIds, tconf):
    """Build an upgrade request whose per-node times respect the minimum
    separation (MinSepBetweenNodeUpgrades), so it should be accepted."""
    min_gap = tconf.MinSepBetweenNodeUpgrades + 1
    when = datetime.utcnow().replace(tzinfo=dateutil.tz.tzutc()) + timedelta(seconds=90)
    schedule = {}
    for node_id in nodeIds:
        schedule[node_id] = datetime.isoformat(when)
        # keep successive nodes comfortably beyond the minimum separation
        when = when + timedelta(seconds=min_gap + 3)
    return dict(name='upgrade-13', version=bumpedVersion(), action=START,
                schedule=schedule, sha256='aad1242', timeout=10)
def invalidUpgrade(nodeIds, tconf):
    """Build an upgrade request whose per-node times violate the minimum
    separation (MinSepBetweenNodeUpgrades), so it should be rejected."""
    min_gap = tconf.MinSepBetweenNodeUpgrades + 1
    when = datetime.utcnow().replace(tzinfo=dateutil.tz.tzutc()) + timedelta(seconds=60)
    schedule = {}
    for node_id in nodeIds:
        schedule[node_id] = datetime.isoformat(when)
        # deliberately schedule successive nodes too close together
        when = when + timedelta(seconds=min_gap - 3)
    return dict(name='upgrade-14', version=bumpedVersion(), action=START,
                schedule=schedule, sha256='ffd1224', timeout=10)
def __init__(self, args, optimizer, name, extra_msg=''):
    """Initialize running statistics and derive the log-file name.

    :param args: parsed CLI arguments (forwarded to write_config)
    :param optimizer: the optimizer whose state is logged
    :param name: run name; falls back to 'signdist' when empty
    :param extra_msg: optional extra line for the log header
    """
    self.iteration = 0
    self.sum_loss = 0
    self.sum_acc = 0
    self.sum_mean_diff = 0
    self.sum_max_diff = 0
    self.current_section = ''
    self.optimizer = optimizer
    # setup according to arguments
    # BUG FIX: the original used `name is not ''`, which compares object
    # identity rather than equality (and is a SyntaxWarning on CPython 3.8+).
    self.name = name if name != '' else 'signdist'
    self.out_file = "{}_{}".format(date.isoformat(date.today()), self.name)
    self.log_file = "{}.log".format(self.out_file)
    # write config to head of the log file
    self.write_config(args, extra_msg)
def write_config(self, args, extra_msg):
    """Append a human-readable run-configuration header to the log file.

    :param args: parsed CLI arguments to record
    :param extra_msg: optional extra line appended to the header
    """
    with open(self.log_file, 'a+') as f:
        self._comment("=" * 40, f)
        self._comment("{} initiated at {}".format(
            self.name, datetime.isoformat(datetime.now())
        ), f)
        self._comment("-" * 40, f)  # arguments passed
        self._comment("Data: " + args.data, f)
        self._comment("Batchsize: {}".format(args.batchsize), f)
        self._comment("Test ratio: {}".format(args.test), f)
        self._comment("Hard triplet ratio: {}".format(args.skilled), f)
        # BUG FIX: the original used "GPU ".format(args.gpu), which has no
        # placeholder and silently dropped the GPU index from the log.
        dev = "CPU" if args.gpu < 0 else "GPU {}".format(args.gpu)
        self._comment("Device: " + dev, f)
        if args.initmodel:
            self._comment("Init model: " + args.initmodel, f)
        if args.resume:
            self._comment("Resume state: " + args.resume, f)
        self._comment("-" * 40, f)  # parameters set in script
        self._comment("Optimizer: " + self.optimizer.__class__.__name__, f)
        self._comment("Initial LR: {}".format(self.optimizer.lr), f)
        self._comment("LR interval: {}".format(args.lrinterval), f)
        self._comment("Weight decay: {}".format(args.weight_decay), f)
        self._comment("Epoch: {}".format(self.optimizer.epoch), f)
        if extra_msg:
            self._comment(extra_msg, f)
        self._comment("-" * 40, f)  # complete call
        self._comment("{}".format(sys.argv), f)
        self._comment("=" * 40, f)
def big_days(x=0):
    # Doctest fixture accessor: each entry of the module-level `_bad_days`
    # table is a raw timestamp payload that `parse_date` should decode as
    # shown in the examples below (page/RECORD notes refer to captured
    # history pages).
    """
    # page 17, RECORD 11
    >>> parse_date( big_days(0) ).isoformat( )
    '2012-11-20T21:53:41'
    # page 17, ~ RECORD 12
    >>> parse_date( big_days(1) ).isoformat( )
    '2012-11-20T22:07:38'
    >>> parse_date( big_days(2) ).isoformat( )
    '2012-11-20T21:53:41'
    >>> parse_date( big_days(3) ).isoformat( )
    '2012-11-20T22:07:38'
    # page 16, RECORD ~15
    >>> parse_date( big_days(4) ).isoformat( )
    '2012-11-25T16:41:34'
    >>> parse_date( big_days(5) ).isoformat( )
    '2012-11-25T13:54:32'
    # page 15
    >>> parse_date( big_days(6) ).isoformat( )
    '2012-11-29T20:25:37'
    # page 0
    >>> parse_date( big_days(7) ).isoformat( )
    '2012-12-20T14:59:02'
    >>> parse_date( big_days(8) ).isoformat( )
    '2012-12-20T15:28:25'
    """
    # x indexes into `_bad_days` (defined elsewhere in this module);
    # presumably each element is a bytes-like record — TODO confirm.
    return _bad_days[x]
def strptime(string, formats=None):
    """
    Converts a date in string format into a datetime python object. The
    inverse can be obtained by calling datetime.isoformat() (which returns
    'T' as date time separator, and optionally microseconds if they are not
    zero). This function is an easy version of `dateutil.parser.parse` for
    parsing iso-like datetime format (e.g. fdnsws standard) without the
    need of a module import.

    :param string: a datetime object (returned as-is), or a string
        representing a datetime
    :param formats: if list or iterable, it holds the strings denoting the
        formats to be used to convert string (in the order they are
        declared). If None (the default), the datetime format will be
        guessed from the string among the following (with optional 'Z', and
        with 'T' replaced by space as valid option):
        - '%Y-%m-%dT%H:%M:%S.%fZ'
        - '%Y-%m-%dT%H:%M:%SZ'
        - '%Y-%m-%dZ'
    :raise: ValueError if the string cannot be parsed
    :return: a datetime object

    :Example:
        strptime("2016-06-01T09:04:00.5600Z")
        strptime("2016-06-01T09:04:00.5600")
        strptime("2016-06-01 09:04:00.5600Z")
        strptime("2016-06-01T09:04:00Z")
        strptime("2016-06-01T09:04:00")
        strptime("2016-06-01 09:04:00Z")
        strptime("2016-06-01")
    """
    # NOTE: datetime instances pass through untouched; plain date objects
    # are NOT converted (the old docstring wrongly claimed they were).
    if isinstance(string, datetime):
        return string
    try:
        string = string.strip()
        if formats is None:
            # guess candidate formats from the string's shape
            has_z = string[-1] == 'Z'
            has_t = 'T' in string
            if has_t or has_z or ' ' in string:
                t_str, z_str = 'T' if has_t else ' ', 'Z' if has_z else ''
                formats = ['%Y-%m-%d{}%H:%M:%S.%f{}'.format(t_str, z_str),
                           '%Y-%m-%d{}%H:%M:%S{}'.format(t_str, z_str)]
            else:
                formats = ['%Y-%m-%d']
        for dtformat in formats:
            try:
                return datetime.strptime(string, dtformat)
            except ValueError:  # try the next candidate format
                pass
        raise ValueError("invalid date time '%s'" % str(string))
    except ValueError:
        raise
    except Exception as exc:
        # normalize unexpected errors (AttributeError on non-strings,
        # IndexError on the empty string, ...) into ValueError
        raise ValueError(str(exc))
def _test_decode_bolus( ):
    # Doctest fixtures for parse_date against captured pump-history pages;
    # `_bewest_dates` is a module-level table of raw byte records. The
    # inline notes mark which decodes are known-correct vs known-wrong.
    """
    ## correct
    >>> parse_date( bytearray( _bewest_dates['page-19'][6] ) ).isoformat( )
    '2012-11-12T00:55:42'
    ## correct
    >>> parse_date( bytearray( _bewest_dates['page-19'][0] ) ).isoformat( )
    '2012-11-12T00:55:42'
    ## this is wrong
    >>> parse_date( bytearray( _bewest_dates['page-19'][1] ) ).isoformat( )
    '2012-04-10T12:12:00'
    ## day,month is wrong, time H:M:S is correct
    # expected:
    >>> parse_date( bytearray( _bewest_dates['page-19'][2] ) ).isoformat( )
    '2012-02-08T03:11:12'
    ## correct
    >>> parse_date( bytearray( _bewest_dates['page-19'][3] ) ).isoformat( )
    '2012-11-12T08:03:11'
    #### not a valid date
    # >>> parse_date( bytearray( _bewest_dates['page-19'][4] ) ).isoformat( )
    ## correct
    >>> parse_date( bytearray( _bewest_dates['page-19'][5] ) ).isoformat( )
    '2012-11-12T08:03:13'
    """
    # The string below is NOT a docstring (only the first literal is); it is
    # a no-op expression kept as annotated raw-byte notes for the records
    # referenced above.
    """
    0x5b 0x7e # bolus wizard,
    0xaa 0xf7 0x00 0x0c 0x0c # page-19[0]
    0x0f 0x50 0x0d 0x2d 0x6a 0x00 0x0b 0x00
    0x00 0x07 0x00 0x0b 0x7d 0x5c 0x08 0x58
    0x97 0x04 0x30 0x05 0x14 0x34 0xc8
    0x91 0xf8 # 0x91, 0xf8 = month=11, minute=56, seconds=17!
    0x00 # general parsing fails here
    0x00
    0xaa 0xf7 0x40 0x0c 0x0c # expected - page-19[6]
    0x0a 0x0c
    0x8b 0xc3 0x28 0x0c 0x8c # page-19[3]
    0x5b 0x0c
    0x8d 0xc3 0x08 0x0c 0x0c # page-19[5]
    0x00 0x51 0x0d 0x2d 0x6a 0x1f 0x00 0x00 0x00 0x00 0x00
    """
### csv deconstructed
def decode_wizard(data):
    """Decode one bolus-wizard history record into a dict.

    Record layout (as read below): bytes 0-1 header, bytes 2-6 packed
    timestamp, remainder body. The BG reading is split across body[1]'s low
    nibble and head[1] (recombined via lib.BangInt).

    :param data: bytes-like sequence holding a single wizard record
    :return: dict with bg_input / carb_input / carb_ratio /
        insulin_sensitivity / bg_target_low / bg_target_high
    """
    head = data[:2]
    raw_date = data[2:7]
    # renamed from `datetime` — the original local shadowed the module name
    timestamp = parse_date(raw_date)
    body = data[7:]
    bg = lib.BangInt([body[1] & 0x0f, head[1]])
    carb_input = int(body[0])
    carb_ratio = int(body[2])
    bg_target_low = int(body[5])
    bg_target_high = int(body[3])
    sensitivity = int(body[4])
    # BUG FIX: `print "...", x` is Python-2-only syntax; the concatenated
    # single-argument form prints the same text under Python 2 and 3.
    print("BOLUS WIZARD " + timestamp.isoformat())
    wizard = {'bg_input': bg, 'carb_input': carb_input,
              'carb_ratio': carb_ratio,
              'insulin_sensitivity': sensitivity,
              'bg_target_low': bg_target_low,
              'bg_target_high': bg_target_high,
              }
    return wizard