This example illustrates the steps required to join two sets. Three computations are involved: 1) IntSimpleJoin, 2) StringSelectionOfStringIntPair, and 3) IntAggregation. These computations have to be compiled and built as shared libraries (see an example here) and then registered in PlinyCompute’s catalog.
Step 1: Create the database and the sets for storing data.
// Client object used by every later step; constructed with port 8108 and host "localhost".
// NOTE(review): presumably this is the address of the PlinyCompute manager node — adjust for your deployment.
PDBClient pdbClient(8108, "localhost");
// create the single database ("join_example_db") that holds all three sets of this example
pdbClient.createDatabase("join_example_db");
// create the int set in that database (filled in Step 3, scanned in Step 5)
pdbClient.createSet<int>("join_example_db", "join_example_set1");
// create the StringIntPair set in that database (filled in Step 4, scanned in Step 5)
pdbClient.createSet<StringIntPair>("join_example_db", "join_example_set2");
// create a new set in that database to store output data (SumResult objects written in Step 5)
pdbClient.createSet<SumResult>("join_example_db", "output_set1");
Step 2: Register the computations defined in shared libraries:
// Register one shared library per computation used in Step 5.
// The paths are given relative ("libraries/..."); NOTE(review): presumably resolved against the
// directory the client is started from — confirm for your build layout.
pdbClient.registerType("libraries/libIntSimpleJoin.so");
pdbClient.registerType("libraries/libStringSelectionOfStringIntPair.so");
pdbClient.registerType("libraries/libIntAggregation.so");
Step 3: Generate and store data for the first set.
// Start a new allocation block of blockSize * 1024 * 1024
// (presumably bytes, i.e. blockSize MB — confirm against makeObjectAllocatorBlock).
makeObjectAllocatorBlock(blockSize * 1024 * 1024, true);
Handle<Vector<Handle<int>>> storeMe = makeObject<Vector<Handle<int>>>();
try {
    // The loop has no exit condition of its own: it keeps appending consecutive
    // integers until pdb::NotEnoughSpace is thrown.
    i = 0;
    while (true) {
        Handle<int> element = makeObject<int>(i);
        storeMe->push_back(element);
        total++;
        i++;
    }
} catch (const pdb::NotEnoughSpace&) {
    // sends the created objects to storage; the pair is ordered (set name, database name)
    const std::pair<std::string, std::string> setAndDatabase("join_example_set1",
                                                             "join_example_db");
    pdbClient.sendData<int>(setAndDatabase, storeMe);
}
// flush buffered objects
pdbClient.flushData();
Step 4: Generate and store data for the second set.
// Start a new allocation block of blockSize * 1024 * 1024
// (presumably bytes, i.e. blockSize MB — confirm against makeObjectAllocatorBlock).
makeObjectAllocatorBlock(blockSize * 1024 * 1024, true);
Handle<Vector<Handle<StringIntPair>>> storeMe =
    makeObject<Vector<Handle<StringIntPair>>>();
try {
    // The loop has no exit condition of its own: it keeps appending pairs
    // ("My string is <i>", i) until pdb::NotEnoughSpace is thrown.
    for (i = 0; true; i++) {
        std::ostringstream oss;
        oss << "My string is " << i;
        // oss.str() is evaluated exactly once, where its value is actually used.
        Handle<StringIntPair> myData = makeObject<StringIntPair>(oss.str(), i);
        storeMe->push_back(myData);
        total++;
    }
} catch (const pdb::NotEnoughSpace&) {
    std::cout << "got to " << i << " when producing data for input set 2.\n";
    // sends the created objects to storage; the pair is ordered (set name, database name)
    pdbClient.sendData<StringIntPair>(
        std::pair<std::string, std::string>("join_example_set2", "join_example_db"), storeMe);
}
// flush buffered objects
pdbClient.flushData();
Step 5: Create query computation objects and execute them.
// this is the object allocation block where all of the computation objects below will reside
// (size argument: 1024 * 1024 * 128; presumably bytes, i.e. 128 MB — confirm)
const UseTemporaryAllocationBlock tempBlock{1024 * 1024 * 128};
// create all of the computation objects
// scan of the int set created in Step 1
Handle<Computation> myScanSet1 =
makeObject<ScanUserSet<int>>("join_example_db", "join_example_set1");
// scan of the StringIntPair set created in Step 1
Handle<Computation> myScanSet2 =
makeObject<ScanUserSet<StringIntPair>>("join_example_db", "join_example_set2");
// selection whose only input is the StringIntPair scan
Handle<Computation> mySelection =
makeObject<StringSelectionOfStringIntPair>();
Handle<Computation> myJoin = makeObject<IntSimpleJoin>();
mySelection->setInput(myScanSet2);
// the join is wired with three inputs: 0 = int scan, 1 = StringIntPair scan, 2 = selection output
// NOTE(review): what each input slot means is defined by IntSimpleJoin, which is not shown here
myJoin->setInput(0, myScanSet1);
myJoin->setInput(1, myScanSet2);
myJoin->setInput(2, mySelection);
Handle<Computation> myAggregation =
makeObject<IntAggregation>();
// writer targeting the SumResult output set created in Step 1
Handle<Computation> myWriter =
makeObject<WriteUserSet<SumResult>>("join_example_db", "output_set1");
// chain: join -> aggregation -> writer
myAggregation->setInput(myJoin);
myWriter->setInput(myAggregation);
// execute the computations; only the writer is passed in
// NOTE(review): presumably the upstream computations are reached through the setInput links — confirm
pdbClient.executeComputations(myWriter);
Step 6: Print results.
std::cout << "to print result..." << std::endl;
// Iterator over the SumResult objects in the output set written in Step 5.
SetIterator<SumResult> result =
    pdbClient.getSetIterator<SumResult>("join_example_db", "output_set1");
int count = 0;
std::cout << "Query results: ";
// Each entry is printed as "<running index>:<total>;" on a single line.
for (auto entry : result) {
    ++count;
    std::cout << count << ":" << (*entry).total << ";";
}
// The count summary follows the last entry directly, on the same output line.
std::cout << "join output count:" << count << "\n";