Source code for advpistepper.stepper_process

#  Copyright (c) 2020 Thomas Holland (thomas@innot.de)
#  All rights reserved.
#
#  This code is licensed under the MIT License.
#
#  Refer to the LICENSE file which is part of the AdvPiStepper distribution or
#  to https://opensource.org/licenses/MIT for a text of the license.
#
#

"""
The backend Process for the AdvPiStepper.

This class accepts commands from the frontend, calculates target positions and speed,
including accelerations and decelerations, and finally converts these into steps and
the time delay between them. A driver is then used to generate pigpio pulse sequences
which can finally be sent to the pigpio daemon for the actual GPIO pulses.
"""

import time
import os
import multiprocessing
import logging
import gc
import pigpio


from dataclasses import dataclass
from enum import Enum, auto
from math import sqrt
from typing import Dict, Any, Union

from .common import *
from .driver_base import DriverBase


[docs]class State(Enum): """Enum of all states of the stepper engine.""" IDLE = auto() """Stepper is not running and the coils are deenergized.""" STOP = auto() """Stepper is not running but the coils are energized to hold the last position.""" ACCEL = auto() """Stepper is accelerating to the target speed.""" INC = auto() """After being in the RUN state the stepper is accelerating to a new target speed.""" RUN = auto() """Stepper is running at the target speed.""" DEC = auto() """Stepper is decelerating to a new, slower target speed.""" DECEL = auto() """Stepper is decelerating to a STOP."""
[docs]@dataclass class ControllerData: """ Object containing all data for the speed controller. """ state: int = State.IDLE """Current state of the stepper.""" current_direction: int = 0 """0 = at rest, CW = forward, CCW = backward.""" c_n: int = 0 """time from the current step to the next (in microseconds).""" c_min: int = 1000 """Minimum time between steps (at max rate) in microseconds. This determines the maximum speed of the motor. Default is 1ms (1000steps per second), but the motor stepper driver should supply a more appropriate value for the connected motor and its GPIO stepper. """ c_0: int = 10000 """Initial time between steps at the start of a move (in microseconds). 10000 is just a placeholder. The actual value is calculated from the set acceleration. """ c_target: int = 2000 """Time between steps for the target speed (as set by speed()) (in microseconds). """ delay: int = 0 """int: delay until the next step in microseconds.""" target_speed: float = 100 """Target speed in steps per second. Default is 100 steps per second.""" speed: float = 0.0 """Current speed in steps per second.""" step: int = 0 """The current step in the acceleration and deceleration phases.""" decel_steps: int = 0 """Number of steps required to decelerate to a full stop."""
[docs]class Verb(Enum): """List of Commands that can be send to the stepper background process.""" # set target values SPEED = auto() ACCELERATION = auto() DECELERATION = auto() FULL_STEPS_PER_REV = auto() MICROSTEPS = auto() # Relative Moves MOVE = auto() MOVE_DEG = auto() MOVE_RAD = auto() # Absolute Moves MOVETO = auto() MOVETO_DEG = auto() MOVETO_RAD = auto() # Constant speed mode RUN = auto() STOP = auto() # reset absolut position ZERO = auto() # emergency stop HARD_STOP = auto() # end process QUIT = auto() # Coil control ENGAGE = auto() RELEASE = auto() # get value. The Noun contains the ID of the value. The value is returned via the results Pipe. GET = auto() # Do nothing operator. Used for setting up the Pipe NOP = auto() # Acknowledge the command ACKNOWLEDGE = auto()
[docs]class Noun(Enum): """List of values that can be queried with the GET Verb.""" VAL_CURRENT_SPEED = auto() VAL_TARGET_SPEED = auto() VAL_CURRENT_POSITION = auto() VAL_TARGET_POSITION = auto() VAL_ACCELERATION = auto() VAL_DECELERATION = auto() VAL_FULL_STEPS_PER_REV = auto() VAL_MICROSTEPS = auto() VAL_ACKNOWLEDGE = auto() # Microstep return values MICROSTEP_NOT_POSSIBLE = auto() MICROSTEP_CHANGE_AT = auto()
[docs]@dataclass class Command(object): """Command object passed from the frontend to the background Process.""" verb: Verb = None noun: Union[Noun, int, float] = None def __init__(self, verb: Verb, noun: Union[Noun, int, float] = None): self.verb = verb self.noun = noun
[docs]@dataclass class Result(object): """Result object passed from the background Process to the frontend.""" noun: Noun = None value: Union[int, float, bool, Verb] = None def __init__(self, noun: Noun, value: Union[int, float, bool, Verb]): self.noun = noun self.value = value
[docs]class StepperProcess(multiprocessing.Process): """ Stepper engine background process. :param command_pipe: Pipe which will receive :class:`Command` objects. :type command_pipe: multiprocessing.Pipe :param results_pipe: Pipe where :class:`Result` objects are send back to the frontend. :type results_pipe: multiprocessing.Pipe :param run_lock: A lock which the backend uses while busy :type run_lock: multiprocessing.Lock :param driver: The GPIO driver used to translate steps to pigpio pulses and waves. :type driver: DriverBase :param parameters: Optinal list of parameters to override default values. :type parameters: Dict[str,Any] """ def __init__(self, command_pipe: multiprocessing.Pipe, results_pipe: multiprocessing.Pipe, run_lock: multiprocessing.Lock, driver: DriverBase = None, parameters: Dict[str, Any] = None): super(StepperProcess, self).__init__() self.params: Dict[str, Any] = driver.parameters # default values if parameters is not None: self.params.update(parameters) # replace defaults with custom values # store the arguments self.c_pipe = command_pipe self.r_pipe = results_pipe self.run_lock = run_lock self.driver = driver # set up the internal data self.cd = ControllerData() self.current_position: int = 0 """Where the motor is at any given moment in steps/microsteps.""" self.target_position: int = 0 """Where the motor should drive to (in steps/microsteps).""" self.microsteps: int = self.params[MICROSTEP_DEFAULT] """Number of microsteps per full step currently set.Default supplied by driver.""" self.acceleration: float = self.params[ACCELERATION_RATE] self.deceleration: float = self.params[DECELERATION_RATE] self.full_steps_per_rev: int = self.params[FULL_STEPS_PER_REV] self.set_acceleration(self.acceleration) # calculate c_0 from the passed acceleration self.move_required = False """Flag to indicate that the Process has received a command to move the motor""" self.quit_now = False """Flag to indicate that the Process should terminate nicely.""" self.microstep_change_at = None self.microstep_new_value = None # do not connect to pigpio yet as pigpio uses a lock which can not be # pickeled and therefore not be spawned / forked. self.pi = None # if running on Linux (Raspberry Pi) try to get a higher priority. # This works only if this is run with root privileges. try: os.nice(-10) except (PermissionError, AttributeError): pass
[docs] def run(self): # connect to pigpio once we have started as a process. # pigpio.pi can not be pickled and can therefore not be passed # to the stepper process. self.connect_pigpio() # Now we can initialize the driver self.driver.init(self.pi) self.idle_loop()
[docs] def connect_pigpio(self): p_addr = self.params.get(PIGPIO_ADDR) p_port = self.params.get(PIGPIO_PORT, 8888) if p_addr is not None: self.pi = pigpio.pi(host=p_addr, port=p_port) else: # use localhost or as set by OS env variable PIGPIO_ADDR / _PORT self.pi = pigpio.pi()
[docs] def command_handler(self, command: Command): verb = command.verb noun = command.noun logging.debug(f"Backend: received verb={verb}, noun={noun}") if verb == Verb.SPEED: self.set_speed(float(noun)) elif verb == Verb.ACCELERATION: self.set_acceleration(float(noun)) elif verb == Verb.DECELERATION: self.set_deceleration(float(noun)) elif verb == Verb.FULL_STEPS_PER_REV: self.set_full_steps_per_rev(int(noun)) elif verb == Verb.MICROSTEPS: self.set_microsteps(int(noun)) elif verb == Verb.MOVE: self.move(int(noun)) elif verb == Verb.MOVE_DEG: self.move_deg(float(noun)) elif verb == Verb.MOVETO: self.moveto(int(noun)) elif verb == Verb.MOVETO_DEG: self.moveto_deg(float(noun)) elif verb == Verb.RUN: self.continuous(int(noun)) elif verb == Verb.STOP: self.stop() elif verb == Verb.ZERO: self.zero() elif verb == Verb.HARD_STOP: self.hard_stop() elif verb == Verb.QUIT: self.quit() elif verb == Verb.ENGAGE: self.engage() elif verb == Verb.RELEASE: self.release() elif verb == Verb.GET: self.get_value(noun) elif verb == Verb.NOP: # do nothing pass else: raise RuntimeError(f"Received unknown command {command}") # Acknowledge to the frontend that the command has been received and processed. self.c_pipe.send(Result(Noun.VAL_ACKNOWLEDGE, verb))
[docs] def set_speed(self, speed): old_speed = self.cd.target_speed if speed == old_speed: # nothing to do here return # Check if the motor is already running # if yes, then accelerate / decelerate, but only if not already # accelerating or decelarating to a step if speed > old_speed and (self.cd.state == State.RUN or self.cd.state == State.DEC): # Austin Eq.16, Changes of acceleration self.cd.step = int((speed * speed) / (2.0 * self.acceleration)) self.cd.state = State.INC elif speed < old_speed and (self.cd.state == State.RUN or self.cd.state == State.INC): # see above, with negative sign to cause a deceleration self.cd.step = int(-(speed * speed) / (2.0 * self.deceleration)) self.cd.state = State.DEC # Set the actual target parameter self.cd.c_target = 1000000 / speed self.cd.target_speed = speed
[docs] def set_acceleration(self, rate: float): # recalculate n (step) and c_0 # See Austin Eq.15 self.cd.step = self.cd.step * (self.acceleration / rate) self.cd.c_0 = 0.676 * sqrt(2.0 / rate) * 1000000 self.acceleration = rate
[docs] def set_deceleration(self, rate: float): self.deceleration = rate
[docs] def set_full_steps_per_rev(self, steps: int): self.full_steps_per_rev = steps
[docs] def set_microsteps(self, steps: int): c_steps = self.driver.steps_until_change_microsteps(steps) if c_steps < 0: # microstep setting not possible self.r_pipe.send(Result(Noun.MICROSTEP_NOT_POSSIBLE, 0)) return self.microstep_new_value = steps if c_steps == 0: self._perform_microstep_change() change_pos = self.current_position else: change_pos = self.current_position + (c_steps * self.cd.current_direction) self.microstep_change_at = change_pos self.microstep_new_value = steps self.r_pipe.send(Result(Noun.MICROSTEP_CHANGE_AT, change_pos))
def _perform_microstep_change(self): previous = self.microsteps self.microsteps = self.microstep_new_value factor = self.microstep_new_value / previous self.current_position *= factor self.target_position *= factor self.set_acceleration(self.acceleration * factor) self.set_deceleration(self.deceleration * factor) self.set_speed(self.cd.target_speed * factor) self.cd.step *= factor self.cd.c_target *= factor self.microstep_change_at = None self.driver.set_microsteps(self.microsteps)
[docs] def move(self, relative): if self.target_position == float('inf') or self.target_position == float('-inf'): # when in continuous mode we can only reference the current position self.target_position = self.current_position + relative else: # otherwise reference of the current target_position. # when at idle this should be the same as the current_position. self.target_position += relative if relative != 0: self.move_required = True
[docs] def move_deg(self, angle: float): steps_per_deg = (self.microsteps * self.full_steps_per_rev) / 360 steps = round(angle * steps_per_deg) self.move(steps)
[docs] def moveto(self, absolute): if self.target_position != absolute: self.target_position = absolute self.move_required = True
[docs] def moveto_deg(self, angle: float): steps_per_rev = self.microsteps * self.full_steps_per_rev steps_per_deg = steps_per_rev / 360 # from the current position remove any partial rotation # this new value will be the reference for the move current_full_rev = int(self.current_position / steps_per_rev) * steps_per_rev if self.current_position > 0: current_angle = self.current_position % steps_per_rev else: current_angle = self.current_position % -steps_per_rev target_steps = angle * steps_per_deg target_full_rev = int(target_steps / steps_per_rev) * steps_per_rev if angle >= 0: target_angle = target_steps % steps_per_rev else: target_angle = target_steps % -steps_per_rev if angle >= 0: # Clockwise move if target_angle >= current_angle: # target angle is greater than current angle, only need to add # full revolutions to the target angle target_position = target_angle target_position += current_full_rev + target_full_rev else: # target_remainder < current_remainder # target angle is smaller than the current angle. For a CW move # we need to go the long way around to the new target target_position = target_angle + steps_per_rev target_position += current_full_rev + target_full_rev else: # angle < 0 # Counterclockwie move # make target angle have the same sign as current angle if current_angle > 0 and target_angle < 0: target_angle += steps_per_rev elif current_angle < 0 and target_angle > 0: target_angle -= steps_per_rev if target_angle <= current_angle: # target angle is smaller than current angle, only need to add # full revolutions to the target angle target_position = target_angle target_position += current_full_rev + target_full_rev else: # target_remainder > current_remainder # target angle is greater than the current angle. For a CCW move # we need to go the long way around to the new target target_position = target_angle - steps_per_rev target_position += current_full_rev + target_full_rev self.moveto(target_position)
[docs] def continuous(self, direction: int): if direction == CW: self.target_position = float('inf') else: self.target_position = float('-inf') self.move_required = True
[docs] def stop(self): direction = self.cd.current_direction if direction == CW: self.target_position = self.current_position + self.cd.decel_steps else: self.target_position = self.current_position - self.cd.decel_steps
[docs] def zero(self): old_target = self.target_position delta = old_target - self.current_position self.current_position = 0 self.target_position = delta
[docs] def hard_stop(self): # get the driver to stop immediately. self.driver.hard_stop() # stop the engine self.cd.state = State.STOP self.cd.speed = 0.0 self.cd.step = 0 # tbd: maybe just invalidate both as the motor might have travelled some more # steps before coming to a full stop self.target_position = self.current_position
[docs] def quit(self): self.quit_now = True
[docs] def engage(self): # just pass on to the driver self.driver.engage()
[docs] def release(self): # Tell the driver to release the coils self.driver.release() self.hard_stop() # stop any movements
[docs] def get_value(self, noun: Noun): if noun == Noun.VAL_CURRENT_SPEED: value = self.cd.speed elif noun == Noun.VAL_TARGET_SPEED: value = self.cd.target_speed elif noun == Noun.VAL_CURRENT_POSITION: value = self.current_position elif noun == Noun.VAL_TARGET_POSITION: value = self.target_position elif noun == Noun.VAL_ACCELERATION: value = self.acceleration elif noun == Noun.VAL_DECELERATION: value = self.deceleration elif noun == Noun.VAL_FULL_STEPS_PER_REV: value = self.full_steps_per_rev else: value = None result = Result(noun, value) self.r_pipe.send(result)
[docs] def idle_loop(self): try: while not self.quit_now: pipedata = self.c_pipe.poll(0.1) # Wait for command if pipedata: self.run_lock.acquire() # Tell the world we are busy... command = self.c_pipe.recv() self.command_handler(command) if self.move_required: gc.disable() self.busy_loop() gc.enable() self.move_required = False self.run_lock.release() # ... and that we are twiddeling our thumbs again except EOFError: # the other end has closed the pipe. # clean up and go home return
[docs] def init_move(self): self.driver.engage() steps = self.target_position - self.current_position if steps < 0: direction = CCW else: direction = CW self.cd.current_direction = direction if self.params.get(DIRECTION_INVERT): direction = -direction self.driver.direction = direction
[docs] def busy_loop(self): self.init_move() self.pi.wave_clear() prev_wave_id = -1 # start of with a minimal delay pulse just so that we have a # current_wave_id. This saves one check in the loop. pulse = pigpio.pulse(0, 0, 100) self.pi.wave_add_generic([pulse]) current_wave_id = self.pi.wave_create_and_pad(10) self.pi.wave_send_once(current_wave_id) # calculate the initial delay delay = self.calculate_delay try: while not self.quit_now: wave = self.driver.perform_step(delay) self.pi.wave_add_generic(wave) next_wave_id = self.pi.wave_create_and_pad(10) self.pi.wave_send_using_mode(next_wave_id, pigpio.WAVE_MODE_ONE_SHOT_SYNC) # update the internal position as soon as the pulses are on # their way. if self.cd.current_direction == CW: self.current_position += 1 else: self.current_position -= 1 # check if a change in microsteps is scheduled if self.microstep_change_at == self.current_position: self._perform_microstep_change() # use the time while the current and next wave are being transmitted # to calculate the delay of the next step delay = self.calculate_delay if delay == 0: # move finished, clean up all waves while self.pi.wave_tx_busy(): # wait for all pulses to transmit time.sleep(0.001) self.pi.wave_clear() return # to the idle loop while self.pi.wave_tx_at() == current_wave_id: # to keep the timing as tight as practical # even at the expense of a high cpu load we just # poll the command pipe and the current wave id # at maximum speed and without yielding if self.c_pipe.poll(): command = self.c_pipe.recv() self.command_handler(command) if prev_wave_id != -1: self.pi.wave_delete(prev_wave_id) prev_wave_id = current_wave_id current_wave_id = next_wave_id # end of loop except BrokenPipeError: # Command pipe was closed - the other end has terminated # close shop and go home return
@property def calculate_delay(self) -> int: data = self.cd delta_position = self.target_position - self.current_position # determine the number of steps to come to a full stop from # the current speed. [1] Equation 16 decel_steps = int(((data.speed * data.speed) / (2 * self.deceleration))) data.decel_steps = decel_steps if delta_position == 0 and decel_steps <= 1: # target position reached, move completed data.speed = 0.0 data.step = 0 data.state = State.STOP return 0 if delta_position * data.current_direction < 0: # direction reversal data.state = State.DECEL data.step = -int(decel_steps) self.driver.direction = data.current_direction elif abs(delta_position) <= decel_steps and data.state != State.DECEL: data.state = State.DECEL data.step = -int(decel_steps) if data.step == 0: # first step or reversal infliction point data.c_n = data.c_0 data.step = 1 if data.state == State.DECEL: # 0-point of a reversal # officially change direction. if delta_position > 0: data.current_direction = CW else: data.current_direction = CCW if self.params.get(DIRECTION_INVERT): self.driver.direction = -data.current_direction else: self.driver.direction = data.current_direction data.state = State.ACCEL elif data.state == State.ACCEL or data.state == State.INC: data.c_n = data.c_n - ((2.0 * data.c_n) / ((4.0 * data.step) + 1)) if data.c_n <= data.c_target: # selected speed reached. Change to constant speed mode. data.c_n = data.c_target data.state = State.RUN else: data.step += 1 elif data.state == State.DECEL or data.state == State.DEC: data.c_n = data.c_n - ((2.0 * data.c_n) / ((4.0 * data.step) + 1)) data.step += 1 # Speed in steps per second data.speed = 1000000 / data.c_n # logging.debug(f"CurrPos:{self.current_position}, Step:{data.step}, State:{data.state}, " # f"c_n:{int(data.c_n)}, CurrSpeed:{int(data.speed)}, DecelSteps:{decel_steps}") return int(data.c_n)