Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 29 additions & 2 deletions cmd/workspace/workspace/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,15 @@ type fileIOEvent struct {
// EventType identifies the kind of progress event emitted while exporting
// files from, or importing files to, a workspace.
type EventType string

// Each event type is declared exactly once; redeclaring a constant in the
// same block is a compile error.
const (
	// Per-file events.
	EventTypeFileExported = EventType("FILE_EXPORTED")
	EventTypeFileSkipped  = EventType("FILE_SKIPPED")
	EventTypeFileImported = EventType("FILE_IMPORTED")

	// Events marking the start and end of an export.
	EventTypeExportStarted   = EventType("EXPORT_STARTED")
	EventTypeExportCompleted = EventType("EXPORT_COMPLETED")

	// Events marking the start and end of an import.
	EventTypeImportStarted   = EventType("IMPORT_STARTED")
	EventTypeImportCompleted = EventType("IMPORT_COMPLETED")
)

func newFileExportedEvent(sourcePath, targetPath string) fileIOEvent {
Expand Down Expand Up @@ -44,3 +49,25 @@ func newExportStartedEvent(sourcePath string) fileIOEvent {
Type: EventTypeExportStarted,
}
}

// newImportStartedEvent builds the event announcing that an import of
// sourcePath has begun. Only SourcePath and Type are populated.
func newImportStartedEvent(sourcePath string) fileIOEvent {
	var event fileIOEvent
	event.Type = EventTypeImportStarted
	event.SourcePath = sourcePath
	return event
}

// newImportCompletedEvent builds the event announcing that an import into
// targetPath has finished. Only TargetPath and Type are populated.
func newImportCompletedEvent(targetPath string) fileIOEvent {
	var event fileIOEvent
	event.Type = EventTypeImportCompleted
	event.TargetPath = targetPath
	return event
}

// newFileImportedEvent builds the event reporting that the file at sourcePath
// was imported to targetPath.
func newFileImportedEvent(sourcePath, targetPath string) fileIOEvent {
	var event fileIOEvent
	event.Type = EventTypeFileImported
	event.SourcePath = sourcePath
	event.TargetPath = targetPath
	return event
}
138 changes: 138 additions & 0 deletions cmd/workspace/workspace/import_dir.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
package workspace

import (
"context"
"errors"
"io/fs"
"os"
"path"
"path/filepath"
"strings"

"github.com/databricks/cli/cmd/root"
"github.com/databricks/cli/libs/cmdio"
"github.com/databricks/cli/libs/filer"
"github.com/databricks/cli/libs/notebook"
"github.com/spf13/cobra"
)

// The callback function imports the file specified at sourcePath. This function is
// meant to be used in conjunction with fs.WalkDir
//
// We deal with 3 different names for files. The need for this
// arises due to workspace API behaviour and limitations
//
// 1. Local name: The name for the file in the local file system
// 2. Remote name: The name of the file as materialized in the workspace
// 3. API payload name: The name to be used for API calls
//
// Example, consider the notebook "foo\\myNotebook.py" on a windows file system.
// The process to upload it would look like
// 1. Read the notebook, referring to it using it's local name "foo\\myNotebook.py"
// 2. API call to import the notebook to the workspace, using it API payload name "foo/myNotebook.py"
// 3. The notebook is materialized in the workspace using it's remote name "foo/myNotebook"
func importFileCallback(ctx context.Context, workspaceFiler filer.Filer, sourceDir, targetDir string) func(string, fs.DirEntry, error) error {
return func(sourcePath string, d fs.DirEntry, err error) error {
if err != nil {
return err
}

// localName is the name for the file in the local file system
localName, err := filepath.Rel(sourceDir, sourcePath)
Comment thread
shreyas-goenka marked this conversation as resolved.
if err != nil {
return err
}

// nameForApiCall is the name for the file to be used in any API call.
// This is a file name we provide to the filer.Write and Mkdir methods
nameForApiCall := filepath.ToSlash(localName)

// create directory and return early
if d.IsDir() {
return workspaceFiler.Mkdir(ctx, nameForApiCall)
}

// remoteName is the name of the file as visible in the workspace. We compute
// the remote name on the client side for logging purposes
remoteName := filepath.ToSlash(localName)
isNotebook, _, err := notebook.Detect(sourcePath)
if err != nil {
return err
}
if isNotebook {
ext := path.Ext(localName)
remoteName = strings.TrimSuffix(localName, ext)
Comment thread
shreyas-goenka marked this conversation as resolved.
}

// Open the local file
f, err := os.Open(sourcePath)
if err != nil {
return err
}
Comment thread
shreyas-goenka marked this conversation as resolved.
defer f.Close()

// Create file in WSFS
if importOverwrite {
err = workspaceFiler.Write(ctx, nameForApiCall, f, filer.OverwriteIfExists)
if err != nil {
return err
}
} else {
err = workspaceFiler.Write(ctx, nameForApiCall, f)
if errors.Is(err, fs.ErrExist) {
// Emit file skipped event with the appropriate template
fileSkippedEvent := newFileSkippedEvent(localName, path.Join(targetDir, remoteName))
template := "{{.SourcePath}} -> {{.TargetPath}} (skipped; already exists)\n"
return cmdio.RenderWithTemplate(ctx, fileSkippedEvent, template)
}
if err != nil {
return err
}
}
fileImportedEvent := newFileImportedEvent(localName, path.Join(targetDir, remoteName))
return cmdio.RenderWithTemplate(ctx, fileImportedEvent, "{{.SourcePath}} -> {{.TargetPath}}\n")
}
}

// importDirCommand is the `import-dir` command. It walks the local directory
// given as the first argument and imports every entry into the workspace
// directory given as the second argument, rendering a started event before the
// walk and a completed event after it.
var importDirCommand = &cobra.Command{
	Use:   "import-dir SOURCE_PATH TARGET_PATH",
	Short: `Import a directory from the local filesystem to a Databricks workspace.`,
	Long: `
Import a directory recursively from the local file system to a Databricks workspace.
Notebooks will have their extensions (one of .scala, .py, .sql, .ipynb, .r) stripped
`,
	PreRunE: root.MustWorkspaceClient,
	Args:    cobra.ExactArgs(2),
	RunE: func(cmd *cobra.Command, args []string) (err error) {
		ctx := cmd.Context()
		sourceDir, targetDir := args[0], args[1]

		// All remote paths used during the walk are relative to targetDir,
		// because the filer is rooted there.
		workspaceFiler, err := filer.NewWorkspaceFilesClient(root.WorkspaceClient(ctx), targetDir)
		if err != nil {
			return err
		}

		// TODO: print progress events on stderr instead: https://github.com/databricks/cli/issues/448
		if err = cmdio.RenderJson(ctx, newImportStartedEvent(sourceDir)); err != nil {
			return err
		}

		// Visit every entry below sourceDir and import it to the workspace
		importEntry := importFileCallback(ctx, workspaceFiler, sourceDir, targetDir)
		if err = filepath.WalkDir(sourceDir, importEntry); err != nil {
			return err
		}
		return cmdio.RenderJson(ctx, newImportCompletedEvent(targetDir))
	},
}

// importOverwrite holds the value of the --overwrite flag of import-dir.
// When set, existing workspace files are overwritten instead of skipped.
var importOverwrite bool

// init binds the --overwrite flag and registers import-dir on the parent command.
func init() {
	flags := importDirCommand.Flags()
	flags.BoolVar(&importOverwrite, "overwrite", false, "overwrite existing workspace files")
	Cmd.AddCommand(importDirCommand)
}
137 changes: 137 additions & 0 deletions internal/filer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"io"
"io/fs"
"net/http"
"regexp"
"strings"
"testing"

Expand Down Expand Up @@ -312,6 +313,142 @@ func TestAccFilerDbfsReadDir(t *testing.T) {
runFilerReadDirTest(t, ctx, f)
}

// jupyterNotebookContent1 is a Jupyter notebook (nbformat 4) with a single
// code cell that prints "Jupyter Notebook Version 1". The workspace notebook
// tests use it as the payload of the first upload of jupyterNb.ipynb.
var jupyterNotebookContent1 = `
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"print(\"Jupyter Notebook Version 1\")"
]
}
],
"metadata": {
"language_info": {
"name": "python"
},
"orig_nbformat": 4
},
"nbformat": 4,
"nbformat_minor": 2
}
`

// jupyterNotebookContent2 is a Jupyter notebook (nbformat 4) with a single
// code cell that prints "Jupyter Notebook Version 2". The workspace notebook
// tests use it as the payload of the second upload of jupyterNb.ipynb.
var jupyterNotebookContent2 = `
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"print(\"Jupyter Notebook Version 2\")"
]
}
],
"metadata": {
"language_info": {
"name": "python"
},
"orig_nbformat": 4
},
"nbformat": 4,
"nbformat_minor": 2
}
`

// TestAccFilerWorkspaceNotebookConflict uploads one notebook per supported
// language and verifies that uploading each of them again without the
// overwrite mode fails with fs.ErrExist. The error message is expected to
// reference the notebook by its name without the file extension.
func TestAccFilerWorkspaceNotebookConflict(t *testing.T) {
	ctx, f := setupWorkspaceFilesTest(t)
	var err error

	// Upload the notebooks
	err = f.Write(ctx, "pyNb.py", strings.NewReader("# Databricks notebook source\nprint('first upload'))"))
	require.NoError(t, err)
	err = f.Write(ctx, "rNb.r", strings.NewReader("# Databricks notebook source\nprint('first upload'))"))
	require.NoError(t, err)
	err = f.Write(ctx, "sqlNb.sql", strings.NewReader("-- Databricks notebook source\n SELECT \"first upload\""))
	require.NoError(t, err)
	err = f.Write(ctx, "scalaNb.scala", strings.NewReader("// Databricks notebook source\n println(\"first upload\"))"))
	require.NoError(t, err)
	err = f.Write(ctx, "jupyterNb.ipynb", strings.NewReader(jupyterNotebookContent1))
	require.NoError(t, err)

	// Assert contents after initial upload
	filerTest{t, f}.assertContents(ctx, "pyNb", "# Databricks notebook source\nprint('first upload'))")
	filerTest{t, f}.assertContents(ctx, "rNb", "# Databricks notebook source\nprint('first upload'))")
	filerTest{t, f}.assertContents(ctx, "sqlNb", "-- Databricks notebook source\n SELECT \"first upload\"")
	filerTest{t, f}.assertContents(ctx, "scalaNb", "// Databricks notebook source\n println(\"first upload\"))")
	filerTest{t, f}.assertContents(ctx, "jupyterNb", "# Databricks notebook source\nprint(\"Jupyter Notebook Version 1\")")

	// Second uploads. Each payload uses the notebook header of its own
	// language, matching the first uploads above.
	secondUploads := []struct {
		localName  string
		content    string
		remoteName string
	}{
		{"pyNb.py", "# Databricks notebook source\nprint('second upload'))", "pyNb"},
		{"rNb.r", "# Databricks notebook source\nprint('second upload'))", "rNb"},
		{"sqlNb.sql", "-- Databricks notebook source\n SELECT \"second upload\"", "sqlNb"},
		{"scalaNb.scala", "// Databricks notebook source\n println(\"second upload\"))", "scalaNb"},
		{"jupyterNb.ipynb", jupyterNotebookContent2, "jupyterNb"},
	}

	// Assert uploading a second time fails due to overwrite mode missing
	for _, upload := range secondUploads {
		err = f.Write(ctx, upload.localName, strings.NewReader(upload.content))

		// Only inspect the message when an error was actually returned;
		// calling err.Error() on a nil error would panic and hide the
		// assertion failure.
		if assert.ErrorIs(t, err, fs.ErrExist) {
			pattern := `file already exists: .*/` + regexp.QuoteMeta(upload.remoteName) + `$`
			assert.Regexp(t, regexp.MustCompile(pattern), err.Error())
		}
	}
}

// TestAccFilerWorkspaceNotebookWithOverwriteFlag uploads one notebook per
// supported language, then uploads each of them again with
// filer.OverwriteIfExists and verifies that the contents were replaced.
func TestAccFilerWorkspaceNotebookWithOverwriteFlag(t *testing.T) {
	ctx, f := setupWorkspaceFilesTest(t)

	// For source notebooks the uploaded payload and the contents read back
	// are the same string; for the Jupyter notebook they differ.
	notebooks := []struct {
		localName      string
		remoteName     string
		firstUpload    string
		firstContents  string
		secondUpload   string
		secondContents string
	}{
		{
			localName:      "pyNb.py",
			remoteName:     "pyNb",
			firstUpload:    "# Databricks notebook source\nprint('first upload'))",
			firstContents:  "# Databricks notebook source\nprint('first upload'))",
			secondUpload:   "# Databricks notebook source\nprint('second upload'))",
			secondContents: "# Databricks notebook source\nprint('second upload'))",
		},
		{
			localName:      "rNb.r",
			remoteName:     "rNb",
			firstUpload:    "# Databricks notebook source\nprint('first upload'))",
			firstContents:  "# Databricks notebook source\nprint('first upload'))",
			secondUpload:   "# Databricks notebook source\nprint('second upload'))",
			secondContents: "# Databricks notebook source\nprint('second upload'))",
		},
		{
			localName:      "sqlNb.sql",
			remoteName:     "sqlNb",
			firstUpload:    "-- Databricks notebook source\n SELECT \"first upload\"",
			firstContents:  "-- Databricks notebook source\n SELECT \"first upload\"",
			secondUpload:   "-- Databricks notebook source\n SELECT \"second upload\"",
			secondContents: "-- Databricks notebook source\n SELECT \"second upload\"",
		},
		{
			localName:      "scalaNb.scala",
			remoteName:     "scalaNb",
			firstUpload:    "// Databricks notebook source\n println(\"first upload\"))",
			firstContents:  "// Databricks notebook source\n println(\"first upload\"))",
			secondUpload:   "// Databricks notebook source\n println(\"second upload\"))",
			secondContents: "// Databricks notebook source\n println(\"second upload\"))",
		},
		{
			localName:      "jupyterNb.ipynb",
			remoteName:     "jupyterNb",
			firstUpload:    jupyterNotebookContent1,
			firstContents:  "# Databricks notebook source\nprint(\"Jupyter Notebook Version 1\")",
			secondUpload:   jupyterNotebookContent2,
			secondContents: "# Databricks notebook source\nprint(\"Jupyter Notebook Version 2\")",
		},
	}

	// Upload notebooks
	for _, nb := range notebooks {
		err := f.Write(ctx, nb.localName, strings.NewReader(nb.firstUpload))
		require.NoError(t, err)
	}

	// Assert contents after initial upload
	for _, nb := range notebooks {
		filerTest{t, f}.assertContents(ctx, nb.remoteName, nb.firstContents)
	}

	// Upload notebooks a second time, overwriting the initial uploads
	for _, nb := range notebooks {
		err := f.Write(ctx, nb.localName, strings.NewReader(nb.secondUpload), filer.OverwriteIfExists)
		require.NoError(t, err)
	}

	// Assert contents have been overwritten
	for _, nb := range notebooks {
		filerTest{t, f}.assertContents(ctx, nb.remoteName, nb.secondContents)
	}
}

func setupFilerLocalTest(t *testing.T) (context.Context, filer.Filer) {
ctx := context.Background()
f, err := filer.NewLocalClient(t.TempDir())
Expand Down
1 change: 1 addition & 0 deletions internal/testdata/import_dir/a/b/c/file-b
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
file-in-dir
1 change: 1 addition & 0 deletions internal/testdata/import_dir/file-a
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
hello, world
21 changes: 21 additions & 0 deletions internal/testdata/import_dir/jupyterNotebook.ipynb
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"print(\"jupyter\")"
]
}
],
"metadata": {
"language_info": {
"name": "python"
},
"orig_nbformat": 4
},
"nbformat": 4,
"nbformat_minor": 2
}
2 changes: 2 additions & 0 deletions internal/testdata/import_dir/pyNotebook.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# Databricks notebook source
print("python")
2 changes: 2 additions & 0 deletions internal/testdata/import_dir/rNotebook.r
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# Databricks notebook source
print("r")
2 changes: 2 additions & 0 deletions internal/testdata/import_dir/scalaNotebook.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
// Databricks notebook source
println("scala")
2 changes: 2 additions & 0 deletions internal/testdata/import_dir/sqlNotebook.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- Databricks notebook source
SELECT "sql"
Loading