import dht import machine import utime import urequests import network import json import esp #import warnings from flashbdev import bdev """ Variables -------------------------------------------------------------------------------- """ # Do we want to measure internal or external? VCC / ADC ADC_MODE_VCC = 255 ADC_MODE_ADC = 0 possible_opts = [ 'ssid', 'pass', 'ip', 'netmask', 'gw', 'destination', 'dns', '_INFLUX_HOST', '_INFLUX_PORT', '_INFLUX_DB', '_INFLUX_USER', '_INFLUX_PASS', '_LOCATION_TYPE'] """ Functions -------------------------------------------------------------------------------- """ def set_adc_mode(mode): # Switch between internal and external ADC. Returns if restart is needed # Mode up in variables sector_size = bdev.SEC_SIZE flash_size = esp.flash_size() # device dependent init_sector = int(flash_size / sector_size - 4) data = bytearray(esp.flash_read(init_sector * sector_size, sector_size)) if data[107] == mode: # flash is already correct; nothing to do return False else: data[107] = mode # re-write flash esp.flash_erase(init_sector) esp.flash_write(init_sector * sector_size, data) print("ADC mode changed in flash; restart to use it!") return True def check_wake_state(): # check if the device woke from a deep sleep if machine.reset_cause() == machine.DEEPSLEEP_RESET: print('woke from a deep sleep') else: print(machine.reset_cause()) utime.sleep_ms(10) def deep_sleep(sleep_minutes): sleep_ms = sleep_minutes * 60 * 1000 rtc = machine.RTC() rtc.irq(trigger=rtc.ALARM0, wake=machine.DEEPSLEEP) rtc.alarm(rtc.ALARM0, sleep_ms) machine.deepsleep() def light_sleep(sleep_minutes): sleep_ms = sleep_minutes * 60 * 1000 utime.sleep_ms(sleep_ms) def read_network_config(): config_dict = {} try: with open('inet.conf', 'r') as conf_handler: config = conf_handler.readlines() for item in config: tmp_list = item.split("=") option = tmp_list[0].strip() if len(tmp_list) == 2 and option in possible_opts: value = tmp_list[1].strip() config_dict.update({option: value}) else: print("ERROR: Option not approved") except Exception as e: print("WARNING: Errors in INFRA config, still going for AP") return False print(config_dict) return config_dict def set_up_infra(_SSID, _PASS, _TIMEOUT): sta = network.WLAN(network.STA_IF) ap = network.WLAN(network.AP_IF) print("Disabling AP") ap.active(False) print("Activating INFRA") sta.active(True) sta.isconnected() # False print("Connecting to infra") sta.connect(_SSID, _PASS) connection_tout = _TIMEOUT print("Let's wait for the network to come up") while not (sta.isconnected()): if connection_tout > 0: print("Trying... {} more times".format(connection_tout)) utime.sleep_ms(500) connection_tout = connection_tout - 1 else: print("Out of retrys while waiting for network, going to sleep") utime.sleep_ms(500) return False network_config = sta.ifconfig() return network_config """ Network configuration section -------------------------------------------------------------------------------- """ try: net_config = read_network_config() if net_config: net_conf_result = set_up_infra(net_config["ssid"], net_config["pass"], 15) else: print("ERROR: cannot read from inet.conf file") utime.sleep_ms(100) if net_conf_result: print("Connected:") print(" IP: {}".format(net_conf_result[0])) print(" MASK: {}".format(net_conf_result[1])) print(" GW: {}".format(net_conf_result[2])) print(" DNS: {}\n".format(net_conf_result[3])) else: print("WARNING: Network config not done") except Exception as e: print("ERROR: Network configuration failed with: {}".format(e)) print("Network section done") """ Measure temperhumi -------------------------------------------------------------------------------- """ def measure_temp(): print("getting the temperature here") d1 = machine.Pin(5, machine.Pin.IN) try: local_dht = dht.DHT11(d1) local_dht.measure() temperature = local_dht.temperature() humidity = local_dht.humidity() print("T: {}".format(temperature)) print("H: {}".format(humidity)) except Exception as e: print("ERROR: Temp measurement: {}".format(e)) temperature = 255 humidity = 255 return (temperature, humidity) """ Measure battery -------------------------------------------------------------------------------- """ print("Setting ADC mode here") set_adc_mode(ADC_MODE_VCC) vcc = machine.ADC(1) val = vcc.read() utime.sleep(2) battery = vcc.read() """ Data send -------------------------------------------------------------------------------- curl http://192.168.8.15:8086/query?db=weather_v2 --data-urlencode 'q=SELECT * FROM "wind" ORDER BY "time" DESC LIMIT 5' curl -X POST 'http://192.168.8.15:8086/write?db=weather' --data-binary 'usense,type=room humidity=100' """ while True: if net_conf_result: temp_tuple = measure_temp() val = vcc.read() utime.sleep(2) battery = vcc.read() post_str = "http://{}/write?db={}" post = post_str.format(net_config['_INFLUX_HOST'], net_config['_INFLUX_DB']) data_str = "usense,type={_type} humidity={_humi} \n " + \ "usense,type={_type} temperature={_temp}\n " + \ "usense,type={_type} battery={_batt}" data = data_str.format( _type=net_config['_LOCATION_TYPE'], _humi=temp_tuple[1], _temp=temp_tuple[0], _batt=battery) print(post_str) print(data) try: result = urequests.post(post, data=data) except Exception as e: print("ERROR: Data send 'urequest': {}".format(e)) #deep_sleep(28) light_sleep(5) print(result.text) light_sleep(5) #deep_sleep(28) else: print("WARNING: Not sending data, as infra is not available") light_sleep(5) machine.reset() #deep_sleep(28) utime.sleep(10)