308 lines
10 KiB
Python
308 lines
10 KiB
Python
import os
|
|
import smtplib
|
|
from email.message import EmailMessage
|
|
from functools import wraps
|
|
|
|
import requests as http_requests
|
|
from flask import Flask, render_template, request, redirect, url_for, abort, Response
|
|
|
|
from database import SessionLocal, AgencySetting, Tour, LeadForm
|
|
|
|
# Flask application object, configured with the "templates" and "static"
# folders.
app = Flask(__name__, template_folder="templates", static_folder="static")
# The secret key is taken from the SECRET_KEY environment variable; when it is
# unset a fixed development value is used instead.
# NOTE(review): the fallback value is committed in source — set SECRET_KEY in
# any real deployment.
app.secret_key = os.getenv("SECRET_KEY", "gtravel-secret-key-change-me")
|
|
|
|
|
|
# ---------- helpers ----------
|
|
|
|
def get_db():
    """Create and return a new ``SessionLocal`` database session.

    The session is not closed here; the route handlers in this file close it
    in a ``finally`` clause once the request has been served.
    """
    session = SessionLocal()
    return session
|
|
|
|
|
|
def set_setting(db, key, value):
    """Insert or update a single ``AgencySetting`` row, then commit.

    Args:
        db: Open database session used for the lookup and the commit.
        key: Value matched against ``AgencySetting.setting_key``.
        value: New ``setting_value`` to store for that key.
    """
    existing = (
        db.query(AgencySetting)
        .filter(AgencySetting.setting_key == key)
        .first()
    )
    if not existing:
        # No row for this key yet: create one.
        db.add(AgencySetting(setting_key=key, setting_value=value))
    else:
        existing.setting_value = value
    db.commit()
|
|
|
|
|
|
def check_auth(username, password):
    """Return True when the supplied credentials match the admin account.

    The expected credentials are read from the ``ADMIN_USERNAME`` and
    ``ADMIN_PASSWORD`` environment variables. When a variable is unset the
    historical value ``"admin"`` is used, so existing deployments keep
    working; production deployments should set both variables.

    Args:
        username: User name taken from the request's Basic-auth header.
        password: Password taken from the request's Basic-auth header.

    Returns:
        bool: True only if both values are strings and both match.
    """
    import hmac  # local import so the block does not depend on file-level imports

    # A Basic-auth header may carry no user name or password at all.
    if not isinstance(username, str) or not isinstance(password, str):
        return False

    expected_user = os.environ.get("ADMIN_USERNAME", "admin")
    expected_password = os.environ.get("ADMIN_PASSWORD", "admin")

    def _as_bytes(text):
        # compare_digest only accepts ASCII-only str, so compare bytes instead.
        return text.encode("utf-8", "surrogatepass")

    # Evaluate both comparisons unconditionally (no short-circuit) and use a
    # constant-time comparison so response timing does not reveal which part
    # of the credentials was wrong.
    user_ok = hmac.compare_digest(_as_bytes(username), _as_bytes(expected_user))
    password_ok = hmac.compare_digest(
        _as_bytes(password), _as_bytes(expected_password)
    )
    return user_ok and password_ok
|
|
|
|
|
|
def require_admin(f):
    """Decorator that protects a view with HTTP Basic authentication.

    The wrapped view runs only when the request carries credentials accepted
    by ``check_auth``; otherwise a 401 response with a ``WWW-Authenticate``
    challenge is returned.
    """

    @wraps(f)
    def guarded(*args, **kwargs):
        credentials = request.authorization
        if credentials and check_auth(credentials.username, credentials.password):
            return f(*args, **kwargs)
        # Missing or rejected credentials: ask the client to authenticate.
        challenge = {"WWW-Authenticate": 'Basic realm="Login Required"'}
        return Response("Unauthorized", 401, challenge)

    return guarded
