DATAFLUCT Tech Blog

データ技術・データサイエンス・MLOps に関するトレンドを追いかけます

Metaflowでモデルの学習をpipeline化するまで

皆さんは「MLOps」について取り組んでいらっしゃるでしょうか。私は2018年頃からデータクレンジングや機械学習モデルの構築や運用をしてきましたが、当時の日本で私の耳にはMLOpsという言葉が入ってくることはありませんでした。
ただMLOpsの元となった「DevOps」については当時から認知していました。今日のMLOpsというのはDevOpsの考えや知見を機械学習分野に適応、進化させたものだと考えられています。
そんなMLOpsですが、厳密な定義はなく、私自身も専門的に取り組んだことがない分野です。今回はMLOpsを快適に実現するためのライブラリのMetaflowについて一緒に取り組んでいきましょう。

Metaflowとは

元々は、Netflixが社内で開発をしていたPython製のライブラリで、Netflix内の機械学習プロジェクトの発展や品質向上のために開発されていました。
現在ではgithubにコードが公開されてOSSとして誰でも使用、開発に参加することが出来ます。Metaflowを一言で説明すると、機械学習プロジェクトの開発サイクルをパイプラインとして定義することで、開発サイクルを快適にするためのライブラリです。

「開発サイクルを快適にすることが嬉しいの?」と思われる方もいるかもしれませんが、いざ機械学習モデルをサービスとして公開しようとすると非常に多くのステップが考えられます。

  • 学習データを集めてくる
  • 集めたデータをクレンジングし前処理を行う
  • モデルを学習or追加学習させて、精度を検証する
  • 最新のモデルをクラウドサービス等にリリースする

繰り返し…
Metaflowの公式HPにも多岐に渡る現在のデータサイエンティストの苦悩を表現した図が記載されています。良い味を出していますね…笑

引用元: https://docs.metaflow.org/introduction/why-metaflow

初回や数回だけならまだしも、サービスを運用し続けるためには日々、改善が求められるため、上記のステップを何度も何度も、繰り返し行う必要があります。この多くのステップを毎回、手作業で実施したり、管理されていない状況で行うのが大変そうだというのは想像に難しくありません。
この問題を解決しステップをパイプラインとして定義するためにMetaflowが開発されています。

Metaflowの強み

詳細は後ほどの「基本的な使い方」にて紹介しますが、Metaflowは各ステップをPythonのクラス内に定義した関数に「@step」というデコレーターを付与するだけで簡単にパイプラインを構築することが出来ます。
パイプラインを構築するために、Metaflowについて熟知してなくとも、通常のPythonのコードを書く感覚で使うことが出来ます。実装にTensorflow、scikit-learn、PyTorchなど何を使っていようがMetaflowの実装には影響はなく、好きなライブラリを用いることが可能です。

また、実行結果を拡張サービスにはなりますが、Metaflow UIという付属の可視化サービスで、確認することが出来る上に、実行結果の記録まで出来てしまいます。

github.com

Metaflowを検討するケース

簡単にパイプライン処理を構築可能なMetaflowですが、そもそもパイプラインの構築、すなわちMetaflowを使用すべきタイミングがいつなのでしょうか。過去に自分が機械学習モデルの運用に関わっていた時は、一連のステップをパイプライン化して、実行管理したいと思ったことはありませんでした。この問いに対する回答はMetaflowのHPに記載されています。

  1. Scalability: Do you need more than one laptop-size computer in the project?

    スケーラビリティ: プロジェクトに1台以上のラップトップサイズのコンピュータが必要か

  2. Criticality: Is it important that results are produced correctly and in a timely manner?

    重要性: 結果を正しくタイムリーに提供することが重要か

  3. Complexity: Does the project have many moving pieces or many people working together?

    複雑さ: プロジェクトに多くの未確定な要素(事項)があるか、多くの人が一緒に働いているか

上記3つの質問のうち、1つでも「はい」と答えた方はMetaflowが効力を発揮するようです。私は3つ目の質問にだけ明確に「はい」と答えることが出来ました。中小規模のプロジェクトの場合、多くの方は3つ目の質問が当てはまるのではないでしょうか。

他パイプラインツールとの比較

Metaflow意外にもパイプラインを構築することが出来るツールはいくつか存在してます。有名どころとしては以下の名前が上がります。

Metaflowの他パイプラインツールと比べると、大規模すぎず、小規模すぎずと中規模の立ち位置にいます。分散機能やUIでの結果確認、クラウドサービス(AWS)との連携が可能など、近年、多くのパイプラインツールが提供している機能がMetaflowでも提供されています。
また、機能1つ1つはシンプルでUI機能(Metaflow UI)は外部パッケージ化されているなど、Metaflowに必要以上に多くの機能を詰め込まないという意図を感じます。これまでパイプラインツールを全く触ったことがない私でも、Metaflowで簡単にパイプラインを構築することが出来ました。

基本的な使い方

MetaflowではパイプラインをFlowという単位で作成します。
そしてFlowを構成する1つ1つの要素であるStepを作成します。コード上では任意のクラスにMetaflowのFlowクラスを継承させて、各Stepには@stepというデコレーターを定義します。
さっそく例を見てみましょう。今回は簡単のため、2つのステップ、それぞれがstep1, 2と標準出力するだけの簡単なパイプラインです。MetaflowでFlowを作成する際には、「start」と「end」という名前の関数が必ず必要になりますので、ご注意ください。

また、Metaflowのインストールはpip経由で行ってください。

pip install metaflow

パイプラインの構築には各関数の最終行で「self.next(次に実行したい関数)」と記述します。今回の場合はstart関数から始まって以下のようになります。

  • start関数: self.next(self.step1)
  • step1関数: self.next(self.step2)
  • step2関数: self.next(self.end)
  • end関数: なし

sample.py

from metaflow import FlowSpec, step

