From 0c1e56e34b4114f0824ff7ff8319b1e0fef77197 Mon Sep 17 00:00:00 2001
From: Pieter Noordhuis
Date: Fri, 11 Aug 2023 15:40:34 +0200
Subject: [PATCH 1/2] Enable environment overrides for job clusters

While they are a slice, we can identify a job cluster by its job cluster key.
A job definition with multiple job clusters with the same key is always
invalid. We can therefore merge definitions with the same key into one. This
is compatible with how environment overrides are applied; merging a slice
means appending to it. The override will end up in the job cluster slice of
the original, which gives us a deterministic way to merge them.
---
 bundle/config/resources.go                    | 11 ++++
 bundle/config/resources/job.go                | 38 ++++++++++++-
 bundle/config/resources/job_test.go           | 57 +++++++++++++++++++
 bundle/config/root.go                         |  5 ++
 .../tests/override_job_cluster/databricks.yml | 35 ++++++++++++
 bundle/tests/override_job_cluster_test.go     | 29 ++++++++++
 6 files changed, 174 insertions(+), 1 deletion(-)
 create mode 100644 bundle/config/resources/job_test.go
 create mode 100644 bundle/tests/override_job_cluster/databricks.yml
 create mode 100644 bundle/tests/override_job_cluster_test.go

diff --git a/bundle/config/resources.go b/bundle/config/resources.go
index fc86647ebc7..b15158b45ab 100644
--- a/bundle/config/resources.go
+++ b/bundle/config/resources.go
@@ -113,3 +113,14 @@ func (r *Resources) SetConfigFilePath(path string) {
 		e.ConfigFilePath = path
 	}
 }
+
+// MergeJobClusters iterates over all jobs and merges their job clusters.
+// This is called after applying the environment overrides.
+func (r *Resources) MergeJobClusters() error {
+	for _, job := range r.Jobs {
+		if err := job.MergeJobClusters(); err != nil {
+			return err
+		}
+	}
+	return nil
+}
diff --git a/bundle/config/resources/job.go b/bundle/config/resources/job.go
index a1ea3855e37..327d7e13e64 100644
--- a/bundle/config/resources/job.go
+++ b/bundle/config/resources/job.go
@@ -1,6 +1,9 @@
 package resources
 
