Source code for vi3o.mjpg

"""
:mod:`vi3o.mjpg` --- Motion JPEG video loading
==============================================
"""

import json
import os, sys
from vi3o.utils import SlicedView, index_file, Frame
try:
    from vi3o._mjpg import ffi, lib
except ImportError as e:
    import warnings
    warnings.warn("Failed to import. Try to recompile/reinstall vi3o. " + str(e))

[docs] class Mjpg(object): """ If a filename that ends with .mjpg is passed to :func:`vi3o.Video` this kind of object is returned. It has a few additional format specific properties: """ def __init__(self, filename, grey=False): # Be compatible with pathlib.Path filenames filename = str(filename).encode('utf-8') self.filename = filename self.grey = grey open(filename).close() self._myiter = None self._index = None def __iter__(self): return MjpgIter(self.filename, self.grey) @property def myiter(self): if self._myiter is None: self._myiter = iter(self) return self._myiter @property def offset(self): if self._index is None: idx = index_file(self.filename, self.grey) if os.path.exists(idx): self._index = json.load(open(idx)) else: self._index = [self.myiter.m.start_position_in_file for img in self.myiter] with open(idx, 'w') as fd: json.dump(self._index, fd) return self._index @property def systimes(self): raise NotImplementedError def _sliced_systimes(self, range): return [self.systimes[i] for i in range] def __getitem__(self, item): if isinstance(item, slice): return SlicedView(self, item, {'systimes': self._sliced_systimes}) if (item < 0): item += len(self) lib.mjpg_seek(self.myiter.m, self.offset[item]) self.myiter.fcnt = item return self.myiter.next() def __len__(self): return len(self.offset) @property def hwid(self): """ The Axis hardware id of the camera that made this recording. """ self.myiter.next() return ffi.string(self.myiter.m.hwid) @property def serial_number(self): """ The Axis serial number or mac address of the camera that made this recording. """ self.myiter.next() return ffi.string(self.myiter.m.serial) @property def firmware_version(self): """ The firmware version running in the camera when it made this recording. """ self.myiter.next() return ffi.string(self.myiter.m.firmware)
class MjpgIter(object): def __init__(self, filename, grey=False): self.m = ffi.new("struct mjpg *") self.fcnt = 0 if grey: r = lib.mjpg_open(self.m, filename, lib.IMTYPE_GRAY, lib.IMORDER_INTERLEAVED) self.channels = 1 else: r = lib.mjpg_open(self.m, filename, lib.IMTYPE_RGB, lib.IMORDER_INTERLEAVED) self.channels = 3 if r != lib.OK: raise IOError("Failed to open: " + filename) def __iter__(self): return self def next(self): r = lib.mjpg_next_head(self.m) if r != lib.OK: raise StopIteration if self.channels == 1: shape = (self.m.height, self.m.width) else: shape = (self.m.height, self.m.width, self.channels) img = Frame(shape, 'B') assert img.__array_interface__['strides'] is None self.m.pixels = ffi.cast('unsigned char *', img.__array_interface__['data'][0]) r = lib.mjpg_next_data(self.m) if r != lib.OK: raise StopIteration # img = img.reshape(shape).view(type=Frame) img.timestamp = self.m.timestamp_sec + self.m.timestamp_usec / 1000000.0 img.systime = img.timestamp img.index = self.fcnt self.fcnt += 1 return img def __next__(self): return self.next() def __del__(self): lib.mjpg_close(self.m)
[docs] def jpg_info(filename): """ Reads a single jpeg image from the file *filename* and extracts the Axis user data header. The information it contains is returned as a dict with the keys "hwid", "serial_numer" and "firmware_version". """ if sys.version_info > (3,): filename = bytes(filename, "utf8") m = ffi.new("struct mjpg *") r = lib.mjpg_open(m, filename, lib.IMTYPE_GRAY, lib.IMORDER_PLANAR) lib.mjpg_next_head(m) res = {'hwid': ffi.string(m.hwid), 'serial_number': ffi.string(m.serial), 'firmware_version': ffi.string(m.firmware), 'timestamp': m.timestamp_sec + m.timestamp_usec / 1000000.0} lib.mjpg_close(m) return res