Skip to content
Snippets Groups Projects
Commit beb5df97 authored by ft-Demo's avatar ft-Demo
Browse files

commit by robo pro coding

parents
No related branches found
No related tags found
No related merge requests found
{"uuid":"e2679fc1-bbd2-5142-2ec8-ad7e1cab3590","name":"Sorting_Line_AI_py","mode":"PROFESSIONAL","version":"1.0","controller":"TXT4"}
\ No newline at end of file
import logging
import os
import threading
from lib.controller import *
from lib.display import *
from lib.machine_learning import *
from lib.node_red import *
from lib.sorting_line import *
controllerid = None
client = None
controllerid = os.uname()[1]
logging.basicConfig(level=logging.ERROR, format="%(asctime)s %(levelname)-10s %(funcName)3s %(message)s #%(filename)3s:%(lineno)d")
display.set_attr("part_pass_fail.text", str(''.join([str(x) for x in ['<h4>UI at: http://', controllerid, '.local:1880/ui</h4>']])))
threading.Thread(target=thread_SLD, daemon=True).start()
client = mqtt_client_forever()
# auto generated content from camera configuration
from lib.controller import *
import fischertechnik.factories as txt_factory
TXT_SLD_M_USB1_1_camera.set_rotate(False)
TXT_SLD_M_USB1_1_camera.set_height(240)
TXT_SLD_M_USB1_1_camera.set_width(320)
TXT_SLD_M_USB1_1_camera.set_fps(15)
TXT_SLD_M_USB1_1_camera.start()
<xml type="camera" version="2" camera_name="TXT_SLD_M_USB1_1_camera"/>
\ No newline at end of file
import fischertechnik.factories as txt_factory
txt_factory.init()
txt_factory.init_input_factory()
txt_factory.init_output_factory()
txt_factory.init_motor_factory()
txt_factory.init_counter_factory()
txt_factory.init_usb_factory()
txt_factory.init_camera_factory()
TXT_SLD_M = txt_factory.controller_factory.create_graphical_controller()
TXT_SLD_M_I4_photo_transistor = txt_factory.input_factory.create_photo_transistor(TXT_SLD_M, 4)
TXT_SLD_M_I5_photo_transistor = txt_factory.input_factory.create_photo_transistor(TXT_SLD_M, 5)
TXT_SLD_M_I6_photo_transistor = txt_factory.input_factory.create_photo_transistor(TXT_SLD_M, 6)
TXT_SLD_M_I7_photo_transistor = txt_factory.input_factory.create_photo_transistor(TXT_SLD_M, 7)
TXT_SLD_M_I8_photo_transistor = txt_factory.input_factory.create_photo_transistor(TXT_SLD_M, 8)
TXT_SLD_M_O5_magnetic_valve = txt_factory.output_factory.create_magnetic_valve(TXT_SLD_M, 5)
TXT_SLD_M_O6_magnetic_valve = txt_factory.output_factory.create_magnetic_valve(TXT_SLD_M, 6)
TXT_SLD_M_O7_magnetic_valve = txt_factory.output_factory.create_magnetic_valve(TXT_SLD_M, 7)
TXT_SLD_M_O3_compressor = txt_factory.output_factory.create_compressor(TXT_SLD_M, 3)
TXT_SLD_M_O4_led = txt_factory.output_factory.create_led(TXT_SLD_M, 4)
TXT_SLD_M_O8_magnetic_valve = txt_factory.output_factory.create_magnetic_valve(TXT_SLD_M, 8)
TXT_SLD_M_M1_encodermotor = txt_factory.motor_factory.create_encodermotor(TXT_SLD_M, 1)
TXT_SLD_M_C1_motor_step_counter = txt_factory.counter_factory.create_encodermotor_counter(TXT_SLD_M, 1)
TXT_SLD_M_C1_motor_step_counter.set_motor(TXT_SLD_M_M1_encodermotor)
TXT_SLD_M_USB1_1_camera = txt_factory.usb_factory.create_camera(TXT_SLD_M, 1)
txt_factory.initialized()
\ No newline at end of file
import os
import time
import threading
import signal
import ftgui
display = ftgui.fttxt2_gui_connector("app")
display.open()
def display_monitoring():
while display.is_open():
time.sleep(1)
os.kill(os.getpid(), signal.SIGTERM)
exit()
threading.Thread(target=display_monitoring, daemon=True).start()
// auto generated content from display configuration
import QtQuick 2.2
import QtQuick.Window 2.0
import QtQuick.Controls 1.1
import QtQuick.Controls.Styles 1.1
import QtQuick.Extras 1.4
TXTWindow {
Rectangle {
id: rect
color: "grey"
anchors.fill: parent
}
TXTLabel {
id: img_label
text: ""
font.pixelSize: 16
elide: Text.ElideRight
x: 17
y: 19
width: 200
height: 150
}
StatusIndicator {
id: red
color: "#EC1600"
active: false
x: 71
y: 195
width: 35
height: 35
}
StatusIndicator {
id: white
color: "#FFFFFF"
active: false
x: 1
y: 194
width: 35
height: 35
}
StatusIndicator {
id: blue
color: "#005693"
active: false
x: 137
y: 194
width: 35
height: 35
}
StatusIndicator {
id: fail
color: "#E7AE13"
active: false
x: 204
y: 195
width: 35
height: 35
}
TXTLabel {
id: version_label
text: "<small>Sorting Line AI: Version 2022/11/24</small>"
font.pixelSize: 16
elide: Text.ElideRight
x: 19
y: 0
width: 201
height: 20
}
TXTLabel {
id: part_pass_fail
text: "<font>Webadress loading...</font>"
font.pixelSize: 16
elide: Text.ElideRight
x: 20
y: 170
width: 200
height: 22
}
}
<xml type="display" version="2"><item id="33" class="TXTLabel"><name>img_label</name><text/><geometry><x>17</x><y>19</y><width>200</width><height>150</height></geometry></item><item id="20" class="StatusIndicator"><name>red</name><color>#EC1600</color><active>false</active><geometry><x>71</x><y>195</y><width>35</width><height>35</height></geometry></item><item id="21" class="StatusIndicator"><name>white</name><color>#FFFFFF</color><active>false</active><geometry><x>1</x><y>194</y><width>35</width><height>35</height></geometry></item><item id="22" class="StatusIndicator"><name>blue</name><color>#005693</color><active>false</active><geometry><x>137</x><y>194</y><width>35</width><height>35</height></geometry></item><item id="23" class="StatusIndicator"><name>fail</name><color>#E7AE13</color><active>false</active><geometry><x>204</x><y>195</y><width>35</width><height>35</height></geometry></item><item id="24" class="TXTLabel"><name>version_label</name><text>&lt;small&gt;Sorting Line AI: Version 2022/11/24&lt;/small&gt;</text><geometry><x>19</x><y>0</y><width>201</width><height>20</height></geometry></item><item id="28" class="TXTLabel"><name>part_pass_fail</name><text>&lt;font&gt;Webadress loading...&lt;/font&gt;</text><geometry><x>20</x><y>170</y><width>200</width><height>22</height></geometry></item></xml>
\ No newline at end of file
import base64
import cv2
import datetime
import logging
import numpy as np
import subprocess
import time
from datetime import datetime
from fischertechnik.camera.VideoStream import VideoStream
from fischertechnik.controller.Motor import Motor
from fischertechnik.machine_learning.ObjectDetector import ObjectDetector
from lib.camera import *
from lib.controller import *
from lib.display import *
from lib.node_red import *
tag = None
value = None
num = None
color = None
ts = None
filename = None
sat = None
hue = None
duration = None
prob = None
keytext = None
pos = None
frame = None
detector = None
result = None
key = None
def reset_inteface():
global tag, value, num, color, ts, filename, sat, hue, duration, prob, keytext, pos, frame, detector, result, key
display.set_attr("part_pass_fail.text", str(containInHTML('i', 'Not analysed yet')))
display.set_attr("red.active", str(False).lower())
display.set_attr("white.active", str(False).lower())
display.set_attr("blue.active", str(False).lower())
display.set_attr("fail.active", str(False).lower())
def containInHTML(tag, value):
global num, color, ts, filename, sat, hue, duration, prob, keytext, pos, frame, detector, result, key
return ''.join([str(x) for x in ['<', tag, '>', value, '</', tag, '>']])
def MakePictureRunKiReturnFoundPart():
global tag, value, num, color, ts, filename, sat, hue, duration, prob, keytext, pos, frame, detector, result, key
reset_inteface()
TXT_SLD_M_O4_led.set_brightness(int(512))
time.sleep(0.2)
TXT_SLD_M_O4_led.set_brightness(int(30))
time.sleep(0.8)
duration = (time.time() * 1000)
num = 4
prob = 0
keytext = 'No feature found'
pos = ''
display.set_attr("part_pass_fail.text", str(containInHTML('i', 'processing')))
frame = TXT_SLD_M_USB1_1_camera.read_frame()
#get color from frame
color = (np.mean(frame[ 80:120, 100:240], axis=(0, 1)))
color = cv2.cvtColor(np.uint8([[[color[0],color[1],color[2]]]]),cv2.COLOR_BGR2HLS)[0][0]
hue = color[0] # range 0-180
sat = color[2] # range 0-255
TXT_SLD_M_C1_motor_step_counter.reset()
TXT_SLD_M_M1_encodermotor.set_speed(int(160), Motor.CCW)
TXT_SLD_M_M1_encodermotor.set_distance(int(240))
detector = ObjectDetector('/opt/ft/workspaces/machine-learning/object-detection/sorting_line/model.tflite', '/opt/ft/workspaces/machine-learning/object-detection/sorting_line/labels.txt')
result = detector.process_image(frame)
color = get_color()
TXT_SLD_M_O4_led.set_brightness(int(0))
print(result)
if len(result) > 0:
prob = result[0]['probability']
pos = result[0]['position']
key = result[0]['label']
keytext = ''
if key == 'CRACK':
keytext = 'Cracks in Workpiece'
elif key == 'MIPO1':
keytext = '1x milled pocket'
elif key == 'MIPO2':
keytext = '2x milled pocket'
elif key == 'BOHO':
keytext = 'Round hole'
elif key == 'BOHOEL':
keytext = 'Hole elyptical'
elif key == 'BOHOMIPO1':
keytext = 'Hole and 1x milled pocket'
elif key == 'BOHOMIPO2':
keytext = 'Hole and 2x milled pocket'
elif key == 'BLANK':
keytext = 'Workpiece without features'
else:
keytext = key
print('{} {} {} {} {} {}'.format(key, keytext, num, prob, pos, color))
if key == 'BOHO' and color == 1:
num = 1
display.set_attr("white.active", str(True).lower())
display.set_attr("part_pass_fail.text", str(containInHTML('b', "Workpiece <font color='#88ff88'> PASSED</font>")))
elif key == 'MIPO2' and color == 2:
num = 2
display.set_attr("red.active", str(True).lower())
display.set_attr("part_pass_fail.text", str(containInHTML('b', "Workpiece <font color='#88ff88'> PASSED</font>")))
elif key == 'BOHOMIPO2' and color == 3:
num = 3
display.set_attr("blue.active", str(True).lower())
display.set_attr("part_pass_fail.text", str(containInHTML('b', "Workpiece <font color='#88ff88'> PASSED</font>")))
else:
num = 4
display.set_attr("fail.active", str(True).lower())
display.set_attr("part_pass_fail.text", str(containInHTML('b', "Workpiece <font color='#ff8888'>FAILED</font>")))
else:
display.set_attr("part_pass_fail.text", str(containInHTML('b', "Workpiece <font color='#ff8888'>FAILED</font>")))
duration = (time.time() * 1000) - duration
saveFileandPublish()
return num
def get_color():
global tag, value, num, color, ts, filename, sat, hue, duration, prob, keytext, pos, frame, detector, result, key
if hue >= 85 and hue < 130 and sat >= 40:
color = 3
elif (hue >= 130 and hue <= 180 or hue >= 0 and hue < 15) and sat >= 40:
color = 2
else:
color = 1
return color
def timestamp():
global tag, value, num, color, ts, filename, sat, hue, duration, prob, keytext, pos, frame, detector, result, key
ts = datetime.now().strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z"
return ts
def saveFileandPublish():
global tag, value, num, color, ts, filename, sat, hue, duration, prob, keytext, pos, frame, detector, result, key
filename = '/opt/ft/workspaces/last-image.png'
if(pos != ""):
image = cv2.rectangle(frame, (pos[0], pos[1]), (pos[2], pos[3]), (180,105,0), 2)
logging.debug("write png file: ", filename)
cv2.imwrite(filename, frame)
subprocess.Popen(['chmod', '777', filename])
with open(filename, "rb") as img_file:
my_string = base64.b64encode(img_file.read())
imgb64 = "data:image/jpeg;base64," + (my_string.decode('utf-8'))
time.sleep(0.2)
publish(imgb64,keytext,color,num,prob,duration)
displaystr= "<img width='200' height='150' src='" + imgb64 + "'>"
display.set_attr("img_label.text", str(displaystr))
import base64
import cv2
import json
import logging
import numpy as np
import paho.mqtt.client as mqtt
from datetime import datetime
imagedata = None
description = None
color = None
bay = None
confidence = None
duration = None
client = None
string = None
ts = None
def mqtt_client_forever():
global imagedata, description, color, bay, confidence, duration, client, string, ts
client = mqtt.Client("mqtt_client")
client.connect('127.0.0.1',2883,60)
#client.on_message=on_message
#client.subscribe("#")
client.loop_forever()
return client
def get_client():
global imagedata, description, color, bay, confidence, duration, client, string, ts
return client
def publish(imagedata, description, color, bay, confidence, duration):
global client, string, ts
string = '{{ "ts":"{}", "description":"{}", "color":"{}", "bay":"{}", "confidence":"{}", "data":"{}", "duration":"{}" }}'.format(timestamp(), description, color, bay, confidence, imagedata, duration)
logging.debug("sending...")
ret = client.publish("i/cam",string)
def timestamp():
global imagedata, description, color, bay, confidence, duration, client, string, ts
ts = datetime.now().strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3]
return ts
import logging
import os
import time
from fischertechnik.controller.Motor import Motor
from lib.controller import *
from lib.machine_learning import *
BeltSpeed = None
BeltSteps = None
MovementSpeed = None
PositionBay1 = None
PositionBay2 = None
j = None
PositionBay3 = None
i = None
state_code = None
PositionBay4 = None
PositionCamera = None
dubblepart = None
num = None
def thread_SLD():
global BeltSpeed, BeltSteps, MovementSpeed, PositionBay1, PositionBay2, j, PositionBay3, i, state_code, PositionBay4, PositionCamera, dubblepart, num
logging.debug('starting thread SLD')
MovementSpeed = min(max(300, 1), 512)
PositionBay1 = 195
PositionBay2 = 280
PositionBay3 = 360
PositionBay4 = 443
PositionCamera = 105
dubblepart = False
num = -1
while True:
try:
mainSLDexternal_th()
except Exception as e:
print(e)
clean_exit()
def mainSLDexternal_th():
global BeltSpeed, BeltSteps, MovementSpeed, PositionBay1, PositionBay2, j, PositionBay3, i, state_code, PositionBay4, PositionCamera, dubblepart, num
if PartInGoodsReceipt():
reset_inteface()
logging.debug('start')
TXT_SLD_M_M1_encodermotor.set_speed(int(MovementSpeed * 0.5), Motor.CCW)
TXT_SLD_M_M1_encodermotor.start_sync()
for i in range(401):
if not PartInGoodsReceipt():
break
if i >= 399:
TXT_SLD_M_M1_encodermotor.stop_sync()
raise Exception("Insertion fault, workpiece did not clear the lightbeam in expected Time. [Trubleshoot: Is the workpiece stuck somewhere?]")
time.sleep(0.01)
TXT_SLD_M_M1_encodermotor.stop_sync()
SetBeltSpeedSteps(MovementSpeed, PositionCamera)
AwaitBeltToReachPosition()
num = MakePictureRunKiReturnFoundPart()
if num == 1:
SetBeltSpeedSteps(MovementSpeed, (PositionBay1 - PositionCamera) - (TXT_SLD_M_C1_motor_step_counter.get_count()))
AwaitBeltToReachPosition()
ejectWhite()
elif num == 2:
SetBeltSpeedSteps(MovementSpeed, (PositionBay2 - PositionCamera) - (TXT_SLD_M_C1_motor_step_counter.get_count()))
AwaitBeltToReachPosition()
ejectRed()
elif num == 3:
SetBeltSpeedSteps(MovementSpeed, (PositionBay3 - PositionCamera) - (TXT_SLD_M_C1_motor_step_counter.get_count()))
AwaitBeltToReachPosition()
ejectBlue()
else:
SetBeltSpeedSteps(MovementSpeed, (PositionBay4 - PositionCamera) - (TXT_SLD_M_C1_motor_step_counter.get_count()))
AwaitBeltToReachPosition()
ejectFAIL()
if (state_code == 0):
raise Exception("Ejection fault, workpiece did not reach storage bay in expected time. [Trubleshoot: Did the workpiece reach the bay? Is it stuck somewhere?]")
def AwaitBeltToReachPosition():
global BeltSpeed, BeltSteps, MovementSpeed, PositionBay1, PositionBay2, j, PositionBay3, i, state_code, PositionBay4, PositionCamera, dubblepart, num
while True:
if (not TXT_SLD_M_M1_encodermotor.is_running()):
break
time.sleep(0.010)
def SetBeltSpeedSteps(BeltSpeed, BeltSteps):
global MovementSpeed, PositionBay1, PositionBay2, j, PositionBay3, i, state_code, PositionBay4, PositionCamera, dubblepart, num
if BeltSteps < 0:
TXT_SLD_M_M1_encodermotor.set_speed(int(BeltSpeed), Motor.CW)
TXT_SLD_M_M1_encodermotor.set_distance(int(BeltSteps * -1))
else:
TXT_SLD_M_M1_encodermotor.set_speed(int(BeltSpeed), Motor.CCW)
TXT_SLD_M_M1_encodermotor.set_distance(int(BeltSteps))
def ejectWhite():
global BeltSpeed, BeltSteps, MovementSpeed, PositionBay1, PositionBay2, j, PositionBay3, i, state_code, PositionBay4, PositionCamera, dubblepart, num
TXT_SLD_M_O3_compressor.on()
logging.debug('WHITE')
TXT_SLD_M_O5_magnetic_valve.on()
for j in range(151):
state_code = 0
if isWhite():
if j <= 30:
time.sleep((300 - 10 * j) / 1000)
state_code = 1
break
time.sleep(0.01)
TXT_SLD_M_O5_magnetic_valve.off()
TXT_SLD_M_O3_compressor.off()
def ejectRed():
global BeltSpeed, BeltSteps, MovementSpeed, PositionBay1, PositionBay2, j, PositionBay3, i, state_code, PositionBay4, PositionCamera, dubblepart, num
TXT_SLD_M_O3_compressor.on()
logging.debug('RED')
TXT_SLD_M_O6_magnetic_valve.on()
for j in range(151):
state_code = 0
if isRed():
if j <= 30:
time.sleep((300 - 10 * j) / 1000)
state_code = 2
break
time.sleep(0.01)
TXT_SLD_M_O6_magnetic_valve.off()
TXT_SLD_M_O3_compressor.off()
def ejectBlue():
global BeltSpeed, BeltSteps, MovementSpeed, PositionBay1, PositionBay2, j, PositionBay3, i, state_code, PositionBay4, PositionCamera, dubblepart, num
TXT_SLD_M_O3_compressor.on()
logging.debug('BLUE')
TXT_SLD_M_O7_magnetic_valve.on()
for j in range(151):
state_code = 0
if isBlue():
if j <= 30:
time.sleep((300 - 10 * j) / 1000)
state_code = 3
break
time.sleep(0.01)
TXT_SLD_M_O7_magnetic_valve.off()
TXT_SLD_M_O3_compressor.off()
def ejectFAIL():
global BeltSpeed, BeltSteps, MovementSpeed, PositionBay1, PositionBay2, j, PositionBay3, i, state_code, PositionBay4, PositionCamera, dubblepart, num
TXT_SLD_M_O3_compressor.on()
logging.debug('FAIL')
TXT_SLD_M_O8_magnetic_valve.on()
for j in range(151):
state_code = 0
if isFAIL():
if j <= 30:
time.sleep((300 - 10 * j) / 1000)
state_code = 4
break
time.sleep(0.01)
TXT_SLD_M_O8_magnetic_valve.off()
TXT_SLD_M_O3_compressor.off()
def isWhite():
global BeltSpeed, BeltSteps, MovementSpeed, PositionBay1, PositionBay2, j, PositionBay3, i, state_code, PositionBay4, PositionCamera, dubblepart, num
return TXT_SLD_M_I8_photo_transistor.is_dark()
def isRed():
global BeltSpeed, BeltSteps, MovementSpeed, PositionBay1, PositionBay2, j, PositionBay3, i, state_code, PositionBay4, PositionCamera, dubblepart, num
return TXT_SLD_M_I7_photo_transistor.is_dark()
def isBlue():
global BeltSpeed, BeltSteps, MovementSpeed, PositionBay1, PositionBay2, j, PositionBay3, i, state_code, PositionBay4, PositionCamera, dubblepart, num
return TXT_SLD_M_I6_photo_transistor.is_dark()
def isFAIL():
global BeltSpeed, BeltSteps, MovementSpeed, PositionBay1, PositionBay2, j, PositionBay3, i, state_code, PositionBay4, PositionCamera, dubblepart, num
return TXT_SLD_M_I5_photo_transistor.is_dark()
def PartInGoodsReceipt():
global BeltSpeed, BeltSteps, MovementSpeed, PositionBay1, PositionBay2, j, PositionBay3, i, state_code, PositionBay4, PositionCamera, dubblepart, num
return TXT_SLD_M_I4_photo_transistor.is_dark()
def clean_exit():
global BeltSpeed, BeltSteps, MovementSpeed, PositionBay1, PositionBay2, j, PositionBay3, i, state_code, PositionBay4, PositionCamera, dubblepart, num
TXT_SLD_M_O4_led.set_brightness(int(0))
TXT_SLD_M_O3_compressor.off()
TXT_SLD_M_M1_encodermotor.stop_sync()
TXT_SLD_M_O5_magnetic_valve.off()
TXT_SLD_M_O6_magnetic_valve.off()
TXT_SLD_M_O7_magnetic_valve.off()
TXT_SLD_M_O8_magnetic_valve.off()
os._exit(os.EX_OK)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment