Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,25 @@ PROPERTIES
);
```

9. Create repository and delete snapshots if exists.

```sql
CREATE REPOSITORY `s3_repo`
WITH S3
ON LOCATION "s3://s3-repo"
PROPERTIES
(
"s3.endpoint" = "http://s3-REGION.amazonaws.com",
"s3.region" = "s3-REGION",
"s3.access_key" = "AWS_ACCESS_KEY",
"s3.secret_key"="AWS_SECRET_KEY",
"s3.region" = "REGION",
"delete_if_exists" = "true"
);
```

Note: only the s3 service supports the "delete_if_exists" property.

### Keywords

CREATE, REPOSITORY
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,25 @@ PROPERTIES
);
```

9. 创建仓库并删除已经存在的 snapshot

```sql
CREATE REPOSITORY `s3_repo`
WITH S3
ON LOCATION "s3://s3-repo"
PROPERTIES
(
"s3.endpoint" = "http://s3-REGION.amazonaws.com",
"s3.region" = "s3-REGION",
"s3.access_key" = "AWS_ACCESS_KEY",
"s3.secret_key"="AWS_SECRET_KEY",
"s3.region" = "REGION",
"delete_if_exists" = "true"
);
```

注:目前只有 s3 支持 "delete_if_exists" 属性。

### Keywords

CREATE, REPOSITORY
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import java.util.Map;

public class CreateRepositoryStmt extends DdlStmt {
public static String PROP_DELETE_IF_EXISTS = "delete_if_exists";

private boolean isReadOnly;
private String name;
private StorageBackend storage;
Expand Down Expand Up @@ -71,6 +73,16 @@ public void analyze(Analyzer analyzer) throws UserException {
ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "ADMIN");
}
FeNameFormat.checkCommonName("repository", name);

// check delete_if_exists, this property will be used by Repository.initRepository.
Map<String, String> properties = getProperties();
String deleteIfExistsStr = properties.get(PROP_DELETE_IF_EXISTS);
if (deleteIfExistsStr != null) {
if (!deleteIfExistsStr.equalsIgnoreCase("true") && !deleteIfExistsStr.equalsIgnoreCase("false")) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_COMMON_ERROR,
"'" + PROP_DELETE_IF_EXISTS + "' in properties, you should set it false or true");
}
}
}

@Override
Expand Down
24 changes: 23 additions & 1 deletion fe/fe-core/src/main/java/org/apache/doris/backup/Repository.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.doris.backup;

import org.apache.doris.analysis.CreateRepositoryStmt;
import org.apache.doris.analysis.StorageBackend;
import org.apache.doris.backup.Status.ErrCode;
import org.apache.doris.catalog.Env;
Expand Down Expand Up @@ -215,6 +216,27 @@ public Status initRepository() {
if (FeConstants.runningUnitTest) {
return Status.OK;
}

// A temporary solution is to delete all stale snapshots before creating an S3 repository
// so that we can add regression tests about backup/restore.
//
// TODO: support hdfs/brokers
if (fileSystem instanceof S3FileSystem) {
String deleteStaledSnapshots = fileSystem.getProperties()
.getOrDefault(CreateRepositoryStmt.PROP_DELETE_IF_EXISTS, "false");
if (deleteStaledSnapshots.equalsIgnoreCase("true")) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

check "true" as true/false in analyze

// delete with prefix:
// eg. __palo_repository_repo_name/
String snapshotPrefix = Joiner.on(PATH_DELIMITER).join(location, joinPrefix(PREFIX_REPO, name));
LOG.info("property {} is set, delete snapshots with prefix: {}",
CreateRepositoryStmt.PROP_DELETE_IF_EXISTS, snapshotPrefix);
Status st = ((S3FileSystem) fileSystem).deleteDirectory(snapshotPrefix);
if (!st.ok()) {
return st;
}
}
}

String repoInfoFilePath = assembleRepoInfoFilePath();
// check if the repo is already exist in remote
List<RemoteFile> remoteFiles = Lists.newArrayList();
Expand Down Expand Up @@ -245,8 +267,8 @@ public Status initRepository() {
return new Status(ErrCode.COMMON_ERROR,
"failed to parse create time of repository: " + root.get("create_time"));
}
return Status.OK;

