4

I have written a program that sorts an array with multiple threads by splitting the array into equal chunks and sorting in individual threads with a bubble sort. I have then used a merging algorithm to combine the two arrays.

I would like to compare this program to one that sorts an array using Streams. My question: if I pass an array into a stream, how would I split, sort and merge it so that the sort runs in parallel through parallel streams, rather than by creating my own threads, Runnables, etc.?

Any ideas?

6
  • 5
    Why not use Arrays.parallelSort?
    – Tunaki
    Feb 23, 2016 at 14:23
  • This was an option, but then it would not match the algorithm I have used in my implementation using threads. I guess I could change the sort algorithm in my explicit program to just use Arrays.sort and then merge the results, and for the implicit one just use Arrays.parallelSort?
    – jjharrison
    Feb 23, 2016 at 14:27
  • 3
    Why are you trying to write your own sorting algorithm in parallel to begin with? Is the built-in one not efficient enough for your use-case?
    – Tunaki
    Feb 23, 2016 at 14:29
  • 3
    Not sure I understand. If you already have a sort algorithm, you can just compare it with Arrays.parallelSort and find out which one suits you best. You can use a benchmarking framework like JMH. Or if you're asking how to build such an algorithm, I'm afraid it is too broad.
    – Tunaki
    Feb 23, 2016 at 14:33
  • 4
    btw, why bubble sort!
    – stinepike
    Feb 23, 2016 at 16:28

2 Answers

3

I assume that your problem is purely educational and experimental, without any practical application, since there are much more efficient ways to sort elements in Java. If you want to use the Stream API here, you can create a spliterator that bubble-sorts each chunk and a collector whose combiner merges the sorted chunks.

Here's the spliterator:

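// Imports needed by the snippets in this answer:
// java.util.*, java.util.function.Consumer, java.util.stream.*
static class BubbleSpliterator<T> implements Spliterator<T> {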
    private final Comparator<? super T> cmp;
    private final Spliterator<T> source;
    private T[] data;
    private int offset;

    public BubbleSpliterator(Spliterator<T> source, Comparator<? super T> cmp) {
        this.source = source;
        this.cmp = cmp;
    }

    @SuppressWarnings("unchecked")
    private void init() {
        if (data != null)
            return;
        Stream.Builder<T> buf = Stream.builder();
        source.forEachRemaining(buf);
        data = (T[]) buf.build().toArray();
        bubble(data, cmp);
    }

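    // Simple O(n^2) exchange sort (a bubble sort variant):
    // after pass i, data[i] holds the smallest of the remaining elements.
    private static <T> void bubble(T[] data, Comparator<? super T> cmp) {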
        for (int i = 0; i < data.length - 1; i++)
            for (int j = i + 1; j < data.length; j++) {
                if (cmp.compare(data[i], data[j]) > 0) {
                    T tmp = data[i];
                    data[i] = data[j];
                    data[j] = tmp;
                }
            }
    }

    @Override
    public boolean tryAdvance(Consumer<? super T> action) {
        init();
        if (offset >= data.length)
            return false;
        action.accept(data[offset++]);
        return true;
    }

    @Override
    public void forEachRemaining(Consumer<? super T> action) {
        init();
        for (int i = offset; i < data.length; i++)
            action.accept(data[i]);
        offset = data.length;
    }

    @Override
    public Spliterator<T> trySplit() {
        if (data != null)
            return null;
        Spliterator<T> prefix = source.trySplit();
        return prefix == null ? null : new BubbleSpliterator<>(prefix, cmp);
    }

    @Override
    public long estimateSize() {
        if (data != null)
            return data.length - offset;
        return source.estimateSize();
    }

    @Override
    public int characteristics() {
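        // Drop SORTED: Spliterator's default getComparator() throws unless overridden.
        return source.characteristics() & ~Spliterator.SORTED;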
    }

    public static <T> Stream<T> stream(Stream<T> source, 
                                       Comparator<? super T> comparator) {
        Spliterator<T> spltr = source.spliterator();
        return StreamSupport.stream(new BubbleSpliterator<>(spltr, comparator), 
               source.isParallel()).onClose(source::close);
    }
}

It takes a source spliterator and delegates splitting to it, but when elements are requested it dumps the source elements into an array and bubble-sorts them. You can check it like this:

int[] data = new Random(1).ints(100, 0, 1000).toArray();
Comparator<Integer> comparator = Comparator.naturalOrder();
List<Integer> list = BubbleSpliterator.stream(Arrays.stream(data).parallel().boxed(), comparator).collect(
    Collectors.toList());
System.out.println(list);

The result depends on the number of hardware threads on your machine and may look like this:

[254, 313, 588, 847, 904, 985, 434, 473, 569, 606, 748, 978, 234, 262, 263, 317, 562, 592, 99, 189, 310,...]

Here you can see that the output consists of several sorted sequences. The number of such sequences corresponds to the number of parallel tasks the Stream API creates.

Now, to combine the sorted sequences with a merge step, you can write a special collector like this:

static <T> List<T> merge(List<T> l1, List<T> l2, Comparator<? super T> cmp) {
    List<T> result = new ArrayList<>(l1.size()+l2.size());
    int i=0, j=0;
    while(i < l1.size() && j < l2.size()) {
        if(cmp.compare(l1.get(i), l2.get(j)) <= 0) {
            result.add(l1.get(i++));
        } else {
            result.add(l2.get(j++));
        }
    }
    result.addAll(l1.subList(i, l1.size()));
    result.addAll(l2.subList(j, l2.size()));
    return result;
}

static <T> Collector<T, ?, List<T>> mergeSorting(Comparator<? super T> cmp) {
    return Collector.<T, List<T>> of(ArrayList::new, List::add, 
                                     (l1, l2) -> merge(l1, l2, cmp));
}

In sequential mode it works just like Collectors.toList(), but in parallel mode the combiner merges two lists, assuming that both are already sorted. My mergeSorting implementation is probably suboptimal; you may write something better.
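To see what the combiner does with two already-sorted chunks, here is a small standalone check. It repeats the merge method from above so that it compiles on its own; the class name MergeStepDemo and the sample values are only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class MergeStepDemo {

    // Same two-pointer merge as in the answer, repeated so the example is self-contained.
    static <T> List<T> merge(List<T> l1, List<T> l2, Comparator<? super T> cmp) {
        List<T> result = new ArrayList<>(l1.size() + l2.size());
        int i = 0, j = 0;
        while (i < l1.size() && j < l2.size()) {
            // "<= 0" takes from the left list on ties, which keeps the merge stable.
            if (cmp.compare(l1.get(i), l2.get(j)) <= 0) {
                result.add(l1.get(i++));
            } else {
                result.add(l2.get(j++));
            }
        }
        // At most one of these two tails is non-empty.
        result.addAll(l1.subList(i, l1.size()));
        result.addAll(l2.subList(j, l2.size()));
        return result;
    }

    public static void main(String[] args) {
        Comparator<Integer> cmp = Comparator.naturalOrder();
        List<Integer> left = Arrays.asList(254, 313, 588);
        List<Integer> right = Arrays.asList(434, 473, 569);
        System.out.println(merge(left, right, cmp)); // prints [254, 313, 434, 473, 569, 588]
    }
}
```

Note that the merge only produces a sorted list if both inputs are sorted; it does not check this.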

So to sort everything via the Stream API, you can use the BubbleSpliterator and the mergeSorting collector together:

int[] data = new Random(1).ints(100, 0, 1000).toArray();
Comparator<Integer> comparator = Comparator.naturalOrder();
List<Integer> list = BubbleSpliterator.stream(Arrays.stream(data).parallel().boxed(), comparator).collect(
    mergeSorting(comparator));
System.out.println(list);

The result will be completely sorted.

This implementation copies the input data several times unnecessarily, so I guess a custom bubble+merge implementation could beat it in terms of performance.
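For comparison, a custom bubble+merge version that works directly on an int[] could look roughly like the sketch below. It is only one possible shape, not a tuned implementation: the class ChunkedBubbleSort, the bound helper and the chunk count are hypothetical, a parallel IntStream over chunk indices is used just to schedule the per-chunk bubble sorts, and the chunks are then merged one after another on the calling thread.

```java
import java.util.Arrays;
import java.util.Random;
import java.util.stream.IntStream;

public class ChunkedBubbleSort {

    // Classic adjacent-swap bubble sort restricted to data[from, to).
    static void bubbleSort(int[] data, int from, int to) {
        for (int end = to - 1; end > from; end--) {
            for (int i = from; i < end; i++) {
                if (data[i] > data[i + 1]) {
                    int tmp = data[i];
                    data[i] = data[i + 1];
                    data[i + 1] = tmp;
                }
            }
        }
    }

    // Merges the sorted ranges data[from, mid) and data[mid, to), copying only the left range.
    static void merge(int[] data, int from, int mid, int to) {
        int[] left = Arrays.copyOfRange(data, from, mid);
        int i = 0, j = mid, k = from;
        while (i < left.length && j < to) {
            data[k++] = left[i] <= data[j] ? left[i++] : data[j++];
        }
        while (i < left.length) {
            data[k++] = left[i++];
        }
        // Any remaining right-hand elements are already in their final place.
    }

    // Start index of chunk c when n elements are split into the given number of chunks.
    static int bound(int c, int n, int chunks) {
        return (int) ((long) c * n / chunks);
    }

    // Sorts data in place; chunks must be at least 1.
    static void sort(int[] data, int chunks) {
        int n = data.length;
        // Each task sorts its own disjoint range, so the tasks do not interfere.
        IntStream.range(0, chunks).parallel()
                 .forEach(c -> bubbleSort(data, bound(c, n, chunks), bound(c + 1, n, chunks)));
        // Sequentially fold each sorted chunk into the sorted prefix.
        for (int c = 1; c < chunks; c++) {
            merge(data, 0, bound(c, n, chunks), bound(c + 1, n, chunks));
        }
    }

    public static void main(String[] args) {
        int[] data = new Random(1).ints(100, 0, 1000).toArray();
        sort(data, 4);
        System.out.println(Arrays.toString(data));
    }
}
```

The merging here is sequential; merging pairs of chunks in parallel would be a further refinement that this sketch does not attempt.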

1

If you want to sort an array in parallel using the Java 8 Stream API, this may help you:

IntStream randomIntegers = ThreadLocalRandom.current().ints(100, 0, 100);
int[] sortedArray = randomIntegers
        .parallel() // (1)
        .sorted() // (2)
        .toArray();
System.out.println(Arrays.toString(sortedArray));

No matter what type of stream you have, just invoke parallel() and then sorted() (the order of invocation is not important).
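A quick way to convince yourself that the order of the two calls does not matter is to run both variants on the same input. The class and method names below are made up for this check.

```java
import java.util.Arrays;
import java.util.Random;

public class ParallelFlagDemo {

    // parallel() is called before sorted().
    static int[] parallelThenSorted(int[] input) {
        return Arrays.stream(input).parallel().sorted().toArray();
    }

    // sorted() is called before parallel(); the parallel flag still applies to the whole pipeline.
    static int[] sortedThenParallel(int[] input) {
        return Arrays.stream(input).sorted().parallel().toArray();
    }

    public static void main(String[] args) {
        int[] data = new Random(1).ints(100, 0, 100).toArray();
        // Both pipelines return the same sorted array.
        System.out.println(Arrays.equals(parallelThenSorted(data), sortedThenParallel(data))); // prints true
    }
}
```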

Tracing the code, we find:

final class SortedOps {

    private static final class OfInt extends IntPipeline.StatefulOp<Integer> {
        //...

        @Override
        public <P_IN> Node<Integer> opEvaluateParallel(PipelineHelper<Integer> helper,
                                                       Spliterator<P_IN> spliterator,
                                                       IntFunction<Integer[]> generator) {
            if (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags())) {
                return helper.evaluate(spliterator, false, generator);
            } else {
                Node.OfInt n = (Node.OfInt) helper.evaluate(spliterator, true, generator);

                int[] content = n.asPrimitiveArray();
                Arrays.parallelSort(content); // <== this

                return Nodes.node(content);
            }
        }
    }

    private static final class OfRef<T> extends ReferencePipeline.StatefulOp<T, T> {
        //...

        @Override
        public <P_IN> Node<T> opEvaluateParallel(PipelineHelper<T> helper,
                                                 Spliterator<P_IN> spliterator,
                                                 IntFunction<T[]> generator) {
            // If the input is already naturally sorted and this operation
            // naturally sorts then collect the output
            if (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags()) && isNaturalSort) {
                return helper.evaluate(spliterator, false, generator);
            }
            else {
                // @@@ Weak two-pass parallel implementation; parallel collect, parallel sort
                T[] flattenedData = helper.evaluate(spliterator, true, generator).asArray(generator);
                Arrays.parallelSort(flattenedData, comparator); // <== this
                return Nodes.node(flattenedData);
            }
        }
    }

}

Arrays.parallelSort() is used to sort the backing array.
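Since the stream ends up delegating to Arrays.parallelSort(), you can also call it directly on an array you already hold. The sketch below shows the primitive overload and the overload that takes a comparator; the wrapper class and method names are hypothetical, and the copies are made only so that the caller's array stays untouched.

```java
import java.util.Arrays;
import java.util.Comparator;

public class DirectParallelSortDemo {

    // Sorts a copy of a primitive array with the int[] overload.
    static int[] sortPrimitives(int[] input) {
        int[] copy = input.clone();
        Arrays.parallelSort(copy);
        return copy;
    }

    // Sorts a copy of an object array with the overload that accepts a comparator.
    static <T> T[] sortObjects(T[] input, Comparator<? super T> cmp) {
        T[] copy = input.clone();
        Arrays.parallelSort(copy, cmp);
        return copy;
    }

    public static void main(String[] args) {
        int[] primitives = {5, 3, 9, 1};
        Integer[] boxed = {5, 3, 9, 1};
        Comparator<Integer> descending = Comparator.reverseOrder();
        System.out.println(Arrays.toString(sortPrimitives(primitives)));      // prints [1, 3, 5, 9]
        System.out.println(Arrays.toString(sortObjects(boxed, descending))); // prints [9, 5, 3, 1]
    }
}
```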
