From 1322aa9e2e9f67655a64eb0c175b6b4285bebb3c Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Thu, 1 Jun 2023 22:44:29 +0200 Subject: [PATCH 1/4] Use filer in sync command This deprecates usage of the `repofiles` package in favor of the filer package and consolidates the code paths into WSFS. Note: one potentially breaking change here is the following. If a file at `foo/bar.txt` is created and removed, the directory `foo` is kept around because we do not perform directory tracking. If subsequently we need to write a file at `foo`, it will result in an `fs.ErrExist` because it is impossible to overwrite a directory. The previous implementation performed a recursive delete of the path if this happened, where this implementation will return the `fs.ErrExist` error to the user. We can mitigate this in one of two ways: * Track directories to remove as part of a `diff` and remove them * Attempt to remove an empty directory tree if we see this error * ...? --- libs/sync/repofiles/repofiles.go | 159 -------------------------- libs/sync/repofiles/repofiles_test.go | 88 -------------- libs/sync/sync.go | 23 ++-- libs/sync/watchdog.go | 16 ++- 4 files changed, 27 insertions(+), 259 deletions(-) delete mode 100644 libs/sync/repofiles/repofiles.go delete mode 100644 libs/sync/repofiles/repofiles_test.go diff --git a/libs/sync/repofiles/repofiles.go b/libs/sync/repofiles/repofiles.go deleted file mode 100644 index 8fcabc113ec..00000000000 --- a/libs/sync/repofiles/repofiles.go +++ /dev/null @@ -1,159 +0,0 @@ -package repofiles - -import ( - "context" - "errors" - "fmt" - "net/http" - "net/url" - "os" - "path" - "path/filepath" - "strings" - - "github.com/databricks/databricks-sdk-go" - "github.com/databricks/databricks-sdk-go/apierr" - "github.com/databricks/databricks-sdk-go/client" - "github.com/databricks/databricks-sdk-go/service/workspace" -) - -// RepoFiles wraps reading and writing into a remote repo with safeguards to prevent -// accidental deletion of repos and more robust methods to overwrite workspace files -type RepoFiles struct { - repoRoot string - localRoot string - workspaceClient *databricks.WorkspaceClient -} - -func Create(repoRoot, localRoot string, workspaceClient *databricks.WorkspaceClient) *RepoFiles { - return &RepoFiles{ - repoRoot: repoRoot, - localRoot: localRoot, - workspaceClient: workspaceClient, - } -} - -func (r *RepoFiles) remotePath(relativePath string) (string, error) { - fullPath := path.Join(r.repoRoot, relativePath) - cleanFullPath := path.Clean(fullPath) - if !strings.HasPrefix(cleanFullPath, r.repoRoot) { - return "", fmt.Errorf("relative file path is not inside repo root: %s", relativePath) - } - // path.Clean will remove any trailing / so it's enough to check cleanFullPath == r.repoRoot - if cleanFullPath == r.repoRoot { - return "", fmt.Errorf("file path relative to repo root cannot be empty: %s", relativePath) - } - return cleanFullPath, nil -} - -func (r *RepoFiles) readLocal(relativePath string) ([]byte, error) { - localPath := filepath.Join(r.localRoot, relativePath) - return os.ReadFile(localPath) -} - -func (r *RepoFiles) writeRemote(ctx context.Context, relativePath string, content []byte) error { - apiClientConfig := r.workspaceClient.Config - apiClientConfig.HTTPTimeoutSeconds = 600 - apiClient, err := client.New(apiClientConfig) - if err != nil { - return err - } - remotePath, err := r.remotePath(relativePath) - if err != nil { - return err - } - escapedPath := url.PathEscape(strings.TrimLeft(remotePath, "/")) - apiPath := fmt.Sprintf("/api/2.0/workspace-files/import-file/%s?overwrite=true", escapedPath) - - err = apiClient.Do(ctx, http.MethodPost, apiPath, content, nil) - - // Handling some edge cases when an upload might fail - // - // We cannot do more precise error scoping here because the API does not - // provide descriptive errors yet - // - // TODO: narrow down the error condition scope of this "if" block to only - // trigger for the specific edge cases instead of all errors once the API - // implements them - if err != nil { - // Delete any artifact files incase non overwriteable by the current file - // type and thus are failing the PUT request. - // files, folders and notebooks might not have been cleaned up and they - // can't overwrite each other. If a folder `foo` exists, then attempts to - // PUT a file `foo` will fail - err := r.workspaceClient.Workspace.Delete(ctx, - workspace.Delete{ - Path: remotePath, - Recursive: true, - }, - ) - // ignore RESOURCE_DOES_NOT_EXIST here incase nothing existed at remotePath - var aerr *apierr.APIError - if errors.As(err, &aerr) && aerr.ErrorCode == "RESOURCE_DOES_NOT_EXIST" { - err = nil - } - if err != nil { - return err - } - - // Mkdir parent dirs incase they are what's causing the PUT request to - // fail - err = r.workspaceClient.Workspace.MkdirsByPath(ctx, path.Dir(remotePath)) - if err != nil { - return fmt.Errorf("could not mkdir to put file: %s", err) - } - - // Attempt to upload file again after cleanup/setup - err = apiClient.Do(ctx, http.MethodPost, apiPath, content, nil) - if err != nil { - return err - } - } - return nil -} - -func (r *RepoFiles) deleteRemote(ctx context.Context, relativePath string) error { - remotePath, err := r.remotePath(relativePath) - if err != nil { - return err - } - return r.workspaceClient.Workspace.Delete(ctx, - workspace.Delete{ - Path: remotePath, - Recursive: false, - }, - ) -} - -// The API calls for a python script foo.py would be -// `PUT foo.py` -// `DELETE foo.py` -// -// The API calls for a python notebook foo.py would be -// `PUT foo.py` -// `DELETE foo` -// -// The workspace file system backend strips .py from the file name if the python -// file is a notebook -func (r *RepoFiles) PutFile(ctx context.Context, relativePath string) error { - content, err := r.readLocal(relativePath) - if err != nil { - return err - } - - return r.writeRemote(ctx, relativePath, content) -} - -func (r *RepoFiles) DeleteFile(ctx context.Context, relativePath string) error { - err := r.deleteRemote(ctx, relativePath) - - // We explictly ignore RESOURCE_DOES_NOT_EXIST error to make delete idempotent - var aerr *apierr.APIError - if errors.As(err, &aerr) && aerr.ErrorCode == "RESOURCE_DOES_NOT_EXIST" { - err = nil - } - return nil -} - -// TODO: write integration tests for all non happy path cases that rely on -// specific behaviour of the workspace apis diff --git a/libs/sync/repofiles/repofiles_test.go b/libs/sync/repofiles/repofiles_test.go deleted file mode 100644 index 2a881d90d06..00000000000 --- a/libs/sync/repofiles/repofiles_test.go +++ /dev/null @@ -1,88 +0,0 @@ -package repofiles - -import ( - "os" - "path/filepath" - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestRepoFilesRemotePath(t *testing.T) { - repoRoot := "/Repos/doraemon/bar" - repoFiles := Create(repoRoot, "/doraemon/foo/bar", nil) - - remotePath, err := repoFiles.remotePath("a/b/c") - assert.NoError(t, err) - assert.Equal(t, repoRoot+"/a/b/c", remotePath) - - remotePath, err = repoFiles.remotePath("a/b/../d") - assert.NoError(t, err) - assert.Equal(t, repoRoot+"/a/d", remotePath) - - remotePath, err = repoFiles.remotePath("a/../c") - assert.NoError(t, err) - assert.Equal(t, repoRoot+"/c", remotePath) - - remotePath, err = repoFiles.remotePath("a/b/c/.") - assert.NoError(t, err) - assert.Equal(t, repoRoot+"/a/b/c", remotePath) - - remotePath, err = repoFiles.remotePath("a/b/c/d/./../../f/g") - assert.NoError(t, err) - assert.Equal(t, repoRoot+"/a/b/f/g", remotePath) - - _, err = repoFiles.remotePath("..") - assert.ErrorContains(t, err, `relative file path is not inside repo root: ..`) - - _, err = repoFiles.remotePath("a/../..") - assert.ErrorContains(t, err, `relative file path is not inside repo root: a/../..`) - - _, err = repoFiles.remotePath("./../.") - assert.ErrorContains(t, err, `relative file path is not inside repo root: ./../.`) - - _, err = repoFiles.remotePath("/./.././..") - assert.ErrorContains(t, err, `relative file path is not inside repo root: /./.././..`) - - _, err = repoFiles.remotePath("./../.") - assert.ErrorContains(t, err, `relative file path is not inside repo root: ./../.`) - - _, err = repoFiles.remotePath("./..") - assert.ErrorContains(t, err, `relative file path is not inside repo root: ./..`) - - _, err = repoFiles.remotePath("./../../..") - assert.ErrorContains(t, err, `relative file path is not inside repo root: ./../../..`) - - _, err = repoFiles.remotePath("./../a/./b../../..") - assert.ErrorContains(t, err, `relative file path is not inside repo root: ./../a/./b../../..`) - - _, err = repoFiles.remotePath("../..") - assert.ErrorContains(t, err, `relative file path is not inside repo root: ../..`) - - _, err = repoFiles.remotePath(".//a/..//./b/..") - assert.ErrorContains(t, err, `file path relative to repo root cannot be empty`) - - _, err = repoFiles.remotePath("a/b/../..") - assert.ErrorContains(t, err, "file path relative to repo root cannot be empty") - - _, err = repoFiles.remotePath("") - assert.ErrorContains(t, err, "file path relative to repo root cannot be empty") - - _, err = repoFiles.remotePath(".") - assert.ErrorContains(t, err, "file path relative to repo root cannot be empty") - - _, err = repoFiles.remotePath("/") - assert.ErrorContains(t, err, "file path relative to repo root cannot be empty") -} - -func TestRepoReadLocal(t *testing.T) { - tempDir := t.TempDir() - helloPath := filepath.Join(tempDir, "hello.txt") - err := os.WriteFile(helloPath, []byte("my name is doraemon :P"), os.ModePerm) - assert.NoError(t, err) - - repoFiles := Create("/Repos/doraemon/bar", tempDir, nil) - bytes, err := repoFiles.readLocal("./a/../hello.txt") - assert.NoError(t, err) - assert.Equal(t, "my name is doraemon :P", string(bytes)) -} diff --git a/libs/sync/sync.go b/libs/sync/sync.go index 54d0624e77c..5c4c9d8f688 100644 --- a/libs/sync/sync.go +++ b/libs/sync/sync.go @@ -5,9 +5,9 @@ import ( "fmt" "time" + "github.com/databricks/cli/libs/filer" "github.com/databricks/cli/libs/git" "github.com/databricks/cli/libs/log" - "github.com/databricks/cli/libs/sync/repofiles" "github.com/databricks/databricks-sdk-go" ) @@ -29,9 +29,9 @@ type SyncOptions struct { type Sync struct { *SyncOptions - fileSet *git.FileSet - snapshot *Snapshot - repoFiles *repofiles.RepoFiles + fileSet *git.FileSet + snapshot *Snapshot + filer filer.Filer // Synchronization progress events are sent to this event notifier. notifier EventNotifier @@ -77,16 +77,19 @@ func New(ctx context.Context, opts SyncOptions) (*Sync, error) { } } - repoFiles := repofiles.Create(opts.RemotePath, opts.LocalPath, opts.WorkspaceClient) + filer, err := filer.NewWorkspaceFilesClient(opts.WorkspaceClient, opts.RemotePath) + if err != nil { + return nil, err + } return &Sync{ SyncOptions: &opts, - fileSet: fileSet, - snapshot: snapshot, - repoFiles: repoFiles, - notifier: &NopNotifier{}, - seq: 0, + fileSet: fileSet, + snapshot: snapshot, + filer: filer, + notifier: &NopNotifier{}, + seq: 0, }, nil } diff --git a/libs/sync/watchdog.go b/libs/sync/watchdog.go index 3e7acccc2c5..94bd9f33842 100644 --- a/libs/sync/watchdog.go +++ b/libs/sync/watchdog.go @@ -1,8 +1,12 @@ package sync import ( + "bytes" "context" + "os" + "path/filepath" + "github.com/databricks/cli/libs/filer" "golang.org/x/sync/errgroup" ) @@ -21,7 +25,7 @@ func (s *Sync) applyDelete(ctx context.Context, group *errgroup.Group, remoteNam group.Go(func() error { s.notifyProgress(ctx, EventActionDelete, remoteName, 0.0) - err := s.repoFiles.DeleteFile(ctx, remoteName) + err := s.filer.Delete(ctx, remoteName) if err != nil { return err } @@ -42,10 +46,18 @@ func (s *Sync) applyPut(ctx context.Context, group *errgroup.Group, localName st group.Go(func() error { s.notifyProgress(ctx, EventActionPut, localName, 0.0) - err := s.repoFiles.PutFile(ctx, localName) + + contents, err := os.ReadFile(filepath.Join(s.LocalPath, localName)) + if err != nil { + return err + } + + opts := []filer.WriteMode{filer.CreateParentDirectories, filer.OverwriteIfExists} + err = s.filer.Write(ctx, localName, bytes.NewReader(contents), opts...) if err != nil { return err } + s.notifyProgress(ctx, EventActionPut, localName, 1.0) return nil }) From 91a3d4d4f4f2e81b504805f056c41cba81f70fd7 Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Thu, 1 Jun 2023 22:51:52 +0200 Subject: [PATCH 2/4] Pass file handle instead of buffer --- libs/sync/watchdog.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/libs/sync/watchdog.go b/libs/sync/watchdog.go index 94bd9f33842..84ccc418d44 100644 --- a/libs/sync/watchdog.go +++ b/libs/sync/watchdog.go @@ -1,7 +1,6 @@ package sync import ( - "bytes" "context" "os" "path/filepath" @@ -47,13 +46,15 @@ func (s *Sync) applyPut(ctx context.Context, group *errgroup.Group, localName st group.Go(func() error { s.notifyProgress(ctx, EventActionPut, localName, 0.0) - contents, err := os.ReadFile(filepath.Join(s.LocalPath, localName)) + localFile, err := os.Open(filepath.Join(s.LocalPath, localName)) if err != nil { return err } + defer localFile.Close() + opts := []filer.WriteMode{filer.CreateParentDirectories, filer.OverwriteIfExists} - err = s.filer.Write(ctx, localName, bytes.NewReader(contents), opts...) + err = s.filer.Write(ctx, localName, localFile, opts...) if err != nil { return err } From 952d5ecc2c3493bf6a12db76fbcd6e7e3d4714d6 Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Fri, 2 Jun 2023 00:08:47 +0200 Subject: [PATCH 3/4] Ignore ErrNotExist on deletes --- libs/sync/watchdog.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/libs/sync/watchdog.go b/libs/sync/watchdog.go index 84ccc418d44..828eb1183d5 100644 --- a/libs/sync/watchdog.go +++ b/libs/sync/watchdog.go @@ -2,6 +2,8 @@ package sync import ( "context" + "errors" + "io/fs" "os" "path/filepath" @@ -25,7 +27,7 @@ func (s *Sync) applyDelete(ctx context.Context, group *errgroup.Group, remoteNam group.Go(func() error { s.notifyProgress(ctx, EventActionDelete, remoteName, 0.0) err := s.filer.Delete(ctx, remoteName) - if err != nil { + if err != nil && !errors.Is(err, fs.ErrNotExist) { return err } s.notifyProgress(ctx, EventActionDelete, remoteName, 1.0) From 678e090db2ad6d86db7da9619aef0a9c5bfa1fb7 Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Fri, 2 Jun 2023 15:05:41 +0200 Subject: [PATCH 4/4] Move progress call --- libs/sync/watchdog.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/libs/sync/watchdog.go b/libs/sync/watchdog.go index 828eb1183d5..1abfa5575b4 100644 --- a/libs/sync/watchdog.go +++ b/libs/sync/watchdog.go @@ -46,8 +46,6 @@ func (s *Sync) applyPut(ctx context.Context, group *errgroup.Group, localName st } group.Go(func() error { - s.notifyProgress(ctx, EventActionPut, localName, 0.0) - localFile, err := os.Open(filepath.Join(s.LocalPath, localName)) if err != nil { return err @@ -55,12 +53,12 @@ func (s *Sync) applyPut(ctx context.Context, group *errgroup.Group, localName st defer localFile.Close() + s.notifyProgress(ctx, EventActionPut, localName, 0.0) opts := []filer.WriteMode{filer.CreateParentDirectories, filer.OverwriteIfExists} err = s.filer.Write(ctx, localName, localFile, opts...) if err != nil { return err } - s.notifyProgress(ctx, EventActionPut, localName, 1.0) return nil })