Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions data/aws_cluster_table_acls.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
{
"num_workers": 1,
"cluster_name": "API_Table_ACL_Work_Leave_Me_Alone",
"spark_version": "6.5.x-scala2.11",
"spark_conf": {
"spark.databricks.cluster.profile": "serverless",
"spark.databricks.repl.allowedLanguages": "python,sql",
"spark.databricks.acl.dfAclsEnabled": "true"
},
"aws_attributes": {
"first_on_demand": 1,
"availability": "SPOT_WITH_FALLBACK",
"zone_id": "us-west-2b",
"spot_bid_price_percent": 100,
"ebs_volume_type": "GENERAL_PURPOSE_SSD",
"ebs_volume_count": 1,
"ebs_volume_size": 100
},
"driver_node_type_id": "m4.xlarge",
"node_type_id": "m4.xlarge",
"spark_env_vars": {
"PYSPARK_PYTHON": "/databricks/python3/bin/python3"
},
"autotermination_minutes": 15
}
18 changes: 18 additions & 0 deletions data/azure_cluster_table_acls.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
{
"num_workers": 1,
"cluster_name": "API_Table_ACL_Work_Leave_Me_Alone",
"spark_version": "6.5.x-scala2.11",
"spark_conf": {
"spark.databricks.cluster.profile": "serverless",
"spark.databricks.repl.allowedLanguages": "python,sql",
"spark.databricks.acl.dfAclsEnabled": "true"
},
"node_type_id": "Standard_DS3_v2",
"ssh_public_keys": [],
"custom_tags": {},
"spark_env_vars": {
"PYSPARK_PYTHON": "/databricks/python3/bin/python3"
},
"autotermination_minutes": 120,
"init_scripts": []
}
167 changes: 167 additions & 0 deletions data/notebooks/Export_Table_ACLs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
# Databricks notebook source
# MAGIC %md #Export Table ACLs
# MAGIC
# MAGIC Exports Table ACLS to a JSON file on DBFS, which can be imported using the Import_Table_ACLs script
# MAGIC
# MAGIC Parameters:
# MAGIC - Databases: [Optional] comma separated list of databases to be exported, if empty all databases will be exported
# MAGIC - OutputPath: Path to write the exported file to
# MAGIC
# MAGIC Execution: **Run the notebook on a cluster with Table ACL's enabled as a user who is an admin**
# MAGIC
# MAGIC Supportes ACLs for Object types:
# MAGIC - Catalog: included if all databases are exported, not included if databases to be exported are specified
# MAGIC - Database: included
# MAGIC - Table: included
# MAGIC - View: included
# MAGIC - Anonymous Function: included (testing pending)
# MAGIC - Any File: included
# MAGIC
# MAGIC Disclaimer: This notebook is still needs some more testing, check back soon as fixes might have been added.

# COMMAND ----------

# DBTITLE 1,Declare Parameters
#dbutils.widgets.removeAll()
dbutils.widgets.text("Databases","db_acl_test,db_acl_test_restricted","1: Databases (opt)")
dbutils.widgets.text("OutputPath","dbfs:/tmp/migrate/test_table_acls.json.gz","2: Output Path")

# COMMAND ----------

# DBTITLE 1,Check Parameters

if not dbutils.widgets.get("OutputPath").startswith("dbfs:/"):
raise Exception(f"Unexpected value for notebook parameter 'InputPath', got <{dbutils.widgets.get('OutputPath')}>, but it must start with <dbfs:/........>")


# COMMAND ----------

# DBTITLE 1,Define Export Logic
import pyspark.sql.functions as sf
from typing import Callable, Iterator, Union, Optional, List
import datetime;

def get_database_names():
database_names = []
for db in spark.sql("show databases").collect():
if hasattr(db,"databaseName"): #Angela has this fallback ...
database_names.append(db.databaseName)
else:
database_names.append(db.namespace)
return database_names

def create_grants_df(database_name: str,object_type: str, object_key: str) -> List[str]:
if object_type in ["CATALOG", "ANY FILE", "ANONYMOUS FUNCTION"]: #without object key
grants_df = (
spark.sql(f"SHOW GRANT ON {object_type}")
.groupBy("ObjectType","ObjectKey","Principal").agg(sf.collect_set("ActionType").alias("ActionTypes"))
.selectExpr("NULL AS Database","Principal","ActionTypes","ObjectType","ObjectKey","Now() AS ExportTimestamp")
)
else:
grants_df = (
spark.sql(f"SHOW GRANT ON {object_type} {object_key}")
.filter(sf.col("ObjectType") == f"{object_type}")
.groupBy("ObjectType","ObjectKey","Principal").agg(sf.collect_set("ActionType").alias("ActionTypes"))
.selectExpr(f"'{database_name}' AS Database","Principal","ActionTypes","ObjectType","ObjectKey","Now() AS ExportTimestamp")
)
return grants_df


