diff --git a/lab02/Makefile b/lab02/Makefile new file mode 100644 index 0000000..56b7df5 --- /dev/null +++ b/lab02/Makefile @@ -0,0 +1,36 @@ +CC := gcc +CFLAGS := -Wall +BIN := bin/ +LIBS := -lpthread + +SSOURCES := server.c +SOBJECTS = $(addprefix $(BIN), $(SSOURCES:.c=.o)) + +SERVERNAME := otpserver +SERVERNAME := $(addprefix $(BIN), $(SERVERNAME)) + +CSOURCES := client.c +COBJECTS = $(addprefix $(BIN), $(CSOURCES:.c=.o)) + +CLIENTNAME := otpclient +CLIENTNAME := $(addprefix $(BIN), $(CLIENTNAME)) + + +.PHONY: all +all: $(SERVERNAME) $(CLIENTNAME) + +$(SERVERNAME): $(SOBJECTS) + $(CC) $(CFLAGS) $(SOBJECTS) -o $(SERVERNAME) $(LIBS) + +$(CLIENTNAME): $(COBJECTS) + $(CC) $(CFLAGS) $(COBJECTS) -o $(CLIENTNAME) $(LIBS) + +$(BIN)%.o: %.c | $(BIN) + $(CC) $(CFLAGS) -c $< -o $@ + +$(BIN): + @mkdir -p $(BIN) + +.PHONY: clean +clean: + @rm -rf $(BIN) diff --git a/lab02/client.c b/lab02/client.c new file mode 100644 index 0000000..7a183ca --- /dev/null +++ b/lab02/client.c @@ -0,0 +1,89 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#define SOCKET_PATH "/tmp/otp_socket" + +#define CMD_ECHO 1 +#define CMD_TIMER 2 + +struct request { + int priority; + int command; + int value; +}; + +void send_fd(int socket, int fd) { + struct msghdr msg = {0}; + + char buf[CMSG_SPACE(sizeof(fd))]; + memset(buf, 0, sizeof(buf)); + + struct iovec io = { .iov_base = (void*)"F", .iov_len = 1 }; + + msg.msg_iov = &io; + msg.msg_iovlen = 1; + msg.msg_control = buf; + msg.msg_controllen = sizeof(buf); + + struct cmsghdr *cmsg = CMSG_FIRSTHDR(&msg); + cmsg->cmsg_level = SOL_SOCKET; + cmsg->cmsg_type = SCM_RIGHTS; + cmsg->cmsg_len = CMSG_LEN(sizeof(fd)); + + memcpy(CMSG_DATA(cmsg), &fd, sizeof(fd)); + + sendmsg(socket, &msg, 0); +} + +int main(int argc, char *argv[]) { + + if (argc < 4) { + printf("Usage: %s \n", argv[0]); + return 1; + } + + int sock = socket(AF_UNIX, SOCK_STREAM, 0); + + struct sockaddr_un addr; + memset(&addr, 0, sizeof(addr)); + addr.sun_family = AF_UNIX; + strcpy(addr.sun_path, SOCKET_PATH); + + connect(sock, (struct sockaddr*)&addr, sizeof(addr)); + + int efd = eventfd(0, EFD_NONBLOCK); + + send_fd(sock, efd); + + struct request req; + req.priority = atoi(argv[1]); + req.command = atoi(argv[2]); + req.value = atoi(argv[3]); + + write(sock, &req, sizeof(req)); + + int response; + read(sock, &response, sizeof(response)); + printf("Sync response: %d\n", response); + + struct pollfd pfd = { .fd = efd, .events = POLLIN }; + + printf("Waiting for async notification...\n"); + + poll(&pfd, 1, -1); + + uint64_t val; + read(efd, &val, sizeof(val)); + printf("Async notification received!\n"); + + close(sock); + close(efd); + return 0; +} \ No newline at end of file diff --git a/lab02/server.c b/lab02/server.c new file mode 100644 index 0000000..a857504 --- /dev/null +++ b/lab02/server.c @@ -0,0 +1,156 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#define SOCKET_PATH "/tmp/otp_socket" +#define MAX_CLIENTS 10 + +#define CMD_ECHO 1 +#define CMD_TIMER 2 + +struct request { + int priority; + int command; + int value; +}; + +typedef struct client_request { + int client_fd; + int event_fd; + struct request req; + struct client_request *next; +} client_request_t; + +client_request_t *queue_head = NULL; +pthread_mutex_t queue_mutex = PTHREAD_MUTEX_INITIALIZER; + +void log_msg(const char *msg) { + printf("[SERVER] %s\n", msg); +} + +void enqueue(client_request_t *node) { + pthread_mutex_lock(&queue_mutex); + + if (!queue_head || node->req.priority > queue_head->req.priority) { + node->next = queue_head; + queue_head = node; + } else { + client_request_t *cur = queue_head; + while (cur->next && cur->next->req.priority >= node->req.priority) + cur = cur->next; + node->next = cur->next; + cur->next = node; + } + + pthread_mutex_unlock(&queue_mutex); +} + +client_request_t* dequeue() { + pthread_mutex_lock(&queue_mutex); + client_request_t *node = queue_head; + if (queue_head) + queue_head = queue_head->next; + pthread_mutex_unlock(&queue_mutex); + return node; +} + +void* worker_thread(void *arg) { + while (1) { + client_request_t *node = dequeue(); + if (!node) { + usleep(100000); + continue; + } + + char buffer[128]; + sprintf(buffer, "Processing request with priority %d", node->req.priority); + log_msg(buffer); + + int response = 0; + + if (node->req.command == CMD_ECHO) { + response = node->req.value; + write(node->client_fd, &response, sizeof(response)); + } + else if (node->req.command == CMD_TIMER) { + sleep(node->req.value); + uint64_t val = 1; + write(node->event_fd, &val, sizeof(val)); + response = 0; + write(node->client_fd, &response, sizeof(response)); + } + + free(node); + } + return NULL; +} + +int recv_fd(int socket) { + struct msghdr msg = {0}; + char buf[CMSG_SPACE(sizeof(int))]; + memset(buf, 0, sizeof(buf)); + + struct iovec io = { .iov_base = (void*)"F", .iov_len = 1 }; + + msg.msg_iov = &io; + msg.msg_iovlen = 1; + msg.msg_control = buf; + msg.msg_controllen = sizeof(buf); + + recvmsg(socket, &msg, 0); + + struct cmsghdr *cmsg = CMSG_FIRSTHDR(&msg); + int fd; + memcpy(&fd, CMSG_DATA(cmsg), sizeof(fd)); + return fd; +} + +int main() { + int server_fd = socket(AF_UNIX, SOCK_STREAM, 0); + + struct sockaddr_un addr; + unlink(SOCKET_PATH); + + memset(&addr, 0, sizeof(addr)); + addr.sun_family = AF_UNIX; + strcpy(addr.sun_path, SOCKET_PATH); + + bind(server_fd, (struct sockaddr*)&addr, sizeof(addr)); + listen(server_fd, MAX_CLIENTS); + + log_msg("Server started"); + + pthread_t worker; + pthread_create(&worker, NULL, worker_thread, NULL); + + while (1) { + int client_fd = accept(server_fd, NULL, NULL); + log_msg("Client connected"); + + int event_fd = recv_fd(client_fd); + + struct request req; + read(client_fd, &req, sizeof(req)); + + client_request_t *node = malloc(sizeof(client_request_t)); + node->client_fd = client_fd; + node->event_fd = event_fd; + node->req = req; + node->next = NULL; + + enqueue(node); + } + + close(server_fd); + unlink(SOCKET_PATH); + return 0; +}