app.py 22 KB


  1. import datetime as datetime
  2. from urllib.parse import urlparse
  3. import webbrowser
  4. from emailtool.emailer import Emailer
  5. from flask import Flask, render_template, request, redirect, url_for, session, send_file
  6. from gevent.pywsgi import WSGIServer
  7. import socket
  8. import logging
  9. from peewee import Model, CharField, DateTimeField, SqliteDatabase, BooleanField, fn
  10. import hashlib
  11. import os
  12. import csv
  13. db_name = 'app.db'
  14. db = SqliteDatabase(db_name)
  15. class User(Model):
  16. username = CharField()
  17. password = CharField()
  18. date_created = DateTimeField(default=datetime.datetime.now)
  19. logged_in = BooleanField(default=False)
  20. class Meta:
  21. database = db
  22. class IlsUser(Model):
  23. username = CharField()
  24. email = CharField()
  25. reset_datetime = DateTimeField(default=datetime.datetime.now)
  26. class Meta:
  27. database = db
  28. class Settings(Model):
  29. name = CharField()
  30. value = CharField()
  31. class Meta:
  32. database = db
  33. class Schedule(Model):
  34. interval = CharField()
  35. class Meta:
  36. database = db
  37. class Log(Model):
  38. date = DateTimeField(default=datetime.datetime.now)
  39. username = CharField()
  40. action = CharField()
  41. class Meta:
  42. database = db
  43. class PasswordResetLog(Model):
  44. date = DateTimeField(default=datetime.datetime.now)
  45. username = CharField()
  46. class Meta:
  47. database = db
  48. # Encrypt the password with SHA256
  49. def encrypt_password(password):
  50. hash = hashlib.sha256(password.encode('utf-8')).hexdigest()
  51. return hash
  52. # Check for DB tables and create if they don't exist
  53. if db.table_exists('user') is False:
  54. db.create_tables([User, ])
  55. User.create(username='admin', password=encrypt_password('admin'),
  56. date_created=datetime.datetime.now(), logged_in=False).save()
  57. if db.table_exists('ilsuser') is False:
  58. db.create_tables([IlsUser, ])
  59. if db.table_exists('settings') is False:
  60. db.create_tables([Settings, ])
  61. Settings.create(name='Debug Mode', value=False).save()
  62. Settings.create(name='HTTP Port', value=5055).save()
  63. Settings.create(name='SMTP Host', value="").save()
  64. Settings.create(name='SMTP Port', value="587").save()
  65. Settings.create(name='SMTP Username', value="").save()
  66. Settings.create(name='SMTP Password', value="").save()
  67. Settings.create(name='Domain Name', value="lynx").save()
  68. Settings.create(name='Password Reset URL', value="https://terminal.idaho-lynx.org").save()
  69. if db.table_exists('log') is False:
  70. db.create_tables([Log, ])
  71. if db.table_exists('password_reset_log') is False:
  72. db.create_tables([PasswordResetLog, ])
  73. if db.table_exists('schedule') is False:
  74. db.create_tables([Schedule, ])
  75. db.close()
  76. settings = Settings.select().execute()
  77. debug_setting = Settings.get(Settings.name == 'Debug Mode')
  78. debug = debug_setting.value
  79. http_port_setting = Settings.get(Settings.name == 'HTTP Port')
  80. http_port = http_port_setting.value
  81. smtp_host_setting = Settings.get(Settings.name == 'SMTP Host')
  82. smtp_host = smtp_host_setting.value
  83. smtp_port_setting = Settings.get(Settings.name == 'SMTP Port')
  84. smtp_port = smtp_port_setting.value
  85. smtp_username_setting = Settings.get(Settings.name == 'SMTP Username')
  86. smtp_username = smtp_username_setting.value
  87. smtp_password_setting = Settings.get(Settings.name == 'SMTP Password')
  88. smtp_password = smtp_password_setting.value
  89. domain_name_setting = Settings.get(Settings.name == 'Domain Name')
  90. domain_name = domain_name_setting.value
  91. password_reset_url_setting = Settings.get(Settings.name == 'Password Reset URL')
  92. password_reset_url = password_reset_url_setting.value
  93. log = logging.getLogger('werkzeug')
  94. log.setLevel(logging.INFO)
  95. if debug.lower() == 'true':
  96. debug = True
  97. else:
  98. debug = False
  99. def send_email(to, subject, body):
  100. # Get settings from smtp_settings
  101. host = smtp_host
  102. port = smtp_port
  103. username = smtp_username
  104. password = smtp_password
  105. # Create an instance of the Emailer class
  106. emailer = Emailer(host, port, username, password)
  107. # Call the send_email method
  108. emailer.send_email(to, subject, body)
  109. # send_email('aday@twinfallspubliclibrary.org', 'TEST', 'This is a test email')
  110. def shutdown_session(exception=None):
  111. print('Stopping HTTP Service...')
  112. http_server.stop()
  113. # Get the systems hostname
  114. def get_hostname():
  115. return socket.gethostname()
  116. # Get systems IP address
  117. def get_ip_address():
  118. return socket.gethostbyname(socket.gethostname())
  119. # Method to check if a URL is valid using regex
  120. def is_valid_url(url):
  121. try:
  122. result = urlparse(url)
  123. if all([result.scheme, result.netloc]):
  124. url = '{uri.scheme}://{uri.netloc}/'.format(uri=result)
  125. else:
  126. url = False
  127. return url
  128. except:
  129. return False
  130. def requires_auth():
  131. if 'username' in session:
  132. username = session['username']
  133. user = User.get(User.username == username)
  134. if user.logged_in is True:
  135. return True
  136. else:
  137. return False
  138. else:
  139. return False
  140. def admin_password_check():
  141. admin_user = User.get(User.username == 'admin')
  142. if admin_user.password == encrypt_password('admin'):
  143. return True
  144. db.close()
  145. app = Flask(__name__)
  146. app.secret_key = os.urandom(24)
  147. def format_time_ago(timestamp):
  148. """Calculate the time passed since a datetime stamp and format it as a human-readable string."""
  149. now = datetime.datetime.utcnow()
  150. diff = now - timestamp
  151. if diff.days > 365:
  152. years = diff.days // 365
  153. return f"{years} year{'s' if years > 1 else ''} ago"
  154. if diff.days > 30:
  155. months = diff.days // 30
  156. return f"{months} month{'s' if months > 1 else ''} ago"
  157. if diff.days > 0:
  158. return f"{diff.days} day{'s' if diff.days > 1 else ''} ago"
  159. if diff.seconds > 3600:
  160. hours = diff.seconds // 3600
  161. return f"{hours} hour{'s' if hours > 1 else ''} ago"
  162. if diff.seconds > 60:
  163. minutes = diff.seconds // 60
  164. return f"{minutes} minute{'s' if minutes > 1 else ''} ago"
  165. return "just now"
  166. app.jinja_env.filters['time_since'] = format_time_ago
  167. @app.before_request
  168. def before_request():
  169. db.connect()
  170. @app.after_request
  171. def after_request(response):
  172. db.close()
  173. return response
  174. @app.route('/admin/password/reset', methods=['GET', 'POST'])
  175. def admin_password_reset():
  176. message = None
  177. if request.method == 'POST':
  178. password = request.form.get('password')
  179. password_confirm = request.form.get('password_confirm')
  180. if password != password_confirm:
  181. message = 'Passwords do not match'
  182. else:
  183. try:
  184. user = User.get(User.username == 'admin')
  185. user.password = encrypt_password(password)
  186. user.logged_in = True
  187. user.save()
  188. session['username'] = 'admin'
  189. return redirect(url_for('admin'))
  190. except Exception as e:
  191. print(e)
  192. message = 'Username not found'
  193. context = {
  194. 'message': message,
  195. }
  196. return render_template('admin_password_reset.html', context=context)
  197. # Create a route for the home page
  198. @app.route('/', methods=['GET', 'POST'])
  199. def index():
  200. error = None
  201. reset = False
  202. reset_url = is_valid_url(password_reset_url)
  203. reset_url_error = False
  204. if reset_url is False:
  205. reset_url_error = True
  206. if request.method == 'POST':
  207. username = request.form.get('username')
  208. # Check for the username in the DB
  209. try:
  210. user = IlsUser.filter(IlsUser.username == username).first()
  211. except Exception as e:
  212. print(e)
  213. user = None
  214. if user:
  215. # Reset login datetime
  216. user.reset_datetime = datetime.datetime.now()
  217. PasswordResetLog.create(username=user.username, date_created=datetime.datetime.now()).save()
  218. user.save()
  219. # Open the reset URL in a new tab if the URL is valid
  220. if reset_url is not False:
  221. webbrowser.open_new_tab(str(reset_url))
  222. # Set reset to True to pass back to the view to display the correct content back to the user.
  223. reset = True
  224. else:
  225. error = 'Invalid username'
  226. context = {
  227. 'domain': domain_name,
  228. 'error': error,
  229. 'reset': reset,
  230. 'reset_url': reset_url,
  231. 'reset_url_error': reset_url_error,
  232. }
  233. return render_template('index.html', context=context)
  234. # Create a route for admin page
  235. @app.route('/admin/')
  236. def admin():
  237. if admin_password_check():
  238. return redirect(url_for('admin_password_reset'))
  239. # Check to see if user is logged in
  240. if not requires_auth():
  241. return redirect(url_for('login'))
  242. return render_template('admin.html')
  243. @app.route('/admin/users/', methods=['GET', 'POST'])
  244. def admin_users():
  245. # Check to see if user is logged in
  246. if not requires_auth():
  247. return redirect(url_for('login'))
  248. message = None
  249. if request.method == 'POST':
  250. username = request.form.get('username')
  251. password = request.form.get('password')
  252. confirm_password = request.form.get('confirm_password')
  253. # Check to see if username already exists
  254. try:
  255. user = User.filter(User.username == username).first()
  256. except Exception as e:
  257. print(e)
  258. user = None
  259. if user:
  260. message = 'Username already exists'
  261. else:
  262. if password == confirm_password:
  263. User.create(username=username, password=encrypt_password(password),
  264. date_created=datetime.datetime.now(), logged_in=False).save()
  265. message = 'User created successfully'
  266. Log.create(username=session['username'], action='Created admin user: %s' % username, ).save()
  267. else:
  268. message = 'Passwords do not match'
  269. # Get all admin users from the DB
  270. users = User.select().execute()
  271. context = {
  272. 'users': users,
  273. 'message': message,
  274. }
  275. return render_template('admin_users.html', context=context)
  276. @app.route('/admin/users/edit/<int:id>', methods=['GET', 'POST'])
  277. def admin_users_edit(id):
  278. # Check to see if user is logged in
  279. if not requires_auth():
  280. return redirect(url_for('login'))
  281. # Get the user from the DB
  282. user = User.get(User.id == id)
  283. message = None
  284. if request.method == 'POST':
  285. username = request.form.get('username')
  286. password = request.form.get('password')
  287. confirm_password = request.form.get('confirm_password')
  288. # Check to see if username already exists
  289. try:
  290. user = User.filter(User.username == username).first()
  291. except Exception as e:
  292. print(e)
  293. user = None
  294. if user.username != username:
  295. message = 'Username already exists'
  296. else:
  297. user.username = username
  298. if password is not None or password != '':
  299. if password == confirm_password:
  300. user.password = encrypt_password(password)
  301. else:
  302. message = 'Passwords do not match'
  303. user.save()
  304. message = 'User updated successfully'
  305. Log.create(username=session['username'], action='Updated admin user: %s' % username, ).save()
  306. context = {
  307. 'user': user,
  308. 'message': message,
  309. }
  310. return render_template('admin_user_edit.html', context=context)
  311. @app.route('/admin/users/delete/<int:id>', methods=['GET', 'POST'])
  312. def admin_users_delete(id):
  313. # Check to see if user is logged in
  314. if not requires_auth():
  315. return redirect(url_for('login'))
  316. # Get the user from the DB
  317. user = User.get(User.id == id)
  318. if user.username != 'admin':
  319. # Unset session variable if the user is deleting their own account
  320. if user.username == session['username']:
  321. session.pop('username', None)
  322. username = user.username
  323. user.delete_instance()
  324. Log.create(username=session['username'], action='Removed admin user: %s' % username, ).save()
  325. return redirect(url_for('admin_users'))
  326. @app.route('/admin/users/ils', methods=['GET', 'POST'])
  327. def admin_ils_users():
  328. # Check to see if user is logged in
  329. if not requires_auth():
  330. return redirect(url_for('login'))
  331. message = None
  332. if request.method == 'POST':
  333. username = request.form.get('username')
  334. email = request.form.get('email')
  335. # Check to see if username already exists
  336. try:
  337. user = IlsUser.filter(IlsUser.username == username).first()
  338. except Exception as e:
  339. print(e)
  340. user = None
  341. if user:
  342. message = 'Username already exists'
  343. else:
  344. IlsUser.create(username=username, email=email, reset_datetime=datetime.datetime.now()).save()
  345. message = 'ILS User: %s created successfully' % username
  346. Log.create(username=session['username'], action='Created ILS User: %s' % username, ).save()
  347. # Get all admin users from the DB
  348. users = IlsUser.select().execute()
  349. context = {
  350. 'users': users,
  351. 'message': message,
  352. }
  353. return render_template('admin_ils_users.html', context=context)
  354. @app.route('/admin/users/ils/delete/<int:id>', methods=['GET', 'POST'])
  355. def admin_ils_users_delete(id):
  356. # Check to see if user is logged in
  357. if not requires_auth():
  358. return redirect(url_for('login'))
  359. # Get the user from the DB
  360. user = IlsUser.get(IlsUser.id == id)
  361. username = user.username
  362. user.delete_instance()
  363. Log.create(username=session['username'], action='Removed ILS user: %s' % username, ).save()
  364. return redirect(url_for('admin_ils_users'))
  365. # create a route for generating a CSV file for download
  366. @app.route('/admin/users/ils/csv/download', methods=['GET', 'POST'])
  367. def admin_ils_users_csv_download():
  368. # Check to see if user is logged in
  369. if not requires_auth():
  370. return redirect(url_for('login'))
  371. # Create a CSV file with the users and don't add a blank line between rows
  372. with open('users.csv', 'w', newline='') as f:
  373. writer = csv.writer(f)
  374. writer.writerow(['username', 'email'])
  375. users = IlsUser.select().execute()
  376. for user in users:
  377. writer.writerow([user.username, user.email])
  378. Log.create(username=session['username'], action='Downloaded ILS user CSV file.').save()
  379. # return the CSV file to the user
  380. return send_file('users.csv', as_attachment=True)
  381. @app.route('/admin/users/ils/csv/import', methods=['GET', 'POST'])
  382. def admin_ils_users_csv_import():
  383. # Check to see if user is logged in
  384. if not requires_auth():
  385. return redirect(url_for('login'))
  386. message = None
  387. if request.method == 'POST':
  388. csv_file = request.files['csv']
  389. if csv_file.filename != '':
  390. csv_file.save(os.path.join('uploads', csv_file.filename))
  391. with open(os.path.join('uploads', csv_file.filename), 'r') as f:
  392. reader = csv.reader(f)
  393. for row in reader:
  394. username = row[0]
  395. email = row[1]
  396. # ignore the header row
  397. if username == 'username':
  398. continue
  399. # ignore blank rows
  400. if username == '':
  401. continue
  402. # Check if user already exists and if it does update the entry
  403. try:
  404. user = IlsUser.filter(IlsUser.username == username).first()
  405. except Exception as e:
  406. print(e)
  407. user = None
  408. if user:
  409. user.email = email
  410. user.reset_datetime = datetime.datetime.now()
  411. user.save()
  412. else:
  413. IlsUser.create(username=username, email=email, reset_datetime=datetime.datetime.now()).save()
  414. # Delete the uploaded file
  415. os.remove(os.path.join('uploads', csv_file.filename))
  416. return redirect(url_for('admin_ils_users'))
  417. context = {
  418. 'message': message,
  419. }
  420. return render_template('csv.html', context=context)
  421. @app.route('/admin/settings', methods=['GET', 'POST'])
  422. def settings():
  423. # Check to see if user is logged in
  424. if not requires_auth():
  425. return redirect(url_for('login'))
  426. message = None
  427. # Process form submission
  428. if request.method == 'POST':
  429. # Assign form values to variables
  430. id = request.form.get('id')
  431. value = request.form.get('value')
  432. # Check if the setting exists
  433. try:
  434. setting = Settings.get(Settings.id == id)
  435. except Exception as e:
  436. print(e)
  437. setting = None
  438. if setting:
  439. # Update the setting
  440. old_value = setting.value
  441. setting.value = value
  442. setting.save()
  443. Log.create(username=session['username'], action='Changed %s setting from "%s" to "%s"' % (setting.name,
  444. old_value,
  445. value)).save()
  446. message = '%s updated successfully' % setting.name
  447. # Get settings from DB
  448. all_settings = Settings.select().execute()
  449. context = {
  450. 'settings': all_settings,
  451. 'message': message,
  452. }
  453. return render_template('settings.html', context=context)
  454. @app.route('/admin/schedule', methods=['GET', 'POST'])
  455. def schedule():
  456. # Check to see if user is logged in
  457. if not requires_auth():
  458. return redirect(url_for('login'))
  459. message = None
  460. # add schedule
  461. if request.method == 'POST':
  462. # Assign form values to variables
  463. interval = request.form.get('interval')
  464. # Check if the schedule already exists
  465. try:
  466. schedule = Schedule.get(Schedule.interval == interval)
  467. except Exception as e:
  468. print(e)
  469. schedule = None
  470. if schedule:
  471. message = 'Schedule already exists'
  472. else:
  473. # Create the schedule
  474. Schedule.create(interval=interval).save()
  475. Log.create(username=session['username'], action='Created schedule for %s day interval.' % interval).save()
  476. message = 'Schedule: %s created successfully' % interval
  477. # Get all schedules from the DB
  478. schedules = Schedule.select().order_by(Schedule.interval.cast("INTEGER")).execute()
  479. context = {
  480. 'schedules': schedules,
  481. 'message': message,
  482. }
  483. return render_template('schedule.html', context=context)
  484. # remove schedule
  485. @app.route('/admin/schedule/remove/<int:id>', methods=['GET', 'POST'])
  486. def schedule_remove(id):
  487. # Check to see if user is logged in
  488. if not requires_auth():
  489. return redirect(url_for('login'))
  490. # Get the schedule from the DB
  491. schedule = Schedule.get(Schedule.id == id)
  492. schedule.delete_instance()
  493. Log.create(username=session['username'], action='Removed schedule for a %s day reminder.' % schedule.interval).save()
  494. return redirect(url_for('schedule'))
  495. @app.route('/admin/system/log')
  496. def system_log():
  497. # Check to see if user is logged in
  498. if not requires_auth():
  499. return redirect(url_for('login'))
  500. # Get all logs from the DB
  501. logs = Log.select().order_by(Log.id.desc()).execute()
  502. context = {
  503. 'logs': logs,
  504. }
  505. return render_template('system_log.html', context=context)
  506. @app.route('/admin/system/log/password/resets')
  507. def password_reset_log():
  508. # Check to see if user is logged in
  509. if not requires_auth():
  510. return redirect(url_for('login'))
  511. # Get all logs from the DB
  512. logs = PasswordResetLog.select().order_by(PasswordResetLog.id.desc()).execute()
  513. context = {
  514. 'logs': logs,
  515. }
  516. return render_template('password_reset_log.html', context=context)
  517. @app.route('/logout')
  518. def logout():
  519. if 'username' in session:
  520. username = session['username']
  521. user = User.get(User.username == username)
  522. user.logged_in = False
  523. Log.create(username=session['username'], action='Logged out').save()
  524. user.save()
  525. session.pop('username', None)
  526. return redirect(url_for('login'))
  527. @app.route('/login', methods=['GET', 'POST'])
  528. def login():
  529. if request.method == 'POST':
  530. username = request.form.get('username')
  531. password = encrypt_password(request.form.get('password'))
  532. try:
  533. user = User.filter(User.username == username and User.password == password).first()
  534. except Exception as e:
  535. print(e)
  536. user = None
  537. session.pop('username', None)
  538. if user:
  539. # Login user
  540. session['username'] = request.form.get('username')
  541. user.logged_in = True
  542. Log.create(username=session['username'], action='Logged in').save()
  543. user.save()
  544. return redirect(url_for('admin'))
  545. else:
  546. error = 'Invalid Credentials. Please try again.'
  547. context = {
  548. 'error': error
  549. }
  550. return render_template('login.html', context=context)
  551. context = {
  552. }
  553. return render_template('login.html', context=context)
  554. # on exit of the program make sure the http server is stopped
  555. # @app.teardown_appcontext
  556. if __name__ == "__main__":
  557. print("------------------------- Start up -----------------------------")
  558. print("Starting HTTP Service on port %s..." % http_port)
  559. if debug is True:
  560. print("Debug mode is enabled.")
  561. http_server = WSGIServer(('0.0.0.0', int(http_port)), app)
  562. else:
  563. http_server = WSGIServer(('0.0.0.0', int(http_port)), app, log=log, error_log=log)
  564. print("HTTP Service Started.")
  565. print("--------------------- Application Details ---------------------")
  566. print("Application started at %s" % datetime.datetime.now())
  567. print("System IP Address: %s" % get_ip_address())
  568. print("System Hostname: %s" % get_hostname())
  569. print("Access the Dashboard using a web browser using any of the following:")
  570. print("http://%s:%s or http://%s:%s" % (get_hostname(), http_port, get_ip_address(), http_port))
  571. print("---------------------------------------------------------------")
  572. print("To stop the application close this window.")
  573. print("---------------------------------------------------------------")
  574. http_server.serve_forever()