-
Notifications
You must be signed in to change notification settings - Fork 2
/
MidiPreprocessor.py
1920 lines (1766 loc) · 117 KB
/
MidiPreprocessor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
import mido, os
import numpy as np
from functools import reduce
import DurationFactory as df
from Events import Note, TimeSignature, Tempo, Program
from Instruments import Instruments
from utilities import print_divider
class MidiPreprocessor:
def __init__(self):
    """Sets up duration sets, model dimensions and the flat feature-vector layout.

    Builds default duration sets for 4/4, 3/4 and 2/4 at 480 ticks per quarter,
    precomputes the default beat grid, and derives all segment start indices
    used when encoding events as one-hot feature vectors.
    """
    self.inst = Instruments()  # instrument catalogue / MIDI-program mappings (project class)
    self.dur = df.DurationFactory()
    # duration sets for the supported default meters: (numerator, denominator, ticks per quarter)
    self.default_duration_set = self.dur.get_duration_set(4, 4, 480)
    self.default_common_duration_set = self.default_duration_set
    self.default_triple_duration_set = self.dur.get_duration_set(3, 4, 480)
    self.default_duple_duration_set = self.dur.get_duration_set(2, 4, 480)
    self.default_beat_units = ["d8"] # only basic units allowed
    self.default_bar_unit_duration = self.default_duration_set.bar_duration.duration
    # default beat grid: every multiple of each beat unit within one bar
    self.default_beats_vector = []
    for unit in self.default_beat_units:
        self.default_beats_vector += [time for time in range(0, self.default_bar_unit_duration, self.default_duration_set.basic_set[unit].duration)]
    self.default_beats_vector = np.unique(np.array(self.default_beats_vector, dtype = np.int32)) # sorts and removes non-unique
    self.USE_DISTRIBUTED_DURATIONS = False  # True = encode over the basic duration set, False = over the compound set
    self.NUM_BASIC_DURATIONS = self.dur.sz_basic_set
    self.NUM_COMPOUND_DURATIONS = self.dur.sz_compound_set
    self.NUM_INSTRUMENTS = self.inst.sz_instrument_set
    self.NUM_USED_PITCHES = 96
    self.NUM_SPECIAL_TOKENS = 1  # extra slots appended after the instrument segment
    self.LOW_NOTE_THRESHOLD = 17 # inclusive
    self.HIGH_NOTE_THRESHOLD = 112 # inclusive
    self.NUM_PITCHES = self.HIGH_NOTE_THRESHOLD - self.LOW_NOTE_THRESHOLD + 1
    self.MAX_PITCHES = 128  # full MIDI pitch range
    self.NUM_CHANNELS = 16  # MIDI channel count
    self.NUM_BEATS = len(self.default_beats_vector)
    self.OCTAVE = 12  # semitones per octave
    # P_* flags toggle debug printing for the corresponding processing stage
    self.P_INSTRUMENTS = False
    self.P_DURATIONS = False
    self.P_STREAMING = False
    self.P_TOKENIZATION = False
    self.P_ADJUSTMENT = 0  # integer verbosity level; process_tokens prints details when >= 2
    self.P_FINALIZATION = False
    self.P_TO_DATA = False
    self.P_FROM_DATA = False
    self.P_TO_MIDI = False
    self.P_GENERATOR = False
    self.DEFAULT_MSPQ = 500000 # default tempo in microseconds per quarter note
    self.MAX_FAILED_ADJUSTMENTS = 0.05 # max ratio of failed adjustments in process_tokens
    self.FAST_THRESHOLD = 300000 # tempo in microseconds per quarter note under which the tempo is considered FAST
    self.SLOW_THRESHOLD = 900000 # same but over which the tempo is SLOW
    # choose which duration vocabulary drives the encoding and cache its tick values
    if self.USE_DISTRIBUTED_DURATIONS:
        self.num_durations = self.NUM_BASIC_DURATIONS
        self.default_durations = np.zeros([self.num_durations], dtype = np.int32)
        for i in range(self.num_durations):
            # "bi<i>" presumably keys the i:th basic duration -- TODO confirm in DurationFactory
            self.default_durations[i] = self.default_duration_set.inverse_mappings["bi" + str(i)].duration
    else:
        self.num_durations = self.NUM_COMPOUND_DURATIONS
        self.default_durations = np.zeros([self.num_durations], dtype = np.int32)
        for i in range(self.num_durations):
            # "li<i>" presumably keys the i:th compound duration -- TODO confirm in DurationFactory
            self.default_durations[i] = self.default_duration_set.inverse_mappings["li" + str(i)].duration
    # starting indices of each segment within one flat timestep vector
    self.time_a = 0  # time since last event (one slot per duration)
    self.beats_a = self.time_a + self.num_durations
    self.inp_a = self.beats_a + self.NUM_BEATS
    self.dur_a = self.inp_a + self.NUM_PITCHES
    self.instr_a = self.dur_a + self.num_durations
    # offset beats pitch duration instr start + end active notes active instruments
    # layout: |---------|----------|-----------|------------|-------------|----------------|--------------------|-------------|
    #
    self.final_tokens_a = self.instr_a + self.NUM_INSTRUMENTS
    self.act_notes_a = self.final_tokens_a + self.NUM_SPECIAL_TOKENS
    self.act_inst_a = self.act_notes_a + self.NUM_PITCHES
    self.START_TOKEN = self.final_tokens_a
    # NOTE(review): END_TOKEN is START_TOKEN + 1 while NUM_SPECIAL_TOKENS is 1 --
    # verify the layout actually reserves room for both special tokens
    self.END_TOKEN = self.START_TOKEN + 1
    # time since last event, the pitch, instrument and duration of the current input and a set of currently active pitches and instruments
    self.timestep_sz = self.num_durations + self.NUM_PITCHES + self.NUM_INSTRUMENTS + self.num_durations + self.NUM_PITCHES + self.NUM_INSTRUMENTS + self.NUM_BEATS + self.NUM_SPECIAL_TOKENS # last term for start and end
