Take the 2-minute tour ×
Code Review Stack Exchange is a question and answer site for peer programmer code reviews. It's 100% free, no registration required.

I wanted to write some more JNI code and decided to (re)implement a semaphore for Mac OS X. To demonstrate that my implementation works correctly, I am also supplying a multi-consumer/multi-producer demo.

semaphore_impl.h (autogenerated by javah):

#include <jni.h>

#ifndef MACOSX_SEMAPHORE_IMPL_H
#define MACOSX_SEMAPHORE_IMPL_H

#ifdef __cplusplus
extern "C" {
#endif

// Native counterpart of MacOSXSemaphoreImpl.init(int): the implementation
// creates a Mach semaphore with the given initial count and stores its handle
// in the Java object's 'semaphoreHandle' int field.
JNIEXPORT void JNICALL 
Java_net_coderodde_util_concurrent_MacOSXSemaphoreImpl_init(JNIEnv*, 
                                                            jobject, 
                                                            jint);

// Native counterpart of MacOSXSemaphoreImpl.clean(): the implementation
// passes the stored handle to semaphore_destroy().
JNIEXPORT void JNICALL 
Java_net_coderodde_util_concurrent_MacOSXSemaphoreImpl_clean(JNIEnv*, jobject);

// Native counterpart of MacOSXSemaphoreImpl.lock(): the implementation
// passes the stored handle to semaphore_wait().
JNIEXPORT void JNICALL 
Java_net_coderodde_util_concurrent_MacOSXSemaphoreImpl_lock(JNIEnv*, jobject);

// Native counterpart of MacOSXSemaphoreImpl.unlock(): the implementation
// passes the stored handle to semaphore_signal().
JNIEXPORT void JNICALL 
Java_net_coderodde_util_concurrent_MacOSXSemaphoreImpl_unlock(JNIEnv*, jobject);

#ifdef __cplusplus
}
#endif

#endif // MACOSX_SEMAPHORE_IMPL_H

semaphore_impl.cpp:

#include "semaphore_impl.h"
#include <mach/task.h>
#include <mach/semaphore.h>
#include <mach/mach.h>

static const char* CLASS = "net/coderodde/util/concurrent/MacOSXSemaphoreImpl";
static const char* FIELD = "semaphoreHandle";

// Reads the semaphore handle stored in the 'semaphoreHandle' int field of the
// Java object 'obj'.
//
// Returns MACH_PORT_NULL if the field cannot be resolved; in that case
// GetFieldID has already left a Java exception pending on 'env'.
static semaphore_t get_semaphore_handle(JNIEnv* env, jobject obj)
{
    // Ask the receiver for its class instead of looking it up by name: this
    // runs on every lock/unlock, and avoids a by-name class lookup each time.
    const jclass clazz = env->GetObjectClass(obj);
    const jfieldID fid = env->GetFieldID(clazz, FIELD, "I");

    if (fid == nullptr) {
        // Never hand a null field ID to GetIntField.
        return MACH_PORT_NULL;
    }

    return static_cast<semaphore_t>(env->GetIntField(obj, fid));
}

// Creates a Mach semaphore with 'count' initial permits and stores its handle
// in the 'semaphoreHandle' int field of the Java object 'obj'.
//
// If the class or field cannot be resolved, the JNI lookup leaves a Java
// exception pending and no semaphore is created. If the kernel refuses to
// create the semaphore, an IllegalStateException is raised and the field is
// left untouched.
JNIEXPORT void JNICALL 
Java_net_coderodde_util_concurrent_MacOSXSemaphoreImpl_init(JNIEnv* env, 
                                                            jobject obj, 
                                                            jint count)
{
    // Resolve the 'semaphoreHandle' field first, so that a failed lookup
    // cannot leak a freshly created semaphore.
    const jclass clazz = env->FindClass(CLASS);

    if (clazz == nullptr) {
        return;
    }

    const jfieldID semaphore_field_id = env->GetFieldID(clazz, FIELD, "I");

    if (semaphore_field_id == nullptr) {
        return;
    }

    // Create a handle to a semaphore.
    semaphore_t semaphore;
    const kern_return_t status = semaphore_create(current_task(),
                                                  &semaphore,
                                                  SYNC_POLICY_FIFO,
                                                  count);

    if (status != KERN_SUCCESS) {
        // 'semaphore' is not valid here; report instead of storing garbage.
        const jclass exception_class =
              env->FindClass("java/lang/IllegalStateException");

        if (exception_class != nullptr) {
            env->ThrowNew(exception_class,
                          "Could not create a Mach semaphore.");
        }

        return;
    }

    // Store the value of the semaphore to the Java semaphore object.
    env->SetIntField(obj, semaphore_field_id, static_cast<jint>(semaphore));
}

// Destroys the Mach semaphore whose handle is stored in the Java object 'obj'.
JNIEXPORT void JNICALL 
Java_net_coderodde_util_concurrent_MacOSXSemaphoreImpl_clean(JNIEnv* env, 
                                                             jobject obj)
{
    // semaphore_destroy() takes the owning task first and the semaphore
    // second. Both are Mach port names, so swapping them still compiles, but
    // the call then targets the wrong objects.
    semaphore_destroy(current_task(), get_semaphore_handle(env, obj));
}

// Acquires one permit of the semaphore whose handle is stored in 'obj',
// blocking the calling thread until a permit is available.
JNIEXPORT void JNICALL 
Java_net_coderodde_util_concurrent_MacOSXSemaphoreImpl_lock(JNIEnv* env, 
                                                            jobject obj)
{
    const semaphore_t semaphore = get_semaphore_handle(env, obj);
    kern_return_t status;

    // semaphore_wait() may return KERN_ABORTED when the wait is interrupted
    // before a permit was obtained. Returning then would let the caller
    // proceed without holding the semaphore, so wait again.
    do {
        status = semaphore_wait(semaphore);
    } while (status == KERN_ABORTED);
}

// Releases one permit of the semaphore whose handle is stored in the Java
// object 'obj' by signalling it.
JNIEXPORT void JNICALL 
Java_net_coderodde_util_concurrent_MacOSXSemaphoreImpl_unlock(JNIEnv* env, 
                                                              jobject obj)
{
    const semaphore_t semaphore = get_semaphore_handle(env, obj);

    semaphore_signal(semaphore);
}

Makefile:

# Links the object file into the JNI dynamic library and copies the result to
# /usr/local/lib.
# NOTE(review): the recipe lines below appear indented with spaces; make
# requires a leading tab, so presumably the tabs were lost when the file was
# pasted -- confirm before use.
libsemaphore.jnilib: semaphore_impl.o
    g++ -dynamiclib -o libsemaphore.jnilib semaphore_impl.o
    cp libsemaphore.jnilib /usr/local/lib

# Compiles the JNI implementation as C++11 with -O3. The -I path points at the
# JavaVM framework headers of the MacOSX10.6 SDK and is hard-coded.
semaphore_impl.o: semaphore_impl.cpp
    g++ -std=c++11 -O3 -I/Developer/SDKs/MacOSX10.6.sdk/System/Library/Frameworks/JavaVM.framework/Versions/A/Headers/ -c semaphore_impl.cpp

MacOSXSemaphoreImpl.java:

package net.coderodde.util.concurrent;

import java.io.File;

/**
 * This class implements the semaphore type interfacing with MacOSX. All
 * semaphore operations are native methods provided by the
 * <code>semaphore</code> library, which is loaded when this class is
 * initialized.
 * 
 * @author Rodion "rodde" Efremov
 * @version 1.6
 */
final class MacOSXSemaphoreImpl implements SemaphoreImpl {

    static {
        try {
            // Putting the .jnilib into working directory is
            // a safe choice. 'System.getProperty("user.dir")'.
            System.loadLibrary("semaphore");
        } catch (UnsatisfiedLinkError error) {
            // Without the native library no method of this class can work;
            // the trace is printed and the JVM is terminated.
            error.printStackTrace();
            System.exit(1);
        }
    }

    /**
     * Holds the handle to a semaphore. Written by the native
     * {@link #init(int)} and read by the other native methods.
     */
    private int semaphoreHandle;