|
|
|
|
|
|
def send_email_mailtrap(settings, lead):
    """Email a new-lead notification through the configured SMTP server.

    Args:
        settings: Mapping of setting keys to values. Reads ``mailtrap_host``,
            ``mailtrap_port``, ``mailtrap_user``, ``mailtrap_password`` and
            ``agency_email``.
        lead: Object exposing ``name``, ``email``, ``phone``, ``message``,
            ``interested_in`` and ``wants_retell_ai`` attributes.

    Returns:
        bool: True when the message was handed to the SMTP server; False when
        any SMTP setting is missing/empty or when sending fails.
    """
    host = settings.get("mailtrap_host")
    port = settings.get("mailtrap_port")
    user = settings.get("mailtrap_user")
    password = settings.get("mailtrap_password")

    if not all((host, port, user, password)):
        return False

    # A key that exists with an empty value defeats ``dict.get``'s default and
    # would produce empty From/To headers, so fall back with ``or`` instead.
    agency_email = settings.get("agency_email")

    msg = EmailMessage()
    msg.set_content(
        f"New Lead: {lead.name}\nEmail: {lead.email}\nPhone: {lead.phone}"
        f"\nMessage: {lead.message}\nInterested in: {lead.interested_in}"
        f"\nWants Retell AI: {lead.wants_retell_ai}"
    )
    msg["Subject"] = "New Inquiry from GTravel"
    msg["From"] = agency_email or "no-reply@gtravel.com"
    msg["To"] = agency_email or "admin@gtravel.com"

    try:
        # The timeout keeps an unreachable mail server from hanging the
        # request that triggered this notification.
        with smtplib.SMTP(host, int(port), timeout=10) as server:
            server.ehlo()
            # Upgrade to TLS when the server offers it so the login
            # credentials are not sent in clear text.
            if server.has_extn("starttls"):
                server.starttls()
                server.ehlo()
            server.login(user, password)
            server.send_message(msg)
        return True
    except Exception as e:
        # Best effort: a mail failure (including a non-numeric port) must not
        # break the caller, so report it and signal failure via the return.
        print(f"Error sending email: {e}")
        return False
|
|
|
|
|
|
def save_upload(file_storage, subfolder="images"):
    """Save a werkzeug FileStorage under ``static/<subfolder>`` and return the URL path.

    Only the final path component of the client-supplied filename is used, so
    names such as ``../../app.py`` cannot write outside the destination
    directory.

    Args:
        file_storage: Uploaded file object exposing ``filename`` and
            ``save(path)``.
        subfolder: Directory below ``static`` that receives the file.

    Returns:
        str: URL path of the saved file, e.g. ``/static/images/logo.png``.

    Raises:
        ValueError: If the filename has no usable final component.
    """
    # The filename is controlled by the client; treat both separator styles as
    # path separators and keep only the last component.
    raw_name = file_storage.filename or ""
    safe_name = os.path.basename(raw_name.replace("\\", "/"))
    if safe_name in ("", ".", ".."):
        raise ValueError(f"Invalid upload filename: {raw_name!r}")

    dest_dir = os.path.join("static", subfolder)
    os.makedirs(dest_dir, exist_ok=True)
    file_storage.save(os.path.join(dest_dir, safe_name))

    # Build the URL with forward slashes regardless of the OS path separator.
    return f"/static/{subfolder}/{safe_name}"
|
|
|
|
|
|
# ---------- PUBLIC ROUTES ----------
|
|
|
|
@app.route("/")
def home():
    """Render the landing page with tours grouped by category.

    Reads the optional ``lang`` query parameter (default ``"en"``) and passes
    the tour lists, all agency settings and the Turnstile site key to
    ``index.html``.
    """
    lang = request.args.get("lang", "en")
    db = get_db()
    try:
        def by_category(name):
            # All Tour rows whose category equals ``name``.
            return db.query(Tour).filter(Tour.category == name).all()

        tours = by_category("tour")
        surfs = by_category("surf")
        volcanoes = by_category("volcano")

        settings = {}
        for row in db.query(AgencySetting).all():
            settings[row.setting_key] = row.setting_value

        context = {
            "lang": lang,
            "tours": tours,
            "surfs": surfs,
            "volcanoes": volcanoes,
            "settings": settings,
            "turnstile_site_key": settings.get("turnstile_site_key", ""),
        }
        return render_template("index.html", **context)
    finally:
        db.close()
|
|
|
|
|
|
@app.route("/tour/<int:tour_id>")
def view_tour(tour_id):
    """Render the detail page for one tour, or respond 404 if it is not found.

    Reads the optional ``lang`` query parameter (default ``"en"``) and passes
    the tour, all agency settings and the Turnstile site key to
    ``detail.html``.
    """
    lang = request.args.get("lang", "en")
    db = get_db()
    try:
        match = db.query(Tour).filter(Tour.id == tour_id).first()
        if not match:
            abort(404)

        settings = dict(
            (row.setting_key, row.setting_value)
            for row in db.query(AgencySetting).all()
        )
        site_key = settings.get("turnstile_site_key", "")

        return render_template(
            "detail.html",
            lang=lang,
            tour=match,
            settings=settings,
            turnstile_site_key=site_key,
        )
    finally:
        db.close()
