It's often the case where data scientists want to explore a data set that doesn't fit into memory. As data continues to grow in size, so too, do our tools to work with these data need to expand. Motivation for this post originates from my interest in working with openSNP data. openSNP is an open-source platform where users upload their genotype data from direct-to-consumer genetics companies like 23andMe. Raw result files are often hundreds of thousandas of records long, each record representing a location in the human genome. There are over 4,000 raw result files in the openSNP data however, my laptop runs a VirtualBox with only 5GB RAM, which meant I needed to find some clever solutions beyond the standard pandas library in order to work with this much data. This post describes one of several applications that can benefit from out-of-core computing.

Download the openSNP data

openSNP has over 40 GB of genotype & phenotype data stored as text files!
For the purposes of this demo, the analysis will be limited to only the 23andMe files. Below are a few shell commands to get the raw data, create a directory and extract 23andMe files.

In [1]:
%%sh
wget https://opensnp.org/data/zip/opensnp_datadump.current.zip
unzip opensnp_datadump.current.zip

# Isolate the 23&me; files
mkdir gt23
mv opensnp_datadump.current/*.23andme.txt* gt23/

# how many 23&me; genotype files?
find gt23/ -type f | wc -l

Here's a look at an example 23andMe result file, the header is commented out with # then there are 600,000+ rows of genotype data.

raw data

Extract a reasonable test set

For this demo exercise, let's use a manageable subset of the full openSNP data.

In [2]:
%%sh
mkdir gt23_testset
cp gt23/user21* gt23_testset/
In [3]:
import glob
allFiles = glob.glob("gt23_testset/*.txt")
In [4]:
%%sh
du -h gt23_testset/
766M	gt23_testset/

Pure pandas

First, start with a pure pandas read_csv solution, something that should be familiar to Python data scientists. Let's try to create a large DataFrame in memory. join can accomplish this task, even though it's an expensive operation, the test data is small enough that we can successfully execute it.

In [5]:
import re
import pandas as pd
In [6]:
%%time
gtdf = pd.DataFrame(index=['rsid']) #empty DataFrame

for file_ in allFiles:
    #extract the userid
    match = re.search('user(\d+)', file_)
    userid = match.group(1)
    
    df = pd.read_csv(file_, 
                     comment='#', 
                     sep='\t', 
                     header=None,
                     usecols=[0,3],
                     error_bad_lines=False,
                     names=['rsid', userid],
                     dtype={'rsid':object,
                            'genotype':object},
                     index_col=['rsid']
                    )
    gtdf = gtdf.join(df, how='outer')
CPU times: user 7min 48s, sys: 9.83 s, total: 7min 58s
Wall time: 8min 15s
In [7]:
gtdf.dropna().head()
Out[7]:
2100 2106 210 2111 2116 2117 2118 2124 2126 2127 ... 2177 217 2182 2183 2184 2186 2189 2191 2192 2195
i1000009 G G G G G G G G G G ... G G G G G G G G -- G
i2000003 GG GG GG GG GG GG GG GG GG GG ... GG GG GG GG GG GG GG GG GG GG
i3000001 II II II II II II II II II II ... II II II II II II II II II II
i3000002 G G G G -- G G G -- G ... G G G G -- -- G G G G
i3000003 G G G G -- G G G -- G ... G G G G -- -- G G G G

5 rows × 43 columns

On the test set of 43 files, the entire DataFrame fits into memory. However, using this pure pandas method on the entire openSNP data set of 1,915 files would eventually crash the computer because it would run out of pyhsical memory.

Dask -- parallel out-of-core DataFrame

Enter dask, a parallel Python library that implements out-of-core DataFrames. It's API is similar to pandas, with a few additional methods and arguments. Dask creates a computation graph that will read the same files in parallel and create a "lazy" DataFrame that isn't executed until comptue() is explicitly called.

Dask Graph

In [8]:
import dask.dataframe as dd
In [9]:
%%time 
# won't actually read the csv's yet...
# similar read_csv() as pandas
ddf = dd.read_csv('gt23_testset/*.txt',
                  comment='#', 
                  sep='\t', 
                  header=None,
                  usecols=[0,3],
                  error_bad_lines=False,
                  names=['rsid', 'genotype'],
                  dtype={'rsid':object,
                         'genotype':object})
CPU times: user 112 ms, sys: 52 ms, total: 164 ms
Wall time: 298 ms
In [10]:
# Dask defaults to 1 partition per file
ddf
Out[10]:
dd.DataFrame
In [11]:
%%time
#  this operation is expensive
ddf = ddf.set_index('rsid')
CPU times: user 4min 44s, sys: 23 s, total: 5min 7s
Wall time: 4min 45s

The dask compute() method provides familiar results. Since we didn't join the dask DataFrame, let's investigate one record (or one SNP). Remember, in contrast to gtdf, ddf isn't in memory so it will take longer to get this result.

In [12]:
%%time
# dask DataFrame
ddf.loc['rs1333525']['genotype'].value_counts().compute()
CPU times: user 3min 31s, sys: 36.1 s, total: 4min 7s
Wall time: 3min 46s
Out[12]:
CC    37
CT     5
TT     1
Name: genotype, dtype: int64
In [13]:
%%time
# pure pandas DataFrame
gtdf.loc['rs1333525'].value_counts()
CPU times: user 4.3 s, sys: 8.1 s, total: 12.4 s
Wall time: 1min 1s
Out[13]:
CC    37
CT     5
TT     1
Name: rs1333525, dtype: int64

At first blush, the computation times are deceiving. As expected, the dask method took longer because of the lazy computation, it still had to read all the files and then perform the operation. However, when you consider the 8 minutes 15 seconds required to join gtdf DataFrame from cell 6 above in addtion to the 1 minute 1 second for gtdf.loc[rs13333525'].value_counts(), that's 9 minutes 16 seconds. In contrast, the dask method required 3 steps. First setting the computation graph (cell 9) was < 1 second. So the real comparison comes from the 4 minutes 45 seconds to set the index (cell 10) and 3 minutes 46 seconds to perform ddf.loc['rs1333525']['genotype'].value_counts().compute() (cell 12) for a grand total of 8 minutes 31 seconds. Seems like a marginal speedup on our test data of only 43 files. The real power of dask comes into play when you can't fit the DataFrame into memory.

Increase query peformance -- Convert .txt/.csv to Parquet!

It would be nice to speed up the dask queries so we can work with the DataFrame for downstream analysis in a reasonable amount of time. The solution is to store the raw text data on disk in an efficient binary format. A popular choice for this is traditionally HDF5, but I chose to use parquet becuase HDF5 can be tricky to work with. Dask uses the fastparquet implementation.

In [14]:
%%time
dd.to_parquet('gt23test_pq/', ddf)
CPU times: user 12min 35s, sys: 1min 41s, total: 14min 17s
Wall time: 19min 30s

Essentially, what we've done to this point is convert .txt files to parquet files. How much performance is really gained from this? Revisiting the dask DataFrame, ddf, remember that took 3 minutes and 46 seconds to compute value_counts():

In [15]:
# using dask with fastparquet, lazy, so nothing computed here...
pqdf = dd.read_parquet('gt23test_pq/', index='rsid')
In [16]:
%%time
pqdf.loc['rs1333525']['genotype'].value_counts().compute()
CPU times: user 8.18 s, sys: 608 ms, total: 8.79 s
Wall time: 11.7 s
Out[16]:
CC    37
CT     5
TT     1
Name: genotype, dtype: int64

Parquet offers a noticeable performance increase for querying data, even with this small validation set of 43 files. Scaling this up to 1,915 files, the parallel .txt version, ddf, took 5+ hours to execute value_counts() for one SNP. to_parquet on the 1,915 file DataFrame took a couple hours. Once you see the query performance improvements, it's well worth the up-front cost of converting from .txt or .csv to parquet.

query performance

Performance on the entire openSNP data set

I pre-computed the parquet files for the entire gt23 directory using the same commands above. Some malformed files needed to be cleaned and/or removed.

In [17]:
%%sh
find gt23_pq/ -type f | wc -l
1898
In [18]:
allpqdf = dd.read_parquet('gt23_pq/', index='rsid')
In [19]:
%%time
allpqdf.loc['rs1333525']['genotype'].value_counts().compute()
CPU times: user 6.67 s, sys: 112 ms, total: 6.78 s
Wall time: 7.2 s
Out[19]:
CC    1625
CT     277
TT      13
Name: genotype, dtype: int64

These results highlight the far superior performance of parquet over .csv or .txt. Additionally, dask proved it's value as an easy-to-use tool when physical memory is a constraint. By now you've probably noticed I wasn't able to assign the user identifier to each column. dd.read_csv() assumes the column names in each file are identical.

Implemenation on the entire openSNP dataset

The dask and parquet portion of the tutorial have concluded. I wanted to show how easy dask and parquet make it to quickly compare variant frequencies in the openSNP data to exAC data. Here we'll define a function to extract variant frequencies from exAC, then compare to openSNP frequencies.

In [20]:
from urllib.parse import urlparse
from urllib.parse import urlencode
from urllib.request import urlopen
import json
In [21]:
def ExacFrequency(rsid):
    try:
        exac = {}

        request = urlopen('http://exac.hms.harvard.edu/rest/dbsnp/{}'.format(rsid))
        results = request.read().decode('utf-8')
        exac_variants = json.loads(results)['variants_in_region']

        for alt_json in exac_variants:
            exac_allele = alt_json['ref']+alt_json['alt']
            exac_freq = alt_json['allele_freq']
            exac[exac_allele] = exac_freq
    except:
        return {'n/a':0}
    return exac
In [22]:
brca2snps = ['rs766173', 'rs144848', 'rs11571746', 'rs11571747',
             'rs4987047', 'rs11571833', 'rs1801426']
In [23]:
df = pd.DataFrame(columns=['rsid', 'gt', 'source', 'varFreq'])
In [24]:
for snp in brca2snps:
    Osnp_alleles = allpqdf.loc[snp]['genotype'].value_counts().compute() #dask
    
    Osnp_alleles = Osnp_alleles.to_dict()
    opensnp_N = sum(Osnp_alleles.values())
    
    Osnp_freqs = {alt: AC/float(opensnp_N) for alt,AC in Osnp_alleles.items()}
    exac_freqs= ExacFrequency(snp)
    
    for gt,freq in exac_freqs.items():
        df = df.append({'rsid':snp, 'gt': gt, 'source':'exAC', 'varFreq':freq }, ignore_index=True )
    
    for gt,freq in snp_freqs.items():
        if gt in exac_freqs:
            gt = gt
        elif gt[::-1] in exac_freqs:
            gt = gt[::-1]
        else:
            continue
        df = df.append({'rsid':snp, 'gt': gt, 'source':'Osnp', 'varFreq':freq }, ignore_index=True )
In [25]:
%pylab inline
import seaborn as sns
sns.barplot(x="rsid", y="varFreq", hue="source", data=df);
Populating the interactive namespace from numpy and matplotlib

I hope you find this post useful and think about using dask and parquet when working with structured text files. I really encourage you to check out the dask and parquet documentation and follow along on some of their tutorials. Thanks for reading, comments and code improvements welcome!

- Kevin Arvai


Comments

comments powered by Disqus