PySpark와 Pandas, 왜 함께 써야 할까?
데이터 분석 실무에서 Pandas는 가장 친숙한 도구입니다. 직관적인 API, 풍부한 생태계, 빠른 프로토타이핑이 가능하죠. 하지만 데이터가 수십 GB를 넘어가는 순간 메모리 한계에 부딪힙니다.
PySpark는 이 한계를 돌파합니다. 분산 처리 엔진인 Apache Spark의 Python API로, 수백 GB~TB 규모 데이터를 클러스터에서 병렬 처리할 수 있습니다.
핵심은 로컬에서는 Pandas로 빠르게 개발하고, 대규모 데이터는 PySpark로 처리하되, 둘 사이를 매끄럽게 연동하는 것입니다.
Pandas vs PySpark 비교
| 항목 | Pandas | PySpark |
|---|---|---|
| 처리 규모 | 수 GB (단일 머신 메모리) | 수 TB (분산 클러스터) |
| 실행 방식 | 즉시 실행 (Eager) | 지연 실행 (Lazy) |
| 학습 곡선 | 낮음 | 중간 |
| 생태계 | scikit-learn, matplotlib 등 | Spark MLlib, Spark SQL |
| 디버깅 | 쉬움 | 상대적으로 어려움 |
| 적합 용도 | EDA, 프로토타이핑 | ETL, 대규모 배치 처리 |
로컬 개발 환경 설정
먼저 로컬에서 PySpark 개발 환경을 구성합니다.
# 설치
# pip install pyspark pandas pyarrow
from pyspark.sql import SparkSession
# 로컬 모드 SparkSession 생성
spark = SparkSession.builder \
.appName("DataPipeline") \
.master("local[*]") \
.config("spark.sql.execution.arrow.pyspark.enabled", "true") \
.getOrCreate()
print(f"Spark 버전: {spark.version}")
spark.sql.execution.arrow.pyspark.enabled를 true로 설정하면 Apache Arrow를 통해 PySpark ↔ Pandas 변환 속도가 최대 10배 이상 빨라집니다.
핵심 연동 패턴 3가지
패턴 1: Pandas DataFrame → Spark DataFrame
Pandas로 전처리한 데이터를 Spark로 넘겨 대규모 처리를 수행합니다.
import pandas as pd
# Pandas로 데이터 로드 및 전처리
pdf = pd.read_csv("sample_data.csv")
pdf["date"] = pd.to_datetime(pdf["date"])
pdf = pdf.dropna(subset=["user_id"])
# Spark DataFrame으로 변환
sdf = spark.createDataFrame(pdf)
sdf.printSchema()
sdf.show(5)
패턴 2: Spark DataFrame → Pandas DataFrame
분산 처리 결과를 Pandas로 가져와 시각화하거나 ML 모델에 활용합니다.
# Spark에서 집계 처리
result_sdf = sdf.groupBy("category") \
.agg(
{"amount": "sum", "user_id": "countDistinct"}
)
# Pandas로 변환 (결과가 작을 때만!)
result_pdf = result_sdf.toPandas()
# Pandas에서 시각화
import matplotlib.pyplot as plt
result_pdf.plot(kind="bar", x="category", y="sum(amount)")
plt.title("카테고리별 매출")
plt.tight_layout()
plt.savefig("category_sales.png")
주의:
toPandas()는 전체 데이터를 드라이버 메모리로 가져옵니다. 반드시 집계 후 결과가 작아진 상태에서 호출하세요.
패턴 3: Pandas UDF (Vectorized UDF)
가장 강력한 연동 방식입니다. Spark의 분산 처리 안에서 Pandas 로직을 실행합니다.
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import DoubleType
import pandas as pd
@pandas_udf(DoubleType())
def normalize(series: pd.Series) -> pd.Series:
"""Min-Max 정규화를 분산 환경에서 실행"""
return (series - series.min()) / (series.max() - series.min())
# Spark DataFrame에 적용
sdf = sdf.withColumn("amount_normalized", normalize(sdf["amount"]))
연동 패턴 선택 가이드
| 상황 | 권장 패턴 | 이유 |
|---|---|---|
| 소규모 데이터 전처리 후 분산 처리 | Pandas → Spark | Pandas의 편의성 활용 |
| 분산 집계 결과 시각화 | Spark → Pandas | 집계 후 소규모 데이터 |
| 복잡한 행 단위 변환 | Pandas UDF | 분산 + Pandas 로직 |
| 그룹별 커스텀 모델 적용 | GroupedMap Pandas UDF | 그룹별 독립 처리 |
실전 데이터 파이프라인 구축
실무에서 자주 사용하는 ETL 파이프라인 예제입니다.
def run_pipeline(spark, input_path, output_path):
# 1. Extract: 대규모 데이터 로드
raw_sdf = spark.read.parquet(input_path)
print(f"원본 레코드 수: {raw_sdf.count():,}")
# 2. Transform: Spark SQL로 대규모 변환
raw_sdf.createOrReplaceTempView("raw_data")
transformed_sdf = spark.sql("""
SELECT
user_id,
category,
DATE_TRUNC('month', event_date) AS month,
SUM(amount) AS total_amount,
COUNT(*) AS event_count
FROM raw_data
WHERE event_date >= '2024-01-01'
GROUP BY user_id, category, DATE_TRUNC('month', event_date)
""")
# 3. Pandas UDF로 고급 변환 적용
transformed_sdf = transformed_sdf.withColumn(
"amount_normalized", normalize(transformed_sdf["total_amount"])
)
# 4. Load: 결과 저장
transformed_sdf.write \
.partitionBy("month") \
.mode("overwrite") \
.parquet(output_path)
# 5. 요약 리포트는 Pandas로
summary = transformed_sdf.groupBy("category").count().toPandas()
print(summary.to_markdown(index=False))
run_pipeline(spark, "s3a://bucket/raw/", "s3a://bucket/processed/")
로컬에서 클러스터로 배포하기
로컬 개발이 끝나면 클러스터에 배포합니다. 변경이 필요한 부분은 SparkSession 설정뿐입니다.
import os
def create_spark_session():
env = os.getenv("DEPLOY_ENV", "local")
builder = SparkSession.builder.appName("DataPipeline")
if env == "local":
builder = builder.master("local[*]")
elif env == "cluster":
builder = builder \
.master("yarn") \
.config("spark.executor.memory", "4g") \
.config("spark.executor.cores", "2") \
.config("spark.num.executors", "10")
builder = builder.config(
"spark.sql.execution.arrow.pyspark.enabled", "true"
)
return builder.getOrCreate()
클러스터 제출 명령어:
# YARN 클러스터 모드
spark-submit \
--master yarn \
--deploy-mode cluster \
--executor-memory 4g \
--num-executors 10 \
pipeline.py
배포 시 체크리스트
- 의존성 관리:
--py-files로 추가 모듈 포함 - 리소스 설정: executor 메모리와 코어 수 조정
- 데이터 경로: 로컬 경로를 S3/HDFS 경로로 변경
- 로그 레벨:
spark.sparkContext.setLogLevel("WARN")으로 설정 - 모니터링: Spark UI(포트 4040)로 작업 상태 확인
성능 최적화 팁
- Arrow 활성화는 필수입니다. Pandas 변환 성능이 극적으로 개선됩니다.
toPandas()호출 전 반드시 필터링과 집계를 먼저 수행하세요.- Pandas UDF는 일반 UDF보다 최대 100배 빠릅니다. Python UDF 대신 항상 Pandas UDF를 사용하세요.
- 파티셔닝 전략을 잘 세우면 읽기 성능이 크게 향상됩니다.
cache()와persist()를 활용해 반복 사용 데이터를 메모리에 유지하세요.
마무리
PySpark와 Pandas 연동의 핵심을 정리합니다.
- Pandas는 프로토타이핑과 소규모 데이터, PySpark는 대규모 분산 처리에 최적화되어 있습니다.
- Apache Arrow를 활성화하면 두 프레임워크 간 변환 비용을 최소화할 수 있습니다.
- Pandas UDF는 분산 환경에서 Pandas의 편의성을 그대로 사용할 수 있는 가장 강력한 도구입니다.
- 로컬 개발 → 클러스터 배포 전환은 SparkSession 설정만 변경하면 됩니다.
toPandas()는 집계 후 소규모 결과에만 사용하는 것이 안전합니다.
실무에서는 이 세 가지 연동 패턴을 상황에 맞게 조합하여 사용합니다. 로컬에서 충분히 검증한 뒤 클러스터에 배포하는 워크플로우를 익히면, 데이터 규모에 관계없이 효율적인 파이프라인을 운영할 수 있습니다.
이 글이 도움이 되셨나요?
Buy me a coffee
답글 남기기