API

starbox.calibrate

Radio telescope visibility calibration components.

This module provides classes and functions for calibrating radio telescope visibility data.

Classes:

Name       Description
Solutions  Class for handling calibration solutions.
Solver     Class for solving for calibration solutions.

Solutions dataclass

Class for handling calibration solutions.

Attributes:

Name                 Type     Description
station_phase_gains  ndarray  Phase gains for each station.

Source code in src/starbox/calibrate/solutions.py
@dataclass(slots=True)
class Solutions:
    """Class for handling calibration solutions.

    Attributes:
        station_phase_gains: Phase gains for each station.
    """

    station_phase_gains: np.ndarray

    def apply(self, visibilities: VisibilitySet) -> VisibilitySet:
        """Apply calibration solutions to visibilities."""

        # Placeholder implementation: return visibilities unchanged
        return visibilities

apply(visibilities)

Apply calibration solutions to visibilities.

Source code in src/starbox/calibrate/solutions.py
def apply(self, visibilities: VisibilitySet) -> VisibilitySet:
    """Apply calibration solutions to visibilities."""

    # Placeholder implementation: return visibilities unchanged
    return visibilities
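
apply is currently a placeholder that returns its input unchanged. The sketch below illustrates what applying per-station phase gains could look like. It assumes that a baseline between stations i and j is corrupted by g_i * conj(g_j) (the convention used by Corruptions in starbox.simulate) and that there is one complex gain per station with no time or frequency binning. The helper name apply_phase_gains and its arguments are hypothetical and are not part of starbox.

```python
import numpy as np


def apply_phase_gains(vis, station1, station2, gains):
    """Hypothetical helper: undo per-station gains on (T, B, F) visibilities.

    Assumes observed = g_i * conj(g_j) * true for a baseline (i, j), and that
    `gains` holds one complex gain per station (no time/frequency binning).
    """
    baseline_gain = gains[station1] * np.conj(gains[station2])  # (B,)
    # Broadcast the per-baseline factor over the time and channel axes.
    return vis / baseline_gain[np.newaxis, :, np.newaxis]


# Three stations give three baselines: (0, 1), (0, 2), (1, 2).
station1, station2 = np.triu_indices(3, k=1)
gains = np.exp(1j * np.array([0.0, 0.3, -0.2]))

# Corrupt unit visibilities, then correct them again.
true_vis = np.ones((2, 3, 4), dtype=np.complex128)
corruption = gains[station1] * np.conj(gains[station2])
observed = true_vis * corruption[np.newaxis, :, np.newaxis]
corrected = apply_phase_gains(observed, station1, station2, gains)
```

Dividing by the baseline factor is only one possible design; because the gains here are pure phases, multiplying by the conjugate factor would give the same result.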

Solver

Class to handle calibration solving.

Source code in src/starbox/calibrate/solver.py
class Solver:
    """Class to handle calibration solving."""

    def __init__(self, config: SolverConfig):
        self.config = config

    def solve(
        self,
        observed_visibilities: VisibilitySet,
        model_visibilities: VisibilitySet,
        n_stations: int,
    ) -> Solutions:
        """Estimate calibration solutions from observed and model visibilities."""

        n_timesteps, _, n_channels = observed_visibilities.vis.shape

        # Default: solve per timestep / per channel
        tbin = self.config.solution_interval_seconds or 1

        n_time_bins = int(np.ceil(n_timesteps / tbin))
        n_freq_bins = int(np.ceil(n_channels / 1))

        # Just return unity gains (phase = 0)
        gains = np.ones(
            (n_time_bins, n_freq_bins, n_stations),
            dtype=np.complex64,
        )
        return Solutions(station_phase_gains=gains)

solve(observed_visibilities, model_visibilities, n_stations)

Estimate calibration solutions from observed and model visibilities.

Source code in src/starbox/calibrate/solver.py
def solve(
    self,
    observed_visibilities: VisibilitySet,
    model_visibilities: VisibilitySet,
    n_stations: int,
) -> Solutions:
    """Estimate calibration solutions from observed and model visibilities."""

    n_timesteps, _, n_channels = observed_visibilities.vis.shape

    # Default: solve per timestep / per channel
    tbin = self.config.solution_interval_seconds or 1

    n_time_bins = int(np.ceil(n_timesteps / tbin))
    n_freq_bins = int(np.ceil(n_channels / 1))

    # Just return unity gains (phase = 0)
    gains = np.ones(
        (n_time_bins, n_freq_bins, n_stations),
        dtype=np.complex64,
    )
    return Solutions(station_phase_gains=gains)
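
To make the shape of the returned gains concrete, here is a small standalone sketch of the bin arithmetic in solve. The numbers are assumed example values. Note that solve divides the number of timesteps directly by solution_interval_seconds, so in this placeholder the setting effectively acts as a count of timesteps per bin.

```python
import numpy as np

# Example (T, B, F) visibility shape and station count (assumed values).
n_timesteps, n_baselines, n_channels = 10, 3, 4
n_stations = 3
tbin = 4  # divisor applied to the time axis

# ceil() means a final, partially filled bin still gets its own solution.
n_time_bins = int(np.ceil(n_timesteps / tbin))
n_freq_bins = int(np.ceil(n_channels / 1))  # one bin per channel

# Unity gains: amplitude 1 and phase 0 for every bin and station.
gains = np.ones((n_time_bins, n_freq_bins, n_stations), dtype=np.complex64)
print(gains.shape)  # (3, 4, 3)
```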

starbox.config

Module for configuration schemas.

Exports
  • ObservationConfig: Configuration schema for observations.
  • SkyModelConfig: Configuration schema for sky models.
  • TelescopeConfig: Configuration schema for telescopes.
  • TelescopeSiteConfig: Configuration schema for telescope sites.
  • CorruptionsConfig: Configuration schema for corruptions.
  • SolverConfig: Configuration schema for solvers.
  • ExperimentConfig: Configuration schema for experiments.

CorruptionsConfig

Bases: BaseModel

Configuration schema for the Corruptions.

Source code in src/starbox/config/corruptions.py
class CorruptionsConfig(BaseModel):
    """Configuration schema for the Corruptions."""

    seed: int = Field(ge=0)
    rms_noise: float = Field(ge=0)
    rms_phase_gain: float = Field(ge=0)

ExperimentConfig

Bases: BaseModel

Configuration for a simulation experiment.

Source code in src/starbox/config/experiment.py
class ExperimentConfig(BaseModel):
    """Configuration for a simulation experiment."""

    name: str = Field(..., description="Name of the experiment.")
    description: str | None = Field(
        None, description="Optional description of the experiment."
    )
    telescope: TelescopeConfig = Field(
        ..., description="Configuration for the telescope array."
    )
    skymodel: SkyModelConfig = Field(
        ..., description="Configuration for the sky model."
    )
    observation: ObservationConfig = Field(
        ..., description="Configuration for the observation."
    )
    corruptions: CorruptionsConfig = Field(
        ..., description="Configuration for the corruptions to apply."
    )
    solver: SolverConfig = Field(
        ..., description="Configuration for the calibration solver."
    )
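
As an illustration of how the nested schemas fit together, the following plain dictionary mirrors the fields of ExperimentConfig and its sub-configurations. All values are made up, and the units of observation_length, start_frequency and total_bandwidth are not stated in the schema, so the numbers below are assumptions. With starbox installed, passing this to ExperimentConfig(**experiment) should validate it, although that call is not exercised here.

```python
# Illustrative experiment description; every value is an assumed example.
experiment = {
    "name": "demo",
    "description": "Illustrative values only.",
    "telescope": {
        "num_stations": 8,  # must be > 0
        "diameter": 25.0,   # must be > 0
        "seed": 1,          # must be >= 0
        "site": {"latitude_deg": -30.7, "longitude_deg": 21.4, "altitude_m": 1000.0},
    },
    "skymodel": {
        "num_sources": 5,
        "max_flux_jy": 1.0,
        "field_centre_deg": (0.0, 0.0),
        "fov_deg": 5.0,
        "seed": 2,
    },
    "observation": {
        "start_time_mjd": 60000.0,
        "observation_length": 3600.0,
        "num_timesteps": 10,
        "start_frequency": 100e6,
        "num_channels": 4,
        "total_bandwidth": 4e6,
        "phase_centre_ra": 0.0,    # 0 to 360
        "phase_centre_dec": 0.0,   # -90 to 90
        "pointing_centre_ra": 0.0,
        "pointing_centre_dec": 0.0,
    },
    "corruptions": {"seed": 3, "rms_noise": 0.1, "rms_phase_gain": 0.2},
    "solver": {"solution_interval_seconds": 60.0},  # must be > 0
}
```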

ObservationConfig

Bases: BaseModel

Configuration schema for the Observation.

Source code in src/starbox/config/observation.py
class ObservationConfig(BaseModel):
    """Configuration schema for the Observation."""

    start_time_mjd: float = Field(ge=0)
    observation_length: float = Field(gt=0)
    num_timesteps: int = Field(ge=1)
    start_frequency: float = Field(gt=0)
    num_channels: int = Field(ge=1)
    total_bandwidth: float = Field(gt=0)
    phase_centre_ra: float = Field(ge=0, le=360, default=0)
    phase_centre_dec: float = Field(ge=-90, le=90, default=0)
    pointing_centre_ra: float = Field(ge=0, le=360, default=0)
    pointing_centre_dec: float = Field(ge=-90, le=90, default=0)

SkyModelConfig

Bases: BaseModel

Configuration schema for the SkyModel.

Source code in src/starbox/config/skymodel.py
class SkyModelConfig(BaseModel):
    """Configuration schema for the SkyModel."""

    num_sources: int = Field(gt=0)
    max_flux_jy: float = Field(gt=0)
    field_centre_deg: tuple[float, float] = Field(default=(0, 0))
    fov_deg: float = Field(gt=0)
    seed: int = Field(ge=0)

SolverConfig

Bases: BaseModel

Configuration schema for the solver.

Source code in src/starbox/config/solver.py
class SolverConfig(BaseModel):
    """Configuration schema for the solver."""

    solution_interval_seconds: float = Field(
        gt=0, description="Solution interval in seconds."
    )

TelescopeConfig

Bases: BaseModel

Configuration schema for the Telescope.

Source code in src/starbox/config/telescope.py
class TelescopeConfig(BaseModel):
    """Configuration schema for the Telescope."""

    num_stations: int = Field(gt=0)
    diameter: float = Field(gt=0)
    seed: int = Field(ge=0)
    site: TelescopeSiteConfig

TelescopeSiteConfig

Bases: BaseModel

Configuration schema for the Telescope site.

Source code in src/starbox/config/telescope.py
class TelescopeSiteConfig(BaseModel):
    """Configuration schema for the Telescope site."""

    latitude_deg: float = Field(ge=-90.0, le=90.0)
    longitude_deg: float = Field(ge=-180.0, le=180.0)
    altitude_m: float = 0.0

starbox.factory

Module for building simulation components.

Functions to build sky models, observations, and telescopes.

Exports
  • build_skymodel: Function to build sky models.
  • build_observation: Function to build observations.
  • build_telescope: Function to build telescopes.
  • build_corruptions: Function to build corruptions.
  • build_solver: Function to build solvers.

build_corruptions(cfg)

Build a Corruptions instance from a CorruptionsConfig.

Parameters:

Name  Type               Description                      Default
cfg   CorruptionsConfig  The CorruptionsConfig instance.  required

Returns: A Corruptions instance.

Source code in src/starbox/factory/corruptions.py
def build_corruptions(cfg: CorruptionsConfig) -> Corruptions:
    """Build a Corruptions instance from a CorruptionsConfig.

    Args:
        cfg: The CorruptionsConfig instance.
    Returns:
        A Corruptions instance.
    """
    return Corruptions(cfg)

build_observation(config)

Build an Observation from its configuration.

Parameters:

Name    Type               Description                     Default
config  ObservationConfig  The Observation configuration.  required

Returns: The built Observation.

Source code in src/starbox/factory/observation.py
def build_observation(config: ObservationConfig) -> Observation:
    """Build an Observation from its configuration.

    Args:
        config: The Observation configuration.
    Returns:
        The built Observation.
    """
    return Observation(config)

build_skymodel(cfg)

Build a SkyModel from a SkyModelConfig.

Parameters:

Name  Type            Description                       Default
cfg   SkyModelConfig  Configuration for the sky model.  required

Returns:

Type      Description
SkyModel  The constructed sky model.

Source code in src/starbox/factory/skymodel.py
def build_skymodel(cfg: SkyModelConfig) -> SkyModel:
    """Build a SkyModel from a SkyModelConfig.

    Args:
        cfg (SkyModelConfig): Configuration for the sky model.

    Returns:
        SkyModel: The constructed sky model.
    """
    return SkyModel(config=cfg)

build_solver(cfg)

Build a Solver instance from the given configuration.

Parameters:

Name  Type          Description                    Default
cfg   SolverConfig  Configuration for the solver.  required

Returns: An instance of Solver.

Source code in src/starbox/factory/solver.py
def build_solver(cfg: SolverConfig) -> Solver:
    """Build a Solver instance from the given configuration.

    Args:
        cfg: Configuration for the solver.
    Returns:
        An instance of Solver.
    """
    return Solver(cfg)

build_telescope(cfg, name='Telescope')

Build a Telescope from a TelescopeConfig.

Parameters:

Name  Type             Description                                      Default
cfg   TelescopeConfig  Configuration for the telescope.                 required
name  str              Name passed through to the Telescope instance.  'Telescope'

Returns:

Type       Description
Telescope  The constructed telescope.

Source code in src/starbox/factory/telescope.py
def build_telescope(cfg: TelescopeConfig, name="Telescope") -> Telescope:
    """Build a Telescope from a TelescopeConfig.

    Args:
        cfg (TelescopeConfig): Configuration for the telescope.

    Returns:
        Telescope: The constructed telescope.
    """
    return Telescope(cfg, name=name)

starbox.image

Radio telescope visibility imaging components.

This module provides classes and functions for creating images from radio telescope visibility data.

Classes:

Name    Description
Imager  Class for gridding and imaging radio telescope visibilities.

Imager

Class for handling image processing.

Source code in src/starbox/image/imager.py
class Imager:
    """Class for handling image processing."""

    def __init__(self, grid_size: int = 256, fov_deg: float = 5.0):
        """Initialize the Imager.

        Args:
            grid_size: Number of pixels along each image axis.
            fov_deg: Imaged field of view in degrees.
        """
        if fov_deg <= 0:
            raise ValueError(f"fov_deg must be positive, got {fov_deg!r}")
        self.grid_size = grid_size
        self.fov_deg = fov_deg

    def grid(self, visibilities: VisibilitySet) -> np.ndarray:
        """
        Grid visibilities onto a regular uv grid using nearest-neighbour mapping,
        using all times and frequency channels.

        Notes:
          - Sums samples with unit weight (natural weighting; no density correction).
          - Adds Hermitian symmetric samples to encourage a real dirty image.
          - Grid is centered: DC is at (grid_size//2, grid_size//2).
        """
        grid = np.zeros((self.grid_size, self.grid_size), dtype=np.complex128)

        uvw_m = np.asarray(visibilities.uvw_m, dtype=float)  # (T,B,3)
        vis = np.asarray(visibilities.vis, dtype=np.complex128)  # (T,B,F)
        freqs = np.asarray(visibilities.freqs_hz, dtype=float)  # (F,)

        half = self.grid_size // 2
        centre = (self.grid_size - 1) / 2.0

        # Set uv scaling from requested image field-of-view.
        # Approximate relation: uv_max ~ N / (2 * FoV_rad).
        fov_rad = np.deg2rad(self.fov_deg)
        uv_max = self.grid_size / (2.0 * fov_rad)

        # Map uv in [-uv_max, uv_max] linearly onto pixel indices [0, grid_size-1].
        # With this choice:
        #   u = -uv_max -> 0
        #   u =  0      -> ~ (grid_size - 1) / 2 (centre)
        #   u = +uv_max -> grid_size - 1
        scale = (self.grid_size - 1) / (2.0 * uv_max)

        # Grid all samples
        # Vectorized over times, baselines, and frequency channels.
        grid_flat = grid.ravel()

        # Broadcast uvw with frequency axis to get u, v in wavelengths for all channels
        # uvw_m[..., 0] and uvw_m[..., 1] have shape (T, B); freqs has shape (F,)
        # Use u = u_m * f / c rather than dividing by lambda explicitly.
        u_m = uvw_m[:, :, 0]  # (T, B)
        v_m = uvw_m[:, :, 1]  # (T, B)
        freqs_reshaped = freqs.reshape(1, 1, -1)  # (1, 1, F)
        u = u_m[:, :, None] * freqs_reshaped / SPEED_OF_LIGHT  # (T, B, F)
        v = v_m[:, :, None] * freqs_reshaped / SPEED_OF_LIGHT  # (T, B, F)

        # Pixel coordinates for all times, baselines, and channels
        u_pix = np.rint(u * scale + centre).astype(np.int64)  # (T, B, F)
        v_pix = np.rint(v * scale + centre).astype(np.int64)  # (T, B, F)

        # Flatten (T, B, F) -> (N,) where N = T * B * F
        u_flat = u.ravel()
        v_flat = v.ravel()
        up_flat = u_pix.ravel()
        vp_flat = v_pix.ravel()
        vis_flat = vis.ravel().astype(np.complex128)

        # Mask samples outside the imaged FoV in uv-space
        fov_mask = (np.abs(u_flat) <= uv_max) & (np.abs(v_flat) <= uv_max)

        # Mask samples with pixel coordinates outside the grid
        in_x = (up_flat >= 0) & (up_flat < self.grid_size)
        in_y = (vp_flat >= 0) & (vp_flat < self.grid_size)
        pix_mask = in_x & in_y

        # Combined validity mask
        mask = fov_mask & pix_mask
        if np.any(mask):
            up_valid = up_flat[mask]
            vp_valid = vp_flat[mask]
            vals_valid = vis_flat[mask]

            # Linear indices into flattened grid for direct accumulation
            idx = (vp_valid * self.grid_size + up_valid).astype(np.int64)
            n_pix = grid_flat.size

            # Accumulate direct contributions using bincount on real and imaginary parts
            real_accum = np.bincount(idx, weights=np.real(vals_valid), minlength=n_pix)
            imag_accum = np.bincount(idx, weights=np.imag(vals_valid), minlength=n_pix)
            grid_flat += real_accum + 1j * imag_accum

            # Hermitian symmetric points about the DC centre (half, half)
            sym_u = (2 * half - up_valid) % self.grid_size
            sym_v = (2 * half - vp_valid) % self.grid_size
            sym_idx = (sym_v * self.grid_size + sym_u).astype(np.int64)

            # Avoid double-counting samples whose symmetric pixel is the same as the original
            self_sym_mask = sym_idx != idx
            if np.any(self_sym_mask):
                sym_idx_valid = sym_idx[self_sym_mask]
                sym_vals_valid = np.conj(vals_valid[self_sym_mask])
                real_sym = np.bincount(
                    sym_idx_valid,
                    weights=np.real(sym_vals_valid),
                    minlength=n_pix,
                )
                imag_sym = np.bincount(
                    sym_idx_valid,
                    weights=np.imag(sym_vals_valid),
                    minlength=n_pix,
                )
                grid_flat += real_sym + 1j * imag_sym

        return grid

    def ifft(self, gridded_visibilities: np.ndarray) -> np.ndarray:
        """
        Inverse FFT uv-grid -> dirty image.
        Because the grid is centered (DC at centre), use ifftshift before ifft2.
        """
        img = np.fft.ifft2(np.fft.ifftshift(gridded_visibilities))
        img = np.fft.fftshift(img)
        return np.real(img)

    def image(self, visibilities: VisibilitySet) -> np.ndarray:
        """Create an image from visibilities."""
        gridded_visibilities = self.grid(visibilities=visibilities)
        image = self.ifft(gridded_visibilities)
        return image

__init__(grid_size=256, fov_deg=5.0)

Initialize the Imager.

Parameters:

Name       Type   Description                              Default
grid_size  int    Number of pixels along each image axis.  256
fov_deg    float  Imaged field of view in degrees.         5.0

Source code in src/starbox/image/imager.py
def __init__(self, grid_size: int = 256, fov_deg: float = 5.0):
    """Initialize the Imager.

    Args:
        grid_size: Number of pixels along each image axis.
        fov_deg: Imaged field of view in degrees.
    """
    if fov_deg <= 0:
        raise ValueError(f"fov_deg must be positive, got {fov_deg!r}")
    self.grid_size = grid_size
    self.fov_deg = fov_deg

grid(visibilities)

Grid visibilities onto a regular uv grid using nearest-neighbour mapping, using all times and frequency channels.

Notes
  • Sums samples with unit weight (natural weighting; no density correction).
  • Adds Hermitian symmetric samples to encourage a real dirty image.
  • Grid is centered: DC is at (grid_size//2, grid_size//2).

Source code in src/starbox/image/imager.py
def grid(self, visibilities: VisibilitySet) -> np.ndarray:
    """
    Grid visibilities onto a regular uv grid using nearest-neighbour mapping,
    using all times and frequency channels.

    Notes:
      - Sums samples with unit weight (natural weighting; no density correction).
      - Adds Hermitian symmetric samples to encourage a real dirty image.
      - Grid is centered: DC is at (grid_size//2, grid_size//2).
    """
    grid = np.zeros((self.grid_size, self.grid_size), dtype=np.complex128)

    uvw_m = np.asarray(visibilities.uvw_m, dtype=float)  # (T,B,3)
    vis = np.asarray(visibilities.vis, dtype=np.complex128)  # (T,B,F)
    freqs = np.asarray(visibilities.freqs_hz, dtype=float)  # (F,)

    half = self.grid_size // 2
    centre = (self.grid_size - 1) / 2.0

    # Set uv scaling from requested image field-of-view.
    # Approximate relation: uv_max ~ N / (2 * FoV_rad).
    fov_rad = np.deg2rad(self.fov_deg)
    uv_max = self.grid_size / (2.0 * fov_rad)

    # Map uv in [-uv_max, uv_max] linearly onto pixel indices [0, grid_size-1].
    # With this choice:
    #   u = -uv_max -> 0
    #   u =  0      -> ~ (grid_size - 1) / 2 (centre)
    #   u = +uv_max -> grid_size - 1
    scale = (self.grid_size - 1) / (2.0 * uv_max)

    # Grid all samples
    # Vectorized over times, baselines, and frequency channels.
    grid_flat = grid.ravel()

    # Broadcast uvw with frequency axis to get u, v in wavelengths for all channels
    # uvw_m[..., 0] and uvw_m[..., 1] have shape (T, B); freqs has shape (F,)
    # Use u = u_m * f / c rather than dividing by lambda explicitly.
    u_m = uvw_m[:, :, 0]  # (T, B)
    v_m = uvw_m[:, :, 1]  # (T, B)
    freqs_reshaped = freqs.reshape(1, 1, -1)  # (1, 1, F)
    u = u_m[:, :, None] * freqs_reshaped / SPEED_OF_LIGHT  # (T, B, F)
    v = v_m[:, :, None] * freqs_reshaped / SPEED_OF_LIGHT  # (T, B, F)

    # Pixel coordinates for all times, baselines, and channels
    u_pix = np.rint(u * scale + centre).astype(np.int64)  # (T, B, F)
    v_pix = np.rint(v * scale + centre).astype(np.int64)  # (T, B, F)

    # Flatten (T, B, F) -> (N,) where N = T * B * F
    u_flat = u.ravel()
    v_flat = v.ravel()
    up_flat = u_pix.ravel()
    vp_flat = v_pix.ravel()
    vis_flat = vis.ravel().astype(np.complex128)

    # Mask samples outside the imaged FoV in uv-space
    fov_mask = (np.abs(u_flat) <= uv_max) & (np.abs(v_flat) <= uv_max)

    # Mask samples with pixel coordinates outside the grid
    in_x = (up_flat >= 0) & (up_flat < self.grid_size)
    in_y = (vp_flat >= 0) & (vp_flat < self.grid_size)
    pix_mask = in_x & in_y

    # Combined validity mask
    mask = fov_mask & pix_mask
    if np.any(mask):
        up_valid = up_flat[mask]
        vp_valid = vp_flat[mask]
        vals_valid = vis_flat[mask]

        # Linear indices into flattened grid for direct accumulation
        idx = (vp_valid * self.grid_size + up_valid).astype(np.int64)
        n_pix = grid_flat.size

        # Accumulate direct contributions using bincount on real and imaginary parts
        real_accum = np.bincount(idx, weights=np.real(vals_valid), minlength=n_pix)
        imag_accum = np.bincount(idx, weights=np.imag(vals_valid), minlength=n_pix)
        grid_flat += real_accum + 1j * imag_accum

        # Hermitian symmetric points about the DC centre (half, half)
        sym_u = (2 * half - up_valid) % self.grid_size
        sym_v = (2 * half - vp_valid) % self.grid_size
        sym_idx = (sym_v * self.grid_size + sym_u).astype(np.int64)

        # Avoid double-counting samples whose symmetric pixel is the same as the original
        self_sym_mask = sym_idx != idx
        if np.any(self_sym_mask):
            sym_idx_valid = sym_idx[self_sym_mask]
            sym_vals_valid = np.conj(vals_valid[self_sym_mask])
            real_sym = np.bincount(
                sym_idx_valid,
                weights=np.real(sym_vals_valid),
                minlength=n_pix,
            )
            imag_sym = np.bincount(
                sym_idx_valid,
                weights=np.imag(sym_vals_valid),
                minlength=n_pix,
            )
            grid_flat += real_sym + 1j * imag_sym

    return grid
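
The uv-to-pixel mapping and the bincount accumulation used in grid can be tried out in isolation. The sketch below reuses the same formulas for scale, centre and the Hermitian partner index, applied to three made-up samples given directly in wavelengths. The sample values are assumptions chosen so that two samples share a pixel.

```python
import numpy as np

grid_size, fov_deg = 256, 5.0
half = grid_size // 2
centre = (grid_size - 1) / 2.0
uv_max = grid_size / (2.0 * np.deg2rad(fov_deg))
scale = (grid_size - 1) / (2.0 * uv_max)

# Three samples (u, v in wavelengths); the first two fall on the same pixel.
u = np.array([0.0, 0.0, 500.0])
v = np.array([0.0, 0.0, -250.0])
vals = np.array([1.0 + 1.0j, 2.0 - 1.0j, 0.5j])

# Nearest-neighbour pixel indices, then linear indices into the flat grid.
u_pix = np.rint(u * scale + centre).astype(np.int64)
v_pix = np.rint(v * scale + centre).astype(np.int64)
idx = v_pix * grid_size + u_pix

# bincount sums weights per index, so coincident samples accumulate.
n_pix = grid_size * grid_size
grid_flat = np.zeros(n_pix, dtype=np.complex128)
grid_flat += np.bincount(idx, weights=vals.real, minlength=n_pix)
grid_flat += 1j * np.bincount(idx, weights=vals.imag, minlength=n_pix)

# Hermitian partner pixel, mirrored about the DC pixel (half, half).
sym_u = (2 * half - u_pix) % grid_size
sym_v = (2 * half - v_pix) % grid_size
sym_idx = sym_v * grid_size + sym_u
```

bincount only accepts real weights, which is why the real and imaginary parts are accumulated separately. The DC pixel is its own Hermitian partner, which is the case the sym_idx != idx check in grid guards against.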

ifft(gridded_visibilities)

Inverse FFT uv-grid -> dirty image. Because the grid is centered (DC at centre), use ifftshift before ifft2.

Source code in src/starbox/image/imager.py
def ifft(self, gridded_visibilities: np.ndarray) -> np.ndarray:
    """
    Inverse FFT uv-grid -> dirty image.
    Because the grid is centered (DC at centre), use ifftshift before ifft2.
    """
    img = np.fft.ifft2(np.fft.ifftshift(gridded_visibilities))
    img = np.fft.fftshift(img)
    return np.real(img)
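
A quick way to see why the shifts matter is to transform a grid holding a single sample at the centred DC pixel. The small standalone sketch below uses the same sequence of NumPy calls as ifft. The 8 by 8 size is an arbitrary choice for illustration.

```python
import numpy as np

n = 8
grid = np.zeros((n, n), dtype=np.complex128)
grid[n // 2, n // 2] = 1.0  # one sample at the DC pixel of a centred grid

# ifftshift moves DC to index (0, 0), where ifft2 expects it;
# fftshift then puts the image centre back in the middle of the array.
img = np.fft.fftshift(np.fft.ifft2(np.fft.ifftshift(grid)))
dirty = np.real(img)
```

A lone DC sample carries no spatial structure, so the dirty image is flat with value 1 / n**2 in every pixel. Skipping ifftshift would instead introduce an alternating sign pattern across the image.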

image(visibilities)

Create an image from visibilities.

Source code in src/starbox/image/imager.py
def image(self, visibilities: VisibilitySet) -> np.ndarray:
    """Create an image from visibilities."""
    gridded_visibilities = self.grid(visibilities=visibilities)
    image = self.ifft(gridded_visibilities)
    return image

starbox.io

I/O module.

Contains functions and classes for reading and writing configuration files.

Functions:

Name  Description
save  Save experiment configuration data to a file.

starbox.predict

Radio telescope visibility prediction components.

This module provides functions for predicting radio telescope visibility data.

Functions:

Name                  Description
predict_visibilities  Function to predict visibilities.

predict_visibilities(telescope, skymodel, observation)

Predict visibilities given a telescope, sky model, and observation.

Source code in src/starbox/predict/predict.py
def predict_visibilities(
    telescope: Telescope, skymodel: SkyModel, observation: Observation
) -> VisibilitySet:
    """Predict visibilities given a telescope, sky model, and observation."""

    uvw_m = calculate_uvw(
        gmst_rad=observation.gmst_rad,
        phase_centre_rad=observation.phase_centre_rad,
        baselines_ecef_m=telescope.baselines_ecef,
    )

    # Inverse wavelength for each channel (1 / meters)
    inv_wavelength_m = observation.frequencies_hz / SPEED_OF_LIGHT

    # Extract sky model parameters as arrays
    ra_arr, dec_arr, flux_arr = skymodel.as_arrays_rad()

    # Precompute direction cosines (l, m, n-1) for all sources
    num_sources = ra_arr.size
    l_arr = np.empty(num_sources, dtype=np.float64)
    m_arr = np.empty(num_sources, dtype=np.float64)
    n_arr = np.empty(num_sources, dtype=np.float64)
    for i, (ra_rad, dec_rad) in enumerate(zip(ra_arr, dec_arr)):
        l_dir, m_dir, n_dir = calculate_lmn(
            ra_dec_rad=(ra_rad, dec_rad),
            phase_centre_rad=observation.phase_centre_rad,
        )
        l_arr[i] = l_dir
        m_arr[i] = m_dir
        n_arr[i] = n_dir

    # Direction vectors (l, m, n-1) for each source: shape (num_sources, 3)
    dir_vecs = np.stack([l_arr, m_arr, n_arr - 1.0], axis=1)

    # Project UVW coordinates onto all source direction vectors.
    # uvw_m: (num_times, num_baselines, 3)
    # dir_vecs: (num_sources, 3)
    # Result proj: (num_times, num_baselines, num_sources)
    proj = np.tensordot(uvw_m, dir_vecs, axes=([2], [1]))

    # Allocate output visibilities: (num_times, num_baselines, num_channels)
    visibilities = np.zeros(
        (observation.num_times, telescope.num_baselines, observation.num_channels),
        dtype=np.complex128,
    )

    # Accumulate contributions from each source without forming large 4D arrays.
    for s in range(num_sources):
        # Path difference (meters) projected for source s: (num_times, num_baselines)
        proj_s = proj[:, :, s]
        # Convert to phase cycles for all channels: (num_times, num_baselines, num_channels)
        phase_cycles_s = (
            proj_s[:, :, np.newaxis] * inv_wavelength_m[np.newaxis, np.newaxis, :]
        )
        # Add this source's contribution, weighted by its flux.
        visibilities += np.exp(-2j * np.pi * phase_cycles_s) * flux_arr[s]

    station1_index, station2_index = np.triu_indices(
        telescope.num_stations, k=1
    )  # strictly upper triangle
    visibilities_set = VisibilitySet(
        vis=visibilities,
        uvw_m=uvw_m,
        station1=station1_index,
        station2=station2_index,
        times_mjd=observation.times_mjd,
        freqs_hz=observation.frequencies_hz,
        weights=np.ones(
            (observation.num_times, telescope.num_baselines, observation.num_channels)
        ),
    )
    return visibilities_set
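
The core of the prediction is the sum over sources of flux * exp(-2πi (u l + v m + w (n - 1)) / λ). The standalone sketch below reproduces that accumulation with NumPy on random baseline coordinates. The uvw values, frequencies, fluxes and direction cosines are made-up inputs, and the direction cosines are written down directly rather than computed with calculate_lmn.

```python
import numpy as np

SPEED_OF_LIGHT = 299_792_458.0  # m/s

rng = np.random.default_rng(0)
uvw_m = rng.normal(scale=100.0, size=(2, 3, 3))  # (T, B, 3), assumed values
freqs_hz = np.array([100e6, 110e6])              # (F,)
flux = np.array([2.0, 0.5])                      # (S,)

# Direction cosines: source 0 sits at the phase centre, source 1 is offset.
l_dir = np.array([0.0, 0.01])
m_dir = np.array([0.0, -0.02])
n_dir = np.sqrt(1.0 - l_dir**2 - m_dir**2)
dir_vecs = np.stack([l_dir, m_dir, n_dir - 1.0], axis=1)  # (S, 3)

# Path difference in metres for every time, baseline and source: (T, B, S).
proj = np.tensordot(uvw_m, dir_vecs, axes=([2], [1]))

inv_wavelength_m = freqs_hz / SPEED_OF_LIGHT
vis = np.zeros((2, 3, 2), dtype=np.complex128)  # (T, B, F)
for s in range(flux.size):
    # Phase in cycles for all channels: (T, B, F).
    phase_cycles = proj[:, :, s, np.newaxis] * inv_wavelength_m[np.newaxis, np.newaxis, :]
    vis += flux[s] * np.exp(-2j * np.pi * phase_cycles)
```

A source at the phase centre has zero projected path difference, so it contributes its flux with zero phase on every baseline. Looping over sources keeps the working arrays three-dimensional, which matches the choice made in predict_visibilities.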

starbox.simulate

Radio telescope simulation components.

This module provides classes and functions for simulating random radio telescope array configurations and sky models.

Classes:

Name         Description
Telescope    A class representing a radio telescope array with random antenna configurations.
SkyModel     A class for simulating sky models with random sources.
Observation  A class representing an observation setup including time and frequency parameters.
Corruptions  A class for simulating corruptions to the observed signal.

Corruptions

A class representing corruptions to apply to a signal.

Attributes:

Name                Description
rms_noise           The RMS noise level to add to the visibilities.
station_phase_gain  Phase gain errors for each station.

Source code in src/starbox/simulate/corruptions.py
class Corruptions:
    """A class representing corruptions to apply to a signal.

    Attributes:
        rms_noise: The RMS noise level to add to the visibilities.
        station_phase_gain: Phase gain errors for each station.
    """

    def __init__(self, config: CorruptionsConfig):
        self.config = config
        self.rng = np.random.default_rng(self.config.seed)
        self._add_noise()
        self._add_station_phase_gain()

    def _add_noise(self):
        """Add Gaussian noise corruption."""
        self.sigma = self.config.rms_noise / np.sqrt(2)

    def _add_station_phase_gain(self):
        """Add station phase gain corruption."""
        self.rms_phase_gain = self.config.rms_phase_gain

    def apply(self, visibility_set: VisibilitySet) -> VisibilitySet:
        """Apply the corruptions to the given visibilities."""
        corrupted_visibility_set = VisibilitySet(
            vis=np.copy(visibility_set.vis),
            uvw_m=visibility_set.uvw_m,
            station1=visibility_set.station1,
            station2=visibility_set.station2,
            times_mjd=visibility_set.times_mjd,
            freqs_hz=visibility_set.freqs_hz,
            weights=visibility_set.weights,
        )
        if abs(self.rms_phase_gain) != 0.0:
            station_phase_gains = self._sample_station_phase_gains(
                num_stations=visibility_set.num_stations
            )
            corrupted_visibility_set = self._apply_station_phase_gain(
                corrupted_visibility_set, station_phase_gains
            )
        if abs(self.sigma) != 0.0:
            corrupted_visibility_set = self._apply_noise(corrupted_visibility_set)

        return corrupted_visibility_set

    def _apply_station_phase_gain(
        self, visibility_set: VisibilitySet, station_phase_gains: np.ndarray
    ) -> VisibilitySet:
        """Apply only the station phase gain corruption to the given visibilities."""
        phase_gains_1 = station_phase_gains[visibility_set.station1]
        phase_gains_2 = station_phase_gains[visibility_set.station2]
        # Broadcast station gains to all times and channels
        phase_gains_1 = phase_gains_1[np.newaxis, :, np.newaxis]
        phase_gains_2 = phase_gains_2[np.newaxis, :, np.newaxis]
        visibility_set.vis *= phase_gains_1 * np.conj(phase_gains_2)

        return visibility_set

    def _sample_station_phase_gains(self, num_stations: int) -> np.ndarray:
        """Sample random phase gains for each station."""
        phi = self.rng.normal(loc=0.0, scale=self.rms_phase_gain, size=num_stations)
        # Reference station to have zero phase gain
        ref_station = 0
        phi[ref_station] = 0.0
        station_phase_gains = np.exp(1j * phi)

        return station_phase_gains

    def _apply_noise(self, visibility_set: VisibilitySet) -> VisibilitySet:
        """Apply only the noise corruption to the given visibilities."""
        noise_real = self.rng.normal(scale=self.sigma, size=visibility_set.vis.shape)
        noise_imag = self.rng.normal(scale=self.sigma, size=visibility_set.vis.shape)
        noise = noise_real + 1j * noise_imag
        visibility_set.vis += noise

        return visibility_set

apply(visibility_set)

Apply the corruptions to the given visibilities.

Source code in src/starbox/simulate/corruptions.py
def apply(self, visibility_set: VisibilitySet) -> VisibilitySet:
    """Apply the corruptions to the given visibilities."""
    corrupted_visibility_set = VisibilitySet(
        vis=np.copy(visibility_set.vis),
        uvw_m=visibility_set.uvw_m,
        station1=visibility_set.station1,
        station2=visibility_set.station2,
        times_mjd=visibility_set.times_mjd,
        freqs_hz=visibility_set.freqs_hz,
        weights=visibility_set.weights,
    )
    if abs(self.rms_phase_gain) != 0.0:
        station_phase_gains = self._sample_station_phase_gains(
            num_stations=visibility_set.num_stations
        )
        corrupted_visibility_set = self._apply_station_phase_gain(
            corrupted_visibility_set, station_phase_gains
        )
    if abs(self.sigma) != 0.0:
        corrupted_visibility_set = self._apply_noise(corrupted_visibility_set)

    return corrupted_visibility_set
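
As a rough illustration of the gain step in `apply`, the sketch below reproduces the per-baseline factor g_p * conj(g_q) on a toy array using NumPy only. The array sizes, the seed, the phase scale and the variable names are assumptions made for this example and are not part of starbox.

```python
import numpy as np

rng = np.random.default_rng(0)  # seed chosen arbitrarily for this sketch

# Toy layout: 3 stations, hence 3 baselines (0-1, 0-2, 1-2)
station1 = np.array([0, 0, 1])
station2 = np.array([1, 2, 2])
vis = np.ones((2, 3, 4), dtype=complex)  # (time, baseline, chan)

# Sample station phases and pin the reference station to zero phase
phi = rng.normal(loc=0.0, scale=0.5, size=3)
phi[0] = 0.0
gains = np.exp(1j * phi)

# Per-baseline factor g_p * conj(g_q), broadcast over time and channel
factor = gains[station1] * np.conj(gains[station2])
corrupted = vis * factor[np.newaxis, :, np.newaxis]

# Phase-only gains rotate each visibility but leave its amplitude unchanged
amplitudes_preserved = np.allclose(np.abs(corrupted), np.abs(vis))
```

Because only phase differences between stations enter the factor, fixing one reference station at zero phase removes an otherwise arbitrary global offset.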

Observation

Observation configuration and derived sampling grids.

Source code in src/starbox/simulate/observation.py
class Observation:
    """Observation configuration and derived sampling grids."""

    def __init__(self, config: ObservationConfig):
        self.config = config
        self.channel_width = self.config.total_bandwidth / self.config.num_channels
        self._get_times()
        self.num_times = len(self.times_mjd)
        self._get_frequencies()
        self.num_channels = len(self.frequencies_hz)

    @property
    def phase_centre_rad(self) -> tuple[float, float]:
        """Return the phase centre in radians."""
        ra_rad = math.radians(self.config.phase_centre_ra)
        dec_rad = math.radians(self.config.phase_centre_dec)
        return ra_rad, dec_rad

    @property
    def pointing_centre_rad(self) -> tuple[float, float]:
        """Return the pointing centre in radians."""
        ra_rad = math.radians(self.config.pointing_centre_ra)
        dec_rad = math.radians(self.config.pointing_centre_dec)
        return ra_rad, dec_rad

    @property
    def gmst_rad(self) -> npt.NDArray[np.float64]:
        """Return the times converted to Greenwich mean sidereal time."""
        # Avoid mutating global Astropy IERS configuration permanently by
        # using a temporary config context while computing sidereal times.
        with iers.conf.set_temp("auto_download", False):
            times = Time(self.times_mjd, format="mjd", scale="utc")
            return np.asarray(
                times.sidereal_time("mean", "greenwich").rad,
                dtype=np.float64,
            )

    def _get_times(self) -> None:
        """Compute and store the time samples for the observation."""
        if self.config.num_timesteps > 1:
            timestep_seconds = self.config.observation_length / (
                self.config.num_timesteps - 1
            )
            timestep_mjd = timestep_seconds / 86_400.0
            self.times_mjd = np.array(
                [
                    self.config.start_time_mjd + i * timestep_mjd
                    for i in range(self.config.num_timesteps)
                ]
            )
        else:
            self.times_mjd = np.array([self.config.start_time_mjd])

    def _get_frequencies(self) -> None:
        """Compute and store the frequency channels for the observation."""
        self.frequencies_hz = np.array(
            [
                self.config.start_frequency + i * self.channel_width
                for i in range(self.config.num_channels)
            ]
        )

gmst_rad property

Return the times converted to Greenwich mean sidereal time.
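
The property relies on Astropy for the conversion. As a sanity check only, and not as a replacement, a commonly quoted linear approximation (GMST in hours is roughly 18.697374558 + 24.06570982441908 * D, with D the number of days since J2000.0) can be sketched in plain Python. The helper name `gmst_rad_approx` is hypothetical. The sketch treats the input as UT1 and ignores higher-order terms, so small differences from the Astropy result should be expected.

```python
import math


def gmst_rad_approx(mjd_ut1: float) -> float:
    """Approximate GMST in radians from an MJD (hypothetical helper)."""
    days_since_j2000 = mjd_ut1 - 51544.5  # J2000.0 corresponds to MJD 51544.5
    gmst_hours = (18.697374558 + 24.06570982441908 * days_since_j2000) % 24.0
    return gmst_hours * (2.0 * math.pi / 24.0)


gmst_at_j2000 = gmst_rad_approx(51544.5)
# One sidereal day later the sidereal angle should have wrapped around once
one_sidereal_day = 24.0 / 24.06570982441908
gmst_next = gmst_rad_approx(51544.5 + one_sidereal_day)
```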

phase_centre_rad property

Return the phase centre in radians.

pointing_centre_rad property

Return the pointing centre in radians.
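
The sampling grids built in `_get_times` and `_get_frequencies` can be reproduced in a few lines. The sketch below uses hypothetical configuration values and vectorised NumPy expressions in place of the list comprehensions; it is meant to show the spacing conventions, not to replace the class.

```python
import numpy as np

# Hypothetical configuration values, chosen only for illustration
start_time_mjd = 60000.0
observation_length = 3600.0  # seconds
num_timesteps = 5
start_frequency = 100e6  # Hz
total_bandwidth = 4e6  # Hz
num_channels = 4

# Times: both endpoints are included, so the step is length / (n - 1)
timestep_mjd = observation_length / (num_timesteps - 1) / 86_400.0
times_mjd = start_time_mjd + np.arange(num_timesteps) * timestep_mjd

# Frequencies: channel i sits at start_frequency + i * channel_width,
# so the last value is one channel width below start + bandwidth
channel_width = total_bandwidth / num_channels
frequencies_hz = start_frequency + np.arange(num_channels) * channel_width
```

Note the asymmetry: the time grid spans the full observation length, while the frequency grid stops one channel width short of `start_frequency + total_bandwidth`.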

SkyModel

A class representing a sky model.

Attributes:

Name Type Description
name

The name of the sky model.

ra_deg

Right ascension of sources in degrees.

dec_deg

Declination of sources in degrees.

flux_jy

Flux densities of sources in Jansky.

config

The SkyModelConfig used to generate this sky model.

Source code in src/starbox/simulate/skymodel.py
class SkyModel:
    """A class representing a sky model.

    Attributes:
        name: The name of the sky model.
        ra_deg: Right ascension of sources in degrees.
        dec_deg: Declination of sources in degrees.
        flux_jy: Flux densities of sources in Jansky.
        config: The SkyModelConfig used to generate this sky model.
    """

    def __init__(
        self,
        config: SkyModelConfig,
        name: str = "Sky Model",
    ):
        self.name = name
        self.config = config
        self._generate_sources()

    def _generate_sources(self):
        """Generate the sky model sources."""
        rng = np.random.default_rng(self.config.seed)
        ra_centre, dec_centre = self.config.field_centre_deg
        half_fov_deg = self.config.fov_deg / 2.0

        self.ra_deg = rng.uniform(
            ra_centre - half_fov_deg, ra_centre + half_fov_deg, self.config.num_sources
        )
        self.dec_deg = rng.uniform(
            dec_centre - half_fov_deg,
            dec_centre + half_fov_deg,
            self.config.num_sources,
        )
        self.flux_jy = rng.uniform(
            0.0, self.config.max_flux_jy, self.config.num_sources
        )

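    def as_arrays(self) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
        """Return RA (deg), Dec (deg) and flux density (Jy) as arrays."""
        return self.ra_deg, self.dec_deg, self.flux_jy

    def as_arrays_rad(self) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
        """Return RA (rad), Dec (rad) and flux density (Jy) as arrays."""
        return np.deg2rad(self.ra_deg), np.deg2rad(self.dec_deg), self.flux_jy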

    def equals(self, other: "SkyModel", atol: float = 0.0, rtol: float = 0.0) -> bool:
        """Check equality with another SkyModel within a tolerance."""
        ra1, dec1, f1 = self.as_arrays()
        ra2, dec2, f2 = other.as_arrays()

        return (
            np.allclose(ra1, ra2, atol=atol, rtol=rtol)
            and np.allclose(dec1, dec2, atol=atol, rtol=rtol)
            and np.allclose(f1, f2, atol=atol, rtol=rtol)
        )
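
The source generation in `_generate_sources` amounts to three seeded uniform draws. A minimal standalone sketch follows; the function `sample_sources` and its argument values are hypothetical stand-ins for the `SkyModelConfig` fields.

```python
import numpy as np


def sample_sources(seed, field_centre_deg, fov_deg, num_sources, max_flux_jy):
    """Draw source positions and fluxes uniformly (hypothetical helper)."""
    rng = np.random.default_rng(seed)
    ra_centre, dec_centre = field_centre_deg
    half = fov_deg / 2.0
    # Positions are uniform in a square box of coordinate degrees
    ra = rng.uniform(ra_centre - half, ra_centre + half, num_sources)
    dec = rng.uniform(dec_centre - half, dec_centre + half, num_sources)
    flux = rng.uniform(0.0, max_flux_jy, num_sources)
    return ra, dec, flux


# The same seed reproduces the same sky
first = sample_sources(42, (10.0, -30.0), 2.0, 5, 1.0)
second = sample_sources(42, (10.0, -30.0), 2.0, 5, 1.0)
```

The box is defined in RA/Dec coordinate degrees, so no cos(Dec) factor is applied to the RA extent.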

equals(other, atol=0.0, rtol=0.0)

Check equality with another SkyModel within a tolerance.

Source code in src/starbox/simulate/skymodel.py
def equals(self, other: "SkyModel", atol: float = 0.0, rtol: float = 0.0) -> bool:
    """Check equality with another SkyModel within a tolerance."""
    ra1, dec1, f1 = self.as_arrays()
    ra2, dec2, f2 = other.as_arrays()

    return (
        np.allclose(ra1, ra2, atol=atol, rtol=rtol)
        and np.allclose(dec1, dec2, atol=atol, rtol=rtol)
        and np.allclose(f1, f2, atol=atol, rtol=rtol)
    )
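
With the default `atol=0.0` and `rtol=0.0`, `np.allclose` reduces to exact element-wise equality, because its criterion is `|a - b| <= atol + rtol * |b|`. The short sketch below shows the effect on plain arrays; the perturbation size is an arbitrary choice for illustration.

```python
import numpy as np

a = np.array([1.0, 2.0, 3.0])
b = a + 1e-12  # tiny perturbation, well above floating-point spacing

# Zero tolerances, as in the defaults of equals(): any difference fails
exact = np.allclose(a, b, atol=0.0, rtol=0.0)

# A small absolute tolerance accepts the perturbed values
loose = np.allclose(a, b, atol=1e-9, rtol=0.0)
```

Callers comparing sky models that went through a round trip (for example degrees to radians and back) would likely want a non-zero `atol`.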

Telescope

A class representing a radio telescope array.

Source code in src/starbox/simulate/telescope.py
class Telescope:
    """A class representing a radio telescope array."""

    def __init__(
        self,
        cfg: TelescopeConfig,
        name: str = "Telescope",
    ):
        """Initialize the Telescope.

        Args:
            cfg: TelescopeConfig instance containing configuration parameters.
            name: Name of the telescope.
        """
        self.name = name
        self.config = cfg

        self.rng = np.random.default_rng(self.config.seed)
        self.num_stations = self.config.num_stations
        self.num_baselines = self.num_stations * (self.num_stations - 1) // 2
        self.station_positions = self._configure_array()
        self.station_ids = np.array(
            [f"{self.name}_STN{idx:03d}" for idx in range(self.num_stations)]
        )
        self.baselines_ecef = self._compute_baselines(self._enu_to_ecef())

    def _configure_array(self) -> np.ndarray:
        """Configure an array of antennas.

        Generates a numpy array of shape (num_stations, 3) representing antenna positions in east,
        north, and up arranged randomly within a circle of the telescope's diameter.
        """
        angles = self._get_angles()
        radii = self._get_radii()
        east, north, up = _compute_enu_coordinates(angles, radii)
        return np.column_stack((east, north, up))

    def _compute_baselines(self, positions: np.ndarray) -> np.ndarray:
        """Compute baselines from station positions.

        Args:
            positions: A numpy array of shape (num_stations, 3) containing the coordinates of each station.

        Returns:
            A numpy array of shape (num_baselines, 3) containing the baseline vectors.
        """
        i, j = np.triu_indices(self.num_stations, k=1)
        return positions[j] - positions[i]

    def _get_angles(self) -> np.ndarray:
        """Generate random angles for antenna placement."""
        return self.rng.uniform(0, 2 * np.pi, self.num_stations)

    def _get_radii(self) -> np.ndarray:
        """Generate random radii for antenna placement within the telescope diameter."""
        radius = self.config.diameter / 2
        return radius * np.sqrt(self.rng.uniform(0, 1, self.num_stations))

    def _enu_to_ecef(self) -> np.ndarray:
        """Express station ENU offsets in the ECEF basis at the telescope site.

        This method uses the telescope's station positions stored in
        ``self.station_positions`` (in local ENU coordinates) and applies the
        site-specific ENU→ECEF rotation matrix. The result is a set of station
        displacement vectors expressed in the global Earth-centred, Earth-fixed
        (ECEF) coordinate basis, but **no origin translation to the site's
        absolute ECEF position is applied**.

        This is appropriate for computing baselines (differences between station
        vectors), where any common origin offset cancels out. It does *not*
        produce absolute ECEF positions of the stations.

        Returns:
            A numpy array of shape (num_stations, 3) containing station vectors
            in the ECEF basis at the site.
        """
        rotation_matrix = self._rotation_matrix()
        ecef = self.station_positions @ rotation_matrix.T
        return ecef

    def _rotation_matrix(self) -> np.ndarray:
        """Calculate the ENU→ECEF rotation matrix at the telescope site.

        The returned matrix rotates local ENU vectors at the site's latitude and
        longitude into the global Earth-centred, Earth-fixed (ECEF) frame.
        """
        lat0_rad = np.radians(self.config.site.latitude_deg)
        lon0_rad = np.radians(self.config.site.longitude_deg)

        return np.array(
            [
                [
                    -np.sin(lon0_rad),
                    -np.sin(lat0_rad) * np.cos(lon0_rad),
                    np.cos(lat0_rad) * np.cos(lon0_rad),
                ],
                [
                    np.cos(lon0_rad),
                    -np.sin(lat0_rad) * np.sin(lon0_rad),
                    np.cos(lat0_rad) * np.sin(lon0_rad),
                ],
                [0, np.cos(lat0_rad), np.sin(lat0_rad)],
            ]
        )
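
A quick way to check the rotation in `_rotation_matrix` is to evaluate it at latitude 0, longitude 0, where east, north and up should map onto the ECEF +Y, +Z and +X axes respectively. The standalone function below copies the matrix entries from the listing above; its name and the test site coordinates are assumptions for this sketch.

```python
import numpy as np


def enu_to_ecef_rotation(lat_deg: float, lon_deg: float) -> np.ndarray:
    """ENU->ECEF rotation; columns are the east, north and up unit vectors."""
    lat, lon = np.radians(lat_deg), np.radians(lon_deg)
    return np.array(
        [
            [-np.sin(lon), -np.sin(lat) * np.cos(lon), np.cos(lat) * np.cos(lon)],
            [np.cos(lon), -np.sin(lat) * np.sin(lon), np.cos(lat) * np.sin(lon)],
            [0.0, np.cos(lat), np.sin(lat)],
        ]
    )


rot = enu_to_ecef_rotation(0.0, 0.0)
enu = np.eye(3)  # rows: pure east, pure north, pure up
ecef = enu @ rot.T  # same row-vector convention as _enu_to_ecef

# A proper rotation is orthonormal with determinant +1 at any site
site_rot = enu_to_ecef_rotation(-30.7, 21.4)  # arbitrary example site
is_orthonormal = np.allclose(site_rot @ site_rot.T, np.eye(3))
```

Since the matrix is a pure rotation, vector lengths are preserved, which is why baseline lengths are the same in ENU and in the ECEF basis.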

__init__(cfg, name='Telescope')

Initialize the Telescope.

Parameters:

Name Type Description Default
cfg TelescopeConfig

TelescopeConfig instance containing configuration parameters.

required
name str

Name of the telescope.

'Telescope'
Source code in src/starbox/simulate/telescope.py
def __init__(
    self,
    cfg: TelescopeConfig,
    name: str = "Telescope",
):
    """Initialize the Telescope.

    Args:
        cfg: TelescopeConfig instance containing configuration parameters.
        name: Name of the telescope.
    """
    self.name = name
    self.config = cfg

    self.rng = np.random.default_rng(self.config.seed)
    self.num_stations = self.config.num_stations
    self.num_baselines = self.num_stations * (self.num_stations - 1) // 2
    self.station_positions = self._configure_array()
    self.station_ids = np.array(
        [f"{self.name}_STN{idx:03d}" for idx in range(self.num_stations)]
    )
    self.baselines_ecef = self._compute_baselines(self._enu_to_ecef())
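
The two array operations performed during initialisation, station placement and baseline formation, can be sketched without the class. The helper `_compute_enu_coordinates` is not shown on this page, so the sketch assumes it maps polar coordinates to east = r cos(angle), north = r sin(angle), up = 0; the seed, station count and diameter are likewise illustrative.

```python
import numpy as np

rng = np.random.default_rng(1)  # arbitrary seed for the sketch
num_stations = 6
diameter = 1000.0  # metres, hypothetical

# Uniform-in-area placement: take the square root of a uniform variate
# so that stations do not cluster towards the centre of the disc
angles = rng.uniform(0, 2 * np.pi, num_stations)
radii = (diameter / 2) * np.sqrt(rng.uniform(0, 1, num_stations))

# Assumed ENU mapping (the real helper is not shown in this document)
positions = np.column_stack(
    (radii * np.cos(angles), radii * np.sin(angles), np.zeros(num_stations))
)

# One baseline per unordered station pair (i < j)
i, j = np.triu_indices(num_stations, k=1)
baselines = positions[j] - positions[i]
```

With 6 stations this yields 6 * 5 / 2 = 15 baselines, matching the `num_baselines` formula, and the pair ordering follows `np.triu_indices`: (0, 1), (0, 2), and so on.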

starbox.viz

Visualization module.

This module provides functions for plotting radio telescope array configurations, sky models, uv-coverage, calibration solutions and images.

Functions:

Name Description
plot_telescope

Plot the array configuration of a telescope.

plot_sky_model

Plot the sky model sources.

plot_uv_coverage

Plot the UV coverage given UVW coordinates.

plot_gains

Plot the calibration solutions.

plot_image

Plot the 2D image data.

plot_gains(solutions)

Plot the calibration solutions.

Source code in src/starbox/viz/plot.py
def plot_gains(solutions: Solutions) -> Figure:
    """Plot the calibration solutions."""

    fig = px.imshow(
        # Transpose from (time, freq, station) to (station, freq, time) so each frame has
        # time on the x-axis, frequency on the y-axis, and animation_frame=0 animates stations.
        np.real(np.transpose(solutions.station_phase_gains, (2, 1, 0))),
        title="Gains",
        labels={
            "x": "time",
            "y": "frequency",
        },
        origin="lower",
        aspect="auto",
        animation_frame=0,  # Animate over stations
    )
    return fig
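
The array handed to `px.imshow` can be inspected without plotting. The sketch below builds synthetic unit-amplitude gains (the shapes and phase values are made up for illustration) and shows the axis reordering. It also computes `np.angle` as one alternative a caller might prefer when the phase itself, and not its cosine, is of interest; that alternative is a suggestion and not something starbox does.

```python
import numpy as np

n_time, n_freq, n_station = 4, 3, 2

# Synthetic phases in [0, 1] rad, laid out as (time, freq, station)
phi = np.linspace(0.0, 1.0, n_time * n_freq * n_station).reshape(
    n_time, n_freq, n_station
)
gains = np.exp(1j * phi)

# Reorder to (station, freq, time): axis 0 becomes the animation frame
frames = np.transpose(gains, (2, 1, 0))

real_part = np.real(frames)  # what plot_gains passes to px.imshow
phase = np.angle(frames)  # alternative view: phase in radians
```

For phase-only gains the real part equals cos(phase), so it cannot distinguish positive from negative phases.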

plot_image(image, title='Imaged Sky', fov_deg=None)

Plot the image.

Source code in src/starbox/viz/plot.py
def plot_image(
    image: np.ndarray, title: str = "Imaged Sky", fov_deg: float | None = None
) -> Figure:
    """Plot the image."""

    if fov_deg is None:
        fig = px.imshow(
            image,
            origin="lower",
            title=title,
            labels={"x": "RA", "y": "Dec"},
        )
    else:
        x_deg = np.linspace(-fov_deg / 2.0, fov_deg / 2.0, image.shape[1])
        y_deg = np.linspace(-fov_deg / 2.0, fov_deg / 2.0, image.shape[0])
        fig = px.imshow(
            image,
            x=x_deg,
            y=y_deg,
            origin="lower",
            title=title,
            labels={"x": "ΔRA (deg)", "y": "ΔDec (deg)"},
        )
    fig.update_yaxes(scaleanchor="x", scaleratio=1)
    return fig

plot_sky_model(sky_model)

Plot the sky model sources.

Source code in src/starbox/viz/plot.py
def plot_sky_model(sky_model: SkyModel) -> Figure:
    """Plot the sky model sources."""
    ras, decs, fluxes = sky_model.as_arrays()
    fig = px.scatter(
        x=ras,
        y=decs,
        size=fluxes,
        title="Sky Model",
        labels={"x": "Right Ascension (deg)", "y": "Declination (deg)"},
    )
    fig.update_yaxes(scaleanchor="x", scaleratio=1)
    return fig

plot_telescope(telescope)

Plot the array configuration of a telescope.

Source code in src/starbox/viz/plot.py
def plot_telescope(telescope: Telescope) -> Figure:
    """Plot the array configuration of a telescope."""
    fig = px.scatter(
        x=telescope.station_positions[:, 0],
        y=telescope.station_positions[:, 1],
        title=f"{telescope.name}",
    )
    fig.update_layout(
        xaxis_title="East [m]",
        yaxis_title="North [m]",
        yaxis_scaleanchor="x",
        yaxis_scaleratio=1,
    )
    fig.update_traces(marker=dict(size=15, color="blue", symbol="cross"))

    return fig

plot_uv_coverage(uvw_coordinates, title='UV Coverage')

Plot the UV coverage given UVW coordinates.

Source code in src/starbox/viz/plot.py
def plot_uv_coverage(uvw_coordinates: np.ndarray, title: str = "UV Coverage") -> Figure:
    """Plot the UV coverage given UVW coordinates."""

    u = uvw_coordinates[:, 0]
    v = uvw_coordinates[:, 1]

    fig = px.scatter(
        x=u, y=v, title=title, labels={"x": "U (wavelengths)", "y": "V (wavelengths)"}
    )
    fig.update_yaxes(scaleanchor="x", scaleratio=1)

    return fig
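
The axis labels are in wavelengths and the function indexes a two-dimensional array, whereas `VisibilitySet.uvw_m` is stored in metres with shape (time, baseline, 3). A caller may therefore want to flatten and rescale before plotting. The helper below is a hypothetical sketch of that preparation step for a single frequency; it is not part of starbox.

```python
import numpy as np

SPEED_OF_LIGHT_M_S = 299_792_458.0


def uvw_in_wavelengths(uvw_m: np.ndarray, freq_hz: float) -> np.ndarray:
    """Scale UVW from metres to wavelengths at one frequency (hypothetical)."""
    return uvw_m * (freq_hz / SPEED_OF_LIGHT_M_S)


freq_hz = 150e6
wavelength_m = SPEED_OF_LIGHT_M_S / freq_hz
uvw_m = np.array([[wavelength_m, 0.0, 0.0], [0.0, 10 * wavelength_m, 0.0]])
uvw_lambda = uvw_in_wavelengths(uvw_m, freq_hz)

# Flatten a (time, baseline, 3) array into the (N, 3) layout the plot indexes
uvw_tb3 = np.zeros((2, 3, 3))
flat = uvw_tb3.reshape(-1, 3)
```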

starbox.visibility

Data class for visibility set.

VisibilitySet dataclass

A data class representing a set of visibilities.

Attributes:

Name Type Description
vis ndarray

Complex visibilities with shape (time, baseline, chan).

uvw_m ndarray

UVW coordinates in meters with shape (time, baseline, 3).

station1 ndarray

Indices of the first station for each baseline.

station2 ndarray

Indices of the second station for each baseline.

times_mjd ndarray

Times of the observations in Modified Julian Date.

freqs_hz ndarray

Frequencies of the channels in Hz.

weights ndarray

Weights for each visibility with shape (time, baseline, chan).

Source code in src/starbox/visibility.py
@dataclass
class VisibilitySet:
    """A data class representing a set of visibilities.

    Attributes:
        vis: Complex visibilities with shape (time, baseline, chan).
        uvw_m: UVW coordinates in meters with shape (time, baseline, 3).
        station1: Indices of the first station for each baseline.
        station2: Indices of the second station for each baseline.
        times_mjd: Times of the observations in Modified Julian Date.
        freqs_hz: Frequencies of the channels in Hz.
        weights: Weights for each visibility with shape (time, baseline, chan).
    """

    vis: np.ndarray  # (time, baseline, chan)
    uvw_m: np.ndarray  # (time, baseline, 3)
    station1: np.ndarray  # (baseline,)
    station2: np.ndarray  # (baseline,)
    times_mjd: np.ndarray  # (time,)
    freqs_hz: np.ndarray  # (chan,)
    weights: np.ndarray  # (time, baseline, chan)

    @property
    def station_ids(self) -> np.ndarray:
        """Get the unique station IDs from the baseline indices."""
        unique_stations = np.unique(np.concatenate((self.station1, self.station2)))
        return unique_stations

    @property
    def num_stations(self) -> int:
        """Get the number of unique stations in the visibility set."""
        return len(self.station_ids)

num_stations property

Get the number of unique stations in the visibility set.

station_ids property

Get the unique station IDs from the baseline indices.
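
Because both properties are derived from the baseline index arrays, `num_stations` counts only stations that take part in at least one baseline. The sketch below, using made-up index arrays, shows how that count can differ from the length needed to index a per-station array by raw station index, which is how `Corruptions.apply` sizes and `_apply_station_phase_gain` indexes the gain array.

```python
import numpy as np

# Made-up baselines that skip stations 1, 3 and 4
station1 = np.array([0, 0, 2])
station2 = np.array([2, 5, 5])

station_ids = np.unique(np.concatenate((station1, station2)))
num_stations = len(station_ids)

# Indexing a per-station array by raw index needs max index + 1 entries,
# which exceeds num_stations when some indices never appear in a baseline
required_length = int(max(station1.max(), station2.max())) + 1
```

When station indices are contiguous from zero, as they are when baselines come from `np.triu_indices`, the two numbers agree and the distinction does not matter.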