home_site

Lab04 - Apache Spark - przetwarzanie grafów [ ver. ChO.2025.11.27.003 ]

Zawartość strony

Apache Spark - przetwarzanie grafów

Tematyka zajęć:

  1. Środowisko do pracy z danymi w reprezenacji grafowej
  2. Wyznaczanie najkrótszej ścieżki
  3. Wyznaczanie wartości Page Rank
  4. Analiza danych linii lotniczych w Stanach Zjednoczonych

Pliki do realizacji zadań dostępne w odpowiednich podkatalogach w katalogu /home/spark/lab04:

  1. Pliki do ćwiczenia B [3] :
    • 'task2/transport-nodes.csv' - informacje o miastach, ich położenie geograficzne i liczba ludności;
    • 'task2/transport-relationships.csv' - połączenia pomiędzy miastami z odległościami.
  2. Pliki do ćwiczenia C [3] :
    • 'task3/social-nodes.csv' - węzły zawierające osoby;
    • 'task3/social-relationships.csv' - relacje pomiędzy osobami.
  3. Pliki do ćwiczenia D [3]:
    • 'task4/airports.csv' - opisy portów lotniczych - węzły;
    • 'task4/188591317_T_ONTIME.csv - relacje pomiędzy lotniskami;
    • 'task4/airlines.csv - nazwa lini lotniczej i jej kod;
    • 'task4/airlines.json
    • 'task4/latlongs-cluster1.csv

A. Środowisko do pracy z danymi w reprezenacji grafowej

Do realizacji przetwarzania danych w reprezentacji grafowej wymagana jest dodatkowa biblioteka zarówno po stronie Spark'a jak i narzędzi klienta. W ramach Sparka korzystamy z biblioteki "graphframes-0.7.0-spark2.4-s_2.11.jar" [1,2]. Biblioteka umieszczona jest w katalogu "jars" programu Spark. Dodatkowo do realiacji zadań w języku python wymagana jest biblioteka "graphframes", instalowana poprzez "pip install graphframes".

W ramach środowiska udostępnionego na zajęciach na serwerze obydwie biblioteki są zainstalowane i dostępne.

B. Wyznaczanie najkrótszej ścieżki

B1. Algorytm Breadth First Search (BFS)

Breadth First Search (BFS) (przeszukiwanie wszerz) to jeden z podstawowych algorytmów przechodzenia przez graf. Zaczyna od wybranego węzła i eksploruje po wszystkich swoich najbliższych sąsiadów, nastęnie odwiedza wszystkich sąsiadów oddalonych o dwa przeskoki i tak dalej. Algorytm został po raz pierwszy opublikowany w 1959 roku przez Edwarda F. Moore'a, który wykorzystał go do znalezienia najkrótszej drogi w labiryncie.

Algorytm zostanie zastosowany do szukania najkrótszej drogi pomiędzy miastami.

  1. Otwieramy nowy projekt i ustawiamy parametry środowiska.
    import findspark
    findspark.init()
    
  2. Przygotowanie powłoki do obliczeń.
    from pyspark.sql import SparkSession    
    spark = SparkSession \
            .builder \
            .appName("Python Spark Data Exploration") \
            .config("spark.some.config.option", "some-value") \
            .getOrCreate()
    
    from pyspark.sql.types import *
    from graphframes import *
    import pandas as pd
    
  3. Przygotowanie struktury dla danych - wczytanie danych z plików.
    fields = [
        StructField("id", StringType(), True),
        StructField("latitude", FloatType(), True),
        StructField("longitude", FloatType(), True),
        StructField("population", IntegerType(), True)
    ]
    
  4. Odczyt danych z plików i przygotowanie odpowiednich struktur danych do reprezantacji DataFrame odpowiednich do struktur GraphFrame..
    v = spark.read.csv("/home/spark/lab04/task2/transport-nodes.csv", header=True, schema=StructType(fields))
    
    src_dst = spark.read.csv("/home/spark/lab04/task2/transport-relationships.csv", header=True)
    
    df_src_dst = src_dst.toPandas()
    df_dst_src = src_dst.toPandas()
    df_dst_src.columns = ["dst", "src", "relationship", "cost"]
    
    e = spark.createDataFrame(pd.concat([df_src_dst, df_dst_src], sort=False))
    
  5. Przygotowanie struktury grafowej w ramach GraphFrame.
    g = GraphFrame(v, e)
    
  6. Szukanie miast z średnią liczbą ludności w przedziale (100 000, 300 000).
    (g.vertices
     .filter("population > 100000 and population < 300000")
     .sort("population")
     .show())
    
  7. Wyszukanie drogi z Den Haag do miasta z liczba osób w przedziale ( 100000, 300000). W otrzymanej tablicy mamy węzły i połączenia. Ostatnie polecenie przypisze do węzłów odpowiednie miasta.
    from_expr = "id='Den Haag'"
    to_expr = "population > 100000 and population < 300000 and id <> 'Den Haag'"
    result = g.bfs(from_expr, to_expr)
    
    print(result.columns)
    
    columns = [column for column in result.columns if not column.startswith("e")]
    result.select(columns).show()
    
  8. Funkcja shortestPaths Sparka służy do znajdowania najkrótszych ścieżek spośród wszystkich węzłów do ​​zbioru węzłów zwanych punktami orientacyjnymi. Poniżej przykład wykorzystania funkcji do realizacji adania znalezienia najkrótszej drogi z we wszystkich lokalizacji do Colchester, Immingham i Hoek van Holland. Liczba obok każdej lokalizacji w kolumnie odległości to liczba relacji (dróg) między miastami, przez które musimy przejść, aby dostać się tam z początkowego węzeła.
    result = g.shortestPaths(["Colchester", "Immingham", "Hoek van Holland"])
    result.sort(["id"]).select("id", "distances").show(truncate=False)
    
  9. W kolejnym przykładzie znajdziemy najkrótszą ścieżkę ważoną pomiedzy dwoma miastami, czyli najkrótszą całkowitą odległość między miastami. Jeśli chcemy znaleźć najkrótszą ważoną ścieżkę (w tym przypadku odległość), musimy użyć właściwość kosztu. Ta opcja nie jest dostępna w ramach pakietu GraphFrames. Do prezentacji zadania zostanie przedstawione rozwiązanie dostępne w pozycji [3] wykorzystujące funkcję wykorzystującą bibliotekę "agregateMessages". Informacje dotyczące biblioteki "aggregateMessages" można znaleźć w sekcji "Message passing via AggregateMessages" podręcznika "GraphFrames".
  10. Przygotowanie biblioteki i funkcji do znalezienia ścieżki pomiędzy wyznaczonymi węzłami.
    from graphframes.lib import AggregateMessages as AM
    from pyspark.sql import functions as F
    
    add_path_udf = F.udf(lambda path, id: path + [id], ArrayType(StringType()))
    
  11. Funkcja realizująca postawione zadanie.
    def shortest_path(g, origin, destination, column_name="cost"):
        if g.vertices.filter(g.vertices.id == destination).count() == 0:
            return (spark.createDataFrame(sc.emptyRDD(), g.vertices.schema) \
                .withColumn("path", F.array()))
        vertices = (g.vertices.withColumn("visited", F.lit(False))
            .withColumn("distance", F.when(g.vertices["id"] == origin, 0).otherwise(float("inf")))        \
            .withColumn("path", F.array()))      
        cached_vertices = AM.getCachedDataFrame(vertices)
        g2 = GraphFrame(cached_vertices, g.edges)
        while g2.vertices.filter('visited == False').first():
            current_node_id = g2.vertices.filter('visited == False').sort("distance").first().id
            msg_distance = AM.edge[column_name] + AM.src['distance']
            msg_path = add_path_udf(AM.src["path"], AM.src["id"])
            msg_for_dst = F.when(AM.src['id'] == current_node_id,F.struct(msg_distance, msg_path))
            new_distances = g2.aggregateMessages(F.min(AM.msg).alias("aggMess"),sendToDst=msg_for_dst)
            new_visited_col = F.when(     \
                g2.vertices.visited | (g2.vertices.id == current_node_id),True).otherwise(False)
            new_distance_col = F.when(new_distances["aggMess"].isNotNull() &    \
                (new_distances.aggMess["col1"] < g2.vertices.distance),new_distances.aggMess["col1"])  \
                .otherwise(g2.vertices.distance)
            new_path_col = F.when(new_distances["aggMess"].isNotNull() &  \
                (new_distances.aggMess["col1"] < g2.vertices.distance), new_distances.aggMess["col2"]   \
                .cast("array<string>")).otherwise(g2.vertices.path)
            new_vertices = (g2.vertices.join(new_distances, on="id",how="left_outer")  \
                .drop(new_distances["id"])   \
                .withColumn("visited", new_visited_col)  \
                .withColumn("newDistance", new_distance_col)  \
                .withColumn("newPath", new_path_col)  \
                .drop("aggMess", "distance", "path")  \
                .withColumnRenamed('newDistance', 'distance')  \
                .withColumnRenamed('newPath', 'path'))  
            cached_new_vertices = AM.getCachedDataFrame(new_vertices)
            g2 = GraphFrame(cached_new_vertices, g2.edges)
            if g2.vertices.filter(g2.vertices.id == destination).first().visited:
                return (g2.vertices.filter(g2.vertices.id == destination)  \
                    .withColumn("newPath", add_path_udf("path", "id"))     \
                    .drop("visited", "path")                               \
                    .withColumnRenamed("newPath", "path"))
        return (spark.createDataFrame(sc.emptyRDD(), g.vertices.schema)    \
            .withColumn("path", F.array()))
    
  12. Znalezienie najkrótszej drogi pomiędzy miastami: Amsterdam i Colchester
    result = shortest_path(g, "Amsterdam", "Colchester", "cost")
    result.select("id", "distance", "path").show(truncate=False)
    

