123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670 |
- import datetime as datetime
- from urllib.parse import urlparse
- import webbrowser
- from emailtool.emailer import Emailer
- from flask import Flask, render_template, request, redirect, url_for, session, send_file
- from gevent.pywsgi import WSGIServer
- import socket
- import logging
- from peewee import Model, CharField, DateTimeField, SqliteDatabase, BooleanField, fn
- import hashlib
- import os
- import csv
# SQLite database file shared by every model declared in this module.
db_name = 'app.db'
db = SqliteDatabase(database=db_name)
class User(Model):
    """Administrator account that may sign in to the admin pages."""

    username = CharField()
    # Holds the digest returned by encrypt_password(), not the plain text.
    password = CharField()
    date_created = DateTimeField(default=datetime.datetime.now)
    # Server-side login flag; requires_auth() checks it alongside the session.
    logged_in = BooleanField(default=False)

    class Meta:
        database = db
class IlsUser(Model):
    """ILS account whose most recent password-reset time is tracked."""

    username = CharField()
    email = CharField()
    # Refreshed whenever the user submits the reset form or is (re)imported.
    reset_datetime = DateTimeField(default=datetime.datetime.now)

    class Meta:
        database = db
class Settings(Model):
    """One application setting stored as a name/value pair of text."""

    name = CharField()
    value = CharField()

    class Meta:
        database = db
class Schedule(Model):
    """A reminder interval; stored as text and sorted as an integer."""

    interval = CharField()

    class Meta:
        database = db
class Log(Model):
    """Audit-trail entry describing an action taken by an admin user."""

    date = DateTimeField(default=datetime.datetime.now)
    username = CharField()
    action = CharField()

    class Meta:
        database = db
class PasswordResetLog(Model):
    """Record of one password-reset request made from the public page."""

    date = DateTimeField(default=datetime.datetime.now)
    username = CharField()

    class Meta:
        database = db
# Hash the password with SHA-256 (one-way hashing, not reversible encryption)
def encrypt_password(password):
    """Return the hex-encoded SHA-256 digest of ``password``.

    Args:
        password: Plain-text password as ``str``; it is encoded as UTF-8
            before hashing.

    Returns:
        A 64-character lowercase hexadecimal string.

    NOTE(security): the digest is unsalted and fast to compute. The format is
    kept as-is because stored credentials are compared against it; moving to
    a salted, slow hash would need a migration of existing rows.
    """
    return hashlib.sha256(password.encode('utf-8')).hexdigest()
# Create any missing tables and seed first-run defaults.
# Model.table_exists() checks the table name peewee actually assigned to the
# model, so the test cannot drift from the real name the way a hand-typed
# string can.
if not User.table_exists():
    db.create_tables([User])
    # NOTE(security): a fresh database is seeded with the well-known
    # admin/admin login; it should be changed after first start-up.
    User.create(username='admin', password=encrypt_password('admin'),
                date_created=datetime.datetime.now(), logged_in=False)
if not IlsUser.table_exists():
    db.create_tables([IlsUser])
if not Settings.table_exists():
    db.create_tables([Settings])
    # Settings.value is a CharField, so the defaults are written as text.
    _default_settings = (
        ('Debug Mode', 'False'),
        ('HTTP Port', '5055'),
        ('SMTP Host', ''),
        ('SMTP Port', '587'),
        ('SMTP Username', ''),
        ('SMTP Password', ''),
        ('Domain Name', 'lynx'),
        ('Password Reset URL', 'https://terminal.idaho-lynx.org'),
    )
    for _setting_name, _setting_value in _default_settings:
        # create() already inserts the row; no extra save() is needed.
        Settings.create(name=_setting_name, value=_setting_value)
if not Log.table_exists():
    db.create_tables([Log])
if not PasswordResetLog.table_exists():
    db.create_tables([PasswordResetLog])
if not Schedule.table_exists():
    db.create_tables([Schedule])
db.close()
# Configuration is read once, at import time; values edited later through the
# settings page are not re-read by this code.
# NOTE: the name ``settings`` is rebound further down by the ``settings`` view.
settings = Settings.select().execute()

debug_setting = Settings.get(Settings.name == 'Debug Mode')
http_port_setting = Settings.get(Settings.name == 'HTTP Port')
smtp_host_setting = Settings.get(Settings.name == 'SMTP Host')
smtp_port_setting = Settings.get(Settings.name == 'SMTP Port')
smtp_username_setting = Settings.get(Settings.name == 'SMTP Username')
smtp_password_setting = Settings.get(Settings.name == 'SMTP Password')
domain_name_setting = Settings.get(Settings.name == 'Domain Name')
password_reset_url_setting = Settings.get(Settings.name == 'Password Reset URL')

# Every value is text (Settings.value is a CharField); only the debug flag is
# converted here, by a case-insensitive comparison with "true".
debug = debug_setting.value.lower() == 'true'
http_port = http_port_setting.value
smtp_host = smtp_host_setting.value
smtp_port = smtp_port_setting.value
smtp_username = smtp_username_setting.value
smtp_password = smtp_password_setting.value
domain_name = domain_name_setting.value
password_reset_url = password_reset_url_setting.value

log = logging.getLogger('werkzeug')
log.setLevel(logging.INFO)
def send_email(to, subject, body):
    """Send one message using the SMTP settings loaded at start-up.

    Args:
        to: Recipient address, passed through to ``Emailer.send_email``.
        subject: Subject line of the message.
        body: Body of the message.
    """
    mailer = Emailer(smtp_host, smtp_port, smtp_username, smtp_password)
    mailer.send_email(to, subject, body)
def shutdown_session(exception=None):
    """Announce the shutdown and stop the running HTTP server.

    Args:
        exception: Accepted for compatibility with teardown-style callbacks;
            it is not used.

    ``http_server`` is only assigned when this module runs as a script, so
    calling this beforehand raises ``NameError``.
    """
    print('Stopping HTTP Service...')
    http_server.stop()
# Look up this machine's host name
def get_hostname():
    """Return the host name reported by ``socket.gethostname()``."""
    hostname = socket.gethostname()
    return hostname
# Get the system's IP address
def get_ip_address():
    """Return the IPv4 address that this machine's host name resolves to.

    Returns:
        The dotted-quad address as a string. If the host name cannot be
        resolved, ``'127.0.0.1'`` is returned so that callers which only
        display the address (the start-up banner) do not crash.
    """
    try:
        return socket.gethostbyname(socket.gethostname())
    except OSError:
        # socket.gaierror / socket.herror are OSError subclasses.
        return '127.0.0.1'
# Validate a URL with urllib.parse and reduce it to its scheme and host
def is_valid_url(url):
    """Return ``scheme://netloc/`` for a usable URL, otherwise ``False``.

    Args:
        url: Candidate URL text.

    Returns:
        The URL reduced to ``'<scheme>://<netloc>/'`` (path, query and
        fragment are dropped) when both a scheme and a network location are
        present; ``False`` when either is missing or the value cannot be
        parsed.
    """
    try:
        parsed = urlparse(url)
    except (ValueError, AttributeError, TypeError):
        # urlparse raises ValueError for malformed input such as an
        # unterminated IPv6 literal; the other two cover non-text arguments.
        return False
    if not (parsed.scheme and parsed.netloc):
        return False
    return '{uri.scheme}://{uri.netloc}/'.format(uri=parsed)
def requires_auth():
    """Report whether the current session belongs to a logged-in admin.

    Returns:
        ``True`` only when the session carries a username, a ``User`` row
        with that username exists, and its ``logged_in`` flag is set.
        ``False`` in every other case, including a session that names an
        account which has since been deleted (``get_or_none`` is used so the
        missing row does not raise).
    """
    username = session.get('username')
    if username is None:
        return False
    user = User.get_or_none(User.username == username)
    return user is not None and user.logged_in is True
# Close the module-level connection; before_request() connects per request.
db.close()
app = Flask(__name__)
# The signing key is regenerated on every start, so session cookies issued by
# a previous run no longer validate.
app.config['SECRET_KEY'] = os.urandom(24)
def _ago(count, unit):
    """Format ``count`` of ``unit`` as e.g. "1 day ago" or "3 days ago"."""
    return f"{count} {unit}{'s' if count > 1 else ''} ago"


