Source code for xplogger.experiment_manager.slurm.ds

"""Datastructures to interact with the SLURM system."""
from __future__ import annotations

from collections import UserDict, UserList
from dataclasses import dataclass
from typing import Any, Union

from xplogger.experiment_manager.record.base import Record
from xplogger.experiment_manager.store.mongo import MongoStore


[docs]@dataclass class SlurmInfo: priority: int job_id: str partition: str # job_name: str job_step_name: str user: str state: str start_time: str time: str time_limit: str num_nodes: int tres_per_node: str min_cpus: int min_memory_size: str nodelist: str mongo_id: str = "" project: str = "" git_issue_id: str = "" script_id: str = ""
[docs] @classmethod def from_dict(cls, data: dict[str, str]) -> SlurmInfo: # noqa: ANN102 """Map a dict to SlurmInfo instance.""" slurm_key_to_kwarg_key_mapping = { "PRIORITY": "priority", "JOBID": "job_id", "PARTITION": "partition", "NAME": "job_step_name", "USER": "user", "STATE": "state", "START_TIME": "start_time", "TIME": "time", "TIME_LIMIT": "time_limit", "NODES": "num_nodes", "TRES_PER_NOD": "tres_per_node", "MI": "min_cpus", "MIN_": "min_memory_size", "NODELIST(REASON)": "nodelist", } kwargs_for_slurm_info: dict[str, Union[str, int]] = { kwargs_key: data[slurm_key] for slurm_key, kwargs_key in slurm_key_to_kwarg_key_mapping.items() } for key in ["priority", "num_nodes", "min_cpus"]: kwargs_for_slurm_info[key] = int(kwargs_for_slurm_info[key]) return cls(**kwargs_for_slurm_info) # type: ignore
[docs]class SlurmInfoList(UserList): # type: ignore def __init__(self, slurm_info_list: list[SlurmInfo]): """list-like interface to a collection of SlurmInfo.""" super().__init__(slurm_info_list)
[docs] def to_slurminfo_dict( # type: ignore self, key_fn=lambda slurm_info: slurm_info.job_id ) -> SlurmInfoDict: """Map SlurmInfo instance to a dict.""" return SlurmInfoDict( {key_fn(slurm_info): slurm_info for slurm_info in self.data} )
[docs] def populate_additional_fields(self, mongo_stores: list[MongoStore]) -> None: """Populate additional fields like collection, git_issue_id and script_id.""" record_list: list[Record] = [] for current_mongo_store in mongo_stores: record_list += current_mongo_store.get_unanalyzed_records() for record in record_list: if "slurm" not in record["setup"]: print(record["setup"]) records: dict[str, Record] = { record["setup"]["slurm"]["id"]: record for record in record_list if "setup" in record and "slurm" in record["setup"] } def _process_slurm_info(slurm_info: SlurmInfo) -> SlurmInfo: job_id = slurm_info.job_id if job_id in records: key = job_id else: key = job_id + "-0" if key in records: slurm_info.project = records[key]["logbook"]["mongo_config"][ "collection" ] slurm_info.mongo_id = str(records[key]["_id"]) slurm_info.git_issue_id = ( f"{slurm_info.project}-{records[key]['setup']['git']['issue_id']}" ) slurm_info.script_id = ( f"{slurm_info.project}-{records[key]['setup']['script_id']}" ) return slurm_info self.data = [_process_slurm_info(slurm_info) for slurm_info in self.data]
[docs]class SlurmInfoDict(UserDict): # type: ignore def __init__(self, slurminfo_sequence_dict: dict[Any, SlurmInfoList]): """dict-like interface to a collection of SlurmInfo.""" super().__init__(slurminfo_sequence_dict)