
NLP analysis over large amounts of text data with AWS Comprehend, Python and pandas


Today I want to tell you how to use AWS Comprehend to perform NLP tasks over your data, in this case entity, sentiment, syntax and key phrase analysis. Last time I wrote about using awswrangler, and I will be using it for this post as well.

To put you in context, imagine you have a lot of text, for example tweets or product reviews, and would like to get insights about what people are saying and how they feel about it. That's where NLP comes in, and AWS Comprehend makes it really easy to apply machine learning models to your data.

For this post, I will be reading data from an already existing AWS Athena database, preprocessing it a little and then feeding it to the AWS Comprehend service, keeping in mind that there is a lot of data to process and that we want to avoid being throttled by the AWS API. Let's start.

Things to consider

In order not to hit the API limits, I will be reading the data with a chunk size of 25 (Comprehend's batch operations accept at most 25 documents per request), and will use the following constants:

MIN_SENTENCE_LENGTH_IN_CHARS = 10
MAX_SENTENCE_LENGTH_IN_CHARS = 5000
COMPREHEND_BATCH_SIZE = 25

Reading the data

For this, I will be using awswrangler. The :min; and :max; placeholders in the query below are replaced by awswrangler with the values passed in params:

sql = """
SELECT <your columns here>
FROM database
where length(textfield) > :min; and length(textfield) < :max;
"""

import awswrangler as wr

# `database` holds the name of the Athena database with the text data
data = wr.athena.read_sql_query(
    sql=sql,
    chunksize=COMPREHEND_BATCH_SIZE,
    database=database,
    params={"max": str(MAX_SENTENCE_LENGTH_IN_CHARS), "min": str(MIN_SENTENCE_LENGTH_IN_CHARS)},
)
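
Each chunk yielded by data is a pandas DataFrame with at most COMPREHEND_BATCH_SIZE rows. Purely as an illustration (peeking like this consumes a chunk, so you would re-create the generator before running the main loop below):

# Illustrative only: inspect the first chunk of the generator
first_chunk = next(data)
print(first_chunk.shape)  # at most COMPREHEND_BATCH_SIZE rows
print(first_chunk.head())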

Now in data I have a generator waiting to be consumed, but first let's create a function that will call the API and give us back the analyzed data:

import boto3
import pandas as pd

client = boto3.client("comprehend", region_name=region)  # region: your AWS region


def get_sentiment(text_list):
    # rate_limited is a small throttling decorator (see the sketch below) that
    # caps how often the wrapped function can be called, to avoid API throttling.
    @rate_limited(25)
    def _get_sentiment(text_list):
        return client.batch_detect_sentiment(TextList=text_list, LanguageCode="en")

    sentiment = _get_sentiment(text_list)
    arr = list()
    for r in sentiment["ResultList"]:
        # "Index" is the position of the document in text_list, which lets us
        # look up its ID in `rows` (built in the main loop further down).
        idx = r["Index"]
        sentiment_score = r["SentimentScore"]
        arr.append(
            (
                rows[idx],
                r["Sentiment"],
                sentiment_score["Positive"],
                sentiment_score["Negative"],
                sentiment_score["Neutral"],
                sentiment_score["Mixed"],
            )
        )
    return pd.DataFrame(
        arr, columns=["ID", "sentiment", "positive", "negative", "neutral", "mixed"]
    )
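
The @rate_limited decorator is not shown in this post; it simply throttles how often the wrapped function can be called. One minimal way to implement such a throttle, assuming a simple single-threaded calls-per-second limit, could look like this:

import time
from functools import wraps


def rate_limited(max_per_second):
    # Sketch: allow at most max_per_second calls per second by sleeping
    # between invocations (single-threaded use only).
    min_interval = 1.0 / max_per_second

    def decorator(func):
        last_call = [0.0]  # mutable holder so the wrapper can update it

        @wraps(func)
        def wrapper(*args, **kwargs):
            wait = min_interval - (time.monotonic() - last_call[0])
            if wait > 0:
                time.sleep(wait)
            last_call[0] = time.monotonic()
            return func(*args, **kwargs)

        return wrapper

    return decorator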

In order to connect each result back to the original data, I take the index from each result (idx = r["Index"]), use it to look up the corresponding row ID, and append the combined record to a list that accumulates the results.
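
For reference, a batch_detect_sentiment response has roughly the following shape (the values here are made up):

# Illustrative response shape, values are made up:
# {
#     "ResultList": [
#         {
#             "Index": 0,  # position of the document in the submitted TextList
#             "Sentiment": "POSITIVE",
#             "SentimentScore": {
#                 "Positive": 0.97, "Negative": 0.01, "Neutral": 0.02, "Mixed": 0.0
#             },
#         },
#     ],
#     "ErrorList": [],  # documents Comprehend could not process
# }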

Finally, the last piece of code that puts it all together:

sentiment = list()
entities = list()
key_phrases = list()
syntax = list()
for text_data in data:
    # IDs of the current chunk, in the same order as the messages sent to
    # Comprehend, so that "Index" in each response maps back to an ID.
    rows = (
        text_data[columns]
        .apply(lambda r: r.id, axis=1)
        .to_list()
    )
    messages = text_data.message.to_list()

    entities.append(get_entities(messages))
    sentiment.append(get_sentiment(messages))
    key_phrases.append(get_key_phrases(messages))
    syntax.append(get_syntax(messages))

entities = pd.concat(entities)
sentiment = pd.concat(sentiment)
key_phrases = pd.concat(key_phrases)
syntax = pd.concat(syntax)
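
I have omitted get_entities, get_key_phrases and get_syntax because they are pretty much the same as get_sentiment, each one just calls its corresponding batch endpoint. Purely as an illustration, a sketch of get_entities could look like this (the output columns are my own choice):

def get_entities(text_list):
    # Same pattern as get_sentiment, but against batch_detect_entities.
    @rate_limited(25)
    def _get_entities(text_list):
        return client.batch_detect_entities(TextList=text_list, LanguageCode="en")

    results = _get_entities(text_list)
    arr = list()
    for r in results["ResultList"]:
        idx = r["Index"]
        for entity in r["Entities"]:
            arr.append((rows[idx], entity["Type"], entity["Text"], entity["Score"]))
    return pd.DataFrame(arr, columns=["ID", "type", "text", "score"])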

After this, the last thing that remains is writing the results back to S3:

wr.s3.to_parquet(
    df=sentiment,
    path="s3://path/sentiments",
    dataset=True,
    mode="overwrite",
    database="database",
    table="sentiments",
    partition_cols=["sentiment"],
)
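
Since dataset=True together with database and table registers the results in the Glue catalog, you can query them right away from Athena. A quick sanity check could look like this (using the same placeholder database and table names as above):

# Count how many records ended up in each sentiment label
counts = wr.athena.read_sql_query(
    sql="SELECT sentiment, count(*) AS total FROM sentiments GROUP BY sentiment",
    database="database",
)
print(counts)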

And that's it. This code is just one quick way of analyzing the data; if you were to build a proper pipeline for this, there would still be room for improvement.

For this task, the following AWS post really helped me: How to scale sentiment analysis using Amazon Comprehend, AWS Glue and Amazon Athena | Amazon Web Services

Author: Alejandro Alcalde

Date: 2021-03-12 Fri 00:00
