import gym
import numpy as np


def HistoryWrapper(steps):
    class HistoryWrapper(gym.Wrapper):
        """
        Track the history of observations for a given number of steps.
        Initial steps are zero-filled.
        """
        def __init__(self, env):
            super(HistoryWrapper, self).__init__(env)
            self.steps = steps
            self.history = self._make_history()

        def _make_history(self):
            return [np.zeros(shape=self.env.observation_space.shape) for _ in range(steps)]

        def _step(self, action):
            obs, reward, done, info = self.env.step(action)
            self.history.pop(0)
            self.history.append(obs)
            return np.array(self.history), reward, done, info

        def _reset(self):
            self.history = self._make_history()
            self.history.pop(0)
            self.history.append(self.env.reset())
            return np.array(self.history)

    return HistoryWrapper
import collections

import gym
import numpy as np


def HistoryWrapper(steps):
    class _HistoryWrapper(gym.Wrapper):
        """
        Track the history of observations for a given number of steps.
        Initial steps are zero-filled.
        """
        def __init__(self, env):
            super(_HistoryWrapper, self).__init__(env)
            self.steps = steps
            self.history = self._make_history()
            self.observation_space = self._make_observation_space(steps, env.observation_space)

        @staticmethod
        def _make_observation_space(steps, orig_obs):
            assert isinstance(orig_obs, gym.spaces.Box)
            low = np.repeat(np.expand_dims(orig_obs.low, 0), steps, axis=0)
            high = np.repeat(np.expand_dims(orig_obs.high, 0), steps, axis=0)
            return gym.spaces.Box(low, high)

        def _make_history(self, last_item=None):
            size = self.steps if last_item is None else self.steps - 1
            res = collections.deque([np.zeros(shape=self.env.observation_space.shape)] * size)
            if last_item is not None:
                res.append(last_item)
            return res

        def _step(self, action):
            obs, reward, done, info = self.env.step(action)
            self.history.popleft()
            self.history.append(obs)
            return self.history, reward, done, info

        def _reset(self):
            self.history = self._make_history(last_item=self.env.reset())
            return self.history

    return _HistoryWrapper
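A minimal usage sketch for the factory above, assuming the pre-0.10 gym API in which `gym.Wrapper` dispatches `reset()`/`step()` to `_reset()`/`_step()`; the environment id and step count are illustrative:

env = HistoryWrapper(4)(gym.make("CartPole-v0"))
obs = env.reset()              # deque of 4 observations, zero padding first, newest last
print(len(obs), obs[0].shape)  # -> 4 (4,)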
def make_env(env_name, monitor_dir=None, wrappers=()):
    """
    Make a gym environment with an optional monitor.
    :param env_name: name of the environment to create
    :param monitor_dir: optional directory to save monitor results
    :param wrappers: iterable of Wrapper classes (or factory callables) applied in order
    :return: environment object
    """
    env = gym.make(env_name)
    for wrapper in wrappers:
        env = wrapper(env)
    if monitor_dir:
        env = gym.wrappers.Monitor(env, monitor_dir)
    return env
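For example, combining make_env with the HistoryWrapper factory above (the environment id and monitor directory are illustrative, and gym.wrappers.Monitor is the old-gym monitor API):

env = make_env("CartPole-v0", monitor_dir="/tmp/cartpole-monitor", wrappers=(HistoryWrapper(4),))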
def RepeatActionWrapper(env, repeat):
    """
    This is just a thin wrapper around `gym.wrappers.SkipWrapper`
    to get a consistent interface.
    :param gym.Env env: Environment to wrap
    :param int repeat: Number of times that an action will be repeated.
    :return gym.Wrapper: A wrapper that repeats an action for `repeat`
        steps.
    """
    from gym.wrappers import SkipWrapper
    return SkipWrapper(repeat)(env)
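Illustrative usage, assuming an older gym release that still ships gym.wrappers.SkipWrapper (the environment id is arbitrary):

env = RepeatActionWrapper(gym.make("CartPole-v0"), repeat=4)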
def __init__(self, env, noop_max=30):
    # Sample initial states by taking a random number of no-ops on reset;
    # action 0 is assumed to be NOOP.
    gym.Wrapper.__init__(self, env)
    self.noop_max = noop_max
    self.override_num_noops = None
    assert env.unwrapped.get_action_meanings()[0] == 'NOOP'

def __init__(self, env):
    # Take the FIRE action on reset for environments that are fixed until firing.
    gym.Wrapper.__init__(self, env)
    assert env.unwrapped.get_action_meanings()[1] == 'FIRE'
    assert len(env.unwrapped.get_action_meanings()) >= 3

def __init__(self, env):
    # Treat end-of-life as end-of-episode, but only reset on true game over.
    gym.Wrapper.__init__(self, env)
    self.lives = 0
    self.was_real_done = True

def __init__(self, env, skip=4):
    # Return only every `skip`-th frame.
    gym.Wrapper.__init__(self, env)
    # Most recent raw observations (for max pooling across time steps)
    self._obs_buffer = deque(maxlen=2)
    self._skip = skip

def __init__(self, env, k):
    # Stack the k most recent single-channel frames along the channel axis.
    gym.Wrapper.__init__(self, env)
    self.k = k
    self.frames = deque([], maxlen=k)
    shp = env.observation_space.shape
    assert shp[2] == 1  # can only stack 1-channel frames
    self.observation_space = spaces.Box(low=0, high=255, shape=(shp[0], shp[1], k))
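The five constructors above are fragments of the usual DeepMind-style Atari preprocessing wrappers. The class names in the sketch below (NoopResetEnv, FireResetEnv, EpisodicLifeEnv, MaxAndSkipEnv, FrameStack) are assumed from OpenAI Baselines' atari_wrappers module and do not appear in the fragments themselves; a typical composition might look like this:

# Assumed wrapper classes from baselines.common.atari_wrappers; order roughly mirrors wrap_deepmind.
env = gym.make("BreakoutNoFrameskip-v4")
env = NoopResetEnv(env, noop_max=30)
env = MaxAndSkipEnv(env, skip=4)
env = EpisodicLifeEnv(env)
env = FireResetEnv(env)
env = FrameStack(env, k=4)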
def ToDiscrete():
    class ToDiscreteWrapper(gym.Wrapper):
        """
        Wrapper to convert MultiDiscrete action space to Discrete
        Only supports one config, which maps to the most logical discrete space possible
        """
        def __init__(self, env):
            super(ToDiscreteWrapper, self).__init__(env)
            mapping = {
                0:  [0, 0, 0, 0, 0, 0],  # NOOP
                1:  [1, 0, 0, 0, 0, 0],  # Up
                2:  [0, 0, 1, 0, 0, 0],  # Down
                3:  [0, 1, 0, 0, 0, 0],  # Left
                4:  [0, 1, 0, 0, 1, 0],  # Left + A
                5:  [0, 1, 0, 0, 0, 1],  # Left + B
                6:  [0, 1, 0, 0, 1, 1],  # Left + A + B
                7:  [0, 0, 0, 1, 0, 0],  # Right
                8:  [0, 0, 0, 1, 1, 0],  # Right + A
                9:  [0, 0, 0, 1, 0, 1],  # Right + B
                10: [0, 0, 0, 1, 1, 1],  # Right + A + B
                11: [0, 0, 0, 0, 1, 0],  # A
                12: [0, 0, 0, 0, 0, 1],  # B
                13: [0, 0, 0, 0, 1, 1],  # A + B
            }
            self.action_space = gym.spaces.multi_discrete.DiscreteToMultiDiscrete(self.action_space, mapping)

        def _step(self, action):
            return self.env._step(self.action_space(action))

    return ToDiscreteWrapper
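A usage sketch: the mapping targets the six-button NES control scheme, so this presumably wraps one of the ppaquette Super Mario environments (the environment id below is an assumption), under the old gym API where MultiDiscrete adapters such as DiscreteToMultiDiscrete still exist:

env = gym.make("ppaquette/SuperMarioBros-1-1-v0")  # hypothetical id, requires the gym-super-mario package
env = ToDiscrete()(env)                            # the agent now picks one of the 14 mapped actions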
def ToBox():
    class ToBoxWrapper(gym.Wrapper):
        """
        Wrapper to convert MultiDiscrete action space to Box
        Only supports one config, which allows all keys to be pressed
        """
        def __init__(self, env):
            super(ToBoxWrapper, self).__init__(env)
            self.action_space = gym.spaces.multi_discrete.BoxToMultiDiscrete(self.action_space)

        def _step(self, action):
            return self.env._step(self.action_space(action))

    return ToBoxWrapper
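The Box variant follows the same pattern, exposing one continuous activation per button (same illustrative environment id as above):

env = ToBox()(gym.make("ppaquette/SuperMarioBros-1-1-v0"))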
def SetPlayingMode(target_mode):
    """ target mode can be 'algo' or 'human' """
    class SetPlayingModeWrapper(gym.Wrapper):
        """
        Doom wrapper to change playing mode 'human' or 'algo'
        """
        def __init__(self, env):
            super(SetPlayingModeWrapper, self).__init__(env)
            if target_mode not in ['algo', 'human']:
                raise gym.error.Error('Error - The mode "{}" is not supported. Supported options are "algo" or "human"'.format(target_mode))
            self.unwrapped.mode = target_mode

    return SetPlayingModeWrapper
def SetPlayingMode(target_mode):
    """ target mode can be 'algo' or 'human' """
    class SetPlayingModeWrapper(gym.Wrapper):
        """
        Doom wrapper to change playing mode 'human' or 'algo'
        """
        def __init__(self, env):
            super(SetPlayingModeWrapper, self).__init__(env)
            if target_mode not in ['algo', 'human']:
                raise gym.error.Error('Error - The mode "{}" is not supported. Supported options are "algo" or "human"'.format(target_mode))
            self.unwrapped._mode = target_mode

    return SetPlayingModeWrapper
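The two variants differ only in whether they set `mode` or `_mode` on the unwrapped env, presumably tracking different releases of the ppaquette Doom package. A usage sketch (the Doom environment id is illustrative):

env = SetPlayingMode('human')(gym.make("ppaquette/DoomBasic-v0"))  # hypothetical env id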
def __init__(self, env, noop_max=30):
    """Sample initial states by taking random number of no-ops on reset.
    No-op is assumed to be action 0.
    """
    gym.Wrapper.__init__(self, env)
    self.noop_max = noop_max
    self.override_num_noops = None
    if isinstance(env.action_space, gym.spaces.MultiBinary):
        self.noop_action = np.zeros(self.env.action_space.n, dtype=np.int64)
    else:
        # used for atari environments
        self.noop_action = 0
        assert env.unwrapped.get_action_meanings()[0] == 'NOOP'
def __init__(self, env):
    """Take action on reset for environments that are fixed until firing."""
    gym.Wrapper.__init__(self, env)
    assert env.unwrapped.get_action_meanings()[1] == 'FIRE'
    assert len(env.unwrapped.get_action_meanings()) >= 3

def __init__(self, env):
    """Make end-of-life == end-of-episode, but only reset on true game over.
    Done by DeepMind for the DQN and co. since it helps value estimation.
    """
    gym.Wrapper.__init__(self, env)
    self.lives = 0
    self.was_real_done = True

def __init__(self, env, skip=4):
    """Return only every `skip`-th frame"""
    gym.Wrapper.__init__(self, env)
    # most recent raw observations (for max pooling across time steps)
    self._obs_buffer = np.zeros((2,) + env.observation_space.shape, dtype='uint8')
    self._skip = skip

def __init__(self, env, k):
    """Stack k last frames.
    Returns lazy array, which is much more memory efficient.
    See Also
    --------
    baselines.common.atari_wrappers.LazyFrames
    """
    gym.Wrapper.__init__(self, env)
    self.k = k
    self.frames = deque([], maxlen=k)
    shp = env.observation_space.shape
    self.observation_space = spaces.Box(low=0, high=255, shape=(shp[0], shp[1], shp[2] * k))

def __init__(self, env, k):
    """Stack k last frames.
    Returns lazy array, which is much more memory efficient.
    See Also
    --------
    LazyFrames
    """
    gym.Wrapper.__init__(self, env)
    self.k = k
    self.frames = deque([], maxlen=k)
    shp = env.observation_space.shape
    self.observation_space = spaces.Box(
        low=0, high=255, shape=(shp[0], shp[1], shp[2] * k))
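Only the constructors of these FrameStack variants are shown; in baselines the reset/step halves return a LazyFrames view over the deque instead of copying the frames, roughly as in the sketch below (the method name and the LazyFrames class are assumed from baselines.common.atari_wrappers, not taken from the snippets above):

def _get_ob(self):
    assert len(self.frames) == self.k
    return LazyFrames(list(self.frames))  # shares the underlying frame buffers, no copy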
def SkipWrapper(repeat_count):
    class SkipWrapper(gym.Wrapper):
        """
        Generic common frame-skipping wrapper.
        Will perform the chosen action for `repeat_count` additional steps.
        """
        def __init__(self, env):
            super(SkipWrapper, self).__init__(env)
            self.repeat_count = repeat_count
            self.stepcount = 0

        def _step(self, action):
            done = False
            total_reward = 0
            current_step = 0
            while current_step < (self.repeat_count + 1) and not done:
                self.stepcount += 1
                obs, reward, done, info = self.env.step(action)
                total_reward += reward
                current_step += 1
            if 'skip.stepcount' in info:
                raise gym.error.Error('Key "skip.stepcount" already in info. Make sure you are not stacking '
                                      'the SkipWrapper wrappers.')
            info['skip.stepcount'] = self.stepcount
            return obs, total_reward, done, info

        def _reset(self):
            self.stepcount = 0
            return self.env.reset()

    return SkipWrapper
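A usage sketch under the old gym API (the environment id is arbitrary): SkipWrapper(3) executes each chosen action for 3 additional steps, i.e. 4 env steps in total, and sums the rewards:

env = SkipWrapper(3)(gym.make("CartPole-v0"))
env.reset()
obs, total_reward, done, info = env.step(env.action_space.sample())
print(info['skip.stepcount'])  # underlying env steps taken so far (4 here)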