-
Notifications
You must be signed in to change notification settings - Fork 22
/
random_process.py
86 lines (67 loc) · 2.57 KB
/
random_process.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
import numpy as np
class OrnsteinUhlenbeckProcess:
    """
    Ornstein-Uhlenbeck process: mean-reverting noise whose successive
    samples are correlated, because each one is built from the previous state.

    Based on http://math.stackexchange.com/questions/1287634/implementing-ornstein-uhlenbeck-in-matlab

    Every call to ``sample`` applies
    ``X <- X + theta * (mu - X) + sigma * N(0, 1)``.
    """

    def __init__(self, action_dim, mu=0, theta=0.15, sigma=0.2):
        """
        action_dim: int or tuple of ints, shape of the noise array
        mu: value the state is pulled toward; also the initial state
        theta: fraction of the gap to mu that is closed on each step
        sigma: scale of the standard-normal perturbation added on each step
        """
        self.action_dim = action_dim
        self.mu = mu
        self.theta = theta
        self.sigma = sigma
        # Single source of truth for the initial state.
        self.reset()

    def reset(self):
        """Put the state back at mu."""
        self.X = np.ones(self.action_dim) * self.mu

    def sample(self):
        """
        Advance the process by one step and return the new state.

        The returned array is a copy, so in-place edits by the caller
        do not leak into the state used for the next step.
        """
        drift = self.theta * (self.mu - self.X)
        # Draw with the full shape of the state rather than len(X), so a
        # tuple action_dim (multi-dimensional state) works. For a 1-D state
        # this is the same draw as np.random.randn(len(self.X)).
        diffusion = self.sigma * np.random.standard_normal(self.X.shape)
        self.X = self.X + (drift + diffusion)
        return self.X.copy()
class GaussianNoise:
    """
    Simple Gaussian noise

    Every sample is an independent zero-mean normal draw; no state is
    carried from one call to the next.
    """

    def __init__(self, action_dim, sigma=0.2):
        """
        action_dim: int or tuple of ints, shape of each sample
        sigma: standard deviation of the noise
        """
        self.action_dim = action_dim
        self.sigma = sigma

    def reset(self):
        """
        Do nothing; there is no state to clear.

        Present so this class offers the same reset()/sample() pair as
        OrnsteinUhlenbeckProcess and can be swapped in for it.
        """

    def sample(self):
        """Return a fresh array of shape action_dim drawn from N(0, sigma^2)."""
        return np.random.normal(scale=self.sigma, size=self.action_dim)
class AdaptiveParamNoiseSpec(object):
    """
    Adaptive parameter-noise scale (OpenAI).

    Source: OpenAI Baselines,
    https://github.com/openai/baselines/blob/master/baselines/ddpg/noise.py
    """

    def __init__(self, initial_stddev=0.1, desired_action_stddev=0.2, adaptation_coefficient=1.01):
        """
        initial_stddev / current_stddev are standard deviations of the
        parameter noise; desired_action_stddev is the target standard
        deviation measured in action space.
        """
        self.initial_stddev = initial_stddev
        self.current_stddev = initial_stddev
        self.desired_action_stddev = desired_action_stddev
        self.adaptation_coefficient = adaptation_coefficient

    def adapt(self, distance):
        """Scale current_stddev down if distance exceeds the target, up otherwise."""
        overshoot = distance > self.desired_action_stddev
        if not overshoot:
            # At or below the target: widen the noise.
            self.current_stddev *= self.adaptation_coefficient
            return
        # Above the target: narrow the noise.
        self.current_stddev /= self.adaptation_coefficient

    def get_stats(self):
        """Return the current parameter-noise stddev in a one-entry dict."""
        return {'param_noise_stddev': self.current_stddev}

    def __repr__(self):
        template = 'AdaptiveParamNoiseSpec(initial_stddev={}, desired_action_stddev={}, adaptation_coefficient={})'
        fields = (
            self.initial_stddev,
            self.desired_action_stddev,
            self.adaptation_coefficient,
        )
        return template.format(*fields)
def ddpg_distance_metric(actions1, actions2):
    """
    Compute "distance" between actions taken by two policies at the same states

    The result is the root of the mean squared element-wise difference:
    squared differences are averaged over axis 0 first, then over whatever
    remains.

    actions1, actions2: numpy arrays, or anything numpy can convert to one
        (e.g. nested lists), with shapes that broadcast against each other
    Returns: a scalar distance
    """
    # Coerce up front so plain Python sequences work too; ndarray inputs
    # (and ndarray subclasses) pass through unchanged.
    diff = np.asanyarray(actions1) - np.asanyarray(actions2)
    mean_diff = np.mean(np.square(diff), axis=0)
    dist = np.sqrt(np.mean(mean_diff))
    return dist