luniebox/application/spotify.py

386 lines
13 KiB
Python
Raw Permalink Normal View History

2022-02-06 10:19:03 +01:00
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
__name__ = "Spotify"
import base64
import datetime
from configparser import ConfigParser
from dateutil import parser
from urllib.parse import urlencode
import requests
import json
import subprocess
import logging
# Default path of the configuration file loaded by Spotify.read_config();
# used when Spotify.__init__ is called without an explicit configFilePath.
# The path is relative, so it is resolved against the current working
# directory of the running process.
defaultSpotifyConfigFilePath = '../config/spotifyd.cfg'
class Spotify(object):
def __init__(self, luniebox, configFilePath=defaultSpotifyConfigFilePath):
    """Bind the wrapper to its owning luniebox and load the config file.

    Args:
        luniebox: Owning application object; the methods of this class call
            its ``get_setting`` / ``set_setting`` / ``spotify_connect``.
        configFilePath: Path of the INI-style config file to read.

    Raises:
        ValueError: Propagated from ``read_config()`` when the file could
            not be read.
    """
    self.configFilePath = configFilePath
    self.luniebox = luniebox
    # Populate self.config right away so get_setting() works after construction.
    self.read_config()
def read_config(self):
    """Parse ``self.configFilePath`` and keep the parser as ``self.config``.

    Raises:
        ValueError: If the parser did not report exactly one file as read
            (``ConfigParser.read`` silently skips files it cannot open).
    """
    cfg = ConfigParser()
    files_read = cfg.read(self.configFilePath)
    if len(files_read) == 1:
        self.config = cfg
        return
    raise ValueError(
        "Config file {} not found!".format(self.configFilePath))
def get_setting(self, key, default=None):
    """Return option ``key`` from the ``[global]`` section without quotes.

    Args:
        key: Option name inside the ``global`` section.
        default: Value returned when the section or option is absent.

    Returns:
        The stored string with leading/trailing double quotes stripped, or
        ``default`` when the option does not exist.
    """
    if not self.config.has_option('global', key):
        return default
    raw_value = self.config['global'][key]
    # Values are stored wrapped in double quotes (see set_setting).
    return raw_value.strip('\"')
def set_setting(self, key, value):
    """Store ``value`` under ``key`` in ``[global]`` and rewrite the file.

    The value is written wrapped in double quotes; ``get_setting`` strips
    them again when reading.

    Args:
        key: Option name inside the ``global`` section.
        value: Value to store. Non-string values (e.g. integers) are
            converted with ``str()`` before quoting.
    """
    if not self.config.has_section('global'):
        self.config.add_section('global')
    # Convert *before* concatenating: '"' + 50 would raise TypeError.
    self.config.set('global', key, '\"' + str(value) + '\"')
    # Persist the whole configuration so the change survives a restart.
    with open(self.configFilePath, 'w') as configfile:
        self.config.write(configfile)
def get_status(self):
    """Report whether the configured playback device is set up and running.

    When no device id is stored yet, the visible devices are searched for one
    whose name equals the configured ``device_name``; a match is persisted in
    the luniebox 'spotify' settings. When a device id is stored, the device
    must appear in the device list or accept a playback transfer to count as
    running.

    Returns:
        dict with the keys ``status``, ``device_name``, ``device_id`` and
        ``errors`` (``{'setup': bool, 'not_running': bool}``).
        NOTE(review): ``status`` is never set to anything but False here;
        confirm whether callers derive the state from ``errors`` only.
    """
    devices = self.devices()
    device_id = self.luniebox.get_setting('spotify', 'device_id')
    device_name = self.get_setting('device_name')
    # An error payload from the API has no 'devices' key; treat that as
    # "no devices visible" instead of raising KeyError.
    available = devices.get('devices') or [] if isinstance(devices, dict) else []
    errors = {'setup': True, 'not_running': True}

    if not device_id:
        errors['not_running'] = False
        matches = [device['id'] for device in available
                   if device['name'] == device_name]
        if matches:
            # Last match wins, as in the original loop.
            device_id = matches[-1]
            # Persist the discovered id. The original called get_setting()
            # here, which stored nothing.
            self.luniebox.set_setting('spotify', 'device_id', device_id)
            errors['setup'] = False
    else:
        errors['setup'] = False
        if any(device['id'] == device_id for device in available):
            errors['not_running'] = False
        # Device not listed: try to wake it up by transferring playback.
        if errors['not_running'] and self.transfer_playback():
            errors['not_running'] = False

    return {
        'status': False,
        'device_name': device_name,
        'device_id': device_id,
        'errors': errors,
    }
def get_access_token(self):
    """Return a usable Spotify access token, refreshing it when expired.

    Returns:
        The access token string, or False when no token is stored or the
        token could not be refreshed.
    """
    access_token = self.luniebox.get_setting('spotify', 'access_token')
    if not access_token:
        return False

    token_expires = self.luniebox.get_setting('spotify', 'token_expires')
    if token_expires:
        try:
            if parser.parse(token_expires) >= datetime.datetime.now():
                return access_token
        except (TypeError, ValueError, OverflowError):
            # Unparseable or incomparable expiry: treat the token as expired.
            pass

    # Expired (or expiry unknown): refresh exactly once. The original
    # recursed into itself here, which never terminated when the refresh
    # failed or did not update the stored expiry.
    if self.refresh_access_token() is False:
        return False
    return self.luniebox.get_setting('spotify', 'access_token') or False
def new_access_token(self, code):
    """Exchange an authorization code for tokens and store them.

    Posts ``grant_type=authorization_code`` to the Spotify token endpoint,
    authenticated with HTTP Basic auth built from the stored client id and
    secret, then writes ``access_token``, ``refresh_token`` and
    ``token_expires`` to the luniebox 'spotify' settings.

    Args:
        code: Authorization code to send as the ``code`` form field.

    Raises:
        KeyError: If the response JSON lacks one of the expected fields.
    """
    client_id = self.luniebox.get_setting('spotify', 'client_id')
    client_secret = self.luniebox.get_setting('spotify', 'client_secret')
    redirect_uri = self.luniebox.get_setting('spotify', 'redirect_uri')

    raw_credentials = (client_id + ":" + client_secret).encode('utf-8')
    basic_auth = base64.b64encode(raw_credentials).decode('utf-8')

    response = requests.post(
        'https://accounts.spotify.com/api/token',
        headers={
            'Content-Type': 'application/x-www-form-urlencoded',
            'Authorization': 'Basic ' + basic_auth,
        },
        data={
            'code': code,
            'redirect_uri': redirect_uri,
            'grant_type': 'authorization_code',
        })
    token_data = response.json()

    self.luniebox.set_setting(
        'spotify', 'access_token', token_data['access_token'])
    self.luniebox.set_setting(
        'spotify', 'refresh_token', token_data['refresh_token'])
    # Absolute expiry = now + lifetime in seconds reported by the endpoint.
    expires_at = datetime.datetime.now() + datetime.timedelta(
        seconds=token_data['expires_in'])
    self.luniebox.set_setting('spotify', 'token_expires', str(expires_at))
def refresh_access_token(self):
    """Obtain a new access token using the stored refresh token.

    Posts ``grant_type=refresh_token`` to the Spotify token endpoint and
    stores the new ``access_token`` and ``token_expires`` in the luniebox
    'spotify' settings. If the response carries a new ``refresh_token`` it
    is stored as well.

    Returns:
        False when no refresh token is stored; otherwise None.

    Raises:
        KeyError: If the response JSON lacks ``access_token`` or
            ``expires_in`` (e.g. an error response).
    """
    clientId = self.luniebox.get_setting('spotify', 'client_id')
    clientSecret = self.luniebox.get_setting('spotify', 'client_secret')
    refresh_token = self.luniebox.get_setting('spotify', 'refresh_token')
    if not refresh_token:
        return False
    credentials = clientId + ":" + clientSecret
    headers = {
        'Content-Type': 'application/x-www-form-urlencoded',
        'Authorization': 'Basic ' + str(base64.b64encode(credentials.encode('utf-8')), 'utf-8')
    }
    formData = {
        'refresh_token': refresh_token,
        'grant_type': 'refresh_token'
    }
    response = requests.post(
        'https://accounts.spotify.com/api/token', headers=headers, data=formData)
    tokenData = response.json()
    self.luniebox.set_setting(
        'spotify', 'access_token', tokenData['access_token'])
    # The token endpoint may hand out a replacement refresh token. Keep it,
    # otherwise the next refresh would use a stale one.
    rotated_refresh_token = tokenData.get('refresh_token')
    if rotated_refresh_token:
        self.luniebox.set_setting(
            'spotify', 'refresh_token', rotated_refresh_token)
    self.luniebox.set_setting('spotify', 'token_expires', str(
        datetime.datetime.now()
        + datetime.timedelta(seconds=tokenData['expires_in'])))
def api_call(self, url):
    """Send a GET request to the Spotify Web API.

    Args:
        url: Path (including any query string) appended to
            ``https://api.spotify.com/v1``.

    Returns:
        The ``requests`` response object; callers inspect ``status_code``.
    """
    access_token = self.get_access_token()
    headers = {}
    if access_token:
        headers['Authorization'] = 'Bearer ' + access_token
    # get_access_token() returns False when unauthorized; concatenating that
    # raised TypeError. Sending the request without the header instead lets
    # callers see a regular error response.
    return requests.get(
        'https://api.spotify.com/v1' + url, headers=headers)
def search(self, query, types, limit, offset=0):
    """Search the Spotify catalogue.

    Args:
        query: Search string sent as ``q``.
        types: Iterable of item type names, joined with commas into ``type``.
        limit: Maximum number of results; values above 50 are capped to 50.
        offset: Index of the first result to return.

    Returns:
        The decoded JSON body on HTTP 200, otherwise False.
    """
    limit = min(limit, 50)
    url = '/search?' + urlencode(
        {'q': query, 'type': ','.join(types), 'limit': limit, 'offset': offset})
    # On 404 a reconnect is attempted and the request repeated once. The
    # original recursed without bound while the API kept answering 404.
    for retry_allowed in (True, False):
        response = self.api_call(url)
        if response.status_code == 200:
            return response.json()
        if not (retry_allowed and response.status_code == 404
                and self.luniebox.spotify_connect(restart=True)):
            break
    return False
def playback_state(self):
    """Fetch the current playback state from ``/me/player``.

    Returns:
        The decoded JSON body on HTTP 200, ``{"not_active": True}`` on
        HTTP 204, otherwise False.
    """
    # On 404 a reconnect is attempted and the request repeated once. The
    # original recursed without bound while the API kept answering 404.
    for retry_allowed in (True, False):
        response = self.api_call('/me/player')
        if response.status_code == 200:
            return response.json()
        if response.status_code == 204:
            return {"not_active": True}
        if not (retry_allowed and response.status_code == 404
                and self.luniebox.spotify_connect(restart=True)):
            break
    return False
def transfer_playback(self):
    """Transfer playback to the configured device without starting it.

    Sends ``PUT /v1/me/player`` with the stored device id and ``play: False``.

    Returns:
        True if Spotify answered 204 or 202; False otherwise, including when
        no access token is available.
    """
    # On 404 a reconnect is attempted and the request repeated once. The
    # original recursed without bound while the API kept answering 404.
    for retry_allowed in (True, False):
        device_id = self.luniebox.get_setting('spotify', 'device_id')
        access_token = self.get_access_token()
        if not access_token:
            # get_access_token() returns False when unauthorized; building
            # the header from it would raise TypeError.
            return False
        headers = {
            'Content-Type': 'application/json',
            'Authorization': 'Bearer ' + access_token
        }
        data = {
            'device_ids': [device_id],
            'play': False
        }
        response = requests.put(
            'https://api.spotify.com/v1/me/player', headers=headers, data=json.dumps(data))
        if response.status_code in (204, 202):
            return True
        if not (retry_allowed and response.status_code == 404
                and self.luniebox.spotify_connect(restart=True)):
            break
    # Placeholders are required: extra args without them make the logging
    # module fail while formatting. warning() replaces the deprecated warn().
    logging.getLogger('luniebox').warning(
        "error on spotify transfer playback: %s %s", response, response.text)
    return False
def is_active(self):
    """Tell whether the configured device is the current playback device.

    Playback is first transferred to the configured device, then the
    reported playback state is compared against the stored device id.

    Returns:
        True when the playback state names a device whose id equals the
        stored device id, False otherwise.
    """
    self.transfer_playback()
    state = self.playback_state()
    device_id = self.luniebox.get_setting('spotify', 'device_id')
    if not state:
        return False
    if 'device' not in state:
        return False
    current_device = state['device']
    if 'id' not in current_device:
        return False
    return current_device['id'] == device_id
def devices(self):
    """Return the decoded JSON answer of ``GET /me/player/devices``."""
    return self.api_call('/me/player/devices').json()
