Pythonでmysql-connector-pythonでDB処理+トランザクションの動作検証


前提記事

環境用意

PythonからMySQLへ書き込むためのライブラリを用意します。
(ApacheBeamとの関係も考えたほうがいいのだと思いますが、、、できるのかドキドキです。)

Cloud9へ以下のプロジェクトディレクトリを用意します。

ec2-user:~/environment/Python_Project $ mkdir BeamMySQL                                                                                                                                                                                              
ec2-user:~/environment/Python_Project $ cd BeamMySQL/
ec2-user:~/environment/Python_Project/BeamMySQL $ python -m venv beammysqlenv
ec2-user:~/environment/Python_Project/BeamMySQL $ source beammysqlenv/bin/activate
(beammysqlenv) ec2-user:~/environment/Python_Project/BeamMySQL $ 

必要なライブラリをインストールします。

$ pip install mysql-connector-python
$ pip install apache-beam

DBスキーマを用意

どうやって作るかいろいろ考えたのですが、、WebベースならGoogleSpreadSheetかなと、
あまりいい感じの記事がなかったので、CreateTableを作成する部分だけ作ってみました。。
(これもめっちゃ微妙ですね。ほんとはa5m2とか使ったほうがいいですね。。。)

https://docs.google.com/spreadsheets/d/1c7Kwlwu0G_7VFLW1wJr9qF0VsHBSSg4gNNzuG3F8aOI/edit?usp=sharing

CREATE DATABASE beamTest;	
USE	beamTest;	
CREATE TABLE 	beamTest(	
	id INT(11)  PRIMARY KEY NOT NULL AUTO_INCREMENT	,
	name VARCHAR(10)
);
desc beamTest;

この定義を作るSpreadSheet作るのに結構時間かかって結構悲しいですね。(WebUIって遅いしExcelで使ってるコマンドとか使えない。。)

気を取り直して、MySQLへテーブル作成します。上部2行は今回作成しますが好きにカスタムしてください。phpMyAdminのSQLタブから上記のDDLもどきを実行してください。

無事、テーブルができましたね。

mysql-connectorの動作確認

さて、今回は以下のPGMを実行します。
要所にコメントを入れているので読んでみてください。

エラーハンドリングなども盛り込んでいるのと、CRUDそれぞれの操作も入れてます。

import mysql.connector as mydb
import logging
import traceback

logger = logging.getLogger("sample")
logging.basicConfig(level=logging.DEBUG)

# コネクションの作成
conn = mydb.connect(host='localhost',
                    port='3306',
                    user='root',
                    password='',
                    database='beamTest')
# トランザクションの開始(検証した感じだとデフォルトFALSE)
conn.autocommit = False

