"""Password-reset kiosk web application.

Serves a public page where a known ILS user can trigger a password reset,
plus a small admin area for managing admin accounts and the list of ILS
users (including CSV import/export). Settings are read from ``settings.ini``.
"""
import configparser
import csv
import datetime as datetime
import hashlib
import logging
import os
import socket
import tempfile
import webbrowser
from urllib.parse import urlparse

from emailtool.emailer import Emailer
from flask import Flask, render_template, request, redirect, url_for, session, send_file
from gevent.pywsgi import WSGIServer
from peewee import Model, CharField, DateTimeField, SqliteDatabase, BooleanField

db_name = 'app.db'
db = SqliteDatabase(db_name)


class User(Model):
    """Administrator account allowed to sign in to the admin pages."""

    username = CharField()
    password = CharField()  # hex digest produced by encrypt_password()
    date_created = DateTimeField(default=datetime.datetime.now)
    logged_in = BooleanField(default=False)

    class Meta:
        database = db


class IlsUser(Model):
    """ILS user that is allowed to request a reset from the home page."""

    username = CharField()
    email = CharField()
    reset_datetime = DateTimeField(default=datetime.datetime.now)

    class Meta:
        database = db


config = configparser.ConfigParser()
config.read('settings.ini')
application_settings = config['application']
smtp_settings = config['smtp']
http_settings = config['http']
domain_settings = config['domain']

log = logging.getLogger('werkzeug')
log.setLevel(logging.INFO)

debug = application_settings['debug'].lower() == 'true'


def send_email(to, subject, body):
    """Send an email through the SMTP server configured in ``[smtp]``."""
    emailer = Emailer(
        smtp_settings['host'],
        smtp_settings['port'],
        smtp_settings['username'],
        smtp_settings['password'],
    )
    emailer.send_email(to, subject, body)


def encrypt_password(password):
    """Return the SHA-256 hex digest of ``password``.

    NOTE(review): this is an unsalted, fast hash and is weak for password
    storage. It is kept as-is because existing rows in the database store
    digests in this format; migrating needs a separate, deliberate change.
    """
    return hashlib.sha256(password.encode('utf-8')).hexdigest()


def shutdown_session(exception=None):
    """Stop the HTTP server.

    Relies on the module-level ``http_server`` that is only created when the
    file is run as a script.
    """
    print('Stopping HTTP Service...')
    http_server.stop()


def get_hostname():
    """Return this system's hostname."""
    return socket.gethostname()


def get_ip_address():
    """Return the IP address that this system's hostname resolves to."""
    return socket.gethostbyname(socket.gethostname())


def is_valid_url(url):
    """Validate ``url`` with ``urlparse`` and normalise it.

    Returns ``'<scheme>://<netloc>/'`` when both a scheme and a network
    location are present, otherwise ``False``.
    """
    try:
        result = urlparse(url)
    except (ValueError, TypeError, AttributeError):
        return False
    if result.scheme and result.netloc:
        return '{uri.scheme}://{uri.netloc}/'.format(uri=result)
    return False


def _find_by_username(model, username):
    """Return the first ``model`` row with this username, or ``None``.

    Database errors are logged and treated as "not found" (best effort).
    """
    try:
        return model.filter(model.username == username).first()
    except Exception:
        log.exception('Lookup of %s by username failed', model.__name__)
        return None


def requires_auth():
    """Return ``True`` when the session belongs to a logged-in admin user."""
    username = session.get('username')
    if username is None:
        return False
    # get_or_none: the account may have been deleted while the session cookie
    # is still around; that must read as "not authenticated", not as a 500.
    user = User.get_or_none(User.username == username)
    return user is not None and user.logged_in is True


# Check for DB tables and create if they don't exist
if db.table_exists('user') is False:
    db.create_tables([User, IlsUser])
    # NOTE(review): a default admin/admin account is seeded on first run;
    # it should be changed immediately after installation.
    User.create(username='admin', password=encrypt_password('admin'),
                date_created=datetime.datetime.now(), logged_in=False)
if db.table_exists('ilsuser') is False:
    db.create_tables([IlsUser])
db.close()

app = Flask(__name__)
# Allow the signing key to be overridden from settings.ini; the fallback keeps
# existing installations (and their sessions) working unchanged.
app.secret_key = application_settings.get('secret_key', fallback='super secret key')


