~ni/nxt-python

203b639d829ed417581b9b6061ccb533efa18423 — Nicolas Schodet 3 months ago d8d5ad2 + 046ff71
Merge branch 'marvinknoll-stop-turning-motor'
3 files changed, 100 insertions(+), 12 deletions(-)

A examples/tutorial/stop_turning_motor.py
M nxt/motor.py
M tests/test_motor.py
A examples/tutorial/stop_turning_motor.py => examples/tutorial/stop_turning_motor.py +27 -0
@@ 0,0 1,27 @@
#!/usr/bin/python3
import nxt.locator
import nxt.motor

import time
import threading

with nxt.locator.find() as b:
    # Get the motor connected to the port A.
    mymotor: nxt.motor.BaseMotor = b.get_motor(nxt.motor.Port.A)

    stop_motor = False  # controls wether the motor should stop turning

    # create thread that turns the motor
    t = threading.Thread(target=mymotor.turn, kwargs={
                         'power': 50, 'tacho_units': 360*4, 'brake': True, 'stop_turn': lambda: stop_motor})
    t.start()

    # stop motor after 1sec (motor would turn approximately 3sec)
    time.sleep(1)
    stop_motor = True

    t.join()

    # release motor after 1sec since brake=True
    time.sleep(1)
    mymotor.idle()

M nxt/motor.py => nxt/motor.py +22 -10
@@ 197,7 197,7 @@ def get_tacho_and_state(values):
class BaseMotor:
    """Base class for motors"""

    def turn(self, power, tacho_units, brake=True, timeout=1, emulate=True):
    def turn(self, power, tacho_units, brake=True, timeout=1, emulate=True, stop_turn=lambda: False):
        """Use this to turn a motor.

        :param int power: Value between -127 and 128 (an absolute value greater than 64


@@ 212,9 212,11 @@ class BaseMotor:
           If ``True``, a run() function equivalent is used. Warning: motors remember
           their positions and not using emulate may lead to strange behavior,
           especially with synced motors
        :param lambda: bool stop_turn: If stop_turn returns ``True`` the motor stops turning. 
           Depending on ``brake`` it stops by holding or not holding the motor.

        The motor will not stop until it turns the desired distance. Accuracy is much
        better over a USB connection than with bluetooth...
        The motor will not stop until it turns the desired distance or stop_turn is set to True. 
        Accuracy is much better over a USB connection than with bluetooth...
        """

        tacho_limit = tacho_units


@@ 246,20 248,27 @@ class BaseMotor:

        direction = 1 if power > 0 else -1
        logger.debug("tachocount: %s", tacho)

        current_time = time.time()
        last_time = time.time()
        tacho_target = tacho.get_target(tacho_limit, direction)

        blocked = False
        sleep_time = self._eta(tacho, tacho_target, power) / 2

        try:
            while True:
                time.sleep(self._eta(tacho, tacho_target, power) / 2)
            while not stop_turn():
                time.sleep(0.1)

                if not blocked:  # if still blocked, don't reset the counter
                    last_tacho = tacho
                    last_time = current_time
                if current_time - last_time < sleep_time:
                    current_time = time.time()
                    continue
                else:
                    if not blocked:  # if still blocked, don't reset the counter
                        last_tacho = tacho
                        last_time = current_time
                    current_time = time.time()

                tacho = self.get_tacho()
                current_time = time.time()
                blocked = self._is_blocked(tacho, last_tacho, direction)
                if blocked:
                    logger.debug("not advancing: %s %s", last_tacho, tacho)


@@ 272,10 281,13 @@ class BaseMotor:
                            raise BlockedException("Blocked!")
                else:
                    logger.debug("advancing: %s %s", last_tacho, tacho)

                if tacho.is_near(tacho_target, threshold) or tacho.is_greater(
                    tacho_target, direction
                ):
                    break

                sleep_time = self._eta(tacho, tacho_target, power) / 2
        finally:
            if brake:
                self.brake()

M tests/test_motor.py => tests/test_motor.py +51 -2
@@ 10,7 10,7 @@
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
from unittest.mock import call
from unittest.mock import MagicMock, call

import pytest



@@ 167,7 167,7 @@ def test_turn(mbrick, mmotor, mtime):


def test_turn_blocked(mbrick, mmotor, mtime):
    mtime.time.side_effect = [0, 1]
    mtime.time.side_effect = [0, 0, 1, 2]
    with pytest.raises(nxt.motor.BlockedException):
        mmotor.turn(50, 360, brake=False, timeout=0.1, emulate=False)
    assert mbrick.mock_calls == [


@@ 188,6 188,55 @@ def test_turn_blocked(mbrick, mmotor, mtime):
    ]


def test_turn_stopped(mbrick, mmotor, mtime):
    # Test if skipping get_tacho() while not "slept enough"
    # Test motor gets stopped once stop_turn is set to True
    mbrick.get_output_state.side_effect = [
        (Port.A, 0, Mode.IDLE, RegulationMode.IDLE, 0, RunState.IDLE, 0, 0, 0, 0),
        (
            Port.A,
            50,
            Mode.ON | Mode.REGULATED,
            RegulationMode.SPEED,
            0,
            RunState.RUNNING,
            0,
            90,
            90,
            90,
        ),
    ]
    mtime.time.side_effect = [0, 0, 0.1, 0.2, 0.8, 0.9]

    stop_motor_mock = MagicMock(side_effect=lambda: False)
    stop_motor_mock.side_effect = [False, False, False, False, True]

    mmotor.turn(50, 360, stop_turn=stop_motor_mock)

    assert mbrick.mock_calls == [
        call.get_output_state(Port.A),
        call.set_output_state(
            Port.A,
            50,
            Mode.ON | Mode.REGULATED,
            RegulationMode.SPEED,
            0,
            RunState.RUNNING,
            0,
        ),
        call.get_output_state(Port.A),
        call.set_output_state(
            Port.A,
            0,
            Mode.ON | Mode.REGULATED | Mode.BRAKE,
            RegulationMode.SPEED,
            0,
            RunState.RUNNING,
            0,
        ),
    ]


def test_sync_run(mbrick, msyncmotor):
    msyncmotor.run(50)
    assert mbrick.mock_calls == [