Skip to content

smart_geocubes

Smart-Geocubes: A high-performance library for intelligent loading and caching of remote geospatial raster data, built with xarray and zarr.

Modules:

  • accessors

    Smart-Geocubes cccessor implementations.

  • datasets

    Predefined datasets for the SmartGeocubes package.

  • exceptions

    Exceptions for the smart_geocubes package.

Classes:

ArcticDEM10m

ArcticDEM10m(
    storage: Storage | Path | str,
    create_icechunk_storage: bool = True,
)

Bases: ArcticDEMABC

Accessor for ArcticDEM 10m data.

Attributes:

  • extent (GeoBox) –

    The extent of the datacube represented by a GeoBox.

  • chunk_size (int) –

    The chunk size of the datacube.

  • channels (list) –

    The channels of the datacube.

  • storage (Storage) –

    The icechunk storage.

  • repo (Repository) –

    The icechunk repository.

  • title (str) –

    The title of the datacube.

  • stopuhr (StopUhr) –

    The benchmarking timer from the stopuhr library.

  • zgeobox (GeoBox) –

    The geobox of the underlaying zarr array. Should be equal to the extent geobox. However, this property is used to find the target index of the downloaded data, so better save than sorry.

  • created (bool) –

    True if the datacube already exists in the storage.

Initialize base class for remote accessors.

Warning

In a multiprocessing environment, it is strongly recommended to not set create_icechunk_storage=False.

Parameters:

  • storage

    (Storage) –

    The icechunk storage of the datacube.

  • create_icechunk_storage

    (bool, default: True ) –

    If an icechunk repository should be created at provided storage if no exists. This should be disabled in a multiprocessing environment. Defaults to True.

Raises:

  • ValueError

    If the storage is not an icechunk.Storage.

Methods:

  • adjacent_tiles

    Get adjacent tiles from a STAC API.

  • assert_created

    Assert that the datacube exists in the storage.

  • create

    Create an empty datacube and write it to the store.

  • current_state

    Get info about currently stored tiles.

  • download

    Download the data for the given region of interest which can be provided either as GeoBox or GeoDataFrame.

  • download_tile

    Download a tile from a STAC API and write it to a zarr datacube.

  • load

    Load the data for the given geobox.

  • load_like

    Load the data for the given geobox.

  • log_benchmark_summary

    Log the benchmark summary.

  • open_xarray

    Open the xarray datacube in read-only mode.

  • open_zarr

    Open the zarr datacube in read-only mode.

  • post_create

    Download the ArcticDEM mosaic extent info and store it in the datacube.

  • procedural_download

    Download the data for the given geobox.

  • procedural_download_blocking

    Download tiles procedurally in blocking mode.

  • procedural_download_threading

    Download tiles procedurally in threading mode.

  • visualize_state

    Visulize the extend, hence the already downloaded and filled data, of the datacube.

Source code in src/smart_geocubes/accessors/base.py
def __init__(
    self,
    storage: icechunk.Storage | Path | str,
    create_icechunk_storage: bool = True,
):
    """Initialize base class for remote accessors.

    !!! warning

        In a multiprocessing environment, it is strongly recommended to not set `create_icechunk_storage=False`.

    Args:
        storage (icechunk.Storage): The icechunk storage of the datacube.
        create_icechunk_storage (bool, optional): If an icechunk repository should be created at provided storage
            if no exists.
            This should be disabled in a multiprocessing environment.
            Defaults to True.

    Raises:
        ValueError: If the storage is not an icechunk.Storage.

    """
    # Title is used for logging, debugging and as a default name for the datacube
    self.title = self.__class__.__name__

    if isinstance(storage, (str | Path)):
        storage = storage if isinstance(storage, str) else str(storage.resolve())
        storage = icechunk.local_filesystem_storage(storage)
    if not isinstance(storage, icechunk.Storage):
        raise ValueError(f"Expected an icechunk.Storage, but got {type(storage)}")
    self.storage = storage
    logger.debug(f"Using storage {storage=}")
    if create_icechunk_storage:
        self.repo = icechunk.Repository.open_or_create(storage)  # Will create a "main" branch
    else:
        self.repo = icechunk.Repository.open(storage)
    logger.debug(f"Using repository {self.repo=}")

    # The benchmarking timer for this accessor
    self.stopuhr = StopUhr(logger.debug)

    # The TypeVar used by the ThreadingHandler was added in 3.12
    # The Shutdown method of the queue was added in 3.13
    # Hence, we don't want to import the module unless Python 3.13 is installed
    if _check_python_version(3, 13):
        from smart_geocubes._concurrency.threading import ThreadingHandler

        self._threading_handler = ThreadingHandler(self._threading_download)

adjacent_tiles

adjacent_tiles(
    roi: GeoBox | GeoDataFrame,
) -> list[TileWrapper]

Get adjacent tiles from a STAC API.

Overwrite the default implementation from the STAC accessor to use pre-downloaded extent files instead of querying the STAC API. This results in a faster loading time, but requires the extent files to be downloaded beforehand. This is done in the post_create step.

Parameters:

  • roi

    (GeoBox | GeoDataFrame) –

    The reference geobox or reference geodataframe

Returns:

  • list[TileWrapper]

    list[TileWrapper]: List of adjacent tiles, wrapped in own datastructure for easier processing.

Source code in src/smart_geocubes/datasets/arcticdem.py
def adjacent_tiles(self, roi: GeoBox | gpd.GeoDataFrame) -> list[TileWrapper]:
    """Get adjacent tiles from a STAC API.

    Overwrite the default implementation from the STAC accessor
    to use pre-downloaded extent files instead of querying the STAC API.
    This results in a faster loading time, but requires the extent files to be downloaded beforehand.
    This is done in the post_create step.

    Args:
        roi (GeoBox | gpd.GeoDataFrame): The reference geobox or reference geodataframe

    Returns:
        list[TileWrapper]: List of adjacent tiles, wrapped in own datastructure for easier processing.

    """
    # Assumes that the extent files are already present and the datacube is already created
    self.assert_created()

    resolution = f"{int(self.extent.resolution.x)}m"
    extent_info = gpd.read_parquet(self._aux_dir / f"ArcticDEM_Mosaic_Index_v4_1_{resolution}.parquet")
    if isinstance(roi, gpd.GeoDataFrame):
        adjacent_tiles = (
            gpd.sjoin(
                extent_info,
                roi.to_crs(self.extent.crs.wkt),
                how="inner",
                predicate="intersects",
            )
            .reset_index()
            .drop_duplicates(subset="index", keep="first")
            .set_index("index")
        )
    elif isinstance(roi, GeoBox):
        adjacent_tiles = extent_info.loc[extent_info.intersects(roi.boundingbox.polygon.geom)].copy()
    if adjacent_tiles.empty:
        return []
    return [
        LazyStacTileWrapper(tile.dem_id, _get_stac_url(tile.dem_id, resolution))
        for tile in adjacent_tiles.itertuples()
    ]

assert_created

assert_created()

Assert that the datacube exists in the storage.

Raises:

Source code in src/smart_geocubes/accessors/base.py
def assert_created(self):
    """Assert that the datacube exists in the storage.

    Raises:
        FileNotFoundError: If the datacube does not exist.

    """
    if not self.created:
        msg = f"Datacube {self.title} does not exist."
        " Please use the `create` method or pass `create=True` to `load`."
        logger.error(msg)
        raise FileNotFoundError(msg)

create

create(overwrite: bool = False)

Create an empty datacube and write it to the store.

Parameters:

  • overwrite

    (bool, default: False ) –

    Allowing overwriting an existing datacube. Defaults to False.

Raises:

Source code in src/smart_geocubes/accessors/base.py
def create(self, overwrite: bool = False):
    """Create an empty datacube and write it to the store.

    Args:
        overwrite (bool, optional): Allowing overwriting an existing datacube. Defaults to False.

    Raises:
        FileExistsError: If a datacube already exists at location

    """
    with self.stopuhr("Empty datacube creation"):
        # Check if the zarr data already exists
        session = self.repo.writable_session("main")
        cube_is_empty = sync(session.store.is_empty(""))
        if not overwrite and not cube_is_empty:
            logger.debug(f"Unable to create a new datacube. {overwrite=} {cube_is_empty=} {session.store=}")
            raise FileExistsError(f"Cannot create a new  datacube. {session.store=} is not empty!")

        logger.debug(
            f"Creating an empty zarr datacube '{self.title}' with the variables"
            f" {self.channels} at a {self.extent.resolution=} (epsg:{self.extent.crs.epsg})"
            f" and {self.chunk_size=} to {session.store=}"
        )

        ds = xr.Dataset(
            {
                name: odc.geo.xr.xr_zeros(
                    self.extent,
                    chunks=-1,
                    dtype=self._channels_encoding[name].get("dtype", "float32"),
                    always_yx=True,
                )
                for name in self.channels
            },
            attrs={"title": self.title, "loaded_tiles": []},
        )

        # Add metadata
        for name, meta in self._channels_meta.items():
            ds[name].attrs.update(meta)

        # Get the encoding for the coordinates, variables and spatial reference
        coords_encoding = {
            "x": {"chunks": ds.x.shape, **optimize_coord_encoding(ds.x.values, self.extent.resolution.x)},
            "y": {"chunks": ds.y.shape, **optimize_coord_encoding(ds.y.values, self.extent.resolution.y)},
        }
        var_encoding = {
            name: {
                "chunks": (self.chunk_size, self.chunk_size),
                "compressors": [BloscCodec(clevel=9)],
                **self._channels_encoding[name],
            }
            for name in self.channels
        }
        encoding = {
            "spatial_ref": {"chunks": None, "dtype": "int32"},
            **coords_encoding,
            **var_encoding,
        }
        logger.debug(f"Datacube {encoding=}")

        ds.to_zarr(
            session.store,
            encoding=encoding,
            compute=False,
            consolidated=False,
            zarr_format=3,
            mode="w" if overwrite else "w-",
        )

        commit = session.commit("Initialize empty datacube")
        logger.debug(f"Datacube created: {commit=}")

        self.post_create()

current_state

current_state() -> gpd.GeoDataFrame | None

Get info about currently stored tiles.

Returns:

  • GeoDataFrame | None

    gpd.GeoDataFrame: Tile info from pystac. None if datacube is empty.

Source code in src/smart_geocubes/accessors/stac.py
def current_state(self) -> gpd.GeoDataFrame | None:
    """Get info about currently stored tiles.

    Returns:
        gpd.GeoDataFrame: Tile info from pystac. None if datacube is empty.


    """
    import geopandas as gpd
    import pystac_client

    if not self.created:
        return None

    session = self.repo.readonly_session("main")
    zcube = zarr.open(session.store, mode="r")
    loaded_tiles = zcube.attrs.get("loaded_tiles", [])

    if len(loaded_tiles) == 0:
        return None

    catalog = pystac_client.Client.open(self.stac_api_url)
    search = catalog.search(collections=[self.collection], ids=loaded_tiles)
    stac_json = search.item_collection_as_dict()

    gdf = gpd.GeoDataFrame.from_features(stac_json, "epsg:4326")
    return gdf

download

download(roi: GeoBox | GeoDataFrame)

Download the data for the given region of interest which can be provided either as GeoBox or GeoDataFrame.

Parameters:

  • roi

    (GeoBox | GeoDataFrame) –

    The reference geobox or reference geodataframe to download the data for.

Raises:

  • ValueError

    If no adjacent tiles are found. This can happen if the geobox is out of the dataset bounds.

  • ValueError

    If no tries are left.

Source code in src/smart_geocubes/accessors/base.py
def download(self, roi: GeoBox | gpd.GeoDataFrame):
    """Download the data for the given region of interest which can be provided either as GeoBox or GeoDataFrame.

    Args:
        roi (GeoBox | gpd.GeoDataFrame): The reference geobox or reference geodataframe to download the data for.

    Raises:
        ValueError: If no adjacent tiles are found. This can happen if the geobox is out of the dataset bounds.
        ValueError: If no tries are left.

    """
    with self.stopuhr(f"Download of {roi=}"):
        adjacent_tiles = self.adjacent_tiles(roi)
        if not adjacent_tiles:
            logger.error(f"No adjacent tiles found: {adjacent_tiles=}")
            raise ValueError("No adjacent tiles found - is the provided geobox corrent?")

        session = self.repo.readonly_session("main")
        zcube = zarr.open(store=session.store, mode="r")
        loaded_tiles = zcube.attrs.get("loaded_tiles", [])
        new_tiles = [tile for tile in adjacent_tiles if tile.id not in loaded_tiles]
        logger.debug(f"{len(adjacent_tiles)=} & {len(loaded_tiles)=} -> {len(new_tiles)=} to download")
        if not new_tiles:
            return

        for tile in new_tiles:
            with self.stopuhr(f"{tile.id=}: Downloading one new tile in blocking mode"):
                logger.debug(f"{tile.id=}: Start downloading")
                tiledata = self.download_tile(tile)

            # Try to write the data to file until a limit is reached
            limit = 100
            for i in range(limit):
                try:
                    self._write_tile_to_zarr(tiledata, tile)
                    break
                except icechunk.ConflictError as conflict_error:
                    logger.debug(f"{tile.id=}: {conflict_error=} at retry {i}/{limit}")
            else:
                logger.error(
                    f"{tile.id=}: {limit} tries to write the tile failed. "
                    "Please check if the datacube is already created and not empty."
                )
                raise ValueError(f"{tile.id=}: {limit} tries to write the tile failed.")

download_tile

download_tile(tile: TileWrapper) -> xr.Dataset

Download a tile from a STAC API and write it to a zarr datacube.

Parameters:

Returns:

  • Dataset

    xr.Dataset: The downloaded tile data.

Source code in src/smart_geocubes/accessors/stac.py
def download_tile(self, tile: TileWrapper) -> xr.Dataset:
    """Download a tile from a STAC API and write it to a zarr datacube.

    Args:
        tile (TileWrapper): The tile to download and write.

    Returns:
        xr.Dataset: The downloaded tile data.

    """
    from odc.stac import stac_load

    tiledata = stac_load([tile.item], bands=self.channels, chunks=None, progress=None)

    # TODO: Allow for multi-temporal datacubes
    tiledata = tiledata.max("time")

    return tiledata

load

load(
    geobox: GeoBox,
    buffer: int = 0,
    persist: bool = True,
    create: bool = False,
    concurrency_mode: ConcurrencyModes = "blocking",
) -> xr.Dataset

Load the data for the given geobox.

Parameters:

  • geobox

    (GeoBox) –

    The reference geobox to load the data for.

  • buffer

    (int, default: 0 ) –

    The buffer around the projected geobox in pixels. Defaults to 0.

  • persist

    (bool, default: True ) –

    If the data should be persisted in memory. If not, this will return a Dask backed Dataset. Defaults to True.

  • create

    (bool, default: False ) –

    Create a new zarr array at defined storage if it not exists. This is not recommended, because it can have side effects in a multi-process environment. Defaults to False.

  • concurrency_mode

    (ConcurrencyModes, default: 'blocking' ) –

    The concurrency mode for the download. Defaults to "blocking".

Returns:

  • Dataset

    xr.Dataset: The loaded dataset in the same resolution and extent like the geobox.

