Skip to content

smart_geocubes.datasets.tctrend

Predefined accessor for TCTrend data.

Classes:

  • TCTrend

    Accessor for TCTrend data.

TCTrend

TCTrend(
    storage: Storage | Path | str,
    create_icechunk_storage: bool = True,
)

Bases: GEEAccessor

Accessor for TCTrend data.

Attributes:

  • extent (GeoBox) –

    The extent of the datacube represented by a GeoBox.

  • chunk_size (int) –

    The chunk size of the datacube.

  • channels (list) –

    The channels of the datacube.

  • storage (Storage) –

    The icechunk storage.

  • repo (Repository) –

    The icechunk repository.

  • title (str) –

    The title of the datacube.

  • stopuhr (StopUhr) –

    The benchmarking timer from the stopuhr library.

  • zgeobox (GeoBox) –

    The geobox of the zarr array. Should be equal to the extent geobox.

  • created (bool) –

    True if the datacube already exists in the storage.

Initialize base class for remote accessors.

Warning

In a multiprocessing environment, it is strongly recommended to not set create_icechunk_storage=False.

Parameters:

  • storage

    (Storage) –

    The icechunk storage of the datacube.

  • create_icechunk_storage

    (bool, default: True ) –

    If an icechunk repository should be created at provided storage if no exists. This should be disabled in a multiprocessing environment. Defaults to True.

Raises:

  • ValueError

    If the storage is not an icechunk.Storage.

Methods:

Source code in src/smart_geocubes/accessors/base.py
def __init__(
    self,
    storage: icechunk.Storage | Path | str,
    create_icechunk_storage: bool = True,
):
    """Initialize base class for remote accessors.

    !!! warning

        In a multiprocessing environment, it is strongly recommended to not set `create_icechunk_storage=False`.

    Args:
        storage (icechunk.Storage): The icechunk storage of the datacube.
        create_icechunk_storage (bool, optional): If an icechunk repository should be created at provided storage
            if no exists.
            This should be disabled in a multiprocessing environment.
            Defaults to True.

    Raises:
        ValueError: If the storage is not an icechunk.Storage.

    """
    # Title is used for logging, debugging and as a default name for the datacube
    self.title = self.__class__.__name__

    if isinstance(storage, (str | Path)):
        storage = storage if isinstance(storage, str) else str(storage.resolve())
        storage = icechunk.local_filesystem_storage(storage)
    if not isinstance(storage, icechunk.Storage):
        raise ValueError(f"Expected an icechunk.Storage, but got {type(storage)}")
    self.storage = storage
    logger.debug(f"Using storage {storage=}")
    if create_icechunk_storage:
        self.repo = icechunk.Repository.open_or_create(storage)  # Will create a "main" branch
    else:
        self.repo = icechunk.Repository.open(storage)
    logger.debug(f"Using repository {self.repo=}")

    # The benchmarking timer for this accessor
    self.stopuhr = StopUhr(logger.debug)

    # The TypeVar used by the ThreadingHandler was added in 3.12
    # The Shutdown method of the queue was added in 3.13
    # Hence, we don't want to import the module unless Python 3.13 is installed
    if _check_python_version(3, 13):
        from smart_geocubes._concurrency.threading import ThreadingHandler

        self._threading_handler = ThreadingHandler(self._threading_download)

adjacent_tiles

adjacent_tiles(
    roi: GeoBox | GeoDataFrame,
) -> list[TileWrapper]

Get adjacent tiles from Google Earth Engine.

Parameters:

  • roi

    (GeoBox | GeoDataFrame) –

    The reference geobox or reference geodataframe

Returns:

  • list[TileWrapper]

    list[TileWrapper]: List of adjacent tiles, wrapped in own datastructure for easier processing.

