docarray.array.mixins.dataloader.helper module#

class docarray.array.mixins.dataloader.helper.DocumentArrayLoader(path, protocol='protobuf', compress=None, show_progress=False)[source]#

Bases: ParallelMixin, GroupMixin

apply(*args, **kwargs)#

Return type:

T

Returns:

a new DocumentArray

apply_batch(*args, **kwargs)#

Return type:

T

Returns:

a new DocumentArray

batch(batch_size, shuffle=False)#

Creates a Generator that yields a DocumentArray of size batch_size until the docs are fully traversed along the traversal_path. None docs are filtered out, and optionally the docs can be filtered by checking for the existence of a Document attribute. Note that the last batch might be smaller than batch_size.

Parameters:
  • batch_size (int) – Size of each generated batch (except the last one, which might be smaller, default: 32)

  • shuffle (bool) – If set, shuffle the Documents before dividing into minibatches.

Yield:

a Generator of DocumentArray, each of length batch_size

Return type:

Generator[DocumentArray, None, None]
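The batching behavior can be sketched with plain Python lists standing in for a DocumentArray (an illustrative stand-in, not the actual implementation):

```python
# Sketch of batch() semantics: slice the collection into chunks of batch_size,
# optionally shuffling first. Plain lists stand in for DocumentArray here.
import random
from typing import Generator, List, TypeVar

T = TypeVar('T')

def batch(docs: List[T], batch_size: int = 32, shuffle: bool = False) -> Generator[List[T], None, None]:
    if shuffle:
        docs = random.sample(docs, len(docs))  # shuffle before dividing into minibatches
    for i in range(0, len(docs), batch_size):
        yield docs[i:i + batch_size]  # the last batch may be smaller than batch_size

batches = list(batch(list(range(10)), batch_size=4))
# three batches of sizes 4, 4 and 2
```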

batch_ids(batch_size, shuffle=False)#

Creates a Generator that yields lists of ids of size batch_size until self is fully traversed. Note that the last batch might be smaller than batch_size.

Parameters:
  • batch_size (int) – Size of each generated batch (except the last one, which might be smaller)

  • shuffle (bool) – If set, shuffle the Documents before dividing into minibatches.

Yield:

a Generator of lists of IDs, each of length batch_size

Return type:

Generator[List[str], None, None]
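A minimal sketch of batch_ids(), with plain dicts standing in for Documents (illustrative only):

```python
# batch_ids() yields lists of Document ids rather than DocumentArray slices.
# Hypothetical stand-in: dicts with an 'id' key play the role of Documents.
docs = [{'id': f'doc-{i}'} for i in range(5)]

def batch_ids(docs, batch_size):
    for i in range(0, len(docs), batch_size):
        yield [d['id'] for d in docs[i:i + batch_size]]

id_batches = list(batch_ids(docs, 2))
# [['doc-0', 'doc-1'], ['doc-2', 'doc-3'], ['doc-4']]
```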

map(func, backend='thread', num_worker=None, show_progress=False, pool=None)#

Return an iterator that applies func to every element of the iterable in parallel, yielding the results.

See also

  • To process a batch of elements, please use map_batch();

  • To return a DocumentArray, please use apply().

Parameters:
  • func (Callable[[Document], T]) – a function that takes a Document as input and outputs anything. You can either modify elements in-place (only with the thread backend) or work later on the returned elements.

  • backend (str) –

    whether to use multi-process or multi-thread as the parallelization backend. In general, if your func is IO-bound then thread is likely good enough. If your func is CPU-bound, you may use process. In practice, you should experiment to find the best choice. However, if you wish to modify the elements in-place, regardless of IO/CPU-bound, you should always use the thread backend.

    Warning

    When using the process backend, you should not expect func to modify elements in-place. This is because the multiprocessing backend passes the variable via pickle and works in another process. The passed object and the original object do not share the same memory.

  • num_worker (Optional[int]) – the number of parallel workers. If not given, then the number of CPUs in the system will be used.

  • show_progress (bool) – show a progress bar

  • pool (Union[Pool, ThreadPool, None]) – use an existing/external pool. If given, backend is ignored and you will be responsible for closing the pool.

Yield:

anything returned from func

Return type:

Generator[T, None, None]
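The thread-backend behavior can be sketched with the stdlib ThreadPool; plain dicts stand in for Documents (an illustrative sketch, not the library's code):

```python
# Sketch of map() with backend='thread': apply func to every element in
# parallel. In-place mutation is visible to the caller because threads share
# memory; with the process backend this would not be the case.
from multiprocessing.pool import ThreadPool

def add_tag(doc):
    doc['processed'] = True  # in-place mutation, safe with the thread backend
    return doc['id']         # anything returned from func is yielded

docs = [{'id': i} for i in range(4)]
with ThreadPool(processes=2) as pool:
    results = list(pool.imap(add_tag, docs))  # imap preserves input order
```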

map_batch(func, batch_size, backend='thread', num_worker=None, shuffle=False, show_progress=False, pool=None)#

Return an iterator that applies func to every minibatch of the iterable in parallel, yielding the results. Each element in the returned iterator is a DocumentArray.

See also

  • To process a single element, please use map();

  • To return a DocumentArray, please use apply_batch().

Parameters:
  • batch_size (int) – Size of each generated batch (except the last one, which might be smaller, default: 32)

  • shuffle (bool) – If set, shuffle the Documents before dividing into minibatches.

  • func (Callable[[DocumentArray], T]) – a function that takes a DocumentArray as input and outputs anything. You can either modify elements in-place (only with the thread backend) or work later on the returned elements.

  • backend (str) –

    whether to use multi-process or multi-thread as the parallelization backend. In general, if your func is IO-bound then thread is likely good enough. If your func is CPU-bound, you may use process. In practice, you should experiment to find the best choice. However, if you wish to modify the elements in-place, regardless of IO/CPU-bound, you should always use the thread backend.

    Warning

    When using the process backend, you should not expect func to modify elements in-place. This is because the multiprocessing backend passes the variable via pickle and works in another process. The passed object and the original object do not share the same memory.

  • num_worker (Optional[int]) – the number of parallel workers. If not given, then the number of CPUs in the system will be used.

  • show_progress (bool) – show a progress bar

  • pool (Union[Pool, ThreadPool, None]) – use an existing/external pool. If given, backend is ignored and you will be responsible for closing the pool.

Yield:

anything returned from func

Return type:

Generator[T, None, None]
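Combining the two ideas above, map_batch() first splits the collection into minibatches and then applies func to each batch in parallel. A minimal sketch with plain lists standing in for DocumentArray (illustrative only):

```python
# Sketch of map_batch(): chunk into minibatches of batch_size, then apply func
# to each whole minibatch in parallel via a thread pool.
from multiprocessing.pool import ThreadPool

def total(minibatch):       # func receives a whole minibatch, not one element
    return sum(minibatch)

docs = list(range(10))
batches = [docs[i:i + 4] for i in range(0, len(docs), 4)]  # sizes 4, 4, 2

with ThreadPool() as pool:
    sums = list(pool.imap(total, batches))
# sums == [6, 22, 17]
```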

split_by_tag(tag)#

Split the DocumentArray into multiple DocumentArray according to the tag value of each Document.

Parameters:

tag (str) – the name of the tag to split by, looked up in each Document's tags.

Return type:

Dict[Any, DocumentArray]

Returns:

a dict where Documents with the same value of tag are grouped together; their order is preserved from the original DocumentArray.

Note

If the tags of the Documents do not contain the specified tag, an empty dict is returned.
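The grouping semantics can be sketched as follows, with dicts standing in for Documents (an illustrative sketch, not the library's implementation):

```python
# Sketch of split_by_tag(): group Documents by the value of a tag, preserving
# original order within each group; Documents missing the tag are skipped.
from collections import defaultdict

docs = [
    {'id': 'a', 'tags': {'color': 'red'}},
    {'id': 'b', 'tags': {'color': 'blue'}},
    {'id': 'c', 'tags': {'color': 'red'}},
    {'id': 'd', 'tags': {}},               # missing the tag: not grouped
]

def split_by_tag(docs, tag):
    groups = defaultdict(list)
    for d in docs:
        if tag in d['tags']:
            groups[d['tags'][tag]].append(d)
    return dict(groups)          # empty dict if no Document carries the tag

groups = split_by_tag(docs, 'color')
# {'red': [a, c], 'blue': [b]} — order within each group is preserved
```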