Preprocessing Notebook

Author: James Foster, jmfoster@gmail.com

Install gomill: http://mjw.woodcraft.me.uk/gomill/doc/0.7.4/install.html


In [1]:
import numpy as np
import pandas as pd
from gomill import sgf
from gomill import ascii_boards
from gomill import sgf_moves
from IPython.core.debugger import Tracer

In [2]:
def sgf_filename_to_game(game_filename):
    """
    Load an SGF file from disk and parse it with gomill.

    game_filename -- path of the .sgf file to read (opened in text mode)

    Returns whatever sgf.Sgf_game.from_string() produces for the file's
    full contents; parse errors raised by gomill propagate to the caller.
    """
    with open(game_filename, 'r') as sgf_file:
        # The text is handed to the parser unmodified (newlines included).
        return sgf.Sgf_game.from_string(sgf_file.read())

In [3]:
def game_to_string(game):
    """
    Print summary information about a gomill game object.

    Despite the name, nothing is returned: the winner, the board size and
    the raw 'BR' root property are printed one per line, followed by a
    blank line and then every node of the main sequence.

    game -- object providing get_winner(), get_size(), get_root() and
            get_main_sequence() (e.g. the result of sgf_filename_to_game)

    Any exception raised by those accessors (for example when the root
    node has no 'BR' property) propagates to the caller.
    """
    # Use the `game` argument; the previous body read an unrelated global `g`.
    print(game.get_winner())
    print(game.get_size())
    print(game.get_root().get_raw('BR'))
    print('')
    for node in game.get_main_sequence():
        print(node)

In [4]:
def show_sgf_file(sgf_game, move_number=None):
    """
    Print an ASCII rendering of a position from an SGF game.

    sgf_game    -- gomill game object
    move_number -- optional 1-based move number; when given, the position
                   *before* that move is printed (matching the behaviour of
                   GTP loadsgf). When omitted, every move is replayed.

    Prints the board followed by a blank line and returns None.
    Raises StandardError if gomill rejects the setup or a move is illegal.
    """
    try:
        board, plays = sgf_moves.get_setup_and_moves(sgf_game)
    except ValueError as err:
        raise StandardError(str(err))

    if move_number is not None:
        # Replay only the moves that precede the requested one.
        plays = plays[:max(0, move_number - 1)]

    for colour, move in plays:
        if move is not None:  # a move of None is skipped: no stone to place
            row, col = move
            try:
                board.play(row, col, colour)
            except ValueError:
                raise StandardError("illegal move in sgf file")

    print(ascii_boards.render_board(board))
    print('')

In [5]:
def game_to_board(game, move_number=None):
    """
    Replay a gomill game onto a Board and report whose turn it is.

    game        -- gomill game object
    move_number -- optional 1-based move number. When given, the returned
                   position is the one *before* that move (this matches the
                   behaviour of GTP loadsgf). When None, every move of the
                   game is replayed.

    Returns a tuple (board, turn). `turn` is 'b' or 'w' for the player to
    move in the returned position, or None when the whole game was replayed
    (move_number is None) or move_number lies more than one move past the
    end of the game.

    Raises:
        ValueError    -- move_number is given and is smaller than 1
        StandardError -- gomill rejects the setup, or a move is illegal
        IndexError    -- move_number is 1 and the game contains no moves
    """
    # Only validate when a move number was actually supplied; comparing the
    # default None against 1 used to reject every call without move_number.
    if move_number is not None and move_number < 1:
        raise ValueError('Game undefined for move_number < 1')

    try:
        board, plays = sgf_moves.get_setup_and_moves(game)
    except ValueError as e:
        raise StandardError(str(e))

    turn = None  # stays None only if nothing below determines the player
    if move_number is not None:
        move_number = move_number - 1  # number of moves to replay (>= 0)
        if move_number == 0:  # Special case for first move of the game
            turn, _ = plays[0]
        plays = plays[:move_number]

    swap_dict = {'w': 'b', 'b': 'w'}
    for colour, move in plays:
        # The opponent is to move after every play, including a pass, so
        # update the turn before the pass check skips the stone placement.
        turn = swap_dict[colour]
        if move is None:
            continue
        row, col = move
        try:
            board.play(row, col, colour)
        except ValueError:
            raise StandardError("illegal move in sgf file")

    # `plays` was truncated above, so this is true only when the requested
    # position lies beyond the moves the game actually contains.
    if move_number is None or move_number > len(plays):  # Game is over, it's neither player's turn
        turn = None
    return (board, turn)

