12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264 |
- import datetime as datetime
- from urllib.parse import urlparse
- import webbrowser
- from emailtool.emailer import Emailer
- from flask import Flask, render_template, request, redirect, url_for, session, send_file
- from gevent.pywsgi import WSGIServer
- import socket
- import logging
- from peewee import Model, CharField, DateTimeField, SqliteDatabase, BooleanField, fn
- import hashlib
- import os
- import csv
- import time
- import threading
- import atexit
# Main application database file; every model except EmailReminderLog is bound to it.
db_name = 'app.db'
# check_same_thread=False and timeout=5 are forwarded to the SQLite driver.
# NOTE(review): presumably needed because the scheduler thread and the request
# handlers both use this handle - confirm.
db = SqliteDatabase(db_name, check_same_thread=False, timeout=5)
# Second database file; EmailReminderLog (the reminder queue) is bound to it.
schedule_db_name = 'schedule.db'
schedule_db = SqliteDatabase(schedule_db_name, check_same_thread=False, timeout=5)
class User(Model):
    """Administrator account used to sign in to the /admin pages."""

    username = CharField()
    # Hex digest produced by encrypt_password(); never the clear-text password.
    password = CharField()
    # Callable default: evaluated when a row is created.
    date_created = DateTimeField(default=datetime.datetime.now)
    # Checked by requires_auth() in addition to the session cookie.
    logged_in = BooleanField(default=False)

    class Meta:
        database = db
class IlsUser(Model):
    """ILS account whose password expiry is tracked and reminded about."""

    username = CharField()
    # Address the reminder emails are sent to.
    email = CharField()
    # Date of the last password reset, stored as a 'YYYY-MM-DD' string.
    reset_datetime = CharField()

    class Meta:
        database = db
class Settings(Model):
    """One named application setting; rows are looked up by ``name``."""

    name = CharField()
    # Stored in a character column, so callers parse it (e.g. int(), .lower()).
    value = CharField()

    class Meta:
        database = db
class Schedule(Model):
    """A reminder lead time configured by an administrator."""

    # Number of days before password expiry at which a reminder is sent,
    # stored as a string and converted with int() by the scheduler.
    interval = CharField()

    class Meta:
        database = db
class Log(Model):
    """Audit entry recording an action performed by an administrator."""

    # Callable default: evaluated when a row is created.
    date = DateTimeField(default=datetime.datetime.now)
    # Name of the admin user taken from the session at the time of the action.
    username = CharField()
    # Free-text description of what was done.
    action = CharField()

    class Meta:
        database = db
class PasswordResetLog(Model):
    """Record of an ILS user confirming a password reset on the home page."""

    # The default must be a callable: a plain ``strftime(...)`` value would be
    # computed once at import time and every row would carry the date on which
    # the process was started.
    date = CharField(default=lambda: datetime.datetime.now().strftime('%Y-%m-%d'))
    # ILS username that performed the reset.
    username = CharField()

    class Meta:
        database = db
class EmailReminderLog(Model):
    """A reminder email queued for one ILS user and one schedule interval."""

    # Day on which the email is due, as 'YYYY-MM-DD'. The default must be a
    # callable: a plain ``strftime(...)`` value would be frozen at import time.
    date = CharField(default=lambda: datetime.datetime.now().strftime('%Y-%m-%d'))
    username = CharField()
    email = CharField()
    # Schedule interval (days before expiry) this reminder was created for.
    interval = CharField()
    # Set once the email has been handed to the mailer.
    sent = BooleanField(default=False)
    # Reminders flagged here are excluded when the scheduler selects due emails.
    ignore = BooleanField(default=False)

    class Meta:
        database = schedule_db
def reminder_cleanup():
    """Delete every queued email reminder whose date is before today."""
    today = datetime.datetime.now().strftime('%Y-%m-%d')
    # Dates are stored as 'YYYY-MM-DD' strings, which sort chronologically, so a
    # plain string comparison is correct. One DELETE replaces the previous
    # select-then-delete-each-row loop.
    EmailReminderLog.delete().where(EmailReminderLog.date < today).execute()
def _scheduler_setting(name):
    """Return the stored string value of the setting called *name*."""
    return Settings.get(Settings.name == name).value


def _plan_reminders(password_reset_interval):
    """Queue a reminder for every ILS user / schedule interval pair that lacks one.

    ``password_reset_interval`` is the password lifetime in days (int).
    """
    logger = logging.getLogger(__name__)
    intervals = list(Schedule.select())
    users = list(IlsUser.select())

    # Remove reminders that belong to users that no longer exist.
    current_users = {user.username for user in users}
    for reminder in list(EmailReminderLog.select()):
        if reminder.username not in current_users:
            reminder.delete_instance()

    for user in users:
        try:
            last_reset = datetime.datetime.strptime(user.reset_datetime, "%Y-%m-%d")
        except (TypeError, ValueError):
            # A single malformed date must not stop reminders for everyone else.
            logger.warning('Skipping ILS user %r: invalid reset date %r',
                           user.username, user.reset_datetime)
            continue
        # Example: with a 90 day reset interval and a last reset on 2019-01-01
        # the password expires on 2019-04-01; a 30 day schedule interval then
        # yields a reminder on 2019-03-02.
        password_expiration_date = last_reset + datetime.timedelta(days=password_reset_interval)
        for interval in intervals:
            try:
                lead_days = int(interval.interval)
            except (TypeError, ValueError):
                logger.warning('Skipping schedule with non-numeric interval %r',
                               interval.interval)
                continue
            email_date = (password_expiration_date
                          - datetime.timedelta(days=lead_days)).strftime('%Y-%m-%d')
            already_queued = EmailReminderLog.select().where(
                EmailReminderLog.date == email_date,
                EmailReminderLog.username == user.username,
                EmailReminderLog.interval == interval.interval).exists()
            if not already_queued:
                EmailReminderLog.create(date=email_date, email=user.email,
                                        username=user.username,
                                        interval=interval.interval)