B2. Algorytm Single Source Shortest Path (SSSP)

W ramach tego punktu przedstawiony zostanie algorytm Single Source Shortest Path (SSSP), który powstał w podobnym czasie jak algorytm najkrótszej ścieżki Dijkstry. Algorytm SSSP oblicza najkrótszą (ważoną) ścieżkę od węzła początkowego do wszystkich innych węzłów w grafie.

  1. Otwieramy nowy projekt i ustawiamy parametry środowiska.
    import findspark
    findspark.init()
    
  2. Przygotowanie powłoki do obliczeń.
    from pyspark.sql import SparkSession    
    spark = SparkSession \
            .builder \
            .appName("Python Spark Data Exploration") \
            .config("spark.some.config.option", "some-value") \
            .getOrCreate()
    
    from pyspark.sql.types import *
    from graphframes import *
    from pyspark.sql import functions as F
    import pandas as pd
    
    from graphframes.lib import AggregateMessages as AM
    from pyspark.sql import functions as F
    
  3. Utworzenie struktury GraphFrame w ramach funkcji create_transport_graph().
    def create_transport_graph():
        node_fields = [
            StructField("id", StringType(), True),
            StructField("latitude", FloatType(), True),
            StructField("longitude", FloatType(), True),
            StructField("population", IntegerType(), True)
        ]
        nodes = spark.read.csv("/home/spark/lab04/task2/transport-nodes.csv", header=True,
                               schema=StructType(node_fields))
    
        rels = spark.read.csv("/home/spark/lab04/task2/transport-relationships.csv", header=True)
        reversed_rels = (rels.withColumn("newSrc", rels.dst)
                         .withColumn("newDst", rels.src)
                         .drop("dst", "src")
                         .withColumnRenamed("newSrc", "src")
                         .withColumnRenamed("newDst", "dst")
                         .select("src", "dst", "relationship", "cost"))
        relationships = rels.union(reversed_rels)
        return GraphFrame(nodes, relationships)
    
  4. Definicja funkcji SSSP().
    add_path_udf = F.udf(lambda path, id: path + [id], ArrayType(StringType()))
    
    def sssp(g, origin, column_name="cost"):
        vertices = g.vertices \
            .withColumn("visited", F.lit(False)) \
            .withColumn("distance",
                F.when(g.vertices["id"] == origin, 0).otherwise(float("inf"))) \
            .withColumn("path", F.array())
        cached_vertices = AM.getCachedDataFrame(vertices)
        g2 = GraphFrame(cached_vertices, g.edges)
    
        while g2.vertices.filter('visited == False').first():
            current_node_id = g2.vertices.filter('visited == False').sort("distance").first().id
    
            msg_distance = AM.edge[column_name] + AM.src['distance']
            msg_path = add_path_udf(AM.src["path"], AM.src["id"])
            msg_for_dst = F.when(AM.src['id'] == current_node_id, F.struct(msg_distance, msg_path))
            new_distances = g2.aggregateMessages(
                F.min(AM.msg).alias("aggMess"), sendToDst=msg_for_dst)
    
            new_visited_col = F.when(
                g2.vertices.visited | (g2.vertices.id == current_node_id), True).otherwise(False)
            new_distance_col = F.when(new_distances["aggMess"].isNotNull() &
                                      (new_distances.aggMess["col1"] < g2.vertices.distance),
                                      new_distances.aggMess["col1"]) \
                                .otherwise(g2.vertices.distance)
            new_path_col = F.when(new_distances["aggMess"].isNotNull() &
                                  (new_distances.aggMess["col1"] < g2.vertices.distance),
                                  new_distances.aggMess["col2"].cast("array<string>")) \
                            .otherwise(g2.vertices.path)
    
            new_vertices = g2.vertices.join(new_distances, on="id", how="left_outer") \
                .drop(new_distances["id"]) \
                .withColumn("visited", new_visited_col) \
                .withColumn("newDistance", new_distance_col) \
                .withColumn("newPath", new_path_col) \
                .drop("aggMess", "distance", "path") \
                .withColumnRenamed('newDistance', 'distance') \
                .withColumnRenamed('newPath', 'path')
            cached_new_vertices = AM.getCachedDataFrame(new_vertices)
            g2 = GraphFrame(cached_new_vertices, g2.edges)
    
        return g2.vertices \
                    .withColumn("newPath", add_path_udf("path", "id")) \
                    .drop("visited", "path") \
                    .withColumnRenamed("newPath", "path")
    
  5. Realizacja obliczeń dla utworzonej funkcji.
    g = create_transport_graph() 
    
    via_udf = F.udf(lambda path: path[1:-1], ArrayType(StringType()))
    
    result = sssp(g, "Amsterdam", "cost")
    (result
     .withColumn("via", via_udf("path"))
     .select("id", "distance", "via")
     .sort("distance")
     .show(truncate=False))
    

