386 lines
13 KiB
Python
386 lines
13 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
|
|
# NOTE(review): this rebinds the module's own ``__name__``. Any later code in
# this module that reads ``__name__`` (e.g. ``logging.getLogger(__name__)`` or
# an ``if __name__ == "__main__"`` guard) will see "Spotify" instead of the
# import name. Presumably meant as a display name - confirm before relying on it.
__name__ = "Spotify"
|
|
|
|
import base64
|
|
import datetime
|
|
from configparser import ConfigParser
|
|
from dateutil import parser
|
|
from urllib.parse import urlencode
|
|
import requests
|
|
import json
|
|
import subprocess
|
|
import logging
|
|
|
|
|
|
# Default location of the spotifyd INI file. The path is relative, so it is
# resolved against the working directory of the running process.
defaultSpotifyConfigFilePath = '../config/spotifyd.cfg'


class Spotify(object):
    """Thin client for the Spotify Web API used by a luniebox instance.

    OAuth tokens, client credentials and the target ``device_id`` are read
    from and written to the ``'spotify'`` section of the luniebox settings
    (``luniebox.get_setting`` / ``luniebox.set_setting``).  The device name is
    read from the ``[global]`` section of the INI file at ``configFilePath``.

    Requests that answer HTTP 404 trigger ``luniebox.spotify_connect(
    restart=True)`` and are retried while that call reports success, up to
    ``_MAX_RECONNECTS`` times.
    """

    _API_BASE = 'https://api.spotify.com/v1'
    _TOKEN_URL = 'https://accounts.spotify.com/api/token'

    # Upper bound on "reconnect and retry" cycles after an HTTP 404, so a
    # persistently failing request cannot retry without limit.
    _MAX_RECONNECTS = 3

    def __init__(self, luniebox, configFilePath=defaultSpotifyConfigFilePath):
        """Bind to the owning luniebox object and load the INI file.

        Args:
            luniebox: object providing ``get_setting(section, key)``,
                ``set_setting(section, key, value)`` and
                ``spotify_connect(restart=True)``.
            configFilePath: path of the INI file loaded by ``read_config``.

        Raises:
            ValueError: if the config file cannot be read.
        """
        self.luniebox = luniebox
        self.configFilePath = configFilePath
        self.read_config()

    # ------------------------------------------------------------------
    # INI file handling
    # ------------------------------------------------------------------

    def read_config(self):
        """Parse ``self.configFilePath`` and keep the parser in ``self.config``.

        Raises:
            ValueError: if the file was not successfully read.
        """
        config = ConfigParser()
        # ConfigParser.read() returns the list of files it managed to parse.
        if len(config.read(self.configFilePath)) != 1:
            raise ValueError(
                "Config file {} not found!".format(self.configFilePath))
        self.config = config

    def get_setting(self, key, default=None):
        """Return option ``key`` of the ``[global]`` section.

        Surrounding double quotes are stripped from the stored value.
        ``default`` is returned when the option does not exist.
        """
        if self.config.has_option('global', key):
            return self.config['global'][key].strip('"')
        return default

    def set_setting(self, key, value):
        """Store ``value`` (wrapped in double quotes) under ``[global]``.

        The whole configuration is written back to ``self.configFilePath``.
        Non-string values are converted with ``str.format`` before quoting.
        """
        if not self.config.has_section('global'):
            self.config.add_section('global')
        self.config.set('global', key, '"{}"'.format(value))

        with open(self.configFilePath, 'w') as configfile:
            self.config.write(configfile)

    # ------------------------------------------------------------------
    # Status
    # ------------------------------------------------------------------

    def get_status(self):
        """Report whether the configured device is set up and reachable.

        Returns:
            dict with keys ``status``, ``device_name``, ``device_id`` and
            ``errors`` (``{'setup': bool, 'not_running': bool}``).

        When no device id is stored yet, the device list is searched for the
        configured device name and a match is persisted through
        ``luniebox.set_setting``.  When an id is stored but the device is not
        listed, ``transfer_playback`` is tried once before reporting
        ``not_running``.
        """
        devices = self.devices()
        device_id = self.luniebox.get_setting('spotify', 'device_id')
        device_name = self.get_setting('device_name')
        # NOTE(review): 'status' is initialised to False and never updated
        # here, and 'device_id' keeps the value read above even when a device
        # is discovered below - confirm what callers expect.
        status = {
            'status': False,
            'device_name': device_name,
            'device_id': device_id,
            'errors': {'setup': True, 'not_running': True},
        }
        errors = status['errors']

        # An error payload carries no 'devices' key; treat it as "no devices".
        known_devices = devices.get('devices') or []

        if not device_id:
            errors['not_running'] = False
            for device in known_devices:
                if device['name'] == device_name:
                    device_id = device['id']
                    # Persist the discovered id so later calls can address it.
                    self.luniebox.set_setting(
                        'spotify', 'device_id', device_id)
                    errors['setup'] = False
        else:
            errors['setup'] = False
            for device in known_devices:
                if device['id'] == device_id:
                    errors['not_running'] = False

        if errors['not_running'] and self.transfer_playback():
            errors['not_running'] = False

        return status

    # ------------------------------------------------------------------
    # OAuth token handling
    # ------------------------------------------------------------------

    def get_access_token(self):
        """Return a usable access token, refreshing it when expired.

        Returns:
            The access token string, or ``False`` when no token is stored or
            an expired token could not be refreshed.

        The stored ``token_expires`` value is parsed with ``dateutil`` and
        compared against ``datetime.datetime.now()``.
        """
        access_token = self.luniebox.get_setting('spotify', 'access_token')
        if not access_token:
            return False

        token_expires = parser.parse(
            self.luniebox.get_setting('spotify', 'token_expires'))
        if token_expires >= datetime.datetime.now():
            return access_token

        # Expired: refresh once and re-read instead of recursing, so a failed
        # refresh cannot loop forever.
        if self.refresh_access_token() is False:
            return False
        return self.luniebox.get_setting('spotify', 'access_token') or False

    def new_access_token(self, code):
        """Exchange an authorization ``code`` for tokens and store them.

        Raises:
            KeyError: if the token response lacks ``access_token`` or
                ``expires_in`` (e.g. an error response).
        """
        token_data = self._request_token({
            'code': code,
            'redirect_uri': self.luniebox.get_setting(
                'spotify', 'redirect_uri'),
            'grant_type': 'authorization_code',
        })
        self._store_token(token_data)

    def refresh_access_token(self):
        """Obtain a new access token using the stored refresh token.

        Returns:
            ``False`` when no refresh token is stored, otherwise ``None``.

        Raises:
            KeyError: if the token response lacks ``access_token`` or
                ``expires_in`` (e.g. an error response).
        """
        refresh_token = self.luniebox.get_setting('spotify', 'refresh_token')
        if not refresh_token:
            return False

        token_data = self._request_token({
            'refresh_token': refresh_token,
            'grant_type': 'refresh_token',
        })
        self._store_token(token_data)

    def _request_token(self, form_data):
        """POST ``form_data`` to the token endpoint; return the parsed JSON."""
        client_id = self.luniebox.get_setting('spotify', 'client_id')
        client_secret = self.luniebox.get_setting('spotify', 'client_secret')
        credentials = client_id + ":" + client_secret
        encoded = base64.b64encode(credentials.encode('utf-8')).decode('utf-8')
        headers = {
            'Content-Type': 'application/x-www-form-urlencoded',
            'Authorization': 'Basic ' + encoded,
        }
        response = requests.post(
            self._TOKEN_URL, headers=headers, data=form_data)
        return response.json()

    def _store_token(self, token_data):
        """Persist the token fields of a token-endpoint response."""
        # Read the required fields first so an error response raises before
        # any setting has been overwritten.
        access_token = token_data['access_token']
        expires_at = datetime.datetime.now() + datetime.timedelta(
            seconds=token_data['expires_in'])

        self.luniebox.set_setting('spotify', 'access_token', access_token)
        # A refresh response does not have to carry a refresh token; keep the
        # stored one in that case.
        if 'refresh_token' in token_data:
            self.luniebox.set_setting(
                'spotify', 'refresh_token', token_data['refresh_token'])
        self.luniebox.set_setting(
            'spotify', 'token_expires', str(expires_at))

    # ------------------------------------------------------------------
    # HTTP helpers
    # ------------------------------------------------------------------

    def _auth_headers(self, json_body=False):
        """Build the bearer-token headers for a Web API request.

        Raises:
            TypeError: if ``get_access_token`` returns ``False`` (no usable
                token), because it cannot be concatenated to the header.
        """
        headers = {}
        if json_body:
            headers['Content-Type'] = 'application/json'
        headers['Authorization'] = 'Bearer ' + self.get_access_token()
        return headers

    def api_call(self, url):
        """GET ``url`` (a path below the API base) and return the response."""
        return requests.get(self._API_BASE + url, headers=self._auth_headers())

    def _get_with_reconnect(self, url):
        """GET ``url``, reconnecting and retrying while it answers 404."""
        response = self.api_call(url)
        for _ in range(self._MAX_RECONNECTS):
            if response.status_code != 404:
                break
            if not self.luniebox.spotify_connect(restart=True):
                break
            response = self.api_call(url)
        return response

    def _player_command(self, http_method, path, action, params=None,
                        body=None, device_in_query=True):
        """Send a player command and report whether it was accepted.

        Args:
            http_method: HTTP verb, e.g. ``'PUT'`` or ``'POST'``.
            path: path below the API base, e.g. ``'/me/player/play'``.
            action: short name used in the warning logged on failure.
            params: extra query parameters (added after ``device_id``).
            body: JSON-serialisable payload, or a callable receiving the
                current device id and returning the payload.
            device_in_query: append ``device_id`` as a query parameter.

        Returns:
            ``True`` on HTTP 202/204, otherwise ``False`` after logging a
            warning.  Device id and token are re-read on every attempt.
        """
        response = None
        for attempt in range(self._MAX_RECONNECTS + 1):
            device_id = self.luniebox.get_setting('spotify', 'device_id')
            request_kwargs = {'headers': self._auth_headers(json_body=True)}

            url = self._API_BASE + path
            if device_in_query:
                query = {'device_id': device_id}
                query.update(params or {})
                url += '?' + urlencode(query)

            if body is not None:
                payload = body(device_id) if callable(body) else body
                request_kwargs['data'] = json.dumps(payload)

            response = requests.request(http_method, url, **request_kwargs)

            if response.status_code in (202, 204):
                return True
            if response.status_code != 404:
                break
            if attempt == self._MAX_RECONNECTS:
                break
            if not self.luniebox.spotify_connect(restart=True):
                break

        logging.getLogger('luniebox').warning(
            "error on spotify %s: %s %s", action, response, response.text)
        return False

    # ------------------------------------------------------------------
    # Read-only API calls
    # ------------------------------------------------------------------

    def search(self, query, types, limit, offset=0):
        """Search the catalogue.

        Args:
            query: search string.
            types: iterable of item type names, joined with commas.
            limit: page size; values above 50 are reduced to 50.
            offset: index of the first result.

        Returns:
            The decoded JSON response on HTTP 200, otherwise ``False``.
        """
        params = {
            'q': query,
            'type': ','.join(types),
            'limit': min(limit, 50),
            'offset': offset,
        }
        response = self._get_with_reconnect('/search?' + urlencode(params))
        if response.status_code == 200:
            return response.json()
        return False

    def playback_state(self):
        """Return the current playback state.

        Returns:
            The decoded JSON on HTTP 200, ``{"not_active": True}`` on HTTP
            204, otherwise ``False``.
        """
        response = self._get_with_reconnect('/me/player')
        if response.status_code == 200:
            return response.json()
        if response.status_code == 204:
            return {"not_active": True}
        return False

    def devices(self):
        """Return the decoded JSON of ``/me/player/devices``."""
        return self.api_call('/me/player/devices').json()

    def is_active(self):
        """Return whether playback currently runs on the configured device.

        A playback transfer to the configured device is attempted first.
        """
        self.transfer_playback()
        state = self.playback_state()
        device_id = self.luniebox.get_setting('spotify', 'device_id')
        if not state:
            return False
        return ('device' in state
                and 'id' in state['device']
                and state['device']['id'] == device_id)

    # ------------------------------------------------------------------
    # Player commands (each returns True on HTTP 202/204, else False)
    # ------------------------------------------------------------------

    def transfer_playback(self):
        """Transfer playback to the configured device without starting it."""
        return self._player_command(
            'PUT', '/me/player', 'transfer playback',
            body=lambda device_id: {'device_ids': [device_id], 'play': False},
            device_in_query=False)

    def play(self, uri):
        """Start playback of ``uri``, or resume when ``uri`` is falsy.

        Track and episode URIs are sent as ``uris``; any other URI is sent
        as ``context_uri``.
        """
        if not uri:
            body = {}
        elif uri.startswith(('spotify:track:', 'spotify:episode:')):
            body = {"uris": [uri]}
        else:
            body = {"context_uri": uri}
        return self._player_command(
            'PUT', '/me/player/play', 'play', body=body)

    def pause(self):
        """Pause playback on the configured device."""
        return self._player_command('PUT', '/me/player/pause', 'pause')

    def volume(self, value):
        """Set the volume; ``value`` is clamped to the range 0..100."""
        value = max(0, min(100, value))
        return self._player_command(
            'PUT', '/me/player/volume', 'volume',
            params={'volume_percent': value})

    def seek(self, position):
        """Seek to ``position`` milliseconds; negative values become 0."""
        position = max(position, 0)
        return self._player_command(
            'PUT', '/me/player/seek', 'seek',
            params={'position_ms': position})

    def rewind(self, seconds):
        """Seek ``seconds`` backwards from the current position."""
        return self._seek_relative(-seconds)

    def fastforward(self, seconds):
        """Seek ``seconds`` forwards from the current position."""
        return self._seek_relative(seconds)

    def _seek_relative(self, offset_seconds):
        """Seek relative to the reported ``progress_ms``; False if unknown."""
        state = self.playback_state()
        # progress_ms can be absent or null; there is nothing to offset then.
        if not state or state.get('progress_ms') is None:
            return False
        progress = int(state['progress_ms'])
        return self.seek(int(progress + offset_seconds * 1000))

    def previous(self):
        """Skip to the previous item."""
        return self._player_command(
            'POST', '/me/player/previous', 'previous')

    def next(self):
        """Skip to the next item."""
        return self._player_command('POST', '/me/player/next', 'next')