Tuesday, September 19, 2017

Elasticsearch and PubMed: Step 3, Diving into the Data

In the last two blog posts, I explained how to get a local copy of all 30 million PubMed publications and how to parse this data set into an elasticsearch database. In this blog post, I will describe a small project which leverages this data set.

We will go through all publications (title and abstract) and count how many contain a certain term. We will do that for different terms and plot the counts as a function of time. This will tell us which topics are currently trending in medical science. Here is a possible output of the program described in this post:
Figure 1: Number of PubMed publications with certain topics as a function of time.
This plot shows that there was a very steep increase in the publications about Ebola research in 2014. We can also see that the research on blood cancer or leukemia has decreased in the last 25 years, while the research on prostate cancer has increased in those years.

Elasticsearch uses a JSON-based query syntax, which can indeed be a bit cumbersome, but the documentation is very good.

First, we write the elasticsearch query which counts the papers. There are a couple of different queries in elasticsearch which can do that, e.g. the match, term, and match_phrase queries.

The match query analyzes the search term and then compares it to the tokens stored in the index.
{
    "query": {
        "match" : {
            "title" : "Cancer"
        }
    }
}
To analyze the term elasticsearch runs the same analyzer we used to index the field. You might remember from the last blog post that we used the standard analyzer, which tokenizes the text (splits it into words) and puts everything in lower case. So the match query above will return all papers which mention the term 'Cancer' or 'cancer'. 
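You can quickly check what the standard analyzer does to a search term with the _analyze API. Here is a minimal sketch using the python client and the index_name variable from the last post:
# Ask elasticsearch how the standard analyzer tokenizes the search term
# (a minimal sketch; es and index_name are set up as in the last post)
res = es.indices.analyze(index=index_name,
                         body={"analyzer": "standard", "text": "Cancer"})
print([t['token'] for t in res['tokens']])  # -> ['cancer']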

Note that the elasticsearch approach can be limiting. For example, there is no easy way to distinguish lower case from upper case anymore, because of the pre-processing done by the standard analyzer. You can use the term query to prevent the search term from being analyzed
{
    "query": {
        "term" : { 
            "title" : "Cancer" 
        }
    }
}
but this search will return zero results since 'Cancer' with a capital 'C' does not exist anywhere in the index. For our case, this does not matter, but if you want to keep the ability to distinguish upper and lower case, you need to use a different analyzer when indexing your data.
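For completeness, here is a minimal sketch of what such a case-preserving analyzer could look like in the index settings. This is hypothetical and not used in this project, which sticks with the standard analyzer:
# Hypothetical index settings with a custom analyzer that tokenizes like the
# standard analyzer but skips the lowercase filter (not used in this project)
case_preserving_settings = {
    "analysis": {
        "analyzer": {
            "case_sensitive": {
                "type": "custom",
                "tokenizer": "standard"
            }
        }
    }
}
# A field indexed with "analyzer": "case_sensitive" keeps 'Cancer' and 'cancer'
# as distinct tokens, so the term query above would find matches again.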

In some cases, we might be interested in finding multi-word terms or phrases, like 'drug resistance'. The match query runs the standard analyzer on this phrase, which, besides lower casing all words, also tokenizes the phrase. So it turns 'drug resistance' into ['drug', 'resistance'] and looks for papers which contain either one of the words. We can additionally provide the and operator to enforce that documents contain both terms,
{
    "query": {
        "match" : {
            "title" : {
                "query": "drug resistance",
                "operator": "and"
            }
        }
    }
}
but this is still not exactly what we want, since it will return documents which contain the words 'resistance' and 'drug' anywhere in the title. We want to enforce that these words appear next to each other. This can be achieved with the match_phrase query.
{
    "query": {
        "match_phrase":{
            "title": "drug resistance"
        }
    }
}
If you want to allow some leniency on how close the two words have to appear, you can provide the slop parameter. A slop of 2 would still count documents which have no more than 2 words in between 'drug' and 'resistance'. For a very large slop, one would effectively recover the match query with the and operator.
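As a minimal example (not used in the project code below), a match_phrase query with a slop of 2 would look like this:
{
    "query": {
        "match_phrase": {
            "title": {
                "query": "drug resistance",
                "slop": 2
            }
        }
    }
}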

Besides making sure that the papers contain the specific term, we also want to limit the publication date to a certain range. We therefore create a bool-must query like this
{
    "query": {
        "bool": {
            "must": [{
                "range": {
                    "arxiv_date":{
                        "gte" : low_date.strftime('%Y-%m-%d'), 
                        "lte" : up_date.strftime('%Y-%m-%d'), 
                        "format": "yyyy-MM-dd"
                    }
                }
            }]
        }
    }
}
All conditions in the bool-must query must be true. The must query in this example is a list, as shown by the [] brackets, and we can just append the match_phrase query from above.
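As a small sketch (not the final version we will use), appending a match_phrase query in python could look like this, assuming the bool-must query from above is stored in a dict called doc:
# Sketch: append a match_phrase query to the must list of the bool query
doc["query"]["bool"]["must"].append({
    "match_phrase": {
        "title": "drug resistance"
    }
})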

However, in our case, we just want to find papers which have the search term in the title OR the abstract. So we do not want to put the match_phrase queries for both fields directly into the bool-must query, since that would require the term to be in the title AND the abstract. Instead, we create a bool-should query within the bool-must query. The bool-should query requires only one of the conditions within it to be true. Here is the final code.
def get_doc(low_date, up_date, list_of_terms=[]):
    term_doc = []
    for sub_string in list_of_terms:
        term_doc.append({
            "match_phrase":{
                "title": sub_string
            }
        })
        term_doc.append({
            "match_phrase":{
                "abstract": sub_string
            }
        })
    # combine the date range filter (must) with the term queries (should)
    doc = {
        "query": {
            "bool": {
                "must": [{
                    "range": {
                        "created_date":{
                            "gte" : low_date.strftime('%Y-%m-%d'), 
                            "lte" : up_date.strftime('%Y-%m-%d'), 
                            "format": "yyyy-MM-dd"
                        }
                    }
                },
                {
                    'bool': {
                        "should": term_doc
                    }
                }]
            }
        }
    }
    return doc
This function expects a list of terms so that we can search for synonyms like ['blood cancer', 'leukemia']. For each term, we create a match_phrase query for the title and the abstract field and add this list to the bool-should query.
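As a quick sanity check, we can run the query once by hand; the dates and the search term here are just examples:
# Count papers from 2016 which mention 'drug resistance' in title or abstract
low_date = datetime.datetime(2016, 1, 1)
up_date = datetime.datetime(2017, 1, 1)
doc = get_doc(low_date, up_date, ['drug resistance'])
res = es.search(index=index_name, size=0, body=doc)
print("number of matching papers: %d" % res['hits']['total'])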

Now all we have to do is call this query for different time steps and different terms and plot the results. Here is the code which loops over the last 25 years of PubMed publications and counts the number of papers. The count is always normalized to the total number of papers in the same time frame.
def get_paper_count(list_of_terms, timestep):
    # We start 25 years in the past
    start_date = datetime.datetime.utcnow() - datetime.timedelta(days=365*25)

    list_of_counts = []
    list_of_dates = []
    low_date = start_date
    # loop through the data in steps of timestep days (one year in our case)
    while low_date < datetime.datetime.utcnow() - datetime.timedelta(days=10):
        up_date = low_date + datetime.timedelta(timestep)

        doc = get_doc(low_date, up_date)
        # we are only interested in the count -> size=0
        res = es.search(index=index_name, size=0, body=doc)
        norm = res['hits']['total']

        doc = get_doc(low_date, up_date, list_of_terms)
        # we are only interested in the count -> size=0
        res = es.search(index=index_name, size=0, body=doc)

        # norm should always be >0 but just in case   
        if norm > 0:
            list_of_counts.append(100.*float(res['hits']['total'])/float(norm))
            list_of_dates.append(low_date + datetime.timedelta(days=timestep/2))
        else:
            list_of_counts.append(0.)
            list_of_dates.append(low_date + datetime.timedelta(days=timestep/2))

        low_date = low_date + datetime.timedelta(timestep)
    return list_of_counts, list_of_dates 
Using this function, we can create the plot I showed at the beginning of the post like this
import matplotlib.colors as mcolors
import matplotlib.pyplot as plt

def create_trending_plot():
    timestep = 365 # average over 365 days
    
    # Get a generic list of colors
    colors = dict(mcolors.BASE_COLORS, **mcolors.CSS4_COLORS)
    colors = [color[0] for color in colors.items()]
    # get all possible line styles
    linestyles = ['-', '--', '-.', ':']

    list_of_queries = [['prostate cancer'], 
                       ['blood cancer', 'leukemia'], 
                       ['Ebola'],
                       ['alzheimer', 'dementia']]
    timestamp = datetime.datetime.utcnow()

    plt.clf()
    # The maximum number of terms is given by the available colors
    for i, list_of_terms in enumerate(list_of_queries[:len(colors)]):
        print "i = ", i, "term = ", list_of_terms
        list_of_counts, list_of_dates = get_paper_count(list_of_terms, timestep)
        plt.plot(list_of_dates, 
                 list_of_counts, 
                 color=colors[i], 
                 label=', '.join(list_of_terms),
                 linestyle=linestyles[i%len(linestyles)])
    plt.xlabel('Date [in steps of %d days]' % timestep)
    plt.title('Relative number of papers for topic vs. time')
    plt.ylabel('Relative number of papers [%]')
    plt.legend(loc='upper left', prop={'size': 7})
    plt.savefig(OUTPUT_FOLDER + "trending_pubmed_%s.png" % 
                timestamp.strftime('%m-%d-%Y-%H-%M-%f'))
    return 
I also created a small web application using this code: www.benty-fields.com/trending. The entire exercise can also be downloaded from GitHub. Let me know if you have any comments/questions below.
cheers
Florian

Saturday, September 2, 2017

Elasticsearch and PubMed: Step 2, Parsing the Data into Elasticsearch

This is the second blog post discussing my project with the PubMed dataset. In the last post, I explained how to get a local copy of all the PubMed data. Here I will describe how I parse the data into my elasticsearch database.

Installing an elasticsearch server involves the following steps:
1. First download the latest version, in my case 5.5.2

    curl -L -O https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-5.5.2.tar.gz

2. Untar the file you just downloaded

    tar -xvf elasticsearch-5.5.2.tar.gz

3. Enter into the folder which was created by the previous command

    cd elasticsearch-5.5.2/bin

4. Now you can start the elasticsearch server with ./elasticsearch. In general, it is a good idea to set the heap size using the ES_JAVA_OPTS environment variable. The recommendation is to use half of your available memory, which in my case would be 4Gb, but I run other stuff on my machine and therefore use only 1Gb

    ES_JAVA_OPTS="-Xms1g -Xmx1g" ./elasticsearch

5. Now we need a python interface for elasticsearch. Of course, it is possible to interact with the elasticsearch server directly using curl (e.g. via pycurl), but there are client libraries which hide some of the rather messy elasticsearch syntax. To be honest, none of the available python packages totally convinced me, but the current standard seems to be the package which is simply called elasticsearch

    pip install elasticsearch

With this, we can connect to the elasticsearch server from our python program with the following lines
from elasticsearch import Elasticsearch
es = Elasticsearch(hosts=['localhost:9200'])
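Before doing anything else, it is worth checking that the client can actually reach the server. Here is a small, optional sanity check:
# Optional sanity check: make sure the elasticsearch server is reachable
if not es.ping():
    raise RuntimeError("Cannot reach elasticsearch on localhost:9200")
print(es.info()['version']['number'])  # should print 5.5.2 for the version above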
Now we have to create an elasticsearch index (in other database systems this would be called a table). This index describes the structure in which our data will be stored. I decided to store the data in 5 shards, which means that the data will be divided into 5 index files, each of which can be stored on a different machine. For now, I am not using any replicas (copies of shards). Replicas increase redundancy, which allows retrieving data even if one or multiple servers are down. The decision of how many shards to use is quite important, since it cannot easily be changed later, while replicas can be added at any time.
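For example, once the index below exists, a replica per shard could be added at any time with a single settings update (a sketch, not needed for this project):
# Add one replica per shard to an existing index; unlike the number of shards,
# this can be changed at any time
es.indices.put_settings(index=index_name,
                        body={"number_of_replicas": 1})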

Below I posted the function which creates the index. The settings option is used to set the number of shards and replicas. The mappings describe the document fields and how they will be stored. Here I will keep things simple and only store the title, abstract and created_date of the papers, even though the .gz files we downloaded contain much more information.
def create_pubmed_paper_index():    
    settings = {
        # Changing the number of shards after the fact is not possible.
        # The maximum size per shard should be about 30Gb; replicas can
        # be added at any time.
        # https://qbox.io/blog/optimizing-elasticsearch-how-many-shards-per-index
        "number_of_shards" : 5,
        "number_of_replicas": 0
    }
    mappings = {
        "pubmed-paper": {
            "properties" : {
                "title": { "type": "string", "analyzer": "standard"},
                "abstract": { "type": "string", "analyzer": "standard"},
                "created_date": {
                    "type":   "date",
                    "format": "yyyy-MM-dd"
                }
            }
        }
    }
    es.indices.delete(index=index_name, ignore=[400, 404])
    es.indices.create(index=index_name, 
                      body={ 'settings': settings,
                             'mappings': mappings }, 
                      request_timeout=30)
    return 
Elasticsearch is a NoSQL database, meaning it does not follow SQL syntax or semantics. If you are used to SQL databases, this will require some re-thinking. For example, there is no easy way to join indices, while joins are a fundamental feature of SQL. This limits what you can do with elasticsearch significantly.

However, joins are very slow, and elasticsearch is all about speed. If you think about it, there is almost always a way around joins: all you have to do is store the data in the way you want to retrieve it. This can sometimes be ugly and requires extra storage for redundant data, but without having to perform joins at runtime, queries can be very fast.

Another way elasticsearch saves time is by processing documents directly when they are indexed. I used the standard analyzer, which tokenizes the text and lower cases all words. So, for example, the sentence
"The 2 QUICK Brown-Foxes jumped over the lazy dog's bone."
would be stored as
[ the, 2, quick, brown, foxes, jumped, over, the, lazy, dog's, bone ]
Again, this will speed up the required processing steps at runtime.


The function above creates the elasticsearch index. Now we have to read the data into this index. For that we write a small function which unzips the files and builds an element tree using xml.etree.cElementTree. Note that building such a tree can quickly lead to memory issues, since the files are quite large once uncompressed. So you should stay away from the often-used parse() function, which loads the entire file into memory. Below I use the iterparse() function instead, which allows us to discard elements after we have written them to the database.
import gzip
import os
import time
import xml.etree.cElementTree as ET # C implementation of ElementTree
def fill_pubmed_papers_table(list_of_files):
    # Loop over all files, extract the information and index in bulk
    for i, f in enumerate(list_of_files):
        print("Read file %d filename = %s" % (i, f))
        time0 = time.time()
        time1 = time.time()
        inF = gzip.open(f, 'rb')
        # we have to iterate through the subtrees, ET.parse() would result
        # in memory issues
        context = ET.iterparse(inF, events=("start", "end"))
        # turn it into an iterator
        context = iter(context)

        # get the root element
        event, root = context.next()
        print("Preparing the file: %0.4fsec" % ((time.time() - time1)))
        time1 = time.time()

        documents = []
        time1 = time.time()
        for event, elem in context:
            if event == "end" and elem.tag == "PubmedArticle":
                doc, source = extract_data(elem)
                documents.append(doc)
                documents.append(source)
                elem.clear()
        root.clear()
        print("Extracting the file information: %0.4fsec" % 
              ((time.time() - time1)))
        time1 = time.time()

        res = es.bulk(index=index_name, body=documents, request_timeout=300)
        print("Indexing data: %0.4fsec" % ((time.time() - time1)))
        print("Total time spend on this file: %0.4fsec\n" % 
             ((time.time() - time0)))
        os.remove(f) # we directly remove all processed files
    return 
This function looks for elements with the tag PubmedArticle, which we pass on to the extract_data() function. In that function, we extract the information we need. To write such a function, we need to know the internal structure of the PubMed xml files. To get an idea of what that structure looks like, you can print out one element using
def prettify(elem):
    '''Return a pretty-printed XML string for the Element.'''
    from bs4 import BeautifulSoup  # just for prettify
    return BeautifulSoup(ET.tostring(elem, 'utf-8'), "xml").prettify()
I am using BeautifulSoup to produce a readable output since the equivalent functionality in cElementTree doesn't look as nice.

Without going into any more detail, here is the function which extracts the relevant information and stores it in a class instance
def extract_data(citation):
    new_pubmed_paper = Pubmed_paper()

    citation = citation.find('MedlineCitation')

    new_pubmed_paper.pm_id = citation.find('PMID').text
    new_pubmed_paper.title = citation.find('Article/ArticleTitle').text

    Abstract = citation.find('Article/Abstract')
    if Abstract is not None:
        # Here we discard information about objectives, design, 
        # results and conclusion etc.
        for text in Abstract.findall('AbstractText'):
            if text.text:
                if text.get('Label'):
                    new_pubmed_paper.abstract += '<b>' + text.get('Label') + '</b>: '
                new_pubmed_paper.abstract += text.text + '<br>'

    DateCreated = citation.find('DateCreated')
    new_pubmed_paper.created_datetime = datetime.datetime(
        int(DateCreated.find('Year').text),
        int(DateCreated.find('Month').text),
        int(DateCreated.find('Day').text)
    )
    doc, source = get_es_docs(new_pubmed_paper)
    del new_pubmed_paper
    return doc, source
where the class Pubmed_paper() is defined as
class Pubmed_paper():
    ''' Used to temporarily store a pubmed paper outside es '''
    def __init__(self):
        self.pm_id = 0
        # every paper has a created_date
        self.created_datetime = datetime.datetime.today()
        self.title = ""
        self.abstract = ""

    def __repr__(self):
        return '<Pubmed_paper %r>' % (self.pm_id)
and the function which builds the doc (the bulk action) and source (the document body) dictionaries is
def get_es_docs(paper):
    source = {
        'title': paper.title,
        'created_date': paper.created_datetime.date(),
        'abstract': paper.abstract
    }
    doc = {
        "index": {
            "_index": index_name,
            "_type": type_name,
            "_id": paper.pm_id
        }
    }
    return doc, source
Reading all the files into the database takes a few hours.
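For reference, here is a sketch of how the indexing could be kicked off and verified afterwards; the folder with the downloaded .gz files is just a placeholder:
import glob

# index all downloaded baseline files (placeholder path)
list_of_files = sorted(glob.glob('pubmed_data/*.xml.gz'))
fill_pubmed_papers_table(list_of_files)

# check how many papers ended up in the index
print("indexed documents: %d" % es.count(index=index_name)['count'])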

This post was a bit code heavy, but now that we have written the entire dataset into elasticsearch, we can easily access it. In the next post we will start a small project using this large dataset, making use of the fast elasticsearch database. You can access the code used in this post on GitHub. Let me know if you have any comments/questions below.
cheers
Florian