app.py 20 KB


  1. import datetime as datetime
  2. from urllib.parse import urlparse
  3. import webbrowser
  4. from emailtool.emailer import Emailer
  5. from flask import Flask, render_template, request, redirect, url_for, session, send_file
  6. from gevent.pywsgi import WSGIServer
  7. import socket
  8. import logging
  9. from peewee import Model, CharField, DateTimeField, SqliteDatabase, BooleanField, fn
  10. import hashlib
  11. import os
  12. import csv
  13. db_name = 'app.db'
  14. db = SqliteDatabase(db_name)
  15. class User(Model):
  16. username = CharField()
  17. password = CharField()
  18. date_created = DateTimeField(default=datetime.datetime.now)
  19. logged_in = BooleanField(default=False)
  20. class Meta:
  21. database = db
  22. class IlsUser(Model):
  23. username = CharField()
  24. email = CharField()
  25. reset_datetime = DateTimeField(default=datetime.datetime.now)
  26. class Meta:
  27. database = db
  28. class Settings(Model):
  29. name = CharField()
  30. value = CharField()
  31. class Meta:
  32. database = db
  33. class Schedule(Model):
  34. interval = CharField()
  35. class Meta:
  36. database = db
  37. class Log(Model):
  38. date = DateTimeField(default=datetime.datetime.now)
  39. username = CharField()
  40. action = CharField()
  41. class Meta:
  42. database = db
  43. class PasswordResetLog(Model):
  44. date = DateTimeField(default=datetime.datetime.now)
  45. username = CharField()
  46. class Meta:
  47. database = db
  48. # Encrypt the password with SHA256
  49. def encrypt_password(password):
  50. hash = hashlib.sha256(password.encode('utf-8')).hexdigest()
  51. return hash
  52. # Check for DB tables and create if they don't exist
  53. if db.table_exists('user') is False:
  54. db.create_tables([User, ])
  55. User.create(username='admin', password=encrypt_password('admin'),
  56. date_created=datetime.datetime.now(), logged_in=False).save()
  57. if db.table_exists('ilsuser') is False:
  58. db.create_tables([IlsUser, ])
  59. if db.table_exists('settings') is False:
  60. db.create_tables([Settings, ])
  61. Settings.create(name='Debug Mode', value=False).save()
  62. Settings.create(name='HTTP Port', value=5055).save()
  63. Settings.create(name='SMTP Host', value="").save()
  64. Settings.create(name='SMTP Port', value="587").save()
  65. Settings.create(name='SMTP Username', value="").save()
  66. Settings.create(name='SMTP Password', value="").save()
  67. Settings.create(name='Domain Name', value="lynx").save()
  68. Settings.create(name='Password Reset URL', value="https://terminal.idaho-lynx.org").save()
  69. if db.table_exists('log') is False:
  70. db.create_tables([Log, ])
  71. if db.table_exists('password_reset_log') is False:
  72. db.create_tables([PasswordResetLog, ])
  73. if db.table_exists('schedule') is False:
  74. db.create_tables([Schedule, ])
  75. db.close()
  76. settings = Settings.select().execute()
  77. debug_setting = Settings.get(Settings.name == 'Debug Mode')
  78. debug = debug_setting.value
  79. http_port_setting = Settings.get(Settings.name == 'HTTP Port')
  80. http_port = http_port_setting.value
  81. smtp_host_setting = Settings.get(Settings.name == 'SMTP Host')
  82. smtp_host = smtp_host_setting.value
  83. smtp_port_setting = Settings.get(Settings.name == 'SMTP Port')
  84. smtp_port = smtp_port_setting.value
  85. smtp_username_setting = Settings.get(Settings.name == 'SMTP Username')
  86. smtp_username = smtp_username_setting.value
  87. smtp_password_setting = Settings.get(Settings.name == 'SMTP Password')
  88. smtp_password = smtp_password_setting.value
  89. domain_name_setting = Settings.get(Settings.name == 'Domain Name')
  90. domain_name = domain_name_setting.value
  91. password_reset_url_setting = Settings.get(Settings.name == 'Password Reset URL')
  92. password_reset_url = password_reset_url_setting.value
  93. log = logging.getLogger('werkzeug')
  94. log.setLevel(logging.INFO)
  95. if debug.lower() == 'true':
  96. debug = True
  97. else:
  98. debug = False
  99. def send_email(to, subject, body):
  100. # Get settings from smtp_settings
  101. host = smtp_host
  102. port = smtp_port
  103. username = smtp_username
  104. password = smtp_password
  105. # Create an instance of the Emailer class
  106. emailer = Emailer(host, port, username, password)
  107. # Call the send_email method
  108. emailer.send_email(to, subject, body)
  109. def shutdown_session(exception=None):
  110. print('Stopping HTTP Service...')
  111. http_server.stop()
  112. # Get the systems hostname
  113. def get_hostname():
  114. return socket.gethostname()
  115. # Get systems IP address
  116. def get_ip_address():
  117. return socket.gethostbyname(socket.gethostname())
  118. # Method to check if a URL is valid using regex
  119. def is_valid_url(url):
  120. try:
  121. result = urlparse(url)
  122. if all([result.scheme, result.netloc]):
  123. url = '{uri.scheme}://{uri.netloc}/'.format(uri=result)
  124. else:
  125. url = False
  126. return url
  127. except:
  128. return False
  129. def requires_auth():
  130. if 'username' in session:
  131. username = session['username']
  132. user = User.get(User.username == username)
  133. if user.logged_in is True:
  134. return True
  135. else:
  136. return False
  137. else:
  138. return False
  139. db.close()
  140. app = Flask(__name__)
  141. app.secret_key = os.urandom(24)
  142. def format_time_ago(timestamp):
  143. """Calculate the time passed since a datetime stamp and format it as a human-readable string."""
  144. now = datetime.datetime.utcnow()
  145. diff = now - timestamp
  146. if diff.days > 365:
  147. years = diff.days // 365
  148. return f"{years} year{'s' if years > 1 else ''} ago"
  149. if diff.days > 30:
  150. months = diff.days // 30
  151. return f"{months} month{'s' if months > 1 else ''} ago"
  152. if diff.days > 0:
  153. return f"{diff.days} day{'s' if diff.days > 1 else ''} ago"
  154. if diff.seconds > 3600:
  155. hours = diff.seconds // 3600
  156. return f"{hours} hour{'s' if hours > 1 else ''} ago"
  157. if diff.seconds > 60:
  158. minutes = diff.seconds // 60
  159. return f"{minutes} minute{'s' if minutes > 1 else ''} ago"
  160. return "just now"
  161. app.jinja_env.filters['time_since'] = format_time_ago
  162. @app.before_request
  163. def before_request():
  164. db.connect()
  165. @app.after_request
  166. def after_request(response):
  167. db.close()
  168. return response
  169. # Create a route for the home page
  170. @app.route('/', methods=['GET', 'POST'])
  171. def index():
  172. # send_email('aday@twinfallspubliclibrary.org', 'TEST', 'This is a test email')
  173. error = None
  174. reset = False
  175. reset_url = is_valid_url(password_reset_url)
  176. reset_url_error = False
  177. if reset_url is False:
  178. reset_url_error = True
  179. if request.method == 'POST':
  180. username = request.form.get('username')
  181. # Check for the username in the DB
  182. try:
  183. user = IlsUser.filter(IlsUser.username == username).first()
  184. except Exception as e:
  185. print(e)
  186. user = None
  187. if user:
  188. # Reset login datetime
  189. user.reset_datetime = datetime.datetime.now()
  190. PasswordResetLog.create(username=user.username, date_created=datetime.datetime.now()).save()
  191. user.save()
  192. # Open the reset URL in a new tab if the URL is valid
  193. if reset_url is not False:
  194. webbrowser.open_new_tab(str(reset_url))
  195. # Set reset to True to pass back to the view to display the correct content back to the user.
  196. reset = True
  197. else:
  198. error = 'Invalid username'
  199. context = {
  200. 'domain': domain_name,
  201. 'error': error,
  202. 'reset': reset,
  203. 'reset_url': reset_url,
  204. 'reset_url_error': reset_url_error,
  205. }
  206. return render_template('index.html', context=context)
  207. # Create a route for admin page
  208. @app.route('/admin/')
  209. def admin():
  210. # Check to see if user is logged in
  211. if not requires_auth():
  212. return redirect(url_for('login'))
  213. return render_template('admin.html')
  214. @app.route('/admin/users/', methods=['GET', 'POST'])
  215. def admin_users():
  216. # Check to see if user is logged in
  217. if not requires_auth():
  218. return redirect(url_for('login'))
  219. message = None
  220. if request.method == 'POST':
  221. username = request.form.get('username')
  222. password = request.form.get('password')
  223. confirm_password = request.form.get('confirm_password')
  224. # Check to see if username already exists
  225. try:
  226. user = User.filter(User.username == username).first()
  227. except Exception as e:
  228. print(e)
  229. user = None
  230. if user:
  231. message = 'Username already exists'
  232. else:
  233. if password == confirm_password:
  234. User.create(username=username, password=encrypt_password(password),
  235. date_created=datetime.datetime.now(), logged_in=False).save()
  236. message = 'User created successfully'
  237. Log.create(username=session['username'], action='Created admin user: %s' % username, ).save()
  238. else:
  239. message = 'Passwords do not match'
  240. # Get all admin users from the DB
  241. users = User.select().execute()
  242. context = {
  243. 'users': users,
  244. 'message': message,
  245. }
  246. return render_template('admin_users.html', context=context)
  247. @app.route('/admin/users/delete/<int:id>', methods=['GET', 'POST'])
  248. def admin_users_delete(id):
  249. # Check to see if user is logged in
  250. if not requires_auth():
  251. return redirect(url_for('login'))
  252. # Get the user from the DB
  253. user = User.get(User.id == id)
  254. if user.username != 'admin':
  255. # Unset session variable if the user is deleting their own account
  256. if user.username == session['username']:
  257. session.pop('username', None)
  258. username = user.username
  259. user.delete_instance()
  260. Log.create(username=session['username'], action='Removed admin user: %s' % username, ).save()
  261. return redirect(url_for('admin_users'))
  262. @app.route('/admin/users/ils', methods=['GET', 'POST'])
  263. def admin_ils_users():
  264. # Check to see if user is logged in
  265. if not requires_auth():
  266. return redirect(url_for('login'))
  267. message = None
  268. if request.method == 'POST':
  269. username = request.form.get('username')
  270. email = request.form.get('email')
  271. # Check to see if username already exists
  272. try:
  273. user = IlsUser.filter(IlsUser.username == username).first()
  274. except Exception as e:
  275. print(e)
  276. user = None
  277. if user:
  278. message = 'Username already exists'
  279. else:
  280. IlsUser.create(username=username, email=email, reset_datetime=datetime.datetime.now()).save()
  281. message = 'ILS User: %s created successfully' % username
  282. Log.create(username=session['username'], action='Created ILS User: %s' % username, ).save()
  283. # Get all admin users from the DB
  284. users = IlsUser.select().execute()
  285. context = {
  286. 'users': users,
  287. 'message': message,
  288. }
  289. return render_template('admin_ils_users.html', context=context)
  290. @app.route('/admin/users/ils/delete/<int:id>', methods=['GET', 'POST'])
  291. def admin_ils_users_delete(id):
  292. # Check to see if user is logged in
  293. if not requires_auth():
  294. return redirect(url_for('login'))
  295. # Get the user from the DB
  296. user = IlsUser.get(IlsUser.id == id)
  297. username = user.username
  298. user.delete_instance()
  299. Log.create(username=session['username'], action='Removed ILS user: %s' % username, ).save()
  300. return redirect(url_for('admin_ils_users'))
  301. # create a route for generating a CSV file for download
  302. @app.route('/admin/users/ils/csv/download', methods=['GET', 'POST'])
  303. def admin_ils_users_csv_download():
  304. # Check to see if user is logged in
  305. if not requires_auth():
  306. return redirect(url_for('login'))
  307. # Create a CSV file with the users and don't add a blank line between rows
  308. with open('users.csv', 'w', newline='') as f:
  309. writer = csv.writer(f)
  310. writer.writerow(['username', 'email'])
  311. users = IlsUser.select().execute()
  312. for user in users:
  313. writer.writerow([user.username, user.email])
  314. Log.create(username=session['username'], action='Downloaded ILS user CSV file.').save()
  315. # return the CSV file to the user
  316. return send_file('users.csv', as_attachment=True)
  317. @app.route('/admin/users/ils/csv/import', methods=['GET', 'POST'])
  318. def admin_ils_users_csv_import():
  319. # Check to see if user is logged in
  320. if not requires_auth():
  321. return redirect(url_for('login'))
  322. message = None
  323. if request.method == 'POST':
  324. csv_file = request.files['csv']
  325. if csv_file.filename != '':
  326. csv_file.save(os.path.join('uploads', csv_file.filename))
  327. with open(os.path.join('uploads', csv_file.filename), 'r') as f:
  328. reader = csv.reader(f)
  329. for row in reader:
  330. username = row[0]
  331. email = row[1]
  332. # ignore the header row
  333. if username == 'username':
  334. continue
  335. # ignore blank rows
  336. if username == '':
  337. continue
  338. # Check if user already exists and if it does update the entry
  339. try:
  340. user = IlsUser.filter(IlsUser.username == username).first()
  341. except Exception as e:
  342. print(e)
  343. user = None
  344. if user:
  345. user.email = email
  346. user.reset_datetime = datetime.datetime.now()
  347. user.save()
  348. else:
  349. IlsUser.create(username=username, email=email, reset_datetime=datetime.datetime.now()).save()
  350. # Delete the uploaded file
  351. os.remove(os.path.join('uploads', csv_file.filename))
  352. return redirect(url_for('admin_ils_users'))
  353. context = {
  354. 'message': message,
  355. }
  356. return render_template('csv.html', context=context)
  357. @app.route('/admin/settings', methods=['GET', 'POST'])
  358. def settings():
  359. # Check to see if user is logged in
  360. if not requires_auth():
  361. return redirect(url_for('login'))
  362. message = None
  363. # Process form submission
  364. if request.method == 'POST':
  365. # Assign form values to variables
  366. id = request.form.get('id')
  367. value = request.form.get('value')
  368. # Check if the setting exists
  369. try:
  370. setting = Settings.get(Settings.id == id)
  371. except Exception as e:
  372. print(e)
  373. setting = None
  374. if setting:
  375. # Update the setting
  376. old_value = setting.value
  377. setting.value = value
  378. setting.save()
  379. Log.create(username=session['username'], action='Changed %s setting from "%s" to "%s"' % (setting.name,
  380. old_value,
  381. value)).save()
  382. message = '%s updated successfully' % setting.name
  383. # Get settings from DB
  384. all_settings = Settings.select().execute()
  385. context = {
  386. 'settings': all_settings,
  387. 'message': message,
  388. }
  389. return render_template('settings.html', context=context)
  390. @app.route('/admin/schedule', methods=['GET', 'POST'])
  391. def schedule():
  392. # Check to see if user is logged in
  393. if not requires_auth():
  394. return redirect(url_for('login'))
  395. message = None
  396. # add schedule
  397. if request.method == 'POST':
  398. # Assign form values to variables
  399. interval = request.form.get('interval')
  400. # Check if the schedule already exists
  401. try:
  402. schedule = Schedule.get(Schedule.interval == interval)
  403. except Exception as e:
  404. print(e)
  405. schedule = None
  406. if schedule:
  407. message = 'Schedule already exists'
  408. else:
  409. # Create the schedule
  410. Schedule.create(interval=interval).save()
  411. Log.create(username=session['username'], action='Created schedule for %s day interval.' % interval).save()
  412. message = 'Schedule: %s created successfully' % interval
  413. # Get all schedules from the DB
  414. schedules = Schedule.select().order_by(Schedule.interval.cast("INTEGER")).execute()
  415. context = {
  416. 'schedules': schedules,
  417. 'message': message,
  418. }
  419. return render_template('schedule.html', context=context)
  420. # remove schedule
  421. @app.route('/admin/schedule/remove/<int:id>', methods=['GET', 'POST'])
  422. def schedule_remove(id):
  423. # Check to see if user is logged in
  424. if not requires_auth():
  425. return redirect(url_for('login'))
  426. # Get the schedule from the DB
  427. schedule = Schedule.get(Schedule.id == id)
  428. schedule.delete_instance()
  429. Log.create(username=session['username'], action='Removed schedule for a %s day reminder.' % schedule.interval).save()
  430. return redirect(url_for('schedule'))
  431. @app.route('/admin/system/log')
  432. def system_log():
  433. # Check to see if user is logged in
  434. if not requires_auth():
  435. return redirect(url_for('login'))
  436. # Get all logs from the DB
  437. logs = Log.select().order_by(Log.id.desc()).execute()
  438. context = {
  439. 'logs': logs,
  440. }
  441. return render_template('system_log.html', context=context)
  442. @app.route('/admin/system/log/password/resets')
  443. def password_reset_log():
  444. # Check to see if user is logged in
  445. if not requires_auth():
  446. return redirect(url_for('login'))
  447. # Get all logs from the DB
  448. logs = PasswordResetLog.select().order_by(PasswordResetLog.id.desc()).execute()
  449. context = {
  450. 'logs': logs,
  451. }
  452. return render_template('password_reset_log.html', context=context)
  453. @app.route('/logout')
  454. def logout():
  455. if 'username' in session:
  456. username = session['username']
  457. user = User.get(User.username == username)
  458. user.logged_in = False
  459. Log.create(username=session['username'], action='Logged out').save()
  460. user.save()
  461. session.pop('username', None)
  462. return redirect(url_for('login'))
  463. @app.route('/login', methods=['GET', 'POST'])
  464. def login():
  465. if request.method == 'POST':
  466. username = request.form.get('username')
  467. password = encrypt_password(request.form.get('password'))
  468. try:
  469. user = User.filter(User.username == username and User.password == password).first()
  470. except Exception as e:
  471. print(e)
  472. user = None
  473. session.pop('username', None)
  474. if user:
  475. # Login user
  476. session['username'] = request.form.get('username')
  477. user.logged_in = True
  478. Log.create(username=session['username'], action='Logged in').save()
  479. user.save()
  480. return redirect(url_for('admin'))
  481. else:
  482. error = 'Invalid Credentials. Please try again.'
  483. context = {
  484. 'error': error
  485. }
  486. return render_template('login.html', context=context)
  487. context = {
  488. }
  489. return render_template('login.html', context=context)
  490. # on exit of the program make sure the http server is stopped
  491. # @app.teardown_appcontext
  492. if __name__ == "__main__":
  493. print("------------------------- Start up -----------------------------")
  494. print("Starting HTTP Service on port %s..." % http_port)
  495. if debug is True:
  496. print("Debug mode is enabled.")
  497. http_server = WSGIServer(('0.0.0.0', int(http_port)), app)
  498. else:
  499. http_server = WSGIServer(('0.0.0.0', int(http_port)), app, log=log, error_log=log)
  500. print("HTTP Service Started.")
  501. print("--------------------- Application Details ---------------------")
  502. print("Application started at %s" % datetime.datetime.now())
  503. print("System IP Address: %s" % get_ip_address())
  504. print("System Hostname: %s" % get_hostname())
  505. print("Access the Dashboard using a web browser using any of the following:")
  506. print("http://%s:%s or http://%s:%s" % (get_hostname(), http_port, get_ip_address(), http_port))
  507. print("---------------------------------------------------------------")
  508. print("To stop the application close this window.")
  509. print("---------------------------------------------------------------")
  510. http_server.serve_forever()