Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 34 additions & 11 deletions bundle/artifacts/artifacts.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/artifacts/whl"
"github.com/databricks/cli/bundle/config"
"github.com/databricks/cli/bundle/libraries"
"github.com/databricks/cli/libs/cmdio"
"github.com/databricks/cli/libs/filer"
"github.com/databricks/cli/libs/log"
Expand Down Expand Up @@ -106,31 +107,53 @@ func (m *basicUpload) Apply(ctx context.Context, b *bundle.Bundle) error {
return err
}

err = uploadArtifact(ctx, artifact, uploadPath, client)
err = uploadArtifact(ctx, b, artifact, uploadPath, client)
if err != nil {
return fmt.Errorf("upload for %s failed, error: %w", m.name, err)
}

return nil
}

func uploadArtifact(ctx context.Context, a *config.Artifact, uploadPath string, client filer.Filer) error {
func uploadArtifact(ctx context.Context, b *bundle.Bundle, a *config.Artifact, uploadPath string, client filer.Filer) error {
filesToLibraries := libraries.MapFilesToTaskLibraries(ctx, b)

for i := range a.Files {
f := &a.Files[i]
if f.NeedsUpload() {
filename := filepath.Base(f.Source)
cmdio.LogString(ctx, fmt.Sprintf("Uploading %s...", filename))

err := uploadArtifactFile(ctx, f.Source, client)
if err != nil {
return err
// Lookup all tasks that reference this file.
libs, ok := filesToLibraries[f.Source]
if !ok {
log.Debugf(ctx, "No tasks reference %s. Skipping upload.", f.Source)
continue
}

filename := filepath.Base(f.Source)
cmdio.LogString(ctx, fmt.Sprintf("Uploading %s...", filename))

err := uploadArtifactFile(ctx, f.Source, client)
if err != nil {
return err
}

log.Infof(ctx, "Upload succeeded")
f.RemotePath = path.Join(uploadPath, filepath.Base(f.Source))

// Update all tasks that reference this file.
for _, lib := range libs {
wsfsBase := "/Workspace"
remotePath := path.Join(wsfsBase, f.RemotePath)
if lib.Whl != "" {
lib.Whl = remotePath
continue
}
if lib.Jar != "" {
lib.Jar = remotePath
continue
}
log.Infof(ctx, "Upload succeeded")
f.RemotePath = path.Join(uploadPath, filepath.Base(f.Source))
}
}

a.NormalisePaths()
return nil
}

Expand Down
40 changes: 2 additions & 38 deletions bundle/config/artifact.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,9 @@ package config
import (
"context"
"fmt"
"path"

"github.com/databricks/cli/bundle/config/paths"
"github.com/databricks/cli/libs/exec"
"github.com/databricks/databricks-sdk-go/service/compute"
)

type Artifacts map[string]*Artifact
Expand All @@ -23,9 +21,8 @@ type ArtifactType string
const ArtifactPythonWheel ArtifactType = `whl`

type ArtifactFile struct {
Source string `json:"source"`
RemotePath string `json:"-" bundle:"readonly"`
Libraries []*compute.Library `json:"-" bundle:"readonly"`
Source string `json:"source"`
RemotePath string `json:"remote_path" bundle:"readonly"`
}

// Artifact defines a single local code artifact that can be
Expand Down Expand Up @@ -65,36 +62,3 @@ func (a *Artifact) Build(ctx context.Context) ([]byte, error) {
}
return e.Exec(ctx, a.BuildCommand)
}

// NormalisePaths rewrites the path of every library attached to the
// artifact's files so that it points at the file's remote location under
// the "/Workspace" prefix.
//
// Files with a nil Libraries slice are left untouched. For each attached
// library only the first populated field is rewritten: Whl if it is set,
// otherwise Jar if it is set.
func (a *Artifact) NormalisePaths() {
	const wsfsBase = "/Workspace"

	for _, file := range a.Files {
		// Nothing references this file, so there is nothing to rewrite.
		if file.Libraries == nil {
			continue
		}

		target := path.Join(wsfsBase, file.RemotePath)
		for _, lib := range file.Libraries {
			switch {
			case lib.Whl != "":
				lib.Whl = target
			case lib.Jar != "":
				lib.Jar = target
			}
		}
	}
}

// NeedsUpload reports whether this artifact file has to be uploaded.
//
// During bundle processing, every library that uses an artifact file is
// recorded in that file's Libraries field. A non-nil Libraries field
// therefore means the file is used as a library and must be uploaded;
// a nil one means the file is unused and the upload is skipped.
func (af *ArtifactFile) NeedsUpload() bool {
	attached := af.Libraries
	return attached != nil
}
16 changes: 16 additions & 0 deletions bundle/libraries/helpers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package libraries

import "github.com/databricks/databricks-sdk-go/service/compute"

// libraryPath returns the path stored in the library definition.
//
// The Whl, Jar and Egg fields are checked in that order and the first
// non-empty one is returned. An empty string is returned when none of
// them is set.
func libraryPath(library *compute.Library) string {
	switch {
	case library.Whl != "":
		return library.Whl
	case library.Jar != "":
		return library.Jar
	case library.Egg != "":
		return library.Egg
	default:
		return ""
	}
}
17 changes: 17 additions & 0 deletions bundle/libraries/helpers_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package libraries

import (
"testing"

"github.com/databricks/databricks-sdk-go/service/compute"
"github.com/stretchr/testify/assert"
)

func TestLibraryPath(t *testing.T) {
path := "/some/path"

assert.Equal(t, path, libraryPath(&compute.Library{Whl: path}))
assert.Equal(t, path, libraryPath(&compute.Library{Jar: path}))
assert.Equal(t, path, libraryPath(&compute.Library{Egg: path}))
assert.Equal(t, "", libraryPath(&compute.Library{}))
}
138 changes: 30 additions & 108 deletions bundle/libraries/libraries.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,46 +3,16 @@ package libraries
import (
"context"
"fmt"
"net/url"
"path"
"path/filepath"
"strings"

"github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/config"
"github.com/databricks/cli/libs/cmdio"
"github.com/databricks/cli/libs/log"
"github.com/databricks/databricks-sdk-go/service/compute"
"github.com/databricks/databricks-sdk-go/service/jobs"
)

// match is the stateless mutator type returned by MatchWithArtifacts.
type match struct{}

// MatchWithArtifacts returns a new match mutator as a bundle.Mutator.
func MatchWithArtifacts() bundle.Mutator {
	return new(match)
}

// Name returns the identifier of this mutator.
func (a *match) Name() string {
	const mutatorName = "libraries.MatchWithArtifacts"
	return mutatorName
}

// Apply walks every task returned by findAllTasks and, for each library
// listed on the task, calls findArtifactsAndMarkForUpload.
//
// It stops at the first problem and returns an error when a task is
// reported by isMissingRequiredLibraries, or when
// findArtifactsAndMarkForUpload fails for one of the task's libraries.
func (a *match) Apply(ctx context.Context, b *bundle.Bundle) error {
	for _, t := range findAllTasks(b) {
		if isMissingRequiredLibraries(t) {
			return fmt.Errorf("task '%s' is missing required libraries. Please include your package code in task libraries block", t.TaskKey)
		}

		// Index into the slice so the helper receives a pointer to the
		// task's own library entry rather than to a loop copy.
		for i := range t.Libraries {
			if err := findArtifactsAndMarkForUpload(ctx, &t.Libraries[i], b); err != nil {
				return err
			}
		}
	}

	return nil
}

func findAllTasks(b *bundle.Bundle) []*jobs.Task {
r := b.Config.Resources
result := make([]*jobs.Task, 0)
Expand Down Expand Up @@ -71,7 +41,7 @@ func FindAllWheelTasksWithLocalLibraries(b *bundle.Bundle) []*jobs.Task {

func IsTaskWithLocalLibraries(task *jobs.Task) bool {
for _, l := range task.Libraries {
if isLocalLibrary(&l) {
if IsLocalLibrary(&l) {
return true
}
}
Expand All @@ -81,25 +51,16 @@ func IsTaskWithLocalLibraries(task *jobs.Task) bool {

func IsTaskWithWorkspaceLibraries(task *jobs.Task) bool {
for _, l := range task.Libraries {
path := libPath(&l)
if isWorkspacePath(path) {
if IsWorkspaceLibrary(&l) {
return true
}
}

return false
}

// isMissingRequiredLibraries reports whether the task is a Python wheel
// task or a Spark JAR task whose Libraries field is nil.
func isMissingRequiredLibraries(task *jobs.Task) bool {
	requiresLibraries := task.PythonWheelTask != nil || task.SparkJarTask != nil
	return task.Libraries == nil && requiresLibraries
}

func findLibraryMatches(lib *compute.Library, b *bundle.Bundle) ([]string, error) {
path := libPath(lib)
path := libraryPath(lib)
if path == "" {
return nil, nil
}
Expand All @@ -108,26 +69,27 @@ func findLibraryMatches(lib *compute.Library, b *bundle.Bundle) ([]string, error
return filepath.Glob(fullPath)
}

func findArtifactsAndMarkForUpload(ctx context.Context, lib *compute.Library, b *bundle.Bundle) error {
func findArtifactFiles(ctx context.Context, lib *compute.Library, b *bundle.Bundle) ([]*config.ArtifactFile, error) {
matches, err := findLibraryMatches(lib, b)
if err != nil {
return err
return nil, err
}

if len(matches) == 0 && isLocalLibrary(lib) {
return fmt.Errorf("file %s is referenced in libraries section but doesn't exist on the local file system", libPath(lib))
if len(matches) == 0 && IsLocalLibrary(lib) {
return nil, fmt.Errorf("file %s is referenced in libraries section but doesn't exist on the local file system", libraryPath(lib))
}

var out []*config.ArtifactFile
for _, match := range matches {
af, err := findArtifactFileByLocalPath(match, b)
if err != nil {
cmdio.LogString(ctx, fmt.Sprintf("%s. Skipping uploading. In order to use the define 'artifacts' section", err.Error()))
} else {
af.Libraries = append(af.Libraries, lib)
out = append(out, af)
}
}

return nil
return out, nil
}

func findArtifactFileByLocalPath(path string, b *bundle.Bundle) (*config.ArtifactFile, error) {
Expand All @@ -142,67 +104,27 @@ func findArtifactFileByLocalPath(path string, b *bundle.Bundle) (*config.Artifac
return nil, fmt.Errorf("artifact section is not defined for file at %s", path)
}

// libPath returns the first non-empty value among the library's Whl, Jar
// and Egg fields, checked in that order, or "" when all of them are empty.
func libPath(library *compute.Library) string {
	for _, candidate := range []string{library.Whl, library.Jar, library.Egg} {
		if candidate != "" {
			return candidate
		}
	}

	return ""
}

// isLocalLibrary reports whether the library has a path (per libPath) and
// that path is classified as local by IsLocalPath. A library without any
// path is never local.
func isLocalLibrary(library *compute.Library) bool {
	if p := libPath(library); p != "" {
		return IsLocalPath(p)
	}

	return false
}

// IsLocalPath reports whether the given library path is a local path.
//
// The checks run in this order:
//   - a path accepted by isExplicitFileScheme is local;
//   - a path accepted by isRemoteStorageScheme is not local;
//   - any other path is local unless isAbsoluteRemotePath accepts it.
func IsLocalPath(path string) bool {
	switch {
	case isExplicitFileScheme(path):
		return true
	case isRemoteStorageScheme(path):
		return false
	default:
		return !isAbsoluteRemotePath(path)
	}
}

// isExplicitFileScheme reports whether path begins with the "file://"
// prefix. The comparison is case-sensitive.
func isExplicitFileScheme(path string) bool {
	const fileScheme = "file://"
	return strings.HasPrefix(path, fileScheme)
}
func MapFilesToTaskLibraries(ctx context.Context, b *bundle.Bundle) map[string][]*compute.Library {
tasks := findAllTasks(b)
out := make(map[string][]*compute.Library)
for _, task := range tasks {
for j := range task.Libraries {
lib := &task.Libraries[j]
if !IsLocalLibrary(lib) {
continue
}

func isRemoteStorageScheme(path string) bool {
url, err := url.Parse(path)
if err != nil {
return false
}
matches, err := findLibraryMatches(lib, b)
if err != nil {
log.Warnf(ctx, "Error matching library to files: %s", err.Error())
continue
}

if url.Scheme == "" {
return false
for _, match := range matches {
out[match] = append(out[match], lib)
}
}
}

// If the path starts with scheme:/ format, it's a correct remote storage scheme
return strings.HasPrefix(path, url.Scheme+":/")

}

// isWorkspacePath reports whether path starts with one of the prefixes
// "/Workspace/", "/Users/" or "/Shared/". The trailing slash is part of
// each prefix, so a bare "/Workspace" does not match.
func isWorkspacePath(path string) bool {
	for _, prefix := range []string{"/Workspace/", "/Users/", "/Shared/"} {
		if strings.HasPrefix(path, prefix) {
			return true
		}
	}
	return false
}

func isAbsoluteRemotePath(p string) bool {
// If path for library starts with /, it's a remote absolute path
return path.IsAbs(p)
return out
}
Loading