Source code for vi3o.sync

from vi3o.utils import SlicedView

from vi3o import Video


class SyncedVideos(object):
    """
    Synchronize a set of videos using the `systime` timestamps. Frames will be
    dropped to adjust the frame rate to match the video with the lowest frame
    rate. Initial and trailing parts of the videos where there are not frames
    from all videos will be dropped. To for example play 3 videos synchronized
    side by side, use:

    .. code-block:: python

        from vi3o import SyncedVideos, view, flipp

        for a, b, c in SyncedVideos('a.mkv', 'b.mkv', 'c.mkv'):
            flipp()
            view(a)
            view(b)
            view(c)

    It is also possible to access random frames or slices:

    .. code-block:: python

        from vi3o import SyncedVideos

        recording = SyncedVideos('a.mkv', 'b.mkv', 'c.mkv')
        first_part = recording[:250]
        last_part = recording[-250:]
        half_frame_rate = recording[::2]
        backwards = recording[::-1]

    The input argument *filenames_or_videos* is a list of either file names or
    *Video* objects.
    """

    def __init__(self, *filenames_or_videos):
        """
        Open the videos and work out the common starting point and frame
        interval of the synchronized stream.

        Raises *ValueError* if no videos are given.
        """
        if not filenames_or_videos:
            # Without this guard the max() below fails with the same exception
            # type but a message that does not point at the real problem.
            raise ValueError("SyncedVideos requires at least one video")
        # Strings are opened as Video objects, anything else is used as is.
        self.videos = [Video(v) if isinstance(v, str) else v
                       for v in filenames_or_videos]
        # The synced stream can not start before the latest first frame.
        start = max([v[0].systime for v in self.videos])
        times = [TimedIter(v.systimes) for v in self.videos]
        # One (frame index, systime) pair per video for the frame matched to
        # the common start time.
        frames = [v.next_timed(start) for v in times]
        # Mean systime of the matched starting frames.
        self.start_systime = sum([f[1] for f in frames]) / len(frames)
        # Per video index of its first frame in the synced stream.
        self.start_index = [f[0] for f in frames]
        # Largest per video value of (time span / number of frames); used as
        # the time step between consecutive synced frame sets.
        self.intervall = max([(v[-1].systime - v[0].systime) / len(v)
                              for v in self.videos])
        # Filled in lazily by _calc_index_times().
        self._systimes = None
        self._indexes = None

    def __iter__(self):
        """
        Return an iterator producing one list of frames (one frame per video)
        for each synchronized time step.
        """
        # SyncVideoIter advances its clock by one interval before fetching
        # each set of frames, so it is started one interval early.
        return SyncVideoIter(self.videos, self.start_index,
                             self.start_systime - self.intervall,
                             self.intervall)

    def _calc_index_times(self):
        """
        Walk the `systimes` of all videos in steps of *intervall* and cache
        the frame index and systime selected in each video for every step.
        """
        times = [TimedIter(v.systimes) for v in self.videos]
        systime = self.start_systime
        self._index_times = []
        while True:
            try:
                self._index_times.append(
                    tuple(t.next_timed(systime) for t in times))
            except IndexError:
                # TimedIter indexes past the end of its timestamps once a
                # video has no more frames, which ends the synced stream.
                break
            systime += self.intervall
        self._indexes = [tuple(t[0] for t in it) for it in self._index_times]
        self._systimes = [tuple(t[1] for t in it) for it in self._index_times]

    @property
    def systimes(self):
        """
        Returns a list of systime timestamps without decoding any pixel data.
        Each entry is a tuple with one timestamp per video.
        """
        if self._systimes is None:
            self._calc_index_times()
        return self._systimes

    def _sliced_systimes(self, range):
        """Return the `systimes` entries at the positions in *range*."""
        return [self.systimes[i] for i in range]

    @property
    def indexes(self):
        """
        Returns a list of frame indexes of the frames used in the synced
        stream. Each entry is a tuple with one frame index per video.
        """
        if self._indexes is None:
            self._calc_index_times()
        return self._indexes

    def _sliced_indexes(self, range):
        """Return the `indexes` entries at the positions in *range*."""
        return [self.indexes[i] for i in range]

    def __getitem__(self, item):
        """
        Return the list of frames (one per video) at position *item* in the
        synced stream, or a *SlicedView* if *item* is a slice.

        Raises *IndexError* if an integer *item* is out of range.
        """
        if isinstance(item, slice):
            return SlicedView(self, item, {'systimes': self._sliced_systimes,
                                           'indexes': self._sliced_indexes})
        if item < 0:
            item += len(self)
            if item < 0:
                # Still negative means the index was below -len(self); using
                # it on the list would silently wrap around to a wrong frame.
                raise IndexError("SyncedVideos index out of range")
        idx = self.indexes[item]
        return [v[i] for i, v in zip(idx, self.videos)]

    def __len__(self):
        """Return the number of frame sets in the synced stream."""
        return len(self.indexes)
