
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from delta.tables import * #added to support delta tables
# ---------------------------------------------------------------------------
# Glue job: read the mlb_data CSV files from S3, map every column from
# string to its target type, and append the result to a Delta table on S3.
# ---------------------------------------------------------------------------

# Job bootstrap: resolve the JOB_NAME argument and set up the Glue/Spark
# contexts before any data is touched.
args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

# Source node: CSV with a header row, comma separated, double-quote quoting,
# read with "recurse" enabled on the source path.
csv_format_options = {"quoteChar": '"', "withHeader": True, "separator": ","}
source_connection_options = {
    "paths": ["s3://<BUCKET NAME>/tickets/dms_sample/mlb_data/"],
    "recurse": True,
}
S3bucket_node1 = glueContext.create_dynamic_frame.from_options(
    connection_type="s3",
    format="csv",
    format_options=csv_format_options,
    connection_options=source_connection_options,
    transformation_ctx="S3bucket_node1",
)

# ApplyMapping node: every source column arrives as "string" and keeps its
# name; only the target type varies, so the table below lists
# (column name, target type) in the order the mappings are applied.
column_target_types = (
    ("mlb_id", "int"),
    ("mlb_name", "string"),
    ("mlb_pos", "string"),
    ("mlb_team", "string"),
    ("mlb_team_long", "string"),
    ("bats", "string"),
    ("throws", "string"),
    ("birth_year", "int"),
    ("bp_id", "int"),
    ("bref_id", "int"),
    ("bref_name", "string"),
    ("cbs_id", "int"),
    ("cbs_name", "string"),
    ("cbs_pos", "string"),
    ("espn_id", "int"),
    ("espn_name", "string"),
    ("espn_pos", "string"),
    ("fg_id", "int"),
    ("fg_name", "string"),
    ("lahman_id", "int"),
    ("nfbc_id", "int"),
    ("nfbc_name", "string"),
    ("nfbc_pos", "string"),
    ("retro_id", "int"),
    ("retro_name", "string"),
    ("debut", "string"),
    ("yahoo_id", "int"),
    ("yahoo_name", "string"),
    ("yahoo_pos", "string"),
    ("mlb_depth", "string"),
)
ApplyMapping_node2 = ApplyMapping.apply(
    frame=S3bucket_node1,
    mappings=[
        (column, "string", column, target_type)
        for column, target_type in column_target_types
    ],
    transformation_ctx="ApplyMapping_node2",
)

# Sink node: convert the DynamicFrame to a Spark DataFrame and append it to
# the Delta table at the target path. The options dict is forwarded to the
# writer unchanged.
additional_options = {
    "path": "s3://<BUCKET NAME>/tickets/dms_sample_deltalake/mlb_data/",
    "write.parquet.compression-codec": "snappy",
}
S3bucket_node3_df = ApplyMapping_node2.toDF()
delta_writer = S3bucket_node3_df.write.format("delta")
delta_writer = delta_writer.options(**additional_options).mode("append")
delta_writer.save()

# Signal job completion to Glue.
job.commit()

Chọn Crawlers
Nhấn: Create crawler
Nhập tên là: Create crawler
Ở phần Data Source, nhập Include delta lake table paths và chọn Create Symlink tables

Chọn IAM và Chọn Database đích

Tạo crawler, sau đó nhấn Run crawler để chạy crawler


Phần này tương tự như lab: Bước 3 Lake Formation Lab for Apache Hudi Tables