Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CARBONDATA-4270]Delete segment expect remain_number #4203

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,8 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
protected val REGISTER = carbonKeyWord("REGISTER")
protected val PROPERTIES = carbonKeyWord("PROPERTIES")
protected val REFRESH = carbonKeyWord("REFRESH")
protected val EXPECT = carbonKeyWord("EXPECT")
protected val REMAIN_NUMBER = carbonKeyWord("REMAIN_NUMBER")

// For materialized view
// Keywords used in this parser
Expand Down Expand Up @@ -353,4 +355,6 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
p.getClass.getSimpleName.equals("FloatLit") ||
p.getClass.getSimpleName.equals("DecimalLit")
}) ^^ (_.chars)

protected lazy val number: Parser[Int] = numericLit ^^ (_.toInt)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.command.management

import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
import org.apache.spark.sql.execution.command.{Checker, DataCommand}
import org.apache.carbondata.api.CarbonStore
import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.core.exception.ConcurrentOperationException
import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
import org.apache.carbondata.events.{DeleteSegmentByIdPostEvent, DeleteSegmentByIdPreEvent, withEvents}

/**
* A command for delete by remaining number.
* In general, keep the latest segment.
*
* @param remaining expected remaining quantity after deletion
*/
case class CarbonDeleteLoadByRemainNumberCommand(
remaining: Int,
databaseNameOp: Option[String],
tableName: String)
extends DataCommand {

override def processData(sparkSession: SparkSession): Seq[Row] = {
Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
setAuditTable(carbonTable)
setAuditInfo(Map("remaining number" -> remaining.toString))
if (!carbonTable.getTableInfo.isTransactionalTable) {
throw new MalformedCarbonCommandException("Unsupported operation on non transactional table")
}

// if insert overwrite in progress, do not allow delete segment
if (SegmentStatusManager.isOverwriteInProgressInTable(carbonTable)) {
throw new ConcurrentOperationException(carbonTable, "insert overwrite", "delete segment")
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why this pr do not support insert overwrite

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The table cannot be updated when deleted


val segments = CarbonStore.readSegments(carbonTable.getTablePath, showHistory = false, None)

var deleteSegmentIds = List[String]()
if (carbonTable.isHivePartitionTable) {
segments.map(segment =>
(CarbonStore.getPartitions(carbonTable.getTablePath, segment), segment))
.groupBy(m => m._1)
.foreach(elem => {
val ids = elem._2.map(p => p._2).filter(segment =>
segment.getSegmentStatus == SegmentStatus.SUCCESS ||
segment.getSegmentStatus == SegmentStatus.COMPACTED)
.sortBy(_.getLoadStartTime)
.map(_.getLoadName)
.reverse
.drop(remaining).toList
deleteSegmentIds = List.concat(deleteSegmentIds, ids)
})
} else {
deleteSegmentIds = segments.filter(segment =>
segment.getSegmentStatus == SegmentStatus.SUCCESS ||
segment.getSegmentStatus == SegmentStatus.COMPACTED)
.sortBy(_.getLoadStartTime)
.map(_.getLoadName)
.reverse
.drop(remaining).toList
}

if (deleteSegmentIds.isEmpty) {
return Seq.empty
}

withEvents(DeleteSegmentByIdPreEvent(carbonTable, deleteSegmentIds, sparkSession),
DeleteSegmentByIdPostEvent(carbonTable, deleteSegmentIds, sparkSession)) {
CarbonStore.deleteLoadById(
deleteSegmentIds,
CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession),
tableName,
carbonTable
)
}
Seq.empty
}

override protected def opName: String = "DELETE SEGMENT BY REMAIN_NUMBER"
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {

protected lazy val segmentManagement: Parser[LogicalPlan] =
deleteSegmentByID | deleteSegmentByLoadDate | deleteStage | cleanFiles | addSegment |
showSegments
showSegments | deleteSegmentByRemainNumber

protected lazy val restructure: Parser[LogicalPlan] = alterTableDropColumn

Expand Down Expand Up @@ -508,6 +508,14 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
CarbonDeleteLoadByIdCommand(loadIds, dbName, tableName.toLowerCase())
}

protected lazy val deleteSegmentByRemainNumber: Parser[LogicalPlan] =
DELETE ~> FROM ~ TABLE ~> (ident <~ ".").? ~ ident ~
(EXPECT ~> (SEGMENT ~ "." ~ REMAIN_NUMBER) ~> "=" ~> number) <~
opt(";") ^^ {
case dbName ~ tableName ~ remaining =>
CarbonDeleteLoadByRemainNumberCommand(remaining, dbName, tableName.toLowerCase())
}

protected lazy val deleteSegmentByLoadDate: Parser[LogicalPlan] =
DELETE ~> FROM ~> TABLE ~> (ident <~ ".").? ~ ident ~
(WHERE ~> (SEGMENT ~ "." ~ STARTTIME ~> BEFORE) ~ stringLit) <~
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.spark.testsuite.deletesegment

import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}

