Skip to content

Commit

Permalink
Auto beat generation for a more consistent beat. Beat signal every se…
Browse files Browse the repository at this point in the history
…cond beat, bar signal every 16th beat
  • Loading branch information
scheb committed Apr 26, 2019
1 parent 61f6618 commit 9c20eca
Show file tree
Hide file tree
Showing 4 changed files with 160 additions and 76 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ Automatically listens to default audio input device.

OSC signals are sent to `localhost:7701`, can be changed in `beatDetector.py`.

- `/beat` is sent each time a beat is detected. Ideal for fast light change.
- `/bar` is sent every 8 beats. Ideal for less frequent light change.
- `/beat` is sent for every beat detected, or every 2nd beat when auto generating beats. Ideal for fast light change.
- `/bar` is sent every 16 beats. Ideal for less frequent light change.

## Acknowledgments

Expand Down
2 changes: 1 addition & 1 deletion beatDetector.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ class BeatDetector:
ui: ui.UserInterface
osc_client: osc.OscClient
input_recorder: InputRecorder
timer_period = 16.3333333
timer_period = 16.66666
bar_modulo = 8

def __init__(self, window) -> None:
Expand Down
228 changes: 156 additions & 72 deletions bpm.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
import ui
import sys
from PyQt4 import QtCore, QtGui
from PyQt4.QtCore import QTimer
from PyQt4 import QtCore
from recorder import *
from time import perf_counter
from pythonosc import udp_client

class AudioAnalyzer:
min_bpm = 60
Expand All @@ -19,71 +17,86 @@ class AudioAnalyzer:
bpm_history: list

# Intensity over time
pause_count = 0
y_avg_history: list
low_freq_avg_history: list
low_avg_counter: 0

def __init__(self, input_recorder):
self.input_recorder = input_recorder
self.reset_tracking()
self.callback_beat_detected = lambda: None
self.callback_bpm_changed = lambda: None
self.callback_new_song = lambda: None
self.callback_pause = lambda: None

def reset_tracking(self):
self.current_bpm = 0
self.prev_beat_time = perf_counter()
self.bpm_history = []
self.y_avg_history = []
self.low_freq_avg_history = []
self.low_avg_counter = 0

def analyze_audio(self):
if not self.input_recorder.has_new_audio:
return

self.current_time = perf_counter()

# Get x and y values from FFT
xs, ys = self.input_recorder.fft()

# Calculate average for all frequency ranges
y_avg = numpy.mean(ys)
self.y_avg_history.append(y_avg)

# Calculate low frequency average
low_freq = [ys[i] for i in range(len(xs)) if xs[i] < 500]
low_freq_avg = numpy.mean(low_freq)

# Calculate recent low frequency average
self.low_freq_avg_history.append(low_freq_avg)
recent_low_freq_avg = numpy.mean(self.low_freq_avg_history)

# Calculate bass frequency average
bass = low_freq[:int(len(low_freq) / 2)]
bass_avg = numpy.mean(bass)
# print("bass: {:.2f} vs cumulative: {:.2f}".format(bass_avg, cumulative_avg))

# Check if there is a beat
if (y_avg > 1000 # Minimum intensity
and (
bass_avg > recent_low_freq_avg * 1.5 # Significantly more bass than before
or (
low_freq_avg < y_avg * 1.2 #
and bass_avg > recent_low_freq_avg
)
)
):
self.current_time = perf_counter()
# print(self.curr_time - self.prev_beat)
time_since_last_beat = self.current_time - self.prev_beat_time
if (time_since_last_beat > 60 / self.max_bpm):
self.detect_beat()
self.detect_bpm(time_since_last_beat)
self.prev_beat_time = self.current_time

# Reset tracking if the song has stopped
# Track intensity
y_avg_mean = numpy.mean(self.y_avg_history)
# print(y_avg, y_avg_mean)
if y_avg < 100 or y_avg < y_avg_mean / 50:
if y_avg < y_avg_mean / 50:
self.low_avg_counter += 1
else:
self.low_avg_counter = 0

