Spark
S3 にある Parquet ファイルを Glue の Crawler でクロールしてテーブルを作成し、文字列型のカラムを数値型に変更するとエラーになるという当たりまり前(Parquet は項目定義に型を持っているバイナリファイルのため)のことを検証した。型を変えたい場合は…
事象 Spark on EMR で Glue カタログのデータベース名を表示しようとすると、"because no identity-based policy allows the glue:GetDatabase action" で AccessDeniedException が発生する。 $ pyspark >>> from pyspark.sql import SparkSession >>> spar…
PySpark でタイムスタンプを UTC から JST に変換する例。 # 文字列をタイムスタンプ型に変換 df = df.withColumn("timestamp", col("timestamp").cast("Timestamp")) # UTC から JST に変換 df = df.withColumn("timestamp", from_utc_timestamp(col("times…
Glue PySpark で CSV 出力時に全カラムをダブルクオートで囲みたいときは DataDrame で write するときに quoteAll=True を指定してやればよい。 outputDf = newDf.repartition(1) s3OutputPath ="s3://dl-sfdc-dm/test/newline_test" outputDf.write.mode('…
Glue PySpark で CSV のカラム内の改行コードを置換する例。Spark では正規表現は Java の記法になる。 newDf = df.withColumn("col2", regexp_replace(col("col2"), "\\n|\\r", " ")) サンプルコード全量 import sys from awsglue.transforms import * from…
PySpark では Java の正規表現を使う Regex in pyspark internally uses java regex.One of the common issue with regex is escaping backslash as it uses java regex and we will pass raw python string to spark.sql we can see it with a sample examp…
Spark 2.4.3 pyspark.sql module
pyspark.sql module の select、concat、col で DataFrame に複数カラムを連結したカラムを追加する。.alias("...") で連結したカラムに別名をつけている。 from pyspark.sql.functions import concat, col, lit df = df.select(col("col1"), col("col2"), c…
pyspark.sql module の select で DataFrame の全カラムを取得する。 df = df.select([column for column in df.columns]) 参考 drop_list = ['a column', 'another column', ...] df.select([column for column in df.columns if column not in drop_list])…
DataFrame を SparkSQL で操作する サンプル df.registerTempTable('table1') df_res = spark.sql('select * from table1') df_res.show() 参考: PySpark の DataFrame を SparkSQL で操作する - CUBE SUGAR CONTAINER Timestamp 型に変更する やりたいこと …
カウントする df.count() スキーマを表示する Spark DataframeのSample Code集 - Qiita print df.printSchema() DynamicFrame Dataframe 変換 from awsglue.dynamicframe import DynamicFrame # DynamicFrame -> Spark DataFrame df = DynamicFrame.toDF(<DynamicFrame>) #</dynamicframe>…
https://docs.aws.amazon.com/ja_jp/redshift/latest/dg/tutorial-tuning-tables-create-test-data.html の lineorder テーブルのデータを増幅する PySpark スクリプト for Glue ジョブ。実行状況は Spark History UI から確認する(AWS マネジメントコンソ…
事象 PySpark で DataFrame にリテラルで列を追加しようとすると "col should be Column" と怒られる。 コード import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from …
Spark Meetup Tokyo #2 (Spark+AI Summit EU 2019) - connpass に参加してきた。今度は Spark について、どうでもいことに Dive deep して話してみるのも面白そう。 Spark+AI Summit Europe 2019 セッションハイライト by 萩原 悠二/Yuji Hagiwara and 酒井 …
Spark DataFrame の repartition(パーティション数, カラム名) とすると指定したカラムで指定したパーティション数にパーティショニングする。パーティション数を省略するとデフォルト値(Spark 2.3.2 では 200)になる。 repartition(numPartitions, *cols)…