def _send_due_reminders(smtp_host, smtp_port, smtp_username, smtp_password, reset_link):
    """Send every reminder dated today that is neither sent nor ignored."""
    logger = logging.getLogger(__name__)
    today = datetime.datetime.now().strftime('%Y-%m-%d')
    due = list(EmailReminderLog.select().where(EmailReminderLog.date == today,
                                               EmailReminderLog.ignore == 0,
                                               EmailReminderLog.sent == 0))
    for reminder in due:
        try:
            emailer = Emailer(smtp_host, smtp_port, smtp_username, smtp_password)
            # interval is stored in a CharField, so compare as text; comparing
            # with the integer 1 is never true.
            interval = str(reminder.interval)
            if interval == '1':
                # 1 day reminder email
                body = ('The password for ILS account with the username "'
                        + reminder.username + '" will expire in ' + interval +
                        ' day. To reset your password, please click the following link: '
                        + reset_link
                        + '. If the password is not reset today the account will be locked.')
            else:
                # more than a day reminder email
                body = ('The password for ILS account with the username "'
                        + reminder.username + '" will expire in ' + interval +
                        ' days. To reset your password, please click the following link: '
                        + reset_link)
            emailer.send_email(reminder.email, 'ILS Password Reset Reminder', body)
            # Mark the reminder as sent only after the mailer call returned.
            reminder.sent = True
            reminder.save()
        except Exception:
            # Keep going so one failing address does not block the others; the
            # reminder stays unsent and is retried on the next pass.
            logger.exception('Could not send reminder to %r', reminder.email)


def scheduler(stop_event):
    """Background loop that queues, sends and purges password reminders.

    Runs until ``stop_event`` (a ``threading.Event``) is set. Each pass reads
    the current settings, queues missing reminders unless the 'Suspend
    Scheduler' setting is on, sends the reminders due today and deletes
    reminders dated before today. A failing pass is logged and retried.
    """
    logger = logging.getLogger(__name__)
    while not stop_event.is_set():
        try:
            # Settings are re-read on every pass so changes made in the admin
            # UI take effect without a restart.
            password_reset_interval = _scheduler_setting('Password Reset Interval')
            smtp_host = _scheduler_setting('SMTP Host')
            smtp_port = _scheduler_setting('SMTP Port')
            smtp_username = _scheduler_setting('SMTP Username')
            smtp_password = _scheduler_setting('SMTP Password')
            suspend = _scheduler_setting('Suspend Scheduler')
            reset_link = "http://%s:%s" % (get_hostname(), http_port)

            if suspend.lower() == "false":
                _plan_reminders(int(password_reset_interval))
            # NOTE(review): reminders already queued are still sent while the
            # scheduler is suspended; only the queuing of new ones is gated.
            # This mirrors the original statement order - confirm it is intended.
            _send_due_reminders(smtp_host, smtp_port, smtp_username, smtp_password, reset_link)
            # Remove all reminders that are older than today's date.
            reminder_cleanup()
        except Exception:
            logger.exception('Scheduler pass failed; retrying on the next cycle')
        # Pause 3 seconds between passes; wait() returns early on shutdown.
        stop_event.wait(3)
# Hash the password with SHA-256
def encrypt_password(password):
    """Return the hex-encoded SHA-256 digest of *password*.

    Despite the name this is a one-way hash, not encryption. ``password`` must
    be a ``str``; it is encoded as UTF-8 before hashing.
    """
    # NOTE(review): an unsalted SHA-256 digest is weak for password storage.
    # Switching schemes would invalidate the digests already stored in the
    # user table, so it needs a migration and is left unchanged here.
    return hashlib.sha256(password.encode('utf-8')).hexdigest()
#db.connect()
# Create missing tables and seed first-run data.
if not db.table_exists('user'):
    db.create_tables([User, ])
    # First run: seed the built-in administrator with the default password.
    # The admin pages redirect to the password reset form until it is changed.
    User.create(username='admin', password=encrypt_password('admin'),
                date_created=datetime.datetime.now(), logged_in=False)
if not db.table_exists('settings'):
    db.create_tables([Settings, ])
    # Default settings; values are stored as strings in the value column.
    Settings.create(name='Debug Mode', value=False)
    Settings.create(name='HTTP Port', value=5055)
    Settings.create(name='SMTP Host', value="")
    Settings.create(name='SMTP Port', value="587")
    Settings.create(name='SMTP Username', value="")
    Settings.create(name='SMTP Password', value="")
    Settings.create(name='Domain Name', value="lynx")
    Settings.create(name='Password Reset URL', value="https://terminal.idaho-lynx.org")
    Settings.create(name='Password Reset Interval', value="90")
    Settings.create(name='Suspend Scheduler', value="True")
# create_tables() only creates tables that are missing, so the remaining
# models need no existence check. The previous checks looked for
# 'password_reset_log' in schedule_db and 'email_reminder_log' in db, i.e. in
# the wrong database and under names that do not match the model tables.
db.create_tables([IlsUser, Log, PasswordResetLog, Schedule])
schedule_db.create_tables([EmailReminderLog, ])

# Snapshot of the settings taken at start-up; these module-level values are
# not refreshed when a setting is changed later.
settings = Settings.select().execute()
debug_setting = Settings.get(Settings.name == 'Debug Mode')
debug = debug_setting.value
http_port_setting = Settings.get(Settings.name == 'HTTP Port')
http_port = http_port_setting.value
domain_name_setting = Settings.get(Settings.name == 'Domain Name')
domain_name = domain_name_setting.value
password_reset_url_setting = Settings.get(Settings.name == 'Password Reset URL')
password_reset_url = password_reset_url_setting.value
password_reset_interval_setting = Settings.get(Settings.name == 'Password Reset Interval')
password_reset_interval = password_reset_interval_setting.value
log = logging.getLogger('werkzeug')
log.setLevel(logging.INFO)
# The stored value is text; turn it into a real boolean.
debug = debug.lower() == 'true'
#db.close()
def shutdown_session(exception=None):
    """Announce the shutdown on stdout and stop the HTTP server.

    ``exception`` is accepted but not used. ``http_server`` is a module-level
    name that is not defined in this part of the file.
    """
    print('Stopping HTTP Service...')
    http_server.stop()
# Hostname of the machine this process runs on
def get_hostname():
    """Return the local host name as reported by ``socket.gethostname()``."""
    hostname = socket.gethostname()
    return hostname
# IP address the local hostname resolves to
def get_ip_address():
    """Resolve the local host name and return the resulting IPv4 address string."""
    local_name = socket.gethostname()
    address = socket.gethostbyname(local_name)
    return address