@app.before_request
def before_request():
    """Open the database connection for the request."""
    # reuse_if_open: a request that failed before after_request ran leaves the
    # connection open, and a plain connect() would then raise.
    db.connect(reuse_if_open=True)


@app.after_request
def after_request(response):
    """Close the database connection once the response is ready."""
    if not db.is_closed():
        db.close()
    return response


# Create a route for the home page
@app.route('/', methods=['GET', 'POST'])
def index():
    """Home page: accept a username and trigger the reset flow for it."""
    error = None
    reset = False
    reset_url = is_valid_url(domain_settings['reset_url'])
    reset_url_error = reset_url is False

    if request.method == 'POST':
        username = request.form.get('username')
        # Check for the username in the DB
        user = _find_by_username(IlsUser, username)
        if user:
            # Reset login datetime
            user.reset_datetime = datetime.datetime.now()
            user.save()
            # Open the reset URL in a new tab if the URL is valid.
            # This opens a browser on the machine running this process.
            if reset_url is not False:
                webbrowser.open_new_tab(str(reset_url))
            # Tell the view to display the "reset" content to the user.
            reset = True
        else:
            error = 'Invalid username'

    context = {
        'domain': domain_settings['name'],
        'error': error,
        'reset': reset,
        'reset_url': reset_url,
        'reset_url_error': reset_url_error,
    }
    return render_template('index.html', context=context)


# Create a route for admin page
@app.route('/admin/')
def admin():
    """Admin landing page (login required)."""
    if not requires_auth():
        return redirect(url_for('login'))
    return render_template('admin.html')


@app.route('/admin/users/', methods=['GET', 'POST'])
def admin_users():
    """List admin users and, on POST, create a new one (login required)."""
    if not requires_auth():
        return redirect(url_for('login'))

    message = None
    if request.method == 'POST':
        username = request.form.get('username')
        password = request.form.get('password')
        confirm_password = request.form.get('confirm_password')

        if not username or not password:
            # Missing fields would otherwise crash in encrypt_password(None).
            message = 'Username and password are required'
        elif _find_by_username(User, username):
            message = 'Username already exists'
        elif password != confirm_password:
            message = 'Passwords do not match'
        else:
            User.create(username=username, password=encrypt_password(password),
                        date_created=datetime.datetime.now(), logged_in=False)
            message = 'User created successfully'

    # Get all admin users from the DB
    users = User.select().execute()
    context = {
        'users': users,
        'message': message,
    }
    return render_template('admin_users.html', context=context)


@app.route('/admin/users/delete/<int:id>', methods=['GET', 'POST'])
def admin_users_delete(id):
    """Delete the admin user with primary key ``id`` (login required).

    The built-in ``admin`` account is never deleted. Deleting your own
    account also clears the session.
    """
    if not requires_auth():
        return redirect(url_for('login'))

    user = User.get_or_none(User.id == id)
    if user is not None and user.username != 'admin':
        # Unset session variable if the user is deleting their own account
        if user.username == session['username']:
            session.pop('username', None)
        user.delete_instance()
    return redirect(url_for('admin_users'))


@app.route('/admin/users/ils', methods=['GET', 'POST'])
def admin_ils_users():
    """List ILS users and, on POST, create a new one (login required)."""
    if not requires_auth():
        return redirect(url_for('login'))

    message = None
    if request.method == 'POST':
        username = request.form.get('username')
        email = request.form.get('email', '')

        if not username:
            message = 'Username is required'
        elif _find_by_username(IlsUser, username):
            message = 'Username already exists'
        else:
            IlsUser.create(username=username, email=email,
                           reset_datetime=datetime.datetime.now())
            message = 'User created successfully'

    # Get all ILS users from the DB
    users = IlsUser.select().execute()
    context = {
        'users': users,
        'message': message,
    }
    return render_template('admin_ils_users.html', context=context)


@app.route('/admin/users/ils/delete/<int:id>', methods=['GET', 'POST'])
def admin_ils_users_delete(id):
    """Delete the ILS user with primary key ``id`` (login required)."""
    if not requires_auth():
        return redirect(url_for('login'))

    user = IlsUser.get_or_none(IlsUser.id == id)
    if user is not None:
        user.delete_instance()
    return redirect(url_for('admin_ils_users'))


