In [0]:
In [1]:
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
# ### Algorithm:
# 1. HPSS => Harmonics, Percussives
# 2. Percussives => peak pick => noise generator
# 3. Harmonics => CQT
# 4. CQT => Bass range => peak pick => triangle generator
# 5. CQT => Treble range => peak pick => harmony => pulse generator
import argparse
import sys
import os
import time
import librosa
import numpy as np
import scipy.signal
import scipy.ndimage
import functools
import warnings
warnings.filterwarnings("ignore")
# MAGIC NUMBERS
# Sample rate handed to librosa.cqt in process_audio(); synthesize() hard-codes
# the same value locally.
sr = 22050
# Lowest and highest MIDI note numbers; their difference is the CQT bin count.
MIDI_MIN = 18
MIDI_MAX = 96
# Frequency in Hz of the lowest CQT bin.
fmin = librosa.midi_to_hz(MIDI_MIN)
# STFT window size (used by remove_vocals) and the hop shared by STFT and CQT.
n_fft = 2048
hop_length = 512
# Note-range boundaries expressed as offsets from MIDI_MIN.
# NOTE(review): in the visible code only BASS_MAX is read (by autochip);
# TREBLE_MAX and BASS_MIN appear unused - confirm before removing.
TREBLE_MIN = 54 - MIDI_MIN
TREBLE_MAX = MIDI_MAX - MIDI_MIN
BASS_MIN = 24 - MIDI_MIN
BASS_MAX = TREBLE_MIN + 24
def triangle(*args, **kwargs):
    '''Build a triangle wave by folding a sawtooth wave.

    Every positional and keyword argument is forwarded unchanged to
    ``scipy.signal.sawtooth``.  The sawtooth's absolute value is rescaled so
    the result spans [-1, 1].
    '''
    saw = scipy.signal.sawtooth(*args, **kwargs)
    folded = np.abs(saw)
    return 2 * folded - 1.
def nes_triangle(*args, **kwargs):
    '''Synthesize a coarsely quantized, NES-style triangle wave.

    Arguments are forwarded to ``triangle``.  Each sample is then snapped
    down to a multiple of 2/15, imitating the stepped output of the NES
    triangle channel (http://wiki.nesdev.com/w/index.php/APU_Triangle).
    '''
    step = 2. / 15
    smooth = triangle(*args, **kwargs)
    # Subtracting the remainder floors each sample onto the step grid.
    remainder = np.mod(smooth, step)
    return smooth - remainder
def noise(seq):
    '''Generate random binary noise, one +/-1 sample per element of seq.

    Only the length of ``seq`` is used; its values are ignored.
    '''
    n_samples = len(seq)
    bits = np.random.randint(0, 2, size=(n_samples,))
    # Map {0, 1} onto {-1.0, +1.0}.
    return 2 * bits - 1.
def quantize_values(X, v=15):
    '''Rescale X to [0, 1] and floor it onto a grid of step 1/v.

    Arguments:
    X -- (array-like) values to quantize; converted to float
    v -- (int>0) number of quantization steps per unit

    Returns:
    X -- (ndarray) float array with the shape of the input.  Constant
         (zero-range) input yields all zeros instead of NaN, and empty
         input is returned unchanged.
    '''
    # Work in float so integer input is not floor-divided under Python 2.
    X = np.asarray(X, dtype=float)
    if X.size == 0:
        return X
    X = X - X.min()
    peak = X.max()
    # A flat input has peak == 0; dividing would fill the array with NaN.
    if peak > 0:
        X = X / peak
    X = X - np.mod(X, 1. / v)
    return X
def synthesize(beats, piano_roll, fmin=0, bins_per_octave=12,
               tuning=0.0, wave=None, n=None):
    '''Synthesize audio from a weighted piano roll.

    Arguments:
    beats           -- (array of int) sample positions, one per piano-roll
                       column; shifted so the first one is 0 (the caller's
                       array is left untouched)
    piano_roll      -- (ndarray) per-bin, per-interval weights; quantized
                       with quantize_values before use
    fmin            -- frequency in Hz of the first bin
    bins_per_octave -- (int) bins per octave, forwarded to
                       librosa.cqt_frequencies
    tuning          -- tuning offset, forwarded to librosa.cqt_frequencies
                       and applied as a frequency correction
    wave            -- callable mapping a phase array to a waveform;
                       defaults to a 50% duty square wave
    n               -- output length in samples; defaults to the last beat
                       plus half a second

    Returns:
    output, sr -- (ndarray, int) the normalized mix and its sample rate
    '''
    sr = 22050
    # Quantize the piano roll
    piano_roll = quantize_values(piano_roll)
    if wave is None:
        wave = functools.partial(scipy.signal.square, duty=0.5)
    # Floor division keeps these usable as indices on every Python version.
    # NOTE(review): with bins_per_octave > 12 the row index below can exceed
    # piano_roll.shape[0]; every visible caller passes 12.
    bins_per_semi = bins_per_octave // 12
    first_bin = bins_per_semi // 2
    frequencies = librosa.cqt_frequencies(n_bins=piano_roll.shape[0],
                                          fmin=fmin,
                                          bins_per_octave=bins_per_octave,
                                          tuning=tuning)
    # Shift into a new array rather than editing the caller's in place.
    beats = np.asarray(beats)
    beats = beats - beats[0]
    if n is None:
        n = beats[-1] + 0.5 * sr
    # The default above is a float; array sizes must be integers.
    n = int(n)
    beats = librosa.util.fix_frames(beats, x_min=0, x_max=n)
    beat_intervals = librosa.util.frame(beats, frame_length=2, hop_length=1).T
    output = np.zeros(n)
    correction = 2.0 ** (tuning / bins_per_octave)
    stream = correction * 2.0 * np.pi * np.arange(len(output)) / sr
    active_bins = piano_roll.sum(axis=1) > 0
    for bin_index, freq in enumerate(frequencies):
        row = bin_index * bins_per_semi + first_bin
        if not active_bins[row]:
            continue
        my_f = freq * stream
        sine = wave(my_f)
        # Align beat timings to zero crossings of sine
        zc_mask = librosa.zero_crossings(sine)
        beat_f = match_zc(beat_intervals, zc_mask, freq * correction, sr)
        # Scale each interval by this bin's weight (0 wherever it is inactive)
        for m, (start, end) in enumerate(beat_f):
            sine[start:end] *= piano_roll[row, m]
        output += sine
    output = librosa.util.normalize(output)
    return output, sr
