Skip to content

smart_geocubes.accessors.gee

Google Earth Engine Accessor for Smart Geocubes.

Classes:

  • GEEAccessor

    Accessor for Google Earth Engine data.

GEEAccessor

GEEAccessor(
    storage: Storage | Path | str,
    create_icechunk_storage: bool = True,
)

Bases: RemoteAccessor

Accessor for Google Earth Engine data.

Attributes:

  • extent (GeoBox) –

    The extent of the datacube represented by a GeoBox.

  • chunk_size (int) –

    The chunk size of the datacube.

  • channels (list) –

    The channels of the datacube.

  • storage (Storage) –

    The icechunk storage.

  • repo (Repository) –

    The icechunk repository.

  • title (str) –

    The title of the datacube.

  • stopuhr (StopUhr) –

    The benchmarking timer from the stopuhr library.

  • zgeobox (GeoBox) –

    The geobox of the underlaying zarr array. Should be equal to the extent geobox. However, this property is used to find the target index of the downloaded data, so better save than sorry.

  • created (bool) –

    True if the datacube already exists in the storage.

Initialize base class for remote accessors.

Warning

In a multiprocessing environment, it is strongly recommended to not set create_icechunk_storage=False.

Parameters:

  • storage

    (Storage) –

    The icechunk storage of the datacube.

  • create_icechunk_storage

    (bool, default: True ) –

    If an icechunk repository should be created at provided storage if no exists. This should be disabled in a multiprocessing environment. Defaults to True.

Raises:

  • ValueError

    If the storage is not an icechunk.Storage.

Methods:

Source code in src/smart_geocubes/accessors/base.py
def __init__(
    self,
    storage: icechunk.Storage | Path | str,
    create_icechunk_storage: bool = True,
):
    """Initialize base class for remote accessors.

    !!! warning

        In a multiprocessing environment, it is strongly recommended to not set `create_icechunk_storage=False`.

    Args:
        storage (icechunk.Storage): The icechunk storage of the datacube.
        create_icechunk_storage (bool, optional): If an icechunk repository should be created at provided storage
            if no exists.
            This should be disabled in a multiprocessing environment.
            Defaults to True.

    Raises:
        ValueError: If the storage is not an icechunk.Storage.

    """
    # Title is used for logging, debugging and as a default name for the datacube
    self.title = self.__class__.__name__

    if isinstance(storage, (str | Path)):
        storage = storage if isinstance(storage, str) else str(storage.resolve())
        storage = icechunk.local_filesystem_storage(storage)
    if not isinstance(storage, icechunk.Storage):
        raise ValueError(f"Expected an icechunk.Storage, but got {type(storage)}")
    self.storage = storage
    logger.debug(f"Using storage {storage=}")
    if create_icechunk_storage:
        self.repo = icechunk.Repository.open_or_create(storage)  # Will create a "main" branch
    else:
        self.repo = icechunk.Repository.open(storage)
    logger.debug(f"Using repository {self.repo=}")

    # The benchmarking timer for this accessor
    self.stopuhr = StopUhr(logger.debug)

    # The TypeVar used by the ThreadingHandler was added in 3.12
    # The Shutdown method of the queue was added in 3.13
    # Hence, we don't want to import the module unless Python 3.13 is installed
    if _check_python_version(3, 13):
        from smart_geocubes._concurrency.threading import ThreadingHandler

        self._threading_handler = ThreadingHandler(self._threading_download)

adjacent_tiles

adjacent_tiles(
    roi: GeoBox | GeoDataFrame,
) -> list[TileWrapper]

Get adjacent tiles from Google Earth Engine.

Parameters:

  • roi

    (GeoBox | GeoDataFrame) –

    The reference geobox or reference geodataframe

Returns:

  • list[TileWrapper]

    list[TileWrapper]: List of adjacent tiles, wrapped in own datastructure for easier processing.

