Initial commit. Working state as of Oct 27
parent d498f81357 · commit a191c3b831
.gitignore (vendored, new file)
@@ -0,0 +1,14 @@
.python_history
.sqlite_history
.bash_history
.bash_logout
.bash_profile
.bashrc
.local
.cache
.config
static
__pycache__
fullchain.pem
privkey.pem
api.log
face_comparison.py (new file)
@@ -0,0 +1,8 @@
import sys
import face_recognition
target = face_recognition.load_image_file("target.jpg" if len(sys.argv) <= 1 else sys.argv[1])
candidate = face_recognition.load_image_file("candidate.jpg" if len(sys.argv) <= 2 else sys.argv[2])
target_enc = face_recognition.face_encodings(target)[0]
candidate_enc = face_recognition.face_encodings(candidate)[0]
results = face_recognition.compare_faces([candidate_enc], target_enc)
print(results[0])
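Note that face_encodings() returns an empty list when no face is detected, so the [0] indexing above raises an IndexError on images without a detectable face. A minimal defensive variant of the same comparison (a sketch, not part of the commit; the file names are just the script's defaults):

import sys

import face_recognition


def encode_first_face(path):
    # Return the first face encoding in the image at `path`, or None if no face is found.
    encodings = face_recognition.face_encodings(face_recognition.load_image_file(path))
    return encodings[0] if encodings else None


target_enc = encode_first_face(sys.argv[1] if len(sys.argv) > 1 else "target.jpg")
candidate_enc = encode_first_face(sys.argv[2] if len(sys.argv) > 2 else "candidate.jpg")

if target_enc is None or candidate_enc is None:
    sys.exit("No face found in one of the images")

print(face_recognition.compare_faces([candidate_enc], target_enc)[0])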
face_recognition/__init__.py (new file)
@@ -0,0 +1,7 @@
# -*- coding: utf-8 -*-

__author__ = """Adam Geitgey"""
__email__ = 'ageitgey@gmail.com'
__version__ = '1.2.3'

from .api import load_image_file, face_locations, batch_face_locations, face_landmarks, face_encodings, compare_faces, face_distance
face_recognition/api.py (new file)
@@ -0,0 +1,228 @@
# -*- coding: utf-8 -*-

import PIL.Image
import dlib
import numpy as np
from PIL import ImageFile

try:
    import face_recognition_models
except Exception:
    print("Please install `face_recognition_models` with this command before using `face_recognition`:\n")
    print("pip install git+https://github.com/ageitgey/face_recognition_models")
    quit()

ImageFile.LOAD_TRUNCATED_IMAGES = True

face_detector = dlib.get_frontal_face_detector()

predictor_68_point_model = face_recognition_models.pose_predictor_model_location()
pose_predictor_68_point = dlib.shape_predictor(predictor_68_point_model)

predictor_5_point_model = face_recognition_models.pose_predictor_five_point_model_location()
pose_predictor_5_point = dlib.shape_predictor(predictor_5_point_model)

cnn_face_detection_model = face_recognition_models.cnn_face_detector_model_location()
cnn_face_detector = dlib.cnn_face_detection_model_v1(cnn_face_detection_model)

face_recognition_model = face_recognition_models.face_recognition_model_location()
face_encoder = dlib.face_recognition_model_v1(face_recognition_model)


def _rect_to_css(rect):
    """
    Convert a dlib 'rect' object to a plain tuple in (top, right, bottom, left) order

    :param rect: a dlib 'rect' object
    :return: a plain tuple representation of the rect in (top, right, bottom, left) order
    """
    return rect.top(), rect.right(), rect.bottom(), rect.left()


def _css_to_rect(css):
    """
    Convert a tuple in (top, right, bottom, left) order to a dlib `rect` object

    :param css: plain tuple representation of the rect in (top, right, bottom, left) order
    :return: a dlib `rect` object
    """
    return dlib.rectangle(css[3], css[0], css[1], css[2])


def _trim_css_to_bounds(css, image_shape):
    """
    Make sure a tuple in (top, right, bottom, left) order is within the bounds of the image.

    :param css: plain tuple representation of the rect in (top, right, bottom, left) order
    :param image_shape: numpy shape of the image array
    :return: a trimmed plain tuple representation of the rect in (top, right, bottom, left) order
    """
    return max(css[0], 0), min(css[1], image_shape[1]), min(css[2], image_shape[0]), max(css[3], 0)


def face_distance(face_encodings, face_to_compare):
    """
    Given a list of face encodings, compare them to a known face encoding and get a score for each
    comparison face. Unlike the upstream implementation (kept in api0.py), this modified version
    returns a normalized similarity score (one minus the euclidean distance scaled by the smaller
    of the two encoding norms), so higher values mean more similar faces.

    :param face_encodings: List of face encodings to compare
    :param face_to_compare: A face encoding to compare against
    :return: A numpy ndarray with the score for each face in the same order as the 'face_encodings' array
    """
    if len(face_encodings) == 0:
        return np.empty((0))
    face_to_compare_norm = np.linalg.norm(face_to_compare)
    # return [1-np.arccos(sum([face_encoding[k]*face_to_compare[k] for k in range(len(face_encoding))])/(face_to_compare_norm*np.linalg.norm(face_encoding)))/np.pi for face_encoding in face_encodings]
    # return np.linalg.norm(face_encodings - face_to_compare, axis=1)/(np.linalg.norm(face_encodings)+face_to_compare_norm)
    return 1 - np.linalg.norm(face_encodings - face_to_compare, axis=1) / min(np.linalg.norm(face_encodings), face_to_compare_norm)
    # return 1/(1+np.linalg.norm(face_encodings - face_to_compare, axis=1))


def load_image_file(file, mode='RGB'):
    """
    Loads an image file (.jpg, .png, etc) into a numpy array

    :param file: image file name or file object to load
    :param mode: format to convert the image to. Only 'RGB' (8-bit RGB, 3 channels) and 'L' (black and white) are supported.
    :return: image contents as numpy array
    """
    im = PIL.Image.open(file)
    if mode:
        im = im.convert(mode)
    return np.array(im)


def _raw_face_locations(img, number_of_times_to_upsample=1, model="hog"):
    """
    Returns an array of bounding boxes of human faces in an image

    :param img: An image (as a numpy array)
    :param number_of_times_to_upsample: How many times to upsample the image looking for faces. Higher numbers find smaller faces.
    :param model: Which face detection model to use. "hog" is less accurate but faster on CPUs. "cnn" is a more accurate
                  deep-learning model which is GPU/CUDA accelerated (if available). The default is "hog".
    :return: A list of dlib 'rect' objects of found face locations
    """
    if model == "cnn":
        return cnn_face_detector(img, number_of_times_to_upsample)
    else:
        return face_detector(img, number_of_times_to_upsample)


def face_locations(img, number_of_times_to_upsample=1, model="hog"):
    """
    Returns an array of bounding boxes of human faces in an image

    :param img: An image (as a numpy array)
    :param number_of_times_to_upsample: How many times to upsample the image looking for faces. Higher numbers find smaller faces.
    :param model: Which face detection model to use. "hog" is less accurate but faster on CPUs. "cnn" is a more accurate
                  deep-learning model which is GPU/CUDA accelerated (if available). The default is "hog".
    :return: A list of tuples of found face locations in css (top, right, bottom, left) order
    """
    if model == "cnn":
        return [_trim_css_to_bounds(_rect_to_css(face.rect), img.shape) for face in _raw_face_locations(img, number_of_times_to_upsample, "cnn")]
    else:
        return [_trim_css_to_bounds(_rect_to_css(face), img.shape) for face in _raw_face_locations(img, number_of_times_to_upsample, model)]


def _raw_face_locations_batched(images, number_of_times_to_upsample=1, batch_size=128):
    """
    Returns a 2d array of dlib rects of human faces in an image using the cnn face detector

    :param images: A list of images (each as a numpy array)
    :param number_of_times_to_upsample: How many times to upsample the image looking for faces. Higher numbers find smaller faces.
    :return: A list of dlib 'rect' objects of found face locations
    """
    return cnn_face_detector(images, number_of_times_to_upsample, batch_size=batch_size)


def batch_face_locations(images, number_of_times_to_upsample=1, batch_size=128):
    """
    Returns a 2d array of bounding boxes of human faces in an image using the cnn face detector
    If you are using a GPU, this can give you much faster results since the GPU
    can process batches of images at once. If you aren't using a GPU, you don't need this function.

    :param images: A list of images (each as a numpy array)
    :param number_of_times_to_upsample: How many times to upsample the image looking for faces. Higher numbers find smaller faces.
    :param batch_size: How many images to include in each GPU processing batch.
    :return: A list of tuples of found face locations in css (top, right, bottom, left) order
    """
    def convert_cnn_detections_to_css(detections):
        return [_trim_css_to_bounds(_rect_to_css(face.rect), images[0].shape) for face in detections]

    raw_detections_batched = _raw_face_locations_batched(images, number_of_times_to_upsample, batch_size)

    return list(map(convert_cnn_detections_to_css, raw_detections_batched))


def _raw_face_landmarks(face_image, face_locations=None, model="large"):
    if face_locations is None:
        face_locations = _raw_face_locations(face_image)
    else:
        face_locations = [_css_to_rect(face_location) for face_location in face_locations]

    pose_predictor = pose_predictor_68_point

    if model == "small":
        pose_predictor = pose_predictor_5_point

    return [pose_predictor(face_image, face_location) for face_location in face_locations]


def face_landmarks(face_image, face_locations=None, model="large"):
    """
    Given an image, returns a dict of face feature locations (eyes, nose, etc) for each face in the image

    :param face_image: image to search
    :param face_locations: Optionally provide a list of face locations to check.
    :param model: Optional - which model to use. "large" (default) or "small" which only returns 5 points but is faster.
    :return: A list of dicts of face feature locations (eyes, nose, etc)
    """
    landmarks = _raw_face_landmarks(face_image, face_locations, model)
    landmarks_as_tuples = [[(p.x, p.y) for p in landmark.parts()] for landmark in landmarks]

    # For a definition of each point index, see https://cdn-images-1.medium.com/max/1600/1*AbEg31EgkbXSQehuNJBlWg.png
    if model == 'large':
        return [{
            "chin": points[0:17],
            "left_eyebrow": points[17:22],
            "right_eyebrow": points[22:27],
            "nose_bridge": points[27:31],
            "nose_tip": points[31:36],
            "left_eye": points[36:42],
            "right_eye": points[42:48],
            "top_lip": points[48:55] + [points[64]] + [points[63]] + [points[62]] + [points[61]] + [points[60]],
            "bottom_lip": points[54:60] + [points[48]] + [points[60]] + [points[67]] + [points[66]] + [points[65]] + [points[64]]
        } for points in landmarks_as_tuples]
    elif model == 'small':
        return [{
            "nose_tip": [points[4]],
            "left_eye": points[2:4],
            "right_eye": points[0:2],
        } for points in landmarks_as_tuples]
    else:
        raise ValueError("Invalid landmarks model type. Supported models are ['small', 'large'].")


def face_encodings(face_image, known_face_locations=None, num_jitters=10):
    """
    Given an image, return the 128-dimension face encoding for each face in the image.

    :param face_image: The image that contains one or more faces
    :param known_face_locations: Optional - the bounding boxes of each face if you already know them.
    :param num_jitters: How many times to re-sample the face when calculating encoding. Higher is more accurate, but slower (i.e. 100 is 100x slower)
    :return: A list of 128-dimensional face encodings (one for each face in the image)
    """
    raw_landmarks = _raw_face_landmarks(face_image, known_face_locations, model="small")
    return [np.array(face_encoder.compute_face_descriptor(face_image, raw_landmark_set, num_jitters)) for raw_landmark_set in raw_landmarks]


def compare_faces(known_face_encodings, face_encoding_to_check, tolerance=0.6):
    """
    Compare a list of face encodings against a candidate encoding to see if they match.

    :param known_face_encodings: A list of known face encodings
    :param face_encoding_to_check: A single face encoding to compare against the list
    :param tolerance: Kept for API compatibility; this modified version does not threshold and ignores it.
    :return: A list of similarity scores from face_distance(), one per known encoding (higher means more similar)
    """
    return list(face_distance(known_face_encodings, face_encoding_to_check))
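This copy of api.py deliberately departs from upstream face_recognition: face_distance() returns a normalized similarity score rather than a raw euclidean distance, and compare_faces() passes those scores through instead of thresholding them against `tolerance`. A short usage sketch against this modified API (illustrative only; the image paths are placeholders):

import face_recognition

known = face_recognition.load_image_file("known.jpg")        # placeholder path
unknown = face_recognition.load_image_file("unknown.jpg")    # placeholder path

known_encs = face_recognition.face_encodings(known)
unknown_encs = face_recognition.face_encodings(unknown)

if known_encs and unknown_encs:
    # With this modified api.py, compare_faces returns similarity-like scores,
    # so the caller applies its own threshold instead of receiving booleans.
    scores = face_recognition.compare_faces(known_encs, unknown_encs[0])
    print(max(scores))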
face_recognition/api0.py (new file)
@@ -0,0 +1,225 @@
# -*- coding: utf-8 -*-

import PIL.Image
import dlib
import numpy as np
from PIL import ImageFile

try:
    import face_recognition_models
except Exception:
    print("Please install `face_recognition_models` with this command before using `face_recognition`:\n")
    print("pip install git+https://github.com/ageitgey/face_recognition_models")
    quit()

ImageFile.LOAD_TRUNCATED_IMAGES = True

face_detector = dlib.get_frontal_face_detector()

predictor_68_point_model = face_recognition_models.pose_predictor_model_location()
pose_predictor_68_point = dlib.shape_predictor(predictor_68_point_model)

predictor_5_point_model = face_recognition_models.pose_predictor_five_point_model_location()
pose_predictor_5_point = dlib.shape_predictor(predictor_5_point_model)

cnn_face_detection_model = face_recognition_models.cnn_face_detector_model_location()
cnn_face_detector = dlib.cnn_face_detection_model_v1(cnn_face_detection_model)

face_recognition_model = face_recognition_models.face_recognition_model_location()
face_encoder = dlib.face_recognition_model_v1(face_recognition_model)


def _rect_to_css(rect):
    """
    Convert a dlib 'rect' object to a plain tuple in (top, right, bottom, left) order

    :param rect: a dlib 'rect' object
    :return: a plain tuple representation of the rect in (top, right, bottom, left) order
    """
    return rect.top(), rect.right(), rect.bottom(), rect.left()


def _css_to_rect(css):
    """
    Convert a tuple in (top, right, bottom, left) order to a dlib `rect` object

    :param css: plain tuple representation of the rect in (top, right, bottom, left) order
    :return: a dlib `rect` object
    """
    return dlib.rectangle(css[3], css[0], css[1], css[2])


def _trim_css_to_bounds(css, image_shape):
    """
    Make sure a tuple in (top, right, bottom, left) order is within the bounds of the image.

    :param css: plain tuple representation of the rect in (top, right, bottom, left) order
    :param image_shape: numpy shape of the image array
    :return: a trimmed plain tuple representation of the rect in (top, right, bottom, left) order
    """
    return max(css[0], 0), min(css[1], image_shape[1]), min(css[2], image_shape[0]), max(css[3], 0)


def face_distance(face_encodings, face_to_compare):
    """
    Given a list of face encodings, compare them to a known face encoding and get a euclidean distance
    for each comparison face. The distance tells you how similar the faces are.

    :param face_encodings: List of face encodings to compare
    :param face_to_compare: A face encoding to compare against
    :return: A numpy ndarray with the distance for each face in the same order as the 'face_encodings' array
    """
    if len(face_encodings) == 0:
        return np.empty((0))

    return np.linalg.norm(face_encodings - face_to_compare, axis=1)


def load_image_file(file, mode='RGB'):
    """
    Loads an image file (.jpg, .png, etc) into a numpy array

    :param file: image file name or file object to load
    :param mode: format to convert the image to. Only 'RGB' (8-bit RGB, 3 channels) and 'L' (black and white) are supported.
    :return: image contents as numpy array
    """
    im = PIL.Image.open(file)
    if mode:
        im = im.convert(mode)
    return np.array(im)


def _raw_face_locations(img, number_of_times_to_upsample=1, model="hog"):
    """
    Returns an array of bounding boxes of human faces in an image

    :param img: An image (as a numpy array)
    :param number_of_times_to_upsample: How many times to upsample the image looking for faces. Higher numbers find smaller faces.
    :param model: Which face detection model to use. "hog" is less accurate but faster on CPUs. "cnn" is a more accurate
                  deep-learning model which is GPU/CUDA accelerated (if available). The default is "hog".
    :return: A list of dlib 'rect' objects of found face locations
    """
    if model == "cnn":
        return cnn_face_detector(img, number_of_times_to_upsample)
    else:
        return face_detector(img, number_of_times_to_upsample)


def face_locations(img, number_of_times_to_upsample=1, model="hog"):
    """
    Returns an array of bounding boxes of human faces in an image

    :param img: An image (as a numpy array)
    :param number_of_times_to_upsample: How many times to upsample the image looking for faces. Higher numbers find smaller faces.
    :param model: Which face detection model to use. "hog" is less accurate but faster on CPUs. "cnn" is a more accurate
                  deep-learning model which is GPU/CUDA accelerated (if available). The default is "hog".
    :return: A list of tuples of found face locations in css (top, right, bottom, left) order
    """
    if model == "cnn":
        return [_trim_css_to_bounds(_rect_to_css(face.rect), img.shape) for face in _raw_face_locations(img, number_of_times_to_upsample, "cnn")]
    else:
        return [_trim_css_to_bounds(_rect_to_css(face), img.shape) for face in _raw_face_locations(img, number_of_times_to_upsample, model)]


def _raw_face_locations_batched(images, number_of_times_to_upsample=1, batch_size=128):
    """
    Returns a 2d array of dlib rects of human faces in an image using the cnn face detector

    :param images: A list of images (each as a numpy array)
    :param number_of_times_to_upsample: How many times to upsample the image looking for faces. Higher numbers find smaller faces.
    :return: A list of dlib 'rect' objects of found face locations
    """
    return cnn_face_detector(images, number_of_times_to_upsample, batch_size=batch_size)


def batch_face_locations(images, number_of_times_to_upsample=1, batch_size=128):
    """
    Returns a 2d array of bounding boxes of human faces in an image using the cnn face detector
    If you are using a GPU, this can give you much faster results since the GPU
    can process batches of images at once. If you aren't using a GPU, you don't need this function.

    :param images: A list of images (each as a numpy array)
    :param number_of_times_to_upsample: How many times to upsample the image looking for faces. Higher numbers find smaller faces.
    :param batch_size: How many images to include in each GPU processing batch.
    :return: A list of tuples of found face locations in css (top, right, bottom, left) order
    """
    def convert_cnn_detections_to_css(detections):
        return [_trim_css_to_bounds(_rect_to_css(face.rect), images[0].shape) for face in detections]

    raw_detections_batched = _raw_face_locations_batched(images, number_of_times_to_upsample, batch_size)

    return list(map(convert_cnn_detections_to_css, raw_detections_batched))


def _raw_face_landmarks(face_image, face_locations=None, model="large"):
    if face_locations is None:
        face_locations = _raw_face_locations(face_image)
    else:
        face_locations = [_css_to_rect(face_location) for face_location in face_locations]

    pose_predictor = pose_predictor_68_point

    if model == "small":
        pose_predictor = pose_predictor_5_point

    return [pose_predictor(face_image, face_location) for face_location in face_locations]


def face_landmarks(face_image, face_locations=None, model="large"):
    """
    Given an image, returns a dict of face feature locations (eyes, nose, etc) for each face in the image

    :param face_image: image to search
    :param face_locations: Optionally provide a list of face locations to check.
    :param model: Optional - which model to use. "large" (default) or "small" which only returns 5 points but is faster.
    :return: A list of dicts of face feature locations (eyes, nose, etc)
    """
    landmarks = _raw_face_landmarks(face_image, face_locations, model)
    landmarks_as_tuples = [[(p.x, p.y) for p in landmark.parts()] for landmark in landmarks]

    # For a definition of each point index, see https://cdn-images-1.medium.com/max/1600/1*AbEg31EgkbXSQehuNJBlWg.png
    if model == 'large':
        return [{
            "chin": points[0:17],
            "left_eyebrow": points[17:22],
            "right_eyebrow": points[22:27],
            "nose_bridge": points[27:31],
            "nose_tip": points[31:36],
            "left_eye": points[36:42],
            "right_eye": points[42:48],
            "top_lip": points[48:55] + [points[64]] + [points[63]] + [points[62]] + [points[61]] + [points[60]],
            "bottom_lip": points[54:60] + [points[48]] + [points[60]] + [points[67]] + [points[66]] + [points[65]] + [points[64]]
        } for points in landmarks_as_tuples]
    elif model == 'small':
        return [{
            "nose_tip": [points[4]],
            "left_eye": points[2:4],
            "right_eye": points[0:2],
        } for points in landmarks_as_tuples]
    else:
        raise ValueError("Invalid landmarks model type. Supported models are ['small', 'large'].")


def face_encodings(face_image, known_face_locations=None, num_jitters=1):
    """
    Given an image, return the 128-dimension face encoding for each face in the image.

    :param face_image: The image that contains one or more faces
    :param known_face_locations: Optional - the bounding boxes of each face if you already know them.
    :param num_jitters: How many times to re-sample the face when calculating encoding. Higher is more accurate, but slower (i.e. 100 is 100x slower)
    :return: A list of 128-dimensional face encodings (one for each face in the image)
    """
    raw_landmarks = _raw_face_landmarks(face_image, known_face_locations, model="small")
    return [np.array(face_encoder.compute_face_descriptor(face_image, raw_landmark_set, num_jitters)) for raw_landmark_set in raw_landmarks]


def compare_faces(known_face_encodings, face_encoding_to_check, tolerance=0.6):
    """
    Compare a list of face encodings against a candidate encoding to see if they match.

    :param known_face_encodings: A list of known face encodings
    :param face_encoding_to_check: A single face encoding to compare against the list
    :param tolerance: How much distance between faces to consider it a match. Lower is more strict. 0.6 is typical best performance.
    :return: A list of True/False values indicating which known_face_encodings match the face encoding to check
    """
    return list(face_distance(known_face_encodings, face_encoding_to_check) <= tolerance)
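api0.py keeps the upstream definitions (face_distance() is a plain euclidean distance and compare_faces() thresholds it against `tolerance`), so it reads as the reference that the modified api.py was derived from. A tiny synthetic sketch of the two formulas side by side (random vectors standing in for real encodings, purely illustrative):

import numpy as np

rng = np.random.default_rng(0)
encodings = rng.normal(size=(3, 128))   # stand-ins for known face encodings
probe = rng.normal(size=128)            # stand-in for the encoding being checked

# api0.py behaviour: euclidean distance per row, then a boolean via `<= tolerance`.
distances = np.linalg.norm(encodings - probe, axis=1)
matches = distances <= 0.6

# api.py behaviour: the same distance rescaled into a similarity-like score.
similarity = 1 - np.linalg.norm(encodings - probe, axis=1) / min(np.linalg.norm(encodings), np.linalg.norm(probe))

print(distances, matches, similarity)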
face_recognition/face_detection_cli.py (new file)
@@ -0,0 +1,70 @@
# -*- coding: utf-8 -*-
from __future__ import print_function
import click
import os
import re
import face_recognition.api as face_recognition
import multiprocessing
import sys
import itertools


def print_result(filename, location):
    top, right, bottom, left = location
    print("{},{},{},{},{}".format(filename, top, right, bottom, left))


def test_image(image_to_check, model):
    unknown_image = face_recognition.load_image_file(image_to_check)
    face_locations = face_recognition.face_locations(unknown_image, number_of_times_to_upsample=0, model=model)

    for face_location in face_locations:
        print_result(image_to_check, face_location)


def image_files_in_folder(folder):
    return [os.path.join(folder, f) for f in os.listdir(folder) if re.match(r'.*\.(jpg|jpeg|png)', f, flags=re.I)]


def process_images_in_process_pool(images_to_check, number_of_cpus, model):
    if number_of_cpus == -1:
        processes = None
    else:
        processes = number_of_cpus

    # macOS will crash due to a bug in libdispatch if you don't use 'forkserver'
    context = multiprocessing
    if "forkserver" in multiprocessing.get_all_start_methods():
        context = multiprocessing.get_context("forkserver")

    pool = context.Pool(processes=processes)

    function_parameters = zip(
        images_to_check,
        itertools.repeat(model),
    )

    pool.starmap(test_image, function_parameters)


@click.command()
@click.argument('image_to_check')
@click.option('--cpus', default=1, help='number of CPU cores to use in parallel. -1 means "use all in system"')
@click.option('--model', default="hog", help='Which face detection model to use. Options are "hog" or "cnn".')
def main(image_to_check, cpus, model):
    # Multi-core processing only supported on Python 3.4 or greater
    if (sys.version_info < (3, 4)) and cpus != 1:
        click.echo("WARNING: Multi-processing support requires Python 3.4 or greater. Falling back to single-threaded processing!")
        cpus = 1

    if os.path.isdir(image_to_check):
        if cpus == 1:
            [test_image(image_file, model) for image_file in image_files_in_folder(image_to_check)]
        else:
            process_images_in_process_pool(image_files_in_folder(image_to_check), cpus, model)
    else:
        test_image(image_to_check, model)


if __name__ == "__main__":
    main()
face_recognition/face_recognition_cli.py (new file)
@@ -0,0 +1,119 @@
# -*- coding: utf-8 -*-
from __future__ import print_function
import click
import os
import re
import face_recognition.api as face_recognition
import multiprocessing
import itertools
import sys
import PIL.Image
import numpy as np


def scan_known_people(known_people_folder):
    known_names = []
    known_face_encodings = []

    for file in image_files_in_folder(known_people_folder):
        basename = os.path.splitext(os.path.basename(file))[0]
        img = face_recognition.load_image_file(file)
        encodings = face_recognition.face_encodings(img)

        if len(encodings) > 1:
            click.echo("WARNING: More than one face found in {}. Only considering the first face.".format(file))

        if len(encodings) == 0:
            click.echo("WARNING: No faces found in {}. Ignoring file.".format(file))
        else:
            known_names.append(basename)
            known_face_encodings.append(encodings[0])

    return known_names, known_face_encodings


def print_result(filename, name, distance, show_distance=False):
    if show_distance:
        print("{},{},{}".format(filename, name, distance))
    else:
        print("{},{}".format(filename, name))


def test_image(image_to_check, known_names, known_face_encodings, tolerance=0.6, show_distance=False):
    unknown_image = face_recognition.load_image_file(image_to_check)

    # Scale down image if it's giant so things run a little faster
    if max(unknown_image.shape) > 1600:
        pil_img = PIL.Image.fromarray(unknown_image)
        pil_img.thumbnail((1600, 1600), PIL.Image.LANCZOS)
        unknown_image = np.array(pil_img)

    unknown_encodings = face_recognition.face_encodings(unknown_image)

    for unknown_encoding in unknown_encodings:
        distances = face_recognition.face_distance(known_face_encodings, unknown_encoding)
        result = list(distances <= tolerance)

        if True in result:
            [print_result(image_to_check, name, distance, show_distance) for is_match, name, distance in zip(result, known_names, distances) if is_match]
        else:
            print_result(image_to_check, "unknown_person", None, show_distance)

    if not unknown_encodings:
        # print out fact that no faces were found in image
        print_result(image_to_check, "no_persons_found", None, show_distance)


def image_files_in_folder(folder):
    return [os.path.join(folder, f) for f in os.listdir(folder) if re.match(r'.*\.(jpg|jpeg|png)', f, flags=re.I)]


def process_images_in_process_pool(images_to_check, known_names, known_face_encodings, number_of_cpus, tolerance, show_distance):
    if number_of_cpus == -1:
        processes = None
    else:
        processes = number_of_cpus

    # macOS will crash due to a bug in libdispatch if you don't use 'forkserver'
    context = multiprocessing
    if "forkserver" in multiprocessing.get_all_start_methods():
        context = multiprocessing.get_context("forkserver")

    pool = context.Pool(processes=processes)

    function_parameters = zip(
        images_to_check,
        itertools.repeat(known_names),
        itertools.repeat(known_face_encodings),
        itertools.repeat(tolerance),
        itertools.repeat(show_distance)
    )

    pool.starmap(test_image, function_parameters)


@click.command()
@click.argument('known_people_folder')
@click.argument('image_to_check')
@click.option('--cpus', default=1, help='number of CPU cores to use in parallel (can speed up processing lots of images). -1 means "use all in system"')
@click.option('--tolerance', default=0.6, help='Tolerance for face comparisons. Default is 0.6. Lower this if you get multiple matches for the same person.')
@click.option('--show-distance', default=False, type=bool, help='Output face distance. Useful for tweaking tolerance setting.')
def main(known_people_folder, image_to_check, cpus, tolerance, show_distance):
    known_names, known_face_encodings = scan_known_people(known_people_folder)

    # Multi-core processing only supported on Python 3.4 or greater
    if (sys.version_info < (3, 4)) and cpus != 1:
        click.echo("WARNING: Multi-processing support requires Python 3.4 or greater. Falling back to single-threaded processing!")
        cpus = 1

    if os.path.isdir(image_to_check):
        if cpus == 1:
            [test_image(image_file, known_names, known_face_encodings, tolerance, show_distance) for image_file in image_files_in_folder(image_to_check)]
        else:
            process_images_in_process_pool(image_files_in_folder(image_to_check), known_names, known_face_encodings, cpus, tolerance, show_distance)
    else:
        test_image(image_to_check, known_names, known_face_encodings, tolerance, show_distance)


if __name__ == "__main__":
    main()
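The same helpers can also be driven without click, which is convenient for quick experiments. A sketch assuming a ./known folder of reference photos and a single image to check (both paths are placeholders); note that because this repo's api.face_distance() returns similarity scores rather than distances, the `distances <= tolerance` check inside test_image() behaves differently from upstream:

import face_recognition.face_recognition_cli as cli

known_names, known_encodings = cli.scan_known_people("./known")   # placeholder folder
cli.test_image("./unknown.jpg", known_names, known_encodings,     # placeholder image
               tolerance=0.6, show_distance=True)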
face_recognition_models/__init__.py (new file)
@@ -0,0 +1,20 @@
# -*- coding: utf-8 -*-

__author__ = """Adam Geitgey"""
__email__ = 'ageitgey@gmail.com'
__version__ = '0.1.0'

from pkg_resources import resource_filename

def pose_predictor_model_location():
    return resource_filename(__name__, "models/shape_predictor_68_face_landmarks.dat")

def pose_predictor_five_point_model_location():
    return resource_filename(__name__, "models/shape_predictor_5_face_landmarks.dat")

def face_recognition_model_location():
    return resource_filename(__name__, "models/dlib_face_recognition_resnet_model_v1.dat")

def cnn_face_detector_model_location():
    return resource_filename(__name__, "models/mmod_human_face_detector.dat")
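pkg_resources works here but is deprecated in newer setuptools releases. If this shim is ever updated, the same lookups can be written with the standard library, roughly as below (a sketch, assuming the models/ directory ships inside the package and Python 3.9+):

from importlib.resources import files

def pose_predictor_model_location():
    # Resolve the packaged data file to a filesystem path for dlib to load.
    return str(files(__name__) / "models" / "shape_predictor_68_face_landmarks.dat")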
BIN  face_recognition_models/models/mmod_human_face_detector.dat  (new file)
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
fuzzywuzzy/StringMatcher.py (new file)
@@ -0,0 +1,80 @@
#!/usr/bin/env python
# encoding: utf-8
"""
StringMatcher.py

ported from python-Levenshtein
[https://github.com/miohtama/python-Levenshtein]
License available here: https://github.com/miohtama/python-Levenshtein/blob/master/COPYING
"""

from Levenshtein import *
from warnings import warn


class StringMatcher:
    """A SequenceMatcher-like class built on the top of Levenshtein"""

    def _reset_cache(self):
        self._ratio = self._distance = None
        self._opcodes = self._editops = self._matching_blocks = None

    def __init__(self, isjunk=None, seq1='', seq2=''):
        if isjunk:
            warn("isjunk is NOT implemented, it will be ignored")
        self._str1, self._str2 = seq1, seq2
        self._reset_cache()

    def set_seqs(self, seq1, seq2):
        self._str1, self._str2 = seq1, seq2
        self._reset_cache()

    def set_seq1(self, seq1):
        self._str1 = seq1
        self._reset_cache()

    def set_seq2(self, seq2):
        self._str2 = seq2
        self._reset_cache()

    def get_opcodes(self):
        if not self._opcodes:
            if self._editops:
                self._opcodes = opcodes(self._editops, self._str1, self._str2)
            else:
                self._opcodes = opcodes(self._str1, self._str2)
        return self._opcodes

    def get_editops(self):
        if not self._editops:
            if self._opcodes:
                self._editops = editops(self._opcodes, self._str1, self._str2)
            else:
                self._editops = editops(self._str1, self._str2)
        return self._editops

    def get_matching_blocks(self):
        if not self._matching_blocks:
            self._matching_blocks = matching_blocks(self.get_opcodes(),
                                                    self._str1, self._str2)
        return self._matching_blocks

    def ratio(self):
        if not self._ratio:
            self._ratio = ratio(self._str1, self._str2)
        return self._ratio

    def quick_ratio(self):
        # This is usually quick enough :o)
        if not self._ratio:
            self._ratio = ratio(self._str1, self._str2)
        return self._ratio

    def real_quick_ratio(self):
        len1, len2 = len(self._str1), len(self._str2)
        return 2.0 * min(len1, len2) / (len1 + len2)

    def distance(self):
        if not self._distance:
            self._distance = distance(self._str1, self._str2)
        return self._distance
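StringMatcher mirrors difflib.SequenceMatcher's interface on top of python-Levenshtein, so it can be swapped in wherever a SequenceMatcher is expected. A quick sketch (requires the python-Levenshtein package to be installed):

from fuzzywuzzy.StringMatcher import StringMatcher

m = StringMatcher(seq1="new york mets", seq2="new york yankees")
print(m.ratio())      # similarity in [0, 1], like difflib.SequenceMatcher.ratio()
print(m.distance())   # Levenshtein edit distance between the two strings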
fuzzywuzzy/__init__.py (new file)
@@ -0,0 +1,2 @@
# -*- coding: utf-8 -*-
__version__ = '0.18.0'
fuzzywuzzy/fuzz.py (new file)
@@ -0,0 +1,306 @@
#!/usr/bin/env python
# encoding: utf-8
from __future__ import unicode_literals
import platform
import warnings

try:
    from .StringMatcher import StringMatcher as SequenceMatcher
except ImportError:
    if platform.python_implementation() != "PyPy":
        warnings.warn('Using slow pure-python SequenceMatcher. Install python-Levenshtein to remove this warning')
    from difflib import SequenceMatcher

from . import utils


###########################
# Basic Scoring Functions #
###########################

@utils.check_for_none
@utils.check_for_equivalence
@utils.check_empty_string
def ratio(s1, s2):
    s1, s2 = utils.make_type_consistent(s1, s2)

    m = SequenceMatcher(None, s1, s2)
    return utils.intr(100 * m.ratio())


@utils.check_for_none
@utils.check_for_equivalence
@utils.check_empty_string
def partial_ratio(s1, s2):
    """Return the ratio of the most similar substring
    as a number between 0 and 100."""
    s1, s2 = utils.make_type_consistent(s1, s2)

    if len(s1) <= len(s2):
        shorter = s1
        longer = s2
    else:
        shorter = s2
        longer = s1

    m = SequenceMatcher(None, shorter, longer)
    blocks = m.get_matching_blocks()

    # each block represents a sequence of matching characters in a string
    # of the form (idx_1, idx_2, len)
    # the best partial match will block align with at least one of those blocks
    # e.g. shorter = "abcd", longer = XXXbcdeEEE
    # block = (1,3,3)
    # best score === ratio("abcd", "Xbcd")
    scores = []
    for block in blocks:
        long_start = block[1] - block[0] if (block[1] - block[0]) > 0 else 0
        long_end = long_start + len(shorter)
        long_substr = longer[long_start:long_end]

        m2 = SequenceMatcher(None, shorter, long_substr)
        r = m2.ratio()
        if r > .995:
            return 100
        else:
            scores.append(r)

    return utils.intr(100 * max(scores))


##############################
# Advanced Scoring Functions #
##############################

def _process_and_sort(s, force_ascii, full_process=True):
    """Return a cleaned string with tokens sorted."""
    # pull tokens
    ts = utils.full_process(s, force_ascii=force_ascii) if full_process else s
    tokens = ts.split()

    # sort tokens and join
    sorted_string = u" ".join(sorted(tokens))
    return sorted_string.strip()


# Sorted Token
# find all alphanumeric tokens in the string
# sort those tokens and take ratio of resulting joined strings
# controls for unordered string elements
@utils.check_for_none
def _token_sort(s1, s2, partial=True, force_ascii=True, full_process=True):
    sorted1 = _process_and_sort(s1, force_ascii, full_process=full_process)
    sorted2 = _process_and_sort(s2, force_ascii, full_process=full_process)

    if partial:
        return partial_ratio(sorted1, sorted2)
    else:
        return ratio(sorted1, sorted2)


def token_sort_ratio(s1, s2, force_ascii=True, full_process=True):
    """Return a measure of the sequences' similarity between 0 and 100,
    but sorting the tokens before comparing.
    """
    return _token_sort(s1, s2, partial=False, force_ascii=force_ascii, full_process=full_process)


def partial_token_sort_ratio(s1, s2, force_ascii=True, full_process=True):
    """Return the ratio of the most similar substring as a number between
    0 and 100, but sorting the tokens before comparing.
    """
    return _token_sort(s1, s2, partial=True, force_ascii=force_ascii, full_process=full_process)


@utils.check_for_none
def _token_set(s1, s2, partial=True, force_ascii=True, full_process=True):
    """Find all alphanumeric tokens in each string...
        - treat them as a set
        - construct two strings of the form:
            <sorted_intersection><sorted_remainder>
        - take ratios of those two strings
        - controls for unordered partial matches"""

    if not full_process and s1 == s2:
        return 100

    p1 = utils.full_process(s1, force_ascii=force_ascii) if full_process else s1
    p2 = utils.full_process(s2, force_ascii=force_ascii) if full_process else s2

    if not utils.validate_string(p1):
        return 0
    if not utils.validate_string(p2):
        return 0

    # pull tokens
    tokens1 = set(p1.split())
    tokens2 = set(p2.split())

    intersection = tokens1.intersection(tokens2)
    diff1to2 = tokens1.difference(tokens2)
    diff2to1 = tokens2.difference(tokens1)

    sorted_sect = " ".join(sorted(intersection))
    sorted_1to2 = " ".join(sorted(diff1to2))
    sorted_2to1 = " ".join(sorted(diff2to1))

    combined_1to2 = sorted_sect + " " + sorted_1to2
    combined_2to1 = sorted_sect + " " + sorted_2to1

    # strip
    sorted_sect = sorted_sect.strip()
    combined_1to2 = combined_1to2.strip()
    combined_2to1 = combined_2to1.strip()

    if partial:
        ratio_func = partial_ratio
    else:
        ratio_func = ratio

    pairwise = [
        ratio_func(sorted_sect, combined_1to2),
        ratio_func(sorted_sect, combined_2to1),
        ratio_func(combined_1to2, combined_2to1)
    ]
    return max(pairwise)


def token_set_ratio(s1, s2, force_ascii=True, full_process=True):
    return _token_set(s1, s2, partial=False, force_ascii=force_ascii, full_process=full_process)


def partial_token_set_ratio(s1, s2, force_ascii=True, full_process=True):
    return _token_set(s1, s2, partial=True, force_ascii=force_ascii, full_process=full_process)


###################
# Combination API #
###################

# q is for quick
def QRatio(s1, s2, force_ascii=True, full_process=True):
    """
    Quick ratio comparison between two strings.

    Runs full_process from utils on both strings.
    Short circuits if either of the strings is empty after processing.

    :param s1:
    :param s2:
    :param force_ascii: Allow only ASCII characters (Default: True)
    :full_process: Process inputs, used here to avoid double processing in extract functions (Default: True)
    :return: similarity ratio
    """

    if full_process:
        p1 = utils.full_process(s1, force_ascii=force_ascii)
        p2 = utils.full_process(s2, force_ascii=force_ascii)
    else:
        p1 = s1
        p2 = s2

    if not utils.validate_string(p1):
        return 0
    if not utils.validate_string(p2):
        return 0

    return ratio(p1, p2)


def UQRatio(s1, s2, full_process=True):
    """
    Unicode quick ratio

    Calls QRatio with force_ascii set to False

    :param s1:
    :param s2:
    :return: similarity ratio
    """
    return QRatio(s1, s2, force_ascii=False, full_process=full_process)


# w is for weighted
def WRatio(s1, s2, force_ascii=True, full_process=True):
    """
    Return a measure of the sequences' similarity between 0 and 100, using different algorithms.

    **Steps in the order they occur**

    #. Run full_process from utils on both strings
    #. Short circuit if this makes either string empty
    #. Take the ratio of the two processed strings (fuzz.ratio)
    #. Run checks to compare the length of the strings
        * If one of the strings is more than 1.5 times as long as the other
          use partial_ratio comparisons - scale partial results by 0.9
          (this makes sure only full results can return 100)
        * If one of the strings is over 8 times as long as the other
          instead scale by 0.6

    #. Run the other ratio functions
        * if using partial ratio functions call partial_ratio,
          partial_token_sort_ratio and partial_token_set_ratio
          scale all of these by the ratio based on length
        * otherwise call token_sort_ratio and token_set_ratio
        * all token based comparisons are scaled by 0.95
          (on top of any partial scalars)

    #. Take the highest value from these results
       round it and return it as an integer.

    :param s1:
    :param s2:
    :param force_ascii: Allow only ascii characters
    :type force_ascii: bool
    :full_process: Process inputs, used here to avoid double processing in extract functions (Default: True)
    :return:
    """

    if full_process:
        p1 = utils.full_process(s1, force_ascii=force_ascii)
        p2 = utils.full_process(s2, force_ascii=force_ascii)
    else:
        p1 = s1
        p2 = s2

    if not utils.validate_string(p1):
        return 0
    if not utils.validate_string(p2):
        return 0

    # should we look at partials?
    try_partial = True
    unbase_scale = .95
    partial_scale = .90

    base = ratio(p1, p2)
    len_ratio = float(max(len(p1), len(p2))) / min(len(p1), len(p2))

    # if strings are similar length, don't use partials
    if len_ratio < 1.5:
        try_partial = False

    # if one string is much much shorter than the other
    if len_ratio > 8:
        partial_scale = .6

    if try_partial:
        partial = partial_ratio(p1, p2) * partial_scale
        ptsor = partial_token_sort_ratio(p1, p2, full_process=False) \
            * unbase_scale * partial_scale
        ptser = partial_token_set_ratio(p1, p2, full_process=False) \
            * unbase_scale * partial_scale

        return utils.intr(max(base, partial, ptsor, ptser))
    else:
        tsor = token_sort_ratio(p1, p2, full_process=False) * unbase_scale
        tser = token_set_ratio(p1, p2, full_process=False) * unbase_scale

        return utils.intr(max(base, tsor, tser))


def UWRatio(s1, s2, full_process=True):
    """Return a measure of the sequences' similarity between 0 and 100,
    using different algorithms. Same as WRatio but preserving unicode.
    """
    return WRatio(s1, s2, force_ascii=False, full_process=full_process)
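The different scorers trade strictness for robustness to word order and extra tokens. A small comparison sketch (scores are approximate and can differ slightly between the Levenshtein and difflib backends):

from fuzzywuzzy import fuzz

a, b = "fuzzy wuzzy was a bear", "wuzzy fuzzy was a bear"
print(fuzz.ratio(a, b))              # penalises the swapped word order
print(fuzz.token_sort_ratio(a, b))   # 100: same tokens, order ignored
print(fuzz.partial_ratio("this is a test", "this is a test!"))  # 100: best matching substring
print(fuzz.WRatio(a, b))             # weighted combination of the heuristics above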
fuzzywuzzy/process.py (new file)
@@ -0,0 +1,285 @@
#!/usr/bin/env python
# encoding: utf-8
from . import fuzz
from . import utils
import heapq
import logging
from functools import partial


default_scorer = fuzz.WRatio


default_processor = utils.full_process


def extractWithoutOrder(query, choices, processor=default_processor, scorer=default_scorer, score_cutoff=0):
    """Select the best match in a list or dictionary of choices.

    Find best matches in a list or dictionary of choices, return a
    generator of tuples containing the match and its score. If a dictionary
    is used, also returns the key for each match.

    Arguments:
        query: An object representing the thing we want to find.
        choices: An iterable or dictionary-like object containing choices
            to be matched against the query. Dictionary arguments of
            {key: value} pairs will attempt to match the query against
            each value.
        processor: Optional function of the form f(a) -> b, where a is the query or
            individual choice and b is the choice to be used in matching.

            This can be used to match against, say, the first element of
            a list:

                lambda x: x[0]

            Defaults to fuzzywuzzy.utils.full_process().
        scorer: Optional function for scoring matches between the query and
            an individual processed choice. This should be a function
            of the form f(query, choice) -> int.

            By default, fuzz.WRatio() is used and expects both query and
            choice to be strings.
        score_cutoff: Optional argument for score threshold. No matches with
            a score less than this number will be returned. Defaults to 0.

    Returns:
        Generator of tuples containing the match and its score.

        If a list is used for choices, then the result will be 2-tuples.
        If a dictionary is used, then the result will be 3-tuples containing
        the key for each match.

        For example, searching for 'bird' in the dictionary

        {'bard': 'train', 'dog': 'man'}

        may return

        ('train', 22, 'bard'), ('man', 0, 'dog')
    """
    # Catch generators without lengths
    def no_process(x):
        return x

    try:
        if choices is None or len(choices) == 0:
            return
    except TypeError:
        pass

    # If the processor was removed by setting it to None
    # perform a no-op as it still needs to be a function
    if processor is None:
        processor = no_process

    # Run the processor on the input query.
    processed_query = processor(query)

    if len(processed_query) == 0:
        logging.warning(u"Applied processor reduces input query to empty string, "
                        "all comparisons will have score 0. "
                        "[Query: \'{0}\']".format(query))

    # Don't run full_process twice
    if scorer in [fuzz.WRatio, fuzz.QRatio,
                  fuzz.token_set_ratio, fuzz.token_sort_ratio,
                  fuzz.partial_token_set_ratio, fuzz.partial_token_sort_ratio,
                  fuzz.UWRatio, fuzz.UQRatio] \
            and processor == utils.full_process:
        processor = no_process

    # Only process the query once instead of for every choice
    if scorer in [fuzz.UWRatio, fuzz.UQRatio]:
        pre_processor = partial(utils.full_process, force_ascii=False)
        scorer = partial(scorer, full_process=False)
    elif scorer in [fuzz.WRatio, fuzz.QRatio,
                    fuzz.token_set_ratio, fuzz.token_sort_ratio,
                    fuzz.partial_token_set_ratio, fuzz.partial_token_sort_ratio]:
        pre_processor = partial(utils.full_process, force_ascii=True)
        scorer = partial(scorer, full_process=False)
    else:
        pre_processor = no_process
    processed_query = pre_processor(processed_query)

    try:
        # See if choices is a dictionary-like object.
        for key, choice in choices.items():
            processed = pre_processor(processor(choice))
            score = scorer(processed_query, processed)
            if score >= score_cutoff:
                yield (choice, score, key)
    except AttributeError:
        # It's a list; just iterate over it.
        for choice in choices:
            processed = pre_processor(processor(choice))
            score = scorer(processed_query, processed)
            if score >= score_cutoff:
                yield (choice, score)


def extract(query, choices, processor=default_processor, scorer=default_scorer, limit=5):
    """Select the best match in a list or dictionary of choices.

    Find best matches in a list or dictionary of choices, return a
    list of tuples containing the match and its score. If a dictionary
    is used, also returns the key for each match.

    Arguments:
        query: An object representing the thing we want to find.
        choices: An iterable or dictionary-like object containing choices
            to be matched against the query. Dictionary arguments of
            {key: value} pairs will attempt to match the query against
            each value.
        processor: Optional function of the form f(a) -> b, where a is the query or
            individual choice and b is the choice to be used in matching.

            This can be used to match against, say, the first element of
            a list:

                lambda x: x[0]

            Defaults to fuzzywuzzy.utils.full_process().
        scorer: Optional function for scoring matches between the query and
            an individual processed choice. This should be a function
            of the form f(query, choice) -> int.
            By default, fuzz.WRatio() is used and expects both query and
            choice to be strings.
        limit: Optional maximum for the number of elements returned. Defaults
            to 5.

    Returns:
        List of tuples containing the match and its score.

        If a list is used for choices, then the result will be 2-tuples.
        If a dictionary is used, then the result will be 3-tuples containing
        the key for each match.

        For example, searching for 'bird' in the dictionary

        {'bard': 'train', 'dog': 'man'}

        may return

        [('train', 22, 'bard'), ('man', 0, 'dog')]
    """
    sl = extractWithoutOrder(query, choices, processor, scorer)
    return heapq.nlargest(limit, sl, key=lambda i: i[1]) if limit is not None else \
        sorted(sl, key=lambda i: i[1], reverse=True)


def extractBests(query, choices, processor=default_processor, scorer=default_scorer, score_cutoff=0, limit=5):
    """Get a list of the best matches to a collection of choices.

    Convenience function for getting the choices with best scores.

    Args:
        query: A string to match against
        choices: A list or dictionary of choices, suitable for use with
            extract().
        processor: Optional function for transforming choices before matching.
            See extract().
        scorer: Scoring function for extract().
        score_cutoff: Optional argument for score threshold. No matches with
            a score less than this number will be returned. Defaults to 0.
        limit: Optional maximum for the number of elements returned. Defaults
            to 5.

    Returns: A list of (match, score) tuples.
    """

    best_list = extractWithoutOrder(query, choices, processor, scorer, score_cutoff)
    return heapq.nlargest(limit, best_list, key=lambda i: i[1]) if limit is not None else \
        sorted(best_list, key=lambda i: i[1], reverse=True)


def extractOne(query, choices, processor=default_processor, scorer=default_scorer, score_cutoff=0):
    """Find the single best match above a score in a list of choices.

    This is a convenience method which returns the single best choice.
    See extract() for the full arguments list.

    Args:
        query: A string to match against
        choices: A list or dictionary of choices, suitable for use with
            extract().
        processor: Optional function for transforming choices before matching.
            See extract().
        scorer: Scoring function for extract().
        score_cutoff: Optional argument for score threshold. If the best
            match is found, but it is not greater than this number, then
            return None anyway ("not a good enough match"). Defaults to 0.

    Returns:
        A tuple containing a single match and its score, if a match
        was found that was above score_cutoff. Otherwise, returns None.
    """
    best_list = extractWithoutOrder(query, choices, processor, scorer, score_cutoff)
    try:
        return max(best_list, key=lambda i: i[1])
    except ValueError:
        return None


def dedupe(contains_dupes, threshold=70, scorer=fuzz.token_set_ratio):
    """This convenience function takes a list of strings containing duplicates and uses fuzzy matching to identify
    and remove duplicates. Specifically, it uses process.extract to identify duplicates that
    score greater than a user defined threshold. Then, it looks for the longest item in the duplicate list
    since we assume this item contains the most entity information and returns that. It breaks string
    length ties on an alphabetical sort.

    Note: as the threshold DECREASES the number of duplicates that are found INCREASES. This means that the
        returned deduplicated list will likely be shorter. Raise the threshold for fuzzy_dedupe to be less
        sensitive.

    Args:
        contains_dupes: A list of strings that we would like to dedupe.
        threshold: the numerical value (0,100) point at which we expect to find duplicates.
            Defaults to 70 out of 100
        scorer: Optional function for scoring matches between the query and
            an individual processed choice. This should be a function
            of the form f(query, choice) -> int.
            By default, fuzz.token_set_ratio() is used and expects both query and
            choice to be strings.

    Returns:
        A deduplicated list. For example:

            In: contains_dupes = ['Frodo Baggin', 'Frodo Baggins', 'F. Baggins', 'Samwise G.', 'Gandalf', 'Bilbo Baggins']
            In: fuzzy_dedupe(contains_dupes)
            Out: ['Frodo Baggins', 'Samwise G.', 'Bilbo Baggins', 'Gandalf']
    """

    extractor = []

    # iterate over items in *contains_dupes*
    for item in contains_dupes:
        # return all duplicate matches found
        matches = extract(item, contains_dupes, limit=None, scorer=scorer)
        # filter matches based on the threshold
        filtered = [x for x in matches if x[1] > threshold]
        # if there is only 1 item in *filtered*, no duplicates were found so append to *extractor*
        if len(filtered) == 1:
            extractor.append(filtered[0][0])

        else:
            # alpha sort
            filtered = sorted(filtered, key=lambda x: x[0])
            # length sort
            filter_sort = sorted(filtered, key=lambda x: len(x[0]), reverse=True)
            # take first item as our 'canonical example'
            extractor.append(filter_sort[0][0])

    # uniquify *extractor* list
    keys = {}
    for e in extractor:
        keys[e] = 1
    extractor = keys.keys()

    # check that extractor differs from contains_dupes (e.g. duplicates were found)
    # if not, then return the original list
    if len(extractor) == len(contains_dupes):
        return contains_dupes
    else:
        return extractor
30
fuzzywuzzy/string_processing.py
Normal file
30
fuzzywuzzy/string_processing.py
Normal file
@ -0,0 +1,30 @@
from __future__ import unicode_literals
import re
import string
import sys

PY3 = sys.version_info[0] == 3
if PY3:
    string = str


class StringProcessor(object):
    """
    This class defines methods to process strings in the most
    efficient way. Ideally all the methods below use unicode strings
    for both input and output.
    """

    regex = re.compile(r"(?ui)\W")

    @classmethod
    def replace_non_letters_non_numbers_with_whitespace(cls, a_string):
        """
        This function replaces any sequence of non letters and non
        numbers with a single white space.
        """
        return cls.regex.sub(" ", a_string)

    strip = staticmethod(string.strip)
    to_lower_case = staticmethod(string.lower)
    to_upper_case = staticmethod(string.upper)
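
# Illustrative usage only (not part of the upstream fuzzywuzzy source): the class
# is used as a namespace of static string helpers, e.g.
#   StringProcessor.replace_non_letters_non_numbers_with_whitespace("fuzzy-wuzzy!")
#   # -> "fuzzy wuzzy " (each non-alphanumeric character becomes a single space)
#   StringProcessor.to_lower_case("FuzzyWuzzy")   # -> "fuzzywuzzy"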
105
fuzzywuzzy/utils.py
Normal file
105
fuzzywuzzy/utils.py
Normal file
@ -0,0 +1,105 @@
from __future__ import unicode_literals
import sys
import functools

from fuzzywuzzy.string_processing import StringProcessor


PY3 = sys.version_info[0] == 3


def validate_string(s):
    """
    Check input has length and that length > 0

    :param s:
    :return: True if len(s) > 0 else False
    """
    try:
        return len(s) > 0
    except TypeError:
        return False


def check_for_equivalence(func):
    @functools.wraps(func)
    def decorator(*args, **kwargs):
        if args[0] == args[1]:
            return 100
        return func(*args, **kwargs)
    return decorator


def check_for_none(func):
    @functools.wraps(func)
    def decorator(*args, **kwargs):
        if args[0] is None or args[1] is None:
            return 0
        return func(*args, **kwargs)
    return decorator


def check_empty_string(func):
    @functools.wraps(func)
    def decorator(*args, **kwargs):
        if len(args[0]) == 0 or len(args[1]) == 0:
            return 0
        return func(*args, **kwargs)
    return decorator


bad_chars = str("").join([chr(i) for i in range(128, 256)])  # ascii dammit!
if PY3:
    translation_table = dict((ord(c), None) for c in bad_chars)
    unicode = str


def asciionly(s):
    if PY3:
        return s.translate(translation_table)
    else:
        return s.translate(None, bad_chars)


def asciidammit(s):
    if type(s) is str:
        return asciionly(s)
    elif type(s) is unicode:
        return asciionly(s.encode('ascii', 'ignore'))
    else:
        return asciidammit(unicode(s))


def make_type_consistent(s1, s2):
    """If both objects aren't either both string or unicode instances force them to unicode"""
    if isinstance(s1, str) and isinstance(s2, str):
        return s1, s2

    elif isinstance(s1, unicode) and isinstance(s2, unicode):
        return s1, s2

    else:
        return unicode(s1), unicode(s2)


def full_process(s, force_ascii=False):
    """Process string by
        -- removing all but letters and numbers
        -- trim whitespace
        -- force to lower case
        if force_ascii == True, force convert to ascii"""

    if force_ascii:
        s = asciidammit(s)
    # Keep only Letters and Numbers (see Unicode docs).
    string_out = StringProcessor.replace_non_letters_non_numbers_with_whitespace(s)
    # Force into lowercase.
    string_out = StringProcessor.to_lower_case(string_out)
    # Remove leading and trailing whitespaces.
    string_out = StringProcessor.strip(string_out)
    return string_out


def intr(n):
    '''Returns a correctly rounded integer'''
    return int(round(n))
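
# Illustrative usage only (a minimal sketch, not part of the upstream fuzzywuzzy
# source): full_process is the default pre-processing applied to strings before scoring.
#   full_process(" Hello, WORLD!! ")
#   # -> "hello  world"  (non-alphanumerics become spaces, then lowercased and stripped)
#   full_process(u"Ñandú", force_ascii=True)   # non-ASCII characters are dropped first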
19
mailer.py
Normal file
19
mailer.py
Normal file
@ -0,0 +1,19 @@
import smtplib
import email
import time
from email.mime.base import MIMEBase
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from os import listdir


def send(rec_list,subject="",body=""):
    """Send a password-reset notification to every address in rec_list."""
    msg = MIMEMultipart()
    msg["From"] = "globalists@condorbs.net"
    msg["To"] = ",".join(rec_list)
    msg["Subject"] = "[MNeural] "+subject
    msg['Message-ID'] = f"<condorbs{int(time.time()*1000000)}>"
    # "Visita el siguiente link para reestablecer tu password" =
    # "Visit the following link to reset your password"
    body="Visita el siguiente link para reestablecer tu password "+body
    msg.attach(MIMEText(body,"plain","utf-8"))
    with smtplib.SMTP("smtp.condorbs.net",587) as server:
        server.starttls()
        server.login("globalists@condorbs.net","No-BlaCk3")
        server.send_message(msg)
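
# Illustrative usage only (hypothetical recipient and link, not taken from this repo):
#   import mailer
#   mailer.send(["user@example.com"], subject="Password reset",
#               body="https://example.com/resetpw?token=...")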
137
serve_api.py
Executable file
137
serve_api.py
Executable file
@ -0,0 +1,137 @@
#!/usr/bin/python3.7
import os
import csv
import json
import time
import mailer
import sqlite3
import hashlib
import flask
from flask import Flask
from flask import request
from flask import jsonify
from flask import abort
from flask_cors import CORS
from fuzzywuzzy import fuzz
from multiprocessing import Process, Queue
import face_recognition
#from fset import fset
#from flask_security import auth_token_required
#from werkzeug.http import HTTP_STATUS_CODES
#def error_response(status_code, message=None):
#    payload = {'error': HTTP_STATUS_CODES.get(status_code, 'Unknown error')}
#    if message:
#        payload['message'] = message
#    response = jsonify(payload)
#    response.status_code = status_code
#    return response

#def tobs66(st):
#    bs64=" 0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
#    acc=[(u'á','a'),(u'é','e',u'í','i'),(u'ó','o'),(u'ú','u'),(u'Á','A'),(u'É','E'),(u'Í','I'),(u'Ó','O'),(u'Ú','U'),('.',' '),(',',' '),(':',' '),(';',' '),('\n',' '),('\t',' '),('-',' '),('"',' '),("'",' ')]
#    for r in acc: st=st.replace(r[0],r[1])
#    return "".join(c for c in st if c in bs64 or c in [u'ñ',u'Ñ'])

#db_connector = sqlite3.connect("/var/lib/exp/praxis/lists.db")
#db_cursor = db_connector.cursor()
#db_sentence = "SELECT id,nombre,alias FROM lst ;"
#db_cursor.execute(db_sentence)
#names = fset((row[1] for row in db_cursor.fetchall()))
#names = [row for row in db_cursor.fetchall()]
#phph = lambda nnmm:nnmm.replace('LL',u'Ж').replace('RR',u'Р').replace('CH',u'Ч')
#names_ph = {nm[1]:phph(nm[1]) for nm in names}
#db_cursor.close(); db_connector.close()

app = Flask(__name__,subdomain_matching=True)
CORS(app)
#app.config['SECURITY_TOKEN_AUTHENTICATION_KEY'] = '7bvij07Js7Da0ij5VzWTib6AOAv7J9kShu3HM3BTU3iT'
#print(app.config['SECURITY_TOKEN_AUTHENTICATION_HEADER'])
#print(app.config['SECURITY_TOKEN_AUTHENTICATION_KEY'])
app.config["SERVER_NAME"] = "condorgl.net"


@app.route("/")
def rootr(): return ""


@app.route("/login",subdomain="auth",methods=['POST'])
def login():
    return jsonify({"success":request.form["username"] in ["aeespinosa","cobra"] and request.form["password"] in ["test"],"payload":{}})


@app.route("/resetpw",subdomain="auth",methods=['POST'])
def resetpw():
    # "payload" must be a JSON-serializable dict; the original set literal {""} would make jsonify fail
    return jsonify({"success":request.form["username"] in ["aeespinosa","cobra"] and request.form["email"] in ["h@condorbs.net"],"payload":{}})


# Static single-page apps served per subdomain.
@app.route("/",subdomain="globalists")
@app.route("/<path:wp>",subdomain="globalists")
def webapp(wp="index.html"): return app.send_static_file("globalists/"+wp+"index.html" if wp.endswith('/') else "globalists/"+wp)


@app.route("/",subdomain="mneural")
@app.route("/<path:wp>",subdomain="mneural")
def webapp2(wp="index.html"): return app.send_static_file("mneural/"+wp+"index.html" if wp.endswith('/') else "mneural/"+wp)


response_queue = Queue()


@app.route("/match",subdomain="api", methods=['GET','POST','PUT','DELETE','TRACE','HEAD','OPTIONS'])
#@auth_token_required
def match():
    fields = {"name":"nombre","nationality":"pais","rfc":"rfc","status":"estatus"}
    data = {field:request.args.get(field) for field in list(fields)+["similarity"]}
    if not (request.args.get("token") and (request.args.get("name") or request.args.get("rfc"))): return {"success":False,"error":"400 Bad Request"},400
    if request.method != 'GET': return {"success":False,"error":"405 Method Not Allowed"},405
    if request.args.get("token") not in ["7bvij07Js7Da0ij5VzWTib6AOAv7J9kShu3HM3BTU3iT","j6KbS9IVIdWReQkag3Own9XS1YGBCt4L2j070YonBV4T"]:
        return {"success":False,"error":"403 Not authorized"},403
    #print(data)

    def __match(data):
        # Pre-filter candidate rows with agrep (approximate grep) over the two longest
        # tokens of the requested name, then score the candidates with fuzzywuzzy.
        matched_names = []; matched_aliases = []
        for sname in sorted(data['name'].upper().split(' '),key=len)[-2:]:
            tmp_f = f"tmp-{sname}-{int(time.time())}"
            os.system("agrep -1 -e '%s' names > %s-n"%(sname,tmp_f))
            os.system("agrep -1 -e '%s' aliases > %s-a"%(sname,tmp_f))
            with open(f"{tmp_f}-n",'r') as tmp_ff:
                for row in tmp_ff: matched_names.append(row[:-1])
            with open(f"{tmp_f}-a",'r') as tmp_ff:
                for row in tmp_ff: matched_aliases.append(row[:-1])
            #print(matched_names)
            os.remove(f"{tmp_f}-n"); os.remove(f"{tmp_f}-a")
        db_connector = sqlite3.connect("/var/globalists/lists.db")
        db_cursor = db_connector.cursor()
        db_sentence = "SELECT substr(id,0,4) as list,nombre as name,alias,ubicacion as location,fechanac as birth_date,pais as nationality,rfc,programa as program,cargo as position,dependencia as department,fechapub as publication_date,estatus as status FROM lst WHERE "
        #nms = [nm for nm in matched_names if fuzz.token_set_ratio(data["name"].upper(),nm)>80]
        #als = [nm for nm in matched_aliases if fuzz.token_set_ratio(data["name"].upper(),nm)>80]
        nms = {nm:fuzz.token_set_ratio(data["name"].upper(),nm) for nm in matched_names}
        als = {nm:fuzz.token_set_ratio(data["name"].upper(),nm) for nm in matched_aliases}
        # Keep only candidates whose score clears the requested similarity (default 0.8).
        nms = {nm:nmp for nm,nmp in nms.items() if nmp>100*float(data["similarity"] or 0.8)}
        als = {nm:nmp for nm,nmp in als.items() if nmp>100*float(data["similarity"] or 0.8)}
        #print(nms)
        db_sentence+="( nombre IN ("+",".join([f"'{nm}'" for nm in nms])+")"
        db_sentence+=" OR alias IN ("+",".join([f"'{nm}'" for nm in als])+") )"
        db_sent_2 =" AND ".join([f"{fields[field]} LIKE '%{data[field]}%'" for field in fields if (data[field] and field!="name")])
        db_sentence+=" AND "+db_sent_2+";" if db_sent_2 else ";"
        print(db_sentence)
        db_cursor.execute(db_sentence)
        table = [{db_cursor.description[k][0]:row[k] for k in range(len(row))} for row in db_cursor.fetchall()]
        for row in table:
            row['name_similarity'] = nms.get(row['name'],0.0)/100.0
            row['alias_similarity'] = als.get(row['alias'],0.0)/100.0
        #print(table)
        db_cursor.close(); db_connector.close()
        response_queue.put(table)

    # Note: Process.run() executes __match synchronously in this process;
    # Process.start() would be needed to actually fork a worker.
    thread = Process(target=__match,args=(data,),daemon=True)
    thread.run()
    return jsonify({"success":True,"payload":response_queue.get()})


@app.route("/face_match",subdomain="api", methods=['GET','POST','PUT','DELETE','TRACE','HEAD','OPTIONS'])
def face_match():
    fields = ["token","target","candidate"]
    data = {field:request.args.get(field) for field in fields}
    #if not all(request.args.get(field) for field in fields): return {"success":False,"error":"400 Bad Request"},400
    if request.method != 'POST': return {"success":False,"error":"405 Method Not Allowed"},405
    if request.args.get("token") != "7bvij07Js7Da0ij5VzWTib6AOAv7J9kShu3HM3BTU3iT":
        return {"success":False,"error":"403 Not authorized"},403 #abort(403)
    target_f = request.files["target"]
    candidate_f = request.files["candidate"]
    # breakpoint()
    target_f.save("target.jpg");candidate_f.save("target2.jpg")
    target_enc = face_recognition.face_encodings(face_recognition.load_image_file(target_f))
    candidate_enc = face_recognition.face_encodings(face_recognition.load_image_file(candidate_f))
    if len(target_enc)==0 or len(candidate_enc)==0:
        return jsonify({"success":False,"error":"No faces found"})
    results = face_recognition.compare_faces(candidate_enc,target_enc[0])
    # cast the numpy.bool_ result to a plain bool so jsonify can serialize it
    return jsonify({"success":True,"payload":bool(results[0])})


app.run(host="0.0.0.0",port=443,ssl_context=("./fullchain.pem","./privkey.pem"),debug=True)
#import wsgiserver
#server = wsgiserver.WSGIServer(app,host="0.0.0.0",port=5000,certfile='./fullchain.pem',keyfile='./privkey.pem')
#server.start()
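
# Illustrative client calls only (a sketch, not part of the service itself). The host
# names follow the subdomain routing above, the token values are the ones hard-coded
# in this file, and the name/filenames are made up; the `requests` library is assumed.
#
#   import requests
#
#   # name screening: GET https://api.condorgl.net/match
#   r = requests.get("https://api.condorgl.net/match",
#                    params={"token": "7bvij07Js7Da0ij5VzWTib6AOAv7J9kShu3HM3BTU3iT",
#                            "name": "JUAN PEREZ", "similarity": "0.85"})
#
#   # face comparison: POST https://api.condorgl.net/face_match with two images
#   r = requests.post("https://api.condorgl.net/face_match",
#                     params={"token": "7bvij07Js7Da0ij5VzWTib6AOAv7J9kShu3HM3BTU3iT"},
#                     files={"target": open("target.jpg", "rb"),
#                            "candidate": open("candidate.jpg", "rb")})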
132
serve_api.py0
Executable file
132
serve_api.py0
Executable file
@ -0,0 +1,132 @@
#!/usr/bin/python3.7
import os
import csv
import json
import time
import sqlite3
import hashlib
import flask
from flask import Flask
from flask import request
from flask import jsonify
from flask import abort
from flask_cors import CORS
from fuzzywuzzy import fuzz
from multiprocessing import Process, Queue
import face_recognition
#from fset import fset
#from flask_security import auth_token_required
#from werkzeug.http import HTTP_STATUS_CODES
#def error_response(status_code, message=None):
#    payload = {'error': HTTP_STATUS_CODES.get(status_code, 'Unknown error')}
#    if message:
#        payload['message'] = message
#    response = jsonify(payload)
#    response.status_code = status_code
#    return response

#def tobs66(st):
#    bs64=" 0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
#    acc=[(u'á','a'),(u'é','e',u'í','i'),(u'ó','o'),(u'ú','u'),(u'Á','A'),(u'É','E'),(u'Í','I'),(u'Ó','O'),(u'Ú','U'),('.',' '),(',',' '),(':',' '),(';',' '),('\n',' '),('\t',' '),('-',' '),('"',' '),("'",' ')]
#    for r in acc: st=st.replace(r[0],r[1])
#    return "".join(c for c in st if c in bs64 or c in [u'ñ',u'Ñ'])

#db_connector = sqlite3.connect("/var/lib/exp/praxis/lists.db")
#db_cursor = db_connector.cursor()
#db_sentence = "SELECT id,nombre,alias FROM lst ;"
#db_cursor.execute(db_sentence)
#names = fset((row[1] for row in db_cursor.fetchall()))
#names = [row for row in db_cursor.fetchall()]
#phph = lambda nnmm:nnmm.replace('LL',u'Ж').replace('RR',u'Р').replace('CH',u'Ч')
#names_ph = {nm[1]:phph(nm[1]) for nm in names}
#db_cursor.close(); db_connector.close()

app = Flask(__name__,subdomain_matching=True)
CORS(app)
#app.config['SECURITY_TOKEN_AUTHENTICATION_KEY'] = '7bvij07Js7Da0ij5VzWTib6AOAv7J9kShu3HM3BTU3iT'
#print(app.config['SECURITY_TOKEN_AUTHENTICATION_HEADER'])
#print(app.config['SECURITY_TOKEN_AUTHENTICATION_KEY'])
app.config["SERVER_NAME"] = "condorgl.net"


@app.route("/")
def rootr(): return ""


@app.route("/login",subdomain="auth",methods=['POST'])
def login():
    return jsonify({"success":request.form["username"] in ["aeespinosa","cobra"] and request.form["password"] in ["test"],"payload":{}})


@app.route("/",subdomain="globalists")
@app.route("/<path:wp>",subdomain="globalists")
def webapp(wp="index.html"): return app.send_static_file("globalists/"+wp+"index.html" if wp.endswith('/') else "globalists/"+wp)


@app.route("/",subdomain="mneural")
@app.route("/<path:wp>",subdomain="mneural")
def webapp2(wp="index.html"): return app.send_static_file("mneural/"+wp+"index.html" if wp.endswith('/') else "mneural/"+wp)


response_queue = Queue()


@app.route("/match",subdomain="api", methods=['GET','POST','PUT','DELETE','TRACE','HEAD','OPTIONS'])
#@auth_token_required
def match():
    fields = {"name":"nombre","nationality":"pais","rfc":"rfc","status":"estatus"}
    data = {field:request.args.get(field) for field in list(fields)+["similarity"]}
    if not (request.args.get("token") and (request.args.get("name") or request.args.get("rfc"))): return {"success":False,"error":"400 Bad Request"},400
    if request.method != 'GET': return {"success":False,"error":"405 Method Not Allowed"},405
    if request.args.get("token") not in ["7bvij07Js7Da0ij5VzWTib6AOAv7J9kShu3HM3BTU3iT","j6KbS9IVIdWReQkag3Own9XS1YGBCt4L2j070YonBV4T"]:
        return {"success":False,"error":"403 Not authorized"},403
    #print(data)

    def __match(data):
        matched_names = []; matched_aliases = []
        for sname in sorted(data['name'].upper().split(' '),key=len)[-2:]:
            tmp_f = f"tmp-{sname}-{int(time.time())}"
            os.system("agrep -1 -e '%s' names > %s-n"%(sname,tmp_f))
            os.system("agrep -1 -e '%s' aliases > %s-a"%(sname,tmp_f))
            with open(f"{tmp_f}-n",'r') as tmp_ff:
                for row in tmp_ff: matched_names.append(row[:-1])
            with open(f"{tmp_f}-a",'r') as tmp_ff:
                for row in tmp_ff: matched_aliases.append(row[:-1])
            #print(matched_names)
            os.remove(f"{tmp_f}-n"); os.remove(f"{tmp_f}-a")
        db_connector = sqlite3.connect("/var/globalists/lists.db")
        db_cursor = db_connector.cursor()
        db_sentence = "SELECT substr(id,0,4) as list,nombre as name,alias,ubicacion as location,fechanac as birth_date,pais as nationality,rfc,programa as program,cargo as position,dependencia as department,fechapub as publication_date,estatus as status FROM lst WHERE "
        #nms = [nm for nm in matched_names if fuzz.token_set_ratio(data["name"].upper(),nm)>80]
        #als = [nm for nm in matched_aliases if fuzz.token_set_ratio(data["name"].upper(),nm)>80]
        nms = {nm:fuzz.token_set_ratio(data["name"].upper(),nm) for nm in matched_names}
        als = {nm:fuzz.token_set_ratio(data["name"].upper(),nm) for nm in matched_aliases}
        nms = {nm:nmp for nm,nmp in nms.items() if nmp>100*float(data["similarity"] or 0.8)}
        als = {nm:nmp for nm,nmp in als.items() if nmp>100*float(data["similarity"] or 0.8)}
        #print(nms)
        db_sentence+="( nombre IN ("+",".join([f"'{nm}'" for nm in nms])+")"
        db_sentence+=" OR alias IN ("+",".join([f"'{nm}'" for nm in als])+") )"
        db_sent_2 =" AND ".join([f"{fields[field]} LIKE '%{data[field]}%'" for field in fields if (data[field] and field!="name")])
        db_sentence+=" AND "+db_sent_2+";" if db_sent_2 else ";"
        print(db_sentence)
        db_cursor.execute(db_sentence)
        table = [{db_cursor.description[k][0]:row[k] for k in range(len(row))} for row in db_cursor.fetchall()]
        for row in table:
            row['name_similarity'] = nms.get(row['name'],0.0)/100.0
            row['alias_similarity'] = als.get(row['alias'],0.0)/100.0
        #print(table)
        db_cursor.close(); db_connector.close()
        response_queue.put(table)

    thread = Process(target=__match,args=(data,),daemon=True)
    thread.run()
    return jsonify({"success":True,"payload":response_queue.get()})


@app.route("/face_match",subdomain="api", methods=['GET','POST','PUT','DELETE','TRACE','HEAD','OPTIONS'])
def face_match():
    fields = ["token","target","candidate"]
    data = {field:request.args.get(field) for field in fields}
    #if not all(request.args.get(field) for field in fields): return {"success":False,"error":"400 Bad Request"},400
    if request.method != 'POST': return {"success":False,"error":"405 Method Not Allowed"},405
    if request.args.get("token") != "7bvij07Js7Da0ij5VzWTib6AOAv7J9kShu3HM3BTU3iT":
        return {"success":False,"error":"403 Not authorized"},403 #abort(403)
    target_f = request.files["target"]
    candidate_f = request.files["candidate"]
    # breakpoint()
    target_f.save("target.jpg");candidate_f.save("target2.jpg")
    target_enc = face_recognition.face_encodings(face_recognition.load_image_file(target_f))
    candidate_enc = face_recognition.face_encodings(face_recognition.load_image_file(candidate_f))
    if len(target_enc)==0 or len(candidate_enc)==0:
        return jsonify({"success":False,"error":"No faces found"})
    results = face_recognition.compare_faces(candidate_enc,target_enc[0])
    return jsonify({"success":True,"payload":results[0]})


app.run(host="0.0.0.0",port=443,ssl_context=("./fullchain.pem","./privkey.pem"),debug=True)
#import wsgiserver
#server = wsgiserver.WSGIServer(app,host="0.0.0.0",port=5000,certfile='./fullchain.pem',keyfile='./privkey.pem')
#server.start()
BIN
target.jpg
Normal file
BIN
target.jpg
Normal file
Binary file not shown.
After Width: | Height: | Size: 15 KiB |
BIN
target2.jpg
Normal file
BIN
target2.jpg
Normal file
Binary file not shown.
After Width: | Height: | Size: 14 KiB |