def tracks_to_stream(self, midi, adjustments, factor, limit):
p = self.P_STREAMING
tokens = []
tracks_no = 0
for i, track in enumerate(midi.tracks):
tracks_no += 1
ticks = 0
program = 0
if p:
print(i, track[1])
ex = [32, 33, 22] + list(range(2, 12))
ex = []
if i not in ex:
for j, msg in enumerate(track):
ticks += msg.time
type = msg.dict()['type']
if type == "program_change":
program = int(msg.dict()['program'])
# to shift all notes in the input a given interval, uncomment this
#if type == "note_on" or type == "note_off":
# msg = msg.copy(note = msg.dict()["note"] + 6)
if type != "end_of_track":
offset = 0
for adj_tuple in adjustments:
if ticks >= adj_tuple[0]:
offset += adj_tuple[1]
else:
break
tokens += [(i, round((ticks + offset) * factor), msg, program, -1)]
tokens.sort(key = lambda x: (x[1], int(not((x[2].dict()['type'] == "note_on" and int(x[2].dict()['velocity']) == 0) or x[2].dict()['type'] == "note_off")))) # sort by overall time
# discard songs with ridiculously long pauses in them (probably due to malformatting)
last = 0
init = False
for t in tokens:
if t[2].dict()['type'] == "note_on":
assert(not init or (t[1] - last) <= limit), "[STREAM]: ERROR - input offset between two note events too large, was " + str(t[1] - last) + " and limit is " + str(limit)
init = True
last = t[1]
return tokens, tracks_no
''' Tokenizes a MIDI file by going through all events and storing four types of events: tempo changes, instrument changes, time signature changes and note events (both on and off).
The output is a list of events where each note event is represented by its starting time and how long it is in ticks. Some notes may be invalid but still returned as part of the
output list. No further adjustments are done to events at this time. All output events are timed with absolute timing (not relative as in the MIDI format itself). In the output
all time signatures, instrument changes or tempo changes occur before possible notes starting at the same tick. Nevertheless, any original ordering within these three categories is
preserved. Also, pitches starting at the same tick are sorted by pitch in ascending order. Also returns a list of the indices in tokens which constitute new time
signatures so that further processing may be divided into chunks based on time signature.
'''
def tokenize(self, midi, inst, quarter, num_tracks, start = None, end = None, limit = None, internal_instrument = None, neglected_ts = None):
    """Converts an ordered MIDI event stream into Note/TimeSignature tokens.

    Parameters:
        midi -- iterable of (track, abs_time, message, program, -1) tuples as
            produced by tracks_to_stream (already ordered by absolute time)
        inst -- Instruments instance mapping MIDI programs to internal ids
        quarter -- number of ticks per quarter note
        num_tracks -- number of tracks in the stream
        start, end -- optional absolute-time window; events outside are skipped
        limit -- optional maximum number of tokens to produce
        internal_instrument -- if given, only notes mapping to this internal
            instrument are kept
        neglected_ts -- collection of (numerator, denominator) time signatures
            to ignore; '*' acts as a wildcard for either member

    Returns:
        (tokens, time_signatures, stat_pitches): the token list, the indices in
        tokens where a new time signature starts, and a 128-slot histogram of
        successfully closed pitches.
    """
    # IMPROVEMENT: the default was the mutable literal [] (shared between
    # calls); use the None sentinel instead -- backward-compatible
    if neglected_ts is None:
        neglected_ts = []
    p = self.P_TOKENIZATION
    #p = True
    tokens = [] # a list of message tokens
    last_started = [0] * num_tracks # holds when the last started note on a specific track was started
    time_signatures = [] # a list of the indices in tokens which constitutes new time signatures, if any
    initiated = np.zeros((self.NUM_CHANNELS, self.MAX_PITCHES), dtype='int') # a map over the channels and which pitches are active, e.g. has been started at a specific time (its value is not -1)
    initiated.fill(-1) # all notes are unused when we begin
    instruments = np.zeros((self.NUM_CHANNELS), dtype='int') # current instrument at each input channel, default is piano
    preceding_notes = [[] for _ in range(num_tracks)] # per track: notes that just ended, candidates for prolongation
    stat_pitches = np.zeros((self.MAX_PITCHES), dtype = 'int') # histogram over closed pitches
    time = 0
    tempo = self.DEFAULT_MSPQ # default tempo corresponding to 120 bpm if no other tempo has been given, needed to convert events given in seconds to ticks
    def file_note(index, channel, pitch, track_number):
        # Closes the note at tokens[index]: computes its duration, clears its
        # slot in `initiated` and bookmarks it as a prolongation candidate.
        tokens[index].duration = time - tokens[index].start
        # shorten this note if a new note started in the same track right before this one (probably held the old note slightly too long)
        if tokens[index].duration < quarter and ((time - last_started[track_number]) < (tokens[index].duration / 2)):
            tokens[index].duration -= (time - last_started[track_number])
        initiated[channel][pitch] = -1 # signal that there is no ongoing note for this pitch any more
        # save the just ended notes
        if len(preceding_notes[track_number]) == 0:
            # no notes bookmarked, add this one to the queue
            preceding_notes[track_number] += [tokens[index]]
        else:
            survivors = [] # IMPROVEMENT: renamed from `next`, which shadowed the builtin
            for previous_note in preceding_notes[track_number]:
                previous_end_time = previous_note.start + previous_note.duration
                current_end_time = tokens[index].start + tokens[index].duration
                if current_end_time - previous_end_time < quarter / 8: # was 16, allows prolongation of notes that ended up to a 16th note before the current one
                    survivors += [previous_note]
            survivors += [tokens[index]]
            preceding_notes[track_number] = survivors
        stat_pitches[pitch] += 1
    for token in midi: # setting up this iterator internally takes quite a long time because all tracks are merged into a single ordered iterator of events
        time = token[1]
        if (start is None or time > start) and (end is None or time < end) and (limit is None or len(tokens) < limit):
            if p:
                print("TOKEN: ", token[0], token[1], token[2], token[3])
            track_number = token[0]
            msg = token[2]
            instr = token[3]
            msg_type = msg.dict()['type'] # IMPROVEMENT: renamed from `type` (shadowed builtin)
            if msg_type == "note_off" or (msg_type == "note_on" and int(msg.dict()['velocity']) == 0): # note off event
                pitch = int(msg.dict()['note'])
                # fold out-of-range pitches into the supported range by octaves
                # NOTE(review): `<=` also shifts a pitch EQUAL to LOW_NOTE_THRESHOLD
                # although the threshold is documented as inclusive -- confirm intent
                while pitch <= self.LOW_NOTE_THRESHOLD:
                    pitch += self.OCTAVE
                while pitch > self.HIGH_NOTE_THRESHOLD:
                    pitch -= self.OCTAVE
                channel = int(msg.dict()['channel'])
                index = initiated[channel][pitch]
                if index >= 0: # this note off event actually turns off a note, otherwise, we have an error and should ignore this event since it shuts off an unstarted note
                    assert(tokens[index].channel == channel and tokens[index].pitch == pitch), "Inconsistency - Filed note has incorrect pitch or channel"
                    file_note(index, channel, pitch, track_number)
                    if p:
                        print("MIDI note off event: ", msg)
                        print("--->VALID END: ", tokens[index])
                else:
                    if p:
                        print("--->INVALID END: ", msg)
            elif msg_type == 'note_on':
                pitch = int(msg.dict()['note'])
                while pitch <= self.LOW_NOTE_THRESHOLD:
                    pitch += self.OCTAVE
                while pitch > self.HIGH_NOTE_THRESHOLD:
                    pitch -= self.OCTAVE
                channel = int(msg.dict()['channel'])
                if instr == 0: # if regular piano, check if something else is registered in the channel bank
                    instr = instruments[channel]
                    if p:
                        print("FOUND INSTR: ", instr)
                if channel != 9 and inst.is_valid_instrument(inst.get_internal_mapping(instr)) and (internal_instrument is None or internal_instrument == inst.get_internal_mapping(instr)): # ignore drum channel as well as instruments that are chosen to be ignored
                    index = len(tokens)
                    if p:
                        print("Track", token[0], "start", token[1], "msg", token[2], "instr", token[3])
                    # prolong the preceding notes in the same track if they ended relatively late compared to when this note starts (probably result of staccato)
                    indices_previous_notes = preceding_notes[track_number]
                    for previous_note in indices_previous_notes:
                        if previous_note.duration < (quarter * 2):
                            if abs(previous_note.start - last_started[track_number]) < (quarter / 8):
                                # was 16, allows prolongation of notes that ended up to a 16th note before the current one
                                # we only want to process notes in the last cluster of notes started
                                delta = time - (previous_note.start + previous_note.duration)
                                if delta < quarter:
                                    if p:
                                        print("Prolonged: ", previous_note, "to", previous_note.duration + delta)
                                        print(time, delta, previous_note.start, previous_note.duration)
                                    previous_note.duration += delta
                                else:
                                    # prolong it for as much as possible depending on position
                                    for duration in [quarter, quarter / 2, quarter / 4, quarter / 3, quarter / 6]:
                                        mod = previous_note.start % duration
                                        if abs(mod - duration) < mod:
                                            mod -= duration
                                        if abs(mod) < (quarter / 16): # was 16
                                            if p:
                                                print("Prolonged by quantization: ", previous_note, "to", duration)
                                            previous_note.duration = duration
                                            break
                                    if p:
                                        print("NOT Prolonged: ", previous_note)
                        else:
                            if p:
                                print("NOT Prolonged TOO LONG: ", previous_note)
                    preceding_notes[track_number] = []
                    # place this note slightly earlier if there is an event in the same track indicating that a note was JUST started
                    if time - last_started[track_number] < (quarter / 16):
                        tokens += [Note(last_started[track_number], pitch, channel, inst.get_internal_mapping(instr), inst, track_number, tempo)]
                    else:
                        tokens += [Note(time, pitch, channel, inst.get_internal_mapping(instr), inst, track_number, tempo)]
                    last_started[track_number] = time
                    if p:
                        print("Added token to index ", len(tokens) - 1)
                        print("MIDI note on event: ", msg)
                        print("--->VALID START: ", tokens[len(tokens) - 1])
                    # bubble the new note backwards so that simultaneous notes end up sorted by ascending pitch
                    while index > 0 and tokens[index - 1].start == tokens[index].start and isinstance(tokens[index - 1], Note) and (tokens[index].pitch < tokens[index - 1].pitch):
                        tokens[index - 1], tokens[index] = tokens[index], tokens[index - 1]
                        if p:
                            print("Switching places", index, index - 1)
                            print(" -->", tokens[index])
                            print(" -->", tokens[index - 1])
                        if tokens[index].duration == 0:
                            initiated[tokens[index].channel][tokens[index].pitch] = index # propagate the new slot of the still-open note
                        index -= 1
                    if initiated[channel][pitch] >= 0: # error: need to invalidate a previous unstopped note (this note should be excluded from the entire processing)
                        # or save the previous note, assuming its length is up to this note IF its length doesn't exceed some value
                        if time - tokens[initiated[channel][pitch]].start < 4000:
                            file_note(initiated[channel][pitch], channel, pitch, tokens[initiated[channel][pitch]].track)
                        else:
                            assert(tokens[initiated[channel][pitch]].channel == channel and tokens[initiated[channel][pitch]].pitch == pitch), "Inconsistency - Filed note has incorrect pitch or channel"
                            tokens[initiated[channel][pitch]].invalidate()
                    initiated[channel][pitch] = index # file the newly started note
                else:
                    pass
            elif msg_type == "set_tempo" or msg_type == "program_change":
                if msg_type == "program_change":
                    next_instrument = int(msg.dict()['program'])
                    channel = int(msg.dict()['channel'])
                    instruments[channel] = next_instrument
                    #tokens += [Program(time, channel, next_instrument)] # save for testing only, need not be included in the final product
                else:
                    tempo = int(msg.dict()['tempo']) # change tempo for all subsequent timing calculations
            elif msg_type == "time_signature":
                ts_num = int(msg.dict()['numerator'])
                ts_denom = int(msg.dict()['denominator'])
                if (ts_num, ts_denom) not in neglected_ts and (ts_num, '*') not in neglected_ts and ('*', ts_denom) not in neglected_ts:
                    if p:
                        print("MIDI time signature event: ", msg, time)
                    index = len(tokens)
                    tokens += [TimeSignature(time, ts_num, ts_denom)]
                    # bubble the time signature before any notes starting at the same tick
                    while index > 0 and isinstance(tokens[index - 1], Note) and tokens[index - 1].start == tokens[index].start:
                        tokens[index - 1], tokens[index] = tokens[index], tokens[index - 1]
                        if tokens[index].duration == 0:
                            initiated[tokens[index].channel][tokens[index].pitch] = index # propagate if unfinished
                        index -= 1
                    time_signatures += [index]
                    # do not allow active notes across the line of a time signature change
                    for index_to_erase in initiated[initiated >= 0]: # note without an ending event
                        tokens[index_to_erase].invalidate()
                    initiated.fill(-1) # all notes are unused when we begin
                    for track_no in range(len(last_started)): # reset all places where a note might be tempted to be placed earlier into the previous time signature
                        if last_started[track_no] < time:
                            last_started[track_no] = 0
        elif limit is not None and len(tokens) >= limit:
            break
    # clean up unterminated notes and invalidate them in the output
    for i in initiated[initiated >= 0]: # note without an ending event
        tokens[i].invalidate()
    for i in range(1, len(tokens)):
        if p:
            print("TEST", tokens[i - 1], tokens[i])
        # remove duplicate notes (same note, same instrument, same starting time)
        if isinstance(tokens[i - 1], Note) and isinstance(tokens[i], Note) and tokens[i - 1] == tokens[i] and tokens[i - 1].is_valid():
            tokens[i].invalidate()
            if p:
                print("REMOVED!")
    return tokens, time_signatures, stat_pitches
