def __init__(self, maxUmbralAstral):
    """Build the ability table, the initial state vector and the gym spaces.

    :param maxUmbralAstral: outer bound for Astral Fire and Umbral Ice. It is
        written to the class attribute ``BLM.MAXUMBRALASTRAL``, so it is
        shared by every instance.
    """
    # Debug printing is off by default.
    self.debug = False
    # Outer bound for Astral Fire and Umbral Ice (class-level attribute).
    BLM.MAXUMBRALASTRAL = maxUmbralAstral
    # Available buffs (none).
    self.BUFFS = []
    # Maximum time available.
    self.MAXTIME = 45

    helper = BLM.Helper()
    self.HELPER = helper
    ice = BLM.DamageType.Ice
    fire = BLM.DamageType.Fire
    neither = BLM.DamageType.Neither

    # Positional arguments for BLM.Ability, minus the trailing helper that
    # every ability receives.
    ability_specs = (
        ("Blizzard 1", 180, 6, 2.5, 2.49, helper.UmbralIceIncrease, ice),  # 480
        ("Fire 1", 180, 15, 2.5, 2.49, helper.AstralFireIncrease, fire),  # 1200
        ("Transpose", 0, 0, 0.75, 12.9, helper.SwapAstralUmbral, neither),
        ("Fire 3", 240, 30, 3.5, 2.5, helper.AstralFireMax, fire),  # 2400
        ("Blizzard 3", 240, 18, 3.5, 2.5, helper.UmbralIceMax, ice),  # 2400
        ("Fire 4", 260, 15, 2.8, 2.5, None, fire),  # 2400
    )
    # Available abilities.
    self.ABILITIES = [BLM.Ability(*(spec + (helper,))) for spec in ability_specs]

    # State: one slot per ability cooldown and per buff timer, followed by
    # the mana and the Astral/Umbral value.
    timer_slots = len(self.ABILITIES) + len(self.BUFFS)
    self.initialState = np.array([0] * timer_slots + [BLM.MAXMANA] + [0])
    self.state = self._reset()

    # What the learner can pick between: one discrete action per ability.
    self.action_space = spaces.Discrete(len(self.ABILITIES))

    # What the learner can see to make a choice (cooldowns and buffs, then
    # mana, then Astral/Umbral).
    observed_slots = len(self.ABILITIES) + len(self.BUFFS)
    observation_bounds = [[0, 180]] * observed_slots
    observation_bounds = observation_bounds + [[0, BLM.MAXMANA]] + [[-3, 3]]
    self.observation_space = spaces.MultiDiscrete(observation_bounds)
python类MultiDiscrete()的实例源码
def is_compound(space):
    """ Tells whether ``space`` is a compound space.

    `Discrete` spaces are never compound. A `Box` is compound unless its
    shape is exactly ``(1,)``. `MultiDiscrete`, `MultiBinary` and `Tuple`
    spaces are always compound (a `Tuple` is compound even when it holds a
    single, non-compound subspace).

    :param space: The space to inspect; it is validated with `assert_space`.
    :return bool: Whether the space is compound.
    :raises TypeError: If the space is no `gym.Space` (expected to come from
        `assert_space`).
    :raises NotImplementedError: If the space is of none of the types above.
    """
    assert_space(space)

    if isinstance(space, spaces.Discrete):
        return False

    if isinstance(space, spaces.Box):
        is_scalar_box = len(space.shape) == 1 and space.shape[0] == 1
        return not is_scalar_box

    always_compound = (spaces.MultiDiscrete, spaces.MultiBinary, spaces.Tuple)
    if isinstance(space, always_compound):
        return True

    raise NotImplementedError("Unknown space {} of type {} supplied".format(space, type(space)))
