Coverage for calorine / calculators / gpunep.py: 100%
143 statements
« prev ^ index » next coverage.py v7.13.2, created at 2026-02-26 13:49 +0000
« prev ^ index » next coverage.py v7.13.2, created at 2026-02-26 13:49 +0000
1import os
2import shutil
3import warnings
4import tempfile
5from collections.abc import Iterable
6from typing import Any, List, Tuple, Union
8import numpy as np
9from ase import Atoms
10from ase.calculators.calculator import FileIOCalculator, OldShellProfile, all_changes
11from ase.io import read as ase_read
12from ase.units import GPa
14from calorine.nep.model import _get_nep_contents
15from ..gpumd import write_xyz
18class GPUMDShellProfile(OldShellProfile):
19 """This class provides an ASE calculator for NEP calculations with
20 GPUMD.
22 Parameters
23 ----------
24 command : str
25 Command to run GPUMD with.
26 Default: ``gpumd``
27 gpu_identifier_index : int, None
28 Index that identifies the GPU that GPUNEP should be run with.
29 Typically, NVIDIA GPUs are enumerated with integer indices.
30 See https://docs.nvidia.com/cuda/cuda-c-programming-guide/index.html#env-vars.
31 Set to None in order to use all available GPUs. Note that GPUMD exit with an error
32 when running with more than one GPU if your system is not large enough.
33 Default: None
34 """
35 def __init__(self, command : str, gpu_identifier_index: Union[int, None]):
36 if gpu_identifier_index is not None:
37 # Do not set a specific device to use = use all available GPUs
38 self.cuda_environment_variables = f'CUDA_VISIBLE_DEVICES={gpu_identifier_index}'
39 command_with_gpus = f'export {self.cuda_environment_variables} && ' + command
40 else:
41 command_with_gpus = command
42 super().__init__(command_with_gpus)
45class GPUNEP(FileIOCalculator):
46 """This class provides an ASE calculator for NEP calculations with
47 GPUMD.
49 This calculator writes files that are input to the `gpumd`
50 executable. It is thus likely to be slow if many calculations
51 are to be performed.
53 Parameters
54 ----------
55 model_filename : str
56 Path to file in ``nep.txt`` format with model parameters.
57 directory : str
58 Directory to run GPUMD in. If None, a temporary directory
59 will be created and removed once the calculations are finished.
60 If specified, the directory will not be deleted. In the latter
61 case, it is advisable to do no more than one calculation with
62 this calculator (unless you know exactly what you are doing).
63 label : str
64 Label for this calculator.
65 atoms : Atoms
66 Atoms to attach to this calculator.
67 command : str
68 Command to run GPUMD with.
69 Default: ``gpumd``
70 gpu_identifier_index : int
71 Index that identifies the GPU that GPUNEP should be run with.
72 Typically, NVIDIA GPUs are enumerated with integer indices.
73 See https://docs.nvidia.com/cuda/cuda-c-programming-guide/index.html#env-vars.
74 Set to None in order to use all available GPUs. Note that GPUMD exit with an error
75 when running with more than one GPU if your system is not large enough.
76 Default: 0
79 Example
80 -------
82 >>> calc = GPUNEP('nep.txt')
83 >>> atoms.calc = calc
84 >>> atoms.get_potential_energy()
85 """
87 command = 'gpumd'
88 base_implemented_properties = ['energy', 'forces', 'stress']
89 discard_results_on_any_change = True
91 # We use list of tuples to define parameters for
92 # MD simulations. Looks like a dictionary, but sometimes
93 # we want to repeat the same keyword.
94 base_single_point_parameters = [('dump_thermo', 1),
95 ('dump_force', 1),
96 ('dump_position', 1),
97 ('velocity', 1e-24),
98 ('time_step', 1e-6), # 1 zeptosecond
99 ('ensemble', 'nve'),
100 ('run', 1)]
102 def __init__(self,
103 model_filename: str,
104 directory: str = None,
105 label: str = 'GPUNEP',
106 atoms: Atoms = None,
107 command: str = command,
108 gpu_identifier_index: Union[int, None] = 0
109 ):
110 if not os.path.exists(model_filename):
111 raise FileNotFoundError(f'{model_filename} does not exist.')
112 self.model_filename = str(model_filename)
114 # Get model type from first row in nep.txt
115 header, _ = _get_nep_contents(self.model_filename)
116 self.model_type = header['model_type']
117 self.supported_species = set(header['types'])
118 self.nep_version = header['version']
119 self.model_filename = model_filename
121 self.implemented_properties = list(self.base_implemented_properties)
122 self.single_point_parameters = self.base_single_point_parameters
123 if 'charge' in self.model_type:
124 # Only available for charge models
125 self.implemented_properties.extend(
126 ['charges', 'born_effective_charges'])
127 qnep_parameters = [('dump_xyz', (-1, 1, 1, 'charges_and_bec.xyz', 'charge', 'bec'))]
128 self.single_point_parameters = qnep_parameters + self.base_single_point_parameters
130 # Determine run command
131 # Determine whether to save stdout or not
132 if directory is None and '>' not in command:
133 # No need to save stdout if we run in temporary directory
134 command += ' > /dev/null'
135 elif '>' not in command:
136 command += ' > stdout'
137 self.command = command
139 # Determine directory to run in
140 self._use_temporary_directory = directory is None
141 self._directory = directory
142 if self._use_temporary_directory:
143 self._make_new_tmp_directory()
144 else:
145 self._potential_path = os.path.relpath(
146 os.path.abspath(self.model_filename), self._directory)
148 # Override the profile in ~/.config/ase/config.ini.
149 # See https://wiki.fysik.dtu.dk/ase/ase/
150 # calculators/calculators.html#calculator-configuration
151 profile = GPUMDShellProfile(command, gpu_identifier_index)
152 FileIOCalculator.__init__(self,
153 directory=self._directory,
154 label=label,
155 atoms=atoms,
156 profile=profile)
158 def run_custom_md(
159 self,
160 parameters: List[Tuple[str, Any]],
161 return_last_atoms: bool = False,
162 only_prepare: bool = False,
163 ):
164 """
165 Run a custom MD simulation.
167 Parameters
168 ----------
169 parameters
170 Parameters to be specified in the run.in file.
171 The potential keyword is set automatically, all other
172 keywords need to be set via this argument.
173 Example::
175 [('dump_thermo', 100),
176 ('dump_position', 1000),
177 ('velocity', 300),
178 ('time_step', 1),
179 ('ensemble', ['nvt_ber', 300, 300, 100]),
180 ('run', 10000)]
182 return_last_atoms
183 If ``True`` the last saved snapshot will be returned.
184 only_prepare
185 If ``True`` the necessary input files will be written
186 but the MD run will not be executed.
188 Returns
189 -------
190 The last snapshot if :attr:`return_last_atoms` is ``True``.
191 """
192 if self._use_temporary_directory:
193 self._make_new_tmp_directory()
195 if self._use_temporary_directory and not return_last_atoms:
196 raise ValueError('Refusing to run in temporary directory '
197 'and not returning atoms; all results will be gone.')
199 if self._use_temporary_directory and only_prepare:
200 raise ValueError('Refusing to only prepare in temporary directory, '
201 'all files will be removed.')
203 # Write files and run
204 FileIOCalculator.write_input(self, self.atoms)
205 self._write_runfile(parameters)
206 write_xyz(filename=os.path.join(self._directory, 'model.xyz'),
207 structure=self.atoms)
209 if only_prepare:
210 return None
212 # Execute the calculation.
213 self.execute()
215 # Extract last snapshot if needed
216 if return_last_atoms:
217 last_atoms = ase_read(os.path.join(self._directory, 'movie.xyz'),
218 format='extxyz', index=-1)
220 if self._use_temporary_directory:
221 self._clean()
223 if return_last_atoms:
224 return last_atoms
225 else:
226 return None
228 def write_input(self, atoms, properties=None, system_changes=None):
229 """
230 Write the input files necessary for a single-point calculation.
231 """
232 if self._use_temporary_directory:
233 self._make_new_tmp_directory()
234 FileIOCalculator.write_input(self, atoms, properties, system_changes)
235 self._write_runfile(parameters=self.single_point_parameters)
236 write_xyz(filename=os.path.join(self._directory, 'model.xyz'),
237 structure=atoms)
239 def _write_runfile(self, parameters):
240 """Write run.in file to define input parameters for MD simulation.
242 Parameters
243 ----------
244 parameters : dict
245 Defines all key-value pairs used in run.in file
246 (see GPUMD documentation for a complete list).
247 Values can be either floats, integers, or lists/tuples.
248 """
249 if len(os.listdir(self._directory)) > 0:
250 warnings.warn(f'{self._directory} is not empty.')
252 with open(os.path.join(self._directory, 'run.in'), 'w') as f:
253 # Custom potential is allowed but normally it can be deduced
254 if 'potential' not in [keyval[0] for keyval in parameters]:
255 f.write(f'potential {self._potential_path} \n')
256 # Write all keywords with parameter(s)
257 for key, val in parameters:
258 f.write(f'{key} ')
259 if isinstance(val, Iterable) and not isinstance(val, str):
260 for v in val:
261 f.write(f'{v} ')
262 else:
263 f.write(f'{val}')
264 f.write('\n')
266 def get_potential_energy_and_stresses_from_file(self):
267 """
268 Extract potential energy (third column of last line in thermo.out) and stresses
269 from thermo.out
270 """
271 data = np.loadtxt(os.path.join(self._directory, 'thermo.out'))
272 if len(data.shape) == 1:
273 line = data
274 else:
275 line = data[-1, :]
277 # Energy
278 energy = line[2]
280 # Stress
281 stress = [v for v in line[3:9]]
282 stress = -GPa * np.array(stress) # to eV/A^3
284 if np.any(np.isnan(stress)) or np.isnan(energy):
285 raise ValueError(f'Failed to extract energy and/or stresses:\n {line}')
286 return energy, stress
288 def _read_potential_energy_and_stresses(self):
289 """Reads potential energy and stresses."""
290 self.results['energy'], self.results['stress'] = \
291 self.get_potential_energy_and_stresses_from_file()
293 def get_forces_from_file(self):
294 """
295 Extract forces (in eV/A) from last snapshot in force.out
296 """
297 data = np.loadtxt(os.path.join(self._directory, 'force.out'))
298 return data[-len(self.atoms):, :]
300 def _read_forces(self):
301 """Reads forces (the last snapshot in force.out) in eV/A"""
302 self.results['forces'] = self.get_forces_from_file()
304 def get_charges_and_becs_from_file(self):
305 """Extract charges and Born Effective Charges from last
306 snapshot in `charges_and_bec.xyz`"""
307 structure = ase_read(os.path.join(self._directory, 'charges_and_bec.xyz'), '-1')
308 charges = structure.get_charges()
309 becs = structure.get_array('bec')
310 return charges, becs
312 def _read_charges_and_becs(self):
313 """Reads charges and Born Effective Charges from file."""
314 charges, becs = self.get_charges_and_becs_from_file()
315 self.results['charges'] = charges
316 self.results['born_effective_charges'] = becs
318 def read_results(self):
319 """
320 Read results from last step of MD calculation.
321 """
322 self._read_potential_energy_and_stresses()
323 self._read_forces()
325 if 'charge' in self.model_type:
326 self._read_charges_and_becs()
327 if self._use_temporary_directory:
328 self._clean()
330 def _clean(self):
331 """
332 Remove directory with calculations.
333 """
334 shutil.rmtree(self._directory)
336 def _make_new_tmp_directory(self):
337 """
338 Create a new temporary directory.
339 """
340 # We do not need to create a new temporary directory
341 # if the current one is empty
342 if self._directory is None or \
343 (os.path.isdir(self._directory) and len(os.listdir(self._directory)) > 0):
344 self._directory = tempfile.mkdtemp()
345 self._potential_path = os.path.relpath(os.path.abspath(self.model_filename),
346 self._directory)
348 def set_atoms(self, atoms):
349 """
350 Set Atoms object.
351 Used also when attaching calculator to Atoms object.
352 """
353 self.atoms = atoms
354 self.results = {}
356 def set_directory(self, directory):
357 """
358 Set path to a new directory. This makes it possible to run
359 several calculations with the same calculator while saving
360 all results
361 """
362 self._directory = directory
363 self._use_temporary_directory = False
364 self._potential_path = os.path.relpath(os.path.abspath(self.model_filename),
365 self._directory)
367 def get_born_effective_charges(
368 self,
369 atoms: Atoms = None,
370 properties: List[str] = None,
371 system_changes: List[str] = all_changes,
372 ) -> np.ndarray:
373 """Calculates (if needed) and returns the Born effective charges.
374 Note that this requires a qNEP model.
376 Parameters
377 ----------
378 atoms
379 System for which to calculate properties, by default `None`.
380 properties
381 Properties to calculate, by default `None`.
382 system_changes
383 Changes to the system since last call, by default all_changes.
384 """
385 if 'born_effective_charges' not in self.implemented_properties:
386 raise ValueError(
387 'This model does not support the calculation of Born effective charges.')
388 self.calculate(atoms, properties, system_changes)
389 return self.results['born_effective_charges']