class TimedIter(object):
    """
    Forward-only cursor over a sequence of systime timestamps that picks the
    entry closest to each requested time.
    """

    def __init__(self, systimes):
        self.systimes = systimes
        # Index of the most recently selected timestamp.
        self.prev = 0

    def next_timed(self, systime):
        """
        Move the cursor forward to the timestamp closest to *systime* and
        return it as an ``(index, timestamp)`` tuple. The cursor never moves
        backwards. An *IndexError* escapes when *systime* lies beyond the
        last timestamp.
        """
        stamps = self.systimes
        if not (stamps[self.prev] < systime):
            # Already at or past the requested time; stay where we are.
            return (self.prev, stamps[self.prev])
        # Step forward until the current and the following timestamp
        # bracket the requested time.
        while True:
            if stamps[self.prev] <= systime <= stamps[self.prev + 1]:
                break
            self.prev += 1
        gap_before = systime - stamps[self.prev]
        gap_after = stamps[self.prev + 1] - systime
        if gap_before > gap_after:
            # The following timestamp is strictly closer.
            self.prev += 1
        return (self.prev, stamps[self.prev])


class TimedVideoIter(object):
    """
    Forward-only cursor over the frames of a video that picks the frame
    whose `systime` is closest to each requested time.
    """

    def __init__(self, video):
        self.video = iter(video)
        # The earlier of the two frames considered for the next request.
        self.prev = next(self.video)

    def next_timed(self, systime):
        """
        Return the frame closest to *systime*, consuming frames from the
        underlying iterator as needed. *StopIteration* escapes when the
        video runs out of frames.
        """
        if self.prev.systime > systime:
            # The pending frame is already later than requested; reuse it.
            return self.prev
        upcoming = next(self.video)
        # Consume frames until the pending and the upcoming frame bracket
        # the requested time.
        while True:
            if self.prev.systime <= systime <= upcoming.systime:
                break
            self.prev = upcoming
            upcoming = next(self.video)
        gap_before = systime - self.prev.systime
        gap_after = upcoming.systime - systime
        chosen = upcoming if gap_before > gap_after else self.prev
        # Whichever frame is returned, the upcoming one becomes pending.
        self.prev = upcoming
        return chosen


class SyncVideoIter(object):
    """
    Iterator producing, for each time step, a list with the frame of every
    video closest to the current synchronized time.
    """

    def __init__(self, videos, start_index, start_systime, intervall):
        cursors = []
        for video, first in zip(videos, start_index):
            # Each video is walked from its own starting frame.
            cursors.append(TimedVideoIter(video[first:]))
        self.videos = cursors
        self.systime = start_systime
        self.intervall = intervall

    def next(self):
        """Advance the clock one interval and return the matching frames."""
        self.systime += self.intervall
        now = self.systime
        return [cursor.next_timed(now) for cursor in self.videos]

    def __iter__(self):
        return self

    def __next__(self):
        return self.next()