"""Password-reset kiosk and admin dashboard.

The public page (``/``) accepts an ILS username, records a password-reset
event for it and opens the configured reset URL in a browser tab on the
machine running this service.  The ``/admin`` pages let authenticated admin
users manage admin accounts, ILS users, settings, reminder schedules and
view the audit logs.
"""
import csv
import datetime
import hashlib
import logging
import os
import socket
import webbrowser
from urllib.parse import urlparse

from emailtool.emailer import Emailer
from flask import Flask, render_template, request, redirect, url_for, session, send_file
from gevent.pywsgi import WSGIServer
from peewee import Model, CharField, DateTimeField, SqliteDatabase, BooleanField, fn

logger = logging.getLogger(__name__)

db_name = 'app.db'
db = SqliteDatabase(db_name)


# --------------------------------------------------------------------------
# Models
# --------------------------------------------------------------------------
class User(Model):
    """Admin account allowed to sign in to the dashboard."""
    username = CharField()
    password = CharField()  # SHA-256 hex digest, see encrypt_password()
    date_created = DateTimeField(default=datetime.datetime.now)
    logged_in = BooleanField(default=False)

    class Meta:
        database = db


class IlsUser(Model):
    """ILS account that may request a password reset from the public page."""
    username = CharField()
    email = CharField()
    reset_datetime = DateTimeField(default=datetime.datetime.now)

    class Meta:
        database = db


class Settings(Model):
    """Single name/value application setting (values are stored as text)."""
    name = CharField()
    value = CharField()

    class Meta:
        database = db


class Schedule(Model):
    """Reminder interval, stored as text and sorted numerically for display."""
    interval = CharField()

    class Meta:
        database = db


class Log(Model):
    """Audit trail entry for an action performed by an admin user."""
    date = DateTimeField(default=datetime.datetime.now)
    username = CharField()
    action = CharField()

    class Meta:
        database = db


class PasswordResetLog(Model):
    """Record of a password reset requested from the public page."""
    date = DateTimeField(default=datetime.datetime.now)
    username = CharField()

    class Meta:
        database = db


# --------------------------------------------------------------------------
# Helpers
# --------------------------------------------------------------------------
def send_email(to, subject, body):
    """Send an email through the SMTP server configured in the settings."""
    emailer = Emailer(smtp_host, smtp_port, smtp_username, smtp_password)
    emailer.send_email(to, subject, body)


def encrypt_password(password):
    """Return the SHA-256 hex digest of ``password``.

    NOTE(review): this is an unsalted fast hash, which is weak for password
    storage.  It is kept as-is because existing rows store this format;
    migrating to a salted KDF needs a data migration.
    """
    return hashlib.sha256(password.encode('utf-8')).hexdigest()


def shutdown_session(exception=None):
    """Stop the HTTP server.

    Not registered as a Flask teardown handler; relies on the module-level
    ``http_server`` created when the file is run as a script.
    """
    print('Stopping HTTP Service...')
    http_server.stop()


def get_hostname():
    """Return this system's hostname."""
    return socket.gethostname()


def get_ip_address():
    """Return the IP address the system's hostname resolves to."""
    return socket.gethostbyname(socket.gethostname())


def is_valid_url(url):
    """Return ``scheme://netloc/`` for a URL with both parts, else ``False``."""
    try:
        parsed = urlparse(url)
    except (ValueError, TypeError, AttributeError):
        return False
    if parsed.scheme and parsed.netloc:
        return '{uri.scheme}://{uri.netloc}/'.format(uri=parsed)
    return False


def requires_auth():
    """Return ``True`` when the session belongs to a logged-in admin user."""
    username = session.get('username')
    if username is None:
        return False
    # get_or_none: the account may have been deleted while the session lived.
    user = User.get_or_none(User.username == username)
    return user is not None and bool(user.logged_in)


def _find_one(model, *expressions):
    """Return the first ``model`` row matching all ``expressions`` or ``None``.

    Lookup errors are logged and treated as "not found" so a bad form value
    does not turn into an HTTP 500.
    """
    try:
        return model.get_or_none(*expressions)
    except Exception:
        logger.exception('Lookup on %s failed', model.__name__)
        return None


# --------------------------------------------------------------------------
# Database bootstrap and settings
# --------------------------------------------------------------------------
_DEFAULT_SETTINGS = (
    ('Debug Mode', 'False'),
    ('HTTP Port', '5055'),
    ('SMTP Host', ''),
    ('SMTP Port', '587'),
    ('SMTP Username', ''),
    ('SMTP Password', ''),
    ('Domain Name', 'lynx'),
    ('Password Reset URL', 'https://terminal.idaho-lynx.org'),
)