def create_table_ACLSs_df_for_databases(database_names: List[str]):

# TODO check Catalog heuristic:
# if all databases are exported, we include the Catalog grants as well
#. if only a few databases are exported: we exclude the Catalog
if database_names is None or database_names == '':
database_names = get_database_names()
include_catalog = True
else:
include_catalog = False

# ANONYMOUS FUNCTION
combined_grant_dfs = create_grants_df(None, "ANONYMOUS FUNCTION", None)

# ANY FILE
combined_grant_dfs = combined_grant_dfs.unionAll(
create_grants_df(None, "ANY FILE", None)
)

# CATALOG
if include_catalog:
combined_grant_dfs = combined_grant_dfs.unionAll(
create_grants_df(None, "CATALOG", None)
)
#TODO ELSE: consider pushing catalog grants down to DB level in this case

for database_name in database_names:

# DATABASE
combined_grant_dfs = combined_grant_dfs.unionAll(
create_grants_df(database_name, "DATABASE", database_name)
)

tables_and_views_rows = spark.sql(
f"SHOW TABLES IN {database_name}"
).filter(sf.col("isTemporary") == False).collect()

print(f"{datetime.datetime.now()} working on database {database_name} with {len(tables_and_views_rows)} tables and views")
for table_row in tables_and_views_rows:

# TABLE, VIEW
combined_grant_dfs = combined_grant_dfs.unionAll(
create_grants_df(database_name, "TABLE", f"{table_row.database}.{table_row.tableName}")
)

#TODO ADD USER FUNCTION - not supported in SQL Analytics, so this can wait a bit
# ... SHOW USER FUNCTIONS LIKE <my db>.`*`;
#. function_row['function'] ... nah does not seem to work

#combined_grant_dfs = combined_grant_dfs.sort("")

return combined_grant_dfs


# COMMAND ----------

# DBTITLE 1,Run Export
databases_raw = dbutils.widgets.get("Databases")
output_path = dbutils.widgets.get("OutputPath")

if databases_raw.rstrip() == '':
databases = None
print(f"Exporting all databases")
else:
databases = [x.rstrip().lstrip() for x in databases_raw.split(",")]
print(f"Exporting the following databases: {databases}")


table_ACLs_df = create_table_ACLSs_df_for_databases(databases)

print(f"{datetime.datetime.now()} writing table ACLs to {output_path}")

# with table ACLS active, I direct write to DBFS is not allowed, so we store
# the dateframe as a table for single zipped JSON file sorted, for consitent file diffs
(
table_ACLs_df
.coalesce(1)
.selectExpr("Database","Principal","ActionTypes","ObjectType","ObjectKey","ExportTimestamp")
.sort("Database","Principal","ObjectType","ObjectKey")
.write
.format("JSON")
.option("compression","gzip")
.mode("overwrite")
.save(output_path)
)


# COMMAND ----------

display(spark.read.format("json").load(output_path))

# COMMAND ----------

print(output_path)

# COMMAND ----------


115 changes: 115 additions & 0 deletions data/notebooks/Import_Table_ACLs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
# Databricks notebook source
# MAGIC %md #Import Table ACLs
# MAGIC
# MAGIC Imports Table ACLS from a JSON file on DBFS, which has been generated by the Export_Table_ACLs script
# MAGIC
# MAGIC Parameters:
# MAGIC - InputPath: Path to the JSON file to import from (gzipped JSON)
# MAGIC
# MAGIC Execution: **Run the notebook on a cluster with Table ACL's enabled as a user who is an admin**
# MAGIC
# MAGIC Supportes ACLs for Object types:
# MAGIC - Catalog: included if all databases are exported, not included if databases to be exported are specified
# MAGIC - Database: included
# MAGIC - Table: included
# MAGIC - View: included
# MAGIC - Anonymous Function: included (testing pending)
# MAGIC - Any File: included
# MAGIC
# MAGIC Disclaimer: This notebook is still needs some more testing, check back soon as fixes might have been added.

# COMMAND ----------

# DBTITLE 1,Define Parameters
#dbutils.widgets.removeAll()
dbutils.widgets.text("InputPath","dbfs:/tmp/migrate/test_table_acls.json.gz","1: Input Path")

# COMMAND ----------

# DBTITLE 1,Validated Parameters
if not dbutils.widgets.get("InputPath").startswith("dbfs:/"):
raise Exception(f"Unexpected value for notebook parameter 'InputPath', got <{dbutils.widgets.get('InputPath')}>, but it must start with <dbfs:/........>")

