def shift_dataset(m, boundarynoise):
    """Cyclically shift the non-empty rows/columns of a (symmetric) matrix.

    The all-zero rows of ``m`` are left untouched; the submatrix formed by the
    non-zero rows/columns is rolled by ``boundarynoise`` positions along both
    axes and written back into place, symmetrized via the upper triangle.

    :param m: 2-D numpy array; assumed symmetric -- TODO confirm with callers.
    :param boundarynoise: number of positions to roll; 0 returns ``m`` as-is.
    :return: new array of the same shape (or ``m`` itself when no shift).
    """
    if boundarynoise == 0:
        return m
    # Indices of rows that contain at least one non-zero entry.
    nonzero_rows = np.where(m.any(axis=1))[0]
    small_m = copy.deepcopy(m)
    small_m = small_m[nonzero_rows, :]
    small_m = small_m[:, nonzero_rows]
    # NOTE: print statements converted to Python 3 print() calls.
    print(small_m)
    print('roll')
    small_m = np.roll(small_m, boundarynoise, axis=0)
    print(small_m)
    print('roll2')
    small_m = np.roll(small_m, boundarynoise, axis=1)
    print(small_m)
    outm = np.zeros(m.shape)
    # Scatter the rolled submatrix back to the original row/col positions,
    # filling only the upper triangle and mirroring it for symmetry.
    for i_idx in range(len(nonzero_rows)):
        i = nonzero_rows[i_idx]
        for j_idx in range(i_idx, len(nonzero_rows)):
            j = nonzero_rows[j_idx]
            outm[i, j] = small_m[i_idx, j_idx]
            outm[j, i] = outm[i, j]
    return outm
# Python deepcopy() usage examples (collected snippets)
def test_get_pipeline_id(mock_get_properties, mock_get_details, mock_boto3):
    """Tests getting the pipeline ID from boto3"""
    # One paginator page holding two pipelines; the first one should win.
    pages = [{
        'pipelineIdList': [
            {"name": "Test Pipeline", "id": "1234"},
            {"name": "Other", "id": "5678"},
        ],
        "hasMoreResults": False,
    }]
    mock_get_details.return_value.data = {"project": "test"}
    mock_get_properties.return_value = copy.deepcopy(TEST_PROPERTIES)
    mock_boto3.return_value.get_paginator.return_value.paginate.return_value = pages

    dp = AWSDataPipeline(app='test_app', env='test_env', region='us-east-1', prop_path='other')
    dp.get_pipeline_id()

    assert dp.pipeline_id == '1234'
def _merge_mapping(a, b):
    """
    MERGE TWO MAPPINGS, a TAKES PRECEDENCE

    :param a: mapping merged into (mutated in place)
    :param b: mapping merged from; deep-copied when `a` lacks the property
    :return: `a`, after merging
    """
    for name, b_details in b.items():
        # NOTE(review): presumably literal_field escapes dots in the name -- confirm.
        a_details = a[literal_field(name)]
        # A property with sub-properties but no declared type is implicitly an object.
        if a_details.properties and not a_details.type:
            a_details.type = "object"
        if b_details.properties and not b_details.type:
            b_details.type = "object"
        if a_details:
            # Both sides define the property: combine types via the lookup table.
            a_details.type = _merge_type[a_details.type][b_details.type]
            if b_details.type in ES_STRUCT:
                # Structured (object/nested) types merge their children recursively.
                _merge_mapping(a_details.properties, b_details.properties)
        else:
            # Property absent from `a`: adopt b's definition wholesale.
            a[literal_field(name)] = deepcopy(b_details)
    return a
def floyd(matrix):
    """
    Compute all-pairs shortest paths with the Floyd-Warshall algorithm.

    Transforms a copy of the given weight matrix so that entry [m][n] holds
    the length of the shortest path from node m to node n. The input matrix
    is not modified.

    :return: An array of shortest-path distance calculations.
    """
    size = len(matrix)
    dist = deepcopy(matrix)
    for mid in range(size):
        for src in range(size):
            row = dist[src]
            via_mid = row[mid]
            for dst in range(size):
                candidate = via_mid + dist[mid][dst]
                if candidate < row[dst]:
                    row[dst] = candidate
    return dist
