home_site

Lab13 - Token JWT, OpenAPI [ ver. TI.2026.01.14.002 ]

Zawartość strony

Token JWT, OpenAPI

W ramach laboratorium zostanie przedstawiony schemat działania aplikacji z wykorzystaniem tokenów JWT dla rozwiązań działających zgodnie z stylem RESTful i nie realizujących sesji po stronie serwera. Link do strony zawierającej opis i dokumentację projektu JWT - Introduction to JSON Web Tokens.

Na rys.1 przedstawiona została struktura tokenu JWT, która składa się z trzech części: nagłowka (ang. header) zawierającego typ tokenu oraz algorytm szyfrowania, zawartości ( ang. payload ) zawierającego dane do przesyłania oraz metadane oraz sygnatury ( ang. signature ), która jest tworzona z wykorzystaniem algorytmu szyfrowania zawartego w nagłówku, zakodowanej zawartości i hasła. Utworzony sygnatura potwierdza autentyczność danych zawartych w tokenie.

Lab13_JWT
Rys.1 Struktura tokenu JWT.

Na rys. 2 przedstawiono schematycznie wykorzystanie tokenu w aplikacji. Po wygenerowaniu tokenu jest on przekazany do klienta, który każdorazowo jest wysyłany przez klienta do częsci serwerowej gdy jest wymagana autoryzacja użytkownika.

Lab13_JWT
Rys.2 Diagram uzyskania dostępu z wykorzystaniem JWT.

Serwis node.js i JWT

