123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257 |
- import datetime as datetime
- import configparser as configparser
- import os
- from urllib.parse import urlparse
- import webbrowser
- from emailtool.emailer import Emailer
- from flask import Flask, render_template, request, redirect, url_for, jsonify, session
- from flask_httpauth import HTTPBasicAuth
- import configparser
- from gevent.pywsgi import WSGIServer
- import socket
- import logging
- from peewee import Model, CharField, TextField, DateTimeField, SqliteDatabase, BooleanField
- import hashlib
# SQLite file that backs both models below; the relative path is resolved
# against the process's working directory.
db_name = 'users.db'
# Shared peewee database handle; request hooks open and close it per request.
db = SqliteDatabase(db_name)
class User(Model):
    """Account that is allowed to sign in to the admin page."""

    # Login name; the login, logout and auth-check code look users up by it.
    username = CharField()
    # Hex SHA-256 digest of the password, as produced by encrypt_password().
    password = CharField()
    # Set to True on a successful login and back to False on logout.
    logged_in = BooleanField(default=False)

    class Meta:
        # Bind the model to the module-level SQLite handle.
        database = db
class IlsUser(Model):
    """User record looked up by username on the home page reset form.

    NOTE(review): presumably mirrors an account in an external ILS
    (library system) -- confirm against whatever populates this table.
    """

    # Name submitted through the home page form.
    username = CharField()
    email = CharField()
    # Defaults to the row's creation time (the callable is evaluated per row)
    # and is overwritten with the current time on each reset request.
    reset_datetime = DateTimeField(default=datetime.datetime.now)

    class Meta:
        # Bind the model to the module-level SQLite handle.
        database = db
# Load runtime settings from settings.ini in the working directory.
# ConfigParser.read() silently skips a missing file, so an absent or
# incomplete file surfaces as a KeyError on the section lookups below.
config = configparser.ConfigParser()
config.read('settings.ini')

# One section proxy per concern, so later code can index them directly.
application_settings = config['application']
smtp_settings = config['smtp']
http_settings = config['http']
domain_settings = config['domain']

# Logger handed to the WSGI server when debug mode is off.
log = logging.getLogger('werkzeug')
log.setLevel(logging.INFO)

# Debug mode is on only when the setting is the word "true" (any letter case).
debug = application_settings['debug'].lower() == 'true'
def send_email(to, subject, body):
    """Send one email through the SMTP server described in settings.ini.

    Args:
        to: Recipient address, passed straight through to the Emailer.
        subject: Subject line of the message.
        body: Body of the message.
    """
    # Read the connection details in the positional order Emailer expects:
    # host, port, username, password.
    connection_args = [
        smtp_settings[option]
        for option in ('host', 'port', 'username', 'password')
    ]
    mailer = Emailer(*connection_args)
    mailer.send_email(to, subject, body)
# Hash the password with SHA256
def encrypt_password(password):
    """Return the hex SHA-256 digest of *password* (encoded as UTF-8).

    NOTE(review): this is an unsalted, fast hash. Moving to a dedicated
    password hash would require migrating the digests already stored.
    """
    encoded = password.encode('utf-8')
    return hashlib.sha256(encoded).hexdigest()
def shutdown_session(exception=None):
    """Announce the shutdown on stdout and stop the HTTP server.

    Relies on the module-level ``http_server``, which only exists once the
    script entry point has created it.

    Args:
        exception: Accepted but not used.
    """
    print('Stopping HTTP Service...')
    http_server.stop()
# Look up the name of the machine this process runs on
def get_hostname():
    """Return the local hostname as reported by ``socket.gethostname()``."""
    hostname = socket.gethostname()
    return hostname
# Get systems IP address
def get_ip_address():
    """Return the IPv4 address that the local hostname resolves to.

    Returns:
        A dotted-quad string. If the hostname cannot be resolved (for
        example, no matching hosts/DNS entry), the loopback address
        ``'127.0.0.1'`` is returned instead of raising, so a purely
        informational lookup cannot abort start-up.
    """
    try:
        return socket.gethostbyname(socket.gethostname())
    except OSError:
        # socket.gaierror / socket.herror are both OSError subclasses.
        return '127.0.0.1'
