数据抽稀
GPS 数据抽稀
完美解决拐点数据问题
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from flask import Flask
from flask_restful import reqparse, abort, Api, Resource
from math import sqrt, pow
from flaskext.mysql import MySQL
from pymongo import MongoClient
import json, time, datetime, decimal
import demjson
# MongoDB client for the log store; host and port are hard-coded here.
client = MongoClient('160.16.196.242', 27017)
# Authenticate against the "admin" database.
# NOTE(review): the credentials are committed in source - consider moving them
# to configuration. Database.authenticate() was removed in PyMongo 4.0 -
# confirm the pinned PyMongo version still provides it.
db_auth = client.admin
db_auth.authenticate("xin", "luxury")
# Database "wiselbs"; GatewayLog reads its "wiselbs" collection via db.wiselbs.
db = client.wiselbs
# Handle to the same "wiselbs" collection; not referenced in the visible code.
todos = db.wiselbs
__author__ = 'xin_li'
THRESHOLD = 0.0001 # Default distance threshold used by DouglasPeuker; presumably in degrees since the points are lng/lat - TODO confirm
# Flask application and the Flask-RESTful API wrapper the resources are registered on.
app = Flask(__name__)
api = Api(app)
# Perpendicular distance from a point to a line
def point2LineDistance(point_a, point_b, point_c):
    """
    Return the perpendicular distance from point_a to the line through
    point_b and point_c.

    Only the first two items of each point (x, y) are read, so points may
    carry extra trailing fields such as an id. Coordinates are converted
    with float(), so ints, Decimals and numeric strings are accepted.

    The distance is computed from the cross product of (c - b) and (b - a)
    rather than from slope/intercept, so vertical lines need no special
    case. When point_b and point_c coincide there is no line; the distance
    from point_a to that single point is returned instead.

    :param point_a: the point being measured
    :param point_b: first point defining the line
    :param point_c: second point defining the line
    :return: the distance as a float, in the same unit as the coordinates
    """
    ax, ay = float(point_a[0]), float(point_a[1])
    bx, by = float(point_b[0]), float(point_b[1])
    cx, cy = float(point_c[0]), float(point_c[1])

    dx = cx - bx
    dy = cy - by
    chord_length = sqrt(dx * dx + dy * dy)
    if chord_length == 0:
        # Degenerate line (b == c): fall back to the point-to-point distance.
        return sqrt((ax - bx) * (ax - bx) + (ay - by) * (ay - by))

    # |cross product| is twice the area of triangle a-b-c; dividing by the
    # base length |bc| yields the height, i.e. the perpendicular distance.
    cross = dx * (by - ay) - (bx - ax) * dy
    return abs(cross) / chord_length
# Track-thinning algorithm (Douglas-Peucker variant)
class DouglasPeuker(object):
    """
    Thin a polyline by dropping points that lie close to the chord between
    the two ends of their segment.

    This differs from the textbook algorithm in two ways, both kept on
    purpose so existing output does not change:

    * When a segment is split at its farthest point, that point goes only
      into the second half, so the last point of the first half is also
      retained.
    * Kept points are collected in reverse input order (last point first).

    :param threshold: maximum chord distance below which the interior points
        of a segment are dropped; defaults to the module-level THRESHOLD
    """

    def __init__(self, threshold=None):
        self.threshold = THRESHOLD if threshold is None else threshold
        self.qualify_list = []      # points kept so far
        self.disqualify_list = []   # stack of segments still to be examined

    def diluting(self, point_list):
        """
        Examine one segment: either keep points from it or split it.

        Kept points are appended to self.qualify_list; sub-segments that
        need another pass are pushed onto self.disqualify_list.

        :param point_list: list of points; only item 0 and 1 of each point
            are used as coordinates
        :return: None
        """
        if len(point_list) < 3:
            # Nothing to drop between zero, one or two points.
            self.qualify_list.extend(point_list[::-1])
            return

        first, last = point_list[0], point_list[-1]

        # Find the interior point farthest from the first-last chord.
        max_distance_index, max_distance = 0, 0
        for index in range(1, len(point_list) - 1):
            distance = point2LineDistance(point_list[index], first, last)
            if distance > max_distance:
                max_distance_index = index
                max_distance = distance

        # Index 0 means no interior point was farther than 0 from the chord.
        # Treat that as "nothing to split" even when the threshold is <= 0;
        # splitting at index 0 would re-queue the same segment forever.
        if max_distance < self.threshold or max_distance_index == 0:
            self.qualify_list.append(last)
            self.qualify_list.append(first)
            return

        # Split at the farthest point; it becomes the head of the second half.
        head = point_list[:max_distance_index]
        tail = point_list[max_distance_index:]
        self.disqualify_list.append(head)
        if len(tail) < 3:
            self.qualify_list.extend(tail[::-1])
        else:
            self.disqualify_list.append(tail)

    def main(self, point_list):
        """
        Thin point_list and return the result as a JSON string.

        State from any earlier call on this instance is discarded first, so
        an instance can be reused without results piling up.

        :param point_list: list of points to thin
        :return: JSON text of an object with "listData" (kept points, in
            reverse input order), "count" (number of kept points) and "msg"
            (always "success")
        """
        self.qualify_list = []
        self.disqualify_list = []

        self.diluting(point_list)
        while self.disqualify_list:
            self.diluting(self.disqualify_list.pop())

        return json.dumps({
            "listData": self.qualify_list,
            "count": len(self.qualify_list),
            "msg": "success",
        })