def num_discrete_actions(space):
    """
    Counts the available actions of a discrete space, one entry per dimension.

    :param gym.Space space: The discrete space which to inspect.
    :return tuple: Number of discrete actions per dimension: ``(n,)`` for
        `Discrete`, ``high - low + 1`` per entry for `MultiDiscrete`, and
        ``n`` twos for `MultiBinary`.
    :raises TypeError: If `is_discrete` reports the space as not discrete
        (`assert_space` is also expected to raise it for non-`gym.Space`
        input).
    :raises NotImplementedError: If the space is discrete but of none of the
        three types handled here.
    """
    assert_space(space)
    if not is_discrete(space):
        raise TypeError("Space {} is not discrete".format(space))

    if isinstance(space, spaces.Discrete):
        return (space.n,)

    if isinstance(space, spaces.MultiDiscrete):
        # space.high is an inclusive bound, hence the +1.
        per_dimension = space.high - space.low + 1
        return tuple(per_dimension)

    if isinstance(space, spaces.MultiBinary):
        return (2,) * space.n

    raise NotImplementedError("Unknown space {} of type {} supplied".format(space, type(space)))  # pragma: no cover
def __init__(self, env):
    """Wrap a Gym environment and set up the Gym <-> ReinforceFlow converters.

    :param env: a Gym environment, or a string that is passed to
        ``gym.make`` to create one.
    :raises ValueError: if the environment's action space is MultiDiscrete.
    """
    if isinstance(env, six.string_types):
        env = gym.make(env)
    super(GymWrapper, self).__init__(env)

    if isinstance(env.action_space, spaces.MultiDiscrete):
        raise ValueError("Gym environments with MultiDiscrete spaces aren't supported yet.")

    # Replace the exposed spaces with their ReinforceFlow counterparts.
    wrapped = self.env
    self.observation_space = _to_rf_space(wrapped.observation_space)
    self.action_space = _to_rf_space(wrapped.action_space)

    # The converters are built from the spaces assigned just above.
    self._obs_to_rf = _make_gym2rf_converter(self.observation_space)
    self._action_to_rf = _make_gym2rf_converter(self.action_space)
    self._action_to_gym = _make_rf2gym_converter(self.action_space)

    # Forward the global seed only if it is truthy and the environment
    # exposes a ``seed`` attribute.
    seed = reinforceflow.get_random_seed()
    if seed:
        if hasattr(self.env, 'seed'):
            self.env.seed(seed)
def _make_rf2gym_converter(space):
    """Makes space converter function that maps space samples ReinforceFlow -> Gym.

    Discrete samples are reduced with ``np.argmax``; MultiBinary samples
    become a tuple of per-element argmaxes; Box samples pass through
    unchanged; Tuple samples are converted element by element with the
    converters of the sub-spaces.

    Raises ValueError for any other space type.
    """
    # TODO: add spaces.MultiDiscrete support.
    if isinstance(space, spaces.Discrete):
        return lambda sample: np.argmax(sample)

    if isinstance(space, spaces.MultiBinary):
        return lambda sample: tuple(np.argmax(element) for element in sample)

    if isinstance(space, spaces.Box):
        def passthrough(sample):
            return sample
        return passthrough

    if isinstance(space, spaces.Tuple):
        # One converter per sub-space, built recursively.
        sub_converters = [_make_rf2gym_converter(sub_space) for sub_space in space.spaces]

        def convert_tuple(sample):
            return tuple(convert(part) for part, convert in zip(sample, sub_converters))
        return convert_tuple

    raise ValueError("Unsupported space %s." % space)
def _take_action(self, actions):
    """Send the commands for ``actions`` to the agent host.

    For a Discrete space the action indexes into that space's command list.
    For Box and MultiDiscrete spaces each command is sent followed by a
    space and its value. Any other space type is logged and skipped.
    """
    # if there is only one action space, it wasn't wrapped in Tuple
    if len(self.action_spaces) == 1:
        actions = [actions]

    valued_spaces = (spaces.Box, spaces.MultiDiscrete)
    # send appropriate command for different actions
    for spc, cmds, acts in zip(self.action_spaces, self.action_names, actions):
        if isinstance(spc, spaces.Discrete):
            command = cmds[acts]
            logger.debug(command)
            self.agent_host.sendCommand(command)
        elif isinstance(spc, valued_spaces):
            for cmd, val in zip(cmds, acts):
                command = cmd + " " + str(val)
                logger.debug(command)
                self.agent_host.sendCommand(command)
        else:
            logger.warn("Unknown action space for %s, ignoring." % cmds)
