diff --git a/data/aws_cluster_table_acls.json b/data/aws_cluster_table_acls.json new file mode 100644 index 00000000..c97eaa07 --- /dev/null +++ b/data/aws_cluster_table_acls.json @@ -0,0 +1,25 @@ +{ + "num_workers": 1, + "cluster_name": "API_Table_ACL_Work_Leave_Me_Alone", + "spark_version": "6.5.x-scala2.11", + "spark_conf": { + "spark.databricks.cluster.profile": "serverless", + "spark.databricks.repl.allowedLanguages": "python,sql", + "spark.databricks.acl.dfAclsEnabled": "true" + }, + "aws_attributes": { + "first_on_demand": 1, + "availability": "SPOT_WITH_FALLBACK", + "zone_id": "us-west-2b", + "spot_bid_price_percent": 100, + "ebs_volume_type": "GENERAL_PURPOSE_SSD", + "ebs_volume_count": 1, + "ebs_volume_size": 100 + }, + "driver_node_type_id": "m4.xlarge", + "node_type_id": "m4.xlarge", + "spark_env_vars": { + "PYSPARK_PYTHON": "/databricks/python3/bin/python3" + }, + "autotermination_minutes": 15 +} \ No newline at end of file diff --git a/data/azure_cluster_table_acls.json b/data/azure_cluster_table_acls.json new file mode 100644 index 00000000..4fc534fc --- /dev/null +++ b/data/azure_cluster_table_acls.json @@ -0,0 +1,18 @@ +{ + "num_workers": 1, + "cluster_name": "API_Table_ACL_Work_Leave_Me_Alone", + "spark_version": "6.5.x-scala2.11", + "spark_conf": { + "spark.databricks.cluster.profile": "serverless", + "spark.databricks.repl.allowedLanguages": "python,sql", + "spark.databricks.acl.dfAclsEnabled": "true" + }, + "node_type_id": "Standard_DS3_v2", + "ssh_public_keys": [], + "custom_tags": {}, + "spark_env_vars": { + "PYSPARK_PYTHON": "/databricks/python3/bin/python3" + }, + "autotermination_minutes": 120, + "init_scripts": [] +} diff --git a/data/notebooks/Export_Table_ACLs.py b/data/notebooks/Export_Table_ACLs.py new file mode 100644 index 00000000..ddc7ed72 --- /dev/null +++ b/data/notebooks/Export_Table_ACLs.py @@ -0,0 +1,167 @@ +# Databricks notebook source +# MAGIC %md #Export Table ACLs +# MAGIC +# MAGIC Exports Table ACLS to a JSON file on DBFS, which can be imported using the Import_Table_ACLs script +# MAGIC +# MAGIC Parameters: +# MAGIC - Databases: [Optional] comma separated list of databases to be exported, if empty all databases will be exported +# MAGIC - OutputPath: Path to write the exported file to +# MAGIC +# MAGIC Execution: **Run the notebook on a cluster with Table ACL's enabled as a user who is an admin** +# MAGIC +# MAGIC Supportes ACLs for Object types: +# MAGIC - Catalog: included if all databases are exported, not included if databases to be exported are specified +# MAGIC - Database: included +# MAGIC - Table: included +# MAGIC - View: included +# MAGIC - Anonymous Function: included (testing pending) +# MAGIC - Any File: included +# MAGIC +# MAGIC Disclaimer: This notebook is still needs some more testing, check back soon as fixes might have been added. + +# COMMAND ---------- + +# DBTITLE 1,Declare Parameters +#dbutils.widgets.removeAll() +dbutils.widgets.text("Databases","db_acl_test,db_acl_test_restricted","1: Databases (opt)") +dbutils.widgets.text("OutputPath","dbfs:/tmp/migrate/test_table_acls.json.gz","2: Output Path") + +# COMMAND ---------- + +# DBTITLE 1,Check Parameters + +if not dbutils.widgets.get("OutputPath").startswith("dbfs:/"): + raise Exception(f"Unexpected value for notebook parameter 'InputPath', got <{dbutils.widgets.get('OutputPath')}>, but it must start with ") + + +# COMMAND ---------- + +# DBTITLE 1,Define Export Logic +import pyspark.sql.functions as sf +from typing import Callable, Iterator, Union, Optional, List +import datetime; + +def get_database_names(): + database_names = [] + for db in spark.sql("show databases").collect(): + if hasattr(db,"databaseName"): #Angela has this fallback ... + database_names.append(db.databaseName) + else: + database_names.append(db.namespace) + return database_names + +def create_grants_df(database_name: str,object_type: str, object_key: str) -> List[str]: + if object_type in ["CATALOG", "ANY FILE", "ANONYMOUS FUNCTION"]: #without object key + grants_df = ( + spark.sql(f"SHOW GRANT ON {object_type}") + .groupBy("ObjectType","ObjectKey","Principal").agg(sf.collect_set("ActionType").alias("ActionTypes")) + .selectExpr("NULL AS Database","Principal","ActionTypes","ObjectType","ObjectKey","Now() AS ExportTimestamp") + ) + else: + grants_df = ( + spark.sql(f"SHOW GRANT ON {object_type} {object_key}") + .filter(sf.col("ObjectType") == f"{object_type}") + .groupBy("ObjectType","ObjectKey","Principal").agg(sf.collect_set("ActionType").alias("ActionTypes")) + .selectExpr(f"'{database_name}' AS Database","Principal","ActionTypes","ObjectType","ObjectKey","Now() AS ExportTimestamp") + ) + return grants_df + + +def create_table_ACLSs_df_for_databases(database_names: List[str]): + + # TODO check Catalog heuristic: + # if all databases are exported, we include the Catalog grants as well + #. if only a few databases are exported: we exclude the Catalog + if database_names is None or database_names == '': + database_names = get_database_names() + include_catalog = True + else: + include_catalog = False + + # ANONYMOUS FUNCTION + combined_grant_dfs = create_grants_df(None, "ANONYMOUS FUNCTION", None) + + # ANY FILE + combined_grant_dfs = combined_grant_dfs.unionAll( + create_grants_df(None, "ANY FILE", None) + ) + + # CATALOG + if include_catalog: + combined_grant_dfs = combined_grant_dfs.unionAll( + create_grants_df(None, "CATALOG", None) + ) + #TODO ELSE: consider pushing catalog grants down to DB level in this case + + for database_name in database_names: + + # DATABASE + combined_grant_dfs = combined_grant_dfs.unionAll( + create_grants_df(database_name, "DATABASE", database_name) + ) + + tables_and_views_rows = spark.sql( + f"SHOW TABLES IN {database_name}" + ).filter(sf.col("isTemporary") == False).collect() + + print(f"{datetime.datetime.now()} working on database {database_name} with {len(tables_and_views_rows)} tables and views") + for table_row in tables_and_views_rows: + + # TABLE, VIEW + combined_grant_dfs = combined_grant_dfs.unionAll( + create_grants_df(database_name, "TABLE", f"{table_row.database}.{table_row.tableName}") + ) + + #TODO ADD USER FUNCTION - not supported in SQL Analytics, so this can wait a bit + # ... SHOW USER FUNCTIONS LIKE .`*`; + #. function_row['function'] ... nah does not seem to work + + #combined_grant_dfs = combined_grant_dfs.sort("") + + return combined_grant_dfs + + +# COMMAND ---------- + +# DBTITLE 1,Run Export +databases_raw = dbutils.widgets.get("Databases") +output_path = dbutils.widgets.get("OutputPath") + +if databases_raw.rstrip() == '': + databases = None + print(f"Exporting all databases") +else: + databases = [x.rstrip().lstrip() for x in databases_raw.split(",")] + print(f"Exporting the following databases: {databases}") + + +table_ACLs_df = create_table_ACLSs_df_for_databases(databases) + +print(f"{datetime.datetime.now()} writing table ACLs to {output_path}") + +# with table ACLS active, I direct write to DBFS is not allowed, so we store +# the dateframe as a table for single zipped JSON file sorted, for consitent file diffs +( + table_ACLs_df + .coalesce(1) + .selectExpr("Database","Principal","ActionTypes","ObjectType","ObjectKey","ExportTimestamp") + .sort("Database","Principal","ObjectType","ObjectKey") + .write + .format("JSON") + .option("compression","gzip") + .mode("overwrite") + .save(output_path) +) + + +# COMMAND ---------- + +display(spark.read.format("json").load(output_path)) + +# COMMAND ---------- + +print(output_path) + +# COMMAND ---------- + + diff --git a/data/notebooks/Import_Table_ACLs.py b/data/notebooks/Import_Table_ACLs.py new file mode 100644 index 00000000..7cd9850b --- /dev/null +++ b/data/notebooks/Import_Table_ACLs.py @@ -0,0 +1,115 @@ +# Databricks notebook source +# MAGIC %md #Import Table ACLs +# MAGIC +# MAGIC Imports Table ACLS from a JSON file on DBFS, which has been generated by the Export_Table_ACLs script +# MAGIC +# MAGIC Parameters: +# MAGIC - InputPath: Path to the JSON file to import from (gzipped JSON) +# MAGIC +# MAGIC Execution: **Run the notebook on a cluster with Table ACL's enabled as a user who is an admin** +# MAGIC +# MAGIC Supportes ACLs for Object types: +# MAGIC - Catalog: included if all databases are exported, not included if databases to be exported are specified +# MAGIC - Database: included +# MAGIC - Table: included +# MAGIC - View: included +# MAGIC - Anonymous Function: included (testing pending) +# MAGIC - Any File: included +# MAGIC +# MAGIC Disclaimer: This notebook is still needs some more testing, check back soon as fixes might have been added. + +# COMMAND ---------- + +# DBTITLE 1,Define Parameters +#dbutils.widgets.removeAll() +dbutils.widgets.text("InputPath","dbfs:/tmp/migrate/test_table_acls.json.gz","1: Input Path") + +# COMMAND ---------- + +# DBTITLE 1,Validated Parameters +if not dbutils.widgets.get("InputPath").startswith("dbfs:/"): + raise Exception(f"Unexpected value for notebook parameter 'InputPath', got <{dbutils.widgets.get('InputPath')}>, but it must start with ") + +# COMMAND ---------- + +# DBTITLE 1,Show Input Data +display(spark.read.format("JSON").load(dbutils.widgets.get("InputPath"))) + +# COMMAND ---------- + +# DBTITLE 1,Define Import Logic +import datetime +import pyspark.sql.functions as sf +from typing import Callable, Iterator, Union, Optional, List + + +def generate_table_acls_command(action_types, object_type, object_key, principal, alter_owner=True): + lines = [] + + grant_privs = [ x for x in action_types if not x.startswith("DENIED_") and x != "OWN" ] + deny_privs = [ x[len("DENIED_"):] for x in action_types if x.startswith("DENIED_") and x != "OWN" ] + + # TODO consider collapsing to all priviledges if all are granted + + if grant_privs: + lines.append(f"GRANT {', '.join(grant_privs)} ON {object_type} {object_key} TO `{principal}`;") + if deny_privs: + lines.append(f"DENY {', '.join(deny_privs)} ON {object_type} {object_key} TO `{principal}`;") + + #TODO !!! NOT QUITE SURE WETHER ALTER OWNER ACTUALLY WORKS !!!!!!!!!!! + if alter_owner and "OWN" in action_types: + lines.append(f"ALTER {object_type} {object_key} OWNER TO `{principal}`;") + + return lines + + +def generate_table_acls_commands(table_ACLs_df, commented: bool=True, alter_owner: bool=True) -> List[str]: + lines = [] + for row in table_ACLs_df.collect(): + + + if row["ObjectType"] == "ANONYMOUS_FUNCTION": + lines.extend(generate_table_acls_command(row['ActionTypes'], 'ANONYMOUS FUNCTION', '', row['Principal'], alter_owner)) + elif row["ObjectType"] == "ANY_FILE": + lines.extend(generate_table_acls_command(row['ActionTypes'], 'ANY FILE', '', row['Principal'], alter_owner)) + elif row["ObjectType"] == "CATALOG$": + lines.extend(generate_table_acls_command(row['ActionTypes'], 'CATALOG', '', row['Principal'], alter_owner)) + elif row["ObjectType"] in ["DATABASE", "TABLE"]: + # DATABASE, TABLE, VIEW (view's seem to show up as tables) + lines.extend(generate_table_acls_command(row['ActionTypes'], row['ObjectType'], row['ObjectKey'], row['Principal'], alter_owner)) + # TODO ADD USER FUNCTION .. need to figure out + + return lines + +def execute_sql_statements(sqls): + for sql in sqls.split(sep=";"): + sql = sql.strip() + if sql: + print(f"{sql};") + spark.sql(sql) + + + +# COMMAND ---------- + +# DBTITLE 1,Run Import +input_path = dbutils.widgets.get("InputPath") + + +table_ACLs_df = spark.read.format("JSON").load(input_path).orderBy("Database","ObjectType") + +print(f"{datetime.datetime.now()} reading table ACLs from {input_path}") + + +lines = generate_table_acls_commands(table_ACLs_df, commented=True, alter_owner=True) + +sql="\n".join(lines) + +print(f"Number of table ACLs statements to execute: {len(lines)}") +print("\n\n") + +execute_sql_statements(sql) + +# COMMAND ---------- + + diff --git a/dbclient/ClustersClient.py b/dbclient/ClustersClient.py index 2cf2dc47..7b1b8a86 100644 --- a/dbclient/ClustersClient.py +++ b/dbclient/ClustersClient.py @@ -338,15 +338,18 @@ def is_spark_3(self, cid): else: return False - def launch_cluster(self, iam_role=None): + def launch_cluster(self, iam_role=None, enable_table_acls=False): """ Launches a cluster to get DDL statements. Returns a cluster_id """ # removed for now as Spark 3.0 will have backwards incompatible changes # version = self.get_latest_spark_version() import os real_path = os.path.dirname(os.path.realpath(__file__)) + + # add _table_acls suffix to cluster config path if enable_table_acls is set + cluster_json_postfix = '_table_acls' if enable_table_acls else '' if self.is_aws(): - with open(real_path + '/../data/aws_cluster.json', 'r') as fp: + with open(f'{real_path}/../data/aws_cluster{cluster_json_postfix}.json', 'r') as fp: cluster_json = json.loads(fp.read()) if iam_role: aws_attr = cluster_json['aws_attributes'] @@ -354,7 +357,7 @@ def launch_cluster(self, iam_role=None): aws_attr['instance_profile_arn'] = iam_role cluster_json['aws_attributes'] = aws_attr else: - with open(real_path + '/../data/azure_cluster.json', 'r') as fp: + with open(f'{real_path}/../data/azure_cluster{cluster_json_postfix}.json', 'r') as fp: cluster_json = json.loads(fp.read()) # set the latest spark release regardless of defined cluster json # cluster_json['spark_version'] = version['key'] diff --git a/dbclient/TableACLsClient.py b/dbclient/TableACLsClient.py new file mode 100644 index 00000000..4bb60c88 --- /dev/null +++ b/dbclient/TableACLsClient.py @@ -0,0 +1,233 @@ +from .ClustersClient import * +import base64 +import shutil + + +# noinspection SpellCheckingInspection +class TableACLsClient(ClustersClient): + """ + Imports and Exports table ACLS to and from a JSON format. + + The actual import and export logic is implemented in two notebooks: + ../data/notebooks/Export_Table_ACLs.py + ../data/notebooks/Import_Table_ACLs.py + + Those notebooks need to be executed on a cluster with Table ACLS activated, + otherwise the commands used for importing and exporting table ACLS : + SHOW GRANT + and + GRANT + are not available + + Those notebooks can be used standalone as well. + + This class inherits from the HiveClient to use some of the funtionaly + inside HiveClient - it would be cleaner to refactor HiveClient + and to pull some of the shared funtionality out. + """ + + REAL_PATH = os.path.dirname(os.path.realpath(__file__)) + EXPORT_TABLE_ACLS_LOCAL_PATH = REAL_PATH + "/../data/notebooks/Export_Table_ACLs.py" + IMPORT_TABLE_ACLS_LOCAL_PATH = REAL_PATH + "/../data/notebooks/Import_Table_ACLs.py" + + CLUSTER_LAUNCH_POLLING_INTERVAL_SECONDS = 5 + NOTEBOOK_RUN_POLLING_INTERVAL_SECONDS = 2 + BUFFER_SIZE_BYTES = 1024 * 1024 # 1MB limit for dbfs blocks in API + + def import_file_to_workspace(self, source_local_path, workspace_path): + workspace_mkdirs_params = { + "path": workspace_path[:workspace_path.rindex('/')] + } + self.post("/workspace/mkdirs", workspace_mkdirs_params) + + with open(source_local_path, 'rb') as local_file: + contents = local_file.read() + workspace_import_params = { + "content": base64.encodebytes(contents).decode('utf-8'), + "path": workspace_path, + "language": "PYTHON", + "overwrite": True, + "format": "SOURCE" + } + self.post("/workspace/import", workspace_import_params) + + def copy_file_to_dbfs(self, source_local_path, target_dbfs_path): + dbfs_create_params = { + "path": target_dbfs_path, + "overwrite": True + } + res = self.post("/dbfs/create", dbfs_create_params) + dbfs_file_handle = res["handle"] + with open(source_local_path, 'rb') as local_file: + + while True: + contents = local_file.read(self.BUFFER_SIZE_BYTES) + if len(contents) == 0: + break + dbfs_add_block_params = { + "data": base64.encodebytes(contents).decode('utf-8'), + "handle": dbfs_file_handle + } + self.post("/dbfs/add-block", dbfs_add_block_params) + self.post("/dbfs/close", {"handle": dbfs_file_handle}) + + def copy_files_to_dbfs_path(self, source_local_path, target_dbfs_path): + dbfs_mkdirs_params = { + "path": target_dbfs_path + } + self.post("/dbfs/mkdirs", dbfs_mkdirs_params) + + try: + dirpath, _dirnames, filenames = next(os.walk(source_local_path)) + for filename in filenames: + local_filepath = os.path.join(dirpath, filename) + dbfs_filepath = os.path.join(target_dbfs_path, filename) + self.copy_file_to_dbfs(local_filepath, dbfs_filepath) + except StopIteration: + raise Exception(f"not a valid source path: '{source_local_path}'") + + def copy_files_from_dbfs_path(self, source_dbfs_path, target_local_path, target_file_name): + dbfs_list_params = { + "path": source_dbfs_path + } + file_infos = self.get("/dbfs/list", dbfs_list_params).get('files', None) + if file_infos: + # delete all files in target_local_path + shutil.rmtree(target_local_path, ignore_errors=True) + os.mkdir(target_local_path) + file_count=0 + for file_info in file_infos: + if not file_info['is_dir']: + dbfs_path = file_info['path'] + filename = dbfs_path[dbfs_path.rindex('/') + 1:] + if not filename[0] == '_': + length = file_info['file_size'] + offset = 0 + with open(f"{target_local_path}{file_count:02}_{target_file_name}", 'wb') as local_file: + while offset < length: + dbfs_read_params = { + "path": dbfs_path, + "offset": offset, + "length": length + } + resp = self.get("/dbfs/read", dbfs_read_params) + + bytes_read = resp['bytes_read'] + data = resp['data'] + offset += bytes_read + local_file.write(base64.b64decode(data)) + file_count = file_count+1 + + def delete_files_on_dbfs(self, dbfs_acls_input_path): + dbfs_delete_params = { + "path": dbfs_acls_input_path, + "recursive": True + } + self.post("/dbfs/delete", dbfs_delete_params) + + def wait_for_notebook_to_terminate(self, run_id): + while True: # TODO add a timeout here + res = self.get('/jobs/runs/get', {'run_id': run_id}, print_json=False) + if self.is_verbose(): + print(f"polling for job to finish: {res['run_page_url']}") + if res["http_status_code"] != 200 or res["state"]['life_cycle_state'] == 'TERMINATED': + break + time.sleep(self.NOTEBOOK_RUN_POLLING_INTERVAL_SECONDS) + + def run_notebook_on_cluster(self, cid, notebook_path, notebook_params): + runs_submit_params = { + "run_name": f"migrate runs submit (notebook_path)", + "existing_cluster_id": cid, + "notebook_task": { + "notebook_path": notebook_path, + "base_parameters": notebook_params + } + } + res = self.post('/jobs/runs/submit', runs_submit_params, print_json=True) + + if res["http_status_code"] != 200: + # TODO add more error_text + raise Exception("Could not run submit notebook") + + return res["run_id"] + + def get_current_username(self, must_be_admin=False): + user_info = self.get("/preview/scim/v2/Me") + user_name = user_info.get("userName", None) + + if must_be_admin: + is_admin = False + for group in user_info.get("groups", []): + if group.get('display', None) == 'admins': + is_admin = True + break + if not is_admin: + raise Exception(f"The table acl notebooks need to be run by an active admin user, {user_info}") + + return user_name + + def export_table_acls(self, db_name=None, table_alcs_dir='table_acls/'): + """Exports all table ACLs or just for a single database + + :param db_name: unless set export ACLs for all databases + :param table_alcs_dir: overwrite export path for table ACLs + :return: + """ + # TODO check whether this logic supports unicode (metadata had to do something to support it + + # as the IAM role is not used for Table ACLS, only metastore access required + cid = self.launch_cluster(iam_role=None, enable_table_acls=True) + self.wait_for_cluster(cid) + + user_name = self.get_current_username(must_be_admin=True) + export_table_acls_workspace_path = f"/Users/{user_name}/tmp/migrate/Export_Table_ACLs.py" + + self.import_file_to_workspace(self.EXPORT_TABLE_ACLS_LOCAL_PATH, export_table_acls_workspace_path) + + dbfs_acls_output_path = "dbfs:/tmp/migrate/table_acl_perms.json.gz" + + run_notebook_params = { + "Databases": db_name, + "OutputPath": dbfs_acls_output_path + } + run_id = self.run_notebook_on_cluster(cid, export_table_acls_workspace_path, run_notebook_params) + self.wait_for_notebook_to_terminate(run_id) + + # download all files inside output path from dbfs + self.copy_files_from_dbfs_path(dbfs_acls_output_path, self._export_dir + table_alcs_dir, "table_acls.json.gz") + + # leave notebook there, so it can be removed later + # we leave the cluster running, makes trouble shooting more efficient + print(f"We leave the cluster running, in case you needed again: cluster_id: {cid}") + + def import_table_acls(self, table_alcs_dir='table_acls/'): + """ + Imports table ACLS exported before using export_table_acls + + :param table_alcs_dir: + :return: + """ + # TODO check whether this logic supports unicode (metadata had to do something to support it + + cid = self.launch_cluster(enable_table_acls=True) + self.wait_for_cluster(cid) + + user_name = self.get_current_username(must_be_admin=True) + import_table_acls_workspace_path = f"/Users/{user_name}/tmp/migrate/Import_Table_ACLs.py" + self.import_file_to_workspace(self.IMPORT_TABLE_ACLS_LOCAL_PATH, import_table_acls_workspace_path) + + dbfs_acls_input_path = "dbfs:/tmp/migrate/table_acl_perms.json.gz" + self.copy_files_to_dbfs_path(self._export_dir + table_alcs_dir, dbfs_acls_input_path) + + run_notebook_params = { + "InputPath": dbfs_acls_input_path + } + run_id = self.run_notebook_on_cluster(cid, import_table_acls_workspace_path, run_notebook_params) + self.wait_for_notebook_to_terminate(run_id) + + # cleanup dbfs: remove one directory above dbfs_acls_input_path + self.delete_files_on_dbfs(dbfs_acls_input_path[0:dbfs_acls_input_path.rindex('/')]) + + # leave notebook there, so it can be removed later + # we leave the cluster running, makes trouble shooting more efficient + print(f"We leave the cluster running, in case you needed again: cluster_id: {cid}") diff --git a/dbclient/__init__.py b/dbclient/__init__.py index b239f29a..c4062a2f 100644 --- a/dbclient/__init__.py +++ b/dbclient/__init__.py @@ -8,4 +8,5 @@ from .WorkspaceClient import WorkspaceClient from .HiveClient import HiveClient from .SecretsClient import SecretsClient +from .TableACLsClient import TableACLsClient from .parser import * diff --git a/dbclient/parser.py b/dbclient/parser.py index 32af03e3..70e25dc6 100644 --- a/dbclient/parser.py +++ b/dbclient/parser.py @@ -117,13 +117,17 @@ def get_export_parser(): parser.add_argument('--metastore-unicode', action='store_true', help='log all the metastore table definitions including unicode characters') + # get all table ACLs (TODO need to make sure that unicode database object names are supported) + parser.add_argument('--table-acls', action='store_true', + help='log all table ACL grant and deny statements') + # cluster name used to export the metastore parser.add_argument('--cluster-name', action='store', help='Cluster name to export the metastore to a specific cluster. Cluster will be started.') - # get database to export for metastore + # get database to export for metastore and table ACLs parser.add_argument('--database', action='store', - help='Database name to export for the metastore. Single database name supported') + help='Database name to export for the metastore and table ACLs. Single database name supported') # iam role used to export the metastore parser.add_argument('--iam', action='store', @@ -256,6 +260,10 @@ def get_import_parser(): parser.add_argument('--metastore-unicode', action='store_true', help='Import all the metastore table definitions with unicode characters') + # import all table acls + parser.add_argument('--table-acls', action='store_true', + help='Import table acls to the workspace.') + parser.add_argument('--get-repair-log', action='store_true', help='Report on current tables requiring repairs') diff --git a/export_db.py b/export_db.py index c9086a26..786762ca 100644 --- a/export_db.py +++ b/export_db.py @@ -152,6 +152,14 @@ def main(): end = timer() print("Complete Metastore Export Time: " + str(timedelta(seconds=end - start))) + if args.table_acls: + print("Export the table ACLs configs at {0}".format(now)) + start = timer() + table_acls_c = TableACLsClient(client_config) + table_acls_c.export_table_acls(db_name=args.database) + end = timer() + print("Complete Table ACL Export Time: " + str(timedelta(seconds=end - start))) + if args.secrets: if not args.cluster_name: print("Please provide an existing cluster name w/ --cluster-name option\n") diff --git a/import_db.py b/import_db.py index 66723250..7bce422b 100644 --- a/import_db.py +++ b/import_db.py @@ -115,6 +115,15 @@ def main(): end = timer() print("Complete Metastore Import Time: " + str(timedelta(seconds=end - start))) + if args.table_acls: + print("Importing table acls configs at {0}".format(now)) + start = timer() + table_acls_c = TableACLsClient(client_config) + # log table ACLS configs + table_acls_c.import_table_acls() + end = timer() + print("Complete Table ACLs Import Time: " + str(timedelta(seconds=end - start))) + if args.pause_all_jobs: print("Pause all current jobs {0}".format(now)) start = timer()