# Validate a URL with urllib.parse and reduce it to its scheme and host
def is_valid_url(url):
    """Return ``'<scheme>://<netloc>/'`` for a usable URL, otherwise ``False``.

    A URL is usable when it has both a scheme and a network location. Any
    path, query string or fragment is dropped from the returned value.
    Values that cannot be parsed at all also yield ``False``.
    """
    try:
        result = urlparse(url)
    except (AttributeError, TypeError, ValueError):
        # ValueError: malformed netloc (e.g. unbalanced IPv6 brackets);
        # AttributeError / TypeError: the argument is not text.
        return False
    if not (result.scheme and result.netloc):
        return False
    return '{uri.scheme}://{uri.netloc}/'.format(uri=result)
def requires_auth():
    """Return True when the session belongs to an admin user flagged as logged in.

    Returns False when the session holds no username, when that user no longer
    exists, or when the user's ``logged_in`` flag is not set.
    """
    username = session.get('username')
    if username is None:
        return False
    try:
        user = User.get(User.username == username)
    except User.DoesNotExist:
        # Stale session: the account was deleted or renamed after login.
        # Treat it as unauthenticated instead of failing the request.
        return False
    return user.logged_in is True
def admin_password_check():
    """Return True while the built-in 'admin' account still has the default password.

    Raises ``User.DoesNotExist`` when there is no user named 'admin'.
    """
    admin_user = User.get(User.username == 'admin')
    # Always return a real boolean (the negative case used to fall through and
    # return None implicitly).
    return admin_user.password == encrypt_password('admin')
# Start the scheduler loop in another thread.
# shutdown_scheduler is the event the loop checks on every pass; setting it
# makes scheduler() leave its loop.
shutdown_scheduler = threading.Event()
scheduler_thread = threading.Thread(target=scheduler, args=(shutdown_scheduler,))
scheduler_thread.start()
# Create the Flask application
app = Flask(__name__)
# A fresh random key on every start: session cookies signed by an earlier run
# no longer validate after a restart.
app.secret_key = os.urandom(24)
def format_time_ago(timestamp):
    """Describe how long ago a 'YYYY-MM-DD' date was, e.g. ``'3 days ago'``.

    Uses whole years (365 days) above 365 days, whole months (30 days) above
    30 days and days otherwise. Returns an empty string for today and for
    dates in the future.
    """
    parsed = datetime.datetime.strptime(timestamp, "%Y-%m-%d")
    elapsed_days = (datetime.datetime.now() - parsed).days
    # Largest unit first; a threshold of 0 means "count plain days".
    for unit, threshold in (("year", 365), ("month", 30), ("day", 0)):
        if elapsed_days > threshold:
            count = elapsed_days // threshold if threshold else elapsed_days
            suffix = 's' if count > 1 else ''
            return f"{count} {unit}{suffix} ago"
    return ""
# Human-readable time remaining until a date
def format_time_until(timestamp):
    """Describe how far in the future a 'YYYY-MM-DD' date is.

    Returns, for example, ``'in 1 year'``, ``'in 2 months and 3 days'``,
    ``'in 5 days'``, ``'in 3 hours and 10 minutes'`` or ``'in 12 minutes'``.
    Today's date (and anything less than about a minute away) gives
    ``'Today'``; a date before today gives ``'N day(s) ago'``.
    """
    def plural(count, unit):
        # "1 day", but "0 days" / "2 days".
        return f"{count} {unit}{'' if count == 1 else 's'}"

    target = datetime.datetime.strptime(timestamp, '%Y-%m-%d')
    now = datetime.datetime.now()
    diff = target - now

    # A negative timedelta is normalised to days < 0 with seconds in
    # [0, 86400), so the sign has to be checked before looking at .seconds;
    # otherwise past dates are reported as "in N hours".
    if diff.total_seconds() <= 0:
        days_past = (now.date() - target.date()).days
        if days_past > 0:
            return f"{plural(days_past, 'day')} ago"
        return "Today"

    days = diff.days
    if days > 365:
        return f"in {plural(days // 365, 'year')}"
    # months and days
    if days > 30:
        return f"in {plural(days // 30, 'month')} and {plural(days % 30, 'day')}"
    if days > 0:
        return f"in {plural(days, 'day')}"
    # less than a day left: hours and minutes
    if diff.seconds > 3600:
        hours = diff.seconds // 3600
        minutes = (diff.seconds % 3600) // 60
        return f"in {plural(hours, 'hour')} and {plural(minutes, 'minute')}"
    if diff.seconds > 60:
        return f"in {plural(diff.seconds // 60, 'minute')}"
    return "Today"
# Expose the date helpers to templates as the filters
# {{ value|time_since }} and {{ value|time_until }}.
# (The disabled per-request db.connect()/db.close() hooks that used to follow
# as a bare string literal were dead code and have been removed.)
app.jinja_env.filters['time_since'] = format_time_ago
app.jinja_env.filters['time_until'] = format_time_until
@app.route('/admin/password/reset', methods=['GET', 'POST'])
def admin_password_reset():
    """Show and process the form that sets the 'admin' account password.

    The form is reachable without a login only while the admin account still
    has its default password (first-run setup); afterwards the caller must be
    a logged-in administrator. On success the session is logged in as 'admin'
    and redirected to the admin dashboard.
    """
    try:
        first_run = admin_password_check()
    except User.DoesNotExist:
        first_run = False
    # Without this guard anybody could replace the admin password at any time.
    if not first_run and not requires_auth():
        return redirect(url_for('login'))

    message = None
    if request.method == 'POST':
        password = request.form.get('password')
        password_confirm = request.form.get('password_confirm')
        if not password:
            # A missing password used to surface as 'Username not found'.
            message = 'Password is required'
        elif password != password_confirm:
            message = 'Passwords do not match'
        else:
            try:
                user = User.get(User.username == 'admin')
            except User.DoesNotExist:
                message = 'Username not found'
            else:
                user.password = encrypt_password(password)
                user.logged_in = True
                user.save()
                session['username'] = 'admin'
                return redirect(url_for('admin'))
    context = {
        'message': message,
    }
    return render_template('admin_password_reset.html', context=context)
