ThreadedBackend(
repo: Repository,
f: Callable[[PatchIndex], Dataset],
concurrent_downloads: int = 4,
)
Bases: DownloadBackend
Threaded backend for downloading patches.
Initialize the ThreadedBackend.
Parameters:
-
repo
(Repository)
–
The icechunk repository.
-
f
(Callable[[PatchIndex], Dataset])
–
A function that takes a PatchIndex and returns an xr.Dataset.
This should be implemented by the specific source backend.
-
concurrent_downloads
(int, default:
4
)
–
The number of concurrent downloads. Defaults to 4.
Methods:
Attributes:
-
created
(bool)
–
Check if the datacube already exists in the storage.
Source code in src/smart_geocubes/backends/threaded.py
def __init__(self, repo: icechunk.Repository, f: Callable[[PatchIndex], xr.Dataset], concurrent_downloads: int = 4):
    """Initialize the ThreadedBackend.

    Sets up a download thread pool, a bounded hand-off queue and a dedicated
    writer thread so downloading and writing can overlap.

    Args:
        repo (icechunk.Repository): The icechunk repository.
        f (callable[[PatchIndex], xr.Dataset]): A function that takes a PatchIndex and returns an xr.Dataset.
            This should be implemented by the specific source backend.
        concurrent_downloads (int, optional): The number of concurrent downloads. Defaults to 4.
    """
    super().__init__(repo, f)
    # Bounded hand-off queue (maxsize=2): finished downloads cannot pile up in
    # memory faster than the writer thread can persist them.
    self.write_queue: Queue[xr.Dataset] = Queue(maxsize=2)
    # Worker threads performing the actual downloads.
    self.download_pool = ThreadPoolExecutor(
        max_workers=concurrent_downloads, thread_name_prefix="DownloadPoolThread"
    )
    # Pool used inside the writer thread for concurrent writes within one write step.
    self.writing_pool = ThreadPoolExecutor(max_workers=4, thread_name_prefix="WritingPoolThread")
    # Daemon thread that drains the queue asynchronously; started last so that
    # the queue and the writing pool already exist when it runs.
    self.writer = Thread(target=self._writer, daemon=True, name="WriterThread")
    self.writer.start()
created
property
Check if the datacube already exists in the storage.
Returns:
-
bool ( bool
) –
True if the datacube already exists in the storage.
assert_created
assert_created(session: Session | None = None)
Assert that the datacube exists in the storage.
Raises:
Source code in src/smart_geocubes/core/backend.py
def assert_created(self, session: icechunk.Session | None = None):
    """Assert that the datacube exists in the storage.

    Args:
        session (icechunk.Session | None): Session to check. If None, a fresh
            read-only session on "main" is opened.

    Raises:
        FileNotFoundError: If the datacube does not exist.
    """
    if session is None:
        session = self.repo.readonly_session("main")
    if sync(session.store.is_empty("")):
        # Bug fix: the second sentence used to be a dangling expression statement
        # and was silently dropped from the message; parenthesize so Python's
        # implicit string-literal concatenation joins both parts.
        msg = (
            "Datacube does not exist."
            " Please use the `create` method or pass `create=True` to `load`."
        )
        logger.error(msg)
        raise FileNotFoundError(msg)
close
Close the backend.
Returns:
-
bool ( bool
) –
True if the backend was closed successfully, False otherwise.
Source code in src/smart_geocubes/backends/threaded.py
def close(self) -> bool:
    """Close the backend.

    Shuts down the download pool, the write queue, the writer thread and the
    writing pool, in that order, so no component is left with pending work.

    Returns:
        bool: True if the backend was closed successfully, False otherwise.
    """
    logger.debug("Closing Backend...")
    # Abort outstanding downloads immediately; nothing new may enter the queue.
    self.download_pool.shutdown(wait=False, cancel_futures=True)
    logger.debug("Download pool shut down.")
    # We can assume that shutdown is present (even if ty says otherwise) because this backend
    # is only available for Python 3.13 and above, where the shutdown method is available for queues
    self.write_queue.shutdown(immediate=True)  # ty:ignore[unresolved-attribute]
    logger.debug("Write queue shut down.")
    # The writer thread is expected to exit once the queue has been shut down.
    self.writer.join()
    logger.debug("Writer thread joined.")
    # Let in-flight writes finish before tearing the writing pool down.
    self.writing_pool.shutdown(wait=True, cancel_futures=False)
    logger.debug("Writing pool shut down.")
    logger.info("Backend closed.")
    return True