|
|
|
|
|
|
def _verify_turnstile(secret_key, token):
    """Return True only when Cloudflare Turnstile accepts *token* for *secret_key*."""
    if not token:
        # A missing token must count as a failure, otherwise the check can be
        # bypassed by simply omitting the form field.
        return False
    try:
        resp = http_requests.post(
            "https://challenges.cloudflare.com/turnstile/v0/siteverify",
            data={"secret": secret_key, "response": token},
            timeout=10,  # do not let a stalled verification hang the request
        )
        return bool(resp.json().get("success"))
    except (http_requests.RequestException, ValueError) as e:
        # Network failure or a non-JSON reply: fail closed.
        print(f"Turnstile verification error: {e}")
        return False


@app.route("/submit", methods=["POST"])
def submit_form():
    """Store a lead submitted from the public form and notify the agency.

    Steps:
      1. When a ``turnstile_secret_key`` setting is configured, the
         ``cf-turnstile-response`` form field must verify successfully;
         otherwise the request is rejected with HTTP 400 and nothing is saved.
      2. A ``LeadForm`` row is created from the form fields and committed.
      3. A notification email is attempted via ``send_email_mailtrap``.
      4. If the visitor asked for a Retell AI call and both Retell settings
         are present, the intent is logged.

    Returns:
        A redirect to the home page with ``lang`` and ``submitted=true`` in
        the query string.
    """
    lang = request.args.get("lang", "en")
    db = get_db()
    try:
        settings_dict = {
            s.setting_key: s.setting_value for s in db.query(AgencySetting).all()
        }

        secret_key = settings_dict.get("turnstile_secret_key")
        if secret_key and not _verify_turnstile(
            secret_key, request.form.get("cf-turnstile-response")
        ):
            print("Turnstile verification failed")
            abort(400)

        wants_retell = request.form.get("wants_retell_ai") == "true"

        lead = LeadForm(
            name=request.form["name"],
            email=request.form["email"],
            phone=request.form["phone"],
            message=request.form.get("message", ""),
            interested_in=request.form["interested_in"],
            wants_retell_ai=wants_retell,
        )
        db.add(lead)
        db.commit()
        db.refresh(lead)

        # Best effort: the helper reports failure through its return value and
        # the lead is already stored, so the result is intentionally ignored.
        send_email_mailtrap(settings_dict, lead)

        if wants_retell:
            retell_api = settings_dict.get("retell_api_key")
            retell_agent_id = settings_dict.get("retell_agent_id")
            # NOTE(review): a "retell_enabled" setting is saved by the admin
            # settings route but is not consulted here — confirm whether it
            # should gate this branch.
            if retell_api and retell_agent_id:
                # TODO: the outbound call is not wired up yet. The disabled
                # draft posted {"agent_id", "to_number"} with a Bearer token
                # to https://api.retellai.com/create-phone-call.
                print(f"Initiating Retell AI call to {lead.phone}")

        # url_for percent-encodes ``lang`` instead of splicing raw user input
        # into the redirect URL.
        return redirect(url_for("home", lang=lang, submitted="true"))
    finally:
        db.close()
|
|
|
|
|
|
# ---------- ADMIN ROUTES ----------
|
|
|
|
@app.route("/admin")
@require_admin
def admin_dashboard():
    """Render the admin dashboard.

    Passes every tour, every lead (newest ``created_at`` first) and all
    agency settings to ``dashboard.html``.
    """
    db = get_db()
    try:
        all_tours = db.query(Tour).all()
        newest_first = LeadForm.created_at.desc()
        all_leads = db.query(LeadForm).order_by(newest_first).all()

        settings = {}
        for row in db.query(AgencySetting).all():
            settings[row.setting_key] = row.setting_value

        context = {"tours": all_tours, "leads": all_leads, "settings": settings}
        return render_template("dashboard.html", **context)
    finally:
        db.close()