'''Processes ranges of unquantized note events in the same time signature and performs a sort of normalization. The starting point is the notion of a number of ticks per quarter beat,
which is adjusted based on time signature: if the denominator in the time signature is anything else than a quarter beat, the beat unit used is divided in half (if 8) or doubled
(if 2) since these beat units are the ones used in the music. Each time signature has a number of fractions of the base beat unit that are allowed when quantizing (or rounding) each duration
to its closest allowed event. The allowed durations are duple-based or triplet-based. Furthermore, if a triplet-based time signature is used (6/8, 9/8 and 12/8), the beat unit is
multiplied with 1.5 to translate the processing into triplet-based meter. As an example, if an eighth note is used as a beat unit, two eighth notes in 6/8 could be replaced by three
triplet eighths instead. In that case, we would start working in duple rhythms in triplet time and thus allow for triplet rhythms in triplet time. This would require us to have a lot
more possible durations and it would also be impractical since triplet rhythms in triplet time are uncommon. Instead, we CHOOSE to see an eighth note as a dotted eighth note and
if it actually is only an eighth note, this is possible to express as well. The starting time and length of each duration is normalized and notes that are longer than a bar are kept
that way with the rest being quantized as usual.'''
def process_tokens(self, tokens, ds, tracks_no, triplet_ts, range_start, offset = 0): # ds = duration set
p = self.P_ADJUSTMENT
i = 0
processed_valid_notes = 0 # keep track of how many valid notes are in this subrange, if zero, return an empty vector instead...
eighth = int(ds.basic_set['d8'].duration)
status = [(0, 0, 0, 0)] * tracks_no # for each track, indicate type (triplet, duplet or both) and when the last note ended
failed_adjustments = 0
successful_adjustments = 0
# find the first actual note and adjust initial time offset to this point
'''while i < len(tokens) and type(tokens[i]) is not Note:
tokens[i].start = 0 # reset start times for intermediate events
i += 1'''
if len(tokens) > 0 and i < len(tokens): # there are items in the list and there is at least one note event
''' Searches for a range of the most suitable, in terms of distance, quantized durations given a duration by means of binary search'''
def get_quantization_candidates(duration, limit, durations):
    # Binary-search `durations` (ordered ascending by .duration) for the two
    # entries bracketing `duration`, then return either just that pair (for
    # values shorter than `limit`) or a window of up to six neighbouring
    # entries around the closest match, together with the closest match.
    lo, hi = 0, len(durations) - 1
    while hi - lo > 1:
        mid = (lo + hi) // 2
        if duration > durations[mid].duration:
            lo = mid
        else:
            hi = mid
    # pick whichever bracket endpoint lies closest to the requested duration
    if abs(duration - durations[lo].duration) <= abs(duration - durations[hi].duration):
        center = lo
    else:
        center = hi
    if duration < limit.duration:
        # short durations: only the two bracketing entries are candidates
        window = [durations[lo], durations[hi]]
    else:
        # longer durations: widen to a symmetric window around the center,
        # clipped to the list boundaries (equivalent to the stepwise widening)
        window = durations[max(0, center - 3): min(len(durations), center + 3)]
    return window, durations[center]
