Using the Bpod Class

The Bpod class is bpod-core’s entry point for communicating with a Bpod device. It opens a serial connection, sends compiled state machines to the hardware, controls execution, and returns trial data.

Connecting to a Device

Bpod can connect to a device by port or by serial_number; when both are omitted, it auto-discovers the first available Bpod on the system. In most cases it is best to use the class as a context manager, which guarantees that, on exit, the serial connection is closed and any running trial is allowed to finish.

Listing 6 Using a context manager to connect to a Bpod.
from bpod_core.bpod import Bpod

with Bpod('/dev/ttyACM0') as bpod:
    pass  # do things

Tip

Serial port names such as /dev/ttyACM0 are assigned by the OS and may change across reboots; the serial number, however, is a stable hardware identifier.

If you do not know your device’s serial number yet, connect once (auto-discover or by port) and read serial_number off the object. Then use it in all subsequent scripts for a stable, reboot-proof connection:

>>> with Bpod() as bpod:
...     bpod.serial_number
'14260000'

>>> with Bpod(serial_number='14260000') as bpod:
...     pass  # do things ...
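
The following sketch illustrates why the serial number is the more robust identifier: it resolves a serial number to whatever port name the OS currently assigns. It does not use bpod-core. The PortInfo record and the find_port helper are hypothetical stand-ins (pyserial's list_ports.comports() reports comparable device and serial_number fields), and nothing here is meant to describe how Bpod performs its own lookup.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class PortInfo:
    """Hypothetical record describing one serial port as reported by the OS."""

    device: str
    serial_number: str | None


def find_port(ports: list[PortInfo], serial_number: str) -> str:
    """Return the port name that currently belongs to the given serial number."""
    for port in ports:
        if port.serial_number == serial_number:
            return port.device
    raise LookupError(f'no device with serial number {serial_number!r}')


# the same device may show up under a different port name after a reboot ...
before_reboot = [PortInfo('/dev/ttyACM0', '14260000'), PortInfo('/dev/ttyACM1', None)]
after_reboot = [PortInfo('/dev/ttyACM0', None), PortInfo('/dev/ttyACM1', '14260000')]

# ... but a lookup by serial number still finds it
print(find_port(before_reboot, '14260000'))  # /dev/ttyACM0
print(find_port(after_reboot, '14260000'))  # /dev/ttyACM1
```

A script that hard-codes '/dev/ttyACM0' would address the wrong device after the reboot in this example, whereas the lookup by serial number is unaffected.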

Running State Machines

To run a state machine on a Bpod device, call the run() method of your Bpod instance. It validates and compiles the state machine, then enqueues it on the Bpod, which starts executing it as soon as possible. Calling get_data() after a run returns the collected data as a Polars DataFrame (see Data Format).

Listing 7 Defining and running a state machine.
from bpod_core.bpod import Bpod
from bpod_core.fsm import StateMachine

# define a state machine
fsm = StateMachine()
fsm.add_state('Wait', transitions={'Port1_High': 'LightPort1', 'Port2_High': 'LightPort2'})
fsm.add_state('LightPort1', timer=1, transitions={'Tup': '>exit'}, actions={'PWM1': 255})
fsm.add_state('LightPort2', timer=1, transitions={'Tup': '>exit'}, actions={'PWM2': 255})

# run the state machine
with Bpod() as bpod:
    bpod.run(fsm)

# collect the data
data = bpod.get_data()

Note

In the example above, get_data() is called outside the context manager. The connection is already closed at that point, but the data queue persists on the Bpod object and is safe to drain after the context exits.

See also

Refer to Finite-State Machines for the full reference of the StateMachine object.

Zero-Downtime Execution

You can invoke run() several times in quick succession. Each call automatically blocks until the hardware is ready to accept the next state machine. The Bpod starts executing the transferred state machine as soon as possible: right away if no trial is running, otherwise as soon as the preceding trial ends. This enables back-to-back execution of state machines with zero inter-trial downtime and continuous acquisition.

../_images/bpod_run__light.svg

Fig. 13 Timeline of four consecutive run() calls across the Host PC and Bpod.

When calling get_data(), data from individual trials is automatically concatenated to a continuous Polars DataFrame. In the following example, a single state machine is executed 100 times:

Listing 8 Running several trials of the same state machine in immediate succession.
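from bpod_core.bpod import Bpod
from bpod_core.fsm import StateMachine

# define a minimal state machine (any valid StateMachine works here)
fsm = StateMachine()
fsm.add_state('Wait', timer=1, transitions={'Tup': '>exit'})

# run the same state machine 100 times in immediate succession
with Bpod() as bpod:
    for trial in range(100):
        bpod.run(fsm)

# collect the concatenated data of all trials
data = bpod.get_data()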

On-The-Fly Definition

Similarly, you can define state machines on the fly within the loop. The run() method does not wait for a state machine to finish; it blocks only while the Bpod is not yet ready to accept a transfer. Consecutive state machines therefore run without interruption as long as preparing and uploading a state machine’s successor takes less time than the current state machine takes to execute.
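
The timing condition above can be explored with a small simulation. This is a simplified, hypothetical model rather than a measurement or a description of bpod-core internals: it assumes that transfers take no time and that the device holds exactly one pending state machine, so a run() call waits until the previously transferred state machine has started. The function name simulate and its parameters are made up for this sketch.

```python
def simulate(prep_times, durations):
    """Return the idle gap (in seconds) before each trial except the first.

    Assumptions of this toy model:
    - the host needs prep_times[i] seconds to prepare state machine i,
    - the transfer itself takes no time,
    - the device can hold one pending state machine, so run() waits until
      the previously transferred state machine has started.
    """
    host_clock = 0.0
    prev_start = prev_end = None
    gaps = []
    for prep, duration in zip(prep_times, durations):
        host_clock += prep  # host prepares the state machine
        if prev_start is not None:
            # run() blocks until the device can accept the transfer
            host_clock = max(host_clock, prev_start)
        # the device starts right away if idle, else when the previous trial ends
        start = host_clock if prev_end is None else max(host_clock, prev_end)
        if prev_end is not None:
            gaps.append(start - prev_end)
        prev_start, prev_end = start, start + duration
    return gaps


# preparation (0.25 s) is faster than execution (1 s): no downtime
print(simulate([0.25] * 4, [1.0] * 4))  # [0.0, 0.0, 0.0]

# preparation (1.5 s) is slower than execution (1 s): 0.5 s gap before each trial
print(simulate([1.5] * 4, [1.0] * 4))  # [0.5, 0.5, 0.5]
```

In this model, gaps appear only when the host needs longer to prepare the next state machine than the current one takes to run.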

StateMachine objects are mutable: state timers, actions, and transitions can all be reassigned between trials without rebuilding the full state machine from scratch. In the following example we construct an initial StateMachine instance that we continue to modify from trial to trial, yielding randomized state timers and action values:

Listing 9 Generating state machines on the fly.
from random import random, randint

from bpod_core.fsm import StateMachine
from bpod_core.bpod import Bpod

fsm = StateMachine()
fsm.add_state('s1', transitions={'Tup': 's2'})
fsm.add_state('s2', timer=0.1, transitions={'Tup': '>exit'})

with Bpod() as bpod:
    for trial in range(100):
        fsm.states['s1'].timer = random() / 10
        fsm.states['s1'].actions['PWM1'] = randint(0, 255)
        bpod.run(fsm)

data = bpod.get_data()

Retrieval of Partial Data

peek_data() retrieves data from a running state machine without waiting for it to finish. It blocks until one of the specified trigger_states is reached, then returns a copy of all data collected up to that point.
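
The blocking behaviour described above can be pictured with a toy event log. The sketch below is a conceptual model only and not bpod-core's implementation: the TrialLog class, its methods, and the (time, state) tuples are hypothetical, and a background thread stands in for the hardware.

```python
import threading


class TrialLog:
    """Toy event log that mimics 'block until a trigger state, then copy'."""

    def __init__(self):
        self._events = []  # (time, state) tuples collected so far
        self._changed = threading.Condition()

    def append(self, time, state):
        """Record that a state was entered and wake up waiting readers."""
        with self._changed:
            self._events.append((time, state))
            self._changed.notify_all()

    def peek(self, trigger_states, timeout=None):
        """Block until a trigger state was entered, then return a copy of the log."""

        def reached():
            return any(state in trigger_states for _, state in self._events)

        with self._changed:
            if not self._changed.wait_for(reached, timeout=timeout):
                raise TimeoutError('no trigger state reached')
            return list(self._events)  # a copy: later events do not alter it


log = TrialLog()


def fake_trial():
    # stand-in for the hardware reporting state entries
    for time, state in [(0.0, 'wait'), (1.2, 'measure'), (1.9, 'pause')]:
        log.append(time, state)


worker = threading.Thread(target=fake_trial)
worker.start()
snapshot = log.peek(trigger_states=['pause'], timeout=5)
worker.join()

log.append(2.4, 'exit')  # recorded after the peek; the snapshot is unaffected
print([state for _, state in snapshot])  # ['wait', 'measure', 'pause']
```

Because the snapshot is a copy, it can be analysed at leisure while further events keep arriving in the log.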

The example below uses two state machines per trial: the first measures the duration of an event on Port0; the second plays back an output of that same duration on PWM0. After fsm1 has been started, peek_data() blocks until the pause state is reached. The script then extracts the timing and uses it to configure fsm2 before queuing it, so that there is no idle time between the two runs. The final get_data() call collects data across all trials.

Listing 10 Using the peek_data() method.
import polars as pl

from bpod_core.fsm import StateMachine
from bpod_core.bpod import Bpod

# construct first state machine (identical across all trials)
fsm1 = StateMachine()
fsm1.add_state('wait', transitions={'Port0_High': 'measure'})
fsm1.add_state('measure', transitions={'Port0_Low': 'pause'})
fsm1.add_state('pause', timer=0.5, transitions={'Tup': '>exit'})

# construct second state machine (will be modified within each trial)
fsm2 = StateMachine()
fsm2.add_state('echo', transitions={'Tup': '>exit'}, actions={'PWM0': 255})

with Bpod() as bpod:
    for i in range(10):
        # run first state machine
        bpod.run(fsm1, trial_number=i)

        # peek at data once 'pause' state has been reached
        runtime_data = bpod.peek_data(trigger_states=['pause'])

        # duration of the Port0 event: time between the last two Port0 entries
        d = runtime_data.filter(pl.col('channel') == 'Port0')['time'].diff().last()

        # set state timer and run second state machine
        fsm2.states['echo'].timer = d
        bpod.run(fsm2, trial_number=i)

data = bpod.get_data()  # collect data across all state machine runs

Note

The trial number is normally incremented automatically with each call to run(). Here, two state machines share a single trial, so trial_number is set explicitly to keep it aligned with the loop index.