Source code in src/smart_geocubes/accessors/base.py
def load(
    self,
    geobox: GeoBox,
    buffer: int = 0,
    persist: bool = True,
    create: bool = False,
    concurrency_mode: ConcurrencyModes = "blocking",
) -> xr.Dataset:
    """Load the data for the given geobox.

    Args:
        geobox (GeoBox): The reference geobox to load the data for.
        buffer (int, optional): The buffer around the projected geobox in pixels. Defaults to 0.
        persist (bool, optional): If the data should be persisted in memory.
            If not, this will return a Dask backed Dataset. Defaults to True.
        create (bool, optional): Create a new zarr array at defined storage if it not exists.
            This is not recommended, because it can have side effects in a multi-process environment.
            Defaults to False.
        concurrency_mode (ConcurrencyModes, optional): The concurrency mode for the download.
            Defaults to "blocking".

    Returns:
        xr.Dataset: The loaded dataset in the same resolution and extent like the geobox.

    """
    with self.stopuhr(f"{_geobox_repr(geobox)}: {self.title} tile {'loading' if persist else 'lazy-loading'}"):
        logger.debug(f"{_geobox_repr(geobox)}: {geobox.resolution} original resolution")

        # Create the datacube if it does not exist
        if create:
            try:
                self.create(overwrite=False)
            except FileExistsError:  # We are okay if the datacube already exists
                pass
        else:
            # Check if the datacube exists
            self.assert_created()

        # Download the adjacent tiles (if necessary)
        reference_geobox = geobox.to_crs(self.extent.crs, resolution=self.extent.resolution.x).pad(buffer)
        self.procedural_download(reference_geobox, concurrency_mode=concurrency_mode)

        # Load the datacube and set the spatial_ref since it is set as a coordinate within the zarr format
        session = self.repo.readonly_session("main")
        chunks = None if persist else "auto"
        xrcube = xr.open_zarr(
            session.store,
            mask_and_scale=False,
            chunks=chunks,
            consolidated=False,
        ).set_coords("spatial_ref")

        # Get an AOI slice of the datacube
        xrcube_aoi = xrcube.odc.crop(reference_geobox.extent, apply_mask=False)

        # The following code would load the lazy zarr data from disk into memory
        if persist:
            with self.stopuhr(f"{_geobox_repr(geobox)}: {self.title} AOI loading from disk"):
                xrcube_aoi = xrcube_aoi.load()
    return xrcube_aoi

load_like

load_like(
    ref: Dataset | DataArray, **kwargs: Unpack[LoadParams]
) -> xr.Dataset

Load the data for the given geobox.

Parameters:

  • ref

    (Dataset | DataArray) –

    The reference dataarray or dataset to load the data for.

  • **kwargs

    (Unpack[LoadParams], default: {} ) –

    The load parameters (buffer, persist, create, concurrency_mode).

Other Parameters:

  • buffer (int) –

    The buffer around the projected geobox in pixels. Defaults to 0.

  • persist (bool) –

    If the data should be persisted in memory. If not, this will return a Dask backed Dataset. Defaults to True.

  • create (bool) –

    Create a new zarr array at defined storage if it not exists. This is not recommended, because it can have side effects in a multi-process environment. Defaults to False.

  • concurrency_mode (ConcurrencyModes) –

    The concurrency mode for the download. Defaults to "blocking".

Returns:

  • Dataset

    xr.Dataset: The loaded dataset in the same resolution and extent like the geobox.

Source code in src/smart_geocubes/accessors/base.py
def load_like(
    self,
    ref: xr.Dataset | xr.DataArray,
    **kwargs: Unpack[LoadParams],
) -> xr.Dataset:
    """Load the data for the given geobox.

    Args:
        ref (xr.Dataset | xr.DataArray): The reference dataarray or dataset to load the data for.
        **kwargs: The load parameters (buffer, persist, create, concurrency_mode).

    Keyword Args:
        buffer (int, optional): The buffer around the projected geobox in pixels. Defaults to 0.
        persist (bool, optional): If the data should be persisted in memory.
            If not, this will return a Dask backed Dataset. Defaults to True.
        create (bool, optional): Create a new zarr array at defined storage if it not exists.
            This is not recommended, because it can have side effects in a multi-process environment.
            Defaults to False.
        concurrency_mode (ConcurrencyModes, optional): The concurrency mode for the download.
            Defaults to "blocking".

    Returns:
        xr.Dataset: The loaded dataset in the same resolution and extent like the geobox.

    """
    return self.load(_geobox_repr(ref.geobox), **kwargs)

log_benchmark_summary

log_benchmark_summary()

Log the benchmark summary.

Source code in src/smart_geocubes/accessors/base.py
def log_benchmark_summary(self):
    """Log the benchmark summary."""
    self.stopuhr.summary()

open_xarray

open_xarray() -> xr.Dataset

Open the xarray datacube in read-only mode.

Returns:

  • Dataset

    xr.Dataset: The xarray datacube.

Source code in src/smart_geocubes/accessors/base.py
def open_xarray(self) -> xr.Dataset:
    """Open the xarray datacube in read-only mode.

    Returns:
        xr.Dataset: The xarray datacube.

    """
    self.assert_created()
    session = self.repo.readonly_session("main")
    xcube = xr.open_zarr(session.store, mask_and_scale=False, consolidated=False).set_coords("spatial_ref")
    return xcube

open_zarr

open_zarr() -> zarr.Group

Open the zarr datacube in read-only mode.

Returns:

  • Group

    zarr.Group: The zarr datacube.

Source code in src/smart_geocubes/accessors/base.py
def open_zarr(self) -> zarr.Group:
    """Open the zarr datacube in read-only mode.

    Returns:
        zarr.Group: The zarr datacube.

    """
    self.assert_created()
    session = self.repo.readonly_session("main")
    zcube = zarr.open(store=session.store, mode="r")
    return zcube

post_create

post_create()

Download the ArcticDEM mosaic extent info and store it in the datacube.

Source code in src/smart_geocubes/datasets/arcticdem.py
def post_create(self):
    """Download the ArcticDEM mosaic extent info and store it in the datacube."""
    _download_arcticdem_extent(self._aux_dir)

procedural_download

procedural_download(
    geobox: GeoBox,
    concurrency_mode: ConcurrencyModes = "blocking",
)

Download the data for the given geobox.

Note

The "threading" concurrency mode requires Python 3.13 or higher.

Parameters:

  • geobox

    (GeoBox) –

    The reference geobox to download the data for.

  • concurrency_mode

    (ConcurrencyModes, default: 'blocking' ) –

    The concurrency mode for the download. Defaults to "blocking".

Raises:

  • ValueError

    If an unknown concurrency mode is provided.

Source code in src/smart_geocubes/accessors/base.py
def procedural_download(self, geobox: GeoBox, concurrency_mode: ConcurrencyModes = "blocking"):
    """Download the data for the given geobox.

    Note:
        The "threading" concurrency mode requires Python 3.13 or higher.

    Args:
        geobox (GeoBox): The reference geobox to download the data for.
        concurrency_mode (ConcurrencyModes, optional): The concurrency mode for the download.
            Defaults to "blocking".

    Raises:
        ValueError: If an unknown concurrency mode is provided.

    """
    self.assert_created()
    if concurrency_mode == "blocking":
        self.procedural_download_blocking(geobox)
    elif concurrency_mode == "threading":
        raise ValueError("Threading mode is currently disabled. Use 'blocking' instead.")
        # self.procedural_download_threading(geobox)
    else:
        raise ValueError(f"Unknown concurrency mode {concurrency_mode}")

procedural_download_blocking

procedural_download_blocking(geobox: GeoBox)

Download tiles procedurally in blocking mode.

Warning

This method is meant for single-process use, but can (in theory) be used in a multi-process environment. However, in a multi-process environment it can happen that multiple processes try to write concurrently, which results in a conflict. In such cases, the download will be retried until it succeeds or the number of maximum-tries is reached.

Parameters:

  • geobox

    (GeoBox) –

    The geobox of the aoi to download.

Raises:

  • ValueError

    If no adjacent tiles are found. This can happen if the geobox is out of the dataset bounds.

  • ValueError

    If no tries are left.

Source code in src/smart_geocubes/accessors/base.py
def procedural_download_blocking(self, geobox: GeoBox):
    """Download tiles procedurally in blocking mode.

    Warning:
        This method is meant for single-process use, but can (in theory) be used in a multi-process environment.
        However, in a multi-process environment it can happen that multiple processes try to write concurrently,
        which results in a conflict.
        In such cases, the download will be retried until it succeeds or the number of maximum-tries is reached.

    Args:
        geobox (GeoBox): The geobox of the aoi to download.

    Raises:
        ValueError: If no adjacent tiles are found. This can happen if the geobox is out of the dataset bounds.
        ValueError: If no tries are left.

    """
    with self.stopuhr(f"{_geobox_repr(geobox)}: Procedural download in blocking mode"):
        adjacent_tiles = self.adjacent_tiles(geobox)
        if not adjacent_tiles:
            logger.error(f"{_geobox_repr(geobox)}: No adjacent tiles found: {adjacent_tiles=}")
            raise ValueError("No adjacent tiles found - is the provided geobox corrent?")

        session = self.repo.readonly_session("main")
        zcube = zarr.open(store=session.store, mode="r")
        loaded_tiles = zcube.attrs.get("loaded_tiles", [])
        new_tiles = [tile for tile in adjacent_tiles if tile.id not in loaded_tiles]
        logger.debug(
            f"{_geobox_repr(geobox)}:  {len(adjacent_tiles)=} & {len(loaded_tiles)=}"
            f" -> {len(new_tiles)=} to download"
        )
        if not new_tiles:
            return

        for tile in new_tiles:
            with self.stopuhr(f"{tile.id=}: Downloading one new tile in blocking mode"):
                logger.debug(f"{tile.id=}: Start downloading")
                tiledata = self.download_tile(tile)

            # Try to write the data to file until a limit is reached
            limit = 100
            for i in range(limit):
                try:
                    self._write_tile_to_zarr(tiledata, tile)
                    break
                except icechunk.ConflictError as conflict_error:
                    logger.debug(f"{tile.id=}: {conflict_error=} at retry {i}/{limit}")
            else:
                logger.error(
                    f"{tile.id=}: {limit} tries to write the tile failed. "
                    "Please check if the datacube is already created and not empty."
                )
                raise ValueError(f"{tile.id=}: {limit} tries to write the tile failed.")

procedural_download_threading

procedural_download_threading(geobox: GeoBox)

Download tiles procedurally in threading mode.

Note

This method ensures that only a single download is running at a time. It uses a SetQueue to prevent duplicate downloads. The threading mode requires Python 3.13 or higher.

Parameters:

  • geobox

    (GeoBox) –

    The geobox of the aoi to download.

Raises:

  • ValueError

    If no adjacent tiles are found. This can happen if the geobox is out of the dataset bounds.

  • RuntimeError

    If the Python version is lower than 3.13.

Source code in src/smart_geocubes/accessors/base.py
def procedural_download_threading(self, geobox: GeoBox):
    """Download tiles procedurally in threading mode.

    Note:
        This method ensures that only a single download is running at a time.
        It uses a SetQueue to prevent duplicate downloads.
        The threading mode requires Python 3.13 or higher.

    Args:
        geobox (GeoBox): The geobox of the aoi to download.

    Raises:
        ValueError: If no adjacent tiles are found. This can happen if the geobox is out of the dataset bounds.
        RuntimeError: If the Python version is lower than 3.13.

    """
    if not _check_python_version(3, 13):
        raise RuntimeError("Threading mode requires Python 3.13 or higher")
    with self._threading_handler:
        adjacent_tiles = self.adjacent_tiles(geobox)
        if not adjacent_tiles:
            logger.error(f"{_geobox_repr(geobox)}: No adjacent tiles found: {adjacent_tiles=}")
            raise ValueError("No adjacent tiles found - is the provided geobox corrent?")

        # Wait until all new_items are loaded
        prev_len = None
        while True:
            session = self.repo.readonly_session("main")
            zcube = zarr.open(store=session.store, mode="r")
            loaded_tiles = zcube.attrs.get("loaded_tiles", [])
            new_tiles = [tile for tile in adjacent_tiles if tile.id not in loaded_tiles]
            done_tiles = [tile for tile in adjacent_tiles if tile.id in loaded_tiles]
            if not new_tiles:
                break
            if prev_len != len(new_tiles):
                logger.debug(
                    f"{_geobox_repr(geobox)}: {len(done_tiles)} of {len(adjacent_tiles)} downloaded."
                    f" Missing: {[t.id for t in new_tiles]} Done: {[t.id for t in done_tiles]}"
                )
            for tile in new_tiles:
                self._threading_handler._queue.put(tile)
            prev_len = len(new_tiles)
            time.sleep(5)

visualize_state

visualize_state(
    ax: Axes | None = None,
) -> plt.Figure | plt.Axes

Visulize the extend, hence the already downloaded and filled data, of the datacube.

Parameters:

  • ax

    (Axes | None, default: None ) –

    The axes drawn to. If None, will create a new figure and axes.

Returns:

  • Figure | Axes

    plt.Figure | plt.Axes: The figure with the visualization if no axes was provided, else the axes.

Raises:

Source code in src/smart_geocubes/datasets/arcticdem.py
def visualize_state(self, ax: "plt.Axes | None" = None) -> "plt.Figure | plt.Axes":
    """Visulize the extend, hence the already downloaded and filled data, of the datacube.

    Args:
        ax (plt.Axes | None): The axes drawn to. If None, will create a new figure and axes.

    Returns:
        plt.Figure | plt.Axes: The figure with the visualization if no axes was provided, else the axes.

    Raises:
        ValueError: If the datacube is empty

    """
    import cartopy.crs as ccrs
    import cartopy.feature as cfeature
    import matplotlib.path as mpath
    import matplotlib.pyplot as plt

    tile_info = self.current_state()

    if tile_info is None:
        raise ValueError("Datacube is not created or loaded yet. Can't visualize!")

    # Define the projection
    projection = ccrs.Stereographic(central_latitude=90, central_longitude=-45, true_scale_latitude=70)

    # Create a figure
    fig = None
    if ax is None:
        fig, ax = plt.subplots(figsize=(10, 10), subplot_kw={"projection": projection})

    # Set the extent to focus on the North Pole
    ax.set_extent([-180, 180, 50, 90], crs=ccrs.PlateCarree())

    # Add features
    ax.add_feature(cfeature.LAND, zorder=0, edgecolor="black", facecolor="white")
    ax.add_feature(cfeature.OCEAN, zorder=0, facecolor="lightgrey")
    ax.add_feature(cfeature.COASTLINE)
    ax.add_feature(cfeature.BORDERS, linestyle=":")
    ax.add_feature(cfeature.LAKES, alpha=0.5)
    ax.add_feature(cfeature.RIVERS)

    # Add gridlines
    gl = ax.gridlines(draw_labels=True)
    gl.top_labels = False
    gl.right_labels = False

    # Compute a circle in axes coordinates, which we can use as a boundary
    # for the map. We can pan/zoom as much as we like - the boundary will be
    # permanently circular.
    theta = np.linspace(0, 2 * np.pi, 100)
    center, radius = [0.5, 0.5], 0.5
    verts = np.vstack([np.sin(theta), np.cos(theta)]).T
    circle = mpath.Path(verts * radius + center)

    ax.set_boundary(circle, transform=ax.transAxes)

    tile_info.plot(
        "title",
        ax=ax,
        transform=ccrs.PlateCarree(),
        edgecolor="black",
        categorical=True,
        aspect="equal",
        alpha=0.5,
    )

    if fig is not None:
        return fig
    else:
        return ax

ArcticDEM2m

ArcticDEM2m(
    storage: Storage | Path | str,
    create_icechunk_storage: bool = True,
)

Bases: ArcticDEMABC

Accessor for ArcticDEM 2m data.

Attributes:

  • extent (GeoBox) –

    The extent of the datacube represented by a GeoBox.

  • chunk_size (int) –

    The chunk size of the datacube.

  • channels (list) –

    The channels of the datacube.

  • storage (Storage) –

    The icechunk storage.

  • repo (Repository) –

    The icechunk repository.

  • title (str) –

    The title of the datacube.

  • stopuhr (StopUhr) –

    The benchmarking timer from the stopuhr library.

  • zgeobox (GeoBox) –

    The geobox of the underlaying zarr array. Should be equal to the extent geobox. However, this property is used to find the target index of the downloaded data, so better save than sorry.

  • created (bool) –

    True if the datacube already exists in the storage.

Initialize base class for remote accessors.

Warning

In a multiprocessing environment, it is strongly recommended to not set create_icechunk_storage=False.

Parameters:

  • storage

    (Storage) –

    The icechunk storage of the datacube.

  • create_icechunk_storage

    (bool, default: True ) –

    If an icechunk repository should be created at provided storage if no exists. This should be disabled in a multiprocessing environment. Defaults to True.

Raises:

  • ValueError

    If the storage is not an icechunk.Storage.

Methods:

  • adjacent_tiles

    Get adjacent tiles from a STAC API.

  • assert_created

    Assert that the datacube exists in the storage.

  • create

    Create an empty datacube and write it to the store.

  • current_state

    Get info about currently stored tiles.

  • download

    Download the data for the given region of interest which can be provided either as GeoBox or GeoDataFrame.

  • download_tile

    Download a tile from a STAC API and write it to a zarr datacube.

  • load

    Load the data for the given geobox.

  • load_like

    Load the data for the given geobox.

  • log_benchmark_summary

    Log the benchmark summary.

  • open_xarray

    Open the xarray datacube in read-only mode.

  • open_zarr

    Open the zarr datacube in read-only mode.

  • post_create

    Download the ArcticDEM mosaic extent info and store it in the datacube.

  • procedural_download

    Download the data for the given geobox.

  • procedural_download_blocking

    Download tiles procedurally in blocking mode.

  • procedural_download_threading

    Download tiles procedurally in threading mode.

  • visualize_state

    Visulize the extend, hence the already downloaded and filled data, of the datacube.

Source code in src/smart_geocubes/accessors/base.py
def __init__(
    self,
    storage: icechunk.Storage | Path | str,
    create_icechunk_storage: bool = True,
):
    """Initialize base class for remote accessors.

    !!! warning

        In a multiprocessing environment, it is strongly recommended to not set `create_icechunk_storage=False`.

    Args:
        storage (icechunk.Storage): The icechunk storage of the datacube.
        create_icechunk_storage (bool, optional): If an icechunk repository should be created at provided storage
            if no exists.
            This should be disabled in a multiprocessing environment.
            Defaults to True.

    Raises:
        ValueError: If the storage is not an icechunk.Storage.

    """
    # Title is used for logging, debugging and as a default name for the datacube
    self.title = self.__class__.__name__

    if isinstance(storage, (str | Path)):
        storage = storage if isinstance(storage, str) else str(storage.resolve())
        storage = icechunk.local_filesystem_storage(storage)
    if not isinstance(storage, icechunk.Storage):
        raise ValueError(f"Expected an icechunk.Storage, but got {type(storage)}")
    self.storage = storage
    logger.debug(f"Using storage {storage=}")
    if create_icechunk_storage:
        self.repo = icechunk.Repository.open_or_create(storage)  # Will create a "main" branch
    else:
        self.repo = icechunk.Repository.open(storage)
    logger.debug(f"Using repository {self.repo=}")

    # The benchmarking timer for this accessor
    self.stopuhr = StopUhr(logger.debug)

    # The TypeVar used by the ThreadingHandler was added in 3.12
    # The Shutdown method of the queue was added in 3.13
    # Hence, we don't want to import the module unless Python 3.13 is installed
    if _check_python_version(3, 13):
        from smart_geocubes._concurrency.threading import ThreadingHandler

        self._threading_handler = ThreadingHandler(self._threading_download)

adjacent_tiles

adjacent_tiles(
    roi: GeoBox | GeoDataFrame,
) -> list[TileWrapper]

Get adjacent tiles from a STAC API.

Overwrite the default implementation from the STAC accessor to use pre-downloaded extent files instead of querying the STAC API. This results in a faster loading time, but requires the extent files to be downloaded beforehand. This is done in the post_create step.

Parameters:

  • roi

    (GeoBox | GeoDataFrame) –

    The reference geobox or reference geodataframe

Returns:

  • list[TileWrapper]

    list[TileWrapper]: List of adjacent tiles, wrapped in own datastructure for easier processing.

Source code in src/smart_geocubes/datasets/arcticdem.py
def adjacent_tiles(self, roi: GeoBox | gpd.GeoDataFrame) -> list[TileWrapper]:
    """Get adjacent tiles from a STAC API.

    Overwrite the default implementation from the STAC accessor
    to use pre-downloaded extent files instead of querying the STAC API.
    This results in a faster loading time, but requires the extent files to be downloaded beforehand.
    This is done in the post_create step.

    Args:
        roi (GeoBox | gpd.GeoDataFrame): The reference geobox or reference geodataframe

    Returns:
        list[TileWrapper]: List of adjacent tiles, wrapped in own datastructure for easier processing.

    """
    # Assumes that the extent files are already present and the datacube is already created
    self.assert_created()

    resolution = f"{int(self.extent.resolution.x)}m"
    extent_info = gpd.read_parquet(self._aux_dir / f"ArcticDEM_Mosaic_Index_v4_1_{resolution}.parquet")
    if isinstance(roi, gpd.GeoDataFrame):
        adjacent_tiles = (
            gpd.sjoin(
                extent_info,
                roi.to_crs(self.extent.crs.wkt),
                how="inner",
                predicate="intersects",
            )
            .reset_index()
            .drop_duplicates(subset="index", keep="first")
            .set_index("index")
        )
    elif isinstance(roi, GeoBox):
        adjacent_tiles = extent_info.loc[extent_info.intersects(roi.boundingbox.polygon.geom)].copy()
    if adjacent_tiles.empty:
        return []
    return [
        LazyStacTileWrapper(tile.dem_id, _get_stac_url(tile.dem_id, resolution))
        for tile in adjacent_tiles.itertuples()
    ]

assert_created

assert_created()

Assert that the datacube exists in the storage.

Raises:

Source code in src/smart_geocubes/accessors/base.py
def assert_created(self):
    """Assert that the datacube exists in the storage.

    Raises:
        FileNotFoundError: If the datacube does not exist.

    """
    if not self.created:
        msg = f"Datacube {self.title} does not exist."
        " Please use the `create` method or pass `create=True` to `load`."
        logger.error(msg)
        raise FileNotFoundError(msg)

create

create(overwrite: bool = False)

Create an empty datacube and write it to the store.

Parameters:

  • overwrite

    (bool, default: False ) –

    Allowing overwriting an existing datacube. Defaults to False.

Raises:

Source code in src/smart_geocubes/accessors/base.py
def create(self, overwrite: bool = False):
    """Create an empty datacube and write it to the store.

    Args:
        overwrite (bool, optional): Allowing overwriting an existing datacube. Defaults to False.

    Raises:
        FileExistsError: If a datacube already exists at location

    """
    with self.stopuhr("Empty datacube creation"):
        # Check if the zarr data already exists
        session = self.repo.writable_session("main")
        cube_is_empty = sync(session.store.is_empty(""))
        if not overwrite and not cube_is_empty:
            logger.debug(f"Unable to create a new datacube. {overwrite=} {cube_is_empty=} {session.store=}")
            raise FileExistsError(f"Cannot create a new  datacube. {session.store=} is not empty!")

        logger.debug(
            f"Creating an empty zarr datacube '{self.title}' with the variables"
            f" {self.channels} at a {self.extent.resolution=} (epsg:{self.extent.crs.epsg})"
            f" and {self.chunk_size=} to {session.store=}"
        )

        ds = xr.Dataset(
            {
                name: odc.geo.xr.xr_zeros(
                    self.extent,
                    chunks=-1,
                    dtype=self._channels_encoding[name].get("dtype", "float32"),
                    always_yx=True,
                )
                for name in self.channels
            },
            attrs={"title": self.title, "loaded_tiles": []},
        )

        # Add metadata
        for name, meta in self._channels_meta.items():
            ds[name].attrs.update(meta)

        # Get the encoding for the coordinates, variables and spatial reference
        coords_encoding = {
            "x": {"chunks": ds.x.shape, **optimize_coord_encoding(ds.x.values, self.extent.resolution.x)},
            "y": {"chunks": ds.y.shape, **optimize_coord_encoding(ds.y.values, self.extent.resolution.y)},
        }
        var_encoding = {
            name: {
                "chunks": (self.chunk_size, self.chunk_size),
                "compressors": [BloscCodec(clevel=9)],
                **self._channels_encoding[name],
            }
            for name in self.channels
        }
        encoding = {
            "spatial_ref": {"chunks": None, "dtype": "int32"},
            **coords_encoding,
            **var_encoding,
        }
        logger.debug(f"Datacube {encoding=}")

        ds.to_zarr(
            session.store,
            encoding=encoding,
            compute=False,
            consolidated=False,
            zarr_format=3,
            mode="w" if overwrite else "w-",
        )

        commit = session.commit("Initialize empty datacube")
        logger.debug(f"Datacube created: {commit=}")

        self.post_create()

current_state

current_state() -> gpd.GeoDataFrame | None

Get info about currently stored tiles.

Returns:

  • GeoDataFrame | None

    gpd.GeoDataFrame: Tile info from pystac. None if datacube is empty.

Source code in src/smart_geocubes/accessors/stac.py
def current_state(self) -> gpd.GeoDataFrame | None:
    """Get info about currently stored tiles.

    Returns:
        gpd.GeoDataFrame: Tile info from pystac. None if datacube is empty.


    """
    import geopandas as gpd
    import pystac_client

    if not self.created:
        return None

    session = self.repo.readonly_session("main")
    zcube = zarr.open(session.store, mode="r")
    loaded_tiles = zcube.attrs.get("loaded_tiles", [])

    if len(loaded_tiles) == 0:
        return None

    catalog = pystac_client.Client.open(self.stac_api_url)
    search = catalog.search(collections=[self.collection], ids=loaded_tiles)
    stac_json = search.item_collection_as_dict()

    gdf = gpd.GeoDataFrame.from_features(stac_json, "epsg:4326")
    return gdf

download

download(roi: GeoBox | GeoDataFrame)

Download the data for the given region of interest which can be provided either as GeoBox or GeoDataFrame.

Parameters:

  • roi

    (GeoBox | GeoDataFrame) –

    The reference geobox or reference geodataframe to download the data for.

Raises:

  • ValueError

    If no adjacent tiles are found. This can happen if the geobox is out of the dataset bounds.

  • ValueError

    If no tries are left.

Source code in src/smart_geocubes/accessors/base.py
def download(self, roi: GeoBox | gpd.GeoDataFrame):
    """Download the data for the given region of interest which can be provided either as GeoBox or GeoDataFrame.

    Args:
        roi (GeoBox | gpd.GeoDataFrame): The reference geobox or reference geodataframe to download the data for.

    Raises:
        ValueError: If no adjacent tiles are found. This can happen if the geobox is out of the dataset bounds.
        ValueError: If no tries are left.

    """
    with self.stopuhr(f"Download of {roi=}"):
        adjacent_tiles = self.adjacent_tiles(roi)
        if not adjacent_tiles:
            logger.error(f"No adjacent tiles found: {adjacent_tiles=}")
            raise ValueError("No adjacent tiles found - is the provided geobox corrent?")

        session = self.repo.readonly_session("main")
        zcube = zarr.open(store=session.store, mode="r")
        loaded_tiles = zcube.attrs.get("loaded_tiles", [])
        new_tiles = [tile for tile in adjacent_tiles if tile.id not in loaded_tiles]
        logger.debug(f"{len(adjacent_tiles)=} & {len(loaded_tiles)=} -> {len(new_tiles)=} to download")
        if not new_tiles:
            return

        for tile in new_tiles:
            with self.stopuhr(f"{tile.id=}: Downloading one new tile in blocking mode"):
                logger.debug(f"{tile.id=}: Start downloading")
                tiledata = self.download_tile(tile)

            # Try to write the data to file until a limit is reached
            limit = 100
            for i in range(limit):
                try:
                    self._write_tile_to_zarr(tiledata, tile)
                    break
                except icechunk.ConflictError as conflict_error:
                    logger.debug(f"{tile.id=}: {conflict_error=} at retry {i}/{limit}")
            else:
                logger.error(
                    f"{tile.id=}: {limit} tries to write the tile failed. "
                    "Please check if the datacube is already created and not empty."
                )
                raise ValueError(f"{tile.id=}: {limit} tries to write the tile failed.")

download_tile

download_tile(tile: TileWrapper) -> xr.Dataset

Download a tile from a STAC API and write it to a zarr datacube.

Parameters:

Returns:

  • Dataset

    xr.Dataset: The downloaded tile data.

Source code in src/smart_geocubes/accessors/stac.py
def download_tile(self, tile: TileWrapper) -> xr.Dataset:
    """Download a tile from a STAC API and write it to a zarr datacube.

    Args:
        tile (TileWrapper): The tile to download and write.

    Returns:
        xr.Dataset: The downloaded tile data.

    """
    from odc.stac import stac_load

    tiledata = stac_load([tile.item], bands=self.channels, chunks=None, progress=None)

    # TODO: Allow for multi-temporal datacubes
    tiledata = tiledata.max("time")

    return tiledata

load

load(
    geobox: GeoBox,
    buffer: int = 0,
    persist: bool = True,
    create: bool = False,
    concurrency_mode: ConcurrencyModes = "blocking",
) -> xr.Dataset

Load the data for the given geobox.

Parameters:

  • geobox

    (GeoBox) –

    The reference geobox to load the data for.

  • buffer

    (int, default: 0 ) –

    The buffer around the projected geobox in pixels. Defaults to 0.

  • persist

    (bool, default: True ) –

    If the data should be persisted in memory. If not, this will return a Dask backed Dataset. Defaults to True.

  • create

    (bool, default: False ) –

    Create a new zarr array at defined storage if it not exists. This is not recommended, because it can have side effects in a multi-process environment. Defaults to False.

  • concurrency_mode

    (ConcurrencyModes, default: 'blocking' ) –

    The concurrency mode for the download. Defaults to "blocking".

Returns:

  • Dataset

    xr.Dataset: The loaded dataset in the same resolution and extent like the geobox.