def __init__(self, name, horizon, gamma):
    """
    Constructor.

    Args:
        name (str): gym id of the environment;
        horizon (int): the horizon of the MDP;
        gamma (float): the discount factor.

    """
    self.__name__ = name

    # MDP creation
    self.env = gym.make(self.__name__)
    self.env._max_episode_steps = np.inf  # Hack to ignore gym time limit.

    # MDP properties: MultiDiscrete spaces are rejected on both sides.
    gym_observations = self.env.observation_space
    gym_actions = self.env.action_space
    assert not isinstance(gym_observations, gym_spaces.MultiDiscrete)
    assert not isinstance(gym_actions, gym_spaces.MultiDiscrete)

    action_space = self._convert_gym_space(gym_actions)
    observation_space = self._convert_gym_space(gym_observations)
    mdp_info = MDPInfo(observation_space, action_space, gamma, horizon)

    # Actions are converted only when the converted action space is Discrete.
    self._convert_action = (self._convert_action_function
                            if isinstance(action_space, Discrete)
                            else self._no_convert)

    # States are converted only for a Discrete observation space whose
    # ``size`` has more than one entry.
    needs_state_conversion = (isinstance(observation_space, Discrete)
                              and len(observation_space.size) > 1)
    self._convert_state = (self._convert_state_function
                           if needs_state_conversion
                           else self._no_convert)

    super(Gym, self).__init__(mdp_info)
def test_flattened_wrapper():
    """FlattenedObservationWrapper must return 3 for the provided (1, 1)."""
    env = gym.make("ProvideTest-v0")
    env.observation_space = spaces.MultiDiscrete([(0, 1), (0, 1)])
    env.provide_observation = (1, 1)

    wrapper = FlattenedObservationWrapper(env)
    observation, _reward, _done, _info = wrapper.step(3)

    # The observation must belong to the wrapper's own observation space.
    assert wrapper.observation_space.contains(observation)
    assert observation == 3
def __init__(self):
    """Set every attribute of the environment to its pre-launch default.

    Nothing is launched here: the subprocess, viewer and output pipe start
    out as None. The constructor finishes by calling ``_reset_info_vars()``,
    acquiring the lock object and calling ``_seed()``.
    """
    utils.EzPickle.__init__(self)
    self.rom_path = ''
    self.screen_height = 224
    self.screen_width = 256
    frame_shape = (self.screen_height, self.screen_width, 3)

    # One [0, 1] entry per action.
    self.action_space = spaces.MultiDiscrete([[0, 1]] * NUM_ACTIONS)
    self.observation_space = spaces.Box(low=0, high=255, shape=frame_shape)

    self.launch_vars = {}
    self.cmd_args = ['--xscale 2', '--yscale 2', '-f 0']
    self.lua_path = []
    self.subprocess = None
    self.no_render = True
    self.viewer = None

    # Pipes
    self.pipe_name = ''
    self.path_pipe_prefix = os.path.join(tempfile.gettempdir(), 'smb-fifo')
    self.path_pipe_in = ''   # Input pipe (maps to fceux out-pipe and to 'in' file)
    self.path_pipe_out = ''  # Output pipe (maps to fceux in-pipe and to 'out' file)
    self.pipe_out = None
    self.lock_out = Lock()
    self.disable_in_pipe = False
    self.disable_out_pipe = False
    self.launch_vars['pipe_name'] = ''
    self.launch_vars['pipe_prefix'] = self.path_pipe_prefix

    # Other vars
    self.is_initialized = 0  # Used to indicate fceux has been launched and is running
    self.is_exiting = 0      # Used to stop the listening thread
    self.last_frame = 0      # Last processed frame
    self.reward = 0          # Reward for last action
    self.episode_reward = 0  # Total rewards for episode
    self.is_finished = False
    self.screen = np.zeros(shape=frame_shape, dtype=np.uint8)
    self.info = {}
    self.level = 0
    self._reset_info_vars()
    self.first_step = False
    lock_holder = NesLock()
    self.lock = lock_holder.get_lock()

    # Seeding
    self.curr_seed = 0
    self._seed()
