PySpark와 Pandas 연동으로 배우는 대규모 데이터 파이프라인 구축 실전 가이드

PySpark와 Pandas, 왜 함께 써야 할까?

데이터 분석 실무에서 Pandas는 가장 친숙한 도구입니다. 직관적인 API, 풍부한 생태계, 빠른 프로토타이핑이 가능하죠. 하지만 데이터가 수십 GB를 넘어가는 순간 메모리 한계에 부딪힙니다.

PySpark는 이 한계를 돌파합니다. 분산 처리 엔진인 Apache Spark의 Python API로, 수백 GB~TB 규모 데이터를 클러스터에서 병렬 처리할 수 있습니다.

핵심은 로컬에서는 Pandas로 빠르게 개발하고, 대규모 데이터는 PySpark로 처리하되, 둘 사이를 매끄럽게 연동하는 것입니다.

Pandas vs PySpark 비교

항목 Pandas PySpark
처리 규모 수 GB (단일 머신 메모리) 수 TB (분산 클러스터)
실행 방식 즉시 실행 (Eager) 지연 실행 (Lazy)
학습 곡선 낮음 중간
생태계 scikit-learn, matplotlib 등 Spark MLlib, Spark SQL
디버깅 쉬움 상대적으로 어려움
적합 용도 EDA, 프로토타이핑 ETL, 대규모 배치 처리

로컬 개발 환경 설정

먼저 로컬에서 PySpark 개발 환경을 구성합니다.

# 설치
# pip install pyspark pandas pyarrow

from pyspark.sql import SparkSession

# 로컬 모드 SparkSession 생성
spark = SparkSession.builder \
    .appName("DataPipeline") \
    .master("local[*]") \
    .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
    .getOrCreate()

print(f"Spark 버전: {spark.version}")

spark.sql.execution.arrow.pyspark.enabledtrue로 설정하면 Apache Arrow를 통해 PySpark ↔ Pandas 변환 속도가 최대 10배 이상 빨라집니다.

핵심 연동 패턴 3가지

패턴 1: Pandas DataFrame → Spark DataFrame

Pandas로 전처리한 데이터를 Spark로 넘겨 대규모 처리를 수행합니다.

import pandas as pd

# Pandas로 데이터 로드 및 전처리
pdf = pd.read_csv("sample_data.csv")
pdf["date"] = pd.to_datetime(pdf["date"])
pdf = pdf.dropna(subset=["user_id"])

# Spark DataFrame으로 변환
sdf = spark.createDataFrame(pdf)
sdf.printSchema()
sdf.show(5)

패턴 2: Spark DataFrame → Pandas DataFrame

분산 처리 결과를 Pandas로 가져와 시각화하거나 ML 모델에 활용합니다.

# Spark에서 집계 처리
result_sdf = sdf.groupBy("category") \
    .agg(
        {"amount": "sum", "user_id": "countDistinct"}
    )

# Pandas로 변환 (결과가 작을 때만!)
result_pdf = result_sdf.toPandas()

# Pandas에서 시각화
import matplotlib.pyplot as plt
result_pdf.plot(kind="bar", x="category", y="sum(amount)")
plt.title("카테고리별 매출")
plt.tight_layout()
plt.savefig("category_sales.png")

주의: toPandas()는 전체 데이터를 드라이버 메모리로 가져옵니다. 반드시 집계 후 결과가 작아진 상태에서 호출하세요.

패턴 3: Pandas UDF (Vectorized UDF)

가장 강력한 연동 방식입니다. Spark의 분산 처리 안에서 Pandas 로직을 실행합니다.

from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import DoubleType
import pandas as pd

@pandas_udf(DoubleType())
def normalize(series: pd.Series) -> pd.Series:
    """Min-Max 정규화를 분산 환경에서 실행"""
    return (series - series.min()) / (series.max() - series.min())

# Spark DataFrame에 적용
sdf = sdf.withColumn("amount_normalized", normalize(sdf["amount"]))

연동 패턴 선택 가이드

상황 권장 패턴 이유
소규모 데이터 전처리 후 분산 처리 Pandas → Spark Pandas의 편의성 활용
분산 집계 결과 시각화 Spark → Pandas 집계 후 소규모 데이터
복잡한 행 단위 변환 Pandas UDF 분산 + Pandas 로직
그룹별 커스텀 모델 적용 GroupedMap Pandas UDF 그룹별 독립 처리

실전 데이터 파이프라인 구축

실무에서 자주 사용하는 ETL 파이프라인 예제입니다.

