Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
SeleniumHQ
GitHub Repository: SeleniumHQ/Selenium
Path: blob/trunk/rb/lib/selenium/webdriver/common/websocket_connection.rb
4030 views
1
# frozen_string_literal: true
2
3
# Licensed to the Software Freedom Conservancy (SFC) under one
4
# or more contributor license agreements. See the NOTICE file
5
# distributed with this work for additional information
6
# regarding copyright ownership. The SFC licenses this file
7
# to you under the Apache License, Version 2.0 (the
8
# "License"); you may not use this file except in compliance
9
# with the License. You may obtain a copy of the License at
10
#
11
# http://www.apache.org/licenses/LICENSE-2.0
12
#
13
# Unless required by applicable law or agreed to in writing,
14
# software distributed under the License is distributed on an
15
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16
# KIND, either express or implied. See the License for the
17
# specific language governing permissions and limitations
18
# under the License.
19
20
require 'websocket'
21
22
module Selenium
23
module WebDriver
24
class WebSocketConnection
25
CONNECTION_ERRORS = [
26
Errno::ECONNRESET, # connection is aborted (browser process was killed)
27
Errno::EPIPE, # broken pipe (browser process was killed)
28
Errno::EBADF, # file descriptor already closed (double-close or GC)
29
IOError, # Ruby socket read/write after close
30
EOFError # socket reached EOF after remote closed cleanly
31
].freeze
32
33
RESPONSE_WAIT_TIMEOUT = 30
34
RESPONSE_WAIT_INTERVAL = 0.1
35
36
MAX_LOG_MESSAGE_SIZE = 9999
37
38
def initialize(url:)
39
@callback_threads = ThreadGroup.new
40
41
@callbacks_mtx = Mutex.new
42
@messages_mtx = Mutex.new
43
@closing_mtx = Mutex.new
44
45
@closing = false
46
@session_id = nil
47
@url = url
48
49
process_handshake
50
@socket_thread = attach_socket_listener
51
end
52
53
def close
54
@closing_mtx.synchronize do
55
return if @closing
56
57
@closing = true
58
end
59
60
begin
61
socket.close
62
rescue *CONNECTION_ERRORS => e
63
WebDriver.logger.debug "WebSocket listener closed: #{e.class}: #{e.message}", id: :ws
64
# already closed
65
end
66
67
# Let threads unwind instead of calling exit
68
@socket_thread&.join(0.5)
69
@callback_threads.list.each do |thread|
70
thread.join(0.5)
71
rescue StandardError => e
72
WebDriver.logger.debug "Failed to join thread during close: #{e.class}: #{e.message}", id: :ws
73
end
74
end
75
76
def callbacks
77
@callbacks ||= Hash.new { |callbacks, event| callbacks[event] = [] }
78
end
79
80
def add_callback(event, &block)
81
@callbacks_mtx.synchronize do
82
callbacks[event] << block
83
block.object_id
84
end
85
end
86
87
def remove_callback(event, id)
88
@callbacks_mtx.synchronize do
89
return if @closing
90
91
callbacks_for_event = callbacks[event]
92
return if callbacks_for_event.reject! { |cb| cb.object_id == id }
93
94
ids = callbacks_for_event.map(&:object_id)
95
raise Error::WebDriverError, "Callback with ID #{id} does not exist for event #{event}: #{ids}"
96
end
97
end
98
99
def send_cmd(**payload)
100
id = next_id
101
data = payload.merge(id: id)
102
WebDriver.logger.debug "WebSocket -> #{data}"[...MAX_LOG_MESSAGE_SIZE], id: :ws
103
data = JSON.generate(data)
104
out_frame = WebSocket::Frame::Outgoing::Client.new(version: ws.version, data: data, type: 'text')
105
106
begin
107
socket.write(out_frame.to_s)
108
rescue *CONNECTION_ERRORS => e
109
raise e, "WebSocket is closed (#{e.class}: #{e.message})"
110
end
111
112
wait.until { @messages_mtx.synchronize { messages.delete(id) } }
113
end
114
115
private
116
117
def messages
118
@messages ||= {}
119
end
120
121
def process_handshake
122
socket.print(ws.to_s)
123
ws << socket.readpartial(1024) until ws.finished?
124
end
125
126
def attach_socket_listener
127
Thread.new do
128
Thread.current.report_on_exception = false
129
130
loop do
131
break if @closing
132
133
incoming_frame << socket.readpartial(1024)
134
135
while (frame = incoming_frame.next)
136
break if @closing
137
138
message = process_frame(frame)
139
next unless message['method']
140
141
@messages_mtx.synchronize { callbacks[message['method']].dup }.each do |callback|
142
@callback_threads.add(callback_thread(message['params'], &callback))
143
end
144
end
145
end
146
rescue *CONNECTION_ERRORS, WebSocket::Error => e
147
WebDriver.logger.debug "WebSocket listener closed: #{e.class}: #{e.message}", id: :ws
148
end
149
end
150
151
def incoming_frame
152
@incoming_frame ||= WebSocket::Frame::Incoming::Client.new(version: ws.version)
153
end
154
155
def process_frame(frame)
156
message = frame.to_s
157
158
# Firefox will periodically fail on unparsable empty frame
159
return {} if message.empty?
160
161
msg = JSON.parse(message)
162
@messages_mtx.synchronize { messages[msg['id']] = msg if msg.key?('id') }
163
164
WebDriver.logger.debug "WebSocket <- #{msg}"[...MAX_LOG_MESSAGE_SIZE], id: :ws
165
msg
166
end
167
168
def callback_thread(params)
169
Thread.new do
170
Thread.current.abort_on_exception = false
171
Thread.current.report_on_exception = false
172
next if @closing
173
174
yield params
175
rescue Error::WebDriverError, *CONNECTION_ERRORS => e
176
WebDriver.logger.debug "Callback aborted: #{e.class}: #{e.message}", id: :ws
177
rescue StandardError => e
178
next if @closing
179
180
bt = Array(e.backtrace).first(5).join("\n")
181
WebDriver.logger.error "Callback error: #{e.class}: #{e.message}\n#{bt}", id: :ws
182
end
183
end
184
185
def wait
186
@wait ||= Wait.new(timeout: RESPONSE_WAIT_TIMEOUT, interval: RESPONSE_WAIT_INTERVAL)
187
end
188
189
def socket
190
@socket ||= if URI(@url).scheme == 'wss'
191
socket = TCPSocket.new(ws.host, ws.port)
192
socket = OpenSSL::SSL::SSLSocket.new(socket, OpenSSL::SSL::SSLContext.new)
193
socket.sync_close = true
194
socket.connect
195
196
socket
197
else
198
TCPSocket.new(ws.host, ws.port)
199
end
200
end
201
202
def ws
203
@ws ||= WebSocket::Handshake::Client.new(url: @url)
204
end
205
206
def next_id
207
@id ||= 0
208
@id += 1
209
end
210
end # BiDi
211
end # WebDriver
212
end # Selenium
213
214