In [6]:
def game_move_to_board(game, move_number):
    """
    Build a Board that contains a single move of the game.

    The move played is the one at index max(0, move_number - 2) of the
    game's move list, i.e. the move immediately preceding `move_number`
    (1-based). Calling this with N+1 therefore yields the stone that is
    played from the position returned by game_to_board(game, N). The stone
    is placed on the board returned by sgf_moves.get_setup_and_moves(); no
    other moves are replayed.

    game        -- gomill game object
    move_number -- 1-based move number (required)

    Returns a tuple (board, turn) where `turn` is the colour that played
    the move.

    Raises:
        ValueError    -- move_number is None
        IndexError    -- the selected index is beyond the last move
        TypeError     -- the selected move is a pass (no stone to place)
        StandardError -- gomill rejects the setup, or the move is illegal
    """
    if move_number is None:
        # Previously this fell through to an unbound local variable.
        raise ValueError('move_number must be specified')

    try:
        board, plays = sgf_moves.get_setup_and_moves(game)
    except ValueError as e:
        raise StandardError(str(e))

    move_index = max(0, move_number - 2)
    colour, move = plays[move_index]

    if move is None:
        # Keep the exception type callers already handle for this case, but
        # say what actually happened instead of failing on tuple unpacking.
        raise TypeError('move at index %d is a pass; there is no stone to place' % move_index)

    row, col = move
    try:
        board.play(row, col, colour)
    except ValueError:
        raise StandardError("illegal move in sgf file")

    turn = colour
    return (board, turn)

In [7]:
def board_to_array(board, dimension=1, turn=None, white=-1, black=1):
    """
    Encode a gomill Board as an int8 numpy array.

    board     -- object exposing `side`, `board_points` and `board[row][col]`
    dimension -- 1 for a flat vector of side*side entries (default),
                 2 for a (side, side) matrix
    turn      -- when truthy, stones are encoded relative to that player:
                 1 for stones of `turn`, -1 for the opponent's stones
    white     -- value for white stones when `turn` is not given (default -1)
    black     -- value for black stones when `turn` is not given (default 1)

    Empty points are always 0. Board rows count from the bottom of the
    board, so they are flipped to make matrix row 0 the top row; the flat
    vector is the row-major flattening of that matrix.

    Raises ValueError for any other `dimension`.
    """
    side = board.side
    if dimension == 1:
        array = np.zeros(side * side, dtype=np.int8)
        index_of = lambda matrix_row, col: matrix_row * side + col
    elif dimension == 2:
        array = np.zeros((side, side), dtype=np.int8)
        index_of = lambda matrix_row, col: (matrix_row, col)
    else:
        raise ValueError('Invalid number of dimensions specified: ', dimension)

    absolute_values = {'w': white, 'b': black}
    for board_row, col in board.board_points:
        colour = board.board[board_row][col]
        if turn:
            # Perspective encoding: mine = 1, yours = -1, empty = 0.
            if colour:
                value = 1 if colour == turn else -1
            else:
                value = 0
        else:
            # Absolute encoding; anything that is not 'w' or 'b' counts as empty.
            value = absolute_values.get(colour, 0)

        matrix_row = side - board_row - 1  # bottom-up board row -> top-down matrix row
        array[index_of(matrix_row, col)] = value

    return array