def match_zc(queries, zc_mask, my_f, sr):
    '''Snap each query position to the nearest flagged zero crossing.

    Arguments:
    queries -- (array of int) sample positions, any shape
    zc_mask -- (1-d bool array) True where the waveform crosses zero
    my_f    -- frequency in Hz; one period (sr / my_f samples) is searched
               on either side of each query
    sr      -- sample rate in Hz

    Returns:
    (ndarray of int) same shape as queries.  A query with no zero crossing
    inside its search window is returned unchanged instead of raising.
    '''
    # For each query, define a search range of one period on either side
    window = int(np.ceil(sr / my_f))
    flat_queries = np.asarray(queries).ravel()
    output = np.empty(flat_queries.size, dtype=int)
    for i, q in enumerate(flat_queries):
        q = int(q)
        s = max(0, q - window)
        t = min(len(zc_mask), q + window)
        vals = s + np.flatnonzero(zc_mask[s:t])
        if vals.size:
            output[i] = vals[np.argmin(np.abs(q - vals))]
        else:
            # argmin of an empty array raises; keep the unaligned position.
            output[i] = q
    return output.reshape(np.shape(queries))
def peakgram(C, max_peaks=1, note_search=8):
    '''Compute spectrogram column-wise peaks subject to constraints.

    Arguments:
    C           -- (2-d ndarray) one column per time step
    max_peaks   -- (int) maximum number of peaks kept per column
    note_search -- (int) when the previous column has peaks, only rows
                   within +/- note_search of one of them are searched

    Returns:
    mask -- (ndarray shaped like C) 1 at selected peaks, 0 elsewhere
    '''
    mask = np.zeros_like(C)
    n_rows = C.shape[0]
    for t in range(C.shape[1]):
        # Rows picked in the previous column (none for the first column).
        if t == 0:
            prev_peaks = np.array([], dtype=int)
        else:
            prev_peaks = np.flatnonzero(mask[:, t - 1])
        if prev_peaks.size == 0:
            col = C[:, t]
        else:
            # Start from the column minimum everywhere, then restore the
            # true values only within +/- note_search of a previous peak.
            col = np.min(C[:, t]) * np.ones(n_rows)
            for v in prev_peaks:
                # Plain ints: 1-element arrays are not valid slice bounds.
                v = int(v)
                r_min = max(0, v - note_search)
                r_max = min(n_rows, v + note_search + 1)
                col[r_min:r_max] = C[r_min:r_max, t]
        # Local normalization
        z = col.max()
        if z > 0:
            col = col / z
        # Don't look for 2nds or 11ths
        # Compute max over an octave range, +- 3 semitones
        peaks = librosa.util.peak_pick(col, pre_max=3, post_max=3,
                                       pre_avg=6, post_avg=6,
                                       delta=1e-2, wait=3)
        if len(peaks) == 0:
            continue
        # Order the peaks by loudness, loudest first, and clip to max_peaks
        order = np.argsort(col[peaks])[::-1]
        peaks = peaks[order][:max_peaks]
        mask[peaks, t] = 1
    return mask
def dft_normalize(c):
    '''Normalize c in the DFT domain along axis 0 and clip negatives.

    A real FFT is taken along axis 0, each resulting frequency row is
    divided by its RMS magnitude across axis 1 (plus 1e-8 to avoid dividing
    by zero), and the result is transformed back.

    Arguments:
    c -- (2-d real ndarray)

    Returns:
    (ndarray) same shape as c, with negative values clipped to 0
    '''
    D = np.fft.rfft(c, axis=0)
    D = D / (1e-8 + np.mean(np.abs(D)**2, axis=1, keepdims=True)**0.5)
    # Without n, irfft assumes an even length and drops a row when c has an
    # odd number of rows.
    cinv = np.fft.irfft(D, n=np.shape(c)[0], axis=0)
    return np.clip(cinv, 0, None)
def process_audio(y):
    '''Extract features from an already-loaded audio signal.

    Arguments:
    y -- (1-d ndarray) audio samples

    Returns:
    y      -- the input signal, unchanged
    cq     -- constant-Q transform of the harmonic component, passed
              through dft_normalize
    y_perc -- percussive component of y
    '''
    # Get the harmonic and percussive components
    y_harm, y_perc = librosa.effects.hpss(y)
    # Compute the CQT of the harmonic part.  sr is passed by keyword so the
    # call also works where librosa makes it keyword-only.
    cq = librosa.cqt(y_harm, sr=sr, fmin=fmin,
                     n_bins=MIDI_MAX - MIDI_MIN, hop_length=hop_length)
    cq = dft_normalize(cq)
    # The original padded cq to its own length (a no-op) and carried a
    # disabled RMS-based percussion estimate; both were removed.
    return y, cq, y_perc
def get_wav(cq, nmin=60, nmax=120, width=5, max_peaks=1, wave=None, n=None):
    '''Resynthesize one band of a CQT as a chip-style waveform.

    Arguments:
    cq        -- (2-d ndarray) CQT magnitudes
    nmin/nmax -- row range of cq to use; nmin + MIDI_MIN is also the MIDI
                 note used for the synthesizer's lowest frequency
    width     -- length of the median filter applied along the last axis
    max_peaks -- maximum simultaneous peaks kept per column
    wave      -- waveform generator forwarded to synthesize
    n         -- output length in samples, forwarded to synthesize

    Returns:
    (ndarray) the synthesized audio
    '''
    # Restrict to the requested rows
    band = cq[nmin:nmax]
    # Pick peaks at each time step from the log-power spectrum
    log_power = librosa.logamplitude(band**2, top_db=60, ref_power=np.max)
    peak_mask = peakgram(log_power, max_peaks=max_peaks)
    # Smooth in time
    peak_mask = scipy.ndimage.median_filter(peak_mask,
                                            size=(1, width),
                                            mode='mirror')
    # Resynthesize with cube-root magnitude compression
    frame_starts = librosa.frames_to_samples(np.arange(band.shape[-1]),
                                             hop_length=hop_length)
    weights = peak_mask * band**(1./3)
    result = synthesize(frame_starts,
                        weights,
                        fmin=librosa.midi_to_hz(nmin + MIDI_MIN),
                        bins_per_octave=12,
                        wave=wave,
                        n=n)
    return result[0]
