Skip to content
Snippets Groups Projects
dataset.py 15.3 KiB
Newer Older
Jörn Ungermann's avatar
Jörn Ungermann committed
# -*- coding: utf-8 -*-
# Copyright 2014 by Forschungszentrum Juelich GmbH
# Author: J. Caron
#
Jörn Ungermann's avatar
Jörn Ungermann committed
"""This module provides the :class:`~.DataSet` class for the collection of phase maps
and additional data like corresponding projectors."""

import logging
from numbers import Number
Jörn Ungermann's avatar
Jörn Ungermann committed

from pyramid.kernel import Kernel
from pyramid.phasemap import PhaseMap
from pyramid.phasemapper import PhaseMapperRDFC
from pyramid.projector import Projector
from pyramid.fielddata import ScalarData
from pyramid.ramp import Ramp
import matplotlib.pyplot as plt
Jörn Ungermann's avatar
Jörn Ungermann committed
import numpy as np
Jörn Ungermann's avatar
Jörn Ungermann committed

Jan Caron's avatar
Jan Caron committed
__all__ = ['DataSet']


Jörn Ungermann's avatar
Jörn Ungermann committed
class DataSet(object):
    """Class for collecting phase maps and corresponding projectors.
Jörn Ungermann's avatar
Jörn Ungermann committed

    Represents a collection of (e.g. experimentally derived) phase maps, stored as
    :class:`~.PhaseMap` objects and corresponding projectors stored as :class:`~.Projector`
    objects. At creation, the grid spacing `a` and the dimension `dim` of the magnetization
    distribution have to be given. Data can be added via the :func:`~.append` method, where
    a :class:`~.PhaseMap`, a :class:`~.Projector` and additional info have to be given.

    Attributes
    ----------
    a: float
        The grid spacing in nm.
    dim: tuple (N=3)
        Dimensions of the 3D magnetization distribution.
    b_0: double
        The saturation induction in `T`.
    mask: :class:`~numpy.ndarray` (N=3), optional
        A boolean mask which defines the magnetized volume in 3D.
    Se_inv : :class:`~numpy.ndarray` (N=2), optional
        Inverted covariance matrix of the measurement errors. The matrix has size `NxN` with N
        being the length of the targetvector y (vectorized phase map information).
    projectors: list of :class:`~.Projector`
        A list of all stored :class:`~.Projector` objects.
    phasemaps: list of :class:`~.PhaseMap`
        A list of all stored :class:`~.PhaseMap` objects.
    phase_vec: :class:`~numpy.ndarray` (N=1)
        The concatenaded, vectorized phase of all :class:`~.PhaseMap` objects.
    count(self): int
        Number of phase maps and projectors in the dataset.
    hook_points(self): :class:`~numpy.ndarray` (N=1)
        Hook points which determine the start of values of a phase map in the `phase_vec`.
        The length is `count + 1`.
Jörn Ungermann's avatar
Jörn Ungermann committed

Jörn Ungermann's avatar
Jörn Ungermann committed

    _log = logging.getLogger(__name__ + '.DataSet')
Jörn Ungermann's avatar
Jörn Ungermann committed

    @property
    def a(self):
        """The grid spacing in nm."""
        return self._a

    @a.setter
    def a(self, a):
        assert isinstance(a, Number), 'Grid spacing has to be a number!'
        assert a >= 0, 'Grid spacing has to be a positive number!'
        self._a = float(a)

    @property
    def mask(self):
        """A boolean mask which defines the magnetized volume in 3D."""
        return self._mask

    @mask.setter
    def mask(self, mask):
        if mask is not None:
            assert mask.shape == self.dim, 'Mask dimensions must match!'
        else:
            mask = np.ones(self.dim, dtype=bool)
        self._mask = mask.astype(np.bool)

Jörn Ungermann's avatar
Jörn Ungermann committed
    @property
    def m(self):
        """Size of the image space."""
        return np.sum([len(p.phase_vec) for p in self.phasemaps])
Jörn Ungermann's avatar
Jörn Ungermann committed

    @property
    def n(self):
        """Size of the input space."""
        return 3 * np.sum(self.mask)

        """Number of phase maps and projectors in the dataset."""
Jörn Ungermann's avatar
Jörn Ungermann committed
    @property
    def phase_vec(self):
        """The concatenaded, vectorized phase of all ;class:`~.PhaseMap` objects."""
        return np.concatenate([p.phase_vec for p in self.phasemaps])
Jörn Ungermann's avatar
Jörn Ungermann committed

    @property
    def hook_points(self):
        """Hook points which determine the start of values of a phase map in the `phase_vec`."""
Jörn Ungermann's avatar
Jörn Ungermann committed
        result = [0]
        for i, phasemap in enumerate(self.phasemaps):
            result.append(result[i] + np.prod(phasemap.dim_uv))
Jörn Ungermann's avatar
Jörn Ungermann committed
        return result

    @property
    def phasemaps(self):
        """List of all PhaseMaps in the DataSet."""
        return self._phasemaps

    @property
    def projectors(self):
        """List of all Projectors in the DataSet."""
        return self._projectors

Jörn Ungermann's avatar
Jörn Ungermann committed
    @property
        """List of all PhaseMappers in the DataSet."""
        return self._phasemappers

    @property
    def phasemapper_dict(self):
        """Dictionary of all PhaseMappers in the DataSet."""
        return self._phasemapper_dict
Jörn Ungermann's avatar
Jörn Ungermann committed

    def __init__(self, a, dim, b_0=1, mask=None, Se_inv=None):
Jan Caron's avatar
Jan Caron committed
        self._log.debug('Calling __init__')
Jörn Ungermann's avatar
Jörn Ungermann committed
        assert isinstance(dim, tuple) and len(dim) == 3, \
            'Dimension has to be a tuple of length 3!'
        self.a = a
        self.dim = dim
        self.b_0 = b_0
        self.mask = mask
        self._phasemaps = []
        self._projectors = []
        self._phasemappers = []
        self._phasemapper_dict = {}
        self._log.debug('Created: ' + str(self))
Jörn Ungermann's avatar
Jörn Ungermann committed

    def __repr__(self):
Jan Caron's avatar
Jan Caron committed
        self._log.debug('Calling __repr__')
        return '%s(a=%r, dim=%r, b_0=%r, mask=%r, Se_inv=%r)' % (self.__class__, self.a, self.dim,
                                                                 self.b_0, self.mask, self.Se_inv)
Jörn Ungermann's avatar
Jörn Ungermann committed

    def __str__(self):
Jan Caron's avatar
Jan Caron committed
        self._log.debug('Calling __str__')
Jörn Ungermann's avatar
Jörn Ungermann committed
        return 'DataSet(a=%s, dim=%s, b_0=%s)' % (self.a, self.dim, self.b_0)

    def _append_single(self, phasemap, projector, phasemapper=None):
        self._log.debug('Calling _append')
        assert isinstance(phasemap, PhaseMap) and isinstance(projector, Projector), \
            'Argument has to be a tuple of a PhaseMap and a Projector object!'
        dim_uv = projector.dim_uv
        assert projector.dim == self.dim, '3D dimensions must match!'
        assert phasemap.dim_uv == dim_uv, 'Projection dimensions (dim_uv) must match!'
        assert phasemap.a == self.a, 'Grid spacing must match!'
        # Create lookup key:
        if phasemapper is not None:
            key = dim_uv  # Create standard phasemapper, dim_uv is enough for identification!
        else:
            key = (dim_uv, str(phasemapper))  # Include string representation for identification!
        # Retrieve existing, use given or create new phasemapper:
        if key in self.phasemapper_dict:  # Retrieve existing phasemapper:
            phasemapper = self.phasemapper_dict[key]
        elif phasemapper is not None:  # Use given one (do nothing):
            pass
        else:  # Create new standard (RDFC) phasemapper:
            phasemapper = PhaseMapperRDFC(Kernel(self.a, dim_uv, self.b_0))
            self._phasemapper_dict[key] = phasemapper
        # Append everything to the lists (just contain pointers to objects!):
        self._phasemaps.append(phasemap)
        self._projectors.append(projector)
        self._phasemappers.append(phasemapper)

    def append(self, phasemap, projector, phasemapper=None):
        """Appends a data pair of phase map and projection infos to the data collection.`
Jörn Ungermann's avatar
Jörn Ungermann committed

        Parameters
        ----------
Jörn Ungermann's avatar
Jörn Ungermann committed
            A :class:`~.PhaseMap` object which should be added to the data collection.
        projector: :class:`~.Projector`
            A :class:`~.Projector` object which should be added to the data collection.
        phasemapper: :class:`~.PhaseMapper`, optional
            An optional :class:`~.PhaseMapper` object which should be added.
Jörn Ungermann's avatar
Jörn Ungermann committed

        Returns
        -------
        None

Jan Caron's avatar
Jan Caron committed
        self._log.debug('Calling append')
        if type(phasemap) is not list:
            phasemap = [phasemap]
        if type(projector) is not list:
            projector = [projector]
        if type(phasemapper) is not list:
            phasemapper = [phasemapper] * len(phasemap)
        assert len(phasemap) == len(projector),\
            ('Phasemaps and projectors must have same' +
             'length(phasemaps: {}, projectors: {})!'.format(len(phasemap), len(projector)))
        for i in range(len(phasemap)):
            self._append_single(phasemap[i], projector[i], phasemapper[i])
        # Reset the Se_inv matrix from phasemaps confidence matrices:
        self.set_Se_inv_diag_with_conf()

    def create_phasemaps(self, magdata, difference=True, ramp=None):
        """Create a list of phasemaps with the projectors in the dataset for a given
Jörn Ungermann's avatar
Jörn Ungermann committed

        Parameters
        ----------
Jörn Ungermann's avatar
Jörn Ungermann committed
            Magnetic distribution to which the projectors of the dataset should be applied.

        Returns
        -------
        phasemaps : list of :class:`~.phasemap.PhaseMap`
Jörn Ungermann's avatar
Jörn Ungermann committed
            A list of the phase maps resulting from the projections specified in the dataset.

        self._log.debug('Calling create_phasemaps')
        phasemaps = []
        for i, projector in enumerate(self.projectors):
            phasemap = self.phasemappers[i](mag_proj)
            if difference:
                phasemap -= self.phasemaps[i]
            if ramp is not None:
                assert type(ramp) == Ramp, 'correct_ramp has to be a Ramp object!'
                phasemap += ramp(index=i)
            phasemap.mask = mag_proj.get_mask()[0, ...]
            phasemaps.append(phasemap)
        return phasemaps
Jörn Ungermann's avatar
Jörn Ungermann committed

    def set_Se_inv_block_diag(self, cov_list):
        """Set the Se_inv matrix as a block diagonal matrix

        Parameters
        ----------
        cov_list: list of :class:`~numpy.ndarray`
            List of inverted covariance matrices (one for each projection).

        Returns
        -------
            None

        self._log.debug('Calling set_Se_inv_block_diag')
        assert len(cov_list) == len(self.phasemaps), 'Needs one covariance matrix per phase map!'
        self.Se_inv = sparse.block_diag(cov_list).tocsr()

    def set_Se_inv_diag_with_conf(self, conf_list=None):
        """Set the Se_inv matrix as a block diagonal matrix from a list of confidence matrizes.
        conf_list: list of :class:`~numpy.ndarray` (optional)
            List of 2D confidence matrizes (one for each projection) which define trust regions.
            If not given this uses the confidence matrizes of the phase maps.
        self._log.debug('Calling set_Se_inv_diag_with_conf')
        if conf_list is None:  # if no confidence matrizes are given, extract from the phase maps!
            conf_list = [phasemap.confidence for phasemap in self.phasemaps]
        cov_list = [sparse.diags(c.ravel().astype(np.float32), 0) for c in conf_list]
        self.set_Se_inv_block_diag(cov_list)

    def set_3d_mask(self, mask_list=None, threshold=0.9):
        """Set the 3D mask from a list of 2D masks.

        Parameters
        ----------
        mask_list: list of :class:`~numpy.ndarray` (optional)
            List of 2D masks, which represent the projections of the 3D mask. If not given this
            uses the mask matrizes of the phase maps. If just one phase map is present, the
            according mask is simply expanded to 3D and used directly.
        threshold: float, optional
            The threshold, describing the minimal number of 2D masks which have to extrude to the
            point in 3D to be considered valid as containing magnetisation. `threshold` is a
            relative number in the range of [0, 1]. The default is 0.9. Choosing a value of 1 is
            the strictest possible setting (every 2D mask has to contain a 3D point to be valid).
        self._log.debug('Calling set_3d_mask')
        if mask_list is None:  # if no masks are given, extract from phase maps:
            mask_list = [phasemap.mask for phasemap in self.phasemaps]
        if len(mask_list) == 1:  # just one phasemap --> 3D mask equals 2D mask
            self.mask = np.expand_dims(mask_list[0], axis=0)  # z-dim is set to 1!
        else:  # 3D mask has to be constructed from 2D masks:
            for i, projector in enumerate(self.projectors):
                mask_2d = self.phasemaps[i].mask.reshape(-1)  # 2D mask
                # Add extrusion of 2D mask:
                mask_3d += projector.weight.T.dot(mask_2d).reshape(self.dim)
            self.mask = np.where(mask_3d >= threshold * self.count, True, False)
    def save(self, filename, overwrite=True):
        """Saves the dataset as a collection of HDF5 files.

        Parameters
        ----------
        filename: str
            Base name of the files which the dataset is saved into. HDF5 files are supported.
        overwrite: bool, optional
            If True (default), an existing file will be overwritten, if False, this
            (silently!) does nothing.
        """
        from .file_io.io_dataset import save_dataset
        save_dataset(self, filename, overwrite)

        """If it exists, display the 3D mask of the magnetization distribution.
        self._log.debug('Calling plot_mask')
        if self.mask is not None:
            return ScalarData(self.a, self.mask).plot_mask(**kwargs)

    def plot_phasemaps(self, magdata=None, title='Phase Map', difference=False, ramp=None,
                       **kwargs):
        """Display all phasemaps saved in the :class:`~.DataSet` as a colormesh.
Jörn Ungermann's avatar
Jörn Ungermann committed

        Parameters
        ----------
        magdata : :class:`~.VectorData`, optional
Jörn Ungermann's avatar
Jörn Ungermann committed
            Magnetic distribution to which the projectors of the dataset should be applied. If not
            given, the phasemaps in the dataset are used.
Jörn Ungermann's avatar
Jörn Ungermann committed
        title : string, optional
            The main part of the title of the plots. The default is 'Phase Map'. Additional
            projector info is appended to this.

        Returns
        -------
        None

        self._log.debug('Calling plot_phasemaps')
            phasemaps = self.create_phasemaps(magdata, difference=difference, ramp=ramp)
Jörn Ungermann's avatar
Jörn Ungermann committed
        else:
        [phasemap.plot_phase('{} ({})'.format(title, self.projectors[i].get_info()), **kwargs)
         for (i, phasemap) in enumerate(phasemaps)]
Jörn Ungermann's avatar
Jörn Ungermann committed

    def plot_phasemaps_combined(self, magdata=None, title='Combined Plot', difference=False,
                                ramp=None, **kwargs):
        """Display all phasemaps and the resulting color coded holography images.
Jörn Ungermann's avatar
Jörn Ungermann committed

        Parameters
        ----------
        magdata : :class:`~.VectorData`, optional
Jörn Ungermann's avatar
Jörn Ungermann committed
            Magnetic distribution to which the projectors of the dataset should be applied. If not
            given, the phasemaps in the dataset are used.
Jörn Ungermann's avatar
Jörn Ungermann committed
        title : string, optional
            The title of the plot. The default is 'Combined Plot'.
        cmap : string, optional
            The :class:`~matplotlib.colors.Colormap` which is used for the plot as a string.
            The default is 'RdBu'.

        Returns
        -------
        None

        self._log.debug('Calling plot_phasemaps_combined')
            phasemaps = self.create_phasemaps(magdata, difference=difference, ramp=ramp)
Jörn Ungermann's avatar
Jörn Ungermann committed
        else:
            phasemaps = self.phasemaps
        for (i, phasemap) in enumerate(phasemaps):
            phasemap.plot_combined('{} ({})'.format(title, self.projectors[i].get_info()),
Jörn Ungermann's avatar
Jörn Ungermann committed
        plt.show()