home_site

Lab03 - Apache Spark cz.3 [ ver. ChO.2025.11.17.003 ]

Zawartość strony

Apache Spark

Tematyka zajęć:

  1. Analiza logu serwera WWW (cd. lab.02)
  2. Analiza ruchu lotniczego
  3. Spark z linii polecenia

A. Dalsza analiza logu serwera WWW

  1. W ramach tego punktu przeprowadzimy analizę logu serwera WWW - kontynuacja zagadnienia z laboratorium 2.
  2. Na początek przygotowanie środowiska - można wykorzystać środowisko z lab.02.
    import findspark
    findspark.init()
    
    # configure spark variables
    from pyspark.context import SparkContext
    from pyspark.sql.context import SQLContext
    from pyspark.sql.session import SparkSession
       
    sc = SparkContext()
    sqlContext = SQLContext(sc)
    spark = SparkSession(sc)
    
    # load up other dependencies
    import re
    import pandas as pd
    
  3. Sprawdzenie poprawności dołączenia biblioteki obsługującej wyrażenia regularne.
    m = re.finditer(r'.*?(spark).*?', "I'm searching for a spark in PySpark", re.I)
    for match in m:
        print(match, match.start(), match.end())
    
  4. Pobranie zawartości plików logów do pliku DataFrames.
    raw_data_files = ['/home/spark/files/access_log1', '/home/spark/files/access_log2']
    base_df = spark.read.text(raw_data_files)
    base_df.printSchema()
    
  5. Sprawdzenie typy danych i pobranie kilku redordów danych (5 rekordów).
    type(base_df)
    
    base_df.show(5, truncate=False)
    
  6. Utworzenie zbioru RDD.
    base_df_rdd = base_df.rdd
    
    type(base_df_rdd)
    
    base_df_rdd.take(5)
    
  7. Parsowanie pliku logu odpowiednimi wyrażeniami regularnymi.
    from pyspark.sql.functions import regexp_extract
    
    log_df = base_df.select(regexp_extract('value', r'(^[\S+]+) -', 1).alias('host'),
                              regexp_extract('value', r'(^[\S+]+) - - \[(.*)\]', 2).alias('timestamp'),
                              regexp_extract('value', r'(^[\S+]+) - - \[(.*)\] "(\w+)', 3).alias('method'),
                              regexp_extract('value', r'(^[\S+]+) - - \[(.*)\] "(\w+) (.*?) (.*?)', 4).alias('endpoint'),
                              regexp_extract('value', r'(^[\S+]+) - - \[(.*) \+(.*)\] "(\w+) (.*?) (.*?)" ', 6).alias('protocol'),
                              regexp_extract('value', r'(^[\S+]+) - - \[(.*) \+(.*)\] "(\w+) (.*?) (.*?)" (\d+) ', 7).cast('integer').alias('status'),
                              regexp_extract('value', r'(^[\S+]+) - - \[(.*) \+(.*)\] "(\w+) (.*?) (.*?)" (\d+) (\d+)', 8).cast('integer').alias('content_size'))
    log_df.show(10, truncate=True)
    print((log_df.count(), len(log_df.columns)))
    
  8. Sprawdzanie przetworzonych danych, czy wytępują wartości NULL (np. timestamp).
    (log_df
        .filter(log_df['content_size']
                    .isNull())
        .count())
    
  9. Szukanie wartości NULL w poszczególnych polach przetworzonych rekordów.
    bad_rows_df = log_df.filter(log_df['host'].isNull()|
                                log_df['timestamp'].isNull() |
                                log_df['method'].isNull() |
                                log_df['endpoint'].isNull() |
                                log_df['status'].isNull() |
                                log_df['content_size'].isNull()|
                                log_df['protocol'].isNull())
    bad_rows_df.count()
    
    bad_rows_df.show(5)
    
  10. Zamiana wartości NULL na wartość 0 dla pola 'content-size'.
    log_df = log_df.na.fill({'content_size': 0})
    log_df.count()
    
  11. Odrzucenie rekordów nie zawierających wartości w polu 'timestamp'.
    log_df1 = log_df[~log_df['timestamp'].isin([''])]
    log_df1.count()
    
  12. Zmiana zawartości pola 'timestamp' z typu 'string' na typ 'timestamp' (utworzenie nowego pola 'time').
    from pyspark.sql.functions import udf
    
    month_map = {
      'Jan': 1, 'Feb': 2, 'Mar':3, 'Apr':4, 'May':5, 'Jun':6, 'Jul':7,
      'Aug':8,  'Sep': 9, 'Oct':10, 'Nov': 11, 'Dec': 12
    }
    
    def parse_clf_time(text):
        """ Convert Common Log time format into a Python datetime object
        Args:
            text (str): date and time in Apache time format [dd/mmm/yyyy:hh:mm:ss (+/-)zzzz]
        Returns:
            a string suitable for passing to CAST('timestamp')
        """
        # NOTE: We're ignoring the time zones here, might need to be handled depending on the problem you are solving
        return "{0:04d}-{1:02d}-{2:02d} {3:02d}:{4:02d}:{5:02d}".format(
          int(text[7:11]),
          month_map[text[3:6]],
          int(text[0:2]),
          int(text[12:14]),
          int(text[15:17]),
          int(text[18:20])
        )
    
    udf_parse_time = udf(parse_clf_time)
    
    log_dfn = (log_df1.select('*', udf_parse_time(log_df1['timestamp'])
                                       .cast('timestamp')
                                       .alias('time'))
                                       .drop('timestamp'))
    
    log_dfn.printSchema()
    
  13. Utworzenie widoku do zapytań SQL.
    log_dfn.createOrReplaceTempView("logs")
    
  14. Przygotowanie środowiska graficznego do prezentacji wykresów.
    import matplotlib.pyplot as plt
    import seaborn as sns
    import numpy as np
    %matplotlib inline
    
    def bar_plot_list_of_tuples_horizontal(input_list,x_label,y_label,plot_title):
        y_labels = [val[0] for val in input_list]
        x_labels = [val[1] for val in input_list]
        plt.figure(figsize=(12, 6))
        plt.xlabel(x_label)
        plt.ylabel(y_label)
        plt.title(plot_title)
        ax = pd.Series(x_labels).plot(kind='barh')
        ax.set_yticklabels(y_labels)
        for i, v in enumerate(x_labels):
            ax.text(int(v) + 0.5, i - 0.25, str(v),ha='center', va='bottom')
    
  15. Prezentacja punktów, które pobrały największą informację.
    topEndpointsMaxSize = (sqlContext
                    .sql("SELECT endpoint,content_size/1024 FROM logs ORDER BY content_size DESC LIMIT 10")
                    .rdd.map(lambda row: (row[0], row[1]))
                    .collect())
    
    bar_plot_list_of_tuples_horizontal(topEndpointsMaxSize,'Data Flow - MB','Enpoints','Endpoint Analysis based on Max Content Size')
    
    Lab3_spark_log01
    Rys.1 Wynik analizy
  16. Prezentacja punktów, które najczęściej odpytują serwis.
    frequentIpAddressesHits = (sqlContext
                   .sql("SELECT host, COUNT(*) AS total FROM logs GROUP BY host HAVING total > 1000")
                   .rdd.map(lambda row: (row[0], row[1]))
                   .collect()) 
    
    bar_plot_list_of_tuples_horizontal(frequentIpAddressesHits,'Number of Hits','IP Address','Most Frequent Visitors (Frequent IP Address Hits)')
    
    Lab3_spark_log02
    Rys.2 Wynik analizy
  17. Prezentacja liczby żadań do serwisu w kolejnych dniach.
    import pyspark.sql.functions as F
    host_day_df = log_dfn.select(log_dfn.host,
                                 F.dayofmonth('time').alias('day'))
    
    status_freq_df = (log_dfn
                         .groupBy('status')
                         .count()
                         .sort('status')
                         .cache())
    print('Total distinct HTTP Status Codes:', status_freq_df.count()) 
    
    log_freq_df = status_freq_df.withColumn('log(count)',
                                            F.log(status_freq_df['count']))
    log_freq_df.show()
    
    host_day_df.show(5, truncate=False)
    
    def_mr = pd.get_option('max_rows')
    pd.set_option('max_rows', 10)
    
    daily_hosts_df = (host_day_df
                         .groupBy('day')
                         .count()
                         .sort("day"))
    
    daily_hosts_df.show(10, truncate=True)
    
    daily_hosts_pd_df = (daily_hosts_df
                             .toPandas()
                             .sort_values(by=['count'],
                                          ascending=False))
    
    c = sns.catplot(x='day', y='count',
                    data=daily_hosts_pd_df,
                    kind='point', height=5,
                    aspect=1.5)
    
    Lab3_spark_log03
    Rys.3 Wynik analizy

B. Analiza ruch lotniczego

  1. W ramach tego punktu przeanalizujemy plik zawierający statystyki lotów w Stanach Zjednoczonych.
  2. Przygotowanie środowiska Jupyter do pracy z projektem.
    import findspark
    findspark.init()
    
  3. Przygotowanie powłoki do obliczeń.
    from pyspark.sql import SparkSession    
    spark = SparkSession \
            .builder \
            .appName("Python Spark Data Exploration") \
            .config("spark.some.config.option", "some-value") \
            .getOrCreate()
    
  4. Przygotowanie bibiliotek do przetwarzania danych i tworzenia grafiki.
    #import libraries
    import pandas as pd
    import matplotlib.pyplot as plt
    import random
    
    %matplotlib inline
    
    #set ggplot style
    plt.style.use('ggplot')
    
  5. Pobieramy dane z pliku "2008.csv" i umieszczamy w pliku DataFrame. Plik zawiera opisy kolumn. Informacja o danych i ich formacie znajduje się na stronach: .
    df = spark.read.format('com.databricks.spark.csv').\
                                   options(header='true', \
                                   inferschema='true').load("/home/spark/files/2008.csv",header=True);
    
  6. Wylistowanie kolumn zawartych w dokumencie.
    df.columns
    

    Opis kolumn w zbiorze danych:

    1. Year 1987-2008
    2. Month 1-12
    3. DayofMonth 1-31
    4. DayOfWeek 1 (Monday) - 7 (Sunday)
    5. DepTime actual departure time (local, hhm m)
    6. CRSDepTime scheduled departure time (local, hhmm)
    7. ArrTime actual arrival time (local, hhmm)
    8. CRSArrTime scheduled arrival time (local, hhmm)
    9. UniqueCarrier unique carrier code
    10. FlightNum flight number
    11. TailNum plane tail number
    12. ActualElapsedTime in minutes
    13. CRSElapsedTime in minutes
    14. AirTime in minutes
    15. ArrDelay arrival delay, in minutes
    16. DepDelay departure delay, in minutes
    17. Origin origin IATA airport code
    18. Dest des tination IATA airport code
    19. Distance in miles 20 TaxiIn taxi in time, in minutes
    20. TaxiOut taxi out time in minutes
    21. Cancelled was the flight cancelled?
    22. CancellationCode reason for cancellation (A = carrier, B = weather, C = NAS, D = security)
    23. Diverted 1 = yes, 0 = no
    24. CarrierDelay in minutes
    25. WeatherDelay in minutes
    26. NASDelay in minutes
    27. SecurityDelay in minutes
    28. LateAircraftDelay in minutes
  7. Wylistowanie lotów dla poszczegółnych przewoźników.
    df.groupBy("UniqueCarrier").count().show()
    
  8. [ Zad 1 ] Przedstawić tabelę zawierającą opóżnienia powyżej 40 minut na lotnisku docelowym. Wykorzystać transformacje: filter(), groupBy(), count(), orderBy().
    Lab3_spark_01
    Rys.4 Realizacja zadania 1
  9. Zapis zbioru do bufora.
    df.cache
    
  10. Utworzenie widoku do zapytań w języku SQL i zapis do bufora.
    df.createOrReplaceTempView("flights")
    
    spark.catalog.cacheTable("flights")
    
  11. [ Zad 2 ] Przedstawić tabelę zawierającą informację o przewoźniku, lotnisku startowym, docelowym, czasie opóźnienia opóżnienia i czasie lądowania dla lotów z opóźnieniem powyżej 40 minut na lotnisku docelowym. Do realizacji zapytania wykorzystać pytanie SQL do utworzonego widoku lub polecenie select na DF.
    Lab3_spark_02
    Rys.5 Realizacja zadania 2
  12. [ Zad 3 ] Wyznaczyć średnie opóźnienia dla poszczególnych przewoźników. Realizacja zadania na DF - transformacje groupBy() i agg().
    Lab3_spark_03
    Rys.6 Realizacja zadania 3
  13. Przygotowanie środowiska graficznego do prezentacji wykresów.
    import matplotlib.pyplot as plt
    import seaborn as sns
    import numpy as np
    %matplotlib inline
    
  14. Przedstawić tabelę oraz wykres prezentujący liczbę opóźnień ( powyżej 20 minut ) w funkcji godzin.
    import pyspark.sql.functions as F
    crshour = df.select('DepDelay', F.round(F.col('CRSDepTime')/100).cast('integer').alias('CRSDepHour'))
    
    crshour.show()
    
    crshour_count = ( crshour.filter(df.DepDelay > 20).groupBy('crsdephour').count().sort('crsdephour').cache())
    
    crshour_count_pd = ( crshour_count.toPandas().sort_values(by=['crsdephour']))
    
    crshour_count_pd
    
    Lab3_spark_06a
    Rys.7a Realizacja zadania ( tabelka - pandas )
    sns.catplot(x='crsdephour', y='count', data=crshour_count_pd,kind='bar',order=crshour_count_pd['crsdephour'] )
    
    Lab3_spark_06b
    Rys.7b Realizacja zadania ( wykres )
  15. [ Zad 4 ] Przedstawić tabelę oraz wykres prezentujący liczbę opóźnień dla przewoźników (powyżej 40 minut ).
    Lab3_spark_04a
    Rys.8a Realizacja zadania 4 (tabela - pandas)
    Lab3_spark_04b
    Rys.8b Realizacja zadania (wykres)
  16. [ Zad 5 ] Przedstawić tabelę oraz wykres prezentujący liczbę opóźnień ( powyżej 20 minut ) w zależności od dnia tygodnia.
    Lab3_spark_05a
    Rys.9a Realizacja zadania 5 (tabela - pandas )
    Lab3_spark_05b
    Rys.9b Realizacja zadania 5 ( wykres )

