From 0d7c79db75aa3b9e3124be52aa47f7442cb1202c Mon Sep 17 00:00:00 2001
From: Jan Caron <j.caron@fz-juelich.de>
Date: Thu, 30 Jul 2015 12:50:34 +0200
Subject: [PATCH] Multiprocessing implementation and minor changes!
 forward_model: DistributedForwardModel created which uses the multiprocessing
                package for distributed computations.

---
 pyramid/colormap.py                           |   6 +-
 pyramid/dataset.py                            |  26 +--
 pyramid/forwardmodel.py                       | 217 +++++++++++++++++-
 pyramid/magdata.py                            |   2 +-
 pyramid/ramp.py                               |  68 +++---
 pyramid/regularisator.py                      |   1 +
 pyramid/version.py                            |   2 +-
 scripts/phasemap/phasemap_from_dm3.py         |  21 +-
 scripts/phasemap/phasemap_from_image.py       |   8 +-
 .../reconstruction_2d_from_phasemap.py        |   7 +-
 .../reconstruction_3d_from_magdata.py         | 126 +++++-----
 scripts/temp/test_hyperspy.py                 |  43 ++++
 scripts/temp/test_multiprocessing.py          |  90 ++++----
 scripts/temp/test_multiprocessing_main.py     |  74 ++++++
 14 files changed, 508 insertions(+), 183 deletions(-)
 create mode 100644 scripts/temp/test_hyperspy.py
 create mode 100644 scripts/temp/test_multiprocessing_main.py

diff --git a/pyramid/colormap.py b/pyramid/colormap.py
index 4f9e7b4..48c9ff8 100644
--- a/pyramid/colormap.py
+++ b/pyramid/colormap.py
@@ -10,6 +10,7 @@ import numpy as np
 import matplotlib as mpl
 import matplotlib.pyplot as plt
 from PIL import Image
+from numbers import Number
 
 import logging
 
@@ -157,7 +158,10 @@ class DirectionalColormap(mpl.colors.LinearSegmentedColormap):
 
         '''
         cls._log.debug('Calling rgb_from_colorind_and_saturation')
-        c, s = np.ravel(colorind), np.ravel(saturation)
+        # Make sure colorind and saturation are arrays (if not make it so):
+        c = np.array([colorind]) if isinstance(colorind, Number) else colorind
+        s = np.array([saturation]) if isinstance(saturation, Number) else saturation
+        # Calculate rgb and the inverse and combine them:
         rgb_norm = (np.minimum(s, np.ones_like(s)).T * cls.HOLO_CMAP(c)[..., :3].T).T
         rgb_invs = (np.maximum(s-1, np.zeros_like(s)).T * cls.HOLO_CMAP_INV(c)[..., :3].T).T
         return (255.999 * (rgb_norm + rgb_invs)).astype(np.uint8)
diff --git a/pyramid/dataset.py b/pyramid/dataset.py
index 4262212..4a8f9af 100644
--- a/pyramid/dataset.py
+++ b/pyramid/dataset.py
@@ -238,7 +238,7 @@ class DataSet(object):
         if mask_list is None:  # if no masks are given, extract from phase maps:
             mask_list = [phase_map.mask for phase_map in self.phase_maps]
         if len(mask_list) == 1:  # just one phase_map --> 3D mask equals 2D mask
-            self.mask = np.expand_dims(mask_list[0], axis=0)
+            self.mask = np.expand_dims(mask_list[0], axis=0)  # z-dim is set to 1!
         else:  # 3D mask has to be constructed from 2D masks:
             mask_3d_inv = np.zeros(self.dim)
             for i, projector in enumerate(self.projectors):
@@ -360,27 +360,3 @@ class DataSet(object):
             phase_map.display_combined('{} ({})'.format(title, self.projectors[i].get_info()),
                                        cmap, limit, norm, gain, interpolation, grad_encode)
         plt.show()
-
-
-# TODO: Multiprocessing! ##########################################################################
-# TODO: Use proxy objects? (https://docs.python.org/2/library/multiprocessing.html#proxy-objects)
-# class DistributedDataSet(DataSet):
-#
-#    @property
-#    def count(self):
-#        return np.sum([len(data_set.projectors) for data_set in self.data_sets])
-#
-#    def __init__(self, a, dim, b_0=1, mask=None, Se_inv=None):
-#        # TODO: get number of processes!
-#        self.nprocs = 4
-#        self.data_sets = []
-#        for proc_ind in range(self.nprocs):
-#            # TODO: Create processes and let DataSet live there!
-#            self.data_sets.append(DataSet(a, dim, b_0, mask, Se_inv))
-#
-#    def __get_item__(self, key):
-#        return self.data_sets[key]
-#
-#    def append(self, phase_map, projector):
-#        self.data_sets[self.count % self.nprocs].append(phase_map, projector)
-###################################################################################################
diff --git a/pyramid/forwardmodel.py b/pyramid/forwardmodel.py
index da4b139..24173a4 100644
--- a/pyramid/forwardmodel.py
+++ b/pyramid/forwardmodel.py
@@ -6,15 +6,20 @@
 threedimensional magnetization distribution onto a two-dimensional phase map."""
 
 
+from __future__ import division
+
+import sys
 import numpy as np
+import multiprocessing as mp
 
 from pyramid.magdata import MagData
 from pyramid.ramp import Ramp
+from pyramid.dataset import DataSet
 
 import logging
 
 
-__all__ = ['ForwardModel']
+__all__ = ['ForwardModel', 'DistributedForwardModel']
 
 
 class ForwardModel(object):
@@ -152,9 +157,6 @@ class ForwardModel(object):
             result[hp[i]:hp[i+1]] = res
         return result
 
-    def _jac_dot_element(self, mag_vec, projector, phasemapper):
-            return phasemapper.jac_dot(projector.jac_dot(mag_vec))  # TODO: ???
-
     def jac_T_dot(self, x, vector):
         ''''Calculate the product of the transposed Jacobi matrix with a given `vector`.
 
@@ -185,13 +187,206 @@ class ForwardModel(object):
         ramp_params = self.ramp.jac_T_dot(vector)  # calculate ramp_params separately!
         return np.concatenate((result, ramp_params))
 
+    def finalize(self):
+        pass
+
+
 # TODO: Multiprocessing! ##########################################################################
-#class DistributedForwardModel(ForwardModel):
+class DistributedForwardModel(ForwardModel):
+
+    def __init__(self, data_set, ramp_order=None, nprocs=1):
+        super(DistributedForwardModel, self).__init__(data_set, ramp_order)
+        self.nprocs = nprocs
+        img_per_proc = np.ceil(self.data_set.count / self.nprocs).astype(np.int)
+        hp = self.data_set.hook_points
+        self.proc_hook_points = [0]
+#        self.sub_fwd_models = []
+        self.pipes = []
+        self.processes = []
+        for proc_id in range(self.nprocs):
+            # 'PARENT Create SubDataSets:'
+            sub_data = DataSet(self.data_set.a, self.data_set.dim, self.data_set.b_0,
+                               self.data_set.mask, self.data_set.Se_inv)
+            # 'PARENT Distribute data to SubDataSets:'
+            start = proc_id*img_per_proc
+            stop = np.min(((proc_id+1)*img_per_proc, self.data_set.count))
+            self.proc_hook_points.append(hp[stop])
+            sub_data.phase_maps = self.data_set.phase_maps[start:stop]
+            sub_data.projectors = self.data_set.projectors[start:stop]
+            # '... PARENT Create SubForwardModel {}'.format(proc_id)
+            sub_fwd_model = ForwardModel(sub_data, ramp_order=None)  # ramps handled in master!
+            # '... PARENT Create communication pipe {}'.format(proc_id)
+            self.pipes.append(mp.Pipe())
+            # '... PARENT Create process {}'.format(proc_id)
+            p = mp.Process(name='Worker {}'.format(proc_id), target=_worker,
+                           args=(sub_fwd_model, self.pipes[proc_id][1]))
+            self.processes.append(p)
+            # '... PARENT Start process {}'.format(p.name)
+            p.start()
+        # 'PARENT Finish __init__\n'
+        self._log.debug('Creating '+str(self))
+
+#        print 'PARENT Distribute data to SubDataSets:'
+#        for count_id in range(self.data_set.count):
+#            proc_id = count_id % nprocs
+#            print '... PARENT send image {} to subDataSet {}'.format(count_id, proc_id)
+#            self.sub_data_sets[proc_id].append(self.data_set.phase_maps[count_id],
+#                                               self.data_set.projectors[count_id])
+#        print 'PARENT Start up processes:'
+#        self.sub_fwd_models = []
+#        self.pipes = []
+#        self.processes = []
+#        for proc_id in range(self.nprocs):
+#            print '... PARENT Create SubForwardModel {}'.format(proc_id)
+#            self.sub_fwd_models.append(ForwardModel(self.sub_data_sets[proc_id], ramp_order))
+#            print '... PARENT Create communication pipe {}'.format(proc_id)
+#            self.pipes.append(mp.Pipe())
+#            print '... PARENT Create process {}'.format(proc_id)
+#            p = mp.Process(name='Worker {}'.format(proc_id), target=_worker,
+#                           args=(self.sub_fwd_models[proc_id], self.pipes[proc_id][1]))
+#            self.processes.append(p)
+#            print '... PARENT Start process {}'.format(p.name)
+#            p.start()
+#        print 'PARENT Finish __init__\n'
+#        self._log.debug('Creating '+str(self))
+
+    def __call__(self, x):
+        # Extract ramp parameters if necessary (x will be shortened!):
+        # TODO: how to handle here? all in main thread? distribute to sub_x and give to process?
+        # TODO: later method of distributed dataset?
+        # TODO: After reconstruction is this saved in this ramp? solve with property?
+        x = self.ramp.extract_ramp_params(x)
+        # Simulate all phase maps and create result vector:
+        # 'PARENT Distribute input to processes and start working:'
+        for proc_id in range(self.nprocs):
+            # TODO: better with Pool()? Can Pool be more persistent by subclassing custom Pool?
+            # '... PARENT Send input to process {}'.format(proc_id)
+            self.pipes[proc_id][0].send(('__call__', (x,)))
+        # TODO: Calculate ramps here? There should be waiting time... Init result with ramps!
+        result = np.zeros(self.m)
+        hp = self.hook_points
+        php = self.proc_hook_points
+        # '... PARENT Calculate ramps:'
+        if self.ramp_order is not None:
+            for i in range(self.data_set.count):
+                result[hp[i]:hp[i+1]] += self.ramp(i).phase.ravel()
+        # 'PARENT Get process results from the pipes:'
+#        sub_results = []
+        for proc_id in range(self.nprocs):
+            # '... PARENT Retrieve results from process {}'.format(proc_id)
+            result[php[proc_id]:php[proc_id+1]] += self.pipes[proc_id][0].recv()
+        return result
+
+
+    def jac_dot(self, x, vector):
+        # Extract ramp parameters if necessary (x will be shortened!):
+        # TODO: how to handle here? all in main thread? distribute to sub_x and give to process?
+        # TODO: later method of distributed dataset?
+        # TODO: After reconstruction is this saved in this ramp? solve with property?
+        vector = self.ramp.extract_ramp_params(vector)
+        # Simulate all phase maps and create result vector:
+        # 'PARENT Distribute input to processes and start working:'
+        for proc_id in range(self.nprocs):
+            # TODO: better with Pool()? Can Pool be more persistent by subclassing custom Pool?
+            # '... PARENT Send input to process {}'.format(proc_id)
+            self.pipes[proc_id][0].send(('jac_dot', (None, vector)))
+        # TODO: Calculate ramps here? There should be waiting time...
+        result = np.zeros(self.m)
+        hp = self.hook_points
+        php = self.proc_hook_points
+        # '... PARENT Calculate ramps:'
+        if self.ramp_order is not None:
+            for i in range(self.data_set.count):
+                result[hp[i]:hp[i+1]] += self.ramp.jac_dot(i)
+        # 'PARENT Get process results from the pipes:'
+        for proc_id in range(self.nprocs):
+            # '... PARENT Retrieve results from process {}'.format(proc_id)
+            result[php[proc_id]:php[proc_id+1]] += self.pipes[proc_id][0].recv()
+        return result
+#        for proc_id in range(self.nprocs):
+#            # '... PARENT Retrieve results from process {}'.format(proc_id)
+#            sub_results.append(self.pipes[proc_id][0].recv())
 #
-#    def __init__(self, distributed_data_set, ramp_order=None):
-#        self.nprocs = distributed_data_set.nprocs
-#        self.fwd_models = []
-#        for proc_ind in range(self.nprocs):
-#            data_set = distributed_data_set[proc_ind]
-#            self.fwd_models.append(ForwardModel(data_set))
+#        # TODO: SORTING!!!! maybe return also the hookpoints?
+#        for i in range(self.data_set.count):  # Go over all projections!
+#            proc_id = i % self.nprocs
+#            j = i // self.nprocs  # index for subresult!
+#            result[hp[i]:hp[i+1]] = sub_results[proc_id][hp[j]:hp[j+1]]
+
+
+    def jac_T_dot(self, x, vector):
+        hp = self.hook_points
+        php = self.proc_hook_points
+        for proc_id in range(self.nprocs):
+            sub_vec = vector[php[proc_id]:php[proc_id+1]]
+            self.pipes[proc_id][0].send(('jac_T_dot', (None, sub_vec)))
+
+        ramp_params = self.ramp.jac_T_dot(vector)  # calculate ramp_params separately!
+        result = np.zeros(3*self.data_set.mask.sum())
+
+        for proc_id in range(self.nprocs):
+            sub_vec = vector[php[proc_id]:php[proc_id+1]]
+            result += self.pipes[proc_id][0].recv()
+
+        return np.concatenate((result, ramp_params))
+
+
+        print 'PARENT Distribute input to processes and start working:'
+        for proc_id in range(self.nprocs):
+            # TODO: better with Pool()? Can Pool be more persistent by subclassing custom Pool?
+            print '... PARENT Send input to process {}'.format(proc_id)
+            self.pipes[proc_id][0].send(('jac_T_dot', (None, vector)))
+        print 'PARENT Get process results from the pipes:'
+        sub_results = []
+        for proc_id in range(self.nprocs):
+            print '... PARENT Retrieve results from process {}'.format(proc_id)
+            sub_results.append(self.pipes[proc_id][0].recv())
+        result = np.zeros(self.m)
+        hp = self.hook_points
+        # TODO: Calculate ramps here? There should be waiting time...
+        # TODO: SORTING!!!! maybe return also the hookpoints?
+        for i in range(self.data_set.count):  # Go over all projections!
+            proc_id = i % self.nprocs
+            j = i // self.nprocs  # index for subresult!
+            result[hp[i]:hp[i+1]] = sub_results[proc_id][hp[j]:hp[j+1]]
+        return result
+
+
+
+        proj_T_result = np.zeros(3*np.prod(self.data_set.dim))
+        hp = self.hook_points
+        for i, projector in enumerate(self.data_set.projectors):
+            sub_vec = vector[hp[i]:hp[i+1]]
+            mapper = self.phase_mappers[projector.dim_uv]
+            proj_T_result += projector.jac_T_dot(mapper.jac_T_dot(sub_vec))
+        self.mag_data.mag_vec = proj_T_result
+        result = self.mag_data.get_vector(self.data_set.mask)
+        ramp_params = self.ramp.jac_T_dot(vector)  # calculate ramp_params separately!
+        return np.concatenate((result, ramp_params))
+
+    def finalize(self):
+        # 'PARENT Finalize processes:'
+        for proc_id in range(self.nprocs):
+            self.pipes[proc_id][0].send('STOP')
+        # 'PARENT Exit the completed processes:'
+        for p in self.processes:
+            p.join()
+        # 'PARENT All processes joint!'
+
+
+def _worker(fwd_model, pipe):
+    # Has to be directly accessible in the module as a function, NOT a method of a class instance!
+    # '... {} starting!'.format(mp.current_process().name)
+    sys.stdout.flush()
+    for method, arguments in iter(pipe.recv, 'STOP'):
+        # '... {} processes method {}'.format(mp.current_process().name, method)
+        sys.stdout.flush()
+        result = getattr(fwd_model, method)(*arguments)
+        pipe.send(result)
+    # '... ', mp.current_process().name, 'exiting!'
+    sys.stdout.flush()
+
+
+
+
 ###################################################################################################
