サラリーマンエンジニアブログ

大企業で働くなんちゃってITエンジニアのブログです。

Amazon Athenaをboto3から動かす

Athena
やぁどうもフナミズです

Amazon Athenaをboto3から動かす

本日はAmazon AthenaをBoto3から動かしてみようと思います。


Athenaとは

Amazon Athena とは、標準 SQL を使用して Amazon S3 内のデータを直接分析することができるインタラクティブなクエリサービス。

Amazon S3 上にあるデータを指定して、スキーマを定義して、 SQL を使ってデータのクエリを開始するだけです。

マネージメントコンソール上でクエリを叩けて、アドホックに分析できるので非常に便利ですね。
f:id:yarite_parker:20180829121704p:plain
 マネージメントコンソールからも扱えるのですが、今回はPythonのBoto3でAthenaにクエリを投げていきたいと思います。

Boto3とは

AWS SDK for Pythonなライブラリですね。
PythonでAWSの様々なサービスを扱うことを可能にするサービスですね。

 

前提条件

  • python3がインストールされていること 
  • boto3がインストールされていること

python3のインストール方法 (EC2 Amazon Linux)

sudo yum install python3

boto3のインストール方法 (EC2 Amazon Linux)

sudo yum install pip3
pip3 install boto3

初期値の設定

関数化しているので、関数の引数となるパラメータを代入します。

import os
import boto3
from time import sleep

data_path = "s3//test/data"   # データの入っているs3 ディレクトリパス
db_name = "test"  # データベース名
table_name = "test" # テーブル名
log_path = "s3://test/logs"  # 実行ログのパス
client = boto3.client('athena', region_name='us-west-2')  # bot3のクライアント設定

クエリの結果取得

関数

QUERY = "select *from test limit 100"

def query(db_name, table_name, client, query):
    # Contents of query
    post_query = client.start_query_execution(
    QueryString = query,
    QueryExecutionContext = {
        'Database': db_name
        },
    ResultConfiguration = {
        'OutputLocation': log_path
        })
    
    # Get query id
    query_id = post_query['QueryExecutionId']

    # Progress
    while True:
        query_response = client.get_query_execution(QueryExecutionId = query_id)
        state = query_response['QueryExecution']['Status']['State']
        if state in ['SUCCEEDED', 'FAILED', 'CANCELLED']:
            print("STATUS : " + state)
            break
        else:
            print("STATUS : " + state)
            sleep(1)
    
    # Get Result
    result = client.get_query_results(QueryExecutionId = query_id)
    return result['ResultSet']['Rows']

実行

>>> result = query(db_name, table_name, client, QUERY)

結果

STATUS : RUNNING
STATUS : RUNNING
STATUS : SUCCEEDED
>>> result
..............
# 結果が表示される

テーブル作成

関数

# テーブル作成関数
def create_table_command(db_name, table_name, file_path, client):
    # Contents of query
    CREATE_TABLE = "CREATE EXTERNAL TABLE `%s`( \
      `aaa` string,  \
      `bbb` string, \
      `ccc` bigint,  \
      `ddd` timestamp, \
      `eee` timestamp, \
      `fff` timestamp, \
      `ggg` bigint) \
    PARTITIONED BY ( \
      `fff` string) \
    ROW FORMAT SERDE \
      'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' \
    STORED AS INPUTFORMAT \
      'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' \
    LOCATION \
      '%s'"%(table_name, file_path)

    # Excute query
    post_query = client.start_query_execution(
    QueryString = CREATE_TABLE,
    QueryExecutionContext = {
        'Database': db_name
        },
    ResultConfiguration = {
        'OutputLocation': log_path
        })

    # Get query id
    query_id = post_query['QueryExecutionId']

    # Progress
    while True:
        create_table_response = client.get_query_execution(QueryExecutionId = query_id)
        state = create_table_response['QueryExecution']['Status']['State']
        if state in ['SUCCEEDED', 'FAILED', 'CANCELLED']:
            print("CREATE TABLE STATUS : " + state)
            break
        else:
            print("CREATE TABLE STATUS : " + state)
            sleep(1)

実行

>>> create_table_command(db_name, table_name, file_path, client)

結果

>>> CREATE TABLE STATUS : RUNNING
>>> CREATE TABLE STATUS : SUCCEEDED

デーブル削除

関数

# テーブル削除関数
def drop_table_command(db_name, table_name, client):
    # Contents of query
    DROP_TABLE = "DROP TABLE IF EXISTS %s"%(table_name)

    # Excute query
    post_query = client.start_query_execution(
    QueryString = DROP_TABLE,
    QueryExecutionContext = {
        'Database': db_name
        },
        ResultConfiguration={
    	'OutputLocation': log_path
    	})

    # Get query id
    query_id = post_query['QueryExecutionId']

    # Progress
    while True:
        drop_table_response = client.get_query_execution(QueryExecutionId = query_id)
        state = drop_table_response['QueryExecution']['Status']['State']
        if state in ['SUCCEEDED', 'FAILED', 'CANCELLED']:
            print("DROP TABLE STATUS : " + state)
            break
        else:
            print("DROP TABLE STATUS : " + state)
            sleep(1)

実行

>>> drop_table_command(db_name, table_name, client)

結果

>>> DROP TABLE STATUS : RUNNING
>>> DROP TABLE STATUS : SUCCEEDED

パーティション追加

関数

# パーティション追加関数
def add_partition_command(db_name, table_name, client):
    # Contents of query
    ADD_PARTITION = "MSCK REPAIR TABLE `%s`"%(table_name)

    # Excute query
    post_query = client.start_query_execution(
    QueryString = ADD_PARTITION,
    QueryExecutionContext = {
        'Database': db_name
        },
    ResultConfiguration = {
        'OutputLocation': log_path
        })

    # Get query id
    query_id = post_query['QueryExecutionId']

    # Progress
    while True:
        add_partition_response = client.get_query_execution(QueryExecutionId = query_id)
        state = add_partition_response['QueryExecution']['Status']['State']
        if state in ['SUCCEEDED', 'FAILED', 'CANCELLED']:
            print("ADD PARTITION STATUS : " + state)
            break
        else:
            print("ADD PARTITION STATUS : " + state)
            sleep(1)

実行

>>> add_partition_command(db_name, table_name, client)

結果

>>> ADD PARTITION STATUS : RUNNING
>>> ADD PARTITION STATUS : RUNNING
>>> ADD PARTITION STATUS : RUNNING
>>> ADD PARTITION STATUS : RUNNING
>>> ADD PARTITION STATUS : RUNNING
>>> ADD PARTITION STATUS : RUNNING
>>> ADD PARTITION STATUS : RUNNING
>>> ADD PARTITION STATUS : RUNNING
>>> ADD PARTITION STATUS : RUNNING
>>> ADD PARTITION STATUS : RUNNING
>>> ADD PARTITION STATUS : RUNNING
>>> ADD PARTITION STATUS : SUCCEEDED

まとめ

Athena & Boto3はすごく便利。
ぜひ使ってみてはいかがでしょうか?