Source code in src/smart_geocubes/accessors/gee.py
def adjacent_tiles(self, roi: GeoBox | gpd.GeoDataFrame) -> list[TileWrapper]:
    """Get adjacent tiles from Google Earth Engine.

    Args:
        roi (GeoBox | gpd.GeoDataFrame): The reference geobox or reference geodataframe

    Returns:
        list[TileWrapper]: List of adjacent tiles, wrapped in own datastructure for easier processing.

    """
    if isinstance(roi, gpd.GeoDataFrame):
        adjacent_geometries = (
            gpd.sjoin(self._tile_geometries, roi.to_crs(self.extent.crs.wkt), how="inner", predicate="intersects")
            .reset_index()
            .drop_duplicates(subset="index", keep="first")
            .set_index("index")
        )
        return [TileWrapper(_tileidx_to_id(idx), self._extent_tiles[idx]) for idx in adjacent_geometries["idx"]]

    elif isinstance(roi, GeoBox):
        return [
            TileWrapper(_tileidx_to_id(idx), self._extent_tiles[idx])
            for idx in self._extent_tiles.tiles(roi.extent)
        ]

assert_created

assert_created()

Assert that the datacube exists in the storage.

Raises:

Source code in src/smart_geocubes/accessors/base.py
def assert_created(self):
    """Assert that the datacube exists in the storage.

    Raises:
        FileNotFoundError: If the datacube does not exist.

    """
    if not self.created:
        msg = f"Datacube {self.title} does not exist."
        " Please use the `create` method or pass `create=True` to `load`."
        logger.error(msg)
        raise FileNotFoundError(msg)

create

create(overwrite: bool = False)

Create an empty datacube and write it to the store.

Parameters:

  • overwrite

    (bool, default: False ) –

    Allowing overwriting an existing datacube. Defaults to False.

Raises:

Source code in src/smart_geocubes/accessors/base.py
def create(self, overwrite: bool = False):
    """Create an empty datacube and write it to the store.

    Args:
        overwrite (bool, optional): Allowing overwriting an existing datacube. Defaults to False.

    Raises:
        FileExistsError: If a datacube already exists at location

    """
    with self.stopuhr("Empty datacube creation"):
        # Check if the zarr data already exists
        session = self.repo.writable_session("main")
        cube_is_empty = sync(session.store.is_empty(""))
        if not overwrite and not cube_is_empty:
            logger.debug(f"Unable to create a new datacube. {overwrite=} {cube_is_empty=} {session.store=}")
            raise FileExistsError(f"Cannot create a new  datacube. {session.store=} is not empty!")

        logger.debug(
            f"Creating an empty zarr datacube '{self.title}' with the variables"
            f" {self.channels} at a {self.extent.resolution=} (epsg:{self.extent.crs.epsg})"
            f" and {self.chunk_size=} to {session.store=}"
        )

        ds = xr.Dataset(
            {
                name: odc.geo.xr.xr_zeros(
                    self.extent,
                    chunks=-1,
                    dtype=self._channels_encoding[name].get("dtype", "float32"),
                    always_yx=True,
                )
                for name in self.channels
            },
            attrs={"title": self.title, "loaded_tiles": []},
        )

        # Add metadata
        for name, meta in self._channels_meta.items():
            ds[name].attrs.update(meta)

        # Get the encoding for the coordinates, variables and spatial reference
        coords_encoding = {
            "x": {"chunks": ds.x.shape, **optimize_coord_encoding(ds.x.values, self.extent.resolution.x)},
            "y": {"chunks": ds.y.shape, **optimize_coord_encoding(ds.y.values, self.extent.resolution.y)},
        }
        var_encoding = {
            name: {
                "chunks": (self.chunk_size, self.chunk_size),
                "compressors": [BloscCodec(clevel=9)],
                **self._channels_encoding[name],
            }
            for name in self.channels
        }
        encoding = {
            "spatial_ref": {"chunks": None, "dtype": "int32"},
            **coords_encoding,
            **var_encoding,
        }
        logger.debug(f"Datacube {encoding=}")

        ds.to_zarr(
            session.store,
            encoding=encoding,
            compute=False,
            consolidated=False,
            zarr_format=3,
            mode="w" if overwrite else "w-",
        )

        commit = session.commit("Initialize empty datacube")
        logger.debug(f"Datacube created: {commit=}")

        self.post_create()

current_state

current_state() -> gpd.GeoDataFrame | None

Get info about currently stored tiles.

Returns:

  • GeoDataFrame | None

    gpd.GeoDataFrame: Tiles from odc.geo.GeoboxTiles. None if datacube is empty.