diff --git a/pyramid/magdata.py b/pyramid/magdata.py
index 88473d8..d5128fb 100644
--- a/pyramid/magdata.py
+++ b/pyramid/magdata.py
@@ -354,7 +354,7 @@ class MagData(object):
         self._log.debug('Calling set_vector')
         vector = np.asarray(vector, dtype=fft.FLOAT)
         assert np.size(vector) % 3 == 0, 'Vector has to contain all 3 components for every pixel!'
-        count = np.size(vector)/3
+        count = np.size(vector)//3
         if mask is not None:
             self.magnitude[0][mask] = vector[:count]  # x-component
             self.magnitude[1][mask] = vector[count:2*count]  # y-component
diff --git a/pyramid/ramp.py b/pyramid/ramp.py
index c12b0a0..819de44 100644
--- a/pyramid/ramp.py
+++ b/pyramid/ramp.py
@@ -80,7 +80,8 @@ class Ramp(object):
             # Iterate over all degrees of freedom:
             for dof in dof_list:
                 # Add the contribution of the current degree of freedom:
-                phase_ramp += self.param_cache[dof][index] * self.create_poly_mesh(dof, dim_uv)
+                phase_ramp += self.param_cache[dof][index] * \
+                              self.create_poly_mesh(self.data_set.a, dof, dim_uv)
             return PhaseMap(self.data_set.a, phase_ramp, mask=np.zeros(dim_uv, dtype=np.bool))
 
     def jac_dot(self, index):
@@ -108,7 +109,8 @@ class Ramp(object):
             # Iterate over all degrees of freedom:
             for dof in range(self.deg_of_freedom):
                 # Add the contribution of the current degree of freedom:
-                phase_ramp += self.param_cache[dof][index] * self.create_poly_mesh(dof, dim_uv)
+                phase_ramp += self.param_cache[dof][index] * \
+                              self.create_poly_mesh(self.data_set.a, dof, dim_uv)
             return np.ravel(phase_ramp)
 
     def jac_T_dot(self, vector):
@@ -132,36 +134,11 @@ class Ramp(object):
             # Iterate over all projectors:
             for i, projector in enumerate(self.data_set.projectors):
                 sub_vec = vector[hp[i]:hp[i+1]]
-                poly_mesh = self.create_poly_mesh(dof, projector.dim_uv)
+                poly_mesh = self.create_poly_mesh(self.data_set.a, dof, projector.dim_uv)
                 # Transposed ramp parameters: summed product of the vector with the poly-mesh:
                 result.append(np.sum(sub_vec * np.ravel(poly_mesh)))
         return result
 
