In earlier posts, we made a start implementing the communication protocol of the Roco Z21 DCC control center, in Python. We’ll take the next step in this post:
The mission
According to the published protocol specification, a single packet containing multiple data sets is equivalent to the same data sets in multiple packets. We’re going to write a function that splits the content of an incoming UDP packet into individual Z21 data sets so that each data set can be processed individually.
The format of a data set is as follows:
- DataLen: the first two bytes specify the length of the data set (binary, little endian)
- Header: the third and fourth bytes identify the command (again, little endian)
- Data: the remaining bytes contain extra data, of which the meaning and quantity depend on the command.
An example of a Z21 data set (in Python):
b'\x08\x00\x10\x00\xa3\xcf\x01\x00'
This data set has a length of 8 bytes, which is also specified in the first two bytes (0x0008). The header is 0x0010, which means it is a response to the LAN_GET_SERIAL_NUMBER command. The remaining four bytes contain the serial number as a binary value.
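As a minimal sketch of how these three fields could be pulled out of the example, the snippet below applies `int.from_bytes` to each slice. The helper name `parse_data_set` is hypothetical, and reading the four data bytes as one little-endian integer is an assumption based on how DataLen and Header are encoded.

```python
def parse_data_set(data_set):
    """Split one Z21 data set into (data_len, header, data)."""
    # DataLen: bytes 0-1, little endian
    data_len = int.from_bytes(data_set[0:2], byteorder="little")
    # Header: bytes 2-3, little endian
    header = int.from_bytes(data_set[2:4], byteorder="little")
    # Data: everything after the header, up to DataLen
    data = data_set[4:data_len]
    return data_len, header, data


example = b'\x08\x00\x10\x00\xa3\xcf\x01\x00'
data_len, header, data = parse_data_set(example)

# Assumption: the serial number is a little-endian integer.
serial_number = int.from_bytes(data, byteorder="little")

print(data_len)            # 8
print(hex(header))         # 0x10
print(hex(serial_number))  # 0x1cfa3
```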
Theoretically it is possible that this data set would be sent by the Z21 twice, in the same packet. In that case, the packet would look like this:
b'\x08\x00\x10\x00\xa3\xcf\x01\x00\x08\x00\x10\x00\xa3\xcf\x01\x00'
It would be up to our function to split this packet into a list of two individual data sets:
[b'\x08\x00\x10\x00\xa3\xcf\x01\x00', b'\x08\x00\x10\x00\xa3\xcf\x01\x00']
Note: normally the data sets would be different. There could be even more data sets in the packet, and hence, in the final list.
Once the list is complete, the data sets can be processed in turn, just as if they had arrived in individual packets.
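Processing the data sets in turn could look roughly like the sketch below. The dispatch table `HANDLERS`, the handler `handle_serial_number` and the function `process_data_sets` are hypothetical names made up for this illustration; only the header value 0x0010 comes from the example above.

```python
LAN_GET_SERIAL_NUMBER = 0x0010  # header value from the example data set


def handle_serial_number(data):
    # Hypothetical handler: decode the data bytes as a little-endian integer.
    return "serial number: " + str(int.from_bytes(data, byteorder="little"))


# Hypothetical dispatch table: header value -> handler function.
HANDLERS = {LAN_GET_SERIAL_NUMBER: handle_serial_number}


def process_data_sets(data_sets):
    """Process each data set as if it had arrived in its own packet."""
    results = []
    for data_set in data_sets:
        header = int.from_bytes(data_set[2:4], byteorder="little")
        handler = HANDLERS.get(header)
        if handler is None:
            results.append("unhandled header: " + hex(header))
        else:
            results.append(handler(data_set[4:]))
    return results


data_sets = [b'\x08\x00\x10\x00\xa3\xcf\x01\x00',
             b'\x08\x00\x10\x00\xa3\xcf\x01\x00']
results = process_data_sets(data_sets)
for line in results:
    print(line)
```

Because every data set carries its own header, the processing code does not need to know whether the data sets shared a packet.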
Let’s cook, the TDD way
We are going to use a test driven development (TDD) approach for developing this function. In short this means:
- Failed test. We write a test for the piece of functionality that we want to implement. Since we haven’t implemented anything yet, the test will fail.
- Passed test. We write the minimal code that satisfies the failing test. Any earlier tests that already passed must still pass, of course.
- Refactor. If the code that we just wrote can be cleaned up, we will refactor it. Of course we will execute the test again (including all previous tests), to make sure that we didn’t introduce any bugs.
Then we continue with the next piece of functionality and follow the same cycle.
Test code
First, we’ll put together a simple test runner. It could look like this:
def split_data_set_test(test_name, input_string, expected_result):
    # Run the function under test and compare its output with the expectation.
    actual_result = split_data_sets(input_string)
    if actual_result == expected_result:
        print(test_name, ": PASSED")
    else:
        print(test_name, ": FAILED")
        print("expected: ", expected_result)
        print("actual: ", actual_result)
This function requires a test name (for reporting), an input string that contains the packet that needs to be split, and an expected result. When we call this test function, it calls the split_data_sets function with the string that we supplied and compares the returned value (the actual result) with the expected result.
Obviously, if the comparison is correct, the test passed. Otherwise, the test fails and both the expected and actual results are presented.
Note: this is a very rudimentary testing framework that we just made up ad hoc. Many advanced frameworks are available for Python that we could and maybe should use instead. That will be something for another day.
Single data set
First, we are going to add the processing for a single data set. We just want to get the same data set back, but in a list, i.e. between square brackets in Python syntax.
The test code for a single data set:
split_data_set_test("single data set",
                    b'\x08\x00\x10\x00\xa3\xcf\x01\x00',
                    [b'\x08\x00\x10\x00\xa3\xcf\x01\x00'])
To make this test pass, the following minimal code is enough:
def split_data_sets(input_string):
    return [input_string]
Simple enough, isn’t it?
Multiple data sets
Now the more challenging part. First the test, of course!
split_data_set_test("multiple data sets",
                    b'\x04\x00\x01\x02\x06\x00\x10\x00\x10\x10',
                    [b'\x04\x00\x01\x02', b'\x06\x00\x10\x00\x10\x10'])
We send in a long string, which will be split into two strings and wrapped in a list.
The implementation could look like this:
def split_data_sets(input_string):
    input_string_length = len(input_string)
    # DataLen: the first two bytes, little endian
    first_data_set_length = int.from_bytes(input_string[0:2],
                                           byteorder="little")
    first_data_set = input_string[:first_data_set_length]
    remainder = input_string[first_data_set_length:]
    # Nothing left after the first data set: stop here
    if first_data_set_length == input_string_length:
        return [first_data_set]
    # Otherwise split the remainder recursively
    return [first_data_set] + split_data_sets(remainder)
The first step is to determine the cut-off point. Remember that the first two bytes of the data set specify the length of the data set, so we calculate the cut-off point from these bytes. Then we split the input string into the first data set and a remainder. If there is any remainder left, we’ll use the magic of recursion to split up the remainder as well.
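To make the steps concrete, here is a small walk-through of one round of splitting, using the packet from the test. The variable names are only illustrative.

```python
packet = b'\x04\x00\x01\x02\x06\x00\x10\x00\x10\x10'

# Step 1: the cut-off point comes from the first two bytes (little endian).
cut_off = int.from_bytes(packet[0:2], byteorder="little")

# Step 2: split the packet at the cut-off point.
first_data_set = packet[:cut_off]
remainder = packet[cut_off:]

# Step 3: the remainder starts with its own DataLen, so the same
# steps apply to it; this is what the recursive call does.
next_cut_off = int.from_bytes(remainder[0:2], byteorder="little")

print(cut_off)         # 4
print(first_data_set)  # b'\x04\x00\x01\x02'
print(remainder)       # b'\x06\x00\x10\x00\x10\x10'
print(next_cut_off)    # 6
```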
Too short data sets
According to the specification, a data set has a length of at least four bytes. With fewer than two bytes there is no complete DataLen field to read: the function then returns meaningless fragments for bytes input and raises an error for an ordinary string. To make the function more robust, we can add a guard that simply returns an empty list if the input is too short.
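To see what happens without a guard, the sketch below runs a renamed copy of the previous version on an empty bytes object, a single byte, and an empty str. The name `split_data_sets_unguarded` is hypothetical and only used for this demonstration.

```python
def split_data_sets_unguarded(input_string):
    # Copy of the version without a length guard, renamed for this demo.
    input_string_length = len(input_string)
    first_data_set_length = int.from_bytes(input_string[0:2],
                                           byteorder="little")
    first_data_set = input_string[:first_data_set_length]
    remainder = input_string[first_data_set_length:]
    if first_data_set_length == input_string_length:
        return [first_data_set]
    return [first_data_set] + split_data_sets_unguarded(remainder)


empty_result = split_data_sets_unguarded(b'')
short_result = split_data_sets_unguarded(b'\x02')
print(empty_result)  # [b'']
print(short_result)  # [b'\x02', b'']

# A str instead of bytes is rejected by int.from_bytes.
try:
    split_data_sets_unguarded('')
    str_outcome = "no error"
except TypeError:
    str_outcome = "TypeError"
print(str_outcome)   # TypeError
```

In none of these cases do we get the empty list that we would like to see for unusable input.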
Let’s add some tests:
split_data_set_test("empty data set", b'', [])
split_data_set_test("too short data set", b'\x02', [])
And the modified code:
def split_data_sets(input_string):
    input_string_length = len(input_string)
    # Guard: fewer than two bytes cannot hold a DataLen field
    if input_string_length < 2:
        return []
    first_data_set_length = int.from_bytes(input_string[0:2],
                                           byteorder="little")
    first_data_set = input_string[:first_data_set_length]
    remainder = input_string[first_data_set_length:]
    if first_data_set_length == input_string_length:
        return [first_data_set]
    return [first_data_set] + split_data_sets(remainder)
Now, if the input is shorter than two bytes, the function returns an empty list. No more errors!
Refactoring time!
Now that we have added this guard, we can also use it to stop the recursion: calling split_data_sets with an empty remainder is safe, because the guard simply returns an empty list. So we no longer have to decide whether or not to call split_data_sets(remainder). Let’s try removing the if statement.
def split_data_sets(input_string):
    # Guard: also ends the recursion once the remainder is empty
    if len(input_string) < 2:
        return []
    first_data_set_length = int.from_bytes(input_string[0:2],
                                           byteorder="little")
    first_data_set = input_string[:first_data_set_length]
    remainder = input_string[first_data_set_length:]
    return [first_data_set] + split_data_sets(remainder)
It works! All tests passed. Here we see the power of TDD: we can experiment with optimizations and get immediate feedback in the form of passed or failed tests. This encourages us to clean up the code as much as possible, minimize complexity and maximize transparency.
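The ad hoc test runner could also be swapped for one of the existing Python frameworks. As a sketch, assuming the standard library's `unittest` module, the same four checks might look like this. The class name `SplitDataSetsTest` and the helper `run_tests` are made up for this example, and the function is repeated so that the snippet runs on its own.

```python
import unittest


def split_data_sets(input_string):
    # Copy of the refactored function, so the sketch is self-contained.
    if len(input_string) < 2:
        return []
    first_data_set_length = int.from_bytes(input_string[0:2],
                                           byteorder="little")
    first_data_set = input_string[:first_data_set_length]
    remainder = input_string[first_data_set_length:]
    return [first_data_set] + split_data_sets(remainder)


class SplitDataSetsTest(unittest.TestCase):
    def test_single_data_set(self):
        packet = b'\x08\x00\x10\x00\xa3\xcf\x01\x00'
        self.assertEqual(split_data_sets(packet), [packet])

    def test_multiple_data_sets(self):
        packet = b'\x04\x00\x01\x02\x06\x00\x10\x00\x10\x10'
        expected = [b'\x04\x00\x01\x02', b'\x06\x00\x10\x00\x10\x10']
        self.assertEqual(split_data_sets(packet), expected)

    def test_empty_data_set(self):
        self.assertEqual(split_data_sets(b''), [])

    def test_too_short_data_set(self):
        self.assertEqual(split_data_sets(b'\x02'), [])


def run_tests():
    # Load and run the test case explicitly instead of via unittest.main().
    loader = unittest.defaultTestLoader
    suite = loader.loadTestsFromTestCase(SplitDataSetsTest)
    return unittest.TextTestRunner(verbosity=0).run(suite)


result = run_tests()
print(result.wasSuccessful())
```

The framework takes over the reporting of expected and actual values, so the test code only has to state the expectations.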
Final thoughts
In this post we solved a functional problem with the Z21 communication protocol. We used TDD as an approach to develop an elegant and efficient solution of which the quality is proven using automated tests. Let’s try to apply TDD in other future experiments!
We could harden the function even more, e.g. by adding guards against invalid DataLen values (a DataLen of zero would make the function call itself endlessly) or against data sets that are shorter than specified in the DataLen bytes. I don’t think that is necessary here, as we are only going to process data sets from the Z21. It is very unlikely that the Z21 would break its own protocol.
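For completeness, here is a rough sketch of what such a hardened variant might look like. It is not part of the function developed in this post: the name `split_data_sets_strict`, the loop instead of recursion, and the choice to silently stop at the first malformed data set are all assumptions made for this illustration.

```python
MIN_DATA_SET_LENGTH = 4  # DataLen (2 bytes) + Header (2 bytes)


def split_data_sets_strict(input_string):
    """Split a packet, stopping at the first malformed data set."""
    data_sets = []
    remainder = input_string
    while len(remainder) >= MIN_DATA_SET_LENGTH:
        data_len = int.from_bytes(remainder[0:2], byteorder="little")
        # Reject lengths below the minimum (a length of 0 would never
        # advance) and lengths that run past the end of the packet.
        if data_len < MIN_DATA_SET_LENGTH or data_len > len(remainder):
            break
        data_sets.append(remainder[:data_len])
        remainder = remainder[data_len:]
    return data_sets


good = split_data_sets_strict(b'\x04\x00\x01\x02\x06\x00\x10\x00\x10\x10')
zero_length = split_data_sets_strict(b'\x00\x00\x10\x00')
truncated = split_data_sets_strict(b'\x08\x00\x10\x00\xa3')

print(good)         # [b'\x04\x00\x01\x02', b'\x06\x00\x10\x00\x10\x10']
print(zero_length)  # []
print(truncated)    # []
```

Raising an exception instead of stopping quietly would be an equally valid choice, depending on how much we want to know about malformed packets.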