Spark/PySpark Data Preprocessing Audit Workpaper

Audit Focus Area

This workpaper evaluates distributed computing risks in Spark-based preprocessing pipelines across three areas: parallel operation inconsistencies, silent type coercion, and hidden notebook logic.

Evidence Collection Methodology

1. Parallel Operation Inconsistencies

Where to Find Evidence:

| Risk Indicator | Investigation Method | Test Code Example |
|---|---|---|
| Non-deterministic aggregations | Check for reduceByKey() without explicit ordering | df.groupBy().agg() vs df.repartition().groupBy().agg() |
| Order-sensitive operations | Search for zipWithIndex(), collect() before processing | df.orderBy().collect() vs unordered operations |
| Partition skew | Examine .rdd.getNumPartitions() and partition sizes | df.rdd.glom().map(len).collect() |
# Re-run the same aggregation several times; differing results indicate non-determinism.
from pyspark.sql import functions as F

results = []
for _ in range(5):
    df = spark.createDataFrame(data).repartition(2)  # 'spark' and 'data' come from the pipeline under audit
    rows = df.groupBy("key").agg(F.sum("value")).collect()
    results.append(sorted(rows))  # sort rows so only the values, not row order, are compared
assert all(r == results[0] for r in results), "Non-deterministic aggregations detected"
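
The partition-skew indicator from the table above can be checked with a short sketch like the following, assuming df is the DataFrame under audit; the 5x max/median threshold is an illustrative assumption.

# Compare per-partition row counts; a large max/median ratio signals skew that can
# destabilize parallel aggregations and joins.
sizes = sorted(df.rdd.glom().map(len).collect())
median = sizes[len(sizes) // 2]
print(f"partitions={len(sizes)} min={sizes[0]} median={median} max={sizes[-1]}")
if median > 0 and sizes[-1] / median > 5:
    print("Warning: significant partition skew detected")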

2. Silent Type Coercion

Where to Find Evidence:

| Coercion Risk | Spark Operations to Check | Diagnostic Method |
|---|---|---|
| Numeric precision loss | withColumn() casts, UDFs | Schema comparison before/after |
| String truncation | substring(), cast("string") | Length distribution analysis |
| Null handling | na.fill(), coalesce() | Null count comparison |
# Compare column types before and after preprocessing to surface silent coercions.
original_dtypes = dict(df.dtypes)
processed_dtypes = dict(processed_df.dtypes)

for col in original_dtypes:
    if col not in processed_dtypes:
        print(f"Column dropped: {col}")
    elif original_dtypes[col] != processed_dtypes[col]:
        print(f"Type changed: {col} ({original_dtypes[col]} → {processed_dtypes[col]})")

3. Hidden Notebook Logic

Where to Find Evidence:

| Obfuscation Pattern | Detection Method | Remediation Check |
|---|---|---|
| Cell execution order issues | Check notebook metadata timestamps | Lineage reconstruction |
| Undocumented temp views | spark.catalog.listTables() | View dependency graph |
| Unversioned intermediate writes | dbutils.fs.ls() analysis | Delta Lake versioning check |
# Heuristic scan of the notebook namespace for undocumented intermediate DataFrames.
from pyspark.sql import DataFrame

hidden_dfs = [name for name, obj in globals().items()
              if isinstance(obj, DataFrame) and ("_df" in name or name.startswith("tmp_"))]

if len(hidden_dfs) > 3:
    print(f"Warning: {len(hidden_dfs)} potential hidden transformations found")

Workpaper Template

Parallel Processing Findings

| Operation | Partition Count | Result Variance | Severity |
|---|---|---|---|
| groupBy().mean() | 200 | ±0.3% across runs | Medium |
| join() | 100 | 2% record count fluctuation | High |
| window().rank() | Default | Inconsistent ordering | Critical |

Type Coercion Findings

| Column | Original Type | Final Type | Data Impact |
|---|---|---|---|
| user_id | BIGINT | DOUBLE | 14% precision loss |
| geo_coords | STRING | VARCHAR(20) | Truncation observed |
| price | DECIMAL(18,4) | FLOAT | 0.7% rounding error |
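
To illustrate the user_id finding: a DOUBLE carries a 53-bit mantissa, so 64-bit integer IDs above 2**53 lose their low-order digits when coerced. A pure-Python sketch of the effect:

big_id = 9007199254740993            # 2**53 + 1, representable as BIGINT
print(int(float(big_id)) == big_id)  # False: the value changes after a DOUBLE round-trip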

Notebook Obfuscation Findings

| Notebook | Hidden Cells | Temp Views | Lineage Gaps |
|---|---|---|---|
| ETL_Part1 | 12 (34%) | 7 undocumented | 3 key transforms |
| Feature_Eng | 8 (22%) | 4 cross-notebook | Window spec missing |

Key Risks

- Non-deterministic parallel operations (unordered aggregations, skewed partitions, unstable joins) produce results that cannot be reproduced across runs.
- Silent type coercion degrades numeric precision and truncates values without raising errors.
- Hidden notebook logic (out-of-order cells, undocumented temp views, unversioned intermediate writes) breaks lineage and makes transformations unauditable.

Recommendations

For Parallelism Issues

spark.conf.set("spark.sql.shuffle.partitions", 200)
spark.conf.set("spark.sql.execution.sortBeforeRepartition", True)

def deterministic_agg(df, group_col, agg_expr):
    return df.repartition(200, group_col).groupBy(group_col).agg(agg_expr)
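
For the window().rank() finding in the template above, ranking becomes reproducible once the ordering is total; the sketch below adds a unique tie-breaker column (the column names key, value, and event_id are illustrative assumptions, not from the audited pipeline).

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# event_id breaks ties so rank() cannot vary between runs
w = Window.partitionBy("key").orderBy(F.col("value").desc(), F.col("event_id"))
ranked_df = df.withColumn("rank", F.rank().over(w))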

For Type Safety

import functools

def validate_schema(expected_schema):
    """Decorator that asserts a transformation returns a DataFrame matching the expected schema."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            df = func(*args, **kwargs)
            assert df.schema == expected_schema, (
                f"Schema validation failed in {func.__name__}: "
                f"expected {expected_schema}, got {df.schema}"
            )
            return df
        return wrapper
    return decorator
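
A possible usage of the decorator, with an expected schema built from pyspark.sql.types; the column names follow the findings table, and the clean_prices transformation is a hypothetical example.

from pyspark.sql.types import StructType, StructField, LongType, DecimalType

expected_schema = StructType([
    StructField("user_id", LongType(), True),
    StructField("price", DecimalType(18, 4), True),
])

@validate_schema(expected_schema)
def clean_prices(df):
    # Types must survive the transformation unchanged or the assert fires.
    return df.dropna(subset=["price"])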

For Notebook Transparency

# Log every executed cell so notebook transformations leave an audit trail.
import IPython

def log_transformation(result):
    # 'result' is IPython's ExecutionResult; record the executed source for lineage review
    print(f"[audit] cell executed: {result.info.raw_cell[:80]}")

IPython.get_ipython().events.register('post_run_cell', log_transformation)
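
The Delta Lake versioning check from the notebook-logic table can be sketched as follows, assuming delta-spark is installed and /mnt/feature_store stands in for the real table path.

from delta.tables import DeltaTable

# Review the commit history to confirm intermediate writes are versioned and attributable.
history = DeltaTable.forPath(spark, "/mnt/feature_store").history()
history.select("version", "timestamp", "operation", "operationParameters").show(truncate=False)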

Auditor Notes

Required Attachments:

- Output of the repeated-aggregation and partition-skew tests (Section 1)
- Before/after schema and null-count comparison reports (Section 2)
- Temp view inventory and notebook lineage reconstruction (Section 3)

Sign-off:

| Auditor | Data Engineer | ML Owner | Date |
|---|---|---|---|
| [Your Name] | [Eng Name] | [Owner Name] | [Date] |

Standards References: