Source code for rlgraph.agents.impala_agent

# Copyright 2018 The RLgraph authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import copy

from rlgraph import get_backend
from rlgraph.utils.decorators import rlgraph_api
from rlgraph.utils import RLGraphError
from rlgraph.agents.agent import Agent
from rlgraph.components.common.dict_merger import DictMerger
from rlgraph.components.common.container_splitter import ContainerSplitter
from rlgraph.components.common.slice import Slice
from rlgraph.components.common.staging_area import StagingArea
from rlgraph.components.common.environment_stepper import EnvironmentStepper
from rlgraph.components.layers.preprocessing.reshape import ReShape
from rlgraph.components.layers.preprocessing.transpose import Transpose
from rlgraph.components.layers.preprocessing.concat import Concat
from rlgraph.components.neural_networks.actor_component import ActorComponent
from rlgraph.components.neural_networks.dynamic_batching_policy import DynamicBatchingPolicy
from rlgraph.components.loss_functions.impala_loss_function import IMPALALossFunction
from rlgraph.components.memories.fifo_queue import FIFOQueue
from rlgraph.components.memories.queue_runner import QueueRunner
from rlgraph.spaces import FloatBox, Dict, Tuple
from rlgraph.utils.util import default_dict

if get_backend() == "tf":
    import tensorflow as tf


