TL;DR
An example of using the Connector that lets Spark read from and write to MongoDB.
IT Journeyman
Judging by how many people ask for help with Hadoop, it seems to be every operator's nightmare. If you use Spark in place of, or alongside, Hadoop, you can put MongoDB to work in many parts of your data pipeline.
(Full name : Apache Hadoop, Apache Spark)
Table of Contents
Executive Summary
● Use Cases
● Reference
0. Test Environment
0-1 : Test Server : t3.micro 2 vCPU, 1GB
0-2 : Java Install
0-3 : Spark Install
1. Connect to MongoDB w/ Pyspark
2. Retrieve MongoDB Collection
3. Filters
3.1 Available Filter Operations
3.2 Filter Example
4. SQL Queries
5. Insert into MongoDB Collection
6. Upload CSV files into MongoDB Collection
Executive Summary
- Use Cases
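- Protect your Spark investment by using your existing Spark infrastructure together with MongoDB
- If your current data pipeline is built on Spark, the Spark Connector lets you use MongoDB Atlas as both an input and an output of that pipeline.
- For example, suppose you built a recommendation service with Spark's machine learning (ML) and want to replace the existing RDB that stores the recommendation set with MongoDB for its performance and developer productivity. With the Spark Connector you keep using the Spark infrastructure you have already built, which protects that investment.
- MongoDB's aggregation pipeline can be used from Spark
When reading data from MongoDB and sending it to Spark, you can define various rules that transform and filter the data.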
- Reference
- Manual: MongoDB Connector for Spark
- Example: Docker for MongoDB and Apache Spark
GitHub - sindbach/mongodb-spark-docker: An example of docker compose to set up a single Spark node connecting to MongoDB via Spark Connector
- Example: Using MongoDB with Jupyter Labs
GitHub - RWaltersMA/mongo-spark-jupyter: Docker environment that spins up MongoDB replica set, Spark, and Jupyter Lab. Example code uses PySpark and the MongoDB Spark Connector.
- Tech Blog: Mongo-Spark Connector Deep Dive
Mongo-Spark Connector Deep Dive: Projection Pushdown | by Yerachmiel Feltzman | Zencity Engineering | Medium
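The aggregation-pipeline use case above can be sketched in PySpark roughly as follows. This is a minimal sketch, assuming the 10.x connector's `aggregation.pipeline` read option and the `sample_mflix.users` collection used later in this post. The `$match` and `$project` stages and the helper names `build_pipeline` and `read_with_pipeline` are illustrative, not part of the original walkthrough.

```python
import json

def build_pipeline(name):
    """Illustrative pipeline: keep one user and drop the password hash."""
    return [
        {"$match": {"name": name}},
        {"$project": {"password": 0}},
    ]

def read_with_pipeline(spark, pipeline_json):
    """Load a DataFrame after MongoDB has applied the pipeline (hypothetical helper)."""
    return (
        spark.read.format("mongodb")
        .option("database", "sample_mflix")
        .option("collection", "users")
        # The connector takes the pipeline as a JSON string.
        .option("aggregation.pipeline", pipeline_json)
        .load()
    )

pipeline = build_pipeline("Ned Stark")
pipeline_json = json.dumps(pipeline)
print(pipeline_json)
```

Inside a `pyspark` session started as in section 1, `read_with_pipeline(spark, pipeline_json).show()` would then return only the documents that survive the pipeline, so the filtering happens in MongoDB rather than in Spark.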
0. Test Environment
0-1 : Test Server : t3.micro 2 vCPU, 1GB
ubuntu@ip-172-31-0-102:~$ hostnamectl
Static hostname: ip-172-31-0-102
Icon name: computer-vm
Chassis: vm 🖴
Machine ID: ec29c31f426b243694e0b57a5d5bf064
Boot ID: 0f0b1859d25844ae8148c2dc26ab7fb3
Virtualization: amazon
Operating System: Ubuntu 24.04 LTS
Kernel: Linux 6.8.0-1009-aws
Architecture: x86-64
Hardware Vendor: Amazon EC2
Hardware Model: t3.micro
Firmware Version: 1.0
Firmware Date: Mon 2017-10-16
Firmware Age: 6y 9month 2w 4d
0-2 : Java Install
sudo apt update
sudo apt install openjdk-17-jdk
ubuntu@ip-172-31-0-102:~$ java --version
openjdk 17.0.12 2024-07-16
OpenJDK Runtime Environment (build 17.0.12+7-Ubuntu-1ubuntu224.04)
OpenJDK 64-Bit Server VM (build 17.0.12+7-Ubuntu-1ubuntu224.04, mixed mode, sharing)
0-3 : Spark Install
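Download the latest version from the Spark download page (Downloads | Apache Spark). The closer.lua link on that page returns a mirror-selection page rather than the archive itself, so fetch the tarball from the Apache archive directly.
ubuntu@ip-172-31-0-196:~$ wget https://archive.apache.org/dist/spark/spark-3.5.1/spark-3.5.1-bin-hadoop3.tgz
tar -xvf spark-3.5.1-bin-hadoop3.tgz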
ubuntu@ip-172-31-0-102:~/spark-3.5.1-bin-hadoop3/bin$ ./pyspark
Python 3.12.3 (main, Apr 10 2024, 05:33:47) [GCC 13.2.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/08/03 17:01:07 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 3.5.1
/_/
Using Python version 3.12.3 (main, Apr 10 2024 05:33:47)
Spark context Web UI available at http://ip-172-31-0-102.ap-northeast-2.compute.internal:4040
Spark context available as 'sc' (master = local[*], app id = local-1722704469612).
SparkSession available as 'spark'.
>>> exit()
1. Connect to MongoDB w/ Pyspark
Use the latest driver, version 10.3 (org/mongodb/spark/mongo-spark-connector_2.12).
The driver is downloaded the first time you connect.
./pyspark --conf "spark.mongodb.read.connection.uri=mongodb+srv://it_admin:***@m0cluster.nnuhput.mongodb.net/sample_mflix.users?readPreference=primaryPreferred" \
--conf "spark.mongodb.write.connection.uri=mongodb+srv://it_admin:***@m0cluster.nnuhput.mongodb.net/sample_mflix.users" \
--packages org.mongodb.spark:mongo-spark-connector_2.12:10.3.0
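
When running a script instead of the interactive shell, the same settings can be passed through `SparkSession.builder`. The following is a minimal sketch, assuming the connector coordinates shown above. The host, user, and password are placeholders, and `mongo_conf` and `build_session` are hypothetical helper names.

```python
from urllib.parse import quote_plus

CONNECTOR = "org.mongodb.spark:mongo-spark-connector_2.12:10.3.0"

def mongo_conf(user, password, host, namespace):
    """Return the Spark settings that the --conf and --packages flags set."""
    # Percent-encode credentials so characters like '@' or '/' do not break the URI.
    base = f"mongodb+srv://{quote_plus(user)}:{quote_plus(password)}@{host}/{namespace}"
    return {
        "spark.mongodb.read.connection.uri": base + "?readPreference=primaryPreferred",
        "spark.mongodb.write.connection.uri": base,
        "spark.jars.packages": CONNECTOR,
    }

def build_session(conf, app_name="mongo-example"):
    # Imported lazily so the configuration helper also works without Spark installed.
    from pyspark.sql import SparkSession
    builder = SparkSession.builder.appName(app_name)
    for key, value in conf.items():
        builder = builder.config(key, value)
    return builder.getOrCreate()

# Placeholder credentials and host, for illustration only.
conf = mongo_conf("it_admin", "p@ss/word", "cluster.example.mongodb.net", "sample_mflix.users")
for key, value in conf.items():
    print(key, "=", value)
```

Calling `build_session(conf)` would then give a session equivalent to the shell above, with the connector resolved through `spark.jars.packages`.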

2. Retrieve MongoDB Collection
>>> df = spark.read.format("mongodb") \
... .option("spark.mongodb.read.database", "sample_mflix") \
... .option("spark.mongodb.read.collection", "users") \
... .option("uri", "mongodb+srv://it_admin:***@m0cluster.nnuhput.mongodb.net/sample_mflix.users") \
... .load()
>>> df.show(5)
+--------------------+--------------------+----------------+--------------------+-----------+
| _id| email| name| password|preferences|
+--------------------+--------------------+----------------+--------------------+-----------+
|59b99db4cfa9a34dc...|sean_bean@gameoft...| Ned Stark|$2b$12$UREFwsRUoy...| NULL|
|59b99db4cfa9a34dc...|mark_addy@gameoft...|Robert Baratheon|$2b$12$yGqxLG9LZp...| NULL|
|59b99db5cfa9a34dc...|nikolaj_coster-wa...| Jaime Lannister|$2b$12$6vz7wiwO.E...| NULL|
|59b99db5cfa9a34dc...|michelle_fairley@...| Catelyn Stark|$2b$12$fiaTH5Sh1z...| NULL|
|59b99db6cfa9a34dc...|lena_headey@gameo...|Cersei Lannister|$2b$12$FExjgr7CLh...| NULL|
+--------------------+--------------------+----------------+--------------------+-----------+
only showing top 5 rows
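The read above can also be wrapped in a small helper. This sketch assumes the 10.x connector's short option names `connection.uri`, `database`, and `collection` (the same `database` and `collection` keys appear in section 3.2). `read_options` and `read_collection` are hypothetical names.

```python
def read_options(database, collection, uri=None):
    """Build reader options; the URI is optional when it was already set with --conf."""
    opts = {"database": database, "collection": collection}
    if uri is not None:
        opts["connection.uri"] = uri
    return opts

def read_collection(spark, database, collection, uri=None):
    # DataFrameReader.options(**kwargs) sets several options in one call.
    return (
        spark.read.format("mongodb")
        .options(**read_options(database, collection, uri))
        .load()
    )

opts = read_options("sample_mflix", "users")
print(opts)
```

In the shell, `read_collection(spark, "sample_mflix", "users").select("name", "email").show(5)` would show the same users without the password column.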
3. Filters
3.1 Available Filter Operations
The filter operations that can be used are listed in the connector manual.
(Read from MongoDB in Batch Mode - Spark Connector v10.3)

3.2 Filter Example
Given data like the following, you can add a filter condition when retrieving it.
>>> dataFrame = spark.read \
... .format("mongodb") \
... .option("database", "fruit") \
... .option("collection", "fruit") \
... .load()
>>> dataFrame.show()
+---+---+------+
|_id|qty| type|
+---+---+------+
| 1| 5| apple|
| 2| 10|orange|
| 3| 15|banana|
+---+---+------+
>>> dataFrame.filter(dataFrame['qty'] >= 10).show()
+---+---+------+
|_id|qty| type|
+---+---+------+
| 2| 10|orange|
| 3| 15|banana|
+---+---+------+
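To make the idea of a filter being evaluated on the MongoDB side concrete, the sketch below translates a simple comparison into a `$match` stage by hand and applies it to the three fruit documents in plain Python. It only illustrates the concept and is not the connector's actual code. The operator table and the names `to_match_stage` and `apply_match` are assumptions made for this example.

```python
import operator

# Comparison symbols mapped to a MongoDB operator and a Python equivalent (illustrative subset).
MONGO_OPS = {
    "==": ("$eq", operator.eq),
    ">": ("$gt", operator.gt),
    ">=": ("$gte", operator.ge),
    "<": ("$lt", operator.lt),
    "<=": ("$lte", operator.le),
}

def to_match_stage(field, op, value):
    """Return the $match stage equivalent to `field op value`."""
    mongo_op, _ = MONGO_OPS[op]
    return {"$match": {field: {mongo_op: value}}}

def apply_match(docs, stage):
    """Filter plain dicts the way a single-field $match stage would."""
    (field, condition), = stage["$match"].items()
    (mongo_op, value), = condition.items()
    compare = dict(MONGO_OPS.values())[mongo_op]
    return [d for d in docs if field in d and compare(d[field], value)]

fruit = [
    {"_id": 1, "qty": 5, "type": "apple"},
    {"_id": 2, "qty": 10, "type": "orange"},
    {"_id": 3, "qty": 15, "type": "banana"},
]
stage = to_match_stage("qty", ">=", 10)
kept = apply_match(fruit, stage)
print(stage)
print(kept)
```

The rows kept here are the same two rows that `dataFrame.filter(dataFrame['qty'] >= 10)` returns above.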
4. SQL Queries
To run SQL queries, first register the DataFrame as a temporary view, as shown below, and then query it.
>>> dataFrame.createOrReplaceTempView("temp")
>>> some_fruit = spark.sql("SELECT type, qty FROM temp WHERE type LIKE '%e%'")
>>> some_fruit.show()
+------+---+
| type|qty|
+------+---+
| apple| 5|
|orange| 10|
+------+---+
5. Insert into MongoDB Collection
dataFrame = spark.createDataFrame(
    [("Bilbo Baggins", 50), ("Gandalf", 1000), ("Thorin", 195), ("Balin", 178), ("Kili", 77),
     ("Dwalin", 169), ("Oin", 167), ("Gloin", 158), ("Fili", 82), ("Bombur", None)],
    ["name", "age"])
dataFrame.write.format("mongodb") \
    .mode("append") \
    .option("database", "people") \
    .option("collection", "contacts") \
    .save()
Verify in MongoDB Compass that the data was created.
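Because the DataFrame above carries no `_id`, re-running the write would likely add the same rows again. The sketch below builds write options for an upsert keyed on `name`, assuming the 10.x connector's `operationType`, `idFieldList`, and `upsertDocument` write options. `upsert_options` and `write_contacts` are hypothetical helpers, and keying on `name` is chosen only for illustration.

```python
def upsert_options(database, collection, key_fields):
    """Build write options that replace documents matching key_fields, inserting when absent."""
    return {
        "database": database,
        "collection": collection,
        "operationType": "replace",
        # Several key fields are passed as one comma-separated string.
        "idFieldList": ",".join(key_fields),
        "upsertDocument": "true",
    }

def write_contacts(data_frame, options):
    # DataFrameWriter.options(**kwargs) sets several options in one call.
    data_frame.write.format("mongodb").mode("append").options(**options).save()

opts = upsert_options("people", "contacts", ["name"])
print(opts)
```

With these options, `write_contacts(dataFrame, opts)` is intended to leave one document per name in people.contacts no matter how often it runs.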

6. Upload CSV files into MongoDB Collection
As shown below, you can read a CSV file in pyspark and save it to MongoDB.
>>> csv_file_path = "./sample_mflix.users.csv"
>>> df = spark.read.csv(csv_file_path, header=True, inferSchema=True)
>>>
>>> df.show(5)
+--------------------+----------------+--------------------+--------------------+
| _id| name| email| password|
+--------------------+----------------+--------------------+--------------------+
|59b99db4cfa9a34dc...| Ned Stark|sean_bean@gameoft...|$2b$12$UREFwsRUoy...|
|59b99db4cfa9a34dc...|Robert Baratheon|mark_addy@gameoft...|$2b$12$yGqxLG9LZp...|
|59b99db5cfa9a34dc...| Jaime Lannister|nikolaj_coster-wa...|$2b$12$6vz7wiwO.E...|
|59b99db5cfa9a34dc...| Catelyn Stark|michelle_fairley@...|$2b$12$fiaTH5Sh1z...|
|59b99db6cfa9a34dc...|Cersei Lannister|lena_headey@gameo...|$2b$12$FExjgr7CLh...|
+--------------------+----------------+--------------------+--------------------+
only showing top 5 rows
>>> df.write.format("mongodb").mode("append").save()
You can then confirm that fruit.users has been created.
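`inferSchema=True` makes Spark take an extra pass over the file to guess column types. When the columns are already known, as with the four string columns shown above, a schema can be passed explicitly instead. This is a sketch that assumes the DDL-string form of the `schema` argument of `spark.read.csv`. `string_schema_ddl` and `load_users_csv` are hypothetical helpers.

```python
def string_schema_ddl(columns):
    """Build a DDL schema string that types every column as STRING."""
    # Backticks keep names such as _id safe for the DDL parser.
    return ", ".join(f"`{name}` STRING" for name in columns)

def load_users_csv(spark, path, columns):
    # With an explicit schema, Spark does not need the extra inference pass.
    return spark.read.csv(path, header=True, schema=string_schema_ddl(columns))

ddl = string_schema_ddl(["_id", "name", "email", "password"])
print(ddl)
```

In the shell, `load_users_csv(spark, "./sample_mflix.users.csv", ["_id", "name", "email", "password"])` would produce a DataFrame that can be written with the same `df.write.format("mongodb").mode("append").save()` call.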
