A Gaussian mixture model (GMM) is a generative statistical model in which a dataset is assumed to have been produced by a set of Gaussians (multi-dimensional normal distributions). Fitting a GMM with the expectation-maximization (EM) algorithm is a classical ML workload. EM is particularly interesting for a distributed benchmark because, in theory, its running time should be dominated by linear algebra operations (such as repeated vector-matrix multiplications).
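
Concretely (a standard textbook formulation, included here for reference), a GMM with k components models the density of a data point x as a weighted sum of Gaussians:

  p(x) = \sum_{j=1}^{k} \pi_j \, \mathcal{N}(x \mid \mu_j, \Sigma_j),
  \qquad \pi_j \ge 0, \quad \sum_{j=1}^{k} \pi_j = 1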

Our PC implementation uses GSL along with a single AggregateComp object, which holds the current version of the learned GMM model. The complete listing of this application can be found in the GitHub repository under TestGMM. As this AggregateComp executes, a soft assignment of each data point to each Gaussian is computed, and updates to each of the Gaussians are accumulated. PC uses the standard “log space” trick to compute the soft assignments and avoid underflow (by comparison, Spark’s mllib uses thresholding).
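
To illustrate the log-space trick, here is a minimal, self-contained sketch of how a soft assignment can be computed with log-sum-exp. It is not PC's actual implementation; the function name and layout are hypothetical:

  #include <algorithm>
  #include <cmath>
  #include <vector>

  // Given log(weight_j * density_j(x)) for each component j, compute the
  // normalized responsibilities without exponentiating the raw (possibly
  // very negative) log values directly.
  std::vector<double> softAssign(const std::vector<double> &logWeighted) {
    // log-sum-exp: shift by the maximum so std::exp cannot underflow for
    // the dominant components.
    double maxLog = *std::max_element(logWeighted.begin(), logWeighted.end());
    double sum = 0.0;
    for (double v : logWeighted) {
      sum += std::exp(v - maxLog);
    }
    double logNorm = maxLog + std::log(sum);

    std::vector<double> gamma(logWeighted.size());
    for (size_t j = 0; j < logWeighted.size(); j++) {
      gamma[j] = std::exp(logWeighted[j] - logNorm);
    }
    return gamma;
  }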

Main algorithm

1. First, we create the sets for storing the data

  // creates a new set to store the initial model
  pdbClient.createSet<DoubleVector>("gmm_db", "gmm_initial_model_set");

  // creates a new set for storing output data
  pdbClient.createSet<GmmAggregateOutputLazy>(
          "gmm_db", "gmm_output_set");

2. Next, we register shared libraries in the catalog

  // register shared libraries with Computation Objects
  pdbClient.registerType("libraries/libGmmAggregateLazy.so");
  pdbClient.registerType("libraries/libGmmModel.so");
  pdbClient.registerType("libraries/libGmmAggregateOutputLazy.so");
  pdbClient.registerType("libraries/libGmmAggregateDatapoint.so");
  pdbClient.registerType("libraries/libGmmAggregateNewComp.so");
  pdbClient.registerType("libraries/libGmmSampleSelection.so");

3. Initialize each GMM component by sampling a few data points and computing their mean and variance

  int nSamples = 5 * k;

  // Randomly sample nSamples data points from the input data through
  // Bernoulli sampling; the fraction below is chosen so that the sampled
  // size is >= nSamples 99.99% of the time
  srand(time(NULL));
  const UseTemporaryAllocationBlock tempBlock{1024 * 1024 * 128};

  double fraction =
      Sampler::computeFractionForSampleSize(nSamples, numData, false);
  std::cout << "The sampling fraction is: " << fraction << std::endl;
  int initialCount = numData;

  std::vector<Handle<DoubleVector>> mySamples;
  while (mySamples.size() < (size_t)nSamples) {
    std::cout << "Sampling: the current sample size is insufficient."
              << std::endl;
    Handle<Computation> mySampleScanSet =
        makeObject<ScanUserSet<DoubleVector>>("gmm_db", "gmm_input_set");

    Handle<Computation> myDataSample = makeObject<GmmSampleSelection>(fraction);
    myDataSample->setInput(mySampleScanSet);
    Handle<Computation> myWriteSet = makeObject<WriteUserSet<DoubleVector>>(
        "gmm_db", "gmm_initial_model_set");
    myWriteSet->setInput(myDataSample);

    std::cout << "Let's execute the Sampling!" << std::endl;

    pdbClient.executeComputations(myWriteSet);

    std::cout << "Sampling done!: " << std::endl;
    SetIterator<DoubleVector> sampleResult =
        pdbClient.getSetIterator<DoubleVector>("gmm_db",
                                               "gmm_initial_model_set");

    for (Handle<DoubleVector> a : sampleResult) {
      std::cout << "Scanning result from sampling:" << std::endl;

      Handle<DoubleVector> myDoubles = makeObject<DoubleVector>(dim);

      double *rawData = a->getRawData();
      double *myRawData = myDoubles->getRawData();

      for (int i = 0; i < dim; i++) {
        myRawData[i] = rawData[i];
      }
      mySamples.push_back(myDoubles);
    }
    std::cout << "Now we have " << mySamples.size() << " samples" << std::endl;
    pdbClient.clearSet("gmm_db", "gmm_initial_model_set", "pdb::DoubleVector");
  }
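
For intuition, the fraction returned by Sampler::computeFractionForSampleSize must be inflated above nSamples / numData so that Bernoulli sampling rarely undershoots. Below is a minimal sketch of one such bound, using a Poisson-style tail inequality (the approach used by Spark's sampling utilities); it is an illustration, not necessarily what PC implements:

  #include <algorithm>
  #include <cmath>

  // Sketch: pick a Bernoulli keep-probability so that the number of kept
  // points is >= sampleSize with probability >= 1 - failureRate.
  double computeFractionSketch(int sampleSize, long total,
                               double failureRate = 1e-4) {
    double fraction = (double)sampleSize / (double)total;
    // Poisson-tail slack term; with failureRate = 1e-4 this corresponds to
    // the "99.99% of the time" guarantee quoted in the comment above.
    double gamma = -std::log(failureRate) / (double)total;
    return std::min(1.0, fraction + gamma +
                             std::sqrt(gamma * gamma + 2.0 * gamma * fraction));
  }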

4. Initialize the model:

  std::cout << "Creating model" << std::endl;
  Handle<GmmModel> model = makeObject<GmmModel>(k, dim);

  std::cout << "Updating means and covars" << std::endl;

  std::vector<std::vector<double>> means(k, std::vector<double>(dim, 0.0));
  std::vector<std::vector<double>> covars(k,
                                          std::vector<double>(dim * dim, 0.0));

  int nsamples = 5; // sampled points per component (nSamples = 5 * k)

  // Init mean
  for (int i = 0; i < k; i++) {
    for (int j = 0; j < nsamples; j++) {
      for (int l = 0; l < dim; l++) {
        means[i][l] += mySamples[i * nsamples + j]->getDouble(l);
      }
    }
    for (int j = 0; j < dim; j++) {
      means[i][j] /= nsamples;
    }
  }

  // Init covar (only the diagonal of each dim x dim covariance matrix is
  // initialized, i.e., the per-dimension sample variance)
  for (int i = 0; i < k; i++) {
    for (int j = 0; j < nsamples; j++) {
      for (int l = 0; l < dim; l++) {
        double t = (mySamples[i * nsamples + j]->getDouble(l) - means[i][l]);
        t *= t;
        covars[i][l * dim + l] += t;
      }
    }
    for (int j = 0; j < dim; j++) {
      covars[i][j * dim + j] /= nsamples;
    }
  }
  model->updateMeans(means);
  model->updateCovars(covars);

5. Run the main algorithm. Each iteration invokes GmmAggregateLazy, which contains the GMM learning algorithm (based on EM).
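
For reference, each EM iteration computes the standard updates below (a textbook formulation; in PC, the aggregation accumulates the corresponding partial sums across data points). The E-step computes the responsibilities and the M-step re-estimates the weights, means, and covars:

  \gamma_{ij} = \frac{\pi_j \, \mathcal{N}(x_i \mid \mu_j, \Sigma_j)}
                     {\sum_{l=1}^{k} \pi_l \, \mathcal{N}(x_i \mid \mu_l, \Sigma_l)},
  \qquad
  \pi_j = \frac{1}{n} \sum_{i=1}^{n} \gamma_{ij}, \quad
  \mu_j = \frac{\sum_{i} \gamma_{ij} \, x_i}{\sum_{i} \gamma_{ij}}, \quad
  \Sigma_j = \frac{\sum_{i} \gamma_{ij} (x_i - \mu_j)(x_i - \mu_j)^{\top}}{\sum_{i} \gamma_{ij}}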

  //*************************************************************************
  //**** MAIN ALGORITHM
  //*************************************************************************
  // - The aggregation performs the Expectation and Maximization steps
  // - The partial sums from the output are used to update the means,
  //   weights, and covars

  // Recording iteration times
  std::vector<float> iterTimes(iter);
  std::vector<float> iterProcessingOnlyTimes(iter);

  bool converged = false;
  // initialize so the first iteration does not read uninitialized values
  double previousLogLikelihood = 0.0;
  double currentLogLikelihood = 0.0;

  for (int currentIter = 0; currentIter < iter; currentIter++) {

    std::cout << "ITERATION " << (currentIter + 1) << std::endl;

    const UseTemporaryAllocationBlock tempBlock{256 * 1024 * 1024};

    Handle<GmmModel> currentModel = model;

    Handle<Computation> scanInputSet =
        makeObject<ScanUserSet<DoubleVector>>("gmm_db", "gmm_input_set");
    Handle<Computation> gmmIteration =
        makeObject<GmmAggregateLazy>(currentModel);
    gmmIteration->setInput(scanInputSet);
    gmmIteration->setOutput("gmm_db", "gmm_output_set");

    std::cout << "Ready to start computations" << std::endl;

    auto begin = std::chrono::high_resolution_clock::now();

    pdbClient.executeComputations(gmmIteration);

    auto end = std::chrono::high_resolution_clock::now();
    std::cout << "Query finished!	" << std::endl;

    // Read output and update Means, Weights and Covars in Model
    SetIterator<GmmAggregateOutputLazy> result =
        pdbClient.getSetIterator<GmmAggregateOutputLazy>("gmm_db",
                                                         "gmm_output_set");
    for (Handle<GmmAggregateOutputLazy> a : result) {
      previousLogLikelihood = currentLogLikelihood;
      currentLogLikelihood = currentModel->updateModel(a->getNewComp());
    }

    model = currentModel;