ArduPilot
GitHub Repository: ArduPilot/ardupilot
Path: blob/master/libraries/AP_Camera/examples/gst_rtsp_to_wx.py
"""
Capture an RTSP video stream and display it in wxPython

Usage
-----

1. Start the RTSP server:

    python ./gst_rtsp_server.py

2. Display the stream in wxPython:

    python ./gst_rtsp_to_wx.py

Acknowledgments
---------------

Video class to capture GStreamer frames
https://www.ardusub.com/developers/opencv.html

ImagePanel class to display OpenCV images in wxWidgets
https://stackoverflow.com/questions/14804741/opencv-integration-with-wxpython
"""

# flake8: noqa

import copy
import cv2
import gi
import numpy as np
import threading
import wx


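# Pin the GStreamer GI version before importing Gst from gi.repository;
# importing first could bind a different (or missing) typelib version.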
gi.require_version("Gst", "1.0")
from gi.repository import Gst


class VideoStream:
    """BlueROV video capture class - adapted to capture from an rtspsrc

    Attributes:
        address (string): RTSP server address
        port (int): RTSP server port
        mount_point (string): video stream mount point
        latency (int): rtspsrc latency (jitter buffer) in milliseconds
        video_decode (string): pipeline fragment converting YUV (12 bits/pixel) to BGR (24 bits/pixel)
        video_pipe (object): GStreamer top-level pipeline
        video_sink (object): GStreamer appsink element
        video_sink_conf (string): appsink configuration
        video_source (string): rtspsrc description (address, port, mount point)
        latest_frame (np.ndarray): latest retrieved video frame
    """

    def __init__(
        self, address="127.0.0.1", port=8554, mount_point="/camera", latency=50
    ):
        Gst.init(None)

        self.address = address
        self.port = port
        self.mount_point = mount_point
        self.latency = latency

        self.latest_frame = self._new_frame = None

        self.video_source = (
            f"rtspsrc location=rtsp://{address}:{port}{mount_point} latency={latency}"
        )
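        # rtspsrc's "latency" property sets the RTP jitter buffer length in
        # milliseconds; the small default here (50 ms) favours a responsive
        # preview over resilience to network jitter.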

        # Decode the stream and convert it to OpenCV's standard BGR byte format
        # (the camera delivers YUV at 12 bits per pixel; BGR is 24 bits per pixel)
        self.video_decode = (
            "! decodebin ! videoconvert ! video/x-raw,format=(string)BGR ! videoconvert"
        )
        # Create a sink to retrieve the decoded data
        self.video_sink_conf = (
            "! appsink emit-signals=true sync=false max-buffers=2 drop=true"
        )
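        # appsink notes: emit-signals=true enables the "new-sample" signal used
        # in run(), sync=false renders samples as they arrive rather than pacing
        # them to the pipeline clock, and max-buffers=2 with drop=true discards
        # stale frames instead of stalling the pipeline if the GUI falls behind.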

        self.video_pipe = None
        self.video_sink = None

        self.run()

    def start_gst(self, config=None):
        """Start the GStreamer pipeline and sink

        Pipeline description list, e.g.:
            [
                'videotestsrc ! decodebin', \
                '! videoconvert ! video/x-raw,format=(string)BGR ! videoconvert',
                '! appsink'
            ]

        Args:
            config (list, optional): GStreamer pipeline description list
        """

        if not config:
            config = [
                "videotestsrc ! decodebin",
                "! videoconvert ! video/x-raw,format=(string)BGR ! videoconvert",
                "! appsink",
            ]

        command = " ".join(config)
        self.video_pipe = Gst.parse_launch(command)
        self.video_pipe.set_state(Gst.State.PLAYING)
        self.video_sink = self.video_pipe.get_by_name("appsink0")
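        # Gst.parse_launch() auto-names unnamed elements as <factory><index>,
        # so the single appsink in the description above can be looked up by
        # the name "appsink0".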

    @staticmethod
    def gst_to_opencv(sample):
        """Transform a GStreamer sample into a numpy image array

        Args:
            sample (Gst.Sample): sample pulled from the appsink

        Returns:
            np.ndarray: image data as a height x width x 3 uint8 (BGR) array
        """
        buf = sample.get_buffer()
        caps_structure = sample.get_caps().get_structure(0)
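        # The sample's caps carry the negotiated width and height; the format
        # is known to be BGR (3 bytes per pixel, uint8) because the pipeline
        # forces video/x-raw,format=BGR just upstream of the appsink.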
        array = np.ndarray(
            (caps_structure.get_value("height"), caps_structure.get_value("width"), 3),
            buffer=buf.extract_dup(0, buf.get_size()),
            dtype=np.uint8,
        )
        return array

    def frame(self):
        """Get the latest frame

        Returns:
            np.ndarray: latest retrieved image frame
        """
        if self.frame_available():
            self.latest_frame = self._new_frame
            # reset to indicate the latest frame has been 'consumed'
            self._new_frame = None
        return self.latest_frame

    def frame_available(self):
        """Check if a new frame is available

        Returns:
            bool: true if a new frame is available
        """
        return self._new_frame is not None

    def run(self):
        """Start the pipeline and register the callback that updates _new_frame"""

        self.start_gst(
            [
                self.video_source,
                self.video_decode,
                self.video_sink_conf,
            ]
        )

        self.video_sink.connect("new-sample", self.callback)

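    # Note: the appsink emits "new-sample" from a GStreamer streaming thread,
    # not the wx main thread, so callback() only stores the frame and the GUI
    # polls for it later via frame_available()/frame().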
    def callback(self, sink):
        sample = sink.emit("pull-sample")
        self._new_frame = self.gst_to_opencv(sample)

        return Gst.FlowReturn.OK


class ImagePanel(wx.Panel):
    def __init__(self, parent, video_stream, fps=30):
        wx.Panel.__init__(self, parent)

        self._video_stream = video_stream

        # Shared between threads
        self._frame_lock = threading.Lock()
        self._latest_frame = None

        print("Waiting for video stream...")
        waited = 0
        while not self._video_stream.frame_available():
            waited += 1
            print("\r Frame not available (x{})".format(waited), end="")
            cv2.waitKey(30)
        print("\nSuccess! Video stream available")

        if self._video_stream.frame_available():
            # Only retrieve and display a frame if it's new
            frame = copy.deepcopy(self._video_stream.frame())

            # Frame size
            height, width, _ = frame.shape

            parent.SetSize((width, height))
            frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

            self.bmp = wx.Bitmap.FromBuffer(width, height, frame)

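            # Drive repaints from a wx.Timer on the GUI thread: EVT_TIMER fires
            # roughly every 1000 / fps milliseconds, and NextFrame() repaints
            # only when the stream has produced a new frame.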
            self.timer = wx.Timer(self)
            self.timer.Start(int(1000 / fps))

            self.Bind(wx.EVT_PAINT, self.OnPaint)
            self.Bind(wx.EVT_TIMER, self.NextFrame)

    def OnPaint(self, evt):
        dc = wx.BufferedPaintDC(self)
        dc.DrawBitmap(self.bmp, 0, 0)

    def NextFrame(self, event):
        if self._video_stream.frame_available():
            frame = copy.deepcopy(self._video_stream.frame())

            # Convert frame to bitmap for wxFrame
            frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            self.bmp.CopyFromBuffer(frame)
            self.Refresh()


def main():

    # create the video stream
    video_stream = VideoStream(mount_point="/camera")
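
    # A hypothetical variation pointing at a remote camera (the address and
    # latency values below are placeholders, not part of this example):
    #
    #   video_stream = VideoStream(
    #       address="192.168.1.10", port=8554, mount_point="/camera", latency=100
    #   )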

    # app must run on the main thread
    app = wx.App()
    wx_frame = wx.Frame(None)

    # create the image panel
    image_panel = ImagePanel(wx_frame, video_stream, fps=30)

    wx_frame.Show()
    app.MainLoop()


if __name__ == "__main__":
    main()