def __init__(self, ball_speed=4, bat_speed=4, max_num_rounds=20):
    """Set up the double-player Pong environment.

    The observation space is a ``Tuple`` of two identical image ``Box``
    spaces and the action space is a ``Tuple`` of two ``Discrete(3)``
    spaces (presumably one entry per player -- confirm against the
    step/render code).

    :param ball_speed: forwarded unchanged to ``PongGame``.
    :param bat_speed: forwarded unchanged to ``PongGame``.
    :param max_num_rounds: forwarded unchanged to ``PongGame``.
    """
    SCREEN_WIDTH, SCREEN_HEIGHT = 160, 210
    window_size = (SCREEN_WIDTH, SCREEN_HEIGHT)
    # Observations are laid out height-first with three channels.
    frame_shape = (SCREEN_HEIGHT, SCREEN_WIDTH, 3)

    self.observation_space = spaces.Tuple(
        [spaces.Box(low=0, high=255, shape=frame_shape) for _ in range(2)])
    self.action_space = spaces.Tuple(
        [spaces.Discrete(3) for _ in range(2)])

    pygame.init()
    # Off-screen surface with the same size as the game window.
    self._surface = pygame.Surface(window_size)
    self._viewer = None
    self._game = PongGame(
        has_double_players=True,
        window_size=window_size,
        ball_speed=ball_speed,
        bat_speed=bat_speed,
        max_num_rounds=max_num_rounds)
# Example source code using the Python class Discrete()
def __init__(self, env, gym_core_id=None):
    """Wrap ``env`` and expose a ``Discrete`` action space.

    The size of the action space is the number of entries in the
    ``actions`` attribute of ``gym_core_action_space(gym_core_id)``.

    :param env: environment handed to the parent wrapper constructor.
    :param gym_core_id: id of the underlying gym core environment.  When
        omitted it is read from ``self.spec._kwargs['gym_core_id']``.
    """
    super(GymCoreAction, self).__init__(env)

    # ``self.spec`` is None while inside of ``make``, so the id must be
    # passed in explicitly there.  The fallback below is only hit when
    # instantiating by hand.
    if gym_core_id is None:
        gym_core_id = self.spec._kwargs['gym_core_id']

    core_spec = gym.spec(gym_core_id)
    core_action_space = gym_core_action_space(gym_core_id)

    self._actions = core_action_space.actions
    self.action_space = gym_spaces.Discrete(len(self._actions))

    # Only Atari entry points get a key-state translator.
    is_atari = core_spec._entry_point.startswith('gym.envs.atari:')
    self.key_state = (
        translator.AtariKeyState(gym.make(gym_core_id)) if is_atari else None)
def configureActions(self, discrete_actions):
    """Choose the action-space flavour and (re)declare the observation space.

    :param discrete_actions: truthy selects ``Discrete(3)`` (no push, left,
        right); falsy selects a continuous ``Box`` in [-1.0, 1.0] with
        shape ``(1, 1)`` (a single force component, fx).
    """
    self.discrete_actions = discrete_actions

    if not self.discrete_actions:
        # 1 continuous action element: fx
        self.action_space = spaces.Box(-1.0, 1.0, shape=(1, 1))
    else:
        # 3 discrete actions: no push, left, right
        self.action_space = spaces.Discrete(3)

    # Observations are bounded only by the float32 range.
    bound = np.finfo(np.float32).max
    self.observation_space = gym.spaces.Box(-bound, bound, self.state_shape)
def configureActions(self, discrete_actions):
    """Choose the action-space flavour and (re)declare the observation space.

    :param discrete_actions: truthy selects ``Discrete(5)`` (no push, left,
        right, up, down); falsy selects a continuous ``Box`` in
        [-1.0, 1.0] with shape ``(2,)`` (force components fx and fy).
    """
    self.discrete_actions = discrete_actions

    if not self.discrete_actions:
        # 2 continuous action elements: fx & fy
        self.action_space = spaces.Box(-1.0, 1.0, shape=(2,))
    else:
        # 5 discrete actions: no push, left, right, up & down
        self.action_space = spaces.Discrete(5)

    # Observations are bounded only by the float32 range.
    bound = np.finfo(np.float32).max
    self.observation_space = gym.spaces.Box(-bound, bound, self.state_shape)
