- Sprawdzenie poprawności działania środowiska pyspark (brak błędów - poprawne działanie).
import findspark
findspark.init()
from pyspark import SparkConf
from pyspark import SparkContext
sc = SparkContext.getOrCreate(SparkConf().setMaster("local[*]"))
- Pierwszy skrypt w powłoce SparkContext.
rdd = sc.parallelize(range(30))
rdd.collect()
- Pierwsze elementy z zbioru danych.
rdd.first()
- Kolejne elementy z zbioru ( listy, tablicy ).
rdd.take(3)
- Wybrane losowo elementy z zbioru (listy, tablicy), sprawdzić dla parametru TRUE i FALSE.
rdd.takeSample(True,10)
- Przetwarzanie danych z wykorzystaniem transformacji map().
rdd2 = rdd.map(lambda x: x*x)
rdd2.collect()
- Przetwarzanie danych z wykorzystaniem transformacji map().
rdd2 = rdd.map(lambda x: [x,x])
rdd2.collect()
- Przetwarzanie danych z wykorzystaniem transformacji map().
rdd2 = rdd.flatMap(lambda x: [x,x])
rdd2.collect()
- Przetwarzanie danych z wykorzystaniem akcji reduce().
val = rdd.reduce(lambda t1, t2: t1+t2)
print (val)
rdda = sc.parallelize(['aa','bb','cc','dd','ee','ff','gg'])
val = rdda.reduce(lambda t1, t2: t1+t2)
print (val)
- Przetwarzanie danych z wykorzystaniem akcji count().
rdd.count()
- Przetwarzanie danych z wykorzystaniem transformacji union().
array1 = [("physics",85),("maths",75),("chemistry",95)]
array2 = [("physics",65),("maths",45),("chemistry",85)]
rdd_arr1 = sc.parallelize(array1)
rdd_arr2 = sc.parallelize(array2)
rdd_arr1.union(rdd_arr2).collect()
rdd1 = sc.parallelize(range(1,20))
rdd2 = sc.parallelize(range(10,25))
rdd3 = rdd1.union(rdd2)
rdd3.collect()
rdd3.count()
- Przetwarzanie danych z wykorzystaniem transformacji distinct().
rdd4 = rdd3.distinct()
rdd4.collect()
rdd4.count()
- Przetwarzanie danych z wykorzystaniem transformacji intersection().
rdd4 = rdd1.intersection(rdd2)
rdd4.collect()
- Przetwarzanie danych z wykorzystaniem akcji countByKey().
rdd11a = sc.parallelize(('aa','bb','cc','dd','aa','cc','ee','ff','dd','dd','aa'))
rdd11b = rdd11a.map(lambda k: (k,1))
rdd11b.countByKey().items()
- Przetwarzanie danych z wykorzystaniem akcji join(), leftOuterJoin(), rightOuterJoin().
rdda1 = sc.parallelize(('aa','bb','cc','dd','ee','ff','gg','aa')).map(lambda k: (k,1))
rdda2 = sc.parallelize(('aa','cc','mm','rr','tt')).map(lambda k: (k,1))
rdda1.join(rdda2).collect()
rdda1.leftOuterJoin(rdda2).collect()
rdda1.rightOuterJoin(rdda2).collect()