In [8]:
# Test Representation conversions
def test_representation():
    """
    Manual smoke test for the board-to-array conversions.

    Loads a sample 9x9 game, prints the position before move 4 in several
    encodings (ASCII board, 2D matrix and 1D vector, both from the mover's
    perspective and with absolute colours) and asserts that the 1D vector
    equals the row-major flattening of the 2D matrix.

    Requires the sample SGF file under ./Game_Files to be present.
    """
    game_filename = './Game_Files/9x9/Go_Seigen/1968-08-00.sgf'
    g = sgf_filename_to_game(game_filename)

    move = 4
    # show_sgf_file prints the board itself and returns None, so it must not
    # be wrapped in print (that emitted a stray "None" line).
    show_sgf_file(g, move)
    b, turn = game_to_board(g, move)
    print(ascii_boards.render_board(b))

    # 2D matrix from the perspective of the player to move
    matrix = board_to_array(b, dimension=2, turn=turn)
    print('')
    print(matrix)

    # 2D matrix with absolute colours (white=-1, black=1)
    matrix = board_to_array(b, dimension=2, turn=None)
    print('')
    print(matrix)
    print('')

    # 1D encodings; the last two lines printed should be identical
    print(board_to_array(b, dimension=1, turn=turn))
    print(board_to_array(b, dimension=1, turn=None))
    print(matrix.flatten(order='C'))
    print(board_to_array(b, dimension=1))
    assert (matrix.flatten(order='C') == board_to_array(b, 1)).all()
    
#test_representation()

In [9]:
def test_game_move_to_board():
    """
    Manual smoke test comparing game_to_board and game_move_to_board.

    Prints the positions before moves 5 and 6 of a sample 9x9 game, then
    the board holding only the single move selected by
    game_move_to_board(game, 6), both as a matrix and as a flat vector from
    the mover's perspective, followed by the colour that played it.

    Requires the sample SGF file under ./Game_Files to be present.
    """
    sgf_path = './Game_Files/9x9/Go_Seigen/1968-08-00.sgf'
    game = sgf_filename_to_game(sgf_path)
    move_no = 5

    # Position before move 5 (absolute colours)
    position, _ = game_to_board(game, move_no)
    print(ascii_boards.render_board(position))
    grid = board_to_array(position, dimension=2)
    print('')
    print(grid)

    # Position before move 6 (absolute colours)
    position, _ = game_to_board(game, move_no + 1)
    print(ascii_boards.render_board(position))
    grid = board_to_array(position, dimension=2)
    print('')
    print(grid)
    print('')

    # Board containing only the move that separates the two positions above
    single_move_board, mover = game_move_to_board(game, move_no + 1)
    print(ascii_boards.render_board(single_move_board))
    print('')
    print(board_to_array(single_move_board, dimension=2, turn=mover))
    print('')
    print(board_to_array(single_move_board, dimension=1, turn=mover))
    print(mover)
    
#test_game_move_to_board()

In [10]:
import os
from fnmatch import fnmatch

def directory_to_data_files(root_dir, output_filename, size, print_progress=False):
    """
    Convert every .sgf file below a directory into a text file of data vectors.

    root_dir        -- directory searched recursively for "*.sgf" files
    output_filename -- file to (over)write with the converted examples
    size            -- expected vector length (e.g. 81 for a 9x9 board)
    print_progress  -- when True, print one progress line per parsed file

    One line is written per move, in the form

        winner;vector;next_move_vector

    where the values inside each vector are written by np.savetxt with its
    default delimiter. `vector` is the position before the move and
    `next_move_vector` holds only the move played from it, both encoded
    from the perspective of the player making that move. `winner` is 1 if
    that player is the game's winner and -1 otherwise (including when
    game.get_winner() reports no winner).

    Files that cannot be parsed, and individual moves that cannot be
    converted, are reported on stdout and skipped; nothing is raised for
    them.
    """
    pattern = "*.sgf"
    sgf_files = []
    for path, subdirs, files in os.walk(root_dir):
        for name in files:
            if fnmatch(name, pattern):
                sgf_files.append(os.path.join(path, name))
    n_files = len(sgf_files)

    def _report_malformed(error, filename):
        # Single shared message for every recoverable conversion problem.
        print('Exception: ' + str(error) + '. File "' + filename + '" is likely malformed.')

    # The with-block guarantees the output file is closed even if an
    # unexpected error escapes the loops below.
    with open(output_filename, 'wb') as vectors:
        for i, sgf_file in enumerate(sgf_files):
            try:
                game = sgf_filename_to_game(sgf_file)
            except ValueError as ve:
                _report_malformed(ve, sgf_file)
                # Skip this file. Without this, the loop below would run on
                # the previous file's game (writing its examples twice) or
                # fail on an unbound name for the first file.
                continue
            if print_progress:
                print(str(i + 1) + "/" + str(n_files) + ' Processing file: ' + sgf_file)

            for move in range(1, len(game.get_main_sequence())):
                try:
                    # Create current move vector
                    board, turn = game_to_board(game, move)
                    vector = board_to_array(board, dimension=1, turn=turn)
                    # Create next move vector: board containing only the move
                    # played from the current position; `turn` becomes the
                    # colour that played it.
                    next_move_board, turn = game_move_to_board(game, move + 1)
                    next_move_vector = board_to_array(next_move_board, dimension=1, turn=turn)
                    # Create winner: 1 means the current player won, -1 otherwise
                    winner = np.int8((game.get_winner() == turn) * 2 - 1)

                    if len(vector) != size or len(next_move_vector) != size:
                        msg = 'Board size is ' + str(len(vector)) + '. Expected size is ' + str(size)
                        raise SizeException(msg)

                    # Write the three fields of one example on a single line
                    np.savetxt(vectors, winner[None], fmt='%i', newline=';')
                    np.savetxt(vectors, vector[None], fmt='%i', newline=';')
                    np.savetxt(vectors, next_move_vector[None], fmt='%i')
                except Exception as e:
                    # Best effort: one bad move must not abort the whole run.
                    # (The separate TypeError/ValueError/IndexError handlers
                    # were identical to this one.)
                    _report_malformed(e, sgf_file)

    
