Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Consider the --interface argument when broadcasting for scheduler discovery #544

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions daemon/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2227,7 +2227,7 @@ bool Daemon::reconnect()

if (!discover || (nullptr == (scheduler = discover->try_get_scheduler()) && discover->timed_out())) {
delete discover;
discover = new DiscoverSched(netname, max_scheduler_pong, schedname, scheduler_port);
discover = new DiscoverSched(daemon_interface, netname, max_scheduler_pong, schedname, scheduler_port);
}

if (!scheduler) {
Expand Down Expand Up @@ -2592,7 +2592,7 @@ int main(int argc, char **argv)
return 1;
}

list<string> nl = get_netnames(200, d.scheduler_port);
list<string> nl = get_netnames(d.daemon_interface, 200, d.scheduler_port);
trace() << "Netnames:" << endl;

for (list<string>::const_iterator it = nl.begin(); it != nl.end(); ++it) {
Expand Down
4 changes: 2 additions & 2 deletions scheduler/scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2125,7 +2125,7 @@ int main(int argc, char *argv[])

time_t next_listen = 0;

Broadcasts::broadcastSchedulerVersion(scheduler_port, netname, starttime);
Broadcasts::broadcastSchedulerVersion(scheduler_interface, scheduler_port, netname, starttime);
last_announce = starttime;

while (!exit_main_loop) {
Expand All @@ -2139,7 +2139,7 @@ int main(int argc, char *argv[])
their daemons if we are the preferred scheduler (daemons with version new enough
should automatically select the best scheduler, but old daemons connect randomly). */
if (last_announce + 120 < time(nullptr)) {
Broadcasts::broadcastSchedulerVersion(scheduler_port, netname, starttime);
Broadcasts::broadcastSchedulerVersion(scheduler_interface, scheduler_port, netname, starttime);
last_announce = time(nullptr);
}

Expand Down
38 changes: 21 additions & 17 deletions services/comm.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1354,7 +1354,7 @@ static int get_second_port_for_debug( int port )
return secondPort ? secondPort : -1;
}

void Broadcasts::broadcastSchedulerVersion(int scheduler_port, const char* netname, time_t starttime)
void Broadcasts::broadcastSchedulerVersion(const std::string &interface, int scheduler_port, const char* netname, time_t starttime)
{
// Code for older schedulers than version 38. Has endianness problems, the message size
// is not BROAD_BUFLEN and the netname is possibly not null-terminated.
Expand All @@ -1370,7 +1370,7 @@ void Broadcasts::broadcastSchedulerVersion(int scheduler_port, const char* netna
buf[4 + sizeof(uint64_t)] = length_netname;
strncpy(buf + 5 + sizeof(uint64_t), netname, length_netname - 1);
buf[ schedbuflen - 1 ] = '\0';
broadcastData(scheduler_port, buf, schedbuflen);
broadcastData(interface, scheduler_port, buf, schedbuflen);
delete[] buf;
// Latest version.
buf = new char[ BROAD_BUFLEN ];
Expand All @@ -1388,7 +1388,7 @@ void Broadcasts::broadcastSchedulerVersion(int scheduler_port, const char* netna
const int OFFSET = 4 + 2 * sizeof(uint32_t);
snprintf(buf + OFFSET, BROAD_BUFLEN - OFFSET, "%s", netname);
buf[BROAD_BUFLEN - 1] = 0;
broadcastData(scheduler_port, buf, BROAD_BUFLEN);
broadcastData(interface, scheduler_port, buf, BROAD_BUFLEN);
delete[] buf;
}

Expand Down Expand Up @@ -1420,7 +1420,7 @@ void Broadcasts::getSchedulerVersionData( const char* buf, int* protocol, time_t
}

/* Returns a filedesc. or a negative value for errors. */
static int open_send_broadcast(int port, const char* buf, int size)
static int open_send_broadcast(const std::string &interface, int port, const char* buf, int size)
{
int ask_fd;
struct sockaddr_in remote_addr;
Expand Down Expand Up @@ -1466,6 +1466,9 @@ static int open_send_broadcast(int port, const char* buf, int size)
continue;
}

if (!interface.empty() && interface != addr->ifa_name)
continue;

static bool in_tests = getenv( "ICECC_TESTS" ) != nullptr;
if (!in_tests) {
if (ntohl(((struct sockaddr_in *) addr->ifa_addr)->sin_addr.s_addr) == 0x7f000001) {
Expand Down Expand Up @@ -1505,17 +1508,17 @@ static int open_send_broadcast(int port, const char* buf, int size)
return ask_fd;
}

void Broadcasts::broadcastData(int port, const char* buf, int len)
void Broadcasts::broadcastData(const std::string &interface, int port, const char* buf, int len)
{
int fd = open_send_broadcast(port, buf, len);
int fd = open_send_broadcast(interface, port, buf, len);
if (fd >= 0) {
if ((-1 == close(fd)) && (errno != EBADF)){
log_perror("close failed");
}
}
int secondPort = get_second_port_for_debug( port );
if( secondPort > 0 ) {
int fd2 = open_send_broadcast(secondPort, buf, len);
int fd2 = open_send_broadcast(interface, secondPort, buf, len);
if (fd2 >= 0) {
if ((-1 == close(fd2)) && (errno != EBADF)){
log_perror("close failed");
Expand All @@ -1524,9 +1527,10 @@ void Broadcasts::broadcastData(int port, const char* buf, int len)
}
}

DiscoverSched::DiscoverSched(const std::string &_netname, int _timeout,
DiscoverSched::DiscoverSched(const std::string &_interface, const std::string &_netname, int _timeout,
const std::string &_schedname, int port)
: netname(_netname)
: interface(_interface)
, netname(_netname)
, schedname(_schedname)
, timeout(_timeout)
, ask_fd(-1)
Expand Down Expand Up @@ -1567,7 +1571,7 @@ DiscoverSched::DiscoverSched(const std::string &_netname, int _timeout,
netname = ""; // take whatever the machine is giving us
attempt_scheduler_connect();
} else {
sendSchedulerDiscovery( PROTOCOL_VERSION );
sendSchedulerDiscovery(interface, PROTOCOL_VERSION );
}
}

Expand Down Expand Up @@ -1600,14 +1604,14 @@ void DiscoverSched::attempt_scheduler_connect()
}
}

void DiscoverSched::sendSchedulerDiscovery( int version )
void DiscoverSched::sendSchedulerDiscovery(const std::string &interface, int version )
{
assert( version < 128 );
char buf = version;
ask_fd = open_send_broadcast(sport, &buf, 1);
ask_fd = open_send_broadcast(interface, sport, &buf, 1);
int secondPort = get_second_port_for_debug( sport );
if( secondPort > 0 )
ask_second_fd = open_send_broadcast(secondPort, &buf, 1);
ask_second_fd = open_send_broadcast(interface, secondPort, &buf, 1);
}

bool DiscoverSched::isSchedulerDiscovery(const char* buf, int buflen, int* daemon_version)
Expand Down Expand Up @@ -1865,7 +1869,7 @@ bool DiscoverSched::get_broad_answer(int ask_fd, int timeout, char *buf2, struct
return true;
}

list<string> DiscoverSched::getNetnames(int timeout, int port)
list<string> DiscoverSched::getNetnames(const std::string &interface, int timeout, int port)
{
list<string> l;
int ask_fd;
Expand All @@ -1874,7 +1878,7 @@ list<string> DiscoverSched::getNetnames(int timeout, int port)
time_t time0 = time(nullptr);

char buf = PROTOCOL_VERSION;
ask_fd = open_send_broadcast(port, &buf, 1);
ask_fd = open_send_broadcast(interface, port, &buf, 1);

do {
char buf2[BROAD_BUFLEN];
Expand All @@ -1900,9 +1904,9 @@ list<string> DiscoverSched::getNetnames(int timeout, int port)
return l;
}

list<string> get_netnames(int timeout, int port)
list<string> get_netnames(const std::string &interface, int timeout, int port)
{
return DiscoverSched::getNetnames(timeout, port);
return DiscoverSched::getNetnames(interface, timeout, port);
}

void Msg::fill_from_channel(MsgChannel *)
Expand Down
14 changes: 8 additions & 6 deletions services/comm.h
Original file line number Diff line number Diff line change
Expand Up @@ -296,15 +296,15 @@ class Broadcasts
{
public:
// Broadcasts a message about this scheduler and its information.
static void broadcastSchedulerVersion(int scheduler_port, const char* netname, time_t starttime);
static void broadcastSchedulerVersion(const std::string &interface, int scheduler_port, const char* netname, time_t starttime);
// Checks if the data received is a scheduler version broadcast.
static bool isSchedulerVersion(const char* buf, int buflen);
// Reads data from a scheduler version broadcast.
static void getSchedulerVersionData( const char* buf, int* protocol, time_t* time, std::string* netname );
/// Broadcasts the given data on the given port.
static const int BROAD_BUFLEN = 268;
private:
static void broadcastData(int port, const char* buf, int size);
static void broadcastData(const std::string &interface, int port, const char* buf, int size);
};

// --------------------------------------------------------------------------
Expand All @@ -315,7 +315,8 @@ class DiscoverSched
/* Connect to a scheduler waiting max. TIMEOUT seconds.
schedname can be the hostname of a box running a scheduler, to avoid
broadcasting, port can be specified explicitly */
DiscoverSched(const std::string &_netname = std::string(),
DiscoverSched(const std::string &_interface = std::string(),
const std::string &_netname = std::string(),
int _timeout = 2,
const std::string &_schedname = std::string(),
int port = 0);
Expand Down Expand Up @@ -361,7 +362,7 @@ class DiscoverSched

/* Return a list of all reachable netnames. We wait max. WAITTIME
milliseconds for answers. */
static std::list<std::string> getNetnames(int waittime = 2000, int port = 8765);
static std::list<std::string> getNetnames(const std::string &interface = std::string(), int waittime = 2000, int port = 8765);

// Checks if the data is from a scheduler discovery broadcast, returns version of the sending
// daemon is yes.
Expand All @@ -371,6 +372,7 @@ class DiscoverSched

private:
struct sockaddr_in remote_addr;
std::string interface;
std::string netname;
std::string schedname;
int timeout;
Expand All @@ -385,7 +387,7 @@ class DiscoverSched
bool multiple;

void attempt_scheduler_connect();
void sendSchedulerDiscovery( int version );
void sendSchedulerDiscovery(const std::string &interface, int version );
static bool get_broad_answer(int ask_fd, int timeout, char *buf2, struct sockaddr_in *remote_addr,
socklen_t *remote_len);
static void get_broad_data(const char* buf, const char** name, int* version, time_t* start_time);
Expand All @@ -394,7 +396,7 @@ class DiscoverSched

/* Return a list of all reachable netnames. We wait max. WAITTIME
milliseconds for answers. */
std::list<std::string> get_netnames(int waittime = 2000, int port = 8765);
std::list<std::string> get_netnames(const std::string &interface, int waittime = 2000, int port = 8765);

class PingMsg : public Msg
{
Expand Down