Commit 183d936e by Jigyasa Watwani

code for error trends and full model. Need to fix.

parent c396e5ce
import dolfin as df
import numpy as np
import os
df.set_log_level(df.LogLevel.ERROR)
df.parameters['form_compiler']['optimize'] = True
class Growing_Domain(object):
def __init__(self, parameters):
# read in parameters
for key in parameters:
setattr(self, key, parameters[key])
# create mesh, mixed element and function space
L = self.system_size
if self.dimension==1:
self.mesh = df.IntervalMesh(self.resolution, 0.0, L)
assert self.bulk_elasticity == self.shear_elasticity
assert self.bulk_viscosity == self.shear_viscosity
elif self.dimension==2:
if hasattr(self, 'meshfile'):
print('Using %s' % str(self.meshfile))
self.mesh = df.Mesh(str(self.meshfile))
else:
self.mesh = df.RectangleMesh(df.Point(0,0), df.Point(L,L),
self.resolution, self.resolution)
scalar_element = df.FiniteElement('P', self.mesh.ufl_cell(), 1)
vector_element = df.VectorElement('P', self.mesh.ufl_cell(), 1)
if self.morphogen:
# u, v, rho, c
mixed_element = df.MixedElement([vector_element, vector_element,
scalar_element, scalar_element])
self.savedata = self.save_data_uvrhoc
else:
# u, v, rho
mixed_element = df.MixedElement([vector_element, vector_element, scalar_element])
self.savedata = self.save_data_uvrho
# define function space with this mixed element
self.function_space = df.FunctionSpace(self.mesh, mixed_element)
# dirichlet boundaries for rho
self.bc = df.DirichletBC(self.function_space.sub(2), df.Constant(0.0), 'on_boundary')
# define the reqd functions on this function space
self.f = df.Function(self.function_space) # f at current time
self.f0 = df.Function(self.function_space) # f at t =0
def advection(self, conc, vel, tconc):
return (df.inner(df.div(vel * conc), tconc))
def active_stress(self, rho, c):
return self.lamda * rho/(rho + self.saturation_rho) * c * df.Identity(self.dimension)
def epsilon(self, v):
return df.sym(df.nabla_grad(v))
def passive_stress(self, u, v):
eps_u, eps_v = self.epsilon(u), self.epsilon(v)
elastic_stress = (self.shear_elasticity * eps_u +
(self.bulk_elasticity - self.shear_elasticity/self.dimension) *
df.tr(eps_u) * df.Identity(self.dimension))
viscous_stress = (self.shear_viscosity * eps_v +
(self.bulk_viscosity - self.shear_viscosity/self.dimension) *
df.tr(eps_v) * df.Identity(self.dimension))
return (elastic_stress + viscous_stress)
def stress(self, u, v, rho, c):
return (self.passive_stress(u, v) + self.active_stress(rho, c))
def reaction_rho(self, rho, trho):
return self.turnover_rho * df.inner(rho - self.average_rho, trho)
def reaction_c(self, c, tc):
return self.turnover_c * df.inner(c - self.average_c, tc)
def diffusion_reaction_rho(self, rho, trho):
return (self.diffusion_rho * df.inner(df.nabla_grad(rho), df.nabla_grad(trho))
+ self.reaction_rho(rho, trho))
def diffusion_reaction_c(self, c, tc):
return (self.diffusion_c * df.inner(df.nabla_grad(c), df.nabla_grad(tc))
+ self.reaction_c(c, tc))
def setup_initial_conditions(self):
if self.dimension==1:
self.zero_vector = df.Constant((0.0,))
elif self.dimension==2:
self.zero_vector = df.Constant((0.0, 0.0))
u0 = df.interpolate(self.zero_vector, self.function_space.sub(0).collapse())
rho0 = df.interpolate(df.Expression('1 - cos(2*pi*sqrt(x[0]*x[0]+x[1]*x[1])/L)',L=self.system_size, pi=np.pi, degree=1), self.function_space.sub(2).collapse())
# add noise
noise_u = self.noise_level * (2*np.random.random(u0.vector().size())-1)
u0.vector()[:] = u0.vector()[:] + noise_u
# noise_rho = self.noise_level * (2*np.random.random(rho0.vector().size())-1)
# rho0.vector()[:] = rho0.vector()[:] + noise_rho
if self.morphogen:
c0 = df.interpolate(df.Expression('1 + 0.2*cos(2*pi*sqrt(x[0]*x[0]+x[1]*x[1])/L)',L=self.system_size, pi=np.pi, degree=1), self.function_space.sub(3).collapse())
# add noise
# noise_c = self.noise_level * (2*np.random.random(c0.vector().size())-1)
# c0.vector()[:] = c0.vector()[:] + noise_c
else:
c0 = df.Constant(1.0)
VFS = self.function_space.sub(1).collapse()
v0 = df.Function(VFS)
tv = df.TestFunction(VFS)
vform = (self.friction * df.inner(v0, tv)
+ df.inner(self.stress(u0, v0, rho0, c0), self.epsilon(tv))
) * df.dx
df.solve(vform == 0, v0)
if self.morphogen:
df.assign(self.f0, [u0, v0, rho0, c0])
else:
df.assign(self.f0, [u0, v0, rho0])
def setup_weak_forms(self):
if self.morphogen:
u0, v0, rho0, c0 = df.split(self.f0)
u, v, rho, c = df.split(self.f)
tu, tv, trho, tc = df.TestFunctions(self.function_space)
else:
u0, v0, rho0 = df.split(self.f0)
u, v, rho = df.split(self.f)
tu, tv, trho = df.TestFunctions(self.function_space)
c = df.Constant(1.0)
uform = (df.inner((u - u0)/self.timestep, tu)
- df.inner(v0, tu)
)
vform = (self.friction * df.inner(v, tv)
+ df.inner(self.stress(u, v, rho, c), self.epsilon(tv))
)
rhoform = (df.inner((rho - rho0)/self.timestep, trho)
+ self.advection(rho0, v0, trho)
+ self.diffusion_reaction_rho(rho, trho)
)
if self.morphogen:
cform = (df.inner((c - c0)/self.timestep, tc)
+ self.advection(c0, v0, tc)
+ self.diffusion_reaction_c(c, tc)
)
self.form = (uform + vform + rhoform + cform) * df.dx
else:
self.form = (uform + vform + rhoform) * df.dx
def save_data_uvrho(self):
u, v, rho = self.f0.split(deepcopy=True)
self.uFile.write_checkpoint(u, 'displacement', self.time, append=True)
self.vFile.write_checkpoint(v, 'velocity', self.time, append=True)
self.rhoFile.write_checkpoint(rho, 'density', self.time, append=True)
def save_data_uvrhoc(self):
u, v, rho, c = self.f0.split(deepcopy=True)
self.uFile.write_checkpoint(u, 'displacement', self.time, append=True)
self.vFile.write_checkpoint(v, 'velocity', self.time, append=True)
self.rhoFile.write_checkpoint(rho, 'density', self.time, append=True)
self.cFile.write_checkpoint(c, 'concentration', self.time, append=True)
def solve(self,DIR=''):
# for saving data
self.uFile = df.XDMFFile(os.path.join(DIR, '%s_displacement.xdmf' % self.timestamp))
self.vFile = df.XDMFFile(os.path.join(DIR, '%s_velocity.xdmf' % self.timestamp))
self.rhoFile = df.XDMFFile(os.path.join(DIR, '%s_density.xdmf' % self.timestamp))
if self.morphogen:
self.cFile = df.XDMFFile(os.path.join(DIR, '%s_concentration.xdmf' % self.timestamp))
self.setup_initial_conditions()
self.setup_weak_forms()
# time-variables
self.time = 0.0
savesteps = int(self.savetime/self.timestep)
maxsteps = int(self.maxtime/self.timestep)
for steps in range(1, maxsteps + 1):
# solve
df.solve(self.form == 0, self.f, self.bc)
# update
df.assign(self.f0, self.f)
if self.morphogen:
u, v, rho, c = self.f.split(deepcopy=True)
else:
u, v, rho = self.f.split(deepcopy=True)
if steps % savesteps == 0:
self.savedata()
# update time
self.time += self.timestep
# move mesh
# displacement = df.project(self.v*self.timestep, self.function_space)
df.ALE.move(self.mesh, u)
self.uFile.close()
self.vFile.close()
self.rhoFile.close()
if self.morphogen:
self.cFile.close()
if __name__ == '__main__':
import json, datetime
assert os.path.isfile('parameters.json'), 'parameters.json file not found' # assert that parameters.json is a valid file, otherwise
# give an error message parameters.json file not found
# load the parameters
with open('parameters.json') as jsonFile:
params = json.load(jsonFile)
# parse parameters
assert params['dimension'] in (1,2)
timestamp = datetime.datetime.now().strftime("%d%m%y-%H%M%S")
params['timestamp'] = timestamp
gd = Growing_Domain(params)
gd.solve()
with open(params['timestamp'] + '_parameters.json', "w") as fp:
json.dump(params, fp, indent=4)
\ No newline at end of file
import dolfin as df
import numpy as np
import vedo as vd
import os
import h5py
from matplotlib.widgets import Slider
import matplotlib.pyplot as plt
from tempfile import TemporaryDirectory
def get_data(params, DIR=''):
savesteps = int(params['maxtime']/params['savetime'])
times = np.arange(savesteps+1) * params['savetime']
# Read mesh geometry from h5 file
var = 'density'
h5 = h5py.File(os.path.join(DIR, '%s_%s.h5' % (params['timestamp'], var)), "r")
# should be in the loop if remeshing
topology = np.array(h5['%s/%s_0/mesh/topology'%(var,var)])
geometry = []
for i in range(len(times)):
geometry.append(np.array(h5['%s/%s_%d/mesh/geometry'%(var,var,i)]))
h5.close()
geometry = np.array(geometry)
if params['dimension']==1:
geometry, zeros= np.dsplit(geometry, 2)
print(geometry, topology)
raise SystemExit
# create a mesh
if params['dimension']==1:
mesh = df.IntervalMesh(params['resolution'], 0, params['system_size'])
else:
mesh = df.Mesh()
editor = df.MeshEditor()
editor.open(mesh,
type='triangle' if params['dimension']==2 else 'tetrahedron',
tdim=params['dimension'], gdim=params['dimension'])
editor.init_vertices(len(geometry[0]))
editor.init_cells(len(topology))
for i in range(len(geometry[0])):
editor.add_vertex(i, geometry[0][i])
for i in range(len(topology)):
editor.add_cell(i, topology[i])
editor.close()
# Read concentration data
concentration = np.zeros((len(times), len(geometry[0])))
cFile = os.path.join(DIR, '%s_%s.xdmf' % (params['timestamp'],var))
with df.XDMFFile(cFile) as cFile:
for steps in range(savesteps+1):
if params['dimension']==1:
mesh.coordinates()[:] = geometry[steps]
VFS = df.FunctionSpace(mesh, 'P', 1)
c = df.Function(VFS)
cFile.read_checkpoint(c, var, steps)
concentration[steps] = c.compute_vertex_values(mesh)
return (times, geometry, topology, concentration)
def visualize(params, DIR='', offscreen=False):
(times, geometry, topology, concentration) = get_data(params, DIR)
radius = np.linalg.norm(geometry, axis=2)
if params['dimension']==1:
fig, axc = plt.subplots(1,1, figsize=(8,8))
axc.set_xlabel(r'$x$')
axc.set_xlim(np.min(radius), np.max(radius))
axc.set_ylim(np.min(concentration), np.max(concentration))
axc.set_ylabel(r'$c(x,t)$')
cplot, = axc.plot(radius[0], concentration[0])
def update(value):
ti = np.abs(times-value).argmin()
cplot.set_ydata(concentration[ti])
cplot.set_xdata(radius[ti])
plt.draw()
sax = plt.axes([0.1, 0.92, 0.7, 0.02])
slider = Slider(sax, r'$t/\tau$', min(times), max(times),
valinit=min(times), valfmt='%3.1f',
fc='#999999')
slider.drawon = False
slider.on_changed(update)
plt.show()
else:
# heat map
n_cmap_vals = 16
scalar_cmap = 'viridis'
geometry = np.dstack((geometry, np.zeros(geometry.shape[0:2])))
cmin, cmax = np.min(concentration), np.max(concentration)
plotter = vd.plotter.Plotter(axes=0)
poly = vd.utils.buildPolyData(geometry[0], topology)
scalar_actor = vd.mesh.Mesh(poly)
#scalar_actor.computeNormals(points=True, cells=True)
scalar_actor.pointdata['concentration'] = concentration[0]
scalar_actor.cmap(scalar_cmap, concentration[0], vmin=cmin, vmax=cmax, n=n_cmap_vals)
scalar_actor.add_scalarbar(title = r'$c_{numerical}$',
pos=(0.8, 0.04), nlabels=2,
# titleYOffset=15, titleFontSize=28, size=(100, 600)
)
plotter += scalar_actor
def update(idx):
scalar_actor.points(pts=geometry[idx], transformed=False)
scalar_actor.pointdata['concentration'] = concentration[idx]
def slider_update(widget, event):
value = widget.GetRepresentation().GetValue()
idx = (abs(times-value)).argmin()
update(idx)
slider = plotter.add_slider(slider_update, pos=[(0.1,0.94), (0.5,0.94)],
xmin=times[0], xmax=times.max(),
value=times[0], title=r"$t/\tau$")
vd.show(interactive=(not offscreen), zoom=0.8)
if offscreen:
FPS = 10
movFile = '%s_numerical.mov' % params['timestamp']
fps = float(FPS)
command = "ffmpeg -y -r"
options = "-b:v 3600k -qscale:v 4 -vcodec mpeg4"
tmp_dir = TemporaryDirectory()
get_filename = lambda x: os.path.join(tmp_dir.name, x)
for tt in range(len(times)):
idx = (abs(times-times[tt])).argmin()
update(idx)
slider.GetRepresentation().SetValue(times[tt])
fr = get_filename("%03d.png" % tt)
vd.io.screenshot(fr)
os.system(command + " " + str(fps)
+ " -i " + tmp_dir.name + os.sep
+ "%03d.png " + options + " " + movFile)
tmp_dir.cleanup()
# c(r,t) vs r
fig1, axc1 = plt.subplots(1,1, figsize=(8,8))
axc1.set_xlabel(r'$r$')
axc1.set_xlim(np.min(radius), np.max(radius))
axc1.set_ylim(np.min(concentration), np.max(concentration))
axc1.set_ylabel(r'$c(r,t)$')
cplot, = axc1.plot(radius[0], concentration[0],'o', ms=3)
def update(value):
ti = np.abs(times-value).argmin()
cplot.set_ydata(concentration[ti])
cplot.set_xdata(radius[ti])
plt.draw()
sax = plt.axes([0.1, 0.92, 0.7, 0.02])
slider1 = Slider(sax, r'$t/\tau$', min(times), max(times),
valinit=min(times), valfmt='%3.1f',
fc='#999999')
slider1.drawon = False
slider1.on_changed(update)
plt.show()
if __name__ == '__main__':
import argparse, json
parser = argparse.ArgumentParser()
parser.add_argument('-j','--jsonfile', help='parameters file', required=True)
parser.add_argument('--onscreen', action=argparse.BooleanOptionalAction)
args = parser.parse_args()
with open(args.jsonfile) as jsonFile:
params = json.load(jsonFile)
visualize(params, DIR=os.path.dirname(args.jsonfile), offscreen=(not args.onscreen))
This source diff could not be displayed because it is too large. You can view the blob instead.
......@@ -152,8 +152,8 @@ if d==1:
axc.set_xlim(np.min(radius), np.max(radius))
axc.set_ylim(min(np.min(sol), np.min(concentration)), max(np.max(sol), np.max(concentration)))
axc.set_ylabel(r'$c(x,t)$')
c_exactplot, = axc.plot(radius[0], sol[0])
cplot, = axc.plot(radius[0], concentration[0])
c_exactplot, = axc.plot(radius[0], sol[0], label='$c_{exact}$')
cplot, = axc.plot(radius[0], concentration[0], label='$c_{numerical}$')
def update(value):
ti = np.abs(times-value).argmin()
......@@ -169,6 +169,7 @@ if d==1:
fc='#999999')
slider.drawon = False
slider.on_changed(update)
axc.legend()
plt.show()
else:
......@@ -184,7 +185,7 @@ else:
scalar_actor.pointdata['concentration'] = sol[0]
scalar_actor.cmap(scalar_cmap, sol[0], vmin=cmin, vmax=cmax, n=n_cmap_vals)
scalar_actor.add_scalarbar(title = r'$c_{analytical}$',
scalar_actor.add_scalarbar(title = r'$c_{analytical}}$',
pos=(0.8, 0.04), nlabels=2,
# titleYOffset=15, titleFontSize=28, size=(100, 600)
)
......@@ -192,7 +193,7 @@ else:
def update(idx):
scalar_actor.points(pts=geometry[idx], transformed=False)
scalar_actor.pointdata['concentration'] = sol[idx]
scalar_actor.pointdata['concentration'] =sol[idx]
def slider_update(widget, event):
value = widget.GetRepresentation().GetValue()
......@@ -204,7 +205,27 @@ else:
value=times[0], title=r"$t/\tau$")
vd.show(interactive=(not offscreen), zoom=0.8)
# make movie
if offscreen:
FPS = 10
movFile = '%s_analytical.mov' % timestamp
fps = float(FPS)
command = "ffmpeg -y -r"
options = "-b:v 3600k -qscale:v 4 -vcodec mpeg4"
tmp_dir = TemporaryDirectory()
get_filename = lambda x: os.path.join(tmp_dir.name, x)
for tt in range(len(times)):
idx = (abs(times-times[tt])).argmin()
update(idx)
slider.GetRepresentation().SetValue(times[tt])
fr = get_filename("%03d.png" % tt)
vd.io.screenshot(fr)
os.system(command + " " + str(fps)
+ " -i " + tmp_dir.name + os.sep
+ "%03d.png " + options + " " + movFile)
tmp_dir.cleanup()
# c(r,t) vs r
fig1, axc1 = plt.subplots(1,1, figsize=(8,8))
axc1.set_xlabel(r'$r$')
axc1.set_xlim(np.min(radius), np.max(radius))
......@@ -221,32 +242,13 @@ else:
plt.draw()
sax = plt.axes([0.1, 0.92, 0.7, 0.02])
slider = Slider(sax, r'$t/\tau$', min(times), max(times),
slider1 = Slider(sax, r'$t/\tau$', min(times), max(times),
valinit=min(times), valfmt='%3.1f',
fc='#999999')
slider.drawon = False
slider.on_changed(update)
slider1.drawon = False
slider1.on_changed(update)
axc1.legend()
plt.show()
# make movie
if offscreen:
FPS = 10
movFile = '%s_analytical.mov' % timestamp
fps = float(FPS)
command = "ffmpeg -y -r"
options = "-b:v 3600k -qscale:v 4 -vcodec mpeg4"
tmp_dir = TemporaryDirectory()
get_filename = lambda x: os.path.join(tmp_dir.name, x)
for tt in range(len(times)):
idx = (abs(times-times[tt])).argmin()
update(idx)
slider.GetRepresentation().SetValue(times[tt])
fr = get_filename("%03d.png" % tt)
vd.io.screenshot(fr)
os.system(command + " " + str(fps)
+ " -i " + tmp_dir.name + os.sep
+ "%03d.png " + options + " " + movFile)
tmp_dir.cleanup()
import dolfin as df
import numpy as np
import progressbar
# import progressbar
import scipy as sc
import os
df.set_log_level(df.LogLevel.ERROR)
......@@ -12,11 +13,11 @@ class GrowthDiffusion(object):
for key in parameters:
setattr(self, key, parameters[key])
sigma = {'none': '0.0',
'linear': 'b/(L0+b*t)',
'linear': 'b/(L0 + b*t)',
'exponential': 'b'}
# define the growth velocity field
growth_direction = tuple(['x['+str(i)+']' for i in range(self.dimension)])
growth_direction = tuple(['x['+str(i)+']' for i in range(self.dimension)]) # r hat = (x hat, y hat)
#growth_direction = ('x[0]', )
self.growth_direction = df.Expression(growth_direction, degree=1)
self.sigma = df.Expression(sigma[self.growth],
......@@ -49,18 +50,23 @@ class GrowthDiffusion(object):
savesteps = int(self.savetime/self.timestep)
maxsteps = int(self.maxtime/self.timestep)
self.time = 0.0
cFile = df.XDMFFile('%s_concentration.xdmf' %params['timestamp'])
cFile = df.XDMFFile('%s_concentration.xdmf' %params['timestamp']) # create the file here
# initial condition
r2 = "+".join(['x['+str(i)+']*x['+str(i)+']' for i in range(self.dimension)])
c0 = '1 + 0.1*cos(pi*m*(%s)/L)' % r2
c0 = df.Expression(c0, pi=np.pi, m=1, L=self.system_size, degree=1)
if self.dimension==2:
c0 = '1 + jn(0, m*sqrt(%s)/L)' % r2
c0 = df.Expression(c0, m=sc.special.jn_zeros(1,1), L=self.system_size, degree=1)
else:
c0 = '1 + cos(m*pi*sqrt(%s)/L)' % r2
c0 = df.Expression(c0, pi=np.pi, m=2, L=self.system_size, degree=1)
self.c0.assign(df.project(c0, self.SFS))
# save data
cFile.write_checkpoint(self.c0, 'concentration', self.time)
# time stepping
for steps in progressbar.progressbar(range(1, maxsteps + 1)):
for steps in range(1, maxsteps + 1):
# get velocity
self.sigma.t = self.time
self.velocity.assign(df.project(self.sigma*self.growth_direction, self.VFS))
......@@ -75,13 +81,15 @@ class GrowthDiffusion(object):
displacement = df.project(self.velocity*self.timestep, self.VFS)
df.ALE.move(self.mesh, displacement)
self.time += self.timestep
cFile.close()
if __name__ == '__main__':
import json, datetime
assert os.path.isfile('parameters.json'), 'parameters.json file not found'
assert os.path.isfile('parameters.json'), 'parameters.json file not found' # assert that parameters.json is a valid file, otherwise
# give an error message parameters.json file not found
# load the parameters
with open('parameters.json') as jsonFile:
params = json.load(jsonFile)
......
import argparse, json
import dolfin as df
import numpy as np
import scipy as sc
import matplotlib.pyplot as plt
df.set_log_level(df.LogLevel.ERROR)
df.parameters['form_compiler']['optimize'] = True
#################### read parameters except dt ###############################################################
parser = argparse.ArgumentParser()
parser.add_argument('-j','--jsonfile', help='parameters file', required=True)
args = parser.parse_args()
with open(args.jsonfile) as jsonFile:
params = json.load(jsonFile)
dt_array = np.linspace(0.01, 0.1, 11)
error_array = np.zeros(len(dt_array))
for i in range(0, len(dt_array)):
####################### compute numerical solution ########################################################
# set up mesh
if params['dimension']==1:
mesh = df.IntervalMesh(params['resolution'], 0, params['system_size'])
elif params['dimension']==2:
mesh = df.Mesh('disk.xml.gz')
# function space, function, test function
SFS = df.FunctionSpace(mesh, 'P', 1)
VFS = df.VectorFunctionSpace(mesh, 'P', 1)
c = df.Function(SFS)
c0 = df.Function(SFS)
tc = df.TestFunction(SFS)
# initial condition
r2 = "+".join(['x['+str(i)+']*x['+str(i)+']' for i in range(params['dimension'])])
if params['dimension']==2:
c0 = '1 + jn(0, m*sqrt(%s)/L)' % r2
c0 = df.interpolate(df.Expression(c0, m=sc.special.jn_zeros(1,1), L=params['system_size'], degree=1), SFS)
else:
c0 = '1 + cos(m*pi*sqrt(%s)/L)' % r2
c0 = df.interpolate(df.Expression(c0, pi=np.pi, m=2, L=params['system_size'], degree=1), SFS)
# initialize arrays
maxsteps = int(params['maxtime']/dt_array[i])
times = np.linspace(0, params['maxtime'], maxsteps+1)
radius = np.zeros((len(times), mesh.num_vertices()))
radius[0] = np.linalg.norm(mesh.coordinates()[:], axis=1)
c_numerical = np.zeros_like(radius)
c_numerical[0] = c0.compute_vertex_values(mesh)
# velocity
sigma = {'none': '0.0',
'linear': 'b/(L0 + b*t)',
'exponential': 'b'}
growth_direction = tuple(['x['+str(i)+']' for i in range(params['dimension'])]) # r hat = (x hat, y hat)
#growth_direction = ('x[0]', )
growth_direction = df.Expression(growth_direction, degree=1)
sigma = df.Expression(sigma[params['growth']],
L0=params['system_size'],
b=params['growth_parameter'],
t=0, degree=1)
velocity = df.project(sigma * growth_direction, VFS)
# form
form = ( df.inner((c - c0)/dt_array[i], tc)
+ df.inner(df.div(velocity*c0), tc)
+ params['Dc'] * df.inner(df.nabla_grad(c), df.nabla_grad(tc))
- df.inner(params['reaction_rate'] * c, tc) ) * df.dx
# time stepping
time = 0.0
for steps in range(1, maxsteps + 1):
# get velocity
sigma.t = time
velocity.assign(df.project(sigma*growth_direction, VFS))
# solve numerically
df.solve(form == 0, c)
c_numerical[steps] = c.compute_vertex_values(mesh)
# update
c0.assign(c)
# move mesh
displacement = df.project(velocity*dt_array[i], VFS)
df.ALE.move(mesh, displacement)
# update radius array
radius[steps] = np.linalg.norm(mesh.coordinates()[:], axis=1)
#update time
time += dt_array[i]
##################### compute analytical solution ########################################
# modes entering the solution
p, m = 1, 2
if params['dimension'] == 1:
mode = m * np.pi
else:
mode = sc.special.jn_zeros(1, p) # pth zero of bessel 1
# initialize arrays
c_analytical = np.zeros_like(radius)
# time loop
t=0
for steps in range(0, maxsteps + 1):
mode_indep_time_part = {'none': np.exp(params['reaction_rate']*t),
'exponential': np.exp((params['reaction_rate'] - params['dimension'] * params['growth_parameter']) * t),
'linear': (params['system_size']/(params['system_size'] + params['growth_parameter'] * t))**params['dimension'] * np.exp(params['reaction_rate']*t)
}
mode_dep_time_part = {'none': np.exp(-mode**2 * params['Dc'] * t/params['system_size']**2),
'exponential': np.exp(-mode**2 * params['Dc'] * (1 - np.exp(-2 * params['growth_parameter'] * t))/(2 * params['growth_parameter'] * params['system_size']**2)),
'linear': np.exp(-mode**2 * params['Dc'] * t/(params['system_size']*(params['system_size'] + params['growth_parameter'] * t)))
}
domain_length = {'none': params['system_size'],
'exponential': params['system_size'] * np.exp(params['growth_parameter'] * t),
'linear': params['system_size'] + params['growth_parameter'] * t}
if params['dimension']==1:
c_analytical[steps] = (1 + mode_dep_time_part[params['growth']] * np.cos(mode * radius[steps]/domain_length[params['growth']])) * mode_indep_time_part[params['growth']]
else:
c_analytical[steps] = (1 + mode_dep_time_part[params['growth']] * sc.special.j0(mode * radius[steps]/ domain_length[params['growth']])) * mode_indep_time_part[params['growth']]
t += dt_array[i]
###################### compute error ######################################################
error_array[i] = np.max(np.abs(c_numerical[-1]-c_analytical[-1]))
##################### plot error vs dt ####################################################
plt.loglog(dt_array, error_array, 'bo')
plt.xlabel('log(dt)')
plt.ylabel('log(error)')
plt.show()
......@@ -98,7 +98,7 @@ def visualize(params, DIR='', offscreen=False):
#scalar_actor.computeNormals(points=True, cells=True)
scalar_actor.pointdata['concentration'] = concentration[0]
scalar_actor.cmap(scalar_cmap, concentration[0], vmin=cmin, vmax=cmax, n=n_cmap_vals)
scalar_actor.add_scalarbar(title = r'$c$',
scalar_actor.add_scalarbar(title = r'$c_{numerical}$',
pos=(0.8, 0.04), nlabels=2,
# titleYOffset=15, titleFontSize=28, size=(100, 600)
)
......@@ -118,7 +118,24 @@ def visualize(params, DIR='', offscreen=False):
value=times[0], title=r"$t/\tau$")
vd.show(interactive=(not offscreen), zoom=0.8)
if offscreen:
FPS = 10
movFile = '%s_numerical.mov' % params['timestamp']
fps = float(FPS)
command = "ffmpeg -y -r"
options = "-b:v 3600k -qscale:v 4 -vcodec mpeg4"
tmp_dir = TemporaryDirectory()
get_filename = lambda x: os.path.join(tmp_dir.name, x)
for tt in range(len(times)):
idx = (abs(times-times[tt])).argmin()
update(idx)
slider.GetRepresentation().SetValue(times[tt])
fr = get_filename("%03d.png" % tt)
vd.io.screenshot(fr)
os.system(command + " " + str(fps)
+ " -i " + tmp_dir.name + os.sep
+ "%03d.png " + options + " " + movFile)
tmp_dir.cleanup()
# c(r,t) vs r
fig1, axc1 = plt.subplots(1,1, figsize=(8,8))
......@@ -135,33 +152,13 @@ def visualize(params, DIR='', offscreen=False):
plt.draw()
sax = plt.axes([0.1, 0.92, 0.7, 0.02])
slider = Slider(sax, r'$t/\tau$', min(times), max(times),
slider1 = Slider(sax, r'$t/\tau$', min(times), max(times),
valinit=min(times), valfmt='%3.1f',
fc='#999999')
slider.drawon = False
slider.on_changed(update)
slider1.drawon = False
slider1.on_changed(update)
plt.show()
# make movie
if offscreen:
FPS = 10
movFile = '%s.mov' % params['timestamp']
fps = float(FPS)
command = "ffmpeg -y -r"
options = "-b:v 3600k -qscale:v 4 -vcodec mpeg4"
tmp_dir = TemporaryDirectory()
get_filename = lambda x: os.path.join(tmp_dir.name, x)
for tt in range(len(times)):
idx = (abs(times-times[tt])).argmin()
update(idx)
slider.GetRepresentation().SetValue(times[tt])
fr = get_filename("%03d.png" % tt)
vd.io.screenshot(fr)
os.system(command + " " + str(fps)
+ " -i " + tmp_dir.name + os.sep
+ "%03d.png " + options + " " + movFile)
tmp_dir.cleanup()
if __name__ == '__main__':
import argparse, json
......
import dolfin as df
import numpy as np
# import progressbar
import scipy as sc
import os
df.set_log_level(df.LogLevel.ERROR)
df.parameters['form_compiler']['optimize'] = True
class GrowthDiffusion(object):
def __init__(self, parameters):
# read in parameters
for key in parameters:
setattr(self, key, parameters[key])
sigma = {'none': '0.0',
'linear': 'b/(L0 + b*t)',
'exponential': 'b'}
# define the growth velocity field
growth_direction = tuple(['x['+str(i)+']' for i in range(self.dimension)]) # r hat = (x hat, y hat)
#growth_direction = ('x[0]', )
self.growth_direction = df.Expression(growth_direction, degree=1)
self.sigma = df.Expression(sigma[self.growth],
L0=self.system_size,
b=self.growth_parameter,
t=0, degree=1)
# set up mesh
if self.dimension==1:
self.mesh = df.IntervalMesh(self.resolution, 0, self.system_size)
elif self.dimension==2:
self.mesh = df.Mesh('disk.xml.gz')
elif self.dimension==3:
self.mesh = df.Mesh('sphere.xml.gz')
# create mesh, function space, define function, test function
self.SFS = df.FunctionSpace(self.mesh, 'P', 1)
self.VFS = df.VectorFunctionSpace(self.mesh, 'P', 1)
self.c1 = df.Function(self.SFS)
self.c2 = df.Function(self.SFS)
self.c10 = df.Function(self.SFS)
self.c20 = df.Function(self.SFS)
tc1 = df.TestFunction(self.SFS)
tc2 = df.TestFunction(self.SFS)
def diffusion(conc, tconc):
return df.inner(df.nabla_grad(conc), df.nabla_grad(tconc))
def advection(velocity, conc0, tconc):
return df.inner(df.div(velocity * conc0), tconc)
def reaction_c1(a, b, c1, c2, tconc):
return df.inner(a - (b+1)*c1 + c1**2*c2, tconc)
def reaction_c2(a, b, c1, c2, tconc):
return df.inner(b*c1 - c1**2*c2, tconc)
self.velocity = df.project(self.sigma * self.growth_direction, self.VFS)
form1 = df.inner((self.c1 - self.c10)/self.timestep, tc1)
+ advection(self.velocity, self.c10, tc1)
+ self.Dc1 * diffusion(self.c1, tc1)
- reaction_c1(self.a, self.b, self.c1, self.c2, tc1))
form2 = df.inner((self.c2 - self.c20)/self.timestep, tc2)
+ advection(self.velocity, self.c20, tc2)
+ self.Dc2 * diffusion(self.c2, tc2)
- reaction_c2(self.a, self.b, self.c1, self.c2, tc1))
self.form = ( form1 + form2 ) * df.dx
def solve(self):
savesteps = int(self.savetime/self.timestep)
maxsteps = int(self.maxtime/self.timestep)
self.time = 0.0
c1File = df.XDMFFile('%s_c1.xdmf' %params['timestamp']) # create the file here
c2File = df.XDMFFile('%s_c2' %params['timestamp'])
# initial condition
r2 = "+".join(['x['+str(i)+']*x['+str(i)+']' for i in range(self.dimension)])
if self.dimension==2:
c10 = '1 + jn(0, m*sqrt(%s)/L)' % r2
c10 = df.Expression(c10, m=sc.special.jn_zeros(1,1), L=self.system_size, degree=1)
c20 = '1 + jn(0, m*sqrt(%s)/L)' % r2
c20 = df.Expression(c20, m=sc.special.jn_zeros(1,2), L=self.system_size, degree=1)
else:
c10 = '1 + cos(m*pi*sqrt(%s)/L)' % r2
c10 = df.Expression(c10, pi=np.pi, m=2, L=self.system_size, degree=1)
c20 = '1 + cos(m*pi*sqrt(%s)/L)' % r2
c20 = df.Expression(c20, pi=np.pi, m=3, L=self.system_size, degree=1)
self.c10.assign(df.project(c10, self.SFS))
self.c20.assign(df.project(c20, self.SFS))
# save data
c1File.write_checkpoint(self.c10, 'c1', self.time)
c2File.write_checkpoint(self.c20, 'c2', self.time)
# time stepping
for steps in range(1, maxsteps + 1):
# get velocity
self.sigma.t = self.time
self.velocity.assign(df.project(self.sigma*self.growth_direction, self.VFS))
# solve
df.solve(self.form == 0, self.c)
# update
self.c0.assign(self.c)
# save data
if steps % savesteps == 0:
cFile.write_checkpoint(self.c0, 'concentration', self.time, append=True)
# move mesh
displacement = df.project(self.velocity*self.timestep, self.VFS)
df.ALE.move(self.mesh, displacement)
self.time += self.timestep
cFile.close()
if __name__ == '__main__':
import json, datetime
assert os.path.isfile('parameters.json'), 'parameters.json file not found' # assert that parameters.json is a valid file, otherwise
# give an error message parameters.json file not found
# load the parameters
with open('parameters.json') as jsonFile:
params = json.load(jsonFile)
# parse parameters
assert params['dimension'] in (1,2,3)
assert params['growth'] in ('none','linear','exponential'), 'Unknown growth model'
if params['dimension'] in (2,3):
del params['resolution']
timestamp = datetime.datetime.now().strftime("%d%m%y-%H%M%S")
params['timestamp'] = timestamp
gd = GrowthDiffusion(params)
gd.solve()
with open(params['timestamp'] + '_parameters.json', "w") as fp:
json.dump(params, fp, indent=4)
\ No newline at end of file
{
"dimension" : 2,
"resolution" : 15,
"system_size" : 1,
"Dc" : 0.1,
"growth" : "linear",
"growth_parameter" : 0.1,
"reaction_rate" : 0.0,
"timestep" : 0.01,
"savetime": 0.1,
"maxtime" : 5.0
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment