Easy Audio/Video Capture with Python

At present it is difficult to obtain audio/video data in Python. For example, many deep learning methods assume you have easy access to your data in the form of a numpy array. Often you don’t. Based on the good efforts of those online, this post presents a number of Python classes to address this issue.

Just give me the code.

General Interface

Firstly, we can use threads to constantly update sensor data in the background. We can then read this data asynchronously.

Secondly, we can define a general interface for sensor data.

import threading

class SensorSource:
    """Abstract object for a sensory modality."""
    def __init__(self):
        """Initialise object."""
        self.started = False
        self.thread = None
    
    def start(self):
        """Start capture source."""
        if self.started:
            print('[!] Asynchronous capturing has already been started.')
            return None
        self.started = True
        self.thread = threading.Thread(
            target=self.update,
            args=()
        )
        self.thread.start()
        return self
    
    def update(self):
        """Update data."""
        pass
    
    def read(self):
        """Read data."""
        pass
    
    def stop(self):
        """Stop daemon."""
        self.started = False
        self.thread.join()

Video

For our video capture class, we can use OpenCV. You can install this in a conda environment using

conda install opencv

or via pip using

pip install opencv-python

(the pip package is called opencv-python rather than opencv). Either gives you access to the cv2 library.

Beware: you may need to do a bit of tweaking to get your video capture working – different cameras / system configurations need different tweaks.

# Video source

import cv2

class VideoSource(SensorSource):
    """Object for video using OpenCV."""
    def __init__(self, src=0):
        """Initialise video capture."""
        # width=640, height=480
        self.src = src
        self.cap = cv2.VideoCapture(self.src)
        #self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, width)
        #self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, height)
        self.grabbed, self.frame = self.cap.read()
        self.started = False
        self.read_lock = threading.Lock()
    
    def update(self):
        """Update based on new video data."""
        while self.started:
            grabbed, frame = self.cap.read()
            with self.read_lock:
                self.grabbed = grabbed
                self.frame = frame
                
    def read(self):
        """Read video."""
        with self.read_lock:
            frame = self.frame.copy()
            grabbed = self.grabbed
        return grabbed, frame

    def __exit__(self, exec_type, exc_value, traceback):
        self.cap.release()

The initialisation sets up the camera and the threading lock. The update method runs inside the background thread and continuously refreshes the self.frame data. That data can then be read asynchronously using the read() method on the object. The __exit__ method releases the camera resource when the object is cleaned up, so the camera can then be used by other applications.
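
As a quick sanity check, a minimal usage sketch (assuming a webcam is available at index 0) looks like this:

# Minimal usage sketch - assumes a webcam at index 0
video = VideoSource(src=0)
video.start()                  # spawns the background update thread
grabbed, frame = video.read()  # frame is a height x width x 3 numpy array (BGR)
print(grabbed, frame.shape)
video.stop()                   # stops and joins the background thread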

Beware: I had issues setting the width and height so I have commented out those lines. Also remember OpenCV provides the data in BGR format – so channels 0, 1, 2 correspond to Blue, Green and Red rather than RGB. You might also want to set to YUV mode by adding the following to the __init__ method:

self.cap.set(16, 0)  # 16 corresponds to cv2.CAP_PROP_CONVERT_RGB

Audio

You’ll see many posts online that use pyaudio for audio capture. I couldn’t get this to work in a conda environment due to an issue with the underlying PortAudio library. I had more success with alsaaudio (note this is Linux-only, as it wraps the ALSA libraries). The module is provided by the pyalsaaudio package:

pip install pyalsaaudio

# Audio source
import struct
from collections import deque
import numpy as np
import logging
import alsaaudio

class AudioSource(SensorSource):
    """Object for audio using alsaaudio."""
    def __init__(self, sample_freq=44100, nb_samples=65536):
        """Initialise audio capture."""
        # Initialise audio
        self.inp = alsaaudio.PCM(
            alsaaudio.PCM_CAPTURE,
            alsaaudio.PCM_NORMAL,
            device="default"
        )
        # set attributes: Mono, frequency, 16 bit little endian samples
        self.inp.setchannels(1)
        self.inp.setrate(sample_freq)
        self.inp.setformat(alsaaudio.PCM_FORMAT_S16_LE)
        self.inp.setperiodsize(512)
        # Create a FIFO structure for the data
        self._s_fifo = deque([0] * nb_samples, maxlen=nb_samples)
        self.l = 0
        self.started = False
        self.read_lock = threading.Lock()
    
    def update(self):
        """Update based on new audio data."""
        while self.started:
            self.l, data = self.inp.read()
            if self.l > 0:
                # extract and format sample 
                raw_smp_l = struct.unpack('h' * self.l, data)
                with self.read_lock:
                    self._s_fifo.extend(raw_smp_l)
            else:
                logging.error(
                    f'Sampler error occur (l={self.l} and len data={len(data)})'
                )
                
    def read(self):
        """Read audio."""
        with self.read_lock:
            return self.l, np.asarray(self._s_fifo, dtype=np.int16)

The approach for audio is similar to video. We set up an audio input source and a threading lock in the __init__ method. In the audio case, we are recording a (time) series of audio samples, so we do this in a buffer of length nb_samples. The deque object acts as a FIFO queue and provides this buffer. The update method is run continuously in the background within the thread and adds new samples to the queue over time, with old samples falling off the back of the queue. The struct library is used to decode the binary data from the alsaaudio object and convert it into integer values that we can add to the queue. When we read the data, we convert the queue to a 16-bit integer numpy array.
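
To make the decoding step concrete, here is a tiny standalone sketch (using made-up sample values) of what struct.unpack is doing:

import struct
import numpy as np

# Four made-up 16-bit little-endian samples packed as raw bytes
raw_bytes = struct.pack('<4h', 0, 1000, -1000, 32767)
samples = struct.unpack('h' * 4, raw_bytes)      # (0, 1000, -1000, 32767)
as_array = np.asarray(samples, dtype=np.int16)   # same conversion as read()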

In both cases, the read() method returns a tuple: (data_check_value, data) where the data_check_value is a value returned from the underlying capture objects. It is often useful for debugging.

Combining and Simplifying

Now we have defined sensor data sources, we can combine them so that we only need to perform one read() call to obtain data from all sources. To do this, we create a wrapper object that allows us to iterate through each added sensor data source.

class CombinedSource:
    """Object to combine multiple modalities."""
    def __init__(self):
        """Initialise."""
        self.sources = dict()
    
    def add_source(self, source, name=None):
        """Add a source object.
        
        source is a derived class from SensorSource
        name is an optional string name."""
        if not name:
            name = source.__class__.__name__
        self.sources[name] = source
        
    def start(self):
        """Start all sources."""
        for name, source in self.sources.items():
            source.start()
    
    def read(self):
        """Read from all sources.
        
        return as dict of tuples."""
        data = dict()
        for name, source in self.sources.items():
            data[name] = source.read()[1]
        return data
    
    def stop(self):
        """Stop all sources."""
        for name, source in self.sources.items():
            source.stop()
            
    def __del__(self):
        for name, source in self.sources.items():
            if source.__class__.__name__ == "VideoSource":
                source.cap.release()
    
    def __exit__(self, exec_type, exc_value, traceback):
        for name, source in self.sources.items():
            if source.__class__.__name__ == "VideoSource":
                source.cap.release()

The delete and exit logic is added to clean up the camera object – without these the camera is kept open and locked, which can cause problems. Data is returned as a dictionary, indexed by a string name for the data source.
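
For example, you could wire up the two sources defined earlier by hand:

# Sketch: combining the video and audio sources manually
combined = CombinedSource()
combined.add_source(VideoSource(), name="camera")
combined.add_source(AudioSource(), name="microphone")
combined.start()
data = combined.read()  # {"camera": BGR frame array, "microphone": int16 samples}
combined.stop()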

We can simplify things even further by creating a derived class that automatically adds an audio and video capture object.

class AVCapture(CombinedSource):
    """Auto populate with audio and video."""
    def __init__(self):
        """Initialise."""
        self.sources = dict()
        a = AudioSource()
        self.add_source(a, "audio")
        v = VideoSource()
        self.add_source(v, "video")

This then allows us to access audio and video data in a couple of lines.

av = AVCapture()
av.start()
data = av.read()
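
If everything is working, the shapes of the returned arrays should look something like this (the video shape depends on your camera):

print(data["video"].shape)  # e.g. (480, 640, 3) - rows, columns, BGR channels
print(data["audio"].shape)  # (65536,) - the nb_samples default above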

Here are some outputs from:

import matplotlib.pyplot as plt
plt.imshow(data["video"])
plt.plot(data["audio"])

[Image: the video frame shown with plt.imshow - colours are crazy because imshow expects RGB not BGR!]
[Image: a plot of the audio buffer - BBC Radio 6 Music in graphical form.]

Finishing Off

You can find the code in a Gist here, together with some testing lines that you could easily convert into a library.

You can also expand the sensor classes to capture other data. I plan to create a class to capture CPU and memory use information.
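
As a rough sketch of how such an extension might look (this uses the psutil library, which is an assumption on my part and not something used elsewhere in this post):

import threading
import numpy as np
import psutil  # assumed extra dependency

class SystemSource(SensorSource):
    """Sketch: CPU and memory usage as a sensor source."""
    def __init__(self):
        self.data = (0.0, 0.0)
        self.started = False
        self.read_lock = threading.Lock()

    def update(self):
        while self.started:
            # cpu_percent(interval=1) blocks for a second and returns a percentage
            cpu = psutil.cpu_percent(interval=1)
            mem = psutil.virtual_memory().percent
            with self.read_lock:
                self.data = (cpu, mem)

    def read(self):
        with self.read_lock:
            return True, np.asarray(self.data)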

Capturing Live Audio and Video in Python

In my robotics projects I want to capture live audio and video data and convert it into Numpy multi-dimensional arrays for further processing. To save you several days, this blog post explains how I go about doing this.

Audio / Video Not Audio + Video

A first realisation is that you need to capture audio and video independently. You can record movie files with audio, but as far as I could find there is no simple way to live capture both audio and video data.

Video

For video processing, I found there were two different approaches that could be used to process video data:

  • OpenCV in Python; and
  • Wrapping FFmpeg using subprocess.

OpenCV

The default library for video processing in Python is OpenCV. Things have come a long way since my early experiences with OpenCV in C++ over a decade ago. Now there is a nice Python wrapper and you don’t need to touch any low-level code. The tutorials here are a good place to start.

I generally use Conda/Anaconda these days to manage my Python environments (the alternative being old skool virtual environments). Setting up a new environment with Jupyter Notebook and OpenCV is now straightforward:

conda install opencv jupyter

As a note – installing OpenCV in Conda seems to have been a pain up until a few years ago. There are thus several out-of-date Stack Overflow answers that come up in searches, referring to installing from specific channels (e.g. menpo). This no longer appears to be necessary.

One problem I had in Linux (Ubuntu 18.04) is that the GTK libraries didn't play nicely in the Conda environment. I could capture images from the webcam but not display them in a window. This led me to look for the alternative visualisation strategies I describe below.

A good place to start with OpenCV is this video tutorial. As drawing windows led to errors, I designed a workaround that uses PIL (the Python Imaging Library) and IPython to generate an image from the Numpy array and then show it at about 30 fps. The code separates out each of the YUV components and displays them next to each other. This is useful for bio-inspired processing.

# Imports
import PIL
import io
import cv2
import matplotlib.pyplot as plt
from IPython import display
import time
import numpy as np

# Function to convert array to JPEG for display as video frame
def showarray(a, fmt='jpeg'):
    f = io.BytesIO()
    PIL.Image.fromarray(a).save(f, fmt)
    display.display(display.Image(data=f.getvalue()))

# Initialise camera
cam = cv2.VideoCapture(0)
# Optional - set to YUV mode (remove for BGR)
cam.set(16, 0)
# These allow for a frame rate to be printed
t1 = time.time()

# Loops until an interrupt
try:
    while(True):
        t2 = time.time()
        # Capture frame-by-frame
        ret, frame = cam.read()
        # Join components horizontally
        joined_array = np.concatenate(
            (
                frame[:, :, 0],
                frame[:, 1::2, 1],
                frame[:, 0::2, 1]
            ),
            axis=1
        )
        # Use above function to show array
        showarray(joined_array)
        # Print frame rate
        print(f"{int(1/(t2-t1))} FPS")
        
        # Display the frame until new frame is available
        display.clear_output(wait=True)
        t1 = t2
except KeyboardInterrupt:
    # Release the camera when interrupted
    cam.release()
    print("Stream stopped")</code></pre>

In the above code, “frame” is a three-dimensional tensor or array where the first dimension relates to rows of the image (e.g. the y-direction of the image), the second dimension relates to columns of the image (e.g. the x-direction of the image) and the third dimension relates to the three colour channels. Often for image processing it is useful to separate out the channels and just work on a single channel at a time (e.g. equivalent to a 2D matrix or grayscale image).
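
For example, slicing out a single channel gives a 2D matrix that you can treat like a grayscale image:

# Assuming `frame` from the loop above: slice out a single channel
channel_0 = frame[:, :, 0]           # a 2D array (one colour component)
print(frame.shape, channel_0.shape)  # e.g. (480, 640, 3) and (480, 640)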

FFmpeg

An alternative to using OpenCV is to use the subprocess module to wrap FFmpeg, a command-line video and audio processing utility.

This is a little trickier as it involves accessing the video buffers directly. I have based my solution on this guide by Zulko here.

#Imports
import subprocess as sp
import numpy as np
import matplotlib.pyplot as plt

FFMPEG_BIN = "ffmpeg"
# Define command line command
command = [ FFMPEG_BIN,
            '-i', '/dev/video0',
            '-f', 'image2pipe',
            '-pix_fmt', 'rgb24',
            '-an','-sn', #-an, -sn disables audio and sub-title processing respectively
            '-vcodec', 'rawvideo', '-']
# Open pipe
pipe = sp.Popen(command, stdout = sp.PIPE, bufsize=(640*480*3))

# Display a few frames
no_of_frames = 5
fig, axes = plt.subplots(no_of_frames, 1)

for i in range(0, no_of_frames):
    # Get the raw byte values from the buffer
    raw_image = pipe.stdout.read(640*480*3)
    # transform the byte read into a numpy array
    image = np.frombuffer(raw_image, dtype='uint8')
    image = image.reshape((480,640, 3))
    # Flush the pipe
    pipe.stdout.flush()
    axes[i].imshow(image)

I had issues flushing the pipe in a Jupyter notebook, so I ended up using the OpenCV method. It is also trickier to work out the byte structure for YUV data.

Audio

My middle daughter generates a lot of noise.

For audio, there are also a number of options; the two I have tried are PyAudio and alsaaudio.

PyAudio appears to be the preferred choice in most posts online. However, I am quickly learning that audio/video processing in Python is not yet as polished as pure image processing or building a neural network.

PyAudio provides a series of wrappers around the PortAudio libraries. However, I had issues getting this to work in a Conda environment. Initially, no audio devices showed up. After a long time working through Stack Overflow, I found that installing from the conda-forge channel did allow me to find audio devices (see here). But even though I could see the audio devices, I then had errors opening an audio stream. (One tip for both audio and video: look at your terminal output when capturing – the low-level errors are displayed there rather than in a Jupyter notebook.)

AlsaAudio

Given my difficulties with PyAudio, I then tried AlsaAudio and had more success with it.

My starting point was the audio recording example provided in the AlsaAudio GitHub repository. The code below records a snippet of audio to a file and then loads it back into a Numpy array; it later formed the basis of my streaming solution.

# Imports
import alsaaudio
import time
import numpy as np

# Setup Audio for Capture
inp = alsaaudio.PCM(alsaaudio.PCM_CAPTURE, alsaaudio.PCM_NONBLOCK, device="default")
inp.setchannels(1)
inp.setrate(44100)
inp.setformat(alsaaudio.PCM_FORMAT_S16_LE)
inp.setperiodsize(160)

# Record a short snippet
with open("test.wav", 'wb') as f:
    loops = 1000000
    while loops > 0:
        loops -= 1
        # Read data from device
        l, data = inp.read()
      
        if l:
            f.write(data)
            time.sleep(.001)

f = open("test.wav", 'rb')

# Open the device in playback mode. 
out = alsaaudio.PCM(alsaaudio.PCM_PLAYBACK, device="default")

# Set attributes: Mono, 44100 Hz, 16 bit little endian frames
out.setchannels(1)
out.setrate(44100)
out.setformat(alsaaudio.PCM_FORMAT_S16_LE)

# The period size controls the internal number of frames per period.
# The significance of this parameter is documented in the ALSA api.
# We also have 2 bytes per sample so 160*2 = 320 = number of bytes read from buffer
out.setperiodsize(160)

# Read data from stdin
data = f.read(320)
numpy_array = np.frombuffer(data, dtype='<i2')
while data:
    out.write(data)
    data = f.read(320)
    decoded_block = np.frombuffer(data, dtype='<i2')
    numpy_array = np.concatenate((numpy_array, decoded_block))

The numpy_array is then a long array of sound amplitudes.
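
A couple of quick sanity checks on that array (assuming the 44100 Hz, 16-bit set-up above):

duration_seconds = len(numpy_array) / 44100  # samples divided by sample rate
peak = np.abs(numpy_array).max()             # 16-bit samples, so at most 32767
print(duration_seconds, peak)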

Sampler Object

I found a nice little Gist for computing the FFT here. This uses a Sampler object to wrap the AlsaAudio object.

from collections import deque
import struct
import sys
import threading
import alsaaudio
import numpy as np

# some const
# 44100 Hz sampling rate (for 0-22050 Hz view, 0.0227ms/sample)
SAMPLE_FREQ = 44100
# 66000 samples buffer size (near 1.5 second)
NB_SAMPLE = 66000

class Sampler(threading.Thread):
    def __init__(self):
        # init thread
        threading.Thread.__init__(self)
        self.daemon = True
        # init ALSA audio
        self.inp = alsaaudio.PCM(alsaaudio.PCM_CAPTURE, alsaaudio.PCM_NORMAL, device="default")
        # set attributes: Mono, frequency, 16 bit little endian samples
        self.inp.setchannels(1)
        self.inp.setrate(SAMPLE_FREQ)
        self.inp.setformat(alsaaudio.PCM_FORMAT_S16_LE)
        self.inp.setperiodsize(512)
        # sample FIFO
        self._s_lock = threading.Lock()
        self._s_fifo = deque([0] * NB_SAMPLE, maxlen=NB_SAMPLE)

    def get_sample(self):
        with self._s_lock:
            return list(self._s_fifo)

    def run(self):
        while True:
            # read data from device
            l, data = self.inp.read()
            if l > 0:
                # extract and format sample (normalize sample to 1.0/-1.0 float)
                raw_smp_l = struct.unpack('h' * l, data)
                smp_l = (float(raw_smp / 32767) for raw_smp in raw_smp_l)
                with self._s_lock:
                    self._s_fifo.extend(smp_l)
            else:
                print('sampler error occur (l=%s and len data=%s)' % (l, len(data)), file=sys.stderr)
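
A minimal usage sketch for the Sampler might look like this (the FFT line reflects what the original Gist uses the samples for):

import time

sampler = Sampler()
sampler.start()                  # run() executes in a daemon thread
time.sleep(2)                    # let the FIFO fill with samples
samples = sampler.get_sample()   # list of floats normalised to -1.0/1.0
spectrum = np.abs(np.fft.rfft(samples))  # magnitude spectrum of the buffer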

Next Steps

This is where I am so far.

The next steps are:

  • look into threading and multiprocessing so that we can run parallel audio and video sampling routines;
  • extend the audio (and video?) processing to obtain the FFT; and
  • optimise for speed of capture.

Hacker News Update: Raspicam & WeMo

A quick update on my recent discoveries.

Raspicam

I now have a Raspberry Pi Camera Board (Raspicam)!

There is a brilliant combo deal on at the moment allowing you to buy a Raspicam, Model A + 4GB SD card for about £35 (including VAT + shipping!)! That’s £35 for a device that can run OpenCV with a camera capable of 30fps at HD resolutions. I will leave you to think about that for a moment.

The downside is that the software is still not quite there. The Raspicam couples directly to the Raspberry Pi, which means it is not (at the moment) available as a standard USB video device (e.g. /dev/video0 on Linux). Most Linux software, and packages like SimpleCV, expect a standard USB video device. This means that, as of 24 October 2013, you cannot use SimpleCV with the Raspicam.

However, not to fret! The Internet is on it. I imagine that we will see better drivers for the Raspicam from the official development communities very soon. While we wait:

WeMo and Python

As you will see from the previous posts, I have been using IFTTT as a makeshift interface between my Raspberry Pi and my WeMo Motion detector and switch. This morning, though, I found a Python module that appears to let you control the Switch and listen for motion events via Python. Hurray!

The module is called ouimeaux (there is a French theme this week). Details can be found here.

Very soon I hope to adapt my existing code to control my Hue lights based on motion events (e.g. turn on when someone walks in the room, turn off when no motion). Watch this space.