def __init__(self, level):
    """Create the game objects, the spaces and the default settings.

    :param level: stored as ``self.level``.

    The action space has 43 entries: 38 in [0, 1], 2 in [-10, 10] and 3 in
    [-100, 100]. The observation space is a 480x640x3 Box with values in
    [0, 255]. The constructor ends by calling ``_seed()`` and
    ``_configure()``.
    """
    self.previous_level = -1
    self.level = level
    self.game = DoomGame()
    self.loader = Loader()
    self.doom_dir = os.path.dirname(os.path.abspath(__file__))
    self._mode = 'algo'          # 'algo' or 'human'
    self.no_render = False       # To disable double rendering in human mode
    self.viewer = None
    self.is_initialized = False  # Indicates that reset() has been called
    self.curr_seed = 0
    lock_holder = DoomLock()
    self.lock = lock_holder.get_lock()

    binary_dims = [[0, 1]] * 38
    narrow_dims = [[-10, 10]] * 2
    wide_dims = [[-100, 100]] * 3
    self.action_space = spaces.MultiDiscrete(binary_dims + narrow_dims + wide_dims)
    self.allowed_actions = list(range(NUM_ACTIONS))

    self.screen_height = 480
    self.screen_width = 640
    self.screen_resolution = ScreenResolution.RES_640X480
    frame_shape = (self.screen_height, self.screen_width, 3)
    self.observation_space = spaces.Box(low=0, high=255, shape=frame_shape)

    self._seed()
    self._configure()
def make_pdtype(ac_space):
    """Return the probability-distribution type matching a gym action space.

    Box (one-dimensional shape only) gives DiagGaussianPdType, Discrete
    gives CategoricalPdType, MultiDiscrete gives MultiCategoricalPdType and
    MultiBinary gives BernoulliPdType.

    Raises NotImplementedError for any other space, and AssertionError for
    a Box whose shape is not one-dimensional.
    """
    from gym import spaces

    if isinstance(ac_space, spaces.Box):
        assert len(ac_space.shape) == 1
        return DiagGaussianPdType(ac_space.shape[0])
    if isinstance(ac_space, spaces.Discrete):
        return CategoricalPdType(ac_space.n)
    if isinstance(ac_space, spaces.MultiDiscrete):
        return MultiCategoricalPdType(ac_space.low, ac_space.high)
    if isinstance(ac_space, spaces.MultiBinary):
        return BernoulliPdType(ac_space.n)
    raise NotImplementedError
def __init__(self, level):
    """Create the game objects, the spaces and the default settings.

    :param level: stored as ``self.level``.

    The action space has 43 entries: 38 in [0, 1], 2 in [-10, 10] and 3 in
    [-100, 100]. The observation space is a 480x640x3 Box with values in
    [0, 255]. The constructor ends by calling ``_seed()`` and
    ``_configure()``.
    """
    self.previous_level = -1
    self.level = level
    self.game = DoomGame()
    self.loader = Loader()
    self.doom_dir = os.path.dirname(os.path.abspath(__file__))
    self._mode = 'algo'          # 'algo' or 'human'
    self.no_render = False       # To disable double rendering in human mode
    self.viewer = None
    self.is_initialized = False  # Indicates that reset() has been called
    self.curr_seed = 0
    lock_holder = DoomLock()
    self.lock = lock_holder.get_lock()

    binary_dims = [[0, 1]] * 38
    narrow_dims = [[-10, 10]] * 2
    wide_dims = [[-100, 100]] * 3
    self.action_space = spaces.MultiDiscrete(binary_dims + narrow_dims + wide_dims)
    self.allowed_actions = list(range(NUM_ACTIONS))

    self.screen_height = 480
    self.screen_width = 640
    self.screen_resolution = ScreenResolution.RES_640X480
    frame_shape = (self.screen_height, self.screen_width, 3)
    self.observation_space = spaces.Box(low=0, high=255, shape=frame_shape)

    self._seed()
    self._configure()
