Bayes Classifier on Dataproc

Overview
In this lab, you create a two-variable Bayesian model to decide whether to cancel a meeting based on the likely arrival delay of a flight. You quantize the two variables, create a conditional probability lookup table, and examine the on-time arrival percentage in each bin. You carry out the quantization using histogram equalization and compute the on-time arrival percentages in Spark.

Dataproc is a managed Spark and Hadoop service that lets you take advantage of open source data tools for batch processing, querying, streaming, and machine learning. Dataproc automation helps you create clusters quickly, manage them easily, and save money by turning clusters off when you don't need them. With less time and money spent on administration, you can focus on your jobs and your data.

This lab uses a set of code samples and scripts developed for Data Science on the Google Cloud Platform, 2nd Edition from O'Reilly Media, Inc.

What you'll learn
In this lab, you'll use Dataproc to do the following:

Create a Dataproc cluster

Quantize variables using Spark SQL

Create a Bayes classification model

Evaluate the model

Setup and requirements


student-03-1ad4968bce5b@startup-vm:~$ git clone https://github.com/GoogleCloudPlatform/data-science-on-gcp/

student-03-1ad4968bce5b@startup-vm:~/data-science-on-gcp/06_dataproc$ export PROJECT_ID=$(gcloud info --format='value(config.project)')


student-03-1ad4968bce5b@startup-vm:~/data-science-on-gcp/06_dataproc$ export BUCKET_NAME=$PROJECT_ID-dsongcp


student-03-1ad4968bce5b@startup-vm:~/data-science-on-gcp/06_dataproc$ cat ~/data-science-on-gcp/06_dataproc/create_cluster.sh

#!/bin/bash

if [ "$#" -ne 2 ]; then
    echo "Usage: ./create_cluster.sh  bucket-name  region"
    exit
fi

PROJECT=$(gcloud config get-value project)
BUCKET=$1
REGION=$2
EMAIL=$3
INSTALL=gs://$BUCKET/flights/dataproc/install_on_cluster.sh

# upload install file
sed "s/CHANGE_TO_USER_NAME/dataproc/g" install_on_cluster.sh > /tmp/install_on_cluster.sh
gsutil cp /tmp/install_on_cluster.sh $INSTALL

# create cluster
gcloud dataproc clusters create ch6cluster \
  --enable-component-gateway \
  --region ${REGION} --zone ${REGION}-a \
  --master-machine-type n1-standard-4 \
  --master-boot-disk-size 500 --num-workers 2 \
  --worker-machine-type n1-standard-4 \
  --worker-boot-disk-size 500 \
  --optional-components JUPYTER --project $PROJECT \
  --initialization-actions=$INSTALL \
  --scopes https://www.googleapis.com/auth/cloud-platform

student-03-1ad4968bce5b@startup-vm:~/data-science-on-gcp/06_dataproc$ ./create_cluster.sh $BUCKET_NAME "us-central1"



Task 2. Quantization using Spark SQL

You could use a single variable in your dataset, the departure delay, to predict the arrival delay of a flight. However, a second variable would make the predictions more accurate. The longer the flight, the more likely it is that small departure delays can be made up in the air, so the second variable is the distance to be traveled.

The statistical model you build in this lab uses two variables: the departure delay and the distance to be traveled.

JupyterLab on Dataproc
The Jupyter notebook provides a Python kernel to run Spark code and a PySpark kernel. The Jupyter component in Dataproc is a Web-based notebook for interactive data analytics and supports the JupyterLab Web UI. Jupyter notebooks are widely used for exploratory data analysis and building machine learning models as they allow you to interactively run your code and immediately see your results.

Because developing the Bayesian classifier from scratch requires interactive development, you use Jupyter notebooks for this lab.

To launch the notebook:

In the Cloud Console, on the Navigation menu, click Dataproc.

In the Cluster list, click on the cluster name to view cluster details.

Click the Web Interfaces tab and then click JupyterLab.

In the Launcher dialog, click the Python 3 tile under Notebook.

Set up environment variables
In this section you set up environment variables (for example, PROJECT, BUCKET and REGION) inside the notebook session. With this notebook you interact with the Dataproc cluster created in Task 1.

Set up PROJECT, BUCKET and REGION using the following command in the notebook cell.
PROJECT=!gcloud config get-value project
PROJECT=PROJECT[0]
import os
BUCKET = '{}-dsongcp'.format(PROJECT)
REGION = "us-central1"
os.environ['BUCKET'] = BUCKET
Run the cell by either pressing Shift + Enter, or clicking the triangle on the Notebook top menu to Run selected cells and advance.
Note: After pasting commands into the Jupyter notebook cell, you run the cell to execute the command and then advance to the next cell.
Exploration using BigQuery
First, import the basic data science libraries and the Google BigQuery API client library, then create a BigQuery client object bq:
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
import numpy as np
import google.cloud.bigquery as bigquery
bq = bigquery.Client()
Enter the code below in a new cell and run the cell:
sql = """
SELECT DISTANCE, DEP_DELAY
FROM dsongcp.flights_tzcorr
WHERE RAND() < 0.001 AND dep_delay > -20 AND dep_delay < 30 AND distance < 2000
"""
df = bq.query(sql).to_dataframe()
The query samples the full dataset, pulling the distance and departure delay fields (restricted to reasonable ranges) from roughly 1/1,000 of the rows of the flights_tzcorr table into a pandas DataFrame.
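To see what the RAND() < 0.001 filter does, note that each row is kept independently with probability 0.001. The following is a minimal plain-Python sketch of the same idea, not the BigQuery implementation; the sample_rows helper, the seeds, and the synthetic rows are assumptions made for illustration.

```python
import random

def sample_rows(rows, fraction, seed=None):
    """Keep each row independently with probability `fraction`."""
    rng = random.Random(seed)
    return [row for row in rows if rng.random() < fraction]

# Synthetic (distance, dep_delay) rows; the values are made up for illustration.
rng = random.Random(1)
rows = [(rng.uniform(50, 3000), rng.randint(-30, 120)) for _ in range(100000)]

# Mirror the WHERE clause: random sampling plus the range filters.
sampled = [
    (distance, dep_delay)
    for distance, dep_delay in sample_rows(rows, 0.001, seed=42)
    if -20 < dep_delay < 30 and distance < 2000
]
print(len(sampled))
```

Because the filter is random, the sample is only approximately 1/1,000 of the rows, and without a fixed seed its size differs between runs.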

Use the seaborn library to draw a "hex" kind of plot using jointplot:
sns.set_style("whitegrid")
g = sns.jointplot(x=df['DISTANCE'], y=df['DEP_DELAY'], kind="hex", height=10, joint_kws={'gridsize':20})
The distribution plots at the top and right of the center panel of the graph show how the distance and departure delay values are distributed.

Start a Spark session
Type and run the following code in a new cell to create a Spark session:
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("Bayes classification using Spark") \
    .getOrCreate()
Read the time-corrected JSON files from the Google Cloud Storage bucket:
inputs = 'gs://{}/flights/tzcorr/all_flights-*'.format(BUCKET)
flights = spark.read.json(inputs)
Create a temporary view so that you can query the dataframe with SQL (the view is available only within this Spark session):
flights.createOrReplaceTempView('flights')
Query the flights view with SQL, for example by using this command:
results = spark.sql('SELECT COUNT(*) FROM flights WHERE dep_delay > -20 AND CAST(distance AS FLOAT) < 2000')
results.show()
Restrict to train days
Create a CSV file from the trainday BigQuery table and save it to the Cloud Storage bucket:
sql = """
SELECT *
FROM dsongcp.trainday
"""
df = bq.query(sql).to_dataframe()
df.to_csv('trainday.csv', index=False)
In a new cell, copy the CSV file to the Cloud Storage bucket:
%%bash
gsutil cp trainday.csv gs://${BUCKET}/flights/trainday.csv
Create the traindays dataframe from the CSV file trainday.csv using the following code:
from pyspark.sql.types import StructType, StructField, StringType, BooleanType
schema = StructType([
    StructField('FL_DATE', StringType(), True),
    StructField('is_train_day', BooleanType(), True)
])
traindays = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv('gs://{}/flights/trainday.csv'.format(BUCKET))
traindays.createOrReplaceTempView('traindays')
Now restrict the flights dataframe to contain only training days using an SQL join operation:
statement = """
SELECT
  f.FL_DATE AS date,
  CAST(distance AS FLOAT) AS distance,
  dep_delay,
  IF(arr_delay < 15, 1, 0) AS ontime
FROM flights f
JOIN traindays t
ON f.FL_DATE == t.FL_DATE
WHERE
  t.is_train_day AND
  f.dep_delay IS NOT NULL
ORDER BY
  f.dep_delay DESC
"""
flights = spark.sql(statement)
Create a hexbin plot using Spark (repeat of what you did in BigQuery, except that you now restrict to train days only):
df = flights[(flights['distance'] < 2000) & (flights['dep_delay'] > -20) & (flights['dep_delay'] < 30)]
pdf = df.sample(False, 0.02, 20).toPandas()  # to 100,000 rows approx on complete dataset
g = sns.jointplot(x=pdf['distance'], y=pdf['dep_delay'], kind="hex", height=10, joint_kws={'gridsize':20})
Finding thresholds that make the two quantized variables uniformly distributed is straightforward using the approximate quantiles method:
distthresh = flights.approxQuantile('distance', list(np.arange(0, 1.0, 0.2)), 0.02)
distthresh[-1] = float('inf')
print(distthresh)
You can similarly find thresholds that quantize the departure delay:
delaythresh = flights.approxQuantile('dep_delay', list(np.arange(0, 1.0, 0.2)), 0.05)
delaythresh[-1] = float('inf')
print(delaythresh)
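The quantile-based thresholds can be illustrated without Spark. The following is a minimal sketch in plain Python on synthetic distances; the quantile_thresholds and bin_counts helpers and the data are assumptions for illustration, and the exact quantiles computed here stand in for Spark's approximate ones. As in the lab code, the last threshold is overwritten with infinity so that the final bin is open-ended.

```python
import random
from bisect import bisect_right

def quantile_thresholds(values, num_steps):
    """Return the 0/num_steps, 1/num_steps, ... quantiles (exact, via sorting)."""
    ordered = sorted(values)
    n = len(ordered)
    return [ordered[(k * n) // num_steps] for k in range(num_steps)]

def bin_counts(values, thresholds):
    """Count values in each half-open bin [thresholds[i], thresholds[i+1])."""
    counts = [0] * (len(thresholds) - 1)
    for v in values:
        idx = bisect_right(thresholds, v) - 1
        if 0 <= idx < len(counts):
            counts[idx] += 1
    return counts

# Synthetic, skewed "distances"; not data from the lab.
rng = random.Random(0)
distances = [rng.expovariate(1 / 800.0) for _ in range(10000)]

distthresh = quantile_thresholds(distances, 5)  # quantiles 0, 0.2, 0.4, 0.6, 0.8
distthresh[-1] = float('inf')                   # same adjustment as in the lab
print(distthresh)
print(bin_counts(distances, distthresh))
```

With these thresholds the first three bins each hold about a fifth of the values, and the open-ended last bin holds the rest.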
Task 3. Bayes classification
Now that you have the quantization thresholds, you need to determine the recommendation (whether to cancel the meeting) for each bin based on whether 70% of flights in that bin are on time.

Find the flights that belong to the mth distance bin and nth delay bin by slicing the full set of flights:
import pyspark.sql.functions as F
import pandas as pd
rows = []  # one record per (distance bin, delay bin)
for m in range(0, 2):  # first two distance bins only
    for n in range(0, len(delaythresh)-1):
        # flights whose distance falls in bin m and departure delay in bin n
        bdf = flights[(flights['distance'] >= distthresh[m])
             & (flights['distance'] < distthresh[m+1])
             & (flights['dep_delay'] >= delaythresh[n])
             & (flights['dep_delay'] < delaythresh[n+1])]
        ontime_frac = bdf.agg(F.sum('ontime')).collect()[0][0] / bdf.agg(F.count('ontime')).collect()[0][0]
        print (m, n, ontime_frac)
        rows.append({
            'dist_thresh': distthresh[m],
            'delay_thresh': delaythresh[n],
            'frac_ontime': ontime_frac
        })
# DataFrame.append was removed in pandas 2.0, so build the table in one step
df = pd.DataFrame(rows, columns=['dist_thresh', 'delay_thresh', 'frac_ontime'])
The ontime fraction is nearly 100% for all the delay bins except the largest value for n. This makes perfect sense because only the last departure delay bin has any delayed flights.

Here, you get close to the 70% threshold only on the last bin. You have to fix this. One way to do so is to hand-select the departure delay bins. Because you previously looked at thresholding the departure delay, you know that the interesting range is between 10 and 20 minutes and that departure delays are reported in integer minutes. So, you try one-minute delay bins between 10 and 20 minutes.
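The per-bin on-time fractions computed by the loop above form the conditional probability lookup table. The following is a minimal sketch of such a table in plain Python, computed in a single pass over hand-made records rather than with one Spark aggregation per bin as in the lab; the ontime_table helper, the thresholds, and the flight tuples are assumptions for illustration.

```python
from bisect import bisect_right
from collections import defaultdict

def ontime_table(flights, dist_thresh, delay_thresh):
    """Map (distance bin, delay bin) to the fraction of on-time flights.

    flights is an iterable of (distance, dep_delay, ontime) tuples, where
    ontime is 1 or 0. Bins are half-open: [thresh[i], thresh[i+1]).
    """
    totals = defaultdict(lambda: [0, 0])  # bin -> [on-time count, flight count]
    for distance, dep_delay, ontime in flights:
        m = bisect_right(dist_thresh, distance) - 1
        n = bisect_right(delay_thresh, dep_delay) - 1
        # Skip flights that fall outside the thresholds.
        if 0 <= m < len(dist_thresh) - 1 and 0 <= n < len(delay_thresh) - 1:
            totals[(m, n)][0] += ontime
            totals[(m, n)][1] += 1
    return {key: ontime / count for key, (ontime, count) in totals.items()}

# Hypothetical thresholds and flights, chosen only to keep the example small.
dist_thresh = [0, 1000, float('inf')]
delay_thresh = [-20, 15, float('inf')]
flights = [
    (300, 5, 1), (300, 8, 1),      # short flights, small delay
    (300, 25, 0), (300, 30, 1),    # short flights, large delay
    (1500, 25, 1), (1500, 40, 0),  # long flights, large delay
]
table = ontime_table(flights, dist_thresh, delay_thresh)
print(table)
```

Bins with no flights are simply absent from the table, which avoids dividing by zero.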

Fine-tune the delay threshold around the decision boundary:
delaythresh = range(10, 20)  # candidate thresholds: 10, 11, ..., 19 minutes
rows = []  # one record per (distance bin, delay bin)
for m in range(0, len(distthresh)-1):
    for n in range(0, len(delaythresh)-1):
        # flights whose distance falls in bin m and departure delay in bin n
        bdf = flights[(flights['distance'] >= distthresh[m])
             & (flights['distance'] < distthresh[m+1])
             & (flights['dep_delay'] >= delaythresh[n])
             & (flights['dep_delay'] < delaythresh[n+1])]
        ontime_frac = bdf.agg(F.sum('ontime')).collect()[0][0] / bdf.agg(F.count('ontime')).collect()[0][0]
        print (m, n, ontime_frac)
        rows.append({
            'dist_thresh': distthresh[m],
            'delay_thresh': delaythresh[n],
            'frac_ontime': ontime_frac
        })
# DataFrame.append was removed in pandas 2.0, so build the table in one step
df = pd.DataFrame(rows, columns=['dist_thresh', 'delay_thresh', 'frac_ontime'])
Note: It may take 10-12 minutes to complete.
To find the delay threshold for each distance threshold where the value is closest to the 0.70 decision boundary, run the following code:
df['score'] = abs(df['frac_ontime'] - 0.7)
bayes = df.sort_values(['score']).groupby('dist_thresh').head(1).sort_values('dist_thresh')
print(bayes)
If the departure delay is greater than the threshold corresponding to how far the flight is, you cancel the meeting because you expect the flight to be late.
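The decision rule can be sketched as a small lookup function. The following assumes a hypothetical table of (dist_thresh, delay_thresh) pairs sorted by distance, with made-up values rather than results from the lab. It treats a departure delay at or above the threshold as a reason to cancel, which is the convention the evaluation query in Task 4 uses.

```python
from bisect import bisect_right

def should_cancel(distance, dep_delay, table):
    """Return True if the meeting should be cancelled.

    table is a list of (dist_thresh, delay_thresh) pairs sorted by
    dist_thresh; each pair applies to distances from dist_thresh up to
    the next pair's dist_thresh.
    """
    starts = [dist for dist, _ in table]
    # Index of the last bin whose lower edge is <= distance (clamped to 0).
    idx = max(bisect_right(starts, distance) - 1, 0)
    return dep_delay >= table[idx][1]

# Hypothetical thresholds for illustration only; not output from the lab.
table = [(0.0, 15), (500.0, 16), (1200.0, 17)]

print(should_cancel(300, 14, table))
print(should_cancel(300, 15, table))
print(should_cancel(1500, 16, table))
```

Keeping the table as a short sorted list mirrors the bayes.csv layout, with one row per distance bin.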

Write out the bayes table as a CSV file to the Google Cloud Storage bucket:
bayes.to_csv('gs://{}/flights/bayes.csv'.format(BUCKET), index=False)
!gsutil cat gs://{BUCKET}/flights/bayes.csv
Task 4. Evaluate the model
To evaluate the model you created, look at the flights data that was not used in creating the model.

Enter the following code into a new cell and run the cell.
distthresh[-1] = 100000
for m in range(0, len(distthresh)-1):
    statement = """
SELECT
  '{0:.0f}-{1:.0f} miles' AS bin,
  ROUND(SUM(IF(dep_delay < {2:f} AND arr_delay < 15, 1, 0))/COUNT(*), 2) AS correct_nocancel,
  ROUND(SUM(IF(dep_delay >= {2:f} AND arr_delay < 15, 1, 0))/COUNT(*), 2) AS false_positive,
  ROUND(SUM(IF(dep_delay < {2:f} AND arr_delay >= 15, 1, 0))/COUNT(*), 2) AS false_negative,
  ROUND(SUM(IF(dep_delay >= {2:f} AND arr_delay >= 15, 1, 0))/COUNT(*), 2) AS correct_cancel,
  COUNT(*) AS total_flights
FROM flights f
JOIN traindays t
ON f.FL_DATE == t.FL_DATE
WHERE
  t.is_train_day == 'False' AND
  f.distance >= {0:f} AND f.distance < {1:f}
""".format( distthresh[m], distthresh[m+1], bayes[ bayes['dist_thresh'] == distthresh[m] ]['delay_thresh'].values[0] )
    eval_flights = spark.sql(statement)
    eval_flights.show()
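The four fractions in the query can be sketched in plain Python to make their definitions explicit. The evaluate helper and the (dep_delay, arr_delay) pairs below are assumptions for illustration; the sketch also assumes that neither delay is missing and that the list of flights is not empty.

```python
def evaluate(flights, delay_threshold, late_minutes=15):
    """Return the fraction of flights in each outcome category.

    flights is a list of (dep_delay, arr_delay) pairs. A meeting is
    cancelled when dep_delay >= delay_threshold, and a flight counts as
    on time when arr_delay < late_minutes.
    """
    counts = {
        'correct_nocancel': 0,  # not cancelled, flight on time
        'false_positive': 0,    # cancelled, but flight on time
        'false_negative': 0,    # not cancelled, but flight late
        'correct_cancel': 0,    # cancelled, flight late
    }
    for dep_delay, arr_delay in flights:
        cancel = dep_delay >= delay_threshold
        ontime = arr_delay < late_minutes
        if cancel:
            key = 'false_positive' if ontime else 'correct_cancel'
        else:
            key = 'correct_nocancel' if ontime else 'false_negative'
        counts[key] += 1
    total = len(flights)
    return {key: round(value / total, 2) for key, value in counts.items()}

# Hypothetical flights: one per outcome category.
flights = [(5, 0), (10, 20), (20, 10), (25, 40)]
result = evaluate(flights, delay_threshold=15)
print(result)
```

The four fractions always sum to 1 before rounding, so the two "correct" values together give the overall accuracy for that distance bin.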
Task 5. Delete the Dataproc cluster
You created and evaluated your model. Now clean up by deleting the Dataproc cluster.

Return to the startup-vm terminal and delete the Dataproc cluster using the following commands. Enter y when prompted to continue.
cd ~/data-science-on-gcp/06_dataproc/
./delete_cluster.sh "us-central1"