W ramach zadania zostanie uruchomiony serwer node.js z funkcjonalnością autoryzacji dostępu z wykorzystaniem tokenu JWT. Wymagany jest pakiet jsonwebtoken.

  1. Instalacja pakietu jsonwebtoken.
    npm i --save jsonwebtoken
    
  2. Plik serwera books_jwt.js - w technologii node.js.

    Plik books_jwt.js - ( [listing dokumentu] [link do dokumentu] )

       const express = require('express');
    const bodyParser = require('body-parser');
    const jwt = require('jsonwebtoken');
    const app = express();
    
    const accessTokenSecret = 'somerandomaccesstoken';
    const refreshTokenSecret = 'somerandomstringforrefreshtoken';
    
    const users = [
        {
            username: 'john',
            password: 'password123admin',
            role: 'admin'
        }, {
            username: 'anna',
            password: 'password123member',
            role: 'member'
        }
    ]
    
    const authenticateJWT = (req, res, next) => {
        const authHeader = req.headers.authorization;
    
        if (authHeader) {
            const token = authHeader.split(' ')[1];
            //const token = authHeader;
            //console.log(`${authHeader} \n`);
            //console.log(`${token} \n`);
            //console.log(`${accessTokenSecret} \n`);
            //console.log(jwt.verify(token, accessTokenSecret));
            jwt.verify(token, accessTokenSecret, (err, user) => {
                if (err) {
                    console.log('Bledny token');
                    return res.sendStatus(403);
                }
                console.log('Poprawny token');
                req.user = user;
                next();
            });
        } else {
            res.sendStatus(401);
        }
    }
    
    const books = [
        {
            "author": "Chinua Achebe",
            "country": "Nigeria",
            "language": "English",
            "pages": 209,
            "title": "Things Fall Apart",
            "year": 1958
        },
        {
            "author": "Hans Christian Andersen",
            "country": "Denmark",
            "language": "Danish",
            "pages": 784,
            "title": "Fairy tales",
            "year": 1836
        },
        {
            "author": "Dante Alighieri",
            "country": "Italy",
            "language": "Italian",
            "pages": 928,
            "title": "The Divine Comedy",
            "year": 1315
        },
    ]
    
    var refreshTokens = [];
    
    app.use(bodyParser.json());
    
    app.post('/login', (req, res) => {
        // read username and password from request body
        const { username, password } = req.body;
    
        // filter user from the users array by username and password
        const user = users.find(u => { return u.username === username && u.password === password });
        console.log(`${user.username} \n`);
        if (user) {
            // generate an access token
            const accessToken = jwt.sign({ username: user.username, role: user.role }, accessTokenSecret, { expiresIn: '3m' });
            const refreshToken = jwt.sign({ username: user.username, role: user.role }, refreshTokenSecret);
    
            refreshTokens.push(refreshToken);
    
            res.json({
                accessToken,
                refreshToken
            });
        } else {
            res.send('Username or password incorrect');
        }
    });
    
    app.post('/token', (req, res) => {
        const { token } = req.body;
    
        if (!token) {
            return res.sendStatus(401);
        }
    
        if (!refreshTokens.includes(token)) {
            return res.sendStatus(403);
        }
    
        jwt.verify(token, refreshTokenSecret, (err, user) => {
            if (err) {
                return res.sendStatus(403);
            }
    
            const accessToken = jwt.sign({ username: user.username, role: user.role }, accessTokenSecret, { expiresIn: '3m' });
    
            res.json({
                accessToken
            });
        });
    });
    
    app.post('/logout', (req, res) => {
        const { token } = req.body;
        refreshTokens = refreshTokens.filter(t => t !== token);
    
        res.send("Logout successful");
    });
    
    app.get('/books', authenticateJWT, (req, res) => {
        res.json(books);
    });
    
    app.post('/books', authenticateJWT, (req, res) => {
        const { role } = req.user;
    
        if (role !== 'admin') {
            return res.sendStatus(403);
        }
    
    
        const book = req.body;
        books.push(book);
    
        res.send('book added successfully');
    });
    
    app.listen(<port>, () => {
        console.log('Books service started on port <port>');
    });
      
  3. Uzyskanie tokenu JWT - logowanie do serwisu.
    curl --header "Content-Type: application/json" \
      --request POST \
      --data '{"password":"password123admin", "username":"john"}' \
      http://localhost:<port>/login
    
  4. Dostęp do serwisu poprzez wysłanie tokenu JWT.
    curl -X GET \
      -H 'Authorization: Bearer <token> ' \
      http://localhost:<port>/books
    
  5. Odświeżenie tokenu JWT - refresh token.
    curl --header "Content-Type: application/json" \
    --request POST --data '{ "token":"<refresh token>"}' \
     http://localhost:<port>/token 
    
  6. Wylogowanie z serwisu.
    curl --header "Content-Type: application/json" \
    --request POST --data '{ "token":"<refresh token>"}' \
     http://localhost:<port>/logout
    
  7. Analiza tokenu JWT w serwisie https://jwt.io/#debugger-io (rys.3).
    Lab13_JWT
    Rys.3 Analiza tokenu JWT - JWT Debugger.

Framework Flask i JWT