Source code in src/smart_geocubes/accessors/gee.py
def adjacent_tiles(self, roi: GeoBox | gpd.GeoDataFrame) -> list[TileWrapper]:
    """Get adjacent tiles from Google Earth Engine.

    Args:
        roi (GeoBox | gpd.GeoDataFrame): The reference geobox or reference geodataframe

    Returns:
        list[TileWrapper]: List of adjacent tiles, wrapped in own datastructure for easier processing.

    """
    if isinstance(roi, gpd.GeoDataFrame):
        adjacent_geometries = (
            gpd.sjoin(self._tile_geometries, roi.to_crs(self.extent.crs.wkt), how="inner", predicate="intersects")
            .reset_index()
            .drop_duplicates(subset="index", keep="first")
            .set_index("index")
        )
        return [TileWrapper(_tileidx_to_id(idx), self._extent_tiles[idx]) for idx in adjacent_geometries["idx"]]

    elif isinstance(roi, GeoBox):
        return [
            TileWrapper(_tileidx_to_id(idx), self._extent_tiles[idx])
            for idx in self._extent_tiles.tiles(roi.extent)
        ]

assert_created

assert_created()

Assert that the datacube exists in the storage.

Raises:

Source code in src/smart_geocubes/accessors/base.py
def assert_created(self):
    """Assert that the datacube exists in the storage.

    Raises:
        FileNotFoundError: If the datacube does not exist.

    """
    if not self.created:
        msg = f"Datacube {self.title} does not exist."
        " Please use the `create` method or pass `create=True` to `load`."
        logger.error(msg)
        raise FileNotFoundError(msg)

create

create(overwrite: bool = False)

Create an empty datacube and write it to the store.

Parameters:

  • overwrite

    (bool, default: False ) –

    Allowing overwriting an existing datacube. Defaults to False.

Raises:

Source code in src/smart_geocubes/accessors/base.py
def create(self, overwrite: bool = False):
    """Create an empty datacube and write it to the store.

    Args:
        overwrite (bool, optional): Allowing overwriting an existing datacube. Defaults to False.

    Raises:
        FileExistsError: If a datacube already exists at location

    """
    with self.stopuhr("Empty datacube creation"):
        # Check if the zarr data already exists
        session = self.repo.writable_session("main")
        cube_is_empty = sync(session.store.is_empty(""))
        if not overwrite and not cube_is_empty:
            logger.debug(f"Unable to create a new datacube. {overwrite=} {cube_is_empty=} {session.store=}")
            raise FileExistsError(f"Cannot create a new  datacube. {session.store=} is not empty!")

        logger.debug(
            f"Creating an empty zarr datacube '{self.title}' with the variables"
            f" {self.channels} at a {self.extent.resolution=} (epsg:{self.extent.crs.epsg})"
            f" and {self.chunk_size=} to {session.store=}"
        )

        ds = xr.Dataset(
            {
                name: odc.geo.xr.xr_zeros(
                    self.extent,
                    chunks=-1,
                    dtype=self._channels_encoding[name].get("dtype", "float32"),
                    always_yx=True,
                )
                for name in self.channels
            },
            attrs={"title": self.title, "loaded_tiles": []},
        )

        # Add metadata
        for name, meta in self._channels_meta.items():
            ds[name].attrs.update(meta)

        # Get the encoding for the coordinates, variables and spatial reference
        coords_encoding = {
            "x": {"chunks": ds.x.shape, **optimize_coord_encoding(ds.x.values, self.extent.resolution.x)},
            "y": {"chunks": ds.y.shape, **optimize_coord_encoding(ds.y.values, self.extent.resolution.y)},
        }
        var_encoding = {
            name: {
                "chunks": (self.chunk_size, self.chunk_size),
                "compressors": [BloscCodec(clevel=9)],
                **self._channels_encoding[name],
            }
            for name in self.channels
        }
        encoding = {
            "spatial_ref": {"chunks": None, "dtype": "int32"},
            **coords_encoding,
            **var_encoding,
        }
        logger.debug(f"Datacube {encoding=}")

        ds.to_zarr(
            session.store,
            encoding=encoding,
            compute=False,
            consolidated=False,
            zarr_format=3,
            mode="w" if overwrite else "w-",
        )

        commit = session.commit("Initialize empty datacube")
        logger.debug(f"Datacube created: {commit=}")

        self.post_create()

