I wanted to write some more JNI code and decided to (re)implement a semaphore for Mac OS X. To gain some confidence in the correctness of my implementation, I am also supplying a multi-consumer/multi-producer demo.
semaphore_impl.h (autogenerated by javah):
#include <jni.h>

/*
 * Declarations of the native methods of
 * net.coderodde.util.concurrent.MacOSXSemaphoreImpl.
 * Each preprocessor directive must sit on its own line.
 */
#ifndef MACOSX_SEMAPHORE_IMPL_H
#define MACOSX_SEMAPHORE_IMPL_H

#ifdef __cplusplus
extern "C" {
#endif

/* Native counterpart of MacOSXSemaphoreImpl.init(int). */
JNIEXPORT void JNICALL
Java_net_coderodde_util_concurrent_MacOSXSemaphoreImpl_init(JNIEnv*,
                                                            jobject,
                                                            jint);

/* Native counterpart of MacOSXSemaphoreImpl.clean(). */
JNIEXPORT void JNICALL
Java_net_coderodde_util_concurrent_MacOSXSemaphoreImpl_clean(JNIEnv*,
                                                             jobject);

/* Native counterpart of MacOSXSemaphoreImpl.lock(). */
JNIEXPORT void JNICALL
Java_net_coderodde_util_concurrent_MacOSXSemaphoreImpl_lock(JNIEnv*,
                                                            jobject);

/* Native counterpart of MacOSXSemaphoreImpl.unlock(). */
JNIEXPORT void JNICALL
Java_net_coderodde_util_concurrent_MacOSXSemaphoreImpl_unlock(JNIEnv*,
                                                              jobject);

#ifdef __cplusplus
}
#endif

#endif // MACOSX_SEMAPHORE_IMPL_H
semaphore_impl.cpp:
#include "semaphore_impl.h"
#include <mach/task.h>
#include <mach/semaphore.h>
#include <mach/mach.h>
// JNI-style (slash-separated) name of the Java class whose field is looked up
// by the native functions below.
static const char* CLASS = "net/coderodde/util/concurrent/MacOSXSemaphoreImpl";
// Name of the Java field that stores the semaphore handle; it is accessed
// with the JNI signature "I" (int).
static const char* FIELD = "semaphoreHandle";
/*
 * Reads the semaphore handle stored in the 'semaphoreHandle' field of the
 * Java object 'obj'.
 *
 * Returns MACH_PORT_NULL if the class or the field cannot be resolved. In that
 * case the failing JNI call has left a Java exception pending, which is raised
 * as soon as the calling native method returns.
 */
static semaphore_t get_semaphore_handle(JNIEnv* env, jobject obj)
{
    const jclass clazz = env->FindClass(CLASS);

    if (clazz == nullptr)
    {
        // Passing a null class on to GetFieldID is not allowed.
        return MACH_PORT_NULL;
    }

    const jfieldID fid = env->GetFieldID(clazz, FIELD, "I");

    if (fid == nullptr)
    {
        // Passing a null field ID on to GetIntField is not allowed.
        return MACH_PORT_NULL;
    }

    return static_cast<semaphore_t>(env->GetIntField(obj, fid));
}
/*
 * Native implementation of MacOSXSemaphoreImpl.init(int): creates a FIFO Mach
 * semaphore with 'count' initial permits and stores its handle in the
 * 'semaphoreHandle' field of 'obj'.
 *
 * If the field cannot be resolved, the exception raised by the failing JNI
 * lookup is left pending. If the semaphore cannot be created, an
 * IllegalStateException is thrown. In neither case is a handle stored.
 */
JNIEXPORT void JNICALL
Java_net_coderodde_util_concurrent_MacOSXSemaphoreImpl_init(JNIEnv* env,
                                                            jobject obj,
                                                            jint count)
{
    // Resolve the 'semaphoreHandle' field first, so that no kernel object is
    // created (and leaked) when the lookup fails.
    const jclass clazz = env->FindClass(CLASS);

    if (clazz == nullptr)
    {
        return;
    }

    const jfieldID semaphore_field_id = env->GetFieldID(clazz, FIELD, "I");

    if (semaphore_field_id == nullptr)
    {
        return;
    }

    // Create a handle to a semaphore.
    semaphore_t semaphore;
    const kern_return_t status = semaphore_create(current_task(),
                                                  &semaphore,
                                                  SYNC_POLICY_FIFO,
                                                  count);

    if (status != KERN_SUCCESS)
    {
        // 'semaphore' is not valid here; report instead of storing garbage.
        const jclass exception_class =
        env->FindClass("java/lang/IllegalStateException");

        if (exception_class != nullptr)
        {
            env->ThrowNew(exception_class,
                          "Could not create a Mach semaphore.");
        }

        return;
    }

    // Store the value of the semaphore to the Java semaphore object.
    env->SetIntField(obj, semaphore_field_id, static_cast<jint>(semaphore));
}
/*
 * Native implementation of MacOSXSemaphoreImpl.clean(): destroys the Mach
 * semaphore whose handle is stored in 'obj'.
 */
JNIEXPORT void JNICALL
Java_net_coderodde_util_concurrent_MacOSXSemaphoreImpl_clean(JNIEnv* env,
                                                             jobject obj)
{
    const semaphore_t semaphore = get_semaphore_handle(env, obj);

    // semaphore_destroy expects the owning task first and the semaphore
    // second, mirroring the argument order of semaphore_create.
    semaphore_destroy(current_task(), semaphore);
}
/*
 * Native implementation of MacOSXSemaphoreImpl.lock(): waits on the Mach
 * semaphore whose handle is stored in 'obj'.
 */
JNIEXPORT void JNICALL
Java_net_coderodde_util_concurrent_MacOSXSemaphoreImpl_lock(JNIEnv* env,
                                                            jobject obj)
{
    const semaphore_t semaphore = get_semaphore_handle(env, obj);
    kern_return_t status;

    // An aborted wait returns KERN_ABORTED without having acquired a permit.
    // Retry, so that the caller never proceeds without actually holding one.
    do
    {
        status = semaphore_wait(semaphore);
    }
    while (status == KERN_ABORTED);
}
/*
 * Native implementation of MacOSXSemaphoreImpl.unlock(): signals the Mach
 * semaphore whose handle is stored in 'obj'.
 */
JNIEXPORT void JNICALL
Java_net_coderodde_util_concurrent_MacOSXSemaphoreImpl_unlock(JNIEnv* env,
                                                              jobject obj)
{
    const semaphore_t handle = get_semaphore_handle(env, obj);
    semaphore_signal(handle);
}
Makefile:
# Directory containing jni.h. Override on the command line for other SDKs,
# e.g. `make JNI_INCLUDE=/path/to/jni/headers`.
JNI_INCLUDE ?= /Developer/SDKs/MacOSX10.6.sdk/System/Library/Frameworks/JavaVM.framework/Versions/A/Headers/

# Links the JNI library and copies it to /usr/local/lib.
# Recipe lines must begin with a tab character.
libsemaphore.jnilib: semaphore_impl.o
	g++ -dynamiclib -o libsemaphore.jnilib semaphore_impl.o
	cp libsemaphore.jnilib /usr/local/lib

# Compiles the native implementation; rebuilt when the header changes as well.
semaphore_impl.o: semaphore_impl.cpp semaphore_impl.h
	g++ -std=c++11 -O3 -I$(JNI_INCLUDE) -c semaphore_impl.cpp
MacOSXSemaphoreImpl.java:
package net.coderodde.util.concurrent;
import java.io.File;
/**
 * This class implements the semaphore type interfacing with MacOSX. All
 * semaphore operations are native methods provided by the
 * <code>semaphore</code> library, which is loaded when this class is
 * initialized.
 *
 * @author Rodion "rodde" Efremov
 * @version 1.6
 */