W kolejnym punkcie przedstawiony zostanie serwer opracowany w języku python z wykorzystaniem frameworku Flask i tokenów JWT. Realizacja projektu jest zrealizowana w środowisku Python Virtual Environment. W tym celu należy utworzyć odpowiedni katalog, przygotować środowisko i pobrać odpowiednie pakiety do realizacji zadania. Poniżej przykładowa realizacja PVE wraz z poleceniami przygotowującymi środowisko uruchomieniowe dla serwera.

  1. Przygotowanie środowiska uruchomieniowego.
    mkdir python-lab13
    cd python-lab13
    python3 -m venv lab13
    source lab13/bin/activate
    pip install flask
    pip install pyjwt
    pip install python-dotenv
    
  2. Konfiguracja plików w projekcie.
    lab13/
      ├──▻ app/  { auth.py, routes.py }
      ├──▻ services/ { auth_guard.py, auth_provider.py, jwt_handler.py, .env }
    { app.py }
    
  3. Plik uwierzytelnienia app/auth.py

    Plik auth.py ( [listing dokumentu] link do dokumentu )

    # app/auth.py
    
    from flask import request, jsonify
    from services.auth_provider import authenticate
    from services.jwt_handler import generate_jwt
    
    def init(app):
    
        @app.route('/api/auth', methods=['POST'])
        def auth():
            email = request.json.get('email')
            password = request.json.get('password')
            if not email or not password:
                return jsonify({"message": "Email or password missing", "status": 400}), 400
    
            user_data = authenticate(email, password)
            if not user_data:
                return jsonify({"message": "Invalid credentials", "status": 400}), 400
    
            token = generate_jwt(payload=user_data, lifetime=60) # <--- generates a JWT with valid within 1 hour by now
            return jsonify({"data": token, "status": 200}), 200  
  4. Plik punktów końcowych aplikacji app/routes.py

    Plik routes.py ( [listing dokumentu] link do dokumentu )

    # app/routes.py
    
    from flask import jsonify
    from services.auth_guard import auth_guard
    
    def init(app):
    
        @app.route('/api/protected_route', methods=['GET'])
        @auth_guard() # <--- Requires the authentication, but do not restricts authorization by roles
        def protected_route():
            return jsonify({"message": 'You have accessed a protected route.', "status": 200}), 200
    
        @app.route('/api/protected_route_user', methods=['GET'])
        @auth_guard('user') # <--- Requires the authentication AND authorization by 'user' role
        def protected_route_user():
            return jsonify({"message": 'You have accessed a user protected route.', "status": 200}), 200
    
        @app.route('/api/protected_route_admin', methods=['GET'])
        @auth_guard('admin') # <--- Requires the authentication AND authorization by 'admin' role
        def protected_route_admin():
            return jsonify({"message": 'You have accessed a admin protected route.', "status": 200}), 200
    
        @app.route('/api/protected_route_super_admin', methods=['GET'])
        @auth_guard('super_admin') # <--- Requires the authentication AND authorization by 'super_admin' role
        def protected_route_super_admin():
            return jsonify({"message": 'You are a SUPER ADMIN!', "status": 200}), 200  
  5. Plik użytkowników serwisu services/auth_provider.py

    Plik auth_provider.py ( [listing dokumentu] link do dokumentu )

    # services/auth_provider.py
    
    def authenticate(email, password):
        if email == 'admin@office.com' and password == 'password':
            return {
                'username': 'admin',
                'email': 'admin@office.com',
                'roles': ['admin', 'user']
            }
        elif email == 'user@office.com' and password == 'password':
            return {
                'username': 'user',
                'email': 'user@office.com',
                'roles': ['user']
            }  
        elif email == 'super_admin@office.com' and password == 'password':
            return {
                'username': 'super',
                'email': 'super_admin@office.com',
                'roles': ['super_admin','admin','user']
            } 
        else:        
            return False  
  6. Plik weryfikacji poprawności tokenu JWT services/auth_guard.py

    Plik auth_guard.py ( [listing dokumentu] link do dokumentu )

    # services/auth_guard.py
    
    from flask import request, jsonify
    from services.jwt_handler import decode_jwt
    
    def check_jwt():
        # Gets token from request header and tries to get it's payload
        # Will raise errors if token is missing, invalid or expired 
        token = request.headers.get('Authorization')
        if not token:
            raise Exception('Missing access token')
        jwt = token.split('Bearer ')[1]
        try:
            return decode_jwt(jwt)
        except Exception as e:
            raise Exception(f'Invalid access token: {e}')
    
    def auth_guard(role=None):
        def wrapper(route_function):
            def decorated_function(*args, **kwargs):
                # Authentication gate
                try:
                    user_data = check_jwt()
                except Exception as e:
                    return jsonify({"message": f'{e}', "status": 401}), 401
                # Authorization gate
                if role and role not in user_data['roles']:
                    return jsonify({"message": 'Authorization required.', "status": 403}), 403
                # Proceed to original route function
                return route_function(*args, **kwargs)
            decorated_function.__name__ = route_function.__name__
            return decorated_function
        return wrapper  
  7. Plik generujący token JWT services/jwt_handler.py

    Plik jwt_handler.py ( [listing dokumentu] link do dokumentu )

    # services/jwt_handler.py
    
    from dotenv import load_dotenv
    import os
    import jwt
    from datetime import datetime, timedelta
    
    load_dotenv()
    
    def generate_jwt(payload, lifetime=None):
        # Generates a new JWT token, wrapping information provided by payload (dict)
        # Lifetime describes (in minutes) how much time the token will be valid
        if lifetime:
            payload['exp'] = (datetime.now() + timedelta(minutes=lifetime)).timestamp()
        return jwt.encode(payload, os.environ.get('SECRET_KEY'), algorithm="HS256")
    
    def decode_jwt(token):
        # Tries to retrieve payload information inside of a existent JWT token (string)
        # Will throw an error if the token is invalid (expired or inconsistent)
        return jwt.decode(token, os.environ.get('SECRET_KEY'), algorithms=["HS256"])  
  8. Plik zawierający zmienne systemowe services/.env
    SECRET_KEY= ... klucz sekretny ...
    
  9. Główny plik aplikacji server.py

    Plik server.py ( [listing dokumentu] link do dokumentu )

    # api.py
    
    from flask import Flask
    from app.auth import init as init_auth_routes
    from app.routes import init as init_routes
    
    # def create_app():
    app = Flask(__name__)
    init_auth_routes(app)
    init_routes(app)
    #    return app
    
    if __name__ == '__main__':
    #    app = create_app()
        app.run(host='149.156.109.180', port=4000, debug=True)  
  10. Sprawdzamy poprawność nasz aplikacji z wykorzystaniem curl'a dla wszystkich użytkowników.
    curl --header "Content-Type: application/json" \
      --request POST \
      --data '{"password":"password", "email":"user@office.com"}' \
      http://149.156.109.180:<port>/login
    
    curl -X GET \
      -H 'Authorization: Bearer <token> ' \
      http://149.156.109.180:<port>/protected_route
    
    curl -X GET \
      -H 'Authorization: Bearer <token> ' \
      http://149.156.109.180:<port>/protected_route_user
    
    curl -X GET \
      -H 'Authorization: Bearer <token> ' \
      http://149.156.109.180:<port>/protected_route_admin
    
    curl -X GET \
      -H 'Authorization: Bearer <token> ' \
      http://149.156.109.180:<port>/protected_route_super_admin
    

