from collections import OrderedDict, defaultdict
from ...core.functions import add_to_instance
from ...core.basejob import MultiJob
from ...core.results import Results
from ...core.settings import Settings
from ...mol.molecule import Molecule
from ...mol.atom import Atom
from ...interfaces.adfsuite.ams import AMSJob
from ...interfaces.adfsuite.amsanalysis import AMSAnalysisJob, AMSAnalysisResults
from ...tools.units import Units
from .amsmdjob import AMSNVEJob
import numpy as np
from typing import List
__all__ = ['AMSRDFJob', 'AMSMSDJob', 'AMSMSDResults', 'AMSVACFJob', 'AMSVACFResults']
class AMSConvenientAnalysisJob(AMSAnalysisJob):
def __init__(self,
previous_job, # needs to be finished
atom_indices=None,
**kwargs):
"""
previous_job: AMSJob
An AMSJob with an MD trajectory. Note that the trajectory should have been equilibrated before it starts.
All other settings can be set as for AMS
"""
AMSAnalysisJob.__init__(self, **kwargs)
self.previous_job = previous_job
self.atom_indices = atom_indices
def _get_max_dt_frames(self, max_correlation_time_fs):
if max_correlation_time_fs is None:
return None
historylength = self.previous_job.results.readrkf('History', 'nEntries')
max_dt_frames = int(max_correlation_time_fs / self.previous_job.results.get_time_step())
max_dt_frames = min(max_dt_frames, historylength // 2)
return max_dt_frames
def _parent_prerun(self, section):
# use previously run previous_job
assert self.previous_job.status != 'created', "You can only pass in a finished AMSJob"
self.settings.input.TrajectoryInfo.Trajectory.KFFileName = self.previous_job.results.rkfpath()
if self.atom_indices and self._parent_write_atoms:
self.settings.input[section].Atoms.Atom = self.atom_indices
class AMSMSDResults(AMSAnalysisResults):
"""Results class for AMSMSDJob
"""
def get_msd(self):
"""
returns time [fs], msd [ang^2]
"""
msd_xy = self.get_xy()
time = np.array(msd_xy.x[0]) # fs
y = np.array(msd_xy.y) * Units.convert(1.0, 'bohr', 'angstrom')**2
return time, y
def get_linear_fit(self, start_time_fit_fs=None, end_time_fit_fs=None):
"""
Fits the MSD between start_time_fit_fs and end_time_fit_fs
Returns a 3-tuple LinRegress.result, fit_x (fs), fit_y (ang^2)
result.slope is given in ang^2/fs
"""
from scipy.stats import linregress
time, y = self.get_msd()
end_time_fit_fs = end_time_fit_fs or max(time)
start_time_fit_fs = start_time_fit_fs or self.job.start_time_fit_fs
if start_time_fit_fs >= end_time_fit_fs:
start_time_fit_fs = end_time_fit_fs / 2
y = y[time >= start_time_fit_fs]
time = time[time >= start_time_fit_fs]
y = y[time <= end_time_fit_fs]
time = time[time <= end_time_fit_fs]
result = linregress(time, y)
fit_x = time
fit_y = result.slope * fit_x + result.intercept
return result, fit_x, fit_y
def get_diffusion_coefficient(self, start_time_fit_fs=None, end_time_fit_fs=None):
"""
Returns D in m^2/s
"""
result, _, _ = self.get_linear_fit(start_time_fit_fs=start_time_fit_fs, end_time_fit_fs=end_time_fit_fs)
D = result.slope * 1e-20 / (6 * 1e-15) # convert from ang^2/fs to m^2/s, divide by 6 because 3-dimensional (2d)
return D
[docs]class AMSMSDJob(AMSConvenientAnalysisJob):
"""A convenient class wrapping around the trajectory analysis MSD tool
"""
_result_type = AMSMSDResults
_parent_write_atoms = True
[docs] def __init__(self,
previous_job, # needs to be finished
max_correlation_time_fs:float=10000,
start_time_fit_fs:float=2000,
atom_indices:List[int]=None,
**kwargs):
"""
previous_job: AMSJob
An AMSJob with an MD trajectory. Note that the trajectory should have been equilibrated before it starts.
max_correlation_time_fs: float
Maximum correlation time in femtoseconds
start_time_fit_fs : float
Smallest correlation time for the linear fit
atom_indices : List[int]
If None, use all atoms. Otherwise use the specified atom indices (starting with 1)
kwargs: dict
Other options to AMSAnalysisJob
"""
AMSConvenientAnalysisJob.__init__(self, previous_job=previous_job, atom_indices=atom_indices, **kwargs)
self.max_correlation_time_fs = max_correlation_time_fs
self.start_time_fit_fs = start_time_fit_fs
[docs] def prerun(self):
"""
Constructs the final settings
"""
self._parent_prerun('MeanSquareDisplacement') # trajectory and atom_indices handled
max_dt_frames = self._get_max_dt_frames(self.max_correlation_time_fs)
self.settings.input.Task = 'MeanSquareDisplacement'
self.settings.input.MeanSquareDisplacement.Property = 'DiffusionCoefficient'
self.settings.input.MeanSquareDisplacement.StartTimeSlope = self.start_time_fit_fs
self.settings.input.MeanSquareDisplacement.MaxFrame = max_dt_frames
[docs] def postrun(self):
"""
Creates msd.txt, fit_msd.txt, and D.txt
"""
time, msd = self.results.get_msd()
with open(self.path+'/msd.txt', 'w') as f:
f.write("#Time(fs) MSD(ang^2)")
for x,y in zip(time, msd):
f.write(f'{x} {y}\n')
_, fit_x, fit_y = self.results.get_linear_fit()
with open(self.path+'/fit_msd.txt', 'w') as f:
f.write("#Time(fs), LinearFitToMSD(ang^2)")
for x, y in zip(fit_x, fit_y):
f.write(f'{x} {y}\n')
D = self.results.get_diffusion_coefficient()
with open(self.path+'/D.txt', 'w') as f:
f.write(f'{D}\n')
class AMSVACFResults(AMSAnalysisResults):
"""Results class for AMSVACFJob
"""
def get_vacf(self):
xy = self.get_xy()
time = np.array(xy.x[0]) # fs
y = np.array(xy.y)
return time, y
def get_power_spectrum(self, max_freq=None):
max_freq = max_freq or self.job.max_freq or 5000
xy = self.get_xy('Spectrum')
freq = np.array(xy.x[0])
y = np.array(xy.y)
y = y[freq < max_freq]
freq = freq[freq < max_freq]
return freq, y
[docs]class AMSVACFJob(AMSConvenientAnalysisJob):
"""A class for calculating the velocity autocorrelation function and its power spectrum
"""
_result_type = AMSVACFResults
_parent_write_atoms = True
[docs] def __init__(self,
previous_job, # needs to be finished
max_correlation_time_fs=5000, # fs
max_freq=5000, # cm^-1
atom_indices=None,
**kwargs):
"""
previous_job: AMSJob
An AMSJob with an MD trajectory. Note that the trajectory should have been equilibrated before it starts.
max_correlation_time_fs: float
Maximum correlation time in femtoseconds
max_freq: float
The maximum frequency for the power spectrum in cm^-1
atom_indices: List[int]
Atom indices (starting with 1). If None, use all atoms.
"""
AMSConvenientAnalysisJob.__init__(self, previous_job=previous_job, atom_indices=atom_indices, **kwargs)
self.max_correlation_time_fs = max_correlation_time_fs
self.max_freq = max_freq
[docs] def prerun(self):
"""
Creates final settings
"""
self._parent_prerun('AutoCorrelation') # trajectory and atom_indices handled
max_dt_frames = self._get_max_dt_frames(self.max_correlation_time_fs)
self.settings.input.Task = 'AutoCorrelation'
self.settings.input.AutoCorrelation.Property = 'Velocities'
self.settings.input.AutoCorrelation.MaxFrame = max_dt_frames
[docs] def postrun(self):
"""
Creates vacf.txt and power_spectrum.txt
"""
try:
time, vacf = self.results.get_vacf()
with open(self.path+'/vacf.txt', 'w') as f:
f.write("#Time(fs) VACF")
for x,y in zip(time, vacf):
f.write(f'{x} {y}\n')
freq, intens = self.results.get_power_spectrum()
with open(self.path+'/power_spectrum.txt', 'w') as f:
f.write("#Frequency(cm^-1) Intensity(arb.units)")
for x,y in zip(freq, intens):
f.write(f'{x} {y}\n')
except:
pass
class AMSRDFResults(AMSAnalysisResults):
"""Results class for AMSRDFJob
"""
def get_rdf(self):
"""
Returns a 2-tuple r, rdf.
r: numpy array of float (angstrom)
rdf: numpy array of float
"""
xy = self.get_xy()
r = np.array(xy.x[0])
y = np.array(xy.y)
return r, y
[docs]class AMSRDFJob(AMSConvenientAnalysisJob):
_result_type = AMSRDFResults
_parent_write_atoms = False
[docs] def __init__(self,
previous_job,
atom_indices=None,
atom_indices_to=None,
rmin=0.5,
rmax=6,
rstep=0.1,
**kwargs
):
"""
previous_job: AMSJob
AMSJob with finished MD trajectory.
atom_indices: List[int]
Atom indices (starting with 1). If None, calculate RDF *from* all atoms.
atom_indices_to: List[int]
Atom indices (starting with 1). If None, calculate RDF *to* all atoms.
rmin: float
Minimum distance (angstrom)
rmax: float
Maximum distance (angstrom)
rstep: float
Bin width for the histogram (angstrom)
"""
AMSConvenientAnalysisJob.__init__(self, previous_job=previous_job, atom_indices=atom_indices, **kwargs)
self.atom_indices_to = atom_indices_to
self.rmin = rmin
self.rmax = rmax
self.rstep = rstep
[docs] def prerun(self):
"""
Creates the final settings. Do not call or override this method.
"""
self._parent_prerun('RadialDistribution')
self.settings.input.Task = 'RadialDistribution'
main_mol = self.previous_job.results.get_main_molecule()
if not self.atom_indices:
self.atom_indices = list(range(1, len(main_mol)+1))
if not self.atom_indices_to:
self.atom_indices_to = list(range(1, len(main_mol)+1))
self.settings.input.RadialDistribution.AtomsFrom.Atom = self.atom_indices
self.settings.input.RadialDistribution.AtomsTo.Atom = self.atom_indices_to
self.settings.input.RadialDistribution.Range = f'{self.rmin} {self.rmax} {self.rstep}'
[docs] def postrun(self):
"""
Creates rdf.txt. Do not call or override this method.
"""
try:
r, gr = self.results.get_rdf()
with open(self.path+'/rdf.txt', 'w') as f:
f.write("#r(angstrom) g(r)")
for x,y in zip(r, gr):
f.write(f'{x} {y}\n')
except:
pass
class AMSConvenientAnalysisPerRegionResults(Results):
def _getter(self, analysis_job_type, method, kwargs):
assert self.job.analysis_job_type is analysis_job_type, f"{method}() can only be called for {analysis_job_type}, tried for type {self.job.analysis_job_type}"
ret = {}
for name, job in self.job.children.items():
ret[name] = getattr(job.results, method)(**kwargs)
return ret
def get_diffusion_coefficient(self, **kwargs):
return self._getter(AMSMSDJob, 'get_diffusion_coefficient', kwargs)
def get_msd(self, **kwargs):
return self._getter(AMSMSDJob, 'get_msd', kwargs)
def get_linear_fit(self, **kwargs):
return self._getter(AMSMSDJob, 'get_linear_fit', kwargs)
def get_vacf(self, **kwargs):
return self._getter(AMSVACFJob, 'get_vacf', kwargs)
def get_power_spectrum(self, **kwargs):
return self._getter(AMSVACFJob, 'get_power_spectrum', kwargs)
class AMSConvenientAnalysisPerRegionJob(MultiJob):
_result_type = AMSConvenientAnalysisPerRegionResults
def __init__(self, previous_job, analysis_job_type, name=None, regions=None, per_element=False, **kwargs):
MultiJob.__init__(self, children=OrderedDict(), name=name or 'analysis_per_region')
self.previous_job = previous_job
self.analysis_job_type = analysis_job_type
self.analysis_job_kwargs = kwargs
self.regions_dict = regions
self.per_element = per_element
@staticmethod
def get_regions_dict(molecule, per_element:bool=False):
regions_dict = defaultdict(lambda: [])
for i, at in enumerate(molecule, 1):
regions = set([at.properties.region]) if isinstance(at.properties.region, str) else at.properties.region
if len(regions) == 0:
region_name = 'NoRegion' if not per_element else f'NoRegion_{at.symbol}'
regions_dict[region_name].append(i)
for region in regions:
region_name = region if not per_element else f'{region}_{at.symbol}'
regions_dict[region_name].append(i)
regions_dict['All'].append(i)
if per_element:
regions_dict[f'All_{at.symbol}'].append(i)
return regions_dict
def prerun(self):
regions_dict = self.regions_dict or self.get_regions_dict(self.previous_job.results.get_main_molecule(), per_element=self.per_element)
for region in regions_dict:
self.children[region] = self.analysis_job_type(
previous_job=self.previous_job,
name=region,
atom_indices = regions_dict[region],
**self.analysis_job_kwargs
)
@staticmethod
def get_mean_std_per_region(list_of_jobs, function_name, **kwargs):
"""
list_of_jobs: List[AMSConvenientAnalysisPerRegionJob]
List of jobs over which to average
function_name: str
e.g. 'get_msd', 'get_power_spectrum'
"""
if isinstance(list_of_jobs, dict):
list_of_jobs = [x for x in list_of_jobs.values()]
frequencies = None # same for all jobs
all_x = defaultdict(lambda: [])
all_y = defaultdict(lambda: [])
for vacfjob in list_of_jobs:
# let's calculate mean and std for each region
results_dict = getattr(vacfjob.results, function_name)(**kwargs)
for region_name, ret_value in results_dict.items():
if np.isscalar(ret_value):
all_x[region_name].append(np.atleast_1d(ret_value))
elif len(ret_value) == 2:
all_x[region_name].append(np.atleast_1d(ret_value[0]))
all_y[region_name].append(np.atleast_1d(ret_value[1]))
mean_x = {}
std_x = {}
mean_y = {}
std_y = {}
for region_name, values in all_x.items():
mean_x[region_name] = np.mean(values, axis=0)
std_x[region_name] = np.std(values, axis=0)
if len(all_y) > 0:
mean_y[region_name] = np.mean(all_y[region_name], axis=0)
std_y[region_name] = np.std(all_y[region_name], axis=0)
if len(mean_y) > 0:
return mean_x, std_x, mean_y, std_y
else:
return mean_x, std_x