[Data Lake] MongoDB Connector for Spark

by IT Journeyman 2024. 8. 7.

TL;DR

This post walks through examples of using the MongoDB Connector for Spark to read from and write to MongoDB.

Judging by how often people ask for help with Hadoop, it seems to be every operator's nightmare. If you run Spark as a replacement for Hadoop, or alongside it, you can put MongoDB to work in your data pipeline in a variety of ways.

(Full names: Apache Hadoop, Apache Spark)

 

Table of Contents

 

Executive Summary

● Use Cases

● Reference

0. Test Environment

0-1 : Test Server :  t3.micro 2 vCPU, 1GB

0-2 : Java  Install 

0-3 : Spark  Install

1. Connect to MongoDB w/ Pyspark

2. Retrieve MongoDB Collection

3. Filters

3.1 Available Filter Operations

3.2 Filter Examples

4. SQL Queries

5. Insert into MongoDB Collection

6. Upload CSV files into MongoDB Collection

 

Executive Summary

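  • Use Cases 
  • Protect your investment in Spark by using MongoDB together with your existing Spark infrastructure
    • If your current data pipeline is built on Spark, the Spark Connector lets you use MongoDB Atlas as both an input and an output of that pipeline.
    • For example, suppose you built a recommendation service with Spark's machine learning (ML) library and want to replace the RDB that stores the recommendation set with MongoDB for better performance and developer productivity. The Spark Connector lets you keep the Spark infrastructure you have already built, which protects that investment.
  • MongoDB's aggregation pipeline can be used from Spark
    When reading data from MongoDB into Spark, you can define rules that transform and filter the data before it reaches Spark.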
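
To make the aggregation pipeline point more concrete, here is a minimal sketch that builds a pipeline as JSON and hands it to the connector through its `aggregation.pipeline` read option. The `qty` and `type` fields, the threshold, and the helper names `build_pipeline` and `read_with_pipeline` are assumptions made for illustration. `read_with_pipeline` expects a SparkSession that was started with the connector package, so only the pipeline-building part runs on its own.

```python
import json


def build_pipeline(min_qty):
    """Return a MongoDB aggregation pipeline as a JSON string."""
    stages = [
        # Keep only documents whose qty is at least min_qty.
        {"$match": {"qty": {"$gte": min_qty}}},
        # Return just the fields the Spark job needs.
        {"$project": {"_id": 0, "type": 1, "qty": 1}},
    ]
    return json.dumps(stages)


def read_with_pipeline(spark, database, collection, pipeline_json):
    """Load a collection, letting MongoDB run the pipeline before Spark sees the data.

    `spark` is assumed to be a SparkSession started with the connector package.
    """
    return (
        spark.read.format("mongodb")
        .option("database", database)
        .option("collection", collection)
        .option("aggregation.pipeline", pipeline_json)
        .load()
    )


pipeline = build_pipeline(10)
print(pipeline)
```

Inside a pyspark session, a call such as `read_with_pipeline(spark, "fruit", "fruit", pipeline)` would return a DataFrame that already reflects both stages.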

 

 

 

0. Test Environment

0-1 : Test Server :  t3.micro 2 vCPU, 1GB

ubuntu@ip-172-31-0-102:~$ hostnamectl

 Static hostname: ip-172-31-0-102

       Icon name: computer-vm

         Chassis: vm 🖴

      Machine ID: ec29c31f426b243694e0b57a5d5bf064

         Boot ID: 0f0b1859d25844ae8148c2dc26ab7fb3

  Virtualization: amazon

Operating System: Ubuntu 24.04 LTS                

          Kernel: Linux 6.8.0-1009-aws

    Architecture: x86-64

 Hardware Vendor: Amazon EC2

  Hardware Model: t3.micro

Firmware Version: 1.0

   Firmware Date: Mon 2017-10-16

    Firmware Age: 6y 9month 2w 4d    

0-2 : Java  Install

sudo apt update

sudo apt install openjdk-17-jdk

 

ubuntu@ip-172-31-0-102:~$ java --version

openjdk 17.0.12 2024-07-16

OpenJDK Runtime Environment (build 17.0.12+7-Ubuntu-1ubuntu224.04)

OpenJDK 64-Bit Server VM (build 17.0.12+7-Ubuntu-1ubuntu224.04, mixed mode, sharing)




0-3 : Spark  Install

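Download the latest release from the Spark download page (Downloads | Apache Spark). The closer.lua link on that page returns a mirror-selection page rather than the archive itself, so point wget at a direct download URL.

ubuntu@ip-172-31-0-196:~$ wget https://archive.apache.org/dist/spark/spark-3.5.1/spark-3.5.1-bin-hadoop3.tgz

 

tar -xvf spark-3.5.1-bin-hadoop3.tgz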

 

ubuntu@ip-172-31-0-102:~/spark-3.5.1-bin-hadoop3/bin$ ./pyspark

Python 3.12.3 (main, Apr 10 2024, 05:33:47) [GCC 13.2.0] on linux

Type "help", "copyright", "credits" or "license" for more information.

Setting default log level to "WARN".

To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).

24/08/03 17:01:07 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

Welcome to

      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.5.1
      /_/

 

Using Python version 3.12.3 (main, Apr 10 2024 05:33:47)

Spark context Web UI available at http://ip-172-31-0-102.ap-northeast-2.compute.internal:4040

Spark context available as 'sc' (master = local[*], app id = local-1722704469612).

SparkSession available as 'spark'.

>>> exit()

 

 

1. Connect to MongoDB w/ Pyspark

This example uses the latest connector version, 10.3 (org/mongodb/spark/mongo-spark-connector_2.12).

The connector package is downloaded automatically the first time you launch pyspark with it.

./pyspark --conf "spark.mongodb.read.connection.uri=mongodb+srv://it_admin:***@m0cluster.nnuhput.mongodb.net/sample_mflix.users?readPreference=primaryPreferred" \
--conf "spark.mongodb.write.connection.uri=mongodb+srv://it_admin:***@m0cluster.nnuhput.mongodb.net/sample_mflix.users" \
--packages org.mongodb.spark:mongo-spark-connector_2.12:10.3.0
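
If you would rather run a standalone script than the interactive shell, the same settings can be passed to the SparkSession builder. The sketch below is one possible way to do that: `mongo_conf` and `build_session` are hypothetical helper names, and the URI is a placeholder you would replace with your own credentials and cluster host. `spark.jars.packages` is the configuration key that corresponds to the `--packages` flag.

```python
CONNECTOR = "org.mongodb.spark:mongo-spark-connector_2.12:10.3.0"


def mongo_conf(read_uri, write_uri):
    """Return the Spark settings that mirror the pyspark command-line flags."""
    return {
        "spark.jars.packages": CONNECTOR,  # same effect as --packages
        "spark.mongodb.read.connection.uri": read_uri,
        "spark.mongodb.write.connection.uri": write_uri,
    }


def build_session(conf, app_name="mongo-spark-example"):
    """Create a SparkSession from the settings above (requires pyspark)."""
    # Imported here so mongo_conf can be used and tested without Spark installed.
    from pyspark.sql import SparkSession

    builder = SparkSession.builder.appName(app_name)
    for key, value in conf.items():
        builder = builder.config(key, value)
    return builder.getOrCreate()


# Placeholder URI: substitute your own user, password, and cluster host.
uri = "mongodb+srv://<user>:<password>@<cluster-host>/sample_mflix.users"
conf = mongo_conf(uri, uri)
for key in sorted(conf):
    print(key)
```

Calling `build_session(conf)` at the top of a script then gives you the same `spark` object the shell provides. The package setting only takes effect if it is applied before the first session is created.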


2. Retrieve MongoDB Collection 

>>> df = spark.read.format("mongodb") \
... .option("spark.mongodb.read.database", "sample_mflix") \
... .option("spark.mongodb.read.collection", "users") \
... .option("connection.uri", "mongodb+srv://it_admin:***@m0cluster.nnuhput.mongodb.net/sample_mflix.users") \
... .load()

 

>>> df.show(5)

+--------------------+--------------------+----------------+--------------------+-----------+
|                 _id|               email|            name|            password|preferences|
+--------------------+--------------------+----------------+--------------------+-----------+
|59b99db4cfa9a34dc...|sean_bean@gameoft...|       Ned Stark|$2b$12$UREFwsRUoy...|       NULL|
|59b99db4cfa9a34dc...|mark_addy@gameoft...|Robert Baratheon|$2b$12$yGqxLG9LZp...|       NULL|
|59b99db5cfa9a34dc...|nikolaj_coster-wa...| Jaime Lannister|$2b$12$6vz7wiwO.E...|       NULL|
|59b99db5cfa9a34dc...|michelle_fairley@...|   Catelyn Stark|$2b$12$fiaTH5Sh1z...|       NULL|
|59b99db6cfa9a34dc...|lena_headey@gameo...|Cersei Lannister|$2b$12$FExjgr7CLh...|       NULL|
+--------------------+--------------------+----------------+--------------------+-----------+

only showing top 5 rows

 

 

3. Filters 

3.1 Available Filter Operations

The filter operations you can use are listed in the connector documentation:
(Read from MongoDB in Batch Mode - Spark Connector v10.3)

 

3.2 Filter Examples

Given the data below, you can add a filter condition so that only matching rows are retrieved.

>>> dataFrame = spark.read \
...                  .format("mongodb") \
...                  .option("database", "fruit") \
...                  .option("collection", "fruit") \
...                  .load()

>>> dataFrame.show()

+---+---+------+
|_id|qty|  type|
+---+---+------+
|  1|  5| apple|
|  2| 10|orange|
|  3| 15|banana|
+---+---+------+



>>> dataFrame.filter(dataFrame['qty'] >= 10).show()

+---+---+------+
|_id|qty|  type|
+---+---+------+
|  2| 10|orange|
|  3| 15|banana|
+---+---+------+
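
When you filter a DataFrame backed by MongoDB, the connector builds an aggregation pipeline so that the filtering happens in MongoDB before the data is sent to Spark. The sketch below is not the connector's code. It only illustrates the idea by turning a simple comparison into a `$match` stage and evaluating the same comparison locally against the three fruit documents. The helper names `to_match_stage` and `apply_filter` are hypothetical, and the exact pipeline the connector generates may differ.

```python
import operator

# Comparison symbol -> (MongoDB operator, Python equivalent)
_OPS = {
    "==": ("$eq", operator.eq),
    ">": ("$gt", operator.gt),
    ">=": ("$gte", operator.ge),
    "<": ("$lt", operator.lt),
    "<=": ("$lte", operator.le),
}


def to_match_stage(field, op, value):
    """Express a simple comparison as a MongoDB $match stage."""
    mongo_op, _ = _OPS[op]
    return {"$match": {field: {mongo_op: value}}}


def apply_filter(docs, field, op, value):
    """Evaluate the same comparison locally, to show which documents survive."""
    _, compare = _OPS[op]
    return [doc for doc in docs if compare(doc[field], value)]


# The same three documents as in the fruit collection above.
fruit = [
    {"_id": 1, "qty": 5, "type": "apple"},
    {"_id": 2, "qty": 10, "type": "orange"},
    {"_id": 3, "qty": 15, "type": "banana"},
]

stage = to_match_stage("qty", ">=", 10)
kept = apply_filter(fruit, "qty", ">=", 10)
print(stage)
print([doc["type"] for doc in kept])
```

The surviving documents are the same two rows that `dataFrame.filter(dataFrame['qty'] >= 10)` returned above.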



 

 

 

 

 

 

 

4. SQL Queries

To run a SQL query, first register the DataFrame as a temporary view as shown below, then query it.

 

>>> dataFrame.createOrReplaceTempView("temp")

>>> some_fruit = spark.sql("SELECT type, qty FROM temp WHERE type LIKE '%e%'")

>>> some_fruit.show()

+------+---+
|  type|qty|
+------+---+
| apple|  5|
|orange| 10|
+------+---+




 

 

5. Insert into MongoDB Collection  

dataFrame = spark.createDataFrame(
    [("Bilbo Baggins", 50), ("Gandalf", 1000), ("Thorin", 195), ("Balin", 178), ("Kili", 77),
     ("Dwalin", 169), ("Oin", 167), ("Gloin", 158), ("Fili", 82), ("Bombur", None)],
    ["name", "age"])

# Append the rows to the people.contacts collection.
dataFrame.write.format("mongodb") \
    .mode("append") \
    .option("database", "people") \
    .option("collection", "contacts") \
    .save()

Verify in MongoDB Compass that the documents were created.
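
If Compass is not at hand, the write can also be checked from the same pyspark session by reading the collection back. The following is a minimal sketch under that assumption: `read_back` and `missing_names` are hypothetical helpers, and the sample rows below are stand-ins so the comparison logic can run without a cluster.

```python
def read_back(spark, database, collection):
    """Load the collection that was just written (requires a live SparkSession)."""
    return (
        spark.read.format("mongodb")
        .option("database", database)
        .option("collection", collection)
        .load()
    )


def missing_names(expected, rows):
    """Return the expected names that do not appear in the rows read back."""
    found = {row["name"] for row in rows}
    return [name for name in expected if name not in found]


expected = ["Bilbo Baggins", "Gandalf", "Thorin"]
# Stand-in rows for the demo. With Spark, use:
#   rows = read_back(spark, "people", "contacts").collect()
sample_rows = [{"name": "Gandalf", "age": 1000}, {"name": "Thorin", "age": 195}]
print(missing_names(expected, sample_rows))
```

An empty list means every expected name was found in the collection.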



6. Upload CSV files into MongoDB Collection  

As shown below, you can read a CSV file in pyspark and save its contents to MongoDB.

 

>>> csv_file_path = "./sample_mflix.users.csv"

>>> df = spark.read.csv(csv_file_path, header=True, inferSchema=True)

>>>

>>> df.show(5)

+--------------------+----------------+--------------------+--------------------+
|                 _id|            name|               email|            password|
+--------------------+----------------+--------------------+--------------------+
|59b99db4cfa9a34dc...|       Ned Stark|sean_bean@gameoft...|$2b$12$UREFwsRUoy...|
|59b99db4cfa9a34dc...|Robert Baratheon|mark_addy@gameoft...|$2b$12$yGqxLG9LZp...|
|59b99db5cfa9a34dc...| Jaime Lannister|nikolaj_coster-wa...|$2b$12$6vz7wiwO.E...|
|59b99db5cfa9a34dc...|   Catelyn Stark|michelle_fairley@...|$2b$12$fiaTH5Sh1z...|
|59b99db6cfa9a34dc...|Cersei Lannister|lena_headey@gameo...|$2b$12$FExjgr7CLh...|
+--------------------+----------------+--------------------+--------------------+

only showing top 5 rows

 

>>> df.write.format("mongodb").mode("append").save()

 

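Because save() is called without database or collection options, the data goes to the namespace configured in the session's write connection URI. Here you can confirm that fruit.users was created.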
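
To avoid depending on the session's default namespace, the target database and collection can be passed explicitly at write time. The sketch below shows one way to do that, along with a small pre-flight check of the CSV header. `csv_header` and `write_csv_to_mongo` are hypothetical helpers, and the demo row is made up purely so the header check can run without Spark or MongoDB.

```python
import csv
import os
import tempfile


def csv_header(path):
    """Return the column names from the first line of a CSV file."""
    with open(path, newline="", encoding="utf-8") as handle:
        return next(csv.reader(handle))


def write_csv_to_mongo(spark, path, database, collection):
    """Read a CSV with Spark and append it to an explicitly named collection."""
    df = spark.read.csv(path, header=True, inferSchema=True)
    (
        df.write.format("mongodb")
        .mode("append")
        .option("database", database)
        .option("collection", collection)
        .save()
    )
    return df


# Demo with a throwaway CSV whose columns match the example above; the row is made up.
with tempfile.TemporaryDirectory() as tmp:
    demo_path = os.path.join(tmp, "users.csv")
    with open(demo_path, "w", newline="", encoding="utf-8") as handle:
        writer = csv.writer(handle)
        writer.writerow(["_id", "name", "email", "password"])
        writer.writerow(["1", "Example User", "user@example.com", "not-a-real-hash"])
    header = csv_header(demo_path)

print(header)
```

In a pyspark session, `write_csv_to_mongo(spark, "./sample_mflix.users.csv", "fruit", "users")` would write to fruit.users regardless of the URI the session was started with.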