# Reset tracking if intensity dropped significantly for multiple iterations
if y_avg < 100 or self.low_avg_counter > 8:
print("low avg -> new song")
self.detect_new_song()

# Otherwise do normal beat detection
else:
# Calculate low frequency average
low_freq = [ys[i] for i in range(len(xs)) if xs[i] < 500]
low_freq_avg = numpy.mean(low_freq)

# Calculate recent low frequency average
self.low_freq_avg_history.append(low_freq_avg)
recent_low_freq_avg = numpy.mean(self.low_freq_avg_history)

# Calculate bass frequency average
bass = low_freq[:int(len(low_freq) / 2)]
bass_avg = numpy.mean(bass)
# print("bass: {:.2f} vs cumulative: {:.2f}".format(bass_avg, cumulative_avg))

# Check if there is a beat
if (y_avg > 1000 # Minimum intensity
and (
bass_avg > recent_low_freq_avg * 1.5 # Significantly more bass than before
or (
low_freq_avg < y_avg * 1.2 #
and bass_avg > recent_low_freq_avg
)
)
):
# print(self.curr_time - self.prev_beat)
time_since_last_beat = self.current_time - self.prev_beat_time
if time_since_last_beat > 60 / self.max_bpm:
self.detect_beat(time_since_last_beat)
self.prev_beat_time = self.current_time

# Detect pause in song when missing out more than 4 expected beats
if self.current_bpm > 0 and self.current_time - self.prev_beat_time > 60 / self.current_bpm * 4.5:
self.detect_pause()

self.housekeeping()

def housekeeping(self):
Expand All @@ -99,76 +112,147 @@ def housekeeping(self):
if len(self.bpm_history) > 24:
self.bpm_history = self.bpm_history[8:]

def detect_beat(self):
# print("detect beat")
self.callback_beat_detected()

def detect_new_song(self):
# print("detect new song")
self.reset_tracking()
self.callback_new_song()

def detect_bpm(self, time_since_last_beat):
# print("detect bpm")
bpm_detected = int(60 / time_since_last_beat)
if len(self.bpm_history) < 4:
def detect_beat(self, time_since_last_beat):
# print("beat detected")
bpm_detected = 60 / time_since_last_beat
if len(self.bpm_history) < 8:
if bpm_detected > self.min_bpm:
self.bpm_history.append(bpm_detected)
else:
# bpm_avg = int(numpy.mean(self.bpm_history))
if (self.current_bpm == 0 or abs(self.current_bpm - bpm_detected) < 35):
self.bpm_history.append(bpm_detected)
# Recalculate with the new BPM value included
self.current_bpm = int(numpy.mean(self.bpm_history))
self.callback_bpm_changed(self.current_bpm)
self.current_bpm = self.calculate_bpm()

self.callback_beat_detected(self.current_time, self.current_bpm)

def calculate_bpm(self):
self.reject_outliers(self.bpm_history)
return numpy.mean(self.bpm_history)

def reject_outliers(self, data, m=2.):
data = numpy.array(data)
return data[abs(data - numpy.mean(data)) < m * numpy.std(data)]

def detect_new_song(self):
# print("detect new song")
self.reset_tracking()
self.callback_new_song()

def detect_pause(self):
# print("detect pause")
self.callback_pause()

def on_beat_detected(self, callback):
self.callback_beat_detected = callback

def on_new_song_detected(self, callback):
self.callback_new_song = callback

def on_bpm_changed(self, callback):
self.callback_bpm_changed = callback
def on_pause(self, callback):
self.callback_pause = callback


class SignalGenerator:
bar_modulo: int
beat_index: int
bpm: int
auto_generating = False
timer: QTimer
last_beats: list
last_beat_time: float

def __init__(self, audio_analyzer, bar_modulo) -> None:
self.bar_modulo = bar_modulo
self.reset_tracking()

self.callback_beat = lambda : None
self.callback_bar = lambda : None
self.callback_new_song = lambda : None
self.callback_bpm_change = lambda : None
self.callback_beat = lambda: None
self.callback_bar = lambda: None
self.callback_new_song = lambda: None
self.callback_bpm_change = lambda: None

