I have built a skyline query solver that, for now, can answer a query with two different algorithms: Block-Nested Loop (BNL) and Sort-Filter Skyline (SFS).
The two are very similar, so I will only explain what is needed to review the part I am interested in rather than bore you with the whole algorithm.
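For context, a point dominates another if it is at least as good in every dimension and strictly better in at least one, and the skyline is the set of points that no other point dominates. The sketch below assumes that smaller coordinates are better and uses a hypothetical `Pt` class as a stand-in for `Point2DAdvanced`, whose `dominates` method is not shown in the question. `NaiveSkyline.skyline` is only a brute-force reference for the result that BNL and SFS should produce.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Point2DAdvanced; assumes smaller coordinates are better.
final class Pt {
    final double x;
    final double y;

    Pt(double x, double y) {
        this.x = x;
        this.y = y;
    }

    // True if this point is no worse than q in both dimensions and strictly better in one.
    boolean dominates(Pt q) {
        return x <= q.x && y <= q.y && (x < q.x || y < q.y);
    }
}

final class NaiveSkyline {
    // Brute-force reference: keep every point that no other point dominates (O(n^2)).
    static List<Pt> skyline(List<Pt> points) {
        List<Pt> result = new ArrayList<>();
        for (Pt candidate : points) {
            boolean dominated = false;
            for (Pt other : points) {
                if (other.dominates(candidate)) {
                    dominated = true;
                    break;
                }
            }
            if (!dominated) {
                result.add(candidate);
            }
        }
        return result;
    }
}
```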
Both BNL and SFS have two phases, divide and merge. The divide phase is identical in both, and the merge phase is mostly the same, with two main differences:
- SFS sorts the key-value RDD input by value, while BNL does not.
- SFS never discards `Point2DAdvanced` elements from the `globalSkylines` list (this is unnecessary because the points are sorted), while BNL does.
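The second difference follows from the first. A minimal sketch, assuming points are sorted by the sum x + y (the question's `DominationComparator` is not shown, so this key is an assumption): if p dominates q, then p has a strictly smaller sum, so p is always seen first and a filter-only pass that never removes anything gives the right answer. On unsorted input the same pass can keep dominated points, which is why BNL needs the removal step. `Pt`, `FilterOnlyDemo`, `filterOnly`, and `sortedBySum` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical stand-in for Point2DAdvanced; assumes smaller coordinates are better.
final class Pt {
    final double x;
    final double y;

    Pt(double x, double y) {
        this.x = x;
        this.y = y;
    }

    boolean dominates(Pt q) {
        return x <= q.x && y <= q.y && (x < q.x || y < q.y);
    }
}

final class FilterOnlyDemo {
    // SFS-style step: add the candidate unless a kept point dominates it.
    // Nothing is ever removed, so this is only correct if dominators arrive first.
    static List<Pt> filterOnly(List<Pt> ordered) {
        List<Pt> kept = new ArrayList<>();
        for (Pt candidate : ordered) {
            boolean dominated = false;
            for (Pt s : kept) {
                if (s.dominates(candidate)) {
                    dominated = true;
                    break;
                }
            }
            if (!dominated) {
                kept.add(candidate);
            }
        }
        return kept;
    }

    // Assumed sort key: x + y. A dominating point has a strictly smaller sum,
    // so it always precedes every point it dominates.
    static List<Pt> sortedBySum(List<Pt> points) {
        List<Pt> sorted = new ArrayList<>(points);
        sorted.sort(Comparator.comparingDouble((Pt p) -> p.x + p.y));
        return sorted;
    }
}
```

For example, `filterOnly` on the unsorted input [(3, 3), (2, 2)] keeps both points, while running it on `sortedBySum` of the same input keeps only (2, 2).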
To separate out these differences I chose the Template Method pattern, with two hook methods:
- `sortRDD`: in SFS it sorts the key-value RDD by value and returns it; in BNL it returns the RDD unchanged.
- `globalAddDiscardOrDominate`: in SFS it only checks whether the candidate point is dominated by any point already in the list; if so, the candidate is discarded, otherwise it is added. In BNL it additionally checks whether the candidate dominates any points in the list and removes them.
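Stripped of Spark, the structure described by these two hooks looks roughly like the sketch below. It uses plain `List`s instead of RDDs, and all names (`Pt`, `SkylineTemplate`, `order`, `addDiscardOrDominate`, `BnlSketch`, `SfsSketch`) are hypothetical; the SFS ordering again assumes an x + y sort key. The `final` `skyline` method fixes the skeleton, and each subclass only fills in the two steps that differ.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;

final class Pt {
    final double x;
    final double y;

    Pt(double x, double y) {
        this.x = x;
        this.y = y;
    }

    boolean dominates(Pt q) {
        return x <= q.x && y <= q.y && (x < q.x || y < q.y);
    }
}

abstract class SkylineTemplate {
    // Template method: the loop is shared, the two varying steps are hooks.
    final List<Pt> skyline(List<Pt> points) {
        List<Pt> window = new ArrayList<>();
        for (Pt candidate : order(points)) {
            addDiscardOrDominate(window, candidate);
        }
        return window;
    }

    protected abstract List<Pt> order(List<Pt> points);

    protected abstract void addDiscardOrDominate(List<Pt> window, Pt candidate);
}

final class BnlSketch extends SkylineTemplate {
    @Override
    protected List<Pt> order(List<Pt> points) {
        return points; // BNL: input order is used as-is
    }

    @Override
    protected void addDiscardOrDominate(List<Pt> window, Pt candidate) {
        for (Iterator<Pt> it = window.iterator(); it.hasNext(); ) {
            Pt kept = it.next();
            if (kept.dominates(candidate)) {
                return; // candidate is discarded
            }
            if (candidate.dominates(kept)) {
                it.remove(); // candidate evicts a dominated point
            }
        }
        window.add(candidate);
    }
}

final class SfsSketch extends SkylineTemplate {
    @Override
    protected List<Pt> order(List<Pt> points) {
        List<Pt> sorted = new ArrayList<>(points);
        sorted.sort(Comparator.comparingDouble((Pt p) -> p.x + p.y)); // assumed sort key
        return sorted;
    }

    @Override
    protected void addDiscardOrDominate(List<Pt> window, Pt candidate) {
        for (Pt kept : window) {
            if (kept.dominates(candidate)) {
                return; // no removal needed: sorted input means dominators come first
            }
        }
        window.add(candidate);
    }
}
```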
My Template Method implementation is built around these two differences. Here are the classes themselves:
`BlockNestedLoopTemplate` class:
```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;

import scala.Tuple2;

// SkylineAlgorithm, SparkContextWrapper, TextFileToPointRDD, FlagPointPairProducer,
// MedianPointFinder, PointFlag and Point2DAdvanced are project classes (not shown).
public abstract class BlockNestedLoopTemplate implements SkylineAlgorithm, Serializable {

    private final transient TextFileToPointRDD txtToPoints;
    private FlagPointPairProducer flagPointPairProducer;

    public BlockNestedLoopTemplate(SparkContextWrapper sparkContext) {
        this.txtToPoints = new TextFileToPointRDD(sparkContext);
    }

    // Template method: the pipeline is fixed; subclasses only supply the two hooks below.
    @Override
    public final List<Point2DAdvanced> getSkylinePoints(String filePath) {
        JavaRDD<Point2DAdvanced> points = txtToPoints.getPointRDDFromTextFile(filePath, " ");
        flagPointPairProducer = createFlagPointPairProducer(points);
        JavaPairRDD<PointFlag, Iterable<Point2DAdvanced>> localSkylinePointsByFlag = divide(points);
        JavaRDD<Point2DAdvanced> skylinePoints = merge(localSkylinePointsByFlag);
        return skylinePoints.collect();
    }

    private FlagPointPairProducer createFlagPointPairProducer(JavaRDD<Point2DAdvanced> points) {
        Point2DAdvanced medianPoint = MedianPointFinder.findMedianPoint(points);
        return new FlagPointPairProducer(medianPoint);
    }

    // Divide: flag every point, group the points by flag and compute a local skyline per group.
    private JavaPairRDD<PointFlag, Iterable<Point2DAdvanced>> divide(JavaRDD<Point2DAdvanced> points) {
        JavaPairRDD<PointFlag, Point2DAdvanced> flagPointPairs =
                points.mapToPair(p -> flagPointPairProducer.getFlagPointPair(p));
        JavaPairRDD<PointFlag, Iterable<Point2DAdvanced>> pointsGroupedByFlag = flagPointPairs.groupByKey();
        JavaPairRDD<PointFlag, Iterable<Point2DAdvanced>> flagsWithLocalSkylines =
                pointsGroupedByFlag.mapToPair(fp -> new Tuple2<>(fp._1, getLocalSkylinesWithBNL(fp._2)));
        return flagsWithLocalSkylines;
    }

    private Iterable<Point2DAdvanced> getLocalSkylinesWithBNL(Iterable<Point2DAdvanced> pointIterable) {
        List<Point2DAdvanced> localSkylines = new ArrayList<>();
        for (Point2DAdvanced point : pointIterable) {
            localAddDiscardOrDominate(localSkylines, point);
        }
        return localSkylines;
    }

    private void localAddDiscardOrDominate(List<Point2DAdvanced> localSkylines, Point2DAdvanced candidateSkylinePoint) {
        for (Iterator<Point2DAdvanced> it = localSkylines.iterator(); it.hasNext();) {
            Point2DAdvanced pointToCheckAgainst = it.next();
            if (pointToCheckAgainst.dominates(candidateSkylinePoint)) {
                return;
            } else if (candidateSkylinePoint.dominates(pointToCheckAgainst)) {
                it.remove();
            }
        }
        localSkylines.add(candidateSkylinePoint);
    }

    // Merge: flatten the local skylines, apply the sortRDD hook, pull everything into a single
    // partition and compute the global skyline there.
    protected JavaRDD<Point2DAdvanced> merge(
            JavaPairRDD<PointFlag, Iterable<Point2DAdvanced>> localSkylinesGroupedByFlag) {
        JavaPairRDD<PointFlag, Point2DAdvanced> ungroupedLocalSkylines =
                localSkylinesGroupedByFlag.flatMapValues(point -> point);
        JavaPairRDD<PointFlag, Point2DAdvanced> sortedLocalSkylines = sortRDD(ungroupedLocalSkylines);
        JavaRDD<List<Tuple2<PointFlag, Point2DAdvanced>>> groupedByTheSameId = groupByTheSameId(sortedLocalSkylines);
        JavaRDD<Point2DAdvanced> globalSkylinePoints =
                groupedByTheSameId.flatMap(singleList -> getGlobalSkylineWithBNLAndPrecomparisson(singleList));
        return globalSkylinePoints;
    }

    // Hook 1: BNL returns the RDD unchanged, SFS sorts it by point.
    protected abstract JavaPairRDD<PointFlag, Point2DAdvanced> sortRDD(JavaPairRDD<PointFlag, Point2DAdvanced> flagPointPairs);

    private JavaRDD<List<Tuple2<PointFlag, Point2DAdvanced>>> groupByTheSameId(JavaPairRDD<PointFlag, Point2DAdvanced> ungroupedLocalSkylines) {
        JavaPairRDD<PointFlag, Point2DAdvanced> mergedInOnePartition = ungroupedLocalSkylines.coalesce(1);
        JavaRDD<List<Tuple2<PointFlag, Point2DAdvanced>>> groupedByTheSameId = mergedInOnePartition.glom();
        return groupedByTheSameId;
    }

    private List<Point2DAdvanced> getGlobalSkylineWithBNLAndPrecomparisson(List<Tuple2<PointFlag, Point2DAdvanced>> flagPointPairs) {
        List<Point2DAdvanced> globalSkylines = new ArrayList<>();
        for (Tuple2<PointFlag, Point2DAdvanced> flagPointPair : flagPointPairs) {
            PointFlag flag = flagPointPair._1;
            if (!passesPreComparisson(flag)) {
                continue;
            }
            Point2DAdvanced point = flagPointPair._2;
            globalAddDiscardOrDominate(globalSkylines, point);
        }
        return globalSkylines;
    }

    // Points flagged (1, 1) are skipped before any dominance check.
    private boolean passesPreComparisson(PointFlag flagToCheck) {
        PointFlag rejectedFlag = new PointFlag(1, 1);
        return !flagToCheck.equals(rejectedFlag);
    }

    // Hook 2: how a candidate is merged into the global skyline list.
    protected abstract void globalAddDiscardOrDominate(
            List<Point2DAdvanced> globalSkylines, Point2DAdvanced candidateGlobalSkylinePoint);
}
```
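The question does not show `FlagPointPairProducer` or `PointFlag`, so the following is only a guess at the idea behind `divide` and `passesPreComparisson`, written with plain arrays and hypothetical names (`MedianFlags`, `flagOf`, `passesPreComparison`, `hasLowerLeftPoint`). It assumes each point gets one bit per dimension, set when its coordinate is strictly greater than the median point's. Under that encoding, every point in cell (0, 0) beats every point in cell (1, 1) on both coordinates, so the (1, 1) cell can be skipped without any dominance tests, but only when cell (0, 0) actually contains a point.

```java
import java.util.List;

final class MedianFlags {
    // Hypothetical encoding: one bit per dimension, 1 when the coordinate is
    // strictly greater than the median point's coordinate.
    static int[] flagOf(double x, double y, double medianX, double medianY) {
        return new int[] {x > medianX ? 1 : 0, y > medianY ? 1 : 0};
    }

    // A point in cell (0,0) (x <= mx, y <= my) is strictly better on both coordinates
    // than any point in cell (1,1) (x > mx, y > my), so (1,1) can be dropped,
    // provided at least one point landed in (0,0).
    static boolean passesPreComparison(int[] flag, boolean lowerLeftCellNonEmpty) {
        boolean upperRight = flag[0] == 1 && flag[1] == 1;
        return !(upperRight && lowerLeftCellNonEmpty);
    }

    // Checks whether any point falls into cell (0,0).
    static boolean hasLowerLeftPoint(List<double[]> points, double medianX, double medianY) {
        for (double[] p : points) {
            int[] f = flagOf(p[0], p[1], medianX, medianY);
            if (f[0] == 0 && f[1] == 0) {
                return true;
            }
        }
        return false;
    }
}
```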
`BlockNestedLoop` (BNL) extension:
```java
import java.util.Iterator;
import java.util.List;

import org.apache.spark.api.java.JavaPairRDD;

public class BlockNestedLoop extends BlockNestedLoopTemplate {

    public BlockNestedLoop(SparkContextWrapper sparkContext) {
        super(sparkContext);
    }

    // BNL does not need sorted input, so the RDD is returned as-is.
    @Override
    protected JavaPairRDD<PointFlag, Point2DAdvanced> sortRDD(JavaPairRDD<PointFlag, Point2DAdvanced> flagPointPairs) {
        return flagPointPairs;
    }

    // Discard the candidate if it is dominated; otherwise remove the points it dominates and add it.
    @Override
    protected void globalAddDiscardOrDominate(List<Point2DAdvanced> globalSkylines, Point2DAdvanced candidateGlobalSkylinePoint) {
        for (Iterator<Point2DAdvanced> it = globalSkylines.iterator(); it.hasNext();) {
            Point2DAdvanced skyline = it.next();
            if (skyline.dominates(candidateGlobalSkylinePoint)) {
                return;
            } else if (candidateGlobalSkylinePoint.dominates(skyline)) {
                it.remove();
            }
        }
        globalSkylines.add(candidateGlobalSkylinePoint);
    }
}
```
`SortFilterSkyline` (SFS) extension:
```java
import java.util.List;

import org.apache.spark.api.java.JavaPairRDD;

public class SortFilterSkyline extends BlockNestedLoopTemplate {

    public SortFilterSkyline(SparkContextWrapper sparkContext) {
        super(sparkContext);
    }

    // sortByKey sorts by key, so the pairs are swapped to make the point the key,
    // sorted with DominationComparator, and swapped back.
    @Override
    protected JavaPairRDD<PointFlag, Point2DAdvanced> sortRDD(
            JavaPairRDD<PointFlag, Point2DAdvanced> flagPointPairs) {
        JavaPairRDD<Point2DAdvanced, PointFlag> swapped = flagPointPairs.mapToPair(fp -> fp.swap());
        JavaPairRDD<Point2DAdvanced, PointFlag> sorted = swapped.sortByKey(new DominationComparator());
        JavaPairRDD<PointFlag, Point2DAdvanced> unswapped = sorted.mapToPair(fp -> fp.swap());
        return unswapped;
    }

    // Input is sorted, so a candidate is only checked against the points already kept.
    @Override
    protected void globalAddDiscardOrDominate(
            List<Point2DAdvanced> globalSkylines, Point2DAdvanced candidateGlobalSkylinePoint) {
        if (!isDominatedBySkylines(globalSkylines, candidateGlobalSkylinePoint)) {
            globalSkylines.add(candidateGlobalSkylinePoint);
        }
    }

    private boolean isDominatedBySkylines(List<Point2DAdvanced> skylines, Point2DAdvanced candidateSkylinePoint) {
        for (Point2DAdvanced skyline : skylines) {
            if (skyline.dominates(candidateSkylinePoint)) {
                return true;
            }
        }
        return false;
    }
}
```
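`DominationComparator` is not shown either. One plausible shape, sketched here with a hypothetical `Pt` class and a hypothetical `SumComparator` name, orders points by x + y and breaks ties on x; assuming smaller is better, a point that dominates another has a strictly smaller sum, so it always sorts first. Because `sortByKey` ships the comparator to the executors, a comparator used there generally also needs to implement `Serializable`.

```java
import java.io.Serializable;
import java.util.Comparator;

// Hypothetical stand-in for Point2DAdvanced.
final class Pt {
    final double x;
    final double y;

    Pt(double x, double y) {
        this.x = x;
        this.y = y;
    }
}

// Hypothetical stand-in for DominationComparator: orders by x + y, then by x.
// Serializable so that Spark can send it to the executors.
final class SumComparator implements Comparator<Pt>, Serializable {
    private static final long serialVersionUID = 1L;

    @Override
    public int compare(Pt a, Pt b) {
        int bySum = Double.compare(a.x + a.y, b.x + b.y);
        return bySum != 0 ? bySum : Double.compare(a.x, b.x);
    }
}
```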
My main concern is whether there is a better way to reduce the code duplication. Secondarily, feel free to comment on anything else in the code (naming conventions, method ordering, or anything you think is worth changing).