home_site

Lab02 - Apache Spark cz.2 [ ver. ChO.2025.11.10.003 ]

Zawartość strony

Apache Spark

Tematyka zajęć:

  1. Apache Spark - rozgrzewka - obliczenia w Apache Spark
  2. Filtrowanie danych w Apache Spark
  3. Przetwarzanie danych statystycznych
  4. Przetwarzanie danych zawartych w logu

Pliki do realizacji zadań ( dostępne na serwerze w katalogu /home/spark/files ):

A. Apache Spark - rozgrzewka - przykładowe obliczenia w Apache Spark

  1. Przygotowanie środowiska dla języka python w Jupyter
    import findspark
    findspark.init()
    
    from pyspark import SparkConf
    from pyspark import SparkContext
    sc = SparkContext.getOrCreate(SparkConf().setMaster("local[4]"))
    
  2. W ramach tego punktu zostanie zaprezentowane estymowanie wartości liczy Pi z wykorzystaniem generatora liczb pseudolosowych.
  3. Sprawdzenie statystyki generatora liczb losowych.
    import random
    flips = 1000000
    coins = range(flips)
    rdd_coins = sc.parallelize(coins)
    rdd_flips = rdd_coins.map(lambda i: random.random())
    rdd_heads = rdd_flips.filter(lambda r: r < 0.51 )
    rdd_heads.count()
    
  4. Wyznaczenie wartości liczby Pi.
    import random
    num_samples = 1000000
    def inside(p):
        x,y = random.random(), random.random()
        return x*x + y*y < 1
    count = sc.parallelize(range(0, num_samples)).filter(inside).count()
    pi = 4* count/num_samples
    print(pi)
    

B. Filtrowanie danych w Apache Spark - Scala

  1. Przykład I
    1. Przygotowanie środowiska do realizacji zadania.
      import org.apache.spark.sql._
      import org.apache.spark.sql.types.StructType
      import org.apache.spark.sql.types.{IntegerType, StringType, ArrayType}
      
    2. Przykładowe dane do realizacji zaptyń.
      val arrayStructureData = Seq(
          Row(Row("James","","Smith"),List("Java","Scala","C++"),"OH","M"),
          Row(Row("Anna","Rose",""),List("Spark","Java","C++"),"NY","F"),
          Row(Row("Julia","","Williams"),List("CSharp","VB"),"OH","F"),
          Row(Row("Maria","Anne","Jones"),List("CSharp","VB"),"NY","M"),
          Row(Row("Jen","Mary","Brown"),List("CSharp","VB"),"NY","M"),
          Row(Row("Mike","Mary","Williams"),List("Python","VB"),"OH","M")
      )
      
    3. Tworzymy strukturę opisującą dane.
      val arrayStructureSchema = new StructType(). 
          add("name",new StructType().
          add("firstname",StringType).
          add("middlename",StringType).
          add("lastname",StringType)).
          add("languages", ArrayType(StringType)).
          add("state", StringType).
          add("gender", StringType)
      
    4. Tworzymy strukturę DataFrame zawierającą przykładowe dane.
      val df = spark.createDataFrame(spark.sparkContext.parallelize(arrayStructureData),arrayStructureSchema)
      df.printSchema()
      df.show()
      
    5. Filtrujemy dane z wykorzystaniem "filter" lub "where", warunek na kolumnie. Do porównania wykorzystujemy "===".
      df.filter(df("state") === "OH").show(false)
      df.filter('state === "OH").show(false)
      df.filter($"state" === "OH").show(false)
      df.filter(df.col("state") === "OH").show(false)
      df.where(df("state") === "OH").show(false)
      df.where('state === "OH").show(false)
      df.where($"state" === "OH").show(false)
      df.where(df.col("state") === "OH").show(false)
      
    6. Filtrujemy dane z wykorzystaniem klauzul typu SQL.
      df.filter("gender == 'M'").show(false)
      df.where("gender == 'M'").show(false)
      
    7. Filtrujemy danych z wykorzystaniem wielu warunków, AND (&&), OR(||) i NOT(!).
      df.filter(df("state") === "OH" && df("gender") === "M").show(false)
      df.where(df("state") === "OH" && df("gender") === "M").show(false)
      
    8. Filtrujemy danych wewnątrz tablicy.
      import org.apache.spark.sql.functions.array_contains
      df.filter(array_contains(df("languages"),"Java")).show(false)
      df.filter(df("name.lastname") === "Williams").show(false)
      
    9. Plan zapytań dla kilku przykładowych zapytań.
      df.filter(df("name.lastname") === "Williams").explain()
      
      df.filter(array_contains(df("languages"),"Java")).explain()
      
      df.filter(df("state") === "OH" && df("gender") === "M").explain()
      
  2. Przykład II
    1. Do realizacji zapytań w tym przykładzie wykorzystamy plik dane1.csv z poprzedniego laboratorium. Na początek ustawiamy środowisko do realizacji zadania.
      import org.apache.spark.sql._
      import org.apache.spark.sql.types.StructType
      import org.apache.spark.sql.types.{IntegerType, StringType, ArrayType}
      import org.apache.spark.sql.SparkSession
      
    2. Ustawiam zmienną "session" i ustawiamy jeden węzeł obliczeniowy.
      val session = SparkSession.builder.config("spark.master","local[1]").getOrCreate
      
    3. Budujemy strukturę "schema" dla wczytanych danych z pliku.
      val schema = new StructType().
              add("fname",StringType).
              add("lname",StringType).
              add("courses",StringType).
              add("grade",IntegerType).
              add("year",IntegerType)  
      
    4. Wczytanie danych z pliku dane1.csv do struktury DataFrame.
      val data = session.read.option("header","true").option("delimiter","|").schema(schema).csv("/home/spark/files/dane1.csv")  
      data.printSchema()
      data.show()
      
    5. Przetwarzanie danych zawartych w DataFrame.
      data.where("fname = 'Weronika'").show(false)
      data.where('fname === "Weronika").show(false)
      data.where(data.col("fname") === "Weronika").show(false)
      data.select($"lname").show(false)
      data.select($"lname",$"fname").show(false)
      data.groupBy($"grade",$"year").count().show(false)
      data.groupBy($"grade",$"year").count().orderBy($"grade",$"year").show(false)
      
    6. Przetwarzanie danych z wykorzysatniem języka SQL ( na początek tworzymy wirtualną tabelę ) .
      data.createGlobalTempView("lista")
      spark.sql("SELECT * from global_temp.lista").show()
      spark.sql("SELECT grade, year, count(*) from global_temp.lista group by grade, year order by grade, year").show()
      
    7. Zapytania do realizacji samodzielnej:
      • liczba studentów na poszczególnych latach dla stopnia pierwszego posortowana malejąco,
      • liczba wszystkich studentów,
      • liczba różnych imion w pliku.

C. Przetwarzanie danych statystycznych

  1. W ramach tego punktu przeanalizujemy dane statystyczne zawierające historię kredytów w Niemczech. Do realizacji zadania posłuży nam plik z danymi "gcredit.csv". Przeprowadzona analiza jest zrealizowana w oparciu o podręcznik [1].
  2. W ramach zajęć będziemy realizowali zadania wykorzystując pyspark w środowisko Jupyter.
    1. Przygotowanie środowiska Jupyter do pracy z projektem.
      from pyspark.sql.dataframe import DataFrame
      from pyspark.sql import SparkSession
      spark = SparkSession(sc)
      
    2. Pobieramy dane z pliku "gcredit.csv" i umieszczamy w pliku DataFrame. Plik zawiera opisy kolumn.
      df = spark.read.format('com.databricks.spark.csv').\
                                     options(header='true', \
                                     inferschema='true').load("/home/spark/files/gcredit.csv",header=True);
      
    3. Wylistowanie kolumn zawartych w dokumencie.
      df.columns
      
    4. Analiza danych zawartych w pliku danych z wykorzystaniem funkcji describe(). Funkcja describe() zwraca większość wyników statystycznych, takich jak wartość min, mediana, maksimum, kwartyle i odchylenie standardowe. Za pomocą funkcji zdefiniowanej przez użytkownika można uzyskać jeszcze więcej wyników statystycznych.
      df[['Account Balance','No of dependents']].toPandas().describe()
      
    5. Do realizacji kolejnego zadania przygotowana została funkcja która zwraca kwartyle i percentyle
      def describe_pd(df_in, columns, style):
          '''
          Function to union the basic stats results and deciles
          :param df_in: the input dataframe 
          :param columns: the cloumn name list of the numerical variable     
          :param style: the display style  
      
          :return : the numerical describe info. of the input dataframe   
      
          :author: Wenqiang Feng
          :email:  von198@gmail.com  
          '''       
      
          if style == 1:
              percentiles = [25, 50, 75]
          else:
              percentiles = np.array(range(0, 110, 10))
          
          percs = np.transpose([np.percentile(df_in.select(x).collect(), percentiles) for x in columns])
          percs = pd.DataFrame(percs, columns=columns)
          percs['summary'] = [str(p) + '%' for p in percentiles]
          
          spark_describe = df_in.describe().toPandas()
          new_df = pd.concat([spark_describe, percs],ignore_index=True,sort=True)
          new_df = new_df.round(2)
          return new_df[['summary'] + columns]
      
    6. Do prezentacji funkcji zostaną wybrane następujące parametry: "Account Balance" i "No of dependents". Na początek prezentacja wartości parametrów dla funkcjonalności standardowej a następnie z utworzoną funkcją.
      num_cols = ['Account Balance','No of dependents']
      
    7. Przetwarzanie wybranych danych z wykorzystaniem describe().
      df.select(num_cols).describe().show()
      
    8. Przetworzenie danych z wykorzystaniem opracowanej funkcji wymaga dołączenia pakietów: numpy i pandas.
      import numpy as np
      import pandas as pd
      
    9. Przetworzenie danych z wykorzystaniem describe() z opracowaną funkcją.
      output = describe_pd(df,num_cols,1)
      
      output['summary']= output['summary'].astype(str)
      # convert just columns
      output[num_cols] = output[num_cols].apply(pd.to_numeric)
      
      output.dtypes
      
      spark.createDataFrame(output).show()
      
    10. Przetwarzanie danych z wykorzystaniem opracowanej funkcji w wersji z percyntylami.
      output = describe_pd(df,num_cols,2)
      
      output['summary']= output['summary'].astype(str)
      # convert just columns
      output[num_cols] = output[num_cols].apply(pd.to_numeric)
      spark.createDataFrame(output).show()
      
    11. Współczynnik skośności i kurtoza [6,7,8].
      Kurtoza
      Kurtoza jest miarą koncentracji wyników. Kurtoza informuje nas o tym, na ile nasze obserwacje, wyniki są skoncentrowane wokół średniej. Miara ta informuje nas jak dużo naszych wyników / obserwacji jest zbliżona do wartości średniej, czy większość z zaobserwowanych wyników ma wartość podobną do średniej?
      Rozkłady prawdopodobieństwa można podzielić ze względu na wartość kurtozy na rozkłady:
      • mezokurtyczne (rozkład normalny K = 0 ) - wartość kurtozy wynosi 0, spłaszczenie rozkładu jest podobne do spłaszczenia rozkładu normalnego (dla którego kurtoza wynosi dokładnie 0)
      • leptokurtyczne (rozkład wysmukły K > 0 ) - kurtoza jest dodatnia, wartości cechy bardziej skoncentrowane niż przy rozkładzie normalnym
      • platokurtyczne (rozkład spłaszczony K < 0 ) - kurtoza jest ujemna, wartości cechy mniej skoncentrowane niż przy rozkładzie normalnym
      Współczynnik skośności
      Skośność jest miarą asymetrii obserwowanych wyników. Informuje nas o tym jak wyniki dla danej zmiennej kształtują się wokół średniej. Czy większość zaobserwowanych wyników jest z lewej strony średniej, blisko wartości średniej czy z prawej strony średniej? Innymi słowy, czy w naszym zbiorze obserwacji więcej jest wyników, które są niższe niż średnia dla całej grupy, wyższe czy równe średniej?
      Współczynnik skośności przyjmuje wartość zero dla rozkładu symetrycznego, wartości ujemne dla rozkładów o lewostronnej asymetrii (wydłużone lewe ramię rozkładu) i wartości dodatnie dla rozkładów o prawostronnej asymetrii (wydłużone prawe ramię rozkładu).
      W ramach testu sprawdzimy rozkład dla parametru "Age (years)".
      var = 'Age (years)'
      #  pyspark.sql.function
      #df.select(skewness(var),kurtosis(var)).show()
      #  pandas skew(), kurtosis()
      df[['Age (years)']].toPandas().skew(),df[['Age (years)']].toPandas().kurtosis()
      
    12. Histogramy. Różnica pomiędzy histogramem i wykresem słupkowym [9].
      Do kolejnych zadań wykorzystamy dane 'Credit Amount' i 'Age (years)'. Na początek wyświetlmy wartości parametru 'Credit Amount'.
      df.select('Credit Amount').show(5)
      
    13. W kolejnym zadaniu analizujemy wiek osób które pobierały kredyt. Wynikiem realizacji tego punktu będzie histogram liczby kredytów w zależności od wieku.
      data1 = df.select('Age (years)').toPandas()
      
      import pandas as pd
      import numpy as np
      import matplotlib.pyplot as plt
      import seaborn as sns
      from scipy import stats
      %matplotlib inline
      
      plt.rcParams['figure.figsize'] =(16,9)
      plt.style.use('ggplot')
      sns.set()
      
      var = 'Age (years)'
      x = data1[var]
      bins = np.arange(0, 100, 5.0)
      
      plt.figure(figsize=(10,8))
      # the histogram of the data
      plt.hist(x, bins, alpha=0.8, histtype='bar', color='gold',
               ec='black',weights=np.zeros_like(x) + 100. / x.size)
      
      plt.xlabel(var)
      plt.ylabel('percentage')
      plt.xticks(bins)
      plt.show()
      
      #fig.savefig(var+".pdf", bbox_inches='tight')
      
    14. Kolejny skrypt umożliwi wyświetlenie liczby kredytów i ich procentowego udziału w zależności od wieku kredytobiorców.
      var = 'Age (years)'
      x = data1[var]
      bins = np.arange(0, 100, 5.0)
      
      
      ########################################################################
      hist, bin_edges = np.histogram(x,bins,
                                     weights=np.zeros_like(x) + 100. / x.size) 
      # make the histogram
       
      fig = plt.figure(figsize=(20, 8))
      ax = fig.add_subplot(1, 2, 1)
      
      # Plot the histogram heights against integers on the x axis
      ax.bar(range(len(hist)),hist,width=1,alpha=0.8,ec ='black', color='gold')
      # # Set the ticks to the middle of the bars
      ax.set_xticks([0.5+i for i,j in enumerate(hist)]) 
      # Set the xticklabels to a string that tells us what the bin edges were
      labels =['{}'.format(int(bins[i+1])) for i,j in enumerate(hist)]
      #labels.insert(0,'0')
      ax.set_xticklabels(labels)
      plt.xlabel(var)
      plt.ylabel('percentage')
      
      ########################################################################
      
      hist, bin_edges = np.histogram(x,bins) # make the histogram
      
      ax = fig.add_subplot(1, 2, 2)
      # Plot the histogram heights against integers on the x axis
      ax.bar(range(len(hist)),hist,width=1,alpha=0.8,ec ='black', color='gold')
       
      # # Set the ticks to the middle of the bars
      ax.set_xticks([0.5+i for i,j in enumerate(hist)])
       
      # Set the xticklabels to a string that tells us what the bin edges were
      labels =['{}'.format(int(bins[i+1])) for i,j in enumerate(hist)]
      #labels.insert(0,'0')
      ax.set_xticklabels(labels)
      plt.xlabel(var)
      plt.ylabel('count')
      plt.suptitle('Histogram of {}: Left with percentage output;Right with count output'
                   .format(var), size=16)
      plt.show()
      
      #fig.savefig(var+".pdf", bbox_inches='tight')
      
    15. W ramach kolejnych punktów będziemy tworzyli wykresy prezentujące udział poszczególnych grup wiekowych w całkowitym pobranym kredycie. Na początek funkcja przyporządkowująca osobę do odpowiedniej grupy a następnie utworzenie odpowiedniego zestawu danych.
      def age_condition(x):
          if pd.isnull(x):
              return "missing"
          elif x < 25:
              return "<25"
          elif 25 <= x <= 34:
              return "25-34"
          elif 35 <= x <= 44:
              return "35-44" 
          elif 45 <= x <= 54:
              return "45-54" 
          elif 55 <= x <= 64:
              return "55-64"     
          else:
              return "65+"
      
      from pyspark.sql.functions import udf
      from pyspark.sql.types import StringType, DoubleType
      
      age_udf = udf(lambda x: age_condition(x), StringType())
      
      df = df.withColumn("age_class", age_udf("Age (years)"))
      
      df.select(['age_class','Age (years)']).show(3)
      
    16. Sprawdzamy poprawność przypisania do grup wiekowych i wykorzystując paramater 'Occupation' tworzymy kwerendę krzyżową.
      df.select(['age_class','Credit Amount']).\
         groupBy('age_class').count().show()
      
      df.stat.crosstab("age_class", "Occupation").show()
      
    17. Dla utworzonych grup wiekowych tworzymy wartości zaagregowane dla parametru 'Credit Amount' tj. wartość średnia, wartość minimalna i maksymalna oraz liczba wystąpień.
      from pyspark.sql import functions as F
      from pyspark.sql.functions import rank,sum,col
      from pyspark.sql import Window
      
      window = Window.rowsBetween(Window.unboundedPreceding,Window.unboundedFollowing)
      # withColumn('Percent %',F.format_string("%5.0f%%\n",col('Credit_num')*100/col('total'))).\
      tab = df.select(['age_class','Credit Amount']).\
         groupBy('age_class').\
         agg(F.count('Credit Amount').alias('Credit_num'),
             F.mean('Credit Amount').alias('Credit_avg'),
             F.min('Credit Amount').alias('Credit_min'),
             F.max('Credit Amount').alias('Credit_max')).\
         withColumn('total',sum(col('Credit_num')).over(window)).\
         withColumn('Percent',col('Credit_num')*100/col('total')).\
         drop(col('total'))
      
      tab.show()
      
    18. W kolejnym zadaniu przeniesiemy dane do biblioteki 'pandas' i zmienimy prezentację wartości w zależności od grup wiekowych.
      plot_data = tab.toPandas() 
      plot_data.sort_values('age_class')
      
      custom_dict = {'<25': 0, '25-34': 1, '35-44': 2, '45-54': 3, '55-64': 4, '65+': 5}
      
      plot_data['index']= plot_data['age_class'].replace(custom_dict)
      
      plot_data.sort_values('index')
      
    19. Prezentacja wyników analizy na wykresie kołowym.
      plot_data = plot_data.sort_values('index')
      
      # Data to plot
      labels = plot_data.age_class
      sizes =  plot_data.Percent
      colors = ['gold', 'yellowgreen', 'lightcoral','blue', 'lightskyblue','green','red']
      explode = (0, 0.1, 0, 0,0,0)  # explode 1st slice
       
      # Plot
      plt.figure(figsize=(10,8))
      plt.pie(sizes, explode=explode, labels=labels, colors=colors,
              autopct='%1.1f%%', shadow=True, startangle=140)
       
      plt.axis('equal')
      plt.show()
      
    20. Prezentacja wyników analizy na wykresie słukowym.
      labels = plot_data.age_class
      missing = plot_data.Percent
      ind = [x for x, _ in enumerate(labels)]
      
      plt.figure(figsize=(10,8))
      plt.bar(ind, missing, width=0.8, label='missing', color='gold')
      
      plt.xticks(ind, labels)
      plt.ylabel("percentage")
      
      plt.show()
      
    21. Analiza wyników ze względu na płeć osoby biorącej kredyt.
      labels = ['missing', '<25', '25-34', '35-44', '45-54','55-64','65+']
      missing = np.array([0.000095, 0.024830, 0.028665, 0.029477, 0.031918,0.037073,0.026699])
      man = np.array([0.000147, 0.036311, 0.038684, 0.044761, 0.051269, 0.059542, 0.054259])
      women = np.array([0.004035, 0.032935, 0.035351, 0.041778, 0.048437, 0.056236,0.048091])
      ind = [x for x, _ in enumerate(labels)]
      
      plt.figure(figsize=(10,8))
      plt.bar(ind, women, width=0.8, label='women', color='gold', bottom=man+missing)
      plt.bar(ind, man, width=0.8, label='man', color='silver', bottom=missing)
      plt.bar(ind, missing, width=0.8, label='missing', color='#CD853F')
      
      plt.xticks(ind, labels)
      plt.ylabel("percentage")
      plt.legend(loc="upper left")
      plt.title("demo")
      
      plt.show()
      
    22. Ostatni wykres przedstawia zależność liczby kredytów od jego wielkości.
      # prepare for the plot data
      
      var = 'Credit Amount'
      plot_data = df.select(var).toPandas()
      x= plot_data[var]
      
      bins =[0,200,400,600,700,800,900,1000,2000,3000,4000,5000,6000,10000,25000] 
      
      hist, bin_edges = np.histogram(x,bins,weights=np.zeros_like(x) + 100. / x.size) # make the histogram
       
      fig = plt.figure(figsize=(10, 8))
      ax = fig.add_subplot(1, 1, 1)
      # Plot the histogram heights against integers on the x axis
      ax.bar(range(len(hist)),hist,width=1,alpha=0.8,ec ='black',color = 'gold')
       
      # # Set the ticks to the middle of the bars
      ax.set_xticks([0.5+i for i,j in enumerate(hist)])
       
      # Set the xticklabels to a string that tells us what the bin edges were
      #labels =['{}k'.format(int(bins[i+1]/1000)) for i,j in enumerate(hist)]
      labels =['{}'.format(bins[i+1]) for i,j in enumerate(hist)]
      #labels.insert(0,'0')
      ax.set_xticklabels(labels)
      #plt.text(-0.6, -1.4,'0')
      plt.xlabel(var)
      plt.ylabel('percentage')
      plt.show()
      

D. Analiza logu serwera WWW

  1. W ramach tego punktu przeprowadzimy analizę logu serwera WWW. Analiza logu zostanie zrealizowana w oparciu o dokumenty [2,3,4,5].
  2. Przygotowanie środowiska do realizacji projektu. Do realizacji projektu wymagane będzie dołaczenie biblioteki obsługującej DataFrame i wyrażenia regularne. Otwieramy nowy dokument w Jupyterze.
    import findspark
    findspark.init()
    
    # configure spark variables
    from pyspark.context import SparkContext
    from pyspark import SparkConf
    from pyspark.sql.context import SQLContext
    from pyspark.sql.session import SparkSession
       
    sc = SparkContext().getOrCreate(SparkConf().setMaster("local[4]"))
    sqlContext = SQLContext(sc)
    spark = SparkSession(sc)
    
    # load up other dependencies
    import re
    import pandas as pd
    
  3. Sprawdzenie poprawności dołączenia biblioteki obsługującej wyrażenia regularne.
    m = re.finditer(r'.*?(spark).*?', "I'm searching for a spark in PySpark", re.I)
    for match in m:
        print(match, match.start(), match.end())
    
  4. Pobranie zawartości plików logów do pliku DataFrames.
    raw_data_files = ['/home/spark/files/access_log1', '/home/spark/files/access_log2']
    base_df = spark.read.text(raw_data_files)
    base_df.printSchema()
    
  5. Konwersja pliku danych Spark dataFrames do pliku RDD ( Resilient Distributed Dataset) i prezentacja danych (10 rekordów).
    type(base_df)
    
    base_df_rdd = base_df.rdd
    type(base_df_rdd)
    
    base_df.show(10, truncate=False)
    
    base_df_rdd.take(10)
    
  6. Parsowanie danych zawartych w pliku logu. Znaczenie poszczególnych pól logu.
    remotehost
    Remote hostname (or IP number if DNS hostname is not available or if DNSLookup is off).
    rfc931
    The remote logname of the user if at all it is present.
    authuser
    The username of the remote user after authentication by the HTTP server.
    [date]
    Date and time of the request.
    “request”
    The request, exactly as it came from the browser or client.
    status
    The HTTP status code the server sent back to the client.
    bytes
    The number of bytes (Content-Length) transferred to the client.
  7. Na początek sprawdzimy liczbę rekordów oraz liczbę pół.
    print((base_df.count(), len(base_df.columns)))
    
  8. Na potrzeby opracowania wyrażeń regularnych pobierających określoną informację z zestawu danych utworzymy zbiór testowych danych.
    sample_logs = [item['value'] for item in base_df.take(15)]
    sample_logs
    
  9. Pobranie danych dotyczących serwera.
    host_pattern = r'(^\S+)\s'
    hosts = [re.search(host_pattern, item).group(1)
               if re.search(host_pattern, item)
               else 'no match'
               for item in sample_logs]
    hosts
    
  10. Pobranie danych dotyczących związnych z parametrem timestamp.
    ts_pattern = r'\[(\d{2}/\w{3}/\d{4}:\d{2}:\d{2}:\d{2} \S{1}\d{4})]'
    timestamps = [re.search(ts_pattern, item).group(1) 
                     if re.search(ts_pattern, item)
                     else 'no match'
                     for item in sample_logs]
    timestamps
    
  11. Pobranie informacji o metodzie HTTP, adresie URL i wersji protokołu.
    method_uri_protocol_pattern = r'\"(\S+)\s(\S+)\s*(\S*)\"'
    method_uri_protocol = [re.search(method_uri_protocol_pattern, item).groups()
                   if re.search(method_uri_protocol_pattern, item)
                   else 'no match'
                  for item in sample_logs]
    method_uri_protocol
    
  12. Parsowanie pliku logu zgodnie z opracowanymi wyrażeniami regularnymi.
    from pyspark.sql.functions import regexp_extract
    
    logs_df = base_df.select(regexp_extract('value', r'(^[\S+]+) -', 1).alias('host'),
                              regexp_extract('value', r'(^[\S+]+) - - \[(.*)\]', 2).alias('timestamp'),
                              regexp_extract('value', r'(^[\S+]+) - - \[(.*)\] "(\w+)', 3).alias('method'),
                              regexp_extract('value', r'(^[\S+]+) - - \[(.*)\] "(\w+) (.*?) (.*?)', 4).alias('path'),
                              regexp_extract('value', r'(^[\S+]+) - - \[(.*) \+(.*)\] "(\w+) (.*?) (.*?)" ', 6).alias('protocol'),
                              regexp_extract('value', r'(^[\S+]+) - - \[(.*) \+(.*)\] "(\w+) (.*?) (.*?)" (\d+) ', 7).cast('integer').alias('code'),
                              regexp_extract('value', r'(^[\S+]+) - - \[(.*) \+(.*)\] "(\w+) (.*?) (.*?)" (\d+) (\d+)', 8).cast('integer').alias('size'))
    logs_df.show(10, truncate=True)
    print((logs_df.count(), len(logs_df.columns)))
    
  13. Sprawdzanie przetworzonych danych, czy wytępują wartości NULL.
    (base_df
        .filter(base_df['value']
                    .isNull())
        .count())
    
  14. Szukanie wartości NULL w poszczególnych polach przetworzonych rekordów.
    bad_rows_df = logs_df.filter(logs_df['host'].isNull()|
    #                             logs_df['timestamp'].isNull() |
    #                             logs_df['method'].isNull() |
    #                             logs_df['path'].isNull() |
                                 logs_df['code'].isNull() |
                                 logs_df['size'].isNull()|
                                 logs_df['protocol'].isNull())
    bad_rows_df.count()
    
  15. Zamiana wartości NULL na wartość 0 dla pola 'content-size'.
    logs_df = logs_df.na.fill({'size': 0})
    bad_rows_df.count()
    
  16. Zmiana zawartości pola 'timestamp' z typu 'string' na typ 'timestamp' (utworzenie nowego pola 'time').
    from pyspark.sql.functions import udf
    
    month_map = {
      'Jan': 1, 'Feb': 2, 'Mar':3, 'Apr':4, 'May':5, 'Jun':6, 'Jul':7,
      'Aug':8,  'Sep': 9, 'Oct':10, 'Nov': 11, 'Dec': 12
    }
    
    def parse_clf_time(text):
        """ Convert Common Log time format into a Python datetime object
        Args:
            text (str): date and time in Apache time format [dd/mmm/yyyy:hh:mm:ss (+/-)zzzz]
        Returns:
            a string suitable for passing to CAST('timestamp')
        """
        # NOTE: We're ignoring the time zones here, might need to be handled depending on the problem you are solving
        return "{0:04d}-{1:02d}-{2:02d} {3:02d}:{4:02d}:{5:02d}".format(
          int(text[7:11]),
          month_map[text[3:6]],
          int(text[0:2]),
          int(text[12:14]),
          int(text[15:17]),
          int(text[18:20])
        )
    
    udf_parse_time = udf(parse_clf_time)
    
    logs_df = (logs_df.select('*', udf_parse_time(logs_df['timestamp'])
                                       .cast('timestamp')
                                       .alias('time'))
                                       .drop('timestamp'))
    
    logs_df.printSchema()
    
  17. Statystyka pola 'content-size' przy pomocy funkcji describe().
    import numpy as np
    import pandas as pd
    
    content_size_summary_df = logs_df.describe(['size'])
    
    content_size_summary_df.toPandas()
    
  18. Statystyka pola 'content-size' przy pomocy funkcji agregujących.
    from pyspark.sql import functions as F
    
    (logs_df.agg(F.min(logs_df['size']).alias('min_content_size'),
                 F.max(logs_df['size']).alias('max_content_size'),
                 F.mean(logs_df['size']).alias('mean_content_size'),
                 F.stddev(logs_df['size']).alias('std_content_size'),
                 F.count(logs_df['size']).alias('count_content_size'))
            .toPandas())
    
  19. Analiza pola 'status'.
    status_freq_df = (logs_df
                         .groupBy('code')
                         .count()
                         .sort('code')
                         .cache())
    print('Total distinct HTTP Status Codes:', status_freq_df.count()) 
    
    status_freq_pd_df = (status_freq_df
                             .toPandas()
                             .sort_values(by=['count'],
                                          ascending=False))
    status_freq_pd_df
    
    import matplotlib.pyplot as plt
    import seaborn as sns
    import numpy as np
    %matplotlib inline
    
    sns.catplot(x='code', y='count', data=status_freq_pd_df,
                kind='bar', order=status_freq_pd_df['code'])
    
    log_freq_df = status_freq_df.withColumn('log(count)',
                                            F.log(status_freq_df['count']))
    log_freq_df.show()
    
    log_freq_pd_df = (log_freq_df
                        .toPandas()
                        .sort_values(by=['log(count)'],
                                     ascending=False))
    sns.catplot(x='code', y='log(count)', data=log_freq_pd_df,
                kind='bar', order=status_freq_pd_df['code'])
    

E. Zadanie do realizacji

W ramach zadania należy opracować punkt drugi (filtrowanie danych) z wykorzystaniem środowiska PySpark (obydwa przykłady).