From 9e9f3dd02e74fd5787cd53be73b6a6583964f8e2 Mon Sep 17 00:00:00 2001 From: WiktorMadejski Date: Sat, 6 Jan 2024 16:42:19 +0100 Subject: [PATCH 1/7] FileSystemMetricsRepository sync on DBFS tutorial I need to migrate python-deequ application - I expected this task to take 1-2h initially but It took me a lot more than that. I hope that helps other people, --- tutorials/repository_sync_dbfs.ipynb | 481 +++++++++++++++++++++++++++ 1 file changed, 481 insertions(+) create mode 100644 tutorials/repository_sync_dbfs.ipynb diff --git a/tutorials/repository_sync_dbfs.ipynb b/tutorials/repository_sync_dbfs.ipynb new file mode 100644 index 0000000..d1b813a --- /dev/null +++ b/tutorials/repository_sync_dbfs.ipynb @@ -0,0 +1,481 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "3d9f38cf-e24a-4854-8027-09414874fb35", + "showTitle": false, + "title": "" + } + }, + "source": [ + "####Synchronising Computed Metrics in a MetricsRepository to a json file stored on DBFS\n", + "\n", + "PyDeequ allows us to persist the metrics we computed on dataframes in a so-called MetricsRepository. In the following example, we showcase repository json file managed by python-deequ on DBFS. This can be especially usefull:\n", + "- For python-deequ application migration,\n", + "- To manage MetricsRepository json on the application side application,\n", + "- To enable explainability and analytics using MetricsRepository json.\n", + "\n", + "Note: As of 1.1.0 release of Python Deequ release initialising repository json as FileSystemMetricsRepository is the only way to run validations on historical metrics. InMemoryMetricsRepository does not support initialising from historical metrics." 
+ ] + }, + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "d31f7330-f71c-4a6b-80d0-852e3fee833c", + "showTitle": false, + "title": "" + } + }, + "source": [ + "#### 0) Set file location for metrics repository\n", + "Write repository file using File API Format but provide Spark API Format to [FileSystemMetricsRepository](https://github.com/awslabs/deequ/blob/master/src/main/scala/com/amazon/deequ/repository/fs/FileSystemMetricsRepository.scala)" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "ed4115df-bf05-42ad-8feb-3e9ecf17e470", + "showTitle": false, + "title": "" + } + }, + "outputs": [], + "source": [ + "metrics_file = '/table_xyz_pydeequ_metrics_repository.json'\n", + "metrics_file_api =f\"/dbfs/dbfs{metrics_file}\"\n", + "metrics_spark_api =f\"dbfs:/dbfs{metrics_file}\"" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "71833639-88d8-4151-8011-d5d659253e00", + "showTitle": false, + "title": "" + } + }, + "source": [ + "#### 1) Create json - that stores historical metrics\n", + "\n", + "This json structure is retrived from a previos pydeequ run." 
+ ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "1bf992c7-009a-4bef-9bb8-c3645870a610", + "showTitle": false, + "title": "" + } + }, + "outputs": [], + "source": [ + "\n", + "with open(metrics_file_api, \"w\", encoding='utf-8') as file:\n", + " file.write(\"\"\"\n", + " [\n", + " {\n", + " \"resultKey\": { \"dataSetDate\": 1702836503289, \"tags\": {} },\n", + " \"analyzerContext\": {\n", + " \"metricMap\": [\n", + " {\n", + " \"analyzer\": {\n", + " \"analyzerName\": \"Mean\",\n", + " \"column\": \"age\"\n", + " },\n", + " \"metric\": {\n", + " \"metricName\": \"DoubleMetric\",\n", + " \"entity\": \"Column\",\n", + " \"instance\": \"age\",\n", + " \"name\": \"Mean\",\n", + " \"value\": 32\n", + " }\n", + " }\n", + " ]\n", + " }\n", + " }\n", + " ]\"\"\")" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "19d86aa3-a7d7-43d7-9a6a-a7f1073c2610", + "showTitle": false, + "title": "" + } + }, + "source": [ + "##### 1.1) Validate the file is written to DBFS" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "e9c997a9-4917-4de5-8b8f-d0c6990baaca", + "showTitle": false, + "title": "" + } + }, + "outputs": [ + { + "output_type": "stream", + "name": "stdout", + "output_type": "stream", + "text": [ + "\r\n [\r\n {\r\n \"resultKey\": { \"dataSetDate\": 1702836503289, \"tags\": {} },\r\n \"analyzerContext\": {\r\n \"metricMap\": [\r\n {\r\n \"analyzer\": {\r\n \"analyzerName\": \"Mean\",\r\n \"column\": \"age\"\r\n },\r\n \"metric\": {\r\n \"metricName\": \"DoubleMetric\",\r\n \"entity\": \"Column\",\r\n \"instance\": \"age\",\r\n \"name\": 
\"Mean\",\r\n \"value\": 32\r\n }\r\n }\r\n ]\r\n }\r\n }\r\n ]" + ] + } + ], + "source": [ + "!cat /dbfs/dbfs/table_xyz_pydeequ_metrics_repository.json" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "fb162f44-4f97-43bf-804a-7256cc4dd4cd", + "showTitle": false, + "title": "" + } + }, + "source": [ + "#### 2) Initiate FileSystemMetricsRepository with underlying file" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "2e2e05b0-45ac-4cfa-bc7b-7155c895680d", + "showTitle": false, + "title": "" + } + }, + "outputs": [], + "source": [ + "from pydeequ.repository import FileSystemMetricsRepository\n", + "\n", + "repository = FileSystemMetricsRepository(spark, metrics_spark_api)\n", + "\n" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "53716975-d2ed-4dc4-9f5f-c0805c2d9f67", + "showTitle": false, + "title": "" + } + }, + "source": [ + "#### 3) Run anomaly checks on new data but also using underlying file with historical metrics" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "8d3984da-2324-4a5e-aa07-2d05ead562c7", + "showTitle": false, + "title": "" + } + }, + "outputs": [], + "source": [ + "\n", + "from pydeequ.repository import ResultKey\n", + "from pydeequ.verification import VerificationSuite\n", + "from pydeequ.anomaly_detection import RelativeRateOfChangeStrategy\n", + "from pydeequ.analyzers import Mean\n", + "\n", + "COLUMN_NAME = \"age\"\n", + "\n", + "df = 
spark.createDataFrame([{COLUMN_NAME:19},{COLUMN_NAME:21}])\n", + "\n", + "verification_suite = (\n", + " VerificationSuite(spark)\n", + " .onData(df)\n", + " .useRepository(repository)\n", + " .saveOrAppendResult(\n", + " ResultKey(\n", + " spark, \n", + " ResultKey.current_milli_time()\n", + " )\n", + " )\n", + " .addAnomalyCheck(\n", + " RelativeRateOfChangeStrategy(\n", + " maxRateDecrease=0.8,\n", + " maxRateIncrease=1.2\n", + " ), \n", + " Mean(COLUMN_NAME)\n", + " )\n", + " )\n", + "results = verification_suite.run()\n" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "222a856f-716c-47bc-be14-1506de64f8d5", + "showTitle": false, + "title": "" + } + }, + "source": [ + "##### 3.1) Validate that historical metrics were taken into consideration when calculating anomaly\n", + "\n", + "New data age mean is 20, while old data age average is 30. RelativeRateOfChangeStrategy should fails as accepted rate of change is +/-20%." + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "9c43827c-68f2-406c-814d-951353a173bd", + "showTitle": false, + "title": "" + } + }, + "outputs": [ + { + "output_type": "display_data", + "data": { + "text/html": [ + "
checkcheck_levelcheck_statusconstraintconstraint_statusconstraint_message
Anomaly check for Mean(age,None)WarningWarningAnomalyConstraint(Mean(age,None))FailureValue: 20.0 does not meet the constraint requirement!
" + ] + }, + "metadata": { + "application/vnd.databricks.v1+output": { + "addedWidgets": {}, + "aggData": [], + "aggError": "", + "aggOverflow": false, + "aggSchema": [], + "aggSeriesLimitReached": false, + "aggType": "", + "arguments": {}, + "columnCustomDisplayInfos": {}, + "data": [ + [ + "Anomaly check for Mean(age,None)", + "Warning", + "Warning", + "AnomalyConstraint(Mean(age,None))", + "Failure", + "Value: 20.0 does not meet the constraint requirement!" + ] + ], + "datasetInfos": [], + "dbfsResultPath": null, + "isJsonSchema": true, + "metadata": {}, + "overflow": false, + "plotOptions": { + "customPlotOptions": {}, + "displayType": "table", + "pivotAggregation": null, + "pivotColumns": null, + "xColumns": null, + "yColumns": null + }, + "removedWidgets": [], + "schema": [ + { + "metadata": "{}", + "name": "check", + "type": "\"string\"" + }, + { + "metadata": "{}", + "name": "check_level", + "type": "\"string\"" + }, + { + "metadata": "{}", + "name": "check_status", + "type": "\"string\"" + }, + { + "metadata": "{}", + "name": "constraint", + "type": "\"string\"" + }, + { + "metadata": "{}", + "name": "constraint_status", + "type": "\"string\"" + }, + { + "metadata": "{}", + "name": "constraint_message", + "type": "\"string\"" + } + ], + "type": "table" + } + }, + "output_type": "display_data" + } + ], + "source": [ + "results.checkResultsAsDataFrame(spark_session=spark, verificationResult=results).display()" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "4385b496-0cdf-430e-93d2-46ff6f26bf78", + "showTitle": false, + "title": "" + } + }, + "source": [ + "#### 4) Validate that repository json is updated after VerificationSuite run" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": 
"68d72b50-e7c2-4527-88d0-3ddc8d686e86", + "showTitle": false, + "title": "" + } + }, + "outputs": [ + { + "output_type": "stream", + "name": "stdout", + "output_type": "stream", + "text": [ + "Out[63]: [{'resultKey': {'dataSetDate': 1702836503289, 'tags': {}},\n 'analyzerContext': {'metricMap': [{'analyzer': {'analyzerName': 'Mean',\n 'column': 'age'},\n 'metric': {'metricName': 'DoubleMetric',\n 'entity': 'Column',\n 'instance': 'age',\n 'name': 'Mean',\n 'value': 32.0}}]}},\n {'resultKey': {'dataSetDate': 1704554941399, 'tags': {}},\n 'analyzerContext': {'metricMap': [{'analyzer': {'analyzerName': 'Mean',\n 'column': 'age'},\n 'metric': {'metricName': 'DoubleMetric',\n 'entity': 'Column',\n 'instance': 'age',\n 'name': 'Mean',\n 'value': 20.0}}]}}]" + ] + } + ], + "source": [ + "import json\n", + "with open(metrics_file_api, \"r\", encoding=\"utf-8\") as file:\n", + " repository_str = file.read()\n", + "\n", + "json.loads(repository_str)" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "27207de9-faf8-4842-b564-ac940807d465", + "showTitle": false, + "title": "" + } + }, + "outputs": [], + "source": [] + } + ], + "metadata": { + "application/vnd.databricks.v1+notebook": { + "dashboards": [], + "language": "python", + "notebookMetadata": { + "pythonIndentUnit": 4 + }, + "notebookName": "pydeequ - FileSystemMetricsRepository sync - stand alone example", + "widgets": {} + } + }, + "nbformat": 4, + "nbformat_minor": 0 +} From 3692c47fe95a53bb73aa60cfa6b89be9708bec54 Mon Sep 17 00:00:00 2001 From: WiktorMadejski Date: Wed, 31 Jan 2024 12:23:03 +0100 Subject: [PATCH 2/7] Add files via upload --- tutorials/repository_sync_dbfs.ipynb | 438 +++++++++++++++------------ 1 file changed, 239 insertions(+), 199 deletions(-) diff --git a/tutorials/repository_sync_dbfs.ipynb b/tutorials/repository_sync_dbfs.ipynb index d1b813a..c5cbf60 100644 --- 
a/tutorials/repository_sync_dbfs.ipynb +++ b/tutorials/repository_sync_dbfs.ipynb @@ -4,7 +4,10 @@ "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { - "cellMetadata": {}, + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, "inputWidgets": {}, "nuid": "3d9f38cf-e24a-4854-8027-09414874fb35", "showTitle": false, @@ -14,8 +17,16 @@ "source": [ "####Synchronising Computed Metrics in a MetricsRepository to a json file stored on DBFS\n", "\n", - "PyDeequ allows us to persist the metrics we computed on dataframes in a so-called MetricsRepository. In the following example, we showcase repository json file managed by python-deequ on DBFS. This can be especially usefull:\n", - "- For python-deequ application migration,\n", + "PyDeeQu allows us to persist the metrics in a so-called MetricsRepository.\n", + "\n", + "In the following example is created to demonstrate\n", + "- How to access FileSystemMetricsRepository materialized data - metrics.json (section 1)\n", + "- How to create FileSystemMetricsRepository with managed data (section 2.3)\n", + "- How to run VerificationSuite using specific managed data (all sections)\n", + "\n", + "\n", + "This can be especially usefull:\n", + "- For python-deequ application migration from one Databricks Workspace to another,\n", "- To manage MetricsRepository json on the application side application,\n", "- To enable explainability and analytics using MetricsRepository json.\n", "\n", @@ -26,16 +37,38 @@ "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { - "cellMetadata": {}, + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "71833639-88d8-4151-8011-d5d659253e00", + "showTitle": false, + "title": "" + } + }, + "source": [ + "#### 1) Simulate historical Anomaly run" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + 
"rowLimit": 10000 + }, "inputWidgets": {}, - "nuid": "d31f7330-f71c-4a6b-80d0-852e3fee833c", + "nuid": "2e7debbf-532b-446c-99ef-89ad9c580742", "showTitle": false, "title": "" } }, "source": [ - "#### 0) Set file location for metrics repository\n", - "Write repository file using File API Format but provide Spark API Format to [FileSystemMetricsRepository](https://github.com/awslabs/deequ/blob/master/src/main/scala/com/amazon/deequ/repository/fs/FileSystemMetricsRepository.scala)" + "##### 1.1) Create FileSystemMetricsRepository with autogenerated path\n", + "\n", + "When initializing the FileSystemMetricsRepository without explicitly specifying a path, it will autonomously generate the path." ] }, { @@ -48,33 +81,53 @@ "rowLimit": 10000 }, "inputWidgets": {}, - "nuid": "ed4115df-bf05-42ad-8feb-3e9ecf17e470", + "nuid": "a1b22354-660c-4177-be3a-1e5c18e97be1", "showTitle": false, "title": "" } }, "outputs": [], "source": [ - "metrics_file = '/table_xyz_pydeequ_metrics_repository.json'\n", - "metrics_file_api =f\"/dbfs/dbfs{metrics_file}\"\n", - "metrics_spark_api =f\"dbfs:/dbfs{metrics_file}\"" + "from pydeequ.repository import FileSystemMetricsRepository\n", + "\n", + "repository = FileSystemMetricsRepository(spark)\n", + "print(repository.path)" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { - "cellMetadata": {}, + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, "inputWidgets": {}, - "nuid": "71833639-88d8-4151-8011-d5d659253e00", + "nuid": "6a6cb69f-1219-4b32-8f44-e5a6d0687ffe", "showTitle": false, "title": "" } }, "source": [ - "#### 1) Create json - that stores historical metrics\n", - "\n", - "This json structure is retrived from a previos pydeequ run." + "This path is crucial for materializing json file containing metrics for consecutive PyDeequ runs. PyDeequ, in turn, matches check definitions in the persisted data model and utilizes the underlying metrics to calculate specific anomalies." 
+ ] + }, + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "b50a2fde-1836-4317-a893-c5e25c6f82e4", + "showTitle": false, + "title": "" + } + }, + "source": [ + "##### 1.2) Run anomaly checks" ] }, { @@ -87,53 +140,62 @@ "rowLimit": 10000 }, "inputWidgets": {}, - "nuid": "1bf992c7-009a-4bef-9bb8-c3645870a610", + "nuid": "3a8cef92-4212-412d-9e99-9efc7c946238", "showTitle": false, "title": "" } }, "outputs": [], "source": [ + "from pydeequ.repository import ResultKey\n", + "from pydeequ.verification import VerificationSuite\n", + "from pydeequ.anomaly_detection import RelativeRateOfChangeStrategy\n", + "from pydeequ.analyzers import Mean\n", + "\n", + "COLUMN_NAME = \"age\"\n", "\n", - "with open(metrics_file_api, \"w\", encoding='utf-8') as file:\n", - " file.write(\"\"\"\n", - " [\n", - " {\n", - " \"resultKey\": { \"dataSetDate\": 1702836503289, \"tags\": {} },\n", - " \"analyzerContext\": {\n", - " \"metricMap\": [\n", - " {\n", - " \"analyzer\": {\n", - " \"analyzerName\": \"Mean\",\n", - " \"column\": \"age\"\n", - " },\n", - " \"metric\": {\n", - " \"metricName\": \"DoubleMetric\",\n", - " \"entity\": \"Column\",\n", - " \"instance\": \"age\",\n", - " \"name\": \"Mean\",\n", - " \"value\": 32\n", - " }\n", - " }\n", - " ]\n", - " }\n", - " }\n", - " ]\"\"\")" + "df_xyz = spark.createDataFrame([{COLUMN_NAME:19},{COLUMN_NAME:21}])\n", + "\n", + "verification_suite = (\n", + " VerificationSuite(spark)\n", + " .onData(df_xyz)\n", + " .useRepository(repository)\n", + " .saveOrAppendResult(\n", + " ResultKey(\n", + " spark, \n", + " ResultKey.current_milli_time(),\n", + " {\"tag\": \"historical-run-0\"}\n", + " )\n", + " )\n", + " .addAnomalyCheck(\n", + " RelativeRateOfChangeStrategy(\n", + " maxRateDecrease=0.8,\n", + " maxRateIncrease=1.2\n", + " ), \n", + " Mean(COLUMN_NAME)\n", + " )\n", + " )\n", + "results = 
verification_suite.run()\n" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { - "cellMetadata": {}, + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, "inputWidgets": {}, - "nuid": "19d86aa3-a7d7-43d7-9a6a-a7f1073c2610", + "nuid": "9ea4b0b4-d455-45b9-8551-aa8278258e42", "showTitle": false, "title": "" } }, "source": [ - "##### 1.1) Validate the file is written to DBFS" + "##### 1.3) Verify metrics.json is persisted to dbfs\n", + "\n", + "Triggering VerificationSuite.run() will persist metrics to the file underlying FileSystemMetricsRepository." ] }, { @@ -146,23 +208,34 @@ "rowLimit": 10000 }, "inputWidgets": {}, - "nuid": "e9c997a9-4917-4de5-8b8f-d0c6990baaca", + "nuid": "bd3f16f7-ce2e-4146-a230-68ebbb36450a", "showTitle": false, "title": "" } }, - "outputs": [ - { - "output_type": "stream", - "name": "stdout", - "output_type": "stream", - "text": [ - "\r\n [\r\n {\r\n \"resultKey\": { \"dataSetDate\": 1702836503289, \"tags\": {} },\r\n \"analyzerContext\": {\r\n \"metricMap\": [\r\n {\r\n \"analyzer\": {\r\n \"analyzerName\": \"Mean\",\r\n \"column\": \"age\"\r\n },\r\n \"metric\": {\r\n \"metricName\": \"DoubleMetric\",\r\n \"entity\": \"Column\",\r\n \"instance\": \"age\",\r\n \"name\": \"Mean\",\r\n \"value\": 32\r\n }\r\n }\r\n ]\r\n }\r\n }\r\n ]" - ] + "outputs": [], + "source": [ + "historical_repository_path = repository.path\n", + "with open(f\"/dbfs{historical_repository_path}\", \"r\") as f: \n", + " print(f.read())" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "27326ab8-d587-448d-b576-84fe32b8047d", + "showTitle": false, + "title": "" } - ], + }, "source": [ - "!cat /dbfs/dbfs/table_xyz_pydeequ_metrics_repository.json" + "#### 2) Create FileSystemMetricsRepository with managed path" ] }, { @@ -174,13 +247,13 @@ "rowLimit": 10000 }, "inputWidgets": 
{}, - "nuid": "fb162f44-4f97-43bf-804a-7256cc4dd4cd", + "nuid": "aee23203-1791-4453-b20a-412320e12642", "showTitle": false, "title": "" } }, "source": [ - "#### 2) Initiate FileSystemMetricsRepository with underlying file" + "##### 2.1) Define target path" ] }, { @@ -193,24 +266,112 @@ "rowLimit": 10000 }, "inputWidgets": {}, - "nuid": "2e2e05b0-45ac-4cfa-bc7b-7155c895680d", + "nuid": "7437cd9c-6585-486c-a071-165cdbdc615e", "showTitle": false, "title": "" } }, "outputs": [], "source": [ - "from pydeequ.repository import FileSystemMetricsRepository\n", + "import os\n", + "\n", + "os.makedirs(\"/dbfs/table_xyz\", exist_ok=True) \n", + "target_metrics_file_path = 'table_xyz/metrics.json'" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "1e9bd69a-4d25-4803-a941-b91eeca447b4", + "showTitle": false, + "title": "" + } + }, + "source": [ + "##### 2.2) Copy historical metrics.json to target path" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "c49738f4-9cda-4383-b637-6dad483b3b24", + "showTitle": false, + "title": "" + } + }, + "outputs": [], + "source": [ + "import shutil\n", + "\n", + "shutil.copyfile(\n", + " src = f\"/dbfs{historical_repository_path}\",\n", + " dst = f\"/dbfs/{target_metrics_file_path}\"\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "66967785-b567-4fc4-bf2b-d6d15c00c14b", + "showTitle": false, + "title": "" + } + }, + "source": [ + "##### 2.3) Initialize FileSystemMetricsRepository with managed path\n", + "Note: Manage repository file using File API format but provide Spark 
API format to [FileSystemMetricsRepository](https://github.com/awslabs/deequ/blob/master/src/main/scala/com/amazon/deequ/repository/fs/FileSystemMetricsRepository.scala)" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, + "inputWidgets": {}, + "nuid": "555df047-1c6d-44df-aec0-b3730d8931f0", + "showTitle": false, + "title": "" + } + }, + "outputs": [], + "source": [ + "metrics_spark_api =f\"dbfs:/{target_metrics_file_path}\"\n", "\n", "repository = FileSystemMetricsRepository(spark, metrics_spark_api)\n", - "\n" + "print(repository.path)" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { - "cellMetadata": {}, + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, "inputWidgets": {}, "nuid": "53716975-d2ed-4dc4-9f5f-c0805c2d9f67", "showTitle": false, @@ -218,7 +379,7 @@ } }, "source": [ - "#### 3) Run anomaly checks on new data but also using underlying file with historical metrics" + "#### 3) Run anomaly checks" ] }, { @@ -238,19 +399,11 @@ }, "outputs": [], "source": [ - "\n", - "from pydeequ.repository import ResultKey\n", - "from pydeequ.verification import VerificationSuite\n", - "from pydeequ.anomaly_detection import RelativeRateOfChangeStrategy\n", - "from pydeequ.analyzers import Mean\n", - "\n", - "COLUMN_NAME = \"age\"\n", - "\n", - "df = spark.createDataFrame([{COLUMN_NAME:19},{COLUMN_NAME:21}])\n", + "df_xyz = spark.createDataFrame([{COLUMN_NAME:19},{COLUMN_NAME:21},{COLUMN_NAME:50}])\n", "\n", "verification_suite = (\n", " VerificationSuite(spark)\n", - " .onData(df)\n", + " .onData(df_xyz)\n", " .useRepository(repository)\n", " .saveOrAppendResult(\n", " ResultKey(\n", @@ -273,7 +426,10 @@ "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { - "cellMetadata": {}, + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, 
"inputWidgets": {}, "nuid": "222a856f-716c-47bc-be14-1506de64f8d5", "showTitle": false, @@ -283,7 +439,7 @@ "source": [ "##### 3.1) Validate that historical metrics were taken into consideration when calculating anomaly\n", "\n", - "New data age mean is 20, while old data age average is 30. RelativeRateOfChangeStrategy should fails as accepted rate of change is +/-20%." + "New data age mean is 30, while old data age average is 20. RelativeRateOfChangeStrategy should fail, as the accepted rate of change is +/-20%." ] }, { @@ -301,102 +457,7 @@ "title": "" } }, - "outputs": [ - { - "output_type": "display_data", - "data": { - "text/html": [ - "
checkcheck_levelcheck_statusconstraintconstraint_statusconstraint_message
Anomaly check for Mean(age,None)WarningWarningAnomalyConstraint(Mean(age,None))FailureValue: 20.0 does not meet the constraint requirement!
" - ] - }, - "metadata": { - "application/vnd.databricks.v1+output": { - "addedWidgets": {}, - "aggData": [], - "aggError": "", - "aggOverflow": false, - "aggSchema": [], - "aggSeriesLimitReached": false, - "aggType": "", - "arguments": {}, - "columnCustomDisplayInfos": {}, - "data": [ - [ - "Anomaly check for Mean(age,None)", - "Warning", - "Warning", - "AnomalyConstraint(Mean(age,None))", - "Failure", - "Value: 20.0 does not meet the constraint requirement!" - ] - ], - "datasetInfos": [], - "dbfsResultPath": null, - "isJsonSchema": true, - "metadata": {}, - "overflow": false, - "plotOptions": { - "customPlotOptions": {}, - "displayType": "table", - "pivotAggregation": null, - "pivotColumns": null, - "xColumns": null, - "yColumns": null - }, - "removedWidgets": [], - "schema": [ - { - "metadata": "{}", - "name": "check", - "type": "\"string\"" - }, - { - "metadata": "{}", - "name": "check_level", - "type": "\"string\"" - }, - { - "metadata": "{}", - "name": "check_status", - "type": "\"string\"" - }, - { - "metadata": "{}", - "name": "constraint", - "type": "\"string\"" - }, - { - "metadata": "{}", - "name": "constraint_status", - "type": "\"string\"" - }, - { - "metadata": "{}", - "name": "constraint_message", - "type": "\"string\"" - } - ], - "type": "table" - } - }, - "output_type": "display_data" - } - ], + "outputs": [], "source": [ "results.checkResultsAsDataFrame(spark_session=spark, verificationResult=results).display()" ] @@ -405,7 +466,10 @@ "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { - "cellMetadata": {}, + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, "inputWidgets": {}, "nuid": "4385b496-0cdf-430e-93d2-46ff6f26bf78", "showTitle": false, @@ -413,7 +477,7 @@ } }, "source": [ - "#### 4) Validate that repository json is updated after VerificationSuite run" + "##### 3.2) Validate that repository json is updated after VerificationSuite run" ] }, { @@ -431,38 +495,14 @@ "title": "" } }, - "outputs": [ - 
{ - "output_type": "stream", - "name": "stdout", - "output_type": "stream", - "text": [ - "Out[63]: [{'resultKey': {'dataSetDate': 1702836503289, 'tags': {}},\n 'analyzerContext': {'metricMap': [{'analyzer': {'analyzerName': 'Mean',\n 'column': 'age'},\n 'metric': {'metricName': 'DoubleMetric',\n 'entity': 'Column',\n 'instance': 'age',\n 'name': 'Mean',\n 'value': 32.0}}]}},\n {'resultKey': {'dataSetDate': 1704554941399, 'tags': {}},\n 'analyzerContext': {'metricMap': [{'analyzer': {'analyzerName': 'Mean',\n 'column': 'age'},\n 'metric': {'metricName': 'DoubleMetric',\n 'entity': 'Column',\n 'instance': 'age',\n 'name': 'Mean',\n 'value': 20.0}}]}}]" - ] - } - ], + "outputs": [], "source": [ "import json\n", - "with open(metrics_file_api, \"r\", encoding=\"utf-8\") as file:\n", + "with open(f\"/dbfs/{target_metrics_file_path}\", \"r\", encoding=\"utf-8\") as file:\n", " repository_str = file.read()\n", "\n", "json.loads(repository_str)" ] - }, - { - "cell_type": "code", - "execution_count": 0, - "metadata": { - "application/vnd.databricks.v1+cell": { - "cellMetadata": {}, - "inputWidgets": {}, - "nuid": "27207de9-faf8-4842-b564-ac940807d465", - "showTitle": false, - "title": "" - } - }, - "outputs": [], - "source": [] } ], "metadata": { @@ -472,7 +512,7 @@ "notebookMetadata": { "pythonIndentUnit": 4 }, - "notebookName": "pydeequ - FileSystemMetricsRepository sync - stand alone example", + "notebookName": "pydeequ: repository_sync_dbfs", "widgets": {} } }, From f99ee8ea2215920ee2960a2264d78c0bdcd6812b Mon Sep 17 00:00:00 2001 From: WiktorMadejski Date: Wed, 31 Jan 2024 12:24:41 +0100 Subject: [PATCH 3/7] Rename repository_sync_dbfs.ipynb to repository_file_dbfs.ipynb --- .../{repository_sync_dbfs.ipynb => repository_file_dbfs.ipynb} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename tutorials/{repository_sync_dbfs.ipynb => repository_file_dbfs.ipynb} (100%) diff --git a/tutorials/repository_sync_dbfs.ipynb b/tutorials/repository_file_dbfs.ipynb similarity 
index 100% rename from tutorials/repository_sync_dbfs.ipynb rename to tutorials/repository_file_dbfs.ipynb From c2d08cdee17f7ccc57e2dd6d4fd530e7a82bae32 Mon Sep 17 00:00:00 2001 From: WiktorMadejski Date: Wed, 31 Jan 2024 12:27:16 +0100 Subject: [PATCH 4/7] Update repository_file_dbfs.ipynb --- tutorials/repository_file_dbfs.ipynb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tutorials/repository_file_dbfs.ipynb b/tutorials/repository_file_dbfs.ipynb index c5cbf60..d727bf2 100644 --- a/tutorials/repository_file_dbfs.ipynb +++ b/tutorials/repository_file_dbfs.ipynb @@ -19,7 +19,7 @@ "\n", "PyDeeQu allows us to persist the metrics in a so-called MetricsRepository.\n", "\n", - "In the following example is created to demonstrate\n", + "The following tutorial is created to demonstrate\n", "- How to access FileSystemMetricsRepository materialized data - metrics.json (section 1)\n", "- How to create FileSystemMetricsRepository with managed data (section 2.3)\n", "- How to run VerificationSuite using specific managed data (all sections)\n", From 3ee7986c9e25c2cfde1c43412ed5ad6dd2670f5d Mon Sep 17 00:00:00 2001 From: WiktorMadejski Date: Wed, 31 Jan 2024 12:27:56 +0100 Subject: [PATCH 5/7] Update repository_file_dbfs.ipynb --- tutorials/repository_file_dbfs.ipynb | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tutorials/repository_file_dbfs.ipynb b/tutorials/repository_file_dbfs.ipynb index d727bf2..cf928f6 100644 --- a/tutorials/repository_file_dbfs.ipynb +++ b/tutorials/repository_file_dbfs.ipynb @@ -20,9 +20,9 @@ "PyDeeQu allows us to persist the metrics in a so-called MetricsRepository.\n", "\n", "The following tutorial is created to demonstrate\n", - "- How to access FileSystemMetricsRepository materialized data - metrics.json (section 1)\n", - "- How to create FileSystemMetricsRepository with managed data (section 2.3)\n", - "- How to run VerificationSuite using specific managed data (all sections)\n", + "- How to access 
FileSystemMetricsRepository materialized data - metrics.json (section 1),\n", + "- How to create FileSystemMetricsRepository with managed data (section 2.3),\n", + "- How to run VerificationSuite using specific managed data (all sections).\n", "\n", "\n", "This can be especially usefull:\n", From a77b5526fd180e2ba9f045ce11816646a3580533 Mon Sep 17 00:00:00 2001 From: WiktorMadejski Date: Wed, 31 Jan 2024 12:29:32 +0100 Subject: [PATCH 6/7] Update repository_file_dbfs.ipynb --- tutorials/repository_file_dbfs.ipynb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tutorials/repository_file_dbfs.ipynb b/tutorials/repository_file_dbfs.ipynb index cf928f6..194f95c 100644 --- a/tutorials/repository_file_dbfs.ipynb +++ b/tutorials/repository_file_dbfs.ipynb @@ -15,7 +15,7 @@ } }, "source": [ - "####Synchronising Computed Metrics in a MetricsRepository to a json file stored on DBFS\n", + "#### Manage FileSystemMetricsRepository metrics JSON file stored on DBFS between runs.\n", "\n", "PyDeeQu allows us to persist the metrics in a so-called MetricsRepository.\n", "\n", From a1772ba4f69d47197196429cf0b1ca0ec981bcd7 Mon Sep 17 00:00:00 2001 From: WiktorMadejski Date: Wed, 31 Jan 2024 12:31:21 +0100 Subject: [PATCH 7/7] Update repository_file_dbfs.ipynb --- tutorials/repository_file_dbfs.ipynb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tutorials/repository_file_dbfs.ipynb b/tutorials/repository_file_dbfs.ipynb index 194f95c..cef3b7a 100644 --- a/tutorials/repository_file_dbfs.ipynb +++ b/tutorials/repository_file_dbfs.ipynb @@ -17,7 +17,7 @@ "source": [ "#### Manage FileSystemMetricsRepository metrics JSON file stored on DBFS between runs.\n", "\n", - "PyDeeQu allows us to persist the metrics in a so-called MetricsRepository.\n", + "PyDeequ allows us to persist the metrics in a so-called MetricsRepository. 
FileSystemMetricsRepository implements MetricsRepository and allows materializing the repository to a JSON file.\n", "\n", "The following tutorial is created to demonstrate\n", "- How to access FileSystemMetricsRepository materialized data - metrics.json (section 1),\n",