def get_drum_wav(percussion, width=5, n=None):
    '''Synthesize a noise track shaped by a percussion envelope.

    Arguments:
    percussion -- (ndarray) envelope values; flattened and normalized
    width      -- length of the median filter applied to the envelope
    n          -- output length in samples, forwarded to synthesize

    Returns:
    (ndarray) the synthesized audio
    '''
    # Compute the volume shaper
    envelope = librosa.util.normalize(percussion.ravel())
    envelope = scipy.ndimage.median_filter(envelope, width, mode='mirror')
    # synthesize expects a 2-d piano roll, so use a single row
    envelope = np.atleast_2d(envelope)
    frame_starts = librosa.frames_to_samples(np.arange(envelope.shape[-1]),
                                             hop_length=hop_length)
    result = synthesize(frame_starts,
                        envelope,
                        fmin=librosa.midi_to_hz(0),
                        bins_per_octave=12,
                        wave=noise,
                        n=n)
    return result[0]
def process_arguments(args):
    '''Parse command-line arguments into a plain dict.

    Arguments:
    args -- (list of str) arguments, without the program name

    Returns:
    (dict) with keys 'input_file', 'output_file' and 'stereo'
    '''
    parser = argparse.ArgumentParser(description='Auto Chip Tune')
    positionals = (
        ('input_file', 'Path to the input audio file'),
        ('output_file', 'Path to store the generated chiptune'),
    )
    for dest, help_text in positionals:
        parser.add_argument(dest, action='store', help=help_text)
    parser.add_argument('-s', '--stereo',
                        action='store_true',
                        default=False,
                        help='Mix original and synthesized tracks in stereo')
    namespace = parser.parse_args(args)
    return vars(namespace)
In [2]:
'''Robust PCA in python.'''
# CREATED:2013-04-29 08:50:44 by Brian McFee <brm2132@columbia.edu>
#
import numpy as np
import scipy
import scipy.linalg
import scipy.weave
def nuclear_prox(A, r=1.0):
    '''Proximal operator for the scaled nuclear norm:
    Y* <- argmin_Y r * ||Y||_* + 1/2 * ||Y - A||_F^2

    Arguments:
    A -- (ndarray) input matrix
    r -- (float>0) scaling factor

    Returns:
    Y -- (ndarray) if A = USV', then Y = UTV' where T = max(S - r, 0)
    '''
    left, sigma, right_t = scipy.linalg.svd(A, full_matrices=False)
    # Soft-threshold the singular values, then rebuild the matrix.
    shrunk = np.maximum(sigma - r, 0.0)
    return np.dot(left * shrunk, right_t)
def l1_prox(A, r=1.0, nonneg=False):
    '''Proximal operator for the entry-wise matrix l1 norm:
    Y* <- argmin_Y r * ||Y||_1 + 1/2 * ||Y - A||_F^2

    Implemented with NumPy masks; the earlier scipy.weave inline-C version
    cannot run on SciPy releases that no longer ship weave.

    Arguments:
    A      -- (ndarray) input matrix
    r      -- (float>0) scaling factor
    nonneg -- (bool) retain only the non-negative portion

    Returns:
    Y -- (ndarray) A after soft-thresholding, same shape and dtype as A
    '''
    A = np.asarray(A)
    Y = np.zeros_like(A)
    # Entries above +r shrink towards zero by r.
    shrunk_up = A - r
    above = shrunk_up > 0
    Y[above] = shrunk_up[above]
    if not nonneg:
        # Entries at or below -r grow towards zero by r; the rest stay 0.
        shrunk_down = A + r
        below = ~above & (shrunk_down <= 0)
        Y[below] = shrunk_down[below]
    return Y
def robust_pca_cost(Y, Z, alpha):
    '''Evaluate the objective of an RPCA solution.

    Arguments:
    Y     -- (ndarray) the low-rank component
    Z     -- (ndarray) the sparse component
    alpha -- (float>0) the balancing factor

    Returns:
    total, nuclear_norm, l1_norm -- (tuple of floats) where
    total = nuclear_norm + alpha * l1_norm
    '''
    singular_values = scipy.linalg.svd(Y, full_matrices=False, compute_uv=False)
    nuclear_norm = singular_values.sum()
    l1_norm = np.abs(Z).sum()
    total = nuclear_norm + alpha * l1_norm
    return total, nuclear_norm, l1_norm
