1. Import relevant libraries
In this step, all the Python libraries that are needed to process the data are imported.
Code step 1:
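As an indication, a minimal sketch of this import cell is shown below, assuming the notebook relies on PySpark for the processing steps that follow; the exact list of libraries may differ in the original notebook.

# Minimal sketch of the imports assumed by the following steps.
from pyspark.sql import SparkSession                  # to create the Spark session (step 3)
from pyspark.sql import functions as F                # column functions used when building the DataFrame
from pyspark.sql.types import (                       # types used to pre-define the schema (step 4)
    StructType, StructField, StringType, ArrayType,
)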
2. Define MinIO config parameters
To connect to the MinIO bucket that contains the data, it is necessary to input the following in the next line of code:
- url: this is the private address of the MinIO instance. This Jupyter notebook will connect to MinIO through the private endpoint, since both the Jupyter and MinIO instances are deployed in the same private network.
- accessKey: this is the accessKey to connect to MinIO, as defined in the service account.
- secretKey: this is the secretKey to connect to MinIO, as defined in the service account.
To retrieve this information, please refer to Step 3.3 of the use case guide.
Code step 2:
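A minimal sketch of this cell is shown below; the endpoint and credentials are placeholders and must be replaced with the values retrieved in Step 3.3 of the use case guide.

# Placeholder values for illustration only: replace them with the private
# MinIO endpoint and the service-account credentials from Step 3.3.
url = "http://<MINIO_PRIVATE_ADDRESS>:9000"   # private address of the MinIO instance
accessKey = "<YOUR_ACCESS_KEY>"               # accessKey defined in the service account
secretKey = "<YOUR_SECRET_KEY>"               # secretKey defined in the service account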
3. Define Spark Session
To connect to the Spark instance, which will process our dataset, we need to apply some configurations to make sure that the SparkSession is set up correctly. These configurations are meant to:
- Connect the Jupyter Notebook to the Spark instance
- Connect the Spark instance to MinIO
Below is a brief explanation of the relevant settings:
- Here the Spark master node address should be entered. To retrieve this information, please refer to Step 4.3 of the use case script.
- This configuration makes sure that Spark can connect back to Jupyter. To this end, the Jupyter hostname should be entered. To retrieve this information, please refer to Step 5.5 of the use case script.
- This configuration provides Spark with the MinIO address (defined in the previous step) to read the data.
- This configuration provides Spark with the MinIO accessKey (defined in the previous step) to access the data.
- This configuration provides Spark with the MinIO secretKey (defined in the previous step) to access the data.
Code step 3:
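A sketch of such a SparkSession setup is shown below, assuming the S3A connector is used to let Spark read from MinIO; the master URL, the driver-host key (spark.driver.host) and the extra S3A options are assumptions to be adapted to the actual deployment.

# Sketch of the SparkSession configuration, assuming the S3A connector is
# used to reach MinIO. Placeholders must be replaced with the real values.
spark = (
    SparkSession.builder
    .appName("cord19-use-case")
    .master("spark://<SPARK_MASTER_ADDRESS>:7077")        # Spark master node address (Step 4.3)
    .config("spark.driver.host", "<JUPYTER_HOSTNAME>")    # lets Spark connect back to Jupyter (Step 5.5)
    .config("spark.hadoop.fs.s3a.endpoint", url)          # MinIO address defined in the previous step
    .config("spark.hadoop.fs.s3a.access.key", accessKey)  # MinIO accessKey defined in the previous step
    .config("spark.hadoop.fs.s3a.secret.key", secretKey)  # MinIO secretKey defined in the previous step
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .getOrCreate()
)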
4. Pre-define the schema for the source data
In this step, we define a function which generates the data schema for our dataset. This data schema will accommodate all the information of each document/paper present in the dataset. A view of the data schema is reported below:
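As an indication, a minimal sketch of such a schema-generating function is shown below. It only covers the fields referenced by the queries later in this notebook (paper_id, metadata.title, metadata.authors with first and last names, and the abstract text); the actual schema used in the use case contains additional fields.

from pyspark.sql.types import StructType, StructField, StringType, ArrayType

def generate_schema():
    # Partial sketch of the document schema: only the fields used later are listed.
    author = StructType([
        StructField("first", StringType(), True),
        StructField("last", StringType(), True),
    ])
    abstract_entry = StructType([
        StructField("text", StringType(), True),
    ])
    return StructType([
        StructField("paper_id", StringType(), True),
        StructField("metadata", StructType([
            StructField("title", StringType(), True),
            StructField("authors", ArrayType(author), True),
        ]), True),
        StructField("abstract", ArrayType(abstract_entry), True),
    ])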
5. Extract the data into a single DataFrame by accommodating the information in the pre-defined schema
In this step, the function extract_dataframe(spark) is defined. This function reads each JSON document of the dataset and accommodates the information in the schema previously defined. The information is then combined into a single DataFrame object.
Code step 5:
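A simplified sketch of such a function is shown below, assuming the JSON documents are stored under an s3a:// path on MinIO; the bucket name and prefix are placeholders, and the source column is derived here from the file path so that the queries below can group by it.

from pyspark.sql import functions as F

def extract_dataframe(spark):
    # Sketch: read all JSON documents into a single DataFrame using the
    # pre-defined schema. Bucket name and prefix are placeholders.
    df = (
        spark.read
        .schema(generate_schema())
        .option("multiLine", "true")   # each paper is a single, multi-line JSON object
        .json("s3a://<BUCKET_NAME>/document_parses/*.json")
    )
    # Derive the "source" column from the parent folder of each file.
    return df.withColumn(
        "source", F.element_at(F.split(F.input_file_name(), "/"), -2)
    )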
The DataFrame df is defined by calling the function extract_dataframe. Please note that spark is passed as an argument to the function: this means that the processing of the data into a single DataFrame is done by the Spark instance.
Code step 6:
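A minimal sketch of this cell:

# The DataFrame is built by the Spark instance, since spark is passed in.
df = extract_dataframe(spark)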
A local temporary view of the DataFrame is created. This view will be used to query the data through the pyspark SQL library.
Code step 7:
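A minimal sketch of this cell, assuming the view is registered under the name cord19 used by the SQL queries below:

# Register a local temporary view so the data can be queried with Spark SQL.
df.createOrReplaceTempView("cord19")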
6. Query the data
In this step, 3 queries are performed through the pyspark SQL library:
- How many papers for each source?
- Which author has written the most papers?
- Which are the abstracts for the reported papers?
The results of the queries are shown directly on the screen.
6.1 How many papers for each source?
Code step 7.1:
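A sketch of this query, assuming the column names shown in the output below:

query = """
    SELECT source, COUNT(DISTINCT paper_id)
    FROM cord19
    GROUP BY source
"""
spark.sql(query).show()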
Output:
+---------------+------------------------+
|         source|count(DISTINCT paper_id)|
+---------------+------------------------+
|document_parses|                      10|
+---------------+------------------------+
6.2 Which author has written the most papers?
Code step 7.2:
query = """
    WITH authors AS (
        SELECT paper_id, author.*
        FROM cord19
        LATERAL VIEW explode(metadata.authors) AS author
    )
    SELECT first, last, COUNT(DISTINCT paper_id) as n_papers
    FROM authors
    GROUP BY first, last
    ORDER BY n_papers DESC
"""
spark.sql(query).show(n=5)