Note
Go to the end to download the full example code.
Wrapping PySpark Models#
This example requires full licence, and the program will break if you use the trial licence.
Installation
# To install the required package, use the following command:
# !pip install modeva
Authentication
# To get authentication, use the following command: (To get full access please replace the token to your own token)
# from modeva.utils.authenticate import authenticate
# authenticate(auth_code='eaaa4301-b140-484c-8e93-f9f633c8bacb')
Import required modules
import numpy as np
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import monotonically_increasing_id
from sklearn.datasets import load_breast_cancer
from modeva import DataSet
from modeva import TestSuite
from modeva.models.wrappers.api import modeva_arbitrary_classifier
Scripts for building a pyspark model#
# Initialize Spark session
spark = SparkSession.builder.appName("PySpark-Wrapper-Example").getOrCreate()
# Load and prepare dataset
data = load_breast_cancer()
df = pd.DataFrame(data.data, columns=data.feature_names)
df['label'] = data.target
# Convert Pandas DataFrame to Spark DataFrame and add an index column
spark_df = spark.createDataFrame(df)
spark_df = spark_df.withColumn("index", monotonically_increasing_id())
# Assemble features into a vector
assembler = VectorAssembler(inputCols=data.feature_names, outputCol="features")
spark_df = assembler.transform(spark_df).select("features", "label", "index")
# Split the data into train and test sets
train, test = spark_df.randomSplit([0.8, 0.2], seed=1234)
# Extract the index for each split
train_indices = np.array([row["index"] for row in train.select("index").collect()])
test_indices = np.array([row["index"] for row in test.select("index").collect()])
Train model
lr = LogisticRegression(featuresCol="features", labelCol="label")
lr_model = lr.fit(train)
Wrap the data#
ds = DataSet()
ds.load_dataframe(data=df)
ds.set_train_idx(train_indices)
ds.set_test_idx(test_indices)
Wrap the PySpark model into Modeva#
def predict_func(X):
X_spark = spark.createDataFrame(X, schema=data.feature_names.tolist())
X_spark = assembler.transform(X_spark)
predictions = lr_model.transform(X_spark).select("prediction")
return np.array([int(row.prediction) for row in predictions.collect()])
def predict_proba_func(X):
X_spark = spark.createDataFrame(X, schema=data.feature_names.tolist())
X_spark = assembler.transform(X_spark)
probabilities = lr_model.transform(X_spark).select("probability")
return np.array([row.probability.toArray() for row in probabilities.collect()])
model = modeva_arbitrary_classifier(
name="PySpark-LogisticRegression",
predict_function=predict_func,
predict_proba_function=predict_proba_func
)
Create test suite for diagnostics#
ts = TestSuite(ds, model)
Accuracy table
results = ts.diagnose_accuracy_table()
results.table