We wrote a simple server earlier this month and were surprised at how inefficient it was under heavy load.
Following the Agile mantra of doing the simplest thing that could possibly work, we used standard Java classes such as java.io.BufferedReader. To our surprise, it performed very poorly compared with reading the raw InputStream. To illustrate, let's create an interface:
package com.henryp.io.stream;

import java.io.IOException;

public interface StreamReader {
    void readStream() throws IOException;
}
and give it two implementing classes. The first simply wraps a BufferedReader:
package com.henryp.io.stream;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;

public class BufferedStreamReader implements StreamReader {

    private final BufferedReader bufferedReader;

    BufferedStreamReader(InputStream inputStream) {
        // Decodes bytes to chars using the platform default charset.
        Reader reader = new InputStreamReader(inputStream);
        bufferedReader = new BufferedReader(reader);
    }

    @Override
    public void readStream() throws IOException {
        // Drain the stream line by line; each line becomes a String that is then discarded.
        String line = bufferedReader.readLine();
        while (line != null) {
            line = bufferedReader.readLine();
        }
    }
}
The second is a little more sophisticated. It avoids creating Strings and reads into byte arrays instead. What's more, it caches these byte arrays per thread so it can reuse them:
package com.henryp.io.stream;

import java.io.IOException;
import java.io.InputStream;

public class FixedBufferStreamReader implements StreamReader {

    private static final int BUFFER_SIZE = 8192;

    // One buffer per thread per reader instance; declare this static to share buffers across readers.
    private final ThreadLocal<byte[]> buffers = new ThreadLocal<byte[]>();
    private final InputStream inputStream;

    FixedBufferStreamReader(InputStream inputStream) {
        super();
        this.inputStream = inputStream;
    }

    @Override
    public void readStream() throws IOException {
        // Drain the stream into the same buffer over and over; the bytes are discarded.
        byte[] buffer = getBufferForThread();
        int read = inputStream.read(buffer);
        while (read != -1) {
            read = inputStream.read(buffer);
        }
    }

    // Lazily creates the calling thread's buffer on first use.
    private byte[] getBufferForThread() {
        byte[] buffer = buffers.get();
        if (buffer == null) {
            buffer = new byte[BUFFER_SIZE];
            buffers.set(buffer);
        }
        return buffer;
    }
}
Note that neither implementation actually parses the data. Working with raw byte arrays is obviously harder, but if you want better performance, you'll have to take that hit.
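To give a feel for what "harder" means, here is a minimal sketch of doing a little parsing on raw bytes: counting newline-terminated lines without ever creating a String. RawLineCounter and countLines are names made up for this example; they are not part of the server or of the classes above, and real protocol parsing would need to handle much more (character encodings, lines that span buffer boundaries and so on).

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration only.
final class RawLineCounter {

    private static final int BUFFER_SIZE = 8192;

    /** Counts '\n' bytes in the stream by scanning a reusable byte array. */
    static int countLines(InputStream in) throws IOException {
        byte[] buffer = new byte[BUFFER_SIZE];
        int lines = 0;
        int read;
        while ((read = in.read(buffer)) != -1) {
            // Only the first 'read' bytes of the buffer are valid on each pass.
            for (int i = 0; i < read; i++) {
                if (buffer[i] == '\n') {
                    lines++;
                }
            }
        }
        return lines;
    }

    public static void main(String[] args) throws IOException {
        byte[] data = "GET / HTTP/1.1\nHost: example\n\n".getBytes(StandardCharsets.US_ASCII);
        System.out.println("Lines: " + countLines(new ByteArrayInputStream(data)));
    }
}
```

Unlike readLine(), nothing here is copied into a new object per line, which is the trade-off the two implementations above are meant to expose.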
The test harness looks like this:
package com.henryp.io.stream;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

public class StreamReaderStressTest {

    private static final int CONTENT_SIZE = 10000;

    private final int numThreads = 100;
    private final int numIterations = 200;
    // Starts at -1 so that the first incrementAndGet() returns index 0.
    private final AtomicInteger readerIndex = new AtomicInteger(-1);
    private final StreamReader[] streamReaders;
    // Start gate: each thread counts down, then waits until all threads are ready.
    private final CountDownLatch countDownLatch = new CountDownLatch(numThreads);

    public static void main(String[] args) throws InterruptedException {
        long initialMemory = memory();
        System.out.println("Initializing...");
        StreamReaderStressTest app = new StreamReaderStressTest();
        long memoryBeforeRun = memory();
        System.out.println("Sleeping...");
        Thread.sleep(4000L);
        app.runTest();
        long memoryAfterRun = memory();
        System.out.println("Memory consumed during setup: " + (initialMemory - memoryBeforeRun) + " bytes"
                + "\nChange in memory during test: " + (memoryBeforeRun - memoryAfterRun) + " bytes");
    }

    // Requests a GC, then reports the free heap in bytes.
    private static long memory() {
        System.gc();
        long freeMemory = Runtime.getRuntime().freeMemory();
        System.out.println("Free memory: " + (freeMemory / (1024L * 1024L)) + " MiB");
        return freeMemory;
    }

    StreamReaderStressTest() {
        InputStream[] inputStreams = initialize();
        streamReaders = new StreamReader[inputStreams.length];
        for (int i = 0; i < inputStreams.length; i++) {
            // Swap the commented-out line in to test the other implementation.
            streamReaders[i] =
                    //new FixedBufferStreamReader(inputStreams[i]); // 13ms
                    new BufferedStreamReader(inputStreams[i]); // 367ms
        }
    }

    private void runTest() {
        System.out.println("About to start test...");
        StreamReadingThread[] readers = new StreamReadingThread[numThreads];
        Thread[] threads = new Thread[numThreads];
        for (int i = 0; i < numThreads; i++) {
            readers[i] = new StreamReadingThread();
            threads[i] = new Thread(readers[i]);
            threads[i].start();
        }
        waitForThreadsToEnd(threads);
        outputStats(readers);
    }

    private void outputStats(StreamReadingThread[] readers) {
        long totalTime = 0;
        for (int i = 0; i < readers.length; i++) {
            totalTime += readers[i].getDuration();
        }
        System.out.println("\nAverage time per thread: " + (totalTime / numThreads) + "ms\n");
    }

    private void waitForThreadsToEnd(Thread[] threads) {
        for (int i = 0; i < threads.length; i++) {
            try {
                threads[i].join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    class StreamReadingThread implements Runnable {

        // Written by the worker thread, read by the main thread after join().
        private volatile long duration = 0;

        @Override
        public void run() {
            getReady();
            long start = System.currentTimeMillis();
            try {
                // Each iteration claims the next unread stream from the shared array.
                for (int i = 0; i < numIterations; i++) {
                    streamReaders[readerIndex.incrementAndGet()].readStream();
                }
                duration = System.currentTimeMillis() - start;
            } catch (IOException x) {
                x.printStackTrace();
            }
        }

        private void getReady() {
            countDownLatch.countDown();
            try {
                countDownLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        long getDuration() {
            return duration;
        }
    }

    // Creates one in-memory stream per read, each with its own copy of the content.
    private InputStream[] initialize() {
        int numStreams = numThreads * numIterations;
        InputStream[] inputStreams = new InputStream[numStreams];
        String stringForStream = createArbitraryString(CONTENT_SIZE);
        for (int i = 0; i < numStreams; i++) {
            inputStreams[i] = new ByteArrayInputStream(stringForStream.getBytes());
        }
        return inputStreams;
    }

    private String createArbitraryString(int length) {
        StringBuilder szb = new StringBuilder(length);
        for (int i = 0; i < length; i++) {
            szb.append('A');
        }
        return szb.toString();
    }
}
Note:
- All initialization is done up-front.
- A 4 second pause is implemented before the test so there is no interference from biased locking being turned off at start up (see here for a full description).
- Run the JVM with the arguments -Xmx1024m -Xmn1024m so the memory is fully initialized to begin with.
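The harness compares Runtime.freeMemory() readings, which only makes sense if the heap does not grow between readings, hence the advice above to size the heap up front. If you cannot pin the heap size, one alternative is to measure used heap instead. A minimal sketch (HeapUsage is a name made up for this example and is not part of the harness; the figures it prints are approximate because the garbage collector can run at any time):

```java
// Hypothetical helper for illustration only.
final class HeapUsage {

    /** Heap bytes currently in use: the committed heap minus its free part. */
    static long usedBytes() {
        Runtime runtime = Runtime.getRuntime();
        return runtime.totalMemory() - runtime.freeMemory();
    }

    public static void main(String[] args) {
        long before = usedBytes();
        byte[][] blocks = new byte[64][];
        for (int i = 0; i < blocks.length; i++) {
            blocks[i] = new byte[1024 * 1024]; // keep 64 MiB reachable
        }
        long after = usedBytes();
        System.out.println("Approximate growth: " + ((after - before) / (1024L * 1024L)) + " MiB");
        System.out.println("Blocks held: " + blocks.length);
    }
}
```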
Over 5 runs each, the use of BufferedReader yielded a mean time of 1813ms whereas using the raw InputStream took a mean time of 380ms.
A clue for why this is the case is in the source code for BufferedReader. There is a liberal sprinkling of System.arraycopy().
Furthermore, the BufferedReaders used about 4 times as much memory as the InputStreams (roughly 700MB versus 185MB). This is not surprising when you look at the code for BufferedReader: it allocates its char array (8192 chars by default) on instantiation.
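A back-of-the-envelope calculation suggests the buffers allocated at construction account for most of that difference. The sketch below uses the harness's own numbers (100 threads x 200 iterations, 10000 bytes of content per stream) and BufferedReader's default buffer of 8192 chars. The 8192-byte decoder buffer inside InputStreamReader is an assumption about the JDK's implementation rather than documented behaviour, and object headers and everything else on the heap are ignored. BufferFootprintEstimate is a made-up name.

```java
// Hypothetical estimate for illustration only.
final class BufferFootprintEstimate {

    static final int READERS = 100 * 200;         // numThreads * numIterations from the harness
    static final int CONTENT_BYTES = 10000;       // CONTENT_SIZE from the harness
    static final int CHAR_BUFFER_CHARS = 8192;    // BufferedReader's default buffer size
    static final int DECODER_BUFFER_BYTES = 8192; // ASSUMPTION: InputStreamReader's internal byte buffer

    /** Bytes held by the ByteArrayInputStreams' content alone. */
    static long contentBytes() {
        return (long) READERS * CONTENT_BYTES;
    }

    /** Extra bytes held by the reader buffers across all readers. */
    static long readerBufferBytes() {
        long perReader = CHAR_BUFFER_CHARS * 2L + DECODER_BUFFER_BYTES; // a Java char is 2 bytes
        return READERS * perReader;
    }

    public static void main(String[] args) {
        long content = contentBytes();
        long withReaders = content + readerBufferBytes();
        System.out.println("Stream content only:      " + (content / 1000000L) + " MB");
        System.out.println("Content + reader buffers: " + (withReaders / 1000000L) + " MB");
    }
}
```

Under those assumptions the estimate comes out at about 200 MB for the content alone and about 691 MB once the reader buffers are added, which is in the same region as the figures measured above.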
Greater efficiency using the raw InputStream was achieved by using a smaller size for the byte array. Reducing it to 1024 bytes gave a mean time of 217ms (although with a greater standard deviation).
Looking at the source code for GlassFish's Grizzly project, you can see ByteBuffers being cached to read the data from a SocketChannel. See the method org.glassfish.grizzly.nio.transport.TCPNIOUtils.readSimpleByteBuffer(...) for more information; the test org.glassfish.grizzly.http.server.HttpInputStreamsTest.testCharacter010() executes this code.
[Aside: to make Grizzly into an Eclipse project, you need to run:
mvn org.apache.maven.plugins:maven-eclipse-plugin:2.6:eclipse
See here for why.]
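The idea of caching ByteBuffers for channel reads can be sketched in a few lines. To be clear, this is not Grizzly's code: CachedByteBufferReader and drain are names made up for this example, and a channel wrapped around a ByteArrayInputStream stands in for a SocketChannel so that the sketch runs without a network.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;

// Illustrative only: not Grizzly's implementation.
final class CachedByteBufferReader {

    private static final int BUFFER_SIZE = 8192;

    // One buffer per thread, created lazily on first use and reused afterwards.
    private static final ThreadLocal<ByteBuffer> BUFFERS = new ThreadLocal<ByteBuffer>() {
        @Override
        protected ByteBuffer initialValue() {
            return ByteBuffer.allocate(BUFFER_SIZE);
        }
    };

    /** Drains the channel through the calling thread's cached buffer; returns the bytes read. */
    static long drain(ReadableByteChannel channel) throws IOException {
        ByteBuffer buffer = BUFFERS.get();
        buffer.clear();
        long total = 0;
        int read;
        while ((read = channel.read(buffer)) != -1) {
            total += read;
            // A real server would flip() and parse here; this sketch just discards the data.
            buffer.clear();
        }
        return total;
    }

    public static void main(String[] args) throws IOException {
        ReadableByteChannel channel = Channels.newChannel(new ByteArrayInputStream(new byte[10000]));
        System.out.println(drain(channel) + " bytes read");
    }
}
```

Because the ThreadLocal is static, every read on a given thread shares one buffer regardless of which channel it is reading from.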