def play(self, uri):
    """Start playback of ``uri`` on the configured device.

    Args:
        uri: Spotify URI. Track and episode URIs are sent as a one-element
            ``uris`` list, any other URI as ``context_uri``. A falsy value
            sends an empty JSON object (presumably resuming the current
            playback - confirm against the Spotify API).

    Returns:
        True if Spotify answered 204 or 202; False otherwise, including when
        no access token is available.
    """
    data = {}
    if uri:
        if uri.startswith(('spotify:track:', 'spotify:episode:')):
            data = {
                "uris": [uri]
            }
        else:
            data = {
                "context_uri": uri
            }
    # On 404 a reconnect is attempted and the request repeated once. The
    # original recursed without bound while the API kept answering 404.
    for retry_allowed in (True, False):
        device_id = self.luniebox.get_setting('spotify', 'device_id')
        access_token = self.get_access_token()
        if not access_token:
            # get_access_token() returns False when unauthorized; building
            # the header from it would raise TypeError.
            return False
        headers = {
            'Content-Type': 'application/json',
            'Authorization': 'Bearer ' + access_token
        }
        response = requests.put(
            'https://api.spotify.com/v1/me/player/play?' +
            urlencode({'device_id': device_id}), headers=headers, data=json.dumps(data))
        if response.status_code in (204, 202):
            return True
        if not (retry_allowed and response.status_code == 404
                and self.luniebox.spotify_connect(restart=True)):
            break
    # Placeholders are required: extra args without them make the logging
    # module fail while formatting. warning() replaces the deprecated warn().
    logging.getLogger('luniebox').warning(
        "error on spotify play: %s %s", response, response.text)
    return False
def pause(self):
    """Pause playback on the configured device.

    Returns:
        True if Spotify answered 204 or 202; False otherwise, including when
        no access token is available.
    """
    # On 404 a reconnect is attempted and the request repeated once. The
    # original recursed without bound while the API kept answering 404.
    for retry_allowed in (True, False):
        device_id = self.luniebox.get_setting('spotify', 'device_id')
        access_token = self.get_access_token()
        if not access_token:
            # get_access_token() returns False when unauthorized; building
            # the header from it would raise TypeError.
            return False
        headers = {
            'Content-Type': 'application/json',
            'Authorization': 'Bearer ' + access_token
        }
        response = requests.put(
            'https://api.spotify.com/v1/me/player/pause?' +
            urlencode({'device_id': device_id}), headers=headers)
        if response.status_code in (204, 202):
            return True
        if not (retry_allowed and response.status_code == 404
                and self.luniebox.spotify_connect(restart=True)):
            break
    # Placeholders are required: extra args without them make the logging
    # module fail while formatting. warning() replaces the deprecated warn().
    logging.getLogger('luniebox').warning(
        "error on spotify pause: %s %s", response, response.text)
    return False
def volume(self, value):
    """Set the playback volume of the configured device.

    Args:
        value: Volume in percent; clamped to the range 0..100.

    Returns:
        True if Spotify answered 204 or 202; False otherwise, including when
        no access token is available.
    """
    if value < 0:
        value = 0
    if value > 100:
        value = 100
    # On 404 a reconnect is attempted and the request repeated once. The
    # original recursed without bound while the API kept answering 404.
    for retry_allowed in (True, False):
        device_id = self.luniebox.get_setting('spotify', 'device_id')
        access_token = self.get_access_token()
        if not access_token:
            # get_access_token() returns False when unauthorized; building
            # the header from it would raise TypeError.
            return False
        headers = {
            'Content-Type': 'application/json',
            'Authorization': 'Bearer ' + access_token
        }
        response = requests.put(
            'https://api.spotify.com/v1/me/player/volume?' +
            urlencode({'device_id': device_id, 'volume_percent': value}), headers=headers)
        if response.status_code in (204, 202):
            return True
        if not (retry_allowed and response.status_code == 404
                and self.luniebox.spotify_connect(restart=True)):
            break
    # Placeholders are required: extra args without them make the logging
    # module fail while formatting. warning() replaces the deprecated warn().
    logging.getLogger('luniebox').warning(
        "error on spotify volume: %s %s", response, response.text)
    return False