Source code in src/smart_geocubes/accessors/gee.py
def current_state(self) -> gpd.GeoDataFrame | None:
    """Get info about currently stored tiles.

    Returns:
        gpd.GeoDataFrame: Tiles from odc.geo.GeoboxTiles. None if datacube is empty.

    """
    import geopandas as gpd

    if not self.created:
        return None

    session = self.repo.readonly_session("main")
    zcube = zarr.open(session.store, mode="r")
    loaded_tiles = zcube.attrs.get("loaded_tiles", [])

    if len(loaded_tiles) == 0:
        return None

    tiles = GeoboxTiles(self.extent, (self.chunk_size, self.chunk_size))
    loaded_tiles = [{"geometry": tiles[_id_to_tileidx(tid)].extent.geom, "id": tid} for tid in loaded_tiles]
    gdf = gpd.GeoDataFrame(loaded_tiles, crs=self.extent.crs.to_wkt())
    return gdf

download

download(roi: GeoBox | GeoDataFrame)

Download the data for the given region of interest which can be provided either as GeoBox or GeoDataFrame.

Parameters:

  • roi

    (GeoBox | GeoDataFrame) –

    The reference geobox or reference geodataframe to download the data for.

Raises:

  • ValueError

    If no adjacent tiles are found. This can happen if the geobox is out of the dataset bounds.

  • ValueError

    If no tries are left.

Source code in src/smart_geocubes/accessors/base.py
def download(self, roi: GeoBox | gpd.GeoDataFrame):
    """Download the data for the given region of interest which can be provided either as GeoBox or GeoDataFrame.

    Args:
        roi (GeoBox | gpd.GeoDataFrame): The reference geobox or reference geodataframe to download the data for.

    Raises:
        ValueError: If no adjacent tiles are found. This can happen if the geobox is out of the dataset bounds.
        ValueError: If no tries are left.

    """
    with self.stopuhr(f"Download of {roi=}"):
        adjacent_tiles = self.adjacent_tiles(roi)
        if not adjacent_tiles:
            logger.error(f"No adjacent tiles found: {adjacent_tiles=}")
            raise ValueError("No adjacent tiles found - is the provided geobox corrent?")

        session = self.repo.readonly_session("main")
        zcube = zarr.open(store=session.store, mode="r")
        loaded_tiles = zcube.attrs.get("loaded_tiles", [])
        new_tiles = [tile for tile in adjacent_tiles if tile.id not in loaded_tiles]
        logger.debug(f"{len(adjacent_tiles)=} & {len(loaded_tiles)=} -> {len(new_tiles)=} to download")
        if not new_tiles:
            return

        for tile in new_tiles:
            with self.stopuhr(f"{tile.id=}: Downloading one new tile in blocking mode"):
                logger.debug(f"{tile.id=}: Start downloading")
                tiledata = self.download_tile(tile)

            # Try to write the data to file until a limit is reached
            limit = 100
            for i in range(limit):
                try:
                    self._write_tile_to_zarr(tiledata, tile)
                    break
                except icechunk.ConflictError as conflict_error:
                    logger.debug(f"{tile.id=}: {conflict_error=} at retry {i}/{limit}")
            else:
                logger.error(
                    f"{tile.id=}: {limit} tries to write the tile failed. "
                    "Please check if the datacube is already created and not empty."
                )
                raise ValueError(f"{tile.id=}: {limit} tries to write the tile failed.")

download_tile

download_tile(tile: TileWrapper) -> xr.Dataset

Download a tile from Google Earth Engine.

Parameters:

Returns:

  • Dataset

    xr.Dataset: The downloaded tile data.

