Coverage for calorine / calculators / gpunep.py: 100%

143 statements  

« prev     ^ index     » next       coverage.py v7.13.2, created at 2026-02-26 13:49 +0000

1import os 

2import shutil 

3import warnings 

4import tempfile 

5from collections.abc import Iterable 

6from typing import Any, List, Tuple, Union 

7 

8import numpy as np 

9from ase import Atoms 

10from ase.calculators.calculator import FileIOCalculator, OldShellProfile, all_changes 

11from ase.io import read as ase_read 

12from ase.units import GPa 

13 

14from calorine.nep.model import _get_nep_contents 

15from ..gpumd import write_xyz 

16 

17 

18class GPUMDShellProfile(OldShellProfile): 

19 """This class provides an ASE calculator for NEP calculations with 

20 GPUMD. 

21 

22 Parameters 

23 ---------- 

24 command : str 

25 Command to run GPUMD with. 

26 Default: ``gpumd`` 

27 gpu_identifier_index : int, None 

28 Index that identifies the GPU that GPUNEP should be run with. 

29 Typically, NVIDIA GPUs are enumerated with integer indices. 

30 See https://docs.nvidia.com/cuda/cuda-c-programming-guide/index.html#env-vars. 

31 Set to None in order to use all available GPUs. Note that GPUMD exit with an error 

32 when running with more than one GPU if your system is not large enough. 

33 Default: None 

34 """ 

35 def __init__(self, command : str, gpu_identifier_index: Union[int, None]): 

36 if gpu_identifier_index is not None: 

37 # Do not set a specific device to use = use all available GPUs 

38 self.cuda_environment_variables = f'CUDA_VISIBLE_DEVICES={gpu_identifier_index}' 

39 command_with_gpus = f'export {self.cuda_environment_variables} && ' + command 

40 else: 

41 command_with_gpus = command 

42 super().__init__(command_with_gpus) 

43 

44 

45class GPUNEP(FileIOCalculator): 

46 """This class provides an ASE calculator for NEP calculations with 

47 GPUMD. 

48 

49 This calculator writes files that are input to the `gpumd` 

50 executable. It is thus likely to be slow if many calculations 

51 are to be performed. 

52 

53 Parameters 

54 ---------- 

55 model_filename : str 

56 Path to file in ``nep.txt`` format with model parameters. 

57 directory : str 

58 Directory to run GPUMD in. If None, a temporary directory 

59 will be created and removed once the calculations are finished. 

60 If specified, the directory will not be deleted. In the latter 

61 case, it is advisable to do no more than one calculation with 

62 this calculator (unless you know exactly what you are doing). 

63 label : str 

64 Label for this calculator. 

65 atoms : Atoms 

66 Atoms to attach to this calculator. 

67 command : str 

68 Command to run GPUMD with. 

69 Default: ``gpumd`` 

70 gpu_identifier_index : int 

71 Index that identifies the GPU that GPUNEP should be run with. 

72 Typically, NVIDIA GPUs are enumerated with integer indices. 

73 See https://docs.nvidia.com/cuda/cuda-c-programming-guide/index.html#env-vars. 

74 Set to None in order to use all available GPUs. Note that GPUMD exit with an error 

75 when running with more than one GPU if your system is not large enough. 

76 Default: 0 

77 

78 

79 Example 

80 ------- 

81 

82 >>> calc = GPUNEP('nep.txt') 

83 >>> atoms.calc = calc 

84 >>> atoms.get_potential_energy() 

85 """ 

86 

87 command = 'gpumd' 

88 base_implemented_properties = ['energy', 'forces', 'stress'] 

89 discard_results_on_any_change = True 

90 

91 # We use list of tuples to define parameters for 

92 # MD simulations. Looks like a dictionary, but sometimes 

93 # we want to repeat the same keyword. 

94 base_single_point_parameters = [('dump_thermo', 1), 

95 ('dump_force', 1), 

96 ('dump_position', 1), 

97 ('velocity', 1e-24), 

98 ('time_step', 1e-6), # 1 zeptosecond 

99 ('ensemble', 'nve'), 

100 ('run', 1)] 

101 

102 def __init__(self, 

103 model_filename: str, 

104 directory: str = None, 

105 label: str = 'GPUNEP', 

106 atoms: Atoms = None, 

107 command: str = command, 

108 gpu_identifier_index: Union[int, None] = 0 

109 ): 

110 if not os.path.exists(model_filename): 

111 raise FileNotFoundError(f'{model_filename} does not exist.') 

112 self.model_filename = str(model_filename) 

113 

114 # Get model type from first row in nep.txt 

115 header, _ = _get_nep_contents(self.model_filename) 

116 self.model_type = header['model_type'] 

117 self.supported_species = set(header['types']) 

118 self.nep_version = header['version'] 

119 self.model_filename = model_filename 

120 

121 self.implemented_properties = list(self.base_implemented_properties) 

122 self.single_point_parameters = self.base_single_point_parameters 

123 if 'charge' in self.model_type: 

124 # Only available for charge models 

125 self.implemented_properties.extend( 

126 ['charges', 'born_effective_charges']) 

127 qnep_parameters = [('dump_xyz', (-1, 1, 1, 'charges_and_bec.xyz', 'charge', 'bec'))] 

128 self.single_point_parameters = qnep_parameters + self.base_single_point_parameters 

129 

130 # Determine run command 

131 # Determine whether to save stdout or not 

132 if directory is None and '>' not in command: 

133 # No need to save stdout if we run in temporary directory 

134 command += ' > /dev/null' 

135 elif '>' not in command: 

136 command += ' > stdout' 

137 self.command = command 

138 

139 # Determine directory to run in 

140 self._use_temporary_directory = directory is None 

141 self._directory = directory 

142 if self._use_temporary_directory: 

143 self._make_new_tmp_directory() 

144 else: 

145 self._potential_path = os.path.relpath( 

146 os.path.abspath(self.model_filename), self._directory) 

147 

148 # Override the profile in ~/.config/ase/config.ini. 

149 # See https://wiki.fysik.dtu.dk/ase/ase/ 

150 # calculators/calculators.html#calculator-configuration 

151 profile = GPUMDShellProfile(command, gpu_identifier_index) 

152 FileIOCalculator.__init__(self, 

153 directory=self._directory, 

154 label=label, 

155 atoms=atoms, 

156 profile=profile) 

157 

158 def run_custom_md( 

159 self, 

160 parameters: List[Tuple[str, Any]], 

161 return_last_atoms: bool = False, 

162 only_prepare: bool = False, 

163 ): 

164 """ 

165 Run a custom MD simulation. 

166 

167 Parameters 

168 ---------- 

169 parameters 

170 Parameters to be specified in the run.in file. 

171 The potential keyword is set automatically, all other 

172 keywords need to be set via this argument. 

173 Example:: 

174 

175 [('dump_thermo', 100), 

176 ('dump_position', 1000), 

177 ('velocity', 300), 

178 ('time_step', 1), 

179 ('ensemble', ['nvt_ber', 300, 300, 100]), 

180 ('run', 10000)] 

181 

182 return_last_atoms 

183 If ``True`` the last saved snapshot will be returned. 

184 only_prepare 

185 If ``True`` the necessary input files will be written 

186 but the MD run will not be executed. 

187 

188 Returns 

189 ------- 

190 The last snapshot if :attr:`return_last_atoms` is ``True``. 

191 """ 

192 if self._use_temporary_directory: 

193 self._make_new_tmp_directory() 

194 

195 if self._use_temporary_directory and not return_last_atoms: 

196 raise ValueError('Refusing to run in temporary directory ' 

197 'and not returning atoms; all results will be gone.') 

198 

199 if self._use_temporary_directory and only_prepare: 

200 raise ValueError('Refusing to only prepare in temporary directory, ' 

201 'all files will be removed.') 

202 

203 # Write files and run 

204 FileIOCalculator.write_input(self, self.atoms) 

205 self._write_runfile(parameters) 

206 write_xyz(filename=os.path.join(self._directory, 'model.xyz'), 

207 structure=self.atoms) 

208 

209 if only_prepare: 

210 return None 

211 

212 # Execute the calculation. 

213 self.execute() 

214 

215 # Extract last snapshot if needed 

216 if return_last_atoms: 

217 last_atoms = ase_read(os.path.join(self._directory, 'movie.xyz'), 

218 format='extxyz', index=-1) 

219 

220 if self._use_temporary_directory: 

221 self._clean() 

222 

223 if return_last_atoms: 

224 return last_atoms 

225 else: 

226 return None 

227 

228 def write_input(self, atoms, properties=None, system_changes=None): 

229 """ 

230 Write the input files necessary for a single-point calculation. 

231 """ 

232 if self._use_temporary_directory: 

233 self._make_new_tmp_directory() 

234 FileIOCalculator.write_input(self, atoms, properties, system_changes) 

235 self._write_runfile(parameters=self.single_point_parameters) 

236 write_xyz(filename=os.path.join(self._directory, 'model.xyz'), 

237 structure=atoms) 

238 

239 def _write_runfile(self, parameters): 

240 """Write run.in file to define input parameters for MD simulation. 

241 

242 Parameters 

243 ---------- 

244 parameters : dict 

245 Defines all key-value pairs used in run.in file 

246 (see GPUMD documentation for a complete list). 

247 Values can be either floats, integers, or lists/tuples. 

248 """ 

249 if len(os.listdir(self._directory)) > 0: 

250 warnings.warn(f'{self._directory} is not empty.') 

251 

252 with open(os.path.join(self._directory, 'run.in'), 'w') as f: 

253 # Custom potential is allowed but normally it can be deduced 

254 if 'potential' not in [keyval[0] for keyval in parameters]: 

