def __init__(self, ball_speed=4, bat_speed=4, max_num_rounds=20):
    SCREEN_WIDTH, SCREEN_HEIGHT = 160, 210
    self.observation_space = spaces.Tuple([
        spaces.Box(low=0, high=255, shape=(SCREEN_HEIGHT, SCREEN_WIDTH, 3)),
        spaces.Box(low=0, high=255, shape=(SCREEN_HEIGHT, SCREEN_WIDTH, 3))
    ])
    self.action_space = spaces.Tuple(
        [spaces.Discrete(3), spaces.Discrete(3)])
    pygame.init()
    self._surface = pygame.Surface((SCREEN_WIDTH, SCREEN_HEIGHT))
    self._viewer = None
    self._game = PongGame(
        has_double_players=True,
        window_size=(SCREEN_WIDTH, SCREEN_HEIGHT),
        ball_speed=ball_speed,
        bat_speed=bat_speed,
        max_num_rounds=max_num_rounds)
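# Usage sketch for the two-player Pong env above (a hedged example assuming
# the standard gym step API; `env` stands for an instance of this class):
action = env.action_space.sample()  # a pair such as (0, 2): one Discrete(3) action per player
observations, reward, done, info = env.step(action)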
def __init__(self, env, prepro, shape, overwrite_render=True, high=255):
    """
    Args:
        env: (gym env)
        prepro: (function) to apply to a state for preprocessing
        shape: (list) shape of obs after prepro
        overwrite_render: (bool) if True, render is overwritten to visualize the effect of prepro
        high: (int) max value of state after prepro
    """
    super(PreproWrapper, self).__init__(env)
    self.overwrite_render = overwrite_render
    self.viewer = None
    self.prepro = prepro
    self.observation_space = spaces.Box(low=0, high=high, shape=shape)
    self.high = high
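# A minimal usage sketch for the wrapper above (assumes PreproWrapper is the
# class this __init__ belongs to; the grayscale prepro function and the env id
# are purely illustrative):
import gym
import numpy as np

def to_grayscale(state):
    # Collapse the RGB channels to a single luminance channel.
    return np.mean(state, axis=2, keepdims=True).astype(np.uint8)

env = PreproWrapper(gym.make("Pong-v0"), prepro=to_grayscale,
                    shape=(210, 160, 1), high=255)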
def print_env(env: Env):
    spec = getattr(env, 'spec', False)
    if spec:
        print(f'Env spec: {vars(spec)}')
    acsp = env.action_space
    obsp = env.observation_space
    print(f'Observation space {obsp}')
    if isinstance(obsp, Box) and len(obsp.high) < 20:
        print(f'low = {obsp.low}\nhigh = {obsp.high}')
    print(f'Action space {acsp}')
    if isinstance(acsp, Box) and len(acsp.high) < 20:
        print(f'low = {acsp.low}\nhigh = {acsp.high}')
    print("")
def configureActions(self, discrete_actions):
    # True if the action space is discrete: 3 values (no push, left, right).
    # False if the action space is continuous: fx in (-action_force, action_force).
    self.discrete_actions = discrete_actions
    # 3 discrete actions: no push, left, right
    # 1 continuous action element: fx
    if self.discrete_actions:
        self.action_space = spaces.Discrete(3)
    else:
        self.action_space = spaces.Box(-1.0, 1.0, shape=(1, 1))
    # Our observations can be within this box
    float_max = np.finfo(np.float32).max
    self.observation_space = gym.spaces.Box(-float_max, float_max, self.state_shape)
def configureActions(self, discrete_actions):
    # True if the action space is discrete: 5 values (no push, left, right, up, down).
    # False if the action space is continuous: fx, fy both in (-action_force, action_force).
    self.discrete_actions = discrete_actions
    # 5 discrete actions: no push, left, right, up, down
    # 2 continuous action elements: fx & fy
    if self.discrete_actions:
        self.action_space = spaces.Discrete(5)
    else:
        self.action_space = spaces.Box(-1.0, 1.0, shape=(2,))
    # Our observations can be within this box
    float_max = np.finfo(np.float32).max
    self.observation_space = gym.spaces.Box(-float_max, float_max, self.state_shape)
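# Illustrative sketch of how a discrete action index could be decoded into a
# force vector for the 5-action variant above (the mapping and ACTION_FORCE
# are assumptions, not taken from the original environment):
import numpy as np

ACTION_FORCE = 1.0  # hypothetical magnitude
FORCES = {
    0: np.array([0.0, 0.0]),            # no push
    1: np.array([-ACTION_FORCE, 0.0]),  # left
    2: np.array([+ACTION_FORCE, 0.0]),  # right
    3: np.array([0.0, +ACTION_FORCE]),  # up
    4: np.array([0.0, -ACTION_FORCE]),  # down
}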
def configureActions(self, discrete_actions):
    # If it is possible to switch actions, do this here:
    # True if the action space is discrete,
    # False if the action space is continuous.
    self.discrete_actions = discrete_actions
    # if self.discrete_actions:
    #     self.action_space = spaces.Discrete(3)
    # else:
    #     self.action_space = spaces.Box(-1.0, 1.0, shape=(1, 1))
    # # Our observations can be within this box
    # float_max = np.finfo(np.float32).max
    # self.observation_space = gym.spaces.Box(-float_max, float_max, self.state_shape)
def __init__(self, cycle_cap, chain_cap, min, max, w_fun):
    self.cycle_cap = cycle_cap
    self.chain_cap = chain_cap
    self.min = min
    self.max = max
    self.w_fun = w_fun
    self.action_space = spaces.Box(min, max, (len(BLOODS)**2,))
    self.params = {
        "cycle_cap": cycle_cap,
        "chain_cap": chain_cap,
        "min": min,
        "max": max
    }
    self.stats = {
        "cycle_reward": 0,
        "chain_reward": 0
    }
    for blood in BLOODS:
        self.stats["%s_patient_matched" % blood] = 0
        self.stats["%s_donor_matched" % blood] = 0
def __init__(self, env, monitor_path, video=True, **usercfg):
    super(CEM, self).__init__(**usercfg)
    self.env = wrappers.Monitor(env, monitor_path, force=True,
                                video_callable=(None if video else False))
    self.config.update(dict(
        num_steps=env.spec.tags.get("wrapper_config.TimeLimit.max_episode_steps"),  # maximum length of an episode
        n_iter=100,      # number of iterations of CEM
        batch_size=25,   # number of samples per batch
        elite_frac=0.2   # fraction of samples used as elite set
    ))
    self.config.update(usercfg)
    if isinstance(env.action_space, Discrete):
        self.dim_theta = (env.observation_space.shape[0] + 1) * env.action_space.n
    elif isinstance(env.action_space, Box):
        self.dim_theta = (env.observation_space.shape[0] + 1) * env.action_space.shape[0]
    else:
        raise NotImplementedError
    # Initialize the mean and standard deviation of the parameter distribution
    self.theta_mean = np.zeros(self.dim_theta)
    self.theta_std = np.ones(self.dim_theta)
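# A minimal sketch of one CEM update over the diagonal-Gaussian parameter
# distribution initialized above (evaluate_policy is a hypothetical callback
# returning the episode return for a parameter vector theta):
def cem_iteration(theta_mean, theta_std, batch_size, elite_frac, evaluate_policy):
    # Sample a batch of parameter vectors from N(theta_mean, theta_std**2).
    thetas = theta_mean + theta_std * np.random.randn(batch_size, theta_mean.size)
    rewards = np.array([evaluate_policy(theta) for theta in thetas])
    # Keep the top elite_frac samples and refit the Gaussian to them.
    n_elite = int(batch_size * elite_frac)
    elite = thetas[np.argsort(rewards)[-n_elite:]]
    return elite.mean(axis=0), elite.std(axis=0)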
def __init__(self):
    self._seed()
    self.viewer = None
    self.world = Box2D.b2World()
    self.moon = None
    self.lander = None
    self.particles = []
    self.prev_reward = None
    high = np.array([np.inf] * 8)  # useful range is -1 .. +1, but spikes can be higher
    self.observation_space = spaces.Box(-high, high)
    if self.continuous:
        # Action is two floats: [main engine, left-right engines].
        # Main engine: -1..0 off, 0..+1 throttle from 50% to 100% power.
        # The engine can't work with less than 50% power.
        # Left-right: -1.0..-0.5 fire left engine, +0.5..+1.0 fire right engine, -0.5..0.5 off.
        self.action_space = spaces.Box(-1, +1, (2,))
    else:
        # Nop, fire left engine, main engine, right engine
        self.action_space = spaces.Discrete(4)
    self._reset()
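# Sketch of how the comment thresholds above decode a continuous action
# (illustrative only; the actual environment's step() also applies physics):
def decode_action(a):
    main, lateral = float(a[0]), float(a[1])
    # Main engine throttle scales linearly from 50% to 100% over (0, 1].
    main_power = 0.5 + 0.5 * main if main > 0.0 else 0.0
    fire_left = lateral < -0.5
    fire_right = lateral > +0.5
    return main_power, fire_left, fire_right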
def __init__(self, natural=False):
    """
    Initialize the environment.
    """
    # Arrays of length 1 are used to store scalar constants
    # (plain scalars caused errors with these spaces).
    self.action_space = spaces.Tuple((
        spaces.Box(-5.0, 0.0, 1),      # learning rate
        spaces.Box(-7.0, -2.0, 1),     # decay
        spaces.Box(-5.0, 0.0, 1),      # momentum
        spaces.Box(2, 8, 1),           # batch size
        spaces.Box(-6.0, 1.0, 1),      # l1 reg
        spaces.Box(-6.0, 1.0, 1),      # l2 reg
        spaces.Box(0.0, 1.0, (5, 2)),  # convolutional layer parameters
        spaces.Box(0.0, 1.0, (2, 2)),  # fully connected layer parameters
    ))
    # Observation features, in order: num of instances, num of labels,
    # validation accuracy after training with the given parameters.
    self.observation_space = spaces.Box(-1e5, 1e5, 2)
    # Start the first game
    self._reset()
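# The scalar boxes above appear to store hyperparameters on a log scale, so a
# sampled action needs decoding before use; a hedged sketch assuming log10
# bounds (e.g. learning rate in [1e-5, 1e0]) and `env` as an instance:
action = env.action_space.sample()
learning_rate = 10.0 ** action[0][0]
decay = 10.0 ** action[1][0]
momentum = 10.0 ** action[2][0]
batch_size = int(2 ** action[3][0])  # assumption: batch size stored as a log2 exponent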
def __init__(self, natural=False):
    """
    Initialize the environment.
    """
    # Arrays of length 1 are used to store scalar constants
    # (plain scalars caused errors with these spaces).
    self.action_space = spaces.Tuple((
        spaces.Box(-5.0, 0.0, 1),   # learning rate
        spaces.Box(-7.0, -2.0, 1),  # decay
        spaces.Box(-5.0, 0.0, 1),   # momentum
        spaces.Box(2, 8, 1),        # batch size
        spaces.Box(-6.0, 1.0, 1),   # l1 reg
        spaces.Box(-6.0, 1.0, 1),   # l2 reg
    ))
    # Observation features, in order: num of instances, num of labels,
    # number of filters in part A / B of the neural net, num of neurons in the
    # output layer, validation accuracy after training with the given parameters.
    self.observation_space = spaces.Box(-1e5, 1e5, 6)
    # Start the first game
    self._reset()
def __init__(self):
    self.min_position = -1.2
    self.max_position = 0.6
    self.max_speed = 0.07
    self.goal_position = 0.5
    self.low = np.array([self.min_position, -self.max_speed])
    self.high = np.array([self.max_position, self.max_speed])
    self.viewer = None
    self.action_space = spaces.Discrete(3)
    self.observation_space = spaces.Box(self.low, self.high)
    self._seed()
    self.reset()
def __init__(self, *args, **kwargs):
    super(TestConverters, self).__init__(*args, **kwargs)
    self.space_d = spaces.Discrete(4)
    self.gym_out_d = 2
    self.rf_out_d = [0, 0, 1, 0]
    self.space_c = spaces.Box(-1, 1, [2, 4])
    self.gym_out_c = np.random.uniform(low=-1, high=1, size=(2, 4))
    self.rf_out_c = self.gym_out_c
    self.space_b = spaces.MultiBinary(4)
    self.gym_out_b = [0, 1, 0, 1]
    self.rf_out_b = [[1, 0], [0, 1], [1, 0], [0, 1]]
    self.space_t = spaces.Tuple((self.space_d,
                                 self.space_c,
                                 self.space_b,
                                 spaces.Tuple((self.space_d, self.space_c))))
    self.gym_out_t = (self.gym_out_d, self.gym_out_c, self.gym_out_b,
                      (self.gym_out_d, self.gym_out_c))
    self.rf_out_t = (self.rf_out_d, self.rf_out_c, self.rf_out_b,
                     (self.rf_out_d, self.rf_out_c))
def __init__(self, ball_speed=4, bat_speed=4, max_num_rounds=20):
    SCREEN_WIDTH, SCREEN_HEIGHT = 160, 210
    self.observation_space = spaces.Box(
        low=0, high=255, shape=(SCREEN_HEIGHT, SCREEN_WIDTH, 3))
    self.action_space = spaces.Discrete(3)
    pygame.init()
    self._surface = pygame.Surface((SCREEN_WIDTH, SCREEN_HEIGHT))
    self._viewer = None
    self._game = PongGame(
        has_double_players=False,
        window_size=(SCREEN_WIDTH, SCREEN_HEIGHT),
        ball_speed=ball_speed,
        bat_speed=bat_speed,
        max_num_rounds=max_num_rounds)
def is_compound(space):
    """ Checks whether a space is a compound space. These are non-scalar
    `Box` spaces, `MultiDiscrete`, `MultiBinary` and `Tuple` spaces
    (a Tuple space with a single, non-compound subspace is still considered
    compound).
    :raises TypeError: If the space is not a `gym.Space`.
    """
    assert_space(space)
    if isinstance(space, spaces.Discrete):
        return False
    elif isinstance(space, spaces.Box):
        return len(space.shape) != 1 or space.shape[0] != 1
    elif isinstance(space, (spaces.MultiDiscrete, spaces.MultiBinary)):
        return True
    elif isinstance(space, spaces.Tuple):
        return True
    raise NotImplementedError("Unknown space {} of type {} supplied".format(space, type(space)))
def get_actions(game_or_env):
    if isinstance(game_or_env, str):
        env = gym.make(game_or_env)
    else:
        env = game_or_env
    if isinstance(env.action_space, Discrete):
        num_actions = env.action_space.n
    elif isinstance(env.action_space, Box):
        num_actions = np.prod(env.action_space.shape)
    else:
        raise Exception('Unsupported Action Space \'{}\''.format(
            type(env.action_space).__name__))
    indices = range(num_actions)
    if env.spec.id in ['Pong-v0', 'Breakout-v0']:
        # Gym currently specifies 6 actions for Pong and Breakout when only 3 are needed.
        # TODO: patch the environments instead
        num_actions = 3
        indices = [1, 2, 3]
    return num_actions, env.action_space, indices
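# Usage sketch for get_actions (the environment id is an example):
num_actions, action_space, indices = get_actions('Pong-v0')
# num_actions == 3 and indices == [1, 2, 3]: only the useful actions are kept.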
def __init__(self, config=ENV_CONFIG):
    self.config = config
    if config["discrete_actions"]:
        self.action_space = Discrete(10)
    else:
        self.action_space = Box(-1.0, 1.0, shape=(3,))
    if config["use_depth_camera"]:
        self.observation_space = Box(
            -1.0, 1.0, shape=(config["x_res"], config["y_res"], 1))
    else:
        self.observation_space = Box(
            0.0, 255.0, shape=(config["x_res"], config["y_res"], 3))
    self._spec = lambda: None
    self._spec.id = "Carla-v0"
    self.server_port = None
    self.server_process = None
    self.client = None
    self.num_steps = 0
    self.prev_measurement = None
def configure(self, actions, frame_size, *, raw_array=False, max_step=-1):
    '''
    Usage:
        super().configure(actions, frame_size)
    '''
    self.frame_size = frame_size
    self.raw_array = raw_array
    self.image = Image.new('RGB', self.frame_size, 'black')
    self.draw = ImageDraw.Draw(self.image)
    self.max_step = max_step
    self.step_cnt = 0
    self.actions = actions
    self.action_space = spaces.Discrete(len(actions))
    self.observation_space = spaces.Box(0., 255., (*self.frame_size, 3))
    self.__configured = True
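# Usage sketch for configure (the action names and frame size are illustrative;
# `env` stands for an instance of the class above):
env.configure(actions=['noop', 'left', 'right'], frame_size=(84, 84))
# observation_space is then Box(0., 255., (84, 84, 3)).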
def __init__(self, dim=(14, 9)):
    self.dim = dim
    self.size = dim[0] * dim[1]
    self.max_blocks_per_turn = min(dim)
    self.target_difficulty = None
    self.target_pos = None
    # Observe the world
    self.observation_space = spaces.Tuple((
        spaces.Box(0, num_block_type, shape=dim),
        spaces.Box(np.array([0, 0]), np.array(dim)),
        spaces.Discrete(num_directions),
        spaces.Box(0, 1, shape=(1,))
    ))
    # Actions allow the world to be populated.
    self.action_space = spaces.Discrete(num_actions)
def step(self, action):
    if isinstance(self._env.action_space, Box):
        # Rescale the action from [-1, 1] to [low, high]
        lb = self._env.action_space.low
        ub = self._env.action_space.high
        scaled_action = lb + (action + 1.) * 0.5 * (ub - lb)
        scaled_action = np.clip(scaled_action, lb, ub)
    else:
        scaled_action = action
    wrapped_step = self._env.step(scaled_action)
    next_obs, reward, done, info = wrapped_step
    if self._normalize_obs:
        next_obs = self._apply_normalize_obs(next_obs)
    if self._normalize_reward:
        reward = self._apply_normalize_reward(reward)
    return next_obs, reward, done, info
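# The affine map above sends -1 -> lb, 0 -> (lb + ub) / 2, and +1 -> ub.
# A worked example with lb = 0, ub = 4:
#   action = 0.5  =>  0 + (0.5 + 1.0) * 0.5 * (4 - 0) = 3.0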
def step(self, action):
    if isinstance(self._env.action_space, Box):
        # Rescale the action from [-1, 1] to [low, high]
        lb = self._env.action_space.low
        ub = self._env.action_space.high
        scaled_action = lb + (action + 1.) * 0.5 * (ub - lb)
        scaled_action = np.clip(scaled_action, lb, ub)
    else:
        scaled_action = action
    wrapped_step = self._env.step(scaled_action)
    _, reward, done, info = wrapped_step
    next_frame_raw = self._env.render('rgb_array')
    next_frame = self._process_frame42(next_frame_raw)
    return next_frame_raw, next_frame, reward, done, info
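# _process_frame42 is not shown above; a plausible sketch in the style of
# common A3C preprocessing (crop, downscale to 42x42, grayscale, normalize),
# assuming OpenCV is available -- the details are assumptions, not the
# original code:
import cv2
import numpy as np

def process_frame42(frame):
    frame = frame[34:34 + 160, :160]          # crop the Atari playing field
    frame = cv2.resize(frame, (80, 80))       # downscale in two steps
    frame = cv2.resize(frame, (42, 42))
    frame = frame.mean(2)                     # grayscale
    frame = frame.astype(np.float32) / 255.0  # normalize to [0, 1]
    return frame.reshape(42, 42, 1)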
def __init__(self):
    self._seed()
    self.viewer = None
    self.world = Box2D.b2World((0, 0))
    self.moon = None
    self.robots = []
    self.prev_reward = None
    high = np.array([np.inf] * 8)  # useful range is -1 .. +1, but spikes can be higher
    self.observation_space = spaces.Box(-high, high)
    if self.continuous:
        # Action is two floats: [main engine, left-right engines].
        # Up-down: -1.0..-0.5 fire down engine, +0.5..+1.0 fire up engine, -0.5..0.5 off.
        # Left-right: -1.0..-0.5 fire left engine, +0.5..+1.0 fire right engine, -0.5..0.5 off.
        self.action_space = spaces.Box(-1, +1, (2,))
    else:
        # Nop, fire left engine, up engine, right engine, down engine
        self.action_space = spaces.Discrete(5)
    self.hard_reset()