Source code in src/smart_geocubes/accessors/gee.py
def download_tile(self, tile: TileWrapper) -> xr.Dataset:
    """Download a tile from Google Earth Engine.

    Args:
        tile (TileWrapper): The tile to download.

    Returns:
        xr.Dataset: The downloaded tile data.

    """
    import ee
    import rioxarray  # noqa: F401
    import xee  # noqa: F401

    # Note: This is a little bit weird: First we create an own grid which overlaps to the chunks
    # of the zarr array. Then we create a mosaic of the data and clip it to a single chunk.
    # We could load the images from the collection directly instead of creating a mosaic.
    # However, this would require more testing and probably results a lot of manual computation
    # of slices etc. like in the stac variant. So for now, we just use the mosaic.
    logging.getLogger("urllib3.connectionpool").disabled = True
    geom = ee.Geometry.Rectangle(tile.item.geographic_extent.boundingbox)
    ee_img = ee.ImageCollection(self.collection).mosaic().clip(geom)
    with warnings.catch_warnings():
        warnings.filterwarnings("ignore", category=UserWarning, message=EE_WARN_MSG)
        tiledata = xr.open_dataset(
            ee_img,
            engine="ee",
            geometry=geom,
            crs=f"epsg:{self.extent.crs.to_epsg()}",
            scale=self.extent.resolution.x,
        )

    # TODO: Allow for multi-temporal datacubes and lat/lon coordinates
    tiledata = tiledata.max("time").rename({"lon": "x", "lat": "y"}).transpose("y", "x")

    # Download the data
    tiledata.load()
    logging.getLogger("urllib3.connectionpool").disabled = False

    # Flip y-axis, because convention is x in positive direction and y in negative, but gee use positive for both
    tiledata = tiledata.isel(y=slice(None, None, -1))

    # For some reason xee does not always set the crs
    tiledata = tiledata.odc.assign_crs(self.extent.crs)

    # Recrop the data to the tile, since gee does not always return the exact extent
    tiledata = tiledata.odc.crop(tile.item.extent)

    # Save original min-max values for each band for clipping later
    clip_values = {
        band: (tiledata[band].min().values.item(), tiledata[band].max().values.item())
        for band in tiledata.data_vars
    }

    # Interpolate missing values (there are very few, so we actually can interpolate them)
    tiledata.rio.set_spatial_dims(x_dim="x", y_dim="y", inplace=True)
    for band in tiledata.data_vars:
        tiledata[band] = tiledata[band].rio.write_nodata(np.nan).rio.interpolate_na()

    # Convert to uint8
    for band in tiledata.data_vars:
        band_min, band_max = clip_values[band]
        tiledata[band] = (
            tiledata[band].clip(band_min, band_max, keep_attrs=True).astype("uint8").rio.write_nodata(None)
        )

    return tiledata

load

load(
    geobox: GeoBox,
    buffer: int = 0,
    persist: bool = True,
    create: bool = False,
    concurrency_mode: ConcurrencyModes = "blocking",
) -> xr.Dataset

Load the data for the given geobox.

Parameters:

  • geobox

    (GeoBox) –

    The reference geobox to load the data for.

  • buffer

    (int, default: 0 ) –

    The buffer around the projected geobox in pixels. Defaults to 0.

  • persist

    (bool, default: True ) –

    If the data should be persisted in memory. If not, this will return a Dask backed Dataset. Defaults to True.

  • create

    (bool, default: False ) –

    Create a new zarr array at defined storage if it not exists. This is not recommended, because it can have side effects in a multi-process environment. Defaults to False.

  • concurrency_mode

    (ConcurrencyModes, default: 'blocking' ) –

    The concurrency mode for the download. Defaults to "blocking".

Returns:

  • Dataset

    xr.Dataset: The loaded dataset in the same resolution and extent like the geobox.