255 f.write(f'potential {self._potential_path} \n') 

256 # Write all keywords with parameter(s) 

257 for key, val in parameters: 

258 f.write(f'{key} ') 

259 if isinstance(val, Iterable) and not isinstance(val, str): 

260 for v in val: 

261 f.write(f'{v} ') 

262 else: 

263 f.write(f'{val}') 

264 f.write('\n') 

265 

266 def get_potential_energy_and_stresses_from_file(self): 

267 """ 

268 Extract potential energy (third column of last line in thermo.out) and stresses 

269 from thermo.out 

270 """ 

271 data = np.loadtxt(os.path.join(self._directory, 'thermo.out')) 

272 if len(data.shape) == 1: 

273 line = data 

274 else: 

275 line = data[-1, :] 

276 

277 # Energy 

278 energy = line[2] 

279 

280 # Stress 

281 stress = [v for v in line[3:9]] 

282 stress = -GPa * np.array(stress) # to eV/A^3 

283 

284 if np.any(np.isnan(stress)) or np.isnan(energy): 

285 raise ValueError(f'Failed to extract energy and/or stresses:\n {line}') 

286 return energy, stress 

287 

288 def _read_potential_energy_and_stresses(self): 

289 """Reads potential energy and stresses.""" 

290 self.results['energy'], self.results['stress'] = \ 

291 self.get_potential_energy_and_stresses_from_file() 

292 

293 def get_forces_from_file(self): 

294 """ 

295 Extract forces (in eV/A) from last snapshot in force.out 

296 """ 

297 data = np.loadtxt(os.path.join(self._directory, 'force.out')) 

298 return data[-len(self.atoms):, :] 

299 

300 def _read_forces(self): 

301 """Reads forces (the last snapshot in force.out) in eV/A""" 

302 self.results['forces'] = self.get_forces_from_file() 

303 

304 def get_charges_and_becs_from_file(self): 

305 """Extract charges and Born Effective Charges from last 

306 snapshot in `charges_and_bec.xyz`""" 

307 structure = ase_read(os.path.join(self._directory, 'charges_and_bec.xyz'), '-1') 

308 charges = structure.get_charges() 

309 becs = structure.get_array('bec') 

310 return charges, becs 

311 

312 def _read_charges_and_becs(self): 

313 """Reads charges and Born Effective Charges from file.""" 

314 charges, becs = self.get_charges_and_becs_from_file() 

315 self.results['charges'] = charges 

316 self.results['born_effective_charges'] = becs 

317 

318 def read_results(self): 

319 """ 

320 Read results from last step of MD calculation. 

321 """ 

322 self._read_potential_energy_and_stresses() 

323 self._read_forces() 

324 

325 if 'charge' in self.model_type: 

326 self._read_charges_and_becs() 

327 if self._use_temporary_directory: 

328 self._clean() 

329 

330 def _clean(self): 

331 """ 

332 Remove directory with calculations. 

333 """ 

334 shutil.rmtree(self._directory) 

335 

336 def _make_new_tmp_directory(self): 

337 """ 

338 Create a new temporary directory. 

339 """ 

340 # We do not need to create a new temporary directory 

341 # if the current one is empty 

342 if self._directory is None or \ 

343 (os.path.isdir(self._directory) and len(os.listdir(self._directory)) > 0): 

344 self._directory = tempfile.mkdtemp() 

345 self._potential_path = os.path.relpath(os.path.abspath(self.model_filename), 

346 self._directory) 

347 

348 def set_atoms(self, atoms): 

349 """ 

350 Set Atoms object. 

351 Used also when attaching calculator to Atoms object. 

352 """ 

353 self.atoms = atoms 

354 self.results = {} 

355 

356 def set_directory(self, directory): 

357 """ 

358 Set path to a new directory. This makes it possible to run 

359 several calculations with the same calculator while saving 

360 all results 

361 """ 

362 self._directory = directory 

363 self._use_temporary_directory = False 

364 self._potential_path = os.path.relpath(os.path.abspath(self.model_filename), 

365 self._directory) 

366 

367 def get_born_effective_charges( 

368 self, 

369 atoms: Atoms = None, 

370 properties: List[str] = None, 

371 system_changes: List[str] = all_changes, 

372 ) -> np.ndarray: 

373 """Calculates (if needed) and returns the Born effective charges. 

374 Note that this requires a qNEP model. 

375 

376 Parameters 

377 ---------- 

378 atoms 

379 System for which to calculate properties, by default `None`. 

380 properties 

381 Properties to calculate, by default `None`. 

382 system_changes 

383 Changes to the system since last call, by default all_changes. 

384 """ 

385 if 'born_effective_charges' not in self.implemented_properties: 

386 raise ValueError( 

387 'This model does not support the calculation of Born effective charges.') 

388 self.calculate(atoms, properties, system_changes) 

389 return self.results['born_effective_charges']