/******************************************************************************
globus_job_manager.c

Description:
    Resource Allocation Job Manager

CVS Information:

    $Source: /home/globdev/CVS/globus-packages/gram/jobmanager/source/globus_job_manager.c,v $
    $Date: 2002/02/22 16:15:22 $
    $Revision: 1.177.2.9 $
    $Author: smartin $
******************************************************************************/

/******************************************************************************
                             Include header files
******************************************************************************/
#include "globus_common.h"

/*
 * NOTE(review): the header names of the bare #include directives below are
 * missing in this copy of the file (they look like <...> system headers that
 * were lost in extraction).  Restore them from version control; they are
 * reproduced here unchanged so that nothing is silently dropped.
 */
#if HAVE_UTIME_H
#include
#endif

#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include

#include "sslutils.h"
#include "globus_common.h"
#include "globus_gram_job_manager.h"
#include "globus_gram_protocol.h"
#include "globus_rsl.h"
#include "globus_gass_file.h"
#include "globus_gass_cache.h"
#include "globus_gass_copy.h"
#include "globus_duct_control.h"
#include "globus_rsl_assist.h"
#include "globus_io.h"

/*
 * define the Globus object ids
 * This is registered as a private enterprise
 * via IANA
 * http://www.isi.edu/in-notes/iana/assignments/enterprise-numbers
 *
 * iso.org.dod.internet.private.enterprise (1.3.6.1.4.1)
 * globus 3536
 * security 1
 * gssapi_ssleay 1
 *
 * The initializer is the length (9) followed by the 9 encoded bytes of the
 * object id 1.3.6.1.4.1.3536.1.1 (3536 encodes as 0x9b 0x50).
 */
static gss_OID_desc gss_mech_oid_globus_gssapi_ssleay =
    {9, "\x2b\x06\x01\x04\x01\x9b\x50\x01\x01"};

#define GRAM_JOB_MANAGER_STATUS_FILE_SECONDS 600

/******************************************************************************
                               Type definitions
******************************************************************************/

/*
 * Bookkeeping record for one output file kept on the
 * globus_l_gram_stdout_files / globus_l_gram_stderr_files lists.
 * On a restart main() sums last_written over each list to decide where to
 * truncate and reposition the stdout/stderr descriptors.
 * NOTE(review): the remaining fields are not used in the visible part of the
 * file; their meaning is presumably what their names suggest -- confirm
 * against globus_l_gram_check_file().
 */
typedef struct globus_l_gram_output_s
{
    globus_off_t                 last_written;
    globus_off_t                 last_size;
    char *                       cache_file;
    globus_bool_t                ok;
    int                          poll_frequency;
    int                          poll_counter;
    int                          idx;
} globus_l_gram_output_t;

/*
 * One entry of the globus_l_gram_client_contacts list.  main() creates an
 * entry from the contact string and job state mask unpacked from the job
 * request and starts failed_count at 0.
 */
typedef struct globus_l_gram_client_contact_s
{
    char *                       contact;
    int                          job_state_mask;
    int                          failed_count;
} globus_l_gram_client_contact_t;

/*
 * Job manager configuration values.  main() requires type and rdn, requires
 * condor_arch and condor_os when type is "condor", and inserts most of the
 * non-NULL string members into the RSL symbol table.
 * num_env_adds is a counter that main() increments for x509_cert_dir,
 * tcp_port_range, GLOBUS_LOCATION and GLOBUS_GRAM_JOB_CONTACT.
 */
typedef struct globus_l_gram_conf_values_s
{
    char *                       type;
    char *                       condor_arch;
    char *                       condor_os;
    char *                       rdn;
    char *                       host_dn;
    char *                       org_dn;
    char *                       gate_dn;
    char *                       gate_host;
    char *                       gate_port;
    char *                       gate_subject;
    char *                       host_osname;
    char *                       host_osversion;
    char *                       host_cputype;
    char *                       host_manufacturer;
    char *                       x509_cert_dir;
    char *                       globus_location;
    char *                       tcp_port_range;
    int                          num_env_adds;
} globus_l_gram_conf_values_t;

/*
 * Record whose members mirror, one for one, the parameters of
 * globus_l_jm_http_query_callback().
 * NOTE(review): presumably used to hand a received query to
 * globus_l_jm_http_query_handler() -- confirm against those definitions.
 */
typedef struct globus_l_jm_http_query_s
{
    void *                       arg;
    globus_io_handle_t *         handle;
    globus_byte_t *              buf;
    globus_size_t                nbytes;
    int                          errorcode;
} globus_l_jm_http_query_t;

/* Only poll once every GRAM_JOB_MANAGER_POLL_FREQUENCY seconds */
#define GRAM_JOB_MANAGER_POLL_FREQUENCY 10

/* Only do status file cleanup once every
 * GRAM_JOB_MANAGER_STAT_FREQUENCY seconds
 */
#define GRAM_JOB_MANAGER_STAT_FREQUENCY 60

#define GRAM_JOB_MANAGER_TTL_LIMIT 60
#define GRAM_JOB_MANAGER_TTL_UPDATE 30
#define GRAM_JOB_MANAGER_COMMIT_TIMEOUT 60

/******************************************************************************
                          Module specific prototypes
******************************************************************************/
static int
globus_l_gram_jm_read(int fd, globus_byte_t *buffer, size_t length);

static int
globus_l_gram_jm_write(int fd, globus_byte_t *buffer, size_t length);

char *
globus_i_filename_callback_func(int stdout_flag);

static int
globus_l_gram_reporting_file_gen(char * request_string,
                                 char * job_reporting_file,
                                 char * globus_id,
                                 char * job_id,
                                 int status);

static char *
globus_l_gram_genfilename(char * prefix,
                          char * path,
                          char * sufix);

static int
globus_l_gram_stage_file(char *url,
                         char **staged_file_path,
                         int mode);

static int
globus_l_gram_rsl_env_add(globus_rsl_t * ast_node,
                          char * var,
                          char * value);

static int
globus_l_gram_duct_environment(int count,
                               char *myjob,
                               char **newvar,
                               char **newval);

static void
globus_l_gram_client_callback(int status, int failure_code);

static int
globus_l_gram_request_fill(globus_rsl_t * rsl_tree,
                           globus_gram_jobmanager_request_t * req,
                           globus_bool_t initialize);

static int
globus_l_gram_request_environment_append(globus_gram_jobmanager_request_t * req,
                                         globus_l_gram_conf_values_t * conf);

static int
globus_l_gram_client_contact_list_free(globus_list_t * contact_list);

static void
globus_l_gram_check_file_list(int check_fd, globus_list_t * file_list);

static void
globus_l_gram_flush_file_list(int check_fd, globus_list_t * file_list);

static void
globus_l_gram_delete_file_list(globus_list_t ** file_list);

static int
globus_l_gram_check_file(int out_fd,
                         globus_l_gram_output_t * output);

static char *
globus_l_gram_getenv_var(char * env_var_name,
                         char * default_name);

static char *
globus_l_gram_user_proxy_relocate();

static globus_bool_t
globus_l_gram_status_file_cleanup(
    globus_abstime_t *           time_stop,
    void *                       callback_arg);

static int
globus_l_gram_tokenize(char * command, char ** args, int * n);

static void
globus_l_gram_conf_values_init(globus_l_gram_conf_values_t * conf);

static globus_bool_t
globus_l_gram_jm_check_files(
    globus_abstime_t *           time_stop,
    void *                       callback_arg);

static globus_bool_t
globus_l_gram_jm_check_status(
    globus_abstime_t *           time_stop,
    void *                       callback_arg);

void
globus_l_jm_http_query_callback(
    void *                       arg,
    globus_io_handle_t *         handle,
    globus_byte_t *              buf,
    globus_size_t                nbytes,
    int                          errorcode);

globus_bool_t
globus_l_jm_http_query_handler(
    globus_abstime_t *           time_stop,
    void *                       callback_arg);

void
globus_l_gram_update_remote_file(
    int                          local_fd,
    int                          remote_fd,
    int *                        position);

int
globus_l_jm_handle_stdio_update(
    char *                       signal_arg);

void
globus_l_gram_set_state_file(
    char *                       hostname,
    char *                       uniq_id);

int
globus_l_gram_write_state_file(
    int                          status,
    int                          failure_code,
    char *                       job_id,
    char *                       rsl);

globus_bool_t
globus_l_gram_proxy_expiration(
    globus_abstime_t *           time_stop,
    void *                       callback_arg);

globus_bool_t
globus_l_gram_ttl_update(
    globus_abstime_t *           time_stop,
    void *                       callback_arg);

int
globus_l_gram_update_state_file(
    int                          status,
    int                          failure_code);

int
globus_l_gram_update_state_file_io();

int
globus_l_gram_read_state_file(
    globus_gram_jobmanager_request_t * req,
    char **                      rsl);

static int
globus_l_jobmanager_fault_callback(void *user_arg, int fault_code);

/******************************************************************************
                       Define variables for external use
******************************************************************************/
/*
 * NOTE(review): the C standard leaves it unspecified whether errno is a macro
 * or an object, so declaring it by hand is not portable.  Confirm that
 * <errno.h> is among the included headers and consider removing this line.
 */
extern int errno;

/******************************************************************************
                       Define module specific variables
******************************************************************************/
/*
 *                                                 reason needed
 *                                                 --------------
 */
static char * graml_env_x509_user_proxy = NULL;    /* security */
static char * graml_env_krb5ccname = NULL;         /* security */
static char * graml_env_nlspath = NULL;            /* poe fork */
static char * graml_env_logname = NULL;            /* all */
static char * graml_env_home = NULL;               /* all */
static char * graml_env_tz = NULL;                 /* all */

/*
 * other GRAM local variables
 */
static FILE *         graml_log_fp = NULL;   /* main() aliases the request's log stream here */
static char *         graml_job_contact = NULL; /* "https://host:port/pid/time/" string built in main() */
static char *         graml_env_globus_id = NULL;
static globus_rsl_t * graml_rsl_tree = NULL;
static int            graml_cleanup_print_flag = 1;
static globus_bool_t  graml_jm_cancel = GLOBUS_FALSE;
static globus_bool_t  graml_jm_commit_request = GLOBUS_FALSE;
static globus_bool_t  graml_jm_commit_end = GLOBUS_FALSE;
static globus_bool_t  graml_jm_request_made = GLOBUS_FALSE;
static char *         graml_gass_cache_tag = GLOBUS_NULL; /* main() sets this to a copy of the job contact */
static char *         graml_job_state_file = GLOBUS_NULL;
static char *         graml_job_state_file_dir = GLOBUS_NULL;
static int            graml_commit_time_extend = 0;

/* gass cache handle */
static globus_gass_cache_t globus_l_cache_handle;

/* structures to manage line-buffered stdout and stderr */
static globus_list_t * globus_l_gram_stdout_files = GLOBUS_NULL;
static globus_list_t * globus_l_gram_stderr_files = GLOBUS_NULL;
static int             globus_l_gram_stdout_fd=-1;
static int             globus_l_gram_stderr_fd=-1;
static int             globus_l_gram_stdout_remote_fd=-1;
static int             globus_l_gram_stderr_remote_fd=-1;
static int             globus_l_gram_stdout_remote_sent=0;
static int             globus_l_gram_stderr_remote_sent=0;
/* set by main() when the request's stdout and stderr name the same target */
static globus_bool_t   globus_l_gram_merged_stdio = GLOBUS_FALSE;
static int             globus_l_gram_stdout_size=0;
static int             globus_l_gram_stderr_size=0;
static globus_bool_t   globus_l_gram_stdout_ignored = GLOBUS_TRUE;
static globus_bool_t   globus_l_gram_stderr_ignored = GLOBUS_TRUE;

/* list of globus_l_gram_client_contact_t entries; not static in the original */
globus_list_t *  globus_l_gram_client_contacts = GLOBUS_NULL;

static int              graml_my_count;
static globus_mutex_t   graml_api_mutex;
static globus_cond_t    graml_api_cond;
static int              graml_stdout_count;
static int              graml_stderr_count;
/* set to GLOBUS_TRUE by main() once graml_api_mutex and graml_api_cond exist */
static globus_bool_t    graml_api_mutex_is_initialized = GLOBUS_FALSE;
static globus_bool_t    graml_jm_done = GLOBUS_FALSE;
static globus_bool_t    graml_jm_stop = GLOBUS_FALSE;
static globus_bool_t    graml_jm_can_exit = GLOBUS_TRUE;
static globus_list_t *  graml_jm_outstanding_connections = GLOBUS_NULL;
static globus_bool_t    graml_jm_ttl_expired = GLOBUS_FALSE;
static globus_bool_t    graml_jm_request_failed = GLOBUS_FALSE;
static long             graml_jm_ttl = 0;
static char *           graml_remote_io_url = GLOBUS_NULL;
static char *           graml_remote_io_url_file = GLOBUS_NULL;

/*
 * GRAM_LOCK: assert that graml_api_mutex has been initialized, lock it, and
 * assert that the lock call returned 0.
 */
#define GRAM_LOCK { \
    int err; \
    assert (graml_api_mutex_is_initialized); \
    err = globus_mutex_lock (&graml_api_mutex); assert (!err); \
}

/*
 * GRAM_UNLOCK: unlock graml_api_mutex and assert that the call returned 0.
 */
#define GRAM_UNLOCK { \
    int err; \
    err = globus_mutex_unlock (&graml_api_mutex); assert (!err); \
}

/*
 * GRAM_TIMED_WAIT(wait_time): wait on graml_api_cond (with graml_api_mutex)
 * until graml_jm_done becomes true or the deadline of "now + wait_time"
 * seconds is reported as ETIMEDOUT by globus_cond_timedwait().
 *
 * The argument is parenthesized in the expansion so that an expression such
 * as (a ? b : c) is added to the current time as a whole.
 *
 * NOTE(review): presumably the caller must already hold graml_api_mutex --
 * confirm at the call sites.  A non-zero return other than ETIMEDOUT does not
 * end the loop; verify that globus_cond_timedwait() cannot fail persistently.
 */
#define GRAM_TIMED_WAIT(wait_time) { \
    globus_abstime_t abs; \
    int save_errno; \
    abs.tv_sec = time(GLOBUS_NULL) + (wait_time); \
    abs.tv_nsec = 0; \
    while(!graml_jm_done) \
    { \
        save_errno = globus_cond_timedwait(&graml_api_cond, \
                                           &graml_api_mutex, \
                                           &abs); \
        if(save_errno == ETIMEDOUT) \
        { \
            break; \
        } \
    } \
}

/******************************************************************************
Function:       main()
Description:
Parameters:
Returns:
******************************************************************************/ int main(int argc, char **argv) { int i; int x; int tag_index; int rc; int length; int job_state_mask; int save_logfile_always_flag = 0; int save_logfile_on_errors_flag = 0; int krbflag = 0; int tmp_status; int publish_jobs_flag = 0; char *rsl_spec = GLOBUS_NULL; /* Must free! */ char tmp_buffer[256]; char job_reporting_file[512]; char * job_reporting_dir = GLOBUS_NULL; char * home_dir = NULL; char * client_contact_str = GLOBUS_NULL; char * my_host; char * libexecdir; char * final_rsl_spec = GLOBUS_NULL; unsigned short my_port; unsigned long my_pid; unsigned long my_time; FILE * fp; FILE * test_fp; struct stat statbuf; globus_byte_t buffer[GLOBUS_GRAM_PROTOCOL_MAX_MSG_SIZE]; globus_byte_t * reply = NULL; globus_size_t replysize; globus_byte_t * sendbuf; globus_size_t sendsize; globus_rsl_t * rsl_tree = NULL; globus_gass_cache_entry_t * cache_entries; int cache_size; globus_symboltable_t * symbol_table = NULL; globus_gram_jobmanager_request_t * request; globus_l_gram_client_contact_t * client_contact_node; globus_l_gram_conf_values_t conf; globus_result_t error; globus_callback_handle_t gass_poll_handle; globus_callback_handle_t stat_cleanup_poll_handle; globus_callback_handle_t ttl_update_handle; globus_callback_handle_t proxy_expiration_handle; char * sleeptime_str; long sleeptime; int debugging_without_client = 0; int sent_request_failure = GLOBUS_FALSE; /* gssapi */ OM_uint32 major_status = 0; OM_uint32 minor_status = 0; int token_status = 0; gss_ctx_id_t context_handle = GSS_C_NO_CONTEXT; #if 0 /* Unused */ char tmp_version[64]; #endif size_t jrbuf_size; int args_fd=0; //*********** GRIDBANK MODIFICATION ************************* //Variables for recording resource usage information struct rusage resource_usage; FILE* resource_usage_file; char* resource_usage_filename; char* rur_globus_id = GLOBUS_NULL; char* char_ptr = NULL; char* slashch = NULL; time_t wallclock_time; FILE* 
job_request_file;//for multiple jobs char* job_request_filename = NULL; int rur_seq_no = 0; int request_no = 0; char request_buffer[100]; memset(request_buffer,0,sizeof(request_buffer)); //*********** GRIDBANK MODIFICATION END ************************* /* * Stdin and stdout point at socket to client * Make sure no buffering. * stderr may also, depending on the option in the grid-services */ setbuf(stdout,NULL); /* Initialize modules that I use */ rc = globus_module_activate(GLOBUS_COMMON_MODULE); if (rc != GLOBUS_SUCCESS) { fprintf(stderr, "common module activation failed with rc=%d\n", rc); exit(1); } rc = globus_module_activate(GLOBUS_IO_MODULE); if (rc != GLOBUS_SUCCESS) { fprintf(stderr, "io activation failed with rc=%d\n", rc); exit(1); } rc = globus_module_activate(GLOBUS_GRAM_PROTOCOL_MODULE); if (rc != GLOBUS_SUCCESS) { fprintf(stderr, "gram protocol activation failed with rc=%d\n", rc); exit(1); } rc = globus_module_activate(GLOBUS_GASS_COPY_MODULE); if (rc != GLOBUS_SUCCESS) { fprintf(stderr, "gass_copy activation failed with rc=%d\n", rc); exit(1); } rc = globus_module_activate(GLOBUS_GASS_CACHE_MODULE); if (rc != GLOBUS_SUCCESS) { fprintf(stderr, "gass_cache activation failed with rc=%d\n", rc); exit(1); } rc = globus_module_activate(GLOBUS_GASS_FILE_MODULE); if (rc != GLOBUS_SUCCESS) { fprintf(stderr, "gass_file activation failed with rc=%d\n", rc); exit(1); } rc = globus_module_activate(GLOBUS_DUCT_CONTROL_MODULE); if (rc != GLOBUS_SUCCESS) { fprintf(stderr, "%s activation failed with rc=%d\n", GLOBUS_DUCT_CONTROL_MODULE->module_name, rc); exit(1); } rc = globus_module_activate(GLOBUS_GRAM_JOBMANAGER_MODULE); if (rc != GLOBUS_SUCCESS) { fprintf(stderr, "%s activation failed with rc=%d\n", GLOBUS_GRAM_JOBMANAGER_MODULE->module_name, rc); exit(1); } globus_nexus_enable_fault_tolerance( globus_l_jobmanager_fault_callback, GLOBUS_NULL); if (! 
graml_api_mutex_is_initialized) { /* initialize mutex which makes the client thread-safe */ int err; err = globus_mutex_init (&graml_api_mutex, NULL); assert (!err); err = globus_cond_init (&graml_api_cond, NULL); assert (!err); graml_api_mutex_is_initialized = GLOBUS_TRUE; } GRAM_LOCK; *job_reporting_file = '\0'; /* if -conf is passed then get the arguments from the file * specified */ if (argc > 2 && !strcmp(argv[1],"-conf")) { char ** newargv; char * newbuf; int newargc = 52; int pfd; newargv = (char**) malloc(newargc * sizeof(char *)); /* not freeded */ newargv[0] = argv[0]; /* get file length via fseek & ftell */ if ((fp = fopen(argv[2], "r")) == NULL) { fprintf(stderr, "failed to open configuration file\n"); exit(1); } fseek(fp, 0, SEEK_END); length = ftell(fp); if (length <=0) { fprintf(stderr,"failed to determine length of configuration file\n"); exit(1); } fclose(fp); pfd = open(argv[2],O_RDONLY); newbuf = (char *) malloc(length+1); /* dont free */ i = read(pfd, newbuf, length); if (i < 0) { fprintf(stderr, "Unable to read parameters from configuration " "file\n"); exit(1); } newbuf[i] = '\0'; close(pfd); newargv[0] = argv[0]; newargc--; globus_l_gram_tokenize(newbuf, &newargv[1], &newargc); for (i=3; i,\n" "\n" "Note: if type=condor then\n" " -condor-os & -condor-arch are required.\n" "\n"); exit(1); } else { fprintf(stderr, "Warning: Ignoring unknown argument %s\n\n", argv[i]); } } if (globus_jobmanager_request_init(&request) != GLOBUS_SUCCESS) { fprintf(stderr, "ERROR: globus_jobmanager_request_init() failed.\n"); exit(1); } graml_env_home = globus_l_gram_getenv_var("HOME", NULL); if (!graml_env_home) { /* we have to have HOME because we might need it for the default * directory for the user's job */ fprintf(stderr, "ERROR: unable to get HOME from the environment.\n"); exit(1); } if ((sleeptime_str = globus_libc_getenv("GLOBUS_JOB_MANAGER_SLEEP"))) { sleeptime = atoi(sleeptime_str); globus_libc_usleep(sleeptime * 1000 * 1000); } if 
(save_logfile_always_flag || save_logfile_on_errors_flag) { /* * Open the gram logfile just for testing! */ sprintf(tmp_buffer, "%s/gram_job_mgr_%lu.log", graml_env_home, (unsigned long) getpid()); if ((request->jobmanager_log_fp = fopen(tmp_buffer, "a")) == NULL) { sprintf(tmp_buffer, "/tmp/gram_job_mgr_%lu.log", (unsigned long) getpid()); if ((request->jobmanager_log_fp = fopen(tmp_buffer, "a")) == NULL) { fprintf(stderr, "JM: Cannot open gram logfile.\n"); } else { sprintf(tmp_buffer, "/dev/null"); } } } else { /* don't write a log file */ sprintf(tmp_buffer, "/dev/null"); } if (!request->jobmanager_log_fp) { request->jobmanager_log_fp = fopen("/dev/null", "w"); } setbuf(request->jobmanager_log_fp,NULL); graml_log_fp = request->jobmanager_log_fp; request->jobmanager_logfile = (char *) globus_libc_strdup(tmp_buffer); globus_jobmanager_log( request->jobmanager_log_fp, "-----------------------------------------\n"); globus_jobmanager_log( request->jobmanager_log_fp, "JM: Entering gram_job_manager main().\n"); if (conf.type == GLOBUS_NULL) { globus_jobmanager_log( request->jobmanager_log_fp, "JM: Jobmanager service misconfigured. " "jobmanager Type not defined.\n"); return(GLOBUS_GRAM_PROTOCOL_ERROR_GATEKEEPER_MISCONFIGURED); } if (! 
conf.rdn) { globus_jobmanager_log( request->jobmanager_log_fp, "JM: -rdn parameter required\n"); return(GLOBUS_GRAM_PROTOCOL_ERROR_GATEKEEPER_MISCONFIGURED); } request->jobmanager_type = (char *) globus_libc_strdup(conf.type); if (strcasecmp(request->jobmanager_type, "condor") == 0) { if (conf.condor_arch == NULL) { globus_jobmanager_log( request->jobmanager_log_fp, "JMI: Condor_arch must be specified when " "jobmanager type is condor\n"); return(GLOBUS_GRAM_PROTOCOL_ERROR_CONDOR_ARCH); } if (conf.condor_os == NULL) { globus_jobmanager_log( request->jobmanager_log_fp, "JMI: Condor_os must be specified when " "jobmanager type is condor\n"); return(GLOBUS_GRAM_PROTOCOL_ERROR_CONDOR_OS); } request->condor_arch = conf.condor_arch; request->condor_os = conf.condor_os; } /* tell the API to use this callback function for filenames */ request->filename_callback_func = (globus_gram_job_manager_callback_func_t) globus_i_filename_callback_func; globus_jobmanager_log( request->jobmanager_log_fp, "JM: HOME = %s\n", graml_env_home); graml_env_logname = globus_l_gram_getenv_var("LOGNAME", "noname"); graml_env_globus_id = globus_l_gram_getenv_var("GLOBUS_ID", "unknown globusid"); /* * Getting environment variables to be added to the job's environment. * LOGNAME and HOME will be added as well */ conf.x509_cert_dir = globus_l_gram_getenv_var("X509_CERT_DIR", NULL); graml_env_krb5ccname = globus_l_gram_getenv_var("KRB5CCNAME", NULL); graml_env_nlspath = globus_l_gram_getenv_var("NLSPATH", NULL); graml_env_tz = globus_l_gram_getenv_var("TZ", NULL); if (conf.x509_cert_dir) { conf.num_env_adds++; } if (conf.tcp_port_range) { globus_libc_setenv("GLOBUS_TCP_PORT_RANGE", conf.tcp_port_range, GLOBUS_TRUE); conf.num_env_adds++; } /* * Getting the paths to the (relocatable) deploy and install trees. 
*/ if (home_dir) conf.globus_location = globus_libc_strdup(home_dir); else { error = globus_location(&conf.globus_location); if (error != GLOBUS_SUCCESS) { globus_jobmanager_log( request->jobmanager_log_fp, "JM: globus_location failed \n"); return(GLOBUS_GRAM_PROTOCOL_ERROR_GATEKEEPER_MISCONFIGURED); } } globus_jobmanager_log( request->jobmanager_log_fp, "JM: GLOBUS_DEPLOY_PATH = %s\n", (conf.globus_location) ? (conf.globus_location) : "NULL"); /* increment the counter for GLOBUS_LOCATION */ conf.num_env_adds++; globus_libc_setenv("GLOBUS_LOCATION", conf.globus_location, GLOBUS_TRUE); if (libexecdir) { request->jobmanager_libexecdir = globus_l_gram_genfilename(conf.globus_location, libexecdir, NULL); } else { request->jobmanager_libexecdir = globus_l_gram_genfilename(conf.globus_location, "libexec", NULL); } globus_jobmanager_log( request->jobmanager_log_fp, "JM: jobmanager_libexecdir = %s\n", request->jobmanager_libexecdir); if(! debugging_without_client) { char * args_fd_str; args_fd_str = globus_libc_getenv("GRID_SECURITY_HTTP_BODY_FD"); if ((!args_fd_str) || ((args_fd = atoi(args_fd_str)) == 0)) { GRAM_UNLOCK; globus_jobmanager_log( request->jobmanager_log_fp, "JM: Cannot open HTTP Body file\n" ); exit(1); } jrbuf_size = lseek(args_fd, 0, SEEK_END); lseek(args_fd, 0, SEEK_SET); if (jrbuf_size > GLOBUS_GRAM_PROTOCOL_MAX_MSG_SIZE) { globus_jobmanager_log( request->jobmanager_log_fp, "JM: RSL file to big\n"); exit (1); } if (read(args_fd, buffer, jrbuf_size) != jrbuf_size) { globus_jobmanager_log( request->jobmanager_log_fp, "JM: Error reading the RSL file\n"); exit (1); } (void *) close(args_fd); rc = globus_gram_protocol_unpack_job_request( buffer, jrbuf_size, &job_state_mask, &client_contact_str, &rsl_spec ); } if (rc == GLOBUS_GRAM_PROTOCOL_ERROR_VERSION_MISMATCH) { globus_jobmanager_log( request->jobmanager_log_fp, "JM: ERROR: globus gram protocol version mismatch!\n"); globus_jobmanager_log( request->jobmanager_log_fp, "JM: gram protocol version = %d\n", 
GLOBUS_GRAM_PROTOCOL_VERSION); fprintf( stderr, "ERROR: globus gram protocol version mismatch!\n"); fprintf( stderr, "gram job manager version = %d\n", GLOBUS_GRAM_PROTOCOL_VERSION); } else if (rc) { globus_jobmanager_log( request->jobmanager_log_fp, "JM: ERROR: globus gram protocol failure!\n"); return(GLOBUS_GRAM_PROTOCOL_ERROR_PROTOCOL_FAILED); } if (!debugging_without_client) { if (globus_gss_assist_import_sec_context( &minor_status, &context_handle, &token_status, -1, request->jobmanager_log_fp) != GSS_S_COMPLETE) { globus_jobmanager_log( request->jobmanager_log_fp, "JM:Failed to load security context\n"); return(GLOBUS_GRAM_PROTOCOL_ERROR_GATEKEEPER_MISCONFIGURED); } globus_jobmanager_log(request->jobmanager_log_fp, "JM: context loaded\n"); } if (client_contact_str!=NULL) { client_contact_node = (globus_l_gram_client_contact_t *) globus_libc_malloc(sizeof(globus_l_gram_client_contact_t)); client_contact_node->contact = client_contact_str; client_contact_node->job_state_mask = job_state_mask; client_contact_node->failed_count = 0; globus_list_insert(&globus_l_gram_client_contacts, (void *) client_contact_node); globus_jobmanager_log( request->jobmanager_log_fp, "JM: client contact = %s\n", client_contact_str); } globus_jobmanager_log( request->jobmanager_log_fp, "JM: rsl_specification = %s\n", rsl_spec); globus_jobmanager_log( request->jobmanager_log_fp, "JM: job status mask = %d\n",job_state_mask); /* create listener port that will be used by client API funcs */ rc = globus_gram_protocol_allow_attach( &my_port, &my_host, (void *) request, globus_l_jm_http_query_callback, GLOBUS_NULL ); if (rc != GLOBUS_SUCCESS) { GRAM_UNLOCK; return GLOBUS_GRAM_PROTOCOL_ERROR_JM_FAILED_ALLOW_ATTACH; } my_pid = getpid(); my_time = time(0); sprintf(tmp_buffer, "https://%s:%hu/%lu/%lu/", my_host, my_port, my_pid, my_time); if (debugging_without_client) { printf("Job Contact: %s\n", tmp_buffer); } graml_job_contact = (char *) globus_libc_strdup (tmp_buffer); 
globus_libc_setenv("GLOBUS_GRAM_JOB_CONTACT", graml_job_contact, 1); conf.num_env_adds++; graml_gass_cache_tag = (char *) globus_libc_strdup (graml_job_contact); sprintf(tmp_buffer, "%lu.%lu", my_pid, my_time); request->uniq_id = (char *)globus_libc_strdup (tmp_buffer); /* call the RSL routine to parse the user request */ rsl_tree = globus_rsl_parse(rsl_spec); globus_free(rsl_spec); rsl_spec = GLOBUS_NULL; if (!rsl_tree) { rc = GLOBUS_FAILURE; request->status = GLOBUS_GRAM_PROTOCOL_JOB_STATE_FAILED; request->failure_code = GLOBUS_GRAM_PROTOCOL_ERROR_BAD_RSL; } else { /* printf("\n------------ after parse ---------------\n\n"); * globus_rsl_print_recursive(rsl_tree); */ /* * build symbol table for RSL evaluation. * variable found in the RSL will be replaced with these values. */ symbol_table = (globus_symboltable_t *) globus_libc_malloc (sizeof(globus_symboltable_t)); globus_symboltable_init(symbol_table, globus_hashtable_string_hash, globus_hashtable_string_keyeq); globus_symboltable_create_scope(symbol_table); globus_symboltable_insert(symbol_table, (void *) "HOME", (void *) graml_env_home); globus_symboltable_insert(symbol_table, (void *) "GLOBUS_GRAM_JOB_CONTACT", (void *) graml_job_contact); if (graml_env_logname) globus_symboltable_insert(symbol_table, (void *) "LOGNAME", (void *) graml_env_logname); if (graml_env_globus_id) globus_symboltable_insert(symbol_table, (void *) "GLOBUS_ID", (void *) graml_env_globus_id); if (conf.org_dn) globus_symboltable_insert(symbol_table, (void *) "GLOBUS_ORG_DN", (void *) conf.org_dn); if (conf.rdn) globus_symboltable_insert(symbol_table, (void *) "GLOBUS_GRAM_RDN", (void *) conf.rdn); if (conf.host_dn) globus_symboltable_insert(symbol_table, (void *) "GLOBUS_HOST_DN", (void *) conf.host_dn); if (conf.host_manufacturer) globus_symboltable_insert(symbol_table, (void *) "GLOBUS_HOST_MANUFACTURER", (void *) conf.host_manufacturer); if (conf.host_cputype) globus_symboltable_insert(symbol_table, (void *) "GLOBUS_HOST_CPUTYPE", 
(void *) conf.host_cputype); if (conf.host_osname) globus_symboltable_insert(symbol_table, (void *) "GLOBUS_HOST_OSNAME", (void *) conf.host_osname); if (conf.host_osversion) globus_symboltable_insert(symbol_table, (void *) "GLOBUS_HOST_OSVERSION", (void *) conf.host_osversion); if (conf.gate_host) globus_symboltable_insert(symbol_table, (void *) "GLOBUS_GATEKEEPER_HOST", (void *) conf.gate_host); if (conf.gate_port) globus_symboltable_insert(symbol_table, (void *) "GLOBUS_GATEKEEPER_PORT", (void *) conf.gate_port); if (conf.gate_subject) globus_symboltable_insert(symbol_table, (void *) "GLOBUS_GATEKEEPER_SUBJECT", (void *) conf.gate_subject); if (conf.condor_os) globus_symboltable_insert(symbol_table, (void *) "GLOBUS_CONDOR_OS", (void *) conf.condor_os); if (conf.condor_arch) globus_symboltable_insert(symbol_table, (void *) "GLOBUS_CONDOR_ARCH", (void *) conf.condor_arch); if (conf.globus_location) { globus_symboltable_insert(symbol_table, (void *) "GLOBUS_LOCATION", (void *) conf.globus_location); globus_symboltable_insert(symbol_table, (void *) "GLOBUS_TOOLS_PATH", (void *) conf.globus_location); globus_symboltable_insert(symbol_table, (void *) "GLOBUS_DEVELOPMENT_PATH", (void *) conf.globus_location); globus_symboltable_insert(symbol_table, (void *) "GLOBUS_SERVICES_PATH", (void *) conf.globus_location); globus_symboltable_insert(symbol_table, (void *) "GLOBUS_INSTALL_PATH", (void *) conf.globus_location); } if (globus_rsl_eval(rsl_tree, symbol_table) != 0) { rc = GLOBUS_FAILURE; request->status = GLOBUS_GRAM_PROTOCOL_JOB_STATE_FAILED; request->failure_code = GLOBUS_GRAM_PROTOCOL_ERROR_RSL_EVALUATION_FAILED; } if (request->jobmanager_log_fp != NULL) { if ((final_rsl_spec = globus_rsl_unparse(rsl_tree)) != GLOBUS_NULL) { globus_jobmanager_log( request->jobmanager_log_fp, "JM: final rsl specification >>>>\n"); globus_jobmanager_log( request->jobmanager_log_fp, "%s\n", final_rsl_spec); globus_jobmanager_log( request->jobmanager_log_fp, "JM: <<<< final rsl 
specification\n"); } } } if (rc == GLOBUS_SUCCESS) { rc = globus_gass_cache_open(NULL, &globus_l_cache_handle); if( rc != GLOBUS_SUCCESS ) { request->status = GLOBUS_GRAM_PROTOCOL_JOB_STATE_FAILED; request->failure_code = GLOBUS_GRAM_PROTOCOL_ERROR_OPENING_CACHE; } } if (rc == GLOBUS_SUCCESS) { /* fill the request structure with values from the RSL */ rc = globus_l_gram_request_fill(rsl_tree, request, GLOBUS_TRUE); } if (rc == GLOBUS_SUCCESS && request->jm_restart != NULL) { char *orig_rsl; globus_rsl_t *orig_rsl_tree; sscanf(request->jm_restart, "https://%*[^:]:%*d/%d/%d/", &my_pid, &my_time); sprintf(tmp_buffer, "https://%s:%hu/%lu/%lu/", my_host, my_port, my_pid, my_time); graml_job_contact = (char *) globus_libc_strdup (tmp_buffer); sprintf(tmp_buffer, "%lu.%lu", my_pid, my_time); request->uniq_id = (char *)globus_libc_strdup (tmp_buffer); globus_l_gram_set_state_file(my_host, request->uniq_id); rc = globus_l_gram_read_state_file(request, &orig_rsl); if (rc == GLOBUS_SUCCESS && (request->status == GLOBUS_GRAM_PROTOCOL_JOB_STATE_DONE || request->status == GLOBUS_GRAM_PROTOCOL_JOB_STATE_FAILED || request->status == GLOBUS_GRAM_PROTOCOL_JOB_STATE_UNSUBMITTED)) { graml_jm_done = GLOBUS_TRUE; } if (rc == GLOBUS_SUCCESS) { orig_rsl_tree = globus_rsl_parse( orig_rsl ); rc = globus_l_gram_request_fill(orig_rsl_tree, request, GLOBUS_TRUE); free(orig_rsl); } if (rc == GLOBUS_SUCCESS) { rc = globus_l_gram_request_fill(rsl_tree, request, GLOBUS_FALSE); } if (rc == GLOBUS_SUCCESS) { /* memory from the rsl tree is referenced by request globus_rsl_free_recursive(rsl_tree); */ rsl_tree = orig_rsl_tree; } else if (request->failure_code == 0) { request->failure_code = GLOBUS_GRAM_PROTOCOL_ERROR_RESTART_FAILED; } if (request->failure_code != 0) { request->status = GLOBUS_GRAM_PROTOCOL_JOB_STATE_FAILED; } } if (rc == GLOBUS_SUCCESS && graml_remote_io_url != NULL) { int i; FILE *fp; unsigned long timestamp; char *fname; globus_libc_sprintf(tmp_buffer,"%sdev/remote_io_url", 
graml_gass_cache_tag); rc = globus_gass_cache_add(&globus_l_cache_handle, tmp_buffer, graml_gass_cache_tag, GLOBUS_TRUE, ×tamp, &fname); if (rc == GLOBUS_GASS_CACHE_ADD_EXISTS || rc == GLOBUS_GASS_CACHE_ADD_NEW) { rc = globus_gass_cache_add_done(&globus_l_cache_handle, tmp_buffer, graml_gass_cache_tag, timestamp); if (rc == GLOBUS_SUCCESS) { fp = fopen( fname, "w" ); } if (rc!=GLOBUS_SUCCESS || fp == NULL) { request->status = GLOBUS_GRAM_PROTOCOL_JOB_STATE_FAILED; request->failure_code = GLOBUS_GRAM_PROTOCOL_ERROR_WRITING_REMOTE_IO_URL; rc = GLOBUS_FAILURE; globus_jobmanager_log( request->jobmanager_log_fp, "JM: error opening remote io url file\n"); } } else { request->status = GLOBUS_GRAM_PROTOCOL_JOB_STATE_FAILED; request->failure_code = GLOBUS_GRAM_PROTOCOL_ERROR_WRITING_REMOTE_IO_URL; rc = GLOBUS_FAILURE; globus_jobmanager_log( request->jobmanager_log_fp, "JM: error creating remote io url file\n"); } if (rc == GLOBUS_SUCCESS) { graml_remote_io_url_file = (char *)globus_libc_strdup(fname); fprintf( fp, "%s\n", graml_remote_io_url ); fclose( fp ); for(i = 0; request->environment[i] != GLOBUS_NULL; i++) { ; } request->environment = (char **) globus_libc_realloc(request->environment, (i+3) * sizeof(char *)); request->environment[i] = (char *)globus_libc_strdup("GLOBUS_REMOTE_IO_URL"); ++i; request->environment[i] = (char *)globus_libc_strdup(graml_remote_io_url_file); ++i; request->environment[i] = GLOBUS_NULL; } else { free( graml_remote_io_url ); graml_remote_io_url = NULL; } } if (rc == GLOBUS_SUCCESS) { /* * append to the req->environment with values from the conf file */ rc = globus_l_gram_request_environment_append(request, &conf); } if (rc == GLOBUS_SUCCESS) { globus_jobmanager_log( request->jobmanager_log_fp, "JM: opening stdout fd\n"); /* open "real" stdout descriptor */ globus_l_gram_stdout_fd = globus_gass_open(request->my_stdout, O_WRONLY|O_APPEND|O_CREAT, 0777); if (globus_l_gram_stdout_fd < 0) { request->failure_code = 
GLOBUS_GRAM_PROTOCOL_ERROR_OPENING_STDOUT; request->status = GLOBUS_GRAM_PROTOCOL_JOB_STATE_FAILED; rc = GLOBUS_FAILURE; globus_jobmanager_log( request->jobmanager_log_fp, "JM: error opening outfile \n"); } else { if (request->my_stdout_tag != GLOBUS_NULL && request->jm_restart == GLOBUS_NULL) { char * filename; unsigned long timestamp; /* try to add the specific tag to the cache entry */ /* will prevent automatic deletion of the stdout */ /* when the job finish; usefull for "batch jobs" */ /* Use the option Do Not Create: I want to add it */ /* to the cache only if I have an x-gass-cache URL*/ /* (in which case the globus_open(stdout) has */ /* previously created this cache entry. */ rc = globus_gass_cache_add(&globus_l_cache_handle, request->my_stdout, request->my_stdout_tag, GLOBUS_FALSE, ×tamp, &filename); if(rc == GLOBUS_GASS_CACHE_ADD_EXISTS) { rc = globus_gass_cache_add_done(&globus_l_cache_handle, request->my_stdout, request->my_stdout_tag, timestamp); if (rc != GLOBUS_SUCCESS) { request->status = GLOBUS_GRAM_PROTOCOL_JOB_STATE_FAILED; request->failure_code = GLOBUS_GRAM_PROTOCOL_ERROR_OPENING_STDOUT; globus_jobmanager_log( request->jobmanager_log_fp, "JM: error add done stdout tag \n"); } else { globus_libc_free(filename); rc = GLOBUS_SUCCESS; } } else { if (rc != GLOBUS_GASS_CACHE_URL_NOT_FOUND) { request->status = GLOBUS_GRAM_PROTOCOL_JOB_STATE_FAILED; request->failure_code = GLOBUS_GRAM_PROTOCOL_ERROR_OPENING_STDOUT; rc = GLOBUS_FAILURE; globus_jobmanager_log( request->jobmanager_log_fp, "JM: error adding stdout tag \n"); } else { rc = GLOBUS_SUCCESS; } } } if (rc == GLOBUS_SUCCESS) { int my_rc; globus_url_scheme_t url_scheme; my_rc = globus_url_get_scheme(request->my_stdout, &url_scheme); if (my_rc == GLOBUS_SUCCESS && url_scheme != GLOBUS_URL_SCHEME_UNKNOWN && url_scheme != GLOBUS_URL_SCHEME_FILE && url_scheme != GLOBUS_URL_SCHEME_X_GASS_CACHE) { char url[1024]; unsigned long timestamp; char *fname; globus_l_gram_stdout_remote_fd = 
globus_l_gram_stdout_fd; globus_l_gram_stdout_remote_sent=request->stdout_position; globus_libc_sprintf(url,"%sdev/stdout", graml_gass_cache_tag); rc = globus_gass_cache_add(&globus_l_cache_handle, url, graml_gass_cache_tag, GLOBUS_TRUE, ×tamp, &fname); if (rc == GLOBUS_GASS_CACHE_ADD_EXISTS || rc == GLOBUS_GASS_CACHE_ADD_NEW) { rc = globus_gass_cache_add_done(&globus_l_cache_handle, url, graml_gass_cache_tag, timestamp); if (rc == GLOBUS_SUCCESS) globus_l_gram_stdout_fd = globus_gass_open( fname, O_RDWR|O_APPEND|O_CREAT, 0777); if (rc!=GLOBUS_SUCCESS || globus_l_gram_stdout_fd < 0) { request->status = GLOBUS_GRAM_PROTOCOL_JOB_STATE_FAILED; request->failure_code = GLOBUS_GRAM_PROTOCOL_ERROR_OPENING_STDOUT; rc = GLOBUS_FAILURE; globus_jobmanager_log( request->jobmanager_log_fp, "JM: error opening outfile \n"); } } else { request->status = GLOBUS_GRAM_PROTOCOL_JOB_STATE_FAILED; request->failure_code = GLOBUS_GRAM_PROTOCOL_ERROR_OPENING_STDOUT; rc = GLOBUS_FAILURE; globus_jobmanager_log( request->jobmanager_log_fp, "JM: error opening outfile \n"); } } } } } if ( rc == GLOBUS_SUCCESS && !strcmp( request->my_stdout, request->my_stderr) ) { globus_jobmanager_log( request->jobmanager_log_fp, "JM: merged stdout/stderr\n"); globus_l_gram_merged_stdio = GLOBUS_TRUE; globus_l_gram_stderr_fd = globus_l_gram_stdout_fd; globus_l_gram_stderr_remote_fd = globus_l_gram_stdout_remote_fd; globus_l_gram_stderr_remote_sent = globus_l_gram_stdout_remote_sent; } else { if (rc == GLOBUS_SUCCESS) { globus_jobmanager_log( request->jobmanager_log_fp, "JM: opening stderr fd\n"); /* open "real" stderr descriptor */ globus_l_gram_stderr_fd = globus_gass_open(request->my_stderr, O_WRONLY|O_APPEND|O_CREAT, 0777); if (globus_l_gram_stderr_fd < 0) { request->failure_code = GLOBUS_GRAM_PROTOCOL_ERROR_OPENING_STDERR; request->status = GLOBUS_GRAM_PROTOCOL_JOB_STATE_FAILED; rc = GLOBUS_FAILURE; } else { if (request->my_stderr_tag != GLOBUS_NULL && request->jm_restart == GLOBUS_NULL) { char * filename; 
unsigned long timestamp; /* try to add the specific tag to the cache entry */ /* will prevent automatic deletion of the stderr */ /* when the job finish; usefull for "batch jobs" */ /* Use the option Do Not Create: I want to add it */ /* to the cache only if I have an x-gass-cache URL*/ /* (in which case the globus_open(stderr) has */ /* previously created this cache entry. */ rc = globus_gass_cache_add(&globus_l_cache_handle, request->my_stderr, request->my_stderr_tag, GLOBUS_FALSE, ×tamp, &filename); if(rc == GLOBUS_GASS_CACHE_ADD_EXISTS) { rc = globus_gass_cache_add_done(&globus_l_cache_handle, request->my_stderr, request->my_stderr_tag, timestamp); if (rc != GLOBUS_SUCCESS) { request->status = GLOBUS_GRAM_PROTOCOL_JOB_STATE_FAILED; request->failure_code = GLOBUS_GRAM_PROTOCOL_ERROR_OPENING_STDERR; globus_jobmanager_log( request->jobmanager_log_fp, "JM: error add done stderr tag \n"); } else { globus_free(filename); rc = GLOBUS_SUCCESS; } } else { if (rc != GLOBUS_GASS_CACHE_URL_NOT_FOUND) { request->status = GLOBUS_GRAM_PROTOCOL_JOB_STATE_FAILED; request->failure_code = GLOBUS_GRAM_PROTOCOL_ERROR_OPENING_STDERR; rc = GLOBUS_FAILURE; globus_jobmanager_log( request->jobmanager_log_fp, "JM: error adding stderr tag \n"); } else { rc = GLOBUS_SUCCESS; } } } if (rc == GLOBUS_SUCCESS) { int my_rc; globus_url_scheme_t url_scheme; my_rc = globus_url_get_scheme(request->my_stderr, &url_scheme); if (my_rc == GLOBUS_SUCCESS && url_scheme != GLOBUS_URL_SCHEME_UNKNOWN && url_scheme != GLOBUS_URL_SCHEME_FILE && url_scheme != GLOBUS_URL_SCHEME_X_GASS_CACHE) { char url[1024]; unsigned long timestamp; char *fname; globus_l_gram_stderr_remote_fd = globus_l_gram_stderr_fd; globus_l_gram_stderr_remote_sent=request->stderr_position; globus_libc_sprintf(url,"%sdev/stderr", graml_gass_cache_tag); rc = globus_gass_cache_add(&globus_l_cache_handle, url, graml_gass_cache_tag, GLOBUS_TRUE, ×tamp, &fname); if (rc == GLOBUS_GASS_CACHE_ADD_EXISTS || rc == GLOBUS_GASS_CACHE_ADD_NEW) { rc = 
globus_gass_cache_add_done(&globus_l_cache_handle, url, graml_gass_cache_tag, timestamp); if (rc == GLOBUS_SUCCESS) globus_l_gram_stderr_fd = globus_gass_open( fname, O_RDWR|O_APPEND|O_CREAT, 0777); if (rc!=GLOBUS_SUCCESS || globus_l_gram_stderr_fd < 0) { request->status = GLOBUS_GRAM_PROTOCOL_JOB_STATE_FAILED; request->failure_code = GLOBUS_GRAM_PROTOCOL_ERROR_OPENING_STDERR; rc = GLOBUS_FAILURE; globus_jobmanager_log( request->jobmanager_log_fp, "JM: error opening stderr fd\n"); } } else { request->status = GLOBUS_GRAM_PROTOCOL_JOB_STATE_FAILED; request->failure_code = GLOBUS_GRAM_PROTOCOL_ERROR_OPENING_STDERR; rc = GLOBUS_FAILURE; globus_jobmanager_log( request->jobmanager_log_fp, "JM: error opening stderr fd\n"); } } } } } } if (rc == GLOBUS_SUCCESS && request->jm_restart != NULL) { globus_list_t *tmp_list; globus_off_t stdout_size = 0; globus_off_t stderr_size = 0; tmp_list = globus_l_gram_stdout_files; while(!globus_list_empty(tmp_list)) { globus_l_gram_output_t *output_handle = (globus_l_gram_output_t *) globus_list_first(tmp_list); stdout_size += output_handle->last_written; tmp_list = globus_list_rest(tmp_list); } tmp_list = globus_l_gram_stderr_files; while(!globus_list_empty(tmp_list)) { globus_l_gram_output_t *output_handle = (globus_l_gram_output_t *) globus_list_first(tmp_list); stderr_size += output_handle->last_written; tmp_list = globus_list_rest(tmp_list); } if ( globus_l_gram_merged_stdio ) { stdout_size += stderr_size; ftruncate(globus_l_gram_stdout_fd, stdout_size); lseek(globus_l_gram_stdout_fd, stdout_size, SEEK_SET); } else { ftruncate(globus_l_gram_stdout_fd, stdout_size); lseek(globus_l_gram_stdout_fd, stdout_size, SEEK_SET); ftruncate(globus_l_gram_stderr_fd, stderr_size); lseek(globus_l_gram_stderr_fd, stderr_size, SEEK_SET); } } if (!krbflag) { if (rc == GLOBUS_SUCCESS && ! debugging_without_client) { gss_OID_set mechs; int present = 0; /* * relocate the user proxy to the gass cache and * return the local file name. 
*/ globus_jobmanager_log( request->jobmanager_log_fp, "JM: user proxy relocation\n"); /* * Figure out if we're using GSI */ major_status = gss_indicate_mechs(&minor_status, &mechs); if(major_status == GSS_S_COMPLETE) { major_status = gss_test_oid_set_member( &minor_status, &gss_mech_oid_globus_gssapi_ssleay, mechs, &present); if(major_status != GSS_S_COMPLETE) { present = 0; } gss_release_oid_set(&minor_status, &mechs); } /* If so, relocate our delegated proxy */ if (present) { graml_env_x509_user_proxy = globus_l_gram_user_proxy_relocate(request); globus_jobmanager_log( request->jobmanager_log_fp, "JM: GSSAPI type is GSI\n"); if ((!graml_env_x509_user_proxy)) { request->failure_code = GLOBUS_GRAM_PROTOCOL_ERROR_USER_PROXY_NOT_FOUND; request->status = GLOBUS_GRAM_PROTOCOL_JOB_STATE_FAILED; rc = GLOBUS_FAILURE; } else { for(x = 0; request->environment[x] != GLOBUS_NULL; x++) { ; } request->environment = (char **) globus_libc_realloc(request->environment, (x+3) * sizeof(char *)); request->environment[x] = "X509_USER_PROXY"; ++x; request->environment[x] = graml_env_x509_user_proxy; ++x; request->environment[x] = GLOBUS_NULL; } } } else { graml_env_x509_user_proxy = (char *) getenv("X509_USER_PROXY"); if (graml_env_x509_user_proxy) { if (remove(graml_env_x509_user_proxy) != 0) { globus_jobmanager_log( request->jobmanager_log_fp, "JM: Cannot remove user proxy file --> %s\n", graml_env_x509_user_proxy); } else { globus_jobmanager_log( request->jobmanager_log_fp, "JM: request failed at startup removed user proxy --> %s\n", graml_env_x509_user_proxy); } } } } /*krbflag */ if (graml_env_x509_user_proxy) { proxy_cred_desc *pcd = NULL; time_t time_after; time_t time_now; ASN1_UTCTIME *asn1_time = NULL; globus_reltime_t delay_time; globus_libc_setenv( "X509_USER_PROXY", graml_env_x509_user_proxy, GLOBUS_TRUE ); globus_jobmanager_log( request->jobmanager_log_fp, "JM: set JM env X509_USER_PROXY to point to %s\n", graml_env_x509_user_proxy); /* read in proxy */ /* initialize 
SSLeay and the error strings */ ERR_load_prxyerr_strings(0); SSLeay_add_ssl_algorithms(); /* Load proxy */ pcd = proxy_cred_desc_new(); if (pcd && proxy_load_user_cert(pcd, graml_env_x509_user_proxy, NULL, NULL)) { /* Freeing this struct seems to screw up other stuff that uses X509 proxy_cred_desc_free(pcd); */ pcd = NULL; } if (pcd && (pcd->upkey = X509_get_pubkey(pcd->ucert)) == NULL) { /* Freeing this struct seems to screw up other stuff that uses X509 proxy_cred_desc_free(pcd); */ pcd = NULL; } if (pcd) { /* validity: set time_diff to time to expiration (in seconds) */ asn1_time = ASN1_UTCTIME_new(); X509_gmtime_adj(asn1_time,0); time_now = ASN1_UTCTIME_mktime(asn1_time); time_after = ASN1_UTCTIME_mktime(X509_get_notAfter(pcd->ucert)); if ((time_after - time_now) - 300 <= 0) { request->status = GLOBUS_GRAM_PROTOCOL_JOB_STATE_FAILED; request->failure_code = GLOBUS_GRAM_PROTOCOL_ERROR_USER_PROXY_EXPIRED; rc = GLOBUS_FAILURE; globus_jobmanager_log(request->jobmanager_log_fp, "JM: user proxy lifetime is less than minimum (5 minutes)\n"); } else { /* set timer */ GlobusTimeReltimeSet(delay_time, (time_after - time_now) - 300, 0); globus_callback_register_oneshot(&proxy_expiration_handle, &delay_time, globus_l_gram_proxy_expiration, (void *)request, GLOBUS_NULL, GLOBUS_NULL); } /* Freeing this struct seems to screw up other stuff that uses X509 proxy_cred_desc_free(pcd); */ } else { globus_jobmanager_log(request->jobmanager_log_fp, "JM: problem reading user proxy\n"); } } if (rc == GLOBUS_SUCCESS && request->save_state == GLOBUS_TRUE) { char *pos; globus_reltime_t delay_time; globus_reltime_t period_time; if ( graml_job_state_file == NULL ) globus_l_gram_set_state_file( my_host, request->uniq_id ); if ((final_rsl_spec = globus_rsl_unparse(rsl_tree)) == GLOBUS_NULL) final_rsl_spec = (char *) globus_libc_strdup("RSL UNKNOWN"); rc = globus_l_gram_write_state_file(request->status, request->failure_code, request->job_id, final_rsl_spec); if (rc != GLOBUS_SUCCESS) { 
request->status = GLOBUS_GRAM_PROTOCOL_JOB_STATE_FAILED; request->failure_code = GLOBUS_GRAM_PROTOCOL_ERROR_WRITING_STATE_FILE; rc = GLOBUS_FAILURE; globus_jobmanager_log( request->jobmanager_log_fp, "JM: error writing the state file\n"); } GlobusTimeReltimeSet(delay_time, 0, 0); GlobusTimeReltimeSet(period_time, GRAM_JOB_MANAGER_TTL_UPDATE, 0); globus_callback_register_periodic(&ttl_update_handle, &delay_time, &period_time, globus_l_gram_ttl_update, (void *)GRAM_JOB_MANAGER_TTL_LIMIT, GLOBUS_NULL, GLOBUS_NULL); } if (request->two_phase_commit != 0) { int my_rc; /* Send reply to submitter before the actual request if we're * doing 2-phase commit. */ if(!debugging_without_client) { my_rc = globus_gram_protocol_pack_job_request_reply( (rc == GLOBUS_SUCCESS) ? GLOBUS_GRAM_PROTOCOL_ERROR_WAITING_FOR_COMMIT : request->failure_code, (rc == GLOBUS_SUCCESS) ? graml_job_contact : GLOBUS_NULL, &reply, &replysize); if (my_rc==GLOBUS_SUCCESS) { my_rc = globus_gram_protocol_frame_reply( 200, reply, replysize, &sendbuf, &sendsize); } if (my_rc!=GLOBUS_SUCCESS) { my_rc = globus_gram_protocol_frame_reply( 400, GLOBUS_NULL, 0, &sendbuf, &sendsize); } } if (reply) globus_libc_free(reply); globus_jobmanager_log( request->jobmanager_log_fp, "JM: before sending to client: rc=%d (%s)\n", rc, globus_gram_protocol_error_string(my_rc)); if (my_rc == GLOBUS_SUCCESS && !debugging_without_client) { globus_jobmanager_log( request->jobmanager_log_fp, "JM: sending to client;\n"); for (i=0; ijobmanager_log_fp, "%c", sendbuf[i] ); globus_jobmanager_log( request->jobmanager_log_fp, "-------------\n"); /* send this reply back down the socket to the client */ major_status = globus_gss_assist_wrap_send( &minor_status, context_handle, (char *) sendbuf, sendsize, &token_status, globus_gss_assist_token_send_fd, stdout, request->jobmanager_log_fp); globus_jobmanager_log( request->jobmanager_log_fp, "JM: major=%x minor=%x\n", major_status, minor_status); /* * close the connection (both stdin and stdout are 
connected * to the socket) */ close(0); close(1); /* * Reopen stdin and stdout to /dev/null (the jobmanager library * expects them to be open). */ open("/dev/null",O_RDONLY); open("/dev/null",O_WRONLY); globus_libc_free(sendbuf); } if ( rc == GLOBUS_SUCCESS ) { rc = my_rc; } /* * If the reply reported failure or wasn't sent, suppress a later * callback for the some failure. */ if(rc != GLOBUS_SUCCESS) { sent_request_failure = GLOBUS_TRUE; } } fflush(request->jobmanager_log_fp); if (rc == GLOBUS_SUCCESS && request->two_phase_commit != 0) { int start_time = time(GLOBUS_NULL); int save_errno; while(!graml_jm_commit_request && !graml_jm_cancel) { globus_abstime_t timeout; timeout.tv_sec = start_time + request->two_phase_commit + graml_commit_time_extend; timeout.tv_nsec = 0; save_errno = globus_cond_timedwait(&graml_api_cond, &graml_api_mutex, &timeout); if(save_errno == ETIMEDOUT) { globus_jobmanager_log( request->jobmanager_log_fp, "JM: timed out waiting for commit signal\n"); rc = GLOBUS_FAILURE; request->status = GLOBUS_GRAM_PROTOCOL_JOB_STATE_FAILED; request->failure_code = GLOBUS_GRAM_PROTOCOL_ERROR_COMMIT_TIMED_OUT; break; } } if (graml_jm_cancel) { rc = GLOBUS_FAILURE; request->status = GLOBUS_GRAM_PROTOCOL_JOB_STATE_FAILED; request->failure_code = GLOBUS_GRAM_PROTOCOL_ERROR_USER_CANCELLED; } } if (graml_jm_ttl_expired) { rc = GLOBUS_FAILURE; request->status = GLOBUS_GRAM_PROTOCOL_JOB_STATE_FAILED; request->failure_code = GLOBUS_GRAM_PROTOCOL_ERROR_TTL_EXPIRED; } if (rc == GLOBUS_SUCCESS && !graml_jm_done) { if (request->save_state == GLOBUS_TRUE) { globus_l_gram_write_state_file(GLOBUS_GRAM_PROTOCOL_JOB_STATE_FAILED, GLOBUS_GRAM_PROTOCOL_ERROR_SUBMIT_UNKNOWN, request->job_id, final_rsl_spec); } graml_rsl_tree = rsl_tree; rc = globus_jobmanager_request(request); if ( rc == GLOBUS_SUCCESS ) { //*********** GRIDBANK MODIFICATION ************************* wallclock_time = time(&wallclock_time);//record/note start wallclock time //Resource Usage Record: job request 
acknowledgement rur_globus_id = globus_libc_strdup(graml_env_globus_id); //replace '/' and ' ' (space) characters with '_' for filename purposes char_ptr = rur_globus_id; slashch = strchr(char_ptr, '/'); *slashch = '_'; char_ptr = slashch; char_ptr++; while(char_ptr != '\0') { slashch = strchr(char_ptr, '/'); if(slashch==NULL) break; *slashch = '_'; char_ptr = slashch; char_ptr++; } char_ptr = rur_globus_id; slashch = strchr(char_ptr, ' '); *slashch = '_'; char_ptr = slashch; char_ptr++; while(char_ptr != '\0') { slashch = strchr(char_ptr, ' '); if(slashch==NULL) break; *slashch = '_'; char_ptr = slashch; char_ptr++; } job_request_filename = (char*)malloc(strlen(rur_globus_id)+15); memset(job_request_filename,0,strlen(rur_globus_id)+15); strcpy(job_request_filename,"/tmp/"); strcat(job_request_filename,rur_globus_id); strcat(job_request_filename,".requests"); job_request_file = fopen(job_request_filename,"r+"); if(job_request_file!=NULL) { while(fgets(request_buffer, sizeof(request_buffer), job_request_file)) { char_ptr = request_buffer; while(*char_ptr != ':') char_ptr++; *char_ptr = 0; //FILE* temp = fopen("/tmp/debug","a"); //fprintf(temp,"%s",request_buffer); request_no = atoi(request_buffer); if(request_no > rur_seq_no) rur_seq_no = request_no; } rur_seq_no++; job_request_file = freopen(job_request_filename, "a", job_request_file); fprintf(job_request_file,"%i:%s:%s", rur_seq_no, request->uniq_id, asctime(gmtime(&wallclock_time))); fflush(job_request_file); fclose(job_request_file); } else//file does not exist { job_request_file = fopen(job_request_filename,"w"); fprintf(job_request_file,"1:%s:%s", request->uniq_id, asctime(gmtime(&wallclock_time))); rur_seq_no = 1; fflush(job_request_file); fclose(job_request_file); } //end of job request acknowledgement for resource usage record //*********** GRIDBANK MODIFICATION END ************************* graml_jm_request_made = GLOBUS_TRUE; } } if (rc == GLOBUS_SUCCESS && request->save_state == GLOBUS_TRUE) { 
globus_l_gram_write_state_file(request->status, request->failure_code, request->job_id, final_rsl_spec); } /* * Send reply with the job contact or error status */ if (rc == GLOBUS_SUCCESS) { globus_jobmanager_log( request->jobmanager_log_fp, "JM: request was successful, sending message to client\n"); /* This should probably go somewhere else! */ if ((request->status == GLOBUS_GRAM_PROTOCOL_JOB_STATE_DONE) || (request->status == GLOBUS_GRAM_PROTOCOL_JOB_STATE_FAILED) || (request->status == GLOBUS_GRAM_PROTOCOL_JOB_STATE_UNSUBMITTED)) { graml_jm_done = GLOBUS_TRUE; } /* On restart, report an unsubmitted job as a failure condition */ if (request->status == GLOBUS_GRAM_PROTOCOL_JOB_STATE_UNSUBMITTED && request->jm_restart != NULL) { request->status = GLOBUS_GRAM_PROTOCOL_JOB_STATE_FAILED; request->failure_code = GLOBUS_GRAM_PROTOCOL_ERROR_JOB_UNSUBMITTED; } } else { globus_jobmanager_log( request->jobmanager_log_fp, "JM: request failed with error %d (%s), " "sending message to client\n", request->failure_code, globus_gram_protocol_error_string(request->failure_code)); graml_jm_request_failed = GLOBUS_TRUE; request->status = GLOBUS_GRAM_PROTOCOL_JOB_STATE_FAILED; if (globus_l_gram_stdout_fd != -1) { globus_l_gram_delete_file_list(&globus_l_gram_stdout_files); globus_gass_close(globus_l_gram_stdout_fd); } if (globus_l_gram_stderr_fd != -1 && !globus_l_gram_merged_stdio) { globus_l_gram_delete_file_list(&globus_l_gram_stderr_files); globus_gass_close(globus_l_gram_stderr_fd); } if (globus_l_gram_stdout_remote_fd != -1) { globus_gass_close(globus_l_gram_stdout_remote_fd); } if (globus_l_gram_stderr_remote_fd != -1 && !globus_l_gram_merged_stdio) { globus_gass_close(globus_l_gram_stderr_remote_fd); } } if (request->two_phase_commit == 0) { if(!debugging_without_client) { if (graml_jm_request_failed) sent_request_failure = GLOBUS_TRUE; rc = globus_gram_protocol_pack_job_request_reply( (graml_jm_request_failed) ? 
request->failure_code : GLOBUS_SUCCESS, (graml_jm_request_failed) ? GLOBUS_NULL : graml_job_contact, &reply, &replysize); if (rc==GLOBUS_SUCCESS) { rc = globus_gram_protocol_frame_reply( 200, reply, replysize, &sendbuf, &sendsize); } if (rc!=GLOBUS_SUCCESS) { rc = globus_gram_protocol_frame_reply( 400, GLOBUS_NULL, 0, &sendbuf, &sendsize); } } if (reply) globus_libc_free(reply); globus_jobmanager_log( request->jobmanager_log_fp, "JM: before sending to client: rc=%d (%s)\n", rc, globus_gram_protocol_error_string(rc)); if (rc == GLOBUS_SUCCESS && !debugging_without_client) { globus_jobmanager_log( request->jobmanager_log_fp, "JM: sending to client:\n"); for (i=0; ijobmanager_log_fp, "%c", sendbuf[i] ); globus_jobmanager_log( request->jobmanager_log_fp, "-------------\n"); /* send this reply back down the socket to the client */ major_status = globus_gss_assist_wrap_send( &minor_status, context_handle, (char *) sendbuf, sendsize, &token_status, globus_gss_assist_token_send_fd, stdout, request->jobmanager_log_fp); globus_jobmanager_log( request->jobmanager_log_fp, "JM: major=%x minor=%x\n", major_status, minor_status); /* * close the connection (both stdin and stdout are connected * to the socket) */ close(0); close(1); globus_libc_free(sendbuf); } } if (!graml_jm_request_failed) { globus_reltime_t delay_time; globus_reltime_t period_time; if (!request->job_id) request->job_id = (char *) globus_libc_strdup ("UNKNOWN"); /* send callback with the status */ if(!graml_jm_done) { globus_l_gram_client_callback(request->status, request->failure_code); } /* if we are publishing jobs, then setup the necessary variables */ if (job_reporting_dir) { if ((final_rsl_spec = globus_rsl_unparse(rsl_tree)) == GLOBUS_NULL) final_rsl_spec = (char *) globus_libc_strdup("RSL UNKNOWN"); sprintf( job_reporting_file, "%s/%s_%s.%s", job_reporting_dir, conf.rdn, graml_env_logname, request->job_id ); globus_jobmanager_log( request->jobmanager_log_fp, "JM: job_reporting_file = %s\n", 
job_reporting_file); globus_l_gram_reporting_file_gen(final_rsl_spec, job_reporting_file, graml_env_globus_id, request->job_id, request->status); } if (request->poll_frequency == 0) { request->poll_frequency = GRAM_JOB_MANAGER_POLL_FREQUENCY; } globus_jobmanager_log( request->jobmanager_log_fp, "JM: poll frequency = %d\n", request->poll_frequency); GlobusTimeReltimeSet(delay_time, 0, 0); GlobusTimeReltimeSet(period_time, GRAM_JOB_MANAGER_STAT_FREQUENCY, 0); globus_callback_register_periodic(&stat_cleanup_poll_handle, &delay_time, &period_time, globus_l_gram_status_file_cleanup, (void *) job_reporting_dir, GLOBUS_NULL, GLOBUS_NULL); GlobusTimeReltimeSet(period_time, 2, 0); globus_callback_register_periodic(&gass_poll_handle, &delay_time, &period_time, globus_l_gram_jm_check_files, (void *) request, GLOBUS_NULL, GLOBUS_NULL); while (!graml_jm_done && !graml_jm_ttl_expired) { /* * The only thing that can wake this up prematurely is a request * from the client to cancel the job. */ GRAM_TIMED_WAIT(request->poll_frequency); /* * stuff may have occurred while we were unlocked, * so we need to poll file descriptors, etc to see * if state change occurred */ if (!graml_jm_done) { /* check if cancel handler was called */ if (job_reporting_dir) { /* touch the file so we know we did not crash */ if ( utime(job_reporting_file, NULL) != 0 ) { if(errno == ENOENT) { globus_jobmanager_log( request->jobmanager_log_fp, "JM: job status file not found, " "rewritting it with current " "status.\n"); globus_l_gram_reporting_file_gen(final_rsl_spec, job_reporting_file, graml_env_globus_id, request->job_id, request->status); } } } rc = globus_jobmanager_request_check(request); if ( rc == GLOBUS_GRAM_JOBMANAGER_STATUS_CHANGED || rc == GLOBUS_GRAM_JOBMANAGER_STATUS_FAILED ) { if (request->save_state == GLOBUS_TRUE) { globus_l_gram_update_state_file( request->status, request->failure_code ); } if (rc == GLOBUS_GRAM_JOBMANAGER_STATUS_FAILED) { /* * unable to get a status for the job. 
* often the result of a broken poll script. */ globus_jobmanager_request_cancel(request); request->status=GLOBUS_GRAM_PROTOCOL_JOB_STATE_FAILED; } if ((request->status == GLOBUS_GRAM_PROTOCOL_JOB_STATE_DONE) || (request->status == GLOBUS_GRAM_PROTOCOL_JOB_STATE_FAILED)) { globus_jobmanager_log( request->jobmanager_log_fp, "JM: request check returned DONE or " "FAILED\n"); graml_jm_done = 1; //*********** GRIDBANK MODIFICATION ************************* //************RECORD RESOURCE USAGE************************************* resource_usage_filename = globus_libc_malloc(strlen(graml_env_globus_id) + 50); memset(resource_usage_filename,0, strlen(graml_env_globus_id) + 50 ); strcpy(resource_usage_filename,"/tmp/"); if( strcmp(graml_env_globus_id,"unknown globusid")==0 ) strcat(resource_usage_filename, "Unauthenticated_client.rur");//this should never happen: implies security breach else { strcat(resource_usage_filename,rur_globus_id); strcat(resource_usage_filename,".rur."); memset(request_buffer,0,sizeof(request_buffer)); sprintf(request_buffer,"%i",rur_seq_no); strcat(resource_usage_filename,request_buffer); } if((resource_usage_file=fopen(resource_usage_filename,"a"))==NULL) { globus_jobmanager_log( request->jobmanager_log_fp, "JM: failed to open resource usage file\n"); //fprintf(globus_l_gram_stdout_fd, "Failed to open resource usage file\n"); } if(getrusage(RUSAGE_CHILDREN, &resource_usage)) { globus_jobmanager_log( request->jobmanager_log_fp, "JM: failed to get resource usage\n"); if(resource_usage_file != NULL) fprintf(resource_usage_file, "getrusage failed\n"); } else { fprintf(resource_usage_file,"Resource Usage Record (Globus v2)"); fprintf(resource_usage_file,"\nclient certificate subject name:%s",graml_env_globus_id); fprintf(resource_usage_file,"\nclient contact:%s",client_contact_node->contact); fprintf(resource_usage_file,"\nresource certificate subject name:%s", conf.gate_subject); fprintf(resource_usage_file,"\nresource address:%s",my_host); 
fprintf(resource_usage_file,"\nresource manufacturer:%s",conf.host_manufacturer); fprintf(resource_usage_file,"\nresource cpu type:%s",conf.host_cputype); fprintf(resource_usage_file,"\nresource os:%s",conf.host_osname); fprintf(resource_usage_file,"\nresource os version:%s",conf.host_osversion); fprintf(resource_usage_file,"\nresource domain name:%s",conf.host_dn); fprintf(resource_usage_file,"\nresource condor os:%s",conf.condor_os); fprintf(resource_usage_file,"\nresource org domain name:%s",conf.org_dn); fprintf(resource_usage_file,"\nresource gate host:%s",conf.gate_host); if( request->status = GLOBUS_GRAM_PROTOCOL_JOB_STATE_DONE ) fprintf(resource_usage_file,"\njob status:done"); else fprintf(resource_usage_file,"\njob status:failed : %s", globus_gram_protocol_error_string(request->failure_code)); fprintf(resource_usage_file,"\nexecutable:%s",request->executable); fprintf(resource_usage_file,"\nstart date:%s", asctime(gmtime(&wallclock_time)));//wallclock_time was taken at the time of job submission wallclock_time = time(&wallclock_time);//get current time fprintf(resource_usage_file,"end date:%s", asctime(gmtime(&wallclock_time))); fprintf(resource_usage_file,"jobmanager job id:%s", request->uniq_id); fprintf(resource_usage_file,"\nnumber of processes:%i", request->count); fprintf(resource_usage_file,"\nproject:%s", request->project); fprintf(resource_usage_file,"\nuser time used (s):%i",(int)resource_usage.ru_utime.tv_sec); fprintf(resource_usage_file,"\nuser time used (us):%i",(int)resource_usage.ru_utime.tv_usec); fprintf(resource_usage_file,"\nsystem time used (s):%i",resource_usage.ru_stime.tv_sec); fprintf(resource_usage_file,"\nsystem time used (us):%i",resource_usage.ru_stime.tv_usec); fprintf(resource_usage_file,"\nmaximum resident set size:%i", resource_usage.ru_maxrss); fprintf(resource_usage_file,"\nintegral shared memory size:%i", resource_usage.ru_ixrss); fprintf(resource_usage_file,"\nintegral unshared data size:%i", resource_usage.ru_idrss); 
fprintf(resource_usage_file,"\nintegral unshared stack size:%i", resource_usage.ru_isrss); fprintf(resource_usage_file,"\npage reclaims:%i", resource_usage.ru_minflt); fprintf(resource_usage_file,"\npage faults:%i", resource_usage.ru_majflt); fprintf(resource_usage_file,"\nswaps:%i", resource_usage.ru_nswap); fprintf(resource_usage_file,"\nblock input operations:%i", resource_usage.ru_inblock); fprintf(resource_usage_file,"\nblock output operations:%i", resource_usage.ru_oublock); fprintf(resource_usage_file,"\nmessages sent:%i", resource_usage.ru_msgsnd); fprintf(resource_usage_file,"\nmessages received:%i", resource_usage.ru_msgrcv); fprintf(resource_usage_file,"\nsignals received:%i", resource_usage.ru_nsignals); fprintf(resource_usage_file,"\nvoluntary context switches:%i", resource_usage.ru_nvcsw); fprintf(resource_usage_file,"\ninvoluntary context switches:%i\n", resource_usage.ru_nivcsw); fclose(resource_usage_file); } globus_libc_free(resource_usage_filename); //*********** GRIDBANK MODIFICATION END ************************* } else { /* send callback of new status * The tmp_status variable is needed because * a cancel request could come in and set the flag * before this callback completes. Also, we cannot * be lock when doing a send_rsr which is done in * the client_callback routine. */ tmp_status = request->status; globus_l_gram_client_callback(tmp_status, request->failure_code); globus_l_gram_reporting_file_gen(final_rsl_spec, job_reporting_file, graml_env_globus_id, request->job_id, request->status); } } } } /* endwhile */ while (!graml_jm_can_exit) { globus_cond_wait(&graml_api_cond, &graml_api_mutex); } globus_callback_unregister(stat_cleanup_poll_handle); globus_callback_unregister(gass_poll_handle); } /* endif */ if (request->save_state == GLOBUS_TRUE) { globus_callback_unregister(ttl_update_handle); } globus_jobmanager_log( request->jobmanager_log_fp, "JM: we're done. 
doing cleanup\n"); if (!graml_jm_ttl_expired) { /* This may take a while and doesn't affect shared data, so let * any client queries get processed. */ GRAM_UNLOCK; /* Flush remaining stdout and stderr to final destinations */ if (globus_l_gram_stdout_fd != -1) { globus_l_gram_flush_file_list(globus_l_gram_stdout_fd, globus_l_gram_stdout_files); } if (globus_l_gram_stderr_fd != -1) { globus_l_gram_flush_file_list(globus_l_gram_stderr_fd, globus_l_gram_stderr_files); } if ( request->save_state == GLOBUS_TRUE ) { globus_l_gram_update_state_file_io(); } if (globus_l_gram_stdout_remote_fd != -1) { globus_l_gram_update_remote_file(globus_l_gram_stdout_fd, globus_l_gram_stdout_remote_fd, &globus_l_gram_stdout_remote_sent); } if (globus_l_gram_stderr_remote_fd != -1 && !globus_l_gram_merged_stdio) { globus_l_gram_update_remote_file(globus_l_gram_stderr_fd, globus_l_gram_stderr_remote_fd, &globus_l_gram_stderr_remote_sent); } GRAM_LOCK; } /* close stdout and stderr */ if (globus_l_gram_stdout_remote_fd != -1) { globus_gass_close(globus_l_gram_stdout_remote_fd); } if (globus_l_gram_stderr_remote_fd != -1 && !globus_l_gram_merged_stdio) { globus_gass_close(globus_l_gram_stderr_remote_fd); } if (globus_l_gram_stdout_fd != -1) { struct stat file_stat; file_stat.st_size = 0; fstat(globus_l_gram_stdout_fd,&file_stat); globus_l_gram_stdout_size = file_stat.st_size; globus_l_gram_delete_file_list(&globus_l_gram_stdout_files); globus_gass_close(globus_l_gram_stdout_fd); } if (globus_l_gram_stderr_fd != -1 && !globus_l_gram_merged_stdio) { struct stat file_stat; file_stat.st_size = 0; fstat(globus_l_gram_stderr_fd,&file_stat); globus_l_gram_stderr_size = file_stat.st_size; globus_l_gram_delete_file_list(&globus_l_gram_stderr_files); globus_gass_close(globus_l_gram_stderr_fd); } /* set the failure code for the callback */ if (graml_jm_ttl_expired) { request->failure_code = GLOBUS_GRAM_PROTOCOL_ERROR_TTL_EXPIRED; } if(graml_jm_ttl_expired || graml_jm_stop || (!sent_request_failure && 
((request->status == GLOBUS_GRAM_PROTOCOL_JOB_STATE_DONE) || (request->status == GLOBUS_GRAM_PROTOCOL_JOB_STATE_FAILED) || (request->status == GLOBUS_GRAM_PROTOCOL_JOB_STATE_UNSUBMITTED)))) { globus_jobmanager_log( request->jobmanager_log_fp, "JM: sending final callback.\n"); graml_commit_time_extend = 0; if (graml_jm_ttl_expired || graml_jm_stop) { globus_l_gram_client_callback(GLOBUS_GRAM_PROTOCOL_JOB_STATE_FAILED, request->failure_code); } else { globus_l_gram_client_callback(request->status, request->failure_code); } if (request->two_phase_commit != 0) { int start_time = time(GLOBUS_NULL); int save_errno; if (request->save_state == GLOBUS_TRUE && request->failure_code != GLOBUS_GRAM_PROTOCOL_ERROR_COMMIT_TIMED_OUT) { globus_l_gram_update_state_file( request->status, request->failure_code ); } globus_jobmanager_log( request->jobmanager_log_fp, "JM: waiting for commit signal\n" ); while(!graml_jm_commit_end && !graml_jm_ttl_expired && !graml_jm_stop && request->failure_code != GLOBUS_GRAM_PROTOCOL_ERROR_COMMIT_TIMED_OUT) { globus_abstime_t timeout; timeout.tv_sec = start_time + request->two_phase_commit + graml_commit_time_extend; timeout.tv_nsec = 0; save_errno = globus_cond_timedwait(&graml_api_cond, &graml_api_mutex, &timeout); if(save_errno == ETIMEDOUT) { globus_jobmanager_log( request->jobmanager_log_fp, "JM: timed out waiting for commit signal\n"); break; } } } /* * Check to see if the job status file exists. If so, then delete it. 
*/ if (stat(job_reporting_file, &statbuf) == 0) { if (remove(job_reporting_file) != 0) { globus_jobmanager_log( request->jobmanager_log_fp, "JM: Failed to remove job status file --> %s\n", job_reporting_file); } } } if (globus_l_gram_client_contact_list_free(globus_l_gram_client_contacts) != GLOBUS_SUCCESS) { globus_jobmanager_log( request->jobmanager_log_fp, "JM: Error freeing client contact list.\n"); } if (!graml_jm_ttl_expired && !graml_jm_stop && !(request->two_phase_commit != 0 && !graml_jm_commit_end && request->save_state != 0)) { /* clear any other cache entries which contain the gram job id as * the tag */ globus_jobmanager_log( request->jobmanager_log_fp, "JM: Cleaning GASS cache\n"); rc = globus_gass_cache_list(&globus_l_cache_handle, &cache_entries, &cache_size); if(rc == GLOBUS_SUCCESS) { for(i=0; ijobmanager_log_fp, "Trying to clean up with \n", cache_entries[i].url, graml_gass_cache_tag); globus_gass_cache_cleanup_tag(&globus_l_cache_handle, cache_entries[i].url, graml_gass_cache_tag); } } /* for each tags */ } /* for each cache entries */ globus_gass_cache_list_free(cache_entries, cache_size); } /* * Check to see if the job state file exists. If so, then delete it. */ if (graml_job_state_file != NULL && stat(graml_job_state_file, &statbuf) == 0) { if (remove(graml_job_state_file) != 0) { globus_jobmanager_log( request->jobmanager_log_fp, "JM: Failed to remove job statue file --> %s\n", graml_job_state_file); } } } else if (!graml_jm_ttl_expired) { /* * We're leaving the state file behind. Set the TTL in it to the * current time since we're about to exit. 
*/ if ( graml_job_state_file != NULL ) { globus_l_gram_ttl_update( NULL, (void *)0 ); } } globus_gass_cache_close(&globus_l_cache_handle); GRAM_UNLOCK; fflush(request->jobmanager_log_fp); globus_jobmanager_log( request->jobmanager_log_fp, "JM: freeing RSL.\n"); if (graml_rsl_tree) globus_rsl_free_recursive(graml_rsl_tree); globus_jobmanager_log( request->jobmanager_log_fp, "JM: starting deactivate routines.\n"); globus_gram_protocol_callback_disallow(graml_job_contact); rc = globus_module_deactivate(GLOBUS_GRAM_PROTOCOL_MODULE); if (rc != GLOBUS_SUCCESS) { fprintf(stderr, "%s deactivation failed with rc=%d\n", GLOBUS_GRAM_PROTOCOL_MODULE->module_name, rc); exit(1); } rc = globus_module_deactivate(GLOBUS_GRAM_JOBMANAGER_MODULE); if (rc != GLOBUS_SUCCESS) { fprintf(stderr, "%s deactivation failed with rc=%d\n", GLOBUS_GRAM_JOBMANAGER_MODULE->module_name, rc); exit(1); } rc = globus_module_deactivate(GLOBUS_DUCT_CONTROL_MODULE); if (rc != GLOBUS_SUCCESS) { fprintf(stderr, "%s deactivation failed with rc=%d\n", GLOBUS_DUCT_CONTROL_MODULE->module_name, rc); exit(1); } rc = globus_module_deactivate(GLOBUS_GASS_FILE_MODULE); if (rc != GLOBUS_SUCCESS) { fprintf(stderr, "gass_file deactivation failed with rc=%d\n", rc); exit(1); } rc = globus_module_deactivate(GLOBUS_GASS_CACHE_MODULE); if (rc != GLOBUS_SUCCESS) { fprintf(stderr, "gass_cache deactivation failed with rc=%d\n", rc); exit(1); } rc = globus_module_deactivate(GLOBUS_GASS_COPY_MODULE); if (rc != GLOBUS_SUCCESS) { fprintf(stderr, "globus_gass_copy deactivation failed with rc=%d\n", rc); exit(1); } /* * make sure we issue any last outstanding queries { globus_list_t * list; globus_gram_protocol_monitor_t * monitor; for (list=graml_jm_outstanding_connections; list; list = globus_list_rest(list)) { monitor = globus_list_first(list); globus_mutex_lock(&monitor->mutex); { while (!monitor->done) globus_cond_wait(&monitor->cond, &monitor->mutex); } globus_mutex_unlock(&monitor->mutex); 
globus_mutex_destroy(&monitor->mutex); globus_cond_destroy(&monitor->mutex); globus_libc_free(monitor); } if (graml_jm_outstanding_connections) globus_list_free(graml_jm_outstanding_connections); } */ rc = globus_module_deactivate(GLOBUS_IO_MODULE); if (rc != GLOBUS_SUCCESS) { fprintf(stderr, "io deactivation failed with rc=%d\n", rc); exit(1); } rc = globus_module_deactivate(GLOBUS_COMMON_MODULE); if (rc != GLOBUS_SUCCESS) { fprintf(stderr, "common deactivation failed with rc=%d\n", rc); exit(1); } if ( save_logfile_always_flag || (save_logfile_on_errors_flag && graml_jm_request_failed && !request->dry_run) ) { globus_jobmanager_log( request->jobmanager_log_fp, "JM: exiting globus_gram_job_manager.\n"); } else if (strcmp(request->jobmanager_logfile, "/dev/null") != 0) { /* * Check to see if the jm log file exists. If so, then delete it. */ if (stat(request->jobmanager_logfile, &statbuf) == 0) { if (remove(request->jobmanager_logfile) != 0) { fprintf(stderr, "failed to remove job manager log file = %s\n", request->jobmanager_logfile); } } } return(0); } /* main() */ /****************************************************************************** Function: globus_l_gram_conf_values_init() Description: Parameters: Returns: ******************************************************************************/ static void globus_l_gram_conf_values_init( globus_l_gram_conf_values_t * conf ) { if (!conf) return; conf->type = GLOBUS_NULL; conf->condor_arch = GLOBUS_NULL; conf->condor_os = GLOBUS_NULL; conf->rdn = GLOBUS_NULL; conf->host_dn = GLOBUS_NULL; conf->org_dn = GLOBUS_NULL; conf->gate_dn = GLOBUS_NULL; conf->gate_host = GLOBUS_NULL; conf->gate_port = GLOBUS_NULL; conf->gate_subject = GLOBUS_NULL; conf->host_osname = GLOBUS_NULL; conf->host_osversion = GLOBUS_NULL; conf->host_cputype = GLOBUS_NULL; conf->host_manufacturer = GLOBUS_NULL; conf->x509_cert_dir = GLOBUS_NULL; conf->globus_location = GLOBUS_NULL; conf->tcp_port_range = GLOBUS_NULL; conf->num_env_adds = 0; 
return; } /* globus_l_gram_conf_values_init() */ /****************************************************************************** Function: globus_l_gram_client_callback() Description: Parameters: Returns: ******************************************************************************/ static void globus_l_gram_client_callback(int status, int failure_code) { int rc; globus_byte_t * message; globus_size_t msgsize; globus_list_t * tmp_list; globus_l_gram_client_contact_t * client_contact_node; globus_gram_protocol_monitor_t * monitor; tmp_list = globus_l_gram_client_contacts; message = GLOBUS_NULL; globus_jobmanager_log( graml_log_fp, "JM: %s empty client callback list.\n", (tmp_list) ? ("NOT") : "" ); if (tmp_list) { rc = globus_gram_protocol_pack_status_update_message( graml_job_contact, status, failure_code, &message, &msgsize); if (rc != GLOBUS_SUCCESS) { globus_jobmanager_log( graml_log_fp, "JM: error %d while creating status message\n" ); return; } } while(!globus_list_empty(tmp_list)) { client_contact_node = (globus_l_gram_client_contact_t *) globus_list_first(tmp_list); if ((status & client_contact_node->job_state_mask) && client_contact_node->failed_count < 4) { monitor = (globus_gram_protocol_monitor_t *) globus_libc_malloc(sizeof(globus_gram_protocol_monitor_t)); globus_mutex_init(&monitor->mutex, GLOBUS_NULL); globus_cond_init(&monitor->cond, GLOBUS_NULL); monitor->done = GLOBUS_FALSE; globus_jobmanager_log( graml_log_fp, "JM: sending callback of status %d (failure code %d) to %s.\n", status, failure_code, client_contact_node->contact); rc = globus_gram_protocol_post_and_get( client_contact_node->contact, client_contact_node->contact, GLOBUS_NULL, /* default attr */ message, msgsize, GLOBUS_NULL, /* ignore reply */ GLOBUS_NULL, monitor ); if (rc==GLOBUS_SUCCESS) { globus_list_insert(&graml_jm_outstanding_connections, (void *) monitor); } else /* connect failed, most likely */ { globus_jobmanager_log( graml_log_fp, "JM: callback failed, rc = %d, \"%s\"\n", 
rc, globus_gram_protocol_error_string (rc)); globus_libc_free(monitor); client_contact_node->failed_count++; } } tmp_list = globus_list_rest (tmp_list); } /* this is safe, as the post() has copied the message to another buffer and framed it with HTTP headers etc. */ if (message) globus_libc_free(message); } /* globus_l_gram_client_callback() */ /****************************************************************************** Function: globus_l_gram_reporting_file_gen() Description: Parameters: Returns: ******************************************************************************/ static int globus_l_gram_reporting_file_gen(char * request_string, char * job_reporting_file, char * globus_id, char * job_id, int status) { FILE * status_fp; char status_str[64]; struct stat statbuf; globus_jobmanager_log( graml_log_fp, "JM: in globus_l_gram_reporting_file_gen\n"); switch(status) { case GLOBUS_GRAM_PROTOCOL_JOB_STATE_PENDING: strcpy(status_str, "PENDING "); break; case GLOBUS_GRAM_PROTOCOL_JOB_STATE_ACTIVE: strcpy(status_str, "ACTIVE "); break; case GLOBUS_GRAM_PROTOCOL_JOB_STATE_FAILED: strcpy(status_str, "FAILED "); break; case GLOBUS_GRAM_PROTOCOL_JOB_STATE_DONE: strcpy(status_str, "DONE "); break; case GLOBUS_GRAM_PROTOCOL_JOB_STATE_SUSPENDED: strcpy(status_str, "SUSPENDED "); break; default: strcpy(status_str, "UNKNOWN "); } if (stat(job_reporting_file, &statbuf) == 0) { /* the file exists, so just update the first line which is the * job status */ if ((status_fp = fopen(job_reporting_file, "r+")) == NULL) { globus_jobmanager_log( graml_log_fp, "JM: Failed opening job status file %s\n", job_reporting_file); return(1); } fprintf(status_fp, "%s\n", status_str); } else { if ((status_fp = fopen(job_reporting_file, "w")) == NULL) { globus_jobmanager_log(graml_log_fp, "JM: Failed opening job status file %s\n", job_reporting_file); return(1); } else { fprintf(status_fp, "%s\n", status_str); fprintf(status_fp, "%s\n", request_string); fprintf(status_fp, "%s\n", 
graml_job_contact); fprintf(status_fp, "%s\n", job_id); fprintf(status_fp, "%s\n", globus_id); } } fclose(status_fp); return(0); } /* globus_l_gram_reporting_file_gen() */ /****************************************************************************** Function: globus_l_gram_rsl_env_add() Description: Parameters: Returns: ******************************************************************************/ static int globus_l_gram_rsl_env_add(globus_rsl_t * ast_node, char * var, char * value) { globus_rsl_t * tmp_rsl_ptr; globus_list_t * tmp_rsl_list; globus_list_t * new_list; if (globus_rsl_is_boolean(ast_node)) { tmp_rsl_list = globus_rsl_boolean_get_operand_list(ast_node); while (! globus_list_empty(tmp_rsl_list)) { tmp_rsl_ptr = (globus_rsl_t *) globus_list_first (tmp_rsl_list); globus_l_gram_rsl_env_add(tmp_rsl_ptr, var, value); tmp_rsl_list = globus_list_rest(tmp_rsl_list); } } else if (globus_rsl_is_relation(ast_node)) { if (!globus_rsl_is_relation_attribute_equal(ast_node, "environment")) { return(0); } new_list = NULL; globus_list_insert(&new_list, (void *) globus_rsl_value_make_literal(value)); globus_list_insert(&new_list, (void *) globus_rsl_value_make_literal(var)); globus_list_insert( globus_rsl_value_sequence_get_list_ref( globus_rsl_relation_get_value_sequence(ast_node)), (void *) globus_rsl_value_make_sequence(new_list)); return(0); } else { return(1); } return(0); } /* globus_l_gram_rsl_env_add() */ /****************************************************************************** Function: globus_l_gram_request_fill() Description: Parameters: Returns: ******************************************************************************/ static int globus_l_gram_request_fill(globus_rsl_t * rsl_tree, globus_gram_jobmanager_request_t * req, globus_bool_t initialize) { int x; char ** tmp_param; char * gram_myjob; char * staged_file_path; char * ptr; if (rsl_tree == NULL) { req->failure_code = GLOBUS_GRAM_PROTOCOL_ERROR_NULL_SPECIFICATION_TREE; 
return(GLOBUS_FAILURE); } /* Canonize the RSL attributes. This will remove underscores and lowercase * all character. For example, givin the RSL relation "(Max_Time=20)" the * attribute "Max_Time" will be altered in the rsl_tree to be "maxtime". * */ if (globus_rsl_assist_attributes_canonicalize(rsl_tree) != GLOBUS_SUCCESS) { req->failure_code = GLOBUS_GRAM_PROTOCOL_ERROR_NULL_SPECIFICATION_TREE; return(GLOBUS_FAILURE); } /********************************** * GET RESTART PARAM */ if (globus_rsl_param_get(rsl_tree, GLOBUS_RSL_PARAM_SINGLE_LITERAL, GLOBUS_GRAM_PROTOCOL_RESTART_PARAM, &tmp_param) != 0) { req->failure_code = GLOBUS_GRAM_PROTOCOL_ERROR_RSL_RESTART; return(GLOBUS_FAILURE); } if (tmp_param[0]) { req->jm_restart = (tmp_param)[0]; } /********************************** * GET PROGRAM (executable) PARAM */ if (globus_rsl_param_get(rsl_tree, GLOBUS_RSL_PARAM_SINGLE_LITERAL, GLOBUS_GRAM_PROTOCOL_EXECUTABLE_PARAM, &tmp_param) != 0) { req->failure_code = GLOBUS_GRAM_PROTOCOL_ERROR_RSL_EXECUTABLE; return(GLOBUS_FAILURE); } if (tmp_param[0]) req->executable = (tmp_param)[0]; else if ( req->jm_restart == NULL && initialize == GLOBUS_TRUE ) { req->failure_code = GLOBUS_GRAM_PROTOCOL_ERROR_UNDEFINED_EXE; return(GLOBUS_FAILURE); } /********************************** * GET PROGRAM ARGUMENTS PARAM */ if (globus_rsl_param_get(rsl_tree, GLOBUS_RSL_PARAM_MULTI_LITERAL, GLOBUS_GRAM_PROTOCOL_ARGUMENTS_PARAM, &(req->arguments)) != 0) { req->failure_code = GLOBUS_GRAM_PROTOCOL_ERROR_RSL_ARGUMENTS; return(GLOBUS_FAILURE); } /********************************** * GET DIR PARAM */ if (globus_rsl_param_get(rsl_tree, GLOBUS_RSL_PARAM_SINGLE_LITERAL, GLOBUS_GRAM_PROTOCOL_DIR_PARAM, &tmp_param) != 0) { req->failure_code = GLOBUS_GRAM_PROTOCOL_ERROR_RSL_DIRECTORY; return(GLOBUS_FAILURE); } if (tmp_param[0]) req->directory = tmp_param[0]; else if (initialize == GLOBUS_TRUE) req->directory = graml_env_home; /* * change to the right directory, so that std* files * are interpreted relative 
to this directory */ if (chdir(req->directory) != 0) { globus_jobmanager_log( req->jobmanager_log_fp, "JM: Couldn't change to directory %s\n", req->directory ); req->failure_code = GLOBUS_GRAM_PROTOCOL_ERROR_BAD_DIRECTORY; return(GLOBUS_FAILURE); } /********************************** * GET STDIN PARAM */ if (globus_rsl_param_get(rsl_tree, GLOBUS_RSL_PARAM_SINGLE_LITERAL, GLOBUS_GRAM_PROTOCOL_STDIN_PARAM, &tmp_param) != 0) { req->failure_code = GLOBUS_GRAM_PROTOCOL_ERROR_RSL_STDIN; return(GLOBUS_FAILURE); } if (tmp_param[0]) req->my_stdin = tmp_param[0]; else if (initialize == GLOBUS_TRUE) req->my_stdin = GLOBUS_GRAM_PROTOCOL_DEFAULT_STDIN; /********************************** * GET STDOUT PARAM */ if (globus_rsl_param_get(rsl_tree, GLOBUS_RSL_PARAM_MULTI_LITERAL, GLOBUS_GRAM_PROTOCOL_STDOUT_PARAM, &tmp_param) != 0) { req->failure_code = GLOBUS_GRAM_PROTOCOL_ERROR_RSL_STDOUT; return(GLOBUS_FAILURE); } if (tmp_param[0]) { req->my_stdout = tmp_param[0]; if (tmp_param[1]) { req->my_stdout_tag = tmp_param[1]; if (tmp_param[2]) { /* error: stdout can be of the form URL or URL TAG only */ req->failure_code = GLOBUS_GRAM_PROTOCOL_ERROR_RSL_STDOUT; return(GLOBUS_FAILURE); } } else { req->my_stdout_tag = GLOBUS_NULL; } } else if (initialize == GLOBUS_TRUE) { req->my_stdout = GLOBUS_GRAM_PROTOCOL_DEFAULT_STDOUT; } if (strcmp(req->my_stdout, GLOBUS_GRAM_PROTOCOL_DEFAULT_STDOUT) == 0) { globus_l_gram_stdout_ignored = GLOBUS_TRUE; } else { globus_l_gram_stdout_ignored = GLOBUS_FALSE; } /********************************** * GET STDOUT_POSITION PARAM */ if (globus_rsl_param_get(rsl_tree, GLOBUS_RSL_PARAM_SINGLE_LITERAL, GLOBUS_GRAM_PROTOCOL_STDOUT_POSITION_PARAM, &tmp_param) != 0) { req->failure_code = GLOBUS_GRAM_PROTOCOL_ERROR_RSL_STDOUT_POSITION; return(GLOBUS_FAILURE); } if (tmp_param[0]) { x = atoi(tmp_param[0]); if (x < 0) { req->failure_code = GLOBUS_GRAM_PROTOCOL_ERROR_INVALID_STDOUT_POSITION; return(GLOBUS_FAILURE); } else { req->stdout_position = x; } } else if 
(initialize == GLOBUS_TRUE) { req->stdout_position = 0; } /********************************** * GET STDERR PARAM */ if (globus_rsl_param_get(rsl_tree, GLOBUS_RSL_PARAM_MULTI_LITERAL, GLOBUS_GRAM_PROTOCOL_STDERR_PARAM, &tmp_param) != 0) { req->failure_code = GLOBUS_GRAM_PROTOCOL_ERROR_RSL_STDERR; return(GLOBUS_FAILURE); } if (tmp_param[0]) { req->my_stderr = tmp_param[0]; if (tmp_param[1]) { req->my_stderr_tag = tmp_param[1]; if (tmp_param[2]) { /* error: stdout can be of the form URL or URL TAG only */ req->failure_code = GLOBUS_GRAM_PROTOCOL_ERROR_RSL_STDERR; return(GLOBUS_FAILURE); } } else { req->my_stderr_tag = GLOBUS_NULL; } } else if (initialize == GLOBUS_TRUE) { req->my_stderr = GLOBUS_GRAM_PROTOCOL_DEFAULT_STDERR; } if (strcmp(req->my_stderr, GLOBUS_GRAM_PROTOCOL_DEFAULT_STDERR) == 0) { globus_l_gram_stderr_ignored = GLOBUS_TRUE; } else { globus_l_gram_stderr_ignored = GLOBUS_FALSE; } /********************************** * GET STDERR_POSITION PARAM */ if (globus_rsl_param_get(rsl_tree, GLOBUS_RSL_PARAM_SINGLE_LITERAL, GLOBUS_GRAM_PROTOCOL_STDERR_POSITION_PARAM, &tmp_param) != 0) { req->failure_code = GLOBUS_GRAM_PROTOCOL_ERROR_RSL_STDERR_POSITION; return(GLOBUS_FAILURE); } if (tmp_param[0]) { x = atoi(tmp_param[0]); if (x < 0) { req->failure_code = GLOBUS_GRAM_PROTOCOL_ERROR_INVALID_STDERR_POSITION; return(GLOBUS_FAILURE); } else { req->stderr_position = x; } } else if (initialize == GLOBUS_TRUE) { req->stderr_position = 0; } /********************************** * GET COUNT PARAM */ if (globus_rsl_param_get(rsl_tree, GLOBUS_RSL_PARAM_SINGLE_LITERAL, GLOBUS_GRAM_PROTOCOL_COUNT_PARAM, &tmp_param) != 0) { req->failure_code = GLOBUS_GRAM_PROTOCOL_ERROR_RSL_COUNT; return(GLOBUS_FAILURE); } if (tmp_param[0]) { x = atoi(tmp_param[0]); if (x < 1) { req->failure_code = GLOBUS_GRAM_PROTOCOL_ERROR_INVALID_COUNT; return(GLOBUS_FAILURE); } else { req->count = x; } } else if (initialize == GLOBUS_TRUE) { req->count = 1; } /* save count parameter for reporting to MDS */ 
graml_my_count = req->count; /********************************** * GET MIN_MEMORY PARAM */ if (globus_rsl_param_get(rsl_tree, GLOBUS_RSL_PARAM_SINGLE_LITERAL, GLOBUS_GRAM_PROTOCOL_MIN_MEMORY_PARAM, &tmp_param) != 0) { req->failure_code = GLOBUS_GRAM_PROTOCOL_ERROR_RSL_MIN_MEMORY; return(GLOBUS_FAILURE); } if (tmp_param[0]) { x = (int) strtol(tmp_param[0], &ptr, 10); if (strlen(ptr) > 0 || x < 0) { req->failure_code = GLOBUS_GRAM_PROTOCOL_ERROR_INVALID_MIN_MEMORY; return(GLOBUS_FAILURE); } else { req->min_memory = x; } } else if (initialize == GLOBUS_TRUE) { req->min_memory = 0; } /********************************** * GET MAX_MEMORY PARAM */ if (globus_rsl_param_get(rsl_tree, GLOBUS_RSL_PARAM_SINGLE_LITERAL, GLOBUS_GRAM_PROTOCOL_MAX_MEMORY_PARAM, &tmp_param) != 0) { req->failure_code = GLOBUS_GRAM_PROTOCOL_ERROR_RSL_MAX_MEMORY; return(GLOBUS_FAILURE); } if (tmp_param[0]) { x = (int) strtol(tmp_param[0], &ptr, 10); if (strlen(ptr) > 0 || x < 0) { req->failure_code = GLOBUS_GRAM_PROTOCOL_ERROR_INVALID_MAX_MEMORY; return(GLOBUS_FAILURE); } else { req->max_memory = x; } } else if (initialize == GLOBUS_TRUE) { req->max_memory = 0; } /********************************** * GET MAX_WALL_TIME PARAM */ if (globus_rsl_param_get(rsl_tree, GLOBUS_RSL_PARAM_SINGLE_LITERAL, GLOBUS_GRAM_PROTOCOL_MAX_WALL_TIME_PARAM, &tmp_param) != 0) { req->failure_code = GLOBUS_GRAM_PROTOCOL_ERROR_RSL_MAX_WALL_TIME; return(GLOBUS_FAILURE); } if (tmp_param[0]) { x = atoi(tmp_param[0]); if (x < 1) { req->failure_code = GLOBUS_GRAM_PROTOCOL_ERROR_INVALID_MAX_WALL_TIME; return(GLOBUS_FAILURE); } else { req->max_wall_time = x; } } else if (initialize == GLOBUS_TRUE) { req->max_wall_time = 0; } /********************************** * GET MAX_CPU_TIME PARAM */ if (globus_rsl_param_get(rsl_tree, GLOBUS_RSL_PARAM_SINGLE_LITERAL, GLOBUS_GRAM_PROTOCOL_MAX_CPU_TIME_PARAM, &tmp_param) != 0) { req->failure_code = GLOBUS_GRAM_PROTOCOL_ERROR_RSL_MAX_CPU_TIME; return(GLOBUS_FAILURE); } if (tmp_param[0]) { x = 
atoi(tmp_param[0]); if (x < 1) { req->failure_code = GLOBUS_GRAM_PROTOCOL_ERROR_INVALID_MAX_CPU_TIME; return(GLOBUS_FAILURE); } else { req->max_cpu_time = x; } } else if (initialize == GLOBUS_TRUE) { req->max_cpu_time = 0; } /********************************** * GET MAX_TIME PARAM */ if (globus_rsl_param_get(rsl_tree, GLOBUS_RSL_PARAM_SINGLE_LITERAL, GLOBUS_GRAM_PROTOCOL_MAX_TIME_PARAM, &tmp_param) != 0) { req->failure_code = GLOBUS_GRAM_PROTOCOL_ERROR_RSL_MAXTIME; return(GLOBUS_FAILURE); } if (tmp_param[0]) { x = atoi(tmp_param[0]); if (x < 1) { req->failure_code = GLOBUS_GRAM_PROTOCOL_ERROR_INVALID_MAXTIME; return(GLOBUS_FAILURE); } else { req->max_time = x; } } else if (initialize == GLOBUS_TRUE) { req->max_time = 0; } /********************************** * GET START_TIME PARAM */ if (globus_rsl_param_get(rsl_tree, GLOBUS_RSL_PARAM_SINGLE_LITERAL, GLOBUS_GRAM_PROTOCOL_START_TIME_PARAM, &tmp_param) != 0) { req->failure_code = GLOBUS_GRAM_PROTOCOL_ERROR_RSL_START_TIME; return(GLOBUS_FAILURE); } if (tmp_param[0]) req->start_time = tmp_param[0]; else if (initialize == GLOBUS_TRUE) req->start_time = GLOBUS_GRAM_PROTOCOL_DEFAULT_START_TIME; /********************************** * GET HOST_COUNT PARAM */ if (globus_rsl_param_get(rsl_tree, GLOBUS_RSL_PARAM_SINGLE_LITERAL, GLOBUS_GRAM_PROTOCOL_HOST_COUNT_PARAM, &tmp_param) != 0) { req->failure_code = GLOBUS_GRAM_PROTOCOL_ERROR_RSL_HOST_COUNT; return(GLOBUS_FAILURE); } if (tmp_param[0]) { x = atoi(tmp_param[0]); if (x < 1) { req->failure_code = GLOBUS_GRAM_PROTOCOL_ERROR_INVALID_HOST_COUNT; return(GLOBUS_FAILURE); } else { req->host_count = x; } } else if (initialize == GLOBUS_TRUE) { req->host_count = 0; } /********************************** * GET PARADYN PARAM */ if (globus_rsl_param_get(rsl_tree, GLOBUS_RSL_PARAM_SINGLE_LITERAL, GLOBUS_GRAM_PROTOCOL_PARADYN_PARAM, &tmp_param) != 0) { req->failure_code = GLOBUS_GRAM_PROTOCOL_ERROR_RSL_PARADYN; return(GLOBUS_FAILURE); } if (tmp_param[0]) req->paradyn = tmp_param[0]; else if 
(initialize == GLOBUS_TRUE) req->paradyn = NULL; /********************************** * GET JOB_TYPE PARAM */ if (globus_rsl_param_get(rsl_tree, GLOBUS_RSL_PARAM_SINGLE_LITERAL, GLOBUS_GRAM_PROTOCOL_JOB_TYPE_PARAM, &tmp_param) != 0) { req->failure_code = GLOBUS_GRAM_PROTOCOL_ERROR_RSL_JOBTYPE; return(GLOBUS_FAILURE); } if (tmp_param[0]) { if (strncmp(tmp_param[0], "mpi", 3) == 0) req->job_type = GLOBUS_GRAM_JOBMANAGER_JOBTYPE_MPI; else if (strncmp(tmp_param[0], "single", 6) == 0) req->job_type = GLOBUS_GRAM_JOBMANAGER_JOBTYPE_SINGLE; else if (strncmp(tmp_param[0], "multiple", 8) == 0) req->job_type = GLOBUS_GRAM_JOBMANAGER_JOBTYPE_MULTIPLE; else if (strncmp(tmp_param[0], "condor", 6) == 0) req->job_type = GLOBUS_GRAM_JOBMANAGER_JOBTYPE_CONDOR; else { req->failure_code = GLOBUS_GRAM_PROTOCOL_ERROR_INVALID_JOBTYPE; return(GLOBUS_FAILURE); } } else if (initialize == GLOBUS_TRUE) { req->job_type = GLOBUS_GRAM_JOBMANAGER_JOBTYPE_MULTIPLE; } /********************************** * GET MYJOB PARAM */ if (globus_rsl_param_get(rsl_tree, GLOBUS_RSL_PARAM_SINGLE_LITERAL, GLOBUS_GRAM_PROTOCOL_MYJOB_PARAM, &tmp_param) != 0) { req->failure_code = GLOBUS_GRAM_PROTOCOL_ERROR_RSL_MYJOB; return(GLOBUS_FAILURE); } if (tmp_param[0]) { if ((strncmp(tmp_param[0], "collective", 10) != 0) && (strncmp(tmp_param[0], "independent", 11) != 0)) { req->failure_code = GLOBUS_GRAM_PROTOCOL_ERROR_INVALID_GRAM_MYJOB; return(GLOBUS_FAILURE); } gram_myjob = tmp_param[0]; } else if (initialize == GLOBUS_TRUE) gram_myjob = GLOBUS_GRAM_PROTOCOL_DEFAULT_MYJOB; /********************************** * GET DRY_RUN PARAM */ if (globus_rsl_param_get(rsl_tree, GLOBUS_RSL_PARAM_SINGLE_LITERAL, GLOBUS_GRAM_PROTOCOL_DRY_RUN_PARAM, &tmp_param) != 0) { req->failure_code = GLOBUS_GRAM_PROTOCOL_ERROR_RSL_DRYRUN; return(GLOBUS_FAILURE); } if (tmp_param[0]) if (strncmp(tmp_param[0], "yes", 3) == 0) req->dry_run = GLOBUS_TRUE; else req->dry_run = GLOBUS_FALSE; else if (initialize == GLOBUS_TRUE) req->dry_run = GLOBUS_FALSE; 
/********************************** * GET SAVE_STATE PARAM */ if (globus_rsl_param_get(rsl_tree, GLOBUS_RSL_PARAM_SINGLE_LITERAL, GLOBUS_GRAM_PROTOCOL_SAVE_STATE_PARAM, &tmp_param) != 0) { req->failure_code = GLOBUS_GRAM_PROTOCOL_ERROR_RSL_SAVE_STATE; return(GLOBUS_FAILURE); } if (tmp_param[0]) if (strncmp(tmp_param[0], "yes", 3) == 0) req->save_state = GLOBUS_TRUE; else req->save_state = GLOBUS_FALSE; else if (initialize == GLOBUS_TRUE) req->save_state = GLOBUS_FALSE; /********************************** * GET TWO_PHASE_COMMIT PARAM */ if (globus_rsl_param_get(rsl_tree, GLOBUS_RSL_PARAM_SINGLE_LITERAL, GLOBUS_GRAM_PROTOCOL_TWO_PHASE_COMMIT_PARAM, &tmp_param) != 0) { req->failure_code = GLOBUS_GRAM_PROTOCOL_ERROR_RSL_TWO_PHASE_COMMIT; return(GLOBUS_FAILURE); } if (tmp_param[0]) { if (strncmp(tmp_param[0], "yes", 3) == 0) { req->two_phase_commit = GRAM_JOB_MANAGER_COMMIT_TIMEOUT; } else { x = (int) strtol(tmp_param[0], &ptr, 10); if (strlen(ptr) > 0 || x < 0) { req->failure_code = GLOBUS_GRAM_PROTOCOL_ERROR_INVALID_TWO_PHASE_COMMIT; return(GLOBUS_FAILURE); } else { req->two_phase_commit = x; } } } else if (initialize == GLOBUS_TRUE) { req->two_phase_commit = 0; } /********************************** * GET REMOTE IO URL PARAM */ if (globus_rsl_param_get(rsl_tree, GLOBUS_RSL_PARAM_SINGLE_LITERAL, GLOBUS_GRAM_PROTOCOL_REMOTE_IO_URL_PARAM, &tmp_param) != 0) { req->failure_code = GLOBUS_GRAM_PROTOCOL_ERROR_RSL_REMOTE_IO_URL; return(GLOBUS_FAILURE); } if (tmp_param[0]) graml_remote_io_url = tmp_param[0]; else if (initialize == GLOBUS_TRUE) graml_remote_io_url = NULL; /********************************** * GET QUEUE PARAM */ if (globus_rsl_param_get(rsl_tree, GLOBUS_RSL_PARAM_SINGLE_LITERAL, GLOBUS_GRAM_PROTOCOL_QUEUE_PARAM, &tmp_param) != 0) { req->failure_code = GLOBUS_GRAM_PROTOCOL_ERROR_RSL_QUEUE; return(GLOBUS_FAILURE); } if (tmp_param[0]) req->queue = tmp_param[0]; else if (initialize == GLOBUS_TRUE) req->queue = NULL; /********************************** * GET 
RESERVATION HANDLE PARAM */ if (globus_rsl_param_get(rsl_tree, GLOBUS_RSL_PARAM_SINGLE_LITERAL, GLOBUS_GRAM_PROTOCOL_RESERVATION_HANDLE_PARAM, &tmp_param) != 0) { req->failure_code = GLOBUS_GRAM_PROTOCOL_ERROR_RSL_RESERVATION_HANDLE; return(GLOBUS_FAILURE); } if (tmp_param[0]) req->reservation_handle = tmp_param[0]; else if (initialize == GLOBUS_TRUE) req->reservation_handle = NULL; /********************************** * GET PROJECT PARAM */ if (globus_rsl_param_get(rsl_tree, GLOBUS_RSL_PARAM_SINGLE_LITERAL, GLOBUS_GRAM_PROTOCOL_PROJECT_PARAM, &tmp_param) != 0) { req->failure_code = GLOBUS_GRAM_PROTOCOL_ERROR_RSL_PROJECT; return(GLOBUS_FAILURE); } if (tmp_param[0]) req->project = tmp_param[0]; else if (initialize == GLOBUS_TRUE) req->project = NULL; /********************************** * GET ENVIRONMENT PARAM */ if (globus_rsl_param_get(rsl_tree, GLOBUS_RSL_PARAM_SEQUENCE, GLOBUS_GRAM_PROTOCOL_ENVIRONMENT_PARAM, &(req->environment)) != 0) { req->failure_code = GLOBUS_GRAM_PROTOCOL_ERROR_RSL_ENVIRONMENT; return(GLOBUS_FAILURE); } { char *newvar; char *newval; int i; int rc; /* add duct environment string to environment */ rc = globus_l_gram_duct_environment(req->count, gram_myjob, &newvar, &newval); if(rc == GLOBUS_SUCCESS) { for(i = 0; req->environment[i] != GLOBUS_NULL; i++) { ; } req->environment = (char **) globus_libc_realloc(req->environment, (i+3) * sizeof(char *)); req->environment[i] = newvar; ++i; req->environment[i] = newval; ++i; req->environment[i] = GLOBUS_NULL; if (globus_l_gram_rsl_env_add(rsl_tree, newvar, newval) != 0) { globus_jobmanager_log( req->jobmanager_log_fp, "JM: ERROR adding %s to the environment= parameter " "of the RSL.\n", newvar); } } } /* GEM: Stage executable and stdin to local filesystem, if they are URLs. * Do this before paradyn rewriting. */ /* We need to do this on restart to get the local filenames for cached * URLs. No new transfer will be done, but a new tag will be added to * the existing GASS cache entry. 
*/ if (req->jm_restart == NULL || initialize == GLOBUS_FALSE) { if (globus_l_gram_stage_file(req->executable, &staged_file_path, 0700) != GLOBUS_SUCCESS) { req->failure_code = GLOBUS_GRAM_PROTOCOL_ERROR_STAGING_EXECUTABLE; return(GLOBUS_FAILURE); } if (staged_file_path) { req->executable = staged_file_path; globus_jobmanager_log( req->jobmanager_log_fp, "JM: executable staged filename is %s\n", staged_file_path); } if (globus_l_gram_stage_file(req->my_stdin, &staged_file_path, 0400) != GLOBUS_SUCCESS) { req->failure_code = GLOBUS_GRAM_PROTOCOL_ERROR_STAGING_STDIN; return(GLOBUS_FAILURE); } if (staged_file_path) { req->my_stdin = staged_file_path; globus_jobmanager_log( req->jobmanager_log_fp, "JM: stdin staged filename is %s\n", staged_file_path); } } if (req->jm_restart == NULL) { if (grami_is_paradyn_job(req)) { if (!grami_paradyn_rewrite_params(req)) { req->failure_code = GLOBUS_GRAM_PROTOCOL_ERROR_INVALID_PARADYN; return(GLOBUS_FAILURE); } if (globus_l_gram_stage_file(req->executable, &staged_file_path, 0700) != GLOBUS_SUCCESS) { req->failure_code = GLOBUS_GRAM_PROTOCOL_ERROR_STAGING_EXECUTABLE; return(GLOBUS_FAILURE); } if (staged_file_path) { req->executable = staged_file_path; } } } return(GLOBUS_SUCCESS); } /* globus_l_gram_request_fill() */ /****************************************************************************** Function: globus_l_gram_request_environment_append() Description: Parameters: Returns: ******************************************************************************/ static int globus_l_gram_request_environment_append(globus_gram_jobmanager_request_t * req, globus_l_gram_conf_values_t * conf) { int x; /* * if there are no additional environment variables then just return. */ if (conf->num_env_adds < 1) { return(GLOBUS_SUCCESS); } /* * determine the number of environment vars in the request. */ for(x = 0; req->environment[x] != GLOBUS_NULL; x++) { ; } /* * Allocate additional space to hold the default environment variables. 
*/ req->environment = (char **) globus_libc_realloc(req->environment, (conf->num_env_adds*2+x+1) * sizeof(char *)); if (conf->x509_cert_dir) { req->environment[x] = "X509_CERT_DIR"; ++x; req->environment[x] = conf->x509_cert_dir; ++x; } if (graml_job_contact) { req->environment[x] = "GLOBUS_GRAM_JOB_CONTACT"; ++x; req->environment[x] = graml_job_contact; ++x; } if (conf->globus_location) { req->environment[x] = "GLOBUS_LOCATION"; ++x; req->environment[x] = conf->globus_location; ++x; } if (conf->tcp_port_range) { req->environment[x] = "GLOBUS_TCP_PORT_RANGE"; ++x; req->environment[x] = conf->tcp_port_range; ++x; } req->environment[x] = GLOBUS_NULL; return(GLOBUS_SUCCESS); } /* globus_l_gram_request_environment_append() */ /****************************************************************************** Function: globus_l_gram_genfilename() Description: generate an absolute file name given a starting prefix, a relative or absolute path, and a sufix Only use prefix if path is relative. Parameters: Returns: a pointer to a string which could be freeded. ******************************************************************************/ static char * globus_l_gram_genfilename(char * prefixp, char * pathp, char * sufixp) { char * newfilename; int prefixl, pathl, sufixl; char * prefix, * path, * sufix; prefix = (prefixp) ? prefixp : ""; path = (pathp) ? pathp : ""; sufix = (sufixp) ? 
sufixp : ""; prefixl = strlen(prefix); pathl = strlen(path); sufixl = strlen(sufix); newfilename = (char *) calloc(1, (prefixl + pathl + sufixl + 3)); if (newfilename) { if (*path != '/') { strcat(newfilename, prefix); if ((prefixl != 0) && (prefix[prefixl-1] != '/')) strcat(newfilename, "/"); } strcat(newfilename, path); if ((pathl != 0) && (sufixl != 0) && (path[pathl-1] != '/') && sufix[0] != '/') strcat(newfilename, "/"); strcat(newfilename, sufix); } return newfilename; } /* globus_l_gram_genfilename */ /****************************************************************************** Function: globus_l_gram_stage_file() Description: Parameters: Returns: ******************************************************************************/ static int globus_l_gram_stage_file(char *url, char **staged_file_path, int mode) { globus_url_t gurl; int rc; int error_flag = 0; char * staged_file_url; globus_result_t result; globus_gass_copy_handle_t handle; *staged_file_path = GLOBUS_NULL; if (url == GLOBUS_NULL) { return(GLOBUS_FAILURE); } if (strlen(url) == 0) { return(GLOBUS_FAILURE); } globus_jobmanager_log( graml_log_fp, "JM: staging file = %s\n", url); rc = globus_url_parse(url, &gurl); if(rc == GLOBUS_SUCCESS) /* this is a valid URL */ { unsigned long timestamp; rc = globus_gass_cache_add(&globus_l_cache_handle, url, graml_gass_cache_tag, GLOBUS_TRUE, ×tamp, staged_file_path); if(rc == GLOBUS_GASS_CACHE_ADD_EXISTS) { globus_gass_cache_add_done(&globus_l_cache_handle, url, graml_gass_cache_tag, timestamp); } else if(rc == GLOBUS_GASS_CACHE_ADD_NEW) { staged_file_url = globus_libc_malloc(strlen(*staged_file_path) + 6); globus_libc_sprintf(staged_file_url, "file:%s", *staged_file_path); result = globus_gass_copy_handle_init(&handle, GLOBUS_NULL); if(result != GLOBUS_SUCCESS) { globus_jobmanager_log(graml_log_fp, "JM: failed to init copy handle\n"); error_flag = 1; } else { result = globus_gass_copy_url_to_url( &handle, url, GLOBUS_NULL, staged_file_url, GLOBUS_NULL); 
if(result != GLOBUS_SUCCESS) { globus_jobmanager_log(graml_log_fp, "JM: failed to copy url\n"); error_flag = 1; } else { globus_gass_copy_handle_destroy(&handle); } } globus_libc_free(staged_file_url); globus_gass_cache_add_done(&globus_l_cache_handle, url, graml_gass_cache_tag, timestamp); } } globus_url_destroy(&gurl); globus_jobmanager_log( graml_log_fp, "JM: new name = %s\n", url); if (error_flag != GLOBUS_SUCCESS) { return(GLOBUS_FAILURE); } return(GLOBUS_SUCCESS); } /* globus_l_gram_stage_file */ /****************************************************************************** Function: globus_l_gram_duct_environment() Description: Parameters: Returns: ******************************************************************************/ static int globus_l_gram_duct_environment(int count, char *myjob, char **newvar, char **newval) { globus_duct_control_t *duct; int rc; duct = globus_libc_malloc(sizeof(globus_duct_control_t)); if(strcmp(myjob, "collective") != 0) { count=1; } rc = globus_duct_control_init(duct, count, GLOBUS_NULL, GLOBUS_NULL); if(rc != GLOBUS_SUCCESS) { globus_jobmanager_log( graml_log_fp, "JM: duct_control_init_failed: %d\n", rc); return GLOBUS_GRAM_PROTOCOL_ERROR_DUCT_INIT_FAILED; } rc = globus_duct_control_contact_url(duct, newval); if(rc != GLOBUS_SUCCESS) { globus_jobmanager_log( graml_log_fp, "JM: duct_control_contact_url failed: %d\n", rc); return(GLOBUS_GRAM_PROTOCOL_ERROR_DUCT_LSP_FAILED); } (*newvar) = globus_libc_strdup("GLOBUS_GRAM_MYJOB_CONTACT"); return GLOBUS_SUCCESS; } /* globus_l_gram_duct_environment */ /****************************************************************************** Function: globus_l_gram_getenv_var() Description: Parameters: Returns: ******************************************************************************/ static char * globus_l_gram_getenv_var(char * env_var, char * default_val) { char * tmp_env_val; char * env_val; tmp_env_val = (char *) globus_libc_getenv(env_var); if (tmp_env_val) { env_val = (char *) 
globus_libc_strdup (tmp_env_val);