def make_env():
    env_spec = gym.spec('ppaquette/DoomBasic-v0')
    env_spec.id = 'DoomBasic-v0'
    env = env_spec.make()
    # Frame-skip by 4, reduce Doom's action space to a minimal discrete set,
    # and downscale observations to 80x80 grayscale.
    e = PreprocessImage(SkipWrapper(4)(ToDiscrete("minimal")(env)),
                        width=80, height=80, grayscale=True)
    return e

Example source code for the Python class SkipWrapper()
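The snippets below all use gym's `SkipWrapper(k)`, a wrapper factory that applies each chosen action over several consecutive environment steps and accumulates the reward (it is no longer shipped in newer gym releases). As a rough sketch of the idea, and not gym's actual implementation, a hand-rolled equivalent could look like the following; the class name `FrameSkip` and the exact semantics (repeat the action `repeat` times, sum rewards, stop early when the episode ends) are assumptions made for illustration.

import gym

class FrameSkip(gym.Wrapper):
    """Hypothetical minimal frame-skip wrapper (illustrative sketch, not gym's SkipWrapper)."""

    def __init__(self, env, repeat=4):
        super(FrameSkip, self).__init__(env)
        self.repeat = repeat

    def step(self, action):
        total_reward, done, info = 0.0, False, {}
        for _ in range(self.repeat):
            obs, reward, done, info = self.env.step(action)
            total_reward += reward
            if done:
                break
        return obs, total_reward, done, info

    def reset(self, **kwargs):
        return self.env.reset(**kwargs)
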
def test_skip():
    every_two_frame = SkipWrapper(2)
    env = gym.make("FrozenLake-v0")
    env = every_two_frame(env)
    obs = env.reset()
    env.render()

def simpleSSBMEnv(act_every=3, **kwargs):
    env = SSBMEnv(**kwargs)

    # TODO: make this a wrapper
    env.action_space = spaces.Discrete(len(ssbm.simpleControllerStates))
    env.realController = lambda action: ssbm.simpleControllerStates[action].realController()

    from .box_wrapper import BoxWrapper
    env = BoxWrapper(env)

    from gym.wrappers import SkipWrapper
    return SkipWrapper(act_every)(env)

def RepeatActionWrapper(env, repeat):
    """
    This is just a thin wrapper around `gym.wrappers.SkipWrapper`
    to get a consistent interface.

    :param gym.Env env: Environment to wrap.
    :param int repeat: Number of times that an action will be repeated.
    :return gym.Wrapper: A wrapper that repeats an action for `repeat`
        steps.
    """
    from gym.wrappers import SkipWrapper
    return SkipWrapper(repeat)(env)
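
A brief usage sketch for the helper above; the CartPole-v0 environment and the repeat count of 4 are just illustrative choices, assuming a gym version that still ships SkipWrapper.

import gym

# Hypothetical usage: every chosen action is applied for 4 underlying steps.
env = RepeatActionWrapper(gym.make("CartPole-v0"), repeat=4)
obs = env.reset()
obs, reward, done, info = env.step(env.action_space.sample())
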
def dqn_atari(logdir, env='Pong', memory_size=100000):
    import numpy as np
    import gym
    import tensorflow as tf
    from gym import wrappers
    from tensorflow.contrib import layers
    from tensorflow.contrib.framework import arg_scope
    from chi.util import in_collections

    chi.set_loglevel('debug')
    log_top(logdir + '/logs/top')
    log_nvidia_smi(logdir + '/logs/nvidia-smi')

    # Build the Atari environment: no built-in frame skip, then stack 4 frames
    # and repeat each action for 4 steps via SkipWrapper.
    env += 'NoFrameskip-v3'
    env = gym.make(env)
    env = chi.rl.wrappers.AtariWrapper(env)
    env = chi.rl.wrappers.StackFrames(env, 4)
    env = wrappers.SkipWrapper(4)(env)

    test = 10
    train = 40
    env = monitor = wrappers.Monitor(env, logdir + '/monitor',
                                     video_callable=lambda i: i % (test + train) == 0 or i % (test + train) == train)
    print_env(env)

    @chi.model(tracker=tf.train.ExponentialMovingAverage(1 - .0005),  # TODO: replace with original weight freeze
               optimizer=tf.train.RMSPropOptimizer(.00025, .95, .95, .01))
    def q_network(x):
        # Standard DQN convolutional network followed by a fully connected head.
        x /= 255
        x = layers.conv2d(x, 32, 8, 4)
        x = layers.conv2d(x, 64, 4, 2)
        x = layers.conv2d(x, 64, 3, 1)
        x = layers.flatten(x)
        x = layers.fully_connected(x, 512)
        x = layers.fully_connected(x, env.action_space.n, activation_fn=None)
        x = tf.identity(x, name='Q')
        return x

    memory = chi.rl.ReplayMemory(memory_size, 32)
    agent = DqnAgent(env, q_network, memory)

    from time import time
    step = monitor.get_total_steps()
    t = time()
    for ep in range(100000):
        # Alternate between `train` training episodes and `test` evaluation episodes.
        for _ in range(train):
            agent.play_episode()

        for _ in range(test):
            agent.play_episode(test=True)

        ar = np.mean(monitor.get_episode_rewards()[-(train + test):-test])
        at = np.mean(monitor.get_episode_rewards()[-test:])
        ds = monitor.get_total_steps() - step
        step = monitor.get_total_steps()
        dt = time() - t
        t = time()
        print(f'av. test return {at}, av. train return {ar}, av. fps {ds/dt}')
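
A minimal invocation sketch for the training function above; the log directory is a placeholder, and it assumes the chi package, its DqnAgent, and the Atari dependencies are installed.

if __name__ == '__main__':
    # Hypothetical entry point: train and periodically evaluate DQN on Pong.
    dqn_atari('/tmp/dqn_atari_logs', env='Pong', memory_size=100000)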