Working with AWS S3 Storage Using Spark
First check the Spark version, then start the PySpark shell with the hadoop-aws and AWS SDK packages:
[ec2-user@ip-172-31-28-70 spark]$ bin/pyspark --version
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/ '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.1.2
      /_/
Using Scala version 2.12.10, OpenJDK 64-Bit Server VM, 1.8.0_302
Branch HEAD
Compiled by user centos on 2021-05-24T04:27:48Z
Revision de351e30a90dd988b133b3d00fa6218bfcaba8b8
Url https://github.com/apache/spark
Type --help for more information.
[ec2-user@ip-172-31-28-70 spark]$ bin/pyspark --packages com.amazonaws:aws-java-sdk-pom:1.10.34,org.apache.hadoop:hadoop-aws:3.2.0
Python 3.7.10 (default, Jun 3 2021, 00:02:01)
[GCC 7.3.1 20180712 (Red Hat 7.3.1-13)] on linux
Type "help", "copyright", "credits" or "license" for more information.
:: loading settings :: url = jar:file:/usr/lib/spark/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /home/ec2-user/.ivy2/cache
The jars for the packages stored in: /home/ec2-user/.ivy2/jars
com.amazonaws#aws-java-sdk-pom added as a dependency
org.apache.hadoop#hadoop-aws added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-68c25d16-55e8-459c-9fa1-912fa874707a;1.0
confs: [default]
found com.amazonaws#aws-java-sdk-pom;1.10.34 in central
found org.apache.hadoop#hadoop-aws;3.2.0 in central
found com.amazonaws#aws-java-sdk-bundle;1.11.375 in central
downloading https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.2.0/hadoop-aws-3.2.0.jar ...
[SUCCESSFUL ] org.apache.hadoop#hadoop-aws;3.2.0!hadoop-aws.jar (63ms)
downloading https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.11.375/aws-java-sdk-bundle-1.11.375.jar ...
[SUCCESSFUL ] com.amazonaws#aws-java-sdk-bundle;1.11.375!aws-java-sdk-bundle.jar (1127ms)
:: resolution report :: resolve 1336ms :: artifacts dl 1195ms
:: modules in use:
com.amazonaws#aws-java-sdk-bundle;1.11.375 from central in [default]
com.amazonaws#aws-java-sdk-pom;1.10.34 from central in [default]
org.apache.hadoop#hadoop-aws;3.2.0 from central in [default]
---------------------------------------------------------------------
|                  |            modules            ||   artifacts   |
|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
---------------------------------------------------------------------
|      default     |   3   |   2   |   2   |   0   ||   2   |   2   |
---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-68c25d16-55e8-459c-9fa1-912fa874707a
confs: [default]
2 artifacts copied, 0 already retrieved (96887kB/71ms)
21/08/19 10:05:21 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/ '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.1.2
      /_/
Using Python version 3.7.10 (default, Jun 3 2021 00:02:01)
Spark context Web UI available at http://ip-172-31-28-70.us-west-2.compute.internal:4040
Spark context available as 'sc' (master = local[*], app id = local-1629367523610).
SparkSession available as 'spark'.
>>>
>>> df = spark.read.format("csv").options(header=True, inferSchema=True).load("s3a://data.einext.com/stocks")
>>> df.show()
+----------+---------+---------+---------+---------+---------+---------+------+
| date| open| high| low| close| volume| adjclose|symbol|
+----------+---------+---------+---------+---------+---------+---------+------+
|2000-07-17| 95.4375| 97.5| 92.75| 96.625|3508100.0|74.269199| XLNX|
|2000-07-17| 22.625| 22.75| 22.4375| 22.5625| 201600.0| 13.48614| ES|
|2000-07-17| 6.750002| 6.937503| 6.375| 6.5|1235700.0| 5.241649| CHK|
|2000-07-17|19.812501| 20.1875|19.500001| 20.1875|1434100.0| 3.806147| NI|
|2000-07-17| 30.5| 30.6875| 30.0| 30.03125| 254600.0| 19.81183| SNA|
|2000-07-17|44.749996|45.062498|44.500004|45.000009| 535200.0|17.400773| FOXA|
|2000-07-17| 19.625| 19.625| 19.25| 19.375| 309500.0|13.768835| R|
|2000-07-17| 16.6562| 16.6875| 16.125| 16.25|5507200.0| 1.755466| ROST|
|2000-07-17| 56.25| 57.25| 56.0625| 56.125|7941200.0| 18.31076| PG|
|2000-07-17|54.000326|54.000326|52.500318|53.375325|3725000.0|71.068871| TYC|
|2000-07-17| 58.75| 58.875| 57.8125| 58.0| 182700.0|37.544123| XL|
|2000-07-17|47.500132|47.500132|45.750135|46.343886|4898700.0|17.662922| F|
|2000-07-17| 84.0| 84.5| 82.625|82.671883|2861800.0| 23.88973| CVX|
|2000-07-17| 22.5| 22.75| 22.375| 22.75| 423600.0| 5.942444| PPL|
|2000-07-17| 37.4375| 37.5625| 36.5625| 37.4375| 738800.0|24.832407| TRV|
|2000-07-17|76.874999|80.937504|76.249993|78.250003|6166200.0| 50.37851| A|
|2000-07-17| 26.5| 26.6875| 26.3125| 26.5| 335200.0| 6.240835| LNT|
|2000-07-17| 23.9375| 24.0625| 23.5| 23.9375| 648400.0| 23.9375| AZO|
|2000-07-17| 60.875| 60.9375| 60.25|60.531239|1464800.0|22.017028| UTX|
|2000-07-17| 3.5625| 3.625005| 3.312495| 3.437505| 340000.0| 2.20411| RRC|
+----------+---------+---------+---------+---------+---------+---------+------+
only showing top 20 rows
>>> df.write.save("s3a://tmp.einext.com/stocks_parquet")
>>> exit()
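The save above writes the DataFrame as Parquet (Spark's default output format) under the given S3 prefix. A quick way to confirm the write succeeded is to read the dataset back; a minimal sketch in the same PySpark session, reusing the path from above:

>>> check = spark.read.parquet("s3a://tmp.einext.com/stocks_parquet")
>>> check.count()          # row count should match df.count()
>>> check.printSchema()    # schema is carried in the Parquet files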
Scala version of the code
$ spark-shell --packages com.amazonaws:aws-java-sdk-pom:1.10.34,org.apache.hadoop:hadoop-aws:2.6.0
Set the AWS access key and secret access key in the Hadoop configuration:
scala> val sc = spark.sparkContext
scala> sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "<access_key>")
scala> sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "<access_key_secret>")
Load a file from S3 as a Dataset:
scala> val df = spark.read.format("csv").load("s3n://einext.com/data/Olympics2016.csv")
scala> df.show
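The scala> snippet above relies on the legacy s3n connector that ships with hadoop-aws 2.6.0, which is why the credential keys use the fs.s3n. prefix and the path uses the s3n:// scheme. With the hadoop-aws 3.x package used in the PySpark session earlier, S3 access goes through the s3a connector instead, and the corresponding settings are fs.s3a.access.key and fs.s3a.secret.key; on EC2 the instance's IAM role is usually picked up automatically, so hard-coded keys are often unnecessary. A minimal PySpark sketch of the same read over s3a, setting the keys through the SparkContext's underlying Hadoop configuration (the key values are placeholders):

>>> hc = spark.sparkContext._jsc.hadoopConfiguration()
>>> hc.set("fs.s3a.access.key", "<access_key>")
>>> hc.set("fs.s3a.secret.key", "<access_key_secret>")
>>> df = spark.read.format("csv").load("s3a://einext.com/data/Olympics2016.csv")
>>> df.show()

The same keys can also be supplied when launching the shell, for example with --conf spark.hadoop.fs.s3a.access.key=..., so that no credentials appear inside the session itself.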