Source code in src/smart_geocubes/accessors/base.py
def load(
    self,
    geobox: GeoBox,
    buffer: int = 0,
    persist: bool = True,
    create: bool = False,
    concurrency_mode: ConcurrencyModes = "blocking",
) -> xr.Dataset:
    """Load the data for the given geobox.

    Args:
        geobox (GeoBox): The reference geobox to load the data for.
        buffer (int, optional): The buffer around the projected geobox in pixels. Defaults to 0.
        persist (bool, optional): If the data should be persisted in memory.
            If not, this will return a Dask backed Dataset. Defaults to True.
        create (bool, optional): Create a new zarr array at defined storage if it not exists.
            This is not recommended, because it can have side effects in a multi-process environment.
            Defaults to False.
        concurrency_mode (ConcurrencyModes, optional): The concurrency mode for the download.
            Defaults to "blocking".

    Returns:
        xr.Dataset: The loaded dataset in the same resolution and extent like the geobox.

    """
    with self.stopuhr(f"{_geobox_repr(geobox)}: {self.title} tile {'loading' if persist else 'lazy-loading'}"):
        logger.debug(f"{_geobox_repr(geobox)}: {geobox.resolution} original resolution")

        # Create the datacube if it does not exist
        if create:
            try:
                self.create(overwrite=False)
            except FileExistsError:  # We are okay if the datacube already exists
                pass
        else:
            # Check if the datacube exists
            self.assert_created()

        # Download the adjacent tiles (if necessary)
        reference_geobox = geobox.to_crs(self.extent.crs, resolution=self.extent.resolution.x).pad(buffer)
        self.procedural_download(reference_geobox, concurrency_mode=concurrency_mode)

        # Load the datacube and set the spatial_ref since it is set as a coordinate within the zarr format
        session = self.repo.readonly_session("main")
        chunks = None if persist else "auto"
        xrcube = xr.open_zarr(
            session.store,
            mask_and_scale=False,
            chunks=chunks,
            consolidated=False,
        ).set_coords("spatial_ref")

        # Get an AOI slice of the datacube
        xrcube_aoi = xrcube.odc.crop(reference_geobox.extent, apply_mask=False)

        # The following code would load the lazy zarr data from disk into memory
        if persist:
            with self.stopuhr(f"{_geobox_repr(geobox)}: {self.title} AOI loading from disk"):
                xrcube_aoi = xrcube_aoi.load()
    return xrcube_aoi

load_like

load_like(
    ref: Dataset | DataArray, **kwargs: Unpack[LoadParams]
) -> xr.Dataset

Load the data for the given geobox.

Parameters:

  • ref

    (Dataset | DataArray) –

    The reference dataarray or dataset to load the data for.

  • **kwargs

    (Unpack[LoadParams], default: {} ) –

    The load parameters (buffer, persist, create, concurrency_mode).

Other Parameters:

  • buffer (int) –

    The buffer around the projected geobox in pixels. Defaults to 0.

  • persist (bool) –

    If the data should be persisted in memory. If not, this will return a Dask backed Dataset. Defaults to True.

  • create (bool) –

    Create a new zarr array at defined storage if it not exists. This is not recommended, because it can have side effects in a multi-process environment. Defaults to False.

  • concurrency_mode (ConcurrencyModes) –

    The concurrency mode for the download. Defaults to "blocking".

Returns:

  • Dataset

    xr.Dataset: The loaded dataset in the same resolution and extent like the geobox.

Source code in src/smart_geocubes/accessors/base.py
def load_like(
    self,
    ref: xr.Dataset | xr.DataArray,
    **kwargs: Unpack[LoadParams],
) -> xr.Dataset:
    """Load the data for the given geobox.

    Args:
        ref (xr.Dataset | xr.DataArray): The reference dataarray or dataset to load the data for.
        **kwargs: The load parameters (buffer, persist, create, concurrency_mode).

    Keyword Args:
        buffer (int, optional): The buffer around the projected geobox in pixels. Defaults to 0.
        persist (bool, optional): If the data should be persisted in memory.
            If not, this will return a Dask backed Dataset. Defaults to True.
        create (bool, optional): Create a new zarr array at defined storage if it not exists.
            This is not recommended, because it can have side effects in a multi-process environment.
            Defaults to False.
        concurrency_mode (ConcurrencyModes, optional): The concurrency mode for the download.
            Defaults to "blocking".

    Returns:
        xr.Dataset: The loaded dataset in the same resolution and extent like the geobox.

    """
    return self.load(_geobox_repr(ref.geobox), **kwargs)

log_benchmark_summary

log_benchmark_summary()

Log the benchmark summary.

Source code in src/smart_geocubes/accessors/base.py
def log_benchmark_summary(self):
    """Log the benchmark summary."""
    self.stopuhr.summary()

open_xarray

open_xarray() -> xr.Dataset

Open the xarray datacube in read-only mode.

Returns:

  • Dataset

    xr.Dataset: The xarray datacube.

Source code in src/smart_geocubes/accessors/base.py
def open_xarray(self) -> xr.Dataset:
    """Open the xarray datacube in read-only mode.

    Returns:
        xr.Dataset: The xarray datacube.

    """
    self.assert_created()
    session = self.repo.readonly_session("main")
    xcube = xr.open_zarr(session.store, mask_and_scale=False, consolidated=False).set_coords("spatial_ref")
    return xcube

open_zarr

open_zarr() -> zarr.Group

Open the zarr datacube in read-only mode.

Returns:

  • Group

    zarr.Group: The zarr datacube.

Source code in src/smart_geocubes/accessors/base.py
def open_zarr(self) -> zarr.Group:
    """Open the zarr datacube in read-only mode.

    Returns:
        zarr.Group: The zarr datacube.

    """
    self.assert_created()
    session = self.repo.readonly_session("main")
    zcube = zarr.open(store=session.store, mode="r")
    return zcube

post_create

post_create()

Download the ArcticDEM mosaic extent info and store it in the datacube.

Source code in src/smart_geocubes/datasets/arcticdem.py
def post_create(self):
    """Download the ArcticDEM mosaic extent info and store it in the datacube."""
    _download_arcticdem_extent(self._aux_dir)

procedural_download

procedural_download(
    geobox: GeoBox,
    concurrency_mode: ConcurrencyModes = "blocking",
)

Download the data for the given geobox.

Note

The "threading" concurrency mode requires Python 3.13 or higher.

Parameters:

  • geobox

    (GeoBox) –

    The reference geobox to download the data for.

  • concurrency_mode

    (ConcurrencyModes, default: 'blocking' ) –

    The concurrency mode for the download. Defaults to "blocking".

Raises:

  • ValueError

    If an unknown concurrency mode is provided.

Source code in src/smart_geocubes/accessors/base.py
def procedural_download(self, geobox: GeoBox, concurrency_mode: ConcurrencyModes = "blocking"):
    """Download the data for the given geobox.

    Note:
        The "threading" concurrency mode requires Python 3.13 or higher.

    Args:
        geobox (GeoBox): The reference geobox to download the data for.
        concurrency_mode (ConcurrencyModes, optional): The concurrency mode for the download.
            Defaults to "blocking".

    Raises:
        ValueError: If an unknown concurrency mode is provided.

    """
    self.assert_created()
    if concurrency_mode == "blocking":
        self.procedural_download_blocking(geobox)
    elif concurrency_mode == "threading":
        raise ValueError("Threading mode is currently disabled. Use 'blocking' instead.")
        # self.procedural_download_threading(geobox)
    else:
        raise ValueError(f"Unknown concurrency mode {concurrency_mode}")

procedural_download_blocking

procedural_download_blocking(geobox: GeoBox)

Download tiles procedurally in blocking mode.

Warning

This method is meant for single-process use, but can (in theory) be used in a multi-process environment. However, in a multi-process environment it can happen that multiple processes try to write concurrently, which results in a conflict. In such cases, the download will be retried until it succeeds or the number of maximum-tries is reached.

Parameters:

  • geobox

    (GeoBox) –

    The geobox of the aoi to download.

Raises:

  • ValueError

    If no adjacent tiles are found. This can happen if the geobox is out of the dataset bounds.

  • ValueError

    If no tries are left.

Source code in src/smart_geocubes/accessors/base.py
def procedural_download_blocking(self, geobox: GeoBox):
    """Download tiles procedurally in blocking mode.

    Warning:
        This method is meant for single-process use, but can (in theory) be used in a multi-process environment.
        However, in a multi-process environment it can happen that multiple processes try to write concurrently,
        which results in a conflict.
        In such cases, the download will be retried until it succeeds or the number of maximum-tries is reached.

    Args:
        geobox (GeoBox): The geobox of the aoi to download.

    Raises:
        ValueError: If no adjacent tiles are found. This can happen if the geobox is out of the dataset bounds.
        ValueError: If no tries are left.

    """
    with self.stopuhr(f"{_geobox_repr(geobox)}: Procedural download in blocking mode"):
        adjacent_tiles = self.adjacent_tiles(geobox)
        if not adjacent_tiles:
            logger.error(f"{_geobox_repr(geobox)}: No adjacent tiles found: {adjacent_tiles=}")
            raise ValueError("No adjacent tiles found - is the provided geobox corrent?")

        session = self.repo.readonly_session("main")
        zcube = zarr.open(store=session.store, mode="r")
        loaded_tiles = zcube.attrs.get("loaded_tiles", [])
        new_tiles = [tile for tile in adjacent_tiles if tile.id not in loaded_tiles]
        logger.debug(
            f"{_geobox_repr(geobox)}:  {len(adjacent_tiles)=} & {len(loaded_tiles)=}"
            f" -> {len(new_tiles)=} to download"
        )
        if not new_tiles:
            return

        for tile in new_tiles:
            with self.stopuhr(f"{tile.id=}: Downloading one new tile in blocking mode"):
                logger.debug(f"{tile.id=}: Start downloading")
                tiledata = self.download_tile(tile)

            # Try to write the data to file until a limit is reached
            limit = 100
            for i in range(limit):
                try:
                    self._write_tile_to_zarr(tiledata, tile)
                    break
                except icechunk.ConflictError as conflict_error:
                    logger.debug(f"{tile.id=}: {conflict_error=} at retry {i}/{limit}")
            else:
                logger.error(
                    f"{tile.id=}: {limit} tries to write the tile failed. "
                    "Please check if the datacube is already created and not empty."
                )
                raise ValueError(f"{tile.id=}: {limit} tries to write the tile failed.")

procedural_download_threading

procedural_download_threading(geobox: GeoBox)

Download tiles procedurally in threading mode.

Note

This method ensures that only a single download is running at a time. It uses a SetQueue to prevent duplicate downloads. The threading mode requires Python 3.13 or higher.

Parameters:

  • geobox

    (GeoBox) –

    The geobox of the aoi to download.

Raises:

  • ValueError

    If no adjacent tiles are found. This can happen if the geobox is out of the dataset bounds.

  • RuntimeError

    If the Python version is lower than 3.13.

Source code in src/smart_geocubes/accessors/base.py
def procedural_download_threading(self, geobox: GeoBox):
    """Download tiles procedurally in threading mode.

    Note:
        This method ensures that only a single download is running at a time.
        It uses a SetQueue to prevent duplicate downloads.
        The threading mode requires Python 3.13 or higher.

    Args:
        geobox (GeoBox): The geobox of the aoi to download.

    Raises:
        ValueError: If no adjacent tiles are found. This can happen if the geobox is out of the dataset bounds.
        RuntimeError: If the Python version is lower than 3.13.

    """
    if not _check_python_version(3, 13):
        raise RuntimeError("Threading mode requires Python 3.13 or higher")
    with self._threading_handler:
        adjacent_tiles = self.adjacent_tiles(geobox)
        if not adjacent_tiles:
            logger.error(f"{_geobox_repr(geobox)}: No adjacent tiles found: {adjacent_tiles=}")
            raise ValueError("No adjacent tiles found - is the provided geobox corrent?")

        # Wait until all new_items are loaded
        prev_len = None
        while True:
            session = self.repo.readonly_session("main")
            zcube = zarr.open(store=session.store, mode="r")
            loaded_tiles = zcube.attrs.get("loaded_tiles", [])
            new_tiles = [tile for tile in adjacent_tiles if tile.id not in loaded_tiles]
            done_tiles = [tile for tile in adjacent_tiles if tile.id in loaded_tiles]
            if not new_tiles:
                break
            if prev_len != len(new_tiles):
                logger.debug(
                    f"{_geobox_repr(geobox)}: {len(done_tiles)} of {len(adjacent_tiles)} downloaded."
                    f" Missing: {[t.id for t in new_tiles]} Done: {[t.id for t in done_tiles]}"
                )
            for tile in new_tiles:
                self._threading_handler._queue.put(tile)
            prev_len = len(new_tiles)
            time.sleep(5)

visualize_state

visualize_state(
    ax: Axes | None = None,
) -> plt.Figure | plt.Axes

Visulize the extend, hence the already downloaded and filled data, of the datacube.

Parameters:

  • ax

    (Axes | None, default: None ) –

    The axes drawn to. If None, will create a new figure and axes.

Returns:

  • Figure | Axes

    plt.Figure | plt.Axes: The figure with the visualization if no axes was provided, else the axes.

Raises:

Source code in src/smart_geocubes/datasets/arcticdem.py
def visualize_state(self, ax: "plt.Axes | None" = None) -> "plt.Figure | plt.Axes":
    """Visulize the extend, hence the already downloaded and filled data, of the datacube.

    Args:
        ax (plt.Axes | None): The axes drawn to. If None, will create a new figure and axes.

    Returns:
        plt.Figure | plt.Axes: The figure with the visualization if no axes was provided, else the axes.

    Raises:
        ValueError: If the datacube is empty

    """
    import cartopy.crs as ccrs
    import cartopy.feature as cfeature
    import matplotlib.path as mpath
    import matplotlib.pyplot as plt

    tile_info = self.current_state()

    if tile_info is None:
        raise ValueError("Datacube is not created or loaded yet. Can't visualize!")

    # Define the projection
    projection = ccrs.Stereographic(central_latitude=90, central_longitude=-45, true_scale_latitude=70)

    # Create a figure
    fig = None
    if ax is None:
        fig, ax = plt.subplots(figsize=(10, 10), subplot_kw={"projection": projection})

    # Set the extent to focus on the North Pole
    ax.set_extent([-180, 180, 50, 90], crs=ccrs.PlateCarree())

    # Add features
    ax.add_feature(cfeature.LAND, zorder=0, edgecolor="black", facecolor="white")
    ax.add_feature(cfeature.OCEAN, zorder=0, facecolor="lightgrey")
    ax.add_feature(cfeature.COASTLINE)
    ax.add_feature(cfeature.BORDERS, linestyle=":")
    ax.add_feature(cfeature.LAKES, alpha=0.5)
    ax.add_feature(cfeature.RIVERS)

    # Add gridlines
    gl = ax.gridlines(draw_labels=True)
    gl.top_labels = False
    gl.right_labels = False

    # Compute a circle in axes coordinates, which we can use as a boundary
    # for the map. We can pan/zoom as much as we like - the boundary will be
    # permanently circular.
    theta = np.linspace(0, 2 * np.pi, 100)
    center, radius = [0.5, 0.5], 0.5
    verts = np.vstack([np.sin(theta), np.cos(theta)]).T
    circle = mpath.Path(verts * radius + center)

    ax.set_boundary(circle, transform=ax.transAxes)

    tile_info.plot(
        "title",
        ax=ax,
        transform=ccrs.PlateCarree(),
        edgecolor="black",
        categorical=True,
        aspect="equal",
        alpha=0.5,
    )

    if fig is not None:
        return fig
    else:
        return ax