Source code in src/smart_geocubes/accessors/base.py
def load(
    self,
    geobox: GeoBox,
    buffer: int = 0,
    persist: bool = True,
    create: bool = False,
    concurrency_mode: ConcurrencyModes = "blocking",
) -> xr.Dataset:
    """Load the data for the given geobox.

    Args:
        geobox (GeoBox): The reference geobox to load the data for.
        buffer (int, optional): The buffer around the projected geobox in pixels. Defaults to 0.
        persist (bool, optional): If the data should be persisted in memory.
            If not, this will return a Dask backed Dataset. Defaults to True.
        create (bool, optional): Create a new zarr array at defined storage if it not exists.
            This is not recommended, because it can have side effects in a multi-process environment.
            Defaults to False.
        concurrency_mode (ConcurrencyModes, optional): The concurrency mode for the download.
            Defaults to "blocking".

    Returns:
        xr.Dataset: The loaded dataset in the same resolution and extent like the geobox.

    """
    with self.stopuhr(f"{_geobox_repr(geobox)}: {self.title} tile {'loading' if persist else 'lazy-loading'}"):
        logger.debug(f"{_geobox_repr(geobox)}: {geobox.resolution} original resolution")

        # Create the datacube if it does not exist
        if create:
            try:
                self.create(overwrite=False)
            except FileExistsError:  # We are okay if the datacube already exists
                pass
        else:
            # Check if the datacube exists
            self.assert_created()

        # Download the adjacent tiles (if necessary)
        reference_geobox = geobox.to_crs(self.extent.crs, resolution=self.extent.resolution.x).pad(buffer)
        self.procedural_download(reference_geobox, concurrency_mode=concurrency_mode)

        # Load the datacube and set the spatial_ref since it is set as a coordinate within the zarr format
        session = self.repo.readonly_session("main")
        chunks = None if persist else "auto"
        xrcube = xr.open_zarr(
            session.store,
            mask_and_scale=False,
            chunks=chunks,
            consolidated=False,
        ).set_coords("spatial_ref")

        # Get an AOI slice of the datacube
        xrcube_aoi = xrcube.odc.crop(reference_geobox.extent, apply_mask=False)

        # The following code would load the lazy zarr data from disk into memory
        if persist:
            with self.stopuhr(f"{_geobox_repr(geobox)}: {self.title} AOI loading from disk"):
                xrcube_aoi = xrcube_aoi.load()
    return xrcube_aoi

load_like

load_like(
    ref: Dataset | DataArray, **kwargs: Unpack[LoadParams]
) -> xr.Dataset

Load the data for the given geobox.

Parameters:

  • ref

    (Dataset | DataArray) –

    The reference dataarray or dataset to load the data for.

  • **kwargs

    (Unpack[LoadParams], default: {} ) –

    The load parameters (buffer, persist, create, concurrency_mode).

Other Parameters:

  • buffer (int) –

    The buffer around the projected geobox in pixels. Defaults to 0.

  • persist (bool) –

    If the data should be persisted in memory. If not, this will return a Dask backed Dataset. Defaults to True.

  • create (bool) –

    Create a new zarr array at defined storage if it not exists. This is not recommended, because it can have side effects in a multi-process environment. Defaults to False.

  • concurrency_mode (ConcurrencyModes) –

    The concurrency mode for the download. Defaults to "blocking".

Returns:

  • Dataset

    xr.Dataset: The loaded dataset in the same resolution and extent like the geobox.

Source code in src/smart_geocubes/accessors/base.py
def load_like(
    self,
    ref: xr.Dataset | xr.DataArray,
    **kwargs: Unpack[LoadParams],
) -> xr.Dataset:
    """Load the data for the given geobox.

    Args:
        ref (xr.Dataset | xr.DataArray): The reference dataarray or dataset to load the data for.
        **kwargs: The load parameters (buffer, persist, create, concurrency_mode).

    Keyword Args:
        buffer (int, optional): The buffer around the projected geobox in pixels. Defaults to 0.
        persist (bool, optional): If the data should be persisted in memory.
            If not, this will return a Dask backed Dataset. Defaults to True.
        create (bool, optional): Create a new zarr array at defined storage if it not exists.
            This is not recommended, because it can have side effects in a multi-process environment.
            Defaults to False.
        concurrency_mode (ConcurrencyModes, optional): The concurrency mode for the download.
            Defaults to "blocking".

    Returns:
        xr.Dataset: The loaded dataset in the same resolution and extent like the geobox.

    """
    return self.load(_geobox_repr(ref.geobox), **kwargs)

log_benchmark_summary

log_benchmark_summary()

Log the benchmark summary.

Source code in src/smart_geocubes/accessors/base.py
def log_benchmark_summary(self):
    """Log the benchmark summary."""
    self.stopuhr.summary()

open_xarray

open_xarray() -> xr.Dataset

Open the xarray datacube in read-only mode.

Returns:

  • Dataset

    xr.Dataset: The xarray datacube.

Source code in src/smart_geocubes/accessors/base.py
def open_xarray(self) -> xr.Dataset:
    """Open the xarray datacube in read-only mode.

    Returns:
        xr.Dataset: The xarray datacube.

    """
    self.assert_created()
    session = self.repo.readonly_session("main")
    xcube = xr.open_zarr(session.store, mask_and_scale=False, consolidated=False).set_coords("spatial_ref")
    return xcube

open_zarr

open_zarr() -> zarr.Group