loaded_patches
loaded_patches(session: Session | None = None) -> list[str]
Get a list of all loaded patch ids.
Returns:
-
list[str]
–
list[str]: A list of all loaded patch ids.
Source code in src/smart_geocubes/core/backend.py
def loaded_patches(self, session: icechunk.Session | None = None) -> list[str]:
    """Get a list of all loaded patch ids.

    Args:
        session (icechunk.Session | None): Optional session, forwarded to ``open_zarr``.

    Returns:
        list[str]: A list of all loaded patch ids.
    """
    datacube = self.open_zarr(session)
    # Missing attribute means nothing has been loaded yet.
    patch_ids = cast(list[str], datacube.attrs.get("loaded_patches", []))
    assert isinstance(patch_ids, list), "Expected 'loaded_patches' attribute to be a list of strings."
    return patch_ids
open_xarray
open_xarray(session: Session | None = None) -> xr.Dataset
Open the xarray datacube in read-only mode.
Returns:
-
Dataset
–
xr.Dataset: The xarray datacube.
Source code in src/smart_geocubes/core/backend.py
def open_xarray(self, session: icechunk.Session | None = None) -> xr.Dataset:
    """Open the xarray datacube in read-only mode.

    Args:
        session (icechunk.Session | None): Optional session. If None, a fresh
            read-only session on "main" is opened.

    Returns:
        xr.Dataset: The xarray datacube.
    """
    # Fall back to a fresh read-only session on "main" when none is supplied.
    if session is None:
        session = self.repo.readonly_session("main")
    self.assert_created(session)
    # mask_and_scale=False keeps the raw on-disk values; consolidated metadata is not used.
    dataset = xr.open_zarr(session.store, mask_and_scale=False, consolidated=False)
    return dataset.set_coords("spatial_ref")
open_zarr
open_zarr(session: Session | None = None) -> zarr.Group
Open the zarr datacube in read-only mode.
Returns:
-
Group
–
zarr.Group: The zarr datacube.
Source code in src/smart_geocubes/core/backend.py
def open_zarr(self, session: icechunk.Session | None = None) -> zarr.Group:
    """Open the zarr datacube in read-only mode.

    Args:
        session (icechunk.Session | None): Optional session. If None, a fresh
            read-only session on "main" is opened.

    Returns:
        zarr.Group: The zarr datacube.
    """
    session = session if session is not None else self.repo.readonly_session("main")
    self.assert_created(session)
    root = zarr.open(store=session.store, mode="r")
    assert isinstance(root, zarr.Group), "Expected a zarr group at the root of the store."
    return root
submit
submit(idx: PatchIndex | list[PatchIndex])
Submit a patch download request to the backend.
Parameters:
Raises:
-
RuntimeError
–
If the writer thread is not alive or if downloading failed for any patches.
Source code in src/smart_geocubes/backends/threaded.py
def submit(self, idx: PatchIndex | list[PatchIndex]):
    """Submit a patch download request to the backend and block until written.

    Args:
        idx (PatchIndex | list[PatchIndex]): The index or multiple indices of the patch(es) to download.

    Raises:
        RuntimeError: If the writer thread is not alive or if downloading failed for any patches.
    """
    if isinstance(idx, PatchIndex):
        idx = [idx]
    # Check if the writer thread is still alive
    if not self.writer.is_alive():
        raise RuntimeError("Writer thread is not alive. This happens if the writer thread crashes.")
    futures = [self.download_pool.submit(self._download_in_pool, i) for i in idx]
    done, not_done = wait(futures)
    if len(not_done) > 0:
        raise RuntimeError(f"Downloading not completed for {len(not_done)} patches.")
    # Check if any of the downloads raised an exception
    failed = [f for f in done if f.exception() is not None]
    if len(failed) > 0:
        raise RuntimeError(f"Downloading failed for {len(failed)} patches.") from failed[0].exception()
    # Check if the queue is still alive
    # We can assume that shutdown is present (even if ty says otherwise) because this backend
    # is only available for Python 3.13 and above, where the shutdown method is available for queues
    if self.write_queue.is_shutdown:  # ty:ignore[unresolved-attribute]
        raise RuntimeError(
            "Write queue is not alive. This happens if the writer thread crashes or the backend is closed."
        )
    # Check how many tasks are currently in the queue
    queue_size = self.write_queue.qsize()
    logger.debug(f"{len(idx)} patches submitted for download. {queue_size} patches currently in the write queue.")
    with self.write_queue.mutex:
        if self.write_queue.unfinished_tasks == 0:
            logger.debug("No patches currently being written. All submitted patches were written immediately.")
        # Bug fix: Condition.wait() may wake spuriously, so the predicate must be
        # re-checked in a loop (same pattern as queue.Queue.join()); the previous
        # bare `if` could return before all tasks were actually done.
        while self.write_queue.unfinished_tasks > 0:
            self.write_queue.all_tasks_done.wait()
    logger.debug("All submitted patches downloaded and written.")