current_state

current_state() -> gpd.GeoDataFrame | None

Get info about currently stored tiles.

Returns:

  • GeoDataFrame | None

    gpd.GeoDataFrame: Tiles from odc.geo.GeoboxTiles. None if datacube is empty.

Source code in src/smart_geocubes/accessors/gee.py
def current_state(self) -> gpd.GeoDataFrame | None:
    """Get info about currently stored tiles.

    Returns:
        gpd.GeoDataFrame: Tiles from odc.geo.GeoboxTiles. None if datacube is empty.

    """
    import geopandas as gpd

    if not self.created:
        return None

    session = self.repo.readonly_session("main")
    zcube = zarr.open(session.store, mode="r")
    loaded_tiles = zcube.attrs.get("loaded_tiles", [])

    if len(loaded_tiles) == 0:
        return None

    tiles = GeoboxTiles(self.extent, (self.chunk_size, self.chunk_size))
    loaded_tiles = [{"geometry": tiles[_id_to_tileidx(tid)].extent.geom, "id": tid} for tid in loaded_tiles]
    gdf = gpd.GeoDataFrame(loaded_tiles, crs=self.extent.crs.to_wkt())
    return gdf

download

download(roi: GeoBox | GeoDataFrame)

Download the data for the given region of interest which can be provided either as GeoBox or GeoDataFrame.

Parameters:

  • roi

    (GeoBox | GeoDataFrame) –

    The reference geobox or reference geodataframe to download the data for.

Raises:

  • ValueError

    If no adjacent tiles are found. This can happen if the geobox is out of the dataset bounds.

  • ValueError

    If no tries are left.

Source code in src/smart_geocubes/accessors/base.py
def download(self, roi: GeoBox | gpd.GeoDataFrame):
    """Download the data for the given region of interest which can be provided either as GeoBox or GeoDataFrame.

    Args:
        roi (GeoBox | gpd.GeoDataFrame): The reference geobox or reference geodataframe to download the data for.

    Raises:
        ValueError: If no adjacent tiles are found. This can happen if the geobox is out of the dataset bounds.
        ValueError: If no tries are left.

    """
    with self.stopuhr(f"Download of {roi=}"):
        adjacent_tiles = self.adjacent_tiles(roi)
        if not adjacent_tiles:
            logger.error(f"No adjacent tiles found: {adjacent_tiles=}")
            raise ValueError("No adjacent tiles found - is the provided geobox corrent?")

        session = self.repo.readonly_session("main")
        zcube = zarr.open(store=session.store, mode="r")
        loaded_tiles = zcube.attrs.get("loaded_tiles", [])
        new_tiles = [tile for tile in adjacent_tiles if tile.id not in loaded_tiles]
        logger.debug(f"{len(adjacent_tiles)=} & {len(loaded_tiles)=} -> {len(new_tiles)=} to download")
        if not new_tiles:
            return

        for tile in new_tiles:
            with self.stopuhr(f"{tile.id=}: Downloading one new tile in blocking mode"):
                logger.debug(f"{tile.id=}: Start downloading")
                tiledata = self.download_tile(tile)

            # Try to write the data to file until a limit is reached
            limit = 100
            for i in range(limit):
                try:
                    self._write_tile_to_zarr(tiledata, tile)
                    break
                except icechunk.ConflictError as conflict_error:
                    logger.debug(f"{tile.id=}: {conflict_error=} at retry {i}/{limit}")
            else:
                logger.error(
                    f"{tile.id=}: {limit} tries to write the tile failed. "
                    "Please check if the datacube is already created and not empty."
                )
                raise ValueError(f"{tile.id=}: {limit} tries to write the tile failed.")

