# Data Cleaning Objectives: Spark SQL and PySpark
## 1. Inspect Missing Data

Spark SQL (two equivalent queries):

```sql
SELECT count_if(email IS NULL) FROM users_dirty;
-- or
SELECT count(*) FROM users_dirty WHERE email IS NULL;
```

PySpark:

```python
from pyspark.sql.functions import col

usersDF = spark.read.table("users_dirty")
usersDF.selectExpr("count_if(email IS NULL)")
usersDF.where(col("email").isNull()).count()
```
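If you need a null count for every column at once, here is a minimal sketch (not from the original; it assumes `usersDF` is loaded as above):

```python
from pyspark.sql.functions import col, count, when

# count() skips NULLs, and when() without otherwise() yields NULL when the
# condition is false, so this counts NULLs per column in a single pass.
null_counts = usersDF.select(
    [count(when(col(c).isNull(), c)).alias(c) for c in usersDF.columns]
)
display(null_counts)
```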
## 2. Remove Duplicate Rows (Dedupe)

Spark SQL:

```sql
SELECT DISTINCT * FROM users_dirty
```

PySpark:

```python
usersDF.distinct().display()
```
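`dropDuplicates()` with no arguments performs the same operation as `distinct()`; a short sketch (the variable name is illustrative):

```python
# Equivalent to distinct() when no subset of columns is given.
dedupedAllColsDF = usersDF.dropDuplicates()
display(dedupedAllColsDF)
```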
## 3. Dedupe Based on Specific Columns

Spark SQL:

```sql
CREATE OR REPLACE TEMP VIEW deduped_users AS
SELECT user_id, user_first_touch_timestamp, max(email) AS email, max(updated) AS updated
FROM users_dirty
WHERE user_id IS NOT NULL
GROUP BY user_id, user_first_touch_timestamp;
```

PySpark:

```python
from pyspark.sql.functions import max

dedupedDF = (usersDF
    .where(col("user_id").isNotNull())
    .groupBy("user_id", "user_first_touch_timestamp")
    .agg(max("email").alias("email"),
         max("updated").alias("updated"))
)
```
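To let the validation queries in the next steps run against the PySpark result as well, one option (an assumption, mirroring the name of the SQL view) is to register it as a temp view:

```python
# Register the deduplicated DataFrame under the same name the SQL view uses.
dedupedDF.createOrReplaceTempView("deduped_users")
```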
## 4. Validate Data After Dedupe

Spark SQL:

```sql
SELECT max(row_count) <= 1 AS no_duplicate_ids FROM (
  SELECT user_id, count(*) AS row_count
  FROM deduped_users
  GROUP BY user_id)
```

PySpark:

```python
from pyspark.sql.functions import count, max

display(dedupedDF
    .groupBy("user_id")
    .agg(count("*").alias("row_count"))
    .select((max("row_count") <= 1).alias("no_duplicate_ids")))
```
## 5. Confirm Each Email Is Tagged with Only One user_id

Spark SQL:

```sql
SELECT max(user_id_count) <= 1 AS at_most_one_id FROM (
  SELECT email, count(user_id) AS user_id_count
  FROM deduped_users
  WHERE email IS NOT NULL
  GROUP BY email)
```

PySpark:

```python
display(dedupedDF
    .where(col("email").isNotNull())
    .groupBy("email")
    .agg(count("user_id").alias("user_id_count"))
    .select((max("user_id_count") <= 1).alias("at_most_one_id")))
```
## 6. Date Format and Regex

Spark SQL:

```sql
SELECT *,
  date_format(first_touch, "MMM d, yyyy") AS first_touch_date,
  date_format(first_touch, "HH:mm:ss") AS first_touch_time,
  regexp_extract(email, "(?<=@).+", 0) AS email_domain
FROM (
  SELECT *,
    CAST(user_first_touch_timestamp / 1e6 AS timestamp) AS first_touch
  FROM deduped_users)
```

PySpark:

```python
from pyspark.sql.functions import date_format, regexp_extract

display(dedupedDF
    .withColumn("first_touch", (col("user_first_touch_timestamp") / 1e6).cast("timestamp"))
    .withColumn("first_touch_date", date_format("first_touch", "MMM d, yyyy"))
    .withColumn("first_touch_time", date_format("first_touch", "HH:mm:ss"))
    .withColumn("email_domain", regexp_extract("email", "(?<=@).+", 0))
)
```
## 7. Create Temp View

Spark SQL:

```sql
CREATE OR REPLACE TEMP VIEW events_strings AS
SELECT string(key), string(value) FROM events_raw;

SELECT * FROM events_strings
```

PySpark:

```python
from pyspark.sql.functions import col

events_stringsDF = (spark
    .table("events_raw")
    .select(col("key").cast("string"),
            col("value").cast("string"))
)
display(events_stringsDF)
```
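To make the PySpark result queryable from SQL as well, a sketch (the view name mirrors the SQL version above):

```python
events_stringsDF.createOrReplaceTempView("events_strings")
spark.sql("SELECT * FROM events_strings LIMIT 5").show()
```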
## 8. Nested Data (use ":" for subfields of JSON strings, "." for subfields of structs)

Spark SQL:

```sql
SELECT * FROM events_strings WHERE value:event_name = "finalize" ORDER BY key LIMIT 1
```

PySpark:

```python
display(events_stringsDF
    .where("value:event_name = 'finalize'")
    .orderBy("key")
    .limit(1)
)
```
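To get "." (struct) access in PySpark, the JSON string can be parsed into a struct; a sketch (an assumption, not in the original) that infers the schema from one sample row:

```python
from pyspark.sql.functions import col, from_json, schema_of_json

# Grab one JSON string to infer a schema from (hypothetical sampling step).
sample_json = events_stringsDF.select("value").first()["value"]

parsedDF = (events_stringsDF
    .withColumn("json", from_json(col("value"), schema_of_json(sample_json)))
    .select("json.*"))  # subfields now accessible with "." notation
display(parsedDF)
```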
## 9. User Defined Functions (UDF)

Spark SQL:

```sql
CREATE OR REPLACE FUNCTION sale_announcement(item_name STRING, item_price INT)
RETURNS STRING
RETURN concat("The ", item_name, " is on sale for $", round(item_price * 0.8, 0));

SELECT *, sale_announcement(name, price) AS message FROM item_lookup
DESCRIBE FUNCTION EXTENDED sale_announcement
```

Python:

```python
def sale_announcement(item_name, item_price):
    return f"The {item_name} is on sale for ${round(item_price * 0.8, 0)}"

print(sale_announcement("Scooter", 4000))
```
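To apply the Python function to DataFrame columns like the SQL version does, a sketch (an assumption; `udf()` defaults to a string return type):

```python
from pyspark.sql.functions import col, udf

# Wrap the plain Python function as a Spark UDF (StringType by default).
sale_announcement_udf = udf(sale_announcement)

display(spark.table("item_lookup")
    .withColumn("message", sale_announcement_udf(col("name"), col("price"))))
```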
## 10. Managed Table

Spark SQL:

```sql
CREATE SCHEMA IF NOT EXISTS Test_Schema;
DESCRIBE SCHEMA EXTENDED Test_Schema;
USE Test_Schema;

CREATE OR REPLACE TABLE managed_table (width INT, length INT, height INT);
INSERT INTO managed_table
VALUES (3, 2, 1);
SELECT * FROM managed_table;
```

PySpark:

```python
tbl_location = spark.sql("DESCRIBE DETAIL managed_table").first().location
print(tbl_location)

files = dbutils.fs.ls(tbl_location)
display(files)
```
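Dropping a managed table removes its data files along with the metadata; a sketch of the usual demo flow (an assumption, not in the original):

```python
spark.sql("DROP TABLE managed_table")

# Listing the old location should now fail, since the data files were
# removed together with the table.
try:
    dbutils.fs.ls(tbl_location)
except Exception as e:
    print(f"Location is gone with the table: {e}")
```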
## 11. External Table

Spark SQL:

```sql
CREATE SCHEMA IF NOT EXISTS Test_Schema;
DESCRIBE SCHEMA EXTENDED Test_Schema;
USE Test_Schema;

CREATE OR REPLACE TEMPORARY VIEW temp_delays USING CSV OPTIONS (
  path = '${da.paths.datasets}/flights/departuredelays.csv',
  header = "true",
  mode = "FAILFAST" -- abort file parsing with a RuntimeException if any malformed lines are encountered
);

CREATE OR REPLACE TABLE external_table LOCATION '${da.paths.working_dir}/external_table' AS
SELECT * FROM temp_delays;

SELECT * FROM external_table;
```

PySpark:

```python
tbl_path = f"{DA.paths.working_dir}/external_table"
files = dbutils.fs.ls(tbl_path)
display(files)
```
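By contrast with the managed table above, dropping an external table leaves the data files in place; a sketch:

```python
spark.sql("DROP TABLE IF EXISTS external_table")

# The listing still succeeds: only the table metadata was dropped, the
# files at the external location are retained.
files = dbutils.fs.ls(tbl_path)
display(files)
```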