Coverage for calorine / nep / io.py: 100%
258 statements
« prev ^ index » next coverage.py v7.13.2, created at 2026-03-26 13:54 +0000
« prev ^ index » next coverage.py v7.13.2, created at 2026-03-26 13:54 +0000
1from os.path import exists
2from os.path import join as join_path
3from typing import Any, Iterable, NamedTuple, TextIO
4from warnings import warn
6import numpy as np
7from ase import Atoms
8from ase.io import read, write
9from ase.stress import voigt_6_to_full_3x3_stress
10from pandas import DataFrame
13def read_loss(filename: str) -> DataFrame:
14 """Parses a file in `loss.out` format from GPUMD and returns the
15 content as a data frame. More information concerning file format,
16 content and units can be found `here
17 <https://gpumd.org/nep/output_files/loss_out.html>`__.
19 Parameters
20 ----------
21 filename
22 input file name
24 """
25 data = np.loadtxt(filename)
26 if isinstance(data[0], np.float64):
27 # If only a single row in loss.out, append a dimension
28 data = data.reshape(1, -1)
29 if len(data[0]) == 6:
30 tags = 'total_loss L1 L2'
31 tags += ' RMSE_P_train'
32 tags += ' RMSE_P_test'
33 elif len(data[0]) == 10:
34 tags = 'total_loss L1 L2'
35 tags += ' RMSE_E_train RMSE_F_train RMSE_V_train'
36 tags += ' RMSE_E_test RMSE_F_test RMSE_V_test'
37 elif len(data[0]) == 14:
38 tags = 'total_loss L1 L2'
39 tags += ' RMSE_E_train RMSE_F_train RMSE_V_train RMSE_Q_train RMSE_Z_train'
40 tags += ' RMSE_E_test RMSE_F_test RMSE_V_test RMSE_Q_test RMSE_Z_test'
41 else:
42 raise ValueError(
43 f'Input file contains {len(data[0])} data columns. Expected 6 or 10 columns.'
44 )
45 generations = range(100, len(data) * 100 + 1, 100)
46 df = DataFrame(data=data[:, 1:], columns=tags.split(), index=generations)
47 return df
50def _write_structure_in_nep_format(structure: Atoms, f: TextIO) -> None:
51 """Write structure block into a file-like object in format readable by nep executable.
53 Parameters
54 ----------
55 structure
56 input structure; must hold information regarding energy and forces
57 f
58 file-like object to which to write
59 """
61 # Allowed keyword=value pairs. Use ASEs extyz write functionality.:
62 # lattice="ax ay az bx by bz cx cy cz" (mandatory)
63 # energy=energy_value (mandatory)
64 # virial="vxx vxy vxz vyx vyy vyz vzx vzy vzz" (optional)
65 # weight=relative_weight (optional)
66 # properties=property_name:data_type:number_of_columns
67 # species:S:1 (mandatory)
68 # pos:R:3 (mandatory)
69 # force:R:3 or forces:R:3 (mandatory)
71 # If a structure is to be used for training, it needs to either have target
72 # 1. energies and forces,
73 # 2. dipole, denoted `dipole="dx dy dz"` in the info string, or
74 # 3. polarizability/susceptibility, denoted `pol="pxx pxy pxz pyx pyy pyz pzx pzy pzz"`
75 # in the info string.
76 has_energies_and_forces = True
77 try:
78 structure.get_potential_energy()
79 structure.get_forces() # calculate forces to have them on the Atoms object
80 except RuntimeError:
81 has_energies_and_forces = False
83 has_dipole = 'dipole' in structure.info.keys()
84 has_pol = 'pol' in structure.info.keys()
86 if not has_energies_and_forces and not has_dipole and not has_pol:
87 raise RuntimeError('Failed to retrieve target energies/forces,'
88 ' dipoles, or polarizabilities for structure')
89 if np.isclose(structure.get_volume(), 0):
90 raise ValueError('Structure cell must have a non-zero volume!')
91 try:
92 structure.get_stress()
93 except RuntimeError:
94 warn('Failed to retrieve stresses for structure')
95 write(filename=f, images=structure, write_info=True, format='extxyz')
98def write_structures(outfile: str, structures: list[Atoms]) -> None:
99 """Writes structures for training/testing in format readable by nep executable.
101 Parameters
102 ----------
103 outfile
104 output filename
105 structures
106 list of structures with energy, forces, and (possibly) stresses
107 """
108 with open(outfile, 'w') as f:
109 for structure in structures:
110 _write_structure_in_nep_format(structure, f)
113def write_nepfile(parameters: NamedTuple, dirname: str) -> None:
114 """Writes parameters file for NEP construction.
116 Parameters
117 ----------
118 parameters
119 input parameters; see `here <https://gpumd.org/nep/input_parameters/index.html>`__
120 dirname
121 directory in which to place input file and links
122 """
123 with open(join_path(dirname, 'nep.in'), 'w') as f:
124 for key, val in parameters.items():
125 f.write(f'{key} ')
126 if isinstance(val, Iterable):
127 f.write(' '.join([f'{v}' for v in val]))
128 else:
129 f.write(f'{val}')
130 f.write('\n')
133def read_nepfile(filename: str) -> dict[str, Any]:
134 """Returns the content of a configuration file (`nep.in`) as a dictionary.
136 Parameters
137 ----------
138 filename
139 input file name
140 """
141 int_vals = ['version', 'neuron', 'generation', 'batch', 'population',
142 'mode', 'model_type', 'charge_mode']
143 float_vals = ['lambda_1', 'lambda_2', 'lambda_e', 'lambda_f', 'lambda_v',
144 'lambda_q', 'lambda_shear', 'force_delta', 'atomic_v', 'zbl']
145 settings = {}
146 with open(filename) as f:
147 for line in f.readlines():
148 # remove comments - throw away everything after a '#'
149 cleaned = line.split('#', 1)[0].strip()
150 flds = cleaned.split()
151 if len(flds) == 0:
152 continue
153 settings[flds[0]] = ' '.join(flds[1:])
154 for key, val in settings.items():
155 if key in int_vals:
156 settings[key] = int(val)
157 elif key in float_vals:
158 settings[key] = float(val)
159 elif key in ['cutoff', 'n_max', 'l_max', 'basis_size', 'type_weight']:
160 settings[key] = [float(v) for v in val.split()]
161 elif key == 'type':
162 types = val.split()
163 types[0] = int(types[0])
164 settings[key] = types
165 return settings
168def read_structures(dirname: str) -> tuple[list[Atoms], list[Atoms]]:
169 """Parses the output files with training and test data from a nep run and returns their
170 content as two lists of structures, representing training and test data, respectively.
171 Target and predicted data are included in the :attr:`info` dict of the :class:`Atoms`
172 objects.
174 Parameters
175 ----------
176 dirname
177 Directory from which to read output files.
179 """
180 path = join_path(dirname)
181 if not exists(path):
182 raise FileNotFoundError(f'Directory {path} does not exist')
184 # fetch model type from nep input file
185 nep_info = read_nepfile(f'{path}/nep.in')
186 model_type = nep_info.get('model_type', 0)
188 # set up which files to parse, what dimensions to expect etc
189 # depending on the type of model that is parsed
190 if model_type == 0:
191 charge_mode = int(nep_info.get('charge_mode', 0))
192 if charge_mode not in [0, 1, 2]:
193 raise ValueError(f'Unknown charge_mode: {charge_mode}')
194 # files to parse: (sname, size, mandatory, includes_target, per_atom)
195 files_to_parse = [
196 ('energy', 1, True, True, False),
197 ('force', 3, True, True, True),
198 ('virial', 6, True, True, False),
199 ('stress', 6, True, True, False),
200 ]
201 if charge_mode in [1, 2]:
202 # files to parse: (sname, size, includes_target, per_atom)
203 files_to_parse += [
204 ('charge', 1, True, False, True),
205 ('bec', 9, False, True, True),
206 ]
207 elif model_type == 1:
208 # files to parse: (sname, size, includes_target, per_atom)
209 files_to_parse = [('dipole', 3, True, True, False)]
210 if nep_info.get('atomic_v', 0) == 1:
211 files_to_parse = [('dipole', 3, True, True, True)]
212 elif model_type == 2:
213 # files to parse: (sname, size, includes_target, per_atom)
214 files_to_parse = [('polarizability', 6, True, True, False)]
215 if nep_info.get('atomic_v', 0) == 1:
216 files_to_parse = [('polarizability', 6, True, True, True)]
217 else:
218 raise ValueError(f'Unknown model_type: {model_type}')
220 # read training and test data
221 structures = {}
222 for stype in ['train', 'test']:
223 filename = join_path(dirname, f'{stype}.xyz')
224 try:
225 structures[stype] = read(filename, format='extxyz', index=':')
226 except FileNotFoundError:
227 warn(f'File {filename} not found.')
228 structures[stype] = []
229 continue
231 n_structures = len(structures[stype])
233 # loop over files from which to read target data and predictions
234 for sname, size, mandatory, includes_target, per_atom in files_to_parse:
235 infile = f'{sname}_{stype}.out'
236 path = join_path(dirname, infile)
237 if not exists(path):
238 if mandatory:
239 raise FileNotFoundError(f'File {path} does not exist')
240 else:
241 continue
242 ts, ps = _read_data_file(path, includes_target=includes_target)
244 if ts is not None:
245 if ts.shape[1] != size:
246 raise ValueError(f'Target data in {infile} has unexpected shape:'
247 f' {ts.shape} (expected: (-1, {size}))')
248 if ps.shape[1] != size:
249 raise ValueError(f'Predicted data in {infile} has unexpected shape:'
250 f' {ps.shape} (expected: (-1, {size}))')
252 if per_atom:
253 # data per-atom, e.g., forces, per-atom-virials, Born effective charges ...
254 n_atoms_total = sum([len(s) for s in structures[stype]])
255 if len(ps) != n_atoms_total:
256 raise ValueError(f'Number of atoms in {infile} ({len(ps)})'
257 f' and {stype}.xyz ({n_atoms_total}) inconsistent.')
258 n = 0
259 for structure in structures[stype]:
260 nat = len(structure)
261 if ts is not None:
262 t = np.array(ts[n: n + nat]).reshape(nat, size)
263 structure.new_array(f'{sname}_target', t)
264 p = np.array(ps[n: n + nat]).reshape(nat, size)
265 structure.new_array(f'{sname}_predicted', p)
266 n += nat
267 else:
268 # data per structure, e.g., energy, virials, stress
269 if len(ps) != n_structures:
270 raise ValueError(f'Number of structures in {infile} ({len(ps)})'
271 f' and {stype}.xyz ({n_structures}) inconsistent.')
272 for k, structure in enumerate(structures[stype]):
273 assert ts is not None, 'This should not occur. Please report.'
274 t = ts[k]
275 assert np.shape(t) == (size,)
276 structure.info[f'{sname}_target'] = t
277 p = ps[k]
278 assert np.shape(p) == (size,)
279 structure.info[f'{sname}_predicted'] = p
281 # special handling of target data for BECs
282 # The target data for BECs need not be complete. In this case nep writes
283 # zeros for every component (not optimal). If we encounter such a case we set
284 # all components to nan instead in order to be able to quickly filter for
285 # this case when analyzing data.
286 for s in structures[stype]:
287 if 'bec_target' in s.info and np.allclose(s.info['bec_target'], 0):
288 nat = len(s)
289 size = 9
290 s.info['bec_target'] = np.array(size * nat * [np.nan]).reshape(nat, size)
292 # special handling of per-atom TNEP
293 # Data has to be loaded as dipole/polarizability since NEP saves them in dipole_*.out
294 # Dipole/polarizability arrays are therefore moved to atomic_v here
295 if nep_info.get('atomic_v', 0) == 1:
296 if model_type == 1:
297 s.new_array('atomic_v_target', s.arrays['dipole_target'])
298 s.new_array('atomic_v_predicted', s.arrays['dipole_predicted'])
299 del s.arrays['dipole_target']
300 del s.arrays['dipole_predicted']
301 if model_type == 2:
302 s.new_array('atomic_v_target', s.arrays['polarizability_target'])
303 s.new_array('atomic_v_predicted', s.arrays['polarizability_predicted'])
304 del s.arrays['polarizability_target']
305 del s.arrays['polarizability_predicted']
307 return structures['train'], structures['test']
310def _read_data_file(
311 path: str,
312 includes_target: bool = True,
313):
314 """Private function that parses *.out files and
315 returns their content for further processing.
316 """
317 with open(path, 'r') as f:
318 lines = f.readlines()
319 target, predicted = [], []
320 for line in lines:
321 flds = line.split()
322 if includes_target:
323 if len(flds) % 2 != 0:
324 raise ValueError(f'Incorrect number of columns in {path} ({len(flds)}).')
325 n = len(flds) // 2
326 predicted.append([float(s) for s in flds[:n]])
327 target.append([float(s) for s in flds[n:]])
328 else:
329 predicted.append([float(s) for s in flds])
330 target = None
331 if target is not None:
332 target = np.array(target)
333 predicted = np.array(predicted)
334 return target, predicted
337def get_parity_data(
338 structures: list[Atoms],
339 property: str,
340 selection: list[str] = None,
341 flatten: bool = True,
342) -> DataFrame:
343 """Returns the predicted and target energies, forces, virials or stresses
344 from a list of structures in a format suitable for generating parity plots.
346 The structures should have been read using :func:`read_structures
347 <calorine.nep.read_structures>`, such that the `info` object is
348 populated with keys of the form `<property>_<type>` where `<property>`
349 is, e.g., `energy` or `force` and `<type>` is one of `predicted` or `target`.
351 The resulting parity data is returned as a tuple of dicts, where each entry
352 corresponds to a list.
354 Parameters
355 ----------
356 structures
357 List of structures as read with :func:`read_structures <calorine.nep.read_structures>`.
358 property
359 One of `energy`, `force`, `virial`, `stress`, `bec`, `dipole`,
360 `polarizability`, or `atomic_v`.
361 selection
362 A list containing which components to return, and/or the norm.
363 Possible values are `x`, `y`, `z`, `xx`, `yy`,
364 `zz`, `yz`, `xz`, `xy`, `norm`, `pressure`.
365 flatten
366 if True return flattened lists; this is useful for flattening
367 the components of force or virials into a simple list
368 """
369 voigt_mapping = {
370 'x': 0, 'y': 1, 'z': 2, 'xx': 0, 'yy': 1, 'zz': 2, 'yz': 3, 'xz': 4, 'xy': 5,
371 }
372 global_properties = ['energy', 'virial', 'stress', 'polarizability', 'dipole']
373 per_atom_properties = ['force', 'bec', 'atomic_v']
374 if property not in global_properties + per_atom_properties:
375 raise ValueError(
376 '`property` must be one of the following:'
377 + ', '.join(global_properties + per_atom_properties)
378 )
379 if property in ['energy'] and selection:
380 raise ValueError('Selection cannot be applied to scalars.')
381 if property != 'stress' and selection and 'pressure' in selection:
382 raise ValueError(f'Cannot calculate pressure for `{property}`.')
384 data = {'predicted': [], 'target': []}
385 if property in ['force', 'bec'] and flatten:
386 size = 3 if property == 'force' else 9
387 data['species'] = []
388 for structure in structures:
389 if 'species' in data:
390 data['species'].extend(np.repeat(structure.symbols, size).tolist())
391 for stype in ['predicted', 'target']:
392 property_with_stype = f'{property}_{stype}'
393 if property in global_properties:
394 if property_with_stype not in structure.info.keys():
395 raise KeyError(f'{property_with_stype} not'
396 ' available in info field of structure')
397 extracted_property = np.array(structure.info[property_with_stype])
398 else:
399 if property_with_stype not in structure.arrays:
400 raise KeyError(f'{property_with_stype} not available in arrays of structure')
401 extracted_property = np.array(structure.arrays[property_with_stype])
403 if selection is None or len(selection) == 0:
404 data[stype].append(extracted_property)
405 continue
407 selected_values = []
408 for select in selection:
409 if property in ['force', 'bec', 'atomic_v']:
410 # flip to get (n_components, n_structures)
411 extracted_property = extracted_property.T
412 if select == 'norm':
413 if property in ['force', 'atomic_v']:
414 selected_values.append(np.linalg.norm(extracted_property, axis=0))
415 elif property in ['virial', 'stress']:
416 full_tensor = voigt_6_to_full_3x3_stress(extracted_property)
417 selected_values.append(np.linalg.norm(full_tensor))
418 elif property in ['dipole']:
419 selected_values.append(np.linalg.norm(extracted_property))
420 else:
421 raise ValueError(
422 f'Cannot handle selection=`norm` with property=`{property}`.')
423 continue
425 if select == 'pressure' and property == 'stress':
426 total_stress = extracted_property
427 selected_values.append(-np.sum(total_stress[:3]) / 3)
428 continue
430 if select not in voigt_mapping:
431 raise ValueError(f'Selection `{select}` is not allowed.')
432 index = voigt_mapping[select]
433 if index >= extracted_property.shape[0]:
434 raise ValueError(
435 f'Selection `{select}` is not compatible with property `{property}`.'
436 )
437 selected_values.append(extracted_property[index])
439 data[stype].append(selected_values)
440 if flatten:
441 for stype in ['target', 'predicted']:
442 value = data[stype]
443 if len(np.shape(value[0])) > 0:
444 data[stype] = np.concatenate(value).ravel().tolist()
445 if property in ['force', 'atomic_v']:
446 n = len(data['target']) // 3
447 data['component'] = ['x', 'y', 'z'] * n
448 elif property in ['virial', 'stress']:
449 n = len(data['target']) // 6
450 data['component'] = ['xx', 'yy', 'zz', 'yz', 'xz', 'xy'] * n
451 elif property in ['bec']:
452 n = len(data['target']) // 9
453 data['component'] = ['xx', 'xy', 'xz', 'yx', 'yy', 'yz', 'zx', 'zy', 'zz'] * n
454 df = DataFrame(data)
455 # In case of flatten, cast to float64 for compatibility
456 # with e.g. seaborn.
457 # Casting in this way breaks tensorial properties though,
458 # so skip it there.
459 if flatten:
460 df['target'] = df.target.astype('float64')
461 df['predicted'] = df.predicted.astype('float64')
462 return df