diff --git a/libs/sync/repofiles/repofiles.go b/libs/sync/repofiles/repofiles.go deleted file mode 100644 index 8fcabc113ec..00000000000 --- a/libs/sync/repofiles/repofiles.go +++ /dev/null @@ -1,159 +0,0 @@ -package repofiles - -import ( - "context" - "errors" - "fmt" - "net/http" - "net/url" - "os" - "path" - "path/filepath" - "strings" - - "github.com/databricks/databricks-sdk-go" - "github.com/databricks/databricks-sdk-go/apierr" - "github.com/databricks/databricks-sdk-go/client" - "github.com/databricks/databricks-sdk-go/service/workspace" -) - -// RepoFiles wraps reading and writing into a remote repo with safeguards to prevent -// accidental deletion of repos and more robust methods to overwrite workspace files -type RepoFiles struct { - repoRoot string - localRoot string - workspaceClient *databricks.WorkspaceClient -} - -func Create(repoRoot, localRoot string, workspaceClient *databricks.WorkspaceClient) *RepoFiles { - return &RepoFiles{ - repoRoot: repoRoot, - localRoot: localRoot, - workspaceClient: workspaceClient, - } -} - -func (r *RepoFiles) remotePath(relativePath string) (string, error) { - fullPath := path.Join(r.repoRoot, relativePath) - cleanFullPath := path.Clean(fullPath) - if !strings.HasPrefix(cleanFullPath, r.repoRoot) { - return "", fmt.Errorf("relative file path is not inside repo root: %s", relativePath) - } - // path.Clean will remove any trailing / so it's enough to check cleanFullPath == r.repoRoot - if cleanFullPath == r.repoRoot { - return "", fmt.Errorf("file path relative to repo root cannot be empty: %s", relativePath) - } - return cleanFullPath, nil -} - -func (r *RepoFiles) readLocal(relativePath string) ([]byte, error) { - localPath := filepath.Join(r.localRoot, relativePath) - return os.ReadFile(localPath) -} - -func (r *RepoFiles) writeRemote(ctx context.Context, relativePath string, content []byte) error { - apiClientConfig := r.workspaceClient.Config - apiClientConfig.HTTPTimeoutSeconds = 600 - apiClient, err := client.New(apiClientConfig) - if err != nil { - return err - } - remotePath, err := r.remotePath(relativePath) - if err != nil { - return err - } - escapedPath := url.PathEscape(strings.TrimLeft(remotePath, "/")) - apiPath := fmt.Sprintf("/api/2.0/workspace-files/import-file/%s?overwrite=true", escapedPath) - - err = apiClient.Do(ctx, http.MethodPost, apiPath, content, nil) - - // Handling some edge cases when an upload might fail - // - // We cannot do more precise error scoping here because the API does not - // provide descriptive errors yet - // - // TODO: narrow down the error condition scope of this "if" block to only - // trigger for the specific edge cases instead of all errors once the API - // implements them - if err != nil { - // Delete any artifact files incase non overwriteable by the current file - // type and thus are failing the PUT request. - // files, folders and notebooks might not have been cleaned up and they - // can't overwrite each other. If a folder `foo` exists, then attempts to - // PUT a file `foo` will fail - err := r.workspaceClient.Workspace.Delete(ctx, - workspace.Delete{ - Path: remotePath, - Recursive: true, - }, - ) - // ignore RESOURCE_DOES_NOT_EXIST here incase nothing existed at remotePath - var aerr *apierr.APIError - if errors.As(err, &aerr) && aerr.ErrorCode == "RESOURCE_DOES_NOT_EXIST" { - err = nil - } - if err != nil { - return err - } - - // Mkdir parent dirs incase they are what's causing the PUT request to - // fail - err = r.workspaceClient.Workspace.MkdirsByPath(ctx, path.Dir(remotePath)) - if err != nil { - return fmt.Errorf("could not mkdir to put file: %s", err) - } - - // Attempt to upload file again after cleanup/setup - err = apiClient.Do(ctx, http.MethodPost, apiPath, content, nil) - if err != nil { - return err - } - } - return nil -} - -func (r *RepoFiles) deleteRemote(ctx context.Context, relativePath string) error { - remotePath, err := r.remotePath(relativePath) - if err != nil { - return err - } - return r.workspaceClient.Workspace.Delete(ctx, - workspace.Delete{ - Path: remotePath, - Recursive: false, - }, - ) -} - -// The API calls for a python script foo.py would be -// `PUT foo.py` -// `DELETE foo.py` -// -// The API calls for a python notebook foo.py would be -// `PUT foo.py` -// `DELETE foo` -// -// The workspace file system backend strips .py from the file name if the python -// file is a notebook -func (r *RepoFiles) PutFile(ctx context.Context, relativePath string) error { - content, err := r.readLocal(relativePath) - if err != nil { - return err - } - - return r.writeRemote(ctx, relativePath, content) -} - -func (r *RepoFiles) DeleteFile(ctx context.Context, relativePath string) error { - err := r.deleteRemote(ctx, relativePath) - - // We explictly ignore RESOURCE_DOES_NOT_EXIST error to make delete idempotent - var aerr *apierr.APIError - if errors.As(err, &aerr) && aerr.ErrorCode == "RESOURCE_DOES_NOT_EXIST" { - err = nil - } - return nil -} - -// TODO: write integration tests for all non happy path cases that rely on -// specific behaviour of the workspace apis diff --git a/libs/sync/repofiles/repofiles_test.go b/libs/sync/repofiles/repofiles_test.go deleted file mode 100644 index 2a881d90d06..00000000000 --- a/libs/sync/repofiles/repofiles_test.go +++ /dev/null @@ -1,88 +0,0 @@ -package repofiles - -import ( - "os" - "path/filepath" - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestRepoFilesRemotePath(t *testing.T) { - repoRoot := "/Repos/doraemon/bar" - repoFiles := Create(repoRoot, "/doraemon/foo/bar", nil) - - remotePath, err := repoFiles.remotePath("a/b/c") - assert.NoError(t, err) - assert.Equal(t, repoRoot+"/a/b/c", remotePath) - - remotePath, err = repoFiles.remotePath("a/b/../d") - assert.NoError(t, err) - assert.Equal(t, repoRoot+"/a/d", remotePath) - - remotePath, err = repoFiles.remotePath("a/../c") - assert.NoError(t, err) - assert.Equal(t, repoRoot+"/c", remotePath) - - remotePath, err = repoFiles.remotePath("a/b/c/.") - assert.NoError(t, err) - assert.Equal(t, repoRoot+"/a/b/c", remotePath) - - remotePath, err = repoFiles.remotePath("a/b/c/d/./../../f/g") - assert.NoError(t, err) - assert.Equal(t, repoRoot+"/a/b/f/g", remotePath) - - _, err = repoFiles.remotePath("..") - assert.ErrorContains(t, err, `relative file path is not inside repo root: ..`) - - _, err = repoFiles.remotePath("a/../..") - assert.ErrorContains(t, err, `relative file path is not inside repo root: a/../..`) - - _, err = repoFiles.remotePath("./../.") - assert.ErrorContains(t, err, `relative file path is not inside repo root: ./../.`) - - _, err = repoFiles.remotePath("/./.././..") - assert.ErrorContains(t, err, `relative file path is not inside repo root: /./.././..`) - - _, err = repoFiles.remotePath("./../.") - assert.ErrorContains(t, err, `relative file path is not inside repo root: ./../.`) - - _, err = repoFiles.remotePath("./..") - assert.ErrorContains(t, err, `relative file path is not inside repo root: ./..`) - - _, err = repoFiles.remotePath("./../../..") - assert.ErrorContains(t, err, `relative file path is not inside repo root: ./../../..`) - - _, err = repoFiles.remotePath("./../a/./b../../..") - assert.ErrorContains(t, err, `relative file path is not inside repo root: ./../a/./b../../..`) - - _, err = repoFiles.remotePath("../..") - assert.ErrorContains(t, err, `relative file path is not inside repo root: ../..`) - - _, err = repoFiles.remotePath(".//a/..//./b/..") - assert.ErrorContains(t, err, `file path relative to repo root cannot be empty`) - - _, err = repoFiles.remotePath("a/b/../..") - assert.ErrorContains(t, err, "file path relative to repo root cannot be empty") - - _, err = repoFiles.remotePath("") - assert.ErrorContains(t, err, "file path relative to repo root cannot be empty") - - _, err = repoFiles.remotePath(".") - assert.ErrorContains(t, err, "file path relative to repo root cannot be empty") - - _, err = repoFiles.remotePath("/") - assert.ErrorContains(t, err, "file path relative to repo root cannot be empty") -} - -func TestRepoReadLocal(t *testing.T) { - tempDir := t.TempDir() - helloPath := filepath.Join(tempDir, "hello.txt") - err := os.WriteFile(helloPath, []byte("my name is doraemon :P"), os.ModePerm) - assert.NoError(t, err) - - repoFiles := Create("/Repos/doraemon/bar", tempDir, nil) - bytes, err := repoFiles.readLocal("./a/../hello.txt") - assert.NoError(t, err) - assert.Equal(t, "my name is doraemon :P", string(bytes)) -} diff --git a/libs/sync/sync.go b/libs/sync/sync.go index 54d0624e77c..5c4c9d8f688 100644 --- a/libs/sync/sync.go +++ b/libs/sync/sync.go @@ -5,9 +5,9 @@ import ( "fmt" "time" + "github.com/databricks/cli/libs/filer" "github.com/databricks/cli/libs/git" "github.com/databricks/cli/libs/log" - "github.com/databricks/cli/libs/sync/repofiles" "github.com/databricks/databricks-sdk-go" ) @@ -29,9 +29,9 @@ type SyncOptions struct { type Sync struct { *SyncOptions - fileSet *git.FileSet - snapshot *Snapshot - repoFiles *repofiles.RepoFiles + fileSet *git.FileSet + snapshot *Snapshot + filer filer.Filer // Synchronization progress events are sent to this event notifier. notifier EventNotifier @@ -77,16 +77,19 @@ func New(ctx context.Context, opts SyncOptions) (*Sync, error) { } } - repoFiles := repofiles.Create(opts.RemotePath, opts.LocalPath, opts.WorkspaceClient) + filer, err := filer.NewWorkspaceFilesClient(opts.WorkspaceClient, opts.RemotePath) + if err != nil { + return nil, err + } return &Sync{ SyncOptions: &opts, - fileSet: fileSet, - snapshot: snapshot, - repoFiles: repoFiles, - notifier: &NopNotifier{}, - seq: 0, + fileSet: fileSet, + snapshot: snapshot, + filer: filer, + notifier: &NopNotifier{}, + seq: 0, }, nil } diff --git a/libs/sync/watchdog.go b/libs/sync/watchdog.go index 3e7acccc2c5..1abfa5575b4 100644 --- a/libs/sync/watchdog.go +++ b/libs/sync/watchdog.go @@ -2,7 +2,12 @@ package sync import ( "context" + "errors" + "io/fs" + "os" + "path/filepath" + "github.com/databricks/cli/libs/filer" "golang.org/x/sync/errgroup" ) @@ -21,8 +26,8 @@ func (s *Sync) applyDelete(ctx context.Context, group *errgroup.Group, remoteNam group.Go(func() error { s.notifyProgress(ctx, EventActionDelete, remoteName, 0.0) - err := s.repoFiles.DeleteFile(ctx, remoteName) - if err != nil { + err := s.filer.Delete(ctx, remoteName) + if err != nil && !errors.Is(err, fs.ErrNotExist) { return err } s.notifyProgress(ctx, EventActionDelete, remoteName, 1.0) @@ -41,8 +46,16 @@ func (s *Sync) applyPut(ctx context.Context, group *errgroup.Group, localName st } group.Go(func() error { + localFile, err := os.Open(filepath.Join(s.LocalPath, localName)) + if err != nil { + return err + } + + defer localFile.Close() + s.notifyProgress(ctx, EventActionPut, localName, 0.0) - err := s.repoFiles.PutFile(ctx, localName) + opts := []filer.WriteMode{filer.CreateParentDirectories, filer.OverwriteIfExists} + err = s.filer.Write(ctx, localName, localFile, opts...) if err != nil { return err }