def get_local_wavenumbermesh(self, scaled=True, broadcast=False,
                             eliminate_highest_freq=False):
    """Returns (scaled) local decomposed wavenumbermesh for the 2D transform."""
    kx = fftfreq(self.N[0], 1./self.N[0])
    ky = rfftfreq(self.N[1], 1./self.N[1])
    if eliminate_highest_freq:
        for i, k in enumerate((kx, ky)):
            if self.N[i] % 2 == 0:
                k[self.N[i]//2] = 0
    Ks = np.meshgrid(kx, ky[self.rank*self.Np[1]//2:(self.rank*self.Np[1]//2+self.Npf)], indexing='ij', sparse=True)
    if scaled is True:
        Lp = 2*np.pi/self.L
        Ks[0] *= Lp[0]
        Ks[1] *= Lp[1]
    K = Ks
    if broadcast is True:
        K = [np.broadcast_to(k, self.complex_shape()) for k in Ks]
    return K
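A minimal serial sketch of the same wavenumber construction, with example values for N and L standing in for the class attributes used above (an illustration, not the class's actual setup):

import numpy as np
from numpy.fft import fftfreq, rfftfreq

N = (8, 8)                      # assumed grid size
L = (2*np.pi, 2*np.pi)          # assumed physical domain size
kx = fftfreq(N[0], 1./N[0])     # full wavenumber set along the first axis
ky = rfftfreq(N[1], 1./N[1])    # half spectrum along the real-to-complex axis
K = np.meshgrid(kx, ky, indexing='ij', sparse=True)
K = [k * 2*np.pi/l for k, l in zip(K, L)]   # scale to the physical domain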
def _broadcast_shape(*args):
    """Returns the shape of the arrays that would result from broadcasting the
    supplied arrays against each other.
    """
    if not args:
        raise ValueError('must provide at least one argument')
    # use the old-iterator because np.nditer does not handle size 0 arrays
    # consistently
    b = np.broadcast(*args[:32])
    # unfortunately, it cannot handle 32 or more arguments directly
    for pos in range(32, len(args), 31):
        # ironically, np.broadcast does not properly handle np.broadcast
        # objects (it treats them as scalars)
        # use broadcasting to avoid allocating the full array
        b = broadcast_to(0, b.shape)
        b = np.broadcast(b, *args[pos:(pos + 31)])
    return b.shape
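For illustration, broadcasting a (5, 1, 3) array against a (4, 1) array pads the shorter shape with leading ones and takes the per-axis maximum, so a helper like the one above reports (5, 4, 3):

import numpy as np

a = np.ones((5, 1, 3))
b = np.ones((4, 1))
# np.broadcast computes the broadcast shape without allocating the full result
assert np.broadcast(a, b).shape == (5, 4, 3)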
def get_scipy_batch_logpdf(self, idx):
    if not self.scipy_arg_fn:
        return
    dist_params = self.get_dist_params(idx, wrap_tensor=False)
    dist_params_wrapped = self.get_dist_params(idx)
    dist_params = self._convert_logits_to_ps(dist_params)
    test_data = self.get_test_data(idx, wrap_tensor=False)
    test_data_wrapped = self.get_test_data(idx)
    shape = self.pyro_dist.shape(test_data_wrapped, **dist_params_wrapped)
    batch_log_pdf = []
    for i in range(len(test_data)):
        batch_params = {}
        for k in dist_params:
            param = np.broadcast_to(dist_params[k], shape)
            batch_params[k] = param[i]
        args, kwargs = self.scipy_arg_fn(**batch_params)
        if self.is_discrete:
            batch_log_pdf.append(self.scipy_dist.logpmf(test_data[i],
                                                        *args,
                                                        **kwargs))
        else:
            batch_log_pdf.append(self.scipy_dist.logpdf(test_data[i],
                                                        *args,
                                                        **kwargs))
    return batch_log_pdf
def test_max_unbounded(self):
    n_batch = 7
    ndim_action = 3
    mu = np.random.randn(n_batch, ndim_action).astype(np.float32)
    mat = np.broadcast_to(
        np.eye(ndim_action, dtype=np.float32)[None],
        (n_batch, ndim_action, ndim_action))
    v = np.random.randn(n_batch).astype(np.float32)
    q_out = action_value.QuadraticActionValue(
        chainer.Variable(mu),
        chainer.Variable(mat),
        chainer.Variable(v))
    v_out = q_out.max
    self.assertIsInstance(v_out, chainer.Variable)
    v_out = v_out.data
    np.testing.assert_almost_equal(v_out, v)
def compute_convolution_nd(data, kernel, dimension: int, mode=ConvolutionMode.valid, element_wise: bool=False):
    mode_string = __get_convolution_mode_string(mode)
    result = []
    data_prefix_shape = data.shape[:-dimension]
    kernel_prefix_shape = kernel.shape[:-dimension]
    if element_wise:
        final_shape = element_wise_shape(data_prefix_shape, kernel_prefix_shape)[0]
        data = numpy.broadcast_to(data, final_shape + data.shape[-2:])
        kernel = numpy.broadcast_to(kernel, final_shape + kernel.shape[-2:])
        if final_shape:
            for index in array_index_traversal(final_shape):
                result.append(__compute_convolution_nd(data[index], kernel[index], dimension, mode_string))
            return numpy.array(result).reshape(final_shape + result[0].shape)
        else:
            return __compute_convolution_nd(data, kernel, dimension, mode_string)
    else:
        if kernel_prefix_shape:
            final_shape = data_prefix_shape + kernel_prefix_shape + basic_convolution_shape(data.shape[-dimension:], kernel.shape[-dimension:], dimension, mode_string)
            result = numpy.zeros(final_shape)
            for kernel_index in array_index_traversal(kernel_prefix_shape):
                sub_result_index = tuple(slice(None) for _ in data_prefix_shape) + kernel_index + tuple(slice(None) for _ in range(dimension))
                result[sub_result_index] = __compute_convolution_nd(data, kernel[kernel_index], dimension, mode_string)
            return result
        else:
            return __compute_convolution_nd(data, kernel, dimension, mode_string)
def test_One(backend, M, N, K, alpha, beta, forward):
    x = indigo.util.rand64c(K,N)
    y = indigo.util.rand64c(M,N)
    B = backend()
    # check for the attribute before inspecting it, otherwise the getattr below
    # would raise AttributeError on backends without onemm
    if not hasattr(B, 'onemm'):
        pytest.skip("backend doesn't implement onemm")
    if getattr(B.onemm, '__isabstractmethod__', False):
        pytest.skip("backend <%s> doesn't implement onemm" % backend.__name__)
    O = B.One((M,K), dtype=np.complex64)
    if forward:
        u, v = x, y
    else:
        v, u = x, y
    u_d = B.copy_array(u)
    v_d = B.copy_array(v)
    exp = beta * v + \
        np.broadcast_to(alpha*u.sum(axis=0, keepdims=True), v.shape)
    O.eval(v_d, u_d, alpha=alpha, beta=beta, forward=forward)
    act = v_d.to_host()
    np.testing.assert_allclose(act, exp, rtol=1e-5)
def _broadcast_shape(*args):
    """Returns the shape of the arrays that would result from broadcasting the
    supplied arrays against each other.
    """
    if not args:
        raise ValueError('must provide at least one argument')
    if len(args) == 1:
        # a single argument does not work with np.broadcast
        return np.asarray(args[0]).shape
    # use the old-iterator because np.nditer does not handle size 0 arrays
    # consistently
    b = np.broadcast(*args[:32])
    # unfortunately, it cannot handle 32 or more arguments directly
    for pos in range(32, len(args), 31):
        # ironically, np.broadcast does not properly handle np.broadcast
        # objects (it treats them as scalars)
        # use broadcasting to avoid allocating the full array
        b = broadcast_to(0, b.shape)
        b = np.broadcast(b, *args[pos:(pos + 31)])
    return b.shape
def broadcast_to(self, shape):
    """
    Performs the equivalent of np.broadcast_to for COO.

    Parameters
    ----------
    shape : tuple[int]
        The shape to broadcast the data to.

    Returns
    -------
    The broadcasted sparse array.

    Raises
    ------
    ValueError
        If the operand cannot be broadcast to the given shape.
    """
    result_shape = self._get_broadcast_shape(self.shape, shape, is_result=True)
    params = self._get_broadcast_parameters(self.shape, result_shape)
    coords, data = self._get_expanded_coords_data(self.coords, self.data, params, result_shape)
    return COO(coords, data, shape=result_shape, has_duplicates=self.has_duplicates,
               sorted=self.sorted)
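A hedged usage sketch, assuming the PyData sparse package this COO class appears to come from:

import numpy as np
import sparse   # assumed: the PyData sparse package providing COO

s = sparse.COO.from_numpy(np.array([[0, 0, 3]]))    # sparse array of shape (1, 3)
b = s.broadcast_to((4, 3))                          # sparse analogue of np.broadcast_to
assert b.shape == (4, 3)
assert np.array_equal(b.todense(), np.broadcast_to([[0, 0, 3]], (4, 3)))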
def test_repeat_tile(self):
    initial_shape = (8, 4)
    repeats = ((3, 1, 1),
               (3, 3, 3),
               (1, 2, 1),
               (2, 2, 2, 2))

    def _generate_noncontiguous_input():
        out = np.broadcast_to(np.random.random((1, 4)),
                              initial_shape)
        assert not (out.flags.c_contiguous or out.flags.f_contiguous)
        return out

    for repeat in repeats:
        for tensor in (torch.from_numpy(np.random.random(initial_shape)),
                       torch.from_numpy(_generate_noncontiguous_input()),):
            self.assertEqual(tensor.repeat(*repeat).numpy(),
                             np.tile(tensor.numpy(), repeat))
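A small standalone check of the equivalence the test relies on: torch.Tensor.repeat and np.tile map the same repeat specification to the same output shape (assuming NumPy and PyTorch are available, as in the test above):

import numpy as np
import torch

a = np.arange(8, dtype=np.float64).reshape(2, 4)
t = torch.from_numpy(a)
# a repeat spec longer than the input rank prepends new dimensions
assert t.repeat(3, 1, 1).shape == (3, 2, 4)
assert np.tile(a, (3, 1, 1)).shape == (3, 2, 4)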
def ordinal_loss(y, mask):
    xp = cuda.get_array_module(y.data)
    volatile = y.volatile
    b, c, n = y.data.shape
    max_y = F.broadcast_to(F.max(y, axis=1, keepdims=True), y.data.shape)
    y = y - max_y
    sum_y = F.broadcast_to(F.expand_dims(F.sum(y, axis=1), 1), y.data.shape)
    down_tri = np.tri(c, dtype=np.float32)
    up_tri = down_tri.T
    w1 = Variable(xp.asarray(down_tri.reshape(c, c, 1, 1)), volatile=volatile)
    w2 = Variable(xp.asarray(up_tri.reshape(c, c, 1, 1)), volatile=volatile)
    h = F.exp(F.expand_dims(y, -1))
    h1 = F.convolution_2d(h, w1)
    h1 = F.convolution_2d(F.log(h1), w1)
    h2 = F.convolution_2d(h, w2)
    h2 = F.convolution_2d(F.log(h2), w2)
    h = F.reshape(h1 + h2, (b, c, n))
    return F.sum((h - sum_y - y) * mask) / b
def __forward(self, batch_x, batch_t, weight, train=True):
    xp = self.xp
    x = Variable(xp.asarray(batch_x), volatile=not train)
    t = Variable(xp.asarray(batch_t), volatile=not train)
    y = self.net(x, train=train)
    b, c, n = y.data.shape
    mask = Variable(xp.asarray(np.broadcast_to(weight.reshape(-1, 1, 1), (b, c, n)) * loss_mask(batch_t, self.net.rating_num)), volatile=not train)
    if self.ordinal_weight == 0:
        loss = F.sum(-F.log_softmax(y) * mask) / b
    elif self.ordinal_weight == 1:
        loss = ordinal_loss(y, mask)
    else:
        loss = (1 - self.ordinal_weight) * F.sum(-F.log_softmax(y) * mask) / b + self.ordinal_weight * ordinal_loss(y, mask)
    acc = self.__accuracy(y, t)
    return loss, acc
def broadcast(vec: T.Tensor, matrix: T.Tensor) -> T.Tensor:
    """
    Broadcasts vec into the shape of matrix following numpy rules:

    vec ~ (N, 1) broadcasts to matrix ~ (N, M)
    vec ~ (1, N) and (N,) broadcast to matrix ~ (M, N)

    Args:
        vec: A vector (either flat, row, or column).
        matrix: A matrix (i.e., a 2D tensor).

    Returns:
        tensor: A tensor of the same size as matrix containing the elements
                of the vector.

    Raises:
        BroadcastError
    """
    try:
        return numpy.broadcast_to(vec, shape(matrix))
    except ValueError:
        raise BroadcastError('cannot broadcast vector of dimension {} '
                             'onto matrix of dimension {}'.format(shape(vec), shape(matrix)))
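For example, with plain NumPy a column vector fills the columns of the target shape while a flat or row vector fills the rows; no data is copied, the result is a read-only view:

import numpy as np

col = np.arange(3).reshape(3, 1)     # vec ~ (N, 1)
row = np.arange(4)                   # vec ~ (N,)
np.broadcast_to(col, (3, 4))         # every column equals col
np.broadcast_to(row, (3, 4))         # every row equals row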
def test_lmatvec(b0, b1, quad, format, axis, k0, k1):
    """Test matrix-vector product"""
    global c, c1, d, d1
    b0 = b0(N, quad=quad)
    b1 = b1(N, quad=quad)
    mat = shenfun.spectralbase.inner_product((b0, k0), (b1, k1))
    c = mat.matvec(a, c, format='csr')
    c1 = mat.matvec(a, c1, format=format)
    assert np.allclose(c, c1)
    d.fill(0)
    d1.fill(0)
    d = mat.matvec(b, d, format='csr', axis=axis)
    d1 = mat.matvec(b, d1, format=format, axis=axis)
    assert np.allclose(d, d1)
    # Test multidimensional with axis equals 1D case
    d1.fill(0)
    bc = [np.newaxis,]*3
    bc[axis] = slice(None)
    fj = np.broadcast_to(a[bc], (N,)*3).copy()
    d1 = mat.matvec(fj, d1, format=format, axis=axis)
    cc = [0,]*3
    cc[axis] = slice(None)
    assert np.allclose(c, d1[cc])
def test_axis(ST, quad, axis):
    ST = ST(N, quad=quad, plan=True)
    points, weights = ST.points_and_weights(N)
    f_hat = np.random.random(N)
    B = inner_product((ST, 0), (ST, 0))
    c = np.zeros_like(f_hat)
    c = B.solve(f_hat, c)
    # Multidimensional version
    bc = [np.newaxis,]*3
    bc[axis] = slice(None)
    fk = np.broadcast_to(f_hat[bc], (N,)*3).copy()
    ST.plan((N,)*3, axis, np.float, {})
    if hasattr(ST, 'bc'):
        ST.bc.set_tensor_bcs(ST)  # To set Dirichlet boundary conditions on multidimensional array
    ck = np.zeros_like(fk)
    ck = B.solve(fk, ck, axis=axis)
    cc = [0,]*3
    cc[axis] = slice(None)
    assert np.allclose(ck[cc], c)

#test_axis(cbases.ShenDirichletBasis, "GC", 1)
def get_local_mesh(self):
    """Returns the local decomposed physical mesh"""
    X = np.ogrid[self.rank*self.Np[0]:(self.rank+1)*self.Np[0],
                 :self.N[1], :self.N[2]]
    X[0] = (X[0]*self.L[0]/self.N[0]).astype(self.float)
    X[1] = (X[1]*self.L[1]/self.N[1]).astype(self.float)
    X[2] = (X[2]*self.L[2]/self.N[2]).astype(self.float)
    X = [np.broadcast_to(x, self.real_shape()) for x in X]
    return X
def get_local_wavenumbermesh(self, scaled=False, broadcast=False, eliminate_highest_freq=False):
    """Returns (scaled) local decomposed wavenumbermesh

    If scaled is True, then the wavenumbermesh is scaled with physical mesh
    size. This takes care of mapping the physical domain to a computational
    cube of size (2pi)**3.

    If eliminate_highest_freq is True, then the Nyquist frequency is set to zero.
    """
    kx, ky, kz = self.complex_local_wavenumbers()
    if eliminate_highest_freq:
        ky = fftfreq(self.N[1], 1./self.N[1])
        for i, k in enumerate((kx, ky, kz)):
            if self.N[i] % 2 == 0:
                k[self.N[i]//2] = 0
        ky = ky[self.complex_local_slice()[1]]
    Ks = np.meshgrid(kx, ky, kz, indexing='ij', sparse=True)
    if scaled:
        Lp = 2*np.pi/self.L
        for i in range(3):
            Ks[i] *= Lp[i]
    K = Ks
    if broadcast is True:
        K = [np.broadcast_to(k, self.complex_shape()) for k in Ks]
    return K
def get_local_mesh(self):
    xzrank = self.comm0.Get_rank()  # Local rank in xz-plane
    xyrank = self.comm1.Get_rank()  # Local rank in xy-plane

    # Create the physical mesh
    x1 = slice(xzrank * self.N1[0], (xzrank+1) * self.N1[0], 1)
    x2 = slice(xyrank * self.N2[1], (xyrank+1) * self.N2[1], 1)
    X = np.ogrid[x1, x2, :self.N[2]]
    X[0] = (X[0]*self.L[0]/self.N[0]).astype(self.float)
    X[1] = (X[1]*self.L[1]/self.N[1]).astype(self.float)
    X[2] = (X[2]*self.L[2]/self.N[2]).astype(self.float)
    X = [np.broadcast_to(x, self.real_shape()) for x in X]
    return X
def get_local_wavenumbermesh(self, scaled=False, broadcast=False,
                             eliminate_highest_freq=False):
    """Returns (scaled) local decomposed wavenumbermesh

    If scaled is True, then the wavenumbermesh is scaled with physical mesh
    size. This takes care of mapping the physical domain to a computational
    cube of size (2pi)**3.
    """
    s = self.complex_local_slice()
    kx = fftfreq(self.N[0], 1./self.N[0]).astype(int)
    ky = fftfreq(self.N[1], 1./self.N[1]).astype(int)
    kz = rfftfreq(self.N[2], 1./self.N[2]).astype(int)
    if eliminate_highest_freq:
        for i, k in enumerate((kx, ky, kz)):
            if self.N[i] % 2 == 0:
                k[self.N[i]//2] = 0
    kx = kx[s[0]]
    kz = kz[s[2]]
    Ks = np.meshgrid(kx, ky, kz, indexing='ij', sparse=True)
    if scaled is True:
        Lp = 2*np.pi/self.L
        for i in range(3):
            Ks[i] = (Ks[i]*Lp[i]).astype(self.float)
    K = Ks
    if broadcast is True:
        K = [np.broadcast_to(k, self.complex_shape()) for k in Ks]
    return K
def scalar_broadcast_match(a, b):
    """Return both arguments as np.ndarray; if one is a scalar, it is broadcast to the other's shape."""
    a, b = np.atleast_1d(a, b)
    if a.size == 1 and b.size != 1:
        a = np.broadcast_to(a, b.shape)
    elif b.size == 1 and a.size != 1:
        b = np.broadcast_to(b, a.shape)
    return a, b
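Usage sketch of the function above: a scalar paired with an array comes back expanded to the array's shape, while two scalars are returned as 1-element arrays:

import numpy as np

a, b = scalar_broadcast_match(2.0, np.array([1.0, 2.0, 3.0]))
assert a.shape == b.shape == (3,)

a, b = scalar_broadcast_match(1.0, 2.0)
assert a.shape == b.shape == (1,)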
def predict(self, input_x):
    if isinstance(input_x, chainer.Variable):
        device = cuda.get_device(input_x.data)
    else:
        device = cuda.get_device(input_x)
    xp = self.predictor.xp
    with device:
        output = self.predictor(input_x)
        batch_size, input_channel, input_h, input_w = input_x.shape
        batch_size, _, grid_h, grid_w = output.shape
        x, y, w, h, conf, prob = F.split_axis(F.reshape(output, (batch_size, self.predictor.n_boxes, self.predictor.n_classes+5, grid_h, grid_w)), (1, 2, 3, 4, 5), axis=2)
        x = F.sigmoid(x)
        y = F.sigmoid(y)
        conf = F.sigmoid(conf)
        prob = F.transpose(prob, (0, 2, 1, 3, 4))
        prob = F.softmax(prob)
        prob = F.transpose(prob, (0, 2, 1, 3, 4))

        # convert coordinates to those on the image
        x_shift = xp.asarray(np.broadcast_to(np.arange(grid_w, dtype=np.float32), x.shape))
        y_shift = xp.asarray(np.broadcast_to(np.arange(grid_h, dtype=np.float32).reshape(grid_h, 1), y.shape))
        w_anchor = xp.asarray(np.broadcast_to(np.reshape(np.array(self.anchors, dtype=np.float32)[:, 0], (self.predictor.n_boxes, 1, 1, 1)), w.shape))
        h_anchor = xp.asarray(np.broadcast_to(np.reshape(np.array(self.anchors, dtype=np.float32)[:, 1], (self.predictor.n_boxes, 1, 1, 1)), h.shape))
        box_x = (x + x_shift) / grid_w
        box_y = (y + y_shift) / grid_h
        box_w = F.exp(w) * w_anchor / grid_w
        box_h = F.exp(h) * h_anchor / grid_h
        return box_x, box_y, box_w, box_h, conf, prob
def test_indexing_array_weird_strides(self):
    # See also gh-6221
    # the shapes used here come from the issue and create the correct
    # size for the iterator buffering size.
    x = np.ones(10)
    x2 = np.ones((10, 2))
    ind = np.arange(10)[:, None, None, None]
    ind = np.broadcast_to(ind, (10, 55, 4, 4))
    # single advanced index case
    assert_array_equal(x[ind], x[ind.copy()])
    # higher dimensional advanced index
    zind = np.zeros(4, dtype=np.intp)
    assert_array_equal(x2[ind, zind], x2[ind.copy(), zind])
def broadcast_to(array, shape, subok=False):
    """Broadcast an array to a new shape.

    Parameters
    ----------
    array : array_like
        The array to broadcast.
    shape : tuple
        The shape of the desired array.
    subok : bool, optional
        If True, then sub-classes will be passed-through, otherwise
        the returned array will be forced to be a base-class array (default).

    Returns
    -------
    broadcast : array
        A readonly view on the original array with the given shape. It is
        typically not contiguous. Furthermore, more than one element of a
        broadcasted array may refer to a single memory location.

    Raises
    ------
    ValueError
        If the array is not compatible with the new shape according to NumPy's
        broadcasting rules.

    Notes
    -----
    .. versionadded:: 1.10.0

    Examples
    --------
    >>> x = np.array([1, 2, 3])
    >>> np.broadcast_to(x, (3, 3))
    array([[1, 2, 3],
           [1, 2, 3],
           [1, 2, 3]])
    """
    return _broadcast_to(array, shape, subok=subok, readonly=True)
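Note that the returned view is read-only by default; since several output elements may refer to the same memory location (as the docstring above notes), writing to it raises an error:

import numpy as np

view = np.broadcast_to(np.array([1, 2, 3]), (3, 3))
assert view.flags.writeable is False
try:
    view[0, 0] = 99
except ValueError:
    pass   # writing to a broadcast view is rejected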
def test_max_bounded(self):
    n_batch = 20
    ndim_action = 3
    mu = np.random.randn(n_batch, ndim_action).astype(np.float32)
    mat = np.broadcast_to(
        np.eye(ndim_action, dtype=np.float32)[None],
        (n_batch, ndim_action, ndim_action))
    v = np.random.randn(n_batch).astype(np.float32)
    min_action, max_action = -1.3, 1.3
    q_out = action_value.QuadraticActionValue(
        chainer.Variable(mu),
        chainer.Variable(mat),
        chainer.Variable(v),
        min_action, max_action)
    v_out = q_out.max
    self.assertIsInstance(v_out, chainer.Variable)
    v_out = v_out.data

    # If mu[i] is a valid action, v_out[i] should be v[i]
    mu_is_allowed = np.all(
        (min_action < mu) * (mu < max_action),
        axis=1)
    np.testing.assert_almost_equal(v_out[mu_is_allowed], v[mu_is_allowed])

    # Otherwise, v_out[i] should be less than v[i]
    mu_is_not_allowed = ~np.all(
        (min_action - 1e-2 < mu) * (mu < max_action + 1e-2),
        axis=1)
    np.testing.assert_array_less(
        v_out[mu_is_not_allowed],
        v[mu_is_not_allowed])
def test_pool_average_3d(ndarray_1x1x4x4):
    x = np.broadcast_to(ndarray_1x1x4x4, (1, 1, 4, 4, 4))
    node = onnx.helper.make_node('AveragePool', inputs=['x'], outputs=['y'],
                                 kernel_shape=(2, 2, 2), strides=(2, 2, 2))
    y = np.array([[[13.5, 15.5],
                   [21.5, 23.5]],
                  [[13.5, 15.5],
                   [21.5, 23.5]]], dtype=np.float32).reshape(1, 1, 2, 2, 2)
    ng_results = convert_and_calculate(node, [x], [y])
    assert np.array_equal(ng_results, [y])
def test_pool_global_average_3d(ndarray_1x1x4x4):
    x = np.broadcast_to(ndarray_1x1x4x4, (1, 1, 4, 4, 4))
    node = onnx.helper.make_node('GlobalAveragePool', inputs=['x'], outputs=['y'])
    y = np.array([18.5], dtype=np.float32).reshape(1, 1, 1, 1, 1)
    ng_results = convert_and_calculate(node, [x], [y])
    assert np.array_equal(ng_results, [y])