diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/execute.go b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/execute.go index 396eefab7318..c5cb8a636f33 100644 --- a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/execute.go +++ b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/execute.go @@ -71,6 +71,7 @@ func Execute(ctx context.Context, raw *pipepb.Pipeline, opts *JobOptions, worker if err != nil { return presult, err } + opts.WorkerHash = hash log.Infof(ctx, "Staged worker binary: %v", workerURL) if err := graphx.UpdateDefaultEnvWorkerType( diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go index f0adb21cf714..27a0a0d0f224 100644 --- a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go +++ b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go @@ -85,6 +85,9 @@ type JobOptions struct { // Worker is the worker binary override. Worker string + // WorkerHash is the SHA-256 hash of the worker binary. + WorkerHash string + // -- Internal use only. Not supported in public Dataflow. -- TeardownPolicy string @@ -136,6 +139,7 @@ func Translate(ctx context.Context, p *pipepb.Pipeline, opts *JobOptions, worker packages := []*df.Package{{ Name: "worker", Location: workerURL, + Sha256: opts.WorkerHash, }} for _, url := range opts.ArtifactURLs { diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job_test.go b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job_test.go index 901adb6c7b72..07f2d39e552c 100644 --- a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job_test.go +++ b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job_test.go @@ -292,6 +292,7 @@ func TestTranslate(t *testing.T) { Name: "test-job", DiskProvisionedIops: 4000, DiskProvisionedThroughputMibps: 200, + WorkerHash: "worker-sha256-hash", } workerURL := "gs://any-location/temp" modelURL := "gs://any-location/temp" @@ -312,6 +313,21 @@ func TestTranslate(t *testing.T) { if wp.DiskProvisionedThroughputMibps != 200 { t.Errorf("DiskProvisionedThroughputMibps = %v, want 200", wp.DiskProvisionedThroughputMibps) } + + // Verify worker package has Sha256 + found := false + for _, pkg := range wp.Packages { + if pkg.Name == "worker" { + found = true + if pkg.Sha256 != "worker-sha256-hash" { + t.Errorf("worker package Sha256 = %q, want %q", pkg.Sha256, "worker-sha256-hash") + } + break + } + } + if !found { + t.Fatalf("worker package not found in wp.Packages") + } } func TestTranslateWithPipelineHash(t *testing.T) {