28 #ifndef WEBSOCKETPP_CONNECTION_IMPL_HPP
29 #define WEBSOCKETPP_CONNECTION_IMPL_HPP
31 #include <websocketpp/processors/hybi00.hpp>
32 #include <websocketpp/processors/hybi07.hpp>
33 #include <websocketpp/processors/hybi08.hpp>
34 #include <websocketpp/processors/hybi13.hpp>
36 #include <websocketpp/processors/processor.hpp>
38 #include <websocketpp/common/platforms.hpp>
39 #include <websocketpp/common/system_error.hpp>
50 namespace istate = session::internal_state;
52 template <
typename config>
54 termination_handler new_handler)
57 "connection set_termination_handler");
61 m_termination_handler = new_handler;
64 template <
typename config>
67 return m_processor->get_origin(m_request);
70 template <
typename config>
73 return m_send_buffer_size;
76 template <
typename config>
82 template <
typename config>
84 frame::opcode::value op)
86 message_ptr msg = m_msg_manager->get_message(op,payload.size());
87 msg->append_payload(payload);
92 template <
typename config>
94 frame::opcode::value op)
96 message_ptr msg = m_msg_manager->get_message(op,len);
97 msg->append_payload(payload,len);
102 template <
typename config>
110 scoped_lock_type lock(m_connection_state_lock);
111 if (m_state != session::state::open) {
116 message_ptr outgoing_msg;
117 bool needs_writing =
false;
119 if (msg->get_prepared()) {
122 scoped_lock_type lock(m_write_lock);
123 write_push(outgoing_msg);
124 needs_writing = !m_write_flag && !m_send_queue.empty();
126 outgoing_msg = m_msg_manager->get_message();
132 scoped_lock_type lock(m_write_lock);
133 lib::error_code ec = m_processor->prepare_data_frame(msg,outgoing_msg);
139 write_push(outgoing_msg);
140 needs_writing = !m_write_flag && !m_send_queue.empty();
144 transport_con_type::dispatch(lib::bind(
150 return lib::error_code();
153 template <
typename config>
160 scoped_lock_type lock(m_connection_state_lock);
161 if (m_state != session::state::open) {
162 std::stringstream ss;
163 ss <<
"connection::ping called from invalid state " << m_state;
170 message_ptr msg = m_msg_manager->get_message();
176 ec = m_processor->prepare_ping(payload,msg);
180 if (m_pong_timeout_handler) {
183 m_ping_timer->cancel();
186 if (m_pong_timeout_dur > 0) {
187 m_ping_timer = transport_con_type::set_timer(
190 &type::handle_pong_timeout,
193 lib::placeholders::_1
201 set but the transport in use does not support timeouts.");
205 bool needs_writing =
false;
207 scoped_lock_type lock(m_write_lock);
209 needs_writing = !m_write_flag && !m_send_queue.empty();
213 transport_con_type::dispatch(lib::bind(
219 ec = lib::error_code();
222 template<
typename config>
231 template<
typename config>
233 lib::error_code
const & ec)
245 if (m_pong_timeout_handler) {
246 m_pong_timeout_handler(m_connection_hdl,payload);
250 template <
typename config>
257 scoped_lock_type lock(m_connection_state_lock);
258 if (m_state != session::state::open) {
259 std::stringstream ss;
260 ss <<
"connection::pong called from invalid state " << m_state;
267 message_ptr msg = m_msg_manager->get_message();
273 ec = m_processor->prepare_pong(payload,msg);
276 bool needs_writing =
false;
278 scoped_lock_type lock(m_write_lock);
280 needs_writing = !m_write_flag && !m_send_queue.empty();
284 transport_con_type::dispatch(lib::bind(
290 ec = lib::error_code();
293 template<
typename config>
302 template <
typename config>
304 std::string
const & reason, lib::error_code & ec)
311 std::string tr(reason,0,std::min<size_t>(reason.size(),
314 scoped_lock_type lock(m_connection_state_lock);
316 if (m_state != session::state::open) {
324 template<
typename config>
326 std::string
const & reason)
329 close(code,reason,ec);
339 template <
typename config>
342 return transport_con_type::interrupt(
344 &type::handle_interrupt,
351 template <
typename config>
353 if (m_interrupt_handler) {
354 m_interrupt_handler(m_connection_hdl);
358 template <
typename config>
361 return transport_con_type::dispatch(
363 &type::handle_pause_reading,
370 template <
typename config>
376 template <
typename config>
379 return transport_con_type::dispatch(
381 &type::handle_resume_reading,
388 template <
typename config>
404 template <
typename config>
407 return m_uri->get_secure();
410 template <
typename config>
413 return m_uri->get_host();
416 template <
typename config>
419 return m_uri->get_resource();
422 template <
typename config>
425 return m_uri->get_port();
428 template <
typename config>
434 template <
typename config>
445 template <
typename config>
447 return m_subprotocol;
450 template <
typename config>
451 std::vector<std::string>
const &
453 return m_requested_subprotocols;
456 template <
typename config>
458 lib::error_code & ec)
466 if (value.empty() || std::find_if(value.begin(),value.end(),
473 m_requested_subprotocols.push_back(value);
476 template <
typename config>
479 this->add_subprotocol(value,ec);
486 template <
typename config>
488 lib::error_code & ec)
496 ec = lib::error_code();
500 std::vector<std::string>::iterator it;
502 it = std::find(m_requested_subprotocols.begin(),
503 m_requested_subprotocols.end(),
506 if (it == m_requested_subprotocols.end()) {
511 m_subprotocol =
value;
514 template <
typename config>
517 this->select_subprotocol(value,ec);
524 template <
typename config>
527 return m_request.get_header(key);
530 template <
typename config>
533 return m_request.get_body();
536 template <
typename config>
539 return m_response.get_header(key);
542 template <
typename config>
545 if (m_internal_state != istate::PROCESS_HTTP_REQUEST) {
546 throw exception(
"Call to set_status from invalid state",
549 m_response.set_status(code);
551 template <
typename config>
553 std::string
const & msg)
555 if (m_internal_state != istate::PROCESS_HTTP_REQUEST) {
556 throw exception(
"Call to set_status from invalid state",
560 m_response.set_status(code,msg);
562 template <
typename config>
564 if (m_internal_state != istate::PROCESS_HTTP_REQUEST) {
565 throw exception(
"Call to set_status from invalid state",
569 m_response.set_body(value);
572 template <
typename config>
574 std::string
const & val)
577 if (m_internal_state == istate::PROCESS_HTTP_REQUEST) {
579 m_response.append_header(key,val);
581 throw exception(
"Call to append_header from invalid state",
585 if (m_internal_state == istate::USER_INIT) {
587 m_request.append_header(key,val);
589 throw exception(
"Call to append_header from invalid state",
594 template <
typename config>
596 std::string
const & val)
599 if (m_internal_state == istate::PROCESS_HTTP_REQUEST) {
601 m_response.replace_header(key,val);
603 throw exception(
"Call to replace_header from invalid state",
607 if (m_internal_state == istate::USER_INIT) {
609 m_request.replace_header(key,val);
611 throw exception(
"Call to replace_header from invalid state",
616 template <
typename config>
620 if (m_internal_state == istate::PROCESS_HTTP_REQUEST) {
622 m_response.remove_header(key);
624 throw exception(
"Call to remove_header from invalid state",
628 if (m_internal_state == istate::USER_INIT) {
630 m_request.remove_header(key);
632 throw exception(
"Call to remove_header from invalid state",
649 template <
typename config>
653 if (m_handshake_timer) {
654 m_handshake_timer->cancel();
655 m_handshake_timer.reset();
659 m_http_state = session::http_state::deferred;
661 return lib::error_code();
674 template <
typename config>
677 scoped_lock_type lock(m_connection_state_lock);
678 if (m_http_state != session::http_state::deferred) {
683 m_http_state = session::http_state::body_written;
686 this->write_http_response(lib::error_code());
687 ec = lib::error_code();
690 template <
typename config>
693 this->send_http_response(ec);
704 template <
typename config>
708 if (m_internal_state != istate::USER_INIT) {
714 m_internal_state = istate::TRANSPORT_INIT;
719 transport_con_type::init(
721 &type::handle_transport_init,
723 lib::placeholders::_1
728 template <
typename config>
729 void connection<config>::handle_transport_init(lib::error_code
const & ec) {
732 lib::error_code ecm = ec;
734 if (m_internal_state != istate::TRANSPORT_INIT) {
736 "handle_transport_init must be called from transport init state");
742 s <<
"handle_transport_init received error: "<< ecm.message();
745 this->terminate(ecm);
751 m_internal_state = istate::READ_HTTP_REQUEST;
752 this->read_handshake(1);
756 m_internal_state = istate::WRITE_HTTP_REQUEST;
757 m_processor = get_processor(config::client_version);
758 this->send_http_request();
762 template <
typename config>
763 void connection<config>::read_handshake(
size_t num_bytes) {
766 if (m_open_handshake_timeout_dur > 0) {
767 m_handshake_timer = transport_con_type::set_timer(
768 m_open_handshake_timeout_dur,
770 &type::handle_open_handshake_timeout,
772 lib::placeholders::_1
777 transport_con_type::async_read_at_least(
780 config::connection_read_buffer_size,
782 &type::handle_read_handshake,
784 lib::placeholders::_1,
785 lib::placeholders::_2
792 template <
typename config>
793 void connection<config>::handle_read_handshake(lib::error_code
const & ec,
794 size_t bytes_transferred)
798 lib::error_code ecm = ec;
801 scoped_lock_type lock(m_connection_state_lock);
803 if (m_state == session::state::connecting) {
804 if (m_internal_state != istate::READ_HTTP_REQUEST) {
807 }
else if (m_state == session::state::closed) {
812 "handle_read_handshake invoked after connection was closed");
823 "got (expected) eof/state error from closed con");
828 this->terminate(ecm);
833 if (bytes_transferred > config::connection_read_buffer_size) {
839 size_t bytes_processed = 0;
841 bytes_processed = m_request.consume(m_buf,bytes_transferred);
842 }
catch (http::exception &e) {
845 m_response.set_status(e.m_error_code,e.m_error_msg);
852 if (bytes_processed > bytes_transferred) {
860 s <<
"bytes_transferred: " << bytes_transferred
861 <<
" bytes, bytes processed: " << bytes_processed <<
" bytes";
865 if (m_request.ready()) {
866 lib::error_code processor_ec = this->initialize_processor();
868 this->write_http_response_error(processor_ec);
872 if (m_processor && m_processor->get_version() == 0) {
875 if (bytes_transferred-bytes_processed >= 8) {
876 m_request.replace_header(
877 "Sec-WebSocket-Key3",
878 std::string(m_buf+bytes_processed,m_buf+bytes_processed+8)
880 bytes_processed += 8;
884 m_response.set_status(http::status_code::internal_server_error);
892 if (m_request.get_header(
"Sec-WebSocket-Key3") !=
"") {
901 std::copy(m_buf+bytes_processed,m_buf+bytes_transferred,m_buf);
902 m_buf_cursor = bytes_transferred-bytes_processed;
905 m_internal_state = istate::PROCESS_HTTP_REQUEST;
908 lib::error_code handshake_ec = this->process_handshake_request();
909 if (!m_is_http || m_http_state != session::http_state::deferred) {
910 this->write_http_response(handshake_ec);
914 transport_con_type::async_read_at_least(
917 config::connection_read_buffer_size,
919 &type::handle_read_handshake,
921 lib::placeholders::_1,
922 lib::placeholders::_2
933 template <
typename config>
934 void connection<config>::write_http_response_error(lib::error_code
const & ec) {
935 if (m_internal_state != istate::READ_HTTP_REQUEST) {
937 "write_http_response_error called in invalid state");
942 m_internal_state = istate::PROCESS_HTTP_REQUEST;
944 this->write_http_response(ec);
949 template <
typename config>
950 void connection<config>::handle_read_frame(lib::error_code
const & ec,
951 size_t bytes_transferred)
955 lib::error_code ecm = ec;
957 if (!ecm && m_internal_state != istate::PROCESS_CONNECTION) {
965 if (m_state == session::state::closed) {
970 }
else if (m_state == session::state::closing && !m_is_server) {
974 terminate(lib::error_code());
982 if (m_state == session::state::closed) {
984 "handle_read_frame: got invalid istate in closed state");
988 if (m_state == session::state::closed) {
992 terminate(lib::error_code());
1000 log_err(echannel,
"handle_read_frame", ecm);
1001 this->terminate(ecm);
1015 std::stringstream s;
1016 s <<
"p = " << p <<
" bytes transferred = " << bytes_transferred;
1020 while (p < bytes_transferred) {
1022 std::stringstream s;
1023 s <<
"calling consume with " << bytes_transferred-p <<
" bytes";
1027 lib::error_code consume_ec;
1029 p += m_processor->consume(
1030 reinterpret_cast<uint8_t*>(m_buf)+p,
1031 bytes_transferred-p,
1036 std::stringstream s;
1037 s <<
"bytes left after consume: " << bytes_transferred-p;
1043 if (config::drop_on_protocol_error) {
1044 this->terminate(consume_ec);
1047 lib::error_code close_ec;
1050 consume_ec.message(),
1056 this->terminate(close_ec);
1063 if (m_processor->ready()) {
1065 std::stringstream s;
1066 s <<
"Complete message received. Dispatching";
1070 message_ptr msg = m_processor->get_message();
1076 if (m_state != session::state::open) {
1078 }
else if (m_message_handler) {
1079 m_message_handler(m_connection_hdl, msg);
1082 process_control_frame(msg);
1091 template <
typename config>
1097 transport_con_type::async_read_at_least(
1107 config::connection_read_buffer_size,
1112 template <
typename config>
1118 return lib::error_code();
1125 m_response.set_status(http::status_code::bad_request);
1129 m_processor = get_processor(version);
1133 return lib::error_code();
1139 m_response.set_status(http::status_code::bad_request);
1141 std::stringstream ss;
1142 std::string sep =
"";
1143 std::vector<int>::const_iterator it;
1150 m_response.replace_header(
"Sec-WebSocket-Version",ss.str());
1154 template <
typename config>
1165 (transport_con_type::is_secure() ?
"https" :
"http")
1168 if (!m_uri->get_valid()) {
1170 m_response.set_status(http::status_code::bad_request);
1174 if (m_http_handler) {
1176 m_http_handler(m_connection_hdl);
1178 if (m_state == session::state::closed) {
1182 set_status(http::status_code::upgrade_required);
1186 return lib::error_code();
1189 lib::error_code ec = m_processor->validate_handshake(m_request);
1195 m_response.set_status(http::status_code::bad_request);
1201 std::pair<lib::error_code,std::string> neg_results;
1202 neg_results = m_processor->negotiate_extensions(m_request);
1204 if (neg_results.first) {
1208 m_response.set_status(http::status_code::bad_request);
1209 return neg_results.first;
1214 if (neg_results.second.size() > 0) {
1215 m_response.replace_header(
"Sec-WebSocket-Extensions",
1216 neg_results.second);
1221 m_uri = m_processor->get_uri(m_request);
1224 if (!m_uri->get_valid()) {
1226 m_response.set_status(http::status_code::bad_request);
1231 lib::error_code subp_ec = m_processor->extract_subprotocols(m_request,
1232 m_requested_subprotocols);
1239 if (!m_validate_handler || m_validate_handler(m_connection_hdl)) {
1240 m_response.set_status(http::status_code::switching_protocols);
1244 ec = m_processor->process_handshake(m_request,m_subprotocol,m_response);
1247 std::stringstream s;
1248 s <<
"Processing error: " << ec <<
"(" << ec.message() <<
")";
1251 m_response.set_status(http::status_code::internal_server_error);
1261 if (m_response.get_status_code() == http::status_code::uninitialized) {
1262 m_response.set_status(http::status_code::bad_request);
1268 return lib::error_code();
1271 template <
typename config>
1280 if (m_response.get_status_code() == http::status_code::uninitialized) {
1281 m_response.
set_status(http::status_code::internal_server_error);
1287 m_response.set_version(
"HTTP/1.1");
1290 if (m_response.get_header(
"Server") ==
"") {
1291 if (!m_user_agent.empty()) {
1292 m_response.replace_header(
"Server",m_user_agent);
1294 m_response.remove_header(
"Server");
1300 m_handshake_buffer = m_processor->get_raw(m_response);
1303 m_handshake_buffer = m_response.raw();
1308 if (m_response.get_header(
"Sec-WebSocket-Key3") !=
"") {
1315 transport_con_type::async_write(
1316 m_handshake_buffer.data(),
1317 m_handshake_buffer.size(),
1319 &type::handle_write_http_response,
1321 lib::placeholders::_1
1326 template <
typename config>
1327 void connection<config>::handle_write_http_response(lib::error_code
const & ec) {
1330 lib::error_code ecm = ec;
1333 scoped_lock_type lock(m_connection_state_lock);
1335 if (m_state == session::state::connecting) {
1336 if (m_internal_state != istate::PROCESS_HTTP_REQUEST) {
1339 }
else if (m_state == session::state::closed) {
1344 "handle_write_http_response invoked after connection was closed");
1355 "got (expected) eof/state error from closed con");
1360 this->terminate(ecm);
1364 if (m_handshake_timer) {
1365 m_handshake_timer->cancel();
1366 m_handshake_timer.reset();
1369 if (m_response.get_status_code() != http::status_code::switching_protocols)
1376 std::stringstream s;
1377 s <<
"Handshake ended with HTTP error: "
1378 << m_response.get_status_code();
1384 this->log_http_result();
1388 "got to writing HTTP results with m_ec set: "+m_ec.message());
1393 this->terminate(m_ec);
1397 this->log_open_result();
1399 m_internal_state = istate::PROCESS_CONNECTION;
1400 m_state = session::state::open;
1402 if (m_open_handler) {
1403 m_open_handler(m_connection_hdl);
1406 this->handle_read_frame(lib::error_code(), m_buf_cursor);
1409 template <
typename config>
1410 void connection<config>::send_http_request() {
1419 ec = m_processor->client_handshake_request(m_request,m_uri,
1420 m_requested_subprotocols);
1432 if (m_request.get_header(
"User-Agent") ==
"") {
1433 if (!m_user_agent.empty()) {
1434 m_request.replace_header(
"User-Agent",m_user_agent);
1436 m_request.remove_header(
"User-Agent");
1440 m_handshake_buffer = m_request.raw();
1446 if (m_open_handshake_timeout_dur > 0) {
1447 m_handshake_timer = transport_con_type::set_timer(
1448 m_open_handshake_timeout_dur,
1450 &type::handle_open_handshake_timeout,
1452 lib::placeholders::_1
1457 transport_con_type::async_write(
1458 m_handshake_buffer.data(),
1459 m_handshake_buffer.size(),
1461 &type::handle_send_http_request,
1463 lib::placeholders::_1
1468 template <
typename config>
1469 void connection<config>::handle_send_http_request(lib::error_code
const & ec) {
1472 lib::error_code ecm = ec;
1475 scoped_lock_type lock(m_connection_state_lock);
1477 if (m_state == session::state::connecting) {
1478 if (m_internal_state != istate::WRITE_HTTP_REQUEST) {
1481 m_internal_state = istate::READ_HTTP_RESPONSE;
1483 }
else if (m_state == session::state::closed) {
1488 "handle_send_http_request invoked after connection was closed");
1499 "got (expected) eof/state error from closed con");
1504 this->terminate(ecm);
1508 transport_con_type::async_read_at_least(
1511 config::connection_read_buffer_size,
1513 &type::handle_read_http_response,
1515 lib::placeholders::_1,
1516 lib::placeholders::_2
1521 template <
typename config>
1522 void connection<config>::handle_read_http_response(lib::error_code
const & ec,
1523 size_t bytes_transferred)
1527 lib::error_code ecm = ec;
1530 scoped_lock_type lock(m_connection_state_lock);
1532 if (m_state == session::state::connecting) {
1533 if (m_internal_state != istate::READ_HTTP_RESPONSE) {
1536 }
else if (m_state == session::state::closed) {
1541 "handle_read_http_response invoked after connection was closed");
1552 "got (expected) eof/state error from closed con");
1557 this->terminate(ecm);
1561 size_t bytes_processed = 0;
1564 bytes_processed = m_response.consume(m_buf,bytes_transferred);
1565 }
catch (http::exception & e) {
1567 std::string(
"error in handle_read_http_response: ")+e.what());
1574 if (m_response.headers_ready()) {
1575 if (m_handshake_timer) {
1576 m_handshake_timer->cancel();
1577 m_handshake_timer.reset();
1580 lib::error_code validate_ec = m_processor->validate_server_handshake_response(
1586 this->terminate(validate_ec);
1591 m_internal_state = istate::PROCESS_CONNECTION;
1592 m_state = session::state::open;
1594 this->log_open_result();
1596 if (m_open_handler) {
1597 m_open_handler(m_connection_hdl);
1603 std::copy(m_buf+bytes_processed,m_buf+bytes_transferred,m_buf);
1604 m_buf_cursor = bytes_transferred-bytes_processed;
1606 this->handle_read_frame(lib::error_code(), m_buf_cursor);
1608 transport_con_type::async_read_at_least(
1611 config::connection_read_buffer_size,
1613 &type::handle_read_http_response,
1615 lib::placeholders::_1,
1616 lib::placeholders::_2
1622 template <
typename config>
1623 void connection<config>::handle_open_handshake_timeout(
1624 lib::error_code
const & ec)
1630 "open handle_open_handshake_timeout error: "+ec.message());
1638 template <
typename config>
1639 void connection<config>::handle_close_handshake_timeout(
1640 lib::error_code
const & ec)
1646 "asio open handle_close_handshake_timeout error: "+ec.message());
1654 template <
typename config>
1655 void connection<config>::terminate(lib::error_code
const & ec) {
1661 if (m_handshake_timer) {
1662 m_handshake_timer->cancel();
1663 m_handshake_timer.reset();
1666 terminate_status tstat = unknown;
1670 m_local_close_reason = ec.message();
1675 m_http_state = session::http_state::closed;
1677 if (m_state == session::state::connecting) {
1678 m_state = session::state::closed;
1686 }
else if (m_state != session::state::closed) {
1687 m_state = session::state::closed;
1691 "terminate called on connection that was already terminated");
1697 transport_con_type::async_shutdown(
1699 &type::handle_terminate,
1702 lib::placeholders::_1
1707 template <
typename config>
1708 void connection<config>::handle_terminate(terminate_status tstat,
1709 lib::error_code
const & ec)
1721 if (tstat == failed) {
1723 if (m_fail_handler) {
1724 m_fail_handler(m_connection_hdl);
1727 }
else if (tstat == closed) {
1728 if (m_close_handler) {
1729 m_close_handler(m_connection_hdl);
1739 if (m_termination_handler) {
1741 m_termination_handler(type::get_shared());
1742 }
catch (std::exception
const & e) {
1744 std::string(
"termination_handler call failed. Reason was: ")+e.what());
1749 template <
typename config>
1754 scoped_lock_type lock(m_write_lock);
1766 message_ptr next_message = write_pop();
1767 while (next_message) {
1768 m_current_msgs.push_back(next_message);
1769 if (!next_message->get_terminal()) {
1770 next_message = write_pop();
1772 next_message = message_ptr();
1776 if (m_current_msgs.empty()) {
1783 m_write_flag =
true;
1787 typename std::vector<message_ptr>::iterator it;
1788 for (it = m_current_msgs.begin(); it != m_current_msgs.end(); ++it) {
1789 std::string
const & header = (*it)->get_header();
1790 std::string
const & payload = (*it)->get_payload();
1799 std::stringstream general,header,payload;
1801 general <<
"Dispatching write containing " << m_current_msgs.size()
1802 <<
" message(s) containing ";
1803 header <<
"Header Bytes: \n";
1804 payload <<
"Payload Bytes: \n";
1809 for (
size_t i = 0; i < m_current_msgs.size(); i++) {
1810 hbytes += m_current_msgs[i]->get_header().size();
1811 pbytes += m_current_msgs[i]->get_payload().size();
1814 header <<
"[" << i <<
"] ("
1815 << m_current_msgs[i]->get_header().size() <<
") "
1820 payload <<
"[" << i <<
"] ("
1821 << m_current_msgs[i]->get_payload().size() <<
") ["<<m_current_msgs[i]->get_opcode()<<
"] "
1822 << (m_current_msgs[i]->get_opcode() == frame::opcode::text ?
1823 m_current_msgs[i]->get_payload() :
1831 general << hbytes <<
" header bytes and " << pbytes <<
" payload bytes";
1839 transport_con_type::async_write(
1841 m_write_frame_handler
1845 template <
typename config>
1852 bool terminal = m_current_msgs.back()->get_terminal();
1854 m_send_buffer.clear();
1855 m_current_msgs.clear();
1860 this->terminate(ec);
1865 this->terminate(lib::error_code());
1869 bool needs_writing =
false;
1871 scoped_lock_type lock(m_write_lock);
1874 m_write_flag =
false;
1876 needs_writing = !m_send_queue.empty();
1879 if (needs_writing) {
1880 transport_con_type::dispatch(lib::bind(
1887 template <
typename config>
1893 template <
typename config>
1898 frame::opcode::value op = msg->get_opcode();
1901 std::stringstream s;
1902 s <<
"Control frame received with opcode " << op;
1905 if (m_state == session::state::closed) {
1909 if (op != frame::opcode::CLOSE && m_state != session::state::open) {
1914 if (op == frame::opcode::PING) {
1915 bool should_reply =
true;
1917 if (m_ping_handler) {
1918 should_reply = m_ping_handler(m_connection_hdl, msg->get_payload());
1922 this->pong(msg->get_payload(),ec);
1927 }
else if (op == frame::opcode::PONG) {
1928 if (m_pong_handler) {
1929 m_pong_handler(m_connection_hdl, msg->get_payload());
1932 m_ping_timer->cancel();
1934 }
else if (op == frame::opcode::CLOSE) {
1941 if (config::drop_on_protocol_error) {
1942 s <<
"Received invalid close code " << m_remote_close_code
1943 <<
" dropping connection per config.";
1945 this->terminate(ec);
1947 s <<
"Received invalid close code " << m_remote_close_code
1948 <<
" sending acknowledgement and closing";
1951 "Invalid close code");
1961 if (config::drop_on_protocol_error) {
1963 "Received invalid close reason. Dropping connection per config");
1964 this->terminate(ec);
1967 "Received invalid close reason. Sending acknowledgement and closing");
1969 "Invalid close reason");
1977 if (m_state == session::state::open) {
1979 s <<
"Received close frame with code " << m_remote_close_code
1980 <<
" and reason " << m_remote_close_reason;
1983 ec = send_close_ack();
1987 }
else if (m_state == session::state::closing && !m_was_clean) {
2001 terminate(lib::error_code());
2014 template <
typename config>
2016 std::string
const & reason)
2018 return send_close_frame(code,reason,
true,m_is_server);
2021 template <
typename config>
2023 std::string
const & reason,
bool ack,
bool terminal)
2035 if (config::silent_close) {
2038 m_local_close_reason =
"";
2041 m_local_close_code = code;
2042 m_local_close_reason = reason;
2046 m_local_close_reason =
"";
2049 "acknowledging a no-status close with normal code");
2051 m_local_close_reason =
"";
2054 m_local_close_code = m_remote_close_code;
2055 m_local_close_reason = m_remote_close_reason;
2058 std::stringstream s;
2059 s <<
"Closing with code: " << m_local_close_code <<
", and reason: "
2060 << m_local_close_reason;
2063 message_ptr msg = m_msg_manager->get_message();
2068 lib::error_code ec = m_processor->prepare_close(m_local_close_code,
2069 m_local_close_reason,msg);
2078 msg->set_terminal(
true);
2081 m_state = session::state::closing;
2089 if (m_close_handshake_timeout_dur > 0) {
2090 m_handshake_timer = transport_con_type::set_timer(
2091 m_close_handshake_timeout_dur,
2093 &type::handle_close_handshake_timeout,
2095 lib::placeholders::_1
2100 bool needs_writing =
false;
2102 scoped_lock_type lock(m_write_lock);
2104 needs_writing = !m_write_flag && !m_send_queue.empty();
2107 if (needs_writing) {
2108 transport_con_type::dispatch(lib::bind(
2114 return lib::error_code();
2117 template <
typename config>
2118 typename connection<config>::processor_ptr
2119 connection<config>::get_processor(
int version)
const {
2126 p = lib::make_shared<processor::hybi00<config> >(
2127 transport_con_type::is_secure(),
2133 p = lib::make_shared<processor::hybi07<config> >(
2134 transport_con_type::is_secure(),
2141 p = lib::make_shared<processor::hybi08<config> >(
2142 transport_con_type::is_secure(),
2149 p = lib::make_shared<processor::hybi13<config> >(
2150 transport_con_type::is_secure(),
2161 p->set_max_message_size(m_max_message_size);
2166 template <
typename config>
2167 void connection<config>::write_push(
typename config::message_type::ptr msg)
2173 m_send_buffer_size += msg->get_payload().size();
2174 m_send_queue.push(msg);
2177 std::stringstream s;
2178 s <<
"write_push: message count: " << m_send_queue.size()
2179 <<
" buffer size: " << m_send_buffer_size;
2184 template <
typename config>
2185 typename config::message_type::ptr connection<config>::write_pop()
2189 if (m_send_queue.empty()) {
2193 msg = m_send_queue.front();
2195 m_send_buffer_size -= msg->get_payload().size();
2199 std::stringstream s;
2200 s <<
"write_pop: message count: " << m_send_queue.size()
2201 <<
" buffer size: " << m_send_buffer_size;
2207 template <
typename config>
2208 void connection<config>::log_open_result()
2210 std::stringstream s;
2220 s << (version == -1 ?
"HTTP" :
"WebSocket") <<
" Connection ";
2223 s << transport_con_type::get_remote_endpoint() <<
" ";
2226 if (version != -1) {
2227 s <<
"v" << version <<
" ";
2231 std::string ua = m_request.get_header(
"User-Agent");
2240 s << (m_uri ? m_uri->get_resource() :
"NULL") <<
" ";
2243 s << m_response.get_status_code();
2248 template <
typename config>
2249 void connection<config>::log_close_result()
2251 std::stringstream s;
2254 <<
"close local:[" << m_local_close_code
2255 << (m_local_close_reason ==
"" ?
"" :
","+m_local_close_reason)
2256 <<
"] remote:[" << m_remote_close_code
2257 << (m_remote_close_reason ==
"" ?
"" :
","+m_remote_close_reason) <<
"]";
2262 template <
typename config>
2263 void connection<config>::log_fail_result()
2265 std::stringstream s;
2270 s <<
"WebSocket Connection ";
2273 s << transport_con_type::get_remote_endpoint();
2277 s <<
" v" << version;
2281 std::string ua = m_request.get_header(
"User-Agent");
2290 s << (m_uri ? m_uri->get_resource() :
"-");
2293 s <<
" " << m_response.get_status_code();
2296 s <<
" " << m_ec <<
" " << m_ec.message();
2301 template <
typename config>
2302 void connection<config>::log_http_result() {
2303 std::stringstream s;
2311 s << (m_request.get_header(
"host") ==
"" ?
"-" : m_request.get_header(
"host"))
2312 <<
" " << transport_con_type::get_remote_endpoint()
2313 <<
" \"" << m_request.get_method()
2314 <<
" " << (m_uri ? m_uri->get_resource() :
"-")
2315 <<
" " << m_request.get_version() <<
"\" " << m_response.get_status_code()
2316 <<
" " << m_response.get_body().size();
2319 std::string ua = m_request.get_header(
"User-Agent");
2332 #endif // WEBSOCKETPP_CONNECTION_IMPL_HPP
bool is_control(value v)
Check if an opcode is for a control frame.
lib::error_code process_handshake_request()
void add_subprotocol(std::string const &request, lib::error_code &ec)
Adds the given subprotocol string to the request list (exception free)
void set_termination_handler(termination_handler new_handler)
void read_frame()
Issue a new transport read unless reading is paused.
std::vector< std::string > const & get_requested_subprotocols() const
Gets all of the subprotocols requested by the client.
uint16_t value
The type of a close code value.
static level const control
One line per control frame.
std::string const & get_subprotocol() const
Gets the negotated subprotocol.
bool terminal(value code)
Determine if the code represents an unrecoverable error.
uri_ptr get_uri_from_host(request_type &request, std::string scheme)
Extract a URI ptr from the host header of the request.
lib::error_code pause_reading()
Pause reading of new data.
bool is_websocket_handshake(request_type &r)
Determine whether or not a generic HTTP request is a WebSocket handshake.
Attempted to use a client specific feature on a server endpoint.
session::state::value get_state() const
Return the connection state.
static std::vector< int > const versions_supported(helper, helper+4)
Container that stores the list of protocol versions supported.
Selected subprotocol was not requested by the client.
int get_websocket_version(request_type &r)
Extract the version from a WebSocket handshake request.
uri_ptr get_uri() const
Gets the connection URI.
void replace_header(std::string const &key, std::string const &val)
Replace a header.
void ping(std::string const &payload)
Send a ping.
Represents an individual WebSocket connection.
size_t get_buffered_amount() const
Get the size of the outgoing write buffer (in payload bytes)
static level const frame_payload
One line per frame, includes the full message payload (warning: chatty)
static value const protocol_error
A protocol error occurred.
static value const normal
static level const devel
Low level debugging information (warning: very chatty)
std::string string_replace_all(std::string subject, std::string const &search, std::string const &replace)
Replace all occurrances of a substring with another.
status::value extract_code(std::string const &payload, lib::error_code &ec)
Extract a close code value from a close payload.
void select_subprotocol(std::string const &value, lib::error_code &ec)
Select a subprotocol to use (exception free)
std::string to_hex(std::string const &input)
Convert std::string to ascii printed string of hex digits.
void pong(std::string const &payload)
Send a pong.
void send_http_response()
Send deferred HTTP Response.
bool is_not_token_char(unsigned char c)
Is the character a non-token.
void handle_resume_reading()
Resume reading callback.
static level const frame_header
One line per frame, includes the full frame header.
lib::error_code initialize_processor()
static level const devel
Development messages (warning: very chatty)
std::string const & get_request_header(std::string const &key) const
Retrieve a request header.
static level const disconnect
One line for each closed connection. Includes closing codes and reasons.
lib::error_code resume_reading()
Resume reading of new data.
void close(close::status::value const code, std::string const &reason)
Close the connection.
lib::error_code defer_http_response()
Defer HTTP Response until later (Exception free)
Invalid WebSocket protocol version.
void set_status(http::status_code::value code)
Set response status code and message.
void handle_pong_timeout(std::string payload, lib::error_code const &ec)
Utility method that gets called back when the ping timer expires.
close::status::value to_ws(lib::error_code ec)
Converts a processor error_code into a websocket close code.
lib::error_code make_error_code(error::processor_errors e)
Create an error code with the given value and the processor category.
void append_header(std::string const &key, std::string const &val)
Append a header.
lib::error_code send(std::string const &payload, frame::opcode::value op=frame::opcode::text)
Create a message and then add it to the outgoing send queue.
The connection was in the wrong state for this operation.
void write_frame()
Checks if there are frames in the send queue and if there are sends one.
void handle_write_frame(lib::error_code const &ec)
Process the results of a frame write operation and start the next write.
Namespace for the WebSocket++ project.
std::string const & get_origin() const
Return the same origin policy origin value from the opening request.
std::string const & get_response_header(std::string const &key) const
Retrieve a response header.
uint16_t get_port() const
Returns the port component of the connection URI.
std::string const & get_resource() const
Returns the resource component of the connection URI.
A simple utility buffer class.
std::vector< int > const & get_supported_versions() const
Get array of WebSocket protocol versions that this connection supports.
The endpoint is out of outgoing message buffers.
lib::shared_ptr< uri > uri_ptr
Pointer to a URI.
void handle_interrupt()
Transport inturrupt callback.
static value const no_status
A dummy value to indicate that no status code was received.
void handle_pause_reading()
Pause reading callback.
WebSocket close handshake timed out.
bool get_secure() const
Returns the secure flag from the connection URI.
static level const fail
One line for each failed WebSocket connection with details.
static value const abnormal_close
A dummy value to indicate that the connection was closed abnormally.
std::string const & get_request_body() const
Retrieve a request body.
static uint8_t const close_reason_size
Maximum size of close frame reason.
read or write after shutdown
void set_uri(uri_ptr uri)
Sets the connection URI.
WebSocket opening handshake timed out.
std::string const & get_host() const
Returns the host component of the connection URI.
Attempted to use a server specific feature on a client endpoint.
lib::error_code interrupt()
Asyncronously invoke handler::on_inturrupt.
std::string extract_reason(std::string const &payload, lib::error_code &ec)
Extract the reason string from a close payload.
static level const rerror
An invalid uri was supplied.
Unsupported WebSocket protocol version.
static value const blank
A blank value for internal use.
static level const connect
Information about new connections.
static level const http
Access related to HTTP requests.
void remove_header(std::string const &key)
Remove a header.
void set_body(std::string const &value)
Set response body content.