综合案例2.0 - ESP32拍照上传阿里云OSS
- 案例说明
- 连线
- 功能实现
- 1.阿里云平台连接
- 2.OSS对象存储服务
- 3.ESP32-CAM开发环境
- 4.代码
- ESP32-CAM开发板代码
- HaaS506开发板代码
- 测试
- 数据转图片方法
案例说明
使用ESP32拍照,将照片数据上传阿里云OSS(通过4G网络上传)。
- 代码流程
- HaaS506连接阿里云,云端通过物模型下发拍照指令给HaaS506。
- HaaS506通过TTL串口发送拍照指令给ESP32。
- ESP32将拍摄的照片数据通过串口回传给HaaS506,HaaS506将其转换为16进制文本,以TXT格式存入SD卡。
- HaaS506将该TXT文件上传到OSS。
连线
功能实现
1.阿里云平台连接
阿里云环境创建参考高级组件库-aliyunIoT进行搭建。
创建产品,添加产品后添加物模型。
2.OSS对象存储服务
OSS对象存储服务开通参考2.12 haas506开发教程-高级组件库-OSS
开通存储服务,注意记录相关信息
keyid = 'yourKeyId'
keysecret = 'yourKeySecret'
endpoint = 'yourEndPoint'
bucketname = 'yourBucketName'
objectname = '文件存储地址'
3.ESP32-CAM开发环境
参考3.26 haas506 2.0开发教程-example- 简易相机-串口控制ESP32-CAM OV2640拍照
Arduino开发环境的详细安装步骤请自行搜索,网上相关资料很多。
4.代码
ESP32-CAM开发板代码
#include "esp_camera.h"
#include "FS.h"
#include <time.h>
#include "string.h"
// Characters received on the serial port since the last newline.
String inputString = "";
// Set once a '\n' has been received, i.e. inputString holds a complete command.
bool stringComplete = false;
// Camera driver configuration passed to esp_camera_init() in camera_init().
// NOTE(review): the GPIO numbers look like the usual ESP32-CAM (AI-Thinker) wiring
// for an OV2640 module - confirm against your board before reuse.
static camera_config_t camera_config = {
// Power-down and reset pins (-1 presumably means "not connected" - confirm).
.pin_pwdn = 32,
.pin_reset = -1,
// External clock output to the sensor.
.pin_xclk = 0,
// SCCB (sensor control bus) data / clock pins.
.pin_sscb_sda = 26,
.pin_sscb_scl = 27,
// 8-bit parallel pixel data bus, D7..D0.
.pin_d7 = 35,
.pin_d6 = 34,
.pin_d5 = 39,
.pin_d4 = 36,
.pin_d3 = 21,
.pin_d2 = 19,
.pin_d1 = 18,
.pin_d0 = 5,
// Frame / line sync and pixel clock inputs.
.pin_vsync = 25,
.pin_href = 23,
.pin_pclk = 22,
// 20 MHz sensor clock, generated with LEDC timer 0 / channel 0.
.xclk_freq_hz = 20000000,
.ledc_timer = LEDC_TIMER_0,
.ledc_channel = LEDC_CHANNEL_0,
// Frames are delivered already JPEG-encoded, at SVGA frame size.
.pixel_format = PIXFORMAT_JPEG,
.frame_size = FRAMESIZE_SVGA,
// NOTE(review): in the esp32-camera driver a lower number presumably means higher quality - confirm.
.jpeg_quality = 12,
// A single frame buffer: a captured frame must be released before the next capture.
.fb_count = 1,
};
// Initialise the camera driver with camera_config.
// Returns ESP_OK on success, otherwise the error code reported by esp_camera_init().
// A status message is printed on the serial port in both cases.
esp_err_t camera_init() {
  esp_err_t err = esp_camera_init(&camera_config);
  if (err != ESP_OK) {
    Serial.print("Camera Init Failed");
    return err;
  }
  // The former sensor lookup fed an empty `if` body and dereferenced the
  // returned pointer without a NULL check, so it has been removed.
  Serial.print("Camera Init OK");
  return ESP_OK;
}
// One-time start-up: open the serial port at 115200 baud, then initialise the camera.
void setup() {
  Serial.begin(115200);
  camera_init();
}
void loop()
{
if (Serial.available()) {
char inChar = (char)Serial.read();
if (inChar != '\n')
{
inputString += inChar;
}
if (inChar == '\n')
{
stringComplete = true;
}
if (stringComplete)
{
inputString=String(inputString);
if (inputString == "123")
{
camera_fb_t * fb = esp_camera_fb_get();
delay(10);
if(fb != NULL)
{
long len_tol=fb->len;
unsigned char i=0,j=1000;
for(i=0;i<(len_tol/j);i++)
{
Serial.write(fb->buf+i*j,j);
delay(50);
}
Serial.write(fb->buf+i*j,len_tol-i*j);
}
}
inputString = "";
stringComplete = false;
}
}
}
HaaS506开发板代码
aliyun.py
import network
import ujson
import utime as time
import modem
from aliyunIoT import Device
import ota
import kv
# Product credentials (masked here; fill in the values of your own product).
productKey = "*********"
productSecret = "***********"

