Newer
Older
"""
These integrator classes started their life in PDSim (https://pdsim.readthedocs.io/en/latest/) and since have
been updated and some additional functionality has been added
"""
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
from __future__ import division, print_function
import abc, math
import numpy as np
import matplotlib.pyplot as plt
class AbstractODEIntegrator(object):
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def get_initial_array(self):
pass
@abc.abstractmethod
def pre_step_callback(self):
pass
@abc.abstractmethod
def post_deriv_callback(self):
pass
@abc.abstractmethod
def post_step_callback(self):
pass
@abc.abstractmethod
def derivs(self):
pass
@abc.abstractmethod
def do_integration(self):
pass
class AbstractSimpleEulerODEIntegrator(AbstractODEIntegrator):
__metaclass__ = abc.ABCMeta
def do_integration(self,N,tmin,tmax):
"""
The simple Euler ODE integrator
Parameters
----------
N : integer
Number of steps taken (not including the initial step).
tmin : float
Starting value of the independent variable. ``t`` is in the closed range [``tmin``, ``tmax``]
tmax : float
Ending value for the independent variable. ``t`` is in the closed range [``tmin``, ``tmax``]
Returns
-------
abort_flag
If an abort has been requested (by returning a value other than ``False`` from ``premature_termination``), return value from ``premature_termination``. ``None`` otherwise
"""
# Step variables
self.t0 = tmin
self.h = (tmax-tmin)/(N)
# Get the initial value
self.xold = self.get_initial_array()
for self.Itheta in range(N):
# Check for termination
abort = self.premature_termination()
if abort != False: return abort
# Call the pre-step callback
self.pre_step_callback()
# Derivatives evaluated at old values of t = t0
self.f1 = self.derivs(self.t0, self.xold)
# Call post derivative callback after the first derivative evaluation (which might cache values)
self.post_deriv_callback()
# Calculate the new values
self.xnew = self.xold + self.h*self.f1
# Everything from this step is finished, now update for the next
# step coming
self.t0 += self.h
self.xold = self.xnew[:]
# Call the post-step callback
self.post_step_callback()
# Bump up once more
self.Itheta += 1
# Make sure we end up at the right place
assert((self.t0 - tmax) < 1e-10)
# No termination was requested
return False
class AbstractHeunODEIntegrator(AbstractODEIntegrator):
__metaclass__ = abc.ABCMeta
def do_integration(self, N, tmin, tmax):
"""
The Heun system of ODE integrator
Parameters
----------
N : integer
Number of steps taken. There will be N+1 entries in the state matrices
tmin : float
Starting value of the independent variable. ``t`` is in the closed range [``tmin``, ``tmax``]
tmax : float
Ending value for the independent variable. ``t`` is in the closed range [``tmin``, ``tmax``]
Returns
-------
abort_flag
If an abort has been requested (by returning a value other than ``False`` from ``premature_termination``), return value from ``premature_termination``. ``None`` otherwise
"""
# Step variables
self.t0 = tmin
self.h = (tmax-tmin)/(N)
# Get the initial value
self.xold = self.get_initial_array()
for self.Itheta in range(N):
# Check for termination
abort = self.premature_termination()
if abort != False: return abort
# Call the pre-step callback
self.pre_step_callback()
# Step 1: derivatives evaluated at old values
self.f1 = self.derivs(self.t0, self.xold)
# Call post derivative callback after the first derivative evaluation (which might cache values)
self.post_deriv_callback()
# Predicted values based on extrapolation using initial derivatives
self.xtemp = self.xold + self.h*self.f1
# Step 2: Evaluated at predicted step
self.f2 = self.derivs(self.t0 + self.h, self.xtemp)
# Get corrected values
self.xnew = self.xold + self.h/2.0*(self.f1 + self.f2)
# Everything from this step is finished, now update for the next
# step coming
self.t0 += self.h
self.xold = self.xnew
# Call the post-step callback
self.post_step_callback()
# Bump up once more
self.Itheta += 1
# No termination was requested
return False
class AbstractRK45ODEIntegrator(AbstractODEIntegrator):
__metaclass__ = abc.ABCMeta
def do_integration(self,
tmin=0,
tmax=2.0*math.pi,
hmin=1e-4,
atol=1e-3,
rtol=1e-10,
step_relax=0.9,
**kwargs):
"""
This function implements an adaptive Runge-Kutta-Feldberg 4th/5th order
solver for the system of equations
Parameters
----------
hmin : float
Minimum step size, something like 1e-5 usually is good. Don't make this too big or you may not be able to get a stable solution
tmin : float
Starting value of the independent variable. ``t`` is in the closed range [``tmin``, ``tmax``]
tmax : float
Ending value for the independent variable. ``t`` is in the closed range [``tmin``, ``tmax``]
eps_allowed : float
Maximum absolute error of any CV per step allowed. Don't make this parameter too big or you may not be able to get a stable solution. Also don't make it too small because then you are going to run into truncation error.
step_relax : float, optional
The relaxation factor that is used in the step resizing algorithm. Should be less than 1.0; you can play with this parameter to improve the adaptive resizing, but should not be necessary.
Returns
-------
abort_flag
If an abort has been requested (by returning a value other than ``False`` from ``premature_termination``), return value from ``premature_termination``. ``None`` otherwise
Notes
-----
Mathematically the adaptive solver can be expressed as::
k1=h*dy(xn ,t)
k2=h*dy(xn+1.0/4.0*k1 ,t+1.0/4.0*h)
k3=h*dy(xn+3.0/32.0*k1+9.0/32.0*k2 ,t+3.0/8.0*h)
k4=h*dy(xn+1932.0/2197.0*k1-7200.0/2197.0*k2+7296.0/2197.0*k3 ,t+12.0/13.0*h)
k5=h*dy(xn+439.0/216.0*k1-8.0*k2+3680.0/513.0*k3-845.0/4104.0*k4 ,t+h)
k6=h*dy(xn-8.0/27.0*k1+2.0*k2-3544.0/2565.0*k3+1859.0/4104.0*k4-11.0/40.0*k5 ,t+1.0/2.0*h)
where the function dy(y,t) returns a vector of the ODE expressions.
The new value is calculated from::
xnplus=xn+gamma1*k1+gamma2*k2+gamma3*k3+gamma4*k4+gamma5*k5+gamma6*k6
In the adaptive solver, the errors for a given step can be calculated from::
error=1.0/360.0*k1-128.0/4275.0*k3-2197.0/75240.0*k4+1.0/50.0*k5+2.0/55.0*k6
If the maximum absolute error is above allowed error, the step size is decreased and the step is
tried again until the error is below tolerance. If the error is better than required, the step
size is increased to minimize the number of steps required.
Before the step is run, a callback the ``step_callback`` method of this class is called. In the ``step_callback`` callback function you can do anything you want, but you must return
"""
# Get the starting array of variables
self.xold = self.get_initial_array()
# Start at an index of 0
self.Itheta = 0
self.t0 = tmin
self.tmax = tmax
self.h = hmin
self.minstepcount = 0
# gamma1=16.0/135.0
# gamma2=0.0
# gamma3=6656.0/12825.0
# gamma4=28561.0/56430.0
# gamma5=-9.0/50.0
# gamma6=2.0/55.0
#t is the independent variable here, where t takes on values in the bounded range [tmin,tmax]
while (self.t0 < self.tmax - 1e-10):
# Check for termination
abort = self.premature_termination()
if abort != False: return abort
self.stepAccepted = False
while not self.stepAccepted:
# reset the flag
self.disableAdaptive = False
if self.t0 + self.h > self.tmax:
self.disableAdaptive = True
self.h = self.tmax - self.t0
#
self.pre_step_callback()
# We check stepAccepted again because if the derived class
# sets the variable stepAccepted, we should not actually do the evaluation
if not self.stepAccepted:
if self.h < hmin and not self.disableAdaptive:
# Step is too small, just use the minimum step size
self.h = 1.0*hmin
self.disableAdaptive = True
if self.h == hmin:
self.minstepcount += 1
else:
self.minstepcount = 0
# Step 1: derivatives evaluated at old values
self.f1 = self.derivs(self.t0, self.xold)
# Call post derivative callback after the first derivative evaluation (which might cache values)
self.post_deriv_callback()
self.xnew1 = self.xold+self.h*(1.0/5.0)*self.f1
self.f2 = self.derivs(self.t0+1.0/5.0*self.h, self.xnew1)
self.xnew2 = self.xold+self.h*(+3.0/40.0*self.f1+9.0/40.0*self.f2)
self.f3 = self.derivs(self.t0+3.0/10.0*self.h, self.xnew2)
self.xnew3 = self.xold+self.h*(3.0/10.0*self.f1-9.0/10.0*self.f2+6.0/5.0*self.f3)
self.f4 = self.derivs(self.t0+3.0/5.0*self.h, self.xnew3)
self.xnew4 = self.xold+self.h*(-11.0/54.0*self.f1+5.0/2.0*self.f2-70/27.0*self.f3+35.0/27.0*self.f4)
self.f5 = self.derivs(self.t0+self.h, self.xnew4)
self.xnew5 = self.xold+self.h*(1631.0/55296*self.f1+175.0/512.0*self.f2+575.0/13824.0*self.f3+44275.0/110592.0*self.f4+253.0/4096.0*self.f5)
self.f6 = self.derivs(self.t0+7/8*self.h, self.xnew5)
# Updated values at the next step using 5-th order
self.xnew = self.xold + self.h*(37/378*self.f1 + 250/621*self.f3 + 125/594*self.f4 + 512/1771*self.f6)
# Estimation of error
error = abs(self.h*(-277/64512*self.f1+6925/370944*self.f3-6925/202752*self.f4-277.0/14336*self.f5+277/7084*self.f6))
error_threshold = atol + rtol*abs(self.xnew)
# max_error = np.sqrt(np.sum(np.power(error, 2)))
# rel_error = error/self.xnew
# rel_error[self.xnew == 0] = error[self.xnew == 0]
# max_error = np.max(np.abs(rel_error))
# print(max_error, error, self.xnew)
# print('error @h=',self.h, abs(error), abs(error_threshold))
# If the error is too large, make the step size smaller and try
# the step again
if (any(error > error_threshold)):
if not self.disableAdaptive:
# Take a smaller step next time, try again on this step
# But only if adaptive mode is on
downsize_factor = np.min(error_threshold/error)**(0.3)
# print('downsize', downsize_factor, error, error_threshold)
self.h *= step_relax*downsize_factor
self.stepAccepted=False
else:
# Accept the step regardless of whether the error
# is too large or not
self.stepAccepted = True
else:
self.stepAccepted = True
else:
pass
# print('accepted')
self.t0 += self.h
self.Itheta += 1
self.xold = self.xnew
self.post_step_callback()
# The error is already below the threshold
if (all(abs(error) < error_threshold) and self.disableAdaptive == False and np.max(error) > 0):
# Take a bigger step next time, since eps_allowed>max_error
upsize_factor = step_relax*np.max(error_threshold/error)**(0.2)
# print('upsizing', upsize_factor, (error_threshold/error)**(0.2), self.h, self.h*upsize_factor)
self.h *= upsize_factor
if not (self.t0 - tmax) < 1e-3:
raise AssertionError('(self.t0 - tmax) [' + str(self.t0 - tmax) + '] > 1e-3')
# No termination was requested
return False
if __name__ == '__main__':
class TestIntegrator(object):
"""
Implements the functions needed to satisfy the ABC requirements
This is the problem from wikipedia, where y' = y, with the explicit solution y = exp(t)
"""
def __init__(self):
self.x, self.y = [], []
def post_deriv_callback(self): pass
def premature_termination(self): return False
def get_initial_array(self):
return np.array([1.0])
def pre_step_callback(self):
if self.Itheta == 0:
self.x.append(self.t0)
self.y.append(self.xold[0])
def post_step_callback(self):
self.x.append(self.t0)
self.y.append(self.xold[0])
def derivs(self, t0, xold):
return np.array([xold[0]])
class TestEulerIntegrator(TestIntegrator, AbstractSimpleEulerODEIntegrator):
""" Mixin class using the functions defined in TestIntegrator """
pass
class TestHeunIntegrator(TestIntegrator, AbstractHeunODEIntegrator):
""" Mixin class using the functions defined in TestIntegrator """
pass
class TestRK45Integrator(TestIntegrator, AbstractRK45ODEIntegrator):
""" Mixin class using the functions defined in TestIntegrator """
pass
for N in [4, 11, 21]:
TEI = TestEulerIntegrator()
TEI.do_integration(N, 0.0, 4.0)
plt.plot(TEI.x, TEI.y, 'o-', label = 'Euler: ' + str(N))
for N in [4, 11]:
THI = TestHeunIntegrator()
THI.do_integration(N, 0.0, 4.0)
plt.plot(THI.x, THI.y, '^-', label = 'Heun: ' + str(N))
TRKI = TestRK45Integrator()
TRKI.do_integration(0.0, 4.0, rtol=1e-10)
plt.plot(TRKI.x, TRKI.y, '^-', label = 'RK45')
t = np.linspace(0, 4, 500)
plt.plot(t, np.exp(t), 'k', lw = 1)
plt.legend(loc='best')
plt.show()