def robust_pca(X, alpha=None, max_iter=100, verbose=False, nonneg=False):
    '''ADMM solver for robust PCA.
    min_Y ||Y||_* + alpha * ||X-Y||_1

    Arguments:
    X        -- (ndarray) input data (d-by-n)
    alpha    -- (float>0) weight of the l1 penalty
                if unspecified, defaults to 1.0 / sqrt(max(d, n))
    max_iter -- (int) maximum number of ADMM iterations
    verbose  -- (bool) print whether the solver converged
    nonneg   -- (bool) constrain to non-negative noise

    Returns:
    Y     -- (ndarray) low-rank component of X
    Z     -- (ndarray) sparse component of X
    diags -- (dict) per-iteration residuals, tolerances and rho, plus the
             final 'cost' tuple from robust_pca_cost
    '''
    RHO_MIN = 1e-1
    RHO_MAX = 1e6
    MAX_RATIO = 1e1
    SCALE_FACTOR = 3.0e0
    ABS_TOL = 1e-4
    REL_TOL = 1e-3
    # update rules:
    # Y+ <- nuclear_prox(X - Z - W, 1/rho)
    # Z+ <- l1_prox(X - Y - W, alpha/rho)
    # W+ <- W + Y + Z - X
    # Initialize
    rho = RHO_MIN
    Y = X.copy()
    Z = np.zeros_like(X)
    W = np.zeros_like(X)
    norm_X = scipy.linalg.norm(X)
    if alpha is None:
        alpha = max(X.shape)**(-0.5)
    m = X.size
    _DIAG = {
        'err_primal': [],
        'err_dual': [],
        'eps_primal': [],
        'eps_dual': [],
        'rho': []
    }
    # Track convergence explicitly: inferring it from the loop index
    # misreports a break on the final iteration and fails when max_iter is 0.
    converged = False
    t = 0
    for t in range(max_iter):
        Y = nuclear_prox(X - Z - W, 1.0 / rho)
        Z_old = Z.copy()
        Z = l1_prox(X - Y - W, alpha / rho, nonneg)
        if nonneg:
            # NOTE(review): this forces Z >= X element-wise, which looks
            # surprising for a non-negative noise term - confirm intent.
            Z = np.maximum(Z, X)
        residual_pri = Y + Z - X
        residual_dual = Z - Z_old
        res_norm_pri = scipy.linalg.norm(residual_pri)
        res_norm_dual = rho * scipy.linalg.norm(residual_dual)
        W = W + residual_pri
        eps_pri = np.sqrt(m) * ABS_TOL + REL_TOL * max(scipy.linalg.norm(Y), scipy.linalg.norm(Z), norm_X)
        eps_dual = np.sqrt(m) * ABS_TOL + REL_TOL * scipy.linalg.norm(W)
        _DIAG['eps_primal'].append(eps_pri)
        _DIAG['eps_dual'].append(eps_dual)
        _DIAG['err_primal'].append(res_norm_pri)
        _DIAG['err_dual'].append(res_norm_dual)
        _DIAG['rho'].append(rho)
        if res_norm_pri <= eps_pri and res_norm_dual <= eps_dual:
            converged = True
            break
        # Rebalance rho when one residual dominates; W is the scaled dual
        # variable, so it is rescaled inversely to rho.
        if res_norm_pri > MAX_RATIO * res_norm_dual and rho * SCALE_FACTOR <= RHO_MAX:
            rho = rho * SCALE_FACTOR
            W = W / SCALE_FACTOR
        elif res_norm_dual > MAX_RATIO * res_norm_pri and rho / SCALE_FACTOR >= RHO_MIN:
            rho = rho / SCALE_FACTOR
            W = W * SCALE_FACTOR
    if verbose:
        if converged:
            print('Converged in %d steps' % t)
        else:
            print('Reached maximum iterations')
    # Make the decomposition exact: Y + Z == X.
    Y = X - Z
    _DIAG['cost'] = robust_pca_cost(Y, Z, alpha)
    return (Y, Z, _DIAG)
In [3]:
# From https://github.com/bmcfee/musichacks/blob/master/2013yyz/frankenmash/mangler.py
# Robust PCA method of vocal removal
def rpca_correct(X, L, S, p=0):
    '''Turn an RPCA split of X into a non-negative energy partition.

    L is clipped to the range [0, X] and S is recomputed as X - L (the S
    argument is not read).  With p > 0 the two parts are additionally
    converted into soft masks L**p / (L**p + S**p) and S**p / (L**p + S**p)
    and applied to X.

    Returns:
    (low, sparse) -- tuple of ndarrays shaped like X
    '''
    # Recombobulate into an energy partition
    L = np.minimum(X, np.maximum(0.0, L))
    S = X - L
    if not p > 0:
        return (L, S)
    # Wiener-filter to smooth out the edges
    zero_s = S == 0
    zero_l = L == 0
    S = S ** p
    S[zero_s] = 0.0
    L = L ** p
    L[zero_l] = 0.0
    # Where both parts are empty, split evenly so the masks stay finite.
    both_zero = zero_l & zero_s
    L[both_zero] = 0.5
    S[both_zero] = 0.5
    mask_l = L / (L + S)
    mask_s = S / (L + S)
    return (X * mask_l, X * mask_s)
def remove_vocals(y):
    '''Suppress the sparse (RPCA) component of a signal's spectrogram.

    Robust PCA is run on the lowest three quarters of the STFT frequency
    bins; the remaining upper bins are passed through untouched.

    Arguments:
    y -- (1-d ndarray) audio samples

    Returns:
    (complex ndarray) STFT-domain result: the low-rank magnitude (scaled by
    the spectrogram maximum) recombined with the original phase.  Nothing
    here inverts it back to audio.
    '''
    # Step 1: compute STFT
    D = librosa.stft(y, n_fft=n_fft, hop_length=hop_length).astype(np.complex64)
    # Step 2: separate magnitude and phase
    S, P = librosa.magphase(D)
    S = S / S.max()
    # Floor division: the result is used as a slice index.
    tau = (D.shape[0] * 3) // 4
    # Step 3: RPCA splits the lower band into low-rank and sparse parts
    S1, S2, _ = robust_pca(S[:tau, :], max_iter=25)
    S1, S2 = rpca_correct(S[:tau, :], S1, S2)
    S1 = np.vstack((S1, S[tau:, :]))
    del D
    del S
    del S2
    # Step 4: recombine the low-rank magnitude with the phase
    return S1 * P
In [4]:
def autochip(input_file=None, output_file=None, stereo=False):
    '''Convert an audio file into a chiptune rendition and write it out.

    Arguments:
    input_file  -- path of the audio file to load
    output_file -- path the result is written to
    stereo      -- if True, stack the normalized original and the chiptune
                   as two channels; otherwise write the chiptune alone
    '''
    print('Processing {:s}'.format(os.path.basename(input_file)))
    y, sr = librosa.load(input_file)
    # TODO: build the percussion layer from a vocal-free version of the
    # signal (see remove_vocals).
    leny = len(y)
    y, cq, y_perc = process_audio(y)
    if not stereo:
        # The dry signal is only needed for the stereo mix.  Deleting it
        # unconditionally made the stereo branch below fail on `y`.
        del y
    print('Synthesizing squares...')
    y_treb = get_wav(cq,
                     nmin=MIDI_MIN,
                     nmax=MIDI_MAX,
                     max_peaks=20,
                     n=leny)
    print('Synthesizing triangles...')
    y_bass = get_wav(cq,
                     nmin=MIDI_MIN,
                     nmax=BASS_MAX,
                     wave=nes_triangle,
                     max_peaks=4,
                     n=leny)
    del cq
    print('Synthesizing drums...')
    # Bit-crush the percussive component by rounding each sample to the
    # nearest multiple of 1/bits.
    bits = pow(2, 2)
    bitcrush_ = np.vectorize(lambda x: round(x * bits) / bits)
    y_perc = bitcrush_(y_perc)
    print('Mixing... ')
    y_chip = librosa.util.normalize(0.5 * y_treb +
                                    0.7 * y_bass +
                                    0.8 * y_perc)
    del y_treb
    del y_bass
    del y_perc
    if stereo:
        y_out = np.vstack([librosa.util.normalize(y), y_chip])
    else:
        y_out = y_chip
    del y_chip
    librosa.output.write_wav(output_file, y_out, sr)
    del y_out
    print('Done.')
In [5]:
#!/usr/bin/env python
# encoding: utf-8
"""
offliberty gnabber
downloads tracks from soundcloud, always 128k mp3
cortexel.us 2012
"""
import urllib2, urllib, re, json
# Help text for the Offliberty downloader.  Built from adjacent literals so
# each part sits on its own line; the backslash-continued version ran the
# "Usage" and "Example" text together.
usage = (
    "Offliberty Gnabber. Downloads tracks from soundcloud, youtube, etc.\n"
    "Usage: offliberty.py url\n"
    "Example: offliberty.py http://soundcloud.com/chopshopshockshack/mt-analogue-you-know-what-time"
)
def gnab_download(url, file_name):
    """Download url to file_name, streaming in 8 KiB blocks.

    An https:// scheme is downgraded to http:// before the request, as the
    original code did.
    NOTE(review): that drops transport security - confirm it is still needed.

    Both the HTTP response and the output file are closed even when the
    transfer fails part-way.
    """
    import contextlib
    import shutil

    print("gnab download..")
    # Rewrite only the scheme; a blanket replace also mangled any "https"
    # appearing later in the URL.
    if url.startswith("https://"):
        url = "http://" + url[len("https://"):]
    print("opening url %s" % url)
    with contextlib.closing(urllib2.urlopen(url)) as response:
        with open(file_name, 'wb') as out_file:
            shutil.copyfileobj(response, out_file, 8192)
    print("\n")
#gnab_download('https://i1.sndcdn.com/avatars-000008456211-qo1al2-large.jpg', './dadabots/art/ᴄЯͧtxĽz\n.avatar.jpg')
def gnabsong(url, filename):
    """Resolve a track page url to a direct media URL and save it to filename."""
    direct_url = geturl(url)
    gnab_download(direct_url, filename)
# given url, grabs mp3 using offliberty
def geturl(song_url):
    """Return a direct download URL for song_url.

    SoundCloud URLs are resolved through get_soundcloud_song.  Anything
    else is submitted to Offliberty, and the first URL found in the
    response page is returned.  When Offliberty reports its rate limit the
    function sleeps for an hour and then asks again.

    Raises:
    IndexError -- if the response page contains no URL
    """
    if "soundcloud.com" in song_url:
        return get_soundcloud_song(song_url)
    off_url = "http://offliberty.com/off02.php"
    print(song_url)
    form = urllib.urlencode({'track': song_url, 'refext': ""})
    while True:
        req = urllib2.Request(off_url)
        req.add_header("User-agent", "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/44.0.2403.155 Safari/537.36")
        req.add_header("Referer", "http://offliberty.com/")
        req.add_data(form)
        res = urllib2.urlopen(req)
        try:
            offpage = res.read(4000)
        finally:
            res.close()
        if "You can't use Offliberty so often" not in offpage:
            break
        # Rate limited: wait, then request again rather than parsing the
        # error page that was just received.
        print("Offliberty limit reached. . sleeping for one hour")
        sys.stdout.flush()
        time.sleep(60*60)
    url_list = re.findall('(?:https?://|www.)[^"\' ]+', offpage)
    if not url_list:
        raise IndexError("no download URL found in Offliberty response for %s" % song_url)
    return url_list[0]
def get_soundcloud_song(song_url):
    """Resolve a SoundCloud page URL to its stream URL.

    Calls the SoundCloud resolve endpoint with the module-level client_id
    and returns the track's stream_url with the client id appended.
    """
    query = urllib.urlencode([("url", song_url), ("client_id", client_id)])
    api_url = "https://api.soundcloud.com/resolve.json?" + query
    req = urllib2.Request(api_url)
    print("%s %s" % (song_url, api_url))
    payload = urllib2.urlopen(req).read()
    result = json.loads(payload)
    return result["stream_url"] + "?client_id=" + client_id
In [5]:
In [6]:
from PIL import Image
import PIL.ImageOps
import PIL.ImageFilter as ImageFilter
import ImageChops
# pixelates the image: shrink to 16x16, then enlarge to 800x800 with nearest-neighbour sampling
def autochip_art(bot, track_art):
    """Write a pixelated copy of the image at track_art; return its path.

    The image is shrunk to 16x16 and blown back up to 800x800 with
    nearest-neighbour sampling.  The output name inserts "rmx.autochip."
    before the last three characters of track_art.  bot is not used.
    """
    tiny = Image.open(track_art).resize((16, 16), Image.NEAREST)
    pixelated = tiny.resize((800, 800), Image.NEAREST)
    stem, ext = track_art[:-3], track_art[-3:]
    new_track_art = stem + "rmx.autochip." + ext
    pixelated.save(new_track_art)
    return new_track_art
# takes art, finds edges, creates horz and vert symmetry
def art_florp(bot, track_art):
    """Write an edge-detected, four-way blended copy of track_art; return its path.

    The edge image is averaged with its 90, 180 and 270 degree rotations.
    The output name inserts "rmx.florp." before the last three characters
    of track_art.  bot is not used.
    """
    edges = Image.open(track_art).filter(ImageFilter.FIND_EDGES)
    first_pair = Image.blend(edges, edges.rotate(90), 0.5)
    second_pair = Image.blend(edges.rotate(180), edges.rotate(270), 0.5)
    blended = Image.blend(first_pair, second_pair, 0.5)
    stem, ext = track_art[:-3], track_art[-3:]
    new_track_art = stem + "rmx.florp." + ext
    blended.save(new_track_art)
    return new_track_art
