In [1]:
import shellOneLiner

def head(iter, n):
        x = iter.__iter__()
        i = 0
        while i < n :
                print x.next()
                i = i + 1

class list2iter:
        
        def __init__(self, list):
                self.list = list
        
        def __iter__(self):
                return self
        
        def next(self):
                if   len(self.list) > 1:
                        head = self.list[0]
                        self.list = self.list[1:]
                        return head
                elif len(self.list) == 1:
                        head = self.list[0]
                        self.list = []
                        return head
                elif len(self.list) < 1:
                        raise StopIteration('end of list')

shellOneLiner モジュールの紹介

Python を通じて大量のデータを扱う場合には、 Unix コマンドを利用する事で素早く処理を行う事が出来る場合がある。

shellOneLiner モジュールは、 Python コード中からシェルのワンライナーを呼び出し、 Python から Unix コマンドへのデータの受け渡し、ファイルからのデータの読み出し、 Unix コマンドによるデータの処理、 Python へのデータの受け渡し、を行う事ができる。

shellOneLiner モジュールと usp Tukubai コマンド(以下 Tukubai コマンド)を合せて使用する事で、大量のデータを効率的に処理する事が可能である。 usp Tukubai コマンドにより、ファイルシステムを SQL データベースのように使用する事も可能である。

本稿では shellOneLiner モジュールと usp Tukubai コマンドの使用例を簡単に紹介する。

shellOneLiner モジュールの概要

shellOneLiner モジュールは以下のように動作する。 shellOneLiner オブジェクトのインスタンスを作成すると、データ処理の為の Unix コマンド群が起動され、必要であれば Python 処理系からデータの受け渡しをするスレッドが起動される。 shellOneLiner オブジェクトはイテレータ型オブジェクトとして振る舞い、 for 文等を使用して処理の結果を読み出す事が出来る。

+--------------------+               +----------------+
|                    ==[Input Data]==>                |
| Python Interpreter |               | Unix Processes |
|                    <=[Output Data]=>                |
|+------------------+|               +---^------------+
||shellOneLiner     ||                  /
||module            ---(Dispatch)------/
|+------------------+|
+--------------------+

shellOneLiner モジュールの基本的な使用方法

shellOneLiner モジュールのクラス shellOneLiner のインスタンス生成時に任意のシェルスクリプトを文字列として受け取り、実行する[*1]。シェルスクリプトからの出力は、イテレータ型のオブジェクトとして返される。

<出力オブジェクト> = shellOneLiner.ShellOneLiner(<シェルスクリプト>)

[*1] つまり、 shellOneLiner モジュールは直接シェルコマンドを起動する。セキュリティ上の問題が発生する可能性があるので、使用の際には細心の注意が必要である。


In [2]:
ol = shellOneLiner.ShellOneLiner('echo Hello; LANG=C date; cat datafile')
head(ol,5)


['Hello']
['Mon', 'Oct', '19', '09:00:15', 'JST', '2015']
['20150101000005', 'A', '41']
['20150101000006', 'A', '60']
['20150101000006', 'B', '59']

インスタンス生成時に input オプションにイテレータ型オブジェクトを設定する事で、シェルスクリプトの標準入力に対する入力を設定する事ができる。shellOneLiner のインスタンス、および input オプションで指定されるオブジェクトは、デフォルトでは配列のイテレータ型である[*2]。

<出力オブジェクト> = shellOneLiner.ShellOneLiner(<シェルスクリプト>, input=<入力オブジェクト>)

[*2] シェルスクリプトへの入力、またシェルスクリプトからの出力をどのように Python オブジェクトに解釈するかは、インスタンスについては reader オプション、input オプションで指定されるオブジェクトについては writer オプションで解釈を行なう関数を指定する事で変更可能である。


In [3]:
l = map((lambda n: ['%s' % str(n)]),range(80,100))
print l
di = list2iter(l)
ol = shellOneLiner.ShellOneLiner('echo Hello; LANG=C date; head', input=di)
head(ol,5)


