Source code for pyhdtoolkit.cpymadtools.tune

"""
.. _cpymadtools-tune:

Tune Utilities
--------------

Module with functions to manipulate ``MAD-X`` functionality around the tune through
a `~cpymad.madx.Madx` object.
"""
import math
import sys

from pathlib import Path
from typing import Dict, List, Tuple

import matplotlib.collections
import matplotlib.patches
import numpy as np
import tfs

from cpymad.madx import Madx
from loguru import logger


[docs]def make_footprint_table( madx: Madx, /, sigma: float = 5, dense: bool = False, file: str = None, cleanup: bool = True, **kwargs ) -> tfs.TfsDataFrame: """ .. versionadded:: 0.9.0 Instantiates an ensemble of particles up to the desired bunch :math:`\\sigma` amplitude to be tracked for the ``DYNAP`` command, letting ``MAD-X`` infer their tunes. Particules are instantiated for different angle variables for each amplitude, creating an ensemble able to represent the tune footprint. .. warning:: Since the ``DYNAP`` command makes use of tracking, your sequence needs to be sliced before calling this function. Args: madx (cpymad.madx.Madx): an instanciated `~cpymad.madx.Madx` object. Positional only. sigma (float): the maximum amplitude of the tracked particles, in bunch :math:`\\sigma`. Defaults to 5. dense (bool): if set to `True`, an increased number of particles will be tracked. Defaults to `False`. file (str): If given, the ``dynaptune`` table will be exported as a ``TFS`` file with the provided name. cleanup (bool): If `True`, the **fort.69** and **lyapunov.data** files are cleared before returning the ``dynaptune`` table. Defaults to `True`. **kwargs: any keyword argument will be transmitted to the ``DYNAP`` command in ``MAD-X``. Returns: The resulting ``dynaptune`` table, as a `~tfs.frame.TfsDataFrame`. Example: .. code-block:: python dynap_dframe = make_footprint_table(madx) """ logger.debug(f"Initiating particules up to {sigma:d} bunch sigma to create a tune footprint table") small, big = 0.05, math.sqrt(1 - 0.05**2) sigma_multiplier, angle_multiplier = 0.1, 0.0 logger.debug("Initializing particles") madx.command.track() madx.command.start(fx=small, fy=small) while sigma_multiplier <= sigma + 1: angle = 15 * angle_multiplier * math.pi / 180 if angle_multiplier == 0: madx.command.start(fx=sigma_multiplier * big, fy=sigma_multiplier * small) elif angle_multiplier == 6: madx.command.start(fx=sigma_multiplier * small, fy=sigma_multiplier * big) else: madx.command.start(fx=sigma_multiplier * math.cos(angle), fy=sigma_multiplier * math.sin(angle)) angle_multiplier += 0.5 if int(angle_multiplier) == 7: angle_multiplier = 0 sigma_multiplier += 1 if not dense else 0.5 logger.debug("Starting DYNAP tracking with initialized particles") try: madx.command.dynap(fastune=True, turns=1024, **kwargs) madx.command.endtrack() except RuntimeError as madx_crash: logger.exception( "Remote MAD-X process crashed, most likely because you did not slice the sequence " "before running DYNAP. Restart and slice before calling this function." ) raise RuntimeError("DYNAP command crashed the MAD-X process") from madx_crash if cleanup and sys.platform not in ("win32", "cygwin"): # fails on Windows due to its I/O system, since MAD-X still has "control" of the files try: logger.debug("Cleaning up DYNAP output files `fort.69` and `lyapunov.data`") Path("fort.69").unlink() Path("lyapunov.data").unlink() except FileNotFoundError: logger.exception("Could not cleanup DYNAP output files, they might have not been created") tfs_dframe = tfs.TfsDataFrame( data=madx.table.dynaptune.dframe(), headers=dict( NAME="DYNAPTUNE", TYPE="DYNAPTUNE", TITLE="FOOTPRINT TABLE", MADX_VERSION=str(madx.version).upper(), ORIGIN="pyhdtoolkit.cpymadtools.tune.make_footprint_table() function", ANGLE=7, # default of the function AMPLITUDE=sigma, DSIGMA=1 if not dense else 0.5, ANGLE_MEANING="Number of different starting angles used for each starting amplitude", AMPLITUDE_MEANING="Up to which bunch sigma the starting amplitudes were ramped up", DSIGMA_MEANING="Increment value of AMPLITUDE at each new starting amplitude", ), ) tfs_dframe = tfs_dframe.reset_index(drop=True) if file: tfs.write(Path(file).absolute(), tfs_dframe) return tfs_dframe
[docs]def get_footprint_lines(dynap_dframe: tfs.TfsDataFrame) -> Tuple[np.ndarray, np.ndarray]: """ .. versionadded:: 0.12.0 Provided with the `~tfs.frame.TfsDataFrame` returned by `~.tune.make_footprint_table`, determines the various (Qx, Qy) points needed to plot the footprint data with lines representing the different amplitudes and angles from starting particles, and returns these in immediately plottable `numpy.ndarray` objects. .. warning:: This function is some *dark magic* stuff I have taken out of very dusty drawers, and I cannot explain exactly how it works. I also do not know who wrote this initially. Results are not guaranteed to be correct and should be checked with a quick plot. Args: dynap_dframe (tfs.frame.TfsDataFrame): the dynap data frame returned by `~.tune.make_footprint_table`. Returns: The Qx and Qy data points to plot directly, both as `~numpy.ndarray` objects. Example: .. code-block:: python dynap_tfs = make_footprint_table(madx) qxs, qys = get_footprint_lines(dynap_tfs) plt.plot(qxs, qys, "o--", label="Tune Footprint from DYNAP Table") """ logger.debug("Determining footprint plottable") logger.debug("Retrieving AMPLITUDE, ANGLE and DSIGMA data from TfsDataFrame headers") amplitude = dynap_dframe.headers["AMPLITUDE"] angle = dynap_dframe.headers["ANGLE"] dsigma = dynap_dframe.headers["DSIGMA"] tune_groups = _make_tune_groups(dynap_string_rep=_get_dynap_string_rep(dynap_dframe), dsigma=dsigma) footprint = _Footprint(tune_groups, amplitude, angle, dsigma) qxs, qys = footprint.get_plottable() return np.array(qxs, dtype=float), np.array(qys, dtype=float)
[docs]def get_footprint_patches(dynap_dframe: tfs.TfsDataFrame) -> matplotlib.collections.PatchCollection: """ .. versionadded:: 0.12.0 Provided with the `~tfs.frame.TfsDataFrame` returned by `~.tune.make_footprint_table`, computes the polygon patches needed to plot the footprint data, with lines representing the different amplitudes and angles from starting particles, and returns the `~matplotlib.collections.PatchCollection` with the computed polygons. Initial implementation credits go to :user:`Konstantinos Paraschou <kparasch>`. .. note:: The polygons will have blue edges, except the ones corresponding to the last starting angle particles (in red) and the last starting amplitude particles (in green). .. warning:: The internal construction of polygons can be tricky, and you might need to change the ``ANGLE`` or ``AMPLITUDE`` values in *dynap_dframe* headers. Args: dynap_dframe (tfs.frame.TfsDataFrame): the dynap data frame returned by `~.tune.make_footprint_table`. Returns: The `~matplotlib.collections.PatchCollection` with the created polygons. Example: .. code-block:: python fig, axis = plt.subplots() dynap_tfs = make_footprint_table(madx) footprint_polygons = get_footprint_patches(dynap_tfs) axis.add_collection(footprint_polygons) """ logger.debug("Determining footprint polygons") angle = dynap_dframe.headers["ANGLE"] amplitude = dynap_dframe.headers["AMPLITUDE"] logger.debug("Grouping tune points according to starting angles and amplitudes") try: A = np.zeros([amplitude, angle, 2]) A[0, :, 0] = dynap_dframe["tunx"].to_numpy()[0] A[0, :, 1] = dynap_dframe["tuny"].to_numpy()[0] A[1:, :, 0] = dynap_dframe["tunx"].to_numpy()[1:].reshape(-1, angle) A[1:, :, 1] = dynap_dframe["tuny"].to_numpy()[1:].reshape(-1, angle) except ValueError as tune_grouping_error: logger.exception( "Cannot group tune points according to starting angles and amplitudes. Try changing " "the 'AMPLITUDE' value in the provided TfsDataFrame's headers." ) raise tune_grouping_error logger.debug("Determining polygon vertices") sx = A.shape[0] - 1 sy = A.shape[1] - 1 p1 = A[:-1, :-1, :].reshape(sx * sy, 2)[:, :] p2 = A[1:, :-1, :].reshape(sx * sy, 2)[:] p3 = A[1:, 1:, :].reshape(sx * sy, 2)[:] p4 = A[:-1, 1:, :].reshape(sx * sy, 2)[:] polygons = np.stack((p1, p2, p3, p4)) # Stack endpoints to form polygons polygons = np.transpose(polygons, (1, 0, 2)) # transpose polygons logger.debug("Creating PatchCollection of Polygons") patches = list(map(matplotlib.patches.Polygon, polygons)) patch_colors = [(0, 0, 1) for _ in polygons] patch_colors[(sx - 1) * sy :] = [(0, 1, 0)] * sy # differentiate first angle in green patch_colors[(sy - 1) :: sy] = [(1, 0, 0)] * sx # differentiate last amplitude in red return matplotlib.collections.PatchCollection(patches, facecolors=[], edgecolor=patch_colors)
# ----- Arcane Private Utilities ----- # def _get_dynap_string_rep(dynap_dframe: tfs.TfsDataFrame) -> str: """ This is a weird dusty function to get a specific useful string representation from the `~tfs.frame.TfsDataFrame` returned by `~.tune.make_footprint_table`. This specific dataframe contains important information. Args: dynap_dframe (tfs.frame.TfsDataFrame): the dynap data frame returned by `~.tune.make_footprint_table`. Returns: A weird string representation gathering tune points split according to the number of angles and amplitudes used in `~.tune.make_footprint_table`. """ logger.trace("Retrieving AMPLITUDE and ANGLE data from TfsDataFrame headers") amplitude = dynap_dframe.headers["AMPLITUDE"] angle = dynap_dframe.headers["ANGLE"] string_rep = f"TMPNAME,{amplitude},1,<{dynap_dframe.tunx[0]};{dynap_dframe.tuny[0]}>" for n in range(1, amplitude): string_rep += f",{angle}" for m in range(angle): string_rep += ( f",<{dynap_dframe.tunx[1 + (n - 1) * angle + m]};{dynap_dframe.tuny[1 + (n - 1) * angle + m]}>" ) return string_rep def _make_tune_groups(dynap_string_rep: str, dsigma: float = 1.0) -> List[List[Dict[str, float]]]: """ Creates appropriate tune points groups from the arcane string representation returned by `~.tune._get_dynap_string_rep` based on starting amplitude and angle for each particle. Args: dynap_string_rep (str): weird string representation of the `~tfs.frame.TfsDataFrame` returned by `~.tune.make_footprint_table` and as given by `~.tune._get_dynap_string_rep()`. dsigma (float): The increment in amplitude between different starting amplitudes when starting particles for the ``DYNAP`` command in ``MAD-X``. This information is in the headers of the `~tfs.frame.TfsDataFrame` returned by `~.tune.make_footprint_table`. Returns: A `list` of lists of dictionaries containing horizontal and vertical tune points. The data is constructed such that one can access the data of a particle starting with given amplitude and angle through ``data[amplitude][angle]["H"/"V"]``. This function is only meant to be used internally by `~.tune.get_footprint_lines`. """ logger.debug("Constructing tune points groups based on starting amplitudes and angles") tune_groups: List[List[Dict[str, float]]] = [] items = dynap_string_rep.strip().split(",") amplitude = int(items[1]) current = 2 for i in np.arange(amplitude): tune_groups.append([]) angle = int(items[current]) current = current + 1 for j in np.arange(angle): tune_groups[i].append([]) tune_string = items[current].lstrip("<").rstrip(">").split(";") tune_groups[i][j] = {"H": float(tune_string[0]), "V": float(tune_string[1])} current = current + 1 return tune_groups class _Footprint: """More dark magic from the past here, close your eyes my friends.""" def __init__(self, tune_groups: List[List[Dict[str, float]]], amplitude: int, angle: int, dsigma: float): self._tunes = tune_groups self._maxnangl = angle self._nampl = amplitude self._dSigma = dsigma def get_h_tune(self, ampl, angl): if len(self._tunes[ampl]) <= angl < self._maxnangl: return self._tunes[ampl][len(self._tunes[ampl]) - 1]["H"] else: return self._tunes[ampl][angl]["H"] def get_v_tune(self, ampl, angl): if len(self._tunes[ampl]) <= angl < self._maxnangl: return self._tunes[ampl][len(self._tunes[ampl]) - 1]["V"] else: return self._tunes[ampl][angl]["V"] def get_plottable(self) -> Tuple[List[float], List[float]]: qxs, qys = [], [] for i in np.arange(0, self._nampl - 1, 2): for j in np.arange(self._maxnangl): qxs.append(self.get_h_tune(i, j)) qys.append(self.get_v_tune(i, j)) for j in np.arange(self._maxnangl - 1, -1, -1): qxs.append(self.get_h_tune(i + 1, j)) qys.append(self.get_v_tune(i + 1, j)) if self._nampl % 2 == 0: for j in np.arange(0, self._maxnangl - 1, 2): for i in np.arange(self._nampl - 1, -1, -1): qxs.append(self.get_h_tune(i, j)) qys.append(self.get_v_tune(i, j)) for i in np.arange(0, self._nampl, 1): qxs.append(self.get_h_tune(i, j + 1)) qys.append(self.get_v_tune(i, j + 1)) if self._maxnangl % 2 != 0: for i in np.arange(self._nampl - 1, -1, -1): qxs.append(self.get_h_tune(i, self._maxnangl - 1)) qys.append(self.get_v_tune(i, self._maxnangl - 1)) qxs.append(self.get_h_tune(0, self._maxnangl - 2)) qys.append(self.get_v_tune(0, self._maxnangl - 2)) else: for j in np.arange(self._maxnangl): qxs.append(self.get_h_tune(self._nampl - 1, j)) qys.append(self.get_v_tune(self._nampl - 1, j)) for j in np.arange(self._maxnangl - 1, -1, -2): for i in np.arange(self._nampl - 1, -1, -1): qxs.append(self.get_h_tune(i, j)) qys.append(self.get_v_tune(i, j)) for i in np.arange(0, self._nampl, 1): qxs.append(self.get_h_tune(i, j - 1)) qys.append(self.get_v_tune(i, j - 1)) return qxs, qys