Request: convert the logic of the following program into mermaid.live input to generate a sequenceDiagram.

import numpy as np
import tensorflow as tf
from tensorflow.keras import layers
import gym
# Hyperparameters (the paste had collapsed these onto one line — a SyntaxError).
GAMMA = 0.99            # discount factor for future rewards
TAU = 0.005             # Polyak averaging rate for target-network soft updates
BATCH_SIZE = 64         # minibatch size sampled from the replay buffer
BUFFER_SIZE = 1000000   # replay buffer capacity (transitions)
ACTOR_LR = 0.001        # Adam learning rate for the actor
CRITIC_LR = 0.002       # Adam learning rate for the critic
class ReplayBuffer:
    """Fixed-capacity cyclic buffer of (s, a, r, s', done) transitions."""

    def __init__(self, buffer_size):
        # Bug fix: the pasted source declared ``def init`` (not ``__init__``),
        # so ``ReplayBuffer(...)`` never ran the constructor and every later
        # attribute access raised AttributeError.
        self.buffer_size = buffer_size
        self.buffer = []     # grows until buffer_size, then entries are overwritten
        self.position = 0    # index of the next slot to overwrite once full

    def add(self, state, action, reward, next_state, done):
        """Store one transition, evicting the oldest once the buffer is full."""
        transition = (state, action, reward, next_state, done)
        if len(self.buffer) < self.buffer_size:
            self.buffer.append(transition)
        else:
            self.buffer[self.position] = transition
        self.position = (self.position + 1) % self.buffer_size

    def sample(self, batch_size):
        """Return ``batch_size`` transitions drawn uniformly (with replacement)."""
        indices = np.random.choice(len(self.buffer), size=batch_size)
        return [self.buffer[i] for i in indices]

    def __len__(self):
        # Number of transitions currently stored (<= buffer_size).
        return len(self.buffer)
class DDPG:
    """Deep Deterministic Policy Gradient agent (actor-critic with target nets)."""

    def __init__(self, state_dim, action_dim, max_action):
        # Bug fix: the pasted source declared ``def init`` (not ``__init__``),
        # so the constructor was never invoked by ``DDPG(...)``.
        self.actor = self.create_actor(state_dim, action_dim, max_action)
        self.actor_target = self.create_actor(state_dim, action_dim, max_action)
        self.actor_target.set_weights(self.actor.get_weights())

        self.critic = self.create_critic(state_dim, action_dim)
        self.critic_target = self.create_critic(state_dim, action_dim)
        self.critic_target.set_weights(self.critic.get_weights())

        self.actor_optimizer = tf.keras.optimizers.Adam(learning_rate=ACTOR_LR)
        self.critic_optimizer = tf.keras.optimizers.Adam(learning_rate=CRITIC_LR)

    def create_actor(self, state_dim, action_dim, max_action):
        """Policy network: state -> action scaled to [-max_action, max_action]."""
        inputs = layers.Input(shape=(state_dim,))
        x = layers.Dense(400, activation='relu')(inputs)
        x = layers.Dense(300, activation='relu')(x)
        x = layers.Dense(action_dim, activation='tanh')(x)
        outputs = max_action * x  # tanh output in [-1, 1] scaled to action bounds
        return tf.keras.Model(inputs=inputs, outputs=outputs)

    def create_critic(self, state_dim, action_dim):
        """Q network: (state, action) -> scalar value estimate."""
        state_inputs = layers.Input(shape=(state_dim,))
        action_inputs = layers.Input(shape=(action_dim,))
        x = layers.Concatenate()([state_inputs, action_inputs])
        x = layers.Dense(400, activation='relu')(x)
        x = layers.Dense(300, activation='relu')(x)
        outputs = layers.Dense(1)(x)
        return tf.keras.Model(inputs=[state_inputs, action_inputs], outputs=outputs)

    def train(self, replay_buffer):
        """One gradient step on the critic and actor from a sampled minibatch."""
        sample = replay_buffer.sample(BATCH_SIZE)
        state, action, reward, next_state, done = list(map(np.array, zip(*sample)))
        # Bug fix: reward/done arrive with shape (batch,) while the critic
        # outputs (batch, 1); without this reshape ``q_values - target_values``
        # broadcasts to (batch, batch) and silently corrupts the critic loss.
        reward = reward.reshape(-1, 1).astype(np.float32)
        done = done.reshape(-1, 1).astype(np.float32)

        with tf.GradientTape() as tape:
            target_actions = self.actor_target(next_state)
            target_q_values = self.critic_target([next_state, target_actions])
            # Bellman target. No stop_gradient needed: only the online critic's
            # variables are differentiated below, and the target nets are separate.
            target_values = reward + GAMMA * target_q_values * (1 - done)
            q_values = self.critic([state, action])
            critic_loss = tf.reduce_mean((q_values - target_values) ** 2)
        critic_grads = tape.gradient(critic_loss, self.critic.trainable_variables)
        self.critic_optimizer.apply_gradients(zip(critic_grads, self.critic.trainable_variables))

        with tf.GradientTape() as tape:
            actions = self.actor(state)
            # Maximize Q(s, pi(s)) by minimizing its negation.
            actor_loss = -tf.reduce_mean(self.critic([state, actions]))
        actor_grads = tape.gradient(actor_loss, self.actor.trainable_variables)
        self.actor_optimizer.apply_gradients(zip(actor_grads, self.actor.trainable_variables))

        # Polyak-average the target networks toward the online networks.
        self.update_target_networks()

    def update_target_networks(self):
        """Soft update: target <- TAU * online + (1 - TAU) * target."""
        actor_weights = self.actor.get_weights()
        actor_target_weights = self.actor_target.get_weights()
        critic_weights = self.critic.get_weights()
        critic_target_weights = self.critic_target.get_weights()
        for i in range(len(actor_weights)):
            actor_target_weights[i] = TAU * actor_weights[i] + (1 - TAU) * actor_target_weights[i]
        for i in range(len(critic_weights)):
            critic_target_weights[i] = TAU * critic_weights[i] + (1 - TAU) * critic_target_weights[i]
        self.actor_target.set_weights(actor_target_weights)
        self.critic_target.set_weights(critic_target_weights)

    def select_action(self, state):
        """Deterministic action for a single (unbatched) state."""
        state = np.expand_dims(state, axis=0)  # add batch dimension
        return self.actor(state).numpy().flatten()


# Bug fix: the original paste referenced env / state_dim / action_dim /
# max_action without ever defining them, so the script raised NameError.
# NOTE(review): Pendulum-v1 is the standard DDPG benchmark and matches the
# continuous-action setup above — confirm this is the intended environment.
env = gym.make("Pendulum-v1")
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.shape[0]
max_action = float(env.action_space.high[0])

agent = DDPG(state_dim, action_dim, max_action)
replay_buffer = ReplayBuffer(BUFFER_SIZE)
# Main training loop: 100 episodes, at most 200 environment steps each.
episode_rewards = []

for episode in range(1, 101):
    state = env.reset()
    episode_reward = 0
    for t in range(1, 201):
        action = agent.select_action(state)
        # Old gym API: step returns (obs, reward, done, info).
        next_state, reward, done, _ = env.step(action)
        replay_buffer.add(state, action, reward, next_state, done)
        # Train only once enough transitions exist to fill one minibatch.
        if len(replay_buffer) >= BATCH_SIZE:
            agent.train(replay_buffer)
        state = next_state
        episode_reward += reward
        if done:
            break
    # Bug fix: episode_rewards was created but never appended to, so the
    # reward history was silently discarded. (The stray ``}`` that closed the
    # chat-transcript wrapper has also been removed — it was not Python.)
    episode_rewards.append(episode_reward)