# Home page: an ILS user confirms a password reset
@app.route('/', methods=['GET', 'POST'])
def index():
    """Render the home page and record a password reset for a known ILS user.

    On POST with a known username the user's queued reminders are removed,
    the reset date is set to today, a PasswordResetLog row is written and,
    if the configured reset URL is valid, it is opened in a browser tab.
    Unknown usernames produce the 'Invalid username' error.
    """
    logger = logging.getLogger(__name__)
    error = None
    reset = False
    # password_reset_url / domain_name are the values read at start-up.
    reset_url = is_valid_url(password_reset_url)
    reset_url_error = reset_url is False
    if request.method == 'POST':
        username = request.form.get('username')
        # Check for the username in the DB
        try:
            user = IlsUser.filter(IlsUser.username == username).first()
        except Exception:
            logger.exception('ILS user lookup failed')
            user = None
        if user:
            # Best effort: drop this user's queued reminders; the scheduler
            # re-creates them from the new reset date.
            try:
                EmailReminderLog.delete().where(
                    EmailReminderLog.username == user.username).execute()
            except Exception:
                logger.exception('Could not remove reminders for %r', user.username)
            today = datetime.datetime.now().strftime("%Y-%m-%d")
            user.reset_datetime = today
            # The log model's column is ``date``; the former ``date_created``
            # keyword matched no field, so the date was never set explicitly.
            PasswordResetLog.create(username=user.username, date=today)
            user.save()
            # NOTE(review): webbrowser opens the tab on the machine running
            # this server, not in the visitor's browser - confirm the app is
            # only used locally on that machine.
            if reset_url is not False:
                webbrowser.open_new_tab(str(reset_url))
            # Tells the template to show the "reset done" content.
            reset = True
        else:
            error = 'Invalid username'
    context = {
        'domain': domain_name,
        'error': error,
        'reset': reset,
        'reset_url': reset_url,
        'reset_url_error': reset_url_error,
    }
    return render_template('index.html', context=context)
# Admin dashboard
@app.route('/admin/')
def admin():
    """Render the admin dashboard with user and pending-reminder counts.

    Redirects to the password reset form while the admin account still has
    the default password, and to the login page when not authenticated.
    A count that cannot be read is passed to the template as ``None``.
    """
    if admin_password_check():
        return redirect(url_for('admin_password_reset'))
    # Check to see if user is logged in
    if not requires_auth():
        return redirect(url_for('login'))

    logger = logging.getLogger(__name__)
    # The former "expiring users" and "recent reminders" queries were removed:
    # their results were never passed to the template, and the expiry filter
    # added a timedelta to a text column.

    # Count of all ILS users
    try:
        ils_users = IlsUser.select().count()
    except Exception:
        logger.exception('Could not count ILS users')
        ils_users = None
    # Count of reminders that are neither sent nor ignored
    try:
        total_email_reminders = EmailReminderLog.select().where(
            EmailReminderLog.sent == False,  # noqa: E712 - builds a SQL expression
            EmailReminderLog.ignore == False).count()  # noqa: E712
    except Exception:
        logger.exception('Could not count pending email reminders')
        total_email_reminders = None
    # Suspend setting
    suspend_setting_setting = Settings.get(Settings.name == 'Suspend Scheduler')
    suspend_setting = str(suspend_setting_setting.value).lower()
    context = {
        'ils_user_count': ils_users,
        'total_email_reminders': total_email_reminders,
        'suspend_setting': suspend_setting,
    }
    return render_template('admin.html', context=context)
@app.route('/admin/users/', methods=['GET', 'POST'])
def admin_users():
    """List the admin users and, on POST, create a new one.

    The POST form supplies ``username``, ``password`` and
    ``confirm_password``; the outcome is reported through ``message``.
    """
    # Check to see if user is logged in
    if not requires_auth():
        return redirect(url_for('login'))
    message = None
    if request.method == 'POST':
        username = request.form.get('username')
        password = request.form.get('password')
        confirm_password = request.form.get('confirm_password')
        if not username or not password:
            # A missing password used to crash inside encrypt_password().
            message = 'Username and password are required'
        elif User.select().where(User.username == username).exists():
            message = 'Username already exists'
        elif password != confirm_password:
            message = 'Passwords do not match'
        else:
            User.create(username=username, password=encrypt_password(password),
                        date_created=datetime.datetime.now(), logged_in=False)
            message = 'User created successfully'
            Log.create(username=session['username'],
                       action='Created admin user: %s' % username)
    # Get all admin users from the DB
    users = User.select().execute()
    context = {
        'users': users,
        'message': message,
    }
    return render_template('admin_users.html', context=context)
@app.route('/admin/users/edit/<int:id>', methods=['GET', 'POST'])
def admin_users_edit(id):
    """Show and process the edit form for the admin user with primary key ``id``.

    The username is always updated; the password only when a new one is
    entered and confirmed. Nothing is saved when validation fails. An unknown
    ``id`` redirects back to the user list.
    """
    # Check to see if user is logged in
    if not requires_auth():
        return redirect(url_for('login'))
    # Get the user from the DB
    try:
        user = User.get(User.id == id)
    except User.DoesNotExist:
        return redirect(url_for('admin_users'))
    message = None
    if request.method == 'POST':
        username = request.form.get('username')
        password = request.form.get('password')
        confirm_password = request.form.get('confirm_password')
        if not username:
            message = 'Username is required'
        elif user.username == 'admin' and username != 'admin':
            # admin_password_check() looks the account up by this exact name.
            message = 'The admin account cannot be renamed'
        elif User.select().where(User.username == username, User.id != user.id).exists():
            message = 'Username already exists'
        elif password and password != confirm_password:
            # Previously this message was overwritten and the user saved anyway.
            message = 'Passwords do not match'
        else:
            previous_username = user.username
            user.username = username
            # An empty password field means "keep the current password"; the
            # old ``is not None or != ''`` test was always true.
            if password:
                user.password = encrypt_password(password)
            user.save()
            message = 'User updated successfully'
            Log.create(username=session['username'],
                       action='Updated admin user: %s' % username)
            # Keep the session valid when administrators rename themselves.
            if session.get('username') == previous_username:
                session['username'] = username
    context = {
        'user': user,
        'message': message,
    }
    return render_template('admin_user_edit.html', context=context)
@app.route('/admin/users/delete/<int:id>', methods=['GET', 'POST'])
def admin_users_delete(id):
    """Delete the admin user with primary key ``id`` and return to the user list.

    The built-in 'admin' account is never deleted. Deleting one's own account
    also ends the current session.
    """
    # Check to see if user is logged in
    if not requires_auth():
        return redirect(url_for('login'))
    # Get the user from the DB
    try:
        user = User.get(User.id == id)
    except User.DoesNotExist:
        return redirect(url_for('admin_users'))
    if user.username != 'admin':
        # Capture the acting user first: the session entry may be removed
        # below, and reading it afterwards raised KeyError.
        actor = session['username']
        username = user.username
        user.delete_instance()
        Log.create(username=actor, action='Removed admin user: %s' % username)
        # Unset session variable if the user deleted their own account
        if username == actor:
            session.pop('username', None)
    return redirect(url_for('admin_users'))
@app.route('/admin/users/ils', methods=['GET', 'POST'])
def admin_ils_users():
    """List the ILS users and, on POST, create a new one.

    The POST form supplies ``username``, ``email`` and an optional ``date``
    (last password reset, 'YYYY-MM-DD'); a blank date means today.
    """
    # Check to see if user is logged in
    if not requires_auth():
        return redirect(url_for('login'))
    message = None
    if request.method == 'POST':
        username = request.form.get('username')
        email = request.form.get('email', '')
        # A blank date means "reset today".
        date = request.form.get('date') or datetime.datetime.now().strftime('%Y-%m-%d')
        if not username:
            message = 'Username is required'
        else:
            try:
                # The scheduler parses this value with the same format; reject
                # anything it could not read later.
                datetime.datetime.strptime(date, '%Y-%m-%d')
            except ValueError:
                message = 'Date must be in YYYY-MM-DD format'
            else:
                if IlsUser.select().where(IlsUser.username == username).exists():
                    message = 'Username already exists'
                else:
                    IlsUser.create(username=username, email=email, reset_datetime=date)
                    message = 'ILS User: %s created successfully' % username
                    Log.create(username=session['username'],
                               action='Created ILS User: %s' % username)
    # Get all ILS users from the DB
    users = IlsUser.select().execute()
    # Suspend setting
    suspend_setting_setting = Settings.get(Settings.name == 'Suspend Scheduler')
    suspend_setting = str(suspend_setting_setting.value).lower()
    context = {
        'users': users,
        'message': message,
        'suspend_setting': suspend_setting,
    }
    return render_template('admin_ils_users.html', context=context)
@app.route('/admin/users/ils/delete/<int:id>', methods=['GET', 'POST'])
def admin_ils_users_delete(id):
    """Delete one ILS user together with its queued reminders, then show the list.

    Unauthenticated callers are sent to the login page. Failures while
    removing the reminders are ignored; the user row is deleted regardless.
    """
    if requires_auth():
        target = IlsUser.get(IlsUser.id == id)
        removed_name = target.username
        # Queued reminders for this user go first (best effort).
        try:
            pending = EmailReminderLog.select().where(
                EmailReminderLog.username == target.username).execute()
            for entry in pending:
                entry.delete_instance()
        except Exception:
            pass
        target.delete_instance()
        audit_entry = Log.create(username=session['username'],
                                 action='Removed ILS user: %s' % removed_name, )
        audit_entry.save()
        return redirect(url_for('admin_ils_users'))
    return redirect(url_for('login'))
@app.route('/admin/users/ils/delete/all', methods=['GET', 'POST'])
def admin_ils_users_delete_all():
    """Delete every ILS user and return to the ILS user list.

    Queued reminders are not removed here; the scheduler drops reminders of
    users that no longer exist on a pass when it is not suspended.
    """
    if not requires_auth():
        return redirect(url_for('login'))
    try:
        # One DELETE instead of loading and deleting each row.
        removed = IlsUser.delete().execute()
    except Exception:
        # Typically a locked database; log the real cause instead of printing
        # a fixed guess.
        logging.getLogger(__name__).exception(
            'Could not remove ILS users (database may be locked)')
    else:
        # Audit entry, as written by the other user-management routes.
        Log.create(username=session['username'],
                   action='Removed all ILS users (%s)' % removed)
    finally:
        # Kept from the original implementation.
        # NOTE(review): presumably releases the connection so the scheduler
        # thread is not blocked - confirm it is still needed.
        db.close()
    return redirect(url_for('admin_ils_users'))
@app.route('/admin/users/ils/edit/<int:id>', methods=['GET', 'POST'])
def admin_ils_users_edit(id):
    """Show and process the edit form for the ILS user with primary key ``id``.

    On a valid POST the user's queued reminders are removed (the scheduler
    re-creates them from the new data) and the username, email and reset date
    are saved. A blank date means today. An unknown ``id`` redirects back to
    the ILS user list.
    """
    # Check to see if user is logged in
    if not requires_auth():
        return redirect(url_for('login'))
    # Get the user from the DB
    try:
        user = IlsUser.get(IlsUser.id == id)
    except IlsUser.DoesNotExist:
        return redirect(url_for('admin_ils_users'))
    message = None
    if request.method == 'POST':
        username = request.form.get('username')
        email = request.form.get('email', '')
        date = request.form.get('date') or datetime.datetime.now().strftime('%Y-%m-%d')
        if not username:
            message = 'Username is required'
        elif IlsUser.select().where(IlsUser.username == username,
                                    IlsUser.id != user.id).exists():
            message = 'Username already exists'
        else:
            try:
                # The scheduler parses this value with the same format.
                datetime.datetime.strptime(date, '%Y-%m-%d')
            except ValueError:
                message = 'Date must be in YYYY-MM-DD format'
            else:
                # Remove all scheduled email reminders (best effort). They are
                # rescheduled automatically with the new information.
                try:
                    EmailReminderLog.delete().where(
                        EmailReminderLog.username == user.username).execute()
                except Exception:
                    logging.getLogger(__name__).exception(
                        'Could not remove reminders for %r', user.username)
                user.username = username
                user.email = email
                user.reset_datetime = date
                user.save()
                message = 'User updated successfully'
                Log.create(username=session['username'],
                           action='Updated ILS user: %s' % username)
    # Suspend setting
    suspend_setting_setting = Settings.get(Settings.name == 'Suspend Scheduler')
    suspend_setting = str(suspend_setting_setting.value).lower()
    context = {
        'user': user,
        'message': message,
        'suspend_setting': suspend_setting,
    }
    return render_template('admin_ils_user_edit.html', context=context)