def configure_logging(debug=False, verbose=True, stderr=True):
    """Install the package logging config, adjusting levels/formatters.

    debug wins over verbose; when neither is set the template's handler
    levels and formatter are left as-is.
    """
    config = copy.deepcopy(LOG_CONFIG)
    level = "DEBUG" if debug else ("INFO" if verbose else None)
    formatter = "debug" if debug else ("verbose" if verbose else None)
    if level is not None:
        for handler in config["handlers"].values():
            handler["level"] = level
    if formatter is not None:
        config["handlers"]["stderr"]["formatter"] = formatter
    if stderr:
        config["loggers"][LOG_NAMESPACE]["handlers"].append("stderr")
    logging.config.dictConfig(config)
def agent_place(self, state):
    '''
    Return a copy of `state` where the agent has placed its held block
    one step ahead (dx, dy); placing onto lava removes that lava cell
    instead of creating a block.
    '''
    next_state = copy.deepcopy(state)
    agent = next_state.get_first_obj_of_class("agent")
    agent.set_attribute("has_block", 0)
    target_x = agent.get_attribute("x") + agent.get_attribute("dx")
    target_y = agent.get_attribute("y") + agent.get_attribute("dy")
    if self._is_lava_state_action(next_state, "forward"):
        # Locate the index of the first lava object at the target cell.
        remove_idx = 0
        for lava in next_state.get_objects_of_class("lava"):
            if target_x == lava.get_attribute("x") and target_y == lava.get_attribute("y"):
                break
            remove_idx += 1
        next_state.get_objects_of_class("lava").pop(remove_idx)
    else:
        # Create a fresh block object at the target cell.
        block_dicts = [{"x": target_x, "y": target_y}]
        block_obj = self._make_oomdp_objs_from_list_of_dict(block_dicts, "block")[0]
        next_state.get_objects_of_class("block").append(block_obj)
    return next_state
def agent_pickup(self, state):
    '''
    Args:
        state (TaxiState)

    Returns:
        (TaxiState): copy of `state`; if the taxi is empty, every passenger
        sharing the agent's cell is picked up.
    '''
    next_state = copy.deepcopy(state)
    agent = next_state.get_first_obj_of_class("agent")
    if agent.get_attribute("has_passenger") == 0:
        # Taxi is empty: pick up any passenger standing at the agent's location.
        ax = agent.get_attribute("x")
        ay = agent.get_attribute("y")
        for passenger in next_state.get_objects_of_class("passenger"):
            if passenger.get_attribute("x") == ax and passenger.get_attribute("y") == ay:
                agent.set_attribute("has_passenger", 1)
                passenger.set_attribute("in_taxi", 1)
    return next_state
def agent_dropoff(self, state):
    '''
    Args:
        state (TaxiState)

    Returns:
        (TaxiState): copy of `state` with any in-taxi passenger dropped off.
    '''
    next_state = copy.deepcopy(state)
    agent = next_state.get_first_obj_of_class("agent")
    riders = next_state.get_objects_of_class("passenger")
    if agent.get_attribute("has_passenger") == 1:
        # Taxi is occupied: release every passenger flagged as in the taxi.
        for rider in riders:
            if rider.get_attribute("in_taxi") == 1:
                rider.set_attribute("in_taxi", 0)
                agent.set_attribute("has_passenger", 0)
    return next_state
def sample_models(self, nsamples):
    """Draw `nsamples` fully-specified models by random search.

    Each sample deep-copies the search template `self.b_search`, then makes
    uniform random choices until the model is fully specified.

    :param nsamples: number of models to sample.
    :return: tuple (samples, choice_hists) -- the specified models and, for
        each, the list of choice indices taken.
    """
    b = self.b_search
    samples = []
    choice_hists = []
    # FIX: `xrange` is Python 2 only; `range` works on both.
    for _ in range(nsamples):
        bk = copy.deepcopy(b)
        bk.initialize(self.in_d, Scope())
        hist = []
        while not bk.is_specified():
            name, vals = bk.get_choices()
            # A choice point with a single option would be vacuous.
            assert len(vals) > 1
            choice_i = np.random.randint(0, len(vals))
            bk.choose(choice_i)
            hist.append(choice_i)
        # keep the sampled model once specified.
        samples.append(bk)
        choice_hists.append(hist)
    return (samples, choice_hists)
