Coverage for calorine/nep/io.py: 100%

228 statements  

« prev     ^ index     » next       coverage.py v7.2.5, created at 2024-10-17 08:55 +0000

1from os.path import exists 

2from os.path import join as join_path 

3from typing import Any, Dict, Iterable, List, NamedTuple, TextIO, Tuple 

4from warnings import warn 

5 

6import numpy as np 

7from ase import Atoms 

8from ase.io import read, write 

9from ase.stress import voigt_6_to_full_3x3_stress 

10from ase.units import GPa 

11from pandas import DataFrame 

12 

13 

14def read_loss(filename: str) -> DataFrame: 

15 """Parses a file in ``loss.out`` format from GPUMD and returns the 

16 content as a data frame. More information concerning file format, 

17 content and units can be found `here 

18 <https://gpumd.org/nep/output_files/loss_out.html>`__. 

19 

20 Parameters 

21 ---------- 

22 filename 

23 input file name 

24 

25 """ 

26 data = np.loadtxt(filename) 

27 if isinstance(data[0], np.float64): 

28 # If only a single row in loss.out, append a dimension 

29 data = data.reshape(1, -1) 

30 if len(data[0]) == 6: 

31 tags = 'total_loss L1 L2' 

32 tags += ' RMSE_P_train' 

33 tags += ' RMSE_P_test' 

34 elif len(data[0]) == 10: 

35 tags = 'total_loss L1 L2' 

36 tags += ' RMSE_E_train RMSE_F_train RMSE_V_train' 

37 tags += ' RMSE_E_test RMSE_F_test RMSE_V_test' 

38 else: 

39 raise ValueError( 

40 f'Input file contains {len(data[0])} data columns. Expected 6 or 10 columns.' 

41 ) 

42 generations = range(100, len(data) * 100 + 1, 100) 

43 df = DataFrame(data=data[:, 1:], columns=tags.split(), index=generations) 

44 return df 

45 

46 

47def _write_structure_in_nep_format(structure: Atoms, f: TextIO) -> None: 

48 """Write structure block into a file-like object in format readable by nep executable. 

49 

50 Parameters 

51 ---------- 

52 structure 

53 input structure; must hold information regarding energy and forces 

54 f 

55 file-like object to which to write 

56 """ 

57 

58 # Allowed keyword=value pairs. Use ASEs extyz write functionality.: 

59 # lattice="ax ay az bx by bz cx cy cz" (mandatory) 

60 # energy=energy_value (mandatory) 

61 # virial="vxx vxy vxz vyx vyy vyz vzx vzy vzz" (optional) 

62 # weight=relative_weight (optional) 

63 # properties=property_name:data_type:number_of_columns 

64 # species:S:1 (mandatory) 

65 # pos:R:3 (mandatory) 

66 # force:R:3 or forces:R:3 (mandatory) 

67 try: 

68 energy = structure.get_potential_energy() 

69 structure.get_forces() # calculate forces to have them on the Atoms object 

70 structure.info['energy'] = energy 

71 except RuntimeError: 

72 raise RuntimeError('Failed to retrieve energy and/or forces for structure') 

73 if np.isclose(structure.get_volume(), 0): 

74 raise ValueError('Structure cell must have a non-zero volume!') 

75 try: 

76 stress = structure.get_stress(voigt=False).flatten() 

77 stress = ' '.join([f'{s:.6f}' for s in stress]) 

78 structure.info['stress'] = f'"{stress}"' 

79 except RuntimeError: 

80 warn('Failed to retrieve stresses for structure') 

81 write(filename=f, images=structure, write_info=True, format='extxyz') 

82 

83 

84def write_structures(outfile: str, structures: List[Atoms]) -> None: 

85 """Writes structures for training/testing in format readable by nep executable. 

86 

87 Parameters 

88 ---------- 

89 outfile 

90 output filename 

91 structures 

92 list of structures with energy, forces, and (possibly) stresses 

93 """ 

94 with open(outfile, 'w') as f: 

95 for structure in structures: 

96 _write_structure_in_nep_format(structure, f) 

97 

98 

99def write_nepfile(parameters: NamedTuple, dirname: str) -> None: 

100 """Writes parameters file for NEP construction. 

101 

102 Parameters 

103 ---------- 

104 parameters 

105 input parameters; see `here <https://gpumd.org/nep/input_parameters/index.html>`__ 

106 dirname 

107 directory in which to place input file and links 

108 """ 

109 with open(join_path(dirname, 'nep.in'), 'w') as f: 

110 for key, val in parameters.items(): 

111 f.write(f'{key} ') 

112 if isinstance(val, Iterable): 

113 f.write(' '.join([f'{v}' for v in val])) 