Open the zarr datacube in read-only mode.

Returns:

  • Group

    zarr.Group: The zarr datacube.

Source code in src/smart_geocubes/accessors/base.py
def open_zarr(self) -> zarr.Group:
    """Open the zarr datacube in read-only mode.

    Returns:
        zarr.Group: The zarr datacube.

    """
    self.assert_created()
    session = self.repo.readonly_session("main")
    zcube = zarr.open(store=session.store, mode="r")
    return zcube

post_create

post_create()

Post create actions. Can be overwritten by the dataset accessor.

Source code in src/smart_geocubes/accessors/base.py
def post_create(self):
    """Post create actions. Can be overwritten by the dataset accessor."""
    pass

procedural_download

procedural_download(
    geobox: GeoBox,
    concurrency_mode: ConcurrencyModes = "blocking",
)

Download the data for the given geobox.

Note

The "threading" concurrency mode requires Python 3.13 or higher.

Parameters:

  • geobox

    (GeoBox) –

    The reference geobox to download the data for.

  • concurrency_mode

    (ConcurrencyModes, default: 'blocking' ) –

    The concurrency mode for the download. Defaults to "blocking".

Raises:

  • ValueError

    If an unknown concurrency mode is provided.

Source code in src/smart_geocubes/accessors/base.py
def procedural_download(self, geobox: GeoBox, concurrency_mode: ConcurrencyModes = "blocking"):
    """Download the data for the given geobox.

    Note:
        The "threading" concurrency mode requires Python 3.13 or higher.

    Args:
        geobox (GeoBox): The reference geobox to download the data for.
        concurrency_mode (ConcurrencyModes, optional): The concurrency mode for the download.
            Defaults to "blocking".

    Raises:
        ValueError: If an unknown concurrency mode is provided.

    """
    self.assert_created()
    if concurrency_mode == "blocking":
        self.procedural_download_blocking(geobox)
    elif concurrency_mode == "threading":
        raise ValueError("Threading mode is currently disabled. Use 'blocking' instead.")
        # self.procedural_download_threading(geobox)
    else:
        raise ValueError(f"Unknown concurrency mode {concurrency_mode}")

procedural_download_blocking

procedural_download_blocking(geobox: GeoBox)

Download tiles procedurally in blocking mode.

Warning

This method is meant for single-process use, but can (in theory) be used in a multi-process environment. However, in a multi-process environment it can happen that multiple processes try to write concurrently, which results in a conflict. In such cases, the download will be retried until it succeeds or the number of maximum-tries is reached.

Parameters:

  • geobox

    (GeoBox) –

    The geobox of the aoi to download.

Raises:

  • ValueError

    If no adjacent tiles are found. This can happen if the geobox is out of the dataset bounds.

  • ValueError

    If no tries are left.

Source code in src/smart_geocubes/accessors/base.py
def procedural_download_blocking(self, geobox: GeoBox):
    """Download tiles procedurally in blocking mode.

    Warning:
        This method is meant for single-process use, but can (in theory) be used in a multi-process environment.
        However, in a multi-process environment it can happen that multiple processes try to write concurrently,
        which results in a conflict.
        In such cases, the download will be retried until it succeeds or the number of maximum-tries is reached.

    Args:
        geobox (GeoBox): The geobox of the aoi to download.

    Raises:
        ValueError: If no adjacent tiles are found. This can happen if the geobox is out of the dataset bounds.
        ValueError: If no tries are left.

    """
    with self.stopuhr(f"{_geobox_repr(geobox)}: Procedural download in blocking mode"):
        adjacent_tiles = self.adjacent_tiles(geobox)
        if not adjacent_tiles:
            logger.error(f"{_geobox_repr(geobox)}: No adjacent tiles found: {adjacent_tiles=}")
            raise ValueError("No adjacent tiles found - is the provided geobox corrent?")

        session = self.repo.readonly_session("main")
        zcube = zarr.open(store=session.store, mode="r")
        loaded_tiles = zcube.attrs.get("loaded_tiles", [])
        new_tiles = [tile for tile in adjacent_tiles if tile.id not in loaded_tiles]
        logger.debug(
            f"{_geobox_repr(geobox)}:  {len(adjacent_tiles)=} & {len(loaded_tiles)=}"
            f" -> {len(new_tiles)=} to download"
        )
        if not new_tiles:
            return

        for tile in new_tiles:
            with self.stopuhr(f"{tile.id=}: Downloading one new tile in blocking mode"):
                logger.debug(f"{tile.id=}: Start downloading")
                tiledata = self.download_tile(tile)

            # Try to write the data to file until a limit is reached
            limit = 100
            for i in range(limit):
                try:
                    self._write_tile_to_zarr(tiledata, tile)
                    break
                except icechunk.ConflictError as conflict_error:
                    logger.debug(f"{tile.id=}: {conflict_error=} at retry {i}/{limit}")
            else:
                logger.error(
                    f"{tile.id=}: {limit} tries to write the tile failed. "
                    "Please check if the datacube is already created and not empty."
                )
                raise ValueError(f"{tile.id=}: {limit} tries to write the tile failed.")