try:
    #SETUP
    logging.debug("===SETUP===")
    conn.ping(reconnect=True)
    logging.debug(conn.is_connected())

    # DB操作用にカーソルを作成
    cur = conn.cursor()

    # テーブル作成(上のDDL時間かかったけどいらなかった(^_-)-☆)
    table = "test"
    cur.execute("DROP TABLE IF EXISTS %s;" % table)
    cur.execute("""
        CREATE TABLE IF NOT EXISTS %s (
        id int auto_increment primary key,
        name varchar(50) not null
        ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci
        """ % table)

    # INSERT
    logging.debug("===INSERT===")
    query = "INSERT INTO test VALUES (%s, %s)"
    # タプル
    cur.execute(query, (1, 'hello'))
    logging.debug("count:" + str(cur.rowcount))
    # ディクショナリ
    cur.execute("INSERT INTO test VALUES (%(id)s, %(name)s)", {
        'id': 2,
        'name': 'world'
    }, {
        'id': 3,
        'name': 'foo'
    })
    logging.debug("count:" + str(cur.rowcount))
    # タプルリスト
    cur.executemany(query, [
        (3, 'foo'),
        (4, 'bar'),
    ])
    logging.debug("count:" + str(cur.rowcount))
    # 2次元リスト
    cur.executemany(query, [
        [5, 'hoge'],
        [6, 'huga'],
    ])
    logging.debug("count:" + str(cur.rowcount))

    # SELECT
    logging.debug("===SELECT===")
    cur.execute("SELECT * FROM test ORDER BY id ASC")
    # 全てのデータを取得
    rows = cur.fetchall()
    for row in rows:
        logging.debug(row)
    # fetch後じゃないとrowcountは効かない
    logging.debug("count:" + str(cur.rowcount))

    # 1件取得
    cur.execute("SELECT * FROM test WHERE name=%s", ('foo', ))
    logging.debug(cur.fetchone())
    logging.debug("count:" + str(cur.rowcount))

    # UPDATE
    logging.debug("===UPDATE===")
    cur.execute('UPDATE test SET name=%s WHERE id<%s', ('bar', 3))
    logging.debug("count:" + str(cur.rowcount))

    # DELETE
    logging.debug("===DELETE===")
    cur.execute('DELETE FROM test WHERE id < 2')
    logging.debug("count:" + str(cur.rowcount))

    # # 例外を投げる
    # raise ValueError("ERROR")

    # 後処理(正常時)
    conn.commit()
    cur.close()
    conn.close()
except Exception as e:
    # 後処理(例外時)
    traceback.print_exc()
    cur.close()
    conn.rollback()
    conn.close()

動作

今回は折角例外時の処理も書いたので、その動作も確認しましょう。

正常

(beammysqlenv) ec2-user:~/environment/Python_Project/BeamMySQL $ python beamMySQL.py --debug
DEBUG:root:===SETUP===
DEBUG:root:True
DEBUG:root:===INSERT===
DEBUG:root:count:1
DEBUG:root:count:1
DEBUG:root:count:2
DEBUG:root:count:2
DEBUG:root:===SELECT===
DEBUG:root:(1, 'hello')
DEBUG:root:(2, 'world')
DEBUG:root:(3, 'foo')
DEBUG:root:(4, 'bar')
DEBUG:root:(5, 'hoge')
DEBUG:root:(6, 'huga')
DEBUG:root:count:6
DEBUG:root:(3, 'foo')
DEBUG:root:count:1
DEBUG:root:===UPDATE===
DEBUG:root:count:2
DEBUG:root:===DELETE===
DEBUG:root:count:1
(beammysqlenv) ec2-user:~/environment/Python_Project/BeamMySQL $ 

ちゃんとデータが入りましたね、UPDATEやDELETEも想定通り動作しています。

異常時

90行目のコメントアウトを外します。

(beammysqlenv) ec2-user:~/environment/Python_Project/BeamMySQL $ python beamMySQL.py --debug
DEBUG:root:===SETUP===
DEBUG:root:True
DEBUG:root:===INSERT===
DEBUG:root:count:1
DEBUG:root:count:1
DEBUG:root:count:2
DEBUG:root:count:2
DEBUG:root:===SELECT===
DEBUG:root:(1, 'hello')
DEBUG:root:(2, 'world')
DEBUG:root:(3, 'foo')
DEBUG:root:(4, 'bar')
DEBUG:root:(5, 'hoge')
DEBUG:root:(6, 'huga')
DEBUG:root:count:6
DEBUG:root:(3, 'foo')
DEBUG:root:count:1
DEBUG:root:===UPDATE===
DEBUG:root:count:2
DEBUG:root:===DELETE===
DEBUG:root:count:1
Traceback (most recent call last):
  File "beamMySQL.py", line 90, in <module>
    raise ValueError("ERROR")
ValueError: ERROR

ちゃんとエラーがでましたね!テーブルは

できちゃってる&データがクリーンされてますね。
恐らくDROPとCREATETABLEはトランザクションでロールバックはできないのでしょう。

Please follow and like us:
タグ: ,

コメントを残す

メールアドレスが公開されることはありません。