Skip to main content
Big Data Test Infrastructure (BDTI)

Use case code

Please find below the code for the use case for big data analytics.

1. Import relevant libraries

In this step, all the Python libraries that are needed to process the data are imported.

Code step 1:
import sys import os from pyspark import SparkContext from pyspark.sql import SparkSession from pyspark.sql.functions import lit from pyspark.sql import functions as F from pyspark.sql.types import ( ArrayType, IntegerType,   MapType,   StringType,   StructField,   StructType, )

2. Define MinIO config parameters

To connect to the MinIO bucket which contains the data, it is necessary to input the followings in the next line of code:

  • url: this is the private address of the MinIO instance. This Jupyter notebook will try to connect to MinIO through the private endpoint, since both the Jupyter and MinIO instance are deployed in the same private network.
  • accessKey: this is the accessKey to connect to MinIO, as defined in the service account.
  • secretKey: this is the secretKey to connect to MinIO, as defined in the service account.

To retrieve these information, please refer to Step 3.3 of the use case guide.

Code step 2:
accessKey = "<input_your_accessKey_here>" secretKey = "<input_your_secretKey_here>" url = "<input_your_url_here" ​​​​​​​connectionTimeOut = "30000"

3. Define Spark Session

To connect to the Spark instance, which will process our dataset, we will need to make some configurations to make sure that the SparkSession is set up correctly. These configurations are meant to: - Connect the Jupyter Notebook to the Spark instance - Connect the Spark instance to MinIO

Here below a brief explanation of the relevant settings:

spark = SparkSession.builder \ .master("<input_your_spark_address_here") \ #1 .config("spark.submit.deployMode", "client") \ .config("spark.driver.host","<input_your_jupyter_hostname>") \ #2 .config("spark.driver.port", "9000")\ .config("spark.executor.memory","4GB") \ .config("spark.hadoop.fs.s3a.endpoint", url) \ #3 .config("spark.hadoop.fs.s3a.access.key", accessKey) \ #4 .config("spark.hadoop.fs.s3a.secret.key", secretKey)\ #5 .config("spark.hadoop.fs.s3a.connection.timeout", connectionTimeOut) \ .config("spark.hadoop.fs.s3a.path.style.access", "true")\ .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")\ .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")\ .config("spark.sql.debug.maxToStringFields", "100000") \ .getOrCreate()
  1. Here the Spark master node address should be inputed. To retrieve this information, please refer to Step 4.3 of the use case script.
  2. This configuration will allow to make sure that Spark connects with Jupyter. To this extent, the Jupyter hostname should be inputed. To retrieve this information, please refer to Step 5.5 of the use case script.
  3. This configuration provides Spark the MinIO address (defined in the previous step) to read the data.
  4. This configuration provides Spark the MinIO the accessKey (defined in the previous step) to have access to the data.
  5. This configuration provides Spark the MinIO the secretKey (defined in the previous step) to have access to the data.
Code step 3:
spark = SparkSession.builder \ .master("<input_your_spark_address_here") \ .config("spark.submit.deployMode", "client") \ .config("spark.driver.host","<input_your_jupyter_pod_endpoint_here>.<input_your_dsl_here>.pod.cluster.local") \ .config("spark.driver.port", "9000")\ .config("spark.executor.memory","4GB") \ .config("spark.hadoop.fs.s3a.endpoint", url) \ .config("spark.hadoop.fs.s3a.access.key", accessKey) \ .config("spark.hadoop.fs.s3a.secret.key", secretKey)\ .config("spark.hadoop.fs.s3a.connection.timeout", connectionTimeOut) \ .config("spark.hadoop.fs.s3a.path.style.access", "true")\ .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")\ .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")\ .config("spark.sql.debug.maxToStringFields", "100000") \ .getOrCreate()

4. Pre-define the schema for the source data

