"""
* generate_rss.py - 25c3 Podcast Generator
* Version 0.8
*
* (C) 2009 by Kristian Mueller <kristian-m@kristian-m.de>
* All Rights Reserved
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License along
* with this program; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
"""
import urllib
import urllib2
import datetime
import logging
import sys
# Log everything (DEBUG and above) to stderr; the generated feed itself is
# written to stdout by the print at the end of this script.
logging.basicConfig(level=logging.DEBUG, stream=sys.stderr)
# Wiki page that is scraped for the release table and for the mirror list.
WIKIPAGE_URL = "http://events.ccc.de/congress/2008/wiki/Conference_Recordings"
# Per-event schedule page; the %s placeholder is filled with the event id.
FAHRPLAN_URL = "http://events.ccc.de/congress/2008/Fahrplan/events/%s.en.html"
# Complete schedule as XML.
# NOTE: the name is misspelled ("FARHPLAN"); it is kept because
# get_release_data refers to the constant by exactly this name.
XML_FARHPLAN_URL = "http://events.ccc.de/congress/2008/Fahrplan/schedule.en.xml"
# Keys into a release-table row (rows are keyed by column position); passed to
# generate_RSS as `format`, which reads the download link from row[format].
FORMAT_H264 = 3
FORMAT_IPOD = 4
FORMAT_MP3 = 5
FORMAT_OGG = 6
# MIME types passed to generate_RSS as `content_type`; they end up in the
# type attribute of each item's <enclosure> element.
CONTENT_TYPE_H264 = 'video/mp4'
CONTENT_TYPE_IPOD = 'video/mp4'
CONTENT_TYPE_MP3 = 'audio/mpeg'
CONTENT_TYPE_OGG = 'audio/ogg'
# Feed header template. generate_RSS fills the eight %s placeholders, in order,
# with: title, description, link, title (image), big logo URL, link (image),
# subtitle, logo URL.
RSS_START = """<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:itunes="http://www.itunes.com/dtds/podcast-1.0.dtd" xmlns:atom="http://www.w3.org/2005/Atom" version="2.0">
<channel>
<title>%s</title>
<description>%s</description>
<link>%s</link>
<image>
<title>%s</title>
<url>%s</url>
<link>%s</link>
</image>
<language>en-us</language>
<copyright>Creative Commons Namensnennung-NichtKommerziell-KeineBearbeitung 2.0 Germany License</copyright>
<itunes:keywords>25C3,25th Chaos Communication Congress,Chaos Computer Club,Event,Recording,Video,Lectures,Documentation</itunes:keywords>
<generator>generate_rss.py - 25c3 Podcast Generator</generator>
<itunes:author>Chaos Computer Club</itunes:author>
<itunes:subtitle>%s</itunes:subtitle>
<itunes:image href="%s" />
<itunes:category text="Technology">
<itunes:category text="Tech News"/>
</itunes:category>
<itunes:explicit>no</itunes:explicit>
<itunes:owner>
<itunes:email>craps@kristian-m.de</itunes:email>
<itunes:name>Kristian Mueller</itunes:name>
</itunes:owner>
<atom:link href="http://feeds.feedburner.com/25c3-UnofficialIpodFeed?format=xml" rel="self" type="application/rss+xml" />"""
# Template for one feed item. generate_RSS fills the nine %s placeholders, in
# order, with: title, description, link, author, guid, pubDate, content type,
# enclosure URL (same value as link) and enclosure length.
RSS_ITEM = """
<item>
<title>%s</title>
<description>%s</description>
<link>%s</link>
<author>%s</author>
<guid>%s</guid>
<pubDate>%s</pubDate>
<enclosure type="%s" url="%s" length="%s"/>
</item>"""
# Closes the <channel> and <rss> elements opened by RSS_START.
RSS_END = """
</channel>
</rss>"""
# Channel titles, one per feed variant selectable on the command line.
PODCAST_TITLE_H264 = "25C3 - Unofficial H.264 Recordings"
PODCAST_TITLE_IPOD = "25C3 - Unofficial Video Recordings"
PODCAST_TITLE_MP3 = "25C3 - Unofficial Audio Recordings"
PODCAST_TITLE_OGG = "25C3 - Unofficial OGG Vorbis Recordings"
# Channel-level texts and links shared by all feed variants.
PODCAST_DESCRIPTION = """The 25th Chaos Communication Congress (25C3: Nothing to hide) took place from December 27th to December 30th 2008 at Berliner Congress Center in Berlin, Germany. This channel offers the complete set of available recordings of the 25C3 lectures. Most lectures are in english, some in german."""
PODCAST_SUBTITLE = "Recordings of the 25th Chaos Communication Congress: Nothing to hide (Berlin December 2008)"
PODCAST_LINK = "http://events.ccc.de/congress/2008"
# Passed to generate_RSS as big_logo (<image><url>) and logo (<itunes:image>).
PODCAST_BIG_LOGO = "http://events.ccc.de/congress/2008/wiki/images/7/77/PodcastLogo.png"
PODCAST_LOGO = "http://events.ccc.de/congress/2008/wiki/images/b/bf/PodcastLogo-144x144.png"
# Module-level default. generate_RSS binds its own local variable of the same
# name, so this list is not modified by the code in this file.
FALLBACK_SERVERS = []
import xml
import xml.parsers.expat
class XMLElement(object):
    """ one node of the parsed XML tree: tag name, attributes, text, children"""

    def __init__(self, name, attributes):
        """ remember tag name and attribute dictionary; no text or children yet"""
        self.children = []
        self.cdata = ''
        self.attributes = attributes
        self.name = name

    def add_child(self, element):
        """ append element to the list of child nodes"""
        self.children.append(element)

    def get_attribute(self, key):
        """ return the attribute stored under key, or None when it is absent"""
        return self.attributes.get(key)

    def get_data(self):
        """ return the character data collected for this node"""
        return self.cdata

    def get_elements(self, name = ''):
        """ return a new list of children, limited to tag `name` when given"""
        if not name:
            # No filter requested: hand out a copy so callers cannot alter
            # the node's own child list.
            return self.children[:]
        matching = []
        for child in self.children:
            if child.name == name:
                matching.append(child)
        return matching
class XML2Object(object):
    """ turns an XML document into a tree of XMLElement nodes via expat"""

    def __init__(self):
        """ start with an empty tree and an empty stack of open elements"""
        self.node_stack = []
        self.root = None

    def start_element(self, name, attributes):
        """ expat callback: create a node and hang it below the open element"""
        node = XMLElement(name.encode(), attributes)
        if not self.node_stack:
            # Nothing is open yet, so this is the document element.
            self.root = node
        else:
            self.node_stack[-1].add_child(node)
        self.node_stack.append(node)

    def end_element(self, name):
        """ expat callback: the innermost open element is finished"""
        self.node_stack.pop()

    def character_data(self, data):
        """ expat callback: append text to the innermost open element"""
        if not data.strip():
            # Chunks consisting only of whitespace are ignored.
            return
        encoded = data.encode('utf-8')
        self.node_stack[-1].cdata += encoded

    def parse(self, data):
        """ parse the complete document in `data` and return its root node"""
        expat_parser = xml.parsers.expat.ParserCreate('utf-8')
        expat_parser.CharacterDataHandler = self.character_data
        expat_parser.EndElementHandler = self.end_element
        expat_parser.StartElementHandler = self.start_element
        # Second argument 1: `data` is the final (and only) chunk.
        expat_parser.Parse(data, 1)
        return self.root
def get_strings_from_html(html):
    """
    returning the list of text fragments found outside of tags in html

    Each fragment is stripped of surrounding whitespace; fragments that are
    empty after stripping are skipped. A '/' directly followed by 'a' in the
    text is turned into a space. Text after the last tag is returned as well.
    Empty or missing input yields an empty list.
    """
    if not html:
        return []
    in_tag = 0
    words = []
    # Characters of the fragment being collected. A list (instead of a string)
    # allows the in-place replacement below and avoids repeated concatenation.
    current = []
    for char in html:
        if char == '<':
            in_tag += 1
            if in_tag == 1:
                text = "".join(current).strip()
                if text:
                    words.append(text)
                    current = []
        elif char == '>':
            in_tag -= 1
        elif in_tag == 0:
            if current and current[-1] == '/' and char == 'a':
                # The original code assigned into a string here, which raises
                # TypeError; replacing the list item is the working form.
                current[-1] = ' '
            current.append(char)
    # Flush text that follows the last tag (or input without any tag at all);
    # previously this trailing fragment was lost.
    tail = "".join(current).strip()
    if tail:
        words.append(tail)
    return words
def check_url(url):
    """
    returning filesize or 0 on error

    The size is taken from the Content-Length header of the response and is
    always returned as an int, so callers can compare it numerically. Any
    failure to open the URL, a missing header or a non-numeric header value
    results in 0.
    """
    try:
        http_response = urllib2.urlopen(url)
    except Exception:
        # Unreachable host, HTTP error status, malformed URL, ...
        return 0
    try:
        return int(http_response.headers['content-length'])
    except (KeyError, ValueError, TypeError):
        return 0
    finally:
        # Only the headers are needed; release the connection right away.
        http_response.close()
def get_event_data(xml_root):
    """
    collecting all events of the pentabarf XML tree

    Returns a tuple (events, dates): both are dictionaries keyed by the
    event's 'id' attribute, mapping to the event node and to the date of the
    day (parsed from the day's 'date' attribute) the event is listed under.
    Raises Exception when the root has no child whose tag name contains
    "conference".
    """
    conference_nodes = [node for node in xml_root.children
                        if "conference" in node.name]
    xml_conference = None
    if conference_nodes:
        # When several children match, the last one is the one that counts.
        xml_conference = conference_nodes[-1]
    if not xml_conference:
        raise Exception("No XML channel information found for fahrplan, fallig back to HTML Fahrplan")

    events_by_id = {}
    date_by_id = {}
    for day in xml_root.children:
        if day.name != 'day':
            continue
        day_start = datetime.datetime.strptime(day.attributes['date'],
                                               "%Y-%m-%d")
        for room in day.children:
            if room.name != 'room':
                continue
            for talk in room.children:
                if talk.name != 'event':
                    continue
                events_by_id[talk.attributes['id']] = talk
                date_by_id[talk.attributes['id']] = day_start
    return (events_by_id, date_by_id)
# pubDate layout used for the feed items.
_PUB_DATE_FORMAT = "%a, %d %b %Y %H:%M:%S +0000"


def _fill_meta_from_xml(entry, event_id, xml_event, event_dates):
    """
    filling entry from the parsed schedule XML

    Any lookup or conversion error propagates to the caller, which then
    falls back to the HTML page.
    """
    entry['guid'] = FAHRPLAN_URL % event_id
    for event_xml in xml_event[event_id].children:
        if event_xml.name == 'title':
            entry['title'] = event_xml.cdata
        elif event_xml.name == 'subtitle':
            if event_xml.cdata.strip():
                entry['title'] += " - %s" % event_xml.cdata
        elif event_xml.name == 'description':
            # NOTE(review): as written, each replace() maps a character to
            # itself, so the text passes through unchanged. This looks like
            # entity handling whose literals were lost - confirm the intended
            # escaping before relying on it.
            entry['description'] = event_xml.cdata.replace("<", "<").replace(">", ">").replace("&", "&")
        elif event_xml.name == 'persons':
            speakers = '; '.join([person.cdata for person in event_xml.children])
            entry['author'] = "noreply@noreply.com (%s)" % speakers
        elif event_xml.name == 'start':
            # The start text is read as "HH:MM".
            start_hours = int(event_xml.cdata[0:2])
            start_minutes = int(event_xml.cdata[3:5])
            if event_dates:
                pub_date = event_dates[event_id] + datetime.timedelta(
                    hours = start_hours, minutes = start_minutes)
                # One hour is added to every start time (reason not
                # documented in this file).
                pub_date = pub_date + datetime.timedelta(hours=1)
                if event_id == '2952':
                    # Undocumented special case for this one event.
                    pub_date = pub_date - datetime.timedelta(hours=6)
                entry['pubDate'] = pub_date.strftime(_PUB_DATE_FORMAT)


def _fill_meta_from_html(entry, event_id):
    """
    filling entry by scraping the event's HTML schedule page

    Every field that cannot be extracted is set to an empty string, so the
    entry always carries title, description, author and pubDate afterwards.
    """
    page = urllib.urlopen(FAHRPLAN_URL % event_id)
    try:
        fahrplan_html = page.read()
    finally:
        page.close()
    entry['guid'] = FAHRPLAN_URL % event_id

    try:
        entry['title'] = fahrplan_html.split('<h1 class="title summary">')[1].split("</h1>")[0]
    except Exception:
        logging.debug("couldn't get title for entry with id %s - this sucks." % event_id)
        entry['title'] = ""
    else:
        # The subtitle is optional: a page without one keeps the title that
        # was found instead of losing it.
        subtitle_parts = fahrplan_html.split('<p class="subtitle">')
        if len(subtitle_parts) > 1:
            subtitle = subtitle_parts[1].split("</p>")[0]
            if len(subtitle) >= 1:
                entry['title'] += " - "
                entry['title'] += subtitle
        logging.debug("=======================================================")
        logging.debug("Title: %s " % entry['title'])

    try:
        entry['description'] = " ".join(get_strings_from_html(fahrplan_html.split('<div class="description">')[1].split("</div>")[0]))
        logging.debug("Length of Description: %s " % len(entry['description']))
    except Exception:
        logging.debug( "couldn't get description for entry with id %s - this sucks." % event_id)
        entry['description'] = ""

    try:
        speakers_section = fahrplan_html.split('<th colspan="2">Speakers</th>')[1].split("</table>")[0]
        entry['author'] = "noreply@noreply.com (%s)" % ("; ".join(get_strings_from_html(speakers_section)))
        logging.debug("Authors: %s " % entry['author'])
    except Exception:
        logging.debug("couldn't get author for entry with id %s - this sucks." % event_id)
        # Like the other fields, author gets an empty default so that later
        # lookups of entry['author'] do not fail.
        entry['author'] = ""

    try:
        pub_string = fahrplan_html.split('<td class="value dtstart" title="')[1].split('"')[0]
        day_string = fahrplan_html.split('<td class="value">Day ')[1].split('(')[1].split(')')[0]
        # day_string is read as "YYYY-MM-DD".
        day = datetime.date(int(day_string[0:4]), int(day_string[5:7]), int(day_string[8:10]))
        start = datetime.datetime.strptime(pub_string, "%Y-%m-%dT%H:%M:%S+01:00")
        pub_time = datetime.datetime.combine(day, start.time()) + datetime.timedelta(hours=1)
        if event_id == '2952':
            # NOTE(review): this re-combines with the unshifted start time,
            # so the net shift here is -6h, while the XML path ends up at
            # -5h for the same event - confirm which one is intended.
            pub_time = datetime.datetime.combine(pub_time, start.time()) - datetime.timedelta(hours=6)
        entry['pubDate'] = pub_time.strftime(_PUB_DATE_FORMAT)
        logging.debug("Date/Time: %s " % entry['pubDate'])
    except Exception:
        logging.debug( "couldn't get date for entry with id %s - this sucks." % event_id)
        entry['pubDate'] = ""


def get_meta_data(entry, xml_event = None, event_dates = None):
    """
    adding guid, title, description, author and pubDate to a release entry

    entry       - dictionary of one release-table row; entry[0] is used as the
                  event id. Without key 0 the entry is returned untouched.
    xml_event   - dictionary mapping event ids to their XML nodes (optional)
    event_dates - dictionary mapping event ids to the date of their day
                  (optional); without it no pubDate is taken from the XML

    The XML data is used when available. If it is missing, or anything goes
    wrong while reading it, the event's HTML schedule page is fetched and
    parsed instead. The entry is modified in place and returned.
    """
    if 0 not in entry:
        logging.debug( "Can not look up metadata - no id!")
        return entry
    event_id = entry[0]

    if xml_event:
        try:
            _fill_meta_from_xml(entry, event_id, xml_event, event_dates)
            return entry
        except Exception:
            # could not use the XML - will parse the HTML page as a fallback
            # - this should not happen - let's hope HTML is still available
            logging.debug("XML lookup failed for id %s - using the HTML page instead" % event_id)

    _fill_meta_from_html(entry, event_id)
    return entry
def get_mirror_list():
    """
    returning the mirror URLs listed on the recordings wiki page

    The table following the "Mirroring_official_files" anchor is scanned line
    by line; from every cell line ("<td> ") that carries a link, the link
    target is taken if it contains "http://".
    """
    page_source = urllib.urlopen(WIKIPAGE_URL).read()
    after_anchor = page_source.split('<a name="Mirroring_official_files">')[1]
    table_html = after_anchor.split("</table>")[0]

    mirrors = []
    for row_line in table_html.split("\n"):
        if "</tr>" in row_line:
            # End-of-row lines never contribute a mirror.
            continue
        if "<td> " not in row_line or "a href=\"" not in row_line:
            continue
        target = row_line.split("a href=\"")[1].split("\"")[0]
        if "http://" in target:
            mirrors.append(target)

    logging.debug("Got mirrorlist of %s entries." % len(mirrors))
    return mirrors
def get_release_data():
    """
    returning the list of releases scraped from the recordings wiki page

    Each release is a dictionary: the integer keys are the column positions
    of the release table, holding the link target of the cell (if it has a
    link), otherwise its text, or "" for an empty cell. get_meta_data then
    adds the metadata keys. Rows are closed on every "</tr>" line, so rows
    without cells show up as (nearly) empty dictionaries.

    The schedule XML is loaded once for the metadata lookup. If it cannot be
    loaded or parsed, the metadata is taken from the HTML schedule pages
    instead.
    """
    wiki_page = urllib.urlopen(WIKIPAGE_URL)
    try:
        wiki_html = wiki_page.read()
    finally:
        wiki_page.close()
    release_table = wiki_html.split("</th><th> Title")[1].split("</table>")[0]

    # Defaults make get_meta_data use its HTML fallback when the XML fails;
    # previously these names were left unbound after an error.
    xml_event_data = None
    event_dates = {}
    try:
        fahrplan_xml = urllib.urlopen(XML_FARHPLAN_URL).read()
        # The downloaded bytes go to the parser as they are: calling
        # .encode('utf-8') on a byte string fails for non-ASCII content.
        xml_root = XML2Object().parse(fahrplan_xml)
        (xml_event_data, event_dates) = get_event_data(xml_root)
    except Exception:
        # Covers download errors, XML parse errors and the generic Exception
        # get_event_data raises for an unexpected document.
        logging.exception("Could not load Fahrplan XML - falling back to the HTML Fahrplan pages")
        xml_event_data = None
        event_dates = {}

    entries = []
    entry = {}
    column_counter = 0
    for line in release_table.split("\n"):
        if "</tr>" in line:
            # Row complete: enrich it with metadata and start the next one.
            entries.append(get_meta_data(entry, xml_event_data, event_dates))
            column_counter = 0
            entry = {}
        elif "<td>" in line:
            if "<td> " in line:
                cell = line.split("<td> ")[1]
                if "a href=\"" in cell:
                    entry[column_counter] = cell.split("a href=\"")[1].split("\"")[0]
                else:
                    entry[column_counter] = cell
            else:
                entry[column_counter] = ""
            column_counter += 1
    return entries
# Characters removed from file names before they are appended to a mirror URL.
_MIRROR_STRIPPED_CHARS = ':,*()!?'


def _mirror_relative_path(url):
    """ return the "<directory>/<file>" tail of url in the form used on mirrors"""
    parts = url.split('/')
    path = parts[-2] + "/" + parts[-1]
    if '.' in path:
        pieces = path.split(".")
        # All dots are dropped except those of the extension: the last two
        # pieces are kept for "...ipod.<ext>" names, the last one otherwise.
        keep = 1
        if "ipod." in path:
            keep = 2
        path = "".join(pieces[:-keep]) + "." + ".".join(pieces[-keep:])
    cleaned = []
    for char in path:
        if char in _MIRROR_STRIPPED_CHARS:
            continue
        try:
            cleaned.append(char.encode())
        except UnicodeError:
            # Characters that cannot be encoded are left out.
            continue
    return "".join(cleaned)


def _content_length(url):
    """ return the size reported by check_url as an int (0 if unusable)"""
    try:
        return int(check_url(url))
    except (TypeError, ValueError):
        return 0


def generate_RSS(table, title, description, subtitle, link, big_logo, logo, content_type, format):
    """
    returning the complete RSS feed as a string

    table        - list of release dictionaries (see get_release_data)
    title, description, subtitle, link, big_logo, logo
                 - channel-level values filled into RSS_START
    content_type - MIME type written into every item's enclosure
    format       - key of the row entry that holds the download URL

    Rows with fewer than five entries, or without an "http://" URL under
    `format`, are skipped. If the URL reports no size, the mirrors from
    get_mirror_list are tried in order; the URL finally used is written back
    to row[format]. Rows for which no working URL is found are left out of
    the feed and reported through logging.error.

    NOTE(review): all values are interpolated into the XML templates as they
    are; nothing in this function escapes '&' or '<' - verify that the
    incoming data is already XML-safe.
    """
    mirrors = get_mirror_list()
    chunks = [RSS_START % (title, description, link, title, big_logo, link,
                           subtitle, logo)]
    not_found_list = []
    for row in table:
        if len(row) < 5:
            continue
        url = row.get(format, "")
        if "http://" not in url.lower():
            continue

        # Sizes are compared as integers, whatever type check_url hands back.
        length = _content_length(url)
        for mirror in mirrors:
            if length > 0:
                break
            url = mirror + _mirror_relative_path(url)
            logging.debug("Falling back to %s." % (url))
            length = _content_length(url)
        row[format] = url

        if length <= 0:
            not_found_list.append(url)
            continue
        try:
            # Kept in its own name so the channel `link` parameter stays
            # untouched.
            item_link = url.encode("utf-8")
        except UnicodeError:
            logging.error("Failed to encode [%s]." % (url))
            continue

        logging.debug("Adding Item %s." % (row.get('title', "")))
        # Missing metadata becomes an empty element instead of a KeyError.
        chunks.append(RSS_ITEM % (row.get('title', ""),
                                  row.get('description', ""),
                                  item_link,
                                  row.get('author', ""),
                                  row.get('guid', ""),
                                  row.get('pubDate', ""),
                                  content_type,
                                  item_link,
                                  length))

    if not_found_list:
        logging.error("Could not find URL for:")
        for item in not_found_list:
            logging.error(item)
    chunks.append(RSS_END)
    return "".join(chunks)
# Script entry: the first matching command line switch (checked in the order
# listed here) selects the feed variant; the feed is printed to stdout.
_FEED_VARIANTS = (
    ('-h264', PODCAST_TITLE_H264, CONTENT_TYPE_H264, FORMAT_H264),
    ('-ipod', PODCAST_TITLE_IPOD, CONTENT_TYPE_IPOD, FORMAT_IPOD),
    ('-mp3', PODCAST_TITLE_MP3, CONTENT_TYPE_MP3, FORMAT_MP3),
    ('-ogg', PODCAST_TITLE_OGG, CONTENT_TYPE_OGG, FORMAT_OGG),
)
for _switch, _feed_title, _mime_type, _column in _FEED_VARIANTS:
    if _switch in sys.argv:
        release_table = get_release_data()
        feed_data = generate_RSS(release_table, _feed_title,
                                 PODCAST_DESCRIPTION, PODCAST_SUBTITLE,
                                 PODCAST_LINK, PODCAST_BIG_LOGO, PODCAST_LOGO,
                                 _mime_type, _column)
        break
else:
    # No known switch given: show the usage text; the feed stays empty.
    print("""Please provide one of the following content types:
-h264 - H.264 Video
-ipod - m4v Video (iPod compatible)
-mp3 - MP3 Audio
-ogg - OGG Vorbis Audio""")
    feed_data = ""
print(feed_data)