Source code for docarray.array.storage.base.helper

from typing import Iterator, Dict


class Offset2ID:
    """Ordered list of ids addressed by integer offset.

    Wraps a plain ``list`` stored in ``self.ids`` and exposes the
    list operations (lookup, insert, delete, bulk rename) under explicit names.
    """

    def __init__(self, ids=None):
        """Create the mapping.

        :param ids: initial list of ids. A non-empty list is stored as-is
            (not copied); ``None`` or an empty value yields a fresh empty list.
        """
        self.ids = ids or []

    def get_id(self, idx):
        """Return the id stored at offset ``idx``.

        :param idx: offset into the id list.
        :return: the id at that offset.
        :raises IndexError: if ``idx`` is out of range.
        """
        return self.ids[idx]

    def append(self, data) -> None:
        """Add a single id at the end.

        :param data: the id to append.
        """
        self.ids.append(data)

    def extend(self, data) -> None:
        """Add every id from an iterable at the end, in order.

        :param data: iterable of ids to append.
        """
        self.ids.extend(data)

    def update(self, position, data_id) -> None:
        """Overwrite the id stored at ``position``.

        :param position: offset of the entry to replace.
        :param data_id: the new id.
        :raises IndexError: if ``position`` is out of range.
        """
        self.ids[position] = data_id

    def delete_by_id(self, _id) -> None:
        """Remove the first occurrence of ``_id``.

        :param _id: the id to remove.
        :raises ValueError: if ``_id`` is not present.
        """
        del self.ids[self.index(_id)]

    def index(self, _id) -> int:
        """Return the offset of the first occurrence of ``_id``.

        :param _id: the id to look up.
        :return: zero-based offset of ``_id``.
        :raises ValueError: if ``_id`` is not present.
        """
        return self.ids.index(_id)

    def delete_by_offset(self, position) -> None:
        """Remove the entry stored at ``position``.

        :param position: offset of the entry to remove.
        :raises IndexError: if ``position`` is out of range.
        """
        del self.ids[position]

    def insert(self, position, data_id) -> None:
        """Insert ``data_id`` before offset ``position``.

        :param position: offset at which to insert.
        :param data_id: the id to insert.
        """
        self.ids.insert(position, data_id)

    def clear(self) -> None:
        """Remove all ids, emptying the underlying list in place."""
        self.ids.clear()

    def delete_by_ids(self, ids) -> None:
        """Remove every entry whose id appears in ``ids``.

        Ids in ``ids`` that are not stored are ignored. ``self.ids`` is
        rebound to a new list; the previous list object is not mutated.

        :param ids: iterable of (hashable) ids to drop.
        """
        # Build the set once so each membership test is O(1).
        doomed = set(ids)
        self.ids = [_id for _id in self.ids if _id not in doomed]

    def update_ids(self, _ids_map: Dict[str, str]) -> None:
        """Rename stored ids according to ``_ids_map``.

        Entries whose id is a key of ``_ids_map`` are replaced by the mapped
        value; all other entries are left untouched. The list is modified in
        place, so its length and ordering do not change.

        :param _ids_map: mapping from old id to new id.
        """
        for offset, current in enumerate(self.ids):
            if current in _ids_map:
                self.ids[offset] = _ids_map[current]

    def save(self):
        """Do nothing in this implementation."""
        # NOTE(review): presumably a persistence hook for subclasses - confirm.
        pass

    def load(self):
        """Do nothing in this implementation."""
        # NOTE(review): presumably a persistence hook for subclasses - confirm.
        pass

    def __iter__(self) -> Iterator[str]:
        """Yield the stored ids in offset order."""
        yield from self.ids

    def __eq__(self, other):
        """Compare by the content of ``ids``.

        :param other: object to compare against.
        :return: result of comparing both ``ids`` attributes, or
            ``NotImplemented`` when ``other`` has no ``ids`` attribute so that
            Python falls back to its default comparison instead of raising
            ``AttributeError``.
        """
        if not hasattr(other, 'ids'):
            return NotImplemented
        return self.ids == other.ids

    # Defining __eq__ without __hash__ already makes instances unhashable;
    # stating it explicitly documents that this mutable container must not be
    # used as a dict key or set member.
    __hash__ = None

    def __len__(self) -> int:
        """Return the number of stored ids."""
        return len(self.ids)