def resource_map():
    '''
    Dynamically generate a map of resources that will be managed for a single
    hook execution.
    '''
    # Deep-copied so the caller can mutate its map without touching the template.
    return deepcopy(BASE_RESOURCE_MAP)
def points(self):
    """List[tuple(int, int)]: List of all points to define regions around."""
    # Return a deep copy: the backing list is mutable and must not be
    # alterable by callers.
    snapshot = copy.deepcopy(self._points)
    return snapshot
def _load(self):
    """Load cached settings from JSON file `self._filepath`.

    Replaces this mapping's contents with the file's data and remembers the
    loaded snapshot in `self._original` for change detection.
    """
    # Suppress save-on-update while we are populating ourselves.
    self._nosave = True
    # FIX: json.load's `encoding` keyword was removed in Python 3.9 and
    # raised TypeError; json handles byte/str input encoding itself.
    with open(self._filepath, 'rb') as file_obj:
        d = dict(json.load(file_obj))
    self.update(d)
    self._original = deepcopy(d)
    self._nosave = False
def _pop_token(self, lineno: int, token_value: str) -> Token:
    """Find, remove and return the first token on line `lineno` matching
    `token_value`.

    STRING tokens are special-cased: f-strings are matched by substring and
    are NOT removed (other AST nodes may map to other parts of the same
    token); plain strings are literal_eval'd so their quotes don't defeat
    the comparison. Raises TokenNotFoundException when nothing matches.
    """
    tokensline = self._lines[lineno - 1]
    # Pop the first token with the same name in the same line
    for t in tokensline:
        if t.name != 'STRING':
            line_value = t.value
        else:
            if t.value[0] == 'f' and t.value[1] in ('"', "'"):
                # fstring: token identify as STRING but they parse into the AST as a
                # collection of nodes so the token_value is different. To find the
                # real token position we'll search inside the fstring token value.
                tok_subpos = t.value.find(token_value)
                if tok_subpos != -1:
                    # We don't remove the fstring token from the line in this case; other
                    # nodes could match different parts of it
                    newtok = deepcopy(t)
                    newtok.start.col = t.start.col + tok_subpos
                    return newtok
                raise TokenNotFoundException("Could not find token '{}' inside f-string '{}'"
                                             .format(token_value, t.value))
            else:
                # normal string; they include the single or double quotes so we liteval
                line_value = literal_eval(t.value)
        # str() on both sides: token values may be non-string after liteval.
        if str(line_value) == str(token_value):
            tokensline.remove(t)
            return t
    raise TokenNotFoundException("Token named '{}' not found in line {}"
                                 .format(token_value, lineno))
def __deepcopy__(self, memo):
    """Deep-copy this object, skipping the 'move' and MEMORY content keys.

    :param memo: the deepcopy memo dict; this instance is registered in it
        before copying children so reference cycles terminate.
    """
    cls = self.__class__
    result = cls.__new__(cls)
    memo[id(self)] = result
    result.floor = deepcopy(self.floor, memo)
    # FIX: identity comparison (`is None`) instead of `== None`.
    if self.content is None:
        result.content = None
    else:
        # Copy all content entries except the excluded keys.
        result.content = {k: deepcopy(v, memo)
                          for k, v in self.content.items()
                          if k not in ('move', MEMORY)}
    return result
def __deepcopy__(self, memo):
    """Deep-copy every attribute except 'ai', which is left unset on the copy."""
    klass = type(self)
    clone = klass.__new__(klass)
    # Register the clone before copying children so cycles resolve to it.
    memo[id(self)] = clone
    for attr, value in self.__dict__.items():
        if attr == 'ai':
            continue  # Don't copy the ai
        setattr(clone, attr, deepcopy(value, memo))
    return clone
def log_state(state):
    """Optionally display `state` interactively and/or record it for replay."""
    if config.interactive:
        # Show the state and wait for a keypress before continuing.
        print(state)
        input()
    if config.save_replay:
        # Deep copy so later mutation of `state` can't corrupt the replay log.
        replay['states'].append(copy.deepcopy(state))
