#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# @Author: José Sánchez-Gallego (gallegoj@uw.edu)
# @Date: 2019-02-19
# @Filename: target.py
# @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause)
#
# @Last modified by: José Sánchez-Gallego (gallegoj@uw.edu)
# @Last modified time: 2019-09-25 15:20:31
import os
import pathlib
from copy import copy
import astropy
import numpy
import yaml
import warnings
from lvmsurveysim.ifu import IFU
from lvmsurveysim.utils import plot as lvm_plot
import lvmsurveysim.utils.spherical
from lvmsurveysim.exceptions import LVMSurveyOpsError, LVMSurveyOpsWarning
from lvmsurveysim.utils.plot import __MOLLWEIDE_ORIGIN__, get_axes, transform_patch_mollweide, convert_to_mollweide
from .. import config
from ..telescope import Telescope
from .skyregion import SkyRegion
from .tile import Tile
__all__ = ['Target', 'TargetList']
[docs]class Target(object):
"""A `.Region` with additional observing information.
A `.Target` object is similar to a `.SkyRegion` but it is named and contains
information about what telescope will observe it, its observing
priority relative to all other targets, and a set of observing constraints
ans strategies to be implemented during scheduling of the target (airmass,
lunation, shadow height, tile order, ...).
There is a special kind of target not represented internally as
a `.SkyRegion`, the fullsky target, which represents a (sparse) grid of tiles
on the whole sky.
The Target constructor accepts the following keyword parameters, which are
also available as keywords from a list read by `.from_list`. Typically a Target
will not be instatiated indivisually. The typical use case will involve the `.TargetList`
class which is initialize via a yaml configuration file, the survey 'target list'.
Parameters
----------
name : str
The name of the target.
priority : int
The priority at which this target should be observed. Higher numbers
mean higher priority.
telescope : str
The telescope that will observe the target. Must be a string that
matches a telescope entry in the configuration file or a
`~lvmsurveysim.telescope.Telescope` instance.
max_airmass : float
Maximum air mass to observe the given target
min_shadowheight : float
Minimum shadow height in km to observe the given target
exptime : float
Exposure time of an individual pointing
n_exposures
Number of individual pointings to reach desired S/N
min_exposures : int
Minimum number of exposures to make a "good visit"
min_moon_dist : float
Minimum moon distance between target before observations are
called off.
max_lunation : float
The maximum lunation (fraction of moon illuminated,
number between 0 and 1)
overhead : float
The overhead factor per exposure quantum for this target's observing
scheme.
overlap:
calculate overlap between this target and others and discard, defaults to true
tile_union:
tile_union that the target belongs to, if any; that is an area of sky that is tiled
from a single hexagon grid to ensure gapless tiling of overlapping regions.
tile_overlap: fraction of tile separation to overlap with neighboring tiles (ignored for sparse targets)
geodesic:
geodesic tiling of the full sphere instead of region
sparse:
sparse tiling factor, or depth value (number of subdivisions) in case of geodesic tiling
group:
(list of) group names the target belongs to (e.g. MilkyWay). used for aggregating survey statistics
and plotting survey progress.
Attributes
----------
region : `.SkyRegion`
The `.SkyRegion` object associated with this target.
"""
def __init__(self, *args, **kwargs):
self.name = kwargs.pop('name', '')
self.priority = kwargs.pop('priority', 1)
self.observatory = kwargs.pop('observatory', 'BOTH')
self.max_airmass = kwargs.pop('max_airmass', 1.75)
self.min_shadowheight = kwargs.pop('min_shadowheight', 1000.0)
self.exptime = kwargs.pop('exptime', 900)
self.n_exposures = kwargs.pop('n_exposures', 9)
self.min_exposures = kwargs.pop('min_exposures', 3)
self.min_moon_dist = kwargs.pop('min_moon_dist', 90)
self.max_lunation = kwargs.pop('max_lunation', 1.0)
self.overhead = kwargs.pop('overhead', 1.0)
self.groups = kwargs.pop('group', [])
self.tiling_strategy = kwargs.pop('tiling_strategy', 'lowest_airmass')
self.tile_union = kwargs.pop('tile_union', None)
self.tile_overlap = kwargs.pop('tile_overlap', None)
self.overlap = kwargs.pop('overlap', True)
self.geodesic = kwargs.pop('geodesic', False) # full sky tiling, use sparse for depth
self.sparse = kwargs.pop('sparse', None)
telescope = kwargs.pop('telescope', None)
assert telescope is not None, 'must specify a telescope keyword.'
if isinstance(telescope, Telescope):
self.telescope = Telescope
else:
self.telescope = Telescope.from_config(telescope)
self.region = SkyRegion(*args, **kwargs)
self.frame = self.region.frame
self.tiles = None
self.tile_priorities = None
def __repr__(self):
return (f'<Target (name={self.name!r}, telescope={self.telescope.name!r}, '
f'region_type={self.region.region_type!r})>')
[docs] @classmethod
def from_list(cls, name, targets=None):
"""Returns an instance of `.Target` from a target list.
Initialises a new `.Target` whose parameters have been previously
defined in a target list. Target lists must be YAML files in which each
target defines region and the telescope that will observe it, as
detailed in :ref:`target-defining`. For example:
.. code-block:: yaml
M81:
coords: [148.888333, 69.0652778]
region_type: ellipse
frame: icrs
region_params:
a: 0.209722
b: 0.106958333
pa: 149
priority: 1
observatory: APO {LCO, BOTH}
telecope: LVM-1m {LVM-160}
max_airmass: 1.75
min_shadowheight: 1000.0
exptime: 900
n_exposures: 1
min_exposures: 1
...
Parameters
----------
name : str
The identifier for the target. Must be defined in the region.
list file.
target_file : `str`, `~pathlib.Path`, or `None`
The path to the YAML file containing the region list. If
`None`, default to the target list contained in ``lvmcore``.
Example:
>>> from lvmsurveysim.target import Target
>>> m81 = Target.from_list('M81')
"""
assert targets is not None, "target dictionary not defined"
assert name in targets, 'target not found in target list.'
target = targets[name]
region_type = target.pop('region_type')
coords = target.pop('coords')
region_params = target.pop('region_params', {})
target.update(region_params)
return cls(region_type, coords, name=name, **target)
[docs] @classmethod
def supertarget(cls, targets):
'''Create a new target from a list of targets forming a tile union.
This method takes a list of targets and returns a new target object
whose region is the union of the regions of the targets in the list.
This is used in tiling tile unions, which are multiple distinct targets
that overlap or at least share an edge and need to be tiled uniformly across
these interfaces. This is achieved by tiling them as if they were a single
target and then redistributing the tiles back to the original targets according
to their boundaries.
Parameters:
-----------
targets : list of `~lvmsurveysim.target.Target`
list of targets forming a tile union.
Returns:
--------
target : `~lvmsurveysim.target.Target`
'supertarget' consisiting of the union of the input targets
'''
uregion = SkyRegion.multi_union([t.get_skyregion() for t in targets])
supertarget = copy(targets[0])
supertarget.region = uregion
supertarget.name = 'TileUnion ' + targets[0].tile_union
return supertarget
[docs] def get_pixarea(self, pixarea=None, ifu=None, telescope=None):
"""Gets the size of the tile in square degrees."""
telescope = telescope or self.telescope
if ifu is None:
ifu = IFU.from_config()
# warnings.warn(f'target {self.name}: no IFU provided. '
# f'Using default IFU {ifu.name!r}.', LVMSurveyOpsWarning)
assert pixarea is not None or (ifu is not None and telescope is not None), \
'either pixarea or ifu and telescope need to be defined.'
if pixarea is None:
pixarea = (ifu.fibre_size / 2. * telescope.plate_scale).to('degree')**2 * numpy.pi
pixarea *= ifu.n_fibres
pixarea = pixarea.value
return pixarea
[docs] def tile(self, ifu=None, telescope=None, to_frame=None):
"""Tessellates the target region and populates the tiles, pa and tile_priorities
fields.
Parameters
----------
ifu : ~lvmsurveysim.tiling.IFU
The IFU used for tiling the region. If not provided, the default
one is used.
telescope : ~lvmsurveysim.telescope.Telescope
The telescope on which the IFU is mounted. Defaults to the object
``telescope`` attribute.
to_frame : str
The reference frame in which the coordinates should be returned.
If `None`, defaults to the region internal reference frame.
"""
telescope = telescope or self.telescope
if ifu is None:
ifu = IFU.from_config()
# warnings.warn(f'target {self.name}: no IFU provided. '
# f'Using default IFU {ifu.name!r}.', LVMSurveyOpsWarning)
print('Tiling target ' + self.name)
coords, pa = ifu.get_tile_grid(self.region, telescope.plate_scale,
tile_overlap=self.tile_overlap, sparse=self.sparse, geodesic=self.geodesic)
# convert to skycoords and optionally transform in to the requested frame, most likely 'icrs'
tiles, pa2 = self.transform_skycoords(coords[:, 0], coords[:, 1], unit='deg', to_frame=to_frame)
self.pa = pa + pa2
# cache the new tiles and the priorities
self.tiles = tiles
self.tile_priorities = self.get_tile_priorities()
[docs] def make_tiles(self):
""" Return a list of `~lvmsurveysim.schedule.Tile` tile objects for this target.
Requires the self.tiles, self.pa and self.tile_priorites arrays to have been
calculated using the `.tile` method.
"""
return [Tile(self.tiles[i], self.pa[i], self.tile_priorities[i]) for i in range(len(self.tiles))]
[docs] def get_tiles_from_union(self, coords, pa):
""" Select tiles belonging to this target from a list of coordinates.
This method is used to select and assign the tiles belonging to this target
from a list of coordinates of a tile union.
Parameters
----------
coords, pa : ~numpy.array
Vectors of coordinates and PAs of the tile union before selection. Assumed
to be in the ICRS frame.
Returns
-------
coords, pa : ~numpy.array
Vectors of coordinates and PAs remaining in tile union after selection.
"""
mask = numpy.full(len(coords), True)
icrs_r = self.region.icrs_region()
for i, c in enumerate(coords):
if icrs_r.contains_point(c.ra.deg, c.dec.deg):
mask[i] = False
self.tiles = coords[~mask]
self.pa = pa[~mask]
self.tile_priorities = self.get_tile_priorities()
return coords[mask], pa[mask]
[docs] def get_tile_priorities(self):
"""Return an array with tile priorities according to the tiling
strategy defined for this target.
Returns
-------
priorities: ~numpy.array
Array of length of number of tiles with the priority for each tile.
"""
if len(self.tiles) == 0:
warnings.warn(f'target {self.name}: no tiles when calling get_tile_priorities(). ', LVMSurveyOpsWarning)
return numpy.array([])
if self.tiling_strategy == 'lowest_airmass':
self.tile_priorities = numpy.ones(len(self.tiles), dtype=int)
elif self.tiling_strategy == 'center_first':
self.tile_priorities = self.center_first_priorities_()
else:
raise ValueError(f'invalid tiling strategy: {self.tiling_strategy}.')
return self.tile_priorities
[docs] def center_first_priorities_(self):
"""Return an array with tile priorities according for the center-first
tiling strategy.
Tiles are prioritized according to the distance from the region
barycenter. Priorities are equal along lines of constant distance
from the barycenter, quantized in units of the tile diameter.
Returns
-------
priorities : ~numpy.array
Array of length of number of tiles with the priority for each tile.
"""
if self.tiles.frame.name=='icrs':
r, d = self.tiles.ra.deg, self.tiles.dec.deg
else:
r, d = self.tiles.l.deg, self.tiles.b.deg
# TODO: proper calculation of barycenter on the sphere!
rc = numpy.average(r)
dc = numpy.average(d)
dist = lvmsurveysim.utils.spherical.great_circle_distance(r, d, rc, dc)
field = numpy.sqrt(self.get_pixarea() / numpy.pi) # TODO: better way to get field size!!!
p = numpy.floor(dist / field).astype(int)
return numpy.max(p) - p + 1 # invert since priorities increase with value
[docs] def get_skyregion(self):
""" Return the `.SkyRegion` of the target
"""
return self.region
[docs] def is_sparse(self):
'''Return True if the Target is sparse.
'''
if self.sparse == None:
return False
else:
return True
[docs] def density(self):
'''Return the tile density of the target.
The tile density is 1 if the target is not sparse, and
1/sparse otherwise.
'''
if self.is_sparse():
return 1.0/self.sparse
else:
return 1.0
[docs] def in_tile_union_with(self, other):
'''Return True if `self` is a member of the same tile union as `other`.
'''
return (self.tile_union != None) and (self.tile_union==other.tile_union)
[docs] def plot(self, *args, **kwargs):
"""Plots the region. An alias for ``.SkyRegion.plot``.
"""
return self.region.plot(*args, **kwargs)
[docs] def plot_tiling(self, projection='rectangular', ifu=None, frame=None, fig=None, **kwargs):
"""Plots the tiles within the region.
Parameters
----------
ifu : ~lvmsurveysim.tiling.IFU
The IFU used for tiling the region. If not provided, the default
one is used.
frame : str
The reference frame on which the pixels will be displayed. Defaults
to the internal frame of the target.
ax : ~matplotlib.axes.Axes
A Matplotlib `~matplotlib.axes.Axes` object to use. Otherwise, a
new one will be created.
kwargs : dict
Parameters to be passed to `~matplotlib.axes.scatter`.
Returns
-------
figure : `~matplotlib.figure.Figure`
The Matplotlib `~matplotlib.figure.Figure`.
"""
frame = frame or self.frame
ifu = ifu or IFU.from_config()
if self.tiles is None:
self.tile(ifu=ifu, to_frame=frame)
if frame == 'icrs':
lon, lat = self.tiles.ra.deg, self.tiles.dec.deg
elif frame == 'galactic':
lon, lat = self.tiles.l.deg, self.tiles.b.deg
if fig is None:
fig, ax = lvm_plot.get_axes(projection=projection, frame=frame)
else:
ax = fig.axes[0]
if projection=='mollweide':
c1,c2 = lvm_plot.convert_to_mollweide(lon, lat)
else:
c1, c2 = lon, lat
patches = [ifu.get_patch(scale=self.telescope.plate_scale, centre=[c1[p], c2[p]], pa=self.pa[p],
edgecolor='r', linewidth=1, alpha=0.5)[0]
for p in range(len(c1))]
if projection == 'mollweide':
patches = [transform_patch_mollweide(patch) for patch in patches]
for patch in patches:
ax.add_patch(patch)
ax.scatter(c1, c2, s=1, **kwargs)
return fig, ax
[docs]class TargetList(list):
"""A list of all the targets to observe.
Parameters
----------
target_file : str
The YAML file with all the targets to observe. Defaults to the
``lvmcore`` target list.
Returns
-------
target_set : list
A list of `.Target` instances.
"""
def __init__(self, targets=None, target_file=None):
self.filename = None
if targets:
self._names = [target.name for target in targets]
super().__init__(targets)
else:
if target_file is None:
target_file = pathlib.Path(
os.path.expanduser(os.path.expandvars(config['tiledb']['target_file'])))
else:
target_file = pathlib.Path(target_file)
assert target_file.exists()
self.filename = target_file
targets_dict = yaml.load(open(str(target_file)), Loader=yaml.FullLoader)
self._names = list(targets_dict.keys())
targets = [Target.from_list(name, targets=targets_dict)
for name in self._names]
super().__init__(targets)
[docs] def get_target(self, name):
"""Returns the target whose name correspond to ``name``."""
return self[self._names.index(name)]
[docs] def get_group_targets(self, group, primary=True):
"""Returns the targets that are in a group.
Parameters
----------
group : str
The group name.
primary : bool
Return only the target if ``group`` is the primary group to which
the target belongs (i.e., the first one in the list).
Returns
-------
targets : `list`
A list of target names that are included in ``group``.
"""
targets = []
for target in self:
if group in target.groups:
if (primary and group == target.groups[0]) or (not primary):
targets.append(target.name)
return targets
[docs] def get_groups(self):
"""Returns a list of all the groups for all the targets in the list."""
groups = set()
for target in self:
groups.update(target.groups)
return list(groups)
[docs] def get_tile_unions(self):
"""Returns a list of all the tile unions in the target list."""
unions = set()
for target in self:
if target.tile_union:
unions.update([target.tile_union])
return list(unions)
[docs] def get_union_targets(self, tile_union):
"""Returns the targets that are in a tile union.
Parameters
----------
tile_union : str
The group name.
Returns
-------
targets : `list`
A list of target names that are included in ``tile_union``.
"""
ut = []
for target in self:
if tile_union == target.tile_union:
ut.append(target)
return TargetList(targets=ut)
[docs] def order_by_priority(self):
""" Return a copy of the target list ordered by priorities highest to lowest.
"""
def prio(t):
return t.priority
return sorted(self, key=prio, reverse=True)
[docs] def plot_tiling(self, frame='icrs', **kwargs):
"""Plots all the target pixels in a single Mollweide projection.
Parameters
----------
frame : str
The coordinate frame to which all the pixel centres will be
converted.
kwargs : dict
Parameters to be passed to `.Target.plot_tiling`. By default, each
target will be plotted in a different colour.
Returns
-------
figure : `~matplotlib.figure.Figure`
The Matplotlib `~matplotlib.figure.Figure`.
"""
assert len(self) > 0, 'no targets in list.'
zorder = 100
fig = self[0].plot_tiling(frame=frame, zorder=zorder, **kwargs)
if len(self) > 1:
for target in self[1:]:
zorder -= 1
fig = target.plot_tiling(fig=fig, frame=frame, zorder=zorder, **kwargs)
return fig