In [1]:
import struct
import array
In [2]:
def calculate_indexed_array_sizes(dmp_file):
array_sizes = array.array("I",(0,)*(0xFFFF))
n_gi = 0 #Number of processed gis
current_gi_idx = None
last_taxid = None
is_series_of_taxid = False
with open(dmp_file) as gi2taxid_fh:
for line in gi2taxid_fh:
n_gi += 1
#Read row
row = line.rstrip("\n").split("\t")
#Extract gi
gi = struct.unpack("HH",struct.pack("I", long(row[0])))
#Extract taxid
tax_id = row[1]
#if gi belongs to next level in index (high bytes of gi changed)
if gi[1] != current_gi_idx:
current_gi_idx = gi[1] #Change current index
last_taxid = None
if tax_id != last_taxid:
array_sizes[ gi[1] ] += 1
last_taxid = tax_id
return array_sizes, n_gi
In [3]:
def init_indexed_array(array_sizes):
taxid_array = [None] * (0xFFFF)
for idx,size in enumerate(array_sizes):
taxid_array[idx] = array.array( "B", (0,) * (5*(size) ) )
return taxid_array
In [17]:
def load_dmp_into_indexed_array(idxed_array, dmp_file):
current_gi_idx = None
last_taxid = None
is_series_of_taxid = False
with open(dmp_file) as gi2taxid_fh:
for line in gi2taxid_fh:
#Read row
row = line.rstrip("\n").split("\t")
#Extract gi
low_bytes_gi, hi_bytes_gi = struct.unpack("HH",struct.pack("I", long(row[0])))
#Extract taxid
tax_id = long(row[1])
#if gi belongs to next level in index (high bytes of gi changed)
if hi_bytes_gi != current_gi_idx:
current_gi_idx = hi_bytes_gi #Change current index
pos_to_fill = 0
last_taxid = None
if tax_id != last_taxid:
#Split taxid in three bytes (discard the highest byte who is always 0)
taxid_bytes= struct.unpack("=BBBB",struct.pack("=I",tax_id))[:3]
#Split low bytes of gi into one byte components
low_gi = struct.unpack("=BB",struct.pack("=H",low_bytes_gi))
#5 byte to store is (gi_1,gi_2, taxid_1,taxid_2,taxid_3)
bytes_to_store = array.array("B",low_gi+taxid_bytes)
#Store 5byte code into array
idxed_array[hi_bytes_gi][pos_to_fill:pos_to_fill+5] = bytes_to_store
#Update variables for next iteration
pos_to_fill += 5
last_taxid = tax_id
In [5]:
#def build_indexed_array(dmp_file):
array_sizes,n_gi = calculate_indexed_array_sizes("gi_taxid_nucl.dmp")
taxid_array = init_indexed_array(array_sizes)
load_dmp_into_indexed_array(taxid_array, "gi_taxid_nucl.dmp")
print n_gi
In [2]:
def get_taxid_for_gi(gi, taxid_array,array_sizes):
low_bytes_gi, hi_bytes_gi = struct.unpack("HH",struct.pack("I", long(gi)))
last_gi = (array_sizes[hi_bytes_gi]-1) * 5
if len(taxid_array[hi_bytes_gi]) == 0 or low_bytes_gi < decode_gi( taxid_array[hi_bytes_gi][0:2]):
print "GI does not have an assigned taxid"
elif low_bytes_gi > decode_gi(taxid_array[hi_bytes_gi][last_gi:last_gi+2]):
return decode_taxid(taxid_array[hi_bytes_gi][last_gi+2:last_gi+5])
else:
return bytearray_binsearch(low_bytes_gi,taxid_array[hi_bytes_gi])
In [13]:
def bytearray_binsearch(gi,byte_array):
first = 0
last = (len(byte_array) / 5 ) - 1
found = False
assert len(byte_array) % 5 == 0
while not found and first <= last:
middle_pos = (last + first) / 2 #Integer division, always floor(x/y)
middle_gi = decode_gi(byte_array[(middle_pos*5):(middle_pos*5)+2])
if middle_gi == gi:
found = True
taxid = decode_taxid(byte_array[(middle_pos*5)+2:(middle_pos*5)+5])
elif middle_gi < gi:
first = middle_pos + 1
else: #middle_gi > gi
last = middle_pos - 1
if not found:
#If not found, last item will always be the closest smaller number in ther
# array, and following our assumptions, if the number is not present,
# it is because it is a series of repeated tax id for the consequent numbers
# until the next change. Therefore, its tax id must be the last position checked
taxid = decode_taxid(byte_array[(last*5)+2:(last*5)+5])
return taxid
In [4]:
def decode_gi_taxid( byte_array ):
gi =struct.unpack("=H",struct.pack("=BB",*byte_array[0:2]))[0]
taxid = struct.unpack("=I",struct.pack("=BBBB",byte_array[2],byte_array[3],byte_array[4],0))[0]
return gi,taxid
In [5]:
def decode_gi( byte_array ):
gi =struct.unpack("=H",struct.pack("=BB",*byte_array))[0]
return gi
def decode_taxid( byte_array ):
taxid = struct.unpack("=I",struct.pack("=BBBB",byte_array[0],byte_array[1],byte_array[2],0))[0]
return taxid
In [ ]:
gis_fh = open("gis")
out_fh = open("out.dmp","w")
n = 0
for gi in gis_fh:
out_fh.write("{}\t{}\n".format(gi.rstrip("\n"), get_taxid_for_gi(gi.rstrip("\n") ,taxid_array,array_sizes )))
n+=1
if n % 10000000 == 0:
print n
In [142]:
out_fh.close()
In [158]:
index_fh = open("nucl.ids","wb")
for x in array_sizes:
index_fh.write(struct.pack("=H",x))
index_fh.close()
In [162]:
index_fh = open("nucl.idx","wb")
for x in taxid_array:
index_fh.write(x)
index_fh.close()
In [6]:
def load_array_sizes(sizes_file):
size_idx = open(sizes_file,"rb")
byte_sizes = bytearray(size_idx.read())
loaded_sizes = array.array("I",(0,)*(0xFFFF))
for x in range(0xFFFF):
loaded_sizes[x] = struct.unpack("=H",byte_sizes[(2*x):(2*x)+2])[0]
return loaded_sizes
In [10]:
def load_taxids_from_index(array_index_file,array_sizes):
taxid_array = [None] * (0xFFFF)
taxid_array_fh = open(array_index_file,"rb")
for idx,size in enumerate(array_sizes):
taxid_array[idx] = array.array( "B" , taxid_array_fh.read(size*5))
return taxid_array
In [11]:
loaded_sizes = load_array_sizes("nucl.ids")
loaded_array = load_taxids_from_index("nucl.idx",loaded_sizes)
In [14]:
get_taxid_for_gi(2,loaded_array,loaded_sizes)
Out[14]:
In [20]:
def test_dump(gi_list_file, out_file,indexed_array,array_sizes):
gis_fh = open(gi_list_file)
out_fh = open(out_file,"w")
n = 0
for gi in gis_fh:
out_fh.write("{}\t{}\n".format(gi.rstrip("\n"), get_taxid_for_gi(gi.rstrip("\n") ,indexed_array,array_sizes )))
n+=1
if n % 10000000 == 0:
print n
out_fh.close()
In [22]:
import time
start_time = time.time()
test_dump("gis","loaded_idx_nucl.dmp",loaded_array,loaded_sizes)
elapsed_time = time.time() - start_time
In [23]:
elapsed_time
Out[23]:
In [ ]: