Program for playing Space Invaders, from 20.12.18:

# -*- coding: utf-8 -*-
"""
Created on Thu Dec 13 16:09:46 2018

@author: Johanna
"""
#!/usr/bin/env python
from __future__ import print_function
import sys, gym, time
#
# Test yourself as a learning agent! Pass environment name as a command-line argument, for example:
#
# python keyboard_agent.py SpaceInvadersNoFrameskip-v4
#
env = gym.make('SpaceInvaders-v0' if len(sys.argv)<2 else sys.argv[1])
if not hasattr(env.action_space, 'n'):
    raise Exception('Keyboard agent only supports discrete action spaces')
ACTIONS = env.action_space.n
SKIP_CONTROL = 0    # Use previous control decision SKIP_CONTROL times, that's how you
                    # can test what skip is still usable.
human_agent_action = 0
human_wants_restart = False
human_sets_pause = False
def key_press(key, mod):
    # Enter (0xff0d) requests a restart, Space (32) toggles pause,
    # digit keys 1..ACTIONS-1 select the corresponding discrete action.
    global human_agent_action, human_wants_restart, human_sets_pause
    if key == 0xff0d: human_wants_restart = True
    if key == 32: human_sets_pause = not human_sets_pause
    a = int(key - ord('0'))
    if a <= 0 or a >= ACTIONS: return
    human_agent_action = a
def key_release(key, mod):
    # Releasing the digit key falls back to action 0 (do nothing).
    global human_agent_action
    a = int(key - ord('0'))
    if a <= 0 or a >= ACTIONS: return
    if human_agent_action == a:
        human_agent_action = 0
env.render()
env.unwrapped.viewer.window.on_key_press = key_press
env.unwrapped.viewer.window.on_key_release = key_release
def rollout(env):
    global human_agent_action, human_wants_restart, human_sets_pause
    human_wants_restart = False
    obser = env.reset()
    skip = 0
    total_reward = 0
    total_timesteps = 0
    while 1:
        if not skip:
            # print("taking action {}".format(human_agent_action))
            a = human_agent_action
            total_timesteps += 1
            skip = SKIP_CONTROL
        else:
            skip -= 1

        obser, r, done, info = env.step(a)
        if r != 0:
            print("reward %0.3f" % r)
        total_reward += r
        window_still_open = env.render()
        if window_still_open == False: return False
        if done: break
        if human_wants_restart: break
        while human_sets_pause:
            env.render()
            time.sleep(0.1)
        time.sleep(0.1)
    print("timesteps %i reward %0.2f" % (total_timesteps, total_reward))
print("ACTIONS={}".format(ACTIONS))
print("Press keys 1 2 3 ... to take actions 1 2 3 ...")
print("No keys pressed is taking action 0")
while 1:
    window_still_open = rollout(env)
    if window_still_open == False: break
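To see which game actions the digit keys map to, the Atari environment's action meanings can be queried. A minimal, standalone sketch (the list shown in the comment is what SpaceInvaders-v0 typically reports, not captured from this run):

import gym

env = gym.make('SpaceInvaders-v0')
# Atari environments expose the meaning of each discrete action index;
# keys 1, 2, 3, ... in the keyboard agent select the actions with those indices.
print(env.unwrapped.get_action_meanings())
# e.g. ['NOOP', 'FIRE', 'RIGHT', 'LEFT', 'RIGHTFIRE', 'LEFTFIRE']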
Program for CartPole:
import random
import gym
import numpy as np
from collections import deque
from keras.models import Sequential
from keras.layers import Dense
from keras.optimizers import Adam
class DQNAgent:
    def __init__(self, state_size, action_size):
        self.state_size = state_size
        self.action_size = action_size
        self.memory = deque(maxlen=2000)
        self.gamma = 0.95    # discount rate
        self.epsilon = 1.0   # exploration rate
        self.epsilon_min = 0.01
        self.epsilon_decay = 0.995
        self.learning_rate = 0.001
        self.model = self._build_model()

    def _build_model(self):
        # Simple NN that maps a state to one Q-value per action
        model = Sequential()
        model.add(Dense(24, input_dim=self.state_size, activation='relu'))
        model.add(Dense(24, activation='relu'))
        model.add(Dense(self.action_size, activation='linear'))
        model.compile(loss='mse', optimizer=Adam(lr=self.learning_rate))
        return model

    def remember(self, state, action, reward, next_state, done):
        # stores all transitions seen so far (replay memory)
        self.memory.append((state, action, reward, next_state, done))

    def act(self, state):
        # epsilon-greedy: random exploratory action or greedy policy action
        if np.random.rand() <= self.epsilon:
            return random.randrange(self.action_size)
        act_values = self.model.predict(state)
        return np.argmax(act_values[0])  # returns action

    def replay(self, batch_size):
        # builds the vector of target Q-values as
        # reward at time t + gamma * max(possible rewards at time t+1)
        minibatch = random.sample(self.memory, batch_size)
        states, targets_f = [], []
        for state, action, reward, next_state, done in minibatch:
            target = reward
            if not done:
                target = (reward + self.gamma *
                          np.amax(self.model.predict(next_state)[0]))
            target_f = self.model.predict(state)
            target_f[0][action] = target
            # Filtering out states and targets for training
            states.append(state[0])
            targets_f.append(target_f[0])
        history = self.model.fit(np.array(states), np.array(targets_f), epochs=1, verbose=0)
        # Keeping track of loss
        loss = history.history['loss'][0]
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay
        return loss

    def load(self, name):
        self.model.load_weights(name)

    def save(self, name):
        self.model.save_weights(name)
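For reference, the value that replay() writes into the predicted Q-vector at the chosen action is the usual Q-learning target. A minimal sketch for a single transition (the helper q_target is only for illustration and is not part of the agent):

import numpy as np

def q_target(reward, next_q_values, done, gamma=0.95):
    # Q-learning target for one transition (s, a, r, s'):
    # just the reward if the episode ended, otherwise reward + gamma * max_a' Q(s', a')
    if done:
        return reward
    return reward + gamma * np.max(next_q_values)

# Example: reward 1.0, predicted Q-values of the next state [0.2, 0.7]
print(q_target(1.0, np.array([0.2, 0.7]), done=False))  # 1.0 + 0.95 * 0.7 = 1.665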
# Learning:
# play EPISODES times, each episode until the pole falls over
import gym
EPISODES = 1000
env = gym.make('CartPole-v1')
state_size = env.observation_space.shape[0]
action_size = env.action_space.n
agent = DQNAgent(state_size, action_size)
done = False
batch_size = 32
for e in range(EPISODES):
    state = env.reset()
    state = np.reshape(state, [1, state_size])
    for time in range(500):
        env.render()
        action = agent.act(state)
        next_state, reward, done, _ = env.step(action)
        reward = reward - abs(next_state[0]) if not done else -10
        print(next_state[0], reward)
        next_state = np.reshape(next_state, [1, state_size])
        agent.remember(state, action, reward, next_state, done)
        state = next_state
        if done:
            print("episode: {}/{}, score: {}, e: {:.2}"
                  .format(e, EPISODES, time, agent.epsilon))
            break
        if len(agent.memory) > batch_size:
            loss = agent.replay(batch_size)
            # Logging training loss every 10 timesteps
            if time % 10 == 0:
                print("episode: {}/{}, time: {}, loss: {:.4f}".format(e, EPISODES, time, loss))
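The class also defines save() and load(), which are not used above. A possible way to keep the learned weights between the two training passes (the file name is only an example, not from the original program):

# Optional sketch: persist / restore the network weights (file name is an assumption)
agent.save("cartpole-dqn.h5")
# ... later, e.g. before the second training pass or for pure evaluation:
agent.load("cartpole-dqn.h5")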
# And learn once more (a second pass with the same agent)
for e in range(EPISODES):
    state = env.reset()
    state = np.reshape(state, [1, state_size])
    for time in range(500):
        env.render()
        action = agent.act(state)
        next_state, reward, done, _ = env.step(action)
        reward = reward - abs(next_state[0]) if not done else -10
        print(next_state[0], reward)
        next_state = np.reshape(next_state, [1, state_size])
        agent.remember(state, action, reward, next_state, done)
        state = next_state
        if done:
            print("episode: {}/{}, score: {}, e: {:.2}"
                  .format(e, EPISODES, time, agent.epsilon))
            break
        if len(agent.memory) > batch_size:
            loss = agent.replay(batch_size)
            # Logging training loss every 10 timesteps
            if time % 10 == 0:
                print("episode: {}/{}, time: {}, loss: {:.4f}".format(e, EPISODES, time, loss))
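After training, one way to watch the agent without further exploration or learning is a purely greedy episode. A minimal sketch that reuses the objects defined above and temporarily disables exploration:

# Sketch: one greedy evaluation episode (no exploration, no training)
agent.epsilon = 0.0          # act() then always returns argmax Q(state, a)
state = np.reshape(env.reset(), [1, state_size])
score = 0
for t in range(500):
    env.render()
    action = agent.act(state)
    next_state, reward, done, _ = env.step(action)
    state = np.reshape(next_state, [1, state_size])
    score += reward
    if done:
        break
print("evaluation score:", score)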
dir(env)
env.action_space
env.observation_space
state = env.reset()
state
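For orientation, what these inspection lines typically show for CartPole-v1 (taken from the gym documentation, not captured from this session):

# Expected for CartPole-v1 (from the gym documentation, not from this run):
#   env.action_space       -> Discrete(2)  (0 = push cart to the left, 1 = push to the right)
#   env.observation_space  -> Box(4,)      (cart position, cart velocity, pole angle, pole velocity)
#   env.reset()            -> a length-4 numpy array with the initial observation
print(env.action_space)
print(env.observation_space)
print(env.reset())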