# blends the image with its 180-degree rotation (not a true horizontal flip), kinda dumb
def horzflip(bot, art):
    """Write a copy of art blended 50/50 with its 180-degree rotation; return its path.

    The output name inserts "rmx.horzflip." before the last three
    characters of art.  bot is not used.
    """
    source = Image.open(art)
    blended = Image.blend(source, source.rotate(180), 0.5)
    stem, ext = art[:-3], art[-3:]
    new_art = stem + "rmx.horzflip." + ext
    blended.save(new_art)
    return new_art
#octogon flip
def octoflip(bot, art):
    """Write an eight-way symmetric, sharpened copy of art; return its path.

    The image is averaged with its 90/180/270 degree rotations, blended
    with a 45 degree rotation of that result, and sharpened twice.  The
    output name inserts "rmx.octo." before the last three characters of
    art.  bot is not used.
    """
    source = Image.open(art)
    first_pair = Image.blend(source, source.rotate(90), 0.5)
    second_pair = Image.blend(source.rotate(180), source.rotate(270), 0.5)
    four_way = Image.blend(first_pair, second_pair, 0.5)
    eight_way = Image.blend(four_way, four_way.rotate(45), 0.5)
    for _ in range(2):
        eight_way = eight_way.filter(ImageFilter.SHARPEN)
    stem, ext = art[:-3], art[-3:]
    new_art = stem + "rmx.octo." + ext
    eight_way.save(new_art)
    return new_art