# create a route for generating a CSV file for download
@app.route('/admin/users/ils/csv/download', methods=['GET', 'POST'])
def admin_ils_users_csv_download():
    """Export all ILS users as ``users.csv`` (login required)."""
    if not requires_auth():
        return redirect(url_for('login'))

    # Use an absolute path so the file that is written is the file that is
    # sent: send_file resolves relative paths against the application root,
    # which is not necessarily the current working directory.
    csv_path = os.path.abspath('users.csv')

    # newline='' prevents a blank line between rows
    with open(csv_path, 'w', newline='') as f:
        writer = csv.writer(f)
        writer.writerow(['username', 'email'])
        for user in IlsUser.select():
            writer.writerow([user.username, user.email])

    return send_file(csv_path, as_attachment=True)


@app.route('/admin/users/ils/csv/import', methods=['GET', 'POST'])
def admin_ils_users_csv_import():
    """Import ILS users from an uploaded CSV file (login required).

    Expected columns are ``username, email``. A header row and blank rows are
    skipped; existing usernames are updated, new ones are created.
    """
    if not requires_auth():
        return redirect(url_for('login'))

    message = None
    if request.method == 'POST':
        csv_file = request.files['csv']
        if csv_file.filename != '':
            # Store the upload under a server-generated name: the client
            # supplied filename is untrusted and must not be used in a path.
            os.makedirs('uploads', exist_ok=True)
            fd, upload_path = tempfile.mkstemp(suffix='.csv', dir='uploads')
            os.close(fd)
            try:
                csv_file.save(upload_path)
                with open(upload_path, 'r', newline='') as f:
                    for row in csv.reader(f):
                        # ignore blank or incomplete rows
                        if len(row) < 2:
                            continue
                        username = row[0]
                        email = row[1]
                        # ignore the header row and rows without a username
                        if username == 'username' or username == '':
                            continue
                        # Update the entry if the user exists, else create it
                        user = _find_by_username(IlsUser, username)
                        if user:
                            user.email = email
                            user.reset_datetime = datetime.datetime.now()
                            user.save()
                        else:
                            IlsUser.create(username=username, email=email,
                                           reset_datetime=datetime.datetime.now())
            finally:
                # Delete the uploaded file even if the import failed
                os.remove(upload_path)
            return redirect(url_for('admin_ils_users'))

    context = {
        'message': message,
    }
    return render_template('csv.html', context=context)


@app.route('/logout')
def logout():
    """Mark the current admin as logged out and clear the session."""
    username = session.get('username')
    if username is not None:
        user = User.get_or_none(User.username == username)
        if user is not None:
            user.logged_in = False
            user.save()
        session.pop('username', None)
    return redirect(url_for('login'))


@app.route('/login', methods=['GET', 'POST'])
def login():
    """Show the login form and, on POST, authenticate an admin user."""
    if request.method == 'POST':
        username = request.form.get('username')
        password = encrypt_password(request.form.get('password', ''))
        try:
            # Both conditions must be combined with `&`. Python's `and` would
            # evaluate to the second expression only, so the username would
            # never be checked.
            user = User.get_or_none(
                (User.username == username) & (User.password == password)
            )
        except Exception:
            log.exception('Login lookup failed')
            user = None

        session.pop('username', None)
        if user:
            # Login user
            session['username'] = username
            user.logged_in = True
            user.save()
            return redirect(url_for('admin'))

        context = {
            'error': 'Invalid Credentials. Please try again.'
        }
        return render_template('login.html', context=context)

    context = {}
    return render_template('login.html', context=context)


# NOTE: shutdown_session() is not registered anywhere; the server stops when
# the process is terminated.
if __name__ == "__main__":
    port = http_settings['port']
    print("------------------------- Start up -----------------------------")
    print("Starting HTTP Service on port %s..." % port)
    if debug is True:
        print("Debug mode is enabled.")
        http_server = WSGIServer(('0.0.0.0', int(port)), app)
    else:
        http_server = WSGIServer(('0.0.0.0', int(port)), app, log=log, error_log=log)
    print("HTTP Service Started.")
    print("--------------------- Application Details ---------------------")
    print("Application started at %s" % datetime.datetime.now())
    print("System IP Address: %s" % get_ip_address())
    print("System Hostname: %s" % get_hostname())
    print("Access the Dashboard using a web browser using any of the following:")
    print("http://%s:%s or http://%s:%s" % (get_hostname(), port, get_ip_address(), port))
    print("---------------------------------------------------------------")
    print("To stop the application close this window.")
    print("---------------------------------------------------------------")
    http_server.serve_forever()