Skip to content

Commit

Permalink
Copy Files From Source Repo (2024-08-02 20:01)
Browse files Browse the repository at this point in the history
  • Loading branch information
olprod committed Aug 3, 2024
1 parent 8a54e94 commit d6959ad
Show file tree
Hide file tree
Showing 12 changed files with 2,403 additions and 0 deletions.
184 changes: 184 additions & 0 deletions Instructions/Exercises/DE-01-Real-time-ingestion.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
---
lab:
title: Azure Databricks を使用した Spark 構造化ストリーミングと Delta Lake を使用したリアルタイムのインジェストと処理
---

# Azure Databricks を使用した Spark 構造化ストリーミングと Delta Lake を使用したリアルタイムのインジェストと処理

Spark Structured Streaming を使用すると、エンドツーエンドのフォールト トレランスを使用してリアルタイムでデータを処理できます。 Delta Lakeは、ACIDトランザクションを提供するストレージ層を追加することでこれを強化し、データの整合性と一貫性を確保します。 クラウド ストレージから Delta Lake にデータを取り込み、Delta Live Tables を使用してストリーミング データ パイプラインを管理および最適化できます。

このラボは完了するまで、約 **30** 分かかります。

## Azure Databricks ワークスペースをプロビジョニングする

> **ヒント**: 既に Azure Databricks ワークスペースがある場合は、この手順をスキップして、既存のワークスペースを使用できます。
この演習には、新しい Azure Databricks ワークスペースをプロビジョニングするスクリプトが含まれています。 このスクリプトは、この演習で必要なコンピューティング コアに対する十分なクォータが Azure サブスクリプションにあるリージョンに、*Premium* レベルの Azure Databricks ワークスペース リソースを作成しようとします。また、使用するユーザー アカウントのサブスクリプションに、Azure Databricks ワークスペース リソースを作成するための十分なアクセス許可があることを前提としています。 十分なクォータやアクセス許可がないためにスクリプトが失敗した場合は、[Azure portal で、Azure Databricks ワークスペースを対話形式で作成](https://learn.microsoft.com/azure/databricks/getting-started/#--create-an-azure-databricks-workspace)してみてください。

1. Web ブラウザーで、`https://portal.azure.com`[Azure portal](https://portal.azure.com) にサインインします。

2. ページ上部の検索バーの右側にある **[\>_]** ボタンを使用して、Azure portal に新しい Cloud Shell を作成します。メッセージが表示されたら、***PowerShell*** 環境を選んで、ストレージを作成します。 次に示すように、Azure portal の下部にあるペインに、Cloud Shell のコマンド ライン インターフェイスが表示されます。

![Azure portal と Cloud Shell のペイン](./images/cloud-shell.png)

> ****: 前に *Bash* 環境を使ってクラウド シェルを作成している場合は、そのクラウド シェル ペインの左上にあるドロップダウン メニューを使って、***PowerShell*** に変更します。
3. ペインの上部にある区分線をドラッグして Cloud Shell のサイズを変更したり、ペインの右上にある **—****◻****X** アイコンを使用して、ペインを最小化または最大化したり、閉じたりすることができます。 Azure Cloud Shell の使い方について詳しくは、[Azure Cloud Shell のドキュメント](https://docs.microsoft.com/azure/cloud-shell/overview)をご覧ください。

4. PowerShell のペインで、次のコマンドを入力して、リポジトリを複製します。

```powershell
rm -r mslearn-databricks -f
git clone https://github.com/MicrosoftLearning/mslearn-databricks
```
5. リポジトリをクローンした後、次のコマンドを入力して **setup.ps1** スクリプトを実行します。これにより、使用可能なリージョンに Azure Databricks ワークスペースがプロビジョニングされます。
```powershell
./mslearn-databricks/setup.ps1
```
6. メッセージが表示された場合は、使用するサブスクリプションを選択します (これは、複数の Azure サブスクリプションへのアクセス権を持っている場合にのみ行います)。
7. スクリプトが完了するまで待ちます。通常、約 5 分かかりますが、さらに時間がかかる場合もあります。 待っている間に、Azure Databricks ドキュメントの[Delta Lake の概要](https://docs.microsoft.com/azure/databricks/delta/delta-intro)に関する記事をご確認ください。
## クラスターの作成
Azure Databricks は、Apache Spark "クラスター" を使用して複数のノードでデータを並列に処理する分散処理プラットフォームです。** 各クラスターは、作業を調整するドライバー ノードと、処理タスクを実行するワーカー ノードで構成されています。 この演習では、ラボ環境で使用されるコンピューティング リソース (リソースが制約される場合がある) を最小限に抑えるために、*単一ノード* クラスターを作成します。 運用環境では、通常、複数のワーカー ノードを含むクラスターを作成します。
> **ヒント**: Azure Databricks ワークスペースに 13.3 LTS 以降のランタイム バージョンを持つクラスターが既にある場合は、それを使ってこの演習を完了し、この手順をスキップできます。
1. Azure portal で、スクリプトによって作成された **msl-*xxxxxxx*** リソース グループ (または既存の Azure Databricks ワークスペースを含むリソース グループ) に移動します
1. Azure Databricks Service リソース (セットアップ スクリプトを使って作成した場合は、**databricks-*xxxxxxx*** という名前) を選択します。
1. Azure Databricks ワークスペースの [**概要**] ページで、[**ワークスペースの起動**] ボタンを使用して、新しいブラウザー タブで Azure Databricks ワークスペースを開きます。サインインを求められた場合はサインインします。
> **ヒント**: Databricks ワークスペース ポータルを使用すると、さまざまなヒントと通知が表示される場合があります。 これらは無視し、指示に従ってこの演習のタスクを完了してください。
1. 左側のサイドバーで、**[(+) 新規]** タスクを選択し、**[クラスター]** を選択します。
1. **[新しいクラスター]** ページで、次の設定を使用して新しいクラスターを作成します。
- **クラスター名**: "ユーザー名の" クラスター (既定のクラスター名)**
- **ポリシー**:Unrestricted
- **クラスター モード**: 単一ノード
- **アクセス モード**: 単一ユーザー (*自分のユーザー アカウントを選択*)
- **Databricks Runtime のバージョン**: 13.3 LTS (Spark 3.4.1、Scala 2.12) 以降
- **Photon Acceleration を使用する**: 選択済み
- **ノードの種類**: Standard_DS3_v2
- **非アクティブ状態が ** *20* ** 分間続いた後終了する**
1. クラスターが作成されるまで待ちます。 これには 1、2 分かかることがあります。
> **注**: クラスターの起動に失敗した場合、Azure Databricks ワークスペースがプロビジョニングされているリージョンでサブスクリプションのクォータが不足していることがあります。 詳細については、「[CPU コアの制限によってクラスターを作成できない](https://docs.microsoft.com/azure/databricks/kb/clusters/azure-core-limit)」を参照してください。 その場合は、ワークスペースを削除し、別のリージョンに新しいワークスペースを作成してみてください。 次のように、セットアップ スクリプトのパラメーターとしてリージョンを指定できます: `./mslearn-databricks/setup.ps1 eastus`
## ノートブックを作成してデータを取り込む
Azure Databricks ワークスペースにノートブックを作成して、さまざまなプログラミング言語で記述されたコードを実行できます。 この演習では、ファイルからデータを取り込んで Databricks File System (DBFS) のフォルダーに保存する簡単なノートブックを作成します。
1. 次に、Azure Databricks ワークスペース ポータルを表示し、左側のサイド バーに、実行できるさまざまなタスクのアイコンが含まれていることに注目します。
1. サイド バーで **[(+) 新規]** タスクを使用して、**Notebook** を作成します。
1. 既定のノートブック名 (**無題のノートブック *[日付]***) を **RealTimeIngestion** に変更します。
1. ノートブックの最初のセルに次のコードを入力します。このコードは、"シェル" コマンドを使用して、GitHub からクラスターで使用されるファイル システムにデータ ファイルをダウンロードします。**
```python
%sh
rm -r /dbfs/device_stream
mkdir /dbfs/device_stream
wget -O /dbfs/device_stream/devices1.json https://raw.githubusercontent.com/MicrosoftLearning/mslearn-databricks/main/data/devices1.json
```
1. セルの左側にある **[▸ セルの実行]** メニュー オプションを使用して実行を行います。 そして、コードによって実行される Spark ジョブが完了するまで待ちます。
## ストリーミング データにデルタ テーブルを使用する
Delta Lake では、"*ストリーミング*" データがサポートされています。 デルタ テーブルは、Spark 構造化ストリーミング API を使用して作成されたデータ ストリームの "シンク" または "ソース" に指定できます。** ** この例では、モノのインターネット (IoT) のシミュレーション シナリオで、一部のストリーミング データのシンクにデルタ テーブルを使用します。 シミュレートされたデバイス データは、次のような JSON 形式です。
```json
{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"ok"}
{"device":"Dev2","status":"error"}
{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"error"}
{"device":"Dev2","status":"ok"}
{"device":"Dev2","status":"error"}
{"device":"Dev1","status":"ok"}
```

1. 新しいセルで、次のコードを実行して、JSON デバイス データを含むフォルダーに基づいてストリームを作成します。

```python
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Create a stream that reads data from the folder, using a JSON schema
inputPath = '/device_stream/'
jsonSchema = StructType([
StructField("device", StringType(), False),
StructField("status", StringType(), False)
])
iotstream = spark.readStream.schema(jsonSchema).option("maxFilesPerTrigger", 1).json(inputPath)
print("Source stream created...")
```

1. 新しいコード セルを追加し、それを使用して、データのストリームをデルタ フォルダーに永続的に書き込みます。

```python
# Write the stream to a delta table
delta_stream_table_path = '/delta/iotdevicedata'
checkpointpath = '/delta/checkpoint'
deltastream = iotstream.writeStream.format("delta").option("checkpointLocation", checkpointpath).start(delta_stream_table_path)
print("Streaming to delta sink...")
```

1. 他の差分フォルダーと同様に、データを読み取るコードを追加します。

```python
# Read the data in delta format into a dataframe
df = spark.read.format("delta").load(delta_stream_table_path)
display(df)
```

1. 次のコードを追加して、ストリーミング データの書き込み先となる差分フォルダーに基づいてテーブルを作成します。

```python
# create a catalog table based on the streaming sink
spark.sql("CREATE TABLE IotDeviceData USING DELTA LOCATION '{0}'".format(delta_stream_table_path))
```

1. 次のコードを使用してテーブルにクエリを実行します。

```sql
%sql
SELECT * FROM IotDeviceData;
```

1. 次のコードを実行して、新しいデバイス データをストリームに追加します。

```Bash
%sh
wget -O /dbfs/device_stream/devices2.json https://raw.githubusercontent.com/MicrosoftLearning/mslearn-databricks/main/data/devices2.json
```

1. 次の SQL クエリ コードを再実行して、新しいデータがストリームに追加され、差分フォルダーに書き込まれたことを確認します。

```sql
%sql
SELECT * FROM IotDeviceData;
```

1. 次のコードを実行してストリームを停止します。

```python
deltastream.stop()
```

## クリーンアップ

Azure Databricks ポータルの **[コンピューティング]** ページでクラスターを選択し、**[■ 終了]** を選択してクラスターをシャットダウンします。

Azure Databricks を調べ終わったら、作成したリソースを削除できます。これにより、不要な Azure コストが生じないようになり、サブスクリプションの容量も解放されます。
Loading

0 comments on commit d6959ad

Please sign in to comment.