Standard OpenAPI, standard WSGI i serwer Gunicorn

Ostatni punkt zajęć dotyczy dwóch zagadnień. Pierwsze zagadnienie to narzędzia do udostępnia punktów końcowych aplikacji opracowanych zgodnie z stylem REST. W ramach zadania zostanie przedstawione narzędzie OpenAPI3 z jego implementacją w Flask'u zaimplementowane w przykładowym skrypcie realizującym operacje CRUD. Kolejne zagadnienie to prezentacja serwera Gunicorn, który został opracowany do wsparcia rozwiązań serwisów WWW realizowanych z wykorzystaniem języka python.

  1. Na początek uruchomimy przykład z poprzedniego laboratorium po modyfikcji skryptu do wymagań narzędzia OpenAPI. Realizacja tego zadania wymaga doinstalowania w naszym środowisku PVE dodatkowych bibliotek, które instalujemy wykorzystując poniższe polecenia.

    pip install -U flask-openapi3
    pip install -U flask-openapi3-swagger
    pip install -U flask-openapi3-redoc
    pip install -U flask-openapi3-rapidoc
    pip install -U flask-openapi3-rapipdf
    pip install -U flask-openapi3-scalar
    pip install -U flask-openapi3-elements
    pip install pydantic
    
  2. Plik serwera server_openapi.py ( [listing dokumentu] link do dokumentu )

    from pydantic import BaseModel, Field
    from flask import Flask, jsonify, request, Response
    from flask_pymongo import PyMongo
    import json
    from typing import Any
    from typing import List, Optional
    from bson.json_util import dumps
    from bson import ObjectId
    from flask_cors import CORS
    
    from flask_openapi3 import Info, Tag
    from flask_openapi3 import OpenAPI
    
    info = Info(title="stud API", version="1.0.0")
    
    class StudPath(BaseModel):
        id: str = Field(..., description='stud id')
        
    class StudBody(BaseModel):
        fname: str
        lname: str
        faculty: str
        year: int
        academy: str
           
    class StudBodyUpd(BaseModel):
        fname: Optional[str] = None
        lname: Optional[str] = None
        faculty: Optional[str] = None
        year: Optional[int] = None
        academy: Optional[str] = None        
    
    class MongoJSONEncoder(json.JSONEncoder):
        def default(self, o: Any) -> Any:
            if isinstance(o, ObjectId):
                return str(o)
            if isinstance(o, datetime):
                return str(o)
            return json.JSONEncoder.default(self, o)
    
    #app = Flask(__name__, static_url_path='/client')
    app = OpenAPI(__name__, info=info, static_url_path='/client')
    CORS(app)
    app.config["MONGO_URI"] = "mongodb://user:pass@172.20.44.25:27017/user"
    mongo = PyMongo(app)
    
    
    #@app.route('/stud', methods=['GET'])
    @app.get('/stud')
    def get_stud():
        output = []
        cursor = mongo.db.student.find()
        data_json = MongoJSONEncoder().encode(list(cursor))
        return data_json
        
    #@app.route('/stud/<id>', methods=['GET'])
    @app.get('/stud/<string:id>')
    def get_stud_id(path: StudPath):
        output = []
        objId = ObjectId(path.id)
        query = { "_id" : objId }
        cursor = mongo.db.student.find(query)
        data_json = MongoJSONEncoder().encode(list(cursor))
        return data_json 
        #print (path)
        #return {'result': 'ok'}, 200   
    	
    #@app.route('/stud', methods=['POST'])
    @app.post('/stud')
    def add_stud(body: StudBody):
        # data = request.json
        #data = body.json    
        data_s = body.model_dump_json(indent=2,exclude_none=True)
        data = json.loads(data_s) 
        print(data)
        mongo.db.student.insert_one(data)
        return {'result': 'ok'}, 200
    		
    		
    #@app.route('/stud/<id>', methods=['PUT'])
    @app.put('/stud/<string:id>')
    def update_stud(path: StudPath, body: StudBodyUpd):
        objId = ObjectId(path.id)
        # data = request.json
        data = json.loads(body.model_dump_json(indent=2,exclude_none=True))     
        query = { "_id" : objId }
        value = { "$set" : data }
        mongo.db.student.update_one(query, value)
        return {'result':'OK'}, 200		
    	
    #@app.route('/stud/<id>', methods=['DELETE'])
    @app.delete('/stud/<string:id>')
    def delete_stud(path: StudPath):
        objId = ObjectId(path.id)
        query = { "_id" : objId }
        mongo.db.student.delete_one(query)
        return {'result':'OK'}, 200	
    	
    if __name__ == '__main__':
        app.run(host='149.156.109.180', port=<port>)
    
      
  3. Dostęp do poleceń narzędzia OpenAPI uzyskujemy po wprowadzeniu następującego URL'a:
    http://149.156.109.180:<port>/openapi
  4. Na koniec uruchomimy nasze aplikacje korzystając z serwera Gunicorn, który jest zalecany do realizacji projektów uruchamianych w kontekście produkcyjnym. Wymaga to dodanie w naszym środowisku pakietu wspierającego ten serwer.
    pip install gunicorn  
    
  5. Tworzymy serwer zgodny z WSGI, który będzie serwował naszą aplikację wsgi.py - plik w katalogu głównym.
    from server import app
    if __name__ == "__main__":
       app.run()
    
  6. Uruchamiamy aplikację z wykorzystaniem serwera Gunicorn.
    gunicorn -w 4 --bind 0.0.0.0:<port> wsgi:app