ArcticDEM32m

ArcticDEM32m(
    storage: Storage | Path | str,
    create_icechunk_storage: bool = True,
)

Bases: ArcticDEMABC

Accessor for ArcticDEM 32m data.

Attributes:

  • extent (GeoBox) –

    The extent of the datacube represented by a GeoBox.

  • chunk_size (int) –

    The chunk size of the datacube.

  • channels (list) –

    The channels of the datacube.

  • storage (Storage) –

    The icechunk storage.

  • repo (Repository) –

    The icechunk repository.

  • title (str) –

    The title of the datacube.

  • stopuhr (StopUhr) –

    The benchmarking timer from the stopuhr library.

  • zgeobox (GeoBox) –

    The geobox of the underlaying zarr array. Should be equal to the extent geobox. However, this property is used to find the target index of the downloaded data, so better save than sorry.

  • created (bool) –

    True if the datacube already exists in the storage.

Initialize base class for remote accessors.

Warning

In a multiprocessing environment, it is strongly recommended to not set create_icechunk_storage=False.

Parameters:

  • storage

    (Storage) –

    The icechunk storage of the datacube.

  • create_icechunk_storage

    (bool, default: True ) –

    If an icechunk repository should be created at provided storage if no exists. This should be disabled in a multiprocessing environment. Defaults to True.

Raises:

  • ValueError

    If the storage is not an icechunk.Storage.

Methods:

  • adjacent_tiles

    Get adjacent tiles from a STAC API.

  • assert_created

    Assert that the datacube exists in the storage.

  • create

    Create an empty datacube and write it to the store.

  • current_state

    Get info about currently stored tiles.

  • download

    Download the data for the given region of interest which can be provided either as GeoBox or GeoDataFrame.

  • download_tile

    Download a tile from a STAC API and write it to a zarr datacube.

  • load

    Load the data for the given geobox.

  • load_like

    Load the data for the given geobox.

  • log_benchmark_summary

    Log the benchmark summary.

  • open_xarray

    Open the xarray datacube in read-only mode.

  • open_zarr

    Open the zarr datacube in read-only mode.

  • post_create

    Download the ArcticDEM mosaic extent info and store it in the datacube.

  • procedural_download

    Download the data for the given geobox.

  • procedural_download_blocking

    Download tiles procedurally in blocking mode.

  • procedural_download_threading

    Download tiles procedurally in threading mode.

  • visualize_state

    Visulize the extend, hence the already downloaded and filled data, of the datacube.

Source code in src/smart_geocubes/accessors/base.py
def __init__(
    self,
    storage: icechunk.Storage | Path | str,
    create_icechunk_storage: bool = True,
):
    """Initialize base class for remote accessors.

    !!! warning

        In a multiprocessing environment, it is strongly recommended to not set `create_icechunk_storage=False`.

    Args:
        storage (icechunk.Storage): The icechunk storage of the datacube.
        create_icechunk_storage (bool, optional): If an icechunk repository should be created at provided storage
            if no exists.
            This should be disabled in a multiprocessing environment.
            Defaults to True.

    Raises:
        ValueError: If the storage is not an icechunk.Storage.

    """
    # Title is used for logging, debugging and as a default name for the datacube
    self.title = self.__class__.__name__

    if isinstance(storage, (str | Path)):
        storage = storage if isinstance(storage, str) else str(storage.resolve())
        storage = icechunk.local_filesystem_storage(storage)
    if not isinstance(storage, icechunk.Storage):
        raise ValueError(f"Expected an icechunk.Storage, but got {type(storage)}")
    self.storage = storage
    logger.debug(f"Using storage {storage=}")
    if create_icechunk_storage:
        self.repo = icechunk.Repository.open_or_create(storage)  # Will create a "main" branch
    else:
        self.repo = icechunk.Repository.open(storage)
    logger.debug(f"Using repository {self.repo=}")

    # The benchmarking timer for this accessor
    self.stopuhr = StopUhr(logger.debug)

    # The TypeVar used by the ThreadingHandler was added in 3.12
    # The Shutdown method of the queue was added in 3.13
    # Hence, we don't want to import the module unless Python 3.13 is installed
    if _check_python_version(3, 13):
        from smart_geocubes._concurrency.threading import ThreadingHandler

        self._threading_handler = ThreadingHandler(self._threading_download)

adjacent_tiles

adjacent_tiles(
    roi: GeoBox | GeoDataFrame,
) -> list[TileWrapper]

Get adjacent tiles from a STAC API.

Overwrite the default implementation from the STAC accessor to use pre-downloaded extent files instead of querying the STAC API. This results in a faster loading time, but requires the extent files to be downloaded beforehand. This is done in the post_create step.

Parameters:

  • roi

    (GeoBox | GeoDataFrame) –

    The reference geobox or reference geodataframe

Returns:

  • list[TileWrapper]

    list[TileWrapper]: List of adjacent tiles, wrapped in own datastructure for easier processing.

Source code in src/smart_geocubes/datasets/arcticdem.py
def adjacent_tiles(self, roi: GeoBox | gpd.GeoDataFrame) -> list[TileWrapper]:
    """Get adjacent tiles from a STAC API.

    Overwrite the default implementation from the STAC accessor
    to use pre-downloaded extent files instead of querying the STAC API.
    This results in a faster loading time, but requires the extent files to be downloaded beforehand.
    This is done in the post_create step.

    Args:
        roi (GeoBox | gpd.GeoDataFrame): The reference geobox or reference geodataframe

    Returns:
        list[TileWrapper]: List of adjacent tiles, wrapped in own datastructure for easier processing.

    """
    # Assumes that the extent files are already present and the datacube is already created
    self.assert_created()

    resolution = f"{int(self.extent.resolution.x)}m"
    extent_info = gpd.read_parquet(self._aux_dir / f"ArcticDEM_Mosaic_Index_v4_1_{resolution}.parquet")
    if isinstance(roi, gpd.GeoDataFrame):
        adjacent_tiles = (
            gpd.sjoin(
                extent_info,
                roi.to_crs(self.extent.crs.wkt),
                how="inner",
                predicate="intersects",
            )
            .reset_index()
            .drop_duplicates(subset="index", keep="first")
            .set_index("index")
        )
    elif isinstance(roi, GeoBox):
        adjacent_tiles = extent_info.loc[extent_info.intersects(roi.boundingbox.polygon.geom)].copy()
    if adjacent_tiles.empty:
        return []
    return [
        LazyStacTileWrapper(tile.dem_id, _get_stac_url(tile.dem_id, resolution))
        for tile in adjacent_tiles.itertuples()
    ]

assert_created

assert_created()

Assert that the datacube exists in the storage.

Raises:

Source code in src/smart_geocubes/accessors/base.py
def assert_created(self):
    """Assert that the datacube exists in the storage.

    Raises:
        FileNotFoundError: If the datacube does not exist.

    """
    if not self.created:
        msg = f"Datacube {self.title} does not exist."
        " Please use the `create` method or pass `create=True` to `load`."
        logger.error(msg)
        raise FileNotFoundError(msg)

create

create(overwrite: bool = False)

Create an empty datacube and write it to the store.

Parameters:

  • overwrite

    (bool, default: False ) –

    Allowing overwriting an existing datacube. Defaults to False.

Raises:

Source code in src/smart_geocubes/accessors/base.py
def create(self, overwrite: bool = False):
    """Create an empty datacube and write it to the store.

    Args:
        overwrite (bool, optional): Allowing overwriting an existing datacube. Defaults to False.

    Raises:
        FileExistsError: If a datacube already exists at location

    """
    with self.stopuhr("Empty datacube creation"):
        # Check if the zarr data already exists
        session = self.repo.writable_session("main")
        cube_is_empty = sync(session.store.is_empty(""))
        if not overwrite and not cube_is_empty:
            logger.debug(f"Unable to create a new datacube. {overwrite=} {cube_is_empty=} {session.store=}")
            raise FileExistsError(f"Cannot create a new  datacube. {session.store=} is not empty!")

        logger.debug(
            f"Creating an empty zarr datacube '{self.title}' with the variables"
            f" {self.channels} at a {self.extent.resolution=} (epsg:{self.extent.crs.epsg})"
            f" and {self.chunk_size=} to {session.store=}"
        )

        ds = xr.Dataset(
            {
                name: odc.geo.xr.xr_zeros(
                    self.extent,
                    chunks=-1,
                    dtype=self._channels_encoding[name].get("dtype", "float32"),
                    always_yx=True,
                )
                for name in self.channels
            },
            attrs={"title": self.title, "loaded_tiles": []},
        )

        # Add metadata
        for name, meta in self._channels_meta.items():
            ds[name].attrs.update(meta)

        # Get the encoding for the coordinates, variables and spatial reference
        coords_encoding = {
            "x": {"chunks": ds.x.shape, **optimize_coord_encoding(ds.x.values, self.extent.resolution.x)},
            "y": {"chunks": ds.y.shape, **optimize_coord_encoding(ds.y.values, self.extent.resolution.y)},
        }
        var_encoding = {
            name: {
                "chunks": (self.chunk_size, self.chunk_size),
                "compressors": [BloscCodec(clevel=9)],
                **self._channels_encoding[name],
            }
            for name in self.channels
        }
        encoding = {
            "spatial_ref": {"chunks": None, "dtype": "int32"},
            **coords_encoding,
            **var_encoding,
        }
        logger.debug(f"Datacube {encoding=}")

        ds.to_zarr(
            session.store,
            encoding=encoding,
            compute=False,
            consolidated=False,
            zarr_format=3,
            mode="w" if overwrite else "w-",
        )

        commit = session.commit("Initialize empty datacube")
        logger.debug(f"Datacube created: {commit=}")

        self.post_create()

current_state

current_state() -> gpd.GeoDataFrame | None

Get info about currently stored tiles.

Returns:

  • GeoDataFrame | None

    gpd.GeoDataFrame: Tile info from pystac. None if datacube is empty.

Source code in src/smart_geocubes/accessors/stac.py
def current_state(self) -> gpd.GeoDataFrame | None:
    """Get info about currently stored tiles.

    Returns:
        gpd.GeoDataFrame: Tile info from pystac. None if datacube is empty.


    """
    import geopandas as gpd
    import pystac_client

    if not self.created:
        return None

    session = self.repo.readonly_session("main")
    zcube = zarr.open(session.store, mode="r")
    loaded_tiles = zcube.attrs.get("loaded_tiles", [])

    if len(loaded_tiles) == 0:
        return None

    catalog = pystac_client.Client.open(self.stac_api_url)
    search = catalog.search(collections=[self.collection], ids=loaded_tiles)
    stac_json = search.item_collection_as_dict()

    gdf = gpd.GeoDataFrame.from_features(stac_json, "epsg:4326")
    return gdf

download

download(roi: GeoBox | GeoDataFrame)

Download the data for the given region of interest which can be provided either as GeoBox or GeoDataFrame.

Parameters:

  • roi

    (GeoBox | GeoDataFrame) –

    The reference geobox or reference geodataframe to download the data for.

Raises:

  • ValueError

    If no adjacent tiles are found. This can happen if the geobox is out of the dataset bounds.

  • ValueError

    If no tries are left.

Source code in src/smart_geocubes/accessors/base.py
def download(self, roi: GeoBox | gpd.GeoDataFrame):
    """Download the data for the given region of interest which can be provided either as GeoBox or GeoDataFrame.

    Args:
        roi (GeoBox | gpd.GeoDataFrame): The reference geobox or reference geodataframe to download the data for.

    Raises:
        ValueError: If no adjacent tiles are found. This can happen if the geobox is out of the dataset bounds.
        ValueError: If no tries are left.

    """
    with self.stopuhr(f"Download of {roi=}"):
        adjacent_tiles = self.adjacent_tiles(roi)
        if not adjacent_tiles:
            logger.error(f"No adjacent tiles found: {adjacent_tiles=}")
            raise ValueError("No adjacent tiles found - is the provided geobox corrent?")

        session = self.repo.readonly_session("main")
        zcube = zarr.open(store=session.store, mode="r")
        loaded_tiles = zcube.attrs.get("loaded_tiles", [])
        new_tiles = [tile for tile in adjacent_tiles if tile.id not in loaded_tiles]
        logger.debug(f"{len(adjacent_tiles)=} & {len(loaded_tiles)=} -> {len(new_tiles)=} to download")
        if not new_tiles:
            return

        for tile in new_tiles:
            with self.stopuhr(f"{tile.id=}: Downloading one new tile in blocking mode"):
                logger.debug(f"{tile.id=}: Start downloading")
                tiledata = self.download_tile(tile)

            # Try to write the data to file until a limit is reached
            limit = 100
            for i in range(limit):
                try:
                    self._write_tile_to_zarr(tiledata, tile)
                    break
                except icechunk.ConflictError as conflict_error:
                    logger.debug(f"{tile.id=}: {conflict_error=} at retry {i}/{limit}")
            else:
                logger.error(
                    f"{tile.id=}: {limit} tries to write the tile failed. "
                    "Please check if the datacube is already created and not empty."
                )
                raise ValueError(f"{tile.id=}: {limit} tries to write the tile failed.")

download_tile

download_tile(tile: TileWrapper) -> xr.Dataset

Download a tile from a STAC API and write it to a zarr datacube.

Parameters:

Returns:

  • Dataset

    xr.Dataset: The downloaded tile data.

Source code in src/smart_geocubes/accessors/stac.py
def download_tile(self, tile: TileWrapper) -> xr.Dataset:
    """Download a tile from a STAC API and write it to a zarr datacube.

    Args:
        tile (TileWrapper): The tile to download and write.

    Returns:
        xr.Dataset: The downloaded tile data.

    """
    from odc.stac import stac_load

    tiledata = stac_load([tile.item], bands=self.channels, chunks=None, progress=None)

    # TODO: Allow for multi-temporal datacubes
    tiledata = tiledata.max("time")

    return tiledata

load

load(
    geobox: GeoBox,
    buffer: int = 0,
    persist: bool = True,
    create: bool = False,
    concurrency_mode: ConcurrencyModes = "blocking",
) -> xr.Dataset

Load the data for the given geobox.

Parameters:

  • geobox

    (GeoBox) –

    The reference geobox to load the data for.

  • buffer

    (int, default: 0 ) –

    The buffer around the projected geobox in pixels. Defaults to 0.

  • persist

    (bool, default: True ) –

    If the data should be persisted in memory. If not, this will return a Dask backed Dataset. Defaults to True.

  • create

    (bool, default: False ) –

    Create a new zarr array at defined storage if it not exists. This is not recommended, because it can have side effects in a multi-process environment. Defaults to False.

  • concurrency_mode

    (ConcurrencyModes, default: 'blocking' ) –

    The concurrency mode for the download. Defaults to "blocking".

Returns:

  • Dataset

    xr.Dataset: The loaded dataset in the same resolution and extent like the geobox.

