A dispatcher function in Python

Today we are going to develop a dispatcher function in Python. In earlier posts we received different types of messages from a Roco Z21 DCC Control Center. We stopped at printing the raw messages to the console. But we would like to have specialized functions that can interpret the different message types. For this we will need a dispatcher function.

Mission

We are going to develop a dispatcher function that can call specialized functions depending on the type of message we received from the Z21. Remember that while the length of a message is specified in the first two bytes (0 and 1), the next two bytes contain the message type.

So we will have to create a dispatcher that calls other functions based on the value that we find in bytes 3 and 4 of the message. (Technically, only byte 3 is enough. Byte 4 is always 0).

Let’s cook!

In our first experiment we received the following message: the serial number that we had asked for.

b"\x08\x00\x10\x00\xa3\xcf\x01\x00"

We are going to do the TDD thing again, so we start with writing a test that verifies that the dispatch function will return LAN_GET_SERIAL_NUMBER if the third byte contains x10.

def dispatch_test(test_name,\
        input_string, expected_result):
    actual_result = dispatch(input_string)
    if actual_result == expected_result:
        print(test_name, ": PASSED")
    else:
        print(test_name, ": FAILED")
        print("expected: ", expected_result)
        print("actual: ", actual_result)

dispatch_test("LAN_GET_SERIAL_NUMBER", \ b"\x08\x00\x10\x00\xa3\xcf\x01\x00", \ "LAN_GET_SERIAL_NUMBER")

As expected, the test will fail since we didn’t write the dispatch function yet. Let’s fix that:

def dispatch(data_set):
    return "LAN_GET_SERIAL_NUMBER"

Now it’s done. We know that this will be the (incorrect) outcome of any test later on for other message types, but we’ll fix that when the time comes.

Now we’ll add another test. We received the following message before:

b'\x07\x00@\x00a\x00a'

Let’s analyze this a bit more using the following code:

msg = b"\x07\x00@\x00a\x00a"
print("length: ", hex(int.from_bytes(msg[0:2], \
    byteorder = "little")))
print("type: ", hex(int.from_bytes(msg[2:4], \
    byteorder = "little")))

The result is:

length: 0x7
type: 0x40

T he type of this message is 0x40. This means that it is an X-bus command. Many commands that would normally come via a throttle (using the XPressNet protocol) are tunneled via the LAN connection. All X-bus commands have a main type 0x40 and an X-Header that specifies what message types they are. This means that we’ll have to write a second level dispatch function for this message type.

For now, we are going to add the following test:

dispatch_test("LAN_X", b"\x07\x00@\x00a\x00a", \ "LAN_X")

In order to make this test, as well as the previous one pass, we’ll update the dispatch function as follows:

def dispatch(data_set):
    header = data_set[2]
    if header == 0x10:
        return "LAN_GET_SERIAL_NUMBER"
    if header == 0x40:
        return "LAN_X"
    return "unknown message type"

The last message type for now that we’re going to analyze is:

\x0f\x00@\x00\xef\x00\x03\x0c\x80\x00\x00\x00\x00\x00`

The output is:

length: 0xf
type: 0x40

This is also a LAN_X message, so the code for the main dispatcher does not need to get updated to pass all tests.

Second level dispatcher

For the different LAN_X message types we’ll need a second level dispatcher. The first level dispatcher will call this new dispatcher for all LAN_X messages. The structure is similar to the first level dispatcher.

Let’s look again at the message we analysed earlier. Now, we’re also going to look at the X-Header (and the DB0 data byte):

msg = b"\x07\x00@\x00a\x00a"
print("length: ", hex(int.from_bytes(msg[0:2], \
    byteorder = "little")))
print("type: ", hex(int.from_bytes(msg[2:4], \
    byteorder = "little")))
print("X-Header: ", hex(msg[4]))
print("DB0: ", hex(msg[5]))

Now, the output is:

length:  0x7
type:  0x40
X-Header:  0x61
DB0:  0x0

This message type is called LAN_X_BC_TRACK_POWER_OFF and obviously triggered when I press the STOP button on the Z21 itself. When I pressed again, I received the message:

b"\x07\x00@\x00a\x01`"

The output of this is:

length:  0x7
type:  0x40
X-Header:  0x61
DB0:  0x1

This message type is LAN_X_BC_TRACK_POWER_ON, which makes sense. X-Header 0x61 has various status messages depending on the content of DB0. If we’d need to handle this message type, we would need a third level dispatcher. For now, I’ll be happy with one handler function for X-Header 0x61.

So we will replace the LAN_X test case for these message types with a new one as follows:

dispatch_test("LAN_X 0x61", b"\x07\x00@\x00a\x00a",\
"LAN_X 0x61")

The test will fail because the actual result is still “LAN_X”, not “LAN_X 0x61”. We’ll fix that quickly, while creating a secondary dispatch function that is called from the first dispatch function:

def dispatch(data_set):
    message_type = data_set[2]
    if message_type == 0x10:
        return "LAN_GET_SERIAL_NUMBER"
    if message_type == 0x40:
        return dispatch_x(data_set)
    return "unknown message type"

def dispatch_x(data_set):
    return "LAN_X 0x61"

Now we’ll go back to the other message we analyzed earlier:

b"\x0f\x00@\x00\xef\x00\x03\x0c\x80\x00\x00\x00\x00\x00`"

The output is:

length: 0xf
type: 0x40
X-Header: 0xef

This X-Header signifies that this is a LAN_X_LOCO_INFO message. The data section of the message describes the speed, direction and status of all kinds of functions that the locomotive may have.

So we will add a test case:

dispatch_test("LAN_X_LOCO_INFO", \
    b"\x0f\x00@\x00\xef\x00\x03\x0c\x80\x00\x00\x00\x00\x00`"\
    ,"LAN_X_LOCO_INFO")

This test will fail until we enhance the dispatch_x function as follows:

def dispatch_x(data_set):
    x_header = data_set[4]
    if x_header == 0x61:
        return "LAN_X 0x61"
    if x_header == 0xEF:
        return "LAN_X_LOCO_INFO"
    return "unknown X-header"

An actual handler function

For now, the dispatchers just returned the message types that they identified. The idea is that they are going to call handler functions for the different message types. For now, we’ll illustrate this with the LAN_GET_SERIAL_NUMBER message type, since we already have an implementation.

We’ll update the test as follows:

dispatch_test("LAN_GET_SERIAL_NUMBER", b"\x08\x00\x10\x00\xa3\xcf\x01\x00", \
    "LAN_GET_SERIAL_NUMBER: 118691")

Then we create a function that extracts the serial number from the message:

def handle_lan_get_serial_number(data_set):
    serial_number = int.from_bytes(data_set[4:], \
        byteorder = "little")
    return "LAN_GET_SERIAL_NUMBER: " + \
        str(serial_number)

Finally, we’ll update the dispatch function so that it calls the new handler function:

def dispatch(data_set):
    header = data_set[2]
    if header == 0x10:
        return handle_lan_get_serial_number(data_set)
    if header == 0x40:
        return dispatch_x(data_set)
    return "unknown header"

And now the updated test, as well as all other tests that we wrote today, passes.

Final code

The final code for the dispatchers and handler functions and all tests are as follows:

def dispatch(data_set):
    header = data_set[2]
    if header == 0x10:
        return handle_lan_get_serial_number(data_set)
    if header == 0x40:
        return dispatch_x(data_set)
    return "unknown header"

def dispatch_x(data_set):
    x_header = data_set[4]
    if x_header == 0x61:
        return "LAN_X 0x61"
    if x_header == 0xEF:
        return "LAN_X_LOCO_INFO"
    return "unknown X-header"

def handle_lan_get_serial_number(data_set):
    serial_number = int.from_bytes(data_set[4:], \
        byteorder = "little")
    return "LAN_GET_SERIAL_NUMBER: " + \
        str(serial_number)

def dispatch_test(test_name,\
        input_string, expected_result):
    actual_result = dispatch(input_string)
    if actual_result == expected_result:
        print(test_name, ": PASSED")
    else:
        print(test_name, ": FAILED")
        print("expected: ", expected_result)
        print("actual: ", actual_result)

dispatch_test("LAN_GET_SERIAL_NUMBER", \
    b"\x08\x00\x10\x00\xa3\xcf\x01\x00", \
    "LAN_GET_SERIAL_NUMBER: 118691")
dispatch_test("LAN_X 0x61", b"\x07\x00@\x00a\x00a",\
    "LAN_X 0x61")
dispatch_test("LAN_X_LOCO_INFO", \
    b"\x0f\x00@\x00\xef\x00\x03\x0c\x80\x00\x00\x00\x00\x00`"\
    ,"LAN_X_LOCO_INFO")

Limitations

  • Clumsy structure. The dispatch functions handle only a few message types. The implementation with if statements, or alternatively match statements would become cumbersome if we implement support for more message types. There are more elegant and scalable solutions possible that map handler functions to message types, e.g. using a dictionary.
  • No isolated testing of dispatching functions. We tested the dispatching functions using strings that they returned themselves, or results from underlying handler functions. This results in tight coupling. We could use techniques like dependency injection for the dispatcher functions.
  • No specialized test cases for dispatch_x and handler functions. We test these functions implicitly by testing the dispatch function. If these functions would have been more complicated, their functionality should be tested separately.
  • No feedback messages. The messages we received are quite meaningless as long as we don’t receive any feedback messages. Insight in which sections of the layout are occupied at any given moment is critical for automated train control.

Especially this last limitation is something we are going to work on next.

Comments

Leave a Reply

Your email address will not be published. Required fields are marked *