import json
import random
import re
import time
import traceback

import redis
from bs4 import BeautifulSoup
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.wait import WebDriverWait
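
# The script drives a logged-in Chrome session with Selenium, parses pages with BeautifulSoup
# and keeps all scraped state in Redis. Three modes are implemented in run():
#   'city'    - walk the per-city company listings and fill companies_hash / companies_set
#   'company' - pop companies from companies_set and scrape every page of their job listings
#   'send'    - open each job in the job_data set, save its details and click "start chat"
# Whenever the site shows its "Click to verify" check, the script waits (up to 12 hours)
# for the verification to be completed manually in the open browser window.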


def get_info(text, cur_url):
    """Parse a company-list page and save every company card to Redis."""
    soup = BeautifulSoup(text, 'lxml')
    all_a = soup.findAll('a', class_='company-info')
    for a in all_a:
        try:
            financing, industry = re.findall('<p>(.*?)<span class="vline"></span>(.*?)</p>', str(a.find('p')))[0]
            data = dict(
                href=a['href'],
                ka=a['ka'],
                img=a.find('img')['src'],
                company=a.find('h4').text,
                financing=financing,
                industry=industry,
                referer=cur_url
            )
            r.hset("companies_hash", data['company'], json.dumps(data))
            r.sadd('companies_set', data['company'])
            r.sadd('companies_set_bak', data['company'])
            print(data)
        except Exception as e:
            print(e)


def login():
    """Open the login page and wait until the logged-in job list is shown."""
    driver.get(f"{base_url}/?ka=header-login")
    time.sleep(1)
    driver.find_element(By.XPATH, "//body/div[@id='wrap']/div[2]/div[1]/div[2]/div[2]").click()
    wait = WebDriverWait(driver, 30)  # Explicitly wait
    wait.until(EC.presence_of_element_located((By.CLASS_NAME, 'jobs-list')))


def sleep():
    """Pause 6-11 seconds between requests to avoid hammering the site."""
    time.sleep(random.choice(range(6, 12)))


def search_city(city):
    """Select a city filter and scrape the first 8 pages of its company list."""
    driver.find_element(By.LINK_TEXT, city).click()
    time.sleep(1)
    for i in range(8):
        print(f"-------------------------------------city:{city}-----page: {i}-----------------------------------------")
        if i > 0:
            # From page 2 onward, scroll the "next page" link into view and click it.
            target = driver.find_element(By.CSS_SELECTOR, "a[ka='page-next']")
            driver.execute_script("arguments[0].scrollIntoView();", target)
            target.click()
        text = driver.page_source
        cur_url = driver.current_url
        get_info(text, cur_url)
        sleep()


def get_jobs(text, company):
    """Parse one page of a company's job list and save every posting to Redis."""
    soup = BeautifulSoup(text, 'lxml')
    all_li = soup.find('div', class_='job-list').findAll('li')
    for li in all_li:
        try:
            a = li.find('a')
            data = {key: a[key] for key in ['data-jid', 'href', 'ka']}
            for key in ['job-title', 'job-area', 'job-pub-time']:
                data[key] = li.find('span', class_=key).text
            data['salary'] = li.find('span', class_='red').text
            data['work_years'], data['education'] = re.findall(re.compile(
                '</span>(.*?)<em class="vline"></em>(.*?)</p>', re.S), str(li.find('p')))[0]
            data['avator'], data['hr'], data['hr_positon'] = re.findall(re.compile(
                r'<img src="(.*?)\?x-oss-process=image/resize,w_40,limit_0"/>(.*?)<em class="vline"></em>(.*?)</h3>',
                re.S),
                str(li.find('div', class_='info-publis')))[0]
            for key in ['work_years', 'education', 'hr', 'hr_positon', 'job-area']:
                data[key] = re.sub(r'[\[\]]', '', data[key].strip())
            data['company'] = company
            print(f"---------{data['job-title']} --- {data['job-area']} --- {company}-------")
            r.hset("jobs_hash", data['data-jid'], json.dumps(data))
            r.sadd('jobs_set', data['data-jid'])
            r.sadd('jobs_set_bak', data['data-jid'])
        except Exception as e:
            print(e)


def company_info():
    """Pop one company from Redis, open its job listings and scrape every page."""
    info = json.loads(r.hget('companies_hash', r.spop('companies_set')))
    company = info['company']
    href = info["href"].replace("gongsi", "gongsir").rsplit(".", 1)[0]
    url = f'{base_url}{href}_100000.html?ka={info["ka"]}'
    driver.get(url)
    time.sleep(random.choice([1, 2, 3]))
    index = 1
    while True:
        print(f"-------------------------------------company: {company}-----page: {index}-----------------------------------------")
        page = driver.page_source
        if 'Click to verify' in page:
            # Anti-bot check: wait (up to 12 hours) for the verification to be completed by hand.
            print("----------------------------------------- Click to verify -----------------------------------------")
            wait = WebDriverWait(driver, 12 * 60 * 60)  # Explicitly wait
            wait.until(EC.presence_of_element_located((By.CLASS_NAME, 'company-banner')))
            continue
        get_jobs(page, company)
        try:
            target = driver.find_element(By.CSS_SELECTOR, 'a[class="next"]')
            driver.execute_script("arguments[0].scrollIntoView();", target)
            target.click()
            index += 1
            sleep()
        except Exception as e:
            # No "next" link left: the last page has been scraped.
            break


def deal(info, key, func, soup):
    """Run an extractor against the soup and store its result, logging any parse error."""
    try:
        info[key] = func(soup)
    except Exception as e:
        exstr = traceback.format_exc()
        print(exstr)


def save_info(base_name, key, data):
    """Store a record as JSON in '<base_name>_hash' and track its key in both sets."""
    r.hset(f"{base_name}_hash", data[key], json.dumps(data))
    r.sadd(f"{base_name}_set", data[key])
    r.sadd(f"{base_name}_set_bak", data[key])


def get_promotion_jobs(lst):
    """Save the recommended ('promotion') jobs listed on a job-detail page."""
    for job in lst:
        try:
            job_href = job.find('div', class_='name').find('a')['href']
            company = job.find('div', class_='info-primary').find('p', class_='gray')
            company_name = company.find('a').text.strip()
            job_info = dict(
                job_name=job.find('div', class_='name').find('a').text.rsplit(" ", 1)[0],
                job_ka=job.find('div', class_='name').find('a')['ka'],
                salary=job.find('div', class_='name').find('span', class_='red').text,
                job_id=job_href.rsplit('/', 1)[1].split('.html')[0],
                job_href=job_href,
                company_name=company_name,
                job_city=company.text.rsplit('·', 1)[1]
            )
            save_info("boss_jobs", "job_id", job_info)
            companies_info = dict(
                company_name=company_name,
                company_ka=company.find('a')['ka'],
                company_href=company.find('a')['href'],
            )
            save_info("boss_companies", "company_name", companies_info)
        except Exception as e:
            exstr = traceback.format_exc()
            print(exstr)


def get_job_info(text, data):
    """Combine the list-page record with details parsed from the job-detail page and save it."""
    soup = BeautifulSoup(text, 'lxml')
    info = {}
    for k, v in (('job_id', 'data-jid'), ('job_href', 'href'), ('job_ka', 'ka'), ('job_name', 'job-title'),
                 ('job_city', 'job-area'), ('update_date', 'job-pub-time'), ('salary', 'salary'),
                 ('experience', 'work_years'), ('education', 'education'), ('hr_avatar', 'avator'),
                 ('hr_name', 'hr'), ('hr_position', 'hr_positon'), ('company_name', 'company')):
        info[k] = data[v]
    deal(info, "job_describe", lambda x: x.find('div', class_="job-sec").find('div', class_='text').text.strip(), soup)
    deal(info, "job_tags", lambda x: [i.text for i in x.find('div', class_='job-tags').findAll('span')], soup)
    deal(info, "update_date", lambda x: x.find('div', class_='sider-company').find('p', class_='gray').text, soup)
    deal(info, "job_location",
         lambda x: x.find('div', class_='job-location').find('div', class_='location-address').text, soup)
    try:
        info['experience'], info['education'] = re.findall(
            '</a><em class="dolt"></em>(.*?)<em class="dolt"></em>(.*?)</p>',
            str(soup.find('div', class_='info-primary').find('p')))[0]
        detail_op = soup.find('div', class_='job-detail').find('div', class_='detail-op')
        deal(info, "hr_name", lambda x: x.find('h2', class_='name').text, detail_op)
        deal(info, "hr_position", lambda x: x.find('p', class_='gray').text, detail_op)
        deal(info, "hr_avatar", lambda x: x.find('img')['src'].split('?')[0], detail_op)
        save_info("boss_jobs", "job_id", info)
        promotion_jobs = soup.find('div', class_='promotion-job').findAll('li')
        get_promotion_jobs(promotion_jobs)
    except Exception as e:
        exstr = traceback.format_exc()
        print(exstr)


def communicate(url, info):
    """Open a job-detail page, save its details and click the "start chat" button."""
    driver.get(url)
    time.sleep(1)
    page = driver.page_source
    if 'Click to verify' in page:
        # Anti-bot check: wait (up to 12 hours) for the verification to be completed by hand.
        print("----------------------------------------- Click to verify -----------------------------------------")
        wait = WebDriverWait(driver, 12 * 60 * 60)  # Explicitly wait
        wait.until(EC.presence_of_element_located((By.CLASS_NAME, 'job-status')))
        time.sleep(1)
        page = driver.page_source  # re-read the page once the verification has passed
    try:
        get_job_info(page, info)
        driver.find_element(By.CSS_SELECTOR, 'a[class="btn btn-startchat"]').click()
        time.sleep(1)
        driver.find_element(By.CSS_SELECTOR, 'span[class="btn btn-sure"]').click()
    except Exception as e:
        exstr = traceback.format_exc()
        print(exstr)


def run(mode, is_login=True):
    """Run one of the crawl modes: 'city', 'company' or 'send'."""
    if is_login:
        login()
    if mode == 'city':
        driver.find_element(By.LINK_TEXT, "company").click()
        time.sleep(1)
        for city in cities:
            search_city(city)
    elif mode == 'company':
        while r.scard('companies_set') != 0:
            if len(driver.window_handles) > 1:
                driver.switch_to.window(driver.window_handles[0])
            company_info()
            sleep()
    elif mode == 'send':
        # 'job_data' is expected to hold the ids of the jobs to contact (e.g. a filtered copy of 'jobs_set').
        while r.scard('job_data') != 0:
            job_id = r.srandmember("job_data")
            info = json.loads(r.hget('jobs_hash', job_id))
            print(f"-------------------------------------{info['job-title']} --- {info['company']}-----------------------------------------")
            communicate(f"{base_url}{info['href']}", info)
            r.srem('job_data', job_id)
            time.sleep(random.choice([10, 15]))
        time.sleep(2)
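
# Entry point: connects to a local Redis instance (default host/port) for all state and drives
# Chrome through the chromedriver binary configured below; change the mode passed to run() as needed.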

if __name__ == '__main__':
    r = redis.Redis(password='123zxc')
    cities = ['Beijing', 'Hangzhou', 'Tianjin', 'Suzhou', 'Shanghai']
    base_url = ''  # base URL of the target job site; must be filled in before running
    driver = webdriver.Chrome(service=Service("/mnt/2D97AD940A9AD661/python_project/boss/chromedriver"))
    driver.maximize_window()
    run('send', True)
    sleep()
    driver.quit()