Trong bài lab này, ta sẽ tìm hiểu cách nhập, xử lý và sử dụng streaming data bằng các dịch vụ serverless của AWS như Kinesis Data Streams, Glue, S3 và Athena. Để mô phỏng đầu vào truyền dữ liệu, chúng tôi sẽ sử dụng Kinesis Data Generator (KDG).

Sử dụng DMS Lab Student PreLab CloudFormation để thiết lập môi trường cơ sở hạ tầng hội thảo cốt lõi của bạn. Bỏ qua PreLab tương tự trong phần DMS. Nhấp vào biểu tượng Triển khai lên AWS bên dưới:
Deploy To AWS

Mở tab AWS Glue console
Tạo database có tên là “tickettransactiondatabase”

Tạo tables có tên là “TicketTransactionStreamData” ở trong database “tickettransactiondatabase”

Chọn Kinesis làm nguồn, chọn Luồng trong my account để chọn luồng dữ liệu Kinesis, chọn khu vực AWS thích hợp nơi bạn đã tạo luồng, chọn tên luồng là TicketTransactionStreamingData từ danh sách thả xuống, chọn JSON làm định dạng dữ liệu đến, vì chúng ta sẽ gửi JSON payloads từ Kinesis Data Generator theo các bước sau. và nhấp vào Tiếp theo.

Để trống schema vì chúng ta sẽ bật tính năng schema detection. Để trống partition indices. Chọn Next

Review lại tất cả thông tin và nhấn Create



Chọn Visual with a blank canvas và nhấn Create

Chọn Amazon Kinesis từ Source drop down

Trong bảng bên phải phía dưới “Data source properties - Kinesis Stream”, cấu hình như sau:



Nhấn Save button để tạo job
Khi thấy Successfully created job ta nhấn Run button để start job
{
"customerId": "{{random.number(50)}}",
"transactionAmount": {{random.number(
{
"min":10,
"max":150
}
)}},
"sourceIp" : "{{internet.ip}}",
"status": "{{random.weightedArrayElement({
"weights" : [0.8,0.1,0.1],
"data": ["OK","FAIL","PENDING"]
}
)}}",
"transactionTime": "{{date.now}}"
}

Truy cập AWS Glue console
Tại AWS Glue menu, chọn Crawlers and click Add crawler


Click vào Add a datasource

Chọn S3 và chỉ định path

Sau khi thêm datasource, nhấn next



Đăt Crawler Schedule chạy mỗi giờ.

Review lại Crawler và Click Create để tạo Crawler

Sau khi Crawler tạo xong. Nhấn Run crawler để trigger lần đầu.



Mở Kinesis Data Generator, chọn đúng region. Chọn TicketTransactionStreamingData là Kinesis stream đích

Template cho record
{
"customerId": "{{random.number(50)}}",
"transactionAmount": {{random.number(
{
"min":10,
"max":150
}
)}},
"sourceIp" : "221.233.116.256",
"status": "{{random.weightedArrayElement({
"weights" : [0.8,0.1,0.1],
"data": ["OK","FAIL","PENDING"]
}
)}}",
"transactionTime": "{{date.now}}"
}

Chọn AwsDataCatalog làm data source và tickettransactiondatabase là database

Sử dụng các truy vấn sau để xem dữ liệu
SELECT count(*) as numberOfTransactions, sourceip
FROM "tickettransactiondatabase"."parquet_tickettransactionstreamingdata"
WHERE ingest_year='2024'
AND cast(ingest_year as bigint)=year(now())
AND cast(ingest_month as bigint)=month(now())
AND cast(ingest_day as bigint)=day_of_month(now())
AND cast(ingest_hour as bigint)=hour(now())
GROUP BY sourceip
Order by numberOfTransactions DESC;