UUIDType with BucketTransform incorrectly converts int to str in PartitionKey #2002

@dingo4dev

Apache Iceberg version

0.9.0 (latest release)

Please describe the bug 🐞

Description

When a UUIDType column is partitioned with BucketTransform, table operations such as upsert fail. The partition key value appears to be converted from int to str, which causes a type mismatch when the Avro encoder attempts to write an integer.

Steps to Reproduce

  1. Create a table with a UUIDType column
  2. Partition the table with BucketTransform on that column
  3. Attempt to upsert data into the table

Current Behavior

The operation fails with a TypeError as the system attempts to perform integer operations on a string value.
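
The failure can be reproduced in isolation with a minimal sketch. The expression is copied from the last frame of the traceback below (encoder.py, write_int); the function name zigzag_encode is hypothetical and is not part of pyiceberg.

```python
def zigzag_encode(integer: int) -> int:
    # Same expression as in the final traceback frame (write_int).
    return (integer << 1) ^ (integer >> 63)


# An int partition value encodes without error.
print(zigzag_encode(1))  # 2

# A str partition value such as "1" does not support the shift operators.
try:
    zigzag_encode("1")  # type: ignore[arg-type]
except TypeError as exc:
    print(f"TypeError: {exc}")
```

This only illustrates why a str bucket value breaks the integer writer; it does not show where the conversion to str happens.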

Expected Behavior

The operation should handle UUIDType columns correctly when they are partitioned with BucketTransform. The UUID bucket partition value should be the integer 1 rather than the string "1".

Error Stack Trace

Traceback (most recent call last):
    File "test_upsert.py", line 248, in <module>
        result = table.upsert(
                         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    File ".venv\Lib\site-packages\pyiceberg\table\__init__.py", line 1216, in upsert
        tx.append(rows_to_insert)
    File ".venv\Lib\site-packages\pyiceberg\table\__init__.py", line 470, in append
        with self._append_snapshot_producer(snapshot_properties) as append_files:
    File ".venv\Lib\site-packages\pyiceberg\table\update\__init__.py", line 71, in __exit__
        self.commit()
    File ".venv\Lib\site-packages\pyiceberg\table\update\__init__.py", line 67, in commit
        self._transaction._apply(*self._commit())
                                                            ^^^^^^^^^^^^^^
    File ".venv\Lib\site-packages\pyiceberg\table\update\snapshot.py", line 242, in _commit
        new_manifests = self._manifests()
                                        ^^^^^^^^^^^^^^^^^
    File ".venv\Lib\site-packages\pyiceberg\table\update\snapshot.py", line 201, in _manifests
        return self._process_manifests(added_manifests.result() + delete_manifests.result() + existing_manifests.result())
                                                                     ^^^^^^^^^^^^^^^^^^^^^^^^
    File "~\Python312\Lib\concurrent\futures\_base.py", line 456, in result
        return self.__get_result()
                     ^^^^^^^^^^^^^^^^^^^
    File "~\Python312\Lib\concurrent\futures\_base.py", line 401, in __get_result
        raise self._exception
    File "~\Python312\Lib\concurrent\futures\thread.py", line 58, in run
        result = self.fn(*self.args, **self.kwargs)
                         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    File ".venv\Lib\site-packages\pyiceberg\table\update\snapshot.py", line 159, in _write_added_manifest        
        writer.add(
    File ".venv\Lib\site-packages\pyiceberg\manifest.py", line 847, in add
        self.add_entry(self._reused_entry_wrapper._wrap_append(self._snapshot_id, None, entry.data_file))
    File ".venv\Lib\site-packages\pyiceberg\manifest.py", line 840, in add_entry
        self._writer.write_block([self.prepare_entry(entry)])
    File ".venv\Lib\site-packages\pyiceberg\avro\file.py", line 281, in write_block       
        self.writer.write(block_content_encoder, obj)
        writer.write(encoder, val[pos] if pos is not None else None)
    File ".venv\Lib\site-packages\pyiceberg\avro\writer.py", line 176, in write
        writer.write(encoder, val[pos] if pos is not None else None)
        writer.write(encoder, val[pos] if pos is not None else None)
    File ".venv\Lib\site-packages\pyiceberg\avro\writer.py", line 176, in write
        writer.write(encoder, val[pos] if pos is not None else None)
    File ".venv\Lib\site-packages\pyiceberg\avro\writer.py", line 66, in write
        encoder.write_int(val)
    File ".venv\Lib\site-packages\pyiceberg\avro\encoder.py", line 45, in write_int
        datum = (integer << 1) ^ (integer >> 63)

Potential Fix

The issue appears to be in the type handling of the partition_record_value function, which is called when PartitionKey builds its partition record from the PartitionFieldValue entries.

@dataclass(frozen=True)
class PartitionKey:
    field_values: List[PartitionFieldValue]
    partition_spec: PartitionSpec
    schema: Schema

    @cached_property
    def partition(self) -> Record:  # partition key transformed with iceberg internal representation as input
        iceberg_typed_key_values = []
        for raw_partition_field_value in self.field_values:
            partition_fields = self.partition_spec.source_id_to_fields_map[raw_partition_field_value.field.source_id]
            if len(partition_fields) != 1:
                raise ValueError(f"Cannot have redundant partitions: {partition_fields}")
            partition_field = partition_fields[0]
            iceberg_typed_key_values.append(
                partition_record_value(
                    partition_field=partition_field,
                    value=raw_partition_field_value.value,
                    schema=self.schema,
                )
            )
        return Record(*iceberg_typed_key_values)

The UUIDType handler below could accept a Union type for value so that it also handles the already-transformed value.

@_to_partition_representation.register(UUIDType)
def _(type: IcebergType, value: Optional[uuid.UUID]) -> Optional[str]:
    return str(value) if value is not None else None
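
A minimal sketch of what the Union-based handling might look like is shown here as a standalone function, so it runs without pyiceberg. The name uuid_partition_representation is hypothetical, and the assumption that an int input is an already-computed bucket number is mine. This is one possible shape for the fix, not the project's implementation.

```python
import uuid
from typing import Optional, Union


def uuid_partition_representation(
    value: Optional[Union[uuid.UUID, int]],
) -> Optional[Union[str, int]]:
    # Hypothetical stand-in for the UUIDType handler quoted above.
    if value is None:
        return None
    if isinstance(value, uuid.UUID):
        # Untransformed UUID: keep the existing string representation.
        return str(value)
    if isinstance(value, int):
        # Assumed to be an already-transformed bucket number: keep it an int.
        return value
    raise ValueError(f"Unsupported partition value type: {type(value)}")


print(repr(uuid_partition_representation(1)))  # 1
print(repr(uuid_partition_representation(uuid.UUID(int=0))))
```

With this shape, a bucket value of 1 stays the integer 1 instead of becoming "1", which matches the expected behavior described above.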

Willingness to contribute

  • I can contribute a fix for this bug independently
  • I would be willing to contribute a fix for this bug with guidance from the Iceberg community
  • I cannot contribute a fix for this bug at this time