def __init__(self, data_, n_clusters):
    """Initialize a particle with a random cluster assignment per data point.

    :param data_: the data set; only its length is used here.
    :param n_clusters: number of clusters to assign points to.
    """
    # FIX: the original guarded on `not self.position` immediately after
    # assigning [] -- that check was always true, so it is dropped.
    self.position = [random.randrange(n_clusters) for _ in range(len(data_))]
    print(self.position)
    # Per-dimension velocity and similarity accumulators start at zero.
    self.velocity = [0 for _ in self.position]
    self.S = [0 for _ in self.position]
    # Personal best starts as an independent copy of the initial position.
    self.best = deepcopy(self.position)
    self.bestfit = 0
def test_blacklist_bad_files(self):
    """
    Blacklist testing of bad files
    """
    sf = NNTPSimpleFilter()
    entry = copy(self.template_entry)

    # hash table always starts empty and is populated on demand
    assert len(sf._regex_hash) == 0

    bad_extensions = (
        'exe', 'pif', 'application', 'gadget', 'msi', 'msp', 'com',
        'scr', 'hta', 'cpl', 'msc', 'jar', 'bat', 'vb', 'vbs',
        'vbe',              # Encrypted VBE Script file
        'js', 'jse',        # Javascript (Windows can execute these outside
                            # of browsers) so treat it as bad
        'ws', 'wsf',        # Windows Script File
        'ps1', 'ps1xml', 'ps2', 'ps2xml',
        'psc1', 'psc2',     # Windows PowerShell Scripts
        'msh', 'msh1', 'msh1xml',
        'msh2', 'msh2xml',  # Monad Scripts (later renamed to Powershell)
        'scf',              # Windows Explorer Command file
        'lnk',              # A link to a program on your computer (usually
                            # populated with some malicious content)
        'inf',              # A text file used by AutoRun
        'reg',              # A windows registry file
    )
    for extension in bad_extensions:
        entry['subject'] = 'What.A.Great.Show (1/1) ' +\
            '"what.a.great.show.%s" Yenc (1/1)' % extension
        assert sf.blacklist(**entry) == True
def test_scoring_video_files(self):
    """
    Test that we correctly score video files
    """
    sf = NNTPSimpleFilter()
    entry = copy(self.template_entry)

    # Every recognized video extension should earn this score.
    expected_score = 25
    video_extensions = ('avi', 'mpeg', 'mpg', 'mp4', 'mov', 'mkv', 'asf',
                        'ogg', 'iso', 'rm')
    for extension in video_extensions:
        entry['subject'] = 'What.A.Great.Show (1/1) ' +\
            '"what.a.great.show.%s" Yenc (1/1)' % extension
        assert sf.score(**entry) == expected_score

        # A numbered .??? suffix after the extension keeps the same score.
        for seq in range(1000):
            entry['subject'] = 'What.A.Great.Show (1/1) ' +\
                '"what.a.great.show.%s.%.3d" Yenc (1/1)' % (extension, seq)
            assert sf.score(**entry) == expected_score
def test_scoring_compressed_files(self):
    """
    Test that we correctly score compressed files
    """
    sf = NNTPSimpleFilter()
    entry = copy(self.template_entry)

    # Every recognized archive extension should earn this score.
    expected_score = 25
    archive_extensions = ('rar', '7z', 'zip', 'tgz', 'tar.gz')
    for extension in archive_extensions:
        entry['subject'] = 'What.A.Great.Archive (1/1) ' +\
            '"what.a.great.archive.%s" Yenc (1/1)' % extension
        assert sf.score(**entry) == expected_score

        # A numbered .??? suffix after the extension keeps the same score.
        for seq in range(1000):
            entry['subject'] = 'What.A.Great.Archive (1/1) ' +\
                '"what.a.great.archive.%s.%.3d" Yenc (1/1)' % (extension, seq)
            assert sf.score(**entry) == expected_score

    # Split archive volumes (.r00-.r99 / .z00-.z99), optionally numbered.
    for prefix in ('r', 'z'):
        for vol in range(100):
            entry['subject'] = 'What.A.Great.Archive (1/1) ' +\
                '"what.a.great.archive.%s%.2d" Yenc (1/1)' % (prefix, vol)
            assert sf.score(**entry) == expected_score
            for seq in range(1000):
                entry['subject'] = 'What.A.Great.Archive (1/1) ' +\
                    '"what.a.great.archive.%s%.2d.%.3d" Yenc (1/1)' % (
                        prefix, vol, seq)
                assert sf.score(**entry) == expected_score