Skip to content

Commit

Permalink
Add KMeans sparsity support (uxlfoundation#2666)
Browse files Browse the repository at this point in the history
* Add KMeans sparsity support

* Remove untracked files

* Remove untracked binary files

* Remove untracked binary files

* Apply PR suggestions

* Fix init_centroids stage in DAAL

* Fix spmd cases

* Apply clang format

* Update copyright headers and add releaseSparseBlock to prevent memory leak
  • Loading branch information
inteldimitrius authored Feb 28, 2024
1 parent d724f38 commit 4ddbada
Show file tree
Hide file tree
Showing 22 changed files with 1,433 additions and 184 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ template class BatchContainer<DAAL_FPTYPE, lloydCSR, DAAL_CPU>;
}
namespace internal
{
template class KMeansBatchKernel<lloydCSR, DAAL_FPTYPE, DAAL_CPU>;
template class DAAL_EXPORT KMeansBatchKernel<lloydCSR, DAAL_FPTYPE, DAAL_CPU>;
} // namespace internal
} // namespace kmeans
} // namespace algorithms
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ template class BatchContainer<DAAL_FPTYPE, plusPlusCSR, DAAL_CPU>;
}
namespace internal
{
template class KMeansInitKernel<plusPlusCSR, DAAL_FPTYPE, DAAL_CPU>;
template class DAAL_EXPORT KMeansInitKernel<plusPlusCSR, DAAL_FPTYPE, DAAL_CPU>;
} // namespace internal
} // namespace init
} // namespace kmeans
Expand Down
43 changes: 26 additions & 17 deletions cpp/daal/src/algorithms/kmeans/kmeans_plusplus_init_impl.i
100755 → 100644
Original file line number Diff line number Diff line change
Expand Up @@ -247,25 +247,27 @@ template <typename algorithmFPType, CpuType cpu>
class DataHelperCSR
{
public:
typedef BlockHelperCSR<algorithmFPType, cpu, CSRNumericTableIface> BlockHelperType;
typedef BlockHelperCSR<algorithmFPType, cpu, CSRNumericTable> BlockHelperType;

// Wraps ntData for CSR access: caches its column count (dim) and row count (nRows),
// keeps the generic table pointer in _nt and a CSR-typed pointer to the same object in _csr.
// NOTE(review): _csr is produced by a dynamic_cast, so it is nullptr when ntData is not of
// the requested CSR type; nothing in this constructor checks for that — confirm that callers
// only ever pass CSR tables.
DataHelperCSR(NumericTable * ntData)
: dim(ntData->getNumberOfColumns()), nRows(ntData->getNumberOfRows()), _nt(ntData), _csr(dynamic_cast<CSRNumericTableIface *>(ntData))
: dim(ntData->getNumberOfColumns()), nRows(ntData->getNumberOfRows()), _nt(ntData), _csr(dynamic_cast<CSRNumericTable *>(ntData))
{}
// Returns the table pointer exactly as it was passed to the constructor.
NumericTable * nt() const { return _nt; }
// Returns the CSR-typed pointer obtained by the constructor's dynamic_cast (may be nullptr, see above).
CSRNumericTableIface * ntIface() const { return _csr; }
CSRNumericTable * ntIface() const { return _csr; }

Status updateMinDistInBlock(algorithmFPType * const minDistAccTrials, size_t nBlock, size_t iBlock, size_t nTrials, size_t iBestTrial,
const algorithmFPType * aWeights, const algorithmFPType * const pLastAddedCenter, algorithmFPType * const aMinDist)
{
const size_t iStartRow = iBlock * _nRowsInBlock; //start row
const size_t nRowsToProcess = (iBlock == nBlock - 1) ? nRows - iBlock * _nRowsInBlock : _nRowsInBlock; //rows to process

ReadRowsCSR<algorithmFPType, cpu> ntDataBD(_csr, iStartRow, nRowsToProcess);
DAAL_CHECK_BLOCK_STATUS(ntDataBD);
const algorithmFPType * const pData = ntDataBD.values();
const size_t * const colIdx = ntDataBD.cols();
const size_t * const rowIdx = ntDataBD.rows();
// TODO: switch back to ReadRowsCSR once its static-library linking bug is fixed;
// until then the sparse block is acquired and released manually.
daal::data_management::CSRBlockDescriptor<algorithmFPType> block;
_csr->getSparseBlock(iStartRow, nRowsToProcess, daal::data_management::readOnly, block);
const auto pData = block.getBlockValuesPtr();
const auto colIdx = block.getBlockColumnIndicesPtr();
const auto rowIdx = block.getBlockRowIndicesPtr();

algorithmFPType * const pDistSqBest = &aMinDist[iBestTrial * nRows + iStartRow];
const algorithmFPType * const weights = aWeights ? &aWeights[iStartRow] : nullptr;
Expand All @@ -282,7 +284,7 @@ public:
minDistAccTrials[iBestTrial * nBlock + iBlock] =
updateMinDistForITrials(pDistSqBest, iBestTrial, nRowsToProcess, pData, colIdx, rowIdx, pLastAddedCenter, weights, pDistSqBest);

return Status();
return _csr->releaseSparseBlock(block);
}

algorithmFPType updateMinDistForITrials(algorithmFPType * const pDistSq, size_t iTrials, size_t nRowsToProcess,
Expand Down Expand Up @@ -316,19 +318,25 @@ public:
//of the data in this row
algorithmFPType copyOneRowCalcSumSq(size_t iRow, algorithmFPType * pDst) const
{
// Expands sparse row iRow into the dense buffer pDst and returns the sum of squares of the
// row's stored values. pDst must hold at least dim elements: all dim entries are zeroed
// below before the stored values are scattered into it.
ReadRowsCSR<algorithmFPType, cpu> ntDataBD(_csr, iRow, 1);
const algorithmFPType * pData = ntDataBD.values();
const size_t * colIdx = ntDataBD.cols();
const size_t * rowIdx = ntDataBD.rows();
// TODO: switch back to ReadRowsCSR once its static-library linking bug is fixed;
// until then the sparse block is acquired and released manually.
daal::data_management::CSRBlockDescriptor<algorithmFPType> block;
// NOTE(review): whatever getSparseBlock reports back is not inspected here, and _csr is
// dereferenced without a null check (it comes from a dynamic_cast in the constructor).
_csr->getSparseBlock(iRow, 1, daal::data_management::readOnly, block);
const auto pData = block.getBlockValuesPtr();
const auto colIdx = block.getBlockColumnIndicesPtr();
const auto rowIdx = block.getBlockRowIndicesPtr();

daal::services::internal::service_memset<algorithmFPType, cpu>(pDst, algorithmFPType(0.), dim);
algorithmFPType res(0.);
// Only one row was requested, so rowIdx[1] - rowIdx[0] is the number of stored values in it.
const size_t nValues = rowIdx[1] - rowIdx[0];
for (size_t i = 0; i < nValues; ++i, ++pData, ++colIdx)
for (size_t i = 0; i < nValues; ++i)
{
res += (*pData) * (*pData);
pDst[(*colIdx) - 1] = *pData;
const auto val = pData[i];
res += val * val;
const auto colIndex = colIdx[i];
// Column indices are treated as one-based, hence the -1 when indexing the dense buffer.
pDst[colIndex - 1] = val;
}
// The result of releaseSparseBlock is discarded: this function returns only the sum, so it
// has no channel through which to report a failed release.
_csr->releaseSparseBlock(block);
return res;
}

Expand All @@ -338,7 +346,7 @@ public:

protected:
NumericTable * _nt;
CSRNumericTableIface * _csr;
CSRNumericTable * _csr;
};

//Base task class for kmeans++ and kmeans||
Expand Down Expand Up @@ -546,6 +554,7 @@ Status TaskPlusPlusBatch<algorithmFPType, cpu, DataHelper>::run()
//copy it to the result
status |= this->copyPoints(&clusters[iCluster * this->_data.dim], &this->_lastAddedCenter[this->_trialBest * this->_data.dim], 1u);
}

return status;
}

Expand Down
53 changes: 36 additions & 17 deletions cpp/oneapi/dal/algo/kmeans/backend/cpu/infer_kernel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,21 @@ using descriptor_t = detail::descriptor_base<task::clustering>;
namespace daal_kmeans = daal::algorithms::kmeans;
namespace interop = dal::backend::interop;

template <typename Float, daal::CpuType Cpu>
using daal_kmeans_lloyd_dense_kernel_t =
daal_kmeans::internal::KMeansBatchKernel<daal_kmeans::lloydDense, Float, Cpu>;
// Compile-time wrapper around a daal_kmeans::Method enumerator: an alias for
// std::integral_constant, so the wrapped enumerator is available as ::value.
template <daal_kmeans::Method Value>
using daal_method_constant = std::integral_constant<daal_kmeans::Method, Value>;

// Trait that maps a oneDAL k-means method tag to the matching daal_kmeans::Method enumerator
// (exposed as ::value). The primary template is declared but not defined, so using a method
// tag without a specialization below is a compile-time error.
template <typename Method>
struct to_daal_method;

// method::lloyd_dense -> daal_kmeans::lloydDense
template <>
struct to_daal_method<method::lloyd_dense> : daal_method_constant<daal_kmeans::lloydDense> {};

// method::lloyd_csr -> daal_kmeans::lloydCSR
template <>
struct to_daal_method<method::lloyd_csr> : daal_method_constant<daal_kmeans::lloydCSR> {};

