Path: blob/trunk/rb/lib/selenium/webdriver/common/websocket_connection.rb
4030 views
# frozen_string_literal: true

# Licensed to the Software Freedom Conservancy (SFC) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The SFC licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

require 'websocket'

module Selenium
  module WebDriver
    #
    # Client side of a WebSocket connection.
    #
    # On construction it performs the WebSocket handshake and starts a listener
    # thread that reads incoming frames. Messages carrying an 'id' are stored so
    # that #send_cmd can pick up the matching response; messages carrying a
    # 'method' are dispatched to the callbacks registered for that method name,
    # each callback running in its own thread.
    #
    class WebSocketConnection
      # Errors treated as "the connection is gone" rather than as failures.
      CONNECTION_ERRORS = [
        Errno::ECONNRESET, # connection is aborted (browser process was killed)
        Errno::EPIPE, # broken pipe (browser process was killed)
        Errno::EBADF, # file descriptor already closed (double-close or GC)
        IOError, # Ruby socket read/write after close
        EOFError # socket reached EOF after remote closed cleanly
      ].freeze

      # Passed to Wait as timeout / polling interval when waiting for a
      # response in #send_cmd (presumably seconds -- confirm against Wait).
      RESPONSE_WAIT_TIMEOUT = 30
      RESPONSE_WAIT_INTERVAL = 0.1

      # Logged messages are cut to this many characters.
      MAX_LOG_MESSAGE_SIZE = 9999

      #
      # Performs the handshake against +url+ and starts the listener thread.
      #
      # @param url [String] WebSocket URL; the 'wss' scheme selects an SSL socket
      #
      def initialize(url:)
        @callback_threads = ThreadGroup.new

        @callbacks_mtx = Mutex.new
        @messages_mtx = Mutex.new
        @closing_mtx = Mutex.new

        @closing = false
        @session_id = nil
        @url = url

        process_handshake
        @socket_thread = attach_socket_listener
      end

      #
      # Closes the socket and gives the listener and callback threads a short
      # time to finish. Calling it again after the first call is a no-op.
      #
      def close
        @closing_mtx.synchronize do
          return if @closing

          @closing = true
        end

        begin
          socket.close
        rescue *CONNECTION_ERRORS => e
          WebDriver.logger.debug "WebSocket listener closed: #{e.class}: #{e.message}", id: :ws
          # already closed
        end

        # Let threads unwind instead of calling exit
        @socket_thread&.join(0.5)
        @callback_threads.list.each do |thread|
          thread.join(0.5)
        rescue StandardError => e
          WebDriver.logger.debug "Failed to join thread during close: #{e.class}: #{e.message}", id: :ws
        end
      end

      #
      # Registry of callbacks keyed by event name. Looking up an unknown event
      # inserts (and returns) an empty list for it, so even a read mutates the
      # hash; code in this class only touches it while holding @callbacks_mtx.
      #
      # @return [Hash] event name => Array of callback procs
      #
      def callbacks
        @callbacks ||= Hash.new { |callbacks, event| callbacks[event] = [] }
      end

      #
      # Registers +block+ to be run for every incoming message whose 'method'
      # equals +event+.
      #
      # @param event [String] event (message 'method') name
      # @return [Integer] the block's object_id, to be passed to #remove_callback
      #
      def add_callback(event, &block)
        @callbacks_mtx.synchronize do
          callbacks[event] << block
          block.object_id
        end
      end

      #
      # Removes a callback previously registered with #add_callback.
      # Does nothing once the connection is closing.
      #
      # @param event [String] event name the callback was registered for
      # @param id [Integer] value returned by #add_callback
      # @raise [Error::WebDriverError] if no callback with +id+ exists for +event+
      #
      def remove_callback(event, id)
        @callbacks_mtx.synchronize do
          return if @closing

          callbacks_for_event = callbacks[event]
          # reject! returns nil when nothing was removed, so falling through
          # means the id was not found.
          return if callbacks_for_event.reject! { |cb| cb.object_id == id }

          ids = callbacks_for_event.map(&:object_id)
          raise Error::WebDriverError, "Callback with ID #{id} does not exist for event #{event}: #{ids}"
        end
      end

      #
      # Sends +payload+ (plus a freshly generated :id) as a JSON text frame and
      # waits until the listener has stored a message with that id.
      #
      # @param payload [Hash] command fields, serialized with JSON.generate
      # @return the stored message for this id as yielded through Wait#until
      #   (presumably the parsed response Hash -- confirm against Wait)
      # @raise one of CONNECTION_ERRORS, with an explanatory message, when the
      #   socket write fails
      #
      def send_cmd(**payload)
        id = next_id
        data = payload.merge(id: id)
        WebDriver.logger.debug "WebSocket -> #{data}"[...MAX_LOG_MESSAGE_SIZE], id: :ws
        data = JSON.generate(data)
        out_frame = WebSocket::Frame::Outgoing::Client.new(version: ws.version, data: data, type: 'text')

        begin
          socket.write(out_frame.to_s)
        rescue *CONNECTION_ERRORS => e
          # Re-raise the same error class with a clearer message.
          raise e, "WebSocket is closed (#{e.class}: #{e.message})"
        end

        wait.until { @messages_mtx.synchronize { messages.delete(id) } }
      end

      private

      # Responses received so far, keyed by message id; guarded by @messages_mtx.
      def messages
        @messages ||= {}
      end

      # Sends the client handshake and feeds the reply into the handshake object
      # until it reports that it is finished.
      def process_handshake
        socket.print(ws.to_s)
        ws << socket.readpartial(1024) until ws.finished?
      end

      #
      # Starts the thread that reads from the socket, decodes frames and, for
      # messages that have a 'method', spawns one callback thread per registered
      # callback. The loop stops once @closing is set; connection errors and
      # WebSocket::Error end the thread with a debug log entry.
      #
      # @return [Thread] the listener thread
      #
      def attach_socket_listener
        Thread.new do
          Thread.current.report_on_exception = false

          loop do
            break if @closing

            incoming_frame << socket.readpartial(1024)

            while (frame = incoming_frame.next)
              break if @closing

              message = process_frame(frame)
              next unless message['method']

              # Snapshot the list under the same mutex that add_callback and
              # remove_callback use, then run the callbacks outside the lock.
              @callbacks_mtx.synchronize { callbacks[message['method']].dup }.each do |callback|
                @callback_threads.add(callback_thread(message['params'], &callback))
              end
            end
          end
        rescue *CONNECTION_ERRORS, WebSocket::Error => e
          WebDriver.logger.debug "WebSocket listener closed: #{e.class}: #{e.message}", id: :ws
        end
      end

      # Frame decoder that accumulates raw socket data across reads.
      def incoming_frame
        @incoming_frame ||= WebSocket::Frame::Incoming::Client.new(version: ws.version)
      end

      #
      # Parses one frame as JSON. A message that has an 'id' key is stored in
      # #messages so that #send_cmd can find it.
      #
      # @return [Hash] the parsed message, or an empty Hash for an empty frame
      #
      def process_frame(frame)
        message = frame.to_s

        # Firefox will periodically fail on unparsable empty frame
        return {} if message.empty?

        msg = JSON.parse(message)
        @messages_mtx.synchronize { messages[msg['id']] = msg if msg.key?('id') }

        WebDriver.logger.debug "WebSocket <- #{msg}"[...MAX_LOG_MESSAGE_SIZE], id: :ws
        msg
      end

      #
      # Runs the given block with +params+ in a new thread.
      #
      # WebDriver and connection errors are logged at debug level. Any other
      # StandardError is logged at error level with the first five backtrace
      # lines, unless the connection is closing. Nothing runs if the connection
      # is already closing when the thread starts.
      #
      # @return [Thread] the callback thread
      #
      def callback_thread(params)
        Thread.new do
          Thread.current.abort_on_exception = false
          Thread.current.report_on_exception = false
          next if @closing

          yield params
        rescue Error::WebDriverError, *CONNECTION_ERRORS => e
          WebDriver.logger.debug "Callback aborted: #{e.class}: #{e.message}", id: :ws
        rescue StandardError => e
          next if @closing

          bt = Array(e.backtrace).first(5).join("\n")
          WebDriver.logger.error "Callback error: #{e.class}: #{e.message}\n#{bt}", id: :ws
        end
      end

      # Memoized Wait used by #send_cmd to poll for a response.
      def wait
        @wait ||= Wait.new(timeout: RESPONSE_WAIT_TIMEOUT, interval: RESPONSE_WAIT_INTERVAL)
      end

      # Memoized socket to the handshake's host and port: a connected SSL socket
      # for 'wss' URLs, a plain TCP socket otherwise.
      def socket
        @socket ||= if URI(@url).scheme == 'wss'
                      socket = TCPSocket.new(ws.host, ws.port)
                      socket = OpenSSL::SSL::SSLSocket.new(socket, OpenSSL::SSL::SSLContext.new)
                      # Closing the SSL socket also closes the underlying TCP socket.
                      socket.sync_close = true
                      socket.connect

                      socket
                    else
                      TCPSocket.new(ws.host, ws.port)
                    end
      end

      # Memoized client handshake for @url; also the source of host, port and
      # protocol version used elsewhere in this class.
      def ws
        @ws ||= WebSocket::Handshake::Client.new(url: @url)
      end

      # Next command id: 1 on the first call, then incrementing by one.
      def next_id
        @id ||= 0
        @id += 1
      end
    end # WebSocketConnection
  end # WebDriver
end # Selenium