def sieve_candidates(duration_triple, candidates, starting_time):
    # Scores every candidate quantized duration for one note partial and
    # returns the winner together with the start-time adjustment it implies.
    #
    # duration_triple: (partial_duration, forced_alignment, partial_start) where
    #   forced_alignment is 0 = starts on a bar line (no alignment needed),
    #   1 = free placement, 2 = must end exactly on a bar line.
    # candidates: the quantized durations to choose between.
    # starting_time: event start relative to the current range; used to veto
    #   adjustments that would move the event before the range start.
    # Returns (best_duration, adjustment, success); success is False when every
    # candidate was ruled out (sentinel score 1000000).
    nonlocal successful_adjustments, failed_adjustments
    scores = [0] * len(candidates) # scores to minimize for the candidate to win
    best_index = -1
    adjustments = [0] * len(candidates)
    partial_duration = duration_triple[0]
    partial_forced_alignment = duration_triple[1]
    partial_start = duration_triple[2]
    for i, duration in enumerate(candidates):
        # account for SIGNED quantization: if quantization forces duration to be LONGER this is POSITIVE, if SHORTER this is NEGATIVE, this is scaled
        # by how common the note in question is (classes 0-4, translated into 1-5 for multiplicative reasons)
        initial_distance = duration.duration - partial_duration # pure distance
        if partial_forced_alignment == 0 or duration.duration == 0:
            # ignore if aligned by start of bar = forced alignment = 0 since this event always starts at the beginning of a bar line by default, no alignment needed
            # also ignore adjustment if it is recommended to erase this note since this is equivalent to no realignment
            adjustment = 0 # already 0 so unnecessary, only for clarity
            resulting_distance = 0
        elif partial_forced_alignment == 2: # duration MUST be aligned to the end of this bar
            if partial_duration > ds.bar_duration.duration:
                # partial spans a bar line: align its tail within the following bar
                needed_start_in_current_bar = (ds.bar_duration.duration - (duration.duration - ds.bar_duration.duration))
                adjustment = needed_start_in_current_bar - ((partial_start - ds.bar_duration.duration) % ds.bar_duration.duration)
            else:
                needed_start_in_current_bar = (ds.bar_duration.duration - duration.duration)
                adjustment = needed_start_in_current_bar - (partial_start % ds.bar_duration.duration)
            # candidate duration start first MINUS actual start of this duration: adjustment is NEGATIVE if the chosen candidate forces alignment
            # EARLIER in time, effectively INCREASING the size of the desired duration, if the chosen candidate forces the actual duration
            # to start later, it effectively REDUCES the size of the chosen duration and the sign is POSITIVE
            # now, a POSITIVE quantization score indicates a LONGER note, and adding an adjustment that is POSITIVE indicates that the note should be delayed
            # in time further. This creates a note that both starts later than the original note AND is longer, which makes it ring even further after
            # the original note ended. If the alignment is NEGATIVE, the extra added duration is added to the beginning of the note, making the prolongation
            # of the note more likely since we are probably dealing with a note that was struck slightly too late. The opposite applies as well when shrunken and
            # delayed.
            resulting_distance = initial_distance + adjustment
            # distance is now updated with information that either corroborates the growing / shrinking or contradicts it
            # for example, if the note was first increased in size and then we ALSO have to adjust it to LATER in time, then both values will be positive
            # reflecting that we are farther away from the original note than after the first duration adjustment. On the other hand, if the note is increased
            # in size but moved earlier in time, then it will perhaps be a note that was pushed down slightly too late and so its likeliness is increased.
        elif partial_forced_alignment == 1:
            # no particular alignment, place the different sub-partials (of the candidates) at their correct place and see which results in the
            # smallest need for alignment, find the BEST adjustment
            base = duration.get_basic_component().duration # this is the largest sub-component
            # try to place it first
            adjustment = -(partial_start % base) # NEGATIVE, forces note to GROW in size (potentially keeping the same end time)
            if (base + adjustment) < abs(adjustment):
                adjustment += base # POSITIVE, forces note to SHRINK in size (potentially keeping the same end time)
            best = adjustment # keep the best so far
            if len(duration.partials) > 1:
                # now place each and one of the other notes before (single note) and see if this results in a better alignment for the largest note
                for j in range(len(duration.partials) - 1):
                    start = partial_start + duration.partials[j].duration
                    adjustment = -(start % base)
                    if (base + adjustment) < abs(adjustment):
                        adjustment += base
                    if abs(adjustment) < abs(best):
                        best = adjustment
                # now, if there are 3 partials, try to place the two smaller ones first
                if len(duration.partials) == 3:
                    start = partial_start + duration.partials[0].duration + duration.partials[1].duration
                    adjustment = -(start % base)
                    if (base + adjustment) < abs(adjustment):
                        adjustment += base
                    if abs(adjustment) < abs(best):
                        best = adjustment
            elif base >= ds.basic_set['t4'].duration: # this duration consists of only one partial, attempt to place it to the closest half of it as well
                half = int(base / 2)
                start = partial_start
                adjustment = -(start % half)
                if (half + adjustment) < abs(adjustment):
                    adjustment += half
                if abs(adjustment) < abs(best):
                    best = adjustment
            adjustment = best
            resulting_distance = initial_distance + adjustment
        # process the actual score by the help of the variables initial_distance, resulting_distance and adjustment, the last being the most important
        if initial_distance < 0: # all cases where a note is being deleted totally end up here
            # had to shrink the original duration to fit this suggestion
            if resulting_distance < initial_distance: # adjustment placed this earlier in time, potentially making it an even worse fit to the original
                distance_score = round((1.0 + (resulting_distance / initial_distance)) * (initial_distance / 2.0))
            else: # adjustment placed this later in time, corroborating shortening the note, perhaps a note that was hit slightly too early?
                if resulting_distance > 0: # the adjustment made up for the entire decrease in duration and even more than that
                    distance_score = round(initial_distance / 2.0) # still negative
                    distance_score -= round((resulting_distance / initial_distance) * (initial_distance / 2.0)) # result is positive before minus sign
                elif duration.duration == 0:
                    distance_score = initial_distance
                else: # the adjustment made up for part of the decrease in duration, but not all of it
                    distance_score = round((1.0 + (resulting_distance / initial_distance)) * (initial_distance / 2.0))
        elif initial_distance > 0:
            # had to grow the original note to fit this suggestion
            if resulting_distance > initial_distance: # adjustment placed this suggestion even later in time, making it an even worse fit
                distance_score = round((1.0 + (resulting_distance / initial_distance)) * (initial_distance / 2.0))
            else: # adjustment placed this note earlier in time, perhaps making up for the increased duration, perhaps a note that was hit too late?
                if resulting_distance < 0: # the adjustment made up for the entire increase in note size and even more than that
                    distance_score = round(initial_distance / 2.0)
                    distance_score -= round((resulting_distance / initial_distance) * (initial_distance / 2.0)) # result is negative before minus sign
                else: # the adjustment made up for part of the increase in note size but not all of it
                    distance_score = round((1.0 + (resulting_distance / initial_distance)) * (initial_distance / 2.0))
        else:
            # if initial distance is 0, there is nothing to scale and the only penalty is the adjustment term
            distance_score = 0
        if abs(adjustment) >= 1.5 * (ds.basic_set['d16'].duration): # tuneable parameter, the size which it is NEVER reasonable to move the start of a note
            scores[i] = 1000000 # rule this suggestion out of the picture
        elif starting_time + adjustment < 0:
            print("Event start,adjustment, range start", starting_time, adjustment, range_start) if p >= 2 else 0
            scores[i] = 1000000 # rule this suggestion out since it implies moving the event BEFORE the starting time of the current time signature start
        else:
            scores[i] = abs(distance_score * (duration.get_frequency_class() + 1)) + (duration.get_frequency_class() * 15)
            scores[i] += abs(adjustment * 4) # see above for explanation
        adjustments[i] = adjustment
        if best_index == -1 or scores[i] < scores[best_index]: # update the best found so far
            best_index = i
        elif scores[i] == scores[best_index]:
            # tie-breaking: prefer a real duration over erasure, a basic over a
            # compound duration, and a duple over a triplet-based one
            if candidates[best_index].duration == 0:
                best_index = i
            elif candidates[i].is_basic() and not candidates[best_index].is_basic():
                best_index = i
            elif candidates[best_index].is_triplet_based() and not candidates[i].is_triplet_based():
                best_index = i
        print(" SCORE: ", scores[i], "(", initial_distance, resulting_distance, adjustment, ")", duration) if p >= 2 else 0
    if scores[best_index] == 1000000:
        failed_adjustments += 1
        return candidates[best_index], adjustments[best_index], False
    else:
        successful_adjustments += 1
        return candidates[best_index], adjustments[best_index], True
        ''' Aligns start of events with the granularity chosen for the data representation, that is, equivalent to the smallest unit representable. Also quantizes the resulting notes'''
        def quantize_and_align(event):
            '''Adjusts a single event into the current chunk/subrange, then quantizes and aligns it.

            The event start is first shifted by the closure variables offset and range_start. Valid
            Notes are split into at most two partials (with whole "saved" bars in between for notes
            longer than the longest representable duration) and each partial is matched against the
            duration set ds, possibly nudging the note start. Notes that quantize to nothing, or for
            which no acceptable candidate was found, are invalidated.

            Closure variables used (defined in the enclosing method, outside this view): offset,
            range_start, ds, p, status, triplet_ts, eighth, and the nonlocal counter
            processed_valid_notes -- TODO confirm their exact semantics against the enclosing method.
            Returns the (mutated) event.
            '''
            # process everything, both Notes, non-Notes and invalid notes.. we only have start notes at this time
            print("Event BEFORE adjustment to the current time signature:") if p >= 2 else 0
            print(event) if p >= 2 else 0
            event.start = event.start + offset - range_start # offset accounts for adjustment to current CHUNK and range_start to adjustment within current subrange
            print("Event AFTER adjustment to the current time signature:") if p >= 2 else 0
            print(event) if p >= 2 else 0
            assert(event.start >= 0), "Discrepancy, found an event with a negative starting time relative to the range start: " + str(event)
            to_process = []
            success = True # to determine whether to invalidate the event after processing or not (due to at least one sub partial being impossible to find a good candidate for)
            if type(event) is Note:
                if event.is_valid(): # only process valid notes
                    duration = event.duration
                    saved_duration = 0 # excluded full bars
                    out_duration = 0
                    out_adjustment = 0
                    if duration > ds.max_duration.duration: # longer than the longest permitted single note
                        # split into: pre (up to a bar line), a run of whole saved bars, and post (from a bar line)
                        pre = ds.bar_duration.duration - (event.start % ds.bar_duration.duration) # exactly what is left until bar line
                        duration -= pre
                        saved_duration = int(duration / ds.bar_duration.duration) * ds.bar_duration.duration
                        if (pre + ds.bar_duration.duration) <= ds.max_duration.duration: # the first chunk can contain a full bar as well
                            saved_duration -= ds.bar_duration.duration
                            pre += ds.bar_duration.duration
                            duration -= ds.bar_duration.duration
                        post = duration - saved_duration
                        if saved_duration > 0.0 and ((post + ds.bar_duration.duration) <= ds.max_duration.duration): # the last chunk can include a full bar
                            saved_duration -= ds.bar_duration.duration
                            post += ds.bar_duration.duration
                        to_process += [(pre, 2, event.start)] # must be aligned to bar line at right end
                        to_process += [(post, 0, event.start + pre + saved_duration)] # must be aligned to bar line at left end
                    else:
                        to_process += [(duration, 1, event.start)] # no alignment to bar lines required
                    print("------SAVED BARS: ", saved_duration / ds.bar_duration.duration) if p >= 2 else 0
                    end_time = event.start
                    last_partial_index = len(to_process) - 1
                    for partial_index, duration_triple in enumerate(to_process):
                        print("------PARTIAL DURATION: ", duration_triple) if p >= 2 else 0
                        # use quarter note as a limit for when less margin for quantization and alignment is used
                        # this is a heuristic system that affects what note values will be available when aligning and quantizing a note. The idea is that
                        # often, in a track, no matter if there is polyphony or not, all or groups of pitches follow the same note values and starts in
                        # some relationship to notes that start at the same time or ends at the same time as when a note starts. We therefore, for each track
                        # keep track of the previous note: if it is triplet, duplet or based on both, starting time, ending time and whether the preceding note
                        # was triplet, duplet or both. We can then deduce stuff about the upcoming note, IF it turns out to start at the same time or at the
                        # ending time of the previous note. If we are at a place in time where we can go both in duplet and triplet direction, we will do
                        # so, otherwise we will use the info about predecing note to determine whether to choose from triplet or duplet note values. If there is
                        # too much time that has passed, we will ignore the info and finally, when encountering multiple notes that starts at the same time but
                        # ends at different times, we will keep the shortest. This is a heuristic.
                        # separate indicates whether the next note is to be considered isolated from previous note or not
                        # NOTE: status_index is only assigned in the two "not separate" branches below; it is
                        # only read when separate is False, so it is never read unbound.
                        if (event.start - status[event.track][1]) < (eighth / 4): # started at the same time as the recorded event
                            separate = False
                            status_index = 3
                        elif (event.start - status[event.track][2]) < (eighth / 4): # started when the recorded event ended
                            status_index = 0
                            separate = False
                        else:
                            separate = True # open to suggestions
                        # choose a "strategy": which note values to exclude from the candidate set, based on
                        # time signature, the relation to the previous note on this track, and the tempo class
                        if triplet_ts or (not separate and (status[event.track][status_index] == 1)): # triplets only
                            if event.tempo < self.FAST_THRESHOLD:
                                chosen_track = "TRIPLET_FAST"
                                excludes = ["d32", "d16", "t16"]
                            elif event.tempo > self.SLOW_THRESHOLD:
                                excludes = ["d32", "d16"]
                                chosen_track = "TRIPLET_SLOW"
                            else:
                                chosen_track = "TRIPLET_STANDARD"
                                excludes = ["d32", "d16"]
                        elif not separate and status[event.track][status_index] == 2:
                            if event.tempo < self.FAST_THRESHOLD:
                                excludes = ["t16", "t8", "t4", "t2", "t1", "d32", "d16"]
                                chosen_track = "DUPLET_FAST"
                            elif event.tempo > self.SLOW_THRESHOLD:
                                excludes = ["t16", "t8", "t4", "t2", "t1"]
                                chosen_track = "DUPLET_SLOW"
                            else:
                                excludes = ["t16", "t8", "t4", "t2", "t1", "d32"]
                                chosen_track = "DUPLET_STANDARD"
                            if event.instrument in [2, 4, 5, 6, 7, 8, 9]:
                                chosen_track += " (NO_SHORTS)"
                                excludes += ["d32", "t16"]
                        else: # open to suggestions
                            if event.tempo < self.FAST_THRESHOLD:
                                chosen_track = "FAST"
                                excludes = ["d32", "t16", "d16"]
                            elif event.tempo > self.SLOW_THRESHOLD:
                                chosen_track = "SLOW"
                                excludes = []
                            else:
                                excludes = []
                                chosen_track = "STANDARD"
                            if event.instrument in [2, 4, 5, 6, 7, 8, 9]:
                                chosen_track += " (NO_SHORTS)"
                                excludes += ["d32", "t16"]
                        if event.instrument in [9]:
                            chosen_track += " (TIMPANI)"
                            excludes += ["d16"]
                            excludes += ["t16"]
                            # NOTE(review): compares against SLOW_THRESHOLD (not FAST_THRESHOLD) --
                            # excludes d32 for anything that is not slow; confirm this is intended
                            if event.tempo < self.SLOW_THRESHOLD:
                                excludes += ["d32"]
                        current_ds = ds.filter_durations(duration_triple[2], ds.basic_set['d4'].duration / 16, excludes)
                        candidates, top_candidate = get_quantization_candidates(duration_triple[0], ds.basic_set['none'], current_ds)
                        if p >= 2:
                            for t in candidates:
                                print("......CAND:", t, " with excluded notes ", excludes, "from strategy ", chosen_track)
                            print("......TOP: ", top_candidate, top_candidate)
                        # pick the best candidate
                        best_duration, adjustment, successful_match = sieve_candidates(duration_triple, candidates, event.start)
                        success = successful_match if success else success # only replace success if it is True, otherwise, we stick with the negative outcome
                        print(":::::::WINNER:", best_duration, adjustment) if p >= 2 else 0
                        if duration_triple[1] == 2: # first partial of several
                            if best_duration.duration > 0:
                                event.duration_partials += [best_duration]
                            #if int(saved_duration / ds.bar_duration.duration) > 0: # should not be needed
                            event.duration_partials += ([ds.bar_duration] * int(saved_duration / ds.bar_duration.duration))
                        else: # applies to single partial durations and the last partial of a multiple partial duration
                            if best_duration.duration > 0:
                                event.duration_partials += [best_duration]
                        if p >= 2:
                            print(end_time)
                            print(adjustment)
                        if adjustment != 0:
                            event.start += adjustment
                            end_time += adjustment
                            out_adjustment = adjustment
                        #out_adjustment = adjustment # should only be non-zero, if at all, when processing one of the, at most two, partials
                        out_duration += best_duration.duration
                        end_time += best_duration.duration
                        # record per-track info about this note so the strategy choice above can relate
                        # the NEXT note on the same track to it (status tuple layout assumed to be:
                        # (class of last note, its start, its end, class of its predecessor) -- TODO confirm)
                        if partial_index == last_partial_index: # the info to be stored is only important in this case
                            if out_duration > 0: # include notes with several partials where the last is 0 as well
                                if (end_time - best_duration.duration) == status[event.track][1]:
                                    # the processed event started at the same time as the last event on this track
                                    if end_time >= status[event.track][2]:
                                        # the current event ends after the one already registered, keep the shorter
                                        # essentially, we already have what is recorded and may keep it as is
                                        pass
                                    else:
                                        # the newly found duration ends earlier than the one we had recorded from before
                                        if (end_time % eighth) == 0:
                                            status[event.track] = (0, end_time - best_duration.duration, end_time, status[event.track][3])
                                        else:
                                            if best_duration.is_triplet_based():
                                                status[event.track] = (1, end_time - best_duration.duration, end_time, status[event.track][3])
                                            else:
                                                status[event.track] = (2, end_time - best_duration.duration, end_time, status[event.track][3])
                                elif (end_time - best_duration.duration) == status[event.track][2]:
                                    # the processed event starts at the same time as the earlier ended
                                    if (end_time % eighth) == 0:
                                        status[event.track] = (0, end_time - best_duration.duration, end_time, status[event.track][0])
                                    else:
                                        if best_duration.is_triplet_based():
                                            status[event.track] = (1, end_time - best_duration.duration, end_time, status[event.track][0])
                                        else:
                                            status[event.track] = (2, end_time - best_duration.duration, end_time, status[event.track][0])
                                else:
                                    # this note is not related to earlier notes, create new entries that do not depend on previous ones
                                    if (end_time % eighth) == 0:
                                        status[event.track] = (0, end_time - best_duration.duration, end_time, 0)
                                    else:
                                        if best_duration.is_triplet_based():
                                            status[event.track] = (1, end_time - best_duration.duration, end_time, 0)
                                        else:
                                            status[event.track] = (2, end_time - best_duration.duration, end_time, 0)
                            else:
                                # this note was determined to be deleted
                                # keep the record as it is of the last seen note, the distance to it will determine whether to use the supplied info or not anyways
                                pass
                    out_duration += saved_duration # restore the full bars saved
                    # follow up
                    if out_duration == 0 or not success: # this note was quantized to nothing
                        event.invalidate()
                    else:
                        nonlocal processed_valid_notes
                        processed_valid_notes += 1
                        event.duration = out_duration
                    print("Resulting adjustment: ", out_adjustment) if p >= 2 else 0
            # shift back into absolute coordinates within the chunk (net effect over this function: + offset)
            event.start += range_start
            return event