class SizeException(Exception):
    """Raised when a converted board vector does not have the expected length."""

In [11]:
def parse_line(line):
    """
    Split one line of the data file into its three fields.

    line -- text of the form "winner;vector;next_move_vector", where both
            vectors are whitespace-separated integers

    Returns (winner, vector, next_move_vector): an np.int8 scalar followed
    by two int8 numpy arrays. Fields beyond the third are ignored; a line
    with fewer than three fields raises IndexError.
    """
    def to_int8_vector(text):
        return np.fromstring(text, dtype='int8', sep=' ')

    fields = line.split(';')
    return np.int8(fields[0]), to_int8_vector(fields[1]), to_int8_vector(fields[2])

In [12]:
from random_sampler3 import random_sampler

def sample_data(data_filename, k):
    """
    Randomly sample k lines from a data file and parse them.

    data_filename -- path of a file whose lines parse_line() understands
    k             -- number of lines requested from random_sampler()

    Returns three index-aligned lists: (winners, vectors, next_move_vectors).
    If the sampler yields no lines, three empty lists are returned.
    """
    lines = random_sampler(filename=data_filename, k=k)
    state_tuples = [parse_line(line) for line in lines]  # one (winner, vector, next_move_vector) per line

    if not state_tuples:
        # zip(*[]) produces no columns, so indexing the columns used to
        # raise IndexError for an empty sample.
        return [], [], []

    # Unzip the list of tuples into three aligned lists.
    winners, vectors, next_move_vectors = [list(column) for column in zip(*state_tuples)]
    return winners, vectors, next_move_vectors

In [40]:
def file_len(fname):
    """
    Count the lines in a text file.

    fname -- path of the file to read

    Returns the number of lines, 0 for an empty file. The file is streamed
    line by line rather than loaded into memory.
    """
    with open(fname) as f:
        # Counting with sum() also covers the empty file, where the former
        # loop variable was never bound and the return statement failed.
        return sum(1 for _ in f)

In [41]:
# Main method for running from command line

if __name__ == "__main__":
    print('main method executed')

    # Step 1 (currently disabled): convert the raw SGF games into the text
    # data file. Uncomment the call to regenerate ./Data/data_9x9.txt.
    root_dir = './Game_Files/9x9'
    output_filename = './Data/data_9x9.txt'
    #directory_to_data_files(root_dir, output_filename, size=81, print_progress=True)

    # Step 2: sample training examples from the data file.
    data_filename = './Data/data_9x9.txt'
    max_examples = file_len(data_filename)
    # Number of examples to sample; edit the first assignment to use fewer.
    # (Repeated sampling could return the same example more than once.)
    k = max_examples
    # Never request more examples than the file has rows.
    k = min(k, max_examples)

    winners, vectors, next_move_vectors = sample_data(data_filename, k=k)

    # Stack the per-example lists into arrays: X holds the positions, Y the
    # next-move vectors, and winners becomes a 1D array.
    X, Y = np.array(vectors), np.array(next_move_vectors)
    winners = np.array(winners)


main method executed

In [ ]: