Commit 0d7c79db authored by Jan Caron

Multiprocessing implementation and minor changes!

forward_model: Added DistributedForwardModel, which uses the multiprocessing
               package to distribute the computations over several worker processes.
parent d136112f
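Editorial note: the core of this commit is the parent/worker dispatch in forwardmodel.py. Each worker process owns a sub-ForwardModel and receives (method name, arguments) tuples over a multiprocessing Pipe until a 'STOP' sentinel arrives. The following is a minimal, self-contained sketch of that pattern only; SquareModel is a made-up stand-in and not part of pyramid.

from __future__ import print_function
import multiprocessing as mp


class SquareModel(object):
    '''Toy stand-in for a sub-ForwardModel living inside a worker process.'''

    def __call__(self, x):
        return x ** 2


def _worker(model, pipe):
    # Module-level function (not a bound method), so it can be pickled
    # when the worker process is spawned:
    for method, arguments in iter(pipe.recv, 'STOP'):
        result = getattr(model, method)(*arguments)  # dispatch by method name
        pipe.send(result)


if __name__ == '__main__':
    mp.freeze_support()
    parent_conn, child_conn = mp.Pipe()
    p = mp.Process(target=_worker, args=(SquareModel(), child_conn))
    p.start()
    parent_conn.send(('__call__', (3,)))  # ask the worker to evaluate the model
    print(parent_conn.recv())             # -> 9
    parent_conn.send('STOP')              # sentinel: shut the worker down
    p.join()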
@@ -10,6 +10,7 @@ import numpy as np
import matplotlib as mpl
import matplotlib.pyplot as plt
from PIL import Image
from numbers import Number
import logging
@@ -157,7 +158,10 @@ class DirectionalColormap(mpl.colors.LinearSegmentedColormap):
'''
cls._log.debug('Calling rgb_from_colorind_and_saturation')
c, s = np.ravel(colorind), np.ravel(saturation)
# Make sure colorind and saturation are arrays (if not make it so):
c = np.array([colorind]) if isinstance(colorind, Number) else colorind
s = np.array([saturation]) if isinstance(saturation, Number) else saturation
# Calculate rgb and the inverse and combine them:
rgb_norm = (np.minimum(s, np.ones_like(s)).T * cls.HOLO_CMAP(c)[..., :3].T).T
rgb_invs = (np.maximum(s-1, np.zeros_like(s)).T * cls.HOLO_CMAP_INV(c)[..., :3].T).T
return (255.999 * (rgb_norm + rgb_invs)).astype(np.uint8)
......
@@ -238,7 +238,7 @@ class DataSet(object):
if mask_list is None: # if no masks are given, extract from phase maps:
mask_list = [phase_map.mask for phase_map in self.phase_maps]
if len(mask_list) == 1: # just one phase_map --> 3D mask equals 2D mask
self.mask = np.expand_dims(mask_list[0], axis=0)
self.mask = np.expand_dims(mask_list[0], axis=0) # z-dim is set to 1!
else: # 3D mask has to be constructed from 2D masks:
mask_3d_inv = np.zeros(self.dim)
for i, projector in enumerate(self.projectors):
@@ -360,27 +360,3 @@ class DataSet(object):
phase_map.display_combined('{} ({})'.format(title, self.projectors[i].get_info()),
cmap, limit, norm, gain, interpolation, grad_encode)
plt.show()
# TODO: Multiprocessing! ##########################################################################
# TODO: Use proxy objects? (https://docs.python.org/2/library/multiprocessing.html#proxy-objects)
# class DistributedDataSet(DataSet):
#
# @property
# def count(self):
# return np.sum([len(data_set.projectors) for data_set in self.data_sets])
#
# def __init__(self, a, dim, b_0=1, mask=None, Se_inv=None):
# # TODO: get number of processes!
# self.nprocs = 4
# self.data_sets = []
# for proc_ind in range(self.nprocs):
# # TODO: Create processes and let DataSet live there!
# self.data_sets.append(DataSet(a, dim, b_0, mask, Se_inv))
#
# def __get_item__(self, key):
# return self.data_sets[key]
#
# def append(self, phase_map, projector):
# self.data_sets[self.count % self.nprocs].append(phase_map, projector)
###################################################################################################
@@ -6,15 +6,20 @@
threedimensional magnetization distribution onto a two-dimensional phase map."""
from __future__ import division
import sys
import numpy as np
import multiprocessing as mp
from pyramid.magdata import MagData
from pyramid.ramp import Ramp
from pyramid.dataset import DataSet
import logging
__all__ = ['ForwardModel']
__all__ = ['ForwardModel', 'DistributedForwardModel']
class ForwardModel(object):
@@ -152,9 +157,6 @@ class ForwardModel(object):
result[hp[i]:hp[i+1]] = res
return result
def _jac_dot_element(self, mag_vec, projector, phasemapper):
return phasemapper.jac_dot(projector.jac_dot(mag_vec)) # TODO: ???
def jac_T_dot(self, x, vector):
''''Calculate the product of the transposed Jacobi matrix with a given `vector`.
@@ -185,13 +187,206 @@ class ForwardModel(object):
ramp_params = self.ramp.jac_T_dot(vector) # calculate ramp_params separately!
return np.concatenate((result, ramp_params))
def finalize(self):
pass
# TODO: Multiprocessing! ##########################################################################
#class DistributedForwardModel(ForwardModel):
class DistributedForwardModel(ForwardModel):
def __init__(self, data_set, ramp_order=None, nprocs=1):
super(DistributedForwardModel, self).__init__(data_set, ramp_order)
self.nprocs = nprocs
img_per_proc = np.ceil(self.data_set.count / self.nprocs).astype(np.int)
hp = self.data_set.hook_points
self.proc_hook_points = [0]
# self.sub_fwd_models = []
self.pipes = []
self.processes = []
for proc_id in range(self.nprocs):
# 'PARENT Create SubDataSets:'
sub_data = DataSet(self.data_set.a, self.data_set.dim, self.data_set.b_0,
self.data_set.mask, self.data_set.Se_inv)
# 'PARENT Distribute data to SubDataSets:'
start = proc_id*img_per_proc
stop = np.min(((proc_id+1)*img_per_proc, self.data_set.count))
self.proc_hook_points.append(hp[stop])
sub_data.phase_maps = self.data_set.phase_maps[start:stop]
sub_data.projectors = self.data_set.projectors[start:stop]
# '... PARENT Create SubForwardModel {}'.format(proc_id)
sub_fwd_model = ForwardModel(sub_data, ramp_order=None) # ramps handled in master!
# '... PARENT Create communication pipe {}'.format(proc_id)
self.pipes.append(mp.Pipe())
# '... PARENT Create process {}'.format(proc_id)
p = mp.Process(name='Worker {}'.format(proc_id), target=_worker,
args=(sub_fwd_model, self.pipes[proc_id][1]))
self.processes.append(p)
# '... PARENT Start process {}'.format(p.name)
p.start()
# 'PARENT Finish __init__\n'
self._log.debug('Creating '+str(self))
# print 'PARENT Distribute data to SubDataSets:'
# for count_id in range(self.data_set.count):
# proc_id = count_id % nprocs
# print '... PARENT send image {} to subDataSet {}'.format(count_id, proc_id)
# self.sub_data_sets[proc_id].append(self.data_set.phase_maps[count_id],
# self.data_set.projectors[count_id])
# print 'PARENT Start up processes:'
# self.sub_fwd_models = []
# self.pipes = []
# self.processes = []
# for proc_id in range(self.nprocs):
# print '... PARENT Create SubForwardModel {}'.format(proc_id)
# self.sub_fwd_models.append(ForwardModel(self.sub_data_sets[proc_id], ramp_order))
# print '... PARENT Create communication pipe {}'.format(proc_id)
# self.pipes.append(mp.Pipe())
# print '... PARENT Create process {}'.format(proc_id)
# p = mp.Process(name='Worker {}'.format(proc_id), target=_worker,
# args=(self.sub_fwd_models[proc_id], self.pipes[proc_id][1]))
# self.processes.append(p)
# print '... PARENT Start process {}'.format(p.name)
# p.start()
# print 'PARENT Finish __init__\n'
# self._log.debug('Creating '+str(self))
def __call__(self, x):
# Extract ramp parameters if necessary (x will be shortened!):
# TODO: how to handle here? all in main thread? distribute to sub_x and give to process?
# TODO: later method of distributed dataset?
# TODO: After reconstruction is this saved in this ramp? solve with property?
x = self.ramp.extract_ramp_params(x)
# Simulate all phase maps and create result vector:
# 'PARENT Distribute input to processes and start working:'
for proc_id in range(self.nprocs):
# TODO: better with Pool()? Can Pool be more persistent by subclassing custom Pool?
# '... PARENT Send input to process {}'.format(proc_id)
self.pipes[proc_id][0].send(('__call__', (x,)))
# TODO: Calculate ramps here? There should be waiting time... Init result with ramps!
result = np.zeros(self.m)
hp = self.hook_points
php = self.proc_hook_points
# '... PARENT Calculate ramps:'
if self.ramp_order is not None:
for i in range(self.data_set.count):
result[hp[i]:hp[i+1]] += self.ramp(i).phase.ravel()
# 'PARENT Get process results from the pipes:'
# sub_results = []
for proc_id in range(self.nprocs):
# '... PARENT Retrieve results from process {}'.format(proc_id)
result[php[proc_id]:php[proc_id+1]] += self.pipes[proc_id][0].recv()
return result
def jac_dot(self, x, vector):
# Extract ramp parameters if necessary (x will be shortened!):
# TODO: how to handle here? all in main thread? distribute to sub_x and give to process?
# TODO: later method of distributed dataset?
# TODO: After reconstruction is this saved in this ramp? solve with property?
vector = self.ramp.extract_ramp_params(vector)
# Simulate all phase maps and create result vector:
# 'PARENT Distribute input to processes and start working:'
for proc_id in range(self.nprocs):
# TODO: better with Pool()? Can Pool be more persistent by subclassing custom Pool?
# '... PARENT Send input to process {}'.format(proc_id)
self.pipes[proc_id][0].send(('jac_dot', (None, vector)))
# TODO: Calculate ramps here? There should be waiting time...
result = np.zeros(self.m)
hp = self.hook_points
php = self.proc_hook_points
# '... PARENT Calculate ramps:'
if self.ramp_order is not None:
for i in range(self.data_set.count):
result[hp[i]:hp[i+1]] += self.ramp.jac_dot(i)
# 'PARENT Get process results from the pipes:'
for proc_id in range(self.nprocs):
# '... PARENT Retrieve results from process {}'.format(proc_id)
result[php[proc_id]:php[proc_id+1]] += self.pipes[proc_id][0].recv()
return result
# for proc_id in range(self.nprocs):
# # '... PARENT Retrieve results from process {}'.format(proc_id)
# sub_results.append(self.pipes[proc_id][0].recv())
#
# def __init__(self, distributed_data_set, ramp_order=None):
# self.nprocs = distributed_data_set.nprocs
# self.fwd_models = []
# for proc_ind in range(self.nprocs):
# data_set = distributed_data_set[proc_ind]
# self.fwd_models.append(ForwardModel(data_set))
# # TODO: SORTING!!!! maybe return also the hookpoints?
# for i in range(self.data_set.count): # Go over all projections!
# proc_id = i % self.nprocs
# j = i // self.nprocs # index for subresult!
# result[hp[i]:hp[i+1]] = sub_results[proc_id][hp[j]:hp[j+1]]
def jac_T_dot(self, x, vector):
hp = self.hook_points
php = self.proc_hook_points
for proc_id in range(self.nprocs):
sub_vec = vector[php[proc_id]:php[proc_id+1]]
self.pipes[proc_id][0].send(('jac_T_dot', (None, sub_vec)))
ramp_params = self.ramp.jac_T_dot(vector) # calculate ramp_params separately!
result = np.zeros(3*self.data_set.mask.sum())
for proc_id in range(self.nprocs):
sub_vec = vector[php[proc_id]:php[proc_id+1]]
result += self.pipes[proc_id][0].recv()
return np.concatenate((result, ramp_params))
print 'PARENT Distribute input to processes and start working:'
for proc_id in range(self.nprocs):
# TODO: better with Pool()? Can Pool be more persistent by subclassing custom Pool?
print '... PARENT Send input to process {}'.format(proc_id)
self.pipes[proc_id][0].send(('jac_T_dot', (None, vector)))
print 'PARENT Get process results from the pipes:'
sub_results = []
for proc_id in range(self.nprocs):
print '... PARENT Retrieve results from process {}'.format(proc_id)
sub_results.append(self.pipes[proc_id][0].recv())
result = np.zeros(self.m)
hp = self.hook_points
# TODO: Calculate ramps here? There should be waiting time...
# TODO: SORTING!!!! maybe return also the hookpoints?
for i in range(self.data_set.count): # Go over all projections!
proc_id = i % self.nprocs
j = i // self.nprocs # index for subresult!
result[hp[i]:hp[i+1]] = sub_results[proc_id][hp[j]:hp[j+1]]
return result
proj_T_result = np.zeros(3*np.prod(self.data_set.dim))
hp = self.hook_points
for i, projector in enumerate(self.data_set.projectors):
sub_vec = vector[hp[i]:hp[i+1]]
mapper = self.phase_mappers[projector.dim_uv]
proj_T_result += projector.jac_T_dot(mapper.jac_T_dot(sub_vec))
self.mag_data.mag_vec = proj_T_result
result = self.mag_data.get_vector(self.data_set.mask)
ramp_params = self.ramp.jac_T_dot(vector) # calculate ramp_params separately!
return np.concatenate((result, ramp_params))
def finalize(self):
# 'PARENT Finalize processes:'
for proc_id in range(self.nprocs):
self.pipes[proc_id][0].send('STOP')
# 'PARENT Exit the completed processes:'
for p in self.processes:
p.join()
# 'PARENT All processes joint!'
def _worker(fwd_model, pipe):
# Has to be directly accessible in the module as a function, NOT a method of a class
# instance, so that it can be pickled when the worker process is spawned!
# '... {} starting!'.format(mp.current_process().name)
sys.stdout.flush()
for method, arguments in iter(pipe.recv, 'STOP'):
# '... {} processes method {}'.format(mp.current_process().name, method)
sys.stdout.flush()
result = getattr(fwd_model, method)(*arguments)
pipe.send(result)
# '... ', mp.current_process().name, 'exiting!'
sys.stdout.flush()
###################################################################################################
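Editorial note: in DistributedForwardModel.__init__ above, the phase maps are split into contiguous blocks of img_per_proc images per process, and proc_hook_points records where each process' slice of the result vector starts and ends. A small standalone sketch of that arithmetic with hypothetical numbers (5 images, 2 processes, made-up hook points):

import numpy as np

count, nprocs = 5, 2                    # 5 phase maps distributed over 2 processes
hp = [0, 100, 200, 300, 400, 500]       # hypothetical hook points into the result vector
img_per_proc = int(np.ceil(count / float(nprocs)))   # -> 3 images per process
proc_hook_points = [0]
for proc_id in range(nprocs):
    start = proc_id * img_per_proc                    # first image of this process
    stop = min((proc_id + 1) * img_per_proc, count)   # one past the last image
    proc_hook_points.append(hp[stop])
    print('process {} handles images {} to {}'.format(proc_id, start, stop - 1))
# process 0 handles images 0 to 2, process 1 handles images 3 to 4
# proc_hook_points -> [0, 300, 500]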
@@ -354,7 +354,7 @@ class MagData(object):
self._log.debug('Calling set_vector')
vector = np.asarray(vector, dtype=fft.FLOAT)
assert np.size(vector) % 3 == 0, 'Vector has to contain all 3 components for every pixel!'
count = np.size(vector)/3
count = np.size(vector)//3
if mask is not None:
self.magnitude[0][mask] = vector[:count] # x-component
self.magnitude[1][mask] = vector[count:2*count] # y-component
......
@@ -80,7 +80,8 @@ class Ramp(object):
# Iterate over all degrees of freedom:
for dof in dof_list:
# Add the contribution of the current degree of freedom:
phase_ramp += self.param_cache[dof][index] * self.create_poly_mesh(dof, dim_uv)
phase_ramp += self.param_cache[dof][index] * \
self.create_poly_mesh(self.data_set.a, dof, dim_uv)
return PhaseMap(self.data_set.a, phase_ramp, mask=np.zeros(dim_uv, dtype=np.bool))
def jac_dot(self, index):
@@ -108,7 +109,8 @@ class Ramp(object):
# Iterate over all degrees of freedom:
for dof in range(self.deg_of_freedom):
# Add the contribution of the current degree of freedom:
phase_ramp += self.param_cache[dof][index] * self.create_poly_mesh(dof, dim_uv)
phase_ramp += self.param_cache[dof][index] * \
self.create_poly_mesh(self.data_set.a, dof, dim_uv)
return np.ravel(phase_ramp)
def jac_T_dot(self, vector):
@@ -132,36 +134,11 @@ class Ramp(object):
# Iterate over all projectors:
for i, projector in enumerate(self.data_set.projectors):
sub_vec = vector[hp[i]:hp[i+1]]
poly_mesh = self.create_poly_mesh(dof, projector.dim_uv)
poly_mesh = self.create_poly_mesh(self.data_set.a, dof, projector.dim_uv)
# Transposed ramp parameters: summed product of the vector with the poly-mesh:
result.append(np.sum(sub_vec * np.ravel(poly_mesh)))
return result
def create_poly_mesh(self, deg_of_freedom, dim_uv):
'''Create a polynomial mesh for the ramp calculation for a specific degree of freedom.
Parameters
----------
deg_of_freedom : int
Current degree of freedom for which the mesh should be created. 0 corresponds to a
simple offset, 1 corresponds to a linear ramp in u-direction, 2 to a linear ramp in
v-direction and so on.
dim_uv : tuple (N=2)
Dimensions of the 2D mesh that should be created.
Returns
-------
result_mesh : :class:`~numpy.ndarray` (N=2)
Polynomial mesh that was created and can be used for further calculations.
'''
# Determine if u-direction (u_or_v == 1) or v-direction (u_or_v == 0)!
u_or_v = (deg_of_freedom - 1) % 2
# Determine polynomial order:
order = (deg_of_freedom + 1) // 2
# Return polynomial mesh:
return (np.indices(dim_uv)[u_or_v] * self.data_set.a) ** order
def extract_ramp_params(self, x):
'''Extract the ramp parameters of an input vector and return the rest.
@@ -188,3 +165,38 @@ class Ramp(object):
x, ramp_params = np.split(x, [-self.n])
self.param_cache = ramp_params.reshape((self.deg_of_freedom, self.data_set.count))
return x
@classmethod
def create_poly_mesh(cls, a, deg_of_freedom, dim_uv):
'''Create a polynomial mesh for the ramp calculation for a specific degree of freedom.
Parameters
----------
a : float
    The grid spacing in nm.
deg_of_freedom : int
Current degree of freedom for which the mesh should be created. 0 corresponds to a
simple offset, 1 corresponds to a linear ramp in u-direction, 2 to a linear ramp in
v-direction and so on.
dim_uv : tuple (N=2)
Dimensions of the 2D mesh that should be created.
Returns
-------
result_mesh : :class:`~numpy.ndarray` (N=2)
Polynomial mesh that was created and can be used for further calculations.
'''
# Determine if u-direction (u_or_v == 1) or v-direction (u_or_v == 0)!
u_or_v = (deg_of_freedom - 1) % 2
# Determine polynomial order:
order = (deg_of_freedom + 1) // 2
# Return polynomial mesh:
return (np.indices(dim_uv)[u_or_v] * a) ** order
@classmethod
def create_ramp(cls, a, dim_uv, params):
'''Create a phase ramp as a PhaseMap from given ramp parameters (offset, slopes, ...).'''
phase_ramp = np.zeros(dim_uv)
dof_list = range(len(params))
for dof in dof_list:
phase_ramp += params[dof] * cls.create_poly_mesh(a, dof, dim_uv)
return PhaseMap(a, phase_ramp, mask=np.zeros(dim_uv, dtype=np.bool))
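Editorial note: create_poly_mesh encodes each degree of freedom via u_or_v = (dof - 1) % 2 and order = (dof + 1) // 2, so dof 0 yields a constant offset mesh and dofs 1 and 2 yield linear ramps along the two image axes. A tiny standalone reproduction of just those two formulas (not part of the commit):

import numpy as np

def poly_mesh(a, deg_of_freedom, dim_uv):
    u_or_v = (deg_of_freedom - 1) % 2   # which image axis the ramp runs along
    order = (deg_of_freedom + 1) // 2   # 0: constant, 1: linear, 2: quadratic, ...
    return (np.indices(dim_uv)[u_or_v] * a) ** order

print(poly_mesh(1., 0, (2, 3)))  # dof 0: constant mesh of ones (offset term)
print(poly_mesh(1., 1, (2, 3)))  # dof 1: linear ramp along the first axis
print(poly_mesh(1., 2, (2, 3)))  # dof 2: linear ramp along the second axis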
@@ -247,6 +247,7 @@ class NoneRegularisator(Regularisator):
self._log.debug('Calling __init__')
self.norm = None
self.lam = 0
self.add_params = None
self._log.debug('Created '+str(self))
def __call__(self, x):
......
# THIS FILE IS GENERATED BY THE PYRAMID SETUP.PY
version = "0.1.0-dev"
hg_revision = "711e66d3213b+"
hg_revision = "58741ae0132a+"
@@ -4,7 +4,8 @@
import os
import numpy as np
from pyDM3reader import DM3lib as dm3
import hyperspy.hspy as hp
from PIL import Image
import pyramid as py
import logging.config
@@ -12,17 +13,21 @@ import logging.config
logging.config.fileConfig(py.LOGGING_CONFIG, disable_existing_loggers=False)
###################################################################################################
path_mag = 'zi_an_elongated_nanorod.dm3'
path_ele = 'zi_an_elongated_nanorod_mip.dm3'
filename = 'phasemap_dm3_zi_an_elongated_nanorod.nc'
path_mag = 'zi_an_magnetite_4_particles_magnetic.dm3'
path_ele = 'zi_an_magnetite_4_particles_electric_smooth.dm3'
filename = 'phasemap_dm3_zi_an_magnetite_4_particles.nc'
a = 1.
dim_uv = None
threshold = 4.5
dim_uv = (512, 512)
threshold = 0.5
flip_up_down = True
###################################################################################################
# Load images:
im_mag = dm3.DM3(os.path.join(py.DIR_FILES, 'dm3', path_mag)).image
im_ele = dm3.DM3(os.path.join(py.DIR_FILES, 'dm3', path_ele)).image
im_mag = Image.fromarray(hp.load(os.path.join(py.DIR_FILES, 'dm3', path_mag)).data)
im_ele = Image.fromarray(hp.load(os.path.join(py.DIR_FILES, 'dm3', path_ele)).data)
if flip_up_down:
im_mag = im_mag.transpose(Image.FLIP_TOP_BOTTOM)
im_ele = im_ele.transpose(Image.FLIP_TOP_BOTTOM)
if dim_uv is not None:
im_mag = im_mag.resize(dim_uv)
im_ele = im_ele.resize(dim_uv)
......
@@ -12,15 +12,15 @@ import logging.config
logging.config.fileConfig(py.LOGGING_CONFIG, disable_existing_loggers=False)
###################################################################################################
path_mag = 'trevor_magnetite_m20.bmp'
path_mask = 'trevor_magnetite_mask20.bmp'
filename = 'phasemap_bmp_trevor_magnetite_m20.nc'
path_mag = 'Arnaud_M.tif'
path_mask = 'Arnaud_MIP_mask.tif'
filename = 'phasemap_tif_martial_magnetite.nc'
a = 0.4 # nm
dim_uv = None
max_phase = 1
threshold = 0.5
offset = 0
flip_up_down = True
flip_up_down = False
###################################################################################################
# Load images:
......
@@ -11,14 +11,15 @@ import logging.config
logging.config.fileConfig(pr.LOGGING_CONFIG, disable_existing_loggers=False)
###################################################################################################
phase_name = 'phasemap_bmp_trevor_magnetite_m20'
phase_name = 'phasemap_tif_martial_magnetite'
b_0 = 0.6 # in T
lam = 1E-3
max_iter = 100
buffer_pixel = 0
order = 1
order = 0
###################################################################################################
# Load phase map:
phase_map = pr.PhaseMap.load_from_netcdf4(phase_name+'.nc')
phase_map.pad((buffer_pixel, buffer_pixel))
@@ -47,7 +48,7 @@ mag_name = '{}_lam={}'.format(phase_name.replace('phasemap', 'magdata_rec'), lam
mag_data_rec.save_to_netcdf4(mag_name+'.nc')
# Plot stuff:
mag_data_rec.quiver_plot('Reconstructed Distribution', ar_dens=np.ceil(np.max(dim)/128.))
mag_data_rec.quiver_plot('Reconstructed Distribution', ar_dens=int(np.ceil(np.max(dim)/128.)))
phase_map.crop((buffer_pixel, buffer_pixel))
phase_map.display_combined('Input Phase')
phase_map -= fwd_model.ramp(index=0)
......
@@ -3,6 +3,7 @@
import numpy as np
import multiprocessing as mp
import pyramid as pr
from jutil.taketime import TakeTime
import logging.config
@@ -24,67 +25,74 @@ offset_max = 2
ramp_max = 0.01
order = 1
show_input = True
# TODO: toggle multiprocessing if nprocs > 1
###################################################################################################
# Load magnetization distribution:
mag_data = pr.MagData.load_from_netcdf4(mag_name+'.nc')
dim = mag_data.dim
# Construct data set and regularisator:
data = pr.DataSet(mag_data.a, mag_data.dim, b_0)
# Construct projectors:
projectors = []
for angle in angles:
angle_rad = angle*np.pi/180
if axes['x']:
projectors.append(pr.XTiltProjector(mag_data.dim, angle_rad, dim_uv))
if axes['y']:
projectors.append(pr.YTiltProjector(mag_data.dim, angle_rad, dim_uv))
# Add projectors and construct according phase maps:
data.projectors = projectors
data.phase_maps = data.create_phase_maps(mag_data)
for i, phase_map in enumerate(data.phase_maps):
offset = np.random.uniform(-offset_max, offset_max)
ramp = np.random.uniform(-ramp_max, ramp_max), np.random.uniform(-ramp_max, ramp_max)
phase_map.add_ramp(offset, ramp)
# Add noise if necessary:
if noise != 0:
for i, phase_map in enumerate(data.phase_maps):
phase_map.phase += np.random.normal(0, noise, phase_map.dim_uv)
data.phase_maps[i] = phase_map
if __name__ == '__main__':
mp.freeze_support()
# Load magnetization distribution:
mag_data = pr.MagData.load_from_netcdf4(mag_name+'.nc')
dim = mag_data.dim
# Construct data set and regularisator:
data = pr.DataSet(mag_data.a, mag_data.dim, b_0)
# Construct projectors:
projectors = []
for angle in angles:
angle_rad = angle*np.pi/180
if axes['x']:
projectors.append(pr.XTiltProjector(mag_data.dim, angle_rad, dim_uv))
if axes['y']:
projectors.append(pr.YTiltProjector(mag_data.dim, angle_rad, dim_uv))
# Add projectors and construct according phase maps:
data.projectors = projectors
data.phase_maps = data.create_phase_maps(mag_data)
# Plot input:
if show_input:
for i, phase_map in enumerate(data.phase_maps):
phase_map.display_phase()
# Construct mask:
if use_internal_mask:
data.mask = mag_data.get_mask() # Use perfect mask from mag_data!
else:
data.set_3d_mask() # Construct mask from 2D phase masks!
# Construct regularisator, forward model and costfunction:
fwd_model = pr.ForwardModel(data, ramp_order=order)
reg = pr.FirstOrderRegularisator(data.mask, lam, add_params=fwd_model.ramp.n)
cost = pr.Costfunction(fwd_model, reg)
# Reconstruct and save:
with TakeTime('reconstruction time'):
mag_data_rec = pr.reconstruction.optimize_linear(cost, max_iter=max_iter)
param_cache = cost.fwd_model.ramp.param_cache
mag_name_rec = mag_name.replace('magdata', 'magdata_rec')
mag_data_rec.save_to_netcdf4(mag_name_rec+'.nc')
# Plot stuff:
data.display_mask(ar_dens=np.ceil(np.max(dim)/64.))
mag_data.quiver_plot3d('Original Distribution', ar_dens=np.ceil(np.max(dim)/64.))
mag_data_rec.quiver_plot3d('Reconstructed Distribution', ar_dens=np.ceil(np.max(dim)/64.))
mag_data_rec.quiver_plot3d('Reconstructed Distribution', ar_dens=np.ceil(np.max(dim)/64.),
coloring='amplitude')
offset = np.random.uniform(-offset_max, offset_max)
ramp_u = np.random.uniform(-ramp_max, ramp_max)
ramp_v = np.random.uniform(-ramp_max, ramp_max)
phase_map += pr.Ramp.create_ramp(phase_map.a, phase_map.dim_uv, (offset, ramp_u, ramp_v))
# Add noise if necessary:
if noise != 0:
for i, phase_map in enumerate(data.phase_maps):
phase_map.phase += np.random.normal(0, noise, phase_map.dim_uv)
data.phase_maps[i] = phase_map
# Plot input:
if show_input:
for i, phase_map in enumerate(data.phase_maps):
phase_map.display_phase()
# Construct mask:
if use_internal_mask:
data.mask = mag_data.get_mask() # Use perfect mask from mag_data!
else:
data.set_3d_mask() # Construct mask from 2D phase masks!
# Construct regularisator, forward model and costfunction:
fwd_model = pr.DistributedForwardModel(data, ramp_order=order, nprocs=4)
reg = pr.FirstOrderRegularisator(data.mask, lam, add_params=fwd_model.ramp.n)
cost = pr.Costfunction(fwd_model, reg)
# Reconstruct and save:
with TakeTime('reconstruction time'):
mag_data_rec = pr.reconstruction.optimize_linear(cost, max_iter=max_iter)
param_cache = cost.fwd_model.ramp.param_cache
fwd_model.finalize()
mag_name_rec = mag_name.replace('magdata', 'magdata_rec')
mag_data_rec.save_to_netcdf4(mag_name_rec+'.nc')
# Plot stuff:
data.display_mask(ar_dens=np.ceil(np.max(dim)/64.))
mag_data.quiver_plot3d('Original Distribution', ar_dens=np.ceil(np.max(dim)/64.))
mag_data_rec.quiver_plot3d('Reconstructed Distribution', ar_dens=np.ceil(np.max(dim)/64.))
mag_data_rec.quiver_plot3d('Reconstructed Distribution', ar_dens=np.ceil(np.max(dim)/64.),
coloring='amplitude')
# -*- coding: utf-8 -*-
"""
Created on Wed Jul 29 15:31:59 2015
@author: Jan
"""
import os
import numpy as np
import hyperspy.hspy as hp
import pyramid as pr
phase_map = pr.PhaseMap.load_from_netcdf4('phasemap_tif_martial_magnetite.nc')
mag_rec = pr.MagData.load_from_netcdf4('magdata_rec_tif_martial_magnetite_lam=0.001.nc')
ramp_params = (0.350598105531, 0.00022362574109514882, 0.00075016020407003121)
phase_ramp = pr.Ramp.create_ramp(phase_map.a, phase_map.dim_uv, ramp_params)
phase_corr = phase_map - phase_ramp
phase_rec = pr.pm(mag_rec)
phase_diff = phase_rec - phase_corr
#mag_rec.quiver_plot('Reconstructed distribution', ar_dens=4)
#phase_map.display_combined('Original Input')
phase_corr.display_combined('Input Corrected')
#phase_ramp.display_phase('Ramp')
#phase_rec.display_combined('Reconstructed Phase')
#phase_diff.display_phase('Difference')
#mag_rec.save_to_llg(os.getcwd()+'/mag_rec.txt')
#np.save('phase_input_corr.npy', phase_corr.phase)
#np.savetxt('phase_input_corr.txt', phase_corr.phase)
phase_corr_hp = hp.signals.Image(phase_map.phase)
phase_corr_hp.axes_manager[0].name = 'X'
phase_corr_hp.axes_manager[0].units = 'nm'
phase_corr_hp.axes_manager[0].scale = phase_corr.a
phase_corr_hp.axes_manager[1].name = 'Y'
phase_corr_hp.axes_manager[1].units = 'nm'
phase_corr_hp.axes_manager[1].scale = phase_corr.a
#phase_corr_hp.save('phase_input_corr.rpl')
phase_corr_hp.plot(axes_ticks=True)
@@ -34,47 +34,53 @@ class Comparer(object):
return 'Comparer({}) compares with {}: {}'.format(self.reference, value, outcome)
class Main(object):
def __call__(self):
nprocs = 4
nvalues = 17
values = range(nvalues)
comparers = np.asarray([Comparer(i) for i in range(nvalues)])
print 'PARENT Create communication pipes'
pipes = [mp.Pipe() for i in range(nprocs)]
sleep(1)
print 'PARENT Setup a list of processes that we want to run'
processes = []
proc_ids = np.asarray([i % nprocs for i in range(nvalues)])
for proc_id in range(nprocs):
selection = comparers[np.where(proc_ids == proc_id, True, False)]
processes.append(mp.Process(name='WORKER {}'.format(proc_id), target=worker,
args=(selection, pipes[proc_id][1])))
sleep(1)
print 'PARENT Run processes'
for p in processes:
p.start()
sleep(1)
print 'PARENT Distribute work'
for i in range(nvalues):
proc_id = i % nprocs
comparer_id = i // nprocs
pipes[proc_id][0].send((values[i], comparer_id))
sleep(1)
print 'PARENT Get process results from the pipes'
results = []
for i in range(nvalues):
proc_id = i % nprocs
results.append(pipes[proc_id][0].recv())
sleep(1)
print 'PARENT Print results'
for result in results:
print result
sleep(1)
print 'PARENT Finalize processes'
for i in range(nprocs):
pipes[i][0].send('STOP')
sleep(1)
print 'PARENT Exit the completed processes'
for p in processes:
p.join()
if __name__ == '__main__':
mp.freeze_support()
nprocs = 4
nvalues = 17
values = range(nvalues)
comparers = np.asarray([Comparer(i) for i in range(nvalues)])
print 'PARENT Create communication pipes'
pipes = [mp.Pipe() for i in range(nprocs)]
sleep(1)
print 'PARENT Setup a list of processes that we want to run'
processes = []
proc_ids = np.asarray([i % nprocs for i in range(nvalues)])
for proc_id in range(nprocs):
selection = comparers[np.where(proc_ids == proc_id, True, False)]
processes.append(mp.Process(name='WORKER {}'.format(proc_id), target=worker,
args=(selection, pipes[proc_id][1])))
sleep(1)
print 'PARENT Run processes'
for p in processes:
p.start()
sleep(1)
print 'PARENT Distribute work'
for i in range(nvalues):
proc_id = i % nprocs
comparer_id = i // nprocs
pipes[proc_id][0].send((values[i], comparer_id))
sleep(1)
print 'PARENT Get process results from the pipes'
results = []
for i in range(nvalues):
proc_id = i % nprocs
results.append(pipes[proc_id][0].recv())
sleep(1)
print 'PARENT Print results'
for result in results:
print result
sleep(1)
print 'PARENT Finalize processes'
for i in range(nprocs):
pipes[i][0].send('STOP')
sleep(1)
print 'PARENT Exit the completed processes'
for p in processes:
p.join()
Main()()
# -*- coding: utf-8 -*-
"""
Created on Wed Jul 22 11:02:00 2015
@author: Jan
"""
import multiprocessing as mp
import numpy as np
import pyramid as pr
from jutil.taketime import TakeTime
if __name__ == '__main__':
mp.freeze_support()
# Parameters:
b_0 = 1
order = None
nprocs = 2
count = 40
# Load stuff:
mag_data = pr.MagData.load_from_netcdf4('magdata.nc')
with TakeTime('reference') as timer:
phase_map = pr.pm(mag_data)
print '--- Reference time: ', timer.dt
# Construct dataset:
data = pr.DataSet(mag_data.a, mag_data.dim, b_0, mag_data.get_mask())
[data.append(phase_map, pr.SimpleProjector(mag_data.dim)) for i in range(count)]
# Reference:
phase_map.display_combined('Reference')
# Normal ForwardModel:
fwd_model_norm = pr.ForwardModel(data, order)
with TakeTime('reference') as timer:
phase_norm = fwd_model_norm.jac_T_dot(None, data.phase_vec)#data.mag_data.get_vector(data.mask))
# phase_map_norm = pr.PhaseMap(data.a, phase_norm.reshape(phase_map.dim_uv))
print '--- Normal time: ', timer.dt
# phase_map_norm.display_combined('Normal')
#
# # Normal ForwardModel:
# fwd_model_norm = pr.ForwardModel(data, order)
# with TakeTime('reference') as timer:
# phase_norm = fwd_model_norm(mag_data.get_vector(data.mask))
## phase_map_norm = pr.PhaseMap(data.a, phase_norm.reshape(phase_map.dim_uv))
# print '--- Normal time: ', timer.dt
## phase_map_norm.display_combined('Normal')
# Distributed ForwardModel:
fwd_model_dist = pr.forwardmodel.DistributedForwardModel(data, ramp_order=order, nprocs=nprocs)
with TakeTime('distributed') as timer:
phase_dist = fwd_model_dist.jac_T_dot(None, data.phase_vec)#mag_data.get_vector(data.mask))
# phase_map_dist = pr.PhaseMap(data.a, phase_dist.reshape(phase_map.dim_uv))
print '--- Distributed time:', timer.dt
# phase_map_dist.display_combined('Distributed')
# fwd_model_dist.finalize()
# Distributed ForwardModel:
with TakeTime('distributed') as timer:
phase_dist = fwd_model_dist.jac_T_dot(None, data.phase_vec)#mag_data.get_vector(data.mask))
# phase_map_dist = pr.PhaseMap(data.a, phase_dist.reshape(phase_map.dim_uv))
print '--- Distributed time:', timer.dt
# phase_map_dist.display_combined('Distributed')
fwd_model_dist.finalize()
diff = phase_dist - phase_norm
print diff.min(), diff.max()
# assert np.testing.assert_almost_equal(phase_dist, phase_norm, decimal=5)