In this article, I am going to implement a custom TensorFlow Agents metric that calculates the maximal discounted reward.
First, I have to import the metric-related modules and the driver module (the driver runs the simulation). Additionally, I need an environment. I’m going to use the one I implemented in this article (a stand-in setup is sketched right after the imports below).
import numpy as np
import tensorflow as tf

from tf_agents.metrics import tf_py_metric
from tf_agents.metrics import py_metric
from tf_agents.drivers import py_driver
from tf_agents.drivers import dynamic_episode_driver
from tf_agents.policies import random_tf_policy
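The environment itself comes from that other article, so its code is not repeated here. As a placeholder, any environment wrapped in a TFPyEnvironment will do; the CartPole setup below is only a hypothetical stand-in for the tf_env variable used later.

from tf_agents.environments import suite_gym
from tf_agents.environments import tf_py_environment

# Hypothetical stand-in for the environment from the linked article.
py_env = suite_gym.load('CartPole-v0')
tf_env = tf_py_environment.TFPyEnvironment(py_env)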
My metric needs to store the rewards and discounts from the current episode and the maximal discounted total score. For that, I need two lists (for the episode rewards and discounts) and one variable to keep the maximal reward.
class MaxEpisodeScoreMetric(py_metric.PyStepMetric):
    def __init__(self, name='MaxEpisodeScoreMetric'):
        super(MaxEpisodeScoreMetric, self).__init__(name)
        self.rewards = []
        self.discounts = []
        self.max_discounted_reward = None
        self.reset()
The reset function is mandatory, and it allows the metric instance to be reused by separate driver runs.
# add it inside the MaxEpisodeScoreMetric class
def reset(self):
    self.rewards = []
    self.discounts = []
    self.max_discounted_reward = None
In the call function, I am going to copy the reward and discount of the current step into those lists. Then, if the current step is also the last step of an episode, I am going to calculate the episode’s total discounted reward.
After that, I compare the total discounted reward of the current episode with the maximal reward. If the new value is larger than the current maximum, I replace the maximum with it.
Because the instance is not reset between episodes, I also need to clear the lists I use to keep the episode rewards and discounts.
# add it inside the MaxEpisodeScoreMetric class
def call(self, trajectory):
    self.rewards += trajectory.reward
    self.discounts += trajectory.discount

    if trajectory.is_last():
        # every step carries the discount of the NEXT step (Bellman equation),
        # so the discounts have to be shifted by one position
        adjusted_discounts = [1.0] + self.discounts
        # drop the discount of the last step because it is not followed by a next step, so its value is useless
        adjusted_discounts = adjusted_discounts[:-1]
        discounted_reward = np.sum(np.multiply(self.rewards, adjusted_discounts))
        print(self.rewards, adjusted_discounts, discounted_reward)

        if self.max_discounted_reward is None or discounted_reward > self.max_discounted_reward:
            self.max_discounted_reward = discounted_reward

        self.rewards = []
        self.discounts = []
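To make the discount shifting concrete, here is a small worked example with made-up numbers (they do not come from the actual environment):

rewards = [1.0, 1.0, 0.5]
discounts = [1.0, 0.9, 0.0]                    # discount reported with each step
adjusted_discounts = ([1.0] + discounts)[:-1]  # [1.0, 1.0, 0.9]

# 1.0*1.0 + 1.0*1.0 + 0.5*0.9 = 2.45
print(np.sum(np.multiply(rewards, adjusted_discounts)))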
In the result function, I don’t need to perform any additional operations, so I return the maximal discounted total reward.
# add it inside the MaxEpisodeScoreMetric class
def result(self):
    return self.max_discounted_reward
I want to use my metric as a TensorFlow metric, so I have to wrap it in a class extending TFPyMetric.
class TFMaxEpisodeScoreMetric(tf_py_metric.TFPyMetric):
    def __init__(self, name='MaxEpisodeScoreMetric', dtype=tf.float32):
        # a local name that does not shadow the imported py_metric module
        max_score_metric = MaxEpisodeScoreMetric()
        super(TFMaxEpisodeScoreMetric, self).__init__(
            py_metric=max_score_metric, name=name, dtype=dtype)
Finally, I can add the metric to the driver’s observers and run the driver.
# tf_env is the environment from the article mentioned in the second paragraph (or the stand-in defined above)
tf_policy = random_tf_policy.RandomTFPolicy(action_spec=tf_env.action_spec(),
                                            time_step_spec=tf_env.time_step_spec())

max_score = TFMaxEpisodeScoreMetric()
observers = [max_score]

driver = dynamic_episode_driver.DynamicEpisodeDriver(
    tf_env, tf_policy, observers, num_episodes=1000)
final_time_step, policy_state = driver.run()

print('Max score:', max_score.result().numpy())
Result:
Max score: 1.715
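The reset function from earlier makes it possible to reuse the metric instance between runs. A minimal sketch, assuming the TFPyMetric wrapper forwards reset() to the wrapped Python metric (if it does not, creating a fresh TFMaxEpisodeScoreMetric works just as well):

# reuse the same metric for a second driver run (assumes the wrapper forwards reset())
max_score.reset()
final_time_step, policy_state = driver.run()
print('Max score of the second run:', max_score.result().numpy())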