    /**
     * Constructs a semaphore with <code>counter</code> permits.
     * 
     * @param counter the amount of permits.
     */
    MacOSXSemaphoreImpl(int counter) {
        init(counter);
    }

    /**
     * Creates the semaphore and loads its handle into
     * <code>semaphoreHandle</code>.
     * 
     * @param counter the initial amount of permits.
     */
    @Override
    public native void init(int counter);

    /**
     * Releases resources. This method is also invoked from
     * {@link #finalize()}.
     * NOTE(review): nothing guards against a second invocation; confirm that
     * no caller invokes this method explicitly on an object that will later
     * be finalized.
     */
    @Override
    public native void clean();

    /**
     * Acquires this semaphore. If the current counter of this semaphore is
     * zero, the calling thread is blocked.
     */
    @Override
    public native void lock();

    /**
     * Releases this semaphore. Effectively increments the counter of this 
     * semaphore so that other threads may acquire this semaphore.
     */
    @Override
    public native void unlock();

    /**
     * Release the resources associated with this semaphore.
     */
    @Override
    protected void finalize() {
        try {
            // Release our own native resource first.
            clean();
        } finally {
            try {
                super.finalize();
            } catch (Throwable ignored) {
                // The superclass is Object, whose finalize() does nothing;
                // the catch exists only because it is declared to throw.
            }
        }
    }
}

SemaphoreImpl.java:

package net.coderodde.util.concurrent;

/**
 * This package-private interface defines the API for semaphore implementation
 * types. {@link Semaphore} delegates its operations to an object of this
 * type.
 *
 * @author Rodion "rodde" Efremov
 * @version 1.6
 */
interface SemaphoreImpl {

    /**
     * Initialization routine.
     *
     * @param counter the amount of threads that can pass without blocking.
     */
    void init(int counter);

    /**
     * Releases all the resources.
     */
    void clean();

    /**
     * Locks the implementing semaphore.
     */
    void lock();

    /**
     * Unlocks the implementing semaphore.
     */
    void unlock();
}

Semaphore.java:

package net.coderodde.util.concurrent;

/**
 * A semaphore ported to MacOSX. Every operation is forwarded to a
 * {@link SemaphoreImpl}, which this class creates as a
 * <code>MacOSXSemaphoreImpl</code>.
 * 
 * @author Rodion "rodde" Efremov
 * @version 1.6
 */
public class Semaphore {

    /**
     * The implementation object that performs the actual work.
     */
    private final SemaphoreImpl impl;

    /**
     * Creates a semaphore holding <code>counter</code> permits.
     * 
     * @param counter how many threads may lock this semaphore before further
     *                lockers block.
     * @throws IllegalArgumentException if <code>counter</code> is negative.
     */
    public Semaphore(int counter) {
        // Validate before the implementation class is touched.
        checkCounter(counter);
        impl = new MacOSXSemaphoreImpl(counter);
    }

    /**
     * Acquires this semaphore by delegating to the implementation.
     */
    public void lock() {
        impl.lock();
    }

    /**
     * Releases this semaphore by delegating to the implementation.
     */
    public void unlock() {
        impl.unlock();
    }

    /**
     * Rejects negative permit counts.
     * 
     * @param counter the initial amount of permits.
     * @throws IllegalArgumentException if <code>counter</code> is below zero.
     */
    private static void checkCounter(int counter) {
        if (counter >= 0) {
            return;
        }

        final String message = "The semaphore counter too small: " + counter
                + ", should be at least 0.";

        throw new IllegalArgumentException(message);
    }
}

Demo.java:

package net.coderodde.util.concurrent;

import java.util.Collections;
import java.util.HashSet;
import java.util.Random;
import java.util.Set;

/**
 * This class implements a demonstration for the semaphore.
 * 
 * @author Rodion "rodde" Efremov
 * @version 1.6
 */
public class Demo {

    /**
     * The "poison pill" character that tells consumers to exit. A producer
     * adds it to the buffer once the set of active producers is empty; a
     * consumer that removes it puts it back into the buffer and terminates.
     */
    private static final Character TERMINATION_SENTINEL = '\u2622';

