Skip to content

Commit

Permalink
add size projections in distributed mode
Browse files Browse the repository at this point in the history
  • Loading branch information
JojiiOfficial committed Dec 10, 2024
1 parent 0f65d13 commit 4b8277d
Show file tree
Hide file tree
Showing 6 changed files with 112 additions and 65 deletions.
34 changes: 18 additions & 16 deletions lib/collection/src/collection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,10 @@ use tokio::sync::{Mutex, RwLock, RwLockWriteGuard};

use crate::collection::payload_index_schema::PayloadIndexSchema;
use crate::collection_state::{ShardInfo, State};
use crate::common::collection_size_stats::{
CollectionSizeAtomicStats, CollectionSizeStats, CollectionSizeStatsCache,
};
use crate::common::is_ready::IsReady;
use crate::common::local_data_stats::{LocalDataAtomicStats, LocalDataStats, LocalDataStatsCache};
use crate::config::CollectionConfigInternal;
use crate::operations::config_diff::{DiffConfig, OptimizersConfigDiff};
use crate::operations::shared_storage_config::SharedStorageConfig;
Expand Down Expand Up @@ -81,8 +83,8 @@ pub struct Collection {
// Search runtime handle.
search_runtime: Handle,
optimizer_cpu_budget: CpuBudget,
// Cached stats over all local shards used in strict mode, may be outdated
local_stats_cache: LocalDataStatsCache,
// Cached statistics of collection size, may be outdated.
collection_stats_cache: CollectionSizeStatsCache,
}

pub type RequestShardTransfer = Arc<dyn Fn(ShardTransfer) + Send + Sync>;
Expand Down Expand Up @@ -153,8 +155,8 @@ impl Collection {

let locked_shard_holder = Arc::new(LockedShardHolder::new(shard_holder));

let local_stats_cache = LocalDataStatsCache::new_with_values(
Self::calculate_local_shards_stats(&locked_shard_holder).await,
let collection_stats_cache = CollectionSizeStatsCache::new_with_values(
Self::estimate_collection_size_stats(&locked_shard_holder).await,
);

// Once the config is persisted - the collection is considered to be successfully created.
Expand Down Expand Up @@ -183,7 +185,7 @@ impl Collection {
update_runtime: update_runtime.unwrap_or_else(Handle::current),
search_runtime: search_runtime.unwrap_or_else(Handle::current),
optimizer_cpu_budget,
local_stats_cache,
collection_stats_cache,
})
}

Expand Down Expand Up @@ -271,8 +273,8 @@ impl Collection {

let locked_shard_holder = Arc::new(LockedShardHolder::new(shard_holder));

let local_stats_cache = LocalDataStatsCache::new_with_values(
Self::calculate_local_shards_stats(&locked_shard_holder).await,
let collection_stats_cache = CollectionSizeStatsCache::new_with_values(
Self::estimate_collection_size_stats(&locked_shard_holder).await,
);

Self {
Expand All @@ -297,7 +299,7 @@ impl Collection {
update_runtime: update_runtime.unwrap_or_else(Handle::current),
search_runtime: search_runtime.unwrap_or_else(Handle::current),
optimizer_cpu_budget,
local_stats_cache,
collection_stats_cache,
}
}

Expand Down Expand Up @@ -798,18 +800,18 @@ impl Collection {
self.shards_holder.read().await.trigger_optimizers().await;
}

async fn calculate_local_shards_stats(
async fn estimate_collection_size_stats(
shards_holder: &Arc<RwLock<ShardHolder>>,
) -> LocalDataStats {
) -> Option<CollectionSizeStats> {
let shard_lock = shards_holder.read().await;
shard_lock.calculate_local_shards_stats().await
shard_lock.estimate_collection_size_stats().await
}

/// Returns estimations of local shards statistics. This values are cached and might be not 100% up to date.
/// Returns estimations of collection sizes. These values are cached and might not be 100% up to date.
/// The cache gets updated every 32 calls.
pub async fn local_stats_estimations(&self) -> &LocalDataAtomicStats {
self.local_stats_cache
.get_or_update_cache(|| Self::calculate_local_shards_stats(&self.shards_holder))
pub(crate) async fn estimated_collection_stats(&self) -> Option<&CollectionSizeAtomicStats> {
self.collection_stats_cache
.get_or_update_cache(|| Self::estimate_collection_size_stats(&self.shards_holder))
.await
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,15 @@ const UPDATE_INTERVAL: usize = 32;

/// A cache for `CollectionSizeStats` utilizing `AtomicUsize` for better performance.
#[derive(Default)]
pub(crate) struct LocalDataStatsCache {
stats: LocalDataAtomicStats,
pub(crate) struct CollectionSizeStatsCache {
stats: Option<CollectionSizeAtomicStats>,

request_counter: AtomicUsize,
}

impl LocalDataStatsCache {
pub fn new_with_values(stats: LocalDataStats) -> Self {
let stats = LocalDataAtomicStats::new(stats);
impl CollectionSizeStatsCache {
pub fn new_with_values(stats: Option<CollectionSizeStats>) -> Self {
let stats = stats.map(CollectionSizeAtomicStats::new);
Self {
stats,
request_counter: AtomicUsize::new(1), // Prevent same data getting loaded a second time when doing the first request.
Expand All @@ -32,34 +33,36 @@ impl LocalDataStatsCache {
pub async fn get_or_update_cache<U>(
&self,
update_fn: impl FnOnce() -> U,
) -> &LocalDataAtomicStats
) -> Option<&CollectionSizeAtomicStats>
where
U: Future<Output = LocalDataStats>,
U: Future<Output = Option<CollectionSizeStats>>,
{
// Update if necessary
if self.check_need_update_and_increment() {
let updated = update_fn().await;
let updated = update_fn().await?;
self.update(updated);
}

// Give caller access to cached (inner) values which are always updated if required
&self.stats
self.stats.as_ref()
}

/// Sets all cache values to `new_stats`.
pub fn update(&self, new_stats: LocalDataStats) {
self.stats.update(new_stats)
pub fn update(&self, new_stats: CollectionSizeStats) {
// NOTE(review): when `self.stats` is `None`, `new_stats` is dropped without
// being stored, so this call has no effect. Confirm this is intended for a
// cache that was constructed without initial values.
if let Some(stats) = self.stats.as_ref() {
// Overwrite the cached atomic values in place with the new ones.
stats.update(new_stats)
}
}
}

/// Same as `CollectionSizeStats` but each value is atomic.
#[derive(Default)]
pub struct LocalDataAtomicStats {
pub(crate) struct CollectionSizeAtomicStats {
vector_storage_size: AtomicUsize,
payload_storage_size: AtomicUsize,
}

impl LocalDataAtomicStats {
impl CollectionSizeAtomicStats {
/// Get the vector storage size.
///
/// Returns the value currently held in the `vector_storage_size` atomic.
pub fn get_vector_storage_size(&self) -> usize {
// Loaded with `Ordering::Relaxed`.
self.vector_storage_size.load(Ordering::Relaxed)
}
self.payload_storage_size.load(Ordering::Relaxed)
}

fn new(data: LocalDataStats) -> Self {
let LocalDataStats {
fn new(data: CollectionSizeStats) -> Self {
let CollectionSizeStats {
vector_storage_size,
payload_storage_size,
} = data;

let vector_storage_size = AtomicUsize::new(vector_storage_size);
let payload_storage_size = AtomicUsize::new(payload_storage_size);
Self {
vector_storage_size,
payload_storage_size,
vector_storage_size: AtomicUsize::new(vector_storage_size),
payload_storage_size: AtomicUsize::new(payload_storage_size),
}
}

fn update(&self, new_values: LocalDataStats) {
let LocalDataStats {
fn update(&self, new_stats: CollectionSizeStats) {
let CollectionSizeStats {
vector_storage_size,
payload_storage_size,
} = new_values;

} = new_stats;
self.vector_storage_size
.store(vector_storage_size, Ordering::Relaxed);
self.payload_storage_size
Expand All @@ -99,21 +99,36 @@ impl LocalDataAtomicStats {

/// Statistics for local data, like the size of vector storage.
#[derive(Clone, Copy, Default)]
pub struct LocalDataStats {
pub struct CollectionSizeStats {
/// Estimated amount of vector storage size.
pub vector_storage_size: usize,
/// Estimated amount of payload storage size.
pub payload_storage_size: usize,
}

impl LocalDataStats {
pub fn accumulate_from(&mut self, other: &Self) {
let LocalDataStats {
impl CollectionSizeStats {
pub(crate) fn accumulate_metrics_from(&mut self, other: &Self) {
let CollectionSizeStats {
vector_storage_size,
payload_storage_size,
} = other;

self.vector_storage_size += vector_storage_size;
self.payload_storage_size += payload_storage_size;
}

/// Returns a copy of these stats with both sizes multiplied by `factor`.
///
/// The multiplication saturates at `usize::MAX`. A plain `*` would panic on
/// overflow in debug builds and wrap around in release builds, and a wrapped
/// value would under-report the size.
pub(crate) fn multiplied_with(self, factor: usize) -> Self {
    let CollectionSizeStats {
        vector_storage_size,
        payload_storage_size,
    } = self;

    Self {
        vector_storage_size: vector_storage_size.saturating_mul(factor),
        payload_storage_size: payload_storage_size.saturating_mul(factor),
    }
}
}
2 changes: 1 addition & 1 deletion lib/collection/src/common/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
pub mod batching;
pub mod collection_size_stats;
pub mod eta_calculator;
pub mod fetch_vectors;
pub mod file_utils;
pub mod is_ready;
pub mod local_data_stats;
pub mod retrieve_request_trait;
pub mod sha_256;
pub mod snapshot_stream;
Expand Down
15 changes: 9 additions & 6 deletions lib/collection/src/operations/verification/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use segment::types::{Filter, StrictModeConfig};

use super::{check_limit_opt, StrictModeVerification};
use crate::collection::Collection;
use crate::common::local_data_stats::LocalDataAtomicStats;
use crate::common::collection_size_stats::CollectionSizeAtomicStats;
use crate::operations::payload_ops::{DeletePayload, SetPayload};
use crate::operations::point_ops::PointsSelector;
use crate::operations::types::CollectionError;
Expand Down Expand Up @@ -64,8 +64,9 @@ impl StrictModeVerification for SetPayload {
) -> Result<(), CollectionError> {
if let Some(payload_size_limit_bytes) = strict_mode_config.max_collection_payload_size_bytes
{
let local_stats = collection.local_stats_estimations().await;
check_collection_payload_size_limit(payload_size_limit_bytes, local_stats)?;
if let Some(local_stats) = collection.estimated_collection_stats().await {
check_collection_payload_size_limit(payload_size_limit_bytes, local_stats)?;
}
}

Ok(())
Expand Down Expand Up @@ -203,7 +204,9 @@ async fn check_collection_size_limit(
return Ok(());
}

let stats = collection.local_stats_estimations().await;
let Some(stats) = collection.estimated_collection_stats().await else {
return Ok(());
};

if let Some(vector_storage_size_limit_bytes) = vector_limit {
check_collection_vector_size_limit(vector_storage_size_limit_bytes, stats)?;
Expand All @@ -219,7 +222,7 @@ async fn check_collection_size_limit(
/// Check collections vector storage size limit.
fn check_collection_vector_size_limit(
max_vec_storage_size_bytes: usize,
stats: &LocalDataAtomicStats,
stats: &CollectionSizeAtomicStats,
) -> Result<(), CollectionError> {
let vec_storage_size_bytes = stats.get_vector_storage_size();

Expand All @@ -236,7 +239,7 @@ fn check_collection_vector_size_limit(
/// Check collections payload storage size limit.
fn check_collection_payload_size_limit(
max_payload_storage_size_bytes: usize,
stats: &LocalDataAtomicStats,
stats: &CollectionSizeAtomicStats,
) -> Result<(), CollectionError> {
let payload_storage_size_bytes = stats.get_payload_storage_size();

Expand Down
19 changes: 11 additions & 8 deletions lib/collection/src/shards/replica_set/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use super::remote_shard::RemoteShard;
use super::transfer::ShardTransfer;
use super::CollectionId;
use crate::collection::payload_index_schema::PayloadIndexSchema;
use crate::common::local_data_stats::LocalDataStats;
use crate::common::collection_size_stats::CollectionSizeStats;
use crate::common::snapshots_manager::SnapshotStorageManager;
use crate::config::CollectionConfigInternal;
use crate::operations::point_ops::{self};
Expand Down Expand Up @@ -345,6 +345,10 @@ impl ShardReplicaSet {
self.replica_state.read().this_peer_id
}

/// Returns `true` when `remotes` is not empty.
pub async fn has_remote_shard(&self) -> bool {
    let remotes = self.remotes.read().await;
    !remotes.is_empty()
}

/// Returns `true` when the `local` slot currently holds a value.
pub async fn has_local_shard(&self) -> bool {
    let local = self.local.read().await;
    local.is_some()
}
Expand Down Expand Up @@ -1020,10 +1024,9 @@ impl ShardReplicaSet {
true
}

/// Returns the estimated size of all locally stored vectors in bytes.
/// Locks and iterates over all segments.
/// Cache this value in performance critical scenarios!
pub(crate) async fn calculate_local_shards_stats(&self) -> LocalDataStats {
/// Returns the estimated size of all local segments.
/// Since this locks all segments you should cache this value in performance critical scenarios!
pub(crate) async fn calculate_local_shard_stats(&self) -> Option<CollectionSizeStats> {
self.local
.read()
.await
Expand All @@ -1039,15 +1042,15 @@ impl ShardReplicaSet {
total_payload_size += size_info.payloads_size_bytes;
}

LocalDataStats {
Some(CollectionSizeStats {
vector_storage_size: total_vector_size,
payload_storage_size: total_payload_size,
}
})
}
Shard::Proxy(_)
| Shard::ForwardProxy(_)
| Shard::QueueProxy(_)
| Shard::Dummy(_) => LocalDataStats::default(),
| Shard::Dummy(_) => None,
})
.unwrap_or_default()
}
Expand Down
Loading

0 comments on commit 4b8277d

Please sign in to comment.