SparkML Integration

This guide provides an example of integrating with the ArthurAI platform to monitor a SparkML model. We’ll use an example dataset to train a SparkML model from scratch, but you could also use an existing Spark Pipeline.

from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
import pyspark.sql.functions as f
from pyspark.ml.classification import LogisticRegression

import pandas as pd
import numpy as np

from arthurai import ArthurAI
from arthurai.client.apiv3 import InputType, OutputType, Stage

Train and Save SparkML Model

First, we’ll instantiate a Spark session and load in a sample dataset. In this example, we’ll use a dataset derived from the famous Boston Housing dataset to build a simple model.

spark = SparkSession.builder.appName('app').getOrCreate()
data = spark.read.csv('./data/boston_housing.csv', header=True, inferSchema=True)
train, test = data.randomSplit([0.7, 0.3])

We’ll use a LASSO classification model (logistic regression with an L1 penalty) to predict the is_expensive column from all the others. This column encodes whether a property value was above or below the local average. As preprocessing, we’ll use the VectorAssembler class to pull together the input columns into a single numeric feature vector.

feature_columns = data.columns[:-1] # here we omit the final column
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
lasso_classifier = LogisticRegression(featuresCol="features", labelCol="is_expensive", maxIter=10, regParam=0.3, elasticNetParam=1.0)
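
The regParam and elasticNetParam settings are what make this a LASSO model. In Spark's LogisticRegression, regParam sets the overall regularization strength and elasticNetParam mixes the L1 and L2 penalties, so a value of 1.0 leaves only the L1 term. The plain-Python sketch below computes that penalty for illustration only; the function name elastic_net_penalty and the example weights are hypothetical and not part of Spark.

```python
def elastic_net_penalty(weights, reg_param, elastic_net_param):
    """Penalty term: reg_param * (alpha * L1 + (1 - alpha) / 2 * L2 squared)."""
    l1 = sum(abs(w) for w in weights)
    l2_squared = sum(w * w for w in weights)
    alpha = elastic_net_param
    return reg_param * (alpha * l1 + (1.0 - alpha) / 2.0 * l2_squared)


# Illustrative weights (made up for this sketch)
weights = [0.5, -2.0, 0.0]
lasso = elastic_net_penalty(weights, reg_param=0.3, elastic_net_param=1.0)  # L1 only
ridge = elastic_net_penalty(weights, reg_param=0.3, elastic_net_param=0.0)  # L2 only
```

With elasticNetParam=1.0 the penalty grows with the absolute size of each weight, which tends to push uninformative coefficients to exactly zero.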

Using a Pipeline, we’ll combine our preprocessing steps and our ML model, fit the pipeline to the training data, and save it. If you have an existing Spark Pipeline, you can load it from disk instead.

pipeline = Pipeline(stages=[assembler, lasso_classifier]) 
fitted_pipeline = pipeline.fit(train)
fitted_pipeline.write().overwrite().save('./data/models/boston_housing_spark_model_pipeline')

Onboard to Arthur

connection = ArthurAI(url='app.arthur.ai', access_key='<access_key>', client_version=3)

To onboard our model with Arthur, we’ll register the schema of the data coming into and out of the model. For simplicity, you can use a pandas DataFrame for this step. We will bring a sample of the Spark DataFrame to the driver and use it to register the model with Arthur.

# limit() returns a Spark DataFrame (take() returns a list of Rows, which has no toPandas())
sample_df = train.limit(5000).toPandas()
sample_Y = sample_df[['is_expensive']]
sample_X = sample_df.drop('is_expensive', axis=1)
# instantiate basic model
arthur_model = connection.model({
    "partner_model_id": "Boston Housing", 
    "input_type": InputType.Tabular, 
    "output_type": OutputType.Multiclass, 
    "is_batch": True})

# use pandas DataFrames to register data schema
arthur_model.from_dataframe(sample_X, Stage.ModelPipelineInput)
arthur_model.add_binary_classifier_output_attributes(
            positive_predicted_attr='expensive',
            pred_to_ground_truth_map={
                'prediction_expensive': 'ground_truth_expensive',
                'prediction_cheap': 'ground_truth_cheap'
            },
            threshold=0.75
)

The from_dataframe() method will inspect your dataset and infer the input schema, datatypes, and sample statistics. You can review the model structure and see if any fixes are needed.

arthur_model.review()
# chas and rad were inferred as categorical, let's change those to be continuous
arthur_model.get_attribute('chas', Stage.ModelPipelineInput).set(categorical=False)
arthur_model.get_attribute('rad', Stage.ModelPipelineInput).set(categorical=False)
arthur_model.review()

Monitoring for bias

For any attributes that you want to monitor for bias, set the monitor_for_bias boolean to True. These attributes don’t have to be model inputs; they can also be of stage NonInputData.

sensitive_attributes = ["Gender", "Race", "Income_Bracket"]
for attribute_name in sensitive_attributes:
    arthur_model.get_attribute(attribute_name, Stage.ModelPipelineInput).monitor_for_bias = True

Save

Now you’re ready to save your model and finish onboarding.

arthur_model.save()

Set reference data

You can set a baseline dataset to speed up the calculation of data drift and inference anomaly scoring. This reference set is typically the training set the model was fitted to, or a subsample of it. You can use either a pandas DataFrame or a directory of parquet files. The reference data can include model input features, ground truth features, or model predictions on the training set. However, it is recommended that you provide only model input features.

arthur_model.set_reference_data(directory_path="./data/august_training_data/")

Enabling Explainability

To enable explainability, you’ll supply a python file that implements a predict() function for a single observation (a numpy array). This predict function can contain anything you need, including loading a serialized model, preprocessing/transformations, and making a final prediction. The returned result should be a numpy array. You’ll also supply a requirements file for all the dependencies for running an inference through your model.

For more details around enabling explainability, see Explainability Guide. Below we provide a Spark specific example.

The first step is to save your SparkML model and pipeline so that it can be loaded inside the predict() function.

fitted_pipeline.write().overwrite().save('./data/models/boston_housing_spark_model_pipeline')

Next, create your predict() function.

# entrypoint.py

import pandas as pd
import numpy as np
from pyspark.sql import SparkSession
from pyspark.ml import PipelineModel


# To start the Spark session on the model server, specify the master URL as local.
# By default this runs Spark with 1 thread. To use more threads, specify local[x],
# where x is the number of threads. When allocating more compute and memory to the Spark
# session, be sure to increase the amount allocated to the model server when calling
# ArthurModel.enable_explainability() in the SDK (by default, 1 CPU and 1 GB of memory
# are allocated to the model server).

spark = SparkSession.builder.master('local').appName('app').getOrCreate()
loaded_pipeline = PipelineModel.load("./data/models/boston_housing_spark_model_pipeline")


def predict(input_data):
	col_names = ['crim','zn','indus','chas','nox','rm','age','dis','rad','tax','ptratio','b','lstat']
	input_df = pd.DataFrame(input_data, columns=col_names)
	spark_df = spark.createDataFrame(input_df)
	predictions = loaded_pipeline.transform(spark_df)
	return np.array([float(x.prediction) for x in predictions.select('prediction').collect()])
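
Before enabling explainability, it can help to smoke-test the predict() contract locally. The sketch below is an assumption-laden illustration: it assumes predict() is called with a 2D numpy array holding one row per observation (as the pd.DataFrame call above implies) and returns one value per row. The names check_predict and fake_predict are hypothetical; fake_predict is a stand-in for the real predict() in entrypoint.py so the example runs without Spark.

```python
import numpy as np


def check_predict(predict_fn, n_features, n_rows=3):
    """Call predict_fn on random rows and verify the type and length of the result."""
    rng = np.random.default_rng(0)
    batch = rng.random((n_rows, n_features))
    result = predict_fn(batch)
    assert isinstance(result, np.ndarray), "predict() should return a numpy array"
    assert result.shape[0] == n_rows, "predict() should return one value per input row"
    return result


# Stand-in for the predict() defined in entrypoint.py (hypothetical model logic)
def fake_predict(input_data):
    return (np.asarray(input_data).sum(axis=1) > 6.5).astype(float)


scores = check_predict(fake_predict, n_features=13)
```

To check the real function, you would pass the predict imported from entrypoint in place of fake_predict, with n_features set to the number of input columns (13 in this guide).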

You are then ready to enable explainability.

arthur_model.enable_explainability(
    df=sample_df, 
    project_directory='.',
    user_predict_function_import_path='entrypoint',
    requirements_file='requirements.txt')

Send a batch of inferences

Once your model has been onboarded, it is ready to receive inferences and model telemetry. There are some standard inputs needed to identify inferences and batches.

  • First, each inference needs a unique identifier so that it can later be joined with ground truth. Include a column named partner_inference_id and ensure these IDs are unique across batches. For example, if you run predictions across your customer base on a daily-batch cadence, then a unique identifier could be composed of your customer_id plus the date.

  • Second, each inference needs to be associated with a batch_id, but this id will be shared among one or more inferences.

  • Finally, each inference needs an inference_timestamp and these don’t have to be unique.
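
As a minimal sketch of the first point, the snippet below builds an identifier from a customer ID plus the batch date. The helper name make_partner_inference_id, the separator, and the sample customer IDs are all hypothetical; any scheme works as long as the resulting IDs stay unique across batches.

```python
from datetime import date


def make_partner_inference_id(customer_id, batch_date):
    """Combine a customer ID and the batch date into a single string ID."""
    return f"{customer_id}_{batch_date.isoformat()}"


# Same customers scored on two different days get distinct IDs
ids = [make_partner_inference_id(c, date(2021, 8, 1)) for c in ["cust-1", "cust-2"]]
next_day = make_partner_inference_id("cust-1", date(2021, 8, 2))
```

Because the date is part of the ID, the same customer can be scored once per daily batch without colliding with earlier inferences.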

Additionally, the predictions/scores from your model should match the column names in the registered schema. If we look back at the output of arthur_model.review(), we’ll recall that the columns we created correspond to the classifier’s output probabilities over the classes (“prediction_cheap” and “prediction_expensive”) and the corresponding ground truth over the possible classes in one-hot form (“ground_truth_cheap” and “ground_truth_expensive”).
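
To make the column layout concrete, here is a small sketch that maps a positive-class probability and an optional 0/1 label onto the four registered column names. The helper to_registered_columns is hypothetical and works on a single record in plain Python. Note that Spark's probability column is a vector of per-class probabilities, so in a real pipeline the positive-class value would have to be extracted first (for example with pyspark.ml.functions.vector_to_array in Spark 3.0 and later).

```python
def to_registered_columns(p_expensive, is_expensive=None):
    """Map a positive-class probability (and optional 0/1 label) to the registered columns."""
    row = {
        "prediction_expensive": p_expensive,
        "prediction_cheap": 1.0 - p_expensive,
    }
    if is_expensive is not None:
        # One-hot ground truth: exactly one of the two columns is 1
        row["ground_truth_expensive"] = int(is_expensive)
        row["ground_truth_cheap"] = 1 - int(is_expensive)
    return row


row = to_registered_columns(0.8, is_expensive=1)
```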

We will process a batch of datapoints through the Pipeline and save the inputs (and predictions) to parquet. We will do the same for the ground truths.

from datetime import datetime
import uuid

from pyspark.ml import PipelineModel
from pyspark.sql.functions import lit, udf
from pyspark.sql.types import StringType

loaded_pipeline = PipelineModel.load("./data/models/boston_housing_spark_model_pipeline")
inferencesDF = loaded_pipeline.transform(test).withColumnRenamed("probability", "prediction_expensive")

# generate a unique ID for each inference
uuidUdf = udf(lambda: str(uuid.uuid4()), StringType())
inferencesDF = inferencesDF.withColumn('partner_inference_id', uuidUdf())

# add required columns (Spark DataFrames are immutable, so use withColumn rather than item assignment)
inferencesDF = inferencesDF.withColumn("inference_timestamp", lit(datetime.utcnow()))
inferencesDF = inferencesDF.withColumn("batch_id", lit("inferences_batch_001"))

# write inferences
inferencesDF.write.parquet("./data/inference_files/inferences.parquet")

# write ground truths
ground_truth_DF = test.select(["ground_truth_cheap", "ground_truth_expensive"])
ground_truth_DF = ground_truth_DF.withColumn("partner_inference_id", ...)  # placeholder: must match the inference IDs
ground_truth_DF = ground_truth_DF.withColumn("ground_truth_timestamp", lit(datetime.utcnow()))
ground_truth_DF = ground_truth_DF.withColumn("batch_id", lit("gt_batch_001"))
ground_truth_DF.write.parquet("./data/ground_truth_files/ground_truth.parquet")

With our model’s inputs and outputs saved as parquet, we upload a batch by pointing to the directory containing one or more parquet files. The directory will be traversed and all parquet files will be joined into the corresponding batch. Note that the model inputs and predictions are uploaded separately from the ground truth.

arthur_model.send_bulk_inferences(directory_path='./data/inference_files/')
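
Since Spark writes each parquet output as a directory of part files, it can be useful to check what an upload directory actually contains before sending it. The sketch below uses only the standard library; list_parquet_files is a hypothetical helper and says nothing about how the Arthur SDK traverses the directory internally. The temporary directory mimics the layout Spark produces so that the example runs on its own.

```python
import tempfile
from pathlib import Path


def list_parquet_files(directory):
    """Return all .parquet files under directory, including part files in subdirectories."""
    return sorted(p for p in Path(directory).rglob("*.parquet") if p.is_file())


# Mimic Spark's output layout: a directory named *.parquet holding part files
with tempfile.TemporaryDirectory() as tmp:
    part_dir = Path(tmp) / "inferences.parquet"
    part_dir.mkdir()
    (part_dir / "part-00000.snappy.parquet").touch()
    (part_dir / "_SUCCESS").touch()
    found = [p.name for p in list_parquet_files(tmp)]
```

The is_file() filter matters here because the output directory itself ends in .parquet and would otherwise be counted.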

You can separately upload ground truths for each inference. Every row in the ground truth file(s) should have an external_id column that matches any IDs you create for the inferences.

arthur_model.send_bulk_ground_truths(directory_path='./data/ground_truth_files/')
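
Because ground truth is joined to inferences by ID, a quick consistency check before uploading can catch mismatches early. This is a minimal sketch with made-up IDs; unmatched_ids is a hypothetical helper, and in practice the two ID lists would come from the inference and ground truth files written earlier.

```python
def unmatched_ids(inference_ids, ground_truth_ids):
    """Return ground truth IDs that have no matching inference, sorted for stable output."""
    return sorted(set(ground_truth_ids) - set(inference_ids))


# Illustrative IDs only
missing = unmatched_ids(["a1", "a2", "a3"], ["a2", "a3", "a4"])
```

An empty result means every ground truth row refers to an inference that was sent.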