def make_pdtype(ac_space):
    """Return the probability-distribution type matching a gym action space.

    Box (one-dimensional shape only) gives DiagGaussianPdType, Discrete
    gives CategoricalPdType, MultiDiscrete gives MultiCategoricalPdType and
    MultiBinary gives BernoulliPdType.

    Raises NotImplementedError for any other space, and AssertionError for
    a Box whose shape is not one-dimensional.
    """
    from gym import spaces

    if isinstance(ac_space, spaces.Box):
        assert len(ac_space.shape) == 1
        return DiagGaussianPdType(ac_space.shape[0])
    if isinstance(ac_space, spaces.Discrete):
        return CategoricalPdType(ac_space.n)
    if isinstance(ac_space, spaces.MultiDiscrete):
        return MultiCategoricalPdType(ac_space.low, ac_space.high)
    if isinstance(ac_space, spaces.MultiBinary):
        return BernoulliPdType(ac_space.n)
    raise NotImplementedError
def __init__(self, env_data_filename, mode="single", max_utterance_len=20, max_game_turns=20):
    """Load the pickled environment data and set up the spaces and counters.

    :param env_data_filename: path of a pickle file holding a mapping with
        at least the keys ``"id2token"`` and ``"env_data"``.
    :param mode: stored as ``self._mode``.
    :param max_utterance_len: number of token slots in the observation.
    :param max_game_turns: stored as ``self._max_game_turns``.
    """
    # NOTE(review): pickle.load can execute arbitrary code contained in the
    # file; only load environment data that comes from a trusted source.
    with open(env_data_filename, mode="rb") as in_file:
        self._env_data = pickle.load(in_file)
    self._mode = mode
    self._max_game_turns = max_game_turns
    self._num_tokens = len(self._env_data["id2token"])
    self._num_entities = len(self._env_data["env_data"])
    self._entities = list(self._env_data["env_data"].keys())
    # One discrete action per token.
    self.action_space = spaces.Discrete(self._num_tokens)
    # One [0, num_tokens] range per utterance slot. The multiplication has
    # to repeat the [min, max] pair itself: multiplying inside the pair
    # would build a single row of 2 * max_utterance_len numbers, which
    # describes a space with only one dimension.
    token_range = [0, self._num_tokens]
    self.observation_space = spaces.MultiDiscrete([token_range] * max_utterance_len)
    self._last_entity = 0
    self._last_question = 0
    self._last_sequence = []
    self._game_turns = None
    self._game_score = 0
def _to_rf_space(space):
    """Converts Gym space instance into ReinforceFlow.

    Discrete becomes DiscreteOneHot, MultiBinary becomes a Tuple of
    ``space.n`` two-way DiscreteOneHot spaces, Box becomes Continious and
    Tuple is converted sub-space by sub-space.

    Raises ValueError for MultiDiscrete (not supported yet) and for any
    other unsupported space type.
    """
    if isinstance(space, spaces.Discrete):
        return DiscreteOneHot(space.n)
    if isinstance(space, spaces.MultiDiscrete):
        # space.low > 0 will lead to unused first n actions.
        # Sketch of a possible conversion, kept for reference:
        #   Tuple([DiscreteOneHot(n) for n in space.high])
        raise ValueError("MultiDiscrete spaces aren't supported yet.")
    if isinstance(space, spaces.MultiBinary):
        # space.n is a count, not an iterable, so it must go through range().
        # NOTE(review): Tuple receives a list here but unpacked arguments in
        # the Tuple branch below; confirm which form Tuple expects.
        return Tuple([DiscreteOneHot(2) for _ in range(space.n)])
    if isinstance(space, spaces.Box):
        return Continious(space.low, space.high)
    if isinstance(space, spaces.Tuple):
        converted_spaces = [_to_rf_space(sub_space) for sub_space in space.spaces]
        return Tuple(*converted_spaces)
    raise ValueError("Unsupported space %s." % space)