# Wire up detection events
audio_analyzer.on_beat_detected(self.track_beat)
audio_analyzer.on_new_song_detected(self.track_new_song)
audio_analyzer.on_bpm_changed(self.track_bpm_changed)
audio_analyzer.on_pause(self.track_pause)

def reset_tracking(self):
self.beat_index = self.bar_modulo - 1
self.beat_index = -1
self.auto_generating = False
self.bpm = 0

def track_beat(self):
self.beat_index += 1
beat_index_mod = self.beat_index % self.bar_modulo
self.callback_beat(beat_index_mod)
if (beat_index_mod == 0):
self.callback_bar()
self.last_beat_time = 0
self.last_beats = [] # Sliding window of the last 4 beats

def reset_beat_index(self):
self.beat_index = -1

def track_beat(self, beat_time, bpm):
bpm_changed = False

if abs(bpm - self.bpm) > 1:
print("BPM changed {:d} -> {:d}".format(int(self.bpm), int(bpm)))
self.bpm = bpm
self.callback_bpm_change(bpm)
bpm_changed = True

if self.auto_generating:
if bpm_changed:
print("Sync auto generated beat")
self.timer.stop()
self.generate_beat_signal(beat_time=beat_time)
else:
if bpm_changed and self.can_auto_generate():
print("Start auto generating beat with {:d} BPM".format(int(self.bpm)))
self.auto_generating = True
self.generate_beat_signal(beat_time=beat_time)

def can_auto_generate(self):
if self.bpm > 0 and len(self.last_beats) >= 8:
oldest_beat = numpy.min(self.last_beats)
newest_beat = numpy.min(self.last_beats)
max_difference = 60 / self.bpm * 16 # 8 beats max

# We have to see at least half of the expected beats to start auto generating
return newest_beat - oldest_beat < max_difference
return False

def generate_beat_signal(self, beat_time=None):
if beat_time is None:
beat_time = perf_counter()

# Protect against too many beat signals at once
if beat_time - self.last_beat_time > 0.333:
self.last_beats.append(beat_time)
if len(self.last_beats) > 8: # Keep the last 8 beats
self.last_beats = self.last_beats[1:]
self.last_beat_time = beat_time
self.beat_index += 1

beat_index_mod = self.beat_index % (self.bar_modulo * 2)
if self.beat_index % 2 == 0:
self.callback_beat(int(beat_index_mod / 2))
if beat_index_mod == 0:
self.callback_bar()

if self.auto_generating:
self.timer = QtCore.QTimer()
self.timer.setSingleShot(True)
self.timer.timeout.connect(self.generate_beat_signal)
time_passed = int((perf_counter() - beat_time) * 1000) # Take code execution time into account
timeout = int(60000 / self.bpm) - time_passed
self.timer.start(timeout)

def track_new_song(self):
print("New song")
self.callback_new_song()
self.reset_tracking()

def track_bpm_changed(self, bpm):
self.bpm = bpm
self.callback_bpm_change(bpm)
def track_pause(self):
print("Pause")
self.timer.stop()
self.auto_generating = False
self.reset_beat_index()

def on_beat(self, callback):
self.callback_beat = callback
Expand Down
2 changes: 1 addition & 1 deletion ui.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def display_beat_index(self, beat_index):
self.beat_button.setText(_fromUtf8("Beat: {:d}".format(beat_index)))

def display_bpm(self, bpm):
self.bar_button.setText(_fromUtf8("BPM: {:d}".format(bpm)))
self.bar_button.setText(_fromUtf8("BPM: {:d}".format(int(bpm))))

def display_new_song(self):
self.beat_button.setText(_fromUtf8("Beat"))
Expand Down

3 comments on commit 9c20eca

@mjbhuyan
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

having an error-

python beatDetector.py
File "beatDetector.py", line 10
ui: ui.UserInterface
^
SyntaxError: invalid syntax

@scheb
Copy link
Owner Author

@scheb scheb commented on 9c20eca May 9, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Which Python version do you use? I was using Python 3.7.3

@mjbhuyan
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the reply.
I'm trying with Python 3.5.6 as pyQt4 doesn't support Python 3.7.3

Please sign in to comment.