When you want every column wrapped in double quotes in CSV output from Glue PySpark, specify quoteAll=True when writing with a DataFrame.
outputDf = newDf.repartition(1)
s3OutputPath = "s3://dl-sfdc-dm/test/newline_test"
outputDf.write.mode('append').csv(s3OutputPath, quoteAll=True, escape='"')
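Why pass escape='"' as well? By default Spark's CSV writer escapes an embedded double quote with a backslash, so a quoted field containing one would come out as \" rather than the RFC 4180-style doubled quote "". Setting escape to the quote character itself produces the doubled form. A minimal local sketch of the difference (the sample value and /tmp output paths are assumptions for illustration):

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").getOrCreate()
df = spark.createDataFrame([('he said "hi"',)], ["col0"])

# Default escape (backslash): writes "he said \"hi\""
df.write.mode("overwrite").csv("/tmp/csv_backslash", quoteAll=True)

# escape='"' doubles the embedded quote instead: "he said ""hi"""
df.write.mode("overwrite").csv("/tmp/csv_rfc4180", quoteAll=True, escape='"')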
Notes
Glue's DynamicFrame uses a double quote (") as the default quoteChar, but it did not put double quotes around every column, so I used a DataFrame instead.
datasink2 = glueContext.write_dynamic_frame.from_options(
    frame = result,
    connection_type = "s3",
    connection_options = {"path": "s3://dl-sfdc-dm/test/newline_test"},
    format = "csv",
    transformation_ctx = "datasink2")
Format Options for ETL Inputs and Outputs in AWS Glue - AWS Glue

format="csv"
This value designates comma-separated-values as the data format (for example, see RFC 4180 and RFC 7111).
You can use the following format_options values with format="csv":
- separator — Specifies the delimiter character. The default is a comma: ",", but any other character can be specified.
- escaper — Specifies a character to use for escaping. This option is used only when reading CSV files. The default value is none. If enabled, the character which immediately follows is used as-is, except for a small set of well-known escapes (\n, \r, \t, and \0).
- quoteChar — Specifies the character to use for quoting. The default is a double quote: '"'. Set this to -1 to turn off quoting entirely.
- multiLine — A Boolean value that specifies whether a single record can span multiple lines. This can occur when a field contains a quoted new-line character. You must set this option to True if any record spans multiple lines. The default value is False, which allows for more aggressive file-splitting during parsing.
- withHeader — A Boolean value that specifies whether to treat the first line as a header. The default value is False. This option can be used in the DynamicFrameReader class.
- writeHeader — A Boolean value that specifies whether to write the header to output. The default value is True. This option can be used in the DynamicFrameWriter class.
- skipFirst — A Boolean value that specifies whether to skip the first data line. The default value is False.
The following example shows how to specify the format options within an AWS Glue ETL job script.
glueContext.write_dynamic_frame.from_options(
    frame = datasource1,
    connection_type = "s3",
    connection_options = {"path": "s3://s3path"},
    format = "csv",
    format_options = {"quoteChar": -1, "separator": "|"},
    transformation_ctx = "datasink2")
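Several of the options above (escaper, multiLine, withHeader) take effect on the read side. A read-side sketch in the same style, assuming an S3 path and option values chosen purely for illustration:

datasource = glueContext.create_dynamic_frame.from_options(
    connection_type = "s3",
    connection_options = {"paths": ["s3://s3path"]},  # reads take "paths" (a list), writes take "path"
    format = "csv",
    format_options = {"withHeader": True, "separator": "|", "multiLine": True},
    transformation_ctx = "datasource")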
References
pyspark.sql.DataFrameWriter.csv
DataFrameWriter.csv(path, mode=None, compression=None, sep=None, quote=None, escape=None, header=None, nullValue=None, escapeQuotes=None, quoteAll=None, dateFormat=None, timestampFormat=None, ignoreLeadingWhiteSpace=None, ignoreTrailingWhiteSpace=None, charToEscapeQuoteEscaping=None, encoding=None, emptyValue=None, lineSep=None)
Saves the content of the DataFrame in CSV format at the specified path.
(snip)
pyspark.sql.DataFrameWriter.csv — PySpark 3.1.2 documentation
- quoteAll — str or bool, optional
- a flag indicating whether all values should always be enclosed in quotes. If None is set, it uses the default value false, only escaping values containing a quote character.
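To make the quoteAll difference concrete, a minimal local sketch (the column values and /tmp output paths are illustrative assumptions):

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").getOrCreate()
df = spark.createDataFrame([("a", "b,c", "1")], ["col0", "col1", "col2"])

# quoteAll unset (defaults to false): only values that need it are quoted -> a,"b,c",1
df.write.mode("overwrite").csv("/tmp/csv_default")

# quoteAll=True: every value is quoted -> "a","b,c","1"
df.write.mode("overwrite").csv("/tmp/csv_quote_all", quoteAll=True)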
Full code
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import col
from pyspark.sql.functions import *

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

## @type: DataSource
## @args: [database = "default", table_name = "newline_test", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "default", table_name = "newline_test", transformation_ctx = "datasource0")

## @type: ApplyMapping
## @args: [mapping = [("col0", "string", "col0", "string"), ("col1", "string", "col1", "string"), ("col2", "string", "col2", "string"), ("col3", "string", "col3", "string")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("col0", "string", "col0", "string"), ("col1", "string", "col1", "string"), ("col2", "string", "col2", "string"), ("col3", "string", "col3", "string")], transformation_ctx = "applymapping1")

# Convert to a DataFrame and replace embedded newlines/carriage returns in col2 with spaces
df = DynamicFrame.toDF(applymapping1)
newDf = df.withColumn("col2", regexp_replace(col("col2"), "\\n|\\r", " "))
result = DynamicFrame.fromDF(newDf, glueContext, "result")

## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://dl-sfdc-dm/test/newline_test"}, format = "json", transformation_ctx = "datasink2"]
## @return: datasink2
## @inputs: [frame = applymapping1]
#datasink2 = glueContext.write_dynamic_frame.from_options(frame = result, connection_type = "s3", connection_options = {"path": "s3://dl-sfdc-dm/test/newline_test"}, format = "csv", transformation_ctx = "datasink2")

# Write via the DataFrame so every column is double-quoted
outputDf = newDf.repartition(1)
s3OutputPath = "s3://dl-sfdc-dm/test/newline_test"
outputDf.write.mode('append').csv(s3OutputPath, quoteAll=True, escape='"')

job.commit()
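As a design note, repartition(1) collapses the result into a single part file, and mode('append') adds new part files under the same S3 prefix on each run rather than overwriting, so rerunning the job keeps earlier output alongside the new file.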