# Method to check if a URL is valid
def is_valid_url(url):
    """Return True when *url* parses with both a scheme and a network location.

    Args:
        url: The candidate URL, normally a string.

    Returns:
        ``True`` if ``urlparse`` yields a non-empty scheme and netloc,
        otherwise ``False``. Input that ``urlparse`` rejects (malformed
        URLs, or values that are not str/bytes) is reported as ``False``
        rather than raising.
    """
    try:
        parsed = urlparse(url)
    except (ValueError, TypeError, AttributeError):
        # ValueError: malformed URL (e.g. unbalanced IPv6 brackets).
        # TypeError / AttributeError: the argument is not str or bytes.
        return False
    return bool(parsed.scheme and parsed.netloc)
def requires_auth():
    """Report whether the current session belongs to a logged-in user.

    Returns:
        ``True`` only when the session carries a username, a ``User`` row
        with that username exists, and the row is flagged as logged in.
        ``False`` in every other case, including a session that names a
        user that is no longer in the database.
    """
    username = session.get('username')
    if username is None:
        return False

    # get_or_none avoids a DoesNotExist error (and therefore a 500 response)
    # when the session outlives the user row it refers to.
    user = User.get_or_none(User.username == username)
    if user is None:
        return False
    return bool(user.logged_in)
# First-run bootstrap: create any missing tables before serving requests.
if not db.table_exists('user'):
    # Fresh database: create both tables and seed the initial admin account.
    # NOTE(review): the seeded credentials are admin/admin -- they should be
    # changed right after the first login.
    db.create_tables([User, IlsUser])
    User.create(username='admin', password=encrypt_password('admin'))

if not db.table_exists('ilsuser'):
    # Covers a database file that has the user table but no ilsuser table.
    db.create_tables([IlsUser])

# The checks above opened a connection; release it so the per-request
# hooks start from a closed connection.
db.close()
app = Flask(__name__)
# Flask signs session cookies with this key, so it must not be a constant
# that ships with the source. Resolution order:
#   1. the SECRET_KEY environment variable,
#   2. an optional `secret_key` entry in the [application] settings section,
#   3. a random per-process key (sessions then do not survive a restart).
app.secret_key = (
    os.environ.get('SECRET_KEY')
    or application_settings.get('secret_key')
    or os.urandom(32)
)
@app.before_request
def before_request():
    """Open the database connection for the request that is about to run."""
    # reuse_if_open keeps this from raising when an earlier request failed
    # before its connection was closed and the connection is still open.
    db.connect(reuse_if_open=True)
@app.after_request
def after_request(response):
    """Close the request's database connection and pass the response on.

    Args:
        response: The response produced for the current request.

    Returns:
        The same response object, unmodified.
    """
    db.close()
    return response
# Create a route for the home page
@app.route('/', methods=['GET', 'POST'])
def index():
    """Render the home page and, on POST, process a reset request.

    On POST the submitted ``username`` is looked up in ``IlsUser``. When a
    matching row exists its ``reset_datetime`` is set to the current time,
    the configured reset URL is opened (if it is valid) and the page is
    rendered with ``reset`` set to True.

    Returns:
        The rendered ``index.html`` template. Its ``context`` holds
        ``domain``, ``error``, ``reset`` and ``reset_url`` (the configured
        URL, or None when it is not a valid URL).
    """
    error = None
    reset = False

    # Keep the URL itself and its validity in separate variables; the URL is
    # needed later to open the page and to hand to the template.
    reset_url = domain_settings.get('reset_url', fallback='')
    reset_url_valid = is_valid_url(reset_url)
    if not reset_url_valid:
        error = 'Invalid Reset URL. Please contact your system administrator.'

    if request.method == 'POST':
        username = request.form.get('username')
        # Check for the username in the DB
        try:
            user = IlsUser.filter(IlsUser.username == username).first()
        except Exception as e:
            # Treat a failed lookup like an unknown user instead of a 500.
            print(e)
            user = None

        if user:
            # Reset login datetime
            user.reset_datetime = datetime.datetime.now()
            user.save()
            if reset_url_valid:
                # NOTE(review): webbrowser opens the tab on the machine that
                # runs this server process, not in the remote visitor's
                # browser -- confirm that is the intended behavior.
                webbrowser.open_new_tab(reset_url)
            # Tell the view to show the "reset done" content.
            reset = True

    context = {
        'domain': domain_settings['name'],
        'error': error,
        'reset': reset,
        'reset_url': reset_url if reset_url_valid else None,
        # Legacy key (note the trailing colon) with its historical boolean
        # value, kept in case a template still reads it.
        'reset_url:': reset_url_valid,
    }
    return render_template('index.html', context=context)
# Admin page route; only reachable with an authenticated session
@app.route('/admin')
def admin():
    """Show the admin page, or redirect to the login page when not signed in."""
    if requires_auth():
        return render_template('admin.html')
    return redirect(url_for('login'))
@app.route('/logout')
def logout():
    """Sign the current user out and redirect to the login page.

    Removes ``username`` from the session and clears the ``logged_in``
    flag on the matching ``User`` row, if one exists.
    """
    username = session.pop('username', None)
    if username is not None:
        # get_or_none: a session that names a deleted user must still be
        # able to log out instead of failing with DoesNotExist.
        user = User.get_or_none(User.username == username)
        if user is not None:
            user.logged_in = False
            user.save()
    return redirect(url_for('login'))
@app.route('/login', methods=['GET', 'POST'])
def login():
    """Show the login form and, on POST, authenticate the submitted user.

    On a successful POST the username is stored in the session, the user
    row is flagged as logged in and the client is redirected to the admin
    page. On a failed POST the form is rendered again with an error.

    Returns:
        A redirect to ``admin`` on success, otherwise the rendered
        ``login.html`` template.
    """
    if request.method != 'POST':
        return render_template('login.html', context={})

    username = request.form.get('username')
    raw_password = request.form.get('password')

    user = None
    # A request without both fields cannot match any account; skipping the
    # lookup also avoids hashing None.
    if username is not None and raw_password is not None:
        password = encrypt_password(raw_password)
        try:
            # The two conditions must be combined with `&`. Python's `and`
            # does not build a SQL AND; it evaluates to the second
            # expression only, which would ignore the username entirely.
            user = User.get_or_none(
                (User.username == username) & (User.password == password)
            )
        except Exception as e:
            # Treat a failed lookup as a failed login instead of a 500.
            print(e)
            user = None

    # Drop any identity left over from an earlier login before deciding.
    session.pop('username', None)

    if user:
        # Login user
        session['username'] = username
        user.logged_in = True
        user.save()
        return redirect(url_for('admin'))

    context = {
        'error': 'Invalid Credentials. Please try again.'
    }
    return render_template('login.html', context=context)
# NOTE(review): shutdown_session() is meant to stop the HTTP server on exit,
# but nothing in this section registers or calls it -- confirm whether a
# shutdown hook is still wanted.
if __name__ == "__main__":
    separator = "---------------------------------------------------------------"

    print("------------------------- Start up -----------------------------")
    port = http_settings['port']
    print("Starting HTTP Service on port %s..." % port)

    # Listen on every interface. Outside debug mode the server's access and
    # error output goes to the module-level logger.
    if debug is True:
        print("Debug mode is enabled.")
        http_server = WSGIServer(('0.0.0.0', int(port)), app)
    else:
        http_server = WSGIServer(('0.0.0.0', int(port)), app, log=log, error_log=log)
    print("HTTP Service Started.")

    print("--------------------- Application Details ---------------------")
    print("Application started at %s" % datetime.datetime.now())
    ip_address = get_ip_address()
    print("System IP Address: %s" % ip_address)
    hostname = get_hostname()
    print("System Hostname: %s" % hostname)
    print("Access the Dashboard using a web browser using any of the following:")
    print("http://%s:%s or http://%s:%s" % (hostname, port, ip_address, port))
    print(separator)
    print("To stop the application close this window.")
    print(separator)

    # Blocks until the process is terminated.
    http_server.serve_forever()
|