def __rgb2hlv(I_rgb, repetitions=1):
    """Build (hue, luminance, value) sort keys for a run of RGB pixels.

    Each pixel is read as ``I_rgb[p, channel]`` using the ``pgc.CH_*`` channel
    indices. Hue and value come from ``colorsys.rgb_to_hsv``; luminance is the
    square root of a weighted sum of the three channels. Hue and value are
    scaled by ``repetitions`` and truncated to integers. When the hue step is
    odd, value and luminance are mirrored (``repetitions - step``).

    Args:
        I_rgb: 2-D indexable of pixels, one row per pixel.
        repetitions: scale factor applied before truncating to integer steps.

    Returns:
        Structured numpy array with int32 fields ``'h'``, ``'l'`` and ``'v'``,
        one record per input pixel.
    """
    hlv_dtype = [('h', '<i4'), ('l', '<i4'), ('v', '<i4')]
    keys = []
    for idx in prange(0, len(I_rgb)):
        red = I_rgb[idx, pgc.CH_RED]
        green = I_rgb[idx, pgc.CH_GREEN]
        blue = I_rgb[idx, pgc.CH_BLUE]

        luminance = np.sqrt(.241 * red + .691 * green + .068 * blue)
        hue, _saturation, value = colorsys.rgb_to_hsv(red, green, blue)

        hue_step = int(hue * repetitions)
        lum_step = int(luminance * repetitions)
        value_step = int(value * repetitions)

        if hue_step % 2 == 1:
            # Mirror the secondary keys on odd hue steps.
            value_step = repetitions - value_step
            luminance = repetitions - lum_step

        # NOTE(review): on even hue steps the raw (unscaled) luminance is
        # stored and lum_step is unused -- confirm this is intended.
        keys.append((hue_step, luminance, value_step))
    return np.array(keys, dtype=hlv_dtype)
# Example source code using Python's prange()
def reverb(I, delay_pixels, decay = 0.5):
    """Add a decayed copy of each sample onto the sample ``delay_pixels`` away.

    The image is flattened with ``pgc.to_1d_array`` and updated in place, so a
    sample that has already received an echo feeds that echo forward again.
    A positive delay walks the data front to back; a negative delay walks it
    back to front.

    Args:
        I: image array; only its ``shape`` and flattened samples are used here.
        delay_pixels: non-zero signed offset between source and echo sample.
        decay: factor applied to the source sample before it is added.

    Returns:
        The processed samples reshaped to ``I.shape``.

    Raises:
        AssertionError: if ``delay_pixels`` is zero.
    """
    assert delay_pixels != 0, "delay_pixels must be not zero"
    samples = pgc.to_1d_array(I)
    if delay_pixels > 0:
        stop = len(samples) - delay_pixels - 1
        for src in prange(0, stop):
            # WARNING: overflow potential -- the sum is written back using the
            # dtype of the flattened array (no explicit cast to uint8 is done).
            samples[src + delay_pixels] += (samples[src] * decay)
    elif delay_pixels < 0:
        first = len(samples) - 1
        for src in prange(first, -delay_pixels + 1, -1):
            # WARNING: overflow potential (same as the forward branch).
            samples[src + delay_pixels] += (samples[src] * decay)
    # The result is returned without converting back to uint8.
    return np.reshape(samples, I.shape)
#TODO: optimize code
def flanger(X, max_time_delay=0.003, rate=1, Fs=44100, amp=0.7):
    """Mix every sample with an earlier sample at a sinusoidally varying delay.

    Inputs for which ``pgc.num_channels(X)`` is 3 are delegated to
    ``__flangerRGB``. Otherwise the copy of ``X`` is flattened with
    ``pgc.to_1d_array`` and processed as a single stream.

    Args:
        X: input image; it is copied, the original is not modified here.
        max_time_delay: multiplied by ``Fs`` and rounded to obtain the maximum
            delay in samples.
        rate: oscillation rate of the delay; the sine advances by
            ``2 * pi * rate / Fs`` per sample.
        Fs: sampling frequency used to convert time and rate to samples.
        amp: gain applied to both the direct and the delayed sample.

    Returns:
        For 3-channel input, whatever ``__flangerRGB`` returns. Otherwise a
        float array with the shape of ``X`` (no conversion back to uint8).
    """
    I = X.copy()
    if pgc.num_channels(X) == 3:
        return __flangerRGB(I, max_time_delay, rate, Fs, amp)

    x = pgc.to_1d_array(I)
    idx = np.arange(0, len(x))
    sin_ref = np.sin(2 * math.pi * idx * (rate / Fs))
    max_samp_delay = round(max_time_delay * Fs)

    y = np.zeros(len(x))
    # The first max_samp_delay samples are copied through unchanged. The
    # previous 1-based bounds left indices 0 and max_samp_delay at zero.
    y[:max_samp_delay] = x[:max_samp_delay]
    for i in prange(max_samp_delay, len(x)):
        cur_sin = np.abs(sin_ref[i])
        # cur_delay <= max_samp_delay, so i - cur_delay never goes below 0.
        cur_delay = math.ceil(cur_sin * max_samp_delay)
        y[i] = (amp * x[i]) + amp * (x[i - cur_delay])
    return np.reshape(y, I.shape)
def __flangerRGB(I, max_time_delay, rate, Fs, amp):
    """Apply the flanger to channels 0, 1 and 2 of ``I`` and write them back.

    Each channel is flattened with ``pgc.to_1d_array``, mixed with an earlier
    sample of the same channel at a sinusoidally varying delay, and stored
    back into ``I`` in place.

    Args:
        I: image array indexed as ``I[:, :, channel]``; modified in place.
        max_time_delay: multiplied by ``Fs`` and rounded to obtain the maximum
            delay in samples.
        rate: oscillation rate of the delay (``2 * pi * rate / Fs`` per sample).
        Fs: sampling frequency used to convert time and rate to samples.
        amp: gain applied to both the direct and the delayed sample.

    Returns:
        The same array object ``I`` with its first three channels replaced.
    """
    x0 = pgc.to_1d_array(I[:, :, 0])
    x1 = pgc.to_1d_array(I[:, :, 1])
    x2 = pgc.to_1d_array(I[:, :, 2])
    n_samples = len(x0)

    idx = np.arange(0, n_samples)
    sin_ref = np.sin(2 * math.pi * idx * (rate / Fs))
    max_samp_delay = round(max_time_delay * Fs)

    # Seed every output channel from its OWN input channel. Previously y1 and
    # y2 were seeded from x0, and the 1-based bounds left indices 0 and
    # max_samp_delay at zero.
    y0 = np.zeros(n_samples)
    y0[:max_samp_delay] = x0[:max_samp_delay]
    y1 = np.zeros(n_samples)
    y1[:max_samp_delay] = x1[:max_samp_delay]
    y2 = np.zeros(n_samples)
    y2[:max_samp_delay] = x2[:max_samp_delay]

    for i in prange(max_samp_delay, n_samples):
        cur_sin = np.abs(sin_ref[i])
        # cur_delay <= max_samp_delay, so i - cur_delay never goes below 0.
        cur_delay = math.ceil(cur_sin * max_samp_delay)
        y0[i] = (amp * x0[i]) + amp * (x0[i - cur_delay])
        y1[i] = (amp * x1[i]) + amp * (x1[i - cur_delay])
        y2[i] = (amp * x2[i]) + amp * (x2[i - cur_delay])

    plane_shape = (pgc.height(I), pgc.width(I))
    I[:, :, 0] = np.reshape(y0, plane_shape)
    I[:, :, 1] = np.reshape(y1, plane_shape)
    I[:, :, 2] = np.reshape(y2, plane_shape)
    return I
def numba_kde_multithread(eval_points, samples, bandwidths):
    """Average a bandwidth-scaled kernel over all samples at each point.

    For every evaluation point the contribution
    ``gaussian((point - sample) / bandwidth) / bandwidth`` is accumulated over
    the paired ``samples`` / ``bandwidths`` and divided by ``len(samples)``.

    Args:
        eval_points: array of positions to evaluate; the output copies its
            shape and dtype via ``np.zeros_like``.
        samples: iterable of sample positions.
        bandwidths: iterable of per-sample bandwidths, paired with ``samples``.

    Returns:
        Array like ``eval_points`` holding the averaged kernel values.
    """
    density = np.zeros_like(eval_points)
    # SPEEDTIP: Parallelize over evaluation points with prange()
    for k in numba.prange(len(eval_points)):
        point = eval_points[k]
        for center, width in zip(samples, bandwidths):
            scaled = (point - center) / width
            density[k] += gaussian(scaled) / width
        density[k] /= len(samples)
    return density
