New aatams dm v2 #208
Force-pushed from e07d5d4 to ada46be
Codecov Report
@@            Coverage Diff            @@
##             master     #208   +/-   ##
=========================================
  Coverage          ?   87.87%
=========================================
  Files             ?       45
  Lines             ?     2894
  Branches          ?      490
=========================================
  Hits              ?     2543
  Misses            ?      223
  Partials          ?      128
Continue to review full report at Codecov.
Force-pushed from ada46be to 6ad3153
I just fixed the tests and removed some unnecessary code. I rebased since no one had looked into it yet :) Other details below.
Included here are:
1. CSVSchema class
2. AATAMS_DM_Schema class
3. AATAMS_DM pipeline handler
The schema classes are "LEVT" classes. They can Load, Extract, Validate and Transform the required csv files into dictionaries/lists.
In particular, the AATAMS class can do schema validation and transformations at several levels:
a. filename conventions within the zip
b. headers of all files
c. types of all columns
d. type-casting of all columns
e. storing the type-cast contents of the files
The nice thing about the implementation is that, after the LEVT flow is done, we could potentially load the csv files (or the dicts/lists) directly into the database (as long as the db schema is defined and we have the credentials).
A full content validation is done by default, but that can be easily changed.
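To make the validate-and-type-cast idea concrete, here is a minimal sketch using the schema package (the column names and bounds are illustrative only, not the real AATAMS definitions):

```python
from schema import And, Schema, Use

# Illustrative row schema: every csv cell arrives as a string and is
# validated + type-cast in a single pass (hypothetical columns/bounds).
ROW_SCHEMA = Schema({
    "device": And(str, len),                               # non-empty string
    "lat": And(Use(float), lambda v: -90.0 <= v <= 90.0),  # cast, then range-check
    "lon": And(Use(float), lambda v: -180.0 <= v <= 180.0),
})

row = {"device": "ct111-286-13", "lat": "-42.88", "lon": "147.33"}
typed_row = ROW_SCHEMA.validate(row)
# -> {"device": "ct111-286-13", "lat": -42.88, "lon": 147.33}
```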
Tests:
1. Schema is validated against an aggregation of all the zip files I found in the error directory.
2. Pipeline tests are limited, but we need to discuss whether the error-directory files are de facto invalid or valid.
Performance: I aggregated all aatams_dm csv files in the error folder and got roughly 370 MB. Reading all of this took a while, but not too long with the current implementation (<5 min). It may become an issue if the files get much bigger; this is why some options are provided to skip rows during content validation.
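As a rough sketch of what such a row-skipping option could look like (the function and parameter names are hypothetical, not the actual API):

```python
from itertools import islice

def rows_to_validate(rows, row_step=1):
    # row_step=1 validates every row (the default full content validation);
    # row_step=10 would validate only every 10th row to bound runtime
    return islice(rows, 0, None, row_step)
```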
Force-pushed from dad0932 to 7ad1ebc
Refactored/squashed everything since no one had read it :) @evacougnon - extra attention on the #TODO markings of the schema types, since I had to estimate types based on limited data instead of specifications. @mhidas - I would appreciate some feedback here, particularly on whether the pipeline follows our standards. I need a refresher regarding the error files here too - I used them as examples/valid input and couldn't find anything wrong with them!
@ocehugo I will take a look next week. Please assign me to the relevant PO Backlog item to remind me.
@ocehugo I will have a look at the TODO markings tomorrow. @mhidas I added you to the backlog item for iteration 198 so you can add your time on it.
@lbesnard might be able to help review the PR if Marty is too busy on moorings products.
Some todos, so I don't lose track:
@ocehugo is it a draft PR or is it ready to be reviewed?
@lbesnard, it's ready - I'm just raising things about extra functionality/non-default behaviour that can wait until your/Eva's comments.
what does this mean @ocehugo?
hehehe "LoadExtractValidateTransform" |
oh right! like Talend!
not quite - Talend is ETL
lbesnard left a comment:
general comment
- maybe the folder doesn't have to be specific to *_dm. I think we have tried to limit the number of folders in the aodndata folder. I imagine this could go into the existing aatams folder, and some of the current aatams pipelines (in the aatams folder) could probably be removed with this PR. See with @evacougnon
- Out of interest: you're pretty much validating all data values, checking str/floats/positivity... Why are you doing this with the schema package? Could it have been simpler with something like Pandas?
- There are quite a few bits I just don't get; some parts of the code seemed unused, and I tried quite hard to understand it.
- Please add a few more comments, as it can be hard to follow with multiple one-liner functions renamed to partials... I feel like I have to navigate all over the place to understand what is happening. I guess my brain is quite linear.
otherwise great code, I learnt a lot, not for the faint-hearted.
I definitely need a beer ...
| INSTALL_REQUIRES = [
|     'schema>=0.7.0',
please sort alphabetically
missing handler name in entry_points in setup.py
| self.assertEqual(AATAMS_SATTAG_DM_BASE,
|                  os.path.dirname(file.dest_path))
it would be great to test the dest_path return value for various cases, not just the directory output
This definition was unclear at the time - this is why the path is hardcoded to the constant.
I guess what I meant is that you're only testing the value AATAMS_SATTAG_DM_BASE, which is the directory output, but not the full path including os.path.basename(file.dest_path)
Ahh, that's because we don't change filenames. I will add it anyway, since this would be more explicit.
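A sketch of what that fuller assertion could look like (the helper name is invented, and the src_path attribute is an assumption based on the snippet above):

```python
import os

def assert_full_dest_path(testcase, pipeline_file, base_dir):
    """Assert the full dest_path, not just its directory: filenames are
    not changed by the pipeline, so only the directory part differs."""
    expected = os.path.join(base_dir, os.path.basename(pipeline_file.src_path))
    testcase.assertEqual(expected, pipeline_file.dest_path)
```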
| class AATAMS_SATTAG_QC_DM_SCHEMA(CSVSchema):
The upper case for the class name is confusing, as we usually use upper case for global variables. See comment above in the import.
Funny bit - this comment doesn't appear in the review window, only in the PR window. As said, addressed in the review comments.
|     bool: False if invalid
|     """
|     if astr in ("", "NA", "NaN"):
I find this a bit weird. It doesn't take care of spaces, letter case... can't you use something from numpy or pandas?
Funny thing - some comments aren't in the GitHub review window, only in the PR window - completely missed those.
I didn't use pandas because importing it just to do that is overkill. Also, to type-check the result of, say, pd.to_numeric, I would need to check against numpy types instead of built-ins, which would increase the number of possible definitions (check for float, check for np.float64, np.float32, etc).
Finally, I prefer to keep the result as close as possible to the actual data being read (a string) - that's why I put an empty string.
Yes - I'll add a check for whitespace and the upper-case variants.
> Yes - I'll add a check for whitespace and the upper-case variants.

I can't see this added in the function. Is it somewhere else?
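For reference, the promised whitespace/case handling could look like this (a sketch only; the real function may keep a different return convention):

```python
def is_missing(astr):
    # treat "", "NA" and "NaN" as missing, ignoring surrounding
    # whitespace and letter case (so " na " and "NAN" also match)
    return astr.strip().upper() in ("", "NA", "NAN")
```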
|     check_len_list, AATAMS_DM_NUMBER_OF_FILES_IN_ZIP
| )
| check_name_of_files_is_correct = partial(check_file_names, AATAMS_DM_FILE_TYPE_NAMES)
I understand the code is pretty clean! However, I think it's unnecessarily complicated.
You created check_len_list and check_file_names, which both take 2 arguments. Both functions are only used once throughout your code.
Then you create a partial function so those two functions only take 1 argument... Why? I mean, yes, it works, but it's quite convoluted.
This is to reduce arity - this way, all schema definitions are functions of a single argument: the field being validated.
Yeah sure, I got that, but my point is: why create check_len_list and check_file_names with 2 arguments in the first place?
Don't take this comment as something you have to change. It's fine, it's working; I'm just questioning the usefulness of doing so.
Ahh - it's because I didn't know beforehand if I would need to reuse the functions to create others - just a composition thing. It's also useful to build them like this in case I need to split/move some of the functions to other files (e.g. for re-use). This way the port is easy, since the functions are independent of "globals".
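To make the arity-reduction point concrete, a small sketch of the pattern (the constant's value comes from the 7-file zip mentioned later in the thread):

```python
from functools import partial

AATAMS_DM_NUMBER_OF_FILES_IN_ZIP = 7

def check_len_list(expected_len, alist):
    # binary, reusable validator: any expected length can be bound later
    return alist if len(alist) == expected_len else False

# unary partial: matches the single-argument signature the schema
# validation call expects (it only ever passes the field under validation)
check_len_of_file_list = partial(check_len_list, AATAMS_DM_NUMBER_OF_FILES_IN_ZIP)

assert check_len_of_file_list(["f"] * 7)           # truthy: the list itself
assert check_len_of_file_list(["f"] * 3) is False  # wrong length
```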
| self.validate_zip_names(file_list)
| self.validate_headers_only(file_list)
| pass
pass not required - was removed.
| self.headers = {}
| self.headers["metadata"], self.metadata = self.validate_file(metadata_file[0])
| if self.cross_content_scope:
why create this condition, since this is always defined as "cid": "sattag_program" anyway?
yes - the cross validation was broken when I incompletely rebased - the logic is fixed in new commits
| self.report("\tPopulating cross content based on %s" % metadata_file) | ||
| ckeys = self.cross_content_scope.keys() | ||
| self.cross_values = self.compute_cross_values(self.metadata, keys=ckeys) | ||
|
|
this block above confuses me. If I remove it, it doesn't seem to change anything in the behaviour of the code
| if self.same_keys_content:
|     self.report("\t\t\tValidating inter columns in %s" % file)
|     self.validate_self_content(self.same_keys_content, valid_dataset)
I can't find anywhere in the code where self.same_keys_content might change, and I don't understand where it is actually used.
Good catch - I forgot to remove this legacy bit and the related method that is now unused.
| self.report("\t\t\tValidating inter columns in %s" % file)
| self.validate_self_content(self.same_keys_content, valid_dataset)
| if self.cross_content_scope and self.cross_values:
I'm trying hard to understand the logic of self.cross_values across the different functions, and running the unittests. It seems like the value is always empty and the if block is always bypassed. Please enlighten me!
Yeap - I will clarify with docstrings and fix what I broke in the rebase.
It doesn't show up in the tests because I changed the arguments passed to the function that computes the cross_values, which now returns empty results.
cross_content_scope is a dictionary mapping a column name in one file to a different column name in another file. This is used by compute_cross_values to compare set(dict1[keyA]) against set(dict2[keyB]): "cross" -> among different dicts/files, "content" -> values, "scope" -> set comparison.
In the case of AATAMS, this is true for "sattag_program", which is a field in metadata.csv, while the same field is named "cid" in the other files.
My failure here is that cross_content_scope and related bits got no docs :)
If we change line 673 in the aatams_schema:
-ckeys = self.cross_content_scope.keys()
+ckeys = self.cross_content_scope.values()
the cross content will be set-compared, but note that the naming is a bit of an anti-pattern ATM - a tuple would be better.
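A rough sketch of the set comparison described above (the function body and data are illustrative; the real compute_cross_values may differ):

```python
def compute_cross_values(dataset, keys=()):
    # collect the set of values observed under each scoped column
    return {key: {row[key] for row in dataset} for key in keys}

metadata = [{"sattag_program": "ct111"}, {"sattag_program": "ct112"}]
dive = [{"cid": "ct111"}, {"cid": "ct112"}]

cross = compute_cross_values(metadata, keys=("sattag_program",))
# cross-file check: every "cid" value must appear among the
# "sattag_program" values found in metadata.csv
assert {row["cid"] for row in dive} <= cross["sattag_program"]
```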
evacougnon left a comment:
I had a look at the #TODO markings of the schema types. Note that we are actually not harvesting all the fields we have in the csv, so you may not need all these checks.
About the naming convention, I don't mind whether or not we use the _dm. Happy with any simpler suggestion; we'll just have to take it into consideration for the harvester.
And yes we'll have to set up the archive of the .zip file
| def valid_device(astr):  # TODO expand to particular devices fmts?
|     """Check if a valid device.
|
|     Args:
|         astr: a device string
|
|     Returns:
|         str: astr if valid
|         bool: False if invalid
|
|     """
|     return astr if len(astr.split("-")) >= 3 else False
this has to be a 3-part code string, so you can use:
return astr if len(astr.split("-")) == 3 else False
| def valid_wmo_device(astr):  # TODO expand to particular codes?
|     """Check if a valid wmo device.
|
|     Args:
|         astr: a wmo device string
|
|     Returns:
|         str: astr if valid
|         bool: False if invalid
|
|     """
|     return astr if astr[0] == "Q" else False
I would remove this part; with the documentation I have, I can't guarantee they all start with 'Q'
ok, I'll just leave it as a plain string then
| def valid_tag_type(astr):  # TODO expand to particular strings.
|     """Check if a valid tag type.
|
|     Args:
|         astr: a tag type string
|
|     Returns:
|         str: astr if valid
|         bool: False if invalid
|
|     """
|     return astr if "_" in astr else False
TODO expand to particular strings. -- I would remove this part; with the doco I have, I can't guarantee they all have a '_'
| return astr if "_" in astr else False

| def valid_longitude(afloat):  # TODO check convention
yes, I confirm this is the right convention
| return afloat if valid else False

| def not_applicable(astr):  # TODO: handle every case as particular?
empty field and 'NA' are commonly used in the files we'll harvest, but I'm happy to keep 'NaN' in the list
there are some NaNs - that's why I put everything "missing" in one definition, to simplify things
| "surf.tm": Or(CSV_EMPTY, CSV_POSITIVE_FLOAT), # TODO | ||
| "dive.tm": Or(CSV_EMPTY, CSV_POSITIVE_FLOAT), # TODO | ||
| "haul.tm": Or(CSV_EMPTY, CSV_POSITIVE_FLOAT), # TODO |
| "surf.tm": Or(CSV_EMPTY, CSV_POSITIVE_FLOAT), # TODO | ||
| "dive.tm": Or(CSV_EMPTY, CSV_POSITIVE_FLOAT), # TODO | ||
| "haul.tm": Or(CSV_EMPTY, CSV_POSITIVE_FLOAT), # TODO | ||
| "n.cycles": CSV_INT, |
"n.cycles": CSV_POSITIVE_INT,
| "cruise.tm": Or(CSV_EMPTY, CSV_POSITIVE_FLOAT), | ||
| "avg.sst": Or(CSV_EMPTY, CSV_POSITIVE_FLOAT), | ||
| "avg.speed": Or(CSV_EMPTY, CSV_POSITIVE_FLOAT), |
we're not harvesting these fields into our DB, so ignore
| "dp.n.cycles": Or(CSV_EMPTY, CSV_POSITIVE_INT), | ||
| "dp.av.depth": Or(CSV_EMPTY, CSV_POSITIVE_INT), | ||
| "dp.max.depth": Or(CSV_EMPTY, CSV_POSITIVE_INT), | ||
| "dp.avg.speed": Or(CSV_EMPTY, CSV_POSITIVE_INT), | ||
| "dp.sd.depth": Or(CSV_EMPTY, CSV_POSITIVE_INT), | ||
| "dp.av.dur": Or(CSV_EMPTY, CSV_POSITIVE_INT), | ||
| "dp.sd.dur": Or(CSV_EMPTY, CSV_POSITIVE_INT), | ||
| "dp.max.dur": Or(CSV_EMPTY, CSV_POSITIVE_INT), | ||
| "dp.dive.tm": Or(CSV_EMPTY, CSV_POSITIVE_FLOAT), | ||
| "av.surf.dur": Or(CSV_EMPTY, CSV_POSITIVE_INT), | ||
| "sd.surf.dur": Or(CSV_EMPTY, CSV_POSITIVE_INT), | ||
| "max.surf.dur": Or(CSV_EMPTY, CSV_POSITIVE_INT), | ||
| "dp.av.surf.dur": Or(CSV_EMPTY, CSV_POSITIVE_INT), | ||
| "dp.sd.surf.dur": Or(CSV_EMPTY, CSV_POSITIVE_INT), | ||
| "dp.max.surf.dur": Or(CSV_EMPTY, CSV_POSITIVE_INT), | ||
| "pca": Or(CSV_EMPTY, CSV_POSITIVE_INT), | ||
| "swim.eff.desc": Or(CSV_EMPTY, CSV_POSITIVE_FLOAT), | ||
| "swim.eff.asc": Or(CSV_EMPTY, CSV_POSITIVE_FLOAT), | ||
| "swim.eff.whole": Or(CSV_EMPTY, CSV_POSITIVE_FLOAT), | ||
| "secs.desc": Or(CSV_EMPTY, CSV_POSITIVE_INT), | ||
| "secs.asc": Or(CSV_EMPTY, CSV_POSITIVE_INT), | ||
| "pitch.desc": Or(CSV_EMPTY, CSV_INT), | ||
| "pitch.asc": Or(CSV_EMPTY, CSV_INT), | ||
| "av.haulout.dur": Or(CSV_EMPTY, CSV_POSITIVE_INT), | ||
| "sd.haulout.dur": Or(CSV_EMPTY, CSV_POSITIVE_INT), | ||
| "max.haulout.dur": Or(CSV_EMPTY, CSV_POSITIVE_INT), | ||
| "av.phosi.dur": Or(CSV_EMPTY, CSV_POSITIVE_INT), | ||
| "sd.phosi.dur": Or(CSV_EMPTY, CSV_POSITIVE_INT), | ||
| "max.phosi.dur": Or(CSV_EMPTY, CSV_POSITIVE_INT), |
we're not harvesting these fields into our DB, so ignore
| "ssm_y": COORD_SCHEMA["ssm_y"], | ||
| "ssm_x.se": COORD_SCHEMA["ssm_x.se"], | ||
| "ssm_y.se": COORD_SCHEMA["ssm_y.se"], | ||
| "cid": METADATA_SCHEMA["sattag_program"], |
we're not harvesting these fields into our DB, so ignore
@lbesnard - sorry for the asynchronous review comments - I had a bad GitHub day yesterday!
Yeap, the name was requested by Eva, but it's a better idea to put this inside the aatams folder.
Yes, we can validate all values or just some of them - up to the harvest definition. The reason I used schema is that it greatly simplifies the type definitions, is very flexible and extensible, and provides nice output messages. As I said in a comment above, I could've used the "converters" from pandas, but that functionality is not as flexible as schema and I would be re-implementing exactly what schema is supposed to do! Alternatively, there is also a pandas_schema package, but I was not convinced it is as flexible as the schema package.
Yes, those were the left-overs I mentioned above. But as you noticed, they do no harm, because default empty definitions are skipped.
I will add some more comments, but I think we can't run away from it here - it's just the nature of the data (a lot of columns with different types!). The partial usage is a trick to reduce all validation functions to unary ones that plug easily into the schema verification call. Yes, you had to drill down, because my aim here was to reuse some of this stuff. Hopefully we may start validating the actual data in python instead of skipping it or relying on Talend.
Thanks - I should have pointed to the schema package help before - once you get the gist of it, all is fine - it's simple but powerful. It also helps to know some common patterns (e.g. partials, laziness).
This cleans up dead code and fixes the cross validation feature for the AATAMS schema class. I moved the dead code from CSVSchema to aatams, since this feature is beyond the CSVSchema task. I also cleaned up the AATAMS Schema class by adding more explicit definitions plus the init signature.
+docstring fix
1. Fix missing returns in two functions. 2. Move schema types as requested. 3. Rename class and improve docstrings. 4. Remove TODO entries. 5. All tests passing.
tests pass here
I assumed the logger package was being used instead of logging. This created a nasty bug, plus ugly output in the tests, that was hard to track down.
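For reference, the stdlib convention is roughly the following (a sketch only; the pipeline framework may instead inject its own logger object):

```python
import logging

# module-level logger named after the module; configuration (handlers,
# levels, formatting) is left to the application, not the library code
LOGGER = logging.getLogger(__name__)
LOGGER.info("validating %s", "metadata.csv")
```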
The new zip files contain updated files/columns sent by the data provider.
@evacougnon - See new commits above. Contents:
| # self.input_file_object.name,
| # old_metadata_file.name,
| # ),
| # )
why not just remove it? Do you think this will be re-used in the future?
@evacougnon mentioned that it could be turned on in the future, so I thought it would be better to keep the work done here than to throw it away.
| """Initialize the class by only accepting zip files.""" | ||
| super(AatamsSattagQcDmHandler, self).__init__(*args, **kwargs) | ||
| self.allowed_extensions = [".zip"] | ||
| self.allowed_regexes = ["^.+_((?!nrt\\.)).+$"] |
why not ^.+_dm\\.zip$? This regex seems to allow anything other than an _nrt file
yeap, forgot to update that too. I'm doing a refactor, since I hadn't realized that we can just use a single pipeline class to handle both cases and reduce code size
hum, not to forget that the pipeline should be able to handle a zip file, but also a csv file that we have decided to repush for whatever reason
@lbesnard - I didn't know about that requirement - will add a csv option then. However, should I allow older files to be uploaded even though current files are newer (NRT and DM)?
no, I don't think it was, but considering how little the change is, I think it's valuable to have

> should I allow older files to be uploaded even though current files are newer

I would say yes at this stage, as checking would require looking at what is on the storage broker or in the WFS. But don't quote me - to be confirmed with @evacougnon

> no, I don't think it was, but considering how little the change is, I think it's valuable to have

Not as simple as adding to the extension list, but still doable (the schema functions have to adapt to accept a single file instead of a list of 7 files).
Ahh, I found a simple solution - just short-circuiting in preprocessing - done.
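Going back to the regex question above, a quick check with made-up filenames shows the difference between the two patterns:

```python
import re

OLD = r"^.+_((?!nrt\.)).+$"  # pattern from the handler quoted above
NEW = r"^.+_dm\.zip$"        # reviewer's stricter suggestion

for name in ("foo_dm.zip", "foo_nrt.zip", "foo_other.txt"):  # made-up names
    print(name, bool(re.match(OLD, name)), bool(re.match(NEW, name)))
# foo_dm.zip    True  True
# foo_nrt.zip   False False
# foo_other.txt True  False   <- the old pattern accepts any non-"_nrt." name
```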
This renames some classes, updates DM test files, and includes extra csv files for testing.
This commit includes major refactoring in both the NRT and DM classes. 1. Both pipelines now use the same base class, just initialized with different options. 2. The code was updated to block ingestion of old files in the NRT pipeline. 3. All tests were updated/extended and the file was renamed accordingly.
DM and NRT share the same class now, with just different init options. Ingestion was updated to allow a single csv file to be accepted, and tests were updated. See the commit messages for the gist of it.
evacougnon left a comment:
Looks good on my side. @lbesnard, it would be great if you can also review.
| "NRT update requested: Comparing timestamps in the metadata files %s and %s" | ||
| ) | ||
| NRT_TIMESTAMP_DIFFERS_MSG = ( | ||
| "File %s within %s contains older entries than current NRT state from %s" |
maybe the message could be clearer?
"Incoming file ... contains ... older timestamp values (or simply data, or something) than the latest published NRT file"
What do you think?
Yeap - better to reference the actual column/field name.
1. This consistently renames some classes, constants and functions related to the AATAMS name soup. 2. Fixes small typos and docstrings. 3. Moves some definitions up in the source code.
@lbesnard - all comments addressed - see above.
| 'AatamsDmHandler = aodndata.aatams.aatams_dm:AatamsDmHandler',
| 'AatamsNrtHandler = aodndata.aatams.aatams_nrt:AatamsNrtHandler',
| 'AatamsSattagQcDmHandler = aodndata.aatams.aatams_sattag:AatamsSattagQcDmHandler',
| 'AatamsSattagNrtHandler = aodndata.aatams.aatams_sattag:AatamsSattagNrtHandler',
the handler has to be renamed here to match your changes
Arg, always forgetting these bits.
@ocehugo one minor change to be done to make it work. Longest conversation ever on this PR!!
This fixes the following problems: 1. Handling of zero in is_positive/negative - a zero entry is now valid for both. 2. Corrects CSV_EMPTY to raise an error instead of returning False; this fixes the chaining of validators for two other entries (list_of_ints, and the dates that now accept empty entries). 3. Better handling of dialects and csv delimiters.
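A sketch of the two validator fixes described here (the names are approximate and CSV_DATE is hypothetical; the real code may differ):

```python
from schema import SchemaError

def is_positive(anum):
    # zero is now treated as valid by both the positive and negative checks;
    # returning a bool also avoids 0 itself being a falsy ("failing") value
    return anum >= 0

def csv_empty(astr):
    # raising (instead of returning False) makes the failure explicit, so
    # chained validators such as Or(csv_empty, CSV_DATE) fall through cleanly
    if astr.strip() != "":
        raise SchemaError("%r is not an empty entry" % astr)
    return astr
```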
Allows handling lists with strings and datenum items when estimating the most recent entry. This is required now since some dates may be empty.
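Something like the following (a sketch only; the actual helper and its name are assumptions):

```python
from datetime import datetime

def most_recent(entries):
    # entries may mix datetime objects with leftover strings (e.g. empty
    # date fields); ignore the strings when taking the maximum
    dates = [e for e in entries if isinstance(e, datetime)]
    return max(dates) if dates else None

most_recent([datetime(2019, 1, 2), "", datetime(2019, 3, 4), "NA"])
# -> datetime(2019, 3, 4)
```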
@evacougnon, @lbesnard - ready for review. Some small changes are just things I noticed when revisiting some files or handling some fields.
feat(pipeline): new aatams_sattag_qc_dm pipeline