CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutSign UpSign In
Ardupilot

Real-time collaboration for Jupyter Notebooks, Linux Terminals, LaTeX, VS Code, R IDE, and more,
all in one place. Commercial Alternative to JupyterHub.

GitHub Repository: Ardupilot/ardupilot
Path: blob/master/libraries/AP_Camera/examples/gst_rtsp_to_wx.py
Views: 1799
1
""""
2
Capture a RTSP video stream and display in wxpython
3
4
Usage
5
-----
6
7
1. rtsp server
8
9
python ./gst_rtsp_server.py
10
11
3. display in wxpython
12
13
python ./gst_rtsp_to_wx.py
14
15
Acknowledgments
16
---------------
17
18
Video class to capture GStreamer frames
19
https://www.ardusub.com/developers/opencv.html
20
21
ImagePanel class to display openCV images in wxWidgets
22
https://stackoverflow.com/questions/14804741/opencv-integration-with-wxpython
23
"""
24
25
import copy
26
import cv2
27
import gi
28
import numpy as np
29
import threading
30
import wx
31
32
33
gi.require_version("Gst", "1.0")
34
from gi.repository import Gst
35
36
37
class VideoStream:
38
"""BlueRov video capture class constructor - adapted to capture rtspsrc
39
40
Attributes:
41
address (string): RTSP address
42
port (int): RTSP port
43
mount_point (string): video stream mount point
44
video_decode (string): Transform YUV (12bits) to BGR (24bits)
45
video_pipe (object): GStreamer top-level pipeline
46
video_sink (object): Gstreamer sink element
47
video_sink_conf (string): Sink configuration
48
video_source (string): Udp source ip and port
49
latest_frame (np.ndarray): Latest retrieved video frame
50
"""
51
52
def __init__(
53
self, address="127.0.0.1", port=8554, mount_point="/camera", latency=50
54
):
55
Gst.init(None)
56
57
self.address = address
58
self.port = port
59
self.mount_point = mount_point
60
self.latency = latency
61
62
self.latest_frame = self._new_frame = None
63
64
self.video_source = (
65
f"rtspsrc location=rtsp://{address}:{port}{mount_point} latency={latency}"
66
)
67
68
# Python does not have nibble, convert YUV nibbles (4-4-4) to OpenCV standard BGR bytes (8-8-8)
69
self.video_decode = (
70
"! decodebin ! videoconvert ! video/x-raw,format=(string)BGR ! videoconvert"
71
)
72
# Create a sink to get data
73
self.video_sink_conf = (
74
"! appsink emit-signals=true sync=false max-buffers=2 drop=true"
75
)
76
77
self.video_pipe = None
78
self.video_sink = None
79
80
self.run()
81
82
def start_gst(self, config=None):
83
""" Start gstreamer pipeline and sink
84
Pipeline description list e.g:
85
[
86
'videotestsrc ! decodebin', \
87
'! videoconvert ! video/x-raw,format=(string)BGR ! videoconvert',
88
'! appsink'
89
]
90
91
Args:
92
config (list, optional): Gstreamer pileline description list
93
"""
94
95
if not config:
96
config = [
97
"videotestsrc ! decodebin",
98
"! videoconvert ! video/x-raw,format=(string)BGR ! videoconvert",
99
"! appsink",
100
]
101
102
command = " ".join(config)
103
self.video_pipe = Gst.parse_launch(command)
104
self.video_pipe.set_state(Gst.State.PLAYING)
105
self.video_sink = self.video_pipe.get_by_name("appsink0")
106
107
@staticmethod
108
def gst_to_opencv(sample):
109
"""Transform byte array into np array
110
111
Args:
112
sample (TYPE): Description
113
114
Returns:
115
TYPE: Description
116
"""
117
buf = sample.get_buffer()
118
caps_structure = sample.get_caps().get_structure(0)
119
array = np.ndarray(
120
(caps_structure.get_value("height"), caps_structure.get_value("width"), 3),
121
buffer=buf.extract_dup(0, buf.get_size()),
122
dtype=np.uint8,
123
)
124
return array
125
126
def frame(self):
127
"""Get Frame
128
129
Returns:
130
np.ndarray: latest retrieved image frame
131
"""
132
if self.frame_available:
133
self.latest_frame = self._new_frame
134
# reset to indicate latest frame has been 'consumed'
135
self._new_frame = None
136
return self.latest_frame
137
138
def frame_available(self):
139
"""Check if a new frame is available
140
141
Returns:
142
bool: true if a new frame is available
143
"""
144
return self._new_frame is not None
145
146
def run(self):
147
"""Get frame to update _new_frame"""
148
149
self.start_gst(
150
[
151
self.video_source,
152
self.video_decode,
153
self.video_sink_conf,
154
]
155
)
156
157
self.video_sink.connect("new-sample", self.callback)
158
159
def callback(self, sink):
160
sample = sink.emit("pull-sample")
161
self._new_frame = self.gst_to_opencv(sample)
162
163
return Gst.FlowReturn.OK
164
165
166
class ImagePanel(wx.Panel):
167
def __init__(self, parent, video_stream, fps=30):
168
wx.Panel.__init__(self, parent)
169
170
self._video_stream = video_stream
171
172
# Shared between threads
173
self._frame_lock = threading.Lock()
174
self._latest_frame = None
175
176
print("Waiting for video stream...")
177
waited = 0
178
while not self._video_stream.frame_available():
179
waited += 1
180
print("\r Frame not available (x{})".format(waited), end="")
181
cv2.waitKey(30)
182
print("\nSuccess! Video stream available")
183
184
if self._video_stream.frame_available():
185
# Only retrieve and display a frame if it's new
186
frame = copy.deepcopy(self._video_stream.frame())
187
188
# Frame size
189
height, width, _ = frame.shape
190
191
parent.SetSize((width, height))
192
frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
193
194
self.bmp = wx.Bitmap.FromBuffer(width, height, frame)
195
196
self.timer = wx.Timer(self)
197
self.timer.Start(int(1000 / fps))
198
199
self.Bind(wx.EVT_PAINT, self.OnPaint)
200
self.Bind(wx.EVT_TIMER, self.NextFrame)
201
202
def OnPaint(self, evt):
203
dc = wx.BufferedPaintDC(self)
204
dc.DrawBitmap(self.bmp, 0, 0)
205
206
def NextFrame(self, event):
207
if self._video_stream.frame_available():
208
frame = copy.deepcopy(self._video_stream.frame())
209
210
# Convert frame to bitmap for wxFrame
211
frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
212
self.bmp.CopyFromBuffer(frame)
213
self.Refresh()
214
215
216
def main():
217
218
# create the video stream
219
video_stream = VideoStream(mount_point="/camera")
220
221
# app must run on the main thread
222
app = wx.App()
223
wx_frame = wx.Frame(None)
224
225
# create the image panel
226
image_panel = ImagePanel(wx_frame, video_stream, fps=30)
227
228
wx_frame.Show()
229
app.MainLoop()
230
231
232
if __name__ == "__main__":
233
main()
234
235