def run_pipeline(spark, input_path, output_path):
    # 1. Extract: 대규모 데이터 로드
    raw_sdf = spark.read.parquet(input_path)
    print(f"원본 레코드 수: {raw_sdf.count():,}")

    # 2. Transform: Spark SQL로 대규모 변환
    raw_sdf.createOrReplaceTempView("raw_data")
    transformed_sdf = spark.sql("""
        SELECT
            user_id,
            category,
            DATE_TRUNC('month', event_date) AS month,
            SUM(amount) AS total_amount,
            COUNT(*) AS event_count
        FROM raw_data
        WHERE event_date >= '2024-01-01'
        GROUP BY user_id, category, DATE_TRUNC('month', event_date)
    """)

    # 3. Pandas UDF로 고급 변환 적용
    transformed_sdf = transformed_sdf.withColumn(
        "amount_normalized", normalize(transformed_sdf["total_amount"])
    )

    # 4. Load: 결과 저장
    transformed_sdf.write \
        .partitionBy("month") \
        .mode("overwrite") \
        .parquet(output_path)

    # 5. 요약 리포트는 Pandas로
    summary = transformed_sdf.groupBy("category").count().toPandas()
    print(summary.to_markdown(index=False))

run_pipeline(spark, "s3a://bucket/raw/", "s3a://bucket/processed/")

로컬에서 클러스터로 배포하기

로컬 개발이 끝나면 클러스터에 배포합니다. 변경이 필요한 부분은 SparkSession 설정뿐입니다.

import os

def create_spark_session():
    env = os.getenv("DEPLOY_ENV", "local")

    builder = SparkSession.builder.appName("DataPipeline")

    if env == "local":
        builder = builder.master("local[*]")
    elif env == "cluster":
        builder = builder \
            .master("yarn") \
            .config("spark.executor.memory", "4g") \
            .config("spark.executor.cores", "2") \
            .config("spark.num.executors", "10")

    builder = builder.config(
        "spark.sql.execution.arrow.pyspark.enabled", "true"
    )

    return builder.getOrCreate()

클러스터 제출 명령어:

# YARN 클러스터 모드
spark-submit \
    --master yarn \
    --deploy-mode cluster \
    --executor-memory 4g \
    --num-executors 10 \
    pipeline.py

배포 시 체크리스트

  1. 의존성 관리: --py-files로 추가 모듈 포함
  2. 리소스 설정: executor 메모리와 코어 수 조정
  3. 데이터 경로: 로컬 경로를 S3/HDFS 경로로 변경
  4. 로그 레벨: spark.sparkContext.setLogLevel("WARN")으로 설정
  5. 모니터링: Spark UI(포트 4040)로 작업 상태 확인

성능 최적화 팁

  • Arrow 활성화는 필수입니다. Pandas 변환 성능이 극적으로 개선됩니다.
  • toPandas() 호출 전 반드시 필터링과 집계를 먼저 수행하세요.
  • Pandas UDF는 일반 UDF보다 최대 100배 빠릅니다. Python UDF 대신 항상 Pandas UDF를 사용하세요.
  • 파티셔닝 전략을 잘 세우면 읽기 성능이 크게 향상됩니다.
  • cache()persist()를 활용해 반복 사용 데이터를 메모리에 유지하세요.

마무리

PySpark와 Pandas 연동의 핵심을 정리합니다.

  • Pandas는 프로토타이핑과 소규모 데이터, PySpark는 대규모 분산 처리에 최적화되어 있습니다.
  • Apache Arrow를 활성화하면 두 프레임워크 간 변환 비용을 최소화할 수 있습니다.
  • Pandas UDF는 분산 환경에서 Pandas의 편의성을 그대로 사용할 수 있는 가장 강력한 도구입니다.
  • 로컬 개발 → 클러스터 배포 전환은 SparkSession 설정만 변경하면 됩니다.
  • toPandas()집계 후 소규모 결과에만 사용하는 것이 안전합니다.

실무에서는 이 세 가지 연동 패턴을 상황에 맞게 조합하여 사용합니다. 로컬에서 충분히 검증한 뒤 클러스터에 배포하는 워크플로우를 익히면, 데이터 규모에 관계없이 효율적인 파이프라인을 운영할 수 있습니다.

이 글이 도움이 되셨나요?

Buy me a coffee

코멘트

답글 남기기

이메일 주소는 공개되지 않습니다. 필수 필드는 *로 표시됩니다

TODAY 141 | TOTAL 141