// The DAAL KMeansBatchKernel instantiation for a given floating-point type, CPU type and
// oneDAL method tag; the tag is translated to the DAAL enumerator through to_daal_method.
template <typename Float, daal::CpuType Cpu, typename Method>
using batch_kernel_t =
daal_kmeans::internal::KMeansBatchKernel<to_daal_method<Method>::value, Float, Cpu>;

inline auto get_daal_parameter_to_infer(const descriptor_t& desc) {
const std::int64_t max_iteration_count = 0;
Expand All @@ -55,11 +67,11 @@ inline auto get_daal_parameter_to_infer(const descriptor_t& desc) {
return parameter;
}

template <typename Float, typename Task>
template <typename Float, typename Task, typename Method, typename Table>
static infer_result<Task> call_daal_kernel(const context_cpu& ctx,
const descriptor_t& desc,
const model<Task>& trained_model,
const table& data) {
const Table& data) {
const std::int64_t row_count = data.get_row_count();

auto result = infer_result<Task>{}.set_result_options(desc.get_result_options());
Expand All @@ -84,11 +96,13 @@ static infer_result<Task> call_daal_kernel(const context_cpu& ctx,
daal_objective_function_value.get(),
nullptr };

interop::status_to_exception(
interop::call_daal_kernel<Float, daal_kmeans_lloyd_dense_kernel_t>(ctx,
input,
output,
&par));
interop::status_to_exception(dal::backend::dispatch_by_cpu(ctx, [&](auto cpu) {
return batch_kernel_t<Float,
oneapi::dal::backend::interop::to_daal_cpu_type<decltype(cpu)>::value,
Method>()
.compute(input, output, &par);
}));

if (desc.get_result_options().test(result_options::compute_assignments)) {
result.set_responses(
dal::detail::homogen_table_builder{}.reset(arr_responses, row_count, 1).build());
Expand All @@ -101,23 +115,28 @@ static infer_result<Task> call_daal_kernel(const context_cpu& ctx,
return result;
}

// CPU inference entry point: chooses the table type that corresponds to Method, converts the
// input data to that type, and forwards the model and data to call_daal_kernel.
template <typename Float, typename Task>
template <typename Float, typename Task, typename Method>
static infer_result<Task> infer(const context_cpu& ctx,
const descriptor_t& desc,
const infer_input<Task>& input) {
return call_daal_kernel<Float, Task>(ctx, desc, input.get_model(), input.get_data());
// method::lloyd_csr operates on csr_table; every other method uses the generic table type.
using table_type =
std::conditional_t<std::is_same_v<Method, method::lloyd_csr>, csr_table, table>;
// NOTE(review): what this conversion does when Method is lloyd_csr but the input is not
// actually a CSR table is not visible here — confirm how that case is reported.
const auto data = static_cast<table_type>(input.get_data());
return call_daal_kernel<Float, Task, Method>(ctx, desc, input.get_model(), data);
}

// Partial specialization of infer_kernel_cpu for task::clustering. Its call operator simply
// delegates to infer() with the same context, descriptor and input.
template <typename Float>
struct infer_kernel_cpu<Float, method::by_default, task::clustering> {
template <typename Float, typename Method>
struct infer_kernel_cpu<Float, Method, task::clustering> {
infer_result<task::clustering> operator()(const context_cpu& ctx,
const descriptor_t& desc,
const infer_input<task::clustering>& input) const {
return infer<Float, task::clustering>(ctx, desc, input);
return infer<Float, task::clustering, Method>(ctx, desc, input);
}
};

template struct infer_kernel_cpu<float, method::by_default, task::clustering>;
template struct infer_kernel_cpu<double, method::by_default, task::clustering>;
template struct infer_kernel_cpu<float, method::lloyd_csr, task::clustering>;
template struct infer_kernel_cpu<double, method::lloyd_csr, task::clustering>;
template struct infer_kernel_cpu<float, method::lloyd_dense, task::clustering>;
template struct infer_kernel_cpu<double, method::lloyd_dense, task::clustering>;

} // namespace oneapi::dal::kmeans::backend
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <daal/src/algorithms/kmeans/kmeans_lloyd_kernel.h>

#include "oneapi/dal/algo/kmeans/backend/cpu/train_kernel.hpp"
#include "oneapi/dal/algo/kmeans/detail/train_init_centroids.hpp"
#include "oneapi/dal/backend/interop/common.hpp"
#include "oneapi/dal/backend/interop/error_converter.hpp"
#include "oneapi/dal/backend/interop/table_conversion.hpp"
Expand All @@ -32,50 +33,34 @@ using dal::backend::context_cpu;
using descriptor_t = detail::descriptor_base<task::clustering>;

namespace daal_kmeans = daal::algorithms::kmeans;
namespace daal_kmeans_init = daal::algorithms::kmeans::init;
namespace interop = dal::backend::interop;

template <typename Float, daal::CpuType Cpu>
using daal_kmeans_lloyd_dense_kernel_t =
daal_kmeans::internal::KMeansBatchKernel<daal_kmeans::lloydDense, Float, Cpu>;
// Compile-time wrapper around a daal_kmeans::Method enumerator: an alias for
// std::integral_constant, so the wrapped enumerator is available as ::value.
template <daal_kmeans::Method Value>
using daal_method_constant = std::integral_constant<daal_kmeans::Method, Value>;

template <typename Float, daal::CpuType Cpu>
using daal_kmeans_init_plus_plus_dense_kernel_t =
daal_kmeans_init::internal::KMeansInitKernel<daal_kmeans_init::plusPlusDense, Float, Cpu>;
// Trait that maps a oneDAL k-means method tag to the matching daal_kmeans::Method enumerator
// (exposed as ::value). The primary template is only declared here, so a method tag without
// a specialization cannot be used.
template <typename Method>
struct to_daal_method;

template <typename Float>
// method::lloyd_dense -> daal_kmeans::lloydDense
template <>
struct to_daal_method<method::lloyd_dense> : daal_method_constant<daal_kmeans::lloydDense> {};

// method::lloyd_csr -> daal_kmeans::lloydCSR
template <>
struct to_daal_method<method::lloyd_csr> : daal_method_constant<daal_kmeans::lloydCSR> {};

// The DAAL KMeansBatchKernel instantiation for a given floating-point type, CPU type and
// oneDAL method tag; the tag is translated to the DAAL enumerator through to_daal_method.
template <typename Float, daal::CpuType Cpu, typename Method>
using batch_kernel_t =
daal_kmeans::internal::KMeansBatchKernel<to_daal_method<Method>::value, Float, Cpu>;

template <typename Float, typename Method, typename Table>
static daal::data_management::NumericTablePtr get_initial_centroids(
const context_cpu& ctx,
const descriptor_t& desc,
const table& data,
const Table& data,
const table& initial_centroids) {
const std::int64_t column_count = data.get_column_count();
const std::int64_t cluster_count = desc.get_cluster_count();

daal::data_management::NumericTablePtr daal_initial_centroids;
if (!initial_centroids.has_data()) {
const auto daal_data = interop::convert_to_daal_table<Float>(data);
daal_kmeans_init::Parameter par(dal::detail::integral_cast<std::size_t>(cluster_count));

const std::size_t init_len_input = 1;
daal::data_management::NumericTable* init_input[init_len_input] = { daal_data.get() };

daal_initial_centroids =
interop::allocate_daal_homogen_table<Float>(cluster_count, column_count);
const std::size_t init_len_output = 1;
daal::data_management::NumericTable* init_output[init_len_output] = {
daal_initial_centroids.get()
};

interop::status_to_exception(
interop::call_daal_kernel<Float, daal_kmeans_init_plus_plus_dense_kernel_t>(
ctx,
init_len_input,
init_input,
init_len_output,
init_output,
&par,
*(par.engine)));
oneapi::dal::kmeans::detail::daal_generate_centroids<Float, Method>(desc, data);
}
else {
daal_initial_centroids = interop::convert_to_daal_table<Float>(initial_centroids);
Expand All @@ -96,18 +81,19 @@ inline auto get_daal_parameter_to_train(const descriptor_t& desc) {
return par;
}

template <typename Float, typename Task>
template <typename Float, typename Task, typename Method, typename Table>
static train_result<Task> call_daal_kernel(const context_cpu& ctx,
const descriptor_t& desc,
const table& data,
const Table& data,
const table& initial_centroids) {
const std::int64_t row_count = data.get_row_count();
const std::int64_t column_count = data.get_column_count();
const std::int64_t cluster_count = desc.get_cluster_count();

auto par = get_daal_parameter_to_train(desc);

auto daal_initial_centroids = get_initial_centroids<Float>(ctx, desc, data, initial_centroids);
auto daal_initial_centroids =
get_initial_centroids<Float, Method>(ctx, desc, data, initial_centroids);

const auto daal_data = interop::convert_to_daal_table<Float>(data);
auto result = train_result<Task>{};
Expand All @@ -127,7 +113,6 @@ static train_result<Task> call_daal_kernel(const context_cpu& ctx,

array<int> arr_responses = array<int>::empty(row_count);
array<Float> arr_objective_function_value = array<Float>::empty(1);

const auto daal_responses = interop::convert_to_daal_homogen_table(arr_responses, row_count, 1);
const auto daal_objective_function_value =
interop::convert_to_daal_homogen_table(arr_objective_function_value, 1, 1);
Expand All @@ -136,11 +121,13 @@ static train_result<Task> call_daal_kernel(const context_cpu& ctx,
daal_responses.get(),
daal_objective_function_value.get(),
daal_iteration_count.get() };
interop::status_to_exception(
interop::call_daal_kernel<Float, daal_kmeans_lloyd_dense_kernel_t>(ctx,
input,
output,
&par));

interop::status_to_exception(dal::backend::dispatch_by_cpu(ctx, [&](auto cpu) {
return batch_kernel_t<Float,
oneapi::dal::backend::interop::to_daal_cpu_type<decltype(cpu)>::value,
Method>()
.compute(input, output, &par);
}));

result.set_objective_function_value(static_cast<double>(arr_objective_function_value[0]));

Expand All @@ -153,30 +140,31 @@ static train_result<Task> call_daal_kernel(const context_cpu& ctx,
model<Task>().set_centroids(dal::detail::homogen_table_builder{}
.reset(arr_centroids, cluster_count, column_count)
.build()));

return result;
}

// CPU training entry point: chooses the table type that corresponds to Method, converts the
// input data to that type, and forwards it together with the user-supplied initial centroids
// to call_daal_kernel.
template <typename Float, typename Task>
template <typename Float, typename Task, typename Method>
static train_result<Task> train(const context_cpu& ctx,
const descriptor_t& desc,
const train_input<Task>& input) {
return call_daal_kernel<Float, Task>(ctx,
desc,
input.get_data(),
input.get_initial_centroids());
// method::lloyd_csr operates on csr_table; every other method uses the generic table type.
using table_type =
std::conditional_t<std::is_same_v<Method, method::lloyd_csr>, csr_table, table>;
// NOTE(review): what this conversion does when Method is lloyd_csr but the input is not
// actually a CSR table is not visible here — confirm how that case is reported.
const auto data = static_cast<table_type>(input.get_data());
return call_daal_kernel<Float, Task, Method>(ctx, desc, data, input.get_initial_centroids());
}

// Partial specialization of train_kernel_cpu for task::clustering. Its call operator simply
// delegates to train() with the same context, descriptor and input.
template <typename Float>
struct train_kernel_cpu<Float, method::lloyd_dense, task::clustering> {
template <typename Float, typename Method>
struct train_kernel_cpu<Float, Method, task::clustering> {
train_result<task::clustering> operator()(const context_cpu& ctx,
const descriptor_t& desc,
const train_input<task::clustering>& input) const {
return train<Float, task::clustering>(ctx, desc, input);
return train<Float, task::clustering, Method>(ctx, desc, input);
}
};

template struct train_kernel_cpu<float, method::lloyd_dense, task::clustering>;
template struct train_kernel_cpu<double, method::lloyd_dense, task::clustering>;
template struct train_kernel_cpu<float, method::lloyd_csr, task::clustering>;
template struct train_kernel_cpu<double, method::lloyd_csr, task::clustering>;

} // namespace oneapi::dal::kmeans::backend
Loading

0 comments on commit 4ddbada

Please sign in to comment.