tokens = tokens[0: i] + list(map(quantize_and_align, list(tokens[i: len(tokens)])))
tokens.sort(key = lambda x: (x.start, int(x.is_start()), x.pitch)) # sort first by time, then by start and end not and last by pitch in ascending order
if successful_adjustments + failed_adjustments > 0:
failed_ratio = failed_adjustments / (successful_adjustments + failed_adjustments)
else:
failed_ratio = 0.0
assert(failed_ratio < self.MAX_FAILED_ADJUSTMENTS), "[PROC_TOK]: ERROR - " + str(round(100 * failed_ratio, 2)) + "% of processed tokens were not successfully matched, investigate if desirable, might be due to introduction of incorrect offset somewhere"
if processed_valid_notes == 0:
tokens = []
else:
tokens = []
return tokens
# when multiple notes in the same track is sounding, try to make them unified into a single duration instead of several different
def unify_tokens(self, tokens, unify_individual_tracks = True):
total = len(tokens)
time = 0
i = 0
while i < total:
starts = {} # bookmark the notes starting on this time step by "track_number: [NOTES]"
# collect all the notes that start on this time step
while i < total and tokens[i].start == time:
if tokens[i].is_valid():
if unify_individual_tracks:
track_no = tokens[i].track
else:
track_no = 0
if track_no not in starts:
starts[track_no] = []
starts[track_no] += [i]
i += 1
# unify the notes found on this time step, if possible
for key in starts:
notes_no = len(starts[key]) # number of notes in this track starting at this time step
if notes_no > 1: # only unify if we have at least two notes to compare
durations = {} # bookmark the durations found by "duration (in ticks): (occurrences in this time step, the duration(s) objects making up this duration)"
basics = {} # same as above but only for basic units since these are more common
for token_index in starts[key]:
# if they show coherence and one is sticking out, make them all the same length
duration = tokens[token_index].duration
if len(tokens[token_index].duration_partials) == 1 and tokens[token_index].duration_partials[0].is_basic():
if duration not in basics:
basics[duration] = (1, tokens[token_index].duration_partials)
else:
basics[duration] = (basics[duration][0], tokens[token_index].duration_partials)
else:
if duration not in durations:
durations[duration] = (1, tokens[token_index].duration_partials)
else:
durations[duration] = (durations[duration][0], tokens[token_index].duration_partials)
# all occurrences recorded, now go through the notes with this in mind
durations = [(d, durations[d][0], durations[d][1]) for d in durations] # duration length and the number of occurrences and duration object(s)
durations.sort(key = lambda x: x[1], reverse = True) # sort by number of occurrences, descending order
basics = [(b, basics[b][0], basics[b][1]) for b in basics]
basics.sort(key = lambda x: x[1], reverse = True)
for token_index in starts[key]:
done = False
for basic in basics:
if basic[1] >= (notes_no / 2) and abs(basic[0] - tokens[token_index].duration) < (tokens[token_index].duration / 4):
# only replace a note if the found basic note is at least half the number of occurrences and the correction corresponds to less
# than a quarter of the original note
tokens[token_index].duration = basic[0]
tokens[token_index].duration_partials = basic[2]
done = True
break
if not done:
for duration in durations:
if duration[1] >= (notes_no / 2) and abs(duration[0] - tokens[token_index].duration) < (tokens[token_index].duration / 8):
tokens[token_index].duration = duration[0]
tokens[token_index].duration_partials = duration[2]
break
if i < total:
time = tokens[i].start
return tokens
    # adds the final data before translation to data representation can take place, these include splitting long notes and inserting them as well as inserting end notes
    def finalize_representation(self, tokens, ds): # sets up the list with start tokens with end tokens as well so it will be easier to make the training matrices
        '''Expands quantized start tokens into interleaved start/end tokens, keeping the output sorted.

        Each valid Note is split into one start/end note pair per duration partial. Notes are
        only admitted if a hypothetical 16-channel (15, the drum channel 9 is reserved) MIDI
        device could play them: an output channel is needed that either already carries the
        note's instrument or is free for an instrument change; otherwise the note is cancelled.

        Output ordering invariant per time step: end notes before start notes, each group in
        ascending pitch order (mirrors the sort key used after quantization).
        Returns (out_tokens, cancelled_notes).
        '''
        out_tokens = list([])
        cancelled_notes = 0
        start = 0
        end = 0
        # lists to verify that each included note can actually be played with a 16 (really 15) channel MIDI device
        out_instruments = [-1] * self.NUM_CHANNELS # holds the current internal instrument of a certain out channel
        out_hold = [0] * self.NUM_CHANNELS # holds the absolute time until which an outgoing channel is occupied until available for a new instrument
        out_hold[9] = float('inf') # never write to drum channel
        # holds a mapping from internal instrument number to channel number, -1 indicates that no channel has this instrument right now , last index is always
        out_reverse_instruments = [-1] * self.NUM_INSTRUMENTS
        index = 0 # the earliest index in the output list where it is possible to place the start of the next token
        for token in tokens:
            if type(token) is Note and token.is_valid(): # it is a note (should only be notes here anyways)
                current_time = token.start
                frontier = index
                track = out_reverse_instruments[token.instrument]
                # find a hypothetic channel for this event
                if track == -1:
                    track = out_hold.index(min(out_hold)) # find the out channel that is available at the earliest
                    if out_hold[track] <= current_time: # there is a channel available for instrument change to the needed instrument right now
                        if out_instruments[track] >= 0: # this track has had an instrument assigned to it before
                            out_reverse_instruments[out_instruments[track]] = -1 # this instrument is no longer associated with a channel
                        out_instruments[track] = token.instrument
                        out_reverse_instruments[token.instrument] = track
                    else:
                        track = -1
                # only use token if it passed the hypothetical channel test, otherwise, cancel the token without any distinction
                if track >= 0:
                    out_hold[track] = current_time + token.duration
                    # set the length of the first (and perhaps only duration) that we will lay out
                    for partial in token.duration_partials: # fill up with notes for as long as needed
                        #print("it: ", next_duration)
                        # insert new starting note
                        start_note = Note(current_time, token.pitch, token.channel, token.instrument, self.inst, token.track, token.tempo)
                        start_note.duration = partial.duration
                        start_note.duration_partials = [partial]
                        # advance past every token that must precede this start note: earlier times, then
                        # (at the same time) end notes and lower-pitched start notes
                        while frontier < len(out_tokens) and (current_time > out_tokens[frontier].start or (current_time == out_tokens[frontier].start and (out_tokens[frontier].is_end() or out_tokens[frontier].pitch < token.pitch))):
                            frontier += 1
                        out_tokens.insert(frontier, start_note)
                        frontier += 1
                        if current_time == token.start: # means that this is the start of the first duration and then index needs to be updated by where we insert this
                            index = frontier
                        end_time = current_time + partial.duration
                        # insert end note
                        end_note = Note(end_time, token.pitch, token.channel, token.instrument, self.inst, token.track, token.tempo)
                        start_note.end = end_note # bookmark the end note so that we can invalidate it as well if we invalidate the start note
                        # advance past earlier times and, at the same time, past lower-pitched end notes
                        # (but never past a start note: ends sort before starts at equal times)
                        while frontier < len(out_tokens) and ((end_time > out_tokens[frontier].start or (end_time == out_tokens[frontier].start and (out_tokens[frontier].is_end() and out_tokens[frontier].pitch < token.pitch)))):
                            frontier += 1
                        out_tokens.insert(frontier, end_note)
                        frontier += 1
                        current_time = end_time
                else:
                    cancelled_notes += 1
        return out_tokens, cancelled_notes
