Skip to content
Snippets Groups Projects
Commit 09c9f502 authored by Jan Caron's avatar Jan Caron
Browse files

Merge

parents fa0ff9d0 a453cbad
No related branches found
No related tags found
No related merge requests found
# -*- coding: utf-8 -*-
"""This module provides the :class:`~.Costfunction` class which represents a strategy to calculate
the so called `cost` of a threedimensional magnetization distribution."""
import numpy as np
from scipy.sparse.linalg import LinearOperator
from scipy.sparse import eye
from pyramid.forwardmodel import ForwardModel
from pyramid.regularisator import ZeroOrderRegularisator
import logging
class Costfunction(object):
'''Class for calculating the cost of a 3D magnetic distributions in relation to 2D phase maps.
Represents a strategy for the calculation of the `cost` of a 3D magnetic distribution in
relation to two-dimensional phase maps. The `cost` is a measure for the difference of the
simulated phase maps from the magnetic distributions to the given set of phase maps and from
a priori knowledge represented by a :class:`~.Regularisator` object. Furthermore this class
provides convenient methods for the calculation of the derivative :func:`~.jac` or the product
with the Hessian matrix :func:`~.hess_dot` of the costfunction, which can be used by
optimizers. All required data should be given in a :class:`~DataSet` object.
Attributes
----------
data_set: :class:`~dataset.DataSet`
:class:`~dataset.DataSet` object, which stores all information for the cost calculation.
regularisator : :class:`~.Regularisator`
Regularisator class that's responsible for the regularisation term.
regularisator: :class:`~Regularisator`
y : :class:`~numpy.ndarray` (N=1)
Vector which lists all pixel values of all phase maps one after another.
fwd_model : :class:`~.ForwardModel`
The Forward model instance which should be used for the simulation of the phase maps which
will be compared to `y`.
Se_inv : :class:`~numpy.ndarray` (N=2), optional
Inverted covariance matrix of the measurement errors. The matrix has size `NxN` with N
being the length of the targetvector y (vectorized phase map information).
m: int
Size of the image space.
n: int
Size of the input space.
'''
LOG = logging.getLogger(__name__+'.Costfunction')
def __init__(self, data_set, regularisator):
self.LOG.debug('Calling __init__')
self.data_set = data_set
self.fwd_model = ForwardModel(data_set)
self.regularisator = regularisator
# Extract important information:
self.y = data_set.phase_vec
self.Se_inv = data_set.Se_inv
self.n = data_set.n
self.m = data_set.m
self.LOG.debug('Created '+str(self))
def __repr__(self):
self.LOG.debug('Calling __repr__')
return '%s(data_set=%r, fwd_model=%r, regularisator=%r)' % \
(self.__class__, self.data_set, self.fwd_model, self.regularisator)
def __str__(self):
self.LOG.debug('Calling __str__')
return 'Costfunction(data_set=%s, fwd_model=%s, regularisator=%s)' % \
(self.data_set, self.fwd_model, self.regularisator)
def init(self, x):
self(x)
def __call__(self, x):
self.LOG.debug('Calling __call__')
delta_y = self.fwd_model(x) - self.y
self.chisq = delta_y.dot(self.Se_inv.dot(delta_y)) + self.regularisator(x)
return self.chisq
def jac(self, x):
'''Calculate the derivative of the costfunction for a given magnetization distribution.
Parameters
----------
x : :class:`~numpy.ndarray` (N=1)
Vectorized magnetization distribution, for which the Jacobi vector is calculated.
Returns
-------
result : :class:`~numpy.ndarray` (N=1)
Jacobi vector which represents the cost derivative of all voxels of the magnetization.
'''
self.LOG.debug('Calling jac')
assert len(x) == self.n
return (2 * self.fwd_model.jac_T_dot(x, self.Se_inv.dot(self.fwd_model(x) - self.y))
+ self.regularisator.jac(x))
def hess_dot(self, x, vector):
'''Calculate the product of a `vector` with the Hessian matrix of the costfunction.
Parameters
----------
x : :class:`~numpy.ndarray` (N=1)
Vectorized magnetization distribution at which the Hessian is calculated. The Hessian
is constant in this case, thus `x` can be set to None (it is not used int the
computation). It is implemented for the case that in the future nonlinear problems
have to be solved.
vector : :class:`~numpy.ndarray` (N=1)
Vectorized magnetization distribution which is multiplied by the Hessian.
Returns
-------
result : :class:`~numpy.ndarray` (N=1)
Product of the input `vector` with the Hessian matrix of the costfunction.
'''
self.LOG.debug('Calling hess_dot')
return (2 * self.fwd_model.jac_T_dot(x, self.Se_inv.dot(self.fwd_model.jac_dot(x, vector)))
+ self.regularisator.hess_dot(x, vector))
def hess_diag(self, _):
return np.ones(self.n)
class CFAdapterScipyCG(LinearOperator):
'''Adapter class making the :class:`~.Costfunction` class accessible for scipy cg methods.
This class provides an adapter for the :class:`~.Costfunction` to be usable with the
:func:`~.scipy.sparse.linalg.cg` function. the :func:`~.matvec` function is overwritten to
implement a multiplication with the Hessian of the adapted costfunction. This is used in the
:func:`~pyramid.reconstruction.optimise_sparse_cg` function of the
:mod:`~pyramid.reconstruction` module.
Attributes
----------
cost : :class:`~.Costfunction`
Costfunction which should be made usable in the :func:`~.scipy.sparse.linalg.cg` function.
'''
# TODO: make obsolete!
LOG = logging.getLogger(__name__+'.CFAdapterScipyCG')
def __init__(self, cost):
self.LOG.debug('Calling __init__')
self.cost = cost
def matvec(self, vector):
'''Matrix-vector multiplication with the Hessian of the adapted costfunction.
Parameters
----------
vector : :class:`~numpy.ndarray` (N=1)
Vector which will be multiplied by the Hessian matrix provided by the adapted
costfunction.
'''
self.LOG.debug('Calling matvec')
return self.cost.hess_dot(None, vector)
@property
def shape(self):
return (self.cost.data_set.n, self.cost.data_set.n)
@property
def dtype(self):
return np.dtype("d")
# -*- coding: utf-8 -*-
"""This module provides the :class:`~.Costfunction` class which represents a strategy to calculate
the so called `cost` of a threedimensional magnetization distribution."""
import numpy as np
from scipy.sparse.linalg import LinearOperator
from scipy.sparse import eye
from pyramid.forwardmodel import ForwardModel
from pyramid.regularisator import ZeroOrderRegularisator
import logging
class Costfunction(object):
'''Class for calculating the cost of a 3D magnetic distributions in relation to 2D phase maps.
Represents a strategy for the calculation of the `cost` of a 3D magnetic distribution in
relation to two-dimensional phase maps. The `cost` is a measure for the difference of the
simulated phase maps from the magnetic distributions to the given set of phase maps and from
a priori knowledge represented by a :class:`~.Regularisator` object. Furthermore this class
provides convenient methods for the calculation of the derivative :func:`~.jac` or the product
with the Hessian matrix :func:`~.hess_dot` of the costfunction, which can be used by
optimizers. All required data should be given in a :class:`~DataSet` object.
Attributes
----------
data_set: :class:`~dataset.DataSet`
:class:`~dataset.DataSet` object, which stores all information for the cost calculation.
regularisator : :class:`~.Regularisator`
Regularisator class that's responsible for the regularisation term.
regularisator: :class:`~Regularisator`
y : :class:`~numpy.ndarray` (N=1)
Vector which lists all pixel values of all phase maps one after another.
fwd_model : :class:`~.ForwardModel`
The Forward model instance which should be used for the simulation of the phase maps which
will be compared to `y`.
Se_inv : :class:`~numpy.ndarray` (N=2), optional
Inverted covariance matrix of the measurement errors. The matrix has size `NxN` with N
being the length of the targetvector y (vectorized phase map information).
m: int
Size of the image space.
n: int
Size of the input space.
'''
LOG = logging.getLogger(__name__+'.Costfunction')
def __init__(self, data_set, regularisator):
self.LOG.debug('Calling __init__')
self.data_set = data_set
self.fwd_model = ForwardModel(data_set)
self.regularisator = regularisator
# Extract important information:
self.y = data_set.phase_vec
self.Se_inv = data_set.Se_inv
self.n = data_set.n
self.m = data_set.m
self.LOG.debug('Created '+str(self))
def __repr__(self):
self.LOG.debug('Calling __repr__')
return '%s(data_set=%r, fwd_model=%r, regularisator=%r)' % \
(self.__class__, self.data_set, self.fwd_model, self.regularisator)
def __str__(self):
self.LOG.debug('Calling __str__')
return 'Costfunction(data_set=%s, fwd_model=%s, regularisator=%s)' % \
(self.data_set, self.fwd_model, self.regularisator)
def init(self, x):
self(x)
def __call__(self, x):
self.LOG.debug('Calling __call__')
delta_y = self.fwd_model(x) - self.y
self.chisq_m = delta_y.dot(self.Se_inv.dot(delta_y))
self.chisq_a = self.regularisator(x)
self.chisq = self.chisq_m + self.chisq_a
return self.chisq
def jac(self, x):
'''Calculate the derivative of the costfunction for a given magnetization distribution.
Parameters
----------
x : :class:`~numpy.ndarray` (N=1)
Vectorized magnetization distribution, for which the Jacobi vector is calculated.
Returns
-------
result : :class:`~numpy.ndarray` (N=1)
Jacobi vector which represents the cost derivative of all voxels of the magnetization.
'''
self.LOG.debug('Calling jac')
assert len(x) == self.n
return (2 * self.fwd_model.jac_T_dot(x, self.Se_inv.dot(self.fwd_model(x) - self.y))
+ self.regularisator.jac(x))
def hess_dot(self, x, vector):
'''Calculate the product of a `vector` with the Hessian matrix of the costfunction.
Parameters
----------
x : :class:`~numpy.ndarray` (N=1)
Vectorized magnetization distribution at which the Hessian is calculated. The Hessian
is constant in this case, thus `x` can be set to None (it is not used int the
computation). It is implemented for the case that in the future nonlinear problems
have to be solved.
vector : :class:`~numpy.ndarray` (N=1)
Vectorized magnetization distribution which is multiplied by the Hessian.
Returns
-------
result : :class:`~numpy.ndarray` (N=1)
Product of the input `vector` with the Hessian matrix of the costfunction.
'''
self.LOG.debug('Calling hess_dot')
return (2 * self.fwd_model.jac_T_dot(x, self.Se_inv.dot(self.fwd_model.jac_dot(x, vector)))
+ self.regularisator.hess_dot(x, vector))
def hess_diag(self, _):
return np.ones(self.n)
class CFAdapterScipyCG(LinearOperator):
'''Adapter class making the :class:`~.Costfunction` class accessible for scipy cg methods.
This class provides an adapter for the :class:`~.Costfunction` to be usable with the
:func:`~.scipy.sparse.linalg.cg` function. the :func:`~.matvec` function is overwritten to
implement a multiplication with the Hessian of the adapted costfunction. This is used in the
:func:`~pyramid.reconstruction.optimise_sparse_cg` function of the
:mod:`~pyramid.reconstruction` module.
Attributes
----------
cost : :class:`~.Costfunction`
Costfunction which should be made usable in the :func:`~.scipy.sparse.linalg.cg` function.
'''
# TODO: make obsolete!
LOG = logging.getLogger(__name__+'.CFAdapterScipyCG')
def __init__(self, cost):
self.LOG.debug('Calling __init__')
self.cost = cost
def matvec(self, vector):
'''Matrix-vector multiplication with the Hessian of the adapted costfunction.
Parameters
----------
vector : :class:`~numpy.ndarray` (N=1)
Vector which will be multiplied by the Hessian matrix provided by the adapted
costfunction.
'''
self.LOG.debug('Calling matvec')
return self.cost.hess_dot(None, vector)
@property
def shape(self):
return (self.cost.data_set.n, self.cost.data_set.n)
@property
def dtype(self):
return np.dtype("d")
# -*- coding: utf-8 -*-
"""This module provides the :class:`~.DataSet` class for the collection of phase maps
and additional data like corresponding projectors."""
import numpy as np
from numbers import Number
import scipy.sparse as sp
import matplotlib.pyplot as plt
from pyramid.phasemap import PhaseMap
from pyramid.phasemapper import PhaseMapperRDFC
from pyramid.projector import Projector
from pyramid.kernel import Kernel
import logging
class DataSet(object):
'''Class for collecting phase maps and corresponding projectors.
Represents a collection of (e.g. experimentally derived) phase maps, stored as
:class:`~.PhaseMap` objects and corresponding projectors stored as :class:`~.Projector`
objects. At creation, the grid spacing `a` and the dimension `dim` of the magnetization
distribution have to be given. Data can be added via the :func:`~.append` method, where
a :class:`~.PhaseMap`, a :class:`~.Projector` and additional info have to be given.
Attributes
----------
a: float
The grid spacing in nm.
dim: tuple (N=3)
Dimensions of the 3D magnetization distribution.
phase_maps:
A list of all stored :class:`~.PhaseMap` objects.
b_0: double
The saturation induction in `T`.
mask: :class:`~numpy.ndarray` (N=3), optional
A boolean mask which defines the magnetized volume in 3D.
projectors: list of :class:`~.Projector`
A list of all stored :class:`~.Projector` objects.
phase_maps: list of :class:`~.PhaseMap`
A list of all stored :class:`~.PhaseMap` objects.
phase_vec: :class:`~numpy.ndarray` (N=1)
The concatenaded, vectorized phase of all ;class:`~.PhaseMap` objects.
m: int
Size of the image space.
n: int
Size of the input space.
'''
LOG = logging.getLogger(__name__+'.DataSet')
@property
def m(self):
return np.sum([len(p.phase_vec) for p in self.phase_maps])
@property
def Se_inv(self):
# TODO: better implementation, maybe get-method? more flexible? input in append?
return sp.eye(self.m)
@property
def phase_vec(self):
return np.concatenate([p.phase_vec for p in self.phase_maps])
@property
def hook_points(self):
result = [0]
for i, phase_map in enumerate(self.phase_maps):
result.append(result[i]+np.prod(phase_map.dim_uv))
return result
@property
def phase_mappers(self):
dim_uv_list = np.unique([p.dim_uv for p in self.projectors])
kernel_list = [Kernel(self.a, tuple(dim_uv)) for dim_uv in dim_uv_list]
return {kernel.dim_uv: PhaseMapperRDFC(kernel) for kernel in kernel_list}
def __init__(self, a, dim, b_0=1, mask=None):
self.LOG.debug('Calling __init__')
assert isinstance(a, Number), 'Grid spacing has to be a number!'
assert a >= 0, 'Grid spacing has to be a positive number!'
assert isinstance(dim, tuple) and len(dim) == 3, \
'Dimension has to be a tuple of length 3!'
if mask is not None:
assert mask.shape == dim, 'Mask dimensions must match!'
self.n = 3 * np.sum(self.mask)
else:
self.n = 3 * np.prod(dim)
self.a = a
self.dim = dim
self.b_0 = b_0
self.mask = mask
self.phase_maps = []
self.projectors = []
self.LOG.debug('Created: '+str(self))
def __repr__(self):
self.LOG.debug('Calling __repr__')
return '%s(a=%r, dim=%r, b_0=%r)' % (self.__class__, self.a, self.dim, self.b_0)
def __str__(self):
self.LOG.debug('Calling __str__')
return 'DataSet(a=%s, dim=%s, b_0=%s)' % (self.a, self.dim, self.b_0)
def append(self, phase_map, projector): # TODO: include Se_inv or 2D mask??
'''Appends a data pair of phase map and projection infos to the data collection.`
Parameters
----------
phase_map: :class:`~.PhaseMap`
A :class:`~.PhaseMap` object which should be added to the data collection.
projector: :class:`~.Projector`
A :class:`~.Projector` object which should be added to the data collection.
Returns
-------
None
'''
self.LOG.debug('Calling append')
assert isinstance(phase_map, PhaseMap) and isinstance(projector, Projector), \
'Argument has to be a tuple of a PhaseMap and a Projector object!'
assert projector.dim == self.dim, '3D dimensions must match!'
assert phase_map.dim_uv == projector.dim_uv, 'Projection dimensions (dim_uv) must match!'
self.phase_maps.append(phase_map)
self.projectors.append(projector)
def create_phase_maps(self, mag_data):
'''Create a list of phasemaps with the projectors in the dataset for a given
:class:`~.MagData` object.
Parameters
----------
mag_data : :class:`~.MagData`
Magnetic distribution to which the projectors of the dataset should be applied.
Returns
-------
phase_maps : list of :class:`~.phasemap.PhaseMap`
A list of the phase maps resulting from the projections specified in the dataset.
'''
return [self.phase_mappers[projector.dim_uv](projector(mag_data))
for projector in self.projectors]
def display_phase(self, mag_data=None, title='Phase Map',
cmap='RdBu', limit=None, norm=None):
'''Display all phasemaps saved in the :class:`~.DataSet` as a colormesh.
Parameters
----------
mag_data : :class:`~.MagData`, optional
Magnetic distribution to which the projectors of the dataset should be applied. If not
given, the phase_maps in the dataset are used.
title : string, optional
The main part of the title of the plots. The default is 'Phase Map'. Additional
projector info is appended to this.
cmap : string, optional
The :class:`~matplotlib.colors.Colormap` which is used for the plots as a string.
The default is 'RdBu'.
limit : float, optional
Plotlimit for the phase in both negative and positive direction (symmetric around 0).
If not specified, the maximum amplitude of the phase is used.
norm : :class:`~matplotlib.colors.Normalize` or subclass, optional
Norm, which is used to determine the colors to encode the phase information.
If not specified, :class:`~matplotlib.colors.Normalize` is automatically used.
Returns
-------
None
'''
self.LOG.debug('Calling display')
if mag_data is not None:
phase_maps = self.create_phase_maps(mag_data)
else:
phase_maps = self.phase_maps
[phase_map.display_phase('{} ({})'.format(title, self.projectors[i].get_info()),
cmap, limit, norm)
for (i, phase_map) in enumerate(phase_maps)]
plt.show()
def display_combined(self, mag_data=None, title='Combined Plot', cmap='RdBu', limit=None,
norm=None, gain=1, interpolation='none', grad_encode='bright'):
'''Display all phasemaps and the resulting color coded holography images.
Parameters
----------
mag_data : :class:`~.MagData`, optional
Magnetic distribution to which the projectors of the dataset should be applied. If not
given, the phase_maps in the dataset are used.
title : string, optional
The title of the plot. The default is 'Combined Plot'.
cmap : string, optional
The :class:`~matplotlib.colors.Colormap` which is used for the plot as a string.
The default is 'RdBu'.
limit : float, optional
Plotlimit for the phase in both negative and positive direction (symmetric around 0).
If not specified, the maximum amplitude of the phase is used.
norm : :class:`~matplotlib.colors.Normalize` or subclass, optional
Norm, which is used to determine the colors to encode the phase information.
If not specified, :class:`~matplotlib.colors.Normalize` is automatically used.
gain : float, optional
The gain factor for determining the number of contour lines in the holographic
contour map. The default is 1.
interpolation : {'none, 'bilinear', 'cubic', 'nearest'}, optional
Defines the interpolation method for the holographic contour map.
No interpolation is used in the default case.
grad_encode: {'bright', 'dark', 'color', 'none'}, optional
Encoding mode of the phase gradient. 'none' produces a black-white image, 'color' just
encodes the direction (without gradient strength), 'dark' modulates the gradient
strength with a factor between 0 and 1 and 'bright' (which is the default) encodes
the gradient strength with color saturation.
Returns
-------
None
'''
self.LOG.debug('Calling display_combined')
if mag_data is not None:
phase_maps = self.create_phase_maps(mag_data)
else:
phase_maps = self.phase_maps
[phase_map.display_combined('{} ({})'.format(title, self.projectors[i].get_info()),
cmap, limit, norm, gain, interpolation, grad_encode)
for (i, phase_map) in enumerate(phase_maps)]
plt.show()
# TODO: method for constructing 3D mask from 2D masks?
# -*- coding: utf-8 -*-
"""This module provides the :class:`~.DataSet` class for the collection of phase maps
and additional data like corresponding projectors."""
import numpy as np
from numbers import Number
import scipy.sparse as sp
import matplotlib.pyplot as plt
from pyramid.phasemap import PhaseMap
from pyramid.phasemapper import PhaseMapperRDFC
from pyramid.projector import Projector
from pyramid.kernel import Kernel
import logging
class DataSet(object):
'''Class for collecting phase maps and corresponding projectors.
Represents a collection of (e.g. experimentally derived) phase maps, stored as
:class:`~.PhaseMap` objects and corresponding projectors stored as :class:`~.Projector`
objects. At creation, the grid spacing `a` and the dimension `dim` of the magnetization
distribution have to be given. Data can be added via the :func:`~.append` method, where
a :class:`~.PhaseMap`, a :class:`~.Projector` and additional info have to be given.
Attributes
----------
a: float
The grid spacing in nm.
dim: tuple (N=3)
Dimensions of the 3D magnetization distribution.
phase_maps:
A list of all stored :class:`~.PhaseMap` objects.
b_0: double
The saturation induction in `T`.
mask: :class:`~numpy.ndarray` (N=3), optional
A boolean mask which defines the magnetized volume in 3D.
projectors: list of :class:`~.Projector`
A list of all stored :class:`~.Projector` objects.
phase_maps: list of :class:`~.PhaseMap`
A list of all stored :class:`~.PhaseMap` objects.
phase_vec: :class:`~numpy.ndarray` (N=1)
The concatenaded, vectorized phase of all ;class:`~.PhaseMap` objects.
m: int
Size of the image space.
n: int
Size of the input space.
'''
LOG = logging.getLogger(__name__+'.DataSet')
@property
def m(self):
return np.sum([len(p.phase_vec) for p in self.phase_maps])
@property
def Se_inv(self):
# TODO: better implementation, maybe get-method? more flexible? input in append?
return sp.eye(self.m)
@property
def phase_vec(self):
return np.concatenate([p.phase_vec for p in self.phase_maps])
@property
def hook_points(self):
result = [0]
for i, phase_map in enumerate(self.phase_maps):
result.append(result[i]+np.prod(phase_map.dim_uv))
return result
@property
def phase_mappers(self):
dim_uv_list = np.unique([p.dim_uv for p in self.projectors])
kernel_list = [Kernel(self.a, tuple(dim_uv)) for dim_uv in dim_uv_list]
return {kernel.dim_uv: PhaseMapperRDFC(kernel) for kernel in kernel_list}
def __init__(self, a, dim, b_0=1, mask=None):
self.LOG.debug('Calling __init__')
assert isinstance(a, Number), 'Grid spacing has to be a number!'
assert a >= 0, 'Grid spacing has to be a positive number!'
assert isinstance(dim, tuple) and len(dim) == 3, \
'Dimension has to be a tuple of length 3!'
if mask is not None:
assert mask.shape == dim, 'Mask dimensions must match!'
self.n = 3 * np.sum(mask)
else:
self.n = 3 * np.prod(dim)
self.a = a
self.dim = dim
self.b_0 = b_0
self.mask = mask
self.phase_maps = []
self.projectors = []
self.LOG.debug('Created: '+str(self))
def __repr__(self):
self.LOG.debug('Calling __repr__')
return '%s(a=%r, dim=%r, b_0=%r)' % (self.__class__, self.a, self.dim, self.b_0)
def __str__(self):
self.LOG.debug('Calling __str__')
return 'DataSet(a=%s, dim=%s, b_0=%s)' % (self.a, self.dim, self.b_0)
def append(self, phase_map, projector): # TODO: include Se_inv or 2D mask??
'''Appends a data pair of phase map and projection infos to the data collection.`
Parameters
----------
phase_map: :class:`~.PhaseMap`
A :class:`~.PhaseMap` object which should be added to the data collection.
projector: :class:`~.Projector`
A :class:`~.Projector` object which should be added to the data collection.
Returns
-------
None
'''
self.LOG.debug('Calling append')
assert isinstance(phase_map, PhaseMap) and isinstance(projector, Projector), \
'Argument has to be a tuple of a PhaseMap and a Projector object!'
assert projector.dim == self.dim, '3D dimensions must match!'
assert phase_map.dim_uv == projector.dim_uv, 'Projection dimensions (dim_uv) must match!'
self.phase_maps.append(phase_map)
self.projectors.append(projector)
def create_phase_maps(self, mag_data):
'''Create a list of phasemaps with the projectors in the dataset for a given
:class:`~.MagData` object.
Parameters
----------
mag_data : :class:`~.MagData`
Magnetic distribution to which the projectors of the dataset should be applied.
Returns
-------
phase_maps : list of :class:`~.phasemap.PhaseMap`
A list of the phase maps resulting from the projections specified in the dataset.
'''
return [self.phase_mappers[projector.dim_uv](projector(mag_data))
for projector in self.projectors]
def display_phase(self, mag_data=None, title='Phase Map',
cmap='RdBu', limit=None, norm=None):
'''Display all phasemaps saved in the :class:`~.DataSet` as a colormesh.
Parameters
----------
mag_data : :class:`~.MagData`, optional
Magnetic distribution to which the projectors of the dataset should be applied. If not
given, the phase_maps in the dataset are used.
title : string, optional
The main part of the title of the plots. The default is 'Phase Map'. Additional
projector info is appended to this.
cmap : string, optional
The :class:`~matplotlib.colors.Colormap` which is used for the plots as a string.
The default is 'RdBu'.
limit : float, optional
Plotlimit for the phase in both negative and positive direction (symmetric around 0).
If not specified, the maximum amplitude of the phase is used.
norm : :class:`~matplotlib.colors.Normalize` or subclass, optional
Norm, which is used to determine the colors to encode the phase information.
If not specified, :class:`~matplotlib.colors.Normalize` is automatically used.
Returns
-------
None
'''
self.LOG.debug('Calling display')
if mag_data is not None:
phase_maps = self.create_phase_maps(mag_data)
else:
phase_maps = self.phase_maps
[phase_map.display_phase('{} ({})'.format(title, self.projectors[i].get_info()),
cmap, limit, norm)
for (i, phase_map) in enumerate(phase_maps)]
plt.show()
def display_combined(self, mag_data=None, title='Combined Plot', cmap='RdBu', limit=None,
norm=None, gain=1, interpolation='none', grad_encode='bright'):
'''Display all phasemaps and the resulting color coded holography images.
Parameters
----------
mag_data : :class:`~.MagData`, optional
Magnetic distribution to which the projectors of the dataset should be applied. If not
given, the phase_maps in the dataset are used.
title : string, optional
The title of the plot. The default is 'Combined Plot'.
cmap : string, optional
The :class:`~matplotlib.colors.Colormap` which is used for the plot as a string.
The default is 'RdBu'.
limit : float, optional
Plotlimit for the phase in both negative and positive direction (symmetric around 0).
If not specified, the maximum amplitude of the phase is used.
norm : :class:`~matplotlib.colors.Normalize` or subclass, optional
Norm, which is used to determine the colors to encode the phase information.
If not specified, :class:`~matplotlib.colors.Normalize` is automatically used.
gain : float, optional
The gain factor for determining the number of contour lines in the holographic
contour map. The default is 1.
interpolation : {'none, 'bilinear', 'cubic', 'nearest'}, optional
Defines the interpolation method for the holographic contour map.
No interpolation is used in the default case.
grad_encode: {'bright', 'dark', 'color', 'none'}, optional
Encoding mode of the phase gradient. 'none' produces a black-white image, 'color' just
encodes the direction (without gradient strength), 'dark' modulates the gradient
strength with a factor between 0 and 1 and 'bright' (which is the default) encodes
the gradient strength with color saturation.
Returns
-------
None
'''
self.LOG.debug('Calling display_combined')
if mag_data is not None:
phase_maps = self.create_phase_maps(mag_data)
else:
phase_maps = self.phase_maps
[phase_map.display_combined('{} ({})'.format(title, self.projectors[i].get_info()),
cmap, limit, norm, gain, interpolation, grad_encode)
for (i, phase_map) in enumerate(phase_maps)]
plt.show()
# TODO: method for constructing 3D mask from 2D masks?
# -*- coding: utf-8 -*-
"""This module provides the :class:`~.ForwardModel` class which represents a strategy to map a
threedimensional magnetization distribution onto a two-dimensional phase map."""
import numpy as np
from pyramid.magdata import MagData
import logging
class ForwardModel(object):
'''Class for mapping 3D magnetic distributions to 2D phase maps.
Represents a strategy for the mapping of a 3D magnetic distribution to two-dimensional
phase maps. Can handle a list of `projectors` of :class:`~.Projector` objects, which describe
different projection angles, so many phase_maps can be created from one magnetic distribution.
All required data should be given in a :class:`~DataSet` object.
Attributes
----------
data_set: :class:`~dataset.DataSet`
:class:`~dataset.DataSet` object, which stores all required information calculation.
projectors : list of :class:`~.Projector`
A list of all :class:`~.Projector` objects representing the projection directions.
kernel : :class:`~.Kernel`
A kernel which describes the phasemapping of the 2D projected magnetization distribution.
a : float
The grid spacing in nm.
dim : tuple (N=3)
Dimensions of the 3D magnetic distribution.
m: int
Size of the image space. Number of pixels of the 2-dimensional projected grid.
n: int
Size of the input space. Number of voxels of the 3-dimensional grid.
'''
LOG = logging.getLogger(__name__+'.ForwardModel')
def __init__(self, data_set):
self.LOG.debug('Calling __init__')
self.data_set = data_set
self.phase_mappers = data_set.phase_mappers
self.m = data_set.m
self.n = data_set.n
self.hook_points = data_set.hook_points
self.mag_data = MagData(data_set.a, np.zeros((3,)+data_set.dim))
self.LOG.debug('Creating '+str(self))
def __repr__(self):
self.LOG.debug('Calling __repr__')
return '%s(data_set=%r)' % (self.__class__, self.data_set)
def __str__(self):
self.LOG.debug('Calling __str__')
return 'ForwardModel(data_set=%s)' % (self.data_set)
def __call__(self, x):
self.LOG.debug('Calling __call__')
self.mag_data.set_vector(x, self.data_set.mask)
# TODO: Multiprocessing
result = np.zeros(self.m)
hp = self.hook_points
for i, projector in enumerate(self.data_set.projectors):
phase_map = self.phase_mappers[projector.dim_uv](projector(self.mag_data))
result[hp[i]:hp[i+1]] = phase_map.phase_vec
return np.reshape(result, -1)
def jac_dot(self, x, vector):
'''Calculate the product of the Jacobi matrix with a given `vector`.
Parameters
----------
x : :class:`~numpy.ndarray` (N=1)
Evaluation point of the jacobi-matrix. The Jacobi matrix is constant for a linear
problem, thus `x` can be set to None (it is not used int the computation). It is
implemented for the case that in the future nonlinear problems have to be solved.
vector : :class:`~numpy.ndarray` (N=1)
Vectorized form of the 3D magnetization distribution. First the `x`, then the `y` and
lastly the `z` components are listed.
Returns
-------
result_vector : :class:`~numpy.ndarray` (N=1)
Product of the Jacobi matrix (which is not explicitely calculated) with the input
`vector`.
'''
self.LOG.debug('Calling jac_dot')
self.mag_data.set_vector(vector, self.data_set.mask)
result = np.zeros(self.m)
hp = self.hook_points
for i, projector in enumerate(self.data_set.projectors):
mag_vec = self.mag_data.mag_vec
res = self.phase_mappers[projector.dim_uv].jac_dot(projector.jac_dot(mag_vec))
result[hp[i]:hp[i+1]] = res.flatten()
return result
def jac_T_dot(self, x, vector):
''''Calculate the product of the transposed Jacobi matrix with a given `vector`.
Parameters
----------
x : :class:`~numpy.ndarray` (N=1)
Evaluation point of the jacobi-matrix. The jacobi matrix is constant for a linear
problem, thus `x` can be set to None (it is not used int the computation). Is used
for the case that in the future nonlinear problems have to be solved.
vector : :class:`~numpy.ndarray` (N=1)
Vectorized form of all 2D phase maps one after another in one vector.
Returns
-------
result_vector : :class:`~numpy.ndarray` (N=1)
Product of the transposed Jacobi matrix (which is not explicitely calculated) with
the input `vector`.
'''
self.LOG.debug('Calling jac_T_dot')
result = np.zeros(self.n)
hp = self.hook_points
for i, projector in enumerate(self.data_set.projectors):
vec = vector[hp[i]:hp[i+1]]
result += projector.jac_T_dot(self.phase_mappers[projector.dim_uv].jac_T_dot(vec))
self.mag_data.mag_vec = result
return self.mag_data.get_vector(self.data_set.mask)
# -*- coding: utf-8 -*-
"""This module provides the :class:`~.ForwardModel` class which represents a strategy to map a
threedimensional magnetization distribution onto a two-dimensional phase map."""
import numpy as np
from pyramid.magdata import MagData
import logging
class ForwardModel(object):
'''Class for mapping 3D magnetic distributions to 2D phase maps.
Represents a strategy for the mapping of a 3D magnetic distribution to two-dimensional
phase maps. Can handle a list of `projectors` of :class:`~.Projector` objects, which describe
different projection angles, so many phase_maps can be created from one magnetic distribution.
All required data should be given in a :class:`~DataSet` object.
Attributes
----------
data_set: :class:`~dataset.DataSet`
:class:`~dataset.DataSet` object, which stores all required information calculation.
projectors : list of :class:`~.Projector`
A list of all :class:`~.Projector` objects representing the projection directions.
kernel : :class:`~.Kernel`
A kernel which describes the phasemapping of the 2D projected magnetization distribution.
a : float
The grid spacing in nm.
dim : tuple (N=3)
Dimensions of the 3D magnetic distribution.
m: int
Size of the image space. Number of pixels of the 2-dimensional projected grid.
n: int
Size of the input space. Number of voxels of the 3-dimensional grid.
'''
LOG = logging.getLogger(__name__+'.ForwardModel')
def __init__(self, data_set):
self.LOG.debug('Calling __init__')
self.data_set = data_set
self.phase_mappers = data_set.phase_mappers
self.m = data_set.m
self.n = data_set.n
self.hook_points = data_set.hook_points
self.mag_data = MagData(data_set.a, np.zeros((3,)+data_set.dim))
self.LOG.debug('Creating '+str(self))
def __repr__(self):
self.LOG.debug('Calling __repr__')
return '%s(data_set=%r)' % (self.__class__, self.data_set)
def __str__(self):
self.LOG.debug('Calling __str__')
return 'ForwardModel(data_set=%s)' % (self.data_set)
def __call__(self, x):
self.LOG.debug('Calling __call__')
self.mag_data.magnitude[:] = 0
self.mag_data.set_vector(x, self.data_set.mask)
# TODO: Multiprocessing
result = np.zeros(self.m)
hp = self.hook_points
for i, projector in enumerate(self.data_set.projectors):
phase_map = self.phase_mappers[projector.dim_uv](projector(self.mag_data))
result[hp[i]:hp[i+1]] = phase_map.phase_vec
return np.reshape(result, -1)
def jac_dot(self, x, vector):
'''Calculate the product of the Jacobi matrix with a given `vector`.
Parameters
----------
x : :class:`~numpy.ndarray` (N=1)
Evaluation point of the jacobi-matrix. The Jacobi matrix is constant for a linear
problem, thus `x` can be set to None (it is not used int the computation). It is
implemented for the case that in the future nonlinear problems have to be solved.
vector : :class:`~numpy.ndarray` (N=1)
Vectorized form of the 3D magnetization distribution. First the `x`, then the `y` and
lastly the `z` components are listed.
Returns
-------
result_vector : :class:`~numpy.ndarray` (N=1)
Product of the Jacobi matrix (which is not explicitely calculated) with the input
`vector`.
'''
self.LOG.debug('Calling jac_dot')
self.mag_data.magnitude[:] = 0
self.mag_data.set_vector(vector, self.data_set.mask)
result = np.zeros(self.m)
hp = self.hook_points
for i, projector in enumerate(self.data_set.projectors):
mag_vec = self.mag_data.mag_vec
res = self.phase_mappers[projector.dim_uv].jac_dot(projector.jac_dot(mag_vec))
result[hp[i]:hp[i+1]] = res.flatten()
return result
def jac_T_dot(self, x, vector):
''''Calculate the product of the transposed Jacobi matrix with a given `vector`.
Parameters
----------
x : :class:`~numpy.ndarray` (N=1)
Evaluation point of the jacobi-matrix. The jacobi matrix is constant for a linear
problem, thus `x` can be set to None (it is not used int the computation). Is used
for the case that in the future nonlinear problems have to be solved.
vector : :class:`~numpy.ndarray` (N=1)
Vectorized form of all 2D phase maps one after another in one vector.
Returns
-------
result_vector : :class:`~numpy.ndarray` (N=1)
Product of the transposed Jacobi matrix (which is not explicitely calculated) with
the input `vector`.
'''
self.LOG.debug('Calling jac_T_dot')
result = np.zeros(3*np.prod(self.data_set.dim))
hp = self.hook_points
for i, projector in enumerate(self.data_set.projectors):
vec = vector[hp[i]:hp[i+1]]
result += projector.jac_T_dot(self.phase_mappers[projector.dim_uv].jac_T_dot(vec))
self.mag_data.mag_vec = result
return self.mag_data.get_vector(self.data_set.mask)
......@@ -105,3 +105,38 @@ class Kernel(object):
self.LOG.debug('Calling __str__')
return 'Kernel(a=%s, dim_uv=%s, numcore=%s, geometry=%s)' % \
(self.a, self.dim_uv, self.numcore, self.geometry)
def _multiply_jacobi(self, vector):
self.LOG.debug('Calling _multiply_jacobi')
v_dim, u_dim = self.dim_uv
assert len(vector) == 2 * self.size, \
'vector size not compatible! vector: {}, size: {}'.format(len(vector), self.size)
result = np.zeros(self.size)
# Iterate over all contributing pixels (numbered consecutively)
for s in range(self.size): # column-wise (two columns at a time, u- and v-component)
i = s % u_dim # u-coordinate of current contributing pixel
j = int(s/u_dim) # v-coordinate of current ccontributing pixel
u_min = (u_dim-1) - i # u_dim-1: center of the kernel
u_max = (2*u_dim-1) - i # = u_min + u_dim
v_min = (v_dim-1) - j # v_dim-1: center of the kernel
v_max = (2*v_dim-1) - j # = v_min + v_dim
result += vector[s] * self.u[v_min:v_max, u_min:u_max].reshape(-1) # u
result -= vector[s+self.size] * self.v[v_min:v_max, u_min:u_max].reshape(-1) # v
return result
def _multiply_jacobi_T(self, vector):
self.LOG.debug('Calling _multiply_jacobi_T')
v_dim, u_dim = self.dim_uv
result = np.zeros(2*self.size)
# Iterate over all contributing pixels (numbered consecutively):
for s in range(self.size): # row-wise (two rows at a time, u- and v-component)
i = s % u_dim # u-coordinate of current contributing pixel
j = int(s/u_dim) # v-coordinate of current contributing pixel
u_min = (u_dim-1) - i # u_dim-1: center of the kernel
u_max = (2*u_dim-1) - i # = u_min + u_dim
v_min = (v_dim-1) - j # v_dim-1: center of the kernel
v_max = (2*v_dim-1) - j # = v_min + v_dim
result[s] = np.sum(vector*self.u[v_min:v_max, u_min:u_max].reshape(-1)) # u
result[s+self.size] = np.sum(vector*-self.v[v_min:v_max, u_min:u_max].reshape(-1)) # v
return result
#
This diff is collapsed.
......@@ -293,6 +293,7 @@ class PhaseMap(object):
phase_file = netCDF4.Dataset(filename, 'r', format='NETCDF4')
a = phase_file.a
phase = phase_file.variables['phase'][:]
#phase = phase[28:36, 28:36]
phase_file.close()
return PhaseMap(a, phase)
......
This diff is collapsed.
......@@ -117,11 +117,11 @@ def optimize_linear(data, regularisator=None, maxiter=1000, verbosity=0):
# Set up necessary objects:
cost = Costfunction(data, regularisator)
print cost(np.zeros(cost.n))
x_opt = cg.conj_grad_minimize(cost)
x_opt = jutil.cg.conj_grad_minimize(cost, max_iter=20)
print cost(x_opt)
# Create and return fitting MagData object:
mag_opt = MagData(data.a, np.zeros((3,)+data.dim))
mag_opt.mag_vec = x_opt
mag_opt.set_vector(x_opt, data.mask)
return mag_opt
......@@ -149,14 +149,37 @@ def optimize_nonlin(data, first_guess=None, regularisator=None):
LOG.debug('Calling optimize_cg')
if first_guess is None:
first_guess = MagData(data.a, np.zeros((3,)+data.dim))
x_0 = first_guess.mag_vec
x_0 = first_guess.get_vector(data.mask)
cost = Costfunction(data, regularisator)
# proj = fwd_model.data_set.projectors[0]
# proj_jac1 = np.array([proj.jac_dot(np.eye(proj.m, 1, -i).squeeze()) for i in range(proj.m)])
# proj_jac2 = np.array([proj.jac_T_dot(np.eye(proj.n, 1, -i).squeeze()) for i in range(proj.n)])
# print jac1, jac2.T, abs(jac1-jac2.T).sum()
# print jac1.shape, jac2.shape
# pm = fwd_model.phase_mappers[proj.dim_uv]
# pm_jac1 = np.array([pm.jac_dot(np.eye(pm.m)[:, i]) for i in range(pm.m)])
# pm_jac2 = np.array([pm.jac_T_dot(np.eye(pm.n)[:, i]) for i in range(pm.n)])
# print jac1, jac2.T, abs(jac1-jac2.T).sum()
# print jac1.shape, jac2.shape
# jac1 = np.array([fwd_model.jac_dot(x_0, np.eye(fwd_model.m)[:, i]) for i in range(fwd_model.m)])
# jac2 = np.array([fwd_model.jac_T_dot(x_0, np.eye(fwd_model.n)[:, i]) for i in range(fwd_model.n)])
# print proj_jac1.dot(pm_jac1)
# print (pm_jac2.dot(proj_jac2)).T
# print jac1
# print jac2.T
# print abs(jac1-jac2.T).sum()
# print jac1.shape, jac2.shape
assert len(x_0) == cost.n, (len(x_0), cost.m, cost.n)
result = minimizer.minimize(cost, x_0, options={"conv_rel": 1e-20}, tol={"max_iteration": 1})
result = jutil.minimizer.minimize(cost, x_0, options={"conv_rel":1e-2}, tol={"max_iteration":4})
x_opt = result.x
print cost(x_opt)
mag_opt = MagData(data.a, np.zeros((3,)+data.dim))
mag_opt.mag_vec = x_opt
mag_opt.set_vector(x_opt, data.mask)
return mag_opt
......
......@@ -110,28 +110,14 @@ class ZeroOrderRegularisator(Regularisator):
self.LOG.debug('Created '+str(self))
#class FirstOrderRegularisator(Regularisator):
# # TODO: Docstring!
#
# def __init__(self, fwd_model, lam, x_a=None):
# size_3d = fwd_model.size_3d
# dim = fwd_model.dim
# converter = IndexConverter(dim)
# row = []
# col = []
# data = []
#
# for i in range(size_3d):
# neighbours = converter.find_neighbour_ind(i)
#
# Sa_inv = csr_matrix(coo_matrix(data, (rows, columns)), shape=(3*size_3d, 3*size_3d))
#
# term2 = []
# for i in range(3):
# component = mag_data[i, ...]
# for j in range(3):
# if component.shape[j] > 1:
# term2.append(np.diff(component, axis=j).reshape(-1))
#
# super(FirstOrderRegularisator, self).__init__(Sa_inv, x_a)
# self.LOG.debug('Created '+str(self))
class FirstOrderRegularisator(Regularisator):
# TODO: Docstring!
def __init__(self, mask, lam, x_a=None):
import jutil
D0 = jutil.diff.get_diff_operator(mask, 0, 3)
D1 = jutil.diff.get_diff_operator(mask, 1, 3)
D = jutil.operator.VStack([D0, D1])
norm = jutil.norms.WeightedL2Square(D)
super(FirstOrderRegularisator, self).__init__(norm, lam)
self.LOG.debug('Created '+str(self))
......@@ -15,7 +15,7 @@ from pyramid.phasemap import PhaseMap
from pyramid.projector import SimpleProjector
from pyramid.phasemapper import pm
from pyramid.dataset import DataSet
from pyramid.regularisator import ZeroOrderRegularisator
from pyramid.regularisator import ZeroOrderRegularisator, FirstOrderRegularisator
import pyramid.reconstruction as rc
from time import clock
......@@ -50,13 +50,16 @@ phase_map = PhaseMap.load_from_netcdf4(PATH+'phase_map.nc')
with open(PATH + 'mask.pickle') as pf:
mask = pickle.load(pf)
# Setup:
data_set = DataSet(a, dim, b_0)
data_set = DataSet(a, dim, b_0, mask=mask)
data_set.append(phase_map, SimpleProjector(dim))
regularisator = ZeroOrderRegularisator(lam)
regularisator = FirstOrderRegularisator(mask, lam)
print "OOO"
# Reconstruct the magnetic distribution:
tic = clock()
mag_data_rec1 = rc.optimize_linear(data_set, regularisator=regularisator)
mag_data_rec = rc.optimize_nonlin(data_set, regularisator=regularisator)
mag_data_rec = rc.optimize_linear(data_set, regularisator=regularisator)
#mag_data_rec = rc.optimize_nonlin(data_set, regularisator=regularisator)
# .optimize_simple_leastsq(phase_map, mask, b_0, lam=lam, order=order)
print 'reconstruction time:', clock() - tic
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment