1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133
| """ api.py """
from passlib.apps import custom_app_context from itsdangerous import TimedJSONWebSignatureSerializer as Serializer, SignatureExpired, BadSignature from flask import Flask from flask_sqlalchemy import SQLAlchemy from flask_httpauth import HTTPBasicAuth from flask_cors import CORS from flask import jsonify, request, abort, g
app = Flask(__name__)
CORS(app, supports_credentials=True)
app.config.from_object('config') db = SQLAlchemy(app) auth = HTTPBasicAuth()
class User(db.Model): __tablename__ = 'users' id = db.Column(db.Integer, primary_key=True) username = db.Column(db.String(32), index=True) password = db.Column(db.String(128))
def hash_password(self, password): self.password = custom_app_context.encrypt(password)
def verify_password(self, password): return custom_app_context.verify(password, self.password)
def generate_auth_token(self, expiration=600): s = Serializer(app.config['SECRET_KEY'], expires_in=expiration) return s.dumps({'id': self.id})
@staticmethod def verify_auth_token(token): s = Serializer(app.config['SECRET_KEY']) try: data = s.loads(token) except SignatureExpired: return None except BadSignature: return None user = User.query.get(data['id']) return user
@app.route("/", methods=['POST', 'GET']) @auth.login_required def index(): return jsonify('Hello, %s' % g.user.username)
@app.route('/api/register', methods=['POST']) def new_user(): json_data = request.get_json() username = json_data['username'] password = json_data['password'] if username is None or password is None: abort(400) if User.query.filter_by(username=username).first() is not None: abort(400) user = User(username=username) user.hash_password(password) db.session.add(user) db.session.commit() return jsonify({'username': user.username})
@auth.verify_password def verify_password(username_or_token, password): if request.path == "/api/login": username_and_password_post = request.get_json() if username_and_password_post.get('email') is not None: username_or_token = username_and_password_post['email'] if username_and_password_post.get('password') is not None: password = username_and_password_post['password']
user = User.query.filter_by(username=username_or_token).first() if not user or not user.verify_password(password): return False else: user = User.verify_auth_token(username_or_token) if not user: return False g.user = user return True
@app.route('/api/login', methods=['POST', 'GET']) @auth.login_required def get_auth_token(): token = g.user.generate_auth_token() token = str(token, encoding='utf8') return jsonify(token)
@app.route('/api/test', methods=['POST']) def test(): b = request.get_json() print(b) return "test"
app.run(debug=True)
""" config.py """ import os basedir = os.path.abspath(os.path.dirname(__file__))
SQLALCHEMY_DATABASE_URI = "mysql://root:[email protected]/flaskrest" SQLALCHEMY_MIGRATE_REPO = os.path.join(basedir, 'db_repository') SQLALCHEMY_TRACK_MODIFICATIONS = True BASEDIR = basedir
CSRF_ENABLED = True SECRET_KEY = 'jssssaqer123dsaf/sdf\sdf'
|