# State shared between the callbacks below through `global` statements.
g_connect_status = False            # updated by on_4g_cb
netw = None                         # network client created in connect_network
deviceSecret = None                 # filled from kv storage or by on_dynreg_cb
device_dyn_resigter_succed = False  # set by on_dynreg_cb
device = Device()

# OTA bookkeeping reported to the cloud and filled in by on_trigger.
module_name = 'default'
app_version = '1.0.1'
info = {
    'url': '',
    'store_path': '/data/pyamp/app.zip',
    'install_path': '/data/pyamp/',
    'length': 0,
    'hash_type': '',
    'hash': '',
}
def on_trigger(data):
    """OTA callback (registered as event 1): record the upgrade metadata and start the download.

    The fields of `data` are copied into the module-level `info` dict, then
    ota.download() is called with the URL and the local store path.
    """
    for field in ('url', 'length', 'module_name', 'version', 'hash', 'hash_type'):
        info[field] = data[field]
    ota.download({
        'url': info['url'],
        'store_path': info['store_path'],
    })
def on_download(data):
    """OTA callback (registered as event 2): verify the package after a successful download.

    `data` is treated as a status code; verification only starts when it is >= 0.
    """
    if data < 0:
        return
    print('Ota download succeed')
    ota.verify({
        'length': info['length'],
        'store_path': info['store_path'],
        'hash_type': info['hash_type'],
        'hash': info['hash'],
    })
def on_verify(data):
    """OTA callback (registered as event 3): install the package once verification succeeded.

    `data` is printed as received and treated as a status code; the upgrade
    only starts when it is >= 0.
    """
    print(data)
    if data < 0:
        return
    print('Ota verify succeed')
    print('Start Upgrade')
    ota.upgrade({
        'length': info['length'],
        'store_path': info['store_path'],
        'install_path': info['install_path'],
    })
def on_upgrade(data):
    """OTA callback (registered as event 4): restart the device after a successful upgrade."""
    if data < 0:
        return
    print('Ota succeed')
    # NOTE(review): `reboot` is neither defined nor imported in the visible
    # code - confirm that the firmware provides it as a global.
    reboot()
# True once on_connect has run, i.e. the cloud connection is established.
connect_state = False


def get_connect_state():
    """Return the current value of the module-level `connect_state` flag."""
    return connect_state
def on_connect(data):
    """Connection callback: initialise OTA, mark the link as up and report the app version.

    `data` is not used.
    """
    global connect_state
    print('***** connect lp succeed****')
    handle = device.getDeviceHandle()
    ota.init({'device_handle': handle})
    connect_state = True
    # OTA event ids 1..4 map to the callbacks for the four upgrade stages.
    ota_handlers = (
        (1, on_trigger),
        (2, on_download),
        (3, on_verify),
        (4, on_upgrade),
    )
    for event_id, handler in ota_handlers:
        ota.on(event_id, handler)
    ota.report({
        "device_handle": handle,
        "product_key": productKey,
        "device_name": deviceName,
        "module_name": module_name,
        "version": app_version,
    })
def re_wicth(ret):
    """Set the module-level `switch` value to `ret` and post it to the cloud as property 'switch'."""
    global switch
    switch = ret
    update('switch', ret)
# Last 'switch' value received from the cloud (see on_props) or set via re_wicth.
switch = 0


def get_switch():
    """Return the current value of the module-level `switch`."""
    return switch
def on_props(request):
    """Property-set callback: store the 'switch' value sent by the cloud.

    Args:
        request: dict whose 'params' entry is a JSON string of property
            names and values.
    """
    global switch
    print('clound req data is {}'.format(request))
    # SECURITY: the payload comes from the network, so it is parsed as JSON
    # instead of being passed to eval(), which would execute it as Python
    # (and would also fail on JSON literals such as true/false/null).
    params = ujson.loads(request['params'])
    if "switch" in params:
        switch = params["switch"]
def on_disconnect():
    """Disconnect callback: log that the cloud link was lost."""
    print('linkkit is disconnected')
def on_service(id, request):
    """Service-call callback: log the request id and the request."""
    message = 'clound req id is {} , req is {}'.format(id, request)
    print(message)
def on_error(err):
    """Error callback: log the error reported by the device client."""
    message = 'err msg is {} '.format(err)
    print(message)
