1
0

app.py 17 KB


import csv
import datetime
import hashlib
import hmac
import io
import logging
import os
import socket
import webbrowser
from urllib.parse import urlparse

from flask import Flask, render_template, request, redirect, url_for, session, send_file
from gevent.pywsgi import WSGIServer
from peewee import Model, CharField, DateTimeField, SqliteDatabase, BooleanField

from emailtool.emailer import Emailer
  13. db_name = 'app.db'
  14. db = SqliteDatabase(db_name)
  15. class User(Model):
  16. username = CharField()
  17. password = CharField()
  18. date_created = DateTimeField(default=datetime.datetime.now)
  19. logged_in = BooleanField(default=False)
  20. class Meta:
  21. database = db
  22. class IlsUser(Model):
  23. username = CharField()
  24. email = CharField()
  25. reset_datetime = DateTimeField(default=datetime.datetime.now)
  26. class Meta:
  27. database = db
  28. class Settings(Model):
  29. name = CharField()
  30. value = CharField()
  31. class Meta:
  32. database = db
  33. class Schedule(Model):
  34. days = CharField()
  35. class Meta:
  36. database = db
  37. class Log(Model):
  38. date = DateTimeField(default=datetime.datetime.now)
  39. username = CharField()
  40. action = CharField()
  41. class Meta:
  42. database = db
  43. class PasswordResetLog(Model):
  44. date = DateTimeField(default=datetime.datetime.now)
  45. username = CharField()
  46. class Meta:
  47. database = db
  48. settings = Settings.select().execute()
  49. debug_setting = Settings.get(Settings.name == 'Debug Mode')
  50. debug = debug_setting.value
  51. http_port_setting = Settings.get(Settings.name == 'HTTP Port')
  52. http_port = http_port_setting.value
  53. smtp_host_setting = Settings.get(Settings.name == 'SMTP Host')
  54. smtp_host = smtp_host_setting.value
  55. smtp_port_setting = Settings.get(Settings.name == 'SMTP Port')
  56. smtp_port = smtp_port_setting.value
  57. smtp_username_setting = Settings.get(Settings.name == 'SMTP Username')
  58. smtp_username = smtp_username_setting.value
  59. smtp_password_setting = Settings.get(Settings.name == 'SMTP Password')
  60. smtp_password = smtp_password_setting.value
  61. domain_name_setting = Settings.get(Settings.name == 'Domain Name')
  62. domain_name = domain_name_setting.value
  63. password_reset_url_setting = Settings.get(Settings.name == 'Password Reset URL')
  64. password_reset_url = password_reset_url_setting.value
  65. log = logging.getLogger('werkzeug')
  66. log.setLevel(logging.INFO)
  67. if debug.lower() == 'true':
  68. debug = True
  69. else:
  70. debug = False
  71. def send_email(to, subject, body):
  72. # Get settings from smtp_settings
  73. host = smtp_host
  74. port = smtp_port
  75. username = smtp_username
  76. password = smtp_password
  77. # Create an instance of the Emailer class
  78. emailer = Emailer(host, port, username, password)
  79. # Call the send_email method
  80. emailer.send_email(to, subject, body)
  81. # Encrypt the password with SHA256
  82. def encrypt_password(password):
  83. hash = hashlib.sha256(password.encode('utf-8')).hexdigest()
  84. return hash
  85. def shutdown_session(exception=None):
  86. print('Stopping HTTP Service...')
  87. http_server.stop()
  88. # Get the systems hostname
  89. def get_hostname():
  90. return socket.gethostname()
  91. # Get systems IP address
  92. def get_ip_address():
  93. return socket.gethostbyname(socket.gethostname())
  94. # Method to check if a URL is valid using regex
  95. def is_valid_url(url):
  96. try:
  97. result = urlparse(url)
  98. if all([result.scheme, result.netloc]):
  99. url = '{uri.scheme}://{uri.netloc}/'.format(uri=result)
  100. else:
  101. url = False
  102. return url
  103. except:
  104. return False
  105. def requires_auth():
  106. if 'username' in session:
  107. username = session['username']
  108. user = User.get(User.username == username)
  109. if user.logged_in is True:
  110. return True
  111. else:
  112. return False
  113. else:
  114. return False
  115. # Check for DB tables and create if they don't exist
  116. if db.table_exists('user') is False:
  117. db.create_tables([User, IlsUser])
  118. User.create(username='admin', password=encrypt_password('admin'),
  119. date_created=datetime.datetime.now(), logged_in=False).save()
  120. if db.table_exists('ilsuser') is False:
  121. db.create_tables([IlsUser])
  122. if db.table_exists('settings') is False:
  123. db.create_tables([Settings])
  124. Settings.create(name='Debug Mode', value=False).save()
  125. Settings.create(name='HTTP Port', value=5055).save()
  126. Settings.create(name='SMTP Host', value="").save()
  127. Settings.create(name='SMTP Port', value="587").save()
  128. Settings.create(name='SMTP Username', value="").save()
  129. Settings.create(name='SMTP Password', value="").save()
  130. Settings.create(name='Domain Name', value="lynx").save()
  131. Settings.create(name='Password Reset URL', value="https://terminal.idaho-lynx.org").save()
  132. if db.table_exists('log') is False:
  133. db.create_tables([Log])
  134. if db.table_exists('password_reset_log') is False:
  135. db.create_tables([PasswordResetLog])
  136. if db.table_exists('schedule') is False:
  137. db.create_tables([Schedule])
  138. db.close()
  139. app = Flask(__name__)
  140. app.secret_key = os.urandom(24)
  141. @app.before_request
  142. def before_request():
  143. db.connect()
  144. @app.after_request
  145. def after_request(response):
  146. db.close()
  147. return response
  148. # Create a route for the home page
  149. @app.route('/', methods=['GET', 'POST'])
  150. def index():
  151. # send_email('aday@twinfallspubliclibrary.org', 'TEST', 'This is a test email')
  152. error = None
  153. reset = False
  154. reset_url = is_valid_url(password_reset_url)
  155. reset_url_error = False
  156. if reset_url is False:
  157. reset_url_error = True
  158. if request.method == 'POST':
  159. username = request.form.get('username')
  160. # Check for the username in the DB
  161. try:
  162. user = IlsUser.filter(IlsUser.username == username).first()
  163. except Exception as e:
  164. print(e)
  165. user = None
  166. if user:
  167. # Reset login datetime
  168. user.reset_datetime = datetime.datetime.now()
  169. PasswordResetLog.create(username=user.username, date_created=datetime.datetime.now()).save()
  170. user.save()
  171. # Open the reset URL in a new tab if the URL is valid
  172. if reset_url is not False:
  173. webbrowser.open_new_tab(str(reset_url))
  174. # Set reset to True to pass back to the view to display the correct content back to the user.
  175. reset = True
  176. else:
  177. error = 'Invalid username'
  178. context = {
  179. 'domain': domain_name,
  180. 'error': error,
  181. 'reset': reset,
  182. 'reset_url': reset_url,
  183. 'reset_url_error': reset_url_error,
  184. }
  185. return render_template('index.html', context=context)
  186. # Create a route for admin page
  187. @app.route('/admin/')
  188. def admin():
  189. # Check to see if user is logged in
  190. if not requires_auth():
  191. return redirect(url_for('login'))
  192. return render_template('admin.html')
  193. @app.route('/admin/users/', methods=['GET', 'POST'])
  194. def admin_users():
  195. # Check to see if user is logged in
  196. if not requires_auth():
  197. return redirect(url_for('login'))
  198. message = None
  199. if request.method == 'POST':
  200. username = request.form.get('username')
  201. password = request.form.get('password')
  202. confirm_password = request.form.get('confirm_password')
  203. # Check to see if username already exists
  204. try:
  205. user = User.filter(User.username == username).first()
  206. except Exception as e:
  207. print(e)
  208. user = None
  209. if user:
  210. message = 'Username already exists'
  211. else:
  212. if password == confirm_password:
  213. User.create(username=username, password=encrypt_password(password),
  214. date_created=datetime.datetime.now(), logged_in=False).save()
  215. message = 'User created successfully'
  216. Log.create(username=session['username'], action='Created admin user: %s' % username, ).save()
  217. else:
  218. message = 'Passwords do not match'
  219. # Get all admin users from the DB
  220. users = User.select().execute()
  221. context = {
  222. 'users': users,
  223. 'message': message,
  224. }
  225. return render_template('admin_users.html', context=context)
  226. @app.route('/admin/users/delete/<int:id>', methods=['GET', 'POST'])
  227. def admin_users_delete(id):
  228. # Check to see if user is logged in
  229. if not requires_auth():
  230. return redirect(url_for('login'))
  231. # Get the user from the DB
  232. user = User.get(User.id == id)
  233. if user.username != 'admin':
  234. # Unset session variable if the user is deleting their own account
  235. if user.username == session['username']:
  236. session.pop('username', None)
  237. username = user.username
  238. user.delete_instance()
  239. Log.create(username=session['username'], action='Removed admin user: %s' % username, ).save()
  240. return redirect(url_for('admin_users'))
  241. @app.route('/admin/users/ils', methods=['GET', 'POST'])
  242. def admin_ils_users():
  243. # Check to see if user is logged in
  244. if not requires_auth():
  245. return redirect(url_for('login'))
  246. message = None
  247. if request.method == 'POST':
  248. username = request.form.get('username')
  249. email = request.form.get('email')
  250. # Check to see if username already exists
  251. try:
  252. user = IlsUser.filter(IlsUser.username == username).first()
  253. except Exception as e:
  254. print(e)
  255. user = None
  256. if user:
  257. message = 'Username already exists'
  258. else:
  259. IlsUser.create(username=username, email=email, reset_datetime=datetime.datetime.now()).save()
  260. message = 'ILS User: %s created successfully' % username
  261. Log.create(username=session['username'], action='Created ILS User: %s' % username,).save()
  262. # Get all admin users from the DB
  263. users = IlsUser.select().execute()
  264. context = {
  265. 'users': users,
  266. 'message': message,
  267. }
  268. return render_template('admin_ils_users.html', context=context)
  269. @app.route('/admin/users/ils/delete/<int:id>', methods=['GET', 'POST'])
  270. def admin_ils_users_delete(id):
  271. # Check to see if user is logged in
  272. if not requires_auth():
  273. return redirect(url_for('login'))
  274. # Get the user from the DB
  275. user = IlsUser.get(IlsUser.id == id)
  276. username = user.username
  277. user.delete_instance()
  278. Log.create(username=session['username'], action='Removed ILS user: %s' % username, ).save()
  279. return redirect(url_for('admin_ils_users'))
  280. # create a route for generating a CSV file for download
  281. @app.route('/admin/users/ils/csv/download', methods=['GET', 'POST'])
  282. def admin_ils_users_csv_download():
  283. # Check to see if user is logged in
  284. if not requires_auth():
  285. return redirect(url_for('login'))
  286. # Create a CSV file with the users and don't add a blank line between rows
  287. with open('users.csv', 'w', newline='') as f:
  288. writer = csv.writer(f)
  289. writer.writerow(['username', 'email'])
  290. users = IlsUser.select().execute()
  291. for user in users:
  292. writer.writerow([user.username, user.email])
  293. Log.create(username=session['username'], action='Downloaded ILS user CSV file.').save()
  294. # return the CSV file to the user
  295. return send_file('users.csv', as_attachment=True)
  296. @app.route('/admin/users/ils/csv/import', methods=['GET', 'POST'])
  297. def admin_ils_users_csv_import():
  298. # Check to see if user is logged in
  299. if not requires_auth():
  300. return redirect(url_for('login'))
  301. message = None
  302. if request.method == 'POST':
  303. csv_file = request.files['csv']
  304. if csv_file.filename != '':
  305. csv_file.save(os.path.join('uploads', csv_file.filename))
  306. with open(os.path.join('uploads', csv_file.filename), 'r') as f:
  307. reader = csv.reader(f)
  308. for row in reader:
  309. username = row[0]
  310. email = row[1]
  311. # ignore the header row
  312. if username == 'username':
  313. continue
  314. # ignore blank rows
  315. if username == '':
  316. continue
  317. # Check if user already exists and if it does update the entry
  318. try:
  319. user = IlsUser.filter(IlsUser.username == username).first()
  320. except Exception as e:
  321. print(e)
  322. user = None
  323. if user:
  324. user.email = email
  325. user.reset_datetime = datetime.datetime.now()
  326. user.save()
  327. else:
  328. IlsUser.create(username=username, email=email, reset_datetime=datetime.datetime.now()).save()
  329. # Delete the uploaded file
  330. os.remove(os.path.join('uploads', csv_file.filename))
  331. return redirect(url_for('admin_ils_users'))
  332. context = {
  333. 'message': message,
  334. }
  335. return render_template('csv.html', context=context)
  336. @app.route('/admin/settings', methods=['GET', 'POST'])
  337. def settings():
  338. # Check to see if user is logged in
  339. if not requires_auth():
  340. return redirect(url_for('login'))
  341. message = None
  342. # Process form submission
  343. if request.method == 'POST':
  344. # Assign form values to variables
  345. id = request.form.get('id')
  346. value = request.form.get('value')
  347. # Check if the setting exists
  348. try:
  349. setting = Settings.get(Settings.id == id)
  350. except Exception as e:
  351. print(e)
  352. setting = None
  353. if setting:
  354. # Update the setting
  355. old_value = setting.value
  356. setting.value = value
  357. setting.save()
  358. Log.create(username=session['username'], action='Changed %s setting from "%s" to "%s"' % (setting.name,
  359. old_value,
  360. value)).save()
  361. message = '%s updated successfully' % setting.name
  362. # Get settings from DB
  363. all_settings = Settings.select().execute()
  364. context = {
  365. 'settings': all_settings,
  366. 'message': message,
  367. }
  368. return render_template('settings.html', context=context)
  369. @app.route('/admin/system/log')
  370. def system_log():
  371. # Check to see if user is logged in
  372. if not requires_auth():
  373. return redirect(url_for('login'))
  374. # Get all logs from the DB
  375. logs = Log.select().order_by(Log.id.desc()).execute()
  376. context = {
  377. 'logs': logs,
  378. }
  379. return render_template('system_log.html', context=context)
  380. @app.route('/admin/system/log/password/resets')
  381. def password_reset_log():
  382. # Check to see if user is logged in
  383. if not requires_auth():
  384. return redirect(url_for('login'))
  385. # Get all logs from the DB
  386. logs = PasswordResetLog.select().order_by(PasswordResetLog.id.desc()).execute()
  387. context = {
  388. 'logs': logs,
  389. }
  390. return render_template('password_reset_log.html', context=context)
  391. @app.route('/logout')
  392. def logout():
  393. if 'username' in session:
  394. username = session['username']
  395. user = User.get(User.username == username)
  396. user.logged_in = False
  397. Log.create(username=session['username'], action='Logged out').save()
  398. user.save()
  399. session.pop('username', None)
  400. return redirect(url_for('login'))
  401. @app.route('/login', methods=['GET', 'POST'])
  402. def login():
  403. if request.method == 'POST':
  404. username = request.form.get('username')
  405. password = encrypt_password(request.form.get('password'))
  406. try:
  407. user = User.filter(User.username == username and User.password == password).first()
  408. except Exception as e:
  409. print(e)
  410. user = None
  411. session.pop('username', None)
  412. if user:
  413. # Login user
  414. session['username'] = request.form.get('username')
  415. user.logged_in = True
  416. Log.create(username=session['username'], action='Logged in').save()
  417. user.save()
  418. return redirect(url_for('admin'))
  419. else:
  420. error = 'Invalid Credentials. Please try again.'
  421. context = {
  422. 'error': error
  423. }
  424. return render_template('login.html', context=context)
  425. context = {
  426. }
  427. return render_template('login.html', context=context)
  428. # on exit of the program make sure the http server is stopped
  429. #@app.teardown_appcontext
  430. if __name__ == "__main__":
  431. print("------------------------- Start up -----------------------------")
  432. print("Starting HTTP Service on port %s..." % http_port)
  433. if debug is True:
  434. print("Debug mode is enabled.")
  435. http_server = WSGIServer(('0.0.0.0', int(http_port)), app)
  436. else:
  437. http_server = WSGIServer(('0.0.0.0', int(http_port)), app, log=log, error_log=log)
  438. print("HTTP Service Started.")
  439. print("--------------------- Application Details ---------------------")
  440. print("Application started at %s" % datetime.datetime.now())
  441. print("System IP Address: %s" % get_ip_address())
  442. print("System Hostname: %s" % get_hostname())
  443. print("Access the Dashboard using a web browser using any of the following:")
  444. print("http://%s:%s or http://%s:%s" % (get_hostname(), http_port, get_ip_address(), http_port))
  445. print("---------------------------------------------------------------")
  446. print("To stop the application close this window.")
  447. print("---------------------------------------------------------------")
  448. http_server.serve_forever()