[docs]class IMPALAAgent(Agent): """ An Agent implementing the IMPALA algorithm described in [1]. The Agent contains both learner and actor API-methods, which will be put into the graph depending on the type (). [1] IMPALA: Scalable Distributed Deep-RL with Importance Weighted Actor-Learner Architectures - Espeholt, Soyer, Munos et al. - 2018 (https://arxiv.org/abs/1802.01561) """ default_internal_states_space = Tuple(FloatBox(shape=(256,)), FloatBox(shape=(256,)), add_batch_rank=False) #default_internal_states_space = Tuple(FloatBox(shape=(266,)), FloatBox(shape=(266,)), add_batch_rank=False) default_environment_spec = dict( type="deepmind_lab", level_id="seekavoid_arena_01", observations=["RGB_INTERLEAVED", "INSTR"], frameskip=4 ) # TODO: capacity FIFO Queue make configurable. def __init__(self, discount=0.99, fifo_queue_spec=None, architecture="large", environment_spec=None, weight_pg=None, weight_baseline=None, weight_entropy=None, worker_sample_size=100, dynamic_batching=False, **kwargs): """ Args: discount (float): The discount factor gamma. architecture (str): Which IMPALA architecture to use. One of "small" or "large". Will be ignored if `network_spec` is given explicitly in kwargs. Default: "large". fifo_queue_spec (Optional[dict,FIFOQueue]): The spec for the FIFOQueue to use for the IMPALA algorithm. environment_spec (dict): The spec for constructing an Environment object for an actor-type IMPALA agent. weight_pg (float): See IMPALALossFunction Component. weight_baseline (float): See IMPALALossFunction Component. weight_entropy (float): See IMPALALossFunction Component. worker_sample_size (int): How many steps the actor will perform in the environment each sample-run. dynamic_batching (bool): Whether to use the deepmind's custom dynamic batching op for wrapping the optimizer's step call. The batcher.so file must be compiled for this to work (see Docker file). Default: False. Keyword Args: type (str): One of "single", "actor" or "learner". Default: "single". num_actors (int): If type is "single", how many actors should be run in separate threads. """ type_ = kwargs.pop("type", "single") assert type_ in ["single", "actor", "learner"] self.type = type_ if self.type == "single": self.num_actors = kwargs.pop("num_actors", 1) else: self.num_actors = 0 self.worker_sample_size = worker_sample_size self.dynamic_batching = dynamic_batching # Network-spec by default is a "large architecture" IMPALA network. network_spec = default_dict(kwargs.pop( "network_spec", dict(type="rlgraph.components.papers.impala.impala_networks.{}IMPALANetwork". format("Large" if architecture == "large" else "Small")) ), dict(worker_sample_size=1 if self.type == "actor" else self.worker_sample_size + 1)) action_adapter_spec = kwargs.pop("action_adapter_spec", dict(type="baseline-action-adapter")) # Depending on the job-type, remove the pieces from the Agent-spec/graph we won't need. exploration_spec = kwargs.pop("exploration_spec", None) optimizer_spec = kwargs.pop("optimizer_spec", None) observe_spec = kwargs.pop("observe_spec", None) # Run everything in a single process. if self.type == "single": environment_spec = environment_spec or self.default_environment_spec update_spec = kwargs.pop("update_spec", None) # Actors won't need to learn (no optimizer needed in graph). elif self.type == "actor": optimizer_spec = None update_spec = kwargs.pop("update_spec", dict(do_updates=False)) environment_spec = environment_spec or self.default_environment_spec # Learners won't need to explore (act) or observe (insert into Queue). else: exploration_spec = None observe_spec = None update_spec = kwargs.pop("update_spec", None) environment_spec = None # Add previous-action/reward preprocessors to env-specific preprocessor spec. # TODO: remove this empty hard-coded preprocessor. kwargs.pop("preprocessing_spec", None) preprocessing_spec = dict(type="dict-preprocessor-stack", preprocessors=dict( # Flatten actions. previous_action=[ dict(type="reshape", flatten=True, flatten_categories=kwargs.get("action_space").num_categories) ], # Bump reward and convert to float32, so that it can be concatenated by the Concat layer. previous_reward=[ dict(type="reshape", new_shape=(1,)) #dict(type="convert_type", to_dtype="float32") ] )) # Limit communication in distributed mode between each actor and the learner (never between actors). execution_spec = kwargs.pop("execution_spec", None) if execution_spec is not None and execution_spec.get("mode") == "distributed": execution_spec["session_config"] = dict( type="monitored-training-session", allow_soft_placement=True, log_device_placement=False, device_filters=["/job:learner/task:0"] + ( ["/job:actor/task:{}".format(execution_spec["distributed_spec"]["task_index"])] if self.type == "actor" else ["/job:learner/task:0"] ) ) # If Actor, make non-chief in either case (even if task idx == 0). if self.type == "actor": execution_spec["distributed_spec"]["is_chief"] = False # Hard-set device to the CPU for actors. execution_spec["device_strategy"] = "custom" execution_spec["default_device"] = "/job:{}/task:{}/cpu".format(self.type, execution_spec["distributed_spec"]["task_index"]) # Now that we fixed the Agent's spec, call the super constructor. super(IMPALAAgent, self).__init__( discount=discount, preprocessing_spec=preprocessing_spec, network_spec=network_spec, action_adapter_spec=action_adapter_spec, exploration_spec=exploration_spec, optimizer_spec=optimizer_spec, observe_spec=observe_spec, update_spec=update_spec, execution_spec=execution_spec, name=kwargs.pop("name", "impala-{}-agent".format(self.type)), **kwargs ) # If we use dynamic batching, wrap the dynamic batcher around the policy's graph_fn that we # actually call below during our build. if self.dynamic_batching: self.policy = DynamicBatchingPolicy(policy_spec=self.policy, scope="") # Manually set the reuse_variable_scope for our policies (actor: mu, learner: pi). self.policy.propagate_sub_component_properties(dict(reuse_variable_scope="shared")) # Always use 1st learner as the parameter server for all policy variables. if self.execution_spec["mode"] == "distributed" and self.execution_spec["distributed_spec"]["cluster_spec"]: self.policy.propagate_sub_component_properties(dict(device=dict(variables="/job:learner/task:0/cpu"))) # Check whether we have an RNN. self.has_rnn = self.neural_network.has_rnn() # Check, whether we are running with GPU. self.has_gpu = self.execution_spec["gpu_spec"]["gpus_enabled"] is True and \ self.execution_spec["gpu_spec"]["num_gpus"] > 0 # Some FIFO-queue specs. self.fifo_queue_keys = ["terminals", "states", "action_probs", "initial_internal_states"] self.fifo_record_space = Dict( { "terminals": bool, "states": default_dict(copy.deepcopy(self.state_space), dict( previous_action=self.action_space, previous_reward=FloatBox() )), "action_probs": FloatBox(shape=(self.action_space.num_categories,)), "initial_internal_states": self.internal_states_space }, add_batch_rank=False, add_time_rank=(self.worker_sample_size + 1) ) # Take away again time-rank from initial-states (comes in only for one time-step). self.fifo_record_space["initial_internal_states"] = \ self.fifo_record_space["initial_internal_states"].with_time_rank(False) # Create our FIFOQueue (actors will enqueue, learner(s) will dequeue). self.fifo_queue = FIFOQueue.from_spec( fifo_queue_spec or dict(capacity=1), reuse_variable_scope="shared-fifo-queue", only_insert_single_records=True, record_space=self.fifo_record_space, device="/job:learner/task:0/cpu" if self.execution_spec["mode"] == "distributed" and self.execution_spec["distributed_spec"]["cluster_spec"] else None ) # Remove `states` key from input_spaces: not needed. del self.input_spaces["states"] # Add all our sub-components to the core. if self.type == "single": # Create a queue runner that takes care of pushing items into the queue from our actors. self.env_output_splitter = ContainerSplitter(tuple_length=8, scope="env-output-splitter") self.fifo_output_splitter = ContainerSplitter(*self.fifo_queue_keys, scope="fifo-output-splitter") self.staging_area = StagingArea(num_data=len(self.fifo_queue_keys)) # Slice some data from the EnvStepper (e.g only first internal states are needed). self.internal_states_slicer = Slice(scope="internal-states-slicer", squeeze=True) # TODO: add state transposer, remove action/rewards transposer (part of state). self.transpose_actions = ReShape(flip_batch_and_time_rank=True, time_major=True, scope="transpose-a", flatten_categories=False) self.transpose_rewards = ReShape(flip_batch_and_time_rank=True, time_major=True, scope="transpose-r") self.transpose_terminals = ReShape(flip_batch_and_time_rank=True, time_major=True, scope="transpose-t") self.transpose_action_probs = ReShape(flip_batch_and_time_rank=True, time_major=True, scope="transpose-a-probs-mu") self.concat = Concat(axis=1) # 1=the time rank (at the time of the concat, we are still batch-major) # Create an IMPALALossFunction with some parameters. self.loss_function = IMPALALossFunction( discount=self.discount, weight_pg=weight_pg, weight_baseline=weight_baseline, weight_entropy=weight_entropy ) # Merge back to insert into FIFO. self.fifo_input_merger = DictMerger(*self.fifo_queue_keys) dummy_flattener = ReShape(flatten=True) # dummy Flattener to calculate action-probs space self.environment_steppers = list() for i in range(self.num_actors): policy_spec = dict( network_spec=network_spec, action_adapter_spec=dict(type="baseline-action-adapter"), action_space=self.action_space ) env_stepper = EnvironmentStepper( environment_spec=environment_spec, actor_component_spec=ActorComponent( preprocessor_spec=preprocessing_spec, policy_spec=policy_spec, exploration_spec=exploration_spec ), state_space=self.state_space.with_batch_rank(), reward_space=float, # TODO <- float64 for deepmind? may not work for other envs internal_states_space=self.internal_states_space, num_steps=self.worker_sample_size, add_previous_action=True, add_previous_reward=True, add_action_probs=True, action_probs_space=dummy_flattener.get_preprocessed_space(self.action_space), scope="env-stepper-{}".format(i) ) if self.dynamic_batching: env_stepper.actor_component.policy.parent_component = None env_stepper.actor_component.policy = DynamicBatchingPolicy(policy_spec=env_stepper.actor_component.policy, scope="") env_stepper.actor_component.add_components(env_stepper.actor_component.policy) env_stepper.actor_component.policy.propagate_sub_component_properties(dict(reuse_variable_scope="shared")) self.environment_steppers.append(env_stepper) # Create the QueueRunners (one for each env-stepper). # - Take return value 1 of API-method step as record to insert. self.queue_runner = QueueRunner(self.fifo_queue, "step", 1, self.env_output_splitter, self.fifo_input_merger, self.internal_states_slicer, *self.environment_steppers) sub_components = [ self.fifo_output_splitter, self.fifo_queue, self.queue_runner, self.transpose_actions, self.transpose_rewards, self.transpose_terminals, self.transpose_action_probs, self.preprocessor, self.staging_area, self.concat, self.policy, self.loss_function, self.optimizer ] elif self.type == "actor": # No learning, no loss function. self.loss_function = None # A Dict Splitter to split things from the EnvStepper. self.env_output_splitter = ContainerSplitter(tuple_length=4, scope="env-output-splitter") self.states_dict_splitter = ContainerSplitter("RGB_INTERLEAVED", "INSTR", "previous_action", "previous_reward", scope="states-dict-splitter") # Slice some data from the EnvStepper (e.g only first internal states are needed). self.internal_states_slicer = Slice(scope="internal-states-slicer", squeeze=True) # Merge back to insert into FIFO. self.fifo_input_merger = DictMerger(*self.fifo_queue_keys) dummy_flattener = ReShape(flatten=True) # dummy Flattener to calculate action-probs space self.environment_stepper = EnvironmentStepper( environment_spec=environment_spec, actor_component_spec=ActorComponent(self.preprocessor, self.policy, self.exploration), state_space=self.state_space.with_batch_rank(), reward_space=float, # TODO <- float64 for deepmind? may not work for other envs internal_states_space=self.internal_states_space, num_steps=self.worker_sample_size, add_previous_action=True, add_previous_reward=True, add_action_probs=True, action_probs_space=dummy_flattener.get_preprocessed_space(self.action_space) ) sub_components = [ self.environment_stepper, self.env_output_splitter, self.internal_states_slicer, self.fifo_input_merger, self.states_dict_splitter, self.fifo_queue ] # Learner. else: self.environment_stepper = None # A Dict splitter to split up items from the queue. self.fifo_input_merger = None self.fifo_output_splitter = ContainerSplitter(*self.fifo_queue_keys, scope="fifo-output-splitter") self.states_dict_splitter = ContainerSplitter("INSTR", "RGB_INTERLEAVED", "previous_action", "previous_reward", scope="states-dict-splitter") self.internal_states_slicer = None self.transpose_states = Transpose( scope="transpose-states", device=dict(ops="/job:learner/task:0/cpu") ) self.transpose_terminals = Transpose( scope="transpose-terminals", device=dict(ops="/job:learner/task:0/cpu") ) self.transpose_action_probs = Transpose( scope="transpose-a-probs-mu", device=dict(ops="/job:learner/task:0/cpu") ) self.staging_area = StagingArea(num_data=len(self.fifo_queue_keys)) # Create an IMPALALossFunction with some parameters. self.loss_function = IMPALALossFunction( discount=self.discount, weight_pg=weight_pg, weight_baseline=weight_baseline, weight_entropy=weight_entropy, device="/job:learner/task:0/gpu" ) self.policy.propagate_sub_component_properties( dict(device=dict(variables="/job:learner/task:0/cpu", ops="/job:learner/task:0/gpu")) ) for component in [self.staging_area, self.preprocessor, self.optimizer]: component.propagate_sub_component_properties( dict(device="/job:learner/task:0/gpu") ) sub_components = [ self.fifo_output_splitter, self.fifo_queue, self.states_dict_splitter, self.transpose_states, self.transpose_terminals, self.transpose_action_probs, self.staging_area, self.preprocessor, self.policy, self.loss_function, self.optimizer ] # Add all the agent's sub-components to the root. self.root_component.add_components(*sub_components) # Define the Agent's (root Component's) API. self.define_api_methods(*sub_components) if self.auto_build: if self.type == "learner": build_options = dict( build_device_context="/job:learner/task:0/cpu", pin_global_variable_device="/job:learner/task:0/cpu" ) self._build_graph([self.root_component], self.input_spaces, optimizer=self.optimizer, build_options=build_options) else: self._build_graph([self.root_component], self.input_spaces, optimizer=self.optimizer, build_options=None) self.graph_built = True if self.has_gpu: # Get 1st return op of API-method `stage` of sub-component `staging-area` (which is the stage-op). self.stage_op = self.root_component.sub_components["staging-area"].api_methods["stage"]. \ out_op_columns[0].op_records[0].op # Initialize the stage. self.graph_executor.monitored_session.run_step_fn( lambda step_context: step_context.session.run(self.stage_op) ) # TODO remove after full refactor. self.dequeue_op = self.root_component.sub_components["fifo-queue"].api_methods["get_records"]. \ out_op_columns[0].op_records[0].op if self.type == "actor": self.enqueue_op = self.root_component.sub_components["fifo-queue"].api_methods["insert_records"]. \ out_op_columns[0].op_records[0].op
[docs] def define_api_methods(self, *sub_components): # TODO: Unify agents with/w/o synchronizable policy. # TODO: Unify Agents with/w/o get_action method (w/ env-stepper vs w/o). #global_scope_base = "environment-stepper/actor-component/" if self.type == "actor" else "" #super(IMPALAAgent, self).define_api_methods( # global_scope_base+"policy", # global_scope_base+"dict-preprocessor-stack" #) # Assemble the specific agent. if self.type == "single": self.define_api_methods_single(*sub_components) elif self.type == "actor": self.define_api_methods_actor(*sub_components) else: self.define_api_methods_learner(*sub_components)
[docs] def define_api_methods_single(self, fifo_output_splitter, fifo_queue, queue_runner, transpose_actions, transpose_rewards, transpose_terminals, transpose_action_probs, preprocessor, staging_area, concat, policy, loss_function, optimizer): @rlgraph_api(component=self.root_component) def setup_queue_runner(self_): return queue_runner.setup() @rlgraph_api(component=self.root_component) def get_queue_size(self_): return fifo_queue.get_size() @rlgraph_api(component=self.root_component) def update_from_memory(self_): # Pull n records from the queue. # Note that everything will come out as batch-major and must be transposed before the main-LSTM. # This is done by the network itself for all network inputs: # - preprocessed_s # - preprocessed_last_s_prime # But must still be done for actions, rewards, terminals here in this API-method via separate ReShapers. records = fifo_queue.get_records(self.update_spec["batch_size"]) preprocessed_s, actions, rewards, terminals, last_s_prime, action_probs_mu, \ initial_internal_states = fifo_output_splitter.split(records) preprocessed_last_s_prime = preprocessor.preprocess(last_s_prime) # Append last-next-state to the rest before sending it through the network. preprocessed_s_all = concat.apply(preprocessed_s, preprocessed_last_s_prime) # Flip actions, rewards, terminals to time-major. # TODO: Create components that are less input-space sensitive (those that have no variables should # TODO: be reused for any kind of processing) actions = transpose_actions.apply(actions) rewards = transpose_rewards.apply(rewards) terminals = transpose_terminals.apply(terminals) action_probs_mu = transpose_action_probs.apply(action_probs_mu) # If we use a GPU: Put everything on staging area (adds 1 time step policy lag, but makes copying # data into GPU more efficient). if self.has_gpu: stage_op = staging_area.stage( preprocessed_s_all, actions, rewards, terminals, action_probs_mu, initial_internal_states ) # Get data from stage again and continue. preprocessed_s_all, actions, rewards, terminals, action_probs_mu, \ initial_internal_states = staging_area.unstage() # endif. else: # TODO: No-op component? stage_op = None # Get the pi-action probs AND the values for all our states. state_values_pi, logits_pi, probs_pi, log_probabilities_pi, current_internal_states = \ policy.get_state_values_logits_probabilities_log_probs( preprocessed_s_all, initial_internal_states ) # Calculate the loss. loss, loss_per_item = loss_function.loss(log_probabilities_pi, action_probs_mu, state_values_pi, actions, rewards, terminals) if self.dynamic_batching: policy_vars = queue_runner.data_producing_components[0].actor_component.policy._variables() else: policy_vars = policy._variables() # TODO: dynbatching tboard check #return loss, loss, loss, loss # Pass vars and loss values into optimizer. step_op, loss, loss_per_item = optimizer.step(policy_vars, loss, loss_per_item) # Return optimizer op and all loss values. # TODO: Make it possible to return None from API-method without messing with the meta-graph. return step_op, (stage_op if stage_op else step_op), loss, loss_per_item
[docs] def define_api_methods_actor(self, env_stepper, env_output_splitter, internal_states_slicer, merger, states_dict_splitter, fifo_queue): """ Defines the API-methods used by an IMPALA actor. Actors only step through an environment (n-steps at a time), collect the results and push them into the FIFO queue. Results include: The actions actually taken, the discounted accumulated returns for each action, the probability of each taken action according to the behavior policy. Args: env_stepper (EnvironmentStepper): The EnvironmentStepper Component to setp through the Env n steps in a single op call. fifo_queue (FIFOQueue): The FIFOQueue Component used to enqueue env sample runs (n-step). """ # Perform n-steps in the env and insert the results into our FIFO-queue. @rlgraph_api(component=self.root_component) def perform_n_steps_and_insert_into_fifo(self_): #, internal_states, time_step=0): # Take n steps in the environment. step_results = env_stepper.step() terminals, states, action_log_probs, internal_states = env_output_splitter.split(step_results) initial_internal_states = internal_states_slicer.slice(internal_states, 0) record = merger.merge(terminals, states, action_log_probs, initial_internal_states) # Insert results into the FIFOQueue. insert_op = fifo_queue.insert_records(record) return insert_op, terminals @rlgraph_api(component=self.root_component) def reset(self): # Resets the environment running inside the agent. reset_op = env_stepper.reset() return reset_op
[docs] def define_api_methods_learner( self, fifo_output_splitter, fifo_queue, states_dict_splitter, transpose_states, transpose_terminals, transpose_action_probs, staging_area, preprocessor, policy, loss_function, optimizer ): """ Defines the API-methods used by an IMPALA learner. Its job is basically: Pull a batch from the FIFOQueue, split it up into its components and pass these through the loss function and into the optimizer for a learning update. Args: fifo_queue (FIFOQueue): The FIFOQueue Component used to enqueue env sample runs (n-step). splitter (ContainerSplitter): The DictSplitter Component to split up a batch from the queue along its items. policy (Policy): The Policy Component, which to update. loss_function (IMPALALossFunction): The IMPALALossFunction Component. optimizer (Optimizer): The optimizer that we use to calculate an update and apply it. """ @rlgraph_api(component=self.root_component) def update_from_memory(self_): # Pull n records from the queue. # Note that everything will come out as batch-major and must be transposed before the main-LSTM. # This is done by the network itself for all network inputs: # - preprocessed_s # - preprocessed_last_s_prime # But must still be done for actions, rewards, terminals here in this API-method via separate ReShapers. records = fifo_queue.get_records(self.update_spec["batch_size"]) terminals, states, action_probs_mu, initial_internal_states = fifo_output_splitter.split(records) # Flip everything to time-major. # TODO: Create components that are less input-space sensitive (those that have no variables should # TODO: be reused for any kind of processing) states = transpose_states.apply(states) terminals = transpose_terminals.apply(terminals) action_probs_mu = transpose_action_probs.apply(action_probs_mu) # If we use a GPU: Put everything on staging area (adds 1 time step policy lag, but makes copying # data into GPU more efficient). if self.has_gpu: stage_op = staging_area.stage(states, terminals, action_probs_mu, initial_internal_states) # Get data from stage again and continue. states, terminals, action_probs_mu, initial_internal_states = staging_area.unstage() else: # TODO: No-op component? stage_op = None # Preprocess actions and rewards inside the state (actions: flatten one-hot, rewards: expand). states = preprocessor.preprocess(states) # state_values_pi, _, _, log_probabilities_pi, current_internal_states = \ # policy.get_state_values_logits_probabilities_log_probs(states, initial_internal_states) # Only retrieve logits and do faster sparse softmax in loss. state_values_pi, logits, _, _, current_internal_states = \ policy.get_state_values_logits_probabilities_log_probs(states, initial_internal_states) # Isolate actions and rewards from states. _, _, actions, rewards = states_dict_splitter.split(states) # Calculate the loss. # step_op,\ <- DEBUG: fake step op loss, loss_per_item = loss_function.loss(logits, action_probs_mu, state_values_pi, actions, rewards, terminals) policy_vars = policy._variables() # Pass vars and loss values into optimizer. step_op, loss, loss_per_item = optimizer.step(policy_vars, loss, loss_per_item) # Return optimizer op and all loss values. # TODO: Make it possible to return None from API-method without messing with the meta-graph. return step_op, (stage_op if stage_op else step_op), loss, loss_per_item @rlgraph_api(component=self.root_component) def get_queue_size(self_): return fifo_queue.get_size()
[docs] def get_action(self, states, internal_states=None, use_exploration=True, extra_returns=None): pass
def _observe_graph(self, preprocessed_states, actions, internals, rewards, terminals): self.graph_executor.execute(("insert_records", [preprocessed_states, actions, rewards, terminals]))
[docs] def update(self, batch=None): if batch is None: # Include stage_op or not? if self.has_gpu: return self.graph_executor.execute("update_from_memory") else: return self.graph_executor.execute(("update_from_memory", None, ([0, 2, 3]))) else: raise RLGraphError("Cannot call update-from-batch on an IMPALA Agent.")
def __repr__(self): return "IMPALAAgent(type={})".format(self.type)