download_tile

download_tile(tile: TileWrapper) -> xr.Dataset

Download a tile from Google Earth Engine.

Parameters:

Returns:

  • Dataset

    xr.Dataset: The downloaded tile data.

Source code in src/smart_geocubes/accessors/gee.py
def download_tile(self, tile: TileWrapper) -> xr.Dataset:
    """Download a tile from Google Earth Engine.

    Args:
        tile (TileWrapper): The tile to download.

    Returns:
        xr.Dataset: The downloaded tile data.

    """
    import ee
    import rioxarray  # noqa: F401
    import xee  # noqa: F401

    # Note: This is a little bit weird: First we create an own grid which overlaps to the chunks
    # of the zarr array. Then we create a mosaic of the data and clip it to a single chunk.
    # We could load the images from the collection directly instead of creating a mosaic.
    # However, this would require more testing and probably results a lot of manual computation
    # of slices etc. like in the stac variant. So for now, we just use the mosaic.
    logging.getLogger("urllib3.connectionpool").disabled = True
    geom = ee.Geometry.Rectangle(tile.item.geographic_extent.boundingbox)
    ee_img = ee.ImageCollection(self.collection).mosaic().clip(geom)
    with warnings.catch_warnings():
        warnings.filterwarnings("ignore", category=UserWarning, message=EE_WARN_MSG)
        tiledata = xr.open_dataset(
            ee_img,
            engine="ee",
            geometry=geom,
            crs=f"epsg:{self.extent.crs.to_epsg()}",
            scale=self.extent.resolution.x,
        )

    # TODO: Allow for multi-temporal datacubes and lat/lon coordinates
    tiledata = tiledata.max("time").rename({"lon": "x", "lat": "y"}).transpose("y", "x")

    # Download the data
    tiledata.load()
    logging.getLogger("urllib3.connectionpool").disabled = False

    # Flip y-axis, because convention is x in positive direction and y in negative, but gee use positive for both
    tiledata = tiledata.isel(y=slice(None, None, -1))

    # For some reason xee does not always set the crs
    tiledata = tiledata.odc.assign_crs(self.extent.crs)

    # Recrop the data to the tile, since gee does not always return the exact extent
    tiledata = tiledata.odc.crop(tile.item.extent)

    # Save original min-max values for each band for clipping later
    clip_values = {
        band: (tiledata[band].min().values.item(), tiledata[band].max().values.item())
        for band in tiledata.data_vars
    }

    # Interpolate missing values (there are very few, so we actually can interpolate them)
    tiledata.rio.set_spatial_dims(x_dim="x", y_dim="y", inplace=True)
    for band in tiledata.data_vars:
        tiledata[band] = tiledata[band].rio.write_nodata(np.nan).rio.interpolate_na()

    # Convert to uint8
    for band in tiledata.data_vars:
        band_min, band_max = clip_values[band]
        tiledata[band] = (
            tiledata[band].clip(band_min, band_max, keep_attrs=True).astype("uint8").rio.write_nodata(None)
        )

    return tiledata

load

load(
    geobox: GeoBox,
    buffer: int = 0,
    persist: bool = True,
    create: bool = False,
    concurrency_mode: ConcurrencyModes = "blocking",
) -> xr.Dataset

Load the data for the given geobox.

Parameters:

  • geobox

    (GeoBox) –

    The reference geobox to load the data for.

  • buffer

    (int, default: 0 ) –

    The buffer around the projected geobox in pixels. Defaults to 0.

  • persist

    (bool, default: True ) –

    If the data should be persisted in memory. If not, this will return a Dask backed Dataset. Defaults to True.

  • create

    (bool, default: False ) –

    Create a new zarr array at defined storage if it not exists. This is not recommended, because it can have side effects in a multi-process environment. Defaults to False.

  • concurrency_mode

    (ConcurrencyModes, default: 'blocking' ) –

    The concurrency mode for the download. Defaults to "blocking".