# Generate a CSV export of the ILS users and send it as a download
@app.route('/admin/users/ils/csv/download', methods=['GET', 'POST'])
def admin_ils_users_csv_download():
    """Write all ILS users to ``ils-users_<YYYY-MM-DD>.csv`` and send the file.

    Columns: ILS username, email, last password reset date. The download is
    recorded in the audit log.
    """
    # Check to see if user is logged in
    if not requires_auth():
        return redirect(url_for('login'))
    # Build the name once so the file written and the file sent always match.
    filename = 'ils-users_%s.csv' % datetime.datetime.now().strftime('%Y-%m-%d')
    # send_file() resolves relative names against the application root, not
    # the working directory, so write the file there using an absolute path.
    csv_path = os.path.join(app.root_path, filename)
    # newline='' keeps the csv module from emitting blank lines between rows.
    with open(csv_path, 'w', newline='') as f:
        writer = csv.writer(f)
        writer.writerow(['ILS Username', 'Email', 'Last Password Reset (YYYY-MM-DD)'])
        for user in IlsUser.select():
            try:
                date = datetime.datetime.strptime(
                    user.reset_datetime, '%Y-%m-%d').strftime('%Y-%m-%d')
            except (TypeError, ValueError):
                # Export the stored value as-is rather than failing the download.
                date = user.reset_datetime
            writer.writerow([user.username, user.email, date])
    Log.create(username=session['username'], action='Downloaded ILS user CSV file.')
    # return the CSV file to the user
    return send_file(csv_path, as_attachment=True)
def _parse_reset_date(raw):
    """Normalise a CSV date cell to 'YYYY-MM-DD'.

    Accepts 'MM/DD/YYYY' and 'YYYY-MM-DD'. A blank cell yields today's date;
    text in any other format yields ``None``.
    """
    raw = (raw or '').strip()
    if not raw:
        return datetime.datetime.now().strftime('%Y-%m-%d')
    for date_format in ('%m/%d/%Y', '%Y-%m-%d'):
        try:
            return datetime.datetime.strptime(raw, date_format).strftime('%Y-%m-%d')
        except ValueError:
            continue
    return None


@app.route('/admin/users/ils/csv/import', methods=['GET', 'POST'])
def admin_ils_users_csv_import():
    """Import ILS users from an uploaded CSV file (form field ``csv``).

    Expected columns: username, email, last reset date. Existing users are
    updated, unknown users are created. The header row, rows with a blank
    username, rows with fewer than two columns and rows whose date cannot be
    parsed are skipped. Redirects to the ILS user list after an import.
    """
    import io  # only needed here; kept local so the file's import block is unchanged

    # Check to see if user is logged in
    if not requires_auth():
        return redirect(url_for('login'))
    logger = logging.getLogger(__name__)
    message = None
    if request.method == 'POST':
        csv_file = request.files.get('csv')
        if csv_file is not None and csv_file.filename != '':
            # Parse the upload in memory. The client-supplied filename is no
            # longer used to build a path on disk, which closed a path
            # traversal hole and removes the need for an 'uploads' directory.
            payload = csv_file.read()
            try:
                text = payload.decode('utf-8-sig')
            except UnicodeDecodeError:
                text = payload.decode('latin-1')
            for row in csv.reader(io.StringIO(text, newline='')):
                if len(row) < 2:
                    continue
                username = row[0]
                email = row[1]
                # ignore the header row and blank rows
                if username == 'ILS Username' or username == '':
                    continue
                raw_date = row[2] if len(row) > 2 else ''
                reset_date = _parse_reset_date(raw_date)
                if reset_date is None:
                    # An unparseable date would later stop the scheduler pass.
                    logger.warning('Skipping CSV row for %r: unrecognised date %r',
                                   username, raw_date)
                    continue
                # Update the entry if the user already exists, else create it.
                user = IlsUser.filter(IlsUser.username == username).first()
                if user:
                    user.email = email
                    user.reset_datetime = reset_date
                    user.save()
                else:
                    # The old code assigned to ``user`` (None) in this branch
                    # and left ``reset_datetime`` unbound for blank dates.
                    IlsUser.create(username=username, email=email,
                                   reset_datetime=reset_date)
            return redirect(url_for('admin_ils_users'))
        message = 'No CSV file selected'
    context = {
        'message': message,
    }
    return render_template('csv.html', context=context)
@app.route('/admin/settings', methods=['GET', 'POST'])
def settings():
    """List the application settings and, on POST, update one of them.

    The POST form supplies the setting's ``id`` and its new ``value``.
    'Debug Mode' is stored as a boolean, the two ports and 'Password Reset
    Interval' must be whole numbers, every other setting is stored as text.
    Each change is written to the audit log; the SMTP password itself is
    never logged.
    """
    # Check to see if user is logged in
    if not requires_auth():
        return redirect(url_for('login'))
    message = None
    # Process form submission
    if request.method == 'POST':
        setting_id = request.form.get('id')
        # A missing field is treated as an empty value instead of crashing in
        # .lower() / .isdigit() below.
        value = request.form.get('value', '')
        # Check if the setting exists
        try:
            setting = Settings.get(Settings.id == setting_id)
        except (Settings.DoesNotExist, ValueError):
            # ValueError: the submitted id is not an integer.
            setting = None
        if setting:
            numeric_settings = ('HTTP Port', 'SMTP Port', 'Password Reset Interval')
            if setting.name in numeric_settings and not value.isdigit():
                # Reject instead of silently keeping the old value while
                # reporting success.
                message = '%s must be a whole number' % setting.name
            else:
                # Scrub the value based on the setting name
                if setting.name == 'Debug Mode':
                    value = value.lower() == 'true'
                elif setting.name == 'Password Reset Interval':
                    value = int(value)
                else:
                    value = str(value)
                # Update the setting
                old_value = setting.value
                setting.value = value
                setting.save()
                if setting.name == 'SMTP Password':
                    Log.create(username=session['username'],
                               action='SMTP Password was changed.')
                else:
                    Log.create(username=session['username'],
                               action='Changed %s setting from "%s" to "%s"' % (setting.name,
                                                                                old_value,
                                                                                setting.value))
                message = '%s updated successfully' % setting.name
    # Get settings from DB
    all_settings = Settings.select().execute()
    # Suspend setting
    suspend_setting_setting = Settings.get(Settings.name == 'Suspend Scheduler')
    suspend_setting = str(suspend_setting_setting.value).lower()
    context = {
        'settings': all_settings,
        'message': message,
        'suspend_setting': suspend_setting,
    }
    return render_template('settings.html', context=context)
