Skip to content
Snippets Groups Projects
Forked from empyre / empyre
334 commits behind the upstream repository.
forwardmodel.py 14.47 KiB
# -*- coding: utf-8 -*-
# Copyright 2014 by Forschungszentrum Juelich GmbH
# Author: J. Caron
#
"""This module provides the :class:`~.ForwardModel` class which represents a strategy to map a
threedimensional magnetization distribution onto a two-dimensional phase map."""

from __future__ import division

import logging
import multiprocessing as mp
import sys

import numpy as np

from pyramid.dataset import DataSet
from pyramid.fielddata import VectorData
from pyramid.ramp import Ramp

__all__ = ['ForwardModel', 'DistributedForwardModel']


class ForwardModel(object):
    """Class for mapping 3D magnetic distributions to 2D phase maps.

    Represents a strategy for the mapping of a 3D magnetic distribution to two-dimensional
    phase maps. A :class:`~.DataSet` object is given which is used as input for the model
    (projectors, phase_mappers, etc.). A `ramp_order` can be specified to add polynomial ramps
    to the constructed phase maps (which can also be reconstructed!). A :class:`~.Ramp` class
    object will be constructed accordingly, which also holds all info about the ramps after a
    reconstruction.

    Attributes
    ----------
    data_set: :class:`~dataset.DataSet`
        :class:`~dataset.DataSet` object, which stores all required information calculation.
    ramp_order : int or None (default)
        Polynomial order of the additional phase ramp which will be added to the phase maps.
        All ramp parameters have to be at the end of the input vector and are split automatically.
        Default is None (no ramps are added).
    m: int
        Size of the image space. Number of pixels of the 2-dimensional projected grid.
    n: int
        Size of the input space. Number of voxels of the 3-dimensional grid.

    """

    _log = logging.getLogger(__name__ + '.ForwardModel')

    def __init__(self, data_set, ramp_order=None):
        self._log.debug('Calling __init__')
        self.data_set = data_set
        self.ramp_order = ramp_order
        self.phase_mappers = self.data_set.phase_mappers
        self.ramp = Ramp(self.data_set, self.ramp_order)
        self.m = self.data_set.m
        self.n = self.data_set.n
        if self.ramp.n is not None:  # Additional parameters have to be fitted!
            self.n += self.ramp.n
        self.shape = (self.m, self.n)
        self.hook_points = data_set.hook_points
        self.mag_data = VectorData(self.data_set.a, np.zeros((3,) + self.data_set.dim))
        self._log.debug('Creating ' + str(self))

    def __repr__(self):
        self._log.debug('Calling __repr__')
        return '%s(data_set=%r)' % (self.__class__, self.data_set)

    def __str__(self):
        self._log.debug('Calling __str__')
        return 'ForwardModel(data_set=%s)' % self.data_set

    def __call__(self, x):
        # Extract ramp parameters if necessary (x will be shortened!):
        x = self.ramp.extract_ramp_params(x)
        # Reset mag_data and fill with vector:
        self.mag_data.field[...] = 0
        self.mag_data.set_vector(x, self.data_set.mask)
        # Simulate all phase maps and create result vector:
        result = np.zeros(self.m)
        hp = self.hook_points
        for i, projector in enumerate(self.data_set.projectors):
            mapper = self.phase_mappers[projector.dim_uv]
            phase_map = mapper(projector(self.mag_data))
            phase_map += self.ramp(i)  # add ramp!
            result[hp[i]:hp[i + 1]] = phase_map.phase_vec
        return np.reshape(result, -1)

    def jac_dot(self, x, vector):
        """Calculate the product of the Jacobi matrix with a given `vector`.

        Parameters
        ----------
        x : :class:`~numpy.ndarray` (N=1)
            Evaluation point of the jacobi-matrix. The Jacobi matrix is constant for a linear
            problem, thus `x` can be set to None (it is not used int the computation). It is
            implemented for the case that in the future nonlinear problems have to be solved.
        vector : :class:`~numpy.ndarray` (N=1)
            Vectorized form of the 3D magnetization distribution. First the `x`, then the `y` and
            lastly the `z` components are listed. Ramp parameters are also added at the end if
            necessary.

        Returns
        -------
        result_vector : :class:`~numpy.ndarray` (N=1)
            Product of the Jacobi matrix (which is not explicitely calculated) with the input
            `vector`.

        """
        # Extract ramp parameters if necessary (vector will be shortened!):
        vector = self.ramp.extract_ramp_params(vector)
        # Reset mag_data and fill with vector:
        self.mag_data.field[...] = 0
        self.mag_data.set_vector(vector, self.data_set.mask)
        # Simulate all phase maps and create result vector:
        result = np.zeros(self.m)
        hp = self.hook_points
        for i, projector in enumerate(self.data_set.projectors):
            mag_vec = self.mag_data.field_vec
            mapper = self.phase_mappers[projector.dim_uv]
            res = mapper.jac_dot(projector.jac_dot(mag_vec))
            res += self.ramp.jac_dot(i)  # add ramp!
            result[hp[i]:hp[i + 1]] = res
        return result

    def jac_T_dot(self, x, vector):
        """'Calculate the product of the transposed Jacobi matrix with a given `vector`.

        Parameters
        ----------
        x : :class:`~numpy.ndarray` (N=1)
            Evaluation point of the jacobi-matrix. The jacobi matrix is constant for a linear
            problem, thus `x` can be set to None (it is not used int the computation). Is used
            for the case that in the future nonlinear problems have to be solved.
        vector : :class:`~numpy.ndarray` (N=1)
            Vectorized form of all 2D phase maps one after another in one vector.

        Returns
        -------
        result_vector : :class:`~numpy.ndarray` (N=1)
            Product of the transposed Jacobi matrix (which is not explicitely calculated) with
            the input `vector`. If necessary, transposed ramp parameters are concatenated.

        """
        proj_T_result = np.zeros(3 * np.prod(self.data_set.dim))
        hp = self.hook_points
        for i, projector in enumerate(self.data_set.projectors):
            sub_vec = vector[hp[i]:hp[i + 1]]
            mapper = self.phase_mappers[projector.dim_uv]
            proj_T_result += projector.jac_T_dot(mapper.jac_T_dot(sub_vec))
        self.mag_data.field_vec = proj_T_result
        result = self.mag_data.get_vector(self.data_set.mask)
        ramp_params = self.ramp.jac_T_dot(vector)  # calculate ramp_params separately!
        return np.concatenate((result, ramp_params))

    def finalize(self):
        """'Finalize the processes and let them join the master process (NOT USED HERE!).

        Returns
        -------
        None

        """
        pass