import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties

/**
* test class for testing the delete segment expect remaining number.
*/
class DeleteSegmentByRemainNumberTestCase extends QueryTest with BeforeAndAfterAll
with BeforeAndAfterEach {
val DELETED_STATUS = "Marked for Delete"

val SUCCESSFUL_STATUS = "Success"

override def beforeAll {
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
.addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, "dd-MM-yyyy")
}

override def beforeEach(): Unit = {
sql("drop table if exists deleteSegmentPartitionTable")
sql("drop table if exists deleteSegmentTable")
sql("drop table if exists indexTable")
sql(
"CREATE table deleteSegmentPartitionTable (ID int, date String, country String, name " +
"String, phonetype String, serialname String, salary String) STORED AS carbondata " +
"PARTITIONED by(age int)"
)
sql(
s"""LOAD DATA local inpath '$resourcesPath/dataretention1.csv'
| INTO TABLE deleteSegmentPartitionTable PARTITION (age='20')
| OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)
sql(
s"""LOAD DATA local inpath '$resourcesPath/dataretention2.csv'
| INTO TABLE deleteSegmentPartitionTable PARTITION (age='30')
| OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)
sql(
s"""LOAD DATA local inpath '$resourcesPath/dataretention3.csv'
| INTO TABLE deleteSegmentPartitionTable PARTITION (age='20')
| OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)
sql(
s"""LOAD DATA local inpath '$resourcesPath/dataretention3.csv'
| INTO TABLE deleteSegmentPartitionTable PARTITION (age='30')
| OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)

sql(
"CREATE table deleteSegmentTable (ID int, date String, country String, name " +
"String, phonetype String, serialname String, salary String) STORED AS carbondata"
)
sql(
s"""LOAD DATA local inpath '$resourcesPath/dataretention1.csv'
| INTO TABLE deleteSegmentTable OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)
sql(
s"""LOAD DATA local inpath '$resourcesPath/dataretention2.csv'
| INTO TABLE deleteSegmentTable OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)
sql(
s"""LOAD DATA local inpath '$resourcesPath/dataretention3.csv'
| INTO TABLE deleteSegmentTable OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)

sql("create index indexTable on table deleteSegmentTable(country) as 'carbondata'" +
"properties('sort_scope'='global_sort', 'Global_sort_partitions'='3')")
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add test cases with SI as well

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok. i have add test cases with SI

override def afterAll(): Unit = {
sql("drop table if exists deleteSegmentTable")
sql("drop table if exists deleteSegmentPartitionTable")
sql("drop table if exists indexTable")
}

test("delete segment, remain_number = 1") {
sql("delete from table deleteSegmentTable expect segment.remain_number = 1")
val segments1 = sql("show segments on deleteSegmentTable").collect()
assertResult(SUCCESSFUL_STATUS)(segments1(0).get(1))
assertResult(DELETED_STATUS)(segments1(1).get(1))
assertResult(DELETED_STATUS)(segments1(2).get(1))
assertResult(sql("select * from indexTable").count())(10)

sql("delete from table deleteSegmentPartitionTable expect segment.remain_number = 1")
val segments2 = sql("show segments on deleteSegmentPartitionTable").collect()
assertResult(SUCCESSFUL_STATUS)(segments2(0).get(1))
assertResult(SUCCESSFUL_STATUS)(segments2(1).get(1))
assertResult(DELETED_STATUS)(segments2(2).get(1))
assertResult(DELETED_STATUS)(segments2(3).get(1))
}

test("delete segment, remain nothing") {
sql("delete from table deleteSegmentTable expect segment.remain_number = 0")
val segments1 = sql("show segments on deleteSegmentTable").collect()
segments1.foreach(row => assertResult(DELETED_STATUS)(row.get(1)))
assertResult(sql("select * from indexTable").count())(0)

sql("delete from table deleteSegmentPartitionTable expect segment.remain_number = 0")
val segments2 = sql("show segments on deleteSegmentPartitionTable").collect()
segments2.foreach(row => assertResult(DELETED_STATUS)(row.get(1)))
}

test("delete segment, remain all") {
sql("delete from table deleteSegmentTable expect segment.remain_number = 3")
val segments1 = sql("show segments on deleteSegmentTable").collect()
segments1.foreach(row => assertResult(SUCCESSFUL_STATUS)(row.get(1)))
assertResult(sql("select * from indexTable").count())(30)

sql("delete from table deleteSegmentPartitionTable expect segment.remain_number = 3")
val segments2 = sql("show segments on deleteSegmentPartitionTable").collect()
segments2.foreach(row => assertResult(SUCCESSFUL_STATUS)(row.get(1)))
}

test("delete segment, remain_number is invalid") {
val ex1 = intercept[Exception] {
sql("delete from table deleteSegmentTable expect segment.remain_number = -1")
}
assert(ex1.getMessage.contains("not found"))
val ex2 = intercept[Exception] {
sql("delete from table deleteSegmentTable expect segment.remain_number = 2147483648")
}
assert(ex2.getMessage.contains("SqlParse"))
assertResult(sql("select * from indexTable").count())(30)

val ex3 = intercept[Exception] {
sql("delete from table deleteSegmentPartitionTable expect segment.remain_number = -1")
}
assert(ex3.getMessage.contains("not found"))
val ex4 = intercept[Exception] {
sql("delete from table deleteSegmentPartitionTable expect segment.remain_number = 2147483648")
}
assert(ex4.getMessage.contains("SqlParse"))
}

test("delete segment after delete newest segment by segmentId") {
sql("delete from table deleteSegmentTable where segment.id in (2)")
sql("delete from table deleteSegmentTable expect segment.remain_number = 1")
val segments1 = sql("show segments on deleteSegmentTable").collect()
assertResult(DELETED_STATUS)(segments1(0).get(1))
assertResult(SUCCESSFUL_STATUS)(segments1(1).get(1))
assertResult(DELETED_STATUS)(segments1(2).get(1))
assertResult(sql("select * from indexTable").count())(10)

W1thOut marked this conversation as resolved.
Show resolved Hide resolved
sql("delete from table deleteSegmentPartitionTable where segment.id in (2)")
sql("delete from table deleteSegmentPartitionTable expect segment.remain_number = 1")
sql("show segments on deleteSegmentPartitionTable").show()
val segments2 = sql("show segments on deleteSegmentPartitionTable").collect()
assertResult(SUCCESSFUL_STATUS)(segments2(0).get(1))
assertResult(DELETED_STATUS)(segments2(1).get(1))
assertResult(DELETED_STATUS)(segments2(2).get(1))
assertResult(SUCCESSFUL_STATUS)(segments2(3).get(1))
}

test("delete segment by partition id") {
sql(
s"""LOAD DATA local inpath '$resourcesPath/dataretention1.csv'
| INTO TABLE deleteSegmentPartitionTable PARTITION (age='20')
| OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)
sql(
s"""LOAD DATA local inpath '$resourcesPath/dataretention2.csv'
| INTO TABLE deleteSegmentPartitionTable PARTITION (age='20')
| OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)
sql(
s"""LOAD DATA local inpath '$resourcesPath/dataretention1.csv'
| INTO TABLE deleteSegmentPartitionTable PARTITION (age='30')
| OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)
sql(
s"""LOAD DATA local inpath '$resourcesPath/dataretention2.csv'
| INTO TABLE deleteSegmentPartitionTable PARTITION (age='40')
| OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)
sql(
s"""LOAD DATA local inpath '$resourcesPath/dataretention2.csv'
| INTO TABLE deleteSegmentPartitionTable PARTITION (age='40')
| OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)
sql("delete from table deleteSegmentPartitionTable expect segment.remain_number = 1")
val segments = sql("show segments on deleteSegmentPartitionTable").collect()
assertResult(SUCCESSFUL_STATUS)(segments(0).get(1))
assertResult(SUCCESSFUL_STATUS)(segments(2).get(1))
assertResult(SUCCESSFUL_STATUS)(segments(3).get(1))
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

need some after all function to make sure the table has been removed.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i have define function "beforeEach" to drop table

}