luniebox/application/api.py

177 lines
5.8 KiB
Python
Raw Normal View History

2022-02-06 10:19:03 +01:00
from functools import wraps
from flask import Blueprint, request, abort, redirect, url_for, jsonify
from urllib.parse import urlencode
import subprocess
import logging
from luniebox import luniebox
from spotifydl import SpotifyDLStatus
import util
# Flask blueprint on which the route handlers below are registered.
api = Blueprint('api', __name__)
def api_key_required(f):
    """Decorate a view so it only runs for requests carrying the API key.

    The ``Authorization`` request header must be present and equal to the
    ``API_KEY`` value of the ``API`` settings section. Any other request is
    aborted with HTTP 401 before the wrapped view is called.
    """
    # Function-scope import so the block does not depend on file-level imports.
    import hmac

    @wraps(f)
    def decorated_function(*args, **kwargs):
        # ``Headers.get`` returns None for an absent header; the legacy
        # ``has_key`` method is not available on current Werkzeug versions.
        provided = request.headers.get('Authorization')
        expected = luniebox.get_setting('API', 'API_KEY')
        # A missing header or a missing/non-string configured key never matches.
        if provided is None or not isinstance(expected, str):
            abort(401)
        # Constant-time comparison avoids leaking key contents through timing.
        if not hmac.compare_digest(provided.encode('utf-8'),
                                   expected.encode('utf-8')):
            abort(401)
        return f(*args, **kwargs)
    return decorated_function
@api.route("/")
@api_key_required
def index():
    """Return the fixed JSON string ``"test"``."""
    payload = "test"
    return jsonify(payload)
@api.route("/setup", methods=['POST'])
def setup():
    """Store the submitted setup form and continue with Spotify authorization.

    Marks the box as set up, copies the Spotify client and account fields
    from the posted form into the settings, restarts the ``spotifyd``
    service and redirects to the ``api.spotify_authorize`` endpoint.
    """
    form = request.form

    luniebox.set_setting('luniebox', 'setup', True)

    # (settings key, form field) pairs stored in the 'spotify' section.
    client_fields = (
        ('client_id', 'spotify-client-id'),
        ('client_secret', 'spotify-client-secret'),
        ('redirect_uri', 'spotify-redirect-uri'),
    )
    for key, field in client_fields:
        luniebox.set_setting('spotify', key, form.get(field))

    # (settings key, form field) pairs stored through luniebox.spotify.
    account_fields = (
        ('username', 'spotify-username'),
        ('password', 'spotify-password'),
        ('device_name', 'spotify-device-name'),
    )
    for key, field in account_fields:
        luniebox.spotify.set_setting(key, form.get(field))

    subprocess.run(["sudo", "systemctl", "restart", "spotifyd"])
    return redirect(url_for('api.spotify_authorize'))
# Most recently issued OAuth "state" value. It is False until
# spotify_authorize() generates one; spotify_callback() compares the
# returned state against it.
spotify_authorize_state = False
@api.route("/spotify/authorize", methods=['GET'])
def spotify_authorize():
    """Redirect the browser to Spotify's authorization page.

    Reads the client id and redirect URI from the ``spotify`` settings,
    stores a fresh 16-character state value in the module-level
    ``spotify_authorize_state`` and sends it along in the query string.
    """
    global spotify_authorize_state

    spotify_client = luniebox.get_setting('spotify', 'client_id')
    callback_url = luniebox.get_setting('spotify', 'redirect_uri')
    spotify_authorize_state = util.randomString(16)

    query = urlencode({
        'response_type': 'code',
        'client_id': spotify_client,
        'scope': 'user-read-playback-state user-modify-playback-state user-read-private',
        'redirect_uri': callback_url,
        'state': spotify_authorize_state
    })
    authorize_url = 'https://accounts.spotify.com/authorize?' + query
    return redirect(authorize_url)
@api.route("/spotify/callback", methods=['GET'])
def spotify_callback():
    """Handle Spotify's redirect after the user has authorized the app.

    Expects ``code`` and ``state`` query parameters (indexing
    ``request.args`` raises when either is absent). The request is aborted
    with HTTP 403 unless ``state`` equals the value stored in
    ``spotify_authorize_state``. On success the code is passed to
    ``luniebox.spotify.new_access_token`` and the browser is redirected to
    ``pages.index``.
    """
    global spotify_authorize_state

    code = request.args['code']
    returned_state = request.args['state']
    if not returned_state or returned_state != spotify_authorize_state:
        abort(403)

    # The state is a one-time nonce: clear it so the same callback URL
    # cannot be replayed. False is the module's "no pending state" value.
    spotify_authorize_state = False

    luniebox.spotify.new_access_token(code)
    return redirect(url_for('pages.index'))
@api.route("/play", methods=['GET'])
def play():
    """Play the URI given in the ``uri`` query parameter.

    The parameter is read before the ``luniebox-daemon`` service is stopped,
    so a request without ``uri`` fails without touching the daemon. The
    result of ``luniebox.play`` decides the redirect, matching ``rfid_play``:
    ``pages.success`` when it is truthy, ``pages.error`` otherwise.
    """
    # Validate input first; indexing raises if 'uri' is missing.
    uri = request.args['uri']

    # run() waits for systemctl to finish before playback starts.
    subprocess.run(["sudo", "systemctl", "stop", "luniebox-daemon"])

    if luniebox.play(uri):
        return redirect(url_for('pages.success', play=uri))
    return redirect(url_for('pages.error', error='play'))
