Source code for prince_cr.solvers.propagation

"""Contains solvers, i.e. integrators, kernels, steppers, for PriNCe."""

import numpy as np

from prince_cr.cosmology import H
from prince_cr.data import PRINCE_UNITS, EnergyGrid
from prince_cr.util import info
import prince_cr.config as config

from .partial_diff import DifferentialOperator, SemiLagrangianSolver

[docs]class UHECRPropagationResult(object): """Reduced version of solver class, that only holds the result vector and defined add and multiply """ def __init__(self, state, egrid, spec_man): self.spec_man = spec_man self.egrid = egrid self.state = state def to_dict(self): dic = {} dic['egrid'] = self.egrid dic['state'] = self.state dic['known_spec'] = self.known_species return dic @classmethod def from_dict(cls, dic): egrid = dic['egrid'] edim = egrid.size state = dic['state'] known_spec = dic['known_spec'] from ..data import SpeciesManager spec_man = SpeciesManager(known_spec, edim) return cls(state, egrid, spec_man) @property def known_species(self): return self.spec_man.known_species def __add__(self, other): cumstate = self.state + other.state if not np.array_equal(self.egrid, other.egrid): raise Exception( 'Cannot add Propagation Results, they are defined in different energy grids!' ) if not np.array_equal(self.known_species, other.known_species): raise Exception( 'Cannot add Propagation Results, have different species managers!' ) else: return UHECRPropagationResult(cumstate, self.egrid, self.spec_man) def __mul__(self, number): if not np.isscalar(number): raise Exception( 'Can only multiply result by scalar number, got type {:} instead!'. format(type(number))) else: newstate = self.state * number return UHECRPropagationResult(newstate, self.egrid, self.spec_man)
[docs] def get_solution(self, nco_id): """Returns the spectrum in energy per nucleon""" spec = self.spec_man.ncoid2sref[nco_id] return self.egrid, self.state[spec.lidx():spec.uidx()]
[docs] def get_solution_scale(self, nco_id, epow=0): """Returns the spectrum scaled back to total energy""" spec = self.spec_man.ncoid2sref[nco_id] egrid = spec.A * self.egrid return egrid, egrid**epow * self.state[spec.lidx():spec.uidx()] / spec.A
def _check_id_grid(self, nco_ids, egrid): # Take egrid from first id ( doesn't cover the range for iron for example) # create a common egrid or used supplied one if egrid is None: max_mass = max([s.A for s in self.spec_man.species_refs]) emin_log, emax_log, nbins = list(config.cosmic_ray_grid) emax_log = np.log10(max_mass * 10**emax_log) nbins *= 4 com_egrid = EnergyGrid(emin_log, emax_log, nbins).grid else: com_egrid = egrid if isinstance(nco_ids, list): pass elif nco_ids == 'CR': nco_ids = [s for s in self.known_species if s >= 100] elif nco_ids == 'nu': nco_ids = [ s for s in self.known_species if s in [11, 12, 13, 14, 15, 16] ] elif nco_ids == 'all': nco_ids = self.known_species elif isinstance(nco_ids, tuple): select, vmin, vmax = nco_ids nco_ids = [ s for s in self.known_species if vmin <= select(s) <= vmax ] return nco_ids, com_egrid def _collect_interpolated_spectra(self, nco_ids, epow, egrid=None): """Collect interpolated spectra in a 2D array. Used by get_solution_group and get_lnA""" nco_ids, com_egrid = self._check_id_grid(nco_ids, egrid) # collect all the spectra in 2d array of dimension spectra = np.zeros((len(nco_ids), com_egrid.size)) for idx, pid in enumerate(nco_ids): curr_egrid, curr_spec = self.get_solution_scale(pid, epow) mask = curr_spec > 0. if np.count_nonzero(mask) > 0: res = np.exp( np.interp( np.log(com_egrid), np.log(curr_egrid[mask]), np.log(curr_spec[mask]), left=np.nan, right=np.nan)) else: res = np.zeros_like(com_egrid) spectra[idx] = np.nan_to_num(res) return nco_ids, com_egrid, spectra
[docs] def get_solution_group(self, nco_ids, epow=3, egrid=None): """Return the summed spectrum (in total energy) for all elements in the range""" _, com_egrid, spectra = self._collect_interpolated_spectra(nco_ids, epow, egrid) spectrum = spectra.sum(axis=0) return com_egrid, spectrum
[docs] def get_lnA(self, nco_ids, egrid=None): """Return the average ln(A) as a function of total energy for all elements in the range""" nco_ids, com_egrid, spectra = self._collect_interpolated_spectra(nco_ids, 0, egrid) # get the average and variance by using the spectra as weights lnA = np.array([np.log(self.spec_man.ncoid2sref[el].A) for el in nco_ids]) average = ( lnA[:, np.newaxis] * spectra).sum(axis=0) / spectra.sum(axis=0) variance = (lnA[:, np.newaxis]**2 * spectra).sum(axis=0) / spectra.sum(axis=0) - average**2 return com_egrid, average, variance
def get_energy_density(self, nco_id): from scipy.integrate import trapz A = self.spec_man.ncoid2sref[nco_id].A return trapz(A * self.egrid * self.get_solution(nco_id), self.egrid)
class UHECRPropagationSolver(object): def __init__( self, initial_z, final_z, prince_run, enable_adiabatic_losses=True, enable_pairprod_losses=True, enable_photohad_losses=True, enable_injection_jacobian=True, enable_partial_diff_jacobian=True, z_offset=0., ): self.initial_z = initial_z + z_offset self.final_z = final_z + z_offset self.z_offset = z_offset self.current_z_rates = None self.recomp_z_threshold = config.update_rates_z_threshold self.spec_man = prince_run.spec_man self.egrid = prince_run.cr_grid.grid self.ebins = prince_run.cr_grid.bins # Flags to enable/disable different loss types self.enable_adiabatic_losses = enable_adiabatic_losses self.enable_pairprod_losses = enable_pairprod_losses self.enable_photohad_losses = enable_photohad_losses # Flag for Jacobian injection: # if True injection in jacobion # if False injection only per dz-step self.enable_injection_jacobian = enable_injection_jacobian self.enable_partial_diff_jacobian = enable_partial_diff_jacobian self.had_int_rates = prince_run.int_rates self.adia_loss_rates_grid = prince_run.adia_loss_rates_grid self.pair_loss_rates_grid = prince_run.pair_loss_rates_grid self.adia_loss_rates_bins = prince_run.adia_loss_rates_bins self.pair_loss_rates_bins = prince_run.pair_loss_rates_bins self.intp = None self.state = np.zeros(prince_run.dim_states) self.result = None self.dim_states = prince_run.dim_states self.list_of_sources = [] self.diff_operator = DifferentialOperator(prince_run.cr_grid, prince_run.spec_man.nspec).operator self.semi_lag_solver = SemiLagrangianSolver(prince_run.cr_grid) # Configuration of BLAS backend self.using_cupy = False if config.has_cupy and config.linear_algebra_backend.lower() == 'cupy': self.eqn_derivative = self.eqn_deriv_cupy self.using_cupy = True elif config.has_mkl and config.linear_algebra_backend.lower() == 'mkl': self.eqn_derivative = self.eqn_deriv_mkl else: self.eqn_derivative = self.eqn_deriv_standard # Reset counters self.ncallsf = 0 self.ncallsj = 0 @property def known_species(self): return self.spec_man.known_species @property def res(self): if self.result is None: self.result = UHECRPropagationResult(self.state, self.egrid, self.spec_man) return self.result def add_source_class(self, source_instance): self.list_of_sources.append(source_instance) def dldz(self, z): return -1. / ((1. + z) * H(z) * PRINCE_UNITS.cm2sec) def injection(self, dz, z): """This needs to return the injection rate at each redshift value z""" f = self.dldz(z) * dz * PRINCE_UNITS.cm2sec if len(self.list_of_sources) > 1: return f * np.sum( [s.injection_rate(z) for s in self.list_of_sources], axis=0) else: return f * self.list_of_sources[0].injection_rate(z) def _update_jacobian(self, z): info(5, 'Updating jacobian matrix at redshift', z) # enable photohadronic losses, or use a zero matrix if self.enable_photohad_losses: self.jacobian = self.had_int_rates.get_hadr_jacobian( z, self.dldz(z), force_update=True) else: self.jacobian = self.had_int_rates.get_hadr_jacobian( z, self.dldz(z), force_update=True) self.jacobian.data *= 0. self.last_hadr_jac = None def semi_lagrangian(self, delta_z, z, state): z = z - self.z_offset # if no continuous losses are enables, just return the state. if not self.enable_adiabatic_losses and not self.enable_pairprod_losses: return state conloss = np.zeros_like(self.adia_loss_rates_bins.energy_vector) if self.enable_adiabatic_losses: conloss += self.adia_loss_rates_bins.loss_vector(z) if self.enable_pairprod_losses: conloss += self.pair_loss_rates_bins.loss_vector(z) conloss *= self.dldz(z) * delta_z method = config.semi_lagr_method # ------------------------------------------------------------------- # numpy interpolator # ------------------------------------------------------------------- if method == 'intp_numpy': for spec in self.spec_man.species_refs: lbin, ubin = spec.lbin(), spec.ubin() lidx, uidx = spec.lidx(), spec.uidx() state[lidx:uidx] = self.semi_lag_solver.interpolate( conloss[lbin:ubin], state[lidx:uidx]) # ------------------------------------------------------------------- # local gradient interpolation # ------------------------------------------------------------------- elif method == 'gradient': for spec in self.spec_man.species_refs: lbin, ubin = spec.lbin(), spec.ubin() lidx, uidx = spec.lidx(), spec.uidx() state[lidx:uidx] = self.semi_lag_solver.interpolate_gradient( conloss[lbin:ubin], state[lidx:uidx]) # ------------------------------------------------------------------- # linear lagrange weigts # ------------------------------------------------------------------- elif method == 'linear': for spec in self.spec_man.species_refs: lbin, ubin = spec.lbin(), spec.ubin() lidx, uidx = spec.lidx(), spec.uidx() state[lidx: uidx] = self.semi_lag_solver.interpolate_linear_weights( conloss[lbin:ubin], state[lidx:uidx]) # ------------------------------------------------------------------- # quadratic lagrange weigts # ------------------------------------------------------------------- elif method == 'quadratic': for spec in self.spec_man.species_refs: lbin, ubin = spec.lbin(), spec.ubin() lidx, uidx = spec.lidx(), spec.uidx() state[ lidx: uidx] = self.semi_lag_solver.interpolate_quadratic_weights( conloss[lbin:ubin], state[lidx:uidx]) # ------------------------------------------------------------------- # cubic lagrange weigts # ------------------------------------------------------------------- elif method == 'cubic': for spec in self.spec_man.species_refs: lbin, ubin = spec.lbin(), spec.ubin() lidx, uidx = spec.lidx(), spec.uidx() state[lidx: uidx] = self.semi_lag_solver.interpolate_cubic_weights( conloss[lbin:ubin], state[lidx:uidx]) # ------------------------------------------------------------------- # 4th order lagrange weigts # ------------------------------------------------------------------- elif method == '4th_order': for spec in self.spec_man.species_refs: lbin, ubin = spec.lbin(), spec.ubin() lidx, uidx = spec.lidx(), spec.uidx() state[ lidx: uidx] = self.semi_lag_solver.interpolate_4thorder_weights( conloss[lbin:ubin], state[lidx:uidx]) # ------------------------------------------------------------------- # 5th order lagrange weigts # ------------------------------------------------------------------- elif method == '5th_order': for spec in self.spec_man.species_refs: lbin, ubin = spec.lbin(), spec.ubin() lidx, uidx = spec.lidx(), spec.uidx() state[ lidx: uidx] = self.semi_lag_solver.interpolate_5thorder_weights( conloss[lbin:ubin], state[lidx:uidx]) # ------------------------------------------------------------------- # finite diff euler steps # ------------------------------------------------------------------- elif method == 'finite_diff': conloss = np.zeros_like(self.adia_loss_rates_grid.energy_vector) if self.enable_adiabatic_losses: conloss += self.adia_loss_rates_grid.loss_vector(z) if self.enable_pairprod_losses: conloss += self.pair_loss_rates_grid.loss_vector(z) state[:] = state + self.dldz( z) * delta_z * self.diff_operator.dot(conloss * state) # ------------------------------------------------------------------- # if method was not in list before, raise an Expection # ------------------------------------------------------------------- else: raise Exception( 'Unknown semi-lagrangian method ({:})'.format(method)) return state def eqn_deriv_standard(self, z, state, *args): z = z - self.z_offset self.ncallsf += 1 # Update rates/cross sections only if solver requests to do so if abs(z - self.current_z_rates) > self.recomp_z_threshold: self._update_jacobian(z) self.current_z_rates = z r = self.jacobian.dot(state) if self.enable_injection_jacobian: r += self.injection(1., z)[:, np.newaxis] if self.enable_partial_diff_jacobian: conloss = np.zeros_like(self.adia_loss_rates_grid.energy_vector) if self.enable_adiabatic_losses: conloss += self.adia_loss_rates_grid.loss_vector(z) if self.enable_pairprod_losses: conloss += self.pair_loss_rates_grid.loss_vector(z) r += self.dldz(z) * self.diff_operator.dot( conloss[:, np.newaxis] * state) return r def eqn_deriv_cupy(self, z, state, *args): import cupy z = z - self.z_offset self.ncallsf += 1 # Update rates/cross sections only if solver requests to do so if abs(z - self.current_z_rates) > self.recomp_z_threshold: self._update_jacobian(z) self.current_z_rates = z state = cupy.array(state) r = self.jacobian.dot(state) if self.enable_injection_jacobian: r += cupy.array(self.injection(1., z))[:, np.newaxis] if self.enable_partial_diff_jacobian: conloss = np.zeros_like(self.adia_loss_rates_grid.energy_vector) if self.enable_adiabatic_losses: conloss += self.adia_loss_rates_grid.loss_vector(z) if self.enable_pairprod_losses: conloss += self.pair_loss_rates_grid.loss_vector(z) r += self.dldz(z) * self.diff_operator.dot( cupy.array(conloss)[:, np.newaxis] * state) return cupy.asnumpy(r) def eqn_deriv_mkl(self, z, state, *args): from prince_cr.solvers.mkl_interface import csrmm, csrmv if state.shape[1] > 1: matrix_input = True else: matrix_input = False # Here starts the eqn_deriv content z = z - self.z_offset self.ncallsf += 1 # Update rates/cross sections only if solver requests to do so if abs(z - self.current_z_rates) > self.recomp_z_threshold: self._update_jacobian(z) self.current_z_rates = z res = np.zeros(state.shape) if matrix_input: state = np.ascontiguousarray(state) res = csrmm(1., self.jacobian, state, 0., res) else: res = csrmv(1., self.jacobian, state, 0., res) if self.enable_injection_jacobian: res += self.injection(1., z)[:, np.newaxis] if self.enable_partial_diff_jacobian: conloss = np.zeros_like(self.adia_loss_rates_grid.energy_vector) if self.enable_adiabatic_losses: conloss += self.adia_loss_rates_grid.loss_vector(z) if self.enable_pairprod_losses: conloss += self.pair_loss_rates_grid.loss_vector(z) if matrix_input: res = csrmm(self.dldz(z), self.diff_operator, conloss[:, np.newaxis] * state, 1., res) else: res = csrmv(self.dldz(z), self.diff_operator, conloss[:, np.newaxis] * state, 1., res) return res class UHECRPropagationSolverBDF(UHECRPropagationSolver): def __init__(self, *args, **kwargs): self.atol = kwargs.pop('atol', 1e40) self.rtol = kwargs.pop('rtol', 1e-10) super(UHECRPropagationSolverBDF, self).__init__(*args, **kwargs) def _init_solver(self, dz): initial_state = np.zeros(self.dim_states) self._update_jacobian(self.initial_z) self.current_z_rates = self.initial_z # find the maximum injection and reduce the system by this self.red_idx = np.nonzero(self.injection(1., 0.))[0].max() # Convert csr_matrix from GPU to scipy try: sparsity = self.had_int_rates.get_hadr_jacobian( self.initial_z, 1.).get() except AttributeError: sparsity = self.had_int_rates.get_hadr_jacobian( self.initial_z, 1.) from prince_cr.util import PrinceBDF self.r = PrinceBDF( self.eqn_derivative, self.initial_z, initial_state, self.final_z, max_step=np.abs(dz), atol=self.atol, rtol=self.rtol, # jac = self.eqn_jac, jac_sparsity=sparsity, vectorized=True) def solve(self, dz=1e-3, verbose=True, summary=False, extended_output=False, full_reset=False, progressbar=False): from time import time from prince_cr.util import PrinceProgressBar reset_counter = 0 stepcount = 0 dz = -1 * dz start_time = time() info(2, 'Setting up Solver') self._init_solver(dz) info(2, 'Solver initialized in {0} s'.format(time() - start_time)) info(2, 'Starting integration.') with PrinceProgressBar( bar_type=progressbar, nsteps=-(self.initial_z - self.final_z) / dz) as pbar: while self.r.status == 'running': # print '------ at', self.r.t, '------' if verbose: info(3, "Integrating at z = {0}".format(self.r.t)) # -------------------------------------------- # Evaluate a step # -------------------------------------------- self.r.step() # -------------------------------------------- # Some last checks and resets # -------------------------------------------- if verbose: print('last step:', self.r.step_size) print('h_abs:', self.r.h_abs) print('LU decomp:', self.r.nlu) print('current order:', self.r.dense_output().order) print('---' * 20) stepcount += 1 reset_counter += 1 pbar.update() if self.r.status == 'failed': raise Exception( 'Integrator failed at t = {:}, try adjusting the tolerances'. format(self.r.t)) if verbose: print('Integrator finished with t = {:}, last step was dt = {:}'.format( self.r.t, self.r.step_size)) # after each run we delete the solver to save memory self.state = self.r.y.copy() if summary or verbose: print('Summary:') print('---------------------') print('status: ', self.r.status) print('time: ', self.r.t) print('last step:', self.r.step_size) print('RHS eval: ', self.r.nfev) print('Jac eval: ', self.r.njev) print('LU decomp:', self.r.nlu) print('---------------------') del self.r end_time = time() info(2, 'Integration completed in {0} s'.format(end_time - start_time)) class UHECRPropagationSolverEULER(UHECRPropagationSolver): def __init__(self, *args, **kwargs): super(UHECRPropagationSolverEULER, self).__init__(*args, **kwargs) def solve(self, dz=1e-3, verbose=True, extended_output=False, initial_inj=False, disable_inj=False, progressbar=False): from prince_cr.util import PrinceProgressBar from time import time self._update_jacobian(self.initial_z) self.current_z_rates = self.initial_z dz = -1 * dz start_time = time() curr_z = self.initial_z if initial_inj: initial_state = np.atleast_2d(self.injection(dz, self.initial_z)) print(initial_state) else: initial_state = np.zeros((self.dim_states,1)) state = initial_state info(2, 'Starting integration.') with PrinceProgressBar( bar_type=progressbar, nsteps=-(self.initial_z - self.final_z) / dz) as pbar: while curr_z + dz >= self.final_z: if verbose: info(3, "Integrating at z={0}".format(curr_z)) # -------------------------------------------- # Solve for hadronic interactions # -------------------------------------------- if verbose: print('Solving hadr losses at t =', curr_z) self._update_jacobian(curr_z) state += self.eqn_derivative(curr_z, state) * dz # -------------------------------------------- # Apply the injection # -------------------------------------------- if not disable_inj: if not self.enable_injection_jacobian and not self.enable_partial_diff_jacobian: if verbose: print('applying injection at t =', curr_z) state += self.injection(dz, curr_z) # -------------------------------------------- # Apply the semi lagrangian # -------------------------------------------- if not self.enable_partial_diff_jacobian: if self.enable_adiabatic_losses or self.enable_pairprod_losses: if verbose: print('applying semi lagrangian at t =', curr_z) state = self.semi_lagrangian(dz, curr_z, state) # -------------------------------------------- # Some last checks and resets # -------------------------------------------- if curr_z < -1 * dz: print('break at z =', curr_z) break curr_z += dz pbar.update() end_time = time() info(2, 'Integration completed in {0} s'.format(end_time - start_time)) self.state = state