Source code for rlgraph.environments.grid_world

# Copyright 2018 The RLgraph authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import math
import numpy as np
import random
from six.moves import xrange as range_
import time

from rlgraph.environments import Environment
import rlgraph.spaces as spaces


class GridWorld(Environment):
    """
    A classic grid world where the action space is up/down/left/right and the field types are:
    'S' : starting point
    ' ' : free space
    'W' : wall (blocks movement)
    'H' : hole (terminates episode; replaced by 'F' in save-mode)
    'F' : fire (causes negative reward)
    'G' : goal state (terminates episode)

    TODO: Create an option to introduce a continuous action space.
    """
    # Some built-in maps.
    MAPS = {
        "chain": [
            "G S F G"
        ],
        "2x2": [
            "SH",
            " G"
        ],
        "4x4": [
            "S   ",
            " H H",
            "   H",
            "H  G"
        ],
        "8x8": [
            "S       ",
            "        ",
            "   H    ",
            "     H  ",
            "   H    ",
            " HH   H ",
            " H  H H ",
            "   H   G"
        ],
        "8x16": [
            "S      H        ",
            "   H       HH   ",
            "    FF   WWWWWWW",
            "  H      W      ",
            "    FF   W  H   ",
            "         W      ",
            "    FF   W      ",
            "  H          H G"
        ],
        "16x16": [
            "S      H        ",
            "           HH   ",
            "    FF   W     W",
            "         W      ",
            "WWW FF      H   ",
            "         W      ",
            " FFFF    W      ",
            "  H          H  ",
            "       H        ",
            "   H       HH   ",
            "WWWW     WWWWWWW",
            "  H      W    W ",
            "    FF   W  H W ",
            "WWWW    WW    W ",
            "    FF   W      ",
            "  H          H G"
        ]
    }

    def __init__(self, world="4x4", save_mode=False, reward_function="sparse",
                 state_representation="discr"):
        """
        Args:
            world (Union[str,List[str]]): Either a string to map into `MAPS` or a list of strings describing
                the rows of the world (e.g. ["S ", " G"] for a two-row/two-column world with a start and a
                goal state).
            save_mode (bool): Whether to replace holes ("H") with fire fields ("F"), so that stepping on
                them no longer terminates the episode. Default: False.
            reward_function (str): One of
                "sparse": hole=-5, fire=-3, goal=+1, all other steps=-1
                "rich": hole=-10, fire=-10, goal=+50, all other steps=-1
            state_representation (str): One of "discr" (single int), "xy_pos" (x/y int pair) or
                "cam" (RGB image of the world).
        """
        # Build our map.
        if isinstance(world, str):
            self.description = world
            world = self.MAPS[world]
        else:
            self.description = "custom-map"
        world = np.array(list(map(list, world)))
        # Apply the safety switch: replace holes with (non-terminating) fire fields.
        world[world == "H"] = ("H" if not save_mode else "F")

        # `world` is a 2D array that needs to be indexed using y/x pairs (first row, then column).
        self.world = world
        self.n_row, self.n_col = self.world.shape
        # np.nonzero returns (row-indices, col-indices), i.e. (y, x).
        (start_y,), (start_x,) = np.nonzero(self.world == "S")

        # Figure out our state space.
        assert state_representation in ["discr", "xy_pos", "cam"]
        self.state_representation = state_representation
        # Discrete states (single int from 0 to n).
        if self.state_representation == "discr":
            state_space = spaces.IntBox(self.n_row * self.n_col)
        # x/y position (2 ints).
        elif self.state_representation == "xy_pos":
            state_space = spaces.IntBox(low=(0, 0), high=(self.n_col, self.n_row), shape=(2,))
        # Camera outputting a 2D color image of the world.
        else:
            state_space = spaces.IntBox(0, 255, shape=(self.n_row, self.n_col, 3))

        self.default_start_pos = self.get_discrete_pos(start_x, start_y)
        self.discrete_pos = self.default_start_pos

        assert reward_function in ["sparse", "rich"]  # TODO: "potential"-based reward
        self.reward_function = reward_function

        # Store the goal position for proximity calculations (for a future "potential"-based reward).
        # If a map contains more than one "G", only the first one (in row-major order) is stored.
        self.goal_y, self.goal_x = np.argwhere(self.world == "G")[0]

        # Call the super's constructor.
        super(GridWorld, self).__init__(state_space=state_space, action_space=spaces.IntBox(4))

        # Reset ourselves.
        self.state = None
        self.camera_pixels = None  # only used if state_representation == "cam"
        self.reward = None
        self.is_terminal = None
        self.reset(randomize=False)
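
    # Illustration: for the built-in "4x4" map, n_row == n_col == 4 and "S" sits at
    # (x=0, y=0), so `default_start_pos` is 0. The state space is IntBox(16) for
    # state_representation="discr", IntBox(low=(0, 0), high=(4, 4), shape=(2,)) for
    # "xy_pos", and IntBox(0, 255, shape=(4, 4, 3)) for "cam".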

    def seed(self, seed=None):
        # np.random.seed requires an int; time.time() returns a float.
        if seed is None:
            seed = int(time.time())
        np.random.seed(seed)
        return seed

    def reset(self, randomize=False):
        """
        Args:
            randomize (bool): Whether to start the new episode in a random position (instead of "S").
                This could be an empty space (" "), the default start ("S") or a fire field ("F").
        """
        if randomize is False:
            self.discrete_pos = self.default_start_pos
        else:
            # Move to a random first position (" ", "S", or "F" (ouch!) are all ok to start in).
            while True:
                self.discrete_pos = random.choice(range(self.n_row * self.n_col))
                if self.world[self.y, self.x] in [" ", "S", "F"]:
                    break

        self.reward = 0.0
        self.is_terminal = False
        self.refresh_state()
        return self.state

    def step(self, actions, set_discrete_pos=None):
        """
        Action map:
        0: up
        1: right
        2: down
        3: left

        Args:
            actions (int): An integer 0-3 that describes the next action.
            set_discrete_pos (Optional[int]): An integer to set the current discrete position to before acting.

        Returns:
            tuple: The next state, the reward (float), the is-terminal flag (bool) and an info object (None here).
        """
        # Process possible manual setter instruction.
        if set_discrete_pos is not None:
            assert isinstance(set_discrete_pos, int) and 0 <= set_discrete_pos < self.n_row * self.n_col
            self.discrete_pos = set_discrete_pos

        # Then perform the action: Determine the next state via the transition function.
        possible_next_positions = self.get_possible_next_positions(self.discrete_pos, actions)
        probs = [x[1] for x in possible_next_positions]
        next_state_idx = np.random.choice(len(probs), p=probs)
        self.discrete_pos = possible_next_positions[next_state_idx][0]
        next_x = self.discrete_pos % self.n_col
        next_y = self.discrete_pos // self.n_col

        # Determine the reward and the is-terminal flag.
        next_state_type = self.world[next_y, next_x]
        if next_state_type == "H":
            self.is_terminal = True
            self.reward = -5 if self.reward_function == "sparse" else -10
        elif next_state_type == "F":
            self.is_terminal = False
            self.reward = -3 if self.reward_function == "sparse" else -10
        elif next_state_type in [" ", "S"]:
            self.is_terminal = False
            self.reward = -1
        elif next_state_type == "G":
            self.is_terminal = True
            self.reward = 1 if self.reward_function == "sparse" else 50
        else:
            raise NotImplementedError

        self.refresh_state()

        return self.state, self.reward, self.is_terminal, None
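
    # Illustration: freshly reset on the "4x4" map (discrete position 0), `step(1)`
    # (move right) deterministically leads to discrete position 1; the field there
    # is " ", so the returned reward is -1 and the is-terminal flag is False.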

    def render(self):
        # Paint ourselves into stdout.
        for row in range_(len(self.world)):
            for col, val in enumerate(self.world[row]):
                if self.x == col and self.y == row:
                    print("X", end="")
                else:
                    print(val, end="")
            print()
        print()
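
    # Illustration: `render()` draws the map to stdout with an "X" at the agent's
    # position. Freshly reset on the "4x4" map it prints (followed by a blank line):
    # X
    #  H H
    #    H
    # H  G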

    def __str__(self):
        return "GridWorld({})".format(self.description)

    def refresh_state(self):
        if self.state_representation == "discr":
            self.state = self.discrete_pos
        elif self.state_representation == "xy_pos":
            self.state = (self.x, self.y)
        # Camera.
        else:
            self.update_cam_pixels()
            self.state = self.camera_pixels

    def get_possible_next_positions(self, discrete_pos, action):
        """
        Given a discrete position value and an action, returns a list of possible next states and
        their probabilities. Only next states with non-zero probabilities will be returned.
        For now: Implemented as a deterministic MDP.

        Args:
            discrete_pos (int): The discrete position to return possible next states for.
            action (int): The action choice.

        Returns:
            List[Tuple[int,float]]: A list of tuples (s', p(s'|s,a)), where s' is the next discrete
                position and p(s'|s,a) is the probability of ending up in s' when taking action a
                in state s.
        """
        x = discrete_pos % self.n_col
        y = discrete_pos // self.n_col
        coords = np.array([x, y])
        # x/y increments for the actions up (0), right (1), down (2) and left (3).
        increments = np.array([[0, -1], [1, 0], [0, 1], [-1, 0]])
        next_coords = np.clip(
            coords + increments[action],
            [0, 0],
            [self.n_col - 1, self.n_row - 1]
        )
        next_pos = self.get_discrete_pos(next_coords[0], next_coords[1])
        pos_type = self.world[y, x]
        next_pos_type = self.world[next_coords[1], next_coords[0]]
        # TODO: Allow stochasticity in this env. Right now, all probs are 1.0.
        # Next field is a wall or we are already in a terminal field. Stay where we are.
        if next_pos_type == "W" or pos_type in ["H", "G"]:
            return [(discrete_pos, 1.)]
        # Move to the next field.
        else:
            return [(next_pos, 1.)]
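
    # Illustration: on the "4x4" map, `get_possible_next_positions(0, 2)` (down from
    # the start) returns [(4, 1.0)], while `get_possible_next_positions(0, 0)` (up,
    # against the border) is clipped and returns [(0, 1.0)].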

    def update_cam_pixels(self):
        # Init camera?
        if self.camera_pixels is None:
            self.camera_pixels = np.zeros(shape=(self.n_row, self.n_col, 3), dtype=float)
        self.camera_pixels[:, :, :] = 0  # reset everything

        # 1st channel -> dangers (fire=127, holes=255)
        # 2nd channel -> walls (127) and goal (255)
        # 3rd channel -> pawn position (255)
        for row in range_(self.n_row):
            for col in range_(self.n_col):
                field = self.world[row, col]
                if field == "F":
                    self.camera_pixels[row, col, 0] = 127
                elif field == "H":
                    self.camera_pixels[row, col, 0] = 255
                elif field == "W":
                    self.camera_pixels[row, col, 1] = 127
                elif field == "G":
                    self.camera_pixels[row, col, 1] = 255  # TODO: goal shares its channel with walls
        # Overwrite the player's position.
        self.camera_pixels[self.y, self.x, 2] = 255
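
    # Illustration: on the "2x2" map ("SH" / " G"), the camera frame has
    # camera_pixels[0, 1, 0] == 255 (hole), camera_pixels[1, 1, 1] == 255 (goal) and
    # camera_pixels[y, x, 2] == 255 at the agent's position; all other values are 0.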

    def get_dist_to_goal(self):
        return math.sqrt((self.x - self.goal_x) ** 2 + (self.y - self.goal_y) ** 2)

    def get_discrete_pos(self, x, y):
        """
        Returns a single, discrete int-value for an x/y position. Positions are enumerated in
        row-major order: walking along the columns of a row first (starting in the upper left
        corner), then down the rows.

        Args:
            x (int): The x-coordinate (column).
            y (int): The y-coordinate (row).

        Returns:
            int: The discrete pos value corresponding to the given x and y.
        """
        return y * self.n_col + x
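
    # Illustration: on the "4x4" map (n_col=4), get_discrete_pos(3, 2) == 2 * 4 + 3 == 11.
    # The properties below invert this: 11 % 4 == 3 (x) and 11 // 4 == 2 (y).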

    @property
    def x(self):
        return self.discrete_pos % self.n_col

    @property
    def y(self):
        return self.discrete_pos // self.n_col
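
# A minimal usage sketch (not part of the original module): run one random-action
# episode on the built-in "4x4" map, rendering each step. This assumes an installed
# `rlgraph` whose `Environment` base class matches the interface used above.
if __name__ == "__main__":
    env = GridWorld(world="4x4", reward_function="rich")
    state = env.reset()
    terminal = False
    while not terminal:
        action = np.random.randint(0, 4)  # sample one of up/right/down/left
        state, reward, terminal, _ = env.step(action)
        env.render()
        print("action={} reward={} terminal={}".format(action, reward, terminal))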