|
|
|
|
|
|
@app.route("/admin/settings", methods=["POST"])
@require_admin
def admin_settings():
    """Persist the agency settings posted from the admin form.

    Text fields are stored as submitted (empty string when absent), the
    ``retell_enabled`` checkbox is stored as ``"true"``/``"false"``, and an
    uploaded logo or hero video replaces the corresponding URL field.
    Redirects back to the dashboard afterwards.
    """
    # Form fields copied verbatim into settings, in storage order.
    text_keys = (
        "agency_name",
        "agency_email",
        "logo_url",
        "hero_video_url",
        "mailtrap_host",
        "mailtrap_port",
        "mailtrap_user",
        "mailtrap_password",
        "turnstile_site_key",
        "turnstile_secret_key",
        "retell_api_key",
        "retell_number",
        "retell_agent_id",
    )
    # (file field, static subfolder, setting overridden by the upload)
    upload_fields = (
        ("logo_upload", "images", "logo_url"),
        ("hero_video_upload", "videos", "hero_video_url"),
    )

    db = get_db()
    try:
        updates = {key: request.form.get(key, "") for key in text_keys}

        # The checkbox is compared against the value "on".
        if request.form.get("retell_enabled") == "on":
            updates["retell_enabled"] = "true"
        else:
            updates["retell_enabled"] = "false"

        for field_name, folder, setting_key in upload_fields:
            upload = request.files.get(field_name)
            if upload and upload.filename:
                updates[setting_key] = save_upload(upload, folder)

        for setting_key, setting_value in updates.items():
            set_setting(db, setting_key, setting_value)

        return redirect(url_for("admin_dashboard"))
    finally:
        db.close()
|
|
|
|
|
|
@app.route("/admin/tours", methods=["POST"])
@require_admin
def admin_add_tour():
    """Create a tour from the admin form and redirect to the dashboard.

    An uploaded ``image_upload`` file takes precedence over the ``image_url``
    text field.
    """
    db = get_db()
    try:
        upload = request.files.get("image_upload")
        if upload and upload.filename:
            image_url = save_upload(upload, "images")
        else:
            image_url = request.form.get("image_url", "")

        # Required form fields; a missing key raises on lookup.
        required = ("category", "title_en", "title_es", "desc_en", "desc_es", "price")
        fields = {name: request.form[name] for name in required}

        db.add(Tour(image_url=image_url, **fields))
        db.commit()
        return redirect(url_for("admin_dashboard"))
    finally:
        db.close()
|
|
|
|
|
|
@app.route("/admin/tours/update/<int:tour_id>", methods=["POST"])
@require_admin
def admin_update_tour(tour_id):
    """Update an existing tour from the admin form and redirect to the dashboard.

    When no tour has the given id, nothing is changed and the redirect is
    still returned. Image precedence: uploaded file, then a non-empty
    ``image_url`` field, then the tour's current image.
    """
    db = get_db()
    try:
        tour = db.query(Tour).filter(Tour.id == tour_id).first()
        if not tour:
            return redirect(url_for("admin_dashboard"))

        upload = request.files.get("image_upload")
        if upload and upload.filename:
            image_url = save_upload(upload, "images")
        else:
            image_url = request.form.get("image_url", "")
            # An empty field means "keep the existing image", if there is one.
            if image_url == "" and tour.image_url:
                image_url = tour.image_url

        for name in ("category", "title_en", "title_es", "desc_en", "desc_es", "price"):
            setattr(tour, name, request.form[name])
        tour.image_url = image_url

        db.commit()
        return redirect(url_for("admin_dashboard"))
    finally:
        db.close()
|
|
|
|
|
|
@app.route("/admin/tours/delete/<int:tour_id>", methods=["POST"])
@require_admin
def admin_delete_tour(tour_id):
    """Delete the tour with the given id, if any, then redirect to the dashboard."""
    db = get_db()
    try:
        lookup = db.query(Tour).filter(Tour.id == tour_id)
        target = lookup.first()
        if target:
            db.delete(target)
            db.commit()
        dashboard_url = url_for("admin_dashboard")
        return redirect(dashboard_url)
    finally:
        db.close()
|
|
|
|
|
|
# ---------- RUN ----------
|
|
|
|
if __name__ == "__main__":
    # Debug mode turns on the interactive debugger, which allows arbitrary
    # code execution. Because the server binds to every interface (0.0.0.0),
    # it must not be on by default; opt in with FLASK_DEBUG=1 for local work.
    debug_enabled = os.environ.get("FLASK_DEBUG", "").strip().lower() in {
        "1",
        "true",
        "yes",
        "on",
    }
    app.run(host="0.0.0.0", port=8000, debug=debug_enabled)
|