def _bootstrap_database():
    """Create missing tables, the initial admin account and default settings."""
    first_run = not User.table_exists()
    # create_tables is safe by default (CREATE TABLE IF NOT EXISTS).
    db.create_tables([User, IlsUser, Settings, Log, PasswordResetLog, Schedule])
    if first_run:
        User.create(username='admin', password=encrypt_password('admin'))
    for name, value in _DEFAULT_SETTINGS:
        Settings.get_or_create(name=name, defaults={'value': value})


def _get_setting(name):
    """Return the stored value of the setting called ``name``."""
    return Settings.get(Settings.name == name).value


# The tables must exist before any setting is read, otherwise a fresh
# database fails on the very first start.
_bootstrap_database()

# Settings are read once at start-up; changes made through /admin/settings
# only affect these values after a restart.
debug = _get_setting('Debug Mode').lower() == 'true'
http_port = _get_setting('HTTP Port')
smtp_host = _get_setting('SMTP Host')
smtp_port = _get_setting('SMTP Port')
smtp_username = _get_setting('SMTP Username')
smtp_password = _get_setting('SMTP Password')
domain_name = _get_setting('Domain Name')
password_reset_url = _get_setting('Password Reset URL')

db.close()

log = logging.getLogger('werkzeug')
log.setLevel(logging.INFO)

app = Flask(__name__)
app.secret_key = os.urandom(24)


def format_time_ago(timestamp):
    """Format the time elapsed since ``timestamp`` as a human-readable string.

    ``timestamp`` is expected to be a naive local datetime, matching the
    ``datetime.datetime.now`` defaults used by the models in this file.
    """
    diff = datetime.datetime.now() - timestamp
    if diff.total_seconds() < 0:
        # Clock adjustments can put a stamp slightly in the future.
        return "just now"
    if diff.days > 365:
        years = diff.days // 365
        return f"{years} year{'s' if years > 1 else ''} ago"
    if diff.days > 30:
        months = diff.days // 30
        return f"{months} month{'s' if months > 1 else ''} ago"
    if diff.days > 0:
        return f"{diff.days} day{'s' if diff.days > 1 else ''} ago"
    if diff.seconds > 3600:
        hours = diff.seconds // 3600
        return f"{hours} hour{'s' if hours > 1 else ''} ago"
    if diff.seconds > 60:
        minutes = diff.seconds // 60
        return f"{minutes} minute{'s' if minutes > 1 else ''} ago"
    return "just now"


app.jinja_env.filters['time_since'] = format_time_ago


@app.before_request
def before_request():
    """Open the database connection for the request."""
    db.connect(reuse_if_open=True)


@app.after_request
def after_request(response):
    """Close the database connection once the response is built."""
    if not db.is_closed():
        db.close()
    return response


# --------------------------------------------------------------------------
# Public page
# --------------------------------------------------------------------------
@app.route('/', methods=['GET', 'POST'])
def index():
    """Home page: look up an ILS username and start a password reset."""
    error = None
    reset = False
    reset_url = is_valid_url(password_reset_url)
    reset_url_error = reset_url is False

    if request.method == 'POST':
        username = request.form.get('username')
        user = _find_one(IlsUser, IlsUser.username == username)
        if user:
            user.reset_datetime = datetime.datetime.now()
            user.save()
            PasswordResetLog.create(username=user.username)
            # Opens the tab on the machine running this service.
            if reset_url is not False:
                webbrowser.open_new_tab(str(reset_url))
            # Tells the template to show the "reset started" content.
            reset = True
        else:
            error = 'Invalid username'

    context = {
        'domain': domain_name,
        'error': error,
        'reset': reset,
        'reset_url': reset_url,
        'reset_url_error': reset_url_error,
    }
    return render_template('index.html', context=context)


# --------------------------------------------------------------------------
# Admin: dashboard and admin users
# --------------------------------------------------------------------------
@app.route('/admin/')
def admin():
    """Admin landing page."""
    if not requires_auth():
        return redirect(url_for('login'))
    return render_template('admin.html')


@app.route('/admin/users/', methods=['GET', 'POST'])
def admin_users():
    """List admin users and create a new one on POST."""
    if not requires_auth():
        return redirect(url_for('login'))

    message = None
    if request.method == 'POST':
        username = request.form.get('username')
        password = request.form.get('password')
        confirm_password = request.form.get('confirm_password')

        if not username or not password:
            message = 'Username and password are required'
        elif _find_one(User, User.username == username):
            message = 'Username already exists'
        elif password != confirm_password:
            message = 'Passwords do not match'
        else:
            User.create(username=username, password=encrypt_password(password),
                        date_created=datetime.datetime.now(), logged_in=False)
            message = 'User created successfully'
            Log.create(username=session['username'],
                       action='Created admin user: %s' % username)

    context = {
        'users': User.select().execute(),
        'message': message,
    }
    return render_template('admin_users.html', context=context)


