The code here will be directly pasted from this project. Quick summary: it is related to a Stack Overflow question.
So, basically, all the code below aims to implement CharSequence
over a large text file, within the limitation of CharSequence
itself (that is, up to Integer.MAX_VALUE
chars). All the code below is extracted from the same package.
The process is as follows:
- a
TextDecoder
is issued for one file which (supposedly) contains text; it will decode byte sequences into char sequences; - it instantiates one instance of a
DecodingStatus
; this class will keep track of how many characters were successfully decoded so far, and if necessary will report an error instead; - processes "waiting" for a given amount of characters will create a
CharWaiter
instance; they will be queued into the DecodingStatus
only if their requirement (character offset) has not been met; - should a decoding error happen, the
DecodingStatus
instance will terminate all CharWaiter
instances queued on it.
OK; so, first things first: the code works and is mostly tested; my problem is that it is ugly. In particular, in DecodingStatus
, all methods except one are synchronized. Also, error "recovery" code is duplicated in several places.
How can I improve upon this code? In particular, can I avoid making (nearly) all methods in DecodingStatus
synchronized
? Can I avoid duplicating error checking?
First, the CharWaiter
:
import javax.annotation.Nonnull;
import java.io.IOException;
import java.util.PriorityQueue;
import java.util.concurrent.CountDownLatch;
/**
 * A one-shot waiter for a given number of characters to become available in a
 * {@link TextDecoder}
 *
 * <p>A caller blocks in {@link #await()} until another thread calls {@link
 * #wakeUp()}. Once released, the waiter inspects the outcome recorded on it:
 * a recorded exception is rethrown wrapped in a {@link RuntimeException}, and
 * a recorded character count lower than the requested one results in an
 * {@link ArrayIndexOutOfBoundsException}.</p>
 *
 * <p>The outcome fields are plain fields; the thread recording the outcome
 * is expected to call {@link #setNrChars(int)} or {@link
 * #setException(IOException)} <em>before</em> {@link #wakeUp()}, so that the
 * internal latch publishes the values to the awaiting thread.</p>
 *
 * <p>Instances are ordered by their required number of characters (see {@link
 * Comparable}), which allows them to be stored in a {@link PriorityQueue}.</p>
 *
 * <p>Inspired from <a href="http://stackoverflow.com/a/22055231/1093528">this
 * StackOverflow answer</a>.</p>
 *
 * @see DecodingStatus
 * @see TextDecoder#needChars(int)
 */
final class CharWaiter
    implements Comparable<CharWaiter>
{
    /** Released exactly once, by {@link #wakeUp()}. */
    private final CountDownLatch latch = new CountDownLatch(1);

    /** Number of characters this waiter needs. */
    private final int required;

    /** Failure recorded by the decoding side, if any. */
    private IOException exception;

    /** Number of characters reported as available when woken up. */
    private int nrChars;

    /**
     * Create a waiter for a given number of characters
     *
     * @param required the number of characters needed
     * @throws ArrayIndexOutOfBoundsException {@code required} is negative
     */
    CharWaiter(final int required)
    {
        if (required < 0) {
            throw new ArrayIndexOutOfBoundsException(required);
        }
        this.required = required;
    }

    /**
     * Get the number of characters this waiter needs
     *
     * @return the value given at construction time
     */
    int getRequired()
    {
        return required;
    }

    /**
     * Record the number of characters available
     *
     * @param nrChars the number of characters
     */
    void setNrChars(final int nrChars)
    {
        this.nrChars = nrChars;
    }

    /**
     * Record a decoding failure
     *
     * @param exception the failure
     */
    void setException(final IOException exception)
    {
        this.exception = exception;
    }

    /**
     * Release the thread blocked in {@link #await()}, if any
     */
    void wakeUp()
    {
        latch.countDown();
    }

    /**
     * Block until {@link #wakeUp()} is called, then check the outcome
     *
     * @throws InterruptedException the thread was interrupted while waiting
     * @throws RuntimeException a decoding failure was recorded
     * @throws ArrayIndexOutOfBoundsException fewer characters than required
     * were recorded as available
     */
    void await()
        throws InterruptedException
    {
        latch.await();

        // A recorded failure takes precedence over the character count
        final IOException failure = exception;
        if (failure != null) {
            throw new RuntimeException("decoding error", failure);
        }

        if (required > nrChars) {
            throw new ArrayIndexOutOfBoundsException(required);
        }
    }

    @Override
    public int compareTo(@Nonnull final CharWaiter o)
    {
        return Integer.compare(getRequired(), o.getRequired());
    }

    @Override
    public String toString()
    {
        return new StringBuilder("waiting for ")
            .append(required)
            .append(" character(s)")
            .toString();
    }
}
Then, the DecodingStatus
:
import javax.annotation.concurrent.ThreadSafe;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
/**
 * The watchdog class for a text decoding operation
 *
 * <p>This class takes care of {@link CharWaiter}s and callers to {@link
 * TextDecoder#getTotalChars()}.</p>
 *
 * <p>The decoding process in {@link TextDecoder} will update the internal
 * status of this object when the decoding operation makes progress; on an
 * update, this class will wake up the relevant waiters.</p>
 *
 * <p>In the event of an error, or when decoding ends, all remaining waiters
 * are woken up.</p>
 *
 * <p>All state is guarded by this object's monitor. The only unsynchronized
 * method is {@link #getTotalSize()}, which must not hold the monitor while it
 * blocks; it relies on the end latch to publish the final state instead.</p>
 *
 * @see CharWaiter
 */
