#!/usr/bin/env python3 # -*- coding: utf-8 -*- __name__ = "Spotify" import base64 import datetime from configparser import ConfigParser from dateutil import parser from urllib.parse import urlencode import requests import json import subprocess import logging defaultSpotifyConfigFilePath = '../config/spotifyd.cfg' class Spotify(object): def __init__(self, luniebox, configFilePath=defaultSpotifyConfigFilePath): self.luniebox = luniebox self.configFilePath = configFilePath self.read_config() def read_config(self): configParser = ConfigParser() dataset = configParser.read(self.configFilePath) if len(dataset) != 1: raise ValueError( "Config file {} not found!".format(self.configFilePath)) self.config = configParser def get_setting(self, key, default=None): if self.config.has_option('global', key): return self.config['global'][key].strip('\"') return default def set_setting(self, key, value): if not self.config.has_section('global'): self.config.add_section('global') self.config.set('global', key, str('\"' + value + '\"')) with open(self.configFilePath, 'w') as configfile: self.config.write(configfile) def get_status(self): devices = self.devices() device_id = self.luniebox.get_setting('spotify', 'device_id') device_name = self.get_setting('device_name') status = { 'status': False, 'device_name': device_name, 'device_id': device_id, 'errors': {'setup': True, 'not_running': True} } if not device_id: status['errors']['not_running'] = False for device in devices['devices']: if device['name'] == device_name: device_id = device['id'] self.luniebox.get_setting( 'spotify', 'device_id', device_id) status['errors']['setup'] = False else: status['errors']['setup'] = False for device in devices['devices']: if device['id'] == device_id: status['errors']['not_running'] = False if status['errors']['not_running'] and self.transfer_playback(): status['errors']['not_running'] = False return status def get_access_token(self): access_token = self.luniebox.get_setting('spotify', 'access_token') if not access_token: return False token_expires = parser.parse( self.luniebox.get_setting('spotify', 'token_expires')) if token_expires < datetime.datetime.now(): self.refresh_access_token() return self.get_access_token() return access_token def new_access_token(self, code): clientId = self.luniebox.get_setting('spotify', 'client_id') clientSecret = self.luniebox.get_setting('spotify', 'client_secret') redirectUri = self.luniebox.get_setting('spotify', 'redirect_uri') credentials = clientId + ":" + clientSecret headers = { 'Content-Type': 'application/x-www-form-urlencoded', 'Authorization': 'Basic ' + str(base64.b64encode(credentials.encode('utf-8')), 'utf-8') } formData = { 'code': code, 'redirect_uri': redirectUri, 'grant_type': 'authorization_code' } response = requests.post( 'https://accounts.spotify.com/api/token', headers=headers, data=formData) tokenData = response.json() self.luniebox.set_setting( 'spotify', 'access_token', tokenData['access_token']) self.luniebox.set_setting('spotify', 'refresh_token', tokenData['refresh_token']) self.luniebox.set_setting('spotify', 'token_expires', str(datetime.datetime.now( ) + datetime.timedelta(seconds=tokenData['expires_in']))) def refresh_access_token(self): clientId = self.luniebox.get_setting('spotify', 'client_id') clientSecret = self.luniebox.get_setting('spotify', 'client_secret') refresh_token = self.luniebox.get_setting('spotify', 'refresh_token') if not refresh_token: return False credentials = clientId + ":" + clientSecret headers = { 'Content-Type': 'application/x-www-form-urlencoded', 'Authorization': 'Basic ' + str(base64.b64encode(credentials.encode('utf-8')), 'utf-8') } formData = { 'refresh_token': refresh_token, 'grant_type': 'refresh_token' } response = requests.post( 'https://accounts.spotify.com/api/token', headers=headers, data=formData) tokenData = response.json() self.luniebox.set_setting( 'spotify', 'access_token', tokenData['access_token']) self.luniebox.set_setting('spotify', 'token_expires', str(datetime.datetime.now( ) + datetime.timedelta(seconds=tokenData['expires_in']))) def api_call(self, url): access_token = self.get_access_token() headers = { 'Authorization': 'Bearer ' + access_token } return requests.get( 'https://api.spotify.com/v1' + url, headers=headers) def search(self, query, types, limit, offset=0): if limit > 50: limit = 50 response = self.api_call( '/search?' + urlencode({'q': query, 'type': ','.join(types), 'limit': limit, 'offset': offset})) if response.status_code == 200: return response.json() elif response.status_code == 404 and self.luniebox.spotify_connect(restart=True): return self.search(query, types, limit, offset) return False def playback_state(self): response = self.api_call('/me/player') if response.status_code == 200: return response.json() if response.status_code == 204: return {"not_active": True} elif response.status_code == 404 and self.luniebox.spotify_connect(restart=True): return self.playback_state() return False def transfer_playback(self): device_id = self.luniebox.get_setting('spotify', 'device_id') access_token = self.get_access_token() headers = { 'Content-Type': 'application/json', 'Authorization': 'Bearer ' + access_token } data = { 'device_ids': [device_id], 'play': False } response = requests.put( 'https://api.spotify.com/v1/me/player', headers=headers, data=json.dumps(data)) if response.status_code == 204 or response.status_code == 202: return True elif response.status_code == 404 and self.luniebox.spotify_connect(restart=True): return self.transfer_playback() logging.getLogger('luniebox').warn( "error on spotify transfer playback", response, response.text) return False def is_active(self): self.transfer_playback() state = self.playback_state() device_id = self.luniebox.get_setting('spotify', 'device_id') if not state: return False return 'device' in state and 'id' in state['device'] and state['device']['id'] == device_id def devices(self): response = self.api_call('/me/player/devices') return response.json() def play(self, uri): device_id = self.luniebox.get_setting('spotify', 'device_id') access_token = self.get_access_token() headers = { 'Content-Type': 'application/json', 'Authorization': 'Bearer ' + access_token } data = {} if uri: if uri.startswith('spotify:track:') or uri.startswith('spotify:episode:'): data = { "uris": [uri] } else: data = { "context_uri": uri } response = requests.put( 'https://api.spotify.com/v1/me/player/play?' + urlencode({'device_id': device_id}), headers=headers, data=json.dumps(data)) if response.status_code == 204 or response.status_code == 202: return True elif response.status_code == 404 and self.luniebox.spotify_connect(restart=True): return self.play(uri) logging.getLogger('luniebox').warn( "error on spotify play", response, response.text) return False def pause(self): device_id = self.luniebox.get_setting('spotify', 'device_id') access_token = self.get_access_token() headers = { 'Content-Type': 'application/json', 'Authorization': 'Bearer ' + access_token } response = requests.put( 'https://api.spotify.com/v1/me/player/pause?' + urlencode({'device_id': device_id}), headers=headers) if response.status_code == 204 or response.status_code == 202: return True elif response.status_code == 404 and self.luniebox.spotify_connect(restart=True): return self.pause() logging.getLogger('luniebox').warn( "error on spotify pause", response, response.text) return False def volume(self, value): device_id = self.luniebox.get_setting('spotify', 'device_id') access_token = self.get_access_token() headers = { 'Content-Type': 'application/json', 'Authorization': 'Bearer ' + access_token } if value < 0: value = 0 if value > 100: value = 100 response = requests.put( 'https://api.spotify.com/v1/me/player/volume?' + urlencode({'device_id': device_id, 'volume_percent': value}), headers=headers) if response.status_code == 204 or response.status_code == 202: return True elif response.status_code == 404 and self.luniebox.spotify_connect(restart=True): return self.volume(value) logging.getLogger('luniebox').warn( "error on spotify volume", response, response.text) return False def seek(self, position): device_id = self.luniebox.get_setting('spotify', 'device_id') access_token = self.get_access_token() headers = { 'Content-Type': 'application/json', 'Authorization': 'Bearer ' + access_token } if position < 0: position = 0 response = requests.put( 'https://api.spotify.com/v1/me/player/seek?' + urlencode({'device_id': device_id, 'position_ms': position}), headers=headers) if response.status_code == 204 or response.status_code == 202: return True elif response.status_code == 404 and self.luniebox.spotify_connect(restart=True): return self.seek(position) logging.getLogger('luniebox').warn( "error on spotify seek", response, response.text) return False def rewind(self, seconds): state = self.playback_state() if state and 'progress_ms' in state: progress = int(state['progress_ms']) progress -= seconds * 1000 return self.seek(progress) return False def fastforward(self, seconds): state = self.playback_state() if state and 'progress_ms' in state: progress = int(state['progress_ms']) progress += seconds * 1000 return self.seek(progress) return False def previous(self): device_id = self.luniebox.get_setting('spotify', 'device_id') access_token = self.get_access_token() headers = { 'Content-Type': 'application/json', 'Authorization': 'Bearer ' + access_token } response = requests.post( 'https://api.spotify.com/v1/me/player/previous?' + urlencode({'device_id': device_id}), headers=headers) if response.status_code == 204 or response.status_code == 202: return True elif response.status_code == 404 and self.luniebox.spotify_connect(restart=True): return self.previous() logging.getLogger('luniebox').warn( "error on spotify previous", response, response.text) return False def next(self): device_id = self.luniebox.get_setting('spotify', 'device_id') access_token = self.get_access_token() headers = { 'Content-Type': 'application/json', 'Authorization': 'Bearer ' + access_token } response = requests.post( 'https://api.spotify.com/v1/me/player/next?' + urlencode({'device_id': device_id}), headers=headers) if response.status_code == 204 or response.status_code == 202: return True elif response.status_code == 404 and self.luniebox.spotify_connect(restart=True): return self.next() logging.getLogger('luniebox').warn( "error on spotify next", response, response.text) return False