Source code for bcSim

#!/usr/bin/env python

import numpy as np
from BurstCube.utils import getFilenameFromDetails
from BurstCube.simGenerator import configurator


[docs]class simFiles: def __init__(self, config_file): """Object for a multiple simulations over energy and angle.""" self.conf = configurator(config_file) self.sims = self.loadFiles()
[docs] def loadFiles(self): """ Parameters ---------- self : null Returns ---------- sfs : array numpy array containing information about each sim file. """ basename = self.conf.config['run']['basename'] sfs = [] for ze, az, energy in [(ze, az, energy) for ze in self.conf.zebins for az in self.conf.azbins for energy in self.conf.ebins]: fname = getFilenameFromDetails({'base': basename, 'keV': energy, 'ze': ze, 'az': az}) sf = simFile(self.conf.config['run']['simdir'] + '/'+fname+'.inc1.id1.sim', self.conf.config['run']['srcdir'] + '/'+fname+'.source', self.conf.config['run']['simdir'] + '/'+fname+'.stdout.gz') sfs.append(sf) return sfs
[docs] def applyEres(self): """Applies the energy resolution to the deposited energy in each sim file and returns a flattened array for plotting. Returns ---------- null : numpy array numpy array that contains all of the events with eres applied. """ return np.array([sf.ED_res for sf in self.sims]).flatten()
[docs] def calculateAeff(self, useEres=False, sigma=2.0): """Calculates effective area from the information contained within the .sim files. Parameters ---------- self : null useEres : boolean Switch to use the energy resolution in the calculation. sigma : float Width of energy cut in sigma. Default is 2.0. Returns ---------- aeffs : array Numpy array containing effective area of detector. """ aeffs = np.zeros(len(self.sims), dtype={'names': ['az', 'ze', 'keV', 'aeff', 'aeff_eres', 'aeff_eres_modfrac'], 'formats': ['float32', 'float32', 'float32', 'float32', 'float32', 'float32']}) if useEres: e = self.conf.config['detector']['resolution']['energy'] w = self.conf.config['detector']['resolution']['width'] e_interp = np.array([sf.energy for sf in self.sims]) res_interp = np.interp(e_interp, e, w) else: res_interp = np.zeros(len(self.sims)) for i, sf in enumerate(self.sims): frac, mod_frac = sf.passEres() aeff = sf.calculateAeff(useEres, res_interp[i]*sigma) aeffs[i] = (sf.azimuth, sf.zenith, sf.energy, aeff, aeff*frac, aeff*mod_frac) return aeffs
[docs] def getAllTriggerProbability(self, num_detectors=4, test=False): """Returns the probabability of hitting each detector in each simulation. Parameters ---------- num_detectors : int Number of detectors in the simulation test : boolean run a quick test over a limited number of files (20) Returns ---------- det_prob : numpy array Contains information from all the files about the energy, angles and probability of hitting a given detector """ names = ['energy', 'az', 'ze', 'stat_err', 'prob_det_vol'] formats = ['float32', 'float32', 'float32', 'float32', 'float32'] for i in range(num_detectors-1): names = np.append(names, 'prob_det_vol%i' % (i+1)) formats = np.append(formats, 'float32') print(names) print(formats) det_prob = np.empty(len(self.sims), dtype={'names': names, 'formats': formats}) if test: dotest = 1 else: dotest = len(self.sims) print("Analyzing", len(self.sims), "files") for j in range(dotest): holder = self.sims[j].getTriggerProbability(num_det=num_detectors, test=test) det_prob[j] = np.array([tuple(holder)], dtype=det_prob.dtype) return det_prob
[docs]class simFile: def __init__(self, simFile, sourceFile, logFile=''): """Object for a single megalib simulation. The main attributes are dictionaries associated with the simulation file, the source file, and the geometry file (`simDict`, `srcDict`, `geoDict`, and `logDict`.). Parameters ---------- simFile : string simulation file output from Cosima sourceFile: sting config file used as input to Cosima logFile: string stdout from Cosima. Optional. Returns ---------- simFile : simFile Object """ self.simFile = simFile self.srcFile = sourceFile self.logFile = logFile print("Loading " + self.simFile) self.simDict = self.fileToDict(simFile, '#', None) self.srcDict = self.fileToDict(sourceFile, '#', None) self.geoDict = self.fileToDict(self.srcDict['Geometry'][0], '//', None) self.eres = self.getEresFromFile(self.geoDict['Include'][1]) if logFile: self.logDict = self.logToDict(self.logFile) else: print("Log file not provided. Not loading.") @property def energy(self): return float(self.srcDict['One.Spectrum'][0][1]) @property def zenith(self): return float(self.srcDict['One.Beam'][0][1]) @property def azimuth(self): return float(self.srcDict['One.Beam'][0][2]) @property def ED_res(self): try: self._ED_res except AttributeError: print('Applying Eres on {}'.format(self.simFile)) self._ED_res = self._applyEres(self.eres['energy'], self.eres['sigma']) return(self._ED_res)
[docs] def fileToDict(self, filename, commentString='#', termString=None): from os import path '''Turn a MEGAlib file into a a python dictionary''' megaDict = {} filename = path.expandvars(filename) with open(filename) as f: for line in f: if commentString not in line: if ';' in line: contents = line.split(';') else: contents = line.split() if len(contents) > 1: if contents[0] in megaDict: if ';' in line: megaDict[contents[0]].append( contents[1:]) else: megaDict[contents[0]].append( contents[1]) else: if len(contents[1:]) > 1: megaDict[contents[0]] = [contents[1:]] else: megaDict[contents[0]] = contents[1:] if termString is not None: if termString in line: return megaDict return megaDict
[docs] def getEresFromFile(self, filename, commentString='//'): from os import path eres_lines = [] filename = path.expandvars(filename) with open(filename) as f: for line in f: if commentString not in line: if 'EnergyResolution' in line: eres_lines.append(line) eres = np.zeros(len(eres_lines), dtype={'names': ['energy', 'sigma'], 'formats': ['float32', 'float32']}) for i, res in enumerate(eres_lines): res_split = res.split() eres[i] = (res_split[-2], res_split[-1]) return eres
[docs] def logToDict(self, filename): """Reads a gzipped log file and populates a dictionary with information about the events. Returns this dictionary. Parameters ---------- filename: string or byte like object Name of log file to read. Returns --------- eventDict : dictionary Dictionary with the key being the trigger number (int). Each value is a tuple of the event number (int) and the detector (str) that was hit. """ import gzip import re from os import path filename = path.expandvars(filename) # Try to read the gzip and fallback to normal if it doesn't exist. try: f = gzip.open(filename, 'rb') except FileNotFoundError: filename = filename[:-3] f = open(filename, 'rb') file_content = f.read() f.close() eventDict = {} lines = file_content.splitlines() for i, line in enumerate(lines): if 'Storing event' in str(line): evtnums = [int(s) for s in str(line).split() if s.isdigit()] detname = re.search('\"(.*)\"', str(lines[i-1])) eventDict[evtnums[0]] = (evtnums[1], detname.group(1)) return eventDict
[docs] def getHits(self, detID=4): """Get the hit details of all of the events in the sim file. Ignores the a,b,and c details. Parameters ---------- detID : int Detector ID (usually 4) Returns ---------- hits : numpy structured array Five column Structured array. Columns are all floats and are `x_pos`, `y_pos`, `z_pos`, `E`, and `tobs`. """ IDstr = 'HTsim {}'.format(detID) dt = np.dtype([('x_pos', np.float64), ('y_pos', np.float64), ('z_pos', np.float64), ('E', np.float64), ('tobs', np.float64)]) # Get all the rest of the events events = [np.array(evt, dtype=np.float64) for evt in self.simDict[IDstr]] hits = np.zeros((len(events),), dtype=dt) for i, evt in enumerate(events): hits[i] = tuple(e for e in evt[:5]) return hits
[docs] def printDetails(self): """Prints the general information about specific sim files. """ print('Sim File: ' + self.simFile) print('Source File: ' + self.srcFile) print('Geometry File: ' + self.srcDict['Geometry'][0][0]) print('Surrounding Sphere: ' + self.geoDict['SurroundingSphere'][0][0]) print('Triggers: ' + self.srcDict['FFPS.NTriggers'][0]) print('Generated Particles: ' + self.simDict['TS'][0]) print('Azimuth: ' + str(self.azimuth)) print('Zenith: ' + str(self.zenith)) print('Energy: ' + str(self.energy))
[docs] def calculateAeff(self, useEres=False, width=0.): """Calculates effective area of sim file. Parameters ---------- useEres : Boolean Switch to either use the energy resolution in the calculation or not. If you don't use this it assumes all of the events are good. width: Float The energy width that is used to reject events in the selection. Returns ---------- Aeff : Float The effective area. """ from math import pi r_sphere = float(self.geoDict['SurroundingSphere'][0][0]) generated_particles = int(self.simDict['TS'][0]) if useEres: triggers = np.where((self.ED_res >= self.energy - width) & (self.ED_res <= self.energy + width)) triggers = len(triggers[0]) else: triggers = int(self.srcDict['FFPS.NTriggers'][0]) return r_sphere**2*pi*triggers/generated_particles
def _applyEres(self, energies, widths): """Takes the energy deposited in the detector (ED) and adds a noise factor on an event by event basis based on the energy resolution of the detector. Does a linear interpolation between the varous energy measurements. Assumes gaussian noise. To-do: * Need to make the interpolation non-linear (functional) * Need to make sure it goes above and below the lowest (highest) value measurements. Parameters ---------- energies : list List of energies in keV at which the energy resolution (width) is calculated. widths: list List of energy widths (resolution) in keV. Returns ---------- ED + noise : numpy array An array of energy values that have been corrected for the energy resolution. """ ED = np.array([float(ed) for ed in self.simDict['ED']]) e_interp = list(set(ED)) res_interp = np.interp(e_interp, energies, widths) indexes = [e_interp.index(ed) for ed in ED] noise = [np.random.normal(0, res_interp[i], 1)[0] for i in indexes] return ED + noise
[docs] def passEres(self, alpha=2.57, escape=30.0): """Calculates the fraction of events that are good (fully absorbed) and those that escape. The default escape photon energy is for CsI (30.0 keV). An alpha of 2.57 is based on 10% energy resolution at 662 keV with 1/sqrt(E) scaling. Sigma is calculated as the FWHM or eres diveded by 2.35. Returns ---------- frac : float mod_frac : float """ ed = np.array(self.simDict['ED']).astype(np.float) ec = np.array(self.simDict['EC']).astype(np.float) ns = np.array(self.simDict['NS']).astype(np.float) tot = ed + ec + ns ediff = tot - ed ediff2 = ediff - escape sigma = np.where(ed != 0, ed*alpha/np.sqrt(ed)/2.35, 0) good = np.sum(np.fabs(ediff) < sigma) mod = np.sum(np.fabs(ediff2) < sigma) frac = float(good)/float(len(ed)) mod_frac = (float(mod)+float(good))/float(len(ed)) return frac, mod_frac
[docs] def getTriggerProbability(self, num_det=4, test=False): """Takes a single simFile (from bcSim.simFiles(config.yaml)) and returns the probability of hitting in each detector Parameters ---------- num_det : integer Number of detectors to get triggers for. test : boolean Run a quick test over a limited number of events (20) Returns ---------- prob_det_info : numpy array Array is (energy, az, ze, error, prob1, prob2, ...) """ from math import sqrt det_vol = np.zeros(num_det) hits = self.getHits() if test: dotest = 20 else: dotest = len(hits) print("analyzing", len(hits), "events") # stat_err = len(hits) for key, value in self.logDict.items(): for i in range(num_det): if str(i) in value[1]: det_vol[i] += 1 elif i == 0: if '_' not in value[1]: det_vol[0] += 1 prob_det_info = [self.energy, self.azimuth, self.zenith] prob_det_info = np.append(prob_det_info, sqrt(len(hits))/float(len(hits))) for i in range(num_det): prob_det_info = np.append(prob_det_info, det_vol[i]/float(len(hits))) return prob_det_info