def configureActions(self, discrete_actions):
    """Record whether the action space should be treated as discrete.

    This variant only stores the flag; it does not rebuild
    ``action_space`` or ``observation_space``.

    :param discrete_actions: True if the action space is discrete,
        False if it is continuous.
    """
    # NOTE: if this environment ever supports switching action types,
    # this is the place to rebuild the spaces, e.g. ``spaces.Discrete(3)``
    # versus ``spaces.Box(-1.0, 1.0, shape=(1, 1))``, together with a
    # float32-bounded observation ``Box`` of shape ``self.state_shape``.
    self.discrete_actions = discrete_actions
def __init__(self, maxUmbralAstral):
    """Build the ability table, initial state and gym spaces.

    :param maxUmbralAstral: outer bound for Astral Fire and Umbral Ice;
        stored on the class as ``BLM.MAXUMBRALASTRAL`` (so it is shared by
        every instance).
    """
    # Print debug
    self.debug = False
    # Outer bound for Astral Fire and Umbral Ice
    BLM.MAXUMBRALASTRAL = maxUmbralAstral
    # Available buffs
    self.BUFFS = []
    # Maximum time available
    self.MAXTIME = 45

    helper = BLM.Helper()
    self.HELPER = helper

    # Available abilities
    self.ABILITIES = [
        BLM.Ability("Blizzard 1", 180, 6, 2.5, 2.49, helper.UmbralIceIncrease, BLM.DamageType.Ice, helper),  # 480
        BLM.Ability("Fire 1", 180, 15, 2.5, 2.49, helper.AstralFireIncrease, BLM.DamageType.Fire, helper),  # 1200
        BLM.Ability("Transpose", 0, 0, 0.75, 12.9, helper.SwapAstralUmbral, BLM.DamageType.Neither, helper),
        BLM.Ability("Fire 3", 240, 30, 3.5, 2.5, helper.AstralFireMax, BLM.DamageType.Fire, helper),  # 2400
        BLM.Ability("Blizzard 3", 240, 18, 3.5, 2.5, helper.UmbralIceMax, BLM.DamageType.Ice, helper),  # 2400
        BLM.Ability("Fire 4", 260, 15, 2.8, 2.5, None, BLM.DamageType.Fire, helper),  # 2400
    ]

    # One slot per ability cooldown and per buff timer.
    num_timers = len(self.ABILITIES) + len(self.BUFFS)

    # State including ability cooldowns, buff time remaining, mana, and
    # Astral/Umbral
    self.initialState = np.array([0] * num_timers + [BLM.MAXMANA] + [0])
    self.state = self._reset()

    # What the learner can pick between
    self.action_space = spaces.Discrete(len(self.ABILITIES))

    # What the learner can see to make a choice (cooldowns and buffs),
    # followed by one entry for mana and one for Astral/Umbral.
    timer_ranges = [[0, 180]] * num_timers
    self.observation_space = spaces.MultiDiscrete(
        timer_ranges + [[0, BLM.MAXMANA]] + [[-3, 3]])
def __init__(self, env, monitor_path, video=True, **usercfg):
    """Configure the cross-entropy-method agent for ``env``.

    :param env: gym environment; its action space must be ``Discrete`` or
        ``Box``.
    :param monitor_path: directory handed to ``wrappers.Monitor``.
    :param video: when falsy, ``video_callable=False`` is passed to the
        monitor; otherwise ``None`` is passed.
    :param usercfg: overrides applied on top of the default config.
    :raises NotImplementedError: for any other action-space type.
    """
    super(CEM, self).__init__(**usercfg)

    video_callable = None if video else False
    self.env = wrappers.Monitor(
        env, monitor_path, force=True, video_callable=video_callable)

    defaults = dict(
        # maximum length of episode
        num_steps=env.spec.tags.get("wrapper_config.TimeLimit.max_episode_steps"),
        n_iter=100,  # number of iterations of CEM
        batch_size=25,  # number of samples per batch
        elite_frac=0.2,  # fraction of samples used as elite set
    )
    self.config.update(defaults)
    # User-supplied values take precedence over the defaults above.
    self.config.update(usercfg)

    action_space = env.action_space
    if isinstance(action_space, Discrete):
        self.dim_theta = (env.observation_space.shape[0] + 1) * action_space.n
    elif isinstance(action_space, Box):
        self.dim_theta = (env.observation_space.shape[0] + 1) * action_space.shape[0]
    else:
        raise NotImplementedError

    # Initialize mean and standard deviation
    self.theta_mean = np.zeros(self.dim_theta)
    self.theta_std = np.ones(self.dim_theta)