Source code in src/smart_geocubes/accessors/base.py
def load(
    self,
    geobox: GeoBox,
    buffer: int = 0,
    persist: bool = True,
    create: bool = False,
    concurrency_mode: ConcurrencyModes = "blocking",
) -> xr.Dataset:
    """Load the data for the given geobox.

    Args:
        geobox (GeoBox): The reference geobox to load the data for.
        buffer (int, optional): The buffer around the projected geobox in pixels. Defaults to 0.
        persist (bool, optional): If the data should be persisted in memory.
            If not, this will return a Dask backed Dataset. Defaults to True.
        create (bool, optional): Create a new zarr array at defined storage if it not exists.
            This is not recommended, because it can have side effects in a multi-process environment.
            Defaults to False.
        concurrency_mode (ConcurrencyModes, optional): The concurrency mode for the download.
            Defaults to "blocking".

    Returns:
        xr.Dataset: The loaded dataset in the same resolution and extent like the geobox.

    """
    with self.stopuhr(f"{_geobox_repr(geobox)}: {self.title} tile {'loading' if persist else 'lazy-loading'}"):
        logger.debug(f"{_geobox_repr(geobox)}: {geobox.resolution} original resolution")

        # Create the datacube if it does not exist
        if create:
            try:
                self.create(overwrite=False)
            except FileExistsError:  # We are okay if the datacube already exists
                pass
        else:
            # Check if the datacube exists
            self.assert_created()

        # Download the adjacent tiles (if necessary)
        reference_geobox = geobox.to_crs(self.extent.crs, resolution=self.extent.resolution.x).pad(buffer)
        self.procedural_download(reference_geobox, concurrency_mode=concurrency_mode)

        # Load the datacube and set the spatial_ref since it is set as a coordinate within the zarr format
        session = self.repo.readonly_session("main")
        chunks = None if persist else "auto"
        xrcube = xr.open_zarr(
            session.store,
            mask_and_scale=False,
            chunks=chunks,
            consolidated=False,
        ).set_coords("spatial_ref")

        # Get an AOI slice of the datacube
        xrcube_aoi = xrcube.odc.crop(reference_geobox.extent, apply_mask=False)

        # The following code would load the lazy zarr data from disk into memory
        if persist:
            with self.stopuhr(f"{_geobox_repr(geobox)}: {self.title} AOI loading from disk"):
                xrcube_aoi = xrcube_aoi.load()
    return xrcube_aoi

load_like

load_like(
    ref: Dataset | DataArray, **kwargs: Unpack[LoadParams]
) -> xr.Dataset

Load the data for the given geobox.

Parameters:

  • ref

    (Dataset | DataArray) –

    The reference dataarray or dataset to load the data for.

  • **kwargs

    (Unpack[LoadParams], default: {} ) –

    The load parameters (buffer, persist, create, concurrency_mode).

Other Parameters:

  • buffer (int) –

    The buffer around the projected geobox in pixels. Defaults to 0.

  • persist (bool) –

    If the data should be persisted in memory. If not, this will return a Dask backed Dataset. Defaults to True.

  • create (bool) –

    Create a new zarr array at defined storage if it not exists. This is not recommended, because it can have side effects in a multi-process environment. Defaults to False.

  • concurrency_mode (ConcurrencyModes) –

    The concurrency mode for the download. Defaults to "blocking".

Returns:

  • Dataset

    xr.Dataset: The loaded dataset in the same resolution and extent like the geobox.

Source code in src/smart_geocubes/accessors/base.py
def load_like(
    self,
    ref: xr.Dataset | xr.DataArray,
    **kwargs: Unpack[LoadParams],
) -> xr.Dataset:
    """Load the data for the given geobox.

    Args:
        ref (xr.Dataset | xr.DataArray): The reference dataarray or dataset to load the data for.
        **kwargs: The load parameters (buffer, persist, create, concurrency_mode).

    Keyword Args:
        buffer (int, optional): The buffer around the projected geobox in pixels. Defaults to 0.
        persist (bool, optional): If the data should be persisted in memory.
            If not, this will return a Dask backed Dataset. Defaults to True.
        create (bool, optional): Create a new zarr array at defined storage if it not exists.
            This is not recommended, because it can have side effects in a multi-process environment.
            Defaults to False.
        concurrency_mode (ConcurrencyModes, optional): The concurrency mode for the download.
            Defaults to "blocking".

    Returns:
        xr.Dataset: The loaded dataset in the same resolution and extent like the geobox.

    """
    return self.load(_geobox_repr(ref.geobox), **kwargs)

log_benchmark_summary

log_benchmark_summary()

Log the benchmark summary.

Source code in src/smart_geocubes/accessors/base.py
def log_benchmark_summary(self):
    """Log the benchmark summary."""
    self.stopuhr.summary()

open_xarray

open_xarray() -> xr.Dataset

Open the xarray datacube in read-only mode.

Returns:

  • Dataset

    xr.Dataset: The xarray datacube.

Source code in src/smart_geocubes/accessors/base.py
def open_xarray(self) -> xr.Dataset:
    """Open the xarray datacube in read-only mode.

    Returns:
        xr.Dataset: The xarray datacube.

    """
    self.assert_created()
    session = self.repo.readonly_session("main")
    xcube = xr.open_zarr(session.store, mask_and_scale=False, consolidated=False).set_coords("spatial_ref")
    return xcube

open_zarr

open_zarr() -> zarr.Group

Open the zarr datacube in read-only mode.

Returns:

  • Group

    zarr.Group: The zarr datacube.

Source code in src/smart_geocubes/accessors/base.py
def open_zarr(self) -> zarr.Group:
    """Open the zarr datacube in read-only mode.

    Returns:
        zarr.Group: The zarr datacube.

    """
    self.assert_created()
    session = self.repo.readonly_session("main")
    zcube = zarr.open(store=session.store, mode="r")
    return zcube

post_create

post_create()

Download the ArcticDEM mosaic extent info and store it in the datacube.

Source code in src/smart_geocubes/datasets/arcticdem.py
def post_create(self):
    """Download the ArcticDEM mosaic extent info and store it in the datacube."""
    _download_arcticdem_extent(self._aux_dir)

procedural_download

procedural_download(
    geobox: GeoBox,
    concurrency_mode: ConcurrencyModes = "blocking",
)

Download the data for the given geobox.

Note

The "threading" concurrency mode requires Python 3.13 or higher.

Parameters:

  • geobox

    (GeoBox) –

    The reference geobox to download the data for.

  • concurrency_mode

    (ConcurrencyModes, default: 'blocking' ) –

    The concurrency mode for the download. Defaults to "blocking".

Raises:

  • ValueError

    If an unknown concurrency mode is provided.

Source code in src/smart_geocubes/accessors/base.py
def procedural_download(self, geobox: GeoBox, concurrency_mode: ConcurrencyModes = "blocking"):
    """Download the data for the given geobox.

    Note:
        The "threading" concurrency mode requires Python 3.13 or higher.

    Args:
        geobox (GeoBox): The reference geobox to download the data for.
        concurrency_mode (ConcurrencyModes, optional): The concurrency mode for the download.
            Defaults to "blocking".

    Raises:
        ValueError: If an unknown concurrency mode is provided.

    """
    self.assert_created()
    if concurrency_mode == "blocking":
        self.procedural_download_blocking(geobox)
    elif concurrency_mode == "threading":
        raise ValueError("Threading mode is currently disabled. Use 'blocking' instead.")
        # self.procedural_download_threading(geobox)
    else:
        raise ValueError(f"Unknown concurrency mode {concurrency_mode}")

procedural_download_blocking

procedural_download_blocking(geobox: GeoBox)

Download tiles procedurally in blocking mode.

Warning

This method is meant for single-process use, but can (in theory) be used in a multi-process environment. However, in a multi-process environment it can happen that multiple processes try to write concurrently, which results in a conflict. In such cases, the download will be retried until it succeeds or the number of maximum-tries is reached.

Parameters:

  • geobox

    (GeoBox) –

    The geobox of the aoi to download.

Raises:

  • ValueError

    If no adjacent tiles are found. This can happen if the geobox is out of the dataset bounds.

  • ValueError

    If no tries are left.

Source code in src/smart_geocubes/accessors/base.py
def procedural_download_blocking(self, geobox: GeoBox):
    """Download tiles procedurally in blocking mode.

    Warning:
        This method is meant for single-process use, but can (in theory) be used in a multi-process environment.
        However, in a multi-process environment it can happen that multiple processes try to write concurrently,
        which results in a conflict.
        In such cases, the download will be retried until it succeeds or the number of maximum-tries is reached.

    Args:
        geobox (GeoBox): The geobox of the aoi to download.

    Raises:
        ValueError: If no adjacent tiles are found. This can happen if the geobox is out of the dataset bounds.
        ValueError: If no tries are left.

    """
    with self.stopuhr(f"{_geobox_repr(geobox)}: Procedural download in blocking mode"):
        adjacent_tiles = self.adjacent_tiles(geobox)
        if not adjacent_tiles:
            logger.error(f"{_geobox_repr(geobox)}: No adjacent tiles found: {adjacent_tiles=}")
            raise ValueError("No adjacent tiles found - is the provided geobox corrent?")

        session = self.repo.readonly_session("main")
        zcube = zarr.open(store=session.store, mode="r")
        loaded_tiles = zcube.attrs.get("loaded_tiles", [])
        new_tiles = [tile for tile in adjacent_tiles if tile.id not in loaded_tiles]
        logger.debug(
            f"{_geobox_repr(geobox)}:  {len(adjacent_tiles)=} & {len(loaded_tiles)=}"
            f" -> {len(new_tiles)=} to download"
        )
        if not new_tiles:
            return

        for tile in new_tiles:
            with self.stopuhr(f"{tile.id=}: Downloading one new tile in blocking mode"):
                logger.debug(f"{tile.id=}: Start downloading")
                tiledata = self.download_tile(tile)

            # Try to write the data to file until a limit is reached
            limit = 100
            for i in range(limit):
                try:
                    self._write_tile_to_zarr(tiledata, tile)
                    break
                except icechunk.ConflictError as conflict_error:
                    logger.debug(f"{tile.id=}: {conflict_error=} at retry {i}/{limit}")
            else:
                logger.error(
                    f"{tile.id=}: {limit} tries to write the tile failed. "
                    "Please check if the datacube is already created and not empty."
                )
                raise ValueError(f"{tile.id=}: {limit} tries to write the tile failed.")

procedural_download_threading

procedural_download_threading(geobox: GeoBox)

Download tiles procedurally in threading mode.

Note

This method ensures that only a single download is running at a time. It uses a SetQueue to prevent duplicate downloads. The threading mode requires Python 3.13 or higher.

Parameters:

  • geobox

    (GeoBox) –

    The geobox of the aoi to download.

Raises:

  • ValueError

    If no adjacent tiles are found. This can happen if the geobox is out of the dataset bounds.

  • RuntimeError

    If the Python version is lower than 3.13.

Source code in src/smart_geocubes/accessors/base.py
def procedural_download_threading(self, geobox: GeoBox):
    """Download tiles procedurally in threading mode.

    Note:
        This method ensures that only a single download is running at a time.
        It uses a SetQueue to prevent duplicate downloads.
        The threading mode requires Python 3.13 or higher.

    Args:
        geobox (GeoBox): The geobox of the aoi to download.

    Raises:
        ValueError: If no adjacent tiles are found. This can happen if the geobox is out of the dataset bounds.
        RuntimeError: If the Python version is lower than 3.13.

    """
    if not _check_python_version(3, 13):
        raise RuntimeError("Threading mode requires Python 3.13 or higher")
    with self._threading_handler:
        adjacent_tiles = self.adjacent_tiles(geobox)
        if not adjacent_tiles:
            logger.error(f"{_geobox_repr(geobox)}: No adjacent tiles found: {adjacent_tiles=}")
            raise ValueError("No adjacent tiles found - is the provided geobox corrent?")

        # Wait until all new_items are loaded
        prev_len = None
        while True:
            session = self.repo.readonly_session("main")
            zcube = zarr.open(store=session.store, mode="r")
            loaded_tiles = zcube.attrs.get("loaded_tiles", [])
            new_tiles = [tile for tile in adjacent_tiles if tile.id not in loaded_tiles]
            done_tiles = [tile for tile in adjacent_tiles if tile.id in loaded_tiles]
            if not new_tiles:
                break
            if prev_len != len(new_tiles):
                logger.debug(
                    f"{_geobox_repr(geobox)}: {len(done_tiles)} of {len(adjacent_tiles)} downloaded."
                    f" Missing: {[t.id for t in new_tiles]} Done: {[t.id for t in done_tiles]}"
                )
            for tile in new_tiles:
                self._threading_handler._queue.put(tile)
            prev_len = len(new_tiles)
            time.sleep(5)

visualize_state

visualize_state(
    ax: Axes | None = None,
) -> plt.Figure | plt.Axes

Visulize the extend, hence the already downloaded and filled data, of the datacube.

Parameters:

  • ax

    (Axes | None, default: None ) –

    The axes drawn to. If None, will create a new figure and axes.

Returns:

  • Figure | Axes

    plt.Figure | plt.Axes: The figure with the visualization if no axes was provided, else the axes.

Raises:

Source code in src/smart_geocubes/datasets/arcticdem.py
def visualize_state(self, ax: "plt.Axes | None" = None) -> "plt.Figure | plt.Axes":
    """Visulize the extend, hence the already downloaded and filled data, of the datacube.

    Args:
        ax (plt.Axes | None): The axes drawn to. If None, will create a new figure and axes.

    Returns:
        plt.Figure | plt.Axes: The figure with the visualization if no axes was provided, else the axes.

    Raises:
        ValueError: If the datacube is empty

    """
    import cartopy.crs as ccrs
    import cartopy.feature as cfeature
    import matplotlib.path as mpath
    import matplotlib.pyplot as plt

    tile_info = self.current_state()

    if tile_info is None:
        raise ValueError("Datacube is not created or loaded yet. Can't visualize!")

    # Define the projection
    projection = ccrs.Stereographic(central_latitude=90, central_longitude=-45, true_scale_latitude=70)

    # Create a figure
    fig = None
    if ax is None:
        fig, ax = plt.subplots(figsize=(10, 10), subplot_kw={"projection": projection})

    # Set the extent to focus on the North Pole
    ax.set_extent([-180, 180, 50, 90], crs=ccrs.PlateCarree())

    # Add features
    ax.add_feature(cfeature.LAND, zorder=0, edgecolor="black", facecolor="white")
    ax.add_feature(cfeature.OCEAN, zorder=0, facecolor="lightgrey")
    ax.add_feature(cfeature.COASTLINE)
    ax.add_feature(cfeature.BORDERS, linestyle=":")
    ax.add_feature(cfeature.LAKES, alpha=0.5)
    ax.add_feature(cfeature.RIVERS)

    # Add gridlines
    gl = ax.gridlines(draw_labels=True)
    gl.top_labels = False
    gl.right_labels = False

    # Compute a circle in axes coordinates, which we can use as a boundary
    # for the map. We can pan/zoom as much as we like - the boundary will be
    # permanently circular.
    theta = np.linspace(0, 2 * np.pi, 100)
    center, radius = [0.5, 0.5], 0.5
    verts = np.vstack([np.sin(theta), np.cos(theta)]).T
    circle = mpath.Path(verts * radius + center)

    ax.set_boundary(circle, transform=ax.transAxes)

    tile_info.plot(
        "title",
        ax=ax,
        transform=ccrs.PlateCarree(),
        edgecolor="black",
        categorical=True,
        aspect="equal",
        alpha=0.5,
    )

    if fig is not None:
        return fig
    else:
        return ax

TCTrend

TCTrend(
    storage: Storage | Path | str,
    create_icechunk_storage: bool = True,
)

Bases: GEEAccessor

Accessor for TCTrend data.

Attributes:

  • extent (GeoBox) –

    The extent of the datacube represented by a GeoBox.

  • chunk_size (int) –

    The chunk size of the datacube.

  • channels (list) –

    The channels of the datacube.

  • storage (Storage) –

    The icechunk storage.

  • repo (Repository) –

    The icechunk repository.

  • title (str) –

    The title of the datacube.

  • stopuhr (StopUhr) –

    The benchmarking timer from the stopuhr library.

  • zgeobox (GeoBox) –

    The geobox of the zarr array. Should be equal to the extent geobox.

  • created (bool) –

    True if the datacube already exists in the storage.

Initialize base class for remote accessors.

Warning

In a multiprocessing environment, it is strongly recommended to not set create_icechunk_storage=False.

Parameters:

  • storage

    (Storage) –

    The icechunk storage of the datacube.

  • create_icechunk_storage

    (bool, default: True ) –

    If an icechunk repository should be created at provided storage if no exists. This should be disabled in a multiprocessing environment. Defaults to True.

Raises:

  • ValueError

    If the storage is not an icechunk.Storage.

Methods:

Source code in src/smart_geocubes/accessors/base.py
def __init__(
    self,
    storage: icechunk.Storage | Path | str,
    create_icechunk_storage: bool = True,
):
    """Initialize base class for remote accessors.

    !!! warning

        In a multiprocessing environment, it is strongly recommended to not set `create_icechunk_storage=False`.

    Args:
        storage (icechunk.Storage): The icechunk storage of the datacube.
        create_icechunk_storage (bool, optional): If an icechunk repository should be created at provided storage
            if no exists.
            This should be disabled in a multiprocessing environment.
            Defaults to True.

    Raises:
        ValueError: If the storage is not an icechunk.Storage.

    """
    # Title is used for logging, debugging and as a default name for the datacube
    self.title = self.__class__.__name__

    if isinstance(storage, (str | Path)):
        storage = storage if isinstance(storage, str) else str(storage.resolve())
        storage = icechunk.local_filesystem_storage(storage)
    if not isinstance(storage, icechunk.Storage):
        raise ValueError(f"Expected an icechunk.Storage, but got {type(storage)}")
    self.storage = storage
    logger.debug(f"Using storage {storage=}")
    if create_icechunk_storage:
        self.repo = icechunk.Repository.open_or_create(storage)  # Will create a "main" branch
    else:
        self.repo = icechunk.Repository.open(storage)
    logger.debug(f"Using repository {self.repo=}")

    # The benchmarking timer for this accessor
    self.stopuhr = StopUhr(logger.debug)

    # The TypeVar used by the ThreadingHandler was added in 3.12
    # The Shutdown method of the queue was added in 3.13
    # Hence, we don't want to import the module unless Python 3.13 is installed
    if _check_python_version(3, 13):
        from smart_geocubes._concurrency.threading import ThreadingHandler

        self._threading_handler = ThreadingHandler(self._threading_download)

adjacent_tiles

adjacent_tiles(
    roi: GeoBox | GeoDataFrame,
) -> list[TileWrapper]

Get adjacent tiles from Google Earth Engine.

Parameters:

  • roi

    (GeoBox | GeoDataFrame) –

    The reference geobox or reference geodataframe

Returns:

  • list[TileWrapper]

    list[TileWrapper]: List of adjacent tiles, wrapped in own datastructure for easier processing.

Source code in src/smart_geocubes/accessors/gee.py
def adjacent_tiles(self, roi: GeoBox | gpd.GeoDataFrame) -> list[TileWrapper]:
    """Get adjacent tiles from Google Earth Engine.

    Args:
        roi (GeoBox | gpd.GeoDataFrame): The reference geobox or reference geodataframe

    Returns:
        list[TileWrapper]: List of adjacent tiles, wrapped in own datastructure for easier processing.

    """
    if isinstance(roi, gpd.GeoDataFrame):
        adjacent_geometries = (
            gpd.sjoin(self._tile_geometries, roi.to_crs(self.extent.crs.wkt), how="inner", predicate="intersects")
            .reset_index()
            .drop_duplicates(subset="index", keep="first")
            .set_index("index")
        )
        return [TileWrapper(_tileidx_to_id(idx), self._extent_tiles[idx]) for idx in adjacent_geometries["idx"]]

    elif isinstance(roi, GeoBox):
        return [
            TileWrapper(_tileidx_to_id(idx), self._extent_tiles[idx])
            for idx in self._extent_tiles.tiles(roi.extent)
        ]

assert_created

assert_created()

Assert that the datacube exists in the storage.

Raises:

Source code in src/smart_geocubes/accessors/base.py
def assert_created(self):
    """Assert that the datacube exists in the storage.

    Raises:
        FileNotFoundError: If the datacube does not exist.

    """
    if not self.created:
        msg = f"Datacube {self.title} does not exist."
        " Please use the `create` method or pass `create=True` to `load`."
        logger.error(msg)
        raise FileNotFoundError(msg)

create

create(overwrite: bool = False)

Create an empty datacube and write it to the store.

Parameters:

  • overwrite

    (bool, default: False ) –

    Allowing overwriting an existing datacube. Defaults to False.

Raises:

Source code in src/smart_geocubes/accessors/base.py
def create(self, overwrite: bool = False):
    """Create an empty datacube and write it to the store.

    Args:
        overwrite (bool, optional): Allowing overwriting an existing datacube. Defaults to False.

    Raises:
        FileExistsError: If a datacube already exists at location

    """
    with self.stopuhr("Empty datacube creation"):
        # Check if the zarr data already exists
        session = self.repo.writable_session("main")
        cube_is_empty = sync(session.store.is_empty(""))
        if not overwrite and not cube_is_empty:
            logger.debug(f"Unable to create a new datacube. {overwrite=} {cube_is_empty=} {session.store=}")
            raise FileExistsError(f"Cannot create a new  datacube. {session.store=} is not empty!")

        logger.debug(
            f"Creating an empty zarr datacube '{self.title}' with the variables"
            f" {self.channels} at a {self.extent.resolution=} (epsg:{self.extent.crs.epsg})"
            f" and {self.chunk_size=} to {session.store=}"
        )

        ds = xr.Dataset(
            {
                name: odc.geo.xr.xr_zeros(
                    self.extent,
                    chunks=-1,
                    dtype=self._channels_encoding[name].get("dtype", "float32"),
                    always_yx=True,
                )
                for name in self.channels
            },
            attrs={"title": self.title, "loaded_tiles": []},
        )

        # Add metadata
        for name, meta in self._channels_meta.items():
            ds[name].attrs.update(meta)

        # Get the encoding for the coordinates, variables and spatial reference
        coords_encoding = {
            "x": {"chunks": ds.x.shape, **optimize_coord_encoding(ds.x.values, self.extent.resolution.x)},
            "y": {"chunks": ds.y.shape, **optimize_coord_encoding(ds.y.values, self.extent.resolution.y)},
        }
        var_encoding = {
            name: {
                "chunks": (self.chunk_size, self.chunk_size),
                "compressors": [BloscCodec(clevel=9)],
                **self._channels_encoding[name],
            }
            for name in self.channels
        }
        encoding = {
            "spatial_ref": {"chunks": None, "dtype": "int32"},
            **coords_encoding,
            **var_encoding,
        }
        logger.debug(f"Datacube {encoding=}")

        ds.to_zarr(
            session.store,
            encoding=encoding,
            compute=False,
            consolidated=False,
            zarr_format=3,
            mode="w" if overwrite else "w-",
        )

        commit = session.commit("Initialize empty datacube")
        logger.debug(f"Datacube created: {commit=}")

        self.post_create()

current_state

current_state() -> gpd.GeoDataFrame | None

Get info about currently stored tiles.

Returns:

  • GeoDataFrame | None

    gpd.GeoDataFrame: Tiles from odc.geo.GeoboxTiles. None if datacube is empty.

Source code in src/smart_geocubes/accessors/gee.py
def current_state(self) -> gpd.GeoDataFrame | None:
    """Get info about currently stored tiles.

    Returns:
        gpd.GeoDataFrame: Tiles from odc.geo.GeoboxTiles. None if datacube is empty.

    """
    import geopandas as gpd

    if not self.created:
        return None

    session = self.repo.readonly_session("main")
    zcube = zarr.open(session.store, mode="r")
    loaded_tiles = zcube.attrs.get("loaded_tiles", [])

    if len(loaded_tiles) == 0:
        return None

    tiles = GeoboxTiles(self.extent, (self.chunk_size, self.chunk_size))
    loaded_tiles = [{"geometry": tiles[_id_to_tileidx(tid)].extent.geom, "id": tid} for tid in loaded_tiles]
    gdf = gpd.GeoDataFrame(loaded_tiles, crs=self.extent.crs.to_wkt())
    return gdf

download

download(roi: GeoBox | GeoDataFrame)

Download the data for the given region of interest which can be provided either as GeoBox or GeoDataFrame.

Parameters:

  • roi

    (GeoBox | GeoDataFrame) –

    The reference geobox or reference geodataframe to download the data for.

Raises:

  • ValueError

    If no adjacent tiles are found. This can happen if the geobox is out of the dataset bounds.

  • ValueError

    If no tries are left.

Source code in src/smart_geocubes/accessors/base.py
def download(self, roi: GeoBox | gpd.GeoDataFrame):
    """Download the data for the given region of interest which can be provided either as GeoBox or GeoDataFrame.

    Args:
        roi (GeoBox | gpd.GeoDataFrame): The reference geobox or reference geodataframe to download the data for.

    Raises:
        ValueError: If no adjacent tiles are found. This can happen if the geobox is out of the dataset bounds.
        ValueError: If no tries are left.

    """
    with self.stopuhr(f"Download of {roi=}"):
        adjacent_tiles = self.adjacent_tiles(roi)
        if not adjacent_tiles:
            logger.error(f"No adjacent tiles found: {adjacent_tiles=}")
            raise ValueError("No adjacent tiles found - is the provided geobox corrent?")

        session = self.repo.readonly_session("main")
        zcube = zarr.open(store=session.store, mode="r")
        loaded_tiles = zcube.attrs.get("loaded_tiles", [])
        new_tiles = [tile for tile in adjacent_tiles if tile.id not in loaded_tiles]
        logger.debug(f"{len(adjacent_tiles)=} & {len(loaded_tiles)=} -> {len(new_tiles)=} to download")
        if not new_tiles:
            return

        for tile in new_tiles:
            with self.stopuhr(f"{tile.id=}: Downloading one new tile in blocking mode"):
                logger.debug(f"{tile.id=}: Start downloading")
                tiledata = self.download_tile(tile)

            # Try to write the data to file until a limit is reached
            limit = 100
            for i in range(limit):
                try:
                    self._write_tile_to_zarr(tiledata, tile)
                    break
                except icechunk.ConflictError as conflict_error:
                    logger.debug(f"{tile.id=}: {conflict_error=} at retry {i}/{limit}")
            else:
                logger.error(
                    f"{tile.id=}: {limit} tries to write the tile failed. "
                    "Please check if the datacube is already created and not empty."
                )
                raise ValueError(f"{tile.id=}: {limit} tries to write the tile failed.")

download_tile

download_tile(tile: TileWrapper) -> xr.Dataset

Download a tile from Google Earth Engine.

Parameters:

Returns:

  • Dataset

    xr.Dataset: The downloaded tile data.

Source code in src/smart_geocubes/accessors/gee.py
def download_tile(self, tile: TileWrapper) -> xr.Dataset:
    """Download a tile from Google Earth Engine.

    Args:
        tile (TileWrapper): The tile to download.

    Returns:
        xr.Dataset: The downloaded tile data.

    """
    import ee
    import rioxarray  # noqa: F401
    import xee  # noqa: F401

    # Note: This is a little bit weird: First we create an own grid which overlaps to the chunks
    # of the zarr array. Then we create a mosaic of the data and clip it to a single chunk.
    # We could load the images from the collection directly instead of creating a mosaic.
    # However, this would require more testing and probably results a lot of manual computation
    # of slices etc. like in the stac variant. So for now, we just use the mosaic.
    logging.getLogger("urllib3.connectionpool").disabled = True
    geom = ee.Geometry.Rectangle(tile.item.geographic_extent.boundingbox)
    ee_img = ee.ImageCollection(self.collection).mosaic().clip(geom)
    with warnings.catch_warnings():
        warnings.filterwarnings("ignore", category=UserWarning, message=EE_WARN_MSG)
        tiledata = xr.open_dataset(
            ee_img,
            engine="ee",
            geometry=geom,
            crs=f"epsg:{self.extent.crs.to_epsg()}",
            scale=self.extent.resolution.x,
        )

    # TODO: Allow for multi-temporal datacubes and lat/lon coordinates
    tiledata = tiledata.max("time").rename({"lon": "x", "lat": "y"}).transpose("y", "x")

    # Download the data
    tiledata.load()
    logging.getLogger("urllib3.connectionpool").disabled = False

    # Flip y-axis, because convention is x in positive direction and y in negative, but gee use positive for both
    tiledata = tiledata.isel(y=slice(None, None, -1))

    # For some reason xee does not always set the crs
    tiledata = tiledata.odc.assign_crs(self.extent.crs)

    # Recrop the data to the tile, since gee does not always return the exact extent
    tiledata = tiledata.odc.crop(tile.item.extent)

    # Save original min-max values for each band for clipping later
    clip_values = {
        band: (tiledata[band].min().values.item(), tiledata[band].max().values.item())
        for band in tiledata.data_vars
    }

    # Interpolate missing values (there are very few, so we actually can interpolate them)
    tiledata.rio.set_spatial_dims(x_dim="x", y_dim="y", inplace=True)
    for band in tiledata.data_vars:
        tiledata[band] = tiledata[band].rio.write_nodata(np.nan).rio.interpolate_na()

    # Convert to uint8
    for band in tiledata.data_vars:
        band_min, band_max = clip_values[band]
        tiledata[band] = (
            tiledata[band].clip(band_min, band_max, keep_attrs=True).astype("uint8").rio.write_nodata(None)
        )

    return tiledata

load

load(
    geobox: GeoBox,
    buffer: int = 0,
    persist: bool = True,
    create: bool = False,
    concurrency_mode: ConcurrencyModes = "blocking",
) -> xr.Dataset

Load the data for the given geobox.

Parameters:

  • geobox

    (GeoBox) –

    The reference geobox to load the data for.

  • buffer

    (int, default: 0 ) –

    The buffer around the projected geobox in pixels. Defaults to 0.

  • persist

    (bool, default: True ) –

    If the data should be persisted in memory. If not, this will return a Dask backed Dataset. Defaults to True.

  • create

    (bool, default: False ) –

    Create a new zarr array at defined storage if it not exists. This is not recommended, because it can have side effects in a multi-process environment. Defaults to False.

  • concurrency_mode

    (ConcurrencyModes, default: 'blocking' ) –

    The concurrency mode for the download. Defaults to "blocking".