class DistributedForwardModel(ForwardModel):
    """Multiprocessing class for mapping 3D magnetic distributions to 2D phase maps.

    Subclass of the :class:`~.ForwardModel` class which implements multiprocessing strategies
    to speed up the calculations. The interface is the same, internally, the processes and one
    ForwardModel operating on a subset of the DataSet per process are created during construction.
    Ramps are calculated in the main thread. The :func:`~.finalize` method can be used to force
    the processes to join if the class is no longer used.

    Attributes
    ----------
    data_set: :class:`~dataset.DataSet`
        :class:`~dataset.DataSet` object, which stores all required information calculation.
    ramp_order : int or None (default)
        Polynomial order of the additional phase ramp which will be added to the phase maps.
        All ramp parameters have to be at the end of the input vector and are split automatically.
        Default is None (no ramps are added).
    nprocs: int
        Number of processes which should be created. Default is 1 (not recommended).

    """

    def __init__(self, data_set, ramp_order=None, nprocs=1):
        # Evoke super constructor to set up the normal ForwardModel:
        super().__init__(data_set, ramp_order)
        # Initialize multirocessing specific stuff:
        self.nprocs = nprocs
        img_per_proc = np.ceil(self.data_set.count / self.nprocs).astype(np.int)
        hp = self.data_set.hook_points
        self.proc_hook_points = [0]
        self.pipes = []
        self.processes = []
        for proc_id in range(self.nprocs):
            # Create SubDataSets:
            sub_data = DataSet(self.data_set.a, self.data_set.dim, self.data_set.b_0,
                               self.data_set.mask, self.data_set.Se_inv)
            # Distribute data to SubDataSets:
            start = proc_id * img_per_proc
            stop = np.min(((proc_id + 1) * img_per_proc, self.data_set.count))
            self.proc_hook_points.append(hp[stop])
            sub_data.phase_maps = self.data_set.phase_maps[start:stop]
            sub_data.projectors = self.data_set.projectors[start:stop]
            # Create SubForwardModel:
            sub_fwd_model = ForwardModel(sub_data, ramp_order=None)  # ramps handled in master!
            # Create communication pipe:
            self.pipes.append(mp.Pipe())
            # Create process:
            p = mp.Process(name='Worker {}'.format(proc_id), target=_worker,
                           args=(sub_fwd_model, self.pipes[proc_id][1]))
            self.processes.append(p)
            # Start process:
            p.start()
        self._log.debug('Creating ' + str(self))

    def __call__(self, x):
        # Extract ramp parameters if necessary (x will be shortened!):
        x = self.ramp.extract_ramp_params(x)
        # Distribute input to processes and start working:
        for proc_id in range(self.nprocs):
            self.pipes[proc_id][0].send(('__call__', (x,)))
        # Initialize result vector and shorten hook point names:
        result = np.zeros(self.m)
        hp = self.hook_points
        php = self.proc_hook_points
        # Calculate ramps (if necessary):
        if self.ramp_order is not None:
            for i in range(self.data_set.count):
                result[hp[i]:hp[i + 1]] += self.ramp(i).phase.ravel()
        # Get process results from the pipes:
        for proc_id in range(self.nprocs):
            result[php[proc_id]:php[proc_id + 1]] += self.pipes[proc_id][0].recv()
        # Return result:
        return result

    def jac_dot(self, x, vector):
        """Calculate the product of the Jacobi matrix with a given `vector`.

        Parameters
        ----------
        x : :class:`~numpy.ndarray` (N=1)
            Evaluation point of the jacobi-matrix. The Jacobi matrix is constant for a linear
            problem, thus `x` can be set to None (it is not used int the computation). It is
            implemented for the case that in the future nonlinear problems have to be solved.
        vector : :class:`~numpy.ndarray` (N=1)
            Vectorized form of the 3D magnetization distribution. First the `x`, then the `y` and
            lastly the `z` components are listed. Ramp parameters are also added at the end if
            necessary.

        Returns
        -------
        result_vector : :class:`~numpy.ndarray` (N=1)
            Product of the Jacobi matrix (which is not explicitely calculated) with the input
            `vector`.

        """
        # Extract ramp parameters if necessary (x will be shortened!):
        vector = self.ramp.extract_ramp_params(vector)
        # Distribute input to processes and start working:
        for proc_id in range(self.nprocs):
            self.pipes[proc_id][0].send(('jac_dot', (None, vector)))
        # Initialize result vector and shorten hook point names:
        result = np.zeros(self.m)
        hp = self.hook_points
        php = self.proc_hook_points
        # Calculate ramps (if necessary):
        if self.ramp_order is not None:
            for i in range(self.data_set.count):
                result[hp[i]:hp[i + 1]] += self.ramp.jac_dot(i)
        # Get process results from the pipes:
        for proc_id in range(self.nprocs):
            result[php[proc_id]:php[proc_id + 1]] += self.pipes[proc_id][0].recv()
        # Return result:
        return result

    def jac_T_dot(self, x, vector):
        """'Calculate the product of the transposed Jacobi matrix with a given `vector`.

        Parameters
        ----------
        x : :class:`~numpy.ndarray` (N=1)
            Evaluation point of the jacobi-matrix. The jacobi matrix is constant for a linear
            problem, thus `x` can be set to None (it is not used int the computation). Is used
            for the case that in the future nonlinear problems have to be solved.
        vector : :class:`~numpy.ndarray` (N=1)
            Vectorized form of all 2D phase maps one after another in one vector.

        Returns
        -------
        result_vector : :class:`~numpy.ndarray` (N=1)
            Product of the transposed Jacobi matrix (which is not explicitely calculated) with
            the input `vector`. If necessary, transposed ramp parameters are concatenated.

        """
        php = self.proc_hook_points
        # Distribute input to processes and start working:
        for proc_id in range(self.nprocs):
            sub_vec = vector[php[proc_id]:php[proc_id + 1]]
            self.pipes[proc_id][0].send(('jac_T_dot', (None, sub_vec)))
        # Calculate ramps:
        ramp_params = self.ramp.jac_T_dot(vector)  # calculate ramp_params separately!
        # Initialize result vector:
        result = np.zeros(3 * self.data_set.mask.sum())
        # Get process results from the pipes:
        for proc_id in range(self.nprocs):
            result += self.pipes[proc_id][0].recv()
        # Return result:
        return np.concatenate((result, ramp_params))

    def finalize(self):
        """'Finalize the processes and let them join the master process.

        Returns
        -------
        None

        """
        # Finalize processes:
        for proc_id in range(self.nprocs):
            self.pipes[proc_id][0].send('STOP')
        # Exit the completed processes:
        for p in self.processes:
            p.join()


def _worker(fwd_model, pipe):
    # Has to be directly accessible in the module as a function, NOT a method of a class instance!
    print('... {} starting!'.format(mp.current_process().name))
    sys.stdout.flush()
    for method, arguments in iter(pipe.recv, 'STOP'):
        # '... {} processes method {}'.format(mp.current_process().name, method)
        sys.stdout.flush()
        result = getattr(fwd_model, method)(*arguments)
        pipe.send(result)
    print('... ', mp.current_process().name, 'exiting!')
    sys.stdout.flush()