Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 50 additions & 24 deletions splunk/com/splunk/InsertRootElementFilterInputStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,6 @@
package com.splunk;

import java.io.*;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.concurrent.Callable;

/**
* Takes an InputStream containing a UTF-8 encoded XML document containing one or more
Expand All @@ -31,22 +28,35 @@
* it is filtering.
*/
class InsertRootElementFilterInputStream extends FilterInputStream {
private static final int REREAD_BUFFER_SIZE = 512;
private static byte[] resultsTagBytes;
private final ByteArrayInputStream suffix = new ByteArrayInputStream("</doc>".getBytes("UTF-8"));
private ByteArrayInputStream beforeResultsBuffer;
private boolean wrotePrefix;

private byte[] oneByte = new byte[1];

static {
try {
resultsTagBytes = "results".getBytes("UTF-8");
} catch (UnsupportedEncodingException e) {
//should not be thrown because UTF-8 is supported
throw new RuntimeException(e);
}
}

InsertRootElementFilterInputStream(InputStream in) throws IOException {
// Wrap in with a pushback stream so we can write our modified version back
// onto the beginning of it.
super(new PushbackInputStream(in, 512));
super(new PushbackInputStream(in, REREAD_BUFFER_SIZE));

PushbackInputStream pin = (PushbackInputStream)this.in;

// Read bytes until we reach '>', then push everything we read, followed by "<doc>",
// back onto the stream. If we run out of input before we reach '>', then don't
// modify the stream.
ByteArrayOutputStream beforeResultsChars = new ByteArrayOutputStream();
ByteArrayOutputStream atResultsChars = new ByteArrayOutputStream();
beforeResultsBuffer = new ByteArrayInputStream(new byte[0]);

int ch;
while (true) {
Expand All @@ -57,32 +67,19 @@ class InsertRootElementFilterInputStream extends FilterInputStream {
pin.unread(beforeResultsChars.toByteArray());
return;
} else if (ch == (int)'<') {
// Try extending
atResultsChars.reset();
int ech;
boolean matched = true;
for (byte b : "results".getBytes("UTF-8")) {
ech = this.in.read();
atResultsChars.write(ech);
if (ech != b) {
// Extension failed. Put the bytes back on and search again.
pin.unread(atResultsChars.toByteArray());
matched = false;
break;
}
}
boolean resultsTag = isResultsTag(pin);

if (matched) {
if (resultsTag) {
// If we reach here, the extension succeeded, so we insert <doc>, unread everything,
// and return.

// Unread the match.
pin.unread(atResultsChars.toByteArray());
pin.unread(InsertRootElementFilterInputStream.resultsTagBytes);
// Unread the opening '<' that led to our extension
pin.unread(ch);
// Add a '<doc>' element to our read charactes and unread them.
// Add a '<doc>' element to our read characters
beforeResultsChars.write("<doc>".getBytes("UTF-8"));
pin.unread(beforeResultsChars.toByteArray());
beforeResultsBuffer = new ByteArrayInputStream(beforeResultsChars.toByteArray());
wrotePrefix = true;
return;
} else {
Expand All @@ -96,9 +93,38 @@ class InsertRootElementFilterInputStream extends FilterInputStream {
}
}

private boolean isResultsTag(PushbackInputStream pin) throws IOException {
// Try extending
ByteArrayOutputStream atResultsChars = new ByteArrayOutputStream();
int ech;
boolean resultsTag = true;
for (byte b : resultsTagBytes) {
ech = this.in.read();
atResultsChars.write(ech);
if (ech != b) {
// Extension failed. Put the bytes back on and search again.
pin.unread(atResultsChars.toByteArray());
resultsTag = false;
break;
}
}
return resultsTag;
}

@Override
public int read(byte[] buffer, int offset, int length) throws IOException {
int result = in.read(buffer, offset, length);
// first we read from the buffer before the first results xml tag
int result = 0;
int availableFromBuffer = beforeResultsBuffer.available();
if (offset < availableFromBuffer) {
result = beforeResultsBuffer.read(buffer, offset, length);
if (length <= result) {
return result;
}
}

// then we read from the original input stream
result += in.read(buffer, offset+result, length-result);
if (result == -1 && wrotePrefix) {
// No more bytes to read from in, and we have written '<doc>' earlier in the stream
return suffix.read(buffer, offset, length);
Expand Down
Loading