Source code for physalia.power_meters
"""Models to interact with different power meters."""
import abc
import time
import click
from physalia.third_party.monsoon import Monsoon
from physalia.third_party import monsoon_async
from physalia.utils import android
class PowerMeter(abc.ABCMeta("_PowerMeterBase", (object,), {})):
    """Abstract class for interaction with a power monitor.

    Concrete power meters must implement both :meth:`start` and
    :meth:`stop`; the class cannot be instantiated otherwise.

    The base class is built by calling ``abc.ABCMeta`` directly so the
    metaclass is applied on both Python 2 and Python 3.  A class-level
    ``__metaclass__`` attribute is ignored by Python 3, which would leave
    the ``abstractmethod`` markers below unenforced.
    """

    @abc.abstractmethod
    def start(self):
        """Start measuring energy consumption."""
        return

    @abc.abstractmethod
    def stop(self):
        """Stop measuring energy consumption.

        Returns:
            tuple: energy consumption in Joules; duration; error flag.
        """
        return
class EmulatedPowerMeter(PowerMeter):
    """PowerMeter implementation to emulate a power monitor.

    No measurement is taken: the reported energy consumption is simply the
    wall-clock time elapsed between :meth:`start` and :meth:`stop`.
    """

    def __init__(self):  # noqa: D102
        # Timestamp of the most recent start() call; None until then.
        self.start_time = None

    def start(self):
        """Start measuring energy consumption."""
        self.start_time = time.time()

    def stop(self):
        """Stop measuring energy consumption.

        Returns:
            tuple: energy consumption in Joules; duration; error flag.
            ``(-1, -1, True)`` is returned when the meter was never
            started, so callers get the error flag instead of a crash.
        """
        if self.start_time is None:
            # stop() without a prior start(): report it through the
            # error flag rather than failing on ``float - None``.
            return -1, -1, True
        duration = time.time() - self.start_time
        # The emulated energy value is the elapsed time itself.
        energy_consumption = duration
        return energy_consumption, duration, False

    def __str__(self):
        """Return the name of this power meter."""
        return "Emulated"
class MonsoonPowerMeter(PowerMeter):
    """PowerMeter implementation for Monsoon.

    Make sure the Android device has Passlock disabled.
    Your server and device have to be connected to the same network.
    """

    # Number of polls, and pause between polls (seconds), while waiting
    # for an Android device to show up during construction.
    DEVICE_WAIT_ATTEMPTS = 50
    DEVICE_WAIT_INTERVAL = 3

    def __init__(self, voltage=3.8, sample_hz=50000, serial=12886):  # noqa: D102
        self.monsoon = None
        self.monsoon_reader = None
        self.monsoon_data = None
        self.voltage = None
        self.serial = None
        self.sample_hz = sample_hz
        self.setup_monsoon(voltage, serial)
        click.secho(
            "Monsoon is ready.",
            fg='green'
        )
        if not android.is_android_device_available():
            self._wait_for_android_device()
        # NOTE(review): nesting reconstructed from a flattened source;
        # these two calls are assumed to run whether or not the device
        # was already available -- confirm.
        android.connect_adb_through_wifi()
        self.monsoon_usb_enabled(False)
        if android.is_locked():
            click.secho(
                "Device seems to be locked. "
                "Disabling Passlock is recommended!",
                fg='yellow'
            )

    def _wait_for_android_device(self):
        """Poll until an Android device is available or attempts run out.

        Returns:
            bool: True if a device was found, False otherwise.
        """
        click.secho(
            "You can now turn the phone on.",
            fg='blue'
        )
        for _ in range(self.DEVICE_WAIT_ATTEMPTS):
            click.secho(
                "Waiting for an Android device...",
                fg='blue'
            )
            time.sleep(self.DEVICE_WAIT_INTERVAL)
            if android.is_android_device_available():
                click.secho(
                    "Found a {}!".format(android.get_device_model()),
                    fg='green'
                )
                return True
        # Make the timeout visible instead of silently falling through;
        # the caller still proceeds with the remaining setup.
        click.secho(
            "No Android device was found; continuing anyway.",
            fg='yellow'
        )
        return False

    def setup_monsoon(self, voltage, serial):
        """Set up monsoon.

        Stores the settings, creates the Monsoon handle, applies the
        output voltage and enables the monitor's USB port.

        Args:
            voltage: Voltage output of the power monitor.
            serial: serial number of the power monitor.
        """
        click.secho(
            "Setting up Monsoon {} with {}V...".format(
                serial, voltage
            ),
            fg='blue'
        )
        self.serial = serial
        self.voltage = voltage
        self.monsoon = Monsoon(serial=self.serial)
        self.monsoon.set_voltage(self.voltage)
        if android.is_android_device_available():
            android.reconnect_adb_through_usb()
        # NOTE(review): nesting reconstructed from a flattened source;
        # USB is assumed to be enabled unconditionally -- confirm.
        self.monsoon_usb_enabled(True)

    def monsoon_usb_enabled(self, enabled):
        """Enable/disable monsoon's usb port.

        Args:
            enabled: truthy to switch the port on, falsy to switch it off.
        """
        # pylint: disable=too-many-function-args
        # something is conflicting with timeout_decorator
        self.monsoon.usb(
            self.monsoon,
            'on' if enabled else 'off'
        )

    def start(self):
        """Start measuring energy consumption."""
        self.monsoon_reader = monsoon_async.MonsoonReader(
            self.monsoon,
            self.sample_hz
        )
        self.monsoon_reader.prepare()
        self.monsoon_reader.start()

    def stop(self):
        """Stop measuring.

        Returns:
            tuple: energy consumption; duration; error flag.
            ``(-1, -1, True)`` is returned when the meter was never
            started or the reader collected no data.
        """
        if self.monsoon_reader is None:
            # stop() without a prior start(): nothing to read.
            return -1, -1, True
        self.monsoon_reader.stop()
        readings = self.monsoon_reader.data
        if readings:
            data_points = readings.data_points
            # float() keeps the divisions below exact on Python 2, where
            # int / int would otherwise truncate the duration.
            sample_hz = float(readings.hz)
            # NOTE(review): this yields Joules only if data_points are
            # power samples in mW -- confirm against monsoon_async.
            energy_consumption = sum(data_points) / sample_hz / 1000
            duration = len(data_points) / sample_hz
            return energy_consumption, duration, False
        return -1, -1, True

    def __str__(self):
        """Return the name of this power meter with sample frequency."""
        return "Monsoon-{}Hz".format(self.sample_hz)