def __init__(self):
    """Create the Box2D world and declare the observation/action spaces.

    Observations are an unbounded 8-element ``Box``.  The action space
    depends on ``self.continuous``: a 2-element ``Box`` in [-1, +1] when
    truthy, ``Discrete(4)`` otherwise.
    """
    self._seed()

    # Nothing has been built yet; start from empty placeholders.
    self.viewer = None
    self.moon = None
    self.lander = None
    self.prev_reward = None
    self.particles = []
    self.world = Box2D.b2World()

    # Useful range is -1 .. +1, but spikes can be higher.
    obs_limit = np.array([np.inf] * 8)
    self.observation_space = spaces.Box(-obs_limit, obs_limit)

    if not self.continuous:
        # Nop, fire left engine, main engine, right engine
        self.action_space = spaces.Discrete(4)
    else:
        # Action is two floats [main engine, left-right engines].
        # Main engine: -1..0 off, 0..+1 throttle from 50% to 100% power.
        # Engine can't work with less than 50% power.
        # Left-right: -1.0..-0.5 fire left engine, +0.5..+1.0 fire right
        # engine, -0.5..0.5 off
        self.action_space = spaces.Box(-1, +1, (2,))

    self._reset()
def __init__(self):
    """Define the limits and gym spaces, then seed and reset.

    The observation ``Box`` spans ``[min_position, -max_speed]`` to
    ``[max_position, max_speed]``; the action space is ``Discrete(3)``.
    """
    self.viewer = None

    # Position limits, speed limit and the goal position.
    self.min_position, self.max_position = -1.2, 0.6
    self.max_speed = 0.07
    self.goal_position = 0.5

    # Per-component lower/upper observation bounds.
    self.low = np.array([self.min_position, -self.max_speed])
    self.high = np.array([self.max_position, self.max_speed])

    self.observation_space = spaces.Box(self.low, self.high)
    self.action_space = spaces.Discrete(3)

    self._seed()
    self.reset()
def __init__(self, *args, **kwargs):
    """Prepare sample spaces and matching gym / rf outputs for the tests.

    For each space kind (suffix ``d``, ``c``, ``b``, ``t``) three
    attributes are stored: ``space_*``, ``gym_out_*`` and ``rf_out_*``.
    """
    super(TestConverters, self).__init__(*args, **kwargs)

    # Discrete(4): gym value 2 paired with [0, 0, 1, 0].
    self.space_d = spaces.Discrete(4)
    self.gym_out_d = 2
    self.rf_out_d = [0, 0, 1, 0]

    # Box of shape [2, 4]: both outputs are the very same random array.
    self.space_c = spaces.Box(-1, 1, [2, 4])
    sample_c = np.random.uniform(low=-1, high=1, size=(2, 4))
    self.gym_out_c = sample_c
    self.rf_out_c = self.gym_out_c

    # MultiBinary(4): each bit is paired with a two-element row.
    self.space_b = spaces.MultiBinary(4)
    self.gym_out_b = [0, 1, 0, 1]
    self.rf_out_b = [[1, 0], [0, 1], [1, 0], [0, 1]]

    # Tuple of all of the above plus a nested (discrete, box) Tuple.
    nested_space = spaces.Tuple((self.space_d, self.space_c))
    self.space_t = spaces.Tuple(
        (self.space_d, self.space_c, self.space_b, nested_space))
    self.gym_out_t = (
        self.gym_out_d,
        self.gym_out_c,
        self.gym_out_b,
        (self.gym_out_d, self.gym_out_c),
    )
    self.rf_out_t = (
        self.rf_out_d,
        self.rf_out_c,
        self.rf_out_b,
        (self.rf_out_d, self.rf_out_c),
    )