114 else: 

115 f.write(f'{val}') 

116 f.write('\n') 

117 

118 

119def read_nepfile(filename: str) -> Dict[str, Any]: 

120 """Returns the content of a configuration file (`nep.in`) as a dictionary. 

121 

122 Parameters 

123 ---------- 

124 filename 

125 input file name 

126 """ 

127 settings = {} 

128 with open(filename) as f: 

129 for line in f.readlines(): 

130 flds = line.split() 

131 if len(flds) == 0: 

132 continue 

133 if flds[0].startswith('#'): 

134 continue 

135 settings[flds[0]] = ' '.join(flds[1:]) 

136 for key, val in settings.items(): 

137 if key in ['version', 'neuron', 'generation', 'batch', 'population', 'mode', 'model_type']: 

138 settings[key] = int(val) 

139 elif key in [ 

140 'lambda_1', 

141 'lambda_2', 

142 'lambda_e', 

143 'lambda_f', 

144 'lambda_v', 

145 'lambda_shear', 

146 'force_delta', 

147 ]: 

148 settings[key] = float(val) 

149 elif key in ['cutoff', 'n_max', 'l_max', 'basis_size', 'zbl', 'type_weight']: 

150 settings[key] = [float(v) for v in val.split()] 

151 elif key == 'type': 

152 types = val.split() 

153 types[0] = int(types[0]) 

154 settings[key] = types 

155 return settings 

156 

157 

158def read_structures(dirname: str) -> Tuple[List[Atoms], List[Atoms]]: 

159 """Parses the ``energy_*.out``, ``force_*.out``, ``virial_*.out``, 

160 ``polarizability_*.out`` and ``dipole_*.out`` files from a nep run and 

161 returns their content as lists. The first and second list contain the structures 

162 from the training and test sets, respectively. Each list entry corresponds to an ASE 

163 Atoms object, which in turn contains predicted and target energies, forces and virials/stresses 

164 or polarizability/diople stored in the `info` property. 

165 

166 Parameters 

167 ---------- 

168 dirname 

169 directory from which to read output files 

170 

171 """ 

172 path = join_path(dirname) 

173 if not exists(path): 

174 raise ValueError(f'Directory {path} does not exist') 

175 nep_info = read_nepfile(f'{path}/nep.in') 

176 if 'mode' in nep_info or 'model_type' in nep_info: 

177 ftype = nep_info.get('mode', nep_info.get('model_type')) 

178 if ftype == 2 or ftype == 1: 

179 return _read_structures_tensors(dirname, ftype) 

180 return _read_structures_potential(dirname) 

181 

182 

183def _read_structures_tensors(dirname: str, ftype: int) \ 

184 -> Tuple[List[Atoms], List[Atoms]]: 

185 """Parses the ``polarizability_*.out`` and ``dipole_*.out`` 

186 files from a nep run and returns their content as lists. 

187 The first and second list contain the structures from the training and 

188 test sets, respectively. Each list entry corresponds to an ASE 

189 Atoms object, which in turn contains predicted and target 

190 dipole or polarizability stored in the `info` 

191 property. 

192 

193 Parameters 

194 ---------- 

195 dirname 

196 directory from which to read output files 

197 

198 """ 

199 path = join_path(dirname) 

200 structures = {} 

201 

202 if ftype == 1: 

203 sname = 'dipole' 

204 else: 

205 sname = 'polarizability' 

206 

207 for stype in ['train', 'test']: 

208 filename = join_path(dirname, f'{stype}.xyz') 

209 try: 

210 structures[stype] = read(filename, format='extxyz', index=':') 

211 except FileNotFoundError: 

212 warn(f'File {filename} not found.') 

213 structures[stype] = [] 

214 continue 

215 

216 n_structures = len(structures[stype]) 

217 ts, ps = _read_data_file(path, f'{sname}_{stype}.out') 

218 if ftype == 1: 

219 ts = np.array(ts).reshape((-1, 3)) 

220 ps = np.array(ps).reshape((-1, 3)) 

221 else: 

222 ts = np.array(ts).reshape((-1, 6)) 

223 ps = np.array(ps).reshape((-1, 6)) 

224 assert len(ts) == n_structures, \ 

225 f'Number of structures in {sname}_{stype}.out ({len(ts)})' \ 

226 f' and {stype}.xyz ({n_structures}) inconsistent' 

227 for structure, t, p in zip(structures[stype], ts, ps): 

228 if ftype == 1: 

229 assert np.shape(t) == (3,) 

230 assert np.shape(p) == (3,) 

231 else: 

232 assert np.shape(t) == (6,) 

233 assert np.shape(p) == (6,) 

234 structure.info[f'{sname}_target'] = t 

235 structure.info[f'{sname}_predicted'] = p 

236 

237 return structures['train'], structures['test'] 

238 

239 

240def _read_structures_potential(dirname: str) -> Tuple[List[Atoms], List[Atoms]]: 

241 """Parses the ``energy_*.out``, ``force_*.out``, ``virial_*.out`` 

242 files from a nep run and returns their content as lists. 

243 The first and second list contain the structures from the training and 

244 test sets, respectively. Each list entry corresponds to an ASE 

245 Atoms object, which in turn contains predicted and target 

246 energies, forces and virials/stresses stored in the `info` 

247 property. 

248 

249 Parameters 

250 ---------- 

251 dirname 

252 directory from which to read output files 

253 

254 """ 

255 path = join_path(dirname) 

256 structures = {} 

257 

258 for stype in ['train', 'test']: 

259 file_path = join_path(dirname, f'{stype}.xyz') 

260 if not exists(file_path): 

261 warn(f'File {file_path} not found.') 

262 structures[stype] = [] 

263 continue 

264 

265 structures[stype] = read( 

266 file_path, format='extxyz', index=':' 

267 ) 

268 

269 ts, ps = _read_data_file(path, f'energy_{stype}.out') 

270 n_structures = len(structures[stype]) 

271 assert len(ts) == n_structures, ( 

272 f'Number of structures in energy_{stype}.out ({len(ts)})' 

273 f' and {stype}.xyz ({n_structures}) inconsistent' 

274 ) 

275 for structure, t, p in zip(structures[stype], ts, ps): 

276 structure.info['energy_target'] = t 

277 structure.info['energy_predicted'] = p 

278 

279 ts, ps = _read_data_file(path, f'force_{stype}.out') 

280 n_atoms_total = sum([len(s) for s in structures[stype]]) 

281 assert len(ts) == n_atoms_total, ( 

282 f'Number of structures in force_{stype}.out ({len(ts)})' 

283 f' and {stype}.xyz ({n_structures}) inconsistent' 

284 ) 

285 n = 0 

286 for structure in structures[stype]: 

287 nat = len(structure) 

288 structure.info['force_target'] = np.array(ts[n: n + nat]).reshape(nat, 3) 

289 structure.info['force_predicted'] = np.array(ps[n: n + nat]).reshape( 

290 nat, 3 

291 ) 

292 n += nat 

293 

294 ts, ps = _read_data_file(path, f'virial_{stype}.out') 

295 ts, ps = np.array(ts), np.array(ps) 

296 N = len(structures[stype]) 

297 if ts.shape == (6*N,): 

298 # GPUMD <=v3.6 style virial_*.out 

299 # First column are NEP predictions, second are targets 

300 # Order: First N values are xx, second are yy etc. 

301 ts = np.array(ts).reshape((6, -1)).T # GPUMD 3.6 compatibility 

302 ps = np.array(ps).reshape((6, -1)).T 

303 elif ts.shape == (N, 6): 

304 # GPUMD >=v3.7 style virial_*.out 

305 # First 6 columns are NEP predictions, last 6 are targets 

306 # Order: xx, yy, zz, xy, yz, zx 

307 pass 

308 else: 

309 raise ValueError(f'virial_*.out has invalid shape, {ts.shape}') 

310 

311 assert len(ts) == n_structures, \ 

312 f'Number of structures in virial_{stype}.out ({len(ts)})' \ 

313 f' and {stype}.xyz ({n_structures}) inconsistent' 

314 for structure, t, p in zip(structures[stype], ts, ps): 

315 assert np.shape(t) == (6,) 

316 structure.info['virial_target'] = t 

317 structure.info['virial_predicted'] = p 

318 conv = len(structure) / structure.get_volume() / GPa 

319 structure.info['stress_target'] = t * conv 

320 structure.info['stress_predicted'] = p * conv 

321 

322 return structures['train'], structures['test'] 

323 

324 

325def _read_data_file(dirname: str, fname: str): 

326 """Private function that parses energy/force/virial_*.out files and 

327 returns their content for further processing. 

328 """ 

329 path = join_path(dirname, fname) 

330 if not exists(path): 

331 raise ValueError(f'Directory {path} does not exist') 

332 with open(path, 'r') as f: 

333 lines = f.readlines() 

334 target, predicted = [], [] 

335 for line in lines: 

336 flds = line.split() 

337 if len(flds) == 12: # Virial after GPUMD 3.7 

338 predicted.append([float(s) for s in flds[0:6]]) 

339 target.append([float(s) for s in flds[6:12]]) 

340 elif len(flds) == 6: # Force 

341 predicted.append([float(s) for s in flds[0:3]]) 

342 target.append([float(s) for s in flds[3:6]]) 

