[Cleanup] Combine Batched and Regular KMeans Impl #2015
tarang-jain wants to merge 53 commits into rapidsai:main
Conversation
viclafargue left a comment:

Thanks! Here are some comments.
```cpp
auto minClusterAndDistance = raft::make_device_vector<raft::KeyValuePair<IndexT, DataT>, IndexT>(
  handle, streaming_batch_size);
auto L2NormBatch = raft::make_device_vector<DataT, IndexT>(handle, streaming_batch_size);
```
params.streaming_batch_size is 0 by default in the data-on-device case, but nothing prevents a user from setting a value. This would allocate an L2NormBatch smaller than n_samples, which would cause OOB writes (and later reads) during norm computation.
We should probably guard this with a check:
```cpp
RAFT_EXPECTS(streaming_batch_size == n_samples || !data_on_device, ...)
```
I have updated this so that for device arrays we simply ignore the streaming_batch_size and always use the entire dataset.
```cpp
auto init_sample =
  raft::make_device_matrix<DataT, IndexT>(handle, init_sample_size, n_features);
raft::matrix::sample_rows(handle, random_state, X, init_sample.view());
```
Suggested change (replacing the three lines above):

```cpp
if (init_sample_size == n_samples && data_on_device) {
  auto init_sample_const = raft::make_device_matrix_view<const DataT, IndexT>(X.data_handle(), n_samples, n_features);
  // pass directly to kmeansPlusPlus / initScalableKMeansPlusPlus
} else {
  auto init_sample = raft::make_device_matrix<DataT, IndexT>(handle, init_sample_size, n_features);
  raft::matrix::sample_rows(handle, random_state, X, init_sample.view());
  // pass init_sample to kmeansPlusPlus / initScalableKMeansPlusPlus
}
```
If init_size = 0 in the data on device path, we basically double memory use by copying the dataset over. Let's skip this by creating a view on the dataset.
I completely skipped the sampling for the device path. That is how it was being done earlier. The init size is only used if the data is on host.
```cpp
auto batch_workspace = rmm::device_uvector<char>(
  current_batch_sz, stream, raft::resource::get_workspace_resource(handle));
```
Every call to process_batch allocates both this workspace and the device scalar below. Both buffers could be allocated once, outside the process_batch function.
Moved the workspace buffer allocation outside the process_batch function.
```cpp
raft::matrix::sample_rows(handle, random_state, X, centroidsRawData);
} else if (iter_params.init == cuvs::cluster::kmeans::params::InitMethod::KMeansPlusPlus) {
  IndexT default_init_size =
    data_on_device ? n_samples : std::min(static_cast<IndexT>(3 * n_clusters), n_samples);
```
Unlikely to be an actual issue, but n_clusters could be cast before the multiplication to avoid any risk of integer overflow.
I have gotten rid of the batching for the device path. So when a user sets a batch size for a device mdspan, we just set it to n_samples and warn the user. We should definitely not be creating a new buffer just for the init sample if we can already accommodate the entire input matrix on device.
```cpp
DataT curClusteringCost = DataT{0};
raft::copy(&curClusteringCost, clustering_cost.data_handle(), 1, stream);
raft::resource::sync_stream(handle, stream);

if (curClusteringCost == DataT{0}) {
  RAFT_LOG_WARN("Zero clustering cost detected: all points coincide with their centroids.");
```
Going from ASSERT to RAFT_LOG_WARN may indeed be useful for the spectral clustering case. However, removing the inertia_check option forces the sync at every iteration. Do we truly need to drop this option?
I'm not sure we need the log and an assert might be better?
Early stopping (aka skipping iterations) is ultimately going to be the best way to extract perf here. Whether it's by explicitly computing inertia or just looking at the residuals of the centroids from the prior iteration.
Seems like inertia check / residuals could be done on gpu if we had to in order to avoid syncing so we would only need to sync in the final iteration, right?
> Seems like inertia check / residuals could be done on gpu if we had to in order to avoid syncing so we would only need to sync in the final iteration, right?
Until the iteration has completed, the CPU should not start the next iteration. So all the operations on the GPU stream must complete to finish the iteration.
> Seems like inertia check / residuals could be done on gpu if we had to in order to avoid syncing so we would only need to sync in the final iteration, right?
Yes, but this is throwing an error in the spectral clustering case wherein all the points converge on the centroids themselves. This is happening in one of the spectral tests and an assertion here is leading to an error, where instead it should simply return those centroids directly.
> I'm not sure we need the log and an assert might be better?

Therefore, I had to change it to a warning instead of an assertion. Earlier those spectral tests were skipping the inertia check, which was avoiding the assertion.
```cpp
} else {
  std::vector<DataT> h_weights(n_samples);
  auto d_view = raft::make_device_vector_view<const DataT, IndexT>(weight_ptr, n_samples);
  auto h_view = raft::make_host_vector_view<DataT, IndexT>(h_weights.data(), n_samples);
  raft::copy(handle, h_view, d_view);
  raft::resource::sync_stream(handle);
  for (IndexT i = 0; i < n_samples; ++i) {
    wt_sum += h_weights[i];
  }
```
In the data-on-device case, since the data is already on device, it would be much faster to sum-reduce with cub::DeviceReduce::Sum or raft::linalg::reduce. The summation would also have better precision since it is done in a tree fashion, O(log N).
When it's device-accessible, I have changed that to a raft::linalg::mapThenSumReduce. I have also removed this function and directly updated checkWeight (renamed it to weightSum). We do the scaling after the weight sum is computed.
```cython
@@ -33,9 +33,10 @@ cdef extern from "cuvs/cluster/kmeans.h" nogil:
    int batch_samples,
    int batch_centroids,
    bool inertia_check,
```
Maybe add a comment saying the field is present but deprecated.
I don't think it's necessary to add a comment here in the .pxd. The C header already has that information, and this file will be updated along with the C headers / src files.
```cpp
for (n_current_iter = 1; n_current_iter <= iter_params.max_iter; ++n_current_iter) {
  if (n_current_iter > 1) {
    RAFT_CUDA_TRY(cudaEventSynchronize(convergence_event));
```
The convergence event is recorded right after the inertia is computed, but we check it here before proceeding to the next iteration so that the operations after the convergence check are not blocked. The use of an event also means that only the operations up until the event need to be completed in order for it to be synchronized.
dantegd left a comment:
Just had a minor suggestion, not blocking
```cpp
/**
 * If true, check inertia during iterations for early convergence.
 * Number of samples to randomly draw for the KMeansPlusPlus initialization
 * step. A random subset of this size is used for centroid seeding.
 * When set to 0 the default depends on the data location:
 * - Device data: n_samples (use the full dataset).
 * - Host data: min(3 * n_clusters, n_samples).
 * Default: 0.
 */
bool inertia_check = false;
int64_t init_size = 0;
```
I softly agree with this rabbit comment; it would be worth adding a note in the PR description and code. C++ callers using the C++ API directly will fail to compile, not warn. That's a real source-API break, worth a release-notes line at least.
@dantegd I have updated the PR desc. Since this PR is marked as …
```cpp
cluster_centers, impl->n_lists(), impl->dim());
if (impl->metric() == distance::DistanceType::CosineExpanded) {
  raft::linalg::row_normalize<raft::linalg::L2Norm>(handle, centers_const_view, centers_view);
}
```
This normalization was lost in #2001. So this PR adds it again.
Depends on rapidsai/cuvs#2015. Inertia checking is being made mandatory and rapidsai/cuvs#2015 is a breaking change. This PR is needed to prevent compilation failures.

Authors:
- Tarang Jain (https://github.com/tarang-jain)

Approvers:
- Jim Crist-Harif (https://github.com/jcrist)
- Anupam (https://github.com/aamijar)
- Victor Lafargue (https://github.com/viclafargue)

URL: #8033
Combine batched and regular k-means implementations

- Combine `fit` into a single `kmeans_fit` template that works with both host and device mdspans via `batch_load_iterator`
- `init_centroids`
- Removed the `inertia_check` parameter: inertia-based convergence checking now always runs. Zero clustering cost (perfect fit) logs a warning instead of asserting. This is needed because spectral clustering can cause all points to converge on the cluster centroids themselves.
- Added an `init_size` parameter to control how many samples are drawn for KMeansPlusPlus initialization. Defaults to `n_samples` for device data, `(3 * n_clusters)` for host data
- Replaced `raft::copy` with `std::swap` of buffer pointers
- `process_batch` no longer computes norms internally
- Replaced the `cudaPointerGetAttributes` call with `raft::memory_type_from_pointer`
- Replaced `cub::DeviceReduce::Sum` calls with `raft::linalg::mapThenSumReduce`
- Compute `(w / wt_sum) * n_samples` via a composed op instead of precomputing a scale, so very small `wt_sum` values don't produce inf
- Renamed `checkWeight` to `weightSum` and made it mdspan-based with an `Accessor` template: device reduce for device weights, host loop for host weights. Callers apply the scaling themselves
- Removed the `batch_sums`/`batch_counts` scratch buffers by accumulating directly into `centroid_sums`/`weight_per_cluster` via `reset_sums=false` in `reduce_rows_by_key`/`reduce_cols_by_key`, removing two per-batch `raft::linalg::add` kernels
- Removed the `update_centroids` helpers (both the `detail` and public template); no remaining callers after the `fit_main` consolidation
- Removed `raft::sync_stream` calls and added a CUDA event to record whether the convergence criterion is met. The convergence check is now done on device. Average per-iteration time with mandatory inertia check now matches previous benchmarks, even when the inertia check was previously disabled.

C Tests
This PR adds C tests for KMeans, which were missing. Here we test both the old version and the new (i.e. the breaking change).
Benchmarks:

With mandatory early stopping. Batch size is such that we fill up 90% of available GPU memory (95830 MiB).

HW:
- GPU: NVIDIA H100 NVL (CUDA 13.0)
- CPU:
Breaking Change

This PR is a breaking change of the C++ API because the `inertia_check` param is removed. The breaking changes to the C ABI will be applied in 26.08.