def __init__(self):
    """Create the Box2D world and declare the observation/action spaces.

    Observations are an unbounded 8-element ``Box``.  The action space
    depends on ``self.continuous``: a 2-element ``Box`` in [-1, +1] when
    truthy, ``Discrete(4)`` otherwise.
    """
    self._seed()

    # Nothing has been built yet; start from empty placeholders.
    self.viewer = None
    self.moon = None
    self.lander = None
    self.prev_reward = None
    self.particles = []
    self.world = Box2D.b2World()

    # Useful range is -1 .. +1, but spikes can be higher.
    obs_limit = np.array([np.inf] * 8)
    self.observation_space = spaces.Box(-obs_limit, obs_limit)

    if not self.continuous:
        # Nop, fire left engine, main engine, right engine
        self.action_space = spaces.Discrete(4)
    else:
        # Action is two floats [main engine, left-right engines].
        # Main engine: -1..0 off, 0..+1 throttle from 50% to 100% power.
        # Engine can't work with less than 50% power.
        # Left-right: -1.0..-0.5 fire left engine, +0.5..+1.0 fire right
        # engine, -0.5..0.5 off
        self.action_space = spaces.Box(-1, +1, (2,))

    self._reset()
def __init__(self, ball_speed=4, bat_speed=4, max_num_rounds=20):
    """Set up the single-player Pong environment.

    The observation space is one image ``Box`` (values 0..255) and the
    action space is ``Discrete(3)``.

    :param ball_speed: forwarded unchanged to ``PongGame``.
    :param bat_speed: forwarded unchanged to ``PongGame``.
    :param max_num_rounds: forwarded unchanged to ``PongGame``.
    """
    SCREEN_WIDTH, SCREEN_HEIGHT = 160, 210
    window_size = (SCREEN_WIDTH, SCREEN_HEIGHT)
    # Observations are laid out height-first with three channels.
    frame_shape = (SCREEN_HEIGHT, SCREEN_WIDTH, 3)

    self.observation_space = spaces.Box(low=0, high=255, shape=frame_shape)
    self.action_space = spaces.Discrete(3)

    pygame.init()
    # Off-screen surface with the same size as the game window.
    self._surface = pygame.Surface(window_size)
    self._viewer = None
    self._game = PongGame(
        has_double_players=False,
        window_size=window_size,
        ball_speed=ball_speed,
        bat_speed=bat_speed,
        max_num_rounds=max_num_rounds)
def __init__(self, action_space, observation_space, q_init=0.0,
             learning_rate=0.1, discount=1.0, epsilon=0.05):
    """Create a tabular agent over discrete actions and observations.

    :param action_space: must be a ``spaces.Discrete``.
    :param observation_space: must be a ``spaces.Discrete``; it is only
        validated here, not stored.
    :param q_init: value every entry of a new Q row starts with.
    :param learning_rate: stored as ``self._learning_rate``.
    :param discount: stored as ``self._discount``.
    :param epsilon: stored as ``self._epsilon``.
    :raises TypeError: if either space is not ``Discrete``.
    """
    action_ok = isinstance(action_space, spaces.Discrete)
    if not action_ok:
        raise TypeError("Action space type should be Discrete.")
    observation_ok = isinstance(observation_space, spaces.Discrete)
    if not observation_ok:
        raise TypeError("Observation space type should be Discrete.")

    def _new_q_row():
        # One value per action, all starting at ``q_init``.
        return q_init * np.ones(action_space.n)

    # Rows are created lazily the first time a key is looked up.
    self._q = defaultdict(_new_q_row)

    self._action_space = action_space
    self._learning_rate = learning_rate
    self._discount = discount
    self._epsilon = epsilon
def __init__(self, action_space, observation_space, batch_size=128,
             learning_rate=1e-3, discount=1.0, epsilon=0.05):
    """Create an agent backed by a ``ConvNet`` Q-network.

    :param action_space: must be a ``spaces.Discrete``; its ``n`` is the
        network's output count.
    :param observation_space: its ``shape[0]`` is used as the number of
        input channels.
    :param batch_size: stored as ``self._batch_size``.
    :param learning_rate: learning rate given to the RMSprop optimizer.
    :param discount: stored as ``self._discount``.
    :param epsilon: stored as ``self._epsilon``.
    :raises TypeError: if ``action_space`` is not ``Discrete``.
    """
    if not isinstance(action_space, spaces.Discrete):
        raise TypeError("Action space type should be Discrete.")

    self._action_space = action_space
    self._batch_size = batch_size
    self._discount = discount
    self._epsilon = epsilon

    in_channels = observation_space.shape[0]
    num_actions = action_space.n
    self._q_network = ConvNet(
        num_channel_input=in_channels, num_output=num_actions)

    trainable = self._q_network.parameters()
    self._optimizer = optim.RMSprop(trainable, lr=learning_rate)

    # Replay memory constructed with an argument of 100000.
    self._memory = ReplayMemory(100000)
def __init__(self, action_space, observation_space, batch_size=128,
             learning_rate=1e-3, discount=1.0, epsilon=0.05):
    """Create an agent backed by an ``FCNet`` Q-network.

    :param action_space: must be a ``spaces.Discrete``; its ``n`` is the
        network's output size.
    :param observation_space: the product of its ``shape`` entries is the
        network's input size.
    :param batch_size: stored as ``self._batch_size``.
    :param learning_rate: learning rate given to the RMSprop optimizer.
    :param discount: stored as ``self._discount``.
    :param epsilon: stored as ``self._epsilon``.
    :raises TypeError: if ``action_space`` is not ``Discrete``.
    """
    if not isinstance(action_space, spaces.Discrete):
        raise TypeError("Action space type should be Discrete.")

    self._action_space = action_space
    self._batch_size = batch_size
    self._discount = discount
    self._epsilon = epsilon

    # Flattened observation size = product of all dimensions.
    flat_size = reduce(lambda acc, dim: acc * dim, observation_space.shape)
    num_actions = action_space.n
    self._q_network = FCNet(input_size=flat_size, output_size=num_actions)

    trainable = self._q_network.parameters()
    self._optimizer = optim.RMSprop(trainable, lr=learning_rate)

    # Replay memory constructed with an argument of 100000.
    self._memory = ReplayMemory(100000)
def is_compound(space):
    """ Tell whether ``space`` is a compound space.

    Compound spaces are `Box` spaces whose shape is anything other than
    ``(1,)``, as well as every `MultiDiscrete`, `MultiBinary` and `Tuple`
    space (a `Tuple` wrapping a single non-compound subspace still counts
    as compound). A `Discrete` space is never compound.

    :raises TypeError: If the space is no `gym.Space` (presumably raised
                       by ``assert_space`` -- confirm there).
    :raises NotImplementedError: If the space type is not one of the above.
    """
    assert_space(space)

    if isinstance(space, spaces.Discrete):
        return False

    if isinstance(space, spaces.Box):
        box_shape = space.shape
        is_scalar_box = len(box_shape) == 1 and box_shape[0] == 1
        return not is_scalar_box

    always_compound = (spaces.MultiDiscrete, spaces.MultiBinary, spaces.Tuple)
    if isinstance(space, always_compound):
        return True

    raise NotImplementedError("Unknown space {} of type {} supplied".format(space, type(space)))
def num_discrete_actions(space):
    """
    Report, as a tuple, how many actions each component of a discrete
    space offers.

    :param gym.Space space: The discrete space to inspect.
    :return tuple: ``(n,)`` for `Discrete`, ``high - low + 1`` per
                   component for `MultiDiscrete`, and ``n`` twos for
                   `MultiBinary`.
    :raises TypeError: If ``is_discrete(space)`` is false (and presumably
                       from ``assert_space`` if the space is no
                       `gym.Space` -- confirm there).
    """
    assert_space(space)

    if not is_discrete(space):
        raise TypeError("Space {} is not discrete".format(space))

    if isinstance(space, spaces.Discrete):
        return (space.n,)

    if isinstance(space, spaces.MultiDiscrete):
        # space.high is an inclusive bound, hence the +1
        counts = space.high - space.low + 1
        return tuple(counts)

    if isinstance(space, spaces.MultiBinary):
        return (2,) * space.n

    raise NotImplementedError("Unknown space {} of type {} supplied".format(space, type(space)))  # pragma: no cover
def get_actions(game_or_env):
    """Return ``(num_actions, action_space, indices)`` for an environment.

    :param game_or_env: either an environment id string (passed to
        ``gym.make``) or an already-built environment.
    :return: the number of actions, the environment's action space and
        the iterable of action indices to use.
    :raises Exception: if the action space is neither ``Discrete`` nor
        ``Box``.
    """
    env = gym.make(game_or_env) if isinstance(game_or_env, str) else game_or_env

    action_space = env.action_space
    if isinstance(action_space, Discrete):
        num_actions = action_space.n
    elif isinstance(action_space, Box):
        # Every element of the box counts as one action.
        num_actions = np.prod(action_space.shape)
    else:
        raise Exception('Unsupported Action Space \'{}\''.format(
            type(env.action_space).__name__))

    indices = range(num_actions)

    reduced_action_games = ['Pong-v0', 'Breakout-v0']
    if env.spec.id in reduced_action_games:
        # Gym currently specifies 6 actions for pong and breakout when
        # only 3 are needed
        # TODO: patch the environments instead
        num_actions, indices = 3, [1, 2, 3]

    return num_actions, env.action_space, indices