In [6]:
In [6]:
In [7]:
#!/usr/bin/env python
# encoding: utf-8
"""
DadaBot.py
Class for soundcloud bots.
By CJ Carr [cortexel.us]
Zack Zukowski
"""
import soundcloud
import time
import sys
import os.path
import urllib2
from random import choice
import random
from random import randint
from requests.exceptions import HTTPError
import pickle
import copy
import subprocess
from random import shuffle
# Placeholder usage text for the bot section.
# NOTE(review): this rebinds the module-level `usage` that the Offliberty
# section earlier in the file also assigns - confirm nothing relies on the
# earlier text once both sections have run.
usage = "Use it correctly, not incorrectly"
class DadaBot:
    """SoundCloud bot that remixes a follower's track and posts the result.

    Methods follow the file's convention of naming the instance ``bot``
    instead of ``self``.  ``connect`` reads the module-level ``client_id``
    and ``client_secret``; ``remix`` calls the module-level ``autochip``;
    downloads go through ``gnabsong`` / ``gnab_download``; artwork is
    processed with ``autochip_art``.

    ``bot.follower`` is always a plain dict of user fields (indexed with
    ``['id']``, ``['username']``, ``['avatar_url']``), whichever method
    set it.
    """

    def __init__(bot):
        bot.client = None  # need to call connect()
        bot.bot_user = None  # need to call connect()
        bot.followers = None  # list of all followers
        bot.follower = None  # dict of the follower whose music is remixed
        bot.favorites = []  # the bot's favorited tracks; filled by find_track()
        bot.track = None  # track sc object to be remixed
        bot.track_mp3 = ""  # file name of downloaded song to be remixed
        bot.track_art = ""  # filename of track art downloaded
        bot.user_art = ""  # filename of user art downloaded
        # If True, will not remix a track which has been favorited by the bot
        # If False, all tracks are fair game.
        bot.always_find_new_tracks = True
        # ignores tracks which aren't creative commons
        bot.creative_commons_only = True
        bot.followerid = 0
        bot.trackid = 0
        bot.remix_trackid = 0
        bot.bot_userid = 0
        bot.remix_track = None  # track sc of new remix track
        bot.remix_title = ""  # title of new remix track
        bot.remix_mp3 = ""  # filename of new remix
        bot.remix_artwork = ""  # filename of remixed artwork
        # Initialised here so post_remix() can run even when main() or
        # download_mp3_and_art() did not set them first.
        bot.remix_track_art = ""
        bot.remix_user_art = ""
        bot.remix_description = ""
        bot.remix_completed = False
        bot.remix_process_call = ""
        # DEFAULT info
        bot.username = "****"
        bot.password = "******"
        bot.genre = "chiptune"
        bot.tag = "autochiptune"  # for file names example: "song.rmx.weev.mp3"
        bot.MAX_TRACK_DURATION = 1000 * 60 * 8  # no tracks longer than 8 minutes
        bot.MIN_TRACK_DURATION = 10 * 1000  # minimum 10 seconds
        bot.MAX_FILE_SIZE = 10000000
        bot.comments = [
            'autochiptune_remix: %s',
        ]

    def connect(bot, *args):
        """Log in to SoundCloud and cache the bot's own user record.

        With exactly two positional arguments they are used as
        (username, password); otherwise the instance defaults are used.
        """
        if len(args) == 2:
            username, password = args
        else:
            username = bot.username
            password = bot.password
        # initialize
        print("Initializing soundcloud.Client . . . ")
        bot.client = soundcloud.Client(
            client_id=client_id,
            client_secret=client_secret,
            username=username,
            password=password)
        bot.bot_user = bot.client.get('/me')
        bot.bot_userid = bot.bot_user.id

    def get_followers(bot, user_id, max_limit):
        """Return the followers of user_id as a list of dicts.

        Pages are fetched 200 at a time until at least max_limit entries
        are collected or there are no more pages, so the result can
        overshoot max_limit by up to one page.
        See: https://developers.soundcloud.com/docs#pagination
        """
        followers = []
        url = '/users/%s/followers' % user_id
        while url and len(followers) < max_limit:
            result = bot.client.get(url, limit=200, linked_partitioning=1)
            followers.extend(result.obj["collection"])
            # Read the key from the payload: it is missing (or empty) on the
            # last page, where attribute access would raise.
            url = result.obj.get('next_href')
        return followers

    def amicabilify(bot, user_id):
        """Follow up to ten randomly chosen followers of user_id.

        Returns the list of follower dicts that were selected.  Failures
        are reported and do not propagate.
        """
        import traceback

        print("Following some of the followers' followers . . . ")
        limit = 10
        followers = bot.get_followers(user_id, 200)
        shuffle(followers)
        followers = followers[:limit]
        try:
            for follower in followers:
                bot.client.put('/me/followings/%d' % follower['id'])
        except Exception:
            print("failed at following the followers' followers")
            print(sys.exc_info())
            traceback.print_exc(file=sys.stdout)
        return followers

    def like_track(bot, track):
        """Add track to the bot's favorites."""
        print("Liking track . . . ")
        bot.client.put('/me/favorites/%d' % track.id)

    def get_favorites(bot, user):
        """Return every favorite track of user as a list of dicts."""
        favorites = []
        url = '/users/%s/favorites' % user.id
        while url:
            result = bot.client.get(url, limit=200, linked_partitioning=1)
            favorites.extend(result.obj["collection"])
            url = result.obj.get('next_href')
        return favorites

    def is_track_favorited(bot, track):
        """Return True if track is among bot.favorites."""
        return any(track.id == fav['id'] for fav in bot.favorites)

    def havent_heard_this_before(bot, track):
        """Return True if the bot has not favorited track.

        Relies on the bot liking a track after posting its remix.
        """
        return not bot.is_track_favorited(track)

    def is_creative_commons(bot, track):
        """Return True if the track's license string starts with "cc"."""
        return track.license[:2] == "cc"

    def download_mp3_and_art(bot):
        """Download the mp3, track art and avatar for bot.track.

        Sets the local file-name attributes on the bot.  Returns False if
        the song could not be fetched, True otherwise.  A failing artwork
        download is not caught here.
        """
        import traceback

        url = bot.track.permalink_url
        print("url %s" % url)
        p = bot.track.permalink_url.split('/')
        slug = p[-2] + "_" + p[-1]
        bot.track_mp3 = './dadabots/mp3/' + slug + ".mp3"
        bot.remix_mp3 = './dadabots/mp3/' + slug + ".rmx." + bot.tag + ".mp3"
        bot.remix_track_art = ''
        bot.remix_user_art = ''
        bot.track_art = './dadabots/art/' + slug + ".jpg"
        bot.user_art = './dadabots/art/' + p[-2] + ".avatar.jpg"
        try:
            gnabsong(url, bot.track_mp3)
        except Exception:
            print("!! Error. Song isn't available. %s %s\n\n" % (url, bot.track_mp3))
            print(sys.exc_info())
            traceback.print_exc(file=sys.stdout)
            return False
        print("bot.track.artwork_url %s" % bot.track.artwork_url)
        if bot.track.artwork_url:
            bot.track.artwork_url = bot.track.artwork_url.replace("https://", "http://")
            print("artwork_url %s" % bot.track.artwork_url)
            gnab_download(bot.track.artwork_url, bot.track_art)
        else:
            bot.track_art = ""
        if bot.follower['avatar_url']:
            bot.follower['avatar_url'] = bot.follower['avatar_url'].replace("https://", "http://")
            print("avatar_url %s" % bot.follower['avatar_url'])
            gnab_download(bot.follower['avatar_url'], bot.user_art)
        else:
            bot.user_art = ""
        print("\n")
        print("MP3: " + bot.track_mp3)
        print("TRACK_ART: " + bot.track_art)
        print("USER_ART: " + bot.user_art)
        return True

    def remix(bot):
        """Run autochip on the downloaded mp3 and mark the remix as done."""
        print("Calling autochip(%s, %s)" % (bot.track_mp3, bot.remix_mp3))
        sys.stdout.flush()
        autochip(bot.track_mp3, bot.remix_mp3, False)
        bot.remix_completed = True

    def already_remixed(bot):
        """Return True once remix() has completed and the source mp3 exists.

        NOTE(review): this checks track_mp3, not remix_mp3 - confirm which
        file was intended.
        """
        return bot.remix_completed and os.path.isfile(bot.track_mp3)

    def grab_track(bot, url):
        """Select the track at url and download its assets.

        Raises Exception("BAD URL") if the download fails.
        """
        bot.track = bot.client.get('/resolve', url=url)
        # Keep the follower as a plain dict, matching find_track() and the
        # ['...'] indexing used by download_mp3_and_art() and main().
        bot.follower = bot.client.get('/users/' + str(bot.track.user_id)).obj
        if not bot.download_mp3_and_art():
            raise Exception("BAD URL")
        bot.trackid = bot.track.id
        bot.followerid = bot.follower['id']
        return True

    def _is_eligible(bot, track):
        """Return True if track passes the license, novelty and duration filters."""
        if bot.creative_commons_only and not bot.is_creative_commons(track):
            return False
        if bot.always_find_new_tracks and not bot.havent_heard_this_before(track):
            return False
        return bot.MIN_TRACK_DURATION <= track.duration <= bot.MAX_TRACK_DURATION

    def find_track(bot):
        """Pick a random eligible track from a random follower and download it.

        Followers are visited in random order; the first eligible track
        that downloads successfully wins.  Raises Exception if none does.
        """
        bot.favorites = bot.get_favorites(bot.bot_user)
        bot.followers = bot.get_followers(bot.bot_user.id, 2000)
        shuffle(bot.followers)
        track_found = False
        for follower in bot.followers:
            print("Searching " + follower['username'] + "'s tracks . . . ")
            tracks = bot.client.get('/users/' + str(follower['id']) + '/tracks',
                                    order='created_at', limit=200)
            print("Titles:  %s" % ([t.title for t in tracks],))
            # Build a real list: it is shuffled in place below.
            tracks = [t for t in tracks if bot._is_eligible(t)]
            shuffle(tracks)
            print("Filtered titles:  %s" % ([t.title for t in tracks],))
            for track in tracks:
                print("Hmm..  %s" % track.title)
                bot.follower = follower
                bot.track = track
                if bot.download_mp3_and_art():
                    track_found = True
                    break
            if track_found:
                break
        if not track_found:
            raise Exception("NO TRACKS!!!!!!!!!!")
        print("\n")
        bot.trackid = bot.track.id
        bot.followerid = bot.follower['id']
        return True

    def dump_intention(bot):
        """Pickle a stripped copy of the bot and return the file name.

        An intention is a copy of the bot without its SoundCloud objects
        (pickling those caused a stack overflow).
        NOTE(review): the pickle includes bot.username and bot.password.
        """
        # Name the file after the mp3's base name.  The previous fixed
        # [6:-4] slice left part of the './dadabots/mp3/' prefix in the name.
        base = os.path.splitext(os.path.basename(bot.track_mp3))[0]
        intention_filename = "./dadabots/intn/" + base + "." + bot.tag + ".intention"
        intention = copy.copy(bot)
        intention.follower = None
        intention.followers = None
        intention.track = None
        intention.remix_track = None
        intention.client = None
        intention.bot_user = None
        with open(intention_filename, "wb") as handle:
            pickle.dump(intention, handle)
        print(intention_filename)
        return intention_filename

    @staticmethod
    def load(intention_filename):
        """Restore a bot from an intention file and reconnect it.

        SECURITY: pickle.load executes arbitrary code from the file; only
        load intention files this program wrote itself.
        """
        sys.setrecursionlimit(10000)
        with open(intention_filename, "rb") as handle:
            bot = pickle.load(handle)
        bot.connect(bot.username, bot.password)
        bot.track = bot.client.get('/tracks/' + str(bot.trackid))
        bot.follower = bot.client.get('/users/' + str(bot.track.user_id)).obj
        return bot

    def post_remix(bot):
        """Upload the remix (with artwork if available) and return the new track.

        Raises Exception with an explanatory message on an HTTP 422
        response; other errors propagate unchanged.
        """
        print("Uploading remix . . . ")
        print("Remix track art %s" % bot.remix_track_art)
        if not getattr(bot, 'remix_description', ''):
            bot.remix_description = '%s remix of %s' % (bot.tag, bot.track.permalink_url)
        art_path = bot.remix_track_art or bot.remix_user_art
        opened = []
        try:
            if art_path:
                art_bytes = open(art_path, 'rb')
                opened.append(art_bytes)
            else:
                art_bytes = ""
            asset = open(bot.remix_mp3, 'rb')
            opened.append(asset)
            try:
                bot.remix_track = bot.client.post('/tracks', track={
                    'title': bot.remix_title,
                    'asset_data': asset,
                    'sharing': 'public',
                    'description': bot.remix_description,
                    'genre': bot.genre,
                    'artwork_data': art_bytes
                })
            except HTTPError as err:
                print("Err %s" % (err,))
                # Look for the status code in the message text; `in` on the
                # exception object itself does not do a substring search.
                if "422" in str(err):
                    raise Exception("Error uploading track to SoundCloud. Account has probably reached upload limit. Limit=3 hours for standard SoundCloud accounts")
                raise
            except Exception:
                print("Exception: %s" % (sys.exc_info()[0],))
                raise
        finally:
            for handle in opened:
                handle.close()
        # print track link
        print(bot.remix_track.permalink_url)
        return bot.remix_track

    def update_avatar(bot, image):
        """Upload the file at path image as the bot's avatar."""
        print("Updating avatar..." + image)
        with open(image, 'rb') as avatar:
            user = bot.client.put('/me', user={
                'avatar_data': avatar
            })
        return user

    def comment(bot, url):
        """Return a randomly chosen comment template filled in with url."""
        return choice(bot.comments) % url

    @staticmethod
    def _remove_files(paths):
        """Delete each non-empty path, reporting but not raising on failure."""
        for path in paths:
            if not path:
                continue
            try:
                os.remove(path)
            except OSError:
                print("could not remove %s" % path)

    def main(bot):
        """Run one full cycle: find, remix, upload, clean up, follow, like, comment."""
        print("hi")
        bot.creative_commons_only = False
        # connect to soundcloud API
        bot.connect(bot.username, bot.password)
        # pick a track; downloads mp3, track art, and user art
        bot.find_track()
        # update bot user art
        if bot.user_art:
            bot.remix_user_art = autochip_art(bot, bot.user_art)
            bot.update_avatar(bot.remix_user_art)
        # remix track art
        if bot.track_art:
            print("Remixing track art %s" % bot.track_art)
            bot.remix_track_art = autochip_art(bot, bot.track_art)
        else:
            print("No track art to remix")
        bot.remix_description = "Automatic chiptune remix bot.\nFollow if you want your tracks remixed.\nUnfollow if you don't."
        bot.remix()
        # remix title
        bot.remix_title = "%s: %s [autochip rmx]" % (bot.follower['username'], bot.track.title)
        print("Remix title: " + bot.remix_title)
        sys.stdout.flush()
        # POST remix
        bot.post_remix()
        # Delete all the local files.  This replaces IPython `!rm` shell
        # lines, which are not valid Python and passed remote-derived file
        # names to a shell.
        print("deleting tracks")
        bot._remove_files([
            bot.remix_mp3,
            bot.track_mp3,
            bot.remix_track_art,
            bot.remix_user_art,
            bot.user_art,
            bot.track_art,
        ])
        # follow some of the follower's followers
        bot.amicabilify(bot.follower['id'])
        # like track
        bot.like_track(bot.track)
        # comment on original track
        comment_string = bot.comment(bot.remix_track.permalink_url)
        print("Commenting . . . " + comment_string + "\n")
        bot.client.post('/tracks/%d/comments' % bot.track.id, comment={
            'body': comment_string,
            'timestamp': random.randint(0, bot.track.duration - 1)
        })
        # comment on remix
        remix_comment_string = "Original: " + bot.track.permalink_url
        print("Commenting . . . " + remix_comment_string + "\n")
        bot.client.post('/tracks/%d/comments' % bot.remix_track.id, comment={
            'body': remix_comment_string,
            'timestamp': 100
        })
        print("bye")
        return True
In [7]:
In [ ]:
In [ ]:
import gc
import traceback

gc.enable()
# SoundCloud API credentials (redacted here); read as module globals by
# DadaBot.connect and get_soundcloud_song.
client_id = '***********************'
client_secret = '*********************'
# Run remix cycles forever, with a fresh bot each time.
while True:
    print("Autochiptune remix.")
    bot = DadaBot()
    try:
        bot.main()
    except Exception:
        # Report and carry on with the next cycle.  KeyboardInterrupt and
        # SystemExit are deliberately not caught so the loop can be stopped.
        print("Unexpected error: %s" % (sys.exc_info(),))
        traceback.print_exc(file=sys.stdout)
    del bot
    print("sleeping..")
    sys.stdout.flush()
    gc.collect()
    time.sleep(10)  # delay between cycles
    print("awake.")
In [ ]:
In [ ]:
In [ ]: