def __init__(self, *args, **kwargs):
    """Build the fixture spaces and paired sample values used by the tests.

    For each kind of Gym space (``d`` = Discrete, ``c`` = Box,
    ``b`` = MultiBinary, ``t`` = Tuple) three attributes are stored:
    ``space_*`` (the space), ``gym_out_*`` and ``rf_out_*``.
    The ``gym_out_*`` / ``rf_out_*`` pairs are presumably a Gym-side sample
    and its expected ReinforceFlow-side representation (their use is not
    visible in this block -- confirm against the test methods).
    """
    super(TestConverters, self).__init__(*args, **kwargs)

    # Discrete: index 2 of 4 paired with the one-hot vector for index 2.
    self.space_d = spaces.Discrete(4)
    self.gym_out_d = 2
    self.rf_out_d = [0, 0, 1, 0]

    # Box: a random (2, 4) sample; both representations are the same object.
    self.space_c = spaces.Box(-1, 1, [2, 4])
    self.gym_out_c = np.random.uniform(low=-1, high=1, size=(2, 4))
    self.rf_out_c = self.gym_out_c

    # MultiBinary: each bit paired with a two-element one-hot vector.
    self.space_b = spaces.MultiBinary(4)
    self.gym_out_b = [0, 1, 0, 1]
    self.rf_out_b = [[1, 0], [0, 1], [1, 0], [0, 1]]

    # Tuple: the three spaces above plus a nested (Discrete, Box) tuple.
    inner_space = spaces.Tuple((self.space_d, self.space_c))
    self.space_t = spaces.Tuple(
        (self.space_d, self.space_c, self.space_b, inner_space)
    )

    inner_gym = (self.gym_out_d, self.gym_out_c)
    self.gym_out_t = (
        self.gym_out_d, self.gym_out_c, self.gym_out_b, inner_gym,
    )

    inner_rf = (self.rf_out_d, self.rf_out_c)
    self.rf_out_t = (
        self.rf_out_d, self.rf_out_c, self.rf_out_b, inner_rf,
    )
# Example source code using the Python class MultiBinary()
def is_compound(space):
    """ Tell whether `space` is a compound space.

    Compound spaces are `MultiDiscrete`, `MultiBinary` and `Tuple` spaces
    (a `Tuple` wrapping a single non-compound subspace still counts), as
    well as every `Box` whose shape is anything other than `(1,)`.
    A `Discrete` space is never compound.

    :raises TypeError: If the space is no `gym.Space`.
    :raises NotImplementedError: If the space type is not one of the above.
    """
    assert_space(space)

    if isinstance(space, spaces.Discrete):
        return False

    if isinstance(space, spaces.Box):
        dims = space.shape
        # Only a rank-1 box with exactly one entry counts as scalar.
        is_scalar_box = len(dims) == 1 and dims[0] == 1
        return not is_scalar_box

    if isinstance(space, (spaces.MultiDiscrete, spaces.MultiBinary, spaces.Tuple)):
        return True

    raise NotImplementedError("Unknown space {} of type {} supplied".format(space, type(space)))
def num_discrete_actions(space):
    """
    Count the available actions of a discrete space, one entry per dimension.

    :param gym.Space space: The discrete space which to inspect.
    :return tuple: Tuple of integers containing the number of discrete actions:
             `(n,)` for `Discrete`, `high - low + 1` per entry for
             `MultiDiscrete`, and `n` twos for `MultiBinary`.
    :raises TypeError: If the space is no `gym.Space` or is not discrete.
    :raises NotImplementedError: If the space passes `is_discrete` but is
             none of the three types handled here.
    """
    assert_space(space)
    if not is_discrete(space):
        raise TypeError("Space {} is not discrete".format(space))

    if isinstance(space, spaces.Discrete):
        return (space.n,)

    if isinstance(space, spaces.MultiDiscrete):
        # space.high is an inclusive bound, hence the +1
        counts = space.high - space.low + 1
        return tuple(counts)

    if isinstance(space, spaces.MultiBinary):
        return (2,) * space.n

    raise NotImplementedError("Unknown space {} of type {} supplied".format(space, type(space)))  # pragma: no cover
def _make_rf2gym_converter(space):
    """Builds a function mapping samples of `space` from ReinforceFlow to Gym form.

    The returned callable takes one sample and:
      * Discrete    -- returns `np.argmax(sample)`;
      * MultiBinary -- returns a tuple with the argmax of every entry;
      * Box         -- returns the sample unchanged;
      * Tuple       -- applies the matching sub-converter to every element
                       and returns the results as a tuple.

    Raises ValueError for any other space type.
    """
    # TODO: add spaces.MultiDiscrete support.
    if isinstance(space, spaces.Discrete):
        def _argmax(sample):
            return np.argmax(sample)
        return _argmax

    if isinstance(space, spaces.MultiBinary):
        def _argmax_each(sample):
            return tuple([np.argmax(entry) for entry in sample])
        return _argmax_each

    if isinstance(space, spaces.Box):
        def _identity(sample):
            return sample
        return _identity

    if isinstance(space, spaces.Tuple):
        # One converter per subspace, built once up front (recursively).
        sub_converters = [_make_rf2gym_converter(sub) for sub in space.spaces]

        def _convert_each(sample):
            return tuple([convert(part)
                          for part, convert in zip(sample, sub_converters)])
        return _convert_each

    raise ValueError("Unsupported space %s." % space)
def flatten(space):
    """
    Flattens a space, which means that for continuous spaces (Box)
    the space is reshaped to be of rank 1, and for multidimensional
    discrete spaces a single discrete action with an increased number
    of possible values is created.
    Please be aware that the latter can be potentially pathological in case
    the input space has many discrete actions, as the number of single discrete
    actions increases exponentially ("curse of dimensionality").

    In the returned transform, `convert_from` maps a sample of the flat
    space back to the original space and `convert_to` maps a sample of the
    original space to the flat space. For discrete spaces `convert_to`
    accepts any sequence of integers (tuple, list, array).

    :param gym.Space space: The space that will be flattened
    :return Transform: A transform object describing the transformation
            to the flattened space.
    :raises TypeError, if `space` is not a `gym.Space`.
            NotImplementedError, if the supplied space is neither `Box` nor
            `MultiDiscrete` or `MultiBinary`, and not recognized as
            an already flat space by `is_compound`.
    """
    # no need to do anything if already flat
    if not is_compound(space):
        return Transform(space, space, lambda x: x, lambda x: x)

    if isinstance(space, spaces.Box):
        shape = space.low.shape
        lo = space.low.flatten()
        hi = space.high.flatten()

        def convert(x):
            # flat sample -> original box shape
            return np.reshape(x, shape)

        def back(x):
            # original sample -> rank-1 layout of the flattened bounds
            return np.reshape(x, lo.shape)

        flat_space = spaces.Box(low=lo, high=hi)
        return Transform(original=space, target=flat_space, convert_from=convert, convert_to=back)

    elif isinstance(space, (spaces.MultiDiscrete, spaces.MultiBinary)):
        if isinstance(space, spaces.MultiDiscrete):
            # space.high is treated as an inclusive bound, hence the +1
            ranges = [range(space.low[i], space.high[i]+1, 1) for i in range(space.num_discrete_space)]
        elif isinstance(space, spaces.MultiBinary):  # pragma: no branch
            ranges = [range(0, 2) for i in range(space.n)]

        # Enumerate every combination of per-dimension values; the position
        # of a combination in `lookup` is its flat action index.
        lookup = list(itertools.product(*ranges))
        inverse_lookup = {value: key for (key, value) in enumerate(lookup)}
        flat_space = spaces.Discrete(len(lookup))

        def convert(x):
            # flat index -> tuple of per-dimension values
            return lookup[x]

        def back(x):
            # The keys of `inverse_lookup` are tuples. Samples may arrive as
            # lists or arrays, which are unhashable and would raise TypeError
            # on lookup, so normalize to a tuple first (no-op for tuples).
            return inverse_lookup[tuple(x)]

        return Transform(original=space, target=flat_space, convert_from=convert, convert_to=back)

    raise NotImplementedError("Does not know how to flatten {}".format(type(space)))  # pragma: no cover
# rescale a continuous action space
def make_pdtype(ac_space):
    """Pick the probability-distribution type matching an action space.

    Box (must be rank 1) -> DiagGaussianPdType(size),
    Discrete             -> CategoricalPdType(n),
    MultiDiscrete        -> MultiCategoricalPdType(low, high),
    MultiBinary          -> BernoulliPdType(n).

    Raises NotImplementedError for any other space type.
    """
    from gym import spaces

    if isinstance(ac_space, spaces.Box):
        # Only flat (rank-1) boxes are handled here.
        assert len(ac_space.shape) == 1
        return DiagGaussianPdType(ac_space.shape[0])

    if isinstance(ac_space, spaces.Discrete):
        return CategoricalPdType(ac_space.n)

    if isinstance(ac_space, spaces.MultiDiscrete):
        return MultiCategoricalPdType(ac_space.low, ac_space.high)

    if isinstance(ac_space, spaces.MultiBinary):
        return BernoulliPdType(ac_space.n)

    raise NotImplementedError
def make_pdtype(ac_space):
    """Pick the probability-distribution type matching an action space.

    Box (must be rank 1) -> DiagGaussianPdType(size),
    Discrete             -> CategoricalPdType(n),
    MultiDiscrete        -> MultiCategoricalPdType(low, high),
    MultiBinary          -> BernoulliPdType(n).

    Raises NotImplementedError for any other space type.
    """
    from gym import spaces

    if isinstance(ac_space, spaces.Box):
        # Only flat (rank-1) boxes are handled here.
        assert len(ac_space.shape) == 1
        return DiagGaussianPdType(ac_space.shape[0])

    if isinstance(ac_space, spaces.Discrete):
        return CategoricalPdType(ac_space.n)

    if isinstance(ac_space, spaces.MultiDiscrete):
        return MultiCategoricalPdType(ac_space.low, ac_space.high)

    if isinstance(ac_space, spaces.MultiBinary):
        return BernoulliPdType(ac_space.n)

    raise NotImplementedError
def _to_rf_space(space):
    """Converts Gym space instance into ReinforceFlow.

    Mapping:
      * Discrete      -> DiscreteOneHot(n)
      * MultiBinary   -> Tuple of `n` DiscreteOneHot(2) spaces
      * Box           -> Continious(low, high)
      * Tuple         -> Tuple of the recursively converted subspaces
      * MultiDiscrete -> not supported yet (ValueError)

    Raises ValueError for MultiDiscrete and for any unrecognized space.
    """
    if isinstance(space, spaces.Discrete):
        return DiscreteOneHot(space.n)
    if isinstance(space, spaces.MultiDiscrete):
        # space.low > 0 will lead to unused first n actions.
        # return Tuple([DiscreteOneHot(n) for n in space.high])
        raise ValueError("MultiDiscrete spaces aren't supported yet.")
    if isinstance(space, spaces.MultiBinary):
        # `space.n` is a count, not an iterable: build one two-way one-hot
        # space per binary entry by iterating over range(space.n).
        # NOTE(review): this passes a single list to Tuple, while the
        # spaces.Tuple branch below unpacks with `*` -- confirm which
        # call form the ReinforceFlow Tuple constructor expects.
        return Tuple([DiscreteOneHot(2) for _ in range(space.n)])
    if isinstance(space, spaces.Box):
        return Continious(space.low, space.high)
    if isinstance(space, spaces.Tuple):
        converted_spaces = [_to_rf_space(sub_space) for sub_space in space.spaces]
        return Tuple(*converted_spaces)
    raise ValueError("Unsupported space %s." % space)
def _make_gym2rf_converter(space):
    """Builds a function mapping samples of `space` from Gym to ReinforceFlow form.

    The returned callable takes one sample and:
      * Discrete    -- returns `one_hot(space.n, sample)`;
      * MultiBinary -- returns a tuple with `one_hot(2, entry)` per entry;
      * Box         -- returns the sample unchanged;
      * Tuple       -- applies the matching sub-converter to every element
                       and returns the results as a tuple.

    Raises ValueError for any other space type.
    """
    # TODO: add spaces.MultiDiscrete support.
    if isinstance(space, spaces.Discrete):
        def _encode(sample):
            return one_hot(space.n, sample)
        return _encode

    if isinstance(space, spaces.MultiBinary):
        def _encode_each(sample):
            return tuple([one_hot(2, bit) for bit in sample])
        return _encode_each

    if isinstance(space, spaces.Box):
        def _identity(sample):
            return sample
        return _identity

    if isinstance(space, spaces.Tuple):
        # One converter per subspace, built once up front (recursively).
        sub_converters = [_make_gym2rf_converter(sub) for sub in space.spaces]

        def _convert_each(sample):
            return tuple([convert(part)
                          for part, convert in zip(sample, sub_converters)])
        return _convert_each

    raise ValueError("Unsupported space %s." % space)
def is_discrete(space):
    """ Tell whether `space` is discrete.

    `Discrete`, `MultiDiscrete` and `MultiBinary` spaces are discrete,
    a `Box` never is, and a `Tuple` is discrete exactly when every one
    of its subspaces is.

    :raises TypeError: If the space is no `gym.Space`.
    :raises NotImplementedError: If the space type is not one of the above.
    """
    assert_space(space)

    discrete_types = (spaces.Discrete, spaces.MultiDiscrete, spaces.MultiBinary)
    if isinstance(space, discrete_types):
        return True

    if isinstance(space, spaces.Box):
        return False

    if isinstance(space, spaces.Tuple):
        # Recurse into the subspaces; stops at the first non-discrete one.
        return all(is_discrete(sub) for sub in space.spaces)

    raise NotImplementedError("Unknown space {} of type {} supplied".format(space, type(space)))
def make_pdtype(ac_space):
    """Pick the probability-distribution type matching an action space.

    Box (must be rank 1) -> DiagGaussianPdType(size),
    Discrete             -> CategoricalPdType(n),
    MultiDiscrete        -> MultiCategoricalPdType(low, high),
    MultiBinary          -> BernoulliPdType(n).

    Raises NotImplementedError for any other space type.
    """
    from gym import spaces

    if isinstance(ac_space, spaces.Box):
        # Only flat (rank-1) boxes are handled here.
        assert len(ac_space.shape) == 1
        return DiagGaussianPdType(ac_space.shape[0])

    if isinstance(ac_space, spaces.Discrete):
        return CategoricalPdType(ac_space.n)

    if isinstance(ac_space, spaces.MultiDiscrete):
        return MultiCategoricalPdType(ac_space.low, ac_space.high)

    if isinstance(ac_space, spaces.MultiBinary):
        return BernoulliPdType(ac_space.n)

    raise NotImplementedError