# JSON encoder for database values
class CJsonEncoder(json.JSONEncoder):
    """
    JSON encoder that also serialises datetimes, dates and Decimals.

    datetime -> 'YYYY-MM-DD HH:MM:SS', date -> 'YYYY-MM-DD',
    Decimal -> float. Any other unsupported type is passed to the base
    encoder, which raises TypeError.
    """

    def default(self, obj):
        # datetime is a subclass of date, so it has to be tested first.
        if isinstance(obj, datetime.datetime):
            return obj.strftime('%Y-%m-%d %H:%M:%S')
        if isinstance(obj, datetime.date):
            return obj.strftime('%Y-%m-%d')
        if isinstance(obj, decimal.Decimal):
            return float(str(obj))
        return super(CJsonEncoder, self).default(obj)
# GPS data thinning endpoint
class GpsDataFilter(Resource):
    """REST resource that returns the thinned GPS points of biz.t_system_group."""

    def get(self, todo_id):
        """
        Load (lng, lat, id) rows from MySQL, thin them and return the result.

        Rows whose lng or lat is missing or not numeric are skipped, because
        the thinning geometry cannot use them.

        :param todo_id: URL path parameter; accepted but not used
        :return: dict with "listData" (kept [lng, lat, id] points in the
            order produced by DouglasPeuker.main), "count" and "msg"
        """
        # NOTE(review): connection settings and credentials are hard-coded,
        # and a throwaway Flask app plus MySQL extension is built on every
        # request - consider configuring both once at module level.
        mysql = MySQL()
        app = Flask(__name__)
        app.config['MYSQL_DATABASE_USER'] = 'root'
        app.config['MYSQL_DATABASE_PASSWORD'] = '112233'
        app.config['MYSQL_DATABASE_DB'] = 'biz'
        app.config['MYSQL_DATABASE_HOST'] = '118.178.227.126'
        mysql.init_app(app)

        # Close both the cursor and the connection even if the query fails.
        connection = mysql.connect()
        try:
            cursor = connection.cursor()
            try:
                cursor.execute("SELECT lng,lat,id from biz.t_system_group where lat is not null")
                rows = cursor.fetchall()
            finally:
                cursor.close()
        finally:
            connection.close()

        points = []
        for row in rows:
            try:
                points.append([float(row[0]), float(row[1]), row[2]])
            except (TypeError, ValueError):
                # The query filters on lat only, so lng can still be NULL.
                continue

        # Round-trip through CJsonEncoder so any remaining DB-specific value
        # (e.g. a Decimal id) becomes a plain JSON type before thinning.
        points = json.loads(json.dumps(points, cls=CJsonEncoder))

        # Pass the point list itself: wrapping it in another list makes the
        # algorithm see a single "point" and skip the thinning entirely.
        thinned_json = DouglasPeuker().main(points)
        return json.loads(thinned_json)
# Gateway log endpoint
class GatewayLog(Resource):
    """REST resource that returns the most recent gateway log documents."""

    def get(self):
        """
        Return up to 20 documents from the "wiselbs" collection, newest first.

        The "_id" field is excluded from the returned documents and is used
        only as the descending sort key.

        :return: list of dicts containing plain JSON types only
        """
        # The sort spec must be a list of (key, direction) tuples. A set such
        # as {'_id', -1} has no defined order, so key and direction could be
        # swapped when it is unpacked.
        cursor = db.wiselbs.find({}, {'_id': 0}).sort([('_id', -1)]).limit(20)
        records = list(cursor)

        # Round-trip through CJsonEncoder so datetimes and Decimals become
        # plain JSON values; json.loads is the symmetric stdlib decoder.
        return json.loads(json.dumps(records, cls=CJsonEncoder))
# Register the REST resources on the API with their URL routes.
api.add_resource(GatewayLog, '/Gatewaylog')
api.add_resource(GpsDataFilter, '/GpsDataFilter/<todo_id>')

# Start Flask's built-in server only when this file is run as a script.
# NOTE(review): debug=True is passed to app.run(); confirm this entry point
# is used for local development only.
if __name__ == '__main__':
    app.run(debug=True)