# FlowSpec
class SampleFlow(FlowSpec):
  """
    はじめてのMetaflow
    2つのステップを順番に実行します
  """

  @step
  def start(self):
    """
      startです
    """
    print("start metaflow pipeline")
    self.next(self.step1)

  @step
  def step1(self):
    """
      step1です
    """
    print("step1です")
    self.next(self.step2)

  @step
  def step2(self):
    """
    step2です
    """
    print("step2です")
    self.next(self.end)

  @step
  def end(self):
    """
      endです
    """
    print("all step done✨")


if __name__=="__main__":
  SampleFlow()

これで簡易なパイプラインの作成が完了しました。
作成したパイプラインの詳細は「python ファイル名 show」で。実行は「python ファイル名 run」で行うことが出来ます。

$ python sample.py show
Metaflow 2.6.3 executing SampleFlow for user:okb

はじめてのMetaflow
2つのステップを順番に実行します

Step start
    startです
    => step1

Step step1
    step1です
    => step2

Step step2
    step2です
    => end

Step end
    endです

結果には各Stepが表示され、それぞれに記載した関数内ドキュメントが出力されます。
もっと丁寧にコメントを記載すれば、各Stepが何をするものなのかをコードを見ずに「show」を実行するだけで確認することが出来ます。
この時点では、コードの実施はされていないので、次は「run」を実行してみます。

$ python sample.py run
Metaflow 2.6.3 executing SampleFlow for user:okb
Validating your flow...
    The graph looks good!
Running pylint...
    Pylint is happy!
2022-07-03 11:53:55.166 Workflow starting (run-id 1656816835161994):
2022-07-03 11:53:55.169 [1656816835161994/start/1 (pid 46322)] Task is starting.
2022-07-03 11:53:55.361 [1656816835161994/start/1 (pid 46322)] start metaflow pipeline
2022-07-03 11:53:55.394 [1656816835161994/start/1 (pid 46322)] Task finished successfully.
2022-07-03 11:53:55.398 [1656816835161994/step1/2 (pid 46325)] Task is starting.
2022-07-03 11:53:55.590 [1656816835161994/step1/2 (pid 46325)] step1です
2022-07-03 11:53:55.623 [1656816835161994/step1/2 (pid 46325)] Task finished successfully.
2022-07-03 11:53:55.628 [1656816835161994/step2/3 (pid 46328)] Task is starting.
2022-07-03 11:53:55.840 [1656816835161994/step2/3 (pid 46328)] step2です
2022-07-03 11:53:55.887 [1656816835161994/step2/3 (pid 46328)] Task finished successfully.
2022-07-03 11:53:55.891 [1656816835161994/end/4 (pid 46331)] Task is starting.
2022-07-03 11:53:56.225 [1656816835161994/end/4 (pid 46331)] all step done✨
2022-07-03 11:53:56.315 [1656816835161994/end/4 (pid 46331)] Task finished successfully.
2022-07-03 11:53:56.316 Done!

まず、作成されたMetaflowのパイプラインがグラフとして適切かどうかを確認します。
今回はstep1 → step2という簡単なパイプラインですが、ケースによってはStepを分岐をさせたり、ループさせたりと自由にグラフを組むことが可能なため、始点と終点が設定されているか、すなわちグラフとして適切かを確認しています。
パイプラインのチェックの後にPylintが実行され、コードチェックが走ります。ここまで全て通過して初めて作成したパイプラインが実施されます。
さきほど、各Stepに記載したprintが実行されて「step1です」…「all step done✨」が表示されていることが確認出来ました。

MetaflowUIについて

拡張サービスであるMetaflowUIを用いることで、実行結果を専属のUIで確認することが出来ます。Metaflowのインストールとは別にMetaflowServiceとMetaflowUIの用意が必要になります。それぞれ、データベースとの接続や実行結果の記録、UIへの出力を行っているようです。
MetaflowServiceとMetaflowUIの用意は手順が多いため今回は省略して、後日、別の記事で紹介しようと思います。
MetaflowUIについてはデモが公開されているので、こちらを用いて簡単に解説します。

トップページには作成されているFlowの一覧が表示され、それぞれ最終実行時のメタ情報が表示されています。

試しに一番上に表示されている「WhyIsThisFlowShow」を開いてみます。

Flowに定義されている各Stepの実行時間が記録されています。さらに、各StepをクリックすることでStepの実行中に出力されたログを確認することが出来ます。

パイプライン化するフロー

では、Metaflowの紹介はここまでにして、いざパイプラインの構築を進めていきます。
今回、パイプライン化を行うのは、Amazonのレビュー情報に適当な前処理を実施後、単語情報を数値ベクトルに変換(fasttext)して、近しい単語を5件出力させる処理です。

使用するモデル

当時、facebook(現在はMeta)が作成したword2vecを改良し、高速化したモデルのfast textを使用します。モデルの詳細については今回の目的から逸れるため、割愛しますが、先代のword2vecと比べると圧倒的な速度で学習をすることが可能で、標準マルチコアCPUであっても10億もの単語を10分以内に学習が出来たという記載がされています。

We have seen results of models trained on more than 1 billion words in less than 10 minutes using a standard multicore CPU

引用元: https://research.facebook.com/blog/2016/08/fasttext/

word2vecでも実現可能ですが、今回は学習の高速化のためfasttextを採用しました。

使用するデータ

hugging face というOSSにて公開されている日本語のAmazonレビューのデータセットを使用します。
非常に多くのデータセットが公開されており、hugging faceをインストールするだけで、簡単にデータを読み込むことが出来るという優れものです。

huggingface.co

このデータセットではAmazonのさまざまなカテゴリーに投稿された日本語のレビューが20万件、登録されています。今回、メインで使用するのは「review_body」です。

  • review_id: レビューに紐づくユニークなID
  • product_id: 出品に紐づくユニークなID
  • reviewer_id: レビュワーに紐づくユニークなID
  • stars: 参考になったの件数
  • review_body: レビューの本文
  • review_title: レビューのタイトル
  • language: 言語(今回は全て日本語なのでja)
  • product_category: Amazon上でのカテゴリー(eg: video_games)

前処理の内容について

レビュー本文のままではfasttextに単語を埋め込むことが難しいので以下の前処理を行います。特別、特殊な前処理は行わずに自然言語の前処理としてポピュラーなものを採用しています。

  • テキストを形態素解析して単語に分割する
  • 分割した単語から「名詞、動詞、形容詞」のみを取り出す
  • 不要な単語の除去(記号や数値、絵文字など)
  • ストップワード(eg: なに)の除去。ストップワードはこちらのサイトにあるものを使用します

Metaflowでパイプラインを作る

任意のディレクトリの中に「my_flow.py」というファイルを用意して、パイプラインを作っていきます。現時点ではmy_flow.pyは以下にしておきます。まだ、Metaflowを読み込んでいるだけの状態です。
併せて仮想環境も作成しておきます。グローバルにインストールされても問題ない方は作成不要です。

仮想環境の作成

$ python3 -m venv enviroment
$ source enviroment/bin/acativate
$ (enviroment) ....

my_flow.py

from metaflow import FlowSpec, step

データの読み込み

まずはhugging faceのdatasetからAmazonのレビュー結果の読み込みを行います。
データを使用するためにdatasetをpip経由でインストールする必要があり、ここで合わせて、今回必要となるライブラリも合わせてインストールしてしまいます。

M1 macの場合には「mecab-python3」と「gensim」のインストールに失敗します。Dockerを使用する等でインストールできない問題を回避することが出来ます。

pythonのバージョン

$ python --version
Python 3.10.5

requirements.txt

metaflow==2.7.2
pandas==1.4.2
numpy==1.22.4
ipadic==1.0.0
gensim==4.2.0
mecab-python3==1.0.5
datasets==2.3.2
unidic-lite==1.0.8

インストールを実行します。

pip install -r requirements.txt

続いて、インストールしたdatasetからデータを読み込みます。データの読み込みには「load_dataset」という関数を用います。
今回は日本語のAmazonレビューのデータセットを読み込むため、第1引数に「amazon_reviews_multi」を指定し、第2引数に「ja」を指定します。また「split」という引数にtrainかtestを与えることでライブラリ側でデータセットを学習用とテスト用で、分割をしてくれます。

from datasets import load_dataset

dataset = load_dataset('amazon_reviews_multi', 'ja', split='train')
len(dataset) # 200000

データの読み込みが確認出来たので、さっそくFlowに組み込んでみます。
データの読み込みはFlowの開始時に読み込ませたいので、「start」関数の中で実行させるようにします。合わせて、ストップワードも読み込むようにします。ストップワードはこちらにある一覧を「stop_words.txt」に保存してテキストから読み込むようにします。

my_flow.py

from metaflow import FlowSpec, step
from datasets import load_dataset
import pandas as pd
import os

def load_stop_words(filename):
    pwd = os.path.join(os.path.dirname(__file__))
    filepath = os.path.join(pwd, filename)
    with open(filepath) as f:
        return f.read().split("\n")

class MyFlow(FlowSpec):
    """
    Metaflowを用いて、fasttextを学習させます。
    学習データにはhagging faceのdatasetで公開されている20万件のAmazonのレビューを使用します。
    前処理として以下を実施します。
    - 不要な単語の除去(記号や数値、絵文字など)
    - テキストを形態素解析して単語に分割して「名詞、動詞、形容詞」のみを取り出す
    - ストップワード(eg: なに)の除去。ストップワードはこちらのサイトにあるものを使用します
    """

    @step
    def start(self):
        """
        最初にデータの読み込みを行います。
        - hugging faceのdatasetからアマゾンレビューを読み込む
        - stop_wordsをstop_words.txtから読み込む
        """
        self.stop_words = set(load_stop_words('stop_words.txt')) # 判定を楽にするためにsetに変換
        self.dataset = load_dataset('amazon_reviews_multi', 'ja', split='train')
        self.df_dataset = pd.DataFrame(self.dataset)

        print(f"::: ストップワードの読み込みが完了しました(データ数={len(self.stop_words)})")
        print(f"::: データの読み込みが完了しました(データ数={len(self.dataset)})")
        self.next(self.end)

    @step
    def end(self):
        """
        全てのステップが完了後に実行します。
        """
        pass


if __name__ == "__main__":
    MyFlow()

これでデータ読み込みのstartの記述が完了しました。さっそく「show」で内容を確認後、「run」で実行してみます。なお、初回の実行時にはデータのダウンロードがされるため、時間がかかります。どちらも結果が長いので、部分的に割愛しています。
補足ですが、Docker等での実行をする場合に「$USERNAME」という環境変数が設定されていないと以下のエラーが出てしまいます。設定されていない場合は「$ export USERNAME='ユーザーネーム’」で設定を行ってください。

>> Metaflow 2.7.2 executing MyFlow Unknown user:
>> Metaflow could not determine your user name based on environment variables ($USERNAME etc.)

$ python my_flow.py show

Metaflow 2.6.3 executing MyFlow for user: okb

Metaflowを用いて、fasttextを学習させます。
学習データにはhagging faceのdatasetで公開されている20万件のAmazonのレビューを使用します。
前処理として以下を実施します。
:

$ python my_flow.py run

Metaflow 2.6.3 executing MyFlow for okb
Validating your flow...
    The graph looks good!
Running pylint...
    Pylint is happy!
