Skip to content

Commit

Permalink
Added multireaderfilestream tests
Browse files Browse the repository at this point in the history
  • Loading branch information
dma1dma1 committed May 27, 2024
1 parent 8b01add commit dedf6ca
Show file tree
Hide file tree
Showing 3 changed files with 150 additions and 2 deletions.
2 changes: 1 addition & 1 deletion codalab/lib/beam/MultiReaderFileStream.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ class MultiReaderFileStream(BytesIO):
"""
NUM_READERS = 2
LOOKBACK_LENGTH = 33554432
MAX_THRESHOLD = LOOKBACK_LENGTH * 4
MAX_THRESHOLD = LOOKBACK_LENGTH * 2

def __init__(self, fileobj):
self._buffer = bytes()
Expand Down
148 changes: 148 additions & 0 deletions tests/unit/beam/multireaderfilestream_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
import tempfile
import time
import unittest

from threading import Thread

from codalab.lib.beam.MultiReaderFileStream import MultiReaderFileStream

FILESIZE = 100000000
CHUNKSIZE = FILESIZE/10

class MultiReaderFileStreamTest(unittest.TestCase):
def write_file_of_size(self, size: int, file_path: str):
with open(file_path, "wb") as f:
f.seek(size - 1)
f.write(b"\0")


def test_reader_distance(self):
with tempfile.NamedTemporaryFile(delete=True) as f:
f.seek(FILESIZE - 1)
f.write(b"\0")

m_stream = MultiReaderFileStream(f)
reader_1 = m_stream[0]
reader_2 = m_stream[1]

def thread1():
while True:
status = reader_1.read(CHUNKSIZE)
if not status:
break

def thread2():
# This reader will only read 4/10 of the file
for _ in range(4):
status = reader_2.read(CHUNKSIZE)

t1 = Thread(target=thread1)
t2 = Thread(target=thread2)

t1.start()

# Sleep a little for thread 1 to start reading
time.sleep(3)

# Assert that the first reader has not read past the Maximum threshold
self.assertGreater(70000000, m_stream._pos[0])

t2.start()

# Sleep a little for thread 2 to start reading
time.sleep(3)

# Assert that the first reader is at 100000000, second reader is at 40000000
self.assertEqual(100000000, m_stream._pos[0])
self.assertEqual(40000000, m_stream._pos[1])

# Assert that the buffer is at 6445568 (40000000 - LOOKBACK_LENGTH)
self.assertEqual(6445568, m_stream._buffer_pos)

# Assert that the buffer is length 100000000 - 6445568
self.assertEqual(93554432, m_stream._size)

t1.join()
t2.join()

def test_seek(self):
with tempfile.NamedTemporaryFile(delete=True) as f:
f.seek(FILESIZE - 1)
f.write(b"\0")

m_stream = MultiReaderFileStream(f)
reader_1 = m_stream[0]
reader_2 = m_stream[1]

result = None

def thread1():
while True:
status = reader_1.read(CHUNKSIZE)
if not status:
break

def thread2():
# This reader will only read 4/10 of the file, then seek to 10000000 and read another 4/10 of the file
for _ in range(4):
reader_2.read(CHUNKSIZE)

try:
reader_2.seek(10000000)
except AssertionError as e:
result = e

for _ in range(4):
reader_2.read(CHUNKSIZE)

t1 = Thread(target=thread1)
t2 = Thread(target=thread2)
t1.start()
t2.start()

t1.join()
t2.join()

self.assertIsNone(result)

# Check that reader 2 is at 50000000 and buffer position is correct
self.assertEqual(50000000, m_stream._pos[1])
self.assertEqual(16445568, m_stream._buffer_pos)


def test_toofar_seek(self):
with tempfile.NamedTemporaryFile(delete=True) as f:
f.seek(FILESIZE - 1)
f.write(b"\0")

m_stream = MultiReaderFileStream(f)
reader_1 = m_stream[0]
reader_2 = m_stream[1]

result = None

def thread1():
while True:
status = reader_1.read(CHUNKSIZE)
if not status:
break

def thread2():
# This reader will only read 4/10 of the file, then seek to the beginning
for _ in range(4):
status = reader_2.read(CHUNKSIZE)

try:
reader_2.seek(0)
except AssertionError as e:
result = e

t1 = Thread(target=thread1)
t2 = Thread(target=thread2)
t1.start()
t2.start()

t1.join()
t2.join()

self.assertIsInstance(result, AssertionError)
2 changes: 1 addition & 1 deletion tests/unit/lib/upload_manager_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ def test_upload_memory(self):
interval=0.1,
timeout=1
)
self.assertEqual(max(memory_usage) < 90000000, True)
self.assertEqual(max(memory_usage) < 100000000, True)

def write_string_to_file(self, string, file_path):
with open(file_path, 'w') as f:
Expand Down

0 comments on commit dedf6ca

Please sign in to comment.