final class MacOSXSemaphoreImpl implements SemaphoreImpl {

    static {
        try {
            // Putting the .jnilib into working directory is
            // a safe choice. 'System.getProperty("user.dir")'.
            System.loadLibrary("semaphore");
        } catch (UnsatisfiedLinkError error) {
            // Without the native library no method of this class can work.
            error.printStackTrace();
            System.exit(1);
        }
    }

    /**
     * Holds the handle to a semaphore. The native code accesses this field by
     * its name, so it must not be renamed or retyped.
     */
    private int semaphoreHandle;

    /**
     * Constructs a semaphore with <code>counter</code> permits.
     *
     * @param counter the amount of permits.
     */
    MacOSXSemaphoreImpl(int counter) {
        init(counter);
    }

    /**
     * Creates the semaphore and loads its handle into
     * <code>semaphoreHandle</code>.
     *
     * @param counter the initial amount of permits.
     */
    @Override
    public native void init(int counter);

    /**
     * Releases resources.
     */
    @Override
    public native void clean();

    /**
     * Acquires this semaphore. If the current counter of this semaphore is
     * zero, the calling thread is blocked.
     */
    @Override
    public native void lock();

    /**
     * Releases this semaphore. Effectively increments the counter of this
     * semaphore so that other threads may acquire this semaphore.
     */
    @Override
    public native void unlock();

    /**
     * Release the resources associated with this semaphore.
     *
     * @throws Throwable whatever <code>clean()</code> or the superclass
     *                   finalizer throws; nothing is silently swallowed.
     */
    @Override
    protected void finalize() throws Throwable {
        try {
            clean();
        } finally {
            // Run the superclass finalizer even if the cleanup fails.
            super.finalize();
        }
    }
}
SemaphoreImpl.java:
package net.coderodde.util.concurrent;
/**
 * Package-private contract that every platform-specific semaphore
 * implementation fulfils.
 *
 * @author Rodion "rodde" Efremov
 * @version 1.6
 */
interface SemaphoreImpl {

    /**
     * Sets up the semaphore.
     *
     * @param counter how many threads may lock the semaphore before any of
     *                them has to block.
     */
    void init(int counter);

    /**
     * Frees every resource held by the semaphore.
     */
    void clean();

    /**
     * Acquires the semaphore.
     */
    void lock();

    /**
     * Releases the semaphore.
     */
    void unlock();
}
Semaphore.java:
package net.coderodde.util.concurrent;
/**
 * A counting semaphore for MacOSX. Every operation is forwarded to a
 * platform-specific {@code SemaphoreImpl}.
 *
 * @author Rodion "rodde" Efremov
 * @version 1.6
 */
public class Semaphore {

    /**
     * The implementation object that performs the actual work.
     */
    private final SemaphoreImpl impl;

    /**
     * Creates a semaphore holding <code>counter</code> permits.
     *
     * @param counter how many threads may lock this semaphore before any of
     *                them blocks.
     * @throws IllegalArgumentException if <code>counter</code> is negative.
     */
    public Semaphore(int counter) {
        // Validation happens before the implementation object is created.
        this.impl = new MacOSXSemaphoreImpl(requireNonNegative(counter));
    }

    /**
     * Acquires this semaphore.
     */
    public void lock() {
        impl.lock();
    }

    /**
     * Releases this semaphore.
     */
    public void unlock() {
        impl.unlock();
    }