# COMMAND ----------

# DBTITLE 1,Show Input Data
display(spark.read.format("JSON").load(dbutils.widgets.get("InputPath")))

# COMMAND ----------

# DBTITLE 1,Define Import Logic
import datetime
import pyspark.sql.functions as sf
from typing import Callable, Iterator, Union, Optional, List


def generate_table_acls_command(action_types, object_type, object_key, principal, alter_owner=True):
lines = []

grant_privs = [ x for x in action_types if not x.startswith("DENIED_") and x != "OWN" ]
deny_privs = [ x[len("DENIED_"):] for x in action_types if x.startswith("DENIED_") and x != "OWN" ]

# TODO consider collapsing to all priviledges if all are granted

if grant_privs:
lines.append(f"GRANT {', '.join(grant_privs)} ON {object_type} {object_key} TO `{principal}`;")
if deny_privs:
lines.append(f"DENY {', '.join(deny_privs)} ON {object_type} {object_key} TO `{principal}`;")

#TODO !!! NOT QUITE SURE WETHER ALTER OWNER ACTUALLY WORKS !!!!!!!!!!!
if alter_owner and "OWN" in action_types:
lines.append(f"ALTER {object_type} {object_key} OWNER TO `{principal}`;")

return lines


def generate_table_acls_commands(table_ACLs_df, commented: bool=True, alter_owner: bool=True) -> List[str]:
lines = []
for row in table_ACLs_df.collect():


if row["ObjectType"] == "ANONYMOUS_FUNCTION":
lines.extend(generate_table_acls_command(row['ActionTypes'], 'ANONYMOUS FUNCTION', '', row['Principal'], alter_owner))
elif row["ObjectType"] == "ANY_FILE":
lines.extend(generate_table_acls_command(row['ActionTypes'], 'ANY FILE', '', row['Principal'], alter_owner))
elif row["ObjectType"] == "CATALOG$":
lines.extend(generate_table_acls_command(row['ActionTypes'], 'CATALOG', '', row['Principal'], alter_owner))
elif row["ObjectType"] in ["DATABASE", "TABLE"]:
# DATABASE, TABLE, VIEW (view's seem to show up as tables)
lines.extend(generate_table_acls_command(row['ActionTypes'], row['ObjectType'], row['ObjectKey'], row['Principal'], alter_owner))
# TODO ADD USER FUNCTION .. need to figure out

return lines

def execute_sql_statements(sqls):
for sql in sqls.split(sep=";"):
sql = sql.strip()
if sql:
print(f"{sql};")
spark.sql(sql)



# COMMAND ----------

# DBTITLE 1,Run Import
input_path = dbutils.widgets.get("InputPath")


table_ACLs_df = spark.read.format("JSON").load(input_path).orderBy("Database","ObjectType")

print(f"{datetime.datetime.now()} reading table ACLs from {input_path}")


lines = generate_table_acls_commands(table_ACLs_df, commented=True, alter_owner=True)

sql="\n".join(lines)

print(f"Number of table ACLs statements to execute: {len(lines)}")
print("\n\n")

execute_sql_statements(sql)

# COMMAND ----------


9 changes: 6 additions & 3 deletions dbclient/ClustersClient.py
Original file line number Diff line number Diff line change
Expand Up @@ -338,23 +338,26 @@ def is_spark_3(self, cid):
else:
return False

def launch_cluster(self, iam_role=None):
def launch_cluster(self, iam_role=None, enable_table_acls=False):
""" Launches a cluster to get DDL statements.
Returns a cluster_id """
# removed for now as Spark 3.0 will have backwards incompatible changes
# version = self.get_latest_spark_version()
import os
real_path = os.path.dirname(os.path.realpath(__file__))

# add _table_acls suffix to cluster config path if enable_table_acls is set
cluster_json_postfix = '_table_acls' if enable_table_acls else ''
if self.is_aws():
with open(real_path + '/../data/aws_cluster.json', 'r') as fp:
with open(f'{real_path}/../data/aws_cluster{cluster_json_postfix}.json', 'r') as fp:
cluster_json = json.loads(fp.read())
if iam_role:
aws_attr = cluster_json['aws_attributes']
print("Creating cluster with: " + iam_role)
aws_attr['instance_profile_arn'] = iam_role
cluster_json['aws_attributes'] = aws_attr
else:
with open(real_path + '/../data/azure_cluster.json', 'r') as fp:
with open(f'{real_path}/../data/azure_cluster{cluster_json_postfix}.json', 'r') as fp:
cluster_json = json.loads(fp.read())
# set the latest spark release regardless of defined cluster json
# cluster_json['spark_version'] = version['key']
Expand Down
Loading