343 elif len(flds) == 2: # Energy, virial before GPUMD 3.7 

344 predicted.append(float(flds[0])) 

345 target.append(float(flds[1])) 

346 else: 

347 raise ValueError(f'Malformed file: {path}') 

348 return target, predicted 

349 

350 

351def get_parity_data( 

352 structures: List[Atoms], 

353 property: str, 

354 selection: List[str] = None, 

355 flatten: bool = True, 

356) -> DataFrame: 

357 """Returns the predicted and target energies, forces, virials or stresses 

358 from a list of structures in a format suitable for generating parity plots. 

359 

360 The structures should have been read using :func:`read_structures 

361 <calorine.nep.read_structures>`, such that the ``info``-object is 

362 populated with keys on the form ``<property>_<type>`` where ``<property>`` 

363 is one of ``energy``, ``force``, ``virial``, and stress, and ``<type>`` is one 

364 of ``predicted`` or ``target``. 

365 

366 The resulting parity data is returned as a tuple of dicts, where each entry 

367 corresponds to a list. 

368 

369 Parameters 

370 ---------- 

371 structures 

372 List of structures as read with :func:`read_structures <calorine.nep.read_structures>`. 

373 property 

374 One of ``energy``, ``force``, ``virial``, ``stress``, ``polarizability``, ``dipole``. 

375 selection 

376 A list containing which components to return, and/or the absolute value. 

377 Possible values are ``x``, ``y``, ``z``, ``xx``, ``yy``, 

378 ``zz``, ``yz``, ``xz``, ``xy``, ``abs``, ``pressure``. 

379 flatten 

380 if True return flattened lists; this is useful for flattening 

381 the components of force or virials into a simple list 

382 """ 

383 data = {'predicted': [], 'target': []} 

384 voigt_mapping = { 

385 'x': 0, 

386 'y': 1, 

387 'z': 2, 

388 'xx': 0, 

389 'yy': 1, 

390 'zz': 2, 

391 'yz': 3, 

392 'xz': 4, 

393 'xy': 5, 

394 } 

395 if property not in ('energy', 'force', 'virial', 'stress', 'polarizability', 'dipole'): 

396 raise ValueError( 

397 "`property` must be one of 'energy', 'force', 'virial', 'stress'," 

398 " 'polarizability', 'dipole'." 

399 ) 

400 if property == 'energy' and selection: 

401 raise ValueError('Selection does nothing for scalar-valued `energy`.') 

402 if property != 'stress' and selection and 'pressure' in selection: 

403 raise ValueError(f'Cannot calculate pressure for `{property}`.') 

404 for structure in structures: 

405 for stype in data: 

406 property_with_stype = f'{property}_{stype}' 

407 if property_with_stype not in structure.info.keys(): 

408 raise KeyError(f'{property_with_stype} does not exist in info object!') 

409 extracted_property = np.array(structure.info[property_with_stype]) 

410 

411 if selection is None or len(selection) == 0: 

412 data[stype].append(extracted_property) 

413 continue 

414 

415 selected_values = [] 

416 for select in selection: 

417 if property == 'force': 

418 # flip to get (n_components, n_structures) 

419 extracted_property = extracted_property.T 

420 if select == 'abs': 

421 if property == 'force': 

422 selected_values.append(np.linalg.norm(extracted_property, axis=0)) 

423 else: 

424 # property can only be in ('virial', 'stress') 

425 full_tensor = voigt_6_to_full_3x3_stress(extracted_property) 

426 selected_values.append(np.linalg.norm(full_tensor)) 

427 continue 

428 

429 if select == 'pressure' and property == 'stress': 

430 total_stress = extracted_property 

431 selected_values.append(-np.sum(total_stress[:3]) / 3) 

432 continue 

433 

434 if select not in voigt_mapping: 

435 raise ValueError(f'Selection `{select}` is not allowed.') 

436 index = voigt_mapping[select] 

437 if index >= extracted_property.shape[0]: 

438 raise ValueError( 

439 f'Selection `{select}` is not compatible with property `{property}`.' 

440 ) 

441 selected_values.append(extracted_property[index]) 

442 data[stype].append(selected_values) 

443 if flatten: 

444 for key, value in data.items(): 

445 if len(np.shape(value[0])) > 0: 

446 data[key] = np.concatenate(value).ravel().tolist() 

447 df = DataFrame(data) 

448 # In case of flatten, cast to float64 for compatibility 

449 # with e.g. seaborn. 

450 # Casting in this way breaks tensorial properties though, 

451 # so skip it there. 

452 if flatten: 

453 df['target'] = df.target.astype('float64') 

454 df['predicted'] = df.predicted.astype('float64') 

455 return df