Extending asyncmd#
This section is relevant for developers of asyncmd, e.g. when you want to add the option to steer an additional molecular dynamics engines (like NAMD or LAMMPS) or add additional ways to wrap functions acting on Trajectory objects.
This section also contains the interface of the classes, which are used under the hood by various user facing-classes in asyncmd to interact with the SLURM queueing system.
Namely there is the SlurmProcess, which emulates the interface of asyncio.subprocess.Process and which is used to submit and wait for single SLURM jobs.
In the spirit of asyncios subprocess module, there is also the coroutine-function create_slurmprocess_submit, which can be used to create and directly submit a SlurmProcess (just like asyncio.create_subprocess_exec()).
Additionally (one level deeper under the hood) there is the SlurmClusterMediator, which is a singleton class acting as the central communication point between the single SlurmProcess and the SLURM commands (“sacct”, “sbatch”, etc.).
Finally there is also a number of slurm-specific exceptions documented below.
See also
The example notebook on the SlurmProcess.
Writing a TrajectoryFunctionWrapper class#
All wrapper classes for functions acting on Trajectory objects must subclass TrajectoryFunctionWrapper to make full and easy use of the caching mechanism already implemented.
You then only need to implement TrajectoryFunctionWrapper._get_id_str and TrajectoryFunctionWrapper._get_values_for_trajectory to get a fully functional TrajectoryFunctionWrapper class.
See also the (reference) implementations of the other wrapper classes, PyTrajectoryFunctionWrapper and SlurmTrajectoryFunctionWrapper.
- class asyncmd.trajectory.functionwrapper.TrajectoryFunctionWrapper(**kwargs)#
Abstract base class to define the API and some common methods.
- async __call__(value: Trajectory) ndarray#
Apply wrapped function asynchronously on given trajectory.
- Parameters:
value (asyncmd.Trajectory) – Input trajectory.
- Returns:
The values of the wrapped function when applied on the trajectory.
- Return type:
iterable, usually list or np.ndarray
- abstractmethod _get_id_str() str#
This function is expected to return an unique identifying string.
It should be unique and portable, i.e. it should enable us to make sure that the cached values will only be used for the same function called with the same arguments.
- abstractmethod async _get_values_for_trajectory(traj: Trajectory) ndarray#
Will be called by self.__call__ to actually perform the function calculation. Is expected to return a numpy array, shape=(n_frames, n_dim_function).
- property call_kwargs: dict[str, Any]#
Additional calling arguments for the wrapped function/executable.
NOTE: You can only (re)set the complete dict and not single keys!
- property id: str#
Unique and persistent identifier.
Takes into account the wrapped function and its calling arguments.
Writing a MDEngine class#
All molecular dynamics engines must subclass the abstract base class asyncmd.mdengine.MDEngine, which defines the common interface expected from all asyncmd engine classes.
In addition the module asyncmd.mdengine defines exceptions that the engines should raise when applicable.
Currently defined are:
asyncmd.mdengine.EngineError(a generic error, should be raised when no more specific error applies)asyncmd.mdengine.EngineCrashedError(should be raised when the wrapped MD engine code raises an exception during the MD integration)
Note
You should also register and implement the corresponding utility functions for your engine.
- class asyncmd.mdengine.MDEngine#
Abstract base class to define a common interface for all
MDEngine.- abstractmethod async apply_constraints(conf_in: Trajectory, conf_out_name: str, *, workdir: str = '.') Trajectory#
Apply constraints to given conf_in, write conf_out_name and return it.
- Parameters:
conf_in (Trajectory)
conf_out_name (str)
workdir (str)
- Return type:
- abstractmethod async prepare(starting_configuration: Trajectory, workdir: str, deffnm: str) None#
Prepare the engine to run a MD from starting_configuration.
NOTE: We assume that we do not change the system for/in one engine, i.e. .top, .ndx, mdp-object, …?! should go into __init__.
- Parameters:
starting_configuration (Trajectory) – The initial configuration.
workdir (str) – The directory in which the MD will be performed.
deffnm (str) – The standard filename to use for this MD run.
- abstractmethod async prepare_from_files(workdir: str, deffnm: str) None#
Prepare the engine to continue a previously stopped simulation starting with the last trajectory part in workdir that is compatible with deffnm.
NOTE: This can not be a classmethod (reliably) because we set top/ndx/ mdconfig/etc in ‘__init__’.
- Parameters:
workdir (str) – The directory in which the MD will be/ was previously performed.
deffnm (str) – The standard filename to use for this MD run.
- abstractmethod async run_steps(nsteps: int, steps_per_part: bool = False) Trajectory | None#
Run for a specified number of integration steps.
Return None if no integration is needed because nsteps integration steps have already been performed.
NOTE: Make sure we can run multiple times after preparing once!
- Parameters:
nsteps (int)
steps_per_part (bool, optional) – Count nsteps for this part/run or in total, by default False
- Return type:
Trajectory | None
- abstractmethod async run_walltime(walltime: float, max_steps: int | None = None) Trajectory | None#
Run for specified walltime.
NOTE: Must be possible to run this multiple times after preparing once!
It is optional (but recommended if possible) for engines to respect the
max_stepsargument. I.e. terminating upon reaching max_steps is optional and no code should rely on it. See therun_steps()if a fixed number of integration steps is required.Return None if no integration is needed because max_steps integration steps have already been performed.
- Parameters:
walltime (float) – Walltime in hours.
max_steps (int | None, optional) – If not None, (optionally) terminate when max_steps integration steps in total are reached, also if this is before walltime is reached. By default None.
- Return type:
Trajectory | None
- abstract property current_trajectory: Trajectory | None#
Trajectory or None.
- Type:
Return current trajectory
- abstract property output_traj_type: str#
Return a string with the ending (without “.”) of the trajectory type this engine uses.
NOTE: This should not be implemented as a property in subclasses as it must be available at the classlevel too, i.e. cls.output_traj_type must also return the string. So this should just be overwritten with a string with the correct value, or if your engine supports multiple output_traj_types you should have a look at the descriptor implementation in asyncmd/tools.py (and, e.g., used in asyncmd/gromacs/mdengine.py), which checks for allowed values (at least when set on an instance) but is accessible from the class level too, i.e. like a ‘classproperty’ (which is not a thing in python).
- abstract property steps_done: int#
Return the number of integration steps this engine has performed in total.
- exception asyncmd.mdengine.EngineError#
Exception raised when something goes wrong with the (MD)-Engine.
- exception asyncmd.mdengine.EngineCrashedError#
Exception raised when the (MD)-Engine crashes during a run.
Writing a MDconfig class for configuration file parsing and writing#
All molecular dynamics configuration file wrappers must subclass asyncmd.mdconfig.MDConfig.
This class defines the two abstract methods parse and write as well as the dictionary-like interface by subclassing from collections.abc.MutableMapping.
However, most often you can probably subclass asyncmd.mdconfig.LineBasedMDConfig directly.
This has the advantage that you will only need to define the datatypes of the values (if you want them to be typed) and define a function that knows how to parse single lines of the config file format.
To this end you should overwrite the abstract method LineBasedMDConfig._parse_line in your subclass.
The function will get single lines to parse (as str) and is expected to return the key, list of value(s) pair as a dict with one item, i.e. {key: list of value(s)}.
If the line is parsed as comment the returned dict must be empty, i.e. {}.
If the option/key is present but without associated value(s) the list in the dict must be empty, i.e. {key: []}.
- class asyncmd.mdconfig.MDConfig#
Abstract base class only to define the interface.
- abstractmethod parse()#
Should read original file and populate self with key, value pairs.
- abstractmethod write(outfile)#
Should write out current config stored in self to outfile.
- class asyncmd.mdconfig.LineBasedMDConfig(original_file: str)#
Abstract base class for line based parsing and writing.
Subclasses must implement _parse_line() method and should set the appropriate separator characters for their line format. We assume that every line/option can be parsed and written on its own! We assume the order of the options in the written file is not relevant! We represent every line/option with a key (str), list of values pair. Values can have a specific type (e.g. int or float) or default to str.
Initialize a
LineBasedMDConfig.- Parameters:
original_file (str) – Path to original config file (absolute or relative).
- abstractmethod _parse_line(line: str) dict#
Parse a line of the configuration file and return a
dict.- Parameters:
line (str) – A single line of the read-in configuration file
- Returns:
parsed – Dictionary with a single (key, list of value(s)) pair representing the parsed line.
- Return type:
dict
- property original_file: str#
Return the original config file this
LineBasedMDConfigparsed.- Returns:
Path to the original file.
- Return type:
str
- property changed: bool#
Indicate if the current configuration differs from original_file.
- Returns:
Whether we changed the configuration w.r.t.
original_file.- Return type:
bool
- parse()#
Parse the current
self.original_fileto update own state.
- write(outfile: str, overwrite: bool = False) None#
Write current configuration to outfile.
- Parameters:
outfile (str) – Path to outfile (relative or absolute).
overwrite (bool, optional) – If True overwrite existing files, by default False.
- Raises:
ValueError – Raised when overwrite=False but outfile exists.
SLURM interface classes#
Note
The function below is an alias for/imported from
- asyncmd.slurm.process.create_slurmprocess_submit()#
Note: It is recommended/preferred to use asyncmd.slurm.create_slurmprocess_submit().
- async asyncmd.slurm.create_slurmprocess_submit(jobname: str, sbatch_script: str, *, workdir: str | None = None, time: float | None = None, sbatch_options: dict | None = None, stdin: str | None = None, **kwargs)#
Create and submit a
SlurmProcess.All arguments are directly passed trough to
SlurmProcessinitialization andSlurmProcess.submit(). All additional keyword arguments are passed toSlurmProcessinitialization.- Parameters:
jobname (str) – SLURM jobname (
--job-name).sbatch_script (str) – Absolute or relative path to a SLURM submission script.
workdir (str) – Absolute or relative path to use as working directory.
time (float or None) – Timelimit for the job in hours. None will result in using the default as either specified in the sbatch script or the partition.
sbatch_options (dict or None) – Dictionary of sbatch options, keys are long names for options, values are the corresponding values. The keys/long names are given without the dashes, e.g. to specify
--mem=1024the dictionary needs to be{"mem": "1024"}. To specify options without values use keys with empty strings as values, e.g. to specify--contiguousthe dictionary needs to be{"contiguous": ""}. See the SLURM documentation for a full list of sbatch options (https://slurm.schedmd.com/sbatch.html).stdin (str or None) – If given it is interpreted as a file to which we connect the batch scripts stdin via sbatchs
--inputoption. This enables sending data to the processes stdin viacommunicate(). Note that if it is desired to send data to the process the process has to be submitted with stdin.kwargs (dict, optional) – Additional keyword arguments to be passed to :meth`SlurmProcess.__init__`.
- Returns:
The submitted slurm process instance.
- Return type:
- class asyncmd.slurm.SlurmProcess(jobname: str, sbatch_script: str, *, workdir: str | None = None, time: float | None = None, sbatch_options: dict | None = None, **kwargs)#
Generic wrapper around SLURM submissions.
Imitates the interface of asyncio.subprocess.Process.
- sbatch_executable#
Name or path to the sbatch executable, by default “sbatch”.
- Type:
str
- scancel_executable#
Name or path to the scancel executable, by default “scancel”.
- Type:
str
- sleep_time#
Time (in seconds) between checks if the underlying job has finished when using self.wait. By default 15 s.
- Type:
int
- stdfiles_removal#
Whether to remove the stdout, stderr (and possibly stdin) files. Possible values are:
“success”: remove on successful completion, i.e. zero returncode)
“no”: never remove
“yes”/”always”: remove on job completion independent of returncode and also when using
terminate()
By default “success”.
- Type:
str
Initialize a SlurmProcess.
Note that you can set all attributes by passing matching init kwargs with the wanted values.
- Parameters:
jobname (str) – SLURM jobname (
--job-name).sbatch_script (str) – Absolute or relative path to a SLURM submission script.
workdir (str or None) – Absolute or relative path to use as working directory. None will result in using the current directory as workdir.
time (float or None) – Timelimit for the job in hours. None will result in using the default as either specified in the sbatch script or the partition.
sbatch_options (dict or None) – Dictionary of sbatch options, keys are long names for options, values are the corresponding values. The keys/long names are given without the dashes, e.g. to specify
--mem=1024the dictionary needs to be{"mem": "1024"}. To specify options without values use keys with empty strings as values, e.g. to specify--contiguousthe dictionary needs to be{"contiguous": ""}. See the SLURM documentation for a full list of sbatch options (https://slurm.schedmd.com/sbatch.html).
- Raises:
TypeError – If the value set via init kwarg for a attribute does not match the default/original type for that attribute.
- async communicate(input: bytes | None = None) tuple[bytes, bytes]#
Interact with process. Optionally send data to the process. Wait for the process to finish, then read from stdout and stderr (files) and return the data.
- Parameters:
input (bytes or None, optional) – The input data to send to the process, by default None. Note that you an only send data to processes created/submitted with stdin set.
- Returns:
(stdout, stderr)
- Return type:
tuple[bytes, bytes]
- Raises:
RuntimeError – If the job has never been submitted.
ValueError – If stdin is not None but the process was created without stdin set.
- del_sbatch_option(key: str) None#
Delete sbatch option with given key from sbatch_options.
- Parameters:
key (str) – The name of the sbatch option to delete.
- kill() None#
Alias for
terminate().
- send_signal(signal: int) None#
Send signal to the underlying slurm job.
Currently not implemented!
- set_sbatch_option(key: str, value: str) None#
Set sbatch option with given key to value.
I.e. add/modify single key, value pair in sbatch_options.
- Parameters:
key (str) – The name of the sbatch option.
value (str) – The value for the sbatch option.
- async submit(stdin: str | None = None) None#
Submit the job via sbatch.
- Parameters:
stdin (str or None) – If given it is interpreted as a file to which we connect the batch scripts stdin via sbatchs
--inputoption. This enables sending data to the processes stdin viacommunicate(). Note that if it is desired to send data to the process the process has to be submitted with stdin.- Raises:
RuntimeError – If the job has already been submitted.
SlurmSubmissionError – If something goes wrong during the submission with sbatch.
CancelledError – (Re)raises CancelledError if cancelled while awaiting the sbatch submission.
- terminate() None#
Terminate (cancel) the underlying SLURM job.
- Raises:
SlurmCancellationError – If scancel has non-zero returncode.
RuntimeError – If no jobid is known, e.g. because the job was never submitted.
- async wait() int#
Wait for the SLURM job to finish. Set and return the returncode.
- Returns:
returncode of the wrapped SLURM job
- Return type:
int
- Raises:
RuntimeError – If the job has never been submitted.
- property nodes: list[str] | None#
The nodes this job runs on.
- property returncode: int | None#
The returncode this job returned (if finished).
- property sbatch_options: dict#
A copy of the sbatch_options dictionary.
Note that modifying single key, value pairs has no effect, to modify (single) sbatch_options either use the set_sbatch_option and del_sbatch_option methods or (re)assign a dictionary to sbatch_options.
- property slurm_job_state: str | None#
The slurm jobstate of this job.
- property slurm_jobid: str | None#
The slurm jobid of this job.
- property stdfiles_removal: str#
Whether/when we remove stdfiles created by SLURM.
Can be one of “success”, “no”, “yes”, “always”, where “yes” and “always” are synonyms for always remove. “success” means remove stdfiles if the slurm-job was successful and “no” means never remove.
- property time: float | None#
Timelimit for SLURM job in hours.
Can be a float or None (meaning do not specify a timelimit).
- class asyncmd.slurm.cluster_mediator.SlurmClusterMediator(**kwargs)#
Singleton class to be used by all SlurmProcess for sacct/sinfo calls.
- sinfo_executable#
Name or path to the sinfo executable, by default “sinfo”.
- Type:
str
- sacct_executable#
Name or path to the sacct executable, by default “sacct”.
- Type:
str
- min_time_between_sacct_calls#
Minimum time (in seconds) between subsequent sacct calls.
- Type:
int
- num_fails_for_broken_node#
Number of failed jobs we need to observe per node before declaring it to be broken (and not submitting any more jobs to it).
- Type:
int
- success_to_fail_ratio#
Number of successful jobs we need to observe per node to decrease the failed job counter by one.
- Type:
int
- async get_info_for_job(jobid: str) dict[str, Any]#
Retrieve and return info for job with given jobid.
- Parameters:
jobid (str) – The SLURM jobid of the queried job.
- Returns:
- Dictionary with information about the job,
the keys (str) are sacct format fields, the values are the (parsed) corresponding values.
- Return type:
dict
- list_all_nodes() list[str]#
List all node (hostnames) in the SLURM cluster this runs on.
- Returns:
List of all node (hostnames) queried from sinfo.
- Return type:
list[str]
- monitor_register_job(jobid: str) None#
Add job with given jobid to sacct monitoring calls.
- Parameters:
jobid (str) – The SLURM jobid of the job to monitor.
- monitor_remove_job(jobid: str) None#
Remove job with given jobid from sacct monitoring calls.
- Parameters:
jobid (str) – The SLURM jobid of the job to remove.
- property exclude_nodes: set[str]#
Return a set with all nodes excluded from job submissions.
- exception asyncmd.slurm.constants_and_errors.SlurmError#
Generic error superclass for all SLURM errors.
- exception asyncmd.slurm.constants_and_errors.SlurmCancellationError#
Error raised when something goes wrong canceling a SLURM job.
- exception asyncmd.slurm.constants_and_errors.SlurmSubmissionError#
Error raised when something goes wrong submitting a SLURM job.