    /**
     * Validates the initial amount of permits.
     *
     * @param counter the initial amount of permits.
     * @return <code>counter</code>, if it is not negative.
     * @throws IllegalArgumentException if <code>counter</code> is negative.
     */
    private static int requireNonNegative(int counter) {
        if (counter >= 0) {
            return counter;
        }

        throw new IllegalArgumentException(
                "The semaphore counter too small: " + counter
                        + ", should be at least 0.");
    }
}
Demo.java:
package net.coderodde.util.concurrent;
import java.util.Collections;
import java.util.HashSet;
import java.util.Random;
import java.util.Set;
/**
 * This class implements a demonstration for the semaphore: several producer
 * threads append random characters to a bounded buffer while several consumer
 * threads remove them. The buffer is synchronized exclusively with
 * {@code Semaphore} objects.
 *
 * @author Rodion "rodde" Efremov
 * @version 1.6
 */
public class Demo {

    /**
     * This character denotes so called "poison pill" for communicating to the
     * consumers that they should exit.
     */
    private static final Character TERMINATION_SENTINEL = '\u2622';

    /**
     * The number of characters each producer thread produces.
     */
    private static final int ITEMS_PER_PRODUCER = 50;

    /**
     * Implements a consumer thread.
     */
    static class Consumer extends Thread {

        /**
         * The buffer to consume from.
         */
        private final ConcurrentBuffer<Character> buffer;

        /**
         * Constructs this consumer thread.
         *
         * @param buffer the concurrent buffer to consume from.
         * @param id     the ID of this consumer thread.
         */
        Consumer(ConcurrentBuffer<Character> buffer, int id) {
            this.buffer = buffer;
            this.setName("Consumer " + id);
        }

        /**
         * The actual code of this consumer thread: removes elements until the
         * poison pill shows up.
         */
        @Override
        public void run() {
            for (;;) {
                final Character c = buffer.remove();

                if (c.equals(TERMINATION_SENTINEL)) {
                    // We have a poison pill. Put it back in the buffer, so
                    // that the other consumers see it too, and terminate
                    // this thread.
                    buffer.add(c);
                    return;
                }
            }
        }
    }

    /**
     * This class implements producer threads.
     */
    static class Producer extends Thread {

        /**
         * The set holding all active producers. It is shared by all the
         * producers and must be set before this thread is started.
         */
        private Set<Producer> activeProducers;

        /**
         * The actual concurrent buffer to produce to.
         */
        private final ConcurrentBuffer<Character> buffer;

        /**
         * Constructs this producer thread.
         *
         * @param buffer the buffer for producing items.
         * @param id     the ID of this producer thread.
         */
        Producer(ConcurrentBuffer<Character> buffer, int id) {
            this.buffer = buffer;
            this.setName("Producer " + id);
        }

        /**
         * Sets the set of active producer threads.
         *
         * @param set a set of threads.
         */
        void setProducerSet(Set<Producer> set) {
            activeProducers = set;
        }

        /**
         * The actual code for this producer thread: produces random capital
         * letters and, if this is the last producer to finish, the poison
         * pill.
         */
        @Override
        public void run() {
            final Random rnd = new Random();

            for (int i = 0; i < ITEMS_PER_PRODUCER; ++i) {
                final Character c = (char) ('A' + rnd.nextInt(26));
                buffer.add(c);
            }

            final boolean lastProducer;

            // The removal and the emptiness check must be one atomic step.
            // Otherwise two producers may both remove themselves first and
            // then both observe an empty set, producing several poison pills.
            synchronized (activeProducers) {
                activeProducers.remove(this);
                lastProducer = activeProducers.isEmpty();
            }

            if (lastProducer) {
                // The last thread terminates the consumers. This is done
                // outside of the synchronized block as 'add' may block.
                buffer.add(TERMINATION_SENTINEL);
            }
        }
    }

    /**
     * Implements a concurrent buffer queue.
     *
     * @param <E> the actual type of elements.
     */
    static class ConcurrentBuffer<E> {

        /**
         * The default capacity of this buffer.
         */
        private static final int DEFAULT_CAPACITY = 20;

        /**
         * A binary semaphore (mutex) for synchronizing the access to internals
         * of this buffer.
         */
        private final Semaphore mutex;