#### END: numba_multithread
def potential_numba_scalar_prange(cluster):
    """Sum the pair energy over every unordered pair of entries in ``cluster``.

    For each pair ``(a, b)`` with ``a < b`` the separation is obtained from
    ``distance_numba_scalar_prange`` and converted to an energy with
    ``lj_numba_scalar_prange``.

    Args:
        cluster: indexable collection of entries supporting ``len``.

    Returns:
        The accumulated energy as a float (``0.0`` when there are no pairs).
    """
    total = 0.0
    count = len(cluster)
    # numba.prange requires parallel=True flag to compile.
    # It causes the loop to run in parallel in multiple threads.
    for a in numba.prange(count - 1):
        for b in range(a + 1, count):
            separation = distance_numba_scalar_prange(cluster[a], cluster[b])
            total += lj_numba_scalar_prange(separation)
    return total
def GetPotentialParallel(x,tree, G, theta):
    """Evaluate ``G * PotentialWalk(...)`` for every row of ``x``.

    Args:
        x: array whose first axis enumerates the evaluation points.
        tree: passed through unchanged to ``PotentialWalk``.
        G: scale factor applied to each walk result.
        theta: passed through unchanged to ``PotentialWalk``.

    Returns:
        1-D array with one value per row of ``x``.
    """
    n_points = x.shape[0]
    potentials = empty(n_points)
    for k in prange(n_points):
        # The walk is started with an initial value of 0.
        walk = PotentialWalk(x[k], 0., tree, theta)
        potentials[k] = G * walk
    return potentials
def GetAccelParallel(x, tree, G, theta):
    """Evaluate ``G * ForceWalk(...)`` for every row of ``x``.

    Args:
        x: array whose first axis enumerates the evaluation points.
        tree: passed through unchanged to ``ForceWalk``.
        G: scale factor applied to each walk result.
        theta: passed through unchanged to ``ForceWalk``.

    Returns:
        Array with the same shape as ``x``, one result row per input row.
    """
    accelerations = empty(x.shape)
    n_points = x.shape[0]
    for k in prange(n_points):
        # Each walk starts from a fresh 3-element zero vector.
        walk = ForceWalk(x[k], zeros(3), tree, theta)
        accelerations[k] = G * walk
    return accelerations
def test_array_reduce(self):
    # Checks that an array accumulated inside a numba.prange loop gives the
    # same result when compiled with hpat.jit as when run as plain Python.
    def test_impl(N):
        # Add B onto A once per loop iteration and return the accumulated A.
        A = np.ones(3);
        B = np.ones(3);
        for i in numba.prange(N):
            A += B
        return A
    hpat_func = hpat.jit(test_impl)
    n = 128
    # Compiled and interpreted results must agree.
    np.testing.assert_allclose(hpat_func(n), test_impl(n))
    # NOTE(review): presumably these counters report how many arrays / parfors
    # the compiler treated as "OneD" -- confirm against the helper definitions.
    self.assertEqual(count_array_OneDs(), 0)
    self.assertEqual(count_parfor_OneDs(), 1)
def shift_rows_sine(X, start, num_rows, offset, phase, freq, padding):
    """Roll a band of rows by amounts that follow a sum of sine waves.

    One sine term is built per entry of ``phase`` (paired by index with
    ``freq``), the terms are summed over ``num_rows`` positions, and the sum is
    multiplied by ``offset`` to obtain a per-row shift. With
    ``padding == PADDING_CIRCULAR`` each affected row (all columns except the
    last) is rolled by its truncated shift; for any other ``padding`` value
    the copy is returned unchanged.

    Args:
        X: input image; it is copied, the original is not modified.
        start: index of the first row to shift.
        num_rows: number of positions in the sine wave.
        offset: multiplier applied to the summed sine wave.
        phase: sequence of phase terms, one per sine component.
        freq: sequence of frequency factors, indexed like ``phase``.
        padding: padding mode; only ``PADDING_CIRCULAR`` is handled here.

    Returns:
        The shifted copy of ``X``.
    """
    positions = np.arange(0, num_rows)
    half_rows = int(num_rows/2)

    wave = np.zeros(num_rows)
    for k in range(0, len(phase)):
        wave += np.sin(phase[k] + freq[k] * 2 * np.pi * positions / half_rows)
    shifts = np.multiply(offset, wave)

    shifted = X.copy()
    if padding == PADDING_CIRCULAR:
        for k in prange(0, len(shifts)):
            target = start + k
            # The last column is excluded from the roll (slice 0:-1).
            shifted[target, 0:-1] = np.roll(shifted[target, 0:-1], int(shifts[k]), axis=0)
    return shifted
def pixel_sort_brighter_than_rgb(X, r, g, b, strict=False, sorting_order=('h', 'l', 'v'), iterations=8):
    """Sort runs of pixels in each row that are brighter than ``(r, g, b)``.

    In every row, a run starts at the first pixel that exceeds the threshold
    and ends at the next pixel whose three channels are all at or below it.
    The pixels of the run are reordered by the keys from ``__rgb2hlv``; the
    search then continues with the next start pixel after the end of the run.
    The last column of each row is excluded from the search.

    Args:
        X: input image; it is copied, the original is not modified.
        r, g, b: per-channel thresholds.
        strict: if true a start pixel must exceed the threshold in all three
            channels, otherwise exceeding it in any one channel is enough.
        sorting_order: field order passed to ``np.argsort`` on the key array.
        iterations: forwarded to ``__rgb2hlv`` as its ``repetitions`` argument.

    Returns:
        The sorted copy of ``X``.
    """
    I = X.copy()
    for row in prange(0, pgc.height(I)):
        red = I[row, :-1, pgc.CH_RED]
        green = I[row, :-1, pgc.CH_GREEN]
        blue = I[row, :-1, pgc.CH_BLUE]

        if strict:
            start_mask = (red > r) & (green > g) & (blue > b)
        else:
            start_mask = (red > r) | (green > g) | (blue > b)
        stop_mask = (red <= r) & (green <= g) & (blue <= b)

        from_idx = np.argwhere(start_mask)
        to_idx = np.argwhere(stop_mask)

        if from_idx.size > 0 and to_idx.size > 0:
            begin = from_idx[0][0]
            later_stops = np.argwhere(to_idx > begin)
            while later_stops.size != 0:
                end = to_idx[later_stops[0][0]][0]

                # Reorder the run [begin, end) by its hue/luminance/value keys.
                keys = __rgb2hlv(I[row, begin:end, 0:3], iterations)
                order = np.argsort(keys, order=sorting_order)
                I[row, begin:end] = I[row, begin + order]

                later_starts = np.argwhere(from_idx > end)
                if later_starts.size == 0:
                    break
                begin = from_idx[later_starts[0][0]][0]
                later_stops = np.argwhere(to_idx > begin)
    return I
def __pixelate(X, block_height, block_width, h, w, _operator):
    """Replace every block of ``X`` with one value per channel.

    The image is walked in steps of ``block_height`` x ``block_width``. For
    each block, ``_operator`` is applied over the two spatial axes and the
    result is written back over the whole block. ``X`` is modified in place.

    Args:
        X: 3-D image array, indexed as ``X[row, column, channel]``.
        block_height: vertical step / block size.
        block_width: horizontal step / block size.
        h: exclusive upper bound for the row starts.
        w: exclusive upper bound for the column starts.
        _operator: callable accepting ``(block, axis=(0, 1))``.

    Returns:
        A copy of the modified ``X`` converted to ``np.uint8``.
    """
    for top in prange(0, h, block_height):
        bottom = top + block_height
        for left in prange(0, w, block_width):
            right = left + block_width
            tile = X[top:bottom, left:right, :]
            X[top:bottom, left:right, :] = _operator(tile, axis=(0, 1))
    return X.astype(np.uint8)
# from http://www.alanzucconi.com/2015/09/30/colour-sorting/
def __tremoloRGB(I, Fc=5, alpha=0.5, Fs=44100):
    """Multiply channels 0, 1 and 2 of ``I`` by a sinusoidal gain, in place.

    The gain for flattened sample ``k`` is
    ``1 + alpha * sin(2 * pi * k * Fc / Fs)``. Each channel is flattened with
    ``pgc.to_1d_array``, multiplied by the gain and written back into ``I``.

    Args:
        I: image array indexed as ``I[:, :, channel]``; modified in place.
        Fc: frequency of the gain oscillation.
        alpha: depth of the gain oscillation.
        Fs: sampling frequency used to convert ``Fc`` to a per-sample step.

    Returns:
        The same array object ``I`` with its first three channels modulated.
    """
    height, width = pgc.height(I), pgc.width(I)
    index = np.arange(0, width * height)
    trem = (1 + alpha * np.sin(2 * np.pi * index * (Fc / Fs)))
    # All three RGB channels are processed. The previous bound of 2 skipped
    # channel 2 and left it unmodulated.
    for c in prange(0, 3):
        x = pgc.to_1d_array(I[:, :, c])
        y = np.multiply(x, trem)
        I[:, :, c] = np.reshape(y, (height, width))
    return I
