smart_geocubes.backends.threaded

Backends that download patches and write them to the datacube.

Classes:

ThreadedBackend

ThreadedBackend(
    repo: Repository,
    f: Callable[[PatchIndex], Dataset],
    concurrent_downloads: int = 4,
)

Bases: DownloadBackend

Threaded backend for downloading patches.

Initialize the ThreadedBackend.

Parameters:

  • repo

    (Repository) –

    The icechunk repository.

  • f

    (Callable[[PatchIndex], Dataset]) –

    A function that takes a PatchIndex and returns an xr.Dataset. This should be implemented by the specific source backend; see the construction sketch after this list.

  • concurrent_downloads

    (int, default: 4) –

    The number of concurrent downloads. Defaults to 4.
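
A minimal construction sketch (not from the library's docs): icechunk.in_memory_storage() is assumed here for a throwaway repository, and fetch_patch is a hypothetical stand-in for the download function a concrete source backend would supply.

import icechunk
import xarray as xr

from smart_geocubes.backends.threaded import ThreadedBackend

def fetch_patch(idx) -> xr.Dataset:
    """Hypothetical stand-in: download the patch identified by `idx`."""
    raise NotImplementedError

storage = icechunk.in_memory_storage()  # assumed storage backend for the sketch
repo = icechunk.Repository.create(storage)
backend = ThreadedBackend(repo, fetch_patch, concurrent_downloads=8)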

Methods:

  • assert_created

    Assert that the datacube exists in the storage.

  • close

    Close the backend.

  • loaded_patches

    Get a list of all loaded patch ids.

  • open_xarray

    Open the xarray datacube in read-only mode.

  • open_zarr

    Open the zarr datacube in read-only mode.

  • submit

    Submit a patch download request to the backend.

Attributes:

  • created (bool) –

    Check if the datacube already exists in the storage.

Source code in src/smart_geocubes/backends/threaded.py
def __init__(self, repo: icechunk.Repository, f: Callable[[PatchIndex], xr.Dataset], concurrent_downloads: int = 4):
    """Initialize the ThreadedBackend.

    Args:
        repo (icechunk.Repository): The icechunk repository.
        f (callable[[PatchIndex], xr.Dataset]): A function that takes a PatchIndex and returns an xr.Dataset.
            This should be implemented by the specific source backend.
        concurrent_downloads (int, optional): The number of concurrent downloads. Defaults to 4.

    """
    super().__init__(repo, f)

    self.download_pool = ThreadPoolExecutor(
        max_workers=concurrent_downloads, thread_name_prefix="DownloadPoolThread"
    )

    # The writer allows for asynchronous download and writing
    # The writing_pool allows for concurrent writes within the writer thread
    # The write_queue is blocking with a maxsize of 2 to prevent too many successful downloads
    # from piling up in memory
    self.writer = Thread(target=self._writer, daemon=True, name="WriterThread")
    self.write_queue: Queue[xr.Dataset] = Queue(maxsize=2)
    self.writing_pool = ThreadPoolExecutor(max_workers=4, thread_name_prefix="WritingPoolThread")
    self.writer.start()
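
The comments above describe a classic bounded producer/consumer handoff. A stand-alone sketch of the same pattern (names are illustrative, not from the library):

from queue import Queue
from threading import Thread

write_queue: Queue = Queue(maxsize=2)   # bounded: producers block once 2 items wait

def writer_loop() -> None:
    while True:
        dataset = write_queue.get()     # blocks until a download hands off an item
        # ... write `dataset` to the datacube here ...
        write_queue.task_done()         # lets Queue.join() return once all writes finish

Thread(target=writer_loop, daemon=True).start()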

created property

created: bool

Check if the datacube already exists in the storage.

Returns:

  • bool –

    True if the datacube already exists in the storage.

assert_created

assert_created(session: Session | None = None)

Assert that the datacube exists in the storage.

Raises:

  • FileNotFoundError –

    If the datacube does not exist.

Source code in src/smart_geocubes/core/backend.py
def assert_created(self, session: icechunk.Session | None = None):
    """Assert that the datacube exists in the storage.

    Raises:
        FileNotFoundError: If the datacube does not exist.

    """
    if session is None:
        session = self.repo.readonly_session("main")
    if sync(session.store.is_empty("")):
        msg = "Datacube does not exist."
        " Please use the `create` method or pass `create=True` to `load`."
        logger.error(msg)
        raise FileNotFoundError(msg)
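
A guard sketch, reusing the hypothetical `backend` from the construction example above:

try:
    backend.assert_created()
except FileNotFoundError:
    pass  # datacube missing: create it first, or pass `create=True` to `load`

if backend.created:  # the non-raising equivalent via the property
    xcube = backend.open_xarray()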

close

close() -> bool

Close the backend.

Returns:

  • bool –

    True if the backend was closed successfully, False otherwise.

Source code in src/smart_geocubes/backends/threaded.py
def close(self) -> bool:
    """Close the backend.

    Returns:
        bool: True if the backend was closed successfully, False otherwise.

    """
    logger.debug("Closing Backend...")

    self.download_pool.shutdown(wait=False, cancel_futures=True)
    logger.debug("Download pool shut down.")

    # Queue.shutdown() requires Python 3.13+
    self.write_queue.shutdown(immediate=True)
    logger.debug("Write queue shut down.")

    self.writer.join()
    logger.debug("Writer thread joined.")

    self.writing_pool.shutdown(wait=True, cancel_futures=False)
    logger.debug("Writing pool shut down.")

    logger.info("Backend closed.")
    return True
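
Since close() tears down the download pool, the write queue, and the writer thread, a try/finally keeps shutdown deterministic (a sketch; `backend` is from the construction example, `indices` an assumed list of PatchIndex values):

try:
    backend.submit(indices)
finally:
    backend.close()  # returns True on a clean shutdown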

loaded_patches

loaded_patches(session: Session | None = None) -> list[str]

Get a list of all loaded patch ids.

Returns:

  • list[str] –

    A list of all loaded patch ids.

Source code in src/smart_geocubes/core/backend.py
def loaded_patches(self, session: icechunk.Session | None = None) -> list[str]:
    """Get a list of all loaded patch ids.

    Returns:
        list[str]: A list of all loaded patch ids.

    """
    zcube = self.open_zarr(session)
    loaded_patches = zcube.attrs.get("loaded_patches", [])
    return loaded_patches
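
The ids are read from the cube's `loaded_patches` attribute, so this is a cheap metadata call:

patch_ids = backend.loaded_patches()
print(f"{len(patch_ids)} patches loaded")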

open_xarray

open_xarray(session: Session | None = None) -> xr.Dataset

Open the xarray datacube in read-only mode.

Returns:

  • xr.Dataset –

    The xarray datacube.

Source code in src/smart_geocubes/core/backend.py
def open_xarray(self, session: icechunk.Session | None = None) -> xr.Dataset:
    """Open the xarray datacube in read-only mode.

    Returns:
        xr.Dataset: The xarray datacube.

    """
    if session is None:
        session = self.repo.readonly_session("main")
    self.assert_created(session)
    xcube = xr.open_zarr(session.store, mask_and_scale=False, consolidated=False).set_coords("spatial_ref")
    return xcube
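
A read sketch; the dimension names `x` and `y` are assumptions and depend on the concrete datacube:

xcube = backend.open_xarray()
print(list(xcube.data_vars))                         # variables in the datacube
tile = xcube.isel(x=slice(0, 256), y=slice(0, 256))  # lazy windowed read (assumed dims)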

open_zarr

open_zarr(session: Session | None = None) -> zarr.Group

Open the zarr datacube in read-only mode.

Returns:

  • zarr.Group –

    The zarr datacube.

Source code in src/smart_geocubes/core/backend.py
def open_zarr(self, session: icechunk.Session | None = None) -> zarr.Group:
    """Open the zarr datacube in read-only mode.

    Returns:
        zarr.Group: The zarr datacube.

    """
    if session is None:
        session = self.repo.readonly_session("main")
    self.assert_created(session)
    zcube = zarr.open(store=session.store, mode="r")
    return zcube
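
The zarr view suits metadata-only access, e.g. the `loaded_patches` attribute that loaded_patches reads:

zcube = backend.open_zarr()
print(zcube.attrs.get("loaded_patches", []))
for name, array in zcube.arrays():  # raw arrays, no xarray decoding
    print(name, array.shape, array.dtype)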

submit

submit(idx: PatchIndex | list[PatchIndex])

Submit a patch download request to the backend.

Parameters:

  • idx

    (PatchIndex | list[PatchIndex]) –

    The index or multiple indices of the patch(es) to download.

Raises:

  • RuntimeError

    If the writer thread is not alive or if downloading failed for any patches.

Source code in src/smart_geocubes/backends/threaded.py
def submit(self, idx: PatchIndex | list[PatchIndex]):
    """Submit a patch download request to the backend.

    Args:
        idx (PatchIndex | list[PatchIndex]): The index or multiple indices of the patch(es) to download.

    Raises:
        RuntimeError: If the writer thread is not alive or if downloading failed for any patches.

    """
    if isinstance(idx, PatchIndex):
        idx = [idx]

    # Check if the writer thread is still alive
    if not self.writer.is_alive():
        raise RuntimeError("Writer thread is not alive. This happens if the writer thread crashes.")

    futures = [self.download_pool.submit(self._download_in_pool, i) for i in idx]

    # wait() returns (done, not_done); with the default ALL_COMPLETED every future
    # lands in `done`, so failures are detected via Future.exception()
    done, _ = wait(futures)
    failed = [f for f in done if f.exception() is not None]
    if len(failed) > 0:
        raise RuntimeError(f"Downloading failed for {len(failed)} patches.")

    # Check if the queue is still alive
    if self.write_queue.is_shutdown:
        raise RuntimeError(
            "Write queue is not alive. This happens if the writer thread crashes or the backend is closed."
        )
    # Queue.join() blocks until task_done() has been called for every queued
    # dataset, i.e. until the writer thread has flushed all pending writes
    self.write_queue.join()
    logger.debug("All submitted patches downloaded and written.")