2022-07-15 09:34:19.844 Workflow starting (run-id 1657845259841081):
2022-07-15 09:34:19.848 [1657845259841081/start/1 (pid 78640)] Task is starting.
2022-07-15 09:34:21.594 [1657845259841081/start/1 (pid 78640)] Reusing dataset amazon_reviews_multi (/Users/takamizawa46/.cache/huggingface/datasets/amazon_reviews_multi/ja/1.0.0/724e94f4b0c6c405ce7e476a6c5ef4f87db30799ad49f765094cf9770e0f7609)
2022-07-15 09:34:28.544 [1657845259841081/start/1 (pid 78640)] ::: ストップワードの読み込みが完了しました(データ数=311
2022-07-15 09:34:29.092 [1657845259841081/start/1 (pid 78640)] ::: データの読み込みが完了しました(データ数=200000
2022-07-15 09:34:29.094 [1657845259841081/start/1 (pid 78640)] Task finished successfully.
2022-07-15 09:34:29.100 [1657845259841081/end/2 (pid 78645)] Task is starting.
2022-07-15 09:34:29.731 [1657845259841081/end/2 (pid 78645)] Task finished successfully.
2022-07-15 09:34:29.732 Done!

無事に実行されました!

前処理の実施

次に前処理を行なっていきます。今回、行う前処理は改めて以下になります。

  • exec_mecab: テキストを形態素解析して単語に分割して「名詞、動詞、形容詞」のみを取り出す
  • remove_stop_words: ストップワード(eg: なに)の除去。ストップワードはこちらのサイトにあるものを使用します
  • remove_numeric_and_symbol: 不要な単語の除去(記号と数値)

それぞれの処理はstartで読み込んだデータセット(self.df_dataset)に対して行なっていきます。また、先ほどはstartからendにstepを進めるように(self.next(self.end))していましたが、前処理を行うため、stepの順を以下のように変更します。

  • 1.start
  • 2.exec_mecab
  • 3.remove_stop_words
  • 4.remove_numeric_and_symbol
  • 5.end

また、前処理の結果が想定通りに動いているか簡単に確認するために、ランダムで何行か取得して各stepでの実行結果を確認するための「sample_logging」という処理を追加しています。各stepで3件、データを取得して結果を確認するようにログを出力させます。

from metaflow import FlowSpec, step
from datasets import load_dataset
from gensim.models import FastText
import pandas as pd
import os
import MeCab
import ipadic
import re
from pprint import pprint

def load_stop_words(filename):
    pwd = os.path.join(os.path.dirname(__file__))
    filepath = os.path.join(pwd, filename)
    with open(filepath) as f:
        return f.read().split("\n")


class MyFlow(FlowSpec):
    """
    Metaflowを用いて、fasttextを学習させます。
    学習データにはhagging faceのdatasetで公開されている20万件のAmazonのレビューを使用します。
    前処理として以下を実施します。
    - テキストを形態素解析して単語に分割して「名詞、動詞、形容詞」のみを取り出す
    - ストップワード(eg: なに)の除去。ストップワードはこちらのサイトにあるものを使用します
    - 不要な単語の除去(記号や数値)
    """

    def sample_logging(self, df, column, top_n):
        print(':::結果サンプル')
        for n in range(top_n):
            val = df.sample()[column].values[0]
            print(f":::サンプル({n + 1}/{top_n})> {val[:5]}...")

    @step
    def start(self):
        """
        最初にデータの読み込みを行います。
        - hugging faceのdatasetからアマゾンレビューを読み込む
        - stop_wordsをstop_words.txtから読み込む
        """
        self.stop_words = set(load_stop_words('stop_words.txt'))
        self.dataset = load_dataset('amazon_reviews_multi', 'ja', split='train')
        self.df_dataset = pd.DataFrame(self.dataset)

        print(f"::: ストップワードの読み込みが完了しました(データ数={len(self.stop_words)})")
        print(f"::: データの読み込みが完了しました(データ数={len(self.dataset)})")
        self.next(self.exec_mecab)

    @step
    def exec_mecab(self):
        """
        Mecabを使用してデータセットを形態素解析を行います。
        - 形態素に分割
        - 名詞, 動詞, 形容詞に該当する単語のみに絞り込む
        - applyを使用してparsed_dfに結果を記録する
        """
        NEEDS_PART_OF_SPEECH =  ['名詞', '動詞', '形容詞']
        tagger = MeCab.Tagger(f"-Ochasen -d {ipadic.MECAB_ARGS}") # 辞書を読み込んでMecabのインスタンスを作成
        def extract_words(text):
            words = []
            result = tagger.parseToNode(text)
            while result:
                splited = result.feature.split(',')
                if splited[0] in NEEDS_PART_OF_SPEECH: # 解析の結果、単語が名詞, 動詞, 形容詞であればリストに追加
                    if splited[6] == '*':
                        words.append(result.surface) # 活用系の原型に変換
                    else:
                        words.append(splited[6])
                result = result.next
            return list(set(words))

        self.df_dataset['parsed_df'] = self.df_dataset.review_body.apply(lambda row: extract_words(row))
        self.sample_logging(self.df_dataset, 'parsed_df', 3)
        self.next(self.remove_stop_words)

    @step
    def remove_stop_words(self):
        """
        形態素解析した単語の一覧からストップワードを除去します
        """
        def _remove_stop_words(words):
            return list(set(words) - self.stop_words) # 集合の差分で重複した単語を除去

        self.df_dataset['remove_stopwords'] = self.df_dataset.parsed_df.apply(lambda row: _remove_stop_words(row))
        self.sample_logging(self.df_dataset, 'remove_stopwords', 3)
        self.next(self.remove_numeric_and_symbol)

    @step
    def remove_numeric_and_symbol(self):
        """
        単語の一覧から記号と数値を除去します
        - 数値: string.isnumeric()に該当するものを除去
        - 記号: unicodeで'[\u3000-\u303F]+'に該当するものを除去(日本語記号のほとんど)
        """
        regex = re.compile('[\u3000-\u303F]+') # unicodeで記号に該当するものを除去
        def _remove_numeric_and_symbol(words):
            return [ word for word in words if not word.isnumeric() and not regex.match(word) ]

        self.df_dataset['remove_numeric_and_symbol'] = self.df_dataset.remove_stopwords.apply(lambda row: _remove_numeric_and_symbol(row))
        self.sample_logging(self.df_dataset, 'remove_numeric_and_symbol', 3)
        self.next(self.end)

    @step
    def end(self):
        """
        全てのステップが完了後に実行します。
        """
        pass


if __name__ == "__main__":
    MyFlow()

なお、各前処理についての解説は簡単のため、今回は省略させて頂きます。各前処理にコメントを添えているので、そちらをご参照ください。前処理の追加が完了したので「show → run」で結果を確認してみます。

_$ python my_flow.py show

Metaflow 2.7.2 executing MyFlow for user:okb

Metaflowを用いて、fasttextを学習させます。
学習データにはhagging faceのdatasetで公開されている20万件のAmazonのレビューを使用します。
前処理として以下を実施します。
- テキストを形態素解析して単語に分割して「名詞、動詞、形容詞」のみを取り出す
- ストップワード(eg: なに)の除去。ストップワードはこちらのサイトにあるものを使用します
- 不要な単語の除去(記号や数値)

Step start
    最初にデータの読み込みを行います。
    - hugging faceのdatasetからアマゾンレビューを読み込む
    - stop_wordsをstop_words.txtから読み込む
    => exec_mecab

Step exec_mecab
    Mecabを使用してデータセットを形態素解析を行います。
    - 形態素に分割
    - 名詞, 動詞, 形容詞に該当する単語のみに絞り込む
    - applyを使用してparsed_dfに結果を記録する
    => remove_stop_words

Step remove_stop_words
        """
        単語の一覧から記号と数値を除去します
        - 数値: string.isnumeric()に該当するものを除去
        - 記号: unicodeで'[\u3000-\u303F]+'に該当するものを除去(日本語記号のほとんど)
        """
    形態素解析した単語の一覧からストップワードを除去します
    => remove_numeric_and_symbol

Step remove_numeric_and_symbol
    ?
    => end

Step end
    全てのステップが完了後に実行します。

$ python my_flow.py run

Metaflow 2.7.2 executing MyFlow for user:okb
Validating your flow...
    The graph looks good!
Running pylint...
    Pylint is happy!
2022-07-18 18:00:31.620 Workflow starting (run-id 1658134831512543):
2022-07-18 18:00:31.669 [1658134831512543/start/1 (pid 7526)] Task is starting.
2022-07-18 18:00:33.747 [1658134831512543/start/1 (pid 7526)] Reusing dataset amazon_reviews_multi (/home/jovyan/.cache/huggingface/datasets/amazon_reviews_multi/ja/1.0.0/724e94f4b0c6c405ce7e476a6c5ef4f87db30799ad49f765094cf9770e0f7609)
2022-07-18 18:00:45.810 [1658134831512543/start/1 (pid 7526)] ::: ストップワードの読み込みが完了しました(データ数=319)
2022-07-18 18:00:46.716 [1658134831512543/start/1 (pid 7526)] ::: データの読み込みが完了しました(データ数=200000)
2022-07-18 18:00:46.740 [1658134831512543/start/1 (pid 7526)] Task finished successfully.
2022-07-18 18:00:46.850 [1658134831512543/exec_mecab/2 (pid 7544)] Task is starting.
2022-07-18 18:00:59.694 [1658134831512543/exec_mecab/2 (pid 7544)] :::結果サンプル
2022-07-18 18:00:59.735 [1658134831512543/exec_mecab/2 (pid 7544)] :::サンプル(1/3)> ['良い', '白い', 'なる', 'おしろい', '伸び']...
2022-07-18 18:00:59.738 [1658134831512543/exec_mecab/2 (pid 7544)] :::サンプル(2/3)> ['楽', 'よろこぶ', 'みる', '言う', 'みたい']...
2022-07-18 18:00:59.740 [1658134831512543/exec_mecab/2 (pid 7544)] :::サンプル(3/3)> ['する', '動く', '充電', 'なる', 'ヶ月']...
2022-07-18 18:01:05.791 [1658134831512543/exec_mecab/2 (pid 7544)] Task finished successfully.
2022-07-18 18:01:05.865 [1658134831512543/remove_stop_words/3 (pid 7562)] Task is starting.
2022-07-18 18:01:10.472 [1658134831512543/remove_stop_words/3 (pid 7562)] :::結果サンプル
2022-07-18 18:01:10.518 [1658134831512543/remove_stop_words/3 (pid 7562)] :::サンプル(1/3)> ['全体', 'ケーブル', '満足', '扱う', '固い']...
2022-07-18 18:01:10.521 [1658134831512543/remove_stop_words/3 (pid 7562)] :::サンプル(2/3)> ['沸かす', '便利', 'お気に入り', '朝', '温かい']...
2022-07-18 18:01:10.523 [1658134831512543/remove_stop_words/3 (pid 7562)] :::サンプル(3/3)> ['デスク', 'タブレット', '周り', '圧', '収納']...
2022-07-18 18:01:18.257 [1658134831512543/remove_stop_words/3 (pid 7562)] Task finished successfully.
2022-07-18 18:01:18.313 [1658134831512543/remove_numeric_and_symbol/4 (pid 7582)] Task is starting.
2022-07-18 18:01:23.097 [1658134831512543/remove_numeric_and_symbol/4 (pid 7582)] :::結果サンプル
2022-07-18 18:01:23.147 [1658134831512543/remove_numeric_and_symbol/4 (pid 7582)] :::サンプル(1/3)> ['にくい', '取り外し', 'くっつく', '使える', '出来る']...
2022-07-18 18:01:23.150 [1658134831512543/remove_numeric_and_symbol/4 (pid 7582)] :::サンプル(2/3)> ['甘い', '感じる', '値段', '高い', '味']...
2022-07-18 18:01:23.152 [1658134831512543/remove_numeric_and_symbol/4 (pid 7582)] :::サンプル(3/3)> ['食べる', '残念', 'カリカリ', '言える', '買う']...
2022-07-18 18:01:30.455 [1658134831512543/remove_numeric_and_symbol/4 (pid 7582)] Task finished successfully.
2022-07-18 18:01:30.509 [1658134831512543/end/5 (pid 7601)] Task is starting.
2022-07-18 18:01:31.632 [1658134831512543/end/5 (pid 7601)] Task finished successfully.
2022-07-18 18:01:31.658 Done!

無事に全てのstepを通過しました。
結果を見てみると、まず形態素解析が問題なく行われたことが分かります。ただし、辞書の活用とストップワード、数値や記号の除去はログからは正しく行われているのか、判別が難しいですが、ひとまずFlowが完遂したので、次に進みたいと思います。

学習と検証

前処理が完了したので、残りは学習を行なって、結果の確認をするのみです。
今回はgensimのfasttextを使うため、以下の3行で学習が可能です。またfasttextを使うため、非常に高速で学習を終えることが出来ます。

texts = ['大根', '人参', '那須', 'きゅうり'] # example

# パラメータに関してはgensimのfasttextのdocumentを参考
# https://radimrehurek.com/gensim/models/fasttext.html
model = FastText(
  vector_size=4, # 単語ベクトルの次元数
  window=3, # 単語と予測される単語との距離の最大値
  min_count=1 # 登場回数が1回以下であれば無視
)
model.build_vocab(corpus_iterable=texts)
model.train(corpus_iterable=texts, total_examples=len(texts), epochs=10)

学習結果の確認は任意の単語を与え、近しい単語を上位5件出力させることで行うことにします。

match = self.model.wv.most_similar(word, topn=5)

先程の前処理、「remove_numeric_and_symbol」に続いて学習の実行と、学習結果の確認の2つをstepに組み込みます。

  • 5.train
  • 6.verification
  • 7.end

これで全体のコードが出揃いました。

from metaflow import FlowSpec, step
from datasets import load_dataset
from gensim.models import FastText
import pandas as pd
import os
import MeCab
import ipadic
import re
from pprint import pprint

def load_stop_words(filename):
    pwd = os.path.join(os.path.dirname(__file__))
    filepath = os.path.join(pwd, filename)
    with open(filepath) as f:
        return f.read().split("\n")


class MyFlow(FlowSpec):
    """
    Metaflowを用いて、fasttextを学習させます。
    学習データにはhagging faceのdatasetで公開されている20万件のAmazonのレビューを使用します。
    前処理として以下を実施します。
    - テキストを形態素解析して単語に分割して「名詞、動詞、形容詞」のみを取り出す
    - ストップワード(eg: なに)の除去。ストップワードはこちらのサイトにあるものを使用します
    - 不要な単語の除去(記号や数値)
    """

    def sample_logging(self, df, column, top_n):
        print(':::結果サンプル')
        for n in range(top_n):
            val = df.sample()[column].values[0]
            print(f":::サンプル({n + 1}/{top_n})> {val[:5]}...")

    @step
    def start(self):
        """
        最初にデータの読み込みを行います。
        - hugging faceのdatasetからアマゾンレビューを読み込む
        - stop_wordsをstop_words.txtから読み込む
        """
        self.stop_words = set(load_stop_words('stop_words.txt'))
        self.dataset = load_dataset('amazon_reviews_multi', 'ja', split='train')
        self.df_dataset = pd.DataFrame(self.dataset)

        print(f"::: ストップワードの読み込みが完了しました(データ数={len(self.stop_words)})")
        print(f"::: データの読み込みが完了しました(データ数={len(self.dataset)})")
        self.next(self.exec_mecab)

    @step
    def exec_mecab(self):
        """
        Mecabを使用してデータセットを形態素解析を行います。
        - 形態素に分割
        - 名詞, 動詞, 形容詞に該当する単語のみに絞り込む
        - applyを使用してparsed_dfに結果を記録する
        """
        NEEDS_PART_OF_SPEECH =  ['名詞', '動詞', '形容詞']
        tagger = MeCab.Tagger(f"-Ochasen -d {ipadic.MECAB_ARGS}") # 辞書を読み込んでMecabのインスタンスを作成
        def extract_words(text):
            words = []
            result = tagger.parseToNode(text)
            while result:
                splited = result.feature.split(',')
                if splited[0] in NEEDS_PART_OF_SPEECH: # 解析の結果、単語が名詞, 動詞, 形容詞であればリストに追加
                    if splited[6] == '*':
                        words.append(result.surface) # 活用系の原型に変換
                    else:
                        words.append(splited[6])
                result = result.next
            return list(set(words))

        self.df_dataset['parsed_df'] = self.df_dataset.review_body.apply(lambda row: extract_words(row))
        self.sample_logging(self.df_dataset, 'parsed_df', 3)
        self.next(self.remove_stop_words)

    @step
    def remove_stop_words(self):
        """
        形態素解析した単語の一覧からストップワードを除去します
        """
        def _remove_stop_words(words):
            return list(set(words) - self.stop_words) # 集合の差分で重複した単語を除去

        self.df_dataset['remove_stopwords'] = self.df_dataset.parsed_df.apply(lambda row: _remove_stop_words(row))
        self.sample_logging(self.df_dataset, 'remove_stopwords', 3)
        self.next(self.remove_numeric_and_symbol)

    @step
    def remove_numeric_and_symbol(self):
        """
        単語の一覧から記号と数値を除去します
        - 数値: string.isnumeric()に該当するものを除去
        - 記号: unicodeで'[\u3000-\u303F]+'に該当するものを除去(日本語記号のほとんど)
        """
        regex = re.compile('[\u3000-\u303F]+') # unicodeで記号に該当するものを除去
        def _remove_numeric_and_symbol(words):
            return [ word for word in words if not word.isnumeric() and not regex.match(word) ]

        self.df_dataset['remove_numeric_and_symbol'] = self.df_dataset.remove_stopwords.apply(lambda row: _remove_numeric_and_symbol(row))
        self.sample_logging(self.df_dataset, 'remove_numeric_and_symbol', 3)
        self.next(self.train)

    @step
    def train(self):
        """
        前処理したテキストを用いてfasttextの学習を行う
        - vector_size=50
        - window=3
        - min_count=1
        """
        print('::: fasttextの学習を開始します')
        texts = self.df_dataset['remove_numeric_and_symbol']
        self.model = FastText(vector_size=50, window=3, min_count=1)
        self.model.build_vocab(corpus_iterable=texts)
        self.model.train(corpus_iterable=texts, total_examples=len(texts), epochs=10)

        print('::: fasttextの学習が完了しました')
        self.next(self.verification)

    @step
    def verification(self):
        """
        学習したfasttextを用いて任意の単語から近しい単語を5件出力する
        - df_datasetから1件、ランダムに前処理後の単語一覧を取得
        - 上から5つの単語を取得して、それぞれ近しい単語を5件出力させる
        """
        print(':::学習結果')
        selected = self.df_dataset.sample().remove_numeric_and_symbol.values[0][0:5]
        for word in selected:
            match = self.model.wv.most_similar(word, topn=5)
            print(f":::単語->{word}")
            pprint(match)

        self.next(self.end)

    @step
    def end(self):
        """
        全てのステップが完了後に実行します。
        """
        pass


if __name__ == "__main__":
    MyFlow()

$ python my_flow.py show

Metaflow 2.7.2 executing MyFlow for user:okb
:
Step remove_numeric_and_symbol
    :
    => train

Step train
    前処理したテキストを用いてfasttextの学習を行う
    - vector_size=50
    - window=3
    - min_count=1
    => verification

Step verification
    学習したfasttextを用いて任意の単語から近しい単語を5件出力する
    - df_datasetから1件、ランダムに前処理後の単語一覧を取得
    - 上から5つの単語を取得して、それぞれ近しい単語を5件出力させる
    => end

Step end
    全てのステップが完了後に実行します。

$ python my_flow.py run

Metaflow 2.7.2 executing MyFlow for user:okb
:
2022-07-18 19:10:04.454 [1658138943074180/train/5 (pid 10449)] Task is starting.
2022-07-18 19:10:05.343 [1658138943074180/train/5 (pid 10449)] ::: fasttextの学習を開始します
2022-07-18 19:10:34.197 [1658138943074180/train/5 (pid 10449)] ::: fasttextの学習が完了しました
2022-07-18 19:11:11.368 [1658138943074180/train/5 (pid 10449)] Task finished successfully.
2022-07-18 19:11:11.474 [1658138943074180/verification/6 (pid 10535)] Task is starting.
2022-07-18 19:11:12.861 [1658138943074180/verification/6 (pid 10535)] :::学習結果
2022-07-18 19:11:32.282 [1658138943074180/verification/6 (pid 10535)] :::単語->きれい
2022-07-18 19:12:08.313 [1658138943074180/verification/6 (pid 10535)] [('手垢', 0.8051472902297974),
2022-07-18 19:12:08.323 [1658138943074180/verification/6 (pid 10535)] ('黄ばむ', 0.7989108562469482),
2022-07-18 19:12:08.323 [1658138943074180/verification/6 (pid 10535)] ('糊', 0.7932964563369751),
2022-07-18 19:12:08.323 [1658138943074180/verification/6 (pid 10535)] ('拭く', 0.7872304916381836),
2022-07-18 19:12:08.324 [1658138943074180/verification/6 (pid 10535)] ('後処理', 0.7870801091194153)]
2022-07-18 19:12:08.324 [1658138943074180/verification/6 (pid 10535)] :::単語->マトメージュ
2022-07-18 19:12:08.324 [1658138943074180/verification/6 (pid 10535)] [('コロリアージュ', 0.962466299533844),
2022-07-18 19:12:08.325 [1658138943074180/verification/6 (pid 10535)] ('ルポルタージュ', 0.9411123394966125),
2022-07-18 19:12:08.325 [1658138943074180/verification/6 (pid 10535)] ('リネージュ', 0.9188073873519897),
2022-07-18 19:12:08.326 [1658138943074180/verification/6 (pid 10535)] ('コラージュ', 0.905333936214447),
2022-07-18 19:12:08.326 [1658138943074180/verification/6 (pid 10535)] ('コサージュ', 0.8862292766571045)]
2022-07-18 19:12:08.326 [1658138943074180/verification/6 (pid 10535)] :::単語->クセ
2022-07-18 19:12:08.326 [1658138943074180/verification/6 (pid 10535)] [('イカ', 0.8742212653160095),
2022-07-18 19:12:08.326 [1658138943074180/verification/6 (pid 10535)] ('がんばる', 0.8731144070625305),
2022-07-18 19:12:08.326 [1658138943074180/verification/6 (pid 10535)] ('刺激', 0.8662601113319397),
2022-07-18 19:12:08.326 [1658138943074180/verification/6 (pid 10535)] ('症', 0.8616589903831482),
2022-07-18 19:12:08.326 [1658138943074180/verification/6 (pid 10535)] ('寝起き', 0.8578631281852722)]
:
2022-07-18 19:12:08.355 [1658138943074180/verification/6 (pid 10535)] Task finished successfully.
2022-07-18 19:12:08.427 [1658138943074180/end/7 (pid 10581)] Task is starting.
2022-07-18 19:12:10.302 [1658138943074180/end/7 (pid 10581)] Task finished successfully.
2022-07-18 19:12:10.312 Done!

学習の実行と出力結果の確認まで行えました。
ランダムに出力された結果を見てみると「きれい」に対して「黄ばむ, 糊, 拭く, 後処理」と言われてみると近そう…な単語が列挙されていることが分かります。
その一方で、「クセ」に対して「イカ, がんばる, 刺激…」と微妙な結果も出力されています。
とはいえ、これでMetaflowを使ったパイプラインの構築が完了しました。

以降は「run」と入力するだけで、学習データの読み込みから前処理、学習の実行、学習結果の確認までが完了するので、気軽に様々なパラメーターにしたり、前処理を追加、削除したりといった検証が行うことが出来ます。
また、今回は行いませんでしたが、クラウド環境へのデプロイや、データドリフトの検知、追加データのアップロードやダウンロードなど、ユースケースに合わせて様々なstepを組み込むことがMetaflowでは可能です。

まとめ

いかがでしたでしょうか。
今回はMLOps初心者の私がMetaflowを使って学習までのサイクルをFlow(パイプライン)にして、実行出来る形にしてみました。MetaflowではFlowを1つ1つの処理単位であるStepを用意していくことで、シンプルな記述で簡単にFlowを構築することが出来ました。

Pythonの文法さえ知っていればMetaflowを使うことが出来るので、機械学習プロジェクトにおける一連のサイクルをパイプライン化すれば、面倒な手作業をせずとも簡単にサイクルを実行することが出来るでしょう。
実はMetaflowにはまだまだ多くの機能があります。今回は基本的なものだけを紹介しましたが、他にも便利な機能がいくつもあります。

cardと呼ばれる簡易なレポート画面や、.metaflowフォルダに作成される結果のキャッシュ、および結果の読み込み、再利用。デプロイ先でのスケーリングなど、非常に多くの便利機能があります。

こちらは機会があれば、また後日紹介させて頂きます。みなさんもぜひMetaflowを触ってみてください。

おまけ: resume機能について

metaflowにはデバッグに便利な「resume」という機能が実装されています。
記事内で紹介したのは主に「python my_flow.py run」という実行方式でした。これは、構築したパイプラインにエラーがなければ、最後まで処理が実行されますが、開発段階であれば何かしらエラーに遭遇するでしょう。
そんな時に「さっきコケた箇所の続きから実行出来ればなぁ…」を可能にするのが「resume」です。使い方は簡単で今までrunと実行していた箇所をresumeに変えるだけです。
以下に簡単なサンプルファイルとパイプラインを用意しました。処理はstart → frist → second → endを順に実行するだけですが、関数secondにはあえてエラーが出る処理を記載しています。

resume_sample.py

from metaflow import FlowSpec, step

class ResumeSample(FlowSpec):

    @step
    def start(self):
        self.next(self.first)

    @step
    def first(self):
        print('first exec')
        self.next(self.second)

    @step
    def second(self):
        text = "second exec" + 1 # +の演算子がstringとintで失敗する
        print(text)
        self.next(self.end)

    @step
    def end(self):
        pass

if __name__ == '__main__':
    ResumeSample()

まずはrunを使って実行してみます。

$ python resume_sample.py run
Metaflow 2.7.2 executing ResumeSample for user:okb
:
2022-07-31 20:49:04.171 Workflow starting (run-id 1659268143980174):
2022-07-31 20:49:04.233 [1659268143980174/start/1 (pid 71649)] Task is starting.
2022-07-31 20:49:04.961 [1659268143980174/start/1 (pid 71649)] Task finished successfully.
2022-07-31 20:49:05.068 [1659268143980174/first/2 (pid 71652)] Task is starting.
2022-07-31 20:49:05.401 [1659268143980174/first/2 (pid 71652)] first exec
2022-07-31 20:49:05.605 [1659268143980174/first/2 (pid 71652)] Task finished successfully.
2022-07-31 20:49:05.774 [1659268143980174/second/3 (pid 71655)] Task is starting.
2022-07-31 20:52:11.788 [1659268330515128/second/3 (pid 71785)] Task is starting.
2022-07-31 20:52:12.205 [1659268330515128/second/3 (pid 71785)] <flow ResumeSample step second> failed:
2022-07-31 20:52:12.334 [1659268330515128/second/3 (pid 71785)] Internal error
:
2022-07-31 20:52:12.414 [1659268330515128/second/3 (pid 71785)] text = "second exec" + 1
2022-07-31 20:52:12.415 [1659268330515128/second/3 (pid 71785)] TypeError: can only concatenate str (not "int") to str
2022-07-31 20:52:12.415 [1659268330515128/second/3 (pid 71785)]

当然ながら、secondにてエラーが発生しました。まだ修正をしていないですが、resumeで再実行してみるとどうなるでしょうか。

$ python resume_sample.py resume
Metaflow 2.7.2 executing ResumeSample for user:okb
Validating your flow...
    The graph looks good!
Running pylint...
    Pylint is happy!
2022-07-31 20:54:09.612 Gathering required information to resume run (this may take a bit of time)...
2022-07-31 20:54:09.786 Workflow starting (run-id 1659268449599264):
2022-07-31 20:54:09.830 [1659268449599264/start/1] Cloning results of a previously run task 1659268143980174/start/1
2022-07-31 20:54:10.415 [1659268449599264/first/2] Cloning results of a previously run task 1659268143980174/first/2
:
2022-07-31 20:54:11.644 [1659268449599264/second/3 (pid 71869)] text = "second exec" + 1
2022-07-31 20:54:11.645 [1659268449599264/second/3 (pid 71869)] TypeError: can only concatenate str (not "int") to str
2022-07-31 20:54:11.645 [1659268449599264/second/3 (pid 71869)]

startとfirstの実行ログに注目してみると「Cloning results of a previously run task 1659268143980174…」と出力されています。これは前回の実行結果(1659268143980174)をクローンして実行していることを意味しています。また、print関数の結果が表示されていないことも分かります。

@step
def second(self):
    text = f"second exec #{1}"
    print(text)
    self.next(self.end)
$ python resume_sample.py resume
Metaflow 2.7.2 executing ResumeSample for user:okb
Validating your flow...
    The graph looks good!
Running pylint...
    Pylint is happy!
2022-07-31 21:02:43.743 Gathering required information to resume run (this may take a bit of time)...
2022-07-31 21:02:43.892 Workflow starting (run-id 1659268963730606):
2022-07-31 21:02:43.933 [1659268963730606/start/1] Cloning results of a previously run task 1659268143980174/start/1
2022-07-31 21:02:44.519 [1659268963730606/first/2] Cloning results of a previously run task 1659268143980174/first/2
2022-07-31 21:02:45.189 [1659268963730606/second/3 (pid 72194)] Task is starting.
2022-07-31 21:02:45.511 [1659268963730606/second/3 (pid 72194)] second exec #1
2022-07-31 21:02:45.647 [1659268963730606/second/3 (pid 72194)] Task finished successfully.
2022-07-31 21:02:45.767 [1659268963730606/end/4 (pid 72197)] Task is starting.
2022-07-31 21:02:46.261 [1659268963730606/end/4 (pid 72197)] Task finished successfully.
2022-07-31 21:02:46.266 Done!

無事に通りました!

参考文献