def __init__(self, env, actrep=4, memlen=4, w=84, h=84, random_start=30):
    """Wrap a Gym environment that has a discrete action space.

    :param env: environment to wrap; ``env.action_space`` must be a
        ``spaces.Discrete``.
    :param actrep: stored as ``self.actrep`` (presumably the action-repeat
        count -- confirm against the step logic).
    :param memlen: stored as ``self.memlen`` (presumably the number of
        stacked frames -- confirm).
    :param w: stored as ``self.W``.
    :param h: stored as ``self.H``.
    :param random_start: stored as ``self.random_start``.
    :raises ValueError: if the environment's action space is not
        ``spaces.Discrete``.
    """
    print('Creating wrapper around Gym Environment')
    self.env = env
    self.memlen = memlen
    self.W = w
    self.H = h
    self.actrep = actrep
    self.random_start = random_start

    if not isinstance(self.env.action_space, spaces.Discrete):
        # "Expected" is the required type and "Got" is what the
        # environment actually exposes (the original had these swapped).
        raise ValueError("Unsupported environment's (%s) action space. Expected: %s, Got: %s." %
                         (self.env.spec.id, spaces.Discrete, self.env.action_space))

    # Actions are exposed as the plain list 0 .. n-1.
    self.action_space = list(range(self.env.action_space.n))
    self.action_size = len(self.action_space)
    self.stacked_s = None

    # Apply a custom action set when one is registered for this env id.
    for key in __custom_actions__:
        if key == self.env.spec.id:
            self.set_custom_actions(__custom_actions__[key])
            break

    print('Environment: %s. Action space: %s' % (self.env.spec.id, self.action_space))
def __init__(self, config=ENV_CONFIG):
    """Declare the spaces from ``config`` and clear the connection state.

    :param config: mapping read for the keys ``"discrete_actions"``,
        ``"use_depth_camera"``, ``"x_res"`` and ``"y_res"``.  The object
        is stored as-is (not copied) on ``self.config``.
    """
    self.config = config

    self.action_space = (
        Discrete(10) if config["discrete_actions"]
        else Box(-1.0, 1.0, shape=(3,)))

    if not config["use_depth_camera"]:
        # Three-channel image with values in 0..255.
        self.observation_space = Box(
            0.0, 255.0, shape=(config["x_res"], config["y_res"], 3))
    else:
        # Single-channel image with values in -1..1.
        self.observation_space = Box(
            -1.0, 1.0, shape=(config["x_res"], config["y_res"], 1))

    # A bare function object serves as an attribute holder for ``id``.
    self._spec = lambda: None
    self._spec.id = "Carla-v0"

    # Nothing is running or connected yet.
    self.server_port = None
    self.server_process = None
    self.client = None
    self.prev_measurement = None
    self.num_steps = 0
def configure(self, actions, frame_size, *, raw_array=False, max_step=-1):
    '''
    Store the action list and frame size, create the drawing canvas and
    declare the gym spaces.

    Usage (from a subclass):
        super().configure(actions, frame_size)

    actions: sequence of actions; its length sizes the Discrete space.
    frame_size: size passed to ``Image.new`` for the black RGB canvas.
    raw_array: stored as ``self.raw_array``.
    max_step: stored as ``self.max_step``.
    '''
    self.frame_size = frame_size
    self.raw_array = raw_array
    self.max_step = max_step
    self.actions = actions
    self.step_cnt = 0

    # Black RGB canvas plus a drawing handle bound to it.
    canvas = Image.new('RGB', self.frame_size, 'black')
    self.image = canvas
    self.draw = ImageDraw.Draw(self.image)

    num_actions = len(actions)
    self.action_space = spaces.Discrete(num_actions)
    # Observation shape is the frame size followed by 3 channels.
    obs_shape = (*self.frame_size, 3)
    self.observation_space = spaces.Box(0., 255., obs_shape)

    self.__configured = True