Returns:

  • Dataset

    xr.Dataset: The loaded dataset in the same resolution and extent like the geobox.

Source code in src/smart_geocubes/accessors/base.py
def load(
    self,
    geobox: GeoBox,
    buffer: int = 0,
    persist: bool = True,
    create: bool = False,
    concurrency_mode: ConcurrencyModes = "blocking",
) -> xr.Dataset:
    """Load the data for the given geobox.

    Args:
        geobox (GeoBox): The reference geobox to load the data for.
        buffer (int, optional): The buffer around the projected geobox in pixels. Defaults to 0.
        persist (bool, optional): If the data should be persisted in memory.
            If not, this will return a Dask backed Dataset. Defaults to True.
        create (bool, optional): Create a new zarr array at defined storage if it not exists.
            This is not recommended, because it can have side effects in a multi-process environment.
            Defaults to False.
        concurrency_mode (ConcurrencyModes, optional): The concurrency mode for the download.
            Defaults to "blocking".

    Returns:
        xr.Dataset: The loaded dataset in the same resolution and extent like the geobox.

    """
    with self.stopuhr(f"{_geobox_repr(geobox)}: {self.title} tile {'loading' if persist else 'lazy-loading'}"):
        logger.debug(f"{_geobox_repr(geobox)}: {geobox.resolution} original resolution")

        # Create the datacube if it does not exist
        if create:
            try:
                self.create(overwrite=False)
            except FileExistsError:  # We are okay if the datacube already exists
                pass
        else:
            # Check if the datacube exists
            self.assert_created()

        # Download the adjacent tiles (if necessary)
        reference_geobox = geobox.to_crs(self.extent.crs, resolution=self.extent.resolution.x).pad(buffer)
        self.procedural_download(reference_geobox, concurrency_mode=concurrency_mode)

        # Load the datacube and set the spatial_ref since it is set as a coordinate within the zarr format
        session = self.repo.readonly_session("main")
        chunks = None if persist else "auto"
        xrcube = xr.open_zarr(
            session.store,
            mask_and_scale=False,
            chunks=chunks,
            consolidated=False,
        ).set_coords("spatial_ref")

        # Get an AOI slice of the datacube
        xrcube_aoi = xrcube.odc.crop(reference_geobox.extent, apply_mask=False)

        # The following code would load the lazy zarr data from disk into memory
        if persist:
            with self.stopuhr(f"{_geobox_repr(geobox)}: {self.title} AOI loading from disk"):
                xrcube_aoi = xrcube_aoi.load()
    return xrcube_aoi

load_like

load_like(
    ref: Dataset | DataArray, **kwargs: Unpack[LoadParams]
) -> xr.Dataset

Load the data for the given geobox.

Parameters:

  • ref

    (Dataset | DataArray) –

    The reference dataarray or dataset to load the data for.

  • **kwargs

    (Unpack[LoadParams], default: {} ) –

    The load parameters (buffer, persist, create, concurrency_mode).

Other Parameters:

  • buffer (int) –

    The buffer around the projected geobox in pixels. Defaults to 0.

  • persist (bool) –

    If the data should be persisted in memory. If not, this will return a Dask backed Dataset. Defaults to True.

  • create (bool) –

    Create a new zarr array at defined storage if it not exists. This is not recommended, because it can have side effects in a multi-process environment. Defaults to False.

  • concurrency_mode (ConcurrencyModes) –

    The concurrency mode for the download. Defaults to "blocking".

Returns:

  • Dataset

    xr.Dataset: The loaded dataset in the same resolution and extent like the geobox.

