メタラーまとんがハイソにやらかすようです

東大理系修士卒JTBCエンジニアのハイソサイエティ(上流階級)な日常

レアカード価格をmplfinanceで週足グラフにしてS3に保存するLambda関数(DynamoDB Streamトリガー)

ども!AWS認定ソリューションアーキテクトアソシエイトのまとんです。

「レアカード売り時お知らせ君」の続きを作ったので記事にします。

TL; DR

前回までに作ったところ

  • DynamoDBにレアカード価格の時系列データが保存してある
  • Lambdaで1日に1回、最新のカード価格を追加する

この記事で作ったところ

  • DynamoDBの更新をトリガー(DynamoDB Stream)にしてLambdaを起動
  • 株などのローソク足を描画するPythonライブラリmplfinanceを使って、レアカード価格の日推移と週足をプロット
  • プロットした画像をS3に保存し、S3の静的ウェブサイトホスティングで公開

以下のようなグラフを作ってweb公開しました。

f:id:highso:20210504230619j:image

「レアカード売り時お知らせ君」とは?

カードゲームユーザーの永遠の悩み「レアカードをいつ売ればいいか分からない!!」を解決するために作っているアプリです。

自分が持っているカードの値段が高騰したら、「今が売り時だ!売れーー!!」と教えてくれるロボットを作ります。

具体的には、以下のようなロボットを作ります。

  • 自分が持っているレアカードを、事前に登録しておく
  • 1日に1回、レアカードの価格をwebで調査
  • 価格が急激に上がったカードがあれば、LINEで「売れーー!!」と通知してくれる

詳細は以下を参照。

highso.hatenablog.com

アーキテクチャ

当初に考えていたアーキテクチャ、および前回の記事で作ったところは以下の図の通りです。

f:id:highso:20210504224306p:plain

前回の記事では、Wisdom Guild様からスクレイピングで価格を1日1回取得し、DynamoDBに記録するところを作りました。UIのLINEボット部分はまだできていません。

今回の記事では、当初の計画から少し機能を増やして、以下の部分を作りました。

f:id:highso:20210504225803p:plain

DynamoDBの更新をトリガーにしてLambdaを起動し、価格のプロット(日推移、週足)を作って画像をS3に保存し、S3の静的ウェブサイトホスティングを使ってウェブ公開します。