return Status.OK;
} catch (IOException e) {
return new Status(ErrCode.COMMON_ERROR, "failed to read repo info file: " + e.getMessage());
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ public interface ObjStorage<C> {

Status deleteObject(String remotePath);

Status deleteObjects(String remotePath);

Status copyObject(String origFilePath, String destFilePath);

RemoteObjects listObjects(String remotePath, String continuationToken) throws DdlException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,18 @@
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.CopyObjectRequest;
import software.amazon.awssdk.services.s3.model.CopyObjectResponse;
import software.amazon.awssdk.services.s3.model.Delete;
import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
import software.amazon.awssdk.services.s3.model.DeleteObjectResponse;
import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest;
import software.amazon.awssdk.services.s3.model.DeleteObjectsResponse;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
import software.amazon.awssdk.services.s3.model.S3Exception;
Expand All @@ -56,6 +60,7 @@
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class S3ObjStorage implements ObjStorage<S3Client> {
private static final Logger LOG = LogManager.getLogger(S3ObjStorage.class);
Expand Down Expand Up @@ -223,6 +228,52 @@ public Status deleteObject(String remotePath) {
}
}

@Override
public Status deleteObjects(String absolutePath) {
try {
S3URI baseUri = S3URI.create(absolutePath, forceHostedStyle);
String continuationToken = "";
boolean isTruncated = false;
long totalObjects = 0;
do {
RemoteObjects objects = listObjects(absolutePath, continuationToken);
List<RemoteObject> objectList = objects.getObjectList();
if (!objectList.isEmpty()) {
Delete delete = Delete.builder()
.objects(objectList.stream()
.map(RemoteObject::getKey)
.map(k -> ObjectIdentifier.builder().key(k).build())
.collect(Collectors.toList()))
.build();
DeleteObjectsRequest req = DeleteObjectsRequest.builder()
.bucket(baseUri.getBucket())
.delete(delete)
.build();

DeleteObjectsResponse resp = getClient(baseUri.getVirtualBucket()).deleteObjects(req);
if (resp.errors().size() > 0) {
LOG.warn("{} errors returned while deleting {} objects for dir {}",
resp.errors().size(), objectList.size(), absolutePath);
}
LOG.info("{} of {} objects deleted for dir {}",
resp.deleted().size(), objectList.size(), absolutePath);
totalObjects += objectList.size();
}

isTruncated = objects.isTruncated();
continuationToken = objects.getContinuationToken();
} while (isTruncated);
LOG.info("total delete {} objects for dir {}", totalObjects, absolutePath);
return Status.OK;
} catch (DdlException e) {
return new Status(Status.ErrCode.COMMON_ERROR, "list objects for delete objects failed: " + e.getMessage());
} catch (Exception e) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what difference between DdlException and other Exceptions

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DdlException only throws by listObjects, catching it would provide a detailed error message.

LOG.warn("delete objects {} failed, force visual host style {}", absolutePath, e, forceHostedStyle);
return new Status(Status.ErrCode.COMMON_ERROR, "delete objects failed: " + e.getMessage());
}
}

@Override
public Status copyObject(String origFilePath, String destFilePath) {
try {
S3URI origUri = S3URI.create(origFilePath);
Expand All @@ -249,9 +300,26 @@ public Status copyObject(String origFilePath, String destFilePath) {
public RemoteObjects listObjects(String absolutePath, String continuationToken) throws DdlException {
try {
S3URI uri = S3URI.create(absolutePath, forceHostedStyle);
String bucket = uri.getBucket();
String prefix = uri.getKey();
ListObjectsV2Request.Builder requestBuilder = ListObjectsV2Request.builder().bucket(uri.getBucket())
.prefix(normalizePrefix(prefix));
if (!StringUtils.isEmpty(uri.getVirtualBucket())) {
// Support s3 compatible service. The generated HTTP request for list objects likes:
//
// GET /<bucket-name>?list-type=2&prefix=<prefix>
prefix = bucket + "/" + prefix;
String endpoint = properties.get(S3Properties.ENDPOINT);
if (endpoint.contains("cos.")) {
bucket = "/";
} else if (endpoint.contains("oss-")) {
bucket = uri.getVirtualBucket();
} else if (endpoint.contains("obs.")) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe throw an exception

// FIXME: unlike cos and oss, the obs will report 'The specified key does not exist'.
throw new DdlException("obs does not support list objects via s3 sdk. path: " + absolutePath);
}
}
ListObjectsV2Request.Builder requestBuilder = ListObjectsV2Request.builder()
.bucket(bucket)
.prefix(normalizePrefix(prefix));
if (!StringUtils.isEmpty(continuationToken)) {
requestBuilder.continuationToken(continuationToken);
}
Expand All @@ -263,7 +331,7 @@ public RemoteObjects listObjects(String absolutePath, String continuationToken)
}
return new RemoteObjects(remoteObjects, response.isTruncated(), response.nextContinuationToken());
} catch (Exception e) {
LOG.warn("Failed to list objects for S3", e);
LOG.warn("Failed to list objects for S3: {}", absolutePath, e);
throw new DdlException("Failed to list objects for S3, Error message: " + e.getMessage(), e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,5 +107,9 @@ public Status list(String remotePath, List<RemoteFile> result, boolean fileNameO
}
return Status.OK;
}

public Status deleteDirectory(String absolutePath) {
return ((S3ObjStorage) objStorage).deleteObjects(absolutePath);
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.S3URI;

import org.apache.commons.lang3.StringUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import software.amazon.awssdk.core.sync.RequestBody;
Expand All @@ -36,26 +36,85 @@

@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class S3ObjStorageTest {
private S3ObjStorage storage;
@Test
public void testS3BaseOp() throws UserException {
String ak = System.getenv("S3_ACCESS_KEY");
String sk = System.getenv("S3_SECRET_KEY");
String endpoint = System.getenv("S3_ENDPOINT");
String region = System.getenv("S3_REGION");
String bucket = System.getenv("S3_BUCKET");
String prefix = System.getenv("S3_PREFIX");

// Skip this test if ENV variables are not set.
if (StringUtils.isEmpty(endpoint) || StringUtils.isEmpty(ak)
|| StringUtils.isEmpty(sk) || StringUtils.isEmpty(region)
|| StringUtils.isEmpty(bucket) || StringUtils.isEmpty(prefix)) {
return;
}

Map<String, String> properties = new HashMap<>();
properties.put("s3.endpoint", endpoint);
properties.put("s3.access_key", ak);
properties.put("s3.secret_key", sk);
properties.put("s3.region", region);
S3ObjStorage storage = new S3ObjStorage(properties);

String baseUrl = "s3://" + bucket + "/" + prefix + "/";
for (int i = 0; i < 5; ++i) {
Status st = storage.putObject(baseUrl + "key" + i, RequestBody.fromString("mocked"));
Assertions.assertEquals(Status.OK, st);
}

RemoteObjects remoteObjects = storage.listObjects(baseUrl, null);
Assertions.assertEquals(5, remoteObjects.getObjectList().size());
Assertions.assertFalse(remoteObjects.isTruncated());
Assertions.assertEquals(null, remoteObjects.getContinuationToken());

List<RemoteObject> objectList = remoteObjects.getObjectList();
for (int i = 0; i < objectList.size(); i++) {
RemoteObject remoteObject = objectList.get(i);
Assertions.assertEquals("key" + i, remoteObject.getRelativePath());
}

Status st = storage.headObject(baseUrl + "key" + 0);
Assertions.assertEquals(Status.OK, st);

private MockedS3Client mockedClient;
File file = new File("test-file.txt");
file.delete();
st = storage.getObject(baseUrl + "key" + 0, file);
Assertions.assertEquals(Status.OK, st);

st = storage.deleteObject(baseUrl + "key" + 0);
Assertions.assertEquals(Status.OK, st);

file.delete();
st = storage.getObject(baseUrl + "key" + 0, file);
Assertions.assertEquals(Status.ErrCode.COMMON_ERROR, st.getErrCode());
Assertions.assertTrue(st.getErrMsg().contains("The specified key does not exist"));
file.delete();

st = storage.deleteObjects(baseUrl);
Assertions.assertEquals(Status.OK, st);

remoteObjects = storage.listObjects(baseUrl, null);
Assertions.assertEquals(0, remoteObjects.getObjectList().size());
Assertions.assertFalse(remoteObjects.isTruncated());
Assertions.assertEquals(null, remoteObjects.getContinuationToken());
}

@BeforeAll
public void beforeAll() throws Exception {
@Test
public void testBaseOp() throws Exception {
Map<String, String> properties = new HashMap<>();
properties.put("s3.endpoint", "s3.e.c");
properties.put("s3.access_key", "abc");
properties.put("s3.secret_key", "123");
storage = new S3ObjStorage(properties);
S3ObjStorage storage = new S3ObjStorage(properties);
Field client = storage.getClass().getDeclaredField("client");
client.setAccessible(true);
mockedClient = new MockedS3Client();
MockedS3Client mockedClient = new MockedS3Client();
client.set(storage, mockedClient);
Assertions.assertTrue(storage.getClient("mocked") instanceof MockedS3Client);
}

@Test
public void testBaseOp() throws UserException {
S3URI vUri = S3URI.create("s3://bucket/key", true);
S3URI uri = S3URI.create("s3://bucket/key", false);
Assertions.assertEquals(vUri.getVirtualBucket(), "bucket");
Expand Down Expand Up @@ -98,7 +157,16 @@ public void testBaseOp() throws UserException {
List<RemoteObject> list = remoteObjectsVBucket.getObjectList();
for (int i = 0; i < list.size(); i++) {
RemoteObject remoteObject = list.get(i);
Assertions.assertTrue(remoteObject.getRelativePath().startsWith("keys/key" + i));
Assertions.assertTrue(remoteObject.getRelativePath().startsWith("key" + i));
}

storage.properties.put("use_path_style", "true");
storage.setProperties(storage.properties);
remoteObjectsVBucket = storage.listObjects("oss://bucket/keys", null);
list = remoteObjectsVBucket.getObjectList();
for (int i = 0; i < list.size(); i++) {
RemoteObject remoteObject = list.get(i);
Assertions.assertTrue(remoteObject.getRelativePath().startsWith("key" + i));
}
}
}
Loading