Lambda を使用した DynamoDB ストリーム

Lambda Wo Shi Yongshita Dynamodb Sutorimu



DynamoDB Streams は、DynamoDB テーブルに加えられた変更またはデータ変更のリアルタイム ストリームを取得できるようにする Amazon DynamoDB の機能です。この機能を使用して、キャッシュの更新や通知の送信など、DynamoDB テーブルのデータ変更に対応するアプリケーションを構築できます。

もちろん、DynamoDB ストリームを使用して、ダウンストリーム プロセスと AWS Lambda 関数をトリガーすることもできます。定義上、Lambda は、イベントに応答してコードを実行し、コンピューティング リソースを自動的に管理するサーバーレス コンピューティング サービスです。

Lambda を使用して Node.js、Python、Java、または C# でコードを記述し、ストリーム レコードを処理して適切なアクションを実行できます。 DynamoDB ストリームを Lambda と統合する主な利点は、Lambda を使用すると、管理を必要とせずに、サポートされているサービスまたはアプリケーション コードを実行できることです。







Lambda で AWS DynamoDB ストリームを使用する方法

DynamoDB ストリームからのイベントとオカレンスを使用する Lambda 関数を作成することは可能ですが、特に最初の試行では、そのプロセスは非常に困難になる可能性があります。次の手順が役に立ちます。



ステップ 1: システムが前提条件を満たしていることを確認する

この手順は、基本的な Lambda 操作とプロセスを知っている場合にのみ成功します。したがって、Lambda についての理解が平均以上であることを確認するには、これが最初です。



考慮に値する 2 番目の前提条件は、システムの AWS バージョンを確認することです。次のコマンドを使用できます。





aws –バージョン

提供されたコマンドの結果は次のようになります。

aws-cli/ 2 .x.x パイソン/ 3 .xx Linux/ 4 .x.x-xxx-std ボトコア/ 2 .xx

指定されたサンプル応答には、AWS CLI のインストール済みバージョンが含まれています ( aws-cli/2.x.x )、Python バージョン ( Python/3.x.x )、およびオペレーティング システム ( Linux/4.x.x-xxx-std )。応答の最後の部分は、AWS CLI が実行される Botocore ライブラリのバージョンを定義します ( ボトコア/2.x.x )。



したがって、次のような結果になります。

ステップ 2: 実行ロールを作成する

次のステップは、AWS CLI で実行ロールを作成することです。実行ロールは、ユーザーに代わってタスクを実行するために AWS サービスによって引き受けられる AWS Identity and Access Management (IAM) ロールです。これにより、途中で必要になる AWS リソースにアクセスできます。

次のコマンドを使用してロールを作成できます。

aws iam create-role \

--role-name LambdaDynamoDBExecutionRole \

--assume-role-policy-document ファイル://assume-role-policy.json \

- 説明 「AWSLambdaDynamoDBExecutionRole」 \

-- サービス名 lambda.amazonaws.com

前のコマンドは、ロールを作成するための AWS CLI コマンドです。 Amazon Management Console を使用してロールを作成することもできます。 IAM コンソールに移動したら、 役割 ページをクリックし、 ロールを作成 ボタン。

次の入力に進みます。

  • 信頼できるエンティティ: ラムダ
  • ロール名: lambda-dynamodb-role
  • アクセス許可: AWSLambdaDynamoDBExecutionRole

最初に AWS SDK for Python をインストールして、Python を使用することもできます。

pip インストール boto3

ステップ 3: テーブルで DynamoDB ストリームを有効にする

テーブルで DynamoDB ストリームを有効にする必要があります。この図では、Boto3、AWS SDK for Python を使用します。次のコマンドが役立ちます。

boto3をインポート

# DynamoDB サービスに接続する
dynamodb = boto3.client( 「ダイナモッド」 )

# 「my-table」テーブルで DynamoDB ストリームを有効にする
応答 = dynamodb.update_table(
テーブル名= 「マイテーブル」
ストリーム仕様={
「ストリーム対応」 : 真実、
'StreamViewType' : 「NEW_AND_OLD_IMAGES」
}
)

# 応答をチェックして、ストリームが正常に有効化されたことを確認します
もし応答[ 「ストリーム仕様」 ][ 「ストリーム対応」 ]:
印刷( 「DynamoDB ストリームが正常に有効になりました」 )
そうしないと:
印刷( 「DynamoDB ストリームの有効化エラー」 )

このコードは、変更が発生するとすぐにアイテムの新しいイメージと古いイメージの両方をストリーミングする「mytable」テーブルで DynamoDB ストリームを有効にします。 StreamViewType を「NEW_IMAGE」にするとすぐに、新しい画像のみをストリーミングするように選択できます。

特に、このコードを実行すると、しばらくするとテーブルのストリームのみが有効になる場合があります。代わりに、プロセスには時間がかかる場合があります。 describe_table メソッドを使用して、ストリームのステータスを確認できます。

ステップ 4: Lambda 関数を作成する

次のステップは、DynamoDB ストリームをトリガーする Lambda 関数を作成することです。次の手順が役立ちます。

  • AWS Lambda コンソールを開き、[関数の作成] タブをクリックします。 「関数の作成」ページで、「ゼロから作成」を選択し、関数の名前を入力します。この時点でランタイムも入力する必要があります。この図では Python を選択しました。
  • [実行ロールを選択または作成] で、[基本的な Lambda アクセス許可を持つ新しいロールを作成する] を選択して、Lambda 関数に必要なアクセス許可を持つ IAM ロールを作成します。
  • [関数の作成] ボタンをクリックして、Lambda 関数を作成します。
  • 関数の [構成] ページで、[デザイナー] セクションまで下にスクロールし、[トリガーの追加] タブをクリックします。
  • 表示される「トリガー構成」ボックスで、「トリガー」ドロップダウンメニューから「DynamoDB」を選択します。
  • 関数のトリガーに使用する DynamoDB テーブルを選択します。完了したら、関数をテーブルのすべての更新でトリガーするか、特定の更新 (特定の列の更新など) でのみトリガーするかを選択します。
  • 「追加」ボタンをクリックしてトリガーを作成します。
  • 「関数コード」エディターで、関数の Python コードを記述します。関数に渡されるイベント オブジェクトを使用して、関数をトリガーするデータにアクセスできます。
  • 「保存」ボタンをクリックして関数を保存します。

Lambda 関数を作成するときに行われるのはこれだけです。指定された DynamoDB テーブルが更新されるたびに、関数がトリガーされるようになりました。

DynamoDB ストリームがトリガーできる単純な Python 関数の例を次に示します。

def lambda_handler(イベント、コンテキスト):

イベントの記録用[ '記録' ]:

print(レコード[ 「ダイナモッド」 ][ '新しいイメージ' ]))

この関数は、イベント オブジェクト内のレコードを反復処理し、関数をトリガーする DynamoDB テーブル内のアイテムの新しいイメージを出力します。

ステップ 5: Lambda 関数をテストする

DynamoDB ストリームがトリガーできる Lambda 関数をテストするには、 boto3 DynamoDB API にアクセスするためのライブラリと 呼び出す の方法 ラムダ クライアントが関数をトリガーします。

これを行う方法の例を次に示します。

boto3をインポート

# DynamoDB サービスに接続する
dynamodb = boto3.client( 「ダイナモッド」 )

# Lambda サービスに接続する
lambda_client = boto3.client( 'ラムダ' )

# 'my-table' テーブルに項目を挿入
応答 = dynamodb.put_item(
テーブル名= 「マイテーブル」
アイテム={
「id」 :{ 「ん」 : '123' }、
'名前' :{ 'S' : 'ジョエル・オースティン}、
'
':{' N ':' 3.4 '}
}
)

# 応答をチェックして、項目が正常に挿入されたことを確認します
もし応答['
応答メタデータ '][' HTTPStatusCode '] == 200:
print('アイテムが正常に挿入されました')
そうしないと:
print('アイテムの挿入エラー')

# ' にサブスクライブされている Lambda 関数をトリガーします
マイテーブル ' テーブル
応答 = lambda_client.invoke(
Function、
InvocationType='
イベント '、
ログタイプ='
しっぽ '、
ペイロード='
{ '記録' :[{ 「ダイナモッド」 :{ '新しいイメージ' :{ 「イド」 :{ 「ン」 : 「123」 }、 '名前' :{ 'S' : 「ジョエル・オースティン」 }、 '年' :{ 「ン」 : 「3.4」 }}}}]} '
)

# 応答をチェックして、関数が正常にトリガーされたことを確認します
もし応答['
ステータスコード '] == 202:
print('Lambda 関数が正常にトリガーされました')
そうしないと:
print('Lambda 関数のトリガー エラー')

このコードは、最初に項目を マイテーブル テーブルをトリガーし、 私の機能 を使用して関数にサンプル イベント ペイロードを送信することによる Lambda 関数。 呼び出す 方法。イベント ペイロードは、挿入されたアイテムの新しいイメージを含む DynamoDB ストリーム イベントをシミュレートします。

その後、Lambda 関数のログをチェックして、イベント データが正常にトリガーおよび処理されたかどうかを確認できます。

結論

DynamoDB ストリームが Lambda 関数をトリガーできる同じストリーム レコードに対して複数回呼び出すことができることに注意することが重要です。これの背後にある主な理由は、ストリーム レコードが最終的に一貫しており、Lambda 関数によって同じレコードを複数回処理できるためです。このケースを正しく処理するように Lambda 関数を設計することが重要です。