[['80'], ['81'], ['82'], ['83'], ['84'], ['85'], ['86'], ['87'], ['88'], ['89'], ['90'], ['91'], ['92'], ['93'], ['94'], ['95'], ['96'], ['97'], ['98'], ['99']]
['Hello']
['Mon', 'Oct', '19', '09:00:16', 'JST', '2015']
['80']
['81']
['82']

データ

以下の例ではファイルに記録された、以下のような時系列データを処理対象とする。

20150101000004 A 1
20150101000007 A 10
20150101000008 A 70
20150101000009 A 85
20150101000010 A 69
20150101000012 A 2
...

データは1行1レコードで、1レコードはスペースで区切られた複数のフィールドから成る。それぞれのフィールドの内容は以下のとおりである。

  • 第一フィールド: データの到着時刻。 YYYYMMDDHHMMSS 形式。
  • 第二フィールド: データ観測地点。文字列。
  • 第ニフィールド: データ。 0 ~ 100 の整数値。

このようなデータが、データ観測地点 A 、 B 、 C のそれぞれについて、 set_A 、 set_B 、 set_C の三つのファイルに記録されている。 また、三つの地点の観測データがマージされたデータが、ファイル set に記録されているとする。 レコードは第一フィールドの、データ到着時刻、及び第二フィールドのデータ観測地点にてソートされている。

データの統合と抽出

様々なデータを扱う場合、まず一つのデータにまとめる事でその後の処理の見通しが良くなる事がある。また、必要なデータのみを抽出することで、後段の計算量を削減する事ができる。 以下ではいくつかのデータをまとめたり、特定の条件で抽出する際の処理の例を示す。

データのマージ

まず、複数の時系列データ set_A 、 set_B 、 set_C を、時系列順にマージする例を示す。


In [4]:
ol = shellOneLiner.ShellOneLiner('dmerge key=1 set_A set_B set_C')
head(ol, 3)


['20150101000005', 'A', '41']
['20150101000006', 'A', '60']
['20150101000006', 'B', '59']

ここでは、 Tukubai コマンド、 dmerge を使用している。

dmerge コマンドは、入力データの指定されたフィールドをソート済みキーとして、入力データのマージを行うコマンドである[*1]。第一引数 'key=1' により、第一フィールドをキーとしてマージを行うよう指定している。続く引数、 set_A 、 set_B 、 set_C はそれぞれ入力データの格納されたファイルである。読み出されたデータは shellOneLiner モジュールにより行指向スペース区切りデータとして解釈され、1行毎にpythonのリストオブジェクトに変換される。

[*1] 入力データはキーによりソートされている必要がある。

データの抽出(1)

仮にマージされた状態でデータを受け取った場合に特定の地点のデータのみを取り出す事も用意である。上記のマージされたファイルから、地点 A のレコードのみを取り出す例を示す。


In [5]:
ol = shellOneLiner.ShellOneLiner('selr 2 A set')
head(ol, 3)


['20150101000005', 'A', '41']
['20150101000006', 'A', '60']
['20150101000010', 'A', '54']

ここでは、 Tukubai コマンド、 selr を使用している。

