Optimize GpuLiteral with per-task GpuScalar cache #14049
base: main
Conversation
- Add per-task cache for GpuScalar in GpuLiteral to reuse identical scalars
- Make GpuScalar thread-safe with volatile fields and double-check locking
- Throw IllegalStateException instead of NullPointerException when closed
- Register cleanup callback on TaskContext completion

Signed-off-by: Hongbin Ma (Mahone) <[email protected]>
Force-pushed from 480e346 to ef7a060
Greptile Summary: This PR optimizes GpuLiteral by introducing a per-task cache for GpuScalar instances, reusing identical scalars within the same Spark task to reduce memory allocation and GPU data copies. The implementation adds thread-safety improvements to GpuScalar with volatile fields and double-check locking patterns.

Confidence Score: 4/5
Sequence Diagram

```mermaid
sequenceDiagram
    participant Task as Spark Task
    participant GL as GpuLiteral
    participant Cache as GpuLiteral.taskScalarCache
    participant GS as GpuScalar
    participant TC as TaskContext
    Task->>GL: columnarEvalAny()
    GL->>TC: TaskContext.get()
    alt Has TaskContext
        GL->>Cache: getOrCreateCachedScalar(literal)
        alt First access for task
            Cache->>TC: register cleanup callback
            Cache->>Cache: create new ConcurrentHashMap
        end
        alt Scalar exists in cache
            Cache-->>GS: return cached GpuScalar
        else Scalar not in cache
            Cache->>GS: create new GpuScalar
            Cache->>Cache: store in cache
        end
        GS->>GS: incRefCount()
        GS-->>GL: return cached scalar
    else No TaskContext (unit test)
        GL->>GS: create new GpuScalar
        GS-->>GL: return new scalar
    end
    GL-->>Task: return GpuScalar
    Task->>Task: task completes
    TC->>Cache: cleanup callback triggered
    Cache->>Cache: remove taskId entry
    Cache->>GS: close all scalars in task cache
```
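A minimal sketch of the lookup path this diagram implies is shown below. The names getOrCreateCachedScalar and taskScalarCache appear in the diff later in this thread; the exact types and the registerCleanup helper are assumptions for illustration, not the PR's actual code:

```scala
import java.util.concurrent.ConcurrentHashMap
import org.apache.spark.TaskContext

// Assumed shape: taskId -> (GpuLiteral -> GpuScalar)
private val taskScalarCache =
  new ConcurrentHashMap[java.lang.Long, ConcurrentHashMap[GpuLiteral, GpuScalar]]()

private def getOrCreateCachedScalar(lit: GpuLiteral): GpuScalar = {
  val tc = TaskContext.get()
  val taskCache = taskScalarCache.computeIfAbsent(tc.taskAttemptId(),
    (_: java.lang.Long) => {
      // First access for this task: register cleanup (hypothetical helper,
      // sketched after the second diagram below) and create the per-task map.
      registerCleanup(tc)
      new ConcurrentHashMap[GpuLiteral, GpuScalar]()
    })
  // Equal literals (same value and dataType) resolve to the same scalar.
  taskCache.computeIfAbsent(lit, (l: GpuLiteral) => GpuScalar(l.value, l.dataType))
}
```

Per the diagram, the caller then calls incRefCount on the returned scalar, so every consumer holds its own reference and can close it independently of the cache.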
2 files reviewed, 1 comment
build
Greptile Summary: This PR optimizes GpuLiteral by adding a per-task cache for GpuScalar and improving thread-safety. The changes include: (1) introducing a per-task cache (taskScalarCache) in GpuLiteral so identical literals within a task share one scalar, (2) making GpuScalar thread-safe with volatile fields and double-check locking, and (3) throwing IllegalStateException instead of NullPointerException when accessing closed scalars. The implementation leverages Spark's task-completion listeners to clean up cached scalars when a task finishes.

Confidence Score: 3/5
Important Files Changed
Sequence Diagram

```mermaid
sequenceDiagram
    participant Task as Spark Task
    participant GpuLiteral as GpuLiteral
    participant Cache as taskScalarCache
    participant TaskCtx as TaskContext
    participant GpuScalar as GpuScalar
    Task->>GpuLiteral: columnarEvalAny(batch)
    GpuLiteral->>TaskCtx: get()
    TaskCtx-->>GpuLiteral: taskContext
    GpuLiteral->>Cache: getOrCreateCachedScalar(literal)
    Cache->>Cache: computeIfAbsent(taskId)
    Cache->>TaskCtx: addTaskCompletionListener
    Cache->>GpuScalar: create(value, dataType)
    GpuScalar-->>Cache: cachedScalar
    Cache-->>GpuLiteral: cachedScalar
    GpuLiteral->>GpuScalar: incRefCount
    GpuScalar-->>GpuLiteral: this
    GpuLiteral-->>Task: GpuScalar
    Task->>Task: task completion
    TaskCtx->>Cache: onTaskCompletion callback
    Cache->>GpuScalar: close()
    Cache->>Cache: remove(taskId)
```
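The cleanup half of the lifecycle could be sketched as follows. The PR reportedly wires this through the plugin's ScalableTaskCompletion utility; a plain Spark TaskContext listener is used here as a stand-in:

```scala
import org.apache.spark.TaskContext

// Hypothetical registerCleanup used in the earlier sketch: when the task
// finishes, drop its cache entry and close every scalar cached for it.
private def registerCleanup(tc: TaskContext): Unit = {
  tc.addTaskCompletionListener[Unit] { _ =>
    val taskCache = taskScalarCache.remove(tc.taskAttemptId())
    if (taskCache != null) {
      taskCache.values().forEach(s => s.close())
    }
  }
}
```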
Pull request overview
This PR optimizes GpuLiteral by introducing a per-task cache for GpuScalar instances, allowing identical literals within the same task to share the same underlying GPU scalar. The changes also enhance thread-safety of GpuScalar through volatile fields and double-check locking, and improve error handling by throwing IllegalStateException instead of NullPointerException for most closed-state access.
Key changes:
- Adds per-task GpuScalar caching with automatic cleanup on task completion
- Makes GpuScalar thread-safe with volatile fields and synchronized access (see the locking sketch after this list)
- Changes exception types from NullPointerException to IllegalStateException for most methods when accessing closed scalars
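For readers unfamiliar with the pattern, here is an illustrative, generic sketch of volatile-plus-double-check locking; the class and field names are inventions for this example, not the PR's actual code:

```scala
// Illustrative only: a volatile field plus double-check locking so exactly
// one thread builds the underlying value and others see it safely published.
final class LazyRef[T <: AnyRef](build: () => T) {
  @volatile private var cached: T = null.asInstanceOf[T]

  def get: T = {
    var v = cached          // first, unlocked read of the volatile field
    if (v == null) {
      synchronized {
        v = cached          // re-check under the lock
        if (v == null) {
          v = build()       // only one thread ever runs the build
          cached = v
        }
      }
    }
    v
  }
}
```

The volatile read keeps the fast path lock-free once the value is published, while the second check under the lock guarantees a single initialization.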
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| tests/src/test/scala/com/nvidia/spark/rapids/unit/GpuScalarUnitTest.scala | Updates copyright year and modifies test expectations to assert IllegalStateException instead of NullPointerException for most methods when accessing closed GpuScalar instances |
| sql-plugin/src/main/scala/com/nvidia/spark/rapids/literals.scala | Implements per-task GpuScalar caching in GpuLiteral, adds thread-safety to GpuScalar with volatile fields and double-check locking, integrates cleanup via ScalableTaskCompletion, and updates columnarEvalAny to use cached scalars when TaskContext is available |
```diff
-assertThrows[NullPointerException](gsv.getValue)
+assertThrows[IllegalStateException](gsv.getBase)
+assertThrows[IllegalStateException](gsv.getValue)
 assertThrows[NullPointerException](gsv.isValid)
```
Copilot AI commented on Dec 22, 2025:
The isValid method still throws NullPointerException after close, but other methods like getBase, getValue, and isNan now throw IllegalStateException. This inconsistency should be addressed. The isValid method accesses the value field directly via value.map(), which will throw NullPointerException when value is null (after close), while the updated methods explicitly check for null and throw IllegalStateException.
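A possible fix in the spirit of the other updated methods is to add the same explicit guard before touching value. This is a sketch; the tail of the method is an assumption, since the original body is not shown here:

```scala
// Sketch: use the same closed-state guard the other accessors now use,
// instead of letting value.map(...) hit an NPE once `value` is nulled.
def isValid: Boolean = {
  val v = value // volatile read; null once the scalar has been closed
  if (v == null) {
    throw new IllegalStateException("The GpuScalar has been closed")
  }
  v.map(_ != null).getOrElse(getBase.isValid) // tail here is an assumption
}
```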
```diff
 override def columnarEvalAny(batch: ColumnarBatch): Any = {
   // Returns a Scalar instead of the value to support the scalar of nested type, and
   // simplify the handling of result from a `expr.columnarEval`.
-  GpuScalar(value, dataType)
+  // When there's no TaskContext (e.g., unit tests), create a new GpuScalar
+  // each time like original behavior, because there's no cleanup callback
+  // to properly close the cached scalar. Otherwise use the cached scalar.
+  if (TaskContext.get() == null) {
+    GpuScalar(value, dataType)
+  } else {
+    cachedScalar.incRefCount
+  }
 }
```
Copilot AI commented on Dec 22, 2025:
The new per-task GpuScalar caching mechanism introduced in columnarEvalAny lacks test coverage. Tests should verify: (1) that identical GpuLiterals in the same task share the same GpuScalar instance, (2) that scalars from different tasks are separate, (3) that reference counting works correctly with caching, and (4) that cached scalars are properly cleaned up at task completion.
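A rough shape for the first of those tests might be the following; the suite name, the Arm.withResource import, and the TaskContext setup (elided) are assumptions based on typical spark-rapids unit tests:

```scala
import com.nvidia.spark.rapids.Arm.withResource
import org.apache.spark.sql.types.IntegerType
import org.scalatest.funsuite.AnyFunSuite

class GpuLiteralCacheSuite extends AnyFunSuite {
  // Sketch only: exercising the cache path needs a live TaskContext
  // (a real task or a test harness that installs one), elided here.
  test("identical GpuLiterals in the same task share one GpuScalar") {
    val lit = GpuLiteral(1, IntegerType)
    withResource(lit.columnarEvalAny(null).asInstanceOf[GpuScalar]) { s1 =>
      withResource(lit.columnarEvalAny(null).asInstanceOf[GpuScalar]) { s2 =>
        assert(s1 eq s2) // incRefCount returns the same cached instance
      }
    }
  }
}
```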
revans2 left a comment:
I would like to see some performance numbers for this optimization.
```scala
// Per-task cache: taskId -> (GpuLiteral -> GpuScalar)
// GpuLiterals with same value and dataType in the same task share the same GpuScalar.
// GpuLiteral's equals/hashCode already handles Array[Byte] correctly.
private val taskScalarCache =
```
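The declaration above is truncated in the diff view. Given the inline comment and the ConcurrentHashMap mentioned in the sequence diagrams, a plausible completion (an assumption, not the PR's exact code) is:

```scala
import java.util.concurrent.ConcurrentHashMap

// Assumed completion of the truncated declaration: outer key is the task
// id, the inner map deduplicates equal GpuLiterals within that task.
private val taskScalarCache =
  new ConcurrentHashMap[java.lang.Long, ConcurrentHashMap[GpuLiteral, GpuScalar]]()
```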
nit: Can we please have a follow-on issue to look at making some, or all, of this spillable? Not that I actually want to spill a Literal, but if we do run out of memory and this is causing fragmentation, I want to be able to remove and close things in the cache to try to let more stuff succeed.
@binmahone agree with @revans2. We need perf numbers for this. I like the idea of caching scalars, and with the async allocator this strategy should be OK, since we will eventually see defragmentation. Otherwise you can leave holes in non-defragmenting allocators. For example, if your pattern was allocate 1GB, then a scalar, then allocate 1GB, and you OOM, you would free the 1GB allocations but not the scalar, leaving a hole. A second task wanting to allocate a bit above 1GB would fail, unless async defragmented.

Could we do the cache opportunistically? Could we allocate up front, say 8MB, and place scalars there when we can?

I was going to suggest making this spillable. I don't know how to do that, actually. Because you are proposing reuse, every scalar we hand out we are going to incRefCount, so there's no way to spill it if one or more threads owns it. Unfortunately. This last piece makes me propose more strongly that there should be a scalar block of memory we allocate up front and we go there when we can.
abellina left a comment:
Need to update the copyright; also, you were going to try some other things and hopefully post perf numbers. Let us know your status here when you get a chance.
NOTE: release/26.02 has been created from main. Please retarget your PR to release/26.02 if it should be included in the release.
This PR is broken out from #14032; please visit that PR for more details.