def compute_force( nbodies, child, center_of_mass, mass, cell_radius, particles, energy):
    """Call ``numba_functions.computeForce`` per particle and store the result.

    For every row of ``particles`` the first two components of the returned
    value are written to columns 2 and 3 of the matching row of ``energy``.
    ``energy`` is modified in place; nothing is returned.

    Args:
        nbodies, child, center_of_mass, mass, cell_radius: passed through
            unchanged to ``numba_functions.computeForce``.
        particles: array whose first axis enumerates the particles.
        energy: 2-D output array with at least four columns.
    """
    n_particles = particles.shape[0]
    for p in numba.prange(n_particles):
        components = numba_functions.computeForce(
            nbodies, child, center_of_mass, mass, cell_radius, particles[p]
        )
        energy[p, 2] = components[0]
        energy[p, 3] = components[1]
def extrapolate_precipitation(prec, altitudes, met_station_height):
    """Extrapolate precipitation to any given height.

    This function can be used to extrapolate precipitation data from the
    height of the meteorological measurement station to any given height. The
    routine is taken from the Excel version, which was released by the
    Cemaneige's authors [1].

    Args:
        prec: Numpy [t] array, which contains the precipitation input as
            measured at the meteorological station.
        altitudes: Numpy [n] array of the median altitudes of each elevation
            layer.
        met_station_height: Scalar, which is the elevation above sea level of
            the meteorological station.

    Returns:
        layer_prec: Numpy [t,n] array, with the precipitation of each
            elevation layer n.

    [1] https://webgr.irstea.fr/en/modeles/modele-de-neige/
    """
    # precipitation gradient [1/m] defined in Cemaneige excel version
    beta_altitude = 0.0004

    # elevation threshold above which no further increase is applied
    z_thresh = 4000

    n_steps = prec.shape[0]
    n_layers = len(altitudes)

    # one column of extrapolated precipitation per elevation layer
    layer_prec = np.zeros((n_steps, n_layers), dtype=np.float64)

    for layer in prange(n_layers):
        if altitudes[layer] <= z_thresh:
            # layer below the threshold: scale by the full height difference
            layer_prec[:, layer] = prec * np.exp(
                (altitudes[layer] - met_station_height) * beta_altitude)
        elif met_station_height <= z_thresh:
            # layer above, station below: the difference is capped at the
            # threshold
            layer_prec[:, layer] = prec * np.exp(
                (z_thresh - met_station_height) * beta_altitude)
        else:
            # station and layer both above the threshold: no extrapolation
            layer_prec[:, layer] = prec

    return layer_prec
def extrapolate_temperature(min_temp, mean_temp, max_temp, altitudes,
                            met_station_height):
    """Extrapolate temperature to any given height.

    This function can be used to extrapolate temperature data from the height
    of the meteorological measurement station to any given height. The routine
    is taken from the Excel version, which was released by the Cemaneige's
    authors [1].

    Args:
        min_temp: Numpy [t] array, which contains the daily min temperature.
        mean_temp: Numpy [t] array, which contains the daily mean temperature.
        max_temp: Numpy [t] array, which contains the daily max temperature.
        altitudes: Numpy [n] array of the median altitudes of each elevation
            layer.
        met_station_height: Scalar, which is the elevation above sea level of
            the meteorological station.

    Returns:
        layer_min_temp: Numpy [t,n] array, layer-wise minimum daily
            temperature.
        layer_mean_temp: Numpy [t,n] array, layer-wise mean daily temperature.
        layer_max_temp: Numpy [t,n] array, layer-wise maximum daily
            temperature.

    [1] https://webgr.irstea.fr/en/modeles/modele-de-neige/
    """
    # temperature gradient per metre of elevation, as defined in the
    # Cemaneige excel version
    theta_temp = -0.0065

    n_layers = len(altitudes)
    n_steps = min_temp.shape[0]
    out_shape = (n_steps, n_layers)

    # one [t, n] array per temperature variable
    layer_min_temp = np.zeros(out_shape, dtype=np.float64)
    layer_mean_temp = np.zeros(out_shape, dtype=np.float64)
    layer_max_temp = np.zeros(out_shape, dtype=np.float64)

    for layer in prange(n_layers):
        # the same height-dependent shift applies to all three variables
        shift = (altitudes[layer] - met_station_height) * theta_temp

        layer_min_temp[:, layer] = min_temp + shift
        layer_mean_temp[:, layer] = mean_temp + shift
        layer_max_temp[:, layer] = max_temp + shift

    return layer_min_temp, layer_mean_temp, layer_max_temp