        /**
         * Guards against the empty buffer.
         */
        private final Semaphore fillCount;

        /**
         * Guards against the full buffer.
         */
        private final Semaphore emptyCount;

        /**
         * The actual storage array.
         */
        private final Object[] storage;

        /**
         * The index of the head element. Always within the bounds of
         * <code>storage</code>.
         */
        private int index;

        /**
         * The size of this buffer.
         */
        private int size;

        /**
         * Constructs this buffer.
         */
        ConcurrentBuffer() {
            this.mutex = new Semaphore(1);
            this.fillCount = new Semaphore(0);
            this.emptyCount = new Semaphore(DEFAULT_CAPACITY);
            this.storage = new Object[DEFAULT_CAPACITY];
        }

        /**
         * Appends <code>element</code> to the tail of this buffer. If this
         * buffer is full, blocks the calling thread until some space becomes
         * available.
         *
         * @param element the element to append.
         */
        void add(E element) {
            emptyCount.lock();
            mutex.lock();

            storage[(index + size) % storage.length] = element;
            ++size;
            System.out.println(Thread.currentThread().getName() + " produced " +
                               element + ": " + this);

            mutex.unlock();
            fillCount.unlock();
        }

        /**
         * Removes the element at the head of this buffer. If this buffer is
         * empty, blocks the calling thread until some content appears in this
         * buffer.
         *
         * @return the element at the head of this buffer.
         */
        E remove() {
            fillCount.lock();
            mutex.lock();

            // Only 'add(E)' writes to the storage array, so the cast is safe.
            @SuppressWarnings("unchecked")
            final E ret = (E) storage[index];
            // Do not keep the removed element reachable through the array.
            storage[index] = null;
            index = (index + 1) % storage.length;
            --size;
            System.out.println(Thread.currentThread().getName() + " consumed " +
                               ret + ": " + this);

            mutex.unlock();
            emptyCount.unlock();
            return ret;
        }

        /**
         * Returns the string representation of the contents of this buffer.
         * This method is not synchronized.
         *
         * @return a string.
         */
        @Override
        public String toString() {
            final StringBuilder sb = new StringBuilder("[");

            for (int i = index, j = 0;
                    j < size;
                    ++j, i = (i + 1) % storage.length) {
                sb.append(storage[i]);

                if (j < size - 1) {
                    sb.append(", ");
                }
            }

            return sb.append("]").toString();
        }
    }

    /**
     * Implements a demonstration.
     *
     * @param consumerAmount the amount of consumers to use.
     * @param producerAmount the amount of producers to use.
     */
    public static void run(int consumerAmount, int producerAmount) {
        final Consumer[] consumers = new Consumer[consumerAmount];
        final Producer[] producers = new Producer[producerAmount];
        final ConcurrentBuffer<Character> buffer = new ConcurrentBuffer<>();

        for (int i = 0; i < consumerAmount; ++i) {
            consumers[i] = new Consumer(buffer, i);
        }

        for (int i = 0; i < producerAmount; ++i) {
            producers[i] = new Producer(buffer, i);
        }

        final Set<Producer> producerSet = new HashSet<>(producerAmount);
        Collections.addAll(producerSet, producers);

        final Set<Producer> synchronizedSet =
                Collections.synchronizedSet(producerSet);

        for (final Producer p : producers) {
            p.setProducerSet(synchronizedSet);
        }

        for (final Producer p : producers) {
            p.start();
        }

        for (final Consumer c : consumers) {
            c.start();
        }

        if (producerAmount == 0 && consumerAmount > 0) {
            // With no producers nobody would ever supply the poison pill and
            // the consumers would block forever.
            buffer.add(TERMINATION_SENTINEL);
        }
    }

    /**
     * Runs the demonstration with two consumers and three producers.
     *
     * @param args ignored.
     */
    public static void main(String... args) {
        run(2, 3);
    }
}
So, what do you think?