@ThreadSafe
final class DecodingStatus
{
    private boolean finished = false;
    private int nrChars = -1;
    private IOException exception = null;
    private final Queue<CharWaiter> waiters = new PriorityQueue<>();
    private final CountDownLatch endLatch = new CountDownLatch(1);

    /**
     * Register a waiter, unless its requirement can already be decided
     *
     * @param waiter the waiter
     * @return true if the waiter was queued (the caller must then wait on
     * it); false if enough characters are already available
     * @throws RuntimeException the decoding operation has failed
     * @throws ArrayIndexOutOfBoundsException decoding is finished and fewer
     * characters than required are available
     */
    synchronized boolean addWaiter(final CharWaiter waiter)
    {
        checkNotFailed();

        final int required = waiter.getRequired();
        if (required <= nrChars)
            return false;

        // From here on, required > nrChars: either more characters may still
        // come, or the request can never be satisfied
        if (finished)
            throw new ArrayIndexOutOfBoundsException(required);

        waiters.add(waiter);
        return true;
    }

    /**
     * Record decoding progress and wake up all waiters which are satisfied
     *
     * @param nrChars the number of characters decoded so far
     */
    synchronized void setNrChars(final int nrChars)
    {
        this.nrChars = nrChars;

        // The queue is ordered by requirement: stop at the first waiter which
        // needs more than what is available
        CharWaiter head;
        while ((head = waiters.peek()) != null
            && head.getRequired() <= nrChars) {
            waiters.remove();
            head.setNrChars(nrChars);
            head.wakeUp();
        }
    }

    /**
     * Record a decoding failure; wake up all waiters and all callers of
     * {@link #getTotalSize()}
     *
     * @param exception the failure
     */
    synchronized void setFailed(final IOException exception)
    {
        this.exception = exception;
        releaseAll();
    }

    /**
     * Record the end of the decoding operation; wake up all waiters and all
     * callers of {@link #getTotalSize()}
     *
     * @param nrChars the total number of characters decoded
     */
    synchronized void setFinished(final int nrChars)
    {
        finished = true;
        this.nrChars = nrChars;
        releaseAll();
    }

    /**
     * Get the total number of characters, waiting for the decoding operation
     * to end if necessary
     *
     * <p>This method is deliberately not synchronized: it blocks on the end
     * latch, and the latch is only released after the final state has been
     * written, which makes that state visible here.</p>
     *
     * @return the total number of characters
     * @throws RuntimeException the thread was interrupted (its interruption
     * status is restored), or the decoding operation has failed
     */
    int getTotalSize()
    {
        try {
            endLatch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("interrupted", e);
        }
        checkNotFailed();
        return nrChars;
    }

    @Override
    public synchronized String toString()
    {
        if (exception != null)
            return "decoding error after reading " + nrChars + " character(s)";
        return "currently decoded: " + nrChars + " character(s); finished: "
            + finished;
    }

    /**
     * Single place where a recorded failure is turned into an exception
     */
    private void checkNotFailed()
    {
        if (exception != null)
            throw new RuntimeException("decoding error", exception);
    }

