Big Data Test Infrastructure (BDTI)

Use case code

Please find below the code for the live dashboards use case.

1) Import relevant libraries

In this step, all the relevant Python libraries are imported.

import pandas as pd
import numpy as np
import urllib.request
import psycopg2
import json
from datetime import date
from pandas import json_normalize  # pandas.io.json.json_normalize is deprecated since pandas 1.0
from sqlalchemy import create_engine
from sqlalchemy.engine import URL

2) Fetch response data from the Open-meteo API and normalize

In this step, the request to the Open-meteo API is performed and the response is stored in a dataframe.

url = "https://api.open-meteo.com/v1/forecast?latitude=50.8371&longitude=4.3676&hourly=temperature_2m,relativehumidity_2m,dewpoint_2m,apparent_temperature,cloudcover,rain,pressure_msl"
response = urllib.request.urlopen(url)
record_list = json.loads(response.read())
df = json_normalize(record_list)
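To see what `json_normalize` does to a response like this one, here is a minimal sketch on a toy payload shaped like the Open-meteo answer (the values are illustrative, not real API output):

```python
# Sketch: how json_normalize flattens a nested payload into dot-separated columns.
# toy_response mimics the shape of the Open-meteo answer; values are made up.
import pandas as pd

toy_response = {
    "latitude": 50.84,
    "hourly": {
        "time": ["2023-01-01T00:00", "2023-01-01T01:00"],
        "temperature_2m": [3.1, 2.8],
    },
}

toy_df = pd.json_normalize(toy_response)
# Nested keys become dot-separated column names; the hourly lists stay
# as list objects inside a single cell each.
print(list(toy_df.columns))
# → ['latitude', 'hourly.time', 'hourly.temperature_2m']
```

Note that the hourly metrics end up as lists inside single cells; steps 5 and 6 below deal with unpacking them.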

3) Define the DB engine parameters

In this step, the configuration parameters of the PostgreSQL DB are defined. These parameters will be used to establish the connection to the PostgreSQL DB.

drivername = 'postgresql'
host = '<postgresSQLHost>'          # EDIT THIS LINE
port = '5432'
username = 'postgres'
password = '<postgreSQLPassword>'   # EDIT THIS LINE
database = 'postgres'

DATABASE = {
    'drivername': drivername,
    'host': host,
    'port': port,
    'username': username,
    'password': password,
    'database': database
}
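For reference, these parameters combine into a standard connection URL of the form `drivername://username:password@host:port/database`. A quick sketch with illustrative placeholder values (not real credentials):

```python
# Sketch: how the DATABASE parameters combine into a connection URL.
# Host and password here are illustrative placeholders.
DATABASE = {
    'drivername': 'postgresql',
    'host': 'db.example.internal',
    'port': '5432',
    'username': 'postgres',
    'password': 'secret',
    'database': 'postgres',
}

dsn = "{drivername}://{username}:{password}@{host}:{port}/{database}".format(**DATABASE)
print(dsn)
# → postgresql://postgres:secret@db.example.internal:5432/postgres
```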

4) Data pre-processing

In this step we replace the character "." with "_" in the column names produced by the JSON nesting, so the names do not cause syntax errors in SQL statements.

df_columns = df.columns.tolist()
df_columns = [c.replace('.','_') for c in df_columns]
df.columns = df_columns
df_columns
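On toy column names like those produced by `json_normalize`, the renaming looks like this:

```python
# Sketch: "." → "_" renaming on toy column names (illustrative, not the full list).
cols = ['latitude', 'hourly.time', 'hourly.temperature_2m']
renamed = [c.replace('.', '_') for c in cols]
print(renamed)
# → ['latitude', 'hourly_time', 'hourly_temperature_2m']
```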

5) Store the metrics

In this step we save each weather metric in a dedicated object.

time = df["hourly_time"]
temperature = df["hourly_temperature_2m"]
relativehumidity = df["hourly_relativehumidity_2m"]
dewpoint = df["hourly_dewpoint_2m"]
apparent_temperature = df["hourly_apparent_temperature"]
cloudcover = df["hourly_cloudcover"]
rain = df["hourly_rain"]
pressure_msl = df["hourly_pressure_msl"]

6) Make the data ready to be stored on the DB

In this step we convert each metric, currently a list stored inside a single dataframe cell, into a flat array and then transpose it into a one-column dataframe. This makes the data suitable to be hosted on our SQL DB.

time_conc = np.stack(time)
temperature_conc = np.stack(temperature)
relativehumidity_conc = np.stack(relativehumidity)
dewpoint_conc = np.stack(dewpoint)
apparent_temperature_conc = np.stack(apparent_temperature)
cloudcover_conc = np.stack(cloudcover)
rain_conc = np.stack(rain)
pressure_msl_conc = np.stack(pressure_msl)


time_transposed = pd.DataFrame(time_conc).transpose()
temperature_transposed = pd.DataFrame(temperature_conc).transpose()
relativehumidity_transposed = pd.DataFrame(relativehumidity_conc).transpose()
dewpoint_transposed = pd.DataFrame(dewpoint_conc).transpose()
apparent_temperature_transposed = pd.DataFrame(apparent_temperature_conc).transpose()
cloudcover_transposed = pd.DataFrame(cloudcover_conc).transpose()
rain_transposed = pd.DataFrame(rain_conc).transpose()
pressure_msl_transposed = pd.DataFrame(pressure_msl_conc).transpose()

7) Renaming columns

In order to have a more meaningful naming, rename the columns.

time_transposed.columns = ['hourly_time']
temperature_transposed.columns = ['hourly_temperature_2m']
relativehumidity_transposed.columns = ['hourly_relativehumidity']
dewpoint_transposed.columns = ['hourly_dewpoint']
apparent_temperature_transposed.columns = ['hourly_apparent_temperature']
cloudcover_transposed.columns = ['hourly_cloudcover']
rain_transposed.columns = ['hourly_rain']
pressure_msl_transposed.columns = ['hourly_pressure_msl']

8) Merging

Merge the pre-processed objects into the final dataframe: in this step we create a unique dataframe which contains all the weather metrics.

hourly_data = pd.concat(
    [time_transposed.reset_index(drop=True),
     temperature_transposed.reset_index(drop=True),
     relativehumidity_transposed.reset_index(drop=True),
     dewpoint_transposed.reset_index(drop=True),
     apparent_temperature_transposed.reset_index(drop=True),
     cloudcover_transposed.reset_index(drop=True),
     rain_transposed.reset_index(drop=True),
     pressure_msl_transposed.reset_index(drop=True)],
    axis=1)
hourly_data
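The stack → transpose → concat pipeline of steps 6-8 can be sketched end to end on toy data (two hours, two metrics; values are illustrative):

```python
# Sketch: steps 6-8 on toy data. Each metric starts as a list inside a
# single dataframe cell, as json_normalize leaves it.
import numpy as np
import pandas as pd

df_toy = pd.DataFrame({
    "hourly_time": [["2023-01-01T00:00", "2023-01-01T01:00"]],
    "hourly_temperature_2m": [[3.1, 2.8]],
})

# np.stack turns the list-in-a-cell into a (1, N) array; transposing
# gives one row per hour.
time_t = pd.DataFrame(np.stack(df_toy["hourly_time"])).transpose()
temp_t = pd.DataFrame(np.stack(df_toy["hourly_temperature_2m"])).transpose()
time_t.columns = ["hourly_time"]
temp_t.columns = ["hourly_temperature_2m"]

hourly_toy = pd.concat([time_t.reset_index(drop=True),
                        temp_t.reset_index(drop=True)], axis=1)
print(hourly_toy.shape)
# → (2, 2): one row per hour, one column per metric
```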

9) Create function

Function to create the "hourly data" table that will be pushed to our PostgreSQL DB.

def create_table_hourly_data():
    # a) Connect to the PostgreSQL instance through the psycopg2 library,
    #    using the database parameters defined at the beginning of the code.
    conn = psycopg2.connect(database=database, user=username, password=password, host=host, port=port)
    # b) Create a cursor object
    cur = conn.cursor()
    # c) Execute the SQL statement to create the table with the needed columns and data formats.
    cur.execute("CREATE TABLE IF NOT EXISTS hourly_data (hourly_time DATE, hourly_temperature_2m FLOAT, hourly_relativehumidity FLOAT, hourly_dewpoint FLOAT, hourly_apparent_temperature FLOAT, hourly_cloudcover DECIMAL, hourly_rain FLOAT, hourly_pressure_msl FLOAT)")
    # d) Commit to actually perform the executed SQL statement.
    conn.commit()
    # e) Close the connection to the PostgreSQL instance.
    conn.close()

create_table_hourly_data()

10) Create SQL engine

In order to feed the data into the newly created PostgreSQL table, create a SQL engine.

engine = create_engine(URL.create(**DATABASE))  # on SQLAlchemy < 1.4, use URL(**DATABASE) instead

11) Feed the data

Feed the data into the "hourly_data" table through the "to_sql" method.

hourly_data.to_sql("hourly_data", engine, index=False, if_exists='replace')
print("Done")
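If you want to dry-run the load without a PostgreSQL instance, `to_sql` also accepts a plain SQLite connection. A minimal sketch on toy data (not the real `hourly_data` table):

```python
# Sketch: dry-running to_sql against an in-memory SQLite DB (no PostgreSQL needed).
# The toy dataframe stands in for hourly_data; values are illustrative.
import sqlite3
import pandas as pd

toy = pd.DataFrame({"hourly_time": ["2023-01-01T00:00"],
                    "hourly_temperature_2m": [3.1]})

conn = sqlite3.connect(":memory:")
toy.to_sql("hourly_data", conn, index=False, if_exists="replace")
back = pd.read_sql("SELECT * FROM hourly_data", conn)
conn.close()

print(len(back))
# → 1
```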

12) Create the dataframe

Create the dataframe which will contain the metadata related to the response, such as the units of measure.

metadata = df.iloc[:,0:15]

13) Append the date of the current script's execution

This information will be shown in the dashboard to make sure that the displayed insights are always up to date.

metadata['execution_date'] = date.today()
metadata

14) Create the "metadata" table

Function to create the metadata table.

def create_table_metadata():
    # a) Connect to the PostgreSQL instance through the psycopg2 library
    conn = psycopg2.connect(database=database, user=username, password=password, host=host, port=port)
    # b) Create a cursor object
    cur = conn.cursor()
    # c) Execute the SQL statement to create the table
    cur.execute("CREATE TABLE IF NOT EXISTS metadata (latitude FLOAT, longitude FLOAT, generationtime_ms FLOAT, utc_offset_seconds INT, timezone VARCHAR, timezone_abbreviation VARCHAR, elevation FLOAT, hourly_units_time VARCHAR, hourly_units_temperature_2m VARCHAR)")
    # d) Commit to actually perform the executed SQL statement
    conn.commit()
    # e) Close the connection to the PostgreSQL instance.
    conn.close()

create_table_metadata()

15) Feed the data into the newly created PostgreSQL table

Create an SQL engine to feed the data into the newly created PostgreSQL table.

engine = create_engine(URL.create(**DATABASE))  # on SQLAlchemy < 1.4, use URL(**DATABASE) instead

16) Feed the data

Feed the data into the "metadata" table through the "to_sql" method.

metadata.to_sql("metadata", engine, index=False, if_exists='replace')
print("Done")