Source code in src/smart_geocubes/accessors/base.py
def load_like(
    self,
    ref: xr.Dataset | xr.DataArray,
    **kwargs: Unpack[LoadParams],
) -> xr.Dataset:
    """Load the data for the given geobox.

    Args:
        ref (xr.Dataset | xr.DataArray): The reference dataarray or dataset to load the data for.
        **kwargs: The load parameters (buffer, persist, create, concurrency_mode).

    Keyword Args:
        buffer (int, optional): The buffer around the projected geobox in pixels. Defaults to 0.
        persist (bool, optional): If the data should be persisted in memory.
            If not, this will return a Dask backed Dataset. Defaults to True.
        create (bool, optional): Create a new zarr array at defined storage if it not exists.
            This is not recommended, because it can have side effects in a multi-process environment.
            Defaults to False.
        concurrency_mode (ConcurrencyModes, optional): The concurrency mode for the download.
            Defaults to "blocking".

    Returns:
        xr.Dataset: The loaded dataset in the same resolution and extent like the geobox.

    """
    return self.load(_geobox_repr(ref.geobox), **kwargs)

log_benchmark_summary

log_benchmark_summary()

Log the benchmark summary.

Source code in src/smart_geocubes/accessors/base.py
def log_benchmark_summary(self):
    """Log the benchmark summary."""
    self.stopuhr.summary()

open_xarray

open_xarray() -> xr.Dataset

Open the xarray datacube in read-only mode.

Returns:

  • Dataset

    xr.Dataset: The xarray datacube.

Source code in src/smart_geocubes/accessors/base.py
def open_xarray(self) -> xr.Dataset:
    """Open the xarray datacube in read-only mode.

    Returns:
        xr.Dataset: The xarray datacube.

    """
    self.assert_created()
    session = self.repo.readonly_session("main")
    xcube = xr.open_zarr(session.store, mask_and_scale=False, consolidated=False).set_coords("spatial_ref")
    return xcube

open_zarr

open_zarr() -> zarr.Group

Open the zarr datacube in read-only mode.

Returns:

  • Group

    zarr.Group: The zarr datacube.

Source code in src/smart_geocubes/accessors/base.py
def open_zarr(self) -> zarr.Group:
    """Open the zarr datacube in read-only mode.

    Returns:
        zarr.Group: The zarr datacube.

    """
    self.assert_created()
    session = self.repo.readonly_session("main")
    zcube = zarr.open(store=session.store, mode="r")
    return zcube

post_create

post_create()

Post create actions. Can be overwritten by the dataset accessor.

Source code in src/smart_geocubes/accessors/base.py
def post_create(self):
    """Post create actions. Can be overwritten by the dataset accessor."""
    pass

procedural_download

procedural_download(
    geobox: GeoBox,
    concurrency_mode: ConcurrencyModes = "blocking",
)

Download the data for the given geobox.

Note

The "threading" concurrency mode requires Python 3.13 or higher.

Parameters:

  • geobox

    (GeoBox) –

    The reference geobox to download the data for.

  • concurrency_mode

    (ConcurrencyModes, default: 'blocking' ) –

    The concurrency mode for the download. Defaults to "blocking".

Raises:

  • ValueError

    If an unknown concurrency mode is provided.

Source code in src/smart_geocubes/accessors/base.py
def procedural_download(self, geobox: GeoBox, concurrency_mode: ConcurrencyModes = "blocking"):
    """Download the data for the given geobox.

    Note:
        The "threading" concurrency mode requires Python 3.13 or higher.

    Args:
        geobox (GeoBox): The reference geobox to download the data for.
        concurrency_mode (ConcurrencyModes, optional): The concurrency mode for the download.
            Defaults to "blocking".

    Raises:
        ValueError: If an unknown concurrency mode is provided.

    """
    self.assert_created()
    if concurrency_mode == "blocking":
        self.procedural_download_blocking(geobox)
    elif concurrency_mode == "threading":
        raise ValueError("Threading mode is currently disabled. Use 'blocking' instead.")
        # self.procedural_download_threading(geobox)
    else:
        raise ValueError(f"Unknown concurrency mode {concurrency_mode}")

procedural_download_blocking