    /**
     * Hand the current outcome to every queued waiter, wake them all up, then
     * release the end latch; must be called with the monitor held
     */
    private void releaseAll()
    {
        CharWaiter waiter;
        while ((waiter = waiters.poll()) != null) {
            // A waiter checks for a failure before it looks at the character
            // count, so only one of the two needs to be recorded
            if (exception != null)
                waiter.setException(exception);
            else
                waiter.setNrChars(nrChars);
            waiter.wakeUp();
        }
        endLatch.countDown();
    }
}
And finally, the TextDecoder
:
import com.github.fge.largetext.LargeText;
import com.github.fge.largetext.LargeTextFactory;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Range;
import com.google.common.collect.RangeMap;
import com.google.common.collect.TreeRangeMap;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
import java.io.Closeable;
import java.io.IOException;
import java.nio.CharBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CoderResult;
import java.nio.charset.CodingErrorAction;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
/**
 * Text file decoder
 *
 * <p>This is the first core class of this package (the second is {@link
 * TextLoader}). Its role is to decode a text file chunk by chunk. The size of
 * chunks to use is determined when you build your {@link LargeTextFactory}.</p>
 *
 * <p>{@link LargeText} will call upon this class to obtain a {@link TextRange}
 * (or a list of them) containing the character at a given index (or the range
 * of characters), by using the methods {@link #getRange(int)} and {@link
 * #getRanges(com.google.common.collect.Range)} respectively.</p>
 *
 * <p>These methods are blocking, but they <em>do not</em> throw {@link
 * InterruptedException}; if an interruption occurs, these methods reset the
 * thread interruption status and throw a {@link RuntimeException}. They also
 * throw an {@link ArrayIndexOutOfBoundsException} if the requested offset
 * exceeds the number of characters in the file.</p>
 *
 * <p>Implementation note: this class uses a <em>single threaded</em> {@link
 * ExecutorService} to perform the decoding operation. Decoding is not done in
 * parallel, and cannot be, since it is not guaranteed that a byte mapping can
 * be decoded exactly to a character sequence (for instance, using UTF-8, the
 * end of the mapping may contain one byte only of a three-byte sequence).</p>
 *
 * @see DecodingStatus
 */
@ThreadSafe
public final class TextDecoder
    implements Closeable
{
    private static final ThreadFactory THREAD_FACTORY
        = new ThreadFactoryBuilder().setDaemon(true).build();

    private final ExecutorService executor
        = Executors.newSingleThreadExecutor(THREAD_FACTORY);

    private final DecodingStatus status = new DecodingStatus();

    @GuardedBy("ranges")
    private final RangeMap<Integer, TextRange> ranges = TreeRangeMap.create();

    private final FileChannel channel;
    private final Charset charset;
    private final long fileSize;
    private final long targetMapSize;

    /**
     * Constructor; don't use directly!
     *
     * <p>The decoding task is submitted to the executor as the last step of
     * construction.</p>
     *
     * @param channel the {@link FileChannel} to the target file
     * @param charset the character encoding to use
     * @param targetMapSize the target byte mapping size
     * @throws IOException error obtaining information on the channel
     */
    public TextDecoder(final FileChannel channel, final Charset charset,
        final long targetMapSize)
        throws IOException
    {
        this.channel = channel;
        fileSize = channel.size();
        this.targetMapSize = targetMapSize;
        this.charset = charset;
        executor.submit(decodingTask());
    }

    /**
     * Return the appropriate text range containing the character at the given
     * offset
     *
     * @param charOffset the offset
     * @return the appropriate {@link TextRange}
     * @throws RuntimeException method has been interrupted, or a decoding error
     * has occurred
     * @throws ArrayIndexOutOfBoundsException offset requested is out of range
     */
    public TextRange getRange(final int charOffset)
    {
        // -1 would otherwise turn into a request for zero characters, which
        // succeeds, and the lookup below would then silently return null
        if (charOffset < 0)
            throw new ArrayIndexOutOfBoundsException(charOffset);

        requireChars(charOffset + 1);
        synchronized (ranges) {
            return ranges.get(charOffset);
        }
    }

    /**
     * Return an ordered iterable of text ranges covering the requested range
     *
     * @param range the range
     * @return the appropriate list of text ranges
     * @throws RuntimeException method has been interrupted, or a decoding error
     * has occurred
     * @throws ArrayIndexOutOfBoundsException range is out of bounds for this
     * decoder
     */
    public List<TextRange> getRanges(final Range<Integer> range)
    {
        requireChars(range.upperEndpoint());
        synchronized (ranges) {
            // subRangeMap() and asMapOfRanges() return live views of the map:
            // the copy must be made while the lock is held, since the decoding
            // thread may be adding ranges concurrently
            return ImmutableList.copyOf(
                ranges.subRangeMap(range).asMapOfRanges().values());
        }
    }

    /**
     * Return the total number of characters in this decoder
     *
     * <p>This method sleeps until the decoding operation finishes (either
     * successfully or with an error).</p>
     *
     * @return the total number of characters
     * @throws RuntimeException method has been interrupted, or a decoding error
     * has occurred
     *
     * @see DecodingStatus#getTotalSize()
     */
    public int getTotalChars()
    {
        return status.getTotalSize();
    }

    /**
     * Request an orderly shutdown of the decoding executor
     *
     * <p>This method does not close the {@link FileChannel} given to the
     * constructor.</p>
     *
     * @throws IOException declared by {@link Closeable}; not thrown by this
     * implementation
     */
    @Override
    public void close()
        throws IOException
    {
        executor.shutdown();
    }

    /**
     * Wait until a given number of characters is available
     *
     * @param needed the number of characters
     * @throws InterruptedException the thread was interrupted while waiting
     * @throws RuntimeException a decoding error has occurred
     * @throws ArrayIndexOutOfBoundsException {@code needed} is negative, or
     * the file contains fewer characters than requested
     */
    private void needChars(final int needed)
        throws InterruptedException
    {
        final CharWaiter waiter = new CharWaiter(needed);
        if (status.addWaiter(waiter))
            waiter.await();
    }

    /**
     * Same as {@link #needChars(int)}, except that an interruption restores
     * the thread interruption status and is reported as a {@link
     * RuntimeException}
     *
     * @param needed the number of characters
     */
    private void requireChars(final int needed)
    {
        try {
            needChars(needed);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted", e);
        }
    }

    // TODO: move to another class?
    /**
     * Build the task which decodes the whole file
     *
     * @return the task
     */
    private Runnable decodingTask()
    {
        return new Runnable()
        {
            @Override
            public void run()
            {
                try {
                    decodeAll();
                } catch (RuntimeException e) {
                    // The Future returned by submit() is discarded; without
                    // this, an unchecked failure would go unnoticed and all
                    // waiters would block forever
                    status.setFailed(
                        new IOException("unexpected decoding failure", e));
                }
            }
        };
    }

    /**
     * Decode the file mapping by mapping, publishing each decoded range and
     * then reporting progress to the {@link DecodingStatus}
     */
    private void decodeAll()
    {
        final CharsetDecoder decoder = charset.newDecoder()
            .onMalformedInput(CodingErrorAction.REPORT)
            .onUnmappableCharacter(CodingErrorAction.REPORT);
        final CharBuffer charMap = CharBuffer.allocate((int) targetMapSize);

        long byteOffset = 0L;
        int charOffset = 0;

        while (byteOffset < fileSize) {
            final TextRange range;
            try {
                range = nextRange(byteOffset, charOffset, decoder, charMap);
                // No byte consumed: no progress can ever be made from here
                if (range.getByteRange().isEmpty())
                    throw new IOException("unable to read file as text "
                        + "starting from byte offset " + byteOffset);
            } catch (IOException e) {
                status.setFailed(e);
                return;
            }

            byteOffset = range.getByteRange().upperEndpoint();
            charOffset = range.getCharRange().upperEndpoint();

            // Publish the range BEFORE waking up waiters: a woken up waiter
            // immediately looks its range up in the map
            synchronized (ranges) {
                ranges.put(range.getCharRange(), range);
            }
            status.setNrChars(charOffset);
        }

        status.setFinished(charOffset);
    }

    /**
     * Decode the next byte mapping of the file
     *
     * @param byteOffset the offset in the file where the mapping starts
     * @param charOffset the number of characters decoded so far
     * @param decoder the decoder to use; it is reset by this method
     * @param charMap the buffer to decode into; it is rewound by this method
     * @return the decoded range
     * @throws IOException failure to map the file, or unmappable character
     */
    private TextRange nextRange(final long byteOffset, final int charOffset,
        final CharsetDecoder decoder, final CharBuffer charMap)
        throws IOException
    {
        long nrBytes = Math.min(targetMapSize, fileSize - byteOffset);
        final MappedByteBuffer byteMap
            = channel.map(FileChannel.MapMode.READ_ONLY, byteOffset, nrBytes);

        charMap.rewind();
        decoder.reset();

        final CoderResult result = decoder.decode(byteMap, charMap, true);

        // FIXME: only unmappable input is reported from here; malformed input
        // is always handled as an incomplete trailing sequence (see below)
        if (result.isUnmappable())
            result.throwException();

        /*
         * Incomplete byte sequence: in this case, the mapping position reflects
         * what was actually read; change the mapping size
         */
        if (result.isMalformed())
            nrBytes = byteMap.position();

        return new TextRange(byteOffset, nrBytes, charOffset,
            charMap.position());
    }
}