Returns:

  • Dataset

    xr.Dataset: The loaded dataset in the same resolution and extent like the geobox.

Source code in src/smart_geocubes/accessors/base.py
def load(
    self,
    geobox: GeoBox,
    buffer: int = 0,
    persist: bool = True,
    create: bool = False,
    concurrency_mode: ConcurrencyModes = "blocking",
) -> xr.Dataset:
    """Load the data for the given geobox.

    Args:
        geobox (GeoBox): The reference geobox to load the data for.
        buffer (int, optional): The buffer around the projected geobox in pixels. Defaults to 0.
        persist (bool, optional): If the data should be persisted in memory.
            If not, this will return a Dask backed Dataset. Defaults to True.
        create (bool, optional): Create a new zarr array at defined storage if it not exists.
            This is not recommended, because it can have side effects in a multi-process environment.
            Defaults to False.
        concurrency_mode (ConcurrencyModes, optional): The concurrency mode for the download.
            Defaults to "blocking".

    Returns:
        xr.Dataset: The loaded dataset in the same resolution and extent like the geobox.

    """
    with self.stopuhr(f"{_geobox_repr(geobox)}: {self.title} tile {'loading' if persist else 'lazy-loading'}"):
        logger.debug(f"{_geobox_repr(geobox)}: {geobox.resolution} original resolution")

        # Create the datacube if it does not exist
        if create:
            try:
                self.create(overwrite=False)
            except FileExistsError:  # We are okay if the datacube already exists
                pass
        else:
            # Check if the datacube exists
            self.assert_created()

        # Download the adjacent tiles (if necessary)
        reference_geobox = geobox.to_crs(self.extent.crs, resolution=self.extent.resolution.x).pad(buffer)
        self.procedural_download(reference_geobox, concurrency_mode=concurrency_mode)

        # Load the datacube and set the spatial_ref since it is set as a coordinate within the zarr format
        session = self.repo.readonly_session("main")
        chunks = None if persist else "auto"
        xrcube = xr.open_zarr(
            session.store,
            mask_and_scale=False,
            chunks=chunks,
            consolidated=False,
        ).set_coords("spatial_ref")

        # Get an AOI slice of the datacube
        xrcube_aoi = xrcube.odc.crop(reference_geobox.extent, apply_mask=False)

        # The following code would load the lazy zarr data from disk into memory
        if persist:
            with self.stopuhr(f"{_geobox_repr(geobox)}: {self.title} AOI loading from disk"):
                xrcube_aoi = xrcube_aoi.load()
    return xrcube_aoi

load_like

load_like(
    ref: Dataset | DataArray, **kwargs: Unpack[LoadParams]
) -> xr.Dataset

Load the data for the given geobox.

Parameters:

  • ref

    (Dataset | DataArray) –

    The reference dataarray or dataset to load the data for.

  • **kwargs

    (Unpack[LoadParams], default: {} ) –

    The load parameters (buffer, persist, create, concurrency_mode).

Other Parameters:

  • buffer (int) –

    The buffer around the projected geobox in pixels. Defaults to 0.

  • persist (bool) –

    If the data should be persisted in memory. If not, this will return a Dask backed Dataset. Defaults to True.

  • create (bool) –

    Create a new zarr array at defined storage if it not exists. This is not recommended, because it can have side effects in a multi-process environment. Defaults to False.

  • concurrency_mode (ConcurrencyModes) –

    The concurrency mode for the download. Defaults to "blocking".

Returns:

  • Dataset

    xr.Dataset: The loaded dataset in the same resolution and extent like the geobox.

Source code in src/smart_geocubes/accessors/base.py
def load_like(
    self,
    ref: xr.Dataset | xr.DataArray,
    **kwargs: Unpack[LoadParams],
) -> xr.Dataset:
    """Load the data for the given geobox.

    Args:
        ref (xr.Dataset | xr.DataArray): The reference dataarray or dataset to load the data for.
        **kwargs: The load parameters (buffer, persist, create, concurrency_mode).

    Keyword Args:
        buffer (int, optional): The buffer around the projected geobox in pixels. Defaults to 0.
        persist (bool, optional): If the data should be persisted in memory.
            If not, this will return a Dask backed Dataset. Defaults to True.
        create (bool, optional): Create a new zarr array at defined storage if it not exists.
            This is not recommended, because it can have side effects in a multi-process environment.
            Defaults to False.
        concurrency_mode (ConcurrencyModes, optional): The concurrency mode for the download.
            Defaults to "blocking".

    Returns:
        xr.Dataset: The loaded dataset in the same resolution and extent like the geobox.

    """
    return self.load(_geobox_repr(ref.geobox), **kwargs)

log_benchmark_summary

log_benchmark_summary()

Log the benchmark summary.

Source code in src/smart_geocubes/accessors/base.py
def log_benchmark_summary(self):
    """Log the benchmark summary."""
    self.stopuhr.summary()

open_xarray

open_xarray() -> xr.Dataset

Open the xarray datacube in read-only mode.

Returns:

  • Dataset

    xr.Dataset: The xarray datacube.

Source code in src/smart_geocubes/accessors/base.py
def open_xarray(self) -> xr.Dataset:
    """Open the xarray datacube in read-only mode.

    Returns:
        xr.Dataset: The xarray datacube.

    """
    self.assert_created()
    session = self.repo.readonly_session("main")
    xcube = xr.open_zarr(session.store, mask_and_scale=False, consolidated=False).set_coords("spatial_ref")
    return xcube

open_zarr

open_zarr() -> zarr.Group

Open the zarr datacube in read-only mode.

Returns:

  • Group

    zarr.Group: The zarr datacube.

Source code in src/smart_geocubes/accessors/base.py
def open_zarr(self) -> zarr.Group:
    """Open the zarr datacube in read-only mode.

    Returns:
        zarr.Group: The zarr datacube.

    """
    self.assert_created()
    session = self.repo.readonly_session("main")
    zcube = zarr.open(store=session.store, mode="r")
    return zcube

post_create

post_create()

Post create actions. Can be overwritten by the dataset accessor.

Source code in src/smart_geocubes/accessors/base.py
def post_create(self):
    """Post create actions. Can be overwritten by the dataset accessor."""
    pass

procedural_download

procedural_download(
    geobox: GeoBox,
    concurrency_mode: ConcurrencyModes = "blocking",
)

Download the data for the given geobox.

Note

The "threading" concurrency mode requires Python 3.13 or higher.

Parameters:

  • geobox

    (GeoBox) –

    The reference geobox to download the data for.

  • concurrency_mode

    (ConcurrencyModes, default: 'blocking' ) –

    The concurrency mode for the download. Defaults to "blocking".

Raises:

  • ValueError

    If an unknown concurrency mode is provided.

Source code in src/smart_geocubes/accessors/base.py
def procedural_download(self, geobox: GeoBox, concurrency_mode: ConcurrencyModes = "blocking"):
    """Download the data for the given geobox.

    Note:
        The "threading" concurrency mode requires Python 3.13 or higher.

    Args:
        geobox (GeoBox): The reference geobox to download the data for.
        concurrency_mode (ConcurrencyModes, optional): The concurrency mode for the download.
            Defaults to "blocking".

    Raises:
        ValueError: If an unknown concurrency mode is provided.

    """
    self.assert_created()
    if concurrency_mode == "blocking":
        self.procedural_download_blocking(geobox)
    elif concurrency_mode == "threading":
        raise ValueError("Threading mode is currently disabled. Use 'blocking' instead.")
        # self.procedural_download_threading(geobox)
    else:
        raise ValueError(f"Unknown concurrency mode {concurrency_mode}")

procedural_download_blocking

procedural_download_blocking(geobox: GeoBox)

Download tiles procedurally in blocking mode.

Warning

This method is meant for single-process use, but can (in theory) be used in a multi-process environment. However, in a multi-process environment it can happen that multiple processes try to write concurrently, which results in a conflict. In such cases, the download will be retried until it succeeds or the number of maximum-tries is reached.

Parameters:

  • geobox

    (GeoBox) –

    The geobox of the aoi to download.

Raises:

  • ValueError

    If no adjacent tiles are found. This can happen if the geobox is out of the dataset bounds.

  • ValueError

    If no tries are left.

Source code in src/smart_geocubes/accessors/base.py
def procedural_download_blocking(self, geobox: GeoBox):
    """Download tiles procedurally in blocking mode.

    Warning:
        This method is meant for single-process use, but can (in theory) be used in a multi-process environment.
        However, in a multi-process environment it can happen that multiple processes try to write concurrently,
        which results in a conflict.
        In such cases, the download will be retried until it succeeds or the number of maximum-tries is reached.

    Args:
        geobox (GeoBox): The geobox of the aoi to download.

    Raises:
        ValueError: If no adjacent tiles are found. This can happen if the geobox is out of the dataset bounds.
        ValueError: If no tries are left.

    """
    with self.stopuhr(f"{_geobox_repr(geobox)}: Procedural download in blocking mode"):
        adjacent_tiles = self.adjacent_tiles(geobox)
        if not adjacent_tiles:
            logger.error(f"{_geobox_repr(geobox)}: No adjacent tiles found: {adjacent_tiles=}")
            raise ValueError("No adjacent tiles found - is the provided geobox corrent?")

        session = self.repo.readonly_session("main")
        zcube = zarr.open(store=session.store, mode="r")
        loaded_tiles = zcube.attrs.get("loaded_tiles", [])
        new_tiles = [tile for tile in adjacent_tiles if tile.id not in loaded_tiles]
        logger.debug(
            f"{_geobox_repr(geobox)}:  {len(adjacent_tiles)=} & {len(loaded_tiles)=}"
            f" -> {len(new_tiles)=} to download"
        )
        if not new_tiles:
            return

        for tile in new_tiles:
            with self.stopuhr(f"{tile.id=}: Downloading one new tile in blocking mode"):
                logger.debug(f"{tile.id=}: Start downloading")
                tiledata = self.download_tile(tile)

            # Try to write the data to file until a limit is reached
            limit = 100
            for i in range(limit):
                try:
                    self._write_tile_to_zarr(tiledata, tile)
                    break
                except icechunk.ConflictError as conflict_error:
                    logger.debug(f"{tile.id=}: {conflict_error=} at retry {i}/{limit}")
            else:
                logger.error(
                    f"{tile.id=}: {limit} tries to write the tile failed. "
                    "Please check if the datacube is already created and not empty."
                )
                raise ValueError(f"{tile.id=}: {limit} tries to write the tile failed.")

procedural_download_threading

procedural_download_threading(geobox: GeoBox)

Download tiles procedurally in threading mode.

Note

This method ensures that only a single download is running at a time. It uses a SetQueue to prevent duplicate downloads. The threading mode requires Python 3.13 or higher.

Parameters:

  • geobox

    (GeoBox) –

    The geobox of the aoi to download.

Raises:

  • ValueError

    If no adjacent tiles are found. This can happen if the geobox is out of the dataset bounds.

  • RuntimeError

    If the Python version is lower than 3.13.

Source code in src/smart_geocubes/accessors/base.py
def procedural_download_threading(self, geobox: GeoBox):
    """Download tiles procedurally in threading mode.

    Note:
        This method ensures that only a single download is running at a time.
        It uses a SetQueue to prevent duplicate downloads.
        The threading mode requires Python 3.13 or higher.

    Args:
        geobox (GeoBox): The geobox of the aoi to download.

    Raises:
        ValueError: If no adjacent tiles are found. This can happen if the geobox is out of the dataset bounds.
        RuntimeError: If the Python version is lower than 3.13.

    """
    if not _check_python_version(3, 13):
        raise RuntimeError("Threading mode requires Python 3.13 or higher")
    with self._threading_handler:
        adjacent_tiles = self.adjacent_tiles(geobox)
        if not adjacent_tiles:
            logger.error(f"{_geobox_repr(geobox)}: No adjacent tiles found: {adjacent_tiles=}")
            raise ValueError("No adjacent tiles found - is the provided geobox corrent?")

        # Wait until all new_items are loaded
        prev_len = None
        while True:
            session = self.repo.readonly_session("main")
            zcube = zarr.open(store=session.store, mode="r")
            loaded_tiles = zcube.attrs.get("loaded_tiles", [])
            new_tiles = [tile for tile in adjacent_tiles if tile.id not in loaded_tiles]
            done_tiles = [tile for tile in adjacent_tiles if tile.id in loaded_tiles]
            if not new_tiles:
                break
            if prev_len != len(new_tiles):
                logger.debug(
                    f"{_geobox_repr(geobox)}: {len(done_tiles)} of {len(adjacent_tiles)} downloaded."
                    f" Missing: {[t.id for t in new_tiles]} Done: {[t.id for t in done_tiles]}"
                )
            for tile in new_tiles:
                self._threading_handler._queue.put(tile)
            prev_len = len(new_tiles)
            time.sleep(5)

visualize_state

visualize_state(
    ax: Axes | None = None,
) -> plt.Figure | plt.Axes

Visulize the extend, hence the already downloaded and filled data, of the datacube.

Parameters:

  • ax

    (Axes | None, default: None ) –

    The axes drawn to. If None, will create a new figure and axes.

Returns:

  • Figure | Axes

    plt.Figure | plt.Axes: The figure with the visualization if no axes was provided, else the axes.

Raises:

Source code in src/smart_geocubes/datasets/tctrend.py
def visualize_state(self, ax: "plt.Axes | None" = None) -> "plt.Figure | plt.Axes":
    """Visulize the extend, hence the already downloaded and filled data, of the datacube.

    Args:
        ax (plt.Axes | None): The axes drawn to. If None, will create a new figure and axes.

    Returns:
        plt.Figure | plt.Axes: The figure with the visualization if no axes was provided, else the axes.

    Raises:
        ValueError: If the datacube is empty

    """
    import cartopy.crs as ccrs
    import cartopy.feature as cfeature
    import matplotlib.pyplot as plt

    tile_info = self.current_state()

    if tile_info is None:
        raise ValueError("Datacube is not created or loaded yet. Can't visualize!")

    # Define the projection
    projection = ccrs.PlateCarree()

    # Create a figure
    fig = None
    if ax is None:
        fig, ax = plt.subplots(figsize=(10, 10), subplot_kw={"projection": projection})

    # Set the extent to show the whole world
    ax.set_extent([-180, 180, -90, 90], crs=ccrs.PlateCarree())

    # Add features
    ax.add_feature(cfeature.LAND, zorder=0, edgecolor="black", facecolor="white")
    ax.add_feature(cfeature.OCEAN, zorder=0, facecolor="lightgrey")
    ax.add_feature(cfeature.COASTLINE)
    ax.add_feature(cfeature.BORDERS, linestyle=":")
    ax.add_feature(cfeature.LAKES, alpha=0.5)
    ax.add_feature(cfeature.RIVERS)

    # Add gridlines
    gl = ax.gridlines(draw_labels=True)
    gl.top_labels = False
    gl.right_labels = False

    tile_info.plot(
        "id",
        ax=ax,
        transform=ccrs.PlateCarree(),
        edgecolor="black",
        categorical=True,
        aspect="equal",
        alpha=0.5,
    )

    if fig is not None:
        return fig
    else:
        return ax