CTF Platform API basic auth
Addded ctf platform Flask app with basic auth functions register, login, logout Added ctf platform database design (MySQL Workbench) Added forward engineered SQL file Added wsgi startup script Installed various python dependencies README.md Updated .gitignore Updated
This commit is contained in:
2
ctf_platform/api/__init__.py.py
Executable file
2
ctf_platform/api/__init__.py.py
Executable file
@@ -0,0 +1,2 @@
|
||||
#!/usr/bin/env python
|
||||
import main
|
||||
96
ctf_platform/api/auth.py
Normal file
96
ctf_platform/api/auth.py
Normal file
@@ -0,0 +1,96 @@
|
||||
from flask import Blueprint, g, redirect, request, session, url_for, jsonify
|
||||
from flask import current_app as app
|
||||
from werkzeug.security import check_password_hash, generate_password_hash
|
||||
from .database import get_db
|
||||
from .helper_functions import build_response_dict
|
||||
import functools
|
||||
bp = Blueprint('auth', __name__, url_prefix='/auth/')
|
||||
|
||||
|
||||
|
||||
@bp.before_app_request
|
||||
def load_logged_in_user():
|
||||
user_id = session.get('user_id')
|
||||
|
||||
if user_id is None:
|
||||
g.user = None
|
||||
else:
|
||||
db = get_db()
|
||||
db.execute(
|
||||
'SELECT user_id, username, active, registered, info FROM user WHERE user_id = %s', (user_id)
|
||||
)
|
||||
g.user = db.fetchone()
|
||||
app.logger.info('USER ACCESS %s' % (g.user))
|
||||
|
||||
#@auth.login_required decorator
|
||||
def login_required(view):
|
||||
@functools.wraps(view)
|
||||
def wrapped_view(**kwagrs):
|
||||
if g.user is None:
|
||||
api_response = build_response_dict(False, 'access', 'Access denied', reason='Login required')
|
||||
return jsonify(api_response)
|
||||
return view(**kwagrs)
|
||||
|
||||
return wrapped_view
|
||||
|
||||
@bp.route("/register", methods=['POST'])
|
||||
def register():
|
||||
api_response = {}
|
||||
error = None
|
||||
username = request.form.get('username')
|
||||
password = request.form.get('password')
|
||||
db = get_db()
|
||||
if not username:
|
||||
error = 'Username required!'
|
||||
elif not password:
|
||||
error = 'Password required!'
|
||||
else:
|
||||
db.execute('SELECT user_id FROM user WHERE username = %s', (username))
|
||||
if db.fetchone():
|
||||
error = "Username already exists!"
|
||||
if error is None:
|
||||
password_hash = generate_password_hash(password)
|
||||
db.execute('INSERT INTO user (username, password) VALUES (%s, %s)', (username, password_hash))
|
||||
g.db.commit()
|
||||
api_response = build_response_dict(True, 'register', 'Registration successful')
|
||||
else:
|
||||
api_response = build_response_dict(False, 'register', 'Registration failed', reason=error)
|
||||
return jsonify(api_response)
|
||||
|
||||
@bp.route("/login", methods=['POST'])
|
||||
def login():
|
||||
api_response = {}
|
||||
error = None
|
||||
username = request.form.get('username')
|
||||
password = request.form.get('password')
|
||||
if not username:
|
||||
error = 'Username required!'
|
||||
elif not password:
|
||||
error = 'Password required!'
|
||||
else:
|
||||
db = get_db()
|
||||
db.execute("SELECT * FROM user WHERE username = %s", (username))
|
||||
db_user = db.fetchone()
|
||||
if db_user is None or not check_password_hash(db_user['password'], password):
|
||||
error = 'Username or password incorrect!'
|
||||
if error is None:
|
||||
session.clear()
|
||||
session['user_id'] = db_user['user_id']
|
||||
session['user_name'] = db_user['username']
|
||||
api_response = build_response_dict(True, 'login', 'Login successful')
|
||||
app.logger.info("{} logged in successfully!".format(username))
|
||||
else:
|
||||
api_response = build_response_dict(False, 'login', 'Login failed', reason=error)
|
||||
app.logger.info("{} failed to login ({})!".format(username, error))
|
||||
|
||||
return jsonify(api_response)
|
||||
|
||||
@bp.route('/logout', methods=['GET'])
|
||||
def logout():
|
||||
if 'user_name' in session:
|
||||
app.logger.info("{} logged out!".format(session['user_name']))
|
||||
session.clear()
|
||||
api_response = build_response_dict(True, 'logout', 'Logout successful')
|
||||
else:
|
||||
api_response = build_response_dict(False, 'logout', 'Logout failed', reason='No active session')
|
||||
return jsonify(api_response)
|
||||
50
ctf_platform/api/database.py
Normal file
50
ctf_platform/api/database.py
Normal file
@@ -0,0 +1,50 @@
|
||||
import pymysql, click, re
|
||||
from flask import current_app as app
|
||||
from flask import g
|
||||
from flask.cli import with_appcontext
|
||||
|
||||
def get_db():
|
||||
if 'db' not in g:
|
||||
mysql = pymysql.connect(host=app.config['MYSQL_DATABASE_HOST'],
|
||||
user=app.config['MYSQL_DATABASE_USER'],
|
||||
password=app.config['MYSQL_DATABASE_PASS'],
|
||||
db=app.config['MYSQL_DATABASE_DB'],
|
||||
cursorclass=pymysql.cursors.DictCursor)
|
||||
g.db = mysql
|
||||
g.db_cursor = mysql.cursor()
|
||||
return g.db_cursor
|
||||
|
||||
def close_db(e=None):
|
||||
db = g.pop('db', None)
|
||||
|
||||
if db is not None:
|
||||
db.close()
|
||||
|
||||
def init_db(schema):
|
||||
db = mysql = pymysql.connect(host=app.config['MYSQL_DATABASE_HOST'],
|
||||
user=app.config['MYSQL_DATABASE_USER'],
|
||||
password=app.config['MYSQL_DATABASE_PASS'])
|
||||
statement = ""
|
||||
for line in open(schema):
|
||||
if line.strip().startswith('--'): # ignore sql comment lines
|
||||
continue
|
||||
if not line.strip().endswith(';'): # keep appending lines that don't end in ';'
|
||||
statement = statement + line
|
||||
else: # when you get a line ending in ';' then exec statement and reset for next statement
|
||||
statement = statement + line
|
||||
try:
|
||||
db.cursor().execute(statement)
|
||||
except Exception as e:
|
||||
print("[WARN] MySQLError during execute statement \n\tArgs: '{}'".format(str(e.args)))
|
||||
statement = ""
|
||||
|
||||
@click.command('init-db')
|
||||
@click.argument('schema')
|
||||
@with_appcontext
|
||||
def init_db_command(schema):
|
||||
init_db(schema)
|
||||
click.echo('Initialized the database.')
|
||||
|
||||
def init_app(app):
|
||||
app.teardown_appcontext(close_db)
|
||||
app.cli.add_command(init_db_command)
|
||||
1
ctf_platform/api/flag.py
Normal file
1
ctf_platform/api/flag.py
Normal file
@@ -0,0 +1 @@
|
||||
#Flag blueprint
|
||||
11
ctf_platform/api/helper_functions.py
Normal file
11
ctf_platform/api/helper_functions.py
Normal file
@@ -0,0 +1,11 @@
|
||||
def build_response_dict(success, action, message, reason=None):
|
||||
response_dict = {}
|
||||
if success:
|
||||
response_dict['data'] = { action: 'success', 'message': message}
|
||||
if reason is not None:
|
||||
response_dict['data']['reason'] = reason
|
||||
elif not success:
|
||||
response_dict['error'] = { action: 'failed', 'message': message}
|
||||
if reason is not None:
|
||||
response_dict['error']['reason'] = reason
|
||||
return response_dict
|
||||
17
ctf_platform/api/main.py
Normal file
17
ctf_platform/api/main.py
Normal file
@@ -0,0 +1,17 @@
|
||||
from flask import Flask, jsonify
|
||||
import os, logging, sys
|
||||
|
||||
def create_app():
|
||||
app = Flask(__name__)
|
||||
app.config.from_pyfile('../config.cfg')
|
||||
app.secret_key = os.urandom(24)
|
||||
from . import auth, database
|
||||
database.init_app(app)
|
||||
app.register_blueprint(auth.bp)
|
||||
|
||||
@app.route("/")
|
||||
def home():
|
||||
info = { 'data' : { 'version': '1.0' }}
|
||||
return jsonify(info)
|
||||
|
||||
return app
|
||||
Reference in New Issue
Block a user