def _make_gym2rf_converter(space):
    """Makes converter function that maps space samples Gym -> ReinforceFlow.

    Discrete samples are one-hot encoded over ``space.n``; MultiBinary
    samples become a tuple of two-way one-hot encodings; Box samples pass
    through unchanged; Tuple samples are converted element by element with
    the converters of the sub-spaces.

    Raises ValueError for any other space type.
    """
    # TODO: add spaces.MultiDiscrete support.
    if isinstance(space, spaces.Discrete):
        return lambda sample: one_hot(space.n, sample)

    if isinstance(space, spaces.MultiBinary):
        return lambda sample: tuple(one_hot(2, element) for element in sample)

    if isinstance(space, spaces.Box):
        def passthrough(sample):
            return sample
        return passthrough

    if isinstance(space, spaces.Tuple):
        # One converter per sub-space, built recursively.
        sub_converters = [_make_gym2rf_converter(sub_space) for sub_space in space.spaces]

        def convert_tuple(sample):
            return tuple(convert(part) for part, convert in zip(sample, sub_converters))
        return convert_tuple

    raise ValueError("Unsupported space %s." % space)
def test_flattened_wrapper():
    """Step FlattenedActionWrapper with 3 on an env whose expectation is (1, 1)."""
    env = gym.make("ExpectTest-v0")
    env.action_space = spaces.MultiDiscrete([(0, 1), (0, 1)])
    env.expectation = (1, 1)

    flattened = FlattenedActionWrapper(env)
    # No assertion here; any check has to happen inside the environment.
    flattened.step(3)
def is_discrete(space):
    """ Tells whether ``space`` is discrete.

    `Discrete`, `MultiDiscrete` and `MultiBinary` spaces are discrete and
    `Box` spaces are not. A `Tuple` space is discrete exactly when all of
    its subspaces are.

    :param space: The space to inspect; it is validated with `assert_space`.
    :return bool: Whether the space is discrete.
    :raises TypeError: If the space is no `gym.Space` (expected to come from
        `assert_space`).
    :raises NotImplementedError: If the space is of none of the types above.
    """
    assert_space(space)

    discrete_types = (spaces.Discrete, spaces.MultiDiscrete, spaces.MultiBinary)
    if isinstance(space, discrete_types):
        return True
    if isinstance(space, spaces.Box):
        return False
    if isinstance(space, spaces.Tuple):
        return all(is_discrete(sub_space) for sub_space in space.spaces)

    raise NotImplementedError("Unknown space {} of type {} supplied".format(space, type(space)))
def __init__(self, multi_discrete, options=None):
    """Build the table mapping each discrete action to a MultiDiscrete action.

    :param multi_discrete: the underlying MultiDiscrete space.
    :param options: selects how ``self.mapping`` is built:

        * ``None``: action 0 is an all-zero NOOP and action ``i + 1`` sets
          sub-space ``i`` to its ``high`` value;
        * a list of sub-space indices: action 0 is the NOOP and action
          ``i + 1`` sets sub-space ``options[i]`` to its ``high`` value;
        * a dict: used directly as the mapping. Its keys must iterate as
          0, 1, 2, ... and every value must be contained in the
          MultiDiscrete space.

    Raises Error for an invalid dict entry or any other ``options`` type.
    """
    assert isinstance(multi_discrete, MultiDiscrete)
    self.multi_discrete = multi_discrete
    self.num_discrete_space = self.multi_discrete.num_discrete_space
    width = self.num_discrete_space

    if options is None:
        # Config 1: one action per sub-space.
        self.n = width + 1  # +1 for NOOP at beginning
        self.mapping = {0: [0] * width}
        for dim in range(width):
            row = [0] * width
            row[dim] = self.multi_discrete.high[dim]
            self.mapping[dim + 1] = row
    elif isinstance(options, list):
        # Config 2: one action per listed sub-space.
        assert len(options) <= width
        self.n = len(options) + 1  # +1 for NOOP at beginning
        self.mapping = {0: [0] * width}
        for slot, disc_num in enumerate(options, 1):
            assert disc_num < width
            row = [0] * width
            row[disc_num] = self.multi_discrete.high[disc_num]
            self.mapping[slot] = row
    elif isinstance(options, dict):
        # Config 3: the caller supplies the complete mapping.
        self.n = len(options)
        self.mapping = options
        bad_key = ('DiscreteToMultiDiscrete must contain ordered keys. '
                   'Item {0} should have a key of "{0}", but key "{1}" found instead.')
        bad_value = ('DiscreteToMultiDiscrete mapping for key {0} is '
                     'not contained in the underlying MultiDiscrete action space. '
                     'Invalid mapping: {1}')
        for position, key in enumerate(options):
            if position != key:
                raise Error(bad_key.format(position, key))
            if not self.multi_discrete.contains(options[key]):
                raise Error(bad_value.format(key, options[key]))
    else:
        # Unknown parameter provided
        raise Error('DiscreteToMultiDiscrete - Invalid parameter provided.')
