Extending asyncmd#

This section is relevant for developers of asyncmd, e.g. when you want to add support for steering an additional molecular dynamics engine (like NAMD or LAMMPS) or add further ways to wrap functions acting on Trajectory objects.

This section also documents the interface of the classes that the various user-facing classes in asyncmd use under the hood to interact with the SLURM queueing system. Namely, there is the SlurmProcess, which emulates the interface of asyncio.subprocess.Process and is used to submit and wait for single SLURM jobs. In the spirit of asyncio's subprocess module, there is also the coroutine function create_slurmprocess_submit, which can be used to create and directly submit a SlurmProcess (just like asyncio.create_subprocess_exec()). Additionally (one level deeper under the hood) there is the SlurmClusterMediator, a singleton class acting as the central communication point between the individual SlurmProcess instances and the SLURM commands (“sacct”, “sbatch”, etc.). Finally, a number of SLURM-specific exceptions are documented below.

See also

The example notebook on the SlurmProcess.

Writing a TrajectoryFunctionWrapper class#

All wrapper classes for functions acting on Trajectory objects must subclass TrajectoryFunctionWrapper to make full and easy use of the caching mechanism already implemented. You then only need to implement TrajectoryFunctionWrapper._get_id_str and TrajectoryFunctionWrapper._get_values_for_trajectory to get a fully functional TrajectoryFunctionWrapper class. See also the (reference) implementations of the other wrapper classes, PyTrajectoryFunctionWrapper and SlurmTrajectoryFunctionWrapper.

class asyncmd.trajectory.functionwrapper.TrajectoryFunctionWrapper(**kwargs)#

Abstract base class to define the API and some common methods.

async __call__(value: Trajectory) → ndarray#

Apply wrapped function asynchronously on given trajectory.

Parameters:

value (asyncmd.Trajectory) – Input trajectory.

Returns:

The values of the wrapped function when applied on the trajectory.

Return type:

iterable, usually list or np.ndarray

abstractmethod _get_id_str() → str#

This function is expected to return a unique identifying string.

It should be unique and portable, i.e. it should enable us to make sure that the cached values will only be used for the same function called with the same arguments.

abstractmethod async _get_values_for_trajectory(traj: Trajectory) → ndarray#

Called by self.__call__ to actually perform the function calculation. It is expected to return a numpy array with shape=(n_frames, n_dim_function).

property call_kwargs: dict[str, Any]#

Additional calling arguments for the wrapped function/executable.

NOTE: You can only (re)set the complete dict and not single keys!

property id: str#

Unique and persistent identifier.

Takes into account the wrapped function and its calling arguments.
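The two abstract methods can be illustrated with a minimal, self-contained sketch. Everything in it is an assumption made for illustration: WrapperBase is a simplified stand-in for TrajectoryFunctionWrapper with a toy in-memory cache (it is not the asyncmd caching mechanism), trajectories are plain lists of frames, and the hypothetical FrameSumWrapper returns nested lists instead of a numpy array.

```python
import abc
import asyncio
import hashlib


class WrapperBase(abc.ABC):
    """Hypothetical stand-in for TrajectoryFunctionWrapper (not asyncmd code)."""

    def __init__(self, **kwargs):
        self.call_kwargs = kwargs.pop("call_kwargs", {})
        # Toy in-memory cache: (wrapper id, trajectory content) -> values.
        self._cache = {}

    @property
    def id(self) -> str:
        return self._get_id_str()

    @abc.abstractmethod
    def _get_id_str(self) -> str:
        """Return a unique, portable identifier for function + arguments."""

    @abc.abstractmethod
    async def _get_values_for_trajectory(self, traj):
        """Return one row of values per frame of traj."""

    async def __call__(self, value):
        key = (self.id, tuple(map(tuple, value)))
        if key not in self._cache:
            self._cache[key] = await self._get_values_for_trajectory(value)
        return self._cache[key]


class FrameSumWrapper(WrapperBase):
    """Toy wrapper: scaled sum over the coordinates of every frame."""

    def __init__(self, scale=1.0, **kwargs):
        super().__init__(**kwargs)
        self.scale = scale
        self.n_evaluations = 0  # only here to make the caching visible

    def _get_id_str(self) -> str:
        # Hash everything that influences the result, so cached values are
        # only reused for the same function called with the same arguments.
        raw = f"{type(self).__qualname__}:{self.scale!r}:{sorted(self.call_kwargs.items())!r}"
        return hashlib.sha256(raw.encode()).hexdigest()

    async def _get_values_for_trajectory(self, traj):
        self.n_evaluations += 1
        # shape (n_frames, n_dim_function) with n_dim_function == 1
        return [[self.scale * sum(frame)] for frame in traj]


traj = [[1.0, 2.0], [3.0, 4.0]]  # two frames, stand-in for a Trajectory
wrapper = FrameSumWrapper(scale=2.0)
values = asyncio.run(wrapper(traj))
values_again = asyncio.run(wrapper(traj))  # served from the toy cache
```

Because the identifier depends on scale, a wrapper created with a different scale gets a different id and therefore never reuses these cached values.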

Writing a MDEngine class#

All molecular dynamics engines must subclass the abstract base class asyncmd.mdengine.MDEngine, which defines the common interface expected from all asyncmd engine classes.

In addition, the module asyncmd.mdengine defines exceptions that the engines should raise when applicable. Currently defined are EngineError and EngineCrashedError, both documented below.

Note

You should also register and implement the corresponding utility functions for your engine.

class asyncmd.mdengine.MDEngine#

Abstract base class to define a common interface for all MD engines.

abstractmethod async apply_constraints(conf_in: Trajectory, conf_out_name: str, *, workdir: str = '.') → Trajectory#

Apply constraints to given conf_in, write conf_out_name and return it.

Parameters:
  • conf_in (Trajectory)

  • conf_out_name (str)

  • workdir (str)

Return type:

Trajectory

abstractmethod async prepare(starting_configuration: Trajectory, workdir: str, deffnm: str) → None#

Prepare the engine to run a MD from starting_configuration.

NOTE: We assume that the system does not change for/in one engine, i.e. everything defining the system (.top, .ndx, mdp-object, …) should go into __init__.

Parameters:
  • starting_configuration (Trajectory) – The initial configuration.

  • workdir (str) – The directory in which the MD will be performed.

  • deffnm (str) – The standard filename to use for this MD run.

abstractmethod async prepare_from_files(workdir: str, deffnm: str) → None#

Prepare the engine to continue a previously stopped simulation starting with the last trajectory part in workdir that is compatible with deffnm.

NOTE: This cannot (reliably) be a classmethod because we set top/ndx/mdconfig/etc. in ‘__init__’.

Parameters:
  • workdir (str) – The directory in which the MD will be/was previously performed.

  • deffnm (str) – The standard filename to use for this MD run.

abstractmethod async run_steps(nsteps: int, steps_per_part: bool = False) → Trajectory | None#

Run for a specified number of integration steps.

Return None if no integration is needed because nsteps integration steps have already been performed.

NOTE: Make sure we can run multiple times after preparing once!

Parameters:
  • nsteps (int)

  • steps_per_part (bool, optional) – If True, count nsteps for this part/run only; if False, count nsteps in total (including steps already performed). By default False.

Return type:

Trajectory | None

abstractmethod async run_walltime(walltime: float, max_steps: int | None = None) → Trajectory | None#

Run for specified walltime.

NOTE: Must be possible to run this multiple times after preparing once!

It is optional (but recommended if possible) for engines to respect the max_steps argument, i.e. terminating upon reaching max_steps is optional and no code should rely on it. See run_steps() if a fixed number of integration steps is required.

Return None if no integration is needed because max_steps integration steps have already been performed.

Parameters:
  • walltime (float) – Walltime in hours.

  • max_steps (int | None, optional) – If not None, (optionally) terminate when max_steps integration steps in total are reached, also if this is before walltime is reached. By default None.

Return type:

Trajectory | None

abstract property current_trajectory: Trajectory | None#

Return the current trajectory: Trajectory or None.

abstract property output_traj_type: str#

Return a string with the ending (without “.”) of the trajectory type this engine uses.

NOTE: This should not be implemented as a property in subclasses, as it must be available at the class level too, i.e. cls.output_traj_type must also return the string. So it should simply be overwritten with a string holding the correct value. If your engine supports multiple output_traj_types, have a look at the descriptor implementation in asyncmd/tools.py (used, e.g., in asyncmd/gromacs/mdengine.py), which checks for allowed values (at least when set on an instance) but is also accessible from the class level, i.e. it behaves like a ‘classproperty’ (which is not a thing in Python).
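The descriptor idea mentioned in the note can be sketched as follows. This is not the implementation from asyncmd/tools.py; the names AllowedValuesAttribute and ToyEngine and the allowed values "xtc"/"trr" are assumptions chosen for illustration. The sketch only shows how an attribute can be readable at the class level while being validated when set on an instance.

```python
class AllowedValuesAttribute:
    """Hypothetical descriptor: readable on the class, validated on instances."""

    def __init__(self, default, allowed):
        self.allowed = tuple(allowed)
        if default not in self.allowed:
            raise ValueError(f"default {default!r} must be one of {self.allowed}")
        self.default = default

    def __set_name__(self, owner, name):
        # Per-instance values are stored under a private attribute name.
        self.private_name = "_" + name

    def __get__(self, obj, objtype=None):
        if obj is None:
            # Accessed on the class itself: return the class-level default.
            return self.default
        return getattr(obj, self.private_name, self.default)

    def __set__(self, obj, value):
        # Only assignments on instances pass through here and get checked.
        if value not in self.allowed:
            raise ValueError(f"{value!r} is not one of {self.allowed}")
        setattr(obj, self.private_name, value)


class ToyEngine:
    """Stand-in for an engine class supporting two trajectory types."""

    output_traj_type = AllowedValuesAttribute("xtc", allowed=("xtc", "trr"))


class_level_value = ToyEngine.output_traj_type  # works without an instance
engine = ToyEngine()
engine.output_traj_type = "trr"  # validated and stored on the instance only
```

Assigning a value outside the allowed tuple on an instance raises ValueError, while the class-level value stays untouched by instance assignments.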

abstract property steps_done: int#

Return the number of integration steps this engine has performed in total.

exception asyncmd.mdengine.EngineError#

Exception raised when something goes wrong with the (MD)-Engine.

exception asyncmd.mdengine.EngineCrashedError#

Exception raised when the (MD)-Engine crashes during a run.
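The step-counting contract of run_steps() can be sketched with a toy engine. This is a simplified stand-in under stated assumptions: a real engine would subclass asyncmd.mdengine.MDEngine, implement all abstract methods, and return Trajectory objects; here ToyEngine returns file names as strings, the file naming scheme is invented, and EngineError is a local placeholder for asyncmd.mdengine.EngineError.

```python
import asyncio


class EngineError(Exception):
    """Local placeholder for asyncmd.mdengine.EngineError."""


class ToyEngine:
    """Toy engine that only does the bookkeeping, no actual MD."""

    output_traj_type = "txt"  # plain class attribute, readable on the class

    def __init__(self):
        self.steps_done = 0
        self.current_trajectory = None
        self._prepared = False
        self._part = 0

    async def prepare(self, starting_configuration, workdir, deffnm):
        self.workdir, self.deffnm = workdir, deffnm
        self.steps_done = 0
        self._part = 0
        self._prepared = True

    async def run_steps(self, nsteps, steps_per_part=False):
        if not self._prepared:
            raise EngineError("prepare() must be called before running.")
        if not steps_per_part:
            # nsteps counts in total: only run what is still missing.
            nsteps = nsteps - self.steps_done
        if nsteps <= 0:
            return None  # nothing left to integrate
        await asyncio.sleep(0)  # placeholder for awaiting the real MD run
        self.steps_done += nsteps
        self._part += 1
        self.current_trajectory = (
            f"{self.deffnm}.part{self._part:04d}.{self.output_traj_type}"
        )
        return self.current_trajectory


async def main():
    engine = ToyEngine()
    await engine.prepare(None, workdir=".", deffnm="toy")
    first = await engine.run_steps(100)  # runs 100 steps
    second = await engine.run_steps(100)  # already done in total
    third = await engine.run_steps(50, steps_per_part=True)  # 50 more steps
    return first, second, third, engine.steps_done


result = asyncio.run(main())
```

The second call returns None because 100 steps have already been performed in total, and the engine can be run repeatedly after a single prepare().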

Writing a MDConfig class for configuration file parsing and writing#

All molecular dynamics configuration file wrappers must subclass asyncmd.mdconfig.MDConfig. This class defines the two abstract methods parse and write as well as the dictionary-like interface by subclassing from collections.abc.MutableMapping.

However, most often you can probably subclass asyncmd.mdconfig.LineBasedMDConfig directly. This has the advantage that you only need to define the datatypes of the values (if you want them to be typed) and a function that knows how to parse single lines of the config file format. To this end you should overwrite the abstract method LineBasedMDConfig._parse_line in your subclass. The function will get single lines to parse (as str) and is expected to return the key, list of value(s) pair as a dict with one item, i.e. {key: list of value(s)}. If the line is parsed as a comment, the returned dict must be empty, i.e. {}. If the option/key is present but without associated value(s), the list in the dict must be empty, i.e. {key: []}.
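The return convention described above can be illustrated with a small standalone function. The line format used here ("key = value1 value2 ; comment") and the parameter names delimiter and comment_char are assumptions for this sketch; in a real subclass the logic would live in the _parse_line(self, line) method.

```python
def parse_line(line: str, delimiter: str = "=", comment_char: str = ";") -> dict:
    """Parse one config line into {key: [values]} (hypothetical format)."""
    # Drop everything after the comment character, then surrounding whitespace.
    content = line.split(comment_char, 1)[0].strip()
    if not content:
        # Empty line or pure comment: nothing to store.
        return {}
    key, _, raw_values = content.partition(delimiter)
    # Values are kept as strings here; an option without values gives [].
    return {key.strip(): raw_values.split()}


parsed_option = parse_line("nsteps = 5000 ; number of integration steps")
parsed_multi = parse_line("tc-grps = Protein SOL")
parsed_comment = parse_line("; just a comment")
parsed_no_value = parse_line("define =")
```

Here parsed_option is {"nsteps": ["5000"]}, parsed_multi is {"tc-grps": ["Protein", "SOL"]}, parsed_comment is {} and parsed_no_value is {"define": []}.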

class asyncmd.mdconfig.MDConfig#

Abstract base class only to define the interface.

abstractmethod parse()#

Should read original file and populate self with key, value pairs.

abstractmethod write(outfile)#

Should write out current config stored in self to outfile.

class asyncmd.mdconfig.LineBasedMDConfig(original_file: str)#

Abstract base class for line based parsing and writing.

Subclasses must implement _parse_line() method and should set the appropriate separator characters for their line format. We assume that every line/option can be parsed and written on its own! We assume the order of the options in the written file is not relevant! We represent every line/option with a key (str), list of values pair. Values can have a specific type (e.g. int or float) or default to str.

Initialize a LineBasedMDConfig.

Parameters:

original_file (str) – Path to original config file (absolute or relative).

abstractmethod _parse_line(line: str) → dict#

Parse a line of the configuration file and return a dict.

Parameters:

line (str) – A single line of the read-in configuration file

Returns:

parsed – Dictionary with a single (key, list of value(s)) pair representing the parsed line.

Return type:

dict

property original_file: str#

Return the original config file this LineBasedMDConfig parsed.

Returns:

Path to the original file.

Return type:

str

property changed: bool#

Indicate if the current configuration differs from original_file.

Returns:

Whether we changed the configuration w.r.t. original_file.

Return type:

bool

parse()#

Parse the current self.original_file to update own state.

write(outfile: str, overwrite: bool = False) → None#

Write current configuration to outfile.

Parameters:
  • outfile (str) – Path to outfile (relative or absolute).

  • overwrite (bool, optional) – If True overwrite existing files, by default False.

Raises:

ValueError – Raised when overwrite=False but outfile exists.
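The combination of a dictionary-like interface with parse() and write() can be sketched as below. ToyConfig is a hypothetical stand-in that subclasses collections.abc.MutableMapping directly (a real class would subclass asyncmd.mdconfig.MDConfig or LineBasedMDConfig), and the "key = values" file format is an assumption. The write() method mirrors the documented behaviour of refusing to overwrite existing files unless overwrite=True.

```python
import os
import tempfile
from collections.abc import MutableMapping


class ToyConfig(MutableMapping):
    """Hypothetical stand-in for an MDConfig subclass."""

    def __init__(self, original_file):
        self.original_file = original_file
        self._config = {}
        self.parse()

    # The five methods below give us the full dict-like interface.
    def __getitem__(self, key):
        return self._config[key]

    def __setitem__(self, key, value):
        self._config[key] = value

    def __delitem__(self, key):
        del self._config[key]

    def __iter__(self):
        return iter(self._config)

    def __len__(self):
        return len(self._config)

    def parse(self):
        """Read original_file and populate self with key, values pairs."""
        self._config.clear()
        with open(self.original_file) as infile:
            for line in infile:
                key, sep, values = line.partition("=")
                if sep:
                    self._config[key.strip()] = values.split()

    def write(self, outfile, overwrite=False):
        """Write the current configuration, refusing to clobber by default."""
        if os.path.exists(outfile) and not overwrite:
            raise ValueError(f"{outfile} exists and overwrite=False.")
        with open(outfile, "w") as out:
            for key, values in self._config.items():
                out.write(f"{key} = {' '.join(str(v) for v in values)}\n")


with tempfile.TemporaryDirectory() as tmpdir:
    original = os.path.join(tmpdir, "in.cfg")
    with open(original, "w") as handle:
        handle.write("nsteps = 100\ndt = 0.002\n")
    config = ToyConfig(original)
    config["nsteps"] = [5000]  # modify like a dict
    outfile = os.path.join(tmpdir, "out.cfg")
    config.write(outfile)
    try:
        config.write(outfile)  # second write without overwrite=True
        refused = False
    except ValueError:
        refused = True
    config.write(outfile, overwrite=True)
    roundtrip = dict(ToyConfig(outfile))
```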

SLURM interface classes#

Note

The function below is an alias for/imported from asyncmd.slurm.process.create_slurmprocess_submit(). It is recommended/preferred to use asyncmd.slurm.create_slurmprocess_submit().

async asyncmd.slurm.create_slurmprocess_submit(jobname: str, sbatch_script: str, *, workdir: str | None = None, time: float | None = None, sbatch_options: dict | None = None, stdin: str | None = None, **kwargs)#

Create and submit a SlurmProcess.

All arguments are passed directly through to SlurmProcess initialization and SlurmProcess.submit(). All additional keyword arguments are passed to SlurmProcess initialization.

Parameters:
  • jobname (str) – SLURM jobname (--job-name).

  • sbatch_script (str) – Absolute or relative path to a SLURM submission script.

  • workdir (str or None) – Absolute or relative path to use as working directory. None will result in using the current directory as workdir.

  • time (float or None) – Timelimit for the job in hours. None will result in using the default as either specified in the sbatch script or the partition.

  • sbatch_options (dict or None) – Dictionary of sbatch options, keys are long names for options, values are the corresponding values. The keys/long names are given without the dashes, e.g. to specify --mem=1024 the dictionary needs to be {"mem": "1024"}. To specify options without values use keys with empty strings as values, e.g. to specify --contiguous the dictionary needs to be {"contiguous": ""}. See the SLURM documentation for a full list of sbatch options (https://slurm.schedmd.com/sbatch.html).

  • stdin (str or None) – If given, it is interpreted as a file to which we connect the batch script's stdin via sbatch's --input option. This enables sending data to the process's stdin via communicate(). Note that if you want to send data to the process, it has to be submitted with stdin set.

  • kwargs (dict, optional) – Additional keyword arguments to be passed to SlurmProcess.__init__.

Returns:

The submitted slurm process instance.

Return type:

SlurmProcess
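A typical call sequence might look like the following sketch, which uses only the functions and parameters documented on this page. The jobname "example", the script name "submit.slurm", and the option values are placeholders. The import sits inside the coroutine so that the snippet can be loaded on machines without asyncmd or SLURM; actually running it requires both.

```python
import asyncio


async def run_job(sbatch_script: str, workdir: str = "."):
    """Submit a job, wait for it, and return (returncode, stdout, stderr)."""
    # Lazy import: only needed when the coroutine is actually awaited.
    from asyncmd.slurm import create_slurmprocess_submit

    proc = await create_slurmprocess_submit(
        jobname="example",
        sbatch_script=sbatch_script,
        workdir=workdir,
        time=0.5,  # timelimit in hours
        sbatch_options={"mem": "1024", "contiguous": ""},
    )
    # Waits for the job to finish and reads its stdout/stderr files.
    stdout, stderr = await proc.communicate()
    return proc.returncode, stdout, stderr


def main():
    """Entry point for use on a cluster; not called automatically here."""
    return asyncio.run(run_job("submit.slurm"))
```

As with asyncio.create_subprocess_exec(), the returned object can also be awaited via wait() instead of communicate() if the output is not needed.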

class asyncmd.slurm.SlurmProcess(jobname: str, sbatch_script: str, *, workdir: str | None = None, time: float | None = None, sbatch_options: dict | None = None, **kwargs)#

Generic wrapper around SLURM submissions.

Imitates the interface of asyncio.subprocess.Process.

sbatch_executable#

Name or path to the sbatch executable, by default “sbatch”.

Type:

str

scancel_executable#

Name or path to the scancel executable, by default “scancel”.

Type:

str

sleep_time#

Time (in seconds) between checks if the underlying job has finished when using self.wait. By default 15 s.

Type:

int

stdfiles_removal#

Whether to remove the stdout, stderr (and possibly stdin) files. Possible values are:

  • “success”: remove on successful completion (i.e. zero returncode)

  • “no”: never remove

  • “yes”/“always”: remove on job completion independent of returncode and also when using terminate()

By default “success”.

Type:

str

Initialize a SlurmProcess.

Note that you can set all attributes by passing matching init kwargs with the wanted values.

Parameters:
  • jobname (str) – SLURM jobname (--job-name).

  • sbatch_script (str) – Absolute or relative path to a SLURM submission script.

  • workdir (str or None) – Absolute or relative path to use as working directory. None will result in using the current directory as workdir.

  • time (float or None) – Timelimit for the job in hours. None will result in using the default as either specified in the sbatch script or the partition.

  • sbatch_options (dict or None) – Dictionary of sbatch options, keys are long names for options, values are the corresponding values. The keys/long names are given without the dashes, e.g. to specify --mem=1024 the dictionary needs to be {"mem": "1024"}. To specify options without values use keys with empty strings as values, e.g. to specify --contiguous the dictionary needs to be {"contiguous": ""}. See the SLURM documentation for a full list of sbatch options (https://slurm.schedmd.com/sbatch.html).

Raises:

TypeError – If the value set via init kwarg for an attribute does not match the default/original type for that attribute.
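The sbatch_options convention (long option names without dashes, empty strings for options without values) can be made concrete with a small helper. The function sbatch_options_to_args is hypothetical and only illustrates how such a dictionary maps onto sbatch command line arguments; it is not how asyncmd necessarily builds the command internally.

```python
def sbatch_options_to_args(sbatch_options: dict) -> list:
    """Translate an sbatch_options dict into a list of command line arguments."""
    args = []
    for key, value in sbatch_options.items():
        if value == "":
            # Option without a value, e.g. {"contiguous": ""} -> --contiguous
            args.append(f"--{key}")
        else:
            # Option with a value, e.g. {"mem": "1024"} -> --mem=1024
            args.append(f"--{key}={value}")
    return args


args = sbatch_options_to_args({"mem": "1024", "contiguous": ""})
```

With the dictionary above, args is ["--mem=1024", "--contiguous"].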

async communicate(input: bytes | None = None) → tuple[bytes, bytes]#

Interact with process. Optionally send data to the process. Wait for the process to finish, then read from stdout and stderr (files) and return the data.

Parameters:

input (bytes or None, optional) – The input data to send to the process, by default None. Note that you can only send data to processes created/submitted with stdin set.

Returns:

(stdout, stderr)

Return type:

tuple[bytes, bytes]

Raises:
  • RuntimeError – If the job has never been submitted.

  • ValueError – If input is not None but the process was created without stdin set.

del_sbatch_option(key: str) → None#

Delete sbatch option with given key from sbatch_options.

Parameters:

key (str) – The name of the sbatch option to delete.

kill() → None#

Alias for terminate().

send_signal(signal: int) → None#

Send signal to the underlying slurm job.

Currently not implemented!

set_sbatch_option(key: str, value: str) → None#

Set sbatch option with given key to value.

I.e. add/modify single key, value pair in sbatch_options.

Parameters:
  • key (str) – The name of the sbatch option.

  • value (str) – The value for the sbatch option.

async submit(stdin: str | None = None) → None#

Submit the job via sbatch.

Parameters:

stdin (str or None) – If given, it is interpreted as a file to which we connect the batch script's stdin via sbatch's --input option. This enables sending data to the process's stdin via communicate(). Note that if you want to send data to the process, it has to be submitted with stdin set.

Raises:
  • RuntimeError – If the job has already been submitted.

  • SlurmSubmissionError – If something goes wrong during the submission with sbatch.

  • CancelledError – (Re)raises CancelledError if cancelled while awaiting the sbatch submission.

terminate() → None#

Terminate (cancel) the underlying SLURM job.

Raises:
  • SlurmCancellationError – If scancel has non-zero returncode.

  • RuntimeError – If no jobid is known, e.g. because the job was never submitted.

async wait() → int#

Wait for the SLURM job to finish. Set and return the returncode.

Returns:

returncode of the wrapped SLURM job

Return type:

int

Raises:

RuntimeError – If the job has never been submitted.
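The polling behaviour controlled by sleep_time can be sketched generically. The coroutine wait_for_job and the get_returncode callable are hypothetical; the real wait() obtains the job state through the SlurmClusterMediator, which is replaced here by a scripted sequence of states so the sketch runs without SLURM.

```python
import asyncio


async def wait_for_job(get_returncode, sleep_time: float = 15.0) -> int:
    """Poll until get_returncode() yields something other than None."""
    while True:
        returncode = get_returncode()
        if returncode is not None:
            return returncode
        # Yield to the event loop so other jobs can progress while we wait.
        await asyncio.sleep(sleep_time)


# Scripted job: still running on the first two checks, then finished with 0.
states = iter([None, None, 0])
returncode = asyncio.run(wait_for_job(lambda: next(states), sleep_time=0.01))
```

Because the waiting happens in asyncio.sleep(), many such jobs can be awaited concurrently from a single thread.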

property nodes: list[str] | None#

The nodes this job runs on.

property returncode: int | None#

The returncode this job returned (if finished).

property sbatch_options: dict#

A copy of the sbatch_options dictionary.

Note that modifying single key, value pairs of the returned copy has no effect. To modify (single) sbatch_options either use the set_sbatch_option and del_sbatch_option methods or (re)assign a dictionary to sbatch_options.
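Why item assignment on the property has no effect can be seen in a minimal stand-in. The class OptionsHolder is hypothetical and only mimics the documented behaviour (the property returns a copy, the dedicated methods change the stored dictionary); the option values are placeholders.

```python
class OptionsHolder:
    """Hypothetical stand-in mimicking the sbatch_options handling."""

    def __init__(self, sbatch_options=None):
        self._sbatch_options = dict(sbatch_options or {})

    @property
    def sbatch_options(self) -> dict:
        # Hand out a copy so callers cannot change the internal state.
        return self._sbatch_options.copy()

    @sbatch_options.setter
    def sbatch_options(self, value: dict):
        self._sbatch_options = dict(value)

    def set_sbatch_option(self, key: str, value: str) -> None:
        self._sbatch_options[key] = value

    def del_sbatch_option(self, key: str) -> None:
        del self._sbatch_options[key]


proc = OptionsHolder({"mem": "1024"})
proc.sbatch_options["partition"] = "debug"  # changes a temporary copy only
after_item_assignment = proc.sbatch_options
proc.set_sbatch_option("partition", "debug")  # changes the stored options
proc.del_sbatch_option("mem")
after_methods = proc.sbatch_options
proc.sbatch_options = {"mem": "2048"}  # (re)assigning the complete dict works
after_reassign = proc.sbatch_options
```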

property slurm_job_state: str | None#

The slurm jobstate of this job.

property slurm_jobid: str | None#

The slurm jobid of this job.

property stdfiles_removal: str#

Whether/when we remove stdfiles created by SLURM.

Can be one of “success”, “no”, “yes”, “always”, where “yes” and “always” are synonyms for always remove. “success” means remove stdfiles if the slurm-job was successful and “no” means never remove.
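The removal policy described above amounts to a small decision function. The helper should_remove_stdfiles is hypothetical and only restates the documented semantics of the four allowed values in code.

```python
def should_remove_stdfiles(stdfiles_removal: str, returncode: int) -> bool:
    """Decide whether the stdout/stderr/stdin files should be removed."""
    if stdfiles_removal in ("yes", "always"):
        # Synonyms: remove independent of how the job ended.
        return True
    if stdfiles_removal == "success":
        return returncode == 0
    if stdfiles_removal == "no":
        return False
    raise ValueError(f"Unknown stdfiles_removal value: {stdfiles_removal!r}")


keep_after_failure = not should_remove_stdfiles("success", returncode=1)
remove_after_success = should_remove_stdfiles("success", returncode=0)
```

With the default “success”, the files of failed jobs are kept, which is usually what you want for debugging.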

property time: float | None#

Timelimit for SLURM job in hours.

Can be a float or None (meaning do not specify a timelimit).

class asyncmd.slurm.cluster_mediator.SlurmClusterMediator(**kwargs)#

Singleton class to be used by all SlurmProcess instances for sacct/sinfo calls.

sinfo_executable#

Name or path to the sinfo executable, by default “sinfo”.

Type:

str

sacct_executable#

Name or path to the sacct executable, by default “sacct”.

Type:

str

min_time_between_sacct_calls#

Minimum time (in seconds) between subsequent sacct calls.

Type:

int

num_fails_for_broken_node#

Number of failed jobs we need to observe per node before declaring it to be broken (and not submitting any more jobs to it).

Type:

int

success_to_fail_ratio#

Number of successful jobs we need to observe per node to decrease the failed job counter by one.

Type:

int
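How num_fails_for_broken_node and success_to_fail_ratio could interact is sketched below. NodeHealthTracker, its method names, and the node name are assumptions made for illustration; the actual bookkeeping inside SlurmClusterMediator may differ in its details. The sketch follows the attribute descriptions: a node is excluded once enough failures have accumulated, and every success_to_fail_ratio successful jobs reduce the failure counter by one.

```python
from collections import Counter


class NodeHealthTracker:
    """Hypothetical per-node bookkeeping of failed and successful jobs."""

    def __init__(self, num_fails_for_broken_node: int, success_to_fail_ratio: int):
        self.num_fails_for_broken_node = num_fails_for_broken_node
        self.success_to_fail_ratio = success_to_fail_ratio
        self._fails = Counter()
        self._successes = Counter()
        self.exclude_nodes = set()

    def note_job_fail(self, node: str) -> None:
        self._fails[node] += 1
        if self._fails[node] >= self.num_fails_for_broken_node:
            # Too many failures: stop submitting jobs to this node.
            self.exclude_nodes.add(node)

    def note_job_success(self, node: str) -> None:
        if self._fails[node] == 0:
            return  # nothing to compensate for
        self._successes[node] += 1
        if self._successes[node] >= self.success_to_fail_ratio:
            # Enough successes observed: forgive one earlier failure.
            self._successes[node] = 0
            self._fails[node] -= 1


tracker = NodeHealthTracker(num_fails_for_broken_node=2, success_to_fail_ratio=3)
tracker.note_job_fail("node01")
for _ in range(3):
    tracker.note_job_success("node01")  # three successes cancel one failure
tracker.note_job_fail("node01")  # counter is back at one failure
usable_after_recovery = "node01" not in tracker.exclude_nodes
tracker.note_job_fail("node01")  # second outstanding failure
excluded = set(tracker.exclude_nodes)
```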

async get_info_for_job(jobid: str) → dict[str, Any]#

Retrieve and return info for job with given jobid.

Parameters:

jobid (str) – The SLURM jobid of the queried job.

Returns:

Dictionary with information about the job. The keys (str) are sacct format fields, the values are the (parsed) corresponding values.

Return type:

dict

list_all_nodes() → list[str]#

List all nodes (hostnames) in the SLURM cluster this runs on.

Returns:

List of all nodes (hostnames) queried from sinfo.

Return type:

list[str]

monitor_register_job(jobid: str) → None#

Add job with given jobid to sacct monitoring calls.

Parameters:

jobid (str) – The SLURM jobid of the job to monitor.

monitor_remove_job(jobid: str) → None#

Remove job with given jobid from sacct monitoring calls.

Parameters:

jobid (str) – The SLURM jobid of the job to remove.

property exclude_nodes: set[str]#

Return a set with all nodes excluded from job submissions.

exception asyncmd.slurm.constants_and_errors.SlurmError#

Generic error superclass for all SLURM errors.

exception asyncmd.slurm.constants_and_errors.SlurmCancellationError#

Error raised when something goes wrong canceling a SLURM job.

exception asyncmd.slurm.constants_and_errors.SlurmSubmissionError#

Error raised when something goes wrong submitting a SLURM job.