skais_mapper.data

Image/map data readers and (HDF5) writers.

Classes:

Img2H5Buffer: Parse images (incrementally or all at once) and write to HDF5 files.
ImgRead: Flexible image reader for multiple formats.

Img2H5Buffer

Img2H5Buffer(
    path: str | Path = None,
    target: str | Path = None,
    data: np.ndarray | dict = None,
    size: int | float | str = "1G",
)

Parse images (incrementally or all at once) and write to HDF5 files.

The directory structure of a dataset should be as follows:
  • dataset root: /path/to/dataset/root
  • image class: a subdirectory of the dataset root
  • image file: {npy | jpg | png | etc.}

E.g. a file path of the structure /path/to/dataset/root/<image_class>/423120.npy ends up in the HDF5 file as /<image_class>/dataset.

Note: by default the entire dataset is loaded into cache.

Constructor.

Parameters:

path (str | Path, default: None): Path to a data directory where the source files are located.
target (str | Path, default: None): Filename of the HDF5 file to be written.
data (np.ndarray | dict, default: None): Alternative input format to path for adding data arrays or dictionaries directly to the buffer queue.
size (int | float | str, default: '1G'): Buffer cache size in bytes, or as a string (e.g. "1G").

Methods:

configure_rdcc: Automatically configure HDF5 data chunking for optimal writing.
flush: Send all data pages from the buffer queue.
glob_path: Glob path recursively for files.
inc_write: Incrementally (append mode) write the buffer to HDF5 file.
send: Grab first data page from the buffer queue.
store: Insert data into the buffer queue.
write: Write all files in buffer to a new HDF5 file.

Attributes:

n_files (int): Number of files to be parsed.
nbytes (list[nbytes]): List of the number of bytes for each buffer file.
page (np.ndarray | dict | None): Buffer page ready to be written to file.
total_nbytes (list[nbytes]): Total number of bytes for buffer.

Source code in skais_mapper/data.py (lines 184–208):
def __init__(
    self,
    path: str | Path = None,
    target: str | Path = None,
    data: np.ndarray | dict = None,
    size: int | float | str = "1G",
):
    """Constructor.

    Args:
        path: Path to a data directory where the source files are located.
        target: Filename of the HDF5 file to be written.
        data: Alternative input format to `path` for adding data arrays or
          dictionaries directly to the buffer queue.
        size: Buffer cache size in bytes, or as a string (e.g. "1G").
    """
    self.files = self.glob_path(path)
    self.queue: list[np.ndarray | dict] = []
    if isinstance(data, np.ndarray | dict):
        self.queue.append(data)
    if target is None:
        target = self.default_target_name.format(current_time()[2:])
    self.target = Path(target)
    self.cache_size = nbytes(size)
    self.index = -1
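
A minimal usage sketch (the dataset path, target filename, and buffer size below are made up for illustration):

from skais_mapper.data import Img2H5Buffer

# Glob a dataset directory and write everything it contains to one HDF5 file.
buffer = Img2H5Buffer(
    path="/path/to/dataset/root",  # searched recursively for image files
    target="dataset.hdf5",
    size="512M",                   # buffer cache size as a string
)
print(buffer.n_files)              # number of files found
buffer.write(group="images")       # write the whole buffer to dataset.hdf5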

n_files property

n_files: int

Number of files to be parsed.

nbytes property

nbytes: list[nbytes]

List of the number of bytes for each buffer file.

page property

page: np.ndarray | dict | None

Buffer page ready to be written to file.

total_nbytes property

total_nbytes: list[nbytes]

Total number of bytes for buffer.

configure_rdcc

configure_rdcc(
    cache_size: int | float | str | None = None,
    f: int = 10,
    verbose: bool = False,
    **kwargs,
) -> dict

Automatically configure HDF5 data chunking for optimal writing.

Parameters:

cache_size (int | float | str | None, default: None): Cache of the entire buffer.
f (int, default: 10): Factor with which to increase the number of slots.
verbose (bool, default: False): Print additional information to stdout.
**kwargs (default: {}): Additional keyword arguments such as rdcc_nbytes, rdcc_w0, and rdcc_nslots; see h5py.File or the Returns section below for details.

Returns:

dict: a dictionary of HDF5 chunk-cache settings with the following keys:

rdcc_nbytes: sets the total size (measured in bytes) of the raw data chunk cache for each dataset. This should be set to the size of each chunk times the number of chunks that are likely to be needed in cache.

rdcc_w0: sets the eviction policy for chunks from the cache when more space is needed. 0 always evicts the least recently used chunk, 1 prefers the least recently used chunk that has been fully read or written, and values in between give hybrid policies.

rdcc_nslots: is the number of chunk slots in the cache for each dataset. In order to allow the chunks to be looked up quickly in cache, each chunk is hashed. Thus, it should be large enough to minimize the number of hash value collisions. At minimum 10; for maximum performance, about 100 times larger than the number of chunks that fit in cache, ideally a prime number.

Source code in skais_mapper/data.py (lines 295–350):
def configure_rdcc(
    self,
    cache_size: int | float | str | None = None,
    f: int = 10,
    verbose: bool = False,
    **kwargs,
) -> dict:
    """Automatically configure HDF5 data chunking for optimal writing.

    Args:
        cache_size: Cache of the entire buffer.
        f: Factor with which to increase the number of slots.
        verbose: Print additional information to stdout.
        **kwargs: Additional keyword arguments such as
          - `rdcc_nbytes`: See h5py.File or below for details.
          - `rdcc_w0`: See h5py.File or below for details.
          - `rdcc_nslots`: See h5py.File or below for details.

    Returns:
        `rdcc_nbytes`: sets the total size (measured in bytes) of the raw data
          chunk cache for each dataset. This should be set to the size of each
          chunk times the number of chunks that are likely to be needed in cache.
        `rdcc_w0`: sets the eviction policy for chunks from the cache when more
          space is needed. 0 always evicts the least recently used chunk, 1 prefers
          the least recently used chunk that has been fully read or written, and
          values in between give hybrid policies.
        `rdcc_nslots`: is the number of chunk slots in the cache for each dataset.
          In order to allow the chunks to be looked up quickly in cache, each chunk
          is hashed. Thus, it should be large enough to minimize the number of hash
          value collisions. At minimum 10; for maximum performance, about 100 times
          larger than the number of chunks that fit in cache, ideally a prime number.
    """
    if cache_size is None:
        cache_size = self.cache_size
    else:
        cache_size = nbytes(cache_size)
    slots_size = max(self.nbytes) if self.nbytes else nbytes("2M")
    n_slots = int(cache_size / slots_size) if self.n_files else 100_000
    # avoid calculating prime numbers if previous configuration looks similar
    if (
        hasattr(self, "_rdcc")
        and self._rdcc["rdcc_nbytes"] == int(cache_size)
        and self._rdcc["rdcc_w0"] == kwargs.get("rdcc_w0", 1.0)
    ):
        return self._rdcc
    kwargs.setdefault("rdcc_nbytes", int(cache_size))
    kwargs.setdefault("rdcc_w0", 1.0)
    kwargs.setdefault("rdcc_nslots", next_prime(int(n_slots * f)))
    if verbose:
        sample_size = max(self.nbytes)
        print(f"Sample size: {sample_size}")
        print(f"Slot size: {nbytes(kwargs['rdcc_nbytes'] / kwargs['rdcc_nslots'])}")
        print(f"Slots: {kwargs['rdcc_nslots']}({n_slots})")
        print(f"Cache size: {nbytes(kwargs['rdcc_nbytes'])}")
        print(f"Eviction policy: {kwargs['rdcc_w0']}")
    self._rdcc = kwargs
    return kwargs
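
The returned dictionary can be passed on to h5py.File. A short sketch, assuming buffer is an Img2H5Buffer instance and the filename is hypothetical:

import h5py

rdcc = buffer.configure_rdcc(cache_size="256M", f=10, verbose=True)
# rdcc now contains rdcc_nbytes, rdcc_w0, and rdcc_nslots
with h5py.File("dataset.hdf5", "a", **rdcc) as h5:
    ...  # datasets accessed here use the tuned chunk cache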

flush

flush() -> (
    np.ndarray | dict | list[np.ndarray | dict] | None
)

Send all data pages from the buffer queue.

Source code in skais_mapper/data.py (lines 463–474):
def flush(self) -> np.ndarray | dict | list[np.ndarray | dict] | None:
    """Send all data pages from the buffer queue."""
    if self.files:
        self.store(self.files)
        self.files = []
    if self.queue:
        data = self.queue
        self.queue = []
        if len(data) == 1:
            return data[0]
        return data
    return None

glob_path staticmethod

glob_path(
    path: str | Path | list[str] | list[Path],
    extensions: str | list[str] = None,
) -> list[Path]

Glob path recursively for files.

Parameters:

path (str | Path | list[str] | list[Path], required): Filename, path or list; can contain wildcards * or **.
extensions (str | list[str], default: None): File extension(s) to look for.
Source code in skais_mapper/data.py (lines 236–260):
@staticmethod
def glob_path(
    path: str | Path | list[str] | list[Path], extensions: str | list[str] = None
) -> list[Path]:
    """Glob path recursively for files.

    Args:
        path: Filename, path or list, can contain wildcards `*` or `**`.
        extensions: File extension(s) to look for.
    """
    files: list[Path] = []
    extensions = [extensions] if isinstance(extensions, str) else extensions
    root, file_key = Img2H5Buffer._split_glob(path)
    for p, k in zip(root, file_key):
        if k is None:
            files.append(Path(p))
        else:
            path_files = [
                f
                for f in Path(p).rglob(k)
                if f.is_file() and f.suffix in Img2H5Buffer.extensions
            ]
            files += sorted(path_files)
    files = [p for p in files if p.exists()]
    return files
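
Being a static method, it can also be used on its own; a sketch (the path is hypothetical, and the exact wildcard handling is delegated to the internal _split_glob helper, which is not shown here):

# Recursively collect supported image files under a dataset root.
files = Img2H5Buffer.glob_path("/path/to/dataset/root/*")
# files is a sorted list of pathlib.Path objects whose suffixes are among
# Img2H5Buffer.extensions (e.g. .npy, .jpg, .png); non-existing paths are dropped.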

inc_write

inc_write(
    path: str | Path | None = None,
    group: str = "images",
    data: np.ndarray | dict | None = None,
    expand_dim: bool = True,
    axis: int = 0,
    overwrite: bool | int | None = None,
    verbose: bool = False,
    **kwargs,
)

Incrementally (append mode) write the buffer to HDF5 file.

Parameters:

path (str | Path | None, default: None): Filename of the HDF5 file and optionally the path of the HDF5 group where the dataset is saved.
group (str, default: 'images'): HDF5 group where to save the dataset. If it does not exist, it is created.
data (np.ndarray | dict | None, default: None): Data to be written to the HDF5 file. If None, all files in the buffer are written to the HDF5 file.
expand_dim (bool, default: True): Expand dimension of data array for stacking.
axis (int, default: 0): Axis of the n-dimensional array along which to append.
overwrite (bool | int | None, default: None): If data should overwrite indices in a pre-existing HDF5 dataset, set to the index.
verbose (bool, default: False): Print additional information to stdout.
kwargs (default: {}): Additional keyword arguments for Img2H5Buffer.configure_rdcc, h5py.File, and/or h5py.Group.create_dataset.
Source code in skais_mapper/data.py (lines 476–587):
def inc_write(
    self,
    path: str | Path | None = None,
    group: str = "images",
    data: np.ndarray | dict | None = None,
    expand_dim: bool = True,
    axis: int = 0,
    overwrite: bool | int | None = None,
    verbose: bool = False,
    **kwargs,
):
    """Incrementally (append mode) write the buffer to HDF5 file.

    Args:
        path: Filename of the HDF5 file and optionally the path of the HDF5
          group where the dataset is saved.
        group: HDF5 group where to save the dataset. If it does not exist,
          it is created.
        data: Data to be written to the hdf5 file. If None, all files in
          the buffer are written to HDF5 file.
        expand_dim: Expand dimension of data array for stacking.
        axis: Axis of the n-dimensional array along which to append.
        overwrite: If data should overwrite indices in a pre-existing HDF5 dataset,
          set to the index.
        verbose: Print additional information to stdout.
        kwargs: Additional keyword arguments for `Img2H5Buffer.configure_rdcc`,
          `h5py.File`, and/or `h5py.Group.create_dataset`.
    """
    if path is None:
        path = self.target
    else:
        path = Path(path)
    if ":" in str(path):
        group = str(path).split(":")[1]
        path = Path(str(path).split(":")[0])
    if group.startswith("/"):
        group = group[1:]
    if data is None:
        data = self.flush()
    if isinstance(data, np.ndarray) and expand_dim:
        data = data[np.newaxis, ...]
    if isinstance(overwrite, bool):
        overwrite = 0 if overwrite else None
    # configure HDF5 chunk caching
    rdcc = {"cache_size": self.cache_size, "verbose": verbose}
    for key in ["f", "rdcc_nbytes", "rdcc_w0", "rdcc_nslots"]:
        if key in kwargs:
            rdcc[key] = kwargs.pop(key)
    rdcc = self.configure_rdcc(**rdcc)
    # write metadata
    if isinstance(data, dict):
        file_kwargs, kwargs = self._h5py_file_kwargs(
            kwargs,
            defaults={"mode": "a", "libver": "latest"},
        )
        with h5py.File(path, **file_kwargs, **rdcc) as h5:
            g = h5.create_group(f"/{group}") if group not in h5.keys() else h5[group]
            for key in data:
                g.attrs[key] = data[key]
            if verbose:
                print(
                    f"Data attribute(s) {tuple(data.keys())} have been "
                    f"written to HDF5 file@[/{group}]"
                )
    # write data
    elif isinstance(data, np.ndarray):
        file_kwargs, kwargs = self._h5py_file_kwargs(
            kwargs,
            defaults={
                "mode": "a",
                "libver": "latest",
            },
        )
        kwargs, _ = self._h5py_create_dataset_kwargs(
            kwargs | file_kwargs,
            defaults={
                "compression": "gzip",
                "shuffle": True,
                "track_times": True,
                "dtype": data.dtype,
                "shape": data.shape,
                "maxshape": data.shape[:axis] + (None,) + data.shape[axis + 1 :],
                "chunks": data.shape[:axis] + (1,) + data.shape[axis + 1 :],
            },
        )
        with h5py.File(path, **file_kwargs, **rdcc) as h5:
            ds_existed = isinstance(h5[group], H5Dataset) if group in h5 else False
            ds = h5.require_dataset(group, **kwargs, **rdcc)
            ds_samples = ds.shape[axis]
            data_samples = data.shape[axis]
            if not ds_existed:
                self.index = 0
            elif overwrite is None:
                self.index = ds_samples
                ds.resize(self.index + data_samples, axis=axis)
            else:
                self.index = overwrite
                if data_samples > ds_samples:
                    ds.resize(self.index + data_samples, axis=axis)
            slc = [slice(None)] * len(ds.shape)
            slc[axis] = slice(self.index, self.index + data_samples)
            ds[tuple(slc)] = data
            if verbose:
                print(
                    f"Data {data.shape} have been written to HDF5 dataset "
                    f"{ds.shape}@({self.index}:{self.index + data_samples})"
                )
    else:
        warnings.warn(
            "Img2H5Buffer did not write data to file (either "
            "because the buffer was empty or data was incompatible)."
        )
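
A sketch of incremental appending, assuming buffer is an Img2H5Buffer and images is an iterable of equally shaped np.ndarray maps (both hypothetical, as is the attribute key below):

for img in images:
    # Each call appends one sample along axis 0 of the "images" dataset.
    buffer.inc_write("maps.hdf5", group="images", data=img)

# Writing a dict instead of an array stores its items as HDF5 group attributes.
buffer.inc_write("maps.hdf5", group="images", data={"pixel_size_kpc": 1.0})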

send

send(clear: bool = True) -> np.ndarray | dict | None

Grab first data page from the buffer queue.

Source code in skais_mapper/data.py (lines 449–461):
def send(self, clear: bool = True) -> np.ndarray | dict | None:
    """Grab first data page from the buffer queue."""
    if self.queue:
        if clear:
            return self.queue.pop(0)
        return self.page
    elif self.files:
        if clear:
            self.store(self.files.pop(0))
        else:
            self.store(self.files[0])
        return self.send(clear=True)
    return None
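
A sketch contrasting send and flush, assuming buffer is an Img2H5Buffer whose queue already holds several pages:

peek = buffer.send(clear=False)  # return the current page without removing it
page = buffer.send()             # pop and return the first page in the queue
rest = buffer.flush()            # return everything left: a single page, a list
                                 # of pages, or None if the buffer is empty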

store

store(
    data: np.ndarray | dict | str | Path | list[str | Path],
    squash: bool = True,
) -> Img2H5Buffer

Insert data into the buffer queue.

Parameters:

data (np.ndarray | dict | str | Path | list[str | Path], required): Data to be stored in buffer.
squash (bool, default: True): Squash data dimensions if buffer data is compatible.
Source code in skais_mapper/data.py (lines 422–447):
def store(
    self, data: np.ndarray | dict | str | Path | list[str | Path], squash: bool = True
) -> "Img2H5Buffer":
    """Insert data into the buffer queue.

    Args:
        data: Data to be stored in buffer.
        squash: Squash data dimensions if buffer data is compatible.
    """
    if (
        squash
        and isinstance(data, np.ndarray)
        and self.queue
        and isinstance(self.queue[-1], np.ndarray)
    ):
        try:
            self.queue[-1] = np.concatenate((self.queue[-1], data))
        except ValueError:
            self.queue[-1] = ImgRead._stack_max_expand([self.queue[-1], data])
    elif isinstance(data, np.ndarray | dict):
        self.queue.append(data)
    elif isinstance(data, str | Path) or (
        isinstance(data, list) and isinstance(data[0], str | Path)
    ):
        return self.store(ImgRead()(data), squash=squash)
    return self
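
A sketch of filling the queue by hand, assuming buffer is an Img2H5Buffer instance (arrays, keys, and paths are made up):

import numpy as np

a = np.zeros((1, 64, 64))
b = np.ones((1, 64, 64))
buffer.store(a).store(b)          # with squash=True, b is concatenated onto a
buffer.store({"unit": "Msun"})    # dicts are queued as separate metadata pages
buffer.store("/path/to/img.png")  # paths are read via ImgRead, then stored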

write

write(
    path: str | Path | None = None,
    group: str = "images",
    data: np.ndarray | dict | None = None,
    verbose: bool = False,
    **kwargs,
)

Write all files in buffer to a new HDF5 file.

Parameters:

path (str | Path | None, default: None): Filename of the HDF5 file and optionally the path of the HDF5 group where the dataset is saved, separated by a colon, e.g. '/path/to/file.hdf5:/path/to/group'.
group (str, default: 'images'): HDF5 group where to save the dataset. If it does not exist, it is created.
data (np.ndarray | dict | None, default: None): Data to be written to the HDF5 file.
verbose (bool, default: False): Print additional information to stdout.
kwargs (default: {}): Additional keyword arguments for h5py.Group.create_dataset.
Source code in skais_mapper/data.py (lines 589–609):
def write(
    self,
    path: str | Path | None = None,
    group: str = "images",
    data: np.ndarray | dict | None = None,
    verbose: bool = False,
    **kwargs,
):
    """Write all files in buffer to a new HDF5 file.

    Args:
        path: Filename of the HDF5 file and optionally the path of the HDF5
          group where the dataset is saved separated by a colon,
          e.g. `'/path/to/file.hdf5:/path/to/group'`.
        group: HDF5 group where to save the dataset. If it does not exist,
          it is created.
        data: Data to be written to the HDF5 file.
        verbose: Print additional information to stdout.
        kwargs: Additional keyword arguments for `h5py.Group.create_dataset`.
    """
    self.inc_write(path, group=group, data=data, verbose=verbose, **kwargs)
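
Equivalent sketches, assuming buffer is an Img2H5Buffer holding data (file and group names are made up):

buffer.write("/path/to/file.hdf5:/images/galaxy")   # colon selects file and group
buffer.write("/path/to/file.hdf5", group="images")  # same, via the group argument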

ImgRead

Flexible image reader for multiple formats.

Methods:

__call__: Automatically determine file type and read data appropriately.

__call__

__call__(
    paths: str | Path | list[str | Path] | None = None,
    squash: bool = True,
    pad_val: int | float = 0,
    **kwargs,
) -> np.ndarray

Automatically determine file type and read data appropriately.

Parameters:

paths (str | Path | list[str | Path] | None, default: None): File path to the image to be read.
squash (bool, default: True): If multiple paths are passed, merge and squash arrays.
pad_val (int | float, default: 0): Padding value used for shape expansion if multiple paths are passed and the images have different shapes.
**kwargs (default: {}): Additional keyword arguments for parser functions: _read_npy, _read_png, or _read_jpg.

Returns:

np.ndarray: Numpy ndarray of the image data.

Source code in skais_mapper/data.py (lines 19–56):
def __call__(
    self,
    paths: str | Path | list[str | Path] | None = None,
    squash: bool = True,
    pad_val: int | float = 0,
    **kwargs,
) -> np.ndarray:
    """Automatically determine file type and read data appropriately.

    Args:
        paths: File path to the image to be read.
        squash: If multiple paths are passed, merge and squash arrays.
        pad_val: Padding value to be used for shape expansion if multiple
          paths are passed and images have different shape (default: 0).
        **kwargs: Additional keyword arguments for parser functions:
          `_read_npy`, `_read_png`, or `_read_jpg`.

    Returns:
        Numpy ndarray of the image data.
    """
    if isinstance(paths, list | tuple):
        data = [self(p, **kwargs) for p in tqdm(paths, desc="ImgRead")]
        if data and squash:
            try:
                data = np.concatenate(data)
            except ValueError:
                data = self._stack_max_expand(data, pad_val=pad_val)
        return data
    filepath = Path(paths) if paths is not None else Path("")
    match filepath.suffix:
        case ".npy":
            return self._read_npy(filepath, **kwargs)
        case ".jpg":
            return self._read_jpg(filepath, **kwargs)
        case ".png":
            return self._read_png(filepath, **kwargs)
        case _:
            return None
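
A usage sketch (file paths are made up):

reader = ImgRead()
arr = reader("/path/to/dataset/root/galaxy/423120.npy")  # single file -> np.ndarray
batch = reader(
    ["/path/to/a.png", "/path/to/b.png"],
    squash=True,  # concatenate; shapes are padded with pad_val if they differ
    pad_val=0,
)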