-import "github.com/databricks/databricks-sdk-go/service/jobs"
+import (
+	"github.com/databricks/databricks-sdk-go/service/jobs"
+	"github.com/imdario/mergo"
+)
 
 type Job struct {
 	ID          string       `json:"id,omitempty" bundle:"readonly"`
@@ -10,3 +13,36 @@ type Job struct {
 
 	*jobs.JobSettings
 }
+
+// MergeJobClusters merges job clusters with the same key.
+// The job clusters field is a slice, and as such, overrides are appended to it.
+// We can identify a job cluster by its key, however, so we can use this key
+// to figure out which definitions are actually overrides and merge them.
+func (j *Job) MergeJobClusters() error {
+	keys := make(map[string]*jobs.JobCluster)
+	output := make([]jobs.JobCluster, 0, len(j.JobClusters))
+
+	// Environment overrides are always appended, so we can iterate in natural order to
+	// first find the base definition, and merge instances we encounter later.
+	for i := range j.JobClusters {
+		key := j.JobClusters[i].JobClusterKey
+
+		// First occurrence of this key: keep it and reference the copy stored in output.
+		ref, ok := keys[key]
+		if !ok {
+			output = append(output, j.JobClusters[i])
+			keys[key] = &output[len(output)-1] // output is pre-sized, so this pointer stays valid
+			continue
+		}
+
+		// Merge this instance into the reference.
+		err := mergo.Merge(ref, &j.JobClusters[i], mergo.WithOverride, mergo.WithAppendSlice)
+		if err != nil {
+			return err
+		}
+	}
+
+	// Overwrite resulting slice.
+	j.JobClusters = output
+	return nil
+}
diff --git a/bundle/config/resources/job_test.go b/bundle/config/resources/job_test.go
new file mode 100644
index 00000000000..2ff3205e0d3
--- /dev/null
+++ b/bundle/config/resources/job_test.go
@@ -0,0 +1,57 @@
+package resources
+
+import (
+	"testing"
+
+	"github.com/databricks/databricks-sdk-go/service/compute"
+	"github.com/databricks/databricks-sdk-go/service/jobs"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+func TestJobMergeJobClusters(t *testing.T) {
+	j := &Job{
+		JobSettings: &jobs.JobSettings{
+			JobClusters: []jobs.JobCluster{
+				{
+					JobClusterKey: "foo",
+					NewCluster: &compute.ClusterSpec{
+						SparkVersion: "13.3.x-scala2.12",
+						NodeTypeId:   "i3.xlarge",
+						NumWorkers:   2,
+					},
+				},
+				{
+					JobClusterKey: "bar",
+					NewCluster: &compute.ClusterSpec{
+						SparkVersion: "10.4.x-scala2.12",
+					},
+				},
+				{
+					JobClusterKey: "foo",
+					NewCluster: &compute.ClusterSpec{
+						NodeTypeId: "i3.2xlarge",
+						NumWorkers: 4,
+					},
+				},
+			},
+		},
+	}
+
+	err := j.MergeJobClusters()
+	require.NoError(t, err)
+
+	assert.Len(t, j.JobClusters, 2)
+	assert.Equal(t, "foo", j.JobClusters[0].JobClusterKey)
+	assert.Equal(t, "bar", j.JobClusters[1].JobClusterKey)
+
+	// This job cluster was merged with a subsequent one.
+	jc0 := j.JobClusters[0].NewCluster
+	assert.Equal(t, "13.3.x-scala2.12", jc0.SparkVersion)
+	assert.Equal(t, "i3.2xlarge", jc0.NodeTypeId)
+	assert.Equal(t, 4, jc0.NumWorkers)
+
+	// This job cluster was left untouched.
+	jc1 := j.JobClusters[1].NewCluster
+	assert.Equal(t, "10.4.x-scala2.12", jc1.SparkVersion)
+}
diff --git a/bundle/config/root.go b/bundle/config/root.go
index 52f88737881..4ca9d0a0584 100644
--- a/bundle/config/root.go
+++ b/bundle/config/root.go
@@ -203,6 +203,11 @@ func (r *Root) MergeEnvironment(env *Environment) error {
 		if err != nil {
 			return err
 		}
+
+		err = r.Resources.MergeJobClusters()
+		if err != nil {
+			return err
+		}
 	}
 
 	if env.Variables != nil {
diff --git a/bundle/tests/override_job_cluster/databricks.yml b/bundle/tests/override_job_cluster/databricks.yml
new file mode 100644
index 00000000000..486c42add45
--- /dev/null
+++ b/bundle/tests/override_job_cluster/databricks.yml
@@ -0,0 +1,35 @@
+bundle:
+  name: environment_overrides
+
+workspace:
+  host: https://acme.cloud.databricks.com/
+
+resources:
+  jobs:
+    foo:
+      name: job
+      job_clusters:
+        - job_cluster_key: key
+          new_cluster:
+            spark_version: 13.3.x-scala2.12
+
+environments:
+  development:
+    resources:
+      jobs:
+        foo:
+          job_clusters:
+            - job_cluster_key: key
+              new_cluster:
+                node_type_id: i3.xlarge
+                num_workers: 1
+
+  staging:
+    resources:
+      jobs:
+        foo:
+          job_clusters:
+            - job_cluster_key: key
+              new_cluster:
+                node_type_id: i3.2xlarge
+                num_workers: 4
diff --git a/bundle/tests/override_job_cluster_test.go b/bundle/tests/override_job_cluster_test.go
new file mode 100644
index 00000000000..97f7c04ee59
--- /dev/null
+++ b/bundle/tests/override_job_cluster_test.go
@@ -0,0 +1,29 @@
+package config_tests
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func TestOverrideJobClusterDev(t *testing.T) {
+	b := loadEnvironment(t, "./override_job_cluster", "development")
+	assert.Equal(t, "job", b.Config.Resources.Jobs["foo"].Name)
+	assert.Len(t, b.Config.Resources.Jobs["foo"].JobClusters, 1)
+
+	c := b.Config.Resources.Jobs["foo"].JobClusters[0]
+	assert.Equal(t, "13.3.x-scala2.12", c.NewCluster.SparkVersion)
+	assert.Equal(t, "i3.xlarge", c.NewCluster.NodeTypeId)
+	assert.Equal(t, 1, c.NewCluster.NumWorkers)
+}
+
+func TestOverrideJobClusterStaging(t *testing.T) {
+	b := loadEnvironment(t, "./override_job_cluster", "staging")
+	assert.Equal(t, "job", b.Config.Resources.Jobs["foo"].Name)
+	assert.Len(t, b.Config.Resources.Jobs["foo"].JobClusters, 1)
+
+	c := b.Config.Resources.Jobs["foo"].JobClusters[0]
+	assert.Equal(t, "13.3.x-scala2.12", c.NewCluster.SparkVersion)
+	assert.Equal(t, "i3.2xlarge", c.NewCluster.NodeTypeId)
+	assert.Equal(t, 4, c.NewCluster.NumWorkers)
+}

From 5a27731e1657974d5927ba646af48dcc6dad80d0 Mon Sep 17 00:00:00 2001
From: Pieter Noordhuis
Date: Fri, 11 Aug 2023 15:42:38 +0200
Subject: [PATCH 2/2] Update test bundle name

---
 bundle/tests/override_job_cluster/databricks.yml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/bundle/tests/override_job_cluster/databricks.yml b/bundle/tests/override_job_cluster/databricks.yml
index 486c42add45..33061b2e3d7 100644
--- a/bundle/tests/override_job_cluster/databricks.yml
+++ b/bundle/tests/override_job_cluster/databricks.yml
@@ -1,5 +1,5 @@
 bundle:
-  name: environment_overrides
+  name: override_job_cluster
 
 workspace:
   host: https://acme.cloud.databricks.com/