@app.route('/admin/schedule', methods=['GET', 'POST'])
def schedule():
    """List the reminder schedules and create a new one on POST.

    POST reads the form field ``interval``. It must parse as an integer; the
    parsed value is stored in canonical form so that variants such as " 7" or
    "+7" are recognised as duplicates of "7". On a validation error or a
    duplicate nothing is created and a message is shown instead.

    Returns:
        A redirect to the ``login`` view when ``requires_auth()`` is falsy,
        otherwise the rendered ``schedule.html`` page. The context always
        carries ``schedules``, ``message`` and ``suspend_setting``, including
        on the validation-error path.
    """
    # Check to see if user is logged in
    if not requires_auth():
        return redirect(url_for('login'))

    message = None

    # add schedule
    if request.method == 'POST':
        raw_interval = request.form.get('interval')
        try:
            # TypeError covers a missing field (None), ValueError bad text.
            interval = str(int(raw_interval))
        except (TypeError, ValueError):
            message = 'Error creating schedule. Value submitted must be a whole number.'
        else:
            # Check if the schedule already exists
            try:
                existing = Schedule.get(Schedule.interval == interval)
            except Exception:
                existing = None

            if existing:
                message = 'Schedule already exists'
            else:
                # Create the schedule
                Schedule.create(interval=interval).save()
                Log.create(username=session['username'],
                           action='Created schedule for %s day interval.' % interval).save()
                message = 'Schedule: %s created successfully' % interval

    # Query after any insert so a newly created schedule is listed.
    schedules = Schedule.select().order_by(Schedule.interval.cast("INTEGER")).execute()

    # Suspend setting, passed to the template as a lower-case string
    suspend_setting_setting = Settings.get(Settings.name == 'Suspend Scheduler')
    suspend_setting = str(suspend_setting_setting.value).lower()

    context = {
        'schedules': schedules,
        'message': message,
        'suspend_setting': suspend_setting,
    }
    return render_template('schedule.html', context=context)
@app.route('/admin/schedule/emails')
def scheduled_emails():
    """Render the list of upcoming reminder emails that are not ignored.

    Selects ``EmailReminderLog`` rows dated after the current local time whose
    ``ignore`` flag is false, ordered by date descending.

    Returns:
        A redirect to the ``login`` view when ``requires_auth()`` is falsy,
        otherwise the rendered ``scheduled_emails.html`` page.
    """
    # Only authenticated users may view the queue
    if not requires_auth():
        return redirect(url_for('login'))

    now = datetime.datetime.now()
    # "== False" is intentional: it builds a SQL comparison, not a Python test.
    upcoming = EmailReminderLog.select().where(
        EmailReminderLog.date > now,
        EmailReminderLog.ignore == False,
    ).order_by(-EmailReminderLog.date)

    return render_template('scheduled_emails.html', context={'reminders': upcoming.execute()})
@app.route('/admin/schedule/emails/clear')
def reset_scheduled_emails():
    """Delete every ``EmailReminderLog`` row, then return to the email list.

    All rows are removed, not only future ones. After deleting, the view
    pauses for three seconds before redirecting.
    NOTE(review): the pause presumably gives a background scheduler time to
    rebuild the reminders - confirm.

    Returns:
        A redirect to the ``login`` view when ``requires_auth()`` is falsy,
        otherwise a redirect to ``scheduled_emails``.
    """
    # Only authenticated users may clear the queue
    if not requires_auth():
        return redirect(url_for('login'))

    # Delete row by row through the model API
    for entry in EmailReminderLog.select().execute():
        entry.delete_instance()

    time.sleep(3)
    return redirect(url_for('scheduled_emails'))
# suppress a single scheduled reminder
@app.route('/admin/schedule/remove/reminder/<int:id>')
def reminder_remove(id):
    """Mark one scheduled reminder as ignored so it is no longer listed.

    The row is kept; only its ``ignore`` flag is set, and the action is
    recorded in ``Log``.

    Args:
        id: Primary key of the ``EmailReminderLog`` row, taken from the URL.

    Returns:
        A redirect to the ``login`` view when ``requires_auth()`` is falsy,
        otherwise a redirect to ``scheduled_emails``. An id that matches no
        row also redirects there without changing anything.
    """
    # Check to see if user is logged in
    if not requires_auth():
        return redirect(url_for('login'))

    # Look up the reminder; a stale or hand-typed id must not produce a 500.
    try:
        email_reminder_log = EmailReminderLog.get(EmailReminderLog.id == id)
    except EmailReminderLog.DoesNotExist:
        return redirect(url_for('scheduled_emails'))

    Log.create(
        username=session['username'],
        action='Removed %s day reminder for %s (%s)' % (
            email_reminder_log.interval, email_reminder_log.username, email_reminder_log.email),
    ).save()

    email_reminder_log.ignore = True
    email_reminder_log.save()
    return redirect(url_for('scheduled_emails'))
# remove a schedule together with its reminders
@app.route('/admin/schedule/remove/<int:id>', methods=['GET', 'POST'])
def schedule_remove(id):
    """Delete a schedule and every reminder that shares its interval.

    Args:
        id: Primary key of the ``Schedule`` row, taken from the URL.

    Returns:
        A redirect to the ``login`` view when ``requires_auth()`` is falsy,
        otherwise a redirect to the ``schedule`` view. When no schedule
        matches ``id`` nothing is deleted.
    """
    # Only authenticated users may remove schedules
    if not requires_auth():
        return redirect(url_for('login'))

    # Any lookup failure is treated as "no such schedule"
    try:
        target = Schedule.get(Schedule.id == id)
    except Exception:
        target = None

    if not target:
        return redirect(url_for('schedule'))

    # Remove the dependent reminders first, then the schedule itself
    dependents = EmailReminderLog.select().where(EmailReminderLog.interval == target.interval)
    for entry in dependents.execute():
        entry.delete_instance()
    target.delete_instance()

    Log.create(username=session['username'],
               action='Removed schedule for a %s day reminder.' % target.interval).save()
    return redirect(url_for('schedule'))
@app.route('/admin/system/log')
def system_log():
    """Render the system log, ordered by ``Log.id`` descending.

    Returns:
        A redirect to the ``login`` view when ``requires_auth()`` is falsy,
        otherwise the rendered ``system_log.html`` page.
    """
    # Only authenticated users may read the log
    if not requires_auth():
        return redirect(url_for('login'))

    entries = Log.select().order_by(Log.id.desc())
    return render_template('system_log.html', context={'logs': entries.execute()})
