Source code for calorine.gpumd.io

from warnings import warn
from collections.abc import Iterable
from pathlib import Path
from typing import List, Tuple, Union

import numpy as np
from ase import Atoms
from ase.io import read, write
from pandas import DataFrame


[docs] def read_kappa(filename: str) -> DataFrame: """Parses a file in ``kappa.out`` format from GPUMD and returns the content as a data frame. More information concerning file format, content and units can be found `here <https://gpumd.org/gpumd/output_files/kappa_out.html>`__. Parameters ---------- filename Input file name. """ data = np.loadtxt(filename) if isinstance(data[0], np.float64): # If only a single row in kappa.out, append a dimension data = data.reshape(1, -1) tags = 'kx_in kx_out ky_in ky_out kz_tot'.split() if len(data[0]) != len(tags): raise ValueError( f'Input file contains {len(data[0])} data columns.' f' Expected {len(tags)} columns.' ) df = DataFrame(data=data, columns=tags) df['kx_tot'] = df.kx_in + df.kx_out df['ky_tot'] = df.ky_in + df.ky_out return df
[docs] def read_hac(filename: str, exclude_currents: bool = True, exclude_in_out: bool = True) -> DataFrame: """Parses a file in ``hac.out`` format from GPUMD and returns the content as a data frame. More information concerning file format, content and units can be found `here <https://gpumd.org/gpumd/output_files/hac_out.html>`__. Parameters ---------- filename Input file name. exclude_currents Do not include currents in output to save memory. exclude_in_out Do not include `in` and `out` parts of conductivity in output to save memory. """ data = np.loadtxt(filename) if isinstance(data[0], np.float64): # If only a single row in hac.out, append a dimension data = data.reshape(1, -1) tags = 'time' tags += ' jin_jtot_x jout_jtot_x jin_jtot_y jout_jtot_y jtot_jtot_z' tags += ' kx_in kx_out ky_in ky_out kz_tot' tags = tags.split() if len(data[0]) != len(tags): raise ValueError( f'Input file contains {len(data[0])} data columns.' f' Expected {len(tags)} columns.' ) df = DataFrame(data=data, columns=tags) df['kx_tot'] = df.kx_in + df.kx_out df['ky_tot'] = df.ky_in + df.ky_out df['jjx_tot'] = df.jin_jtot_x + df.jout_jtot_x df['jjy_tot'] = df.jin_jtot_y + df.jout_jtot_y df['jjz_tot'] = df.jtot_jtot_z del df['jtot_jtot_z'] if exclude_in_out: # remove columns with in/out data to save space for col in df: if 'in' in col or 'out' in col: del df[col] if exclude_currents: # remove columns with currents to save space for col in df: if col.startswith('j'): del df[col] return df
[docs] def read_thermo(filename: str, natoms: int = 1) -> DataFrame: """Parses a file in ``thermo.out`` format from GPUMD and returns the content as a data frame. More information concerning file format, content and units can be found `here <https://gpumd.org/gpumd/output_files/thermo_out.html>`__. Parameters ---------- filename Input file name. natoms Number of atoms; used to normalize energies. """ data = np.loadtxt(filename) if isinstance(data[0], np.float64): # If only a single row in loss.out, append a dimension data = data.reshape(1, -1) if len(data[0]) == 9: # orthorhombic box tags = 'temperature kinetic_energy potential_energy' tags += ' stress_xx stress_yy stress_zz' tags += ' cell_xx cell_yy cell_zz' elif len(data[0]) == 12: # orthorhombic box with stresses in Voigt notation (v3.3.1+) tags = 'temperature kinetic_energy potential_energy' tags += ' stress_xx stress_yy stress_zz stress_yz stress_xz stress_xy' tags += ' cell_xx cell_yy cell_zz' elif len(data[0]) == 15: # triclinic box tags = 'temperature kinetic_energy potential_energy' tags += ' stress_xx stress_yy stress_zz' tags += ( ' cell_xx cell_xy cell_xz cell_yx cell_yy cell_yz cell_zx cell_zy cell_zz' ) elif len(data[0]) == 18: # triclinic box with stresses in Voigt notation (v3.3.1+) tags = 'temperature kinetic_energy potential_energy' tags += ' stress_xx stress_yy stress_zz stress_yz stress_xz stress_xy' tags += ( ' cell_xx cell_xy cell_xz cell_yx cell_yy cell_yz cell_zx cell_zy cell_zz' ) else: raise ValueError( f'Input file contains {len(data[0])} data columns.' ' Expected 9, 12, 15 or 18 columns.' ) df = DataFrame(data=data, columns=tags.split()) assert natoms > 0, 'natoms must be positive' df.kinetic_energy /= natoms df.potential_energy /= natoms return df
[docs] def read_xyz(filename: str) -> Atoms: """ Reads the structure input file (``model.xyz``) for GPUMD and returns the structure. This is a wrapper function around :func:`ase.io.read_xyz` since the ASE implementation does not read velocities properly. Parameters ---------- filename Name of file from which to read the structure. Returns ------- Structure as ASE Atoms object with additional per-atom arrays representing atomic masses, velocities etc. """ structure = read(filename, format='extxyz') if structure.has('vel'): structure.set_velocities(structure.get_array('vel')) return structure
[docs] def read_runfile(filename: str) -> List[Tuple[str, list]]: """ Parses a GPUMD input file in ``run.in`` format and returns the content in the form a list of keyword-value pairs. Parameters ---------- filename Input file name. Returns ------- List of keyword-value pairs. """ data = [] with open(filename, 'r') as f: for k, line in enumerate(f.readlines()): flds = line.split() if len(flds) == 0: continue elif len(flds) == 1: raise ValueError(f'Line {k} contains only one field:\n{line}') keyword = flds[0] values = tuple(flds[1:]) if keyword in ['time_step', 'velocity']: values = float(values[0]) elif keyword in ['dump_thermo', 'dump_position', 'dump_restart', 'run']: values = int(values[0]) elif len(values) == 1: values = values[0] data.append((keyword, values)) return data
[docs] def write_runfile( file: Path, parameters: List[Tuple[str, Union[int, float, Tuple[str, float]]]] ): """Write a file in run.in format to define input parameters for MD simulation. Parameters ---------- file Path to file to be written. parameters : dict Defines all key-value pairs used in run.in file (see GPUMD documentation for a complete list). Values can be either floats, integers, or lists/tuples. """ with open(file, 'w') as f: # Write all keywords with parameter(s) for key, val in parameters: f.write(f'{key} ') if isinstance(val, Iterable) and not isinstance(val, str): for v in val: f.write(f'{v} ') else: f.write(f'{val}') f.write('\n')
[docs] def write_xyz(filename: str, structure: Atoms, groupings: List[List[List[int]]] = None): """ Writes a structure into GPUMD input format (`model.xyz`). Parameters ---------- filename Name of file to which the structure should be written. structure Input structure. groupings Groups into which the individual atoms should be divided in the form of a list of list of lists. Specifically, the outer list corresponds to the grouping methods, of which there can be three at the most, which contains a list of groups in the form of lists of site indices. The sum of the lengths of the latter must be the same as the total number of atoms. Raises ------ ValueError Raised if parameters are incompatible. """ # Make a local copy of the atoms object _structure = structure.copy() # Check velocties parameter velocities = _structure.get_velocities() if velocities is None or np.max(np.abs(velocities)) < 1e-6: has_velocity = 0 else: has_velocity = 1 # Check groupings parameter if groupings is None: number_of_grouping_methods = 0 else: number_of_grouping_methods = len(groupings) if number_of_grouping_methods > 3: raise ValueError('There can be no more than 3 grouping methods!') for g, grouping in enumerate(groupings): all_indices = [i for group in grouping for i in group] if len(all_indices) != len(_structure) or set(all_indices) != set( range(len(_structure)) ): raise ValueError( f'The indices listed in grouping method {g} are' ' not compatible with the input structure!' ) # Allowed keyword=value pairs. Use ASEs extyz write functionality. # pbc="pbc_a pbc_b pbc_c" # lattice="ax ay az bx by bz cx cy cz" # properties=property_name:data_type:number_of_columns # species:S:1 # pos:R:3 # mass:R:1 # vel:R:3 # group:I:number_of_grouping_methods if _structure.has('mass'): # If structure already has masses set, use those warn('Structure already has array "mass"; will use existing values.') else: _structure.new_array('mass', _structure.get_masses()) if has_velocity: _structure.new_array('vel', _structure.get_velocities()) if groupings is not None: group_indices = np.array( [ [ [ group_index for group_index, group in enumerate(grouping) if structure_idx in group ] for grouping in groupings ] for structure_idx in range(len(_structure)) ] ).squeeze() # pythoniccc _structure.new_array('group', group_indices) write(filename=filename, images=_structure, write_info=True, format='extxyz')
[docs] def read_mcmd(filename: str, accumulate: bool = True) -> DataFrame: """Parses a Monte Carlo output file in ``mcmd.out`` format and returns the content in the form of a DataFrame. Parameters ---------- filename Path to file to be parsed. accumulate If ``True`` the MD steps between subsequent Monte Carlo runs in the same output file will be accumulated. Returns ------- DataFrame containing acceptance ratios and concentrations (if available), as well as key Monte Carlo parameters. """ with open(filename, 'r') as f: lines = f.readlines() data = [] offset = 0 step = 0 accummulated_step = 0 for line in lines: if line.startswith('# mc'): flds = line.split() mc_type = flds[2] md_steps = int(flds[3]) mc_trials = int(flds[4]) temperature_initial = float(flds[5]) temperature_final = float(flds[6]) if mc_type.endswith('sgc'): ntypes = int(flds[7]) species = [flds[8+2*k] for k in range(ntypes)] phis = {f'phi_{flds[8+2*k]}': float(flds[9+2*k]) for k in range(ntypes)} kappa = float(flds[8+2*ntypes]) if mc_type == 'vcsgc' else np.nan elif line.startswith('# num_MD_steps'): continue else: flds = line.split() previous_step = step step = int(flds[0]) if step <= previous_step and accumulate: offset += previous_step accummulated_step = step + offset record = dict( step=accummulated_step, mc_type=mc_type, md_steps=md_steps, mc_trials=mc_trials, temperature_initial=temperature_initial, temperature_final=temperature_final, acceptance_ratio=float(flds[1]), ) if mc_type.endswith('sgc'): record.update(phis) if mc_type == 'vcsgc': record['kappa'] = kappa concentrations = {f'conc_{s}': float(flds[k]) for k, s in enumerate(species, start=2)} record.update(concentrations) data.append(record) df = DataFrame.from_dict(data) return df
[docs] def read_thermodynamic_data(directory_name: str) -> DataFrame: """Parses the data in a GPUMD output directory and returns the content in the form of a :class:`DataFrame`. This function reads the ``thermo.out`` and ``run.in`` files, and returns the thermodynamic data including the time (in ps), the pressure (in GPa), the side lengths of the simulation cell (in Å), and the volume (in Å:sup:`3`). Parameters ---------- directory_name Path to directory to be parsed. Returns ------- :class:`DataFrame` containing (augmented) thermodynamic data. """ try: params = read_runfile(f'{directory_name}/run.in') except FileNotFoundError: raise FileNotFoundError(f'No `run.in` file found in {directory_name}') blocks = [] time_step = 1.0 # GPUMD default dump_thermo = None for p, v in params: if p == 'time_step': time_step = v elif p == 'dump_thermo': dump_thermo = v elif p == 'run': if dump_thermo is None: raise ValueError( 'Could not extract value of `dump_thermo` keyword from `run.in` file.') # We do not require dump_thermo to exist for subsequent # runs if it has been used for atleast one before. # But if there has been no new dump_thermo, we # should not create a block. if (dump_thermo != 'DEFINEDONCE'): blocks.append(dict( nsteps=v, time_step=time_step, dump_thermo=dump_thermo, )) dump_thermo = 'DEFINEDONCE' try: df = read_thermo(f'{directory_name}/thermo.out') except FileNotFoundError: raise FileNotFoundError(f'No `thermo.out` file found in {directory_name}') expected_rows = sum([int(round(b['nsteps'] / b['dump_thermo'], 0)) for b in blocks if b['dump_thermo'] is not None]) if len(df) != expected_rows: warn(f'Number of rows in `thermo.out` file ({len(df)}) does not match' f' expectation based on `run.in` file ({expected_rows}).') if len(df) > expected_rows: raise ValueError(f'Too many rows in `thermo.out` file ({len(df)}) compared to' f' expectation based on `run.in` file ({expected_rows}).') # Fewer rows than expected should be ok, since the run may have crashed/not completed yet. times = [] offset = 0.0 for b in blocks: ns = int(round(b['nsteps'] / b['dump_thermo'], 0)) block_times = np.array(range(1, 1 + ns)) \ * b['dump_thermo'] * b['time_step'] * 1e-3 # in ps block_times += offset times.extend(block_times) offset = times[-1] df['time'] = times[:len(df)] df['pressure'] = (df.stress_xx + df.stress_yy + df.stress_zz) / 3 if 'cell_xy' in df: df['alat'] = np.sqrt(df.cell_xx ** 2 + df.cell_xy ** 2 + df.cell_xz ** 2) df['blat'] = np.sqrt(df.cell_yx ** 2 + df.cell_yy ** 2 + df.cell_yz ** 2) df['clat'] = np.sqrt(df.cell_zx ** 2 + df.cell_zy ** 2 + df.cell_zz ** 2) volume = (df.cell_xx * df.cell_yy * df.cell_zz + df.cell_xy * df.cell_yz * df.cell_xz + df.cell_xz * df.cell_yx * df.cell_zy - df.cell_xx * df.cell_yz * df.cell_zy - df.cell_xy * df.cell_yx * df.cell_zz - df.cell_xz * df.cell_yy * df.cell_zx) else: df['alat'] = df.cell_xx df['blat'] = df.cell_yy df['clat'] = df.cell_zz volume = (df.cell_xx * df.cell_yy * df.cell_zz) df['volume'] = volume return df