procedural_download_blocking(geobox: GeoBox)

Download tiles procedurally in blocking mode.

Warning

This method is meant for single-process use, but can (in theory) be used in a multi-process environment. However, in a multi-process environment it can happen that multiple processes try to write concurrently, which results in a conflict. In such cases, the download will be retried until it succeeds or the number of maximum-tries is reached.

Parameters:

  • geobox

    (GeoBox) –

    The geobox of the aoi to download.

Raises:

  • ValueError

    If no adjacent tiles are found. This can happen if the geobox is out of the dataset bounds.

  • ValueError

    If no tries are left.

Source code in src/smart_geocubes/accessors/base.py
def procedural_download_blocking(self, geobox: GeoBox):
    """Download tiles procedurally in blocking mode.

    Warning:
        This method is meant for single-process use, but can (in theory) be used in a multi-process environment.
        However, in a multi-process environment it can happen that multiple processes try to write concurrently,
        which results in a conflict.
        In such cases, the download will be retried until it succeeds or the number of maximum-tries is reached.

    Args:
        geobox (GeoBox): The geobox of the aoi to download.

    Raises:
        ValueError: If no adjacent tiles are found. This can happen if the geobox is out of the dataset bounds.
        ValueError: If no tries are left.

    """
    with self.stopuhr(f"{_geobox_repr(geobox)}: Procedural download in blocking mode"):
        adjacent_tiles = self.adjacent_tiles(geobox)
        if not adjacent_tiles:
            logger.error(f"{_geobox_repr(geobox)}: No adjacent tiles found: {adjacent_tiles=}")
            raise ValueError("No adjacent tiles found - is the provided geobox corrent?")

        session = self.repo.readonly_session("main")
        zcube = zarr.open(store=session.store, mode="r")
        loaded_tiles = zcube.attrs.get("loaded_tiles", [])
        new_tiles = [tile for tile in adjacent_tiles if tile.id not in loaded_tiles]
        logger.debug(
            f"{_geobox_repr(geobox)}:  {len(adjacent_tiles)=} & {len(loaded_tiles)=}"
            f" -> {len(new_tiles)=} to download"
        )
        if not new_tiles:
            return

        for tile in new_tiles:
            with self.stopuhr(f"{tile.id=}: Downloading one new tile in blocking mode"):
                logger.debug(f"{tile.id=}: Start downloading")
                tiledata = self.download_tile(tile)

            # Try to write the data to file until a limit is reached
            limit = 100
            for i in range(limit):
                try:
                    self._write_tile_to_zarr(tiledata, tile)
                    break
                except icechunk.ConflictError as conflict_error:
                    logger.debug(f"{tile.id=}: {conflict_error=} at retry {i}/{limit}")
            else:
                logger.error(
                    f"{tile.id=}: {limit} tries to write the tile failed. "
                    "Please check if the datacube is already created and not empty."
                )
                raise ValueError(f"{tile.id=}: {limit} tries to write the tile failed.")

procedural_download_threading

procedural_download_threading(geobox: GeoBox)

Download tiles procedurally in threading mode.

Note

This method ensures that only a single download is running at a time. It uses a SetQueue to prevent duplicate downloads. The threading mode requires Python 3.13 or higher.

Parameters:

  • geobox

    (GeoBox) –

    The geobox of the aoi to download.

Raises:

  • ValueError

    If no adjacent tiles are found. This can happen if the geobox is out of the dataset bounds.

  • RuntimeError

    If the Python version is lower than 3.13.

