Azure Synapse Analytics SQL Pool에 Bulk Insert

Cloud/Azure / /
728x90

1.   Azures Synapse Analytics SQL Pool Bulk Insert

1)   SQL Pool 이란?

(1)    이전에 Azure SQL Data Warehouse로 서비스되었던 PaaS 형의 DBMS 서버스

(2)    기본적인 구조는 다음과 같다.


Azure Storage( 50개의 Disk)에 데이터를 분산 저장하고 쿼리 실행 시 정해진 Compute Node에 따라 데이터를 분산 처리하는 방식의 구조

(3)    데이터를 정해진 방식에 따라 나누어 저장한다.

I.       ROUND_ROBIN : 데이터를 순차적으로 분산하여 저장

II.      HASH 함수 : 해당 데이터를 해쉬 값으로 치환하여 해당 값을 기준으로 분산하여 저장


(4)    기본적으로 테이블은 CCI(Clustered Columnstore Index) 로 구성되어야 한다

(5)    Example

CREATE TABLE [dbo].[test]

  [a] [int]  NULL,

  [b] [int]  NULL

)

WITH

(

  DISTRIBUTION = ROUND_ROBIN,-- HASH ( [a] ),

  CLUSTERED COLUMNSTORE INDEX

)

GO

.

2)   Azure Storage 연결

(1)    같은 구독 내의 Azure Storage 에는 별도의 인증이 없이 SQL Pool에서 접근이 가능하다.

(2)    Azure Synapse Analytics에서는 기본적으로 3가지 방법으로 Azure Storage에 접근하는 방법을 제공한다.

I.       Azure Data Factory와 마찬가지로 Dataset을 구성하여 연결

II.      SQL Pool(Serverless or dedicated SQL pool)에서 Polybase를 이용한 연결

III.    Spark Pool에서 연결 URL PATH를 통한 연결

(3)    AD-HOC 으로 데이터를 Bulk로 입력하기 위해서는 SQL PoolPolybaseSpark Pool에서의 연결 2개를 이용한다.

3)   Spark Pool에서 Bulk Insert

(1)    Pyspark 기반으로 설명한다.

(2)    Pyspark에서 SQL DW 연결을 위해서는 JDBC를 이용한다. (Spark JVM위에서 작동되는 JAVA 기반 어플리케이션)

(3)    Example

from pyspark.sql import SparkSession

from pyspark.sql.types import *

account_name = "Storage Account"

container_name = "Container"

relative_path = "test/file_folder/path"

file_name = "filename.csv"

adls_path = 'abfss://%s@%s.dfs.core.windows.net/%s' % (container_nameaccount_namerelative_path)

 

df = spark.read.option('header''true') \

                .option('delimiter'',') \

                .csv(adls_path + '/' + file_name)

 

servername = "jdbc:sqlserver://<synapse name>.sql.azuresynapse.net"

dbname = "<sql pool name>"

url = servername + ";" + "databaseName=" + dbname + ";"

dbtable = "<table name>"

user = "<user>" 

password = '<pwd>'

 

df.write \

    .format("jdbc") \

    .mode("append") \

    .option("driver""com.microsoft.sqlserver.jdbc.SQLServerDriver") \

    .option("url"url) \

    .option("dbtable"dbtable) \

    .option("user"user) \

    .option("password"password) \

    .save()

 

 

A.     Spark SessionAzure Storagecsv 파일을 읽는다.

B.      첫 행에 컬럼명을 가지고 있을 경우 header 옵션을 true로 준다.

C.      csv파일은 기본적으로 ‘,’ delimiter(컬럼 구분자)로 가진다.

D.     jdbc 연결 정보를 입력해준다.

E.      mode의 경우 데이터를 추가할 경우 append를 실행하고 데이터를 다리 쓸 경우 overwrite를 진행한다.

가)    SQL Pool 에서 overwrite를 할 경우 테이블을 재 생성할 때 CCI를 생성하지 않기 때문엔 오류가 발생된다.

나)    overwrite 옵션을 사용하더라고 테이블을 기 생성해 놓고 .option(“truncate” , “true”) 옵션을 주어 데이터를 초기화 한 후 입력하는 방식으로 처리하여 오류를 방지한다.

df.write \

    .format("jdbc") \

    .mode("overwrite") \

    .option("driver""com.microsoft.sqlserver.jdbc.SQLServerDriver") \

    .option("url"url) \

    .option("dbtable"dbtable) \

    .option("user"user) \

    .option("password"password) \

    .option("truncate","true") \

    .save()

 

(4)    단점

I.       Spark Pool이 고 비용이기 때문에 Cluster를 정지해 놓는데 재실행하는데 시간이 오래 걸린다.

II.      데이터 입력 시 JDBC를 이용하여 Insert를 진행하기 때문에 BULK 적인 입력이라기 보다 Cursor Point를 이동하며 입력하는 방식으로 시간이 오래 걸린다.

 

4)   Polybase를 이용한 Bulk insert

(1)    polybase Microsoft 에서 제공하는 engine으로 다양한 방식의 Connector를 제공한다.

(2)    SQL Server bulk insert 구문과 비슷한 구문인 COPY구문을 사용한다.

(3)    Example

COPY INTO dbo.<table>

(a 1, b 2)

FROM 'https://<Storage Account>.dfs.core.windows.net/<Container>/test/file_folder/path/filename.csv'

WITH

(

  FILE_TYPE = 'CSV'

  ,MAXERRORS = 0

  ,DATEFORMAT = 'ymd'

  ,FIRSTROW = 2

  ,ERRORFILE = 'https://<Storage Account>.dfs.core.windows.net/<Container>/<error log create path>'

  ,IDENTITY_INSERT = 'OFF'

)

GO

 

I.       FILE_TYPE CSVParquet를 지원

II.      MAXERRORS 은 허용 에러 행 수

III.    DATEFORMAT은 날짜 형 컬럼의 데이터 상태로 { ‘mdy’ | ‘dmy’ | ‘ymd’ | ‘ydm’ | ‘myd’ | ‘dym’ } 의 형태를 지원한다.

IV.    첫 행에 컬럼명 정보를 가지고 있다면 데이터의 FIRSTROW2로 설정한다.

V.      ERROFILE은 에러 발생했을 시 해당 데이터와 ROW 정보를 해당 PATH에 파일로 생성한다.

VI.    IDENTITIY_INSERT 옵션은 SQL Server와 마찬가지로 자동증가 컬럼이 있을 경우 자동증가를 무시하고 값을 넣는 방식이다.

 

2.   출처

1)      https://docs.microsoft.com/ko-kr/sql/t-sql/statements/copy-into-transact-sql?view=azure-sqldw-latest

2)      https://docs.microsoft.com/ko-kr/azure/synapse-analytics/spark/synapse-spark-sql-pool-import-export

3)      https://docs.microsoft.com/ko-kr/azure/synapse-analytics/sql-data-warehouse/massively-parallel-processing-mpp-architecture

 

 

728x90
  • 네이버 블러그 공유하기
  • 네이버 밴드에 공유하기
  • 페이스북 공유하기
  • 카카오스토리 공유하기