procedural_download_threading

procedural_download_threading(geobox: GeoBox)

Download tiles procedurally in threading mode.

Note

This method ensures that only a single download is running at a time. It uses a SetQueue to prevent duplicate downloads. The threading mode requires Python 3.13 or higher.

Parameters:

  • geobox

    (GeoBox) –

    The geobox of the aoi to download.

Raises:

  • ValueError

    If no adjacent tiles are found. This can happen if the geobox is out of the dataset bounds.

  • RuntimeError

    If the Python version is lower than 3.13.

Source code in src/smart_geocubes/accessors/base.py
def procedural_download_threading(self, geobox: GeoBox):
    """Download tiles procedurally in threading mode.

    Note:
        This method ensures that only a single download is running at a time.
        It uses a SetQueue to prevent duplicate downloads.
        The threading mode requires Python 3.13 or higher.

    Args:
        geobox (GeoBox): The geobox of the aoi to download.

    Raises:
        ValueError: If no adjacent tiles are found. This can happen if the geobox is out of the dataset bounds.
        RuntimeError: If the Python version is lower than 3.13.

    """
    if not _check_python_version(3, 13):
        raise RuntimeError("Threading mode requires Python 3.13 or higher")
    with self._threading_handler:
        adjacent_tiles = self.adjacent_tiles(geobox)
        if not adjacent_tiles:
            logger.error(f"{_geobox_repr(geobox)}: No adjacent tiles found: {adjacent_tiles=}")
            raise ValueError("No adjacent tiles found - is the provided geobox corrent?")

        # Wait until all new_items are loaded
        prev_len = None
        while True:
            session = self.repo.readonly_session("main")
            zcube = zarr.open(store=session.store, mode="r")
            loaded_tiles = zcube.attrs.get("loaded_tiles", [])
            new_tiles = [tile for tile in adjacent_tiles if tile.id not in loaded_tiles]
            done_tiles = [tile for tile in adjacent_tiles if tile.id in loaded_tiles]
            if not new_tiles:
                break
            if prev_len != len(new_tiles):
                logger.debug(
                    f"{_geobox_repr(geobox)}: {len(done_tiles)} of {len(adjacent_tiles)} downloaded."
                    f" Missing: {[t.id for t in new_tiles]} Done: {[t.id for t in done_tiles]}"
                )
            for tile in new_tiles:
                self._threading_handler._queue.put(tile)
            prev_len = len(new_tiles)
            time.sleep(5)

visualize_state abstractmethod

visualize_state(
    ax: Axes | None = None,
) -> plt.Figure | plt.Axes

Visulize currently stored tiles / chunk.

Must be implemented by the DatasetAccessor.

Parameters:

  • ax

    (Axes | None, default: None ) –

    The axes drawn to. If None, will create a new figure and axes. Defaults to None.

Returns:

  • Figure | Axes

    plt.Figure | plt.Axes: The figure with the visualization

Source code in src/smart_geocubes/accessors/base.py
@abstractmethod
def visualize_state(self, ax: "plt.Axes | None" = None) -> "plt.Figure | plt.Axes":
    """Visulize currently stored tiles / chunk.

    Must be implemented by the DatasetAccessor.

    Args:
        ax (plt.Axes | None, optional): The axes drawn to. If None, will create a new figure and axes.
            Defaults to None.

    Returns:
        plt.Figure | plt.Axes: The figure with the visualization

    """