C. Spark z linii polecenia

  1. Przygotowanie do pracy z Spark'iem z linii poleceń.
    export SPARK_HOME=/usr/local/spark/  
    export PATH=$SPARK_HOME/bin:$PATH
    export PYSPARK_PYTHON=python3
    
  2. Praca w środowisku Spark - język Scala. Proszę wykonać kilka poleceń z laboratorium 1.
    spark-shell
    
  3. Praca w środowisku Spark - język python. Proszę wykonać kilka poleceń z laboratorium 1.
    pyspark
    
  4. Praca w środowisku Spark - uruchamianie skryptów. Odczyt pliku tekstowego w języku python. Na początek zawartość pliku read.py, a następnie polecenie uruchamiające skrypt w środowisku Spark.
    from pyspark import SparkConf, SparkFiles
    from pyspark.sql import SparkSession
    import time
    import os
    import sys
    conf = SparkConf().setAppName("test")
    spark = SparkSession.builder.config(conf=conf).enableHiveSupport().getOrCreate()
    print("============")
    with open("/home/spark/example/data.csv", 'r') as fh:
        print(fh.read())
    #time.sleep(300)
    spark.stop()
    
    spark-submit --master local[3]  read.py
    
  5. Praca w środowisku Spark - uruchamianie skryptów. Odczyt pliku tekstowego w języku python. Zmiana logowania informacji tworzonych przez Spark'a. Modyfikacja linii 'log4j.rootCategory=INFO, console' na linię 'log4j.rootCategory=ERROR, console' w pliku log4j.properties. Plik umieszczamy w swoim katalogu np.conf.
    mkdir conf
    cp $SPARK_HOME/conf/log4j.properties.template conf/log4j.properties
    
  6. Przygotowanie skryptu wykorzystująccego nowy plik 'log4j.properties'. W ramach polecenia ścieżkę do pliku podajemy jako bezwzględną.
    spark-submit --master local[3] \
        --driver-java-options "-Dlog4j.configuration=file:/home/anton/conf/log4j.properties" \
        --conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=file:log4j.properties" \
        --files "/home/anton/conf/log4j.properties" \
        main.py