"""REST API exposing CRUD operations on the ``student`` MongoDB collection.

Routes (served through flask-openapi3 so request models are validated and
an OpenAPI description is generated):

* ``GET    /stud``       -- list every student document
* ``GET    /stud/<id>``  -- fetch the document(s) matching one ``_id``
* ``POST   /stud``       -- insert a new student
* ``PUT    /stud/<id>``  -- partially update a student
* ``DELETE /stud/<id>``  -- delete a student
"""
import json
import os
from datetime import datetime
from typing import Any
from typing import List, Optional

from bson import ObjectId
from bson.errors import InvalidId
from bson.json_util import dumps
from flask import Flask, jsonify, request, Response
from flask_cors import CORS
from flask_openapi3 import Info, Tag
from flask_openapi3 import OpenAPI
from flask_pymongo import PyMongo
from pydantic import BaseModel, Field

info = Info(title="stud API", version="1.0.0")


class StudPath(BaseModel):
    """Path parameters for routes that address a single student."""

    id: str = Field(..., description='stud id')


class StudBody(BaseModel):
    """Request body for creating a student; every field is required."""

    fname: str
    lname: str
    faculty: str
    year: int
    academy: str


class StudBodyUpd(BaseModel):
    """Request body for updating a student; omitted fields are left as-is."""

    fname: Optional[str] = None
    lname: Optional[str] = None
    faculty: Optional[str] = None
    year: Optional[int] = None
    academy: Optional[str] = None


class MongoJSONEncoder(json.JSONEncoder):
    """JSON encoder that stringifies ``ObjectId`` and ``datetime`` values."""

    def default(self, o: Any) -> Any:
        """Return ``str(o)`` for ObjectId/datetime, else defer to the base."""
        if isinstance(o, (ObjectId, datetime)):
            return str(o)
        return super().default(o)


app = OpenAPI(__name__, info=info, static_url_path='/client')
CORS(app)
# The connection string can be overridden through the MONGO_URI environment
# variable so credentials do not have to live in the source; the literal
# below is kept as the fallback for existing deployments.
app.config["MONGO_URI"] = os.environ.get(
    "MONGO_URI", "mongodb://user:pass@172.20.44.25:27017/user"
)
mongo = PyMongo(app)


def _to_object_id(raw_id: str) -> Optional[ObjectId]:
    """Convert *raw_id* to an ``ObjectId``, or return None if it is malformed."""
    try:
        return ObjectId(raw_id)
    except (InvalidId, TypeError):
        return None


def _json_response(documents: Any) -> Response:
    """Serialize Mongo documents and label the payload as JSON."""
    payload = MongoJSONEncoder().encode(documents)
    return Response(payload, mimetype="application/json")


def _invalid_id():
    """Build the 400 response used when a path id is not a valid ObjectId."""
    return {'result': 'invalid id'}, 400


@app.get('/stud')
def get_stud():
    """Return every document in the student collection as a JSON array."""
    cursor = mongo.db.student.find()
    return _json_response(list(cursor))


@app.get('/stud/<id>')
def get_stud_id(path: StudPath):
    """Return a JSON array with the student whose ``_id`` equals ``path.id``.

    The array is empty when nothing matches; a malformed id yields HTTP 400.
    """
    obj_id = _to_object_id(path.id)
    if obj_id is None:
        return _invalid_id()
    cursor = mongo.db.student.find({"_id": obj_id})
    return _json_response(list(cursor))


@app.post('/stud')
def add_stud(body: StudBody):
    """Insert a new student built from the validated request body."""
    data = body.model_dump(exclude_none=True)
    print(data)
    mongo.db.student.insert_one(data)
    return {'result': 'ok'}, 200


@app.put('/stud/<id>')
def update_stud(path: StudPath, body: StudBodyUpd):
    """Apply the fields present in the body to the student ``path.id``.

    A malformed id yields HTTP 400. A body with no fields is a no-op.
    """
    obj_id = _to_object_id(path.id)
    if obj_id is None:
        return _invalid_id()
    data = body.model_dump(exclude_none=True)
    # An empty "$set" document is rejected by some MongoDB versions, so
    # skip the round trip entirely when there is nothing to change.
    if data:
        mongo.db.student.update_one({"_id": obj_id}, {"$set": data})
    return {'result': 'OK'}, 200


@app.delete('/stud/<id>')
def delete_stud(path: StudPath):
    """Delete the student ``path.id``; a malformed id yields HTTP 400."""
    obj_id = _to_object_id(path.id)
    if obj_id is None:
        return _invalid_id()
    mongo.db.student.delete_one({"_id": obj_id})
    return {'result': 'OK'}, 200


if __name__ == '__main__':
    # NOTE(review): the port value was blank in the source; 5000 is Flask's
    # default -- confirm the port actually used in deployment.
    app.run(host='149.156.109.180', port=5000)