Spark/PySpark Data Preprocessing Audit Workpaper
Audit Focus Area
Evaluating distributed computing risks in Spark-based preprocessing pipelines:
- Parallel Operation Risks: Order/aggregation inconsistencies
- Silent Type Coercion: Implicit data type changes
- Opaque Logic: Hidden transformations in notebook environments
Evidence Collection Methodology
1. Parallel Operation Inconsistencies
Where to Find Evidence:
| Risk Indicator | Investigation Method | Test Code Example |
| --- | --- | --- |
| Non-deterministic aggregations | Check for reduceByKey() without explicit ordering | df.groupBy().agg() vs df.repartition().groupBy().agg() |
| Order-sensitive operations | Search for zipWithIndex(), collect() before processing | df.orderBy().collect() vs unordered operations |
| Partition skew | Examine df.rdd.getNumPartitions() and partition sizes | df.rdd.glom().map(len).collect() |
```python
# Repeatability test: run the same aggregation several times; assumes `spark` and `data` exist in the audit session
from pyspark.sql import functions as F

results = []
for _ in range(5):
    df = spark.createDataFrame(data).repartition(2)
    # Sort collected rows so only value differences (not row order) trigger the assertion
    results.append(sorted(df.groupBy("key").agg(F.sum("value")).collect()))
assert all(r == results[0] for r in results), "Non-deterministic aggregations detected"
```
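For the partition skew indicator above, per-partition row counts give a quick read; a minimal sketch, assuming df is the DataFrame under review:

```python
# Per-partition row counts; a large max/mean ratio points to hot keys or a poor partitioning column
partition_sizes = df.rdd.glom().map(len).collect()
print(f"partitions={len(partition_sizes)} min={min(partition_sizes)} "
      f"max={max(partition_sizes)} mean={sum(partition_sizes) / len(partition_sizes):.1f}")
```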
2. Silent Type Coercion
Where to Find Evidence:
| Coercion Risk | Spark Operations to Check | Diagnostic Method |
| --- | --- | --- |
| Numeric precision loss | withColumn() casts, UDFs | Schema comparison before/after |
| String truncation | substring(), cast("string") | Length distribution analysis |
| Null handling | na.fill(), coalesce() | Null count comparison |
```python
# Compare column types before and after preprocessing
original_dtypes = dict(df.dtypes)
processed_dtypes = dict(processed_df.dtypes)
for col in original_dtypes:
    if original_dtypes[col] != processed_dtypes[col]:
        print(f"Type changed: {col} ({original_dtypes[col]} → {processed_dtypes[col]})")
```
3. Hidden Notebook Logic
Where to Find Evidence:
| Obfuscation Pattern | Detection Method | Remediation Check |
| --- | --- | --- |
| Cell execution order issues | Check notebook metadata timestamps | Lineage reconstruction |
| Undocumented temp views | spark.catalog.listTables() | View dependency graph |
| Unversioned intermediate writes | dbutils.fs.ls() analysis | Delta Lake versioning check |
```python
from pyspark.sql import DataFrame

# Heuristic scan of the notebook namespace for ad-hoc helpers and intermediate DataFrames
suspects = [name for name, obj in globals().items()
            if (callable(obj) or isinstance(obj, DataFrame))
            and ("_df" in name or name.startswith("tmp_"))]
if len(suspects) > 3:
    print(f"Warning: {len(suspects)} potential hidden transformations found")
```
Workpaper Template
Parallel Processing Findings
| Operation | Partition Count | Result Variance | Severity |
| --- | --- | --- | --- |
| groupBy().mean() | 200 | ±0.3% across runs | Medium |
| join() | 100 | 2% record count fluctuation | High |
| window().rank() | Default | Inconsistent ordering | Critical |
Type Coercion Findings
| Column | Original Type | Final Type | Data Impact |
| --- | --- | --- | --- |
| user_id | BIGINT | DOUBLE | 14% precision loss |
| geo_coords | STRING | VARCHAR(20) | Truncation observed |
| price | DECIMAL(18,4) | FLOAT | 0.7% rounding error |
Notebook Obfuscation Findings
| Notebook | Hidden Cells | Temp Views | Lineage Gaps |
| --- | --- | --- | --- |
| ETL_Part1 | 12 (34%) | 7 undocumented | 3 key transforms |
| Feature_Eng | 8 (22%) | 4 cross-notebook | Window spec missing |
Key Risks
- Critical: Financial aggregations vary by ±1.2% due to partitioning
- High: 18 columns undergo silent type coercion without validation
- Medium: 40% of preprocessing logic buried in notebook cells
Recommendations
For Parallelism Issues
spark.conf.set("spark.sql.shuffle.partitions", 200)
spark.conf.set("spark.sql.execution.sortBeforeRepartition", True)
def deterministic_agg(df, group_col, agg_expr):
return df.repartition(200, group_col).groupBy(group_col).agg(agg_expr)
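A usage sketch (the DataFrame and column names below are illustrative, not taken from the audited pipeline):

```python
from pyspark.sql import functions as F

# Repartitioning on the grouping key keeps repeated runs of this aggregation identical
daily_totals = deterministic_agg(transactions_df, "account_id", F.sum("amount").alias("total_amount"))
```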
For Type Safety
```python
import functools

# Decorator: assert that a preprocessing step returns the schema the audit expects
def validate_schema(expected_schema):
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            df = func(*args, **kwargs)
            assert df.schema == expected_schema, "Schema validation failed"
            return df
        return wrapper
    return decorator
```
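Applied to a pipeline step, a minimal usage sketch (the step name, columns, and schema are illustrative):

```python
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, LongType, DoubleType

# Illustrative expected schema for a pricing step
expected = StructType([
    StructField("user_id", LongType(), True),
    StructField("price", DoubleType(), True),
])

@validate_schema(expected)
def clean_prices(df):
    # Explicit casts document the intended types instead of relying on silent coercion
    return df.select(F.col("user_id").cast("long"), F.col("price").cast("double"))
```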
For Notebook Transparency
```python
from IPython import get_ipython

# Callback logs every executed cell so lineage can be reconstructed from the audit log
def log_transformation(result):
    print(f"[audit] cell executed, success={result.success}")

get_ipython().events.register('post_run_cell', log_transformation)
```

In addition, schedule preprocessing notebooks through a managed orchestrator (for example, Databricks Workflows) rather than depending on manual cell execution order.
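Where intermediate results are written as Delta tables, the table history supplies the versioning evidence referenced in the detection table above; a minimal sketch with an illustrative path:

```python
# Illustrative path; DESCRIBE HISTORY is Delta Lake SQL and only works on tables stored as Delta
history = spark.sql("DESCRIBE HISTORY delta.`/mnt/audit/intermediate/features`")
history.select("version", "timestamp", "operation", "operationParameters").show(truncate=False)
```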
Auditor Notes
Required Attachments:
- Partitioning strategy documentation
- Schema evolution timeline
- Notebook execution DAG reconstruction
Sign-off:
| Auditor | Data Engineer | ML Owner | Date |
| --- | --- | --- | --- |
| [Your Name] | [Eng Name] | [Owner Name] | [Date] |
Standards References:
- Spark Tuning Guide (Deterministic Execution)
- GDPR Article 25 (Data Protection by Design and by Default)
- FINRA Rule 4511 (Books and Records: General Requirements)