Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions bundle/config/bundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,6 @@ type Bundle struct {
// Contains Git information like current commit, current branch and
// origin url. Automatically loaded by reading .git directory if not specified
Git Git `json:"git,omitempty"`

Mode Mode `json:"mode,omitempty"`
}
10 changes: 10 additions & 0 deletions bundle/config/environment.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,22 @@
package config

type Mode string

const (
Debug Mode = "debug"
Default Mode = "default"
PullRequest Mode = "pull-request"
)

// Environment defines overrides for a single environment.
// This structure is recursively merged into the root configuration.
type Environment struct {
// Default marks that this environment must be used if one isn't specified
// by the user (through environment variable or command line argument).
Default bool `json:"default,omitempty"`

Mode Mode `json:"mode,omitempty"`
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it should rather be a property of a bundle flipped by a command-line flag and not the environment variable.

https://github.com/databricks/cli/blob/ccbcccd92903bc0a6dbdbc94072035b30aca8527/cmd/bundle/destroy.go#L27C1-L28

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thing is, it's not really "ephemeral" at this time. We need to create real, persistent jobs and pipelines. The main reason is that one job may need to reference another job or another pipeline in the same DAB.

When we say "ephemeral" in this case we really just mean that we need to have "some" garbage collection mechanism that removes these persistent jobs after perhaps 24 hours of non-use.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can add that cleanup capability for previous normal runs from bundle runner 🤷‍♂️


Bundle *Bundle `json:"bundle,omitempty"`

Workspace *Workspace `json:"workspace,omitempty"`
Expand Down
50 changes: 50 additions & 0 deletions bundle/config/mutator/override_compute.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package mutator

import (
"context"
"fmt"

"github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/config"
"github.com/databricks/cli/bundle/config/resources"
)

type overrideCompute struct {
compute string
}

func OverrideCompute(compute string) bundle.Mutator {
return &overrideCompute{compute: compute}
}

func (m *overrideCompute) Name() string {
return "OverrideCompute"
}

func (m *overrideCompute) overrideJobCompute(j *resources.Job) {
for i := range j.Tasks {
task := &j.Tasks[i]
if task.NewCluster != nil {
task.NewCluster = nil
task.ExistingClusterId = m.compute
} else if task.ExistingClusterId != "" {
task.ExistingClusterId = m.compute
}
}
}

func (m *overrideCompute) Apply(ctx context.Context, b *bundle.Bundle) error {
if m.compute == "" {
return nil
}
if b.Config.Bundle.Mode != config.Debug {
return fmt.Errorf("cannot override compute for an environment that does not use 'mode: debug'")
}

r := b.Config.Resources
for i := range r.Jobs {
m.overrideJobCompute(r.Jobs[i])
}

return nil
}
47 changes: 47 additions & 0 deletions bundle/config/mutator/override_compute_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package mutator_test

import (
"context"
"testing"

"github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/config"
"github.com/databricks/cli/bundle/config/mutator"
"github.com/databricks/cli/bundle/config/resources"
"github.com/databricks/databricks-sdk-go/service/compute"
"github.com/databricks/databricks-sdk-go/service/jobs"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestOverrideCompute(t *testing.T) {
bundle := &bundle.Bundle{
Config: config.Root{
Bundle: config.Bundle{
Mode: config.Debug,
},
Resources: config.Resources{
Jobs: map[string]*resources.Job{
"job1": {JobSettings: &jobs.JobSettings{
Name: "job1",
Tasks: []jobs.JobTaskSettings{
{
NewCluster: &compute.BaseClusterInfo{},
},
{
ExistingClusterId: "cluster2",
},
},
}},
},
},
},
}

m := mutator.OverrideCompute("newClusterID")
err := m.Apply(context.Background(), bundle)
require.NoError(t, err)
assert.Nil(t, bundle.Config.Resources.Jobs["job1"].Tasks[0].NewCluster)
assert.Equal(t, "newClusterID", bundle.Config.Resources.Jobs["job1"].Tasks[0].ExistingClusterId)
assert.Equal(t, "newClusterID", bundle.Config.Resources.Jobs["job1"].Tasks[1].ExistingClusterId)
}
4 changes: 4 additions & 0 deletions bundle/config/mutator/populate_current_user.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ func (m *populateCurrentUser) Name() string {
}

func (m *populateCurrentUser) Apply(ctx context.Context, b *bundle.Bundle) error {
if b.Config.Workspace.CurrentUser != nil {
return nil
}

w := b.WorkspaceClient()
me, err := w.CurrentUser.Me(ctx)
if err != nil {
Expand Down
82 changes: 82 additions & 0 deletions bundle/config/mutator/process_environment_mode.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package mutator

import (
"context"
"fmt"

"github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/config"
"github.com/databricks/databricks-sdk-go/service/ml"
)

type processEnvironmentMode struct{}

const debugConcurrentRuns = 4

func ProcessEnvironmentMode() bundle.Mutator {
return &processEnvironmentMode{}
}

func (m *processEnvironmentMode) Name() string {
return "ProcessEnvironmentMode"
}

// Mark all resources as being for 'debug' purposes, i.e.
// changing their their name, adding tags, and (in the future)
// marking them as 'hidden' in the UI.
func processDebugMode(b *bundle.Bundle) error {
r := b.Config.Resources

for i := range r.Jobs {
r.Jobs[i].Name = "[debug] " + r.Jobs[i].Name
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Saad: this should probably say [username debug]

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if r.Jobs[i].Tags == nil {
r.Jobs[i].Tags = make(map[string]string)
}
r.Jobs[i].Tags["debug"] = ""
if r.Jobs[i].MaxConcurrentRuns == 0 {
r.Jobs[i].MaxConcurrentRuns = debugConcurrentRuns
}
Comment thread
lennartkats-db marked this conversation as resolved.
if r.Jobs[i].Schedule != nil {
r.Jobs[i].Schedule.PauseStatus = "PAUSED"
}
if r.Jobs[i].Continuous != nil {
r.Jobs[i].Continuous.PauseStatus = "PAUSED"
}
if r.Jobs[i].Trigger != nil {
r.Jobs[i].Trigger.PauseStatus = "PAUSED"
}
}

for i := range r.Pipelines {
r.Pipelines[i].Name = "[debug] " + r.Pipelines[i].Name
r.Pipelines[i].Development = true
// (pipelines don't yet support tags)
}

for i := range r.Models {
r.Models[i].Name = "[debug] " + r.Models[i].Name
r.Models[i].Tags = append(r.Models[i].Tags, ml.ModelTag{Key: "debug", Value: ""})
}

for i := range r.Experiments {
r.Experiments[i].Name = "[debug] " + r.Experiments[i].Name
r.Experiments[i].Tags = append(r.Experiments[i].Tags, ml.ExperimentTag{Key: "debug", Value: ""})
}

return nil
}

func (m *processEnvironmentMode) Apply(ctx context.Context, b *bundle.Bundle) error {
switch b.Config.Bundle.Mode {
case config.Debug:
return processDebugMode(b)
case config.Default, "":
// No action
case config.PullRequest:
return fmt.Errorf("not implemented")
default:
return fmt.Errorf("unsupported value specified for 'mode': %s", b.Config.Bundle.Mode)
}

return nil
}
75 changes: 75 additions & 0 deletions bundle/config/mutator/process_environment_mode_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package mutator_test

import (
"context"
"testing"

"github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/config"
"github.com/databricks/cli/bundle/config/mutator"
"github.com/databricks/cli/bundle/config/resources"
"github.com/databricks/databricks-sdk-go/service/jobs"
"github.com/databricks/databricks-sdk-go/service/ml"
"github.com/databricks/databricks-sdk-go/service/pipelines"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestProcessEnvironmentModeApplyDebug(t *testing.T) {
bundle := &bundle.Bundle{
Config: config.Root{
Bundle: config.Bundle{
Mode: config.Debug,
},
Resources: config.Resources{
Jobs: map[string]*resources.Job{
"job1": {JobSettings: &jobs.JobSettings{Name: "job1"}},
},
Pipelines: map[string]*resources.Pipeline{
"pipeline1": {PipelineSpec: &pipelines.PipelineSpec{Name: "pipeline1"}},
},
Experiments: map[string]*resources.MlflowExperiment{
"experiment1": {Experiment: &ml.Experiment{Name: "experiment1"}},
},
Models: map[string]*resources.MlflowModel{
"model1": {Model: &ml.Model{Name: "model1"}},
},
},
},
}

m := mutator.ProcessEnvironmentMode()
err := m.Apply(context.Background(), bundle)
require.NoError(t, err)
assert.Equal(t, "[debug] job1", bundle.Config.Resources.Jobs["job1"].Name)
assert.Equal(t, "[debug] pipeline1", bundle.Config.Resources.Pipelines["pipeline1"].Name)
assert.Equal(t, "[debug] experiment1", bundle.Config.Resources.Experiments["experiment1"].Name)
assert.Equal(t, "[debug] model1", bundle.Config.Resources.Models["model1"].Name)
assert.Equal(t, "debug", bundle.Config.Resources.Experiments["experiment1"].Experiment.Tags[0].Key)
assert.True(t, bundle.Config.Resources.Pipelines["pipeline1"].PipelineSpec.Development)
}

func TestProcessEnvironmentModeApplyDefault(t *testing.T) {
bundle := &bundle.Bundle{
Config: config.Root{
Bundle: config.Bundle{
Mode: config.Default,
},
Resources: config.Resources{
Jobs: map[string]*resources.Job{
"job1": {JobSettings: &jobs.JobSettings{Name: "job1"}},
},
Pipelines: map[string]*resources.Pipeline{
"pipeline1": {PipelineSpec: &pipelines.PipelineSpec{Name: "pipeline1"}},
},
},
},
}

m := mutator.ProcessEnvironmentMode()
err := m.Apply(context.Background(), bundle)
require.NoError(t, err)
assert.Equal(t, "job1", bundle.Config.Resources.Jobs["job1"].Name)
assert.Equal(t, "pipeline1", bundle.Config.Resources.Pipelines["pipeline1"].Name)
assert.False(t, bundle.Config.Resources.Pipelines["pipeline1"].PipelineSpec.Development)
}
4 changes: 4 additions & 0 deletions bundle/config/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,5 +190,9 @@ func (r *Root) MergeEnvironment(env *Environment) error {
}
}

if env.Mode != "" {
r.Bundle.Mode = env.Mode
}

return nil
}
9 changes: 9 additions & 0 deletions bundle/config/root_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,3 +154,12 @@ func TestInitializeVariablesUndefinedVariables(t *testing.T) {
err := root.InitializeVariables([]string{"bar=567"})
assert.ErrorContains(t, err, "variable bar has not been defined")
}

func TestRootMergeEnvironmentWithMode(t *testing.T) {
root := &Root{
Bundle: Bundle{},
}
env := &Environment{Mode: Debug}
require.NoError(t, root.MergeEnvironment(env))
assert.Equal(t, Debug, root.Bundle.Mode)
}
4 changes: 3 additions & 1 deletion bundle/phases/initialize.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
// The initialize phase fills in defaults and connects to the workspace.
// Interpolation of fields referring to the "bundle" and "workspace" keys
// happens upon completion of this phase.
func Initialize() bundle.Mutator {
func Initialize(overrideCompute string) bundle.Mutator {
return newPhase(
"initialize",
[]bundle.Mutator{
Expand All @@ -25,6 +25,8 @@ func Initialize() bundle.Mutator {
interpolation.IncludeLookupsInPath("workspace"),
interpolation.IncludeLookupsInPath(variable.VariableReferencePrefix),
),
mutator.OverrideCompute(overrideCompute),
mutator.ProcessEnvironmentMode(),
mutator.TranslatePaths(),
terraform.Initialize(),
},
Expand Down
9 changes: 9 additions & 0 deletions bundle/run/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,15 @@ func (r *jobRunner) Run(ctx context.Context, opts *Options) (output.RunOutput, e
if err != nil {
return nil, fmt.Errorf("cannot start job")
}

if opts.NoWait {
details, err := w.Jobs.GetRun(ctx, jobs.GetRunRequest{
RunId: waiter.RunId,
})
progressLogger.Log(progress.NewJobRunUrlEvent(details.RunPageUrl))
Comment thread
lennartkats-db marked this conversation as resolved.
return nil, err
}

run, err := waiter.OnProgress(func(r *jobs.Run) {
pullRunId(r)
logDebug(r)
Expand Down
1 change: 1 addition & 0 deletions bundle/run/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
type Options struct {
Job JobOptions
Pipeline PipelineOptions
NoWait bool
}

func (o *Options) Define(fs *flag.FlagSet) {
Expand Down
4 changes: 4 additions & 0 deletions bundle/run/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,10 @@ func (r *pipelineRunner) Run(ctx context.Context, opts *Options) (output.RunOutp
return nil, fmt.Errorf("no progress logger found")
}

if opts.NoWait {
log.Warnf(ctx, "--no-wait is not yet implemented for pipelines")
}

// Log the pipeline update URL as soon as it is available.
progressLogger.Log(progress.NewPipelineUpdateUrlEvent(w.Config.Host, updateID, pipelineID))

Expand Down
1 change: 1 addition & 0 deletions bundle/tests/job_and_pipeline/bundle.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ resources:

environments:
development:
mode: debug
resources:
pipelines:
nyc_taxi_pipeline:
Expand Down
2 changes: 2 additions & 0 deletions bundle/tests/job_and_pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"path/filepath"
"testing"

"github.com/databricks/cli/bundle/config"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand All @@ -15,6 +16,7 @@ func TestJobAndPipelineDevelopment(t *testing.T) {

p := b.Config.Resources.Pipelines["nyc_taxi_pipeline"]
assert.Equal(t, "job_and_pipeline/bundle.yml", filepath.ToSlash(p.ConfigFilePath))
assert.Equal(t, b.Config.Bundle.Mode, config.Debug)
assert.True(t, p.Development)
require.Len(t, p.Libraries, 1)
assert.Equal(t, "./dlt/nyc_taxi_loader", p.Libraries[0].Notebook.Path)
Expand Down
Loading