目次
前提記事
環境用意
PythonからMySQLへ書き込むためのライブラリを用意します。
(ApacheBeamとの関係も考えたほうがいいのだと思いますが、、、できるのかドキドキです。)
Cloud9へ以下のプロジェクトディレクトリを用意します。
ec2-user:~/environment/Python_Project $ mkdir BeamMySQL
ec2-user:~/environment/Python_Project $ cd BeamMySQL/
ec2-user:~/environment/Python_Project/BeamMySQL $ python -m venv beammysqlenv
ec2-user:~/environment/Python_Project/BeamMySQL $ source beammysqlenv/bin/activate
(beammysqlenv) ec2-user:~/environment/Python_Project/BeamMySQL $
必要なライブラリをインストールします。
$ pip install mysql-connector-python
$ pip install apache-beam
DBスキーマを用意
どうやって作るかいろいろ考えたのですが、、WebベースならGoogleSpreadSheetかなと、
あまりいい感じの記事がなかったので、CreateTableを作成する部分だけ作ってみました。。
(これもめっちゃ微妙ですね。ほんとはa5m2とか使ったほうがいいですね。。。)
https://docs.google.com/spreadsheets/d/1c7Kwlwu0G_7VFLW1wJr9qF0VsHBSSg4gNNzuG3F8aOI/edit?usp=sharing
CREATE DATABASE beamTest;
USE beamTest;
CREATE TABLE beamTest(
id INT(11) PRIMARY KEY NOT NULL AUTO_INCREMENT ,
name VARCHAR(10)
);
desc beamTest;
この定義を作るSpreadSheet作るのに結構時間かかって結構悲しいですね。(WebUIって遅いしExcelで使ってるコマンドとか使えない。。)
気を取り直して、MySQLへテーブル作成します。上部2行は今回作成しますが好きにカスタムしてください。phpMyAdminのSQLタブから上記のDDLもどきを実行してください。
無事、テーブルができましたね。
mysql-connectorの動作確認
さて、今回は以下のPGMを実行します。
要所にコメントを入れているので読んでみてください。
エラーハンドリングなども盛り込んでいるのと、CRUDそれぞれの操作も入れてます。
import mysql.connector as mydb
import logging
import traceback
logger = logging.getLogger("sample")
logging.basicConfig(level=logging.DEBUG)
# コネクションの作成
conn = mydb.connect(host='localhost',
port='3306',
user='root',
password='',
database='beamTest')
# トランザクションの開始(検証した感じだとデフォルトFALSE)
conn.autocommit = False
try:
#SETUP
logging.debug("===SETUP===")
conn.ping(reconnect=True)
logging.debug(conn.is_connected())
# DB操作用にカーソルを作成
cur = conn.cursor()
# テーブル作成(上のDDL時間かかったけどいらなかった(^_-)-☆)
table = "test"
cur.execute("DROP TABLE IF EXISTS %s;" % table)
cur.execute("""
CREATE TABLE IF NOT EXISTS %s (
id int auto_increment primary key,
name varchar(50) not null
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci
""" % table)
# INSERT
logging.debug("===INSERT===")
query = "INSERT INTO test VALUES (%s, %s)"
# タプル
cur.execute(query, (1, 'hello'))
logging.debug("count:" + str(cur.rowcount))
# ディクショナリ
cur.execute("INSERT INTO test VALUES (%(id)s, %(name)s)", {
'id': 2,
'name': 'world'
}, {
'id': 3,
'name': 'foo'
})
logging.debug("count:" + str(cur.rowcount))
# タプルリスト
cur.executemany(query, [
(3, 'foo'),
(4, 'bar'),
])
logging.debug("count:" + str(cur.rowcount))
# 2次元リスト
cur.executemany(query, [
[5, 'hoge'],
[6, 'huga'],
])
logging.debug("count:" + str(cur.rowcount))
# SELECT
logging.debug("===SELECT===")
cur.execute("SELECT * FROM test ORDER BY id ASC")
# 全てのデータを取得
rows = cur.fetchall()
for row in rows:
logging.debug(row)
# fetch後じゃないとrowcountは効かない
logging.debug("count:" + str(cur.rowcount))
# 1件取得
cur.execute("SELECT * FROM test WHERE name=%s", ('foo', ))
logging.debug(cur.fetchone())
logging.debug("count:" + str(cur.rowcount))
# UPDATE
logging.debug("===UPDATE===")
cur.execute('UPDATE test SET name=%s WHERE id<%s', ('bar', 3))
logging.debug("count:" + str(cur.rowcount))
# DELETE
logging.debug("===DELETE===")
cur.execute('DELETE FROM test WHERE id < 2')
logging.debug("count:" + str(cur.rowcount))
# # 例外を投げる
# raise ValueError("ERROR")
# 後処理(正常時)
conn.commit()
cur.close()
conn.close()
except Exception as e:
# 後処理(例外時)
traceback.print_exc()
cur.close()
conn.rollback()
conn.close()
動作
今回は折角例外時の処理も書いたので、その動作も確認しましょう。
正常
(beammysqlenv) ec2-user:~/environment/Python_Project/BeamMySQL $ python beamMySQL.py --debug
DEBUG:root:===SETUP===
DEBUG:root:True
DEBUG:root:===INSERT===
DEBUG:root:count:1
DEBUG:root:count:1
DEBUG:root:count:2
DEBUG:root:count:2
DEBUG:root:===SELECT===
DEBUG:root:(1, 'hello')
DEBUG:root:(2, 'world')
DEBUG:root:(3, 'foo')
DEBUG:root:(4, 'bar')
DEBUG:root:(5, 'hoge')
DEBUG:root:(6, 'huga')
DEBUG:root:count:6
DEBUG:root:(3, 'foo')
DEBUG:root:count:1
DEBUG:root:===UPDATE===
DEBUG:root:count:2
DEBUG:root:===DELETE===
DEBUG:root:count:1
(beammysqlenv) ec2-user:~/environment/Python_Project/BeamMySQL $
ちゃんとデータが入りましたね、UPDATEやDELETEも想定通り動作しています。
異常時
90行目のコメントアウトを外します。
(beammysqlenv) ec2-user:~/environment/Python_Project/BeamMySQL $ python beamMySQL.py --debug
DEBUG:root:===SETUP===
DEBUG:root:True
DEBUG:root:===INSERT===
DEBUG:root:count:1
DEBUG:root:count:1
DEBUG:root:count:2
DEBUG:root:count:2
DEBUG:root:===SELECT===
DEBUG:root:(1, 'hello')
DEBUG:root:(2, 'world')
DEBUG:root:(3, 'foo')
DEBUG:root:(4, 'bar')
DEBUG:root:(5, 'hoge')
DEBUG:root:(6, 'huga')
DEBUG:root:count:6
DEBUG:root:(3, 'foo')
DEBUG:root:count:1
DEBUG:root:===UPDATE===
DEBUG:root:count:2
DEBUG:root:===DELETE===
DEBUG:root:count:1
Traceback (most recent call last):
File "beamMySQL.py", line 90, in <module>
raise ValueError("ERROR")
ValueError: ERROR
ちゃんとエラーがでましたね!テーブルは
できちゃってる&データがクリーンされてますね。
恐らくDROPとCREATETABLEはトランザクションでロールバックはできないのでしょう。