Mirror of https://github.com/placeholder-soft/chroma.git (synced 2026-04-29 12:24:58 +08:00)
# Add LRU cache FD management (#891)
## Description of changes

*Summarize the changes made by this PR.*

- Improvements & Bug fixes
  - Adds an LRU cache for segment file handle management, to make sure we respect OS file handle limits
- New functionality
  - ...

## Test plan

Added a stress test which creates many collections and forces them to open index files by writing to them.

## Documentation Changes

We should add a troubleshooting section about filesystem limits so that users can set max limits more appropriately, similar to https://www.mongodb.com/docs/manual/reference/ulimit/
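As a starting point for that troubleshooting guidance, here is a minimal sketch (not part of this PR) of how a POSIX process can inspect and raise its own descriptor limit with Python's standard `resource` module, the same module the segment manager queries below; on Windows the ceiling comes from the C runtime's `_getmaxstdio` instead:

```python
import resource

# Read the current soft and hard limits on open file descriptors.
soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
print(f"open-file soft limit: {soft}, hard limit: {hard}")

# An unprivileged process may raise its soft limit up to the hard limit;
# raising the hard limit itself requires elevated privileges.
resource.setrlimit(resource.RLIMIT_NOFILE, (hard, hard))
```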
.github/workflows/chroma-test.yml (vendored, 24 lines changed):

```diff
@@ -17,7 +17,7 @@ jobs:
       matrix:
         python: ['3.7', '3.8', '3.9', '3.10']
         platform: [ubuntu-latest, windows-latest]
-        testfile: ["--ignore-glob 'chromadb/test/property/*'",
+        testfile: ["--ignore-glob 'chromadb/test/property/*' --ignore-glob 'chromadb/test/stress/*'",
                    "chromadb/test/property/test_add.py",
                    "chromadb/test/property/test_collections.py",
                    "chromadb/test/property/test_cross_version_persist.py",
@@ -39,3 +39,25 @@ jobs:
         if: runner.os == 'Windows'
       - name: Test
         run: python -m pytest ${{ matrix.testfile }}
+  stress-test:
+    timeout-minutes: 90
+    strategy:
+      matrix:
+        python: ['3.7']
+        platform: ['16core-64gb-ubuntu-latest', '16core-64gb-windows-latest']
+        testfile: ["'chromadb/test/stress/'"]
+    runs-on: ${{ matrix.platform }}
+    steps:
+      - name: Checkout
+        uses: actions/checkout@v3
+      - name: Set up Python ${{ matrix.python }}
+        uses: actions/setup-python@v4
+        with:
+          python-version: ${{ matrix.python }}
+      - name: Install test dependencies
+        run: python -m pip install -r requirements.txt && python -m pip install -r requirements_dev.txt
+      - name: Upgrade SQLite
+        run: python bin/windows_upgrade_sqlite.py
+        if: runner.os == 'Windows'
+      - name: Test
+        run: python -m pytest ${{ matrix.testfile }}
```
In the local segment manager, an LRU cache now tracks persistent vector segment instances so their file handles can be closed on eviction:

```diff
@@ -10,10 +10,21 @@ from chromadb.config import System, get_class
 from chromadb.db.system import SysDB
 from overrides import override
 from enum import Enum
+from chromadb.segment.impl.vector.local_persistent_hnsw import (
+    PersistentLocalHnswSegment,
+)
 from chromadb.types import Collection, Operation, Segment, SegmentScope, Metadata
 from typing import Dict, Type, Sequence, Optional, cast
 from uuid import UUID, uuid4
 from collections import defaultdict
+import platform
+
+from chromadb.utils.lru_cache import LRUCache
+
+if platform.system() != "Windows":
+    import resource
+elif platform.system() == "Windows":
+    import ctypes
 
 
 class SegmentType(Enum):
@@ -33,9 +44,15 @@ class LocalSegmentManager(SegmentManager):
     _sysdb: SysDB
     _system: System
     _instances: Dict[UUID, SegmentImplementation]
-    _segment_cache: Dict[UUID, Dict[SegmentScope, Segment]]
+    _vector_instances_file_handle_cache: LRUCache[
+        UUID, PersistentLocalHnswSegment
+    ]  # LRU cache to manage file handles across vector segment instances
+    _segment_cache: Dict[
+        UUID, Dict[SegmentScope, Segment]
+    ]  # Tracks which segments are loaded for a given collection
     _vector_segment_type: SegmentType = SegmentType.HNSW_LOCAL_MEMORY
     _lock: Lock
+    _max_file_handles: int
 
     def __init__(self, system: System):
         super().__init__(system)
@@ -47,6 +64,17 @@ class LocalSegmentManager(SegmentManager):
 
         if self._system.settings.require("is_persistent"):
             self._vector_segment_type = SegmentType.HNSW_LOCAL_PERSISTED
+            if platform.system() != "Windows":
+                self._max_file_handles = resource.getrlimit(resource.RLIMIT_NOFILE)[0]
+            else:
+                self._max_file_handles = ctypes.windll.msvcrt._getmaxstdio()  # type: ignore
+            segment_limit = (
+                self._max_file_handles
+                // PersistentLocalHnswSegment.get_file_handle_count()
+            )
+            self._vector_instances_file_handle_cache = LRUCache(
+                segment_limit, callback=lambda _, v: v.close_persistent_index()
+            )
 
     @override
     def start(self) -> None:
@@ -118,7 +146,13 @@ class LocalSegmentManager(SegmentManager):
         # segments for the given collection.
         for type in [MetadataReader, VectorReader]:
             # Just use get_segment to load the segment into the cache
-            self.get_segment(collection_id, type)
+            instance = self.get_segment(collection_id, type)
+            # If the segment is a vector segment, we need to keep segments in an LRU cache
+            # to avoid hitting the OS file handle limit.
+            if type == VectorReader and self._system.settings.require("is_persistent"):
+                instance = cast(PersistentLocalHnswSegment, instance)
+                instance.open_persistent_index()
+                self._vector_instances_file_handle_cache.set(collection_id, instance)
 
     def _cls(self, segment: Segment) -> Type[SegmentImplementation]:
         classname = SEGMENT_TYPE_IMPLS[SegmentType(segment["type"])]
```
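To make the sizing arithmetic concrete, a worked example with hypothetical numbers (the real per-segment count comes from `PersistentLocalHnswSegment.get_file_handle_count()`, shown in the next diff):

```python
# Hypothetical figures, for illustration only.
max_file_handles = 1024   # e.g. the default soft ulimit on many Linux systems
handles_per_segment = 5   # suppose hnswlib reports 4 handles, plus 1 metadata file

# Integer division gives the number of vector segments that may hold their
# files open at once; the LRU cache evicts (and closes) everything beyond it.
segment_limit = max_file_handles // handles_per_segment
assert segment_limit == 204
```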
The persistent HNSW segment now reports how many file handles it consumes and exposes explicit open/close hooks for the cache to call:

```diff
@@ -377,3 +377,21 @@ class PersistentLocalHnswSegment(LocalHnswSegment):
             )
             results.append(curr_results)
         return results
+
+    @staticmethod
+    def get_file_handle_count() -> int:
+        """Return how many file handles are used by the index"""
+        hnswlib_count = hnswlib.Index.file_handle_count
+        hnswlib_count = cast(int, hnswlib_count)
+        # One extra for the metadata file
+        return hnswlib_count + 1
+
+    def open_persistent_index(self) -> None:
+        """Open the persistent index"""
+        if self._index is not None:
+            self._index.open_file_handles()
+
+    def close_persistent_index(self) -> None:
+        """Close the persistent index"""
+        if self._index is not None:
+            self._index.close_file_handles()
```
chromadb/test/stress/test_many_collections.py (new file, 37 lines):

```python
from typing import List
import numpy as np

from chromadb.api import API
from chromadb.api.models.Collection import Collection


def test_many_collections(api: API) -> None:
    """Test that we can create a large number of collections and that the system
    remains responsive."""
    api.reset()

    N = 10
    D = 10

    metadata = None
    if api.get_settings().is_persistent:
        metadata = {"hnsw:batch_size": 3, "hnsw:sync_threshold": 3}
    else:
        # We only want to test persistent configurations in this way, since the main
        # point is to test the file handle limit
        return

    num_collections = 10000
    collections: List[Collection] = []
    for i in range(num_collections):
        new_collection = api.create_collection(
            f"test_collection_{i}",
            metadata=metadata,
        )
        collections.append(new_collection)

    # Add a few embeddings to each collection
    data = np.random.rand(N, D).tolist()
    ids = [f"test_id_{i}" for i in range(N)]
    for i in range(num_collections):
        collections[i].add(ids, data)
```
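Per the workflow change above, the regular test matrix now ignores `chromadb/test/stress/*`, so this test runs only in the dedicated stress-test job on the 16-core runners, via `python -m pytest 'chromadb/test/stress/'`.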
chromadb/utils/lru_cache.py (new file, 32 lines):

```python
from collections import OrderedDict
from typing import Any, Callable, Generic, Optional, TypeVar


K = TypeVar("K")
V = TypeVar("V")


class LRUCache(Generic[K, V]):
    """A simple LRU cache implementation, based on the OrderedDict class, which allows
    for a callback to be invoked when an item is evicted from the cache."""

    def __init__(self, capacity: int, callback: Optional[Callable[[K, V], Any]] = None):
        self.capacity = capacity
        self.cache: OrderedDict[K, V] = OrderedDict()
        self.callback = callback

    def get(self, key: K) -> Optional[V]:
        if key not in self.cache:
            return None
        value = self.cache.pop(key)
        self.cache[key] = value
        return value

    def set(self, key: K, value: V) -> None:
        if key in self.cache:
            self.cache.pop(key)
        elif len(self.cache) == self.capacity:
            evicted_key, evicted_value = self.cache.popitem(last=False)
            if self.callback:
                self.callback(evicted_key, evicted_value)
        self.cache[key] = value
```
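A quick usage sketch of the new cache (illustrative only, not from the PR); the eviction callback is the hook the segment manager uses to close a segment's file handles when it falls out of the cache:

```python
from chromadb.utils.lru_cache import LRUCache

# Capacity-2 cache; the callback fires only on eviction, not on overwrite.
cache: LRUCache[str, int] = LRUCache(2, callback=lambda k, v: print(f"evicted {k}={v}"))
cache.set("a", 1)
cache.set("b", 2)
cache.get("a")     # touching "a" makes "b" the least recently used entry
cache.set("c", 3)  # evicts "b" and prints "evicted b=2"
assert cache.get("b") is None
```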
The chroma-hnswlib pin moves from 0.7.1 to 0.7.2 in the package dependencies:

```diff
@@ -18,7 +18,7 @@ dependencies = [
     'pandas >= 1.3',
     'requests >= 2.28',
     'pydantic>=1.9,<2.0',
-    'chroma-hnswlib==0.7.1',
+    'chroma-hnswlib==0.7.2',
     'fastapi>=0.95.2, <0.100.0',
     'uvicorn[standard] >= 0.18.3',
     'numpy >= 1.21.6',
```
and the same bump lands in the pinned requirements:

```diff
@@ -1,4 +1,4 @@
-chroma-hnswlib==0.7.1
+chroma-hnswlib==0.7.2
 fastapi>=0.95.2, <0.100.0
 graphlib_backport==1.0.3; python_version < '3.9'
 importlib-resources
```