@api.route("/spotify/dl", methods=['GET'])
def spotify_dl():
    """Start a download for the ``uri`` query parameter.

    Redirects to ``pages.spotifydl`` with the download status and URI, or
    with ``SpotifyDLStatus.DISABLED`` when ``luniebox.spotifydl_connect()``
    returns a falsy value.
    """
    if not luniebox.spotifydl_connect():
        return redirect(url_for('pages.spotifydl', status=SpotifyDLStatus.DISABLED))

    requested = request.args['uri']
    outcome = luniebox.spotifydl.download(requested)
    return redirect(url_for('pages.spotifydl', status=outcome, uri=requested))
@api.route("/spotify/downloads", methods=['GET'])
def spotify_downloads():
    """Return the result of ``luniebox.spotifydl.getDownloads()`` as JSON.

    Redirects to ``pages.error`` with ``error='spotifydownloads'`` when
    ``luniebox.spotifydl_connect()`` returns a falsy value.
    """
    if not luniebox.spotifydl_connect():
        return redirect(url_for('pages.error', error='spotifydownloads'))

    downloads = luniebox.spotifydl.getDownloads()
    return jsonify(downloads)
@api.route("/rfid/write", methods=['GET'])
def rfid_write():
    """Write the ``value`` query parameter via ``luniebox.rfid_write``.

    The ``luniebox-daemon`` service is stopped first. The written value is
    logged on the ``luniebox`` logger and the browser is redirected to
    ``pages.success``.
    """
    payload = request.args['value']

    # run() blocks until systemctl has exited, like Popen + communicate().
    subprocess.run(["sudo", "systemctl", "stop", "luniebox-daemon"])

    luniebox.rfid_write(payload)
    log = logging.getLogger('luniebox')
    log.info("Write to RFID: " + payload)
    return redirect(url_for('pages.success', rfid_write=payload))
@api.route("/rfid/read", methods=['GET'])
def rfid_read():
    """Read one value via ``luniebox.rfid_readOnce`` and report it.

    The ``luniebox-daemon`` service is stopped first. The value read is
    passed to ``pages.success`` as ``rfid_read``.
    """
    # run() blocks until systemctl has exited, like Popen + communicate().
    subprocess.run(["sudo", "systemctl", "stop", "luniebox-daemon"])

    tag_value = luniebox.rfid_readOnce()
    return redirect(url_for('pages.success', rfid_read=tag_value))
@api.route("/rfid/play", methods=['GET'])
def rfid_play():
    """Read one value via ``luniebox.rfid_readOnce`` and play it.

    The ``luniebox-daemon`` service is stopped first. Redirects to
    ``pages.success`` when ``luniebox.play`` returns a truthy value and to
    ``pages.error`` with ``error='play'`` otherwise.
    """
    # run() blocks until systemctl has exited, like Popen + communicate().
    subprocess.run(["sudo", "systemctl", "stop", "luniebox-daemon"])

    tag_value = luniebox.rfid_readOnce()
    if not luniebox.play(tag_value):
        return redirect(url_for('pages.error', error='play'))
    return redirect(url_for('pages.success', rfid_play=tag_value))
@api.route("/spotifyd/restart", methods=['GET'])
def restart_spotifyd():
    """Restart the ``spotifyd`` service and redirect to ``pages.success``."""
    command = ["sudo", "systemctl", "restart", "spotifyd"]
    subprocess.run(command)
    target = url_for('pages.success', restart_spotifyd=True)
    return redirect(target)
@api.route("/mpd/restart", methods=['GET'])
def restart_mpd():
    """Restart the ``mpd`` service and redirect to ``pages.success``."""
    command = ["sudo", "systemctl", "restart", "mpd"]
    subprocess.run(command)
    target = url_for('pages.success', restart_mpd=True)
    return redirect(target)
@api.route("/mpd/list", methods=['GET'])
def mpd_list():
    """Return the result of ``luniebox.mpd_list`` as JSON.

    When the request has a ``path`` query parameter it is passed as the
    single positional argument; otherwise ``luniebox.mpd_list`` is called
    with no arguments.
    """
    query = request.args
    # Zero or one positional argument, depending on whether 'path' was sent.
    positional = [query['path']] if 'path' in query else []
    return jsonify(luniebox.mpd_list(*positional))
@api.route("/daemon/start", methods=['GET'])
def daemon_start():
    """Start the ``luniebox-daemon`` service and redirect to ``pages.success``."""
    command = ["sudo", "systemctl", "start", "luniebox-daemon"]
    subprocess.run(command)
    target = url_for('pages.success', start_daemon=True)
    return redirect(target)
@api.route("/daemon/stop", methods=['GET'])
def daemon_stop():
    """Stop the ``luniebox-daemon`` service and redirect to ``pages.success``."""
    command = ["sudo", "systemctl", "stop", "luniebox-daemon"]
    subprocess.run(command)
    target = url_for('pages.success', stop_daemon=True)
    return redirect(target)
@api.route("/restart", methods=['GET'])
def restart_app():
    """Restart the ``luniebox-app`` service and redirect to ``pages.success``."""
    command = ["sudo", "systemctl", "restart", "luniebox-app"]
    subprocess.run(command)
    target = url_for('pages.success', restart_luniebox=True)
    return redirect(target)