def to_data_representation(self, tokens, ds, last_event, milestone, active_notes, active_instruments):
p = self.P_TO_DATA
timestep_status = []
skipped = 0
output = np.zeros((max(len(tokens), 20), self.timestep_sz), dtype=np.uint8)
if active_instruments is None:
active_instruments = np.zeros((self.NUM_INSTRUMENTS), dtype="int")
if active_notes is None:
active_notes = np.zeros((self.NUM_PITCHES), dtype='int')
i = 0
skip = False
# to keep the beats invariant of the current time signature, we alwaysmodel beats according to a thought 4/4 time signature with the same beats / quarter as the current one
fictive_4_4_bar = ds.basic_set["d4"].duration * 4
beats_vector = []
for unit in self.default_beat_units:
beats_vector += [time for time in range(0, fictive_4_4_bar, ds.basic_set[unit].duration)]
beats_vector = np.unique(np.array(beats_vector, dtype = np.int32))
assert(len(beats_vector) == len(self.default_beats_vector)), "Discrepancy! Temporary and default beats vector differ in length!"
def next_timestep(i, output, tick, is_pause):
nonlocal timestep_status
timestep_status += [(int(is_pause), tick)]
i += 1
if output.shape[0] <= i:
return i, np.concatenate((output, np.zeros((output.shape), dtype=np.uint8)))
else:
return i, output
st = 0
en = 0
oog = 0
oop = -1
for token in tokens:
if token.is_valid():
if token.is_start():
if token.start != oog:
oop = -1
else:
if(token.pitch < oop):
print("WRONG BAD SHIT!" + str(token.pitch) + " " + oop)
exit()
oop = token.pitch
skip = False
# construct the input vector for this time step
if p:
print("i: ", i, "DELTA: ", token.start - last_event, "TOKEN: ", token)
delta = token.start - last_event
if p:
print("delta: ", delta, " token start: ", token.start, "max duration: ", ds.max_duration.duration)
# lay out single pauses if the distance to the next note is too large to capture ina single time step
while (token.start - last_event) > ds.max_duration.duration:
to_next_barline = ds.bar_duration.duration - (last_event % ds.bar_duration.duration)
if (to_next_barline + ds.bar_duration.duration) <= ds.max_duration.duration:
to_next_barline += ds.bar_duration.duration
if self.USE_DISTRIBUTED_DURATIONS:
for basic_duration in ds.inverse_mappings["l" + str(to_next_barline)].partials:
output[i][self.time_a + basic_duration.get_distributed_index()] = 1
else: # USE ONE HOT DURATIONS AND COMPOUND DURATIONS
output[i][self.time_a + ds.inverse_mappings["l" + str(to_next_barline)].get_one_hot_index()] = 1
output[i][self.dur_a + ds.inverse_mappings["l0"].get_one_hot_index()] = 1 # mark an empty event as a zero duration
output[i][self.act_notes_a: self.act_notes_a + self.NUM_PITCHES] = (active_notes > 0).astype(int)
output[i][self.act_inst_a: self.act_inst_a + self.NUM_INSTRUMENTS] = (active_instruments > 0).astype(int)
last_event += to_next_barline
current_beats_vector = ((last_event % ds.bar_duration.duration) == beats_vector).astype(np.uint8)
assert(len(np.nonzero(current_beats_vector)[0]) <= 1), "1several 1s in beats vector, must be wrong " + str(current_beats_vector)
output[i][self.beats_a: self.beats_a + self.NUM_BEATS] = current_beats_vector
i, output = next_timestep(i, output, last_event, True)
#process an actual note
if (token.start - last_event) > 0:
if "l" + str(token.start - last_event) not in ds.inverse_mappings:
# need to divide this into two
first_half_fill = ds.basic_set['d8'].duration - (last_event % ds.basic_set['d8'].duration)
if "l" + str(first_half_fill) not in ds.inverse_mappings:
skip = True
elif "l" + str(token.start - (last_event + first_half_fill)) not in ds.inverse_mappings:
skip = True
else:
if self.USE_DISTRIBUTED_DURATIONS:
for basic_duration in ds.inverse_mappings["l" + str(first_half_fill)].partials:
output[i][self.time_a + basic_duration.get_distributed_index()] = 1
else: # USE ONE HOT DURATIONS AND COMPOUND DURATIONS
output[i][self.time_a + ds.inverse_mappings["l" + str(first_half_fill)].get_one_hot_index()] = 1
output[i][self.dur_a + ds.inverse_mappings["l0"].get_one_hot_index()] = 1 # mark an empty event as a zero duration
output[i][self.act_notes_a: self.act_notes_a + self.NUM_PITCHES] = (active_notes > 0).astype(int)
output[i][self.act_inst_a: self.act_inst_a + self.NUM_INSTRUMENTS] = (active_instruments > 0).astype(int)
last_event += first_half_fill
current_beats_vector = ((last_event % ds.bar_duration.duration) == beats_vector).astype(np.uint8)
assert(len(np.nonzero(current_beats_vector)[0]) <= 1), "2several 1s in beats vector, must be wrong " + str(current_beats_vector)
output[i][self.beats_a: self.beats_a + self.NUM_BEATS] = current_beats_vector
i, output = next_timestep(i, output, last_event, True)
if not skip:
st += 1
if self.USE_DISTRIBUTED_DURATIONS:
if (token.start - last_event) > 0:
for basic_duration in ds.inverse_mappings["l" + str(token.start - last_event)].partials:
output[i][self.time_a + basic_duration.get_distributed_index()] = 1
else: # USE ONE HOT DURATIONS AND COMPOUND DURATIONS
output[i][self.time_a + ds.inverse_mappings["l" + str(token.start - last_event)].get_one_hot_index()] = 1
output[i][self.inp_a + token.pitch - self.LOW_NOTE_THRESHOLD] = 1
output[i][self.instr_a + token.instrument] = 1
if self.USE_DISTRIBUTED_DURATIONS:
for basic_duration in token.duration_partials[0].partials:
output[i][self.dur_a + basic_duration.get_distributed_index()] = 1
else: # USE ONE HOT DURATIONS AND COMPOUND DURATIONS
output[i][self.dur_a + token.duration_partials[0].get_one_hot_index()] = 1
output[i][self.act_notes_a: self.act_notes_a + self.NUM_PITCHES] = (active_notes > 0).astype(int)
output[i][self.act_inst_a: self.act_inst_a + self.NUM_INSTRUMENTS] = (active_instruments > 0).astype(int)
current_beats_vector = ((token.start % ds.bar_duration.duration) == beats_vector).astype(np.uint8)
assert(len(np.nonzero(current_beats_vector)[0]) <= 1), "3several 1s in beats vector, must be wrong " + str(current_beats_vector)
output[i][self.beats_a: self.beats_a + self.NUM_BEATS] = current_beats_vector
# alter state for next time step
active_instruments[token.instrument] += 1
active_notes[token.pitch - self.LOW_NOTE_THRESHOLD] += 1
last_event = token.start
i, output = next_timestep(i, output, last_event, False)
else:
skipped += 1
token.end.invalidate()
else: # token is an end token
en += 1
active_instruments[token.instrument] -= 1
assert(active_instruments[token.instrument] >= 0), "Map over active instruments sank below 0"
active_notes[token.pitch - self.LOW_NOTE_THRESHOLD] -= 1
assert(active_notes[token.pitch - self.LOW_NOTE_THRESHOLD] >= 0), "Map over active pitches sank below 0"
# lay out pauses to the end of the bar if necessary
if milestone is not None:
# lay out single pauses if the distance to the next note is too large to capture ina single time step
while (milestone - last_event) > 0:
left = milestone - last_event
to_next_barline = ds.bar_duration.duration - (last_event % ds.bar_duration.duration)
layout = min(left, to_next_barline)
if (to_next_barline + ds.bar_duration.duration) <= ds.max_duration.duration:
to_next_barline += ds.bar_duration.duration
layout = min(layout, to_next_barline)
if self.USE_DISTRIBUTED_DURATIONS:
for basic_duration in ds.inverse_mappings["l" + str(layout)].partials:
output[i][self.time_a + basic_duration.get_distributed_index()] = 1
else: # USE ONE HOT DURATIONS AND COMPOUND DURATIONS
output[i][self.time_a + ds.inverse_mappings["l" + str(layout)].get_one_hot_index()] = 1
output[i][self.dur_a + ds.inverse_mappings["l0"].get_one_hot_index()] = 1 # mark an empty event as a zero duration
output[i][self.act_notes_a: self.act_notes_a + self.NUM_PITCHES] = (active_notes > 0).astype(int)
output[i][self.act_inst_a: self.act_inst_a + self.NUM_INSTRUMENTS] = (active_instruments > 0).astype(int)
last_event += layout
current_beats_vector = ((last_event % ds.bar_duration.duration) == beats_vector).astype(np.uint8)
assert(len(np.nonzero(current_beats_vector)[0]) <= 1), "4several 1s in beats vector, must be wrong " + str(current_beats_vector)
output[i][self.beats_a: self.beats_a + self.NUM_BEATS] = current_beats_vector
i, output = next_timestep(i, output, last_event, True)
return output[: i], timestep_status, active_notes, active_instruments, skipped, beats_vector
def from_data_representation(self, ranges, fill = False, default_duration = "d8"):
p = self.P_FROM_DATA
time = 0
offset = 0
num = -1
denom = -1
index = 0
tokens = []
for range in ranges:
if p:
print("Range length: ", len(range[0]))
ds = range[2]