@app.route('/admin/users/delete/<int:id>', methods=['GET', 'POST'])
def admin_users_delete(id):
    """Delete an admin user; the built-in ``admin`` account is protected."""
    if not requires_auth():
        return redirect(url_for('login'))

    # Captured up front: deleting your own account clears the session below.
    actor = session['username']
    user = User.get_or_none(User.id == id)
    if user is not None and user.username != 'admin':
        removed = user.username
        user.delete_instance()
        Log.create(username=actor, action='Removed admin user: %s' % removed)
        if removed == actor:
            session.pop('username', None)
    return redirect(url_for('admin_users'))


# --------------------------------------------------------------------------
# Admin: ILS users
# --------------------------------------------------------------------------
@app.route('/admin/users/ils', methods=['GET', 'POST'])
def admin_ils_users():
    """List ILS users and create a new one on POST."""
    if not requires_auth():
        return redirect(url_for('login'))

    message = None
    if request.method == 'POST':
        username = request.form.get('username')
        email = request.form.get('email')

        if not username or not email:
            message = 'Username and email are required'
        elif _find_one(IlsUser, IlsUser.username == username):
            message = 'Username already exists'
        else:
            IlsUser.create(username=username, email=email,
                           reset_datetime=datetime.datetime.now())
            message = 'ILS User: %s created successfully' % username
            Log.create(username=session['username'],
                       action='Created ILS User: %s' % username)

    context = {
        'users': IlsUser.select().execute(),
        'message': message,
    }
    return render_template('admin_ils_users.html', context=context)


@app.route('/admin/users/ils/delete/<int:id>', methods=['GET', 'POST'])
def admin_ils_users_delete(id):
    """Delete an ILS user."""
    if not requires_auth():
        return redirect(url_for('login'))

    user = IlsUser.get_or_none(IlsUser.id == id)
    if user is not None:
        removed = user.username
        user.delete_instance()
        Log.create(username=session['username'],
                   action='Removed ILS user: %s' % removed)
    return redirect(url_for('admin_ils_users'))


@app.route('/admin/users/ils/csv/download', methods=['GET', 'POST'])
def admin_ils_users_csv_download():
    """Export all ILS users as a CSV attachment."""
    if not requires_auth():
        return redirect(url_for('login'))

    # send_file resolves relative paths against app.root_path, so the file is
    # written to that same absolute location instead of the working directory.
    csv_path = os.path.join(app.root_path, 'users.csv')
    # newline='' stops the csv module from emitting blank lines between rows.
    with open(csv_path, 'w', newline='') as f:
        writer = csv.writer(f)
        writer.writerow(['username', 'email'])
        for user in IlsUser.select().execute():
            writer.writerow([user.username, user.email])

    Log.create(username=session['username'], action='Downloaded ILS user CSV file.')
    return send_file(csv_path, as_attachment=True)


def _import_ils_users(csv_path):
    """Create or update ILS users from the ``username,email`` rows of a CSV."""
    with open(csv_path, 'r', newline='') as f:
        for row in csv.reader(f):
            # Skip blank or short rows instead of failing on row[1].
            if len(row) < 2:
                continue
            username, email = row[0], row[1]
            # Skip the header row and rows without a username.
            if username in ('username', ''):
                continue
            user = _find_one(IlsUser, IlsUser.username == username)
            if user:
                user.email = email
                user.reset_datetime = datetime.datetime.now()
                user.save()
            else:
                IlsUser.create(username=username, email=email,
                               reset_datetime=datetime.datetime.now())


@app.route('/admin/users/ils/csv/import', methods=['GET', 'POST'])
def admin_ils_users_csv_import():
    """Import ILS users from an uploaded CSV file."""
    if not requires_auth():
        return redirect(url_for('login'))

    message = None
    if request.method == 'POST':
        csv_file = request.files.get('csv')
        if csv_file is not None and csv_file.filename != '':
            # Never build the path from the client-supplied filename: it is
            # untrusted and could point outside the uploads directory.
            os.makedirs('uploads', exist_ok=True)
            upload_path = os.path.join('uploads', 'import-%s.csv' % os.urandom(8).hex())
            csv_file.save(upload_path)
            try:
                _import_ils_users(upload_path)
            finally:
                # Always remove the temporary upload, even if the import fails.
                os.remove(upload_path)
            return redirect(url_for('admin_ils_users'))
        message = 'No CSV file selected'

    context = {
        'message': message,
    }
    return render_template('csv.html', context=context)


