def __init__(self):
super(OffSwitchCartpoleProbEnv, self).__init__()
self.observation_space = spaces.Tuple((spaces.Discrete(2), self.observation_space))
self.threshold_crossed = False
# number of episodes in which the cart first crossed the left or right threshold.
self.num_crosses = [0.,0.]
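# Illustrative sketch, not from the original file: how a Tuple observation
# like the one above decomposes. The Box below is a stand-in for the cartpole
# observation space; assumes the classic `gym` spaces API used throughout.
from gym import spaces
base_obs = spaces.Box(-4.8, 4.8, [4]) # stand-in for CartPole's observation Box
wrapped = spaces.Tuple((spaces.Discrete(2), base_obs))
switch_state, cart_obs = wrapped.sample() # unpacks as (Discrete sample, Box sample)
assert wrapped.contains((switch_state, cart_obs))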
# Source: offswitch_cartpole_prob.py (project: AI-Fight-the-Landlord, author: YoungGer)
def __init__(self):
super(PredictObsCartpoleEnv, self).__init__()
self.cartpole = CartPoleEnv()
self.observation_space = self.cartpole.observation_space
self.action_space = spaces.Tuple((self.cartpole.action_space,) + (self.cartpole.observation_space,) * (NUM_PREDICTED_OBSERVATIONS))
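# Sketch (names illustrative, not from the source): the Tuple above is built
# by concatenating one real action with NUM_PREDICTED_OBSERVATIONS copies of
# the observation space. A minimal standalone version:
from gym import spaces
num_predicted = 3 # hypothetical stand-in for NUM_PREDICTED_OBSERVATIONS
act = spaces.Discrete(2)
obs = spaces.Box(-4.8, 4.8, [4])
combined = spaces.Tuple((act,) + (obs,) * num_predicted)
assert len(combined.spaces) == 1 + num_predicted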
def __init__(self, natural=False):
self.action_space = spaces.Discrete(2)
self.observation_space = spaces.Tuple((
spaces.Discrete(32),
spaces.Discrete(11),
spaces.Discrete(2)))
self._seed()
# Flag to payout 1.5 on a "natural" blackjack win, like casino rules
# Ref: http://www.bicyclecards.com/how-to-play/blackjack/
self.natural = natural
# Start the first game
self._reset()
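# Sketch: a sample from the blackjack Tuple is (player_sum, dealer_showing,
# usable_ace), three plain integers, which makes it usable directly as a
# tabular Q-learning key. Illustrative only:
from gym import spaces
bj_obs = spaces.Tuple((spaces.Discrete(32), spaces.Discrete(11), spaces.Discrete(2)))
player_sum, dealer_showing, usable_ace = bj_obs.sample()
q_table = {(int(player_sum), int(dealer_showing), int(usable_ace)): [0.0, 0.0]} # Q-values for stick/hit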
def __init__(self, initialWealth=25.0, edge=0.6, maxWealth=250.0, maxRounds=300):
self.action_space = spaces.Discrete(int(maxWealth*100)) # betting in penny increments
self.observation_space = spaces.Tuple((
spaces.Box(0, maxWealth, [1]), # current wealth
spaces.Discrete(maxRounds+1))) # rounds remaining
self.reward_range = (0, maxWealth)
self.edge = edge
self.wealth = initialWealth
self.initialWealth = initialWealth
self.maxRounds = maxRounds
self.maxWealth = maxWealth
self._seed()
self._reset()
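# Sketch: this observation mixes a continuous Box (wealth) with a Discrete
# (rounds), so a valid observation is a (np.array([w]), n) pair. Illustrative:
import numpy as np
from gym import spaces
obs_space = spaces.Tuple((spaces.Box(0, 250.0, [1]), spaces.Discrete(301)))
assert obs_space.contains((np.array([25.0]), 0))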
def __init__(self, initialWealth=25.0, edgePriorAlpha=7, edgePriorBeta=3, maxWealthAlpha=5.0, maxWealthM=200.0, maxRoundsMean=300.0, maxRoundsSD=25.0, reseed=True):
# Store the hyperparameters so resets can pass them back into __init__(),
# letting the same hyperparameters govern the next game's parameters, as the user expects.
# TODO: this is boilerplate; is there a more elegant way to do this?
self.initialWealth=float(initialWealth)
self.edgePriorAlpha=edgePriorAlpha
self.edgePriorBeta=edgePriorBeta
self.maxWealthAlpha=maxWealthAlpha
self.maxWealthM=maxWealthM
self.maxRoundsMean=maxRoundsMean
self.maxRoundsSD=maxRoundsSD
# draw this game's set of parameters:
edge = prng.np_random.beta(edgePriorAlpha, edgePriorBeta)
maxWealth = round(genpareto.rvs(maxWealthAlpha, maxWealthM, random_state=prng.np_random))
maxRounds = int(round(prng.np_random.normal(maxRoundsMean, maxRoundsSD)))
# Track an additional instance variable: the sufficient statistic for the Pareto
# distribution on the wealth cap. alpha doesn't update, but x_m does, and is
# simply the highest wealth we've seen to date:
self.maxEverWealth = float(self.initialWealth)
# for the coinflip edge, it is total wins/losses:
self.wins = 0
self.losses = 0
# for the number of rounds, we need to remember how many rounds we've played:
self.roundsElapsed = 0
# the rest proceeds as before:
self.action_space = spaces.Discrete(int(maxWealth*100))
self.observation_space = spaces.Tuple((
spaces.Box(0, maxWealth, shape=[1]), # current wealth
spaces.Discrete(maxRounds+1), # rounds elapsed
spaces.Discrete(maxRounds+1), # wins
spaces.Discrete(maxRounds+1), # losses
spaces.Box(0, maxWealth, [1]))) # maximum observed wealth
self.reward_range = (0, maxWealth)
self.edge = edge
self.wealth = self.initialWealth
self.maxRounds = maxRounds
self.rounds = self.maxRounds
self.maxWealth = maxWealth
if reseed or not hasattr(self, 'np_random'): self._seed()
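# Sketch of the prior draws above, runnable standalone (uses a local
# RandomState instead of gym's prng; the numeric values are the defaults from
# the signature):
import numpy as np
from scipy.stats import genpareto
rng = np.random.RandomState(0)
edge = rng.beta(7, 3) # coinflip win-probability prior
max_wealth = round(genpareto.rvs(5.0, 200.0, random_state=rng)) # shape=5.0, loc=200.0
max_rounds = int(round(rng.normal(300.0, 25.0))) # game-length prior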
def __init__(self, natural=False):
self.action_space = spaces.Discrete(2)
self.observation_space = spaces.Tuple((
spaces.Discrete(32),
spaces.Discrete(11),
spaces.Discrete(2)))
self._seed()
# Flag to payout 1.5 on a "natural" blackjack win, like casino rules
# Ref: http://www.bicyclecards.com/how-to-play/blackjack/
self.natural = natural
# Start the first game
self._reset()
self.nA = 2 # number of actions
def __init__(self):
super(OffSwitchCartpoleEnv, self).__init__()
self.observation_space = spaces.Tuple((spaces.Discrete(2), self.observation_space))
self.left_threshold_crossed = False
# number of episodes in which the cart first crossed the left or right threshold.
self.num_crosses = [0.,0.]
def __init__(self):
super(PredictActionsCartpoleEnv, self).__init__()
self.cartpole = CartPoleEnv()
self.observation_space = self.cartpole.observation_space
self.action_space = spaces.Tuple((self.cartpole.action_space,) * (NUM_PREDICTED_ACTIONS+1))
def __init__(self, base=10, chars=False, starting_min_length=2):
"""
base: Number of distinct characters.
chars: If True, use uppercase alphabet. Otherwise, digits. Only affects
rendering.
starting_min_length: Minimum input string length. Ramps up as episodes
are consistently solved.
"""
self.base = base
# Keep track of this many past episodes
self.last = 10
# Cumulative reward earned this episode
self.episode_total_reward = None
# Running tally of reward shortfalls. e.g. if there were 10 points to earn and
# we got 8, we'd append -2
AlgorithmicEnv.reward_shortfalls = []
if chars:
self.charmap = [chr(ord('A')+i) for i in range(base)]
else:
self.charmap = [str(i) for i in range(base)]
self.charmap.append(' ')
# TODO: Not clear why this is a class variable rather than instance.
# Could lead to some spooky action at a distance if someone is working
# with multiple algorithmic envs at once. Also makes testing tricky.
AlgorithmicEnv.min_length = starting_min_length
# Three sub-actions:
# 1. Move read head left or right (or up/down)
# 2. Write or not
# 3. Which character to write. (Ignored if should_write=0)
self.action_space = Tuple(
[Discrete(len(self.MOVEMENTS)), Discrete(2), Discrete(self.base)]
)
# Can see just what is on the input tape (one of n characters, or nothing)
self.observation_space = Discrete(self.base + 1)
self._seed()
self.reset()
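# Sketch: a sample from the three-part action Tuple above unpacks into the
# three sub-actions. Sizes are illustrative (a 4-direction MOVEMENTS set and
# base 10):
from gym.spaces import Tuple, Discrete
action_space = Tuple([Discrete(4), Discrete(2), Discrete(10)])
move, should_write, char_to_write = action_space.sample()
assert 0 <= char_to_write < 10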
def __init__(self,
worlds_pickle_filename=os.path.join(os.path.dirname(__file__), "assets", "worlds_640x480_v0.pkl"),
world_idx=0,
initial_position=np.array([-20.0, -20.0]),
destination=np.array([520.0, 400.0]),
max_observation_range = 100.0,
destination_tolerance_range=20.0,
add_self_position_to_observation=False,
add_goal_position_to_observation=False):
worlds = EnvironmentCollection()
worlds.read(worlds_pickle_filename)
self.world = worlds.map_collection[world_idx]
self.set_destination(destination)
assert self.destination is not None
self.init_position = initial_position
self.state = self.init_position.copy()
self.max_observation_range = max_observation_range
self.destination_tolerance_range = destination_tolerance_range
self.viewer = None
self.num_beams = 16
self.max_speed = 5
self.add_self_position_to_observation = add_self_position_to_observation
self.add_goal_position_to_observation = add_goal_position_to_observation
low = np.array([0.0, 0.0])
high = np.array([self.max_speed, 2*pi])
self.action_space = Box(low, high) # alternative: Tuple((Box(0.0, self.max_speed, (1,)), Box(0.0, 2*pi, (1,))))
low = [-1.0] * self.num_beams
high = [self.max_observation_range] * self.num_beams
if add_self_position_to_observation:
low.extend([-10000., -10000.]) # x and y coords
high.extend([10000., 10000.])
if add_goal_position_to_observation:
low.extend([-10000., -10000.]) # x and y coords
high.extend([10000., 10000.])
self.observation_space = Box(np.array(low), np.array(high))
self.observation = []
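# Sketch: the commented-out Tuple alternative above was replaced by a single
# Box whose two dimensions carry separate bounds, speed in [0, max_speed] and
# heading in [0, 2*pi]. Standalone illustration:
import numpy as np
from gym.spaces import Box
action_space = Box(np.array([0.0, 0.0]), np.array([5.0, 2 * np.pi]))
speed, heading = action_space.sample() # one continuous value per dimension
assert 0.0 <= speed <= 5.0 and 0.0 <= heading <= 2 * np.pi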
def _render(self, mode='human', close=False):
if close:
# Nothing interesting to close
return
outfile = StringIO() if mode == 'ansi' else sys.stdout
inp = "Total length of input instance: %d, step: %d\n" % (self.input_width, self.time)
outfile.write(inp)
x, y, action = self.read_head_position, self.write_head_position, self.last_action
if action is not None:
inp_act, out_act, pred = action
outfile.write("=" * (len(inp) - 1) + "\n")
y_str = "Output Tape : "
target_str = "Targets : "
if action is not None:
pred_str = self.charmap[pred]
x_str = self._render_observation()
for i in range(-2, len(self.target) + 2):
target_str += self._get_str_target(i)
if i < y - 1:
y_str += self._get_str_target(i)
elif i == (y - 1):
if action is not None and out_act == 1:
color = 'green' if pred == self.target[i] else 'red'
y_str += colorize(pred_str, color, highlight=True)
else:
y_str += self._get_str_target(i)
outfile.write(x_str)
outfile.write(y_str + "\n")
outfile.write(target_str + "\n\n")
if action is not None:
outfile.write("Current reward : %.3f\n" % self.last_reward)
outfile.write("Cumulative reward : %.3f\n" % self.episode_total_reward)
move = self.MOVEMENTS[inp_act]
outfile.write("Action : Tuple(move over input: %s,\n" % move)
out_act = out_act == 1
outfile.write(" write to the output tape: %s,\n" % out_act)
outfile.write(" prediction: %s)\n" % pred_str)
else:
outfile.write("\n" * 5)
return outfile
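# Usage sketch: with mode='ansi' the method writes into an in-memory buffer
# and returns it, so the rendering can be captured as a string. A minimal
# standalone illustration of that dispatch (io.StringIO stands in for six's
# StringIO used by the original module):
import sys
from io import StringIO
def pick_outfile(mode='human'):
    return StringIO() if mode == 'ansi' else sys.stdout
buf = pick_outfile('ansi')
buf.write("Output Tape : 2 5 1\n")
assert buf.getvalue().startswith("Output Tape")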
def _to_rf_space(space):
"""Converts Gym space instance into ReinforceFlow."""
if isinstance(space, spaces.Discrete):
return DiscreteOneHot(space.n)
if isinstance(space, spaces.MultiDiscrete):
# space.low > 0 will lead to unused first n actions.
# return Tuple([DiscreteOneHot(n) for n in space.high])
raise ValueError("MultiDiscrete spaces aren't supported yet.")
if isinstance(space, spaces.MultiBinary):
return Tuple([DiscreteOneHot(2) for _ in range(space.n)]) # space.n is an int, so iterate over range(n)
if isinstance(space, spaces.Box):
return Continious(space.low, space.high)
if isinstance(space, spaces.Tuple):
converted_spaces = []
for sub_space in space.spaces:
converted_spaces.append(_to_rf_space(sub_space))
return Tuple(*converted_spaces)
raise ValueError("Unsupported space %s." % space)
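# Companion sketch, runnable with gym alone: the same recursive dispatch
# pattern over spaces.Tuple, here just flattening a nested Tuple into its
# leaf spaces (`leaf_spaces` is illustrative, not part of ReinforceFlow):
from gym import spaces
def leaf_spaces(space):
    if isinstance(space, spaces.Tuple):
        leaves = []
        for sub in space.spaces:
            leaves.extend(leaf_spaces(sub)) # recurse into nested Tuples
        return leaves
    return [space]
nested = spaces.Tuple((spaces.Discrete(3), spaces.Tuple((spaces.Discrete(2), spaces.Box(0, 1, [2])))))
assert len(leaf_spaces(nested)) == 3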
def _make_gym2rf_converter(space):
"""Makes converter function that maps space samples Gym -> ReinforceFlow."""
# TODO: add spaces.MultiDiscrete support.
if isinstance(space, spaces.Discrete):
def converter(sample):
return one_hot(space.n, sample)
return converter
if isinstance(space, spaces.MultiBinary):
def converter(sample):
return tuple([one_hot(2, s) for s in sample])
return converter
if isinstance(space, spaces.Box):
return lambda sample: sample
if isinstance(space, spaces.Tuple):
sub_converters = []
for sub_space in space.spaces:
sub_converters.append(_make_gym2rf_converter(sub_space))
def converter(sample):
converted_tuple = []
for sub_sample, sub_converter in zip(sample, sub_converters):
converted_tuple.append(sub_converter(sub_sample))
return tuple(converted_tuple)
return converter
raise ValueError("Unsupported space %s." % space)
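# Illustrative stand-in for the `one_hot` helper the converter relies on
# (the real ReinforceFlow version may differ):
import numpy as np
def one_hot(n, index):
    vec = np.zeros(n) # n-dimensional zero vector...
    vec[index] = 1.0 # ...with a single 1 at the sampled index
    return vec
assert one_hot(4, 2).tolist() == [0.0, 0.0, 1.0, 0.0]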
def is_discrete(space):
""" Checks if a space is discrete. A space is considered to
be discrete if it is derived from Discrete, MultiDiscrete
or MultiBinary.
A Tuple space is discrete if it contains only discrete
subspaces.
:raises TypeError: If the space is not a `gym.Space`.
"""
assert_space(space)
if isinstance(space, (spaces.Discrete, spaces.MultiDiscrete, spaces.MultiBinary)):
return True
elif isinstance(space, spaces.Box):
return False
elif isinstance(space, spaces.Tuple):
return all(map(is_discrete, space.spaces))
raise NotImplementedError("Unknown space {} of type {} supplied".format(space, type(space)))
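# Usage sketch, assuming the module's assert_space helper is available:
# is_discrete(spaces.Tuple((spaces.Discrete(2), spaces.MultiBinary(3)))) # True
# is_discrete(spaces.Tuple((spaces.Discrete(2), spaces.Box(0, 1, [1])))) # False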