def __init__(self, multi_discrete, options=None):
    """Expose selected sub-spaces of a MultiDiscrete space through low/high bounds.

    :param multi_discrete: the underlying MultiDiscrete space.
    :param options: list of sub-space indices to expose; ``None`` selects
        every sub-space in order.

    Raises Error if ``options`` is neither ``None`` nor a list.
    """
    assert isinstance(multi_discrete, MultiDiscrete)
    self.multi_discrete = multi_discrete
    self.num_discrete_space = self.multi_discrete.num_discrete_space

    if options is None:
        options = list(range(self.num_discrete_space))
    elif not isinstance(options, list):
        raise Error('BoxToMultiDiscrete - Invalid parameter provided.')
    assert len(options) <= self.num_discrete_space

    lows = self.multi_discrete.low
    highs = self.multi_discrete.high
    self.low = np.array([lows[disc_num] for disc_num in options])
    self.high = np.array([highs[disc_num] for disc_num in options])
    # Position in the exposed bounds -> index of the MultiDiscrete sub-space.
    self.mapping = dict(enumerate(options))
def __init__(self, multi_discrete, options=None):
    """Build the table mapping each discrete action to a MultiDiscrete action.

    The parent initialiser is called with 0 first; ``self.n`` is then set
    according to ``options``.

    :param multi_discrete: the underlying MultiDiscrete space.
    :param options: selects how ``self.mapping`` is built:

        * ``None``: action 0 is an all-zero NOOP and action ``i + 1`` sets
          sub-space ``i`` to its ``high`` value;
        * a list of sub-space indices: action 0 is the NOOP and action
          ``i + 1`` sets sub-space ``options[i]`` to its ``high`` value;
        * a dict: used directly as the mapping. Its keys must iterate as
          0, 1, 2, ... and every value must be contained in the
          MultiDiscrete space.

    Raises Error for an invalid dict entry or any other ``options`` type.
    """
    super().__init__(0)
    assert isinstance(multi_discrete, MultiDiscrete)
    self.multi_discrete = multi_discrete
    self.num_discrete_space = self.multi_discrete.num_discrete_space
    width = self.num_discrete_space

    if options is None:
        # Config 1: one action per sub-space.
        self.n = width + 1  # +1 for NOOP at beginning
        self.mapping = {0: [0] * width}
        for dim in range(width):
            row = [0] * width
            row[dim] = self.multi_discrete.high[dim]
            self.mapping[dim + 1] = row
    elif isinstance(options, list):
        # Config 2: one action per listed sub-space.
        assert len(options) <= width
        self.n = len(options) + 1  # +1 for NOOP at beginning
        self.mapping = {0: [0] * width}
        for slot, disc_num in enumerate(options, 1):
            assert disc_num < width
            row = [0] * width
            row[disc_num] = self.multi_discrete.high[disc_num]
            self.mapping[slot] = row
    elif isinstance(options, dict):
        # Config 3: the caller supplies the complete mapping.
        self.n = len(options)
        self.mapping = options
        bad_key = ('DiscreteToMultiDiscrete must contain ordered keys. '
                   'Item {0} should have a key of "{0}", but key "{1}" found instead.')
        bad_value = ('DiscreteToMultiDiscrete mapping for key {0} is '
                     'not contained in the underlying MultiDiscrete action space. '
                     'Invalid mapping: {1}')
        for position, key in enumerate(options):
            if position != key:
                raise Error(bad_key.format(position, key))
            if not self.multi_discrete.contains(options[key]):
                raise Error(bad_value.format(key, options[key]))
    else:
        # Unknown parameter provided
        raise Error('DiscreteToMultiDiscrete - Invalid parameter provided.')
