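This example illustrates the steps required to join two sets. Three computations are involved: 1) IntSimpleJoin, 2) StringSelectionOfStringIntPair, and 3) IntAggregation. The join takes three inputs: the int set, the StringIntPair set, and the output of StringSelectionOfStringIntPair applied to that same StringIntPair set. IntAggregation then aggregates the join results into SumResult objects. These computations must be compiled and built as shared libraries (see an example here) and then registered in PlinyCompute's catalog.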
Step 1: Create the database and the sets for storing data.
PDBClient pdbClient(8108, "localhost"); // create a database pdbClient.createDatabase("join_example_db"); // create the int set in that database pdbClient.createSet<int>("join_example_db", "join_example_set1"); // create the StringIntPair set in that database pdbClient.createSet<StringIntPair>("join_example_db", "join_example_set2"); // create a new set in that database to store output data pdbClient.createSet<SumResult>("join_example_db", "output_set1");
Step 2: Register the shared libraries that define the computations:
pdbClient.registerType("libraries/libIntSimpleJoin.so"); pdbClient.registerType("libraries/libStringSelectionOfStringIntPair.so"); pdbClient.registerType("libraries/libIntAggregation.so");
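Step 3: Generate and store data for the first set. Integers are allocated into the current allocation block until it runs out of space, which is signalled by a pdb::NotEnoughSpace exception. The filled vector is then sent to the set, and the buffered data is flushed.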
makeObjectAllocatorBlock(blockSize * 1024 * 1024, true); Handle<Vector<Handle<int>>> storeMe = makeObject<Vector<Handle<int>>>(); try { for (i = 0; true; i++) { Handle<int> myData = makeObject<int>(i); storeMe->push_back(myData); total++; } } catch (pdb::NotEnoughSpace& n) { // sends the created objects to storage pdbClient.sendData<int>( std::pair<std::string, std::string>("join_example_set1", "join_example_db"), storeMe); } // flush buffered objects pdbClient.flushData();
Step 4: Generate and store data for the second set in the same way, this time using StringIntPair objects.
makeObjectAllocatorBlock(blockSize * 1024 * 1024, true); Handle<Vector<Handle<StringIntPair>>> storeMe = makeObject<Vector<Handle<StringIntPair>>>(); try { for (i = 0; true; i++) { std::ostringstream oss; oss << "My string is " << i; oss.str(); Handle<StringIntPair> myData = makeObject<StringIntPair>(oss.str(), i); storeMe->push_back(myData); total++; } } catch (pdb::NotEnoughSpace& n) { std::cout << "got to " << i << " when producing data for input set 2.\n"; // sends the created objects to storage pdbClient.sendData<StringIntPair>( std::pair<std::string, std::string>("join_example_set2", "join_example_db"), storeMe); } // flush buffered objects pdbClient.flushData();
Step 5: Create the query computation objects, connect them into a pipeline, and execute it.
// this is the object allocation block where all of this stuff will reside const UseTemporaryAllocationBlock tempBlock{1024 * 1024 * 128}; // create all of the computation objects Handle<Computation> myScanSet1 = makeObject<ScanUserSet<int>>("join_example_db", "join_example_set1"); Handle<Computation> myScanSet2 = makeObject<ScanUserSet<StringIntPair>>("join_example_db", "join_example_set2"); Handle<Computation> mySelection = makeObject<StringSelectionOfStringIntPair>(); Handle<Computation> myJoin = makeObject<IntSimpleJoin>(); mySelection->setInput(myScanSet2); myJoin->setInput(0, myScanSet1); myJoin->setInput(1, myScanSet2); myJoin->setInput(2, mySelection); Handle<Computation> myAggregation = makeObject<IntAggregation>(); Handle<Computation> myWriter = makeObject<WriteUserSet<SumResult>>("join_example_db", "output_set1"); myAggregation->setInput(myJoin); myWriter->setInput(myAggregation); // execute the computations pdbClient.executeComputations(myWriter);
Step 6: Iterate over the output set and print the results.
std::cout << "to print result..." << std::endl;

// Iterate over every SumResult stored in the output set.
SetIterator<SumResult> outputRecords =
    pdbClient.getSetIterator<SumResult>("join_example_db", "output_set1");

std::cout << "Query results: ";
int numRecords = 0;
for (auto record : outputRecords) {
    ++numRecords;
    // Each entry is printed as "<ordinal>:<total>;".
    std::cout << numRecords << ":" << (*record).total << ";";
}
std::cout << "join output count:" << numRecords << "\n";