def __init__(self):
    """Create the Box2D world and declare the observation/action spaces.

    Observations are an unbounded 8-element ``Box``.  The action space
    depends on ``self.continuous``: a 2-element ``Box`` in [-1, +1] when
    truthy, ``Discrete(4)`` otherwise.
    """
    self._seed()

    # Nothing has been built yet; start from empty placeholders.
    self.viewer = None
    self.moon = None
    self.lander = None
    self.prev_reward = None
    self.particles = []
    self.world = Box2D.b2World()

    # Useful range is -1 .. +1, but spikes can be higher.
    obs_limit = np.array([np.inf] * 8)
    self.observation_space = spaces.Box(-obs_limit, obs_limit)

    if not self.continuous:
        # Nop, fire left engine, main engine, right engine
        self.action_space = spaces.Discrete(4)
    else:
        # Action is two floats [main engine, left-right engines].
        # Main engine: -1..0 off, 0..+1 throttle from 50% to 100% power.
        # Engine can't work with less than 50% power.
        # Left-right: -1.0..-0.5 fire left engine, +0.5..+1.0 fire right
        # engine, -0.5..0.5 off
        self.action_space = spaces.Box(-1, +1, (2,))

    self._reset()
def __init__(self):
    """Define the limits and gym spaces, then seed and reset.

    The observation ``Box`` spans ``[min_position, -max_speed]`` to
    ``[max_position, max_speed]``; the action space is ``Discrete(3)``.
    """
    self.viewer = None

    # Position limits, speed limit and the goal position.
    self.min_position, self.max_position = -1.2, 0.6
    self.max_speed = 0.07
    self.goal_position = 0.5

    # Per-component lower/upper observation bounds.
    self.low = np.array([self.min_position, -self.max_speed])
    self.high = np.array([self.max_position, self.max_speed])

    self.observation_space = spaces.Box(self.low, self.high)
    self.action_space = spaces.Discrete(3)

    self._seed()
    self.reset()
def __init__(self, dim=(14, 9)):
    """Declare the world dimensions and the gym spaces.

    :param dim: two-element world size; also used as the shape of the
        block-type grid and as the upper bound of the position box.
    """
    self.dim = dim
    self.size = dim[0] * dim[1]
    self.max_blocks_per_turn = min(dim)
    self.target_difficulty = None
    self.target_pos = None

    # Observe the world
    self.observation_space = spaces.Tuple((
        # Grid with values in 0..num_block_type.
        spaces.Box(0, num_block_type, shape=dim),
        # Two-element box bounded by [0, 0] and ``dim``.
        spaces.Box(np.array([0, 0]), np.array(dim)),
        spaces.Discrete(num_directions),
        # ``shape`` must be a tuple: ``(1)`` is just the int 1, which
        # Box implementations that call ``tuple(shape)`` reject.
        spaces.Box(0, 1, shape=(1,))
    ))

    # Actions allow the world to be populated.
    self.action_space = spaces.Discrete(num_actions)
def __init__(self):
    """Create the Box2D world and declare the observation/action spaces.

    The world is built with ``(0, 0)`` as its constructor argument.
    Observations are an unbounded 8-element ``Box``.  The action space
    depends on ``self.continuous``: a 2-element ``Box`` in [-1, +1] when
    truthy, ``Discrete(5)`` otherwise.
    """
    self._seed()

    # Nothing has been built yet; start from empty placeholders.
    self.viewer = None
    self.moon = None
    self.prev_reward = None
    self.robots = []
    self.world = Box2D.b2World((0, 0))

    # Useful range is -1 .. +1, but spikes can be higher.
    obs_limit = np.array([np.inf] * 8)
    self.observation_space = spaces.Box(-obs_limit, obs_limit)

    if not self.continuous:
        # Nop, fire left engine, up engine, right engine, down engine
        self.action_space = spaces.Discrete(5)
    else:
        # Action is two floats [up-down engines, left-right engines].
        # Up-Down: -1.0..-0.5 fire down engine, +0.5..+1.0 fire up
        # engine, -0.5..0.5 off
        # Left-right: -1.0..-0.5 fire left engine, +0.5..+1.0 fire right
        # engine, -0.5..0.5 off
        self.action_space = spaces.Box(-1, +1, (2,))

    self.hard_reset()
