Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
047a0a0
[BEAM-10258] Support type hint annotations on PTransform's expand()
saavan-google Jun 14, 2020
0826bb3
Fixup: apply YAPF
saavan-google Jun 14, 2020
2798308
Moving PCollectionTypeConstraint to typehints.py
saavan-google Jun 17, 2020
3423349
Uses Generic[T] instead of PCollectionTypeConstraint
saavan-google Jun 20, 2020
54d587b
Fixup: apply YAPF
saavan-google Jun 20, 2020
e35eb3b
Remove unused imports
saavan-google Jun 20, 2020
f2db8a3
Force user to wrap typehints in PCollections
saavan-google Jun 23, 2020
96a2aab
Add unit tests for various usages of typehints on PTransforms
saavan-google Jun 23, 2020
35dd79e
Add tests that use typehints on real pipelines
saavan-google Jun 23, 2020
9a67cc6
Fixup: apply YAPF
saavan-google Jun 23, 2020
59c5134
Fix bad merge
saavan-google Jun 25, 2020
a3a5114
Support PDone, PBegin, and better handling of error cases
saavan-google Jun 25, 2020
90a9335
Fix test syntax
saavan-google Jun 25, 2020
1143587
Refactors strip_pcoll_input() and strip_pcoll_output() to a shared fu…
saavan-google Jun 26, 2020
eafcdd1
Add unit tests
saavan-google Jun 26, 2020
913197f
Add more tests
saavan-google Jun 26, 2020
8196c15
Add website documentation
saavan-google Jun 30, 2020
3532abf
Fix linting issues
saavan-google Jul 6, 2020
63c4433
Fix linting issue by using multi-line function annotations
saavan-google Jul 6, 2020
3535ee0
Fix more lint errors
saavan-google Jul 6, 2020
33d0e2b
Fix import order, and other changes for PR
saavan-google Jul 8, 2020
689c792
Fix ungrouped-imports error
saavan-google Jul 8, 2020
9fb01eb
Alphabetically order the imports
saavan-google Jul 8, 2020
437c83b
Fixup: apply YAPF
saavan-google Jul 8, 2020
54fd032
Fixes a bug where a type can have an empty __args__ attribute
saavan-google Jul 8, 2020
725ba08
Fix bug in website snippet code
saavan-google Jul 8, 2020
e5ecd01
Fixup: apply YAPF
saavan-google Jul 8, 2020
9469a5d
Fixup: apply YAPF
saavan-google Jul 8, 2020
4cd0801
Fix NoneType error
saavan-google Jul 9, 2020
42454da
Fix NoneType error part 2
saavan-google Jul 9, 2020
17c30a6
Use classes instead of strings during typecheck, and add tests
saavan-google Jul 16, 2020
27cb6f3
Resolve circular import error and fix readability issues
saavan-google Jul 21, 2020
c36eefe
Fix lint errors
saavan-google Jul 21, 2020
0e6f19e
Add back accidentally removed test
saavan-google Jul 21, 2020
82c6f4c
Support None as an output annotation
saavan-google Jul 27, 2020
39ef9bd
Show incorrect type in error message
saavan-google Jul 28, 2020
a30800c
Allow Pipeline as an input
saavan-google Jul 29, 2020
a56e947
Fix import bug
saavan-google Jul 29, 2020
6d3df88
Alphabetically order imports inside function (but really this is just…
saavan-google Jul 30, 2020
444f5f4
Display warning instead of throwing error for oddly formed type hints
saavan-google Jul 31, 2020
107e8e6
Convert to Beam types
saavan-google Aug 4, 2020
2c046d6
Add test for generic TypeVars
saavan-google Aug 4, 2020
fc54c04
Fix bug by skipping DoOutputsTuple
saavan-google Aug 4, 2020
cb97557
Fix typo
saavan-google Aug 4, 2020
7389193
Add test for DoOutputsTuple
saavan-google Aug 5, 2020
3c01bfd
Fix lint errors
saavan-google Aug 6, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions sdks/python/apache_beam/examples/snippets/snippets_test_py3.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,18 @@ def my_fn(element: int) -> str:
ids = numbers | 'to_id' >> beam.Map(my_fn)
# [END type_hints_map_annotations]

# Example using an annotated PTransform.
with self.assertRaises(typehints.TypeCheckError):
# [START type_hints_ptransforms]
from apache_beam.pvalue import PCollection

class IntToStr(beam.PTransform):
def expand(self, pcoll: PCollection[int]) -> PCollection[str]:
return pcoll | beam.Map(lambda elem: str(elem))

ids = numbers | 'convert to str' >> IntToStr()
# [END type_hints_ptransforms]


if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
Expand Down
13 changes: 13 additions & 0 deletions sdks/python/apache_beam/transforms/ptransform.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,16 @@ class and wrapper class that allows lambda functions to be used as
from apache_beam.internal import pickler
from apache_beam.internal import util
from apache_beam.portability import python_urns
from apache_beam.pvalue import DoOutputsTuple
from apache_beam.transforms.display import DisplayDataItem
from apache_beam.transforms.display import HasDisplayData
from apache_beam.typehints import native_type_compatibility
from apache_beam.typehints import typehints
from apache_beam.typehints.decorators import IOTypeHints
from apache_beam.typehints.decorators import TypeCheckError
from apache_beam.typehints.decorators import WithTypeHints
from apache_beam.typehints.decorators import get_signature
from apache_beam.typehints.decorators import get_type_hints
from apache_beam.typehints.decorators import getcallargs_forhints
from apache_beam.typehints.trivial_inference import instance_to_type
from apache_beam.typehints.typehints import validate_composite_type_param
Expand Down Expand Up @@ -350,6 +353,14 @@ def default_label(self):
# type: () -> str
return self.__class__.__name__

def default_type_hints(self):
fn_type_hints = IOTypeHints.from_callable(self.expand)
if fn_type_hints is not None:
fn_type_hints = fn_type_hints.strip_pcoll()

# Prefer class decorator type hints for backwards compatibility.
return get_type_hints(self.__class__).with_defaults(fn_type_hints)

def with_input_types(self, input_type_hint):
"""Annotates the input type of a :class:`PTransform` with a type-hint.

Expand Down Expand Up @@ -419,6 +430,8 @@ def type_check_inputs_or_outputs(self, pvalueish, input_or_output):
root_hint = (
arg_hints[0] if len(arg_hints) == 1 else arg_hints or kwarg_hints)
for context, pvalue_, hint in _ZipPValues().visit(pvalueish, root_hint):
if isinstance(pvalue_, DoOutputsTuple):
continue
if pvalue_.element_type is None:
# TODO(robertwb): It's a bug that we ever get here. (typecheck)
continue
Expand Down
70 changes: 70 additions & 0 deletions sdks/python/apache_beam/typehints/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ def foo((a, b)):
from typing import Optional
from typing import Tuple
from typing import TypeVar
from typing import Union

from apache_beam.typehints import native_type_compatibility
from apache_beam.typehints import typehints
Expand Down Expand Up @@ -378,6 +379,75 @@ def has_simple_output_type(self):
self.output_types and len(self.output_types[0]) == 1 and
not self.output_types[1])

def strip_pcoll(self):
from apache_beam.pipeline import Pipeline
from apache_beam.pvalue import PBegin
from apache_beam.pvalue import PDone

return self.strip_pcoll_helper(self.input_types,
self._has_input_types,
'input_types',
[Pipeline, PBegin],
'This input type hint will be ignored '
'and not used for type-checking purposes. '
'Typically, input type hints for a '
'PTransform are single (or nested) types '
'wrapped by a PCollection, or PBegin.',
'strip_pcoll_input()').\
strip_pcoll_helper(self.output_types,
self.has_simple_output_type,
'output_types',
[PDone, None],
'This output type hint will be ignored '
'and not used for type-checking purposes. '
'Typically, output type hints for a '
'PTransform are single (or nested) types '
'wrapped by a PCollection, PDone, or None.',
'strip_pcoll_output()')

def strip_pcoll_helper(
self,
my_type, # type: any
has_my_type, # type: Callable[[], bool]
my_key, # type: str
special_containers, # type: List[Union[PBegin, PDone, PCollection]]
error_str, # type: str
source_str # type: str
):
# type: (...) -> IOTypeHints
from apache_beam.pvalue import PCollection

if not has_my_type() or not my_type or len(my_type[0]) != 1:
return self

my_type = my_type[0][0]

if isinstance(my_type, typehints.AnyTypeConstraint):
return self

special_containers += [PCollection]
kwarg_dict = {}

if (my_type not in special_containers and
getattr(my_type, '__origin__', None) != PCollection):
logging.warning(error_str + ' Got: %s instead.' % my_type)
kwarg_dict[my_key] = None
return self._replace(
origin=self._make_origin([self], tb=False, msg=[source_str]),
**kwarg_dict)

if (getattr(my_type, '__args__', -1) in [-1, None] or
len(my_type.__args__) == 0):
# e.g. PCollection (or PBegin/PDone)
kwarg_dict[my_key] = ((typehints.Any, ), {})
else:
# e.g. PCollection[type]
kwarg_dict[my_key] = ((convert_to_beam_type(my_type.__args__[0]), ), {})

return self._replace(
origin=self._make_origin([self], tb=False, msg=[source_str]),
**kwarg_dict)

def strip_iterable(self):
# type: () -> IOTypeHints

Expand Down
130 changes: 130 additions & 0 deletions sdks/python/apache_beam/typehints/typed_pipeline_test_py3.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

from __future__ import absolute_import

import typing
import unittest

import apache_beam as beam
Expand Down Expand Up @@ -257,6 +258,135 @@ def fn2(element: int) -> int:
result = [1, 2, 3] | beam.FlatMap(fn) | beam.Map(fn2)
self.assertCountEqual([4, 6], result)

def test_typed_ptransform_with_no_error(self):
class StrToInt(beam.PTransform):
def expand(
self,
pcoll: beam.pvalue.PCollection[str]) -> beam.pvalue.PCollection[int]:
return pcoll | beam.Map(lambda x: int(x))

class IntToStr(beam.PTransform):
def expand(
self,
pcoll: beam.pvalue.PCollection[int]) -> beam.pvalue.PCollection[str]:
return pcoll | beam.Map(lambda x: str(x))

_ = ['1', '2', '3'] | StrToInt() | IntToStr()

def test_typed_ptransform_with_bad_typehints(self):
class StrToInt(beam.PTransform):
def expand(
self,
pcoll: beam.pvalue.PCollection[str]) -> beam.pvalue.PCollection[int]:
return pcoll | beam.Map(lambda x: int(x))

class IntToStr(beam.PTransform):
def expand(
self,
pcoll: beam.pvalue.PCollection[str]) -> beam.pvalue.PCollection[str]:
return pcoll | beam.Map(lambda x: str(x))

with self.assertRaisesRegex(typehints.TypeCheckError,
"Input type hint violation at IntToStr: "
"expected <class 'str'>, got <class 'int'>"):
_ = ['1', '2', '3'] | StrToInt() | IntToStr()

def test_typed_ptransform_with_bad_input(self):
class StrToInt(beam.PTransform):
def expand(
self,
pcoll: beam.pvalue.PCollection[str]) -> beam.pvalue.PCollection[int]:
return pcoll | beam.Map(lambda x: int(x))

class IntToStr(beam.PTransform):
def expand(
self,
pcoll: beam.pvalue.PCollection[int]) -> beam.pvalue.PCollection[str]:
return pcoll | beam.Map(lambda x: str(x))

with self.assertRaisesRegex(typehints.TypeCheckError,
"Input type hint violation at StrToInt: "
"expected <class 'str'>, got <class 'int'>"):
# Feed integers to a PTransform that expects strings
_ = [1, 2, 3] | StrToInt() | IntToStr()

def test_typed_ptransform_with_partial_typehints(self):
class StrToInt(beam.PTransform):
def expand(self, pcoll) -> beam.pvalue.PCollection[int]:
return pcoll | beam.Map(lambda x: int(x))

class IntToStr(beam.PTransform):
def expand(
self,
pcoll: beam.pvalue.PCollection[int]) -> beam.pvalue.PCollection[str]:
return pcoll | beam.Map(lambda x: str(x))

# Feed integers to a PTransform that should expect strings
# but has no typehints so it expects any
_ = [1, 2, 3] | StrToInt() | IntToStr()

def test_typed_ptransform_with_bare_wrappers(self):
class StrToInt(beam.PTransform):
def expand(
self, pcoll: beam.pvalue.PCollection) -> beam.pvalue.PCollection:
return pcoll | beam.Map(lambda x: int(x))

class IntToStr(beam.PTransform):
def expand(
self,
pcoll: beam.pvalue.PCollection[int]) -> beam.pvalue.PCollection[str]:
return pcoll | beam.Map(lambda x: str(x))

_ = [1, 2, 3] | StrToInt() | IntToStr()

def test_typed_ptransform_with_no_typehints(self):
class StrToInt(beam.PTransform):
def expand(self, pcoll):
return pcoll | beam.Map(lambda x: int(x))

class IntToStr(beam.PTransform):
def expand(
self,
pcoll: beam.pvalue.PCollection[int]) -> beam.pvalue.PCollection[str]:
return pcoll | beam.Map(lambda x: str(x))

# Feed integers to a PTransform that should expect strings
# but has no typehints so it expects any
_ = [1, 2, 3] | StrToInt() | IntToStr()

def test_typed_ptransform_with_generic_annotations(self):
T = typing.TypeVar('T')

class IntToInt(beam.PTransform):
def expand(
self,
pcoll: beam.pvalue.PCollection[T]) -> beam.pvalue.PCollection[T]:
return pcoll | beam.Map(lambda x: x)

class IntToStr(beam.PTransform):
def expand(
self,
pcoll: beam.pvalue.PCollection[T]) -> beam.pvalue.PCollection[str]:
return pcoll | beam.Map(lambda x: str(x))

_ = [1, 2, 3] | IntToInt() | IntToStr()

def test_typed_ptransform_with_do_outputs_tuple_compiles(self):
class MyDoFn(beam.DoFn):
def process(self, element: int, *args, **kwargs):
if element % 2:
yield beam.pvalue.TaggedOutput('odd', 1)
else:
yield beam.pvalue.TaggedOutput('even', 1)

class MyPTransform(beam.PTransform):
def expand(self, pcoll: beam.pvalue.PCollection[int]):
return pcoll | beam.ParDo(MyDoFn()).with_outputs('odd', 'even')

# This test fails if you remove the following line from ptransform.py
# if isinstance(pvalue_, DoOutputsTuple): continue
_ = [1, 2, 3] | MyPTransform()


class AnnotationsTest(unittest.TestCase):
def test_pardo_dofn(self):
Expand Down
Loading