app.py 20 KB


  1. import datetime as datetime
  2. from urllib.parse import urlparse
  3. import webbrowser
  4. from emailtool.emailer import Emailer
  5. from flask import Flask, render_template, request, redirect, url_for, session, send_file
  6. from gevent.pywsgi import WSGIServer
  7. import socket
  8. import logging
  9. from peewee import Model, CharField, DateTimeField, SqliteDatabase, BooleanField, fn
  10. import hashlib
  11. import os
  12. import csv
  13. db_name = 'app.db'
  14. db = SqliteDatabase(db_name)
  15. class User(Model):
  16. username = CharField()
  17. password = CharField()
  18. date_created = DateTimeField(default=datetime.datetime.now)
  19. logged_in = BooleanField(default=False)
  20. class Meta:
  21. database = db
  22. class IlsUser(Model):
  23. username = CharField()
  24. email = CharField()
  25. reset_datetime = DateTimeField(default=datetime.datetime.now)
  26. class Meta:
  27. database = db
  28. class Settings(Model):
  29. name = CharField()
  30. value = CharField()
  31. class Meta:
  32. database = db
  33. class Schedule(Model):
  34. interval = CharField()
  35. class Meta:
  36. database = db
  37. class Log(Model):
  38. date = DateTimeField(default=datetime.datetime.now)
  39. username = CharField()
  40. action = CharField()
  41. class Meta:
  42. database = db
  43. class PasswordResetLog(Model):
  44. date = DateTimeField(default=datetime.datetime.now)
  45. username = CharField()
  46. class Meta:
  47. database = db
  48. settings = Settings.select().execute()
  49. debug_setting = Settings.get(Settings.name == 'Debug Mode')
  50. debug = debug_setting.value
  51. http_port_setting = Settings.get(Settings.name == 'HTTP Port')
  52. http_port = http_port_setting.value
  53. smtp_host_setting = Settings.get(Settings.name == 'SMTP Host')
  54. smtp_host = smtp_host_setting.value
  55. smtp_port_setting = Settings.get(Settings.name == 'SMTP Port')
  56. smtp_port = smtp_port_setting.value
  57. smtp_username_setting = Settings.get(Settings.name == 'SMTP Username')
  58. smtp_username = smtp_username_setting.value
  59. smtp_password_setting = Settings.get(Settings.name == 'SMTP Password')
  60. smtp_password = smtp_password_setting.value
  61. domain_name_setting = Settings.get(Settings.name == 'Domain Name')
  62. domain_name = domain_name_setting.value
  63. password_reset_url_setting = Settings.get(Settings.name == 'Password Reset URL')
  64. password_reset_url = password_reset_url_setting.value
  65. log = logging.getLogger('werkzeug')
  66. log.setLevel(logging.INFO)
  67. if debug.lower() == 'true':
  68. debug = True
  69. else:
  70. debug = False
  71. def send_email(to, subject, body):
  72. # Get settings from smtp_settings
  73. host = smtp_host
  74. port = smtp_port
  75. username = smtp_username
  76. password = smtp_password
  77. # Create an instance of the Emailer class
  78. emailer = Emailer(host, port, username, password)
  79. # Call the send_email method
  80. emailer.send_email(to, subject, body)
  81. # Encrypt the password with SHA256
  82. def encrypt_password(password):
  83. hash = hashlib.sha256(password.encode('utf-8')).hexdigest()
  84. return hash
  85. def shutdown_session(exception=None):
  86. print('Stopping HTTP Service...')
  87. http_server.stop()
  88. # Get the systems hostname
  89. def get_hostname():
  90. return socket.gethostname()
  91. # Get systems IP address
  92. def get_ip_address():
  93. return socket.gethostbyname(socket.gethostname())
  94. # Method to check if a URL is valid using regex
  95. def is_valid_url(url):
  96. try:
  97. result = urlparse(url)
  98. if all([result.scheme, result.netloc]):
  99. url = '{uri.scheme}://{uri.netloc}/'.format(uri=result)
  100. else:
  101. url = False
  102. return url
  103. except:
  104. return False
  105. def requires_auth():
  106. if 'username' in session:
  107. username = session['username']
  108. user = User.get(User.username == username)
  109. if user.logged_in is True:
  110. return True
  111. else:
  112. return False
  113. else:
  114. return False
  115. # Check for DB tables and create if they don't exist
  116. if db.table_exists('user') is False:
  117. db.create_tables([User, IlsUser])
  118. User.create(username='admin', password=encrypt_password('admin'),
  119. date_created=datetime.datetime.now(), logged_in=False).save()
  120. if db.table_exists('ilsuser') is False:
  121. db.create_tables([IlsUser])
  122. if db.table_exists('settings') is False:
  123. db.create_tables([Settings])
  124. Settings.create(name='Debug Mode', value=False).save()
  125. Settings.create(name='HTTP Port', value=5055).save()
  126. Settings.create(name='SMTP Host', value="").save()
  127. Settings.create(name='SMTP Port', value="587").save()
  128. Settings.create(name='SMTP Username', value="").save()
  129. Settings.create(name='SMTP Password', value="").save()
  130. Settings.create(name='Domain Name', value="lynx").save()
  131. Settings.create(name='Password Reset URL', value="https://terminal.idaho-lynx.org").save()
  132. if db.table_exists('log') is False:
  133. db.create_tables([Log])
  134. if db.table_exists('password_reset_log') is False:
  135. db.create_tables([PasswordResetLog])
  136. if db.table_exists('schedule') is False:
  137. db.create_tables([Schedule])
  138. db.close()
  139. app = Flask(__name__)
  140. app.secret_key = os.urandom(24)
  141. def format_time_ago(timestamp):
  142. """Calculate the time passed since a datetime stamp and format it as a human-readable string."""
  143. now = datetime.datetime.utcnow()
  144. diff = now - timestamp
  145. if diff.days > 365:
  146. years = diff.days // 365
  147. return f"{years} year{'s' if years > 1 else ''} ago"
  148. if diff.days > 30:
  149. months = diff.days // 30
  150. return f"{months} month{'s' if months > 1 else ''} ago"
  151. if diff.days > 0:
  152. return f"{diff.days} day{'s' if diff.days > 1 else ''} ago"
  153. if diff.seconds > 3600:
  154. hours = diff.seconds // 3600
  155. return f"{hours} hour{'s' if hours > 1 else ''} ago"
  156. if diff.seconds > 60:
  157. minutes = diff.seconds // 60
  158. return f"{minutes} minute{'s' if minutes > 1 else ''} ago"
  159. return "just now"
  160. app.jinja_env.filters['time_since'] = format_time_ago
  161. @app.before_request
  162. def before_request():
  163. db.connect()
  164. @app.after_request
  165. def after_request(response):
  166. db.close()
  167. return response
  168. # Create a route for the home page
  169. @app.route('/', methods=['GET', 'POST'])
  170. def index():
  171. # send_email('aday@twinfallspubliclibrary.org', 'TEST', 'This is a test email')
  172. error = None
  173. reset = False
  174. reset_url = is_valid_url(password_reset_url)
  175. reset_url_error = False
  176. if reset_url is False:
  177. reset_url_error = True
  178. if request.method == 'POST':
  179. username = request.form.get('username')
  180. # Check for the username in the DB
  181. try:
  182. user = IlsUser.filter(IlsUser.username == username).first()
  183. except Exception as e:
  184. print(e)
  185. user = None
  186. if user:
  187. # Reset login datetime
  188. user.reset_datetime = datetime.datetime.now()
  189. PasswordResetLog.create(username=user.username, date_created=datetime.datetime.now()).save()
  190. user.save()
  191. # Open the reset URL in a new tab if the URL is valid
  192. if reset_url is not False:
  193. webbrowser.open_new_tab(str(reset_url))
  194. # Set reset to True to pass back to the view to display the correct content back to the user.
  195. reset = True
  196. else:
  197. error = 'Invalid username'
  198. context = {
  199. 'domain': domain_name,
  200. 'error': error,
  201. 'reset': reset,
  202. 'reset_url': reset_url,
  203. 'reset_url_error': reset_url_error,
  204. }
  205. return render_template('index.html', context=context)
  206. # Create a route for admin page
  207. @app.route('/admin/')
  208. def admin():
  209. # Check to see if user is logged in
  210. if not requires_auth():
  211. return redirect(url_for('login'))
  212. return render_template('admin.html')
  213. @app.route('/admin/users/', methods=['GET', 'POST'])
  214. def admin_users():
  215. # Check to see if user is logged in
  216. if not requires_auth():
  217. return redirect(url_for('login'))
  218. message = None
  219. if request.method == 'POST':
  220. username = request.form.get('username')
  221. password = request.form.get('password')
  222. confirm_password = request.form.get('confirm_password')
  223. # Check to see if username already exists
  224. try:
  225. user = User.filter(User.username == username).first()
  226. except Exception as e:
  227. print(e)
  228. user = None
  229. if user:
  230. message = 'Username already exists'
  231. else:
  232. if password == confirm_password:
  233. User.create(username=username, password=encrypt_password(password),
  234. date_created=datetime.datetime.now(), logged_in=False).save()
  235. message = 'User created successfully'
  236. Log.create(username=session['username'], action='Created admin user: %s' % username, ).save()
  237. else:
  238. message = 'Passwords do not match'
  239. # Get all admin users from the DB
  240. users = User.select().execute()
  241. context = {
  242. 'users': users,
  243. 'message': message,
  244. }
  245. return render_template('admin_users.html', context=context)
  246. @app.route('/admin/users/delete/<int:id>', methods=['GET', 'POST'])
  247. def admin_users_delete(id):
  248. # Check to see if user is logged in
  249. if not requires_auth():
  250. return redirect(url_for('login'))
  251. # Get the user from the DB
  252. user = User.get(User.id == id)
  253. if user.username != 'admin':
  254. # Unset session variable if the user is deleting their own account
  255. if user.username == session['username']:
  256. session.pop('username', None)
  257. username = user.username
  258. user.delete_instance()
  259. Log.create(username=session['username'], action='Removed admin user: %s' % username, ).save()
  260. return redirect(url_for('admin_users'))
  261. @app.route('/admin/users/ils', methods=['GET', 'POST'])
  262. def admin_ils_users():
  263. # Check to see if user is logged in
  264. if not requires_auth():
  265. return redirect(url_for('login'))
  266. message = None
  267. if request.method == 'POST':
  268. username = request.form.get('username')
  269. email = request.form.get('email')
  270. # Check to see if username already exists
  271. try:
  272. user = IlsUser.filter(IlsUser.username == username).first()
  273. except Exception as e:
  274. print(e)
  275. user = None
  276. if user:
  277. message = 'Username already exists'
  278. else:
  279. IlsUser.create(username=username, email=email, reset_datetime=datetime.datetime.now()).save()
  280. message = 'ILS User: %s created successfully' % username
  281. Log.create(username=session['username'], action='Created ILS User: %s' % username, ).save()
  282. # Get all admin users from the DB
  283. users = IlsUser.select().execute()
  284. context = {
  285. 'users': users,
  286. 'message': message,
  287. }
  288. return render_template('admin_ils_users.html', context=context)
  289. @app.route('/admin/users/ils/delete/<int:id>', methods=['GET', 'POST'])
  290. def admin_ils_users_delete(id):
  291. # Check to see if user is logged in
  292. if not requires_auth():
  293. return redirect(url_for('login'))
  294. # Get the user from the DB
  295. user = IlsUser.get(IlsUser.id == id)
  296. username = user.username
  297. user.delete_instance()
  298. Log.create(username=session['username'], action='Removed ILS user: %s' % username, ).save()
  299. return redirect(url_for('admin_ils_users'))
  300. # create a route for generating a CSV file for download
  301. @app.route('/admin/users/ils/csv/download', methods=['GET', 'POST'])
  302. def admin_ils_users_csv_download():
  303. # Check to see if user is logged in
  304. if not requires_auth():
  305. return redirect(url_for('login'))
  306. # Create a CSV file with the users and don't add a blank line between rows
  307. with open('users.csv', 'w', newline='') as f:
  308. writer = csv.writer(f)
  309. writer.writerow(['username', 'email'])
  310. users = IlsUser.select().execute()
  311. for user in users:
  312. writer.writerow([user.username, user.email])
  313. Log.create(username=session['username'], action='Downloaded ILS user CSV file.').save()
  314. # return the CSV file to the user
  315. return send_file('users.csv', as_attachment=True)
  316. @app.route('/admin/users/ils/csv/import', methods=['GET', 'POST'])
  317. def admin_ils_users_csv_import():
  318. # Check to see if user is logged in
  319. if not requires_auth():
  320. return redirect(url_for('login'))
  321. message = None
  322. if request.method == 'POST':
  323. csv_file = request.files['csv']
  324. if csv_file.filename != '':
  325. csv_file.save(os.path.join('uploads', csv_file.filename))
  326. with open(os.path.join('uploads', csv_file.filename), 'r') as f:
  327. reader = csv.reader(f)
  328. for row in reader:
  329. username = row[0]
  330. email = row[1]
  331. # ignore the header row
  332. if username == 'username':
  333. continue
  334. # ignore blank rows
  335. if username == '':
  336. continue
  337. # Check if user already exists and if it does update the entry
  338. try:
  339. user = IlsUser.filter(IlsUser.username == username).first()
  340. except Exception as e:
  341. print(e)
  342. user = None
  343. if user:
  344. user.email = email
  345. user.reset_datetime = datetime.datetime.now()
  346. user.save()
  347. else:
  348. IlsUser.create(username=username, email=email, reset_datetime=datetime.datetime.now()).save()
  349. # Delete the uploaded file
  350. os.remove(os.path.join('uploads', csv_file.filename))
  351. return redirect(url_for('admin_ils_users'))
  352. context = {
  353. 'message': message,
  354. }
  355. return render_template('csv.html', context=context)
  356. @app.route('/admin/settings', methods=['GET', 'POST'])
  357. def settings():
  358. # Check to see if user is logged in
  359. if not requires_auth():
  360. return redirect(url_for('login'))
  361. message = None
  362. # Process form submission
  363. if request.method == 'POST':
  364. # Assign form values to variables
  365. id = request.form.get('id')
  366. value = request.form.get('value')
  367. # Check if the setting exists
  368. try:
  369. setting = Settings.get(Settings.id == id)
  370. except Exception as e:
  371. print(e)
  372. setting = None
  373. if setting:
  374. # Update the setting
  375. old_value = setting.value
  376. setting.value = value
  377. setting.save()
  378. Log.create(username=session['username'], action='Changed %s setting from "%s" to "%s"' % (setting.name,
  379. old_value,
  380. value)).save()
  381. message = '%s updated successfully' % setting.name
  382. # Get settings from DB
  383. all_settings = Settings.select().execute()
  384. context = {
  385. 'settings': all_settings,
  386. 'message': message,
  387. }
  388. return render_template('settings.html', context=context)
  389. @app.route('/admin/schedule', methods=['GET', 'POST'])
  390. def schedule():
  391. # Check to see if user is logged in
  392. if not requires_auth():
  393. return redirect(url_for('login'))
  394. message = None
  395. # add schedule
  396. if request.method == 'POST':
  397. # Assign form values to variables
  398. interval = request.form.get('interval')
  399. # Check if the schedule already exists
  400. try:
  401. schedule = Schedule.get(Schedule.interval == interval)
  402. except Exception as e:
  403. print(e)
  404. schedule = None
  405. if schedule:
  406. message = 'Schedule already exists'
  407. else:
  408. # Create the schedule
  409. Schedule.create(interval=interval).save()
  410. Log.create(username=session['username'], action='Created schedule for %s day interval.' % interval).save()
  411. message = 'Schedule: %s created successfully' % interval
  412. # Get all schedules from the DB
  413. schedules = Schedule.select().order_by(Schedule.interval.cast("INTEGER")).execute()
  414. context = {
  415. 'schedules': schedules,
  416. 'message': message,
  417. }
  418. return render_template('schedule.html', context=context)
  419. # remove schedule
  420. @app.route('/admin/schedule/remove/<int:id>', methods=['GET', 'POST'])
  421. def schedule_remove(id):
  422. # Check to see if user is logged in
  423. if not requires_auth():
  424. return redirect(url_for('login'))
  425. # Get the schedule from the DB
  426. schedule = Schedule.get(Schedule.id == id)
  427. schedule.delete_instance()
  428. Log.create(username=session['username'], action='Removed schedule for a %s day reminder.' % schedule.interval).save()
  429. return redirect(url_for('schedule'))
  430. @app.route('/admin/system/log')
  431. def system_log():
  432. # Check to see if user is logged in
  433. if not requires_auth():
  434. return redirect(url_for('login'))
  435. # Get all logs from the DB
  436. logs = Log.select().order_by(Log.id.desc()).execute()
  437. context = {
  438. 'logs': logs,
  439. }
  440. return render_template('system_log.html', context=context)
  441. @app.route('/admin/system/log/password/resets')
  442. def password_reset_log():
  443. # Check to see if user is logged in
  444. if not requires_auth():
  445. return redirect(url_for('login'))
  446. # Get all logs from the DB
  447. logs = PasswordResetLog.select().order_by(PasswordResetLog.id.desc()).execute()
  448. context = {
  449. 'logs': logs,
  450. }
  451. return render_template('password_reset_log.html', context=context)
  452. @app.route('/logout')
  453. def logout():
  454. if 'username' in session:
  455. username = session['username']
  456. user = User.get(User.username == username)
  457. user.logged_in = False
  458. Log.create(username=session['username'], action='Logged out').save()
  459. user.save()
  460. session.pop('username', None)
  461. return redirect(url_for('login'))
  462. @app.route('/login', methods=['GET', 'POST'])
  463. def login():
  464. if request.method == 'POST':
  465. username = request.form.get('username')
  466. password = encrypt_password(request.form.get('password'))
  467. try:
  468. user = User.filter(User.username == username and User.password == password).first()
  469. except Exception as e:
  470. print(e)
  471. user = None
  472. session.pop('username', None)
  473. if user:
  474. # Login user
  475. session['username'] = request.form.get('username')
  476. user.logged_in = True
  477. Log.create(username=session['username'], action='Logged in').save()
  478. user.save()
  479. return redirect(url_for('admin'))
  480. else:
  481. error = 'Invalid Credentials. Please try again.'
  482. context = {
  483. 'error': error
  484. }
  485. return render_template('login.html', context=context)
  486. context = {
  487. }
  488. return render_template('login.html', context=context)
  489. # on exit of the program make sure the http server is stopped
  490. # @app.teardown_appcontext
  491. if __name__ == "__main__":
  492. print("------------------------- Start up -----------------------------")
  493. print("Starting HTTP Service on port %s..." % http_port)
  494. if debug is True:
  495. print("Debug mode is enabled.")
  496. http_server = WSGIServer(('0.0.0.0', int(http_port)), app)
  497. else:
  498. http_server = WSGIServer(('0.0.0.0', int(http_port)), app, log=log, error_log=log)
  499. print("HTTP Service Started.")
  500. print("--------------------- Application Details ---------------------")
  501. print("Application started at %s" % datetime.datetime.now())
  502. print("System IP Address: %s" % get_ip_address())
  503. print("System Hostname: %s" % get_hostname())
  504. print("Access the Dashboard using a web browser using any of the following:")
  505. print("http://%s:%s or http://%s:%s" % (get_hostname(), http_port, get_ip_address(), http_port))
  506. print("---------------------------------------------------------------")
  507. print("To stop the application close this window.")
  508. print("---------------------------------------------------------------")
  509. http_server.serve_forever()