smart_geocubes

Smart-Geocubes: A high-performance library for intelligent loading and caching of remote geospatial raster data, built with xarray and zarr.

Modules:

  • accessors

    Smart-Geocubes accessor implementations.

  • backends

    Download backends for smart-geocubes.

  • core

    Core functionality of smart-geocubes.

  • datasets

    Predefined datasets for the SmartGeocubes package.

  • exceptions

    Exceptions for the smart_geocubes package.

Classes:

AlphaEarthEmbeddings

AlphaEarthEmbeddings(
    storage: Storage | Path | str,
    create_icechunk_storage: bool = True,
    backend: Literal["threaded", "simple"] = "threaded",
)

Bases: GEEMosaicAccessor

Accessor for AlphaEarth Embeddings data.

Attributes:

  • extent (GeoBox) –

    The extent of the datacube represented by a GeoBox.

  • chunk_size (int) –

    The chunk size of the datacube.

  • channels (list) –

    The channels of the datacube.

  • storage (Storage) –

    The icechunk storage.

  • repo (Repository) –

    The icechunk repository.

  • title (str) –

    The title of the datacube.

  • stopuhr (StopUhr) –

    The benchmarking timer from the stopuhr library.

  • zgeobox (GeoBox) –

    The geobox of the zarr array. Should be equal to the extent geobox.

  • created (bool) –

    True if the datacube already exists in the storage.

Initialize base class for remote accessors.

Warning

In a multiprocessing environment, it is strongly recommended to set create_icechunk_storage=False and to create the icechunk repository beforehand.

Parameters:

  • storage

    (Storage) –

    The icechunk storage of the datacube.

  • create_icechunk_storage

    (bool, default: True ) –

    Whether an icechunk repository should be created at the provided storage if none exists yet. This should be disabled in a multiprocessing environment. Defaults to True.

  • backend

    (Literal['threaded', 'simple'], default: 'threaded' ) –

    The backend to use for downloading data. The "threaded" backend requires Python 3.13 or later; use "simple" otherwise. Defaults to "threaded".

Raises:

  • ValueError

    If the storage is not an icechunk.Storage.

Methods:

  • adjacent_patches

    Get the adjacent patches for the given geobox.

  • assert_created

    Assert that the datacube exists in the storage.

  • assert_temporal_cube

    Assert that the datacube has a temporal dimension.

  • create

    Create an empty datacube and write it to the store.

  • current_state

    Get info about currently stored tiles.

  • download_patch

    Download the data for the given patch.

  • load

    Load the data for the given geobox.

  • load_like

    Load the data for the given geobox.

  • loaded_patches

    Get the ids of already (down-)loaded patches.

  • log_benchmark_summary

    Log the benchmark summary.

  • open_xarray

    Open the xarray datacube in read-only mode.

  • open_zarr

    Open the zarr datacube in read-only mode.

  • post_create

    Post create actions. Can be overwritten by the dataset accessor.

  • post_init

    Post init actions. Can be overwritten by the dataset accessor.

  • procedural_download

    Download tiles procedurally.

  • visualize_state

    Visualize the extent, i.e. the already downloaded and filled data, of the datacube.

Source code in src/smart_geocubes/core/accessor.py
def __init__(
    self,
    storage: icechunk.Storage | Path | str,
    create_icechunk_storage: bool = True,
    backend: Literal["threaded", "simple"] = "threaded",
):
    """Initialize base class for remote accessors.

    !!! warning

        In a multiprocessing environment, it is strongly recommended to set `create_icechunk_storage=False` and to create the icechunk repository beforehand.

    Args:
        storage (icechunk.Storage): The icechunk storage of the datacube.
        create_icechunk_storage (bool, optional): Whether an icechunk repository should be created at the
            provided storage if none exists yet.
            This should be disabled in a multiprocessing environment.
            Defaults to True.
        backend (Literal["threaded", "simple"], optional): The backend to use for downloading data.
            The "threaded" backend requires Python 3.13 or later; use "simple" otherwise.
            Defaults to "threaded".

    Raises:
        ValueError: If the storage is not an icechunk.Storage.

    """
    # Title is used for logging, debugging and as a default name for the datacube
    self.title = self.__class__.__name__

    if isinstance(storage, (str | Path)):
        storage = storage if isinstance(storage, str) else str(storage.resolve())
        storage = icechunk.local_filesystem_storage(storage)
    if not isinstance(storage, icechunk.Storage):
        raise ValueError(f"Expected an icechunk.Storage, but got {type(storage)}")
    self.storage = storage
    logger.debug(f"Using storage {storage=}")
    if create_icechunk_storage:
        self.repo = icechunk.Repository.open_or_create(storage)  # Will create a "main" branch
    else:
        self.repo = icechunk.Repository.open(storage)
    logger.debug(f"Using repository {self.repo=}")

    # The benchmarking timer for this accessor
    self.stopuhr = Chronometer(logger.debug)

    if backend == "threaded":
        if not _check_python_version(3, 13):
            raise NotImplementedError(
                "The 'threaded' backend is only fully supported in Python 3.13 and above."
                " Please consider using the 'simple' backend in a multiprocessing environment"
                " or upgrade your Python version."
            )
        self.backend = ThreadedBackend(self.repo, self.download_patch)
    elif backend == "simple":
        self.backend = SimpleBackend(self.repo, self.download_patch)
    else:
        raise ValueError(f"Unknown backend {backend}")

    self.post_init()
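
For reference, a minimal usage sketch under assumptions: the class is importable from smart_geocubes.datasets, and the local path is hypothetical. It relies only on the constructor signature documented above.

from smart_geocubes.datasets import AlphaEarthEmbeddings

# Hypothetical local path; a str, Path, or icechunk.Storage is accepted.
accessor = AlphaEarthEmbeddings("data/alphaearth.icechunk")
# backend="simple" would be the safer choice on Python < 3.13 or with multiprocessing,
# per the constructor logic above.
accessor.create(exists_ok=True)  # no-op if the datacube already exists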

created property

created: bool

Check if the datacube already exists in the storage.

Returns:

  • bool ( bool ) –

    True if the datacube already exists in the storage.

is_temporal property

is_temporal: bool

Check if the datacube has a temporal dimension.

Returns:

  • bool ( bool ) –

    True if the datacube has a temporal dimension.

adjacent_patches

adjacent_patches(
    roi: Geometry | GeoBox | GeoDataFrame, toi: TOI
) -> list[Item]

Get the adjacent patches for the given geobox.

Must be implemented by the Accessor.

Parameters:

  • roi

    (Geometry | GeoBox | GeoDataFrame) –

    The reference geometry, geobox or reference geodataframe

  • toi

    (TOI) –

    The time of interest to download.

Returns:

  • list[Item]

    list[PatchIndex[Item]]: The adjacent patch(-id)s for the given geobox.

Raises:

  • ValueError

    If the ROI type is invalid.

  • ValueError

    If the datacube is not temporal, but a time of interest is provided.

Source code in src/smart_geocubes/accessors/gee.py
def adjacent_patches(self, roi: Geometry | GeoBox | gpd.GeoDataFrame, toi: TOI) -> list[Item]:
    """Get the adjacent patches for the given geobox.

    Must be implemented by the Accessor.

    Args:
        roi (Geometry | GeoBox | gpd.GeoDataFrame): The reference geometry, geobox or reference geodataframe
        toi (TOI): The time of interest to download.

    Returns:
        list[PatchIndex[Item]]: The adjacent patch(-id)s for the given geobox.

    Raises:
        ValueError: If the ROI type is invalid.
        ValueError: If the datacube is not temporal, but a time of interest is provided.

    """
    if toi is not None and not self.is_temporal:
        raise ValueError("Datacube is not temporal, but time of interest is provided.")

    if isinstance(roi, gpd.GeoDataFrame):
        adjacent_geometries = (
            gpd.sjoin(self._tile_geometries, roi.to_crs(self.extent.crs.wkt), how="inner", predicate="intersects")
            .reset_index()
            .drop_duplicates(subset="index", keep="first")
            .set_index("index")
        )
        spatial_idxs: list[tuple[int, int]] = list(adjacent_geometries["idx"])
    elif isinstance(roi, GeoBox):
        spatial_idxs: list[tuple[int, int]] = list(self._extent_tiles.tiles(roi.extent))
    elif isinstance(roi, Geometry):
        spatial_idxs: list[tuple[int, int]] = list(self._extent_tiles.tiles(roi))
    else:
        raise ValueError("Invalid ROI type.")

    if not self.is_temporal:
        return [
            PatchIndex(
                self._stringify_index(spatial_idx),
                self._extent_tiles[spatial_idx].geographic_extent,
                None,
                Item(self._extent_tiles[spatial_idx], None),
            )
            for spatial_idx in spatial_idxs
        ]

    # Now datacube is temporal
    toi = normalize_toi(self.temporal_extent, toi)
    patch_idxs = []
    for time in toi:
        time_idx = self.temporal_extent.get_loc(time)
        assert isinstance(time_idx, int), "Non-Unique temporal extents are not supported!"
        for spatial_idx in spatial_idxs:
            patch_idxs.append(
                PatchIndex(
                    self._stringify_index(spatial_idx, time_idx),
                    self._extent_tiles[spatial_idx].geographic_extent,
                    time,
                    Item(self._extent_tiles[spatial_idx], time),
                )
            )
    return patch_idxs
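
A hedged sketch of calling adjacent_patches directly for a non-temporal cube; the AOI coordinates are made up, GeoBox.from_bbox comes from odc-geo, and accessor is the instance constructed in the earlier sketch.

from odc.geo.geobox import GeoBox

# Hypothetical ~0.1 degree AOI in EPSG:4326.
aoi = GeoBox.from_bbox((10.0, 55.0, 10.1, 55.1), crs="epsg:4326", resolution=0.001)
patches = accessor.adjacent_patches(aoi, toi=None)
print([p.id for p in patches])  # the ids also reported by loaded_patches()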

assert_created

assert_created()

Assert that the datacube exists in the storage.

Source code in src/smart_geocubes/core/accessor.py
def assert_created(self):
    """Assert that the datacube exists in the storage."""
    self.backend.assert_created()

assert_temporal_cube

assert_temporal_cube()

Assert that the datacube has a temporal dimension.

Raises:

  • ValueError

    If the datacube has no temporal dimension.

Source code in src/smart_geocubes/core/accessor.py
def assert_temporal_cube(self):
    """Assert that the datacube has a temporal dimension.

    Raises:
        ValueError: If the datacube has no temporal dimension.

    """
    if self.temporal_extent is None:
        msg = f"Datacube {self.title} has no temporal dimension."
        logger.error(msg)
        raise ValueError(msg)

create

create(overwrite: bool = False, exists_ok: bool = False)

Create an empty datacube and write it to the store.

Parameters:

  • overwrite

    (bool, default: False ) –

    Allowing overwriting an existing datacube. Has no effect if exists_ok is True. Defaults to False.

  • exists_ok

    (bool, default: False ) –

    Do not raise an error if the datacube already exists.

Raises:

  • FileExistsError

    If a datacube already exists at location and exists_ok is False.

Source code in src/smart_geocubes/core/accessor.py
def create(self, overwrite: bool = False, exists_ok: bool = False):
    """Create an empty datacube and write it to the store.

    Args:
        overwrite (bool, optional): Allowing overwriting an existing datacube.
            Has no effect if exists_ok is True. Defaults to False.
        exists_ok (bool, optional): Do not raise an error if the datacube already exists.

    Raises:
        FileExistsError: If a datacube already exists at location and exists_ok is False.

    """
    if exists_ok and self.created:
        logger.debug("Datacube was already created.")
        return

    with self.stopuhr("Empty datacube creation"):
        # Check if the zarr data already exists
        session = self.repo.writable_session("main")
        cube_is_empty = sync(session.store.is_empty(""))
        if not overwrite and not cube_is_empty:
            logger.debug(f"Unable to create a new datacube. {overwrite=} {cube_is_empty=} {session.store=}")
            raise FileExistsError(f"Cannot create a new datacube. {session.store=} is not empty!")

        logger.debug(
            f"Creating an empty zarr datacube '{self.title}' with the variables"
            f" {self.channels} at a {self.extent.resolution=} (epsg:{self.extent.crs.epsg})"
            f" and {self.chunk_size=} to {session.store=}"
        )

        ds = xr.Dataset(
            {
                name: odc.geo.xr.xr_zeros(
                    self.extent,
                    chunks=-1,
                    dtype=self._channels_encoding[name].get("dtype", "float32"),
                    always_yx=True,
                )
                for name in self.channels
            },
            attrs={"title": self.title, "loaded_patches": []},
        )

        # Expand to temporal dimension if defined
        if self.temporal_extent is not None:
            ds = ds.expand_dims(time=self.temporal_extent)

        # Add metadata
        for name, meta in self._channels_meta.items():
            ds[name].attrs.update(meta)

        # Get the encoding for the coordinates, variables and spatial reference
        coords_encoding = {
            "x": {"chunks": ds.x.shape, **optimize_coord_encoding(ds.x.values, self.extent.resolution.x)},
            "y": {"chunks": ds.y.shape, **optimize_coord_encoding(ds.y.values, self.extent.resolution.y)},
        }
        if self.temporal_extent is not None:
            coords_encoding["time"] = {"chunks": ds.time.shape, **optimize_temporal_encoding(self.temporal_extent)}
        chunks = (
            (1, self.chunk_size, self.chunk_size)
            if self.temporal_extent is not None
            else (self.chunk_size, self.chunk_size)
        )
        var_encoding = {
            name: {
                "chunks": chunks,
                "compressors": [BloscCodec(clevel=9)],
                **self._channels_encoding[name],
            }
            for name in self.channels
        }
        encoding = {
            "spatial_ref": {"chunks": None, "dtype": "int32"},
            **coords_encoding,
            **var_encoding,
        }
        logger.debug(f"Datacube {encoding=}")

        ds.to_zarr(
            session.store,
            encoding=encoding,
            compute=False,
            consolidated=False,
            zarr_format=3,
            mode="w" if overwrite else "w-",
        )

        commit = session.commit("Initialize empty datacube")
        logger.debug(f"Datacube created: {commit=}")

        self.post_create()
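
In practice, the exists_ok flag gives an idempotent setup step, as in this sketch (accessor as constructed in the earlier sketch):

accessor.create(exists_ok=True)  # returns early if the datacube already exists
# Alternatively, fail loudly when a cube is already present:
try:
    accessor.create()
except FileExistsError:
    pass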

current_state

current_state() -> gpd.GeoDataFrame | None

Get info about currently stored tiles.

Returns:

  • GeoDataFrame | None

    gpd.GeoDataFrame: Tiles from odc.geo.GeoboxTiles. None if datacube is empty.

Source code in src/smart_geocubes/accessors/gee.py
def current_state(self) -> gpd.GeoDataFrame | None:
    """Get info about currently stored tiles.

    Returns:
        gpd.GeoDataFrame: Tiles from odc.geo.GeoboxTiles. None if datacube is empty.

    """
    import geopandas as gpd

    if not self.created:
        return None

    loaded_patches = self.loaded_patches()

    if len(loaded_patches) == 0:
        return None

    patch_infos = []
    for pid in loaded_patches:
        spatial_idx, temporal_idx = self._parse_index(pid)
        geometry = self._extent_tiles[spatial_idx].extent.geom
        if self.is_temporal:
            time = self.temporal_extent[temporal_idx]
            patch_infos.append({"geometry": geometry, "id": pid, "time": time})
        else:
            patch_infos.append({"geometry": geometry, "id": pid})

    gdf = gpd.GeoDataFrame(patch_infos, crs=self.extent.crs.to_wkt())
    return gdf
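
A small sketch of inspecting the returned GeoDataFrame; the column names ("id", "geometry", and "time" for temporal cubes) follow the implementation above.

state = accessor.current_state()
if state is None:
    print("Datacube is empty or not created yet.")
else:
    print(f"{len(state)} patches stored")
    print(state.head())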

download_patch

download_patch(idx: PatchIndex[Item]) -> xr.Dataset

Download the data for the given patch.

Must be implemented by the Accessor.

Parameters:

  • idx

    (PatchIndex[Item]) –

    The reference patch to download the data for.

Returns:

  • Dataset

    xr.Dataset: The downloaded patch data.

Source code in src/smart_geocubes/accessors/gee.py
def download_patch(self, idx: PatchIndex[Item]) -> xr.Dataset:
    """Download the data for the given patch.

    Must be implemented by the Accessor.

    Args:
        idx (PatchIndex[Item]): The reference patch to download the data for.

    Returns:
        xr.Dataset: The downloaded patch data.

    """
    import ee
    import rioxarray  # noqa: F401
    import xee  # noqa: F401

    # Note: This is a little bit weird: First we create our own grid which overlaps with the chunks
    # of the zarr array. Then we create a mosaic of the data and clip it to a single chunk.
    # We could load the images from the collection directly instead of creating a mosaic.
    # However, this would require more testing and probably results in a lot of manual computation
    # of slices etc. like in the STAC variant. So for now, we just use the mosaic.
    logging.getLogger("urllib3.connectionpool").disabled = True

    ee_col = ee.ImageCollection(self.collection)
    if self.is_temporal:
        ee_col = ee_col.filterDate(idx.item.time)
    geom = ee.Geometry.Rectangle(idx.item.geobox.geographic_extent.boundingbox)
    ee_img = ee_col.mosaic().clip(geom)

    with warnings.catch_warnings():
        warnings.filterwarnings("ignore", category=UserWarning, message=EE_WARN_MSG)
        patch = xr.open_dataset(
            ee_img,
            engine="ee",
            geometry=geom,
            crs=f"epsg:{self.extent.crs.to_epsg()}",
            scale=self.extent.resolution.x,
        )

    # Do a mosaic if time axis are returned for non-temporal data
    if "time" in patch.dims and not self.is_temporal:
        patch = patch.max("time")

    patch = patch.rename({"lon": "x", "lat": "y"})
    if "time" in patch.dims:
        patch["time"] = [idx.item.time]
        patch = patch.transpose("time", "y", "x")
    else:
        patch = patch.transpose("y", "x")

    # Download the data
    logger.debug(f"{idx.id=}: Trigger GEE download)")
    patch.load()
    logger.debug(f"{idx.id=}: Finished GEE download")
    logging.getLogger("urllib3.connectionpool").disabled = False

    # Flip y-axis, because the convention is x in positive direction and y in negative, but GEE uses positive for both
    patch = patch.isel(y=slice(None, None, -1))

    # For some reason xee does not always set the crs
    patch = patch.odc.assign_crs(self.extent.crs)

    # Recrop the data to the tile, since gee does not always return the exact extent
    patch = patch.odc.crop(idx.item.geobox.extent)

    return patch

load

load(
    aoi: Geometry | GeoBox,
    toi: TOI = None,
    persist: bool = True,
    create: bool = False,
) -> xr.Dataset

Load the data for the given geobox.

Parameters:

  • aoi

    (Geometry | GeoBox) –

    The reference geometry to load the data for. If a Geobox is provided, it will use the extent of the geobox.

  • toi

    (TOI, default: None ) –

    The temporal slice to load. Defaults to None.

  • persist

    (bool, default: True ) –

    If the data should be persisted in memory. If not, this will return a Dask backed Dataset. Defaults to True.

  • create

    (bool, default: False ) –

    Create a new zarr array at the defined storage if it does not exist. This is not recommended, because it can have side effects in a multi-process environment. Defaults to False.

Returns:

  • Dataset

    xr.Dataset: The loaded dataset in the same resolution and extent as the geobox.

Source code in src/smart_geocubes/core/accessor.py
def load(
    self,
    aoi: Geometry | GeoBox,
    toi: TOI = None,
    persist: bool = True,
    create: bool = False,
) -> xr.Dataset:
    """Load the data for the given geobox.

    Args:
        aoi (Geometry | GeoBox): The reference geometry to load the data for. If a Geobox is provided,
            it will use the extent of the geobox.
        toi (TOI): The temporal slice to load. Defaults to None.
        persist (bool, optional): If the data should be persisted in memory.
            If not, this will return a Dask backed Dataset. Defaults to True.
        create (bool, optional): Create a new zarr array at the defined storage if it does not exist.
            This is not recommended, because it can have side effects in a multi-process environment.
            Defaults to False.

    Returns:
        xr.Dataset: The loaded dataset in the same resolution and extent as the geobox.

    """
    if toi is not None:
        self.assert_temporal_cube()

    if isinstance(aoi, GeoBox):
        aoi = aoi.extent

    with self.stopuhr(f"{_geometry_repr(aoi)}: {self.title} tile {'loading' if persist else 'lazy-loading'}"):
        # Create the datacube if it does not exist
        if create:
            try:
                self.create(overwrite=False)
            except FileExistsError:  # We are okay if the datacube already exists
                pass
        else:
            # Check if the datacube exists
            self.assert_created()

        # Download the adjacent tiles (if necessary)
        aligned_aoi = aoi.to_crs(self.extent.crs)
        with self.stopuhr(f"{_geometry_repr(aoi)}: Procedural download in blocking mode"):
            self.procedural_download(aligned_aoi, toi)

        # Load the datacube and set the spatial_ref since it is set as a coordinate within the zarr format
        session = self.repo.readonly_session("main")
        chunks = None if persist else "auto"
        xrcube = xr.open_zarr(
            session.store,
            mask_and_scale=False,
            chunks=chunks,
            consolidated=False,
        ).set_coords("spatial_ref")

        # Get temporal slice if time is provided
        if toi is not None:
            xrcube = xrcube.sel(time=toi)

        # Get an AOI slice of the datacube
        xrcube_aoi = xrcube.odc.crop(aligned_aoi, apply_mask=False)

        # The following code would load the lazy zarr data from disk into memory
        if persist:
            with self.stopuhr(f"{_geometry_repr(aoi)}: {self.title} AOI loading from disk"):
                xrcube_aoi = xrcube_aoi.load()
    return xrcube_aoi
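
An end-to-end sketch of load; the AOI is made up, and create=True is convenient for single-process use only, per the notes above.

from odc.geo.geobox import GeoBox

aoi = GeoBox.from_bbox((10.0, 55.0, 10.1, 55.1), crs="epsg:4326", resolution=0.001)
ds = accessor.load(aoi, persist=True, create=True)
# ds is an xr.Dataset cropped to the AOI, in the datacube's native CRS and resolution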

load_like

load_like(
    ref: Dataset | DataArray, **kwargs: Unpack[LoadParams]
) -> xr.Dataset

Load the data for the given geobox.

Parameters:

  • ref

    (Dataset | DataArray) –

    The reference dataarray or dataset to load the data for.

  • **kwargs

    (Unpack[LoadParams], default: {} ) –

    The load parameters (buffer, persist, create, concurrency_mode).

Other Parameters:

  • buffer (int) –

    The buffer around the projected geobox in pixels. Defaults to 0.

  • persist (bool) –

    If the data should be persisted in memory. If not, this will return a Dask backed Dataset. Defaults to True.

  • create (bool) –

    Create a new zarr array at the defined storage if it does not exist. This is not recommended, because it can have side effects in a multi-process environment. Defaults to False.

Returns:

  • Dataset

    xr.Dataset: The loaded dataset in the same resolution and extent as the geobox.

Source code in src/smart_geocubes/core/accessor.py
def load_like(
    self,
    ref: xr.Dataset | xr.DataArray,
    **kwargs: Unpack[LoadParams],
) -> xr.Dataset:
    """Load the data for the given geobox.

    Args:
        ref (xr.Dataset | xr.DataArray): The reference dataarray or dataset to load the data for.
        **kwargs: The load parameters (buffer, persist, create, concurrency_mode).

    Keyword Args:
        buffer (int, optional): The buffer around the projected geobox in pixels. Defaults to 0.
        persist (bool, optional): If the data should be persisted in memory.
            If not, this will return a Dask backed Dataset. Defaults to True.
        create (bool, optional): Create a new zarr array at the defined storage if it does not exist.
            This is not recommended, because it can have side effects in a multi-process environment.
            Defaults to False.

    Returns:
        xr.Dataset: The loaded dataset in the same resolution and extent as the geobox.

    """
    toi = None
    if "time" in ref.coords and self.temporal_extent is not None:
        toi = ref.get_index("time")
    return self.load(ref.geobox, toi=toi, **kwargs)
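
A sketch of load_like; ref is assumed to be any odc-georeferenced xr.Dataset or xr.DataArray, e.g. the result of a previous load().

# ref must expose a .geobox (provided by odc.geo.xr)
matched = accessor.load_like(ref, persist=True)
# For temporal cubes, ref's "time" index is forwarded as the toi automatically.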

loaded_patches

loaded_patches() -> list[str]

Get the ids of already (down-)loaded patches.

Returns:

  • list[str]

    list[str]: A list of already loaded patch ids.

Source code in src/smart_geocubes/core/accessor.py
def loaded_patches(self) -> list[str]:
    """Get the ids of already (down-)loaded patches.

    Returns:
        list[str]: A list of already loaded patch ids.

    """
    session = self.repo.readonly_session("main")
    zcube = zarr.open(store=session.store, mode="r")
    return zcube.attrs.get("loaded_patches", []).copy()

log_benchmark_summary

log_benchmark_summary()

Log the benchmark summary.

Source code in src/smart_geocubes/core/accessor.py
def log_benchmark_summary(self):
    """Log the benchmark summary."""
    self.stopuhr.summary()

open_xarray

open_xarray() -> xr.Dataset

Open the xarray datacube in read-only mode.

Returns:

  • Dataset

    xr.Dataset: The xarray datacube.

Source code in src/smart_geocubes/core/accessor.py
def open_xarray(self) -> xr.Dataset:
    """Open the xarray datacube in read-only mode.

    Returns:
        xr.Dataset: The xarray datacube.

    """
    return self.backend.open_xarray()

open_zarr

open_zarr() -> zarr.Group

Open the zarr datacube in read-only mode.

Returns:

  • Group

    zarr.Group: The zarr datacube.

Source code in src/smart_geocubes/core/accessor.py
def open_zarr(self) -> zarr.Group:
    """Open the zarr datacube in read-only mode.

    Returns:
        zarr.Group: The zarr datacube.

    """
    return self.backend.open_zarr()

post_create

post_create()

Post create actions. Can be overwritten by the dataset accessor.

Source code in src/smart_geocubes/core/accessor.py
def post_create(self):
    """Post create actions. Can be overwritten by the dataset accessor."""
    pass

post_init

post_init()

Post init actions. Can be overwritten by the dataset accessor.

Source code in src/smart_geocubes/core/accessor.py
def post_init(self):
    """Post init actions. Can be overwritten by the dataset accessor."""
    pass

procedural_download

procedural_download(aoi: Geometry | GeoBox, toi: TOI)

Download tiles procedurally.

Warning

This method is meant for single-process use, but can (in theory) be used in a multi-process environment. However, in a multi-process environment it can happen that multiple processes try to write concurrently, which results in a conflict. In such cases, the download will be retried until it succeeds or the maximum number of retries is reached.

Parameters:

  • aoi

    (Geometry | GeoBox) –

    The geometry of the aoi to download. If a Geobox is provided, it will use the extent of the geobox.

  • toi

    (TOI) –

    The time of interest to download.

Raises:

  • ValueError

    If no adjacent tiles are found. This can happen if the geobox is out of the dataset bounds.

  • ValueError

    If not all downloads were successful.

Source code in src/smart_geocubes/core/accessor.py
def procedural_download(self, aoi: Geometry | GeoBox, toi: TOI):
    """Download tiles procedurally.

    Warning:
        This method is meant for single-process use, but can (in theory) be used in a multi-process environment.
        However, in a multi-process environment it can happen that multiple processes try to write concurrently,
        which results in a conflict.
        In such cases, the download will be retried until it succeeds or the maximum number of retries is reached.

    Args:
        aoi (Geometry | GeoBox): The geometry of the aoi to download. If a Geobox is provided,
            it will use the extent of the geobox.
        toi (TOI): The time of interest to download.

    Raises:
        ValueError: If no adjacent tiles are found. This can happen if the geobox is out of the dataset bounds.
        ValueError: If not all downloads were successful.

    """
    if isinstance(aoi, GeoBox):
        aoi = aoi.extent

    adjacent_patches = self.adjacent_patches(aoi, toi)
    # interest-string
    soi = f"{_geometry_repr(aoi)}" + (f" @ {_repr_toi(toi)}" if toi is not None else "")
    if not adjacent_patches:
        logger.error(f"{soi}: No adjacent patches found: {adjacent_patches=}")
        raise ValueError("No adjacent patches found - is the provided aoi and toi correct?")

    loaded_patches = self.loaded_patches()

    new_patches = [patch for patch in adjacent_patches if patch.id not in loaded_patches]

    logger.debug(f"{soi}:  {len(adjacent_patches)=} & {len(loaded_patches)=} -> {len(new_patches)=} to download")
    if not new_patches:
        return

    # This raises Errors if anything goes wrong -> we want to propagate
    self.backend.submit(new_patches)
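
One way to use this method directly, sketched under the assumption that the AOI is already in the datacube's CRS (load() performs this reprojection for you):

accessor.procedural_download(aoi, toi=None)  # blocking download of missing patches
cube = accessor.open_xarray()                # lazy, read-only view of the whole cube
subset = cube.odc.crop(aoi.extent, apply_mask=False)  # aoi is a GeoBox here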

visualize_state

visualize_state(
    ax: Axes | None = None,
) -> plt.Figure | plt.Axes

Visualize the extent, i.e. the already downloaded and filled data, of the datacube.

Parameters:

  • ax

    (Axes | None, default: None ) –

    The axes drawn to. If None, will create a new figure and axes.

Returns:

  • Figure | Axes

    plt.Figure | plt.Axes: The figure with the visualization if no axes was provided, else the axes.

Raises:

  • ValueError

    If the datacube is empty.

Source code in src/smart_geocubes/datasets/alphaearth.py
def visualize_state(self, ax: "plt.Axes | None" = None) -> "plt.Figure | plt.Axes":
    """Visulize the extend, hence the already downloaded and filled data, of the datacube.

    Args:
        ax (plt.Axes | None): The axes drawn to. If None, will create a new figure and axes.

    Returns:
        plt.Figure | plt.Axes: The figure with the visualization if no axes was provided, else the axes.

    Raises:
        ValueError: If the datacube is empty

    """
    raise NotImplementedError("Visualization not implemented yet for AlphaEarth Embeddings datacube.")

ArcticDEM10m

ArcticDEM10m(
    storage: Storage | Path | str,
    create_icechunk_storage: bool = True,
    backend: Literal["threaded", "simple"] = "threaded",
)

Bases: ArcticDEMABC

Accessor for ArcticDEM 10m data.

Attributes:

  • extent (GeoBox) –

    The extent of the datacube represented by a GeoBox.

  • chunk_size (int) –

    The chunk size of the datacube.

  • channels (list) –

    The channels of the datacube.

  • storage (Storage) –

    The icechunk storage.

  • repo (Repository) –

    The icechunk repository.

  • title (str) –

    The title of the datacube.

  • stopuhr (StopUhr) –

    The benchmarking timer from the stopuhr library.

  • zgeobox (GeoBox) –

    The geobox of the underlying zarr array. Should be equal to the extent geobox. However, this property is used to find the target index of the downloaded data, so better safe than sorry.

  • created (bool) –

    True if the datacube already exists in the storage.

Initialize base class for remote accessors.

Warning

In a multiprocessing environment, it is strongly recommended to set create_icechunk_storage=False and to create the icechunk repository beforehand.

Parameters:

  • storage

    (Storage) –

    The icechunk storage of the datacube.

  • create_icechunk_storage

    (bool, default: True ) –

    Whether an icechunk repository should be created at the provided storage if none exists yet. This should be disabled in a multiprocessing environment. Defaults to True.

  • backend

    (Literal['threaded', 'simple'], default: 'threaded' ) –

    The backend to use for downloading data. The "threaded" backend requires Python 3.13 or later; use "simple" otherwise. Defaults to "threaded".

Raises:

  • ValueError

    If the storage is not an icechunk.Storage.

Methods:

  • adjacent_patches

    Get adjacent patch indexes from a STAC API.

  • assert_created

    Assert that the datacube exists in the storage.

  • assert_temporal_cube

    Assert that the datacube has a temporal dimension.

  • create

    Create an empty datacube and write it to the store.

  • current_state

    Get info about currently stored tiles.

  • download_patch

    Download the data for the given patch.

  • load

    Load the data for the given geobox.

  • load_like

    Load the data for the given geobox.

  • loaded_patches

    Get the ids of already (down-)loaded patches.

  • log_benchmark_summary

    Log the benchmark summary.

  • open_xarray

    Open the xarray datacube in read-only mode.

  • open_zarr

    Open the zarr datacube in read-only mode.

  • post_create

    Download the ArcticDEM mosaic extent info and store it in the datacube.

  • post_init

    Check if the ArcticDEM mosaic extent info is already downloaded, and download it if not.

  • procedural_download

    Download tiles procedurally.

  • visualize_state

    Visualize the extent, i.e. the already downloaded and filled data, of the datacube.

Source code in src/smart_geocubes/core/accessor.py
def __init__(
    self,
    storage: icechunk.Storage | Path | str,
    create_icechunk_storage: bool = True,
    backend: Literal["threaded", "simple"] = "threaded",
):
    """Initialize base class for remote accessors.

    !!! warning

        In a multiprocessing environment, it is strongly recommended to set `create_icechunk_storage=False` and to create the icechunk repository beforehand.

    Args:
        storage (icechunk.Storage): The icechunk storage of the datacube.
        create_icechunk_storage (bool, optional): Whether an icechunk repository should be created at the
            provided storage if none exists yet.
            This should be disabled in a multiprocessing environment.
            Defaults to True.
        backend (Literal["threaded", "simple"], optional): The backend to use for downloading data.
            The "threaded" backend requires Python 3.13 or later; use "simple" otherwise.
            Defaults to "threaded".

    Raises:
        ValueError: If the storage is not an icechunk.Storage.

    """
    # Title is used for logging, debugging and as a default name for the datacube
    self.title = self.__class__.__name__

    if isinstance(storage, (str | Path)):
        storage = storage if isinstance(storage, str) else str(storage.resolve())
        storage = icechunk.local_filesystem_storage(storage)
    if not isinstance(storage, icechunk.Storage):
        raise ValueError(f"Expected an icechunk.Storage, but got {type(storage)}")
    self.storage = storage
    logger.debug(f"Using storage {storage=}")
    if create_icechunk_storage:
        self.repo = icechunk.Repository.open_or_create(storage)  # Will create a "main" branch
    else:
        self.repo = icechunk.Repository.open(storage)
    logger.debug(f"Using repository {self.repo=}")

    # The benchmarking timer for this accessor
    self.stopuhr = Chronometer(logger.debug)

    if backend == "threaded":
        if not _check_python_version(3, 13):
            raise NotImplementedError(
                "The 'threaded' backend is only fully supported in Python 3.13 and above."
                " Please consider using the 'simple' backend in a multiprocessing environment"
                " or upgrade your Python version."
            )
        self.backend = ThreadedBackend(self.repo, self.download_patch)
    elif backend == "simple":
        self.backend = SimpleBackend(self.repo, self.download_patch)
    else:
        raise ValueError(f"Unknown backend {backend}")

    self.post_init()
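
A minimal sketch, assuming ArcticDEM10m is importable from smart_geocubes.datasets and using a hypothetical storage path; ArcticDEM is not temporal, so load() is called without a toi.

from smart_geocubes.datasets import ArcticDEM10m

accessor = ArcticDEM10m("data/arcticdem_10m.icechunk")
accessor.create(exists_ok=True)  # also fetches the mosaic extent files via post_create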

created property

created: bool

Check if the datacube already exists in the storage.

Returns:

  • bool ( bool ) –

    True if the datacube already exists in the storage.

is_temporal property

is_temporal: bool

Check if the datacube has a temporal dimension.

Returns:

  • bool ( bool ) –

    True if the datacube has a temporal dimension.

adjacent_patches

adjacent_patches(
    roi: Geometry | GeoBox | GeoDataFrame, toi: TOI
) -> list[PatchIndex]

Get adjacent patch indexes from a STAC API.

Overrides the default implementation from the STAC accessor to use pre-downloaded extent files instead of querying the STAC API. This results in faster loading, but requires the extent files to be downloaded beforehand, which is done in the post_create step.

Parameters:

  • roi

    (Geometry | GeoBox | GeoDataFrame) –

    The reference geometry, geobox or reference geodataframe

  • toi

    (TOI) –

    The time of interest to download. Not used in this implementation since ArcticDEM is not temporal.

Returns:

  • list[PatchIndex]

    list[PatchIndex]: List of adjacent patches, wrapped in the library's own data structure for easier processing.

Raises:

  • ValueError

    If the roi is not a Geometry, a GeoBox or a GeoDataFrame.

Source code in src/smart_geocubes/datasets/arcticdem.py
def adjacent_patches(self, roi: Geometry | GeoBox | gpd.GeoDataFrame, toi: TOI) -> list[PatchIndex]:
    """Get adjacent patch indexes from a STAC API.

    Override the default implementation from the STAC accessor
    to use pre-downloaded extent files instead of querying the STAC API.
    This results in faster loading, but requires the extent files to be downloaded beforehand.
    This is done in the post_create step.

    Args:
        roi (Geometry | GeoBox | gpd.GeoDataFrame): The reference geometry, geobox or reference geodataframe
        toi (TOI): The time of interest to download.
            Not used in this implementation since ArcticDEM is not temporal.

    Returns:
        list[PatchIndex]: List of adjacent patches, wrapped in the library's own data structure for easier processing.

    Raises:
        ValueError: If the roi is not a Geometry, a GeoBox or a GeoDataFrame.

    """
    # Assumes that the extent files are already present and the datacube is already created
    self.assert_created()

    resolution = f"{int(self.extent.resolution.x)}m"
    extent_info = gpd.read_parquet(self._aux_dir / f"ArcticDEM_Mosaic_Index_v4_1_{resolution}.parquet")
    if isinstance(roi, gpd.GeoDataFrame):
        adjacent_tiles = (
            gpd.sjoin(
                extent_info,
                roi[["geometry"]].to_crs(self.extent.crs.wkt),
                how="inner",
                predicate="intersects",
            )
            .reset_index()
            .drop_duplicates(subset="index", keep="first", ignore_index=True)
        )
    elif isinstance(roi, GeoBox):
        adjacent_tiles = extent_info.loc[extent_info.intersects(roi.boundingbox.polygon.geom)].copy()
    elif isinstance(roi, Geometry):
        adjacent_tiles = extent_info.loc[extent_info.intersects(roi.geom)].copy()
    else:
        raise ValueError("roi must be a GeoBox or a GeoDataFrame")
    if adjacent_tiles.empty:
        return []
    return [
        LazyStacPatchIndex(tile.dem_id, _get_stac_url(tile.dem_id, resolution))
        for tile in adjacent_tiles.itertuples()
    ]

assert_created

assert_created()

Assert that the datacube exists in the storage.

Source code in src/smart_geocubes/core/accessor.py
def assert_created(self):
    """Assert that the datacube exists in the storage."""
    self.backend.assert_created()

assert_temporal_cube

assert_temporal_cube()

Assert that the datacube has a temporal dimension.

Raises:

  • ValueError

    If the datacube has no temporal dimension.

Source code in src/smart_geocubes/core/accessor.py
def assert_temporal_cube(self):
    """Assert that the datacube has a temporal dimension.

    Raises:
        ValueError: If the datacube has no temporal dimension.

    """
    if self.temporal_extent is None:
        msg = f"Datacube {self.title} has no temporal dimension."
        logger.error(msg)
        raise ValueError(msg)

create

create(overwrite: bool = False, exists_ok: bool = False)

Create an empty datacube and write it to the store.

Parameters:

  • overwrite

    (bool, default: False ) –

    Allowing overwriting an existing datacube. Has no effect if exists_ok is True. Defaults to False.

  • exists_ok

    (bool, default: False ) –

    Do not raise an error if the datacube already exists.

Raises:

  • FileExistsError

    If a datacube already exists at location and exists_ok is False.

Source code in src/smart_geocubes/core/accessor.py
def create(self, overwrite: bool = False, exists_ok: bool = False):
    """Create an empty datacube and write it to the store.

    Args:
        overwrite (bool, optional): Allowing overwriting an existing datacube.
            Has no effect if exists_ok is True. Defaults to False.
        exists_ok (bool, optional): Do not raise an error if the datacube already exists.

    Raises:
        FileExistsError: If a datacube already exists at location and exists_ok is False.

    """
    if exists_ok and self.created:
        logger.debug("Datacube was already created.")
        return

    with self.stopuhr("Empty datacube creation"):
        # Check if the zarr data already exists
        session = self.repo.writable_session("main")
        cube_is_empty = sync(session.store.is_empty(""))
        if not overwrite and not cube_is_empty:
            logger.debug(f"Unable to create a new datacube. {overwrite=} {cube_is_empty=} {session.store=}")
            raise FileExistsError(f"Cannot create a new datacube. {session.store=} is not empty!")

        logger.debug(
            f"Creating an empty zarr datacube '{self.title}' with the variables"
            f" {self.channels} at a {self.extent.resolution=} (epsg:{self.extent.crs.epsg})"
            f" and {self.chunk_size=} to {session.store=}"
        )

        ds = xr.Dataset(
            {
                name: odc.geo.xr.xr_zeros(
                    self.extent,
                    chunks=-1,
                    dtype=self._channels_encoding[name].get("dtype", "float32"),
                    always_yx=True,
                )
                for name in self.channels
            },
            attrs={"title": self.title, "loaded_patches": []},
        )

        # Expand to temporal dimension if defined
        if self.temporal_extent is not None:
            ds = ds.expand_dims(time=self.temporal_extent)

        # Add metadata
        for name, meta in self._channels_meta.items():
            ds[name].attrs.update(meta)

        # Get the encoding for the coordinates, variables and spatial reference
        coords_encoding = {
            "x": {"chunks": ds.x.shape, **optimize_coord_encoding(ds.x.values, self.extent.resolution.x)},
            "y": {"chunks": ds.y.shape, **optimize_coord_encoding(ds.y.values, self.extent.resolution.y)},
        }
        if self.temporal_extent is not None:
            coords_encoding["time"] = {"chunks": ds.time.shape, **optimize_temporal_encoding(self.temporal_extent)}
        chunks = (
            (1, self.chunk_size, self.chunk_size)
            if self.temporal_extent is not None
            else (self.chunk_size, self.chunk_size)
        )
        var_encoding = {
            name: {
                "chunks": chunks,
                "compressors": [BloscCodec(clevel=9)],
                **self._channels_encoding[name],
            }
            for name in self.channels
        }
        encoding = {
            "spatial_ref": {"chunks": None, "dtype": "int32"},
            **coords_encoding,
            **var_encoding,
        }
        logger.debug(f"Datacube {encoding=}")

        ds.to_zarr(
            session.store,
            encoding=encoding,
            compute=False,
            consolidated=False,
            zarr_format=3,
            mode="w" if overwrite else "w-",
        )

        commit = session.commit("Initialize empty datacube")
        logger.debug(f"Datacube created: {commit=}")

        self.post_create()

current_state

current_state() -> gpd.GeoDataFrame | None

Get info about currently stored tiles.

Returns:

  • GeoDataFrame | None

    gpd.GeoDataFrame: Tile info from pystac. None if datacube is empty.

Source code in src/smart_geocubes/accessors/stac.py
def current_state(self) -> gpd.GeoDataFrame | None:
    """Get info about currently stored tiles.

    Returns:
        gpd.GeoDataFrame: Tile info from pystac. None if datacube is empty.

    """
    import geopandas as gpd
    import pystac_client

    if not self.created:
        return None

    loaded_patches = self.loaded_patches()

    if len(loaded_patches) == 0:
        return None

    catalog = pystac_client.Client.open(self.stac_api_url)
    search = catalog.search(collections=[self.collection], ids=loaded_patches)
    stac_json = search.item_collection_as_dict()

    gdf = gpd.GeoDataFrame.from_features(stac_json, "epsg:4326")
    return gdf

download_patch

download_patch(idx: PatchIndex[Item]) -> xr.Dataset

Download the data for the given patch.

Must be implemented by the Accessor.

Parameters:

  • idx

    (PatchIndex[Item]) –

    The reference patch to download the data for.

Returns:

  • Dataset

    xr.Dataset: The downloaded patch data.

Source code in src/smart_geocubes/accessors/stac.py
def download_patch(self, idx: PatchIndex["Item"]) -> xr.Dataset:
    """Download the data for the given patch.

    Must be implemented by the Accessor.

    Args:
        idx (PatchIndex[Item]): The reference patch to download the data for.

    Returns:
        xr.Dataset: The downloaded patch data.

    """
    from odc.stac import stac_load

    patch = stac_load([idx.item], bands=self.channels, chunks=None, progress=None)

    # Do a mosaic if multiple items are returned for non-temporal data
    if "time" in patch.dims and self.temporal_extent is None:
        patch = patch.max("time")

    return patch

load

load(
    aoi: Geometry | GeoBox,
    toi: TOI = None,
    persist: bool = True,
    create: bool = False,
) -> xr.Dataset

Load the data for the given geobox.

Parameters:

  • aoi

    (Geometry | GeoBox) –

    The reference geometry to load the data for. If a Geobox is provided, it will use the extent of the geobox.

  • toi

    (TOI, default: None ) –

    The temporal slice to load. Defaults to None.

  • persist

    (bool, default: True ) –

    If the data should be persisted in memory. If not, this will return a Dask backed Dataset. Defaults to True.

  • create

    (bool, default: False ) –

    Create a new zarr array at the defined storage if it does not exist. This is not recommended, because it can have side effects in a multi-process environment. Defaults to False.

Returns:

  • Dataset

    xr.Dataset: The loaded dataset in the same resolution and extent as the geobox.

Source code in src/smart_geocubes/core/accessor.py
def load(
    self,
    aoi: Geometry | GeoBox,
    toi: TOI = None,
    persist: bool = True,
    create: bool = False,
) -> xr.Dataset:
    """Load the data for the given geobox.

    Args:
        aoi (Geometry | GeoBox): The reference geometry to load the data for. If a Geobox is provided,
            it will use the extent of the geobox.
        toi (TOI): The temporal slice to load. Defaults to None.
        persist (bool, optional): If the data should be persisted in memory.
            If not, this will return a Dask backed Dataset. Defaults to True.
        create (bool, optional): Create a new zarr array at the defined storage if it does not exist.
            This is not recommended, because it can have side effects in a multi-process environment.
            Defaults to False.

    Returns:
        xr.Dataset: The loaded dataset in the same resolution and extent as the geobox.

    """
    if toi is not None:
        self.assert_temporal_cube()

    if isinstance(aoi, GeoBox):
        aoi = aoi.extent

    with self.stopuhr(f"{_geometry_repr(aoi)}: {self.title} tile {'loading' if persist else 'lazy-loading'}"):
        # Create the datacube if it does not exist
        if create:
            try:
                self.create(overwrite=False)
            except FileExistsError:  # We are okay if the datacube already exists
                pass
        else:
            # Check if the datacube exists
            self.assert_created()

        # Download the adjacent tiles (if necessary)
        aligned_aoi = aoi.to_crs(self.extent.crs)
        with self.stopuhr(f"{_geometry_repr(aoi)}: Procedural download in blocking mode"):
            self.procedural_download(aligned_aoi, toi)

        # Load the datacube and set the spatial_ref since it is set as a coordinate within the zarr format
        session = self.repo.readonly_session("main")
        chunks = None if persist else "auto"
        xrcube = xr.open_zarr(
            session.store,
            mask_and_scale=False,
            chunks=chunks,
            consolidated=False,
        ).set_coords("spatial_ref")

        # Get temporal slice if time is provided
        if toi is not None:
            xrcube = xrcube.sel(time=toi)

        # Get an AOI slice of the datacube
        xrcube_aoi = xrcube.odc.crop(aligned_aoi, apply_mask=False)

        # The following code would load the lazy zarr data from disk into memory
        if persist:
            with self.stopuhr(f"{_geometry_repr(aoi)}: {self.title} AOI loading from disk"):
                xrcube_aoi = xrcube_aoi.load()
    return xrcube_aoi

load_like

load_like(
    ref: Dataset | DataArray, **kwargs: Unpack[LoadParams]
) -> xr.Dataset

Load the data for the given geobox.

Parameters:

  • ref

    (Dataset | DataArray) –

    The reference dataarray or dataset to load the data for.

  • **kwargs

    (Unpack[LoadParams], default: {} ) –

    The load parameters (buffer, persist, create, concurrency_mode).

Other Parameters:

  • buffer (int) –

    The buffer around the projected geobox in pixels. Defaults to 0.

  • persist (bool) –

    If the data should be persisted in memory. If not, this will return a Dask backed Dataset. Defaults to True.

  • create (bool) –

    Create a new zarr array at the defined storage if it does not exist. This is not recommended, because it can have side effects in a multi-process environment. Defaults to False.

Returns:

  • Dataset

    xr.Dataset: The loaded dataset in the same resolution and extent as the geobox.

Source code in src/smart_geocubes/core/accessor.py
def load_like(
    self,
    ref: xr.Dataset | xr.DataArray,
    **kwargs: Unpack[LoadParams],
) -> xr.Dataset:
    """Load the data for the given geobox.

    Args:
        ref (xr.Dataset | xr.DataArray): The reference dataarray or dataset to load the data for.
        **kwargs: The load parameters (buffer, persist, create, concurrency_mode).

    Keyword Args:
        buffer (int, optional): The buffer around the projected geobox in pixels. Defaults to 0.
        persist (bool, optional): If the data should be persisted in memory.
            If not, this will return a Dask backed Dataset. Defaults to True.
        create (bool, optional): Create a new zarr array at the defined storage if it does not exist.
            This is not recommended, because it can have side effects in a multi-process environment.
            Defaults to False.

    Returns:
        xr.Dataset: The loaded dataset in the same resolution and extent as the geobox.

    """
    toi = None
    if "time" in ref.coords and self.temporal_extent is not None:
        toi = ref.get_index("time")
    return self.load(ref.geobox, toi=toi, **kwargs)

loaded_patches

loaded_patches() -> list[str]

Get the ids of already (down-)loaded patches.

Returns:

  • list[str]

    list[str]: A list of already loaded patch ids.

Source code in src/smart_geocubes/core/accessor.py
def loaded_patches(self) -> list[str]:
    """Get the ids of already (down-)loaded patches.

    Returns:
        list[str]: A list of already loaded patch ids.

    """
    session = self.repo.readonly_session("main")
    zcube = zarr.open(store=session.store, mode="r")
    return zcube.attrs.get("loaded_patches", []).copy()

log_benchmark_summary

log_benchmark_summary()

Log the benchmark summary.

Source code in src/smart_geocubes/core/accessor.py
def log_benchmark_summary(self):
    """Log the benchmark summary."""
    self.stopuhr.summary()

open_xarray

open_xarray() -> xr.Dataset

Open the xarray datacube in read-only mode.

Returns:

  • Dataset

    xr.Dataset: The xarray datacube.

Source code in src/smart_geocubes/core/accessor.py
def open_xarray(self) -> xr.Dataset:
    """Open the xarray datacube in read-only mode.

    Returns:
        xr.Dataset: The xarray datacube.

    """
    return self.backend.open_xarray()

open_zarr

open_zarr() -> zarr.Group

Open the zarr datacube in read-only mode.

Returns:

  • Group

    zarr.Group: The zarr datacube.

Source code in src/smart_geocubes/core/accessor.py
def open_zarr(self) -> zarr.Group:
    """Open the zarr datacube in read-only mode.

    Returns:
        zarr.Group: The zarr datacube.

    """
    return self.backend.open_zarr()

post_create

post_create()

Download the ArcticDEM mosaic extent info and store it in the datacube.

Source code in src/smart_geocubes/datasets/arcticdem.py
def post_create(self):
    """Download the ArcticDEM mosaic extent info and store it in the datacube."""
    _download_arcticdem_extent(self._aux_dir)

post_init

post_init()

Check if the ArcticDEM mosaic extent info is already downloaded, and download it if not.

Source code in src/smart_geocubes/datasets/arcticdem.py
def post_init(self):
    """Check if the ArcticDEM mosaic extent info is already downloaded and downlaod if not."""
    required_files = [self._aux_dir / f"ArcticDEM_Mosaic_Index_v4_1_{res}.parquet" for res in ["2m", "10m", "32m"]]
    if not all(file.exists() for file in required_files):
        _download_arcticdem_extent(self._aux_dir)

procedural_download

procedural_download(aoi: Geometry | GeoBox, toi: TOI)

Download tiles procedurally.

Warning

This method is meant for single-process use, but can (in theory) be used in a multi-process environment. However, in a multi-process environment it can happen that multiple processes try to write concurrently, which results in a conflict. In such cases, the download will be retried until it succeeds or the maximum number of retries is reached.

Parameters:

  • aoi

    (Geometry | GeoBox) –

    The geometry of the aoi to download. If a Geobox is provided, it will use the extent of the geobox.

  • toi

    (TOI) –

    The time of interest to download.

Raises:

  • ValueError

    If no adjacent tiles are found. This can happen if the geobox is out of the dataset bounds.

  • ValueError

    If not all downloads were successful.

Source code in src/smart_geocubes/core/accessor.py
def procedural_download(self, aoi: Geometry | GeoBox, toi: TOI):
    """Download tiles procedurally.

    Warning:
        This method is meant for single-process use, but can (in theory) be used in a multi-process environment.
        However, in a multi-process environment it can happen that multiple processes try to write concurrently,
        which results in a conflict.
        In such cases, the download will be retried until it succeeds or the maximum number of retries is reached.

    Args:
        aoi (Geometry | GeoBox): The geometry of the aoi to download. If a Geobox is provided,
            it will use the extent of the geobox.
        toi (TOI): The time of interest to download.

    Raises:
        ValueError: If no adjacent tiles are found. This can happen if the geobox is out of the dataset bounds.
        ValueError: If not all downloads were successful.

    """
    if isinstance(aoi, GeoBox):
        aoi = aoi.extent

    adjacent_patches = self.adjacent_patches(aoi, toi)
    # interest-string
    soi = f"{_geometry_repr(aoi)}" + (f" @ {_repr_toi(toi)}" if toi is not None else "")
    if not adjacent_patches:
        logger.error(f"{soi}: No adjacent patches found: {adjacent_patches=}")
        raise ValueError("No adjacent patches found - is the provided aoi and toi correct?")

    loaded_patches = self.loaded_patches()

    new_patches = [patch for patch in adjacent_patches if patch.id not in loaded_patches]

    logger.debug(f"{soi}:  {len(adjacent_patches)=} & {len(loaded_patches)=} -> {len(new_patches)=} to download")
    if not new_patches:
        return

    # This raises Errors if anything goes wrong -> we want to propagate
    self.backend.submit(new_patches)

visualize_state

visualize_state(
    ax: Axes | None = None,
) -> plt.Figure | plt.Axes

Visualize the extent, i.e. the already downloaded and filled data, of the datacube.

Parameters:

  • ax

    (Axes | None, default: None ) –

    The axes drawn to. If None, will create a new figure and axes.

Returns:

  • Figure | Axes

    plt.Figure | plt.Axes: The figure with the visualization if no axes was provided, else the axes.

Raises:

  • ValueError

    If the datacube is empty.

Source code in src/smart_geocubes/datasets/arcticdem.py
def visualize_state(self, ax: "plt.Axes | None" = None) -> "plt.Figure | plt.Axes":
    """Visulize the extend, hence the already downloaded and filled data, of the datacube.

    Args:
        ax (plt.Axes | None): The axes drawn to. If None, will create a new figure and axes.

    Returns:
        plt.Figure | plt.Axes: The figure with the visualization if no axes was provided, else the axes.

    Raises:
        ValueError: If the datacube is empty

    """
    import cartopy.crs as ccrs
    import cartopy.feature as cfeature
    import matplotlib.path as mpath
    import matplotlib.pyplot as plt

    tile_info = self.current_state()

    if tile_info is None:
        raise ValueError("Datacube is not created or loaded yet. Can't visualize!")

    # Define the projection
    projection = ccrs.Stereographic(central_latitude=90, central_longitude=-45, true_scale_latitude=70)

    # Create a figure
    fig = None
    if ax is None:
        fig, ax = plt.subplots(figsize=(10, 10), subplot_kw={"projection": projection})

    # Set the extent to focus on the North Pole
    ax.set_extent([-180, 180, 50, 90], crs=ccrs.PlateCarree())

    # Add features
    ax.add_feature(cfeature.LAND, zorder=0, edgecolor="black", facecolor="white")
    ax.add_feature(cfeature.OCEAN, zorder=0, facecolor="lightgrey")
    ax.add_feature(cfeature.COASTLINE)
    ax.add_feature(cfeature.BORDERS, linestyle=":")
    ax.add_feature(cfeature.LAKES, alpha=0.5)
    ax.add_feature(cfeature.RIVERS)

    # Add gridlines
    gl = ax.gridlines(draw_labels=True)
    gl.top_labels = False
    gl.right_labels = False

    # Compute a circle in axes coordinates, which we can use as a boundary
    # for the map. We can pan/zoom as much as we like - the boundary will be
    # permanently circular.
    theta = np.linspace(0, 2 * np.pi, 100)
    center, radius = [0.5, 0.5], 0.5
    verts = np.vstack([np.sin(theta), np.cos(theta)]).T
    circle = mpath.Path(verts * radius + center)

    ax.set_boundary(circle, transform=ax.transAxes)

    tile_info.plot(
        "title",
        ax=ax,
        transform=ccrs.PlateCarree(),
        edgecolor="black",
        categorical=True,
        aspect="equal",
        alpha=0.5,
    )

    if fig is not None:
        return fig
    else:
        return ax

ArcticDEM2m

ArcticDEM2m(
    storage: Storage | Path | str,
    create_icechunk_storage: bool = True,
    backend: Literal["threaded", "simple"] = "threaded",
)

Bases: ArcticDEMABC

Accessor for ArcticDEM 2m data.

Attributes:

  • extent (GeoBox) –

    The extent of the datacube represented by a GeoBox.

  • chunk_size (int) –

    The chunk size of the datacube.

  • channels (list) –

    The channels of the datacube.

  • storage (Storage) –

    The icechunk storage.

  • repo (Repository) –

    The icechunk repository.

  • title (str) –

    The title of the datacube.

  • stopuhr (StopUhr) –

    The benchmarking timer from the stopuhr library.

  • zgeobox (GeoBox) –

    The geobox of the underlying zarr array. Should be equal to the extent geobox. However, this property is used to find the target index of the downloaded data, so it is better to be safe than sorry.

  • created (bool) –

    True if the datacube already exists in the storage.

Initialize base class for remote accessors.

Warning

In a multiprocessing environment, it is strongly recommended to not set create_icechunk_storage=False.

Parameters:

  • storage

    (Storage) –

    The icechunk storage of the datacube.

  • create_icechunk_storage

    (bool, default: True ) –

    Whether an icechunk repository should be created at the provided storage if none exists. This should be disabled in a multiprocessing environment. Defaults to True.

  • backend

    (Literal['threaded', 'simple'], default: 'threaded' ) –

    The backend to use for downloading data. Currently, only "threaded" is supported. Defaults to "threaded".

Raises:

  • ValueError

    If the storage is not an icechunk.Storage.

Methods:

  • adjacent_patches

    Get adjacent patch indexes from pre-downloaded extent files.

  • assert_created

    Assert that the datacube exists in the storage.

  • assert_temporal_cube

    Assert that the datacube has a temporal dimension.

  • create

    Create an empty datacube and write it to the store.

  • current_state

    Get info about currently stored tiles.

  • download_patch

    Download the data for the given patch.

  • load

    Load the data for the given geobox.

  • load_like

    Load the data for the given geobox.

  • loaded_patches

    Get the ids of already (down-)loaded patches.

  • log_benchmark_summary

    Log the benchmark summary.

  • open_xarray

    Open the xarray datacube in read-only mode.

  • open_zarr

    Open the zarr datacube in read-only mode.

  • post_create

    Download the ArcticDEM mosaic extent info and store it in the datacube.

  • post_init

    Check if the ArcticDEM mosaic extent info is already downloaded and download it if not.

  • procedural_download

    Download tiles procedurally.

  • visualize_state

    Visualize the extent, i.e. the already downloaded and filled data, of the datacube.

Source code in src/smart_geocubes/core/accessor.py
def __init__(
    self,
    storage: icechunk.Storage | Path | str,
    create_icechunk_storage: bool = True,
    backend: Literal["threaded", "simple"] = "threaded",
):
    """Initialize base class for remote accessors.

    !!! warning

        In a multiprocessing environment, it is strongly recommended to not set `create_icechunk_storage=False`.

    Args:
        storage (icechunk.Storage): The icechunk storage of the datacube.
        create_icechunk_storage (bool, optional): Whether an icechunk repository should be created at the
            provided storage if none exists.
            This should be disabled in a multiprocessing environment.
            Defaults to True.
        backend (Literal["threaded", "simple"], optional): The backend to use for downloading data.
            Currently, only "threaded" is supported. Defaults to "threaded".

    Raises:
        ValueError: If the storage is not an icechunk.Storage.

    """
    # Title is used for logging, debugging and as a default name for the datacube
    self.title = self.__class__.__name__

    if isinstance(storage, (str | Path)):
        storage = storage if isinstance(storage, str) else str(storage.resolve())
        storage = icechunk.local_filesystem_storage(storage)
    if not isinstance(storage, icechunk.Storage):
        raise ValueError(f"Expected an icechunk.Storage, but got {type(storage)}")
    self.storage = storage
    logger.debug(f"Using storage {storage=}")
    if create_icechunk_storage:
        self.repo = icechunk.Repository.open_or_create(storage)  # Will create a "main" branch
    else:
        self.repo = icechunk.Repository.open(storage)
    logger.debug(f"Using repository {self.repo=}")

    # The benchmarking timer for this accessor
    self.stopuhr = Chronometer(logger.debug)

    if backend == "threaded":
        if not _check_python_version(3, 13):
            raise NotImplementedError(
                "The 'threaded' backend is only fully supported in Python 3.13 and above."
                " Please consider using the 'simple' backend in a multiprocessing environment"
                " or upgrade your Python version."
            )
        self.backend = ThreadedBackend(self.repo, self.download_patch)
    elif backend == "simple":
        self.backend = SimpleBackend(self.repo, self.download_patch)
    else:
        raise ValueError(f"Unknown backend {backend}")

    self.post_init()
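
For example, a minimal instantiation sketch (assuming ArcticDEM2m is importable from smart_geocubes.datasets; the path is a placeholder):

from smart_geocubes.datasets import ArcticDEM2m

# A str or Path is converted internally to icechunk local filesystem storage.
accessor = ArcticDEM2m("/data/arcticdem_2m_cube")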

created property

created: bool

Check if the datacube already exists in the storage.

Returns:

  • bool ( bool ) –

    True if the datacube already exists in the storage.

is_temporal property

is_temporal: bool

Check if the datacube has a temporal dimension.

Returns:

  • bool ( bool ) –

    True if the datacube has a temporal dimension.

adjacent_patches

adjacent_patches(
    roi: Geometry | GeoBox | GeoDataFrame, toi: TOI
) -> list[PatchIndex]

Get adjacent patch indexes from pre-downloaded extent files.

Overwrite the default implementation from the STAC accessor to use pre-downloaded extent files instead of querying the STAC API. This results in a faster loading time, but requires the extent files to be downloaded beforehand. This is done in the post_create step.

Parameters:

  • roi

    (Geometry | GeoBox | GeoDataFrame) –

    The reference geometry, geobox, or geodataframe.

  • toi

    (TOI) –

    The time of interest to download. Not used in this implementation since ArcticDEM is not temporal.

Returns:

  • list[PatchIndex]

    list[PatchIndex]: List of adjacent patches, wrapped in our own data structure for easier processing.

Raises:

  • ValueError

    If the roi is not a Geometry, GeoBox, or GeoDataFrame.

Source code in src/smart_geocubes/datasets/arcticdem.py
def adjacent_patches(self, roi: Geometry | GeoBox | gpd.GeoDataFrame, toi: TOI) -> list[PatchIndex]:
    """Get adjacent patch indexes from a STAC API.

    Overwrite the default implementation from the STAC accessor
    to use pre-downloaded extent files instead of querying the STAC API.
    This results in a faster loading time, but requires the extent files to be downloaded beforehand.
    This is done in the post_create step.

    Args:
        roi (Geometry | GeoBox | gpd.GeoDataFrame): The reference geometry, geobox, or geodataframe.
        toi (TOI): The time of interest to download.
            Not used in this implementation since ArcticDEM is not temporal.

    Returns:
        list[PatchIndex]: List of adjacent patches, wrapped in our own data structure for easier processing.

    Raises:
        ValueError: If the roi is not a Geometry, GeoBox, or GeoDataFrame.

    """
    # Assumes that the extent files are already present and the datacube is already created
    self.assert_created()

    resolution = f"{int(self.extent.resolution.x)}m"
    extent_info = gpd.read_parquet(self._aux_dir / f"ArcticDEM_Mosaic_Index_v4_1_{resolution}.parquet")
    if isinstance(roi, gpd.GeoDataFrame):
        adjacent_tiles = (
            gpd.sjoin(
                extent_info,
                roi[["geometry"]].to_crs(self.extent.crs.wkt),
                how="inner",
                predicate="intersects",
            )
            .reset_index()
            .drop_duplicates(subset="index", keep="first", ignore_index=True)
        )
    elif isinstance(roi, GeoBox):
        adjacent_tiles = extent_info.loc[extent_info.intersects(roi.boundingbox.polygon.geom)].copy()
    elif isinstance(roi, Geometry):
        adjacent_tiles = extent_info.loc[extent_info.intersects(roi.geom)].copy()
    else:
        raise ValueError("roi must be a GeoBox or a GeoDataFrame")
    if adjacent_tiles.empty:
        return []
    return [
        LazyStacPatchIndex(tile.dem_id, _get_stac_url(tile.dem_id, resolution))
        for tile in adjacent_tiles.itertuples()
    ]
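
A usage sketch (illustrative values, not real tile bounds): query which ArcticDEM patches intersect a small box in the dataset CRS. This reuses the accessor from the instantiation sketch above and requires the datacube to have been created first.

from odc.geo.geobox import GeoBox

# Illustrative 5 km x 5 km box in EPSG:3413 at the 2 m resolution.
geobox = GeoBox.from_bbox((-2_000_000, 100_000, -1_995_000, 105_000), crs="EPSG:3413", resolution=2)
patches = accessor.adjacent_patches(geobox, toi=None)  # toi is ignored for ArcticDEM
print([patch.id for patch in patches])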

assert_created

assert_created()

Assert that the datacube exists in the storage.

Source code in src/smart_geocubes/core/accessor.py
def assert_created(self):
    """Assert that the datacube exists in the storage."""
    self.backend.assert_created()

assert_temporal_cube

assert_temporal_cube()

Assert that the datacube has a temporal dimension.

Raises:

  • ValueError

    If the datacube has no temporal dimension.

Source code in src/smart_geocubes/core/accessor.py
def assert_temporal_cube(self):
    """Assert that the datacube has a temporal dimension.

    Raises:
        ValueError: If the datacube has no temporal dimension.

    """
    if self.temporal_extent is None:
        msg = f"Datacube {self.title} has no temporal dimension."
        logger.error(msg)
        raise ValueError(msg)

create

create(overwrite: bool = False, exists_ok: bool = False)

Create an empty datacube and write it to the store.

Parameters:

  • overwrite

    (bool, default: False ) –

    Allow overwriting an existing datacube. Has no effect if exists_ok is True. Defaults to False.

  • exists_ok

    (bool, default: False ) –

    Do not raise an error if the datacube already exists.

Raises:

  • FileExistsError

    If a datacube already exists at location and exists_ok is False.

Source code in src/smart_geocubes/core/accessor.py
def create(self, overwrite: bool = False, exists_ok: bool = False):
    """Create an empty datacube and write it to the store.

    Args:
        overwrite (bool, optional): Allow overwriting an existing datacube.
            Has no effect if exists_ok is True. Defaults to False.
        exists_ok (bool, optional): Do not raise an error if the datacube already exists.

    Raises:
        FileExistsError: If a datacube already exists at location and exists_ok is False.

    """
    if exists_ok and self.created:
        logger.debug("Datacube was already created.")
        return

    with self.stopuhr("Empty datacube creation"):
        # Check if the zarr data already exists
        session = self.repo.writable_session("main")
        cube_is_empty = sync(session.store.is_empty(""))
        if not overwrite and not cube_is_empty:
            logger.debug(f"Unable to create a new datacube. {overwrite=} {cube_is_empty=} {session.store=}")
            raise FileExistsError(f"Cannot create a new  datacube. {session.store=} is not empty!")

        logger.debug(
            f"Creating an empty zarr datacube '{self.title}' with the variables"
            f" {self.channels} at a {self.extent.resolution=} (epsg:{self.extent.crs.epsg})"
            f" and {self.chunk_size=} to {session.store=}"
        )

        ds = xr.Dataset(
            {
                name: odc.geo.xr.xr_zeros(
                    self.extent,
                    chunks=-1,
                    dtype=self._channels_encoding[name].get("dtype", "float32"),
                    always_yx=True,
                )
                for name in self.channels
            },
            attrs={"title": self.title, "loaded_patches": []},
        )

        # Expand to temporal dimension if defined
        if self.temporal_extent is not None:
            ds = ds.expand_dims(time=self.temporal_extent)

        # Add metadata
        for name, meta in self._channels_meta.items():
            ds[name].attrs.update(meta)

        # Get the encoding for the coordinates, variables and spatial reference
        coords_encoding = {
            "x": {"chunks": ds.x.shape, **optimize_coord_encoding(ds.x.values, self.extent.resolution.x)},
            "y": {"chunks": ds.y.shape, **optimize_coord_encoding(ds.y.values, self.extent.resolution.y)},
        }
        if self.temporal_extent is not None:
            coords_encoding["time"] = {"chunks": ds.time.shape, **optimize_temporal_encoding(self.temporal_extent)}
        chunks = (
            (1, self.chunk_size, self.chunk_size)
            if self.temporal_extent is not None
            else (self.chunk_size, self.chunk_size)
        )
        var_encoding = {
            name: {
                "chunks": chunks,
                "compressors": [BloscCodec(clevel=9)],
                **self._channels_encoding[name],
            }
            for name in self.channels
        }
        encoding = {
            "spatial_ref": {"chunks": None, "dtype": "int32"},
            **coords_encoding,
            **var_encoding,
        }
        logger.debug(f"Datacube {encoding=}")

        ds.to_zarr(
            session.store,
            encoding=encoding,
            compute=False,
            consolidated=False,
            zarr_format=3,
            mode="w" if overwrite else "w-",
        )

        commit = session.commit("Initialize empty datacube")
        logger.debug(f"Datacube created: {commit=}")

        self.post_create()
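
As a sketch: with exists_ok=True the call is idempotent, which is convenient in scripts that may run more than once.

accessor.create(exists_ok=True)  # creates the empty cube on the first run
accessor.create(exists_ok=True)  # no-op afterwards instead of raising FileExistsError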

current_state

current_state() -> gpd.GeoDataFrame | None

Get info about currently stored tiles.

Returns:

  • GeoDataFrame | None

    gpd.GeoDataFrame: Tile info from pystac. None if datacube is empty.

Source code in src/smart_geocubes/accessors/stac.py
def current_state(self) -> gpd.GeoDataFrame | None:
    """Get info about currently stored tiles.

    Returns:
        gpd.GeoDataFrame: Tile info from pystac. None if datacube is empty.

    """
    import geopandas as gpd
    import pystac_client

    if not self.created:
        return None

    loaded_patches = self.loaded_patches()

    if len(loaded_patches) == 0:
        return None

    catalog = pystac_client.Client.open(self.stac_api_url)
    search = catalog.search(collections=[self.collection], ids=loaded_patches)
    stac_json = search.item_collection_as_dict()

    gdf = gpd.GeoDataFrame.from_features(stac_json, "epsg:4326")
    return gdf

download_patch

download_patch(idx: PatchIndex[Item]) -> xr.Dataset

Download the data for the given patch.

Must be implemented by the Accessor.

Parameters:

  • idx

    (PatchIndex[Item]) –

    The reference patch to download the data for.

Returns:

  • Dataset

    xr.Dataset: The downloaded patch data.

Source code in src/smart_geocubes/accessors/stac.py
def download_patch(self, idx: PatchIndex["Item"]) -> xr.Dataset:
    """Download the data for the given patch.

    Must be implemented by the Accessor.

    Args:
        idx (PatchIndex[Item]): The reference patch to download the data for.

    Returns:
        xr.Dataset: The downloaded patch data.

    """
    from odc.stac import stac_load

    patch = stac_load([idx.item], bands=self.channels, chunks=None, progress=None)

    # Do a mosaic if multiple items are returned for non-temporal data
    if "time" in patch.dims and self.temporal_extent is None:
        patch = patch.max("time")

    return patch

load

load(
    aoi: Geometry | GeoBox,
    toi: TOI = None,
    persist: bool = True,
    create: bool = False,
) -> xr.Dataset

Load the data for the given geobox.

Parameters:

  • aoi

    (Geometry | GeoBox) –

    The reference geometry to load the data for. If a Geobox is provided, it will use the extent of the geobox.

  • toi

    (TOI, default: None ) –

    The temporal slice to load. Defaults to None.

  • persist

    (bool, default: True ) –

    If the data should be persisted in memory. If not, this will return a Dask-backed Dataset. Defaults to True.

  • create

    (bool, default: False ) –

    Create a new zarr array at the defined storage if it does not exist. This is not recommended because it can have side effects in a multi-process environment. Defaults to False.

Returns:

  • Dataset

    xr.Dataset: The loaded dataset in the same resolution and extent as the geobox.

Source code in src/smart_geocubes/core/accessor.py
def load(
    self,
    aoi: Geometry | GeoBox,
    toi: TOI = None,
    persist: bool = True,
    create: bool = False,
) -> xr.Dataset:
    """Load the data for the given geobox.

    Args:
        aoi (Geometry | GeoBox): The reference geometry to load the data for. If a Geobox is provided,
            it will use the extent of the geobox.
        toi (TOI): The temporal slice to load. Defaults to None.
        persist (bool, optional): If the data should be persisted in memory.
            If not, this will return a Dask-backed Dataset. Defaults to True.
        create (bool, optional): Create a new zarr array at the defined storage if it does not exist.
            This is not recommended because it can have side effects in a multi-process environment.
            Defaults to False.

    Returns:
        xr.Dataset: The loaded dataset in the same resolution and extent as the geobox.

    """
    if toi is not None:
        self.assert_temporal_cube()

    if isinstance(aoi, GeoBox):
        aoi = aoi.extent

    with self.stopuhr(f"{_geometry_repr(aoi)}: {self.title} tile {'loading' if persist else 'lazy-loading'}"):
        # Create the datacube if it does not exist
        if create:
            try:
                self.create(overwrite=False)
            except FileExistsError:  # We are okay if the datacube already exists
                pass
        else:
            # Check if the datacube exists
            self.assert_created()

        # Download the adjacent tiles (if necessary)
        aligned_aoi = aoi.to_crs(self.extent.crs)
        with self.stopuhr(f"{_geometry_repr(aoi)}: Procedural download in blocking mode"):
            self.procedural_download(aligned_aoi, toi)

        # Load the datacube and set the spatial_ref since it is set as a coordinate within the zarr format
        session = self.repo.readonly_session("main")
        chunks = None if persist else "auto"
        xrcube = xr.open_zarr(
            session.store,
            mask_and_scale=False,
            chunks=chunks,
            consolidated=False,
        ).set_coords("spatial_ref")

        # Get temporal slice if time is provided
        if toi is not None:
            xrcube = xrcube.sel(time=toi)

        # Get an AOI slice of the datacube
        xrcube_aoi = xrcube.odc.crop(aligned_aoi, apply_mask=False)

        # The following code would load the lazy zarr data from disk into memory
        if persist:
            with self.stopuhr(f"{_geometry_repr(aoi)}: {self.title} AOI loading from disk"):
                xrcube_aoi = xrcube_aoi.load()
    return xrcube_aoi
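
A sketch of the typical read path, reusing the geobox from the adjacent_patches example. With persist=True the returned Dataset is loaded into memory; persist=False would return a lazy, Dask-backed Dataset instead.

dem = accessor.load(geobox, persist=True, create=False)
print(dem.data_vars)  # the configured channels of the datacube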

load_like

load_like(
    ref: Dataset | DataArray, **kwargs: Unpack[LoadParams]
) -> xr.Dataset

Load the data for the given geobox.

Parameters:

  • ref

    (Dataset | DataArray) –

    The reference dataarray or dataset to load the data for.

  • **kwargs

    (Unpack[LoadParams], default: {} ) –

    The load parameters (buffer, persist, create, concurrency_mode).

Other Parameters:

  • buffer (int) –

    The buffer around the projected geobox in pixels. Defaults to 0.

  • persist (bool) –

    If the data should be persisted in memory. If not, this will return a Dask-backed Dataset. Defaults to True.

  • create (bool) –

    Create a new zarr array at the defined storage if it does not exist. This is not recommended because it can have side effects in a multi-process environment. Defaults to False.

Returns:

  • Dataset

    xr.Dataset: The loaded dataset in the same resolution and extent as the geobox.

Source code in src/smart_geocubes/core/accessor.py
def load_like(
    self,
    ref: xr.Dataset | xr.DataArray,
    **kwargs: Unpack[LoadParams],
) -> xr.Dataset:
    """Load the data for the given geobox.

    Args:
        ref (xr.Dataset | xr.DataArray): The reference dataarray or dataset to load the data for.
        **kwargs: The load parameters (buffer, persist, create, concurrency_mode).

    Keyword Args:
        buffer (int, optional): The buffer around the projected geobox in pixels. Defaults to 0.
        persist (bool, optional): If the data should be persisted in memory.
            If not, this will return a Dask-backed Dataset. Defaults to True.
        create (bool, optional): Create a new zarr array at the defined storage if it does not exist.
            This is not recommended because it can have side effects in a multi-process environment.
            Defaults to False.

    Returns:
        xr.Dataset: The loaded dataset in the same resolution and extent as the geobox.

    """
    toi = None
    if "time" in ref.coords and self.temporal_extent is not None:
        toi = ref.get_index("time")
    return self.load(ref.geobox, toi=toi, **kwargs)
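
A hedged sketch of load_like: any xarray object whose geobox can be resolved (e.g. via the odc-geo xarray accessor) can serve as the reference; scene.nc is a hypothetical georeferenced file.

import xarray as xr
import odc.geo.xr  # noqa: F401  # assumption: registers the geospatial xarray accessors

reference = xr.open_dataset("scene.nc")  # hypothetical reference with CRS metadata
dem = accessor.load_like(reference, persist=True)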

loaded_patches

loaded_patches() -> list[str]

Get the ids of already (down-)loaded patches.

Returns:

  • list[str]

    list[str]: A list of already loaded patch ids.

Source code in src/smart_geocubes/core/accessor.py
def loaded_patches(self) -> list[str]:
    """Get the ids of already (down-)loaded patches.

    Returns:
        list[str]: A list of already loaded patch ids.

    """
    session = self.repo.readonly_session("main")
    zcube = zarr.open(store=session.store, mode="r")
    return zcube.attrs.get("loaded_patches", []).copy()
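
A small sketch: inspect the cache before triggering new downloads.

cached = accessor.loaded_patches()
print(f"{len(cached)} patches already stored in the cube")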

log_benchmark_summary

log_benchmark_summary()

Log the benchmark summary.

Source code in src/smart_geocubes/core/accessor.py
def log_benchmark_summary(self):
    """Log the benchmark summary."""
    self.stopuhr.summary()

open_xarray

open_xarray() -> xr.Dataset

Open the xarray datacube in read-only mode.

Returns:

  • Dataset

    xr.Dataset: The xarray datacube.

Source code in src/smart_geocubes/core/accessor.py
def open_xarray(self) -> xr.Dataset:
    """Open the xarray datacube in read-only mode.

    Returns:
        xr.Dataset: The xarray datacube.

    """
    return self.backend.open_xarray()

open_zarr

open_zarr() -> zarr.Group

Open the zarr datacube in read-only mode.

Returns:

  • Group

    zarr.Group: The zarr datacube.

Source code in src/smart_geocubes/core/accessor.py
def open_zarr(self) -> zarr.Group:
    """Open the zarr datacube in read-only mode.

    Returns:
        zarr.Group: The zarr datacube.

    """
    return self.backend.open_zarr()

post_create

post_create()

Download the ArcticDEM mosaic extent info and store it in the datacube.

Source code in src/smart_geocubes/datasets/arcticdem.py
def post_create(self):
    """Download the ArcticDEM mosaic extent info and store it in the datacube."""
    _download_arcticdem_extent(self._aux_dir)

post_init

post_init()

Check if the ArcticDEM mosaic extent info is already downloaded and download it if not.

Source code in src/smart_geocubes/datasets/arcticdem.py
def post_init(self):
    """Check if the ArcticDEM mosaic extent info is already downloaded and downlaod if not."""
    required_files = [self._aux_dir / f"ArcticDEM_Mosaic_Index_v4_1_{res}.parquet" for res in ["2m", "10m", "32m"]]
    if not all(file.exists() for file in required_files):
        _download_arcticdem_extent(self._aux_dir)

procedural_download

procedural_download(aoi: Geometry | GeoBox, toi: TOI)

Download tiles procedurally.

Warning

This method is meant for single-process use, but can (in theory) be used in a multi-process environment. However, in a multi-process environment it can happen that multiple processes try to write concurrently, which results in a conflict. In such cases, the download will be retried until it succeeds or the maximum number of tries is reached.

Parameters:

  • aoi

    (Geometry | GeoBox) –

    The geometry of the aoi to download. If a Geobox is provided, it will use the extent of the geobox.

  • toi

    (TOI) –

    The time of interest to download.

Raises:

  • ValueError

    If no adjacent patches are found. This can happen if the geobox is out of the dataset bounds.

  • ValueError

    If not all downloads were successful.

Source code in src/smart_geocubes/core/accessor.py
def procedural_download(self, aoi: Geometry | GeoBox, toi: TOI):
    """Download tiles procedurally.

    Warning:
        This method is meant for single-process use, but can (in theory) be used in a multi-process environment.
        However, in a multi-process environment it can happen that multiple processes try to write concurrently,
        which results in a conflict.
        In such cases, the download will be retried until it succeeds or the maximum number of tries is reached.

    Args:
        aoi (Geometry | GeoBox): The geometry of the aoi to download. If a Geobox is provided,
            it will use the extent of the geobox.
        toi (TOI): The time of interest to download.

    Raises:
        ValueError: If no adjacent patches are found. This can happen if the geobox is out of the dataset bounds.
        ValueError: If not all downloads were successful.

    """
    if isinstance(aoi, GeoBox):
        aoi = aoi.extent

    adjacent_patches = self.adjacent_patches(aoi, toi)
    # interest-string
    soi = f"{_geometry_repr(aoi)}" + (f" @ {_repr_toi(toi)}" if toi is not None else "")
    if not adjacent_patches:
        logger.error(f"{soi}: No adjacent patches found: {adjacent_patches=}")
        raise ValueError("No adjacent patches found - is the provided aoi and toi correct?")

    loaded_patches = self.loaded_patches()

    new_patches = [patch for patch in adjacent_patches if patch.id not in loaded_patches]

    logger.debug(f"{soi}:  {len(adjacent_patches)=} & {len(loaded_patches)=} -> {len(new_patches)=} to download")
    if not new_patches:
        return

    # This raises Errors if anything goes wrong -> we want to propagate
    self.backend.submit(new_patches)
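
A sketch of pre-fetching, reusing the geobox from above: this warms the cache without reading any pixels back, e.g. before a batch of load calls.

accessor.procedural_download(geobox, toi=None)  # downloads only patches not yet in the cube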

visualize_state

visualize_state(
    ax: Axes | None = None,
) -> plt.Figure | plt.Axes

Visualize the extent, i.e. the already downloaded and filled data, of the datacube.

Parameters:

  • ax

    (Axes | None, default: None ) –

    The axes drawn to. If None, will create a new figure and axes.

Returns:

  • Figure | Axes

    plt.Figure | plt.Axes: The figure with the visualization if no axes was provided, else the axes.

Raises:

  • ValueError

    If the datacube is empty.

Source code in src/smart_geocubes/datasets/arcticdem.py
def visualize_state(self, ax: "plt.Axes | None" = None) -> "plt.Figure | plt.Axes":
    """Visulize the extend, hence the already downloaded and filled data, of the datacube.

    Args:
        ax (plt.Axes | None): The axes drawn to. If None, will create a new figure and axes.

    Returns:
        plt.Figure | plt.Axes: The figure with the visualization if no axes was provided, else the axes.

    Raises:
        ValueError: If the datacube is empty

    """
    import cartopy.crs as ccrs
    import cartopy.feature as cfeature
    import matplotlib.path as mpath
    import matplotlib.pyplot as plt

    tile_info = self.current_state()

    if tile_info is None:
        raise ValueError("Datacube is not created or loaded yet. Can't visualize!")

    # Define the projection
    projection = ccrs.Stereographic(central_latitude=90, central_longitude=-45, true_scale_latitude=70)

    # Create a figure
    fig = None
    if ax is None:
        fig, ax = plt.subplots(figsize=(10, 10), subplot_kw={"projection": projection})

    # Set the extent to focus on the North Pole
    ax.set_extent([-180, 180, 50, 90], crs=ccrs.PlateCarree())

    # Add features
    ax.add_feature(cfeature.LAND, zorder=0, edgecolor="black", facecolor="white")
    ax.add_feature(cfeature.OCEAN, zorder=0, facecolor="lightgrey")
    ax.add_feature(cfeature.COASTLINE)
    ax.add_feature(cfeature.BORDERS, linestyle=":")
    ax.add_feature(cfeature.LAKES, alpha=0.5)
    ax.add_feature(cfeature.RIVERS)

    # Add gridlines
    gl = ax.gridlines(draw_labels=True)
    gl.top_labels = False
    gl.right_labels = False

    # Compute a circle in axes coordinates, which we can use as a boundary
    # for the map. We can pan/zoom as much as we like - the boundary will be
    # permanently circular.
    theta = np.linspace(0, 2 * np.pi, 100)
    center, radius = [0.5, 0.5], 0.5
    verts = np.vstack([np.sin(theta), np.cos(theta)]).T
    circle = mpath.Path(verts * radius + center)

    ax.set_boundary(circle, transform=ax.transAxes)

    tile_info.plot(
        "title",
        ax=ax,
        transform=ccrs.PlateCarree(),
        edgecolor="black",
        categorical=True,
        aspect="equal",
        alpha=0.5,
    )

    if fig is not None:
        return fig
    else:
        return ax
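
A sketch of saving the state overview to disk (requires cartopy and matplotlib; the filename is a placeholder):

import matplotlib.pyplot as plt

fig = accessor.visualize_state()  # returns the Figure when no Axes is passed
fig.savefig("arcticdem_state.png", dpi=150)
plt.close(fig)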

ArcticDEM32m

ArcticDEM32m(
    storage: Storage | Path | str,
    create_icechunk_storage: bool = True,
    backend: Literal["threaded", "simple"] = "threaded",
)

Bases: ArcticDEMABC

Accessor for ArcticDEM 32m data.

Attributes:

  • extent (GeoBox) –

    The extent of the datacube represented by a GeoBox.

  • chunk_size (int) –

    The chunk size of the datacube.

  • channels (list) –

    The channels of the datacube.

  • storage (Storage) –

    The icechunk storage.

  • repo (Repository) –

    The icechunk repository.

  • title (str) –

    The title of the datacube.

  • stopuhr (StopUhr) –

    The benchmarking timer from the stopuhr library.

  • zgeobox (GeoBox) –

    The geobox of the underlying zarr array. Should be equal to the extent geobox. However, this property is used to find the target index of the downloaded data, so it is better to be safe than sorry.

  • created (bool) –

    True if the datacube already exists in the storage.

Initialize base class for remote accessors.

Warning

In a multiprocessing environment, it is strongly recommended to not set create_icechunk_storage=False.

Parameters:

  • storage

    (Storage) –

    The icechunk storage of the datacube.

  • create_icechunk_storage

    (bool, default: True ) –

    Whether an icechunk repository should be created at the provided storage if none exists. This should be disabled in a multiprocessing environment. Defaults to True.

  • backend

    (Literal['threaded', 'simple'], default: 'threaded' ) –

    The backend to use for downloading data. Currently, only "threaded" is supported. Defaults to "threaded".

Raises:

  • ValueError

    If the storage is not an icechunk.Storage.

Methods:

  • adjacent_patches

    Get adjacent patch indexes from pre-downloaded extent files.

  • assert_created

    Assert that the datacube exists in the storage.

  • assert_temporal_cube

    Assert that the datacube has a temporal dimension.

  • create

    Create an empty datacube and write it to the store.

  • current_state

    Get info about currently stored tiles.

  • download_patch

    Download the data for the given patch.

  • load

    Load the data for the given geobox.

  • load_like

    Load the data for the given geobox.

  • loaded_patches

    Get the ids of already (down-)loaded patches.

  • log_benchmark_summary

    Log the benchmark summary.

  • open_xarray

    Open the xarray datacube in read-only mode.

  • open_zarr

    Open the zarr datacube in read-only mode.

  • post_create

    Download the ArcticDEM mosaic extent info and store it in the datacube.

  • post_init

    Check if the ArcticDEM mosaic extent info is already downloaded and download it if not.

  • procedural_download

    Download tiles procedurally.

  • visualize_state

    Visualize the extent, i.e. the already downloaded and filled data, of the datacube.

Source code in src/smart_geocubes/core/accessor.py
def __init__(
    self,
    storage: icechunk.Storage | Path | str,
    create_icechunk_storage: bool = True,
    backend: Literal["threaded", "simple"] = "threaded",
):
    """Initialize base class for remote accessors.

    !!! warning

        In a multiprocessing environment, it is strongly recommended to not set `create_icechunk_storage=False`.

    Args:
        storage (icechunk.Storage): The icechunk storage of the datacube.
        create_icechunk_storage (bool, optional): Whether an icechunk repository should be created at the
            provided storage if none exists.
            This should be disabled in a multiprocessing environment.
            Defaults to True.
        backend (Literal["threaded", "simple"], optional): The backend to use for downloading data.
            Currently, only "threaded" is supported. Defaults to "threaded".

    Raises:
        ValueError: If the storage is not an icechunk.Storage.

    """
    # Title is used for logging, debugging and as a default name for the datacube
    self.title = self.__class__.__name__

    if isinstance(storage, (str | Path)):
        storage = storage if isinstance(storage, str) else str(storage.resolve())
        storage = icechunk.local_filesystem_storage(storage)
    if not isinstance(storage, icechunk.Storage):
        raise ValueError(f"Expected an icechunk.Storage, but got {type(storage)}")
    self.storage = storage
    logger.debug(f"Using storage {storage=}")
    if create_icechunk_storage:
        self.repo = icechunk.Repository.open_or_create(storage)  # Will create a "main" branch
    else:
        self.repo = icechunk.Repository.open(storage)
    logger.debug(f"Using repository {self.repo=}")

    # The benchmarking timer for this accessor
    self.stopuhr = Chronometer(logger.debug)

    if backend == "threaded":
        if not _check_python_version(3, 13):
            raise NotImplementedError(
                "The 'threaded' backend is only fully supported in Python 3.13 and above."
                " Please consider using the 'simple' backend in a multiprocessing environment"
                " or upgrade your Python version."
            )
        self.backend = ThreadedBackend(self.repo, self.download_patch)
    elif backend == "simple":
        self.backend = SimpleBackend(self.repo, self.download_patch)
    else:
        raise ValueError(f"Unknown backend {backend}")

    self.post_init()

created property

created: bool

Check if the datacube already exists in the storage.

Returns:

  • bool ( bool ) –

    True if the datacube already exists in the storage.

is_temporal property

is_temporal: bool

Check if the datacube has a temporal dimension.

Returns:

  • bool ( bool ) –

    True if the datacube has a temporal dimension.

adjacent_patches

adjacent_patches(
    roi: Geometry | GeoBox | GeoDataFrame, toi: TOI
) -> list[PatchIndex]

Get adjacent patch indexes from pre-downloaded extent files.

Overwrite the default implementation from the STAC accessor to use pre-downloaded extent files instead of querying the STAC API. This results in a faster loading time, but requires the extent files to be downloaded beforehand. This is done in the post_create step.

Parameters:

  • roi

    (Geometry | GeoBox | GeoDataFrame) –

    The reference geometry, geobox, or geodataframe.

  • toi

    (TOI) –

    The time of interest to download. Not used in this implementation since ArcticDEM is not temporal.

Returns:

  • list[PatchIndex]

    list[PatchIndex]: List of adjacent patches, wrapped in our own data structure for easier processing.

Raises:

  • ValueError

    If the roi is not a Geometry, GeoBox, or GeoDataFrame.

Source code in src/smart_geocubes/datasets/arcticdem.py
def adjacent_patches(self, roi: Geometry | GeoBox | gpd.GeoDataFrame, toi: TOI) -> list[PatchIndex]:
    """Get adjacent patch indexes from a STAC API.

    Overwrite the default implementation from the STAC accessor
    to use pre-downloaded extent files instead of querying the STAC API.
    This results in a faster loading time, but requires the extent files to be downloaded beforehand.
    This is done in the post_create step.

    Args:
        roi (Geometry | GeoBox | gpd.GeoDataFrame): The reference geometry, geobox, or geodataframe.
        toi (TOI): The time of interest to download.
            Not used in this implementation since ArcticDEM is not temporal.

    Returns:
        list[PatchIndex]: List of adjacent patches, wrapped in our own data structure for easier processing.

    Raises:
        ValueError: If the roi is not a Geometry, GeoBox, or GeoDataFrame.

    """
    # Assumes that the extent files are already present and the datacube is already created
    self.assert_created()

    resolution = f"{int(self.extent.resolution.x)}m"
    extent_info = gpd.read_parquet(self._aux_dir / f"ArcticDEM_Mosaic_Index_v4_1_{resolution}.parquet")
    if isinstance(roi, gpd.GeoDataFrame):
        adjacent_tiles = (
            gpd.sjoin(
                extent_info,
                roi[["geometry"]].to_crs(self.extent.crs.wkt),
                how="inner",
                predicate="intersects",
            )
            .reset_index()
            .drop_duplicates(subset="index", keep="first", ignore_index=True)
        )
    elif isinstance(roi, GeoBox):
        adjacent_tiles = extent_info.loc[extent_info.intersects(roi.boundingbox.polygon.geom)].copy()
    elif isinstance(roi, Geometry):
        adjacent_tiles = extent_info.loc[extent_info.intersects(roi.geom)].copy()
    else:
        raise ValueError("roi must be a GeoBox or a GeoDataFrame")
    if adjacent_tiles.empty:
        return []
    return [
        LazyStacPatchIndex(tile.dem_id, _get_stac_url(tile.dem_id, resolution))
        for tile in adjacent_tiles.itertuples()
    ]

assert_created

assert_created()

Assert that the datacube exists in the storage.

Source code in src/smart_geocubes/core/accessor.py
def assert_created(self):
    """Assert that the datacube exists in the storage."""
    self.backend.assert_created()

assert_temporal_cube

assert_temporal_cube()

Assert that the datacube has a temporal dimension.

Raises:

  • ValueError

    If the datacube has no temporal dimension.

Source code in src/smart_geocubes/core/accessor.py
def assert_temporal_cube(self):
    """Assert that the datacube has a temporal dimension.

    Raises:
        ValueError: If the datacube has no temporal dimension.

    """
    if self.temporal_extent is None:
        msg = f"Datacube {self.title} has no temporal dimension."
        logger.error(msg)
        raise ValueError(msg)

create

create(overwrite: bool = False, exists_ok: bool = False)

Create an empty datacube and write it to the store.

Parameters:

  • overwrite

    (bool, default: False ) –

    Allow overwriting an existing datacube. Has no effect if exists_ok is True. Defaults to False.

  • exists_ok

    (bool, default: False ) –

    Do not raise an error if the datacube already exists.

Raises:

  • FileExistsError

    If a datacube already exists at location and exists_ok is False.

Source code in src/smart_geocubes/core/accessor.py
def create(self, overwrite: bool = False, exists_ok: bool = False):
    """Create an empty datacube and write it to the store.

    Args:
        overwrite (bool, optional): Allow overwriting an existing datacube.
            Has no effect if exists_ok is True. Defaults to False.
        exists_ok (bool, optional): Do not raise an error if the datacube already exists.

    Raises:
        FileExistsError: If a datacube already exists at location and exists_ok is False.

    """
    if exists_ok and self.created:
        logger.debug("Datacube was already created.")
        return

    with self.stopuhr("Empty datacube creation"):
        # Check if the zarr data already exists
        session = self.repo.writable_session("main")
        cube_is_empty = sync(session.store.is_empty(""))
        if not overwrite and not cube_is_empty:
            logger.debug(f"Unable to create a new datacube. {overwrite=} {cube_is_empty=} {session.store=}")
            raise FileExistsError(f"Cannot create a new  datacube. {session.store=} is not empty!")

        logger.debug(
            f"Creating an empty zarr datacube '{self.title}' with the variables"
            f" {self.channels} at a {self.extent.resolution=} (epsg:{self.extent.crs.epsg})"
            f" and {self.chunk_size=} to {session.store=}"
        )

        ds = xr.Dataset(
            {
                name: odc.geo.xr.xr_zeros(
                    self.extent,
                    chunks=-1,
                    dtype=self._channels_encoding[name].get("dtype", "float32"),
                    always_yx=True,
                )
                for name in self.channels
            },
            attrs={"title": self.title, "loaded_patches": []},
        )

        # Expand to temporal dimension if defined
        if self.temporal_extent is not None:
            ds = ds.expand_dims(time=self.temporal_extent)

        # Add metadata
        for name, meta in self._channels_meta.items():
            ds[name].attrs.update(meta)

        # Get the encoding for the coordinates, variables and spatial reference
        coords_encoding = {
            "x": {"chunks": ds.x.shape, **optimize_coord_encoding(ds.x.values, self.extent.resolution.x)},
            "y": {"chunks": ds.y.shape, **optimize_coord_encoding(ds.y.values, self.extent.resolution.y)},
        }
        if self.temporal_extent is not None:
            coords_encoding["time"] = {"chunks": ds.time.shape, **optimize_temporal_encoding(self.temporal_extent)}
        chunks = (
            (1, self.chunk_size, self.chunk_size)
            if self.temporal_extent is not None
            else (self.chunk_size, self.chunk_size)
        )
        var_encoding = {
            name: {
                "chunks": chunks,
                "compressors": [BloscCodec(clevel=9)],
                **self._channels_encoding[name],
            }
            for name in self.channels
        }
        encoding = {
            "spatial_ref": {"chunks": None, "dtype": "int32"},
            **coords_encoding,
            **var_encoding,
        }
        logger.debug(f"Datacube {encoding=}")

        ds.to_zarr(
            session.store,
            encoding=encoding,
            compute=False,
            consolidated=False,
            zarr_format=3,
            mode="w" if overwrite else "w-",
        )

        commit = session.commit("Initialize empty datacube")
        logger.debug(f"Datacube created: {commit=}")

        self.post_create()

current_state

current_state() -> gpd.GeoDataFrame | None

Get info about currently stored tiles.

Returns:

  • GeoDataFrame | None

    gpd.GeoDataFrame: Tile info from pystac. None if datacube is empty.

Source code in src/smart_geocubes/accessors/stac.py
def current_state(self) -> gpd.GeoDataFrame | None:
    """Get info about currently stored tiles.

    Returns:
        gpd.GeoDataFrame: Tile info from pystac. None if datacube is empty.

    """
    import geopandas as gpd
    import pystac_client

    if not self.created:
        return None

    loaded_patches = self.loaded_patches()

    if len(loaded_patches) == 0:
        return None

    catalog = pystac_client.Client.open(self.stac_api_url)
    search = catalog.search(collections=[self.collection], ids=loaded_patches)
    stac_json = search.item_collection_as_dict()

    gdf = gpd.GeoDataFrame.from_features(stac_json, "epsg:4326")
    return gdf

download_patch

download_patch(idx: PatchIndex[Item]) -> xr.Dataset

Download the data for the given patch.

Must be implemented by the Accessor.

Parameters:

  • idx

    (PatchIndex[Item]) –

    The reference patch to download the data for.

Returns:

  • Dataset

    xr.Dataset: The downloaded patch data.

Source code in src/smart_geocubes/accessors/stac.py
def download_patch(self, idx: PatchIndex["Item"]) -> xr.Dataset:
    """Download the data for the given patch.

    Must be implemented by the Accessor.

    Args:
        idx (PatchIndex[Item]): The reference patch to download the data for.

    Returns:
        xr.Dataset: The downloaded patch data.

    """
    from odc.stac import stac_load

    patch = stac_load([idx.item], bands=self.channels, chunks=None, progress=None)

    # Do a mosaic if multiple items are returned for non-temporal data
    if "time" in patch.dims and self.temporal_extent is None:
        patch = patch.max("time")

    return patch

load

load(
    aoi: Geometry | GeoBox,
    toi: TOI = None,
    persist: bool = True,
    create: bool = False,
) -> xr.Dataset

Load the data for the given geobox.

Parameters:

  • aoi

    (Geometry | GeoBox) –

    The reference geometry to load the data for. If a Geobox is provided, it will use the extent of the geobox.

  • toi

    (TOI, default: None ) –

    The temporal slice to load. Defaults to None.

  • persist

    (bool, default: True ) –

    If the data should be persisted in memory. If not, this will return a Dask-backed Dataset. Defaults to True.

  • create

    (bool, default: False ) –

    Create a new zarr array at the defined storage if it does not exist. This is not recommended because it can have side effects in a multi-process environment. Defaults to False.

Returns:

  • Dataset

    xr.Dataset: The loaded dataset in the same resolution and extent as the geobox.

Source code in src/smart_geocubes/core/accessor.py
def load(
    self,
    aoi: Geometry | GeoBox,
    toi: TOI = None,
    persist: bool = True,
    create: bool = False,
) -> xr.Dataset:
    """Load the data for the given geobox.

    Args:
        aoi (Geometry | GeoBox): The reference geometry to load the data for. If a Geobox is provided,
            it will use the extent of the geobox.
        toi (TOI): The temporal slice to load. Defaults to None.
        persist (bool, optional): If the data should be persisted in memory.
            If not, this will return a Dask-backed Dataset. Defaults to True.
        create (bool, optional): Create a new zarr array at the defined storage if it does not exist.
            This is not recommended because it can have side effects in a multi-process environment.
            Defaults to False.

    Returns:
        xr.Dataset: The loaded dataset in the same resolution and extent as the geobox.

    """
    if toi is not None:
        self.assert_temporal_cube()

    if isinstance(aoi, GeoBox):
        aoi = aoi.extent

    with self.stopuhr(f"{_geometry_repr(aoi)}: {self.title} tile {'loading' if persist else 'lazy-loading'}"):
        # Create the datacube if it does not exist
        if create:
            try:
                self.create(overwrite=False)
            except FileExistsError:  # We are okay if the datacube already exists
                pass
        else:
            # Check if the datacube exists
            self.assert_created()

        # Download the adjacent tiles (if necessary)
        aligned_aoi = aoi.to_crs(self.extent.crs)
        with self.stopuhr(f"{_geometry_repr(aoi)}: Procedural download in blocking mode"):
            self.procedural_download(aligned_aoi, toi)

        # Load the datacube and set the spatial_ref since it is set as a coordinate within the zarr format
        session = self.repo.readonly_session("main")
        chunks = None if persist else "auto"
        xrcube = xr.open_zarr(
            session.store,
            mask_and_scale=False,
            chunks=chunks,
            consolidated=False,
        ).set_coords("spatial_ref")

        # Get temporal slice if time is provided
        if toi is not None:
            xrcube = xrcube.sel(time=toi)

        # Get an AOI slice of the datacube
        xrcube_aoi = xrcube.odc.crop(aligned_aoi, apply_mask=False)

        # The following code would load the lazy zarr data from disk into memory
        if persist:
            with self.stopuhr(f"{_geometry_repr(aoi)}: {self.title} AOI loading from disk"):
                xrcube_aoi = xrcube_aoi.load()
    return xrcube_aoi

load_like

load_like(
    ref: Dataset | DataArray, **kwargs: Unpack[LoadParams]
) -> xr.Dataset

Load the data for the given geobox.

Parameters:

  • ref

    (Dataset | DataArray) –

    The reference dataarray or dataset to load the data for.

  • **kwargs

    (Unpack[LoadParams], default: {} ) –

    The load parameters (buffer, persist, create, concurrency_mode).

Other Parameters:

  • buffer (int) –

    The buffer around the projected geobox in pixels. Defaults to 0.

  • persist (bool) –

    If the data should be persisted in memory. If not, this will return a Dask-backed Dataset. Defaults to True.

  • create (bool) –

    Create a new zarr array at the defined storage if it does not exist. This is not recommended because it can have side effects in a multi-process environment. Defaults to False.

Returns:

  • Dataset

    xr.Dataset: The loaded dataset in the same resolution and extent as the geobox.

Source code in src/smart_geocubes/core/accessor.py
def load_like(
    self,
    ref: xr.Dataset | xr.DataArray,
    **kwargs: Unpack[LoadParams],
) -> xr.Dataset:
    """Load the data for the given geobox.

    Args:
        ref (xr.Dataset | xr.DataArray): The reference dataarray or dataset to load the data for.
        **kwargs: The load parameters (buffer, persist, create, concurrency_mode).

    Keyword Args:
        buffer (int, optional): The buffer around the projected geobox in pixels. Defaults to 0.
        persist (bool, optional): If the data should be persisted in memory.
            If not, this will return a Dask-backed Dataset. Defaults to True.
        create (bool, optional): Create a new zarr array at the defined storage if it does not exist.
            This is not recommended because it can have side effects in a multi-process environment.
            Defaults to False.

    Returns:
        xr.Dataset: The loaded dataset in the same resolution and extent as the geobox.

    """
    toi = None
    if "time" in ref.coords and self.temporal_extent is not None:
        toi = ref.get_index("time")
    return self.load(ref.geobox, toi=toi, **kwargs)

loaded_patches

loaded_patches() -> list[str]

Get the ids of already (down-)loaded patches.

Returns:

  • list[str]

    list[str]: A list of already loaded patch ids.

Source code in src/smart_geocubes/core/accessor.py
def loaded_patches(self) -> list[str]:
    """Get the ids of already (down-)loaded patches.

    Returns:
        list[str]: A list of already loaded patch ids.

    """
    session = self.repo.readonly_session("main")
    zcube = zarr.open(store=session.store, mode="r")
    return zcube.attrs.get("loaded_patches", []).copy()

log_benchmark_summary

log_benchmark_summary()

Log the benchmark summary.

Source code in src/smart_geocubes/core/accessor.py
def log_benchmark_summary(self):
    """Log the benchmark summary."""
    self.stopuhr.summary()

open_xarray

open_xarray() -> xr.Dataset

Open the xarray datacube in read-only mode.

Returns:

  • Dataset

    xr.Dataset: The xarray datacube.

Source code in src/smart_geocubes/core/accessor.py
def open_xarray(self) -> xr.Dataset:
    """Open the xarray datacube in read-only mode.

    Returns:
        xr.Dataset: The xarray datacube.

    """
    return self.backend.open_xarray()

open_zarr

open_zarr() -> zarr.Group

Open the zarr datacube in read-only mode.

Returns:

  • Group

    zarr.Group: The zarr datacube.

Source code in src/smart_geocubes/core/accessor.py
def open_zarr(self) -> zarr.Group:
    """Open the zarr datacube in read-only mode.

    Returns:
        zarr.Group: The zarr datacube.

    """
    return self.backend.open_zarr()

post_create

post_create()

Download the ArcticDEM mosaic extent info and store it in the datacube.

Source code in src/smart_geocubes/datasets/arcticdem.py
def post_create(self):
    """Download the ArcticDEM mosaic extent info and store it in the datacube."""
    _download_arcticdem_extent(self._aux_dir)

post_init

post_init()

Check if the ArcticDEM mosaic extent info is already downloaded and download it if not.

Source code in src/smart_geocubes/datasets/arcticdem.py
def post_init(self):
    """Check if the ArcticDEM mosaic extent info is already downloaded and downlaod if not."""
    required_files = [self._aux_dir / f"ArcticDEM_Mosaic_Index_v4_1_{res}.parquet" for res in ["2m", "10m", "32m"]]
    if not all(file.exists() for file in required_files):
        _download_arcticdem_extent(self._aux_dir)

procedural_download

procedural_download(aoi: Geometry | GeoBox, toi: TOI)

Download tiles procedurally.

Warning

This method is meant for single-process use, but can (in theory) be used in a multi-process environment. However, in a multi-process environment it can happen that multiple processes try to write concurrently, which results in a conflict. In such cases, the download will be retried until it succeeds or the maximum number of tries is reached.

Parameters:

  • aoi

    (Geometry | GeoBox) –

    The geometry of the aoi to download. If a Geobox is provided, it will use the extent of the geobox.

  • toi

    (TOI) –

    The time of interest to download.

Raises:

  • ValueError

    If no adjacent patches are found. This can happen if the geobox is out of the dataset bounds.

  • ValueError

    If not all downloads were successful.

Source code in src/smart_geocubes/core/accessor.py
def procedural_download(self, aoi: Geometry | GeoBox, toi: TOI):
    """Download tiles procedurally.

    Warning:
        This method is meant for single-process use, but can (in theory) be used in a multi-process environment.
        However, in a multi-process environment it can happen that multiple processes try to write concurrently,
        which results in a conflict.
        In such cases, the download will be retried until it succeeds or the maximum number of tries is reached.

    Args:
        aoi (Geometry | GeoBox): The geometry of the aoi to download. If a Geobox is provided,
            it will use the extent of the geobox.
        toi (TOI): The time of interest to download.

    Raises:
        ValueError: If no adjacent patches are found. This can happen if the geobox is out of the dataset bounds.
        ValueError: If not all downloads were successful.

    """
    if isinstance(aoi, GeoBox):
        aoi = aoi.extent

    adjacent_patches = self.adjacent_patches(aoi, toi)
    # interest-string
    soi = f"{_geometry_repr(aoi)}" + (f" @ {_repr_toi(toi)}" if toi is not None else "")
    if not adjacent_patches:
        logger.error(f"{soi}: No adjacent patches found: {adjacent_patches=}")
        raise ValueError("No adjacent patches found - is the provided aoi and toi correct?")

    loaded_patches = self.loaded_patches()

    new_patches = [patch for patch in adjacent_patches if patch.id not in loaded_patches]

    logger.debug(f"{soi}:  {len(adjacent_patches)=} & {len(loaded_patches)=} -> {len(new_patches)=} to download")
    if not new_patches:
        return

    # This raises Errors if anything goes wrong -> we want to propagate
    self.backend.submit(new_patches)

visualize_state

visualize_state(
    ax: Axes | None = None,
) -> plt.Figure | plt.Axes

Visualize the extent, i.e. the already downloaded and filled data, of the datacube.

Parameters:

  • ax

    (Axes | None, default: None ) –

    The axes drawn to. If None, will create a new figure and axes.

Returns:

  • Figure | Axes

    plt.Figure | plt.Axes: The figure with the visualization if no axes was provided, else the axes.

Raises:

  • ValueError

    If the datacube is empty.

Source code in src/smart_geocubes/datasets/arcticdem.py
def visualize_state(self, ax: "plt.Axes | None" = None) -> "plt.Figure | plt.Axes":
    """Visulize the extend, hence the already downloaded and filled data, of the datacube.

    Args:
        ax (plt.Axes | None): The axes drawn to. If None, will create a new figure and axes.

    Returns:
        plt.Figure | plt.Axes: The figure with the visualization if no axes was provided, else the axes.

    Raises:
        ValueError: If the datacube is empty

    """
    import cartopy.crs as ccrs
    import cartopy.feature as cfeature
    import matplotlib.path as mpath
    import matplotlib.pyplot as plt

    tile_info = self.current_state()

    if tile_info is None:
        raise ValueError("Datacube is not created or loaded yet. Can't visualize!")

    # Define the projection
    projection = ccrs.Stereographic(central_latitude=90, central_longitude=-45, true_scale_latitude=70)

    # Create a figure
    fig = None
    if ax is None:
        fig, ax = plt.subplots(figsize=(10, 10), subplot_kw={"projection": projection})

    # Set the extent to focus on the North Pole
    ax.set_extent([-180, 180, 50, 90], crs=ccrs.PlateCarree())

    # Add features
    ax.add_feature(cfeature.LAND, zorder=0, edgecolor="black", facecolor="white")
    ax.add_feature(cfeature.OCEAN, zorder=0, facecolor="lightgrey")
    ax.add_feature(cfeature.COASTLINE)
    ax.add_feature(cfeature.BORDERS, linestyle=":")
    ax.add_feature(cfeature.LAKES, alpha=0.5)
    ax.add_feature(cfeature.RIVERS)

    # Add gridlines
    gl = ax.gridlines(draw_labels=True)
    gl.top_labels = False
    gl.right_labels = False

    # Compute a circle in axes coordinates, which we can use as a boundary
    # for the map. We can pan/zoom as much as we like - the boundary will be
    # permanently circular.
    theta = np.linspace(0, 2 * np.pi, 100)
    center, radius = [0.5, 0.5], 0.5
    verts = np.vstack([np.sin(theta), np.cos(theta)]).T
    circle = mpath.Path(verts * radius + center)

    ax.set_boundary(circle, transform=ax.transAxes)

    tile_info.plot(
        "title",
        ax=ax,
        transform=ccrs.PlateCarree(),
        edgecolor="black",
        categorical=True,
        aspect="equal",
        alpha=0.5,
    )

    if fig is not None:
        return fig
    else:
        return ax
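
A quick usage sketch (continuing the illustrative accessor from above): when no axes is passed, the method creates a new polar stereographic figure and returns it.

fig = accessor.visualize_state()
fig.savefig("arcticdem_state.png", dpi=150)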

TCTrend2019

TCTrend2019(
    storage: Storage | Path | str,
    create_icechunk_storage: bool = True,
    backend: Literal["threaded", "simple"] = "threaded",
)

Bases: TCTrendABC

Accessor for TCTrend data derived from 2000-2019.

Attributes:

  • collection (str) –

    The collection ID of the datacube.

  • extent (GeoBox) –

    The extent of the datacube represented by a GeoBox.

  • chunk_size (int) –

    The chunk size of the datacube.

  • channels (list) –

    The channels of the datacube.

  • storage (Storage) –

    The icechunk storage.

  • repo (Repository) –

    The icechunk repository.

  • title (str) –

    The title of the datacube.

  • stopuhr (StopUhr) –

    The benchmarking timer from the stopuhr library.

  • zgeobox (GeoBox) –

    The geobox of the zarr array. Should be equal to the extent geobox.

  • created (bool) –

    True if the datacube already exists in the storage.

Initialize base class for remote accessors.

Warning

In a multiprocessing environment, it is strongly recommended to not set create_icechunk_storage=False.

Parameters:

  • storage

    (Storage) –

    The icechunk storage of the datacube.

  • create_icechunk_storage

    (bool, default: True ) –

    Whether an icechunk repository should be created at the provided storage if none exists. This should be disabled in a multiprocessing environment. Defaults to True.

  • backend

    (Literal['threaded', 'simple'], default: 'threaded' ) –

    The backend to use for downloading data. Currently, only "threaded" is supported. Defaults to "threaded".

Raises:

  • ValueError

    If the storage is not an icechunk.Storage.

Methods:

  • adjacent_patches

    Get the adjacent patches for the given geobox.

  • assert_created

    Assert that the datacube exists in the storage.

  • assert_temporal_cube

    Assert that the datacube has a temporal dimension.

  • create

    Create an empty datacube and write it to the store.

  • current_state

    Get info about currently stored tiles.

  • download_patch

    Download the data for the given patch.

  • load

    Load the data for the given geobox.

  • load_like

    Load the data for the given geobox.

  • loaded_patches

    Get the ids of already (down-)loaded patches.

  • log_benchmark_summary

    Log the benchmark summary.

  • open_xarray

    Open the xarray datacube in read-only mode.

  • open_zarr

    Open the zarr datacube in read-only mode.

  • post_create

    Post create actions. Can be overridden by the dataset accessor.

  • post_init

    Post init actions. Can be overridden by the dataset accessor.

  • procedural_download

    Download tiles procedurally.

  • visualize_state

    Visualize the extent, i.e. the already downloaded and filled data, of the datacube.

Source code in src/smart_geocubes/core/accessor.py
def __init__(
    self,
    storage: icechunk.Storage | Path | str,
    create_icechunk_storage: bool = True,
    backend: Literal["threaded", "simple"] = "threaded",
):
    """Initialize base class for remote accessors.

    !!! warning

        In a multiprocessing environment, it is strongly recommended to not set `create_icechunk_storage=False`.

    Args:
        storage (icechunk.Storage): The icechunk storage of the datacube.
        create_icechunk_storage (bool, optional): Whether an icechunk repository should be created at the
            provided storage if none exists.
            This should be disabled in a multiprocessing environment.
            Defaults to True.
        backend (Literal["threaded", "simple"], optional): The backend to use for downloading data.
            Currently, only "threaded" is supported. Defaults to "threaded".

    Raises:
        ValueError: If the storage is not an icechunk.Storage.

    """
    # Title is used for logging, debugging and as a default name for the datacube
    self.title = self.__class__.__name__

    if isinstance(storage, (str | Path)):
        storage = storage if isinstance(storage, str) else str(storage.resolve())
        storage = icechunk.local_filesystem_storage(storage)
    if not isinstance(storage, icechunk.Storage):
        raise ValueError(f"Expected an icechunk.Storage, but got {type(storage)}")
    self.storage = storage
    logger.debug(f"Using storage {storage=}")
    if create_icechunk_storage:
        self.repo = icechunk.Repository.open_or_create(storage)  # Will create a "main" branch
    else:
        self.repo = icechunk.Repository.open(storage)
    logger.debug(f"Using repository {self.repo=}")

    # The benchmarking timer for this accessor
    self.stopuhr = Chronometer(logger.debug)

    if backend == "threaded":
        if not _check_python_version(3, 13):
            raise NotImplementedError(
                "The 'threaded' backend is only fully supported in Python 3.13 and above."
                " Please consider using the 'simple' backend in a multiprocessing environment"
                " or upgrade your Python version."
            )
        self.backend = ThreadedBackend(self.repo, self.download_patch)
    elif backend == "simple":
        self.backend = SimpleBackend(self.repo, self.download_patch)
    else:
        raise ValueError(f"Unknown backend {backend}")

    self.post_init()
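
An instantiation sketch (paths are illustrative). A plain string or Path is wrapped into a local filesystem storage automatically; an explicit icechunk.Storage works as well:

import icechunk

from smart_geocubes import TCTrend2019  # illustrative import

# Local path, wrapped into icechunk storage internally
accessor = TCTrend2019("./tctrend2019.icechunk")

# Equivalent explicit storage object; in a multiprocessing setup, create the
# repository once beforehand and open with create_icechunk_storage=False
storage = icechunk.local_filesystem_storage("./tctrend2019.icechunk")
accessor = TCTrend2019(storage, create_icechunk_storage=True, backend="threaded")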

created property

created: bool

Check if the datacube already exists in the storage.

Returns:

  • bool ( bool ) –

    True if the datacube already exists in the storage.

is_temporal property

is_temporal: bool

Check if the datacube has a temporal dimension.

Returns:

  • bool ( bool ) –

    True if the datacube has a temporal dimension.

adjacent_patches

adjacent_patches(
    roi: Geometry | GeoBox | GeoDataFrame, toi: TOI
) -> list[Item]

Get the adjacent patches for the given geobox.

Must be implemented by the Accessor.

Parameters:

  • roi

    (Geometry | GeoBox | GeoDataFrame) –

    The reference geometry, geobox or reference geodataframe

  • toi

    (TOI) –

    The time of interest to download.

Returns:

  • list[Item]

    list[PatchIndex[Item]]: The adjacent patch(-id)s for the given geobox.

Raises:

  • ValueError

    If the ROI type is invalid.

  • ValueError

    If the datacube is not temporal, but a time of interest is provided.

Source code in src/smart_geocubes/accessors/gee.py
def adjacent_patches(self, roi: Geometry | GeoBox | gpd.GeoDataFrame, toi: TOI) -> list[Item]:
    """Get the adjacent patches for the given geobox.

    Must be implemented by the Accessor.

    Args:
        roi (Geometry | GeoBox | gpd.GeoDataFrame): The reference geometry, geobox or reference geodataframe
        toi (TOI): The time of interest to download.

    Returns:
        list[PatchIndex[Item]]: The adjacent patch(-id)s for the given geobox.

    Raises:
        ValueError: If the ROI type is invalid.
        ValueError: If the datacube is not temporal, but a time of interest is provided.

    """
    if toi is not None and not self.is_temporal:
        raise ValueError("Datacube is not temporal, but time of interest is provided.")

    if isinstance(roi, gpd.GeoDataFrame):
        adjacent_geometries = (
            gpd.sjoin(self._tile_geometries, roi.to_crs(self.extent.crs.wkt), how="inner", predicate="intersects")
            .reset_index()
            .drop_duplicates(subset="index", keep="first")
            .set_index("index")
        )
        spatial_idxs: list[tuple[int, int]] = list(adjacent_geometries["idx"])
    elif isinstance(roi, GeoBox):
        spatial_idxs: list[tuple[int, int]] = list(self._extent_tiles.tiles(roi.extent))
    elif isinstance(roi, Geometry):
        spatial_idxs: list[tuple[int, int]] = list(self._extent_tiles.tiles(roi))
    else:
        raise ValueError("Invalid ROI type.")

    if not self.is_temporal:
        return [
            PatchIndex(
                self._stringify_index(spatial_idx),
                self._extent_tiles[spatial_idx].geographic_extent,
                None,
                Item(self._extent_tiles[spatial_idx], None),
            )
            for spatial_idx in spatial_idxs
        ]

    # Now datacube is temporal
    toi = normalize_toi(self.temporal_extent, toi)
    patch_idxs = []
    for time in toi:
        time_idx = self.temporal_extent.get_loc(time)
        assert isinstance(time_idx, int), "Non-Unique temporal extents are not supported!"
        for spatial_idx in spatial_idxs:
            patch_idxs.append(
                PatchIndex(
                    self._stringify_index(spatial_idx, time_idx),
                    self._extent_tiles[spatial_idx].geographic_extent,
                    time,
                    Item(self._extent_tiles[spatial_idx], time),
                )
            )
    return patch_idxs
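
For example, the patch index can be inspected without triggering any downloads. A sketch mirroring what procedural_download does internally (the AOI below is illustrative):

from odc.geo.geobox import GeoBox

aoi = GeoBox.from_bbox((20.0, 65.0, 21.0, 66.0), crs="EPSG:4326", resolution=0.001)

patches = accessor.adjacent_patches(aoi, toi=None)
already_loaded = set(accessor.loaded_patches())
missing = [p for p in patches if p.id not in already_loaded]
print(f"{len(missing)} of {len(patches)} adjacent patches still to download")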

assert_created

assert_created()

Assert that the datacube exists in the storage.

Source code in src/smart_geocubes/core/accessor.py
def assert_created(self):
    """Assert that the datacube exists in the storage."""
    self.backend.assert_created()

assert_temporal_cube

assert_temporal_cube()

Assert that the datacube has a temporal dimension.

Raises:

  • ValueError

    If the datacube has no temporal dimension.

Source code in src/smart_geocubes/core/accessor.py
def assert_temporal_cube(self):
    """Assert that the datacube has a temporal dimension.

    Raises:
        ValueError: If the datacube has no temporal dimension.

    """
    if self.temporal_extent is None:
        msg = f"Datacube {self.title} has no temporal dimension."
        logger.error(msg)
        raise ValueError(msg)

create

create(overwrite: bool = False, exists_ok: bool = False)

Create an empty datacube and write it to the store.

Parameters:

  • overwrite

    (bool, default: False ) –

    Allow overwriting an existing datacube. Has no effect if exists_ok is True. Defaults to False.

  • exists_ok

    (bool, default: False ) –

    Do not raise an error if the datacube already exists.

Raises:

  • FileExistsError

    If a datacube already exists at location and exists_ok is False.

Source code in src/smart_geocubes/core/accessor.py
def create(self, overwrite: bool = False, exists_ok: bool = False):
    """Create an empty datacube and write it to the store.

    Args:
        overwrite (bool, optional): Allow overwriting an existing datacube.
            Has no effect if exists_ok is True. Defaults to False.
        exists_ok (bool, optional): Do not raise an error if the datacube already exists.

    Raises:
        FileExistsError: If a datacube already exists at location and exists_ok is False.

    """
    if exists_ok and self.created:
        logger.debug("Datacube was already created.")
        return

    with self.stopuhr("Empty datacube creation"):
        # Check if the zarr data already exists
        session = self.repo.writable_session("main")
        cube_is_empty = sync(session.store.is_empty(""))
        if not overwrite and not cube_is_empty:
            logger.debug(f"Unable to create a new datacube. {overwrite=} {cube_is_empty=} {session.store=}")
            raise FileExistsError(f"Cannot create a new datacube. {session.store=} is not empty!")

        logger.debug(
            f"Creating an empty zarr datacube '{self.title}' with the variables"
            f" {self.channels} at a {self.extent.resolution=} (epsg:{self.extent.crs.epsg})"
            f" and {self.chunk_size=} to {session.store=}"
        )

        ds = xr.Dataset(
            {
                name: odc.geo.xr.xr_zeros(
                    self.extent,
                    chunks=-1,
                    dtype=self._channels_encoding[name].get("dtype", "float32"),
                    always_yx=True,
                )
                for name in self.channels
            },
            attrs={"title": self.title, "loaded_patches": []},
        )

        # Expand to temporal dimension if defined
        if self.temporal_extent is not None:
            ds = ds.expand_dims(time=self.temporal_extent)

        # Add metadata
        for name, meta in self._channels_meta.items():
            ds[name].attrs.update(meta)

        # Get the encoding for the coordinates, variables and spatial reference
        coords_encoding = {
            "x": {"chunks": ds.x.shape, **optimize_coord_encoding(ds.x.values, self.extent.resolution.x)},
            "y": {"chunks": ds.y.shape, **optimize_coord_encoding(ds.y.values, self.extent.resolution.y)},
        }
        if self.temporal_extent is not None:
            coords_encoding["time"] = {"chunks": ds.time.shape, **optimize_temporal_encoding(self.temporal_extent)}
        chunks = (
            (1, self.chunk_size, self.chunk_size)
            if self.temporal_extent is not None
            else (self.chunk_size, self.chunk_size)
        )
        var_encoding = {
            name: {
                "chunks": chunks,
                "compressors": [BloscCodec(clevel=9)],
                **self._channels_encoding[name],
            }
            for name in self.channels
        }
        encoding = {
            "spatial_ref": {"chunks": None, "dtype": "int32"},
            **coords_encoding,
            **var_encoding,
        }
        logger.debug(f"Datacube {encoding=}")

        ds.to_zarr(
            session.store,
            encoding=encoding,
            compute=False,
            consolidated=False,
            zarr_format=3,
            mode="w" if overwrite else "w-",
        )

        commit = session.commit("Initialize empty datacube")
        logger.debug(f"Datacube created: {commit=}")

        self.post_create()
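
Creating the cube is a cheap, one-off metadata operation (no raster data is written). A sketch of the two common call patterns:

# Idempotent: returns silently if the cube already exists
accessor.create(exists_ok=True)

# Start over, replacing an existing cube (use with care)
accessor.create(overwrite=True)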

current_state

current_state() -> gpd.GeoDataFrame | None

Get info about currently stored tiles.

Returns:

  • GeoDataFrame | None

    gpd.GeoDataFrame: Tiles from odc.geo.GeoboxTiles. None if datacube is empty.

Source code in src/smart_geocubes/accessors/gee.py
def current_state(self) -> gpd.GeoDataFrame | None:
    """Get info about currently stored tiles.

    Returns:
        gpd.GeoDataFrame: Tiles from odc.geo.GeoboxTiles. None if datacube is empty.

    """
    import geopandas as gpd

    if not self.created:
        return None

    loaded_patches = self.loaded_patches()

    if len(loaded_patches) == 0:
        return None

    patch_infos = []
    for pid in loaded_patches:
        spatial_idx, temporal_idx = self._parse_index(pid)
        geometry = self._extent_tiles[spatial_idx].extent.geom
        if self.is_temporal:
            time = self.temporal_extent[temporal_idx]
            patch_infos.append({"geometry": geometry, "id": pid, "time": time})
        else:
            patch_infos.append({"geometry": geometry, "id": pid})

    gdf = gpd.GeoDataFrame(patch_infos, crs=self.extent.crs.to_wkt())
    return gdf
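
The returned GeoDataFrame is handy for ad-hoc reporting; a sketch:

state = accessor.current_state()
if state is None:
    print("No patches downloaded yet")
else:
    print(f"{len(state)} patches loaded, bounds: {state.total_bounds}")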

download_patch

download_patch(idx: PatchIndex) -> xr.Dataset

Download the data for the given patch.

Must be implemented by the Accessor.

Parameters:

  • idx

    (PatchIndex) –

    The reference patch to download the data for.

Returns:

  • Dataset

    xr.Dataset: The downloaded patch data.

Source code in src/smart_geocubes/datasets/tctrend.py
def download_patch(self, idx: PatchIndex) -> xr.Dataset:
    """Download the data for the given patch.

    Must be implemented by the Accessor.

    Args:
        idx (PatchIndex): The reference patch to download the data for.

    Returns:
        xr.Dataset: The downloaded patch data.

    """
    patch = super().download_patch(idx)

    # ?: The following code handles the occurrence of NaN values when using mosaics
    # Save original min-max values for each band for clipping later
    clip_values = {
        band: (patch[band].min().values.item(), patch[band].max().values.item()) for band in patch.data_vars
    }

    # Interpolate missing values (there are very few, so we actually can interpolate them)
    patch.rio.set_spatial_dims(x_dim="x", y_dim="y", inplace=True)
    for band in patch.data_vars:
        patch[band] = patch[band].rio.write_nodata(np.nan).rio.interpolate_na()

    # Convert to uint8
    for band in patch.data_vars:
        band_min, band_max = clip_values[band]
        patch[band] = patch[band].clip(band_min, band_max, keep_attrs=True).astype("uint8").rio.write_nodata(None)

    return patch

load

load(
    aoi: Geometry | GeoBox,
    toi: TOI = None,
    persist: bool = True,
    create: bool = False,
) -> xr.Dataset

Load the data for the given geobox.

Parameters:

  • aoi

    (Geometry | GeoBox) –

    The reference geometry to load the data for. If a Geobox is provided, it will use the extent of the geobox.

  • toi

    (TOI, default: None ) –

    The temporal slice to load. Defaults to None.

  • persist

    (bool, default: True ) –

    If the data should be persisted in memory. If not, this will return a Dask backed Dataset. Defaults to True.

  • create

    (bool, default: False ) –

    Create a new zarr array at the defined storage if it does not exist. This is not recommended, because it can have side effects in a multi-process environment. Defaults to False.

Returns:

  • Dataset

    xr.Dataset: The loaded dataset in the same resolution and extent as the geobox.

Source code in src/smart_geocubes/core/accessor.py
def load(
    self,
    aoi: Geometry | GeoBox,
    toi: TOI = None,
    persist: bool = True,
    create: bool = False,
) -> xr.Dataset:
    """Load the data for the given geobox.

    Args:
        aoi (Geometry | GeoBox): The reference geometry to load the data for. If a Geobox is provided,
            it will use the extent of the geobox.
        toi (TOI): The temporal slice to load. Defaults to None.
        persist (bool, optional): If the data should be persisted in memory.
            If not, this will return a Dask backed Dataset. Defaults to True.
        create (bool, optional): Create a new zarr array at the defined storage if it does not exist.
            This is not recommended, because it can have side effects in a multi-process environment.
            Defaults to False.

    Returns:
        xr.Dataset: The loaded dataset in the same resolution and extent as the geobox.

    """
    if toi is not None:
        self.assert_temporal_cube()

    if isinstance(aoi, GeoBox):
        aoi = aoi.extent

    with self.stopuhr(f"{_geometry_repr(aoi)}: {self.title} tile {'loading' if persist else 'lazy-loading'}"):
        # Create the datacube if it does not exist
        if create:
            try:
                self.create(overwrite=False)
            except FileExistsError:  # We are okay if the datacube already exists
                pass
        else:
            # Check if the datacube exists
            self.assert_created()

        # Download the adjacent tiles (if necessary)
        aligned_aoi = aoi.to_crs(self.extent.crs)
        with self.stopuhr(f"{_geometry_repr(aoi)}: Procedural download in blocking mode"):
            self.procedural_download(aligned_aoi, toi)

        # Load the datacube and set the spatial_ref since it is set as a coordinate within the zarr format
        session = self.repo.readonly_session("main")
        chunks = None if persist else "auto"
        xrcube = xr.open_zarr(
            session.store,
            mask_and_scale=False,
            chunks=chunks,
            consolidated=False,
        ).set_coords("spatial_ref")

        # Get temporal slice if time is provided
        if toi is not None:
            xrcube = xrcube.sel(time=toi)

        # Get an AOI slice of the datacube
        xrcube_aoi = xrcube.odc.crop(aligned_aoi, apply_mask=False)

        # The following code would load the lazy zarr data from disk into memory
        if persist:
            with self.stopuhr(f"{_geometry_repr(aoi)}: {self.title} AOI loading from disk"):
                xrcube_aoi = xrcube_aoi.load()
    return xrcube_aoi
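
Typical end-to-end usage: load first downloads any missing tiles, then crops the cube to the AOI. A sketch using the illustrative aoi from above:

# Eagerly loaded xr.Dataset, cropped to the AOI
ds = accessor.load(aoi, persist=True)
print(list(ds.data_vars))

# Dask-backed alternative; nothing is read from disk until .load()/.compute()
lazy = accessor.load(aoi, persist=False)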

load_like

load_like(
    ref: Dataset | DataArray, **kwargs: Unpack[LoadParams]
) -> xr.Dataset

Load the data for the given geobox.

Parameters:

  • ref

    (Dataset | DataArray) –

    The reference dataarray or dataset to load the data for.

  • **kwargs

    (Unpack[LoadParams], default: {} ) –

    The load parameters (buffer, persist, create, concurrency_mode).

Other Parameters:

  • buffer (int) –

    The buffer around the projected geobox in pixels. Defaults to 0.

  • persist (bool) –

    If the data should be persisted in memory. If not, this will return a Dask backed Dataset. Defaults to True.

  • create (bool) –

    Create a new zarr array at the defined storage if it does not exist. This is not recommended, because it can have side effects in a multi-process environment. Defaults to False.

Returns:

  • Dataset

    xr.Dataset: The loaded dataset in the same resolution and extent as the geobox.

Source code in src/smart_geocubes/core/accessor.py
def load_like(
    self,
    ref: xr.Dataset | xr.DataArray,
    **kwargs: Unpack[LoadParams],
) -> xr.Dataset:
    """Load the data for the given geobox.

    Args:
        ref (xr.Dataset | xr.DataArray): The reference dataarray or dataset to load the data for.
        **kwargs: The load parameters (buffer, persist, create, concurrency_mode).

    Keyword Args:
        buffer (int, optional): The buffer around the projected geobox in pixels. Defaults to 0.
        persist (bool, optional): If the data should be persisted in memory.
            If not, this will return a Dask backed Dataset. Defaults to True.
        create (bool, optional): Create a new zarr array at the defined storage if it does not exist.
            This is not recommended, because it can have side effects in a multi-process environment.
            Defaults to False.

    Returns:
        xr.Dataset: The loaded dataset in the same resolution and extent as the geobox.

    """
    toi = None
    if "time" in ref.coords and self.temporal_extent is not None:
        toi = ref.get_index("time")
    return self.load(ref.geobox, toi=toi, **kwargs)
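
A sketch of grid-aligned loading against an existing georeferenced dataset (reference is illustrative, e.g. a scene opened with odc-stac or rioxarray):

# Returns data in the same resolution and extent as `reference`
aligned = accessor.load_like(reference)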

loaded_patches

loaded_patches() -> list[str]

Get the ids of already (down-)loaded patches.

Returns:

  • list[str]

    list[str]: A list of already loaded patch ids.

Source code in src/smart_geocubes/core/accessor.py
def loaded_patches(self) -> list[str]:
    """Get the ids of already (down-)loaded patches.

    Returns:
        list[str]: A list of already loaded patch ids.

    """
    session = self.repo.readonly_session("main")
    zcube = zarr.open(store=session.store, mode="r")
    return zcube.attrs.get("loaded_patches", []).copy()
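
Patch ids are plain strings kept in the zarr attributes, so bookkeeping is cheap; a sketch:

ids = accessor.loaded_patches()
print(f"{len(ids)} patches in the store; first few: {ids[:3]}")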

log_benchmark_summary

log_benchmark_summary()

Log the benchmark summary.

Source code in src/smart_geocubes/core/accessor.py
def log_benchmark_summary(self):
    """Log the benchmark summary."""
    self.stopuhr.summary()

open_xarray

open_xarray() -> xr.Dataset

Open the xarray datacube in read-only mode.

Returns:

  • Dataset

    xr.Dataset: The xarray datacube.

Source code in src/smart_geocubes/core/accessor.py
def open_xarray(self) -> xr.Dataset:
    """Open the xarray datacube in read-only mode.

    Returns:
        xr.Dataset: The xarray datacube.

    """
    return self.backend.open_xarray()

open_zarr

open_zarr() -> zarr.Group

Open the zarr datacube in read-only mode.

Returns:

  • Group

    zarr.Group: The zarr datacube.

Source code in src/smart_geocubes/core/accessor.py
def open_zarr(self) -> zarr.Group:
    """Open the zarr datacube in read-only mode.

    Returns:
        zarr.Group: The zarr datacube.

    """
    return self.backend.open_zarr()
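
Both read-only entry points reflect the latest commit on the main branch; a quick inspection sketch:

zgroup = accessor.open_zarr()
print(list(zgroup.array_keys()))  # variable names stored in the cube

xrcube = accessor.open_xarray()
print(xrcube.sizes)  # full datacube dimensions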

post_create

post_create()

Post create actions. Can be overridden by the dataset accessor.

Source code in src/smart_geocubes/core/accessor.py
def post_create(self):
    """Post create actions. Can be overwritten by the dataset accessor."""
    pass

post_init

post_init()

Post init actions. Can be overridden by the dataset accessor.

Source code in src/smart_geocubes/core/accessor.py
def post_init(self):
    """Post init actions. Can be overwritten by the dataset accessor."""
    pass

procedural_download

procedural_download(aoi: Geometry | GeoBox, toi: TOI)

Download tiles procedurally.

Warning

This method is meant for single-process use, but can (in theory) be used in a multi-process environment. However, in a multi-process environment it can happen that multiple processes try to write concurrently, which results in a conflict. In such cases, the download is retried until it succeeds or the maximum number of retries is reached.

Parameters:

  • aoi

    (Geometry | GeoBox) –

    The geometry of the aoi to download. If a Geobox is provided, it will use the extent of the geobox.

  • toi

    (TOI) –

    The time of interest to download.

Raises:

  • ValueError

    If no adjacent tiles are found. This can happen if the geobox is out of the dataset bounds.

  • ValueError

    If not all downloads were successful.

Source code in src/smart_geocubes/core/accessor.py
def procedural_download(self, aoi: Geometry | GeoBox, toi: TOI):
    """Download tiles procedurally.

    Warning:
        This method is meant for single-process use, but can (in theory) be used in a multi-process environment.
        However, in a multi-process environment it can happen that multiple processes try to write concurrently,
        which results in a conflict.
        In such cases, the download will be retried until it succeeds or the maximum number of retries is reached.

    Args:
        aoi (Geometry | GeoBox): The geometry of the aoi to download. If a Geobox is provided,
            it will use the extent of the geobox.
        toi (TOI): The time of interest to download.

    Raises:
        ValueError: If no adjacent tiles are found. This can happen if the geobox is out of the dataset bounds.
        ValueError: If not all downloads were successful.

    """
    if isinstance(aoi, GeoBox):
        aoi = aoi.extent

    adjacent_patches = self.adjacent_patches(aoi, toi)
    # Human-readable string of the spatial (and temporal) interest, used in log messages
    soi = f"{_geometry_repr(aoi)}" + (f" @ {_repr_toi(toi)}" if toi is not None else "")
    if not adjacent_patches:
        logger.error(f"{soi}: No adjacent patches found: {adjacent_patches=}")
        raise ValueError("No adjacent patches found - are the provided aoi and toi correct?")

    loaded_patches = self.loaded_patches()

    new_patches = [patch for patch in adjacent_patches if patch.id not in loaded_patches]

    logger.debug(f"{soi}:  {len(adjacent_patches)=} & {len(loaded_patches)=} -> {len(new_patches)=} to download")
    if not new_patches:
        return

    # Raises on any download failure; errors are intentionally propagated to the caller
    self.backend.submit(new_patches)

visualize_state

visualize_state(
    ax: Axes | None = None,
) -> plt.Figure | plt.Axes

Visualize the extent, i.e. the already downloaded and filled data, of the datacube.

Parameters:

  • ax

    (Axes | None, default: None ) –

    The axes to draw on. If None, a new figure and axes are created.

Returns:

  • Figure | Axes

    plt.Figure | plt.Axes: The figure with the visualization if no axes was provided, else the axes.

Raises:

  • ValueError

    If the datacube is empty.

Source code in src/smart_geocubes/datasets/tctrend.py
def visualize_state(self, ax: "plt.Axes | None" = None) -> "plt.Figure | plt.Axes":
    """Visulize the extend, hence the already downloaded and filled data, of the datacube.

    Args:
        ax (plt.Axes | None): The axes to draw on. If None, a new figure and axes are created.

    Returns:
        plt.Figure | plt.Axes: The figure with the visualization if no axes was provided, else the axes.

    Raises:
        ValueError: If the datacube is empty

    """
    import cartopy.crs as ccrs
    import cartopy.feature as cfeature
    import matplotlib.pyplot as plt

    tile_info = self.current_state()

    if tile_info is None:
        raise ValueError("Datacube is not created or loaded yet. Can't visualize!")

    # Define the projection
    projection = ccrs.PlateCarree()

    # Create a figure
    fig = None
    if ax is None:
        fig, ax = plt.subplots(figsize=(10, 10), subplot_kw={"projection": projection})

    # Set the extent to show the whole world
    ax.set_extent([-180, 180, -90, 90], crs=ccrs.PlateCarree())

    # Add features
    ax.add_feature(cfeature.LAND, zorder=0, edgecolor="black", facecolor="white")
    ax.add_feature(cfeature.OCEAN, zorder=0, facecolor="lightgrey")
    ax.add_feature(cfeature.COASTLINE)
    ax.add_feature(cfeature.BORDERS, linestyle=":")
    ax.add_feature(cfeature.LAKES, alpha=0.5)
    ax.add_feature(cfeature.RIVERS)

    # Add gridlines
    gl = ax.gridlines(draw_labels=True)
    gl.top_labels = False
    gl.right_labels = False

    tile_info.plot(
        "id",
        ax=ax,
        transform=ccrs.PlateCarree(),
        edgecolor="black",
        categorical=True,
        aspect="equal",
        alpha=0.5,
    )

    if fig is not None:
        return fig
    else:
        return ax
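
When an axes is supplied, the method draws onto it and returns the axes, which allows composing several cubes into one figure; a sketch (accessor instantiated as shown above):

import cartopy.crs as ccrs
import matplotlib.pyplot as plt

fig, ax = plt.subplots(figsize=(8, 4), subplot_kw={"projection": ccrs.PlateCarree()})
accessor.visualize_state(ax=ax)
fig.savefig("tctrend_state.png", dpi=150)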

TCTrend2020

TCTrend2020(
    storage: Storage | Path | str,
    create_icechunk_storage: bool = True,
    backend: Literal["threaded", "simple"] = "threaded",
)

Bases: TCTrendABC

Accessor for TCTrend data derived from 2001-2020.

Attributes:

  • collection (str) –

    The collection ID of the datacube.

  • extent (GeoBox) –

    The extent of the datacube represented by a GeoBox.

  • chunk_size (int) –

    The chunk size of the datacube.

  • channels (list) –

    The channels of the datacube.

  • storage (Storage) –

    The icechunk storage.

  • repo (Repository) –

    The icechunk repository.

  • title (str) –

    The title of the datacube.

  • stopuhr (StopUhr) –

    The benchmarking timer from the stopuhr library.

  • zgeobox (GeoBox) –

    The geobox of the zarr array. Should be equal to the extent geobox.

  • created (bool) –

    True if the datacube already exists in the storage.

Initialize base class for remote accessors.

Warning

In a multiprocessing environment, it is strongly recommended to not set create_icechunk_storage=False.

Parameters:

  • storage

    (Storage) –

    The icechunk storage of the datacube.

  • create_icechunk_storage

    (bool, default: True ) –

    Whether an icechunk repository should be created at the provided storage if none exists. This should be disabled in a multiprocessing environment. Defaults to True.

  • backend

    (Literal['threaded', 'simple'], default: 'threaded' ) –

    The backend to use for downloading data. Currently, only "threaded" is supported. Defaults to "threaded".

Raises:

  • ValueError

    If the storage is not an icechunk.Storage.

Methods:

  • adjacent_patches

    Get the adjacent patches for the given geobox.

  • assert_created

    Assert that the datacube exists in the storage.

  • assert_temporal_cube

    Assert that the datacube has a temporal dimension.

  • create

    Create an empty datacube and write it to the store.

  • current_state

    Get info about currently stored tiles.

  • download_patch

    Download the data for the given patch.

  • load

    Load the data for the given geobox.

  • load_like

    Load the data for the given geobox.

  • loaded_patches

    Get the ids of already (down-)loaded patches.

  • log_benchmark_summary

    Log the benchmark summary.

  • open_xarray

    Open the xarray datacube in read-only mode.

  • open_zarr

    Open the zarr datacube in read-only mode.

  • post_create

    Post create actions. Can be overridden by the dataset accessor.

  • post_init

    Post init actions. Can be overridden by the dataset accessor.

  • procedural_download

    Download tiles procedurally.

  • visualize_state

    Visualize the extent, i.e. the already downloaded and filled data, of the datacube.

Source code in src/smart_geocubes/core/accessor.py
def __init__(
    self,
    storage: icechunk.Storage | Path | str,
    create_icechunk_storage: bool = True,
    backend: Literal["threaded", "simple"] = "threaded",
):
    """Initialize base class for remote accessors.

    !!! warning

        In a multiprocessing environment, it is strongly recommended to not set `create_icechunk_storage=False`.

    Args:
        storage (icechunk.Storage): The icechunk storage of the datacube.
        create_icechunk_storage (bool, optional): Whether an icechunk repository should be created at the
            provided storage if none exists.
            This should be disabled in a multiprocessing environment.
            Defaults to True.
        backend (Literal["threaded", "simple"], optional): The backend to use for downloading data.
            Currently, only "threaded" is supported. Defaults to "threaded".

    Raises:
        ValueError: If the storage is not an icechunk.Storage.

    """
    # Title is used for logging, debugging and as a default name for the datacube
    self.title = self.__class__.__name__

    if isinstance(storage, (str | Path)):
        storage = storage if isinstance(storage, str) else str(storage.resolve())
        storage = icechunk.local_filesystem_storage(storage)
    if not isinstance(storage, icechunk.Storage):
        raise ValueError(f"Expected an icechunk.Storage, but got {type(storage)}")
    self.storage = storage
    logger.debug(f"Using storage {storage=}")
    if create_icechunk_storage:
        self.repo = icechunk.Repository.open_or_create(storage)  # Will create a "main" branch
    else:
        self.repo = icechunk.Repository.open(storage)
    logger.debug(f"Using repository {self.repo=}")

    # The benchmarking timer for this accessor
    self.stopuhr = Chronometer(logger.debug)

    if backend == "threaded":
        if not _check_python_version(3, 13):
            raise NotImplementedError(
                "The 'threaded' backend is only fully supported in Python 3.13 and above."
                " Please consider using the 'simple' backend in a multiprocessing environment"
                " or upgrade your Python version."
            )
        self.backend = ThreadedBackend(self.repo, self.download_patch)
    elif backend == "simple":
        self.backend = SimpleBackend(self.repo, self.download_patch)
    else:
        raise ValueError(f"Unknown backend {backend}")

    self.post_init()

created property

created: bool

Check if the datacube already exists in the storage.

Returns:

  • bool ( bool ) –

    True if the datacube already exists in the storage.

is_temporal property

is_temporal: bool

Check if the datacube has a temporal dimension.

Returns:

  • bool ( bool ) –

    True if the datacube has a temporal dimension.

adjacent_patches

adjacent_patches(
    roi: Geometry | GeoBox | GeoDataFrame, toi: TOI
) -> list[Item]

Get the adjacent patches for the given geobox.

Must be implemented by the Accessor.

Parameters:

  • roi

    (Geometry | GeoBox | GeoDataFrame) –

    The reference geometry, geobox or reference geodataframe

  • toi

    (TOI) –

    The time of interest to download.

Returns:

  • list[Item]

    list[PatchIndex[Item]]: The adjacent patch(-id)s for the given geobox.

Raises:

  • ValueError

    If the ROI type is invalid.

  • ValueError

    If the datacube is not temporal, but a time of interest is provided.

Source code in src/smart_geocubes/accessors/gee.py
def adjacent_patches(self, roi: Geometry | GeoBox | gpd.GeoDataFrame, toi: TOI) -> list[Item]:
    """Get the adjacent patches for the given geobox.

    Must be implemented by the Accessor.

    Args:
        roi (Geometry | GeoBox | gpd.GeoDataFrame): The reference geometry, geobox or reference geodataframe
        toi (TOI): The time of interest to download.

    Returns:
        list[PatchIndex[Item]]: The adjacent patch(-id)s for the given geobox.

    Raises:
        ValueError: If the ROI type is invalid.
        ValueError: If the datacube is not temporal, but a time of interest is provided.

    """
    if toi is not None and not self.is_temporal:
        raise ValueError("Datacube is not temporal, but time of interest is provided.")

    if isinstance(roi, gpd.GeoDataFrame):
        adjacent_geometries = (
            gpd.sjoin(self._tile_geometries, roi.to_crs(self.extent.crs.wkt), how="inner", predicate="intersects")
            .reset_index()
            .drop_duplicates(subset="index", keep="first")
            .set_index("index")
        )
        spatial_idxs: list[tuple[int, int]] = list(adjacent_geometries["idx"])
    elif isinstance(roi, GeoBox):
        spatial_idxs: list[tuple[int, int]] = list(self._extent_tiles.tiles(roi.extent))
    elif isinstance(roi, Geometry):
        spatial_idxs: list[tuple[int, int]] = list(self._extent_tiles.tiles(roi))
    else:
        raise ValueError("Invalid ROI type.")

    if not self.is_temporal:
        return [
            PatchIndex(
                self._stringify_index(spatial_idx),
                self._extent_tiles[spatial_idx].geographic_extent,
                None,
                Item(self._extent_tiles[spatial_idx], None),
            )
            for spatial_idx in spatial_idxs
        ]

    # Now datacube is temporal
    toi = normalize_toi(self.temporal_extent, toi)
    patch_idxs = []
    for time in toi:
        time_idx = self.temporal_extent.get_loc(time)
        assert isinstance(time_idx, int), "Non-Unique temporal extents are not supported!"
        for spatial_idx in spatial_idxs:
            patch_idxs.append(
                PatchIndex(
                    self._stringify_index(spatial_idx, time_idx),
                    self._extent_tiles[spatial_idx].geographic_extent,
                    time,
                    Item(self._extent_tiles[spatial_idx], time),
                )
            )
    return patch_idxs

assert_created

assert_created()

Assert that the datacube exists in the storage.

Source code in src/smart_geocubes/core/accessor.py
def assert_created(self):
    """Assert that the datacube exists in the storage."""
    self.backend.assert_created()

assert_temporal_cube

assert_temporal_cube()

Assert that the datacube has a temporal dimension.

Raises:

  • ValueError

    If the datacube has no temporal dimension.

Source code in src/smart_geocubes/core/accessor.py
def assert_temporal_cube(self):
    """Assert that the datacube has a temporal dimension.

    Raises:
        ValueError: If the datacube has no temporal dimension.

    """
    if self.temporal_extent is None:
        msg = f"Datacube {self.title} has no temporal dimension."
        logger.error(msg)
        raise ValueError(msg)

create

create(overwrite: bool = False, exists_ok: bool = False)

Create an empty datacube and write it to the store.

Parameters:

  • overwrite

    (bool, default: False ) –

    Allow overwriting an existing datacube. Has no effect if exists_ok is True. Defaults to False.

  • exists_ok

    (bool, default: False ) –

    Do not raise an error if the datacube already exists.

Raises:

  • FileExistsError

    If a datacube already exists at location and exists_ok is False.

Source code in src/smart_geocubes/core/accessor.py
def create(self, overwrite: bool = False, exists_ok: bool = False):
    """Create an empty datacube and write it to the store.

    Args:
        overwrite (bool, optional): Allow overwriting an existing datacube.
            Has no effect if exists_ok is True. Defaults to False.
        exists_ok (bool, optional): Do not raise an error if the datacube already exists.

    Raises:
        FileExistsError: If a datacube already exists at location and exists_ok is False.

    """
    if exists_ok and self.created:
        logger.debug("Datacube was already created.")
        return

    with self.stopuhr("Empty datacube creation"):
        # Check if the zarr data already exists
        session = self.repo.writable_session("main")
        cube_is_empty = sync(session.store.is_empty(""))
        if not overwrite and not cube_is_empty:
            logger.debug(f"Unable to create a new datacube. {overwrite=} {cube_is_empty=} {session.store=}")
            raise FileExistsError(f"Cannot create a new datacube. {session.store=} is not empty!")

        logger.debug(
            f"Creating an empty zarr datacube '{self.title}' with the variables"
            f" {self.channels} at a {self.extent.resolution=} (epsg:{self.extent.crs.epsg})"
            f" and {self.chunk_size=} to {session.store=}"
        )

        ds = xr.Dataset(
            {
                name: odc.geo.xr.xr_zeros(
                    self.extent,
                    chunks=-1,
                    dtype=self._channels_encoding[name].get("dtype", "float32"),
                    always_yx=True,
                )
                for name in self.channels
            },
            attrs={"title": self.title, "loaded_patches": []},
        )

        # Expand to temporal dimension if defined
        if self.temporal_extent is not None:
            ds = ds.expand_dims(time=self.temporal_extent)

        # Add metadata
        for name, meta in self._channels_meta.items():
            ds[name].attrs.update(meta)

        # Get the encoding for the coordinates, variables and spatial reference
        coords_encoding = {
            "x": {"chunks": ds.x.shape, **optimize_coord_encoding(ds.x.values, self.extent.resolution.x)},
            "y": {"chunks": ds.y.shape, **optimize_coord_encoding(ds.y.values, self.extent.resolution.y)},
        }
        if self.temporal_extent is not None:
            coords_encoding["time"] = {"chunks": ds.time.shape, **optimize_temporal_encoding(self.temporal_extent)}
        chunks = (
            (1, self.chunk_size, self.chunk_size)
            if self.temporal_extent is not None
            else (self.chunk_size, self.chunk_size)
        )
        var_encoding = {
            name: {
                "chunks": chunks,
                "compressors": [BloscCodec(clevel=9)],
                **self._channels_encoding[name],
            }
            for name in self.channels
        }
        encoding = {
            "spatial_ref": {"chunks": None, "dtype": "int32"},
            **coords_encoding,
            **var_encoding,
        }
        logger.debug(f"Datacube {encoding=}")

        ds.to_zarr(
            session.store,
            encoding=encoding,
            compute=False,
            consolidated=False,
            zarr_format=3,
            mode="w" if overwrite else "w-",
        )

        commit = session.commit("Initialize empty datacube")
        logger.debug(f"Datacube created: {commit=}")

        self.post_create()

current_state

current_state() -> gpd.GeoDataFrame | None

Get info about currently stored tiles.

Returns:

  • GeoDataFrame | None

    gpd.GeoDataFrame: Tiles from odc.geo.GeoboxTiles. None if datacube is empty.

Source code in src/smart_geocubes/accessors/gee.py
def current_state(self) -> gpd.GeoDataFrame | None:
    """Get info about currently stored tiles.

    Returns:
        gpd.GeoDataFrame: Tiles from odc.geo.GeoboxTiles. None if datacube is empty.

    """
    import geopandas as gpd

    if not self.created:
        return None

    loaded_patches = self.loaded_patches()

    if len(loaded_patches) == 0:
        return None

    patch_infos = []
    for pid in loaded_patches:
        spatial_idx, temporal_idx = self._parse_index(pid)
        geometry = self._extent_tiles[spatial_idx].extent.geom
        if self.is_temporal:
            time = self.temporal_extent[temporal_idx]
            patch_infos.append({"geometry": geometry, "id": pid, "time": time})
        else:
            patch_infos.append({"geometry": geometry, "id": pid})

    gdf = gpd.GeoDataFrame(patch_infos, crs=self.extent.crs.to_wkt())
    return gdf

download_patch

download_patch(idx: PatchIndex) -> xr.Dataset

Download the data for the given patch.

Must be implemented by the Accessor.

Parameters:

  • idx

    (PatchIndex) –

    The reference patch to download the data for.

Returns:

  • Dataset

    xr.Dataset: The downloaded patch data.

Source code in src/smart_geocubes/datasets/tctrend.py
def download_patch(self, idx: PatchIndex) -> xr.Dataset:
    """Download the data for the given patch.

    Must be implemented by the Accessor.

    Args:
        idx (PatchIndex): The reference patch to download the data for.

    Returns:
        xr.Dataset: The downloaded patch data.

    """
    patch = super().download_patch(idx)

        # ?: The following code handles the occurrence of NaN values when using mosaics
    # Save original min-max values for each band for clipping later
    clip_values = {
        band: (patch[band].min().values.item(), patch[band].max().values.item()) for band in patch.data_vars
    }

    # Interpolate missing values (there are very few, so we actually can interpolate them)
    patch.rio.set_spatial_dims(x_dim="x", y_dim="y", inplace=True)
    for band in patch.data_vars:
        patch[band] = patch[band].rio.write_nodata(np.nan).rio.interpolate_na()

    # Convert to uint8
    for band in patch.data_vars:
        band_min, band_max = clip_values[band]
        patch[band] = patch[band].clip(band_min, band_max, keep_attrs=True).astype("uint8").rio.write_nodata(None)

    return patch

load

load(
    aoi: Geometry | GeoBox,
    toi: TOI = None,
    persist: bool = True,
    create: bool = False,
) -> xr.Dataset

Load the data for the given geobox.

Parameters:

  • aoi

    (Geometry | GeoBox) –

    The reference geometry to load the data for. If a Geobox is provided, it will use the extent of the geobox.

  • toi

    (TOI, default: None ) –

    The temporal slice to load. Defaults to None.

  • persist

    (bool, default: True ) –

    If the data should be persisted in memory. If not, this will return a Dask backed Dataset. Defaults to True.

  • create

    (bool, default: False ) –

    Create a new zarr array at the defined storage if it does not exist. This is not recommended, because it can have side effects in a multi-process environment. Defaults to False.

Returns:

  • Dataset

    xr.Dataset: The loaded dataset in the same resolution and extent as the geobox.

Source code in src/smart_geocubes/core/accessor.py
def load(
    self,
    aoi: Geometry | GeoBox,
    toi: TOI = None,
    persist: bool = True,
    create: bool = False,
) -> xr.Dataset:
    """Load the data for the given geobox.

    Args:
        aoi (Geometry | GeoBox): The reference geometry to load the data for. If a Geobox is provided,
            it will use the extent of the geobox.
        toi (TOI): The temporal slice to load. Defaults to None.
        persist (bool, optional): If the data should be persisted in memory.
            If not, this will return a Dask backed Dataset. Defaults to True.
        create (bool, optional): Create a new zarr array at the defined storage if it does not exist.
            This is not recommended, because it can have side effects in a multi-process environment.
            Defaults to False.

    Returns:
        xr.Dataset: The loaded dataset in the same resolution and extent as the geobox.

    """
    if toi is not None:
        self.assert_temporal_cube()

    if isinstance(aoi, GeoBox):
        aoi = aoi.extent

    with self.stopuhr(f"{_geometry_repr(aoi)}: {self.title} tile {'loading' if persist else 'lazy-loading'}"):
        # Create the datacube if it does not exist
        if create:
            try:
                self.create(overwrite=False)
            except FileExistsError:  # We are okay if the datacube already exists
                pass
        else:
            # Check if the datacube exists
            self.assert_created()

        # Download the adjacent tiles (if necessary)
        aligned_aoi = aoi.to_crs(self.extent.crs)
        with self.stopuhr(f"{_geometry_repr(aoi)}: Procedural download in blocking mode"):
            self.procedural_download(aligned_aoi, toi)

        # Load the datacube and set the spatial_ref since it is set as a coordinate within the zarr format
        session = self.repo.readonly_session("main")
        chunks = None if persist else "auto"
        xrcube = xr.open_zarr(
            session.store,
            mask_and_scale=False,
            chunks=chunks,
            consolidated=False,
        ).set_coords("spatial_ref")

        # Get temporal slice if time is provided
        if toi is not None:
            xrcube = xrcube.sel(time=toi)

        # Get an AOI slice of the datacube
        xrcube_aoi = xrcube.odc.crop(aligned_aoi, apply_mask=False)

        # The following code would load the lazy zarr data from disk into memory
        if persist:
            with self.stopuhr(f"{_geometry_repr(aoi)}: {self.title} AOI loading from disk"):
                xrcube_aoi = xrcube_aoi.load()
    return xrcube_aoi

load_like

load_like(
    ref: Dataset | DataArray, **kwargs: Unpack[LoadParams]
) -> xr.Dataset

Load the data for the given geobox.

Parameters:

  • ref

    (Dataset | DataArray) –

    The reference dataarray or dataset to load the data for.

  • **kwargs

    (Unpack[LoadParams], default: {} ) –

    The load parameters (buffer, persist, create, concurrency_mode).

Other Parameters:

  • buffer (int) –

    The buffer around the projected geobox in pixels. Defaults to 0.

  • persist (bool) –

    If the data should be persisted in memory. If not, this will return a Dask backed Dataset. Defaults to True.

  • create (bool) –

    Create a new zarr array at the defined storage if it does not exist. This is not recommended, because it can have side effects in a multi-process environment. Defaults to False.

Returns:

  • Dataset

    xr.Dataset: The loaded dataset in the same resolution and extent as the geobox.

Source code in src/smart_geocubes/core/accessor.py
def load_like(
    self,
    ref: xr.Dataset | xr.DataArray,
    **kwargs: Unpack[LoadParams],
) -> xr.Dataset:
    """Load the data for the given geobox.

    Args:
        ref (xr.Dataset | xr.DataArray): The reference dataarray or dataset to load the data for.
        **kwargs: The load parameters (buffer, persist, create, concurrency_mode).

    Keyword Args:
        buffer (int, optional): The buffer around the projected geobox in pixels. Defaults to 0.
        persist (bool, optional): If the data should be persisted in memory.
            If not, this will return a Dask backed Dataset. Defaults to True.
        create (bool, optional): Create a new zarr array at the defined storage if it does not exist.
            This is not recommended, because it can have side effects in a multi-process environment.
            Defaults to False.

    Returns:
        xr.Dataset: The loaded dataset in the same resolution and extent as the geobox.

    """
    toi = None
    if "time" in ref.coords and self.temporal_extent is not None:
        toi = ref.get_index("time")
    return self.load(ref.geobox, toi=toi, **kwargs)

loaded_patches

loaded_patches() -> list[str]

Get the ids of already (down-)loaded patches.

Returns:

  • list[str]

    list[str]: A list of already loaded patch ids.

Source code in src/smart_geocubes/core/accessor.py
def loaded_patches(self) -> list[str]:
    """Get the ids of already (down-)loaded patches.

    Returns:
        list[str]: A list of already loaded patch ids.

    """
    session = self.repo.readonly_session("main")
    zcube = zarr.open(store=session.store, mode="r")
    return zcube.attrs.get("loaded_patches", []).copy()

log_benchmark_summary

log_benchmark_summary()

Log the benchmark summary.

Source code in src/smart_geocubes/core/accessor.py
def log_benchmark_summary(self):
    """Log the benchmark summary."""
    self.stopuhr.summary()

open_xarray

open_xarray() -> xr.Dataset

Open the xarray datacube in read-only mode.

Returns:

  • Dataset

    xr.Dataset: The xarray datacube.

Source code in src/smart_geocubes/core/accessor.py
def open_xarray(self) -> xr.Dataset:
    """Open the xarray datacube in read-only mode.

    Returns:
        xr.Dataset: The xarray datacube.

    """
    return self.backend.open_xarray()

open_zarr

open_zarr() -> zarr.Group

Open the zarr datacube in read-only mode.

Returns:

  • Group

    zarr.Group: The zarr datacube.

Source code in src/smart_geocubes/core/accessor.py
def open_zarr(self) -> zarr.Group:
    """Open the zarr datacube in read-only mode.

    Returns:
        zarr.Group: The zarr datacube.

    """
    return self.backend.open_zarr()

post_create

post_create()

Post create actions. Can be overridden by the dataset accessor.

Source code in src/smart_geocubes/core/accessor.py
def post_create(self):
    """Post create actions. Can be overwritten by the dataset accessor."""
    pass

post_init

post_init()

Post init actions. Can be overridden by the dataset accessor.

Source code in src/smart_geocubes/core/accessor.py
def post_init(self):
    """Post init actions. Can be overwritten by the dataset accessor."""
    pass

procedural_download

procedural_download(aoi: Geometry | GeoBox, toi: TOI)

Download tiles procedurally.

Warning

This method is meant for single-process use, but can (in theory) be used in a multi-process environment. However, in a multi-process environment it can happen that multiple processes try to write concurrently, which results in a conflict. In such cases, the download is retried until it succeeds or the maximum number of retries is reached.

Parameters:

  • aoi

    (Geometry | GeoBox) –

    The geometry of the aoi to download. If a Geobox is provided, it will use the extent of the geobox.

  • toi

    (TOI) –

    The time of interest to download.

Raises:

  • ValueError

    If no adjacent tiles are found. This can happen if the geobox is out of the dataset bounds.

  • ValueError

    If not all downloads were successful.

Source code in src/smart_geocubes/core/accessor.py
def procedural_download(self, aoi: Geometry | GeoBox, toi: TOI):
    """Download tiles procedurally.

    Warning:
        This method is meant for single-process use, but can (in theory) be used in a multi-process environment.
        However, in a multi-process environment it can happen that multiple processes try to write concurrently,
        which results in a conflict.
        In such cases, the download will be retried until it succeeds or the maximum number of retries is reached.

    Args:
        aoi (Geometry | GeoBox): The geometry of the aoi to download. If a Geobox is provided,
            it will use the extent of the geobox.
        toi (TOI): The time of interest to download.

    Raises:
        ValueError: If no adjacent tiles are found. This can happen if the geobox is out of the dataset bounds.
        ValueError: If not all downloads were successful.

    """
    if isinstance(aoi, GeoBox):
        aoi = aoi.extent

    adjacent_patches = self.adjacent_patches(aoi, toi)
    # Human-readable string of the spatial (and temporal) interest, used in log messages
    soi = f"{_geometry_repr(aoi)}" + (f" @ {_repr_toi(toi)}" if toi is not None else "")
    if not adjacent_patches:
        logger.error(f"{soi}: No adjacent patches found: {adjacent_patches=}")
        raise ValueError("No adjacent patches found - are the provided aoi and toi correct?")

    loaded_patches = self.loaded_patches()

    new_patches = [patch for patch in adjacent_patches if patch.id not in loaded_patches]

    logger.debug(f"{soi}:  {len(adjacent_patches)=} & {len(loaded_patches)=} -> {len(new_patches)=} to download")
    if not new_patches:
        return

    # This raises Errors if anything goes wrong -> we want to propagate
    self.backend.submit(new_patches)
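
A sketch of pre-fetching all patches for a region ahead of later reads; the bounding box, CRS, and resolution are illustrative assumptions, and accessor stands for an initialized, created dataset accessor:

# Hypothetical usage: blocking pre-download of everything intersecting an AOI.
from odc.geo.geobox import GeoBox

aoi = GeoBox.from_bbox((10.0, 55.0, 12.0, 57.0), crs="EPSG:4326", resolution=0.001)
accessor.procedural_download(aoi, toi=None)  # pass a time of interest for temporal cubes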

visualize_state

visualize_state(
    ax: Axes | None = None,
) -> plt.Figure | plt.Axes

Visualize the extent, i.e. the already downloaded and filled data, of the datacube.

Parameters:

  • ax

    (Axes | None, default: None ) –

    The axes to draw on. If None, a new figure and axes are created.

Returns:

  • Figure | Axes

    plt.Figure | plt.Axes: The figure with the visualization if no axes was provided, else the axes.

Raises:

  • ValueError

    If the datacube is empty.

Source code in src/smart_geocubes/datasets/tctrend.py
def visualize_state(self, ax: "plt.Axes | None" = None) -> "plt.Figure | plt.Axes":
    """Visulize the extend, hence the already downloaded and filled data, of the datacube.

    Args:
        ax (plt.Axes | None): The axes to draw on. If None, a new figure and axes are created.

    Returns:
        plt.Figure | plt.Axes: The figure with the visualization if no axes was provided, else the axes.

    Raises:
        ValueError: If the datacube is empty

    """
    import cartopy.crs as ccrs
    import cartopy.feature as cfeature
    import matplotlib.pyplot as plt

    tile_info = self.current_state()

    if tile_info is None:
        raise ValueError("Datacube is not created or loaded yet. Can't visualize!")

    # Define the projection
    projection = ccrs.PlateCarree()

    # Create a figure
    fig = None
    if ax is None:
        fig, ax = plt.subplots(figsize=(10, 10), subplot_kw={"projection": projection})

    # Set the extent to show the whole world
    ax.set_extent([-180, 180, -90, 90], crs=ccrs.PlateCarree())

    # Add features
    ax.add_feature(cfeature.LAND, zorder=0, edgecolor="black", facecolor="white")
    ax.add_feature(cfeature.OCEAN, zorder=0, facecolor="lightgrey")
    ax.add_feature(cfeature.COASTLINE)
    ax.add_feature(cfeature.BORDERS, linestyle=":")
    ax.add_feature(cfeature.LAKES, alpha=0.5)
    ax.add_feature(cfeature.RIVERS)

    # Add gridlines
    gl = ax.gridlines(draw_labels=True)
    gl.top_labels = False
    gl.right_labels = False

    tile_info.plot(
        "id",
        ax=ax,
        transform=ccrs.PlateCarree(),
        edgecolor="black",
        categorical=True,
        aspect="equal",
        alpha=0.5,
    )

    if fig is not None:
        return fig
    else:
        return ax
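
Usage sketch: render the current download state to a file (needs cartopy and matplotlib installed; the filename is illustrative):

# Hypothetical usage: no axes passed, so a new Figure is returned.
fig = accessor.visualize_state()
fig.savefig("datacube_state.png", dpi=150, bbox_inches="tight")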

TCTrend2022

TCTrend2022(
    storage: Storage | Path | str,
    create_icechunk_storage: bool = True,
    backend: Literal["threaded", "simple"] = "threaded",
)

Bases: TCTrendABC

Accessor for TCTrend data derived from 2003-2022.

Attributes:

  • collection (str) –

    The collection ID of the datacube.

  • extent (GeoBox) –

    The extent of the datacube represented by a GeoBox.

  • chunk_size (int) –

    The chunk size of the datacube.

  • channels (list) –

    The channels of the datacube.

  • storage (Storage) –

    The icechunk storage.

  • repo (Repository) –

    The icechunk repository.

  • title (str) –

    The title of the datacube.

  • stopuhr (StopUhr) –

    The benchmarking timer from the stopuhr library.

  • zgeobox (GeoBox) –

    The geobox of the zarr array. Should be equal to the extent geobox.

  • created (bool) –

    True if the datacube already exists in the storage.

Initialize base class for remote accessors.

Warning

In a multiprocessing environment, it is strongly recommended to not set create_icechunk_storage=False.

Parameters:

  • storage

    (Storage) –

    The icechunk storage of the datacube.

  • create_icechunk_storage

    (bool, default: True ) –

    Whether an icechunk repository should be created at the provided storage if none exists. This should be disabled in a multiprocessing environment. Defaults to True.

  • backend

    (Literal['threaded', 'simple'], default: 'threaded' ) –

    The backend to use for downloading data. Use "simple" in multiprocessing environments or on Python versions below 3.13. Defaults to "threaded".

Raises:

  • ValueError

    If the storage is not an icechunk.Storage.

Methods:

  • adjacent_patches

    Get the adjacent patches for the given geobox.

  • assert_created

    Assert that the datacube exists in the storage.

  • assert_temporal_cube

    Assert that the datacube has a temporal dimension.

  • create

    Create an empty datacube and write it to the store.

  • current_state

    Get info about currently stored tiles.

  • download_patch

    Download the data for the given patch.

  • load

    Load the data for the given geobox.

  • load_like

    Load the data for the given geobox.

  • loaded_patches

    Get the ids of already (down-)loaded patches.

  • log_benchmark_summary

    Log the benchmark summary.

  • open_xarray

    Open the xarray datacube in read-only mode.

  • open_zarr

    Open the zarr datacube in read-only mode.

  • post_create

    Post create actions. Can be overwritten by the dataset accessor.

  • post_init

    Post init actions. Can be overwritten by the dataset accessor.

  • procedural_download

    Download tiles procedurally.

  • visualize_state

    Visualize the extent, i.e. the already downloaded and filled data, of the datacube.

Source code in src/smart_geocubes/core/accessor.py
def __init__(
    self,
    storage: icechunk.Storage | Path | str,
    create_icechunk_storage: bool = True,
    backend: Literal["threaded", "simple"] = "threaded",
):
    """Initialize base class for remote accessors.

    !!! warning

        In a multiprocessing environment, it is strongly recommended to not set `create_icechunk_storage=False`.

    Args:
        storage (icechunk.Storage): The icechunk storage of the datacube.
        create_icechunk_storage (bool, optional): Whether an icechunk repository should be created at the
            provided storage if none exists.
            This should be disabled in a multiprocessing environment.
            Defaults to True.
        backend (Literal["threaded", "simple"], optional): The backend to use for downloading data.
            Currently, only "threaded" is supported. Defaults to "threaded".

    Raises:
        ValueError: If the storage is not an icechunk.Storage.

    """
    # Title is used for logging, debugging and as a default name for the datacube
    self.title = self.__class__.__name__

    if isinstance(storage, (str | Path)):
        storage = storage if isinstance(storage, str) else str(storage.resolve())
        storage = icechunk.local_filesystem_storage(storage)
    if not isinstance(storage, icechunk.Storage):
        raise ValueError(f"Expected an icechunk.Storage, but got {type(storage)}")
    self.storage = storage
    logger.debug(f"Using storage {storage=}")
    if create_icechunk_storage:
        self.repo = icechunk.Repository.open_or_create(storage)  # Will create a "main" branch
    else:
        self.repo = icechunk.Repository.open(storage)
    logger.debug(f"Using repository {self.repo=}")

    # The benchmarking timer for this accessor
    self.stopuhr = Chronometer(logger.debug)

    if backend == "threaded":
        if not _check_python_version(3, 13):
            raise NotImplementedError(
                "The 'threaded' backend is only fully supported in Python 3.13 and above."
                " Please consider using the 'simple' backend in a multiprocessing environment"
                " or upgrade your Python version."
            )
        self.backend = ThreadedBackend(self.repo, self.download_patch)
    elif backend == "simple":
        self.backend = SimpleBackend(self.repo, self.download_patch)
    else:
        raise ValueError(f"Unknown backend {backend}")

    self.post_init()
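
A minimal construction sketch; the local path is an assumption, and the import follows the datasets module listed at the top of this reference:

# Hypothetical usage: a str/Path is converted to local filesystem storage.
from smart_geocubes.datasets import TCTrend2022

accessor = TCTrend2022(
    "data/tctrend2022.icechunk",   # illustrative local path
    create_icechunk_storage=True,  # default; disable in multiprocessing setups
    backend="threaded",            # requires Python >= 3.13, else use "simple"
)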

created property

created: bool

Check if the datacube already exists in the storage.

Returns:

  • bool ( bool ) –

    True if the datacube already exists in the storage.

is_temporal property

is_temporal: bool

Check if the datacube has a temporal dimension.

Returns:

  • bool ( bool ) –

    True if the datacube has a temporal dimension.

adjacent_patches

adjacent_patches(
    roi: Geometry | GeoBox | GeoDataFrame, toi: TOI
) -> list[Item]

Get the adjacent patches for the given geobox.

Must be implemented by the Accessor.

Parameters:

  • roi

    (Geometry | GeoBox | GeoDataFrame) –

    The reference geometry, geobox, or geodataframe.

  • toi

    (TOI) –

    The time of interest to download.

Returns:

  • list[Item]

    list[PatchIndex[Item]]: The adjacent patch indices for the given geobox.

Raises:

  • ValueError

    If the ROI type is invalid.

  • ValueError

    If the datacube is not temporal, but a time of interest is provided.

Source code in src/smart_geocubes/accessors/gee.py
def adjacent_patches(self, roi: Geometry | GeoBox | gpd.GeoDataFrame, toi: TOI) -> list[Item]:
    """Get the adjacent patches for the given geobox.

    Must be implemented by the Accessor.

    Args:
        roi (Geometry | GeoBox | gpd.GeoDataFrame): The reference geometry, geobox, or geodataframe.
        toi (TOI): The time of interest to download.

    Returns:
        list[PatchIndex[Item]]: The adjacent patch indices for the given geobox.

    Raises:
        ValueError: If the ROI type is invalid.
        ValueError: If the datacube is not temporal, but a time of interest is provided.

    """
    if toi is not None and not self.is_temporal:
        raise ValueError("Datacube is not temporal, but time of interest is provided.")

    if isinstance(roi, gpd.GeoDataFrame):
        adjacent_geometries = (
            gpd.sjoin(self._tile_geometries, roi.to_crs(self.extent.crs.wkt), how="inner", predicate="intersects")
            .reset_index()
            .drop_duplicates(subset="index", keep="first")
            .set_index("index")
        )
        spatial_idxs: list[tuple[int, int]] = list(adjacent_geometries["idx"])
    elif isinstance(roi, GeoBox):
        spatial_idxs: list[tuple[int, int]] = list(self._extent_tiles.tiles(roi.extent))
    elif isinstance(roi, Geometry):
        spatial_idxs: list[tuple[int, int]] = list(self._extent_tiles.tiles(roi))
    else:
        raise ValueError("Invalid ROI type.")

    if not self.is_temporal:
        return [
            PatchIndex(
                self._stringify_index(spatial_idx),
                self._extent_tiles[spatial_idx].geographic_extent,
                None,
                Item(self._extent_tiles[spatial_idx], None),
            )
            for spatial_idx in spatial_idxs
        ]

    # Now datacube is temporal
    toi = normalize_toi(self.temporal_extent, toi)
    patch_idxs = []
    for time in toi:
        time_idx = self.temporal_extent.get_loc(time)
        assert isinstance(time_idx, int), "Non-Unique temporal extents are not supported!"
        for spatial_idx in spatial_idxs:
            patch_idxs.append(
                PatchIndex(
                    self._stringify_index(spatial_idx, time_idx),
                    self._extent_tiles[spatial_idx].geographic_extent,
                    time,
                    Item(self._extent_tiles[spatial_idx], time),
                )
            )
    return patch_idxs
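
A sketch of querying which patches intersect a region without downloading anything; the bounding box values are illustrative:

# Hypothetical usage: list the patch ids covering an ROI.
from odc.geo.geobox import GeoBox

roi = GeoBox.from_bbox((10.0, 55.0, 12.0, 57.0), crs="EPSG:4326", resolution=0.001)
patches = accessor.adjacent_patches(roi, toi=None)
print([patch.id for patch in patches])  # each PatchIndex carries an id, cf. procedural_download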

assert_created

assert_created()

Assert that the datacube exists in the storage.

Source code in src/smart_geocubes/core/accessor.py
def assert_created(self):
    """Assert that the datacube exists in the storage."""
    self.backend.assert_created()

assert_temporal_cube

assert_temporal_cube()

Assert that the datacube has a temporal dimension.

Raises:

  • ValueError

    If the datacube has no temporal dimension.

Source code in src/smart_geocubes/core/accessor.py
def assert_temporal_cube(self):
    """Assert that the datacube has a temporal dimension.

    Raises:
        ValueError: If the datacube has no temporal dimension.

    """
    if self.temporal_extent is None:
        msg = f"Datacube {self.title} has no temporal dimension."
        logger.error(msg)
        raise ValueError(msg)

create

create(overwrite: bool = False, exists_ok: bool = False)

Create an empty datacube and write it to the store.

Parameters:

  • overwrite

    (bool, default: False ) –

    Allow overwriting an existing datacube. Has no effect if exists_ok is True. Defaults to False.

  • exists_ok

    (bool, default: False ) –

    Do not raise an error if the datacube already exists.

Raises:

  • FileExistsError

    If a datacube already exists at location and exists_ok is False.

Source code in src/smart_geocubes/core/accessor.py
def create(self, overwrite: bool = False, exists_ok: bool = False):
    """Create an empty datacube and write it to the store.

    Args:
        overwrite (bool, optional): Allow overwriting an existing datacube.
            Has no effect if exists_ok is True. Defaults to False.
        exists_ok (bool, optional): Do not raise an error if the datacube already exists.

    Raises:
        FileExistsError: If a datacube already exists at location and exists_ok is False.

    """
    if exists_ok and self.created:
        logger.debug("Datacube was already created.")
        return

    with self.stopuhr("Empty datacube creation"):
        # Check if the zarr data already exists
        session = self.repo.writable_session("main")
        cube_is_empty = sync(session.store.is_empty(""))
        if not overwrite and not cube_is_empty:
            logger.debug(f"Unable to create a new datacube. {overwrite=} {cube_is_empty=} {session.store=}")
            raise FileExistsError(f"Cannot create a new datacube. {session.store=} is not empty!")

        logger.debug(
            f"Creating an empty zarr datacube '{self.title}' with the variables"
            f" {self.channels} at a {self.extent.resolution=} (epsg:{self.extent.crs.epsg})"
            f" and {self.chunk_size=} to {session.store=}"
        )

        ds = xr.Dataset(
            {
                name: odc.geo.xr.xr_zeros(
                    self.extent,
                    chunks=-1,
                    dtype=self._channels_encoding[name].get("dtype", "float32"),
                    always_yx=True,
                )
                for name in self.channels
            },
            attrs={"title": self.title, "loaded_patches": []},
        )

        # Expand to temporal dimension if defined
        if self.temporal_extent is not None:
            ds = ds.expand_dims(time=self.temporal_extent)

        # Add metadata
        for name, meta in self._channels_meta.items():
            ds[name].attrs.update(meta)

        # Get the encoding for the coordinates, variables and spatial reference
        coords_encoding = {
            "x": {"chunks": ds.x.shape, **optimize_coord_encoding(ds.x.values, self.extent.resolution.x)},
            "y": {"chunks": ds.y.shape, **optimize_coord_encoding(ds.y.values, self.extent.resolution.y)},
        }
        if self.temporal_extent is not None:
            coords_encoding["time"] = {"chunks": ds.time.shape, **optimize_temporal_encoding(self.temporal_extent)}
        chunks = (
            (1, self.chunk_size, self.chunk_size)
            if self.temporal_extent is not None
            else (self.chunk_size, self.chunk_size)
        )
        var_encoding = {
            name: {
                "chunks": chunks,
                "compressors": [BloscCodec(clevel=9)],
                **self._channels_encoding[name],
            }
            for name in self.channels
        }
        encoding = {
            "spatial_ref": {"chunks": None, "dtype": "int32"},
            **coords_encoding,
            **var_encoding,
        }
        logger.debug(f"Datacube {encoding=}")

        ds.to_zarr(
            session.store,
            encoding=encoding,
            compute=False,
            consolidated=False,
            zarr_format=3,
            mode="w" if overwrite else "w-",
        )

        commit = session.commit("Initialize empty datacube")
        logger.debug(f"Datacube created: {commit=}")

        self.post_create()
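
A sketch of idempotent creation, convenient in scripts that may run more than once:

# Hypothetical usage: create the empty cube once, skip silently afterwards.
accessor.create(exists_ok=True)
assert accessor.created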

current_state

current_state() -> gpd.GeoDataFrame | None

Get info about currently stored tiles.

Returns:

  • GeoDataFrame | None

    gpd.GeoDataFrame: Tiles from odc.geo.GeoboxTiles. None if datacube is empty.

Source code in src/smart_geocubes/accessors/gee.py
def current_state(self) -> gpd.GeoDataFrame | None:
    """Get info about currently stored tiles.

    Returns:
        gpd.GeoDataFrame: Tiles from odc.geo.GeoboxTiles. None if datacube is empty.

    """
    import geopandas as gpd

    if not self.created:
        return None

    loaded_patches = self.loaded_patches()

    if len(loaded_patches) == 0:
        return None

    patch_infos = []
    for pid in loaded_patches:
        spatial_idx, temporal_idx = self._parse_index(pid)
        geometry = self._extent_tiles[spatial_idx].extent.geom
        if self.is_temporal:
            time = self.temporal_extent[temporal_idx]
            patch_infos.append({"geometry": geometry, "id": pid, "time": time})
        else:
            patch_infos.append({"geometry": geometry, "id": pid})

    gdf = gpd.GeoDataFrame(patch_infos, crs=self.extent.crs.to_wkt())
    return gdf
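
A sketch of inspecting the stored patches as a GeoDataFrame:

# Hypothetical usage: summarize what has been downloaded so far.
gdf = accessor.current_state()
if gdf is None:
    print("Datacube is empty or not created yet.")
else:
    print(f"{len(gdf)} patches stored")
    print(gdf.head())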

download_patch

download_patch(idx: PatchIndex) -> xr.Dataset

Download the data for the given patch.

Must be implemented by the Accessor.

Parameters:

  • idx

    (PatchIndex) –

    The reference patch to download the data for.

Returns:

  • Dataset

    xr.Dataset: The downloaded patch data.

Source code in src/smart_geocubes/datasets/tctrend.py
def download_patch(self, idx: PatchIndex) -> xr.Dataset:
    """Download the data for the given patch.

    Must be implemented by the Accessor.

    Args:
        idx (PatchIndex): The reference patch to download the data for.

    Returns:
        xr.Dataset: The downloaded patch data.

    """
    patch = super().download_patch(idx)

    # ?: The following code handles the occurrence of NaN values when using mosaics
    # Save original min-max values for each band for clipping later
    clip_values = {
        band: (patch[band].min().values.item(), patch[band].max().values.item()) for band in patch.data_vars
    }

    # Interpolate missing values (there are very few, so we actually can interpolate them)
    patch.rio.set_spatial_dims(x_dim="x", y_dim="y", inplace=True)
    for band in patch.data_vars:
        patch[band] = patch[band].rio.write_nodata(np.nan).rio.interpolate_na()

    # Convert to uint8
    for band in patch.data_vars:
        band_min, band_max = clip_values[band]
        patch[band] = patch[band].clip(band_min, band_max, keep_attrs=True).astype("uint8").rio.write_nodata(None)

    return patch

load

load(
    aoi: Geometry | GeoBox,
    toi: TOI = None,
    persist: bool = True,
    create: bool = False,
) -> xr.Dataset

Load the data for the given geobox.

Parameters:

  • aoi

    (Geometry | GeoBox) –

    The reference geometry to load the data for. If a GeoBox is provided, its extent is used.

  • toi

    (TOI, default: None ) –

    The temporal slice to load. Defaults to None.

  • persist

    (bool, default: True ) –

    Whether the data should be persisted in memory. If not, a Dask-backed Dataset is returned. Defaults to True.

  • create

    (bool, default: False ) –

    Create a new zarr array at the defined storage if it does not exist. This is not recommended, because it can have side effects in a multi-process environment. Defaults to False.

Returns:

  • Dataset

    xr.Dataset: The loaded dataset with the same resolution and extent as the geobox.

Source code in src/smart_geocubes/core/accessor.py
def load(
    self,
    aoi: Geometry | GeoBox,
    toi: TOI = None,
    persist: bool = True,
    create: bool = False,
) -> xr.Dataset:
    """Load the data for the given geobox.

    Args:
        aoi (Geometry | GeoBox): The reference geometry to load the data for. If a GeoBox is provided,
            its extent is used.
        toi (TOI): The temporal slice to load. Defaults to None.
        persist (bool, optional): Whether the data should be persisted in memory.
            If not, a Dask-backed Dataset is returned. Defaults to True.
        create (bool, optional): Create a new zarr array at the defined storage if it does not exist.
            This is not recommended, because it can have side effects in a multi-process environment.
            Defaults to False.

    Returns:
        xr.Dataset: The loaded dataset with the same resolution and extent as the geobox.

    """
    if toi is not None:
        self.assert_temporal_cube()

    if isinstance(aoi, GeoBox):
        aoi = aoi.extent

    with self.stopuhr(f"{_geometry_repr(aoi)}: {self.title} tile {'loading' if persist else 'lazy-loading'}"):
        # Create the datacube if it does not exist
        if create:
            try:
                self.create(overwrite=False)
            except FileExistsError:  # We are okay if the datacube already exists
                pass
        else:
            # Check if the datacube exists
            self.assert_created()

        # Download the adjacent tiles (if necessary)
        aligned_aoi = aoi.to_crs(self.extent.crs)
        with self.stopuhr(f"{_geometry_repr(aoi)}: Procedural download in blocking mode"):
            self.procedural_download(aligned_aoi, toi)

        # Load the datacube and set the spatial_ref since it is set as a coordinate within the zarr format
        session = self.repo.readonly_session("main")
        chunks = None if persist else "auto"
        xrcube = xr.open_zarr(
            session.store,
            mask_and_scale=False,
            chunks=chunks,
            consolidated=False,
        ).set_coords("spatial_ref")

        # Get temporal slice if time is provided
        if toi is not None:
            xrcube = xrcube.sel(time=toi)

        # Get an AOI slice of the datacube
        xrcube_aoi = xrcube.odc.crop(aligned_aoi, apply_mask=False)

        # The following code would load the lazy zarr data from disk into memory
        if persist:
            with self.stopuhr(f"{_geometry_repr(aoi)}: {self.title} AOI loading from disk"):
                xrcube_aoi = xrcube_aoi.load()
    return xrcube_aoi
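
The typical end-to-end call; a sketch with an illustrative bounding box, assuming missing patches may be downloaded on first access:

# Hypothetical usage: load an AOI, fetching missing patches on the fly.
from odc.geo.geobox import GeoBox

aoi = GeoBox.from_bbox((10.0, 55.0, 12.0, 57.0), crs="EPSG:4326", resolution=0.001)
ds = accessor.load(aoi, persist=True, create=True)  # create=True only in single-process setups
print(ds)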

load_like

load_like(
    ref: Dataset | DataArray, **kwargs: Unpack[LoadParams]
) -> xr.Dataset

Load the data for the given geobox.

Parameters:

  • ref

    (Dataset | DataArray) –

    The reference dataarray or dataset to load the data for.

  • **kwargs

    (Unpack[LoadParams], default: {} ) –

    The load parameters (buffer, persist, create, concurrency_mode).

Other Parameters:

  • buffer (int) –

    The buffer around the projected geobox in pixels. Defaults to 0.

  • persist (bool) –

    Whether the data should be persisted in memory. If not, a Dask-backed Dataset is returned. Defaults to True.

  • create (bool) –

    Create a new zarr array at the defined storage if it does not exist. This is not recommended, because it can have side effects in a multi-process environment. Defaults to False.

Returns:

  • Dataset

    xr.Dataset: The loaded dataset with the same resolution and extent as the geobox.

Source code in src/smart_geocubes/core/accessor.py
def load_like(
    self,
    ref: xr.Dataset | xr.DataArray,
    **kwargs: Unpack[LoadParams],
) -> xr.Dataset:
    """Load the data for the given geobox.

    Args:
        ref (xr.Dataset | xr.DataArray): The reference dataarray or dataset to load the data for.
        **kwargs: The load parameters (buffer, persist, create, concurrency_mode).

    Keyword Args:
        buffer (int, optional): The buffer around the projected geobox in pixels. Defaults to 0.
        persist (bool, optional): Whether the data should be persisted in memory.
            If not, a Dask-backed Dataset is returned. Defaults to True.
        create (bool, optional): Create a new zarr array at the defined storage if it does not exist.
            This is not recommended, because it can have side effects in a multi-process environment.
            Defaults to False.

    Returns:
        xr.Dataset: The loaded dataset with the same resolution and extent as the geobox.

    """
    toi = None
    if "time" in ref.coords and self.temporal_extent is not None:
        toi = ref.get_index("time")
    return self.load(ref.geobox, toi=toi, **kwargs)
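
A sketch of loading data covering an existing dataset's footprint; reference_ds is an assumed xarray object with a valid geobox:

# Hypothetical usage: reuse the footprint of a dataset you already have.
matched = accessor.load_like(reference_ds, persist=True)
print(matched.sizes)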

loaded_patches

loaded_patches() -> list[str]

Get the ids of already (down-)loaded patches.

Returns:

  • list[str]

    list[str]: A list of already loaded patch ids.

Source code in src/smart_geocubes/core/accessor.py
def loaded_patches(self) -> list[str]:
    """Get the ids of already (down-)loaded patches.

    Returns:
        list[str]: A list of already loaded patch ids.

    """
    session = self.repo.readonly_session("main")
    zcube = zarr.open(store=session.store, mode="r")
    return zcube.attrs.get("loaded_patches", []).copy()
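
A sketch of a quick progress check based on the stored patch ids:

# Hypothetical usage: count patches downloaded so far.
done = accessor.loaded_patches()
print(f"{len(done)} patches downloaded so far")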

log_benchmark_summary

log_benchmark_summary()

Log the benchmark summary.

Source code in src/smart_geocubes/core/accessor.py
def log_benchmark_summary(self):
    """Log the benchmark summary."""
    self.stopuhr.summary()

open_xarray

open_xarray() -> xr.Dataset

Open the xarray datacube in read-only mode.

Returns:

  • Dataset

    xr.Dataset: The xarray datacube.

Source code in src/smart_geocubes/core/accessor.py
def open_xarray(self) -> xr.Dataset:
    """Open the xarray datacube in read-only mode.

    Returns:
        xr.Dataset: The xarray datacube.

    """
    return self.backend.open_xarray()

open_zarr

open_zarr() -> zarr.Group

Open the zarr datacube in read-only mode.

Returns:

  • Group

    zarr.Group: The zarr datacube.

Source code in src/smart_geocubes/core/accessor.py
def open_zarr(self) -> zarr.Group:
    """Open the zarr datacube in read-only mode.

    Returns:
        zarr.Group: The zarr datacube.

    """
    return self.backend.open_zarr()

post_create

post_create()

Post create actions. Can be overwritten by the dataset accessor.

Source code in src/smart_geocubes/core/accessor.py
def post_create(self):
    """Post create actions. Can be overwritten by the dataset accessor."""
    pass

post_init

post_init()

Post init actions. Can be overwritten by the dataset accessor.

Source code in src/smart_geocubes/core/accessor.py
def post_init(self):
    """Post init actions. Can be overwritten by the dataset accessor."""
    pass

procedural_download

procedural_download(aoi: Geometry | GeoBox, toi: TOI)

Download tiles procedurally.

Warning

This method is meant for single-process use, but can (in theory) be used in a multi-process environment. In that case, however, multiple processes may try to write concurrently, which results in a conflict. When that happens, the download is retried until it succeeds or the maximum number of tries is reached.

Parameters:

  • aoi

    (Geometry | GeoBox) –

    The geometry of the aoi to download. If a GeoBox is provided, its extent is used.

  • toi

    (TOI) –

    The time of interest to download.

Raises:

  • ValueError

    If no adjacent tiles are found. This can happen if the geobox is out of the dataset bounds.

  • ValueError

    If not all downloads were successful.

Source code in src/smart_geocubes/core/accessor.py
def procedural_download(self, aoi: Geometry | GeoBox, toi: TOI):
    """Download tiles procedurally.

    Warning:
        This method is meant for single-process use, but can (in theory) be used in a multi-process environment.
        In that case, however, multiple processes may try to write concurrently, which results in a conflict.
        When that happens, the download is retried until it succeeds or the maximum number of tries is reached.

    Args:
        aoi (Geometry | GeoBox): The geometry of the aoi to download. If a GeoBox is provided,
            its extent is used.
        toi (TOI): The time of interest to download.

    Raises:
        ValueError: If no adjacent tiles are found. This can happen if the geobox is out of the dataset bounds.
        ValueError: If not all downloads were successful.

    """
    if isinstance(aoi, GeoBox):
        aoi = aoi.extent

    adjacent_patches = self.adjacent_patches(aoi, toi)
    # Build a human-readable string of the spatial (and temporal) interest for logging
    soi = f"{_geometry_repr(aoi)}" + (f" @ {_repr_toi(toi)}" if toi is not None else "")
    if not adjacent_patches:
        logger.error(f"{soi}: No adjacent patches found: {adjacent_patches=}")
        raise ValueError("No adjacent patches found - is the provided aoi and toi correct?")

    loaded_patches = self.loaded_patches()

    new_patches = [patch for patch in adjacent_patches if patch.id not in loaded_patches]

    logger.debug(f"{soi}:  {len(adjacent_patches)=} & {len(loaded_patches)=} -> {len(new_patches)=} to download")
    if not new_patches:
        return

    # This raises Errors if anything goes wrong -> we want to propagate
    self.backend.submit(new_patches)

visualize_state

visualize_state(
    ax: Axes | None = None,
) -> plt.Figure | plt.Axes

Visualize the extent, i.e. the already downloaded and filled data, of the datacube.

Parameters:

  • ax

    (Axes | None, default: None ) –

    The axes to draw on. If None, a new figure and axes are created.

Returns:

  • Figure | Axes

    plt.Figure | plt.Axes: The figure with the visualization if no axes was provided, else the axes.

Raises:

  • ValueError

    If the datacube is empty.

Source code in src/smart_geocubes/datasets/tctrend.py
def visualize_state(self, ax: "plt.Axes | None" = None) -> "plt.Figure | plt.Axes":
    """Visulize the extend, hence the already downloaded and filled data, of the datacube.

    Args:
        ax (plt.Axes | None): The axes to draw on. If None, a new figure and axes are created.

    Returns:
        plt.Figure | plt.Axes: The figure with the visualization if no axes was provided, else the axes.

    Raises:
        ValueError: If the datacube is empty

    """
    import cartopy.crs as ccrs
    import cartopy.feature as cfeature
    import matplotlib.pyplot as plt

    tile_info = self.current_state()

    if tile_info is None:
        raise ValueError("Datacube is not created or loaded yet. Can't visualize!")

    # Define the projection
    projection = ccrs.PlateCarree()

    # Create a figure
    fig = None
    if ax is None:
        fig, ax = plt.subplots(figsize=(10, 10), subplot_kw={"projection": projection})

    # Set the extent to show the whole world
    ax.set_extent([-180, 180, -90, 90], crs=ccrs.PlateCarree())

    # Add features
    ax.add_feature(cfeature.LAND, zorder=0, edgecolor="black", facecolor="white")
    ax.add_feature(cfeature.OCEAN, zorder=0, facecolor="lightgrey")
    ax.add_feature(cfeature.COASTLINE)
    ax.add_feature(cfeature.BORDERS, linestyle=":")
    ax.add_feature(cfeature.LAKES, alpha=0.5)
    ax.add_feature(cfeature.RIVERS)

    # Add gridlines
    gl = ax.gridlines(draw_labels=True)
    gl.top_labels = False
    gl.right_labels = False

    tile_info.plot(
        "id",
        ax=ax,
        transform=ccrs.PlateCarree(),
        edgecolor="black",
        categorical=True,
        aspect="equal",
        alpha=0.5,
    )

    if fig is not None:
        return fig
    else:
        return ax