"""
Component that performs TensorFlow classification on images.

For a quick start, pick a pre-trained COCO model from:
https://github.com/tensorflow/models/blob/master/research/object_detection/g3doc/detection_model_zoo.md

For more details about this platform, please refer to the documentation at
https://home-assistant.io/components/image_processing.tensorflow/
"""
import logging
import sys
import os

import voluptuous as vol

from homeassistant.components.image_processing import (
    CONF_CONFIDENCE, CONF_ENTITY_ID, CONF_NAME, CONF_SOURCE, PLATFORM_SCHEMA,
    ImageProcessingEntity)
from homeassistant.core import split_entity_id
from homeassistant.helpers import template
import homeassistant.helpers.config_validation as cv

REQUIREMENTS = ['numpy==1.15.3', 'pillow==5.2.0', 'protobuf==3.6.1']

_LOGGER = logging.getLogger(__name__)

ATTR_MATCHES = 'matches'
ATTR_SUMMARY = 'summary'
ATTR_TOTAL_MATCHES = 'total_matches'

CONF_FILE_OUT = 'file_out'
CONF_MODEL = 'model'
CONF_GRAPH = 'graph'
CONF_LABELS = 'labels'
CONF_MODEL_DIR = 'model_dir'
CONF_CATEGORIES = 'categories'
CONF_CATEGORY = 'category'
CONF_AREA = 'area'
CONF_TOP = 'top'
CONF_LEFT = 'left'
CONF_BOTTOM = 'bottom'
CONF_RIGHT = 'right'

# Detection areas are expressed as relative coordinates (0..1) of the image.
AREA_SCHEMA = vol.Schema({
    vol.Optional(CONF_TOP, default=0): cv.small_float,
    vol.Optional(CONF_LEFT, default=0): cv.small_float,
    vol.Optional(CONF_BOTTOM, default=1): cv.small_float,
    vol.Optional(CONF_RIGHT, default=1): cv.small_float
})

CATEGORY_SCHEMA = vol.Schema({
    vol.Required(CONF_CATEGORY): cv.string,
    vol.Optional(CONF_AREA): AREA_SCHEMA
})

PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
    vol.Optional(CONF_FILE_OUT, default=[]):
        vol.All(cv.ensure_list, [cv.template]),
    vol.Required(CONF_MODEL): vol.Schema({
        vol.Required(CONF_GRAPH): cv.isfile,
        vol.Optional(CONF_LABELS): cv.isfile,
        vol.Optional(CONF_MODEL_DIR): cv.isdir,
        vol.Optional(CONF_AREA): AREA_SCHEMA,
        # A category is either a plain name or a dict with its own area.
        vol.Optional(CONF_CATEGORIES, default=[]):
            vol.All(cv.ensure_list, [vol.Any(
                cv.string,
                CATEGORY_SCHEMA
            )])
    })
})


def draw_box(draw, box, img_width,
             img_height, text='', color=(255, 255, 0)):
    """Draw bounding box on image.

    `box` is (ymin, xmin, ymax, xmax) in relative (0..1) coordinates;
    it is scaled to pixel coordinates before drawing.
    """
    ymin, xmin, ymax, xmax = box
    (left, right, top, bottom) = (xmin * img_width, xmax * img_width,
                                  ymin * img_height, ymax * img_height)
    draw.line([(left, top), (left, bottom), (right, bottom),
               (right, top), (left, top)], width=5, fill=color)
    if text:
        # Place the label just above the box (abs keeps it on-canvas
        # when the box touches the top edge).
        draw.text((left, abs(top - 15)), text, fill=color)


def setup_platform(hass, config, add_entities, discovery_info=None):
    """Set up the TensorFlow image processing platform."""
    model_config = config.get(CONF_MODEL)
    model_dir = model_config.get(CONF_MODEL_DIR) \
        or hass.config.path('tensorflow')
    labels = model_config.get(CONF_LABELS) \
        or hass.config.path('tensorflow', 'object_detection',
                            'data', 'mscoco_label_map.pbtxt')

    # Make sure locations exist
    if not os.path.isdir(model_dir) or not os.path.exists(labels):
        _LOGGER.error("Unable to locate tensorflow models or label map.")
        return

    # append custom model path to sys.path
    sys.path.append(model_dir)

    try:
        # Verify that the TensorFlow Object Detection API is pre-installed
        # pylint: disable=unused-import,unused-variable
        os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
        import tensorflow as tf  # noqa
        from object_detection.utils import label_map_util  # noqa
    except ImportError:
        # pylint: disable=line-too-long
        _LOGGER.error(
            "No TensorFlow Object Detection library found! Install or compile "
            "for your system following instructions here: "
            "https://github.com/tensorflow/models/blob/master/research/object_detection/g3doc/installation.md")  # noqa
        return

    try:
        # Display warning that PIL will be used if no OpenCV is found.
        # pylint: disable=unused-import,unused-variable
        import cv2  # noqa
    except ImportError:
        _LOGGER.warning("No OpenCV library found. "
                        "TensorFlow will process image with "
                        "PIL at reduced resolution.")

    # setup tensorflow graph, session, and label map to pass to processor
    # pylint: disable=no-member
    detection_graph = tf.Graph()
    with detection_graph.as_default():
        od_graph_def = tf.GraphDef()
        with tf.gfile.GFile(model_config.get(CONF_GRAPH), 'rb') as fid:
            serialized_graph = fid.read()
            od_graph_def.ParseFromString(serialized_graph)
            tf.import_graph_def(od_graph_def, name='')

    session = tf.Session(graph=detection_graph)
    label_map = label_map_util.load_labelmap(labels)
    categories = label_map_util.convert_label_map_to_categories(
        label_map, max_num_classes=90, use_display_name=True)
    category_index = label_map_util.create_category_index(categories)

    entities = []

    for camera in config[CONF_SOURCE]:
        entities.append(TensorFlowImageProcessor(
            hass, camera[CONF_ENTITY_ID], camera.get(CONF_NAME),
            session, detection_graph, category_index, config))

    add_entities(entities)


class TensorFlowImageProcessor(ImageProcessingEntity):
    """Representation of an TensorFlow image processor."""

    def __init__(self, hass, camera_entity, name, session, detection_graph,
                 category_index, config):
        """Initialize the TensorFlow entity."""
        model_config = config.get(CONF_MODEL)
        self.hass = hass
        self._camera_entity = camera_entity
        if name:
            self._name = name
        else:
            self._name = "TensorFlow {0}".format(
                split_entity_id(camera_entity)[1])
        self._session = session
        self._graph = detection_graph
        self._category_index = category_index
        self._min_confidence = config.get(CONF_CONFIDENCE)
        self._file_out = config.get(CONF_FILE_OUT)

        # handle categories and specific detection areas
        categories = model_config.get(CONF_CATEGORIES)
        self._include_categories = []
        self._category_areas = {}
        for category in categories:
            if isinstance(category, dict):
                category_name = category.get(CONF_CATEGORY)
                category_area = category.get(CONF_AREA)
                self._include_categories.append(category_name)
                self._category_areas[category_name] = [0, 0, 1, 1]
                if category_area:
                    self._category_areas[category_name] = [
                        category_area.get(CONF_TOP),
                        category_area.get(CONF_LEFT),
                        category_area.get(CONF_BOTTOM),
                        category_area.get(CONF_RIGHT)
                    ]
            else:
                self._include_categories.append(category)
                self._category_areas[category] = [0, 0, 1, 1]

        # Handle global detection area
        self._area = [0, 0, 1, 1]
        area_config = model_config.get(CONF_AREA)
        if area_config:
            self._area = [
                area_config.get(CONF_TOP),
                area_config.get(CONF_LEFT),
                area_config.get(CONF_BOTTOM),
                area_config.get(CONF_RIGHT)
            ]

        template.attach(hass, self._file_out)

        self._matches = {}
        self._total_matches = 0
        self._last_image = None

    @property
    def camera_entity(self):
        """Return camera entity id from process pictures."""
        return self._camera_entity

    @property
    def name(self):
        """Return the name of the image processor."""
        return self._name

    @property
    def state(self):
        """Return the state of the entity."""
        return self._total_matches

    @property
    def device_state_attributes(self):
        """Return device specific state attributes."""
        return {
            ATTR_MATCHES: self._matches,
            ATTR_SUMMARY: {category: len(values)
                           for category, values in self._matches.items()},
            ATTR_TOTAL_MATCHES: self._total_matches
        }

    def _save_image(self, image, matches, paths):
        """Draw detection areas and matched boxes on image and save to paths."""
        from PIL import Image, ImageDraw
        import io
        img = Image.open(io.BytesIO(bytearray(image))).convert('RGB')
        img_width, img_height = img.size
        draw = ImageDraw.Draw(img)

        # Draw custom global region/area
        if self._area != [0, 0, 1, 1]:
            draw_box(draw, self._area,
                     img_width, img_height,
                     "Detection Area", (0, 255, 255))

        for category, values in matches.items():
            # Draw custom category regions/areas
            if (category in self._category_areas
                    and self._category_areas[category] != [0, 0, 1, 1]):
                label = "{} Detection Area".format(category.capitalize())
                draw_box(draw, self._category_areas[category], img_width,
                         img_height, label, (0, 255, 0))

            # Draw detected objects
            for instance in values:
                label = "{0} {1:.1f}%".format(category, instance['score'])
                draw_box(draw, instance['box'],
                         img_width, img_height,
                         label, (255, 255, 0))

        for path in paths:
            _LOGGER.info("Saving results image to %s", path)
            img.save(path)

    def process_image(self, image):
        """Process the image."""
        import numpy as np
        try:
            import cv2  # pylint: disable=import-error
            img = cv2.imdecode(
                np.asarray(bytearray(image)), cv2.IMREAD_UNCHANGED)
            inp = img[:, :, [2, 1, 0]]  # BGR->RGB
            inp_expanded = inp.reshape(1, inp.shape[0], inp.shape[1], 3)
        except ImportError:
            from PIL import Image
            import io
            img = Image.open(io.BytesIO(bytearray(image))).convert('RGB')
            img.thumbnail((460, 460), Image.ANTIALIAS)
            img_width, img_height = img.size
            inp = np.array(img.getdata()).reshape(
                (img_height, img_width, 3)).astype(np.uint8)
            inp_expanded = np.expand_dims(inp, axis=0)

        image_tensor = self._graph.get_tensor_by_name('image_tensor:0')
        boxes = self._graph.get_tensor_by_name('detection_boxes:0')
        scores = self._graph.get_tensor_by_name('detection_scores:0')
        classes = self._graph.get_tensor_by_name('detection_classes:0')
        boxes, scores, classes = self._session.run(
            [boxes, scores, classes],
            feed_dict={image_tensor: inp_expanded})
        boxes, scores, classes = map(np.squeeze, [boxes, scores, classes])
        classes = classes.astype(int)

        matches = {}
        total_matches = 0
        for box, score, obj_class in zip(boxes, scores, classes):
            score = score * 100
            boxes = box.tolist()

            # Exclude matches below min confidence value
            if score < self._min_confidence:
                continue

            # Exclude matches outside global area definition
            if (boxes[0] < self._area[0] or boxes[1] < self._area[1]
                    or boxes[2] > self._area[2] or boxes[3] > self._area[3]):
                continue

            category = self._category_index[obj_class]['name']

            # Exclude unlisted categories
            if (self._include_categories
                    and category not in self._include_categories):
                continue

            # Exclude matches outside category specific area definition
            if (self._category_areas
                    and (boxes[0] < self._category_areas[category][0]
                         or boxes[1] < self._category_areas[category][1]
                         or boxes[2] > self._category_areas[category][2]
                         or boxes[3] > self._category_areas[category][3])):
                continue

            # If we got here, we should include it
            if category not in matches.keys():
                matches[category] = []
            matches[category].append({
                'score': float(score),
                'box': boxes
            })
            total_matches += 1

        # Save Images
        if total_matches and self._file_out:
            paths = []
            for path_template in self._file_out:
                if isinstance(path_template, template.Template):
                    paths.append(path_template.render(
                        camera_entity=self._camera_entity))
                else:
                    paths.append(path_template)
            self._save_image(image, matches, paths)

        self._matches = matches
        self._total_matches = total_matches