地方在住IT系ニート

bkds

Prefectのメモ

はじめに

この記事では、**Pythonベースのワークフローオーケストレーションツール「Prefect」**を利用して、バッチ処理(定期実行タスク)を効率的に管理する方法をまとめました。 従来は cronAirflow を利用するケースが多くありましたが、最近では 軽量かつクラウド連携しやすい Prefect が注目を集めています。 ここでは、基本概念からインストール、ジョブ登録、スケジュール設定までを一通り整理しています。

Prefectとは

Prefect は、Pythonで記述できるワークフロー管理ツールです。データ処理・ETL・バッチ実行・クラウドAPI連携など、複数のタスクを柔軟に制御することができます。

主な特徴は以下のとおりです。

インストール

PrefectはPythonパッケージとして提供されています。以下のコマンドで簡単に導入可能です。

pip install prefect

バージョンを確認して、正しくインストールできていることを確認します。

prefect version

基本構成

Prefectの基本構成は以下の要素で成り立ちます。

要素説明
Flow一連のタスク処理を表すPython関数
Task処理単位を表す関数(オプション)
DeploymentFlowをスケジュール・環境設定と結びつける定義
WorkerFlowの実行を担当するプロセス

シンプルなFlowの例

以下は、Hello Worldを出力するシンプルなFlowの例です。

from prefect import flow, task

@task
def say_hello():
    print("Hello, Prefect!")

@flow
def hello_flow():
    say_hello()

if __name__ == "__main__":
    hello_flow()

このスクリプトを実行すると、タスクがFlowとしてPrefectのトラッキング下で実行されます。

デプロイメント作成

Prefectでは、CLIからFlowをデプロイします。

prefect deploy flows/job.py:job -n hourly-job -i 3600

上記コマンドは、prefect.yaml を自動生成し、Flowを1時間ごとに実行するデプロイメントを作成します。

prefect.yaml は以下のような形式です。

deployments:
  - name: hourly-job
    entrypoint: flows/job.py:job
    schedule:
      interval: 1h

ワーカーの起動

ジョブを自動実行するには、Prefect 3では**ワーカー(Worker)**を利用します。

prefect worker start -p default-agent-pool

このコマンドを実行すると、登録されたデプロイメントがスケジュールに従って順次実行されます。

メモ

<-- Back to home
にほんブログ村 IT技術ブログ IT技術メモへ PVアクセスランキング にほんブログ村