selr コマンドは、入力データの指定されたフィールドが指定された文字列であるデータを抽出して出力するコマンドである。第一引数、 1' 及び第二第二引数A' により、第一フィールドが `A' である行を抜き出すよう指定している。入力データは、標準入力か、第三引数で指定されるファイルから渡される。

データの抽出(2)

より複雑なデータの抽出も可能である。以下では、 2015 年 1 月 2 日に到着したレコードのみを取り出す例を示す。


In [6]:
ol = shellOneLiner.ShellOneLiner('grep \'^20150102\' set')
head(ol, 3)


['20150102000000', 'B', '55']
['20150102000001', 'A', '2']
['20150102000001', 'C', '31']

ここで用いているのは、 Unix 標準コマンド grep である。

「20150102」で始まる行を抜き出す事で、特定の日に到着したレコードを抜き出す事ができる。

次に、到着したレコードのうち、深夜0時に到着したデータのみを取り出す例を示す。

まず、元のデータに時刻のフィールドを追加する例を示す。


In [7]:
ol = shellOneLiner.ShellOneLiner('self 0 1.9.2 set')
head(ol, 3)


['20150101000005', 'A', '41', '00']
['20150101000006', 'A', '60', '00']
['20150101000006', 'B', '59', '00']

ここでは、 Tukubai コマンド self を使用している。

self コマンドの名前は select field の略であり、入力データからフィールドの選択、及び部分文字列の取り出しを行うコマンドである。 self コマンドの第三引数は入力ファイルである。 第一引数「0」は、入力データの一レコード全てを出力するという意味である。 第二引数、 「1.9.2」は、入力データの第一フィールドの部分文字列、九文字目から二文字を出力するという意味である。 self コマンドの適用により、元のデータは以下の第四フィールドを付加したフォーマットに加工される。

  • 第四フィールド: データの到着時刻。 HH 形式。

これに、先ほど説明した selr コマンドを適用して深夜0時に到着したデータを抽出する。


In [8]:
ol = shellOneLiner.ShellOneLiner('self 0 1.9.2 set | selr 4 00 | delf 4')
head(ol, 3)


['20150101000005', 'A', '41']
['20150101000006', 'A', '60']
['20150101000006', 'B', '59']

ここでは、 Tukubai コマンド delf を使用している。

delf コマンドにより、第四フィールドを削除している。 delf コマンドの名前は、 delete field の略であり、入力データから指定されたフィールドの削除を行なうコマンドである。 ここでは第一引数で「4」を指定しており、標準入力から読み込んだデータの第4フィールドを削除している[*1]。

[*1] 同様の記述方法により、先ほどの grep を用いた、特定の日のレコードを抽出する処理も記述する事ができる。逆に grep の正規表現を駆使して、特定の時刻に到着したレコードを抽出する処理を記述する事も可能である。しかし複雑な正規表現は解読が難しくなりがちであり、処理性能の低下にも繋がる。固定文字列の検索には、 self と selr の組み合わせが望ましい。

データの抽出(3)

さらに複雑な例として、以下では到着データの値が80以上であるレコードを取り出す例を示す。


In [9]:
ol = shellOneLiner.ShellOneLiner('awk \'$3>80\' set')
head(ol, 3)


['20150101000010', 'C', '92']
['20150101000012', 'C', '99']
['20150101000016', 'C', '99']

ここでは、 Unix 標準コマンド awk を用いている。 第三フィールドを比較するパターンを記述する事で、到着データの値が80以上であるレコードを取り出している。

データの抽出(4)

こういった抽出方法を組み合わせる事で、複雑な抽出条件を簡潔に記述する事が出来る。

以下では、 A 地点に深夜0時に到着したレコードのうち、到着データが 40 未満であるレコードを抽出する例を示す。


In [10]:
ol = shellOneLiner.ShellOneLiner('selr 2 A set | self 0 1.9.2 | selr 4 00 | delf 4 | awk \'$3<40\' ')
head(ol, 3)


['20150101000011', 'A', '5']
['20150101000017', 'A', '30']
['20150101000029', 'A', '28']

まず最初の selr コマンドにより、 A 地点に到着したレコードのみを抽出している。続く2段目〜4段目のコマンドにて、深夜0時に到着したレコードを抜き出している。最後に5段目のコマンドにて到着データが40以下であるデータを抽出している。

このように複雑な抽出条件であっても、抽出方法を組み合わせる事で簡潔に記述する事ができる。

データの付加

時系列データになにがしかの情報を付加したい場合もあるだろう。以下では入力データに時間帯に応じた情報を付加して読み出す例を示す。

処理に先立ち、以下のようなファイル weight を用意する。

00 MidNight     1
01 MidNight     1
02 MidNight     1
03 MidNight     1
04 EarlyMorning 2
05 EarlyMorning 2
06 EarlyMorning 2
07 Morning      3
08 Morning      3
09 Morning      3
10 MidDay       5
11 MidDay       5
12 MidDay       5
13 MidDay       5
14 MidDay       5
15 MidDay       5
16 Evening      3
17 Evening      3
18 Evening      3
19 Night        2
20 Night        2
21 Night        2
22 MidNight     1
23 MidNight     1

それぞれのフィールドの内容は以下のとおりである。

  • 第一フィールド: 時間帯。 HH 形式。
  • 第二フィールド: 時間帯名。文字列。
  • 第ニフィールド: 重み。 0 ~ 9 の整数値。

このファイルの内容を、ファイル set_A のデータに付加する。

まず、先ほどと同じように、 self の部分文字列参照機能を用いて元のデータに以下の第四フィールドを付加したフォーマットに加工する。

  • 第四フィールド: データの到着時間。 HH 形式。

この第四フィールドと、先ほどのファイル Weight の第一フィールドを付き合わせて、情報を付加する。


In [11]:
ol = shellOneLiner.ShellOneLiner('self 0 1.9.2 set_A | cjoin2 key=4 weight')
head(ol, 3)


['20150101000005', 'A', '41', '00', 'MidNight', '1']
['20150101000006', 'A', '60', '00', 'MidNight', '1']
['20150101000010', 'A', '54', '00', 'MidNight', '1']

ここでは、新たな Tukubai コマンド cjoin2 を使用している。

cjoin2 コマンドは、指定されたマスタと入力データを、指定されたフィールドをキーとして突き合わせるコマンドである[*1]。第一引数``key=4''により、入力データの第四フィールドをキーとして、第二引数で指定されるファイル weight と入力データを突き合わせる。

[*1] 入力データはキーによりソートされている必要がある。

データの集約

大量のデータをそのまま Python で扱うのは困難が伴う事が多い。データを読み出す前に、コマンドを用いて基本的なデータの集約を行う事で、軽快に分析を行う事が出来る。

データ到着数の計数

以下では、それぞれの日の到着データ件数を数える例を示す。


In [12]:
ol = shellOneLiner.ShellOneLiner('self 1.1.8 2 set_A | count key=1@2')
head(ol, 3)


['20150101', 'A', '21827']
['20150102', 'A', '21558']
['20150103', 'A', '21567']

ここでは、新たな Tukubai コマンド count を使用している。

self コマンドにより、元のデータは以下のようなフォーマットに加工される。

  • 第一フィールド: データの到着日付。 YYYYMMDD 形式。
  • 第二フィールド: データ観測地点。文字列。

count コマンドは、入力データの指定されたフィールドをキーとして、その件数を数えるコマンドである[*1]。この例では、 self コマンドからの出力を入力データとしている。第一引数により、第一フィールド、及び第二フィールドをキーとして件数を数える。

[*1] 入力データはキーによりソートされている必要がある。

最大値の発見

以下では、それぞれの日の到着データのうち、最大のパラメータのデータを選択する例を示す。


In [13]:
ol = shellOneLiner.ShellOneLiner('self 1.1.8 2 3 set_A | psort ref=1@2 key=3n | getlast key=1@2')
head(ol, 3)


['20150101', 'A', '100']
['20150102', 'A', '100']
['20150103', 'A', '100']

ここでは、新たな Tukubai コマンド psort および getlast を使用している。

self コマンドにより、元のデータは以下のようなフォーマットに加工される。

  • 第一フィールド: データの到着日付。 YYYYMMDD 形式。
  • 第二フィールド: データ観測地点。文字列。
  • 第ニフィールド: データ。 0 ~ 100 の整数値。

psort コマンドは、入力データの指定されたフィールドをソート済みキーとし、別の指定されたフィールドをソート対象フィールドとして、ソートを行うコマンドである。ここでは、第一フィールド、第二フィールドをソート済みキーとして、第三フィールドを数値として昇順ソートを行っている。

getlast コマンドは、入力データの指定されたフィールドをキーとし、最後のデータを取り出すコマンドである。ここでは、第一フィールド、第二フィールドをキーとして、最後のレコードを取り出している。

データの合計

以下では、それぞれの日の到着データの合計を算出する例を示す。


In [14]:
ol = shellOneLiner.ShellOneLiner('self 1.1.8 2 3 set_A | sm2 key=1/2 val=3')
head(ol, 3)


['20150101', 'A', '1088947']
['20150102', 'A', '1073157']
['20150103', 'A', '1079436']

ここでは、新たな Tukubai コマンド sm2 を使用している。

sm2 コマンドは、入力データの指定されたフィールドをソート済みキーとして、指定されたフィールドの合計を算出するコマンドである。ここでは第一フィールドから第二フィールドをキーとして、第三フィールドの値を合計している。

情報の付加と集約

shellOneLiner モジュールと Tukubai コマンドを組み合わせて、より複雑な処理を行う事も出来る。以下では時間帯毎の重みをつけた、日毎のパラメータの合計を算出する例を示す。

時間帯毎の重みには、``データの付加''の節で用いたファイル weight を用いる。


In [15]:
ol = shellOneLiner.ShellOneLiner(
    'self 1.1.8 2 3 1.9.2 set_A | cjoin2 key=4 weight | lcalc \'$1, $2, $3 * $6\' | sm2 key=1/2 val=3')
head(ol, 3)


['20150101', 'A', '2994687']
['20150102', 'A', '2967972']
['20150103', 'A', '2989401']

ここでは、新たな Tukubai コマンド lcalc を使用している。

lcalc コマンドは入力データについて指定された演算を行うコマンドである。ここでは,第一フィールドと第二フィールドを出力データの第一フィールドと第二フィールドとし、第三フィールドと第六フィールドの積を出力データの第三フィールドとしている。

lcalc コマンドの出力を先ほど使用した sm2 コマンドを使用して合計する事で、日毎の重み付き平均を算出している。

Python 上のデータを使用した処理

ここまででは、 Tukubai コマンドを使用してファイルを読み込み、処理をして、 Python 上のデータとして利用する例を見て来た。 shellOnerLiner モジュールは、 Python 上のイテレータ型データを入力として Unix コマンドにて処理をする用途でも使用する事ができる。

以下では、特定の範囲のパラメータをとるレコードを抜き出す例を示す。パラメータの範囲の指定は、 Python 上のデータにより指定される。まず、パラメータ範囲を指定する為のデータを作成する。


In [16]:
l = map((lambda n: ['%s' % str(n)]),range(80,100))
print l


[['80'], ['81'], ['82'], ['83'], ['84'], ['85'], ['86'], ['87'], ['88'], ['89'], ['90'], ['91'], ['92'], ['93'], ['94'], ['95'], ['96'], ['97'], ['98'], ['99']]

shellOneLiner モジュールはイテレータ型の Python データを入力として受け取る。


In [17]:
di = list2iter(l)
ol = shellOneLiner.ShellOneLiner(
    'cjoin0 key=3 - set_A',
    input=di )
head(ol,3)


['20150101000019', 'A', '89']
['20150101000050', 'A', '98']
['20150101000102', 'A', '96']

shellOneLiner モジュールは、入力として指定された Python データを解釈し変換して、コマンドの標準入力に入力する[*1]。

ここでは、新たな Tukubai コマンド cjoin0 を使用している。

cjoin0 コマンドは、指定されたマスタと入力データを、指定されたフィールドをキーとして突き合わせるコマンドである[*2]。第一引数key=3''により、入力データの1第三フィールドをキーとして、第二引数で指定されるファイルと入力データを突き合わせる。ここではハイフン-'' が指定されているので、標準入力からデータを読み込む。

[*1] 解釈の仕方は、 ShellOneLiner コンストラクタの writer 引数に適切な python 関数を渡してやる事により、変更可能である。

[*2] 入力データはキーによりソートされている必要がある。

まとめ

ビッグデータを処理する場合には Python のみでは荷が重い場合がある。 shellOneLiner モジュールを用いて、 Tukubai コマンドと組み合わせる事で、処理時間を短縮する事ができる。