def sample(self):
return random.lognormvariate(self.mean, self.std)
python类lognormvariate()的实例源码
def test_heap(self):
iterations = 5000
maxblocks = 50
blocks = []
# create and destroy lots of blocks of different sizes
for i in range(iterations):
size = int(random.lognormvariate(0, 1) * 1000)
b = multiprocessing.heap.BufferWrapper(size)
blocks.append(b)
if len(blocks) > maxblocks:
i = random.randrange(maxblocks)
del blocks[i]
# get the heap object
heap = multiprocessing.heap.BufferWrapper._heap
# verify the state of the heap
all = []
occupied = 0
heap._lock.acquire()
self.addCleanup(heap._lock.release)
for L in list(heap._len_to_seq.values()):
for arena, start, stop in L:
all.append((heap._arenas.index(arena), start, stop,
stop-start, 'free'))
for arena, start, stop in heap._allocated_blocks:
all.append((heap._arenas.index(arena), start, stop,
stop-start, 'occupied'))
occupied += (stop-start)
all.sort()
for i in range(len(all)-1):
(arena, start, stop) = all[i][:3]
(narena, nstart, nstop) = all[i+1][:3]
self.assertTrue((arena != narena and nstart == 0) or
(stop == nstart))
def test_heap(self):
iterations = 5000
maxblocks = 50
blocks = []
# create and destroy lots of blocks of different sizes
for i in xrange(iterations):
size = int(random.lognormvariate(0, 1) * 1000)
b = multiprocessing.heap.BufferWrapper(size)
blocks.append(b)
if len(blocks) > maxblocks:
i = random.randrange(maxblocks)
del blocks[i]
# get the heap object
heap = multiprocessing.heap.BufferWrapper._heap
# verify the state of the heap
all = []
occupied = 0
heap._lock.acquire()
self.addCleanup(heap._lock.release)
for L in heap._len_to_seq.values():
for arena, start, stop in L:
all.append((heap._arenas.index(arena), start, stop,
stop-start, 'free'))
for arena, start, stop in heap._allocated_blocks:
all.append((heap._arenas.index(arena), start, stop,
stop-start, 'occupied'))
occupied += (stop-start)
all.sort()
for i in range(len(all)-1):
(arena, start, stop) = all[i][:3]
(narena, nstart, nstop) = all[i+1][:3]
self.assertTrue((arena != narena and nstart == 0) or
(stop == nstart))
def test_heap(self):
iterations = 5000
maxblocks = 50
blocks = []
# create and destroy lots of blocks of different sizes
for i in xrange(iterations):
size = int(random.lognormvariate(0, 1) * 1000)
b = multiprocessing.heap.BufferWrapper(size)
blocks.append(b)
if len(blocks) > maxblocks:
i = random.randrange(maxblocks)
del blocks[i]
# get the heap object
heap = multiprocessing.heap.BufferWrapper._heap
# verify the state of the heap
all = []
occupied = 0
heap._lock.acquire()
self.addCleanup(heap._lock.release)
for L in heap._len_to_seq.values():
for arena, start, stop in L:
all.append((heap._arenas.index(arena), start, stop,
stop-start, 'free'))
for arena, start, stop in heap._allocated_blocks:
all.append((heap._arenas.index(arena), start, stop,
stop-start, 'occupied'))
occupied += (stop-start)
all.sort()
for i in range(len(all)-1):
(arena, start, stop) = all[i][:3]
(narena, nstart, nstop) = all[i+1][:3]
self.assertTrue((arena != narena and nstart == 0) or
(stop == nstart))
def test_heap(self):
iterations = 5000
maxblocks = 50
blocks = []
# create and destroy lots of blocks of different sizes
for i in range(iterations):
size = int(random.lognormvariate(0, 1) * 1000)
b = multiprocessing.heap.BufferWrapper(size)
blocks.append(b)
if len(blocks) > maxblocks:
i = random.randrange(maxblocks)
del blocks[i]
# get the heap object
heap = multiprocessing.heap.BufferWrapper._heap
# verify the state of the heap
all = []
occupied = 0
heap._lock.acquire()
self.addCleanup(heap._lock.release)
for L in list(heap._len_to_seq.values()):
for arena, start, stop in L:
all.append((heap._arenas.index(arena), start, stop,
stop-start, 'free'))
for arena, start, stop in heap._allocated_blocks:
all.append((heap._arenas.index(arena), start, stop,
stop-start, 'occupied'))
occupied += (stop-start)
all.sort()
for i in range(len(all)-1):
(arena, start, stop) = all[i][:3]
(narena, nstart, nstop) = all[i+1][:3]
self.assertTrue((arena != narena and nstart == 0) or
(stop == nstart))
def test_heap(self):
iterations = 5000
maxblocks = 50
blocks = []
# create and destroy lots of blocks of different sizes
for i in range(iterations):
size = int(random.lognormvariate(0, 1) * 1000)
b = multiprocessing.heap.BufferWrapper(size)
blocks.append(b)
if len(blocks) > maxblocks:
i = random.randrange(maxblocks)
del blocks[i]
# get the heap object
heap = multiprocessing.heap.BufferWrapper._heap
# verify the state of the heap
all = []
occupied = 0
heap._lock.acquire()
self.addCleanup(heap._lock.release)
for L in list(heap._len_to_seq.values()):
for arena, start, stop in L:
all.append((heap._arenas.index(arena), start, stop,
stop-start, 'free'))
for arena, start, stop in heap._allocated_blocks:
all.append((heap._arenas.index(arena), start, stop,
stop-start, 'occupied'))
occupied += (stop-start)
all.sort()
for i in range(len(all)-1):
(arena, start, stop) = all[i][:3]
(narena, nstart, nstop) = all[i+1][:3]
self.assertTrue((arena != narena and nstart == 0) or
(stop == nstart))
def test_heap(self):
iterations = 5000
maxblocks = 50
blocks = []
# create and destroy lots of blocks of different sizes
for i in xrange(iterations):
size = int(random.lognormvariate(0, 1) * 1000)
b = multiprocessing.heap.BufferWrapper(size)
blocks.append(b)
if len(blocks) > maxblocks:
i = random.randrange(maxblocks)
del blocks[i]
# get the heap object
heap = multiprocessing.heap.BufferWrapper._heap
# verify the state of the heap
all = []
occupied = 0
heap._lock.acquire()
self.addCleanup(heap._lock.release)
for L in heap._len_to_seq.values():
for arena, start, stop in L:
all.append((heap._arenas.index(arena), start, stop,
stop-start, 'free'))
for arena, start, stop in heap._allocated_blocks:
all.append((heap._arenas.index(arena), start, stop,
stop-start, 'occupied'))
occupied += (stop-start)
all.sort()
for i in range(len(all)-1):
(arena, start, stop) = all[i][:3]
(narena, nstart, nstop) = all[i+1][:3]
self.assertTrue((arena != narena and nstart == 0) or
(stop == nstart))
def interkey_interval():
"""in milliseconds"""
# return 0 # makes testing faster
return (random.lognormvariate(0.0, 0.5) * 30.0) / 1000.0
return float(random.randrange(10, 50)) / 1000.0
def sample(self):
return random.lognormvariate(self.mean, self.std)
def test_heap(self):
iterations = 5000
maxblocks = 50
blocks = []
# create and destroy lots of blocks of different sizes
for i in range(iterations):
size = int(random.lognormvariate(0, 1) * 1000)
b = multiprocessing.heap.BufferWrapper(size)
blocks.append(b)
if len(blocks) > maxblocks:
i = random.randrange(maxblocks)
del blocks[i]
# get the heap object
heap = multiprocessing.heap.BufferWrapper._heap
# verify the state of the heap
all = []
occupied = 0
heap._lock.acquire()
self.addCleanup(heap._lock.release)
for L in list(heap._len_to_seq.values()):
for arena, start, stop in L:
all.append((heap._arenas.index(arena), start, stop,
stop-start, 'free'))
for arena, start, stop in heap._allocated_blocks:
all.append((heap._arenas.index(arena), start, stop,
stop-start, 'occupied'))
occupied += (stop-start)
all.sort()
for i in range(len(all)-1):
(arena, start, stop) = all[i][:3]
(narena, nstart, nstop) = all[i+1][:3]
self.assertTrue((arena != narena and nstart == 0) or
(stop == nstart))
def CreatData():
"""
1.?????????????
2.????-2.5?+2.5?
3.?????func_(x)
"""
RandNumberX = arange(-2.5,2.5,5.0/DALL)
RandNumberY=[]
X=[]
Xc=[]
Y = []
Yc=[]
for i in range(len(RandNumberX)):
if (i+1)%3==0:
Xc.append(RandNumberX[i])
else:
X.append(RandNumberX[i])
for x in RandNumberX:
#RandNumberY.append(func_(x)+random.lognormvariate(0, 1)) #????
RandNumberY.append(func_(x)+ uniform(-0.2, 0.2))
for i in range(len(RandNumberY)):
if (i+1)%3==0:
Yc.append(RandNumberY[i])
else:
Y.append(RandNumberY[i])
return X,Y,Xc,Yc,RandNumberX,RandNumberY
def CreatData():
"""
1.?????????????
2.????-2.5?+2.5?
3.?????func_(x)
"""
RandNumberX = arange(-2.5,2.5,5.0/DALL)
RandNumberY=[]
X=[]
Xc=[]
Y = []
Yc=[]
for i in range(len(RandNumberX)):
if (i+1)%3==0:
Xc.append(RandNumberX[i])
else:
X.append(RandNumberX[i])
for x in RandNumberX:
#RandNumberY.append(func_(x)+random.lognormvariate(0, 1)) #????
RandNumberY.append(func_(x)+ uniform(-0.2, 0.2))
for i in range(len(RandNumberY)):
if (i+1)%3==0:
Yc.append(RandNumberY[i])
else:
Y.append(RandNumberY[i])
return X,Y,Xc,Yc,RandNumberX,RandNumberY
def CreatData():
"""
1.?????????????
2.????-2.5?+2.5?
3.?????func_(x)
"""
RandNumberX = arange(-2.5,2.5,5.0/DALL)
RandNumberY=[]
X=[]
Xc=[]
Y = []
Yc=[]
for i in range(len(RandNumberX)):
if (i+1)%3==0:
Xc.append(RandNumberX[i])
else:
X.append(RandNumberX[i])
for x in RandNumberX:
#RandNumberY.append(func_(x)+random.lognormvariate(0, 1)) #????
RandNumberY.append(func_(x)+ uniform(-0.2, 0.2))
for i in range(len(RandNumberY)):
if (i+1)%3==0:
Yc.append(RandNumberY[i])
else:
Y.append(RandNumberY[i])
return X,Y,Xc,Yc,RandNumberX,RandNumberY
def CreatData():
"""
1.?????????????
2.????-2.5?+2.5?
3.?????func_(x)
"""
RandNumberX = arange(-2.5,2.5,5.0/DataMaxSize)
RandNumberY=[]
for x in RandNumberX:
#RandNumberY.append(func_(x)+random.lognormvariate(0, 1)) #????
RandNumberY.append(func_(x)+ uniform(-0.2, 0.2))
return RandNumberX,RandNumberY
def _execute(self, sources, alignment_stream, interval):
if alignment_stream is None:
raise ToolExecutionError("Alignment stream expected")
for ti, _ in alignment_stream.window(interval, force_calculation=True):
yield StreamInstance(ti, random.lognormvariate(mu=self.mu, sigma=self.sigma))
def test_heap(self):
iterations = 5000
maxblocks = 50
blocks = []
# create and destroy lots of blocks of different sizes
for i in xrange(iterations):
size = int(random.lognormvariate(0, 1) * 1000)
b = multiprocessing.heap.BufferWrapper(size)
blocks.append(b)
if len(blocks) > maxblocks:
i = random.randrange(maxblocks)
del blocks[i]
# XXX There should be a better way to release resources for a
# single block
if i % maxblocks == 0:
import gc; gc.collect()
# get the heap object
heap = multiprocessing.heap.BufferWrapper._heap
# verify the state of the heap
all = []
occupied = 0
heap._lock.acquire()
self.addCleanup(heap._lock.release)
for L in heap._len_to_seq.values():
for arena, start, stop in L:
all.append((heap._arenas.index(arena), start, stop,
stop-start, 'free'))
for arena, start, stop in heap._allocated_blocks:
all.append((heap._arenas.index(arena), start, stop,
stop-start, 'occupied'))
occupied += (stop-start)
all.sort()
for i in range(len(all)-1):
(arena, start, stop) = all[i][:3]
(narena, nstart, nstop) = all[i+1][:3]
self.assertTrue((arena != narena and nstart == 0) or
(stop == nstart))
def crossover(self, ind):
"""Used by the evolution process to generate a new individual.
Notes
-----
This is a tweaked version of the classical DE crossover
algorithm, the main difference that candidate parameters are
generated using a lognormal distribution. Bound handling is
achieved by resampling where the candidate solution exceeds +/-1
Parameters
----------
Returns
-------
y: deap individual
An individual representing a candidate solution, to be
assigned a fitness.
"""
if self._params['neighbours']:
a, b, c = random.sample([self.population[i]
for i in ind.neighbours], 3)
else:
a, b, c = random.sample(self.population, 3)
y = self.toolbox.clone(a)
y.ident = ind.ident
y.neighbours = ind.neighbours
del y.fitness.values
# y should now be a copy of ind with the vector elements from a
ident = random.randrange(len(self._params['value_means']))
for i, value in enumerate(y):
if i == ident or random.random() < self._params['cxpb']:
entry = a[i] + random.lognormvariate(-1.2, 0.5) * \
self._params['diff_weight'] * (b[i] - c[i])
tries = 0
while abs(entry) > 1.0:
tries += 1
entry = a[i] + random.lognormvariate(-1.2, 0.5) * \
self._params['diff_weight'] * (b[i] - c[i])
if tries > 10000:
entry = a[i]
y[i] = entry
return y
def _crossover(self, ind):
"""Used by the evolution process to generate a new individual.
Notes
-----
This is a tweaked version of the classical DE crossover
algorithm, the main difference that candidate parameters are
generated using a lognormal distribution. Bound handling is
achieved by resampling where the candidate solution exceeds +/-1
Parameters
----------
ind : deap individual
Returns
-------
y : deap individual
An individual representing a candidate solution, to be
assigned a fitness.
"""
if self.neighbours:
a, b, c = random.sample([self.population[i]
for i in ind.neighbours], 3)
else:
a, b, c = random.sample(self.population, 3)
y = self.toolbox.clone(a)
y.ident = ind.ident
y.neighbours = ind.neighbours
del y.fitness.values
# y should now be a copy of ind with the vector elements from a
ident = random.randrange(len(self.value_means))
for i, value in enumerate(y):
if i == ident or random.random() < self.cxpb:
entry = a[i] + random.lognormvariate(-1.2, 0.5) * \
self.diff_weight * (b[i] - c[i])
tries = 0
while abs(entry) > 1.0:
tries += 1
entry = a[i] + random.lognormvariate(-1.2, 0.5) * \
self.diff_weight * (b[i] - c[i])
if tries > 10000:
entry = a[i]
y[i] = entry
return y