def __init__(self):
    """Define the limits and gym spaces, then seed and reset.

    The observation ``Box`` spans ``[min_position, -max_speed]`` to
    ``[max_position, max_speed]``; the action space is ``Discrete(3)``.
    """
    self.viewer = None

    # Position limits, speed limit and the goal position.
    self.min_position, self.max_position = -1.2, 0.6
    self.max_speed = 0.07
    self.goal_position = 0.5

    # Per-component lower/upper observation bounds.
    self.low = np.array([self.min_position, -self.max_speed])
    self.high = np.array([self.max_position, self.max_speed])

    self.observation_space = spaces.Box(self.low, self.high)
    self.action_space = spaces.Discrete(3)

    self._seed()
    self.reset()
def __init__(self):
    """Define limits plus protagonist/adversary spaces, then seed and reset.

    Two action spaces are declared: ``pro_action_space`` (``Discrete(3)``)
    and ``adv_action_space`` (a one-element ``Box`` in
    [-0.0025, 0.0025]).  The observation ``Box`` spans
    ``[min_position, -max_speed]`` to ``[max_position, max_speed]``.
    """
    self.viewer = None

    # Position limits, speed limit and the goal position.
    self.min_position, self.max_position = -1.2, 0.6
    self.max_speed = 0.07
    self.goal_position = 0.5
    self.init_red = 0.0025

    # Per-component lower/upper observation bounds.
    self.low = np.array([self.min_position, -self.max_speed])
    self.high = np.array([self.max_position, self.max_speed])

    self.pro_action_space = spaces.Discrete(3)

    # Adversarial space is continuous on gravity here
    gravity_delta_limit = np.array([0.0025])
    self.adv_action_space = spaces.Box(-gravity_delta_limit, gravity_delta_limit)

    self.observation_space = spaces.Box(self.low, self.high)

    self._seed()
    self.reset()
def __init__(self):
    """Create the Box2D world and declare the observation/action spaces.

    Observations are an unbounded 8-element ``Box``.  The action space
    depends on ``self.continuous``: a 2-element ``Box`` in [-1, +1] when
    truthy, ``Discrete(4)`` otherwise.
    """
    self._seed()

    # Nothing has been built yet; start from empty placeholders.
    self.viewer = None
    self.moon = None
    self.lander = None
    self.prev_reward = None
    self.particles = []
    self.world = Box2D.b2World()

    # Useful range is -1 .. +1, but spikes can be higher.
    obs_limit = np.array([np.inf] * 8)
    self.observation_space = spaces.Box(-obs_limit, obs_limit)

    if not self.continuous:
        # Nop, fire left engine, main engine, right engine
        self.action_space = spaces.Discrete(4)
    else:
        # Action is two floats [main engine, left-right engines].
        # Main engine: -1..0 off, 0..+1 throttle from 50% to 100% power.
        # Engine can't work with less than 50% power.
        # Left-right: -1.0..-0.5 fire left engine, +0.5..+1.0 fire right
        # engine, -0.5..0.5 off
        self.action_space = spaces.Box(-1, +1, (2,))

    self._reset()
def __init__(self, room_length=3, num_rooms_per_side=2):
    """Build a square grid of rooms and declare the gym spaces.

    :param room_length: side length of one room; must be odd and at
        least 3.
    :param num_rooms_per_side: number of rooms along each side of the
        grid; must be at least 1.
    :raises AssertionError: if any of the constraints above is violated.
    """
    assert room_length % 2 == 1, "room_length must be odd"
    # The check accepts 3 itself, so the message says "at least".
    assert room_length >= 3, "room_length must be at least 3"
    assert num_rooms_per_side >= 1, "must have at least 1 room"
    self.room_length = room_length
    self.num_rooms_per_side = num_rooms_per_side

    # 0 = up, 1 = right, 2 = down, 3 = left
    self.action_space = spaces.Discrete(4)

    # Largest valid coordinate along either axis.
    self.max_pos = room_length * num_rooms_per_side - 1
    obs_space = (self.max_pos + 1, self.max_pos + 1, 1)
    self.observation_space = spaces.Box(low=0, high=1, shape=obs_space)

    self.goal_reward = 1
    # The goal sits at the maximum coordinate on both axes.
    self.goal_state = [self.max_pos, self.max_pos]
    self._obs = np.zeros(obs_space)
    self._reset()
def is_discrete(env):
    """Return True when ``env.action_space`` is a ``Discrete`` instance."""
    action_space = env.action_space
    return isinstance(action_space, Discrete)