diff --git a/lib/util.py b/lib/util.py index 1c70dbd..f94a1ed 100644 --- a/lib/util.py +++ b/lib/util.py @@ -160,7 +160,7 @@ def int_to_bytes(value): def int_to_varint(value): '''Converts an integer to a Bitcoin-like varint bytes''' if value < 0: - raise Exception("attempt to write size < 0") + raise ValueError("attempt to write size < 0") elif value < 253: return pack(' 2 * int_t assert util.deep_getsizeof({1: 1}) > 2 * int_t assert util.deep_getsizeof({1: {1: 1}}) > 3 * int_t @@ -57,6 +116,73 @@ def test_increment_byte_string(): assert util.increment_byte_string(b'\x01\x01') == b'\x01\x02' assert util.increment_byte_string(b'\xff\xff') is None +def test_bytes_to_int(): + assert util.bytes_to_int(b'\x07[\xcd\x15') == 123456789 + +def test_int_to_bytes(): + assert util.int_to_bytes(456789) == b'\x06\xf8U' + +def test_int_to_varint(): + with pytest.raises(ValueError): + util.int_to_varint(-1) + assert util.int_to_varint(0) == b'\0' + assert util.int_to_varint(5) == b'\5' + assert util.int_to_varint(252) == b'\xfc' + assert util.int_to_varint(253) == b'\xfd\xfd\0' + assert util.int_to_varint(65535) == b'\xfd\xff\xff' + assert util.int_to_varint(65536) == b'\xfe\0\0\1\0' + assert util.int_to_varint(2**32-1) == b'\xfe\xff\xff\xff\xff' + assert util.int_to_varint(2**32) == b'\xff\0\0\0\0\1\0\0\0' + assert util.int_to_varint(2**64-1) \ + == b'\xff\xff\xff\xff\xff\xff\xff\xff\xff' + +def test_LogicalFile(tmpdir): + prefix = os.path.join(tmpdir, 'log') + L = util.LogicalFile(prefix, 2, 6) + with pytest.raises(FileNotFoundError): + L.open_file(0, create=False) + + # Check L.open creates a file + with L.open_file(8, create=True) as f: + pass + with util.open_file(prefix + '01') as f: + pass + + L.write(0, b'987') + assert L.read(0, -1) == b'987' + assert L.read(0, 4) == b'987' + assert L.read(1, 1) == b'8' + + L.write(0, b'01234567890') + assert L.read(0, -1) == b'01234567890' + assert L.read(5, -1) == b'567890' + with util.open_file(prefix + '01') as f: + assert f.read(-1) == b'67890' + + # Test file boundary + L.write(0, b'957' * 6) + assert L.read(0, -1) == b'957' * 6 + +def test_open_fns(tmpdir): + tmpfile = os.path.join(tmpdir, 'file1') + with pytest.raises(FileNotFoundError): + util.open_file(tmpfile) + with util.open_file(tmpfile, create=True) as f: + f.write(b'56') + with util.open_file(tmpfile) as f: + assert f.read(3) == b'56' + + # Test open_truncate truncates and creates + with util.open_truncate(tmpfile) as f: + assert f.read(3) == b'' + tmpfile = os.path.join(tmpdir, 'file2') + with util.open_truncate(tmpfile) as f: + assert f.read(3) == b'' + +def test_address_string(): + assert util.address_string(('foo.bar', 84)) == 'foo.bar:84' + assert util.address_string(('1.2.3.4', 84)) == '1.2.3.4:84' + assert util.address_string(('0a::23', 84)) == '[a::23]:84' def test_is_valid_hostname(): is_valid_hostname = util.is_valid_hostname @@ -135,4 +261,4 @@ def test_unpackers(): def test_hex_transforms(): h = "AABBCCDDEEFF" - assert util.hex_to_bytes(h) == b'\xaa\xbb\xcc\xdd\xee\xff' \ No newline at end of file + assert util.hex_to_bytes(h) == b'\xaa\xbb\xcc\xdd\xee\xff'