def on_4g_cb(args):
    """Network callback: update `g_connect_status` from the reported state.

    Args:
        args: sequence whose second element is the network state; the link
            counts as connected when that element equals 1.
    """
    global g_connect_status
    g_connect_status = args[1] == 1
def connect_network():
    """Bring up the 4G data connection and block until it is reported as connected.

    Returns:
        True once on_4g_cb has set `g_connect_status`; False when the modem is
        not registered on the network, in which case no connection is
        attempted. Previously the function then waited forever for a status
        change that could never happen.
    """
    global netw, g_connect_status
    netw = network.NetWorkClient()
    # NOTE(review): stage code 3 with sub code 1 is taken to mean "registered",
    # as in the original check - confirm against the network module docs.
    registered = netw._stagecode == 3 and netw._subcode == 1
    if not registered:
        print('网络注册失败')
        return False

    netw.on(1, on_4g_cb)
    netw.connect(None)
    while not g_connect_status:
        time.sleep_ms(20)
    print('网络连接成功')
    return True
def on_dynreg_cb(data):
    """Dynamic-registration callback: store the received device secret and flag success."""
    global deviceSecret, device_dyn_resigter_succed
    deviceSecret, device_dyn_resigter_succed = data, True
def dyn_register_device(productKey, productSecret, deviceName):
    """Load the device secret from kv storage, or request one by dynamic registration.

    A string stored under '_amp_customer_devicesecret' is used as the device
    secret. If no secret is available and no registration has succeeded yet,
    device.register() is called; the secret then arrives through on_dynreg_cb.
    """
    global deviceSecret
    stored_secret = kv.get('_amp_customer_devicesecret')
    print("deviceSecretdict:", stored_secret)
    if isinstance(stored_secret, str):
        deviceSecret = stored_secret

    if stored_secret is not None and deviceSecret is not None:
        return  # a secret is already available
    if device_dyn_resigter_succed:
        return  # already registered once; do not register again

    registration = {
        'productKey': productKey,
        'productSecret': productSecret,
        'deviceName': deviceName,
    }
    device.register(registration, on_dynreg_cb)
def connect():
    """Connect the device to the Aliyun IoT platform.

    Steps: read the IMEI (used as the device name), bring up the network,
    obtain the device secret (stored or by dynamic registration), register
    the event callbacks and start the cloud connection. Connection success is
    signalled through on_connect / get_connect_state().

    Returns early, without connecting, when the network is unavailable or the
    IMEI cannot be read. Previously the IMEI failure was followed by an
    endless wait for a device secret that nothing was going to provide.
    """
    global deviceName
    deviceName = modem.info.getDevImei()

    # Only an explicit False aborts, so a None result is treated as success.
    if connect_network() is False:
        return

    if not deviceName:
        print("获取设备IMEI失败,无法进行动态注册")
        return
    dyn_register_device(productKey, productSecret, deviceName)

    # The secret is delivered asynchronously by on_dynreg_cb.
    while deviceSecret is None:
        time.sleep(0.2)
    print('动态注册成功:' + deviceSecret)

    key_info = {
        'region': 'cn-shanghai',
        'productKey': productKey,
        'deviceName': deviceName,
        'deviceSecret': deviceSecret,
        'keepaliveSec': 60,
    }
    print(key_info)
    device.on(device.ON_CONNECT, on_connect)
    device.on(device.ON_DISCONNECT, on_disconnect)
    device.on(device.ON_PROPS, on_props)
    device.on(device.ON_SERVICE, on_service)
    device.on(device.ON_ERROR, on_error)
    device.connect(key_info)
def update(key, value):
    """Post a single property `key` = `value` to the cloud.

    The property is serialised to a JSON string and sent as the 'params'
    entry of the payload passed to device.postProps().
    """
    payload = {"params": ujson.dumps({key: value})}
    device.postProps(payload)
main.py
- 更改代码中的相关信息
- keyid = 'yourKeyId'
- keysecret = 'yourKeySecret'
- endpoint = 'yourEndPoint'
- bucketname = 'yourBucketName'
- objectname = '文件存储地址'
import utime as time
import aliyun
import _thread
from driver import UART
import ubinascii
import OSS as oss
import uos
# Command written to the UART to request a photo; the ESP32-CAM sketch
# captures a frame when it receives "123" terminated by a newline.
writeBuf = bytearray('123\n')
# 1000-byte scratch buffer filled by uart.read().
readBuf = bytearray(1000)
# Accumulates the bytes received for the photo currently being transferred.
total_data=bytearray()
# Number of consecutive empty reads since the last received data.
t=0
# True once data for a photo has started to arrive.
t_add = False
def oss_upload_file(filename):
    """Upload the local file `filename` to Aliyun OSS and return the result of oss.uploadFile().

    Fill in the AccessKey of your own account below. Never publish real
    credentials in source code: the key pair that used to be hard-coded here
    must be considered leaked and should be revoked.
    """
    keyid = 'yourKeyId'
    keysecret = 'yourKeySecret'
    # Endpoint, bucket and object name of the upload target; adjust to your own OSS setup.
    endpoint = 'oss-cn-shanghai.aliyuncs.com'
    bucketname = 'testdemo5'
    objectname = '/test/test.log'
    ret = oss.uploadFile(keyid, keysecret, endpoint, bucketname, objectname, filename)
    return ret