def format_time_ago(timestamp, now=None):
    """Describe how long ago ``timestamp`` was as a human-readable string.

    Args:
        timestamp: ``datetime.datetime`` to describe. The models in this
            module fill their date columns with ``datetime.datetime.now``,
            i.e. naive local time.
        now: Reference "current" time. Defaults to ``datetime.now()`` in the
            same timezone-awareness as ``timestamp`` so the two can be
            subtracted; the parameter exists mainly for deterministic tests.

    Returns:
        ``"just now"`` for anything under a minute old (including
        timestamps in the future), otherwise the largest whole unit among
        minutes, hours, days, 30-day months and 365-day years.
    """
    if now is None:
        # Local time, not UTC, so it matches how the timestamps were written.
        now = datetime.datetime.now(timestamp.tzinfo)
    elapsed = now - timestamp
    if elapsed.total_seconds() < 60:
        return "just now"
    if elapsed.days >= 365:
        return _ago(elapsed.days // 365, "year")
    if elapsed.days >= 30:
        return _ago(elapsed.days // 30, "month")
    if elapsed.days >= 1:
        return _ago(elapsed.days, "day")
    if elapsed.seconds >= 3600:
        return _ago(elapsed.seconds // 3600, "hour")
    return _ago(elapsed.seconds // 60, "minute")
# Expose format_time_ago to templates as the ``time_since`` filter.
app.add_template_filter(format_time_ago, 'time_since')
@app.before_request
def before_request():
    """Open the database connection for the incoming request.

    ``reuse_if_open=True`` keeps this from raising when a connection is
    already open, for example one left behind by an earlier request whose
    clean-up did not run.
    """
    db.connect(reuse_if_open=True)
@app.after_request
def after_request(response):
    """Close the request's database connection and pass the response on.

    Args:
        response: The response object produced for the request.

    Returns:
        The same ``response``, unmodified.
    """
    db.close()
    return response
# Route for the public password-reset page
@app.route('/', methods=['GET', 'POST'])
def index():
    """Render the reset page and, on POST, record a reset for an ILS user.

    A POST looks up the submitted username among the ILS users. When it is
    found, the user's ``reset_datetime`` is refreshed, a ``PasswordResetLog``
    row is written and the configured reset URL is opened; otherwise the page
    is re-rendered with an "Invalid username" error.

    Returns:
        The rendered ``index.html`` template.
    """
    reset_url = is_valid_url(password_reset_url)
    context = {
        'domain': domain_name,
        'error': None,
        'reset': False,
        'reset_url': reset_url,
        # Lets the template warn that the configured URL is unusable.
        'reset_url_error': reset_url is False,
    }
    if request.method == 'POST':
        username = request.form.get('username')
        user = IlsUser.get_or_none(IlsUser.username == username) if username else None
        if user is None:
            context['error'] = 'Invalid username'
        else:
            reset_time = datetime.datetime.now()
            user.reset_datetime = reset_time
            user.save()
            # The log model's timestamp column is ``date``.
            PasswordResetLog.create(username=user.username, date=reset_time)
            if reset_url is not False:
                # NOTE(review): webbrowser opens the tab on the machine that
                # runs this process, not in the visitor's browser — confirm
                # that this app is only used on the host it runs on.
                webbrowser.open_new_tab(str(reset_url))
            # Tells the template to show the post-reset content.
            context['reset'] = True
    return render_template('index.html', context=context)
# Route for the admin landing page
@app.route('/admin/')
def admin():
    """Show the admin dashboard, or send anonymous visitors to the login."""
    if requires_auth():
        return render_template('admin.html')
    return redirect(url_for('login'))
@app.route('/admin/users/', methods=['GET', 'POST'])
def admin_users():
    """List admin users and, on POST, create a new one.

    The POST form supplies ``username``, ``password`` and
    ``confirm_password``. A missing or empty username/password is rejected
    up front so it never reaches the hash function or the NOT NULL columns.

    Returns:
        A redirect to the login page for unauthenticated visitors, otherwise
        the rendered ``admin_users.html`` template.
    """
    if not requires_auth():
        return redirect(url_for('login'))
    message = None
    if request.method == 'POST':
        username = request.form.get('username')
        password = request.form.get('password')
        confirm_password = request.form.get('confirm_password')
        if not username or not password:
            message = 'Username and password are required'
        elif User.get_or_none(User.username == username) is not None:
            message = 'Username already exists'
        elif password != confirm_password:
            message = 'Passwords do not match'
        else:
            User.create(username=username, password=encrypt_password(password),
                        date_created=datetime.datetime.now(), logged_in=False)
            Log.create(username=session['username'],
                       action='Created admin user: %s' % username)
            message = 'User created successfully'
    # Fetch every admin user for the listing
    context = {
        'users': User.select().execute(),
        'message': message,
    }
    return render_template('admin_users.html', context=context)
@app.route('/admin/users/delete/<int:id>', methods=['GET', 'POST'])
def admin_users_delete(id):
    """Delete the admin user with primary key ``id``.

    The built-in ``admin`` account is never deleted, and an unknown ``id`` is
    ignored. When users delete their own account, their session is cleared
    after the audit entry has been written.

    Args:
        id: Primary key of the ``User`` row, taken from the URL.

    Returns:
        A redirect to the login page for unauthenticated visitors, otherwise
        a redirect back to the admin user list.
    """
    if not requires_auth():
        return redirect(url_for('login'))
    # Read the acting user first: the session entry may be removed below.
    acting_user = session['username']
    user = User.get_or_none(User.id == id)
    if user is not None and user.username != 'admin':
        removed_username = user.username
        user.delete_instance()
        Log.create(username=acting_user,
                   action='Removed admin user: %s' % removed_username)
        if removed_username == acting_user:
            session.pop('username', None)
    return redirect(url_for('admin_users'))
@app.route('/admin/users/ils', methods=['GET', 'POST'])
def admin_ils_users():
    """List ILS users and, on POST, add a new one.

    The POST form supplies ``username`` and ``email``. A blank username is
    rejected (the CSV import skips blank usernames as well) and a missing
    email is stored as an empty string rather than NULL.

    Returns:
        A redirect to the login page for unauthenticated visitors, otherwise
        the rendered ``admin_ils_users.html`` template.
    """
    if not requires_auth():
        return redirect(url_for('login'))
    message = None
    if request.method == 'POST':
        username = request.form.get('username')
        email = request.form.get('email') or ''
        if not username:
            message = 'Username is required'
        elif IlsUser.get_or_none(IlsUser.username == username) is not None:
            message = 'Username already exists'
        else:
            IlsUser.create(username=username, email=email,
                           reset_datetime=datetime.datetime.now())
            Log.create(username=session['username'],
                       action='Created ILS User: %s' % username)
            message = 'ILS User: %s created successfully' % username
    # Fetch every ILS user for the listing
    context = {
        'users': IlsUser.select().execute(),
        'message': message,
    }
    return render_template('admin_ils_users.html', context=context)
@app.route('/admin/users/ils/delete/<int:id>', methods=['GET', 'POST'])
def admin_ils_users_delete(id):
    """Delete the ILS user with primary key ``id`` and log the removal.

    An ``id`` that matches no row is ignored instead of raising.

    Args:
        id: Primary key of the ``IlsUser`` row, taken from the URL.

    Returns:
        A redirect to the login page for unauthenticated visitors, otherwise
        a redirect back to the ILS user list.
    """
    if not requires_auth():
        return redirect(url_for('login'))
    user = IlsUser.get_or_none(IlsUser.id == id)
    if user is not None:
        removed_username = user.username
        user.delete_instance()
        Log.create(username=session['username'],
                   action='Removed ILS user: %s' % removed_username)
    return redirect(url_for('admin_ils_users'))
# Route that generates a CSV export of the ILS users for download
@app.route('/admin/users/ils/csv/download', methods=['GET', 'POST'])
def admin_ils_users_csv_download():
    """Return every ILS user as a downloadable ``users.csv`` attachment.

    The CSV is built in memory. Nothing is written to disk, so concurrent
    downloads cannot overwrite each other's file and the result does not
    depend on the process's working directory.

    Returns:
        A redirect to the login page for unauthenticated visitors, otherwise
        a ``text/csv`` response with a ``username,email`` header row followed
        by one row per ILS user.
    """
    import io  # local import: only this view needs it

    if not requires_auth():
        return redirect(url_for('login'))
    # newline='' leaves the csv module in charge of line endings, so no blank
    # lines appear between rows.
    buffer = io.StringIO(newline='')
    writer = csv.writer(buffer)
    writer.writerow(['username', 'email'])
    writer.writerows(
        [user.username, user.email] for user in IlsUser.select().execute()
    )
    Log.create(username=session['username'], action='Downloaded ILS user CSV file.')
    return app.response_class(
        buffer.getvalue(),
        mimetype='text/csv',
        headers={'Content-Disposition': 'attachment; filename=users.csv'},
    )
@app.route('/admin/users/ils/csv/import', methods=['GET', 'POST'])
def admin_ils_users_csv_import():
    """Create or update ILS users from an uploaded ``username,email`` CSV.

    The upload (form field ``csv``) is parsed straight from the request body,
    so the client-supplied filename is never used as a path and no temporary
    file is left behind. Rows with fewer than two columns, a blank username
    or the literal header value ``username`` are skipped. Existing users get
    their email and ``reset_datetime`` updated; unknown users are created.

    Returns:
        A redirect to the login page for unauthenticated visitors, a
        redirect to the ILS user list after a successful import, otherwise
        the rendered ``csv.html`` template.
    """
    import io  # local import: only this view needs it

    if not requires_auth():
        return redirect(url_for('login'))
    message = None
    if request.method == 'POST':
        upload = request.files.get('csv')
        if upload is None or upload.filename == '':
            message = 'No CSV file selected'
        else:
            # utf-8-sig drops the byte-order mark that spreadsheet exports
            # often add, which would otherwise stick to the first username.
            text = upload.read().decode('utf-8-sig', errors='replace')
            imported = 0
            for row in csv.reader(io.StringIO(text, newline='')):
                # Blank lines parse as [] and short rows have no email column.
                if len(row) < 2:
                    continue
                username, email = row[0], row[1]
                # Skip the header row and rows without a username.
                if username in ('', 'username'):
                    continue
                imported_at = datetime.datetime.now()
                user = IlsUser.get_or_none(IlsUser.username == username)
                if user is None:
                    IlsUser.create(username=username, email=email,
                                   reset_datetime=imported_at)
                else:
                    user.email = email
                    user.reset_datetime = imported_at
                    user.save()
                imported += 1
            Log.create(username=session['username'],
                       action='Imported %s ILS users from CSV file.' % imported)
            return redirect(url_for('admin_ils_users'))
    context = {
        'message': message,
    }
    return render_template('csv.html', context=context)
@app.route('/admin/settings', methods=['GET', 'POST'])
def settings():
    """List the stored settings and, on POST, update one of them.

    The POST form supplies the setting's ``id`` and its new ``value``. An
    unknown or malformed id, or a request without a value, leaves the
    settings untouched. Each change is written to the audit log together
    with the previous value.

    NOTE: the module-level configuration variables are read once when the
    module is imported, so a changed value is not picked up by them until the
    application is restarted.

    Returns:
        A redirect to the login page for unauthenticated visitors, otherwise
        the rendered ``settings.html`` template.
    """
    if not requires_auth():
        return redirect(url_for('login'))
    message = None
    if request.method == 'POST':
        setting_id = request.form.get('id')
        value = request.form.get('value')
        try:
            setting = Settings.get_or_none(Settings.id == setting_id)
        except ValueError:
            # A non-numeric id cannot be converted for the integer key.
            setting = None
        # An empty string is a valid value; only a missing field is refused.
        if setting is not None and value is not None:
            old_value = setting.value
            setting.value = value
            setting.save()
            Log.create(username=session['username'],
                       action='Changed %s setting from "%s" to "%s"' % (setting.name,
                                                                        old_value,
                                                                        value))
            message = '%s updated successfully' % setting.name
    # Fetch every setting for the listing
    context = {
        'settings': Settings.select().execute(),
        'message': message,
    }
    return render_template('settings.html', context=context)
@app.route('/admin/schedule', methods=['GET', 'POST'])
def schedule():
    """List reminder schedules and, on POST, add a new interval.

    The POST form supplies ``interval``. It must be a whole number greater
    than zero because the listing sorts intervals by casting them to
    INTEGER; it is stored in canonical form (``"007"`` becomes ``"7"``) so
    the duplicate check cannot be bypassed with padding.

    Returns:
        A redirect to the login page for unauthenticated visitors, otherwise
        the rendered ``schedule.html`` template.
    """
    if not requires_auth():
        return redirect(url_for('login'))
    message = None
    if request.method == 'POST':
        raw_interval = (request.form.get('interval') or '').strip()
        if not raw_interval.isdecimal() or int(raw_interval) == 0:
            message = 'Interval must be a whole number of days greater than zero'
        else:
            interval = str(int(raw_interval))
            if Schedule.get_or_none(Schedule.interval == interval) is not None:
                message = 'Schedule already exists'
            else:
                Schedule.create(interval=interval)
                Log.create(username=session['username'],
                           action='Created schedule for %s day interval.' % interval)
                message = 'Schedule: %s created successfully' % interval
    # Fetch every schedule, smallest interval first
    schedules = Schedule.select().order_by(Schedule.interval.cast("INTEGER")).execute()
    context = {
        'schedules': schedules,
        'message': message,
    }
    return render_template('schedule.html', context=context)
# Route that removes one schedule
@app.route('/admin/schedule/remove/<int:id>', methods=['GET', 'POST'])
def schedule_remove(id):
    """Delete the schedule with primary key ``id`` and log the removal.

    An ``id`` that matches no row is ignored instead of raising.

    Args:
        id: Primary key of the ``Schedule`` row, taken from the URL.

    Returns:
        A redirect to the login page for unauthenticated visitors, otherwise
        a redirect back to the schedule list.
    """
    if not requires_auth():
        return redirect(url_for('login'))
    entry = Schedule.get_or_none(Schedule.id == id)
    if entry is not None:
        removed_interval = entry.interval
        entry.delete_instance()
        Log.create(username=session['username'],
                   action='Removed schedule for a %s day reminder.' % removed_interval)
    return redirect(url_for('schedule'))
@app.route('/admin/system/log')
def system_log():
    """Show the audit log, newest entry first, to a logged-in admin."""
    if requires_auth():
        # Highest id first puts the most recently written rows on top.
        newest_first = Log.select().order_by(Log.id.desc())
        return render_template('system_log.html',
                               context={'logs': newest_first.execute()})
    return redirect(url_for('login'))
@app.route('/admin/system/log/password/resets')
def password_reset_log():
    """Show the password-reset log, newest first, to a logged-in admin."""
    if requires_auth():
        # Highest id first puts the most recently written rows on top.
        newest_first = PasswordResetLog.select().order_by(PasswordResetLog.id.desc())
        return render_template('password_reset_log.html',
                               context={'logs': newest_first.execute()})
    return redirect(url_for('login'))
@app.route('/logout')
def logout():
    """Log the current admin out and return to the login page.

    The session entry is always removed. The stored ``logged_in`` flag is
    cleared only when the account still exists, so logging out after the
    account was deleted no longer raises.

    Returns:
        A redirect to the login page.
    """
    username = session.pop('username', None)
    if username is not None:
        user = User.get_or_none(User.username == username)
        if user is not None:
            user.logged_in = False
            user.save()
        Log.create(username=username, action='Logged out')
    return redirect(url_for('login'))
@app.route('/login', methods=['GET', 'POST'])
def login():
    """Show the login form and, on POST, authenticate an admin user.

    The POST form supplies ``username`` and ``password``. Any existing
    session login is dropped before the credentials are checked.

    Returns:
        A redirect to the admin page after a successful login, otherwise the
        rendered ``login.html`` template (with an ``error`` entry in the
        context after a failed attempt).
    """
    context = {}
    if request.method == 'POST':
        # Missing fields become empty strings so hashing cannot fail.
        username = request.form.get('username') or ''
        password_hash = encrypt_password(request.form.get('password') or '')
        session.pop('username', None)
        # The two conditions must be joined with ``&``. Python's ``and``
        # returns its second operand for these expression objects, which
        # would silently drop the username check from the query.
        user = User.get_or_none(
            (User.username == username) & (User.password == password_hash)
        )
        if user is not None:
            # Use the stored name, i.e. the account that actually matched.
            session['username'] = user.username
            user.logged_in = True
            user.save()
            Log.create(username=user.username, action='Logged in')
            return redirect(url_for('admin'))
        context['error'] = 'Invalid Credentials. Please try again.'
    return render_template('login.html', context=context)
# Script entry point: build the gevent WSGI server, print a start-up banner
# and serve until the process is stopped.
if __name__ == "__main__":
    rule = "---------------------------------------------------------------"
    listen_address = ('0.0.0.0', int(http_port))
    # Outside debug mode both request and error output go to the werkzeug
    # logger; in debug mode the server's own defaults are used.
    server_options = {}
    if debug is not True:
        server_options = {'log': log, 'error_log': log}

    print("------------------------- Start up -----------------------------")
    print("Starting HTTP Service on port %s..." % http_port)
    if debug is True:
        print("Debug mode is enabled.")
    http_server = WSGIServer(listen_address, app, **server_options)
    print("HTTP Service Started.")
    print("--------------------- Application Details ---------------------")
    print("Application started at %s" % datetime.datetime.now())
    print("System IP Address: %s" % get_ip_address())
    print("System Hostname: %s" % get_hostname())
    print("Access the Dashboard using a web browser using any of the following:")
    print("http://%s:%s or http://%s:%s" % (get_hostname(), http_port, get_ip_address(), http_port))
    print(rule)
    print("To stop the application close this window.")
    print(rule)
    http_server.serve_forever()
|