-    def create_poly_mesh(self, deg_of_freedom, dim_uv):
-        '''Create a polynomial mesh for the ramp calculation for a specific degree of freedom.
-
-        Parameters
-        ----------
-        deg_of_freedom : int
-            Current degree of freedom for which the mesh should be created. 0 corresponds to a
-            simple offset, 1 corresponds to a linear ramp in u-direction, 2 to a linear ramp in
-            v-direction and so on.
-        dim_uv : tuple (N=2)
-            Dimensions of the 2D mesh that should be created.
-
-        Returns
-        -------
-        result_mesh : :class:`~numpy.ndarray` (N=2)
-            Polynomial mesh that was created and can be used for further calculations.
-
-        '''
-        # Determine if u-direction (u_or_v == 1) or v-direction (u_or_v == 0)!
-        u_or_v = (deg_of_freedom - 1) % 2
-        # Determine polynomial order:
-        order = (deg_of_freedom + 1) // 2
-        # Return polynomial mesh:
-        return (np.indices(dim_uv)[u_or_v] * self.data_set.a) ** order
-
     def extract_ramp_params(self, x):
         '''Extract the ramp parameters of an input vector and return the rest.
 
@@ -188,3 +165,38 @@ class Ramp(object):
             x, ramp_params = np.split(x, [-self.n])
             self.param_cache = ramp_params.reshape((self.deg_of_freedom, self.data_set.count))
         return x
+
+    @classmethod
+    def create_poly_mesh(cls, a, deg_of_freedom, dim_uv):
+        '''Create a polynomial mesh for the ramp calculation for a specific degree of freedom.
+
+        Parameters
+        ----------
+        deg_of_freedom : int
+            Current degree of freedom for which the mesh should be created. 0 corresponds to a
+            simple offset, 1 corresponds to a linear ramp in u-direction, 2 to a linear ramp in
+            v-direction and so on.
+        dim_uv : tuple (N=2)
+            Dimensions of the 2D mesh that should be created.
+
+        Returns
+        -------
+        result_mesh : :class:`~numpy.ndarray` (N=2)
+            Polynomial mesh that was created and can be used for further calculations.
+
+        '''# TODO: Docstring a!
+        # Determine if u-direction (u_or_v == 1) or v-direction (u_or_v == 0)!
+        u_or_v = (deg_of_freedom - 1) % 2
+        # Determine polynomial order:
+        order = (deg_of_freedom + 1) // 2
+        # Return polynomial mesh:
+        return (np.indices(dim_uv)[u_or_v] * a) ** order
+
+    @classmethod
+    def create_ramp(cls, a, dim_uv, params):
+        #TODO: Docstring!
+        phase_ramp = np.zeros(dim_uv)
+        dof_list = range(len(params))
+        for dof in dof_list:
+            phase_ramp += params[dof] * cls.create_poly_mesh(a, dof, dim_uv)
+        return PhaseMap(a, phase_ramp, mask=np.zeros(dim_uv, dtype=np.bool))
diff --git a/pyramid/regularisator.py b/pyramid/regularisator.py
index 73cc7de..bacc1d6 100644
--- a/pyramid/regularisator.py
+++ b/pyramid/regularisator.py
@@ -247,6 +247,7 @@ class NoneRegularisator(Regularisator):
         self._log.debug('Calling __init__')
         self.norm = None
         self.lam = 0
+        self.add_params = None
         self._log.debug('Created '+str(self))
 
     def __call__(self, x):
diff --git a/pyramid/version.py b/pyramid/version.py
index 9d9f9c9..73516bd 100644
--- a/pyramid/version.py
+++ b/pyramid/version.py
@@ -1,3 +1,3 @@
 # THIS FILE IS GENERATED BY THE PYRAMID SETUP.PY
 version = "0.1.0-dev"
-hg_revision = "711e66d3213b+"
+hg_revision = "58741ae0132a+"
diff --git a/scripts/phasemap/phasemap_from_dm3.py b/scripts/phasemap/phasemap_from_dm3.py
index f76b8ac..507d48f 100644
--- a/scripts/phasemap/phasemap_from_dm3.py
+++ b/scripts/phasemap/phasemap_from_dm3.py
@@ -4,7 +4,8 @@
 
 import os
 import numpy as np
-from pyDM3reader import DM3lib as dm3
+import hyperspy.hspy as hp
+from PIL import Image
 import pyramid as py
 import logging.config
 
@@ -12,17 +13,21 @@ import logging.config
 logging.config.fileConfig(py.LOGGING_CONFIG, disable_existing_loggers=False)
 
 ###################################################################################################
-path_mag = 'zi_an_elongated_nanorod.dm3'
-path_ele = 'zi_an_elongated_nanorod_mip.dm3'
-filename = 'phasemap_dm3_zi_an_elongated_nanorod.nc'
+path_mag = 'zi_an_magnetite_4_particles_magnetic.dm3'
+path_ele = 'zi_an_magnetite_4_particles_electric_smooth.dm3'
+filename = 'phasemap_dm3_zi_an_magnetite_4_particles.nc'
 a = 1.
-dim_uv = None
-threshold = 4.5
+dim_uv = (512, 512)
+threshold = 0.5
+flip_up_down = True
 ###################################################################################################
 
 # Load images:
-im_mag = dm3.DM3(os.path.join(py.DIR_FILES, 'dm3', path_mag)).image
-im_ele = dm3.DM3(os.path.join(py.DIR_FILES, 'dm3', path_ele)).image
+im_mag = Image.fromarray(hp.load(os.path.join(py.DIR_FILES, 'dm3', path_mag)).data)
+im_ele = Image.fromarray(hp.load(os.path.join(py.DIR_FILES, 'dm3', path_ele)).data)
+if flip_up_down:
+    im_mag = im_mag.transpose(Image.FLIP_TOP_BOTTOM)
+    im_ele = im_ele.transpose(Image.FLIP_TOP_BOTTOM)
 if dim_uv is not None:
     im_mag = im_mag.resize(dim_uv)
     im_ele = im_ele.resize(dim_uv)
diff --git a/scripts/phasemap/phasemap_from_image.py b/scripts/phasemap/phasemap_from_image.py
index 35f2175..ef89eea 100644
--- a/scripts/phasemap/phasemap_from_image.py
+++ b/scripts/phasemap/phasemap_from_image.py
@@ -12,15 +12,15 @@ import logging.config
 logging.config.fileConfig(py.LOGGING_CONFIG, disable_existing_loggers=False)
 
 ###################################################################################################
-path_mag = 'trevor_magnetite_m20.bmp'
-path_mask = 'trevor_magnetite_mask20.bmp'
-filename = 'phasemap_bmp_trevor_magnetite_m20.nc'
+path_mag = 'Arnaud_M.tif'
+path_mask = 'Arnaud_MIP_mask.tif'
+filename = 'phasemap_tif_martial_magnetite.nc'
 a = 0.4  # nm
 dim_uv = None
 max_phase = 1
 threshold = 0.5
 offset = 0
-flip_up_down = True
+flip_up_down = False
 ###################################################################################################
 
 # Load images:
diff --git a/scripts/reconstruction/reconstruction_2d_from_phasemap.py b/scripts/reconstruction/reconstruction_2d_from_phasemap.py
index 1efb602..a10e1b6 100644
--- a/scripts/reconstruction/reconstruction_2d_from_phasemap.py
+++ b/scripts/reconstruction/reconstruction_2d_from_phasemap.py
@@ -11,14 +11,15 @@ import logging.config
 logging.config.fileConfig(pr.LOGGING_CONFIG, disable_existing_loggers=False)
 
 ###################################################################################################
-phase_name = 'phasemap_bmp_trevor_magnetite_m20'
+phase_name = 'phasemap_tif_martial_magnetite'
 b_0 = 0.6  # in T
 lam = 1E-3
 max_iter = 100
 buffer_pixel = 0
-order = 1
+order = 0
 ###################################################################################################
 
+
 # Load phase map:
 phase_map = pr.PhaseMap.load_from_netcdf4(phase_name+'.nc')
 phase_map.pad((buffer_pixel, buffer_pixel))
@@ -47,7 +48,7 @@ mag_name = '{}_lam={}'.format(phase_name.replace('phasemap', 'magdata_rec'), lam
 mag_data_rec.save_to_netcdf4(mag_name+'.nc')
 
 # Plot stuff:
-mag_data_rec.quiver_plot('Reconstructed Distribution', ar_dens=np.ceil(np.max(dim)/128.))
+mag_data_rec.quiver_plot('Reconstructed Distribution', ar_dens=int(np.ceil(np.max(dim)/128.)))
 phase_map.crop((buffer_pixel, buffer_pixel))
 phase_map.display_combined('Input Phase')
 phase_map -= fwd_model.ramp(index=0)
diff --git a/scripts/reconstruction/reconstruction_3d_from_magdata.py b/scripts/reconstruction/reconstruction_3d_from_magdata.py
index 2fa182e..be11ea4 100644
--- a/scripts/reconstruction/reconstruction_3d_from_magdata.py
+++ b/scripts/reconstruction/reconstruction_3d_from_magdata.py
@@ -3,6 +3,7 @@
 
 
 import numpy as np
+import multiprocessing as mp
 import pyramid as pr
 from jutil.taketime import TakeTime
 import logging.config
@@ -24,67 +25,74 @@ offset_max = 2
 ramp_max = 0.01
 order = 1
 show_input = True
+# TODO: toggle multiprocessing if nprocs > 1
 ###################################################################################################
 
-# Load magnetization distribution:
-mag_data = pr.MagData.load_from_netcdf4(mag_name+'.nc')
-dim = mag_data.dim
-
-# Construct data set and regularisator:
-data = pr.DataSet(mag_data.a, mag_data.dim, b_0)
-
-# Construct projectors:
-projectors = []
-for angle in angles:
-    angle_rad = angle*np.pi/180
-    if axes['x']:
-        projectors.append(pr.XTiltProjector(mag_data.dim, angle_rad, dim_uv))
-    if axes['y']:
-        projectors.append(pr.YTiltProjector(mag_data.dim, angle_rad, dim_uv))
-
-# Add projectors and construct according phase maps:
-data.projectors = projectors
-data.phase_maps = data.create_phase_maps(mag_data)
-
-for i, phase_map in enumerate(data.phase_maps):
-    offset = np.random.uniform(-offset_max, offset_max)
-    ramp = np.random.uniform(-ramp_max, ramp_max), np.random.uniform(-ramp_max, ramp_max)
-    phase_map.add_ramp(offset, ramp)
-
-# Add noise if necessary:
-if noise != 0:
-    for i, phase_map in enumerate(data.phase_maps):
-        phase_map.phase += np.random.normal(0, noise, phase_map.dim_uv)
 
-        data.phase_maps[i] = phase_map
+if __name__ == '__main__':
+
+    mp.freeze_support()
+
+    # Load magnetization distribution:
+    mag_data = pr.MagData.load_from_netcdf4(mag_name+'.nc')
+    dim = mag_data.dim
+
+    # Construct data set and regularisator:
+    data = pr.DataSet(mag_data.a, mag_data.dim, b_0)
+
+    # Construct projectors:
+    projectors = []
+    for angle in angles:
+        angle_rad = angle*np.pi/180
+        if axes['x']:
+            projectors.append(pr.XTiltProjector(mag_data.dim, angle_rad, dim_uv))
+        if axes['y']:
+            projectors.append(pr.YTiltProjector(mag_data.dim, angle_rad, dim_uv))
+
+    # Add projectors and construct according phase maps:
+    data.projectors = projectors
+    data.phase_maps = data.create_phase_maps(mag_data)
 
-# Plot input:
-if show_input:
     for i, phase_map in enumerate(data.phase_maps):
-        phase_map.display_phase()
-
-# Construct mask:
-if use_internal_mask:
-    data.mask = mag_data.get_mask()  # Use perfect mask from mag_data!
-else:
-    data.set_3d_mask()  # Construct mask from 2D phase masks!
-
-# Construct regularisator, forward model and costfunction:
-fwd_model = pr.ForwardModel(data, ramp_order=order)
-reg = pr.FirstOrderRegularisator(data.mask, lam, add_params=fwd_model.ramp.n)
-cost = pr.Costfunction(fwd_model, reg)
-
-# Reconstruct and save:
-with TakeTime('reconstruction time'):
-    mag_data_rec = pr.reconstruction.optimize_linear(cost, max_iter=max_iter)
-    param_cache = cost.fwd_model.ramp.param_cache
-mag_name_rec = mag_name.replace('magdata', 'magdata_rec')
-mag_data_rec.save_to_netcdf4(mag_name_rec+'.nc')
-
-# Plot stuff:
-
-data.display_mask(ar_dens=np.ceil(np.max(dim)/64.))
-mag_data.quiver_plot3d('Original Distribution', ar_dens=np.ceil(np.max(dim)/64.))
-mag_data_rec.quiver_plot3d('Reconstructed Distribution', ar_dens=np.ceil(np.max(dim)/64.))
-mag_data_rec.quiver_plot3d('Reconstructed Distribution', ar_dens=np.ceil(np.max(dim)/64.),
-                           coloring='amplitude')
+        offset = np.random.uniform(-offset_max, offset_max)
+        ramp_u = np.random.uniform(-ramp_max, ramp_max)
+        ramp_v = np.random.uniform(-ramp_max, ramp_max)
+        phase_map += pr.Ramp.create_ramp(phase_map.a, phase_map.dim_uv, (offset, ramp_u, ramp_v))
+
+    # Add noise if necessary:
+    if noise != 0:
+        for i, phase_map in enumerate(data.phase_maps):
+            phase_map.phase += np.random.normal(0, noise, phase_map.dim_uv)
+            data.phase_maps[i] = phase_map
+
+    # Plot input:
+    if show_input:
+        for i, phase_map in enumerate(data.phase_maps):
+            phase_map.display_phase()
+
+    # Construct mask:
+    if use_internal_mask:
+        data.mask = mag_data.get_mask()  # Use perfect mask from mag_data!
+    else:
+        data.set_3d_mask()  # Construct mask from 2D phase masks!
+
+    # Construct regularisator, forward model and costfunction:
+    fwd_model = pr.DistributedForwardModel(data, ramp_order=order, nprocs=4)
+    reg = pr.FirstOrderRegularisator(data.mask, lam, add_params=fwd_model.ramp.n)
+    cost = pr.Costfunction(fwd_model, reg)
+
+    # Reconstruct and save:
+    with TakeTime('reconstruction time'):
+        mag_data_rec = pr.reconstruction.optimize_linear(cost, max_iter=max_iter)
+        param_cache = cost.fwd_model.ramp.param_cache
+    fwd_model.finalize()
+    mag_name_rec = mag_name.replace('magdata', 'magdata_rec')
+    mag_data_rec.save_to_netcdf4(mag_name_rec+'.nc')
+
+    # Plot stuff:
+
+    data.display_mask(ar_dens=np.ceil(np.max(dim)/64.))
+    mag_data.quiver_plot3d('Original Distribution', ar_dens=np.ceil(np.max(dim)/64.))
+    mag_data_rec.quiver_plot3d('Reconstructed Distribution', ar_dens=np.ceil(np.max(dim)/64.))
+    mag_data_rec.quiver_plot3d('Reconstructed Distribution', ar_dens=np.ceil(np.max(dim)/64.),
+                               coloring='amplitude')
diff --git a/scripts/temp/test_hyperspy.py b/scripts/temp/test_hyperspy.py
new file mode 100644
index 0000000..a48cb12
--- /dev/null
+++ b/scripts/temp/test_hyperspy.py
@@ -0,0 +1,43 @@
+# -*- coding: utf-8 -*-
+"""
+Created on Wed Jul 29 15:31:59 2015
+
+@author: Jan
+"""
+
+
+import os
+import numpy as np
+import hyperspy.hspy as hp
+import pyramid as pr
+
+
+phase_map = pr.PhaseMap.load_from_netcdf4('phasemap_tif_martial_magnetite.nc')
+mag_rec = pr.MagData.load_from_netcdf4('magdata_rec_tif_martial_magnetite_lam=0.001.nc')
+ramp_params = (0.350598105531, 0.00022362574109514882, 0.00075016020407003121)
+phase_ramp = pr.Ramp.create_ramp(phase_map.a, phase_map.dim_uv, ramp_params)
+phase_corr = phase_map - phase_ramp
+phase_rec = pr.pm(mag_rec)
+phase_diff = phase_rec - phase_corr
+
+#mag_rec.quiver_plot('Reconstructed distribution', ar_dens=4)
+#phase_map.display_combined('Original Input')
+phase_corr.display_combined('Input Corrected')
+#phase_ramp.display_phase('Ramp')
+#phase_rec.display_combined('Reconstructed Phase')
+#phase_diff.display_phase('Difference')
+
+#mag_rec.save_to_llg(os.getcwd()+'/mag_rec.txt')
+#np.save('phase_input_corr.npy', phase_corr.phase)
+#np.savetxt('phase_input_corr.txt', phase_corr.phase)
+
+phase_corr_hp = hp.signals.Image(phase_map.phase)
+
+phase_corr_hp.axes_manager[0].name = 'X'
+phase_corr_hp.axes_manager[0].units = 'nm'
+phase_corr_hp.axes_manager[0].scale = phase_corr.a
+phase_corr_hp.axes_manager[1].name = 'Y'
+phase_corr_hp.axes_manager[1].units = 'nm'
+phase_corr_hp.axes_manager[1].scale = phase_corr.a
+#phase_corr_hp.save('phase_input_corr.rpl')
+phase_corr_hp.plot(axes_ticks=True)
diff --git a/scripts/temp/test_multiprocessing.py b/scripts/temp/test_multiprocessing.py
index 421ac83..719168e 100644
--- a/scripts/temp/test_multiprocessing.py
+++ b/scripts/temp/test_multiprocessing.py
@@ -34,47 +34,53 @@ class Comparer(object):
         return 'Comparer({}) compares with {}: {}'.format(self.reference, value, outcome)
 
 
+class Main(object):
+
+    def __call__(self):
+        nprocs = 4
+        nvalues = 17
+        values = range(nvalues)
+        comparers = np.asarray([Comparer(i) for i in range(nvalues)])
+        print 'PARENT Create communication pipes'
+        pipes = [mp.Pipe() for i in range(nprocs)]
+        sleep(1)
+        print 'PARENT Setup a list of processes that we want to run'
+        processes = []
+        proc_ids = np.asarray([i % nprocs for i in range(nvalues)])
+        for proc_id in range(nprocs):
+            selection = comparers[np.where(proc_ids == proc_id, True, False)]
+            processes.append(mp.Process(name='WORKER {}'.format(proc_id), target=worker,
+                                        args=(selection, pipes[proc_id][1])))
+        sleep(1)
+        print 'PARENT Run processes'
+        for p in processes:
+            p.start()
+        sleep(1)
+        print 'PARENT Distribute work'
+        for i in range(nvalues):
+            proc_id = i % nprocs
+            comparer_id = i // nprocs
+            pipes[proc_id][0].send((values[i], comparer_id))
+        sleep(1)
+        print 'PARENT Get process results from the pipes'
+        results = []
+        for i in range(nvalues):
+            proc_id = i % nprocs
+            results.append(pipes[proc_id][0].recv())
+        sleep(1)
+        print 'PARENT Print results'
+        for result in results:
+            print result
+        sleep(1)
+        print 'PARENT Finalize processes'
+        for i in range(nprocs):
+            pipes[i][0].send('STOP')
+        sleep(1)
+        print 'PARENT Exit the completed processes'
+        for p in processes:
+            p.join()
+
+
 if __name__ == '__main__':
     mp.freeze_support()
-    nprocs = 4
-    nvalues = 17
-    values = range(nvalues)
-    comparers = np.asarray([Comparer(i) for i in range(nvalues)])
-    print 'PARENT Create communication pipes'
-    pipes = [mp.Pipe() for i in range(nprocs)]
-    sleep(1)
-    print 'PARENT Setup a list of processes that we want to run'
-    processes = []
-    proc_ids = np.asarray([i % nprocs for i in range(nvalues)])
-    for proc_id in range(nprocs):
-        selection = comparers[np.where(proc_ids == proc_id, True, False)]
-        processes.append(mp.Process(name='WORKER {}'.format(proc_id), target=worker,
-                                    args=(selection, pipes[proc_id][1])))
-    sleep(1)
-    print 'PARENT Run processes'
-    for p in processes:
-        p.start()
-    sleep(1)
-    print 'PARENT Distribute work'
-    for i in range(nvalues):
-        proc_id = i % nprocs
-        comparer_id = i // nprocs
-        pipes[proc_id][0].send((values[i], comparer_id))
-    sleep(1)
-    print 'PARENT Get process results from the pipes'
-    results = []
-    for i in range(nvalues):
-        proc_id = i % nprocs
-        results.append(pipes[proc_id][0].recv())
-    sleep(1)
-    print 'PARENT Print results'
-    for result in results:
-        print result
-    sleep(1)
-    print 'PARENT Finalize processes'
-    for i in range(nprocs):
-        pipes[i][0].send('STOP')
-    sleep(1)
-    print 'PARENT Exit the completed processes'
-    for p in processes:
-        p.join()
+    Main()()
diff --git a/scripts/temp/test_multiprocessing_main.py b/scripts/temp/test_multiprocessing_main.py
new file mode 100644
index 0000000..3a0b88c
--- /dev/null
+++ b/scripts/temp/test_multiprocessing_main.py
@@ -0,0 +1,74 @@
+# -*- coding: utf-8 -*-
+"""
+Created on Wed Jul 22 11:02:00 2015
+
+@author: Jan
+"""
+
+
+import multiprocessing as mp
+import numpy as np
+
+import pyramid as pr
+from jutil.taketime import TakeTime
+
+
+if __name__ == '__main__':
+
+    mp.freeze_support()
+
+    # Parameters:
+    b_0 = 1
+    order = None
+    nprocs = 2
+    count = 40
+
+    # Load stuff:
+    mag_data = pr.MagData.load_from_netcdf4('magdata.nc')
+    with TakeTime('reference') as timer:
+        phase_map = pr.pm(mag_data)
+        print '--- Reference time:  ', timer.dt
+
+    # Construct dataset:
+    data = pr.DataSet(mag_data.a, mag_data.dim, b_0, mag_data.get_mask())
+    [data.append(phase_map, pr.SimpleProjector(mag_data.dim)) for i in range(count)]
+
+    # Reference:
+    phase_map.display_combined('Reference')
+
+    # Normal ForwardModel:
+    fwd_model_norm = pr.ForwardModel(data, order)
+    with TakeTime('reference') as timer:
+        phase_norm = fwd_model_norm.jac_T_dot(None, data.phase_vec)#data.mag_data.get_vector(data.mask))
+#        phase_map_norm = pr.PhaseMap(data.a, phase_norm.reshape(phase_map.dim_uv))
+        print '--- Normal time:     ', timer.dt
+#    phase_map_norm.display_combined('Normal')
+#
+#    # Normal ForwardModel:
+#    fwd_model_norm = pr.ForwardModel(data, order)
+#    with TakeTime('reference') as timer:
+#        phase_norm = fwd_model_norm(mag_data.get_vector(data.mask))
+##        phase_map_norm = pr.PhaseMap(data.a, phase_norm.reshape(phase_map.dim_uv))
+#        print '--- Normal time:     ', timer.dt
+##    phase_map_norm.display_combined('Normal')
+
+    # Distributed ForwardModel:
+    fwd_model_dist = pr.forwardmodel.DistributedForwardModel(data, ramp_order=order, nprocs=nprocs)
+    with TakeTime('distributed') as timer:
+        phase_dist = fwd_model_dist.jac_T_dot(None, data.phase_vec)#mag_data.get_vector(data.mask))
+#        phase_map_dist = pr.PhaseMap(data.a, phase_dist.reshape(phase_map.dim_uv))
+        print '--- Distributed time:', timer.dt
+#    phase_map_dist.display_combined('Distributed')
+#    fwd_model_dist.finalize()
+
+    # Distributed ForwardModel:
+    with TakeTime('distributed') as timer:
+        phase_dist = fwd_model_dist.jac_T_dot(None, data.phase_vec)#mag_data.get_vector(data.mask))
+#        phase_map_dist = pr.PhaseMap(data.a, phase_dist.reshape(phase_map.dim_uv))
+        print '--- Distributed time:', timer.dt
+#    phase_map_dist.display_combined('Distributed')
+    fwd_model_dist.finalize()
+
+    diff = phase_dist - phase_norm
+    print diff.min(), diff.max()
+#    assert np.testing.assert_almost_equal(phase_dist, phase_norm, decimal=5)
-- 
GitLab