""" grow_tree.py is the main module of the code and the one you invoke from
the command line to start a simulation.
To start a simulation with parameters read from a file simulation.ini
simply invoke this module as::
python grow_tree.py simulation.ini
The output will be written in a file name simulation.h5 with a
`HDF5 <http://www.hdfgroup.org/HDF5//>`_ format.
"""
# NOTES:
#
# In this module and in all the rest, the geometrical dimension always
# corresponds to the last axis of an array. For example to save the locations
# of k points we use an array with shape [k, 3].
#
# All-caps variable names are in general global variables that are read
# from the input file. We use globals().update(...) so they are avaliable
# everywhere.
#
import sys
import time
from numpy import *
from numpy.random import rand, randn, seed
from numpy.linalg import norm
from scipy.integrate import odeint, ode
from readinput import load_input
import tree
from refinement import Box, containing_box
import mpolar
from datafile import DataFile
import os.path
from angles import branching_angles
import parameters as param_descriptors
import electrodes
from contexttimer import ContextTimer
latest_phi = None
EXTERNAL_FIELD_VECTOR = None
ELECTRODE = None
X, Y, Z = 0, 1, 2
class TooLongTimestep(Exception):
pass
[docs]def main():
""" This is the main function of the code it is the starting point of
a simulation. """
# Load input parameters from the input file and add the, in allcaps
# to the global namespace.
global EXTERNAL_FIELD_VECTOR, ELECTRODE
parameters = load_input(sys.argv[1], param_descriptors)
globals().update(dict((key.upper(), item)
for key, item in parameters.iteritems()))
if RANDOM_SEED >= 0:
seed(RANDOM_SEED)
EXTERNAL_FIELD_VECTOR = array([0.0, 0.0, EXTERNAL_FIELD])
ELECTRODE = init_electrode()
# init a tree from scratch
tr, r0, q0 = init_from_scratch(INITIAL_NODES)
dt = TIME_STEP
t = r_[0:END_TIME:dt]
r, q = r0, q0
dfile = DataFile(OUT_FILE, parameters=parameters)
branched = False
for i, it in enumerate(t):
# with ContextTimer("plotting"):
# plot_projections(r, q)
# pylab.savefig('tree_%.3d.png' % i)
# print 't = %g\ttree_%.3d.png' % (it, i)
print "%d/%d t = %g" % (i, len(t), it)
branch_prob = BRANCHING_PROBABILITY
if SINGLE_BRANCHING_TIME > 0:
if it > SINGLE_BRANCHING_TIME:
if not branched:
branch_prob = inf
branched = True
if SINGLE_BRANCHING_Z != 0 and not branched:
zterm = r[tr.terminals()[0], Z]
if zterm < SINGLE_BRANCHING_Z:
if not branched:
branch_prob = inf
branched = True
r, q = adapt_step(tr, r, q, dt, p=branch_prob)
with ContextTimer("saving %d" % i):
phi = solve_phi(r, q)
dfile.add_step(it, tr, r, q, phi,
error=error, error_dq=error_dq)
if END_WITH_RECONNECTION and tr.reconnects(r):
print "Finishing due to a reconnection."
break
[docs]def init_from_scratch(n=0):
""" Init a 'tree' with the root node plus n additional nodes in a vertical
string. """
tr = tree.Tree()
root = tr.make_root()
for i in xrange(n):
tr.extend([i,])
r0 = tr.zeros(dim=3)
k = r0.shape[0]
r0[:, Z] = -arange(k) * CONDUCTOR_THICKNESS
q0 = zeros((k, ))
return tr, r0, q0
[docs]def adapt_step(tr, r0, q0, dt, p=0.0):
""" Performs a step of duration dt but divides it into sub steps
to make sure that the length of a channel is never longer than ``MAX_STEP``.
"""
current_dt = dt
r, q = r0, q0
remaining_steps = 1
while remaining_steps > 0:
try:
r, q = step(tr, r, q, current_dt, p=p)
remaining_steps -= 1
except TooLongTimestep:
current_dt /= 2.
remaining_steps *= 2
return r, q
[docs]def step(tr, r, q0, dt, p=0.0):
""" Performs an elementary step, including relaxation and advancing
the channels.
Arguments:
* *tr*: the :class:`tree.Tree` instance containing the tree structure.
* *r*: an array containing the node locations.
* *q0*: an array containing the charges of the nodes.
* *dt*: the time step.
"""
iterm = tr.terminals()
box = containing_box(r, electrode=ELECTRODE)
box.set_charges(r, q0,
max_charges=MAX_CHARGES_PER_BOX,
min_length=16 * CONDUCTOR_THICKNESS)
box.build_lists(recurse=True)
box.set_field_evaluation(r[iterm, :])
# 1. Calculate the velocities at t
v0 = velocities(box, tr, r, q0)
# 2. Relax the tree from t to t + dt
q1 = relax(box, tr, r, q0, dt)
# 3. Calculate the velocities again at t + dt
v1 = velocities(box, tr, r, q1)
# 4. Extend the tree with the leap-frog algo.
v = 0.5 * (v0 + v1)
# 5. Branch some of the tips
vabs = sqrt(sum(v**2, axis=1))
# If the longest step is longer than MAX_STEP, raise an exception
# telling the calling function to reduce dt.
if (max(vabs) * dt) > MAX_STEP:
raise TooLongTimestep
does_branch = rand(*iterm.shape) < (p * vabs * dt)
radv = empty((sum(does_branch) + sum(vabs > 0), 3))
j = 0
for i, branches in enumerate(does_branch):
if not branches:
if vabs[i] > 0:
radv[j, :] = r[iterm[i], :] + dt * v[i, :]
j += 1
else:
# Note that slow channels, although unlikely, may branch.
# However, not if their velocity is 0
dr1, dr2 = symmetric_gaussian(dt * v[i, :], BRANCHING_SIGMA)
radv[j, :] = r[iterm[i], :] + dr1
radv[j + 1, :] = r[iterm[i], :] + dr2
j += 2
rnew = concatenate((r, radv), axis=0)
qnew = concatenate((q1, zeros((sum(does_branch)
+ sum(vabs > 0),))), axis=0)
tr.extend(sort(r_[iterm[vabs > 0],
iterm[does_branch]]))
return rnew, qnew
[docs]def velocities(box, tr, r, q):
""" Calculates the electric fields at the tips of the tree and from
them obtains the propagation velocities of the *streamers* """
iterm = tr.terminals()
# When we have a single charge the velocity is simply given by the
# external electric field
if len(q) == 1:
return TIP_MOBILITY * external_field(r[iterm, :])
box.update_charges(q)
box.upward(MULTIPOLAR_TERMS)
box.downward()
box.solve_all(a=CONDUCTOR_THICKNESS, field=True)
box.collect_solutions(field=True)
sfields = self_fields(tr, r, q)
E = (MAXWELL_FACTOR * box.field
+ MAXWELL_FACTOR * sfields
+ external_field(r[iterm, :]))
absE = sqrt(sum(E**2, axis=1))
# An unit vector with the same direction as E
u = E / absE[:, newaxis]
# Now we can calculate the absolute value of the velocity
vabs = TIP_MOBILITY * where(absE > TIP_MIN_FIELD, absE - TIP_MIN_FIELD, 0)
v = u * vabs[:, newaxis]
return v
[docs]def self_fields(tr, r, q):
""" Calculates the fields created by the charges at the streamer tips
on themselves. """
iterm = tr.terminals()
parents = tr.parents()[iterm]
dr = r[iterm, :] - r[parents, :]
u = dr / (sqrt(sum(dr**2, axis=1)))[:, newaxis]
return q[iterm][:, newaxis] * u / CONDUCTOR_THICKNESS**2
[docs]def relax(box, tr, r, q0, dt):
""" Relax the conductor :class:`tree.Tree` *tr* for a time *dt*.
Arguments:
* *tr*: the :class:`tree.Tree` instance containing the tree structure.
* *r*: an array containing the node locations.
* *q0*: an array containing the charges of the nodes.
* *dt*: the time step.
"""
global latest_phi, error, error_dq
#with ContextTimer("re-computing Ohm matrix"):
# If we have an electrode, we fix q[0] by setting the first row of
# M to zero.
fix = [] if ELECTRODE is None else [0]
# On Fri Aug 31 11:46:47 2012 I found a factor 2 here that I do not know
# where it comes from. Probably it was a reliq of the mid-points approach
# (But it was duplicated in ohm_matrix anyway!). I am removing it.
M = CONDUCTANCE * tr.ohm_matrix(r, fix=fix)
n = len(q0)
def f(t0, q):
global latest_phi, error, error_dq
phi = solve_phi(r, q, box)
# err = sqrt(sum((phi - box.phi)**2)) / len(phi)
latest_phi = phi
error = phi - latest_phi
error_dq = M.dot(error)
dq = M.dot(phi + external_potential(r))
return dq
d = ode(f).set_integrator('vode', nsteps=250000, rtol=1e-8)
d.set_initial_value(q0, 0.0)
d.integrate(dt)
return d.y
def solve_phi(r, q, box=None):
if len(q) >= FMM_THRESHOLD and box is not None:
# with ContextTimer("FMM") as ct_fmm:
box.update_charges(q)
box.upward(MULTIPOLAR_TERMS)
box.downward()
box.solve_all(a=CONDUCTOR_THICKNESS, field=False)
box.collect_solutions(field=False)
phi = MAXWELL_FACTOR * box.phi
else:
# with ContextTimer("direct") as ct_direct:
if ELECTRODE is None:
rx, qx = r, q
else:
rx, qx = ELECTRODE.extend(r, q)
phi0 = MAXWELL_FACTOR * mpolar.direct(rx, qx, r,
CONDUCTOR_THICKNESS)
phi = phi0
return phi
[docs]def symmetric_gaussian(dr, sigma):
""" Samples a branch from a symmetric, gaussian branching model.
In a plane perpendicular to dr we sample dr1 from a cylindrically
symmetric gaussian distribution; the two branching points are dr1 and
its symmetric vector wrt dr. """
u = dr / norm(dr)
# We find two unit vectors orthonormal to u (also dr); note that this
# fails if u is parallel to x !!!
ex = array([1.0, 0, 0])
e1 = ex - dot(u, ex) * u
e1 = e1 / norm(e1)
e2 = cross(u, e1)
if not BRANCH_IN_XZ:
p, q = sigma * randn(2)
else:
if FIXED_BRANCHING_ANGLE > 0:
p, q = norm(dr) * tan(FIXED_BRANCHING_ANGLE / 2), 0.0
else:
p, q = sigma * randn(), 0.0
dr1 = dr + (p * e1 + q * e2)
dr2 = dr - (p * e1 + q * e2)
if FIXED_BRANCHING_ANGLE > 0:
# This is to avoid too long segments at branching points.
# Presently I am doing it only here to preserve compatibility
# with the algorithm described in the paper as of Sat Mar 23 20:58:46 2013
dr1 *= norm(dr) / norm(dr1)
dr2 *= norm(dr) / norm(dr2)
return dr1, dr2
[docs]def external_field(r):
""" Calculates the external field at points *r*. This is calculated
from ``EXTERNAL_FIELD`` and ``ELECTRODE_POTENTIAL``. As the code stands now
only these two possibilities are physically meaningful:
1. Specify ``EXTERNAL_FIELD`` with a planar electrode or with no electrode,
but use ``ELECTRODE_POTENTIAL=0``.
2. ``ELECTRODE_POTENTIAL != 0``, but ``ELECTRODE_GEOMETRY = 'sphere'`` and
``EXTERNAL_FIELD = 0``.
However, we allow the user to shoot himself on his foot, so he can
select any arbitrary combination of these parameters. Beware.
"""
field = EXTERNAL_FIELD_VECTOR[newaxis, :]
if ELECTRODE_POTENTIAL == 0:
return field
center = array([0.0, 0.0, ELECTRODE_RADIUS])
dr = r - center[newaxis, :]
rabs = sqrt(sum(dr**2, axis=1))
field = field + (ELECTRODE_RADIUS * ELECTRODE_POTENTIAL
* dr / rabs[:, newaxis]**3)
return field
[docs]def external_potential(r):
""" Calculates the external potential at points *r*. See above, in
external_field for the risks here.
"""
phi = -dot(r, EXTERNAL_FIELD_VECTOR)
if ELECTRODE_POTENTIAL == 0:
return phi
center = array([0.0, 0.0, ELECTRODE_RADIUS])
dr = r - center[newaxis, :]
rabs = sqrt(sum(dr**2, axis=1))
phi = phi + ELECTRODE_RADIUS * ELECTRODE_POTENTIAL / rabs
return phi
[docs]def init_electrode():
""" Uses the input parameters to select an electrode geometry. """
def planar():
""" Planar electrode. Always located at z=0. """
return electrodes.Planar(0)
def sphere():
""" Sphere electrode. Located at [0, 0, -ELECTRODE_RADIUS]. """
center = array([0.0, 0.0, ELECTRODE_RADIUS])
return electrodes.Sphere(center, ELECTRODE_RADIUS)
def null():
""" No electrode. """
# return electrodes.NullElectrode()
# This is actually faster:
return None
d = dict(planar=planar, plane=planar, sphere=sphere, null=null, none=null)
try:
return d[ELECTRODE_GEOMETRY]()
except KeyError:
raise KeyError("Electrode geometry '%s' not recognized"
% ELECTRODE_GEOMETRY)
def plot_projections(r, q):
X, Y, Z = 0, 1, 2
names = ["X", "Y", "Z"]
axes = [(X, Z), (Y, Z), (X, Y)]
pylab.subplots_adjust(wspace=0.35, hspace=0.25, right=0.95, top=0.95)
for i, (d1, d2) in enumerate(axes):
ax = pylab.subplot(2, 2, i)
ax.clear()
ax.scatter(r[:, d1], r[:, d2], c=q,
s=5.0, faceted=False),
ax.set_xlabel(names[d1])
ax.set_ylabel(names[d2])
#ax.quiver(r[iterm, 0], r[iterm, 2], field[0, :], field[2, :])
#ax.set_xlim([0.0, 1.0])
#ax.set_ylim([-0.2, 1.0])
if __name__ == '__main__':
main()