# --------------------------------------------------------------------------
# Admin: settings and schedules
# --------------------------------------------------------------------------
@app.route('/admin/settings', methods=['GET', 'POST'])
def settings():
    """Show all settings and update one on POST."""
    if not requires_auth():
        return redirect(url_for('login'))

    message = None
    if request.method == 'POST':
        setting_id = request.form.get('id')
        value = request.form.get('value')

        setting = _find_one(Settings, Settings.id == setting_id)
        if setting and value is not None:
            old_value = setting.value
            setting.value = value
            setting.save()
            Log.create(username=session['username'],
                       action='Changed %s setting from "%s" to "%s"'
                              % (setting.name, old_value, value))
            message = '%s updated successfully' % setting.name

    context = {
        'settings': Settings.select().execute(),
        'message': message,
    }
    return render_template('settings.html', context=context)


@app.route('/admin/schedule', methods=['GET', 'POST'])
def schedule():
    """List reminder schedules and add a new interval on POST."""
    if not requires_auth():
        return redirect(url_for('login'))

    message = None
    if request.method == 'POST':
        interval = request.form.get('interval')

        if not interval:
            message = 'Interval is required'
        elif _find_one(Schedule, Schedule.interval == interval):
            message = 'Schedule already exists'
        else:
            Schedule.create(interval=interval)
            Log.create(username=session['username'],
                       action='Created schedule for %s day interval.' % interval)
            message = 'Schedule: %s created successfully' % interval

    # Intervals are stored as text; cast so they sort numerically.
    schedules = Schedule.select().order_by(Schedule.interval.cast("INTEGER")).execute()
    context = {
        'schedules': schedules,
        'message': message,
    }
    return render_template('schedule.html', context=context)


@app.route('/admin/schedule/remove/<int:id>', methods=['GET', 'POST'])
def schedule_remove(id):
    """Delete a reminder schedule."""
    if not requires_auth():
        return redirect(url_for('login'))

    entry = Schedule.get_or_none(Schedule.id == id)
    if entry is not None:
        entry.delete_instance()
        Log.create(username=session['username'],
                   action='Removed schedule for a %s day reminder.' % entry.interval)
    return redirect(url_for('schedule'))


# --------------------------------------------------------------------------
# Admin: logs
# --------------------------------------------------------------------------
@app.route('/admin/system/log')
def system_log():
    """Show the audit log, newest entry first."""
    if not requires_auth():
        return redirect(url_for('login'))

    context = {
        'logs': Log.select().order_by(Log.id.desc()).execute(),
    }
    return render_template('system_log.html', context=context)


@app.route('/admin/system/log/password/resets')
def password_reset_log():
    """Show the password-reset log, newest entry first."""
    if not requires_auth():
        return redirect(url_for('login'))

    logs = PasswordResetLog.select().order_by(PasswordResetLog.id.desc()).execute()
    context = {
        'logs': logs,
    }
    return render_template('password_reset_log.html', context=context)


# --------------------------------------------------------------------------
# Authentication
# --------------------------------------------------------------------------
@app.route('/logout')
def logout():
    """Log the current admin user out and return to the login page."""
    username = session.pop('username', None)
    if username is not None:
        user = User.get_or_none(User.username == username)
        if user is not None:
            user.logged_in = False
            user.save()
            Log.create(username=username, action='Logged out')
    return redirect(url_for('login'))


@app.route('/login', methods=['GET', 'POST'])
def login():
    """Show the login form and authenticate an admin user on POST."""
    if request.method == 'POST':
        username = request.form.get('username', '')
        password = encrypt_password(request.form.get('password', ''))

        # Both conditions are passed separately so peewee ANDs them; Python's
        # ``and`` would silently drop the username check.
        user = _find_one(User, User.username == username, User.password == password)

        session.pop('username', None)
        if user:
            session['username'] = user.username
            user.logged_in = True
            user.save()
            Log.create(username=user.username, action='Logged in')
            return redirect(url_for('admin'))

        context = {
            'error': 'Invalid Credentials. Please try again.'
        }
        return render_template('login.html', context=context)

    context = {}
    return render_template('login.html', context=context)


if __name__ == "__main__":
    print("------------------------- Start up -----------------------------")
    print("Starting HTTP Service on port %s..." % http_port)
    if debug:
        print("Debug mode is enabled.")
        # Debug mode keeps gevent's default request logging.
        http_server = WSGIServer(('0.0.0.0', int(http_port)), app)
    else:
        http_server = WSGIServer(('0.0.0.0', int(http_port)), app, log=log, error_log=log)
    print("HTTP Service Started.")
    print("--------------------- Application Details ---------------------")
    print("Application started at %s" % datetime.datetime.now())
    print("System IP Address: %s" % get_ip_address())
    print("System Hostname: %s" % get_hostname())
    print("Access the Dashboard using a web browser using any of the following:")
    print("http://%s:%s or http://%s:%s" % (get_hostname(), http_port, get_ip_address(), http_port))
    print("---------------------------------------------------------------")
    print("To stop the application close this window.")
    print("---------------------------------------------------------------")
    http_server.serve_forever()