C. Wyznaczenie wartości Page Rank

Algorytmy centralności służą do zrozumienia roli poszczególnych węzłów na grafie oraz ich wpływu na sieć. Są przydatne, ponieważ identyfikują ważne węzły i pomagają zrozumieć dynamikę grupy, taką jak wiarygodność, dostępność, szybkość rozprzestrzeniają się informacji ( lub innych rzeczy ) az znaleźć połączenia pomiędzy grupami. W ramach tego zadania zapoznamy się z algorytmem Page Rank.

PageRank to najbardziej znany z algorytmów centralności. Mierzy przechodni (lub kierunkowy) wpływ węzłów. Większość algorytmów centralności mierzy bezpośredni wpływ węzła, podczas gdy PageRank uwzględnia wpływ sąsiadów węzła i ich sąsiadów. Na przykład posiadanie kilku wpływowych sąsiadów może sprawić, że będziesz bardziej wpływowy niż przy posiadanie wielu słabszych przyjaciół. PageRank jest obliczany przez iteracyjne rozdzielanie rangi jednego węzła na jego sąsiadów lub losowo przechodząc przez graf i liczy częstotliwość z jaką każdy węzeł jest trafiony podczas tego przeszukiwania. Nazwa PageRank opracował Larry'ego Page'a, współzałożyciel Google, który opracował go w celu pozycjonowania stron internetowych w wynikach wyszukiwania Google. PageRank mierzy liczbę i jakość relacji przychodzących do węzła w celu ustalenia oszacowania jak ważny jest ten węzeł.

  1. Definicja formuły Page Rank.
    Lab4_spark_02
    Rys.2 Definicja furmuły Page Rank
  2. Otwieramy nowy projekt i ustawiamy parametry środowiska.
    import findspark
    findspark.init()
    
    from pyspark.sql import SparkSession    
    spark = SparkSession \
            .builder \
            .appName("Python Spark Data Exploration") \
            .config("spark.some.config.option", "some-value") \
            .getOrCreate()
    
    from graphframes import *
    from pyspark import SparkContext
    
  3. Wczytanie danych i utworzenie struktury GraphFrame.
    v = spark.read.csv("/home/spark/lab04/task3/social-nodes.csv", header=True)
    e = spark.read.csv("/home/spark/lab04/task3/social-relationships.csv", header=True)
    g = GraphFrame(v, e)
    
  4. Wyznaczenie wartości Page Rank dla parametrów:
    results = g.pageRank(resetProbability=0.15, maxIter=20)
    results.vertices.sort("pagerank", ascending=False).show()
    
    Lab4_spark_3
    Rys.3 Tabela wyznaczonych wartości Page Rank.
  5. Wyznaczenie wartości Page Rank dla parametrów:
    results = g.pageRank(resetProbability=0.15, tol=0.01)
    results.vertices.sort("pagerank", ascending=False).show()
    
    Lab3_spark_05b
    Rys.4 Tabela wyznaczonych wartości Page Rank.
  6. Na koniec wykorzystując algorytm "Degree Centrality" przeanalizujemy dane pod kątem liczby relacji przychodzącyhc i wychodzących a także zanlezienie najbardziej popularnego węzła.
    total_degree = g.degrees
    in_degree = g.inDegrees
    out_degree = g.outDegrees
    (total_degree.join(in_degree, "id", how="left")
      .join(out_degree, "id", how="left")
      .fillna(0)
      .sort("inDegree", ascending=False)
      .show())
    