In this step, we define a function which generates the data schema forour dataset. This data schema will accomodate all the information of each document/paper present in the dataset. A view of the data schema is here reported:

 
root  |-- paper_id: string (nullable = true)  |-- metadata: struct (nullable = true)  |    |-- title: string (nullable = true)  |    |-- authors: array (nullable = true)  |    |    |-- element: struct (containsNull = true)  |    |    |    |-- first: string (nullable = true)  |    |    |    |-- middle: array (nullable = true)  |    |    |    |    |-- element: string (containsNull = true)  |    |    |    |-- last: string (nullable = true)  |    |    |    |-- suffix: string (nullable = true)  |    |    |    |-- affiliation: struct (nullable = true)  |    |    |    |    |-- laboratory: string (nullable = true)  |    |    |    |    |-- institution: string (nullable = true)  |    |    |    |    |-- location: struct (nullable = true)  |    |    |    |    |    |-- addrLine: string (nullable = true)  |    |    |    |    |    |-- country: string (nullable = true)  |    |    |    |    |    |-- postBox: string (nullable = true)  |    |    |    |    |    |-- postCode: string (nullable = true)  |    |    |    |    |    |-- region: string (nullable = true)  |    |    |    |    |    |-- settlement: string (nullable = true)  |    |    |    |-- email: string (nullable = true)  |-- abstract: array (nullable = true)  |    |-- element: struct (containsNull = true)  |    |    |-- text: string (nullable = true)  |    |    |-- cite_spans: array (nullable = true)  |    |    |    |-- element: struct (containsNull = true)  |    |    |    |    |-- start: integer (nullable = true)  |    |    |    |    |-- end: integer (nullable = true)  |    |    |    |    |-- text: string (nullable = true)  |    |    |    |    |-- ref_id: string (nullable = true)  |    |    |-- ref_spans: array (nullable = true)  |    |    |    |-- element: struct (containsNull = true)  |    |    |    |    |-- start: integer (nullable = true)  |    |    |    |    |-- end: integer (nullable = true)  |    |    |    |    |-- text: string (nullable = true)  |    |    |    |    |-- ref_id: string (nullable = true)  |    |    |-- eq_spans: array (nullable = true)  |    |    |    |-- element: struct (containsNull = true)  |    |    |    |    |-- start: integer (nullable = true)  |    |    |    |    |-- end: integer (nullable = true)  |    |    |    |    |-- text: string (nullable = true)  |    |    |    |    |-- ref_id: string (nullable = true)  |    |    |-- section: string (nullable = true)  |-- body_text: array (nullable = true)  |    |-- element: struct (containsNull = true)  |    |    |-- text: string (nullable = true)  |    |    |-- cite_spans: array (nullable = true)  |    |    |    |-- element: struct (containsNull = true)  |    |    |    |    |-- start: integer (nullable = true)  |    |    |    |    |-- end: integer (nullable = true)  |    |    |    |    |-- text: string (nullable = true)  |    |    |    |    |-- ref_id: string (nullable = true)  |    |    |-- ref_spans: array (nullable = true)  |    |    |    |-- element: struct (containsNull = true)  |    |    |    |    |-- start: integer (nullable = true)  |    |    |    |    |-- end: integer (nullable = true)  |    |    |    |    |-- text: string (nullable = true)  |    |    |    |    |-- ref_id: string (nullable = true)  |    |    |-- eq_spans: array (nullable = true)  |    |    |    |-- element: struct (containsNull = true)  |    |    |    |    |-- start: integer (nullable = true)  |    |    |    |    |-- end: integer (nullable = true)  |    |    |    |    |-- text: string (nullable = true)  |    |    |    |    |-- ref_id: string (nullable = true)  |    |    |-- section: string (nullable = true)  |-- bib_entries: map (nullable = true)  |    |-- key: string  |    |-- value: struct (valueContainsNull = true)  |    |    |-- ref_id: string (nullable = true)  |    |    |-- title: string (nullable = true)  |    |    |-- authors: array (nullable = true)  |    |    |    |-- element: struct (containsNull = true)  |    |    |    |    |-- first: string (nullable = true)  |    |    |    |    |-- middle: array (nullable = true)  |    |    |    |    |    |-- element: string (containsNull = true)  |    |    |    |    |-- last: string (nullable = true)  |    |    |    |    |-- suffix: string (nullable = true)  |    |    |-- year: integer (nullable = true)  |    |    |-- venue: string (nullable = true)  |    |    |-- volume: string (nullable = true)  |    |    |-- issn: string (nullable = true)  |    |    |-- pages: string (nullable = true)  |    |    |-- other_ids: struct (nullable = true)  |    |    |    |-- DOI: array (nullable = true)  |    |    |    |    |-- element: string (containsNull = true)  |-- ref_entries: map (nullable = true)  |    |-- key: string  |    |-- value: struct (valueContainsNull = true)  |    |    |-- text: string (nullable = true)  |    |    |-- latex: string (nullable = true)  |    |    |-- type: string (nullable = true)  |-- back_matter: array (nullable = true)  |    |-- element: struct (containsNull = true)  |    |    |-- text: string (nullable = true)  |    |    |-- cite_spans: array (nullable = true)  |    |    |    |-- element: struct (containsNull = true)  |    |    |    |    |-- start: integer (nullable = true)  |    |    |    |    |-- end: integer (nullable = true)  |    |    |    |    |-- text: string (nullable = true)  |    |    |    |    |-- ref_id: string (nullable = true)  |    |    |-- ref_spans: array (nullable = true)  |    |    |    |-- element: struct (containsNull = true)  |    |    |    |    |-- start: integer (nullable = true)  |    |    |    |    |-- end: integer (nullable = true)  |    |    |    |    |-- text: string (nullable = true)  |    |    |    |    |-- ref_id: string (nullable = true)  |    |    |-- eq_spans: array (nullable = true)  |    |    |    |-- element: struct (containsNull = true)  |    |    |    |    |-- start: integer (nullable = true)  |    |    |    |    |-- end: integer (nullable = true)  |    |    |    |    |-- text: string (nullable = true)  |    |    |    |    |-- ref_id: string (nullable = true)  |    |    |-- section: string (nullable = true)  |-- source: string (nullable = false)
Code step 4:
def generate_cord19_schema(): author_fields = [ StructField("first", StringType()), StructField("middle", ArrayType(StringType())), StructField("last", StringType()), StructField("suffix", StringType()), ] authors_schema = ArrayType( StructType( author_fields + [ StructField( "affiliation", StructType( [ StructField("laboratory", StringType()), StructField("institution", StringType()), StructField( "location", StructType( [ StructField("addrLine", StringType()), StructField("country", StringType()), StructField("postBox", StringType()), StructField("postCode", StringType()), StructField("region", StringType()), StructField("settlement", StringType()), ] ), ), ] ), ), StructField("email", StringType()), ] ) ) spans_schema = ArrayType( StructType( [ StructField("start", IntegerType()), StructField("end", IntegerType()), StructField("text", StringType()), StructField("ref_id", StringType()), ] ) ) section_schema = ArrayType( StructType( [ StructField("text", StringType()), StructField("cite_spans", spans_schema), StructField("ref_spans", spans_schema), StructField("eq_spans", spans_schema), StructField("section", StringType()), ] ) ) bib_schema = MapType( StringType(), StructType( [ StructField("ref_id", StringType()), StructField("title", StringType()), StructField("authors", ArrayType(StructType(author_fields))), StructField("year", IntegerType()), StructField("venue", StringType()), StructField("volume", StringType()), StructField("issn", StringType()), StructField("pages", StringType()), StructField( "other_ids", StructType([StructField("DOI", ArrayType(StringType()))]), ), ] ), True, ) ref_schema = MapType( StringType(), StructType( [ StructField("text", StringType()), StructField("latex", StringType()), StructField("type", StringType()), ] ), ) return StructType( [ StructField("paper_id", StringType()), StructField( "metadata", StructType( [ StructField("title", StringType()), StructField("authors", authors_schema), ] ), True, ), StructField("abstract", section_schema), StructField("body_text", section_schema), StructField("bib_entries", bib_schema), StructField("ref_entries", ref_schema), StructField("back_matter", section_schema), ] )

5. Extract the data in a single Dataframe by accommodating the information in the pre-defined schema

In this step, the function extract_dataframe(spark) is defined. This function reads each single JSON document of the dataset, and accomodate the information in the schema previously defined. The information are then joined in a single dataframe object.

Code step 5:
def extract_dataframe(spark):     base = "s3a://source/data"     sources = [         "document_parses"     ]     sub_sources = [         "pmc_json",         "pdf_json",     ]     dataframe = None     for source in sources:         for sub_source in sub_sources:             path = f"{base}/{source}/{sub_source}"             df = (                 spark.read.json(path, schema=generate_cord19_schema(), multiLine=True)                 .withColumn("source", lit(source))             )         if not dataframe:             dataframe = df         else:             dataframe = dataframe.union(df)     return dataframe

The dataframe df is defined by calling the function extract_dataframe. Please note that spark is passed as argument of the function: this means that the processing of the data into a unique data frame is done by the Spark instance.

Code step 6:
df = extract_dataframe(spark)

A local temporary view of the dataframe is created. This view will be used to query the data through the pyspark SQL library.

Code step 7:
df.createOrReplaceTempView("cord19")

6. Query the data

In this step, 3 queries are performed through the pyspark SQL library:

  1. How many papers for each source?
  2. Which author has written the most papers?
  3. Which are the abstracts for the reported papers?

The results of the query are directly showed on the screen.

7.1 How many papers for each source?
Code step 7.1:
 
query = """ SELECT     source,     COUNT(DISTINCT paper_id) FROM     cord19 GROUP BY     source """ spark.sql(query).show()

 

Output: +---------------+------------------------+ | source|count(DISTINCT paper_id)| +---------------+------------------------+ |document_parses| 10| +---------------+------------------------+ 6.2 Which author has written the most papers?
Code step 7.2:

 

query = """ WITH authors AS ( SELECT paper_id, author.* FROM cord19 LATERAL VIEW explode(metadata.authors) AS author ) SELECT first, last, COUNT(DISTINCT paper_id) as n_papers FROM authors GROUP BY first, last ORDER BY n_papers DESC """ spark.sql(query).show(n=5)

 

Output:     +------+---------+--------+     | first|     last|n_papers|     +------+---------+--------+     |Xavier| Rossello|       1|     |  Nora|   Watson|       1|     |  José|     Ramó|       1|     |     H| Nauwynck|       1|     |Rafael|Romaguera|       1|     +------+---------+--------+ 7.3 Which are the abstracts for the reported papers?

 

Code Step 7.3:
query = """ WITH abstract AS ( SELECT paper_id, pos, value.text as text FROM cord19 LATERAL VIEW posexplode(abstract) AS pos, value ), collected AS ( SELECT paper_id, collect_list(text) OVER (PARTITION BY paper_id ORDER BY pos) as sentences FROM abstract ), sentences AS ( SELECT paper_id, max(sentences) as sentences FROM collected GROUP BY paper_id ) SELECT paper_id, array_join(sentences, " ") as abstract, -- make sure the regex is being escaped properly size(split(array_join(sentences, " "), "\\\s+")) as words FROM sentences """ spark.sql(query).show(n=5) Output: +--------------------+--------------------+-----+ | paper_id| abstract|words| +--------------------+--------------------+-----+ |0000b6da665726420...|Objective: An at ...| 263| |0000fcce604204b1b...|Contribución de l...| 406| |000122a9a774ec76f...|Introduction and ...| 286| |00013062c83cef3b8...|Systems serology ...| 173| |00013694fb8095bb8...|Prolonged Covid-1...| 200| +--------------------+--------------------+-----+