    /**
     * A consumer thread: keeps removing characters from a shared buffer until
     * it removes the termination sentinel.
     */
    static class Consumer extends Thread {

        /**
         * The buffer this thread consumes from.
         */
        private final ConcurrentBuffer<Character> buffer;

        /**
         * Creates a consumer named <code>"Consumer " + id</code>.
         * 
         * @param buffer the concurrent buffer to consume from.
         * @param id     the ID of this consumer thread.
         */
        Consumer(ConcurrentBuffer<Character> buffer, int id) {
            this.buffer = buffer;
            this.setName("Consumer " + id);
        }

        /**
         * Removes elements until the poison pill shows up, then puts the pill
         * back into the buffer and returns.
         */
        @Override
        public void run() {
            Character item = buffer.remove();

            while (!item.equals(TERMINATION_SENTINEL)) {
                item = buffer.remove();
            }

            // Return the poison pill to the buffer before terminating.
            buffer.add(item);
        }
    }

    /**
     * This class implements producer threads. Each producer adds a fixed
     * number of random upper-case letters to the buffer and then removes
     * itself from the shared set of active producers; the producer that
     * empties the set adds the termination sentinel.
     */
    static class Producer extends Thread {

        /**
         * The number of characters each producer adds to the buffer.
         */
        private static final int ITEMS_PER_PRODUCER = 50;

        /**
         * The set holding all active producers. Must be set via
         * {@link #setProducerSet(Set)} before this thread is started.
         */
        private Set<Producer> activeProducers;

        /**
         * The actual concurrent buffer to produce to.
         */
        private final ConcurrentBuffer<Character> buffer;

        /**
         * Constructs this producer thread.
         * 
         * @param buffer the buffer for producing items.
         * @param id     the ID of this producer thread.
         */
        Producer(ConcurrentBuffer<Character> buffer, int id) {
            this.buffer = buffer;
            this.setName("Producer " + id);
        }

        /**
         * Sets the set of active producer threads.
         * 
         * @param set a set of threads.
         */
        void setProducerSet(Set<Producer> set) {
            activeProducers = set;
        }

        /**
         * The actual code for this producer thread.
         */
        @Override
        public void run() {
            final Random rnd = new Random();

            for (int i = 0; i < ITEMS_PER_PRODUCER; ++i) {
                final Character c = (char)('A' + rnd.nextInt(26));
                buffer.add(c);
            }

            final boolean lastProducer;

            // Removing and checking for emptiness must be one atomic step.
            // Otherwise two producers could both remove themselves, both see
            // an empty set, and both add a sentinel.
            synchronized (activeProducers) {
                activeProducers.remove(this);
                lastProducer = activeProducers.isEmpty();
            }

            if (lastProducer) {
                // The last thread terminates the consumers. This is done
                // outside the synchronized block because add() may block.
                buffer.add(TERMINATION_SENTINEL);
            }
        }
    }

    /**
     * Implements a concurrent buffer queue: a fixed-capacity FIFO stored in a
     * circular array. Producers block while the buffer is full and consumers
     * block while it is empty.
     * 
     * @param <E> the actual type of elements.
     */
    static class ConcurrentBuffer<E> {

        /**
         * The default capacity of this buffer.
         */
        private static final int DEFAULT_CAPACITY = 20;

        /**
         * A binary semaphore (mutex) for synchronizing the access to internals
         * of this buffer.
         */
        private final Semaphore mutex;

        /**
         * Guards against the empty buffer.
         */
        private final Semaphore fillCount;

        /**
         * Guards against the full buffer.
         */
        private final Semaphore emptyCount;

        /**
         * The actual storage array.
         */
        private final Object[] storage;

        /**
         * The index of the head element. Always within
         * <code>[0, storage.length)</code>.
         */
        private int index;

        /**
         * The size of this buffer.
         */
        private int size;

        /**
         * Constructs this buffer with the default capacity.
         */
        ConcurrentBuffer() {
            this(DEFAULT_CAPACITY);
        }

