app.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433
  1. import datetime as datetime
  2. from urllib.parse import urlparse
  3. import webbrowser
  4. from emailtool.emailer import Emailer
  5. from flask import Flask, render_template, request, redirect, url_for, session, send_file
  6. import configparser
  7. from gevent.pywsgi import WSGIServer
  8. import socket
  9. import logging
  10. from peewee import Model, CharField, DateTimeField, SqliteDatabase, BooleanField
  11. import hashlib
  12. import os
  13. import csv
  14. db_name = 'app.db'
  15. db = SqliteDatabase(db_name)
  16. class User(Model):
  17. username = CharField()
  18. password = CharField()
  19. date_created = DateTimeField(default=datetime.datetime.now)
  20. logged_in = BooleanField(default=False)
  21. class Meta:
  22. database = db
  23. class IlsUser(Model):
  24. username = CharField()
  25. email = CharField()
  26. reset_datetime = DateTimeField(default=datetime.datetime.now)
  27. class Meta:
  28. database = db
  29. config = configparser.ConfigParser()
  30. config.read('settings.ini')
  31. application_settings = config['application']
  32. smtp_settings = config['smtp']
  33. http_settings = config['http']
  34. domain_settings = config['domain']
  35. log = logging.getLogger('werkzeug')
  36. log.setLevel(logging.INFO)
  37. if application_settings['debug'].lower() == 'true':
  38. debug = True
  39. else:
  40. debug = False
  41. def send_email(to, subject, body):
  42. # Get settings from smtp_settings
  43. host = smtp_settings['host']
  44. port = smtp_settings['port']
  45. username = smtp_settings['username']
  46. password = smtp_settings['password']
  47. # Create an instance of the Emailer class
  48. emailer = Emailer(host, port, username, password)
  49. # Call the send_email method
  50. emailer.send_email(to, subject, body)
  51. # Encrypt the password with SHA256
  52. def encrypt_password(password):
  53. hash = hashlib.sha256(password.encode('utf-8')).hexdigest()
  54. return hash
  55. def shutdown_session(exception=None):
  56. print('Stopping HTTP Service...')
  57. http_server.stop()
  58. # Get the systems hostname
  59. def get_hostname():
  60. return socket.gethostname()
  61. # Get systems IP address
  62. def get_ip_address():
  63. return socket.gethostbyname(socket.gethostname())
  64. # Method to check if a URL is valid using regex
  65. def is_valid_url(url):
  66. try:
  67. result = urlparse(url)
  68. if all([result.scheme, result.netloc]):
  69. url = '{uri.scheme}://{uri.netloc}/'.format(uri=result)
  70. else:
  71. url = False
  72. return url
  73. except:
  74. return False
  75. def requires_auth():
  76. if 'username' in session:
  77. username = session['username']
  78. user = User.get(User.username == username)
  79. if user.logged_in is True:
  80. return True
  81. else:
  82. return False
  83. else:
  84. return False
  85. # Check for DB tables and create if they don't exist
  86. if db.table_exists('user') is False:
  87. db.create_tables([User, IlsUser])
  88. User.create(username='admin', password=encrypt_password('admin'),
  89. date_created=datetime.datetime.now(), logged_in=False).save()
  90. if db.table_exists('ilsuser') is False:
  91. db.create_tables([IlsUser])
  92. db.close()
  93. app = Flask(__name__)
  94. app.secret_key = 'super secret key'
  95. @app.before_request
  96. def before_request():
  97. db.connect()
  98. @app.after_request
  99. def after_request(response):
  100. db.close()
  101. return response
  102. # Create a route for the home page
  103. @app.route('/', methods=['GET', 'POST'])
  104. def index():
  105. # send_email('aday@twinfallspubliclibrary.org', 'TEST', 'This is a test email')
  106. error = None
  107. reset = False
  108. reset_url = is_valid_url(domain_settings['reset_url'])
  109. reset_url_error = False
  110. if reset_url is False:
  111. reset_url_error = True
  112. if request.method == 'POST':
  113. username = request.form.get('username')
  114. # Check for the username in the DB
  115. try:
  116. user = IlsUser.filter(IlsUser.username == username).first()
  117. except Exception as e:
  118. print(e)
  119. user = None
  120. if user:
  121. # Reset login datetime
  122. user.reset_datetime = datetime.datetime.now()
  123. user.save()
  124. # Open the reset URL in a new tab if the URL is valid
  125. if reset_url is not False:
  126. webbrowser.open_new_tab(str(reset_url))
  127. # Set reset to True to pass back to the view to display the correct content back to the user.
  128. reset = True
  129. else:
  130. error = 'Invalid username'
  131. context = {
  132. 'domain': domain_settings['name'],
  133. 'error': error,
  134. 'reset': reset,
  135. 'reset_url': reset_url,
  136. 'reset_url_error': reset_url_error,
  137. }
  138. return render_template('index.html', context=context)
  139. # Create a route for admin page
  140. @app.route('/admin/')
  141. def admin():
  142. # Check to see if user is logged in
  143. if not requires_auth():
  144. return redirect(url_for('login'))
  145. return render_template('admin.html')
  146. @app.route('/admin/users/', methods=['GET', 'POST'])
  147. def admin_users():
  148. # Check to see if user is logged in
  149. if not requires_auth():
  150. return redirect(url_for('login'))
  151. message = None
  152. if request.method == 'POST':
  153. username = request.form.get('username')
  154. password = request.form.get('password')
  155. confirm_password = request.form.get('confirm_password')
  156. # Check to see if username already exists
  157. try:
  158. user = User.filter(User.username == username).first()
  159. except Exception as e:
  160. print(e)
  161. user = None
  162. if user:
  163. message = 'Username already exists'
  164. else:
  165. if password == confirm_password:
  166. User.create(username=username, password=encrypt_password(password),
  167. date_created=datetime.datetime.now(), logged_in=False).save()
  168. message = 'User created successfully'
  169. else:
  170. message = 'Passwords do not match'
  171. # Get all admin users from the DB
  172. users = User.select().execute()
  173. context = {
  174. 'users': users,
  175. 'message': message,
  176. }
  177. return render_template('admin_users.html', context=context)
  178. @app.route('/admin/users/delete/<int:id>', methods=['GET', 'POST'])
  179. def admin_users_delete(id):
  180. # Check to see if user is logged in
  181. if not requires_auth():
  182. return redirect(url_for('login'))
  183. # Get the user from the DB
  184. user = User.get(User.id == id)
  185. if user.username != 'admin':
  186. # Unset session variable if the user is deleting their own account
  187. if user.username == session['username']:
  188. session.pop('username', None)
  189. user.delete_instance()
  190. return redirect(url_for('admin_users'))
  191. @app.route('/admin/users/ils', methods=['GET', 'POST'])
  192. def admin_ils_users():
  193. # Check to see if user is logged in
  194. if not requires_auth():
  195. return redirect(url_for('login'))
  196. message = None
  197. if request.method == 'POST':
  198. username = request.form.get('username')
  199. email = request.form.get('email')
  200. # Check to see if username already exists
  201. try:
  202. user = IlsUser.filter(IlsUser.username == username).first()
  203. except Exception as e:
  204. print(e)
  205. user = None
  206. if user:
  207. message = 'Username already exists'
  208. else:
  209. IlsUser.create(username=username, email=email, reset_datetime=datetime.datetime.now()).save()
  210. message = 'User created successfully'
  211. # Get all admin users from the DB
  212. users = IlsUser.select().execute()
  213. context = {
  214. 'users': users,
  215. 'message': message,
  216. }
  217. return render_template('admin_ils_users.html', context=context)
  218. @app.route('/admin/users/ils/delete/<int:id>', methods=['GET', 'POST'])
  219. def admin_ils_users_delete(id):
  220. # Check to see if user is logged in
  221. if not requires_auth():
  222. return redirect(url_for('login'))
  223. # Get the user from the DB
  224. user = IlsUser.get(IlsUser.id == id)
  225. user.delete_instance()
  226. return redirect(url_for('admin_ils_users'))
  227. # create a route for generating a CSV file for download
  228. @app.route('/admin/users/ils/csv/download', methods=['GET', 'POST'])
  229. def admin_ils_users_csv_download():
  230. # Check to see if user is logged in
  231. if not requires_auth():
  232. return redirect(url_for('login'))
  233. # Create a CSV file with the users and don't add a blank line between rows
  234. with open('users.csv', 'w', newline='') as f:
  235. writer = csv.writer(f)
  236. writer.writerow(['username', 'email'])
  237. users = IlsUser.select().execute()
  238. for user in users:
  239. writer.writerow([user.username, user.email])
  240. # return the CSV file to the user
  241. return send_file('users.csv', as_attachment=True)
  242. @app.route('/admin/users/ils/csv/import', methods=['GET', 'POST'])
  243. def admin_ils_users_csv_import():
  244. # Check to see if user is logged in
  245. if not requires_auth():
  246. return redirect(url_for('login'))
  247. message = None
  248. if request.method == 'POST':
  249. csv_file = request.files['csv']
  250. if csv_file.filename != '':
  251. csv_file.save(os.path.join('uploads', csv_file.filename))
  252. with open(os.path.join('uploads', csv_file.filename), 'r') as f:
  253. reader = csv.reader(f)
  254. for row in reader:
  255. username = row[0]
  256. email = row[1]
  257. # ignore the header row
  258. if username == 'username':
  259. continue
  260. # ignore blank rows
  261. if username == '':
  262. continue
  263. # Check if user already exists and if it does update the entry
  264. try:
  265. user = IlsUser.filter(IlsUser.username == username).first()
  266. except Exception as e:
  267. print(e)
  268. user = None
  269. if user:
  270. user.email = email
  271. user.reset_datetime = datetime.datetime.now()
  272. user.save()
  273. else:
  274. IlsUser.create(username=username, email=email, reset_datetime=datetime.datetime.now()).save()
  275. # Delete the uploaded file
  276. os.remove(os.path.join('uploads', csv_file.filename))
  277. return redirect(url_for('admin_ils_users'))
  278. context = {
  279. 'message': message,
  280. }
  281. return render_template('csv.html', context=context)
  282. @app.route('/logout')
  283. def logout():
  284. if 'username' in session:
  285. username = session['username']
  286. user = User.get(User.username == username)
  287. user.logged_in = False
  288. user.save()
  289. session.pop('username', None)
  290. return redirect(url_for('login'))
  291. @app.route('/login', methods=['GET', 'POST'])
  292. def login():
  293. if request.method == 'POST':
  294. username = request.form.get('username')
  295. password = encrypt_password(request.form.get('password'))
  296. try:
  297. user = User.filter(User.username == username and User.password == password).first()
  298. except Exception as e:
  299. print(e)
  300. user = None
  301. session.pop('username', None)
  302. if user:
  303. # Login user
  304. session['username'] = request.form.get('username')
  305. user.logged_in = True
  306. user.save()
  307. return redirect(url_for('admin'))
  308. else:
  309. error = 'Invalid Credentials. Please try again.'
  310. context = {
  311. 'error': error
  312. }
  313. return render_template('login.html', context=context)
  314. context = {
  315. }
  316. return render_template('login.html', context=context)
  317. # on exit of the program make sure the http server is stopped
  318. #@app.teardown_appcontext
  319. if __name__ == "__main__":
  320. print("------------------------- Start up -----------------------------")
  321. print("Starting HTTP Service on port %s..." % http_settings['port'])
  322. if debug is True:
  323. print("Debug mode is enabled.")
  324. http_server = WSGIServer(('0.0.0.0', int(http_settings['port'])), app)
  325. else:
  326. http_server = WSGIServer(('0.0.0.0', int(http_settings['port'])), app, log=log, error_log=log)
  327. print("HTTP Service Started.")
  328. print("--------------------- Application Details ---------------------")
  329. print("Application started at %s" % datetime.datetime.now())
  330. print("System IP Address: %s" % get_ip_address())
  331. print("System Hostname: %s" % get_hostname())
  332. print("Access the Dashboard using a web browser using any of the following:")
  333. print("http://%s:%s or http://%s:%s" % (get_hostname(), http_settings['port'], get_ip_address(), http_settings['port']))
  334. print("---------------------------------------------------------------")
  335. print("To stop the application close this window.")
  336. print("---------------------------------------------------------------")
  337. http_server.serve_forever()