Working with AWS S3 Storage Using Spark

Check the installed Spark version, then start PySpark with the hadoop-aws and AWS SDK packages:


[ec2-user@ip-172-31-28-70 spark]$ bin/pyspark --version

Welcome to

      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.1.2
      /_/

Using Scala version 2.12.10, OpenJDK 64-Bit Server VM, 1.8.0_302

Branch HEAD

Compiled by user centos on 2021-05-24T04:27:48Z

Revision de351e30a90dd988b133b3d00fa6218bfcaba8b8

Url https://github.com/apache/spark

Type --help for more information.


[ec2-user@ip-172-31-28-70 spark]$ bin/pyspark --packages com.amazonaws:aws-java-sdk-pom:1.10.34,org.apache.hadoop:hadoop-aws:3.2.0

Python 3.7.10 (default, Jun 3 2021, 00:02:01)

[GCC 7.3.1 20180712 (Red Hat 7.3.1-13)] on linux

Type "help", "copyright", "credits" or "license" for more information.

:: loading settings :: url = jar:file:/usr/lib/spark/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml

Ivy Default Cache set to: /home/ec2-user/.ivy2/cache

The jars for the packages stored in: /home/ec2-user/.ivy2/jars

com.amazonaws#aws-java-sdk-pom added as a dependency

org.apache.hadoop#hadoop-aws added as a dependency

:: resolving dependencies :: org.apache.spark#spark-submit-parent-68c25d16-55e8-459c-9fa1-912fa874707a;1.0

confs: [default]

found com.amazonaws#aws-java-sdk-pom;1.10.34 in central

found org.apache.hadoop#hadoop-aws;3.2.0 in central

found com.amazonaws#aws-java-sdk-bundle;1.11.375 in central

downloading https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.2.0/hadoop-aws-3.2.0.jar ...

[SUCCESSFUL ] org.apache.hadoop#hadoop-aws;3.2.0!hadoop-aws.jar (63ms)

downloading https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.11.375/aws-java-sdk-bundle-1.11.375.jar ...

[SUCCESSFUL ] com.amazonaws#aws-java-sdk-bundle;1.11.375!aws-java-sdk-bundle.jar (1127ms)

:: resolution report :: resolve 1336ms :: artifacts dl 1195ms

:: modules in use:

com.amazonaws#aws-java-sdk-bundle;1.11.375 from central in [default]

com.amazonaws#aws-java-sdk-pom;1.10.34 from central in [default]

org.apache.hadoop#hadoop-aws;3.2.0 from central in [default]

---------------------------------------------------------------------
|                  |            modules            ||   artifacts   |
|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
---------------------------------------------------------------------
|      default     |   3   |   2   |   2   |   0   ||   2   |   2   |
---------------------------------------------------------------------

:: retrieving :: org.apache.spark#spark-submit-parent-68c25d16-55e8-459c-9fa1-912fa874707a

confs: [default]

2 artifacts copied, 0 already retrieved (96887kB/71ms)

21/08/19 10:05:21 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties

Setting default log level to "WARN".

To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).

Welcome to

      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.1.2
      /_/


Using Python version 3.7.10 (default, Jun 3 2021 00:02:01)

Spark context Web UI available at http://ip-172-31-28-70.us-west-2.compute.internal:4040

Spark context available as 'sc' (master = local[*], app id = local-1629367523610).

SparkSession available as 'spark'.

>>>

>>> df = spark.read.format("csv").options(header=True, inferSchema=True).load("s3a://data.einext.com/stocks")

>>> df.show()

+----------+---------+---------+---------+---------+---------+---------+------+
|      date|     open|     high|      low|    close|   volume| adjclose|symbol|
+----------+---------+---------+---------+---------+---------+---------+------+
|2000-07-17|  95.4375|     97.5|    92.75|   96.625|3508100.0|74.269199|  XLNX|
|2000-07-17|   22.625|    22.75|  22.4375|  22.5625| 201600.0| 13.48614|    ES|
|2000-07-17| 6.750002| 6.937503|    6.375|      6.5|1235700.0| 5.241649|   CHK|
|2000-07-17|19.812501|  20.1875|19.500001|  20.1875|1434100.0| 3.806147|    NI|
|2000-07-17|     30.5|  30.6875|     30.0| 30.03125| 254600.0| 19.81183|   SNA|
|2000-07-17|44.749996|45.062498|44.500004|45.000009| 535200.0|17.400773|  FOXA|
|2000-07-17|   19.625|   19.625|    19.25|   19.375| 309500.0|13.768835|     R|
|2000-07-17|  16.6562|  16.6875|   16.125|    16.25|5507200.0| 1.755466|  ROST|
|2000-07-17|    56.25|    57.25|  56.0625|   56.125|7941200.0| 18.31076|    PG|
|2000-07-17|54.000326|54.000326|52.500318|53.375325|3725000.0|71.068871|   TYC|
|2000-07-17|    58.75|   58.875|  57.8125|     58.0| 182700.0|37.544123|    XL|
|2000-07-17|47.500132|47.500132|45.750135|46.343886|4898700.0|17.662922|     F|
|2000-07-17|     84.0|     84.5|   82.625|82.671883|2861800.0| 23.88973|   CVX|
|2000-07-17|     22.5|    22.75|   22.375|    22.75| 423600.0| 5.942444|   PPL|
|2000-07-17|  37.4375|  37.5625|  36.5625|  37.4375| 738800.0|24.832407|   TRV|
|2000-07-17|76.874999|80.937504|76.249993|78.250003|6166200.0| 50.37851|     A|
|2000-07-17|     26.5|  26.6875|  26.3125|     26.5| 335200.0| 6.240835|   LNT|
|2000-07-17|  23.9375|  24.0625|     23.5|  23.9375| 648400.0|  23.9375|   AZO|
|2000-07-17|   60.875|  60.9375|    60.25|60.531239|1464800.0|22.017028|   UTX|
|2000-07-17|   3.5625| 3.625005| 3.312495| 3.437505| 340000.0|  2.20411|   RRC|
+----------+---------+---------+---------+---------+---------+---------+------+

only showing top 20 rows


>>> df.write.save("s3a://tmp.einext.com/stocks_parquet")

>>> exit()
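
The session above never sets credentials explicitly: on EC2 the s3a connector can pick them up from the instance profile (the default AWS credential provider chain). If the keys have to be supplied by hand, they can be pushed into the Hadoop configuration before reading. A minimal PySpark sketch, using the same bucket paths as above and placeholder credentials:

# Sketch only: supply S3A credentials explicitly (placeholders, not real keys)
hconf = spark.sparkContext._jsc.hadoopConfiguration()
hconf.set("fs.s3a.access.key", "<access_key>")
hconf.set("fs.s3a.secret.key", "<access_key_secret>")

# Same read/write as the session above; save() defaults to Parquet
df = (spark.read.format("csv")
      .options(header=True, inferSchema=True)
      .load("s3a://data.einext.com/stocks"))
df.write.mode("overwrite").save("s3a://tmp.einext.com/stocks_parquet")

# Read the Parquet output back to confirm the write landed in S3
spark.read.parquet("s3a://tmp.einext.com/stocks_parquet").show(5)

The same properties can also be passed at launch time, e.g. --conf spark.hadoop.fs.s3a.access.key=..., so that no credentials appear inside the script.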





Scala version of the code


$ spark-shell --packages com.amazonaws:aws-java-sdk-pom:1.10.34,org.apache.hadoop:hadoop-aws:2.6.0

Set the AWS access key and secret access key in the Hadoop configuration:

scala> val sc = spark.sparkContext

scala> sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "<access_key>")

scala> sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "<access_key_secret>")

Load the file from S3 as a DataFrame:

scala> val df = spark.read.format("csv").load("s3n://einext.com/data/Olympics2016.csv")

scala> df.show
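
The s3n scheme and the fs.s3n.* keys above belong to the older connector bundled with hadoop-aws 2.6.0. With hadoop-aws 2.8+ or 3.x (as in the PySpark session earlier), s3n is deprecated and the s3a connector is used instead. A sketch of the equivalent shell commands, assuming the same placeholder credentials and file path:

scala> val sc = spark.sparkContext
scala> sc.hadoopConfiguration.set("fs.s3a.access.key", "<access_key>")
scala> sc.hadoopConfiguration.set("fs.s3a.secret.key", "<access_key_secret>")
scala> val df = spark.read.format("csv").load("s3a://einext.com/data/Olympics2016.csv")
scala> df.show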