D. Analiza lotów w Stanach Zjednoczonych z wykorzystaniem grafów

Dwa duże zdarzenia w roku 2010, które zdarzyły się w amerykańskim systemie podrózy lotniczych były inspiracją do powstania prac wykorzystujących grafy do analizy tych zdarzeń. W wyniku tych prac zaanlizowano wystąpienia systematycznych opóźnień kaskadowych i wykorzystano te informacje do poszukiwań rozwiązań, które uchronią linie lotnicze przed tymi zdarzeniami. ( P. Fleurquin, J. J. Ramasco, and V. M. Eguíluz, "Systemic Delay Propagation in the US Airport Network".)

  1. Tworzymy nowy projekt i ustawiamy zmienne środowiska.
    import findspark
    findspark.init()
    
    from graphframes import *
    from pyspark.sql.types import *
    from pyspark.sql import functions as F
    
    import matplotlib
    import matplotlib.pyplot as plt
    plt.style.use('fivethirtyeight')
    
    from pyspark.sql import SparkSession
    spark = SparkSession.builder.appName('baza').getOrCreate()
    
  2. Wczytanie wartości opisy portów lotniczych - węzły.
    nodes = spark.read.csv("/home/spark/lab04/task4/airports.csv", header=False)
    
    nodes.columns
    
    cleaned_nodes = (nodes.select("_c1", "_c3", "_c4", "_c6", "_c7")
                     .filter("_c3 = 'United States'")
                     .withColumnRenamed("_c1", "name")
                     .withColumnRenamed("_c4", "id")
                     .withColumnRenamed("_c6", "latitude")
                     .withColumnRenamed("_c7", "longitude")
                     .drop("_c3"))
    
    cleaned_nodes = cleaned_nodes[cleaned_nodes["id"] != "\\N"]
    
  3. Wczytanie wartości relacji - krawędzie.
    relationships = spark.read.csv("/home/spark/lab04/task4/188591317_T_ONTIME.csv", header=True)
    
    cleaned_relationships = (relationships
                             .select("ORIGIN", "DEST", "FL_DATE", "DEP_DELAY", "ARR_DELAY",
                                     "DISTANCE", "TAIL_NUM", "FL_NUM", "CRS_DEP_TIME",
                                     "CRS_ARR_TIME", "UNIQUE_CARRIER")
                             .withColumnRenamed("ORIGIN", "src")
                             .withColumnRenamed("DEST", "dst")
                             .withColumnRenamed("DEP_DELAY", "deptDelay")
                             .withColumnRenamed("ARR_DELAY", "arrDelay")
                             .withColumnRenamed("TAIL_NUM", "tailNumber")
                             .withColumnRenamed("FL_NUM", "flightNumber")
                             .withColumnRenamed("FL_DATE", "date")
                             .withColumnRenamed("CRS_DEP_TIME", "time")
                             .withColumnRenamed("CRS_ARR_TIME", "arrivalTime")
                             .withColumnRenamed("DISTANCE", "distance")
                             .withColumnRenamed("UNIQUE_CARRIER", "airline")
                             .withColumn("deptDelay", F.col("deptDelay").cast(FloatType()))
                             .withColumn("arrDelay", F.col("arrDelay").cast(FloatType()))
                             .withColumn("time", F.col("time").cast(IntegerType()))
                             .withColumn("arrivalTime", F.col("arrivalTime").cast(IntegerType()))
                             )
    
  4. Utworzenie struktury GraphFrame.
    g = GraphFrame(cleaned_nodes, cleaned_relationships)
    
  5. Odczyt z pliku referencji kodów do nazw linii lotniczych.
    airlines_reference = (spark.read.csv("/home/spark/lab04/task4/airlines.csv")
          .select("_c1", "_c3")
          .withColumnRenamed("_c1", "name")
          .withColumnRenamed("_c3", "code"))
    
    airlines_reference = airlines_reference[airlines_reference["code"] != "null"]
    
    df = spark.read.option("multiline", "true").json("/home/spark/lab04/task4/airlines.json")
    dummyDf = spark.createDataFrame([("test", "test")], ["code", "name"])
    
    for code in df.schema.fieldNames():
        tempDf = (df.withColumn("code", F.lit(code))
                  .withColumn("name", df[code]))
        tdf = tempDf.select("code", "name")
        dummyDf = dummyDf.union(tdf)
    
  6. Troszkę statystyki, liczba węzłów, krawędzi i maksymalny czas opóźnienia.
    g.vertices.count()
    
    g.edges.count()
    
    g.edges.groupBy().max("deptDelay").show()
    
  7. Szukamy lotnisk z których odlatuje najwięcej samolotów. Wykorzystamy algorytm oparty o "Degree Centrality".
    airports_degree = g.outDegrees.withColumnRenamed("id", "oId")
    
    full_airports_degree = (airports_degree
                            .join(g.vertices, airports_degree.oId == g.vertices.id)
                            .sort("outDegree", ascending=False)
                            .select("id", "name", "outDegree"))
    
    full_airports_degree.show(n=10, truncate=False)
    
  8. Prezentacja graficzna najbardziej popularnych lotnisk ( w oparciu o stopień węzłów ).
    plt.style.use('fivethirtyeight')
    
    ax = (full_airports_degree
          .toPandas()
          .head(10)
          .plot(kind='bar', x='id', y='outDegree', legend=None))
    
    ax.xaxis.set_label_text("")
    plt.xticks(rotation=45)
    plt.tight_layout()
    plt.show()
    
  9. Kolejna analiza przedstawia zestawienie opóźnień lotów z lotniska ORD (Chicago O’Hare International Airport), ważny węzeł komunikacyjny pomiedzy wschodnim i zachodnim wybrzeżem. Prezentacja 10 najbardziej opóźnionych lotów.
    delayed_flights = (g.edges
                       .filter("src = 'ORD' and deptDelay > 0")
                       .groupBy("dst")
                       .agg(F.avg("deptDelay"), F.count("deptDelay"))
                       .withColumn("averageDelay", F.round(F.col("avg(deptDelay)"), 2))
                       .withColumn("numberOfDelays", F.col("count(deptDelay)")))
    
    (delayed_flights
     .join(g.vertices, delayed_flights.dst == g.vertices.id)
     .sort(F.desc("averageDelay"))
     .select("dst", "name", "averageDelay", "numberOfDelays")
     .show(n=10, truncate=False))
    
  10. Na pierwszym miejscu jest 12 lotów z lotniska ORD do lotniska CKB. Przeprowadzamy dalszą analizę tych lotów.
    from_expr = 'id = "ORD"'
    to_expr = 'id = "CKB"'
    ord_to_ckb = g.bfs(from_expr, to_expr)
    
    ord_to_ckb = ord_to_ckb.select(
      F.col("e0.date"),
      F.col("e0.time"),
      F.col("e0.flightNumber"),
      F.col("e0.deptDelay"))
    
    ax = (ord_to_ckb
          .sort("date")
          .toPandas()
          .plot(kind='bar', x='date', y='deptDelay', legend=None))
    
    ax.xaxis.set_label_text("")
    plt.tight_layout()
    plt.show()
    
  11. Analiza wykresu. Ponad połowa lotów była opóźniona. Ale największe opóżnienie było 2 maja 2018 i wynosiło 14 godzin co wpłynęlo na niekorzystny wynik.
  12. Analiza lotów na lotnisku San Francisco International Airport (SFO) w dniu 11-05-2018 - zły dzień. ( Analizujemy dzień w którym najwięcej było opóźnień na lotnisku ). W ramach tego punktu znajdziemy lotu które lądują na lotnisku a następnie odlatują - (a)-[ab]->(b); (b)-[bc]->(c). Znajdujemy loty, które mają opóźnienie przylotu lub odlotu powyżej 30 minut i należą do tej samej linii.
    motifs = (g.find("(a)-[ab]->(b); (b)-[bc]->(c)")
              .filter("""(b.id = 'SFO') and
                      (ab.date = '2018-05-11' and bc.date = '2018-05-11') and
                      (ab.arrDelay > 30 or bc.deptDelay > 30) and
                      (ab.flightNumber = bc.flightNumber) and
                      (ab.airline = bc.airline) and
                      (ab.time < bc.time)"""))
    
    def sum_dist(dist1, dist2):
        return sum([value for value in [dist1, dist2] if value is not None])
    
    sum_dist_udf = F.udf(sum_dist, FloatType())
    
    result = (motifs.withColumn("delta", motifs.bc.deptDelay - motifs.ab.arrDelay)
              .select("ab", "bc", "delta")
              .sort("delta", ascending=False))
    
    result.select(
        F.col("ab.src").alias("a1"),
        F.col("ab.time").alias("a1DeptTime"),
        F.col("ab.arrDelay"),
        F.col("ab.dst").alias("a2"),
        F.col("bc.time").alias("a2DeptTime"),
        F.col("bc.deptDelay"),
        F.col("bc.dst").alias("a3"),
        F.col("ab.airline"),
        F.col("ab.flightNumber"),
        F.col("delta")
    ).show()
    
  13. Wyznaczenie wartości Page Rank (obliczenia troszkę trwają, można pominać, wrócić później).
    result = g.pageRank(resetProbability=0.15, maxIter=20)
    (result.vertices
     .sort("pagerank", ascending=False)
     .withColumn("pagerank", F.round(F.col("pagerank"), 2))
     .show(truncate=False, n=100))
    
  14. Wyznaczenie wartości Page Rank (obliczenia troszkę trwają, można pominać, wrócić później).
    triangles = g.triangleCount().cache()
    pagerank = g.pageRank(resetProbability=0.15, maxIter=20).cache()
    
    (triangles.select(F.col("id").alias("tId"), "count")
     .join(pagerank.vertices, F.col("tId") == F.col("id"))
     .select("id", "name", "pagerank", "count")
     .sort("count", ascending=False)
     .withColumn("pagerank", F.round(F.col("pagerank"), 2))
     .show(truncate=False))
    
  15. W kolejnym zadaniu poszukimy linii lotniczej, która umożliwi nam zwiedzenie największej liczby lotnisk.
    airlines = (g.edges
      .groupBy("airline")
      .agg(F.count("airline").alias("flights"))
      .sort("flights", ascending=False))
    full_name_airlines = (airlines_reference
      .join(airlines, airlines.airline == airlines_reference.code)
      .select("code", "name", "flights"))
    
    ax = (full_name_airlines.toPandas()
      .plot(kind='bar', x='name', y='flights', legend=None))
    ax.xaxis.set_label_text("")
    plt.tight_layout()
    plt.show()
    
  16. Kolejny algorytm pozwoli nam znaleźć grupy lotnisk dla każdej linii lotniczej, których loty z i do lotnisk należą do tej samej grupy. Wykorzystujemy algorytm "Strongly Connected Components". ( Liczy długo )
    Najwieksza liczba obsługiwanych lotnisk - linia loticza.
    def find_scc_components(g, airline):
       # Create a subgraph containing only flights on the provided airline
       airline_relationships = g.edges[g.edges.airline == airline]
       airline_graph = GraphFrame(g.vertices, airline_relationships)
       # Calculate the Strongly Connected Components
       scc = airline_graph.stronglyConnectedComponents(maxIter=10)
       # Find the size of the biggest component and return that
       return (scc
         .groupBy("component")
         .agg(F.count("id").alias("size"))
         .sort("size", ascending=False)
         .take(1)[0]["size"])
    
    # Calculate the largest strongly connected component for each airline
    airline_scc = [(airline, find_scc_components(g, airline))
    for airline in airlines.toPandas()["airline"].tolist()]
    airline_scc_df = spark.createDataFrame(airline_scc, ['id', 'sccCount'])
    # Join the SCC DataFrame with the airlines DataFrame so that we can show
    # the number of flights an airline has alongside the number of
    # airports reachable in its biggest component
    airline_reach = (airline_scc_df
    .join(full_name_airlines, full_name_airlines.code == airline_scc_df.id)
    .select("code", "name", "flights", "sccCount")
    .sort("sccCount", ascending=False))
    
  17. Prezentacja wyników na wykresie.
      
    ax = (airline_reach.toPandas()
    .plot(kind='bar', x='name', y='sccCount', legend=None))
    ax.xaxis.set_label_text("")
    plt.tight_layout()
    plt.show()
    
  18. Podczas kolejnej analizy poznamy lotniska, które są obslugiwane przez przewoźnika DL ( Delta Airlines).
    airline_relationships = g.edges.filter("airline = 'DL'")
    airline_graph = GraphFrame(g.vertices, airline_relationships)
    
    clusters = airline_graph.labelPropagation(maxIter=10)
    (clusters
     .sort("label")
     .groupby("label")
     .agg(F.collect_list("id").alias("airports"),
          F.count("id").alias("count"))
     .sort("count", ascending=False)
     .show(truncate=70, n=10))
    
     
    all_flights = g.degrees.withColumnRenamed("id", "aId")
    
    (clusters
    .filter("label=1606317768706")
    .join(all_flights, all_flights.aId == clusters.id)
     .sort("degree", ascending=False)
    .select("id", "name", "degree")
    .show(truncate=False))