アーキテクチャを変更した理由は以下の通りです。

  • LINEbotは以前に別の作品で作ったことがあったので、新しく学ぶことがないためモチベーションが沸かない
  • S3でウェブ公開して「みんなから見れる状態」にする方が楽しいと思った
  • 未経験のAWSサービスを使ってみたかった(DynamoDB Stream、S3の静的ホスティング
  • 個人投資家としてローソク足を描画できるPythonライブラリ「mplfinance」を使ってみたかった

毎日株価をチェックしているので、ローソク足を作れるとテンション上がります!!

GitHub

LambdaにデプロイするコードはGitHubに公開してあります。

github.com

ただし、サーバーレスアプリはコードだけでは動かないので、AWS上であれこれ操作する手順について解説します。

アーキテクチャの構築も含めて、SAM(Serverless Application Model)という仕組みを使えば自動化できるようなので、今後はSAMにもトライしてみた思っています。

AWS構築手順

概略すると以下のことをします。

  • DynamoDBとS3バケットを用意
  • Lambda関数に権限を与えるIAMポリシーの作成
  • Lambda関数のデプロイ

DynamoDB(前回の記事で作成)

オレゴンリージョンにテーブル名「SellTimeRemainder」で、1日1回、カード価格が追加されるテーブルを、前回の記事で作っています。

f:id:highso:20210504232436p:plain

f:id:highso:20210504232440p:plain

プライマリーキーをCard(カードの日本語名)とし、URLにスクレイピング先のURLを記載しています。URLの最後にカードの英語略称が付いています(Wisdom Guild様の仕様)。

Pricesに、日付(YYYY-MM-DD)をkey、カード価格(int)をvalueでひたすら追加しています。
作成手順の詳細は前回の記事を参照ください。

highso.hatenablog.com

静的ウェブサイトホスティング用S3バケットの作成

S3バケットは、アクセスする人(主にTwitterのフォロワーを想定)の日本在住率が高いことを想定し、東京リージョンに作成。

バケット名は「highso.com」。念願のハイソドットコムです。

「S3 静的ホスティング」でググって、一般的な方法でホスティングを有効化。

デフォルトのindex.htmlに、「http://highso.com.s3-website-ap-northeast-1.amazonaws.com/」でアクセスできるようになります。

DynamoDBストリームの設定

DynamoDBのテーブル「SellTimeRemainder」を選択→エクスポートおよびストリーム→有効化

表示タイプ「新しいイメージ」を選択。他には「キーだけ」や「新旧イメージ」などがあり、Lambdaに渡されるeventの中身が変わります。

今回は、新規価格を登録後の新しいイメージの情報があれば十分なので、「新しいイメージ」にしました。

設定後にトリガー作成を促されますが、後でLambda関数を作るところで設定するので、とりあえずここではトリガーを作成しなくてOKです。

有効化が完了すると、以下のように確認できます。

f:id:highso:20210504233527p:plain

IAM設定

Lambda関数に付与するIAMロールを作ります。

セキュリティを高めるためには闇雲に権限を与えるのは避けるべきなので、必要最小限の権限を付与します。

Lambdaに与える権限は(1)DynamoDB Streamを読み出しできる、(2)S3に書き出しできる、(3)Lambdaの一般的な権限、の3つです。

(1)と(2)のポリシーは自作します。

IAMポリシー1:DynamoDB読み出しポリシー作成

IAMのポリシーを開き、ポリシーの作成→ビジュアルエディタに次のように入力します。

・サービスの選択: DynamoDB

・アクション(DynamoDB Streamを受けるためには次の4個が必要)

  - GetShardIterator

  - GetRecords

  - ListStream

  - DescribeStream

・リソース: ARN arn:aws:dynamodb:us-west-2:<AWSアカウントID>:table/SellTimeRemainder/stream/*
・ポリシー名: SellTImeRemainder_DynamoDBStream

JSONでは以下のようになります。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "dynamodb:GetShardIterator",
                "dynamodb:DescribeStream",
                "dynamodb:GetRecords",
                "dynamodb:ListStreams"
            ],
            "Resource": "arn:aws:dynamodb:us-west-2:<AWSアカウントID>:table/SellTimeRemainder/stream/*"
        }
    ]
}
IAMポリシー2:S3書き出しポリシー作成

ポリシーの作成→ビジュアルエディタに次のように入力

・サービスの選択: DynamoDB

・アクション: PutObject (ファイルをアップロードするだけならこれで十分)

・リソース:

  - Bucket name: highso.com

  - Object name: すべて

・ポリシー名: SellTimeRemainder_PutToS3

JSONにすると以下のようになります。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": "s3:PutObject",
            "Resource": "arn:aws:s3:::highso.com/*"
        }
    ]
}

S3バケットはリージョンによらずグローバルに一意なので、リージョン情報は含まれていないことが分かります。

IAMロール:Lambda用ロール作成

ロールの作成

ユースケースの選択: Lambda

・ポリシーのアタッチ:

  - SellTimeRemainder_DynamoDBStream

  - SellTimeRemainder_PutToS3

  - AWSLambdaBasicExecutionRole (Lambda用の基本的なロール)

・ロール名: SellTimeRemainder_CreatePlotToS3

f:id:highso:20210504234901p:plain

「信頼関係」タブの「信頼されたエンティティ」には「ID プロバイダー lambda.amazonaws.com」が書かれているはずです。これで、LambdaがこのIAMロールを使用することができます。

Lambda関数作成

関数の作成

Lambda関数を新規作成

・関数名: SellTimeRemainder_CreatePlotToS3

・ランタイム: Python3.8

・実行ロール: SellTimeRemainder_CreatePlotToS3

・再試行: 10回

ここでくせものは「再試行」です。デフォルトの-1設定だと、lambdaがエラーになったときに無限に繰り返されてしまう可能性があります。(要検証)

実際、LambdaがPythonのエラーでエラー終了した際、約10秒ごとにLambdaが繰り返し起動されてしまっていました。

12時から繰り返し起動が始まって、15時頃に気が付いたので約3時間程度に止められましたが、もし気が付かず放置しておいたら、いかに安いLambdaとはいえ、従量課金の料金が膨らんでいたでしょう。危なかった。

「-1」設定が最大何回なのか分からないので、さしあたり10回にしておきました。

トリガーとタイムアウトの設定

トリガーの追加

・トリガー: DynamoDB

・DynamoDBテーブル: SellTimeRemainder

・バッチサイズ: 100(デフォルト)

設定→一般設定→編集

タイムアウト: 20秒

今回のコードは実行時間が平均8秒ほどかかりました。デフォルトの3秒ではタイムアウトしてしまうので、余裕をもって20秒にしました。

具体的には、Pythonのライブラリのローディング(pandas, mplfinanceなど)に1秒ほど、週足を計算するサブルーチンに1秒ほど、図の描画とS3アップロードに3秒ほどかかっているようです。

タイムアウトはもっと長くしてもよいですが、仮に何かしらの無限ループに入ってしまった場合、タイムアウトまで従量課金され続けてしまうのが怖いです。バランスをとって、20秒としました。

コードとライブラリをまとめたzipの作成

まず、上記のGitHubのコードをcloneします。

AWS LambdaではPythonライブラリごとzipで固めてアップロードする必要があります。

僕はWindows10を使用しており、wsl2でUbuntuを走らせて開発しています。wsl2の環境構築についてはこちらの記事で解説しています。

highso.hatenablog.com

Pythonライブラリ(mplfinanceとその依存ライブラリ群)は、再配布とならないようにGitHubにはアップロードしていません。なので、pipでダウンロードをお願いします。

pipでコードと同じディレクトリにインストール(pipで -t ./ オプションをつける)

$ git clone https://github.com/maton456/SellTimeRemainder.git
$ cd SellTimeRemainder
$ cd SellTimeRemainder_CreatePlotToS3 $ pip3 install -U pip $ pip3 install mplfinance -t ./

pipでダウンロードされる「*.dist-info」は不要なので削除

$ rm *.dist-info -r

ディレクトリごとzipで固める

$ sudo apt install zip
$ zip -r SellTimeRemainder_CreatePlotToS3.zip ./*

WSLを使っている場合は、zipをWSL環境からWindows環境(デスクトップ)にコピーするためには以下のコマンドを実行します

$ cp SellTimeRemainder_CreatePlotToS3.zip /mnt/c/Users/<Windowsユーザー名>/Desktop/

これでデプロイ用のzipの準備が整いました。

f:id:highso:20210505001232p:plain

Lambdaにデプロイ

Pythonライブラリ(特にnumpyとpandas)を含めるとzipは40MB程度に肥大化してしまいました。

10MBを超えるzipのデプロイは、S3を経由する必要があります。

Lambdaデプロイ用のバケットを作ります。

・リージョン: us-west-2

バケット名: lambda-zip-maton

webコンソールでS3を開いて、作成したzipをアップロードします。

アップロードしたzipオブジェクトを開き、オブジェクトURLをコピー。

Lambdaを開いて、「アップロード元」→「Amazon S3」を選び、オブジェクトURLを指定してアップロードします。

これでデプロイは完了です。

ソースコード解説

AWS環境構築さえちゃんとできていれば、コードは難しいことはしていませんが、いちおう解説します。

コードは全てlambda_function.pyに書いてあります。

DynamoDB Streamで受け取るevent

DynamoDBで何かしら更新が入ると、lambda_handlerのeventに以下のようなものが渡されてLambdaが起動します。eventはfor inでイテレーションできます。

これは更新したレコードが1個だけの場合で、仮に同時に複数のレコードが更新されたら、Recordsのリストに複数レコードが入ると思います。

    event = {
        "Records": [
            {
                "eventID""37428e30b8ad25bf68eecbdb6e959172",
                "eventName""MODIFY",
                "eventVersion""1.1",
                "eventSource""aws:dynamodb",
                "awsRegion""us-west-2",
                "dynamodb": {
                    "ApproximateCreationDateTime"1619967393,
                    "Keys": {
                        "Owner": {
                            "S""まとん"
                        },
                        "Card": {
                            "S""草むした墓"
                        }
                    },
                    "NewImage": {
                        "Prices": {
                            "M": {
                                "2021-03-10": {
                                    "N""1171"
                                },
                                "2021-03-09": {
                                    "N""1167"
                                },
                                "2021-03-08": {
                                    "N""1164"
                                },
                                "2021-03-07": {
                                    "N""1162"
                                },
                                "2021-03-06": {
                                    "N""1207"
                                }
                            }
                        },
                        "Owner": {
                            "S""まとん"
                        },
                        "URL": {
                            "S""http://wonder.wisdom-guild.net/price/Overgrown+Tomb/"
                        },
                        "Card": {
                            "S""草むした墓"
                        }
                    },
                    "SequenceNumber""462680900000000009038765128",
                    "SizeBytes"1642,
                    "StreamViewType""NEW_IMAGE"
                },
                "eventSourceARN""arn:aws:dynamodb:us-west-2:000000000000:table/SellTimeRemainder/stream/2021-05-02T14:55:28.113"
            }
        ]
    }
 

DynamoDB Streamでは、「レコードの変更された箇所だけ」が渡されるわけではなく、「変更されたレコードの全体」が渡されるようです。

なので、レコードが巨大になったら、1回あたりのLambda起動が重くなると思います。(今回のシステムは1日1key追加の仕組みなので、せいぜい1年で365keyだし、数年間は問題ない・・・だろうと想定)
この中から、必要な値だけを抽出して、リストに入れていきます。

    for record in event['Records']:
        if record['eventName'] != "MODIFY":
            print('This record is not MODIFY event.')
            continue
        card_name = record['dynamodb']['NewImage']['URL']['S'].split('/')[-2]
        print('card_name: ' + card_name)
        prices_dict = record['dynamodb']['NewImage']['Prices']['M']
        #print('prices_dict: ', prices_dict)

        date_list = 
        price_list = 
        for key, value in prices_dict.items():
            date_list.append(key)
            price_list.append(int(value['N']))
        #print('date_list: ', date_list)
        #print('price_list: ', price_list)
        
        # Sort with date_list
        tmp_zip = zip(date_list, price_list)
        tmp_zip_sorted = sorted(tmp_zip, key=lambda x: x[0])
        #print('tmp_zip_sorted: ', tmp_zip_sorted)
        date_list_sorted, price_list_sorted = zip(*tmp_zip_sorted)

eventの中の日付データは順番がめちゃくちゃになっているので、sorted()を使って日付でソートしています。

mplfinanceでグラフの描画

mplfinanceは、DataFrameをちゃんと作ることが大事です。

DataFrameのカラム名でdate(datetime形式の日付)、Open, High, Low, Close(ローソク足の4要素)、およびVolume(取引高)を作っておきます。出来高をプロットしない場合、Volumeはダミー値でもOKです。

dateをindexに指定し(大事)、dateでソートして日付順に並び変えておきます。

Open, High, Low, CloseはintのリストでOKです。

日足を作るなら日ごとに、週足を作るなら週ごとのデータをappendしたリストにします。もちろん全てのカラムの要素数は同じにします。

 
import datetime
import pandas as pd
 
time_sr = pd.Series(date_list_sorted) # str日付のリスト, YYYY-MM-DD形式
time_sr2 = pd.to_datetime(time_sr, format='%Y-%m-%d')
df = pd.DataFrame({'date': time_sr2,
                   'Open': <Open値のリスト>,
                'High': <High値のリスト>,
                   'Low': <Low値のリスト>,
                   'Close': <Close値のリスト>,
                   'Volume': <Volume値のリスト>,
                   })
df = df.set_index('date')
df = df.sort_index()

DataFrameさえ用意できれば、mplfinanceで良い感じにプロットできます。

import mplfinance as mpf
 
mpf.plot(df, type='candle'mav=(525),
    show_nontrading=True,
    datetime_format='%Y/%m/%d',
    title=card_name,
    savefig=save_filename
)

・type=candleでローソク足表示になります。

(lineだと線グラフになります。今回はデータの最小時間分解能が日なので、日足を描画するためにはローソク足の4要素の情報が足りないため、lineにしました。)

・mav=(5, 25)で、5日(週)、25日(週)平均線が描画されます。

・show_nontrading=Trueで、データが無い区間(例えば土日で取引所が開いていない日など)を、省略せずに表示します。通常の日足グラフでは省略することが多いですが、今回はサーバーレスのシステムがダウンしてデータが欠損することも考慮して、省略しないようにしました。

・datetime_formatは、x軸のフォーマットです。デフォルトでは月が英語表示で少し違和感があったので変更しました。

・title=<str>で図の上部にタイトルを表示できます。

・savefigオプションでpathを指定すると画像が保存されます。

LambdaからS3への画像アップロード

S3へのアップロードは、boto3を使って「ロカールのファイルをアップロード」するのが簡単です。

しかし、Lambdaはインラインメモリの仕組みなので、「ローカルに保存」という概念がありません。

とはいえ抜け道はあって、Lambdaは「/tmp/」ディレクトリという一時保存できるフォルダが用意されています。

なので、(1)mplfinanceで/tmp/にpngを保存、(2)boto3でS3にアップロード、(3)/tmp/内部をクリア、という手順にします。

(3)が必要な理由は、/tmp/フォルダ内のデータは残るため、次回のLambda起動時に同じファイル名で上書き保存すると、パーミッションエラーとなるためです。Lambdaは起動ごとに、実行ユーザーが変わったり、変わらなったりするようです。なので、毎回/tmp/内部をクリアしておくのが無難です。

これを加味すると、mplfinanceの保存も含めて、以下のようにすればよいでしょう。

import boto3
import glob
import os
    
# Plot and save in local
card_name = 'Overgrown+Tomb'
save_dir = '/tmp/weekly_'
save_filename = save_dir + card_name + '.png'
mpf.plot(df, type='candle'mav=(525),
    show_nontrading=True,
    datetime_format='%Y/%m/%d',
    title=card_name,
    savefig=save_filename
)
print('Save in temporary local dir: ' + save_filename)

# Upload to S3
S3_bucket_name = 'highso.com'
s3 = boto3.resource('s3')
save_object_path = save_filename.replace(save_dir, 'png/weekly/')
s3.Bucket(S3_bucket_name).upload_file(save_filename, save_object_path)
print('Upload to S3 bucket: ' + S3_bucket_name + ', path: ' + save_object_path)

動作確認

DynamoDBを開き、試しに1レコードを手動で更新してみると、S3バケット「highso.com」のpng/dairy/フォルダとpng/weekly/フォルダ以下に画像がpngで保存されます。

f:id:highso:20210505002231p:plain

highso.comのindex.htmlで、これらの画像を表示するhtmlをてきとうに書いておきます。

S3の静的ホスティングのURL「http://highso.com.s3-website-ap-northeast-1.amazonaws.com/」で、以下のように表示されました!

f:id:highso:20210504230619j:plain

感想

サーバーレスでシステムを組むのは、AWSサービスの仕様を勉強しながら進めることになるので大変です。

一方で、一旦動くようになってしまえば、あとは放置しておいても(ほとんどお金もかからず)安定稼働するのが嬉しいです。

IAMの権限設定は、これまで色々やってきた経験のおかげで、ようやく慣れてきた感があります。

あとは、ウェブサイトのCSSデザインをなんとかしないとなぁ・・・。本業がIoT屋なので、サーバーサイドの開発ばかり好きで、フロントエンドは苦手です。

 

以上、メタラーまとんでした。

ではでは。

レアカード売り時お知らせ君の記事一覧

1. 「レアカード売り時お知らせ君」をクラウドモダンアーキテクチャで作った!AWS Lambda, DynamoDB, EventBridge

2. レアカード価格をmplfinanceで週足グラフにしてS3に保存するLambda関数(DynamoDB Streamトリガー)

過去のエンジニア作品

highso.hatenablog.com

highso.hatenablog.com

highso.hatenablog.com

highso.hatenablog.com

highso.hatenablog.com