Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.common;

import java.io.IOException;

/**
* This is a special exception.
* If this exception is thrown, it means that the connection to the server is abnormal.
* We need to kill the connection actively.
*/
public class ConnectionException extends IOException {
public ConnectionException(String message) {
super(message);
}

public ConnectionException(String message, Throwable cause) {
super(message, cause);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.doris.mysql;

import org.apache.doris.common.ConnectionException;
import org.apache.doris.common.util.NetUtils;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.ConnectProcessor;
Expand Down Expand Up @@ -401,11 +402,13 @@ private ByteBuffer expandPacket(ByteBuffer result, int packetLen) {
protected void realNetSend(ByteBuffer buffer) throws IOException {
buffer = encryptData(buffer);
long bufLen = buffer.remaining();
long start = System.currentTimeMillis();
long writeLen = Channels.writeBlocking(conn.getSinkChannel(), buffer, context.getNetWriteTimeout(),
TimeUnit.SECONDS);
if (bufLen != writeLen) {
throw new IOException("Write mysql packet failed.[write=" + writeLen
+ ", needToWrite=" + bufLen + "]");
long duration = System.currentTimeMillis() - start;
throw new ConnectionException("Write mysql packet failed.[write=" + writeLen
+ ", needToWrite=" + bufLen + "], duration: " + duration + " ms");
}
Channels.flushBlocking(conn.getSinkChannel(), context.getNetWriteTimeout(), TimeUnit.SECONDS);
isSend = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.ConnectionException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.NotImplementedException;
Expand Down Expand Up @@ -198,9 +199,11 @@ protected void auditAfterExec(String origStmt, StatementBase parsedStmt,
}

// only throw an exception when there is a problem interacting with the requesting client
protected void handleQuery(MysqlCommand mysqlCommand, String originStmt) {
protected void handleQuery(MysqlCommand mysqlCommand, String originStmt) throws ConnectionException {
try {
executeQuery(mysqlCommand, originStmt);
} catch (ConnectionException exception) {
throw exception;
} catch (Exception ignored) {
// saved use handleQueryException
}
Expand Down Expand Up @@ -414,14 +417,18 @@ private String convertOriginStmt(String originStmt) {

// Use a handler for exception to avoid big try catch block which is a little hard to understand
protected void handleQueryException(Throwable throwable, String origStmt,
StatementBase parsedStmt, Data.PQueryStatistics statistics) {
StatementBase parsedStmt, Data.PQueryStatistics statistics) throws ConnectionException {
if (ctx.getMinidump() != null) {
MinidumpUtils.saveMinidumpString(ctx.getMinidump(), DebugUtil.printId(ctx.queryId()));
}
if (throwable instanceof IOException) {
if (throwable instanceof ConnectionException) {
// Throw this exception to close the connection outside.
LOG.warn("Process one query failed because ConnectionException: ", throwable);
throw (ConnectionException) throwable;
} else if (throwable instanceof IOException) {
// Client failed.
LOG.warn("Process one query failed because IOException: ", throwable);
ctx.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, "Doris process failed");
ctx.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, "Doris process failed: " + throwable.getMessage());
} else if (throwable instanceof UserException) {
LOG.warn("Process one query failed because.", throwable);
ctx.getState().setError(((UserException) throwable).getMysqlErrorCode(), throwable.getMessage());
Expand Down Expand Up @@ -479,7 +486,7 @@ protected List<StatementBase> parse(String originStmt) throws AnalysisException,

// Get the column definitions of a table
@SuppressWarnings("rawtypes")
protected void handleFieldList(String tableName) {
protected void handleFieldList(String tableName) throws ConnectionException {
// Already get command code.
if (Strings.isNullOrEmpty(tableName)) {
ctx.getState().setError(ErrorCode.ERR_UNKNOWN_TABLE, "Empty tableName");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.doris.analysis.QueryStmt;
import org.apache.doris.analysis.StatementBase;
import org.apache.doris.catalog.MysqlColType;
import org.apache.doris.common.ConnectionException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.mysql.MysqlChannel;
Expand Down Expand Up @@ -245,7 +246,7 @@ private void handleExecute() {
}

// Process COM_QUERY statement,
private void handleQuery(MysqlCommand mysqlCommand) {
private void handleQuery(MysqlCommand mysqlCommand) throws ConnectionException {
// convert statement to Java string
byte[] bytes = packetBuf.array();
int ending = packetBuf.limit() - 1;
Expand Down Expand Up @@ -307,7 +308,7 @@ private void dispatch() throws IOException {
}
}

private void handleFieldList() {
private void handleFieldList() throws ConnectionException {
String tableName = new String(MysqlProto.readNulTerminateString(packetBuf), StandardCharsets.UTF_8);
handleFieldList(tableName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.doris.service.arrowflight;

import org.apache.doris.analysis.Expr;
import org.apache.doris.common.ConnectionException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.Status;
Expand Down Expand Up @@ -81,7 +82,7 @@ public void prepare(MysqlCommand command) {
ctx.setStartTime();
}

public void handleQuery(String query) {
public void handleQuery(String query) throws ConnectionException {
MysqlCommand command = MysqlCommand.COM_QUERY;
prepare(command);

Expand Down