/** * stratum_client.cpp */ #include #include #include #include #include #include #include #include #include "ee382n_bitcoin/stratum_client.hpp" #include "ee382n_bitcoin/mining_work.hpp" #include "nlohmann/json.hpp" using json = nlohmann::json; namespace stratum_v1 { StratumClient::StratumClient(const std::string &worker_name, const std::string &worker_pass) { _id = 0; _sock = 0; _worker_name = worker_name; _worker_pass = worker_pass; } bool StratumClient::mining_subscribe() { int status = fcntl(_sock, F_GETFD); if (status < 0) { std::cerr << "[StratumClient::mining_subscribe] _sock not valid" << std::endl; return false; } int msg_id = _id++; json stratum_connect; stratum_connect["id"] = msg_id; stratum_connect["method"] = "mining.subscribe"; stratum_connect["params"] = std::vector({"ECE382NFinalProjectMiner/1.0.0"}); if(!transmit(stratum_connect)) { return false; } // Wait for response int retry = 0; while(_response_map.find(msg_id) == _response_map.end() && retry++ < StratumClient::MAX_RETRIES) { if(!recv_once()) { std::cerr << "[StratumClient::mining_subscribe] Detected receive error, trying again" << std::endl; } } std::cout << "[StratumClient::mining_subscribe] Received auth response: " << _response_map[msg_id] << std::endl; // Print subscriptions std::cout << "[StratumClient::mining_subscribe] Subscribed to channels "; for(auto pair : _response_map[msg_id]["result"][0]) { std::cout << pair[0] << " "; } std::cout << std::endl; // Get xnonce1 and xnonce2_size from subscribe response _xnonce1 = _response_map[msg_id]["result"][1]; _xnonce2_size = _response_map[msg_id]["result"][2]; _response_map.erase(msg_id); return true; } bool StratumClient::mining_authorize() { int status = fcntl(_sock, F_GETFD); if (status < 0) { std::cerr << "[StratumClient::mining_subscribe] _sock not valid" << std::endl; return false; } int msg_id = _id++; json stratum_auth; stratum_auth["id"] = msg_id; stratum_auth["method"] = "mining.authorize"; stratum_auth["params"] = std::vector({_worker_name, _worker_pass}); // Send message if (!transmit(stratum_auth)) { return false; } // Wait for response int retry = 0; while(_response_map.find(msg_id) == _response_map.end() && retry++ < StratumClient::MAX_RETRIES) { if(!recv_once()) { std::cerr << "[StratumClient::mining_authorize] Detected receive error, trying again" << std::endl; } } std::cout << "[StratumClient::mining_authorize] Received auth response: " << _response_map[msg_id] << std::endl; _response_map.erase(msg_id); return true; } bool StratumClient::mining_submit(const std::string& job_id, const std::string& xnonce2, const std::string& ntime, const std::string& nonce, const std::string& version) { int status = fcntl(_sock, F_GETFD); if (status < 0) { std::cerr << "[StratumClient::mining_subscribe] _sock not valid" << std::endl; return false; } int msg_id = _id++; json msg; msg["id"] = msg_id; msg["method"] = "mining.submit"; // msg["params"] = std::vector({_worker_name, job_id, xnonce2, ntime, nonce, version}); msg["params"] = std::vector({_worker_name, job_id, xnonce2, ntime, nonce}); // Send message if (!transmit(msg)) { return false; } std::cout << "[StratumClient::mining_submit] Submitted share" << std::endl; // Wait for response // TODO: Make these send functions thread-safe // This is a hack because we're assuming this is only ever called when spin_forever is already running // in a different thread. // int retry = 0; // while(_response_map.find(msg_id) == _response_map.end() && retry++ < StratumClient::MAX_RETRIES) { // if(!recv_once()) { // std::cerr << "[StratumClient::mining_submit] Detected receive error, trying again" << std::endl; // } // } // // std::cout << "[StratumClient::mining_submit] Received submit response: " << _response_map[msg_id] << std::endl; // _response_map.erase(msg_id); return true; } bool StratumClient::mining_submit(const MiningWork &mining_work, uint32_t nonce) { std::string nonce_str = MiningWork::bin2hex(reinterpret_cast(&nonce), 4); return mining_submit(mining_work.get_job_id(), mining_work.get_xnonce2(), mining_work.get_ntime(), nonce_str, mining_work.get_version()); } bool StratumClient::connect(const std::string& url, const uint16_t port) { _url = url; _port = port; struct hostent* host = gethostbyname(url.c_str()); sockaddr_in send_sock_addr = {0}; bzero((char*) &send_sock_addr, sizeof(send_sock_addr)); send_sock_addr.sin_family = AF_INET; send_sock_addr.sin_addr.s_addr = inet_addr(inet_ntoa(*(struct in_addr*)*host->h_addr_list)); send_sock_addr.sin_port = htons(port); _sock = socket(AF_INET, SOCK_STREAM, 0); int status = ::connect(_sock, (sockaddr*) &send_sock_addr, sizeof(send_sock_addr)); if (status < 0) { std::cerr << "[StratumClient] Error opening TCP socket!" << std::endl; return false; } std::cout << "[StratumClient] Opened TCP socket to mining pool!" << std::endl; return true; } bool StratumClient::reconnect() { return connect(_url, _port); } bool StratumClient::transmit(const json& msg) const { std::string raw = msg.dump() + "\n"; std::cout << "[StratumClient::transmit] Sending: " << raw << std::endl; size_t bytes_written = send(_sock, raw.c_str(), raw.length(), 0); return bytes_written == raw.length(); } bool StratumClient::recv_once() { static uint8_t recv_buf[16000] = {0}; static size_t bytes_in_buf = 0; size_t bytes_remain = sizeof(recv_buf) - bytes_in_buf; if(bytes_remain == 0) { std::cerr << "[StratumClient::recv_once] Receive buffer full" << std::endl; return false; } ssize_t bytes_read = recv(_sock, recv_buf, sizeof(recv_buf), 0); if(bytes_read == 0) { std::cerr << "[StratumClient::recv_once] Connection closed" << std::endl; reconnect(); mining_subscribe(); mining_authorize(); return false; } else if(bytes_read < 0 && errno == EAGAIN) { std::cerr << "[StratumClient::recv_once] No data for now" << std::endl; return false; } else if(bytes_read < 0) { std::cerr << "[StratumClient::recv_once] Connection error" << std::endl; return false; } bytes_in_buf += bytes_read; uint8_t *line_start = recv_buf; uint8_t *line_end; // Loop over \n-terminated lines in recv_buf // https://stackoverflow.com/a/6090610 while ((line_end = (uint8_t*)memchr((void*)line_start, '\n', bytes_in_buf - (line_start - recv_buf)))) { *line_end = 0; std::string json_str(line_start, line_end); try { json msg = json::parse(json_str); if (msg.contains("id") && !msg["id"].is_null()) { std::cout << "[StratumClient::recv_once] Received server->client response " << msg["id"] << ": " << msg << std::endl; _response_map.emplace(msg["id"], msg); } else { std::cout << "[StratumClient::recv_once] Received server->client message: " << msg << " (" << _recv_q.size() << " messages in queue)" << std::endl; _recv_q.push(msg); } } catch (json::exception& e) { std::cerr << "[StratumClient::recv_once] JSON exception: " << e.what() << std::endl; std::cerr << "\t" << json_str << std::endl; }; line_start = line_end + 1; } /* Shift buffer down so the unprocessed data is at the start */ bytes_in_buf -= (line_start - recv_buf); memmove(recv_buf, line_start, bytes_in_buf); return true; } void StratumClient::spin_forever() { unsigned long retries = 0; while(1) { if(!recv_once()) { std::cerr << "[StratumClient::spin_forever] Detected receive error, trying again" << std::endl; retries++; if(retries > 10) { usleep(1e5); } } else { retries = 0; } while(!_recv_q.empty()) { auto& msg = _recv_q.front(); if(msg.contains("method") && msg["method"] == "mining.notify") { std::cout << "[StratumClient::spin_forever] Received mining.notify (" << _work_q.size() << " work already in queue)" << std::endl; stratum_v1::MiningWork work(msg["params"], _xnonce1, _xnonce2_size); std::unique_lock lock(_work_m); if(work.get_clean_jobs()) { // Clear work queue when new block is received std::queue().swap(_work_q); } _work_q.push(work); _work_cond.notify_one(); if(new_work_callback) { new_work_callback(work.get_clean_jobs()); } } else if(msg.contains("method") && msg["method"] == "mining.set_difficulty") { _difficulty = msg["params"][0]; std::cout << "[StratumClient::spin_forever] Received mining.set_difficulty updated to " << _difficulty << std::endl; if(set_difficulty_callback) { set_difficulty_callback(_difficulty); } } else { std::cerr << "[StratumClient::spin_forever] Unexpected message from server: " << msg << std::endl; } _recv_q.pop(); } } } bool StratumClient::work_available() { return !_work_q.empty(); } stratum_v1::MiningWork StratumClient::get_work() { std::unique_lock lock(_work_m); _work_cond.wait(lock, [this]() {return !_work_q.empty(); }); stratum_v1::MiningWork item = _work_q.front(); _work_q.pop(); return item; } double StratumClient::get_difficulty() const { return _difficulty; } } // namespace stratum_v1