Source code in src/smart_geocubes/accessors/base.py
def procedural_download_threading(self, geobox: GeoBox):
    """Download tiles procedurally in threading mode.

    Note:
        This method ensures that only a single download is running at a time.
        It uses a SetQueue to prevent duplicate downloads.
        The threading mode requires Python 3.13 or higher.

    Args:
        geobox (GeoBox): The geobox of the aoi to download.

    Raises:
        ValueError: If no adjacent tiles are found. This can happen if the geobox is out of the dataset bounds.
        RuntimeError: If the Python version is lower than 3.13.

    """
    if not _check_python_version(3, 13):
        raise RuntimeError("Threading mode requires Python 3.13 or higher")
    with self._threading_handler:
        adjacent_tiles = self.adjacent_tiles(geobox)
        if not adjacent_tiles:
            logger.error(f"{_geobox_repr(geobox)}: No adjacent tiles found: {adjacent_tiles=}")
            raise ValueError("No adjacent tiles found - is the provided geobox corrent?")

        # Wait until all new_items are loaded
        prev_len = None
        while True:
            session = self.repo.readonly_session("main")
            zcube = zarr.open(store=session.store, mode="r")
            loaded_tiles = zcube.attrs.get("loaded_tiles", [])
            new_tiles = [tile for tile in adjacent_tiles if tile.id not in loaded_tiles]
            done_tiles = [tile for tile in adjacent_tiles if tile.id in loaded_tiles]
            if not new_tiles:
                break
            if prev_len != len(new_tiles):
                logger.debug(
                    f"{_geobox_repr(geobox)}: {len(done_tiles)} of {len(adjacent_tiles)} downloaded."
                    f" Missing: {[t.id for t in new_tiles]} Done: {[t.id for t in done_tiles]}"
                )
            for tile in new_tiles:
                self._threading_handler._queue.put(tile)
            prev_len = len(new_tiles)
            time.sleep(5)

visualize_state

visualize_state(
    ax: Axes | None = None,
) -> plt.Figure | plt.Axes

Visulize the extend, hence the already downloaded and filled data, of the datacube.

Parameters:

  • ax

    (Axes | None, default: None ) –

    The axes drawn to. If None, will create a new figure and axes.

Returns:

  • Figure | Axes

    plt.Figure | plt.Axes: The figure with the visualization if no axes was provided, else the axes.

Raises:

Source code in src/smart_geocubes/datasets/tctrend.py
def visualize_state(self, ax: "plt.Axes | None" = None) -> "plt.Figure | plt.Axes":
    """Visulize the extend, hence the already downloaded and filled data, of the datacube.

    Args:
        ax (plt.Axes | None): The axes drawn to. If None, will create a new figure and axes.

    Returns:
        plt.Figure | plt.Axes: The figure with the visualization if no axes was provided, else the axes.

    Raises:
        ValueError: If the datacube is empty

    """
    import cartopy.crs as ccrs
    import cartopy.feature as cfeature
    import matplotlib.pyplot as plt

    tile_info = self.current_state()

    if tile_info is None:
        raise ValueError("Datacube is not created or loaded yet. Can't visualize!")

    # Define the projection
    projection = ccrs.PlateCarree()

    # Create a figure
    fig = None
    if ax is None:
        fig, ax = plt.subplots(figsize=(10, 10), subplot_kw={"projection": projection})

    # Set the extent to show the whole world
    ax.set_extent([-180, 180, -90, 90], crs=ccrs.PlateCarree())

    # Add features
    ax.add_feature(cfeature.LAND, zorder=0, edgecolor="black", facecolor="white")
    ax.add_feature(cfeature.OCEAN, zorder=0, facecolor="lightgrey")
    ax.add_feature(cfeature.COASTLINE)
    ax.add_feature(cfeature.BORDERS, linestyle=":")
    ax.add_feature(cfeature.LAKES, alpha=0.5)
    ax.add_feature(cfeature.RIVERS)

    # Add gridlines
    gl = ax.gridlines(draw_labels=True)
    gl.top_labels = False
    gl.right_labels = False

    tile_info.plot(
        "id",
        ax=ax,
        transform=ccrs.PlateCarree(),
        edgecolor="black",
        categorical=True,
        aspect="equal",
        alpha=0.5,
    )

    if fig is not None:
        return fig
    else:
        return ax