def write_txt_content(path, mode, data):
    """Write `data` to the file at `path` opened with `mode`, log it, then pause 1.2 s."""
    with open(path, mode) as out_file:
        out_file.write(data)
        print("--------------write----------------")
    time.sleep_ms(1200)
if __name__ == '__main__':
    # Serial link to the ESP32-CAM; "serial1" is the UART declared in the board config.
    uart = UART()
    uart.open("serial1")

    # Connect to the cloud and wait until the connection callback has fired.
    aliyun.connect()
    while not aliyun.get_connect_state():
        time.sleep_ms(20)

    # NOTE(review): the block nesting below is inferred from the statement
    # order; in particular the 20 ms sleep is assumed to run on every empty read.
    while True:
        # A non-zero 'switch' from the cloud triggers one photo request.
        if aliyun.get_switch():
            uart.write(writeBuf)
            aliyun.re_wicth(0)
            print('start---')

        ret = uart.read(readBuf)
        if ret > 0:
            # Photo data is arriving: collect it and restart the idle counter.
            t_add = True
            t = 0
            total_data += readBuf[:ret]
        elif ret == 0:
            if t_add:
                t += 1
            time.sleep_ms(20)
            if t > 3:
                # More than three empty reads in a row: the transfer is
                # considered complete, so store and upload the photo.
                uos.mountsd()
                total_data += bytearray('5')
                hex_text = ubinascii.hexlify(total_data).decode()
                print('total_data ok!!')
                write_txt_content('/sdcard0/test.log', 'w', hex_text)
                oss_upload_file('/sdcard0/test.log')
                time.sleep_ms(1200)
                print('upload oss finished...')
                # Reset the state for the next photo.
                total_data = bytearray()
                t_add = False
                t = 0
board.json
{
"name": "haas506",
"version": "2.0.0",
"io": {
"serial1":{
"type":"UART",
"port":0,
"dataWidth":8,
"baudRate":115200,
"stopBits":1,
"flowControl":"disable",
"parity":"none"
}
},
"debugLevel": "ERROR",
"repl": "enable",
"replPort": 2
}
测试
1.上电后打印连接阿里云相关信息
阿里云设备在线
2.在线调试,发送拍照指令给Haas506
-
阿里云指令下发成功
-
照片数据传回并写入SD卡
-
上传OSS
-
文件存入OSS成功
数据转图片方法
(1)复制数据
代码运行完毕后,图片的16进制数据会保存在SD卡的/sdcard0/test.log中,并上传到OSS(对象名为/test/test.log)。该数据以ffd8开头,图片内容以ffd9结尾(main.py在其后还追加了一个字符'5',因此文本末尾会多出35),打开该文件并复制全部数据。
(2)粘贴数据
先在电脑桌面上新建一个文件夹,例如txt2jpg。然后在该文件夹内创建一个txt文本,例如test.txt,将(1)中所复制的数据粘贴到test.txt内。
(3)执行python脚本
打开Python IDE,将test.txt的路径填写到下面的代码中(注意路径分隔符使用"/"),执行该脚本,即可将txt文本转换成jpg图片。
import binascii


def hex_text_to_jpg(txt_path, jpg_path):
    """Convert a text file of hexadecimal digits into the binary image file it encodes.

    Args:
        txt_path: path of the text file holding the hex dump of the picture.
        jpg_path: path of the image file to create.

    Raises:
        binascii.Error: if the text is not valid hexadecimal.
    """
    with open(txt_path, 'rb') as src:
        payload = src.read()
    # Pasted text usually carries line breaks or a trailing newline, which
    # a2b_hex rejects; drop all whitespace before decoding.
    hex_digits = b''.join(payload.split())
    # Decode before opening the output so a bad dump leaves no empty image behind.
    picture = binascii.a2b_hex(hex_digits)
    with open(jpg_path, 'wb') as dst:
        dst.write(picture)


if __name__ == '__main__':
    hex_text_to_jpg('C:/Users/further/Desktop/txt2jpg/test.txt',
                    'C:/Users/further/Desktop/txt2jpg/test.jpg')
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)