71 lines
2.5 KiB
Python
71 lines
2.5 KiB
Python
import json
|
|
import re
|
|
|
|
from urllib.parse import urlparse
|
|
|
|
# import httpx
|
|
import requests
|
|
import parsel
|
|
import pyjq
|
|
import logging
|
|
|
|
logger = logging.getLogger(__name__)
|
|
class ScrapeTarget:
    """One scrape source for a product: a URL plus the rules to pull a number out of it.

    The response body is parsed either as HTML (``selector`` is a CSS selector
    evaluated with ``parsel``) or as JSON (``selector`` is a jq program
    evaluated with ``pyjq``). ``regex`` is then searched in the selected text
    and the matched text is converted to a float.
    """

    def __init__(self, product_name, url, selector, target_name=None, regex=None, parser=None, headers=None):
        """Store the target configuration and open an HTTP session.

        Args:
            product_name: Name of the product this target belongs to.
            url: URL that is fetched by ``query_target``.
            selector: CSS selector (``html`` parser) or jq program (``json`` parser).
            target_name: Label for this target; defaults to the hostname of ``url``.
            regex: Pattern searched in the selected text; defaults to a pattern
                for digits with optional thousands commas and two decimals.
            parser: ``'html'`` or ``'json'``; defaults to ``'html'``.
            headers: Optional mapping of extra request headers. It is copied,
                so the caller's mapping is never modified.

        Raises:
            ValueError: If ``parser`` is not one of the supported parsers.
        """
        self.product_name = product_name
        self.target_name = target_name if target_name else urlparse(url).hostname
        self.url = url
        self.selector = selector
        self.regex = re.compile(regex if regex else r'([0-9]+,?)+(\.[0-9]{2})?')
        self.parser = parser if parser else 'html'

        # sanity check (done before the session is created so that an invalid
        # configuration does not allocate a session that is never used)
        valid_parsers = ('html', 'json')
        if self.parser not in valid_parsers:
            raise ValueError("Invalid parser configured (got '%s' but need one of %s) product: '%s', target: '%s'" % (self.parser, valid_parsers, self.product_name, self.target_name))

        # Work on a private copy: the previous ``headers={}`` default was a
        # single dict shared by every instance and was mutated below, and a
        # dict passed in by the caller was modified in place as well.
        headers = dict(headers) if headers else {}
        headers.setdefault('Referer', 'google.com')
        headers.setdefault('DNT', '1')
        self.headers = headers
        self.session = requests.Session()

    def query_target(self):
        """Fetch the URL, apply the selector and the regex, and return the number found.

        Returns:
            float: The regex match with thousands commas removed, as a float.

        Raises:
            ScrapeError: If the parser is unknown, the selector matches nothing,
                or the regex does not match the selected text.
            ValueError: If the response is not valid JSON (``json`` parser), or
                if a custom regex matches text that ``float`` cannot convert.
        """
        query_response = self.session.get(
            self.url,
            headers=self.headers,
        )
        logger.info('Status: %s', query_response.status_code)
        query_response_text = query_response.text
        logger.debug('Response: %s', query_response_text)

        # parse the response and match the selector
        if self.parser == 'html':
            # parse response as html
            selector = parsel.Selector(text=query_response_text)
            selector_match = selector.css(self.selector).get()
        elif self.parser == 'json':
            # parse response as json
            query_response_json = json.loads(query_response_text)
            json_match = pyjq.first(self.selector, query_response_json)
            # Only stringify a real result: str(None) is the truthy text
            # 'None', which would slip past the selector check below.
            selector_match = None if json_match is None else str(json_match)
        else:
            # self.parser is a plain attribute and may have been changed after
            # construction, so it is checked again here.
            raise ScrapeError('Invalid parser!')

        if not selector_match:
            raise ScrapeError('Failed to match selector!')

        # match the regex
        regex_match = self.regex.search(selector_match)
        if not regex_match:
            raise ScrapeError('Failed to match regex!')

        # drop thousands separators, then convert the result to float
        str_result = regex_match.group(0).replace(',', '')
        return float(str_result)
|
|
|
|
class ScrapeError(Exception):
    """Error raised when a scrape cannot produce a result."""

    def __init__(self, msg):
        """Create the error; ``msg`` is required and becomes the exception's only argument."""
        super(ScrapeError, self).__init__(msg)