def seek(self, position):
    """Seek to ``position`` in the current item on the configured device.

    Args:
        position: Target position in milliseconds; negative values are
            replaced by 0.

    Returns:
        True if Spotify answered 204 or 202; False otherwise, including when
        no access token is available.
    """
    if position < 0:
        position = 0
    # On 404 a reconnect is attempted and the request repeated once. The
    # original recursed without bound while the API kept answering 404.
    for retry_allowed in (True, False):
        device_id = self.luniebox.get_setting('spotify', 'device_id')
        access_token = self.get_access_token()
        if not access_token:
            # get_access_token() returns False when unauthorized; building
            # the header from it would raise TypeError.
            return False
        headers = {
            'Content-Type': 'application/json',
            'Authorization': 'Bearer ' + access_token
        }
        response = requests.put(
            'https://api.spotify.com/v1/me/player/seek?' +
            urlencode({'device_id': device_id, 'position_ms': position}), headers=headers)
        if response.status_code in (204, 202):
            return True
        if not (retry_allowed and response.status_code == 404
                and self.luniebox.spotify_connect(restart=True)):
            break
    # Placeholders are required: extra args without them make the logging
    # module fail while formatting. warning() replaces the deprecated warn().
    logging.getLogger('luniebox').warning(
        "error on spotify seek: %s %s", response, response.text)
    return False
def rewind(self, seconds):
    """Jump ``seconds`` backwards from the current playback position.

    Args:
        seconds: Number of seconds to go back.

    Returns:
        The result of ``seek()`` for the new position, or False when the
        playback state is unavailable or reports no progress.
    """
    state = self.playback_state()
    if not state:
        return False
    progress = state.get('progress_ms')
    # The API can report a null progress; int(None) would raise TypeError.
    if progress is None:
        return False
    return self.seek(int(progress) - seconds * 1000)
def fastforward(self, seconds):
    """Jump ``seconds`` forwards from the current playback position.

    Args:
        seconds: Number of seconds to skip ahead.

    Returns:
        The result of ``seek()`` for the new position, or False when the
        playback state is unavailable or reports no progress.
    """
    state = self.playback_state()
    if not state:
        return False
    progress = state.get('progress_ms')
    # The API can report a null progress; int(None) would raise TypeError.
    if progress is None:
        return False
    return self.seek(int(progress) + seconds * 1000)
def previous(self):
    """Skip to the previous item on the configured device.

    Returns:
        True if Spotify answered 204 or 202; False otherwise, including when
        no access token is available.
    """
    # On 404 a reconnect is attempted and the request repeated once. The
    # original recursed without bound while the API kept answering 404.
    for retry_allowed in (True, False):
        device_id = self.luniebox.get_setting('spotify', 'device_id')
        access_token = self.get_access_token()
        if not access_token:
            # get_access_token() returns False when unauthorized; building
            # the header from it would raise TypeError.
            return False
        headers = {
            'Content-Type': 'application/json',
            'Authorization': 'Bearer ' + access_token
        }
        response = requests.post(
            'https://api.spotify.com/v1/me/player/previous?' +
            urlencode({'device_id': device_id}), headers=headers)
        if response.status_code in (204, 202):
            return True
        if not (retry_allowed and response.status_code == 404
                and self.luniebox.spotify_connect(restart=True)):
            break
    # Placeholders are required: extra args without them make the logging
    # module fail while formatting. warning() replaces the deprecated warn().
    logging.getLogger('luniebox').warning(
        "error on spotify previous: %s %s", response, response.text)
    return False
def next(self):
    """Skip to the next item on the configured device.

    Returns:
        True if Spotify answered 204 or 202; False otherwise, including when
        no access token is available.
    """
    # On 404 a reconnect is attempted and the request repeated once. The
    # original recursed without bound while the API kept answering 404.
    for retry_allowed in (True, False):
        device_id = self.luniebox.get_setting('spotify', 'device_id')
        access_token = self.get_access_token()
        if not access_token:
            # get_access_token() returns False when unauthorized; building
            # the header from it would raise TypeError.
            return False
        headers = {
            'Content-Type': 'application/json',
            'Authorization': 'Bearer ' + access_token
        }
        response = requests.post(
            'https://api.spotify.com/v1/me/player/next?' +
            urlencode({'device_id': device_id}), headers=headers)
        if response.status_code in (204, 202):
            return True
        if not (retry_allowed and response.status_code == 404
                and self.luniebox.spotify_connect(restart=True)):
            break
    # Placeholders are required: extra args without them make the logging
    # module fail while formatting. warning() replaces the deprecated warn().
    logging.getLogger('luniebox').warning(
        "error on spotify next: %s %s", response, response.text)
    return False