docarray.array.mixins.parallel module

class docarray.array.mixins.parallel.ParallelMixin

Bases: object

Helper functions that provide a parallel map for DocumentArray.

apply(func: Callable[[Document], Document], backend: str = 'thread', num_worker: Optional[int] = None, show_progress: bool = False, pool: Optional[Union[Pool, ThreadPool]] = None) -> T

Apply func to every Document in parallel and collect the results into a DocumentArray. Takes the same parameters as map().

Returns:

a new DocumentArray

Return type:

T
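As a rough illustration of how apply() relates to map(), the sketch below runs a function over every element with the standard library's ThreadPool and collects the ordered results into a new list. It is not docarray's implementation: Doc is a hypothetical stand-in for Document, and apply_sketch is a hypothetical helper name.

```python
from dataclasses import dataclass
from multiprocessing.pool import ThreadPool
from typing import Callable, List


@dataclass
class Doc:
    """Hypothetical stand-in for docarray's Document."""
    text: str


def apply_sketch(docs: List[Doc], func: Callable[[Doc], Doc], num_worker: int = 4) -> List[Doc]:
    """Run func on every element in parallel and collect the results into a new list."""
    with ThreadPool(num_worker) as pool:
        # imap preserves input order, so result i corresponds to docs[i]
        return list(pool.imap(func, docs))


def upper(d: Doc) -> Doc:
    # Returns a new Doc instead of mutating the input
    return Doc(text=d.text.upper())


docs = [Doc("a"), Doc("b"), Doc("c")]
result = apply_sketch(docs, upper)
print([d.text for d in result])  # ['A', 'B', 'C']
```

Because upper returns new objects, the input list is left unchanged; only the collected results carry the new values.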

map(func, backend='thread', num_worker=None, show_progress=False, pool=None)

Return an iterator that applies func to every Document in the DocumentArray in parallel, yielding the results.

See also

  • To process a batch of elements, use map_batch();

  • To return a DocumentArray, use apply().

Parameters:
  • func (Callable[[Document], T]) – a function that takes a Document as input and returns anything. You can either modify elements in place (thread backend only) or work with the returned elements afterwards.

  • backend (str) –

    whether to use multi-process (process) or multi-thread (thread) parallelization. In general, if your func is IO-bound, thread is probably good enough; if it is CPU-bound, you may use process. In practice, try both to find the best value for your workload. However, if you wish to modify the elements in place, always use the thread backend, regardless of whether func is IO-bound or CPU-bound.

    Warning

    When using the process backend, do not expect func to modify elements in place. The multiprocessing backend passes each element to another process via pickle, so the passed object and the original object do not share memory.

  • num_worker (Optional[int]) – the number of parallel workers. If not given, then the number of CPUs in the system will be used.

  • show_progress (bool) – show a progress bar

  • pool (Union[Pool, ThreadPool, None]) – use an existing/external pool. If given, backend is ignored and you will be responsible for closing the pool.
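The in-place caveat in the Warning above can be seen with plain Python objects. In this sketch each document is a plain dict (a hypothetical stand-in for Document); the process case is simulated with a pickle round trip rather than a real worker process, which mirrors how the object reaches the worker but is an assumption about the mechanics, not docarray code.

```python
import pickle
from multiprocessing.pool import ThreadPool
from typing import Dict


def set_text_in_place(doc: Dict[str, str]) -> None:
    # Mutates its argument and returns nothing
    doc["text"] = "modified"


# Thread backend: every worker sees the very same objects as the caller
docs = [{"text": "a"}, {"text": "b"}]
with ThreadPool(2) as pool:
    list(pool.imap(set_text_in_place, docs))
print([d["text"] for d in docs])  # ['modified', 'modified']

# Process backend (simulated without spawning a process): the element is
# pickled on the way to the worker, so the worker mutates a copy
original = {"text": "a"}
copy_in_worker = pickle.loads(pickle.dumps(original))
set_text_in_place(copy_in_worker)
print(original["text"])            # a
print(copy_in_worker is original)  # False
```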

Yield:

anything returned by func

Return type:

Generator[T, None, None]
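The parameter semantics above (ordered, lazily yielded results; num_worker defaulting to the CPU count; an external pool overriding backend and staying open for the caller) can be sketched with the standard library alone. parallel_map is a hypothetical name, and docarray's actual implementation may differ, for example in how it renders the progress bar, which is omitted here.

```python
import os
from multiprocessing.pool import Pool, ThreadPool
from typing import Callable, Iterable, Iterator, Optional, TypeVar

S = TypeVar("S")
T = TypeVar("T")


def parallel_map(
    func: Callable[[S], T],
    items: Iterable[S],
    backend: str = "thread",
    num_worker: Optional[int] = None,
    pool: Optional[Pool] = None,
) -> Iterator[T]:
    """Yield func(item) for every item, computed in parallel, in input order."""
    if pool is not None:
        # External pool: backend is ignored and the caller closes the pool
        yield from pool.imap(func, items)
        return
    if backend not in ("thread", "process"):
        raise ValueError(f"unknown backend: {backend!r}")
    # Fall back to the number of CPUs when num_worker is not given
    workers = num_worker or os.cpu_count() or 1
    pool_cls = ThreadPool if backend == "thread" else Pool
    # The internally created pool is closed once the generator finishes
    with pool_cls(workers) as own_pool:
        yield from own_pool.imap(func, items)


def square(x: int) -> int:
    return x * x


print(list(parallel_map(square, range(5))))  # [0, 1, 4, 9, 16]

# Reusing an external pool; the caller remains responsible for closing it
with ThreadPool(2) as shared:
    print(list(parallel_map(square, [3, 4], pool=shared)))  # [9, 16]
```

Only the thread backend is exercised above; with backend="process", func must be picklable (e.g. defined at module level), which is another consequence of the pickle-based transfer described in the Warning.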

apply_batch(func: Callable[[DocumentArray], DocumentArray], batch_size: int, backend: str = 'thread', num_worker: Optional[int] = None, shuffle: bool = False, show_progress: bool = False, pool: Optional[Union[Pool, ThreadPool]] = None) -> T

Apply func to every minibatch in parallel and collect the results into a DocumentArray. Takes the same parameters as map_batch().

Returns:

a new DocumentArray

Return type:

T

map_batch(func, batch_size, backend='thread', num_worker=None, shuffle=False, show_progress=False, pool=None)

Return an iterator that applies func to every minibatch of the DocumentArray in parallel, yielding the results. Each minibatch passed to func is a DocumentArray.

See also

  • To process a single element, use map();

  • To return a DocumentArray, use apply_batch().

Parameters:
  • batch_size (int) – size of each generated batch (except the last one, which might be smaller)

  • shuffle (bool) – If set, shuffle the Documents before dividing into minibatches.

  • func (Callable[[DocumentArray], T]) – a function that takes a DocumentArray as input and returns anything. You can either modify elements in place (thread backend only) or work with the returned elements afterwards.

  • backend (str) –

    whether to use multi-process (process) or multi-thread (thread) parallelization. In general, if your func is IO-bound, thread is probably good enough; if it is CPU-bound, you may use process. In practice, try both to find the best value for your workload. However, if you wish to modify the elements in place, always use the thread backend, regardless of whether func is IO-bound or CPU-bound.

    Warning

    When using the process backend, do not expect func to modify elements in place. The multiprocessing backend passes each element to another process via pickle, so the passed object and the original object do not share memory.

  • num_worker (Optional[int]) – the number of parallel workers. If not given, then the number of CPUs in the system will be used.

  • show_progress (bool) – show a progress bar

  • pool (Union[Pool, ThreadPool, None]) – use an existing/external pool. If given, backend is ignored and you will be responsible for closing the pool.
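The batch_size and shuffle parameters describe a familiar minibatching scheme: optionally shuffle, then cut consecutive slices of batch_size, with a possibly smaller final batch. The helper below is a hypothetical stdlib sketch of that scheme over a plain list, not docarray's code; its seed parameter is an added assumption for reproducibility and has no counterpart in map_batch().

```python
import random
from typing import Iterator, List, Optional, Sequence, TypeVar

T = TypeVar("T")


def minibatches(
    items: Sequence[T], batch_size: int, shuffle: bool = False, seed: Optional[int] = None
) -> Iterator[List[T]]:
    """Split items into consecutive batches of batch_size; the last may be smaller."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    order = list(items)
    if shuffle:
        # Shuffle a copy so the caller's sequence is left untouched
        random.Random(seed).shuffle(order)
    for start in range(0, len(order), batch_size):
        yield order[start : start + batch_size]


print(list(minibatches(list(range(7)), batch_size=3)))
# [[0, 1, 2], [3, 4, 5], [6]]
```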

Yield:

anything returned by func

Return type:

Generator[T, None, None]
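Putting batching and parallelism together: func receives a whole minibatch and its return value is yielded as-is, one result per batch, which is why the yielded items can be anything. In this hypothetical sketch a plain list stands in for the DocumentArray minibatch, and only a thread pool is used; it illustrates the described behavior and is not docarray's implementation.

```python
from multiprocessing.pool import ThreadPool
from typing import Callable, Iterator, List, Sequence, TypeVar

S = TypeVar("S")
T = TypeVar("T")


def parallel_map_batch(
    func: Callable[[List[S]], T],
    items: Sequence[S],
    batch_size: int,
    num_worker: int = 2,
) -> Iterator[T]:
    """Apply func to each minibatch in parallel, yielding one result per batch in order."""
    batches = [list(items[i : i + batch_size]) for i in range(0, len(items), batch_size)]
    with ThreadPool(num_worker) as pool:
        yield from pool.imap(func, batches)


# func receives a whole batch and may return anything, e.g. the batch sum
print(list(parallel_map_batch(sum, list(range(10)), batch_size=4)))  # [6, 22, 17]
```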