import inspect
import os
import subprocess
from itertools import cycle
from typing import Optional, List, Dict
import numpy as np
from scm.plams.interfaces.adfsuite.scmjob import SCMJob, SCMResults
from scm.plams.tools.units import Units
from scm.plams.core.functions import log
__all__ = ["CRSResults", "CRSJob"]
[docs]class CRSResults(SCMResults):
"""A |SCMResults| subclass for accessing results of |CRSJob|."""
_kfext = ".crskf"
_rename_map = {"CRSKF": "$JN.crskf"}
@property
def section(self) -> str:
try: # Return the cached value if possible
return self._section # type: ignore
except AttributeError:
try:
self._section = self.job.settings.input.property._h.upper()
except AttributeError:
self._section = self.job.settings.input.t.upper()
return self._section
[docs] def get_energy(self, energy_type: str = "deltag", compound_idx: int = 0, unit: str = "kcal/mol") -> float:
"""Returns the solute solvation energy from an Activity Coefficients calculation."""
E = self.readkf(self.section, energy_type)[compound_idx]
return Units.convert(E, "kcal/mol", unit)
[docs] def get_activity_coefficient(self, compound_idx: int = 0) -> float:
"""Return the solute activity coefficient from an Activity Coefficients calculation."""
return self.readkf(self.section, "gamma")[compound_idx]
[docs] def get_sigma_profile(self, subsection: str = "profil", as_df: bool = False) -> dict:
r"""Grab all sigma profiles, returning a dictionary of Numpy Arrays.
Values of :math:`\sigma` are stored under the ``"σ (e/A**2)"`` key.
Results can be returned as a Pandas DataFrame by settings *as_df* to ``True``.
The returned results can be plotted by passing them to the :meth:`CRSResults.plot` method.
.. note::
*as_df* = ``True`` requires the Pandas_ package.
Plotting requires the `matplotlib <https://matplotlib.org/index.html>`__ package.
.. _Pandas: https://pandas.pydata.org/
"""
args = (subsection, "σ (e/A**2)", "chdval")
try:
return self._get_array_dict("SIGMAPROFILE", *args, as_df=as_df)
except KeyError:
return self._get_array_dict("PURESIGMAPROFILE", *args, as_df=as_df)
[docs] def get_sigma_potential(self, subsection: str = "mu", unit: str = "kcal/mol", as_df: bool = False) -> dict:
r"""Grab all sigma profiles, expressed in *unit*, and return a dictionary of Numpy Arrays.
Values of :math:`\sigma` are stored under the ``"σ (e/A**2)"`` key.
Results can be returned as a Pandas DataFrame by settings *as_df* to ``True``.
The returned results can be plotted by passing them to the :meth:`CRSResults.plot` method.
.. note::
*as_df* = ``True`` requires the Pandas_ package.
Plotting requires the `matplotlib <https://matplotlib.org/index.html>`__ package.
.. _Pandas: https://pandas.pydata.org/
"""
args = (subsection, "σ (e/A**2)", "chdval")
try:
return self._get_array_dict("SIGMAPOTENTIAL", *args, unit=unit, as_df=as_df)
except KeyError:
return self._get_array_dict("PURESIGMAPOTENTIAL", *args, unit=unit, as_df=as_df)
[docs] def get_prop_names(self, section=None) -> list:
r"""Read the section of the .crskf file and return a list of the properties that were calculated. The section argument can be supplied to look at previously-calculated results. If no section name is supplied, the function defaults to using the most recent property that was calculated."""
if section is None:
section = self.section
try:
return self._kf.get_skeleton()[section]
except KeyError:
raise KeyError("Cannot find section name: " + str(section))
[docs] def get_results(self, section=None) -> dict:
r"""Read the section from the most recent calculation type and return the result as a dictionary."""
if section is None:
section = self.section
if hasattr(self, "_prop_dict") and self._prop_dict["section"] == section:
return self._prop_dict
props = self.get_prop_names()
try:
props.remove("ncomp")
props.remove("nitems")
except ValueError:
raise ValueError("Results object is missing or incomplete.")
# first get the two ranges for the indices
ncomp = self.readkf(section, "ncomp")
nitems = self.readkf(section, "nitems")
try:
nstruct = self.readkf(section, "nstruct")
except:
nstruct = ncomp
np_dict = {"section": section}
np_dict["ncomp"] = ncomp
chunk_length = 160
for prop in props:
tmp = self.readkf(section, prop)
if prop in ["filename", "name", "SMILES", "mol_filenames"]:
if len(tmp) / ncomp == chunk_length:
np_dict[prop] = [tmp[i : i + chunk_length].strip() for i in range(0, len(tmp), chunk_length)]
continue
else:
np_dict[prop] = tmp.split("\x00")
continue
if prop == "struct names":
if len(tmp) / nstruct == chunk_length:
np_dict[prop] = [tmp[i : i + chunk_length].strip() for i in range(0, len(tmp), chunk_length)]
continue
else:
np_dict[prop] = tmp.split("\x00")
continue
if not isinstance(tmp, list):
np_dict[prop] = tmp
else:
np_dict[prop] = np.array(tmp)
if len(tmp) == ncomp * nitems:
np_dict[prop].shape = (ncomp, nitems)
setattr(self, "_prop_dict", np_dict)
return np_dict
[docs] def get_multispecies_dist(self):
"""
This function returns multispecies distribution for each (compound,structure) pair. The format is a list
with indices corresponding to compound indices. Each item in the list is a dictionary with a structure name : list pair, where the structure name corresponds to a structure the compound can be exist as and the list is the distribution of that compound in that structure over the number of points (mole fractions, temperatures, pressures).
"""
res = self.get_results()
property_name = res["property"].rstrip()
if property_name == "LOGP":
nPhase = 2
else:
nPhase = 1
ncomp = self.readkf(self.section, "ncomp")
struct_names = res["struct names"]
num_points = self.readkf(self.section, "nitems")
valid_structs: List[List[str]] = [[] for _ in range(ncomp)]
comp_dist = res["comp distribution"].flatten()
for i in range(len(struct_names)):
for j in range(ncomp):
if res["valid structs"][i * ncomp + j]:
valid_structs[j].append(struct_names[i])
compositions: List[Dict[str, List[float]]] = [{vs: [] for vs in valid_structs[i]} for i in range(ncomp)]
idx = 0
for i in range(ncomp):
for nfrac in range(num_points):
for k in range(nPhase):
for j in range(len(valid_structs[i])):
compositions[i][valid_structs[i][j]].append(comp_dist[idx])
idx += 1
return compositions
[docs] def get_structure_energy(self, as_df: bool = False):
"""
If the OUTPUT_ENERGY_COMPONENTS is set to True in the input file, this funcion returns:
1. The energy of each structure in multispecies (units in kcal/mol).
2. Information related to association with other compound, if any.
It takes the following optional boolean keyword argument, *as_df*. If *as_df* is True, the result will be
returned as a Pandas DataFrame. Otherwise the result will be returned as a dictionary.
The following abbreviations are used in the dictionary or Pandas DataFrame for the energy:
* s_idx: the index for each unique structure
* CompIdx: the compound index in multispecies
* FormIdx: the form index in multispecies
* SpecIdx: the species index in multispecies
* StrucIdx: the structure index in multispecies
* z: the equilibrium concentration in multispecies
* coskf: the corresponding coskf file for each s_idx
* mu_res: the residual part of the pseudo-chemical potential
* mu_comb: the combinatorial part of the pseudo-chemical potential
* mu_disp: the energy contribution from the dispersive interaction
* mu_pdh: the energy contribution from the Pitzer-Debye-Hückel term
* mu_RTlnz: the energy contribution from the ideal mixing
* mu_Ecosmo: the Ecosmo energy
* Assoc: True if the structure has any association with other compound
* NumRepMonomer: the number of repeated monomers used for polymers
* NumStrucPerComp: the number of structures per compound used for dimers, trimers
The following abbreviations are used in the dictionary or Pandas DataFrame for the information related to association with other compound:
* ReqCompNameAssoc: the required compound name for the associating structure
* ReqCompIdxAssoc: the required compound index (CompIdx) for the associating structure
* NumReqCompAssoc: the number of the required compounds in the associating structure
"""
section = "EnegyComponent"
try:
nspecies = self.readkf(section, "nspecies")
except:
log("The section of EnergyComponent is not found in the crskf file.")
return None, None
ms_index = self.readkf(section, "ms_index")
ms_index = np.array(ms_index)
ms_index = ms_index.reshape(nspecies, 4)
mu_component = self.readkf(section, "mu_component")
mu_component = np.array(mu_component)
mu_component = mu_component.reshape(nspecies, int(len(mu_component) / nspecies))
species_molfrac = self.readkf(section, "species_molfrac")
species_coskf = self.readkf(section, "species_coskf").split("\x00")
species_coskf = [os.path.basename(x.rstrip()) for x in species_coskf]
Assoc = self.readkf(section, "Assoc")
NumRepMonmer = self.readkf(section, "NumRepMonmer")
NumStrucPerComp = self.readkf(section, "NumStrucPerComp")
dict_species = {}
dict_species["s_idx"] = [i + 1 for i in range(nspecies)]
dict_species["CompIdx"] = ms_index[:, 0]
dict_species["FormIdx"] = ms_index[:, 1]
dict_species["SpecIdx"] = ms_index[:, 2]
dict_species["StrucIdx"] = ms_index[:, 3]
dict_species["z"] = species_molfrac
dict_species["coskf"] = species_coskf
dict_species["mu_res"] = mu_component[:, 0]
dict_species["mu_comb"] = mu_component[:, 1]
dict_species["mu_disp"] = mu_component[:, 2]
dict_species["mu_pdh"] = mu_component[:, 3]
dict_species["mu_RTlnz"] = mu_component[:, 4]
dict_species["mu_Ecosmo"] = mu_component[:, 5]
dict_species["Assoc"] = Assoc
dict_species["NumRepMonmer"] = NumRepMonmer
dict_species["NumStrucPerComp"] = NumStrucPerComp
if np.sum(Assoc) > 0:
Assoc_s_idx = self.readkf(section, "Assoc_s_idx")
ReqCompIdxAssoc = self.readkf(section, "ReqCompIdxAssoc")
NumReqCompAssoc = self.readkf(section, "NumReqCompAssoc")
ReqCompNameAssoc = self.readkf(section, "ReqCompNameAssoc").split("\x00")
dict_Asson = {}
dict_Asson["Assoc_s_idx"] = Assoc_s_idx
dict_Asson["ReqCompIdxAssoc"] = ReqCompIdxAssoc
dict_Asson["NumReqCompAssoc"] = NumReqCompAssoc
dict_Asson["ReqCompNameAssoc"] = ReqCompNameAssoc
else:
dict_Asson = None
if as_df:
try:
import pandas as pd
return pd.DataFrame(dict_species), pd.DataFrame(dict_Asson)
except ImportError:
method = inspect.stack()[2][3]
raise ImportError("{}: as_df=True requires the 'pandas' package".format(method))
else:
return dict_species, dict_Asson
[docs] def plot(
self,
*arrays: "np.ndarray",
x_axis: Optional[str] = None,
plot_fig: bool = True,
x_label: Optional[str] = None,
y_label: Optional[str] = None,
):
"""Plot, show and return a series of COSMO-RS results as a matplotlib Figure instance.
Accepts the output of, *e.g.*, :meth:`CRSResults.get_sigma_profile`:
A dictionary of Numpy arrays or a Pandas DataFrame.
Returns a matplotlib Figure_ instance which can be further modified to the users liking.
Automatic plotting of the resulting figure can be disabled with the *plot_fig* argument.
.. note::
This method requires the `matplotlib <https://matplotlib.org/index.html>`__ package.
.. note::
The name of the dictionary/DataFrame key containing the index (*i.e.* the x-axis) can,
and should, be manually specified in *x_axis* if a custom *x_axis* is passed
to :meth:`CRSResults._get_array_dict`.
This argument can be ignored otherwise.
.. _Figure: https://matplotlib.org/api/_as_gen/matplotlib.figure.Figure.html#matplotlib.figure.Figure
""" # noqa
def get_x_axis(array, x_axis):
"""Find and return the index and its name."""
if x_axis is None:
return np.arange(array.shape[1])
if isinstance(x_axis, str):
ret = self._prop_dict[x_axis]
else:
ret = np.array(x_axis, copy=False)
ret = ret.ravel() # Flatten it
return ret[: array.shape[1]]
# Check if matplotlib is installed
try:
import matplotlib
matplotlib.use("TkAgg") if plot_fig else matplotlib.use("Agg")
import matplotlib.pyplot as plt
except ImportError:
method = self.__class__.__name__ + ".plot"
raise ImportError("{}: this method requires the 'matplotlib' package".format(method))
self.get_results()
# Create a dictionary of 1d arrays
array_dict = {}
for array in arrays:
name: Optional[str] = None
if isinstance(array, str): # Array refers to a section in the kf file
name = array
array = self._prop_dict[array]
# Ensure it's a 2D array
array = np.array(array, ndmin=2, dtype=float, copy=False)
# Fill the array dict with 1d arrays
base_key = "" if name is None else name + " "
iterator = enumerate(array, 1) if array.shape[0] != 1 else zip(cycle(" "), array)
for i, array_1d in iterator:
key = f"{base_key}{i}"
array_dict[key] = array_1d
# Retrieve the index and its name
index = get_x_axis(array, x_axis)
# print ("INDEX::::", index)
if x_label is None:
if isinstance(x_axis, str):
x_label = x_axis
else:
x_label = ""
if y_label is None:
y_label = ""
# Assign various series to the plot
fig, ax = plt.subplots()
for k, v in array_dict.items():
ax.plot(index, v, label=k)
# Add the legend and x-label
ax.legend()
ax.set_xlabel(x_label)
ax.set_ylabel(y_label)
# Show and return
if plot_fig:
plt.show()
return fig
[docs] def _get_array_dict(
self,
section: str,
subsection: str,
x_axis: str,
index_subsection: str,
unit: str = "kcal/mol",
as_df: bool = False,
) -> dict:
"""Create dictionary or DataFrame containing all values in *section*/*subsection*.
Takes the following arguments:
* The *section*/*subsection* of the desired quantity.
* The desired name of the index (*x_axis*).
* The name of subsection containing the index (*index_subsection*).
* The *unit* of the output quanty (ignore this keyword if not applicable).
* If the result should be returned as Pandas DataFrame (*as_df*).
"""
ret = self._construct_array_dict(section, subsection, unit)
# Create the index
index = self.readarray(section, index_subsection, dtype=float)
if section in ("BINMIXCOEF", "COMPOSITIONLINE", "TERNARYMIX"):
ncomponent = 3 if section == "TERNARYMIX" else 2
index.shape = ncomponent, len(index) // ncomponent
iterator = np.nditer(index.astype(str), flags=["external_loop"], order="F")
ret[x_axis] = np.array([" / ".join(str(i) for i in item) for item in iterator])
else:
ret[x_axis] = index
# Return a dictionary of arrays or a DataFrame
if not as_df:
return ret
else:
return self._dict_to_df(ret, section, x_axis)
[docs] def _construct_array_dict(self, section: str, subsection: str, unit: str = "kcal/mol") -> dict:
"""Construct dictionary containing all values in *section*/*subsection*."""
# Use filenames as keys
_filenames = self.readkf(section, "filename").split()
filenames = [_filenames] if not isinstance(_filenames, list) else _filenames
# Grab the keys and the number of items per key
keys = [os.path.basename(key) for key in filenames] + ["Total"]
nitems = self.readkf(section, "nitems")
# Use sigma profiles/potentials as values
ratio = Units.conversion_ratio("kcal/mol", unit)
values = ratio * self.readarray(section, subsection, dtype=float)
values.shape = len(values) // nitems, nitems
ret = dict(zip(keys, values))
try:
ret["Total"] = self.readarray(section, subsection + "tot", dtype=float)
except KeyError:
pass
return ret
[docs] @staticmethod
def _dict_to_df(array_dict: dict, section: str, x_axis: str):
"""Attempt to convert a dictionary into a DataFrame."""
try:
import pandas as pd
except ImportError:
method = inspect.stack()[2][3]
raise ImportError("{}: as_df=True requires the 'pandas' package".format(method))
index = pd.Index(array_dict.pop(x_axis), name=x_axis)
df = pd.DataFrame(array_dict, index=index)
df.columns.name = section.lower()
return df
[docs]class CRSJob(SCMJob):
"""A |SCMJob| subclass intended for running COSMO-RS jobs."""
_command = "crs"
_result_type = CRSResults
_subblock_end = "end"
[docs] def __init__(self, **kwargs) -> None:
"""Initialize a :class:`CRSJob` instance."""
super().__init__(**kwargs)
self.settings.ignore_molecule = True
@staticmethod
def database() -> str:
database_path = os.path.join(os.environ["SCM_PKG_ADFCRSDIR"], "ADFCRS-2018")
if not os.path.isdir(database_path):
raise FileNotFoundError("The ADFCRS-2018 database does not seem to be installed")
return database_path
@staticmethod
def coskf_from_database(name: str) -> str:
if not name.endswith(".coskf"):
name += ".coskf"
return os.path.join(CRSJob.database(), name)
[docs] @staticmethod
def cos_to_coskf(filename: str) -> str:
"""Convert a .cos file into a .coskf file with the :code:`$AMSBIN/cosmo2kf` command.
Returns the filename of the new .coskf file.
"""
filename_out = filename + "kf"
try:
amsbin = os.environ["AMSBIN"]
except KeyError:
raise EnvironmentError(
"cos_to_coskf: Failed to load 'cosmo2kf' from '$AMSBIN/'; "
"the 'AMSBIN' environment variable has not been set"
)
args = [os.path.join(amsbin, "cosmo2kf"), filename, filename_out]
subprocess.run(args)
return filename_out