Coverage for calorine/nep/io.py: 100%
225 statements
« prev ^ index » next coverage.py v7.6.4, created at 2024-12-10 08:26 +0000
« prev ^ index » next coverage.py v7.6.4, created at 2024-12-10 08:26 +0000
1from os.path import exists
2from os.path import join as join_path
3from typing import Any, Dict, Iterable, List, NamedTuple, TextIO, Tuple
4from warnings import warn
6import numpy as np
7from ase import Atoms
8from ase.io import read, write
9from ase.stress import voigt_6_to_full_3x3_stress
10from ase.units import GPa
11from pandas import DataFrame
14def read_loss(filename: str) -> DataFrame:
15 """Parses a file in ``loss.out`` format from GPUMD and returns the
16 content as a data frame. More information concerning file format,
17 content and units can be found `here
18 <https://gpumd.org/nep/output_files/loss_out.html>`__.
20 Parameters
21 ----------
22 filename
23 input file name
25 """
26 data = np.loadtxt(filename)
27 if isinstance(data[0], np.float64):
28 # If only a single row in loss.out, append a dimension
29 data = data.reshape(1, -1)
30 if len(data[0]) == 6:
31 tags = 'total_loss L1 L2'
32 tags += ' RMSE_P_train'
33 tags += ' RMSE_P_test'
34 elif len(data[0]) == 10:
35 tags = 'total_loss L1 L2'
36 tags += ' RMSE_E_train RMSE_F_train RMSE_V_train'
37 tags += ' RMSE_E_test RMSE_F_test RMSE_V_test'
38 else:
39 raise ValueError(
40 f'Input file contains {len(data[0])} data columns. Expected 6 or 10 columns.'
41 )
42 generations = range(100, len(data) * 100 + 1, 100)
43 df = DataFrame(data=data[:, 1:], columns=tags.split(), index=generations)
44 return df
47def _write_structure_in_nep_format(structure: Atoms, f: TextIO) -> None:
48 """Write structure block into a file-like object in format readable by nep executable.
50 Parameters
51 ----------
52 structure
53 input structure; must hold information regarding energy and forces
54 f
55 file-like object to which to write
56 """
58 # Allowed keyword=value pairs. Use ASEs extyz write functionality.:
59 # lattice="ax ay az bx by bz cx cy cz" (mandatory)
60 # energy=energy_value (mandatory)
61 # virial="vxx vxy vxz vyx vyy vyz vzx vzy vzz" (optional)
62 # weight=relative_weight (optional)
63 # properties=property_name:data_type:number_of_columns
64 # species:S:1 (mandatory)
65 # pos:R:3 (mandatory)
66 # force:R:3 or forces:R:3 (mandatory)
67 try:
68 structure.get_potential_energy()
69 structure.get_forces() # calculate forces to have them on the Atoms object
70 except RuntimeError:
71 raise RuntimeError('Failed to retrieve energy and/or forces for structure')
72 if np.isclose(structure.get_volume(), 0):
73 raise ValueError('Structure cell must have a non-zero volume!')
74 try:
75 structure.get_stress()
76 except RuntimeError:
77 warn('Failed to retrieve stresses for structure')
78 write(filename=f, images=structure, write_info=True, format='extxyz')
81def write_structures(outfile: str, structures: List[Atoms]) -> None:
82 """Writes structures for training/testing in format readable by nep executable.
84 Parameters
85 ----------
86 outfile
87 output filename
88 structures
89 list of structures with energy, forces, and (possibly) stresses
90 """
91 with open(outfile, 'w') as f:
92 for structure in structures:
93 _write_structure_in_nep_format(structure, f)
96def write_nepfile(parameters: NamedTuple, dirname: str) -> None:
97 """Writes parameters file for NEP construction.
99 Parameters
100 ----------
101 parameters
102 input parameters; see `here <https://gpumd.org/nep/input_parameters/index.html>`__
103 dirname
104 directory in which to place input file and links
105 """
106 with open(join_path(dirname, 'nep.in'), 'w') as f:
107 for key, val in parameters.items():
108 f.write(f'{key} ')
109 if isinstance(val, Iterable):
110 f.write(' '.join([f'{v}' for v in val]))
111 else:
112 f.write(f'{val}')
113 f.write('\n')
116def read_nepfile(filename: str) -> Dict[str, Any]:
117 """Returns the content of a configuration file (`nep.in`) as a dictionary.
119 Parameters
120 ----------
121 filename
122 input file name
123 """
124 settings = {}
125 with open(filename) as f:
126 for line in f.readlines():
127 flds = line.split()
128 if len(flds) == 0:
129 continue
130 if flds[0].startswith('#'):
131 continue
132 settings[flds[0]] = ' '.join(flds[1:])
133 for key, val in settings.items():
134 if key in ['version', 'neuron', 'generation', 'batch', 'population', 'mode', 'model_type']:
135 settings[key] = int(val)
136 elif key in [
137 'lambda_1',
138 'lambda_2',
139 'lambda_e',
140 'lambda_f',
141 'lambda_v',
142 'lambda_shear',
143 'force_delta',
144 ]:
145 settings[key] = float(val)
146 elif key in ['cutoff', 'n_max', 'l_max', 'basis_size', 'zbl', 'type_weight']:
147 settings[key] = [float(v) for v in val.split()]
148 elif key == 'type':
149 types = val.split()
150 types[0] = int(types[0])
151 settings[key] = types
152 return settings
155def read_structures(dirname: str) -> Tuple[List[Atoms], List[Atoms]]:
156 """Parses the ``energy_*.out``, ``force_*.out``, ``virial_*.out``,
157 ``polarizability_*.out`` and ``dipole_*.out`` files from a nep run and
158 returns their content as lists. The first and second list contain the structures
159 from the training and test sets, respectively. Each list entry corresponds to an ASE
160 Atoms object, which in turn contains predicted and target energies, forces and virials/stresses
161 or polarizability/diople stored in the `info` property.
163 Parameters
164 ----------
165 dirname
166 directory from which to read output files
168 """
169 path = join_path(dirname)
170 if not exists(path):
171 raise ValueError(f'Directory {path} does not exist')
172 nep_info = read_nepfile(f'{path}/nep.in')
173 if 'mode' in nep_info or 'model_type' in nep_info:
174 ftype = nep_info.get('mode', nep_info.get('model_type'))
175 if ftype == 2 or ftype == 1:
176 return _read_structures_tensors(dirname, ftype)
177 return _read_structures_potential(dirname)
180def _read_structures_tensors(dirname: str, ftype: int) \
181 -> Tuple[List[Atoms], List[Atoms]]:
182 """Parses the ``polarizability_*.out`` and ``dipole_*.out``
183 files from a nep run and returns their content as lists.
184 The first and second list contain the structures from the training and
185 test sets, respectively. Each list entry corresponds to an ASE
186 Atoms object, which in turn contains predicted and target
187 dipole or polarizability stored in the `info`
188 property.
190 Parameters
191 ----------
192 dirname
193 directory from which to read output files
195 """
196 path = join_path(dirname)
197 structures = {}
199 if ftype == 1:
200 sname = 'dipole'
201 else:
202 sname = 'polarizability'
204 for stype in ['train', 'test']:
205 filename = join_path(dirname, f'{stype}.xyz')
206 try:
207 structures[stype] = read(filename, format='extxyz', index=':')
208 except FileNotFoundError:
209 warn(f'File {filename} not found.')
210 structures[stype] = []
211 continue
213 n_structures = len(structures[stype])
214 ts, ps = _read_data_file(path, f'{sname}_{stype}.out')
215 if ftype == 1:
216 ts = np.array(ts).reshape((-1, 3))
217 ps = np.array(ps).reshape((-1, 3))
218 else:
219 ts = np.array(ts).reshape((-1, 6))
220 ps = np.array(ps).reshape((-1, 6))
221 assert len(ts) == n_structures, \
222 f'Number of structures in {sname}_{stype}.out ({len(ts)})' \
223 f' and {stype}.xyz ({n_structures}) inconsistent'
224 for structure, t, p in zip(structures[stype], ts, ps):
225 if ftype == 1:
226 assert np.shape(t) == (3,)
227 assert np.shape(p) == (3,)
228 else:
229 assert np.shape(t) == (6,)
230 assert np.shape(p) == (6,)
231 structure.info[f'{sname}_target'] = t
232 structure.info[f'{sname}_predicted'] = p
234 return structures['train'], structures['test']
237def _read_structures_potential(dirname: str) -> Tuple[List[Atoms], List[Atoms]]:
238 """Parses the ``energy_*.out``, ``force_*.out``, ``virial_*.out``
239 files from a nep run and returns their content as lists.
240 The first and second list contain the structures from the training and
241 test sets, respectively. Each list entry corresponds to an ASE
242 Atoms object, which in turn contains predicted and target
243 energies, forces and virials/stresses stored in the `info`
244 property.
246 Parameters
247 ----------
248 dirname
249 directory from which to read output files
251 """
252 path = join_path(dirname)
253 structures = {}
255 for stype in ['train', 'test']:
256 file_path = join_path(dirname, f'{stype}.xyz')
257 if not exists(file_path):
258 warn(f'File {file_path} not found.')
259 structures[stype] = []
260 continue
262 structures[stype] = read(
263 file_path, format='extxyz', index=':'
264 )
266 ts, ps = _read_data_file(path, f'energy_{stype}.out')
267 n_structures = len(structures[stype])
268 assert len(ts) == n_structures, (
269 f'Number of structures in energy_{stype}.out ({len(ts)})'
270 f' and {stype}.xyz ({n_structures}) inconsistent'
271 )
272 for structure, t, p in zip(structures[stype], ts, ps):
273 structure.info['energy_target'] = t
274 structure.info['energy_predicted'] = p
276 ts, ps = _read_data_file(path, f'force_{stype}.out')
277 n_atoms_total = sum([len(s) for s in structures[stype]])
278 assert len(ts) == n_atoms_total, (
279 f'Number of structures in force_{stype}.out ({len(ts)})'
280 f' and {stype}.xyz ({n_structures}) inconsistent'
281 )
282 n = 0
283 for structure in structures[stype]:
284 nat = len(structure)
285 structure.info['force_target'] = np.array(ts[n: n + nat]).reshape(nat, 3)
286 structure.info['force_predicted'] = np.array(ps[n: n + nat]).reshape(
287 nat, 3
288 )
289 n += nat
291 ts, ps = _read_data_file(path, f'virial_{stype}.out')
292 ts, ps = np.array(ts), np.array(ps)
293 N = len(structures[stype])
294 if ts.shape == (6*N,):
295 # GPUMD <=v3.6 style virial_*.out
296 # First column are NEP predictions, second are targets
297 # Order: First N values are xx, second are yy etc.
298 ts = np.array(ts).reshape((6, -1)).T # GPUMD 3.6 compatibility
299 ps = np.array(ps).reshape((6, -1)).T
300 elif ts.shape == (N, 6):
301 # GPUMD >=v3.7 style virial_*.out
302 # First 6 columns are NEP predictions, last 6 are targets
303 # Order: xx, yy, zz, xy, yz, zx
304 pass
305 else:
306 raise ValueError(f'virial_*.out has invalid shape, {ts.shape}')
308 assert len(ts) == n_structures, \
309 f'Number of structures in virial_{stype}.out ({len(ts)})' \
310 f' and {stype}.xyz ({n_structures}) inconsistent'
311 for structure, t, p in zip(structures[stype], ts, ps):
312 assert np.shape(t) == (6,)
313 structure.info['virial_target'] = t
314 structure.info['virial_predicted'] = p
315 conv = len(structure) / structure.get_volume() / GPa
316 structure.info['stress_target'] = t * conv
317 structure.info['stress_predicted'] = p * conv
319 return structures['train'], structures['test']
322def _read_data_file(dirname: str, fname: str):
323 """Private function that parses energy/force/virial_*.out files and
324 returns their content for further processing.
325 """
326 path = join_path(dirname, fname)
327 if not exists(path):
328 raise ValueError(f'Directory {path} does not exist')
329 with open(path, 'r') as f:
330 lines = f.readlines()
331 target, predicted = [], []
332 for line in lines:
333 flds = line.split()
334 if len(flds) == 12: # Virial after GPUMD 3.7
335 predicted.append([float(s) for s in flds[0:6]])
336 target.append([float(s) for s in flds[6:12]])
337 elif len(flds) == 6: # Force
338 predicted.append([float(s) for s in flds[0:3]])
339 target.append([float(s) for s in flds[3:6]])
340 elif len(flds) == 2: # Energy, virial before GPUMD 3.7
341 predicted.append(float(flds[0]))
342 target.append(float(flds[1]))
343 else:
344 raise ValueError(f'Malformed file: {path}')
345 return target, predicted
348def get_parity_data(
349 structures: List[Atoms],
350 property: str,
351 selection: List[str] = None,
352 flatten: bool = True,
353) -> DataFrame:
354 """Returns the predicted and target energies, forces, virials or stresses
355 from a list of structures in a format suitable for generating parity plots.
357 The structures should have been read using :func:`read_structures
358 <calorine.nep.read_structures>`, such that the ``info``-object is
359 populated with keys on the form ``<property>_<type>`` where ``<property>``
360 is one of ``energy``, ``force``, ``virial``, and stress, and ``<type>`` is one
361 of ``predicted`` or ``target``.
363 The resulting parity data is returned as a tuple of dicts, where each entry
364 corresponds to a list.
366 Parameters
367 ----------
368 structures
369 List of structures as read with :func:`read_structures <calorine.nep.read_structures>`.
370 property
371 One of ``energy``, ``force``, ``virial``, ``stress``, ``polarizability``, ``dipole``.
372 selection
373 A list containing which components to return, and/or the absolute value.
374 Possible values are ``x``, ``y``, ``z``, ``xx``, ``yy``,
375 ``zz``, ``yz``, ``xz``, ``xy``, ``abs``, ``pressure``.
376 flatten
377 if True return flattened lists; this is useful for flattening
378 the components of force or virials into a simple list
379 """
380 data = {'predicted': [], 'target': []}
381 voigt_mapping = {
382 'x': 0,
383 'y': 1,
384 'z': 2,
385 'xx': 0,
386 'yy': 1,
387 'zz': 2,
388 'yz': 3,
389 'xz': 4,
390 'xy': 5,
391 }
392 if property not in ('energy', 'force', 'virial', 'stress', 'polarizability', 'dipole'):
393 raise ValueError(
394 "`property` must be one of 'energy', 'force', 'virial', 'stress',"
395 " 'polarizability', 'dipole'."
396 )
397 if property == 'energy' and selection:
398 raise ValueError('Selection does nothing for scalar-valued `energy`.')
399 if property != 'stress' and selection and 'pressure' in selection:
400 raise ValueError(f'Cannot calculate pressure for `{property}`.')
401 for structure in structures:
402 for stype in data:
403 property_with_stype = f'{property}_{stype}'
404 if property_with_stype not in structure.info.keys():
405 raise KeyError(f'{property_with_stype} does not exist in info object!')
406 extracted_property = np.array(structure.info[property_with_stype])
408 if selection is None or len(selection) == 0:
409 data[stype].append(extracted_property)
410 continue
412 selected_values = []
413 for select in selection:
414 if property == 'force':
415 # flip to get (n_components, n_structures)
416 extracted_property = extracted_property.T
417 if select == 'abs':
418 if property == 'force':
419 selected_values.append(np.linalg.norm(extracted_property, axis=0))
420 else:
421 # property can only be in ('virial', 'stress')
422 full_tensor = voigt_6_to_full_3x3_stress(extracted_property)
423 selected_values.append(np.linalg.norm(full_tensor))
424 continue
426 if select == 'pressure' and property == 'stress':
427 total_stress = extracted_property
428 selected_values.append(-np.sum(total_stress[:3]) / 3)
429 continue
431 if select not in voigt_mapping:
432 raise ValueError(f'Selection `{select}` is not allowed.')
433 index = voigt_mapping[select]
434 if index >= extracted_property.shape[0]:
435 raise ValueError(
436 f'Selection `{select}` is not compatible with property `{property}`.'
437 )
438 selected_values.append(extracted_property[index])
439 data[stype].append(selected_values)
440 if flatten:
441 for key, value in data.items():
442 if len(np.shape(value[0])) > 0:
443 data[key] = np.concatenate(value).ravel().tolist()
444 df = DataFrame(data)
445 # In case of flatten, cast to float64 for compatibility
446 # with e.g. seaborn.
447 # Casting in this way breaks tensorial properties though,
448 # so skip it there.
449 if flatten:
450 df['target'] = df.target.astype('float64')
451 df['predicted'] = df.predicted.astype('float64')
452 return df