Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
-- Broker load of the SSB sf100 customer table from S3.
-- ${loadLabel} and ${s3BucketName} are placeholders substituted by the
-- regression-test harness before execution; the harness also appends the
-- "WITH S3 (...)" credentials/properties clause to this statement.
LOAD LABEL ${loadLabel} (
    DATA INFILE("s3://${s3BucketName}/regression/ssb/sf100/customer.tbl.gz")
    INTO TABLE customer
    COLUMNS TERMINATED BY "|"
    -- trailing "temp" column: presumably absorbs the empty field produced by
    -- the trailing "|" in SSB .tbl files -- TODO confirm against the data files
    (c_custkey,c_name,c_address,c_city,c_nation,c_region,c_phone,c_mktsegment,temp)
)
Original file line number Diff line number Diff line change
@@ -1 +1 @@
delete from customer where c_custkey > 1500 ;
-- Delete the upper range of customer keys (roughly half of the 3,000,000-row
-- sf100 load); the suite asserts 1,500,000 rows remain afterwards.
delete from customer where c_custkey > 1500000 ;
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ CREATE TABLE IF NOT EXISTS `customer` (
UNIQUE KEY (`c_custkey`)
DISTRIBUTED BY HASH(`c_custkey`) BUCKETS 10
PROPERTIES (
"function_column.sequence_type" = 'int',
"function_column.sequence_col" = 'c_custkey',
"compression"="zstd",
"replication_num" = "1",
"enable_unique_key_merge_on_write" = "true"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
-- Broker load of the SSB sf100 date dimension table from S3.
-- ${loadLabel} and ${s3BucketName} are placeholders substituted by the
-- regression-test harness, which also appends the "WITH S3 (...)" clause.
LOAD LABEL ${loadLabel} (
    DATA INFILE("s3://${s3BucketName}/regression/ssb/sf100/date.tbl.gz")
    INTO TABLE date
    COLUMNS TERMINATED BY "|"
    -- trailing "temp" column: presumably absorbs the empty field produced by
    -- the trailing "|" in SSB .tbl files -- TODO confirm against the data files
    (d_datekey,d_date,d_dayofweek,d_month,d_year,d_yearmonthnum,d_yearmonth, d_daynuminweek,d_daynuminmonth,d_daynuminyear,d_monthnuminyear,d_weeknuminyear, d_sellingseason,d_lastdayinweekfl,d_lastdayinmonthfl,d_holidayfl,d_weekdayfl,temp)
)
Original file line number Diff line number Diff line change
@@ -1 +1 @@
delete from `date` where d_datekey >= '19920701' and d_datekey <= '19920731';
-- Delete the later half of the date range (sf100 table has 2,556 days;
-- the suite asserts 1,278 rows remain afterwards).
delete from `date` where d_datekey >= '19950702';
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ CREATE TABLE IF NOT EXISTS `date` (
UNIQUE KEY (`d_datekey`)
DISTRIBUTED BY HASH(`d_datekey`) BUCKETS 1
PROPERTIES (
"function_column.sequence_type" = 'int',
"function_column.sequence_col" = 'd_datekey',
"compression"="zstd",
"replication_num" = "1",
"enable_unique_key_merge_on_write" = "true"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
-- Broker load of the SSB sf100 lineorder fact table from S3.
-- The fact table is split across multiple gzipped parts, hence the
-- "lineorder.tbl.*.gz" glob. ${loadLabel} and ${s3BucketName} are placeholders
-- substituted by the regression-test harness, which also appends the
-- "WITH S3 (...)" credentials/properties clause.
LOAD LABEL ${loadLabel} (
    DATA INFILE("s3://${s3BucketName}/regression/ssb/sf100/lineorder.tbl.*.gz")
    INTO TABLE lineorder
    COLUMNS TERMINATED BY "|"
    -- trailing "temp" column: presumably absorbs the empty field produced by
    -- the trailing "|" in SSB .tbl files -- TODO confirm against the data files
    (lo_orderkey,lo_linenumber,lo_custkey,lo_partkey,lo_suppkey,lo_orderdate,lo_orderpriority,lo_shippriority,lo_quantity,lo_extendedprice,lo_ordtotalprice,lo_discount,lo_revenue,lo_supplycost,lo_tax,lo_commitdate,lo_shipmode,temp)
)
Original file line number Diff line number Diff line change
@@ -1 +1 @@
delete from lineorder where lo_orderkey >= 240001 and lo_orderkey <= 360000;
-- Delete the upper range of order keys (roughly half of the ~600M-row sf100
-- load); the suite asserts 300,018,949 rows remain afterwards.
delete from lineorder where lo_orderkey >= 300013154;
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ PARTITION p1997 VALUES [("19970101"), ("19980101")),
PARTITION p1998 VALUES [("19980101"), ("19990101")))
DISTRIBUTED BY HASH(`lo_orderkey`) BUCKETS 48
PROPERTIES (
"function_column.sequence_type" = 'int',
"function_column.sequence_col" = 'lo_orderkey',
"compression"="zstd",
"replication_num" = "1",
"enable_unique_key_merge_on_write" = "true"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
-- Broker load of the SSB sf100 part dimension table from S3.
-- ${loadLabel} and ${s3BucketName} are placeholders substituted by the
-- regression-test harness, which also appends the "WITH S3 (...)" clause.
LOAD LABEL ${loadLabel} (
    DATA INFILE("s3://${s3BucketName}/regression/ssb/sf100/part.tbl.gz")
    INTO TABLE part
    COLUMNS TERMINATED BY "|"
    -- trailing "temp" column: presumably absorbs the empty field produced by
    -- the trailing "|" in SSB .tbl files -- TODO confirm against the data files
    (p_partkey,p_name,p_mfgr,p_category,p_brand,p_color,p_type,p_size,p_container,temp)
)
Original file line number Diff line number Diff line change
@@ -1 +1 @@
delete from `part` where p_partkey > 10000;
-- Delete the upper half of part keys (sf100 table has 1,400,000 rows;
-- the suite asserts 700,000 rows remain afterwards).
delete from `part` where p_partkey > 700000;
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ CREATE TABLE IF NOT EXISTS `part` (
UNIQUE KEY (`p_partkey`)
DISTRIBUTED BY HASH(`p_partkey`) BUCKETS 10
PROPERTIES (
"function_column.sequence_type" = 'int',
"function_column.sequence_col" = 'p_partkey',
"compression"="zstd",
"replication_num" = "1",
"enable_unique_key_merge_on_write" = "true"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
-- Broker load of the SSB sf100 supplier dimension table from S3.
-- ${loadLabel} and ${s3BucketName} are placeholders substituted by the
-- regression-test harness, which also appends the "WITH S3 (...)" clause.
LOAD LABEL ${loadLabel} (
    DATA INFILE("s3://${s3BucketName}/regression/ssb/sf100/supplier.tbl.gz")
    INTO TABLE supplier
    COLUMNS TERMINATED BY "|"
    -- trailing "temp" column: presumably absorbs the empty field produced by
    -- the trailing "|" in SSB .tbl files -- TODO confirm against the data files
    (s_suppkey,s_name,s_address,s_city,s_nation,s_region,s_phone,temp)
)
Original file line number Diff line number Diff line change
@@ -1 +1 @@
delete from `supplier` where s_suppkey > 100;
-- Delete the upper half of supplier keys (sf100 table has 200,000 rows;
-- the suite asserts 100,000 rows remain afterwards).
delete from `supplier` where s_suppkey > 100000;
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ CREATE TABLE IF NOT EXISTS `supplier` (
UNIQUE KEY (`s_suppkey`)
DISTRIBUTED BY HASH(`s_suppkey`) BUCKETS 10
PROPERTIES (
"function_column.sequence_type" = 'int',
"function_column.sequence_col" = 's_suppkey',
"compression"="zstd",
"replication_num" = "1",
"enable_unique_key_merge_on_write" = "true"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,92 +20,88 @@
// and modified by Doris.

suite("load_four_step") {
def tables = ["customer": ["""c_custkey,c_name,c_address,c_city,c_nation,c_region,c_phone,c_mktsegment,no_use""", 3000, "c_custkey", 1500],
def tables = ["customer": ["""c_custkey,c_name,c_address,c_city,c_nation,c_region,c_phone,c_mktsegment,no_use""", 3000000, 1500000],
"lineorder": ["""lo_orderkey,lo_linenumber,lo_custkey,lo_partkey,lo_suppkey,lo_orderdate,lo_orderpriority,
lo_shippriority,lo_quantity,lo_extendedprice,lo_ordtotalprice,lo_discount,
lo_revenue,lo_supplycost,lo_tax,lo_commitdate,lo_shipmode,lo_dummy""", 600572, "lo_orderkey", 481137],
"part": ["""p_partkey,p_name,p_mfgr,p_category,p_brand,p_color,p_type,p_size,p_container,p_dummy""", 20000, "p_partkey", 10000],
lo_revenue,lo_supplycost,lo_tax,lo_commitdate,lo_shipmode,lo_dummy""", 600037902, 300018949],
"part": ["""p_partkey,p_name,p_mfgr,p_category,p_brand,p_color,p_type,p_size,p_container,p_dummy""", 1400000, 700000],
"date": ["""d_datekey,d_date,d_dayofweek,d_month,d_year,d_yearmonthnum,d_yearmonth,
d_daynuminweek,d_daynuminmonth,d_daynuminyear,d_monthnuminyear,d_weeknuminyear,
d_sellingseason,d_lastdayinweekfl,d_lastdayinmonthfl,d_holidayfl,d_weekdayfl,d_dummy""", 255, "d_datekey", 224],
"supplier": ["""s_suppkey,s_name,s_address,s_city,s_nation,s_region,s_phone,s_dummy""", 200, "s_suppkey", 100]]
d_sellingseason,d_lastdayinweekfl,d_lastdayinmonthfl,d_holidayfl,d_weekdayfl,d_dummy""", 2556, 1278],
"supplier": ["""s_suppkey,s_name,s_address,s_city,s_nation,s_region,s_phone,s_dummy""", 200000, 100000]]

def s3BucketName = getS3BucketName()
def s3WithProperties = """WITH S3 (
|"AWS_ACCESS_KEY" = "${getS3AK()}",
|"AWS_SECRET_KEY" = "${getS3SK()}",
|"AWS_ENDPOINT" = "${getS3Endpoint()}",
|"AWS_REGION" = "${getS3Region()}")
|PROPERTIES(
|"exec_mem_limit" = "8589934592",
|"load_parallelism" = "3")""".stripMargin()

// set fe configuration
sql "ADMIN SET FRONTEND CONFIG ('max_bytes_per_broker_scanner' = '161061273600')"

tables.each { tableName, rows ->
// create table
sql """ DROP TABLE IF EXISTS $tableName """
sql new File("""${context.file.parentFile.parent}/ddl/${tableName}_sequence_create.sql""").text
for (j in 0..<2) {
streamLoad {
table tableName
set 'column_separator', '|'
set 'compress_type', 'GZ'
set 'columns', rows[0]
set 'function_column.sequence_col', rows[2]

// relate to ${DORIS_HOME}/regression-test/data/demo/streamload_input.csv.
// also, you can stream load a http stream, e.g. http://xxx/some.csv
file """${getS3Url()}/regression/ssb/sf0.1/${tableName}.tbl.gz"""


time 10000 // limit inflight 10s

// stream load action will check result, include Success status, and NumberTotalRows == NumberLoadedRows
// step 1: load data
// step 2: load all data for 3 times
for (j in 0..<2) {
def uniqueID = Math.abs(UUID.randomUUID().hashCode()).toString()
// load data from cos
def loadLabel = tableName + '_' + uniqueID
def loadSql = new File("""${context.file.parentFile.parent}/ddl/${tableName}_load.sql""").text.replaceAll("\\\$\\{s3BucketName\\}", s3BucketName)
loadSql = loadSql.replaceAll("\\\$\\{loadLabel\\}", loadLabel) + s3WithProperties
sql loadSql

// if declared a check callback, the default check condition will ignore.
// So you must check all condition
check { result, exception, startTime, endTime ->
if (exception != null) {
throw exception
}
log.info("Stream load result: ${result}".toString())
def json = parseJson(result)
assertEquals("success", json.Status.toLowerCase())
assertEquals(json.NumberTotalRows, json.NumberLoadedRows)
assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0)
// check load state
while (true) {
def stateResult = sql "show load where Label = '${loadLabel}'"
def loadState = stateResult[stateResult.size() - 1][2].toString()
if ('CANCELLED'.equalsIgnoreCase(loadState)) {
throw new IllegalStateException("load ${loadLabel} failed.")
} else if ('FINISHED'.equalsIgnoreCase(loadState)) {
break
}
sleep(5000)
}
sql 'sync'
int flag = 1
for (int i = 1; i <= 5; i++) {
def loadRowCount = sql "select count(1) from ${tableName}"
logger.info("select ${tableName} numbers: ${loadRowCount[0][0]}".toString())
assertTrue(loadRowCount[0][0] == rows[1])
}
rowCount = sql "select count(*) from ${tableName}"
assertEquals(rows[1], rowCount[0][0])
}

// step 3: delete 50% data
sql """ set delete_without_partition = true; """
sql new File("""${context.file.parentFile.parent}/ddl/${tableName}_part_delete.sql""").text
sql 'sync'
for (int i = 1; i <= 5; i++) {
def loadRowCount = sql "select count(1) from ${tableName}"
logger.info("select ${tableName} numbers: ${loadRowCount[0][0]}".toString())
assertTrue(loadRowCount[0][0] == rows[3])
assertTrue(loadRowCount[0][0] == rows[2])
}
streamLoad {
table tableName
set 'column_separator', '|'
set 'compress_type', 'GZ'
set 'columns', rows[0]
set 'function_column.sequence_col', rows[2]

// relate to ${DORIS_HOME}/regression-test/data/demo/streamload_input.csv.
// also, you can stream load a http stream, e.g. http://xxx/some.csv
file """${getS3Url()}/regression/ssb/sf0.1/${tableName}.tbl.gz"""

time 10000 // limit inflight 10s
// step 4: load full data again
def uniqueID = Math.abs(UUID.randomUUID().hashCode()).toString()
def loadLabel = tableName + '_' + uniqueID
def loadSql = new File("""${context.file.parentFile.parent}/ddl/${tableName}_load.sql""").text.replaceAll("\\\$\\{s3BucketName\\}", s3BucketName)
loadSql = loadSql.replaceAll("\\\$\\{loadLabel\\}", loadLabel) + s3WithProperties
sql loadSql

// stream load action will check result, include Success status, and NumberTotalRows == NumberLoadedRows

// if declared a check callback, the default check condition will ignore.
// So you must check all condition
check { result, exception, startTime, endTime ->
if (exception != null) {
throw exception
}
log.info("Stream load result: ${result}".toString())
def json = parseJson(result)
assertEquals("success", json.Status.toLowerCase())
assertEquals(json.NumberTotalRows, json.NumberLoadedRows)
assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0)
// check load state
while (true) {
def stateResult = sql "show load where Label = '${loadLabel}'"
def loadState = stateResult[stateResult.size() - 1][2].toString()
if ('CANCELLED'.equalsIgnoreCase(loadState)) {
throw new IllegalStateException("load ${loadLabel} failed.")
} else if ('FINISHED'.equalsIgnoreCase(loadState)) {
break
}
sleep(5000)
}

sql 'sync'
for (int i = 1; i <= 5; i++) {
def loadRowCount = sql "select count(1) from ${tableName}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,51 +17,53 @@


suite("load_one_step") {
def tables = ["customer": ["""c_custkey,c_name,c_address,c_city,c_nation,c_region,c_phone,c_mktsegment,no_use""", 3000],
def tables = ["customer": ["""c_custkey,c_name,c_address,c_city,c_nation,c_region,c_phone,c_mktsegment,no_use""", 3000000],
"lineorder": ["""lo_orderkey,lo_linenumber,lo_custkey,lo_partkey,lo_suppkey,lo_orderdate,lo_orderpriority,
lo_shippriority,lo_quantity,lo_extendedprice,lo_ordtotalprice,lo_discount,
lo_revenue,lo_supplycost,lo_tax,lo_commitdate,lo_shipmode,lo_dummy""", 600572],
"part": ["""p_partkey,p_name,p_mfgr,p_category,p_brand,p_color,p_type,p_size,p_container,p_dummy""", 20000],
lo_revenue,lo_supplycost,lo_tax,lo_commitdate,lo_shipmode,lo_dummy""", 600037902],
"part": ["""p_partkey,p_name,p_mfgr,p_category,p_brand,p_color,p_type,p_size,p_container,p_dummy""", 1400000],
"date": ["""d_datekey,d_date,d_dayofweek,d_month,d_year,d_yearmonthnum,d_yearmonth,
d_daynuminweek,d_daynuminmonth,d_daynuminyear,d_monthnuminyear,d_weeknuminyear,
d_sellingseason,d_lastdayinweekfl,d_lastdayinmonthfl,d_holidayfl,d_weekdayfl,d_dummy""", 255],
"supplier": ["""s_suppkey,s_name,s_address,s_city,s_nation,s_region,s_phone,s_dummy""", 200]]
d_sellingseason,d_lastdayinweekfl,d_lastdayinmonthfl,d_holidayfl,d_weekdayfl,d_dummy""", 2556],
"supplier": ["""s_suppkey,s_name,s_address,s_city,s_nation,s_region,s_phone,s_dummy""", 200000]]

def s3BucketName = getS3BucketName()
def s3WithProperties = """WITH S3 (
|"AWS_ACCESS_KEY" = "${getS3AK()}",
|"AWS_SECRET_KEY" = "${getS3SK()}",
|"AWS_ENDPOINT" = "${getS3Endpoint()}",
|"AWS_REGION" = "${getS3Region()}")
|PROPERTIES(
|"exec_mem_limit" = "8589934592",
|"load_parallelism" = "3")""".stripMargin()

// set fe configuration
sql "ADMIN SET FRONTEND CONFIG ('max_bytes_per_broker_scanner' = '161061273600')"

def uniqueID = Math.abs(UUID.randomUUID().hashCode()).toString()
tables.each { tableName, rows ->
// create table
sql """ DROP TABLE IF EXISTS $tableName """
sql new File("""${context.file.parentFile.parent}/ddl/${tableName}_create.sql""").text
streamLoad {
table "${tableName}"
set 'column_separator', '|'
set 'compress_type', 'GZ'
set 'columns', "${rows[0]}"

// relate to ${DORIS_HOME}/regression-test/data/demo/streamload_input.csv.
// also, you can stream load a http stream, e.g. http://xxx/some.csv
file """${getS3Url()}/regression/ssb/sf0.1/${tableName}.tbl.gz"""

time 10000 // limit inflight 10s
// load data from cos
def loadLabel = tableName + '_' + uniqueID
def loadSql = new File("""${context.file.parentFile.parent}/ddl/${tableName}_load.sql""").text.replaceAll("\\\$\\{s3BucketName\\}", s3BucketName)
loadSql = loadSql.replaceAll("\\\$\\{loadLabel\\}", loadLabel) + s3WithProperties
sql loadSql

// stream load action will check result, include Success status, and NumberTotalRows == NumberLoadedRows

// if declared a check callback, the default check condition will ignore.
// So you must check all condition
check { result, exception, startTime, endTime ->
if (exception != null) {
throw exception
}
log.info("Stream load result: ${result}".toString())
def json = parseJson(result)
assertEquals("success", json.Status.toLowerCase())
assertEquals(json.NumberTotalRows, json.NumberLoadedRows)
assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0)
// check load state
while (true) {
def stateResult = sql "show load where Label = '${loadLabel}'"
def loadState = stateResult[stateResult.size() - 1][2].toString()
if ('CANCELLED'.equalsIgnoreCase(loadState)) {
throw new IllegalStateException("load ${loadLabel} failed.")
} else if ('FINISHED'.equalsIgnoreCase(loadState)) {
break
}
sleep(5000)
}
sql 'sync'
for (int i = 1; i <= 5; i++) {
def loadRowCount = sql "select count(1) from ${tableName}"
logger.info("select ${tableName} numbers: ${loadRowCount[0][0]}".toString())
assertTrue(loadRowCount[0][0] == rows[1])
}
rowCount = sql "select count(*) from ${tableName}"
assertEquals(rows[1], rowCount[0][0])
}
}
Loading