def make_pdtype(ac_space):
    """Return the probability-distribution type matching a gym action space.

    Box (one-dimensional shape only) gives DiagGaussianPdType, Discrete
    gives CategoricalPdType, MultiDiscrete gives MultiCategoricalPdType and
    MultiBinary gives BernoulliPdType.

    Raises NotImplementedError for any other space, and AssertionError for
    a Box whose shape is not one-dimensional.
    """
    from gym import spaces

    if isinstance(ac_space, spaces.Box):
        assert len(ac_space.shape) == 1
        return DiagGaussianPdType(ac_space.shape[0])
    if isinstance(ac_space, spaces.Discrete):
        return CategoricalPdType(ac_space.n)
    if isinstance(ac_space, spaces.MultiDiscrete):
        return MultiCategoricalPdType(ac_space.low, ac_space.high)
    if isinstance(ac_space, spaces.MultiBinary):
        return BernoulliPdType(ac_space.n)
    raise NotImplementedError
def _set_action(self, action, agent, action_space, time=None):
    """Decode ``action`` into the agent's physical and communication actions.

    ``agent.action.u`` (physical) and ``agent.action.c`` (communication)
    are first reset to zeros. For a MultiDiscrete ``action_space`` the flat
    action is cut into consecutive pieces of ``high - low + 1`` entries;
    otherwise the action is treated as a single piece. A movable agent
    consumes the first piece as its physical action and a non-silent agent
    consumes the next one as its communication action. Every piece must be
    consumed by the end.

    ``time`` is accepted but not used in this method.
    """
    agent.action.u = np.zeros(self.world.dim_p)
    agent.action.c = np.zeros(self.world.dim_c)

    # process action
    if isinstance(action_space, spaces.MultiDiscrete):
        widths = action_space.high - action_space.low + 1
        pieces = []
        start = 0
        for width in widths:
            stop = start + width
            pieces.append(action[start:stop])
            start = stop
        action = pieces
    else:
        action = [action]

    if agent.movable:
        # physical action
        if self.discrete_action_input:
            agent.action.u = np.zeros(self.world.dim_p)
            # process discrete action: (code, axis, value written to u[axis])
            choice = action[0]
            moves = ((1, 0, -1.0), (2, 0, +1.0), (3, 1, -1.0), (4, 1, +1.0))
            for code, axis, value in moves:
                if choice == code:
                    agent.action.u[axis] = value
        else:
            move = action[0]
            if self.force_discrete_action:
                # Collapse the piece, in place, to a one-hot of its argmax.
                strongest = np.argmax(move)
                move[:] = 0.0
                move[strongest] = 1.0
            if self.discrete_action_space:
                agent.action.u[0] += move[1] - move[2]
                agent.action.u[1] += move[3] - move[4]
            else:
                agent.action.u = move
        # Scale by the agent's own acceleration when it has one.
        sensitivity = 5.0 if agent.accel is None else agent.accel
        agent.action.u *= sensitivity
        action = action[1:]

    if not agent.silent:
        # communication action
        if self.discrete_action_input:
            agent.action.c = np.zeros(self.world.dim_c)
            agent.action.c[action[0]] = 1.0
        else:
            agent.action.c = action[0]
        action = action[1:]

    # make sure we used all elements of action
    assert len(action) == 0
# reset rendering assets