        /**
         * Constructs this buffer with room for <code>capacity</code> elements.
         * 
         * @param capacity the maximum number of elements held at once.
         * @throws IllegalArgumentException if <code>capacity</code> is less
         *                                  than one.
         */
        ConcurrentBuffer(int capacity) {
            if (capacity < 1) {
                throw new IllegalArgumentException(
                        "The buffer capacity too small: " + capacity + ", " +
                        "should be at least 1.");
            }

            this.mutex = new Semaphore(1);
            this.fillCount = new Semaphore(0);
            this.emptyCount = new Semaphore(capacity);
            this.storage = new Object[capacity];
        }

        /**
         * Appends <code>element</code> to the tail of this buffer. If this 
         * buffer is full, blocks the calling thread until some space becomes
         * available.
         * 
         * @param element the element to append.
         */
        void add(E element) {
            emptyCount.lock();
            mutex.lock();

            storage[(index + size) % storage.length] = element;
            ++size;

            System.out.println(Thread.currentThread().getName() + " produced " +
                               element + ": " + this);

            mutex.unlock();
            fillCount.unlock();
        }

        /**
         * Removes the element at the head of this buffer. If this buffer is
         * empty, blocks the calling thread until some content appears in this
         * buffer.
         * 
         * @return the element at the head of this buffer.
         */
        E remove() {
            fillCount.lock();
            mutex.lock();

            // Safe: add(E) is the only method that stores into the array.
            @SuppressWarnings("unchecked")
            final E ret = (E) storage[index];

            // Drop the reference so the buffer does not keep a consumed
            // element reachable until the slot is overwritten.
            storage[index] = null;
            index = (index + 1) % storage.length;
            --size;

            System.out.println(Thread.currentThread().getName() + " consumed " +
                               ret + ": " + this);

            mutex.unlock();
            emptyCount.unlock();

            return ret;
        }

        /**
         * Returns the string representation of the contents of this buffer. 
         * This method is not synchronized.
         * 
         * @return a string.
         */
        @Override
        public String toString() {
            final StringBuilder sb = new StringBuilder("[");

            for (int i = index, j = 0; 
                    j < size; 
                    ++j, i = (i + 1) % storage.length) {
                sb.append(storage[i]);

                if (j < size - 1) {
                    sb.append(", ");
                }
            }

            return sb.append("]").toString();
        }
    }

    /**
     * Runs the demonstration: creates the requested consumers and producers
     * around one shared buffer, gives every producer the same synchronized
     * set of all producers, then starts the producers followed by the
     * consumers.
     * 
     * @param consumerAmount the amount of consumers to use.
     * @param producerAmount the amount of producers to use.
     */
    public static void run(int consumerAmount, int producerAmount) {
        final Consumer[] consumers = new Consumer[consumerAmount];
        final Producer[] producers = new Producer[producerAmount];
        final ConcurrentBuffer<Character> buffer = new ConcurrentBuffer<>();

        for (int id = 0; id != consumers.length; ++id) {
            consumers[id] = new Consumer(buffer, id);
        }

        for (int id = 0; id != producers.length; ++id) {
            producers[id] = new Producer(buffer, id);
        }

        // Collect every producer, then expose the set through a synchronized
        // view shared by all of them.
        final Set<Producer> allProducers = new HashSet<>(producerAmount);
        Collections.addAll(allProducers, producers);
        final Set<Producer> sharedView =
                Collections.synchronizedSet(allProducers);

        // Every producer gets the shared set before any thread is started.
        for (int i = 0; i != producers.length; ++i) {
            producers[i].setProducerSet(sharedView);
        }

        for (int i = 0; i != producers.length; ++i) {
            producers[i].start();
        }

        for (int i = 0; i != consumers.length; ++i) {
            consumers[i].start();
        }
    }

    /**
     * Program entry point: runs the demonstration with two consumers and
     * three producers.
     * 
     * @param args ignored.
     */
    public static void main(String... args) {
        final int consumerAmount = 2;
        final int producerAmount = 3;

        run(consumerAmount, producerAmount);
    }
}

So, what do you think?

share|improve this question

Your Answer

 
discard

By posting your answer, you agree to the privacy policy and terms of service.

Browse other questions tagged or ask your own question.