import os
import time
import numpy as nm
import numpy.linalg as nla
import scipy as sc
from sfepy.base.base import output, assert_, get_default, debug, Struct
from sfepy.fem.evaluate import eval_equations
from sfepy.solvers.ts import TimeStepper
from sfepy.fem.meshio import HDF5MeshIO
from sfepy.solvers import Solver, eig
from sfepy.linalg import MatrixAction
from utils import iter_sym, create_pis, create_scalar_pis
[docs]class MiniAppBase(Struct):
[docs] def any_from_conf(name, problem, kwargs):
try:
cls = kwargs['class']
except KeyError:
raise KeyError("set 'class' for MiniApp %s!" % name)
obj = cls(name, problem, kwargs)
return obj
any_from_conf = staticmethod(any_from_conf)
def __init__(self, name, problem, kwargs):
Struct.__init__(self, name=name, problem=problem, **kwargs)
self.problem.clear_equations()
self.set_default('requires', [])
self.set_default('is_linear', False)
self.set_default('dtype', nm.float64)
self.set_default('term_mode', None)
self.set_default('set_volume', 'total')
# Application-specific options.
self.app_options = self.process_options()
[docs] def process_options(self):
"""
Setup application-specific options.
Subclasses should implement this method as needed.
Returns
-------
app_options : Struct instance
The application options.
"""
[docs] def init_solvers(self, problem):
"""For linear problems, assemble the matrix and try to presolve the
linear system."""
if self.is_linear:
output('linear problem, trying to presolve...')
tt = time.clock()
ev = problem.get_evaluator()
state = problem.create_state()
try:
mtx_a = ev.eval_tangent_matrix(state(), is_full=True)
except ValueError:
output('matrix evaluation failed, giving up...')
raise
problem.set_linear(True)
problem.init_solvers(mtx=mtx_a, presolve=True)
output('...done in %.2f s' % (time.clock() - tt))
def _get_volume(self, volume):
if isinstance(volume, dict):
return volume[self.set_volume]
else:
return volume
[docs]class CorrSolution(Struct):
"""
Class for holding solutions of corrector problems.
"""
[docs] def iter_solutions(self):
if hasattr(self, 'components'):
for indx in self.components:
key = ('%d' * len(indx)) % indx
yield key, self.states[indx]
else:
yield '', self.state
[docs]class CorrMiniApp(MiniAppBase):
def __init__(self, name, problem, kwargs):
MiniAppBase.__init__(self, name, problem, kwargs)
self.output_dir = self.problem.output_dir
self.set_default('save_name', '(not_set)')
self.set_default('dump_name', self.save_name)
self.set_default('dump_variables', [])
self.set_default('save_variables', self.dump_variables)
self.save_name = os.path.normpath(os.path.join(self.output_dir,
self.save_name))
self.dump_name = os.path.normpath(os.path.join(self.output_dir,
self.dump_name))
[docs] def setup_output(self, save_format=None, dump_format=None,
post_process_hook=None, file_per_var=None):
"""Instance attributes have precedence!"""
self.set_default('dump_format', dump_format)
self.set_default('save_format', save_format)
self.set_default('post_process_hook', post_process_hook)
self.set_default('file_per_var', file_per_var)
[docs] def get_save_name_base(self):
return self.save_name
[docs] def get_dump_name_base(self):
return self.get_save_name_base()
[docs] def get_save_name(self):
return '.'.join((self.get_save_name_base(), self.save_format))
[docs] def get_dump_name(self):
if self.dump_format is not None:
return '.'.join((self.get_dump_name_base(), self.dump_format))
else:
return None
[docs] def get_output(self, corr_sol, is_dump=False, extend=True, variables=None):
if variables is None:
variables = self.problem.get_variables()
to_output = variables.state_to_output
if is_dump:
var_names = self.dump_variables
extend = False
else:
var_names = self.save_variables
out = {}
for key, sol in corr_sol.iter_solutions():
for var_name in var_names:
if key:
skey = var_name + '_' + key
else:
skey = var_name
dof_vector = sol[var_name]
if is_dump:
var = variables[var_name]
shape = (var.n_dof / var.n_components,
var.n_components)
out[skey] = Struct(name = 'dump', mode = 'nodes',
data = dof_vector,
dofs = var.dofs,
shape = shape,
var_name = var_name)
else:
aux = to_output(dof_vector,
var_info={var_name: (True, var_name)},
extend=extend)
if self.post_process_hook is not None:
aux = self.post_process_hook(aux, self.problem,
None,
extend=extend)
for _key, val in aux.iteritems():
if key:
new_key = _key + '_' + key
else:
new_key = _key
out[new_key] = val
return out
[docs] def save(self, state, problem, variables=None):
save_name = self.get_save_name()
if save_name is not None:
extend = not self.file_per_var
out = self.get_output(state, extend=extend,
variables=variables)
problem.save_state(save_name, out=out,
file_per_var=self.file_per_var)
dump_name = self.get_dump_name()
if dump_name is not None:
problem.save_state(dump_name,
out=self.get_output(state, is_dump=True,
variables=variables),
file_per_var=False)
[docs]class ShapeDimDim(CorrMiniApp):
def __call__(self, problem=None, data=None):
problem = get_default(problem, self.problem)
clist, pis = create_pis(problem, self.variables[0])
corr_sol = CorrSolution(name=self.name,
states=pis,
components=clist)
return corr_sol
[docs]class ShapeDim(CorrMiniApp):
def __call__(self, problem=None, data=None):
problem = get_default(problem, self.problem)
clist, pis = create_scalar_pis(problem, self.variables[0])
corr_sol = CorrSolution(name=self.name,
states=pis,
components=clist)
return corr_sol
[docs]class OnesDim(CorrMiniApp):
def __call__(self, problem=None, data=None):
problem = get_default(problem, self.problem)
var_name = self.variables[0]
var = problem.get_variables(auto_create=True)[var_name]
dim = problem.domain.mesh.dim
nnod = var.n_nod
e00 = nm.zeros((nnod, dim), dtype=nm.float64)
e1 = nm.ones((nnod,), dtype=nm.float64)
ones = nm.zeros((dim,), dtype=nm.object)
clist = []
for ir in range(dim):
aux = e00.copy()
aux[:,ir] = e1
ones[ir] = {var_name : nm.ascontiguousarray(aux)}
clist.append('pi_%d' % (ir,))
corr_sol = CorrSolution(name=self.name,
states=ones,
components=clist)
return corr_sol
[docs]class CopyData(CorrMiniApp):
def __call__(self, problem=None, data=None):
problem = get_default(problem, self.problem)
var_name = self.variable
clist = ['data']
dn = self.data
if type(dn) is list:
data = problem
for ii in dn:
data = data.get(ii, 'None')
else:
data = problem.get(dn, 'None')
ndof, ndim = data.shape
state = {var_name: data.reshape((ndof * ndim,))}
corr_sol = CorrSolution(name=self.name,
state=state,
components=clist)
return corr_sol
[docs]class CorrNN(CorrMiniApp):
""" __init__() kwargs:
{
'ebcs' : [],
'epbcs' : [],
'equations' : {},
'set_variables' : None,
},
"""
[docs] def set_variables_default(variables, ir, ic, set_var, data):
for (var, req, comp) in set_var:
variables[var].set_data(data[req].states[ir,ic][comp])
set_variables_default = staticmethod(set_variables_default)
def __init__(self, name, problem, kwargs):
"""When dim is not in kwargs, problem dimension is used."""
CorrMiniApp.__init__(self, name, problem, kwargs)
self.set_default('dim', problem.get_dim())
def __call__(self, problem=None, data=None):
problem = get_default(problem, self.problem)
problem.set_equations(self.equations)
problem.select_bcs(ebc_names=self.ebcs, epbc_names=self.epbcs,
lcbc_names=self.get('lcbcs', []))
problem.update_materials(problem.ts)
self.init_solvers(problem)
variables = problem.get_variables()
states = nm.zeros((self.dim, self.dim), dtype=nm.object)
clist = []
for ir in range(self.dim):
for ic in range(self.dim):
if isinstance(self.set_variables, list):
self.set_variables_default(variables, ir, ic,
self.set_variables, data)
else:
self.set_variables(variables, ir, ic, **data)
state = problem.solve()
assert_(state.has_ebc())
states[ir,ic] = state.get_parts()
clist.append((ir, ic))
corr_sol = CorrSolution(name=self.name,
states=states,
components=clist)
self.save(corr_sol, problem)
return corr_sol
[docs]class CorrN(CorrMiniApp):
[docs] def set_variables_default(variables, ir, set_var, data):
for (var, req, comp) in set_var:
variables[var].set_data(data[req].states[ir][comp])
set_variables_default = staticmethod(set_variables_default)
def __init__(self, name, problem, kwargs):
"""When dim is not in kwargs, problem dimension is used."""
CorrMiniApp.__init__(self, name, problem, kwargs)
self.set_default('dim', problem.get_dim())
def __call__(self, problem=None, data=None):
problem = get_default(problem, self.problem)
problem.set_equations(self.equations)
problem.select_bcs(ebc_names=self.ebcs, epbc_names=self.epbcs,
lcbc_names=self.get('lcbcs', []))
problem.update_materials(problem.ts)
self.init_solvers(problem)
variables = problem.get_variables()
states = nm.zeros((self.dim,), dtype=nm.object)
clist = []
for ir in range(self.dim):
if isinstance(self.set_variables, list):
self.set_variables_default(variables, ir,
self.set_variables, data)
else:
self.set_variables(variables, ir, **data)
state = problem.solve()
assert_(state.has_ebc())
states[ir] = state.get_parts()
clist.append((ir,))
corr_sol = CorrSolution(name=self.name,
states=states,
components=clist)
self.save(corr_sol, problem)
return corr_sol
[docs]class CorrDimDim(CorrNN):
pass
[docs]class CorrDim(CorrN):
pass
[docs]class CorrOne(CorrMiniApp):
[docs] def set_variables_default(variables, set_var, data):
for (var, req, comp) in set_var:
variables[var].set_data(data[req].state[comp])
set_variables_default = staticmethod(set_variables_default)
def __call__(self, problem=None, data=None):
problem = get_default(problem, self.problem)
problem.set_equations(self.equations)
problem.select_bcs(ebc_names=self.ebcs, epbc_names=self.epbcs,
lcbc_names=self.get('lcbcs', []))
problem.update_materials(problem.ts)
self.init_solvers(problem)
variables = problem.get_variables()
if hasattr(self, 'set_variables'):
if isinstance(self.set_variables, list):
self.set_variables_default(variables, self.set_variables,
data)
else:
self.set_variables(variables, **data)
state = problem.solve()
assert_(state.has_ebc())
corr_sol = CorrSolution(name=self.name,
state=state.get_parts())
self.save(corr_sol, problem)
return corr_sol
[docs]class CorrSetBCS(CorrMiniApp):
def __call__(self, problem=None, data=None):
from sfepy.base.base import select_by_names
from sfepy.fem.variables import Variables
from sfepy.fem.state import State
from sfepy.fem.conditions import Conditions
problem = get_default(problem, self.problem)
conf_ebc = select_by_names(problem.conf.ebcs, self.ebcs)
conf_epbc = select_by_names(problem.conf.epbcs, self.epbcs)
ebcs = Conditions.from_conf(conf_ebc, problem.domain.regions)
epbcs = Conditions.from_conf(conf_epbc, problem.domain.regions)
conf_variables = select_by_names(problem.conf.variables, self.variable)
problem.set_variables(conf_variables)
variables = Variables.from_conf(conf_variables, problem.fields)
variables.equation_mapping(ebcs, epbcs, problem.ts, problem.functions)
state = State(variables)
state.fill(0.0)
state.apply_ebc()
corr_sol = CorrSolution(name=self.name,
state=state.get_parts())
self.save(corr_sol, problem, variables)
return corr_sol
[docs]class CorrEqPar(CorrOne):
"""
The corrector which equation can be parametrized via 'eq_pars',
the dimension is given by the number of parameters.
Example:
'equations': 'dw_diffusion.5.Y(mat.k, q, p) =
dw_surface_integrate.5.%s(q)',
'eq_pars': ('bYMp', 'bYMm'),
'class': cb.CorrEqPar,
"""
def __init__(self, name, problem, kwargs):
"""When dim is not in kwargs, problem dimension is used."""
CorrMiniApp.__init__(self, name, problem, kwargs)
self.set_default('dim', len(self.eq_pars))
def __call__(self, problem=None, data=None):
problem = get_default(problem, self.problem)
states = nm.zeros((self.dim,), dtype=nm.object)
clist = []
eqns ={}
for ir in range(self.dim):
for key_eq, val_eq in self.equations.iteritems():
eqns[key_eq] = val_eq % self.eq_pars[ir]
problem.set_equations(eqns)
problem.select_bcs(ebc_names=self.ebcs, epbc_names=self.epbcs,
lcbc_names=self.get('lcbcs', []))
problem.update_materials(problem.ts)
self.init_solvers(problem)
variables = problem.get_variables()
if hasattr(self, 'set_variables'):
if isinstance(self.set_variables, list):
self.set_variables_default(variables, self.set_variables,
data)
else:
self.set_variables(variables, **data)
state = problem.solve()
assert_(state.has_ebc())
states[ir] = state.get_parts()
clist.append((ir,))
corr_sol = CorrSolution(name=self.name,
states=states,
components=clist)
self.save(corr_sol, problem)
return corr_sol
[docs]class PressureEigenvalueProblem(CorrMiniApp):
"""Pressure eigenvalue problem solver for time-dependent correctors."""
[docs] def presolve(self, mtx):
"""Prepare A^{-1} B^T for the Schur complement."""
mtx_a = mtx['A']
mtx_bt = mtx['BT']
output('full A size: %.3f MB' % (8.0 * nm.prod(mtx_a.shape) / 1e6))
output('full B size: %.3f MB' % (8.0 * nm.prod(mtx_bt.shape) / 1e6))
ls = Solver.any_from_conf(self.problem.ls_conf,
presolve=True, mtx=mtx_a)
if self.mode == 'explicit':
tt = time.clock()
mtx_aibt = nm.zeros(mtx_bt.shape, dtype=mtx_bt.dtype)
for ic in xrange(mtx_bt.shape[1]):
mtx_aibt[:,ic] = ls(mtx_bt[:,ic].toarray().squeeze())
output('mtx_aibt: %.2f s' % (time.clock() - tt))
action_aibt = MatrixAction.from_array(mtx_aibt)
else:
##
# c: 30.08.2007, r: 13.02.2008
def fun_aibt(vec):
# Fix me for sparse mtx_bt...
rhs = sc.dot(mtx_bt, vec)
out = ls(rhs)
return out
action_aibt = MatrixAction.from_function(fun_aibt,
(mtx_a.shape[0],
mtx_bt.shape[1]),
nm.float64)
mtx['action_aibt'] = action_aibt
[docs] def solve_pressure_eigenproblem(self, mtx, eig_problem=None,
n_eigs=0, check=False):
"""G = B*AI*BT or B*AI*BT+D"""
def get_slice(n_eigs, nn):
if n_eigs > 0:
ii = slice(0, n_eigs)
elif n_eigs < 0:
ii = slice(nn + n_eigs, nn)
else:
ii = slice(0, 0)
return ii
eig_problem = get_default(eig_problem, self.eig_problem)
n_eigs = get_default(n_eigs, self.n_eigs)
check = get_default(check, self.check)
mtx_c, mtx_b, action_aibt = mtx['C'], mtx['B'], mtx['action_aibt']
mtx_g = mtx_b * action_aibt.to_array() # mtx_b must be sparse!
if eig_problem == 'B*AI*BT+D':
mtx_g += mtx['D'].toarray()
mtx['G'] = mtx_g
output(mtx_c.shape, mtx_g.shape)
eigs, mtx_q = eig(mtx_c.toarray(), mtx_g, method='eig.sgscipy')
if check:
ee = nm.diag(sc.dot(mtx_q.T * mtx_c, mtx_q)).squeeze()
oo = nm.diag(sc.dot(sc.dot(mtx_q.T, mtx_g), mtx_q)).squeeze()
try:
assert_(nm.allclose(ee, eigs))
assert_(nm.allclose(oo, nm.ones_like(eigs)))
except ValueError:
debug()
nn = mtx_c.shape[0]
if isinstance(n_eigs, tuple):
output('required number of eigenvalues: (%d, %d)' % n_eigs)
if sum(n_eigs) < nn:
ii0 = get_slice(n_eigs[0], nn)
ii1 = get_slice(-n_eigs[1], nn)
eigs = nm.concatenate((eigs[ii0], eigs[ii1]))
mtx_q = nm.concatenate((mtx_q[:,ii0], mtx_q[:,ii1]), 1)
else:
output('required number of eigenvalues: %d' % n_eigs)
if (n_eigs != 0) and (abs(n_eigs) < nn):
ii = get_slice(n_eigs, nn)
eigs = eigs[ii]
mtx_q = mtx_q[:,ii]
## from sfepy.base.plotutils import pylab, iplot
## pylab.semilogy(eigs)
## pylab.figure(2)
## iplot(eigs)
## pylab.show()
## debug()
out = Struct(eigs=eigs, mtx_q=mtx_q)
return out
def __call__(self, problem=None, data=None):
problem = get_default(problem, self.problem)
problem.set_equations(self.equations)
problem.select_bcs(ebc_names=self.ebcs, epbc_names=self.epbcs,
lcbc_names=self.get('lcbcs', []))
problem.update_materials()
mtx = problem.equations.eval_tangent_matrices(problem.create_state()(),
problem.mtx_a,
by_blocks=True)
self.presolve(mtx)
evp = self.solve_pressure_eigenproblem(mtx)
return Struct(name=self.name, ebcs=self.ebcs, epbcs=self.epbcs,
mtx=mtx, evp=evp)
[docs]class TCorrectorsViaPressureEVP(CorrMiniApp):
"""
Time correctors via the pressure eigenvalue problem.
"""
[docs] def setup_equations(self, equations, problem=None):
"""
Set equations, update boundary conditions and materials.
"""
problem = get_default(problem, self.problem)
problem.set_equations(equations)
problem.select_bcs(ebc_names=self.ebcs, epbc_names=self.epbcs,
lcbc_names=self.get('lcbcs', []))
problem.update_materials() # Assume parameters constant in time.
[docs] def compute_correctors(self, evp, sign, state0, ts, dump_name, save_name,
problem=None, vec_g=None):
problem = get_default(problem, self.problem)
eigs = evp.evp.eigs
mtx_q = evp.evp.mtx_q
mtx = evp.mtx
nr, nc = mtx_q.shape
if vec_g is not None:
output('nonzero pressure EBC: max = %e, min = %e' \
% (vec_g.max(), vec_g.min()))
one = nm.ones((nc,), dtype=nm.float64)
vu, vp = self.dump_variables
variables = problem.get_variables()
var_u = variables[vu]
var_p = variables[vp]
##
# follow_epbc = False -> R1 = - R2 as required. ? for other correctors?
vec_p0 = sign * var_p.get_reduced(state0[vp], follow_epbc=False)
## print state0
## print vec_p0
## print vec_p0.min(), vec_p0.max(), nla.norm(vec_p0)
## debug()
# xi0 = Q^{-1} p(0) = Q^T G p(0)
vec_xi0 = sc.dot(mtx_q.T, sc.dot(mtx['G'],
vec_p0[:,nm.newaxis])).squeeze()
action_aibt = mtx['action_aibt']
e_e_qg = 0.0
iee_e_qg = 0.0
format = '====== time %%e (step %%%dd of %%%dd) ====='\
% ((ts.n_digit,) * 2)
for step, time in ts:
output(format % (time, step + 1, ts.n_step))
e_e = nm.exp(- eigs * time)
e_e_qp = e_e * vec_xi0 # exp(-Et) Q^{-1} p(0)
if vec_g is not None:
Qg = sc.dot(mtx_q.T, vec_g)
e_e_qg = e_e * Qg
iee_e_qg = ((one - e_e) / eigs) * Qg
vec_p = sc.dot(mtx_q, e_e_qp + iee_e_qg)
vec_dp = - sc.dot(mtx_q, (eigs * e_e_qp - e_e_qg))
vec_u = action_aibt(vec_dp)
## bbb = sc.dot(vec_dp.T, - mtx['C'] * vec_p0)
vec_u = var_u.get_full(vec_u)
vec_p = var_p.get_full(vec_p)
# BC nodes - time derivative of constant is zero!
vec_dp = var_p.get_full(vec_dp, force_value=0.0)
## aaa = sc.dot(vec_xi0.T, eigs * (eigs * e_e_qp))
## print aaa
## print bbb
self.save(dump_name, save_name, vec_u, vec_p, vec_dp, ts, problem)
[docs] def save(self, dump_name, save_name, vec_u, vec_p, vec_dp, ts, problem):
"""
1. saves raw correctors into hdf5 files (filename)
2. saves correctors transformed to output for visualization
"""
vu, vp = self.dump_variables
out = {vu : Struct(name='dump', mode='nodes', data=vec_u,
dofs=None, var_name=vu),
vp : Struct(name='dump', mode='nodes', data=vec_p,
dofs=None, var_name=vp),
'd'+vp : Struct(name='dump', mode='nodes', data=vec_dp,
dofs=None, var_name=vp)}
problem.save_state(dump_name, out=out, file_per_var=False, ts=ts)
# For visualization...
out = {}
extend = not self.file_per_var
variables = self.problem.get_variables()
to_output = variables.state_to_output
out.update(to_output(vec_u, var_info={vu : (True, vu)},
extend=extend))
out.update(to_output(vec_p, var_info={vp : (True, vp)},
extend=extend))
out.update(to_output(vec_dp, var_info={vp : (True, 'd'+vp)},
extend=extend))
if self.post_process_hook is not None:
out = self.post_process_hook(out, problem,
{vu : vec_u,
vp : vec_p, 'd'+vp : vec_dp},
extend=extend)
problem.save_state(save_name, out=out,
file_per_var=self.file_per_var, ts=ts)
[docs] def verify_correctors(self, sign, state0, filename, problem=None):
problem = get_default(problem, self.problem)
io = HDF5MeshIO(filename)
ts = TimeStepper(*io.read_time_stepper())
ts.set_step(0)
problem.equations.init_time(ts)
variables = self.problem.get_variables()
vu, vp = self.dump_variables
vdp = self.verify_variables[-1]
p0 = sign * state0[vp]
format = '====== time %%e (step %%%dd of %%%dd) ====='\
% ((ts.n_digit,) * 2)
vv = variables
ok = True
for step, time in ts:
output(format % (time, step + 1, ts.n_step))
data = io.read_data(step)
if step == 0:
assert_(nm.allclose(data[vp].data, p0))
state0 = problem.create_state()
state0.set_full(data[vu].data, vu)
state0.set_full(data[vp].data, vp)
vv[vdp].set_data(data['d'+vp].data)
problem.update_time_stepper(ts)
state = problem.solve(state0)
state, state0 = state(), state0()
err = nla.norm(state - state0) / nla.norm(state0)
output(state.min(), state.max())
output(state0.min(), state0.max())
output('>>>>>', err)
ok = ok and (err < 1e-12)
problem.advance(ts)
return ok
[docs]class CoefDummy(MiniAppBase):
"""
Dummy class serving for computing and returning its requirements.
"""
def __call__(self, volume=None, problem=None, data=None):
return data
[docs]class TSTimes(MiniAppBase):
"""Coefficient-like class, returns times of the time stepper."""
def __call__(self, volume=None, problem=None, data=None):
problem = get_default(problem, self.problem)
return problem.get_time_solver().ts.times
[docs]class VolumeFractions(MiniAppBase):
"""Coefficient-like class, returns volume fractions of given regions within
the whole domain."""
def __call__(self, volume=None, problem=None, data=None):
problem = get_default(problem, self.problem)
vf = {}
for region_name in self.regions:
vkey = 'volume_%s' % region_name
key = 'fraction_%s' % region_name
equations, variables = problem.create_evaluable(self.expression % region_name)
val = eval_equations(equations, variables)
vf[vkey] = nm.asarray(val, dtype=nm.float64)
vf[key] = vf[vkey] / self._get_volume(volume)
return vf
[docs]class CoefSymSym(MiniAppBase):
[docs] def set_variables_default(variables, ir, ic, mode, set_var, data):
mode2var = {'row' : 0, 'col' : 1}
idx = mode2var[mode]
val = data[set_var[idx][1]].states[ir, ic][set_var[idx][2]]
variables[set_var[idx][0]].set_data(val)
set_variables_default = staticmethod(set_variables_default)
def __call__(self, volume, problem=None, data=None):
problem = get_default(problem, self.problem)
dim, sym = problem.get_dim(get_sym=True)
coef = nm.zeros((sym, sym), dtype=self.dtype)
term_mode = self.term_mode
equations, variables = problem.create_evaluable(self.expression,
term_mode=term_mode)
for ir, (irr, icr) in enumerate(iter_sym(dim)):
if isinstance(self.set_variables, list):
self.set_variables_default(variables, irr, icr, 'row',
self.set_variables, data)
else:
self.set_variables(variables, irr, icr, 'row', **data)
for ic, (irc, icc) in enumerate(iter_sym(dim)):
if isinstance(self.set_variables, list):
self.set_variables_default(variables, irc, icc, 'col',
self.set_variables, data)
else:
self.set_variables(variables, irc, icc, 'col', **data)
val = eval_equations(equations, variables,
term_mode=term_mode)
coef[ir,ic] = val
coef /= self._get_volume(volume)
return coef
[docs]class CoefFMSymSym(MiniAppBase):
"""
Fading memory sym x sym coefficients.
"""
def __call__(self, volume, problem=None, data=None):
problem = get_default(problem, self.problem)
dim, sym = problem.get_dim(get_sym=True)
filename = self.set_variables(None, None, None, 0, 0,
'filename', **data)
ts = TimeStepper(*HDF5MeshIO(filename).read_time_stepper())
coef = nm.zeros((ts.n_step, sym, sym), dtype=self.dtype)
term_mode = self.term_mode
equations, variables = problem.create_evaluable(self.expression,
term_mode=term_mode)
for ir, (irr, icr) in enumerate(iter_sym(dim)):
filename = self.set_variables(None, None, None, irr, icr,
'filename', **data)
io = HDF5MeshIO(filename)
for step, time in ts:
self.set_variables(variables, io, step, None, None,
'row', **data)
for ic, (irc, icc) in enumerate(iter_sym(dim)):
self.set_variables(variables, None, None, irc, icc,
'col', **data)
val = eval_equations(equations, variables,
term_mode=term_mode)
coef[step,ir,ic] = val
coef /= self._get_volume(volume)
return coef
[docs]class CoefDimSym(MiniAppBase):
def __call__(self, volume, problem=None, data=None):
problem = get_default(problem, self.problem)
dim, sym = problem.get_dim(get_sym=True)
coef = nm.zeros((dim, sym), dtype=self.dtype)
term_mode = self.term_mode
equations, variables = problem.create_evaluable(self.expression,
term_mode=term_mode)
for ir in range(dim):
self.set_variables(variables, ir, None, 'row', **data)
for ic, (irc, icc) in enumerate(iter_sym(dim)):
self.set_variables(variables, irc, icc, 'col', **data)
val = eval_equations(equations, variables,
term_mode=term_mode)
coef[ir,ic] = val
coef /= self._get_volume(volume)
return coef
[docs]class CoefNN(MiniAppBase):
[docs] def set_variables_default(variables, ir, ic, mode, set_var, data):
mode2var = {'row' : 0, 'col' : 1}
if mode == 'col':
ir = ic
idx = mode2var[mode]
val = data[set_var[idx][1]].states[ir][set_var[idx][2]]
variables[set_var[idx][0]].set_data(val)
set_variables_default = staticmethod(set_variables_default)
def __init__(self, name, problem, kwargs):
"""When dim is not in kwargs, problem dimension is used."""
MiniAppBase.__init__(self, name, problem, kwargs)
self.set_default('dim', problem.get_dim())
def __call__(self, volume, problem=None, data=None):
problem = get_default(problem, self.problem)
coef = nm.zeros((self.dim, self.dim), dtype=self.dtype)
term_mode = self.term_mode
equations, variables = problem.create_evaluable(self.expression,
term_mode=term_mode)
if isinstance(self.set_variables, list):
for ir in range(self.dim):
self.set_variables_default(variables, ir, None, 'row',
self.set_variables, data)
for ic in range(self.dim):
self.set_variables_default(variables, None, ic, 'col',
self.set_variables, data)
val = eval_equations(equations, variables,
term_mode=term_mode)
coef[ir,ic] = val
else:
for ir in range(self.dim):
self.set_variables(variables, ir, None, 'row', **data)
for ic in range(self.dim):
self.set_variables(variables, None, ic, 'col', **data)
val = eval_equations(equations, variables,
term_mode=term_mode)
coef[ir,ic] = val
coef /= self._get_volume(volume)
return coef
[docs]class CoefN(MiniAppBase):
[docs] def set_variables_default(variables, ir, set_var, data):
for (var, req, comp) in set_var:
variables[var].set_data(data[req].states[ir][comp])
set_variables_default = staticmethod(set_variables_default)
def __init__(self, name, problem, kwargs):
"""When dim is not in kwargs, problem dimension is used."""
MiniAppBase.__init__(self, name, problem, kwargs)
self.set_default('dim', problem.get_dim())
def __call__(self, volume, problem=None, data=None):
problem = get_default(problem, self.problem)
coef = nm.zeros((self.dim,), dtype=self.dtype)
term_mode = self.term_mode
equations, variables = problem.create_evaluable(self.expression,
term_mode=term_mode)
for ir in range(self.dim):
if isinstance(self.set_variables, list):
self.set_variables_default(variables, ir, self.set_variables,
data)
else:
self.set_variables(variables, ir, **data)
val = eval_equations(equations, variables,
term_mode=term_mode)
coef[ir] = val
coef /= self._get_volume(volume)
return coef
[docs]class CoefDimDim(CoefNN):
pass
[docs]class CoefDim(CoefN):
pass
[docs]class CoefSym(MiniAppBase):
def __call__(self, volume, problem=None, data=None):
problem = get_default(problem, self.problem)
dim, sym = problem.get_dim(get_sym=True)
coef = nm.zeros((sym,), dtype=self.dtype)
term_mode = self.term_mode
equations, variables = problem.create_evaluable(self.expression,
term_mode=term_mode)
self.set_variables(variables, None, None, 'col', **data)
for ii, (ir, ic) in enumerate(iter_sym(dim)):
self.set_variables(variables, ir, ic, 'row', **data)
val = eval_equations(equations, variables,
term_mode=term_mode)
coef[ii] = val
coef /= self._get_volume(volume)
return coef
[docs]class CoefFMSym(MiniAppBase):
"""
Fading memory sym coefficients.
"""
def __call__(self, volume, problem=None, data=None):
problem = get_default(problem, self.problem)
dim, sym = problem.get_dim(get_sym=True)
filename = self.set_variables(None, 0, 0, 'filename', **data)
ts = TimeStepper(*HDF5MeshIO(filename).read_time_stepper())
coef = nm.zeros((ts.n_step, sym), dtype=self.dtype)
term_mode = self.term_mode
equations, variables = problem.create_evaluable(self.expression,
term_mode=term_mode)
self.set_variables(variables, None, None, 'col', **data)
for ii, (ir, ic) in enumerate(iter_sym(dim)):
filename = self.set_variables(None, ir, ic, 'filename', **data)
io = HDF5MeshIO(filename)
for step, time in ts:
self.set_variables(variables, io, step, 'row', **data)
val = eval_equations(equations, variables,
term_mode=term_mode)
coef[step,ii] = val
coef /= self._get_volume(volume)
return coef
[docs]class CoefOne(MiniAppBase):
[docs] def set_variables_default(variables, set_var, data):
for (var, req, comp) in set_var:
variables[var].set_data(data[req].state[comp])
set_variables_default = staticmethod(set_variables_default)
def __call__(self, volume, problem=None, data=None):
problem = get_default(problem, self.problem)
term_mode = self.term_mode
equations, variables = problem.create_evaluable(self.expression,
term_mode=term_mode)
if isinstance(self.set_variables, list):
self.set_variables_default(variables, self.set_variables,
data)
else:
self.set_variables(variables, **data)
val = eval_equations(equations, variables,
term_mode=term_mode)
coef = val / self._get_volume(volume)
return coef
[docs]class CoefFMOne(MiniAppBase):
"""
Fading memory scalar coefficients.
"""
def __call__(self, volume, problem=None, data=None):
problem = get_default(problem, self.problem)
filename = self.set_variables(None, None, None, 'filename', **data)
io = HDF5MeshIO(filename)
ts = TimeStepper(*io.read_time_stepper())
coef = nm.zeros((ts.n_step, 1), dtype=self.dtype)
term_mode = self.term_mode
equations, variables = problem.create_evaluable(self.expression,
term_mode=term_mode)
self.set_variables(variables, None, None, 'col', **data)
for step, time in ts:
self.set_variables(variables, io, step, 'row', **data)
val = eval_equations(equations, variables,
term_mode=term_mode)
coef[step] = val
coef /= self._get_volume(volume)
return coef
[docs]class CoefSum(MiniAppBase):
def __call__(self, volume, problem=None, data=None):
coef = nm.zeros_like(data[self.requires[0]])
for i in range(len(self.requires)):
coef += data[self.requires[i]]
return coef
[docs]class CoefEval(MiniAppBase):
"""
Evaluate expression.
"""
def __call__(self, volume, problem=None, data=None):
expr = self.expression
for i in range(len(self.requires)):
expr = expr.replace(self.requires[i],
"data['%s']" % self.requires[i])
coef = eval(expr)
return coef
[docs]class CoefNone(MiniAppBase):
def __call__(self, volume, problem=None, data=None):
coef = 0.0
return coef