@app.route('/admin/settings/test/email', methods=['POST'])
def test_email():
    """Send a test message using the SMTP settings stored in the database.

    Reads the recipient from the form field ``email`` and the four SMTP
    settings (host, port, username, password) from ``Settings``, then sends a
    fixed test message through ``Emailer``.

    Returns:
        A redirect to the ``login`` view when ``requires_auth()`` is falsy,
        otherwise a redirect to the referring page, or to the ``settings``
        view when the request carries no Referer header.
    """
    if not requires_auth():
        return redirect(url_for('login'))

    # Get form data
    email = request.form.get('email')

    # Get email settings from DB
    smtp_host = Settings.get(Settings.name == 'SMTP Host').value
    smtp_port = Settings.get(Settings.name == 'SMTP Port').value
    smtp_username = Settings.get(Settings.name == 'SMTP Username').value
    smtp_password = Settings.get(Settings.name == 'SMTP Password').value

    emailer = Emailer(smtp_host, smtp_port, smtp_username, smtp_password)
    emailer.send_email(email, 'ILS Password Reset Test',
                       'This is a test email to verify that the email settings are correct.')

    # request.referrer is None when the browser omits the Referer header.
    return redirect(request.referrer or url_for('settings'))
@app.route('/admin/system/scheduler/enable')
def enable_scheduler():
    """Turn the scheduler back on by clearing the 'Suspend Scheduler' setting.

    Returns:
        A redirect to the ``login`` view when ``requires_auth()`` is falsy,
        otherwise a redirect to the referring page, or to the ``settings``
        view when the request carries no Referer header.
    """
    if not requires_auth():
        return redirect(url_for('login'))

    # Get settings from DB
    suspend_setting_setting = Settings.get(Settings.name == 'Suspend Scheduler')
    suspend_setting_setting.value = False
    suspend_setting_setting.save()

    # request.referrer is None when the browser omits the Referer header.
    return redirect(request.referrer or url_for('settings'))
@app.route('/admin/system/scheduler/disable')
def disable_scheduler():
    """Suspend the scheduler by setting the 'Suspend Scheduler' setting.

    Returns:
        A redirect to the ``login`` view when ``requires_auth()`` is falsy,
        otherwise a redirect to the referring page, or to the ``settings``
        view when the request carries no Referer header.
    """
    if not requires_auth():
        return redirect(url_for('login'))

    # Get settings from DB
    suspend_setting_setting = Settings.get(Settings.name == 'Suspend Scheduler')
    suspend_setting_setting.value = True
    suspend_setting_setting.save()

    # request.referrer is None when the browser omits the Referer header.
    return redirect(request.referrer or url_for('settings'))
@app.route('/admin/system/log/password/resets')
def password_reset_log():
    """Render the password-reset log, ordered by id descending.

    Returns:
        A redirect to the ``login`` view when ``requires_auth()`` is falsy,
        otherwise the rendered ``password_reset_log.html`` page.
    """
    # Only authenticated users may read the log
    if not requires_auth():
        return redirect(url_for('login'))

    entries = PasswordResetLog.select().order_by(PasswordResetLog.id.desc())
    return render_template('password_reset_log.html', context={'logs': entries.execute()})
@app.route('/logout')
def logout():
    """End the current session and return to the login page.

    When a user is logged in, their ``logged_in`` flag is cleared and the
    logout is recorded in ``Log``. That bookkeeping is best-effort: a failure
    there never prevents the session from being cleared.

    Returns:
        A redirect to the ``login`` view.
    """
    try:
        if 'username' in session:
            username = session['username']
            user = User.get(User.username == username)
            user.logged_in = False
            Log.create(username=session['username'], action='Logged out').save()
            user.save()
    except Exception:
        # Deliberately ignored so logout always succeeds. Catching Exception
        # rather than everything lets SystemExit / KeyboardInterrupt propagate.
        pass

    session.pop('username', None)
    return redirect(url_for('login'))
@app.route('/login', methods=['GET', 'POST'])
def login():
    """Show the login form and authenticate a submitted username/password.

    POST reads the form fields ``username`` and ``password``. The password is
    passed through ``encrypt_password`` and a ``User`` row must match BOTH the
    username and the resulting value. On success the username is stored in
    the session, the user is flagged as logged in and the login is recorded
    in ``Log``.

    Returns:
        On GET, the rendered ``login.html`` page. On POST, a redirect to the
        ``admin`` view when the credentials match, otherwise ``login.html``
        with an ``error`` message in its context.
    """
    if request.method == 'POST':
        username = request.form.get('username')
        password = encrypt_password(request.form.get('password'))

        try:
            # The two conditions must be combined with "&". Python's "and"
            # evaluates to its right-hand operand here, which silently drops
            # the username check and matches on the password alone.
            user = User.filter((User.username == username) & (User.password == password)).first()
        except Exception:
            user = None

        # Drop any previous session before deciding the outcome.
        session.pop('username', None)

        if user:
            # Login user
            session['username'] = username
            user.logged_in = True
            Log.create(username=session['username'], action='Logged in').save()
            user.save()
            return redirect(url_for('admin'))

        error = 'Invalid Credentials. Please try again.'
        context = {
            'error': error
        }
        return render_template('login.html', context=context)

    context = {
    }
    return render_template('login.html', context=context)
def clean_up():
    """Shut the application down: signal the scheduler, then stop the server.

    Sets the module-level ``shutdown_scheduler`` event and calls ``stop()`` on
    the module-level ``http_server``.
    """
    # Signal the scheduler first, then stop the HTTP server
    shutdown_scheduler.set()
    http_server.stop()
if __name__ == "__main__":
    # Script entry point: build the WSGI server, print connection details and
    # serve until the process is stopped.
    print("------------------------- Start up -----------------------------")
    print("Starting HTTP Service on port %s..." % http_port)
    if debug is True:
        # Debug mode keeps the server's default request logging.
        print("Debug mode is enabled.")
        http_server = WSGIServer(('0.0.0.0', int(http_port)), app)
    else:
        http_server = WSGIServer(('0.0.0.0', int(http_port)), app, log=log, error_log=log)
    print("HTTP Service Started.")
    print("--------------------- Application Details ---------------------")
    print("Application started at %s" % datetime.datetime.now())
    print("System IP Address: %s" % get_ip_address())
    print("System Hostname: %s" % get_hostname())
    print("Access the Dashboard using a web browser using any of the following:")
    print("http://%s:%s or http://%s:%s" % (get_hostname(), http_port, get_ip_address(), http_port))
    print("---------------------------------------------------------------")
    print("To stop the application close this window.")
    print("---------------------------------------------------------------")
    # Register the shutdown hook BEFORE blocking. serve_forever() only returns
    # when the server stops, so a hook registered after it would never be in
    # place when the process is interrupted.
    atexit.register(clean_up)
    http_server.serve_forever()
|