I am working on the following small software stack:

The Wikipath stack structure

The objective of the entire stack is to search for shortest paths in the Wikipedia article graph as fast as possible. As of now, I rely on two speed-up techniques:

  1. Bidirectional search: the time complexity of breadth-first search (BFS) is \$\mathcal{O}(\sum_{i = 0}^N d^i)\$, where \$d\$ is the average out-degree of the nodes and \$N\$ is the length of the shortest path, measured in edges (arcs, when the graph is directed). A bidirectional breadth-first search, however, runs in \$\mathcal{O}(2 \sum_{i = 0}^{N / 2} d^i)\$, which is a large improvement over the unidirectional BFS because each of the two frontiers only has to reach half the depth.

  2. Parallel search: since generating the neighbours of a node (i.e., generating its neighbour articles) is a time-consuming operation (load JSON from the server, parse it, generate the nodes), it is reasonable to use two thread pools; each pool works on one of the two search directions. Since most of the time is spent waiting for Wikipedia to answer, thread contention is not a big deal in this setting.
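To get a feel for the two bounds in item 1, the following minimal sketch evaluates both sums for a given \$d\$ and \$N\$. The class name `BfsWorkEstimate`, its methods, and the sample values \$d = 10\$, \$N = 4\$ are illustrative assumptions and not part of the library; \$N / 2\$ is taken as integer division.

```java
public class BfsWorkEstimate {

    /** Returns the sum of d^i for i = 0..depth. */
    public static long levelSum(final long d, final int depth) {
        long sum = 0L;
        long power = 1L; // d^0

        for (int i = 0; i <= depth; ++i) {
            sum += power;
            power *= d;
        }

        return sum;
    }

    /** Node count bound for a unidirectional BFS reaching depth n. */
    public static long unidirectional(final long d, final int n) {
        return levelSum(d, n);
    }

    /** Node count bound for two frontiers, each reaching depth n / 2. */
    public static long bidirectional(final long d, final int n) {
        return 2L * levelSum(d, n / 2);
    }

    public static void main(final String[] args) {
        final long d = 10L; // assumed average out-degree
        final int n = 4;    // assumed shortest path length in arcs

        System.out.println(unidirectional(d, n)); // prints 11111
        System.out.println(bidirectional(d, n));  // prints 222
    }
}
```

With these assumed values the bidirectional bound is about fifty times smaller, and the gap widens quickly as \$d\$ or \$N\$ grows.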

Part I/IV - The search algorithm

Below is all the code that implements the DelayedGraphSearchLibrary. The library is tailor-made for graphs whose neighbour generation is slow (for example, on the order of 10 milliseconds or more).

AbstractDelayedGraphPathFinder.java:

package net.coderodde.graph.pathfinding.uniform.delayed;

import java.util.List;

/**
 * This abstract class defines the API for algorithms searching for shortest, 
 * <b>unweighted</b> paths in a graph with slow ("<i>delayed</i>") node 
 * expansion operations.
 * 
 * @author Rodion "rodde" Efremov
 * @version 1.6 (Aug 4, 2016)
 * @param <N> the actual node type.
 */
public abstract class AbstractDelayedGraphPathFinder<N> {

    /**
     * Stores the duration of the previous graph search in milliseconds.
     */
    protected long duration;

    /**
     * Stores the number of expanded nodes in the previous graph search.
     */
    protected int numberOfExpandedNodes;

    /**
     * Searches for a shortest unweighted path from {@code source} to 
     * {@code target}. If a path is found, returns the list of nodes that are 
     * ordered in the list in the same manner as they appear on a shortest path.
     * <p>
     * 
     * If {@code target} is not reachable from {@code source}, an empty list is
     * returned.
     * <p>
     * 
     * In case the graph to search is undirected (edges have no direction), the
     * client programmer may pass the same {@link AbstractNodeExpander} for both
     * {@code forwardSearchNodeExpander} and {@code backwardSearchNodeExpander}.
     * <p>
     * 
     * As for progress logging, {@code forwardSearchProgressLogger} will
     * log the progress for forward search direction only, 
     * {@code backwardSearchProgressLogger} will log the progress for backward
     * direction only, and {@code sharedSearchProgressLogger} will log 
     * everything related to the entire search such as beginning of the search,
     * and the result of it.
     * <p>
     * 
     * Any progress logger may be set to {@code null} so that the respective 
     * parts of progress will not be logged.
     * <p>
     * 
     * The following table summarizes
     * 
     * <p>
     * 
     * Since bidirectional search outperforms the unidirectional search, this
     * abstract class assumes that all implementing classes implement
     * bidirectional search, which is reflected in the API of this very class. 
     * In a bidirectional search, we run two search frontiers simultaneously: 
     * one in the normal fashion from the source node, and the other one in the
     * "opposite" direction starting from the target node. Note that if the 
     * graph is directed, the forward search traverses each edge from tail node
     * to head node, and the backward search traverses each edge from head node
     * to tail node.
     * 
     * @param source                       the source node.
     * @param target                       the target node.
     * @param forwardSearchNodeExpander    the expander generating all the 
     *                                     child nodes.
     * @param backwardSearchNodeExpander   the expander generating all the 
     *                                     parent nodes.                           
     * @param forwardSearchProgressLogger  the forward search related logger.
     * @param backwardSearchProgressLogger the backward search related logger.
     * @param sharedSearchProgressLogger   the shared logger.
     * @return the shortest path as a list of nodes, or an empty list if the 
     *         target is not reachable from the source.
     */
    public abstract List<N> 
        search(final N source,
               final N target,
               final AbstractNodeExpander<N> forwardSearchNodeExpander,
               final AbstractNodeExpander<N> backwardSearchNodeExpander,
               final ProgressLogger<N> forwardSearchProgressLogger,
               final ProgressLogger<N> backwardSearchProgressLogger,
               final ProgressLogger<N> sharedSearchProgressLogger);

    /**
     * Searches for a shortest path in an <b>undirected</b> graph.
     * 
     * @param source                       the source node.
     * @param target                       the target node.
     * @param nodeExpander                 the expander generating all neighbor
     *                                     nodes of a given node.
     * @param forwardSearchProgressLogger  the forward search related logger.
     * @param backwardSearchProgressLogger the backward search related logger.
     * @param sharedSearchProgressLogger   the shared logger.
     * @return the shortest path as a list of nodes, or an empty list if the
     *         target is not reachable from the source.
     */
    public List<N>
        search(final N source,
               final N target,
               final AbstractNodeExpander<N> nodeExpander,
               final ProgressLogger<N> forwardSearchProgressLogger,
               final ProgressLogger<N> backwardSearchProgressLogger,
               final ProgressLogger<N> sharedSearchProgressLogger) {
        return search(source,
                      target,
                      nodeExpander,
                      nodeExpander,
                      forwardSearchProgressLogger,
                      backwardSearchProgressLogger,
                      sharedSearchProgressLogger);
    }

    /**
     * Returns the number of milliseconds the previous search took to complete.
     * 
     * @return duration in milliseconds.
     */
    public long getDuration() {
        return duration;
    }

    /**
     * Returns the number of expanded nodes in the previous search.
     * 
     * @return the number of expanded nodes.
     */
    public int getNumberOfExpandedNodes() {
        return numberOfExpandedNodes;
    }
}

AbstractNodeExpander.java:

package net.coderodde.graph.pathfinding.uniform.delayed;

import java.util.List;

/**
 * This abstract class defines the API for the subclasses that generate all the
 * neighbors of a given node ("<i>expand a node</i>").
 * 
 * @author Rodion "rodde" Efremov
 * @version 1.6 (Aug 4, 2016)
 * @param <N> the actual node type.
 */
public abstract class AbstractNodeExpander<N> {

    /**
     * Expands the argument node, i.e., generates all the neighbors of the node
     * {@code node}.
     * 
     * @param node the node whose neighbors to generate.
     * @return the list of neighbor nodes or {@code null} if the node 
     *         {@code node} is invalid.
     */
    public abstract List<N> expand(final N node);

    /**
     * Checks that the input node {@code node} is a valid node in the graph.
     * 
     * @param node the node to check.
     * @return {@code true} only if {@code node} is a valid node.
     */
    public abstract boolean isValidNode(final N node);
}
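As an illustration of how the two expanders relate in a directed graph, here is a minimal sketch of an in-memory expander. Everything in it is hypothetical: `NodeExpander` is a stand-in that mirrors the two abstract methods of `AbstractNodeExpander` so the sketch compiles on its own, and `MapExpander`, `reverse` and `demoGraph` are names invented for the example. The backward expander is simply the same class built over the reversed adjacency map.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class InMemoryExpanderSketch {

    /** Stand-in mirroring the API of AbstractNodeExpander (assumption). */
    public abstract static class NodeExpander<N> {
        public abstract List<N> expand(final N node);
        public abstract boolean isValidNode(final N node);
    }

    /** Hypothetical expander backed by an adjacency map. */
    public static final class MapExpander extends NodeExpander<String> {
        private final Map<String, List<String>> adjacency;

        public MapExpander(final Map<String, List<String>> adjacency) {
            this.adjacency = adjacency;
        }

        @Override
        public List<String> expand(final String node) {
            // Yields null for an unknown node, as the expand contract states.
            return adjacency.get(node);
        }

        @Override
        public boolean isValidNode(final String node) {
            return adjacency.containsKey(node);
        }
    }

    /** Reverses every arc (tail, head) into (head, tail). */
    public static Map<String, List<String>> reverse(
            final Map<String, List<String>> forward) {
        final Map<String, List<String>> backward = new LinkedHashMap<>();

        // Keep every node valid even if it has no incoming arcs:
        for (final String node : forward.keySet()) {
            backward.put(node, new ArrayList<>());
        }

        for (final Map.Entry<String, List<String>> e : forward.entrySet()) {
            for (final String head : e.getValue()) {
                backward.computeIfAbsent(head, k -> new ArrayList<>())
                        .add(e.getKey());
            }
        }

        return backward;
    }

    /** A tiny directed graph: A -> B, A -> C, B -> C. */
    public static Map<String, List<String>> demoGraph() {
        final Map<String, List<String>> graph = new LinkedHashMap<>();
        graph.put("A", Arrays.asList("B", "C"));
        graph.put("B", Arrays.asList("C"));
        graph.put("C", Collections.<String>emptyList());
        return graph;
    }

    public static void main(final String[] args) {
        final MapExpander forward  = new MapExpander(demoGraph());
        final MapExpander backward = new MapExpander(reverse(demoGraph()));

        System.out.println(forward.expand("A"));  // prints [B, C]
        System.out.println(backward.expand("C")); // prints [A, B]
    }
}
```

For an undirected graph the reversal step is unnecessary, which is why a single expander can serve both directions there.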

ProgressLogger.java:

package net.coderodde.graph.pathfinding.uniform.delayed;

import java.util.List;

/**
 * This class provides the API for anyone interested in the search progress
 * of the shortest path finders. All callbacks are no-ops by default.
 * 
 * @author Rodion "rodde" Efremov
 * @version 1.6
 * @param <N> the actual node type.
 */
public class ProgressLogger<N> {

    /**
     * This method should be called whenever the search is initiated.
     * @param source the source node.
     * @param target the target node.
     */
    public void onBeginSearch(final N source, final N target) {}

    /**
     * This method should be called whenever the search expands {@code node}.
     * 
     * @param node the expanded node.
     */
    public void onExpansion(final N node) {}

    /**
     * This method should be called whenever the search is generating a neighbor
     * node of the node being expanded.
     * 
     * @param node the generated neighbor node.
     */
    public void onNeighborGeneration(final N node) {}

    /**
     * This method should be called whenever the search has found a shortest 
     * path.
     * 
     * @param path the shortest path found. 
     */
    public void onShortestPath(final List<N> path) {}

    /**
     * This method should be called whenever the target node is not reachable 
     * from the source node and the search process must stop without finding a 
     * path.
     * 
     * @param source the requested source node.
     * @param target the requested target node.
     */
    public void onTargetUnreachable(final N source, final N target) {}
}
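Since one logger instance is handed to every thread of a search direction, a subclass that keeps state should be safe to call concurrently. The following is a minimal sketch of such a subclass under that assumption; `ProgressLogger` here is a stand-in that mirrors only the `onExpansion` callback so the example compiles on its own, and `CountingLogger` and `countExpansions` are hypothetical names.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class CountingLoggerSketch {

    /** Stand-in for ProgressLogger; only one callback is mirrored. */
    public static class ProgressLogger<N> {
        public void onExpansion(final N node) {}
    }

    /** Hypothetical logger counting expansions in a thread-safe way. */
    public static final class CountingLogger<N> extends ProgressLogger<N> {
        private final AtomicInteger expansions = new AtomicInteger();

        @Override
        public void onExpansion(final N node) {
            expansions.incrementAndGet();
        }

        public int getExpansionCount() {
            return expansions.get();
        }
    }

    /** Calls the logger from several threads and returns the final count. */
    public static int countExpansions(final int threadCount,
                                      final int callsPerThread) {
        final CountingLogger<String> logger = new CountingLogger<>();
        final Thread[] threads = new Thread[threadCount];

        for (int t = 0; t < threadCount; ++t) {
            threads[t] = new Thread(() -> {
                for (int i = 0; i < callsPerThread; ++i) {
                    logger.onExpansion("node");
                }
            });

            threads[t].start();
        }

        try {
            for (final Thread thread : threads) {
                thread.join();
            }
        } catch (final InterruptedException ex) {
            // Restore the interrupt flag before giving up:
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting.", ex);
        }

        return logger.getExpansionCount();
    }

    public static void main(final String[] args) {
        System.out.println(countExpansions(4, 1000)); // prints 4000
    }
}
```

A plain `int` counter would lose updates under concurrent calls, so the sketch uses `AtomicInteger` instead.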

ThreadPoolBidirectionalPathFinder.java:

package net.coderodde.graph.pathfinding.uniform.delayed.support;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import net.coderodde.graph.pathfinding.uniform.delayed.
       AbstractDelayedGraphPathFinder;
import net.coderodde.graph.pathfinding.uniform.delayed.AbstractNodeExpander;
import net.coderodde.graph.pathfinding.uniform.delayed.ProgressLogger;

/**
 * This class implements a parallel, bidirectional breadth-first search in order
 * to find an unweighted shortest path from a given source node to a given 
 * target node. The underlying algorithm is the bidirectional breadth-first
 * search. However, multiple threads may work on a single search direction in
 * order to speed up the computation: for each search direction (forward and 
 * backward), the algorithm maintains concurrent state, such as the frontier 
 * queue; many threads may pop the queue, expand the node and append the 
 * neighbors to that queue.
 * 
 * @author Rodion "rodde" Efremov
 * @version 1.6 (Aug 4, 2016)
 * @param <N> the actual graph node type.
 */
public class ThreadPoolBidirectionalPathFinder<N> 
extends AbstractDelayedGraphPathFinder<N> {

    /**
     * The default number of milliseconds a master thread sleeps when it finds
     * the frontier queue empty.
     */
    private static final int DEFAULT_MASTER_THREAD_SLEEP_DURATION = 10;

    /**
     * The default number of milliseconds a slave thread sleeps when it finds
     * the frontier queue empty.
     */
    private static final int DEFAULT_SLAVE_THREAD_SLEEP_DURATION = 10;

    /**
     * The default upper bound on the number of times a master thread hibernates
     * due to the frontier queue being empty before the entire search is 
     * terminated.
     */
    private static final int DEFAULT_NUMBER_OF_TRIALS = 50;

    /**
     * The minimum number of threads to allow. One thread per each of the two
     * search directions.
     */
    private static final int MINIMUM_NUMBER_OF_THREADS = 2;

    /**
     * The minimum number of milliseconds a <b>master thread</b> sleeps when it 
     * finds the frontier queue empty.
     */
    private static final int MINIMUM_MASTER_THREAD_SLEEP_DURATION = 3;

    /**
     * The minimum number of milliseconds a <b>slave thread</b> sleeps when it 
     * finds the frontier queue empty.
     */
    private static final int MINIMUM_SLAVE_THREAD_SLEEP_DURATION = 3;

    /**
     * The lower bound on the amount of trials.
     */
    private static final int MINIMUM_NUMBER_OF_TRIALS = 1;

    /**
     * Caches the requested number of threads to use in each search process.
     */
    private final int numberOfThreads;

    /**
     * The duration of sleeping in milliseconds for the master threads.
     */
    private final int masterThreadSleepDuration;

    /**
     * The duration of sleeping in milliseconds for the slave threads.
     */
    private final int slaveThreadSleepDuration;

    /**
     * While a master thread waits for the frontier queue to become non-empty, the
     * master thread makes at most {@code masterThreadTrials} sleeping sessions
     * before giving up and terminating the search.
     */
    private final int masterThreadTrials;

    /**
     * Constructs this path finder.
     * 
     * @param requestedThreadCount      the total number of threads, divided
     *                                  between the two search directions.
     * @param masterThreadSleepDuration the number of milliseconds a master 
     *                                  thread sleeps whenever it discovers the
     *                                  frontier queue being empty.
     * @param slaveThreadSleepDuration  the number of milliseconds a slave
     *                                  thread sleeps whenever it discovers the
     *                                  frontier queue being empty.
     * @param masterThreadTrials        the number of times the master thread
     *                                  hibernates itself before terminating the
     *                                  entire search.
     */
    public ThreadPoolBidirectionalPathFinder(
            final int requestedThreadCount,
            final int masterThreadSleepDuration,
            final int slaveThreadSleepDuration,
            final int masterThreadTrials) {
        this.numberOfThreads = Math.max(requestedThreadCount, 
                                        MINIMUM_NUMBER_OF_THREADS);

        this.masterThreadSleepDuration = 
                Math.max(masterThreadSleepDuration,
                         MINIMUM_MASTER_THREAD_SLEEP_DURATION);

        this.slaveThreadSleepDuration = 
                Math.max(slaveThreadSleepDuration,
                         MINIMUM_SLAVE_THREAD_SLEEP_DURATION);

        this.masterThreadTrials = 
                Math.max(masterThreadTrials,
                         MINIMUM_NUMBER_OF_TRIALS);
    }

    /**
     * Constructs this path finder using the default sleep durations and the
     * default number of trials.
     * 
     * @param requestedThreadCount the total number of threads, divided between
     *                             the two search directions.
     */
    public ThreadPoolBidirectionalPathFinder(
            final int requestedThreadCount) {
        this(requestedThreadCount, 
             DEFAULT_MASTER_THREAD_SLEEP_DURATION,
             DEFAULT_SLAVE_THREAD_SLEEP_DURATION,
             DEFAULT_NUMBER_OF_TRIALS);
    }

    /**
     * {@inheritDoc }
     */
    @Override
    public List<N> 
        search(final N source, 
               final N target, 
               final AbstractNodeExpander<N> forwardSearchNodeExpander, 
               final AbstractNodeExpander<N> backwardSearchNodeExpander, 
               final ProgressLogger<N> forwardSearchProgressLogger, 
               final ProgressLogger<N> backwardSearchProgressLogger, 
               final ProgressLogger<N> sharedSearchProgressLogger) {
        Objects.requireNonNull(forwardSearchNodeExpander, 
                               "The forward search node expander is null.");

        Objects.requireNonNull(backwardSearchNodeExpander,
                               "The backward search node expander is null.");

        // Check the validity of the source node:
        if (!forwardSearchNodeExpander.isValidNode(source)) {
            throw new IllegalArgumentException(
                    "The source node (" + source + ") was rejected by the " +
                    "forward search node expander.");
        }

        // Check the validity of the target node:
        if (!backwardSearchNodeExpander.isValidNode(target)) {
            throw new IllegalArgumentException(
                    "The target node (" + target + ") was rejected by the " +
                    "backward search node expander.");
        }

        // Possibly log the beginning of the search:
        if (sharedSearchProgressLogger != null) {
            sharedSearchProgressLogger.onBeginSearch(source, target);
        }

        // This path finder collects some performance related statistics:
        this.duration = System.currentTimeMillis();

        // Compute the number of threads for each of the search directions:
        final int forwardSearchThreadCount  = numberOfThreads / 2;
        final int backwardSearchThreadCount = numberOfThreads - 
                                              forwardSearchThreadCount;

        // Create the state object shared by all the threads working on forward
        // search direction:
        final SearchState<N> forwardSearchState  = 
                new SearchState<>(source, forwardSearchThreadCount);

        // Create the state object shared by all the threads working on backward
        // search direction:
        final SearchState<N> backwardSearchState = 
                new SearchState<>(target, backwardSearchThreadCount);

        // Create the state object shared by both search directions:
        final SharedSearchState<N> sharedSearchState = 
                new SharedSearchState<>(source, 
                                        target, 
                                        forwardSearchState,
                                        backwardSearchState,
                                        sharedSearchProgressLogger);

        // The array holding all forward search threads. Its size must match the
        // thread count given to 'forwardSearchState' above:
        final ForwardSearchThread[] forwardSearchThreads =
                new ForwardSearchThread[forwardSearchThreadCount];

        // Below, the master thread is given 'masterThreadSleepDuration' as its
        // sleep duration; the slave threads get 'slaveThreadSleepDuration'.
        forwardSearchThreads[0] = 
                new ForwardSearchThread(0, 
                                        forwardSearchNodeExpander,
                                        forwardSearchState,
                                        sharedSearchState,
                                        true,
                                        forwardSearchProgressLogger,
                                        masterThreadSleepDuration,
                                        masterThreadTrials);

        // Spawn the forward search master thread:
        forwardSearchState.introduceThread(forwardSearchThreads[0]);
        forwardSearchThreads[0].start();

        // Create and spawn all the slave threads working on the forward search 
        // direction:
        for (int i = 1; i < forwardSearchThreadCount; ++i) {
            forwardSearchThreads[i] = 
                    new ForwardSearchThread(i,
                                            forwardSearchNodeExpander,
                                            forwardSearchState,
                                            sharedSearchState,
                                            false,
                                            forwardSearchProgressLogger,
                                            slaveThreadSleepDuration,
                                            masterThreadTrials);

            forwardSearchState.introduceThread(forwardSearchThreads[i]);
            forwardSearchThreads[i].start();
        }

        // The array holding all backward search threads. Its size must match
        // the thread count given to 'backwardSearchState' above:
        final BackwardSearchThread[] backwardSearchThreads =
                new BackwardSearchThread[backwardSearchThreadCount];

        // Below, the master thread is given 'masterThreadSleepDuration' as its
        // sleep duration; the slave threads get 'slaveThreadSleepDuration'.
        backwardSearchThreads[0] = 
                new BackwardSearchThread(forwardSearchThreads.length,
                                         backwardSearchNodeExpander,
                                         backwardSearchState,
                                         sharedSearchState,
                                         true,
                                         backwardSearchProgressLogger,
                                         masterThreadSleepDuration,
                                         masterThreadTrials);

        // Spawn the backward search master thread:
        backwardSearchState.introduceThread(backwardSearchThreads[0]);
        backwardSearchThreads[0].start();

        // Create and spawn all the slave threads working on the backward search
        // direction:
        for (int i = 1; i < backwardSearchThreadCount; ++i) {
            backwardSearchThreads[i] = 
                    new BackwardSearchThread(forwardSearchThreads.length + i,
                                             backwardSearchNodeExpander,
                                             backwardSearchState,
                                             sharedSearchState,
                                             false,
                                             backwardSearchProgressLogger,
                                             slaveThreadSleepDuration,
                                             masterThreadTrials);

            backwardSearchState.introduceThread(backwardSearchThreads[i]);
            backwardSearchThreads[i].start();
        }

        // Wait for all forward search threads to finish their work:
        try {
            for (final ForwardSearchThread thread : forwardSearchThreads) {
                thread.join();
            }
        } catch (final InterruptedException ex) {
            throw new IllegalStateException("The forward thread threw " +
                    ex.getClass().getSimpleName() + ": " +
                    ex.getMessage(), ex);
        }

        // Wait for all backward search threads to finish their work:
        try {
            for (final BackwardSearchThread thread : backwardSearchThreads) {
                thread.join();
            }
        } catch (final InterruptedException ex) {
            throw new IllegalStateException("The backward thread threw " +
                    ex.getClass().getSimpleName() + ": " +
                    ex.getMessage(), ex);
        }

        // Record the duration of the search:
        this.duration = System.currentTimeMillis() - this.duration;

        // Count the number of expanded nodes over all threads:
        this.numberOfExpandedNodes = 0;

        for (final ForwardSearchThread thread : forwardSearchThreads) {
            this.numberOfExpandedNodes += thread.getNumberOfExpandedNodes();
        }

        for (final BackwardSearchThread thread : backwardSearchThreads) {
            this.numberOfExpandedNodes += thread.getNumberOfExpandedNodes();
        }

        // Construct and return the path:
        return sharedSearchState.getPath();
    }

    /**
     * This class holds the state shared by the two search directions.
     */
    private static final class SharedSearchState<N> {

        /**
         * The source node.
         */
        private final N source;

        /**
         * The target node. 
         */
        private final N target;

        /**
         * The state of all the forward search threads.
         */
        private final SearchState<N> forwardSearchState;

        /**
         * The state of all the backward search threads.
         */
        private final SearchState<N> backwardSearchState;

        /**
         * Caches the length of the best known path from the source node to the
         * target node.
         */
        private volatile int bestPathLengthSoFar = Integer.MAX_VALUE;

        /**
         * The best search frontier touch node so far.
         */
        private volatile N touchNode;

        /**
         * Caches whether the shortest path was found.
         */
        private boolean pathIsFound;

        /**
         * The progress logger for reporting the progress.
         */
        private final ProgressLogger<N> sharedProgressLogger;

        /**
         * Constructs a shared state information object for the search.
         * 
         * @param source               the source node.
         * @param target               the target node.
         * @param forwardSearchState   the state of the forward search
         *                             direction.
         * @param backwardSearchState  the state of the backward search
         *                             direction.
         * @param sharedProgressLogger the progress logger for logging the 
         *                             overall progress of the path finder.
         */
        SharedSearchState(final N source,
                          final N target,
                          final SearchState<N> forwardSearchState,
                          final SearchState<N> backwardSearchState, 
                          final ProgressLogger<N> sharedProgressLogger) {
            this.source = source;
            this.target = target;
            this.forwardSearchState   = forwardSearchState;
            this.backwardSearchState  = backwardSearchState;
            this.sharedProgressLogger = sharedProgressLogger;
        }

        /**
         * Attempts to update the best known path.
         * 
         * @param current the touch node candidate.
         */
        synchronized void updateSearchState(final N current) {
            if (backwardSearchState.getDistanceMap().containsKey(current)
                    && forwardSearchState.getDistanceMap()
                                         .containsKey(current)) {
                final int currentDistance = 
                        forwardSearchState .getDistanceMap().get(current) +
                        backwardSearchState.getDistanceMap().get(current);

                if (bestPathLengthSoFar > currentDistance) {
                    bestPathLengthSoFar = currentDistance;
                    touchNode = current;
                }
            }
        }

        /**
         * Checks whether the node {@code node} can improve the best known touch
         * node. If not, this method will return {@code true} which implies that
         * the shortest path is found.
         * 
         * @param node the new touch node candidate.
         * @return {@code true} only if the current stored path is the shortest.
         */
        synchronized boolean pathIsOptimalOld(final N node) {
            if (touchNode == null) {
                // Once here, the two search trees did not meet each other yet:
                return false;
            }

            if (!forwardSearchState.getDistanceMap().containsKey(node)) {
                // The forward search did not reach the node 'node' yet:
                return false;
            }

            if (!backwardSearchState.getDistanceMap().containsKey(node)) {
                // The backward search did not reach the node 'node' yet:
                return false;
            }

            // Both search directions have reached the node 'node':
            final int distance = 
                    forwardSearchState .getDistanceMap().get(node) +
                    backwardSearchState.getDistanceMap().get(node);

            if (distance > bestPathLengthSoFar) {
                // The current touch node is on the shortest path. It can be
                // reconstructed. Ask all the threads to exit:
                forwardSearchState .requestThreadsToExit();
                backwardSearchState.requestThreadsToExit();
                return true;
            }

            // The current best known path cannot yet be guaranteed to be the
            // shortest, so return 'false':
            return false;
        }

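        /**
         * Checks whether the best known path is guaranteed to be a shortest
         * one by comparing its length to the sum of the distances of the two
         * frontier queue heads.
         * 
         * @return {@code true} only if a touch node is known, both frontier
         *         queues are non-empty, and the sum of the distances of the two
         *         queue heads exceeds the length of the best known path.
         */
        synchronized boolean pathIsOptimal() {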
            if (touchNode == null) {
                return false;
            }

            final N forwardSearchHead = forwardSearchState.getQueue().getHead();

            if (forwardSearchHead == null) {
                return false;
            }

            final N backwardSearchHead = backwardSearchState.getQueue()
                                                            .getHead();

            if (backwardSearchHead == null) {
                return false;
            }

            final int distance =
                    forwardSearchState.getDistanceMap()
                                      .get(forwardSearchHead) +
                    backwardSearchState.getDistanceMap()
                                       .get(backwardSearchHead);

            return distance > bestPathLengthSoFar;
        }

        /**
         * Asks every single thread to exit.
         */
        synchronized void requestExit() {
            forwardSearchState .requestThreadsToExit();
            backwardSearchState.requestThreadsToExit();
        }

        /**
         * Constructs a shortest path and returns it as a list. If the target
         * node is unreachable from the source node, return an empty list.
         * 
         * @return a shortest path found, or an empty list if target node is not 
         *         reachable from the source node.
         */
        synchronized List<N> getPath() {
            if (touchNode == null) {
                if (sharedProgressLogger != null) {
                    sharedProgressLogger.onTargetUnreachable(source, target);
                }

                return new ArrayList<>();
            }

            final ConcurrentMapWrapper<N, N> parentMapForward = 
                    forwardSearchState.getParentMap();

            final ConcurrentMapWrapper<N, N> parentMapBackward = 
                    backwardSearchState.getParentMap();

            final List<N> path = new ArrayList<>();

            N current = touchNode;

            while (current != null) {
                path.add(current);
                current = parentMapForward.get(current);
            }

            Collections.reverse(path);
            current = parentMapBackward.get(touchNode);

            while (current != null) {
                path.add(current);
                current = parentMapBackward.get(current);
            }

            if (sharedProgressLogger != null) {
                sharedProgressLogger.onShortestPath(path);
            }

            return path;
        }
    }

    /**
     * This class holds all the state of a single search direction.
     */
    private static final class SearchState<N> {

        /**
         * This FIFO queue contains the nodes reached but not yet expanded. It
         * is called the <b>search frontier</b>.
         */
        private final ConcurrentQueueWrapper<N> queue = 
                new ConcurrentQueueWrapper<>(new ArrayDeque<>());

        /**
         * This map maps each discovered node to its predecessor on the shortest 
         * path.
         */
        private final ConcurrentMapWrapper<N, N> parents = 
                new ConcurrentMapWrapper<>();

        /**
         * This map maps each discovered node to its shortest path distance from
         * the source node.
         */
        private final ConcurrentMapWrapper<N, Integer> distance =
                new ConcurrentMapWrapper<>();

        /**
         * Caches the total number of threads working on the search direction 
         * specified by this state object.
         */
        private final int totalNumberOfThreads;

        /**
         * The set of all the threads working on this particular direction.
         */
        private final Set<StoppableThread> runningThreadSet = 
                Collections.<StoppableThread>
                        newSetFromMap(new ConcurrentHashMap<>());

        /**
         * The set of all <b>slave</b> threads that are currently sleeping.
         */
        private final Set<SleepingThread> sleepingThreadSet = 
                Collections.<SleepingThread>
                        newSetFromMap(new ConcurrentHashMap<>());

        /**
         * Constructs the search state object.
         * 
         * @param initialNode          the node from which the search begins. If
         *                             this state object is used in the forward
         *                             search, this node should be the source 
         *                             node. Otherwise, if this state object is
         *                             used in the backward search, this node
         *                             should be the target node.
         * @param totalNumberOfThreads the number of threads working on a 
         *                             particular search direction.
         */
        SearchState(final N initialNode, final int totalNumberOfThreads) {
            this.totalNumberOfThreads = totalNumberOfThreads;
            queue.enqueue(initialNode);
            parents.put(initialNode, null);
            distance.put(initialNode, 0);
        }

        /**
         * Returns the number of threads held by this state object.
         * 
         * @return the number of threads.
         */
        int getTotalNumberOfThreads() {
            return totalNumberOfThreads;
        }

        /**
         * Returns the number of slave threads sleeping in the search direction
         * specified by this search state object.
         * 
         * @return the number of slave threads sleeping.
         */
        int getSleepingThreadCount() {
            return sleepingThreadSet.size();
        }

        /**
         * Returns the queue of the search frontier.
         * 
         * @return the queue of the search frontier.
         */
        ConcurrentQueueWrapper<N> getQueue() {
            return queue;
        }

        /**
         * Returns the map mapping each node to its parent.
         * 
         * @return the parent map.
         */
        ConcurrentMapWrapper<N, N> getParentMap() {
            return parents;
        }

        /**
         * Returns the map mapping each node to its shortest distance from the
         * starting node.
         * 
         * @return the distance map.
         */
        ConcurrentMapWrapper<N, Integer> getDistanceMap() {
            return distance;
        }

        /**
         * Introduces a new thread to this search direction.
         * 
         * @param thread the thread to introduce.
         */
        void introduceThread(final StoppableThread thread) {
            runningThreadSet.add(thread);
        }

        /**
         * Asks the argument thread to go to sleep and adds it to the set of
         * sleeping slave threads.
         * 
         * @param thread the <b>slave</b> thread to hibernate.
         */
        void putThreadToSleep(final SleepingThread thread) {
            sleepingThreadSet.add(thread);
            thread.putThreadToSleep(true);
        }

        /**
         * Wakes up all the sleeping slave threads.
         */
        void wakeupAllThreads() {
            for (final SleepingThread thread : sleepingThreadSet) {
                thread.putThreadToSleep(false);
            }

            sleepingThreadSet.clear();
        }

        /**
         * Tells all the threads working on the current direction to exit so
         * that the threads may be joined.
         */
        void requestThreadsToExit() {
            for (final StoppableThread thread : runningThreadSet) {
                thread.requestThreadToExit();
            }
        }
    }

    /**
     * This abstract class defines a thread that may be asked to terminate.
     */
    private abstract static class StoppableThread extends Thread {

        /**
         * If set to {@code true}, this thread should exit.
         */
        protected volatile boolean exit;

        /**
         * Sends a request to finish the work.
         */
        void requestThreadToExit() {
            exit = true;
        }
    }

    /**
     * This abstract class defines a thread that may be asked to go to sleep.
     */
    private abstract static class SleepingThread extends StoppableThread {

        /**
         * Holds the flag indicating whether this thread is put to sleep.
         */
        protected volatile boolean sleepRequested;

        /**
         * The number of milliseconds to sleep during each hibernation.
         */
        protected final int threadSleepDuration;

        /**
         * The maximum number of times a master thread hibernates itself before
         * giving up and terminating the entire search.
         */
        protected final int threadSleepTrials;

        /**
         * Constructs this thread supporting sleeping.
         * 
         * @param threadSleepDuration the number of milliseconds to sleep each 
         *                            time.
         * @param threadSleepTrials   the maximum number of trials to hibernate
         *                            a master thread before giving up.
         */
        SleepingThread(final int threadSleepDuration,
                       final int threadSleepTrials) {
            this.threadSleepDuration = threadSleepDuration;
            this.threadSleepTrials   = threadSleepTrials;
        }

        /**
         * Sets the current sleep status of this thread.
         * 
         * @param toSleep indicates whether to put this thread to sleep or 
         *                wake it up.
         */
        void putThreadToSleep(final boolean toSleep) {
            this.sleepRequested = toSleep;
        }
    }

    /**
     * This class defines all the state that should appear in threads working in
     * both search directions.
     * 
     * @param <N> the actual node type.
     */
    private abstract static class SearchThread<N> extends SleepingThread {

        /**
         * The ID of this thread.
         */
        protected final int id;

        /**
         * Holds the reference to the class responsible for computing the 
         * neighbor nodes of a given node.
         */
        protected final AbstractNodeExpander<N> nodeExpander;

        /**
         * The entire state of this search thread, shared possibly with other
         * threads working on the same search direction.
         */
        protected final SearchState<N> searchState;

        /**
         * The state shared by both the directions.
         */
        protected final SharedSearchState<N> sharedSearchState;

        /**
         * Indicates whether this thread is a master or a slave thread.
         */
        protected final boolean isMasterThread;

        /**
         * The progress logger.
         */
        protected final ProgressLogger<N> searchProgressLogger;

        /**
         * Counts the nodes expanded by this thread.
         */
        protected int numberOfExpandedNodes;

        /**
         * Construct this search thread.
         * 
         * @param id                   the ID number of this thread. Must be
         *                             unique over <b>all</b> search threads.
         * @param nodeExpander         the node expander responsible for 
         *                             generating the neighbors in this search
         *                             thread.
         * @param searchState          the search state object.
         * @param sharedSearchState    the search state object shared with both
         *                             forward search threads and backward
         *                             search threads.
         * @param isMasterThread       indicates whether this search thread is a
         *                             master thread or a slave thread.
         * @param searchProgressLogger the progress logger for the search 
         *                             direction of this search thread.
         * @param threadSleepDuration  the number of milliseconds to sleep
         *                             whenever a thread finds the frontier
         *                             queue empty.
         * @param threadSleepTrials    the maximum number of hibernation trials
         *                             before a master thread gives up and 
         *                             terminates the entire search process.
         */
        SearchThread(final int id,
                     final AbstractNodeExpander<N> nodeExpander,
                     final SearchState<N> searchState, 
                     final SharedSearchState<N> sharedSearchState,
                     final boolean isMasterThread,
                     final ProgressLogger<N> searchProgressLogger,
                     final int threadSleepDuration,
                     final int threadSleepTrials) {
            super(threadSleepDuration, threadSleepTrials);
            this.id                   = id;
            this.nodeExpander         = nodeExpander;
            this.searchState          = searchState;
            this.sharedSearchState    = sharedSearchState;
            this.isMasterThread       = isMasterThread;
            this.searchProgressLogger = searchProgressLogger;
        }

        /**
         * {@inheritDoc }
         */
        @Override
        public boolean equals(final Object other) {
            if (other == null) {
                return false;
            }

            if (!getClass().equals(other.getClass())) {
                return false;
            }

            return id == ((SearchThread<?>) other).id;
        }

        /**
         * {@inheritDoc }
         */
        @Override
        public int hashCode() {
            return id;
        }

        /**
         * {@inheritDoc }
         */
        @Override
        public String toString() {
            return "[Thread ID: " + id + "]";
        }

        /**
         * Returns the number of nodes expanded by this search thread.
         * 
         * @return the number of nodes.
         */
        int getNumberOfExpandedNodes() {
            return numberOfExpandedNodes;
        }
    }

    /**
     * This class implements a search thread searching in forward direction.
     */
    private static final class ForwardSearchThread<N> extends SearchThread<N> {

        /**
         * Constructs a forward search thread.
         * 
         * @param id                   the ID of this thread. Must be unique 
         *                             over <b>all</b> search threads.
         * @param nodeExpander         the node expander responsible for 
         *                             generating the neighbor nodes of a given
         *                             node.
         * @param searchState          the search state object.
         * @param sharedSearchState    the shared search state object.
         * @param isMasterThread       indicates whether this thread is a master
         *                             or a slave thread.
         * @param searchProgressLogger the progress logger for logging the 
         *                             progress of this thread.
         * @param threadSleepDuration  the number of milliseconds to sleep 
         *                             whenever a thread finds the frontier 
         *                             queue empty.
         * @param threadSleepTrials    the maximum number of times a master
         *                             thread hibernates itself before giving 
         *                             up.
         */
        ForwardSearchThread(
                final int id,
                final AbstractNodeExpander<N> nodeExpander,
                final SearchState<N> searchState, 
                final SharedSearchState<N> sharedSearchState,
                final boolean isMasterThread,
                final ProgressLogger<N> searchProgressLogger,
                final int threadSleepDuration,
                final int threadSleepTrials) {
            super(id,
                  nodeExpander,
                  searchState, 
                  sharedSearchState,
                  isMasterThread,
                  searchProgressLogger,
                  threadSleepDuration,
                  threadSleepTrials);
        }

        @Override
        public void run() {
            final ConcurrentQueueWrapper<N> QUEUE = searchState.getQueue();
            final ConcurrentMapWrapper<N, N> PARENTS =
                    searchState.getParentMap();

            final ConcurrentMapWrapper<N, Integer> DISTANCE = 
                    searchState.getDistanceMap();

            while (true) {
                if (exit) {
                    // This thread is asked to exit:
                    return;
                }

                if (sleepRequested) {
                    // Only a slave thread may get here. Just sleep and then
                    // reiterate.
                    mysleep(threadSleepDuration);
                    continue;
                }

                N current = QUEUE.dequeue();

                if (current == null) {
                    if (isMasterThread) {
                        for (int trials = 0; 
                             trials < threadSleepTrials; 
                             trials++) {
                            // Only a master thread may get here.
                            mysleep(threadSleepDuration);

                            if ((current = QUEUE.dequeue()) != null) {
                                break;
                            }
                        }

                        if (current == null) {
                            sharedSearchState.requestExit();
                            return;
                        } else {
                            searchState.wakeupAllThreads();
                        }
                    } else {
                        // This thread is a slave thread, make it sleep. The
                        // call below already sets the sleep flag; setting it
                        // again here could undo a concurrent wake-up.
                        searchState.putThreadToSleep(this);
                        continue;
                    }
                } else if (!QUEUE.isEmpty()) {
                    searchState.wakeupAllThreads();
                }

                // Possibly log the expansion of a node:
                if (searchProgressLogger != null) {
                    searchProgressLogger.onExpansion(current);
                }

                // Possibly improve the shortest path so far:
                sharedSearchState.updateSearchState(current);

                // If the current path is guaranteed to be optimal, terminate
                // the entire search so that the threads may be joined and the 
                // path constructed:
                if (sharedSearchState.pathIsOptimal()) {
                    sharedSearchState.requestExit();
                    return;
                }

                numberOfExpandedNodes++; 

                // Expand the current node:
                for (final N child : nodeExpander.expand(current)) {
                    if (!PARENTS.containsKey(child)) {
                        PARENTS.put(child, current);
                        DISTANCE.put(child, DISTANCE.get(current) + 1);
                        QUEUE.enqueue(child);

                        if (searchProgressLogger != null) {
                            searchProgressLogger.onNeighborGeneration(child);
                        }
                    }
                }
            }
        }
    }

    /**
     * This class implements a search thread searching in backward direction.
     */
    private static final class BackwardSearchThread<N> extends SearchThread<N> {

        /**
         * Constructs a backward search thread.
         * 
         * @param id                   the ID of this thread. Must be unique 
         *                             over <b>all</b> search threads.
         * @param nodeExpander         the node expander responsible for 
         *                             generating the neighbor nodes of a given
         *                             node.
         * @param searchState          the search state object.
         * @param sharedSearchState    the shared search state object.
         * @param isMasterThread       indicates whether this thread is a master
         *                             or a slave thread.
         * @param searchProgressLogger the progress logger for logging the 
         *                             progress of this thread.
         * @param threadSleepDuration  the number of milliseconds to sleep 
         *                             whenever a slave thread finds the
         *                             frontier queue empty.
         * @param threadSleepTrials    the maximum number of times a master
         *                             thread hibernates itself before giving 
         *                             up.
         */
        BackwardSearchThread(final int id,
                             final AbstractNodeExpander<N> nodeExpander,
                             final SearchState<N> searchState, 
                             final SharedSearchState<N> sharedSearchState,
                             final boolean isMasterThread,
                             final ProgressLogger<N> searchProgressLogger,
                             final int threadSleepDuration,
                             final int threadSleepTrials) {
           super(id,
                 nodeExpander,
                 searchState,
                 sharedSearchState,
                 isMasterThread,
                 searchProgressLogger,
                 threadSleepDuration,
                 threadSleepTrials);
        }

        @Override
        public void run() {
            final ConcurrentQueueWrapper<N> QUEUE = searchState.getQueue();
            final ConcurrentMapWrapper<N, N> PARENTS = 
                    searchState.getParentMap();

            final ConcurrentMapWrapper<N, Integer> DISTANCE = 
                    searchState.getDistanceMap();

            while (true) {
                if (exit) {
                    // This thread is asked to exit:
                    return;
                }

                if (sleepRequested) {
                    // Only a slave thread may get here. Just sleep and then
                    // reiterate.
                    mysleep(threadSleepDuration);
                    continue;
                }

                N current = QUEUE.dequeue();

                if (current == null) {
                    if (isMasterThread) {
                        for (int trials = 0; 
                             trials < threadSleepTrials; 
                             trials++) {
                            mysleep(threadSleepDuration);

                            if ((current = QUEUE.dequeue()) != null) {
                                break;
                            }
                        }

                        if (current == null) {
                            sharedSearchState.requestExit();
                            return;
                        } else {
                            searchState.wakeupAllThreads();
                        }
                    } else {
                        // This thread is a slave thread, make it sleep. The
                        // call below already sets the sleep flag; setting it
                        // again here could undo a concurrent wake-up.
                        searchState.putThreadToSleep(this);
                        continue;
                    }
                } else if (!QUEUE.isEmpty()) {
                    searchState.wakeupAllThreads();
                }

                // Possibly log the expansion of a node:
                if (searchProgressLogger != null) {
                    searchProgressLogger.onExpansion(current);
                }

                // Possibly improve the shortest path so far:
                sharedSearchState.updateSearchState(current);

                // If the current path is guaranteed to be optimal, terminate
                // the entire search so that the threads may be joined and the 
                // path constructed:
                if (sharedSearchState.pathIsOptimal()) {
                    sharedSearchState.requestExit();
                    return;
                }

                numberOfExpandedNodes++;

                // Expand the current node:
                for (final N parent : nodeExpander.expand(current)) {
                    if (!PARENTS.containsKey(parent)) {
                        PARENTS.put(parent, current);
                        DISTANCE.put(parent, DISTANCE.get(current) + 1);
                        QUEUE.enqueue(parent);

                        if (searchProgressLogger != null) {
                            searchProgressLogger.onNeighborGeneration(parent);
                        }
                    }
                }
            }
        }
    }

    /**
     * This class implements a concurrent {@link java.util.Map} wrapper. One 
     * reason to use this instead of a
     * {@link java.util.concurrent.ConcurrentHashMap} is that the latter does
     * not allow {@code null} <b>values</b>. However, we have to map the source
     * and the target nodes to {@code null}.
     * 
     * @param <K> the actual key type.
     * @param <V> the actual value type.
     */
    private static final class ConcurrentMapWrapper<K, V> {

        private final Map<K, V> map = new HashMap<>();

        synchronized boolean containsKey(final K key) {
            return map.containsKey(key);
        }

        // Unlike java.util.concurrent.ConcurrentHashMap, this map wrapper 
        // allows 'null' values:
        synchronized void put(final K key, final V value) {
            map.put(key, value);
        }

        synchronized V get(final K key) {
            return map.get(key);
        }
    }

    /**
     * This class implements a concurrent {@link java.util.Deque} wrapper. We 
     * need this to be able to implement {@code dequeue} atomically.
     * 
     * @param <N> the actual element type.
     */
    private static final class ConcurrentQueueWrapper<N> {

        private final Deque<N> queue;

        ConcurrentQueueWrapper(final Deque<N> queue) {
            this.queue = queue;
        }

        synchronized N dequeue() {
            if (queue.isEmpty()) {
                return null;
            }

            return queue.removeFirst();
        }

        synchronized void enqueue(final N node) {
            queue.addLast(node);
        }

        synchronized boolean isEmpty() {
            return queue.isEmpty();
        }

        synchronized N getHead() {
            return queue.isEmpty() ? null : queue.getFirst();
        }
    }

    /**
     * This method puts the calling thread to sleep for {@code milliseconds}
     * milliseconds.
     * 
     * @param milliseconds the number of milliseconds to sleep for.
     */
    private static void mysleep(final int milliseconds) {
        try {
            Thread.sleep(milliseconds);
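        } catch (final InterruptedException ex) {
            // Restore the interrupt flag so that the interruption is not
            // silently lost.
            Thread.currentThread().interrupt();
        }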
    }
}
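
To make the path reconstruction step easier to follow in isolation, here is a minimal sketch that stitches a path from two parent maps in the same way as the listing above, but with plain `HashMap`s and no threads. The names `PathStitchDemo` and `stitch` are illustrative only and are not part of the library.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class PathStitchDemo {

    // Joins the two half-paths that meet at touchNode:
    // source -> ... -> touchNode -> ... -> target.
    static <N> List<N> stitch(final N touchNode,
                              final Map<N, N> parentMapForward,
                              final Map<N, N> parentMapBackward) {
        final List<N> path = new ArrayList<>();

        // Walk from the touch node back to the source, whose parent is null.
        for (N current = touchNode;
             current != null;
             current = parentMapForward.get(current)) {
            path.add(current);
        }

        // The forward half was collected backwards, so flip it.
        Collections.reverse(path);

        // Continue from the touch node's backward parent up to the target,
        // whose parent is null as well.
        for (N current = parentMapBackward.get(touchNode);
             current != null;
             current = parentMapBackward.get(current)) {
            path.add(current);
        }

        return path;
    }

    public static void main(final String[] args) {
        final Map<String, String> forward = new HashMap<>();
        forward.put("A", null); // "A" is the source.
        forward.put("B", "A");
        forward.put("C", "B");

        final Map<String, String> backward = new HashMap<>();
        backward.put("E", null); // "E" is the target.
        backward.put("D", "E");
        backward.put("C", "D");

        System.out.println(stitch("C", forward, backward)); // [A, B, C, D, E]
    }
}
```

The touch node is added only once, by the forward walk, which is why the backward walk starts from its parent rather than from the touch node itself.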

Demo

You can see the algorithm in action at https://wikipath.herokuapp.com/

Critique request

I want to improve the quality of my stack; please tell me anything that comes to mind.
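
One spot that may deserve scrutiny: in the expansion loops of both search threads, `PARENTS.containsKey(...)` and `PARENTS.put(...)` are two separately synchronized calls, so two threads working on the same direction may both pass the check for the same node and enqueue it twice. A possible remedy, sketched below under the assumption that a single atomic "insert if unseen" operation is acceptable, is to give the map wrapper a `putIfAbsent` that still tolerates `null` values. The class `NullFriendlyMapDemo` and the helper `countWinners` are hypothetical names used only for this illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

final class NullFriendlyMapDemo<K, V> {

    private final Map<K, V> map = new HashMap<>();

    // Atomically stores the mapping only if the key is not yet present.
    // Returns true if and only if this call inserted the mapping. Using
    // containsKey (not get) keeps null values distinguishable from absence.
    synchronized boolean putIfAbsent(final K key, final V value) {
        if (map.containsKey(key)) {
            return false;
        }

        map.put(key, value);
        return true;
    }

    synchronized V get(final K key) {
        return map.get(key);
    }

    // Lets threadCount threads race to claim the same key and reports how
    // many of them succeeded.
    static int countWinners(final int threadCount) {
        final NullFriendlyMapDemo<String, String> parents =
                new NullFriendlyMapDemo<>();
        final AtomicInteger winners = new AtomicInteger();
        final List<Thread> threads = new ArrayList<>();

        for (int i = 0; i < threadCount; i++) {
            final String parent = "parent-" + i;
            final Thread thread = new Thread(() -> {
                if (parents.putIfAbsent("child", parent)) {
                    winners.incrementAndGet();
                }
            });

            threads.add(thread);
            thread.start();
        }

        for (final Thread thread : threads) {
            try {
                thread.join();
            } catch (final InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        }

        return winners.get();
    }

    public static void main(final String[] args) {
        System.out.println(countWinners(8)); // 1
    }
}
```

With such a method, the guard in the expansion loops could read `if (PARENTS.putIfAbsent(child, current))`, so that only the winning thread updates the distance map and the queue. Whether the duplicate expansions matter in practice has not been measured here.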

Component list so far
